diff --git a/VERSION b/VERSION index 26cc861f6ce95d7e1b191c869f84ef07b85b2233..bcdac4398c655c41f79b00bac11305e7e3615cde 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -4.0.90 +4.0.91 diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index b8d6ab058786f520c05fb8095d608ff032a5f4ac..525649abd0ffa55acb691bfa812e37ea3e721cca 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -227,6 +227,10 @@ class Job(object): self.current_checkpoint_step = 0 self.max_checkpoint_step = 0 self.reservation= "" + # hetjobs + self.het = dict() + self.het['HETSIZE'] = 0 + @property @autosubmit_parameter(name='tasktype') @@ -1289,54 +1293,265 @@ class Job(object): parameters['CURRENT_LOGDIR'] = job_platform.get_files_path() return parameters - def update_platform_associated_parameters(self,as_conf, parameters, job_platform, chunk): - self.ec_queue = str(as_conf.jobs_data[self.section].get("EC_QUEUE", as_conf.platforms_data.get(job_platform.name,{}).get("EC_QUEUE",""))) - self.executable = str(as_conf.jobs_data[self.section].get("EXECUTABLE", as_conf.platforms_data.get(job_platform.name,{}).get("EXECUTABLE",""))) - self.total_jobs = int(as_conf.jobs_data[self.section].get("TOTALJOBS", job_platform.total_jobs)) - self.max_waiting_jobs = int(as_conf.jobs_data[self.section].get("MAXWAITINGJOBS", job_platform.max_waiting_jobs)) - self.processors = str(as_conf.jobs_data[self.section].get("PROCESSORS",as_conf.platforms_data.get(job_platform.name,{}).get("PROCESSORS","1"))) - self.nodes = str(as_conf.jobs_data[self.section].get("NODES",as_conf.platforms_data.get(job_platform.name,{}).get("NODES",""))) - self.exclusive = str(as_conf.jobs_data[self.section].get("EXCLUSIVE",as_conf.platforms_data.get(job_platform.name,{}).get("EXCLUSIVE",False))) - self.threads = str(as_conf.jobs_data[self.section].get("THREADS",as_conf.platforms_data.get(job_platform.name,{}).get("THREADS","1"))) - self.tasks = str(as_conf.jobs_data[self.section].get("TASKS",as_conf.platforms_data.get(job_platform.name,{}).get("TASKS","1"))) - self.reservation = str(as_conf.jobs_data[self.section].get("RESERVATION",as_conf.platforms_data.get(job_platform.name, {}).get("RESERVATION", ""))) - self.hyperthreading = str(as_conf.jobs_data[self.section].get("HYPERTHREADING",as_conf.platforms_data.get(job_platform.name,{}).get("HYPERTHREADING","none"))) - if int(self.tasks) <= 1 and int(job_platform.processors_per_node) > 1 and int(self.processors) > int(job_platform.processors_per_node): - self.tasks = job_platform.processors_per_node - self.memory = str(as_conf.jobs_data[self.section].get("MEMORY",as_conf.platforms_data.get(job_platform.name,{}).get("MEMORY",""))) - self.memory_per_task = str(as_conf.jobs_data[self.section].get("MEMORY_PER_TASK",as_conf.platforms_data.get(job_platform.name,{}).get("MEMORY_PER_TASK",""))) - # These are to activate serial platform if neccesary - self.queue = self.queue - self.partition = self.partition - self.wallclock = as_conf.jobs_data[self.section].get("WALLCLOCK",as_conf.platforms_data.get(self.platform_name,{}).get("MAX_WALLCLOCK",None)) - if self.wallclock is None and job_platform.type not in ['ps',"local","PS","LOCAL"]: + def process_scheduler_parameters(self,as_conf,parameters,job_platform,chunk): + """ + Parses the YAML data stored in the dictionary + and calculates the components of the heterogeneous job, if any + :return: + """ + hetsize = 0 + if type(self.processors) is list: + hetsize = len(self.processors) + else: + hetsize = 1 + if type(self.nodes) is list: + hetsize = 
max(hetsize,len(self.nodes)) + self.het['HETSIZE'] = hetsize + self.het['PROCESSORS'] = list() + self.het['NODES'] = list() + self.het['NUMTHREADS'] = self.het['THREADS'] = list() + self.het['TASKS'] = list() + self.het['MEMORY'] = list() + self.het['MEMORY_PER_TASK'] = list() + self.het['RESERVATION'] = list() + self.het['EXCLUSIVE'] = list() + self.het['HYPERTHREADING'] = list() + self.het['EXECUTABLE'] = list() + self.het['CURRENT_QUEUE'] = list() + self.het['PARTITION'] = list() + self.het['CURRENT_PROJ'] = list() + self.het['CUSTOM_DIRECTIVES'] = list() + if type(self.processors) is list: + self.het['PROCESSORS'] = list() + for x in self.processors: + self.het['PROCESSORS'].append(str(x)) + # Sum processors, each element can be a str or int + self.processors = str(sum([int(x) for x in self.processors])) + else: + self.processors = str(self.processors) + if type(self.nodes) is list: + # add it to the het dict as it was originally + self.het['NODES'] = list() + for x in self.nodes: + self.het['NODES'].append(str(x)) + # Sum nodes, each element can be a str or int + self.nodes = str(sum([int(x) for x in self.nodes])) + else: + self.nodes = str(self.nodes) + if type(self.threads) is list: + # Get the max threads, each element can be a str or int + self.het['NUMTHREADS'] = list() + if len(self.threads) == 1: + for x in range(self.het['HETSIZE']): + self.het['NUMTHREADS'].append(str(self.threads[0])) + else: + for x in self.threads: + self.het['NUMTHREADS'].append(str(x)) + + self.threads = str(max([int(x) for x in self.threads])) + + else: + self.threads = str(self.threads) + if type(self.tasks) is list: + # Get the max tasks, each element can be a str or int + self.het['TASKS'] = list() + if len(self.tasks) == 1: + if int(self.tasks[0]) <= 1 and int(job_platform.processors_per_node) > 1 and int( + self.processors) > int(job_platform.processors_per_node): + self.tasks = [job_platform.processors_per_node] + for task in range(self.het['HETSIZE']): + if int(self.tasks[0]) <= 1 < int(job_platform.processors_per_node) and int( + self.processors) > int(job_platform.processors_per_node): + self.het['TASKS'].append(str(job_platform.processors_per_node)) + else: + self.het['TASKS'].append(str(self.tasks[0])) + self.tasks = str(max([int(x) for x in self.tasks])) + else: + for task in self.tasks: + if int(task) <= 1 < int(job_platform.processors_per_node) and int( + self.processors) > int(job_platform.processors_per_node): + task = job_platform.processors_per_node + self.het['TASKS'].append(str(task)) + else: + if int(self.tasks) <= 1 < int(job_platform.processors_per_node) and int( + self.processors) > int(job_platform.processors_per_node): + self.tasks = job_platform.processors_per_node + self.tasks = str(self.tasks) + + if type(self.memory) is list: + # Get the max memory, each element can be a str or int + self.het['MEMORY'] = list() + if len(self.memory) == 1: + for x in range(self.het['HETSIZE']): + self.het['MEMORY'].append(str(self.memory[0])) + else: + for x in self.memory: + self.het['MEMORY'].append(str(x)) + self.memory = str(max([int(x) for x in self.memory])) + else: + self.memory = str(self.memory) + if type(self.memory_per_task) is list: + # Get the max memory per task, each element can be a str or int + self.het['MEMORY_PER_TASK'] = list() + if len(self.memory_per_task) == 1: + for x in range(self.het['HETSIZE']): + self.het['MEMORY_PER_TASK'].append(str(self.memory_per_task[0])) + + else: + for x in self.memory_per_task: + self.het['MEMORY_PER_TASK'].append(str(x)) + self.memory_per_task = str(max([int(x) for x in self.memory_per_task])) + + else: + self.memory_per_task = str(self.memory_per_task) + if type(self.reservation) is list: + # Get the reservation name, each element can be a str + self.het['RESERVATION'] = list() + if len(self.reservation) == 1: + for x in range(self.het['HETSIZE']): + self.het['RESERVATION'].append(str(self.reservation[0])) + else: + for x in self.reservation: + self.het['RESERVATION'].append(str(x)) + self.reservation = str(self.het['RESERVATION'][0]) + else: + self.reservation = str(self.reservation) + if type(self.exclusive) is list: + # Get the exclusive flag, each element can only be a bool + self.het['EXCLUSIVE'] = list() + if len(self.exclusive) == 1: + for x in range(self.het['HETSIZE']): + self.het['EXCLUSIVE'].append(self.exclusive[0]) + else: + for x in self.exclusive: + self.het['EXCLUSIVE'].append(x) + self.exclusive = self.het['EXCLUSIVE'][0] + else: + self.exclusive = self.exclusive + if type(self.hyperthreading) is list: + # Get the hyperthreading flag, each element can only be a bool + self.het['HYPERTHREADING'] = list() + if len(self.hyperthreading) == 1: + for x in range(self.het['HETSIZE']): + self.het['HYPERTHREADING'].append(self.hyperthreading[0]) + else: + for x in self.hyperthreading: + self.het['HYPERTHREADING'].append(x) + self.hyperthreading = self.het['HYPERTHREADING'][0] + else: + self.hyperthreading = self.hyperthreading + if type(self.executable) is list: + # Get the executable, each element can only be a str + self.het['EXECUTABLE'] = list() + if len(self.executable) == 1: + for x in range(self.het['HETSIZE']): + self.het['EXECUTABLE'].append(self.executable[0]) + else: + for x in self.executable: + self.het['EXECUTABLE'].append(x) + self.executable = str(self.het['EXECUTABLE'][0]) + else: + self.executable = self.executable + if type(self.queue) is list: + # Get the queue, each element can only be a str + self.het['CURRENT_QUEUE'] = list() + if len(self.queue) == 1: + for x in range(self.het['HETSIZE']): + self.het['CURRENT_QUEUE'].append(self.queue[0]) + else: + for x in self.queue: + self.het['CURRENT_QUEUE'].append(x) + self.queue = self.het['CURRENT_QUEUE'][0] + else: + self.queue = self.queue + if type(self.partition) is list: + # Get the partition, each element can only be a str + self.het['PARTITION'] = list() + if len(self.partition) == 1: + for x in range(self.het['HETSIZE']): + self.het['PARTITION'].append(self.partition[0]) + else: + for x in self.partition: + self.het['PARTITION'].append(x) + self.partition = self.het['PARTITION'][0] + else: + self.partition = self.partition + + self.het['CUSTOM_DIRECTIVES'] = list() + if type(self.custom_directives) is list: + self.custom_directives = json.dumps(self.custom_directives) + self.custom_directives = self.custom_directives.replace("\'", "\"").strip("[]").strip(", ") + if self.custom_directives == '': + if job_platform.custom_directives is None: + job_platform.custom_directives = '' + self.custom_directives = job_platform.custom_directives.replace("\'", "\"").strip("[]").strip(", ") + if self.custom_directives != '': + if self.custom_directives[0] != "\"": + self.custom_directives = "\"" + self.custom_directives + if self.custom_directives[-1] != "\"": + self.custom_directives = self.custom_directives + "\"" + self.custom_directives = "[" + self.custom_directives + "]" + custom_directives = self.custom_directives.split("],") + if len(custom_directives) > 1: + for custom_directive in custom_directives: + if custom_directive[-1] != "]": + custom_directive = custom_directive + "]" + 
self.het['CUSTOM_DIRECTIVES'].append(json.loads(custom_directive)) + self.custom_directives = self.het['CUSTOM_DIRECTIVES'][0] + else: + self.custom_directives = json.loads(self.custom_directives) + if len(self.het['CUSTOM_DIRECTIVES']) < self.het['HETSIZE']: + for x in range(self.het['HETSIZE'] - len(self.het['CUSTOM_DIRECTIVES'])): + self.het['CUSTOM_DIRECTIVES'].append(self.custom_directives) + else: + self.custom_directives = [] + + for x in range(self.het['HETSIZE']): + self.het['CUSTOM_DIRECTIVES'].append(self.custom_directives) + # Ignore the heterogeneous parameters if the cores or nodes are not specified as a list + if self.het['HETSIZE'] == 1: + self.het = dict() + if self.wallclock is None and job_platform.type not in ['ps', "local", "PS", "LOCAL"]: self.wallclock = "01:59" - elif self.wallclock is None and job_platform.type in ['ps','local',"PS","LOCAL"]: + elif self.wallclock is None and job_platform.type in ['ps', 'local', "PS", "LOCAL"]: self.wallclock = "00:00" # Increasing according to chunk self.wallclock = increase_wallclock_by_chunk( self.wallclock, self.wchunkinc, chunk) + + def update_platform_associated_parameters(self,as_conf, parameters, job_platform, chunk): self.ec_queue = str(as_conf.jobs_data[self.section].get("EC_QUEUE", as_conf.platforms_data.get(job_platform.name,{}).get("EC_QUEUE",""))) + + self.executable = as_conf.jobs_data[self.section].get("EXECUTABLE", as_conf.platforms_data.get(job_platform.name,{}).get("EXECUTABLE","")) + self.total_jobs = as_conf.jobs_data[self.section].get("TOTALJOBS", job_platform.total_jobs) + self.max_waiting_jobs = as_conf.jobs_data[self.section].get("MAXWAITINGJOBS", job_platform.max_waiting_jobs) + self.processors = as_conf.jobs_data[self.section].get("PROCESSORS",as_conf.platforms_data.get(job_platform.name,{}).get("PROCESSORS","1")) + self.nodes = as_conf.jobs_data[self.section].get("NODES",as_conf.platforms_data.get(job_platform.name,{}).get("NODES","")) + self.exclusive = as_conf.jobs_data[self.section].get("EXCLUSIVE",as_conf.platforms_data.get(job_platform.name,{}).get("EXCLUSIVE",False)) + self.threads = as_conf.jobs_data[self.section].get("THREADS",as_conf.platforms_data.get(job_platform.name,{}).get("THREADS","1")) + self.tasks = as_conf.jobs_data[self.section].get("TASKS",as_conf.platforms_data.get(job_platform.name,{}).get("TASKS","1")) + self.reservation = as_conf.jobs_data[self.section].get("RESERVATION",as_conf.platforms_data.get(job_platform.name, {}).get("RESERVATION", "")) + self.hyperthreading = as_conf.jobs_data[self.section].get("HYPERTHREADING",as_conf.platforms_data.get(job_platform.name,{}).get("HYPERTHREADING","none")) + self.queue = self.queue + self.partition = self.partition self.scratch_free_space = int(as_conf.jobs_data[self.section].get("SCRATCH_FREE_SPACE",as_conf.platforms_data.get(job_platform.name,{}).get("SCRATCH_FREE_SPACE",0))) - try: - self.custom_directives = as_conf.jobs_data[self.section].get("CUSTOM_DIRECTIVES","") - if type(self.custom_directives) is list: - self.custom_directives = json.dumps(self.custom_directives) - self.custom_directives = self.custom_directives.replace("\'", "\"").strip("[]").strip(", ") - if self.custom_directives == '': - if job_platform.custom_directives is None: - job_platform.custom_directives = '' - self.custom_directives = job_platform.custom_directives.replace("\'", "\"").strip("[]").strip(", ") - if self.custom_directives != '': - if self.custom_directives[0] != "\"": - self.custom_directives = "\""+self.custom_directives - if self.custom_directives[-1] != "\"": - self.custom_directives = self.custom_directives+"\"" - self.custom_directives = "[" + self.custom_directives + "]" - self.custom_directives = json.loads(self.custom_directives) - else: - self.custom_directives = [] - except BaseException as e: - raise AutosubmitCritical(f"Error in CUSTOM_DIRECTIVES({self.custom_directives}) for job {self.section}",7014,str(e)) + + self.memory = as_conf.jobs_data[self.section].get("MEMORY",as_conf.platforms_data.get(job_platform.name,{}).get("MEMORY","")) + self.memory_per_task = as_conf.jobs_data[self.section].get("MEMORY_PER_TASK",as_conf.platforms_data.get(job_platform.name,{}).get("MEMORY_PER_TASK","")) + self.wallclock = as_conf.jobs_data[self.section].get("WALLCLOCK", + as_conf.platforms_data.get(self.platform_name, {}).get( + "MAX_WALLCLOCK", None)) + self.custom_directives = as_conf.jobs_data[self.section].get("CUSTOM_DIRECTIVES", "") + + self.process_scheduler_parameters(as_conf,parameters,job_platform,chunk) + if self.het.get('HETSIZE',1) > 1: + for name, components_value in self.het.items(): + if name != "HETSIZE": + for indx,component in enumerate(components_value): + if indx == 0: + parameters[name.upper()] = component + parameters[f'{name.upper()}_{indx}'] = component +
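The loop just above publishes every heterogeneous component both under its plain parameter name (which mirrors component 0) and under an indexed name. A minimal sketch of the resulting keys, using hypothetical values for a two-component job (the values are made up; the loop body is copied from the diff):

```python
# Hypothetical het dict for a two-component job, mirroring the expansion
# loop in update_platform_associated_parameters above.
het = {"HETSIZE": 2, "NODES": ["3", "1"], "TASKS": ["32", "32"]}
parameters = {}
for name, components_value in het.items():
    if name != "HETSIZE":
        for indx, component in enumerate(components_value):
            if indx == 0:
                parameters[name.upper()] = component  # unindexed key keeps component 0
            parameters[f"{name.upper()}_{indx}"] = component
# parameters == {'NODES': '3', 'NODES_0': '3', 'NODES_1': '1',
#                'TASKS': '32', 'TASKS_0': '32', 'TASKS_1': '32'}
```

Templates can then reference %NODES_0%, %NODES_1%, and so on, one suffix per HET_GROUP.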
parameters['NUMPROC'] = self.processors parameters['PROCESSORS'] = self.processors parameters['MEMORY'] = self.memory @@ -1356,6 +1571,7 @@ class Job(object): parameters['CURRENT_QUEUE'] = self.queue parameters['RESERVATION'] = self.reservation parameters['CURRENT_EC_QUEUE'] = self.ec_queue + return parameters def update_wrapper_parameters(self,as_conf, parameters): @@ -1542,7 +1758,7 @@ class Job(object): template_file.close() else: if self.type == Type.BASH: - template = 'sleep 5' + template = 'sleep 10' elif self.type == Type.PYTHON2: template = 'time.sleep(5)' + "\n" elif self.type == Type.PYTHON3 or self.type == Type.PYTHON: @@ -1718,7 +1934,6 @@ class Job(object): variables_tmp = [variable for variable in variables_tmp if variable not in self.default_parameters] variables.extend(variables_tmp) out = set(parameters).issuperset(set(variables)) - # Check if the variables in the templates are defined in the configurations if not out: self.undefined_variables = set(variables) - set(parameters) diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index ee6ea4aacf0a2af4e2d441b3fcb8d69c0f03e4a8..ebdbf3d7c4e5d005d3159f619f0592ce76cfd789 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -58,6 +58,7 @@ class JobPackageBase(object): self.hold = False # type: bool self.export = jobs[0].export self.x11 = jobs[0].x11 + self.het = dict() try: self._tmp_path = jobs[0]._tmp_path self._platform = jobs[0]._platform @@ -368,6 +369,9 @@ class JobPackageThread(JobPackageBase): # and from the JobPackageThread.create_scripts function # It is in charge of merging (switching) the wrapper info by checking if the value is defined by the user in the wrapper section, current wrapper section, job or platform, in that order. # Some variables are calculated in further functions, like num_processors and wallclock. + # These variables can only be present in the wrapper itself
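The comments above describe a precedence chain: a value set in the wrapper section wins, otherwise the value of the first wrapped job is used. A hypothetical helper, not the actual API, just to make the order concrete (the real code inlines this lookup per option, as the hunks below show):

```python
# Hypothetical sketch of the precedence described above; "wrapper_cfg" stands
# for configuration.experiment_data["WRAPPERS"][current_wrapper_section].
def resolve_wrapper_option(wrapper_cfg: dict, option: str, first_job):
    value = wrapper_cfg.get(option)                  # 1. wrapper section wins
    if value is None or len(str(value)) == 0:
        value = getattr(first_job, option.lower())   # 2. else first inner job
    return value

# resolve_wrapper_option({"QUEUE": "debug"}, "QUEUE", job)  -> "debug"
# resolve_wrapper_option({}, "QUEUE", job)                  -> job.queue
```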
+ self.parameters = dict() + self.wallclock = '00:00' if len(wrapper_info) > 0: self.wrapper_type = wrapper_info[0] self.wrapper_policy = wrapper_info[1] @@ -392,6 +396,7 @@ class JobPackageThread(JobPackageBase): self._wrapper_factory = self.platform.wrapper self.current_wrapper_section = wrapper_section self.inner_retrials = 0 + # temporary hetjob code, to be upgraded in the future if configuration is not None: self.inner_retrials = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section, {}).get("RETRIALS", @@ -403,93 +408,128 @@ class JobPackageThread(JobPackageBase): self.export = configuration.get_wrapper_export(configuration.experiment_data["WRAPPERS"][self.current_wrapper_section]) if self.export.lower() != "none" and len(self.export) > 0: for job in self.jobs: if job.export.lower() != "none" and len(job.export) > 0: self.export = job.export break - wr_queue = configuration.get_wrapper_queue(configuration.experiment_data["WRAPPERS"][self.current_wrapper_section]) - if wr_queue is not None and len(str(wr_queue)) > 0: - self.queue = wr_queue - else: - self.queue = jobs[0].queue - wr_partition = configuration.get_wrapper_partition(configuration.experiment_data["WRAPPERS"][self.current_wrapper_section]) - if wr_partition and len(str(wr_partition)) > 0: - self.partition = wr_partition - else: - self.partition = jobs[0].partition - wr_exclusive = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("EXCLUSIVE",None) - if wr_exclusive is not None: - self.exclusive = wr_exclusive - else: - self.exclusive = jobs[0].exclusive - wr_custom_directives = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("CUSTOM_DIRECTIVES",[]) - # parse custom_directives - if type(wr_custom_directives) is list: - wr_custom_directives = json.dumps(wr_custom_directives) - wr_custom_directives = wr_custom_directives.replace("\'", "\"").strip("[]").strip(", ") - if wr_custom_directives == '': - if jobs[0].custom_directives is None: - jobs[0].custom_directives = '' - wr_custom_directives = jobs[0].custom_directives - if type(wr_custom_directives) is list: - wr_custom_directives = json.dumps(wr_custom_directives) - wr_custom_directives = wr_custom_directives.replace("\'", "\"").strip("[]").strip(", ") - if wr_custom_directives != '': - if wr_custom_directives[0] != "\"": - wr_custom_directives = "\""+wr_custom_directives - if wr_custom_directives[-1] != "\"": - wr_custom_directives = wr_custom_directives+"\"" - wr_custom_directives = "[" + wr_custom_directives + "]" - wr_custom_directives = json.loads(wr_custom_directives) - else: - wr_custom_directives = [] - if len(str(wr_custom_directives)) > 0: - self.custom_directives = wr_custom_directives - else: - self.custom_directives = jobs[0].custom_directives - wr_executable = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("EXECUTABLE",None) + wr_executable = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section, {}).get( + "EXECUTABLE", None) if wr_executable: self.executable = wr_executable else: self.executable = jobs[0].executable - wr_tasks = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("TASKS",None) - if wr_tasks: - self.tasks = wr_tasks - else: - self.tasks = jobs[0].tasks - wr_nodes = 
configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("NODES",None) - if wr_nodes: - self.nodes = wr_nodes + if jobs[0].het.get("HETSIZE", 1) <= 1: + wr_queue = configuration.get_wrapper_queue(configuration.experiment_data["WRAPPERS"][self.current_wrapper_section]) + if wr_queue is not None and len(str(wr_queue)) > 0: + self.queue = wr_queue + self.parameters["CURRENT_QUEUE"] = wr_queue + else: + self.queue = jobs[0].queue + self.parameters["CURRENT_QUEUE"] = jobs[0].queue + + wr_partition = configuration.get_wrapper_partition(configuration.experiment_data["WRAPPERS"][self.current_wrapper_section]) + if wr_partition and len(str(wr_partition)) > 0: + self.partition = wr_partition + else: + self.partition = jobs[0].partition + wr_exclusive = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("EXCLUSIVE",None) + if wr_exclusive is not None: + self.exclusive = wr_exclusive + else: + self.exclusive = jobs[0].exclusive + wr_custom_directives = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("CUSTOM_DIRECTIVES",[]) + # parse custom_directives + if type(wr_custom_directives) is list: + wr_custom_directives = json.dumps(wr_custom_directives) + wr_custom_directives = wr_custom_directives.replace("\'", "\"").strip("[]").strip(", ") + if wr_custom_directives == '': + if jobs[0].custom_directives is None: + jobs[0].custom_directives = '' + wr_custom_directives = jobs[0].custom_directives + if type(wr_custom_directives) is list: + wr_custom_directives = json.dumps(wr_custom_directives) + wr_custom_directives = wr_custom_directives.replace("\'", "\"").strip("[]").strip(", ") + if wr_custom_directives != '': + if wr_custom_directives[0] != "\"": + wr_custom_directives = "\""+wr_custom_directives + if wr_custom_directives[-1] != "\"": + wr_custom_directives = wr_custom_directives+"\"" + wr_custom_directives = "[" + wr_custom_directives + "]" + wr_custom_directives = json.loads(wr_custom_directives) + else: + wr_custom_directives = [] + if len(str(wr_custom_directives)) > 0: + self.custom_directives = wr_custom_directives + else: + self.custom_directives = jobs[0].custom_directives + + wr_tasks = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("TASKS",None) + if wr_tasks: + self.tasks = wr_tasks + else: + self.tasks = jobs[0].tasks + wr_nodes = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("NODES",None) + if wr_nodes: + self.nodes = wr_nodes + else: + self.nodes = jobs[0].nodes + wr_threads = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("THREADS",None) + if wr_threads: + self.threads = wr_threads + else: + self.threads = jobs[0].threads else: + self.queue = jobs[0].queue + self.parameters["CURRENT_QUEUE"] = jobs[0].queue + self.partition = jobs[0].partition self.nodes = jobs[0].nodes - wr_threads = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("THREADS",None) - if wr_threads: - self.threads = wr_threads - else: + self.tasks = jobs[0].tasks self.threads = jobs[0].threads + self.exclusive = jobs[0].exclusive + self.custom_directives = jobs[0].custom_directives else: self.queue = jobs[0].queue + self.parameters["CURRENT_QUEUE"] = jobs[0].queue self.partition = jobs[0].partition self.nodes = jobs[0].nodes self.tasks = jobs[0].tasks self.threads = jobs[0].threads + self.exclusive = jobs[0].exclusive + self.custom_directives = jobs[0].custom_directives + 
self.parameters["CURRENT_PROJ"] = self._project + self.parameters["NUMTHREADS"] = self.threads + self.het = jobs[0].het + + # Memory needs more work outside this branch + self.parameters["MEMORY"] = jobs[0].memory + self.memory = jobs[0].memory + self.parameters["MEMORY_PER_TASK"] = jobs[0].memory_per_task + self.memory_per_task = jobs[0].memory_per_task + self.parameters["NODES"] = self.nodes + self.processors = self._num_processors + self.parameters["RESERVATION"] = jobs[0].reservation # have to look + self.parameters['TASKS'] = self.tasks + self.parameters["EXECUTABLE"] = self.executable # have to look self.method = method self._wrapper_data = configuration.experiment_data["WRAPPERS"][self.current_wrapper_section] - self._wrapper_data["TYPE"] = self.wrapper_type - self._wrapper_data["WRAPPER_POLICY"] = self.wrapper_policy - self._wrapper_data["INNER_RETRIALS"] = self.inner_retrials - self._wrapper_data["RETRIALS"] = self.inner_retrials - self._wrapper_data["EXTEND_WALLCLOCK"] = self.extensible_wallclock - self._wrapper_data["METHOD"] = self.wrapper_method - self._wrapper_data["EXPORT"] = self.export - self._wrapper_data["QUEUE"] = self.queue - self._wrapper_data["NODES"] = self.nodes - self._wrapper_data["TASKS"] = self.tasks - self._wrapper_data["THREADS"] = self.threads - self._wrapper_data["PROCESSORS"] = self._num_processors - self._wrapper_data["PARTITION"] = self.partition - self._wrapper_data["EXCLUSIVE"] = self.exclusive - self._wrapper_data["EXECUTABLE"] = self.executable - self._wrapper_data["CUSTOM_DIRECTIVES"] = self.custom_directives + self._wrapper_data["WRAPPER"] = self + + # self._wrapper_data["TYPE"] = self.wrapper_type + # self._wrapper_data["WRAPPER_POLICY"] = self.wrapper_policy + # self._wrapper_data["INNER_RETRIALS"] = self.inner_retrials + # self._wrapper_data["RETRIALS"] = self.inner_retrials + # self._wrapper_data["EXTEND_WALLCLOCK"] = self.extensible_wallclock + # self._wrapper_data["METHOD"] = self.wrapper_method + # self._wrapper_data["EXPORT"] = self.export + # self._wrapper_data["QUEUE"] = self.queue + # self._wrapper_data["NODES"] = self.nodes + # self._wrapper_data["TASKS"] = self.tasks + # self._wrapper_data["THREADS"] = self.threads + # self._wrapper_data["PROCESSORS"] = self._num_processors + # self._wrapper_data["PARTITION"] = self.partition + # self._wrapper_data["EXCLUSIVE"] = self.exclusive + # self._wrapper_data["EXECUTABLE"] = self.executable + # self._wrapper_data["CUSTOM_DIRECTIVES"] = self.custom_directives + # self._wrapper_data["HET"] = self.het @property def name(self): return self._name @@ -752,7 +792,7 @@ class JobPackageVertical(JobPackageThread): num_processors=self._num_processors, jobs_scripts=self._jobs_scripts, dependency=self._job_dependency, jobs_resources=self._jobs_resources, expid=self._expid, rootdir=self.platform.root_dir, - directives=self._custom_directives,threads=self._threads,method=self.method.lower(),retrials=self.inner_retrials, wallclock_by_level=wallclock_by_level,partition=self.partition,wrapper_data=self._wrapper_data,num_processors_value=self._num_processors) + directives=self._custom_directives,threads=self._threads,method=self.method.lower(),retrials=self.inner_retrials, wallclock_by_level=wallclock_by_level,partition=self.partition,wrapper_data=self,num_processors_value=self._num_processors) class JobPackageHorizontal(JobPackageThread): @@ -782,7 +822,7 @@ class JobPackageHorizontal(JobPackageThread): num_processors=self._num_processors, jobs_scripts=self._jobs_scripts, dependency=self._job_dependency, 
jobs_resources=self._jobs_resources, expid=self._expid, rootdir=self.platform.root_dir, - directives=self._custom_directives,threads=self._threads,method=self.method.lower(),partition=self.partition,wrapper_data=self._wrapper_data,num_processors_value=self._num_processors) + directives=self._custom_directives,threads=self._threads,method=self.method.lower(),partition=self.partition,wrapper_data=self,num_processors_value=self._num_processors) class JobPackageHybrid(JobPackageThread): """ @@ -827,7 +867,7 @@ class JobPackageVerticalHorizontal(JobPackageHybrid): wallclock=self._wallclock, num_processors=self._num_processors, jobs_scripts=self._jobs_scripts, dependency=self._job_dependency, jobs_resources=self._jobs_resources, expid=self._expid, - rootdir=self.platform.root_dir, directives=self._custom_directives,threads=self._threads,method=self.method.lower(),partition=self.partition,wrapper_data=self._wrapper_data,num_processors_value=self._num_processors) + rootdir=self.platform.root_dir, directives=self._custom_directives,threads=self._threads,method=self.method.lower(),partition=self.partition,wrapper_data=self,num_processors_value=self._num_processors) class JobPackageHorizontalVertical(JobPackageHybrid): @@ -838,5 +878,5 @@ wallclock=self._wallclock, num_processors=self._num_processors, jobs_scripts=self._jobs_scripts, dependency=self._job_dependency, jobs_resources=self._jobs_resources, expid=self._expid, - rootdir=self.platform.root_dir, directives=self._custom_directives,threads=self._threads,method=self.method.lower(),partition=self.partition,wrapper_data=self._wrapper_data,num_processors_value=self._num_processors) + rootdir=self.platform.root_dir, directives=self._custom_directives,threads=self._threads,method=self.method.lower(),partition=self.partition,wrapper_data=self,num_processors_value=self._num_processors) diff --git a/autosubmit/platforms/headers/slurm_header.py b/autosubmit/platforms/headers/slurm_header.py index accb2362c349e2156d090ac769fd09df5ca7dab5..fe3721c21af5d62026145ff794c36b5108a22f16 100644 --- a/autosubmit/platforms/headers/slurm_header.py +++ b/autosubmit/platforms/headers/slurm_header.py @@ -24,7 +24,7 @@ class SlurmHeader(object): """Class to handle the SLURM headers of a job""" # noinspection PyMethodMayBeStatic,PyUnusedLocal - def get_queue_directive(self, job): + def get_queue_directive(self, job, het=-1): """ Returns queue directive for the specified job @@ -33,12 +33,16 @@ :return: queue directive :rtype: str """ + + # There is no queue, so directive is empty - if job.parameters['CURRENT_QUEUE'] == '': - return "" + if het > -1 and len(job.het['CURRENT_QUEUE']) > 0: + if job.het['CURRENT_QUEUE'][het] != '': + return "SBATCH --qos={0}".format(job.het['CURRENT_QUEUE'][het]) else: - return "SBATCH --qos={0}".format(job.parameters['CURRENT_QUEUE']) - def get_proccesors_directive(self, job): + if job.parameters['CURRENT_QUEUE'] != '': + return "SBATCH --qos={0}".format(job.parameters['CURRENT_QUEUE']) + return "" + def get_proccesors_directive(self, job, het=-1): """ Returns processors directive for the specified job @@ -47,7 +51,15 @@ :return: processors directive :rtype: str """ - # There is no processors, so directive is empty + if het > -1 and len(job.het['NODES']) > 0: + if job.het['NODES'][het] == '': + job_nodes = 0 + else: + job_nodes = job.het['NODES'][het] + if len(job.het['PROCESSORS']) == 0 or job.het['PROCESSORS'][het] == '' or 
(job.het['PROCESSORS'][het] == '1' and int(job_nodes) > 1): + return "" + else: + return "SBATCH -n {0}".format(job.het['PROCESSORS'][het]) if job.nodes == "": job_nodes = 0 else: @@ -56,18 +68,23 @@ return "" else: return "SBATCH -n {0}".format(job.processors) - def get_tasks_directive(self,job): + + + def get_tasks_directive(self,job, het=-1): """ Returns tasks directive for the specified job :param job: job to create tasks directive for :return: tasks directive :rtype: str """ - if job.num_tasks == '': - return "" + if het > -1 and len(job.het['TASKS']) > 0: + if job.het['TASKS'][het] != '': + return "SBATCH --ntasks-per-node={0}".format(job.het['TASKS'][het]) else: - return "SBATCH --ntasks-per-node {0}".format(job.tasks) - def get_partition_directive(self, job): + if job.parameters['TASKS'] != '': + return "SBATCH --ntasks-per-node={0}".format(job.parameters['TASKS']) + return "" + def get_partition_directive(self, job, het=-1): """ Returns partition directive for the specified job @@ -76,13 +93,15 @@ :return: partition directive :rtype: str """ - # There is no partition, so directive is empty - if job.partition == '': - return "" + if het > -1 and len(job.het['PARTITION']) > 0: + if job.het['PARTITION'][het] != '': + return "SBATCH --partition={0}".format(job.het['PARTITION'][het]) else: - return "SBATCH --partition={0}".format(job.partition) + if job.partition != '': + return "SBATCH --partition={0}".format(job.partition) + return "" # noinspection PyMethodMayBeStatic,PyUnusedLocal - def get_account_directive(self, job): + def get_account_directive(self, job, het=-1): """ Returns account directive for the specified job @@ -91,11 +110,14 @@ :return: account directive :rtype: str """ - # There is no account, so directive is empty - if job.parameters['CURRENT_PROJ'] != '': - return "SBATCH -A {0}".format(job.parameters['CURRENT_PROJ']) + if het > -1 and len(job.het['CURRENT_PROJ']) > 0: + if job.het['CURRENT_PROJ'][het] != '': + return "SBATCH -A {0}".format(job.het['CURRENT_PROJ'][het]) + else: + if job.parameters['CURRENT_PROJ'] != '': + return "SBATCH -A {0}".format(job.parameters['CURRENT_PROJ']) return "" - def get_exclusive_directive(self, job): + def get_exclusive_directive(self, job, het=-1): """ Returns exclusive directive for the specified job @@ -104,12 +126,15 @@ :return: exclusive directive :rtype: str """ - # There is no exclusive flag, so directive is empty - if job.exclusive: - return "SBATCH --exclusive" + if het > -1 and len(job.het['EXCLUSIVE']) > 0: + if job.het['EXCLUSIVE'][het]: + return "SBATCH --exclusive" + else: + if job.parameters['EXCLUSIVE']: + return "SBATCH --exclusive" return "" - def get_nodes_directive(self, job): + def get_nodes_directive(self, job, het=-1): """ Returns nodes directive for the specified job :param job: job to create nodes directive for @@ -117,13 +142,15 @@ :return: nodes directive :rtype: str """ - # There is no nodes value, so directive is empty - nodes = job.parameters.get('NODES',"") - if nodes != '': - return "SBATCH -N {0}".format(nodes) + if het > -1 and len(job.het['NODES']) > 0: + if job.het['NODES'][het] != '': + return "SBATCH --nodes={0}".format(job.het['NODES'][het]) + else: + if job.parameters['NODES'] != '': + return "SBATCH --nodes={0}".format(job.parameters['NODES']) return "" # noinspection PyMethodMayBeStatic,PyUnusedLocal - def get_memory_directive(self, job): + def 
get_memory_directive(self, job, het=-1): """ Returns memory directive for the specified job @@ -132,13 +159,16 @@ class SlurmHeader(object): :return: memory directive :rtype: str """ - # There is no memory, so directive is empty - if job.parameters['MEMORY'] != '': - return "SBATCH --mem {0}".format(job.parameters['MEMORY']) + if het > -1 and len(job.het['MEMORY']) > 0: + if job.het['MEMORY'][het] != '': + return "SBATCH --mem={0}".format(job.het['MEMORY'][het]) + else: + if job.parameters['MEMORY'] != '': + return "SBATCH --mem={0}".format(job.parameters['MEMORY']) return "" # noinspection PyMethodMayBeStatic,PyUnusedLocal - def get_memory_per_task_directive(self, job): + def get_memory_per_task_directive(self, job, het=-1): """ Returns memory per task directive for the specified job @@ -147,26 +177,50 @@ class SlurmHeader(object): :return: memory per task directive :rtype: str """ - # There is no memory per task, so directive is empty - if job.parameters['MEMORY_PER_TASK'] != '': - return "SBATCH --mem-per-cpu {0}".format(job.parameters['MEMORY_PER_TASK']) + if het > -1 and len(job.het['MEMORY_PER_TASK']) > 0: + if job.het['MEMORY_PER_TASK'][het] != '': + return "SBATCH --mem-per-cpu={0}".format(job.het['MEMORY_PER_TASK'][het]) + else: + if job.parameters['MEMORY_PER_TASK'] != '': + return "SBATCH --mem-per-cpu={0}".format(job.parameters['MEMORY_PER_TASK']) return "" + def get_threads_per_task(self, job, het=-1): + """ + Returns threads per task directive for the specified job - def get_threads_per_task(self, job): - if job.parameters['NUMTHREADS'] == '': - return "" + :param job: job to create threads per task directive for + :type job: Job + :return: threads per task directive + :rtype: str + """ + # There is no threads per task, so directive is empty + if het > -1 and len(job.het['NUMTHREADS']) > 0: + if job.het['NUMTHREADS'][het] != '': + return f"SBATCH --cpus-per-task={job.het['NUMTHREADS'][het]}" else: - return "SBATCH --cpus-per-task={0}".format(job.parameters['NUMTHREADS']) + if job.parameters['NUMTHREADS'] != '': + return "SBATCH --cpus-per-task={0}".format(job.parameters['NUMTHREADS']) + return "" # noinspection PyMethodMayBeStatic,PyUnusedLocal - def get_reservation_directive(self, job): - if job.parameters['RESERVATION'] == '': - return "" + def get_reservation_directive(self, job, het=-1): + """ + Returns reservation directive for the specified job + :param job: + :param het: + :return: + """ + + if het > -1 and len(job.het['RESERVATION']) > 0: + if job.het['RESERVATION'][het] != '': + return "SBATCH --reservation={0}".format(job.het['RESERVATION'][het]) else: - return "SBATCH --reservation={0}".format(job.parameters['RESERVATION']) + if job.parameters['RESERVATION'] != '': + return "SBATCH --reservation={0}".format(job.parameters['RESERVATION']) + return "" - def get_custom_directives(self, job): + def get_custom_directives(self, job, het=-1): """ Returns custom directives for the specified job @@ -176,13 +230,17 @@ class SlurmHeader(object): :rtype: str """ # There is no custom directives, so directive is empty - if job.parameters['CUSTOM_DIRECTIVES'] != '': - return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + if het > -1 and len(job.het['CUSTOM_DIRECTIVES']) > 0: + if job.het['CUSTOM_DIRECTIVES'][het] != '': + return '\n'.join(str(s) for s in job.het['CUSTOM_DIRECTIVES'][het]) + else: + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) return "" - def get_tasks_per_node(self, job): + 
def get_tasks_per_node(self, job, het=-1): """ Returns tasks per node directive for the specified job @@ -191,10 +249,172 @@ :return: tasks per node directive :rtype: str """ - if int(job.parameters['TASKS']) > 1: - return "SBATCH --tasks-per-node={0}".format(job.parameters['TASKS']) + if het > -1 and len(job.het['TASKS']) > 0: + if int(job.het['TASKS'][het]) > 1: + return "SBATCH --ntasks-per-node={0}".format(job.het['TASKS'][het]) + else: + if int(job.parameters['TASKS']) > 1: + return "SBATCH --ntasks-per-node={0}".format(job.parameters['TASKS']) return "" + def wrapper_header(self, **kwargs): + + wr_header = f""" +############################################################################### +# {kwargs["name"].split("_")[0] + "_Wrapper"} +############################################################################### +""" + if kwargs["wrapper_data"].het.get("HETSIZE",1) <= 1: + wr_header += f""" +#SBATCH -J {kwargs["name"]} +{kwargs["queue"]} +{kwargs["partition"]} +{kwargs["dependency"]} +#SBATCH -A {kwargs["project"]} +#SBATCH --output={kwargs["name"]}.out +#SBATCH --error={kwargs["name"]}.err +#SBATCH -t {kwargs["wallclock"]}:00 +{kwargs["threads"]} +{kwargs["nodes"]} +{kwargs["num_processors"]} +{kwargs["tasks"]} +{kwargs["exclusive"]} +{kwargs["custom_directives"]} + +# + """ + else: + wr_header = self.calculate_wrapper_het_header(kwargs["wrapper_data"]) + if kwargs["method"] == 'srun': + language = kwargs["executable"] + if language is None or len(language) == 0: + language = "#!/bin/bash" + return language + wr_header + else: + language = kwargs["executable"] + if language is None or len(language) == 0 or "bash" in language: + language = "#!/usr/bin/env python3" + return language + wr_header + def hetjob_common_header(self,hetsize,wrapper=None): + if not wrapper: + header = textwrap.dedent("""\ + + ############################################################################### + # %TASKTYPE% %DEFAULT.EXPID% EXPERIMENT + ############################################################################### + # Common directives + ############################################################################### + # + #SBATCH -t %WALLCLOCK%:00 + #SBATCH -J %JOBNAME% + #SBATCH --output=%CURRENT_SCRATCH_DIR%/%CURRENT_PROJ_DIR%/%CURRENT_USER%/%DEFAULT.EXPID%/LOG_%DEFAULT.EXPID%/%OUT_LOG_DIRECTIVE% + #SBATCH --error=%CURRENT_SCRATCH_DIR%/%CURRENT_PROJ_DIR%/%CURRENT_USER%/%DEFAULT.EXPID%/LOG_%DEFAULT.EXPID%/%ERR_LOG_DIRECTIVE% + #%X11% + # + """) + else: + header = f""" +############################################################################### +# {wrapper.name.split("_")[0] + "_Wrapper"} +############################################################################### +#SBATCH -J {wrapper.name} +#SBATCH --output={wrapper._platform.remote_log_dir}/{wrapper.name}.out +#SBATCH --error={wrapper._platform.remote_log_dir}/{wrapper.name}.err +#SBATCH -t {wrapper.wallclock}:00 +# +########################################################################################### +""" + for components in range(hetsize): + header += textwrap.dedent(f"""\ + ############################################################################### + # HET_GROUP:{components} + ############################################################################### + #%QUEUE_DIRECTIVE_{components}% + #%PARTITION_DIRECTIVE_{components}% + #%ACCOUNT_DIRECTIVE_{components}% + #%MEMORY_DIRECTIVE_{components}% + #%MEMORY_PER_TASK_DIRECTIVE_{components}% + #%THREADS_PER_TASK_DIRECTIVE_{components}% 
#%NODES_DIRECTIVE_{components}% + #%NUMPROC_DIRECTIVE_{components}% + #%RESERVATION_DIRECTIVE_{components}% + #%TASKS_PER_NODE_DIRECTIVE_{components}% + %CUSTOM_DIRECTIVES_{components}% + #SBATCH hetjob + """) + return header + + def calculate_wrapper_het_header(self, wr_job): + hetsize = wr_job.het["HETSIZE"] + header = self.hetjob_common_header(hetsize,wr_job) + for components in range(hetsize): + header = header.replace( + f'%QUEUE_DIRECTIVE_{components}%', self.get_queue_directive(wr_job, components)) + header = header.replace( + f'%PARTITION_DIRECTIVE_{components}%', self.get_partition_directive(wr_job, components)) + header = header.replace( + f'%ACCOUNT_DIRECTIVE_{components}%', self.get_account_directive(wr_job, components)) + header = header.replace( + f'%MEMORY_DIRECTIVE_{components}%', self.get_memory_directive(wr_job, components)) + header = header.replace( + f'%MEMORY_PER_TASK_DIRECTIVE_{components}%', self.get_memory_per_task_directive(wr_job, components)) + header = header.replace( + f'%THREADS_PER_TASK_DIRECTIVE_{components}%', self.get_threads_per_task(wr_job, components)) + header = header.replace( + f'%NODES_DIRECTIVE_{components}%', self.get_nodes_directive(wr_job, components)) + header = header.replace( + f'%NUMPROC_DIRECTIVE_{components}%', self.get_proccesors_directive(wr_job, components)) + header = header.replace( + f'%RESERVATION_DIRECTIVE_{components}%', self.get_reservation_directive(wr_job, components)) + header = header.replace( + f'%TASKS_PER_NODE_DIRECTIVE_{components}%', self.get_tasks_per_node(wr_job, components)) + header = header.replace( + f'%CUSTOM_DIRECTIVES_{components}%', self.get_custom_directives(wr_job, components)) + header = header[:-len("#SBATCH hetjob\n")] # last element + + return header + + def calculate_het_header(self, job): + header = self.hetjob_common_header(hetsize=job.het["HETSIZE"]) + header = header.replace("%TASKTYPE%", job.section) + header = header.replace("%DEFAULT.EXPID%", job.expid) + header = header.replace("%WALLCLOCK%", job.wallclock) + header = header.replace("%JOBNAME%", job.name) + + if job.x11 == "true": + header = header.replace( + '%X11%', "SBATCH --x11=batch") + else: + header = header.replace('%X11%', "#") + + for components in range(job.het['HETSIZE']): + header = header.replace( + f'%QUEUE_DIRECTIVE_{components}%', self.get_queue_directive(job, components)) + header = header.replace( + f'%PARTITION_DIRECTIVE_{components}%', self.get_partition_directive(job, components)) + header = header.replace( + f'%ACCOUNT_DIRECTIVE_{components}%', self.get_account_directive(job, components)) + header = header.replace( + f'%MEMORY_DIRECTIVE_{components}%', self.get_memory_directive(job, components)) + header = header.replace( + f'%MEMORY_PER_TASK_DIRECTIVE_{components}%', self.get_memory_per_task_directive(job, components)) + header = header.replace( + f'%THREADS_PER_TASK_DIRECTIVE_{components}%', self.get_threads_per_task(job, components)) + header = header.replace( + f'%NODES_DIRECTIVE_{components}%', self.get_nodes_directive(job, components)) + header = header.replace( + f'%NUMPROC_DIRECTIVE_{components}%', self.get_proccesors_directive(job, components)) + header = header.replace( + f'%RESERVATION_DIRECTIVE_{components}%', self.get_reservation_directive(job, components)) + header = header.replace( + f'%TASKS_PER_NODE_DIRECTIVE_{components}%', self.get_tasks_per_node(job, components)) + header = header.replace( + f'%CUSTOM_DIRECTIVES_{components}%', self.get_custom_directives(job, components)) + header = 
header[:-len("#SBATCH hetjob\n")] # last element + + return header + + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %DEFAULT.EXPID% EXPERIMENT @@ -243,3 +463,4 @@ class SlurmHeader(object): # ############################################################################### """) + diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 62009dbd3cbe30e8f4a8461666fdc096fa418aab..9bfb28cac6bdd9767fd36c342afc1bc6975c5544 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -1150,6 +1150,7 @@ class ParamikoPlatform(Platform): """ return 'nohup kill -0 {0} > /dev/null 2>&1; echo $?'.format(job_id) + def get_submitted_job_id(self, output, x11 = False): """ Parses submit command output to extract job id @@ -1170,68 +1171,74 @@ class ParamikoPlatform(Platform): :return: header to use :rtype: str """ - if str(job.processors) == '1': - header = self.header.SERIAL - else: - header = self.header.PARALLEL - str_datetime = date2str(datetime.datetime.now(), 'S') if str(job.wrapper_type).lower() != "vertical": out_filename = "{0}.cmd.out.{1}".format(job.name,job.fail_count) err_filename = "{0}.cmd.err.{1}".format(job.name,job.fail_count) else: out_filename = "{0}.cmd.out".format(job.name) err_filename = "{0}.cmd.err".format(job.name) - header = header.replace('%OUT_LOG_DIRECTIVE%', out_filename) - header = header.replace('%ERR_LOG_DIRECTIVE%', err_filename) - if hasattr(self.header, 'get_queue_directive'): - header = header.replace( - '%QUEUE_DIRECTIVE%', self.header.get_queue_directive(job)) - if hasattr(self.header, 'get_proccesors_directive'): - header = header.replace( - '%NUMPROC_DIRECTIVE%', self.header.get_proccesors_directive(job)) - if hasattr(self.header, 'get_partition_directive'): - header = header.replace( - '%PARTITION_DIRECTIVE%', self.header.get_partition_directive(job)) - if hasattr(self.header, 'get_tasks_per_node'): - header = header.replace( - '%TASKS_PER_NODE_DIRECTIVE%', self.header.get_tasks_per_node(job)) - if hasattr(self.header, 'get_threads_per_task'): - header = header.replace( - '%THREADS_PER_TASK_DIRECTIVE%', self.header.get_threads_per_task(job)) - if job.x11 == "true": - header = header.replace( - '%X11%', "SBATCH --x11=batch") + if len(job.het) > 0: + + header = self.header.calculate_het_header(job) + elif str(job.processors) == '1': + header = self.header.SERIAL else: - header = header.replace( - '%X11%', "") - if hasattr(self.header, 'get_scratch_free_space'): - header = header.replace( - '%SCRATCH_FREE_SPACE_DIRECTIVE%', self.header.get_scratch_free_space(job)) - if hasattr(self.header, 'get_custom_directives'): - header = header.replace( - '%CUSTOM_DIRECTIVES%', self.header.get_custom_directives(job)) - if hasattr(self.header, 'get_exclusivity'): - header = header.replace( - '%EXCLUSIVITY_DIRECTIVE%', self.header.get_exclusivity(job)) - if hasattr(self.header, 'get_account_directive'): - header = header.replace( - '%ACCOUNT_DIRECTIVE%', self.header.get_account_directive(job)) - if hasattr(self.header, 'get_nodes_directive'): - header = header.replace( - '%NODES_DIRECTIVE%', self.header.get_nodes_directive(job)) - if hasattr(self.header, 'get_reservation_directive'): - header = header.replace( - '%RESERVATION_DIRECTIVE%', self.header.get_reservation_directive(job)) - if hasattr(self.header, 'get_memory_directive'): - header = header.replace( - '%MEMORY_DIRECTIVE%', 
self.header.get_memory_directive(job)) - if hasattr(self.header, 'get_memory_per_task_directive'): - header = header.replace( - '%MEMORY_PER_TASK_DIRECTIVE%', self.header.get_memory_per_task_directive(job)) - if hasattr(self.header, 'get_hyperthreading_directive'): - header = header.replace( - '%HYPERTHREADING_DIRECTIVE%', self.header.get_hyperthreading_directive(job)) + header = self.header.PARALLEL + + str_datetime = date2str(datetime.datetime.now(), 'S') + + header = header.replace('%OUT_LOG_DIRECTIVE%', out_filename) + header = header.replace('%ERR_LOG_DIRECTIVE%', err_filename) + if job.het.get("HETSIZE",0) <= 1: + if hasattr(self.header, 'get_queue_directive'): + header = header.replace( + '%QUEUE_DIRECTIVE%', self.header.get_queue_directive(job)) + if hasattr(self.header, 'get_proccesors_directive'): + header = header.replace( + '%NUMPROC_DIRECTIVE%', self.header.get_proccesors_directive(job)) + if hasattr(self.header, 'get_partition_directive'): + header = header.replace( + '%PARTITION_DIRECTIVE%', self.header.get_partition_directive(job)) + if hasattr(self.header, 'get_tasks_per_node'): + header = header.replace( + '%TASKS_PER_NODE_DIRECTIVE%', self.header.get_tasks_per_node(job)) + if hasattr(self.header, 'get_threads_per_task'): + header = header.replace( + '%THREADS_PER_TASK_DIRECTIVE%', self.header.get_threads_per_task(job)) + if job.x11 == "true": + header = header.replace( + '%X11%', "SBATCH --x11=batch") + else: + header = header.replace( + '%X11%', "") + if hasattr(self.header, 'get_scratch_free_space'): + header = header.replace( + '%SCRATCH_FREE_SPACE_DIRECTIVE%', self.header.get_scratch_free_space(job)) + if hasattr(self.header, 'get_custom_directives'): + header = header.replace( + '%CUSTOM_DIRECTIVES%', self.header.get_custom_directives(job)) + if hasattr(self.header, 'get_exclusivity'): + header = header.replace( + '%EXCLUSIVITY_DIRECTIVE%', self.header.get_exclusivity(job)) + if hasattr(self.header, 'get_account_directive'): + header = header.replace( + '%ACCOUNT_DIRECTIVE%', self.header.get_account_directive(job)) + if hasattr(self.header, 'get_nodes_directive'): + header = header.replace( + '%NODES_DIRECTIVE%', self.header.get_nodes_directive(job)) + if hasattr(self.header, 'get_reservation_directive'): + header = header.replace( + '%RESERVATION_DIRECTIVE%', self.header.get_reservation_directive(job)) + if hasattr(self.header, 'get_memory_directive'): + header = header.replace( + '%MEMORY_DIRECTIVE%', self.header.get_memory_directive(job)) + if hasattr(self.header, 'get_memory_per_task_directive'): + header = header.replace( + '%MEMORY_PER_TASK_DIRECTIVE%', self.header.get_memory_per_task_directive(job)) + if hasattr(self.header, 'get_hyperthreading_directive'): + header = header.replace( + '%HYPERTHREADING_DIRECTIVE%', self.header.get_hyperthreading_directive(job)) return header def parse_time(self,wallclock): # noinspection Annotator diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index a64d386e8375f9e19a4f01f1b2a5b76751503b0a..36dfa53e2bd1e60e187299f5abbf64ad7e0b0edb 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -31,7 +31,6 @@ from autosubmit.platforms.paramiko_platform import ParamikoPlatform from autosubmit.platforms.wrappers.wrapper_factory import SlurmWrapperFactory from log.log import AutosubmitCritical, AutosubmitError, Log - class SlurmPlatform(ParamikoPlatform): """ Class to manage jobs to host using SLURM scheduler @@ -484,7 +483,7 @@ class 
SlurmPlatform(ParamikoPlatform): status = "" try: status = [x.split()[1] for x in output.splitlines() - if x.split()[0] == str(job_id)] + if x.split()[0][:len(job_id)] == str(job_id)] except BaseException as e: pass if len(status) == 0: @@ -545,7 +544,7 @@ class SlurmPlatform(ParamikoPlatform): return 'sacct -n -X --jobs {1} -o "State"'.format(self.host, job_id) def get_checkAlljobs_cmd(self, jobs_id): - return "sacct -n -X --jobs {1} -o jobid,State".format(self.host, jobs_id) + return "sacct -n -X --jobs {1} -o jobid,State".format(self.host, jobs_id) def get_estimated_queue_time_cmd(self, job_id): return f"scontrol -o show JobId {job_id} | grep -Po '(?<=EligibleTime=)[0-9-:T]*'" @@ -599,40 +598,9 @@ class SlurmPlatform(ParamikoPlatform): job.new_status = Status.QUEUING # If it was HELD and was released, it should be QUEUING next. else: job.new_status = Status.HELD + def wrapper_header(self,**kwargs): - wr_header = f""" -############################################################################### -# {kwargs["name"].split("_")[0]+"_Wrapper"} -############################################################################### -# -#SBATCH -J {kwargs["name"]} -{kwargs["queue"]} -{kwargs["partition"]} -{kwargs["dependency"]} -#SBATCH -A {kwargs["project"]} -#SBATCH --output={kwargs["name"]}.out -#SBATCH --error={kwargs["name"]}.err -#SBATCH -t {kwargs["wallclock"]}:00 -{kwargs["threads"]} -{kwargs["nodes"]} -{kwargs["num_processors"]} -{kwargs["tasks"]} -{kwargs["exclusive"]} -{kwargs["custom_directives"]} - -# -############################################################################### -""" - if kwargs["method"] == 'srun': - language = kwargs["executable"] - if language is None or len(language) == 0: - language = "#!/bin/bash" - return language + wr_header - else: - language = kwargs["executable"] - if language is None or len(language) == 0 or "bash" in language: - language = "#!/usr/bin/env python3" - return language + wr_header + return self._header.wrapper_header(**kwargs) @staticmethod def allocated_nodes(): diff --git a/autosubmit/platforms/wrappers/wrapper_factory.py b/autosubmit/platforms/wrappers/wrapper_factory.py index d101cb5d5c2c0213999cdbcfe30c67219b3b98ef..a70d8adc89f5c218f7e1c3e442fe01453b336a0c 100644 --- a/autosubmit/platforms/wrappers/wrapper_factory.py +++ b/autosubmit/platforms/wrappers/wrapper_factory.py @@ -32,27 +32,30 @@ class WrapperFactory(object): def get_wrapper(self, wrapper_builder, **kwargs): wrapper_data = kwargs['wrapper_data'] - kwargs['allocated_nodes'] = self.allocated_nodes() - kwargs['dependency'] = self.dependency(kwargs['dependency']) - kwargs['queue'] = self.queue(kwargs['queue']) - kwargs['partition'] = self.partition(wrapper_data['PARTITION']) - kwargs["exclusive"] = self.exclusive(wrapper_data['EXCLUSIVE']) - kwargs["custom_directives"] = self.custom_directives(wrapper_data["CUSTOM_DIRECTIVES"]) - kwargs["executable"] = wrapper_data["EXECUTABLE"] - kwargs['nodes'] = self.nodes(wrapper_data['NODES']) - kwargs['tasks'] = self.tasks(wrapper_data['TASKS']) - kwargs['threads'] = self.threads(kwargs['threads']) - if str(kwargs['num_processors']).isdigit(): - kwargs['num_processors_value'] = int(kwargs['num_processors']) - else: - kwargs['num_processors_value'] = 1 - if str(wrapper_data['NODES']).isdigit() and int(wrapper_data['NODES']) > 1 and kwargs['num_processors'] == '1': - kwargs['num_processors'] = "#" - else: - kwargs['num_processors'] = self.processors(kwargs['num_processors']) + wrapper_data.wallclock = kwargs['wallclock'] + #todo here hetjobs 
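The job-status parsing in SlurmPlatform above switches from an exact job-id comparison to a prefix comparison, presumably so that sacct rows for heterogeneous-job components, which Slurm reports as `<jobid>+<index>`, are matched as well. A small illustration with made-up sacct-style output:

```python
# Made-up sacct-style output: hetjob components show up as <jobid>+<index>.
output = "123+0 COMPLETED\n123+1 RUNNING\n456 FAILED"
job_id = "123"
status = [x.split()[1] for x in output.splitlines()
          if x.split()[0][:len(job_id)] == str(job_id)]
print(status)  # ['COMPLETED', 'RUNNING'] -- an exact '==' would miss both rows
```

Note that a plain prefix match would also catch an unrelated id such as 1234; that seems acceptable here because sacct is queried for a specific job id.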
+ if wrapper_data.het["HETSIZE"] <= 1: + kwargs['allocated_nodes'] = self.allocated_nodes() + kwargs['dependency'] = self.dependency(kwargs['dependency']) + kwargs['partition'] = self.partition(wrapper_data.partition) + kwargs["exclusive"] = self.exclusive(wrapper_data.exclusive) + kwargs['nodes'] = self.nodes(wrapper_data.nodes) + kwargs['tasks'] = self.tasks(wrapper_data.tasks) + kwargs["custom_directives"] = self.custom_directives(wrapper_data.custom_directives) + kwargs['queue'] = self.queue(wrapper_data.queue) + kwargs['threads'] = self.threads(wrapper_data.threads) + if str(kwargs['num_processors']).isdigit(): + kwargs['num_processors_value'] = int(wrapper_data.processors) + else: + kwargs['num_processors_value'] = 1 + if str(wrapper_data.nodes).isdigit() and int(wrapper_data.nodes) > 1 and kwargs['num_processors'] == '1': + kwargs['num_processors'] = "#" + else: + kwargs['num_processors'] = self.processors(wrapper_data.processors) + kwargs["executable"] = wrapper_data.executable + kwargs['header_directive'] = self.header_directives(**kwargs) - builder = wrapper_builder(**kwargs) - return self.wrapper_director.construct(builder) + return self.wrapper_director.construct(wrapper_builder(**kwargs)) def vertical_wrapper(self, **kwargs): raise NotImplemented(self.exception) @@ -147,7 +150,6 @@ class SlurmWrapperFactory(WrapperFactory): def dependency_directive(self, dependency): return '#SBATCH --dependency=afterok:{0}'.format(dependency) - def queue_directive(self, queue): return '#SBATCH --qos={0}'.format(queue) def partition_directive(self, partition): diff --git a/requeriments.txt b/requeriments.txt index f1f5123a6a9f54d4e5d30fb38cc59e93aeaa6d7e..5f0d88f65472921ee7aafe1b2c2c76877937a37e 100644 --- a/requeriments.txt +++ b/requeriments.txt @@ -1,6 +1,6 @@ setuptools>=60.8.2 cython -autosubmitconfigparser==1.0.47 +autosubmitconfigparser==1.0.48 paramiko>=2.9.2 bcrypt>=3.2 PyNaCl>=1.5.0 diff --git a/test/unit/test_job.py b/test/unit/test_job.py index e9f4e4971b6440d660f58d9f1462cd9a4296ec19..e8d0cefd9f8bba7e873ccb8b6349cf12ca0be466 100644 --- a/test/unit/test_job.py +++ b/test/unit/test_job.py @@ -244,6 +244,115 @@ class TestJob(TestCase): update_content_mock.assert_called_with(config) self.assertTrue(checked) + @patch('autosubmitconfigparser.config.basicconfig.BasicConfig') + def test_hetjob(self, mocked_global_basic_config: Mock): + """ + Test job platforms with a platform. Builds job and platform using YAML data, without mocks. 
+        :param mocked_global_basic_config:
+        :return:
+        """
+        expid = "zzyy"
+        with tempfile.TemporaryDirectory() as temp_dir:
+            BasicConfig.LOCAL_ROOT_DIR = str(temp_dir)
+            Path(temp_dir, expid).mkdir()
+            for path in [f'{expid}/tmp', f'{expid}/tmp/ASLOGS', f'{expid}/tmp/ASLOGS_{expid}', f'{expid}/proj',
+                         f'{expid}/conf']:
+                Path(temp_dir, path).mkdir()
+            with open(Path(temp_dir, f'{expid}/conf/experiment_data.yml'), 'w+') as experiment_data:
+                experiment_data.write(dedent(f'''\
+                CONFIG:
+                  RETRIALS: 0
+                DEFAULT:
+                  EXPID: {expid}
+                  HPCARCH: test
+                PLATFORMS:
+                  test:
+                    TYPE: slurm
+                    HOST: localhost
+                    PROJECT: abc
+                    QUEUE: debug
+                    USER: me
+                    SCRATCH_DIR: /anything/
+                    ADD_PROJECT_TO_HOST: False
+                    MAX_WALLCLOCK: '00:55'
+                    TEMP_DIR: ''

+                '''))
+                experiment_data.flush()
+            # A for loop could be added here to cover more configuration options
+            with open(Path(temp_dir, f'{expid}/conf/hetjob.yml'), 'w+') as hetjob:
+                hetjob.write(dedent(f'''\
+                JOBS:
+                  HETJOB_A:
+                    FILE: a
+                    PLATFORM: test
+                    RUNNING: once
+                    WALLCLOCK: '00:30'
+                    MEMORY:
+                      - 0
+                      - 0
+                    NODES:
+                      - 3
+                      - 1
+                    TASKS:
+                      - 32
+                      - 32
+                    THREADS:
+                      - 4
+                      - 4
+                    CUSTOM_DIRECTIVES:
+                      - ['#SBATCH --export=ALL', '#SBATCH --distribution=block:cyclic', '#SBATCH --exclusive']
+                      - ['#SBATCH --export=ALL', '#SBATCH --distribution=block:cyclic:fcyclic', '#SBATCH --exclusive']
+                '''))

+            mocked_basic_config = Mock(spec=BasicConfig)
+            mocked_basic_config.LOCAL_ROOT_DIR = str(temp_dir)
+            mocked_global_basic_config.LOCAL_ROOT_DIR.return_value = str(temp_dir)

+            config = AutosubmitConfig(expid, basic_config=mocked_basic_config, parser_factory=YAMLParserFactory())
+            config.reload(True)
+            parameters = config.load_parameters()
+            job_list_obj = JobList(expid, mocked_basic_config, YAMLParserFactory(),
+                                   Autosubmit._get_job_list_persistence(expid, config), config)
+            job_list_obj.generate(
+                date_list=[],
+                member_list=[],
+                num_chunks=1,
+                chunk_ini=1,
+                parameters=parameters,
+                date_format='M',
+                default_retrials=config.get_retrials(),
+                default_job_type=config.get_default_job_type(),
+                wrapper_type=config.get_wrapper_type(),
+                wrapper_jobs={},
+                notransitive=True,
+                update_structure=True,
+                run_only_members=config.get_member_list(run_only=True),
+                jobs_data=config.experiment_data,
+                as_conf=config
+            )
+            job_list = job_list_obj.get_job_list()
+            self.assertEqual(1, len(job_list))

+            submitter = Autosubmit._get_submitter(config)
+            submitter.load_platforms(config)

+            hpcarch = config.get_platform()
+            for job in job_list:
+                if job.platform_name == "" or job.platform_name is None:
+                    job.platform_name = hpcarch
+                job.platform = submitter.platforms[job.platform_name]

+            job = job_list[0]

+            # Build the final header and script content
+            parameters = job.update_parameters(config, parameters)
+            template_content, additional_templates = job.update_content(config)

+            # Asserts the script is valid. There shouldn't be variables in the script that aren't in the parameters.
+            checked = job.check_script(config, parameters)
+            self.assertTrue(checked)
+
     @patch('autosubmitconfigparser.config.basicconfig.BasicConfig')
     def test_job_parameters(self, mocked_global_basic_config: Mock):
         """Test job platforms with a platform. Builds job and platform using YAML data, without mocks.
@@ -285,7 +394,7 @@ class TestJob(TestCase):
                     USER: me
                     SCRATCH_DIR: /anything/
                     ADD_PROJECT_TO_HOST: False
-                    MAX_WALLCLOCK: '000:55'
+                    MAX_WALLCLOCK: '00:55'
                     TEMP_DIR: ''
                 '''))
             minimal.flush()
diff --git a/test/unit/test_job_package.py b/test/unit/test_job_package.py
index c1acdf4cacf12e83e6c4bf95ba3ced4208af3dde..c446ca431b5ddf44318ef5d5e92e04ecf014abae 100644
--- a/test/unit/test_job_package.py
+++ b/test/unit/test_job_package.py
@@ -119,18 +119,18 @@ class TestJobPackage(TestCase):
         }
         self.setUpWrappers(options)
-        self.assertEqual(self.job_package_wrapper._wrapper_data["TYPE"], "vertical")
-        self.assertEqual(self.job_package_wrapper._wrapper_data["JOBS_IN_WRAPPER"], "None")
-        self.assertEqual(self.job_package_wrapper._wrapper_data["METHOD"], "ASThread")
-        self.assertEqual(self.job_package_wrapper._wrapper_data["POLICY"], "flexible")
-        self.assertEqual(self.job_package_wrapper._wrapper_data["EXTEND_WALLCLOCK"], 0)
-
-        self.assertEqual(self.job_package_wrapper._wrapper_data["EXCLUSIVE"], True)
-        self.assertEqual(self.job_package_wrapper._wrapper_data["INNER_RETRIALS"], 0)
-        self.assertEqual(self.job_package_wrapper._wrapper_data["QUEUE"], "debug")
-        self.assertEqual(self.job_package_wrapper._wrapper_data["PARTITION"], "debug")
-        self.assertEqual(self.job_package_wrapper._wrapper_data["THREADS"], "1")
-        self.assertEqual(self.job_package_wrapper._wrapper_data["TASKS"], "1")
+        self.assertEqual(self.job_package_wrapper.wrapper_type, "vertical")
+        self.assertEqual(self.job_package_wrapper.jobs_in_wrapper, "None")
+        self.assertEqual(self.job_package_wrapper.wrapper_method, "ASThread")
+        self.assertEqual(self.job_package_wrapper.wrapper_policy, "flexible")
+        self.assertEqual(self.job_package_wrapper.extensible_wallclock, 0)
+
+        self.assertEqual(self.job_package_wrapper.exclusive, True)
+        self.assertEqual(self.job_package_wrapper.inner_retrials, 0)
+        self.assertEqual(self.job_package_wrapper.queue, "debug")
+        self.assertEqual(self.job_package_wrapper.partition, "debug")
+        self.assertEqual(self.job_package_wrapper.threads, "1")
+        self.assertEqual(self.job_package_wrapper.tasks, "1")

         options_slurm = {
             'EXCLUSIVE': False,
@@ -142,13 +142,13 @@ class TestJobPackage(TestCase):
             'CUSTOM_DIRECTIVES': "['#SBATCH --mem=1000']"
         }
         self.setUpWrappers(options_slurm)
-        self.assertEqual(self.job_package_wrapper._wrapper_data["EXCLUSIVE"], False)
-        self.assertEqual(self.job_package_wrapper._wrapper_data["INNER_RETRIALS"], 30)
-        self.assertEqual(self.job_package_wrapper._wrapper_data["QUEUE"], "bsc32")
-        self.assertEqual(self.job_package_wrapper._wrapper_data["PARTITION"], "bsc32")
-        self.assertEqual(self.job_package_wrapper._wrapper_data["THREADS"], "30")
-        self.assertEqual(self.job_package_wrapper._wrapper_data["TASKS"], "40")
-        self.assertEqual(self.job_package_wrapper._wrapper_data["CUSTOM_DIRECTIVES"], ['#SBATCH --mem=1000'])
+        self.assertEqual(self.job_package_wrapper.exclusive, False)
+        self.assertEqual(self.job_package_wrapper.inner_retrials, 30)
+        self.assertEqual(self.job_package_wrapper.queue, "bsc32")
+        self.assertEqual(self.job_package_wrapper.partition, "bsc32")
+        self.assertEqual(self.job_package_wrapper.threads, "30")
+        self.assertEqual(self.job_package_wrapper.tasks, "40")
+        self.assertEqual(self.job_package_wrapper.custom_directives, ['#SBATCH --mem=1000'])
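
A note on the prefix match introduced in the sacct status parsing above: Slurm
reports the components of a heterogeneous job as <jobid>+<offset> (1234+0,
1234+1, ...), so comparing only the first len(job_id) characters of the jobid
column lets the existing parser pick up every component of a hetjob. A minimal
standalone sketch of that behaviour (the job id and the sacct "jobid,State"
output below are made up for illustration):

    def parse_states(output, job_id):
        # Keep the State column of every line whose jobid column starts with
        # job_id, so hetjob components such as "1234+0" are included.
        status = []
        for line in output.splitlines():
            fields = line.split()
            if fields and fields[0][:len(job_id)] == str(job_id):
                status.append(fields[1])
        return status

    sacct_output = "1234 RUNNING\n1234+0 RUNNING\n1234+1 PENDING\n5678 COMPLETED"
    print(parse_states(sacct_output, "1234"))  # ['RUNNING', 'RUNNING', 'PENDING']

Since get_checkAlljobs_cmd queries sacct for explicit job ids, the output
normally contains only the requested jobs and their components; note, however,
that a bare prefix comparison would also match a longer unrelated id (12345
when looking for 1234) if both appeared in the same query.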