diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index d1aa0dba0839b3d9065545f158d11843d49b68d8..c97c0b17654758e61c87343155c678c0b0288987 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -27,6 +27,7 @@ from math import ceil import operator from collections import defaultdict + class JobPackager(object): """ Main class that manages Job wrapping. @@ -46,6 +47,8 @@ class JobPackager(object): self.hold = hold # Submitted + Queuing Jobs for specific Platform queuing_jobs = jobs_list.get_queuing(platform) + # We now consider the running jobs count + running_jobs_count = len(jobs_list.get_running(platform)) queued_by_id = dict() for queued_job in queuing_jobs: queued_by_id[queued_job.id] = queued_job @@ -63,6 +66,9 @@ class JobPackager(object): # .total_jobs is defined in each section of platforms_.conf, if not from there, it comes form autosubmit_.conf # .total_jobs Maximum number of jobs at the same time self._max_jobs_to_submit = platform.total_jobs - queuing_jobs_len + # Substracting running jobs + self._max_jobs_to_submit = self._max_jobs_to_submit - running_jobs_count + self._max_jobs_to_submit = self._max_jobs_to_submit if self._max_jobs_to_submit > 0 else 0 self.max_jobs = min(self._max_wait_jobs_to_submit, self._max_jobs_to_submit) # These are defined in the [wrapper] section of autosubmit_,conf @@ -81,13 +87,13 @@ class JobPackager(object): jobs_list.get_prepared(platform))) else: Log.debug("Number of jobs ready: {0}", len( - jobs_list.get_ready(platform, hold=False))) + jobs_list.get_ready(platform, hold=False))) if len(jobs_list.get_ready(platform)) > 0: Log.debug("Jobs ready for {0}: {1}", self._platform.name, len( jobs_list.get_ready(platform))) self._maxTotalProcessors = 0 - def compute_weight(self,job_list): + def compute_weight(self, job_list): job = self jobs_by_section = dict() held_jobs = self._jobs_list.get_held_jobs() @@ -103,25 +109,25 @@ class JobPackager(object): for section in jobs_by_section: if section in jobs_held_by_section.keys(): - weight=len(jobs_held_by_section[section])+1 + weight = len(jobs_held_by_section[section]) + 1 else: weight = 1 - highest_completed=[] + highest_completed = [] for job in sorted(jobs_by_section[section], key=operator.attrgetter('chunk')): - weight=weight+1 + weight = weight + 1 job.distance_weight = weight completed_jobs = 9999 if job.has_parents() > 1: - tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] + tmp = [ + parent for parent in job.parents if parent.status == Status.COMPLETED] if len(tmp) > completed_jobs: - completed_jobs=len(tmp) + completed_jobs = len(tmp) highest_completed = [job] else: highest_completed.append(job) for job in highest_completed: - job.distance_weight = job.distance_weight-1 - + job.distance_weight = job.distance_weight - 1 def build_packages(self): """ @@ -147,7 +153,8 @@ class JobPackager(object): if self.hold and len(jobs_ready) > 0: self.compute_weight(jobs_ready) - sorted_jobs = sorted(jobs_ready, key=operator.attrgetter('distance_weight')) + sorted_jobs = sorted( + jobs_ready, key=operator.attrgetter('distance_weight')) jobs_in_held_status = self._jobs_list.get_held_jobs( ) + self._jobs_list.get_submitted(self._platform, hold=self.hold) held_by_id = dict() @@ -186,12 +193,15 @@ class JobPackager(object): jobs_ready), self._max_jobs_to_submit) # Take the first num_jobs_to_submit from the list of available jobs_to_submit_tmp = list_of_available[0:num_jobs_to_submit] - jobs_to_submit = [fresh_job for fresh_job in jobs_to_submit_tmp if fresh_job.fail_count == 0] - jobs_to_submit_seq = [failed_job for failed_job in jobs_to_submit_tmp if failed_job.fail_count > 0] + jobs_to_submit = [ + fresh_job for fresh_job in jobs_to_submit_tmp if fresh_job.fail_count == 0] + + jobs_to_submit_seq = [ + failed_job for failed_job in jobs_to_submit_tmp if failed_job.fail_count > 0] jobs_to_submit_by_section = self._divide_list_by_section( jobs_to_submit) - for job in jobs_to_submit_seq: #Failed jobs at least one time + for job in jobs_to_submit_seq: # Failed jobs at least one time job.packed = False if job.type == Type.PYTHON and not self._platform.allow_python_jobs: package = JobPackageSimpleWrapped([job]) @@ -254,7 +264,8 @@ class JobPackager(object): elif self.wrapper_type in ['vertical-horizontal', 'horizontal-vertical']: wrapped = True built_packages_tmp = list() - built_packages_tmp.append(self._build_hybrid_package(jobs_to_submit_by_section[section], max_wrapped_jobs, section, max_wrapper_job_by_section)) + built_packages_tmp.append(self._build_hybrid_package( + jobs_to_submit_by_section[section], max_wrapped_jobs, section, max_wrapper_job_by_section)) if wrapped: for p in built_packages_tmp: #if len(self._jobs_list.jobs_to_run_first) > 0: # related to TWO_STEP_START new variable , defined in expdef @@ -266,7 +277,7 @@ class JobPackager(object): # packed_job.packed = False # p.jobs = temp_jobs failed_innerjobs = False - #Check failed jobs first + # Check failed jobs first for job in p.jobs: if job.fail_count > 0: failed_innerjobs = True @@ -275,7 +286,8 @@ class JobPackager(object): for job in p.jobs: if job.fail_count == 0: continue - Log.debug("Wrapper policy is set to mixed and there are failed jobs") + Log.debug( + "Wrapper policy is set to mixed and there are failed jobs") job.packed = False if job.status == Status.READY: if job.type == Type.PYTHON and not self._platform.allow_python_jobs: @@ -284,7 +296,8 @@ class JobPackager(object): package = JobPackageSimple([job]) packages_to_submit.append(package) else: - if len(p.jobs) >= min_wrapped_jobs: # if the quantity is enough, make the wrapper + # if the quantity is enough, make the wrapper + if len(p.jobs) >= min_wrapped_jobs: for job in p.jobs: job.packed = True packages_to_submit.append(p) @@ -301,27 +314,33 @@ class JobPackager(object): if len(tmp) != len(job.parents): deadlock = False if deadlock and self.wrapper_policy == "strict": - Log.debug("Wrapper policy is set to strict, there is a deadlock so autosubmit will sleep a while") + Log.debug( + "Wrapper policy is set to strict, there is a deadlock so autosubmit will sleep a while") for job in p.jobs: job.packed = False elif deadlock and self.wrapper_policy == "mixed": - Log.debug("Wrapper policy is set to mixed, there is a deadlock") + Log.debug( + "Wrapper policy is set to mixed, there is a deadlock") for job in p.jobs: job.packed = False if job.fail_count > 0 and job.status == Status.READY: - Log.debug("Wrapper policy is set to semi-strict, there is a failed job that will be sent sequential") + Log.debug( + "Wrapper policy is set to semi-strict, there is a failed job that will be sent sequential") if job.type == Type.PYTHON and not self._platform.allow_python_jobs: - package = JobPackageSimpleWrapped([job]) + package = JobPackageSimpleWrapped( + [job]) else: package = JobPackageSimple([job]) packages_to_submit.append(package) elif deadlock and self.wrapper_policy != "strict" and self.wrapper_policy != "mixed": - Log.debug("Wrapper policy is set to flexible and there is a deadlock, As will submit the jobs sequentally") + Log.debug( + "Wrapper policy is set to flexible and there is a deadlock, As will submit the jobs sequentally") for job in p.jobs: job.packed = False if job.status == Status.READY: if job.type == Type.PYTHON and not self._platform.allow_python_jobs: - package = JobPackageSimpleWrapped([job]) + package = JobPackageSimpleWrapped( + [job]) else: package = JobPackageSimple([job]) packages_to_submit.append(package) @@ -368,7 +387,8 @@ class JobPackager(object): jobs_section = dict() for job in jobs_list: # This iterator will always return None if there is no '&' defined in the section name - section = next((s for s in sections_split if job.section in s and '&' in s), None) + section = next( + (s for s in sections_split if job.section in s and '&' in s), None) if section is None: section = job.section if section not in jobs_section: @@ -423,7 +443,8 @@ class JobPackager(object): job_vertical_packager = JobPackagerVerticalSimple([job], job.wallclock, self.max_jobs, max_wrapped_jobs, self._platform.max_wallclock, max_wrapper_job_by_section) - jobs_list = job_vertical_packager.build_vertical_package(job) + jobs_list = job_vertical_packager.build_vertical_package( + job) if job.status is Status.READY: packages.append(JobPackageVertical( @@ -472,9 +493,9 @@ class JobPackager(object): for i in xrange(len(current_package)): total_wallclock = sum_str_hours(total_wallclock, wallclock) if len(current_package) > 1: - for level in xrange(1,len(current_package)): + for level in xrange(1, len(current_package)): for job in current_package[level]: - job.level=level + job.level = level return JobPackageHorizontalVertical(current_package, max_procs, total_wallclock, jobs_resources=jobs_resources, configuration=self._as_config) @@ -488,16 +509,16 @@ class JobPackager(object): for job in horizontal_package: job_list = JobPackagerVerticalSimple([job], job.wallclock, self.max_jobs, horizontal_packager.max_wrapped_jobs, - self._platform.max_wallclock,horizontal_packager.max_wrapper_job_by_section).build_vertical_package(job) + self._platform.max_wallclock, horizontal_packager.max_wrapper_job_by_section).build_vertical_package(job) current_package.append(job_list) for job in current_package[-1]: total_wallclock = sum_str_hours(total_wallclock, job.wallclock) if len(current_package) > 1: - for level in xrange(1,len(current_package)): + for level in xrange(1, len(current_package)): for job in current_package[level]: - job.level=level + job.level = level return JobPackageVerticalHorizontal(current_package, total_processors, total_wallclock, jobs_resources=jobs_resources, method=self.wrapper_method, configuration=self._as_config) @@ -554,7 +575,7 @@ class JobPackagerVertical(object): child.level = level self.jobs_list.append(child) # Recursive call - return self.build_vertical_package(child,level=level+1) + return self.build_vertical_package(child, level=level + 1) # Wrapped jobs are accumulated and returned in this list return self.jobs_list @@ -675,7 +696,7 @@ class JobPackagerVerticalMixed(JobPackagerVertical): for index in xrange(self.index, len(sorted_jobs)): child = sorted_jobs[index] if self._is_wrappable(child): - self.index = index+1 + self.index = index + 1 return child continue return None @@ -701,7 +722,7 @@ class JobPackagerVerticalMixed(JobPackagerVertical): class JobPackagerHorizontal(object): - def __init__(self, job_list, max_processors, max_wrapped_jobs, max_jobs, processors_node, method="ASThread", max_wrapper_job_by_section = dict()): + def __init__(self, job_list, max_processors, max_wrapped_jobs, max_jobs, processors_node, method="ASThread", max_wrapper_job_by_section=dict()): self.processors_node = processors_node self.max_processors = max_processors self.max_wrapped_jobs = max_wrapped_jobs @@ -734,7 +755,8 @@ class JobPackagerHorizontal(object): if self.max_jobs > 0 and len(current_package) < self.max_wrapped_jobs and current_package_by_section[section] < self.max_wrapper_job_by_section[section]: if int(job.tasks) != 0 and int(job.tasks) != int(self.processors_node) and \ int(job.tasks) < job.total_processors: - nodes = int(ceil(job.total_processors / float(job.tasks))) + nodes = int( + ceil(job.total_processors / float(job.tasks))) total_processors = int(self.processors_node) * nodes else: total_processors = job.total_processors @@ -795,7 +817,8 @@ class JobPackagerHorizontal(object): if wrappable and child not in next_section_list: next_section_list.append(child) - next_section_list.sort(key=lambda job: self.sort_by_expression(job.name)) + next_section_list.sort( + key=lambda job: self.sort_by_expression(job.name)) self.job_list = next_section_list package_jobs = self.build_horizontal_package(horizontal_vertical)