def read_header_tailer_script(self, script_path: str, as_conf: AutosubmitConfig, is_header: bool):
    """
    Read an extended header or tailer script and return its text, ready to be placed in the job template.

    Every line starting with the hash bang (#!) is stripped from the returned text. The interpreter it
    names (Bash, Rscript or Python) is checked against the type of this job.

    :param script_path: path to the script, relative to the project directory. An empty string, ``None``
        or the string ``"None"`` (key set, but with no value) mean that there is no script
    :param as_conf: Autosubmit configuration, used to find the project directory
    :param is_header: True if it is the extended header script, False if it is the extended tailer
    :return: the script without its hash bang, preceded by a banner comment; ``''`` when no script is set
    :raises AutosubmitCritical: code 7014 if the script can't be opened; code 7011 if it has no hash bang,
        if the interpreter in the hash bang isn't recognised, or if it doesn't match the type of the job
    """
    # the value might be the "None" string if the key has been set, but with no value
    if not script_path or script_path == "None":
        return ''

    # adjusts the error messages to the type of the script
    error_message_type = "header" if is_header else "tailer"
    # pick the name of the script for a more verbose error
    script_name = script_path.rsplit("/")[-1]

    try:
        # find the absolute path
        script_file = open(os.path.join(as_conf.get_project_dir(), script_path), 'r')
    except Exception as e:
        # We stop Autosubmit if we don't find the script
        raise AutosubmitCritical(
            f"Extended {error_message_type} script: failed to fetch {str(e)} \n", 7014) from e

    unknown_type_message = f"Extended {error_message_type} script: couldn't figure out script {script_name} type\n"
    # (text searched in the hash bang, name shown in the error, job types it may extend), in lookup order
    interpreters = (
        ("bash", "Bash", (Type.BASH,)),
        ("Rscript", "Rscript", (Type.R,)),
        ("python", "Python", (Type.PYTHON, Type.PYTHON2, Type.PYTHON3)),
    )

    found_hashbang = False
    script_lines = []
    # the context manager closes the file even when one of the checks below raises
    with script_file:
        for line in script_file:
            if not line.startswith("#!"):
                script_lines.append(line)
                continue

            found_hashbang = True
            # check if the type of the job matches the one in the extended script
            for marker, language, job_types in interpreters:
                if marker in line:
                    if self.type not in job_types:
                        raise AutosubmitCritical(
                            f"Extended {error_message_type} script: script {script_name} seems {language} "
                            f"but job {self.script_name} isn't\n", 7011)
                    break
            else:
                raise AutosubmitCritical(unknown_type_message, 7011)

    if not found_hashbang:
        raise AutosubmitCritical(unknown_type_message, 7011)

    banner = "Header" if is_header else "Tailer"
    return f"\n###############\n# {banner} script\n###############\n" + "".join(script_lines)
+ if as_conf.get_project_type() != "none": + parameters['EXTENDED_HEADER'] = self.read_header_tailer_script(self.ext_header_path, as_conf, True) + parameters['EXTENDED_TAILER'] = self.read_header_tailer_script(self.ext_tailer_path, as_conf, False) parameters['CURRENT_QUEUE'] = self.queue parameters['RESERVATION'] = self.reservation parameters['CURRENT_EC_QUEUE'] = self.ec_queue diff --git a/autosubmit/job/job_common.py b/autosubmit/job/job_common.py index d705b1d1dd4205aac431f5e1e657cb86b3107b95..69d54135278b0f4283d108382a38f760e3a1f6b7 100644 --- a/autosubmit/job/job_common.py +++ b/autosubmit/job/job_common.py @@ -138,6 +138,7 @@ class StatisticsSnippetBash: AS_CHECKPOINT_CALLS=$((AS_CHECKPOINT_CALLS+1)) touch ${job_name_ptrn}_CHECKPOINT_${AS_CHECKPOINT_CALLS} } + %EXTENDED_HEADER% ################### # Autosubmit job ################### @@ -147,7 +148,7 @@ class StatisticsSnippetBash: @staticmethod def as_tailer(): return textwrap.dedent("""\ - + %EXTENDED_TAILER% ################### # Autosubmit tailer ################### @@ -209,7 +210,8 @@ class StatisticsSnippetPython: global AS_CHECKPOINT_CALLS global job_name_ptrn AS_CHECKPOINT_CALLS = AS_CHECKPOINT_CALLS + 1 - open(job_name_ptrn + '_CHECKPOINT_' + str(AS_CHECKPOINT_CALLS), 'w').close() + open(job_name_ptrn + '_CHECKPOINT_' + str(AS_CHECKPOINT_CALLS), 'w').close() + %EXTENDED_HEADER% ################### # Autosubmit job ################### @@ -220,7 +222,7 @@ class StatisticsSnippetPython: # expand tailer to use python3 def as_tailer(self): return textwrap.dedent("""\ - + %EXTENDED_TAILER% ################### # Autosubmit tailer ################### @@ -283,6 +285,7 @@ class StatisticsSnippetR: fileConn<-file(paste(job_name_ptrn,"_CHECKPOINT_",AS_CHECKPOINT_CALLS, sep = ''),"w") close(fileConn) } + %EXTENDED_HEADER% ################### # Autosubmit job ################### @@ -292,7 +295,7 @@ class StatisticsSnippetR: @staticmethod def as_tailer(): return textwrap.dedent("""\ - + %EXTENDED_TAILER% 
################### # Autosubmit tailer ################### diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index 440ddf63045286f9b1ee1ed9683baa40637e625e..9645f493f5f38e39fa7ec068536493020e1b44ee 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -430,6 +430,9 @@ class DicJobs: job.running = str(parameters[section].get( 'RUNNING', 'once')) job.x11 = str(parameters[section].get( 'X11', False )).lower() job.skippable = str(parameters[section].get( "SKIPPABLE", False)).lower() + # store from within the relative path to the project + job.ext_header_path = str(parameters[section].get('EXTENDED_HEADER_PATH', '')) + job.ext_tailer_path = str(parameters[section].get('EXTENDED_TAILER_PATH', '')) self._jobs_list.get_job_list().append(job) return job diff --git a/docs/source/userguide/configure/index.rst b/docs/source/userguide/configure/index.rst index be8be1b17b9ed61d4ca0f9b81cb476a6c9fd18f1..5b09b6905c75344b9798e64cda376ae20e3652ff 100644 --- a/docs/source/userguide/configure/index.rst +++ b/docs/source/userguide/configure/index.rst @@ -147,6 +147,10 @@ There are also other, less used features that you can use: * QUEUE: queue to add the job to. If not specified, uses PLATFORM default. 
@patch('autosubmitconfigparser.config.basicconfig.BasicConfig')
def test_header_tailer(self, mocked_global_basic_config: Mock):
    """Test if header and tailer are being properly substituted onto the final .cmd file.

    Every combination of job type (Bash, Python, R), position of the extension (header, tailer,
    both, neither) and flavour of the extension (Bash, Python, Rscript, no hash bang, unknown
    interpreter, missing file) is generated. An extension matching the job type must show up in
    the generated script; any other extension must make ``check_script`` raise
    ``AutosubmitCritical``; without extensions the placeholders must be gone.

    Copied from Aina's and Bruno's test for the reservation key. Hence, the following still
    applies: "Actually one mock, but that's for something in the AutosubmitConfigParser that can
    be modified to remove the need of that mock."
    """
    from itertools import product

    expid = 'zzyy'

    # flavour of the extension -> (hash bang line or None, line that executes, file suffix).
    # "{position}" becomes "header" or "tailer", so the two contents can be told apart.
    flavours = {
        "Bash": ("#!/usr/bin/bash", 'echo "{position} bash"', "sh"),
        "Python": ("#!/usr/bin/python", 'print("{position} python")', "py"),
        "Rscript": ("#!/usr/bin/env Rscript", 'print("{position} R")', "R"),
        "Bad1": (None, 'this is a script without #!', "bad1"),
        "Bad2": ("#!/does/not/exist", 'this is a {position} with a bath executable', "bad2"),
    }

    def write_extended_script(project_files, position, flavour):
        """Create the extended script for *position*; return (file name, line that executes)."""
        if flavour == "FileNotFound":
            # make sure that the file does not exist by clearing the project folder
            for leftover in project_files.iterdir():
                leftover.unlink()
            return f"non_existent_{position}", ""

        hashbang, line_template, suffix = flavours[flavour]
        content = line_template.format(position=position)
        file_name = f"{position}.{suffix}"
        lines = [content] if hashbang is None else [hashbang, content]
        (project_files / file_name).write_text("".join(f"{line}\n" for line in lines))
        return file_name, content

    with tempfile.TemporaryDirectory() as temp_dir:
        Path(temp_dir, expid).mkdir()
        # FIXME: (Copied from Bruno) Not sure why but the submitted and Slurm were using the $expid/tmp/ASLOGS folder?
        for path in [f'{expid}/tmp', f'{expid}/tmp/ASLOGS', f'{expid}/tmp/ASLOGS_{expid}', f'{expid}/proj',
                     f'{expid}/conf', f'{expid}/proj/project_files']:
            Path(temp_dir, path).mkdir()

        project_files = Path(temp_dir, f'{expid}/proj/project_files')
        cmd_file = Path(temp_dir, f'{expid}/tmp/zzyy_A.cmd')

        # host script's type x position of the extension x extended type
        combinations = product(["Bash", "Python", "Rscript"],
                               ["header", "tailer", "header tailer", "neither"],
                               ["Bash", "Python", "Rscript", "Bad1", "Bad2", "FileNotFound"])

        for script_type, extended_position, extended_type in combinations:
            BasicConfig.LOCAL_ROOT_DIR = str(temp_dir)

            # set up: create the extended header and tailer scripts that this combination needs
            positions = [] if extended_position == "neither" else extended_position.split()
            file_names = {"header": "", "tailer": ""}
            contents = {"header": "", "tailer": ""}
            for position in positions:
                file_names[position], contents[position] = write_extended_script(
                    project_files, position, extended_type)

            # configuration file
            Path(temp_dir, f'{expid}/conf/configuration.yml').write_text(dedent(f'''\
                DEFAULT:
                    EXPID: {expid}
                    HPCARCH: local
                JOBS:
                    A:
                        FILE: a
                        TYPE: {script_type if script_type != "Rscript" else "R"}
                        PLATFORM: local
                        RUNNING: once
                        EXTENDED_HEADER_PATH: {file_names["header"]}
                        EXTENDED_TAILER_PATH: {file_names["tailer"]}
                PLATFORMS:
                    test:
                        TYPE: slurm
                        HOST: localhost
                        PROJECT: abc
                        QUEUE: debug
                        USER: me
                        SCRATCH_DIR: /anything/
                        ADD_PROJECT_TO_HOST: False
                        MAX_WALLCLOCK: '00:55'
                        TEMP_DIR: ''
                CONFIG:
                    RETRIALS: 0
                '''))

            mocked_basic_config = Mock(spec=BasicConfig)
            mocked_basic_config.LOCAL_ROOT_DIR = str(temp_dir)
            mocked_global_basic_config.LOCAL_ROOT_DIR.return_value = str(temp_dir)

            config = AutosubmitConfig(expid, basic_config=mocked_basic_config,
                                      parser_factory=YAMLParserFactory())
            config.reload(True)

            # act
            parameters = config.load_parameters()

            job_list_obj = JobList(expid, mocked_basic_config, YAMLParserFactory(),
                                   Autosubmit._get_job_list_persistence(expid, config), config)
            job_list_obj.generate(
                date_list=[],
                member_list=[],
                num_chunks=1,
                chunk_ini=1,
                parameters=parameters,
                date_format='M',
                default_retrials=config.get_retrials(),
                default_job_type=config.get_default_job_type(),
                wrapper_type=config.get_wrapper_type(),
                wrapper_jobs={},
                notransitive=True,
                update_structure=True,
                run_only_members=config.get_member_list(run_only=True),
                jobs_data=config.experiment_data,
                as_conf=config
            )
            job_list = job_list_obj.get_job_list()

            submitter = Autosubmit._get_submitter(config)
            submitter.load_platforms(config)

            hpcarch = config.get_platform()
            for job in job_list:
                if job.platform_name == "" or job.platform_name is None:
                    job.platform_name = hpcarch
                job.platform = submitter.platforms[job.platform_name]

            # pick the single job of the experiment
            job = job_list[0]

            # assert
            if not positions:
                # load the parameters and create the script
                job.check_script(config, parameters)
                job.create_script(config)
                # if we don't have scripts, check that the placeholders have been removed
                final_script = cmd_file.read_text()
                self.assertNotIn("%EXTENDED_HEADER%", final_script)
                self.assertNotIn("%EXTENDED_TAILER%", final_script)
                continue

            if extended_type == script_type:
                # load the parameters and create the script
                job.check_script(config, parameters)
                job.create_script(config)
                full_script = cmd_file.read_text()
                for position in positions:
                    self.assertIn(contents[position], full_script)
                continue

            # any other extension is rejected; if we have both header and tailer, it fails at the header first
            failing = positions[0]
            failing_file = file_names[failing]
            if extended_type == "FileNotFound":
                expected_code = 7014
                expected_message = (f"Extended {failing} script: failed to fetch [Errno 2] No such file or "
                                    f"directory: '{temp_dir}/{expid}/proj/project_files/{failing_file}' \n")
            elif extended_type in ("Bad1", "Bad2"):
                # a script without hash bang, or with a bad executable
                expected_code = 7011
                expected_message = f"Extended {failing} script: couldn't figure out script {failing_file} type\n"
            else:
                # the extended type is a valid one, but not the one of the job
                expected_code = 7011
                expected_message = (f"Extended {failing} script: script {failing_file} seems "
                                    f"{extended_type} but job zzyy_A.cmd isn't\n")

            with self.assertRaises(AutosubmitCritical) as context:
                job.check_script(config, parameters)
            self.assertEqual(context.exception.code, expected_code)
            self.assertEqual(context.exception.message, expected_message)
mocked_global_basic_config: Mock): """