diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 4a3f85cf7e292550b1766c584c0f747f7312c3dd..3447e2ba7c70a58026c78e7217fcfb614798ceb2 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -2062,7 +2062,8 @@ class Autosubmit: job.platform_name = hpcarch # noinspection PyTypeChecker try: - job.platform = submitter.platforms[job.platform_name.upper()] + job.platform_name = as_conf.experiment_data["PLATFORMS"].get(job.platform_name.upper(), hpcarch) + job.platform = submitter.platforms[job.platform_name.upper()] except Exception as e: raise AutosubmitCritical( "hpcarch={0} not found in the platforms configuration file".format(job.platform_name), diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index ba4d704079cc82ffe5560d752feab7eb759da7ef..733d2274476534bd26e72ab7af9ec9b9cab70846 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -940,7 +940,7 @@ class Job(object): if special_conditions["STATUS"] not in self.edge_info: self.edge_info[special_conditions["STATUS"]] = {} - self.edge_info[special_conditions["STATUS"]][parent.name] = (parent,special_conditions.get("FROM_STEP", 0)) + self.edge_info[special_conditions["STATUS"]][parent.name] = (parent, special_conditions.get("FROM_STEP", 0), special_conditions.get("ANY_FINAL_STATUS_IS_VALID", False)) def delete_parent(self, parent): """ diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index d373fb6b2861aa081ba793c610e84de3a4114bc6..8f750a74aa8ce26dc73cd909daca0cd415d036d9 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -954,7 +954,6 @@ class JobList(object): :return: """ if special_conditions.get("STATUS", None): - if special_conditions.get("FROM_STEP", None): job.max_checkpoint_step = int(special_conditions.get("FROM_STEP", 0)) if int( special_conditions.get("FROM_STEP", 0)) > job.max_checkpoint_step else job.max_checkpoint_step @@ -987,6 +986,7 @@ class JobList(object): special_conditions = dict() special_conditions["STATUS"] = filters_to_apply_by_section[key].pop("STATUS", None) special_conditions["FROM_STEP"] = filters_to_apply_by_section[key].pop("FROM_STEP", None) + special_conditions["ANY_FINAL_STATUS_IS_VALID"] = filters_to_apply_by_section[key].pop("ANY_FINAL_STATUS_IS_VALID", False) for parent in list_of_parents: self.add_special_conditions(job, special_conditions, filters_to_apply_by_section[key], parent) @@ -1213,6 +1213,7 @@ class JobList(object): def get_filters_to_apply(self, job, dependency): filters_to_apply = self._filter_current_job(job, copy.deepcopy(dependency.relationships)) filters_to_apply.pop("STATUS", None) + filters_to_apply.pop("ANY_FINAL_STATUS_IS_VALID", None) # Don't do perform special filter if only "FROM_STEP" is applied if "FROM_STEP" in filters_to_apply: if filters_to_apply.get("CHUNKS_TO","none") == "none" and filters_to_apply.get("MEMBERS_TO","none") == "none" and filters_to_apply.get("DATES_TO","none") == "none" and filters_to_apply.get("SPLITS_TO","none") == "none": @@ -2628,18 +2629,18 @@ class JobList(object): :returns: jobs_to_check - Jobs that fulfill the special conditions. """ jobs_to_check = [] - jobs_to_skip = [] for target_status, sorted_job_list in self.jobs_edges.items(): if target_status == "ALL": continue for job in sorted_job_list: - if job.status != Status.WAITING: + if job.status != Status.WAITING: # Job already running continue if target_status in ["RUNNING", "FAILED"]: self._check_checkpoint(job) - non_completed_parents_current, completed_parents = self._count_parents_status(job, target_status) - if (len(non_completed_parents_current) + len(completed_parents)) == len(job.parents): - if job not in jobs_to_skip: + # Now ALL parents status of the job + relations_unsatisfated, relations_satisfated = self._check_relationship_is_ready(job) + if not relations_unsatisfated: + if not [parent for parent in job.parents if parent not in relations_satisfated and parent.status != Status.COMPLETED]: jobs_to_check.append(job) return jobs_to_check @@ -2654,26 +2655,46 @@ class JobList(object): job.get_checkpoint_files() @staticmethod - def _count_parents_status(job: Job, target_status: str) -> Tuple[List[Job], List[Job]]: + def _check_relationship_is_ready(job: Job) -> Tuple[List[Job], List[Job]]: """ - Count the number of completed and non-completed parents. + Check the correctness of the relationship between a job and its parents. - :param job: The job to check. - :param target_status: The target status to compare against. + Default: For a relation to be satisfated, the parent must be in COMPLETED status. + Special: For a relation to be satisfated, the parent must be equal or superior to target status. + + :param job: The job to check. job.edge_info: contains a tuple with (parent, checkpoint_number, optional) :return: A tuple containing two lists: - - non_completed_parents_current: Non-completed parents. - - completed_parents: Completed parents. - """ - non_completed_parents_current = [] - completed_parents = [parent for parent in job.parents if parent.status == Status.COMPLETED] - for parent in job.edge_info[target_status].values(): - if target_status in ["RUNNING", "FAILED"] and parent[1] and int(parent[1]) >= job.current_checkpoint_step: - continue - current_status = Status.VALUE_TO_KEY[parent[0].status] - if Status.LOGICAL_ORDER.index(current_status) >= Status.LOGICAL_ORDER.index(target_status): - if parent[0] not in completed_parents: - non_completed_parents_current.append(parent[0]) - return non_completed_parents_current, completed_parents + - relation_unsatisfated: Parent jobs that doesn't satisfate the relationship. + - relation_satisfated: Parent jobs that satisfate the relationship. + """ + relation_satisfated = [] + relation_unsatisfated = [] + for target_status_key in job.edge_info.keys(): + target_status = Status.KEY_TO_VALUE[target_status_key] + for parent_edge_info in job.edge_info[target_status_key].values(): + if target_status in [Status.FAILED, Status.RUNNING] and parent_edge_info[1] and int(parent_edge_info[1]) >= job.current_checkpoint_step: + continue + # If optional is True, the parent is not required to be completed, other final static status are allowed + if parent_edge_info[2]: # Optional == Any final status is allowed + if parent_edge_info[0].status in [Status.COMPLETED, Status.FAILED, Status.SKIPPED, Status.COMPLETED, target_status]: + relation_satisfated.append(parent_edge_info[0]) + else: + relation_unsatisfated.append(parent_edge_info[0]) + else: # not Optional + parent_status_key = Status.VALUE_TO_KEY[parent_edge_info[0].status] + if parent_edge_info[0].status in [Status.FAILED, Status.COMPLETED, Status.SKIPPED, Status.UNKNOWN]: + if (parent_edge_info[0].status == Status.SKIPPED and target_status in [Status.SKIPPED, Status.COMPLETED]) or parent_edge_info[0].status == target_status: + relation_satisfated.append(parent_edge_info[0]) + else: + relation_unsatisfated.append(parent_edge_info[0]) + elif Status.LOGICAL_ORDER.index(parent_status_key) >= Status.LOGICAL_ORDER.index(target_status_key): + relation_satisfated.append(parent_edge_info[0]) + elif Status.LOGICAL_ORDER.index(parent_status_key) < Status.LOGICAL_ORDER.index(target_status_key): + relation_unsatisfated.append(parent_edge_info[0]) + else: + relation_unsatisfated.append(parent_edge_info[0]) + + return relation_unsatisfated, relation_satisfated def update_log_status(self, job, as_conf): """ @@ -2831,12 +2852,7 @@ class JobList(object): for job in self.get_waiting(): tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED or parent.status == Status.SKIPPED] - tmp2 = [parent for parent in job.parents if - parent.status == Status.COMPLETED or parent.status == Status.SKIPPED or parent.status == Status.FAILED] - tmp3 = [parent for parent in job.parents if - parent.status == Status.SKIPPED or parent.status == Status.FAILED] - failed_ones = [parent for parent in job.parents if parent.status == Status.FAILED] - if job.parents is None or len(tmp) == len(job.parents): + if job.parents is None or (len(tmp) == len(job.parents) and not [parent for parent in job.parents if parent.name in job.edge_info.get("FAILED", {})]): # TODO pytest but this function needs a refactor before, 245 lines job.status = Status.READY job.packed = False job.hold = False @@ -2844,39 +2860,6 @@ class JobList(object): "Setting job: {0} status to: READY (all parents completed)...".format(job.name)) if as_conf.get_remote_dependencies() == "true": all_parents_completed.append(job.name) - if job.status != Status.READY: - if len(tmp3) != len(job.parents): - if len(tmp2) == len(job.parents): - strong_dependencies_failure = False - weak_dependencies_failure = False - for parent in failed_ones: - if parent.name in job.edge_info and job.edge_info[parent.name].get('optional', False): - weak_dependencies_failure = True - elif parent.section in job.dependencies: - if parent.status not in [Status.COMPLETED, Status.SKIPPED]: - strong_dependencies_failure = True - break - if not strong_dependencies_failure and weak_dependencies_failure: - job.status = Status.READY - job.packed = False - job.hold = False - Log.debug( - "Setting job: {0} status to: READY (conditional jobs are completed/failed)...".format( - job.name)) - break - if as_conf.get_remote_dependencies() == "true": - all_parents_completed.append(job.name) - else: - if len(tmp3) == 1 and len(job.parents) == 1: - for parent in job.parents: - if parent.name in job.edge_info and job.edge_info[parent.name].get('optional', False): - job.status = Status.READY - job.packed = False - job.hold = False - Log.debug( - "Setting job: {0} status to: READY (conditional jobs are completed/failed)...".format( - job.name)) - break if as_conf.get_remote_dependencies() == "true": for job in self.get_prepared(): tmp = [ diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 17bffecc51f61875b8318530c28a99fdf5aca532..1872fd7d9fd3bb584c0a8998799e53e2557a23ef 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -150,6 +150,7 @@ class JobPackageBase(object): if not job.check_script(configuration, parameters, show_logs=job.check_warnings): Log.warning( f'Script {job.name} has some empty variables. An empty value has substituted these variables') + else: Log.result("Script {0} OK", job.name) # looking for directives on jobs @@ -851,4 +852,3 @@ class JobPackageHorizontalVertical(JobPackageHybrid): jobs_scripts=self._jobs_scripts, dependency=self._job_dependency, jobs_resources=self._jobs_resources, expid=self._expid, rootdir=self.platform.root_dir, directives=self._custom_directives,threads=self._threads,method=self.method.lower(),partition=self.partition,wrapper_data=self,num_processors_value=self._num_processors) - diff --git a/autosubmit/monitor/monitor.py b/autosubmit/monitor/monitor.py index ccdfffb020747045194c93cb37efe6abe6c7961c..c9a4c090c340b62c82ebdc4afcf9554afd42ecaa 100644 --- a/autosubmit/monitor/monitor.py +++ b/autosubmit/monitor/monitor.py @@ -42,7 +42,7 @@ from autosubmit.monitor.diagram import create_stats_report from log.log import Log, AutosubmitCritical from autosubmitconfigparser.config.yamlparser import YAMLParserFactory -from typing import Dict, List +from typing import Dict, List, Optional, Tuple GENERAL_STATS_OPTION_MAX_LENGTH = 1000 @@ -278,42 +278,30 @@ class Monitor: Log.debug('Graph definition finalized') return graph - def _check_final_status(self, job, child): - # order of self._table - # the dictionary is composed by: + def _check_final_status(self, job: Job, child: Job) -> Tuple[Optional[str], Optional[str], bool]: + """ + Check the final status of a job in relation to its child. + + :param job: The job to check. + :type job: Job + :param child: The child job to check against. + :type child: Job + :return: A tuple containing: + - color: The color associated with the status. + - label: The label associated with the status. + - dashed: Whether the relation line should be dashed or straight + """ + color = None label = None + dashed = False if len(child.edge_info) > 0: - if job.name in child.edge_info.get("FAILED",{}): - color = self._table.get(Status.FAILED,None) - label = child.edge_info["FAILED"].get(job.name,0)[1] - elif job.name in child.edge_info.get("RUNNING",{}): - color = self._table.get(Status.RUNNING,None) - label = child.edge_info["RUNNING"].get(job.name,0)[1] - elif job.name in child.edge_info.get("QUEUING",{}): - color = self._table.get(Status.QUEUING,None) - elif job.name in child.edge_info.get("HELD",{}): - color = self._table.get(Status.HELD,None) - elif job.name in child.edge_info.get("DELAYED",{}): - color = self._table.get(Status.DELAYED,None) - elif job.name in child.edge_info.get("UNKNOWN",{}): - color = self._table.get(Status.UNKNOWN,None) - elif job.name in child.edge_info.get("SUSPENDED",{}): - color = self._table.get(Status.SUSPENDED,None) - elif job.name in child.edge_info.get("SKIPPED",{}): - color = self._table.get(Status.SKIPPED,None) - elif job.name in child.edge_info.get("WAITING",{}): - color = self._table.get(Status.WAITING,None) - elif job.name in child.edge_info.get("READY",{}): - color = self._table.get(Status.READY,None) - elif job.name in child.edge_info.get("SUBMITTED",{}): - color = self._table.get(Status.SUBMITTED,None) - else: - return None, None - if label and label == 0: - label = None - return color,label - else: - return None, None + for section, data in child.edge_info.items(): + parent_info = data.get(job.name, None) + if parent_info: + color = self._table[Status.KEY_TO_VALUE[section]] + label = parent_info[1] + dashed = parent_info[2] + return color, label, dashed def _add_children(self, job, exp, node_job, groups, hide_groups): if job in self.nodes_plotted: @@ -323,17 +311,21 @@ class Monitor: for child in sorted(job.children, key=lambda k: NaturalSort(k.name)): node_child, skip = self._check_node_exists( exp, child, groups, hide_groups) - color, label = self._check_final_status(job, child) + color, label, optional = self._check_final_status(job, child) if len(node_child) == 0 and not skip: node_child = self._create_node(child, groups, hide_groups) if node_child: exp.add_node(node_child) + if optional: + style = "dashed" + else: + style = "solid" if color: # label = None doesn't disable label, instead it sets it to nothing and complain about invalid syntax if label: - exp.add_edge(pydotplus.Edge(node_job, node_child, style="dashed", color=color, label=label)) + exp.add_edge(pydotplus.Edge(node_job, node_child, style=style, color=color, label=label)) else: - exp.add_edge(pydotplus.Edge(node_job, node_child,style="dashed",color=color)) + exp.add_edge(pydotplus.Edge(node_job, node_child, style=style, color=color)) else: exp.add_edge(pydotplus.Edge(node_job, node_child)) else: diff --git a/setup.py b/setup.py index 230582c2093da57603b4262bbd533f62d9120c70..edd3de33d8ca95c186c246903a77b1d33151f375 100644 --- a/setup.py +++ b/setup.py @@ -44,7 +44,7 @@ install_requires = [ 'py3dotplus==1.1.0', 'numpy<2', 'rocrate==0.*', - 'autosubmitconfigparser==1.0.73', + 'autosubmitconfigparser==1.0.74', 'configparser', 'setproctitle', 'invoke>=2.0', diff --git a/test/unit/test_checkpoints.py b/test/unit/test_checkpoints.py index 25a7032dc9bb985a66404ec958f508e709bcaf31..05c83c48bd0899ee3c260e13047e7d8fd0c3c313 100644 --- a/test/unit/test_checkpoints.py +++ b/test/unit/test_checkpoints.py @@ -75,11 +75,11 @@ def create_dummy_job_with_status(status, platform): def test_add_edge_job(setup_job_list): _, waiting_job, _ = setup_job_list - special_variables = {"STATUS": Status.VALUE_TO_KEY[Status.COMPLETED], "FROM_STEP": 0} + special_variables = {"STATUS": Status.VALUE_TO_KEY[Status.COMPLETED], "FROM_STEP": 0, "ANY_FINAL_STATUS_IS_VALID": False} for p in waiting_job.parents: waiting_job.add_edge_info(p, special_variables) for parent in waiting_job.parents: - assert waiting_job.edge_info[special_variables["STATUS"]][parent.name] == (parent, special_variables.get("FROM_STEP", 0)) + assert waiting_job.edge_info[special_variables["STATUS"]][parent.name] == (parent, special_variables.get("FROM_STEP", 0), False) def test_add_edge_info_joblist(setup_job_list): @@ -92,32 +92,78 @@ def test_add_edge_info_joblist(setup_job_list): def test_check_special_status(setup_job_list): + # Init job_list, _, jobs = setup_job_list job_list.jobs_edges = dict() job_a = jobs["completed"][0] job_b = jobs["running"][0] job_c = jobs["waiting"][0] + job_d = jobs["failed"][0] job_c.parents = set() job_c.parents.add(job_a) job_c.parents.add(job_b) + # C can start when A is completed and B is running - job_c.edge_info = {Status.VALUE_TO_KEY[Status.COMPLETED]: {job_a.name: (job_a, 0)}, Status.VALUE_TO_KEY[Status.RUNNING]: {job_b.name: (job_b, 0)}} - special_conditions = {"STATUS": Status.VALUE_TO_KEY[Status.RUNNING], "FROM_STEP": 0} + job_c.edge_info = {Status.VALUE_TO_KEY[Status.COMPLETED]: {job_a.name: (job_a, 0, False)}, Status.VALUE_TO_KEY[Status.RUNNING]: {job_b.name: (job_b, 0, False)}} + special_conditions = {"STATUS": Status.VALUE_TO_KEY[Status.RUNNING], "FROM_STEP": 0, "ANY_FINAL_STATUS_IS_VALID": False} + # Test: { A: COMPLETED, B: RUNNING } job_list._add_edges_map_info(job_c, special_conditions["STATUS"]) assert job_c in job_list.check_special_status() # This function should return the jobs that can start ( they will be put in Status.ready in the update_list funtion ) + # Test: { A: RUNNING, B: RUNNING }, A condition is default ( completed ) and B is running job_a.status = Status.RUNNING assert job_c not in job_list.check_special_status() + # Test: { A: RUNNING, B: RUNNING }, setting B and A condition to running - job_c.edge_info = {Status.VALUE_TO_KEY[Status.RUNNING]: {job_b.name: (job_b, 0), job_a.name: (job_a, 0)}} + job_c.edge_info = {Status.VALUE_TO_KEY[Status.RUNNING]: {job_b.name: (job_b, 0, False), job_a.name: (job_a, 0, False)}} assert job_c in job_list.check_special_status() + # Test: { A: COMPLETED, B: COMPLETED } # This should always work. + job_c.edge_info = {Status.VALUE_TO_KEY[Status.COMPLETED]: {job_b.name: (job_b, 0, False), job_a.name: (job_a, 0, False)}} job_a.status = Status.COMPLETED job_b.status = Status.COMPLETED assert job_c in job_list.check_special_status() + # Test: { A: FAILED, B: COMPLETED } + job_c.edge_info = {Status.VALUE_TO_KEY[Status.COMPLETED]: {job_b.name: (job_b, 0, False)}, Status.VALUE_TO_KEY[Status.FAILED]: {job_a.name: (job_a, 0, False)}} job_a.status = Status.FAILED job_b.status = Status.COMPLETED # This may change in #1316 assert job_c in job_list.check_special_status() + + # Test: { A: FAILED, B: COMPLETED } but job_c is already running + job_c.status = Status.RUNNING + assert job_c not in job_list.check_special_status() + + # Testing now with a job that has no special_status + job_c.parents.add(job_d) + job_c.status = Status.WAITING + # Test: { A: RUNNING, B: RUNNING }, A is RUNNING, B is RUNNING D is COMPLETED + job_c.edge_info = {Status.VALUE_TO_KEY[Status.RUNNING]: {job_b.name: (job_b, 0, False), job_a.name: (job_a, 0, False)}} + job_a.status = Status.RUNNING + job_b.status = Status.RUNNING + job_d.status = Status.COMPLETED + assert job_c in job_list.check_special_status() + + # Test: { A: RUNNING, B: RUNNING }, A is RUNNING, B is RUNNING D is FAILED + job_c.edge_info = {Status.VALUE_TO_KEY[Status.RUNNING]: {job_b.name: (job_b, 0, False), job_a.name: (job_a, 0, False)}} + job_a.status = Status.RUNNING + job_b.status = Status.RUNNING + job_d.status = Status.FAILED + assert job_c not in job_list.check_special_status() + + # Test: { A: RUNNING, B: RUNNING }, A is WAITING, B is WAITING D is COMPLETED + job_c.edge_info = {Status.VALUE_TO_KEY[Status.RUNNING]: {job_b.name: (job_b, 0, False), job_a.name: (job_a, 0, False)}} + job_a.status = Status.WAITING + job_b.status = Status.RUNNING + job_d.status = Status.COMPLETED + assert job_c not in job_list.check_special_status() + + # Test: { A: RUNNING, B: RUNNING }, A is WAITING, B is WAITING D is COMPLETED + job_c.edge_info = {Status.VALUE_TO_KEY[Status.RUNNING]: {job_b.name: (job_b, 0, False), job_a.name: (job_a, 0, False)}} + job_a.status = Status.WAITING + job_b.status = Status.WAITING + job_d.status = Status.FAILED + job_c.parents.add(job_d) + assert job_c not in job_list.check_special_status() diff --git a/test/unit/test_job_list-pytest.py b/test/unit/test_job_list-pytest.py deleted file mode 100644 index 92b06b5c83cdd66ebb186e331cd5d64727f9f9cc..0000000000000000000000000000000000000000 --- a/test/unit/test_job_list-pytest.py +++ /dev/null @@ -1,81 +0,0 @@ -import pytest - -from autosubmit.job.job_common import Status -from autosubmit.job.job_list_persistence import JobListPersistencePkl -from autosubmit.job.job_list import JobList -from autosubmitconfigparser.config.basicconfig import BasicConfig -from autosubmitconfigparser.config.yamlparser import YAMLParserFactory -from autosubmit.job.job import Job - - -@pytest.fixture -def prepare_basic_config(tmpdir): - basic_conf = BasicConfig() - BasicConfig.DB_DIR = (tmpdir / "exp_root") - BasicConfig.DB_FILE = "debug.db" - BasicConfig.LOCAL_ROOT_DIR = (tmpdir / "exp_root") - BasicConfig.LOCAL_TMP_DIR = "tmp" - BasicConfig.LOCAL_ASLOG_DIR = "ASLOGS" - BasicConfig.LOCAL_PROJ_DIR = "proj" - BasicConfig.DEFAULT_PLATFORMS_CONF = "" - BasicConfig.CUSTOM_PLATFORMS_PATH = "" - BasicConfig.DEFAULT_JOBS_CONF = "" - BasicConfig.SMTP_SERVER = "" - BasicConfig.MAIL_FROM = "" - BasicConfig.ALLOWED_HOSTS = "" - BasicConfig.DENIED_HOSTS = "" - BasicConfig.CONFIG_FILE_FOUND = False - return basic_conf - - -@pytest.fixture(scope='function') -def setup_job_list(autosubmit_config, tmpdir, mocker, prepare_basic_config): - experiment_id = 'random-id' - as_conf = autosubmit_config - as_conf.experiment_data = dict() - as_conf.experiment_data["JOBS"] = dict() - as_conf.jobs_data = as_conf.experiment_data["JOBS"] - as_conf.experiment_data["PLATFORMS"] = dict() - job_list = JobList(experiment_id, prepare_basic_config, YAMLParserFactory(), - JobListPersistencePkl(), as_conf) - dummy_serial_platform = mocker.MagicMock() - dummy_serial_platform.name = 'serial' - dummy_platform = mocker.MagicMock() - dummy_platform.serial_platform = dummy_serial_platform - dummy_platform.name = 'dummy_platform' - - job_list._platforms = [dummy_platform] - # add some jobs to the job list - job = Job("job1", "1", Status.COMPLETED, 0) - job.section = "SECTION1" - job_list._job_list.append(job) - job = Job("job2", "2", Status.WAITING, 0) - job.section = "SECTION1" - job_list._job_list.append(job) - job = Job("job3", "3", Status.COMPLETED, 0) - job.section = "SECTION2" - job_list._job_list.append(job) - return job_list - - -@pytest.mark.parametrize( - "section_list, banned_jobs, get_only_non_completed, expected_length, expected_section", - [ - (["SECTION1"], [], False, 2, "SECTION1"), - (["SECTION2"], [], False, 1, "SECTION2"), - (["SECTION1"], [], True, 1, "SECTION1"), - (["SECTION2"], [], True, 0, "SECTION2"), - (["SECTION1"], ["job1"], True, 1, "SECTION1"), - ], - ids=[ - "all_jobs_in_section1", - "all_jobs_in_section2", - "non_completed_jobs_in_section1", - "non_completed_jobs_in_section2", - "ban_job1" - ] -) -def test_get_jobs_by_section(setup_job_list, section_list, banned_jobs, get_only_non_completed, expected_length, expected_section): - result = setup_job_list.get_jobs_by_section(section_list, banned_jobs, get_only_non_completed) - assert len(result) == expected_length - assert all(job.section == expected_section for job in result) diff --git a/test/unit/test_job_list_pytest.py b/test/unit/test_job_list_pytest.py new file mode 100644 index 0000000000000000000000000000000000000000..68e489ad062b860a1a053e1a39d2ef92cc7234d3 --- /dev/null +++ b/test/unit/test_job_list_pytest.py @@ -0,0 +1,137 @@ +import pytest + +from autosubmit.job.job_common import Status +from autosubmit.job.job_list_persistence import JobListPersistencePkl +from autosubmit.job.job_list import JobList +from autosubmitconfigparser.config.basicconfig import BasicConfig +from autosubmitconfigparser.config.yamlparser import YAMLParserFactory +from autosubmit.job.job import Job + + +@pytest.fixture +def prepare_basic_config(tmpdir): + basic_conf = BasicConfig() + BasicConfig.DB_DIR = (tmpdir / "exp_root") + BasicConfig.DB_FILE = "debug.db" + BasicConfig.LOCAL_ROOT_DIR = (tmpdir / "exp_root") + BasicConfig.LOCAL_TMP_DIR = "tmp" + BasicConfig.LOCAL_ASLOG_DIR = "ASLOGS" + BasicConfig.LOCAL_PROJ_DIR = "proj" + BasicConfig.DEFAULT_PLATFORMS_CONF = "" + BasicConfig.CUSTOM_PLATFORMS_PATH = "" + BasicConfig.DEFAULT_JOBS_CONF = "" + BasicConfig.SMTP_SERVER = "" + BasicConfig.MAIL_FROM = "" + BasicConfig.ALLOWED_HOSTS = "" + BasicConfig.DENIED_HOSTS = "" + BasicConfig.CONFIG_FILE_FOUND = False + return basic_conf + + +@pytest.fixture(scope='function') +def setup_job_list(autosubmit_config, tmpdir, mocker, prepare_basic_config): + experiment_id = 'random-id' + as_conf = autosubmit_config + as_conf.experiment_data = dict() + as_conf.experiment_data["JOBS"] = dict() + as_conf.jobs_data = as_conf.experiment_data["JOBS"] + as_conf.experiment_data["PLATFORMS"] = dict() + job_list = JobList(experiment_id, prepare_basic_config, YAMLParserFactory(), + JobListPersistencePkl(), as_conf) + dummy_serial_platform = mocker.MagicMock() + dummy_serial_platform.name = 'serial' + dummy_platform = mocker.MagicMock() + dummy_platform.serial_platform = dummy_serial_platform + dummy_platform.name = 'dummy_platform' + + job_list._platforms = [dummy_platform] + # add some jobs to the job list + job = Job("job1", "1", Status.COMPLETED, 0) + job.section = "SECTION1" + job_list._job_list.append(job) + job = Job("job2", "2", Status.WAITING, 0) + job.section = "SECTION1" + job_list._job_list.append(job) + job = Job("job3", "3", Status.COMPLETED, 0) + job.section = "SECTION2" + job_list._job_list.append(job) + return job_list + + +@pytest.mark.parametrize( + "section_list, banned_jobs, get_only_non_completed, expected_length, expected_section", + [ + (["SECTION1"], [], False, 2, "SECTION1"), + (["SECTION2"], [], False, 1, "SECTION2"), + (["SECTION1"], [], True, 1, "SECTION1"), + (["SECTION2"], [], True, 0, "SECTION2"), + (["SECTION1"], ["job1"], True, 1, "SECTION1"), + ], + ids=[ + "all_jobs_in_section1", + "all_jobs_in_section2", + "non_completed_jobs_in_section1", + "non_completed_jobs_in_section2", + "ban_job1" + ] +) +def test_get_jobs_by_section(setup_job_list, section_list, banned_jobs, get_only_non_completed, expected_length, expected_section): + result = setup_job_list.get_jobs_by_section(section_list, banned_jobs, get_only_non_completed) + assert len(result) == expected_length + assert all(job.section == expected_section for job in result) + + +@pytest.mark.parametrize( + "job_status, parent_status, target_status_key, expected_unsatisfated, expected_satisfated", + [ + (Status.WAITING, Status.COMPLETED, "COMPLETED", [], ["parent_job", "parent_job_any_final_status_is_valid"]), + (Status.WAITING, Status.FAILED, "COMPLETED", ["parent_job"], ["parent_job_any_final_status_is_valid"]), + (Status.WAITING, Status.RUNNING, "RUNNING", [], ["parent_job", "parent_job_any_final_status_is_valid"]), + (Status.WAITING, Status.RUNNING, "COMPLETED", ["parent_job", "parent_job_any_final_status_is_valid"], []), + (Status.WAITING, Status.SKIPPED, "COMPLETED", [], ["parent_job", "parent_job_any_final_status_is_valid"]), + (Status.RUNNING, Status.COMPLETED, "COMPLETED", [], ["parent_job", "parent_job_any_final_status_is_valid"]), + (Status.RUNNING, Status.FAILED, "COMPLETED", ["parent_job"], ["parent_job_any_final_status_is_valid"]), + (Status.RUNNING, Status.RUNNING, "RUNNING", [], ["parent_job", "parent_job_any_final_status_is_valid"]), + (Status.RUNNING, Status.RUNNING, "COMPLETED", ["parent_job", "parent_job_any_final_status_is_valid"], []), + (Status.RUNNING, Status.SKIPPED, "COMPLETED", [], ["parent_job", "parent_job_any_final_status_is_valid"]), + (Status.SKIPPED, Status.COMPLETED, "COMPLETED", [], ["parent_job", "parent_job_any_final_status_is_valid"]), + (Status.SKIPPED, Status.FAILED, "COMPLETED", ["parent_job"], ["parent_job_any_final_status_is_valid"]), + (Status.SKIPPED, Status.SKIPPED, "COMPLETED", [], ["parent_job", "parent_job_any_final_status_is_valid"]), + (Status.FAILED, Status.COMPLETED, "COMPLETED", [], ["parent_job", "parent_job_any_final_status_is_valid"]), + (Status.FAILED, Status.FAILED, "COMPLETED", ["parent_job"], ["parent_job_any_final_status_is_valid"]), + (Status.FAILED, Status.SKIPPED, "COMPLETED", [], ["parent_job", "parent_job_any_final_status_is_valid"]), + ], + ids=[ + "JOB: WAITING <- Parent: COMPLETED | target: COMPLETED", + "JOB: WAITING <- Parent: FAILED | target: COMPLETED", + "JOB: WAITING <- Parent: RUNNING | target: RUNNING", + "JOB: WAITING <- Parent: RUNNING | target: COMPLETED", + "JOB: WAITING <- Parent: SKIPPED | target: COMPLETED", + "JOB: RUNNING <- Parent: COMPLETED | target: COMPLETED", + "JOB: RUNNING <- Parent: FAILED | target: COMPLETED", + "JOB: RUNNING <- Parent: RUNNING | target: RUNNING", + "JOB: RUNNING <- Parent: RUNNING | target: COMPLETED", + "JOB: RUNNING <- Parent: SKIPPED | target: COMPLETED", + "JOB: SKIPPED <- Parent: COMPLETED | target: COMPLETED", + "JOB: SKIPPED <- Parent: FAILED | target: COMPLETED", + "JOB: SKIPPED <- Parent: SKIPPED | target: COMPLETED", + "JOB: FAILED <- Parent: COMPLETED | target: COMPLETED", + "JOB: FAILED <- Parent: FAILED | target: COMPLETED", + "JOB: FAILED <- Parent: SKIPPED | target: COMPLETED", + ] +) +def test_check_relationship_is_ready(job_status, parent_status, target_status_key, expected_unsatisfated, expected_satisfated): + parent_job = Job("parent_job", "1", parent_status, 0) + parent_job_any_final_status_is_valid = Job("parent_job_any_final_status_is_valid", "2", parent_status, 0) + job = Job("job", "3", job_status, 0) + job.edge_info = { + target_status_key: { + "parent_job": (parent_job, None, False), + "parent_job_any_final_status_is_valid": (parent_job_any_final_status_is_valid, None, True) + } + } + + unsatisfated, satisfated = JobList._check_relationship_is_ready(job) + + assert [j.name for j in unsatisfated] == expected_unsatisfated + assert [j.name for j in satisfated] == expected_satisfated diff --git a/test/unit/test_monitor.py b/test/unit/test_monitor.py new file mode 100644 index 0000000000000000000000000000000000000000..9795d561d936e10728460b6022e5f0919719e252 --- /dev/null +++ b/test/unit/test_monitor.py @@ -0,0 +1,27 @@ +from autosubmit.job.job_common import Status +from autosubmit.job.job import Job +from autosubmit.monitor.monitor import Monitor + + +def test_check_final_status(): + # Create mock jobs + parent_job = Job("parent_job", "1", Status.COMPLETED, 0) + child_job = Job("child_job", "2", Status.WAITING, 0) + + # Set up edge_info for the child job + child_job.edge_info = { + "COMPLETED": { + "parent_job": (parent_job, 0, True) + } + } + + # Create an instance of the class containing _check_final_status + monitor_instance = Monitor() # Replace with the actual class name if different + + # Call the method + color, label, dashed = monitor_instance._check_final_status(parent_job, child_job) + + # Assert the expected values + assert color == monitor_instance._table[Status.KEY_TO_VALUE["COMPLETED"]] + assert label == 0 + assert dashed is True