diff --git a/VERSION b/VERSION
index 152e4522ca7dd6b968334e3822eaf386d418fd66..b05079e54f7adb3c717f79e14feab59587f76e57 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-4.1.11
+4.1.12
diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py
index c520c6ea7da3a9097fd620f99c95dea21b637afd..fb6d8b5f92743a1b486c691193ebd469af5f6850 100644
--- a/autosubmit/autosubmit.py
+++ b/autosubmit/autosubmit.py
@@ -2384,6 +2384,7 @@ class Autosubmit:
             if p.log_recovery_process:
                 p.cleanup_event.set()  # Send cleanup event
                 p.log_recovery_process.join()
+                Log.info(f"Log recovery process for {p.name} has finished")
         for job in job_list.get_completed_failed_without_logs():  # update the info gathered from the childs
             job_list.update_log_status(job, as_conf)
         job_list.save()
@@ -2401,8 +2402,8 @@ class Autosubmit:
                     Autosubmit.database_fix(expid)
                 except Exception as e:
                     pass
-            for platform in platforms_to_test:
-                platform.closeConnection()
+            for p in platforms_to_test:
+                p.closeConnection()
             if len(job_list.get_failed()) > 0:
                 Log.info("Some jobs have failed and reached maximum retrials")
             else:
@@ -2573,7 +2574,6 @@ class Autosubmit:
             raise
         except BaseException as e:
             raise
-            raise AutosubmitCritical("This seems like a bug in the code, please contact AS developers", 7070, str(e))
 
     @staticmethod
     def monitor(expid, file_format, lst, filter_chunks, filter_status, filter_section, hide, txt_only=False,
@@ -2981,9 +2981,8 @@ class Autosubmit:
                     "Experiment can't be recovered due being {0} active jobs in your experiment, If you want to recover the experiment, please use the flag -f and all active jobs will be cancelled".format(
                         len(current_active_jobs)), 7000)
             Log.debug("Job list restored from {0} files", pkl_dir)
-        except BaseException as e:
-            raise AutosubmitCritical(
-                "Couldn't restore the job_list or packages, check if the filesystem is having issues", 7040, str(e))
+        except Exception:
+            raise
         Log.info('Recovering experiment {0}'.format(expid))
         try:
             for job in job_list.get_job_list():
@@ -3017,13 +3016,8 @@ class Autosubmit:
                     job.status = Status.COMPLETED
                     Log.info(
                         "CHANGED job '{0}' status to COMPLETED".format(job.name))
-                    # Log.status("CHANGED job '{0}' status to COMPLETED".format(job.name))
-
-                    if not no_recover_logs:
-                        try:
-                            job.platform.get_logs_files(expid, job.remote_logs)
-                        except Exception as e:
-                            pass
+                    job.recover_last_ready_date()
+                    job.recover_last_log_name()
                 elif job.status != Status.SUSPENDED:
                     job.status = Status.WAITING
                     job._fail_count = 0
diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py
index 9b58970777c433092d2bc14d64b18917afaede54..4674635edf83f8551eee64f0dba655d8754d3b69 100644
--- a/autosubmit/job/job.py
+++ b/autosubmit/job/job.py
@@ -222,6 +222,7 @@ class Job(object):
         self.parameters = None
         self._tmp_path = os.path.join(
             BasicConfig.LOCAL_ROOT_DIR, self.expid, BasicConfig.LOCAL_TMP_DIR)
+        self._log_path = Path(f"{self._tmp_path}/LOG_{self.expid}")
         self.write_start = False
         self._platform = None
         self.check = 'true'
@@ -2501,6 +2502,52 @@ class Job(object):
         self.local_logs = local_logs
         self.remote_logs = copy.deepcopy(local_logs)
 
+    def _recover_last_log_name_from_filesystem(self) -> bool:
+        """
+        Recovers the log name for the job from the filesystem.
+        :return: True if an up-to-date log name was recovered, False otherwise
+        :rtype: bool
+        """
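+        # Pick the most recently modified log file whose name starts with the job name.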
+        log_name = sorted(list(self._log_path.glob(f"{self.name}*")), key=lambda x: x.stat().st_mtime)
+        log_name = log_name[-1] if log_name else None
+        if log_name:
+            file_timestamp = int(datetime.datetime.fromtimestamp(log_name.stat().st_mtime).strftime("%Y%m%d%H%M%S"))
+            if self.ready_date and file_timestamp >= int(self.ready_date):
+                self.local_logs = (log_name.with_suffix(".out").name, log_name.with_suffix(".err").name)
+                self.remote_logs = copy.deepcopy(self.local_logs)
+                return True
+        self.local_logs = (f"{self.name}.out.{self._fail_count}", f"{self.name}.err.{self._fail_count}")
+        self.remote_logs = copy.deepcopy(self.local_logs)
+        return False
+
+    def recover_last_log_name(self):
+        """
+        Recovers the last log name for the job
+        """
+        if not self.updated_log:
+            self.updated_log = self._recover_last_log_name_from_filesystem()
+        # TODO: After PostgreSQL migration, implement _recover_last_log_from_db() to retrieve the last log from the database.
+
+    def recover_last_ready_date(self) -> None:
+        """
+        Recovers the last ready date for this job
+        """
+        if not self.ready_date:
+            stat_file = Path(f"{self._tmp_path}/{self.name}_TOTAL_STATS")
+            if stat_file.exists():
+                output_by_lines = stat_file.read_text().splitlines()
+                if output_by_lines:
+                    line_info = output_by_lines[-1].split(" ")
+                    if line_info and line_info[0].isdigit():
+                        self.ready_date = line_info[0]
+                    else:
+                        self.ready_date = datetime.datetime.fromtimestamp(stat_file.stat().st_mtime).strftime('%Y%m%d%H%M%S')
+                        Log.debug(f"Failed to recover ready date for job {self.name}")
+                else:  # Default to the last modification time
+                    self.ready_date = datetime.datetime.fromtimestamp(stat_file.stat().st_mtime).strftime('%Y%m%d%H%M%S')
+                    Log.debug(f"Failed to recover ready date for job {self.name}")
 
 
 class WrapperJob(Job):
     """
diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py
index 0d1d6e59719b528ddaa60a400526ad7ead043df2..49dd3a5cce767d5235de205abb0a47bd1abb9ef5 100644
--- a/autosubmit/job/job_packages.py
+++ b/autosubmit/job/job_packages.py
@@ -219,7 +219,14 @@ class JobPackageBase(object):
     def _do_submission(self,job_scripts=None, hold=False):
         """ Submit package to the platform. """
-
+    def process_jobs_to_submit(self, job_id: str, hold: bool = False) -> None:
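+        """Propagate the platform job id, hold flag, and SUBMITTED status to every job in this package."""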
+        for job in self.jobs:
+            job.hold = hold
+            job.id = str(job_id)
+            job.status = Status.SUBMITTED
+            if hasattr(self, "name"):  # TODO: replace this check with a property that tells whether the package is a wrapper; the same change is needed in other parts of the code
+                job.wrapper_name = self.name
 
 class JobPackageSimple(JobPackageBase):
     """
@@ -230,6 +236,7 @@ class JobPackageSimple(JobPackageBase):
         super(JobPackageSimple, self).__init__(jobs)
         self._job_scripts = {}
         self.export = jobs[0].export
+        # self.name = "simple_package"  TODO: this should be possible, but it crashes across the code. Add a property that defines what a package with wrappers is.
 
     def _create_scripts(self, configuration):
         for job in self.jobs:
diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py
index 205a869ebec10d5dbffe3dfbf0434afaa6ced37e..eba3f9f953585bcc61a2f6d08f808174be1a7a9c 100644
--- a/autosubmit/platforms/paramiko_platform.py
+++ b/autosubmit/platforms/paramiko_platform.py
@@ -95,6 +95,7 @@ class ParamikoPlatform(Platform):
         return self._wrapper
 
     def reset(self):
+        self.closeConnection()
         self.connected = False
         self._ssh = None
         self._ssh_config = None
diff --git a/autosubmit/platforms/pjmplatform.py b/autosubmit/platforms/pjmplatform.py
index 7291377b72a6337135698c096d4c15a6c621d4a4..b769cf7926fb1293071b90f3a1448769582cad53 100644
--- a/autosubmit/platforms/pjmplatform.py
+++ b/autosubmit/platforms/pjmplatform.py
@@ -147,12 +147,7 @@ class PJMPlatform(ParamikoPlatform):
                     sleep(10)
 
                 for package in valid_packages_to_submit:
-                    for job in package.jobs:
-                        job.hold = hold
-                        job.id = str(jobs_id[i])
-                        job.status = Status.SUBMITTED
-                        job.wrapper_name = package.name
-
+                    package.process_jobs_to_submit(jobs_id[i], hold)
                     i += 1
                 save = True
             except AutosubmitError as e:
diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py
index 326189c7ccecd8cac58c5c7d4f888d905fd2aa29..bbc609e7a10b1acd80cc94fc3169aa9507330388 100644
--- a/autosubmit/platforms/platform.py
+++ b/autosubmit/platforms/platform.py
@@ -1,6 +1,7 @@
 import atexit
 import multiprocessing
 import queue  # only for the exception
+from os import _exit
 import setproctitle
 import locale
 import os
@@ -882,6 +883,7 @@ class Platform(object):
         Log.result(f"Process {self.log_recovery_process.name} started with pid {self.log_recovery_process.pid}")
         # Cleanup will be automatically prompt on control + c or a normal exit
         atexit.register(self.send_cleanup_signal)
+        atexit.register(self.closeConnection)
 
     def send_cleanup_signal(self) -> None:
         """
@@ -992,4 +994,7 @@ class Platform(object):
             if self.cleanup_event.is_set():  # Check if the main process is waiting for this child to end.
                 self.recover_job_log(identifier, jobs_pending_to_process)
                 break
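+        # Close this child's own SSH connection; the parent process closes its connections separately via atexit.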
+        self.closeConnection()
         Log.info(f"{identifier} Exiting.")
+        _exit(0)  # Exit the child after manually closing the SSH sockets; os._exit() is the recommended exit for child processes, while the queue and shared signals remain the main process's responsibility.
diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py
index b2a8e2ffaf8a84036ccef2e4297e73de104b0eb9..3a01ff18696a5713a9ab6b70bffc6cd950a830d2 100644
--- a/autosubmit/platforms/slurmplatform.py
+++ b/autosubmit/platforms/slurmplatform.py
@@ -98,7 +98,6 @@ class SlurmPlatform(ParamikoPlatform):
         :return:
         """
         try:
-
             valid_packages_to_submit = [ package for package in valid_packages_to_submit if package.x11 != True]
             if len(valid_packages_to_submit) > 0:
                 duplicated_jobs_already_checked = False
@@ -152,21 +151,21 @@ class SlurmPlatform(ParamikoPlatform):
             if jobs_id is None or len(jobs_id) <= 0:
                 raise AutosubmitError(
                     "Submission failed, this can be due a failure on the platform", 6015,"Jobs_id {0}".format(jobs_id))
-            i = 0
             if hold:
                 sleep(10)
-
+            jobid_index = 0
             for package in valid_packages_to_submit:
+                current_package_id = str(jobs_id[jobid_index])
                 if hold:
                     retries = 5
-                    package.jobs[0].id = str(jobs_id[i])
+                    package.jobs[0].id = current_package_id
                     try:
                         can_continue = True
                         while can_continue and retries > 0:
-                            cmd = package.jobs[0].platform.get_queue_status_cmd(jobs_id[i])
+                            cmd = package.jobs[0].platform.get_queue_status_cmd(current_package_id)
                             package.jobs[0].platform.send_command(cmd)
                             queue_status = package.jobs[0].platform._ssh_output
-                            reason = package.jobs[0].platform.parse_queue_reason(queue_status, jobs_id[i])
+                            reason = package.jobs[0].platform.parse_queue_reason(queue_status, current_package_id)
                             if reason == '(JobHeldAdmin)':
                                 can_continue = False
                             elif reason == '(JobHeldUser)':
@@ -176,21 +175,16 @@ class SlurmPlatform(ParamikoPlatform):
                                 sleep(5)
                             retries = retries - 1
                         if not can_continue:
-                            package.jobs[0].platform.send_command(package.jobs[0].platform.cancel_cmd + " {0}".format(jobs_id[i]))
-                            i = i + 1
+                            package.jobs[0].platform.send_command(package.jobs[0].platform.cancel_cmd + " {0}".format(current_package_id))
+                            jobid_index += 1
                             continue
                         if not self.hold_job(package.jobs[0]):
-                            i = i + 1
+                            jobid_index += 1
                             continue
                     except Exception as e:
-                        failed_packages.append(jobs_id)
+                        failed_packages.append(current_package_id)
                         continue
-                for job in package.jobs:
-                    job.hold = hold
-                    job.id = str(jobs_id[i])
-                    job.status = Status.SUBMITTED
-                    job.wrapper_name = package.name
-
+                package.process_jobs_to_submit(current_package_id, hold)
                 # Check if there are duplicated jobnames
                 if not duplicated_jobs_already_checked:
                     job_name = package.name if hasattr(package, "name") else package.jobs[0].name
@@ -204,7 +198,7 @@ class SlurmPlatform(ParamikoPlatform):
                             self.send_command(self.cancel_job(id_))  # This can be faster if we cancel all jobs at once but there is no cancel_all_jobs call right now so todo in future
                             Log.debug(f'Job {id_} with the assigned name: {job_name} has been cancelled')
                         Log.debug(f'Job {package.jobs[0].id} with the assigned name: {job_name} has been submitted')
-                i += 1
+                jobid_index += 1
             if len(failed_packages) > 0:
                 for job_id in failed_packages:
                     platform.send_command(platform.cancel_cmd + " {0}".format(job_id))
@@ -214,6 +208,9 @@ class SlurmPlatform(ParamikoPlatform):
             raise
         except AutosubmitCritical as e:
             raise
+        except AttributeError:
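+            # A missing attribute is a programming error; let it surface instead of wrapping it as a submission failure.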
+            raise
         except Exception as e:
             raise AutosubmitError("{0} submission failed".format(self.name), 6015, str(e))
         return save,valid_packages_to_submit
diff --git a/bin/autosubmit b/bin/autosubmit
index c13ce081e015d0a42da2fd8a040c8a5f9ce339c0..6fcb569d6e4fca3eb60cbd91964a512e238bc0a4 100755
--- a/bin/autosubmit
+++ b/bin/autosubmit
@@ -40,6 +40,7 @@ def main():
         return_value = exit_from_error(e)
     return return_value
 
+
 if __name__ == "__main__":
     exit_code = main()
     sys.exit(exit_code)  # Sys.exit ensures a proper cleanup of the program, while os._exit() does not.
diff --git a/test/unit/test_job_pytest.py b/test/unit/test_job_pytest.py
index 421429fbcb2e1f35399844b6850c42ccdb294e4c..71e2db21e96794c8ed8fc9e3d1c0326d15b32414 100644
--- a/test/unit/test_job_pytest.py
+++ b/test/unit/test_job_pytest.py
@@ -1,7 +1,9 @@
+from datetime import datetime, timedelta
 import pytest
 
 from autosubmit.job.job import Job
 from autosubmit.platforms.psplatform import PsPlatform
+from pathlib import Path
 
 
 @pytest.mark.parametrize('experiment_data, expected_data', [(
@@ -50,3 +52,67 @@ def test_update_parameters_current_variables(autosubmit_config, experiment_data,
     job.update_parameters(as_conf, {})
     for key, value in expected_data.items():
         assert job.parameters[key] == value
+
+
+@pytest.mark.parametrize('test_with_file, file_is_empty, last_line_empty', [
+    (False, False, False),
+    (True, True, False),
+    (True, False, False),
+    (True, False, True)
+], ids=["no file", "file is empty", "file is correct", "file last line is empty"])
+def test_recover_last_ready_date(tmpdir, test_with_file, file_is_empty, last_line_empty):
+    job = Job('dummy', '1', 0, 1)
+    job._tmp_path = Path(tmpdir)
+    stat_file = job._tmp_path.joinpath(f'{job.name}_TOTAL_STATS')
+    ready_time = datetime.now() + timedelta(minutes=5)
+    ready_date = int(ready_time.strftime("%Y%m%d%H%M%S"))
+    expected_date = None
+    if test_with_file:
+        if file_is_empty:
+            stat_file.touch()
+            expected_date = datetime.fromtimestamp(stat_file.stat().st_mtime).strftime('%Y%m%d%H%M%S')
+        else:
+            if last_line_empty:
+                with stat_file.open('w') as f:
+                    f.write(" ")
+                expected_date = datetime.fromtimestamp(stat_file.stat().st_mtime).strftime('%Y%m%d%H%M%S')
+            else:
+                with stat_file.open('w') as f:
+                    f.write(f"{ready_date} {ready_date} {ready_date} COMPLETED")
+                expected_date = str(ready_date)
+    job.ready_date = None
+    job.recover_last_ready_date()
+    assert job.ready_date == expected_date
+
+
+@pytest.mark.parametrize('test_with_logfiles, file_timestamp_greater_than_ready_date', [
+    (False, False),
+    (True, True),
+    (True, False),
+], ids=["no file", "log timestamp >= ready_date", "log timestamp < ready_date"])
+def test_recover_last_log_name(tmpdir, test_with_logfiles, file_timestamp_greater_than_ready_date):
+    job = Job('dummy', '1', 0, 1)
+    job._log_path = Path(tmpdir)
+    expected_local_logs = (f"{job.name}.out.0", f"{job.name}.err.0")
+    if test_with_logfiles:
+        if file_timestamp_greater_than_ready_date:
+            ready_time = datetime.now() - timedelta(minutes=5)
+            job.ready_date = str(ready_time.strftime("%Y%m%d%H%M%S"))
+            log_name = job._log_path.joinpath(f'{job.name}_{job.ready_date}')
+            expected_update_log = True
+            expected_local_logs = (log_name.with_suffix('.out').name, log_name.with_suffix('.err').name)
+        else:
+            expected_update_log = False
+            ready_time = datetime.now() + timedelta(minutes=5)
+            job.ready_date = str(ready_time.strftime("%Y%m%d%H%M%S"))
+            log_name = job._log_path.joinpath(f'{job.name}_{job.ready_date}')
+        log_name.with_suffix('.out').touch()
+        log_name.with_suffix('.err').touch()
+    else:
+        expected_update_log = False
+
+    job.updated_log = False
+    job.recover_last_log_name()
+    assert job.updated_log == expected_update_log
+    assert job.local_logs[0] == str(expected_local_logs[0])
+    assert job.local_logs[1] == str(expected_local_logs[1])
diff --git a/test/unit/test_packages.py b/test/unit/test_packages.py
new file mode 100644
index 0000000000000000000000000000000000000000..8b38525118fb74aa31b2d1b1b5427d4ef5092dc2
--- /dev/null
+++ b/test/unit/test_packages.py
@@ -0,0 +1,50 @@
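+"""Tests for the JobPackage* process_jobs_to_submit helper."""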
+import pytest
+from autosubmit.job.job_common import Status
+from autosubmit.job.job_packages import JobPackageSimple, JobPackageVertical, JobPackageHorizontal
+from autosubmit.job.job import Job
+
+
+@pytest.fixture
+def create_packages(mocker, autosubmit_config):
+    exp_data = {
+        "WRAPPERS": {
+            "WRAPPERS": {
+                "JOBS_IN_WRAPPER": "dummysection"
+            }
+        }
+    }
+    as_conf = autosubmit_config("a000", exp_data)
+    jobs = [Job("dummy-1", 1, Status.SUBMITTED, 0), Job("dummy-2", 2, Status.SUBMITTED, 0), Job("dummy-3", 3, Status.SUBMITTED, 0)]
+    for job in jobs:
+        job._platform = mocker.MagicMock()
+        job._platform.name = "dummy"
+        job.platform_name = "dummy"
+        job.processors = 2
+        job.section = "dummysection"
+        job._init_runtime_parameters()
+        job.wallclock = "00:01"
+    packages = [
+        JobPackageSimple([jobs[0]]),
+        JobPackageVertical(jobs, configuration=as_conf),
+        JobPackageHorizontal(jobs, configuration=as_conf),
+    ]
+    for package in packages:
+        if not isinstance(package, JobPackageSimple):
+            package._name = "wrapped"
+    return packages
+
+
+def test_process_jobs_to_submit(create_packages):
+    packages = create_packages
+    jobs_id = [1, 2, 3]
+    for i, package in enumerate(packages):  # Equivalent to valid_packages_to_submit, but without the ghost-jobs check and related filtering.
+        package.process_jobs_to_submit(jobs_id[i], False)
+        for job in package.jobs:  # All jobs inside a package must have the same id.
+            assert job.hold is False
+            assert job.id == str(jobs_id[i])
+            assert job.status == Status.SUBMITTED
+            if not isinstance(package, JobPackageSimple):
+                assert job.wrapper_name == "wrapped"
+            else:
+                assert job.wrapper_name is None
diff --git a/test/unit/test_pjm_platform_pytest.py b/test/unit/test_pjm_platform_pytest.py
new file mode 100644
index 0000000000000000000000000000000000000000..4ba25be88b12c7db80c13ce8afc450e4338505cf
--- /dev/null
+++ b/test/unit/test_pjm_platform_pytest.py
@@ -0,0 +1,89 @@
+import pytest
+
+from autosubmit.job.job import Job
+from autosubmit.job.job_common import Status
+from autosubmit.job.job_packages import JobPackageSimple, JobPackageVertical, JobPackageHorizontal
+from autosubmit.platforms.pjmplatform import PJMPlatform
+
+
+@pytest.fixture
+def as_conf(autosubmit_config, tmpdir):
+    exp_data = {
+        "WRAPPERS": {
+            "WRAPPERS": {
+                "JOBS_IN_WRAPPER": "dummysection"
+            }
+        },
+        "PLATFORMS": {
+            "pytest-slurm": {
+                "type": "slurm",
+                "host": "localhost",
+                "user": "user",
+                "project": "project",
+                "scratch_dir": "/scratch",
+                "QUEUE": "queue",
+                "ADD_PROJECT_TO_HOST": False,
+                "MAX_WALLCLOCK": "00:01",
+                "TEMP_DIR": "",
+                "MAX_PROCESSORS": 99999,
+            },
+        },
+        "LOCAL_ROOT_DIR": str(tmpdir),
+        "LOCAL_TMP_DIR": str(tmpdir),
+        "LOCAL_PROJ_DIR": str(tmpdir),
+        "LOCAL_ASLOG_DIR": str(tmpdir),
+    }
+    as_conf = autosubmit_config("dummy-expid", exp_data)
+    return as_conf
+
+
+@pytest.fixture
+def pjm_platform(as_conf):
+    platform = PJMPlatform(expid="dummy-expid", name='pytest-slurm', config=as_conf.experiment_data)
+    return platform
+
+
+@pytest.fixture
+def create_packages(as_conf, pjm_platform):
+    simple_jobs = [Job("dummy-1", 1, Status.SUBMITTED, 0)]
+    vertical_jobs = [Job("dummy-1", 1, Status.SUBMITTED, 0), Job("dummy-2", 2, Status.SUBMITTED, 0), Job("dummy-3", 3, Status.SUBMITTED, 0)]
+    horizontal_jobs = [Job("dummy-1", 1, Status.SUBMITTED, 0), Job("dummy-2", 2, Status.SUBMITTED, 0), Job("dummy-3", 3, Status.SUBMITTED, 0)]
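+    # Give each job the minimal attributes that process_batch_ready_jobs expects.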
+    for job in simple_jobs + vertical_jobs + horizontal_jobs:
+        job._platform = pjm_platform
+        job._platform.name = pjm_platform.name
+        job.platform_name = pjm_platform.name
+        job.processors = 2
+        job.section = "dummysection"
+        job._init_runtime_parameters()
+        job.wallclock = "00:01"
+    packages = [
+        JobPackageSimple(simple_jobs),
+        JobPackageVertical(vertical_jobs, configuration=as_conf),
+        JobPackageHorizontal(horizontal_jobs, configuration=as_conf),
+    ]
+    for package in packages:
+        if not isinstance(package, JobPackageSimple):
+            package._name = "wrapped"
+    return packages
+
+
+def test_process_batch_ready_jobs_valid_packages_to_submit(mocker, pjm_platform, as_conf, create_packages):
+    valid_packages_to_submit = create_packages
+    failed_packages = []
+    pjm_platform.get_jobid_by_jobname = mocker.MagicMock()
+    pjm_platform.send_command = mocker.MagicMock()
+    pjm_platform.submit_Script = mocker.MagicMock()
+    jobs_id = [1, 2, 3]
+    pjm_platform.submit_Script.return_value = jobs_id
+    pjm_platform.process_batch_ready_jobs(valid_packages_to_submit, failed_packages)
+    for i, package in enumerate(valid_packages_to_submit):
+        for job in package.jobs:
+            assert job.hold is False
+            assert job.id == str(jobs_id[i])
+            assert job.status == Status.SUBMITTED
+            if not isinstance(package, JobPackageSimple):
+                assert job.wrapper_name == "wrapped"
+            else:
+                assert job.wrapper_name is None
+    assert failed_packages == []
diff --git a/test/unit/test_slurm_platform_pytest.py b/test/unit/test_slurm_platform_pytest.py
new file mode 100644
index 0000000000000000000000000000000000000000..251316a9ce1a5679bb50e490731f617f0708368c
--- /dev/null
+++ b/test/unit/test_slurm_platform_pytest.py
@@ -0,0 +1,89 @@
+import pytest
+
+from autosubmit.job.job import Job
+from autosubmit.job.job_common import Status
+from autosubmit.job.job_packages import JobPackageSimple, JobPackageVertical, JobPackageHorizontal
+from autosubmit.platforms.slurmplatform import SlurmPlatform
+
+
+@pytest.fixture
+def as_conf(autosubmit_config, tmpdir):
+    exp_data = {
+        "WRAPPERS": {
+            "WRAPPERS": {
+                "JOBS_IN_WRAPPER": "dummysection"
+            }
+        },
+        "PLATFORMS": {
+            "pytest-slurm": {
+                "type": "slurm",
+                "host": "localhost",
+                "user": "user",
+                "project": "project",
+                "scratch_dir": "/scratch",
+                "QUEUE": "queue",
+                "ADD_PROJECT_TO_HOST": False,
+                "MAX_WALLCLOCK": "00:01",
+                "TEMP_DIR": "",
+                "MAX_PROCESSORS": 99999,
+            },
+        },
+        "LOCAL_ROOT_DIR": str(tmpdir),
+        "LOCAL_TMP_DIR": str(tmpdir),
+        "LOCAL_PROJ_DIR": str(tmpdir),
+        "LOCAL_ASLOG_DIR": str(tmpdir),
+    }
+    as_conf = autosubmit_config("dummy-expid", exp_data)
+    return as_conf
+
+
+@pytest.fixture
+def slurm_platform(as_conf):
+    platform = SlurmPlatform(expid="dummy-expid", name='pytest-slurm', config=as_conf.experiment_data)
+    return platform
+
+
+@pytest.fixture
+def create_packages(as_conf, slurm_platform):
+    simple_jobs = [Job("dummy-1", 1, Status.SUBMITTED, 0)]
+    vertical_jobs = [Job("dummy-1", 1, Status.SUBMITTED, 0), Job("dummy-2", 2, Status.SUBMITTED, 0), Job("dummy-3", 3, Status.SUBMITTED, 0)]
+    horizontal_jobs = [Job("dummy-1", 1, Status.SUBMITTED, 0), Job("dummy-2", 2, Status.SUBMITTED, 0), Job("dummy-3", 3, Status.SUBMITTED, 0)]
+    for job in simple_jobs + vertical_jobs + horizontal_jobs:
+        job._platform = slurm_platform
+        job._platform.name = slurm_platform.name
+        job.platform_name = slurm_platform.name
+        job.processors = 2
+        job.section = "dummysection"
+        job._init_runtime_parameters()
+        job.wallclock = "00:01"
+    packages = [
+        JobPackageSimple(simple_jobs),
+        JobPackageVertical(vertical_jobs, configuration=as_conf),
+        JobPackageHorizontal(horizontal_jobs, configuration=as_conf),
+    ]
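+    # Only wrapper-style packages carry a name; JobPackageSimple jobs keep wrapper_name == None.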
+    for package in packages:
+        if not isinstance(package, JobPackageSimple):
+            package._name = "wrapped"
+    return packages
+
+
+def test_process_batch_ready_jobs_valid_packages_to_submit(mocker, slurm_platform, as_conf, create_packages):
+    valid_packages_to_submit = create_packages
+    failed_packages = []
+    slurm_platform.get_jobid_by_jobname = mocker.MagicMock()
+    slurm_platform.send_command = mocker.MagicMock()
+    slurm_platform.submit_Script = mocker.MagicMock()
+    jobs_id = [1, 2, 3]
+    slurm_platform.submit_Script.return_value = jobs_id
+    slurm_platform.process_batch_ready_jobs(valid_packages_to_submit, failed_packages)
+    for i, package in enumerate(valid_packages_to_submit):
+        for job in package.jobs:
+            assert job.hold is False
+            assert job.id == str(jobs_id[i])
+            assert job.status == Status.SUBMITTED
+            if not isinstance(package, JobPackageSimple):
+                assert job.wrapper_name == "wrapped"
+            else:
+                assert job.wrapper_name is None
+    assert failed_packages == []