diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 7040baebb1aeeeadd4705d334e4b3a50cfb6e175..247eb302b25f0e7b1fef113dd4adc1ccb3fe4855 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -24,7 +24,7 @@ import traceback from contextlib import suppress from shutil import move from threading import Thread -from typing import List, Dict +from typing import List, Dict, Tuple from pathlib import Path import math @@ -929,7 +929,7 @@ class JobList(object): filters_to_apply = relationships return filters_to_apply - def _add_edge_info(self, job, special_status): + def _add_edges_map_info(self, job, special_status): """ Special relations to be check in the update_list method :param job: Current job @@ -957,7 +957,7 @@ class JobList(object): if special_conditions.get("FROM_STEP", None): job.max_checkpoint_step = int(special_conditions.get("FROM_STEP", 0)) if int( special_conditions.get("FROM_STEP", 0)) > job.max_checkpoint_step else job.max_checkpoint_step - self._add_edge_info(job, special_conditions["STATUS"]) # job_list map + self._add_edges_map_info(job, special_conditions["STATUS"]) # job_list map job.add_edge_info(parent, special_conditions) # this job def _apply_jobs_edge_info(self, job, dependencies): @@ -2574,35 +2574,60 @@ class JobList(object): """ Check if a checkpoint step exists for this edge""" return job.get_checkpoint_files(parent.name) - def check_special_status(self): + def check_special_status(self) -> List[Job]: + """ + Check if all parents of a job have the correct status for checkpointing. + + :returns: jobs_to_check - Jobs that fulfill the special conditions. 
""" - Check if all parents of a job have the correct status for checkpointing - :return: jobs that fullfill the special conditions """ jobs_to_check = [] - for status, sorted_job_list in self.jobs_edges.items(): - if status == "ALL": + jobs_to_skip = [] + for target_status, sorted_job_list in self.jobs_edges.items(): + if target_status == "ALL": continue for job in sorted_job_list: if job.status != Status.WAITING: continue - if status in ["RUNNING", "FAILED"]: - # check checkpoint if any - if job.platform and job.platform.connected: # This will be true only when used under setstatus/run - job.get_checkpoint_files() - non_completed_parents_current = 0 - completed_parents = len([parent for parent in job.parents if parent.status == Status.COMPLETED]) - for parent in job.edge_info[status].values(): - if status in ["RUNNING", "FAILED"] and parent[1] and int(parent[1]) >= job.current_checkpoint_step: - continue - else: - status_str = Status.VALUE_TO_KEY[parent[0].status] - if Status.LOGICAL_ORDER.index(status_str) >= Status.LOGICAL_ORDER.index(status): - non_completed_parents_current += 1 - if (non_completed_parents_current + completed_parents) == len(job.parents): - jobs_to_check.append(job) - + if target_status in ["RUNNING", "FAILED"]: + self._check_checkpoint(job) + non_completed_parents_current, completed_parents = self._count_parents_status(job, target_status) + if (len(non_completed_parents_current) + len(completed_parents)) == len(job.parents): + if job not in jobs_to_skip: + jobs_to_check.append(job) return jobs_to_check + @staticmethod + def _check_checkpoint(job: Job) -> None: + """ + Check if a job has a checkpoint. + + :param job: The job to check. + """ + if job.platform and job.platform.connected: # This will be true only when used under setstatus/run + job.get_checkpoint_files() + + @staticmethod + def _count_parents_status(job: Job, target_status: str) -> Tuple[List[Job], List[Job]]: + """ + Count the number of completed and non-completed parents. 
+ + :param job: The job to check. + :param target_status: The target status to compare against. + :return: A tuple containing two lists: + - non_completed_parents_current: Non-completed parents. + - completed_parents: Completed parents. + """ + non_completed_parents_current = [] + completed_parents = [parent for parent in job.parents if parent.status == Status.COMPLETED] + for parent in job.edge_info[target_status].values(): + if target_status in ["RUNNING", "FAILED"] and parent[1] and int(parent[1]) >= job.current_checkpoint_step: + continue + current_status = Status.VALUE_TO_KEY[parent[0].status] + if Status.LOGICAL_ORDER.index(current_status) >= Status.LOGICAL_ORDER.index(target_status): + if parent[0] not in completed_parents: + non_completed_parents_current.append(parent[0]) + return non_completed_parents_current, completed_parents + def update_log_status(self, job, as_conf): """ Updates the log err and log out. diff --git a/test/unit/test_checkpoints.py b/test/unit/test_checkpoints.py index 35dca3350841c6a7a0d38ea7c72811893cbd569c..ed0e683778d8c5a59e7262d9cfb072b2647df257 100644 --- a/test/unit/test_checkpoints.py +++ b/test/unit/test_checkpoints.py @@ -1,9 +1,5 @@ -from unittest import TestCase - -import inspect +import pytest import shutil -import tempfile -from mock import Mock, MagicMock from random import randrange from autosubmit.job.job import Job @@ -11,160 +7,118 @@ from autosubmit.job.job_common import Status from autosubmit.job.job_list import JobList from autosubmit.job.job_list_persistence import JobListPersistenceDb from autosubmitconfigparser.config.yamlparser import YAMLParserFactory - - -class TestJobList(TestCase): - def setUp(self): - self.experiment_id = 'random-id' - self.as_conf = Mock() - self.as_conf.experiment_data = dict() - self.as_conf.experiment_data["JOBS"] = dict() - self.as_conf.jobs_data = self.as_conf.experiment_data["JOBS"] - self.as_conf.experiment_data["PLATFORMS"] = dict() - self.temp_directory = tempfile.mkdtemp() - 
self.job_list = JobList(self.experiment_id, FakeBasicConfig, YAMLParserFactory(), - JobListPersistenceDb(self.temp_directory, 'db'), self.as_conf) - dummy_serial_platform = MagicMock() - dummy_serial_platform.name = 'serial' - dummy_platform = MagicMock() - dummy_platform.serial_platform = dummy_serial_platform - dummy_platform.name = 'dummy_platform' - # creating jobs for self list - self.completed_job = self._createDummyJobWithStatus(Status.COMPLETED) - self.completed_job.platform = dummy_platform - self.completed_job2 = self._createDummyJobWithStatus(Status.COMPLETED) - self.completed_job2.platform = dummy_platform - self.completed_job3 = self._createDummyJobWithStatus(Status.COMPLETED) - self.completed_job3.platform = dummy_platform - self.completed_job4 = self._createDummyJobWithStatus(Status.COMPLETED) - self.completed_job4.platform = dummy_platform - self.submitted_job = self._createDummyJobWithStatus(Status.SUBMITTED) - self.submitted_job.platform = dummy_platform - self.submitted_job2 = self._createDummyJobWithStatus(Status.SUBMITTED) - self.submitted_job2.platform = dummy_platform - self.submitted_job3 = self._createDummyJobWithStatus(Status.SUBMITTED) - self.submitted_job3.platform = dummy_platform - - self.running_job = self._createDummyJobWithStatus(Status.RUNNING) - self.running_job.platform = dummy_platform - self.running_job2 = self._createDummyJobWithStatus(Status.RUNNING) - self.running_job2.platform = dummy_platform - - self.queuing_job = self._createDummyJobWithStatus(Status.QUEUING) - self.queuing_job.platform = dummy_platform - - self.failed_job = self._createDummyJobWithStatus(Status.FAILED) - self.failed_job.platform = dummy_platform - self.failed_job2 = self._createDummyJobWithStatus(Status.FAILED) - self.failed_job2.platform = dummy_platform - self.failed_job3 = self._createDummyJobWithStatus(Status.FAILED) - self.failed_job3.platform = dummy_platform - self.failed_job4 = self._createDummyJobWithStatus(Status.FAILED) - 
self.failed_job4.platform = dummy_platform - self.ready_job = self._createDummyJobWithStatus(Status.READY) - self.ready_job.platform = dummy_platform - self.ready_job2 = self._createDummyJobWithStatus(Status.READY) - self.ready_job2.platform = dummy_platform - self.ready_job3 = self._createDummyJobWithStatus(Status.READY) - self.ready_job3.platform = dummy_platform - - self.waiting_job = self._createDummyJobWithStatus(Status.WAITING) - self.waiting_job.platform = dummy_platform - self.waiting_job2 = self._createDummyJobWithStatus(Status.WAITING) - self.waiting_job2.platform = dummy_platform - - self.unknown_job = self._createDummyJobWithStatus(Status.UNKNOWN) - self.unknown_job.platform = dummy_platform - - - self.job_list._job_list = [self.completed_job, self.completed_job2, self.completed_job3, self.completed_job4, - self.submitted_job, self.submitted_job2, self.submitted_job3, self.running_job, - self.running_job2, self.queuing_job, self.failed_job, self.failed_job2, - self.failed_job3, self.failed_job4, self.ready_job, self.ready_job2, - self.ready_job3, self.waiting_job, self.waiting_job2, self.unknown_job] - self.waiting_job.parents.add(self.ready_job) - self.waiting_job.parents.add(self.completed_job) - self.waiting_job.parents.add(self.failed_job) - self.waiting_job.parents.add(self.submitted_job) - self.waiting_job.parents.add(self.running_job) - self.waiting_job.parents.add(self.queuing_job) - - def tearDown(self) -> None: - shutil.rmtree(self.temp_directory) - - def test_add_edge_job(self): - special_variables = dict() - special_variables["STATUS"] = Status.VALUE_TO_KEY[Status.COMPLETED] - special_variables["FROM_STEP"] = 0 - for p in self.waiting_job.parents: - self.waiting_job.add_edge_info(p, special_variables) - for parent in self.waiting_job.parents: - self.assertEqual(self.waiting_job.edge_info[special_variables["STATUS"]][parent.name], - (parent, special_variables.get("FROM_STEP", 0))) - - - def test_add_edge_info_joblist(self): - 
special_conditions = dict() - special_conditions["STATUS"] = Status.VALUE_TO_KEY[Status.COMPLETED] - special_conditions["FROM_STEP"] = 0 - self.job_list._add_edge_info(self.waiting_job, special_conditions["STATUS"]) - self.assertEqual(len(self.job_list.jobs_edges.get(Status.VALUE_TO_KEY[Status.COMPLETED],[])),1) - self.job_list._add_edge_info(self.waiting_job2, special_conditions["STATUS"]) - self.assertEqual(len(self.job_list.jobs_edges.get(Status.VALUE_TO_KEY[Status.COMPLETED],[])),2) - - def test_check_special_status(self): - self.waiting_job.edge_info = dict() - - self.job_list.jobs_edges = dict() - # Adds edge info for waiting_job in the list - self.job_list._add_edge_info(self.waiting_job, Status.VALUE_TO_KEY[Status.COMPLETED]) - self.job_list._add_edge_info(self.waiting_job, Status.VALUE_TO_KEY[Status.READY]) - self.job_list._add_edge_info(self.waiting_job, Status.VALUE_TO_KEY[Status.RUNNING]) - self.job_list._add_edge_info(self.waiting_job, Status.VALUE_TO_KEY[Status.SUBMITTED]) - self.job_list._add_edge_info(self.waiting_job, Status.VALUE_TO_KEY[Status.QUEUING]) - self.job_list._add_edge_info(self.waiting_job, Status.VALUE_TO_KEY[Status.FAILED]) - # Adds edge info for waiting_job - special_variables = dict() - for p in self.waiting_job.parents: - special_variables["STATUS"] = Status.VALUE_TO_KEY[p.status] - special_variables["FROM_STEP"] = 0 - self.waiting_job.add_edge_info(p,special_variables) - # call to special status - jobs_to_check = self.job_list.check_special_status() - for job in jobs_to_check: - tmp = [parent for parent in job.parents if - parent.status == Status.COMPLETED or parent in self.jobs_edges["ALL"]] - assert len(tmp) == len(job.parents) - self.waiting_job.add_parent(self.waiting_job2) - for job in jobs_to_check: - tmp = [parent for parent in job.parents if - parent.status == Status.COMPLETED or parent in self.jobs_edges["ALL"]] - assert len(tmp) == len(job.parents) - - - - def _createDummyJobWithStatus(self, status): - job_name = 
str(randrange(999999, 999999999)) - job_id = randrange(1, 999) - job = Job(job_name, job_id, status, 0) - job.type = randrange(0, 2) - return job - -class FakeBasicConfig: - def __init__(self): - pass - def props(self): - pr = {} - for name in dir(self): - value = getattr(self, name) - if not name.startswith('__') and not inspect.ismethod(value) and not inspect.isfunction(value): - pr[name] = value - return pr - DB_DIR = '/dummy/db/dir' - DB_FILE = '/dummy/db/file' - DB_PATH = '/dummy/db/path' - LOCAL_ROOT_DIR = '/dummy/local/root/dir' - LOCAL_TMP_DIR = '/dummy/local/temp/dir' - LOCAL_PROJ_DIR = '/dummy/local/proj/dir' - DEFAULT_PLATFORMS_CONF = '' - DEFAULT_JOBS_CONF = '' +from autosubmitconfigparser.config.basicconfig import BasicConfig + + +@pytest.fixture +def prepare_basic_config(tmpdir): + basic_conf = BasicConfig() + BasicConfig.DB_DIR = (tmpdir / "exp_root") + BasicConfig.DB_FILE = "debug.db" + BasicConfig.LOCAL_ROOT_DIR = (tmpdir / "exp_root") + BasicConfig.LOCAL_TMP_DIR = "tmp" + BasicConfig.LOCAL_ASLOG_DIR = "ASLOGS" + BasicConfig.LOCAL_PROJ_DIR = "proj" + BasicConfig.DEFAULT_PLATFORMS_CONF = "" + BasicConfig.CUSTOM_PLATFORMS_PATH = "" + BasicConfig.DEFAULT_JOBS_CONF = "" + BasicConfig.SMTP_SERVER = "" + BasicConfig.MAIL_FROM = "" + BasicConfig.ALLOWED_HOSTS = "" + BasicConfig.DENIED_HOSTS = "" + BasicConfig.CONFIG_FILE_FOUND = False + return basic_conf + + +@pytest.fixture(scope='function') +def setup_job_list(create_as_conf, tmpdir, mocker, prepare_basic_config): + experiment_id = 'random-id' + as_conf = create_as_conf + as_conf.experiment_data = dict() + as_conf.experiment_data["JOBS"] = dict() + as_conf.jobs_data = as_conf.experiment_data["JOBS"] + as_conf.experiment_data["PLATFORMS"] = dict() + job_list = JobList(experiment_id, prepare_basic_config, YAMLParserFactory(), + JobListPersistenceDb(tmpdir, 'db'), as_conf) + dummy_serial_platform = mocker.MagicMock() + dummy_serial_platform.name = 'serial' + dummy_platform = mocker.MagicMock() + 
dummy_platform.serial_platform = dummy_serial_platform + dummy_platform.name = 'dummy_platform' + + jobs = { + "completed": [create_dummy_job_with_status(Status.COMPLETED, dummy_platform) for _ in range(4)], + "submitted": [create_dummy_job_with_status(Status.SUBMITTED, dummy_platform) for _ in range(3)], + "running": [create_dummy_job_with_status(Status.RUNNING, dummy_platform) for _ in range(2)], + "queuing": [create_dummy_job_with_status(Status.QUEUING, dummy_platform)], + "failed": [create_dummy_job_with_status(Status.FAILED, dummy_platform) for _ in range(4)], + "ready": [create_dummy_job_with_status(Status.READY, dummy_platform) for _ in range(3)], + "waiting": [create_dummy_job_with_status(Status.WAITING, dummy_platform) for _ in range(2)], + "unknown": [create_dummy_job_with_status(Status.UNKNOWN, dummy_platform)] + } + + job_list._job_list = [job for job_list in jobs.values() for job in job_list] + waiting_job = jobs["waiting"][0] + waiting_job.parents.update(jobs["ready"] + jobs["completed"] + jobs["failed"] + jobs["submitted"] + jobs["running"] + jobs["queuing"]) + + yield job_list, waiting_job, jobs + shutil.rmtree(tmpdir) + + +def create_dummy_job_with_status(status, platform): + job_name = str(randrange(999999, 999999999)) + job_id = randrange(1, 999) + job = Job(job_name, job_id, status, 0) + job.type = randrange(0, 2) + job.platform = platform + return job + + +def test_add_edge_job(setup_job_list): + _, waiting_job, _ = setup_job_list + special_variables = {"STATUS": Status.VALUE_TO_KEY[Status.COMPLETED], "FROM_STEP": 0} + for p in waiting_job.parents: + waiting_job.add_edge_info(p, special_variables) + for parent in waiting_job.parents: + assert waiting_job.edge_info[special_variables["STATUS"]][parent.name] == (parent, special_variables.get("FROM_STEP", 0)) + + +def test_add_edge_info_joblist(setup_job_list): + job_list, waiting_job, jobs = setup_job_list + special_conditions = {"STATUS": Status.VALUE_TO_KEY[Status.COMPLETED], "FROM_STEP": 0} + 
job_list._add_edges_map_info(waiting_job, special_conditions["STATUS"]) + assert len(job_list.jobs_edges.get(Status.VALUE_TO_KEY[Status.COMPLETED], [])) == 1 + job_list._add_edges_map_info(jobs["waiting"][1], special_conditions["STATUS"]) + assert len(job_list.jobs_edges.get(Status.VALUE_TO_KEY[Status.COMPLETED], [])) == 2 + + +def test_check_special_status(setup_job_list): + job_list, _, jobs = setup_job_list + job_list.jobs_edges = dict() + job_a = jobs["completed"][0] + job_b = jobs["running"][0] + job_c = jobs["waiting"][0] + job_c.parents = set() + job_c.parents.add(job_a) + job_c.parents.add(job_b) + # C can start when A is completed and B is running + job_c.edge_info = {Status.VALUE_TO_KEY[Status.COMPLETED]: {job_a.name: (job_a, 0)}, Status.VALUE_TO_KEY[Status.RUNNING]: {job_b.name: (job_b, 0)}} + special_conditions = {"STATUS": Status.VALUE_TO_KEY[Status.RUNNING], "FROM_STEP": 0} + # Test: { A: COMPLETED, B: RUNNING } + job_list._add_edges_map_info(job_c, special_conditions["STATUS"]) + assert job_c in job_list.check_special_status() # This function should return the jobs that can start (they will be set to Status.READY in the update_list function) + # Test: { A: RUNNING, B: RUNNING }, A condition is default ( completed ) and B is running + job_a.status = Status.RUNNING + assert job_c not in job_list.check_special_status() + # Test: { A: RUNNING, B: RUNNING }, setting B and A condition to running + job_c.edge_info = {Status.VALUE_TO_KEY[Status.RUNNING]: {job_b.name: (job_b, 0), job_a.name: (job_a, 0)}} + assert job_c in job_list.check_special_status() + # Test: { A: COMPLETED, B: COMPLETED } # This should always work. + job_a.status = Status.COMPLETED + job_b.status = Status.COMPLETED + assert job_c in job_list.check_special_status() + # Test: { A: FAILED, B: COMPLETED } + job_a.status = Status.FAILED + job_b.status = Status.COMPLETED + # This may change in #1316 + assert job_c in job_list.check_special_status()