diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index e3c83c2d5d36612b7564117f261d25048ba3ae88..d1dae9b1c0d6b5850768524b82919088b5e1041e 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -1081,6 +1081,14 @@ class AutosubmitConfig(object): :rtype: string """ return self._conf_parser.get_option('wrapper', 'TYPE', 'None').lower() + def get_wrapper_policy(self): + """ + Returns what kind of wrapper (VERTICAL, MIXED-VERTICAL, HORIZONTAL, HYBRID, NONE) the user has configured in the autosubmit's config + + :return: wrapper type (or none) + :rtype: string + """ + return self._conf_parser.get_option('wrapper', 'POLICY', 'flexible').lower() def get_wrapper_jobs(self): """ diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index a412f1e74612ebd94e89b1db73806439bb600eaf..232b669f1c7128c30c7b5d3cd63e5e3ff9c35f69 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -340,7 +340,6 @@ class DicJobs: job.memory_per_task = self.get_option(section, "MEMORY_PER_TASK", '') job.wallclock = self.get_option(section, "WALLCLOCK", '') job.retrials = int(self.get_option(section, 'RETRIALS', -1)) - if job.retrials == -1: job.retrials = None job.notify_on = [x.upper() for x in self.get_option(section, "NOTIFY_ON", '').split(' ')] diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 9f8c089628016e9ece43a58986d0d322add3e5f6..d3b926763f63d316175cf3a31ec98b3a6e0c40c1 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -25,7 +25,7 @@ from autosubmit.job.job_packages import JobPackageSimple, JobPackageVertical, Jo from operator import attrgetter from math import ceil import operator - +from time import sleep class JobPackager(object): """ Main class that manages Job wrapping. @@ -66,6 +66,7 @@ class JobPackager(object): self._max_jobs_to_submit) # These are defined in the [wrapper] section of autosubmit_,conf self.wrapper_type = self._as_config.get_wrapper_type() + self.wrapper_policy = self._as_config.get_wrapper_policy() self.wrapper_method = self._as_config.get_wrapper_method().lower() # True or False self.jobs_in_wrapper = self._as_config.get_wrapper_jobs() @@ -175,12 +176,23 @@ class JobPackager(object): num_jobs_to_submit = min(self._max_wait_jobs_to_submit, len( jobs_ready), self._max_jobs_to_submit) # Take the first num_jobs_to_submit from the list of available - jobs_to_submit = list_of_available[0:num_jobs_to_submit] + jobs_to_submit_tmp = list_of_available[0:num_jobs_to_submit] # print(len(jobs_to_submit)) + jobs_to_submit = [fresh_job for fresh_job in jobs_to_submit_tmp if fresh_job.fail_count == 0] + jobs_to_submit_seq = [failed_job for failed_job in jobs_to_submit_tmp if failed_job.fail_count > 0] jobs_to_submit_by_section = self._divide_list_by_section( jobs_to_submit) + for job in jobs_to_submit_seq: #Failed jobs at least one time + job.packed = False + if job.type == Type.PYTHON and not self._platform.allow_python_jobs: + package = JobPackageSimpleWrapped([job]) + else: + package = JobPackageSimple([job]) + packages_to_submit.append(package) + for section in jobs_to_submit_by_section: + wrapped = False # Only if platform allows wrappers, wrapper type has been correctly defined, and job names for wrappers have been correctly defined # ('None' is a default value) or the correct section is included in the corresponding sections in [wrappers] if self._platform.allow_wrappers and self.wrapper_type in ['horizontal', 'vertical', 'vertical-mixed', @@ -205,78 +217,84 @@ class JobPackager(object): k_divided = k.split("-") if k_divided[0] not in self.jobs_in_wrapper: number = int(k_divided[1].strip(" ")) - if number < hard_limit_wrapper: + if number < max_wrapped_jobs: hard_limit_wrapper = number min_wrapped_jobs = min(self._as_config.jobs_parser.get_option( section, "MIN_WRAPPED", self._as_config.get_min_wrapped_jobs()), hard_limit_wrapper) - - built_packages = [] + packages_to_submit = [] if self.wrapper_type in ['vertical', 'vertical-mixed']: + wrapped = True built_packages_tmp = self._build_vertical_packages(jobs_to_submit_by_section[section], max_wrapped_jobs) - for p in built_packages_tmp: - for job in p.jobs: - job.packed = True - # if the quantity is not enough, don't make the wrapper - if len(p.jobs) >= min_wrapped_jobs: - built_packages.append(p) - elif self._jobs_list._chunk_list.index(p.jobs[0].chunk)+1 >= len(self._jobs_list._chunk_list) - ( - len(self._jobs_list._chunk_list) % min_wrapped_jobs): # Last case, wrap remaining jobs - built_packages.append(p) - else: - if job.repacked > 10: # too many tries, imposible to wrap this job - built_packages.append(p) - else: # If a package is discarded, allow to wrap their inner jobs again. - job.repacked = job.repacked + 1 - for job in p.jobs: - job.packed = False elif self.wrapper_type == 'horizontal': + wrapped = True built_packages_tmp = self._build_horizontal_packages(jobs_to_submit_by_section[section], max_wrapped_jobs, section) - for p in built_packages_tmp: - for job in p.jobs: - job.packed = True - # if the quantity is not enough, don't make the wrapper - if len(p.jobs) >= self._as_config.jobs_parser.get_option(section, "MIN_WRAPPED", self._as_config.get_min_wrapped_jobs()): - built_packages.append(p) - elif self._jobs_list._member_list.index(p.jobs[0].member)+1 >= len( - self._jobs_list._member_list) - (len(self._jobs_list._member_list) % min_wrapped_jobs): # Last case, wrap remaining jobs - built_packages.append(p) - else: # If a package is discarded, allow to wrap their inner jobs again. - for job in p.jobs: - job.packed = False + elif self.wrapper_type in ['vertical-horizontal', 'horizontal-vertical']: - built_packages_tmp = [] + wrapped = True + built_packages_tmp = list() built_packages_tmp.append(self._build_hybrid_package( jobs_to_submit_by_section[section], max_wrapped_jobs, section)) - for p in built_packages_tmp: + if wrapped: + for p in built_packages_tmp: + for job in p.jobs: + job.packed = True + if len(p.jobs) >= min_wrapped_jobs: # if the quantity is not enough, don't make the wrapper + packages_to_submit.append(p) + else: + deadlock = True for job in p.jobs: - job.packed = True - # if the quantity is not enough, don't make the wrapper - if len(p.jobs) >= min_wrapped_jobs: - built_packages.append(p) - elif self._jobs_list._chunk_list.index(p.jobs[0].chunk) >= len(self._jobs_list._chunk_list) - ( - len(self._jobs_list._chunk_list) % min_wrapped_jobs): # Last case, wrap remaining jobs - built_packages.append(p) - else: # If a package is discarded, allow to wrap their inner jobs again. + independent_inner_job = True + for parent in job.parents: + if parent in p.jobs and parent.name != job.name: # This job depends on others inner jobs? T/F + independent_inner_job = False + break + tmp = [parent for parent in job.parents if + independent_inner_job and parent.status == Status.COMPLETED] + if len(tmp) != len(job.parents): + deadlock = False + if deadlock and self.wrapper_policy == "strict": + Log.debug("Wrapper policy is set to strict, there is a deadlock so autosubmit will sleep a while") + for job in p.jobs: + job.packed = False + elif deadlock and self.wrapper_policy != "strict": + Log.debug("Wrapper policy is set to flexible and there is a deadlock, As will submit the jobs sequentally") + for job in p.jobs: + job.packed = False + if job.status == Status.READY: + if job.type == Type.PYTHON and not self._platform.allow_python_jobs: + package = JobPackageSimpleWrapped([job]) + else: + package = JobPackageSimple([job]) + packages_to_submit.append(package) + elif not deadlock: # last case + last_inner_job = False + for job in p.jobs: + all_children_out_wrapper = True + for child in job.children: # Check if job is considered child + if child in p.jobs and child.name != job.name: + all_children_out_wrapper = False + if all_children_out_wrapper: + last_inner_job = True + break + if last_inner_job: # Last case + packages_to_submit.append(p) + else: for job in p.jobs: job.packed = False - built_packages = built_packages_tmp - else: - built_packages = built_packages_tmp - self.max_jobs = self.max_jobs - 1 - packages_to_submit += built_packages else: - # No wrapper allowed / well-configured for job in jobs_to_submit_by_section[section]: if job.type == Type.PYTHON and not self._platform.allow_python_jobs: package = JobPackageSimpleWrapped([job]) else: package = JobPackageSimple([job]) - self.max_jobs = self.max_jobs - 1 packages_to_submit.append(package) + for package in packages_to_submit: + self.max_jobs = self.max_jobs - 1 package.hold = self.hold + return packages_to_submit def _divide_list_by_section(self, jobs_list): diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 96b3f8c472a3a57647ff62755bf03914cdef7daf..58f008eb546c26681c0dec3ef5322dccc083457c 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -140,8 +140,8 @@ class ParamikoPlatform(Platform): log_dir = os.path.join(self.tmp_path, 'LOG_{0}'.format(self.expid)) multiple_delete_previous_run = os.path.join(log_dir,"multiple_delete_previous_run.sh") - if os.path.exists(multiple_delete_previous_run): - open(multiple_delete_previous_run, 'w+').write("rm -f "+filenames) + if os.path.exists(log_dir): + open(multiple_delete_previous_run, 'w+').write("rm -f"+filenames) os.chmod(multiple_delete_previous_run, 0o770) self.send_file(multiple_delete_previous_run, False) command = os.path.join(self.get_files_path(),"multiple_delete_previous_run.sh") diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index 9e659ba66d962e85a9eb12967adeb601b2e19070..6020afe3c2d34c5897e4034bcad496d8b1119989 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -442,7 +442,38 @@ class PythonVerticalHorizontalWrapperBuilder(PythonWrapperBuilder): class PythonHorizontalVerticalWrapperBuilder(PythonWrapperBuilder): + def build_parallel_threads_launcher_horizontal_vertical(self, jobs_list, thread, footer=True): + parallel_threads_launcher = textwrap.dedent(""" + pid_list = [] + + for i in range(len({0})): + if type({0}[i]) != list: + job = {0}[i] + jobname = job.replace(".cmd", '') + section = jobname.split('_')[-1] + + {2} + current = {1}({0}[i], i+self.id_run) + pid_list.append(current) + current.start() + + # Waiting until all scripts finish + for i in range(len(pid_list)): + pid = pid_list[i] + pid.join() + """).format(jobs_list, thread, self._indent(self.build_machinefiles(), 8), '\n'.ljust(13)) + if footer: + parallel_threads_launcher += self._indent(textwrap.dedent(""" + completed_filename = {0}[i].replace('.cmd', '_COMPLETED') + completed_path = os.path.join(os.getcwd(), completed_filename) + if os.path.exists(completed_path): + print datetime.now(), "The job ", pid.template," has been COMPLETED" + else: + print datetime.now(), "The job ", pid.template," has FAILED" + {1} + """).format(jobs_list, self.exit_thread, '\n'.ljust(13)), 4) + return parallel_threads_launcher def build_joblist_thread(self): return textwrap.dedent(""" class JobListThread(Thread): @@ -456,13 +487,13 @@ class PythonHorizontalVerticalWrapperBuilder(PythonWrapperBuilder): all_cores = self.all_cores {0} """).format( - self._indent(self.build_parallel_threads_launcher("self.jobs_list", "JobThread"), 8), '\n'.ljust(13)) + self._indent(self.build_parallel_threads_launcher_horizontal_vertical("self.jobs_list", "JobThread"), 8), '\n'.ljust(13)) def build_main(self): nodes_list = self.build_nodes_list() self.exit_thread = "os._exit(1)" joblist_thread = self.build_joblist_thread() - threads_launcher = self.build_sequential_threads_launcher("scripts", "JobListThread(scripts[i], i*(len(scripts)-1), " + threads_launcher = self.build_sequential_threads_launcher("scripts", "JobListThread(scripts[i], i*(len(scripts[i])), " "copy.deepcopy(all_cores))", footer=False) return joblist_thread + nodes_list + threads_launcher diff --git a/test/unit/test_wrappers.py b/test/unit/test_wrappers.py index 5a991576fed0f11efe1c08b13d43d540ad64a9ec..ebb9c196084d50d92716b5ab059872009e48385e 100644 --- a/test/unit/test_wrappers.py +++ b/test/unit/test_wrappers.py @@ -164,6 +164,7 @@ class TestWrappers(TestCase): self.config.get_wrapper_jobs = Mock(return_value='None') self.config.get_wrapper_method = Mock(return_value='ASThread') self.config.get_wrapper_queue = Mock(return_value='debug') + self.config.get_wrapper_policy = Mock(return_value='flexible') self.job_packager = JobPackager( self.config, self.platform, self.job_list)