From db428b644263fe2bbf74861d5927f00535b16407 Mon Sep 17 00:00:00 2001 From: Wilmer Uruchi Ticona Date: Tue, 15 Jun 2021 18:40:50 +0200 Subject: [PATCH] Working on #706. Improved documentation and exception handling for the job_data_structure module. Added a warning to identify cases when the glitch reported in the issue might happen. Added detailed handling of the changes performed by the set status command in the historical database. --- autosubmit/autosubmit.py | 6 +- autosubmit/database/db_jobdata.py | 335 +++++++++++++++++------------- 2 files changed, 193 insertions(+), 148 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 8d234f89a..0d7d01479 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1756,7 +1756,7 @@ class Autosubmit: except BaseException as e: # If this happens, there is a bug in the code or an exception not-well caught raise Log.result("No more jobs to run.") - # Updating job data header with current information + # Updating job data header with current information when experiment ends job_data_structure.validate_current_run( job_list.get_job_list(), as_conf.get_chunk_size_unit(), as_conf.get_chunk_size(), must_create=False, only_update=True) @@ -4563,8 +4563,10 @@ class Autosubmit: if save and wrongExpid == 0: job_list.save() job_data_structure = JobDataStructure(expid) + # job_data_structure.update_jobs_from_change_status(job_tracked_changes) job_data_structure.process_status_changes( - job_tracked_changes, job_list.get_job_list(), as_conf.get_chunk_size_unit(), as_conf.get_chunk_size(), check_run=True, current_config=as_conf.get_full_config_as_json()) + job_tracked_changes, job_list.get_job_list(), as_conf.get_chunk_size_unit(), as_conf.get_chunk_size(), check_run=True, current_config=as_conf.get_full_config_as_json(), is_setstatus=True) + else: Log.printlog( "Changes NOT saved to the JobList!!!!: use -s option to save", 3000) diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py index 32caaa091..014bfb371 100644 --- a/autosubmit/database/db_jobdata.py +++ b/autosubmit/database/db_jobdata.py @@ -52,6 +52,7 @@ class RowStatus: COMPLETED = 1 PROCESSED = 2 FAULTY = 3 + CHANGED = 4 _debug = False @@ -491,7 +492,8 @@ class ExperimentStatus(MainDataBase): self.current_row = next( (exp for exp in self.current_table if exp.expid == self.expid), None) if len(self.current_table) > 0 else None except Exception as exp: - Log.debug(str(exp)) + Log.debug( + "Historical database error on experiment status constructor: {}.".format(str(exp))) pass def print_current_table(self): @@ -512,18 +514,10 @@ class ExperimentStatus(MainDataBase): :return: Map from experiment name to (Id of experiment, Status, Seconds) :rtype: Dictionary Key: String, Value: Integer, String, Integer """ - #self.conn = self.create_connection(self.DB_FILE_AS_TIMES) - - #drop_table_query = ''' DROP TABLE experiment_status ''' - # create_table(conn, drop_table_query) - # self.create_table() current_table = self._get_exp_status() result = list() - # print(current_table) - # print(type(current_table)) if current_table: for item in current_table: - #exp_id, expid, status, seconds = item result.append(ExperimentRow(*item)) return result @@ -772,6 +766,8 @@ class JobDataStructure(MainDataBase): ''') self.database_exists = True + self.current_run_id = None + self.is_original_run_id = True self.db_version = 0 try: if check_only == False: @@ -800,7 +796,7 @@ class JobDataStructure(MainDataBase): Log.info("Database version set to {0}.".format( CURRENT_DB_VERSION)) self.db_version = CURRENT_DB_VERSION - self.current_run_id = self.get_current_run_id() + self.current_run_id = self.get_current_run_id() else: if not os.path.exists(self.database_path): self.database_exists = False @@ -809,12 +805,14 @@ class JobDataStructure(MainDataBase): self.db_version = self._select_pragma_version() except IOError as e: - Log.debug(str(e)) + Log.debug( + "Historical database I/O error on jobdatastructure constructor: {}".format(str(e))) pass # raise AutosubmitCritical("Historic Database route {0} is not accesible".format( # BasicConfig.JOBDATA_DIR), 7067, e.message) except Exception as e: - Log.debug(str(e)) + Log.debug( + "Historical database error on jobdatastructure constructor: {}".format(str(e))) pass # raise AutosubmitCritical( # "Historic Database {0} due an database error".format(), 7067, e.message) @@ -837,16 +835,24 @@ class JobDataStructure(MainDataBase): return RowType.NORMAL def get_current_run_id(self): + """ + Get the Id of the current Experiment run. + + :return: run_id + :rtype: int + """ current_run = self.get_max_id_experiment_run() if current_run: + self.is_original_run_id = True return current_run.run_id else: new_run = ExperimentRun(0) + self.is_original_run_id = False return self._insert_experiment_run(new_run) - def process_status_changes(self, tracking_dictionary, job_list=None, chunk_unit="NA", chunk_size=0, check_run=False, current_config=""): + def process_status_changes(self, tracking_dictionary, job_list=None, chunk_unit="NA", chunk_size=0, check_run=False, current_config="", is_setstatus=False): """ - Finds and updated the changes of status of the jobs in the current job list. + Finds and updates the changes of status of the jobs in the current job list. :param tracking_dictionary: map of changes :type tracking_dictionary: dict() @@ -867,17 +873,20 @@ class JobDataStructure(MainDataBase): current_run = self.get_max_id_experiment_run() if current_run: if tracking_dictionary is not None and bool(tracking_dictionary) == True: - # print("Changes {0}".format(tracking_dictionary)) + # Changes exist if job_list and check_run == True: current_date_member_completed_count = sum( 1 for job in job_list if job.date is not None and job.member is not None and job.status == Status.COMPLETED) if len(tracking_dictionary.keys()) >= int(current_date_member_completed_count * 0.9): # If setstatus changes more than 90% of date-member completed jobs, it's a new run - # Must create a new experiment run + # Update status of individual jobs + if is_setstatus == True: + self.update_jobs_from_change_status(tracking_dictionary) + # Must create a new experiment run Log.debug( "Since a significant amount of jobs have changed status. Autosubmit will consider a new run of the same experiment.") self.validate_current_run( - job_list, chunk_unit, chunk_size, True, current_config=current_config) + job_list, chunk_unit, chunk_size, must_create=True, only_update=False, current_config=current_config) return None if job_list: if len(tracking_dictionary.items()) > 0: @@ -901,6 +910,9 @@ class JobDataStructure(MainDataBase): current_run.submitted = submit_count current_run.running = running_count current_run.suspended = suspended_count + # Update status of individual jobs + if is_setstatus == True: + self.update_jobs_from_change_status(tracking_dictionary) # Check if we are still dealing with the right number of jobs if current_run.total != total_number_jobs: self.validate_current_run(job_list, current_run.chunk_unit, current_run.chunk_size, @@ -918,20 +930,28 @@ class JobDataStructure(MainDataBase): def validate_current_run(self, job_list, chunk_unit="NA", chunk_size=0, must_create=False, only_update=False, current_config=""): """ - Checks current run - - :param job_list ([type]): [description] - :param chunk_unit (str, optional): [description]. Defaults to "NA". - :param chunk_size (int, optional): [description]. Defaults to 0. - :param must_create (bool, optional): [description]. Defaults to False. + Checks current run and created a new run or updates the existing run if necessary. Returns the current id of the run. - :return: [description] + :param job_list: list of jobs in experiment + :type job_list: list of Job objects + :param chunk_unit: Chunk unit in the settings of the experiment + :type chunk_unit: str + :param chunk_size: Chunk size in the settings of the experiment + :type chunk_size: str + :param must_create: True if a new experiment run register must be created + :type must_create: bool + :param only_update: True if the process should only update an existing register + :type only_update: bool + :param current_config: current configuration of the experiment as a JSON object + :type current_config: JSON object + :return: Id of the current run, None if error + :type: int """ try: if not job_list: raise Exception( - "Autosubmit couldn't find the job_list. validate_current_run.") - current_run = self.get_max_id_experiment_run() + "Historical database: Autosubmit couldn't find the job_list. validate_current_run.") + current_run = self.get_max_id_experiment_run() current_total = len(job_list) completed_count = sum( 1 for job in job_list if job.status == Status.COMPLETED) @@ -947,36 +967,40 @@ class JobDataStructure(MainDataBase): 1 for job in job_list if job.status == Status.SUSPENDED) if not current_run or must_create == True: + # If there is not current run register, or it must be created new_run = ExperimentRun(0, None, 0, 0, chunk_unit, chunk_size, completed_count, current_total, failed_count, queue_count, running_count, submit_count, suspended_count, current_config) self.current_run_id = self._insert_experiment_run(new_run) - else: - # print("Current run {0}".format(current_run.total)) - # print("Current total {0}".format(len(job_list))) + self.is_original_run_id = False + return self.current_run_id + else: if current_run.total != current_total and only_update == False: - # print("Creating new run") + # There is a difference in total jobs, create new experiment run new_run = ExperimentRun(0, None, 0, 0, chunk_unit, chunk_size, completed_count, current_total, failed_count, queue_count, running_count, submit_count, suspended_count, current_config) self.current_run_id = self._insert_experiment_run(new_run) + self.is_original_run_id = False + return self.current_run_id else: - # print("Updating current run") + # There is no difference in total jobs or it must only update current_run.completed = completed_count current_run.failed = failed_count current_run.queuing = queue_count - current_run.submitted = submit_count - # print("New suspended count {0}".format(suspended_count)) + current_run.submitted = submit_count current_run.running = running_count current_run.suspended = suspended_count current_run.total = current_total if only_update == True else current_run.total current_run.finish = 0 self._update_experiment_run(current_run) self.current_run_id = current_run.run_id + self.is_original_run_id = True + return self.current_run_id except Exception as exp: if _debug == True: Log.info(traceback.format_exc()) Log.debug(traceback.format_exc()) Log.warning( - "Autosubmit couldn't insert a new experiment run register. validate_current_run {0}".format(str(exp))) + "Historical database error: Autosubmit couldn't insert a new experiment run register. validate_current_run {0}".format(str(exp))) return None def update_finish_time(self): @@ -989,6 +1013,7 @@ class JobDataStructure(MainDataBase): current_run.finish = int(time.time()) self._update_experiment_run(current_run) self.current_run_id = current_run.run_id + self.is_original_run_id = True except Exception as exp: Log.debug(str(exp)) pass @@ -1309,15 +1334,16 @@ class JobDataStructure(MainDataBase): if no_slurm == False and is_end_of_wrapper == True: self.process_current_run_collection() return True - # It is necessary to create a new row + else: + # Implementing a warning to keep track of it in the log. + Log.warning("Historical database: The register for {} from path {} was not found. The system will try to restore it with default values.".format(job_name, self.database_path)) + # It is necessary to create a new row submit_inserted = self.write_submit_time( job_name, finish, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, is_packed, wrapper_queue) write_inserted = self.write_start_time(job_name, finish, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, is_packed, wrapper_queue) - # print(submit_inserted) - # print(write_inserted) + if submit_inserted and write_inserted: - #print("retro finish") self.write_finish_time( job_name, time.time(), status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, platform_object, is_packed, parent_id_list, no_slurm, out_file_path, out_file, err_file, wrapper_queue) else: @@ -1506,7 +1532,7 @@ class JobDataStructure(MainDataBase): # (filename, line, procname, text) = stack[-1] Log.info(traceback.format_exc()) Log.warning( - "Autosubmit couldn't process the SLURM. ".format(str(exp))) + "Autosubmit couldn't process the SLURM output. ".format(str(exp))) pass def update_energy_values(self, update_job_data): @@ -1528,8 +1554,48 @@ class JobDataStructure(MainDataBase): "Autosubmit couldn't retrieve experiment run header. update_energy_values. Exception {0}".format(str(exp))) pass + def update_jobs_from_change_status(self, tracking_dictionary): + """ + Updates the status of the jobs according to the tracked changes. + Status allowed: 'READY', 'COMPLETED', 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN', 'QUEUING', 'RUNNING', 'HELD' + + :param tracking_dictionary: name -> (status, final_status) + :type tracking_dictionary: dict() + :return: True if updated, False otherwise + :rtype: bool + """ + if tracking_dictionary: + changes = [] + current_job_data_detail = self.get_current_job_data(self.current_run_id, only_finished=False) + # for x in current_job_data_detail: + # print("{} {} {}".format(x.job_name, x.last, x.status)) + for job_name in tracking_dictionary: + status_code, final_status_code = tracking_dictionary[job_name] + #print("{} from {} to {}".format(job_name, status_code, final_status_code)) + status_string = Status.VALUE_TO_KEY[status_code] + final_status_string = Status.VALUE_TO_KEY[final_status_code] + # REMOVED "and job_data.status == status_string" from the filter. + current_job_data = next((job_data for job_data in current_job_data_detail if job_data.job_name == job_name and job_data.last == 1), None) + # We found the current row that matches the provided current status + if current_job_data: + # print("{} to {}".format(job_name, final_status_code)) + if final_status_code in [Status.COMPLETED, Status.FAILED, Status.QUEUING, Status.RUNNING, Status.HELD, Status.SUSPENDED]: + # new_current_job_data = copy.deepcopy(current_job_data) + current_job_data.modified = datetime.today().strftime('%Y-%m-%d-%H:%M:%S') + current_job_data.status = final_status_string + current_job_data.finish = time.time() if final_status_code in [Status.COMPLETED, Status.FAILED] else 0 + changes.append((current_job_data.finish, current_job_data.modified, current_job_data.status, RowStatus.CHANGED, current_job_data._id)) + #print("Added {}".format(current_job_data.job_name)) + # else: + # print("{} not found".format(job_name)) + if len(changes) > 0: + result = self._update_many_job_data_change_status(changes) + return result + return None + def get_all_job_data(self): """ + Get all register from job_data. """ try: if os.path.exists(self.folder_path): @@ -1555,14 +1621,10 @@ class JobDataStructure(MainDataBase): def get_job_data(self, job_name): """Retrieves all the rows that have the same job_name - Args: - job_name (str): [description] - - Raises: - Exception: If path to data folder does not exist - - Returns: - [type]: None if error, list of jobs if successful + :param job_name: name of job + :type job_name: str + :return: all jobs with the sanme job name + :rtype: list of JobItem objects """ try: job_data = list() @@ -1584,18 +1646,22 @@ class JobDataStructure(MainDataBase): Log.warning("Autosubmit couldn't retrieve job data. get_job_data") return None - def get_current_job_data(self, run_id): - """[summary] - - Args: - run_id ([type]): [description] + def get_current_job_data(self, run_id, only_finished=True): + """ + Gets current job_data for provided run_id. + :param run_id: run identifier + :type run_id: int """ try: current_collection = [] # if self.db_version < DB_VERSION_SCHEMA_CHANGES: # raise Exception("This function requieres a newer DB version.") if os.path.exists(self.folder_path): - current_job_data = self._get_current_job_data(run_id) + current_job_data = None + if only_finished == True: + current_job_data = self._get_current_job_data(run_id) + else: + current_job_data = self._get_current_last_job_data(run_id) if current_job_data: for job_data in current_job_data: jobitem = JobItem(*job_data) @@ -1607,31 +1673,7 @@ class JobDataStructure(MainDataBase): print(traceback.format_exc()) print( "Error on returning current job data. run_id {0}".format(run_id)) - return None - - # def get_pending_data(self): - # """[summary] - # """ - # try: - # job_names_list = list() - # if os.path.exists(self.folder_path): - # current_pending = self._get_job_data_pending() - # if current_pending: - # for item in current_pending: - # job_id, job_name, job_rowtype = item - # job_names_list.append(job_name) - # # job_name_to_detail[job_name] = (job_id, job_rowtype) - # # jobid_list.append(job_id) - # return job_names_list - # else: - # return None - # except Exception as exp: - # if _debug == True: - # Log.info(traceback.format_exc()) - # Log.debug(traceback.format_exc()) - # Log.warning( - # "Autosubmit couldn't retrieve job data. get_job_data_last") - # return None + return None def get_max_id_experiment_run(self): """Get Max experiment run object (last experiment run) @@ -1660,16 +1702,13 @@ class JobDataStructure(MainDataBase): return None def get_job_data_last(self, job_name): - """ Returns latest jobdata row for a job_name. The current version. + """ + Returns latest jobdata row for a job_name. The current version. - Args: - job_name ([type]): [description] - - Raises: - Exception: [description] - - Returns: - [type]: None if error, JobData if success + :param job_name: + :type job_name: + :return: Rows with last = 1 + :rtype: List of JobData objects """ try: jobdata = list() @@ -1695,13 +1734,8 @@ class JobDataStructure(MainDataBase): return None def _deactivate_current_last(self, jobdata): - """Sets last = 0 to row with id - - Args: - jobdata ([type]): [description] - - Returns: - [type]: [description] + """ + Sets last = 0 to row with id """ try: if self.conn: @@ -1769,6 +1803,29 @@ class JobDataStructure(MainDataBase): Log.debug(traceback.format_exc()) Log.warning("Error on Update : " + str(type(e).__name__)) return None + + def _update_many_job_data_change_status(self, changes): + """ + Update a many job_data updates. + + :param changes: list of tuples of the data to change (finish, modified, status, rowstatus, id) + :type changes: 5-tuple (int, str, str, int, int) + :return: True if updated + :rtype: bool + """ + try: + if self.conn: + sql = ''' UPDATE job_data SET finish=?, modified=?, status=?, rowstatus=? WHERE id=? ''' + cur = self.conn.cursor() + cur.executemany(sql, changes) + self.conn.commit() + return True + return False + except Exception as exp: + if _debug == True: + Log.info(traceback.format_exc()) + Log.debug(traceback.format_exc()) + Log.warning("Historical database error: {}".format(exp)) def _update_finish_job_data(self, jobdata): """ @@ -1877,7 +1934,7 @@ class JobDataStructure(MainDataBase): def _insert_experiment_run(self, experiment_run): """ - Inserts a new experiment_run register. + Inserts a new experiment_run register. :param experiment_run: ExperimentRun object """ try: @@ -1902,7 +1959,7 @@ class JobDataStructure(MainDataBase): def _get_all_job_data(self): """ - Get all registers from job_data.\n + Get all registers from job_data. :return: row content: :rtype: 23-tuple """ @@ -1925,10 +1982,8 @@ class JobDataStructure(MainDataBase): return list() def _get_current_job_data(self, run_id): - """[summary] - - Args: - run_id ([type]): [description] + """ + Get job data for a current run. """ try: if self.conn: @@ -1946,54 +2001,41 @@ class JobDataStructure(MainDataBase): print("Error on select job data: {0}".format( str(type(e).__name__))) return None - - def _get_job_data(self, job_name): - """[summary] - - Args: - job_name ([type]): [description] - - Returns: - [type]: None if error, list of tuple if found (list can be empty) + + def _get_current_last_job_data(self, run_id): + """ + Get job data for a current run. """ try: if self.conn: self.conn.text_factory = str cur = self.conn.cursor() - cur.execute( - "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id, MaxRSS, AveRSS, out, err, rowstatus FROM job_data WHERE job_name=? ORDER BY counter DESC", (job_name,)) + cur.execute("SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id, MaxRSS, AveRSS, out, err, rowstatus from job_data WHERE run_id=? and last=1 and rowtype >= 2 ORDER BY id", (run_id,)) rows = cur.fetchall() - # print(rows) - return rows - else: - return None - except sqlite3.Error as e: + if len(rows) > 0: + return rows + else: + return None + except Exception as exp: if _debug == True: - Log.info(traceback.format_exc()) - Log.debug(traceback.format_exc()) - Log.warning("Error on Select : " + str(type(e).__name__)) + print(traceback.format_exc()) + print("Historical database error: {0}".format(str(exp))) return None - def _get_job_data_last(self, job_name): - """Returns the latest row for a job_name. The current version. - - Args: - job_name ([type]): [description] + def _get_job_data(self, job_name): + """ + Returns rows belonging to a job_name - Returns: - [type]: [description] """ try: if self.conn: self.conn.text_factory = str cur = self.conn.cursor() cur.execute( - "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id, MaxRSS, AveRSS, out, err, rowstatus FROM job_data WHERE last=1 and job_name=? ORDER BY counter DESC", (job_name,)) + "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id, MaxRSS, AveRSS, out, err, rowstatus FROM job_data WHERE job_name=? ORDER BY counter DESC", (job_name,)) rows = cur.fetchall() - if rows and len(rows) > 0: - return rows - else: - return None + # print(rows) + return rows else: return None except sqlite3.Error as e: @@ -2003,27 +2045,35 @@ class JobDataStructure(MainDataBase): Log.warning("Error on Select : " + str(type(e).__name__)) return None - def _get_job_data_pending(self): + def _get_job_data_last(self, job_name): """ - Gets the list of job_id, job_name of those jobs that have pending information. - This function is no longer used. + Returns the latest rows (last = 1) for a job_name. + + :param job_name: Name of the requested job + :type job_name: str + :return: Rows from historical database + :rtype: list of tuples """ try: if self.conn: self.conn.text_factory = str cur = self.conn.cursor() cur.execute( - "SELECT job_id, job_name, rowtype FROM job_data WHERE last=1 and platform='marenostrum4' and energy <= 0 and (status = 'COMPLETED' or status = 'FAILED')") + "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id, MaxRSS, AveRSS, out, err, rowstatus FROM job_data WHERE last=1 and job_name=? ORDER BY counter DESC", (job_name,)) rows = cur.fetchall() if rows and len(rows) > 0: return rows else: - return None - except sqlite3.Error as e: + raise Exception( + "Historical database error: Valid last row not found for job {}.".format(job_name)) + else: + raise Exception( + "Historical database error: Connection not found when requesting information from job {}.".format(job_name)) + except Exception as exp: if _debug == True: Log.info(traceback.format_exc()) Log.debug(traceback.format_exc()) - Log.warning("Error on historic database retrieval.") + Log.warning("Historical database error on select : " + str(exp)) return None def _set_pragma_version(self, version=2): @@ -2128,11 +2178,4 @@ class JobDataStructure(MainDataBase): str(type(e).__name__)) return None - # def _retry_database_operation(self): - # completed = False - # tries = 0 - # while completed == False and tries <= 3: - # try: - # pass - # except sqlite3.Error as e: - # pass + -- GitLab