diff --git a/CHANGELOG b/CHANGELOG index 3f01bb2057baf954312f046daade44089623c187..f6e5334aa6e8249d92bb85e96eb74746a2592a5a 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,13 @@ +3.9.0 + Custom directives for the HPC resource manager headers + can be added on platforms and jobs configuration files + ~ only paramiko (LSF, SLURM and PBS) + First version with migrate experiments (to another user) + On CCA, TASKS and THREADS can be expressed in lots (e.g. 127:1) + Some bug fixes: + - QUEUE on slurm specified on directive qos instead of partition + - Variable expansion on CCA (ECMWF) headers + 3.8.1 First version with job packages ~ only paramiko (LSF, SLURM and PBS) - Vertical diff --git a/VERSION b/VERSION index f2807196747ffcee4ae8a36604b9cff7ebeed9ca..a5c4c763394f2d3796e7145b038054d03d66861a 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -3.8.1 +3.9.0 diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 5530ba3f50c92383756f7f485c5ddb959e81f4e1..bf434b4efce80b194216351654b2fd8586822ccf 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -50,6 +50,7 @@ import random import signal import datetime import portalocker +import pwd from pkg_resources import require, resource_listdir, resource_exists, resource_string from distutils.util import strtobool @@ -214,6 +215,13 @@ class Autosubmit: subparser.add_argument('--hide', action='store_true', default=False, help='hides plot window') + # Migrate + subparser = subparsers.add_parser('migrate', description="Migrate experiments from current user to another") + subparser.add_argument('expid', help='experiment identifier') + group = subparser.add_mutually_exclusive_group(required=True) + group.add_argument('-o', '--offer', action="store_true", default=False, help='Offer experiment') + group.add_argument('-p', '--pickup', action="store_true", default=False, help='Pick-up released experiment') + # Check subparser = subparsers.add_parser('check', description="check configuration for specified 
experiment") subparser.add_argument('expid', help='experiment identifier') @@ -340,6 +348,8 @@ class Autosubmit: return Autosubmit.recovery(args.expid, args.noplot, args.save, args.all, args.hide) elif args.command == 'check': return Autosubmit.check(args.expid) + elif args.command == 'migrate': + return Autosubmit.migrate(args.expid, args.offer, args.pickup) elif args.command == 'create': return Autosubmit.create(args.expid, args.noplot, args.hide, args.output) elif args.command == 'configure': @@ -499,7 +509,8 @@ class Autosubmit: Log.debug("Creating temporal directory...") exp_id_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, exp_id) - os.mkdir(os.path.join(exp_id_path, "tmp"), 0o775) + os.mkdir(os.path.join(exp_id_path, "tmp")) + os.chmod(os.path.join(exp_id_path, "tmp"), 0o775) Log.debug("Creating pkl directory...") os.mkdir(os.path.join(exp_id_path, "pkl")) @@ -1052,6 +1063,105 @@ class Autosubmit: return True + @staticmethod + def migrate(experiment_id, offer, pickup): + """ + Migrates experiment files from current to other user. + It takes mapping information for new user from config files. 
+ + :param experiment_id: experiment identifier: + :param pickup: + :param offer: + """ + log_file = os.path.join(BasicConfig.LOCAL_ROOT_DIR, "ASlogs", 'migrate_{0}.log'.format(experiment_id)) + Log.set_file(log_file) + + if offer: + Log.info('Migrating experiment {0}'.format(experiment_id)) + as_conf = AutosubmitConfig(experiment_id, BasicConfig, ConfigParserFactory()) + if not as_conf.check_conf_files(): + Log.critical('Can not proceed with invalid configuration') + return False + + submitter = Autosubmit._get_submitter(as_conf) + submitter.load_platforms(as_conf) + if submitter.platforms is None: + return False + + Log.info("Checking remote platforms") + platforms = filter(lambda x: x not in ['local', 'LOCAL'], submitter.platforms) + for platform in platforms: + Log.info("Updating {0} platform configuration with target user", platform) + if not as_conf.get_migrate_user_to(platform): + Log.critical("Missing target user in platforms configuration file") + return False + + as_conf.set_new_user(platform, as_conf.get_migrate_user_to(platform)) + Log.info("User in platform configuration file successfully updated to {0}", + as_conf.get_migrate_user_to(platform)) + + if as_conf.get_migrate_project_to(platform): + Log.info("Updating {0} platform configuration with target project", platform) + as_conf.set_new_project(platform, as_conf.get_migrate_project_to(platform)) + Log.info("Project in platform configuration file successfully updated to {0}", + as_conf.get_migrate_user_to(platform)) + else: + Log.warning("Project in platforms configuration file remains unchanged") + + Log.info("Moving remote files/dirs on {0}", platform) + p = submitter.platforms[platform] + Log.info("Moving from {0} to {1}", os.path.join(p.root_dir), + os.path.join(p.temp_dir, experiment_id)) + if not p.move_file(os.path.join(p.root_dir), os.path.join(p.temp_dir, experiment_id)): + Log.critical("The files/dirs on {0} cannot be moved to {1}.", p.root_dir, + os.path.join(p.temp_dir, experiment_id)) + 
return False + + Log.result("Files/dirs on {0} have been successfully offered", platform) + + Log.info("Moving local files/dirs") + if not Autosubmit.archive(experiment_id, False): + Log.critical("The experiment cannot be offered") + return False + + Log.result("The experiment has been successfully offered.") + + elif pickup: + Log.info('Migrating experiment {0}'.format(experiment_id)) + Log.info("Moving local files/dirs") + if not Autosubmit.unarchive(experiment_id): + Log.critical("The experiment cannot be picked up") + return False + Log.info("Local files/dirs have been successfully picked up") + as_conf = AutosubmitConfig(experiment_id, BasicConfig, ConfigParserFactory()) + if not as_conf.check_conf_files(): + Log.critical('Can not proceed with invalid configuration') + return False + + Log.info("Checking remote platforms") + submitter = Autosubmit._get_submitter(as_conf) + submitter.load_platforms(as_conf) + if submitter.platforms is None: + return False + + platforms = filter(lambda x: x not in ['local', 'LOCAL'], submitter.platforms) + for platform in platforms: + Log.info("Copying remote files/dirs on {0}", platform) + p = submitter.platforms[platform] + Log.info("Copying from {0} to {1}", os.path.join(p.temp_dir, experiment_id), + os.path.join(p.root_dir)) + if not p.send_command("cp -r " + os.path.join(p.temp_dir, experiment_id) + " " + + os.path.join(p.root_dir)): + Log.critical("The files/dirs on {0} cannot be copied to {1}.", + os.path.join(p.temp_dir, experiment_id), p.root_dir) + return False + + Log.result("Files/dirs on {0} have been successfully picked up", platform) + + Log.result("The experiment has been successfully picked up.") + + return True + @staticmethod def check(experiment_id): """ @@ -1444,11 +1554,13 @@ class Autosubmit: return True @staticmethod - def archive(expid): + def archive(expid, clean=True): """ Archives an experiment: call clean (if experiment is of version 3 or later), compress folder to tar.gz and moves to year's folder + 
:param clean: + :return: :param expid: experiment identifier :type expid: str """ @@ -1459,14 +1571,15 @@ class Autosubmit: Log.warning("Does an experiment with the given id exist?") return 1 - Log.set_file(os.path.join(BasicConfig.LOCAL_ROOT_DIR, "ASlogs", 'archive{0}.log'.format(expid))) + Log.set_file(os.path.join(BasicConfig.LOCAL_ROOT_DIR, "ASlogs", 'archive_{0}.log'.format(expid))) exp_folder = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid) - # Cleaning to reduce file size. - version = get_autosubmit_version(expid) - if version is not None and version.startswith('3') and not Autosubmit.clean(expid, True, True, True, False): - Log.critical("Can not archive project. Clean not successful") - return False + if clean: + # Cleaning to reduce file size. + version = get_autosubmit_version(expid) + if version is not None and version.startswith('3') and not Autosubmit.clean(expid, True, True, True, False): + Log.critical("Can not archive project. Clean not successful") + return False # Getting year of last completed. 
If not, year of expid folder year = None @@ -1491,6 +1604,7 @@ class Autosubmit: with tarfile.open(os.path.join(year_path, '{0}.tar.gz'.format(expid)), "w:gz") as tar: tar.add(exp_folder, arcname='') tar.close() + os.chmod(os.path.join(year_path, '{0}.tar.gz'.format(expid)), 0o775) except Exception as e: Log.critical("Can not write tar file: {0}".format(e)) return False @@ -1516,7 +1630,7 @@ class Autosubmit: :type experiment_id: str """ BasicConfig.read() - Log.set_file(os.path.join(BasicConfig.LOCAL_ROOT_DIR, "ASlogs", 'unarchive{0}.log'.format(experiment_id))) + Log.set_file(os.path.join(BasicConfig.LOCAL_ROOT_DIR, "ASlogs", 'unarchive_{0}.log'.format(experiment_id))) exp_folder = os.path.join(BasicConfig.LOCAL_ROOT_DIR, experiment_id) if os.path.exists(exp_folder): diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 70356b200bfc2d840b14978c58ac8349a1e14d2b..83769e5e1586df4e290fe83ea75285442977b72f 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -162,7 +162,7 @@ class AutosubmitConfig(object): :return: threads needed :rtype: str """ - return int(self._jobs_parser.get_option(section, 'THREADS', 1)) + return str(self._jobs_parser.get_option(section, 'THREADS', 1)) def get_tasks(self, section): """ @@ -170,9 +170,9 @@ class AutosubmitConfig(object): :param section: job type :type section: str :return: tasks (processes) per host - :rtype: int + :rtype: str """ - return int(self._jobs_parser.get_option(section, 'TASKS', 0)) + return str(self._jobs_parser.get_option(section, 'TASKS', 0)) def get_scratch_free_space(self, section): """ @@ -204,6 +204,58 @@ class AutosubmitConfig(object): """ return str(self._jobs_parser.get_option(section, 'MEMORY_PER_TASK', '')) + def get_migrate_user_to(self, section): + """ + Returns the user to change to from platform config file. 
+ + :return: migrate user to + :rtype: str + """ + return self._platforms_parser.get_option(section, 'USER_TO', '').lower() + + def set_new_user(self, section, new_user): + """ + Sets new user for given platform + :param new_user: + :param section: platform name + :type: str + """ + content = open(self._platforms_parser_file).read() + if re.search(section, content): + content = content.replace(re.search('USER =.*', content).group(0), "USER = " + new_user) + open(self._platforms_parser_file, 'w').write(content) + + def get_migrate_project_to(self, section): + """ + Returns the project to change to from platform config file. + + :return: migrate project to + :rtype: str + """ + return self._platforms_parser.get_option(section, 'PROJECT_TO', '').lower() + + def set_new_project(self, section, new_project): + """ + Sets new project for given platform + :param new_project: + :param section: platform name + :type: str + """ + content = open(self._platforms_parser_file).read() + if re.search(section, content): + content = content.replace(re.search('PROJECT =.*', content).group(0), "PROJECT = " + new_project) + open(self._platforms_parser_file, 'w').write(content) + + def get_custom_directives(self, section): + """ + Gets custom directives needed for the given job type + :param section: job type + :type section: str + :return: custom directives needed + :rtype: str + """ + return str(self._jobs_parser.get_option(section, 'CUSTOM_DIRECTIVES', '')) + def check_conf_files(self): """ Checks configuration files (autosubmit, experiment jobs and platforms), looking for invalid values, missing diff --git a/autosubmit/config/files/autosubmit.conf b/autosubmit/config/files/autosubmit.conf index 78dc9f4239d3fcd5a733550bcc0cca17f23cc0e2..90770b8d0a9611157495ea14d9dfa8518faae208 100644 --- a/autosubmit/config/files/autosubmit.conf +++ b/autosubmit/config/files/autosubmit.conf @@ -36,3 +36,7 @@ API = paramiko TYPE = pkl # Defines if the remote logs will be copied to the local platform. 
Default = True. COPY_REMOTE_LOGS = True + +[migrate] +# Changes experiment files owner. +TO_USER = diff --git a/autosubmit/config/files/jobs.conf b/autosubmit/config/files/jobs.conf index 8caccb5cbfc5044c64e351377ba51f4dc5517869..eaf192ce6b081598bb98537b9079061013f74d8f 100644 --- a/autosubmit/config/files/jobs.conf +++ b/autosubmit/config/files/jobs.conf @@ -49,7 +49,9 @@ # TYPE = bash ## Synchronize a chunk job with its dependency chunks at a 'date' or 'member' level # SYNCHRONIZE = date | member - +## Optional. Custom directives for the resource manager of the platform used for that job. +## Put as many as you wish in json formatted array. +# CUSTOM_DIRECTIVES = ["#PBS -v myvar=value", "#PBS -v othervar=value"] [LOCAL_SETUP] FILE = LOCAL_SETUP.sh @@ -91,4 +93,4 @@ WALLCLOCK = 00:05 FILE = TRANSFER.sh PLATFORM = LOCAL DEPENDENCIES = CLEAN -RUNNING = member \ No newline at end of file +RUNNING = member diff --git a/autosubmit/config/files/platforms.conf b/autosubmit/config/files/platforms.conf index 0fa5eee7f3bd6719eade3733eb53b9c0d756786e..a06f59dd0c0a234b8027d5fe180590487bdb8227 100644 --- a/autosubmit/config/files/platforms.conf +++ b/autosubmit/config/files/platforms.conf @@ -16,8 +16,12 @@ # ADD_PROJECT_TO_HOST = False ## User for the machine scheduler. Required # USER = +## Optional. If given, Autosubmit will change owner of files in given platform when using migrate_exp. +# USER_TO = ## Path to the scratch directory for the machine. Required. # SCRATCH_DIR = /scratch +## Path to the machine's temporary directory for migrate purposes. +# TEMP_DIR = /tmp ## If true, Autosubmit test command can use this queue as a main queue. Defaults to False # TEST_SUITE = False ## If given, Autosubmit will add jobs to the given queue. Required for some platforms. @@ -45,4 +49,7 @@ # MAX_WALLCLOCK = 72:00 ## Max processors number per job submitted to the HPC. If not specified, defaults to empty. ## Optional. Required for wrappers. 
-# MAX_PROCESSORS = 1 \ No newline at end of file +# MAX_PROCESSORS = 1 +## Optional. Custom directives for the resource manager of the platform used. +## Put as many as you wish in a json formatted array. +# CUSTOM_DIRECTIVES = ["#PBS -v myvar=value", "#PBS -v othervar=value"] diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index d0d8a18420882ef94ca6f8c17dd74130be7d2682..dd60875a0954dbb5bf2228d1f22909d191ca9d0d 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -23,6 +23,7 @@ Main module for autosubmit. Only contains an interface class to all functionalit import os import re import time +import json from autosubmit.job.job_common import Status, Type from autosubmit.job.job_common import StatisticsSnippetBash, StatisticsSnippetPython @@ -59,8 +60,8 @@ class Job(object): self.platform_name = None self.section = None self.wallclock = None - self.tasks = None - self.threads = None + self.tasks = '1' + self.threads = '1' self.processors = '1' self.memory = '' self.memory_per_task = '' @@ -73,6 +74,7 @@ class Job(object): self.date_format = '' self.type = Type.BASH self.scratch_free_space = None + self.custom_directives = [] self.id = job_id self.file = None @@ -610,6 +612,15 @@ class Job(object): self.scratch_free_space = as_conf.get_scratch_free_space(self.section) if self.scratch_free_space == 0: self.scratch_free_space = job_platform.scratch_free_space + self.custom_directives = as_conf.get_custom_directives(self.section) + if self.custom_directives != '': + self.custom_directives = json.loads(as_conf.get_custom_directives(self.section)) + if job_platform.custom_directives: + self.custom_directives = self.custom_directives + json.loads(job_platform.custom_directives) + elif job_platform.custom_directives: + self.custom_directives = json.loads(job_platform.custom_directives) + elif self.custom_directives == '': + self.custom_directives = [] parameters['NUMPROC'] = self.processors parameters['MEMORY'] = self.memory @@ -619,6 +630,7 @@ class 
Job(object): parameters['WALLCLOCK'] = self.wallclock parameters['TASKTYPE'] = self.section parameters['SCRATCH_FREE_SPACE'] = self.scratch_free_space + parameters['CUSTOM_DIRECTIVES'] = self.custom_directives parameters['CURRENT_ARCH'] = job_platform.name parameters['CURRENT_HOST'] = job_platform.host diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index aa01e9d9300a809490af2c2468d129810f2b819b..1775768194ce83e3b3423d1e076f8663f55ba35c 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -292,8 +292,8 @@ class DicJobs: job.queue = self.get_option(section, "QUEUE", None) job.check = self.get_option(section, "CHECK", 'True').lower() job.processors = str(self.get_option(section, "PROCESSORS", 1)) - job.threads = self.get_option(section, "THREADS", '') - job.tasks = self.get_option(section, "TASKS", '') + job.threads = str(self.get_option(section, "THREADS", 1)) + job.tasks = str(self.get_option(section, "TASKS", 1)) job.memory = self.get_option(section, "MEMORY", '') job.memory_per_task = self.get_option(section, "MEMORY_PER_TASK", '') job.wallclock = self.get_option(section, "WALLCLOCK", '') diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 11d206d15b076797916a7be9c92f005113016d5f..bb19285f56c01a671b9f32da19641d46cd4e310d 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -45,6 +45,7 @@ class JobPackageBase(object): try: self._tmp_path = jobs[0]._tmp_path self._platform = jobs[0].platform + self._custom_directives = set() for job in jobs: if job.platform != self._platform or job.platform is None: raise Exception('Only one valid platform per package') @@ -80,6 +81,8 @@ class JobPackageBase(object): if not job.check_script(configuration, parameters): raise WrongTemplateException(job.name) job.update_parameters(configuration, parameters) + # looking for directives on jobs + self._custom_directives = self._custom_directives | set(job.custom_directives) 
self._create_scripts(configuration) self._send_files() self._do_submission() @@ -186,7 +189,8 @@ class JobPackageArray(JobPackageBase): def _create_common_script(self, filename): script_content = self.platform.header.array_header(filename, self._array_size_id, self._wallclock, - self._num_processors) + self._num_processors, + directives=self.platform.custom_directives) filename += '.cmd' open(os.path.join(self._tmp_path, filename), 'w').write(script_content) os.chmod(os.path.join(self._tmp_path, filename), 0o775) @@ -381,7 +385,9 @@ class JobPackageVertical(JobPackageThread): def _common_script_content(self): return self.platform.wrapper.vertical(self._name, self._queue, self._project, self._wallclock, self._num_processors, - self._jobs_scripts, self._job_dependency, expid=self._expid) + self._jobs_scripts, self._job_dependency, expid=self._expid, + rootdir=self.platform.root_dir, + directives=self._custom_directives) class JobPackageHorizontal(JobPackageThread): @@ -402,4 +408,6 @@ class JobPackageHorizontal(JobPackageThread): def _common_script_content(self): return self.platform.wrapper.horizontal(self._name, self._queue, self._project, self._wallclock, self._num_processors, len(self.jobs), self._jobs_scripts, - self._job_dependency, expid=self._expid) + self._job_dependency, expid=self._expid, + rootdir=self.platform.root_dir, + directives=self._custom_directives) diff --git a/autosubmit/platforms/headers/ec_cca_header.py b/autosubmit/platforms/headers/ec_cca_header.py index 9037c9c59d2391e9161343aa546d6413795b6245..4b51556e92096829db0a5a0a3b9cdff26a8764a2 100644 --- a/autosubmit/platforms/headers/ec_cca_header.py +++ b/autosubmit/platforms/headers/ec_cca_header.py @@ -38,14 +38,14 @@ class EcCcaHeader(object): # noinspection PyMethodMayBeStatic def get_tasks_per_node(self, job): - if not isinstance(job.tasks, int): + if not isinstance(job.tasks, str): return "" else: return '#PBS -l EC_tasks_per_node={0}'.format(job.tasks) # noinspection PyMethodMayBeStatic 
def get_threads_per_task(self, job): - if not isinstance(job.threads, int): + if not isinstance(job.threads, str): return "" else: return '#PBS -l EC_threads_per_task={0}'.format(job.threads) @@ -80,6 +80,21 @@ class EcCcaHeader(object): return "#PBS -l EC_hyperthreads=2" return "#PBS -l EC_hyperthreads=1" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT @@ -91,6 +106,7 @@ class EcCcaHeader(object): #PBS -q ns #PBS -l walltime=%WALLCLOCK%:00 #PBS -l EC_billing_account=%CURRENT_BUDG% + %CUSTOM_DIRECTIVES% # ############################################################################### @@ -112,6 +128,7 @@ class EcCcaHeader(object): %HYPERTHREADING_DIRECTIVE% #PBS -l walltime=%WALLCLOCK%:00 #PBS -l EC_billing_account=%CURRENT_BUDG% + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/headers/ec_header.py b/autosubmit/platforms/headers/ec_header.py index ff1eaadc060e0866f44e8ccf9960d38284e70c65..3482899fed8266d73fff70737ca336fcf6408464 100644 --- a/autosubmit/platforms/headers/ec_header.py +++ b/autosubmit/platforms/headers/ec_header.py @@ -36,6 +36,21 @@ class EcHeader(object): # There is no queue, so directive is empty return "" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job 
+ :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + # noinspection PyPep8 SERIAL = textwrap.dedent("""\ ############################################################################### @@ -52,6 +67,7 @@ class EcHeader(object): #@ resources = ConsumableCpus(1) ConsumableMemory(1200mb) #@ wall_clock_limit = %WALLCLOCK%:00 #@ platforms + %CUSTOM_DIRECTIVES% # ############################################################################### """) @@ -74,6 +90,7 @@ class EcHeader(object): #@ total_tasks = %NUMPROC% #@ wall_clock_limit = %WALLCLOCK%:00 #@ platforms + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/headers/lsf_header.py b/autosubmit/platforms/headers/lsf_header.py index 331ffe154c4ba5f21ef48547833787da1155f5ff..6634b2a0d04e7ca7c329dc3f9134e851571e1281 100644 --- a/autosubmit/platforms/headers/lsf_header.py +++ b/autosubmit/platforms/headers/lsf_header.py @@ -59,8 +59,23 @@ class LsfHeader(object): else: return "" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + @classmethod - def array_header(cls, filename, array_id, wallclock, num_processors): + def array_header(cls, filename, array_id, wallclock, num_processors, **kwargs): return textwrap.dedent("""\ ############################################################################### # {0} @@ -72,16 +87,17 @@ class LsfHeader(object): 
#BSUB -eo {0}.%I.err #BSUB -W {2} #BSUB -n {3} + {4} # ############################################################################### SCRIPT=$(cat {0}.$LSB_JOBINDEX | awk 'NR==1') chmod +x $SCRIPT ./$SCRIPT - """.format(filename, array_id, wallclock, num_processors)) + """.format(filename, array_id, wallclock, num_processors, '\n'.join(str(s) for s in kwargs['directives']))) @classmethod - def thread_header(cls, filename, wallclock, num_processors, job_scripts, dependency_directive): + def thread_header(cls, filename, wallclock, num_processors, job_scripts, dependency_directive, **kwargs): return textwrap.dedent("""\ #!/usr/bin/env python ############################################################################### @@ -94,6 +110,7 @@ class LsfHeader(object): #BSUB -W {1} #BSUB -n {2} {4} + {5} # ############################################################################### @@ -127,7 +144,8 @@ class LsfHeader(object): else: print "The job ", current.template," has FAILED" os._exit(1) - """.format(filename, wallclock, num_processors, str(job_scripts), dependency_directive)) + """.format(filename, wallclock, num_processors, str(job_scripts), dependency_directive, + '\n'.join(str(s) for s in kwargs['directives']))) SERIAL = textwrap.dedent("""\ ############################################################################### @@ -141,6 +159,7 @@ class LsfHeader(object): #BSUB -W %WALLCLOCK% #BSUB -n %NUMPROC% %EXCLUSIVITY_DIRECTIVE% + %CUSTOM_DIRECTIVES% # ############################################################################### """) @@ -158,6 +177,7 @@ class LsfHeader(object): #BSUB -n %NUMPROC% %TASKS_PER_NODE_DIRECTIVE% %SCRATCH_FREE_SPACE_DIRECTIVE% + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/headers/pbs10_header.py b/autosubmit/platforms/headers/pbs10_header.py index 3197603e356b36f9b28a7f57e355ce73683c640f..33aa06d9f3f0d651cd88e08aca0153a6a4274ff5 100644 --- 
a/autosubmit/platforms/headers/pbs10_header.py +++ b/autosubmit/platforms/headers/pbs10_header.py @@ -36,6 +36,21 @@ class Pbs10Header(object): # There is no queue, so directive is empty return "" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT @@ -45,6 +60,7 @@ class Pbs10Header(object): #PBS -q serial #PBS -l cput=%WALLCLOCK%:00 #PBS -A %CURRENT_BUDG% + %CUSTOM_DIRECTIVES% # ############################################################################### """) @@ -59,6 +75,7 @@ class Pbs10Header(object): #PBS -l mppnppn=32 #PBS -l walltime=%WALLCLOCK%:00 #PBS -A %CURRENT_BUDG% + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/headers/pbs11_header.py b/autosubmit/platforms/headers/pbs11_header.py index 9f9919799139297098ebe7dfd7f7a2a539f537b4..423fe148caacadb0d8894db2d963a5c700ba2945 100644 --- a/autosubmit/platforms/headers/pbs11_header.py +++ b/autosubmit/platforms/headers/pbs11_header.py @@ -36,6 +36,21 @@ class Pbs11Header(object): # There is no queue, so directive is empty return "" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if 
job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT @@ -48,6 +63,7 @@ class Pbs11Header(object): #PBS -l walltime=%WALLCLOCK% #PBS -e %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID% #PBS -o %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID% + %CUSTOM_DIRECTIVES% # ############################################################################### """) @@ -64,6 +80,7 @@ class Pbs11Header(object): #PBS -l walltime=%WALLCLOCK% #PBS -e %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID% #PBS -o %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID% + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/headers/pbs12_header.py b/autosubmit/platforms/headers/pbs12_header.py index 014ebb63a9c5da028d06ada447ba188f507a3a7d..4407c8a326b6bb6138562d3187f6756165c48f21 100644 --- a/autosubmit/platforms/headers/pbs12_header.py +++ b/autosubmit/platforms/headers/pbs12_header.py @@ -36,6 +36,21 @@ class Pbs12Header(object): # There is no queue, so directive is empty return "" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT @@ -45,6 +60,7 @@ class 
Pbs12Header(object): #PBS -l select=serial=true:ncpus=1 #PBS -l walltime=%WALLCLOCK%:00 #PBS -A %CURRENT_BUDG% + %CUSTOM_DIRECTIVES% # ############################################################################### """) @@ -58,6 +74,7 @@ class Pbs12Header(object): #PBS -l select=%NUMPROC% #PBS -l walltime=%WALLCLOCK%:00 #PBS -A %CURRENT_BUDG% + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/headers/ps_header.py b/autosubmit/platforms/headers/ps_header.py index 436bb08939582888368a4fe903b4daad0d067c0d..3286a1407d9e916d5d7294e52c470ffd9581d7b5 100644 --- a/autosubmit/platforms/headers/ps_header.py +++ b/autosubmit/platforms/headers/ps_header.py @@ -36,6 +36,19 @@ class PsHeader(object): # There is no queue, so directive is empty return "" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + return "" + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT diff --git a/autosubmit/platforms/headers/sge_header.py b/autosubmit/platforms/headers/sge_header.py index 540c5f642f4c9e042d67d17270b93c26de82bddf..bcfb5d89226149b4740293d79e1d8aff4a92fe67 100644 --- a/autosubmit/platforms/headers/sge_header.py +++ b/autosubmit/platforms/headers/sge_header.py @@ -39,6 +39,21 @@ class SgeHeader(object): else: return "$ -q {0}".format(job.parameters['CURRENT_QUEUE']) + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is 
no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT @@ -52,6 +67,7 @@ class SgeHeader(object): #$ -l h_rt=%WALLCLOCK%:00 #$ -l s_rt=%WALLCLOCK%:00 #%QUEUE_DIRECTIVE% + %CUSTOM_DIRECTIVES% # ############################################################################### """) @@ -70,6 +86,7 @@ class SgeHeader(object): #$ -l s_rt=%WALLCLOCK%:00 #$ -pe orte %NUMPROC% #%QUEUE_DIRECTIVE% + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/headers/slurm_header.py b/autosubmit/platforms/headers/slurm_header.py index 677a26d569fe005d34ee11981d8fd782004a0d44..d47d1dcbc25eff68862e8352ba052be6c7b51c36 100644 --- a/autosubmit/platforms/headers/slurm_header.py +++ b/autosubmit/platforms/headers/slurm_header.py @@ -37,7 +37,7 @@ class SlurmHeader(object): if job.parameters['CURRENT_QUEUE'] == '': return "" else: - return "SBATCH -p {0}".format(job.parameters['CURRENT_QUEUE']) + return "SBATCH --qos={0}".format(job.parameters['CURRENT_QUEUE']) # noinspection PyMethodMayBeStatic,PyUnusedLocal def get_account_directive(self, job): @@ -84,6 +84,21 @@ class SlurmHeader(object): return "SBATCH --mem-per-cpu {0}".format(job.parameters['MEMORY_PER_TASK']) return "" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + SERIAL = 
textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT @@ -98,6 +113,7 @@ class SlurmHeader(object): #SBATCH -J %JOBNAME% #SBATCH -o %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%OUT_LOG_DIRECTIVE% #SBATCH -e %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%ERR_LOG_DIRECTIVE% + %CUSTOM_DIRECTIVES% # ############################################################################### """) @@ -116,6 +132,7 @@ class SlurmHeader(object): #SBATCH -J %JOBNAME% #SBATCH -o %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%OUT_LOG_DIRECTIVE% #SBATCH -e %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%ERR_LOG_DIRECTIVE% + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 0bb98c347dd27da63c8d5b72f70dbf956d7a7ddf..71add2a03eb369373913040f1100c0bd116013d1 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -414,6 +414,8 @@ class ParamikoPlatform(Platform): header = header.replace('%THREADS_PER_TASK_DIRECTIVE%', self.header.get_threads_per_task(job)) if hasattr(self.header, 'get_scratch_free_space'): header = header.replace('%SCRATCH_FREE_SPACE_DIRECTIVE%', self.header.get_scratch_free_space(job)) + if hasattr(self.header, 'get_custom_directives'): + header = header.replace('%CUSTOM_DIRECTIVES%', self.header.get_custom_directives(job)) if hasattr(self.header, 'get_exclusivity'): header = header.replace('%EXCLUSIVITY_DIRECTIVE%', self.header.get_exclusivity(job)) if hasattr(self.header, 'get_account_directive'): diff --git a/autosubmit/platforms/paramiko_submitter.py b/autosubmit/platforms/paramiko_submitter.py index cc65991182cc025460b742e3f597493599ab27bd..404ab9eedbe5217a985bbf97a324bfee0530d35a 
100644 --- a/autosubmit/platforms/paramiko_submitter.py +++ b/autosubmit/platforms/paramiko_submitter.py @@ -22,6 +22,8 @@ import time import os +from bscearth.utils.log import Log + from autosubmit.config.basicConfig import BasicConfig from autosubmit.config.config_common import AutosubmitConfig from submitter import Submitter @@ -69,6 +71,7 @@ class ParamikoSubmitter(Submitter): local_platform.max_waiting_jobs = asconf.get_max_waiting_jobs() local_platform.total_jobs = asconf.get_total_jobs() local_platform.scratch = os.path.join(BasicConfig.LOCAL_ROOT_DIR, asconf.expid, BasicConfig.LOCAL_TMP_DIR) + local_platform.temp_dir = os.path.join(BasicConfig.LOCAL_ROOT_DIR, 'ASlogs') local_platform.root_dir = os.path.join(BasicConfig.LOCAL_ROOT_DIR, local_platform.expid) local_platform.host = 'localhost' platforms['local'] = local_platform @@ -127,10 +130,14 @@ class ParamikoSubmitter(Submitter): remote_platform.exclusivity = parser.get_option(section, 'EXCLUSIVITY', '').lower() remote_platform.user = parser.get_option(section, 'USER', None) remote_platform.scratch = parser.get_option(section, 'SCRATCH_DIR', None) + remote_platform.temp_dir = parser.get_option(section, 'TEMP_DIR', None) remote_platform._default_queue = parser.get_option(section, 'QUEUE', None) remote_platform._serial_queue = parser.get_option(section, 'SERIAL_QUEUE', None) remote_platform.processors_per_node = parser.get_option(section, 'PROCESSORS_PER_NODE', None) + remote_platform.custom_directives = parser.get_option(section, 'CUSTOM_DIRECTIVES', + None) + Log.debug("Custom directives from platform.conf: {0}".format(remote_platform.custom_directives)) remote_platform.scratch_free_space = parser.get_option(section, 'SCRATCH_FREE_SPACE', None) remote_platform.root_dir = os.path.join(remote_platform.scratch, remote_platform.project, diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 50334edaa4bc1bb68fc71e9d5397debb8d4b4b82..cef7957cdfa91f16c0665eea44745a5cbc3a64ac 
100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -27,6 +27,7 @@ class Platform(object): self._default_queue = None self.processors_per_node = None self.scratch_free_space = None + self.custom_directives = None self.host = '' self.user = '' self.project = '' @@ -35,6 +36,7 @@ class Platform(object): self.exclusivity = '' self.type = '' self.scratch = '' + self.temp_dir = '' self.root_dir = '' self.service = None self.scheduler = None @@ -128,6 +130,7 @@ class Platform(object): parameters['{0}EXCLUSIVITY'.format(prefix)] = self.exclusivity parameters['{0}TYPE'.format(prefix)] = self.type parameters['{0}SCRATCH_DIR'.format(prefix)] = self.scratch + parameters['{0}TEMP_DIR'.format(prefix)] = self.temp_dir parameters['{0}ROOTDIR'.format(prefix)] = self.root_dir parameters['{0}LOGDIR'.format(prefix)] = self.get_files_path() diff --git a/autosubmit/platforms/wrappers/ec_wrapper.py b/autosubmit/platforms/wrappers/ec_wrapper.py index 71ffdf57c3a93cbac834b6af06dcc5bf2afe385a..67caebf80aad6c6b32cb2ec88ac6d53e7ec3fb15 100644 --- a/autosubmit/platforms/wrappers/ec_wrapper.py +++ b/autosubmit/platforms/wrappers/ec_wrapper.py @@ -21,6 +21,7 @@ import textwrap # TODO: Refactor with kwargs +# TODO: Project is not EC_billing_account, use budget class EcWrapper(object): """Class to handle wrappers on ECMWF platform""" @@ -35,12 +36,13 @@ class EcWrapper(object): #PBS -N {0} #PBS -q {1} #PBS -l EC_billing_account={2} - #PBS -o $SCRATCH/{6}/LOG_{6}/{0}.out - #PBS -o $SCRATCH/{6}/LOG_{6}/{0}.err + #PBS -o {8}/LOG_{6}/{0}.out + #PBS -e {8}/LOG_{6}/{0}.err #PBS -l walltime={3}:00 #PBS -l EC_total_tasks={4} #PBS -l EC_hyperthreads=1 {7} + {9} # ############################################################################### @@ -66,12 +68,14 @@ class EcWrapper(object): echo "The job $script has been COMPLETED" else echo "The job $script has FAILED" + exit 1 fi i=$((i+1)) done """.format(filename, queue, project, wallclock, num_procs, ' '.join(str(s) for 
s in job_scripts), kwargs['expid'], - cls.dependency_directive(dependency))) + cls.dependency_directive(dependency), kwargs['rootdir'], + '\n'.ljust(13).join(str(s) for s in kwargs['directives']))) @classmethod def horizontal(cls, filename, queue, project, wallclock, num_procs, _, job_scripts, dependency, **kwargs): @@ -84,12 +88,13 @@ class EcWrapper(object): #PBS -N {0} #PBS -q {1} #PBS -l EC_billing_account={2} - #PBS -o $SCRATCH/{6}/LOG_{6}/{0}.out - #PBS -e $SCRATCH/{6}/LOG_{6}/{0}.err + #PBS -o {8}/LOG_{6}/{0}.out + #PBS -e {8}/LOG_{6}/{0}.err #PBS -l walltime={3}:00 #PBS -l EC_total_tasks={4} #PBS -l EC_hyperthreads=1 {7} + {9} # ############################################################################### @@ -125,8 +130,9 @@ class EcWrapper(object): done """.format(filename, queue, project, wallclock, num_procs, ' '.join(str(s) for s in job_scripts), kwargs['expid'], - cls.dependency_directive(dependency))) + cls.dependency_directive(dependency), kwargs['rootdir'], + '\n'.ljust(13).join(str(s) for s in kwargs['directives']))) @classmethod def dependency_directive(cls, dependency): - return '#' if dependency is None else '#PBS -W depend=afterok:{0}'.format(dependency) + return '#' if dependency is None else '#PBS -v depend=afterok:{0}'.format(dependency) diff --git a/autosubmit/platforms/wrappers/lsf_wrapper.py b/autosubmit/platforms/wrappers/lsf_wrapper.py index a2d2205956b6868c3da89026d03a92c0726a0f10..ffc1dc7ad7b3f5384ec26de2b752f669a48dc1b7 100644 --- a/autosubmit/platforms/wrappers/lsf_wrapper.py +++ b/autosubmit/platforms/wrappers/lsf_wrapper.py @@ -25,7 +25,7 @@ class LsfWrapper(object): """Class to handle wrappers on LSF platforms""" @classmethod - def array(cls, filename, array_id, wallclock, num_procs): + def array(cls, filename, array_id, wallclock, num_procs, **kwargs): return textwrap.dedent("""\ ############################################################################### # {0} @@ -37,13 +37,15 @@ class LsfWrapper(object): #BSUB -eo 
{0}.%I.err #BSUB -W {2} #BSUB -n {3} + {4} # ############################################################################### SCRIPT=$(cat {0}.$LSB_JOBINDEX | awk 'NR==1') chmod +x $SCRIPT ./$SCRIPT - """.format(filename, array_id, wallclock, num_procs)) + """.format(filename, array_id, wallclock, num_procs, + '\n'.ljust(13).join(str(s) for s in kwargs['directives']))) @classmethod def vertical(cls, filename, queue, project, wallclock, num_procs, job_scripts, dependency, **kwargs): @@ -61,6 +63,7 @@ class LsfWrapper(object): #BSUB -W {3} #BSUB -n {4} {6} + {7} # ############################################################################### @@ -95,7 +98,8 @@ class LsfWrapper(object): print "The job ", current.template," has FAILED" os._exit(1) """.format(filename, queue, project, wallclock, num_procs, str(job_scripts), - cls.dependency_directive(dependency))) + cls.dependency_directive(dependency), + '\n'.ljust(13).join(str(s) for s in kwargs['directives']))) @classmethod def horizontal(cls, filename, queue, project, wallclock, num_procs, num_jobs, job_scripts, dependency, **kwargs): @@ -113,6 +117,7 @@ class LsfWrapper(object): #BSUB -W {3} #BSUB -n {4} {7} + {10} # ############################################################################### @@ -158,7 +163,8 @@ class LsfWrapper(object): else: print "The job ", pid.template," has FAILED" """.format(filename, queue, project, wallclock, num_procs, (int(num_procs) / num_jobs), - str(job_scripts), cls.dependency_directive(dependency), "${LSB_DJOB_HOSTFILE}", "${LSB_JOBID}")) + str(job_scripts), cls.dependency_directive(dependency), "${LSB_DJOB_HOSTFILE}", + "${LSB_JOBID}", '\n'.ljust(13).join(str(s) for s in kwargs['directives']))) @classmethod def dependency_directive(cls, dependency): diff --git a/autosubmit/platforms/wrappers/slurm_wrapper.py b/autosubmit/platforms/wrappers/slurm_wrapper.py index 6f11cfec4fe4b4a9814e489a0ac8ea30a2b25d87..c37f000fb41b0abdb34b7df84ed7aa5efff3fde1 100644 --- 
a/autosubmit/platforms/wrappers/slurm_wrapper.py +++ b/autosubmit/platforms/wrappers/slurm_wrapper.py @@ -33,13 +33,14 @@ class SlurmWrapper(object): ############################################################################### # #SBATCH -J {0} - #SBATCH -p {1} + {1} #SBATCH -A {2} #SBATCH -o {0}.out #SBATCH -e {0}.err #SBATCH -t {3}:00 #SBATCH -n {4} {6} + {7} # ############################################################################### @@ -73,8 +74,9 @@ class SlurmWrapper(object): else: print "The job ", current.template," has FAILED" os._exit(1) - """.format(filename, queue, project, wallclock, num_procs, str(job_scripts), - cls.dependency_directive(dependency))) + """.format(filename, cls.queue_directive(queue), project, wallclock, num_procs, str(job_scripts), + cls.dependency_directive(dependency), + '\n'.ljust(13).join(str(s) for s in kwargs['directives']))) @classmethod def horizontal(cls, filename, queue, project, wallclock, num_procs, _, job_scripts, dependency, **kwargs): @@ -85,13 +87,14 @@ class SlurmWrapper(object): ############################################################################### # #SBATCH -J {0} - #SBATCH -p {1} + {1} #SBATCH -A {2} #SBATCH -o {0}.out #SBATCH -e {0}.err #SBATCH -t {3}:00 #SBATCH -n {4} {6} + {7} # ############################################################################### @@ -133,9 +136,14 @@ class SlurmWrapper(object): print "The job ", pid.template," has been COMPLETED" else: print "The job ", pid.template," has FAILED" - """.format(filename, queue, project, wallclock, num_procs, str(job_scripts), - cls.dependency_directive(dependency))) + """.format(filename, cls.queue_directive(queue), project, wallclock, num_procs, str(job_scripts), + cls.dependency_directive(dependency), + '\n'.ljust(13).join(str(s) for s in kwargs['directives']))) @classmethod def dependency_directive(cls, dependency): return '#' if dependency is None else '#SBATCH --dependency=afterok:{0}'.format(dependency) + + @classmethod + def 
queue_directive(cls, queue): + return '#' if queue == '' else '#SBATCH --qos={0}'.format(queue) \ No newline at end of file diff --git a/docs/source/usage.rst b/docs/source/usage.rst index a90c07aeb8d7a65b030d74d5689c3a1d57a873b1..069307499fe720bc82e7ec3bf7e79e378e4969e4 100644 --- a/docs/source/usage.rst +++ b/docs/source/usage.rst @@ -23,6 +23,7 @@ Command list -install Install database for Autosubmit on the configured folder -archive Clean, compress and remove from the experiments' folder a finalized experiment -unarchive Restores an archived experiment +-migrate_exp Migrates an experiment from one user to another How to create an experiment @@ -644,7 +645,7 @@ For jobs running in HPC platforms, usually you have to provide information about * QUEUE: queue to add the job to. If not specificied, uses PLATFORM default. -There are also another, less used features that you can use: +There are also other, less used features that you can use: * FREQUENCY: specifies that a job has only to be run after X dates, members or chunk. A job will always be created for the last one. If not specified, defaults to 1 @@ -658,6 +659,8 @@ There are also another, less used features that you can use: * RERUN_DEPENDENCIES: defines the jobs to be rerun if this job is going to be rerunned. Syntax is identical to the used in DEPENDENCIES +* CUSTOM_DIRECTIVES: Custom directives for the HPC resource manager headers of the platform used for that job. + Example: .. code-block:: ini @@ -719,7 +722,7 @@ configuration, you can specify what platform or queue to use to run serial jobs * SERIAL_QUEUE: if specified, Autosubmit will run jobs with only one processor in the specified queue. Autosubmit will ignore this configuration if SERIAL_PLATFORM is provided -There are some other parameters that you must need to specify: +There are some other parameters that you may need to specify: * BUDGET: budget account for the machine scheduler. 
If omitted, takes the value defined in PROJECT @@ -733,6 +736,8 @@ There are some other parameters that you must need to specify: * TOTAL_JOBS: maximum number of jobs to be running at the same time in this platform. +* CUSTOM_DIRECTIVES: Custom directives for the resource manager of this platform. + Example: .. code-block:: ini @@ -745,6 +750,7 @@ Example: USER = my_user SCRATCH_DIR = /scratch TEST_SUITE = True + CUSTOM_DIRECTIVES = [ "my_directive" ] How to change the communications library ===================================== @@ -828,6 +834,34 @@ Example: autosubmit unarchive cxxx +How to migrate an experiment +============================ +To migrate an experiment from one user to another, you need to add two parameters in the platforms configuration file: + + * USER_TO = + * TEMP_DIR = + +Then, just run the command: +:: + + autosubmit migrate_exp --offer expid + + +Local files will be archived and remote files put in the HPC temporary directory. + +.. warning:: The temporary directory must be readable by both users (old owner and new owner). + +Then the new owner will have to run the command: +:: + + autosubmit migrate_exp --pickup expid + + + +Local files will be unarchived and remote files copied from the temporary location. + +.. warning:: The old owner might need to remove temporary files and archive. 
+ How to configure email notifications ==================================== diff --git a/docs/source/variables.rst b/docs/source/variables.rst index 62bbc69f35bdf9183c677b43ff35ea360f6424b0..70b9b5e514becb07af909f8eb24d58755c22e0ce 100644 --- a/docs/source/variables.rst +++ b/docs/source/variables.rst @@ -57,6 +57,7 @@ suite of variables is defined for the current platform where {PLATFORM_NAME} is - **{PLATFORM_NAME}_VERSION**: Platform scheduler version - **{PLATFORM_NAME}_SCRATCH_DIR**: Platform's scratch folder path - **{PLATFORM_NAME}_ROOTDIR**: Platform's experiment folder path +- **{PLATFORM_NAME}_CUSTOM_DIRECTIVES**: Platform's custom directives for the resource manager. .. hint:: The variables ``_USER``, ``_PROJ`` and ``_BUDG`` has no value on the LOCAL platform. diff --git a/test/regression/default_conf/platforms.conf b/test/regression/default_conf/platforms.conf index 8a3a3c058ede27ea0f3c1a679dbb9c6793df0482..ae146bd42ec8e83e51bf9382627c67b73be25ff2 100644 --- a/test/regression/default_conf/platforms.conf +++ b/test/regression/default_conf/platforms.conf @@ -31,15 +31,16 @@ TEST_SUITE = False QUEUE = serial [marenostrum3] -TYPE = LSF +TYPE = slurm VERSION = mn HOST = mn-bsc32 PROJECT = bsc32 +QUEUE = debug ADD_PROJECT_TO_HOST = false -USER = bsc32649 +USER = bsc32704 SCRATCH_DIR = /gpfs/scratch TEST_SUITE = True -PROCESSORS_PER_NODE = 16 +PROCESSORS_PER_NODE = 48 [mistral] TYPE = slurm diff --git a/test/unit/test_autosubmit_ config.py b/test/unit/test_autosubmit_config.py similarity index 99% rename from test/unit/test_autosubmit_ config.py rename to test/unit/test_autosubmit_config.py index eea91d336856c96fb4f68cd10ffc575cd89db987..560b8b4801629620adfd41fd566f40be4b440a5e 100644 --- a/test/unit/test_autosubmit_ config.py +++ b/test/unit/test_autosubmit_config.py @@ -108,23 +108,23 @@ class TestAutosubmitConfig(TestCase): def test_get_threads(self): # arrange - expected_value = 99999 + expected_value = '99999' default_value = 1 config, parser_mock = 
self._arrange_config(expected_value) # act returned_value = config.get_threads(self.section) # assert - self._assert_get_option(parser_mock, 'THREADS', expected_value, returned_value, default_value, int) + self._assert_get_option(parser_mock, 'THREADS', expected_value, returned_value, default_value, str) def test_get_tasks(self): # arrange - expected_value = 99999 + expected_value = '99999' default_value = 0 config, parser_mock = self._arrange_config(expected_value) # act returned_value = config.get_tasks(self.section) # assert - self._assert_get_option(parser_mock, 'TASKS', expected_value, returned_value, default_value, int) + self._assert_get_option(parser_mock, 'TASKS', expected_value, returned_value, default_value, str) def test_get_memory(self): # arrange diff --git a/test/unit/test_dic_jobs.py b/test/unit/test_dic_jobs.py index 49b6c5c220313f9215e4e1fd573dfb36a8f2de4c..3234d79e2d373d2924274f493fc3ac3f3f9713c0 100644 --- a/test/unit/test_dic_jobs.py +++ b/test/unit/test_dic_jobs.py @@ -287,8 +287,8 @@ class TestDicJobs(TestCase): filename = 'fake-fike' queue = 'fake-queue' processors = '111' - threads = 222 - tasks = 333 + threads = '222' + tasks = '333' memory = memory_per_task = 444 wallclock = 555 notify_on = 'COMPLETED FAILED' diff --git a/test/unit/test_job.py b/test/unit/test_job.py index 0bcf4404405c58dd9d31365b85623c8c5fb492df..705d32eeef88260d1a5e52e1dd32d44846e30e30 100644 --- a/test/unit/test_job.py +++ b/test/unit/test_job.py @@ -258,11 +258,13 @@ class TestJob(TestCase): as_conf.get_memory = Mock(return_value=80) as_conf.get_wallclock = Mock(return_value='00:30') as_conf.get_member_list = Mock(return_value=[]) + as_conf.get_custom_directives = Mock(return_value='["whatever"]') dummy_serial_platform = Mock() dummy_serial_platform.name = 'serial' dummy_platform = Mock() dummy_platform.serial_platform = dummy_serial_platform + dummy_platform.custom_directives = '["whatever"]' self.job._platform = dummy_platform # Act parameters = 
self.job.update_parameters(as_conf, dict())