From f07aa51cc0596429b7686f27f5c6313f0719e3d5 Mon Sep 17 00:00:00 2001 From: Wilmer Uruchi Ticona Date: Fri, 15 Jan 2021 12:42:22 +0100 Subject: [PATCH] Fixed #642. Added a warning when after recovery there are some jobs that have conflicting dependency status. Furthermore, the current configuration of the experiment will be saved for each run of the experiment in the historical database. --- autosubmit/autosubmit.py | 16 +++++++-- autosubmit/config/config_common.py | 57 ++++++++++++++++++++++++++---- autosubmit/database/db_jobdata.py | 10 +++--- 3 files changed, 68 insertions(+), 15 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 59a7bb4fd..0c7f1ce9a 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1240,6 +1240,7 @@ class Autosubmit: as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory()) as_conf.check_conf_files(True) + Log.info( "Autosubmit is running with {0}", Autosubmit.autosubmit_version) if update_version: @@ -1432,7 +1433,7 @@ class Autosubmit: try: job_data_structure = JobDataStructure(expid) job_data_structure.validate_current_run(job_list.get_job_list( - ), as_conf.get_chunk_size_unit(), as_conf.get_chunk_size()) + ), as_conf.get_chunk_size_unit(), as_conf.get_chunk_size(), current_config=as_conf.get_full_config_as_json()) ExperimentStatus(expid).update_running_status() except Exception as e: @@ -2367,6 +2368,15 @@ class Autosubmit: else: Log.info(job_list.print_with_status()) Log.status(job_list.print_with_status()) + # Warnings about precedence completion + #time_0 = time.time() + notcompleted_parents_completed_jobs = [job for job in job_list.get_job_list( + ) if job.status == Status.COMPLETED and len([jobp for jobp in job.parents if jobp.status != Status.COMPLETED]) > 0] + + if notcompleted_parents_completed_jobs and len(notcompleted_parents_completed_jobs) > 0: + Log.error("The following COMPLETED jobs depend on jobs that have not been COMPLETED (this can result in unexpected behavior): {0}".format( + str([job.name for job in notcompleted_parents_completed_jobs]))) + #print("Warning calc took {0} seconds".format(time.time() - time_0)) return True @staticmethod @@ -3586,7 +3596,7 @@ class Autosubmit: # Setting up job historical database header. Must create a new run. JobDataStructure(expid).validate_current_run(job_list.get_job_list( - ), as_conf.get_chunk_size_unit(), as_conf.get_chunk_size(), must_create=True) + ), as_conf.get_chunk_size_unit(), as_conf.get_chunk_size(), must_create=True, current_config=as_conf.get_full_config_as_json()) if not noplot: if group_by: @@ -4343,7 +4353,7 @@ class Autosubmit: job_list.save() job_data_structure = JobDataStructure(expid) job_data_structure.process_status_changes( - job_tracked_changes, job_list.get_job_list(), as_conf.get_chunk_size_unit(), as_conf.get_chunk_size(), check_run=True) + job_tracked_changes, job_list.get_job_list(), as_conf.get_chunk_size_unit(), as_conf.get_chunk_size(), check_run=True, current_config=as_conf.get_full_config_as_json()) else: Log.printlog( "Changes NOT saved to the JobList!!!!: use -s option to save", 3000) diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index b66b146cd..f1b70a334 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -26,6 +26,7 @@ import os import re import subprocess import traceback +import json from pyparsing import nestedExpr @@ -126,6 +127,44 @@ class AutosubmitConfig(object): """ return self._jobs_parser_file + def get_full_config_as_dict(self): + """ + Returns full configuration as json object + """ + _conf = _exp = _platforms = _jobs = _proj = None + result = {} + + def get_data(parser): + """ + dictionary comprehension to get data from parser + """ + res = {sec: {option: parser.get(sec, option) for option in parser.options(sec)} for sec in [ + section for section in parser.sections()]} + return res + + result["conf"] = get_data( + self._conf_parser) if self._conf_parser else None + result["exp"] = get_data( + self._exp_parser) if self._exp_parser else None + result["platforms"] = get_data( + self._platforms_parser) if self._platforms_parser else None + result["jobs"] = get_data( + self._jobs_parser) if self._jobs_parser else None + result["proj"] = get_data( + self._proj_parser) if self._proj_parser else None + return result + + def get_full_config_as_json(self): + """ + Return config as json object + """ + try: + return json.dumps(self.get_full_config_as_dict()) + except Exception as exp: + Log.warning( + "Autosubmit was not able to retrieve and save the configuration into the historical database.") + return "" + def get_project_dir(self): """ Returns experiment's project directory @@ -226,6 +265,7 @@ class AutosubmitConfig(object): :rtype: str """ return self._platforms_parser.get_option(section, 'USER_TO', '').lower() + def get_migrate_duplicate(self, section): """ Returns the user to change to from platform config file. @@ -234,6 +274,7 @@ class AutosubmitConfig(object): :rtype: str """ return self._platforms_parser.get_option(section, 'SAME_USER', 'false').lower() + def get_current_user(self, section): """ Returns the user to be changed from platform config file. @@ -460,7 +501,7 @@ class AutosubmitConfig(object): "TOTALJOBS parameter not found or non-integer"]] if not self._conf_parser.check_is_int('config', 'SAFETYSLEEPTIME', True): self.set_safetysleeptime(10) - #self.wrong_config["Autosubmit"] += [['config', + # self.wrong_config["Autosubmit"] += [['config', # "SAFETYSLEEPTIME parameter not found or non-integer"]] if not self._conf_parser.check_is_int('config', 'RETRIALS', True): self.wrong_config["Autosubmit"] += [['config', @@ -802,11 +843,11 @@ class AutosubmitConfig(object): parameters = dict() for section in self._platforms_parser.sections(): for option in self._platforms_parser.options(section): - parameters[section+"_"+option] = self._platforms_parser.get(section, option) + parameters[section + "_" + + option] = self._platforms_parser.get(section, option) return parameters - - def load_section_parameters(self,job_list,as_conf,submitter): + def load_section_parameters(self, job_list, as_conf, submitter): """ Load parameters from job config files. @@ -830,11 +871,13 @@ class AutosubmitConfig(object): job.platform = submitter.platforms["local"] for section in job_list_by_section.keys(): - job_list_by_section[section][0].update_parameters(as_conf, job_list.parameters) + job_list_by_section[section][0].update_parameters( + as_conf, job_list.parameters) section_list = job_list_by_section[section][0].parameters.keys() for section_param in section_list: if section_param not in job_list.parameters.keys(): - parameters[section + "_" + section_param] = job_list_by_section[section][0].parameters[section_param] + parameters[section + "_" + + section_param] = job_list_by_section[section][0].parameters[section_param] return parameters def load_project_parameters(self): @@ -1277,7 +1320,7 @@ class AutosubmitConfig(object): :return: safety sleep time :rtype: int """ - return int(self._conf_parser.get_option('config', 'SAFETYSLEEPTIME',10)) + return int(self._conf_parser.get_option('config', 'SAFETYSLEEPTIME', 10)) def set_safetysleeptime(self, sleep_time): """ diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py index 81a81db07..fff46c106 100644 --- a/autosubmit/database/db_jobdata.py +++ b/autosubmit/database/db_jobdata.py @@ -795,7 +795,7 @@ class JobDataStructure(MainDataBase): new_run = ExperimentRun(0) return self._insert_experiment_run(new_run) - def process_status_changes(self, tracking_dictionary, job_list=None, chunk_unit="NA", chunk_size=0, check_run=False): + def process_status_changes(self, tracking_dictionary, job_list=None, chunk_unit="NA", chunk_size=0, check_run=False, current_config=""): try: current_run = self.get_max_id_experiment_run() if current_run: @@ -810,7 +810,7 @@ class JobDataStructure(MainDataBase): Log.debug( "Since a significant amount of jobs have changed status. Autosubmit will consider a new run of the same experiment.") self.validate_current_run( - job_list, chunk_unit, chunk_size, True) + job_list, chunk_unit, chunk_size, True, current_config=current_config) return None if job_list: if len(tracking_dictionary.items()) > 0: @@ -843,7 +843,7 @@ class JobDataStructure(MainDataBase): "Autosubmit couldn't process status changes validate_current_run {0}".format(str(exp))) return None - def validate_current_run(self, job_list, chunk_unit="NA", chunk_size=0, must_create=False, only_update=False): + def validate_current_run(self, job_list, chunk_unit="NA", chunk_size=0, must_create=False, only_update=False, current_config=""): """[summary] :param job_list ([type]): [description] @@ -874,7 +874,7 @@ class JobDataStructure(MainDataBase): if not current_run or must_create == True: new_run = ExperimentRun(0, None, 0, 0, chunk_unit, chunk_size, completed_count, - current_total, failed_count, queue_count, running_count, submit_count, suspended_count, None) + current_total, failed_count, queue_count, running_count, submit_count, suspended_count, current_config) self.current_run_id = self._insert_experiment_run(new_run) else: # print("Current run {0}".format(current_run.total)) @@ -882,7 +882,7 @@ class JobDataStructure(MainDataBase): if current_run.total != current_total and only_update == False: # print("Creating new run") new_run = ExperimentRun(0, None, 0, 0, chunk_unit, chunk_size, completed_count, - current_total, failed_count, queue_count, running_count, submit_count, suspended_count, None) + current_total, failed_count, queue_count, running_count, submit_count, suspended_count, current_config) self.current_run_id = self._insert_experiment_run(new_run) else: # print("Updating current run") -- GitLab