From 9e54e0b887b3b09e4c02ff34871cb8751b77ab03 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 21 Dec 2018 10:16:35 +0100 Subject: [PATCH 01/23] dummy remote work test --- autosubmit/job/job_list.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 11d14a9f8..15157e3e6 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -947,7 +947,7 @@ class JobList: dependencies[key]) if skip: continue - +#dummy commit for test if remote works section_name = dependencies[key].section for parent in self._dic_jobs.get_jobs(section_name, current_date, current_member, current_chunk): -- GitLab From 0d0a7ff36f344606a8f8d98a73fe43a6ed34a25f Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 21 Dec 2018 13:42:51 +0100 Subject: [PATCH 02/23] should work now need testing with a more complex exp , #150 --- autosubmit/job/job_list.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 15157e3e6..3e907d059 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -943,11 +943,11 @@ class JobList: continue for key in dependencies_keys: - skip, (chunk, member, date) = JobList._calculate_dependency_metadata(chunk, member, date, + skip, (current_chunk, current_member, current_date) = JobList._calculate_dependency_metadata(chunk, member, date, dependencies[key]) if skip: continue -#dummy commit for test if remote works + section_name = dependencies[key].section for parent in self._dic_jobs.get_jobs(section_name, current_date, current_member, current_chunk): -- GitLab From dc94dbf7d06a8c621a834424f2ba7327474788fb Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 21 Dec 2018 15:07:39 +0100 Subject: [PATCH 03/23] should work needs testing with a more complex job together with #150 , #339 --- autosubmit/autosubmit.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 0ff06a791..6fe5120d3 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -731,7 +731,9 @@ class Autosubmit: None, None, jobs[0].platform, as_conf) job_list.job_package_map[jobs[0].id] = wrapper_job - + if as_conf.get_rerun(): + job_list.update_list(as_conf) + job_list.save() ######################### # AUTOSUBMIT - MAIN LOOP ######################### -- GitLab From ee238a41e65cc96c8d31e443c9ea7ea8d0354b4c Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 15 Jan 2019 14:29:18 +0100 Subject: [PATCH 04/23] added a mechanism for detect if the jobs.name using -fl option are different that the expid , also disable the save option if is the case, #249 --- autosubmit/autosubmit.py | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 6fe5120d3..a8b5ac218 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -53,7 +53,7 @@ import datetime import portalocker from pkg_resources import require, resource_listdir, resource_exists, resource_string from distutils.util import strtobool - +from collections import defaultdict from pyparsing import nestedExpr sys.path.insert(0, os.path.abspath('.')) @@ -190,6 +190,7 @@ class Autosubmit: help='Supply the list of chunks to filter the list of jobs. Default = "Any". ' 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') group.add_argument('-fs', '--filter_status', type=str, + choices=('Any', 'READY', 'COMPLETED', 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN'), help='Select the original status to filter the list of jobs') group.add_argument('-ft', '--filter_type', type=str, @@ -731,9 +732,7 @@ class Autosubmit: None, None, jobs[0].platform, as_conf) job_list.job_package_map[jobs[0].id] = wrapper_job - if as_conf.get_rerun(): - job_list.update_list(as_conf) - job_list.save() + ######################### # AUTOSUBMIT - MAIN LOOP ######################### @@ -807,7 +806,7 @@ class Autosubmit: as_conf.get_mails_to()) save = True - if job_list.update_list(as_conf) or save: + if job_list.updatze_list(as_conf) or save: job_list.save() if Autosubmit.exit: @@ -2222,7 +2221,7 @@ class Autosubmit: Log.debug('Chunks to change: {0}', filter_chunks) Log.debug('Status of jobs to change: {0}', filter_status) Log.debug('Sections to change: {0}', filter_section) - + wrongExpid = 0 as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory()) if not as_conf.check_conf_files(): Log.critical('Can not run with invalid configuration') @@ -2284,6 +2283,15 @@ class Autosubmit: if lst: jobs = lst.split() + expidJoblist =defaultdict(int) + for x in lst.split(): + expidJoblist[str(x[0:4])] += 1 + + if str(expid) in expidJoblist: + wrongExpid=jobs.__len__()-expidJoblist[expid] + if wrongExpid > 0: + Log.warning("There are {0} job.name with an invalid Expid",wrongExpid) + if jobs == 'Any': for job in job_list.get_job_list(): @@ -2295,10 +2303,14 @@ class Autosubmit: sys.setrecursionlimit(50000) job_list.update_list(as_conf,False) - if save: + + if save and wrongExpid == 0: job_list.save() else: Log.warning("Changes NOT saved to the JobList!!!!: use -s option to save") + if wrongExpid == 0: + Log.error("Save disabled due invalid expid, please check or/and jobs expid name") + packages = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid).load() -- GitLab From 0c0156136266659d44bc450967db8d94ffcab72d Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 18 Jan 2019 11:46:52 +0100 Subject: [PATCH 05/23] Potentially fix for , #338,#150 (reverting , issues with pipeline) --- autosubmit/autosubmit.py | 2 +- autosubmit/config/config_common.py | 10 +++++++++- autosubmit/job/job.py | 1 + autosubmit/job/job_dict.py | 1 + autosubmit/job/job_list.py | 28 ++++++++++++++++++++-------- 5 files changed, 32 insertions(+), 10 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index a8b5ac218..9e688e3e9 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -2302,7 +2302,7 @@ class Autosubmit: Autosubmit.change_status(final, final_status, job) sys.setrecursionlimit(50000) - job_list.update_list(as_conf,False) + job_list.update_list(as_conf,False,True) if save and wrongExpid == 0: job_list.save() diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 4e19c9076..4ecb78cf0 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -143,7 +143,15 @@ class AutosubmitConfig(object): :rtype: str """ return self._jobs_parser.get_option(section, 'WALLCLOCK', '') - + def get_synchronize(self, section): + """ + Gets wallclock for the given job type + :param section: job type + :type section: str + :return: wallclock time + :rtype: str + """ + return self._jobs_parser.get_option(section, 'SYNCHRONIZE', '') def get_processors(self, section): """ Gets processors needed for the given job type diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 73b3e0969..bc4b6b8da 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -73,6 +73,7 @@ class Job(object): self.name = name self.split = None self.delay = None + self.synchronize = None self._long_name = None self.long_name = name self.date_format = '' diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index 40c147be8..005ded4ea 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -334,6 +334,7 @@ class DicJobs: job.memory = self.get_option(section, "MEMORY", '') job.memory_per_task = self.get_option(section, "MEMORY_PER_TASK", '') job.wallclock = self.get_option(section, "WALLCLOCK", '') + job.synchronize = self.get_option(section, "SYNCHRONIZE", '') job.retrials = int(self.get_option(section, 'RETRIALS', -1)) if job.retrials == -1: job.retrials = None diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 3e907d059..b7d38f855 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -777,7 +777,7 @@ class JobList: def parameters(self, value): self._parameters = value - def update_list(self, as_conf,store_change=True): + def update_list(self, as_conf,store_change=True,fromSetStatus=False): """ Updates job list, resetting failed jobs and changing to READY all WAITING jobs with all parents COMPLETED @@ -814,13 +814,25 @@ class JobList: Log.debug("Resetting job: {0} status to: WAITING for parents completion...".format(job.name)) # if waiting jobs has all parents completed change its State to READY - Log.debug('Updating WAITING jobs') - for job in self.get_waiting(): - tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] - if len(tmp) == len(job.parents): - job.status = Status.READY - save = True - Log.debug("Resetting job: {0} status to: READY (all parents completed)...".format(job.name)) + + if not fromSetStatus: + Log.debug('Updating WAITING jobs') + for job in self.get_waiting(): + tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] + if len(tmp) == len(job.parents): + job.status = Status.READY + save = True + Log.debug("Resetting job: {0} status to: READY (all parents completed)...".format(job.name)) + Log.debug('Update finished') + #RERUN FIX + for job in self.get_completed(): + Log.debug('Updating SYNC jobs') + if job.synchronize is not None: + tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] + if len(tmp) != len(job.parents): + job.status = Status.WAITING + save = True + Log.debug("Sync Job detected with parents uncompleted, changing into ready".format(job.name)) Log.debug('Update finished') return save -- GitLab From e0ac06b6d066a08e964159323a88b7f4fd5ba5e2 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 18 Jan 2019 12:24:18 +0100 Subject: [PATCH 06/23] (issues with pipeline) --- autosubmit/config/config_common.py | 4 ++-- autosubmit/job/job.py | 1 + autosubmit/job/job_dict.py | 3 ++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 4ecb78cf0..403d62684 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -145,10 +145,10 @@ class AutosubmitConfig(object): return self._jobs_parser.get_option(section, 'WALLCLOCK', '') def get_synchronize(self, section): """ - Gets wallclock for the given job type + Gets synchronize for the given job type :param section: job type :type section: str - :return: wallclock time + :return: synchronize :rtype: str """ return self._jobs_parser.get_option(section, 'SYNCHRONIZE', '') diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index bc4b6b8da..f9513682f 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -637,6 +637,7 @@ class Job(object): self.memory = as_conf.get_memory(self.section) self.memory_per_task = as_conf.get_memory_per_task(self.section) self.wallclock = as_conf.get_wallclock(self.section) + self.synchronize = as_conf.get_synchronize(self.section) self.scratch_free_space = as_conf.get_scratch_free_space(self.section) if self.scratch_free_space == 0: self.scratch_free_space = job_platform.scratch_free_space diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index 005ded4ea..64848d35e 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -334,7 +334,8 @@ class DicJobs: job.memory = self.get_option(section, "MEMORY", '') job.memory_per_task = self.get_option(section, "MEMORY_PER_TASK", '') job.wallclock = self.get_option(section, "WALLCLOCK", '') - job.synchronize = self.get_option(section, "SYNCHRONIZE", '') + job.synchronize = str(self.get_option(section, "SYNCHRONIZE", '')) + job.retrials = int(self.get_option(section, 'RETRIALS', -1)) if job.retrials == -1: job.retrials = None -- GitLab From 84c880e5e3411c876bc346e2e92df77596fc8fc6 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 18 Jan 2019 12:57:24 +0100 Subject: [PATCH 07/23] (issues with pipeline) --- autosubmit/job/job_dict.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index 64848d35e..00cf1bfae 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -334,12 +334,14 @@ class DicJobs: job.memory = self.get_option(section, "MEMORY", '') job.memory_per_task = self.get_option(section, "MEMORY_PER_TASK", '') job.wallclock = self.get_option(section, "WALLCLOCK", '') - job.synchronize = str(self.get_option(section, "SYNCHRONIZE", '')) + job.retrials = int(self.get_option(section, 'RETRIALS', -1)) + if job.retrials == -1: job.retrials = None job.notify_on = [x.upper() for x in self.get_option(section, "NOTIFY_ON", '').split(' ')] + job.synchronize = str(self.get_option(section, "SYNCHRONIZE", '')) self._jobs_list.get_job_list().append(job) return job -- GitLab From 547979be9f234bf7b20a232b4f419808b0192fb7 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 18 Jan 2019 13:04:05 +0100 Subject: [PATCH 08/23] (issues with pipeline) --- autosubmit/config/config_common.py | 2 +- autosubmit/job/job_dict.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 403d62684..751430bcb 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -151,7 +151,7 @@ class AutosubmitConfig(object): :return: synchronize :rtype: str """ - return self._jobs_parser.get_option(section, 'SYNCHRONIZE', '') + return self._jobs_parser.get_option(section, 'SYNCHRONIZE',None) def get_processors(self, section): """ Gets processors needed for the given job type diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index 00cf1bfae..87b000af8 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -341,7 +341,7 @@ class DicJobs: if job.retrials == -1: job.retrials = None job.notify_on = [x.upper() for x in self.get_option(section, "NOTIFY_ON", '').split(' ')] - job.synchronize = str(self.get_option(section, "SYNCHRONIZE", '')) + job.synchronize = str(self.get_option(section, "SYNCHRONIZE", None)) self._jobs_list.get_job_list().append(job) return job -- GitLab From e96ab1a35d0ef3caab57e1c21c1f88ec404f3e02 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 18 Jan 2019 13:09:56 +0100 Subject: [PATCH 09/23] (issues with pipeline) --- autosubmit/config/config_common.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 751430bcb..e04923b3f 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -151,7 +151,7 @@ class AutosubmitConfig(object): :return: synchronize :rtype: str """ - return self._jobs_parser.get_option(section, 'SYNCHRONIZE',None) + return str(self._jobs_parser.get_option(section, 'SYNCHRONIZE',None)) def get_processors(self, section): """ Gets processors needed for the given job type -- GitLab From dbe0a532f4fe1de953f88633baad6ef5b1773dcf Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 18 Jan 2019 16:31:18 +0100 Subject: [PATCH 10/23] (issues with pipeline) --- autosubmit/config/config_common.py | 9 --------- autosubmit/job/job.py | 2 +- autosubmit/job/job_dict.py | 4 +++- autosubmit/job/job_list.py | 3 +++ 4 files changed, 7 insertions(+), 11 deletions(-) diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index e04923b3f..d1f665ed8 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -143,15 +143,6 @@ class AutosubmitConfig(object): :rtype: str """ return self._jobs_parser.get_option(section, 'WALLCLOCK', '') - def get_synchronize(self, section): - """ - Gets synchronize for the given job type - :param section: job type - :type section: str - :return: synchronize - :rtype: str - """ - return str(self._jobs_parser.get_option(section, 'SYNCHRONIZE',None)) def get_processors(self, section): """ Gets processors needed for the given job type diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index f9513682f..a8395a863 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -637,7 +637,7 @@ class Job(object): self.memory = as_conf.get_memory(self.section) self.memory_per_task = as_conf.get_memory_per_task(self.section) self.wallclock = as_conf.get_wallclock(self.section) - self.synchronize = as_conf.get_synchronize(self.section) + self.scratch_free_space = as_conf.get_scratch_free_space(self.section) if self.scratch_free_space == 0: self.scratch_free_space = job_platform.scratch_free_space diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index 87b000af8..e08e1f153 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -159,6 +159,7 @@ class DicJobs: :type delay: int """ # Temporally creation for unified jobs in case of synchronize + if synchronize is not None: tmp_dic = dict() count = 0 @@ -341,8 +342,9 @@ class DicJobs: if job.retrials == -1: job.retrials = None job.notify_on = [x.upper() for x in self.get_option(section, "NOTIFY_ON", '').split(' ')] - job.synchronize = str(self.get_option(section, "SYNCHRONIZE", None)) + self._jobs_list.get_job_list().append(job) + job.synchronize = str(self.get_option(section, 'SYNCHRONIZE', '')) return job def get_option(self, section, option, default): diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index b7d38f855..6d64f8c95 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -818,6 +818,7 @@ class JobList: if not fromSetStatus: Log.debug('Updating WAITING jobs') for job in self.get_waiting(): + tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] if len(tmp) == len(job.parents): job.status = Status.READY @@ -825,6 +826,8 @@ class JobList: Log.debug("Resetting job: {0} status to: READY (all parents completed)...".format(job.name)) Log.debug('Update finished') #RERUN FIX + # for job in self.get_waiting(): + # Log.info("Sync:{0},{1}", job.name, job.synchronize) for job in self.get_completed(): Log.debug('Updating SYNC jobs') if job.synchronize is not None: -- GitLab From 4e242d041a77590296dbb85af8e6eb1a61b645cc Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 21 Jan 2019 09:45:47 +0100 Subject: [PATCH 11/23] (issues with pipeline) --- autosubmit/autosubmit.py | 2 +- autosubmit/config/config_common.py | 10 ++++++++++ autosubmit/job/job.py | 4 +++- autosubmit/job/job_dict.py | 4 ++-- autosubmit/job/job_list.py | 4 ++-- 5 files changed, 18 insertions(+), 6 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 9e688e3e9..5a3f8ad52 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -2308,7 +2308,7 @@ class Autosubmit: job_list.save() else: Log.warning("Changes NOT saved to the JobList!!!!: use -s option to save") - if wrongExpid == 0: + if wrongExpid > 0: Log.error("Save disabled due invalid expid, please check or/and jobs expid name") diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index d1f665ed8..981c7966f 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -143,6 +143,16 @@ class AutosubmitConfig(object): :rtype: str """ return self._jobs_parser.get_option(section, 'WALLCLOCK', '') + + def get_synchronize(self, section): + """ + Gets wallclock for the given job type + :param section: job type + :type section: str + :return: wallclock time + :rtype: str + """ + return self._jobs_parser.get_option(section, 'SYNCHRONIZE', '') def get_processors(self, section): """ Gets processors needed for the given job type diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index a8395a863..994448f66 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -73,7 +73,7 @@ class Job(object): self.name = name self.split = None self.delay = None - self.synchronize = None + self._long_name = None self.long_name = name self.date_format = '' @@ -99,6 +99,7 @@ class Job(object): self.check = 'True' self.packed = False + def __getstate__(self): odict = self.__dict__ if '_platform' in odict: @@ -591,6 +592,7 @@ class Job(object): parameters['CHUNK'] = chunk parameters['SPLIT'] = self.split parameters['DELAY'] = self.delay + parameters['SYNCHRONIZE'] = self.synchronize total_chunk = int(parameters['NUMCHUNKS']) chunk_length = int(parameters['CHUNKSIZE']) chunk_unit = parameters['CHUNKSIZEUNIT'].lower() diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index e08e1f153..61312d611 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -342,9 +342,9 @@ class DicJobs: if job.retrials == -1: job.retrials = None job.notify_on = [x.upper() for x in self.get_option(section, "NOTIFY_ON", '').split(' ')] - + job.synchronize = self.get_option(section, "SYNCHRONIZE", '') self._jobs_list.get_job_list().append(job) - job.synchronize = str(self.get_option(section, 'SYNCHRONIZE', '')) + return job def get_option(self, section, option, default): diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 6d64f8c95..5b7264591 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -826,8 +826,8 @@ class JobList: Log.debug("Resetting job: {0} status to: READY (all parents completed)...".format(job.name)) Log.debug('Update finished') #RERUN FIX - # for job in self.get_waiting(): - # Log.info("Sync:{0},{1}", job.name, job.synchronize) + #for job in self.get_waiting(): + #Log.info("Sync:{0},{1}", job.name, job.synchronize) for job in self.get_completed(): Log.debug('Updating SYNC jobs') if job.synchronize is not None: -- GitLab From 226ba844b50cd0fb0bc3694fac0e07cd48045c91 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 21 Jan 2019 10:12:14 +0100 Subject: [PATCH 12/23] (issues with pipeline) --- autosubmit/job/job_dict.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index 61312d611..0962116cc 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -302,6 +302,7 @@ class DicJobs: job.remote_logs = (jobs_data[name][10], jobs_data[name][11]) else: job = Job(name, 0, Status.WAITING, priority) + job.synchronize = self.get_option(section, "SYNCHRONIZE", '') job.section = section job.date = date job.member = member @@ -342,7 +343,7 @@ class DicJobs: if job.retrials == -1: job.retrials = None job.notify_on = [x.upper() for x in self.get_option(section, "NOTIFY_ON", '').split(' ')] - job.synchronize = self.get_option(section, "SYNCHRONIZE", '') + self._jobs_list.get_job_list().append(job) return job -- GitLab From ef52f5d7ae0b181858c7f538b247c2b3ea0d5782 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 21 Jan 2019 10:25:02 +0100 Subject: [PATCH 13/23] (issues with pipeline) --- autosubmit/autosubmit.py | 1 + autosubmit/job/job.py | 1 + autosubmit/job/job_dict.py | 5 ++++- 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 5a3f8ad52..0b92655d4 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -2309,6 +2309,7 @@ class Autosubmit: else: Log.warning("Changes NOT saved to the JobList!!!!: use -s option to save") if wrongExpid > 0: + Log.error("Save disabled due invalid expid, please check or/and jobs expid name") diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 994448f66..eb427a69d 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -73,6 +73,7 @@ class Job(object): self.name = name self.split = None self.delay = None + self.synchronize = None self._long_name = None self.long_name = name diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index 0962116cc..8015b7492 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -302,7 +302,7 @@ class DicJobs: job.remote_logs = (jobs_data[name][10], jobs_data[name][11]) else: job = Job(name, 0, Status.WAITING, priority) - job.synchronize = self.get_option(section, "SYNCHRONIZE", '') + job.section = section job.date = date job.member = member @@ -343,6 +343,9 @@ class DicJobs: if job.retrials == -1: job.retrials = None job.notify_on = [x.upper() for x in self.get_option(section, "NOTIFY_ON", '').split(' ')] + sync= self.get_option(section, "SYNCHRONIZE", '') + if sync != None: + job.synchronize = str(sync) self._jobs_list.get_job_list().append(job) -- GitLab From 2b93d95fb97db542d6a16d574442109421659b02 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 21 Jan 2019 11:53:42 +0100 Subject: [PATCH 14/23] (issues with pipeline,changing test_dict right parameters) --- autosubmit/autosubmit.py | 2 +- test/unit/test_dic_jobs.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 0b92655d4..40dee1e23 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -806,7 +806,7 @@ class Autosubmit: as_conf.get_mails_to()) save = True - if job_list.updatze_list(as_conf) or save: + if job_list.update_list(as_conf) or save: job_list.save() if Autosubmit.exit: diff --git a/test/unit/test_dic_jobs.py b/test/unit/test_dic_jobs.py index 6a595479f..23b96878b 100644 --- a/test/unit/test_dic_jobs.py +++ b/test/unit/test_dic_jobs.py @@ -294,10 +294,11 @@ class TestDicJobs(TestCase): tasks = '333' memory = memory_per_task = 444 wallclock = 555 + synchronize =None notify_on = 'COMPLETED FAILED' - self.parser_mock.has_option = Mock(side_effect=[True, True, True, True, True, True, True, True, True, True, True, + self.parser_mock.has_option = Mock(side_effect=[True,True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, False, True]) - self.parser_mock.get = Mock(side_effect=[frequency, delay, 'True', 'True', 'bash', platform_name, filename, queue, + self.parser_mock.get = Mock(side_effect=[frequency,synchronize, delay, 'True', 'True', 'bash', platform_name, filename, queue, 'True', processors, threads, tasks, memory, memory_per_task, wallclock, notify_on]) job_list_mock = Mock() -- GitLab From 91e2ea5681c01b4cba522913789f4cb5b9d8d094 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 21 Jan 2019 12:02:55 +0100 Subject: [PATCH 15/23] (issues with pipeline,changing test_dict right parameters) --- autosubmit/job/job_dict.py | 5 +---- autosubmit/job/job_list.py | 2 +- test/unit/test_dic_jobs.py | 10 +++++----- 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index 8015b7492..9f2c143d7 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -343,10 +343,7 @@ class DicJobs: if job.retrials == -1: job.retrials = None job.notify_on = [x.upper() for x in self.get_option(section, "NOTIFY_ON", '').split(' ')] - sync= self.get_option(section, "SYNCHRONIZE", '') - if sync != None: - job.synchronize = str(sync) - + job.synchronize = self._parser.get_option(section, "SYNCHRONIZE", '') self._jobs_list.get_job_list().append(job) return job diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 5b7264591..2eb13ecd8 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -827,7 +827,7 @@ class JobList: Log.debug('Update finished') #RERUN FIX #for job in self.get_waiting(): - #Log.info("Sync:{0},{1}", job.name, job.synchronize) + # Log.info("Sync:{0},{1}", job.name, job.synchronize) for job in self.get_completed(): Log.debug('Updating SYNC jobs') if job.synchronize is not None: diff --git a/test/unit/test_dic_jobs.py b/test/unit/test_dic_jobs.py index 23b96878b..75442f79a 100644 --- a/test/unit/test_dic_jobs.py +++ b/test/unit/test_dic_jobs.py @@ -294,13 +294,13 @@ class TestDicJobs(TestCase): tasks = '333' memory = memory_per_task = 444 wallclock = 555 - synchronize =None + synchronize = None notify_on = 'COMPLETED FAILED' - self.parser_mock.has_option = Mock(side_effect=[True,True, True, True, True, True, True, True, True, True, True, True, - True, True, True, True, False, True]) - self.parser_mock.get = Mock(side_effect=[frequency,synchronize, delay, 'True', 'True', 'bash', platform_name, filename, queue, + self.parser_mock.has_option = Mock(side_effect=[True, True, True, True, True, True, True, True, True, True, True, + True, True, True, True, False, True, False]) + self.parser_mock.get = Mock(side_effect=[frequency, delay, 'True', 'True', 'bash', platform_name, filename, queue, 'True', processors, threads, tasks, memory, memory_per_task, - wallclock, notify_on]) + wallclock, notify_on,synchronize]) job_list_mock = Mock() job_list_mock.append = Mock() self.dictionary._jobs_list.get_job_list = Mock(return_value=job_list_mock) -- GitLab From 417ff2509585b36d9b26eec2befb0582615039f3 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 21 Jan 2019 12:12:04 +0100 Subject: [PATCH 16/23] (issues with pipeline,changing test_dict right parameters) --- autosubmit/job/job_dict.py | 3 ++- autosubmit/job/job_list.py | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index 9f2c143d7..d87a85ebf 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -343,7 +343,8 @@ class DicJobs: if job.retrials == -1: job.retrials = None job.notify_on = [x.upper() for x in self.get_option(section, "NOTIFY_ON", '').split(' ')] - job.synchronize = self._parser.get_option(section, "SYNCHRONIZE", '') + job.synchronize = self.get_option(section, "SYNCHRONIZE", '') + self._jobs_list.get_job_list().append(job) return job diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 2eb13ecd8..f49960e74 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -826,8 +826,9 @@ class JobList: Log.debug("Resetting job: {0} status to: READY (all parents completed)...".format(job.name)) Log.debug('Update finished') #RERUN FIX - #for job in self.get_waiting(): - # Log.info("Sync:{0},{1}", job.name, job.synchronize) + for job in self.get_waiting(): + Log.info("Sync:{0},{1}", job.name, job.synchronize) + for job in self.get_completed(): Log.debug('Updating SYNC jobs') if job.synchronize is not None: -- GitLab From c8e470050cd317ebf16671c0521f41e8d2cf9d3b Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 21 Jan 2019 12:43:05 +0100 Subject: [PATCH 17/23] Issue,resolved cleaning function logs --- autosubmit/job/job_list.py | 3 --- test/unit/test_dic_jobs.py | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index f49960e74..09b41f0ae 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -826,9 +826,6 @@ class JobList: Log.debug("Resetting job: {0} status to: READY (all parents completed)...".format(job.name)) Log.debug('Update finished') #RERUN FIX - for job in self.get_waiting(): - Log.info("Sync:{0},{1}", job.name, job.synchronize) - for job in self.get_completed(): Log.debug('Updating SYNC jobs') if job.synchronize is not None: diff --git a/test/unit/test_dic_jobs.py b/test/unit/test_dic_jobs.py index 75442f79a..1cba4d1ad 100644 --- a/test/unit/test_dic_jobs.py +++ b/test/unit/test_dic_jobs.py @@ -294,8 +294,8 @@ class TestDicJobs(TestCase): tasks = '333' memory = memory_per_task = 444 wallclock = 555 - synchronize = None notify_on = 'COMPLETED FAILED' + synchronize = None self.parser_mock.has_option = Mock(side_effect=[True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, False, True, False]) self.parser_mock.get = Mock(side_effect=[frequency, delay, 'True', 'True', 'bash', platform_name, filename, queue, -- GitLab From f361e835e30a82d4107b95ad9b04979eaf93f112 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 22 Jan 2019 17:30:52 +0100 Subject: [PATCH 18/23] Issue,resolved cleaning function logs --- autosubmit/autosubmit.py | 23 +++++++++++++++++----- autosubmit/job/job_dict.py | 4 +++- autosubmit/job/job_list.py | 39 +++++++++++++++++++++++--------------- 3 files changed, 45 insertions(+), 21 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 40dee1e23..b8ecb00a0 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -696,6 +696,7 @@ class Autosubmit: pkl_dir = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, 'pkl') job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=notransitive) + Log.debug("Starting from job list restored from {0} files", pkl_dir) Log.debug("Length of the jobs list: {0}", len(job_list)) @@ -938,7 +939,10 @@ class Autosubmit: return False pkl_dir = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, 'pkl') - job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=notransitive) + job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=notransitive,monitor=True) + + #for job in job_list.get_job_list(): + # job.check_completion() Log.debug("Job list restored from {0} files", pkl_dir) if not isinstance(job_list, type([])): jobs = [] @@ -2059,13 +2063,12 @@ class Autosubmit: if rerun == "true": chunk_list = Autosubmit._create_json(as_conf.get_chunk_list()) + job_list.rerun(chunk_list, notransitive) else: job_list.remove_rerun_only_jobs(notransitive) - Log.info("\nSaving the jobs list...") job_list.save() - JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid).reset_table() @@ -2137,7 +2140,7 @@ class Autosubmit: shutil.rmtree(project_path, ignore_errors=True) return False Log.debug("{0}", output) - + #RSYNC elif project_type == "local": local_project_path = as_conf.get_local_project_path() project_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, BasicConfig.LOCAL_PROJ_DIR) @@ -2651,7 +2654,8 @@ class Autosubmit: open(as_conf.experiment_file, 'w').write(content) @staticmethod - def load_job_list(expid, as_conf, notransitive=False): + def load_job_list(expid, as_conf, notransitive=False,monitor=False): + rerun = as_conf.get_rerun() job_list = JobList(expid, BasicConfig, ConfigParserFactory(), Autosubmit._get_job_list_persistence(expid, as_conf)) date_list = as_conf.get_date_list() @@ -2667,6 +2671,15 @@ class Autosubmit: as_conf.load_parameters(), date_format, as_conf.get_retrials(), as_conf.get_default_job_type(), as_conf.get_wrapper_type(), as_conf.get_wrapper_jobs(), new=False, notransitive=notransitive) + if not monitor: + if rerun == "true": + chunk_list = Autosubmit._create_json(as_conf.get_chunk_list()) + + job_list.rerun(chunk_list, notransitive) + else: + job_list.remove_rerun_only_jobs(notransitive) + + return job_list @staticmethod diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index d87a85ebf..1a0239c9a 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -300,9 +300,11 @@ class DicJobs: job = Job(name, jobs_data[name][1], jobs_data[name][2], priority) job.local_logs = (jobs_data[name][8], jobs_data[name][9]) job.remote_logs = (jobs_data[name][10], jobs_data[name][11]) + else: job = Job(name, 0, Status.WAITING, priority) + job.section = section job.date = date job.member = member @@ -343,7 +345,7 @@ class DicJobs: if job.retrials == -1: job.retrials = None job.notify_on = [x.upper() for x in self.get_option(section, "NOTIFY_ON", '').split(' ')] - job.synchronize = self.get_option(section, "SYNCHRONIZE", '') + job.synchronize = self.get_option(section, "SYNCHRONIZE", None) self._jobs_list.get_job_list().append(job) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 09b41f0ae..c6297c515 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -97,7 +97,7 @@ class JobList: self._graph = value def generate(self, date_list, member_list, num_chunks, chunk_ini, parameters, date_format, default_retrials, - default_job_type, wrapper_type=None, wrapper_jobs=None, new=True, notransitive=False): + default_job_type, wrapper_type=None, wrapper_jobs=None,new=True, notransitive=False): """ Creates all jobs needed for the current workflow @@ -138,7 +138,6 @@ class JobList: if not new: jobs_data = {str(row[0]): row for row in self.load()} self._create_jobs(dic_jobs, jobs_parser, priority, default_job_type, jobs_data) - Log.info("Adding dependencies...") self._add_dependencies(date_list, member_list, chunk_list, dic_jobs, jobs_parser, self.graph) @@ -814,6 +813,16 @@ class JobList: Log.debug("Resetting job: {0} status to: WAITING for parents completion...".format(job.name)) # if waiting jobs has all parents completed change its State to READY + #RERUN FIX + for job in self.get_completed(): + Log.debug('Updating SYNC jobs') + if job.synchronize is not None: + tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] + if len(tmp) != len(job.parents): + job.status = Status.WAITING + save = True + Log.debug("Resetting sync job: {0} status to: WAITING for parents completion...".format(job.name)) + Log.debug('Update finished') if not fromSetStatus: Log.debug('Updating WAITING jobs') @@ -825,16 +834,7 @@ class JobList: save = True Log.debug("Resetting job: {0} status to: READY (all parents completed)...".format(job.name)) Log.debug('Update finished') - #RERUN FIX - for job in self.get_completed(): - Log.debug('Updating SYNC jobs') - if job.synchronize is not None: - tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] - if len(tmp) != len(job.parents): - job.status = Status.WAITING - save = True - Log.debug("Sync Job detected with parents uncompleted, changing into ready".format(job.name)) - Log.debug('Update finished') + return save def update_genealogy(self, new=True, notransitive=False): @@ -967,11 +967,20 @@ class JobList: parent.status = Status.WAITING Log.debug("Parent: " + parent.name) - for job in [j for j in self._job_list if j.status == Status.COMPLETED]: - self._remove_job(job) for job in [j for j in self._job_list if j.status == Status.COMPLETED]: - self._remove_job(job) + if job.synchronize is not None: + tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] + if len(tmp) != len(job.parents): + job.status = Status.WAITING + save = True + Log.debug("Resetting sync job: {0} status to: WAITING for parents completion...".format(job.name)) + else: + + self._remove_job(job) + else: + + self._remove_job(job) self.update_genealogy(notransitive=notransitive) -- GitLab From 9d34a40c22b3782bff584fa6f890633ead3acca4 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 22 Jan 2019 17:33:35 +0100 Subject: [PATCH 19/23] Check previous commit to this branch( wrong commit name) , almost fix rerun #150 just need to fix autosubmit monitor to work with rerun (hopefully) --- autosubmit/autosubmit.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index b8ecb00a0..d7dc9e6a4 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -942,7 +942,7 @@ class Autosubmit: job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=notransitive,monitor=True) #for job in job_list.get_job_list(): - # job.check_completion() + # job.check_completion() # Log.debug("Job list restored from {0} files", pkl_dir) if not isinstance(job_list, type([])): jobs = [] -- GitLab From d1ae2fd1aa5a349fc2ef560dba177a7a01b47edb Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 23 Jan 2019 16:22:13 +0100 Subject: [PATCH 20/23] Since rerun always reset all jobs status to waiting, when try to plot from monitor, if is a rerun it see all as waiting, if i put an ignore to rerun if is called from monitor it will show te current plot in watting except the jobs reruning, trying to avoid that. --- autosubmit/autosubmit.py | 85 ++++++++++++++++++++++++++++++++++---- autosubmit/job/job_list.py | 4 +- 2 files changed, 78 insertions(+), 11 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index d7dc9e6a4..2a220be30 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -940,10 +940,9 @@ class Autosubmit: pkl_dir = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, 'pkl') job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=notransitive,monitor=True) - - #for job in job_list.get_job_list(): - # job.check_completion() # Log.debug("Job list restored from {0} files", pkl_dir) + + if not isinstance(job_list, type([])): jobs = [] if filter_chunks: @@ -1189,7 +1188,7 @@ class Autosubmit: Log.info('Recovering experiment {0}'.format(expid)) pkl_dir = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, 'pkl') - job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=notransitive) + job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=notransitive,monitor=True) Log.debug("Job list restored from {0} files", pkl_dir) if not as_conf.check_conf_files(): @@ -2062,9 +2061,11 @@ class Autosubmit: as_conf.get_wrapper_type(), as_conf.get_wrapper_jobs(), notransitive=notransitive) if rerun == "true": - chunk_list = Autosubmit._create_json(as_conf.get_chunk_list()) + chunk_list = Autosubmit._create_json(as_conf.get_chunk_list()) job_list.rerun(chunk_list, notransitive) + + else: job_list.remove_rerun_only_jobs(notransitive) Log.info("\nSaving the jobs list...") @@ -2093,6 +2094,7 @@ class Autosubmit: Log.result("\nJob list created successfully") Log.user_warning("Remember to MODIFY the MODEL config files!") + return True except portalocker.AlreadyLocked: @@ -2671,15 +2673,80 @@ class Autosubmit: as_conf.load_parameters(), date_format, as_conf.get_retrials(), as_conf.get_default_job_type(), as_conf.get_wrapper_type(), as_conf.get_wrapper_jobs(), new=False, notransitive=notransitive) - if not monitor: - if rerun == "true": - chunk_list = Autosubmit._create_json(as_conf.get_chunk_list()) + if rerun == "true": + chunk_list = Autosubmit._create_json(as_conf.get_chunk_list()) + if not monitor: job_list.rerun(chunk_list, notransitive) else: - job_list.remove_rerun_only_jobs(notransitive) + rerun_list = copy.copy(job_list) + rerun_list.rerun(chunk_list, notransitive) + job_list =Autosubmit.rerun_recovery(expid,job_list,rerun_list,as_conf) + else: + job_list.remove_rerun_only_jobs(notransitive) + + + return job_list + @staticmethod + def rerun_recovery(expid,job_list,rerun_list,as_conf): + """ + Method to check all active jobs. If COMPLETED file is found, job status will be changed to COMPLETED, + otherwise it will be set to WAITING. It will also update the jobs list. + + :param expid: identifier of the experiment to recover + :type expid: str + :param save: If true, recovery saves changes to the jobs list + :type save: bool + :param all_jobs: if True, it tries to get completed files for all jobs, not only active. + :type all_jobs: bool + :param hide: hides plot window + :type hide: bool + """ + + hpcarch = as_conf.get_platform() + submitter = Autosubmit._get_submitter(as_conf) + submitter.load_platforms(as_conf) + if submitter.platforms is None: + return False + platforms = submitter.platforms + platforms_to_test = set() + for job in job_list.get_job_list(): + if job.platform_name is None: + job.platform_name = hpcarch + # noinspection PyTypeChecker + job.platform = platforms[job.platform_name.lower()] + # noinspection PyTypeChecker + platforms_to_test.add(platforms[job.platform_name.lower()]) + + + rerun_names=[] + + [rerun_names.append(job.name) for job in rerun_list.get_job_list()] + jobs_to_recover = [i for i in job_list.get_job_list() if i.name not in rerun_names] + Log.info("{0},{1},{2}",job_list.get_job_list() ,rerun_names,jobs_to_recover) + Log.info("Looking for COMPLETED files") + start = datetime.datetime.now() + for job in jobs_to_recover: + if job.platform_name is None: + job.platform_name = hpcarch + # noinspection PyTypeChecker + job.platform = platforms[job.platform_name.lower()] + + if job.platform.get_completed_files(job.name, 0): + job.status = Status.COMPLETED + Log.info("CHANGED job '{0}' status to COMPLETED".format(job.name)) + elif job.status != Status.SUSPENDED: + job.status = Status.WAITING + job.fail_count = 0 + Log.info("CHANGED job '{0}' status to WAITING".format(job.name)) + + job.platform.get_logs_files(expid, job.remote_logs) + + end = datetime.datetime.now() + Log.info("Time spent: '{0}'".format(end - start)) + Log.info("Updating the jobs list") return job_list @staticmethod diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index c6297c515..d98518694 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -909,7 +909,7 @@ class JobList: self._job_list.remove(job) - def rerun(self, chunk_list, notransitive=False): + def rerun(self, chunk_list, notransitive=False,monitor=False): """ Updates job list to rerun the jobs specified by chunk_list @@ -973,7 +973,7 @@ class JobList: tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] if len(tmp) != len(job.parents): job.status = Status.WAITING - save = True + Log.debug("Resetting sync job: {0} status to: WAITING for parents completion...".format(job.name)) else: -- GitLab From ff8c3614259346eb815efbc680428a684eb3dc2a Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 23 Jan 2019 16:44:18 +0100 Subject: [PATCH 21/23] Monitor works with some limitations, #150 partial fixed --- autosubmit/autosubmit.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 2a220be30..3f45e61c9 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -2679,7 +2679,14 @@ class Autosubmit: if not monitor: job_list.rerun(chunk_list, notransitive) else: - rerun_list = copy.copy(job_list) + rerun_list = JobList(expid, BasicConfig, ConfigParserFactory(), + Autosubmit._get_job_list_persistence(expid, as_conf)) + rerun_list.generate(date_list, as_conf.get_member_list(), as_conf.get_num_chunks(), + as_conf.get_chunk_ini(), + as_conf.load_parameters(), date_format, as_conf.get_retrials(), + as_conf.get_default_job_type(), as_conf.get_wrapper_type(), + as_conf.get_wrapper_jobs(), + new=False, notransitive=notransitive) rerun_list.rerun(chunk_list, notransitive) job_list =Autosubmit.rerun_recovery(expid,job_list,rerun_list,as_conf) else: -- GitLab From d641ec8ea5fcc07c04d95d09c649546ded1f16c7 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 24 Jan 2019 16:06:29 +0100 Subject: [PATCH 22/23] All works, maybe the design could change , #150 fixed --- autosubmit/autosubmit.py | 18 +++++++++--------- autosubmit/job/job_list.py | 23 +++++++++-------------- 2 files changed, 18 insertions(+), 23 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 3f45e61c9..5b5fc5881 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -2731,7 +2731,7 @@ class Autosubmit: [rerun_names.append(job.name) for job in rerun_list.get_job_list()] jobs_to_recover = [i for i in job_list.get_job_list() if i.name not in rerun_names] - Log.info("{0},{1},{2}",job_list.get_job_list() ,rerun_names,jobs_to_recover) + Log.info("Looking for COMPLETED files") start = datetime.datetime.now() @@ -2743,17 +2743,17 @@ class Autosubmit: if job.platform.get_completed_files(job.name, 0): job.status = Status.COMPLETED - Log.info("CHANGED job '{0}' status to COMPLETED".format(job.name)) - elif job.status != Status.SUSPENDED: - job.status = Status.WAITING - job.fail_count = 0 - Log.info("CHANGED job '{0}' status to WAITING".format(job.name)) + # Log.info("CHANGED job '{0}' status to COMPLETED".format(job.name)) + #elif job.status != Status.SUSPENDED: + # job.status = Status.WAITING + # job.fail_count = 0 + # Log.info("CHANGED job '{0}' status to WAITING".format(job.name)) job.platform.get_logs_files(expid, job.remote_logs) - end = datetime.datetime.now() - Log.info("Time spent: '{0}'".format(end - start)) - Log.info("Updating the jobs list") + #end = datetime.datetime.now() + #Log.info("Time spent: '{0}'".format(end - start)) + #Log.info("Updating the jobs list") return job_list @staticmethod diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index d98518694..49732f5d7 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -816,7 +816,8 @@ class JobList: #RERUN FIX for job in self.get_completed(): Log.debug('Updating SYNC jobs') - if job.synchronize is not None: + if job.synchronize is not None: #and job in self.get_active(): + Log.info("{0}",job.name) tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] if len(tmp) != len(job.parents): job.status = Status.WAITING @@ -944,8 +945,7 @@ class JobList: for c in m['cs']: Log.debug("Chunk: " + c) chunk = int(c) - for job in [i for i in self._job_list if i.date == date and i.member == member and - i.chunk == chunk]: + for job in [i for i in self._job_list if i.date == date and i.member == member and (i.chunk == chunk ) ]: if not job.rerun_only or chunk != previous_chunk + 1: job.status = Status.WAITING @@ -969,20 +969,15 @@ class JobList: for job in [j for j in self._job_list if j.status == Status.COMPLETED]: - if job.synchronize is not None: - tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] - if len(tmp) != len(job.parents): - job.status = Status.WAITING - - Log.debug("Resetting sync job: {0} status to: WAITING for parents completion...".format(job.name)) - else: - - self._remove_job(job) - else: - + if job.synchronize is None: self._remove_job(job) self.update_genealogy(notransitive=notransitive) + for job in [j for j in self._job_list if j.synchronize !=None]: + if job.status == Status.COMPLETED: + job.status = Status.WAITING + else: + self._remove_job(job) def _get_jobs_parser(self): jobs_parser = self._parser_factory.create_parser() -- GitLab From 29fa43b897e28849a4f788fb4ff602c22f21c515 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 7 Feb 2019 13:44:53 +0100 Subject: [PATCH 23/23] #150 fixed ( for now), setstatus -fc behaviour is changed to not affect members without chunk assigned --- autosubmit/autosubmit.py | 7 +++++-- autosubmit/job/job_list.py | 13 +++++++------ autosubmit/platforms/platform.py | 1 + 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 5b5fc5881..70380067d 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -2253,11 +2253,14 @@ class Autosubmit: member = member_json['m'] jobs_member = filter(lambda j: j.member == member, jobs_date) - for job in filter(lambda j: j.chunk is None, jobs_member): - Autosubmit.change_status(final, final_status, job) + #for job in filter(lambda j: j.chunk is None, jobs_member): + # Autosubmit.change_status(final, final_status, job) for chunk_json in member_json['cs']: chunk = int(chunk_json) + for job in filter(lambda j: j.chunk == chunk and j.synchronize is not None, jobs_date): + Autosubmit.change_status(final, final_status, job) + for job in filter(lambda j: j.chunk == chunk, jobs_member): Autosubmit.change_status(final, final_status, job) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 49732f5d7..933b8de0f 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -814,10 +814,11 @@ class JobList: # if waiting jobs has all parents completed change its State to READY #RERUN FIX + for job in self.get_completed(): - Log.debug('Updating SYNC jobs') + if job.synchronize is not None: #and job in self.get_active(): - Log.info("{0}",job.name) + Log.debug('Updating SYNC jobs') tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] if len(tmp) != len(job.parents): job.status = Status.WAITING @@ -825,16 +826,16 @@ class JobList: Log.debug("Resetting sync job: {0} status to: WAITING for parents completion...".format(job.name)) Log.debug('Update finished') - if not fromSetStatus: - Log.debug('Updating WAITING jobs') - for job in self.get_waiting(): + Log.debug('Updating WAITING jobs') + for job in self.get_waiting(): + if not fromSetStatus: tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] if len(tmp) == len(job.parents): job.status = Status.READY save = True Log.debug("Resetting job: {0} status to: READY (all parents completed)...".format(job.name)) - Log.debug('Update finished') + Log.debug('Update finished') return save diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index cef7957cd..d6a7198bb 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -132,6 +132,7 @@ class Platform(object): parameters['{0}SCRATCH_DIR'.format(prefix)] = self.scratch parameters['{0}TEMP_DIR'.format(prefix)] = self.temp_dir parameters['{0}ROOTDIR'.format(prefix)] = self.root_dir + parameters['{0}LOGDIR'.format(prefix)] = self.get_files_path() def send_file(self, filename): -- GitLab