From 6f30c1c927583ba6d3aad45334d59e0a321f542b Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 15 Sep 2022 11:30:11 +0200 Subject: [PATCH 1/7] Fixed splits #856 --- autosubmit/job/job_list.py | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 7b896a506..4429caa36 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -538,6 +538,15 @@ class JobList(object): other_parents = dic_jobs.get_jobs(dependency.section, None, None, None) parents_jobs = dic_jobs.get_jobs(dependency.section, date, member, chunk) + # Convert 2d list to 1d list if necessary + #aux_list = [] + #if isinstance(parents_jobs, list): + # for parent_list in parents_jobs: + # if isinstance(parent_list, list): + # aux_list.extend(parent_list) + # else: + # aux_list.append(parent_list) + #parents_jobs = aux_list natural_jobs = dic_jobs.get_jobs(dependency.section, date, member, None) all_parents = other_parents + parents_jobs @@ -547,16 +556,6 @@ class JobList(object): if len(filters_to_apply) > 0: pass for parent in all_parents: - if parent.name == job.name: - continue - # Check if it is a natural relation based in autosubmit terms ( same date,member,chunk ). - if parent in natural_jobs and (job.chunk is None or parent.chunk is None or parent.chunk <= job.chunk): - natural_relationship = True - else: - natural_relationship = False - # Check if the current parent is a valid parent based on the dependencies set on expdef.conf - if not JobList._valid_parent(parent, member_list, parsed_date_list, chunk_list, natural_relationship,filters_to_apply): - continue # Generic for all dependencies if dependency.delay == -1 or chunk > dependency.delay: if isinstance(parent, list): @@ -567,6 +566,19 @@ class JobList(object): else: if dependency.splits is not None and len(str(dependency.splits)) > 0: parent = [_parent for _parent in parent if _parent.split in dependency.splits] + # If splits is not None, the job is a list of jobs + if parent.name == job.name: + continue + # Check if it is a natural relation based in autosubmit terms ( same date,member,chunk ). + if parent in natural_jobs and ((job.chunk is None or parent.chunk is None or parent.chunk <= job.chunk ) and parent.split <= job.split ) : + natural_relationship = True + else: + natural_relationship = False + + # Check if the current parent is a valid parent based on the dependencies set on expdef.conf + if not JobList._valid_parent(parent, member_list, parsed_date_list, chunk_list, natural_relationship,filters_to_apply): + continue + job.add_parent(parent) JobList._add_edge(graph, job, parent) -- GitLab From 78c1b8ef64669c6245e9c2c9d19ae188b059a662 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 15 Sep 2022 11:33:39 +0200 Subject: [PATCH 2/7] Fixed splits #856 --- autosubmit/job/job_list.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 4429caa36..18ee6fa86 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -570,7 +570,7 @@ class JobList(object): if parent.name == job.name: continue # Check if it is a natural relation based in autosubmit terms ( same date,member,chunk ). - if parent in natural_jobs and ((job.chunk is None or parent.chunk is None or parent.chunk <= job.chunk ) and parent.split <= job.split ) : + if parent in natural_jobs and ((job.chunk is None or parent.chunk is None or parent.chunk <= job.chunk ) and (parent.split is None or job.split is None or parent.split <= job.split) ) : natural_relationship = True else: natural_relationship = False -- GitLab From 347d8778da6fd84d79aacf3c43ec6b38ec4c9287 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 15 Sep 2022 13:20:35 +0200 Subject: [PATCH 3/7] mandatory files are now not mandatory, instead the checkers is made in based as whole data #856 --- autosubmit/config/config_common.py | 73 ++++++++++++++++++++---------- autosubmit/job/job_dict.py | 2 +- autosubmit/job/job_list.py | 1 + 3 files changed, 50 insertions(+), 26 deletions(-) diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index fd1707899..d540fbcd6 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -788,7 +788,7 @@ class AutosubmitConfig(object): :return: True if everything is correct, False if it founds any error :rtype: bool """ - parser_data = self._conf_parser.data + parser_data = self.experiment_data if parser_data["CONFIG"].get('AUTOSUBMIT_VERSION',-1.1) == -1.1: self.wrong_config["Autosubmit"] += [['config', "AUTOSUBMIT_VERSION parameter not found"]] @@ -843,7 +843,7 @@ class AutosubmitConfig(object): """ Checks experiment's queues configuration file. """ - parser_data = self._platforms_parser.data["PLATFORMS"] + parser_data = self.experiment_data["PLATFORMS"] main_platform_found = False if self.hpcarch == "LOCAL": main_platform_found = True @@ -884,10 +884,9 @@ class AutosubmitConfig(object): :return: True if everything is correct, False if it founds any error :rtype: bool """ - parser = self._jobs_parser - platforms = self._platforms_parser.data - for section in parser.data["JOBS"]: - section_data=parser.data["JOBS"][section] + parser = self.experiment_data + for section in parser["JOBS"]: + section_data=parser["JOBS"][section] section_file_path = section_data.get('FILE',"") if not section_file_path: self.wrong_config["Jobs"] += [[section, @@ -921,7 +920,7 @@ class AutosubmitConfig(object): dependency = dependency.split('?')[0] if '[' in dependency: dependency = dependency[:dependency.find('[')] - if dependency.upper() not in parser.data["JOBS"].keys(): + if dependency.upper() not in parser["JOBS"].keys(): self.warn_config["Jobs"].append( [section, "Dependency parameter is invalid, job {0} is not configured".format(dependency)]) rerun_dependencies = section_data.get('RERUN_DEPENDENCIES',"").upper() @@ -931,7 +930,7 @@ class AutosubmitConfig(object): dependency = dependency.split('-')[0] if '[' in dependency: dependency = dependency[:dependency.find('[')] - if dependency not in parser.data["JOBS"].keys(): + if dependency not in parser["JOBS"].keys(): self.warn_config["Jobs"] += [ [section, "RERUN_DEPENDENCIES parameter is invalid, job {0} is not configured".format(dependency)]] running_type = section_data.get('RUNNING', "once").lower() @@ -951,7 +950,7 @@ class AutosubmitConfig(object): :return: True if everything is correct, False if it founds any error :rtype: bool """ - parser = self._exp_parser.data + parser = self.experiment_data if not parser['DEFAULT'].get('EXPID',""): self.wrong_config["Expdef"] += [['DEFAULT',"Mandatory DEFAULT.EXPID parameter is invalid"]] @@ -1068,15 +1067,19 @@ class AutosubmitConfig(object): any_file_changed = False # check if original_files has been edited for config_file in range(0,len(self._original_parser_files)): - if self._original_parser_files[config_file].name != self._proj_parser_file.name: - modified, self._original_parser_files_modtime[config_file] = self.file_modified( - self._original_parser_files[config_file], self._original_parser_files_modtime[config_file]) - else: - if self._proj_parser_file.exists(): + try: + if self._original_parser_files[config_file].name != self._proj_parser_file.name: modified, self._original_parser_files_modtime[config_file] = self.file_modified( self._original_parser_files[config_file], self._original_parser_files_modtime[config_file]) - if modified: - any_file_changed = True + else: + if self._proj_parser_file.exists(): + modified, self._original_parser_files_modtime[config_file] = self.file_modified( + self._original_parser_files[config_file], self._original_parser_files_modtime[config_file]) + if modified: + any_file_changed = True + except: + #Doesn't exists + pass # check if custom_files has been edited for config_file in range(0,len(self._custom_parser_files)): modified,self._custom_parser_files_modtime[config_file] = self.file_modified(self._custom_parser_files[config_file],self._custom_parser_files_modtime[config_file]) @@ -1089,16 +1092,29 @@ class AutosubmitConfig(object): self.parser_factory, self._conf_parser_file) self._platforms_parser = AutosubmitConfig.get_parser( self.parser_factory, self._platforms_parser_file) + self._jobs_parser = AutosubmitConfig.get_parser( self.parser_factory, self._jobs_parser_file) self._exp_parser = AutosubmitConfig.get_parser( self.parser_factory, self._exp_parser_file) if first_load: self._custom_parser = [] - self._exp_parser.data = self.deep_normalize(self._exp_parser.data) - self._conf_parser.data = self.deep_normalize(self._conf_parser.data) - self._jobs_parser.data = self.deep_normalize(self._jobs_parser.data) - self._platforms_parser.data = self.deep_normalize(self._platforms_parser.data) + try: + self._exp_parser.data = self.deep_normalize(self._exp_parser.data) + except: + self._exp_parser.data = {} + try: + self._conf_parser.data = self.deep_normalize(self._conf_parser.data) + except: + self._conf_parser.data = {} + try: + self._jobs_parser.data = self.deep_normalize(self._jobs_parser.data) + except: + self._jobs_parser.data = {} + try: + self._platforms_parser.data = self.deep_normalize(self._platforms_parser.data) + except: + self._platforms_parser.data = {} default_section = self._exp_parser.data.get("DEFAULT",None) default_path = Path(self.basic_config.LOCAL_ROOT_DIR) / self.expid custom_folder_path = default_path / "conf" @@ -1111,9 +1127,11 @@ class AutosubmitConfig(object): self._custom_parser_files_modtime = [] for f in custom_folder_path.rglob("*.yml"): - if not f == self._proj_parser_file and not f.samefile(self._jobs_parser_file) and not f.samefile( - self._platforms_parser_file) and not f.samefile(self._exp_parser_file) and not f.samefile( - self._conf_parser_file): + if not (self._proj_parser_file.exists() and f.samefile(self._proj_parser_file)) \ + and not (self._jobs_parser_file.exists() and f.samefile(self._jobs_parser_file)) \ + and not (self._platforms_parser_file.exists() and f.samefile(self._platforms_parser_file)) \ + and not (self._exp_parser_file.exists() and f.samefile(self._exp_parser_file)) \ + and not (self._conf_parser_file.exists() and f.samefile(self._conf_parser_file)): self._custom_parser_files.append(f) self._custom_parser_files_modtime.append(None) @@ -2085,7 +2103,8 @@ class AutosubmitConfig(object): # For testing purposes if file_path == Path('/dummy/local/root/dir/a000/conf/') or file_path == Path('dummy/file/path'): parser.data = parser.load(file_path) - + if parser.data is None: + parser.data = {} return parser # proj file might not be present @@ -2093,6 +2112,8 @@ class AutosubmitConfig(object): if file_path.match("*proj*"): if file_path.exists(): parser.data = parser.load(file_path) + if parser.data is None: + parser.data = {} #else: #Log.warning( "{0} was not found. Some variables might be missing. If your experiment does not need a proj file, you can ignore this message.", file_path) else: @@ -2100,8 +2121,10 @@ class AutosubmitConfig(object): try: with open(file_path) as f: parser.data = parser.load(f) + if parser.data is None: + parser.data = {} except IOError as exp: - raise + return parser except Exception as exp: raise Exception( "{}\n This file and the correctness of its content are necessary.".format(str(exp))) diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index 69d5be217..24d73e5ba 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -367,7 +367,7 @@ class DicJobs: job_type = str(parameters[section].get( "TYPE", default_job_type)).lower() job.dependencies = parameters[section].get( "DEPENDENCIES", "") - if job.dependencies and type(job.dependencies) is not list: + if job.dependencies and type(job.dependencies) is not dict: job.dependencies = str(job.dependencies).split() if job_type == 'bash': job.type = Type.BASH diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 18ee6fa86..63f0976d9 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -580,6 +580,7 @@ class JobList(object): continue job.add_parent(parent) + #job.add_special_variables(parent.name,relationship) JobList._add_edge(graph, job, parent) JobList.handle_frequency_interval_dependencies(chunk, chunk_list, date, date_list, dic_jobs, job, member, -- GitLab From 07c1abb08b1e4103323000ed5c4cda6540b8677e Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 15 Sep 2022 16:21:31 +0200 Subject: [PATCH 4/7] Some changes to natural that didnt work as intended for dates_from #856 --- autosubmit/job/job_list.py | 98 ++++++++++++++++++++++++++------------ 1 file changed, 67 insertions(+), 31 deletions(-) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 63f0976d9..d94e01044 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -362,16 +362,17 @@ class JobList(object): :return: boolean """ to_filter = [] - if str(parent_value).lower() == "none": + # strip special chars if any + filter_value = filter_value.strip("?") + if str(parent_value).lower().find("none") != -1: return True - if filter_value == "all": + if filter_value.lower().find("all") != -1: return True - elif filter_value == "natural": + elif filter_value.lower().find("natural") != -1: if parent_value is None or parent_value in associative_list: return True - elif filter_value == "none": + elif filter_value.lower().find("none") != -1: return False - elif filter_value.find(",") != -1: aux_filter = filter_value.split(",") if filter_type != "chunks": @@ -417,12 +418,13 @@ class JobList(object): if filter_type.upper() == level_to_check.upper(): for filter_range in filter_data.keys(): if str(value_to_check) is None or str(filter_range).upper().find(str(value_to_check).upper()) != -1: - current_filter.update(filter_data[filter_range]) + if filter_data[filter_range] is not None: + current_filter.update(filter_data[filter_range]) return current_filter @staticmethod def _filter_current_job(current_job,relationships): ''' - Check if the current_job is included in the filter_value + Check if the current_job is included in the filter_value ( from) :param current_job: :param dependency: :return: @@ -435,7 +437,7 @@ class JobList(object): # Check Date then Member or Chunk then Chunk filters_to_apply = {} if relationships is not None: - filters_to_apply = JobList._check_relationship(relationships,"DATES_FROM",current_job.date) + filters_to_apply = JobList._check_relationship(relationships,"DATES_FROM",date2str(current_job.date)) if len(filters_to_apply) > 0: for filter_number in range(0,len(filters_to_apply)): @@ -458,10 +460,39 @@ class JobList(object): filters_to_apply = JobList._check_relationship(relationships,"CHUNKS_FROM",current_job.chunk) # Generic filter if len(filters_to_apply) == 0: + relationships.pop("CHUNKS_FROM",None) + relationships.pop("MEMBERS_FROM",None) + relationships.pop("DATES_FROM",None) filters_to_apply = relationships return filters_to_apply + + @staticmethod + def _check_special_dependencies_signs(job_name,section,dates,members,chunks): + """ + Check if the job_name has special characters that need to be pre-processed. + :param job_name: + :param section: + :param dates: + :param members: + :param chunks: + :return: dictionary with the special characters and the values to replace. + """ + special_chars = {} + # Check ? character and add it to a dictionary grouped by special character + special_chars["?"] = [] + if section[-1] == "?": + special_chars["?"] = job_name + elif dates[-1] == "?": + special_chars["?"] = job_name + elif members[-1] == "?": + special_chars["?"] = job_name + elif chunks[-1] == "?": + special_chars["?"] = job_name + return special_chars + + @staticmethod def _valid_parent(parent,member_list,date_list,chunk_list,is_a_natural_relation,filters_to_apply): ''' @@ -476,20 +507,25 @@ class JobList(object): ''' #check if current_parent is listed on dependency.relationships associative_list = {} - if is_a_natural_relation: + optional = False + dates_to = str(filters_to_apply.get("DATES_TO", "natural")).lower() + members_to = str(filters_to_apply.get("MEMBERS_TO", "natural")).lower() + chunks_to = str(filters_to_apply.get("CHUNKS_TO", "natural")).lower() + if not is_a_natural_relation: + if dates_to == "natural": + dates_to = "none" + elif members_to == "natural": + members_to = "none" + elif chunks_to == "natural": + chunks_to = "none" relationship_type = "natural" else: - relationship_type = "none" + relationship_type = "natural" associative_list["dates"] = date_list associative_list["members"] = member_list associative_list["chunks"] = chunk_list - # Default filters if not present already - dates_to = str(filters_to_apply.get("DATES_TO", relationship_type)).lower() - members_to = str(filters_to_apply.get("MEMBERS_TO", relationship_type)).lower() - chunks_to = str(filters_to_apply.get("CHUNKS_TO", relationship_type)).lower() - if dates_to == "natural": associative_list["dates"] = [date2str(parent.date)] if parent.date is not None else date_list @@ -504,8 +540,10 @@ class JobList(object): valid_chunks = JobList._apply_filter(parent.chunk, chunks_to, associative_list["chunks"], "chunks") if valid_dates and valid_members and valid_chunks: - return True - return False + if dates_to.find("?") != -1 or members_to.find("?") != -1 or chunks_to.find("?") != -1: + optional = True + return True,optional + return False,optional @staticmethod def _manage_job_dependencies(dic_jobs, job, date_list, member_list, chunk_list, dependencies_keys, dependencies, graph): @@ -538,23 +576,17 @@ class JobList(object): other_parents = dic_jobs.get_jobs(dependency.section, None, None, None) parents_jobs = dic_jobs.get_jobs(dependency.section, date, member, chunk) - # Convert 2d list to 1d list if necessary - #aux_list = [] - #if isinstance(parents_jobs, list): - # for parent_list in parents_jobs: - # if isinstance(parent_list, list): - # aux_list.extend(parent_list) - # else: - # aux_list.append(parent_list) - #parents_jobs = aux_list - natural_jobs = dic_jobs.get_jobs(dependency.section, date, member, None) + if dependency.sign in ["+", "-"]: + natural_jobs = dic_jobs.get_jobs(dependency.section, date, member,None) + else: + natural_jobs = dic_jobs.get_jobs(dependency.section, date, member,chunk) + all_parents = other_parents + parents_jobs # Get dates_to, members_to, chunks_to of the deepest level of the relationship. filters_to_apply = JobList._filter_current_job(job,dependency.relationships) - #debug if len(filters_to_apply) > 0: - pass + print("Debug: job: {1} | filters_to_apply: {0}".format(str(filters_to_apply),job.name)) for parent in all_parents: # Generic for all dependencies if dependency.delay == -1 or chunk > dependency.delay: @@ -576,9 +608,13 @@ class JobList(object): natural_relationship = False # Check if the current parent is a valid parent based on the dependencies set on expdef.conf - if not JobList._valid_parent(parent, member_list, parsed_date_list, chunk_list, natural_relationship,filters_to_apply): + valid,optional_to = JobList._valid_parent(parent, member_list, parsed_date_list, chunk_list, natural_relationship,filters_to_apply) + if not valid: continue - + # If the parent is valid, add it to the graph + optional_from = False + if optional_to or optional_from: + optional = True job.add_parent(parent) #job.add_special_variables(parent.name,relationship) JobList._add_edge(graph, job, parent) -- GitLab From e73b32660565642718c776589a79e94185ca4249 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 16 Sep 2022 10:57:25 +0200 Subject: [PATCH 5/7] log fix --- autosubmit/autosubmit.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 167aaf2f6..34b4bb614 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1624,7 +1624,7 @@ class Autosubmit: job_list.check_scripts(as_conf) except Exception as e: raise AutosubmitCritical( - "Error while checking job templates", 7015, str(e)) + "Error while checking job templates", 7014, str(e)) Log.debug("Loading job packages") try: packages_persistence = JobPackagePersistence(os.path.join( @@ -2332,7 +2332,7 @@ class Autosubmit: if not inspect: job_list.save() if error_message != "": - raise AutosubmitCritical("Submission Failed due wrong configuration:{0}".format(error_message),7015) + raise AutosubmitCritical("Submission Failed due wrong configuration:{0}".format(error_message),7014) return save @staticmethod -- GitLab From ff594d3cc7a7ae2e8601f51acb213181a40fc98b Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 16 Sep 2022 15:30:51 +0200 Subject: [PATCH 6/7] #856 conditional ? added and wrapper fixed . To check wrapper with more than one job --- autosubmit/autosubmit.py | 4 +- autosubmit/job/job.py | 17 +++++++- autosubmit/job/job_list.py | 78 ++++++++++++---------------------- autosubmit/job/job_packager.py | 2 +- autosubmit/monitor/monitor.py | 4 +- bin/autosubmit | 9 ++-- 6 files changed, 52 insertions(+), 62 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 34b4bb614..a4551e6e8 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python # Copyright 2015-2020 Earth Sciences Department, BSC-CNS @@ -3824,7 +3824,7 @@ class Autosubmit: except (AutosubmitError,AutosubmitCritical): raise except BaseException as e: - raise AutosubmitCritical("Download failed",7064,str(e)) + raise AutosubmitCritical(" Download failed",7064,str(e)) return True @staticmethod diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index cb31a49ee..c047196b7 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -150,6 +150,8 @@ class Job(object): self.dependencies = [] self.running = "once" self.start_time = None + self.edge_info = dict() + def __getstate__(self): odict = self.__dict__ @@ -381,6 +383,20 @@ class Job(object): """ self.children.add(new_child) + def add_edge_info(self,parent_name, special_variables): + """ + Adds edge information to the job + + :param parent_name: parent name + :type parent_name: str + :param special_variables: special variables + :type special_variables: dict + """ + if parent_name not in self.edge_info: + self.edge_info[parent_name] = special_variables + else: + self.edge_info[parent_name].update(special_variables) + pass def delete_parent(self, parent): """ Remove a parent from the job @@ -1641,7 +1657,6 @@ class WrapperJob(Job): self.job_list = job_list # divide jobs in dictionary by state? self.wallclock = total_wallclock - self.num_processors = num_processors self.running_jobs_start = OrderedDict() self._platform = platform diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index d94e01044..3bf19215a 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -23,7 +23,7 @@ import os import pickle import traceback import math - +import copy from time import localtime, strftime, mktime from shutil import move from autosubmit.job.job import Job @@ -412,15 +412,18 @@ class JobList(object): :param value_to_check: can be a date, member or chunk. :return: """ + optional = False current_filter = {} for filter_type, filter_data in relationships.items(): if isinstance(filter_data, collections.abc.Mapping): - if filter_type.upper() == level_to_check.upper(): + if filter_type.upper().find(level_to_check.upper()) != -1: for filter_range in filter_data.keys(): if str(value_to_check) is None or str(filter_range).upper().find(str(value_to_check).upper()) != -1: if filter_data[filter_range] is not None: + if filter_type.find("?") != -1 or filter_range.find("?") != -1: + optional = True current_filter.update(filter_data[filter_range]) - return current_filter + return current_filter,optional @staticmethod def _filter_current_job(current_job,relationships): ''' @@ -435,29 +438,29 @@ class JobList(object): # Third level can only be chunk. # If the filter is generic, it will be applied to all section jobs. # Check Date then Member or Chunk then Chunk + optional = False filters_to_apply = {} - if relationships is not None: - filters_to_apply = JobList._check_relationship(relationships,"DATES_FROM",date2str(current_job.date)) + if relationships is not None and len(relationships) > 0: + filters_to_apply,optional = JobList._check_relationship(relationships,"DATES_FROM",date2str(current_job.date)) if len(filters_to_apply) > 0: for filter_number in range(0,len(filters_to_apply)): - - filters_to_apply_m = JobList._check_relationship(filters_to_apply[filter_number],"MEMBERS_FROM",current_job.member) + filters_to_apply_m,optional = JobList._check_relationship(filters_to_apply[filter_number],"MEMBERS_FROM",current_job.member) if len(filters_to_apply_m) > 0: - filters_to_apply = filters_to_apply_m + filters_to_apply,optional = filters_to_apply_m else: - filters_to_apply_c = JobList._check_relationship(filters_to_apply[filter_number],"CHUNKS_FROM",current_job.chunk) + filters_to_apply_c,optional = JobList._check_relationship(filters_to_apply[filter_number],"CHUNKS_FROM",current_job.chunk) if len(filters_to_apply_c) > 0: filters_to_apply = filters_to_apply_c # Check Member then Chunk if len(filters_to_apply) == 0: - filters_to_apply = JobList._check_relationship(relationships,"MEMBERS_FROM",current_job.member) + filters_to_apply,optional = JobList._check_relationship(relationships,"MEMBERS_FROM",current_job.member) if len(filters_to_apply) > 0: - filters_to_apply_c = JobList._check_relationship(filters_to_apply,"CHUNKS_FROM",current_job.chunk) + filters_to_apply_c,optional = JobList._check_relationship(filters_to_apply,"CHUNKS_FROM",current_job.chunk) if len(filters_to_apply_c) > 0: filters_to_apply = filters_to_apply_c #Check Chunk if len(filters_to_apply) == 0: - filters_to_apply = JobList._check_relationship(relationships,"CHUNKS_FROM",current_job.chunk) + filters_to_apply,optional = JobList._check_relationship(relationships,"CHUNKS_FROM",current_job.chunk) # Generic filter if len(filters_to_apply) == 0: relationships.pop("CHUNKS_FROM",None) @@ -466,31 +469,8 @@ class JobList(object): filters_to_apply = relationships - return filters_to_apply + return filters_to_apply,optional - @staticmethod - def _check_special_dependencies_signs(job_name,section,dates,members,chunks): - """ - Check if the job_name has special characters that need to be pre-processed. - :param job_name: - :param section: - :param dates: - :param members: - :param chunks: - :return: dictionary with the special characters and the values to replace. - """ - special_chars = {} - # Check ? character and add it to a dictionary grouped by special character - special_chars["?"] = [] - if section[-1] == "?": - special_chars["?"] = job_name - elif dates[-1] == "?": - special_chars["?"] = job_name - elif members[-1] == "?": - special_chars["?"] = job_name - elif chunks[-1] == "?": - special_chars["?"] = job_name - return special_chars @staticmethod @@ -518,9 +498,7 @@ class JobList(object): members_to = "none" elif chunks_to == "natural": chunks_to = "none" - relationship_type = "natural" - else: - relationship_type = "natural" + associative_list["dates"] = date_list associative_list["members"] = member_list @@ -580,11 +558,13 @@ class JobList(object): natural_jobs = dic_jobs.get_jobs(dependency.section, date, member,None) else: natural_jobs = dic_jobs.get_jobs(dependency.section, date, member,chunk) - - + if dependency.sign in ['?']: + optional_section = True + else: + optional_section = False all_parents = other_parents + parents_jobs # Get dates_to, members_to, chunks_to of the deepest level of the relationship. - filters_to_apply = JobList._filter_current_job(job,dependency.relationships) + filters_to_apply,optional_from = JobList._filter_current_job(job,copy.deepcopy(dependency.relationships)) if len(filters_to_apply) > 0: print("Debug: job: {1} | filters_to_apply: {0}".format(str(filters_to_apply),job.name)) for parent in all_parents: @@ -612,13 +592,11 @@ class JobList(object): if not valid: continue # If the parent is valid, add it to the graph - optional_from = False - if optional_to or optional_from: - optional = True job.add_parent(parent) - #job.add_special_variables(parent.name,relationship) JobList._add_edge(graph, job, parent) - + # Could be more variables in the future + if optional_to or optional_from or optional_section: + job.add_edge_info(parent.name,special_variables={"optional":True}) JobList.handle_frequency_interval_dependencies(chunk, chunk_list, date, date_list, dic_jobs, job, member, member_list, dependency.section, graph, other_parents) @@ -1772,7 +1750,7 @@ class JobList(object): parent.status == Status.COMPLETED or parent.status == Status.SKIPPED or parent.status == Status.FAILED] if len(tmp2) == len(job.parents): for parent in job.parents: - if parent.section + '?' not in job.dependencies and parent.status != Status.COMPLETED: + if () and parent.status != Status.COMPLETED: job.status = Status.WAITING save = True Log.debug( @@ -1811,7 +1789,7 @@ class JobList(object): strong_dependencies_failure = False weak_dependencies_failure = False for parent in failed_ones: - if parent.section+'?' in job.dependencies: + if parent.name in job.edge_info and job.edge_info[job.name].get('optional', False): weak_dependencies_failure = True elif parent.section in job.dependencies: if parent.status not in [Status.COMPLETED,Status.SKIPPED]: @@ -1828,7 +1806,7 @@ class JobList(object): else: if len(tmp3) == 1 and len(job.parents) == 1: for parent in job.parents: - if parent.section + '?' in job.dependencies: + if parent.name in job.edge_info and job.edge_info[job.name].get('optional', False): job.status = Status.READY job.hold = False Log.debug( diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index cc8111a1b..cb720c948 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -232,7 +232,7 @@ class JobPackager(object): if wrapper_limits["max_h"] == -1: wrapper_limits["max_h"] = wrapper_limits["max"] if '&' not in section: - dependencies_keys = self._as_config.jobs_data[section].get('DEPENDENCIES', "").upper() + dependencies_keys = self._as_config.jobs_data[section].get('DEPENDENCIES', "") wrapper_limits["max_by_section"][section] = wrapper_limits["max"] wrapper_limits["min"] = min(self._as_config.jobs_data[section].get( "MIN_WRAPPED", 99999999), 0) diff --git a/autosubmit/monitor/monitor.py b/autosubmit/monitor/monitor.py index e03c1ca55..f29d78474 100644 --- a/autosubmit/monitor/monitor.py +++ b/autosubmit/monitor/monitor.py @@ -238,7 +238,7 @@ class Monitor: node_child = self._create_node(child, groups, hide_groups) if node_child: exp.add_node(node_child) - if job.section is not None and job.section+"?" in child.dependencies: + if job.name in child.edge_info and child.edge_info[job.name].get('optional', False): exp.add_edge(pydotplus.Edge(node_job, node_child,style="dashed")) else: exp.add_edge(pydotplus.Edge(node_job, node_child)) @@ -246,7 +246,7 @@ class Monitor: skip = True elif not skip: node_child = node_child[0] - if job.section is not None and job.section + "?" in child.dependencies: + if job.name in child.edge_info and child.edge_info[job.name].get('optional', False): exp.add_edge(pydotplus.Edge(node_job, node_child,style="dashed")) else: exp.add_edge(pydotplus.Edge(node_job, node_child)) diff --git a/bin/autosubmit b/bin/autosubmit index 3f6202b53..0e1fab0ee 100755 --- a/bin/autosubmit +++ b/bin/autosubmit @@ -1,5 +1,4 @@ #!/usr/bin/env python -# PYTHON_ARGCOMPLETE_OK # Copyright 2015 Earth Sciences Department, BSC-CNS @@ -25,11 +24,9 @@ import traceback scriptdir = os.path.abspath(os.path.dirname(sys.argv[0])) sys.path.append(scriptdir) sys.path.append(os.path.normpath(os.path.join(scriptdir, os.pardir))) -# noinspection PyUnresolvedReferences -try: - from log.log import Log, AutosubmitCritical , AutosubmitError -except: - pass + +from log.log import Log, AutosubmitCritical , AutosubmitError + from autosubmit.autosubmit import Autosubmit -- GitLab From d51f9f21692a970afeba1f97c64394136da3dcb2 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 19 Sep 2022 12:07:43 +0200 Subject: [PATCH 7/7] Fixes for wrapper --- autosubmit/config/config_common.py | 15 +++++++++------ autosubmit/job/job.py | 8 ++++---- autosubmit/platforms/paramiko_platform.py | 4 ++-- 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index d540fbcd6..c581bc427 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -554,13 +554,16 @@ class AutosubmitConfig(object): normalize a nested dictionary or similar mapping to uppercase. Modify ``source`` in place. """ - normalized_data = dict() - for key, val in data.items(): - normalized_data[str(key).upper()] = val - if isinstance(val, collections.abc.Mapping ): - normalized_value = self.deep_normalize(data.get(key, {})) - normalized_data[str(key).upper()] = normalized_value + normalized_data = dict() + try: + for key, val in data.items(): + normalized_data[str(key).upper()] = val + if isinstance(val, collections.abc.Mapping ): + normalized_value = self.deep_normalize(data.get(key, {})) + normalized_data[str(key).upper()] = normalized_value + except: + pass return normalized_data def deep_update(self,unified_config, new_dict): diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index c047196b7..46bc40864 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -730,7 +730,7 @@ class Job(object): while log_start <= max_logs: try: if platform.get_stat_file_by_retrials(stat_file+str(max_logs)): - with open(os.path.join(tmp_path,stat_file+str(max_logs)), 'rb+') as f: + with open(os.path.join(tmp_path,stat_file+str(max_logs)), 'r+') as f: total_stats = [f.readline()[:-1],f.readline()[:-1],f.readline()[:-1]] try: total_stats[0] = float(total_stats[0]) @@ -1527,11 +1527,11 @@ class Job(object): if first_retrial: self.write_submit_time(enabled=True) path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') - f = open(path, 'ab') + f = open(path, 'a') if first_retrial: - f.write(b" " + date2str(datetime.datetime.fromtimestamp(total_stats[0]), 'S') + b' ' + date2str(datetime.datetime.fromtimestamp(total_stats[1]), 'S') + b' ' + total_stats[2]) + f.write(" " + date2str(datetime.datetime.fromtimestamp(total_stats[0]), 'S') + ' ' + date2str(datetime.datetime.fromtimestamp(total_stats[1]), 'S') + ' ' + total_stats[2]) else: - f.write(b'\n' + date2str(datetime.datetime.fromtimestamp(total_stats[0]), 'S') + b' ' + date2str(datetime.datetime.fromtimestamp(total_stats[0]), 'S') + b' ' + date2str(datetime.datetime.fromtimestamp(total_stats[1]), 'S') + b' ' + total_stats[2]) + f.write('\n' + date2str(datetime.datetime.fromtimestamp(total_stats[0]), 'S') + ' ' + date2str(datetime.datetime.fromtimestamp(total_stats[0]), 'S') + ' ' + date2str(datetime.datetime.fromtimestamp(total_stats[1]), 'S') + ' ' + total_stats[2]) out, err = self.local_logs path_out = os.path.join(self._tmp_path, 'LOG_' + str(self.expid), out) # Launch first as simple non-threaded function diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 2085c183e..93551f819 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -520,9 +520,9 @@ class ParamikoPlatform(Platform): job_status = job.check_completion(over_wallclock=True) except: job_status = Status.FAILED - elif job_status in self.job_status['QUEUING'] and job.hold == "false": + elif job_status in self.job_status['QUEUING'] and (not job.hold or job.hold.lower() != "true"): job_status = Status.QUEUING - elif job_status in self.job_status['QUEUING'] and job.hold == "true": + elif job_status in self.job_status['QUEUING'] and (job.hold or job.hold.lower() == "true"): job_status = Status.HELD elif job_status in self.job_status['FAILED']: job_status = Status.FAILED -- GitLab