From a61ea7b5be4389d4be57e09b30a13d54d1332427 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 23 May 2024 10:48:45 +0200 Subject: [PATCH 1/7] Added the option of having an inline script instead of a file --- autosubmit/job/job.py | 49 ++++++++++++++++++++++++------------------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 7974f0e4a..ed8885a47 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -1838,6 +1838,7 @@ class Job(object): self.check = as_conf.jobs_data[self.section].get("CHECK", False) self.check_warnings = as_conf.jobs_data[self.section].get("CHECK_WARNINGS", False) self.shape = as_conf.jobs_data[self.section].get("SHAPE", "") + self.script = as_conf.jobs_data[self.section].get("SCRIPT", "") if self.checkpoint: # To activate placeholder sustitution per in the template parameters["AS_CHECKPOINT"] = self.checkpoint parameters['JOBNAME'] = self.name @@ -1960,34 +1961,38 @@ class Job(object): :rtype: str """ self.update_parameters(as_conf, self.parameters) - try: - if as_conf.get_project_type().lower() != "none" and len(as_conf.get_project_type()) > 0: - template_file = open(os.path.join(as_conf.get_project_dir(), self.file), 'r') - template = '' - if as_conf.get_remote_dependencies() == "true": + if self.script: + Log.warning(f"Custom script for job {self.name} is being used, file contents are ignored.") + template = self.script + else: + try: + if as_conf.get_project_type().lower() != "none" and len(as_conf.get_project_type()) > 0: + template_file = open(os.path.join(as_conf.get_project_dir(), self.file), 'r') + template = '' + if as_conf.get_remote_dependencies() == "true": + if self.type == Type.BASH: + template = 'sleep 5' + "\n" + elif self.type == Type.PYTHON2: + template = 'time.sleep(5)' + "\n" + elif self.type == Type.PYTHON3 or self.type == Type.PYTHON: + template = 'time.sleep(5)' + "\n" + elif self.type == Type.R: + template = 'Sys.sleep(5)' + "\n" + template += template_file.read() + template_file.close() + else: if self.type == Type.BASH: - template = 'sleep 5' + "\n" + template = 'sleep 5' elif self.type == Type.PYTHON2: template = 'time.sleep(5)' + "\n" elif self.type == Type.PYTHON3 or self.type == Type.PYTHON: template = 'time.sleep(5)' + "\n" elif self.type == Type.R: - template = 'Sys.sleep(5)' + "\n" - template += template_file.read() - template_file.close() - else: - if self.type == Type.BASH: - template = 'sleep 5' - elif self.type == Type.PYTHON2: - template = 'time.sleep(5)' + "\n" - elif self.type == Type.PYTHON3 or self.type == Type.PYTHON: - template = 'time.sleep(5)' + "\n" - elif self.type == Type.R: - template = 'Sys.sleep(5)' - else: - template = '' - except Exception as e: - template = '' + template = 'Sys.sleep(5)' + else: + template = '' + except Exception as e: + template = '' if self.type == Type.BASH: snippet = StatisticsSnippetBash -- GitLab From a3f793ff7fd5824b4512b59ba4938ee7c954b1e8 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 23 May 2024 15:32:41 +0200 Subject: [PATCH 2/7] Added script parameter --- autosubmit/autosubmit.py | 2 +- autosubmit/job/job.py | 12 +++++++++++- autosubmit/job/job_list.py | 4 ++++ autosubmit/job/job_utils.py | 6 +----- autosubmit/platforms/platform.py | 1 + 5 files changed, 18 insertions(+), 7 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index dec9660ac..986fbcc37 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -2129,7 +2129,7 @@ class Autosubmit: Log.debug("Preparing run") # This function is called only once, when the experiment is started. It is used to initialize the experiment and to check the correctness of the configuration files. # If there are issues while running, this function will be called again to reinitialize the experiment. - job_list, submitter , exp_history, host , as_conf, platforms_to_test, packages_persistence, _ = Autosubmit.prepare_run(expid, notransitive,start_time, start_after, run_only_members) + job_list, submitter , exp_history, host , as_conf, platforms_to_test, packages_persistence, _ = Autosubmit.prepare_run(expid, notransitive, start_time, start_after, run_only_members) except AutosubmitCritical as e: #e.message += " HINT: check the CUSTOM_DIRECTIVE syntax in your jobs configuration files." raise AutosubmitCritical(e.message, 7014, e.trace) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index ed8885a47..e87c395df 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -252,6 +252,7 @@ class Job(object): self.start_time_written = False self.submit_time_timestamp = None # for wrappers, all jobs inside a wrapper are submitted at the same time self.finish_time_timestamp = None # for wrappers, with inner_retrials, the submission time should be the last finish_time of the previous retrial + self._script = None # Inline code to be executed def _init_runtime_parameters(self): # hetjobs self.het = {'HETSIZE': 0} @@ -288,6 +289,15 @@ class Job(object): def name(self, value): self._name = value + @property + @autosubmit_parameter(name='script') + def script(self): + """Allows to launch inline code instead of using the file parameter""" + return self._script + @script.setter + def script(self, value): + self._script = value + @property @autosubmit_parameter(name='fail_count') def fail_count(self): @@ -1961,7 +1971,7 @@ class Job(object): :rtype: str """ self.update_parameters(as_conf, self.parameters) - if self.script: + if self.script and self.file: Log.warning(f"Custom script for job {self.name} is being used, file contents are ignored.") template = self.script else: diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 81851902b..163689316 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -2664,6 +2664,10 @@ class JobList(object): job.status = Status.FAILED job.packed = False save = True + else: + for job in [ job for job in self._job_list if job.status in [ Status.WAITING, Status.READY, Status.DELAYED, Status.PREPARED ] ]: + job.fail_count = 0 + job.packed = False # Check checkpoint jobs, the status can be Any for job in self.check_special_status(): job.status = Status.READY diff --git a/autosubmit/job/job_utils.py b/autosubmit/job/job_utils.py index c7ae6709e..dae0ccc5e 100644 --- a/autosubmit/job/job_utils.py +++ b/autosubmit/job/job_utils.py @@ -3,11 +3,7 @@ import math from autosubmit.platforms.paramiko_submitter import ParamikoSubmitter from log.log import Log, AutosubmitCritical -import os -from autosubmit.job.job_package_persistence import JobPackagePersistence -from autosubmitconfigparser.config.basicconfig import BasicConfig -from typing import Dict -from bscearth.utils.date import date2str, previous_day, chunk_end_date, chunk_start_date, subs_dates +from bscearth.utils.date import date2str, chunk_end_date, chunk_start_date, subs_dates # Copyright 2017-2020 Earth Sciences Department, BSC-CNS diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 438078118..6e3dc8956 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -24,6 +24,7 @@ def processed(fn): return process return wrapper + class Platform(object): """ Class to manage the connections to the different platforms. -- GitLab From 1dbeb28dd117247c77f4089e40113089d9ccb39e Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 27 May 2024 12:30:29 +0200 Subject: [PATCH 3/7] Fix some issues --- autosubmit/autosubmit.py | 20 +++++++++++-------- autosubmit/job/job_common.py | 4 ++-- .../platforms/wrappers/wrapper_builder.py | 6 ++++-- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 986fbcc37..099f43523 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -4825,8 +4825,14 @@ class Autosubmit: job_list.add_logs(prev_job_list_logs) job_list.save() as_conf.save() - JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), - "job_packages_" + expid).reset_table() + try: + as_conf.check_conf_files(True) + packages_persistence = JobPackagePersistence( + os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) + packages_persistence.reset_table() + packages_persistence.reset_table(wrappers) + except: + pass groups_dict = dict() # Setting up job historical database header. Must create a new run. @@ -4863,20 +4869,15 @@ class Autosubmit: expand_list=expand, expanded_status=status) groups_dict = job_grouping.group_jobs() # WRAPPERS - if len(as_conf.experiment_data.get("WRAPPERS", {})) > 0 and check_wrappers: - as_conf.check_conf_files(True) - packages_persistence = JobPackagePersistence( - os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) - packages_persistence.reset_table(True) job_list_wr = Autosubmit.load_job_list( expid, as_conf, notransitive=notransitive, monitor=True, new=False) Autosubmit.generate_scripts_andor_wrappers( as_conf, job_list_wr, job_list_wr.get_job_list(), packages_persistence, True) - packages = packages_persistence.load(True) else: packages = None + Log.info("\nPlotting the jobs list...") monitor_exp = Monitor() # if output is set, use output @@ -5602,6 +5603,9 @@ class Autosubmit: final_list = list(set(final_list)) performed_changes = {} for job in final_list: + if final_status in [Status.WAITING, Status.PREPARED, Status.DELAYED, Status.READY]: + job.packed = False + job.fail_count = 0 if job.status in [Status.QUEUING, Status.RUNNING, Status.SUBMITTED] and job.platform.name not in definitive_platforms: Log.printlog("JOB: [{1}] is ignored as the [{0}] platform is currently offline".format( diff --git a/autosubmit/job/job_common.py b/autosubmit/job/job_common.py index 27414223f..92dc1b1bd 100644 --- a/autosubmit/job/job_common.py +++ b/autosubmit/job/job_common.py @@ -200,7 +200,7 @@ class StatisticsSnippetPython: locale.setlocale(locale.LC_ALL, 'C') job_name_ptrn = '%CURRENT_LOGDIR%/%JOBNAME%' stat_file = open(job_name_ptrn + '_STAT', 'w') - stat_file.write('int({0:.0f})\\n'.format(time.time())) + stat_file.write(f'{int(time.time())}\\n') stat_file.close() ################### # Autosubmit Checkpoint @@ -229,7 +229,7 @@ class StatisticsSnippetPython: ################### stat_file = open(job_name_ptrn + '_STAT', 'a') - stat_file.write('int({0:.0f})\\n'.format(time.time())) + stat_file.write(f'{int(time.time())}\\n') stat_file.close() open(job_name_ptrn + '_COMPLETED', 'a').close() exit(0) diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index d40a985d1..7ff5552db 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -455,7 +455,8 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): while fail_count <= job_retrials and not completed: current = {1} current.start() - os.system("echo "+str(int(time.time()))+" > "+scripts[i][:-4]+"_STAT_"+str(fail_count)) #Start/submit running + timer = int(time.time()) + os.system("echo "+str(timer)+" >> "+scripts[i][:-4]+"_STAT_"+str(fail_count)) #Completed current.join({3}) total_steps = total_steps + 1 """).format(jobs_list, thread,self.retrials,str(self.wallclock_by_level),'\n'.ljust(13)) @@ -467,7 +468,8 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): failed_filename = {0}[i].replace('.cmd', '_FAILED') failed_path = os.path.join(os.getcwd(), failed_filename) failed_wrapper = os.path.join(os.getcwd(), wrapper_id) - os.system("echo "+str(int(time.time()))+" >> "+scripts[i][:-4]+"_STAT_"+str(fail_count)) #Completed + timer = int(time.time()) + os.system("echo "+str(timer)+" >> "+scripts[i][:-4]+"_STAT_"+str(fail_count)) #Completed if os.path.exists(completed_path): completed = True print(datetime.now(), "The job ", current.template," has been COMPLETED") -- GitLab From 3fe9f8fdfcb7d6b0292b54d6c10fdd6d16a92dfe Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 29 May 2024 10:19:54 +0200 Subject: [PATCH 4/7] Fix Horizontal wrapper --- autosubmit/autosubmit.py | 10 +-- autosubmit/job/job_packager.py | 7 +- autosubmit/job/job_packages.py | 84 +++++-------------- .../platforms/wrappers/wrapper_factory.py | 17 ++-- 4 files changed, 32 insertions(+), 86 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 099f43523..275d20f62 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -4910,15 +4910,9 @@ class Autosubmit: message = "We have detected that there is another Autosubmit instance using the experiment\n. Stop other Autosubmit instances that are using the experiment or delete autosubmit.lock file located on tmp folder" raise AutosubmitCritical(message, 7000) except AutosubmitError as e: - # TODO: == "" or is None? - if e.trace == "": - e.trace = traceback.format_exc() - raise AutosubmitError(e.message, e.code, e.trace) + raise except AutosubmitCritical as e: - # TODO: == "" or is None? - if e.trace == "" or not e.trace: - e.trace = traceback.format_exc() - raise AutosubmitCritical(e.message, e.code, e.trace) + raise except BaseException as e: raise finally: diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index a4ae2547f..f5dad4b22 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -887,7 +887,7 @@ class JobPackagerHorizontal(object): job_total_processors = int(job.total_processors) if len(current_package) < self.wrapper_limits["max_h"] and len(current_package) < self.wrapper_limits["max"] and current_package_by_section[section] < self.wrapper_limits["max_by_section"][section]: if int(job.tasks) != 0 and int(job.tasks) != int(self.processors_node) and \ - int(job.tasks) < job_total_processors: + int(self.processors_node) < int(job_total_processors): nodes = int( ceil(job_total_processors / float(job.tasks))) total_processors = int(self.processors_node) * nodes @@ -896,10 +896,7 @@ class JobPackagerHorizontal(object): if (self._current_processors + total_processors) <= int(self.max_processors): current_package.append(job) self._current_processors += total_processors - else: - current_package = [job] - self._current_processors = total_processors - current_package_by_section[section] += 1 + current_package_by_section[section] += 1 else: break diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 0b677c9d7..4924f0ff9 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -371,9 +371,7 @@ class JobPackageThread(JobPackageBase): # It is in charge of merging ( switch ) the wrapper info by checking if the value is defined by the user in the wrapper section, current wrapper section, job or platform in that order. # Some variables are calculated in futher functions, like num_processors and wallclock. # These variables can only be present in the wrapper itself - self.parameters = dict() - self.wallclock = '00:00' - if len(wrapper_info) > 0 : + if len(wrapper_info) > 0: self.wrapper_type = wrapper_info[0] self.wrapper_policy = wrapper_info[1] self.wrapper_method = wrapper_info[2] @@ -389,15 +387,27 @@ class JobPackageThread(JobPackageBase): # Seems like this one is not used at all in the class self._job_dependency = dependency self._common_script = None + self.executable = None + self._wallclock = '00:00' # depends on the type of wrapper - if not hasattr(self,"_num_processors"): - self._num_processors = '0' + self._jobs_resources = jobs_resources self._wrapper_factory = self.platform.wrapper self.current_wrapper_section = wrapper_section self.inner_retrials = 0 - # temporal hetjob code , to be upgraded in the future + if not hasattr(self,"_num_processors"): + self._num_processors = '0' + self.parameters = dict() + self.nodes = jobs[0].nodes + self.queue = jobs[0].queue + self.parameters["CURRENT_QUEUE"] = self.queue + self.partition = jobs[0].partition + self.tasks = jobs[0].tasks if int(jobs[0].tasks) > 1 else "" + self.threads = jobs[0].threads if int(jobs[0].threads) > 1 else "" + self.exclusive = jobs[0].exclusive + self.custom_directives = jobs[0].custom_directives + self.wallclock = '00:00' if configuration is not None: self.inner_retrials = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section, {}).get("RETRIALS",self.jobs[0].retrials) @@ -416,27 +426,17 @@ class JobPackageThread(JobPackageBase): "EXECUTABLE", None) if wr_executable: self.executable = wr_executable - else: - self.executable = jobs[0].executable if jobs[0].het.get("HETSIZE", 1) <= 1: wr_queue = configuration.get_wrapper_queue(configuration.experiment_data["WRAPPERS"][self.current_wrapper_section]) if wr_queue is not None and len(str(wr_queue)) > 0: self.queue = wr_queue self.parameters["CURRENT_QUEUE"] = wr_queue - else: - self.queue = jobs[0].queue - self.parameters["CURRENT_QUEUE"] = jobs[0].queue - wr_partition = configuration.get_wrapper_partition(configuration.experiment_data["WRAPPERS"][self.current_wrapper_section]) if wr_partition and len(str(wr_partition)) > 0: self.partition = wr_partition - else: - self.partition = jobs[0].partition wr_exclusive = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("EXCLUSIVE",None) if wr_exclusive is not None: self.exclusive = wr_exclusive - else: - self.exclusive = jobs[0].exclusive wr_custom_directives = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("CUSTOM_DIRECTIVES",[]) # parse custom_directives if type(wr_custom_directives) is list: @@ -460,42 +460,16 @@ class JobPackageThread(JobPackageBase): wr_custom_directives = [] if len(str(wr_custom_directives)) > 0: self.custom_directives = wr_custom_directives - else: - self.custom_directives = jobs[0].custom_directives - wr_tasks = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("TASKS",None) if wr_tasks: self.tasks = wr_tasks - else: - self.tasks = jobs[0].tasks wr_nodes = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("NODES",None) if wr_nodes: self.nodes = wr_nodes - else: - self.nodes = jobs[0].nodes wr_threads = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("THREADS",None) if wr_threads: self.threads = wr_threads - else: - self.threads = jobs[0].threads - else: - self.queue = jobs[0].queue - self.parameters["CURRENT_QUEUE"] = jobs[0].queue - self.partition = jobs[0].partition - self.nodes = jobs[0].nodes - self.tasks = jobs[0].tasks - self.threads = jobs[0].threads - self.exclusive = jobs[0].exclusive - self.custom_directives = jobs[0].custom_directives - else: - self.queue = jobs[0].queue - self.parameters["CURRENT_QUEUE"] = jobs[0].queue - self.partition = jobs[0].partition - self.nodes = jobs[0].nodes - self.tasks = jobs[0].tasks - self.threads = jobs[0].threads - self.exclusive = jobs[0].exclusive - self.custom_directives = jobs[0].custom_directives + self.parameters["CURRENT_PROJ"] = self._project self.parameters["NUMTHREADS"] = self.threads self.het = jobs[0].het @@ -511,26 +485,8 @@ class JobPackageThread(JobPackageBase): self.parameters['TASKS'] = self.tasks self.parameters["EXECUTABLE"] = self.executable # have to look self.method = method - self._wrapper_data = configuration.experiment_data["WRAPPERS"][self.current_wrapper_section] - self._wrapper_data["WRAPPER"] = self - - # self._wrapper_data["TYPE"] = self.wrapper_type - # self._wrapper_data["WRAPPER_POLICY"] = self.wrapper_policy - # self._wrapper_data["INNER_RETRIALS"] = self.inner_retrials - # self._wrapper_data["RETRIALS"] = self.inner_retrials - # self._wrapper_data["EXTEND_WALLCLOCK"] = self.extensible_wallclock - # self._wrapper_data["METHOD"] = self.wrapper_method - # self._wrapper_data["EXPORT"] = self.export - # self._wrapper_data["QUEUE"] = self.queue - # self._wrapper_data["NODES"] = self.nodes - # self._wrapper_data["TASKS"] = self.tasks - # self._wrapper_data["THREADS"] = self.threads - # self._wrapper_data["PROCESSORS"] = self._num_processors - # self._wrapper_data["PARTITION"] = self.partition - # self._wrapper_data["EXCLUSIVE"] = self.exclusive - # self._wrapper_data["EXECUTABLE"] = self.executable - # self._wrapper_data["CUSTOM_DIRECTIVES"] = self.custom_directives - # self._wrapper_data["HET"] = self.het + + @property def name(self): return self._name @@ -550,7 +506,7 @@ class JobPackageThread(JobPackageBase): return jobs_scripts @property def queue(self): - if str(self._num_processors) == '1' or str(self._num_processors) == '0': + if (not str(self.nodes).isdigit() or (self.nodes.isdigit() and int(self.nodes) < 1)) and (not self._num_processors.isdigit() or (self._num_processors.isdigit() and int(self._num_processors) <= 1)): return self.platform.serial_platform.serial_queue else: return self._queue diff --git a/autosubmit/platforms/wrappers/wrapper_factory.py b/autosubmit/platforms/wrappers/wrapper_factory.py index 4b728dd6b..e62680610 100644 --- a/autosubmit/platforms/wrappers/wrapper_factory.py +++ b/autosubmit/platforms/wrappers/wrapper_factory.py @@ -31,9 +31,15 @@ class WrapperFactory(object): self.exception = "This type of wrapper is not supported for this platform" def get_wrapper(self, wrapper_builder, **kwargs): - wrapper_data = kwargs['wrapper_data'] + wrapper_data = kwargs['wrapper_data'] # this refers to the object with all parameters init wrapper_data.wallclock = kwargs['wallclock'] if wrapper_data.het.get("HETSIZE",0) <= 1: + if not str(kwargs['num_processors_value']).isdigit(): + kwargs['num_processors_value'] = 1 + if str(wrapper_data.nodes).isdigit() and int(wrapper_data.nodes) > 1 and int(kwargs['num_processors_value']) <= 1: + kwargs['num_processors'] = "#" + else: + kwargs['num_processors'] = self.processors(kwargs['num_processors_value']) kwargs['allocated_nodes'] = self.allocated_nodes() kwargs['dependency'] = self.dependency(kwargs['dependency']) kwargs['partition'] = self.partition(wrapper_data.partition) @@ -43,14 +49,7 @@ class WrapperFactory(object): kwargs["custom_directives"] = self.custom_directives(wrapper_data.custom_directives) kwargs['queue'] = self.queue(wrapper_data.queue) kwargs['threads'] = self.threads(wrapper_data.threads) - if str(kwargs['num_processors']).isdigit(): - kwargs['num_processors_value'] = int(wrapper_data.processors) - else: - kwargs['num_processors_value'] = 1 - if str(wrapper_data.nodes).isdigit() and int(wrapper_data.nodes) > 1 and kwargs['num_processors'] == '1': - kwargs['num_processors'] = "#" - else: - kwargs['num_processors'] = self.processors(wrapper_data.processors) + kwargs["executable"] = wrapper_data.executable kwargs['header_directive'] = self.header_directives(**kwargs) -- GitLab From 8bf88114ca256f38bf0c2bc1015359c6b27633f1 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 29 May 2024 11:21:22 +0200 Subject: [PATCH 5/7] added reservation key --- autosubmit/job/job_packages.py | 5 ++++- autosubmit/platforms/headers/slurm_header.py | 2 +- autosubmit/platforms/pjmplatform.py | 2 +- autosubmit/platforms/wrappers/wrapper_factory.py | 10 ++++++++++ 4 files changed, 16 insertions(+), 3 deletions(-) diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 4924f0ff9..e785e7740 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -408,6 +408,7 @@ class JobPackageThread(JobPackageBase): self.exclusive = jobs[0].exclusive self.custom_directives = jobs[0].custom_directives self.wallclock = '00:00' + self.reservation = jobs[0].reservation if configuration is not None: self.inner_retrials = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section, {}).get("RETRIALS",self.jobs[0].retrials) @@ -427,6 +428,7 @@ class JobPackageThread(JobPackageBase): if wr_executable: self.executable = wr_executable if jobs[0].het.get("HETSIZE", 1) <= 1: + wr_queue = configuration.get_wrapper_queue(configuration.experiment_data["WRAPPERS"][self.current_wrapper_section]) if wr_queue is not None and len(str(wr_queue)) > 0: self.queue = wr_queue @@ -469,6 +471,7 @@ class JobPackageThread(JobPackageBase): wr_threads = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("THREADS",None) if wr_threads: self.threads = wr_threads + self.reservation = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("RESERVATION",self.reservation) self.parameters["CURRENT_PROJ"] = self._project self.parameters["NUMTHREADS"] = self.threads @@ -481,7 +484,7 @@ class JobPackageThread(JobPackageBase): self.memory_per_task = jobs[0].memory_per_task self.parameters["NODES"] = self.nodes self.processors = self._num_processors - self.parameters["RESERVATION"] = jobs[0].reservation # have to look + self.parameters["RESERVATION"] = self.reservation # have to look self.parameters['TASKS'] = self.tasks self.parameters["EXECUTABLE"] = self.executable # have to look self.method = method diff --git a/autosubmit/platforms/headers/slurm_header.py b/autosubmit/platforms/headers/slurm_header.py index d0b163523..ce590465a 100644 --- a/autosubmit/platforms/headers/slurm_header.py +++ b/autosubmit/platforms/headers/slurm_header.py @@ -279,7 +279,7 @@ class SlurmHeader(object): {kwargs["tasks"]} {kwargs["exclusive"]} {kwargs["custom_directives"]} - +{kwargs.get("reservation","#")} # """ else: diff --git a/autosubmit/platforms/pjmplatform.py b/autosubmit/platforms/pjmplatform.py index e474ee3a7..c8f520258 100644 --- a/autosubmit/platforms/pjmplatform.py +++ b/autosubmit/platforms/pjmplatform.py @@ -455,7 +455,7 @@ class PJMPlatform(ParamikoPlatform): {kwargs["tasks"]} {kwargs["exclusive"]} {kwargs["custom_directives"]} - + #PJM -g {kwargs["project"]} #PJM -o {kwargs["name"]}.out #PJM -e {kwargs["name"]}.err diff --git a/autosubmit/platforms/wrappers/wrapper_factory.py b/autosubmit/platforms/wrappers/wrapper_factory.py index e62680610..b71cd503d 100644 --- a/autosubmit/platforms/wrappers/wrapper_factory.py +++ b/autosubmit/platforms/wrappers/wrapper_factory.py @@ -49,6 +49,7 @@ class WrapperFactory(object): kwargs["custom_directives"] = self.custom_directives(wrapper_data.custom_directives) kwargs['queue'] = self.queue(wrapper_data.queue) kwargs['threads'] = self.threads(wrapper_data.threads) + kwargs['reservation'] = self.reservation(wrapper_data.reservation) kwargs["executable"] = wrapper_data.executable @@ -88,6 +89,9 @@ class WrapperFactory(object): def allocated_nodes(self): return '' + def reservation(self, reservation): + return '#' if not reservation else self.reservation_directive(reservation) + def dependency(self, dependency): return '#' if dependency is None else self.dependency_directive(dependency) def queue(self, queue): @@ -117,6 +121,8 @@ class WrapperFactory(object): return '\n'.join(str(s) for s in custom_directives) return "" + def reservation_directive(self, reservation): + return '#' def dependency_directive(self, dependency): raise NotImplemented(self.exception) def queue_directive(self, queue): @@ -161,6 +167,8 @@ class SlurmWrapperFactory(WrapperFactory): def allocated_nodes(self): return self.platform.allocated_nodes() + def reservation_directive(self, reservation): + return "#SBATCH --reservation={0}".format(reservation) def dependency_directive(self, dependency): return '#SBATCH --dependency=afterok:{0}'.format(dependency) def queue_directive(self, queue): @@ -206,6 +214,8 @@ class PJMWrapperFactory(WrapperFactory): def allocated_nodes(self): return self.platform.allocated_nodes() + def reservation_directive(self, reservation): + return "#" # Reservation directive doesn't exist in PJM, they're handled directly by admins def queue_directive(self, queue): return '#PJM --qos={0}'.format(queue) -- GitLab From d52bc36a2037d69c2b4c2af5f6f8379a26d0e85b Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 31 May 2024 15:44:10 +0200 Subject: [PATCH 6/7] wrapper additional changes --- autosubmit/job/job.py | 14 ++++++-------- autosubmit/job/job_packages.py | 29 +++++++++-------------------- autosubmit/platforms/platform.py | 2 +- test/unit/test_job_package.py | 4 ++-- 4 files changed, 18 insertions(+), 31 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index e87c395df..9f6383df4 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -1403,25 +1403,23 @@ class Job(object): # Get the max tasks, each element can be a str or int self.het['TASKS'] = list() if len(self.tasks) == 1: - if int(self.tasks) <= 1 and int(job_platform.processors_per_node) > 1 and int( - self.processors) > int(job_platform.processors_per_node): + if int(job_platform.processors_per_node) > 1 and int(self.tasks) > int(job_platform.processors_per_node): self.tasks = job_platform.processors_per_node for task in range(self.het['HETSIZE']): - if int(self.tasks) <= 1 < int(job_platform.processors_per_node) and int( - self.processors) > int(job_platform.processors_per_node): + if int(job_platform.processors_per_node) > 1 and int(task) > int( + job_platform.processors_per_node): self.het['TASKS'].append(str(job_platform.processors_per_node)) else: self.het['TASKS'].append(str(self.tasks)) self.tasks = str(max([int(x) for x in self.tasks])) else: for task in self.tasks: - if int(task) <= 1 < int(job_platform.processors_per_node) and int( - self.processors) > int(job_platform.processors_per_node): + if int(job_platform.processors_per_node) > 1 and int(task) > int( + job_platform.processors_per_node): task = job_platform.processors_per_node self.het['TASKS'].append(str(task)) else: - if int(self.tasks) <= 1 < int(job_platform.processors_per_node) and int( - self.processors) > int(job_platform.processors_per_node): + if int(job_platform.processors_per_node) > 1 and int(self.tasks) > int(job_platform.processors_per_node): self.tasks = job_platform.processors_per_node self.tasks = str(self.tasks) diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index e785e7740..f6ad51eb4 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -52,6 +52,7 @@ class JobPackageBase(object): def __init__(self, jobs): # type: (List[Job]) -> None + self.nodes = "" self._common_script = None self._jobs = jobs # type: List[Job] self._expid = jobs[0].expid # type: str @@ -399,12 +400,11 @@ class JobPackageThread(JobPackageBase): if not hasattr(self,"_num_processors"): self._num_processors = '0' self.parameters = dict() - self.nodes = jobs[0].nodes + self.nodes = jobs[0].nodes if not self.nodes else self.nodes self.queue = jobs[0].queue self.parameters["CURRENT_QUEUE"] = self.queue self.partition = jobs[0].partition - self.tasks = jobs[0].tasks if int(jobs[0].tasks) > 1 else "" - self.threads = jobs[0].threads if int(jobs[0].threads) > 1 else "" + self.tasks = jobs[0].tasks self.exclusive = jobs[0].exclusive self.custom_directives = jobs[0].custom_directives self.wallclock = '00:00' @@ -423,12 +423,9 @@ class JobPackageThread(JobPackageBase): if job.export.lower() not in "none" and len(job.export) > 0: self.export = job.export break - wr_executable = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section, {}).get( - "EXECUTABLE", None) - if wr_executable: - self.executable = wr_executable + self.executable = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section, {}).get( + "EXECUTABLE", self.executable) if jobs[0].het.get("HETSIZE", 1) <= 1: - wr_queue = configuration.get_wrapper_queue(configuration.experiment_data["WRAPPERS"][self.current_wrapper_section]) if wr_queue is not None and len(str(wr_queue)) > 0: self.queue = wr_queue @@ -436,9 +433,7 @@ class JobPackageThread(JobPackageBase): wr_partition = configuration.get_wrapper_partition(configuration.experiment_data["WRAPPERS"][self.current_wrapper_section]) if wr_partition and len(str(wr_partition)) > 0: self.partition = wr_partition - wr_exclusive = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("EXCLUSIVE",None) - if wr_exclusive is not None: - self.exclusive = wr_exclusive + self.exclusive = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("EXCLUSIVE",self.exclusive) wr_custom_directives = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("CUSTOM_DIRECTIVES",[]) # parse custom_directives if type(wr_custom_directives) is list: @@ -462,15 +457,9 @@ class JobPackageThread(JobPackageBase): wr_custom_directives = [] if len(str(wr_custom_directives)) > 0: self.custom_directives = wr_custom_directives - wr_tasks = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("TASKS",None) - if wr_tasks: - self.tasks = wr_tasks - wr_nodes = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("NODES",None) - if wr_nodes: - self.nodes = wr_nodes - wr_threads = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("THREADS",None) - if wr_threads: - self.threads = wr_threads + self.tasks = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("TASKS",self.tasks) + self.nodes = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("NODES",self.nodes) + self.threads = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("THREADS",self.threads) self.reservation = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("RESERVATION",self.reservation) self.parameters["CURRENT_PROJ"] = self._project diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 6e3dc8956..7a55bdd17 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -48,7 +48,7 @@ class Platform(object): self._default_queue = None self._partition = None self.ec_queue = "hpc" - self.processors_per_node = "1" + self.processors_per_node = None self.scratch_free_space = None self.custom_directives = None self._host = '' diff --git a/test/unit/test_job_package.py b/test/unit/test_job_package.py index e12aa8eb6..bebdff0d1 100644 --- a/test/unit/test_job_package.py +++ b/test/unit/test_job_package.py @@ -75,8 +75,8 @@ class TestJobPackage(TestCase): self.jobs[0]._platform = self.platform self.jobs[0].retrials = 0 self.jobs[1].wallclock = "00:00" - self.jobs[1].threads = "" - self.jobs[1].tasks = "" + self.jobs[1].threads = "1" + self.jobs[1].tasks = "1" self.jobs[1].exclusive = True self.jobs[1].queue = "debug2" self.jobs[1].partition = "debug2" -- GitLab From ffc97a092cff6f0c2d25ce5b86e607b034e55466 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 3 Jun 2024 17:42:21 +0200 Subject: [PATCH 7/7] fixed tests, added info --- autosubmit/autosubmit.py | 4 +- autosubmit/job/job.py | 2 +- autosubmit/job/job_packager.py | 96 ++++++++++++++++++++++++---------- autosubmit/job/job_packages.py | 18 ++++--- test/unit/test_job_package.py | 8 +-- 5 files changed, 88 insertions(+), 40 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 275d20f62..bb7ea8fd9 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -4826,13 +4826,13 @@ class Autosubmit: job_list.save() as_conf.save() try: - as_conf.check_conf_files(True) packages_persistence = JobPackagePersistence( os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) packages_persistence.reset_table() - packages_persistence.reset_table(wrappers) + packages_persistence.reset_table(True) except: pass + groups_dict = dict() # Setting up job historical database header. Must create a new run. diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 9f6383df4..21417ff8b 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -1419,7 +1419,7 @@ class Job(object): task = job_platform.processors_per_node self.het['TASKS'].append(str(task)) else: - if int(job_platform.processors_per_node) > 1 and int(self.tasks) > int(job_platform.processors_per_node): + if job_platform.processors_per_node and int(job_platform.processors_per_node) > 1 and int(self.tasks) > int(job_platform.processors_per_node): self.tasks = job_platform.processors_per_node self.tasks = str(self.tasks) diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index f5dad4b22..704333d50 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -337,6 +337,7 @@ class JobPackager(object): Check if the packages are ready to be built :return: List of jobs ready to be built, boolean indicating if packages can't be built for other reasons ( max_total_jobs...) """ + Log.info("Calculating size limits for {0}".format(self._platform.name)) jobs_ready = list() if len(self._jobs_list.jobs_to_run_first) > 0: jobs_ready = [job for job in self._jobs_list.jobs_to_run_first if @@ -465,6 +466,7 @@ class JobPackager(object): any_simple_packages = len(non_wrapped_jobs) > 0 # Prepare packages for wrapped jobs for wrapper_name, jobs in jobs_to_wrap.items(): + Log.info(f"Building packages for {wrapper_name}") if max_jobs_to_submit == 0: break self.current_wrapper_section = wrapper_name @@ -546,13 +548,14 @@ class JobPackager(object): sections_split[wrapper_name] = section_name jobs_by_section[wrapper_name] = list() + if self.jobs_in_wrapper: + Log.info(f"Calculating wrapper packages") jobs_by_section["SIMPLE"] = [] for wrapper_name,section_name in sections_split.items(): for job in jobs_list[:]: if job.section.upper() in section_name.split("&"): jobs_by_section[wrapper_name].append(job) jobs_list.remove(job) - # jobs not in wrapper for job in (job for job in jobs_list): jobs_by_section["SIMPLE"].append(job) for wrappers in list(jobs_by_section.keys()): @@ -707,37 +710,69 @@ class JobPackagerVertical(object): self.max_wallclock = max_wallclock self.wrapper_info = wrapper_info - def build_vertical_package(self, job, level=1): + def build_vertical_package(self, job): """ Goes through the job and all the related jobs (children, or part of the same date member ordered group), finds those suitable - and groups them together into a wrapper. + and groups them together into a wrapper. (iterative-version) - :param level: - :param job: Job to be wrapped. \n - :type job: Job Object \n - :return: List of jobs that are wrapped together. \n - :rtype: List() of Job Object \n + :param job: Job to be wrapped. + :type job: Job Object + :return: List of jobs that are wrapped together. + :rtype: List() of Job Object """ - # self.jobs_list starts as only 1 member, but wrapped jobs are added in the recursion - if len(self.jobs_list) >= self.wrapper_limits["max_v"] or len(self.jobs_list) >= self.wrapper_limits["max_by_section"][job.section] or len(self.jobs_list) >= self.wrapper_limits["max"]: - return self.jobs_list - child = self.get_wrappable_child(job) - # If not None, it is wrappable - if child is not None and len(str(child)) > 0: - child.update_parameters(self.wrapper_info[-1],{}) - # Calculate total wallclock per possible wrapper - self.total_wallclock = sum_str_hours( - self.total_wallclock, child.wallclock) - # Testing against max from platform - if self.total_wallclock <= self.max_wallclock: - # Marking, this is later tested in the main loop - child.packed = True - child.level = level - self.jobs_list.append(child) - # Recursive call - return self.build_vertical_package(child, level=level + 1) - # Wrapped jobs are accumulated and returned in this list + stack = [(job, 1)] + while stack: + job, level = stack.pop() + if level % 100 == 0: + Log.info(f"Wrapper package creation is still ongoing. So far {level} jobs have been wrapped.") + if len(self.jobs_list) >= self.wrapper_limits["max_v"] or len(self.jobs_list) >= \ + self.wrapper_limits["max_by_section"][job.section] or len(self.jobs_list) >= self.wrapper_limits[ + "max"]: + continue + child = self.get_wrappable_child(job) + if child is not None and len(str(child)) > 0: + child.update_parameters(self.wrapper_info[-1], {}) + self.total_wallclock = sum_str_hours(self.total_wallclock, child.wallclock) + if self.total_wallclock <= self.max_wallclock: + child.packed = True + child.level = level + self.jobs_list.append(child) + stack.append((child, level + 1)) return self.jobs_list + # def build_vertical_package(self, job, level=1): + # """ + # Goes through the job and all the related jobs (children, or part of the same date member ordered group), finds those suitable + # and groups them together into a wrapper. + # + # :param level: + # :param job: Job to be wrapped. \n + # :type job: Job Object \n + # :return: List of jobs that are wrapped together. \n + # :rtype: List() of Job Object \n + # """ + # # print log each 100 jobs + # if level % 100 == 0: + # Log.info(f"Wrapper package creation is still ongoing. So far {level} jobs have been wrapped.") + # # self.jobs_list starts as only 1 member, but wrapped jobs are added in the recursion + # if len(self.jobs_list) >= self.wrapper_limits["max_v"] or len(self.jobs_list) >= self.wrapper_limits["max_by_section"][job.section] or len(self.jobs_list) >= self.wrapper_limits["max"]: + # return self.jobs_list + # child = self.get_wrappable_child(job) + # # If not None, it is wrappable + # if child is not None and len(str(child)) > 0: + # child.update_parameters(self.wrapper_info[-1],{}) + # # Calculate total wallclock per possible wrapper + # self.total_wallclock = sum_str_hours( + # self.total_wallclock, child.wallclock) + # # Testing against max from platform + # if self.total_wallclock <= self.max_wallclock: + # # Marking, this is later tested in the main loop + # child.packed = True + # child.level = level + # self.jobs_list.append(child) + # # Recursive call + # return self.build_vertical_package(child, level=level + 1) + # # Wrapped jobs are accumulated and returned in this list + # return self.jobs_list def get_wrappable_child(self, job): pass @@ -864,6 +899,7 @@ class JobPackagerHorizontal(object): self._sectionList = list() self._package_sections = dict() self.wrapper_info = [] + def build_horizontal_package(self, horizontal_vertical=False,wrapper_info=[]): self.wrapper_info = wrapper_info current_package = [] @@ -871,14 +907,19 @@ class JobPackagerHorizontal(object): if horizontal_vertical: self._current_processors = 0 jobs_by_section = dict() + Log.info(f"Updating inner job parameters") for job in self.job_list: job.update_parameters(self.wrapper_info[-1],{}) if job.section not in jobs_by_section: jobs_by_section[job.section] = list() jobs_by_section[job.section].append(job) + Log.info(f"Building horizontal package") + jobs_processed = 0 for section in jobs_by_section: current_package_by_section[section] = 0 for job in jobs_by_section[section]: + if jobs_processed % 100 == 0: + Log.info(f"Wrapper package creation is still ongoing. So far {jobs_processed} jobs have been wrapped.") if str(job.processors).isdigit() and str(job.nodes).isdigit() and int(job.nodes) > 1 and int(job.processors) <= 1: job.processors = 0 if job.total_processors == "": @@ -899,6 +940,7 @@ class JobPackagerHorizontal(object): current_package_by_section[section] += 1 else: break + jobs_processed += 1 self.create_components_dict() diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index f6ad51eb4..85fc41ff5 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -60,6 +60,8 @@ class JobPackageBase(object): self.export = jobs[0].export self.x11 = jobs[0].x11 self.het = dict() + self._num_processors = '0' + self._threads = '0' try: self._tmp_path = jobs[0]._tmp_path self._platform = jobs[0]._platform @@ -291,6 +293,8 @@ class JobPackageArray(JobPackageBase): self._array_size_id = "[1-" + str(len(jobs)) + "]" self._wallclock = '00:00' self._num_processors = '0' + self._threads = '0' + for job in jobs: if job.wallclock > self._wallclock: self._wallclock = job.wallclock @@ -459,11 +463,9 @@ class JobPackageThread(JobPackageBase): self.custom_directives = wr_custom_directives self.tasks = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("TASKS",self.tasks) self.nodes = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("NODES",self.nodes) - self.threads = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("THREADS",self.threads) self.reservation = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("RESERVATION",self.reservation) self.parameters["CURRENT_PROJ"] = self._project - self.parameters["NUMTHREADS"] = self.threads self.het = jobs[0].het # Memory needs more work outside this branch @@ -596,7 +598,7 @@ class JobPackageThreadWrapped(JobPackageThread): self._common_script = None self._wallclock = '00:00' self._num_processors = '0' - self.threads = '1' + self._threads = '1' self.current_wrapper_section = wrapper_section @@ -669,12 +671,14 @@ class JobPackageVertical(JobPackageThread): :param: dependency: """ def __init__(self, jobs, dependency=None,configuration=None,wrapper_section="WRAPPERS", wrapper_info = []): - self._num_processors = 0 + + super(JobPackageVertical, self).__init__(jobs, dependency,configuration=configuration,wrapper_section=wrapper_section, wrapper_info = wrapper_info) for job in jobs: if int(job.processors) >= int(self._num_processors): self._num_processors = job.processors self._threads = job.threads - super(JobPackageVertical, self).__init__(jobs, dependency,configuration=configuration,wrapper_section=wrapper_section, wrapper_info = wrapper_info) + self._threads = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section, {}).get("THREADS", + self._threads) for job in jobs: self._wallclock = sum_str_hours(self._wallclock, job.wallclock) self._name = self._expid + '_' + self.FILE_PREFIX + "_{0}_{1}_{2}".format(str(int(time.time())) + @@ -756,13 +760,15 @@ class JobPackageHorizontal(JobPackageThread): def __init__(self, jobs, dependency=None, jobs_resources=dict(),method='ASThread',configuration=None,wrapper_section="WRAPPERS"): super(JobPackageHorizontal, self).__init__(jobs, dependency, jobs_resources,configuration=configuration,wrapper_section=wrapper_section) self.method = method - self._queue = self.queue for job in jobs: if job.wallclock > self._wallclock: self._wallclock = job.wallclock self._num_processors = str(int(self._num_processors) + int(job.processors)) self._threads = job.threads + self._threads = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section, {}).get("THREADS", + self._threads) + self._name = self._expid + '_' + self.FILE_PREFIX + "_{0}_{1}_{2}".format(str(int(time.time())) + str(random.randint(1, 10000)), self._num_processors, diff --git a/test/unit/test_job_package.py b/test/unit/test_job_package.py index bebdff0d1..e70f10872 100644 --- a/test/unit/test_job_package.py +++ b/test/unit/test_job_package.py @@ -64,7 +64,7 @@ class TestJobPackage(TestCase): self._wrapper_factory.as_conf = self.as_conf self.jobs[0].wallclock = "00:00" - self.jobs[0].threads = "1" + self.jobs[0]._threads = "1" self.jobs[0].tasks = "1" self.jobs[0].exclusive = True self.jobs[0].queue = "debug" @@ -75,7 +75,7 @@ class TestJobPackage(TestCase): self.jobs[0]._platform = self.platform self.jobs[0].retrials = 0 self.jobs[1].wallclock = "00:00" - self.jobs[1].threads = "1" + self.jobs[1]._threads = "1" self.jobs[1].tasks = "1" self.jobs[1].exclusive = True self.jobs[1].queue = "debug2" @@ -131,7 +131,7 @@ class TestJobPackage(TestCase): self.assertEqual(self.job_package_wrapper.inner_retrials, 0) self.assertEqual(self.job_package_wrapper.queue, "debug") self.assertEqual(self.job_package_wrapper.partition, "debug") - self.assertEqual(self.job_package_wrapper.threads, "1") + self.assertEqual(self.job_package_wrapper._threads, "1") self.assertEqual(self.job_package_wrapper.tasks, "1") options_slurm = { @@ -148,7 +148,7 @@ class TestJobPackage(TestCase): self.assertEqual(self.job_package_wrapper.inner_retrials, 30) self.assertEqual(self.job_package_wrapper.queue, "bsc32") self.assertEqual(self.job_package_wrapper.partition, "bsc32") - self.assertEqual(self.job_package_wrapper.threads, "30") + self.assertEqual(self.job_package_wrapper._threads, "30") self.assertEqual(self.job_package_wrapper.tasks, "40") self.assertEqual(self.job_package_wrapper.custom_directives, ['#SBATCH --mem=1000']) -- GitLab