From 0fb533508010669a0a89224e118d7174fe9a41f8 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 25 Oct 2021 22:11:54 +0200 Subject: [PATCH 1/7] Added '?' sign --- autosubmit/job/job_list.py | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index b2eb7d9db..7042f0707 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -263,18 +263,24 @@ class JobList(object): splits = None sign = None - if '-' not in key and '+' not in key and '*' not in key: + if '-' not in key and '+' not in key and '*' not in key and '?' not in key: section = key else: - if '-' in key: - sign = '-' - elif '+' in key: - sign = '+' - elif '*' in key: - sign = '*' - key_split = key.split(sign) - section = key_split[0] - distance = int(key_split[1]) + if '?' in key: + sign = '?' + #key_split = key.split(sign) + section = key[:-1] + #distance = int(key_split[1]) + else: + if '-' in key: + sign = '-' + elif '+' in key: + sign = '+' + elif '*' in key: + sign = '*' + key_split = key.split(sign) + section = key_split[0] + distance = int(key_split[1]) if '[' in section: section_name = section[0:section.find("[")] @@ -331,8 +337,7 @@ class JobList(object): section, distance, dependency_running_type, sign, delay, splits, selected_chunks) else: # []select_chunks_dest,select_chunks_orig - dependency = Dependency( - section, distance, dependency_running_type, sign, delay, splits, []) + dependency = Dependency(section, distance, dependency_running_type, sign, delay, splits, []) dependencies[key] = dependency return dependencies @@ -450,6 +455,8 @@ class JobList(object): date = date_list[date_index - dependency.distance] else: skip = True + #if dependency.sign is '?': + # skip = True return skip, (chunk, member, date) @staticmethod -- GitLab From 1ac5e9619b7dd408da778bee6034e7a217683ea7 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 26 Oct 2021 17:25:00 +0200 Subject: [PATCH 2/7] Added support for conditional jobs. Found a bug with job_historical --- autosubmit/autosubmit.py | 4 ++-- autosubmit/config/config_common.py | 31 ++++++++++++++++++++++++++++++ autosubmit/job/job.py | 3 +++ autosubmit/job/job_dict.py | 3 ++- autosubmit/job/job_list.py | 28 +++++++++++++++++---------- autosubmit/monitor/monitor.py | 10 ++++++++-- 6 files changed, 64 insertions(+), 15 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 5147980dc..77f9b2ecd 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1574,8 +1574,8 @@ class Autosubmit: exp_history.process_status_changes(job_list.get_job_list(), as_conf.get_chunk_size_unit(), as_conf.get_chunk_size(), current_config=as_conf.get_full_config_as_json()) ExperimentStatus(expid).set_as_running() except Exception as e: - raise AutosubmitCritical( - "Error while processing historical database.", 7067, str(e)) + pass # @wuruchi, this is raising an error EXPERIMENT TABLE NOT FOUND + #raise AutosubmitCritical("Error while processing historical database.", 7067, str(e)) if allowed_members: # Set allowed members after checks have been performed. This triggers the setter and main logic of the -rm feature. job_list.run_members = allowed_members diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 64ca3fe66..86b015e6c 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -698,6 +698,8 @@ class AutosubmitConfig(object): dependency = dependency.split('+')[0] elif '*' in dependency: dependency = dependency.split('*')[0] + elif '?' in dependency: + dependency = dependency.split('?')[0] if '[' in dependency: dependency = dependency[:dependency.find('[')] if dependency not in sections: @@ -1269,6 +1271,34 @@ class AutosubmitConfig(object): if string_member is not None: member_list.append(string_member) return member_list + def get_dependencies(self, section="None"): + """ + Returns dependencies list from jobs config file + + :return: experiment's members + :rtype: list + """ + try: + dependencies=[] + for dependency in str(self._jobs_parser.get_option(section, 'DEPENDENCIES', '')).split(' '): + if '-' in dependency: + dependencies.append(dependency.split('-')[0]) + elif '+' in dependency: + dependencies.append(dependency.split('+')[0]) + elif '*' in dependency: + dependencies.append(dependency.split('*')[0]) + elif '?' in dependency: + dependencies.append(dependency.split('?')[0]) + if '[' in dependency: + dependencies.append(dependency[:dependency.find('[')]) + return dependencies + except: + return [] + + if section is not None: + return member_list + else: + return None def get_rerun(self): """ @@ -1750,3 +1780,4 @@ class AutosubmitConfig(object): "{}\n This file and the correctness of its content are necessary.".format(str(exp))) # parser.read(file_path) return parser + diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 780ccbe50..430fbb1ee 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -143,6 +143,7 @@ class Job(object): self.distance_weight = 0 self.level = 0 self.export = "none" + self.dependencies = [] def __getstate__(self): odict = self.__dict__ @@ -1018,6 +1019,8 @@ class Job(object): parameters['NUMMEMBERS'] = len(as_conf.get_member_list()) parameters['WRAPPER'] = as_conf.get_wrapper_type() + parameters['DEPENDENCIES'] = as_conf.get_dependencies(self.section) + self.dependencies = parameters['DEPENDENCIES'] if self.export != "none": variables = re.findall('%(? 0: # Found @@ -500,6 +501,7 @@ class JobList(object): for i in xrange(num_parents): parent = parents[i] if isinstance(parents, list) else parents graph.add_edge(parent.name, job.name) + pass @staticmethod def _create_jobs(dic_jobs, parser, priority, default_job_type, jobs_data=dict()): @@ -1500,8 +1502,9 @@ class JobList(object): if datetime.datetime.now() >= job.delay_end: job.status = Status.READY for job in self.get_waiting(): - tmp = [parent for parent in job.parents if parent.status == - Status.COMPLETED or parent.status == Status.SKIPPED] + tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED or parent.status == Status.SKIPPED] + tmp2 = [parent for parent in job.parents if parent.status == Status.COMPLETED or parent.status == Status.SKIPPED or parent.status == Status.FAILED] + if job.parents is None or len(tmp) == len(job.parents): job.status = Status.READY job.hold = False @@ -1509,16 +1512,22 @@ class JobList(object): "Setting job: {0} status to: READY (all parents completed)...".format(job.name)) if as_conf.get_remote_dependencies(): all_parents_completed.append(job.name) + if len(tmp2) == len(job.parents): + if '?' in job.dependencies: + for parent in job.parents: + if job.section+'?' in job.dependencies: + job.status = Status.READY + job.hold = False + Log.debug( + "Setting job: {0} status to: READY (conditional jobs are completed/failed)...".format(job.name)) + if as_conf.get_remote_dependencies(): + all_parents_completed.append(job.name) if as_conf.get_remote_dependencies(): for job in self.get_prepared(): tmp = [ parent for parent in job.parents if parent.status == Status.COMPLETED] - if len(tmp) == len(job.parents): - job.status = Status.READY - job.packed = False - save = True - Log.debug( - "Resetting job: {0} status to: READY".format(job.name)) + tmp2 = [parent for parent in job.parents if + parent.status == Status.COMPLETED or parent.status == Status.SKIPPED or parent.status == Status.FAILED] if len(tmp) == len(job.parents): job.status = Status.READY job.packed = False @@ -1526,8 +1535,7 @@ class JobList(object): save = True Log.debug( "A job in prepared status has all parent completed, job: {0} status set to: READY ...".format(job.name)) - Log.debug( - 'Updating WAITING jobs eligible for be prepared') + Log.debug('Updating WAITING jobs eligible for be prepared') for job in self.get_waiting_remote_dependencies('slurm'): if job.name not in all_parents_completed: tmp = [parent for parent in job.parents if ( diff --git a/autosubmit/monitor/monitor.py b/autosubmit/monitor/monitor.py index 7ff33b0a6..efc2e009a 100644 --- a/autosubmit/monitor/monitor.py +++ b/autosubmit/monitor/monitor.py @@ -233,12 +233,18 @@ class Monitor: node_child = self._create_node(child, groups, hide_groups) if node_child: exp.add_node(node_child) - exp.add_edge(pydotplus.Edge(node_job, node_child)) + if job.section+"?" in child.dependencies: + exp.add_edge(pydotplus.Edge(node_job, node_child,style="dashed")) + else: + exp.add_edge(pydotplus.Edge(node_job, node_child)) else: skip = True elif not skip: node_child = node_child[0] - exp.add_edge(pydotplus.Edge(node_job, node_child)) + if job.section + "?" in child.dependencies: + exp.add_edge(pydotplus.Edge(node_job, node_child,style="dashed")) + else: + exp.add_edge(pydotplus.Edge(node_job, node_child)) skip = True if not skip: self._add_children( -- GitLab From 0487616a278d88a20fd5235f0bef71e9f45220b0 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 26 Oct 2021 17:43:03 +0200 Subject: [PATCH 3/7] Well the bug wasn't really a bug, my fault redoing it --- autosubmit/autosubmit.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 77f9b2ecd..b67d9144a 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1574,8 +1574,7 @@ class Autosubmit: exp_history.process_status_changes(job_list.get_job_list(), as_conf.get_chunk_size_unit(), as_conf.get_chunk_size(), current_config=as_conf.get_full_config_as_json()) ExperimentStatus(expid).set_as_running() except Exception as e: - pass # @wuruchi, this is raising an error EXPERIMENT TABLE NOT FOUND - #raise AutosubmitCritical("Error while processing historical database.", 7067, str(e)) + raise AutosubmitCritical("Error while processing historical database.", 7067, str(e)) if allowed_members: # Set allowed members after checks have been performed. This triggers the setter and main logic of the -rm feature. job_list.run_members = allowed_members -- GitLab From 6efcb6ec063f2592da33bcbe6e3b1053444ed6f6 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 27 Oct 2021 21:48:16 +0200 Subject: [PATCH 4/7] Completed conditional dependencies --- autosubmit/config/config_common.py | 14 +------- autosubmit/job/job_list.py | 52 ++++++++++++++++++++---------- 2 files changed, 36 insertions(+), 30 deletions(-) diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 86b015e6c..92c4d24c2 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -1279,19 +1279,7 @@ class AutosubmitConfig(object): :rtype: list """ try: - dependencies=[] - for dependency in str(self._jobs_parser.get_option(section, 'DEPENDENCIES', '')).split(' '): - if '-' in dependency: - dependencies.append(dependency.split('-')[0]) - elif '+' in dependency: - dependencies.append(dependency.split('+')[0]) - elif '*' in dependency: - dependencies.append(dependency.split('*')[0]) - elif '?' in dependency: - dependencies.append(dependency.split('?')[0]) - if '[' in dependency: - dependencies.append(dependency[:dependency.find('[')]) - return dependencies + return self.jobs_parser.get_option(section, "DEPENDENCIES", "").split() except: return [] diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 429cd6654..9c21d4f64 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -1487,13 +1487,26 @@ class JobList(object): # if waiting jobs has all parents completed change its State to READY for job in self.get_completed(): if job.synchronize is not None: - tmp = [ - parent for parent in job.parents if parent.status == Status.COMPLETED] + tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] if len(tmp) != len(job.parents): - job.status = Status.WAITING - save = True - Log.debug( - "Resetting sync job: {0} status to: WAITING for parents completion...".format(job.name)) + tmp2 = [parent for parent in job.parents if + parent.status == Status.COMPLETED or parent.status == Status.SKIPPED or parent.status == Status.FAILED] + if len(tmp2) == len(job.parents): + for parent in job.parents: + if parent.section + '?' not in job.dependencies and parent.status != Status.COMPLETED: + job.status = Status.WAITING + save = True + Log.debug( + "Resetting sync job: {0} status to: WAITING for parents completion...".format( + job.name)) + break + else: + job.status = Status.WAITING + save = True + Log.debug( + "Resetting sync job: {0} status to: WAITING for parents completion...".format( + job.name)) + #Log.debug('Update finished') Log.debug('Updating WAITING jobs') if not fromSetStatus: @@ -1504,6 +1517,7 @@ class JobList(object): for job in self.get_waiting(): tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED or parent.status == Status.SKIPPED] tmp2 = [parent for parent in job.parents if parent.status == Status.COMPLETED or parent.status == Status.SKIPPED or parent.status == Status.FAILED] + tmp3 = [parent for parent in job.parents if parent.status == Status.SKIPPED or parent.status == Status.FAILED] if job.parents is None or len(tmp) == len(job.parents): job.status = Status.READY @@ -1512,23 +1526,27 @@ class JobList(object): "Setting job: {0} status to: READY (all parents completed)...".format(job.name)) if as_conf.get_remote_dependencies(): all_parents_completed.append(job.name) - if len(tmp2) == len(job.parents): - if '?' in job.dependencies: - for parent in job.parents: - if job.section+'?' in job.dependencies: - job.status = Status.READY - job.hold = False - Log.debug( - "Setting job: {0} status to: READY (conditional jobs are completed/failed)...".format(job.name)) - if as_conf.get_remote_dependencies(): - all_parents_completed.append(job.name) + if job.status != Status.READY: + if len(tmp3) != len(job.parents): + if len(tmp2) == len(job.parents): + for parent in job.parents: + if parent.section+'?' in job.dependencies: + job.status = Status.READY + job.hold = False + Log.debug( + "Setting job: {0} status to: READY (conditional jobs are completed/failed)...".format(job.name)) + break + if as_conf.get_remote_dependencies(): + all_parents_completed.append(job.name) if as_conf.get_remote_dependencies(): for job in self.get_prepared(): tmp = [ parent for parent in job.parents if parent.status == Status.COMPLETED] tmp2 = [parent for parent in job.parents if parent.status == Status.COMPLETED or parent.status == Status.SKIPPED or parent.status == Status.FAILED] - if len(tmp) == len(job.parents): + tmp3 = [parent for parent in job.parents if + parent.status == Status.SKIPPED or parent.status == Status.FAILED] + if len(tmp2) == len(job.parents) and len(tmp3) != len(job.parents): job.status = Status.READY job.packed = False job.hold = False -- GitLab From d6d376106ef1908ccde146b2572487b1b9767039 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 28 Oct 2021 10:06:30 +0200 Subject: [PATCH 5/7] Pipeline --- autosubmit/monitor/monitor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autosubmit/monitor/monitor.py b/autosubmit/monitor/monitor.py index efc2e009a..f5d6fad2c 100644 --- a/autosubmit/monitor/monitor.py +++ b/autosubmit/monitor/monitor.py @@ -233,7 +233,7 @@ class Monitor: node_child = self._create_node(child, groups, hide_groups) if node_child: exp.add_node(node_child) - if job.section+"?" in child.dependencies: + if job.section is not None and job.section+"?" in child.dependencies: exp.add_edge(pydotplus.Edge(node_job, node_child,style="dashed")) else: exp.add_edge(pydotplus.Edge(node_job, node_child)) -- GitLab From f67f2fde710cb6742476789c6d0dcb25a3b87117 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 28 Oct 2021 10:17:06 +0200 Subject: [PATCH 6/7] Pipeline --- autosubmit/monitor/monitor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autosubmit/monitor/monitor.py b/autosubmit/monitor/monitor.py index f5d6fad2c..616afaf5b 100644 --- a/autosubmit/monitor/monitor.py +++ b/autosubmit/monitor/monitor.py @@ -241,7 +241,7 @@ class Monitor: skip = True elif not skip: node_child = node_child[0] - if job.section + "?" in child.dependencies: + if job.section is not None and job.section + "?" in child.dependencies: exp.add_edge(pydotplus.Edge(node_job, node_child,style="dashed")) else: exp.add_edge(pydotplus.Edge(node_job, node_child)) -- GitLab From ff5c0348825638dcc17e00d27995cbf2021ac92c Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 28 Oct 2021 10:26:23 +0200 Subject: [PATCH 7/7] Pipeline --- test/unit/test_wrappers.py | 1 + 1 file changed, 1 insertion(+) diff --git a/test/unit/test_wrappers.py b/test/unit/test_wrappers.py index f3e9194ea..54c71c43e 100644 --- a/test/unit/test_wrappers.py +++ b/test/unit/test_wrappers.py @@ -169,6 +169,7 @@ class TestWrappers(TestCase): self.config.get_extensible_wallclock = Mock(return_value=0) self.config.get_retrials = Mock(return_value=0) self.config.get = Mock(return_value='flexible') + #self.snippet = Mock(return_value='flexible') self.job_packager = JobPackager( self.config, self._platform, self.job_list) self.job_list._ordered_jobs_by_date_member["wrapper"] = dict() -- GitLab