From a89779804a0ab8b2c756b034605c5b6a48541968 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 18 Apr 2024 11:55:11 +0200 Subject: [PATCH 1/9] Fix "," --- autosubmit/platforms/pjmplatform.py | 1 + 1 file changed, 1 insertion(+) diff --git a/autosubmit/platforms/pjmplatform.py b/autosubmit/platforms/pjmplatform.py index 9e182c5c0..122029030 100644 --- a/autosubmit/platforms/pjmplatform.py +++ b/autosubmit/platforms/pjmplatform.py @@ -385,6 +385,7 @@ class PJMPlatform(ParamikoPlatform): def get_checkAlljobs_cmd(self, jobs_id): # jobs_id = "jobid1+jobid2+jobid3" # -H == sacct + jobs_id = jobs_id[:-1] # deletes comma return "pjstat -H -v --choose jid,st,ermsg --filter \"jid={0}\" > as_checkalljobs.txt ; pjstat -v --choose jid,st,ermsg --filter \"jid={0}\" >> as_checkalljobs.txt ; cat as_checkalljobs.txt ; rm as_checkalljobs.txt".format(jobs_id) def get_queue_status_cmd(self, job_id): -- GitLab From 929181b517d10c175def856a9edc4f98f3621876 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 18 Apr 2024 14:42:52 +0200 Subject: [PATCH 2/9] Platform working --- autosubmit/platforms/pjmplatform.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/autosubmit/platforms/pjmplatform.py b/autosubmit/platforms/pjmplatform.py index 122029030..61f244ff1 100644 --- a/autosubmit/platforms/pjmplatform.py +++ b/autosubmit/platforms/pjmplatform.py @@ -385,7 +385,8 @@ class PJMPlatform(ParamikoPlatform): def get_checkAlljobs_cmd(self, jobs_id): # jobs_id = "jobid1+jobid2+jobid3" # -H == sacct - jobs_id = jobs_id[:-1] # deletes comma + if jobs_id[-1] == ",": + jobs_id = jobs_id[:-1] # deletes comma return "pjstat -H -v --choose jid,st,ermsg --filter \"jid={0}\" > as_checkalljobs.txt ; pjstat -v --choose jid,st,ermsg --filter \"jid={0}\" >> as_checkalljobs.txt ; cat as_checkalljobs.txt ; rm as_checkalljobs.txt".format(jobs_id) def get_queue_status_cmd(self, job_id): -- GitLab From 7d1f76ddffff5c29ff3bf9c01e485fe44163957a Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 18 Apr 2024 16:00:34 +0200 Subject: [PATCH 3/9] work in progress --- autosubmit/job/job_packager.py | 6 +-- autosubmit/platforms/headers/pjm_header.py | 13 +++++- autosubmit/platforms/pjmplatform.py | 2 + .../platforms/wrappers/wrapper_factory.py | 43 +++++++++++++++++++ 4 files changed, 60 insertions(+), 4 deletions(-) diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index d3eda6a82..1271ec2bb 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -557,12 +557,12 @@ class JobPackager(object): return jobs_by_section - def _build_horizontal_packages(self, section_list, wrapper_limits, section,wrapper_info={}): + def _build_horizontal_packages(self, section_list, wrapper_limits, section,wrapper_info=[]): packages = [] horizontal_packager = JobPackagerHorizontal(section_list, self._platform.max_processors, wrapper_limits, wrapper_limits["max"], self._platform.processors_per_node, self.wrapper_method[self.current_wrapper_section]) - package_jobs = horizontal_packager.build_horizontal_package() + package_jobs = horizontal_packager.build_horizontal_package(wrapper_info=wrapper_info) jobs_resources = dict() @@ -860,7 +860,7 @@ class JobPackagerHorizontal(object): self._sectionList = list() self._package_sections = dict() self.wrapper_info = [] - def build_horizontal_package(self, horizontal_vertical=False,wrapper_info={}): + def build_horizontal_package(self, horizontal_vertical=False,wrapper_info=[]): self.wrapper_info = wrapper_info current_package = [] current_package_by_section = {} diff --git a/autosubmit/platforms/headers/pjm_header.py b/autosubmit/platforms/headers/pjm_header.py index e77dfdb03..e374f21cc 100644 --- a/autosubmit/platforms/headers/pjm_header.py +++ b/autosubmit/platforms/headers/pjm_header.py @@ -111,11 +111,22 @@ class PJMHeader(object): return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) return "" + def get_tasks_directive(self,job, het=-1): + """ + Returns tasks per node directive for the specified job + :param job: job to create tasks per node directive for + :type job: Job + :return: tasks per node directive + :rtype: str + """ + if int(job.parameters['TASKS']) > 1: + return "max-proc-per-node={0}".format(job.parameters['TASKS']) + return "" def get_tasks_per_node(self, job): """ - Returns memory per task directive for the specified job + Returns tasks per node directive for the specified job :param job: job to create tasks per node directive for :type job: Job diff --git a/autosubmit/platforms/pjmplatform.py b/autosubmit/platforms/pjmplatform.py index 61f244ff1..b7b2b4c60 100644 --- a/autosubmit/platforms/pjmplatform.py +++ b/autosubmit/platforms/pjmplatform.py @@ -393,6 +393,8 @@ class PJMPlatform(ParamikoPlatform): return self.get_checkAlljobs_cmd(job_id) def get_jobid_by_jobname_cmd(self, job_name): + if job_name[-1] == ",": + job_name = job_name[:-1] return 'pjstat -v --choose jid,st,ermsg --filter \"jnam={0}\"'.format(job_name) diff --git a/autosubmit/platforms/wrappers/wrapper_factory.py b/autosubmit/platforms/wrappers/wrapper_factory.py index 071ec16e8..01e9e80fc 100644 --- a/autosubmit/platforms/wrappers/wrapper_factory.py +++ b/autosubmit/platforms/wrappers/wrapper_factory.py @@ -179,6 +179,49 @@ class SlurmWrapperFactory(WrapperFactory): return '#SBATCH --cpus-per-task={0}'.format(threads) +class PJMWrapperFactory(WrapperFactory): + + def vertical_wrapper(self, **kwargs): + return PythonVerticalWrapperBuilder(**kwargs) + + def horizontal_wrapper(self, **kwargs): + + if kwargs["method"] == 'srun': + return SrunHorizontalWrapperBuilder(**kwargs) + else: + return PythonHorizontalWrapperBuilder(**kwargs) + + def hybrid_wrapper_horizontal_vertical(self, **kwargs): + return PythonHorizontalVerticalWrapperBuilder(**kwargs) + + def hybrid_wrapper_vertical_horizontal(self, **kwargs): + if kwargs["method"] == 'srun': + return SrunVerticalHorizontalWrapperBuilder(**kwargs) + else: + return PythonVerticalHorizontalWrapperBuilder(**kwargs) + + def header_directives(self, **kwargs): + return self.platform.wrapper_header(**kwargs) + + def allocated_nodes(self): + return self.platform.allocated_nodes() + + + def queue_directive(self, queue): + return '#PJM --qos={0}'.format(queue) + def partition_directive(self, partition): + return '#PJM --partition={0}'.format(partition) + def exclusive_directive(self, exclusive): + return '#PJM --exclusive' + def tasks_directive(self, tasks): + return '#PJM --ntasks-per-node={0}'.format(tasks) + def nodes_directive(self, nodes): + return '#PJM -N {0}'.format(nodes) + def processors_directive(self, processors): + return '#PJM -n {0}'.format(processors) + def threads_directive(self, threads): + return '#PJM + class LSFWrapperFactory(WrapperFactory): -- GitLab From ecf3c644403858c81de479ee9fdde6f5501c6a8e Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 19 Apr 2024 15:08:11 +0200 Subject: [PATCH 4/9] work in progress --- autosubmit/platforms/headers/pjm_header.py | 20 ++++ autosubmit/platforms/pjmplatform.py | 93 +++++++++---------- .../platforms/wrappers/wrapper_factory.py | 42 ++------- 3 files changed, 72 insertions(+), 83 deletions(-) diff --git a/autosubmit/platforms/headers/pjm_header.py b/autosubmit/platforms/headers/pjm_header.py index e374f21cc..0f3f58a82 100644 --- a/autosubmit/platforms/headers/pjm_header.py +++ b/autosubmit/platforms/headers/pjm_header.py @@ -136,6 +136,23 @@ class PJMHeader(object): if int(job.parameters['TASKS']) > 1: return "max-proc-per-node={0}".format(job.parameters['TASKS']) return "" + def get_threads_per_task(self, job, het=-1): + """ + Returns threads per task directive for the specified job + + :param job: job to create threads per task directive for + :type job: Job + :return: threads per task directive + :rtype: str + """ + # There is no threads per task, so directive is empty + if het > -1 and len(job.het['NUMTHREADS']) > 0: + if job.het['NUMTHREADS'][het] != '': + return f"export OMP_NUM_THREADS={job.het['NUMTHREADS'][het]}" + else: + if job.parameters['NUMTHREADS'] != '': + return "export OMP_NUM_THREADS={0}".format(job.parameters['NUMTHREADS']) + return "" SERIAL = textwrap.dedent("""\ ############################################################################### @@ -148,6 +165,8 @@ class PJMHeader(object): #%ACCOUNT_DIRECTIVE% #%MEMORY_DIRECTIVE% %CUSTOM_DIRECTIVES% +%THREADS_PER_TASK_DIRECTIVE% + #PJM -o %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ_DIR%/%CURRENT_USER%/%DEFAULT.EXPID%/LOG_%DEFAULT.EXPID%/%OUT_LOG_DIRECTIVE% #PJM -e %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ_DIR%/%CURRENT_USER%/%DEFAULT.EXPID%/LOG_%DEFAULT.EXPID%/%ERR_LOG_DIRECTIVE% #%X11% @@ -164,6 +183,7 @@ class PJMHeader(object): #%NODES_DIRECTIVE% #PJM --mpi "proc=%NUMPROC%" #PJM --mpi "%TASKS_PER_NODE_DIRECTIVE%" +%THREADS_PER_TASK_DIRECTIVE% #PJM -L elapse=%WALLCLOCK%:00 #%QUEUE_DIRECTIVE% #%ACCOUNT_DIRECTIVE% diff --git a/autosubmit/platforms/pjmplatform.py b/autosubmit/platforms/pjmplatform.py index b7b2b4c60..99f248f47 100644 --- a/autosubmit/platforms/pjmplatform.py +++ b/autosubmit/platforms/pjmplatform.py @@ -414,54 +414,53 @@ class PJMPlatform(ParamikoPlatform): return reason[0] return reason - def wrapper_header(self,**kwargs): - if method == 'srun': - language = "#!/bin/bash" - return \ - language + """ -############################################################################### -# {0} -############################################################################### -# -#PJM -N {0} -{1} -{8} -#PJM -g {2} -#PJM -o {0}.out -#PJM -e {0}.err -#PJM -elapse {3}:00 -#PJM --mpi "proc=%NUMPROC%" -#PJM --mpi "max-proc-per-node={7}" -{5} -{6} - -# -############################################################################### - """.format(filename, queue, project, wallclock, num_procs, dependency, - '\n'.ljust(13).join(str(s) for s in directives), threads,partition) + def wrapper_header(self, **kwargs): + wr_header = f""" + ############################################################################### + # {kwargs["name"].split("_")[0] + "_Wrapper"} + ############################################################################### + """ + if kwargs["wrapper_data"].het.get("HETSIZE", 1) <= 1: + wr_header += f""" + ############################################################################### + # %TASKTYPE% %DEFAULT.EXPID% EXPERIMENT + ############################################################################### + # + # PJM -N {kwargs["name"]} + # PJM -L elapse={kwargs["wallclock"]}:00 + {kwargs["queue"]} + {kwargs["partition"]} + {kwargs["dependency"]} + {kwargs["threads"]} + {kwargs["nodes"]} + {kwargs["num_processors"]} + {kwargs["tasks"]} + {kwargs["exclusive"]} + {kwargs["custom_directives"]} + + #PJM -g {kwargs["project"]} + #PJM -o {kwargs["name"]}.out + #PJM -e {kwargs["name"]}.err + %CUSTOM_DIRECTIVES % + # + ############################################################################### + + + # + """ else: - language = "#!/usr/bin/env python3" - return \ - language + """ -############################################################################### -# {0} -############################################################################### -# -#PJM -N {0} -{1} -{8} -#PJM -g {2} -#PJM -o {0}.out -#PJM -e {0}.err -#PJM -elapse {3}:00 -#PJM --mpi "proc=%NUMPROC%" -#PJM --mpi "max-proc-per-node={7}" -{5} -{6} -# -############################################################################### - """.format(filename, queue, project, wallclock, num_procs, dependency, - '\n'.ljust(13).join(str(s) for s in directives), threads,partition) + wr_header = self.calculate_wrapper_het_header(kwargs["wrapper_data"]) + if kwargs["method"] == 'srun': + language = kwargs["executable"] + if language is None or len(language) == 0: + language = "#!/bin/bash" + return language + wr_header + else: + language = kwargs["executable"] + if language is None or len(language) == 0 or "bash" in language: + language = "#!/usr/bin/env python3" + return language + wr_header + @staticmethod def allocated_nodes(): diff --git a/autosubmit/platforms/wrappers/wrapper_factory.py b/autosubmit/platforms/wrappers/wrapper_factory.py index 01e9e80fc..7a27a719a 100644 --- a/autosubmit/platforms/wrappers/wrapper_factory.py +++ b/autosubmit/platforms/wrappers/wrapper_factory.py @@ -220,8 +220,13 @@ class PJMWrapperFactory(WrapperFactory): def processors_directive(self, processors): return '#PJM -n {0}'.format(processors) def threads_directive(self, threads): - return '#PJM + return f"export OMP_NUM_THREADS={threads}" + def queue_directive(self, queue): + return '#PJM -L rscgrp={0}'.format(queue) + + def partition_directive(self, partition): + return '#PJM -g {0}'.format(partition) class LSFWrapperFactory(WrapperFactory): @@ -258,39 +263,4 @@ class EcWrapperFactory(WrapperFactory): def dependency_directive(self, dependency): return '#PBS -v depend=afterok:{0}'.format(dependency) -class PJMWrapperFactory(WrapperFactory): - - def vertical_wrapper(self, **kwargs): - return PythonVerticalWrapperBuilder(**kwargs) - - def horizontal_wrapper(self, **kwargs): - - if kwargs["method"] == 'srun': - return SrunHorizontalWrapperBuilder(**kwargs) - else: - return PythonHorizontalWrapperBuilder(**kwargs) - - def hybrid_wrapper_horizontal_vertical(self, **kwargs): - return PythonHorizontalVerticalWrapperBuilder(**kwargs) - def hybrid_wrapper_vertical_horizontal(self, **kwargs): - if kwargs["method"] == 'srun': - return SrunVerticalHorizontalWrapperBuilder(**kwargs) - else: - return PythonVerticalHorizontalWrapperBuilder(**kwargs) - - def header_directives(self, **kwargs): - return self.platform.wrapper_header(**kwargs) - - def allocated_nodes(self): - return self.platform.allocated_nodes() - - #def dependency_directive(self, dependency): - # # There is no option for afterok in the PJM scheduler, but I think it is not needed. - # return '#PJM --dependency=afterok:{0}'.format(dependency) - - def queue_directive(self, queue): - return '#PJM -L rscgrp={0}'.format(queue) - - def partition_directive(self, partition): - return '#PJM -g {0}'.format(partition) -- GitLab From 386550088d051ece727428b94680a1a97961ea86 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 22 Apr 2024 11:02:30 +0200 Subject: [PATCH 5/9] Fix Log --- autosubmit/autosubmit.py | 5 ++--- autosubmit/job/job_packages.py | 2 +- autosubmit/platforms/platform.py | 4 ++-- bin/autosubmit | 7 ++++--- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index b1a3968b9..06de873c6 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1562,9 +1562,7 @@ class Autosubmit: except AutosubmitError as e: raise except BaseException as e: - raise AutosubmitCritical( - "There are issues that occurred during the templates generation, please check that job parameters are well set and the template path exists.", - 7014, str(e)) + raise return True @staticmethod @@ -2451,6 +2449,7 @@ class Autosubmit: except AutosubmitCritical as e: raise except BaseException as e: + raise raise AutosubmitCritical("This seems like a bug in the code, please contact AS developers", 7070, str(e)) @staticmethod diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 581738da4..0b677c9d7 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -183,7 +183,7 @@ class JobPackageBase(object): except AutosubmitCritical: raise except BaseException as e: - raise AutosubmitCritical("Error while building the scripts: {0}".format(e), 7013) + raise try: if not only_generate: Log.debug("Sending Files") diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index ac3d09eab..1228299f1 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -327,8 +327,8 @@ class Platform(object): raise except Exception as e: self.connected = False - message = f'Error in platform {self.name} for section {package.jobs[0].section}: {str(e)}' - raise AutosubmitError(message, 6015) + raise + except AutosubmitCritical as e: raise AutosubmitCritical(e.message, e.code, e.trace) except AutosubmitError as e: diff --git a/bin/autosubmit b/bin/autosubmit index 53a7f58e6..42d2acd61 100755 --- a/bin/autosubmit +++ b/bin/autosubmit @@ -33,10 +33,11 @@ from typing import Union def exit_from_error(e: BaseException): + trace = traceback.format_exc() try: - Log.debug(traceback.format_exc()) + Log.debug(trace) except: - print(traceback.format_exc()) + print(trace) with suppress(FileNotFoundError, PermissionError): os.remove(os.path.join(Log.file_path, "autosubmit.lock")) if isinstance(e, (AutosubmitCritical, AutosubmitError)): @@ -45,7 +46,7 @@ def exit_from_error(e: BaseException): Log.debug("Trace: {0}", str(e.trace)) Log.critical("{1} [eCode={0}]", e.code, e.message) else: - msg = "An Unknown error occurred: {0}.\n Please report it to Autosubmit Developers through Git" + msg = "A not admitted configuration or error in the code has happened: {0}.\n Please report it to Autosubmit Developers through Git" args = [str(e)] Log.critical(msg.format(*args)) Log.info("More info at https://autosubmit.readthedocs.io/en/master/troubleshooting/error-codes.html") -- GitLab From b38fae53349e9f436c32d6f2e7db2cb5a66f22e1 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 22 Apr 2024 15:11:59 +0200 Subject: [PATCH 6/9] Added wrapper support --- autosubmit/autosubmit.py | 6 ++--- autosubmit/job/job.py | 8 ++++-- autosubmit/platforms/paramiko_platform.py | 5 ++-- autosubmit/platforms/pjmplatform.py | 26 ++++++++++++------- .../platforms/wrappers/wrapper_factory.py | 9 ++++--- 5 files changed, 33 insertions(+), 21 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 06de873c6..3766fe48f 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -2254,8 +2254,8 @@ class Autosubmit: except (portalocker.AlreadyLocked, portalocker.LockException) as e: message = "We have detected that there is another Autosubmit instance using the experiment\n. Stop other Autosubmit instances that are using the experiment or delete autosubmit.lock file located on tmp folder" raise AutosubmitCritical(message, 7000) - except BaseException as e: # If this happens, there is a bug in the code or an exception not-well caught - raise AutosubmitCritical("There is a bug in the code, please contact via Gitlab", 7070, str(e)) + except BaseException as e: + raise # If this happens, there is a bug in the code or an exception not-well caught Log.result("No more jobs to run.") @@ -2306,7 +2306,7 @@ class Autosubmit: except AutosubmitCritical as e: raise except BaseException as e: - raise AutosubmitCritical("This seems like a bug in the code, please contact AS developers", 7070, str(e)) + raise finally: if profile: profiler.stop() diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index bb6e3244b..b6b83c834 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -2672,8 +2672,12 @@ class WrapperJob(Job): def cancel_failed_wrapper_job(self): Log.printlog("Cancelling job with id {0}".format(self.id), 6009) - self._platform.send_command( - self._platform.cancel_cmd + " " + str(self.id)) + try: + self._platform.send_command( + self._platform.cancel_cmd + " " + str(self.id)) + except: + Log.info(f'Job with {self.id} was finished before canceling it') + for job in self.job_list: #if job.status == Status.RUNNING: #job.inc_fail_count() diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 4d9e7169f..079c0a1ba 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -1064,6 +1064,7 @@ class ParamikoPlatform(Platform): self._ssh_output_err += errorLineCase.decode(lang) errorLine = errorLineCase.lower().decode(lang) + # to be simplified in the future in a function and using in. The errors should be inside the class of the platform not here if "not active" in errorLine: raise AutosubmitError( 'SSH Session not active, will restart the platforms', 6005) @@ -1071,8 +1072,8 @@ class ParamikoPlatform(Platform): raise AutosubmitCritical("scheduler is not installed.",7052,self._ssh_output_err) elif errorLine.find("syntax error") != -1: raise AutosubmitCritical("Syntax error",7052,self._ssh_output_err) - elif errorLine.find("refused") != -1 or errorLine.find("slurm_persist_conn_open_without_init") != -1 or errorLine.find("slurmdbd") != -1 or errorLine.find("submission failed") != -1 or errorLine.find("git clone") != -1 or errorLine.find("sbatch: error: ") != -1 or errorLine.find("not submitted") != -1 or errorLine.find("invalid") != -1: - if (self._submit_command_name == "sbatch" and (errorLine.find("policy") != -1 or errorLine.find("invalid") != -1) ) or (self._submit_command_name == "sbatch" and errorLine.find("argument") != -1) or (self._submit_command_name == "bsub" and errorLine.find("job not submitted") != -1) or self._submit_command_name == "ecaccess-job-submit" or self._submit_command_name == "qsub ": + elif errorLine.find("refused") != -1 or errorLine.find("slurm_persist_conn_open_without_init") != -1 or errorLine.find("slurmdbd") != -1 or errorLine.find("submission failed") != -1 or errorLine.find("git clone") != -1 or errorLine.find("sbatch: error: ") != -1 or errorLine.find("not submitted") != -1 or errorLine.find("invalid") != -1 or "[ERR.] PJM".lower() in errorLine: + if "[ERR.] PJM".lower() in errorLine or (self._submit_command_name == "sbatch" and (errorLine.find("policy") != -1 or errorLine.find("invalid") != -1) ) or (self._submit_command_name == "sbatch" and errorLine.find("argument") != -1) or (self._submit_command_name == "bsub" and errorLine.find("job not submitted") != -1) or self._submit_command_name == "ecaccess-job-submit" or self._submit_command_name == "qsub ": raise AutosubmitError(errorLine, 7014, "Bad Parameters.") raise AutosubmitError('Command {0} in {1} warning: {2}'.format(command, self.host,self._ssh_output_err, 6005)) diff --git a/autosubmit/platforms/pjmplatform.py b/autosubmit/platforms/pjmplatform.py index 99f248f47..3b530ee7b 100644 --- a/autosubmit/platforms/pjmplatform.py +++ b/autosubmit/platforms/pjmplatform.py @@ -28,6 +28,7 @@ from autosubmit.platforms.headers.pjm_header import PJMHeader from autosubmit.platforms.wrappers.wrapper_factory import PJMWrapperFactory from log.log import AutosubmitCritical, AutosubmitError, Log +import textwrap class PJMPlatform(ParamikoPlatform): """ Class to manage jobs to host using PJM scheduler @@ -283,7 +284,7 @@ class PJMPlatform(ParamikoPlatform): return self.remote_log_dir def parse_job_output(self, output): - return output.strip().split()[0].strip() + return output.strip().split()[1].strip().strip("\n") def parse_job_finish_data(self, output, packed): return 0, 0, 0, 0, 0, 0, dict(), False @@ -389,6 +390,10 @@ class PJMPlatform(ParamikoPlatform): jobs_id = jobs_id[:-1] # deletes comma return "pjstat -H -v --choose jid,st,ermsg --filter \"jid={0}\" > as_checkalljobs.txt ; pjstat -v --choose jid,st,ermsg --filter \"jid={0}\" >> as_checkalljobs.txt ; cat as_checkalljobs.txt ; rm as_checkalljobs.txt".format(jobs_id) + def get_checkjob_cmd(self, job_id): + return f"pjstat -H -v --choose st --filter \"jid={job_id}\" > as_checkjob.txt ; pjstat -v --choose st --filter \"jid={job_id}\" >> as_checkjob.txt ; cat as_checkjob.txt ; rm as_checkjob.txt" + + #return 'pjstat -v --choose jid,st,ermsg --filter \"jid={0}\"'.format(job_id) def get_queue_status_cmd(self, job_id): return self.get_checkAlljobs_cmd(job_id) @@ -398,6 +403,7 @@ class PJMPlatform(ParamikoPlatform): return 'pjstat -v --choose jid,st,ermsg --filter \"jnam={0}\"'.format(job_name) + def cancel_job(self, job_id): return '{0} {1}'.format(self.cancel_cmd,job_id) @@ -415,19 +421,19 @@ class PJMPlatform(ParamikoPlatform): return reason def wrapper_header(self, **kwargs): - wr_header = f""" + wr_header = textwrap.dedent(f""" ############################################################################### # {kwargs["name"].split("_")[0] + "_Wrapper"} ############################################################################### - """ + """) if kwargs["wrapper_data"].het.get("HETSIZE", 1) <= 1: - wr_header += f""" + wr_header += textwrap.dedent(f""" ############################################################################### # %TASKTYPE% %DEFAULT.EXPID% EXPERIMENT ############################################################################### # - # PJM -N {kwargs["name"]} - # PJM -L elapse={kwargs["wallclock"]}:00 + #PJM -N {kwargs["name"]} + #PJM -L elapse={kwargs["wallclock"]}:00 {kwargs["queue"]} {kwargs["partition"]} {kwargs["dependency"]} @@ -441,13 +447,13 @@ class PJMPlatform(ParamikoPlatform): #PJM -g {kwargs["project"]} #PJM -o {kwargs["name"]}.out #PJM -e {kwargs["name"]}.err - %CUSTOM_DIRECTIVES % + %CUSTOM_DIRECTIVES% # ############################################################################### - - + + # - """ + """).ljust(13) else: wr_header = self.calculate_wrapper_het_header(kwargs["wrapper_data"]) if kwargs["method"] == 'srun': diff --git a/autosubmit/platforms/wrappers/wrapper_factory.py b/autosubmit/platforms/wrappers/wrapper_factory.py index 7a27a719a..4b728dd6b 100644 --- a/autosubmit/platforms/wrappers/wrapper_factory.py +++ b/autosubmit/platforms/wrappers/wrapper_factory.py @@ -61,13 +61,14 @@ class WrapperFactory(object): wrapper_cmd, flags=re.IGNORECASE) for placeholder in placeholders_inside_wrapper: placeholder = placeholder[1:-1] + value = str(wrapper_data.jobs[0].parameters.get(placeholder.upper(), "")) - if not value: - wrapper_cmd = re.sub('%(? Date: Mon, 22 Apr 2024 15:13:34 +0200 Subject: [PATCH 7/9] Version changelog --- CHANGELOG | 6 ++++++ VERSION | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG b/CHANGELOG index a46faf386..95059d571 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,9 @@ +4.1.5 - PJS - Fugaku - Support +===================== +- Added Fugaku support. +- Enchanced the support for PJS. +- Added wrapper support for PJS scheduler + 4.1.4 - Docs and Log Rework ===================== - Log retrieval has been fully reworked, improving it is performance, FD, and memory usage. diff --git a/VERSION b/VERSION index 9d086c6df..b673f6ac1 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -4.1.4 \ No newline at end of file +4.1.5 \ No newline at end of file -- GitLab From 16a549fa72ba57e9e361891701a3966a41178798 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 22 Apr 2024 15:14:01 +0200 Subject: [PATCH 8/9] Version changelog --- CHANGELOG | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG b/CHANGELOG index 95059d571..48b7568c0 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -3,6 +3,7 @@ - Added Fugaku support. - Enchanced the support for PJS. - Added wrapper support for PJS scheduler +- Fixed issues with log traceback. 4.1.4 - Docs and Log Rework ===================== -- GitLab From c0cbe2eb5aedcb35d8c4010d61442e7fc1f284ae Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 24 Apr 2024 15:27:49 +0200 Subject: [PATCH 9/9] Added Shape, fixed custom_directives --- autosubmit/job/job.py | 25 ++++++++++++++++++++++ autosubmit/platforms/headers/pjm_header.py | 13 +++++++++++ autosubmit/platforms/paramiko_platform.py | 3 +++ autosubmit/platforms/paramiko_submitter.py | 1 + autosubmit/platforms/pjmplatform.py | 1 - 5 files changed, 42 insertions(+), 1 deletion(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index b6b83c834..f9a78fe13 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -240,6 +240,7 @@ class Job(object): self.max_checkpoint_step = 0 self.reservation = "" self.delete_when_edgeless = False + self.shape = "" # hetjobs self.het = None self.updated_log = False @@ -717,6 +718,26 @@ class Job(object): """ self._partition = value + @property + def shape(self): + """ + Returns the shape of the job. Chooses between serial and parallel platforms + + :return HPCPlatform object for the job to use + :rtype: HPCPlatform + """ + return self._shape + + @shape.setter + def shape(self, value): + """ + Sets the shape to be used by the job. + + :param value: shape to set + :type value: HPCPlatform + """ + self._shape = value + @property def children(self): """ @@ -1539,6 +1560,7 @@ class Job(object): self.total_jobs = job_data.get("TOTALJOBS",job_data.get("TOTAL_JOBS", job_platform.total_jobs)) self.max_waiting_jobs = job_data.get("MAXWAITINGJOBS",job_data.get("MAX_WAITING_JOBS", job_platform.max_waiting_jobs)) self.processors = job_data.get("PROCESSORS",platform_data.get("PROCESSORS","1")) + self.shape = job_data.get("SHAPE",platform_data.get("SHAPE","")) self.processors_per_node = job_data.get("PROCESSORS_PER_NODE",as_conf.platforms_data.get(job_platform.name,{}).get("PROCESSORS_PER_NODE","1")) self.nodes = job_data.get("NODES",platform_data.get("NODES","")) self.exclusive = job_data.get("EXCLUSIVE",platform_data.get("EXCLUSIVE",False)) @@ -1806,6 +1828,7 @@ class Job(object): self.delete_when_edgeless = as_conf.jobs_data[self.section].get("DELETE_WHEN_EDGELESS", True) self.check = as_conf.jobs_data[self.section].get("CHECK", False) self.check_warnings = as_conf.jobs_data[self.section].get("CHECK_WARNINGS", False) + self.shape = as_conf.jobs_data[self.section].get("SHAPE", "") if self.checkpoint: # To activate placeholder sustitution per in the template parameters["AS_CHECKPOINT"] = self.checkpoint parameters['JOBNAME'] = self.name @@ -1813,6 +1836,7 @@ class Job(object): parameters['SDATE'] = self.sdate parameters['MEMBER'] = self.member parameters['SPLIT'] = self.split + parameters['SHAPE'] = self.shape parameters['SPLITS'] = self.splits parameters['DELAY'] = self.delay parameters['FREQUENCY'] = self.frequency @@ -1849,6 +1873,7 @@ class Job(object): self.total_jobs = parameters["TOTALJOBS"] self.max_waiting_jobs = parameters["MAXWAITINGJOBS"] self.processors = parameters["PROCESSORS"] + self.shape = parameters["SHAPE"] self.processors_per_node = parameters["PROCESSORS_PER_NODE"] self.nodes = parameters["NODES"] self.exclusive = parameters["EXCLUSIVE"] diff --git a/autosubmit/platforms/headers/pjm_header.py b/autosubmit/platforms/headers/pjm_header.py index 0f3f58a82..db0ebed1f 100644 --- a/autosubmit/platforms/headers/pjm_header.py +++ b/autosubmit/platforms/headers/pjm_header.py @@ -123,6 +123,16 @@ class PJMHeader(object): if int(job.parameters['TASKS']) > 1: return "max-proc-per-node={0}".format(job.parameters['TASKS']) return "" + def get_shape_directive(self, job): + """ + Returns shape directive for the specified job + :param job: + :return: + """ + if job.parameters['SHAPE'] != '': + return "PJM --mpi 'shape={0}'".format(job.parameters['SHAPE']) + return "" + def get_tasks_per_node(self, job): """ @@ -166,6 +176,8 @@ class PJMHeader(object): #%MEMORY_DIRECTIVE% %CUSTOM_DIRECTIVES% %THREADS_PER_TASK_DIRECTIVE% +#%SHAPE_DIRECTIVE% +#%NODES_DIRECTIVE% #PJM -o %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ_DIR%/%CURRENT_USER%/%DEFAULT.EXPID%/LOG_%DEFAULT.EXPID%/%OUT_LOG_DIRECTIVE% #PJM -e %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ_DIR%/%CURRENT_USER%/%DEFAULT.EXPID%/LOG_%DEFAULT.EXPID%/%ERR_LOG_DIRECTIVE% @@ -189,6 +201,7 @@ class PJMHeader(object): #%ACCOUNT_DIRECTIVE% #%MEMORY_DIRECTIVE% #%MEMORY_PER_TASK_DIRECTIVE% +#%SHAPE_DIRECTIVE% #PJM -o %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ_DIR%/%CURRENT_USER%/%DEFAULT.EXPID%/LOG_%DEFAULT.EXPID%/%OUT_LOG_DIRECTIVE% #PJM -e %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ_DIR%/%CURRENT_USER%/%DEFAULT.EXPID%/LOG_%DEFAULT.EXPID%/%ERR_LOG_DIRECTIVE% %CUSTOM_DIRECTIVES% diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 079c0a1ba..131bb7534 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -1289,6 +1289,9 @@ class ParamikoPlatform(Platform): if hasattr(self.header, 'get_account_directive'): header = header.replace( '%ACCOUNT_DIRECTIVE%', self.header.get_account_directive(job)) + if hasattr(self.header, 'get_shape_directive'): + header = header.replace( + '%SHAPE_DIRECTIVE%', self.header.get_shape_directive(job)) if hasattr(self.header, 'get_nodes_directive'): header = header.replace( '%NODES_DIRECTIVE%', self.header.get_nodes_directive(job)) diff --git a/autosubmit/platforms/paramiko_submitter.py b/autosubmit/platforms/paramiko_submitter.py index c25777b94..4ee256be2 100644 --- a/autosubmit/platforms/paramiko_submitter.py +++ b/autosubmit/platforms/paramiko_submitter.py @@ -192,6 +192,7 @@ class ParamikoSubmitter(Submitter): remote_platform.exclusivity = platform_data[section].get('EXCLUSIVITY', "") remote_platform.user = platform_data[section].get('USER', "") remote_platform.scratch = platform_data[section].get('SCRATCH_DIR', "") + remote_platform.shape = platform_data[section].get('SHAPE', "") remote_platform.project_dir = platform_data[section].get('SCRATCH_PROJECT_DIR', remote_platform.project) remote_platform.temp_dir = platform_data[section].get('TEMP_DIR', "") remote_platform._default_queue = platform_data[section].get('QUEUE', "") diff --git a/autosubmit/platforms/pjmplatform.py b/autosubmit/platforms/pjmplatform.py index 3b530ee7b..f0a23255c 100644 --- a/autosubmit/platforms/pjmplatform.py +++ b/autosubmit/platforms/pjmplatform.py @@ -447,7 +447,6 @@ class PJMPlatform(ParamikoPlatform): #PJM -g {kwargs["project"]} #PJM -o {kwargs["name"]}.out #PJM -e {kwargs["name"]}.err - %CUSTOM_DIRECTIVES% # ############################################################################### -- GitLab