From 04eba8fb617ebb0650368b69519eae1b5e215960 Mon Sep 17 00:00:00 2001 From: Domingo Manubens-Gil Date: Wed, 12 Apr 2017 18:49:44 +0200 Subject: [PATCH 01/16] Ask TASKS and THREADS in lots. Replicate of 3dadfd3efa665dc897e5bb43a45c2af6723e48bb for solving issue #226 , fix also name of test --- autosubmit/config/config_common.py | 6 +++--- autosubmit/job/job.py | 4 ++-- autosubmit/job/job_dict.py | 4 ++-- ...st_autosubmit_ config.py => test_autosubmit_config.py} | 8 ++++---- test/unit/test_dic_jobs.py | 4 ++-- 5 files changed, 13 insertions(+), 13 deletions(-) rename test/unit/{test_autosubmit_ config.py => test_autosubmit_config.py} (99%) diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 70356b200..2ead2c3f1 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -162,7 +162,7 @@ class AutosubmitConfig(object): :return: threads needed :rtype: str """ - return int(self._jobs_parser.get_option(section, 'THREADS', 1)) + return str(self._jobs_parser.get_option(section, 'THREADS', 1)) def get_tasks(self, section): """ @@ -170,9 +170,9 @@ class AutosubmitConfig(object): :param section: job type :type section: str :return: tasks (processes) per host - :rtype: int + :rtype: str """ - return int(self._jobs_parser.get_option(section, 'TASKS', 0)) + return str(self._jobs_parser.get_option(section, 'TASKS', 0)) def get_scratch_free_space(self, section): """ diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index d0d8a1842..dfe72425c 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -59,8 +59,8 @@ class Job(object): self.platform_name = None self.section = None self.wallclock = None - self.tasks = None - self.threads = None + self.tasks = '1' + self.threads = '1' self.processors = '1' self.memory = '' self.memory_per_task = '' diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index aa01e9d93..177576819 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -292,8 +292,8 @@ class DicJobs: job.queue = self.get_option(section, "QUEUE", None) job.check = self.get_option(section, "CHECK", 'True').lower() job.processors = str(self.get_option(section, "PROCESSORS", 1)) - job.threads = self.get_option(section, "THREADS", '') - job.tasks = self.get_option(section, "TASKS", '') + job.threads = str(self.get_option(section, "THREADS", 1)) + job.tasks = str(self.get_option(section, "TASKS", 1)) job.memory = self.get_option(section, "MEMORY", '') job.memory_per_task = self.get_option(section, "MEMORY_PER_TASK", '') job.wallclock = self.get_option(section, "WALLCLOCK", '') diff --git a/test/unit/test_autosubmit_ config.py b/test/unit/test_autosubmit_config.py similarity index 99% rename from test/unit/test_autosubmit_ config.py rename to test/unit/test_autosubmit_config.py index eea91d336..560b8b480 100644 --- a/test/unit/test_autosubmit_ config.py +++ b/test/unit/test_autosubmit_config.py @@ -108,23 +108,23 @@ class TestAutosubmitConfig(TestCase): def test_get_threads(self): # arrange - expected_value = 99999 + expected_value = '99999' default_value = 1 config, parser_mock = self._arrange_config(expected_value) # act returned_value = config.get_threads(self.section) # assert - self._assert_get_option(parser_mock, 'THREADS', expected_value, returned_value, default_value, int) + self._assert_get_option(parser_mock, 'THREADS', expected_value, returned_value, default_value, str) def test_get_tasks(self): # arrange - expected_value = 99999 + expected_value = '99999' default_value = 0 config, parser_mock = self._arrange_config(expected_value) # act returned_value = config.get_tasks(self.section) # assert - self._assert_get_option(parser_mock, 'TASKS', expected_value, returned_value, default_value, int) + self._assert_get_option(parser_mock, 'TASKS', expected_value, returned_value, default_value, str) def test_get_memory(self): # arrange diff --git a/test/unit/test_dic_jobs.py b/test/unit/test_dic_jobs.py index 49b6c5c22..3234d79e2 100644 --- a/test/unit/test_dic_jobs.py +++ b/test/unit/test_dic_jobs.py @@ -287,8 +287,8 @@ class TestDicJobs(TestCase): filename = 'fake-fike' queue = 'fake-queue' processors = '111' - threads = 222 - tasks = 333 + threads = '222' + tasks = '333' memory = memory_per_task = 444 wallclock = 555 notify_on = 'COMPLETED FAILED' -- GitLab From 1d968ed58f51e6374ce045624d25aeb500a77b92 Mon Sep 17 00:00:00 2001 From: Domingo Manubens-Gil Date: Thu, 13 Apr 2017 15:01:05 +0200 Subject: [PATCH 02/16] Minor. Fix previous commit. --- autosubmit/platforms/headers/ec_cca_header.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/autosubmit/platforms/headers/ec_cca_header.py b/autosubmit/platforms/headers/ec_cca_header.py index 9037c9c59..c621bbfc8 100644 --- a/autosubmit/platforms/headers/ec_cca_header.py +++ b/autosubmit/platforms/headers/ec_cca_header.py @@ -38,14 +38,14 @@ class EcCcaHeader(object): # noinspection PyMethodMayBeStatic def get_tasks_per_node(self, job): - if not isinstance(job.tasks, int): + if not isinstance(job.tasks, str): return "" else: return '#PBS -l EC_tasks_per_node={0}'.format(job.tasks) # noinspection PyMethodMayBeStatic def get_threads_per_task(self, job): - if not isinstance(job.threads, int): + if not isinstance(job.threads, str): return "" else: return '#PBS -l EC_threads_per_task={0}'.format(job.threads) -- GitLab From 49fd1691a54cfe49b4733482d5407b259ad45427 Mon Sep 17 00:00:00 2001 From: Domingo Manubens-Gil Date: Tue, 18 Apr 2017 16:13:19 +0200 Subject: [PATCH 03/16] Start branch issue70 --- autosubmit/autosubmit.py | 32 ++++++++++++++++++++++++++++++++ test/unit/test_handle_exp.py | 23 +++++++++++++++++++++++ 2 files changed, 55 insertions(+) create mode 100644 test/unit/test_handle_exp.py diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 5530ba3f5..2097ff0d2 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -214,6 +214,11 @@ class Autosubmit: subparser.add_argument('--hide', action='store_true', default=False, help='hides plot window') + # Handle + subparser = subparsers.add_parser('handle', description="Handle experiments to another user") + subparser.add_argument('expid', help='experiment identifier') + subparser.add_argument('-mf', '--mapfile', help='map users file') + # Check subparser = subparsers.add_parser('check', description="check configuration for specified experiment") subparser.add_argument('expid', help='experiment identifier') @@ -340,6 +345,8 @@ class Autosubmit: return Autosubmit.recovery(args.expid, args.noplot, args.save, args.all, args.hide) elif args.command == 'check': return Autosubmit.check(args.expid) + elif args.command == 'handle': + return Autosubmit.handle(args.expid, args.mapfile) elif args.command == 'create': return Autosubmit.create(args.expid, args.noplot, args.hide, args.output) elif args.command == 'configure': @@ -1052,6 +1059,31 @@ class Autosubmit: return True + @staticmethod + def handle(experiment_id, map_file): + """ + Handles experiment files to another users. It takes mapping information of old + user and new user from a map file. + + :param experiment_id: experiment identifier: + :param map_file: map file: + """" + BasicConfig.read() + exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, experiment_id) + if not os.path.exists(exp_path): + Log.critical("The directory {0} is needed and does not exist.", exp_path) + Log.warning("Does an experiment with the given id exist?") + return False + + log_file = os.path.join(BasicConfig.LOCAL_ROOT_DIR, experiment_id, BasicConfig.LOCAL_TMP_DIR, 'handle_exp.log') + Log.set_file(log_file) + + as_conf = AutosubmitConfig(experiment_id, BasicConfig, ConfigParserFactory()) + if not as_conf.check_conf_files(): + return False + + + @staticmethod def check(experiment_id): """ diff --git a/test/unit/test_handle_exp.py b/test/unit/test_handle_exp.py new file mode 100644 index 000000000..df468d9a9 --- /dev/null +++ b/test/unit/test_handle_exp.py @@ -0,0 +1,23 @@ +from unittest import TestCase +from mock import Mock, patch + + +class TestHandleExp(TestCase): + def setUp(self): + self.user_from = "old-user" + self.user_to = "new-user" + + def testFoo(self): + self.failUnless(False) + +# @patch('autosubmit.autosubmit.handle') +# def test_handle_experiment(self, db_common_mock): +# current_user_id = "old-user" +# self._build_db_mock(current_experiment_id, db_common_mock) +# user_id = handle_experiment(self.user_from, self.user_to) +# self.assertEquals("new_user", user_id) +# +# @staticmethod +# def _build_db_mock(current_experiment_id, mock_db_common): +# mock_db_common.last_name_used = Mock(return_value=current_experiment_id) +# mock_db_common.check_experiment_exists = Mock(return_value=False) -- GitLab From 2ea04c929bcce759e91bf9209142c2c2afd63a30 Mon Sep 17 00:00:00 2001 From: Domingo Manubens-Gil Date: Thu, 18 May 2017 18:37:05 +0200 Subject: [PATCH 04/16] Progress on the tests and methods to migrate experiment owner --- autosubmit/autosubmit.py | 29 ++++++++------ autosubmit/config/config_common.py | 9 +++++ autosubmit/config/files/autosubmit.conf | 4 ++ autosubmit/config/files/platforms.conf | 4 +- autosubmit/experiment/experiment_common.py | 34 +++++++++++++++++ ...t_ config.py => test_autosubmit_config.py} | 32 +++++++++++++--- test/unit/test_handle_exp.py | 23 ----------- test/unit/test_migrate_exp.py | 38 +++++++++++++++++++ 8 files changed, 132 insertions(+), 41 deletions(-) rename test/unit/{test_autosubmit_ config.py => test_autosubmit_config.py} (94%) delete mode 100644 test/unit/test_handle_exp.py create mode 100644 test/unit/test_migrate_exp.py diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 2097ff0d2..a51f18236 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -72,6 +72,7 @@ from bscearth.utils.log import Log from database.db_common import create_db from experiment.experiment_common import new_experiment from experiment.experiment_common import copy_experiment +from experiment.experiment_common import migrate_experiment from database.db_common import delete_experiment from database.db_common import get_autosubmit_version from monitor.monitor import Monitor @@ -214,10 +215,9 @@ class Autosubmit: subparser.add_argument('--hide', action='store_true', default=False, help='hides plot window') - # Handle - subparser = subparsers.add_parser('handle', description="Handle experiments to another user") + # Migrate + subparser = subparsers.add_parser('migrate', description="Migrate experiments from current user to another") subparser.add_argument('expid', help='experiment identifier') - subparser.add_argument('-mf', '--mapfile', help='map users file') # Check subparser = subparsers.add_parser('check', description="check configuration for specified experiment") @@ -345,8 +345,8 @@ class Autosubmit: return Autosubmit.recovery(args.expid, args.noplot, args.save, args.all, args.hide) elif args.command == 'check': return Autosubmit.check(args.expid) - elif args.command == 'handle': - return Autosubmit.handle(args.expid, args.mapfile) + elif args.command == 'migrate': + return Autosubmit.migrate(args.expid, args.mapfile) elif args.command == 'create': return Autosubmit.create(args.expid, args.noplot, args.hide, args.output) elif args.command == 'configure': @@ -1060,14 +1060,14 @@ class Autosubmit: return True @staticmethod - def handle(experiment_id, map_file): + def migrate(experiment_id, map_file): """ - Handles experiment files to another users. It takes mapping information of old + Migrates experiment files from current to other user. It takes mapping information of old user and new user from a map file. :param experiment_id: experiment identifier: :param map_file: map file: - """" + """ BasicConfig.read() exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, experiment_id) if not os.path.exists(exp_path): @@ -1075,12 +1075,19 @@ class Autosubmit: Log.warning("Does an experiment with the given id exist?") return False - log_file = os.path.join(BasicConfig.LOCAL_ROOT_DIR, experiment_id, BasicConfig.LOCAL_TMP_DIR, 'handle_exp.log') + log_file = os.path.join(BasicConfig.LOCAL_ROOT_DIR, experiment_id, BasicConfig.LOCAL_TMP_DIR, 'migrate_exp.log') Log.set_file(log_file) - as_conf = AutosubmitConfig(experiment_id, BasicConfig, ConfigParserFactory()) - if not as_conf.check_conf_files(): + #as_conf = AutosubmitConfig(experiment_id, BasicConfig, ConfigParserFactory()) + #if not as_conf.check_conf_files(): + #return False + user_to = "cprodhom" + + if not migrate_experiment(exp_path, user_to): + Log.critical("The directory owner for {0} cannot be changed to {1}.", exp_path, user_to) return False + + return True diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 70356b200..a35958de5 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -902,6 +902,15 @@ class AutosubmitConfig(object): """ return self._conf_parser.get_option('storage', 'TYPE', 'pkl').lower() + def get_migrate_to(self): + """ + Returns the user to change experiment's owner to from autosubmit's config file. + + :return: migrate to user + :rtype: str + """ + return self._conf_parser.get_option('migrate', 'TO_USER', '').lower() + @staticmethod def is_valid_mail_address(mail_address): if re.match('^[_a-z0-9-]+(\.[_a-z0-9-]+)*@[a-z0-9-]+(\.[a-z0-9-]+)*(\.[a-z]{2,4})$', mail_address): diff --git a/autosubmit/config/files/autosubmit.conf b/autosubmit/config/files/autosubmit.conf index 78dc9f423..6bd2c4169 100644 --- a/autosubmit/config/files/autosubmit.conf +++ b/autosubmit/config/files/autosubmit.conf @@ -36,3 +36,7 @@ API = paramiko TYPE = pkl # Defines if the remote logs will be copied to the local platform. Default = True. COPY_REMOTE_LOGS = True + +[migrate] +# Changes owner of experiment files. +TO_USER = diff --git a/autosubmit/config/files/platforms.conf b/autosubmit/config/files/platforms.conf index 0fa5eee7f..b2f39fd59 100644 --- a/autosubmit/config/files/platforms.conf +++ b/autosubmit/config/files/platforms.conf @@ -16,6 +16,8 @@ # ADD_PROJECT_TO_HOST = False ## User for the machine scheduler. Required # USER = +## Optional. If given, Autosubmit will change owner of files in given platform. +# USER_TO = ## Path to the scratch directory for the machine. Required. # SCRATCH_DIR = /scratch ## If true, Autosubmit test command can use this queue as a main queue. Defaults to False @@ -45,4 +47,4 @@ # MAX_WALLCLOCK = 72:00 ## Max processors number per job submitted to the HPC. If not specified, defaults to empty. ## Optional. Required for wrappers. -# MAX_PROCESSORS = 1 \ No newline at end of file +# MAX_PROCESSORS = 1 diff --git a/autosubmit/experiment/experiment_common.py b/autosubmit/experiment/experiment_common.py index 43e78191f..cb9291740 100644 --- a/autosubmit/experiment/experiment_common.py +++ b/autosubmit/experiment/experiment_common.py @@ -20,6 +20,9 @@ """ Module containing functions to manage autosubmit's experiments. """ +import os +import pwd +import glob import string import autosubmit.database.db_common as db_common from bscearth.utils.log import Log @@ -169,3 +172,34 @@ def base36decode(number): :rtype: int """ return int(number, 36) + + +def migrate_experiment(exp_path, user_to): + """ + Migrates experiment files from current to new user. + Group id will be kept the same. + + :param exp_path: experiment path: + :param user_to: to user: + """ + to_uid = pwd.getpwnam(user_to).pw_uid + Log.info("The UID for {0} is {1}.", user_to, to_uid) + current_gid = os.stat(exp_path).st_gid + Log.info("GID will be kept to {0}.", current_gid) + recursive_file_permissions(exp_path, to_uid, current_gid) + return user_to, current_gid + +def recursive_file_permissions(path,uid=-1,gid=-1): + """ + Recursively updates file permissions on a given path. + UID and GID default to -1 + """ + os.chown(path,uid,gid) + for item in glob.glob(path+'/*'): + if os.path.isdir(item): + recursive_file_permissions(os.path.join(path,item),uid,gid) + else: + try: + os.chown(os.path.join(path,item),uid,gid) + except: + Log.critical('File permissions on {0} not updated due to error.', os.path.join(path,item)) diff --git a/test/unit/test_autosubmit_ config.py b/test/unit/test_autosubmit_config.py similarity index 94% rename from test/unit/test_autosubmit_ config.py rename to test/unit/test_autosubmit_config.py index eea91d336..7729d4a30 100644 --- a/test/unit/test_autosubmit_ config.py +++ b/test/unit/test_autosubmit_config.py @@ -136,6 +136,16 @@ class TestAutosubmitConfig(TestCase): # assert self._assert_get_option(parser_mock, 'MEMORY', expected_value, returned_value, default_value, str) + def test_get_user_to(self): + # arrange + expected_value = 'new_user' + default_value = '' + config, parser_mock = self._arrange_config(expected_value) + # act + returned_value = config.get_chown(self.section) + # assert + self._assert_get_option(parser_mock, 'USER_TO', expected_value, returned_value, default_value, str) + def test_that_reload_must_load_parsers(self): # arrange config = AutosubmitConfig(self.any_expid, FakeBasicConfig, ConfigParserFactory()) @@ -204,6 +214,16 @@ class TestAutosubmitConfig(TestCase): # assert open_mock.assert_any_call(getattr(config, '_conf_parser_file'), 'w') + def test_get_to_user(self): + # arrange + expected_value = 'new_user' + default_value = '' + config, parser_mock = self._arrange_config(expected_value) + # act + returned_value = config.get_to_user(self.section) + # assert + self._assert_get_option(parser_mock, 'TO_USER', expected_value, returned_value, default_value, str) + def test_load_project_parameters(self): # arrange parser_mock = Mock(spec=ConfigParser) @@ -444,11 +464,11 @@ class TestAutosubmitConfig(TestCase): class FakeBasicConfig: - DB_DIR = '/dummy/db/dir' - DB_FILE = '/dummy/db/file' - DB_PATH = '/dummy/db/path' - LOCAL_ROOT_DIR = '/dummy/local/root/dir' - LOCAL_TMP_DIR = '/dummy/local/temp/dir' - LOCAL_PROJ_DIR = '/dummy/local/proj/dir' + DB_DIR = '/scratch/Earth/dmanuben/dummy/db/dir' + DB_FILE = '/scratch/Earth/dmanuben/dummy/db/file' + DB_PATH = '/scratch/Earth/dmanuben/dummy/db/path' + LOCAL_ROOT_DIR = '/scratch/Earth/dmanuben/dummy/local/root/dir' + LOCAL_TMP_DIR = '/scratch/Earth/dmanuben/dummy/local/temp/dir' + LOCAL_PROJ_DIR = '/scratch/Earth/dmanuben/dummy/local/proj/dir' DEFAULT_PLATFORMS_CONF = '' DEFAULT_JOBS_CONF = '' diff --git a/test/unit/test_handle_exp.py b/test/unit/test_handle_exp.py deleted file mode 100644 index df468d9a9..000000000 --- a/test/unit/test_handle_exp.py +++ /dev/null @@ -1,23 +0,0 @@ -from unittest import TestCase -from mock import Mock, patch - - -class TestHandleExp(TestCase): - def setUp(self): - self.user_from = "old-user" - self.user_to = "new-user" - - def testFoo(self): - self.failUnless(False) - -# @patch('autosubmit.autosubmit.handle') -# def test_handle_experiment(self, db_common_mock): -# current_user_id = "old-user" -# self._build_db_mock(current_experiment_id, db_common_mock) -# user_id = handle_experiment(self.user_from, self.user_to) -# self.assertEquals("new_user", user_id) -# -# @staticmethod -# def _build_db_mock(current_experiment_id, mock_db_common): -# mock_db_common.last_name_used = Mock(return_value=current_experiment_id) -# mock_db_common.check_experiment_exists = Mock(return_value=False) diff --git a/test/unit/test_migrate_exp.py b/test/unit/test_migrate_exp.py new file mode 100644 index 000000000..eeeb1132a --- /dev/null +++ b/test/unit/test_migrate_exp.py @@ -0,0 +1,38 @@ +from unittest import TestCase +from mock import Mock, patch +from autosubmit.experiment.experiment_common import migrate_experiment +import os +import pwd + + +class TestMigrateExp(TestCase): + def setUp(self): + #self.user_from = "old-user" + self.user_from = "dmanuben" + #self.user_to = "new-user" + self.user_to = "dmanuben" + +# def testFoo(self): +# self.failUnless(False) + + @patch('autosubmit.experiment.experiment_common.os') + def test_migrate_experiment(self, mock_os): + current_user_id = "old-user" + user_id, group_id = migrate_experiment("any path", self.user_to) + + to_uid = pwd.getpwnam(self.user_to).pw_uid + mock_os.chown.assert_called_with("any path", to_uid, group_id) + #self.assertEquals("new_user", user_id) + +# @patch('autosubmit.experiment.experiment_common.db_common') +# def test_create_new_experiment(self, db_common_mock): +# current_experiment_id = "empty" +# self._build_db_mock(current_experiment_id, db_common_mock) +# experiment_id = new_experiment(self.description, self.version) +# self.assertEquals("a000", experiment_id) + +# +# @staticmethod +# def _build_db_mock(current_experiment_id, mock_db_common): +# mock_db_common.last_name_used = Mock(return_value=current_experiment_id) +# mock_db_common.check_experiment_exists = Mock(return_value=False) -- GitLab From ac6deeb74a1b1d7c9d72968f75e7b5e5385f5e6c Mon Sep 17 00:00:00 2001 From: Domingo Manubens-Gil Date: Mon, 29 May 2017 15:36:27 +0200 Subject: [PATCH 05/16] Migrate offer and pickup working by using archive/unarchive methods. Archived file permissions are 775 now --- autosubmit/autosubmit.py | 63 +++++++++++----------- autosubmit/config/config_common.py | 2 +- autosubmit/experiment/experiment_common.py | 34 ------------ test/unit/test_autosubmit_config.py | 20 ------- 4 files changed, 31 insertions(+), 88 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index a51f18236..0a545d9ec 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -50,6 +50,7 @@ import random import signal import datetime import portalocker +import pwd from pkg_resources import require, resource_listdir, resource_exists, resource_string from distutils.util import strtobool @@ -72,7 +73,8 @@ from bscearth.utils.log import Log from database.db_common import create_db from experiment.experiment_common import new_experiment from experiment.experiment_common import copy_experiment -from experiment.experiment_common import migrate_experiment +from experiment.experiment_common import migrate_experiment_offer +from experiment.experiment_common import migrate_experiment_pickup from database.db_common import delete_experiment from database.db_common import get_autosubmit_version from monitor.monitor import Monitor @@ -218,6 +220,9 @@ class Autosubmit: # Migrate subparser = subparsers.add_parser('migrate', description="Migrate experiments from current user to another") subparser.add_argument('expid', help='experiment identifier') + group = subparser.add_mutually_exclusive_group(required=True) + group.add_argument('-o', '--offer', action="store_true", default=False, help='Offer experiment') + group.add_argument('-p', '--pickup', action="store_true", default=False, help='Pick-up released experiment') # Check subparser = subparsers.add_parser('check', description="check configuration for specified experiment") @@ -346,7 +351,7 @@ class Autosubmit: elif args.command == 'check': return Autosubmit.check(args.expid) elif args.command == 'migrate': - return Autosubmit.migrate(args.expid, args.mapfile) + return Autosubmit.migrate(args.expid, args.offer, args.pickup) elif args.command == 'create': return Autosubmit.create(args.expid, args.noplot, args.hide, args.output) elif args.command == 'configure': @@ -1060,37 +1065,27 @@ class Autosubmit: return True @staticmethod - def migrate(experiment_id, map_file): + def migrate(experiment_id, offer, pickup): """ - Migrates experiment files from current to other user. It takes mapping information of old - user and new user from a map file. + Migrates experiment files from current to other user. + It takes mapping information for new user from config files. :param experiment_id: experiment identifier: - :param map_file: map file: """ - BasicConfig.read() - exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, experiment_id) - if not os.path.exists(exp_path): - Log.critical("The directory {0} is needed and does not exist.", exp_path) - Log.warning("Does an experiment with the given id exist?") - return False + if offer: + Autosubmit.archive(experiment_id, False) + log_file = os.path.join(BasicConfig.LOCAL_ROOT_DIR, "ASlogs", 'migrate_{0}.log'.format(experiment_id)) + Log.set_file(log_file) + Log.result("The experiment has been successfully offered.") - log_file = os.path.join(BasicConfig.LOCAL_ROOT_DIR, experiment_id, BasicConfig.LOCAL_TMP_DIR, 'migrate_exp.log') - Log.set_file(log_file) + elif pickup: + Autosubmit.unarchive(experiment_id) + log_file = os.path.join(BasicConfig.LOCAL_ROOT_DIR, "ASlogs", 'migrate_{0}.log'.format(experiment_id)) + Log.set_file(log_file) + Log.result("The experiment has been successfully picked up.") - #as_conf = AutosubmitConfig(experiment_id, BasicConfig, ConfigParserFactory()) - #if not as_conf.check_conf_files(): - #return False - user_to = "cprodhom" - - if not migrate_experiment(exp_path, user_to): - Log.critical("The directory owner for {0} cannot be changed to {1}.", exp_path, user_to) - return False - return True - - @staticmethod def check(experiment_id): """ @@ -1483,7 +1478,7 @@ class Autosubmit: return True @staticmethod - def archive(expid): + def archive(expid, clean=True): """ Archives an experiment: call clean (if experiment is of version 3 or later), compress folder to tar.gz and moves to year's folder @@ -1498,14 +1493,15 @@ class Autosubmit: Log.warning("Does an experiment with the given id exist?") return 1 - Log.set_file(os.path.join(BasicConfig.LOCAL_ROOT_DIR, "ASlogs", 'archive{0}.log'.format(expid))) + Log.set_file(os.path.join(BasicConfig.LOCAL_ROOT_DIR, "ASlogs", 'archive_{0}.log'.format(expid))) exp_folder = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid) - # Cleaning to reduce file size. - version = get_autosubmit_version(expid) - if version is not None and version.startswith('3') and not Autosubmit.clean(expid, True, True, True, False): - Log.critical("Can not archive project. Clean not successful") - return False + if clean: + # Cleaning to reduce file size. + version = get_autosubmit_version(expid) + if version is not None and version.startswith('3') and not Autosubmit.clean(expid, True, True, True, False): + Log.critical("Can not archive project. Clean not successful") + return False # Getting year of last completed. If not, year of expid folder year = None @@ -1530,6 +1526,7 @@ class Autosubmit: with tarfile.open(os.path.join(year_path, '{0}.tar.gz'.format(expid)), "w:gz") as tar: tar.add(exp_folder, arcname='') tar.close() + os.chmod(os.path.join(year_path, '{0}.tar.gz'.format(expid)), 0o775) except Exception as e: Log.critical("Can not write tar file: {0}".format(e)) return False @@ -1555,7 +1552,7 @@ class Autosubmit: :type experiment_id: str """ BasicConfig.read() - Log.set_file(os.path.join(BasicConfig.LOCAL_ROOT_DIR, "ASlogs", 'unarchive{0}.log'.format(experiment_id))) + Log.set_file(os.path.join(BasicConfig.LOCAL_ROOT_DIR, "ASlogs", 'unarchive_{0}.log'.format(experiment_id))) exp_folder = os.path.join(BasicConfig.LOCAL_ROOT_DIR, experiment_id) if os.path.exists(exp_folder): diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index a35958de5..1e55223a0 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -902,7 +902,7 @@ class AutosubmitConfig(object): """ return self._conf_parser.get_option('storage', 'TYPE', 'pkl').lower() - def get_migrate_to(self): + def get_exp_migrate_to_user(self): """ Returns the user to change experiment's owner to from autosubmit's config file. diff --git a/autosubmit/experiment/experiment_common.py b/autosubmit/experiment/experiment_common.py index cb9291740..43e78191f 100644 --- a/autosubmit/experiment/experiment_common.py +++ b/autosubmit/experiment/experiment_common.py @@ -20,9 +20,6 @@ """ Module containing functions to manage autosubmit's experiments. """ -import os -import pwd -import glob import string import autosubmit.database.db_common as db_common from bscearth.utils.log import Log @@ -172,34 +169,3 @@ def base36decode(number): :rtype: int """ return int(number, 36) - - -def migrate_experiment(exp_path, user_to): - """ - Migrates experiment files from current to new user. - Group id will be kept the same. - - :param exp_path: experiment path: - :param user_to: to user: - """ - to_uid = pwd.getpwnam(user_to).pw_uid - Log.info("The UID for {0} is {1}.", user_to, to_uid) - current_gid = os.stat(exp_path).st_gid - Log.info("GID will be kept to {0}.", current_gid) - recursive_file_permissions(exp_path, to_uid, current_gid) - return user_to, current_gid - -def recursive_file_permissions(path,uid=-1,gid=-1): - """ - Recursively updates file permissions on a given path. - UID and GID default to -1 - """ - os.chown(path,uid,gid) - for item in glob.glob(path+'/*'): - if os.path.isdir(item): - recursive_file_permissions(os.path.join(path,item),uid,gid) - else: - try: - os.chown(os.path.join(path,item),uid,gid) - except: - Log.critical('File permissions on {0} not updated due to error.', os.path.join(path,item)) diff --git a/test/unit/test_autosubmit_config.py b/test/unit/test_autosubmit_config.py index 7729d4a30..c2965d839 100644 --- a/test/unit/test_autosubmit_config.py +++ b/test/unit/test_autosubmit_config.py @@ -136,16 +136,6 @@ class TestAutosubmitConfig(TestCase): # assert self._assert_get_option(parser_mock, 'MEMORY', expected_value, returned_value, default_value, str) - def test_get_user_to(self): - # arrange - expected_value = 'new_user' - default_value = '' - config, parser_mock = self._arrange_config(expected_value) - # act - returned_value = config.get_chown(self.section) - # assert - self._assert_get_option(parser_mock, 'USER_TO', expected_value, returned_value, default_value, str) - def test_that_reload_must_load_parsers(self): # arrange config = AutosubmitConfig(self.any_expid, FakeBasicConfig, ConfigParserFactory()) @@ -214,16 +204,6 @@ class TestAutosubmitConfig(TestCase): # assert open_mock.assert_any_call(getattr(config, '_conf_parser_file'), 'w') - def test_get_to_user(self): - # arrange - expected_value = 'new_user' - default_value = '' - config, parser_mock = self._arrange_config(expected_value) - # act - returned_value = config.get_to_user(self.section) - # assert - self._assert_get_option(parser_mock, 'TO_USER', expected_value, returned_value, default_value, str) - def test_load_project_parameters(self): # arrange parser_mock = Mock(spec=ConfigParser) -- GitLab From 5de6701683cade26639bd2c96c5f522c595d6eb4 Mon Sep 17 00:00:00 2001 From: Domingo Manubens-Gil Date: Fri, 30 Jun 2017 11:51:51 +0200 Subject: [PATCH 06/16] Progress on migrate exp --- autosubmit/autosubmit.py | 51 +++++++++++++++++----- autosubmit/config/config_common.py | 9 ---- autosubmit/config/files/autosubmit.conf | 2 +- autosubmit/config/files/platforms.conf | 2 + autosubmit/platforms/paramiko_submitter.py | 2 + autosubmit/platforms/platform.py | 2 + 6 files changed, 48 insertions(+), 20 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 0a545d9ec..1204aa5e4 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -73,8 +73,6 @@ from bscearth.utils.log import Log from database.db_common import create_db from experiment.experiment_common import new_experiment from experiment.experiment_common import copy_experiment -from experiment.experiment_common import migrate_experiment_offer -from experiment.experiment_common import migrate_experiment_pickup from database.db_common import delete_experiment from database.db_common import get_autosubmit_version from monitor.monitor import Monitor @@ -1071,18 +1069,49 @@ class Autosubmit: It takes mapping information for new user from config files. :param experiment_id: experiment identifier: + :param pickup: + :param offer: """ + log_file = os.path.join(BasicConfig.LOCAL_ROOT_DIR, "ASlogs", 'migrate_{0}.log'.format(experiment_id)) + Log.set_file(log_file) + as_conf = AutosubmitConfig(experiment_id, BasicConfig, ConfigParserFactory()) + if not as_conf.check_conf_files(): + Log.critical('Can not run with invalid configuration') + return False + + Log.info('Migrating experiment {0}'.format(experiment_id)) + submitter = Autosubmit._get_submitter(as_conf) + submitter.load_platforms(as_conf) + if submitter.platforms is None: + return False + + Log.info("Checking remote platforms") + platforms = filter(lambda x: x not in ['local', 'LOCAL'], submitter.platforms) + if offer: - Autosubmit.archive(experiment_id, False) - log_file = os.path.join(BasicConfig.LOCAL_ROOT_DIR, "ASlogs", 'migrate_{0}.log'.format(experiment_id)) - Log.set_file(log_file) - Log.result("The experiment has been successfully offered.") + Log.info("Moving remote files/dirs") + for platform in platforms: + p = submitter.platforms[platform] + Log.info("Moving from {0} to {1}", os.path.join(p.root_dir), + os.path.join(p.temp_dir, experiment_id)) + p.move_file(os.path.join(p.root_dir), os.path.join(p.temp_dir, experiment_id)) + Log.result("Files/dirs on {0} have been successfully offered", platform) + + Log.info("Updating configuration with target user/project") + as_conf._conf_parser.get_option('migrate', 'TO_USER', '').lower() + as_conf.check_platforms_conf() + content = open(as_conf._conf_parser_file).read() + content = content.replace(re.search('SAFETYSLEEPTIME =.*', content).group(0), + "SAFETYSLEEPTIME = %d" % sleep_time) + open(self._conf_parser_file, 'w').write(content) + + Log.info("Moving local files/dirs") + Autosubmit.archive(experiment_id, False) + Log.result("The experiment has been successfully offered.") elif pickup: - Autosubmit.unarchive(experiment_id) - log_file = os.path.join(BasicConfig.LOCAL_ROOT_DIR, "ASlogs", 'migrate_{0}.log'.format(experiment_id)) - Log.set_file(log_file) - Log.result("The experiment has been successfully picked up.") + Autosubmit.unarchive(experiment_id) + Log.result("The experiment has been successfully picked up.") return True @@ -1483,6 +1512,8 @@ class Autosubmit: Archives an experiment: call clean (if experiment is of version 3 or later), compress folder to tar.gz and moves to year's folder + :param clean: + :return: :param expid: experiment identifier :type expid: str """ diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 1e55223a0..70356b200 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -902,15 +902,6 @@ class AutosubmitConfig(object): """ return self._conf_parser.get_option('storage', 'TYPE', 'pkl').lower() - def get_exp_migrate_to_user(self): - """ - Returns the user to change experiment's owner to from autosubmit's config file. - - :return: migrate to user - :rtype: str - """ - return self._conf_parser.get_option('migrate', 'TO_USER', '').lower() - @staticmethod def is_valid_mail_address(mail_address): if re.match('^[_a-z0-9-]+(\.[_a-z0-9-]+)*@[a-z0-9-]+(\.[a-z0-9-]+)*(\.[a-z]{2,4})$', mail_address): diff --git a/autosubmit/config/files/autosubmit.conf b/autosubmit/config/files/autosubmit.conf index 6bd2c4169..90770b8d0 100644 --- a/autosubmit/config/files/autosubmit.conf +++ b/autosubmit/config/files/autosubmit.conf @@ -38,5 +38,5 @@ TYPE = pkl COPY_REMOTE_LOGS = True [migrate] -# Changes owner of experiment files. +# Changes experiment files owner. TO_USER = diff --git a/autosubmit/config/files/platforms.conf b/autosubmit/config/files/platforms.conf index b2f39fd59..70d628948 100644 --- a/autosubmit/config/files/platforms.conf +++ b/autosubmit/config/files/platforms.conf @@ -20,6 +20,8 @@ # USER_TO = ## Path to the scratch directory for the machine. Required. # SCRATCH_DIR = /scratch +## Path to the tmp directory for the machine. +# TMP_DIR = /tmp ## If true, Autosubmit test command can use this queue as a main queue. Defaults to False # TEST_SUITE = False ## If given, Autosubmit will add jobs to the given queue. Required for some platforms. diff --git a/autosubmit/platforms/paramiko_submitter.py b/autosubmit/platforms/paramiko_submitter.py index cc6599118..9b8ed504e 100644 --- a/autosubmit/platforms/paramiko_submitter.py +++ b/autosubmit/platforms/paramiko_submitter.py @@ -69,6 +69,7 @@ class ParamikoSubmitter(Submitter): local_platform.max_waiting_jobs = asconf.get_max_waiting_jobs() local_platform.total_jobs = asconf.get_total_jobs() local_platform.scratch = os.path.join(BasicConfig.LOCAL_ROOT_DIR, asconf.expid, BasicConfig.LOCAL_TMP_DIR) + local_platform.temp_dir = os.path.join(BasicConfig.LOCAL_ROOT_DIR, 'ASlogs') local_platform.root_dir = os.path.join(BasicConfig.LOCAL_ROOT_DIR, local_platform.expid) local_platform.host = 'localhost' platforms['local'] = local_platform @@ -127,6 +128,7 @@ class ParamikoSubmitter(Submitter): remote_platform.exclusivity = parser.get_option(section, 'EXCLUSIVITY', '').lower() remote_platform.user = parser.get_option(section, 'USER', None) remote_platform.scratch = parser.get_option(section, 'SCRATCH_DIR', None) + remote_platform.temp_dir = parser.get_option(section, 'TEMP_DIR', None) remote_platform._default_queue = parser.get_option(section, 'QUEUE', None) remote_platform._serial_queue = parser.get_option(section, 'SERIAL_QUEUE', None) remote_platform.processors_per_node = parser.get_option(section, 'PROCESSORS_PER_NODE', diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 50334edaa..3d5cafd10 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -35,6 +35,7 @@ class Platform(object): self.exclusivity = '' self.type = '' self.scratch = '' + self.temp_dir = '' self.root_dir = '' self.service = None self.scheduler = None @@ -128,6 +129,7 @@ class Platform(object): parameters['{0}EXCLUSIVITY'.format(prefix)] = self.exclusivity parameters['{0}TYPE'.format(prefix)] = self.type parameters['{0}SCRATCH_DIR'.format(prefix)] = self.scratch + parameters['{0}TEMP_DIR'.format(prefix)] = self.temp_dir parameters['{0}ROOTDIR'.format(prefix)] = self.root_dir parameters['{0}LOGDIR'.format(prefix)] = self.get_files_path() -- GitLab From 8d912d188778dab9f7b0d96478f04f32d4e34aaa Mon Sep 17 00:00:00 2001 From: Domingo Manubens-Gil Date: Tue, 11 Jul 2017 19:43:05 +0200 Subject: [PATCH 07/16] Minor. Fix previous commit. --- autosubmit/autosubmit.py | 76 +++++++++++++++++++------- autosubmit/config/config_common.py | 21 +++++++ autosubmit/config/files/platforms.conf | 4 +- 3 files changed, 78 insertions(+), 23 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 1204aa5e4..25d56c5f5 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -73,6 +73,8 @@ from bscearth.utils.log import Log from database.db_common import create_db from experiment.experiment_common import new_experiment from experiment.experiment_common import copy_experiment +from experiment.experiment_common import migrate_experiment_offer +from experiment.experiment_common import migrate_experiment_pickup from database.db_common import delete_experiment from database.db_common import get_autosubmit_version from monitor.monitor import Monitor @@ -1074,43 +1076,75 @@ class Autosubmit: """ log_file = os.path.join(BasicConfig.LOCAL_ROOT_DIR, "ASlogs", 'migrate_{0}.log'.format(experiment_id)) Log.set_file(log_file) - as_conf = AutosubmitConfig(experiment_id, BasicConfig, ConfigParserFactory()) - if not as_conf.check_conf_files(): - Log.critical('Can not run with invalid configuration') - return False - Log.info('Migrating experiment {0}'.format(experiment_id)) - submitter = Autosubmit._get_submitter(as_conf) - submitter.load_platforms(as_conf) - if submitter.platforms is None: - return False + if offer: + Log.info('Migrating experiment {0}'.format(experiment_id)) + as_conf = AutosubmitConfig(experiment_id, BasicConfig, ConfigParserFactory()) + if not as_conf.check_conf_files(): + Log.critical('Can not proceed with invalid configuration') + return False - Log.info("Checking remote platforms") - platforms = filter(lambda x: x not in ['local', 'LOCAL'], submitter.platforms) + submitter = Autosubmit._get_submitter(as_conf) + submitter.load_platforms(as_conf) + if submitter.platforms is None: + return False - if offer: + Log.info("Checking remote platforms") + platforms = filter(lambda x: x not in ['local', 'LOCAL'], submitter.platforms) Log.info("Moving remote files/dirs") for platform in platforms: + Log.info("Updating platform configuration with target user") + if not as_conf.get_migrate_user_to(platform): + Log.critical("Missing target user in platforms configuration file") + return False + + as_conf.set_new_user(platform, as_conf.get_migrate_user_to(platform)) + Log.info("User in platform configuration file successfully updated to {0}", + as_conf.get_migrate_user_to(platform)) + p = submitter.platforms[platform] Log.info("Moving from {0} to {1}", os.path.join(p.root_dir), os.path.join(p.temp_dir, experiment_id)) - p.move_file(os.path.join(p.root_dir), os.path.join(p.temp_dir, experiment_id)) + if not p.move_file(os.path.join(p.root_dir), os.path.join(p.temp_dir, experiment_id)): + Log.critical("The files/dirs on {0} cannot be moved to {1}.", p.root_dir, + os.path.join(p.temp_dir, experiment_id)) + return False Log.result("Files/dirs on {0} have been successfully offered", platform) - Log.info("Updating configuration with target user/project") - as_conf._conf_parser.get_option('migrate', 'TO_USER', '').lower() - as_conf.check_platforms_conf() - content = open(as_conf._conf_parser_file).read() - content = content.replace(re.search('SAFETYSLEEPTIME =.*', content).group(0), - "SAFETYSLEEPTIME = %d" % sleep_time) - open(self._conf_parser_file, 'w').write(content) - Log.info("Moving local files/dirs") Autosubmit.archive(experiment_id, False) Log.result("The experiment has been successfully offered.") elif pickup: + Log.info('Migrating experiment {0}'.format(experiment_id)) + Log.info("Moving local files/dirs") Autosubmit.unarchive(experiment_id) + Log.info("Local files/dirs have been sucessfully picked up") + as_conf = AutosubmitConfig(experiment_id, BasicConfig, ConfigParserFactory()) + if not as_conf.check_conf_files(): + Log.critical('Can not proceed with invalid configuration') + return False + + submitter = Autosubmit._get_submitter(as_conf) + submitter.load_platforms(as_conf) + if submitter.platforms is None: + return False + + Log.info("Checking remote platforms") + platforms = filter(lambda x: x not in ['local', 'LOCAL'], submitter.platforms) + Log.info("Copying remote files/dirs") + for platform in platforms: + p = submitter.platforms[platform] + Log.info("Copying from {0} to {1}", os.path.join(p.temp_dir, experiment_id), + os.path.join(p.root_dir)) + if not p.send_command("cp -r " + os.path.join(p.temp_dir, experiment_id) + " " + + os.path.join(p.root_dir)): + Log.critical("The files/dirs on {0} cannot be copied to {1}.", + os.path.join(p.temp_dir, experiment_id), p.root_dir) + return False + + Log.result("Files/dirs on {0} have been successfully picked up", platform) + Log.result("The experiment has been successfully picked up.") return True diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 70356b200..3cb15cc11 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -204,6 +204,27 @@ class AutosubmitConfig(object): """ return str(self._jobs_parser.get_option(section, 'MEMORY_PER_TASK', '')) + def get_migrate_user_to(self, section): + """ + Returns the user to change to from platform config file. + + :return: migrate user to + :rtype: str + """ + return self._platforms_parser.get_option(section, 'USER_TO', '').lower() + + def set_new_user(self, section, new_user): + """ + Sets new user for given platform + :param new_user: + :param section: platform name + :type: str + """ + content = open(self._platforms_parser_file).read() + if re.search(section, content): + content = content.replace(re.search('USER =.*', content).group(0), "USER = " + new_user) + open(self._platforms_parser_file, 'w').write(content) + def check_conf_files(self): """ Checks configuration files (autosubmit, experiment jobs and platforms), looking for invalid values, missing diff --git a/autosubmit/config/files/platforms.conf b/autosubmit/config/files/platforms.conf index 70d628948..608357c3f 100644 --- a/autosubmit/config/files/platforms.conf +++ b/autosubmit/config/files/platforms.conf @@ -20,8 +20,8 @@ # USER_TO = ## Path to the scratch directory for the machine. Required. # SCRATCH_DIR = /scratch -## Path to the tmp directory for the machine. -# TMP_DIR = /tmp +## Path to the machine's temporary directory for migrate purposes. +# TEMP_DIR = /tmp ## If true, Autosubmit test command can use this queue as a main queue. Defaults to False # TEST_SUITE = False ## If given, Autosubmit will add jobs to the given queue. Required for some platforms. -- GitLab From 5d4902322dc1fd975592b2e6ef37acedf9cf92d7 Mon Sep 17 00:00:00 2001 From: Domingo Manubens-Gil Date: Wed, 12 Jul 2017 18:46:21 +0200 Subject: [PATCH 08/16] Add remote files migration support. Fixes #70. Minor change tmp directory chmod to 775. --- autosubmit/autosubmit.py | 31 +++++++++++++++------ autosubmit/config/config_common.py | 21 ++++++++++++++ test/regression/default_conf/platforms.conf | 7 +++-- 3 files changed, 47 insertions(+), 12 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 25d56c5f5..bf434b4ef 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -73,8 +73,6 @@ from bscearth.utils.log import Log from database.db_common import create_db from experiment.experiment_common import new_experiment from experiment.experiment_common import copy_experiment -from experiment.experiment_common import migrate_experiment_offer -from experiment.experiment_common import migrate_experiment_pickup from database.db_common import delete_experiment from database.db_common import get_autosubmit_version from monitor.monitor import Monitor @@ -511,7 +509,8 @@ class Autosubmit: Log.debug("Creating temporal directory...") exp_id_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, exp_id) - os.mkdir(os.path.join(exp_id_path, "tmp"), 0o775) + os.mkdir(os.path.join(exp_id_path, "tmp")) + os.chmod(os.path.join(exp_id_path, "tmp"), 0o775) Log.debug("Creating pkl directory...") os.mkdir(os.path.join(exp_id_path, "pkl")) @@ -1091,9 +1090,8 @@ class Autosubmit: Log.info("Checking remote platforms") platforms = filter(lambda x: x not in ['local', 'LOCAL'], submitter.platforms) - Log.info("Moving remote files/dirs") for platform in platforms: - Log.info("Updating platform configuration with target user") + Log.info("Updating {0} platform configuration with target user", platform) if not as_conf.get_migrate_user_to(platform): Log.critical("Missing target user in platforms configuration file") return False @@ -1102,6 +1100,15 @@ class Autosubmit: Log.info("User in platform configuration file successfully updated to {0}", as_conf.get_migrate_user_to(platform)) + if as_conf.get_migrate_project_to(platform): + Log.info("Updating {0} platform configuration with target project", platform) + as_conf.set_new_project(platform, as_conf.get_migrate_project_to(platform)) + Log.info("Project in platform configuration file successfully updated to {0}", + as_conf.get_migrate_user_to(platform)) + else: + Log.warning("Project in platforms configuration file remains unchanged") + + Log.info("Moving remote files/dirs on {0}", platform) p = submitter.platforms[platform] Log.info("Moving from {0} to {1}", os.path.join(p.root_dir), os.path.join(p.temp_dir, experiment_id)) @@ -1109,31 +1116,37 @@ class Autosubmit: Log.critical("The files/dirs on {0} cannot be moved to {1}.", p.root_dir, os.path.join(p.temp_dir, experiment_id)) return False + Log.result("Files/dirs on {0} have been successfully offered", platform) Log.info("Moving local files/dirs") - Autosubmit.archive(experiment_id, False) + if not Autosubmit.archive(experiment_id, False): + Log.critical("The experiment cannot be offered") + return False + Log.result("The experiment has been successfully offered.") elif pickup: Log.info('Migrating experiment {0}'.format(experiment_id)) Log.info("Moving local files/dirs") - Autosubmit.unarchive(experiment_id) + if not Autosubmit.unarchive(experiment_id): + Log.critical("The experiment cannot be picked up") + return False Log.info("Local files/dirs have been sucessfully picked up") as_conf = AutosubmitConfig(experiment_id, BasicConfig, ConfigParserFactory()) if not as_conf.check_conf_files(): Log.critical('Can not proceed with invalid configuration') return False + Log.info("Checking remote platforms") submitter = Autosubmit._get_submitter(as_conf) submitter.load_platforms(as_conf) if submitter.platforms is None: return False - Log.info("Checking remote platforms") platforms = filter(lambda x: x not in ['local', 'LOCAL'], submitter.platforms) - Log.info("Copying remote files/dirs") for platform in platforms: + Log.info("Copying remote files/dirs on {0}", platform) p = submitter.platforms[platform] Log.info("Copying from {0} to {1}", os.path.join(p.temp_dir, experiment_id), os.path.join(p.root_dir)) diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 3cb15cc11..e64e43a67 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -225,6 +225,27 @@ class AutosubmitConfig(object): content = content.replace(re.search('USER =.*', content).group(0), "USER = " + new_user) open(self._platforms_parser_file, 'w').write(content) + def get_migrate_project_to(self, section): + """ + Returns the project to change to from platform config file. + + :return: migrate project to + :rtype: str + """ + return self._platforms_parser.get_option(section, 'PROJECT_TO', '').lower() + + def set_new_project(self, section, new_project): + """ + Sets new project for given platform + :param new_project: + :param section: platform name + :type: str + """ + content = open(self._platforms_parser_file).read() + if re.search(section, content): + content = content.replace(re.search('PROJECT =.*', content).group(0), "PROJECT = " + new_project) + open(self._platforms_parser_file, 'w').write(content) + def check_conf_files(self): """ Checks configuration files (autosubmit, experiment jobs and platforms), looking for invalid values, missing diff --git a/test/regression/default_conf/platforms.conf b/test/regression/default_conf/platforms.conf index 8a3a3c058..ae146bd42 100644 --- a/test/regression/default_conf/platforms.conf +++ b/test/regression/default_conf/platforms.conf @@ -31,15 +31,16 @@ TEST_SUITE = False QUEUE = serial [marenostrum3] -TYPE = LSF +TYPE = slurm VERSION = mn HOST = mn-bsc32 PROJECT = bsc32 +QUEUE = debug ADD_PROJECT_TO_HOST = false -USER = bsc32649 +USER = bsc32704 SCRATCH_DIR = /gpfs/scratch TEST_SUITE = True -PROCESSORS_PER_NODE = 16 +PROCESSORS_PER_NODE = 48 [mistral] TYPE = slurm -- GitLab From 1102c8b44e8535f9c6086588ea2b35741f0e3c31 Mon Sep 17 00:00:00 2001 From: Domingo Manubens-Gil Date: Wed, 19 Jul 2017 18:49:30 +0200 Subject: [PATCH 09/16] Bug fixes: error and output path variable expansion, exit if one job in vertical wrap fails. See #267 --- autosubmit/job/job_packages.py | 6 ++++-- autosubmit/platforms/wrappers/ec_wrapper.py | 14 ++++++++------ 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 11d206d15..3bcd70ec5 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -381,7 +381,8 @@ class JobPackageVertical(JobPackageThread): def _common_script_content(self): return self.platform.wrapper.vertical(self._name, self._queue, self._project, self._wallclock, self._num_processors, - self._jobs_scripts, self._job_dependency, expid=self._expid) + self._jobs_scripts, self._job_dependency, expid=self._expid, + rootdir=self.platform.root_dir) class JobPackageHorizontal(JobPackageThread): @@ -402,4 +403,5 @@ class JobPackageHorizontal(JobPackageThread): def _common_script_content(self): return self.platform.wrapper.horizontal(self._name, self._queue, self._project, self._wallclock, self._num_processors, len(self.jobs), self._jobs_scripts, - self._job_dependency, expid=self._expid) + self._job_dependency, expid=self._expid, + rootdir=self.platform.root_dir) diff --git a/autosubmit/platforms/wrappers/ec_wrapper.py b/autosubmit/platforms/wrappers/ec_wrapper.py index 71ffdf57c..418c8af3a 100644 --- a/autosubmit/platforms/wrappers/ec_wrapper.py +++ b/autosubmit/platforms/wrappers/ec_wrapper.py @@ -21,6 +21,7 @@ import textwrap # TODO: Refactor with kwargs +# TODO: Project is not EC_billing_account, use budget class EcWrapper(object): """Class to handle wrappers on ECMWF platform""" @@ -35,8 +36,8 @@ class EcWrapper(object): #PBS -N {0} #PBS -q {1} #PBS -l EC_billing_account={2} - #PBS -o $SCRATCH/{6}/LOG_{6}/{0}.out - #PBS -o $SCRATCH/{6}/LOG_{6}/{0}.err + #PBS -o {8}/LOG_{6}/{0}.out + #PBS -e {8}/LOG_{6}/{0}.err #PBS -l walltime={3}:00 #PBS -l EC_total_tasks={4} #PBS -l EC_hyperthreads=1 @@ -66,12 +67,13 @@ class EcWrapper(object): echo "The job $script has been COMPLETED" else echo "The job $script has FAILED" + exit 1 fi i=$((i+1)) done """.format(filename, queue, project, wallclock, num_procs, ' '.join(str(s) for s in job_scripts), kwargs['expid'], - cls.dependency_directive(dependency))) + cls.dependency_directive(dependency), kwargs['rootdir'])) @classmethod def horizontal(cls, filename, queue, project, wallclock, num_procs, _, job_scripts, dependency, **kwargs): @@ -84,8 +86,8 @@ class EcWrapper(object): #PBS -N {0} #PBS -q {1} #PBS -l EC_billing_account={2} - #PBS -o $SCRATCH/{6}/LOG_{6}/{0}.out - #PBS -e $SCRATCH/{6}/LOG_{6}/{0}.err + #PBS -o {8}/LOG_{6}/{0}.out + #PBS -e {8}/LOG_{6}/{0}.err #PBS -l walltime={3}:00 #PBS -l EC_total_tasks={4} #PBS -l EC_hyperthreads=1 @@ -125,7 +127,7 @@ class EcWrapper(object): done """.format(filename, queue, project, wallclock, num_procs, ' '.join(str(s) for s in job_scripts), kwargs['expid'], - cls.dependency_directive(dependency))) + cls.dependency_directive(dependency), kwargs['rootdir'])) @classmethod def dependency_directive(cls, dependency): -- GitLab From 0e29473313d63982b2db0f44505240ab97c17005 Mon Sep 17 00:00:00 2001 From: Domingo Manubens-Gil Date: Tue, 1 Aug 2017 18:45:47 +0200 Subject: [PATCH 10/16] First version of custom headers working for paramiko. See #270 --- autosubmit/config/config_common.py | 10 ++++++++++ autosubmit/config/files/jobs.conf | 6 ++++-- autosubmit/config/files/platforms.conf | 5 ++++- autosubmit/job/job.py | 10 ++++++++++ autosubmit/job/job_packages.py | 4 ++-- autosubmit/platforms/headers/ec_cca_header.py | 17 +++++++++++++++++ autosubmit/platforms/headers/ec_header.py | 17 +++++++++++++++++ autosubmit/platforms/headers/lsf_header.py | 17 +++++++++++++++++ autosubmit/platforms/headers/pbs10_header.py | 17 +++++++++++++++++ autosubmit/platforms/headers/pbs11_header.py | 17 +++++++++++++++++ autosubmit/platforms/headers/pbs12_header.py | 17 +++++++++++++++++ autosubmit/platforms/headers/ps_header.py | 13 +++++++++++++ autosubmit/platforms/headers/sge_header.py | 17 +++++++++++++++++ autosubmit/platforms/headers/slurm_header.py | 17 +++++++++++++++++ autosubmit/platforms/paramiko_platform.py | 2 ++ autosubmit/platforms/paramiko_submitter.py | 5 +++++ autosubmit/platforms/platform.py | 1 + 17 files changed, 187 insertions(+), 5 deletions(-) diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 70356b200..10877bc43 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -204,6 +204,16 @@ class AutosubmitConfig(object): """ return str(self._jobs_parser.get_option(section, 'MEMORY_PER_TASK', '')) + def get_custom_directives(self, section): + """ + Gets custom directives needed for the given job type + :param section: job type + :type section: str + :return: custom directives needed + :rtype: str + """ + return str(self._jobs_parser.get_option(section, 'CUSTOM_DIRECTIVES', '')) + def check_conf_files(self): """ Checks configuration files (autosubmit, experiment jobs and platforms), looking for invalid values, missing diff --git a/autosubmit/config/files/jobs.conf b/autosubmit/config/files/jobs.conf index 8caccb5cb..79948db00 100644 --- a/autosubmit/config/files/jobs.conf +++ b/autosubmit/config/files/jobs.conf @@ -49,7 +49,9 @@ # TYPE = bash ## Synchronize a chunk job with its dependency chunks at a 'date' or 'member' level # SYNCHRONIZE = date | member - +## Optional. Custom directives for the resource manager of the platform used for that job. +## Put as many as you wish in json formatted array. +# CUSTOM_DIRECTIVE = ["#PBS -W depend=afterok:0", "#PBS -W depend=afterok:1"] [LOCAL_SETUP] FILE = LOCAL_SETUP.sh @@ -91,4 +93,4 @@ WALLCLOCK = 00:05 FILE = TRANSFER.sh PLATFORM = LOCAL DEPENDENCIES = CLEAN -RUNNING = member \ No newline at end of file +RUNNING = member diff --git a/autosubmit/config/files/platforms.conf b/autosubmit/config/files/platforms.conf index 0fa5eee7f..1706b5728 100644 --- a/autosubmit/config/files/platforms.conf +++ b/autosubmit/config/files/platforms.conf @@ -45,4 +45,7 @@ # MAX_WALLCLOCK = 72:00 ## Max processors number per job submitted to the HPC. If not specified, defaults to empty. ## Optional. Required for wrappers. -# MAX_PROCESSORS = 1 \ No newline at end of file +# MAX_PROCESSORS = 1 +## Optional. Custom directives for the resource manager of the platform used. +## Put as many as you wish in a json formatted array. +# CUSTOM_DIRECTIVE = ["#PBS -W depend=afterok:0", "#PBS -W depend=afterok:1"] diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index d0d8a1842..4125e8a58 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -23,6 +23,7 @@ Main module for autosubmit. Only contains an interface class to all functionalit import os import re import time +import json from autosubmit.job.job_common import Status, Type from autosubmit.job.job_common import StatisticsSnippetBash, StatisticsSnippetPython @@ -73,6 +74,7 @@ class Job(object): self.date_format = '' self.type = Type.BASH self.scratch_free_space = None + self.custom_directives = None self.id = job_id self.file = None @@ -610,6 +612,13 @@ class Job(object): self.scratch_free_space = as_conf.get_scratch_free_space(self.section) if self.scratch_free_space == 0: self.scratch_free_space = job_platform.scratch_free_space + self.custom_directives = as_conf.get_custom_directives(self.section) + if self.custom_directives != '': + self.custom_directives = json.loads(as_conf.get_custom_directives(self.section)) + if job_platform.custom_directives: + self.custom_directives = self.custom_directives + json.loads(job_platform.custom_directives) + elif job_platform.custom_directives: + self.custom_directives = json.loads(job_platform.custom_directives) parameters['NUMPROC'] = self.processors parameters['MEMORY'] = self.memory @@ -619,6 +628,7 @@ class Job(object): parameters['WALLCLOCK'] = self.wallclock parameters['TASKTYPE'] = self.section parameters['SCRATCH_FREE_SPACE'] = self.scratch_free_space + parameters['CUSTOM_DIRECTIVES'] = self.custom_directives parameters['CURRENT_ARCH'] = job_platform.name parameters['CURRENT_HOST'] = job_platform.host diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 3bcd70ec5..e1c5dcffe 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -382,7 +382,7 @@ class JobPackageVertical(JobPackageThread): return self.platform.wrapper.vertical(self._name, self._queue, self._project, self._wallclock, self._num_processors, self._jobs_scripts, self._job_dependency, expid=self._expid, - rootdir=self.platform.root_dir) + rootdir=self.platform.root_dir, customdir=self.platform.customdir) class JobPackageHorizontal(JobPackageThread): @@ -404,4 +404,4 @@ class JobPackageHorizontal(JobPackageThread): return self.platform.wrapper.horizontal(self._name, self._queue, self._project, self._wallclock, self._num_processors, len(self.jobs), self._jobs_scripts, self._job_dependency, expid=self._expid, - rootdir=self.platform.root_dir) + rootdir=self.platform.root_dir, customdir=self.platform.customdir) diff --git a/autosubmit/platforms/headers/ec_cca_header.py b/autosubmit/platforms/headers/ec_cca_header.py index 9037c9c59..ea70b2ac0 100644 --- a/autosubmit/platforms/headers/ec_cca_header.py +++ b/autosubmit/platforms/headers/ec_cca_header.py @@ -80,6 +80,21 @@ class EcCcaHeader(object): return "#PBS -l EC_hyperthreads=2" return "#PBS -l EC_hyperthreads=1" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT @@ -91,6 +106,7 @@ class EcCcaHeader(object): #PBS -q ns #PBS -l walltime=%WALLCLOCK%:00 #PBS -l EC_billing_account=%CURRENT_BUDG% + %CUSTOM_DIRECTIVES% # ############################################################################### @@ -112,6 +128,7 @@ class EcCcaHeader(object): %HYPERTHREADING_DIRECTIVE% #PBS -l walltime=%WALLCLOCK%:00 #PBS -l EC_billing_account=%CURRENT_BUDG% + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/headers/ec_header.py b/autosubmit/platforms/headers/ec_header.py index ff1eaadc0..3482899fe 100644 --- a/autosubmit/platforms/headers/ec_header.py +++ b/autosubmit/platforms/headers/ec_header.py @@ -36,6 +36,21 @@ class EcHeader(object): # There is no queue, so directive is empty return "" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + # noinspection PyPep8 SERIAL = textwrap.dedent("""\ ############################################################################### @@ -52,6 +67,7 @@ class EcHeader(object): #@ resources = ConsumableCpus(1) ConsumableMemory(1200mb) #@ wall_clock_limit = %WALLCLOCK%:00 #@ platforms + %CUSTOM_DIRECTIVES% # ############################################################################### """) @@ -74,6 +90,7 @@ class EcHeader(object): #@ total_tasks = %NUMPROC% #@ wall_clock_limit = %WALLCLOCK%:00 #@ platforms + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/headers/lsf_header.py b/autosubmit/platforms/headers/lsf_header.py index 331ffe154..d14b14bbe 100644 --- a/autosubmit/platforms/headers/lsf_header.py +++ b/autosubmit/platforms/headers/lsf_header.py @@ -59,6 +59,21 @@ class LsfHeader(object): else: return "" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + @classmethod def array_header(cls, filename, array_id, wallclock, num_processors): return textwrap.dedent("""\ @@ -141,6 +156,7 @@ class LsfHeader(object): #BSUB -W %WALLCLOCK% #BSUB -n %NUMPROC% %EXCLUSIVITY_DIRECTIVE% + %CUSTOM_DIRECTIVES% # ############################################################################### """) @@ -158,6 +174,7 @@ class LsfHeader(object): #BSUB -n %NUMPROC% %TASKS_PER_NODE_DIRECTIVE% %SCRATCH_FREE_SPACE_DIRECTIVE% + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/headers/pbs10_header.py b/autosubmit/platforms/headers/pbs10_header.py index 3197603e3..33aa06d9f 100644 --- a/autosubmit/platforms/headers/pbs10_header.py +++ b/autosubmit/platforms/headers/pbs10_header.py @@ -36,6 +36,21 @@ class Pbs10Header(object): # There is no queue, so directive is empty return "" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT @@ -45,6 +60,7 @@ class Pbs10Header(object): #PBS -q serial #PBS -l cput=%WALLCLOCK%:00 #PBS -A %CURRENT_BUDG% + %CUSTOM_DIRECTIVES% # ############################################################################### """) @@ -59,6 +75,7 @@ class Pbs10Header(object): #PBS -l mppnppn=32 #PBS -l walltime=%WALLCLOCK%:00 #PBS -A %CURRENT_BUDG% + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/headers/pbs11_header.py b/autosubmit/platforms/headers/pbs11_header.py index 9f9919799..423fe148c 100644 --- a/autosubmit/platforms/headers/pbs11_header.py +++ b/autosubmit/platforms/headers/pbs11_header.py @@ -36,6 +36,21 @@ class Pbs11Header(object): # There is no queue, so directive is empty return "" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT @@ -48,6 +63,7 @@ class Pbs11Header(object): #PBS -l walltime=%WALLCLOCK% #PBS -e %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID% #PBS -o %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID% + %CUSTOM_DIRECTIVES% # ############################################################################### """) @@ -64,6 +80,7 @@ class Pbs11Header(object): #PBS -l walltime=%WALLCLOCK% #PBS -e %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID% #PBS -o %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID% + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/headers/pbs12_header.py b/autosubmit/platforms/headers/pbs12_header.py index 014ebb63a..4407c8a32 100644 --- a/autosubmit/platforms/headers/pbs12_header.py +++ b/autosubmit/platforms/headers/pbs12_header.py @@ -36,6 +36,21 @@ class Pbs12Header(object): # There is no queue, so directive is empty return "" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT @@ -45,6 +60,7 @@ class Pbs12Header(object): #PBS -l select=serial=true:ncpus=1 #PBS -l walltime=%WALLCLOCK%:00 #PBS -A %CURRENT_BUDG% + %CUSTOM_DIRECTIVES% # ############################################################################### """) @@ -58,6 +74,7 @@ class Pbs12Header(object): #PBS -l select=%NUMPROC% #PBS -l walltime=%WALLCLOCK%:00 #PBS -A %CURRENT_BUDG% + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/headers/ps_header.py b/autosubmit/platforms/headers/ps_header.py index 436bb0893..3286a1407 100644 --- a/autosubmit/platforms/headers/ps_header.py +++ b/autosubmit/platforms/headers/ps_header.py @@ -36,6 +36,19 @@ class PsHeader(object): # There is no queue, so directive is empty return "" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + return "" + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT diff --git a/autosubmit/platforms/headers/sge_header.py b/autosubmit/platforms/headers/sge_header.py index 540c5f642..bcfb5d892 100644 --- a/autosubmit/platforms/headers/sge_header.py +++ b/autosubmit/platforms/headers/sge_header.py @@ -39,6 +39,21 @@ class SgeHeader(object): else: return "$ -q {0}".format(job.parameters['CURRENT_QUEUE']) + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT @@ -52,6 +67,7 @@ class SgeHeader(object): #$ -l h_rt=%WALLCLOCK%:00 #$ -l s_rt=%WALLCLOCK%:00 #%QUEUE_DIRECTIVE% + %CUSTOM_DIRECTIVES% # ############################################################################### """) @@ -70,6 +86,7 @@ class SgeHeader(object): #$ -l s_rt=%WALLCLOCK%:00 #$ -pe orte %NUMPROC% #%QUEUE_DIRECTIVE% + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/headers/slurm_header.py b/autosubmit/platforms/headers/slurm_header.py index 677a26d56..0d0693b06 100644 --- a/autosubmit/platforms/headers/slurm_header.py +++ b/autosubmit/platforms/headers/slurm_header.py @@ -84,6 +84,21 @@ class SlurmHeader(object): return "SBATCH --mem-per-cpu {0}".format(job.parameters['MEMORY_PER_TASK']) return "" + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def get_custom_directives(self, job): + """ + Returns custom directives for the specified job + + :param job: job to create custom directive for + :type job: Job + :return: custom directives + :rtype: str + """ + # There is no custom directives, so directive is empty + if job.parameters['CUSTOM_DIRECTIVES'] != '': + return '\n'.join(str(s) for s in job.parameters['CUSTOM_DIRECTIVES']) + return "" + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT @@ -98,6 +113,7 @@ class SlurmHeader(object): #SBATCH -J %JOBNAME% #SBATCH -o %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%OUT_LOG_DIRECTIVE% #SBATCH -e %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%ERR_LOG_DIRECTIVE% + %CUSTOM_DIRECTIVES% # ############################################################################### """) @@ -116,6 +132,7 @@ class SlurmHeader(object): #SBATCH -J %JOBNAME% #SBATCH -o %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%OUT_LOG_DIRECTIVE% #SBATCH -e %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%ERR_LOG_DIRECTIVE% + %CUSTOM_DIRECTIVES% # ############################################################################### """) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 0bb98c347..71add2a03 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -414,6 +414,8 @@ class ParamikoPlatform(Platform): header = header.replace('%THREADS_PER_TASK_DIRECTIVE%', self.header.get_threads_per_task(job)) if hasattr(self.header, 'get_scratch_free_space'): header = header.replace('%SCRATCH_FREE_SPACE_DIRECTIVE%', self.header.get_scratch_free_space(job)) + if hasattr(self.header, 'get_custom_directives'): + header = header.replace('%CUSTOM_DIRECTIVES%', self.header.get_custom_directives(job)) if hasattr(self.header, 'get_exclusivity'): header = header.replace('%EXCLUSIVITY_DIRECTIVE%', self.header.get_exclusivity(job)) if hasattr(self.header, 'get_account_directive'): diff --git a/autosubmit/platforms/paramiko_submitter.py b/autosubmit/platforms/paramiko_submitter.py index cc6599118..9150fe505 100644 --- a/autosubmit/platforms/paramiko_submitter.py +++ b/autosubmit/platforms/paramiko_submitter.py @@ -22,6 +22,8 @@ import time import os +from bscearth.utils.log import Log + from autosubmit.config.basicConfig import BasicConfig from autosubmit.config.config_common import AutosubmitConfig from submitter import Submitter @@ -131,6 +133,9 @@ class ParamikoSubmitter(Submitter): remote_platform._serial_queue = parser.get_option(section, 'SERIAL_QUEUE', None) remote_platform.processors_per_node = parser.get_option(section, 'PROCESSORS_PER_NODE', None) + remote_platform.custom_directives = parser.get_option(section, 'CUSTOM_DIRECTIVES', + None) + Log.debug("Custom directives from platform.conf: {0}".format(remote_platform.custom_directives)) remote_platform.scratch_free_space = parser.get_option(section, 'SCRATCH_FREE_SPACE', None) remote_platform.root_dir = os.path.join(remote_platform.scratch, remote_platform.project, diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 50334edaa..9b1aca5ca 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -27,6 +27,7 @@ class Platform(object): self._default_queue = None self.processors_per_node = None self.scratch_free_space = None + self.custom_directives = None self.host = '' self.user = '' self.project = '' -- GitLab From 34bd552118580418fc8509a91010d0699faeb435 Mon Sep 17 00:00:00 2001 From: Domingo Manubens-Gil Date: Wed, 2 Aug 2017 16:46:32 +0200 Subject: [PATCH 11/16] Fix Unit test Job. Json object needs to be created for custom directives parameter. --- test/unit/test_job.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/unit/test_job.py b/test/unit/test_job.py index 0bcf44044..705d32eee 100644 --- a/test/unit/test_job.py +++ b/test/unit/test_job.py @@ -258,11 +258,13 @@ class TestJob(TestCase): as_conf.get_memory = Mock(return_value=80) as_conf.get_wallclock = Mock(return_value='00:30') as_conf.get_member_list = Mock(return_value=[]) + as_conf.get_custom_directives = Mock(return_value='["whatever"]') dummy_serial_platform = Mock() dummy_serial_platform.name = 'serial' dummy_platform = Mock() dummy_platform.serial_platform = dummy_serial_platform + dummy_platform.custom_directives = '["whatever"]' self.job._platform = dummy_platform # Act parameters = self.job.update_parameters(as_conf, dict()) -- GitLab From 98a600ae0f4206c5db6477828356e89c8f65c104 Mon Sep 17 00:00:00 2001 From: Domingo Manubens-Gil Date: Wed, 2 Aug 2017 19:19:53 +0200 Subject: [PATCH 12/16] Documentation custom directives. Deactivation of W option into ec_wrapper (turned into v temporarly), since it is not working. See #267 --- autosubmit/config/files/jobs.conf | 2 +- autosubmit/config/files/platforms.conf | 2 +- autosubmit/platforms/wrappers/ec_wrapper.py | 2 +- docs/source/usage.rst | 9 +++++++-- docs/source/variables.rst | 1 + 5 files changed, 11 insertions(+), 5 deletions(-) diff --git a/autosubmit/config/files/jobs.conf b/autosubmit/config/files/jobs.conf index 79948db00..eaf192ce6 100644 --- a/autosubmit/config/files/jobs.conf +++ b/autosubmit/config/files/jobs.conf @@ -51,7 +51,7 @@ # SYNCHRONIZE = date | member ## Optional. Custom directives for the resource manager of the platform used for that job. ## Put as many as you wish in json formatted array. -# CUSTOM_DIRECTIVE = ["#PBS -W depend=afterok:0", "#PBS -W depend=afterok:1"] +# CUSTOM_DIRECTIVE = ["#PBS -v myvar=value, "#PBS -v othervar=value"] [LOCAL_SETUP] FILE = LOCAL_SETUP.sh diff --git a/autosubmit/config/files/platforms.conf b/autosubmit/config/files/platforms.conf index 1706b5728..e5e6ff4c3 100644 --- a/autosubmit/config/files/platforms.conf +++ b/autosubmit/config/files/platforms.conf @@ -48,4 +48,4 @@ # MAX_PROCESSORS = 1 ## Optional. Custom directives for the resource manager of the platform used. ## Put as many as you wish in a json formatted array. -# CUSTOM_DIRECTIVE = ["#PBS -W depend=afterok:0", "#PBS -W depend=afterok:1"] +# CUSTOM_DIRECTIVE = ["#PBS -v myvar=value, "#PBS -v othervar=value"] diff --git a/autosubmit/platforms/wrappers/ec_wrapper.py b/autosubmit/platforms/wrappers/ec_wrapper.py index 418c8af3a..7e848dd1e 100644 --- a/autosubmit/platforms/wrappers/ec_wrapper.py +++ b/autosubmit/platforms/wrappers/ec_wrapper.py @@ -131,4 +131,4 @@ class EcWrapper(object): @classmethod def dependency_directive(cls, dependency): - return '#' if dependency is None else '#PBS -W depend=afterok:{0}'.format(dependency) + return '#' if dependency is None else '#PBS -v depend=afterok:{0}'.format(dependency) diff --git a/docs/source/usage.rst b/docs/source/usage.rst index a90c07aeb..89c6ee805 100644 --- a/docs/source/usage.rst +++ b/docs/source/usage.rst @@ -644,7 +644,7 @@ For jobs running in HPC platforms, usually you have to provide information about * QUEUE: queue to add the job to. If not specificied, uses PLATFORM default. -There are also another, less used features that you can use: +There are also other, less used features that you can use: * FREQUENCY: specifies that a job has only to be run after X dates, members or chunk. A job will always be created for the last one. If not specified, defaults to 1 @@ -658,6 +658,8 @@ There are also another, less used features that you can use: * RERUN_DEPENDENCIES: defines the jobs to be rerun if this job is going to be rerunned. Syntax is identical to the used in DEPENDENCIES +* CUSTOM_DIRECTIVES: Custom directives for the HPC resource manager headers of the platform used for that job. + Example: .. code-block:: ini @@ -719,7 +721,7 @@ configuration, you can specify what platform or queue to use to run serial jobs * SERIAL_QUEUE: if specified, Autosubmit will run jobs with only one processor in the specified queue. Autosubmit will ignore this configuration if SERIAL_PLATFORM is provided -There are some other parameters that you must need to specify: +There are some other parameters that you may need to specify: * BUDGET: budget account for the machine scheduler. If omitted, takes the value defined in PROJECT @@ -733,6 +735,8 @@ There are some other parameters that you must need to specify: * TOTAL_JOBS: maximum number of jobs to be running at the same time in this platform. +* CUSTOM_DIRECTIVES: Custom directives for the resource manager of this platform. + Example: .. code-block:: ini @@ -745,6 +749,7 @@ Example: USER = my_user SCRATCH_DIR = /scratch TEST_SUITE = True + CUSTOM_DIRECTIVES = [ "my_directive" ] How to change the communications library ===================================== diff --git a/docs/source/variables.rst b/docs/source/variables.rst index 62bbc69f3..70b9b5e51 100644 --- a/docs/source/variables.rst +++ b/docs/source/variables.rst @@ -57,6 +57,7 @@ suite of variables is defined for the current platform where {PLATFORM_NAME} is - **{PLATFORM_NAME}_VERSION**: Platform scheduler version - **{PLATFORM_NAME}_SCRATCH_DIR**: Platform's scratch folder path - **{PLATFORM_NAME}_ROOTDIR**: Platform's experiment folder path +- **{PLATFORM_NAME}_CUSTOM_DIRECTIVES**: Platform's custom directives for the resource manager. .. hint:: The variables ``_USER``, ``_PROJ`` and ``_BUDG`` has no value on the LOCAL platform. -- GitLab From e8577e40aa451b9a7ee3d9c0e55d57a9cbd07568 Mon Sep 17 00:00:00 2001 From: Domingo Manubens-Gil Date: Fri, 4 Aug 2017 17:28:05 +0200 Subject: [PATCH 13/16] Custom directives implemented on packages. Tested on ECMWF and MN4. Fixes #270 --- autosubmit/job/job.py | 4 +++- autosubmit/job/job_packages.py | 12 ++++++++--- autosubmit/platforms/headers/lsf_header.py | 11 ++++++---- autosubmit/platforms/paramiko_submitter.py | 2 +- autosubmit/platforms/wrappers/ec_wrapper.py | 8 ++++++-- autosubmit/platforms/wrappers/lsf_wrapper.py | 14 +++++++++---- .../platforms/wrappers/slurm_wrapper.py | 20 +++++++++++++------ 7 files changed, 50 insertions(+), 21 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 4125e8a58..df36b4c14 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -74,7 +74,7 @@ class Job(object): self.date_format = '' self.type = Type.BASH self.scratch_free_space = None - self.custom_directives = None + self.custom_directives = [] self.id = job_id self.file = None @@ -619,6 +619,8 @@ class Job(object): self.custom_directives = self.custom_directives + json.loads(job_platform.custom_directives) elif job_platform.custom_directives: self.custom_directives = json.loads(job_platform.custom_directives) + elif self.custom_directives == '': + self.custom_directives = [] parameters['NUMPROC'] = self.processors parameters['MEMORY'] = self.memory diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index e1c5dcffe..bb19285f5 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -45,6 +45,7 @@ class JobPackageBase(object): try: self._tmp_path = jobs[0]._tmp_path self._platform = jobs[0].platform + self._custom_directives = set() for job in jobs: if job.platform != self._platform or job.platform is None: raise Exception('Only one valid platform per package') @@ -80,6 +81,8 @@ class JobPackageBase(object): if not job.check_script(configuration, parameters): raise WrongTemplateException(job.name) job.update_parameters(configuration, parameters) + # looking for directives on jobs + self._custom_directives = self._custom_directives | set(job.custom_directives) self._create_scripts(configuration) self._send_files() self._do_submission() @@ -186,7 +189,8 @@ class JobPackageArray(JobPackageBase): def _create_common_script(self, filename): script_content = self.platform.header.array_header(filename, self._array_size_id, self._wallclock, - self._num_processors) + self._num_processors, + directives=self.platform.custom_directives) filename += '.cmd' open(os.path.join(self._tmp_path, filename), 'w').write(script_content) os.chmod(os.path.join(self._tmp_path, filename), 0o775) @@ -382,7 +386,8 @@ class JobPackageVertical(JobPackageThread): return self.platform.wrapper.vertical(self._name, self._queue, self._project, self._wallclock, self._num_processors, self._jobs_scripts, self._job_dependency, expid=self._expid, - rootdir=self.platform.root_dir, customdir=self.platform.customdir) + rootdir=self.platform.root_dir, + directives=self._custom_directives) class JobPackageHorizontal(JobPackageThread): @@ -404,4 +409,5 @@ class JobPackageHorizontal(JobPackageThread): return self.platform.wrapper.horizontal(self._name, self._queue, self._project, self._wallclock, self._num_processors, len(self.jobs), self._jobs_scripts, self._job_dependency, expid=self._expid, - rootdir=self.platform.root_dir, customdir=self.platform.customdir) + rootdir=self.platform.root_dir, + directives=self._custom_directives) diff --git a/autosubmit/platforms/headers/lsf_header.py b/autosubmit/platforms/headers/lsf_header.py index d14b14bbe..6634b2a0d 100644 --- a/autosubmit/platforms/headers/lsf_header.py +++ b/autosubmit/platforms/headers/lsf_header.py @@ -75,7 +75,7 @@ class LsfHeader(object): return "" @classmethod - def array_header(cls, filename, array_id, wallclock, num_processors): + def array_header(cls, filename, array_id, wallclock, num_processors, **kwargs): return textwrap.dedent("""\ ############################################################################### # {0} @@ -87,16 +87,17 @@ class LsfHeader(object): #BSUB -eo {0}.%I.err #BSUB -W {2} #BSUB -n {3} + {4} # ############################################################################### SCRIPT=$(cat {0}.$LSB_JOBINDEX | awk 'NR==1') chmod +x $SCRIPT ./$SCRIPT - """.format(filename, array_id, wallclock, num_processors)) + """.format(filename, array_id, wallclock, num_processors, '\n'.join(str(s) for s in kwargs['directives']))) @classmethod - def thread_header(cls, filename, wallclock, num_processors, job_scripts, dependency_directive): + def thread_header(cls, filename, wallclock, num_processors, job_scripts, dependency_directive, **kwargs): return textwrap.dedent("""\ #!/usr/bin/env python ############################################################################### @@ -109,6 +110,7 @@ class LsfHeader(object): #BSUB -W {1} #BSUB -n {2} {4} + {5} # ############################################################################### @@ -142,7 +144,8 @@ class LsfHeader(object): else: print "The job ", current.template," has FAILED" os._exit(1) - """.format(filename, wallclock, num_processors, str(job_scripts), dependency_directive)) + """.format(filename, wallclock, num_processors, str(job_scripts), dependency_directive, + '\n'.join(str(s) for s in kwargs['directives']))) SERIAL = textwrap.dedent("""\ ############################################################################### diff --git a/autosubmit/platforms/paramiko_submitter.py b/autosubmit/platforms/paramiko_submitter.py index 9150fe505..d84915c5a 100644 --- a/autosubmit/platforms/paramiko_submitter.py +++ b/autosubmit/platforms/paramiko_submitter.py @@ -134,7 +134,7 @@ class ParamikoSubmitter(Submitter): remote_platform.processors_per_node = parser.get_option(section, 'PROCESSORS_PER_NODE', None) remote_platform.custom_directives = parser.get_option(section, 'CUSTOM_DIRECTIVES', - None) + None) Log.debug("Custom directives from platform.conf: {0}".format(remote_platform.custom_directives)) remote_platform.scratch_free_space = parser.get_option(section, 'SCRATCH_FREE_SPACE', None) diff --git a/autosubmit/platforms/wrappers/ec_wrapper.py b/autosubmit/platforms/wrappers/ec_wrapper.py index 7e848dd1e..67caebf80 100644 --- a/autosubmit/platforms/wrappers/ec_wrapper.py +++ b/autosubmit/platforms/wrappers/ec_wrapper.py @@ -42,6 +42,7 @@ class EcWrapper(object): #PBS -l EC_total_tasks={4} #PBS -l EC_hyperthreads=1 {7} + {9} # ############################################################################### @@ -73,7 +74,8 @@ class EcWrapper(object): done """.format(filename, queue, project, wallclock, num_procs, ' '.join(str(s) for s in job_scripts), kwargs['expid'], - cls.dependency_directive(dependency), kwargs['rootdir'])) + cls.dependency_directive(dependency), kwargs['rootdir'], + '\n'.ljust(13).join(str(s) for s in kwargs['directives']))) @classmethod def horizontal(cls, filename, queue, project, wallclock, num_procs, _, job_scripts, dependency, **kwargs): @@ -92,6 +94,7 @@ class EcWrapper(object): #PBS -l EC_total_tasks={4} #PBS -l EC_hyperthreads=1 {7} + {9} # ############################################################################### @@ -127,7 +130,8 @@ class EcWrapper(object): done """.format(filename, queue, project, wallclock, num_procs, ' '.join(str(s) for s in job_scripts), kwargs['expid'], - cls.dependency_directive(dependency), kwargs['rootdir'])) + cls.dependency_directive(dependency), kwargs['rootdir'], + '\n'.ljust(13).join(str(s) for s in kwargs['directives']))) @classmethod def dependency_directive(cls, dependency): diff --git a/autosubmit/platforms/wrappers/lsf_wrapper.py b/autosubmit/platforms/wrappers/lsf_wrapper.py index a2d220595..ffc1dc7ad 100644 --- a/autosubmit/platforms/wrappers/lsf_wrapper.py +++ b/autosubmit/platforms/wrappers/lsf_wrapper.py @@ -25,7 +25,7 @@ class LsfWrapper(object): """Class to handle wrappers on LSF platforms""" @classmethod - def array(cls, filename, array_id, wallclock, num_procs): + def array(cls, filename, array_id, wallclock, num_procs, **kwargs): return textwrap.dedent("""\ ############################################################################### # {0} @@ -37,13 +37,15 @@ class LsfWrapper(object): #BSUB -eo {0}.%I.err #BSUB -W {2} #BSUB -n {3} + {4} # ############################################################################### SCRIPT=$(cat {0}.$LSB_JOBINDEX | awk 'NR==1') chmod +x $SCRIPT ./$SCRIPT - """.format(filename, array_id, wallclock, num_procs)) + """.format(filename, array_id, wallclock, num_procs, + '\n'.ljust(13).join(str(s) for s in kwargs['directives']))) @classmethod def vertical(cls, filename, queue, project, wallclock, num_procs, job_scripts, dependency, **kwargs): @@ -61,6 +63,7 @@ class LsfWrapper(object): #BSUB -W {3} #BSUB -n {4} {6} + {7} # ############################################################################### @@ -95,7 +98,8 @@ class LsfWrapper(object): print "The job ", current.template," has FAILED" os._exit(1) """.format(filename, queue, project, wallclock, num_procs, str(job_scripts), - cls.dependency_directive(dependency))) + cls.dependency_directive(dependency), + '\n'.ljust(13).join(str(s) for s in kwargs['directives']))) @classmethod def horizontal(cls, filename, queue, project, wallclock, num_procs, num_jobs, job_scripts, dependency, **kwargs): @@ -113,6 +117,7 @@ class LsfWrapper(object): #BSUB -W {3} #BSUB -n {4} {7} + {10} # ############################################################################### @@ -158,7 +163,8 @@ class LsfWrapper(object): else: print "The job ", pid.template," has FAILED" """.format(filename, queue, project, wallclock, num_procs, (int(num_procs) / num_jobs), - str(job_scripts), cls.dependency_directive(dependency), "${LSB_DJOB_HOSTFILE}", "${LSB_JOBID}")) + str(job_scripts), cls.dependency_directive(dependency), "${LSB_DJOB_HOSTFILE}", + "${LSB_JOBID}", '\n'.ljust(13).join(str(s) for s in kwargs['directives']))) @classmethod def dependency_directive(cls, dependency): diff --git a/autosubmit/platforms/wrappers/slurm_wrapper.py b/autosubmit/platforms/wrappers/slurm_wrapper.py index 6f11cfec4..384aafb5a 100644 --- a/autosubmit/platforms/wrappers/slurm_wrapper.py +++ b/autosubmit/platforms/wrappers/slurm_wrapper.py @@ -33,13 +33,14 @@ class SlurmWrapper(object): ############################################################################### # #SBATCH -J {0} - #SBATCH -p {1} + {1} #SBATCH -A {2} #SBATCH -o {0}.out #SBATCH -e {0}.err #SBATCH -t {3}:00 #SBATCH -n {4} {6} + {7} # ############################################################################### @@ -73,8 +74,9 @@ class SlurmWrapper(object): else: print "The job ", current.template," has FAILED" os._exit(1) - """.format(filename, queue, project, wallclock, num_procs, str(job_scripts), - cls.dependency_directive(dependency))) + """.format(filename, cls.queue_directive(queue), project, wallclock, num_procs, str(job_scripts), + cls.dependency_directive(dependency), + '\n'.ljust(13).join(str(s) for s in kwargs['directives']))) @classmethod def horizontal(cls, filename, queue, project, wallclock, num_procs, _, job_scripts, dependency, **kwargs): @@ -85,13 +87,14 @@ class SlurmWrapper(object): ############################################################################### # #SBATCH -J {0} - #SBATCH -p {1} + {1} #SBATCH -A {2} #SBATCH -o {0}.out #SBATCH -e {0}.err #SBATCH -t {3}:00 #SBATCH -n {4} {6} + {7} # ############################################################################### @@ -133,9 +136,14 @@ class SlurmWrapper(object): print "The job ", pid.template," has been COMPLETED" else: print "The job ", pid.template," has FAILED" - """.format(filename, queue, project, wallclock, num_procs, str(job_scripts), - cls.dependency_directive(dependency))) + """.format(filename, cls.queue_directive(queue), project, wallclock, num_procs, str(job_scripts), + cls.dependency_directive(dependency), + '\n'.ljust(13).join(str(s) for s in kwargs['directives']))) @classmethod def dependency_directive(cls, dependency): return '#' if dependency is None else '#SBATCH --dependency=afterok:{0}'.format(dependency) + + @classmethod + def queue_directive(cls, queue): + return '#' if queue == '' else '#SBATCH -p {0}'.format(queue) \ No newline at end of file -- GitLab From d788ab5d7a251dd3dd1e6db5f7760b0d2c9627ad Mon Sep 17 00:00:00 2001 From: Domingo Manubens-Gil Date: Fri, 4 Aug 2017 17:56:45 +0200 Subject: [PATCH 14/16] Fix sbatch, use of qos instead of partition. Fixes #271 --- autosubmit/platforms/headers/slurm_header.py | 2 +- autosubmit/platforms/wrappers/slurm_wrapper.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/autosubmit/platforms/headers/slurm_header.py b/autosubmit/platforms/headers/slurm_header.py index 0d0693b06..d47d1dcbc 100644 --- a/autosubmit/platforms/headers/slurm_header.py +++ b/autosubmit/platforms/headers/slurm_header.py @@ -37,7 +37,7 @@ class SlurmHeader(object): if job.parameters['CURRENT_QUEUE'] == '': return "" else: - return "SBATCH -p {0}".format(job.parameters['CURRENT_QUEUE']) + return "SBATCH --qos={0}".format(job.parameters['CURRENT_QUEUE']) # noinspection PyMethodMayBeStatic,PyUnusedLocal def get_account_directive(self, job): diff --git a/autosubmit/platforms/wrappers/slurm_wrapper.py b/autosubmit/platforms/wrappers/slurm_wrapper.py index 384aafb5a..c37f000fb 100644 --- a/autosubmit/platforms/wrappers/slurm_wrapper.py +++ b/autosubmit/platforms/wrappers/slurm_wrapper.py @@ -146,4 +146,4 @@ class SlurmWrapper(object): @classmethod def queue_directive(cls, queue): - return '#' if queue == '' else '#SBATCH -p {0}'.format(queue) \ No newline at end of file + return '#' if queue == '' else '#SBATCH --qos={0}'.format(queue) \ No newline at end of file -- GitLab From 8d630ad3d1efb41c636f0826283e3b9c0ba87b78 Mon Sep 17 00:00:00 2001 From: Domingo Manubens-Gil Date: Fri, 4 Aug 2017 18:18:17 +0200 Subject: [PATCH 15/16] Remove test_migrate_exp, not properly written. Add documentation. Closes #70 --- autosubmit/config/files/platforms.conf | 2 +- docs/source/usage.rst | 29 ++++++++++++++++++++ test/unit/test_migrate_exp.py | 38 -------------------------- 3 files changed, 30 insertions(+), 39 deletions(-) delete mode 100644 test/unit/test_migrate_exp.py diff --git a/autosubmit/config/files/platforms.conf b/autosubmit/config/files/platforms.conf index 608357c3f..45ab24753 100644 --- a/autosubmit/config/files/platforms.conf +++ b/autosubmit/config/files/platforms.conf @@ -16,7 +16,7 @@ # ADD_PROJECT_TO_HOST = False ## User for the machine scheduler. Required # USER = -## Optional. If given, Autosubmit will change owner of files in given platform. +## Optional. If given, Autosubmit will change owner of files in given platform when using migrate_exp. # USER_TO = ## Path to the scratch directory for the machine. Required. # SCRATCH_DIR = /scratch diff --git a/docs/source/usage.rst b/docs/source/usage.rst index a90c07aeb..82b93c199 100644 --- a/docs/source/usage.rst +++ b/docs/source/usage.rst @@ -23,6 +23,7 @@ Command list -install Install database for Autosubmit on the configured folder -archive Clean, compress and remove from the experiments' folder a finalized experiment -unarchive Restores an archived experiment +-migrate_exp Migrates an experiment from one user to another How to create an experiment @@ -828,6 +829,34 @@ Example: autosubmit unarchive cxxx +How to migrate an experiment +============================ +To migrate an experiment from one user to another, you need to add two parameters in the platforms configuration file: + + * USER_TO = + * TEMP_DIR = + +Then, just run the command: +:: + + autosubmit migrate_exp --ofer expid + + +Local files will be archived and remote files put in the HPC temporary directory. + +.. warning:: The temporary directory must be readable by both users (old owner and new owner). + +Then the new owner will have to run the command: +:: + + autosubmit migrate_exp --pickup expid + + + +Local files will be unarchived and remote files copied from the temporal loaction. + +.. warning:: The old owner might need to remove temporal files and archive. + How to configure email notifications ==================================== diff --git a/test/unit/test_migrate_exp.py b/test/unit/test_migrate_exp.py deleted file mode 100644 index eeeb1132a..000000000 --- a/test/unit/test_migrate_exp.py +++ /dev/null @@ -1,38 +0,0 @@ -from unittest import TestCase -from mock import Mock, patch -from autosubmit.experiment.experiment_common import migrate_experiment -import os -import pwd - - -class TestMigrateExp(TestCase): - def setUp(self): - #self.user_from = "old-user" - self.user_from = "dmanuben" - #self.user_to = "new-user" - self.user_to = "dmanuben" - -# def testFoo(self): -# self.failUnless(False) - - @patch('autosubmit.experiment.experiment_common.os') - def test_migrate_experiment(self, mock_os): - current_user_id = "old-user" - user_id, group_id = migrate_experiment("any path", self.user_to) - - to_uid = pwd.getpwnam(self.user_to).pw_uid - mock_os.chown.assert_called_with("any path", to_uid, group_id) - #self.assertEquals("new_user", user_id) - -# @patch('autosubmit.experiment.experiment_common.db_common') -# def test_create_new_experiment(self, db_common_mock): -# current_experiment_id = "empty" -# self._build_db_mock(current_experiment_id, db_common_mock) -# experiment_id = new_experiment(self.description, self.version) -# self.assertEquals("a000", experiment_id) - -# -# @staticmethod -# def _build_db_mock(current_experiment_id, mock_db_common): -# mock_db_common.last_name_used = Mock(return_value=current_experiment_id) -# mock_db_common.check_experiment_exists = Mock(return_value=False) -- GitLab From e9132c7db86d378a735e7c1294aa2ae0ffd2b57d Mon Sep 17 00:00:00 2001 From: Domingo Manubens-Gil Date: Fri, 4 Aug 2017 18:40:41 +0200 Subject: [PATCH 16/16] Bumping version 3.9.0 --- CHANGELOG | 10 ++++++++++ VERSION | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/CHANGELOG b/CHANGELOG index 3f01bb205..f6e5334aa 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,13 @@ +3.9.0 + Custom directives for the HPC resource manager headers + can be added on platforms and jobs configuration files + ~ only paramiko (LSF, SLURM and PBS) + First version with migrate experiments (to another user) + On CCA, TASKS and THREADS can be expressed in lots (e.g. 127:1) + Some bug fixes: + - QUEUE on slurm specified on directive qos instead of partition + - Variable expansion on CCA (ECMWF) headers + 3.8.1 First version with job packages ~ only paramiko (LSF, SLURM and PBS) - Vertical diff --git a/VERSION b/VERSION index f28071967..a5c4c7633 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -3.8.1 +3.9.0 -- GitLab