diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index d02b551c28ae994740217bfe744ce189cfae2a2c..1f2d60744bac266d2c49d0ca24f4b29a8437065d 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -389,12 +389,12 @@ class JobPackager(object): running_by_id = dict() for running_job in running_jobs: running_by_id[running_job.id] = running_job - running_jobs_len = len(running_by_id.keys()) + self.running_jobs_len = len(running_by_id.keys()) queued_by_id = dict() for queued_job in queuing_jobs: queued_by_id[queued_job.id] = queued_job - queuing_jobs_len = len(list(queued_by_id.keys())) + self.queuing_jobs_len = len(list(queued_by_id.keys())) submitted_jobs = jobs_list.get_submitted(platform) submitted_by_id = dict() @@ -402,20 +402,20 @@ class JobPackager(object): submitted_by_id[submitted_job.id] = submitted_job submitted_jobs_len = len(list(submitted_by_id.keys())) - waiting_jobs = submitted_jobs_len + queuing_jobs_len + self.waiting_jobs = submitted_jobs_len + self.queuing_jobs_len # Calculate available space in Platform Queue if job is not None and job.max_waiting_jobs and platform.max_waiting_jobs and int(job.max_waiting_jobs) != int(platform.max_waiting_jobs): - self._max_wait_jobs_to_submit = int(job.max_waiting_jobs) - int(waiting_jobs) + self._max_wait_jobs_to_submit = int(job.max_waiting_jobs) - int(self.waiting_jobs) else: - self._max_wait_jobs_to_submit = int(platform.max_waiting_jobs) - int(waiting_jobs) + self._max_wait_jobs_to_submit = int(platform.max_waiting_jobs) - int(self.waiting_jobs) # .total_jobs is defined in each section of platforms_.yml, if not from there, it comes form autosubmit_.yml # .total_jobs Maximum number of jobs at the same time if job is not None and job.total_jobs != platform.total_jobs: - self._max_jobs_to_submit = job.total_jobs - queuing_jobs_len + self._max_jobs_to_submit = job.total_jobs - self.queuing_jobs_len else: - self._max_jobs_to_submit = platform.total_jobs - queuing_jobs_len + self._max_jobs_to_submit = platform.total_jobs - self.queuing_jobs_len # Subtracting running jobs - self._max_jobs_to_submit = self._max_jobs_to_submit - running_jobs_len + self._max_jobs_to_submit = self._max_jobs_to_submit - self.running_jobs_len self._max_jobs_to_submit = self._max_jobs_to_submit if self._max_jobs_to_submit > 0 else 0 self.max_jobs = min(self._max_wait_jobs_to_submit,self._max_jobs_to_submit) @@ -432,6 +432,26 @@ class JobPackager(object): if not ready: return [] max_jobs_to_submit = min(self._max_wait_jobs_to_submit, self._max_jobs_to_submit) + section_jobs_to_submit = dict() + for job in jobs_ready: + job.update_parameters(self._as_config, {}) # Ensure to have the correct processors for the wrapper building code + if job.section not in section_jobs_to_submit: # This is to fix TOTAL_JOBS when is set at job_level # Only for non-wrapped jobs + if int(job.max_waiting_jobs) != int(job.platform.max_waiting_jobs): + section_max_wait_jobs_to_submit = int(job.max_waiting_jobs) - int(self.waiting_jobs) + else: + section_max_wait_jobs_to_submit = None + if int(job.total_jobs) != int(job.platform.total_jobs): + section_max_jobs_to_submit = int(job.total_jobs) - self.queuing_jobs_len - self.running_jobs_len + else: + section_max_jobs_to_submit = None + + if section_max_jobs_to_submit is not None or section_max_wait_jobs_to_submit is not None: + if section_max_jobs_to_submit is None: + section_max_jobs_to_submit = self._max_jobs_to_submit + if section_max_wait_jobs_to_submit is None: + section_max_wait_jobs_to_submit = self._max_wait_jobs_to_submit + + section_jobs_to_submit ={job.section:min(section_max_wait_jobs_to_submit,section_max_jobs_to_submit)} jobs_to_submit = sorted( jobs_ready, key=lambda k: k.priority, reverse=True) for job in [failed_job for failed_job in jobs_to_submit if failed_job.fail_count > 0]: @@ -474,7 +494,10 @@ class JobPackager(object): packages_to_submit,max_jobs_to_submit = self.check_packages_respect_wrapper_policy(built_packages_tmp,packages_to_submit,max_jobs_to_submit,wrapper_limits,any_simple_packages) # Now, prepare the packages for non-wrapper jobs for job in non_wrapped_jobs: - if max_jobs_to_submit == 0: + if job.section in section_jobs_to_submit: + if section_jobs_to_submit[job.section] == 0: + continue + elif max_jobs_to_submit == 0: break if len(self._jobs_list.jobs_to_run_first) > 0: # if user wants to run first some jobs, submit them first if job not in self._jobs_list.jobs_to_run_first: @@ -486,6 +509,8 @@ class JobPackager(object): package = JobPackageSimple([job]) packages_to_submit.append(package) max_jobs_to_submit = max_jobs_to_submit - 1 + if job.section in section_jobs_to_submit: + section_jobs_to_submit[job.section] = section_jobs_to_submit[job.section] - 1 for package in packages_to_submit: self.max_jobs = self.max_jobs - 1