diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 6dc85a87e5731fbce4ebe5e4e430a05429503182..e5b921de281206d4d80cc4a06cfff80ae1e7f9d2 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -224,6 +224,7 @@ class Job(object): # internal self.current_checkpoint_step = 0 self.max_checkpoint_step = 0 + self.reservation= "" @property @autosubmit_parameter(name='tasktype') @@ -1286,6 +1287,7 @@ class Job(object): self.exclusive = str(as_conf.jobs_data[self.section].get("EXCLUSIVE",as_conf.platforms_data.get(job_platform.name,{}).get("EXCLUSIVE",False))) self.threads = str(as_conf.jobs_data[self.section].get("THREADS",as_conf.platforms_data.get(job_platform.name,{}).get("THREADS","1"))) self.tasks = str(as_conf.jobs_data[self.section].get("TASKS",as_conf.platforms_data.get(job_platform.name,{}).get("TASKS","1"))) + self.reservation = str(as_conf.jobs_data[self.section].get("RESERVATION",as_conf.platforms_data.get(job_platform.name, {}).get("RESERVATION", ""))) self.hyperthreading = str(as_conf.jobs_data[self.section].get("HYPERTHREADING",as_conf.platforms_data.get(job_platform.name,{}).get("HYPERTHREADING","none"))) if int(self.tasks) <= 1 and int(job_platform.processors_per_node) > 1 and int(self.processors) > int(job_platform.processors_per_node): self.tasks = job_platform.processors_per_node @@ -1340,6 +1342,7 @@ class Job(object): parameters['CUSTOM_DIRECTIVES'] = self.custom_directives parameters['HYPERTHREADING'] = self.hyperthreading parameters['CURRENT_QUEUE'] = self.queue + parameters['RESERVATION'] = self.reservation return parameters def update_wrapper_parameters(self,as_conf, parameters): diff --git a/autosubmit/platforms/headers/slurm_header.py b/autosubmit/platforms/headers/slurm_header.py index 129566f880a5f7cf6abaa93736c0fec1d8f9fc5f..accb2362c349e2156d090ac769fd09df5ca7dab5 100644 --- a/autosubmit/platforms/headers/slurm_header.py +++ b/autosubmit/platforms/headers/slurm_header.py @@ -159,6 +159,13 @@ class SlurmHeader(object): return "SBATCH --cpus-per-task={0}".format(job.parameters['NUMTHREADS']) # noinspection PyMethodMayBeStatic,PyUnusedLocal + + def get_reservation_directive(self, job): + if job.parameters['RESERVATION'] == '': + return "" + else: + return "SBATCH --reservation={0}".format(job.parameters['RESERVATION']) + def get_custom_directives(self, job): """ Returns custom directives for the specified job @@ -201,6 +208,7 @@ class SlurmHeader(object): #%TASKS_PER_NODE_DIRECTIVE% #%NODES_DIRECTIVE% #%NUMPROC_DIRECTIVE% +#%RESERVATION_DIRECTIVE% #SBATCH -t %WALLCLOCK%:00 #SBATCH -J %JOBNAME% #SBATCH --output=%CURRENT_SCRATCH_DIR%/%CURRENT_PROJ_DIR%/%CURRENT_USER%/%DEFAULT.EXPID%/LOG_%DEFAULT.EXPID%/%OUT_LOG_DIRECTIVE% @@ -224,6 +232,7 @@ class SlurmHeader(object): #%THREADS_PER_TASK_DIRECTIVE% #%NODES_DIRECTIVE% #%NUMPROC_DIRECTIVE% +#%RESERVATION_DIRECTIVE% #%TASKS_PER_NODE_DIRECTIVE% #SBATCH -t %WALLCLOCK%:00 #SBATCH -J %JOBNAME% diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index cfca6e3db1825c7a82cda9b6624f20c4e23c848a..9788a482a095989d8fc5ebab40dd463dbcb753c1 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -1220,6 +1220,9 @@ class ParamikoPlatform(Platform): if hasattr(self.header, 'get_nodes_directive'): header = header.replace( '%NODES_DIRECTIVE%', self.header.get_nodes_directive(job)) + if hasattr(self.header, 'get_reservation_directive'): + header = header.replace( + '%RESERVATION_DIRECTIVE%', self.header.get_reservation_directive(job)) if hasattr(self.header, 'get_memory_directive'): header = header.replace( '%MEMORY_DIRECTIVE%', self.header.get_memory_directive(job)) diff --git a/test/unit/test_job.py b/test/unit/test_job.py index f1bfbcbac688a596d7a835e5444d2b6566a3e618..91d7fbe24c5b98327f7ac6cae4cff2058c26dc05 100644 --- a/test/unit/test_job.py +++ b/test/unit/test_job.py @@ -1,18 +1,24 @@ -from unittest import TestCase - import datetime import inspect import os import sys -from mock import Mock, MagicMock -from mock import patch +import tempfile +from pathlib import Path # compatibility with both versions (2 & 3) from sys import version_info +from textwrap import dedent +from unittest import TestCase +from autosubmitconfigparser.config.configcommon import AutosubmitConfig +from autosubmitconfigparser.config.configcommon import BasicConfig, YAMLParserFactory +from mock import Mock, MagicMock +from mock import patch + +from autosubmit.autosubmit import Autosubmit from autosubmit.job.job import Job from autosubmit.job.job_common import Status +from autosubmit.job.job_list import JobList from autosubmit.platforms.platform import Platform -from autosubmitconfigparser.config.configcommon import AutosubmitConfig if version_info.major == 2: import builtins as builtins @@ -218,8 +224,9 @@ class TestJob(TestCase): self.job.parameters['NUMPROC'] = 999 self.job.parameters['NUMTHREADS'] = 777 self.job.parameters['NUMTASK'] = 666 + self.job.parameters['RESERVATION'] = "random-string" + update_content_mock = Mock(return_value=('some-content: %NUMPROC%, %NUMTHREADS%, %NUMTASK%', 'some-content: %NUMPROC%, %NUMTHREADS%, %NUMTASK%')) - update_content_mock = Mock(return_value=('some-content: %NUMPROC%, %NUMTHREADS%, %NUMTASK%','some-content: %NUMPROC%, %NUMTHREADS%, %NUMTASK%')) #todo self.job.update_content = update_content_mock @@ -237,6 +244,111 @@ class TestJob(TestCase): update_content_mock.assert_called_with(config) self.assertTrue(checked) + @patch('autosubmitconfigparser.config.basicconfig.BasicConfig') + def test_job_parameters(self, mocked_global_basic_config: Mock): + """Test job platforms with a platform. Builds job and platform using YAML data, without mocks. + + Actually one mock, but that's for something in the AutosubmitConfigParser that can + be modified to remove the need of that mock. + """ + + expid = 'zzyy' + + for reservation in [None, '', ' ', 'some-string', 'a', '123', 'True']: + reservation_string = '' if not reservation else f'RESERVATION: "{reservation}"' + with tempfile.TemporaryDirectory() as temp_dir: + BasicConfig.LOCAL_ROOT_DIR = str(temp_dir) + Path(temp_dir, expid).mkdir() + # FIXME: Not sure why but the submitted and Slurm were using the $expid/tmp/ASLOGS folder? + for path in [f'{expid}/tmp', f'{expid}/tmp/ASLOGS', f'{expid}/tmp/ASLOGS_{expid}', f'{expid}/proj', + f'{expid}/conf']: + Path(temp_dir, path).mkdir() + with open(Path(temp_dir, f'{expid}/conf/minimal.yml'), 'w+') as minimal: + minimal.write(dedent(f'''\ + DEFAULT: + EXPID: {expid} + HPCARCH: test + JOBS: + A: + FILE: a + PLATFORM: test + RUNNING: once + {reservation_string} + PLATFORMS: + test: + TYPE: slurm + HOST: localhost + PROJECT: abc + QUEUE: debug + USER: me + SCRATCH_DIR: /anything/ + ADD_PROJECT_TO_HOST: False + MAX_WALLCLOCK: '000:55' + TEMP_DIR: '' + ''')) + minimal.flush() + + mocked_basic_config = Mock(spec=BasicConfig) + mocked_basic_config.LOCAL_ROOT_DIR = str(temp_dir) + mocked_global_basic_config.LOCAL_ROOT_DIR.return_value = str(temp_dir) + + config = AutosubmitConfig(expid, basic_config=mocked_basic_config, parser_factory=YAMLParserFactory()) + config.reload(True) + parameters = config.load_parameters() + + job_list_obj = JobList(expid, mocked_basic_config, YAMLParserFactory(), + Autosubmit._get_job_list_persistence(expid, config), config) + job_list_obj.generate( + date_list=[], + member_list=[], + num_chunks=1, + chunk_ini=1, + parameters=parameters, + date_format='M', + default_retrials=config.get_retrials(), + default_job_type=config.get_default_job_type(), + wrapper_type=config.get_wrapper_type(), + wrapper_jobs={}, + notransitive=True, + update_structure=True, + run_only_members=config.get_member_list(run_only=True), + jobs_data=config.experiment_data, + as_conf=config + ) + job_list = job_list_obj.get_job_list() + self.assertEqual(1, len(job_list)) + + submitter = Autosubmit._get_submitter(config) + submitter.load_platforms(config) + + hpcarch = config.get_platform() + for job in job_list: + if job.platform_name == "" or job.platform_name is None: + job.platform_name = hpcarch + job.platform = submitter.platforms[job.platform_name] + + job = job_list[0] + + # Asserts the script is valid. + checked = job.check_script(config, parameters) + self.assertTrue(checked) + + # Asserts the configuration value is propagated as-is to the job parameters. + # Finally, asserts the header created is correct. + if not reservation: + self.assertTrue('JOBS.A.RESERVATION' not in job.parameters) + + template_content, additional_templates = job.update_content(config) + self.assertFalse(additional_templates) + + self.assertFalse(f'#SBATCH --reservation' in template_content) + else: + self.assertEqual(reservation, job.parameters['JOBS.A.RESERVATION']) + + template_content, additional_templates = job.update_content(config) + self.assertFalse(additional_templates) + self.assertTrue(f'#SBATCH --reservation={reservation}' in template_content) + def test_exists_completed_file_then_sets_status_to_completed(self): # arrange exists_mock = Mock(return_value=True) @@ -310,7 +422,7 @@ class TestJob(TestCase): 'MEMORY': memory, 'WALLCLOCK': wallclock, 'CUSTOM_DIRECTIVES': custom_directives, - 'SCRATCH_FREE_SPACE': 0 + 'SCRATCH_FREE_SPACE': 0, } self.as_conf.jobs_data[section] = options