diff --git a/CHANGELOG b/CHANGELOG index e83498ae57e54c94fd25e9596a0857bf10c2fc2c..9dbb060bd33cc1ea1d8fe82c57dfdbe6ebba374c 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,8 @@ +4.1.9 - Splits fixes +===================== +- Splits calendar: Added a full example in the documentation. +- Splits calendar: Fixed some issues with the split="auto". + 4.1.8 - Bug fixes. ===================== - Fixed an issue with a socket connection left open. diff --git a/VERSION b/VERSION index a7c00da34f2c7bd7186111bcdebb53af008cd94c..18837e703d86e02a82d3525a10a24bd9fdf29ea1 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -4.1.8 +4.1.9 diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 5e4d1dc2706fcd39f73bc72f318b4cc578343af4..5d02f627f48494a81c639f64c77bee1064027fcb 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -1700,7 +1700,8 @@ class Job(object): jobs_in_wrapper = jobs_in_wrapper.split(" ") if self.section.upper() in jobs_in_wrapper: self.retrials = wrapper_data.get("RETRIALS", self.retrials) - self.splits = as_conf.jobs_data.get(self.section,{}).get("SPLITS", None) + if not self.splits: + self.splits = as_conf.jobs_data.get(self.section,{}).get("SPLITS", None) self.delete_when_edgeless = as_conf.jobs_data.get(self.section,{}).get("DELETE_WHEN_EDGELESS", True) self.dependencies = str(as_conf.jobs_data.get(self.section,{}).get("DEPENDENCIES","")) self.running = as_conf.jobs_data.get(self.section,{}).get("RUNNING", "once") @@ -1880,7 +1881,8 @@ class Job(object): parameters['MEMBER'] = self.member parameters['SPLIT'] = self.split parameters['SHAPE'] = self.shape - parameters['SPLITS'] = self.splits + if parameters.get('SPLITS', "auto") == "auto": + parameters['SPLITS'] = self.splits parameters['DELAY'] = self.delay parameters['FREQUENCY'] = self.frequency parameters['SYNCHRONIZE'] = self.synchronize diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index 2e3722bacfaf0c75070e7c9e81caf42ebcf8f6e5..4b20bc8e3ccbe4ea21b4286cf6d23cde619ab8a1 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -1,4 +1,5 @@ #!/usr/bin/env python3 +import copy # Copyright 2017-2020 Earth Sciences Department, BSC-CNS @@ -266,7 +267,9 @@ class DicJobs: count = 0 for chunk in (chunk for chunk in self._chunk_list): if splits == "auto": - splits = calendar_chunk_section(self.experiment_data, section, date, chunk) + real_splits = calendar_chunk_section(self.experiment_data, section, date, chunk) + else: + real_splits = splits count += 1 if delay == -1 or delay < chunk: if count % frequency == 0 or count == len(self._chunk_list): @@ -278,14 +281,14 @@ class DicJobs: self._dic[section][date][member][chunk] = tmp_dic[chunk][date] else: self._dic[section][date][member][chunk] = [] - self._create_jobs_split(splits, section, date, member, chunk, priority, + self._create_jobs_split(real_splits, section, date, member, chunk, priority, default_job_type, self._dic[section][date][member][chunk]) def _create_jobs_split(self, splits, section, date, member, chunk, priority, default_job_type, section_data): splits_list = [-1] if splits <= 0 else range(1, splits + 1) for split in splits_list: - self.build_job(section, priority, date, member, chunk, default_job_type, section_data, split) + self.build_job(section, priority, date, member, chunk, default_job_type, section_data, splits, split) def update_jobs_filtered(self, current_jobs, next_level_jobs): if type(next_level_jobs) is dict: @@ -617,7 +620,7 @@ class DicJobs: jobs.append(dic[c]) return jobs - def build_job(self, section, priority, date, member, chunk, default_job_type, section_data, split=-1): + def build_job(self, section, priority, date, member, chunk, default_job_type, section_data, splits, split=-1): name = self.experiment_data.get("DEFAULT", {}).get("EXPID", "") if date: name += "_" + date2str(date, self._date_format) @@ -637,6 +640,7 @@ class DicJobs: job.member = member job.chunk = chunk job.split = split + job.splits = splits job.update_dict_parameters(self.as_conf) section_data.append(job) self.changes["NEWJOBS"] = True @@ -647,4 +651,5 @@ class DicJobs: Status.READY] else \ self._job_list[name].status section_data.append(self._job_list[name]) + self._job_list[name].splits = splits self.workflow_jobs.append(name) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index d369aae7e3b200ac9c1a306ff91bc0fc495817f5..2ccb52ff7d7b6f3fddda8dd947157b5b9e057b61 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -162,6 +162,13 @@ class JobList(object): job.delete_when_edgeless).casefold() == "true".casefold(): self._job_list.remove(job) self.graph.remove_node(job.name) + @staticmethod + def check_split_set_to_auto(as_conf): + # If this is true, the workflow needs to be recreated on create + for job_name, values in as_conf.experiment_data["JOBS"].items(): + if values.get("SPLITS", None) == "auto": + return True + return False def generate(self, as_conf, date_list, member_list, num_chunks, chunk_ini, parameters, date_format, default_retrials, @@ -198,6 +205,8 @@ class JobList(object): :param monitor: monitor :type monitor: bool """ + if create and self.check_split_set_to_auto(as_conf): + force = True if force: Log.debug("Resetting the workflow graph to a zero state") if os.path.exists(os.path.join(self._persistence_path, self._persistence_file + ".pkl")): @@ -538,7 +547,7 @@ class JobList(object): return final_values @staticmethod - def _parse_filter_to_check(value_to_check, value_list=[], level_to_check="DATES_FROM"): + def _parse_filter_to_check(value_to_check, value_list=[], level_to_check="DATES_FROM", splits=None): """ Parse the filter to check and return the value to check. Selection process: @@ -551,6 +560,13 @@ class JobList(object): :return: parsed value to check. """ step = 1 + if "SPLITS" in level_to_check: + if "auto" in value_to_check: + value_to_check = value_to_check.replace("auto", str(splits)) + elif "-1" in value_to_check: + value_to_check = value_to_check.replace("-1", str(splits)) + elif "last" in value_to_check: + value_to_check = value_to_check.replace("last", str(splits)) if value_to_check.count(":") == 1: # range if value_to_check[1] == ":": @@ -757,10 +773,10 @@ class JobList(object): filters_to_apply = self._check_relationship(relationships, "SPLITS_FROM", current_job.split) # No more FROM sections to check, unify _to FILTERS and return - filters_to_apply = self._unify_to_filters(filters_to_apply) + filters_to_apply = self._unify_to_filters(filters_to_apply, current_job.splits) return filters_to_apply - def _unify_to_filter(self, unified_filter, filter_to, filter_type): + def _unify_to_filter(self, unified_filter, filter_to, filter_type, splits = None): """ Unify filter_to filters into a single dictionary :param unified_filter: Single dictionary with all filters_to @@ -796,7 +812,7 @@ class JobList(object): parsed_element = re.findall(r"([\[:\]a-zA-Z0-9._-]+)", element)[0].lower() extra_data = element[len(parsed_element):] parsed_element = JobList._parse_filter_to_check(parsed_element, value_list=value_list, - level_to_check=filter_type) + level_to_check=filter_type, splits=splits) # convert list to str skip = False if isinstance(parsed_element, list): @@ -839,7 +855,7 @@ class JobList(object): if "," in filter_to[filter_type][0]: filter_to[filter_type] = filter_to[filter_type][1:] - def _unify_to_filters(self, filter_to_apply): + def _unify_to_filters(self, filter_to_apply, splits): """ Unify all filter_to filters into a single dictionary ( of current selection ) :param filter_to_apply: Filters to apply @@ -855,7 +871,7 @@ class JobList(object): self._unify_to_filter(unified_filter, filter_to, "DATES_TO") self._unify_to_filter(unified_filter, filter_to, "MEMBERS_TO") self._unify_to_filter(unified_filter, filter_to, "CHUNKS_TO") - self._unify_to_filter(unified_filter, filter_to, "SPLITS_TO") + self._unify_to_filter(unified_filter, filter_to, "SPLITS_TO", splits=splits) JobList._normalize_to_filters(unified_filter, "DATES_TO") JobList._normalize_to_filters(unified_filter, "MEMBERS_TO") @@ -1219,7 +1235,7 @@ class JobList(object): depends_on_itself = None if not job.splits: child_splits = 0 - else: + elif job.splits != "auto": child_splits = int(job.splits) parsed_date_list = [] for dat in date_list: diff --git a/autosubmit/job/job_utils.py b/autosubmit/job/job_utils.py index dae0ccc5e2c2d5f8e4e3ce29b57b6a774f2464b6..2da519d172bbe5d47887fa9c661357e15c783170 100644 --- a/autosubmit/job/job_utils.py +++ b/autosubmit/job/job_utils.py @@ -1,9 +1,11 @@ #!/usr/bin/env python3 +import copy + import math from autosubmit.platforms.paramiko_submitter import ParamikoSubmitter from log.log import Log, AutosubmitCritical -from bscearth.utils.date import date2str, chunk_end_date, chunk_start_date, subs_dates +from bscearth.utils.date import date2str, chunk_end_date, chunk_start_date, subs_dates, add_hours # Copyright 2017-2020 Earth Sciences Department, BSC-CNS @@ -100,8 +102,22 @@ def calendar_get_month_days(date_str): else: return 31 +def get_chunksize_in_hours(date_str,chunk_unit,chunk_length): -def calendar_split_size_isvalid(date_str, split_size, split_unit, chunk_unit, chunk_length): + if is_leap_year(int(date_str[0:4])): + num_days_in_a_year = 366 + else: + num_days_in_a_year = 365 + if chunk_unit == "year": + chunk_size_in_hours = num_days_in_a_year * 24 * chunk_length + elif chunk_unit == "month": + chunk_size_in_hours = calendar_get_month_days(date_str) * 24 * chunk_length + elif chunk_unit == "day": + chunk_size_in_hours = 24 * chunk_length + else: + chunk_size_in_hours = chunk_length + return chunk_size_in_hours +def calendar_split_size_isvalid(date_str, split_size, split_unit, chunk_unit, chunk_length, chunk_size_in_hours): """ Check if the split size is valid for the calendar :param date_str: Date in string format (YYYYMMDD) @@ -109,20 +125,13 @@ def calendar_split_size_isvalid(date_str, split_size, split_unit, chunk_unit, ch :param split_unit: Unit of the split :param chunk_unit: Unit of the chunk :param chunk_length: Size of the chunk + :param chunk_size_in_hours: chunk size in hours :return: Boolean """ if is_leap_year(int(date_str[0:4])): num_days_in_a_year = 366 else: num_days_in_a_year = 365 - if chunk_unit == "year": - chunk_size_in_hours = num_days_in_a_year * 24 * chunk_length - elif chunk_unit == "month": - chunk_size_in_hours = calendar_get_month_days(date_str) * 24 * chunk_length - elif chunk_unit == "day": - chunk_size_in_hours = 24 * chunk_length - else: - chunk_size_in_hours = chunk_length if split_unit == "year": split_size_in_hours = num_days_in_a_year * 24 * split_size @@ -150,7 +159,7 @@ def calendar_chunk_section(exp_data, section, date, chunk): :param parameters: :return: """ - date_str = date2str(date) + #next_auto_date = date splits = 0 jobs_data = exp_data.get('JOBS', {}) split_unit = str(exp_data.get("EXPERIMENT", {}).get('SPLITSIZEUNIT', jobs_data.get(section,{}).get("SPLITSIZEUNIT", None))).lower() @@ -182,16 +191,16 @@ def calendar_chunk_section(exp_data, section, date, chunk): else: num_max_splits = run_days split_size = get_split_size(exp_data, section) - if not calendar_split_size_isvalid(date_str, split_size, split_unit, chunk_unit, chunk_length): + chunk_size_in_hours = get_chunksize_in_hours(date2str(chunk_start),chunk_unit,chunk_length) + if not calendar_split_size_isvalid(date2str(chunk_start), split_size, split_unit, chunk_unit, chunk_length, chunk_size_in_hours): raise AutosubmitCritical(f"Invalid split size for the calendar. The split size is {split_size} and the unit is {split_unit}.") splits = num_max_splits / split_size if not splits.is_integer() and split_policy == "flexible": Log.warning(f"The number of splits:{num_max_splits}/{split_size} is not an integer. The number of splits will be rounded up due the flexible split policy.\n You can modify the SPLITPOLICY parameter in the section {section} to 'strict' to avoid this behavior.") elif not splits.is_integer() and split_policy == "strict": - raise AutosubmitCritical(f"The number of splits is not an integer. The number of splits will be rounded up due the strict split policy.\n You can modify the SPLITPOLICY parameter in the section {section} to 'flexible' to roundup the number.") + raise AutosubmitCritical(f"The number of splits is not an integer. Autosubmit can't continue.\nYou can modify the SPLITPOLICY parameter in the section {section} to 'flexible' to roundup the number. Or change the SPLITSIZE parameter to a value in which the division is an integer.") splits = math.ceil(splits) - Log.info(f"For the section {section} with date:{date_str} the number of splits is {splits}.") - + Log.info(f"For the section {section} with date:{date2str(chunk_start)} the number of splits is {splits}.") return splits def get_split_size_unit(data, section): diff --git a/docs/source/userguide/defining_workflows/fig/splits_auto.png b/docs/source/userguide/defining_workflows/fig/splits_auto.png new file mode 100644 index 0000000000000000000000000000000000000000..df4dad3020db441efa54830504a2520013ff27f7 Binary files /dev/null and b/docs/source/userguide/defining_workflows/fig/splits_auto.png differ diff --git a/docs/source/userguide/defining_workflows/index.rst b/docs/source/userguide/defining_workflows/index.rst index 57e12ff87afa86d4ed1410af4cc4fee5e6876dc6..84988a107749afe832fc5c2ed831bcd0237beb26 100644 --- a/docs/source/userguide/defining_workflows/index.rst +++ b/docs/source/userguide/defining_workflows/index.rst @@ -530,6 +530,111 @@ Example3: 1-to-N dependency :align: center :alt: 1_to_N +Job Splits with calendar +~~~~~~~~~~~~~~~~~~~~~~~~ + +For jobs running at any level, it may be useful to split each task into different parts based on the calendar. +This behaviour can be achieved setting the SPLITS: to "auto" and using the %EXPERIMENT.SPLITSIZE% and %EXPERIMENT.SPLITSIZEUNIT% variables. + +Example4: Auto split + +.. code-block:: yaml + + experiment: + DATELIST: 19900101 + MEMBERS: fc0 + # Chunk size unit. STRING: hour, day, month, year + CHUNKSIZEUNIT: month + # Split size unit. STRING: hour, day, month, year and lower than CHUNKSIZEUNIT + SPLITSIZEUNIT: day # default CHUNKSIZEUNIT-1 (month-1 == day) + # Chunk size. NUMERIC: 4, 6, 12 + CHUNKSIZE: 1 + # Split size. NUMERIC: 4, 6, 12 + SPLITSIZE: 15 + # Split policy. STRING: flexible, strict + SPLITPOLICY: flexible + # Total number of chunks in experiment. NUMERIC: 30, 15, 10 + NUMCHUNKS: 2 + # Calendar used. LIST: standard, noleap + CALENDAR: standard + + + JOBS: + APP: + FILE: app.sh + FOR: + DEPENDENCIES: + - APP_ENERGY_ONSHORE: + SPLITS_FROM: + all: + SPLITS_TO: previous + OPA_ENERGY_ONSHORE_1: + SPLITS_FROM: + all: + SPLITS_TO: all + OPA_ENERGY_ONSHORE_2: + SPLITS_FROM: + all: + SPLITS_TO: all + NAME: '%RUN.APP_NAMES%' + SPLITS: '1' + PLATFORM: 'local' + RUNNING: chunk + WALLCLOCK: 00:05 + DN: + DEPENDENCIES: + APP_ENERGY_ONSHORE-1: + SPLITS_TO: '1' + DN: + SPLITS_FROM: + all: + SPLITS_TO: previous + FILE: dn.sh + PLATFORM: 'local' + RUNNING: chunk + SPLITS: auto + WALLCLOCK: 00:05 + OPA: + CHECK: on_submission + FILE: opa.sh + FOR: + DEPENDENCIES: + - DN: + SPLITS_FROM: + all: + SPLITS_TO: "[1:%JOBS.DN.SPLITS%]*\\1" + OPA_ENERGY_ONSHORE_1: + SPLITS_FROM: + all: + SPLITS_TO: previous + - DN: + SPLITS_FROM: + all: + SPLITS_TO: "[1:%JOBS.DN.SPLITS%]*\\1" + OPA_ENERGY_ONSHORE_2: + SPLITS_FROM: + all: + SPLITS_TO: previous + NAME: '%RUN.OPA_NAMES%' + SPLITS: '[auto, auto]' + PLATFORM: 'local' + RUNNING: chunk + WALLCLOCK: 00:05 + RUN: + APP_NAMES: + - ENERGY_ONSHORE + OPA_NAMES: + - energy_onshore_1 + - energy_onshore_2 + + + +.. figure:: fig/splits_auto.png + :name: auto + :width: 100% + :align: center + :alt: auto + Job delay ~~~~~~~~~