diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 247eb302b25f0e7b1fef113dd4adc1ccb3fe4855..f3de7d3fb451ba8d74bee5d143ac1915d3937dcf 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -2319,19 +2319,34 @@ class JobList(object): if job.name == name: return job - def get_jobs_by_section(self, section_list): + def get_jobs_by_section(self, section_list: list, banned_jobs: list = None, + get_only_non_completed: bool = False) -> list: """ - Returns the job that its name matches parameter section - :parameter section_list: list of sections to look for - :type section_list: list - :return: found job - :rtype: job + Get jobs by section. + + This method filters jobs based on the provided section list and banned jobs list. + It can also filter out completed jobs if specified. + + Parameters: + section_list (list): List of sections to filter jobs by. + banned_jobs (list, optional): List of jobs names to exclude from the result. Defaults to an empty list. + get_only_non_completed (bool, optional): If True, only non-completed jobs are included. Defaults to False. + + Returns: + list: List of jobs that match the criteria. """ - jobs_by_section = list() + if banned_jobs is None: + banned_jobs = [] + + jobs = [] for job in self._job_list: - if job.section in section_list: - jobs_by_section.append(job) - return jobs_by_section + if job.section.upper() in section_list and job.name not in banned_jobs: + if get_only_non_completed: + if job.status != Status.COMPLETED: + jobs.append(job) + else: + jobs.append(job) + return jobs def get_in_queue_grouped_id(self, platform): # type: (object) -> Dict[int, List[Job]] diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 16e78bb87fde4afab430663df3784186b3555bf8..50ffc81c1f56f80f2645f48d23c1fa4c3f41d948 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -136,20 +136,20 @@ class JobPackager(object): wrapper_limits = {'min': 1, 'max': 9999999, 'min_v': 1, 'max_v': 9999999, 'min_h': 1, 'max_h': 9999999, 'max_by_section': dict()} # Calculate the min and max based in the wrapper_section wrappers: min_wrapped:2, max_wrapped: 2 { wrapper_section: {min_wrapped: 6, max_wrapped: 6} } - wrapper_data = self._as_config.experiment_data.get("WRAPPERS",{}) - current_wrapper_data = wrapper_data.get(self.current_wrapper_section,{}) + wrapper_data = self._as_config.experiment_data.get("WRAPPERS", {}) + current_wrapper_data = wrapper_data.get(self.current_wrapper_section, {}) if len(self._jobs_list.jobs_to_run_first) == 0: - wrapper_limits['min'] = int(current_wrapper_data.get("MIN_WRAPPED", wrapper_data.get("MIN_WRAPPED", 2))) - wrapper_limits['max'] = int(current_wrapper_data.get("MAX_WRAPPED", wrapper_data.get("MAX_WRAPPED", 9999999))) - wrapper_limits['min_v'] = int(current_wrapper_data.get("MIN_WRAPPED_V", wrapper_data.get("MIN_WRAPPED_V", 1))) - wrapper_limits['max_v'] = int(current_wrapper_data.get("MAX_WRAPPED_V", wrapper_data.get("MAX_WRAPPED_V", 1))) - wrapper_limits['min_h'] = int(current_wrapper_data.get("MIN_WRAPPED_H", wrapper_data.get("MIN_WRAPPED_H", 1))) - wrapper_limits['max_h'] = int(current_wrapper_data.get("MAX_WRAPPED_H", wrapper_data.get("MAX_WRAPPED_H", 1))) + wrapper_limits['min'] = int(current_wrapper_data.get("MIN_WRAPPED", wrapper_data.get("MIN_WRAPPED", 1))) + wrapper_limits['max'] = int(current_wrapper_data.get("MAX_WRAPPED", wrapper_data.get("MAX_WRAPPED", 9999999))) + wrapper_limits['min_v'] = int(current_wrapper_data.get("MIN_WRAPPED_V", wrapper_data.get("MIN_WRAPPED_V", 1))) + wrapper_limits['max_v'] = int(current_wrapper_data.get("MAX_WRAPPED_V", wrapper_data.get("MAX_WRAPPED_V", 1))) + wrapper_limits['min_h'] = int(current_wrapper_data.get("MIN_WRAPPED_H", wrapper_data.get("MIN_WRAPPED_H", 1))) + wrapper_limits['max_h'] = int(current_wrapper_data.get("MAX_WRAPPED_H", wrapper_data.get("MAX_WRAPPED_H", 1))) # Max and min calculations if wrapper_limits['max'] < wrapper_limits['max_v'] * wrapper_limits['max_h']: wrapper_limits['max'] = wrapper_limits['max_v'] * wrapper_limits['max_h'] if wrapper_limits['min'] < wrapper_limits['min_v'] * wrapper_limits['min_h']: - wrapper_limits['min'] = max(wrapper_limits['min_v'],wrapper_limits['min_h']) + wrapper_limits['min'] = max(wrapper_limits['min_v'], wrapper_limits['min_h']) # if one dimensional wrapper or value is the default if wrapper_limits['max_v'] == 1 or current_wrapper_data.get("TYPE", "") == "vertical": wrapper_limits['max_v'] = wrapper_limits['max'] @@ -163,14 +163,13 @@ class JobPackager(object): if wrapper_limits['min_h'] == 1 and current_wrapper_data.get("TYPE", "") == "horizontal": wrapper_limits['min_h'] = wrapper_limits['min'] - # Calculate the max by section by looking at jobs_data[section].max_wrapped for section in section_list: - wrapper_limits['max_by_section'][section] = self._as_config.jobs_data.get(section,{}).get("MAX_WRAPPED",wrapper_limits['max']) + wrapper_limits['max_by_section'][section] = self._as_config.jobs_data.get(section,{}).get("MAX_WRAPPED", wrapper_limits['max']) + wrapper_limits['real_min'] = max(2, wrapper_limits['min']) return wrapper_limits - def check_jobs_to_run_first(self, package): """ Check if the jobs to run first are in the package @@ -193,7 +192,7 @@ class JobPackager(object): package.jobs_lists[seq].remove(job) return package, run_first - def check_real_package_wrapper_limits(self,package): + def check_real_package_wrapper_limits(self, package): balanced = True if self.wrapper_type[self.current_wrapper_section] == 'vertical-horizontal': i = 0 @@ -226,7 +225,7 @@ class JobPackager(object): min_h = len(package.jobs) return min_v, min_h, balanced - def check_packages_respect_wrapper_policy(self,built_packages_tmp,packages_to_submit,max_jobs_to_submit,wrapper_limits, any_simple_packages = False): + def check_packages_respect_wrapper_policy(self, built_packages_tmp, packages_to_submit, max_jobs_to_submit, wrapper_limits, any_simple_packages = False): """ Check if the packages respect the wrapper policy and act in base of it ( submit wrapper, submit sequential, wait for more jobs to form a wrapper) :param built_packages_tmp: List of packages to be submitted @@ -266,69 +265,176 @@ class JobPackager(object): break min_v, min_h, balanced = self.check_real_package_wrapper_limits(p) # if the quantity is enough, make the wrapper - if len(p.jobs) >= wrapper_limits["min"] and min_v >= wrapper_limits["min_v"] and min_h >= wrapper_limits["min_h"] and not failed_innerjobs: + if len(p.jobs) >= wrapper_limits["real_min"] and min_v >= wrapper_limits["min_v"] and min_h >= wrapper_limits["min_h"] and not failed_innerjobs: for job in p.jobs: job.packed = True packages_to_submit.append(p) max_jobs_to_submit = max_jobs_to_submit - 1 else: + for job in p.jobs: + job.packed = False not_wrappeable_package_info.append([p, min_v, min_h, balanced]) + # It is a deadlock when: # 1. There are no more non-wrapped jobs in ready status # 2. And there are no more jobs in the queue ( submitted, queuing, running, held ) - # 3. And all current packages are not wrappable. - if not any_simple_packages and len(self._jobs_list.get_in_queue()) == 0 and len(not_wrappeable_package_info) == len(built_packages_tmp): - for p, min_v, min_h, balanced in not_wrappeable_package_info: + # 3. And all current packages are not wrappable but not if there are no more jobs to wrap. + if self.is_deadlock(any_simple_packages, not_wrappeable_package_info, built_packages_tmp): + max_jobs_to_submit = self.process_not_wrappeable_packages(not_wrappeable_package_info, packages_to_submit, + max_jobs_to_submit, wrapper_limits) + return packages_to_submit, max_jobs_to_submit + + def is_deadlock(self, any_simple_packages: bool, not_wrappeable_package_info: list, built_packages_tmp: list) -> bool: + """ + Check if the current state is a deadlock. + + :param any_simple_packages: Flag indicating if there are any simple packages. + :param not_wrappeable_package_info: List of not wrappable package information. + :param built_packages_tmp: List of built packages. + :return: True if it is a deadlock, False otherwise. + """ + return ( + not any_simple_packages + and len(self._jobs_list.get_in_queue()) == 0 + and len(not_wrappeable_package_info) == len(built_packages_tmp) + ) + + def submit_remaining_jobs(self, p: JobPackageBase, packages_to_submit: list, max_jobs_to_submit: int) -> int: + """ + Submit the remaining jobs because there are not enough jobs of this section remaining to form a wrapper. + + :param p: The package to be submitted. + :param packages_to_submit: List of packages to be submitted. + :param max_jobs_to_submit: Maximum number of jobs to submit. + :return: Updated maximum number of jobs to submit. + """ + Log.warning("There are no more jobs of this section to form a wrapper, submitting the remaining jobs") + if len(p.jobs) == 1: + p.jobs[0].packed = False + packages_to_submit.append(JobPackageSimple([p.jobs[0]])) + else: + packages_to_submit.append(p) + return max_jobs_to_submit - 1 + + def handle_strict_policy(self, p: JobPackageBase, err_message: str) -> None: + """ + Handle the strict policy case by filling self.wrappers_with_error with the error message. + :param p: The package to be processed. + :param err_message: The error message to be raised. + """ + job_names = ','.join([job.name for job in p.jobs]) + self.wrappers_with_error[job_names] = err_message + + def handle_mixed_policy(self, p: JobPackageBase, packages_to_submit: list, max_jobs_to_submit: int, err_message) -> int: + """ + Handle the mixed policy case by submitting failed jobs sequentially or/and filling self.wrappers_with_error with the error message. + + :param p: The package to be processed. + :param packages_to_submit: List of packages to be submitted. + :param max_jobs_to_submit: Maximum number of jobs to submit. + :return: Updated maximum number of jobs to submit. + """ + error = True + for job in p.jobs: + if max_jobs_to_submit == 0: + break + if job.fail_count > 0 and job.status == Status.READY: + job.packed = False + Log.printlog("Wrapper policy is set to mixed, there is a failed job that will be sent sequential") + error = False + package = JobPackageSimpleWrapped( + [job]) if job.type == Type.PYTHON and not self._platform.allow_python_jobs else JobPackageSimple( + [job]) + packages_to_submit.append(package) + max_jobs_to_submit -= 1 + if error: + job_names = ','.join([job.name for job in p.jobs]) + self.wrappers_with_error[job_names] = err_message + return max_jobs_to_submit + + def handle_flexible_policy(self, p: JobPackageBase, packages_to_submit: list, max_jobs_to_submit: int, err_message: str) -> int: + """ + Handle the flexible policy case by submitting jobs sequentially. + + :param p: The package to be processed. + :param packages_to_submit: List of packages to be submitted. + :param max_jobs_to_submit: Maximum number of jobs to submit. + :param err_message: The error message to be raised. + :return: Updated maximum number of jobs to submit. + """ + Log.warning(err_message) + Log.warning( + "Wrapper policy is set to flexible and there is a deadlock, Autosubmit will submit the jobs sequentially") + for job in p.jobs: + if max_jobs_to_submit == 0: + break + job.packed = False + if job.status == Status.READY: + package = JobPackageSimpleWrapped( + [job]) if job.type == Type.PYTHON and not self._platform.allow_python_jobs else JobPackageSimple( + [job]) + packages_to_submit.append(package) + max_jobs_to_submit -= 1 + return max_jobs_to_submit + + def process_not_wrappeable_packages(self, not_wrappeable_package_info: list, packages_to_submit: list, + max_jobs_to_submit: int, wrapper_limits: dict): + """ + Process the not wrappable packages based on the policy. + + :param not_wrappeable_package_info: List of not wrappable package information. + :param packages_to_submit: List of packages to be submitted. + :param max_jobs_to_submit: Maximum number of jobs to submit. + :param wrapper_limits: Dictionary with wrapper limits. + :return: Updated maximum number of jobs to submit. + """ + for p, min_v, min_h, balanced in not_wrappeable_package_info: + err_message = self.error_message_policy(min_h, min_v, wrapper_limits, balanced, p.jobs) + if not self._jobs_list.get_jobs_by_section(self.jobs_in_wrapper[self.current_wrapper_section], [job.name for job in p.jobs], + True): + max_jobs_to_submit = self.submit_remaining_jobs(p, packages_to_submit, max_jobs_to_submit) + else: if self.wrapper_policy[self.current_wrapper_section] == "strict": - for job in p.jobs: - job.packed = False - raise AutosubmitCritical(self.error_message_policy(min_h, min_v, wrapper_limits, p.wallclock, balanced), 7014) + self.handle_strict_policy(p, err_message) elif self.wrapper_policy[self.current_wrapper_section] == "mixed": - error = True - for job in p.jobs: - if max_jobs_to_submit == 0: - break - if job.fail_count > 0 and job.status == Status.READY: - job.packed = False - Log.printlog( - "Wrapper policy is set to mixed, there is a failed job that will be sent sequential") - error = False - if job.type == Type.PYTHON and not self._platform.allow_python_jobs: - package = JobPackageSimpleWrapped( - [job]) - else: - package = JobPackageSimple([job]) - packages_to_submit.append(package) - max_jobs_to_submit = max_jobs_to_submit - 1 - if error: - job_names = [job.name for job in p.jobs] - job_names = ','.join(job_names) - self.wrappers_with_error[job_names] = self.error_message_policy(min_h, min_v, wrapper_limits, p.wallclock, balanced) + max_jobs_to_submit = self.handle_mixed_policy(p, packages_to_submit, max_jobs_to_submit, err_message) else: - Log.info( - "Wrapper policy is set to flexible and there is a deadlock, Autosubmit will submit the jobs sequentially") - for job in p.jobs: - if max_jobs_to_submit == 0: - break - job.packed = False - if job.status == Status.READY: - if job.type == Type.PYTHON and not self._platform.allow_python_jobs: - package = JobPackageSimpleWrapped( - [job]) - else: - package = JobPackageSimple([job]) - packages_to_submit.append(package) - max_jobs_to_submit = max_jobs_to_submit - 1 - return packages_to_submit, max_jobs_to_submit + max_jobs_to_submit = self.handle_flexible_policy(p, packages_to_submit, max_jobs_to_submit, err_message) + if self.wrappers_with_error: + for job_names, err_message in self.wrappers_with_error.items(): + Log.error(f"Wrapped jobs with deadlock issues: [{job_names}].") + Log.error(err_message) + raise AutosubmitCritical("Critical error in wrapper policy", 7014) + return max_jobs_to_submit + + def error_message_policy(self, min_h: int, min_v: int, wrapper_limits: dict, balanced: bool, + jobs: list) -> str: + """ + Generate an error message for wrapper policy violations. + + Parameters: + min_h (int): Minimum horizontal jobs. + min_v (int): Minimum vertical jobs. + wrapper_limits (dict): Dictionary containing wrapper limits. + balanced (bool): Indicates if the packages are balanced. + jobs (list): List of jobs with issues. + + Returns: + str: Formatted error message. + """ + message = ( + f"\nWrapper couldn't be formed under {self.wrapper_policy[self.current_wrapper_section]} POLICY due to minimum limit not being reached:" + f"\n[package_min_total: {min_h * min_v} < defined: {wrapper_limits['real_min']}]" + f"\n[package_min_h: {min_h} < defined: {wrapper_limits['min_h']}]" + f"\n[package_min_v: {min_v} < defined: {wrapper_limits['min_v']}]" + f"\n[section_wallclock: {max([job.wallclock for job in jobs])}] < [platform_max_wallclock: {self._platform.max_wallclock}]" + ) - def error_message_policy(self,min_h,min_v,wrapper_limits,wallclock_sum,balanced): - message = f"Wrapper couldn't be formed under {self.wrapper_policy[self.current_wrapper_section]} POLICY due minimum limit not being reached: [wrappable:{wrapper_limits['min']} < defined_min:{min_h*min_v}] [wrappable_h:{min_h} < defined_min_h:{wrapper_limits['min_h']}]|[wrappeable_v:{min_v} < defined_min_v:{wrapper_limits['min_v']}] " - if min_v > 1: - message += f"\nCheck your configuration: Check if current {wallclock_sum} vertical wallclock has reached the max defined on PLATFORMS definition in YAML." - else: - message += "\nCheck your configuration: Only jobs_in_wrappers are active, check your jobs_in_wrapper dependencies." if not balanced: message += "\nPackages are not well balanced! (This is not the main cause of the Critical error)" + jobs_str = ', '.join([job.name for job in jobs]) + message += f"\nJobs with issues:[{jobs_str}].\nRevise these jobs dependencies and try again." + message += "\nThis message is activated when only jobs_in_wrappers are in active(Ready+) status.\n" return message def check_if_packages_are_ready_to_build(self): @@ -463,7 +569,7 @@ class JobPackager(object): for job in [failed_job for failed_job in jobs_to_submit if failed_job.fail_count > 0]: job.packed = False jobs_to_wrap = self._divide_list_by_section(jobs_to_submit) - non_wrapped_jobs = jobs_to_wrap.pop("SIMPLE",[]) + non_wrapped_jobs = jobs_to_wrap.pop("SIMPLE", []) any_simple_packages = len(non_wrapped_jobs) > 0 # Prepare packages for wrapped jobs for wrapper_name, jobs in jobs_to_wrap.items(): @@ -471,8 +577,8 @@ class JobPackager(object): if max_jobs_to_submit == 0: break self.current_wrapper_section = wrapper_name - section = self._as_config.experiment_data.get("WRAPPERS",{}).get(self.current_wrapper_section,{}).get("JOBS_IN_WRAPPER", "") - if not self._platform.allow_wrappers and self.wrapper_type[self.current_wrapper_section] in ['horizontal', 'vertical','vertical-horizontal', 'horizontal-vertical']: + section = self._as_config.experiment_data.get("WRAPPERS", {}).get(self.current_wrapper_section, {}).get("JOBS_IN_WRAPPER", "") + if not self._platform.allow_wrappers and self.wrapper_type[self.current_wrapper_section] in ['horizontal', 'vertical', 'vertical-horizontal', 'horizontal-vertical']: Log.warning( "Platform {0} does not allow wrappers, submitting jobs individually".format(self._platform.name)) for job in jobs: @@ -492,15 +598,16 @@ class JobPackager(object): current_info.append(self._as_config) if self.wrapper_type[self.current_wrapper_section] == 'vertical': - built_packages_tmp = self._build_vertical_packages(jobs, wrapper_limits,wrapper_info=current_info) + built_packages_tmp = self._build_vertical_packages(jobs, wrapper_limits, wrapper_info=current_info) elif self.wrapper_type[self.current_wrapper_section] == 'horizontal': - built_packages_tmp = self._build_horizontal_packages(jobs, wrapper_limits, section,wrapper_info=current_info) + built_packages_tmp = self._build_horizontal_packages(jobs, wrapper_limits, section, wrapper_info=current_info) elif self.wrapper_type[self.current_wrapper_section] in ['vertical-horizontal', 'horizontal-vertical']: - built_packages_tmp.append(self._build_hybrid_package(jobs, wrapper_limits, section,wrapper_info=current_info)) + built_packages_tmp.append(self._build_hybrid_package(jobs, wrapper_limits, section, wrapper_info=current_info)) else: built_packages_tmp = self._build_vertical_packages(jobs, wrapper_limits) - Log.result(f"Built {len(built_packages_tmp)} wrappers for {wrapper_name}") - packages_to_submit,max_jobs_to_submit = self.check_packages_respect_wrapper_policy(built_packages_tmp,packages_to_submit,max_jobs_to_submit,wrapper_limits,any_simple_packages) + if len(built_packages_tmp) > 0: + Log.result(f"Built {len(built_packages_tmp)} wrappers for {wrapper_name}") + packages_to_submit, max_jobs_to_submit = self.check_packages_respect_wrapper_policy(built_packages_tmp, packages_to_submit, max_jobs_to_submit, wrapper_limits, any_simple_packages) # Now, prepare the packages for non-wrapper jobs for job in non_wrapped_jobs: diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 18d0dfaf5c5e7985c4953abe2d4fe94ccb23f52a..67e36792730621a6ea78480ce3a254d1389ac7ba 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -107,6 +107,7 @@ class Platform(object): self.recovery_queue = Queue() self.log_retrieval_process_active = False self.main_process_id = None + self.max_waiting_jobs = 20 @property @autosubmit_parameter(name='current_arch') diff --git a/test/unit/conftest.py b/test/unit/conftest.py index 7b9d8e87eb2183c91e36dda61c5ed25d19a8719d..b83345b0339a5409ef397fae42b1f47fe8fd789b 100644 --- a/test/unit/conftest.py +++ b/test/unit/conftest.py @@ -1,4 +1,5 @@ # Fixtures available to multiple test files must be created in this file. +from contextlib import suppress import pytest from dataclasses import dataclass @@ -6,7 +7,7 @@ from pathlib import Path from ruamel.yaml import YAML from shutil import rmtree from tempfile import TemporaryDirectory -from typing import Any, Dict, Callable, List +from typing import Any, Dict, Callable, List, Protocol, Optional from autosubmit.autosubmit import Autosubmit from autosubmit.platforms.slurmplatform import SlurmPlatform, ParamikoPlatform @@ -91,7 +92,7 @@ def autosubmit() -> Autosubmit: @pytest.fixture(scope='function') -def create_as_conf() -> Callable: +def create_as_conf() -> Callable: # May need to be changed to use the autosubmit_config one def _create_as_conf(autosubmit_exp: AutosubmitExperiment, yaml_files: List[Path], experiment_data: Dict[str, Any]): conf_dir = autosubmit_exp.exp_path / 'conf' conf_dir.mkdir(parents=False, exist_ok=False) @@ -115,3 +116,65 @@ def create_as_conf() -> Callable: return as_conf return _create_as_conf + + +class AutosubmitConfigFactory(Protocol): # Copied from the autosubmit config parser, that I believe is a revised one from the create_as_conf + + def __call__(self, expid: str, experiment_data: Optional[Dict], *args: Any, **kwargs: Any) -> AutosubmitConfig: ... + + +@pytest.fixture(scope="function") +def autosubmit_config( + request: pytest.FixtureRequest, + mocker: "pytest_mock.MockerFixture") -> AutosubmitConfigFactory: + """Return a factory for ``AutosubmitConfig`` objects. + + Abstracts the necessary mocking in ``AutosubmitConfig`` and related objects, + so that if we need to modify these, they can all be done in a single place. + + It is able to create any configuration, based on the ``request`` parameters. + + When the function (see ``scope``) finishes, the object and paths created are + cleaned (see ``finalizer`` below). + """ + + original_root_dir = BasicConfig.LOCAL_ROOT_DIR + tmp_dir = TemporaryDirectory() + tmp_path = Path(tmp_dir.name) + + # Mock this as otherwise BasicConfig.read resets our other mocked values above. + mocker.patch.object(BasicConfig, "read", autospec=True) + + def _create_autosubmit_config(expid: str, experiment_data: Dict = None, *_, **kwargs) -> AutosubmitConfig: + """Create an instance of ``AutosubmitConfig``.""" + root_dir = tmp_path + BasicConfig.LOCAL_ROOT_DIR = str(root_dir) + exp_path = root_dir / expid + exp_tmp_dir = exp_path / BasicConfig.LOCAL_TMP_DIR + aslogs_dir = exp_tmp_dir / BasicConfig.LOCAL_ASLOG_DIR + conf_dir = exp_path / "conf" + aslogs_dir.mkdir(parents=True) + conf_dir.mkdir() + + if not expid: + raise ValueError("No value provided for expid") + config = AutosubmitConfig( + expid=expid, + basic_config=BasicConfig + ) + if experiment_data is not None: + config.experiment_data = experiment_data + + for arg, value in kwargs.items(): + setattr(config, arg, value) + + return config + + def finalizer() -> None: + BasicConfig.LOCAL_ROOT_DIR = original_root_dir + with suppress(FileNotFoundError): + rmtree(tmp_path) + + request.addfinalizer(finalizer) + + return _create_autosubmit_config \ No newline at end of file diff --git a/test/unit/resources/exp_recipes/wrappers_test.yml b/test/unit/resources/exp_recipes/wrappers_test.yml new file mode 100644 index 0000000000000000000000000000000000000000..32a96b5f5ea5a52292ca5718857fb116d7785ac1 --- /dev/null +++ b/test/unit/resources/exp_recipes/wrappers_test.yml @@ -0,0 +1,117 @@ +JOBS: + SIM: + DEPENDENCIES: + SIM-1: {} + DQC_BASIC-3: {} + SCRIPT: | + echo "Running" + WALLCLOCK: 04:00 + RUNNING: chunk + CHECK: on_submission + DQC_BASIC: + SCRIPT: | + echo "Running" + WALLCLOCK: 00:15 + NODES: 1 + PROCESSORS: 1 + TASKS: 1 + THREADS: 16 + RUNNING: chunk + CHECK: on_submission + DEPENDENCIES: + SIM: {} + +wrappers: + wrapper_sim: + TYPE: "vertical" + JOBS_IN_WRAPPER: "SIM" + MAX_WRAPPED: 2 + POLICY: "strict" + +EXPERIMENT: + # Number of chunks of the experiment. + NUMCHUNKS: '3' + # List of start dates + DATELIST: '20000101' + # List of members. + MEMBERS: fc0 + # Unit of the chunk size. Can be hour, day, month, or year. + CHUNKSIZEUNIT: month + # Size of each chunk. + CHUNKSIZE: '4' + CHUNKINI: '' + # Calendar used for the experiment. Can be standard or noleap. + CALENDAR: standard +PROJECT: + # Type of the project. + PROJECT_TYPE: none + # Folder to hold the project sources. + PROJECT_DESTINATION: '' +GIT: + PROJECT_ORIGIN: '' + PROJECT_BRANCH: '' + PROJECT_COMMIT: '' + PROJECT_SUBMODULES: '' + FETCH_SINGLE_BRANCH: true +SVN: + PROJECT_URL: '' + PROJECT_REVISION: '' +LOCAL: + PROJECT_PATH: '' +PROJECT_FILES: + FILE_PROJECT_CONF: '' + FILE_JOBS_CONF: '' + JOB_SCRIPTS_TYPE: '' +RERUN: + RERUN: false + RERUN_JOBLIST: '' + + +CONFIG: + # Current version of Autosubmit. + AUTOSUBMIT_VERSION: "4.1.10" + # Maximum number of jobs permitted in the waiting status. + MAXWAITINGJOBS: 20 + # Total number of jobs in the workflow. + TOTALJOBS: 20 + SAFETYSLEEPTIME: 10 + RETRIALS: 0 + + +PLATFORMS: + MARENOSTRUM5: + TYPE: slurm + HOST: mn5-gpp + PROJECT: ehpc01 + HPC_PROJECT_DIR: /gpfs/projects/ehpc01 + USER: blabla + QUEUE: gp_debug + SCRATCH_DIR: /gpfs/scratch + PROJECT_SCRATCH: /gpfs/scratch/ehpc01 + CONTAINER_DIR: /gpfs/projects/ehpc01/containers + FDB_DIR: /gpfs/scratch/ehpc01/experiments + FDB_PROD: /gpfs/projects/ehpc01/dte/fdb + ADD_PROJECT_TO_HOST: false + TEMP_DIR: '' + PROCESSORS_PER_NODE: 112 + APP_PARTITION: gp_bsces + EXCLUSIVE: 'True' + CUSTOM_DIRECTIVES: "['#SBATCH -q gp_ehpc']" + HPCARCH_SHORT: MN5 + MAX_WALLCLOCK: 00:01 + PARTITION: ehpc01 + NODES: 284 + TASKS: 8 + THREADS: 14 + +MAIL: + NOTIFICATIONS: False + TO: +STORAGE: + TYPE: pkl + COPY_REMOTE_LOGS: true +DEFAULT: + # Job experiment ID. + EXPID: "a00h" + # Default HPC platform name. + HPCARCH: "marenostrum5" \ No newline at end of file diff --git a/test/unit/test_checkpoints.py b/test/unit/test_checkpoints.py index ed0e683778d8c5a59e7262d9cfb072b2647df257..25a7032dc9bb985a66404ec958f508e709bcaf31 100644 --- a/test/unit/test_checkpoints.py +++ b/test/unit/test_checkpoints.py @@ -31,12 +31,11 @@ def prepare_basic_config(tmpdir): @pytest.fixture(scope='function') -def setup_job_list(create_as_conf, tmpdir, mocker, prepare_basic_config): +def setup_job_list(autosubmit_config, tmpdir, mocker, prepare_basic_config): experiment_id = 'random-id' - as_conf = create_as_conf + as_conf = autosubmit_config(experiment_id, {}) as_conf.experiment_data = dict() as_conf.experiment_data["JOBS"] = dict() - as_conf.jobs_data = as_conf.experiment_data["JOBS"] as_conf.experiment_data["PLATFORMS"] = dict() job_list = JobList(experiment_id, prepare_basic_config, YAMLParserFactory(), JobListPersistenceDb(tmpdir, 'db'), as_conf) diff --git a/test/unit/test_job_list-pytest.py b/test/unit/test_job_list-pytest.py new file mode 100644 index 0000000000000000000000000000000000000000..92b06b5c83cdd66ebb186e331cd5d64727f9f9cc --- /dev/null +++ b/test/unit/test_job_list-pytest.py @@ -0,0 +1,81 @@ +import pytest + +from autosubmit.job.job_common import Status +from autosubmit.job.job_list_persistence import JobListPersistencePkl +from autosubmit.job.job_list import JobList +from autosubmitconfigparser.config.basicconfig import BasicConfig +from autosubmitconfigparser.config.yamlparser import YAMLParserFactory +from autosubmit.job.job import Job + + +@pytest.fixture +def prepare_basic_config(tmpdir): + basic_conf = BasicConfig() + BasicConfig.DB_DIR = (tmpdir / "exp_root") + BasicConfig.DB_FILE = "debug.db" + BasicConfig.LOCAL_ROOT_DIR = (tmpdir / "exp_root") + BasicConfig.LOCAL_TMP_DIR = "tmp" + BasicConfig.LOCAL_ASLOG_DIR = "ASLOGS" + BasicConfig.LOCAL_PROJ_DIR = "proj" + BasicConfig.DEFAULT_PLATFORMS_CONF = "" + BasicConfig.CUSTOM_PLATFORMS_PATH = "" + BasicConfig.DEFAULT_JOBS_CONF = "" + BasicConfig.SMTP_SERVER = "" + BasicConfig.MAIL_FROM = "" + BasicConfig.ALLOWED_HOSTS = "" + BasicConfig.DENIED_HOSTS = "" + BasicConfig.CONFIG_FILE_FOUND = False + return basic_conf + + +@pytest.fixture(scope='function') +def setup_job_list(autosubmit_config, tmpdir, mocker, prepare_basic_config): + experiment_id = 'random-id' + as_conf = autosubmit_config + as_conf.experiment_data = dict() + as_conf.experiment_data["JOBS"] = dict() + as_conf.jobs_data = as_conf.experiment_data["JOBS"] + as_conf.experiment_data["PLATFORMS"] = dict() + job_list = JobList(experiment_id, prepare_basic_config, YAMLParserFactory(), + JobListPersistencePkl(), as_conf) + dummy_serial_platform = mocker.MagicMock() + dummy_serial_platform.name = 'serial' + dummy_platform = mocker.MagicMock() + dummy_platform.serial_platform = dummy_serial_platform + dummy_platform.name = 'dummy_platform' + + job_list._platforms = [dummy_platform] + # add some jobs to the job list + job = Job("job1", "1", Status.COMPLETED, 0) + job.section = "SECTION1" + job_list._job_list.append(job) + job = Job("job2", "2", Status.WAITING, 0) + job.section = "SECTION1" + job_list._job_list.append(job) + job = Job("job3", "3", Status.COMPLETED, 0) + job.section = "SECTION2" + job_list._job_list.append(job) + return job_list + + +@pytest.mark.parametrize( + "section_list, banned_jobs, get_only_non_completed, expected_length, expected_section", + [ + (["SECTION1"], [], False, 2, "SECTION1"), + (["SECTION2"], [], False, 1, "SECTION2"), + (["SECTION1"], [], True, 1, "SECTION1"), + (["SECTION2"], [], True, 0, "SECTION2"), + (["SECTION1"], ["job1"], True, 1, "SECTION1"), + ], + ids=[ + "all_jobs_in_section1", + "all_jobs_in_section2", + "non_completed_jobs_in_section1", + "non_completed_jobs_in_section2", + "ban_job1" + ] +) +def test_get_jobs_by_section(setup_job_list, section_list, banned_jobs, get_only_non_completed, expected_length, expected_section): + result = setup_job_list.get_jobs_by_section(section_list, banned_jobs, get_only_non_completed) + assert len(result) == expected_length + assert all(job.section == expected_section for job in result) diff --git a/test/unit/test_wrappers.py b/test/unit/test_wrappers.py index 998b65e9638b8d5945f968071c0968a726ca8a13..d389c317a03643a28890d105f77732ef34311f52 100644 --- a/test/unit/test_wrappers.py +++ b/test/unit/test_wrappers.py @@ -1645,9 +1645,10 @@ class TestWrappers(TestCase): 'max_by_section': {'S2': 9999999, 'S3': 9999999}, 'max_h': 9999999, 'max_v': 9999999, - 'min': 2, + 'min': 1, 'min_h': 1, - 'min_v': 2 + 'min_v': 1, + 'real_min': 2 } returned_wrapper_limits = self.job_packager.calculate_wrapper_bounds(section_list) self.assertDictEqual(returned_wrapper_limits, wrapper_limits) @@ -1664,7 +1665,8 @@ class TestWrappers(TestCase): 'max_v': 5*4, 'min': 3, 'min_h': 2, - 'min_v': 3 + 'min_v': 3, + 'real_min': 3 } returned_wrapper_limits = self.job_packager.calculate_wrapper_bounds(section_list) self.assertDictEqual(returned_wrapper_limits, wrapper_limits) @@ -1676,7 +1678,8 @@ class TestWrappers(TestCase): 'max_v': 5, 'min': 3, 'min_h': 2, - 'min_v': 3 + 'min_v': 3, + 'real_min': 3, } returned_wrapper_limits = self.job_packager.calculate_wrapper_bounds(section_list) self.assertDictEqual(returned_wrapper_limits, wrapper_limits) @@ -1688,7 +1691,8 @@ class TestWrappers(TestCase): 'max_v': 5, 'min': 3, 'min_h': 2, - 'min_v': 3 + 'min_v': 3, + 'real_min': 3 } returned_wrapper_limits = self.job_packager.calculate_wrapper_bounds(section_list) self.assertDictEqual(returned_wrapper_limits, wrapper_limits) @@ -1700,18 +1704,22 @@ class TestWrappers(TestCase): 'max_v': 5, 'min': 3, 'min_h': 2, - 'min_v': 3 - } + 'min_v': 3, + 'real_min': 3 + } returned_wrapper_limits = self.job_packager.calculate_wrapper_bounds(section_list) self.assertDictEqual(returned_wrapper_limits, wrapper_limits) + self.job_packager._as_config.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section]["MIN_WRAPPED"] = 3 self.job_packager._as_config.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section]["MAX_WRAPPED"] = 5 self.job_packager._as_config.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section]["MIN_WRAPPED_H"] = 2 self.job_packager._as_config.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section]["MIN_WRAPPED_V"] = 3 self.job_packager._as_config.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section]["MAX_WRAPPED_H"] = 4 self.job_packager._as_config.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section]["MAX_WRAPPED_V"] = 5 + returned_wrapper_limits = self.job_packager.calculate_wrapper_bounds(section_list) self.assertDictEqual(returned_wrapper_limits, wrapper_limits) + del self.job_packager._as_config.experiment_data["WRAPPERS"]["MIN_WRAPPED"] del self.job_packager._as_config.experiment_data["WRAPPERS"]["MAX_WRAPPED"] del self.job_packager._as_config.experiment_data["WRAPPERS"]["MIN_WRAPPED_H"] @@ -1721,6 +1729,18 @@ class TestWrappers(TestCase): returned_wrapper_limits = self.job_packager.calculate_wrapper_bounds(section_list) self.assertDictEqual(returned_wrapper_limits, wrapper_limits) + wrapper_limits = {'max': 5*4, + 'max_by_section': {'S2': 5*4, 'S3': 5*4}, + 'max_h': 4, + 'max_v': 5, + 'min': 3, + 'min_h': 2, + 'min_v': 3, + 'real_min': 3 + } + returned_wrapper_limits = self.job_packager.calculate_wrapper_bounds(section_list) + self.assertDictEqual(returned_wrapper_limits, wrapper_limits) + def test_check_packages_respect_wrapper_policy(self): # want to test self.job_packager.check_packages_respect_wrapper_policy(built_packages_tmp,packages_to_submit,max_jobs_to_submit,wrapper_limits) date_list = ["d1"] @@ -1786,7 +1806,8 @@ class TestWrappers(TestCase): 'max_v': 9999999, 'min': 2, 'min_h': 1, - 'min_v': 2 + 'min_v': 2, + 'real_min': 2 } package = [d1_m1_1_s2, d1_m1_1_s2, d1_m1_1_s2, d1_m1_1_s2, d1_m1_1_s2] packages_h = [JobPackageHorizontal( @@ -1805,7 +1826,8 @@ class TestWrappers(TestCase): 'max_v': 2, 'min': 2, 'min_h': 2, - 'min_v': 2 + 'min_v': 2, + 'real_min': 2 } self.job_packager.jobs_in_wrapper = {self.job_packager.current_wrapper_section: {'S2': 2, 'S3': 2}} packages_to_submit = [] @@ -1818,10 +1840,8 @@ class TestWrappers(TestCase): self.job_packager.wrapper_policy["WRAPPER_V"] = "mixed" packages_to_submit = [] - self.job_packager.check_packages_respect_wrapper_policy(packages_h, packages_to_submit, max_jobs_to_submit, wrapper_limits) - error = self.job_packager.wrappers_with_error['expid_d1_m1_1_s2,expid_d1_m1_1_s2,expid_d1_m1_1_s2,expid_d1_m1_1_s2,expid_d1_m1_1_s2'] - # check error not empty - assert error != "" + with self.assertRaises(log.log.AutosubmitCritical): + self.job_packager.check_packages_respect_wrapper_policy(packages_h, packages_to_submit, max_jobs_to_submit, wrapper_limits) self.job_packager.wrapper_policy["WRAPPER_V"] = "strict" packages_to_submit = [] with self.assertRaises(log.log.AutosubmitCritical): diff --git a/test/unit/test_wrappers_pytest.py b/test/unit/test_wrappers_pytest.py new file mode 100644 index 0000000000000000000000000000000000000000..fd9f79c7343f2b030fa203f2945fbde3e503d8ec --- /dev/null +++ b/test/unit/test_wrappers_pytest.py @@ -0,0 +1,188 @@ +import pytest + +from autosubmit.job.job_common import Status +from autosubmit.job.job_list_persistence import JobListPersistencePkl +from autosubmit.job.job_list import JobList +from autosubmit.platforms.slurmplatform import SlurmPlatform +from autosubmitconfigparser.config.basicconfig import BasicConfig +from autosubmitconfigparser.config.yamlparser import YAMLParserFactory +from autosubmit.job.job import Job +from autosubmit.job.job_packager import JobPackager +from autosubmit.job.job_packages import JobPackageVertical +from pathlib import Path +import copy + +from log.log import AutosubmitCritical + + +@pytest.fixture +def prepare_basic_config(tmpdir): + basic_conf = BasicConfig() + BasicConfig.DB_DIR = (tmpdir / "exp_root") + BasicConfig.DB_FILE = "debug.db" + BasicConfig.LOCAL_ROOT_DIR = (tmpdir / "exp_root") + BasicConfig.LOCAL_TMP_DIR = "tmp" + BasicConfig.LOCAL_ASLOG_DIR = "ASLOGS" + BasicConfig.LOCAL_PROJ_DIR = "proj" + BasicConfig.DEFAULT_PLATFORMS_CONF = "" + BasicConfig.CUSTOM_PLATFORMS_PATH = "" + BasicConfig.DEFAULT_JOBS_CONF = "" + BasicConfig.SMTP_SERVER = "" + BasicConfig.MAIL_FROM = "" + BasicConfig.ALLOWED_HOSTS = "" + BasicConfig.DENIED_HOSTS = "" + BasicConfig.CONFIG_FILE_FOUND = False + return basic_conf + + +@pytest.fixture(scope='function') +def setup(autosubmit_config, tmpdir, prepare_basic_config): + experiment_id = 'random-id' + as_conf = autosubmit_config(experiment_id, {}) + as_conf.experiment_data = dict() + as_conf.experiment_data["JOBS"] = dict() + as_conf.experiment_data["PLATFORMS"] = dict() + as_conf.experiment_data["LOCAL_ROOT_DIR"] = tmpdir + as_conf.experiment_data["LOCAL_TMP_DIR"] = "" + as_conf.experiment_data["LOCAL_ASLOG_DIR"] = "" + as_conf.experiment_data["LOCAL_PROJ_DIR"] = "" + as_conf.experiment_data["WRAPPERS"] = dict() + as_conf.experiment_data["WRAPPERS"]["WRAPPERS"] = dict() + as_conf.experiment_data["WRAPPERS"]["WRAPPERS"]["JOBS_IN_WRAPPER"] = "SECTION1" + as_conf.experiment_data["WRAPPERS"]["WRAPPERS"]["TYPE"] = "vertical" + Path(tmpdir / experiment_id / "tmp").mkdir(parents=True, exist_ok=True) + job_list = JobList(experiment_id, prepare_basic_config, YAMLParserFactory(), + JobListPersistencePkl(), as_conf) + + platform = SlurmPlatform(experiment_id, 'dummy-platform', as_conf.experiment_data) + + job_list._platforms = [platform] + # add some jobs to the job list + job = Job("job1", "1", Status.COMPLETED, 0) + job._init_runtime_parameters() + job.wallclock = "00:20" + job.section = "SECTION1" + job.platform = platform + job_list._job_list.append(job) + job = Job("job2", "2", Status.SUBMITTED, 0) + job._init_runtime_parameters() + job.wallclock = "00:20" + job.section = "SECTION1" + job.platform = platform + job_list._job_list.append(job) + wrapper_jobs = copy.deepcopy(job_list.get_job_list()) + for job in wrapper_jobs: + job.platform = platform + job_packager = JobPackager(as_conf, platform, job_list) + vertical_package = JobPackageVertical(wrapper_jobs, configuration=as_conf) + yield job_packager, vertical_package + + +@pytest.mark.parametrize("any_simple_packages, not_wrappeable_package_info, built_packages_tmp, expected", [ + (False, ["dummy-1", "dummy-2", "dummy-3"], ["dummy-1", "dummy-2", "dummy-3"], False), + (True, ["dummy-1", "dummy-2", "dummy-3"], ["dummy-1", "dummy-2", "dummy-3"], False), + (False, ["dummy-1", "dummy-2", "dummy-3"], ["dummy-1", "dummy-2"], False), +], ids=["no_simple_packages", "simple_packages_exist", "mismatch_in_package_info"]) +def test_is_deadlock_jobs_in_queue(setup, any_simple_packages, not_wrappeable_package_info, built_packages_tmp, expected): + job_packager, _ = setup + deadlock = job_packager.is_deadlock(any_simple_packages, not_wrappeable_package_info, built_packages_tmp) + assert deadlock == expected + + +@pytest.mark.parametrize("any_simple_packages, not_wrappeable_package_info, built_packages_tmp, expected", [ + (False, ["dummy-1", "dummy-2", "dummy-3"], ["dummy-1", "dummy-2", "dummy-3"], True), + (True, ["dummy-1", "dummy-2", "dummy-3"], ["dummy-1", "dummy-2", "dummy-3"], False), + (False, ["dummy-1", "dummy-2", "dummy-3"], ["dummy-1", "dummy-2"], False), +], ids=["no_simple_packages", "simple_packages_exist", "mismatch_in_package_info"]) +def test_is_deadlock_no_jobs_in_queue(setup, any_simple_packages, not_wrappeable_package_info, built_packages_tmp, expected): + job_packager, _ = setup + for job in job_packager._jobs_list._job_list: + job.status = Status.COMPLETED + deadlock = job_packager.is_deadlock(any_simple_packages, not_wrappeable_package_info, built_packages_tmp) + assert deadlock == expected + + +wrapper_limits = { + "min": 1, + "min_h": 1, + "min_v": 1, + "max": 99, + "max_h": 99, + "max_v": 99, + "real_min": 2 +} + + +@pytest.mark.parametrize("not_wrappeable_package_info, packages_to_submit, max_jobs_to_submit, expected, unparsed_policy", [ + ([["_", 1, 1, True]], [], 100, 99, "strict"), + ([["_", 1, 1, False]], [], 100, 99, "mixed"), + ([["_", 1, 1, True]], [], 100, 99, "flexible"), + ([["_", 1, 1, True]], [], 100, 99, "strict_one_job"), + ([["_", 1, 1, True]], [], 100, 99, "mixed_one_job"), + ([["_", 1, 1, True]], [], 100, 99, "flexible_one_job"), +], ids=["strict_policy", "mixed_policy", "flexible_policy", "strict_one_job", "mixed_one_job", "flexible_one_job"]) +def test_process_not_wrappeable_packages_no_more_remaining_jobs(setup, not_wrappeable_package_info, packages_to_submit, max_jobs_to_submit, expected, unparsed_policy): + job_packager, vertical_package = setup + if unparsed_policy == "mixed_failed": + policy = "mixed" + elif unparsed_policy.endswith("_one_job"): + policy = unparsed_policy.split("_")[0] + job_packager._jobs_list._job_list = [job for job in job_packager._jobs_list._job_list if job.name == "job1"] + vertical_package = JobPackageVertical([vertical_package.jobs[0]], configuration=job_packager._as_config) + else: + policy = unparsed_policy + job_packager._as_config.experiment_data["WRAPPERS"]["WRAPPERS"]["POLICY"] = policy + job_packager.wrapper_policy = {'WRAPPERS': policy} + vertical_package.wrapper_policy = policy + not_wrappeable_package_info[0][0] = vertical_package + for job in vertical_package.jobs: + job.status = Status.READY + result = job_packager.process_not_wrappeable_packages(not_wrappeable_package_info, packages_to_submit, max_jobs_to_submit, wrapper_limits) + assert result == expected + + +@pytest.mark.parametrize("not_wrappeable_package_info, packages_to_submit, max_jobs_to_submit, expected, unparsed_policy ", [ + ([["_", 1, 1, True]], [], 100, 100, "strict"), + ([["_", 1, 1, False]], [], 100, 100, "mixed"), + ([["_", 1, 1, True]], [], 100, 98, "flexible"), + ([["_", 1, 1, True]], [], 100, 99, "mixed_failed"), + ([["_", 1, 1, True]], [], 100, 98, "default"), + ([["_", 1, 1, True]], [], 100, 100, "strict_one_job"), + ([["_", 1, 1, True]], [], 100, 100, "mixed_one_job"), + ([["_", 1, 1, True]], [], 100, 99, "flexible_one_job"), + +], ids=["strict_policy", "mixed_policy", "flexible_policy", "mixed_policy_failed_job", "default_policy", "strict_one_job", "mixed_one_job", "flexible_one_job"]) +def test_process_not_wrappeable_packages_more_jobs_of_that_section(setup, not_wrappeable_package_info, packages_to_submit, max_jobs_to_submit, expected, unparsed_policy, mocker): + job_packager, vertical_package = setup + if unparsed_policy == "mixed_failed": + policy = "mixed" + elif unparsed_policy.endswith("_one_job"): + policy = unparsed_policy.split("_")[0] + vertical_package = JobPackageVertical([vertical_package.jobs[0]], configuration=job_packager._as_config) + else: + policy = unparsed_policy + if "default" not in unparsed_policy: + job_packager._as_config.experiment_data["WRAPPERS"]["WRAPPERS"]["POLICY"] = policy + job_packager.wrapper_policy = {'WRAPPERS': policy} + vertical_package.wrapper_policy = policy + not_wrappeable_package_info[0][0] = vertical_package + + for job in vertical_package.jobs: + job.status = Status.READY + if unparsed_policy == "mixed_failed": + vertical_package.jobs[0].fail_count = 1 + job = Job("job3", "3", Status.WAITING, 0) + job._init_runtime_parameters() + job.wallclock = "00:20" + job.section = "SECTION1" + job.platform = job_packager._platform + job_packager._jobs_list._job_list.append(job) + if unparsed_policy in ["flexible", "mixed_failed", "flexible_one_job"]: + result = job_packager.process_not_wrappeable_packages(not_wrappeable_package_info, packages_to_submit, max_jobs_to_submit, wrapper_limits) + elif unparsed_policy in ["strict", "mixed", "strict_one_job", "mixed_one_job"]: + with pytest.raises(AutosubmitCritical): + job_packager.process_not_wrappeable_packages(not_wrappeable_package_info, packages_to_submit, max_jobs_to_submit, wrapper_limits) + result = 100 + else: + result = job_packager.process_not_wrappeable_packages(not_wrappeable_package_info, packages_to_submit, max_jobs_to_submit, wrapper_limits) + assert result == expected