diff --git a/autosubmit/job/job_list_persistence.py b/autosubmit/job/job_list_persistence.py index 951771bed1523e908eb32efedfa784544d130cde..948f21c01f32e83ff7470dde24a7a01571a1bc50 100644 --- a/autosubmit/job/job_list_persistence.py +++ b/autosubmit/job/job_list_persistence.py @@ -22,6 +22,7 @@ from sys import setrecursionlimit import shutil from autosubmit.database.db_manager import DbManager from log.log import AutosubmitCritical, Log +from contextlib import suppress class JobListPersistence(object): @@ -100,8 +101,9 @@ class JobListPersistencePkl(JobListPersistence): """ path = os.path.join(persistence_path, persistence_file + '.pkl' + '.tmp') - if os.path.exists(path): + with suppress(FileNotFoundError, PermissionError): os.remove(path) + setrecursionlimit(500000000) Log.debug("Saving JobList: " + path) with open(path, 'wb') as fd: diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 675f113011ca8110b6b9c094c5dd6270564032e9..d02b551c28ae994740217bfe744ce189cfae2a2c 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -226,7 +226,7 @@ class JobPackager(object): min_h = len(package.jobs) return min_v, min_h, balanced - def check_packages_respect_wrapper_policy(self,built_packages_tmp,packages_to_submit,max_jobs_to_submit,wrapper_limits): + def check_packages_respect_wrapper_policy(self,built_packages_tmp,packages_to_submit,max_jobs_to_submit,wrapper_limits, any_simple_packages = False): """ Check if the packages respect the wrapper policy and act in base of it ( submit wrapper, submit sequential, wait for more jobs to form a wrapper) :param built_packages_tmp: List of packages to be submitted @@ -237,10 +237,10 @@ class JobPackager(object): :rtype: List of packages to be submitted, int :return: packages_to_submit, max_jobs_to_submit """ + not_wrappeable_package_info = list() for p in built_packages_tmp: if max_jobs_to_submit == 0: break - infinite_deadlock = False # This will raise an autosubmit critical if true, infinite deadlock is when there are no more non-wrapped jobs in waiting or ready status failed_innerjobs = False # Check if the user is using the option to run first some jobs. if so, remove non-first jobs from the package and submit them sequentially following a flexible policy if len(self._jobs_list.jobs_to_run_first) > 0: @@ -269,129 +269,57 @@ class JobPackager(object): job.packed = True packages_to_submit.append(p) max_jobs_to_submit = max_jobs_to_submit - 1 - else: # Check if there is a deadlock or an infinite deadlock. Once checked, act in base of the wrapper policy. - deadlock = True - if deadlock: # Remaining jobs if chunk is the last one + else: + not_wrappeable_package_info.append([p, min_v, min_h, balanced]) + # It is a deadlock when: + # 1. There are no more non-wrapped jobs in ready status + # 2. And there are no more jobs in the queue ( submitted, queuing, running, held ) + # 3. And all current packages are not wrappable. + if not any_simple_packages and len(self._jobs_list.get_in_queue()) == 0 and len(not_wrappeable_package_info) == len(built_packages_tmp): + for p, min_v, min_h, balanced in not_wrappeable_package_info: + if self.wrapper_policy[self.current_wrapper_section] == "strict": for job in p.jobs: - if (job.running == "chunk" and job.chunk == int( - job.parameters["EXPERIMENT.NUMCHUNKS"])) and balanced: - deadlock = False - break - if not deadlock: # Submit package if deadlock has been liberated + job.packed = False + raise AutosubmitCritical(self.error_message_policy(min_h, min_v, wrapper_limits, p.wallclock, balanced), 7014) + elif self.wrapper_policy[self.current_wrapper_section] == "mixed": + error = True for job in p.jobs: - job.packed = True - packages_to_submit.append(p) - max_jobs_to_submit = max_jobs_to_submit - 1 - else: - wallclock_sum = p.jobs[0].wallclock - for seq in range(1, min_v): - wallclock_sum = sum_str_hours(wallclock_sum, p.jobs[0].wallclock) - next_wrappable_jobs = self._jobs_list.get_jobs_by_section(self.jobs_in_wrapper[self.current_wrapper_section]) - next_wrappable_jobs = [job for job in next_wrappable_jobs if - job.status == Status.WAITING and job not in p.jobs] # Get only waiting jobs - active_jobs = list() - aux_active_jobs = list() - for job in next_wrappable_jobs: # Prone tree by looking only the closest children - direct_children = False - for related in job.parents: - if related in p.jobs: - direct_children = True - break - if direct_children: # Get parent of direct children that aren't in wrapper - aux_active_jobs += [aux_parent for aux_parent in job.parents if ( - aux_parent.status != Status.COMPLETED and aux_parent.status != Status.FAILED) and ( - aux_parent.section not in self.jobs_in_wrapper[ - self.current_wrapper_section] or ( - aux_parent.section in self.jobs_in_wrapper[ - self.current_wrapper_section] and aux_parent.status != Status.COMPLETED and aux_parent.status != Status.FAILED and aux_parent.status != Status.WAITING and aux_parent.status != Status.READY))] - aux_active_jobs = list(set(aux_active_jobs)) - track = [] # Tracker to prone tree for avoid the checking of the same parent from different nodes. - active_jobs_names = [job.name for job in - p.jobs] # We want to search if the actual wrapped jobs needs to run for add more jobs to this wrapper - hard_deadlock = False - for job in aux_active_jobs: - parents_to_check = [] - if job.status == Status.WAITING: # We only want to check uncompleted parents - aux_job = job - for parent in aux_job.parents: # First case - if parent.name in active_jobs_names: - hard_deadlock = True - infinite_deadlock = True - break - if (parent.status == Status.WAITING) and parent.name != aux_job.name: - parents_to_check.append(parent) - track.extend(parents_to_check) - while len( - parents_to_check) > 0 and not infinite_deadlock: # We want to look deeper on the tree until all jobs are completed, or we find an unresolvable deadlock. - aux_job = parents_to_check.pop(0) - for parent in aux_job.parents: - if parent.name in active_jobs_names: - hard_deadlock = True - infinite_deadlock = True - break - if ( - parent.status == Status.WAITING) and parent.name != aux_job.name and parent not in track: - parents_to_check.append(parent) - track.extend(parents_to_check) - if not infinite_deadlock: - active_jobs.append(job) # List of jobs that can continue to run without run this wrapper - - # Act in base of active_jobs and Policies - if self.wrapper_policy[self.current_wrapper_section] == "strict": - for job in p.jobs: + if max_jobs_to_submit == 0: + break + if job.fail_count > 0 and job.status == Status.READY: job.packed = False - if len(active_jobs) > 0: - Log.printlog(f'Wrapper policy is set to STRICT and there are not enough jobs to form a wrapper.[wrappable:{wrapper_limits["min"]} <= defined_min:{wrapper_limits["min"]}] [wrappeable_h:{min_h} <= defined_min_h:{wrapper_limits["min_h"]}]|[wrappeable_v:{min_v} <= defined_min_v:{wrapper_limits["min_v"]}] waiting until the wrapper can be formed.\nIf all values are <=, some innerjob has failed under strict policy', 6013) - else: - if len(self._jobs_list.get_in_queue()) == 0: - raise AutosubmitCritical(self.error_message_policy(min_h, min_v, wrapper_limits, hard_deadlock, wallclock_sum, balanced), 7014) - elif self.wrapper_policy[self.current_wrapper_section] == "mixed": - error = True - show_log = True - for job in p.jobs: - if max_jobs_to_submit == 0: - break - if job.fail_count > 0 and job.status == Status.READY: - job.packed = False - Log.printlog( - "Wrapper policy is set to mixed, there is a failed job that will be sent sequential") - error = False - show_log = False - if job.type == Type.PYTHON and not self._platform.allow_python_jobs: - package = JobPackageSimpleWrapped( - [job]) - else: - package = JobPackageSimple([job]) - packages_to_submit.append(package) - max_jobs_to_submit = max_jobs_to_submit - 1 - if error: - if len(active_jobs) > 0: - if show_log: - Log.printlog(f'Wrapper policy is set to MIXED and there are not enough jobs to form a wrapper.[wrappable:{wrapper_limits["min"]} < defined_min:{wrapper_limits["min"]}] [wrappable_h:{min_h} < defined_min_h:{wrapper_limits["min_h"]}]|[wrappeable_v:{min_v} < defined_min_v:{wrapper_limits["min_v"]}] waiting until the wrapper can be formed.', 6013) + Log.printlog( + "Wrapper policy is set to mixed, there is a failed job that will be sent sequential") + error = False + if job.type == Type.PYTHON and not self._platform.allow_python_jobs: + package = JobPackageSimpleWrapped( + [job]) else: - if len(self._jobs_list.get_in_queue()) == 0: # When there are not more possible jobs, autosubmit will stop the execution - raise AutosubmitCritical(self.error_message_policy(min_h, min_v, wrapper_limits, hard_deadlock, wallclock_sum, balanced), 7014) - else: - Log.info( - "Wrapper policy is set to flexible and there is a deadlock, Autosubmit will submit the jobs sequentially") - for job in p.jobs: - if max_jobs_to_submit == 0: - break - job.packed = False - if job.status == Status.READY: - if job.type == Type.PYTHON and not self._platform.allow_python_jobs: - package = JobPackageSimpleWrapped( - [job]) - else: - package = JobPackageSimple([job]) - packages_to_submit.append(package) - max_jobs_to_submit = max_jobs_to_submit - 1 + package = JobPackageSimple([job]) + packages_to_submit.append(package) + max_jobs_to_submit = max_jobs_to_submit - 1 + if error: + if len(self._jobs_list.get_in_queue()) == 0: # When there are not more possible jobs, autosubmit will stop the execution + raise AutosubmitCritical(self.error_message_policy(min_h, min_v, wrapper_limits, p.wallclock, balanced), 7014) + else: + Log.info( + "Wrapper policy is set to flexible and there is a deadlock, Autosubmit will submit the jobs sequentially") + for job in p.jobs: + if max_jobs_to_submit == 0: + break + job.packed = False + if job.status == Status.READY: + if job.type == Type.PYTHON and not self._platform.allow_python_jobs: + package = JobPackageSimpleWrapped( + [job]) + else: + package = JobPackageSimple([job]) + packages_to_submit.append(package) + max_jobs_to_submit = max_jobs_to_submit - 1 return packages_to_submit, max_jobs_to_submit - def error_message_policy(self,min_h,min_v,wrapper_limits,hard_deadlock,wallclock_sum,balanced): - message = f"Wrapper couldn't be formed under {self.wrapper_policy[self.current_wrapper_section]} POLICY due minimum limit not being reached: [wrappable:{wrapper_limits['min']} < defined_min:{wrapper_limits['min']}] [wrappable_h:{min_h} < defined_min_h:{wrapper_limits['min_h']}]|[wrappeable_v:{min_v} < defined_min_v:{wrapper_limits['min_v']}] " - if hard_deadlock: - message += "\nCheck your configuration: The next wrappable job can't be wrapped until some of inner jobs of current packages finishes which is impossible" + def error_message_policy(self,min_h,min_v,wrapper_limits,wallclock_sum,balanced): + message = f"Wrapper couldn't be formed under {self.wrapper_policy[self.current_wrapper_section]} POLICY due minimum limit not being reached: [wrappable:{wrapper_limits['min']} < defined_min:{min_h*min_v}] [wrappable_h:{min_h} < defined_min_h:{wrapper_limits['min_h']}]|[wrappeable_v:{min_v} < defined_min_v:{wrapper_limits['min_v']}] " if min_v > 1: message += f"\nCheck your configuration: Check if current {wallclock_sum} vertical wallclock has reached the max defined on platforms.conf." else: @@ -510,6 +438,7 @@ class JobPackager(object): job.packed = False jobs_to_wrap = self._divide_list_by_section(jobs_to_submit) non_wrapped_jobs = jobs_to_wrap.pop("SIMPLE",[]) + any_simple_packages = len(non_wrapped_jobs) > 0 # Prepare packages for wrapped jobs for wrapper_name, jobs in jobs_to_wrap.items(): if max_jobs_to_submit == 0: @@ -536,15 +465,13 @@ class JobPackager(object): if self.wrapper_type[self.current_wrapper_section] == 'vertical': built_packages_tmp = self._build_vertical_packages(jobs, wrapper_limits,wrapper_info=current_info) elif self.wrapper_type[self.current_wrapper_section] == 'horizontal': - if len(jobs) >= wrapper_limits["min_h"]: - built_packages_tmp = self._build_horizontal_packages(jobs, wrapper_limits, section,wrapper_info=current_info) + built_packages_tmp = self._build_horizontal_packages(jobs, wrapper_limits, section,wrapper_info=current_info) elif self.wrapper_type[self.current_wrapper_section] in ['vertical-horizontal', 'horizontal-vertical']: - if len(jobs) >= wrapper_limits["min_h"]: - built_packages_tmp.append(self._build_hybrid_package(jobs, wrapper_limits, section,wrapper_info=current_info)) + built_packages_tmp.append(self._build_hybrid_package(jobs, wrapper_limits, section,wrapper_info=current_info)) else: built_packages_tmp = self._build_vertical_packages(jobs, wrapper_limits) - packages_to_submit,max_jobs_to_submit = self.check_packages_respect_wrapper_policy(built_packages_tmp,packages_to_submit,max_jobs_to_submit,wrapper_limits) + packages_to_submit,max_jobs_to_submit = self.check_packages_respect_wrapper_policy(built_packages_tmp,packages_to_submit,max_jobs_to_submit,wrapper_limits,any_simple_packages) # Now, prepare the packages for non-wrapper jobs for job in non_wrapped_jobs: if max_jobs_to_submit == 0: diff --git a/test/unit/test_job.py b/test/unit/test_job.py index f4887886c1df42b8aa57c802fa646067eec5ca8c..e01d84b87beb2ebc24252c88d02fba33f4ec2b42 100644 --- a/test/unit/test_job.py +++ b/test/unit/test_job.py @@ -608,6 +608,7 @@ CONFIG: run_only_members=[], #config.get_member_list(run_only=True), show_log=True, + create=True, ) job_list = job_list_obj.get_job_list() @@ -827,6 +828,7 @@ CONFIG: new=True, run_only_members=config.get_member_list(run_only=True), show_log=True, + create=True, ) job_list = job_list_obj.get_job_list() @@ -971,6 +973,7 @@ CONFIG: new=True, run_only_members=config.get_member_list(run_only=True), show_log=True, + create=True, ) job_list = job_list_obj.get_job_list() self.assertEqual(1, len(job_list)) diff --git a/test/unit/test_job_list.py b/test/unit/test_job_list.py index d5ce5b0308152b60c1945a34df0cd670bf756cb7..d023225037a9b3b5a0280badedbcae4e5e96f77a 100644 --- a/test/unit/test_job_list.py +++ b/test/unit/test_job_list.py @@ -248,6 +248,7 @@ class TestJobList(TestCase): default_job_type=Type.BASH, wrapper_jobs={}, new=True, + create=True, ) @@ -317,6 +318,7 @@ class TestJobList(TestCase): default_job_type=Type.BASH, wrapper_jobs={}, new=True, + create=True, ) job_list._job_list[0].member = "fake-member1" job_list._job_list[1].member = "fake-member2" @@ -363,6 +365,7 @@ class TestJobList(TestCase): default_job_type=Type.BASH, wrapper_jobs={}, new=True, + create=True, ) job_list._job_list[0].section = "fake-section" job_list._job_list[0].date = "fake-date1" @@ -446,6 +449,7 @@ class TestJobList(TestCase): default_job_type=Type.BASH, wrapper_jobs={}, new=True, + create=True, ) job_list.save() job_list2 = self.new_job_list(factory,temp_dir) @@ -461,6 +465,7 @@ class TestJobList(TestCase): default_job_type=Type.BASH, wrapper_jobs={}, new=False, + create=True, ) #return False job_list2.update_from_file = Mock() @@ -526,6 +531,7 @@ class TestJobList(TestCase): default_job_type=Type.BASH, wrapper_jobs={}, new=False, + create=True, ) # assert update_genealogy called with right values # When using an 4.0 experiment, the pkl has to be recreated and act as a new one.