diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index c31b53e1718035849b310081a8927fca04d450b6..162a938c6efd8d6be7c35c6a0a57abbec84dbf6d 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1309,7 +1309,7 @@ class Autosubmit: # Related to TWO_STEP_START new variable defined in expdef unparsed_two_step_start = as_conf.get_parse_two_step_start() if unparsed_two_step_start != "": - job_list.parse_two_step_start(unparsed_two_step_start) + job_list.parse_jobs_by_filter(unparsed_two_step_start) job_list.create_dictionary(date_list, member_list, num_chunks, chunk_ini, date_format, as_conf.get_retrials(), wrapper_jobs ) while job_list.get_active(): @@ -1602,7 +1602,7 @@ class Autosubmit: # Related to TWO_STEP_START new variable defined in expdef unparsed_two_step_start = as_conf.get_parse_two_step_start() if unparsed_two_step_start != "": - job_list.parse_two_step_start(unparsed_two_step_start) + job_list.parse_jobs_by_filter(unparsed_two_step_start) main_loop_retrials = 480 # Hard limit of tries 480 tries at 30seconds sleep each try # establish the connection to all platforms @@ -3991,9 +3991,7 @@ class Autosubmit: as_conf.get_wrapper_type(), wrapper_jobs, notransitive=notransitive, update_structure=True, run_only_members=run_only_members) if rerun == "true": - chunk_list = Autosubmit._create_json( - as_conf.get_chunk_list()) - job_list.rerun(chunk_list, notransitive) + job_list.rerun(as_conf.get_rerun_jobs()) else: job_list.remove_rerun_only_jobs(notransitive) Log.info("\nSaving the jobs list...") @@ -5188,22 +5186,8 @@ class Autosubmit: as_conf.get_default_job_type(), as_conf.get_wrapper_type(), wrapper_jobs, new=False, notransitive=notransitive, run_only_members=run_only_members) if rerun == "true": - - chunk_list = Autosubmit._create_json(as_conf.get_chunk_list()) - if not monitor: - job_list.rerun(chunk_list, notransitive) - else: - rerun_list = JobList(expid, BasicConfig, ConfigParserFactory(), - 
Autosubmit._get_job_list_persistence(expid, as_conf)) - rerun_list.generate(date_list, as_conf.get_member_list(), as_conf.get_num_chunks(), - as_conf.get_chunk_ini(), - as_conf.load_parameters(), date_format, as_conf.get_retrials(), - as_conf.get_default_job_type(), as_conf.get_wrapper_type(), - as_conf.get_wrapper_jobs(), - new=False, notransitive=notransitive) - rerun_list.rerun(chunk_list, notransitive) - job_list = Autosubmit.rerun_recovery( - expid, job_list, rerun_list, as_conf) + rerun_jobs = as_conf.get_rerun_jobs() + job_list.rerun(rerun_jobs,monitor=monitor) else: job_list.remove_rerun_only_jobs(notransitive) diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 3484925ad6dafe3b75b954e9ad08e26147d2ce2e..e0b527392f92dac854d75907b2c37edb6a83dbca 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -1002,6 +1002,16 @@ class AutosubmitConfig(object): return self._exp_parser.get_option('experiment', 'TWO_STEP_START', '').lower() + def get_rerun_jobs(self): + """ + Returns the rerun job list filter expression (RERUN_JOBLIST option of the [rerun] section), lower-cased; empty string when the option is unset + + :return: unparsed rerun job list filter + :rtype: str + """ + + return self._exp_parser.get_option('rerun', 'RERUN_JOBLIST', '').lower() + def get_file_project_conf(self): """ Returns path to project config file from experiment config file @@ -1298,14 +1308,7 @@ class AutosubmitConfig(object): return self._exp_parser.get('rerun', 'RERUN').lower() - def get_chunk_list(self): - """ - Returns chunk list from experiment's config file - :return: experiment's chunks - :rtype: list - """ - return self._exp_parser.get('rerun', 'CHUNKLIST') def get_platform(self): """ diff --git a/autosubmit/config/files/expdef.conf b/autosubmit/config/files/expdef.conf index a7d0be70c28b64e797e18f646cdc391cf461d9c2..e3864fa9ccd45704524537aca8b5deab4ffafb52 100644 --- a/autosubmit/config/files/expdef.conf +++ b/autosubmit/config/files/expdef.conf @@ -82,4 +82,5 @@ JOB_SCRIPTS_TYPE = RERUN = FALSE # If RERUN = TRUE then supply the list of chunks to
rerun # LIST = [ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ] -CHUNKLIST = \ No newline at end of file +# RERUN_JOBLIST = LOCAL_SETUP,LOCAL_SEND_INITIAL,LOCAL_SEND_SOURCE,LOCAL_SEND_STATIC,REMOTE_COMPILE,PREPROCFIX,PREPROCVAR,LOCAL_SEND_INITIAL_DA,COMPILE_DA;SIM[20120101 20120201[C:1]],DA[20120101[C:1]] +RERUN_JOBLIST = \ No newline at end of file diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index d2f4776658f1db61611156cafefc90d3cfd8c062..8bdfb0428f8030fd923bebb207bb6e423fd8e3ce 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -87,6 +87,7 @@ class JobList(object): self.sections_checked = set() self._run_members = None self.jobs_to_run_first = list() + self.rerun_job_list = list() @property def expid(self): """ @@ -955,7 +956,7 @@ class JobList(object): if len(self.jobs_to_run_first) > 0 and keep_running is False: raise AutosubmitCritical("No more jobs to run first, there were still pending jobs but they're unable to run without their parents or there are failed jobs.",7014) - def parse_two_step_start(self, unparsed_jobs): + def parse_jobs_by_filter(self, unparsed_jobs,two_step_start = True): jobs_to_run_first = list() select_jobs_by_name = "" #job_name select_all_jobs_by_section = "" # all @@ -975,12 +976,22 @@ class JobList(object): aux = unparsed_jobs.split(';') select_all_jobs_by_section = aux[0] filter_jobs_by_section = aux[1] - try: - self.jobs_to_run_first = self.get_job_related(select_jobs_by_name=select_jobs_by_name,select_all_jobs_by_section=select_all_jobs_by_section,filter_jobs_by_section=filter_jobs_by_section) - except: - raise AutosubmitCritical("Check the {0} format.\nFirst filter is optional ends with '&'.\nSecond filter ends with ';'.\nThird filter must contain '['. 
".format(unparsed_jobs)) + if two_step_start: + try: + self.jobs_to_run_first = self.get_job_related(select_jobs_by_name=select_jobs_by_name,select_all_jobs_by_section=select_all_jobs_by_section,filter_jobs_by_section=filter_jobs_by_section) + except: + raise AutosubmitCritical("Check the {0} format.\nFirst filter is optional ends with '&'.\nSecond filter ends with ';'.\nThird filter must contain '['. ".format(unparsed_jobs)) + else: + try: + self.rerun_job_list = self.get_job_related(select_jobs_by_name=select_jobs_by_name, + select_all_jobs_by_section=select_all_jobs_by_section, + filter_jobs_by_section=filter_jobs_by_section,two_step_start=two_step_start) + except: + raise AutosubmitCritical( + "Check the {0} format.\nFirst filter is optional ends with '&'.\nSecond filter ends with ';'.\nThird filter must contain '['. ".format( + unparsed_jobs)) - def get_job_related(self, select_jobs_by_name="",select_all_jobs_by_section="",filter_jobs_by_section=""): + def get_job_related(self, select_jobs_by_name="",select_all_jobs_by_section="",filter_jobs_by_section="",two_step_start=True): """ :param select_jobs_by_name: job name :param select_all_jobs_by_section: section name @@ -1861,76 +1872,57 @@ class JobList(object): self._job_list.remove(job) - def rerun(self, chunk_list, notransitive=False, monitor=False): + def rerun(self, job_list_unparsed, monitor=False): """ - Updates job list to rerun the jobs specified by chunk_list + Updates job list to rerun the jobs specified by a job list :param chunk_list: list of chunks to rerun :type chunk_list: str """ + self.parse_jobs_by_filter(job_list_unparsed,two_step_start=False) + member_list = set() + chunk_list = set() + date_list = set() + job_sections = set() + for job in self.get_all(): + if not monitor: + job.status = Status.COMPLETED + if job in self.rerun_job_list: + job_sections.add(job.section) + if not monitor: + job.status = Status.WAITING + if job.member is not None: + member_list.add(job.member) + if job.chunk is 
not None: + chunk_list.add(job.chunk) + if job.date is not None: + date_list.add(job.date) + else: + self._remove_job(job) + self._member_list = list(member_list) + self._chunk_list = list(chunk_list) + self._date_list = list(date_list) jobs_parser = self._get_jobs_parser() - Log.info("Adding dependencies...") dependencies = dict() - for job_section in jobs_parser.sections(): + + for job_section in job_sections: Log.debug( "Reading rerun dependencies for {0} jobs".format(job_section)) - - # If does not has rerun dependencies, do nothing - if not jobs_parser.has_option(job_section, "RERUN_DEPENDENCIES"): - continue - - dependencies_keys = jobs_parser.get( - job_section, "RERUN_DEPENDENCIES").split() - dependencies = JobList._manage_dependencies( - dependencies_keys, self._dic_jobs, job_section) - - for job in self._job_list: - job.status = Status.COMPLETED - - data = json.loads(chunk_list) - for d in data['sds']: - date = parse_date(d['sd']) - Log.debug("Date: {0}", date) - for m in d['ms']: - member = m['m'] - Log.debug("Member: " + member) - previous_chunk = 0 - for c in m['cs']: - Log.debug("Chunk: " + c) - chunk = int(c) - for job in [i for i in self._job_list if i.date == date and i.member == member and (i.chunk == chunk)]: - - if not job.rerun_only or chunk != previous_chunk + 1: - job.status = Status.WAITING - Log.debug("Job: " + job.name) - - job_section = job.section - if job_section not in dependencies: + if jobs_parser.has_option(job_section, 'DEPENDENCIES'): + dependencies_keys = jobs_parser.get(job_section, "DEPENDENCIES").split() + dependencies = JobList._manage_dependencies(dependencies_keys, self._dic_jobs, job_section) + for job in self.get_jobs_by_section(job_section): + for key in dependencies_keys: + dependency = dependencies[key] + skip, (chunk, member, date) = JobList._calculate_dependency_metadata(job.chunk, self._chunk_list, job.member, self._member_list, job.date, self._date_list, dependency) + if skip: continue - - for key in 
dependencies_keys: - skip, (current_chunk, current_member, current_date) = JobList._calculate_dependency_metadata(chunk, member, date, - dependencies[key]) - if skip: - continue - - section_name = dependencies[key].section - for parent in self._dic_jobs.get_jobs(section_name, current_date, current_member, - current_chunk): + section_name = dependencies[key].section + for parent in self._dic_jobs.get_jobs(section_name, job.date, job.member,job.chunk): + if not monitor: parent.status = Status.WAITING - Log.debug("Parent: " + parent.name) - - for job in [j for j in self._job_list if j.status == Status.COMPLETED]: - if job.synchronize is None: - self._remove_job(job) - - self.update_genealogy(notransitive=notransitive) - for job in [j for j in self._job_list if j.synchronize != None]: - if job.status == Status.COMPLETED: - job.status = Status.WAITING - else: - self._remove_job(job) + Log.debug("Parent: " + parent.name) def _get_jobs_parser(self): jobs_parser = self._parser_factory.create_parser()