From e215c020093455456f12952e59f062031d9421b5 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 17 Mar 2021 16:49:59 +0100 Subject: [PATCH 1/8] Added auto-start for DA monarch (pending to fully test) --- autosubmit/autosubmit.py | 58 ++++++++++++++++++++++++++---- autosubmit/config/config_common.py | 10 ++++++ autosubmit/job/job_list.py | 35 ++++++++++++++++++ 3 files changed, 97 insertions(+), 6 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index d9e962d4a..70cfff7d9 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -104,6 +104,39 @@ def signal_handler_create(signal_received, frame): raise AutosubmitCritical( 'Autosubmit has been closed in an unexpected way. Killed or control + c.', 7010) +def parse_two_step_start(job_list, unparsed_jobs): + jobs_to_run_first = list() + names = job_list.get_job_names() + single = False + if "," in unparsed_jobs: + semiparsed_jobs = unparsed_jobs.split(",") + if len(semiparsed_jobs) > 2: + section_job = semiparsed_jobs[0] + date = semiparsed_jobs[1] + if len(semiparsed_jobs) == 3: + member_chunk = semiparsed_jobs[2] + else: + member_chunk = "" + jobs_to_run_first = job_list.get_job_related(section_list=section_job, date_list=date, member_or_chunk_list=member_chunk) + else: + unparsed_jobs = unparsed_jobs[:-1] + single = True + else: + single = True + + if single: + if "&" in unparsed_jobs: + jobs_to_check = unparsed_jobs.split("&") + section_list = "" + for section_or_job_name in jobs_to_check: + if section_or_job_name in job_list.sections_checked: # If a section is specified + section_list += section_or_job_name + "," + elif section_or_job_name in names: # If a job_name is specified + jobs_to_run_first.append(section_or_job_name) + #Get jobs related to sections specified + jobs_to_run_first += job_list.get_job_related(section_list=section_list) + + return list(set(jobs_to_run_first)) #Erase duplicates if any class Autosubmit: """ @@ -127,6 +160,9 @@ class Autosubmit: exit = False + + + @staticmethod def parse_args(): """ @@ -1222,6 +1258,7 @@ class Autosubmit: as_conf, job_list, platforms_to_test, packages_persistence, True, only_wrappers, hold=False) job_list.update_list(as_conf, False) + @staticmethod def run_experiment(expid, notransitive=False, update_version=False, start_time=None, start_after=None, run_members=None): """ @@ -1292,7 +1329,7 @@ class Autosubmit: "\r{0} until execution starts".format(elapsed_time)) sys.stdout.flush() sleep(1) - # End of handling starting time block + # End of handling start ing time block # Start start after completion trigger block if start_after: @@ -1451,6 +1488,11 @@ class Autosubmit: except Exception as e: raise AutosubmitCritical( "Error in run initialization", 7067, str(e)) + #Two step start + jobs_to_run_first = list() + unparsed_two_step_start = as_conf.get_parse_two_step_start() + if unparsed_two_step_start != "": + jobs_to_run_first = parse_two_step_start(job_list, unparsed_two_step_start) ######################### # AUTOSUBMIT - MAIN LOOP @@ -1619,12 +1661,16 @@ class Autosubmit: as_conf, submitter=submitter) if save or save2: job_list.save() - if len(job_list.get_ready()) > 0: + if len(jobs_to_run_first) > 0: Autosubmit.submit_ready_jobs( - as_conf, job_list, platforms_to_test, packages_persistence, hold=False) - if as_conf.get_remote_dependencies() and len(job_list.get_prepared()) > 0: - Autosubmit.submit_ready_jobs( - as_conf, job_list, platforms_to_test, packages_persistence, hold=True) + as_conf, jobs_to_run_first, platforms_to_test, packages_persistence, hold=False) + else: + if len(job_list.get_ready()) > 0: + Autosubmit.submit_ready_jobs( + as_conf, job_list, platforms_to_test, packages_persistence, hold=False) + if as_conf.get_remote_dependencies() and len(job_list.get_prepared()) > 0: + Autosubmit.submit_ready_jobs( + as_conf, job_list, platforms_to_test, packages_persistence, hold=True) save = job_list.update_list( as_conf, submitter=submitter) if save: diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 4d0b179b1..1525b7821 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -948,6 +948,16 @@ class AutosubmitConfig(object): """ return self._exp_parser.get('project', 'PROJECT_TYPE').lower() + def get_parse_two_step_start(self): + """ + Returns two step start jobs + + :return: jobs_list + :rtype: str + """ + + return self._exp_parser.get_option('experiment', 'TWO_STEP_START', '').lower() + def get_file_project_conf(self): """ Returns path to project config file from experiment config file diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index b27ba1da1..f349a2ef6 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -830,6 +830,41 @@ class JobList(object): else: return all + def get_job_names(self): + """ + Returns a list of all job names + + :param platform: job platform + :type platform: HPCPlatform + :return: all jobs + :rtype: list + """ + all_jobs = [job.name for job in self._job_list] + + return all_jobs + + def get_job_related(self, date_list="", member_or_chunk_list="", section_list=""): + """ + :param datelist: job datelist + :param member_or_chunk_list: job member or chunk + :param chunk_list: job chunk + :type platform: HPCPlatform + :return: jobs_list + :rtype: list + """ + jobs = [ job for job in self._job_list if job.section in section_list ] + if date_list != "": + jobs_date = [ job for job in jobs if date2str(job.date, job.date_format) in date_list or job.date is None ] + else: + jobs_date = jobs + if 'C' in member_or_chunk_list: + jobs_final = [job for job in jobs_date if job.chunk in member_or_chunk_list] + elif 'M' in member_or_chunk_list: + jobs_final = [job for job in jobs_date if job.member in member_or_chunk_list] + else: + jobs_final = jobs_date + return jobs_final + def get_logs(self): """ Returns a dict of logs by jobs_name jobs -- GitLab From c9d98ae13cc6235770db1c3044e96c61c55c7e61 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 18 Mar 2021 14:08:38 +0100 Subject: [PATCH 2/8] TWO_STEP_START implemented, to be test in the vdevel suite --- autosubmit/autosubmit.py | 57 ++++++++------------------------ autosubmit/job/job_list.py | 59 +++++++++++++++++++++++++++++++--- autosubmit/job/job_packager.py | 22 ++++++++++--- 3 files changed, 85 insertions(+), 53 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 70cfff7d9..d77f5c968 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -104,39 +104,7 @@ def signal_handler_create(signal_received, frame): raise AutosubmitCritical( 'Autosubmit has been closed in an unexpected way. Killed or control + c.', 7010) -def parse_two_step_start(job_list, unparsed_jobs): - jobs_to_run_first = list() - names = job_list.get_job_names() - single = False - if "," in unparsed_jobs: - semiparsed_jobs = unparsed_jobs.split(",") - if len(semiparsed_jobs) > 2: - section_job = semiparsed_jobs[0] - date = semiparsed_jobs[1] - if len(semiparsed_jobs) == 3: - member_chunk = semiparsed_jobs[2] - else: - member_chunk = "" - jobs_to_run_first = job_list.get_job_related(section_list=section_job, date_list=date, member_or_chunk_list=member_chunk) - else: - unparsed_jobs = unparsed_jobs[:-1] - single = True - else: - single = True - - if single: - if "&" in unparsed_jobs: - jobs_to_check = unparsed_jobs.split("&") - section_list = "" - for section_or_job_name in jobs_to_check: - if section_or_job_name in job_list.sections_checked: # If a section is specified - section_list += section_or_job_name + "," - elif section_or_job_name in names: # If a job_name is specified - jobs_to_run_first.append(section_or_job_name) - #Get jobs related to sections specified - jobs_to_run_first += job_list.get_job_related(section_list=section_list) - - return list(set(jobs_to_run_first)) #Erase duplicates if any + class Autosubmit: """ @@ -1253,6 +1221,10 @@ class Autosubmit: job_list.update_list(as_conf, False) # Loading parameters again Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) + # Related to TWO_STEP_START new variable defined in expdef + unparsed_two_step_start = as_conf.get_parse_two_step_start() + if unparsed_two_step_start != "": + job_list.parse_two_step_start(unparsed_two_step_start) while job_list.get_active(): Autosubmit.submit_ready_jobs( as_conf, job_list, platforms_to_test, packages_persistence, True, only_wrappers, hold=False) @@ -1490,10 +1462,10 @@ class Autosubmit: "Error in run initialization", 7067, str(e)) #Two step start jobs_to_run_first = list() + #Related to TWO_STEP_START new variable defined in expdef unparsed_two_step_start = as_conf.get_parse_two_step_start() if unparsed_two_step_start != "": - jobs_to_run_first = parse_two_step_start(job_list, unparsed_two_step_start) - + job_list.parse_two_step_start(unparsed_two_step_start) ######################### # AUTOSUBMIT - MAIN LOOP ######################### @@ -1661,16 +1633,13 @@ class Autosubmit: as_conf, submitter=submitter) if save or save2: job_list.save() - if len(jobs_to_run_first) > 0: + if len(job_list.get_ready()) > 0: Autosubmit.submit_ready_jobs( - as_conf, jobs_to_run_first, platforms_to_test, packages_persistence, hold=False) - else: - if len(job_list.get_ready()) > 0: - Autosubmit.submit_ready_jobs( - as_conf, job_list, platforms_to_test, packages_persistence, hold=False) - if as_conf.get_remote_dependencies() and len(job_list.get_prepared()) > 0: - Autosubmit.submit_ready_jobs( - as_conf, job_list, platforms_to_test, packages_persistence, hold=True) + as_conf, job_list, platforms_to_test, packages_persistence, hold=False) + if as_conf.get_remote_dependencies() and len(job_list.get_prepared()) > 0: + Autosubmit.submit_ready_jobs( + as_conf, job_list, platforms_to_test, packages_persistence, hold=True) + save = job_list.update_list( as_conf, submitter=submitter) if save: diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index f349a2ef6..9d4228572 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -85,6 +85,7 @@ class JobList(object): self.job_package_map = dict() self.sections_checked = set() self._run_members = None + self.jobs_to_run_first = list() @property def expid(self): @@ -842,6 +843,53 @@ class JobList(object): all_jobs = [job.name for job in self._job_list] return all_jobs + def update_two_step_jobs(self): + if len(self.jobs_to_run_first) > 0: + self.jobs_to_run_first = [ job for job in self.jobs_to_run_first if job.status != Status.COMPLETED ] + if len(self.jobs_to_run_first) > 0: + waiting_jobs = [ job for job in self.jobs_to_run_first if job.status != Status.WAITING ] + if len(waiting_jobs) == len(self.jobs_to_run_first): + self.jobs_to_run_first = [] + Log.warning("No more jobs to run first, there were still pending jobs but they're unable to run without their parents.") + def parse_two_step_start(self, unparsed_jobs): + jobs_to_run_first = list() + names = self.get_job_names() + single = False + if "," in unparsed_jobs: + semiparsed_jobs = unparsed_jobs.split(",") + if len(semiparsed_jobs) == 2 or len(semiparsed_jobs) == 3: + section_job = semiparsed_jobs[0] + date = semiparsed_jobs[1] + if len(semiparsed_jobs) == 3: + member_chunk = semiparsed_jobs[2] + else: + member_chunk = "" + jobs_to_run_first = self.get_job_related(section_list=section_job, date_list=date, + member_or_chunk_list=member_chunk) + elif len(semiparsed_jobs) > 3: + raise AutosubmitCritical("Invalid format for parameter {0}: {1}".format("TWO_STEP_START",unparsed_jobs), 7014 ," More than 3 fields specified!") + else: + unparsed_jobs = unparsed_jobs[:-1] + single = True + else: + single = True + + if single: + if "&" in unparsed_jobs: + jobs_to_check = unparsed_jobs.split("&") + section_list = "" + for section_or_job_name in jobs_to_check: + if section_or_job_name in self.sections_checked: # If a section is specified + section_list += section_or_job_name + "," + elif section_or_job_name in names: # If a job_name is specified + jobs_to_run_first.append(section_or_job_name) + # Get jobs related to sections specified + jobs_to_run_first += self.get_job_related(section_list=section_list) + self.jobs_to_run_first = list(set(jobs_to_run_first)) # Erase duplicates if any + job_names = [job.name for job in self.jobs_to_run_first] + Log.debug("Jobs to run first: {0}", job_names) + pass + def get_job_related(self, date_list="", member_or_chunk_list="", section_list=""): """ @@ -852,15 +900,15 @@ class JobList(object): :return: jobs_list :rtype: list """ - jobs = [ job for job in self._job_list if job.section in section_list ] + jobs = [ job for job in self._job_list if job.section.lower() in section_list.lower() ] if date_list != "": jobs_date = [ job for job in jobs if date2str(job.date, job.date_format) in date_list or job.date is None ] else: jobs_date = jobs - if 'C' in member_or_chunk_list: - jobs_final = [job for job in jobs_date if job.chunk in member_or_chunk_list] - elif 'M' in member_or_chunk_list: - jobs_final = [job for job in jobs_date if job.member in member_or_chunk_list] + if 'c' in member_or_chunk_list: + jobs_final = [job for job in jobs_date if str(job.chunk) in member_or_chunk_list or job.running == "once"] + elif 'm' in member_or_chunk_list: + jobs_final = [job for job in jobs_date if str(job.member) in member_or_chunk_list or job.running == "once"] else: jobs_final = jobs_date return jobs_final @@ -1438,6 +1486,7 @@ class JobList(object): job.status = Status.SKIPPED save = True #save = True + self.update_two_step_jobs() Log.debug('Update finished') return save diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 640d2bb0a..c41c22c1e 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -133,10 +133,16 @@ class JobPackager(object): max_wrapper_job_by_section = dict() # only_wrappers = False when coming from Autosubmit.submit_ready_jobs, jobs_filtered empty - if self.hold: - jobs_ready = self._jobs_list.get_prepared(self._platform) - else: - jobs_ready = self._jobs_list.get_ready(self._platform) + jobs_ready = list() + if len(self._jobs_list.jobs_to_run_first) > 0: + jobs_ready = [job for job in self._jobs_list.jobs_to_run_first if + ( self._platform is None or job.platform.name.lower() == self._platform.name.lower() ) and + job.status == Status.READY] + if len(jobs_ready) == 0: + if self.hold: + jobs_ready = self._jobs_list.get_prepared(self._platform) + else: + jobs_ready = self._jobs_list.get_ready(self._platform) if self.hold and len(jobs_ready) > 0: self.compute_weight(jobs_ready) @@ -248,6 +254,14 @@ class JobPackager(object): built_packages_tmp.append(self._build_hybrid_package(jobs_to_submit_by_section[section], max_wrapped_jobs, section, max_wrapper_job_by_section)) if wrapped: for p in built_packages_tmp: + if len(self._jobs_list.jobs_to_run_first) > 0: # related to TWO_STEP_START new variable , defined in expdef + temp_jobs = list() + for packed_job in p.jobs: + if packed_job in self._jobs_list.jobs_to_run_first: + temp_jobs.append(packed_job) + else: + packed_job.packed = False + p.jobs = temp_jobs failed_innerjobs = False #Check failed jobs first for job in p.jobs: -- GitLab From 1f26b0e790645f0d75de32a159a32490622ec389 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 19 Mar 2021 13:33:25 +0100 Subject: [PATCH 3/8] TWO_STEP_START Changed IN for a regex ( to avoid false positives) --- autosubmit/job/job_list.py | 3 ++- autosubmit/job/job_packager.py | 19 +++++++++++-------- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 9d4228572..7071eefdf 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -900,7 +900,8 @@ class JobList(object): :return: jobs_list :rtype: list """ - jobs = [ job for job in self._job_list if job.section.lower() in section_list.lower() ] + + jobs = [ job for job in self._job_list if re.search("(^|[^0-9a-z_])"+job.section.lower()+"([^a-z0-9_]|$)",section_list.lower()) is not None ] if date_list != "": jobs_date = [ job for job in jobs if date2str(job.date, job.date_format) in date_list or job.date is None ] else: diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index c41c22c1e..d1aa0dba0 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -86,6 +86,7 @@ class JobPackager(object): Log.debug("Jobs ready for {0}: {1}", self._platform.name, len( jobs_list.get_ready(platform))) self._maxTotalProcessors = 0 + def compute_weight(self,job_list): job = self jobs_by_section = dict() @@ -238,6 +239,8 @@ class JobPackager(object): hard_limit_wrapper = number min_wrapped_jobs = min(self._as_config.jobs_parser.get_option( section, "MIN_WRAPPED", self._as_config.get_min_wrapped_jobs()), hard_limit_wrapper) + if len(self._jobs_list.jobs_to_run_first) > 0:# Allows to prepare an experiment with TWO_STEP_START and strict policy + min_wrapped_jobs = 2 packages_to_submit = [] if self.wrapper_type in ['vertical', 'vertical-mixed']: wrapped = True @@ -254,14 +257,14 @@ class JobPackager(object): built_packages_tmp.append(self._build_hybrid_package(jobs_to_submit_by_section[section], max_wrapped_jobs, section, max_wrapper_job_by_section)) if wrapped: for p in built_packages_tmp: - if len(self._jobs_list.jobs_to_run_first) > 0: # related to TWO_STEP_START new variable , defined in expdef - temp_jobs = list() - for packed_job in p.jobs: - if packed_job in self._jobs_list.jobs_to_run_first: - temp_jobs.append(packed_job) - else: - packed_job.packed = False - p.jobs = temp_jobs + #if len(self._jobs_list.jobs_to_run_first) > 0: # related to TWO_STEP_START new variable , defined in expdef + # temp_jobs = list() + # for packed_job in p.jobs: + # if packed_job in self._jobs_list.jobs_to_run_first: + # temp_jobs.append(packed_job) + # else: + # packed_job.packed = False + # p.jobs = temp_jobs failed_innerjobs = False #Check failed jobs first for job in p.jobs: -- GitLab From 455ca06e256b24cc1e8707fa04dd2f68c071c5d4 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 19 Mar 2021 17:18:08 +0100 Subject: [PATCH 4/8] TWO_STEP_START added a new field for select job_names --- autosubmit/job/job_list.py | 53 ++++++++++++---------------------- autosubmit/job/job_packager.py | 2 +- 2 files changed, 20 insertions(+), 35 deletions(-) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 7071eefdf..91e84cdd7 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -831,7 +831,7 @@ class JobList(object): else: return all - def get_job_names(self): + def get_job_names(self,lower_case=False): """ Returns a list of all job names @@ -840,7 +840,10 @@ class JobList(object): :return: all jobs :rtype: list """ - all_jobs = [job.name for job in self._job_list] + if lower_case: + all_jobs = [job.name.lower() for job in self._job_list] + else: + all_jobs = [job.name for job in self._job_list] return all_jobs def update_two_step_jobs(self): @@ -853,8 +856,10 @@ class JobList(object): Log.warning("No more jobs to run first, there were still pending jobs but they're unable to run without their parents.") def parse_two_step_start(self, unparsed_jobs): jobs_to_run_first = list() - names = self.get_job_names() - single = False + if "&" in unparsed_jobs: # If there are explicit jobs add them + jobs_to_check = unparsed_jobs.split("&") + job_names = jobs_to_check[0] + unparsed_jobs = jobs_to_check[1] if "," in unparsed_jobs: semiparsed_jobs = unparsed_jobs.split(",") if len(semiparsed_jobs) == 2 or len(semiparsed_jobs) == 3: @@ -864,34 +869,13 @@ class JobList(object): member_chunk = semiparsed_jobs[2] else: member_chunk = "" - jobs_to_run_first = self.get_job_related(section_list=section_job, date_list=date, - member_or_chunk_list=member_chunk) + jobs_to_run_first += self.get_job_related(section_list=section_job, date_list=date, + member_or_chunk_list=member_chunk,job_names=job_names) elif len(semiparsed_jobs) > 3: raise AutosubmitCritical("Invalid format for parameter {0}: {1}".format("TWO_STEP_START",unparsed_jobs), 7014 ," More than 3 fields specified!") - else: - unparsed_jobs = unparsed_jobs[:-1] - single = True - else: - single = True - - if single: - if "&" in unparsed_jobs: - jobs_to_check = unparsed_jobs.split("&") - section_list = "" - for section_or_job_name in jobs_to_check: - if section_or_job_name in self.sections_checked: # If a section is specified - section_list += section_or_job_name + "," - elif section_or_job_name in names: # If a job_name is specified - jobs_to_run_first.append(section_or_job_name) - # Get jobs related to sections specified - jobs_to_run_first += self.get_job_related(section_list=section_list) - self.jobs_to_run_first = list(set(jobs_to_run_first)) # Erase duplicates if any - job_names = [job.name for job in self.jobs_to_run_first] - Log.debug("Jobs to run first: {0}", job_names) - pass - - - def get_job_related(self, date_list="", member_or_chunk_list="", section_list=""): + self.jobs_to_run_first = list(set(jobs_to_run_first)) + + def get_job_related(self, date_list="", member_or_chunk_list="", section_list="", job_names = ""): """ :param datelist: job datelist :param member_or_chunk_list: job member or chunk @@ -900,19 +884,20 @@ class JobList(object): :return: jobs_list :rtype: list """ - + jobs_by_name = [ job for job in self._job_list if re.search("(^|[^0-9a-z_])"+job.name.lower()+"([^a-z0-9_]|$)",job_names.lower()) is not None ] jobs = [ job for job in self._job_list if re.search("(^|[^0-9a-z_])"+job.section.lower()+"([^a-z0-9_]|$)",section_list.lower()) is not None ] if date_list != "": jobs_date = [ job for job in jobs if date2str(job.date, job.date_format) in date_list or job.date is None ] else: jobs_date = jobs - if 'c' in member_or_chunk_list: + if 'c' in member_or_chunk_list[0]: jobs_final = [job for job in jobs_date if str(job.chunk) in member_or_chunk_list or job.running == "once"] - elif 'm' in member_or_chunk_list: + elif 'm' in member_or_chunk_list[0]: jobs_final = [job for job in jobs_date if str(job.member) in member_or_chunk_list or job.running == "once"] else: jobs_final = jobs_date - return jobs_final + + return jobs_final+jobs_by_name def get_logs(self): """ diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index d1aa0dba0..e065793ec 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -240,7 +240,7 @@ class JobPackager(object): min_wrapped_jobs = min(self._as_config.jobs_parser.get_option( section, "MIN_WRAPPED", self._as_config.get_min_wrapped_jobs()), hard_limit_wrapper) if len(self._jobs_list.jobs_to_run_first) > 0:# Allows to prepare an experiment with TWO_STEP_START and strict policy - min_wrapped_jobs = 2 + min_wrapped_jobs = 0 packages_to_submit = [] if self.wrapper_type in ['vertical', 'vertical-mixed']: wrapped = True -- GitLab From d4e7c073d546994e36ff766e4acf3836d35d68f0 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 22 Mar 2021 16:36:04 +0100 Subject: [PATCH 5/8] TWO_STEP_START changes --- autosubmit/job/job_list.py | 15 +++++++++------ autosubmit/job/job_packager.py | 2 +- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 91e84cdd7..a8520d9b3 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -86,7 +86,7 @@ class JobList(object): self.sections_checked = set() self._run_members = None self.jobs_to_run_first = list() - + self.jobs_to_run_first_initial = list() @property def expid(self): """ @@ -847,15 +847,17 @@ class JobList(object): return all_jobs def update_two_step_jobs(self): + prev_jobs_to_run_first = self.jobs_to_run_first if len(self.jobs_to_run_first) > 0: self.jobs_to_run_first = [ job for job in self.jobs_to_run_first if job.status != Status.COMPLETED ] - if len(self.jobs_to_run_first) > 0: - waiting_jobs = [ job for job in self.jobs_to_run_first if job.status != Status.WAITING ] - if len(waiting_jobs) == len(self.jobs_to_run_first): - self.jobs_to_run_first = [] - Log.warning("No more jobs to run first, there were still pending jobs but they're unable to run without their parents.") + #if len(self.jobs_to_run_first) > 0: + #waiting_jobs = [ job for job in self.jobs_to_run_first if job.status != Status.WAITING ] + #if len(waiting_jobs) == len(self.jobs_to_run_first): + #self.jobs_to_run_first = [] + #Log.warning("No more jobs to run first, there were still pending jobs but they're unable to run without their parents.") def parse_two_step_start(self, unparsed_jobs): jobs_to_run_first = list() + job_names = "" if "&" in unparsed_jobs: # If there are explicit jobs add them jobs_to_check = unparsed_jobs.split("&") job_names = jobs_to_check[0] @@ -874,6 +876,7 @@ class JobList(object): elif len(semiparsed_jobs) > 3: raise AutosubmitCritical("Invalid format for parameter {0}: {1}".format("TWO_STEP_START",unparsed_jobs), 7014 ," More than 3 fields specified!") self.jobs_to_run_first = list(set(jobs_to_run_first)) + self.jobs_to_run_first_initial = list(set(jobs_to_run_first)) def get_job_related(self, date_list="", member_or_chunk_list="", section_list="", job_names = ""): """ diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index e065793ec..d1aa0dba0 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -240,7 +240,7 @@ class JobPackager(object): min_wrapped_jobs = min(self._as_config.jobs_parser.get_option( section, "MIN_WRAPPED", self._as_config.get_min_wrapped_jobs()), hard_limit_wrapper) if len(self._jobs_list.jobs_to_run_first) > 0:# Allows to prepare an experiment with TWO_STEP_START and strict policy - min_wrapped_jobs = 0 + min_wrapped_jobs = 2 packages_to_submit = [] if self.wrapper_type in ['vertical', 'vertical-mixed']: wrapped = True -- GitLab From a6abe44f236cf238ea4e3f998a84ca6a6ead8a00 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 23 Mar 2021 09:53:05 +0100 Subject: [PATCH 6/8] TWO_STEP_START added a new field --- autosubmit/job/job_list.py | 45 ++++++++++++++++++++++++++++---------- 1 file changed, 34 insertions(+), 11 deletions(-) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index a8520d9b3..a16dce2e9 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -864,24 +864,31 @@ class JobList(object): unparsed_jobs = jobs_to_check[1] if "," in unparsed_jobs: semiparsed_jobs = unparsed_jobs.split(",") - if len(semiparsed_jobs) == 2 or len(semiparsed_jobs) == 3: + if 2 <= len(semiparsed_jobs) <= 4: section_job = semiparsed_jobs[0] date = semiparsed_jobs[1] - if len(semiparsed_jobs) == 3: + if len(semiparsed_jobs) > 2: member_chunk = semiparsed_jobs[2] + if len(semiparsed_jobs) == 4: + chunk_member = semiparsed_jobs[3] + else: + chunk_member = "" else: member_chunk = "" jobs_to_run_first += self.get_job_related(section_list=section_job, date_list=date, - member_or_chunk_list=member_chunk,job_names=job_names) - elif len(semiparsed_jobs) > 3: - raise AutosubmitCritical("Invalid format for parameter {0}: {1}".format("TWO_STEP_START",unparsed_jobs), 7014 ," More than 3 fields specified!") + member_or_chunk_list=member_chunk,job_names=job_names,chunk_or_member_list=chunk_member) + else: + raise AutosubmitCritical("Invalid format for parameter {0}: {1}".format("TWO_STEP_START",unparsed_jobs), 7014 ," More than 4 fields specified!") self.jobs_to_run_first = list(set(jobs_to_run_first)) self.jobs_to_run_first_initial = list(set(jobs_to_run_first)) + else: + jobs_to_run_first += self.get_job_related(section_list=unparsed_jobs) - def get_job_related(self, date_list="", member_or_chunk_list="", section_list="", job_names = ""): + def get_job_related(self, date_list="", member_or_chunk_list="", section_list="", job_names = "", chunk_or_member_list=""): """ :param datelist: job datelist :param member_or_chunk_list: job member or chunk + :param chunk_or_member_list: job chunk or member :param chunk_list: job chunk :type platform: HPCPlatform :return: jobs_list @@ -893,14 +900,30 @@ class JobList(object): jobs_date = [ job for job in jobs if date2str(job.date, job.date_format) in date_list or job.date is None ] else: jobs_date = jobs - if 'c' in member_or_chunk_list[0]: - jobs_final = [job for job in jobs_date if str(job.chunk) in member_or_chunk_list or job.running == "once"] - elif 'm' in member_or_chunk_list[0]: - jobs_final = [job for job in jobs_date if str(job.member) in member_or_chunk_list or job.running == "once"] + + jobs_final = [] + jobs_final_2 = [] + + if member_or_chunk_list != "": + if 'c' in member_or_chunk_list[0]: + jobs_final = [job for job in jobs_date if str(job.chunk) in member_or_chunk_list or job.running == "once"] + elif 'm' in member_or_chunk_list[0]: + jobs_final = [job for job in jobs_date if str(job.member) in member_or_chunk_list or job.running == "once"] + else: + jobs_final = [] else: jobs_final = jobs_date - return jobs_final+jobs_by_name + if chunk_or_member_list != "": + if 'c' in chunk_or_member_list[0]: + jobs_final_2 = [job for job in jobs_date if str(job.chunk) in chunk_or_member_list or job.running == "once"] + elif 'm' in chunk_or_member_list[0]: + jobs_final_2 = [job for job in jobs_date if str(job.member) in chunk_or_member_list or job.running == "once"] + else: + jobs_final_2 = [] + + + return jobs_final+jobs_by_name+jobs_final_2 def get_logs(self): """ -- GitLab From 435580b4811f445021f78d4408fad4f30717dd6e Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 23 Mar 2021 10:35:22 +0100 Subject: [PATCH 7/8] TWO_STEP_START Extend the regex to all fields ( I forgot member field) --- autosubmit/job/job_list.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index a16dce2e9..ade4f66b6 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -897,33 +897,33 @@ class JobList(object): jobs_by_name = [ job for job in self._job_list if re.search("(^|[^0-9a-z_])"+job.name.lower()+"([^a-z0-9_]|$)",job_names.lower()) is not None ] jobs = [ job for job in self._job_list if re.search("(^|[^0-9a-z_])"+job.section.lower()+"([^a-z0-9_]|$)",section_list.lower()) is not None ] if date_list != "": - jobs_date = [ job for job in jobs if date2str(job.date, job.date_format) in date_list or job.date is None ] + jobs_date = [ job for job in jobs if re.search("(^|[^0-9a-z_])" + date2str(job.date, job.date_format) + "([^a-z0-9_]|$)", date_list.lower()) is not None or job.date is None ] else: jobs_date = jobs - + jobs_final = [] jobs_final_2 = [] if member_or_chunk_list != "": if 'c' in member_or_chunk_list[0]: - jobs_final = [job for job in jobs_date if str(job.chunk) in member_or_chunk_list or job.running == "once"] + jobs_final = [job for job in jobs_date if re.search("(^|[^0-9a-z_])" + str(job.chunk) + "([^a-z0-9_]|$)",member_or_chunk_list.lower()) is not None or job.running == "once"] elif 'm' in member_or_chunk_list[0]: - jobs_final = [job for job in jobs_date if str(job.member) in member_or_chunk_list or job.running == "once"] + jobs_final = [job for job in jobs_date if re.search("(^|[^0-9a-z_])" + str(job.member).lower() + "([^a-z0-9_]|$)", member_or_chunk_list.lower()) is not None or job.running == "once"] else: jobs_final = [] else: jobs_final = jobs_date - if chunk_or_member_list != "": if 'c' in chunk_or_member_list[0]: - jobs_final_2 = [job for job in jobs_date if str(job.chunk) in chunk_or_member_list or job.running == "once"] + jobs_final_2 = [job for job in jobs_date if re.search("(^|[^0-9a-z_])" + str(job.chunk) + "([^a-z0-9_]|$)",chunk_or_member_list.lower()) is not None or job.running == "once"] elif 'm' in chunk_or_member_list[0]: - jobs_final_2 = [job for job in jobs_date if str(job.member) in chunk_or_member_list or job.running == "once"] + jobs_final_2 = [job for job in jobs_date if re.search("(^|[^0-9a-z_])" + str(job.member).lower() + "([^a-z0-9_]|$)", chunk_or_member_list.lower()) is not None or job.running == "once"] else: jobs_final_2 = [] - - - return jobs_final+jobs_by_name+jobs_final_2 + ultimate_jobs_list = jobs_final+jobs_by_name+jobs_final_2 + #ultimate_jobs_list_names = [ job.name for job in ultimate_jobs_list ] + Log.debug("List of jobs filtered by TWO_STEP_START parameter:\n{0}".format(list(set([job.name for job in ultimate_jobs_list])))) + return ultimate_jobs_list def get_logs(self): """ -- GitLab From 2cc192bf0c815c44144c39b517d38206b07ec2a9 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 23 Mar 2021 10:39:26 +0100 Subject: [PATCH 8/8] TWO_STEP_START Improved efficiency --- autosubmit/job/job_list.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index ade4f66b6..ab897ae27 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -879,10 +879,10 @@ class JobList(object): member_or_chunk_list=member_chunk,job_names=job_names,chunk_or_member_list=chunk_member) else: raise AutosubmitCritical("Invalid format for parameter {0}: {1}".format("TWO_STEP_START",unparsed_jobs), 7014 ," More than 4 fields specified!") - self.jobs_to_run_first = list(set(jobs_to_run_first)) - self.jobs_to_run_first_initial = list(set(jobs_to_run_first)) else: jobs_to_run_first += self.get_job_related(section_list=unparsed_jobs) + self.jobs_to_run_first = jobs_to_run_first + self.jobs_to_run_first_initial = jobs_to_run_first def get_job_related(self, date_list="", member_or_chunk_list="", section_list="", job_names = "", chunk_or_member_list=""): """ @@ -920,9 +920,8 @@ class JobList(object): jobs_final_2 = [job for job in jobs_date if re.search("(^|[^0-9a-z_])" + str(job.member).lower() + "([^a-z0-9_]|$)", chunk_or_member_list.lower()) is not None or job.running == "once"] else: jobs_final_2 = [] - ultimate_jobs_list = jobs_final+jobs_by_name+jobs_final_2 - #ultimate_jobs_list_names = [ job.name for job in ultimate_jobs_list ] - Log.debug("List of jobs filtered by TWO_STEP_START parameter:\n{0}".format(list(set([job.name for job in ultimate_jobs_list])))) + ultimate_jobs_list = list(set(jobs_final+jobs_by_name+jobs_final_2)) #Duplicates out + Log.debug("List of jobs filtered by TWO_STEP_START parameter:\n{0}".format([job.name for job in ultimate_jobs_list])) return ultimate_jobs_list def get_logs(self): -- GitLab