diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index d9e962d4a0b980079a47e209f128a3a9c0f871f0..d77f5c968fe47a5287b5a82e487e8d5fd2eb93a8 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -105,6 +105,7 @@ def signal_handler_create(signal_received, frame): 'Autosubmit has been closed in an unexpected way. Killed or control + c.', 7010) + class Autosubmit: """ Interface class for autosubmit. @@ -127,6 +128,9 @@ class Autosubmit: exit = False + + + @staticmethod def parse_args(): """ @@ -1217,11 +1221,16 @@ class Autosubmit: job_list.update_list(as_conf, False) # Loading parameters again Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) + # Related to TWO_STEP_START new variable defined in expdef + unparsed_two_step_start = as_conf.get_parse_two_step_start() + if unparsed_two_step_start != "": + job_list.parse_two_step_start(unparsed_two_step_start) while job_list.get_active(): Autosubmit.submit_ready_jobs( as_conf, job_list, platforms_to_test, packages_persistence, True, only_wrappers, hold=False) job_list.update_list(as_conf, False) + @staticmethod def run_experiment(expid, notransitive=False, update_version=False, start_time=None, start_after=None, run_members=None): """ @@ -1292,7 +1301,7 @@ class Autosubmit: "\r{0} until execution starts".format(elapsed_time)) sys.stdout.flush() sleep(1) - # End of handling starting time block + # End of handling start ing time block # Start start after completion trigger block if start_after: @@ -1451,7 +1460,12 @@ class Autosubmit: except Exception as e: raise AutosubmitCritical( "Error in run initialization", 7067, str(e)) - + #Two step start + jobs_to_run_first = list() + #Related to TWO_STEP_START new variable defined in expdef + unparsed_two_step_start = as_conf.get_parse_two_step_start() + if unparsed_two_step_start != "": + job_list.parse_two_step_start(unparsed_two_step_start) ######################### # AUTOSUBMIT - MAIN LOOP ######################### @@ -1625,6 +1639,7 @@ class Autosubmit: if as_conf.get_remote_dependencies() and len(job_list.get_prepared()) > 0: Autosubmit.submit_ready_jobs( as_conf, job_list, platforms_to_test, packages_persistence, hold=True) + save = job_list.update_list( as_conf, submitter=submitter) if save: diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 4d0b179b1edd5259932940ef7ab8bf41da0adff3..1525b782111be582fdea600de0392fe334c5b3ab 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -948,6 +948,16 @@ class AutosubmitConfig(object): """ return self._exp_parser.get('project', 'PROJECT_TYPE').lower() + def get_parse_two_step_start(self): + """ + Returns two step start jobs + + :return: jobs_list + :rtype: str + """ + + return self._exp_parser.get_option('experiment', 'TWO_STEP_START', '').lower() + def get_file_project_conf(self): """ Returns path to project config file from experiment config file diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index b27ba1da1bb38d3445657392e2cce5906395cac1..ab897ae27aa2d5393f7aa1f581abfa3c4e292c10 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -85,7 +85,8 @@ class JobList(object): self.job_package_map = dict() self.sections_checked = set() self._run_members = None - + self.jobs_to_run_first = list() + self.jobs_to_run_first_initial = list() @property def expid(self): """ @@ -830,6 +831,99 @@ class JobList(object): else: return all + def get_job_names(self,lower_case=False): + """ + Returns a list of all job names + + :param platform: job platform + :type platform: HPCPlatform + :return: all jobs + :rtype: list + """ + if lower_case: + all_jobs = [job.name.lower() for job in self._job_list] + else: + all_jobs = [job.name for job in self._job_list] + + return all_jobs + def update_two_step_jobs(self): + prev_jobs_to_run_first = self.jobs_to_run_first + if len(self.jobs_to_run_first) > 0: + self.jobs_to_run_first = [ job for job in self.jobs_to_run_first if job.status != Status.COMPLETED ] + #if len(self.jobs_to_run_first) > 0: + #waiting_jobs = [ job for job in self.jobs_to_run_first if job.status != Status.WAITING ] + #if len(waiting_jobs) == len(self.jobs_to_run_first): + #self.jobs_to_run_first = [] + #Log.warning("No more jobs to run first, there were still pending jobs but they're unable to run without their parents.") + def parse_two_step_start(self, unparsed_jobs): + jobs_to_run_first = list() + job_names = "" + if "&" in unparsed_jobs: # If there are explicit jobs add them + jobs_to_check = unparsed_jobs.split("&") + job_names = jobs_to_check[0] + unparsed_jobs = jobs_to_check[1] + if "," in unparsed_jobs: + semiparsed_jobs = unparsed_jobs.split(",") + if 2 <= len(semiparsed_jobs) <= 4: + section_job = semiparsed_jobs[0] + date = semiparsed_jobs[1] + if len(semiparsed_jobs) > 2: + member_chunk = semiparsed_jobs[2] + if len(semiparsed_jobs) == 4: + chunk_member = semiparsed_jobs[3] + else: + chunk_member = "" + else: + member_chunk = "" + jobs_to_run_first += self.get_job_related(section_list=section_job, date_list=date, + member_or_chunk_list=member_chunk,job_names=job_names,chunk_or_member_list=chunk_member) + else: + raise AutosubmitCritical("Invalid format for parameter {0}: {1}".format("TWO_STEP_START",unparsed_jobs), 7014 ," More than 4 fields specified!") + else: + jobs_to_run_first += self.get_job_related(section_list=unparsed_jobs) + self.jobs_to_run_first = jobs_to_run_first + self.jobs_to_run_first_initial = jobs_to_run_first + + def get_job_related(self, date_list="", member_or_chunk_list="", section_list="", job_names = "", chunk_or_member_list=""): + """ + :param datelist: job datelist + :param member_or_chunk_list: job member or chunk + :param chunk_or_member_list: job chunk or member + :param chunk_list: job chunk + :type platform: HPCPlatform + :return: jobs_list + :rtype: list + """ + jobs_by_name = [ job for job in self._job_list if re.search("(^|[^0-9a-z_])"+job.name.lower()+"([^a-z0-9_]|$)",job_names.lower()) is not None ] + jobs = [ job for job in self._job_list if re.search("(^|[^0-9a-z_])"+job.section.lower()+"([^a-z0-9_]|$)",section_list.lower()) is not None ] + if date_list != "": + jobs_date = [ job for job in jobs if re.search("(^|[^0-9a-z_])" + date2str(job.date, job.date_format) + "([^a-z0-9_]|$)", date_list.lower()) is not None or job.date is None ] + else: + jobs_date = jobs + + jobs_final = [] + jobs_final_2 = [] + + if member_or_chunk_list != "": + if 'c' in member_or_chunk_list[0]: + jobs_final = [job for job in jobs_date if re.search("(^|[^0-9a-z_])" + str(job.chunk) + "([^a-z0-9_]|$)",member_or_chunk_list.lower()) is not None or job.running == "once"] + elif 'm' in member_or_chunk_list[0]: + jobs_final = [job for job in jobs_date if re.search("(^|[^0-9a-z_])" + str(job.member).lower() + "([^a-z0-9_]|$)", member_or_chunk_list.lower()) is not None or job.running == "once"] + else: + jobs_final = [] + else: + jobs_final = jobs_date + if chunk_or_member_list != "": + if 'c' in chunk_or_member_list[0]: + jobs_final_2 = [job for job in jobs_date if re.search("(^|[^0-9a-z_])" + str(job.chunk) + "([^a-z0-9_]|$)",chunk_or_member_list.lower()) is not None or job.running == "once"] + elif 'm' in chunk_or_member_list[0]: + jobs_final_2 = [job for job in jobs_date if re.search("(^|[^0-9a-z_])" + str(job.member).lower() + "([^a-z0-9_]|$)", chunk_or_member_list.lower()) is not None or job.running == "once"] + else: + jobs_final_2 = [] + ultimate_jobs_list = list(set(jobs_final+jobs_by_name+jobs_final_2)) #Duplicates out + Log.debug("List of jobs filtered by TWO_STEP_START parameter:\n{0}".format([job.name for job in ultimate_jobs_list])) + return ultimate_jobs_list + def get_logs(self): """ Returns a dict of logs by jobs_name jobs @@ -1403,6 +1497,7 @@ class JobList(object): job.status = Status.SKIPPED save = True #save = True + self.update_two_step_jobs() Log.debug('Update finished') return save diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 640d2bb0a0a0028018a44cf38b6722bde050e8a0..d1aa0dba0839b3d9065545f158d11843d49b68d8 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -86,6 +86,7 @@ class JobPackager(object): Log.debug("Jobs ready for {0}: {1}", self._platform.name, len( jobs_list.get_ready(platform))) self._maxTotalProcessors = 0 + def compute_weight(self,job_list): job = self jobs_by_section = dict() @@ -133,10 +134,16 @@ class JobPackager(object): max_wrapper_job_by_section = dict() # only_wrappers = False when coming from Autosubmit.submit_ready_jobs, jobs_filtered empty - if self.hold: - jobs_ready = self._jobs_list.get_prepared(self._platform) - else: - jobs_ready = self._jobs_list.get_ready(self._platform) + jobs_ready = list() + if len(self._jobs_list.jobs_to_run_first) > 0: + jobs_ready = [job for job in self._jobs_list.jobs_to_run_first if + ( self._platform is None or job.platform.name.lower() == self._platform.name.lower() ) and + job.status == Status.READY] + if len(jobs_ready) == 0: + if self.hold: + jobs_ready = self._jobs_list.get_prepared(self._platform) + else: + jobs_ready = self._jobs_list.get_ready(self._platform) if self.hold and len(jobs_ready) > 0: self.compute_weight(jobs_ready) @@ -232,6 +239,8 @@ class JobPackager(object): hard_limit_wrapper = number min_wrapped_jobs = min(self._as_config.jobs_parser.get_option( section, "MIN_WRAPPED", self._as_config.get_min_wrapped_jobs()), hard_limit_wrapper) + if len(self._jobs_list.jobs_to_run_first) > 0:# Allows to prepare an experiment with TWO_STEP_START and strict policy + min_wrapped_jobs = 2 packages_to_submit = [] if self.wrapper_type in ['vertical', 'vertical-mixed']: wrapped = True @@ -248,6 +257,14 @@ class JobPackager(object): built_packages_tmp.append(self._build_hybrid_package(jobs_to_submit_by_section[section], max_wrapped_jobs, section, max_wrapper_job_by_section)) if wrapped: for p in built_packages_tmp: + #if len(self._jobs_list.jobs_to_run_first) > 0: # related to TWO_STEP_START new variable , defined in expdef + # temp_jobs = list() + # for packed_job in p.jobs: + # if packed_job in self._jobs_list.jobs_to_run_first: + # temp_jobs.append(packed_job) + # else: + # packed_job.packed = False + # p.jobs = temp_jobs failed_innerjobs = False #Check failed jobs first for job in p.jobs: