From c0944a1f5bc12e36c6cea2eb7d3d018c9ad4f857 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 2 Dec 2024 11:54:53 +0100 Subject: [PATCH 1/8] Update VERSION --- VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/VERSION b/VERSION index 152e4522c..b05079e54 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -4.1.11 +4.1.12 -- GitLab From 036c51c636c550cfc20c6921214e3035e18a13cc Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 2 Dec 2024 16:09:48 +0100 Subject: [PATCH 2/8] added process_jobs_to_submit fixes runtime issue --- autosubmit/job/job_packages.py | 9 ++++- autosubmit/platforms/pjmplatform.py | 8 +---- autosubmit/platforms/slurmplatform.py | 10 ++---- test/unit/test_packages.py | 51 +++++++++++++++++++++++++++ 4 files changed, 63 insertions(+), 15 deletions(-) create mode 100644 test/unit/test_packages.py diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 0d1d6e597..12b52bdf5 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -219,7 +219,13 @@ class JobPackageBase(object): def _do_submission(self,job_scripts=None, hold=False): """ Submit package to the platform. """ - + def process_jobs_to_submit(self, jobs_id: List[str], hold: bool = False) -> None: + for i, job in enumerate(self.jobs): + job.hold = hold + job.id = str(jobs_id[i]) + job.status = Status.SUBMITTED + if hasattr(self, "name") and self.name != "simple_package": # hasattr for retrocompatibility + job.wrapper_name = self.name class JobPackageSimple(JobPackageBase): """ @@ -230,6 +236,7 @@ class JobPackageSimple(JobPackageBase): super(JobPackageSimple, self).__init__(jobs) self._job_scripts = {} self.export = jobs[0].export + self.name = "simple_package" def _create_scripts(self, configuration): for job in self.jobs: diff --git a/autosubmit/platforms/pjmplatform.py b/autosubmit/platforms/pjmplatform.py index 7291377b7..e167a69b8 100644 --- a/autosubmit/platforms/pjmplatform.py +++ b/autosubmit/platforms/pjmplatform.py @@ -147,13 +147,7 @@ class PJMPlatform(ParamikoPlatform): sleep(10) for package in valid_packages_to_submit: - for job in package.jobs: - job.hold = hold - job.id = str(jobs_id[i]) - job.status = Status.SUBMITTED - job.wrapper_name = package.name - - i += 1 + package.process_jobs_to_submit(jobs_id, hold) save = True except AutosubmitError as e: raise diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index b2a8e2ffa..ad3a3fce2 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -185,12 +185,7 @@ class SlurmPlatform(ParamikoPlatform): except Exception as e: failed_packages.append(jobs_id) continue - for job in package.jobs: - job.hold = hold - job.id = str(jobs_id[i]) - job.status = Status.SUBMITTED - job.wrapper_name = package.name - + package.process_jobs_to_submit(jobs_id, hold) # Check if there are duplicated jobnames if not duplicated_jobs_already_checked: job_name = package.name if hasattr(package, "name") else package.jobs[0].name @@ -204,7 +199,6 @@ class SlurmPlatform(ParamikoPlatform): self.send_command(self.cancel_job(id_)) # This can be faster if we cancel all jobs at once but there is no cancel_all_jobs call right now so todo in future Log.debug(f'Job {id_} with the assigned name: {job_name} has been cancelled') Log.debug(f'Job {package.jobs[0].id} with the assigned name: {job_name} has been submitted') - i += 1 if len(failed_packages) > 0: for job_id in failed_packages: platform.send_command(platform.cancel_cmd + " {0}".format(job_id)) @@ -214,6 +208,8 @@ class SlurmPlatform(ParamikoPlatform): raise except AutosubmitCritical as e: raise + except AttributeError: + raise except Exception as e: raise AutosubmitError("{0} submission failed".format(self.name), 6015, str(e)) return save,valid_packages_to_submit diff --git a/test/unit/test_packages.py b/test/unit/test_packages.py new file mode 100644 index 000000000..1206a64dc --- /dev/null +++ b/test/unit/test_packages.py @@ -0,0 +1,51 @@ +import mock +import pytest +from autosubmit.job.job_common import Status +from autosubmit.job.job_packages import JobPackageSimple, JobPackageVertical, JobPackageHorizontal +from autosubmit.job.job import Job + + +@pytest.fixture +def create_packages(mocker, autosubmit_config): + exp_data = { + "WRAPPERS": { + "WRAPPERS": { + "JOBS_IN_WRAPPER": "dummysection" + } + } + } + as_conf = autosubmit_config("a000", exp_data) + jobs = [Job("dummy-1", 1, Status.SUBMITTED, 0), Job("dummy-2", 2, Status.SUBMITTED, 0), Job("dummy-3", 3, Status.SUBMITTED, 0)] + for job in jobs: + job._platform = mocker.MagicMock() + job._platform.name = "dummy" + job.platform_name = "dummy" + job.processors = 2 + job.section = "dummysection" + job._init_runtime_parameters() + job.wallclock = "00:01" + packages = [ + JobPackageSimple(jobs), + JobPackageVertical(jobs, configuration=as_conf), + JobPackageHorizontal(jobs, configuration=as_conf), + ] + for package in packages: + if hasattr(package, 'name'): # Should be always True for fresh jobs/packages. + if not isinstance(package, JobPackageSimple): + package._name = "wrapped" + return packages + + +def test_process_jobs_to_submit(create_packages): + packages = create_packages + jobs_id = [1, 2, 3] + for package in packages: + package.process_jobs_to_submit(jobs_id, False) + for i, job in enumerate(package.jobs): + assert job.hold is False + assert job.id == str(jobs_id[i]) + assert job.status == Status.SUBMITTED + if not isinstance(package, JobPackageSimple): + assert job.wrapper_name == "wrapped" + else: + assert job.wrapper_name is None -- GitLab From 866119c6ba070ab9b006fa82a45b320c4cca9e19 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 2 Dec 2024 16:36:40 +0100 Subject: [PATCH 3/8] Added few todos that requires to change critical stuff --- autosubmit/job/job_packages.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 12b52bdf5..2d1198d85 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -224,7 +224,7 @@ class JobPackageBase(object): job.hold = hold job.id = str(jobs_id[i]) job.status = Status.SUBMITTED - if hasattr(self, "name") and self.name != "simple_package": # hasattr for retrocompatibility + if hasattr(self, "name"): # TODO change this check for a property that checks if it is a wrapper or not, the same change has to be done in other parts of the code job.wrapper_name = self.name class JobPackageSimple(JobPackageBase): @@ -236,7 +236,7 @@ class JobPackageSimple(JobPackageBase): super(JobPackageSimple, self).__init__(jobs) self._job_scripts = {} self.export = jobs[0].export - self.name = "simple_package" + # self.name = "simple_package" TODO this should be possible, but it crashes accross the code. Add a property that defines what is a package with wrappers def _create_scripts(self, configuration): for job in self.jobs: -- GitLab From 16b02b7f4fc6580513f8cb0733c6f573e76bae16 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 3 Dec 2024 11:20:32 +0100 Subject: [PATCH 4/8] Fix runtime issue, recovery now shows a better output --- autosubmit/autosubmit.py | 5 ++--- autosubmit/job/job_packages.py | 4 ++-- autosubmit/platforms/pjmplatform.py | 3 ++- autosubmit/platforms/slurmplatform.py | 21 +++++++++++---------- test/unit/test_packages.py | 8 ++++---- 5 files changed, 21 insertions(+), 20 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index c520c6ea7..6f84065b7 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -2981,9 +2981,8 @@ class Autosubmit: "Experiment can't be recovered due being {0} active jobs in your experiment, If you want to recover the experiment, please use the flag -f and all active jobs will be cancelled".format( len(current_active_jobs)), 7000) Log.debug("Job list restored from {0} files", pkl_dir) - except BaseException as e: - raise AutosubmitCritical( - "Couldn't restore the job_list or packages, check if the filesystem is having issues", 7040, str(e)) + except Exception: + raise Log.info('Recovering experiment {0}'.format(expid)) try: for job in job_list.get_job_list(): diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 2d1198d85..49dd3a5cc 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -219,10 +219,10 @@ class JobPackageBase(object): def _do_submission(self,job_scripts=None, hold=False): """ Submit package to the platform. """ - def process_jobs_to_submit(self, jobs_id: List[str], hold: bool = False) -> None: + def process_jobs_to_submit(self, job_id: str, hold: bool = False) -> None: for i, job in enumerate(self.jobs): job.hold = hold - job.id = str(jobs_id[i]) + job.id = str(job_id) job.status = Status.SUBMITTED if hasattr(self, "name"): # TODO change this check for a property that checks if it is a wrapper or not, the same change has to be done in other parts of the code job.wrapper_name = self.name diff --git a/autosubmit/platforms/pjmplatform.py b/autosubmit/platforms/pjmplatform.py index e167a69b8..b769cf792 100644 --- a/autosubmit/platforms/pjmplatform.py +++ b/autosubmit/platforms/pjmplatform.py @@ -147,7 +147,8 @@ class PJMPlatform(ParamikoPlatform): sleep(10) for package in valid_packages_to_submit: - package.process_jobs_to_submit(jobs_id, hold) + package.process_jobs_to_submit(jobs_id[i], hold) + i += 1 save = True except AutosubmitError as e: raise diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index ad3a3fce2..24dc96fd2 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -152,21 +152,21 @@ class SlurmPlatform(ParamikoPlatform): if jobs_id is None or len(jobs_id) <= 0: raise AutosubmitError( "Submission failed, this can be due a failure on the platform", 6015,"Jobs_id {0}".format(jobs_id)) - i = 0 if hold: sleep(10) - + jobid_index = 0 for package in valid_packages_to_submit: + current_package_id = str(jobs_id[jobid_index]) if hold: retries = 5 - package.jobs[0].id = str(jobs_id[i]) + package.jobs[0].id = current_package_id try: can_continue = True while can_continue and retries > 0: - cmd = package.jobs[0].platform.get_queue_status_cmd(jobs_id[i]) + cmd = package.jobs[0].platform.get_queue_status_cmd(current_package_id) package.jobs[0].platform.send_command(cmd) queue_status = package.jobs[0].platform._ssh_output - reason = package.jobs[0].platform.parse_queue_reason(queue_status, jobs_id[i]) + reason = package.jobs[0].platform.parse_queue_reason(queue_status, current_package_id) if reason == '(JobHeldAdmin)': can_continue = False elif reason == '(JobHeldUser)': @@ -176,16 +176,16 @@ class SlurmPlatform(ParamikoPlatform): sleep(5) retries = retries - 1 if not can_continue: - package.jobs[0].platform.send_command(package.jobs[0].platform.cancel_cmd + " {0}".format(jobs_id[i])) - i = i + 1 + package.jobs[0].platform.send_command(package.jobs[0].platform.cancel_cmd + " {0}".format(current_package_id)) + jobid_index += 1 continue if not self.hold_job(package.jobs[0]): - i = i + 1 + jobid_index += 1 continue except Exception as e: - failed_packages.append(jobs_id) + failed_packages.append(current_package_id) continue - package.process_jobs_to_submit(jobs_id, hold) + package.process_jobs_to_submit(current_package_id, hold) # Check if there are duplicated jobnames if not duplicated_jobs_already_checked: job_name = package.name if hasattr(package, "name") else package.jobs[0].name @@ -199,6 +199,7 @@ class SlurmPlatform(ParamikoPlatform): self.send_command(self.cancel_job(id_)) # This can be faster if we cancel all jobs at once but there is no cancel_all_jobs call right now so todo in future Log.debug(f'Job {id_} with the assigned name: {job_name} has been cancelled') Log.debug(f'Job {package.jobs[0].id} with the assigned name: {job_name} has been submitted') + jobid_index += 1 if len(failed_packages) > 0: for job_id in failed_packages: platform.send_command(platform.cancel_cmd + " {0}".format(job_id)) diff --git a/test/unit/test_packages.py b/test/unit/test_packages.py index 1206a64dc..879f47390 100644 --- a/test/unit/test_packages.py +++ b/test/unit/test_packages.py @@ -25,7 +25,7 @@ def create_packages(mocker, autosubmit_config): job._init_runtime_parameters() job.wallclock = "00:01" packages = [ - JobPackageSimple(jobs), + JobPackageSimple([jobs[0]]), JobPackageVertical(jobs, configuration=as_conf), JobPackageHorizontal(jobs, configuration=as_conf), ] @@ -39,9 +39,9 @@ def create_packages(mocker, autosubmit_config): def test_process_jobs_to_submit(create_packages): packages = create_packages jobs_id = [1, 2, 3] - for package in packages: - package.process_jobs_to_submit(jobs_id, False) - for i, job in enumerate(package.jobs): + for i, package in enumerate(packages): # Equivalent to valid_packages_to_submit but without the ghost jobs check etc. + package.process_jobs_to_submit(jobs_id[i], False) + for job in package.jobs: # All jobs inside a package must have the same id. assert job.hold is False assert job.id == str(jobs_id[i]) assert job.status == Status.SUBMITTED -- GitLab From fae08fbb053794f5bb43db52a94b38fa98380ef8 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 3 Dec 2024 17:04:23 +0100 Subject: [PATCH 5/8] Improved package test --- autosubmit/platforms/slurmplatform.py | 1 - test/unit/test_packages.py | 5 +- test/unit/test_pjm_platform_pytest.py | 88 ++++++++++++++++++++++++ test/unit/test_slurm_platform_pytest.py | 89 +++++++++++++++++++++++++ 4 files changed, 179 insertions(+), 4 deletions(-) create mode 100644 test/unit/test_pjm_platform_pytest.py create mode 100644 test/unit/test_slurm_platform_pytest.py diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index 24dc96fd2..3a01ff186 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -98,7 +98,6 @@ class SlurmPlatform(ParamikoPlatform): :return: """ try: - valid_packages_to_submit = [ package for package in valid_packages_to_submit if package.x11 != True] if len(valid_packages_to_submit) > 0: duplicated_jobs_already_checked = False diff --git a/test/unit/test_packages.py b/test/unit/test_packages.py index 879f47390..8b3852511 100644 --- a/test/unit/test_packages.py +++ b/test/unit/test_packages.py @@ -30,9 +30,8 @@ def create_packages(mocker, autosubmit_config): JobPackageHorizontal(jobs, configuration=as_conf), ] for package in packages: - if hasattr(package, 'name'): # Should be always True for fresh jobs/packages. - if not isinstance(package, JobPackageSimple): - package._name = "wrapped" + if not isinstance(package, JobPackageSimple): + package._name = "wrapped" return packages diff --git a/test/unit/test_pjm_platform_pytest.py b/test/unit/test_pjm_platform_pytest.py new file mode 100644 index 000000000..4ba25be88 --- /dev/null +++ b/test/unit/test_pjm_platform_pytest.py @@ -0,0 +1,88 @@ +import pytest + +from autosubmit.job.job import Job +from autosubmit.job.job_common import Status +from autosubmit.job.job_packages import JobPackageSimple, JobPackageVertical, JobPackageHorizontal +from autosubmit.platforms.pjmplatform import PJMPlatform + + +@pytest.fixture +def as_conf(autosubmit_config, tmpdir): + exp_data = { + "WRAPPERS": { + "WRAPPERS": { + "JOBS_IN_WRAPPER": "dummysection" + } + }, + "PLATFORMS": { + "pytest-slurm": { + "type": "slurm", + "host": "localhost", + "user": "user", + "project": "project", + "scratch_dir": "/scratch", + "QUEUE": "queue", + "ADD_PROJECT_TO_HOST": False, + "MAX_WALLCLOCK": "00:01", + "TEMP_DIR": "", + "MAX_PROCESSORS": 99999, + }, + }, + "LOCAL_ROOT_DIR": str(tmpdir), + "LOCAL_TMP_DIR": str(tmpdir), + "LOCAL_PROJ_DIR": str(tmpdir), + "LOCAL_ASLOG_DIR": str(tmpdir), + } + as_conf = autosubmit_config("dummy-expid", exp_data) + return as_conf + + +@pytest.fixture +def pjm_platform(as_conf): + platform = PJMPlatform(expid="dummy-expid", name='pytest-slurm', config=as_conf.experiment_data) + return platform + + +@pytest.fixture +def create_packages(as_conf, pjm_platform): + simple_jobs = [Job("dummy-1", 1, Status.SUBMITTED, 0)] + vertical_jobs = [Job("dummy-1", 1, Status.SUBMITTED, 0), Job("dummy-2", 2, Status.SUBMITTED, 0), Job("dummy-3", 3, Status.SUBMITTED, 0)] + horizontal_jobs = [Job("dummy-1", 1, Status.SUBMITTED, 0), Job("dummy-2", 2, Status.SUBMITTED, 0), Job("dummy-3", 3, Status.SUBMITTED, 0)] + for job in simple_jobs + vertical_jobs + horizontal_jobs: + job._platform = pjm_platform + job._platform.name = pjm_platform.name + job.platform_name = pjm_platform.name + job.processors = 2 + job.section = "dummysection" + job._init_runtime_parameters() + job.wallclock = "00:01" + packages = [ + JobPackageSimple(simple_jobs), + JobPackageVertical(vertical_jobs, configuration=as_conf), + JobPackageHorizontal(horizontal_jobs, configuration=as_conf), + ] + for package in packages: + if not isinstance(package, JobPackageSimple): + package._name = "wrapped" + return packages + + +def test_process_batch_ready_jobs_valid_packages_to_submit(mocker, pjm_platform, as_conf, create_packages): + valid_packages_to_submit = create_packages + failed_packages = [] + pjm_platform.get_jobid_by_jobname = mocker.MagicMock() + pjm_platform.send_command = mocker.MagicMock() + pjm_platform.submit_Script = mocker.MagicMock() + jobs_id = [1, 2, 3] + pjm_platform.submit_Script.return_value = jobs_id + pjm_platform.process_batch_ready_jobs(valid_packages_to_submit, failed_packages) + for i, package in enumerate(valid_packages_to_submit): + for job in package.jobs: + assert job.hold is False + assert job.id == str(jobs_id[i]) + assert job.status == Status.SUBMITTED + if not isinstance(package, JobPackageSimple): + assert job.wrapper_name == "wrapped" + else: + assert job.wrapper_name is None + assert failed_packages == [] diff --git a/test/unit/test_slurm_platform_pytest.py b/test/unit/test_slurm_platform_pytest.py new file mode 100644 index 000000000..e2827d065 --- /dev/null +++ b/test/unit/test_slurm_platform_pytest.py @@ -0,0 +1,89 @@ +import pytest + +from autosubmit.job.job import Job +from autosubmit.job.job_common import Status +from autosubmit.job.job_packages import JobPackageSimple, JobPackageVertical, JobPackageHorizontal +from autosubmit.platforms.slurmplatform import SlurmPlatform +from log.log import AutosubmitError, AutosubmitCritical + + +@pytest.fixture +def as_conf(autosubmit_config, tmpdir): + exp_data = { + "WRAPPERS": { + "WRAPPERS": { + "JOBS_IN_WRAPPER": "dummysection" + } + }, + "PLATFORMS": { + "pytest-slurm": { + "type": "slurm", + "host": "localhost", + "user": "user", + "project": "project", + "scratch_dir": "/scratch", + "QUEUE": "queue", + "ADD_PROJECT_TO_HOST": False, + "MAX_WALLCLOCK": "00:01", + "TEMP_DIR": "", + "MAX_PROCESSORS": 99999, + }, + }, + "LOCAL_ROOT_DIR": str(tmpdir), + "LOCAL_TMP_DIR": str(tmpdir), + "LOCAL_PROJ_DIR": str(tmpdir), + "LOCAL_ASLOG_DIR": str(tmpdir), + } + as_conf = autosubmit_config("dummy-expid", exp_data) + return as_conf + + +@pytest.fixture +def slurm_platform(as_conf): + platform = SlurmPlatform(expid="dummy-expid", name='pytest-slurm', config=as_conf.experiment_data) + return platform + + +@pytest.fixture +def create_packages(as_conf, slurm_platform): + simple_jobs = [Job("dummy-1", 1, Status.SUBMITTED, 0)] + vertical_jobs = [Job("dummy-1", 1, Status.SUBMITTED, 0), Job("dummy-2", 2, Status.SUBMITTED, 0), Job("dummy-3", 3, Status.SUBMITTED, 0)] + horizontal_jobs = [Job("dummy-1", 1, Status.SUBMITTED, 0), Job("dummy-2", 2, Status.SUBMITTED, 0), Job("dummy-3", 3, Status.SUBMITTED, 0)] + for job in simple_jobs + vertical_jobs + horizontal_jobs: + job._platform = slurm_platform + job._platform.name = slurm_platform.name + job.platform_name = slurm_platform.name + job.processors = 2 + job.section = "dummysection" + job._init_runtime_parameters() + job.wallclock = "00:01" + packages = [ + JobPackageSimple(simple_jobs), + JobPackageVertical(vertical_jobs, configuration=as_conf), + JobPackageHorizontal(horizontal_jobs, configuration=as_conf), + ] + for package in packages: + if not isinstance(package, JobPackageSimple): + package._name = "wrapped" + return packages + + +def test_process_batch_ready_jobs_valid_packages_to_submit(mocker, slurm_platform, as_conf, create_packages): + valid_packages_to_submit = create_packages + failed_packages = [] + slurm_platform.get_jobid_by_jobname = mocker.MagicMock() + slurm_platform.send_command = mocker.MagicMock() + slurm_platform.submit_Script = mocker.MagicMock() + jobs_id = [1, 2, 3] + slurm_platform.submit_Script.return_value = jobs_id + slurm_platform.process_batch_ready_jobs(valid_packages_to_submit, failed_packages) + for i, package in enumerate(valid_packages_to_submit): + for job in package.jobs: + assert job.hold is False + assert job.id == str(jobs_id[i]) + assert job.status == Status.SUBMITTED + if not isinstance(package, JobPackageSimple): + assert job.wrapper_name == "wrapped" + else: + assert job.wrapper_name is None + assert failed_packages == [] -- GitLab From a1080c7c1a60ede5e8ea965114be765643de0a74 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 3 Dec 2024 17:04:55 +0100 Subject: [PATCH 6/8] Improved package test --- test/unit/test_slurm_platform_pytest.py | 1 - 1 file changed, 1 deletion(-) diff --git a/test/unit/test_slurm_platform_pytest.py b/test/unit/test_slurm_platform_pytest.py index e2827d065..251316a9c 100644 --- a/test/unit/test_slurm_platform_pytest.py +++ b/test/unit/test_slurm_platform_pytest.py @@ -4,7 +4,6 @@ from autosubmit.job.job import Job from autosubmit.job.job_common import Status from autosubmit.job.job_packages import JobPackageSimple, JobPackageVertical, JobPackageHorizontal from autosubmit.platforms.slurmplatform import SlurmPlatform -from log.log import AutosubmitError, AutosubmitCritical @pytest.fixture -- GitLab From 9c6cc0c287ac98c1fc7a78cbf1157e9dd975c9e5 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 10 Dec 2024 11:38:14 +0100 Subject: [PATCH 7/8] Closing file descriptors on reconnect. --- autosubmit/autosubmit.py | 5 +++-- autosubmit/platforms/paramiko_platform.py | 1 + autosubmit/platforms/platform.py | 4 ++++ 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 6f84065b7..dcd94d11b 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -2384,6 +2384,7 @@ class Autosubmit: if p.log_recovery_process: p.cleanup_event.set() # Send cleanup event p.log_recovery_process.join() + Log.info(f"Log recovery process for {p.name} has finished") for job in job_list.get_completed_failed_without_logs(): # update the info gathered from the childs job_list.update_log_status(job, as_conf) job_list.save() @@ -2401,8 +2402,8 @@ class Autosubmit: Autosubmit.database_fix(expid) except Exception as e: pass - for platform in platforms_to_test: - platform.closeConnection() + for p in platforms_to_test: + p.closeConnection() if len(job_list.get_failed()) > 0: Log.info("Some jobs have failed and reached maximum retrials") else: diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 205a869eb..eba3f9f95 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -95,6 +95,7 @@ class ParamikoPlatform(Platform): return self._wrapper def reset(self): + self.closeConnection() self.connected = False self._ssh = None self._ssh_config = None diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 326189c7c..bbc609e7a 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -1,6 +1,7 @@ import atexit import multiprocessing import queue # only for the exception +from os import _exit import setproctitle import locale import os @@ -882,6 +883,7 @@ class Platform(object): Log.result(f"Process {self.log_recovery_process.name} started with pid {self.log_recovery_process.pid}") # Cleanup will be automatically prompt on control + c or a normal exit atexit.register(self.send_cleanup_signal) + atexit.register(self.closeConnection) def send_cleanup_signal(self) -> None: """ @@ -992,4 +994,6 @@ class Platform(object): if self.cleanup_event.is_set(): # Check if the main process is waiting for this child to end. self.recover_job_log(identifier, jobs_pending_to_process) break + self.closeConnection() Log.info(f"{identifier} Exiting.") + _exit(0) # Exit userspace after manually closing ssh sockets, recommended for child processes, the queue() and shared signals should be in charge of the main process. -- GitLab From 32f19bbd5cae379295ac56dac9342695062d94a7 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 11 Dec 2024 12:13:16 +0100 Subject: [PATCH 8/8] Fixes an issue with logs with this sequence of commands: `create + run + create + recovery + run` --- autosubmit/autosubmit.py | 10 ++---- autosubmit/job/job.py | 46 +++++++++++++++++++++++++ bin/autosubmit | 1 + test/unit/test_job_pytest.py | 66 ++++++++++++++++++++++++++++++++++++ 4 files changed, 115 insertions(+), 8 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index dcd94d11b..fb6d8b5f9 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -2574,7 +2574,6 @@ class Autosubmit: raise except BaseException as e: raise - raise AutosubmitCritical("This seems like a bug in the code, please contact AS developers", 7070, str(e)) @staticmethod def monitor(expid, file_format, lst, filter_chunks, filter_status, filter_section, hide, txt_only=False, @@ -3017,13 +3016,8 @@ class Autosubmit: job.status = Status.COMPLETED Log.info( "CHANGED job '{0}' status to COMPLETED".format(job.name)) - # Log.status("CHANGED job '{0}' status to COMPLETED".format(job.name)) - - if not no_recover_logs: - try: - job.platform.get_logs_files(expid, job.remote_logs) - except Exception as e: - pass + job.recover_last_ready_date() + job.recover_last_log_name() elif job.status != Status.SUSPENDED: job.status = Status.WAITING job._fail_count = 0 diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 9b5897077..4674635ed 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -222,6 +222,7 @@ class Job(object): self.parameters = None self._tmp_path = os.path.join( BasicConfig.LOCAL_ROOT_DIR, self.expid, BasicConfig.LOCAL_TMP_DIR) + self._log_path = Path(f"{self._tmp_path}/LOG_{self.expid}") self.write_start = False self._platform = None self.check = 'true' @@ -2501,6 +2502,51 @@ class Job(object): self.local_logs = local_logs self.remote_logs = copy.deepcopy(local_logs) + def _recover_last_log_name_from_filesystem(self) -> bool: + """ + Recovers the log name for the job from the filesystem. + :return: True if the log name was already recovered, False otherwise + :rtype: bool + """ + log_name = sorted(list(self._log_path.glob(f"{self.name}*")), key=lambda x: x.stat().st_mtime) + log_name = log_name[-1] if log_name else None + if log_name: + file_timestamp = int(datetime.datetime.fromtimestamp(log_name.stat().st_mtime).strftime("%Y%m%d%H%M%S")) + if self.ready_date and file_timestamp >= int(self.ready_date): + self.local_logs = (log_name.with_suffix(".out").name, log_name.with_suffix(".err").name) + self.remote_logs = copy.deepcopy(self.local_logs) + return True + self.local_logs = (f"{self.name}.out.{self._fail_count}", f"{self.name}.err.{self._fail_count}") + self.remote_logs = copy.deepcopy(self.local_logs) + return False + + def recover_last_log_name(self): + """ + Recovers the last log name for the job + """ + if not self.updated_log: + self.updated_log = self._recover_last_log_name_from_filesystem() + # TODO: After PostgreSQL migration, implement _recover_last_log_from_db() to retrieve the last log from the database. + + def recover_last_ready_date(self) -> None: + """ + Recovers the last ready date for this job + """ + if not self.ready_date: + stat_file = Path(f"{self._tmp_path}/{self.name}_TOTAL_STATS") + if stat_file.exists(): + output_by_lines = stat_file.read_text().splitlines() + if output_by_lines: + line_info = output_by_lines[-1].split(" ") + if line_info and line_info[0].isdigit(): + self.ready_date = line_info[0] + else: + self.ready_date = datetime.datetime.fromtimestamp(stat_file.stat().st_mtime).strftime('%Y%m%d%H%M%S') + Log.debug(f"Failed to recover ready date for the job {self.name}") + else: # Default to last mod time + self.ready_date = datetime.datetime.fromtimestamp(stat_file.stat().st_mtime).strftime('%Y%m%d%H%M%S') + Log.debug(f"Failed to recover ready date for the job {self.name}") + class WrapperJob(Job): """ diff --git a/bin/autosubmit b/bin/autosubmit index c13ce081e..6fcb569d6 100755 --- a/bin/autosubmit +++ b/bin/autosubmit @@ -40,6 +40,7 @@ def main(): return_value = exit_from_error(e) return return_value + if __name__ == "__main__": exit_code = main() sys.exit(exit_code) # Sys.exit ensures a proper cleanup of the program, while os._exit() does not. diff --git a/test/unit/test_job_pytest.py b/test/unit/test_job_pytest.py index 421429fbc..71e2db21e 100644 --- a/test/unit/test_job_pytest.py +++ b/test/unit/test_job_pytest.py @@ -1,7 +1,9 @@ +from datetime import datetime, timedelta import pytest from autosubmit.job.job import Job from autosubmit.platforms.psplatform import PsPlatform +from pathlib import Path @pytest.mark.parametrize('experiment_data, expected_data', [( @@ -50,3 +52,67 @@ def test_update_parameters_current_variables(autosubmit_config, experiment_data, job.update_parameters(as_conf, {}) for key, value in expected_data.items(): assert job.parameters[key] == value + + +@pytest.mark.parametrize('test_with_file, file_is_empty, last_line_empty', [ + (False, False, False), + (True, True, False), + (True, False, False), + (True, False, True) +], ids=["no file", "file is empty", "file is correct", "file last line is empty"]) +def test_recover_last_ready_date(tmpdir, test_with_file, file_is_empty, last_line_empty): + job = Job('dummy', '1', 0, 1) + job._tmp_path = Path(tmpdir) + stat_file = job._tmp_path.joinpath(f'{job.name}_TOTAL_STATS') + ready_time = datetime.now() + timedelta(minutes=5) + ready_date = int(ready_time.strftime("%Y%m%d%H%M%S")) + expected_date = None + if test_with_file: + if file_is_empty: + stat_file.touch() + expected_date = datetime.fromtimestamp(stat_file.stat().st_mtime).strftime('%Y%m%d%H%M%S') + else: + if last_line_empty: + with stat_file.open('w') as f: + f.write(" ") + expected_date = datetime.fromtimestamp(stat_file.stat().st_mtime).strftime('%Y%m%d%H%M%S') + else: + with stat_file.open('w') as f: + f.write(f"{ready_date} {ready_date} {ready_date} COMPLETED") + expected_date = str(ready_date) + job.ready_date = None + job.recover_last_ready_date() + assert job.ready_date == expected_date + + +@pytest.mark.parametrize('test_with_logfiles, file_timestamp_greater_than_ready_date', [ + (False, False), + (True, True), + (True, False), +], ids=["no file", "log timestamp >= ready_date", "log timestamp < ready_date"]) +def test_recover_last_log_name(tmpdir, test_with_logfiles, file_timestamp_greater_than_ready_date): + job = Job('dummy', '1', 0, 1) + job._log_path = Path(tmpdir) + expected_local_logs = (f"{job.name}.out.0", f"{job.name}.err.0") + if test_with_logfiles: + if file_timestamp_greater_than_ready_date: + ready_time = datetime.now() - timedelta(minutes=5) + job.ready_date = str(ready_time.strftime("%Y%m%d%H%M%S")) + log_name = job._log_path.joinpath(f'{job.name}_{job.ready_date}') + expected_update_log = True + expected_local_logs = (log_name.with_suffix('.out').name, log_name.with_suffix('.err').name) + else: + expected_update_log = False + ready_time = datetime.now() + timedelta(minutes=5) + job.ready_date = str(ready_time.strftime("%Y%m%d%H%M%S")) + log_name = job._log_path.joinpath(f'{job.name}_{job.ready_date}') + log_name.with_suffix('.out').touch() + log_name.with_suffix('.err').touch() + else: + expected_update_log = False + + job.updated_log = False + job.recover_last_log_name() + assert job.updated_log == expected_update_log + assert job.local_logs[0] == str(expected_local_logs[0]) + assert job.local_logs[1] == str(expected_local_logs[1]) -- GitLab