From a67150c63f1b074a86efe93d8e244bcef0fef872 Mon Sep 17 00:00:00 2001 From: Wilmer Uruchi Ticona Date: Mon, 16 Aug 2021 16:29:18 +0200 Subject: [PATCH] Improving documentation of the section of the historical database that controls the creation of a new run. Implementing fix for #725 --- autosubmit/autosubmit.py | 8 +++- autosubmit/database/db_jobdata.py | 70 ++++++++++++++++++------------- autosubmit/git/autosubmit_git.py | 9 ++-- 3 files changed, 53 insertions(+), 34 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 9c9c012bf..b263f2717 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1347,6 +1347,7 @@ class Autosubmit: # Check if experiment exists. If False or None, it does not exist if not check_experiment_exists(start_after): return None + # Historical Database: We use the historical database to retrieve the current progress data of the supplied expid (start_after) # JobStructure object, check_only flag to avoid updating remote experiment jobStructure = JobDataStructure(start_after, check_only=True) # Check if database exists @@ -1491,6 +1492,7 @@ class Autosubmit: # Before starting main loop, setup historical database tables and main information Log.debug("Running job data structure") try: + # Historical Database: Can create a new run if there is a difference in the number of jobs or if the current state does not exist. job_data_structure = JobDataStructure(expid) job_data_structure.validate_current_run(job_list.get_job_list( ), as_conf.get_chunk_size_unit(), as_conf.get_chunk_size(), current_config=as_conf.get_full_config_as_json()) @@ -3841,6 +3843,7 @@ class Autosubmit: groups_dict = dict() # Setting up job historical database header. Must create a new run. + # Historical Database: Setup new run JobDataStructure(expid).validate_current_run(job_list.get_job_list( ), as_conf.get_chunk_size_unit(), as_conf.get_chunk_size(), must_create=True, current_config=as_conf.get_full_config_as_json()) @@ -4604,8 +4607,9 @@ class Autosubmit: if save and wrongExpid == 0: job_list.save() - job_data_structure = JobDataStructure(expid) - # job_data_structure.update_jobs_from_change_status(job_tracked_changes) + # Historical Database: Setup new run if greater or equal than 90% of completed date-member jobs are going to be changed. + # Or if the total number of jobs in the job_list is different than the total number of jobs in the current experiment run register in the database + job_data_structure = JobDataStructure(expid) job_data_structure.process_status_changes( job_tracked_changes, job_list.get_job_list(), as_conf.get_chunk_size_unit(), as_conf.get_chunk_size(), check_run=True, current_config=as_conf.get_full_config_as_json(), is_setstatus=True) diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py index 014bfb371..45aeb4a3f 100644 --- a/autosubmit/database/db_jobdata.py +++ b/autosubmit/database/db_jobdata.py @@ -796,7 +796,7 @@ class JobDataStructure(MainDataBase): Log.info("Database version set to {0}.".format( CURRENT_DB_VERSION)) self.db_version = CURRENT_DB_VERSION - self.current_run_id = self.get_current_run_id() + self.current_run_id = self.get_current_run_id() else: if not os.path.exists(self.database_path): self.database_exists = False @@ -852,7 +852,10 @@ class JobDataStructure(MainDataBase): def process_status_changes(self, tracking_dictionary, job_list=None, chunk_unit="NA", chunk_size=0, check_run=False, current_config="", is_setstatus=False): """ - Finds and updates the changes of status of the jobs in the current job list. + Finds and updates the changes of status of the jobs in the current job list. + Creates new run if: + - Greater or equal to 90% of completed date-member jobs are going to be changed. + - The number of total jobs is not equal than the number of total jobs in the experiment header of the database. :param tracking_dictionary: map of changes :type tracking_dictionary: dict() @@ -881,12 +884,13 @@ class JobDataStructure(MainDataBase): # If setstatus changes more than 90% of date-member completed jobs, it's a new run # Update status of individual jobs if is_setstatus == True: - self.update_jobs_from_change_status(tracking_dictionary) - # Must create a new experiment run + self.update_jobs_from_change_status( + tracking_dictionary) + # Must create a new experiment run Log.debug( "Since a significant amount of jobs have changed status. Autosubmit will consider a new run of the same experiment.") self.validate_current_run( - job_list, chunk_unit, chunk_size, must_create=True, only_update=False, current_config=current_config) + job_list, chunk_unit, chunk_size, must_create=True, only_update=False, current_config=current_config) return None if job_list: if len(tracking_dictionary.items()) > 0: @@ -910,11 +914,13 @@ class JobDataStructure(MainDataBase): current_run.submitted = submit_count current_run.running = running_count current_run.suspended = suspended_count - # Update status of individual jobs - if is_setstatus == True: - self.update_jobs_from_change_status(tracking_dictionary) + # Update status of individual jobs + if is_setstatus == True: + self.update_jobs_from_change_status( + tracking_dictionary) # Check if we are still dealing with the right number of jobs if current_run.total != total_number_jobs: + # must_create = True, signals that a new run should be created self.validate_current_run(job_list, current_run.chunk_unit, current_run.chunk_size, must_create=True, only_update=False, current_config=current_run.metadata) else: @@ -930,7 +936,12 @@ class JobDataStructure(MainDataBase): def validate_current_run(self, job_list, chunk_unit="NA", chunk_size=0, must_create=False, only_update=False, current_config=""): """ - Checks current run and created a new run or updates the existing run if necessary. Returns the current id of the run. + Checks current run and created a new run or updates the existing run if necessary. Returns the current id of the run. + A new run is created when: + - A new run is always generated on autosubmit create. + - If there is no current run in the database when experiment run starts, it creates a new experiment run register. + - If the must_create Flag is true. + - If the total number of jobs in the job_list object is different than the total number of jobs in the current state and only_update is False. :param job_list: list of jobs in experiment :type job_list: list of Job objects @@ -951,7 +962,7 @@ class JobDataStructure(MainDataBase): if not job_list: raise Exception( "Historical database: Autosubmit couldn't find the job_list. validate_current_run.") - current_run = self.get_max_id_experiment_run() + current_run = self.get_max_id_experiment_run() current_total = len(job_list) completed_count = sum( 1 for job in job_list if job.status == Status.COMPLETED) @@ -973,7 +984,7 @@ class JobDataStructure(MainDataBase): self.current_run_id = self._insert_experiment_run(new_run) self.is_original_run_id = False return self.current_run_id - else: + else: if current_run.total != current_total and only_update == False: # There is a difference in total jobs, create new experiment run new_run = ExperimentRun(0, None, 0, 0, chunk_unit, chunk_size, completed_count, @@ -986,7 +997,7 @@ class JobDataStructure(MainDataBase): current_run.completed = completed_count current_run.failed = failed_count current_run.queuing = queue_count - current_run.submitted = submit_count + current_run.submitted = submit_count current_run.running = running_count current_run.suspended = suspended_count current_run.total = current_total if only_update == True else current_run.total @@ -1336,8 +1347,9 @@ class JobDataStructure(MainDataBase): return True else: # Implementing a warning to keep track of it in the log. - Log.warning("Historical database: The register for {} from path {} was not found. The system will try to restore it with default values.".format(job_name, self.database_path)) - # It is necessary to create a new row + Log.warning("Historical database: The register for {} from path {} was not found. The system will try to restore it with default values.".format( + job_name, self.database_path)) + # It is necessary to create a new row submit_inserted = self.write_submit_time( job_name, finish, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, is_packed, wrapper_queue) write_inserted = self.write_start_time(job_name, finish, status, ncpus, @@ -1563,28 +1575,32 @@ class JobDataStructure(MainDataBase): :type tracking_dictionary: dict() :return: True if updated, False otherwise :rtype: bool - """ - if tracking_dictionary: - changes = [] - current_job_data_detail = self.get_current_job_data(self.current_run_id, only_finished=False) + """ + if tracking_dictionary: + changes = [] + current_job_data_detail = self.get_current_job_data( + self.current_run_id, only_finished=False) # for x in current_job_data_detail: # print("{} {} {}".format(x.job_name, x.last, x.status)) - for job_name in tracking_dictionary: + for job_name in tracking_dictionary: status_code, final_status_code = tracking_dictionary[job_name] #print("{} from {} to {}".format(job_name, status_code, final_status_code)) status_string = Status.VALUE_TO_KEY[status_code] final_status_string = Status.VALUE_TO_KEY[final_status_code] # REMOVED "and job_data.status == status_string" from the filter. - current_job_data = next((job_data for job_data in current_job_data_detail if job_data.job_name == job_name and job_data.last == 1), None) + current_job_data = next( + (job_data for job_data in current_job_data_detail if job_data.job_name == job_name and job_data.last == 1), None) # We found the current row that matches the provided current status if current_job_data: # print("{} to {}".format(job_name, final_status_code)) if final_status_code in [Status.COMPLETED, Status.FAILED, Status.QUEUING, Status.RUNNING, Status.HELD, Status.SUSPENDED]: - # new_current_job_data = copy.deepcopy(current_job_data) + # new_current_job_data = copy.deepcopy(current_job_data) current_job_data.modified = datetime.today().strftime('%Y-%m-%d-%H:%M:%S') current_job_data.status = final_status_string - current_job_data.finish = time.time() if final_status_code in [Status.COMPLETED, Status.FAILED] else 0 - changes.append((current_job_data.finish, current_job_data.modified, current_job_data.status, RowStatus.CHANGED, current_job_data._id)) + current_job_data.finish = time.time() if final_status_code in [ + Status.COMPLETED, Status.FAILED] else 0 + changes.append((current_job_data.finish, current_job_data.modified, + current_job_data.status, RowStatus.CHANGED, current_job_data._id)) #print("Added {}".format(current_job_data.job_name)) # else: # print("{} not found".format(job_name)) @@ -1673,7 +1689,7 @@ class JobDataStructure(MainDataBase): print(traceback.format_exc()) print( "Error on returning current job data. run_id {0}".format(run_id)) - return None + return None def get_max_id_experiment_run(self): """Get Max experiment run object (last experiment run) @@ -1803,7 +1819,7 @@ class JobDataStructure(MainDataBase): Log.debug(traceback.format_exc()) Log.warning("Error on Update : " + str(type(e).__name__)) return None - + def _update_many_job_data_change_status(self, changes): """ Update a many job_data updates. @@ -2001,7 +2017,7 @@ class JobDataStructure(MainDataBase): print("Error on select job data: {0}".format( str(type(e).__name__))) return None - + def _get_current_last_job_data(self, run_id): """ Get job data for a current run. @@ -2177,5 +2193,3 @@ class JobDataStructure(MainDataBase): Log.warning("Error on select max run_id : " + str(type(e).__name__)) return None - - diff --git a/autosubmit/git/autosubmit_git.py b/autosubmit/git/autosubmit_git.py index 078dd4564..650ec3857 100644 --- a/autosubmit/git/autosubmit_git.py +++ b/autosubmit/git/autosubmit_git.py @@ -159,9 +159,10 @@ class AutosubmitGit: # Making proj backup if force: - Log.info("Making a backup of your current proj folder at {0}".format( - project_backup_path)) - shutil.move(project_path, project_backup_path) + if os.path.exists(project_path): + Log.info("Making a backup of your current proj folder at {0}".format( + project_backup_path)) + shutil.move(project_path, project_backup_path) #shutil.make_archive(project_backup_path, 'zip', project_path) #project_backup_path = project_backup_path + ".zip" @@ -276,7 +277,7 @@ class AutosubmitGit: if submodule_failure: Log.info( "Some Submodule failures have been detected. Backup {0} will not be removed.".format(project_backup_path)) - return False + return False if os.path.exists(project_backup_path): Log.info("Removing backup...") shutil.rmtree(project_backup_path) -- GitLab