diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 70356b200bfc2d840b14978c58ac8349a1e14d2b..10877bc4314e111835b51ba287f3f531ba21c9c2 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -204,6 +204,16 @@ class AutosubmitConfig(object): """ return str(self._jobs_parser.get_option(section, 'MEMORY_PER_TASK', '')) + def get_custom_directives(self, section): + """ + Gets custom directives needed for the given job type + :param section: job type + :type section: str + :return: custom directives needed + :rtype: str + """ + return str(self._jobs_parser.get_option(section, 'CUSTOM_DIRECTIVES', '')) + def check_conf_files(self): """ Checks configuration files (autosubmit, experiment jobs and platforms), looking for invalid values, missing diff --git a/autosubmit/config/files/jobs.conf b/autosubmit/config/files/jobs.conf index 8caccb5cbfc5044c64e351377ba51f4dc5517869..eaf192ce6b081598bb98537b9079061013f74d8f 100644 --- a/autosubmit/config/files/jobs.conf +++ b/autosubmit/config/files/jobs.conf @@ -49,7 +49,9 @@ # TYPE = bash ## Synchronize a chunk job with its dependency chunks at a 'date' or 'member' level # SYNCHRONIZE = date | member - +## Optional. Custom directives for the resource manager of the platform used for that job. +## Put as many as you wish in json formatted array. +# CUSTOM_DIRECTIVE = ["#PBS -v myvar=value, "#PBS -v othervar=value"] [LOCAL_SETUP] FILE = LOCAL_SETUP.sh @@ -91,4 +93,4 @@ WALLCLOCK = 00:05 FILE = TRANSFER.sh PLATFORM = LOCAL DEPENDENCIES = CLEAN -RUNNING = member \ No newline at end of file +RUNNING = member diff --git a/autosubmit/config/files/platforms.conf b/autosubmit/config/files/platforms.conf index 0fa5eee7f3bd6719eade3733eb53b9c0d756786e..e5e6ff4c3f4a8152a1672eebcf920a4d86664e45 100644 --- a/autosubmit/config/files/platforms.conf +++ b/autosubmit/config/files/platforms.conf @@ -45,4 +45,7 @@ # MAX_WALLCLOCK = 72:00 ## Max processors number per job submitted to the HPC. If not specified, defaults to empty. ## Optional. Required for wrappers. -# MAX_PROCESSORS = 1 \ No newline at end of file +# MAX_PROCESSORS = 1 +## Optional. Custom directives for the resource manager of the platform used. +## Put as many as you wish in a json formatted array. +# CUSTOM_DIRECTIVE = ["#PBS -v myvar=value, "#PBS -v othervar=value"] diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index d0d8a18420882ef94ca6f8c17dd74130be7d2682..df36b4c14a80dc9383322fdda88055b5639f13da 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -23,6 +23,7 @@ Main module for autosubmit. Only contains an interface class to all functionalit import os import re import time +import json from autosubmit.job.job_common import Status, Type from autosubmit.job.job_common import StatisticsSnippetBash, StatisticsSnippetPython @@ -73,6 +74,7 @@ class Job(object): self.date_format = '' self.type = Type.BASH self.scratch_free_space = None + self.custom_directives = [] self.id = job_id self.file = None @@ -610,6 +612,15 @@ class Job(object): self.scratch_free_space = as_conf.get_scratch_free_space(self.section) if self.scratch_free_space == 0: self.scratch_free_space = job_platform.scratch_free_space + self.custom_directives = as_conf.get_custom_directives(self.section) + if self.custom_directives != '': + self.custom_directives = json.loads(as_conf.get_custom_directives(self.section)) + if job_platform.custom_directives: + self.custom_directives = self.custom_directives + json.loads(job_platform.custom_directives) + elif job_platform.custom_directives: + self.custom_directives = json.loads(job_platform.custom_directives) + elif self.custom_directives == '': + self.custom_directives = [] parameters['NUMPROC'] = self.processors parameters['MEMORY'] = self.memory @@ -619,6 +630,7 @@ class Job(object): parameters['WALLCLOCK'] = self.wallclock parameters['TASKTYPE'] = self.section parameters['SCRATCH_FREE_SPACE'] = self.scratch_free_space + parameters['CUSTOM_DIRECTIVES'] = self.custom_directives parameters['CURRENT_ARCH'] = job_platform.name parameters['CURRENT_HOST'] = job_platform.host diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 3bcd70ec57bccafd09944c189ecc5e14f570b85d..bb19285f56c01a671b9f32da19641d46cd4e310d 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -45,6 +45,7 @@ class JobPackageBase(object): try: self._tmp_path = jobs[0]._tmp_path self._platform = jobs[0].platform + self._custom_directives = set() for job in jobs: if job.platform != self._platform or job.platform is None: raise Exception('Only one valid platform per package') @@ -80,6 +81,8 @@ class JobPackageBase(object): if not job.check_script(configuration, parameters): raise WrongTemplateException(job.name) job.update_parameters(configuration, parameters) + # looking for directives on jobs + self._custom_directives = self._custom_directives | set(job.custom_directives) self._create_scripts(configuration) self._send_files() self._do_submission() @@ -186,7 +189,8 @@ class JobPackageArray(JobPackageBase): def _create_common_script(self, filename): script_content = self.platform.header.array_header(filename, self._array_size_id, self._wallclock, - self._num_processors) + self._num_processors, + directives=self.platform.custom_directives) filename += '.cmd' open(os.path.join(self._tmp_path, filename), 'w').write(script_content) os.chmod(os.path.join(self._tmp_path, filename), 0o775) @@ -382,7 +386,8 @@ class JobPackageVertical(JobPackageThread): return self.platform.wrapper.vertical(self._name, self._queue, self._project, self._wallclock, self._num_processors, self._jobs_scripts, self._job_dependency, expid=self._expid, - rootdir=self.platform.root_dir) + rootdir=self.platform.root_dir, + directives=self._custom_directives) class JobPackageHorizontal(JobPackageThread): @@ -404,4 +409,5 @@ class JobPackageHorizontal(JobPackageThread): return self.platform.wrapper.horizontal(self._name, self._queue, self._project, self._wallclock, self._num_processors, len(self.jobs), self._jobs_scripts, self._job_dependency, expid=self._expid, - rootdir=self.platform.root_dir) + rootdir=self.platform.root_dir, + directives=self._custom_directives) diff --git a/autosubmit/platforms/headers/ec_cca_header.py b/autosubmit/platforms/headers/ec_cca_header.py index 9037c9c59d2391e9161343aa546d6413795b6245..ea70b2ac0ac21b0c4d8e3a8506bc77946ff55801 100644 --- a/autosubmit/platforms/headers/ec_cca_header.py +++ b/autosubmit/platforms/headers/ec_cca_header.py @@ -80,6 +80,21 @@ class EcCcaHeader(object): return "#PBS -l EC_hyperthreads=2" return "#PBS -l EC_hyperthreads=1" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT @@ -91,6 +106,7 @@ class EcCcaHeader(object): #PBS -q ns #PBS -l walltime=%WALLCLOCK%:00 #PBS -l EC_billing_account=%CURRENT_BUDG% + %CUSTOM_DIRECTIVES% # ############################################################################### @@ -112,6 +128,7 @@ class EcCcaHeader(object): %HYPERTHREADING_DIRECTIVE% #PBS -l walltime=%WALLCLOCK%:00 #PBS -l EC_billing_account=%CURRENT_BUDG% + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/headers/ec_header.py b/autosubmit/platforms/headers/ec_header.py index ff1eaadc060e0866f44e8ccf9960d38284e70c65..3482899fed8266d73fff70737ca336fcf6408464 100644 --- a/autosubmit/platforms/headers/ec_header.py +++ b/autosubmit/platforms/headers/ec_header.py @@ -36,6 +36,21 @@ class EcHeader(object): # There is no queue, so directive is empty return "" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + # noinspection PyPep8 SERIAL = textwrap.dedent("""\ ############################################################################### @@ -52,6 +67,7 @@ class EcHeader(object): #@ resources = ConsumableCpus(1) ConsumableMemory(1200mb) #@ wall_clock_limit = %WALLCLOCK%:00 #@ platforms + %CUSTOM_DIRECTIVES% # ############################################################################### """) @@ -74,6 +90,7 @@ class EcHeader(object): #@ total_tasks = %NUMPROC% #@ wall_clock_limit = %WALLCLOCK%:00 #@ platforms + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/headers/lsf_header.py b/autosubmit/platforms/headers/lsf_header.py index 331ffe154c4ba5f21ef48547833787da1155f5ff..6634b2a0d04e7ca7c329dc3f9134e851571e1281 100644 --- a/autosubmit/platforms/headers/lsf_header.py +++ b/autosubmit/platforms/headers/lsf_header.py @@ -59,8 +59,23 @@ class LsfHeader(object): else: return "" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + @classmethod - def array_header(cls, filename, array_id, wallclock, num_processors): + def array_header(cls, filename, array_id, wallclock, num_processors, **kwargs): return textwrap.dedent("""\ ############################################################################### # {0} @@ -72,16 +87,17 @@ class LsfHeader(object): #BSUB -eo {0}.%I.err #BSUB -W {2} #BSUB -n {3} + {4} # ############################################################################### SCRIPT=$(cat {0}.$LSB_JOBINDEX | awk 'NR==1') chmod +x $SCRIPT ./$SCRIPT - """.format(filename, array_id, wallclock, num_processors)) + """.format(filename, array_id, wallclock, num_processors, '\n'.join(str(s) for s in kwargs['directives']))) @classmethod - def thread_header(cls, filename, wallclock, num_processors, job_scripts, dependency_directive): + def thread_header(cls, filename, wallclock, num_processors, job_scripts, dependency_directive, **kwargs): return textwrap.dedent("""\ #!/usr/bin/env python ############################################################################### @@ -94,6 +110,7 @@ class LsfHeader(object): #BSUB -W {1} #BSUB -n {2} {4} + {5} # ############################################################################### @@ -127,7 +144,8 @@ class LsfHeader(object): else: print "The job ", current.template," has FAILED" os._exit(1) - """.format(filename, wallclock, num_processors, str(job_scripts), dependency_directive)) + """.format(filename, wallclock, num_processors, str(job_scripts), dependency_directive, + '\n'.join(str(s) for s in kwargs['directives']))) SERIAL = textwrap.dedent("""\ ############################################################################### @@ -141,6 +159,7 @@ class LsfHeader(object): #BSUB -W %WALLCLOCK% #BSUB -n %NUMPROC% %EXCLUSIVITY_DIRECTIVE% + %CUSTOM_DIRECTIVES% # ############################################################################### """) @@ -158,6 +177,7 @@ class LsfHeader(object): #BSUB -n %NUMPROC% %TASKS_PER_NODE_DIRECTIVE% %SCRATCH_FREE_SPACE_DIRECTIVE% + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/headers/pbs10_header.py b/autosubmit/platforms/headers/pbs10_header.py index 3197603e356b36f9b28a7f57e355ce73683c640f..33aa06d9f3f0d651cd88e08aca0153a6a4274ff5 100644 --- a/autosubmit/platforms/headers/pbs10_header.py +++ b/autosubmit/platforms/headers/pbs10_header.py @@ -36,6 +36,21 @@ class Pbs10Header(object): # There is no queue, so directive is empty return "" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT @@ -45,6 +60,7 @@ class Pbs10Header(object): #PBS -q serial #PBS -l cput=%WALLCLOCK%:00 #PBS -A %CURRENT_BUDG% + %CUSTOM_DIRECTIVES% # ############################################################################### """) @@ -59,6 +75,7 @@ class Pbs10Header(object): #PBS -l mppnppn=32 #PBS -l walltime=%WALLCLOCK%:00 #PBS -A %CURRENT_BUDG% + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/headers/pbs11_header.py b/autosubmit/platforms/headers/pbs11_header.py index 9f9919799139297098ebe7dfd7f7a2a539f537b4..423fe148caacadb0d8894db2d963a5c700ba2945 100644 --- a/autosubmit/platforms/headers/pbs11_header.py +++ b/autosubmit/platforms/headers/pbs11_header.py @@ -36,6 +36,21 @@ class Pbs11Header(object): # There is no queue, so directive is empty return "" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT @@ -48,6 +63,7 @@ class Pbs11Header(object): #PBS -l walltime=%WALLCLOCK% #PBS -e %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID% #PBS -o %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID% + %CUSTOM_DIRECTIVES% # ############################################################################### """) @@ -64,6 +80,7 @@ class Pbs11Header(object): #PBS -l walltime=%WALLCLOCK% #PBS -e %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID% #PBS -o %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID% + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/headers/pbs12_header.py b/autosubmit/platforms/headers/pbs12_header.py index 014ebb63a9c5da028d06ada447ba188f507a3a7d..4407c8a326b6bb6138562d3187f6756165c48f21 100644 --- a/autosubmit/platforms/headers/pbs12_header.py +++ b/autosubmit/platforms/headers/pbs12_header.py @@ -36,6 +36,21 @@ class Pbs12Header(object): # There is no queue, so directive is empty return "" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT @@ -45,6 +60,7 @@ class Pbs12Header(object): #PBS -l select=serial=true:ncpus=1 #PBS -l walltime=%WALLCLOCK%:00 #PBS -A %CURRENT_BUDG% + %CUSTOM_DIRECTIVES% # ############################################################################### """) @@ -58,6 +74,7 @@ class Pbs12Header(object): #PBS -l select=%NUMPROC% #PBS -l walltime=%WALLCLOCK%:00 #PBS -A %CURRENT_BUDG% + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/headers/ps_header.py b/autosubmit/platforms/headers/ps_header.py index 436bb08939582888368a4fe903b4daad0d067c0d..3286a1407d9e916d5d7294e52c470ffd9581d7b5 100644 --- a/autosubmit/platforms/headers/ps_header.py +++ b/autosubmit/platforms/headers/ps_header.py @@ -36,6 +36,19 @@ class PsHeader(object): # There is no queue, so directive is empty return "" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + return "" + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT diff --git a/autosubmit/platforms/headers/sge_header.py b/autosubmit/platforms/headers/sge_header.py index 540c5f642f4c9e042d67d17270b93c26de82bddf..bcfb5d89226149b4740293d79e1d8aff4a92fe67 100644 --- a/autosubmit/platforms/headers/sge_header.py +++ b/autosubmit/platforms/headers/sge_header.py @@ -39,6 +39,21 @@ class SgeHeader(object): else: return "$ -q {0}".format(job.parameters['CURRENT_QUEUE']) + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT @@ -52,6 +67,7 @@ class SgeHeader(object): #$ -l h_rt=%WALLCLOCK%:00 #$ -l s_rt=%WALLCLOCK%:00 #%QUEUE_DIRECTIVE% + %CUSTOM_DIRECTIVES% # ############################################################################### """) @@ -70,6 +86,7 @@ class SgeHeader(object): #$ -l s_rt=%WALLCLOCK%:00 #$ -pe orte %NUMPROC% #%QUEUE_DIRECTIVE% + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/headers/slurm_header.py b/autosubmit/platforms/headers/slurm_header.py index 677a26d569fe005d34ee11981d8fd782004a0d44..0d0693b06bcc191bb083b8ee7ad8a810d9cc6a24 100644 --- a/autosubmit/platforms/headers/slurm_header.py +++ b/autosubmit/platforms/headers/slurm_header.py @@ -84,6 +84,21 @@ class SlurmHeader(object): return "SBATCH --mem-per-cpu {0}".format(job.parameters['MEMORY_PER_TASK']) return "" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT @@ -98,6 +113,7 @@ class SlurmHeader(object): #SBATCH -J %JOBNAME% #SBATCH -o %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%OUT_LOG_DIRECTIVE% #SBATCH -e %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%ERR_LOG_DIRECTIVE% + %CUSTOM_DIRECTIVES% # ############################################################################### """) @@ -116,6 +132,7 @@ class SlurmHeader(object): #SBATCH -J %JOBNAME% #SBATCH -o %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%OUT_LOG_DIRECTIVE% #SBATCH -e %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%ERR_LOG_DIRECTIVE% + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 0bb98c347dd27da63c8d5b72f70dbf956d7a7ddf..71add2a03eb369373913040f1100c0bd116013d1 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -414,6 +414,8 @@ class ParamikoPlatform(Platform): header = header.replace('%THREADS_PER_TASK_DIRECTIVE%', self.header.get_threads_per_task(job)) if hasattr(self.header, 'get_scratch_free_space'): header = header.replace('%SCRATCH_FREE_SPACE_DIRECTIVE%', self.header.get_scratch_free_space(job)) + if hasattr(self.header, 'get_custom_directives'): + header = header.replace('%CUSTOM_DIRECTIVES%', self.header.get_custom_directives(job)) if hasattr(self.header, 'get_exclusivity'): header = header.replace('%EXCLUSIVITY_DIRECTIVE%', self.header.get_exclusivity(job)) if hasattr(self.header, 'get_account_directive'): diff --git a/autosubmit/platforms/paramiko_submitter.py b/autosubmit/platforms/paramiko_submitter.py index cc65991182cc025460b742e3f597493599ab27bd..d84915c5ab1508c05fb7f12c8968a16d1b0502b3 100644 --- a/autosubmit/platforms/paramiko_submitter.py +++ b/autosubmit/platforms/paramiko_submitter.py @@ -22,6 +22,8 @@ import time import os +from bscearth.utils.log import Log + from autosubmit.config.basicConfig import BasicConfig from autosubmit.config.config_common import AutosubmitConfig from submitter import Submitter @@ -131,6 +133,9 @@ class ParamikoSubmitter(Submitter): remote_platform._serial_queue = parser.get_option(section, 'SERIAL_QUEUE', None) remote_platform.processors_per_node = parser.get_option(section, 'PROCESSORS_PER_NODE', None) + remote_platform.custom_directives = parser.get_option(section, 'CUSTOM_DIRECTIVES', + None) + Log.debug("Custom directives from platform.conf: {0}".format(remote_platform.custom_directives)) remote_platform.scratch_free_space = parser.get_option(section, 'SCRATCH_FREE_SPACE', None) remote_platform.root_dir = os.path.join(remote_platform.scratch, remote_platform.project, diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 50334edaa4bc1bb68fc71e9d5397debb8d4b4b82..9b1aca5ca6b8438d1b2b45930fb1f43d3fcbf861 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -27,6 +27,7 @@ class Platform(object): self._default_queue = None self.processors_per_node = None self.scratch_free_space = None + self.custom_directives = None self.host = '' self.user = '' self.project = '' diff --git a/autosubmit/platforms/wrappers/ec_wrapper.py b/autosubmit/platforms/wrappers/ec_wrapper.py index 418c8af3ab13f7257d56780573e63cb75a2bbc81..67caebf80aad6c6b32cb2ec88ac6d53e7ec3fb15 100644 --- a/autosubmit/platforms/wrappers/ec_wrapper.py +++ b/autosubmit/platforms/wrappers/ec_wrapper.py @@ -42,6 +42,7 @@ class EcWrapper(object): #PBS -l EC_total_tasks={4} #PBS -l EC_hyperthreads=1 {7} + {9} # ############################################################################### @@ -73,7 +74,8 @@ class EcWrapper(object): done """.format(filename, queue, project, wallclock, num_procs, ' '.join(str(s) for s in job_scripts), kwargs['expid'], - cls.dependency_directive(dependency), kwargs['rootdir'])) + cls.dependency_directive(dependency), kwargs['rootdir'], + '\n'.ljust(13).join(str(s) for s in kwargs['directives']))) @classmethod def horizontal(cls, filename, queue, project, wallclock, num_procs, _, job_scripts, dependency, **kwargs): @@ -92,6 +94,7 @@ class EcWrapper(object): #PBS -l EC_total_tasks={4} #PBS -l EC_hyperthreads=1 {7} + {9} # ############################################################################### @@ -127,8 +130,9 @@ class EcWrapper(object): done """.format(filename, queue, project, wallclock, num_procs, ' '.join(str(s) for s in job_scripts), kwargs['expid'], - cls.dependency_directive(dependency), kwargs['rootdir'])) + cls.dependency_directive(dependency), kwargs['rootdir'], + '\n'.ljust(13).join(str(s) for s in kwargs['directives']))) @classmethod def dependency_directive(cls, dependency): - return '#' if dependency is None else '#PBS -W depend=afterok:{0}'.format(dependency) + return '#' if dependency is None else '#PBS -v depend=afterok:{0}'.format(dependency) diff --git a/autosubmit/platforms/wrappers/lsf_wrapper.py b/autosubmit/platforms/wrappers/lsf_wrapper.py index a2d2205956b6868c3da89026d03a92c0726a0f10..ffc1dc7ad7b3f5384ec26de2b752f669a48dc1b7 100644 --- a/autosubmit/platforms/wrappers/lsf_wrapper.py +++ b/autosubmit/platforms/wrappers/lsf_wrapper.py @@ -25,7 +25,7 @@ class LsfWrapper(object): """Class to handle wrappers on LSF platforms""" @classmethod - def array(cls, filename, array_id, wallclock, num_procs): + def array(cls, filename, array_id, wallclock, num_procs, **kwargs): return textwrap.dedent("""\ ############################################################################### # {0} @@ -37,13 +37,15 @@ class LsfWrapper(object): #BSUB -eo {0}.%I.err #BSUB -W {2} #BSUB -n {3} + {4} # ############################################################################### SCRIPT=$(cat {0}.$LSB_JOBINDEX | awk 'NR==1') chmod +x $SCRIPT ./$SCRIPT - """.format(filename, array_id, wallclock, num_procs)) + """.format(filename, array_id, wallclock, num_procs, + '\n'.ljust(13).join(str(s) for s in kwargs['directives']))) @classmethod def vertical(cls, filename, queue, project, wallclock, num_procs, job_scripts, dependency, **kwargs): @@ -61,6 +63,7 @@ class LsfWrapper(object): #BSUB -W {3} #BSUB -n {4} {6} + {7} # ############################################################################### @@ -95,7 +98,8 @@ class LsfWrapper(object): print "The job ", current.template," has FAILED" os._exit(1) """.format(filename, queue, project, wallclock, num_procs, str(job_scripts), - cls.dependency_directive(dependency))) + cls.dependency_directive(dependency), + '\n'.ljust(13).join(str(s) for s in kwargs['directives']))) @classmethod def horizontal(cls, filename, queue, project, wallclock, num_procs, num_jobs, job_scripts, dependency, **kwargs): @@ -113,6 +117,7 @@ class LsfWrapper(object): #BSUB -W {3} #BSUB -n {4} {7} + {10} # ############################################################################### @@ -158,7 +163,8 @@ class LsfWrapper(object): else: print "The job ", pid.template," has FAILED" """.format(filename, queue, project, wallclock, num_procs, (int(num_procs) / num_jobs), - str(job_scripts), cls.dependency_directive(dependency), "${LSB_DJOB_HOSTFILE}", "${LSB_JOBID}")) + str(job_scripts), cls.dependency_directive(dependency), "${LSB_DJOB_HOSTFILE}", + "${LSB_JOBID}", '\n'.ljust(13).join(str(s) for s in kwargs['directives']))) @classmethod def dependency_directive(cls, dependency): diff --git a/autosubmit/platforms/wrappers/slurm_wrapper.py b/autosubmit/platforms/wrappers/slurm_wrapper.py index 6f11cfec4fe4b4a9814e489a0ac8ea30a2b25d87..384aafb5a42251f55b0780979019e00a3a8a328e 100644 --- a/autosubmit/platforms/wrappers/slurm_wrapper.py +++ b/autosubmit/platforms/wrappers/slurm_wrapper.py @@ -33,13 +33,14 @@ class SlurmWrapper(object): ############################################################################### # #SBATCH -J {0} - #SBATCH -p {1} + {1} #SBATCH -A {2} #SBATCH -o {0}.out #SBATCH -e {0}.err #SBATCH -t {3}:00 #SBATCH -n {4} {6} + {7} # ############################################################################### @@ -73,8 +74,9 @@ class SlurmWrapper(object): else: print "The job ", current.template," has FAILED" os._exit(1) - """.format(filename, queue, project, wallclock, num_procs, str(job_scripts), - cls.dependency_directive(dependency))) + """.format(filename, cls.queue_directive(queue), project, wallclock, num_procs, str(job_scripts), + cls.dependency_directive(dependency), + '\n'.ljust(13).join(str(s) for s in kwargs['directives']))) @classmethod def horizontal(cls, filename, queue, project, wallclock, num_procs, _, job_scripts, dependency, **kwargs): @@ -85,13 +87,14 @@ class SlurmWrapper(object): ############################################################################### # #SBATCH -J {0} - #SBATCH -p {1} + {1} #SBATCH -A {2} #SBATCH -o {0}.out #SBATCH -e {0}.err #SBATCH -t {3}:00 #SBATCH -n {4} {6} + {7} # ############################################################################### @@ -133,9 +136,14 @@ class SlurmWrapper(object): print "The job ", pid.template," has been COMPLETED" else: print "The job ", pid.template," has FAILED" - """.format(filename, queue, project, wallclock, num_procs, str(job_scripts), - cls.dependency_directive(dependency))) + """.format(filename, cls.queue_directive(queue), project, wallclock, num_procs, str(job_scripts), + cls.dependency_directive(dependency), + '\n'.ljust(13).join(str(s) for s in kwargs['directives']))) @classmethod def dependency_directive(cls, dependency): return '#' if dependency is None else '#SBATCH --dependency=afterok:{0}'.format(dependency) + + @classmethod + def queue_directive(cls, queue): + return '#' if queue == '' else '#SBATCH -p {0}'.format(queue) \ No newline at end of file diff --git a/docs/source/usage.rst b/docs/source/usage.rst index a90c07aeb8d7a65b030d74d5689c3a1d57a873b1..89c6ee80561fd2d71921ee1a37a0fc0b979bf2b3 100644 --- a/docs/source/usage.rst +++ b/docs/source/usage.rst @@ -644,7 +644,7 @@ For jobs running in HPC platforms, usually you have to provide information about * QUEUE: queue to add the job to. If not specificied, uses PLATFORM default. -There are also another, less used features that you can use: +There are also other, less used features that you can use: * FREQUENCY: specifies that a job has only to be run after X dates, members or chunk. A job will always be created for the last one. If not specified, defaults to 1 @@ -658,6 +658,8 @@ There are also another, less used features that you can use: * RERUN_DEPENDENCIES: defines the jobs to be rerun if this job is going to be rerunned. Syntax is identical to the used in DEPENDENCIES +* CUSTOM_DIRECTIVES: Custom directives for the HPC resource manager headers of the platform used for that job. + Example: .. code-block:: ini @@ -719,7 +721,7 @@ configuration, you can specify what platform or queue to use to run serial jobs * SERIAL_QUEUE: if specified, Autosubmit will run jobs with only one processor in the specified queue. Autosubmit will ignore this configuration if SERIAL_PLATFORM is provided -There are some other parameters that you must need to specify: +There are some other parameters that you may need to specify: * BUDGET: budget account for the machine scheduler. If omitted, takes the value defined in PROJECT @@ -733,6 +735,8 @@ There are some other parameters that you must need to specify: * TOTAL_JOBS: maximum number of jobs to be running at the same time in this platform. +* CUSTOM_DIRECTIVES: Custom directives for the resource manager of this platform. + Example: .. code-block:: ini @@ -745,6 +749,7 @@ Example: USER = my_user SCRATCH_DIR = /scratch TEST_SUITE = True + CUSTOM_DIRECTIVES = [ "my_directive" ] How to change the communications library ===================================== diff --git a/docs/source/variables.rst b/docs/source/variables.rst index 62bbc69f35bdf9183c677b43ff35ea360f6424b0..70b9b5e514becb07af909f8eb24d58755c22e0ce 100644 --- a/docs/source/variables.rst +++ b/docs/source/variables.rst @@ -57,6 +57,7 @@ suite of variables is defined for the current platform where {PLATFORM_NAME} is - **{PLATFORM_NAME}_VERSION**: Platform scheduler version - **{PLATFORM_NAME}_SCRATCH_DIR**: Platform's scratch folder path - **{PLATFORM_NAME}_ROOTDIR**: Platform's experiment folder path +- **{PLATFORM_NAME}_CUSTOM_DIRECTIVES**: Platform's custom directives for the resource manager. .. hint:: The variables ``_USER``, ``_PROJ`` and ``_BUDG`` has no value on the LOCAL platform. diff --git a/test/unit/test_job.py b/test/unit/test_job.py index 0bcf4404405c58dd9d31365b85623c8c5fb492df..705d32eeef88260d1a5e52e1dd32d44846e30e30 100644 --- a/test/unit/test_job.py +++ b/test/unit/test_job.py @@ -258,11 +258,13 @@ class TestJob(TestCase): as_conf.get_memory = Mock(return_value=80) as_conf.get_wallclock = Mock(return_value='00:30') as_conf.get_member_list = Mock(return_value=[]) + as_conf.get_custom_directives = Mock(return_value='["whatever"]') dummy_serial_platform = Mock() dummy_serial_platform.name = 'serial' dummy_platform = Mock() dummy_platform.serial_platform = dummy_serial_platform + dummy_platform.custom_directives = '["whatever"]' self.job._platform = dummy_platform # Act parameters = self.job.update_parameters(as_conf, dict())