From 5689af0646ef44acfd6ad5cc0a5d279517a3e791 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 10 Jun 2024 16:57:26 +0200 Subject: [PATCH] Fixed few things related to multi-wrapper --- autosubmit/job/job_packager.py | 28 ++++++++++++++-------------- autosubmit/job/job_packages.py | 8 ++++++-- 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 704333d50..a79fd60fb 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -605,7 +605,7 @@ class JobPackager(object): job.packed = True dict_jobs = self._jobs_list.get_ordered_jobs_by_date_member(self.current_wrapper_section) job_vertical_packager = JobPackagerVerticalMixed(dict_jobs, job, [job], job.wallclock, wrapper_limits["max"], wrapper_limits, self._platform.max_wallclock,wrapper_info=wrapper_info) - jobs_list = job_vertical_packager.build_vertical_package(job) + jobs_list = job_vertical_packager.build_vertical_package(job, wrapper_info) packages.append(JobPackageVertical(jobs_list, configuration=self._as_config,wrapper_section=self.current_wrapper_section,wrapper_info=wrapper_info)) else: @@ -613,7 +613,7 @@ class JobPackager(object): return packages def _build_hybrid_package(self, jobs_list, wrapper_limits, section,wrapper_info={}): - self.wrapper_info = wrapper_info + #self.wrapper_info = wrapper_info jobs_resources = dict() jobs_resources['MACHINEFILES'] = self._as_config.get_wrapper_machinefiles() @@ -623,13 +623,13 @@ class JobPackager(object): wrapper_limits["max"], self._platform.processors_per_node,self.wrapper_method[self.current_wrapper_section]) if self.wrapper_type[self.current_wrapper_section] == 'vertical-horizontal': - return self._build_vertical_horizontal_package(horizontal_packager, jobs_resources) + return self._build_vertical_horizontal_package(horizontal_packager, jobs_resources, wrapper_info) else: - return self._build_horizontal_vertical_package(horizontal_packager, section, jobs_resources) + return self._build_horizontal_vertical_package(horizontal_packager, section, jobs_resources, wrapper_info) - def _build_horizontal_vertical_package(self, horizontal_packager, section, jobs_resources): + def _build_horizontal_vertical_package(self, horizontal_packager, section, jobs_resources, wrapper_info): total_wallclock = '00:00' - horizontal_package = horizontal_packager.build_horizontal_package(wrapper_info=self.wrapper_info) + horizontal_package = horizontal_packager.build_horizontal_package(wrapper_info=wrapper_info) horizontal_packager.create_sections_order(section) horizontal_packager.add_sectioncombo_processors( horizontal_packager.total_processors) @@ -656,9 +656,9 @@ class JobPackager(object): return JobPackageHorizontalVertical(current_package, max_procs, total_wallclock, jobs_resources=jobs_resources, configuration=self._as_config, wrapper_section=self.current_wrapper_section) - def _build_vertical_horizontal_package(self, horizontal_packager, jobs_resources): + def _build_vertical_horizontal_package(self, horizontal_packager, jobs_resources, wrapper_info): total_wallclock = '00:00' - horizontal_package = horizontal_packager.build_horizontal_package() + horizontal_package = horizontal_packager.build_horizontal_package(wrapper_info=wrapper_info) total_processors = horizontal_packager.total_processors current_package = [] ## Create the vertical ## @@ -672,7 +672,7 @@ class JobPackager(object): dict_jobs = self._jobs_list.get_ordered_jobs_by_date_member(self.current_wrapper_section) job_list = JobPackagerVerticalMixed(dict_jobs, job, [job], job.wallclock, horizontal_packager.wrapper_limits["max"], horizontal_packager.wrapper_limits, - self._platform.max_wallclock,wrapper_info=self.wrapper_info).build_vertical_package(job) + self._platform.max_wallclock,wrapper_info=self.wrapper_info).build_vertical_package(job, wrapper_info) current_package.append(list(set(job_list))) for job in current_package[-1]: @@ -710,7 +710,7 @@ class JobPackagerVertical(object): self.max_wallclock = max_wallclock self.wrapper_info = wrapper_info - def build_vertical_package(self, job): + def build_vertical_package(self, job, wrapper_info): """ Goes through the job and all the related jobs (children, or part of the same date member ordered group), finds those suitable and groups them together into a wrapper. (iterative-version) @@ -723,7 +723,7 @@ class JobPackagerVertical(object): stack = [(job, 1)] while stack: job, level = stack.pop() - if level % 100 == 0: + if level % 10 == 0 and level > 0: Log.info(f"Wrapper package creation is still ongoing. So far {level} jobs have been wrapped.") if len(self.jobs_list) >= self.wrapper_limits["max_v"] or len(self.jobs_list) >= \ self.wrapper_limits["max_by_section"][job.section] or len(self.jobs_list) >= self.wrapper_limits[ @@ -731,7 +731,7 @@ class JobPackagerVertical(object): continue child = self.get_wrappable_child(job) if child is not None and len(str(child)) > 0: - child.update_parameters(self.wrapper_info[-1], {}) + child.update_parameters(wrapper_info[-1], {}) self.total_wallclock = sum_str_hours(self.total_wallclock, child.wallclock) if self.total_wallclock <= self.max_wallclock: child.packed = True @@ -909,7 +909,7 @@ class JobPackagerHorizontal(object): jobs_by_section = dict() Log.info(f"Updating inner job parameters") for job in self.job_list: - job.update_parameters(self.wrapper_info[-1],{}) + job.update_parameters(wrapper_info[-1],{}) if job.section not in jobs_by_section: jobs_by_section[job.section] = list() jobs_by_section[job.section].append(job) @@ -918,7 +918,7 @@ class JobPackagerHorizontal(object): for section in jobs_by_section: current_package_by_section[section] = 0 for job in jobs_by_section[section]: - if jobs_processed % 100 == 0: + if jobs_processed % 10 == 0 and jobs_processed > 0: Log.info(f"Wrapper package creation is still ongoing. So far {jobs_processed} jobs have been wrapped.") if str(job.processors).isdigit() and str(job.nodes).isdigit() and int(job.nodes) > 1 and int(job.processors) <= 1: job.processors = 0 diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 85fc41ff5..f865d0de0 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -464,7 +464,11 @@ class JobPackageThread(JobPackageBase): self.tasks = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("TASKS",self.tasks) self.nodes = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("NODES",self.nodes) self.reservation = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("RESERVATION",self.reservation) - + wr_threads = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("THREADS",None) + if wr_threads: + self.threads = wr_threads + else: + self.threads = jobs[0].threads self.parameters["CURRENT_PROJ"] = self._project self.het = jobs[0].het @@ -500,7 +504,7 @@ class JobPackageThread(JobPackageBase): return jobs_scripts @property def queue(self): - if (not str(self.nodes).isdigit() or (self.nodes.isdigit() and int(self.nodes) < 1)) and (not self._num_processors.isdigit() or (self._num_processors.isdigit() and int(self._num_processors) <= 1)): + if (not str(self.nodes).isdigit() or (str(self.nodes).isdigit() and int(self.nodes) < 1)) and (not str(self._num_processors).isdigit() or (str(self._num_processors).isdigit() and int(self._num_processors) <= 1)): return self.platform.serial_platform.serial_queue else: return self._queue -- GitLab