From 1cbfd3e65ed258cc859977f30bd49c75180ae177 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 18 Nov 2021 11:46:10 +0100 Subject: [PATCH 1/3] First version, issues when calculating dependencies --- autosubmit/autosubmit.py | 13 ++-- autosubmit/config/config_common.py | 17 +++-- autosubmit/config/files/expdef.conf | 3 +- autosubmit/job/job_list.py | 107 ++++++++++++---------------- 4 files changed, 62 insertions(+), 78 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index c31b53e17..1a8479552 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1309,7 +1309,7 @@ class Autosubmit: # Related to TWO_STEP_START new variable defined in expdef unparsed_two_step_start = as_conf.get_parse_two_step_start() if unparsed_two_step_start != "": - job_list.parse_two_step_start(unparsed_two_step_start) + job_list.parse_jobs_by_filter(unparsed_two_step_start) job_list.create_dictionary(date_list, member_list, num_chunks, chunk_ini, date_format, as_conf.get_retrials(), wrapper_jobs ) while job_list.get_active(): @@ -1602,7 +1602,7 @@ class Autosubmit: # Related to TWO_STEP_START new variable defined in expdef unparsed_two_step_start = as_conf.get_parse_two_step_start() if unparsed_two_step_start != "": - job_list.parse_two_step_start(unparsed_two_step_start) + job_list.parse_jobs_by_filter(unparsed_two_step_start) main_loop_retrials = 480 # Hard limit of tries 480 tries at 30seconds sleep each try # establish the connection to all platforms @@ -3991,9 +3991,7 @@ class Autosubmit: as_conf.get_wrapper_type(), wrapper_jobs, notransitive=notransitive, update_structure=True, run_only_members=run_only_members) if rerun == "true": - chunk_list = Autosubmit._create_json( - as_conf.get_chunk_list()) - job_list.rerun(chunk_list, notransitive) + job_list.rerun(as_conf.get_rerun_jobs()) else: job_list.remove_rerun_only_jobs(notransitive) Log.info("\nSaving the jobs list...") @@ -5188,10 +5186,9 @@ class Autosubmit: as_conf.get_default_job_type(), as_conf.get_wrapper_type(), wrapper_jobs, new=False, notransitive=notransitive, run_only_members=run_only_members) if rerun == "true": - - chunk_list = Autosubmit._create_json(as_conf.get_chunk_list()) + rerun_jobs = as_conf.get_rerun_jobs() if not monitor: - job_list.rerun(chunk_list, notransitive) + job_list.rerun(rerun_jobs) else: rerun_list = JobList(expid, BasicConfig, ConfigParserFactory(), Autosubmit._get_job_list_persistence(expid, as_conf)) diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 3484925ad..e0b527392 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -1002,6 +1002,16 @@ class AutosubmitConfig(object): return self._exp_parser.get_option('experiment', 'TWO_STEP_START', '').lower() + def get_rerun_jobs(self): + """ + Returns rerun jobs + + :return: jobs_list + :rtype: str + """ + + return self._exp_parser.get_option('rerun', 'RERUN_JOBLIST', '').lower() + def get_file_project_conf(self): """ Returns path to project config file from experiment config file @@ -1298,14 +1308,7 @@ class AutosubmitConfig(object): return self._exp_parser.get('rerun', 'RERUN').lower() - def get_chunk_list(self): - """ - Returns chunk list from experiment's config file - :return: experiment's chunks - :rtype: list - """ - return self._exp_parser.get('rerun', 'CHUNKLIST') def get_platform(self): """ diff --git a/autosubmit/config/files/expdef.conf b/autosubmit/config/files/expdef.conf index a7d0be70c..e3864fa9c 100644 --- a/autosubmit/config/files/expdef.conf +++ b/autosubmit/config/files/expdef.conf @@ -82,4 +82,5 @@ JOB_SCRIPTS_TYPE = RERUN = FALSE # If RERUN = TRUE then supply the list of chunks to rerun # LIST = [ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ] -CHUNKLIST = \ No newline at end of file +# RERUN_JOBLIST = LOCAL_SETUP,LOCAL_SEND_INITIAL,LOCAL_SEND_SOURCE,LOCAL_SEND_STATIC,REMOTE_COMPILE,PREPROCFIX,PREPROCVAR,LOCAL_SEND_INITIAL_DA,COMPILE_DA;SIM[20120101 20120201[C:1]],DA[20120101[C:1]] +RERUN_JOBLIST = \ No newline at end of file diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index d2f477665..cb8c2e9cb 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -87,6 +87,7 @@ class JobList(object): self.sections_checked = set() self._run_members = None self.jobs_to_run_first = list() + self.rerun_job_list = list() @property def expid(self): """ @@ -955,7 +956,7 @@ class JobList(object): if len(self.jobs_to_run_first) > 0 and keep_running is False: raise AutosubmitCritical("No more jobs to run first, there were still pending jobs but they're unable to run without their parents or there are failed jobs.",7014) - def parse_two_step_start(self, unparsed_jobs): + def parse_jobs_by_filter(self, unparsed_jobs,two_step_start = True): jobs_to_run_first = list() select_jobs_by_name = "" #job_name select_all_jobs_by_section = "" # all @@ -975,10 +976,20 @@ class JobList(object): aux = unparsed_jobs.split(';') select_all_jobs_by_section = aux[0] filter_jobs_by_section = aux[1] - try: - self.jobs_to_run_first = self.get_job_related(select_jobs_by_name=select_jobs_by_name,select_all_jobs_by_section=select_all_jobs_by_section,filter_jobs_by_section=filter_jobs_by_section) - except: - raise AutosubmitCritical("Check the {0} format.\nFirst filter is optional ends with '&'.\nSecond filter ends with ';'.\nThird filter must contain '['. ".format(unparsed_jobs)) + if two_step_start: + try: + self.jobs_to_run_first = self.get_job_related(select_jobs_by_name=select_jobs_by_name,select_all_jobs_by_section=select_all_jobs_by_section,filter_jobs_by_section=filter_jobs_by_section) + except: + raise AutosubmitCritical("Check the {0} format.\nFirst filter is optional ends with '&'.\nSecond filter ends with ';'.\nThird filter must contain '['. ".format(unparsed_jobs)) + else: + try: + self.rerun_job_list = self.get_job_related(select_jobs_by_name=select_jobs_by_name, + select_all_jobs_by_section=select_all_jobs_by_section, + filter_jobs_by_section=filter_jobs_by_section) + except: + raise AutosubmitCritical( + "Check the {0} format.\nFirst filter is optional ends with '&'.\nSecond filter ends with ';'.\nThird filter must contain '['. ".format( + unparsed_jobs)) def get_job_related(self, select_jobs_by_name="",select_all_jobs_by_section="",filter_jobs_by_section=""): """ @@ -1861,76 +1872,48 @@ class JobList(object): self._job_list.remove(job) - def rerun(self, chunk_list, notransitive=False, monitor=False): + def rerun(self, job_list_unparsed, notransitive=False, monitor=False): """ - Updates job list to rerun the jobs specified by chunk_list + Updates job list to rerun the jobs specified by a job list :param chunk_list: list of chunks to rerun :type chunk_list: str """ - jobs_parser = self._get_jobs_parser() + self.parse_jobs_by_filter(job_list_unparsed,two_step_start=False) + member_list = set() + chunk_list = set() + date_list = set() + for job in self._job_list: + job.status = Status.COMPLETED + if job in self.rerun_job_list: + job.status = Status.WAITING + member_list.add(job.member) + chunk_list.add(job.chunk) + date_list.add(job.date) + else: + self._remove_job(job) + jobs_parser = self._get_jobs_parser() Log.info("Adding dependencies...") dependencies = dict() + for job_section in jobs_parser.sections(): Log.debug( "Reading rerun dependencies for {0} jobs".format(job_section)) - - # If does not has rerun dependencies, do nothing - if not jobs_parser.has_option(job_section, "RERUN_DEPENDENCIES"): - continue - - dependencies_keys = jobs_parser.get( - job_section, "RERUN_DEPENDENCIES").split() - dependencies = JobList._manage_dependencies( - dependencies_keys, self._dic_jobs, job_section) - - for job in self._job_list: - job.status = Status.COMPLETED - - data = json.loads(chunk_list) - for d in data['sds']: - date = parse_date(d['sd']) - Log.debug("Date: {0}", date) - for m in d['ms']: - member = m['m'] - Log.debug("Member: " + member) - previous_chunk = 0 - for c in m['cs']: - Log.debug("Chunk: " + c) - chunk = int(c) - for job in [i for i in self._job_list if i.date == date and i.member == member and (i.chunk == chunk)]: - - if not job.rerun_only or chunk != previous_chunk + 1: - job.status = Status.WAITING - Log.debug("Job: " + job.name) - - job_section = job.section - if job_section not in dependencies: + if jobs_parser.has_option(job_section, 'DEPENDENCIES'): + dependencies_keys = jobs_parser.get(job_section, "DEPENDENCIES").split() + dependencies = JobList._manage_dependencies(dependencies_keys, self._dic_jobs, job_section) + for job in self.get_jobs_by_section(job_section): + for key in dependencies_keys: + dependency = dependencies[key] + skip, (chunk, member, date) = JobList._calculate_dependency_metadata(job.chunk, list(chunk_list), job.member, list(member_list), job.date, list(date_list), dependency) + if skip: continue - - for key in dependencies_keys: - skip, (current_chunk, current_member, current_date) = JobList._calculate_dependency_metadata(chunk, member, date, - dependencies[key]) - if skip: - continue - - section_name = dependencies[key].section - for parent in self._dic_jobs.get_jobs(section_name, current_date, current_member, - current_chunk): - parent.status = Status.WAITING - Log.debug("Parent: " + parent.name) - - for job in [j for j in self._job_list if j.status == Status.COMPLETED]: - if job.synchronize is None: - self._remove_job(job) - + section_name = dependencies[key].section + for parent in self._dic_jobs.get_jobs(section_name, job.date, job.member,job.chunk): + parent.status = Status.WAITING + Log.debug("Parent: " + parent.name) self.update_genealogy(notransitive=notransitive) - for job in [j for j in self._job_list if j.synchronize != None]: - if job.status == Status.COMPLETED: - job.status = Status.WAITING - else: - self._remove_job(job) def _get_jobs_parser(self): jobs_parser = self._parser_factory.create_parser() -- GitLab From e8acf4fdf91aa7eda3fcb566aac8c3f3896e80e3 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 19 Nov 2021 10:53:48 +0100 Subject: [PATCH 2/3] v2 --- autosubmit/job/job_list.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index cb8c2e9cb..b2b975d10 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -985,13 +985,13 @@ class JobList(object): try: self.rerun_job_list = self.get_job_related(select_jobs_by_name=select_jobs_by_name, select_all_jobs_by_section=select_all_jobs_by_section, - filter_jobs_by_section=filter_jobs_by_section) + filter_jobs_by_section=filter_jobs_by_section,two_step_start=two_step_start) except: raise AutosubmitCritical( "Check the {0} format.\nFirst filter is optional ends with '&'.\nSecond filter ends with ';'.\nThird filter must contain '['. ".format( unparsed_jobs)) - def get_job_related(self, select_jobs_by_name="",select_all_jobs_by_section="",filter_jobs_by_section=""): + def get_job_related(self, select_jobs_by_name="",select_all_jobs_by_section="",filter_jobs_by_section="",two_step_start=True): """ :param select_jobs_by_name: job name :param select_all_jobs_by_section: section name @@ -1888,11 +1888,17 @@ class JobList(object): job.status = Status.COMPLETED if job in self.rerun_job_list: job.status = Status.WAITING - member_list.add(job.member) - chunk_list.add(job.chunk) - date_list.add(job.date) + if job.member is not None: + member_list.add(job.member) + if job.chunk is not None: + chunk_list.add(job.chunk) + if job.date is not None: + date_list.add(job.date) else: self._remove_job(job) + self._member_list = list(member_list) + self._chunk_list = list(chunk_list) + self._date_list = list(date_list) jobs_parser = self._get_jobs_parser() Log.info("Adding dependencies...") dependencies = dict() @@ -1906,7 +1912,7 @@ class JobList(object): for job in self.get_jobs_by_section(job_section): for key in dependencies_keys: dependency = dependencies[key] - skip, (chunk, member, date) = JobList._calculate_dependency_metadata(job.chunk, list(chunk_list), job.member, list(member_list), job.date, list(date_list), dependency) + skip, (chunk, member, date) = JobList._calculate_dependency_metadata(job.chunk, self._chunk_list, job.member, self._member_list, job.date, self._date_list, dependency) if skip: continue section_name = dependencies[key].section -- GitLab From e44d7754c76f9c9e12ff4a2aa1fd6178950afccb Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 23 Nov 2021 13:16:37 +0100 Subject: [PATCH 3/3] Rerun implementation --- autosubmit/autosubmit.py | 15 +-------------- autosubmit/job/job_list.py | 19 +++++++++++-------- 2 files changed, 12 insertions(+), 22 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 1a8479552..162a938c6 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -5187,20 +5187,7 @@ class Autosubmit: new=False, notransitive=notransitive, run_only_members=run_only_members) if rerun == "true": rerun_jobs = as_conf.get_rerun_jobs() - if not monitor: - job_list.rerun(rerun_jobs) - else: - rerun_list = JobList(expid, BasicConfig, ConfigParserFactory(), - Autosubmit._get_job_list_persistence(expid, as_conf)) - rerun_list.generate(date_list, as_conf.get_member_list(), as_conf.get_num_chunks(), - as_conf.get_chunk_ini(), - as_conf.load_parameters(), date_format, as_conf.get_retrials(), - as_conf.get_default_job_type(), as_conf.get_wrapper_type(), - as_conf.get_wrapper_jobs(), - new=False, notransitive=notransitive) - rerun_list.rerun(chunk_list, notransitive) - job_list = Autosubmit.rerun_recovery( - expid, job_list, rerun_list, as_conf) + job_list.rerun(rerun_jobs,monitor=monitor) else: job_list.remove_rerun_only_jobs(notransitive) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index b2b975d10..8bdfb0428 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -1872,7 +1872,7 @@ class JobList(object): self._job_list.remove(job) - def rerun(self, job_list_unparsed, notransitive=False, monitor=False): + def rerun(self, job_list_unparsed, monitor=False): """ Updates job list to rerun the jobs specified by a job list @@ -1883,11 +1883,14 @@ class JobList(object): member_list = set() chunk_list = set() date_list = set() - - for job in self._job_list: - job.status = Status.COMPLETED + job_sections = set() + for job in self.get_all(): + if not monitor: + job.status = Status.COMPLETED if job in self.rerun_job_list: - job.status = Status.WAITING + job_sections.add(job.section) + if not monitor: + job.status = Status.WAITING if job.member is not None: member_list.add(job.member) if job.chunk is not None: @@ -1903,7 +1906,7 @@ class JobList(object): Log.info("Adding dependencies...") dependencies = dict() - for job_section in jobs_parser.sections(): + for job_section in job_sections: Log.debug( "Reading rerun dependencies for {0} jobs".format(job_section)) if jobs_parser.has_option(job_section, 'DEPENDENCIES'): @@ -1917,9 +1920,9 @@ class JobList(object): continue section_name = dependencies[key].section for parent in self._dic_jobs.get_jobs(section_name, job.date, job.member,job.chunk): - parent.status = Status.WAITING + if not monitor: + parent.status = Status.WAITING Log.debug("Parent: " + parent.name) - self.update_genealogy(notransitive=notransitive) def _get_jobs_parser(self): jobs_parser = self._parser_factory.create_parser() -- GitLab