diff --git a/CHANGELOG b/CHANGELOG index a46faf38632ad4ba89b3fcfc1078666b67f34249..48b7568c0fc344acb26f3435541464812f1674a1 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,10 @@ +4.1.5 - PJS - Fugaku - Support +===================== +- Added Fugaku support. +- Enhanced the support for PJS. +- Added wrapper support for PJS scheduler +- Fixed issues with log traceback. + 4.1.4 - Docs and Log Rework ===================== - Log retrieval has been fully reworked, improving it is performance, FD, and memory usage. diff --git a/VERSION b/VERSION index 9d086c6dff671494d94c58ed5ddbb74280033537..b673f6ac16781c1fa404c9710c8b42c43bcc01a2 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -4.1.4 \ No newline at end of file +4.1.5 \ No newline at end of file diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index b1a3968b9dd934a56ab35a3f51033c232a9fab64..3766fe48f473c3c3537473605e0ab1ab090f9351 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1562,9 +1562,7 @@ class Autosubmit: except AutosubmitError as e: raise except BaseException as e: - raise AutosubmitCritical( - "There are issues that occurred during the templates generation, please check that job parameters are well set and the template path exists.", - 7014, str(e)) + raise return True @staticmethod @@ -2256,8 +2254,8 @@ class Autosubmit: except (portalocker.AlreadyLocked, portalocker.LockException) as e: message = "We have detected that there is another Autosubmit instance using the experiment\n. 
Stop other Autosubmit instances that are using the experiment or delete autosubmit.lock file located on tmp folder" raise AutosubmitCritical(message, 7000) - except BaseException as e: # If this happens, there is a bug in the code or an exception not-well caught - raise AutosubmitCritical("There is a bug in the code, please contact via Gitlab", 7070, str(e)) + except BaseException as e: + raise # If this happens, there is a bug in the code or an exception not-well caught Log.result("No more jobs to run.") @@ -2308,7 +2306,7 @@ class Autosubmit: except AutosubmitCritical as e: raise except BaseException as e: - raise AutosubmitCritical("This seems like a bug in the code, please contact AS developers", 7070, str(e)) + raise finally: if profile: profiler.stop() @@ -2451,6 +2449,7 @@ class Autosubmit: except AutosubmitCritical as e: raise except BaseException as e: + raise raise AutosubmitCritical("This seems like a bug in the code, please contact AS developers", 7070, str(e)) @staticmethod diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index bb6e3244b7e341d165df6c8ea002a778f4faf10e..f9a78fe13d9371116ece28706945986a9d13680f 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -240,6 +240,7 @@ class Job(object): self.max_checkpoint_step = 0 self.reservation = "" self.delete_when_edgeless = False + self.shape = "" # hetjobs self.het = None self.updated_log = False @@ -717,6 +718,26 @@ class Job(object): """ self._partition = value + @property + def shape(self): + """ + Returns the shape of the job. Chooses between serial and parallel platforms + + :return HPCPlatform object for the job to use + :rtype: HPCPlatform + """ + return self._shape + + @shape.setter + def shape(self, value): + """ + Sets the shape to be used by the job. 
+ + :param value: shape to set + :type value: HPCPlatform + """ + self._shape = value + @property def children(self): """ @@ -1539,6 +1560,7 @@ class Job(object): self.total_jobs = job_data.get("TOTALJOBS",job_data.get("TOTAL_JOBS", job_platform.total_jobs)) self.max_waiting_jobs = job_data.get("MAXWAITINGJOBS",job_data.get("MAX_WAITING_JOBS", job_platform.max_waiting_jobs)) self.processors = job_data.get("PROCESSORS",platform_data.get("PROCESSORS","1")) + self.shape = job_data.get("SHAPE",platform_data.get("SHAPE","")) self.processors_per_node = job_data.get("PROCESSORS_PER_NODE",as_conf.platforms_data.get(job_platform.name,{}).get("PROCESSORS_PER_NODE","1")) self.nodes = job_data.get("NODES",platform_data.get("NODES","")) self.exclusive = job_data.get("EXCLUSIVE",platform_data.get("EXCLUSIVE",False)) @@ -1806,6 +1828,7 @@ class Job(object): self.delete_when_edgeless = as_conf.jobs_data[self.section].get("DELETE_WHEN_EDGELESS", True) self.check = as_conf.jobs_data[self.section].get("CHECK", False) self.check_warnings = as_conf.jobs_data[self.section].get("CHECK_WARNINGS", False) + self.shape = as_conf.jobs_data[self.section].get("SHAPE", "") if self.checkpoint: # To activate placeholder sustitution per in the template parameters["AS_CHECKPOINT"] = self.checkpoint parameters['JOBNAME'] = self.name @@ -1813,6 +1836,7 @@ class Job(object): parameters['SDATE'] = self.sdate parameters['MEMBER'] = self.member parameters['SPLIT'] = self.split + parameters['SHAPE'] = self.shape parameters['SPLITS'] = self.splits parameters['DELAY'] = self.delay parameters['FREQUENCY'] = self.frequency @@ -1849,6 +1873,7 @@ class Job(object): self.total_jobs = parameters["TOTALJOBS"] self.max_waiting_jobs = parameters["MAXWAITINGJOBS"] self.processors = parameters["PROCESSORS"] + self.shape = parameters["SHAPE"] self.processors_per_node = parameters["PROCESSORS_PER_NODE"] self.nodes = parameters["NODES"] self.exclusive = parameters["EXCLUSIVE"] @@ -2672,8 +2697,12 @@ class WrapperJob(Job): 
def cancel_failed_wrapper_job(self): Log.printlog("Cancelling job with id {0}".format(self.id), 6009) - self._platform.send_command( - self._platform.cancel_cmd + " " + str(self.id)) + try: + self._platform.send_command( + self._platform.cancel_cmd + " " + str(self.id)) + except: + Log.info(f'Job with {self.id} was finished before canceling it') + for job in self.job_list: #if job.status == Status.RUNNING: #job.inc_fail_count() diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index d3eda6a82d9b0ff170e46ef16ff20f78a9a5169e..1271ec2bb9ec1c062f44a52d2c84dd622c6996a1 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -557,12 +557,12 @@ class JobPackager(object): return jobs_by_section - def _build_horizontal_packages(self, section_list, wrapper_limits, section,wrapper_info={}): + def _build_horizontal_packages(self, section_list, wrapper_limits, section,wrapper_info=[]): packages = [] horizontal_packager = JobPackagerHorizontal(section_list, self._platform.max_processors, wrapper_limits, wrapper_limits["max"], self._platform.processors_per_node, self.wrapper_method[self.current_wrapper_section]) - package_jobs = horizontal_packager.build_horizontal_package() + package_jobs = horizontal_packager.build_horizontal_package(wrapper_info=wrapper_info) jobs_resources = dict() @@ -860,7 +860,7 @@ class JobPackagerHorizontal(object): self._sectionList = list() self._package_sections = dict() self.wrapper_info = [] - def build_horizontal_package(self, horizontal_vertical=False,wrapper_info={}): + def build_horizontal_package(self, horizontal_vertical=False,wrapper_info=[]): self.wrapper_info = wrapper_info current_package = [] current_package_by_section = {} diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 581738da422fd0387fabe1a916cd04827c88da35..0b677c9d7498332ceeee23e4ec9fe28155e1146b 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -183,7 +183,7 
@@ class JobPackageBase(object): except AutosubmitCritical: raise except BaseException as e: - raise AutosubmitCritical("Error while building the scripts: {0}".format(e), 7013) + raise try: if not only_generate: Log.debug("Sending Files") diff --git a/autosubmit/platforms/headers/pjm_header.py b/autosubmit/platforms/headers/pjm_header.py index e77dfdb0309e21c30b74ef665ba34fbad1ec8a62..db0ebed1ff624118edf1351bb2e5faad23b0d2bf 100644 --- a/autosubmit/platforms/headers/pjm_header.py +++ b/autosubmit/platforms/headers/pjm_header.py @@ -111,11 +111,32 @@ class PJMHeader(object): return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) return "" + def get_tasks_directive(self,job, het=-1): + """ + Returns tasks per node directive for the specified job + + :param job: job to create tasks per node directive for + :type job: Job + :return: tasks per node directive + :rtype: str + """ + if int(job.parameters['TASKS']) > 1: + return "max-proc-per-node={0}".format(job.parameters['TASKS']) + return "" + def get_shape_directive(self, job): + """ + Returns shape directive for the specified job + :param job: + :return: + """ + if job.parameters['SHAPE'] != '': + return "PJM --mpi 'shape={0}'".format(job.parameters['SHAPE']) + return "" def get_tasks_per_node(self, job): """ - Returns memory per task directive for the specified job + Returns tasks per node directive for the specified job :param job: job to create tasks per node directive for :type job: Job @@ -125,6 +146,23 @@ class PJMHeader(object): if int(job.parameters['TASKS']) > 1: return "max-proc-per-node={0}".format(job.parameters['TASKS']) return "" + def get_threads_per_task(self, job, het=-1): + """ + Returns threads per task directive for the specified job + + :param job: job to create threads per task directive for + :type job: Job + :return: threads per task directive + :rtype: str + """ + # There is no threads per task, so directive is empty + if het > -1 and len(job.het['NUMTHREADS']) > 0: + if 
job.het['NUMTHREADS'][het] != '': + return f"export OMP_NUM_THREADS={job.het['NUMTHREADS'][het]}" + else: + if job.parameters['NUMTHREADS'] != '': + return "export OMP_NUM_THREADS={0}".format(job.parameters['NUMTHREADS']) + return "" SERIAL = textwrap.dedent("""\ ############################################################################### @@ -137,6 +175,10 @@ class PJMHeader(object): #%ACCOUNT_DIRECTIVE% #%MEMORY_DIRECTIVE% %CUSTOM_DIRECTIVES% +%THREADS_PER_TASK_DIRECTIVE% +#%SHAPE_DIRECTIVE% +#%NODES_DIRECTIVE% + #PJM -o %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ_DIR%/%CURRENT_USER%/%DEFAULT.EXPID%/LOG_%DEFAULT.EXPID%/%OUT_LOG_DIRECTIVE% #PJM -e %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ_DIR%/%CURRENT_USER%/%DEFAULT.EXPID%/LOG_%DEFAULT.EXPID%/%ERR_LOG_DIRECTIVE% #%X11% @@ -153,11 +195,13 @@ class PJMHeader(object): #%NODES_DIRECTIVE% #PJM --mpi "proc=%NUMPROC%" #PJM --mpi "%TASKS_PER_NODE_DIRECTIVE%" +%THREADS_PER_TASK_DIRECTIVE% #PJM -L elapse=%WALLCLOCK%:00 #%QUEUE_DIRECTIVE% #%ACCOUNT_DIRECTIVE% #%MEMORY_DIRECTIVE% #%MEMORY_PER_TASK_DIRECTIVE% +#%SHAPE_DIRECTIVE% #PJM -o %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ_DIR%/%CURRENT_USER%/%DEFAULT.EXPID%/LOG_%DEFAULT.EXPID%/%OUT_LOG_DIRECTIVE% #PJM -e %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ_DIR%/%CURRENT_USER%/%DEFAULT.EXPID%/LOG_%DEFAULT.EXPID%/%ERR_LOG_DIRECTIVE% %CUSTOM_DIRECTIVES% diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 4d9e7169ff18abb3efe0a8925ab7d708ccea2f61..131bb7534eb1aee51b493b7b928bb94e62ecf755 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -1064,6 +1064,7 @@ class ParamikoPlatform(Platform): self._ssh_output_err += errorLineCase.decode(lang) errorLine = errorLineCase.lower().decode(lang) + # to be simplified in the future in a function and using in. 
The errors should be inside the class of the platform not here if "not active" in errorLine: raise AutosubmitError( 'SSH Session not active, will restart the platforms', 6005) @@ -1071,8 +1072,8 @@ class ParamikoPlatform(Platform): raise AutosubmitCritical("scheduler is not installed.",7052,self._ssh_output_err) elif errorLine.find("syntax error") != -1: raise AutosubmitCritical("Syntax error",7052,self._ssh_output_err) - elif errorLine.find("refused") != -1 or errorLine.find("slurm_persist_conn_open_without_init") != -1 or errorLine.find("slurmdbd") != -1 or errorLine.find("submission failed") != -1 or errorLine.find("git clone") != -1 or errorLine.find("sbatch: error: ") != -1 or errorLine.find("not submitted") != -1 or errorLine.find("invalid") != -1: - if (self._submit_command_name == "sbatch" and (errorLine.find("policy") != -1 or errorLine.find("invalid") != -1) ) or (self._submit_command_name == "sbatch" and errorLine.find("argument") != -1) or (self._submit_command_name == "bsub" and errorLine.find("job not submitted") != -1) or self._submit_command_name == "ecaccess-job-submit" or self._submit_command_name == "qsub ": + elif errorLine.find("refused") != -1 or errorLine.find("slurm_persist_conn_open_without_init") != -1 or errorLine.find("slurmdbd") != -1 or errorLine.find("submission failed") != -1 or errorLine.find("git clone") != -1 or errorLine.find("sbatch: error: ") != -1 or errorLine.find("not submitted") != -1 or errorLine.find("invalid") != -1 or "[ERR.] PJM".lower() in errorLine: + if "[ERR.] 
PJM".lower() in errorLine or (self._submit_command_name == "sbatch" and (errorLine.find("policy") != -1 or errorLine.find("invalid") != -1) ) or (self._submit_command_name == "sbatch" and errorLine.find("argument") != -1) or (self._submit_command_name == "bsub" and errorLine.find("job not submitted") != -1) or self._submit_command_name == "ecaccess-job-submit" or self._submit_command_name == "qsub ": raise AutosubmitError(errorLine, 7014, "Bad Parameters.") raise AutosubmitError('Command {0} in {1} warning: {2}'.format(command, self.host,self._ssh_output_err, 6005)) @@ -1288,6 +1289,9 @@ class ParamikoPlatform(Platform): if hasattr(self.header, 'get_account_directive'): header = header.replace( '%ACCOUNT_DIRECTIVE%', self.header.get_account_directive(job)) + if hasattr(self.header, 'get_shape_directive'): + header = header.replace( + '%SHAPE_DIRECTIVE%', self.header.get_shape_directive(job)) if hasattr(self.header, 'get_nodes_directive'): header = header.replace( '%NODES_DIRECTIVE%', self.header.get_nodes_directive(job)) diff --git a/autosubmit/platforms/paramiko_submitter.py b/autosubmit/platforms/paramiko_submitter.py index c25777b94f496369e511837e9666079d5fe6e7e1..4ee256be2dfd8a3fd61ed048aa4ede83c60c68d0 100644 --- a/autosubmit/platforms/paramiko_submitter.py +++ b/autosubmit/platforms/paramiko_submitter.py @@ -192,6 +192,7 @@ class ParamikoSubmitter(Submitter): remote_platform.exclusivity = platform_data[section].get('EXCLUSIVITY', "") remote_platform.user = platform_data[section].get('USER', "") remote_platform.scratch = platform_data[section].get('SCRATCH_DIR', "") + remote_platform.shape = platform_data[section].get('SHAPE', "") remote_platform.project_dir = platform_data[section].get('SCRATCH_PROJECT_DIR', remote_platform.project) remote_platform.temp_dir = platform_data[section].get('TEMP_DIR', "") remote_platform._default_queue = platform_data[section].get('QUEUE', "") diff --git a/autosubmit/platforms/pjmplatform.py b/autosubmit/platforms/pjmplatform.py 
index 9e182c5c0fb229baa92731a8b4e68b31dde81c3f..f0a23255c23eba42d6e193a0147f300a5024abb8 100644 --- a/autosubmit/platforms/pjmplatform.py +++ b/autosubmit/platforms/pjmplatform.py @@ -28,6 +28,7 @@ from autosubmit.platforms.headers.pjm_header import PJMHeader from autosubmit.platforms.wrappers.wrapper_factory import PJMWrapperFactory from log.log import AutosubmitCritical, AutosubmitError, Log +import textwrap class PJMPlatform(ParamikoPlatform): """ Class to manage jobs to host using PJM scheduler @@ -283,7 +284,7 @@ class PJMPlatform(ParamikoPlatform): return self.remote_log_dir def parse_job_output(self, output): - return output.strip().split()[0].strip() + return output.strip().split()[1].strip().strip("\n") def parse_job_finish_data(self, output, packed): return 0, 0, 0, 0, 0, 0, dict(), False @@ -385,15 +386,24 @@ class PJMPlatform(ParamikoPlatform): def get_checkAlljobs_cmd(self, jobs_id): # jobs_id = "jobid1+jobid2+jobid3" # -H == sacct + if jobs_id[-1] == ",": + jobs_id = jobs_id[:-1] # deletes comma return "pjstat -H -v --choose jid,st,ermsg --filter \"jid={0}\" > as_checkalljobs.txt ; pjstat -v --choose jid,st,ermsg --filter \"jid={0}\" >> as_checkalljobs.txt ; cat as_checkalljobs.txt ; rm as_checkalljobs.txt".format(jobs_id) + def get_checkjob_cmd(self, job_id): + return f"pjstat -H -v --choose st --filter \"jid={job_id}\" > as_checkjob.txt ; pjstat -v --choose st --filter \"jid={job_id}\" >> as_checkjob.txt ; cat as_checkjob.txt ; rm as_checkjob.txt" + + #return 'pjstat -v --choose jid,st,ermsg --filter \"jid={0}\"'.format(job_id) def get_queue_status_cmd(self, job_id): return self.get_checkAlljobs_cmd(job_id) def get_jobid_by_jobname_cmd(self, job_name): + if job_name[-1] == ",": + job_name = job_name[:-1] return 'pjstat -v --choose jid,st,ermsg --filter \"jnam={0}\"'.format(job_name) + def cancel_job(self, job_id): return '{0} {1}'.format(self.cancel_cmd,job_id) @@ -410,54 +420,52 @@ class PJMPlatform(ParamikoPlatform): return reason[0] return reason 
- def wrapper_header(self,**kwargs): - if method == 'srun': - language = "#!/bin/bash" - return \ - language + """ -############################################################################### -# {0} -############################################################################### -# -#PJM -N {0} -{1} -{8} -#PJM -g {2} -#PJM -o {0}.out -#PJM -e {0}.err -#PJM -elapse {3}:00 -#PJM --mpi "proc=%NUMPROC%" -#PJM --mpi "max-proc-per-node={7}" -{5} -{6} - -# -############################################################################### - """.format(filename, queue, project, wallclock, num_procs, dependency, - '\n'.ljust(13).join(str(s) for s in directives), threads,partition) + def wrapper_header(self, **kwargs): + wr_header = textwrap.dedent(f""" + ############################################################################### + # {kwargs["name"].split("_")[0] + "_Wrapper"} + ############################################################################### + """) + if kwargs["wrapper_data"].het.get("HETSIZE", 1) <= 1: + wr_header += textwrap.dedent(f""" + ############################################################################### + # %TASKTYPE% %DEFAULT.EXPID% EXPERIMENT + ############################################################################### + # + #PJM -N {kwargs["name"]} + #PJM -L elapse={kwargs["wallclock"]}:00 + {kwargs["queue"]} + {kwargs["partition"]} + {kwargs["dependency"]} + {kwargs["threads"]} + {kwargs["nodes"]} + {kwargs["num_processors"]} + {kwargs["tasks"]} + {kwargs["exclusive"]} + {kwargs["custom_directives"]} + + #PJM -g {kwargs["project"]} + #PJM -o {kwargs["name"]}.out + #PJM -e {kwargs["name"]}.err + # + ############################################################################### + + + # + """).ljust(13) else: - language = "#!/usr/bin/env python3" - return \ - language + """ -############################################################################### -# {0} 
-############################################################################### -# -#PJM -N {0} -{1} -{8} -#PJM -g {2} -#PJM -o {0}.out -#PJM -e {0}.err -#PJM -elapse {3}:00 -#PJM --mpi "proc=%NUMPROC%" -#PJM --mpi "max-proc-per-node={7}" -{5} -{6} -# -############################################################################### - """.format(filename, queue, project, wallclock, num_procs, dependency, - '\n'.ljust(13).join(str(s) for s in directives), threads,partition) + wr_header = self.calculate_wrapper_het_header(kwargs["wrapper_data"]) + if kwargs["method"] == 'srun': + language = kwargs["executable"] + if language is None or len(language) == 0: + language = "#!/bin/bash" + return language + wr_header + else: + language = kwargs["executable"] + if language is None or len(language) == 0 or "bash" in language: + language = "#!/usr/bin/env python3" + return language + wr_header + @staticmethod def allocated_nodes(): diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index ac3d09eabc093bae281a14012840b715ac6bfcb6..1228299f106c76c3a0b41b9446ad9a60781752da 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -327,8 +327,8 @@ class Platform(object): raise except Exception as e: self.connected = False - message = f'Error in platform {self.name} for section {package.jobs[0].section}: {str(e)}' - raise AutosubmitError(message, 6015) + raise + except AutosubmitCritical as e: raise AutosubmitCritical(e.message, e.code, e.trace) except AutosubmitError as e: diff --git a/autosubmit/platforms/wrappers/wrapper_factory.py b/autosubmit/platforms/wrappers/wrapper_factory.py index 071ec16e8a146f460454a5b0814feffff80579bd..4b728dd6bf679ff89bbd8efb6d4d8bb777e51586 100644 --- a/autosubmit/platforms/wrappers/wrapper_factory.py +++ b/autosubmit/platforms/wrappers/wrapper_factory.py @@ -61,13 +61,14 @@ class WrapperFactory(object): wrapper_cmd, flags=re.IGNORECASE) for placeholder in placeholders_inside_wrapper: 
placeholder = placeholder[1:-1] + value = str(wrapper_data.jobs[0].parameters.get(placeholder.upper(), "")) - if not value: - wrapper_cmd = re.sub('%(?