diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 5d02f627f48494a81c639f64c77bee1064027fcb..7d77e02ad1ca8e69a36fd29ef7be1e117b2d851e 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -258,7 +258,7 @@ class Job(object): # hetjobs self.het = {'HETSIZE': 0} self.parameters = dict() - self._tasks = '1' + self._tasks = '0' self._nodes = "" self.default_parameters = {'d': '%d%', 'd_': '%d_%', 'Y': '%Y%', 'Y_': '%Y_%', 'M': '%M%', 'M_': '%M_%', 'm': '%m%', 'm_': '%m_%'} @@ -1606,7 +1606,7 @@ class Job(object): self.nodes = job_data.get("NODES",platform_data.get("NODES","")) self.exclusive = job_data.get("EXCLUSIVE",platform_data.get("EXCLUSIVE",False)) self.threads = job_data.get("THREADS",platform_data.get("THREADS","1")) - self.tasks = job_data.get("TASKS",platform_data.get("TASKS","1")) + self.tasks = job_data.get("TASKS",platform_data.get("TASKS","0")) self.reservation = job_data.get("RESERVATION",as_conf.platforms_data.get(job_platform.name, {}).get("RESERVATION", "")) self.hyperthreading = job_data.get("HYPERTHREADING",platform_data.get("HYPERTHREADING","none")) self.queue = job_data.get("QUEUE",platform_data.get("QUEUE","")) diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index a79fd60fb26af55a058c58bc269e1b9ac8339aba..1ba86f2c3596ed3c151e583409aa5d719ea7a195 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -337,7 +337,7 @@ class JobPackager(object): Check if the packages are ready to be built :return: List of jobs ready to be built, boolean indicating if packages can't be built for other reasons ( max_total_jobs...) 
""" - Log.info("Calculating size limits for {0}".format(self._platform.name)) + Log.info("Calculating possible ready jobs for {0}".format(self._platform.name)) jobs_ready = list() if len(self._jobs_list.jobs_to_run_first) > 0: jobs_ready = [job for job in self._jobs_list.jobs_to_run_first if @@ -438,9 +438,11 @@ class JobPackager(object): return [] max_jobs_to_submit = min(self._max_wait_jobs_to_submit, self._max_jobs_to_submit) section_jobs_to_submit = dict() - for job in jobs_ready: - job.update_parameters(self._as_config, {}) # Ensure to have the correct processors for the wrapper building code + + for job in [job for job in jobs_ready]: if job.section not in section_jobs_to_submit: # This is to fix TOTAL_JOBS when is set at job_level # Only for non-wrapped jobs + job.update_parameters(self._as_config, + {}) # Ensure to have the correct processors for the wrapper building code if int(job.max_waiting_jobs) != int(job.platform.max_waiting_jobs): section_max_wait_jobs_to_submit = int(job.max_waiting_jobs) - int(self.waiting_jobs) else: @@ -450,13 +452,13 @@ class JobPackager(object): else: section_max_jobs_to_submit = None - if section_max_jobs_to_submit is not None or section_max_wait_jobs_to_submit is not None: - if section_max_jobs_to_submit is None: - section_max_jobs_to_submit = self._max_jobs_to_submit - if section_max_wait_jobs_to_submit is None: - section_max_wait_jobs_to_submit = self._max_wait_jobs_to_submit + if section_max_jobs_to_submit is None: + section_max_jobs_to_submit = self._max_jobs_to_submit + if section_max_wait_jobs_to_submit is None: + section_max_wait_jobs_to_submit = self._max_wait_jobs_to_submit - section_jobs_to_submit ={job.section:min(section_max_wait_jobs_to_submit,section_max_jobs_to_submit)} + section_jobs_to_submit ={job.section:min(section_max_wait_jobs_to_submit,section_max_jobs_to_submit)} + Log.result(f"Section:{job.section} can submit {section_jobs_to_submit[job.section]} jobs at this time") jobs_to_submit = sorted( 
jobs_ready, key=lambda k: k.priority, reverse=True) for job in [failed_job for failed_job in jobs_to_submit if failed_job.fail_count > 0]: @@ -498,7 +500,7 @@ class JobPackager(object): built_packages_tmp.append(self._build_hybrid_package(jobs, wrapper_limits, section,wrapper_info=current_info)) else: built_packages_tmp = self._build_vertical_packages(jobs, wrapper_limits) - + Log.result(f"Built {len(built_packages_tmp)} wrappers for {wrapper_name}") packages_to_submit,max_jobs_to_submit = self.check_packages_respect_wrapper_policy(built_packages_tmp,packages_to_submit,max_jobs_to_submit,wrapper_limits,any_simple_packages) # Now, prepare the packages for non-wrapper jobs @@ -720,6 +722,8 @@ class JobPackagerVertical(object): :return: List of jobs that are wrapped together. :rtype: List() of Job Object """ + self.total_wallclock = "00:00" # reset total wallclock for package + job.update_parameters(wrapper_info[-1],{}) # update_parameter has moved, so this is now needed. stack = [(job, 1)] while stack: job, level = stack.pop() @@ -907,9 +911,7 @@ class JobPackagerHorizontal(object): if horizontal_vertical: self._current_processors = 0 jobs_by_section = dict() - Log.info(f"Updating inner job parameters") for job in self.job_list: - job.update_parameters(wrapper_info[-1],{}) if job.section not in jobs_by_section: jobs_by_section[job.section] = list() jobs_by_section[job.section].append(job) @@ -920,6 +922,7 @@ class JobPackagerHorizontal(object): for job in jobs_by_section[section]: if jobs_processed % 10 == 0 and jobs_processed > 0: Log.info(f"Wrapper package creation is still ongoing. 
So far {jobs_processed} jobs have been wrapped.") + job.update_parameters(wrapper_info[-1], {}) if str(job.processors).isdigit() and str(job.nodes).isdigit() and int(job.nodes) > 1 and int(job.processors) <= 1: job.processors = 0 if job.total_processors == "": @@ -939,6 +942,7 @@ class JobPackagerHorizontal(object): self._current_processors += total_processors current_package_by_section[section] += 1 else: + Log.result(f"Wrapper package creation is finished. {jobs_processed} jobs have been wrapped together.") break jobs_processed += 1 diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index f865d0de03d701dec52060bd8a7258d41b9f51a9..309881657862c47ba21ed1bfabe6a48f395d0c6c 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -45,6 +45,16 @@ def threaded(fn): thread.start() return thread return wrapper +def jobs_in_wrapper_str(as_conf, current_wrapper): + jobs_in_wrapper = as_conf.experiment_data["WRAPPERS"].get(current_wrapper, {}).get("JOBS_IN_WRAPPER", "") + if "," in jobs_in_wrapper: + jobs_in_wrapper = jobs_in_wrapper.split(",") + elif "&" in jobs_in_wrapper: + jobs_in_wrapper = jobs_in_wrapper.split("&") + else: + jobs_in_wrapper = jobs_in_wrapper.split(" ") + jobs_in_wrapper = [job.strip(" ,") for job in jobs_in_wrapper] + return "&".join(jobs_in_wrapper) class JobPackageBase(object): """ Class to manage the package of jobs to be submitted by autosubmit @@ -685,10 +695,7 @@ class JobPackageVertical(JobPackageThread): self._threads) for job in jobs: self._wallclock = sum_str_hours(self._wallclock, job.wallclock) - self._name = self._expid + '_' + self.FILE_PREFIX + "_{0}_{1}_{2}".format(str(int(time.time())) + - str(random.randint(1, 10000)), - self._num_processors, - len(self._jobs)) + self._name = f"{self._expid}_{self.FILE_PREFIX}_{jobs_in_wrapper_str(configuration,self.current_wrapper_section)}_{str(int(time.time())) + str(random.randint(1, 10000))}_{self._num_processors}_{len(self._jobs)}" def 
parse_time(self): format_ = "minute" @@ -756,6 +763,9 @@ class JobPackageVertical(JobPackageThread): directives=self._custom_directives,threads=self._threads,method=self.method.lower(),retrials=self.inner_retrials, wallclock_by_level=wallclock_by_level,partition=self.partition,wrapper_data=self,num_processors_value=self._num_processors) + + + class JobPackageHorizontal(JobPackageThread): """ Class to manage a horizontal thread-based package of jobs to be submitted by autosubmit @@ -772,11 +782,8 @@ class JobPackageHorizontal(JobPackageThread): self._threads = job.threads self._threads = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section, {}).get("THREADS", self._threads) + self._name = f"{self._expid}_{self.FILE_PREFIX}_{jobs_in_wrapper_str(configuration,self.current_wrapper_section)}_{str(int(time.time())) + str(random.randint(1, 10000))}_{self._num_processors}_{len(self._jobs)}" - self._name = self._expid + '_' + self.FILE_PREFIX + "_{0}_{1}_{2}".format(str(int(time.time())) + - str(random.randint(1, 10000)), - self._num_processors, - len(self._jobs)) self._jobs_resources = jobs_resources def _common_script_content(self): @@ -800,10 +807,8 @@ class JobPackageHybrid(JobPackageThread): self._num_processors = int(num_processors) self._threads = all_jobs[0].threads self._wallclock = total_wallclock - self._name = self._expid + '_' + self.FILE_PREFIX + "_{0}_{1}_{2}".format(str(int(time.time())) + - str(random.randint(1, 10000)), - self._num_processors, - len(self._jobs)) + self._name = f"{self._expid}_{self.FILE_PREFIX}_{jobs_in_wrapper_str(configuration,self.current_wrapper_section)}_{str(int(time.time())) + str(random.randint(1, 10000))}_{self._num_processors}_{len(self._jobs)}" + @property def _jobs_scripts(self): diff --git a/autosubmit/job/job_utils.py b/autosubmit/job/job_utils.py index 2da519d172bbe5d47887fa9c661357e15c783170..43ae211e6d640bd7c4e515b442b31f458c022a13 100644 --- a/autosubmit/job/job_utils.py +++ 
b/autosubmit/job/job_utils.py @@ -264,8 +264,8 @@ def get_job_package_code(expid, job_name): packages = packages_wrapper if len(packages_wrapper) > len(packages_wrapper_plus) else packages_wrapper_plus for exp, package_name, _job_name in packages: if job_name == _job_name: - code = int(package_name.split("_")[2]) - return code + code = int(package_name.split("_")[-3]) + return code except Exception as e: pass return 0 diff --git a/autosubmit/platforms/wrappers/wrapper_factory.py b/autosubmit/platforms/wrappers/wrapper_factory.py index b71cd503df9f943ce810fac43d95915c28025e3c..f5e3bd2b2db5f454581969448e4bca8966d83cd2 100644 --- a/autosubmit/platforms/wrappers/wrapper_factory.py +++ b/autosubmit/platforms/wrappers/wrapper_factory.py @@ -101,7 +101,7 @@ class WrapperFactory(object): def nodes(self, nodes): return '#' if not nodes else self.nodes_directive(nodes) def tasks(self, tasks): - return '#' if not tasks else self.tasks_directive(tasks) + return '#' if not tasks or int(tasks) < 1 else self.tasks_directive(tasks) def partition(self, partition): return '#' if not partition else self.partition_directive(partition) def threads(self, threads): diff --git a/test/unit/files/base_horizontal_slurm.cmd b/test/unit/files/base_horizontal_slurm.cmd new file mode 100644 index 0000000000000000000000000000000000000000..53f6f5ccc20d08a29b6653824448158b09f4ba3d --- /dev/null +++ b/test/unit/files/base_horizontal_slurm.cmd @@ -0,0 +1,21 @@ +#!/usr/bin/env python3 +############################################################################### +# t000_Wrapper +############################################################################### + +#SBATCH -J t000_ASThread_WRAP_HORIZONTAL_17207042564901_2_2 +#SBATCH --qos=gp_debug +# +# +#SBATCH -A whatever +#SBATCH --output=t000_ASThread_WRAP_HORIZONTAL_17207042564901_2_2.out +#SBATCH --error=t000_ASThread_WRAP_HORIZONTAL_17207042564901_2_2.err +#SBATCH -t 00:01:00 +# +# +#SBATCH -n 2 +# +# +# +# +# diff --git 
a/test/unit/files/base_horizontal_vertical_slurm.cmd b/test/unit/files/base_horizontal_vertical_slurm.cmd new file mode 100644 index 0000000000000000000000000000000000000000..5b86d656e19df565ed6316141ef97b6f94d35832 --- /dev/null +++ b/test/unit/files/base_horizontal_vertical_slurm.cmd @@ -0,0 +1,21 @@ +#!/usr/bin/env python3 +############################################################################### +# t000_Wrapper +############################################################################### + +#SBATCH -J t000_ASThread_WRAP_HORIZONTAL_VERTICAL_17206892431106_2_4 +#SBATCH --qos=gp_debug +# +# +#SBATCH -A whatever +#SBATCH --output=t000_ASThread_WRAP_HORIZONTAL_VERTICAL_17206892431106_2_4.out +#SBATCH --error=t000_ASThread_WRAP_HORIZONTAL_VERTICAL_17206892431106_2_4.err +#SBATCH -t 00:02:00 +# +# +#SBATCH -n 2 +# +# +# +# +# \ No newline at end of file diff --git a/test/unit/files/base_vertical_horizontal_slurm.cmd b/test/unit/files/base_vertical_horizontal_slurm.cmd new file mode 100644 index 0000000000000000000000000000000000000000..be2b6f52254e1aa14942fdc24debb9a154e95ac9 --- /dev/null +++ b/test/unit/files/base_vertical_horizontal_slurm.cmd @@ -0,0 +1,21 @@ +#!/usr/bin/env python3 +############################################################################### +# t000_Wrapper +############################################################################### + +#SBATCH -J t000_ASThread_WRAP_VERTICAL_HORIZONTAL_17206892433179_2_4 +#SBATCH --qos=gp_debug +# +# +#SBATCH -A whatever +#SBATCH --output=t000_ASThread_WRAP_VERTICAL_HORIZONTAL_17206892433179_2_4.out +#SBATCH --error=t000_ASThread_WRAP_VERTICAL_HORIZONTAL_17206892433179_2_4.err +#SBATCH -t 00:02:00 +# +# +#SBATCH -n 2 +# +# +# +# +# diff --git a/test/unit/files/base_vertical_slurm.cmd b/test/unit/files/base_vertical_slurm.cmd new file mode 100644 index 0000000000000000000000000000000000000000..d18b5a240141c9c0fbb6959d7304924a7cd995a9 --- /dev/null +++ b/test/unit/files/base_vertical_slurm.cmd @@ -0,0 
+1,21 @@ +#!/usr/bin/env python3 +############################################################################### +# t000_Wrapper +############################################################################### + +#SBATCH -J t000_ASThread_WRAP_VERTICAL_17206892431870_1_2 +#SBATCH --qos=gp_debug +# +# +#SBATCH -A whatever +#SBATCH --output=t000_ASThread_WRAP_VERTICAL_17206892431870_1_2.out +#SBATCH --error=t000_ASThread_WRAP_VERTICAL_17206892431870_1_2.err +#SBATCH -t 00:02:00 +# +# +#SBATCH -n 1 +# +# +# +# +# \ No newline at end of file diff --git a/test/unit/test_scheduler_general.py b/test/unit/test_scheduler_general.py index 46e62ef792b4dba50fcf6c3056e7dc87854823f9..713dbda69c038fc52bcb98e65a64630d3f21476e 100644 --- a/test/unit/test_scheduler_general.py +++ b/test/unit/test_scheduler_general.py @@ -84,6 +84,7 @@ PLATFORMS: MAX_WALLCLOCK: 48:00 TEMP_DIR: '' MAX_PROCESSORS: 99999 + PROCESSORS_PER_NODE: 123 pytest-ecaccess: type: ecaccess version: slurm @@ -126,7 +127,46 @@ JOBS: NAME: [pjm, slurm, ecaccess, ps] RUNNING: once wallclock: 00:01 - """) + wrap: + SCRIPT: | + echo "Hello World, I'm a wrapper" + For: + NAME: [horizontal,vertical,vertical_horizontal,horizontal_vertical] + DEPENDENCIES: [wrap_horizontal-1,wrap_vertical-1,wrap_vertical_horizontal-1,wrap_horizontal_vertical-1] + QUEUE: gp_debug + PLATFORM: pytest-slurm + RUNNING: chunk + wallclock: 00:01 +Wrappers: + wrapper_h: + type: horizontal + jobs_in_wrapper: wrap_horizontal + wrapper_v: + type: vertical + jobs_in_wrapper: wrap_vertical + wrapper_vh: + type: vertical-horizontal + jobs_in_wrapper: wrap_vertical_horizontal + wrapper_hv: + type: horizontal-vertical + jobs_in_wrapper: wrap_horizontal_vertical +EXPERIMENT: + # List of start dates + DATELIST: '20000101' + # List of members. + MEMBERS: fc0 fc1 + # Unit of the chunk size. Can be hour, day, month, or year. + CHUNKSIZEUNIT: month + # Size of each chunk. + CHUNKSIZE: '4' + # Number of chunks of the experiment. 
+ NUMCHUNKS: '2' + CHUNKINI: '' + # Calendar used for the experiment. Can be standard or noleap. + CALENDAR: standard + """) + + @@ -147,29 +187,52 @@ JOBS: @pytest.fixture def generate_cmds(prepare_scheduler): init_expid(os.environ["AUTOSUBMIT_CONFIGURATION"], platform='local', expid='t000', create=True) - Autosubmit.inspect(expid='t000',check_wrapper=False,force=True, lst=None, filter_chunks=None, filter_status=None, filter_section=None) + Autosubmit.inspect(expid='t000', check_wrapper=True, force=True, lst=None, filter_chunks=None, filter_status=None, filter_section=None) return prepare_scheduler -@pytest.mark.parametrize("scheduler", ['pjm', 'slurm', 'ecaccess', 'ps']) -def test_default_parameters(scheduler: str, generate_cmds): +@pytest.mark.parametrize("scheduler, job_type", [ + ('pjm', 'SINGLE'), + ('slurm', 'SINGLE'), + ('ecaccess', 'SINGLE'), + ('ps', 'SINGLE'), + ('slurm', 'horizontal'), + ('slurm', 'vertical'), + ('slurm', 'horizontal_vertical'), + ('slurm', 'vertical_horizontal') +]) +def test_scheduler_job_types(scheduler, job_type, generate_cmds): + # Test code that uses scheduler and job_type (formerly test_default_parameters) """ Test that the default parameters are correctly set in the scheduler files. It is a comparasion line to line, so the new templates must match the same line order as the old ones. Additional default parameters must be filled in the files/base_{scheduler}.yml as well as any change in the order :param generate_cmds: fixture that generates the templates. 
+ :param scheduler: Target scheduler + :param job_type: Wrapped or not :return: """ # Load the base file for each scheduler scheduler = scheduler.upper() + job_type = job_type.upper() expected_data = {} - for base_f in _get_script_files_path().glob('base_*.cmd'): - if scheduler in base_f.stem.split('_')[1].upper(): - expected_data = Path(base_f).read_text() - break + if job_type == "SINGLE": + for base_f in _get_script_files_path().glob('base_*.cmd'): + if scheduler in base_f.stem.split('_')[1].upper(): + expected_data = Path(base_f).read_text() + break + else: + expected_data = (Path(_get_script_files_path()) / Path(f"base_{job_type.lower()}_{scheduler.lower()}.cmd")).read_text() if not expected_data: - raise NotImplemented + assert False, f"Could not find the expected data for {scheduler} and {job_type}" # Get the actual default parameters for the scheduler - actual = Path(f"{generate_cmds.strpath}/t000/tmp/t000_BASE_{scheduler}.cmd").read_text() + if job_type == "SINGLE": + actual = Path(f"{generate_cmds.strpath}/t000/tmp/t000_BASE_{scheduler}.cmd").read_text() + else: + for asthread in Path(f"{generate_cmds.strpath}/t000/tmp").glob(f"*ASThread_WRAP_{job_type}_[0-9]*.cmd"): + actual = asthread.read_text() + break + else: + assert False, f"Could not find the actual data for {scheduler} and {job_type}" # Remove all after # Autosubmit header # ################### # count number of lines in expected @@ -178,7 +241,7 @@ def test_default_parameters(scheduler: str, generate_cmds): actual = '\n'.join(actual) # Compare line to line for i, (line1, line2) in enumerate(zip(expected_data.split('\n'), actual.split('\n'))): - if "PJM -o" in line1 or "PJM -e" in line1 or "#SBATCH --output" in line1 or "#SBATCH --error" in line1: # output error will be different + if "PJM -o" in line1 or "PJM -e" in line1 or "#SBATCH --output" in line1 or "#SBATCH --error" in line1 or "#SBATCH -J" in line1: # output error will be different continue elif "##" in line1 or "##" in line2: # comment 
line continue