From 66a802ec13ab963df643816f6a16cf6391e12c5b Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 30 Jun 2020 11:27:37 +0200 Subject: [PATCH 01/15] Vertical - min_wrapped changed. --- autosubmit/config/config_common.py | 8 +++++++ autosubmit/job/job_packager.py | 35 ++++++++++++++++++++++-------- 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index e3c83c2d5..6bd0dd6b8 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -1081,6 +1081,14 @@ class AutosubmitConfig(object): :rtype: string """ return self._conf_parser.get_option('wrapper', 'TYPE', 'None').lower() + def get_wrapper_type(self): + """ + Returns what kind of wrapper (VERTICAL, MIXED-VERTICAL, HORIZONTAL, HYBRID, NONE) the user has configured in the autosubmit's config + + :return: wrapper type (or none) + :rtype: string + """ + return self._conf_parser.get_option('wrapper', 'POLICY', 'flexible').lower() def get_wrapper_jobs(self): """ diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 9f8c08962..26387beea 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -66,6 +66,7 @@ class JobPackager(object): self._max_jobs_to_submit) # These are defined in the [wrapper] section of autosubmit_,conf self.wrapper_type = self._as_config.get_wrapper_type() + self.wrapper_policy = self._as_config.get_wrapper_policy() self.wrapper_method = self._as_config.get_wrapper_method().lower() # True or False self.jobs_in_wrapper = self._as_config.get_wrapper_jobs() @@ -220,16 +221,32 @@ class JobPackager(object): # if the quantity is not enough, don't make the wrapper if len(p.jobs) >= min_wrapped_jobs: built_packages.append(p) - elif self._jobs_list._chunk_list.index(p.jobs[0].chunk)+1 >= len(self._jobs_list._chunk_list) - ( - len(self._jobs_list._chunk_list) % min_wrapped_jobs): # Last case, wrap remaining jobs - built_packages.append(p) else: - if job.repacked > 10: # too many tries, imposible to wrap this job - built_packages.append(p) - else: # If a package is discarded, allow to wrap their inner jobs again. - job.repacked = job.repacked + 1 + deadlock = True + for job in p.jobs: + independent_innerjob = True + for parent in job.parents: + if parent in p.jobs and parent.name != job.name: # This job depends on others inner jobs? T/F + independent_innerjob = False + break + tmp = [parent for parent in job.parents if independent_innerjob and parent.status == Status.COMPLETED ] + if len(tmp) != len(job.parents): + deadlock = False + if deadlock and self.wrapper_policy == "strict": + Log.critical("Wrapper policy is set to strict and there is a deadlock, exiting AS") + exit(-1) + elif deadlock and self.wrapper_policy != "strict": + Log.warning("Wrapper policy is set to flexible and there is a deadlock, As will submit the wrapper with less Quantity of jobs") + elif not deadlock:# Treat Max-wrapped as well and last case + last_inner_job = True for job in p.jobs: - job.packed = False + for child in job.children: #Check if job is considered child + if child in p.jobs and child.name != job.name: + last_inner_job = False + if last_inner_job: + built_packages.append(p) + for job in p.jobs: + job.packed = False elif self.wrapper_type == 'horizontal': built_packages_tmp = self._build_horizontal_packages(jobs_to_submit_by_section[section], max_wrapped_jobs, section) @@ -246,7 +263,7 @@ class JobPackager(object): for job in p.jobs: job.packed = False elif self.wrapper_type in ['vertical-horizontal', 'horizontal-vertical']: - built_packages_tmp = [] + built_packages_tmp = list() built_packages_tmp.append(self._build_hybrid_package( jobs_to_submit_by_section[section], max_wrapped_jobs, section)) for p in built_packages_tmp: -- GitLab From f6ed3f2c8c7adc2d73612f111d967eda2a546329 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 30 Jun 2020 11:34:18 +0200 Subject: [PATCH 02/15] Vertical - min_wrapped changed. --- autosubmit/job/job_packager.py | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 26387beea..ed38f33d3 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -218,35 +218,44 @@ class JobPackager(object): for p in built_packages_tmp: for job in p.jobs: job.packed = True - # if the quantity is not enough, don't make the wrapper - if len(p.jobs) >= min_wrapped_jobs: + if len(p.jobs) >= min_wrapped_jobs:# if the quantity is not enough, don't make the wrapper built_packages.append(p) else: deadlock = True for job in p.jobs: - independent_innerjob = True + independent_inner_job = True for parent in job.parents: if parent in p.jobs and parent.name != job.name: # This job depends on others inner jobs? T/F - independent_innerjob = False + independent_inner_job = False break tmp = [parent for parent in job.parents if independent_innerjob and parent.status == Status.COMPLETED ] if len(tmp) != len(job.parents): deadlock = False if deadlock and self.wrapper_policy == "strict": Log.critical("Wrapper policy is set to strict and there is a deadlock, exiting AS") + for job in p.jobs: + job.packed = False exit(-1) elif deadlock and self.wrapper_policy != "strict": Log.warning("Wrapper policy is set to flexible and there is a deadlock, As will submit the wrapper with less Quantity of jobs") + for job in p.jobs: + job.packed = False elif not deadlock:# Treat Max-wrapped as well and last case - last_inner_job = True + last_inner_job = False for job in p.jobs: + all_children_out_wrapper = True for child in job.children: #Check if job is considered child if child in p.jobs and child.name != job.name: - last_inner_job = False + all_children_out_wrapper = False + if all_children_out_wrapper: + last_inner_job = True + break + if last_inner_job: built_packages.append(p) - for job in p.jobs: - job.packed = False + else: + for job in p.jobs: + job.packed = False elif self.wrapper_type == 'horizontal': built_packages_tmp = self._build_horizontal_packages(jobs_to_submit_by_section[section], max_wrapped_jobs, section) -- GitLab From 294506c1ebf1b0152673c3503833a870da9aa576 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 30 Jun 2020 11:47:28 +0200 Subject: [PATCH 03/15] Built Packages changes --- autosubmit/job/job_packager.py | 120 ++++++++++++++------------------- 1 file changed, 49 insertions(+), 71 deletions(-) diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index ed38f33d3..7b28f4df6 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -210,99 +210,77 @@ class JobPackager(object): hard_limit_wrapper = number min_wrapped_jobs = min(self._as_config.jobs_parser.get_option( section, "MIN_WRAPPED", self._as_config.get_min_wrapped_jobs()), hard_limit_wrapper) - - built_packages = [] + wrapped = False + packages_to_submit = [] if self.wrapper_type in ['vertical', 'vertical-mixed']: + wrapped = True built_packages_tmp = self._build_vertical_packages(jobs_to_submit_by_section[section], max_wrapped_jobs) - for p in built_packages_tmp: - for job in p.jobs: - job.packed = True - if len(p.jobs) >= min_wrapped_jobs:# if the quantity is not enough, don't make the wrapper - built_packages.append(p) - else: - deadlock = True - for job in p.jobs: - independent_inner_job = True - for parent in job.parents: - if parent in p.jobs and parent.name != job.name: # This job depends on others inner jobs? T/F - independent_inner_job = False - break - tmp = [parent for parent in job.parents if independent_innerjob and parent.status == Status.COMPLETED ] - if len(tmp) != len(job.parents): - deadlock = False - if deadlock and self.wrapper_policy == "strict": - Log.critical("Wrapper policy is set to strict and there is a deadlock, exiting AS") - for job in p.jobs: - job.packed = False - exit(-1) - elif deadlock and self.wrapper_policy != "strict": - Log.warning("Wrapper policy is set to flexible and there is a deadlock, As will submit the wrapper with less Quantity of jobs") - for job in p.jobs: - job.packed = False - elif not deadlock:# Treat Max-wrapped as well and last case - last_inner_job = False - for job in p.jobs: - all_children_out_wrapper = True - for child in job.children: #Check if job is considered child - if child in p.jobs and child.name != job.name: - all_children_out_wrapper = False - if all_children_out_wrapper: - last_inner_job = True - break - - if last_inner_job: - built_packages.append(p) - else: - for job in p.jobs: - job.packed = False elif self.wrapper_type == 'horizontal': + wrapped = True built_packages_tmp = self._build_horizontal_packages(jobs_to_submit_by_section[section], max_wrapped_jobs, section) - for p in built_packages_tmp: - for job in p.jobs: - job.packed = True - # if the quantity is not enough, don't make the wrapper - if len(p.jobs) >= self._as_config.jobs_parser.get_option(section, "MIN_WRAPPED", self._as_config.get_min_wrapped_jobs()): - built_packages.append(p) - elif self._jobs_list._member_list.index(p.jobs[0].member)+1 >= len( - self._jobs_list._member_list) - (len(self._jobs_list._member_list) % min_wrapped_jobs): # Last case, wrap remaining jobs - built_packages.append(p) - else: # If a package is discarded, allow to wrap their inner jobs again. - for job in p.jobs: - job.packed = False + elif self.wrapper_type in ['vertical-horizontal', 'horizontal-vertical']: + wrapped = True built_packages_tmp = list() built_packages_tmp.append(self._build_hybrid_package( jobs_to_submit_by_section[section], max_wrapped_jobs, section)) - for p in built_packages_tmp: + if wrapped: + for p in built_packages_tmp: + for job in p.jobs: + job.packed = True + if len(p.jobs) >= min_wrapped_jobs: # if the quantity is not enough, don't make the wrapper + packages_to_submit.append(p) + else: + deadlock = True for job in p.jobs: - job.packed = True - # if the quantity is not enough, don't make the wrapper - if len(p.jobs) >= min_wrapped_jobs: - built_packages.append(p) - elif self._jobs_list._chunk_list.index(p.jobs[0].chunk) >= len(self._jobs_list._chunk_list) - ( - len(self._jobs_list._chunk_list) % min_wrapped_jobs): # Last case, wrap remaining jobs - built_packages.append(p) - else: # If a package is discarded, allow to wrap their inner jobs again. + independent_inner_job = True + for parent in job.parents: + if parent in p.jobs and parent.name != job.name: # This job depends on others inner jobs? T/F + independent_inner_job = False + break + tmp = [parent for parent in job.parents if + independent_inner_job and parent.status == Status.COMPLETED] + if len(tmp) != len(job.parents): + deadlock = False + if deadlock and self.wrapper_policy == "strict": + Log.critical("Wrapper policy is set to strict and there is a deadlock, exiting AS") + for job in p.jobs: + job.packed = False + exit(-1) + elif deadlock and self.wrapper_policy != "strict": + Log.warning( + "Wrapper policy is set to flexible and there is a deadlock, As will submit the wrapper with less Quantity of jobs") + for job in p.jobs: + job.packed = False + elif not deadlock: # Treat Max-wrapped as well and last case + last_inner_job = False + for job in p.jobs: + all_children_out_wrapper = True + for child in job.children: # Check if job is considered child + if child in p.jobs and child.name != job.name: + all_children_out_wrapper = False + if all_children_out_wrapper: + last_inner_job = True + break + if last_inner_job: # Last case + packages_to_submit.append(p) + else: for job in p.jobs: job.packed = False - built_packages = built_packages_tmp - else: - built_packages = built_packages_tmp - self.max_jobs = self.max_jobs - 1 - packages_to_submit += built_packages else: - # No wrapper allowed / well-configured for job in jobs_to_submit_by_section[section]: if job.type == Type.PYTHON and not self._platform.allow_python_jobs: package = JobPackageSimpleWrapped([job]) else: package = JobPackageSimple([job]) - self.max_jobs = self.max_jobs - 1 packages_to_submit.append(package) + for package in packages_to_submit: + self.max_jobs = self.max_jobs - 1 package.hold = self.hold + return packages_to_submit def _divide_list_by_section(self, jobs_list): -- GitLab From 53086b031dd04054075f51047a87b95344570873 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 30 Jun 2020 11:50:19 +0200 Subject: [PATCH 04/15] wrong function name --- autosubmit/config/config_common.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 6bd0dd6b8..d1dae9b1c 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -1081,7 +1081,7 @@ class AutosubmitConfig(object): :rtype: string """ return self._conf_parser.get_option('wrapper', 'TYPE', 'None').lower() - def get_wrapper_type(self): + def get_wrapper_policy(self): """ Returns what kind of wrapper (VERTICAL, MIXED-VERTICAL, HORIZONTAL, HYBRID, NONE) the user has configured in the autosubmit's config -- GitLab From 33eadf7acba2b7f676b2f6ed5aee207335284e7c Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 30 Jun 2020 11:51:19 +0200 Subject: [PATCH 05/15] wrong VARIABLE name --- autosubmit/job/job_packager.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 7b28f4df6..311b9b4b6 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -181,7 +181,9 @@ class JobPackager(object): jobs_to_submit_by_section = self._divide_list_by_section( jobs_to_submit) + for section in jobs_to_submit_by_section: + wrapped = False # Only if platform allows wrappers, wrapper type has been correctly defined, and job names for wrappers have been correctly defined # ('None' is a default value) or the correct section is included in the corresponding sections in [wrappers] if self._platform.allow_wrappers and self.wrapper_type in ['horizontal', 'vertical', 'vertical-mixed', @@ -210,7 +212,6 @@ class JobPackager(object): hard_limit_wrapper = number min_wrapped_jobs = min(self._as_config.jobs_parser.get_option( section, "MIN_WRAPPED", self._as_config.get_min_wrapped_jobs()), hard_limit_wrapper) - wrapped = False packages_to_submit = [] if self.wrapper_type in ['vertical', 'vertical-mixed']: wrapped = True -- GitLab From a1532f5383fa06b9ecb5533d7ecf067519c80202 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 30 Jun 2020 14:21:53 +0200 Subject: [PATCH 06/15] Failed jobs atleast one time --- autosubmit/job/job_dict.py | 2 +- autosubmit/job/job_packager.py | 10 +++++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index a412f1e74..fda0b06d6 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -340,7 +340,7 @@ class DicJobs: job.memory_per_task = self.get_option(section, "MEMORY_PER_TASK", '') job.wallclock = self.get_option(section, "WALLCLOCK", '') job.retrials = int(self.get_option(section, 'RETRIALS', -1)) - + job.max_retrials = int(self.get_option(section, 'RETRIALS', -1)) if job.retrials == -1: job.retrials = None job.notify_on = [x.upper() for x in self.get_option(section, "NOTIFY_ON", '').split(' ')] diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 311b9b4b6..32385ca0b 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -176,11 +176,19 @@ class JobPackager(object): num_jobs_to_submit = min(self._max_wait_jobs_to_submit, len( jobs_ready), self._max_jobs_to_submit) # Take the first num_jobs_to_submit from the list of available - jobs_to_submit = list_of_available[0:num_jobs_to_submit] + jobs_to_submit_tmp = list_of_available[0:num_jobs_to_submit] # print(len(jobs_to_submit)) + jobs_to_submit = [fresh_job for fresh_job in jobs_to_submit_tmp if fresh_job.fail_count == 0] + jobs_to_submit_seq = [failed_job for failed_job in jobs_to_submit_tmp if failed_job.fail_count > 0] jobs_to_submit_by_section = self._divide_list_by_section( jobs_to_submit) + for job in jobs_to_submit_seq: #Failed jobs at least one time + if job.type == Type.PYTHON and not self._platform.allow_python_jobs: + package = JobPackageSimpleWrapped([job]) + else: + package = JobPackageSimple([job]) + packages_to_submit.append(package) for section in jobs_to_submit_by_section: wrapped = False -- GitLab From d98bd645904d4f8cf80112169308d296caabb8b8 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 30 Jun 2020 14:54:27 +0200 Subject: [PATCH 07/15] Restructured added failed jobs retrial --- autosubmit/job/job_packager.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 32385ca0b..4141f7f66 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -25,6 +25,7 @@ from autosubmit.job.job_packages import JobPackageSimple, JobPackageVertical, Jo from operator import attrgetter from math import ceil import operator +from os import sleep class JobPackager(object): """ @@ -184,6 +185,7 @@ class JobPackager(object): jobs_to_submit) for job in jobs_to_submit_seq: #Failed jobs at least one time + job.packed = False if job.type == Type.PYTHON and not self._platform.allow_python_jobs: package = JobPackageSimpleWrapped([job]) else: @@ -216,7 +218,7 @@ class JobPackager(object): k_divided = k.split("-") if k_divided[0] not in self.jobs_in_wrapper: number = int(k_divided[1].strip(" ")) - if number < hard_limit_wrapper: + if number < max_wrapped_jobs: hard_limit_wrapper = number min_wrapped_jobs = min(self._as_config.jobs_parser.get_option( section, "MIN_WRAPPED", self._as_config.get_min_wrapped_jobs()), hard_limit_wrapper) @@ -254,16 +256,21 @@ class JobPackager(object): if len(tmp) != len(job.parents): deadlock = False if deadlock and self.wrapper_policy == "strict": - Log.critical("Wrapper policy is set to strict and there is a deadlock, exiting AS") + Log.debug("Wrapper policy is set to strict, there is a deadlock so autosubmit will sleep a while") + sleep(30) for job in p.jobs: job.packed = False - exit(-1) elif deadlock and self.wrapper_policy != "strict": - Log.warning( - "Wrapper policy is set to flexible and there is a deadlock, As will submit the wrapper with less Quantity of jobs") + Log.debug("Wrapper policy is set to flexible and there is a deadlock, As will submit the jobs sequentally") for job in p.jobs: job.packed = False - elif not deadlock: # Treat Max-wrapped as well and last case + if job.status == Status.READY: + if job.type == Type.PYTHON and not self._platform.allow_python_jobs: + package = JobPackageSimpleWrapped([job]) + else: + package = JobPackageSimple([job]) + packages_to_submit.append(package) + elif not deadlock: # last case last_inner_job = False for job in p.jobs: all_children_out_wrapper = True -- GitLab From 868246ec5f8b71d29ca8a4111d2afbf0a027f68d Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 30 Jun 2020 14:55:33 +0200 Subject: [PATCH 08/15] Restructured added failed jobs retrial --- autosubmit/job/job_packager.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 4141f7f66..45c4ff109 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -25,8 +25,7 @@ from autosubmit.job.job_packages import JobPackageSimple, JobPackageVertical, Jo from operator import attrgetter from math import ceil import operator -from os import sleep - +from time import os class JobPackager(object): """ Main class that manages Job wrapping. -- GitLab From a5beacfcd2dab24efd3de6eee449d49af2ea0016 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 30 Jun 2020 14:56:03 +0200 Subject: [PATCH 09/15] Restructured added failed jobs retrial --- autosubmit/job/job_packager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 45c4ff109..88ed1d1d5 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -25,7 +25,7 @@ from autosubmit.job.job_packages import JobPackageSimple, JobPackageVertical, Jo from operator import attrgetter from math import ceil import operator -from time import os +from time import sleep class JobPackager(object): """ Main class that manages Job wrapping. -- GitLab From ec01ff4fbba7f0388b58b37e52c60953e948df9a Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 1 Jul 2020 16:11:24 +0200 Subject: [PATCH 10/15] fixed completed/stat not being removed --- autosubmit/job/job_packager.py | 1 - autosubmit/platforms/paramiko_platform.py | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 88ed1d1d5..d3b926763 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -256,7 +256,6 @@ class JobPackager(object): deadlock = False if deadlock and self.wrapper_policy == "strict": Log.debug("Wrapper policy is set to strict, there is a deadlock so autosubmit will sleep a while") - sleep(30) for job in p.jobs: job.packed = False elif deadlock and self.wrapper_policy != "strict": diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 96b3f8c47..58f008eb5 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -140,8 +140,8 @@ class ParamikoPlatform(Platform): log_dir = os.path.join(self.tmp_path, 'LOG_{0}'.format(self.expid)) multiple_delete_previous_run = os.path.join(log_dir,"multiple_delete_previous_run.sh") - if os.path.exists(multiple_delete_previous_run): - open(multiple_delete_previous_run, 'w+').write("rm -f "+filenames) + if os.path.exists(log_dir): + open(multiple_delete_previous_run, 'w+').write("rm -f"+filenames) os.chmod(multiple_delete_previous_run, 0o770) self.send_file(multiple_delete_previous_run, False) command = os.path.join(self.get_files_path(),"multiple_delete_previous_run.sh") -- GitLab From 1020994739965b5e136b5093fba3a34857eab58a Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 1 Jul 2020 17:17:51 +0200 Subject: [PATCH 11/15] Logs fix for horizontal_vertical --- .../platforms/wrappers/wrapper_builder.py | 33 ++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index 9e659ba66..781f434ec 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -442,7 +442,38 @@ class PythonVerticalHorizontalWrapperBuilder(PythonWrapperBuilder): class PythonHorizontalVerticalWrapperBuilder(PythonWrapperBuilder): + def build_parallel_threads_launcher_horizontal_vertical(self, jobs_list, thread, footer=True): + parallel_threads_launcher = textwrap.dedent(""" + pid_list = [] + + for i in range(len({0})): + if type({0}[i]) != list: + job = {0}[i] + jobname = job.replace(".cmd", '') + section = jobname.split('_')[-1] + {2} + current = {1}({0}[i], i*len({0})) + pid_list.append(current) + current.start() + + # Waiting until all scripts finish + for i in range(len(pid_list)): + pid = pid_list[i] + pid.join() + """).format(jobs_list, thread, self._indent(self.build_machinefiles(), 8), '\n'.ljust(13)) + if footer: + parallel_threads_launcher += self._indent(textwrap.dedent(""" + completed_filename = {0}[i].replace('.cmd', '_COMPLETED') + completed_path = os.path.join(os.getcwd(), completed_filename) + if os.path.exists(completed_path): + print datetime.now(), "The job ", pid.template," has been COMPLETED" + else: + print datetime.now(), "The job ", pid.template," has FAILED" + {1} + """).format(jobs_list, self.exit_thread, '\n'.ljust(13)), 4) + + return parallel_threads_launcher def build_joblist_thread(self): return textwrap.dedent(""" class JobListThread(Thread): @@ -456,7 +487,7 @@ class PythonHorizontalVerticalWrapperBuilder(PythonWrapperBuilder): all_cores = self.all_cores {0} """).format( - self._indent(self.build_parallel_threads_launcher("self.jobs_list", "JobThread"), 8), '\n'.ljust(13)) + self._indent(self.build_parallel_threads_launcher_horizontal_vertical("self.jobs_list", "JobThread"), 8), '\n'.ljust(13)) def build_main(self): nodes_list = self.build_nodes_list() -- GitLab From e1d17c52e1baa3a597528f7a31d6603467a644e3 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 1 Jul 2020 17:25:24 +0200 Subject: [PATCH 12/15] Fix for pipeline tests --- test/unit/test_wrappers.py | 1 + 1 file changed, 1 insertion(+) diff --git a/test/unit/test_wrappers.py b/test/unit/test_wrappers.py index 5a991576f..ebb9c1960 100644 --- a/test/unit/test_wrappers.py +++ b/test/unit/test_wrappers.py @@ -164,6 +164,7 @@ class TestWrappers(TestCase): self.config.get_wrapper_jobs = Mock(return_value='None') self.config.get_wrapper_method = Mock(return_value='ASThread') self.config.get_wrapper_queue = Mock(return_value='debug') + self.config.get_wrapper_policy = Mock(return_value='flexible') self.job_packager = JobPackager( self.config, self.platform, self.job_list) -- GitLab From 6bc86d88c910ca017a07452701b8b5ef2232b09d Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 1 Jul 2020 17:28:45 +0200 Subject: [PATCH 13/15] Fix for pipeline tests --- autosubmit/job/job_dict.py | 1 - 1 file changed, 1 deletion(-) diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index fda0b06d6..232b669f1 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -340,7 +340,6 @@ class DicJobs: job.memory_per_task = self.get_option(section, "MEMORY_PER_TASK", '') job.wallclock = self.get_option(section, "WALLCLOCK", '') job.retrials = int(self.get_option(section, 'RETRIALS', -1)) - job.max_retrials = int(self.get_option(section, 'RETRIALS', -1)) if job.retrials == -1: job.retrials = None job.notify_on = [x.upper() for x in self.get_option(section, "NOTIFY_ON", '').split(' ')] -- GitLab From 6091888fd16bd77002e4e1f85503fff798340464 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 1 Jul 2020 17:49:56 +0200 Subject: [PATCH 14/15] Fix for logs tests --- autosubmit/platforms/wrappers/wrapper_builder.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index 781f434ec..1cb7ec7c8 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -453,7 +453,7 @@ class PythonHorizontalVerticalWrapperBuilder(PythonWrapperBuilder): section = jobname.split('_')[-1] {2} - current = {1}({0}[i], i*len({0})) + current = {1}({0}[i], i+self.id_run) pid_list.append(current) current.start() @@ -493,7 +493,7 @@ class PythonHorizontalVerticalWrapperBuilder(PythonWrapperBuilder): nodes_list = self.build_nodes_list() self.exit_thread = "os._exit(1)" joblist_thread = self.build_joblist_thread() - threads_launcher = self.build_sequential_threads_launcher("scripts", "JobListThread(scripts[i], i*(len(scripts)-1), " + threads_launcher = self.build_sequential_threads_launcher("scripts", "JobListThread(scripts[i], i*(len(scripts)), " "copy.deepcopy(all_cores))", footer=False) return joblist_thread + nodes_list + threads_launcher -- GitLab From 66913cd9da8326d9a74b67387402514b4a2b7496 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 2 Jul 2020 10:02:17 +0200 Subject: [PATCH 15/15] Fix for logs now working --- autosubmit/platforms/wrappers/wrapper_builder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index 1cb7ec7c8..6020afe3c 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -493,7 +493,7 @@ class PythonHorizontalVerticalWrapperBuilder(PythonWrapperBuilder): nodes_list = self.build_nodes_list() self.exit_thread = "os._exit(1)" joblist_thread = self.build_joblist_thread() - threads_launcher = self.build_sequential_threads_launcher("scripts", "JobListThread(scripts[i], i*(len(scripts)), " + threads_launcher = self.build_sequential_threads_launcher("scripts", "JobListThread(scripts[i], i*(len(scripts[i])), " "copy.deepcopy(all_cores))", footer=False) return joblist_thread + nodes_list + threads_launcher -- GitLab