From 0e29473313d63982b2db0f44505240ab97c17005 Mon Sep 17 00:00:00 2001 From: Domingo Manubens-Gil Date: Tue, 1 Aug 2017 18:45:47 +0200 Subject: [PATCH 1/4] First version of custom headers working for paramiko. See #270 --- autosubmit/config/config_common.py | 10 ++++++++++ autosubmit/config/files/jobs.conf | 6 ++++-- autosubmit/config/files/platforms.conf | 5 ++++- autosubmit/job/job.py | 10 ++++++++++ autosubmit/job/job_packages.py | 4 ++-- autosubmit/platforms/headers/ec_cca_header.py | 17 +++++++++++++++++ autosubmit/platforms/headers/ec_header.py | 17 +++++++++++++++++ autosubmit/platforms/headers/lsf_header.py | 17 +++++++++++++++++ autosubmit/platforms/headers/pbs10_header.py | 17 +++++++++++++++++ autosubmit/platforms/headers/pbs11_header.py | 17 +++++++++++++++++ autosubmit/platforms/headers/pbs12_header.py | 17 +++++++++++++++++ autosubmit/platforms/headers/ps_header.py | 13 +++++++++++++ autosubmit/platforms/headers/sge_header.py | 17 +++++++++++++++++ autosubmit/platforms/headers/slurm_header.py | 17 +++++++++++++++++ autosubmit/platforms/paramiko_platform.py | 2 ++ autosubmit/platforms/paramiko_submitter.py | 5 +++++ autosubmit/platforms/platform.py | 1 + 17 files changed, 187 insertions(+), 5 deletions(-) diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 70356b200..10877bc43 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -204,6 +204,16 @@ class AutosubmitConfig(object): """ return str(self._jobs_parser.get_option(section, 'MEMORY_PER_TASK', '')) + def get_custom_directives(self, section): + """ + Gets custom directives needed for the given job type + :param section: job type + :type section: str + :return: custom directives needed + :rtype: str + """ + return str(self._jobs_parser.get_option(section, 'CUSTOM_DIRECTIVES', '')) + def check_conf_files(self): """ Checks configuration files (autosubmit, experiment jobs and platforms), looking for invalid values, missing diff --git a/autosubmit/config/files/jobs.conf b/autosubmit/config/files/jobs.conf index 8caccb5cb..79948db00 100644 --- a/autosubmit/config/files/jobs.conf +++ b/autosubmit/config/files/jobs.conf @@ -49,7 +49,9 @@ # TYPE = bash ## Synchronize a chunk job with its dependency chunks at a 'date' or 'member' level # SYNCHRONIZE = date | member - +## Optional. Custom directives for the resource manager of the platform used for that job. +## Put as many as you wish in json formatted array. +# CUSTOM_DIRECTIVE = ["#PBS -W depend=afterok:0", "#PBS -W depend=afterok:1"] [LOCAL_SETUP] FILE = LOCAL_SETUP.sh @@ -91,4 +93,4 @@ WALLCLOCK = 00:05 FILE = TRANSFER.sh PLATFORM = LOCAL DEPENDENCIES = CLEAN -RUNNING = member \ No newline at end of file +RUNNING = member diff --git a/autosubmit/config/files/platforms.conf b/autosubmit/config/files/platforms.conf index 0fa5eee7f..1706b5728 100644 --- a/autosubmit/config/files/platforms.conf +++ b/autosubmit/config/files/platforms.conf @@ -45,4 +45,7 @@ # MAX_WALLCLOCK = 72:00 ## Max processors number per job submitted to the HPC. If not specified, defaults to empty. ## Optional. Required for wrappers. -# MAX_PROCESSORS = 1 \ No newline at end of file +# MAX_PROCESSORS = 1 +## Optional. Custom directives for the resource manager of the platform used. +## Put as many as you wish in a json formatted array. +# CUSTOM_DIRECTIVE = ["#PBS -W depend=afterok:0", "#PBS -W depend=afterok:1"] diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index d0d8a1842..4125e8a58 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -23,6 +23,7 @@ Main module for autosubmit. Only contains an interface class to all functionalit import os import re import time +import json from autosubmit.job.job_common import Status, Type from autosubmit.job.job_common import StatisticsSnippetBash, StatisticsSnippetPython @@ -73,6 +74,7 @@ class Job(object): self.date_format = '' self.type = Type.BASH self.scratch_free_space = None + self.custom_directives = None self.id = job_id self.file = None @@ -610,6 +612,13 @@ class Job(object): self.scratch_free_space = as_conf.get_scratch_free_space(self.section) if self.scratch_free_space == 0: self.scratch_free_space = job_platform.scratch_free_space + self.custom_directives = as_conf.get_custom_directives(self.section) + if self.custom_directives != '': + self.custom_directives = json.loads(as_conf.get_custom_directives(self.section)) + if job_platform.custom_directives: + self.custom_directives = self.custom_directives + json.loads(job_platform.custom_directives) + elif job_platform.custom_directives: + self.custom_directives = json.loads(job_platform.custom_directives) parameters['NUMPROC'] = self.processors parameters['MEMORY'] = self.memory @@ -619,6 +628,7 @@ class Job(object): parameters['WALLCLOCK'] = self.wallclock parameters['TASKTYPE'] = self.section parameters['SCRATCH_FREE_SPACE'] = self.scratch_free_space + parameters['CUSTOM_DIRECTIVES'] = self.custom_directives parameters['CURRENT_ARCH'] = job_platform.name parameters['CURRENT_HOST'] = job_platform.host diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 3bcd70ec5..e1c5dcffe 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -382,7 +382,7 @@ class JobPackageVertical(JobPackageThread): return self.platform.wrapper.vertical(self._name, self._queue, self._project, self._wallclock, self._num_processors, self._jobs_scripts, self._job_dependency, expid=self._expid, - rootdir=self.platform.root_dir) + rootdir=self.platform.root_dir, customdir=self.platform.customdir) class JobPackageHorizontal(JobPackageThread): @@ -404,4 +404,4 @@ class JobPackageHorizontal(JobPackageThread): return self.platform.wrapper.horizontal(self._name, self._queue, self._project, self._wallclock, self._num_processors, len(self.jobs), self._jobs_scripts, self._job_dependency, expid=self._expid, - rootdir=self.platform.root_dir) + rootdir=self.platform.root_dir, customdir=self.platform.customdir) diff --git a/autosubmit/platforms/headers/ec_cca_header.py b/autosubmit/platforms/headers/ec_cca_header.py index 9037c9c59..ea70b2ac0 100644 --- a/autosubmit/platforms/headers/ec_cca_header.py +++ b/autosubmit/platforms/headers/ec_cca_header.py @@ -80,6 +80,21 @@ class EcCcaHeader(object): return "#PBS -l EC_hyperthreads=2" return "#PBS -l EC_hyperthreads=1" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT @@ -91,6 +106,7 @@ class EcCcaHeader(object): #PBS -q ns #PBS -l walltime=%WALLCLOCK%:00 #PBS -l EC_billing_account=%CURRENT_BUDG% + %CUSTOM_DIRECTIVES% # ############################################################################### @@ -112,6 +128,7 @@ class EcCcaHeader(object): %HYPERTHREADING_DIRECTIVE% #PBS -l walltime=%WALLCLOCK%:00 #PBS -l EC_billing_account=%CURRENT_BUDG% + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/headers/ec_header.py b/autosubmit/platforms/headers/ec_header.py index ff1eaadc0..3482899fe 100644 --- a/autosubmit/platforms/headers/ec_header.py +++ b/autosubmit/platforms/headers/ec_header.py @@ -36,6 +36,21 @@ class EcHeader(object): # There is no queue, so directive is empty return "" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + # noinspection PyPep8 SERIAL = textwrap.dedent("""\ ############################################################################### @@ -52,6 +67,7 @@ class EcHeader(object): #@ resources = ConsumableCpus(1) ConsumableMemory(1200mb) #@ wall_clock_limit = %WALLCLOCK%:00 #@ platforms + %CUSTOM_DIRECTIVES% # ############################################################################### """) @@ -74,6 +90,7 @@ class EcHeader(object): #@ total_tasks = %NUMPROC% #@ wall_clock_limit = %WALLCLOCK%:00 #@ platforms + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/headers/lsf_header.py b/autosubmit/platforms/headers/lsf_header.py index 331ffe154..d14b14bbe 100644 --- a/autosubmit/platforms/headers/lsf_header.py +++ b/autosubmit/platforms/headers/lsf_header.py @@ -59,6 +59,21 @@ class LsfHeader(object): else: return "" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + @classmethod def array_header(cls, filename, array_id, wallclock, num_processors): return textwrap.dedent("""\ @@ -141,6 +156,7 @@ class LsfHeader(object): #BSUB -W %WALLCLOCK% #BSUB -n %NUMPROC% %EXCLUSIVITY_DIRECTIVE% + %CUSTOM_DIRECTIVES% # ############################################################################### """) @@ -158,6 +174,7 @@ class LsfHeader(object): #BSUB -n %NUMPROC% %TASKS_PER_NODE_DIRECTIVE% %SCRATCH_FREE_SPACE_DIRECTIVE% + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/headers/pbs10_header.py b/autosubmit/platforms/headers/pbs10_header.py index 3197603e3..33aa06d9f 100644 --- a/autosubmit/platforms/headers/pbs10_header.py +++ b/autosubmit/platforms/headers/pbs10_header.py @@ -36,6 +36,21 @@ class Pbs10Header(object): # There is no queue, so directive is empty return "" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT @@ -45,6 +60,7 @@ class Pbs10Header(object): #PBS -q serial #PBS -l cput=%WALLCLOCK%:00 #PBS -A %CURRENT_BUDG% + %CUSTOM_DIRECTIVES% # ############################################################################### """) @@ -59,6 +75,7 @@ class Pbs10Header(object): #PBS -l mppnppn=32 #PBS -l walltime=%WALLCLOCK%:00 #PBS -A %CURRENT_BUDG% + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/headers/pbs11_header.py b/autosubmit/platforms/headers/pbs11_header.py index 9f9919799..423fe148c 100644 --- a/autosubmit/platforms/headers/pbs11_header.py +++ b/autosubmit/platforms/headers/pbs11_header.py @@ -36,6 +36,21 @@ class Pbs11Header(object): # There is no queue, so directive is empty return "" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT @@ -48,6 +63,7 @@ class Pbs11Header(object): #PBS -l walltime=%WALLCLOCK% #PBS -e %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID% #PBS -o %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID% + %CUSTOM_DIRECTIVES% # ############################################################################### """) @@ -64,6 +80,7 @@ class Pbs11Header(object): #PBS -l walltime=%WALLCLOCK% #PBS -e %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID% #PBS -o %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID% + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/headers/pbs12_header.py b/autosubmit/platforms/headers/pbs12_header.py index 014ebb63a..4407c8a32 100644 --- a/autosubmit/platforms/headers/pbs12_header.py +++ b/autosubmit/platforms/headers/pbs12_header.py @@ -36,6 +36,21 @@ class Pbs12Header(object): # There is no queue, so directive is empty return "" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT @@ -45,6 +60,7 @@ class Pbs12Header(object): #PBS -l select=serial=true:ncpus=1 #PBS -l walltime=%WALLCLOCK%:00 #PBS -A %CURRENT_BUDG% + %CUSTOM_DIRECTIVES% # ############################################################################### """) @@ -58,6 +74,7 @@ class Pbs12Header(object): #PBS -l select=%NUMPROC% #PBS -l walltime=%WALLCLOCK%:00 #PBS -A %CURRENT_BUDG% + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/headers/ps_header.py b/autosubmit/platforms/headers/ps_header.py index 436bb0893..3286a1407 100644 --- a/autosubmit/platforms/headers/ps_header.py +++ b/autosubmit/platforms/headers/ps_header.py @@ -36,6 +36,19 @@ class PsHeader(object): # There is no queue, so directive is empty return "" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + return "" + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT diff --git a/autosubmit/platforms/headers/sge_header.py b/autosubmit/platforms/headers/sge_header.py index 540c5f642..bcfb5d892 100644 --- a/autosubmit/platforms/headers/sge_header.py +++ b/autosubmit/platforms/headers/sge_header.py @@ -39,6 +39,21 @@ class SgeHeader(object): else: return "$ -q {0}".format(job.parameters['CURRENT_QUEUE']) + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT @@ -52,6 +67,7 @@ class SgeHeader(object): #$ -l h_rt=%WALLCLOCK%:00 #$ -l s_rt=%WALLCLOCK%:00 #%QUEUE_DIRECTIVE% + %CUSTOM_DIRECTIVES% # ############################################################################### """) @@ -70,6 +86,7 @@ class SgeHeader(object): #$ -l s_rt=%WALLCLOCK%:00 #$ -pe orte %NUMPROC% #%QUEUE_DIRECTIVE% + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/headers/slurm_header.py b/autosubmit/platforms/headers/slurm_header.py index 677a26d56..0d0693b06 100644 --- a/autosubmit/platforms/headers/slurm_header.py +++ b/autosubmit/platforms/headers/slurm_header.py @@ -84,6 +84,21 @@ class SlurmHeader(object): return "SBATCH --mem-per-cpu {0}".format(job.parameters['MEMORY_PER_TASK']) return "" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT @@ -98,6 +113,7 @@ class SlurmHeader(object): #SBATCH -J %JOBNAME% #SBATCH -o %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%OUT_LOG_DIRECTIVE% #SBATCH -e %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%ERR_LOG_DIRECTIVE% + %CUSTOM_DIRECTIVES% # ############################################################################### """) @@ -116,6 +132,7 @@ class SlurmHeader(object): #SBATCH -J %JOBNAME% #SBATCH -o %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%OUT_LOG_DIRECTIVE% #SBATCH -e %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%ERR_LOG_DIRECTIVE% + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 0bb98c347..71add2a03 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -414,6 +414,8 @@ class ParamikoPlatform(Platform): header = header.replace('%THREADS_PER_TASK_DIRECTIVE%', self.header.get_threads_per_task(job)) if hasattr(self.header, 'get_scratch_free_space'): header = header.replace('%SCRATCH_FREE_SPACE_DIRECTIVE%', self.header.get_scratch_free_space(job)) + if hasattr(self.header, 'get_custom_directives'): + header = header.replace('%CUSTOM_DIRECTIVES%', self.header.get_custom_directives(job)) if hasattr(self.header, 'get_exclusivity'): header = header.replace('%EXCLUSIVITY_DIRECTIVE%', self.header.get_exclusivity(job)) if hasattr(self.header, 'get_account_directive'): diff --git a/autosubmit/platforms/paramiko_submitter.py b/autosubmit/platforms/paramiko_submitter.py index cc6599118..9150fe505 100644 --- a/autosubmit/platforms/paramiko_submitter.py +++ b/autosubmit/platforms/paramiko_submitter.py @@ -22,6 +22,8 @@ import time import os +from bscearth.utils.log import Log + from autosubmit.config.basicConfig import BasicConfig from autosubmit.config.config_common import AutosubmitConfig from submitter import Submitter @@ -131,6 +133,9 @@ class ParamikoSubmitter(Submitter): remote_platform._serial_queue = parser.get_option(section, 'SERIAL_QUEUE', None) remote_platform.processors_per_node = parser.get_option(section, 'PROCESSORS_PER_NODE', None) + remote_platform.custom_directives = parser.get_option(section, 'CUSTOM_DIRECTIVES', + None) + Log.debug("Custom directives from platform.conf: {0}".format(remote_platform.custom_directives)) remote_platform.scratch_free_space = parser.get_option(section, 'SCRATCH_FREE_SPACE', None) remote_platform.root_dir = os.path.join(remote_platform.scratch, remote_platform.project, diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 50334edaa..9b1aca5ca 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -27,6 +27,7 @@ class Platform(object): self._default_queue = None self.processors_per_node = None self.scratch_free_space = None + self.custom_directives = None self.host = '' self.user = '' self.project = '' -- GitLab From 34bd552118580418fc8509a91010d0699faeb435 Mon Sep 17 00:00:00 2001 From: Domingo Manubens-Gil Date: Wed, 2 Aug 2017 16:46:32 +0200 Subject: [PATCH 2/4] Fix Unit test Job. Json object needs to be created for custom directives parameter. --- test/unit/test_job.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/unit/test_job.py b/test/unit/test_job.py index 0bcf44044..705d32eee 100644 --- a/test/unit/test_job.py +++ b/test/unit/test_job.py @@ -258,11 +258,13 @@ class TestJob(TestCase): as_conf.get_memory = Mock(return_value=80) as_conf.get_wallclock = Mock(return_value='00:30') as_conf.get_member_list = Mock(return_value=[]) + as_conf.get_custom_directives = Mock(return_value='["whatever"]') dummy_serial_platform = Mock() dummy_serial_platform.name = 'serial' dummy_platform = Mock() dummy_platform.serial_platform = dummy_serial_platform + dummy_platform.custom_directives = '["whatever"]' self.job._platform = dummy_platform # Act parameters = self.job.update_parameters(as_conf, dict()) -- GitLab From 98a600ae0f4206c5db6477828356e89c8f65c104 Mon Sep 17 00:00:00 2001 From: Domingo Manubens-Gil Date: Wed, 2 Aug 2017 19:19:53 +0200 Subject: [PATCH 3/4] Documentation custom directives. Deactivation of W option into ec_wrapper (turned into v temporarly), since it is not working. See #267 --- autosubmit/config/files/jobs.conf | 2 +- autosubmit/config/files/platforms.conf | 2 +- autosubmit/platforms/wrappers/ec_wrapper.py | 2 +- docs/source/usage.rst | 9 +++++++-- docs/source/variables.rst | 1 + 5 files changed, 11 insertions(+), 5 deletions(-) diff --git a/autosubmit/config/files/jobs.conf b/autosubmit/config/files/jobs.conf index 79948db00..eaf192ce6 100644 --- a/autosubmit/config/files/jobs.conf +++ b/autosubmit/config/files/jobs.conf @@ -51,7 +51,7 @@ # SYNCHRONIZE = date | member ## Optional. Custom directives for the resource manager of the platform used for that job. ## Put as many as you wish in json formatted array. -# CUSTOM_DIRECTIVE = ["#PBS -W depend=afterok:0", "#PBS -W depend=afterok:1"] +# CUSTOM_DIRECTIVE = ["#PBS -v myvar=value, "#PBS -v othervar=value"] [LOCAL_SETUP] FILE = LOCAL_SETUP.sh diff --git a/autosubmit/config/files/platforms.conf b/autosubmit/config/files/platforms.conf index 1706b5728..e5e6ff4c3 100644 --- a/autosubmit/config/files/platforms.conf +++ b/autosubmit/config/files/platforms.conf @@ -48,4 +48,4 @@ # MAX_PROCESSORS = 1 ## Optional. Custom directives for the resource manager of the platform used. ## Put as many as you wish in a json formatted array. -# CUSTOM_DIRECTIVE = ["#PBS -W depend=afterok:0", "#PBS -W depend=afterok:1"] +# CUSTOM_DIRECTIVE = ["#PBS -v myvar=value, "#PBS -v othervar=value"] diff --git a/autosubmit/platforms/wrappers/ec_wrapper.py b/autosubmit/platforms/wrappers/ec_wrapper.py index 418c8af3a..7e848dd1e 100644 --- a/autosubmit/platforms/wrappers/ec_wrapper.py +++ b/autosubmit/platforms/wrappers/ec_wrapper.py @@ -131,4 +131,4 @@ class EcWrapper(object): @classmethod def dependency_directive(cls, dependency): - return '#' if dependency is None else '#PBS -W depend=afterok:{0}'.format(dependency) + return '#' if dependency is None else '#PBS -v depend=afterok:{0}'.format(dependency) diff --git a/docs/source/usage.rst b/docs/source/usage.rst index a90c07aeb..89c6ee805 100644 --- a/docs/source/usage.rst +++ b/docs/source/usage.rst @@ -644,7 +644,7 @@ For jobs running in HPC platforms, usually you have to provide information about * QUEUE: queue to add the job to. If not specificied, uses PLATFORM default. -There are also another, less used features that you can use: +There are also other, less used features that you can use: * FREQUENCY: specifies that a job has only to be run after X dates, members or chunk. A job will always be created for the last one. If not specified, defaults to 1 @@ -658,6 +658,8 @@ There are also another, less used features that you can use: * RERUN_DEPENDENCIES: defines the jobs to be rerun if this job is going to be rerunned. Syntax is identical to the used in DEPENDENCIES +* CUSTOM_DIRECTIVES: Custom directives for the HPC resource manager headers of the platform used for that job. + Example: .. code-block:: ini @@ -719,7 +721,7 @@ configuration, you can specify what platform or queue to use to run serial jobs * SERIAL_QUEUE: if specified, Autosubmit will run jobs with only one processor in the specified queue. Autosubmit will ignore this configuration if SERIAL_PLATFORM is provided -There are some other parameters that you must need to specify: +There are some other parameters that you may need to specify: * BUDGET: budget account for the machine scheduler. If omitted, takes the value defined in PROJECT @@ -733,6 +735,8 @@ There are some other parameters that you must need to specify: * TOTAL_JOBS: maximum number of jobs to be running at the same time in this platform. +* CUSTOM_DIRECTIVES: Custom directives for the resource manager of this platform. + Example: .. code-block:: ini @@ -745,6 +749,7 @@ Example: USER = my_user SCRATCH_DIR = /scratch TEST_SUITE = True + CUSTOM_DIRECTIVES = [ "my_directive" ] How to change the communications library ===================================== diff --git a/docs/source/variables.rst b/docs/source/variables.rst index 62bbc69f3..70b9b5e51 100644 --- a/docs/source/variables.rst +++ b/docs/source/variables.rst @@ -57,6 +57,7 @@ suite of variables is defined for the current platform where {PLATFORM_NAME} is - **{PLATFORM_NAME}_VERSION**: Platform scheduler version - **{PLATFORM_NAME}_SCRATCH_DIR**: Platform's scratch folder path - **{PLATFORM_NAME}_ROOTDIR**: Platform's experiment folder path +- **{PLATFORM_NAME}_CUSTOM_DIRECTIVES**: Platform's custom directives for the resource manager. .. hint:: The variables ``_USER``, ``_PROJ`` and ``_BUDG`` has no value on the LOCAL platform. -- GitLab From e8577e40aa451b9a7ee3d9c0e55d57a9cbd07568 Mon Sep 17 00:00:00 2001 From: Domingo Manubens-Gil Date: Fri, 4 Aug 2017 17:28:05 +0200 Subject: [PATCH 4/4] Custom directives implemented on packages. Tested on ECMWF and MN4. Fixes #270 --- autosubmit/job/job.py | 4 +++- autosubmit/job/job_packages.py | 12 ++++++++--- autosubmit/platforms/headers/lsf_header.py | 11 ++++++---- autosubmit/platforms/paramiko_submitter.py | 2 +- autosubmit/platforms/wrappers/ec_wrapper.py | 8 ++++++-- autosubmit/platforms/wrappers/lsf_wrapper.py | 14 +++++++++---- .../platforms/wrappers/slurm_wrapper.py | 20 +++++++++++++------ 7 files changed, 50 insertions(+), 21 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 4125e8a58..df36b4c14 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -74,7 +74,7 @@ class Job(object): self.date_format = '' self.type = Type.BASH self.scratch_free_space = None - self.custom_directives = None + self.custom_directives = [] self.id = job_id self.file = None @@ -619,6 +619,8 @@ class Job(object): self.custom_directives = self.custom_directives + json.loads(job_platform.custom_directives) elif job_platform.custom_directives: self.custom_directives = json.loads(job_platform.custom_directives) + elif self.custom_directives == '': + self.custom_directives = [] parameters['NUMPROC'] = self.processors parameters['MEMORY'] = self.memory diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index e1c5dcffe..bb19285f5 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -45,6 +45,7 @@ class JobPackageBase(object): try: self._tmp_path = jobs[0]._tmp_path self._platform = jobs[0].platform + self._custom_directives = set() for job in jobs: if job.platform != self._platform or job.platform is None: raise Exception('Only one valid platform per package') @@ -80,6 +81,8 @@ class JobPackageBase(object): if not job.check_script(configuration, parameters): raise WrongTemplateException(job.name) job.update_parameters(configuration, parameters) + # looking for directives on jobs + self._custom_directives = self._custom_directives | set(job.custom_directives) self._create_scripts(configuration) self._send_files() self._do_submission() @@ -186,7 +189,8 @@ class JobPackageArray(JobPackageBase): def _create_common_script(self, filename): script_content = self.platform.header.array_header(filename, self._array_size_id, self._wallclock, - self._num_processors) + self._num_processors, + directives=self.platform.custom_directives) filename += '.cmd' open(os.path.join(self._tmp_path, filename), 'w').write(script_content) os.chmod(os.path.join(self._tmp_path, filename), 0o775) @@ -382,7 +386,8 @@ class JobPackageVertical(JobPackageThread): return self.platform.wrapper.vertical(self._name, self._queue, self._project, self._wallclock, self._num_processors, self._jobs_scripts, self._job_dependency, expid=self._expid, - rootdir=self.platform.root_dir, customdir=self.platform.customdir) + rootdir=self.platform.root_dir, + directives=self._custom_directives) class JobPackageHorizontal(JobPackageThread): @@ -404,4 +409,5 @@ class JobPackageHorizontal(JobPackageThread): return self.platform.wrapper.horizontal(self._name, self._queue, self._project, self._wallclock, self._num_processors, len(self.jobs), self._jobs_scripts, self._job_dependency, expid=self._expid, - rootdir=self.platform.root_dir, customdir=self.platform.customdir) + rootdir=self.platform.root_dir, + directives=self._custom_directives) diff --git a/autosubmit/platforms/headers/lsf_header.py b/autosubmit/platforms/headers/lsf_header.py index d14b14bbe..6634b2a0d 100644 --- a/autosubmit/platforms/headers/lsf_header.py +++ b/autosubmit/platforms/headers/lsf_header.py @@ -75,7 +75,7 @@ class LsfHeader(object): return "" @classmethod - def array_header(cls, filename, array_id, wallclock, num_processors): + def array_header(cls, filename, array_id, wallclock, num_processors, **kwargs): return textwrap.dedent("""\ ############################################################################### # {0} @@ -87,16 +87,17 @@ class LsfHeader(object): #BSUB -eo {0}.%I.err #BSUB -W {2} #BSUB -n {3} + {4} # ############################################################################### SCRIPT=$(cat {0}.$LSB_JOBINDEX | awk 'NR==1') chmod +x $SCRIPT ./$SCRIPT - """.format(filename, array_id, wallclock, num_processors)) + """.format(filename, array_id, wallclock, num_processors, '\n'.join(str(s) for s in kwargs['directives']))) @classmethod - def thread_header(cls, filename, wallclock, num_processors, job_scripts, dependency_directive): + def thread_header(cls, filename, wallclock, num_processors, job_scripts, dependency_directive, **kwargs): return textwrap.dedent("""\ #!/usr/bin/env python ############################################################################### @@ -109,6 +110,7 @@ class LsfHeader(object): #BSUB -W {1} #BSUB -n {2} {4} + {5} # ############################################################################### @@ -142,7 +144,8 @@ class LsfHeader(object): else: print "The job ", current.template," has FAILED" os._exit(1) - """.format(filename, wallclock, num_processors, str(job_scripts), dependency_directive)) + """.format(filename, wallclock, num_processors, str(job_scripts), dependency_directive, + '\n'.join(str(s) for s in kwargs['directives']))) SERIAL = textwrap.dedent("""\ ############################################################################### diff --git a/autosubmit/platforms/paramiko_submitter.py b/autosubmit/platforms/paramiko_submitter.py index 9150fe505..d84915c5a 100644 --- a/autosubmit/platforms/paramiko_submitter.py +++ b/autosubmit/platforms/paramiko_submitter.py @@ -134,7 +134,7 @@ class ParamikoSubmitter(Submitter): remote_platform.processors_per_node = parser.get_option(section, 'PROCESSORS_PER_NODE', None) remote_platform.custom_directives = parser.get_option(section, 'CUSTOM_DIRECTIVES', - None) + None) Log.debug("Custom directives from platform.conf: {0}".format(remote_platform.custom_directives)) remote_platform.scratch_free_space = parser.get_option(section, 'SCRATCH_FREE_SPACE', None) diff --git a/autosubmit/platforms/wrappers/ec_wrapper.py b/autosubmit/platforms/wrappers/ec_wrapper.py index 7e848dd1e..67caebf80 100644 --- a/autosubmit/platforms/wrappers/ec_wrapper.py +++ b/autosubmit/platforms/wrappers/ec_wrapper.py @@ -42,6 +42,7 @@ class EcWrapper(object): #PBS -l EC_total_tasks={4} #PBS -l EC_hyperthreads=1 {7} + {9} # ############################################################################### @@ -73,7 +74,8 @@ class EcWrapper(object): done """.format(filename, queue, project, wallclock, num_procs, ' '.join(str(s) for s in job_scripts), kwargs['expid'], - cls.dependency_directive(dependency), kwargs['rootdir'])) + cls.dependency_directive(dependency), kwargs['rootdir'], + '\n'.ljust(13).join(str(s) for s in kwargs['directives']))) @classmethod def horizontal(cls, filename, queue, project, wallclock, num_procs, _, job_scripts, dependency, **kwargs): @@ -92,6 +94,7 @@ class EcWrapper(object): #PBS -l EC_total_tasks={4} #PBS -l EC_hyperthreads=1 {7} + {9} # ############################################################################### @@ -127,7 +130,8 @@ class EcWrapper(object): done """.format(filename, queue, project, wallclock, num_procs, ' '.join(str(s) for s in job_scripts), kwargs['expid'], - cls.dependency_directive(dependency), kwargs['rootdir'])) + cls.dependency_directive(dependency), kwargs['rootdir'], + '\n'.ljust(13).join(str(s) for s in kwargs['directives']))) @classmethod def dependency_directive(cls, dependency): diff --git a/autosubmit/platforms/wrappers/lsf_wrapper.py b/autosubmit/platforms/wrappers/lsf_wrapper.py index a2d220595..ffc1dc7ad 100644 --- a/autosubmit/platforms/wrappers/lsf_wrapper.py +++ b/autosubmit/platforms/wrappers/lsf_wrapper.py @@ -25,7 +25,7 @@ class LsfWrapper(object): """Class to handle wrappers on LSF platforms""" @classmethod - def array(cls, filename, array_id, wallclock, num_procs): + def array(cls, filename, array_id, wallclock, num_procs, **kwargs): return textwrap.dedent("""\ ############################################################################### # {0} @@ -37,13 +37,15 @@ class LsfWrapper(object): #BSUB -eo {0}.%I.err #BSUB -W {2} #BSUB -n {3} + {4} # ############################################################################### SCRIPT=$(cat {0}.$LSB_JOBINDEX | awk 'NR==1') chmod +x $SCRIPT ./$SCRIPT - """.format(filename, array_id, wallclock, num_procs)) + """.format(filename, array_id, wallclock, num_procs, + '\n'.ljust(13).join(str(s) for s in kwargs['directives']))) @classmethod def vertical(cls, filename, queue, project, wallclock, num_procs, job_scripts, dependency, **kwargs): @@ -61,6 +63,7 @@ class LsfWrapper(object): #BSUB -W {3} #BSUB -n {4} {6} + {7} # ############################################################################### @@ -95,7 +98,8 @@ class LsfWrapper(object): print "The job ", current.template," has FAILED" os._exit(1) """.format(filename, queue, project, wallclock, num_procs, str(job_scripts), - cls.dependency_directive(dependency))) + cls.dependency_directive(dependency), + '\n'.ljust(13).join(str(s) for s in kwargs['directives']))) @classmethod def horizontal(cls, filename, queue, project, wallclock, num_procs, num_jobs, job_scripts, dependency, **kwargs): @@ -113,6 +117,7 @@ class LsfWrapper(object): #BSUB -W {3} #BSUB -n {4} {7} + {10} # ############################################################################### @@ -158,7 +163,8 @@ class LsfWrapper(object): else: print "The job ", pid.template," has FAILED" """.format(filename, queue, project, wallclock, num_procs, (int(num_procs) / num_jobs), - str(job_scripts), cls.dependency_directive(dependency), "${LSB_DJOB_HOSTFILE}", "${LSB_JOBID}")) + str(job_scripts), cls.dependency_directive(dependency), "${LSB_DJOB_HOSTFILE}", + "${LSB_JOBID}", '\n'.ljust(13).join(str(s) for s in kwargs['directives']))) @classmethod def dependency_directive(cls, dependency): diff --git a/autosubmit/platforms/wrappers/slurm_wrapper.py b/autosubmit/platforms/wrappers/slurm_wrapper.py index 6f11cfec4..384aafb5a 100644 --- a/autosubmit/platforms/wrappers/slurm_wrapper.py +++ b/autosubmit/platforms/wrappers/slurm_wrapper.py @@ -33,13 +33,14 @@ class SlurmWrapper(object): ############################################################################### # #SBATCH -J {0} - #SBATCH -p {1} + {1} #SBATCH -A {2} #SBATCH -o {0}.out #SBATCH -e {0}.err #SBATCH -t {3}:00 #SBATCH -n {4} {6} + {7} # ############################################################################### @@ -73,8 +74,9 @@ class SlurmWrapper(object): else: print "The job ", current.template," has FAILED" os._exit(1) - """.format(filename, queue, project, wallclock, num_procs, str(job_scripts), - cls.dependency_directive(dependency))) + """.format(filename, cls.queue_directive(queue), project, wallclock, num_procs, str(job_scripts), + cls.dependency_directive(dependency), + '\n'.ljust(13).join(str(s) for s in kwargs['directives']))) @classmethod def horizontal(cls, filename, queue, project, wallclock, num_procs, _, job_scripts, dependency, **kwargs): @@ -85,13 +87,14 @@ class SlurmWrapper(object): ############################################################################### # #SBATCH -J {0} - #SBATCH -p {1} + {1} #SBATCH -A {2} #SBATCH -o {0}.out #SBATCH -e {0}.err #SBATCH -t {3}:00 #SBATCH -n {4} {6} + {7} # ############################################################################### @@ -133,9 +136,14 @@ class SlurmWrapper(object): print "The job ", pid.template," has been COMPLETED" else: print "The job ", pid.template," has FAILED" - """.format(filename, queue, project, wallclock, num_procs, str(job_scripts), - cls.dependency_directive(dependency))) + """.format(filename, cls.queue_directive(queue), project, wallclock, num_procs, str(job_scripts), + cls.dependency_directive(dependency), + '\n'.ljust(13).join(str(s) for s in kwargs['directives']))) @classmethod def dependency_directive(cls, dependency): return '#' if dependency is None else '#SBATCH --dependency=afterok:{0}'.format(dependency) + + @classmethod + def queue_directive(cls, queue): + return '#' if queue == '' else '#SBATCH -p {0}'.format(queue) \ No newline at end of file -- GitLab