diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py index 1058896e1e70e3d0d687eda55789303064729ab0..3e5df112b33a7012416963490f828a66aa37fee4 100644 --- a/autosubmit/database/db_jobdata.py +++ b/autosubmit/database/db_jobdata.py @@ -66,6 +66,9 @@ ExperimentRow = collections.namedtuple( class ExperimentRun(): + """ + Class that represents an experiment run + """ def __init__(self, run_id, created=None, start=0, finish=0, chunk_unit="NA", chunk_size=0, completed=0, total=0, failed=0, queuing=0, running=0, submitted=0, suspended=0, metadata=""): self.run_id = run_id @@ -83,39 +86,6 @@ class ExperimentRun(): self.suspended = suspended self.metadata = metadata - def _increase_counter(self, status): - if status == Status.FAILED: - self.failed += 1 - elif status == Status.SUBMITTED: - self.submitted += 1 - elif status == Status.QUEUING: - self.queuing += 1 - elif status == Status.RUNNING: - self.running += 1 - elif status == Status.COMPLETED: - self.completed += 1 if self.completed < self.total else 0 - else: - pass - - def _decrease_counter(self, status): - if status == Status.FAILED: - self.failed -= 1 if self.failed > 0 else 0 - elif status == Status.SUBMITTED: - self.submitted -= 1 if self.submitted > 0 else 0 - elif status == Status.QUEUING: - self.queuing -= 1 if self.queuing > 0 else 0 - elif status == Status.RUNNING: - self.running -= 1 if self.running > 0 else 0 - elif status == Status.COMPLETED: - self.completed -= 1 if self.completed > 0 else 0 - else: - pass - - def update_counters(self, prev_status, status): - if prev_status != status: - self._increase_counter(status) - self._decrease_counter(prev_status) - class JobStepExtraData(): """ @@ -287,6 +257,9 @@ class JobData(object): return None def submit_datetime_str(self): + """ + Returns the submit datetime as a string with format %Y-%m-%d-%H:%M:%S + """ o_datetime = self.submit_datetime() if o_datetime: return o_datetime.strftime('%Y-%m-%d-%H:%M:%S') @@ -294,6 +267,9 @@ class JobData(object): return None def start_datetime_str(self): + """ + Returns the start datetime as a string with format %Y-%m-%d-%H:%M:%S + """ o_datetime = self.start_datetime() if o_datetime: return o_datetime.strftime('%Y-%m-%d-%H:%M:%S') @@ -301,6 +277,9 @@ class JobData(object): return None def finish_datetime_str(self): + """ + Returns the finish datetime as a string with format %Y-%m-%d-%H:%M:%S + """ o_datetime = self.finish_datetime() if o_datetime: return o_datetime.strftime('%Y-%m-%d-%H:%M:%S') @@ -444,7 +423,8 @@ class MainDataBase(): return None def update_table_schema(self): - """[summary] + """ + Updates the table schema from a list of changes. """ try: if self.conn: @@ -695,10 +675,8 @@ def check_if_database_exists(expid): class JobDataStructure(MainDataBase): def __init__(self, expid, check_only=False): - """Initializes the object based on the unique identifier of the experiment. - - Args: - expid (str): Experiment identifier + """ + Initializes the object based on the unique identifier of the experiment. """ MainDataBase.__init__(self, expid) BasicConfig.read() @@ -903,6 +881,7 @@ class JobDataStructure(MainDataBase): return None if job_list: if len(tracking_dictionary.items()) > 0: + total_number_jobs = len(job_list) # Changes exist completed_count = sum( 1 for job in job_list if job.status == Status.COMPLETED) @@ -922,7 +901,12 @@ class JobDataStructure(MainDataBase): current_run.submitted = submit_count current_run.running = running_count current_run.suspended = suspended_count - self._update_experiment_run(current_run) + # Check if we are still dealing with the right number of jobs + if current_run.total != total_number_jobs: + self.validate_current_run(job_list, current_run.chunk_unit, current_run.chunk_size, + must_create=True, only_update=False, current_config=current_run.metadata) + else: + self._update_experiment_run(current_run) return None except Exception as exp: if _debug == True: @@ -1221,8 +1205,8 @@ class JobDataStructure(MainDataBase): platform_object.write_job_extrainfo( job_data_last.get_hdata(), out_file_path) except Exception as exp: - #Log.info(traceback.format_exc()) #TODO Wilmer, this is stopping autosubmit, "Tuple index out of range" - Log.info("Couldn't write finish time {0}",exp.message) + # Log.info(traceback.format_exc()) #TODO Wilmer, this is stopping autosubmit, "Tuple index out of range" + Log.info("Couldn't write finish time {0}", exp.message) Log.warning(str(exp)) #energy = 0 @@ -1283,48 +1267,48 @@ class JobDataStructure(MainDataBase): Log.warning("Autosubmit couldn't write finish time.") return None - def retry_incompleted_data(self, list_jobs): - """ - DEPRECATED - Retries retrieval of data that might be incompleted. - - :param list_jobs: list of jobs in experiment - :type list_jobs: list() - - :return: None (Modifies database) - """ - try: - pending_jobs = self.get_pending_data() - if pending_jobs: - for item in pending_jobs: - job_object = section = next( - (job for job in list_jobs if job.name == item), None) - if (job_object): - platform_object = job_object.platform - if type(platform_object) is not str: - if platform_object.type == "slurm": - # print("Checking Slurm for " + str(job_name)) - Log.info("Attempting to complete information for {0}".format( - job_object.name)) - submit_time, start_time, finish_time, energy, extra_data = platform_object.check_job_energy( - job_object.id, job_object.packed) - if submit_time > 0 and start_time > 0: - job_data_last = self.get_job_data_last( - job_object.name)[0] - job_data_last.submit = int(submit_time) - job_data_last.start = int(start_time) - job_data_last.energy = energy - job_data_last.extra_data = dumps( - extra_data) - job_data_last.modified = datetime.today().strftime('%Y-%m-%d-%H:%M:%S') - rowid = self._update_finish_job_data_plus( - job_data_last) - Log.info("Historic data successfully retrieved and updated for: {0} {1}".format( - job_object.name, rowid)) - except Exception as exp: - print(traceback.format_exc()) - Log.warning(str(exp)) - return None + # def retry_incompleted_data(self, list_jobs): + # """ + # DEPRECATED + # Retries retrieval of data that might be incompleted. + + # :param list_jobs: list of jobs in experiment + # :type list_jobs: list() + + # :return: None (Modifies database) + # """ + # try: + # pending_jobs = self.get_pending_data() + # if pending_jobs: + # for item in pending_jobs: + # job_object = section = next( + # (job for job in list_jobs if job.name == item), None) + # if (job_object): + # platform_object = job_object.platform + # if type(platform_object) is not str: + # if platform_object.type == "slurm": + # # print("Checking Slurm for " + str(job_name)) + # Log.info("Attempting to complete information for {0}".format( + # job_object.name)) + # submit_time, start_time, finish_time, energy, extra_data = platform_object.check_job_energy( + # job_object.id, job_object.packed) + # if submit_time > 0 and start_time > 0: + # job_data_last = self.get_job_data_last( + # job_object.name)[0] + # job_data_last.submit = int(submit_time) + # job_data_last.start = int(start_time) + # job_data_last.energy = energy + # job_data_last.extra_data = dumps( + # extra_data) + # job_data_last.modified = datetime.today().strftime('%Y-%m-%d-%H:%M:%S') + # rowid = self._update_finish_job_data_plus( + # job_data_last) + # Log.info("Historic data successfully retrieved and updated for: {0} {1}".format( + # job_object.name, rowid)) + # except Exception as exp: + # print(traceback.format_exc()) + # Log.warning(str(exp)) + # return None def process_current_run_collection(self): """ @@ -1528,10 +1512,7 @@ class JobDataStructure(MainDataBase): pass def get_all_job_data(self): - """[summary] - - Raises: - Exception: [description] + """ """ try: if os.path.exists(self.folder_path): @@ -1611,29 +1592,29 @@ class JobDataStructure(MainDataBase): "Error on returning current job data. run_id {0}".format(run_id)) return None - def get_pending_data(self): - """[summary] - """ - try: - job_names_list = list() - if os.path.exists(self.folder_path): - current_pending = self._get_job_data_pending() - if current_pending: - for item in current_pending: - job_id, job_name, job_rowtype = item - job_names_list.append(job_name) - # job_name_to_detail[job_name] = (job_id, job_rowtype) - # jobid_list.append(job_id) - return job_names_list - else: - return None - except Exception as exp: - if _debug == True: - Log.info(traceback.format_exc()) - Log.debug(traceback.format_exc()) - Log.warning( - "Autosubmit couldn't retrieve job data. get_job_data_last") - return None + # def get_pending_data(self): + # """[summary] + # """ + # try: + # job_names_list = list() + # if os.path.exists(self.folder_path): + # current_pending = self._get_job_data_pending() + # if current_pending: + # for item in current_pending: + # job_id, job_name, job_rowtype = item + # job_names_list.append(job_name) + # # job_name_to_detail[job_name] = (job_id, job_rowtype) + # # jobid_list.append(job_id) + # return job_names_list + # else: + # return None + # except Exception as exp: + # if _debug == True: + # Log.info(traceback.format_exc()) + # Log.debug(traceback.format_exc()) + # Log.warning( + # "Autosubmit couldn't retrieve job data. get_job_data_last") + # return None def get_max_id_experiment_run(self): """Get Max experiment run object (last experiment run) @@ -1850,7 +1831,7 @@ class JobDataStructure(MainDataBase): return None def _insert_job_data(self, jobdata): - """[summary] + """ Inserts a new job_data register. :param jobdata: JobData object """ @@ -1878,7 +1859,7 @@ class JobDataStructure(MainDataBase): return None def _insert_experiment_run(self, experiment_run): - """[summary] + """ Inserts a new experiment_run register. :param experiment_run: ExperimentRun object """ @@ -1902,7 +1883,7 @@ class JobDataStructure(MainDataBase): str(type(e).__name__))) return None - def _get__all_job_data(self): + def _get_all_job_data(self): """ Get all registers from job_data.\n :return: row content: