diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 5147980dc14df93a5532e861b9910a839d286d6f..b67d9144a84fb7f8bd9431f7facc246eaed68a7a 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1574,8 +1574,7 @@ class Autosubmit: exp_history.process_status_changes(job_list.get_job_list(), as_conf.get_chunk_size_unit(), as_conf.get_chunk_size(), current_config=as_conf.get_full_config_as_json()) ExperimentStatus(expid).set_as_running() except Exception as e: - raise AutosubmitCritical( - "Error while processing historical database.", 7067, str(e)) + raise AutosubmitCritical("Error while processing historical database.", 7067, str(e)) if allowed_members: # Set allowed members after checks have been performed. This triggers the setter and main logic of the -rm feature. job_list.run_members = allowed_members diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 64ca3fe668930e4385d3880d5ed87d79b5412f6e..92c4d24c2a6ac494957440a84bdc1ef3f8c634cb 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -698,6 +698,8 @@ class AutosubmitConfig(object): dependency = dependency.split('+')[0] elif '*' in dependency: dependency = dependency.split('*')[0] + elif '?' in dependency: + dependency = dependency.split('?')[0] if '[' in dependency: dependency = dependency[:dependency.find('[')] if dependency not in sections: @@ -1269,6 +1271,22 @@ class AutosubmitConfig(object): if string_member is not None: member_list.append(string_member) return member_list + def get_dependencies(self, section="None"): + """ + Returns dependencies list from jobs config file + + :return: experiment's members + :rtype: list + """ + try: + return self.jobs_parser.get_option(section, "DEPENDENCIES", "").split() + except: + return [] + + if section is not None: + return member_list + else: + return None def get_rerun(self): """ @@ -1750,3 +1768,4 @@ class AutosubmitConfig(object): "{}\n This file and the correctness of its content are necessary.".format(str(exp))) # parser.read(file_path) return parser + diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 780ccbe506ae9e6d57e1f0bff4828bcee35ea0c7..430fbb1eea8012ea46347fa0ca22fecaba4e8f5e 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -143,6 +143,7 @@ class Job(object): self.distance_weight = 0 self.level = 0 self.export = "none" + self.dependencies = [] def __getstate__(self): odict = self.__dict__ @@ -1018,6 +1019,8 @@ class Job(object): parameters['NUMMEMBERS'] = len(as_conf.get_member_list()) parameters['WRAPPER'] = as_conf.get_wrapper_type() + parameters['DEPENDENCIES'] = as_conf.get_dependencies(self.section) + self.dependencies = parameters['DEPENDENCIES'] if self.export != "none": variables = re.findall('%(? 0: # Found @@ -263,18 +264,24 @@ class JobList(object): splits = None sign = None - if '-' not in key and '+' not in key and '*' not in key: + if '-' not in key and '+' not in key and '*' not in key and '?' not in key: section = key else: - if '-' in key: - sign = '-' - elif '+' in key: - sign = '+' - elif '*' in key: - sign = '*' - key_split = key.split(sign) - section = key_split[0] - distance = int(key_split[1]) + if '?' in key: + sign = '?' + #key_split = key.split(sign) + section = key[:-1] + #distance = int(key_split[1]) + else: + if '-' in key: + sign = '-' + elif '+' in key: + sign = '+' + elif '*' in key: + sign = '*' + key_split = key.split(sign) + section = key_split[0] + distance = int(key_split[1]) if '[' in section: section_name = section[0:section.find("[")] @@ -331,8 +338,7 @@ class JobList(object): section, distance, dependency_running_type, sign, delay, splits, selected_chunks) else: # []select_chunks_dest,select_chunks_orig - dependency = Dependency( - section, distance, dependency_running_type, sign, delay, splits, []) + dependency = Dependency(section, distance, dependency_running_type, sign, delay, splits, []) dependencies[key] = dependency return dependencies @@ -450,6 +456,8 @@ class JobList(object): date = date_list[date_index - dependency.distance] else: skip = True + #if dependency.sign is '?': + # skip = True return skip, (chunk, member, date) @staticmethod @@ -493,6 +501,7 @@ class JobList(object): for i in xrange(num_parents): parent = parents[i] if isinstance(parents, list) else parents graph.add_edge(parent.name, job.name) + pass @staticmethod def _create_jobs(dic_jobs, parser, priority, default_job_type, jobs_data=dict()): @@ -1478,13 +1487,26 @@ class JobList(object): # if waiting jobs has all parents completed change its State to READY for job in self.get_completed(): if job.synchronize is not None: - tmp = [ - parent for parent in job.parents if parent.status == Status.COMPLETED] + tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] if len(tmp) != len(job.parents): - job.status = Status.WAITING - save = True - Log.debug( - "Resetting sync job: {0} status to: WAITING for parents completion...".format(job.name)) + tmp2 = [parent for parent in job.parents if + parent.status == Status.COMPLETED or parent.status == Status.SKIPPED or parent.status == Status.FAILED] + if len(tmp2) == len(job.parents): + for parent in job.parents: + if parent.section + '?' not in job.dependencies and parent.status != Status.COMPLETED: + job.status = Status.WAITING + save = True + Log.debug( + "Resetting sync job: {0} status to: WAITING for parents completion...".format( + job.name)) + break + else: + job.status = Status.WAITING + save = True + Log.debug( + "Resetting sync job: {0} status to: WAITING for parents completion...".format( + job.name)) + #Log.debug('Update finished') Log.debug('Updating WAITING jobs') if not fromSetStatus: @@ -1493,8 +1515,10 @@ class JobList(object): if datetime.datetime.now() >= job.delay_end: job.status = Status.READY for job in self.get_waiting(): - tmp = [parent for parent in job.parents if parent.status == - Status.COMPLETED or parent.status == Status.SKIPPED] + tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED or parent.status == Status.SKIPPED] + tmp2 = [parent for parent in job.parents if parent.status == Status.COMPLETED or parent.status == Status.SKIPPED or parent.status == Status.FAILED] + tmp3 = [parent for parent in job.parents if parent.status == Status.SKIPPED or parent.status == Status.FAILED] + if job.parents is None or len(tmp) == len(job.parents): job.status = Status.READY job.hold = False @@ -1502,25 +1526,34 @@ class JobList(object): "Setting job: {0} status to: READY (all parents completed)...".format(job.name)) if as_conf.get_remote_dependencies(): all_parents_completed.append(job.name) + if job.status != Status.READY: + if len(tmp3) != len(job.parents): + if len(tmp2) == len(job.parents): + for parent in job.parents: + if parent.section+'?' in job.dependencies: + job.status = Status.READY + job.hold = False + Log.debug( + "Setting job: {0} status to: READY (conditional jobs are completed/failed)...".format(job.name)) + break + if as_conf.get_remote_dependencies(): + all_parents_completed.append(job.name) if as_conf.get_remote_dependencies(): for job in self.get_prepared(): tmp = [ parent for parent in job.parents if parent.status == Status.COMPLETED] - if len(tmp) == len(job.parents): - job.status = Status.READY - job.packed = False - save = True - Log.debug( - "Resetting job: {0} status to: READY".format(job.name)) - if len(tmp) == len(job.parents): + tmp2 = [parent for parent in job.parents if + parent.status == Status.COMPLETED or parent.status == Status.SKIPPED or parent.status == Status.FAILED] + tmp3 = [parent for parent in job.parents if + parent.status == Status.SKIPPED or parent.status == Status.FAILED] + if len(tmp2) == len(job.parents) and len(tmp3) != len(job.parents): job.status = Status.READY job.packed = False job.hold = False save = True Log.debug( "A job in prepared status has all parent completed, job: {0} status set to: READY ...".format(job.name)) - Log.debug( - 'Updating WAITING jobs eligible for be prepared') + Log.debug('Updating WAITING jobs eligible for be prepared') for job in self.get_waiting_remote_dependencies('slurm'): if job.name not in all_parents_completed: tmp = [parent for parent in job.parents if ( diff --git a/autosubmit/monitor/monitor.py b/autosubmit/monitor/monitor.py index 7ff33b0a667bf8ab6d6261d1d6bfa1804c986703..616afaf5b756b59908f2dce8be4e21268dc0019a 100644 --- a/autosubmit/monitor/monitor.py +++ b/autosubmit/monitor/monitor.py @@ -233,12 +233,18 @@ class Monitor: node_child = self._create_node(child, groups, hide_groups) if node_child: exp.add_node(node_child) - exp.add_edge(pydotplus.Edge(node_job, node_child)) + if job.section is not None and job.section+"?" in child.dependencies: + exp.add_edge(pydotplus.Edge(node_job, node_child,style="dashed")) + else: + exp.add_edge(pydotplus.Edge(node_job, node_child)) else: skip = True elif not skip: node_child = node_child[0] - exp.add_edge(pydotplus.Edge(node_job, node_child)) + if job.section is not None and job.section + "?" in child.dependencies: + exp.add_edge(pydotplus.Edge(node_job, node_child,style="dashed")) + else: + exp.add_edge(pydotplus.Edge(node_job, node_child)) skip = True if not skip: self._add_children( diff --git a/test/unit/test_wrappers.py b/test/unit/test_wrappers.py index f3e9194ea63bf0636b7c45a4ce196b4ced6b64f3..54c71c43e28cc0828015d50b292026640fbfdbd3 100644 --- a/test/unit/test_wrappers.py +++ b/test/unit/test_wrappers.py @@ -169,6 +169,7 @@ class TestWrappers(TestCase): self.config.get_extensible_wallclock = Mock(return_value=0) self.config.get_retrials = Mock(return_value=0) self.config.get = Mock(return_value='flexible') + #self.snippet = Mock(return_value='flexible') self.job_packager = JobPackager( self.config, self._platform, self.job_list) self.job_list._ordered_jobs_by_date_member["wrapper"] = dict()