diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index ab754734e6f6ddc0eeeb0931b8d1a21e1595f9fb..e3a6260a4ccaeeb052747e30a645a41a7f677256 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -619,8 +619,11 @@ class Job(object): """ if ':' in str(self.processors): return reduce(lambda x, y: int(x) + int(y), self.processors.split(':')) - elif self.processors == "": - return 1 + elif self.processors == "" or self.processors == "1": + if int(self.nodes) <= 1: + return 1 + else: + return "" return int(self.processors) @property @@ -1234,6 +1237,8 @@ class Job(object): parameters['CURRENT_PROJ_DIR'] = job_platform.project_dir parameters['CURRENT_ROOTDIR'] = job_platform.root_dir parameters['CURRENT_LOGDIR'] = job_platform.get_files_path() + for key,value in as_conf.platforms_data.get(job_platform.name,{}).items(): + parameters["CURRENT_"+key.upper()] = value return parameters @@ -1241,12 +1246,11 @@ class Job(object): self.executable = str(as_conf.jobs_data[self.section].get("EXECUTABLE", as_conf.platforms_data.get(job_platform.name,{}).get("EXECUTABLE",""))) self.total_jobs = int(as_conf.jobs_data[self.section].get("TOTALJOBS", job_platform.total_jobs)) self.max_waiting_jobs = int(as_conf.jobs_data[self.section].get("MAXWAITINGJOBS", job_platform.max_waiting_jobs)) - self.processors = str(as_conf.jobs_data[self.section].get("PROCESSORS",as_conf.platforms_data.get(job_platform.name,{}).get("PROCESSORS","1"))) + self.nodes = str(as_conf.jobs_data[self.section].get("NODES",as_conf.platforms_data.get(job_platform.name,{}).get("NODES",""))) self.exclusive = str(as_conf.jobs_data[self.section].get("EXCLUSIVE",as_conf.platforms_data.get(job_platform.name,{}).get("EXCLUSIVE",False))) self.threads = str(as_conf.jobs_data[self.section].get("THREADS",as_conf.platforms_data.get(job_platform.name,{}).get("THREADS","1"))) self.tasks = str(as_conf.jobs_data[self.section].get("TASKS",as_conf.platforms_data.get(job_platform.name,{}).get("TASKS","1"))) - self.nodes 
= str(as_conf.jobs_data[self.section].get("NODES",as_conf.platforms_data.get(job_platform.name,{}).get("NODES",""))) self.hyperthreading = str(as_conf.jobs_data[self.section].get("HYPERTHREADING",as_conf.platforms_data.get(job_platform.name,{}).get("HYPERTHREADING","none"))) if int(self.tasks) <= 1 and int(job_platform.processors_per_node) > 1 and int(self.processors) > int(job_platform.processors_per_node): self.tasks = job_platform.processors_per_node diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index d51dd59ddb0bc9785c9784d1cfebd9eccf288fdb..e02c3db7d69d1a4a36e7fb42e875e717e532c9ef 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -98,7 +98,6 @@ class JobPackager(object): self.special_variables = dict() - #todo add default values #Wrapper building starts here for wrapper_section,wrapper_data in self._as_config.experiment_data.get("WRAPPERS",{}).items(): diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 19bdcb02e703e331e713c322aa7f616f1239c77c..a5ed96b8548644b0e30f6f4033a946e8862e82e9 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -352,7 +352,21 @@ class JobPackageThread(JobPackageBase): def __init__(self, jobs, dependency=None, jobs_resources=dict(),method='ASThread',configuration=None,wrapper_section="WRAPPERS", wrapper_info= {}): super(JobPackageThread, self).__init__(jobs) - # to be pass as "configuration" + """ + :param dependency: Dependency + :type dependency: String + :param jobs_resources: Resources to be used by the jobs, if any + :type jobs_resources: Dictionary + :param method: Method to be used to submit the jobs, ASThread by default + :type method: String + :param configuration: Autosubmit configuration + :type configuration: Autosubmitconfigparser instance + + """ + # This function is called from the JobPackageThread constructor + # and from the JobPackageThread.create_scripts function + # It is in charge of 
merging ( switch ) the wrapper info by checking if the value is defined by the user in the wrapper section, current wrapper section, job or platform in that order. + # Some variables are calculated in further functions, like num_processors and wallclock. if len(wrapper_info) > 0 : self.wrapper_type = wrapper_info[0] self.wrapper_policy = wrapper_info[1] @@ -370,18 +384,21 @@ class JobPackageThread(JobPackageBase): self._job_dependency = dependency self._common_script = None self._wallclock = '00:00' - self._num_processors = '0' + # depends on the type of wrapper + if not hasattr(self,"_num_processors"): + self._num_processors = '0' self._jobs_resources = jobs_resources self._wrapper_factory = self.platform.wrapper self.current_wrapper_section = wrapper_section self.inner_retrials = 0 if configuration is not None: - self.inner_retrials = configuration.get_retrials() + self.inner_retrials = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("INNER_RETRIALS",configuration.get_retrials()) + self.export = configuration.get_wrapper_export(configuration.experiment_data["WRAPPERS"][self.current_wrapper_section]) if self.export.lower() != "none" and len(self.export) > 0: for job in self.jobs: if job.export.lower() != "none" and len(job.export) > 0: - self.export == job.export + self.export = job.export break wr_queue = configuration.get_wrapper_queue(configuration.experiment_data["WRAPPERS"][self.current_wrapper_section]) if wr_queue is not None and len(str(wr_queue)) > 0: @@ -389,12 +406,12 @@ class JobPackageThread(JobPackageBase): else: self.queue = jobs[0].queue wr_partition = configuration.get_wrapper_partition(configuration.experiment_data["WRAPPERS"][self.current_wrapper_section]) - if wr_partition is not None and len(str(wr_partition)) > 0: + if wr_partition and len(str(wr_partition)) > 0: self.partition = wr_partition else: self.partition = jobs[0].partition - wr_exclusive = 
configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("EXCLUSIVE",False) - if wr_exclusive: + wr_exclusive = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("EXCLUSIVE",None) + if wr_exclusive is not None: self.exclusive = wr_exclusive else: self.exclusive = jobs[0].exclusive @@ -404,13 +421,31 @@ class JobPackageThread(JobPackageBase): else: self.custom_directives = jobs[0].custom_directives wr_executable = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("EXECUTABLE",None) - if wr_executable is None: + if wr_executable: self.executable = wr_executable else: self.executable = jobs[0].executable + wr_tasks = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("TASKS",None) + if wr_tasks: + self.tasks = wr_tasks + else: + self.tasks = jobs[0].tasks + wr_nodes = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("NODES",None) + if wr_nodes: + self.nodes = wr_nodes + else: + self.nodes = jobs[0].nodes + wr_threads = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("THREADS",None) + if wr_threads: + self.threads = wr_threads + else: + self.threads = jobs[0].threads else: self.queue = jobs[0].queue self.partition = jobs[0].partition + self.nodes = jobs[0].nodes + self.tasks = jobs[0].tasks + self.threads = jobs[0].threads self.method = method self._wrapper_factory.as_conf = configuration self._wrapper_factory.as_conf.experiment_data["CURRENT_WRAPPER"] = configuration.experiment_data["WRAPPERS"][self.current_wrapper_section] @@ -421,19 +456,15 @@ class JobPackageThread(JobPackageBase): self._wrapper_factory.as_conf.experiment_data["CURRENT_WRAPPER"]["METHOD"] = self.wrapper_method self._wrapper_factory.as_conf.experiment_data["CURRENT_WRAPPER"]["EXPORT"] = self.export self._wrapper_factory.as_conf.experiment_data["CURRENT_WRAPPER"]["QUEUE"] = self.queue + 
self._wrapper_factory.as_conf.experiment_data["CURRENT_WRAPPER"]["NODES"] = self.nodes + self._wrapper_factory.as_conf.experiment_data["CURRENT_WRAPPER"]["TASKS"] = self.tasks + self._wrapper_factory.as_conf.experiment_data["CURRENT_WRAPPER"]["THREADS"] = self.threads + self._wrapper_factory.as_conf.experiment_data["CURRENT_WRAPPER"]["PROCESSORS"] = self._num_processors self._wrapper_factory.as_conf.experiment_data["CURRENT_WRAPPER"]["PARTITION"] = self.partition self._wrapper_factory.as_conf.experiment_data["CURRENT_WRAPPER"]["EXCLUSIVE"] = self.exclusive self._wrapper_factory.as_conf.experiment_data["CURRENT_WRAPPER"]["EXECUTABLE"] = self.executable self._wrapper_factory.as_conf.experiment_data["CURRENT_WRAPPER"]["CUSTOM_DIRECTIVES"] = self.custom_directives - pass - - - - - - -#pipeline @property def name(self): return self._name @@ -618,12 +649,13 @@ class JobPackageVertical(JobPackageThread): :param: dependency: """ def __init__(self, jobs, dependency=None,configuration=None,wrapper_section="WRAPPERS", wrapper_info = {}): - super(JobPackageVertical, self).__init__(jobs, dependency,configuration=configuration,wrapper_section=wrapper_section, wrapper_info = wrapper_info) + self._num_processors = 0 for job in jobs: if int(job.processors) >= int(self._num_processors): self._num_processors = job.processors - self._threads = job.threads - + self._threads = job.threads + super(JobPackageVertical, self).__init__(jobs, dependency,configuration=configuration,wrapper_section=wrapper_section, wrapper_info = wrapper_info) + for job in jobs: self._wallclock = sum_str_hours(self._wallclock, job.wallclock) self._name = self._expid + '_' + self.FILE_PREFIX + "_{0}_{1}_{2}".format(str(int(time.time())) + str(random.randint(1, 10000)), diff --git a/autosubmit/platforms/headers/slurm_header.py b/autosubmit/platforms/headers/slurm_header.py index dc53de97f97fa9493af2e0de3949c93bcea17c93..af94f096369d69f0ff659efc0517be58eaef409f 100644 --- 
a/autosubmit/platforms/headers/slurm_header.py +++ b/autosubmit/platforms/headers/slurm_header.py @@ -38,7 +38,31 @@ class SlurmHeader(object): return "" else: return "SBATCH --qos={0}".format(job.parameters['CURRENT_QUEUE']) + def get_proccesors_directive(self, job): + """ + Returns processors directive for the specified job + :param job: job to create processors directive for + :type job: Job + :return: processors directive + :rtype: str + """ + # There is no processors, so directive is empty + if job.processors == '' or job.processors == '1' and int(job.nodes) > 1: + return "" + else: + return "SBATCH -n {0}".format(job.processors) + def get_tasks_directive(self,job): + """ + Returns tasks directive for the specified job + :param job: job to create tasks directive for + :return: tasks directive + :rtype: str + """ + if job.num_tasks == '': + return "" + else: + return "SBATCH --ntasks-per-node {0}".format(job.tasks) def get_partition_directive(self, job): """ Returns partition directive for the specified job @@ -172,7 +196,7 @@ class SlurmHeader(object): #%THREADS_PER_TASK_DIRECTIVE% #%TASKS_PER_NODE_DIRECTIVE% #%NODES_DIRECTIVE% -#SBATCH -n %NUMPROC% +#%NUMPROC_DIRECTIVE% #SBATCH -t %WALLCLOCK%:00 #SBATCH -J %JOBNAME% #SBATCH --output=%CURRENT_SCRATCH_DIR%/%CURRENT_PROJ_DIR%/%CURRENT_USER%/%DEFAULT.EXPID%/LOG_%DEFAULT.EXPID%/%OUT_LOG_DIRECTIVE% @@ -195,7 +219,7 @@ class SlurmHeader(object): #%MEMORY_PER_TASK_DIRECTIVE% #%THREADS_PER_TASK_DIRECTIVE% #%NODES_DIRECTIVE% -#SBATCH -n %NUMPROC% +#%NUMPROC_DIRECTIVE% #%TASKS_PER_NODE_DIRECTIVE% #SBATCH -t %WALLCLOCK%:00 #SBATCH -J %JOBNAME% diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 854dfbd6a380aa359d59e345dd47c0aee426c8cb..cfca6e3db1825c7a82cda9b6624f20c4e23c848a 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -1187,6 +1187,9 @@ class ParamikoPlatform(Platform): if hasattr(self.header, 
'get_queue_directive'): header = header.replace( '%QUEUE_DIRECTIVE%', self.header.get_queue_directive(job)) + if hasattr(self.header, 'get_proccesors_directive'): + header = header.replace( + '%NUMPROC_DIRECTIVE%', self.header.get_proccesors_directive(job)) if hasattr(self.header, 'get_partition_directive'): header = header.replace( '%PARTITION_DIRECTIVE%', self.header.get_partition_directive(job)) diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index b0d866a25ae36dc311381199e3d19ed4aaf3d402..764fb384cb374340e516a8026c97dd26aae58afa 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -614,8 +614,10 @@ class SlurmPlatform(ParamikoPlatform): #SBATCH --output={kwargs["name"]}.out #SBATCH --error={kwargs["name"]}.err #SBATCH -t {kwargs["wallclock"]}:00 -#SBATCH -n {kwargs["num_processors"]} -#SBATCH --cpus-per-task={kwargs["threads"]} +{kwargs["threads"]} +{kwargs["nodes"]} +{kwargs["num_processors"]} +{kwargs["tasks"]} {kwargs["exclusive"]} {kwargs["custom_directives"]} diff --git a/autosubmit/platforms/wrappers/wrapper_factory.py b/autosubmit/platforms/wrappers/wrapper_factory.py index ca2c8dca5c87fbc1c73be3f555ea73e846d9d8e7..f47ebe77e8072f5800b54d4bae09d6ac5375f3c3 100644 --- a/autosubmit/platforms/wrappers/wrapper_factory.py +++ b/autosubmit/platforms/wrappers/wrapper_factory.py @@ -40,6 +40,12 @@ class WrapperFactory(object): kwargs["exclusive"] = self.exclusive(wrapper_data['EXCLUSIVE']) kwargs["custom_directives"] = self.custom_directives(wrapper_data["CUSTOM_DIRECTIVES"]) kwargs["executable"] = wrapper_data["EXECUTABLE"] + kwargs['nodes'] = self.nodes(wrapper_data['NODES']) + kwargs['tasks'] = self.tasks(wrapper_data['TASKS']) + kwargs['threads'] = self.threads(kwargs['threads']) + if int(wrapper_data['NODES']) > 1 and kwargs['num_processors'] == '1': + kwargs['num_processors'] = None + kwargs['num_processors'] = self.processors(kwargs['num_processors']) 
kwargs['header_directive'] = self.header_directives(**kwargs) builder = wrapper_builder(**kwargs) return self.wrapper_director.construct(builder) @@ -66,8 +72,16 @@ class WrapperFactory(object): return '#' if dependency is None else self.dependency_directive(dependency) def queue(self, queue): return '#' if not queue else self.queue_directive(queue) + def processors(self, processors): + return '#' if not processors or processors == "0" else self.processors_directive(processors) + def nodes(self, nodes): + return '#' if not nodes else self.nodes_directive(nodes) + def tasks(self, tasks): + return '#' if not tasks else self.tasks_directive(tasks) def partition(self, partition): return '#' if not partition else self.partition_directive(partition) + def threads(self, threads): + return '#' if not threads or threads in ["0","1"] else self.threads_directive(threads) def exclusive(self, exclusive): return '#' if not exclusive or str(exclusive).lower() == "false" else self.exclusive_directive(exclusive) def custom_directives(self, custom_directives): @@ -84,16 +98,21 @@ class WrapperFactory(object): return "" def dependency_directive(self, dependency): - pass + raise NotImplementedError(self.exception) def queue_directive(self, queue): - pass + raise NotImplementedError(self.exception) + def processors_directive(self, processors): + raise NotImplementedError(self.exception) + def nodes_directive(self, nodes): + raise NotImplementedError(self.exception) + def tasks_directive(self, tasks): + raise NotImplementedError(self.exception) def partition_directive(self, partition): - pass + raise NotImplementedError(self.exception) def exclusive_directive(self, exclusive): - pass - - - + raise NotImplementedError(self.exception) + def threads_directive(self, threads): + raise NotImplementedError(self.exception) class SlurmWrapperFactory(WrapperFactory): @@ -127,11 +146,19 @@ class SlurmWrapperFactory(WrapperFactory): def queue_directive(self, queue): return '#SBATCH --qos={0}'.format(queue) - def partition_directive(self, 
partition): return '#SBATCH --partition={0}'.format(partition) def exclusive_directive(self, exclusive): return '#SBATCH --exclusive' + def tasks_directive(self, tasks): + return '#SBATCH --ntasks-per-node={0}'.format(tasks) + def nodes_directive(self, nodes): + return '#SBATCH -N {0}'.format(nodes) + def processors_directive(self, processors): + return '#SBATCH -n {0}'.format(processors) + def threads_directive(self, threads): + return '#SBATCH --cpus-per-task={0}'.format(threads) + class LSFWrapperFactory(WrapperFactory): diff --git a/test/unit/test_job.py b/test/unit/test_job.py index 4ee2903d7979f8f0dd6fdbe3cf637c1918d18f12..caaf9c60a2da9478839ee8cc400f03422ad731f5 100644 --- a/test/unit/test_job.py +++ b/test/unit/test_job.py @@ -259,6 +259,33 @@ class TestJob(TestCase): exists_mock.assert_called_once_with(os.path.join(self.job._tmp_path, self.job.name + '_COMPLETED')) self.assertEqual(Status.FAILED, self.job.status) + def test_total_processors(self): + for test in [ + { + 'processors': '', + 'nodes': 0, + 'expected': 1 + }, + { + 'processors': '', + 'nodes': 10, + 'expected': '' + }, + { + 'processors': '42', + 'nodes': 2, + 'expected': 42 + }, + { + 'processors': '1:9', + 'nodes': 0, + 'expected': 10 + } + ]: + self.job.processors = test['processors'] + self.job.nodes = test['nodes'] + self.assertEqual(self.job.total_processors, test['expected']) + def test_job_script_checking_contains_the_right_default_variables(self): # This test (and feature) was implemented in order to avoid # false positives on the checking process with auto-ecearth3 @@ -289,21 +316,30 @@ class TestJob(TestCase): dummy_serial_platform.name = 'serial' dummy_platform = MagicMock() dummy_platform.serial_platform = dummy_serial_platform + dummy_platform.name = 'dummy_platform' + self.as_conf.substitute_dynamic_variables = MagicMock() default = {'d': '%d%', 'd_': '%d_%', 'Y': '%Y%', 'Y_': '%Y_%', 'M': '%M%', 'M_': '%M_%', 'm': '%m%', 'm_': '%m_%'} 
self.as_conf.substitute_dynamic_variables.return_value = default dummy_platform.custom_directives = '["whatever"]' - self.as_conf.dynamic_variables = MagicMock() + self.as_conf.dynamic_variables = {} self.as_conf.parameters = MagicMock() self.as_conf.return_value = {} self.as_conf.normalize_parameters_keys = MagicMock() self.as_conf.normalize_parameters_keys.return_value = default self.job._platform = dummy_platform + self.as_conf.platforms_data = { "dummy_platform":{ "whatever":"dummy_value", "whatever2":"dummy_value2"} } + parameters = {} # Act parameters = self.job.update_parameters(self.as_conf, parameters) # Assert + self.assertTrue('CURRENT_WHATEVER' in parameters) + self.assertTrue('CURRENT_WHATEVER2' in parameters) + + self.assertEqual('dummy_value', parameters['CURRENT_WHATEVER']) + self.assertEqual('dummy_value2', parameters['CURRENT_WHATEVER2']) self.assertTrue('d' in parameters) self.assertTrue('d_' in parameters) self.assertTrue('Y' in parameters) diff --git a/test/unit/test_job_package.py b/test/unit/test_job_package.py index 8d347be20eca2482457a6cd279fc2594fb558d4f..29de85af201a5f96eae2e20f477667de5733dae6 100644 --- a/test/unit/test_job_package.py +++ b/test/unit/test_job_package.py @@ -1,22 +1,161 @@ from unittest import TestCase import os +import inspect +from copy import deepcopy from mock import Mock,MagicMock, mock_open , call from mock import patch -from autosubmit.job.job_packages import JobPackageSimple +from autosubmit.job.job_packages import JobPackageSimple, JobPackageVertical from autosubmit.job.job import Job from autosubmit.job.job_common import Status +import shutil +import tempfile + +from unittest import TestCase +from mock import MagicMock +from autosubmit.job.job_packager import JobPackager +from autosubmit.job.job_list import JobList +from autosubmit.job.job_dict import DicJobs +from autosubmit.job.job_utils import Dependency +from autosubmitconfigparser.config.yamlparser import YAMLParserFactory +from 
autosubmit.job.job_list_persistence import JobListPersistenceDb +from random import randrange +from collections import OrderedDict +from autosubmitconfigparser.config.configcommon import AutosubmitConfig +class FakeBasicConfig: + def __init__(self): + pass + def props(self): + pr = {} + for name in dir(self): + value = getattr(self, name) + if not name.startswith('__') and not inspect.ismethod(value) and not inspect.isfunction(value): + pr[name] = value + return pr + DB_DIR = '/dummy/db/dir' + DB_FILE = '/dummy/db/file' + DB_PATH = '/dummy/db/path' + LOCAL_ROOT_DIR = '/dummy/local/root/dir' + LOCAL_TMP_DIR = '/dummy/local/temp/dir' + LOCAL_PROJ_DIR = '/dummy/local/proj/dir' + DEFAULT_PLATFORMS_CONF = '' + DEFAULT_JOBS_CONF = '' class TestJobPackage(TestCase): + def setUpWrappers(self,options): + # reset + self.as_conf = None + self.job_package_wrapper = None + self.experiment_id = 'random-id' + self._wrapper_factory = MagicMock() + + self.config = FakeBasicConfig + self.config.read = MagicMock() + + + + self.as_conf = AutosubmitConfig(self.experiment_id, self.config, YAMLParserFactory()) + self.as_conf.experiment_data = dict() + self.as_conf.experiment_data["JOBS"] = dict() + self.as_conf.experiment_data["PLATFORMS"] = dict() + self.as_conf.experiment_data["WRAPPERS"] = dict() + self.temp_directory = tempfile.mkdtemp() + self.job_list = JobList(self.experiment_id, self.config, YAMLParserFactory(), + JobListPersistenceDb(self.temp_directory, 'db'), self.as_conf) + self.parser_mock = MagicMock(spec='SafeConfigParser') + + self.platform.max_waiting_jobs = 100 + self.platform.total_jobs = 100 + self.as_conf.experiment_data["WRAPPERS"]["WRAPPERS"] = options + self._wrapper_factory.as_conf = self.as_conf + self.jobs[0].wallclock = "00:00" + self.jobs[0].threads = "1" + self.jobs[0].tasks = "1" + self.jobs[0].exclusive = True + self.jobs[0].queue = "debug" + self.jobs[0].partition = "debug" + self.jobs[0].custom_directives = "dummy_directives" + self.jobs[0].processors = 
"9" + self.jobs[0]._processors = "9" + self.jobs[0].retrials = 0 + self.jobs[1].wallclock = "00:00" + self.jobs[1].threads = "" + self.jobs[1].tasks = "" + self.jobs[1].exclusive = True + self.jobs[1].queue = "debug2" + self.jobs[1].partition = "debug2" + self.jobs[1].custom_directives = "dummy_directives2" + self.jobs[1].processors = "9" + self.jobs[1]._processors = "9" + + + self.wrapper_type = options.get('TYPE', 'vertical') + self.wrapper_policy = options.get('POLICY', 'flexible') + self.wrapper_method = options.get('METHOD', 'ASThread') + self.jobs_in_wrapper = options.get('JOBS_IN_WRAPPER', 'None') + self.extensible_wallclock = options.get('EXTEND_WALLCLOCK', 0) + self.job_package_wrapper = JobPackageVertical(self.jobs,configuration=self.as_conf,wrapper_info=[self.wrapper_type,self.wrapper_policy,self.wrapper_method,self.jobs_in_wrapper,self.extensible_wallclock]) + self.job_list._ordered_jobs_by_date_member["WRAPPERS"] = dict() + + + + def setUp(self): self.platform = MagicMock() + self.platform.queue = "debug" + self.platform.partition = "debug" + self.platform.serial_platform = self.platform + self.platform.serial_queue = "debug-serial" + self.platform.serial_partition = "debug-serial" self.jobs = [Job('dummy1', 0, Status.READY, 0), Job('dummy2', 0, Status.READY, 0)] self.jobs[0]._platform = self.jobs[1]._platform = self.platform self.job_package = JobPackageSimple(self.jobs) + def test_default_parameters(self): + options = { + 'TYPE': "vertical", + 'JOBS_IN_WRAPPER': "None", + 'METHOD': "ASThread", + 'POLICY': "flexible", + 'EXTEND_WALLCLOCK': 0, + } + + self.setUpWrappers(options) + self.assertEqual(self.as_conf.experiment_data["CURRENT_WRAPPER"]["TYPE"], "vertical") + self.assertEqual(self.as_conf.experiment_data["CURRENT_WRAPPER"]["JOBS_IN_WRAPPER"], "None") + self.assertEqual(self.as_conf.experiment_data["CURRENT_WRAPPER"]["METHOD"], "ASThread") + self.assertEqual(self.as_conf.experiment_data["CURRENT_WRAPPER"]["POLICY"], "flexible") + 
self.assertEqual(self.as_conf.experiment_data["CURRENT_WRAPPER"]["EXTEND_WALLCLOCK"], 0) + + self.assertEqual(self.as_conf.experiment_data["CURRENT_WRAPPER"]["EXCLUSIVE"], True) + self.assertEqual(self.as_conf.experiment_data["CURRENT_WRAPPER"]["INNER_RETRIALS"], 0) + self.assertEqual(self.as_conf.experiment_data["CURRENT_WRAPPER"]["QUEUE"], "debug") + self.assertEqual(self.as_conf.experiment_data["CURRENT_WRAPPER"]["PARTITION"], "debug") + self.assertEqual(self.as_conf.experiment_data["CURRENT_WRAPPER"]["THREADS"], "1") + self.assertEqual(self.as_conf.experiment_data["CURRENT_WRAPPER"]["TASKS"], "1") + + options_slurm = { + 'EXCLUSIVE': False, + 'QUEUE': "bsc32", + 'PARTITION': "bsc32", + 'THREADS': "30", + 'TASKS': "40", + 'INNER_RETRIALS': 30, + 'CUSTOM_DIRECTIVES': "['#SBATCH --mem=1000']" + } + self.setUpWrappers(options_slurm) + self.assertEqual(self.as_conf.experiment_data["CURRENT_WRAPPER"]["EXCLUSIVE"], False) + self.assertEqual(self.as_conf.experiment_data["CURRENT_WRAPPER"]["INNER_RETRIALS"], 30) + self.assertEqual(self.as_conf.experiment_data["CURRENT_WRAPPER"]["QUEUE"], "bsc32") + self.assertEqual(self.as_conf.experiment_data["CURRENT_WRAPPER"]["PARTITION"], "bsc32") + self.assertEqual(self.as_conf.experiment_data["CURRENT_WRAPPER"]["THREADS"], "30") + self.assertEqual(self.as_conf.experiment_data["CURRENT_WRAPPER"]["TASKS"], "40") + self.assertEqual(self.as_conf.experiment_data["CURRENT_WRAPPER"]["CUSTOM_DIRECTIVES"], "['#SBATCH --mem=1000']") + + def test_job_package_default_init(self): with self.assertRaises(Exception): @@ -65,3 +204,6 @@ class TestJobPackage(TestCase): self.job_package._create_scripts.is_called_once_with() self.job_package._send_files.is_called_once_with() self.job_package._do_submission.is_called_once_with() + + def test_wrapper_parameters(self): + pass \ No newline at end of file