diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index c07b930e1012014381087465785b43f7e5a3f354..a915e2219859005151b8eba5629c15c6587fc17f 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1549,6 +1549,8 @@ class Autosubmit: date_format = 'M' wrapper_jobs = dict() for wrapper_section, wrapper_data in as_conf.experiment_data.get("WRAPPERS", {}).items(): + if type(wrapper_data) is not dict: + continue wrapper_jobs[wrapper_section] = as_conf.get_wrapper_jobs(wrapper_data) Log.warning("Aux Job_list was generated successfully") submitter = Autosubmit._get_submitter(as_conf) @@ -4490,6 +4492,9 @@ class Autosubmit: wrapper_jobs = dict() for wrapper_name, wrapper_parameters in as_conf.get_wrappers().items(): + #continue if it is a global option (non-dict) + if type(wrapper_parameters) is not dict: + continue wrapper_jobs[wrapper_name] = as_conf.get_wrapper_jobs(wrapper_parameters) job_list.generate(date_list, member_list, num_chunks, chunk_ini, parameters, date_format, diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index acdad4fb49a3ba62de242859085fdb160f2bda53..495288ee3567f3704a07653ece544f226fb5bd08 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -1163,6 +1163,8 @@ class Job(object): parameters['WRAPPER' + "_EXTENSIBLE"] = as_conf.get_extensible_wallclock() for wrapper_section,wrapper_val in wrappers.items(): + if type(wrapper_val) is not dict: + continue parameters[wrapper_section] = as_conf.get_wrapper_type(as_conf.experiment_data["WRAPPERS"].get(wrapper_section)) parameters[wrapper_section+"_POLICY"] = as_conf.get_wrapper_policy(as_conf.experiment_data["WRAPPERS"].get(wrapper_section)) parameters[wrapper_section+"_METHOD"] = as_conf.get_wrapper_method(as_conf.experiment_data["WRAPPERS"].get(wrapper_section)).lower() @@ -1967,10 +1969,15 @@ class WrapperJob(Job): self._platform.send_command( self._platform.cancel_cmd + " " + str(self.id)) for job in self.job_list: + #if job.status == Status.RUNNING: + #job.inc_fail_count() + # job.packed = False + # job.status = Status.FAILED if job.status not in [Status.COMPLETED, Status.FAILED]: job.packed = False job.status = Status.WAITING + def _update_completed_jobs(self): for job in self.job_list: if job.status == Status.RUNNING: diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index 5cf8b2964dbec35a47141355b91c2348ffa8b333..85b81bbc9bebb1c784fbc91f4d307ae4fa799111 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -401,6 +401,8 @@ class DicJobs: job.wallclock = "01:59" elif job.wallclock is None and job.platform_name.upper() != "LOCAL": job.wallclock = "00:00" + elif job.wallclock is None: + job.wallclock = "00:00" if job.retrials == -1: job.retrials = None notify_on = parameters[section].get("NOTIFY_ON",None) diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 4607dae12b7f01bf70deb11e0a848e30157631b3..f0eeeb1fe41ea4a08502f475a3a1f3e1ac3b1d7b 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -88,6 +88,7 @@ class JobPackager(object): #todo add default values + #Wrapper building starts here for wrapper_section,wrapper_data in self._as_config.experiment_data.get("WRAPPERS",{}).items(): if isinstance(wrapper_data,collections.abc.Mapping ): self.wrapper_type[wrapper_section] = self._as_config.get_wrapper_type(wrapper_data) @@ -213,11 +214,13 @@ class JobPackager(object): for job in failed_wrapped_jobs: job.packed = False jobs_to_submit_by_section = self._divide_list_by_section(jobs_to_submit) - # create wrapped package jobs + # create wrapped package jobs Wrapper building starts here for wrapper_name,section_jobs in jobs_to_submit_by_section.items(): self.current_wrapper_section = wrapper_name for section,jobs in section_jobs.items(): if len(jobs) > 0: + if not self._platform.allow_wrappers: + Log.warning("Platform {0} does not allow wrappers, submitting jobs individually".format(self._platform.name)) if wrapper_name != "SIMPLE" and self._platform.allow_wrappers and self.wrapper_type[self.current_wrapper_section] in ['horizontal', 'vertical','vertical-horizontal', 'horizontal-vertical'] : # Trying to find the value in jobs_parser, if not, default to an autosubmit_.yml value (Looks first in [wrapper] section) wrapper_limits = dict() @@ -547,7 +550,10 @@ class JobPackager(object): for job in jobs_list: if job.section.upper() in section_name.split("&"): jobs_by_section[wrapper_name][section_name].append(job) - remaining_jobs.remove(job) + try: + remaining_jobs.remove(job) + except ValueError: + pass for job in remaining_jobs: jobs_by_section["SIMPLE"][job.section].append(job) return jobs_by_section @@ -658,11 +664,11 @@ class JobPackager(object): horizontal_packager.wrapper_limits["max_by_section"][section] = horizontal_packager.wrapper_limits["max_by_section"][section] - 1 horizontal_packager.wrapper_limits["max"] = horizontal_packager.wrapper_limits["max"] - actual_wrapped_jobs for job in horizontal_package: - job_list = JobPackagerVertical([job], job.wallclock, horizontal_packager.wrapper_limits["max"], - horizontal_packager.wrapper_limits, - self._platform.max_wallclock, self.wrapper_type).build_vertical_package(job) - - current_package.append(job_list) + dict_jobs = self._jobs_list.get_ordered_jobs_by_date_member(self.current_wrapper_section) + job_list = JobPackagerVerticalMixed(dict_jobs, job, [job], job.wallclock, + horizontal_packager.wrapper_limits["max"], horizontal_packager.wrapper_limits, + self._platform.max_wallclock).build_vertical_package(job) + current_package.append(list(set(job_list))) for job in current_package[-1]: total_wallclock = sum_str_hours(total_wallclock, job.wallclock) @@ -673,7 +679,7 @@ class JobPackager(object): return JobPackageVerticalHorizontal(current_package, total_processors, total_wallclock, jobs_resources=jobs_resources, method=self.wrapper_method[self.current_wrapper_section], configuration=self._as_config, wrapper_section=self.current_wrapper_section ) - +#TODO rename and unite JobPackerVerticalMixed to JobPackerVertical since the distinguisment between the two is not needed anymore class JobPackagerVertical(object): """ Vertical Packager Parent Class @@ -734,9 +740,23 @@ class JobPackagerVertical(object): pass def _is_wrappable(self, job): - pass - + """ + Determines if a job is wrappable. Basically, the job shouldn't have been packed already and the status must be READY or WAITING, + Its parents should be COMPLETED. + :param job: job to be evaluated. \n + :type job: Job Object \n + :return: True if wrappable, False otherwise. \n + :rtype: Boolean + """ + if job.packed is False and (job.status == Status.READY or job.status == Status.WAITING): + for parent in job.parents: + # First part of this conditional is true only if the parent is already on the wrapper package ( job_lists == current_wrapped jobs there ) + # Second part is actually relevant, parents of a wrapper should be COMPLETED + if parent not in self.jobs_list and parent.status != Status.COMPLETED: + return False + return True + return False class JobPackagerVerticalMixed(JobPackagerVertical): """ @@ -776,6 +796,7 @@ class JobPackagerVerticalMixed(JobPackagerVertical): self.sorted_jobs = dict_jobs[date][member] self.index = 0 + def get_wrappable_child(self, job): """ Goes through the jobs with the same date and member than the input job, and return the first that satisfies self._is_wrappable() @@ -785,7 +806,6 @@ class JobPackagerVerticalMixed(JobPackagerVertical): :return: job that is wrappable. \n :rtype: Job Object """ - # Unnecessary assignment sorted_jobs = self.sorted_jobs for index in range(self.index, len(sorted_jobs)): @@ -795,6 +815,14 @@ class JobPackagerVerticalMixed(JobPackagerVertical): return child continue return None + # Not passing tests but better wrappers result to check + # for child in job.children: + # if child.name != job.name: + # if self._is_wrappable(child): + # self.index = self.index + 1 + # return child + # continue + # return None def _is_wrappable(self, job): """ @@ -808,7 +836,7 @@ class JobPackagerVerticalMixed(JobPackagerVertical): """ if job.packed is False and (job.status == Status.READY or job.status == Status.WAITING): for parent in job.parents: - # First part of this conditional is always going to be true because otherwise there would be a cycle + # First part of this conditional is true only if the parent is already on the wrapper package ( job_lists == current_wrapped jobs there ) # Second part is actually relevant, parents of a wrapper should be COMPLETED if parent not in self.jobs_list and parent.status != Status.COMPLETED: return False diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 2a5878e41856faa704af1ae9869748f6c7ebb1ef..4c3cc8a5fef0cacc91802345887c64ca0d61c009 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -280,7 +280,7 @@ class JobPackageArray(JobPackageBase): self._common_script = None self._array_size_id = "[1-" + str(len(jobs)) + "]" self._wallclock = '00:00' - self._num_processors = '1' + self._num_processors = '0' for job in jobs: if job.wallclock > self._wallclock: self._wallclock = job.wallclock @@ -363,7 +363,7 @@ class JobPackageThread(JobPackageBase): self._job_dependency = dependency self._common_script = None self._wallclock = '00:00' - self._num_processors = '1' + self._num_processors = '0' self._jobs_resources = jobs_resources self._wrapper_factory = self.platform.wrapper self.current_wrapper_section = wrapper_section @@ -410,7 +410,7 @@ class JobPackageThread(JobPackageBase): return jobs_scripts @property def queue(self): - if str(self._num_processors) == '1': + if str(self._num_processors) == '1' or str(self._num_processors) == '0': return self.platform.serial_platform.serial_queue else: return self._queue @@ -522,7 +522,7 @@ class JobPackageThreadWrapped(JobPackageThread): @property def queue(self): - if str(self._num_processors) == '1': + if str(self._num_processors) == '1' or str(self._num_processors) == '0': return self.platform.serial_platform.serial_queue else: return self.platform.queue @@ -682,7 +682,7 @@ class JobPackageHorizontal(JobPackageThread): num_processors=self._num_processors, jobs_scripts=self._jobs_scripts, dependency=self._job_dependency, jobs_resources=self._jobs_resources, expid=self._expid, rootdir=self.platform.root_dir, - directives=self._custom_directives,threads=self._threads,method=self.method.lower()) + directives=self._custom_directives,threads=self._threads,method=self.method.lower(),partition=self.partition) class JobPackageHybrid(JobPackageThread): """ @@ -727,7 +727,7 @@ class JobPackageVerticalHorizontal(JobPackageHybrid): wallclock=self._wallclock, num_processors=self._num_processors, jobs_scripts=self._jobs_scripts, dependency=self._job_dependency, jobs_resources=self._jobs_resources, expid=self._expid, - rootdir=self.platform.root_dir, directives=self._custom_directives,threads=self._threads,method=self.method.lower()) + rootdir=self.platform.root_dir, directives=self._custom_directives,threads=self._threads,method=self.method.lower(),partition=self.partition) class JobPackageHorizontalVertical(JobPackageHybrid): @@ -738,5 +738,5 @@ class JobPackageHorizontalVertical(JobPackageHybrid): wallclock=self._wallclock, num_processors=self._num_processors, jobs_scripts=self._jobs_scripts, dependency=self._job_dependency, jobs_resources=self._jobs_resources, expid=self._expid, - rootdir=self.platform.root_dir, directives=self._custom_directives,threads=self._threads,method=self.method.lower()) + rootdir=self.platform.root_dir, directives=self._custom_directives,threads=self._threads,method=self.method.lower(),partition=self.partition) diff --git a/autosubmit/platforms/ecplatform.py b/autosubmit/platforms/ecplatform.py index 0533e175592a35ab949a879c595be5f3cf80a23f..8b3849715ad210c4247747bd7c6246d516b9caae 100644 --- a/autosubmit/platforms/ecplatform.py +++ b/autosubmit/platforms/ecplatform.py @@ -102,8 +102,8 @@ class EcPlatform(ParamikoPlatform): self.mkdir_cmd = ("ecaccess-file-mkdir " + self.host + ":" + self.scratch + "/" + self.project_dir + "/" + self.user + "/" + self.expid + "; " + "ecaccess-file-mkdir " + self.host + ":" + self.remote_log_dir) - self.check_remote_permissions_cmd = "ecaccess-file-mkdir " + os.path.join(self.scratch,self.project_dir,self.user,"_permission_checker_azxbyc") - self.check_remote_permissions_remove_cmd = "ecaccess-file-rmdir " + os.path.join(self.scratch,self.project_dir,self.user,"_permission_checker_azxbyc") + self.check_remote_permissions_cmd = "ecaccess-file-mkdir " + self.host+":"+os.path.join(self.scratch,self.project_dir,self.user,"_permission_checker_azxbyc") + self.check_remote_permissions_remove_cmd = "ecaccess-file-rmdir " + self.host+":"+os.path.join(self.scratch,self.project_dir,self.user,"_permission_checker_azxbyc") def get_checkhost_cmd(self): return self._checkhost_cmd diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index df5414469dd972f816fee85c919c4c8d874179bd..a09b93f7a6e1768a25e9236d345fa28a7c1ed76e 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -534,8 +534,8 @@ class ParamikoPlatform(Platform): job.start_time = datetime.datetime.now() # URi: start time if job.start_time is not None and str(job.wrapper_type).lower() == "none": wallclock = job.wallclock - if job.wallclock == "00:00": - wallclock == job.platform.max_wallclock + if job.wallclock == "00:00" or job.wallclock is None: + wallclock = job.platform.max_wallclock if wallclock != "00:00" and wallclock != "00:00:00" and wallclock != "": if job.is_over_wallclock(job.start_time,wallclock): try: diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index 082f09dd62530ae35139f4fb29ec0521bbcbacef..53d0d115d9841d84dcb1babdf3f1b9af1cfd1b11 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -622,7 +622,7 @@ class SlurmPlatform(ParamikoPlatform): @staticmethod def allocated_nodes(): - return """os.system("scontrol show hostnames $SLURM_JOB_NODELIST > node_list")""" + return """os.system("scontrol show hostnames $SLURM_JOB_NODELIST > node_list_{0}".format(node_id))""" def check_file_exists(self, filename,wrapper_failed=False): file_exist = False diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index 9b2ffb3f9c17fc857640ee1c8c786f8df4eeebb8..15379eddecc3e237766f141b9859165c9cd63a5f 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -108,7 +108,7 @@ class PythonWrapperBuilder(WrapperBuilder): sample_list = list(sample_str) random.shuffle(sample_list) final_string = ''.join(sample_list) - return final_string+"_FAILED" + return final_string def build_imports(self): return textwrap.dedent(""" @@ -133,9 +133,9 @@ class PythonWrapperBuilder(WrapperBuilder): self.stream.flush() def __getattr__(self, attr): return getattr(self.stream, attr) - sys.stdout = Unbuffered(sys.stdout) - wrapper_id = "{1}" + node_id = "{1}" + wrapper_id = "{1}_FAILED" # Defining scripts to be run scripts= {0} """).format(str(self.job_scripts), self.get_random_alphanumeric_string(5,5),'\n'.ljust(13)) @@ -172,10 +172,15 @@ class PythonWrapperBuilder(WrapperBuilder): {0} os.system("mkdir -p machinefiles") - with open('node_list', 'r') as file: + with open("node_list_{{0}}".format(node_id), 'r') as file: all_nodes = file.read() + os.remove("node_list_{{0}}".format(node_id)) + all_nodes = all_nodes.split("_NEWLINE_") + if all_nodes[-1] == "": + all_nodes = all_nodes[:-1] + print(all_nodes) """).format(self.allocated_nodes, '\n'.ljust(13)) def build_cores_list(self): @@ -191,9 +196,11 @@ while total_cores > 0: total_cores -= 1 all_cores.append(all_nodes[idx]) else: - idx += 1 + if idx < len(all_nodes)-1: + idx += 1 processors_per_node = int(jobs_resources['PROCESSORS_PER_NODE']) processors_per_node = int(jobs_resources['PROCESSORS_PER_NODE']) + """).format(self.num_procs, str(self.jobs_resources), '\n'.ljust(13)) def build_machinefiles(self): @@ -312,6 +319,7 @@ processors_per_node = int(jobs_resources['PROCESSORS_PER_NODE']) parallel_threads_launcher = textwrap.dedent(""" pid_list = [] for i in range(len({0})): + print("Starting job ", {0}[i]) if type({0}[i]) != list: job = {0}[i] jobname = job.replace(".cmd", '') @@ -344,6 +352,8 @@ for i in range(len(pid_list)): parallel_threads_launcher = textwrap.dedent(""" pid_list = [] for i in range(len({0})): + print("Starting job ", {0}[i]) + if type({0}[i]) != list: job = {0}[i] jobname = job.replace(".cmd", '') @@ -380,6 +390,8 @@ for i in range(len(pid_list)): parallel_threads_launcher = textwrap.dedent(""" pid_list = [] for i in range(len({0})): + print("Starting job ", {0}[i]) + if type({0}[i]) != list: job = {0}[i] jobname = job.replace(".cmd", '') @@ -537,6 +549,7 @@ class PythonHorizontalVerticalWrapperBuilder(PythonWrapperBuilder): parallel_threads_launcher = textwrap.dedent(""" pid_list = [] for i in range(len({0})): + print("Starting job ", {0}[i]) if type({0}[i]) != list: job = {0}[i] jobname = job.replace(".cmd", '') @@ -678,10 +691,14 @@ class SrunWrapperBuilder(WrapperBuilder): {0} os.system("mkdir -p machinefiles") - with open('node_list', 'r') as file: + with open("node_list_{{0}}".format(node_id), 'r') as file: all_nodes = file.read() + os.remove("node_list_{{0}}".format(node_id)) all_nodes = all_nodes.split("_NEWLINE_") + if all_nodes[-1] == "": + all_nodes = all_nodes[:-1] + print(all_nodes) """).format(self.allocated_nodes, '\n'.ljust(13)) def build_cores_list(self): @@ -697,8 +714,10 @@ while total_cores > 0: total_cores -= 1 all_cores.append(all_nodes[idx]) else: - idx += 1 + if idx < len(all_nodes)-1: + idx += 1 processors_per_node = int(jobs_resources['PROCESSORS_PER_NODE']) + processors_per_node = int(jobs_resources['PROCESSORS_PER_NODE']) """).format(self.num_procs, str(self.jobs_resources), '\n'.ljust(13)) diff --git a/docs/source/userguide/set and share the configuration/fig/advanced_conf.jpg b/docs/source/userguide/set and share the configuration/fig/advanced_conf.jpg new file mode 100644 index 0000000000000000000000000000000000000000..02321ffd0e06c0ce57a9c2fb3bc88acf12f0cd9d Binary files /dev/null and b/docs/source/userguide/set and share the configuration/fig/advanced_conf.jpg differ diff --git a/docs/source/userguide/set and share the configuration/index.rst b/docs/source/userguide/set and share the configuration/index.rst index ce686625cc4ffea5e463a9d715d47668fbae62ea..394740a04ffd2ac36fb2a77aba1545f8cb8a6c4e 100644 --- a/docs/source/userguide/set and share the configuration/index.rst +++ b/docs/source/userguide/set and share the configuration/index.rst @@ -240,8 +240,18 @@ Model configuration is distributed at `git. -cw # Unstarted experiment + autosubmit monitor -cw # Ongoing experiment + autosubmit inspect -cw -f # Visualize wrapper cmds -In ``autosubmit_cxxx.yml``, regardless of the wrapper type, you need to make sure that the values of the variables **MAXWAITINGJOBS** and **TOTALJOBS** are increased according to the number of jobs expected to be waiting/running at the same time in your experiment. +Basic configuration +=================== -For example: +To configure a new wrapper, the user has to define a `WRAPPERS` section in any configuration file. When using the standard configuration, this one is autosubmit.yml. -.. code-block:: yaml +.. code-block:: YAML - config: - EXPID: .... - AUTOSUBMIT_VERSION: 4.0.0 - ... + WRAPPERS: + WRAPPER_0: + TYPE: "horizontal" - MAXWAITINGJOBS: 100 - TOTALJOBS: 100 - ... +By default, Autosubmit will try to bundle jobs of the same type. The user can alter this behavior by setting the `JOBS_IN_WRAPPER` parameter directive in the wrapper section. -and below the config: block, add the wrapper directive, indicating the wrapper type: +When using multiple wrappers or 2-dim wrappers is essential to define the `JOBS_IN_WRAPPER` parameter. -.. code-block:: yaml +.. code-block:: YAML - wrappers: - wrapper: - TYPE: - JOBS_IN_WRAPPER: + WRAPPERS: + WRAPPER_H: + TYPE: "horizontal" + JOBS_IN_WRAPPER: "SIM" + WRAPPER_V: + TYPE: "vertical" + JOBS_IN_WRAPPER: "SIM2" + WRAPPER_VH: + TYPE: "vertical-horizontal" + JOBS_IN_WRAPPER: "SIM3 SIM4" + WRAPPER_HV: + TYPE: "horizontal-vertical" + JOBS_IN_WRAPPER: "SIM5 SIM6" + +.. figure:: fig/wrapper_all.png + :name: wrapper all + :align: center + :alt: wrapper all -You can also specify which job types should be wrapped. This can be done using the **JOBS_IN_WRAPPER** parameter. -It is only required for the vertical-mixed type (in which the specified job types will be wrapped together), so if nothing is specified, all jobs will be wrapped. -By default, jobs of the same type will be wrapped together, as long as the constraints are satisfied. +.. important:: Autosubmit will not wrap tasks with external and non-fulfilled dependencies. -Number of jobs in a package -~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Wrapper parameters description +------------------------------ -.. code-block:: yaml +Type +~~~~ - wrappers: - wrapper: - TYPE: - MIN_WRAPPED: 2 - MAX_WRAPPED: 999 - POLICY: flexible #default is flexible. Values: flexible,strict,mixed +The type parameter allow the user to determine the wrapper algorithm. +It affects tasks in wrapper order executions, and in hybrid cases, it adds some internal logic. -- **MAX_WRAPPED** can be defined in ``jobs_cxxx.yml`` in order to limit the number of jobs wrapped for the corresponding job section - - If not defined, it considers the **MAX_WRAPPED** defined under wrapper: in ``autosubmit_cxxx.yml`` - - If **MAX_WRAPPED** is not defined, then **TOTALJOBS** is used by default -- **MIN_WRAPPED** can be defined in ``autosubmit_cxxx.yml`` in order to limit the minimum number of jobs that a wrapper can contain - - If not defined, it considers that **MIN_WRAPPED** is 2. - - If **POLICY** is flexible and it is not possible to wrap **MIN_WRAPPED** or more tasks, these tasks will be submitted as individual jobs, as long as the condition is not satisfied. - - If **POLICY** is mixed and there are failed jobs inside a wrapper, these jobs will be submitted as individual jobs. - - If **POLICY** is strict and it is not possible to wrap **MIN_WRAPPED** or more tasks, these tasks will not be submitted until there are enough tasks to build a package. - - Strict and mixed policies can cause **deadlocks**. +.. code-block:: YAML + WRAPPERS: + WRAPPER_0: + TYPE: "horizontal" -Wrapper check time -~~~~~~~~~~~~~~~~~~ +Jobs_in_wrapper +~~~~~~~~~~~~~~~ -It is possible to override the **SAFETYSLEEPTIME** for the wrapper, by using **CHECK_TIME_WRAPPER** and defining a time interval (in seconds) in which the wrapper internal jobs should be checked. +The jobs_in_wrapper parameter allow the user to determine the tasks inside a wrapper by giving the job_section name. It can group multiple tasks by providing more than one job_section name. -.. important:: Note that the **numbers** shown in this documentation are examples. The actual values must be set according to the specific workflow, as well as the platform configurations. +.. code-block:: YAML -Vertical wrapper ----------------- + WRAPPERS: + WRAPPER_0: + TYPE: "horizontal" + JOBS_IN_WRAPPER: "SIM" + -The vertical wrapper is more appropriate when there are many sequential jobs. To use it, set TYPE: vertical: +Method +~~~~~~ -.. code-block:: yaml +The method parameter allow the user to determine if the wrapper will use machine files or threads. - wrappers: - wrapper: - TYPE: vertical +This allows to form a wrapper with that relies on machinefiles to work. -In order to be able to use the vertical wrapper, in ``platforms_cxxx.yml`` set the maximum wallclock allowed by the platform in use: +.. code-block:: YAML -.. code-block:: yaml + WRAPPERS: + WRAPPER_0: + TYPE: "horizontal" + JOBS_IN_WRAPPER: "SIM" + METHOD: ASTHREAD - marenostrum4: - ... - MAX_WALLCLOCK: 72:00 +or -Remember to add to each job the corresponding WALLCLOCK time. +.. code-block:: YAML -Vertical with multiple sections -------------------------------- + WRAPPERS: + WRAPPER_0: + TYPE: "horizontal" + JOBS_IN_WRAPPER: "SIM" -This is a mode of the vertical wrapper that allows jobs of different types to be wrapped together. -Note that the solution considers the order of the sections defined in the ``jobs_cxxx.yml`` file, so the order of the sections given in **JOBS_IN_WRAPPER** is irrelevant. -Additionally, jobs are grouped within the corresponding date, member and chunk hierarchy. +This allows to form a wrapper with shared-memory paradigm instead of rely in machinefiles to work in parallel. -.. code-block:: yaml - wrappers: - wrapper: - TYPE: vertical - JOBS_IN_WRAPPER: SIM&SIM2 # REQUIRED +.. code-block:: YAML -.. figure:: fig/vertical-mixed.png - :name: vertical-mixed - :width: 100% - :align: center - :alt: vertical-mixed wrapper + WRAPPERS: + WRAPPER_0: + TYPE: "horizontal" + JOBS_IN_WRAPPER: "SIM" + METHOD: SRUN -Horizontal wrapper ------------------- +Extend_wallclock +~~~~~~~~~~~~~~~~ -The horizontal wrapper is more appropriate when there are multiple ensemble members that can be run in parallel. +The extend_wallclock parameter allow the users to provide extra headroom for the wrapper. The accepted value is an integer. Autosubmit will translate this value automatically to the max_wallclock of the sum of wrapper inner-tasks wallclock at the horizontal level. -If the wrapped jobs have an mpirun call, they will need machine files to specify in which nodes each job will run. -Different cases may need specific approaches when creating the machine files. For auto-ecearth use COMPONENTS instead of STANDARD. +.. code-block:: YAML -.. code-block:: yaml + WRAPPERS: + WRAPPER_0: + TYPE: "horizontal" + JOBS_IN_WRAPPER: "SIM" + extend_wallclock: 1 - wrappers: - wrapper: - TYPE: horizontal - JOBS_IN_WRAPPER: SIM +Retrials +~~~~~~~~ +The retrials parameter allows the users to enable or disable the wrapper's retrial mechanism. This value overrides the general tasks defined. +Vertical wrappers will retry the jobs without resubmitting the wrapper. -In order to be able to use the horizontal wrapper, in ``platforms_cxxx.yml`` set the maximum number of processors allowed by the platform in use: +.. code-block:: YAML -.. code-block:: yaml + WRAPPERS: + WRAPPER_0: + TYPE: "horizontal" + JOBS_IN_WRAPPER: "SIM" + RETRIALS: 2 - marenostrum4: - ... - MAX_PROCESSORS: 2400 +Queue +~~~~~~ -.. figure:: fig/horizontal_remote.png - :name: horizontal_remote - :width: 60% - :align: center - :alt: horizontally wrapped jobs +The queue parameter allows the users to define a different queue for the wrapper. This value overrides the platform queue and job queue. -Shared-memory Experiments -~~~~~~~~~~~~~~~~~~~~~~~~~ +.. code-block:: YAML -There is also the possibility of setting the option **METHOD** to SRUN in the wrapper directive (**ONLY** for vertical and vertical-horizontal wrappers). + WRAPPERS: + WRAPPER_0: + TYPE: "horizontal" + JOBS_IN_WRAPPER: "SIM" + QUEUE: BSC_ES -This allows to form a wrapper with shared-memory paradigm instead of rely in machinefiles to work in parallel. +Export +~~~~~~ -.. code-block:: yaml +The queue parameter allows the users to define a path to a script that will load environment scripts before running the wrapper tasks. This value overrides the job queue. - wrappers: - wrapper: +.. code-block:: YAML + + WRAPPERS: + WRAPPER_0: + TYPE: "horizontal" + JOBS_IN_WRAPPER: "SIM" + EXPORT: %CURRENT_ROOTDIR%/envmodules.sh - TYPE: vertical - METHOD: srun # default ASTHREAD -Hybrid wrapper --------------- -The hybrid wrapper is a wrapper that works both vertically and horizontally at the same time, meaning that members and chunks can be wrapped in one single job. -Mixed approach using a combination of horizontal and vertical wrappers and the list of jobs is a list of lists. +Check_time_wrapper +~~~~~~~~~~~~~~~~~~ -Horizontal-vertical -------------------- +The CHECK_TIME_WRAPPER parameter defines the frequency, in seconds, on which Autosubmit will check the remote platform status of all the wrapper tasks. This affects all wrappers. -- There is a dependency between lists. Each list runs after the previous one finishes; the jobs within the list run in parallel at the same time -- It is particularly suitable if there are jobs of different types in the list with different wall clocks, but dependencies between jobs of different lists; it waits for all the jobs in the list to finish before starting the next list +.. code-block:: YAML + WRAPPERS: + CHECK_TIME_WRAPPER: 10 + WRAPPER_0: + TYPE: "horizontal" + JOBS_IN_WRAPPER: "SIM" + WRAPPER_1: + TYPE: "vertical" + JOBS_IN_WRAPPER: "SIM1" -.. code-block:: yaml +Number of jobs in a wrapper({MIN/MAX}_WRAPPED{_H/_V} +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - wrappers: - wrapper: - TYPE: horizontal-vertical - MACHINEFILES: STANDARD - JOBS_IN_WRAPPER: SIM&DA - -.. figure:: fig/dasim.png - :name: wrapper_horizontal_vertical - :width: 100% + +Users can configure the maximum and the minimum number of jobs in each wrapper by configuring MAX_WRAPPED and MIN_WRAPPED inside the wrapper section. If the user doesn't set them, Autosubmit will default to MAX_WRAPPED: “infinite” and MIN_WRAPPED: 2. + +.. code-block:: YAML + + WRAPPERS: + MIN_WRAPPED: 2 + MAX_WRAPPED: 999999 + WRAPPER_0: + MAX_WRAPPED: 2 + TYPE: "horizontal" + JOBS_IN_WRAPPER: "SIM" + WRAPPER_1: + TYPE: "vertical" + JOBS_IN_WRAPPER: "SIM1" + +For 2-dim wrappers, {MAX_MIN}_WRAPPED_{V/H} must be used instead of the general one. + +.. code-block:: YAML + + WRAPPERS: + MIN_WRAPPED: 2 + MAX_WRAPPED: 999999 + WRAPPER_0: + MAX_WRAPPED_H: 2 + MAX_WRAPPED_V: 4 + MIN_WRAPPED_H: 2 + MIN_WRAPPED_V: 2 + TYPE: "horizontal-vertical" + JOBS_IN_WRAPPER: "SIM SIM1" + +Policy +~~~~~~ + + +Autosubmit will wrap as many tasks as possible while respecting the limits set in the configuration(MAX_WRAPPED, MAX_WRAPPED_H, MAX_WRAPPED_V, MIN_WRAPPED, MIN_WRAPPED_V, and MIN_WRAPPED_H parameters). However, users have three different policies available to tune the behavior in situations where there aren’t enough tasks in general, or there are uncompleted tasks remaining from a failed wrapper job: + +* Flexible: if there aren’t at least MIN_WRAPPED tasks to be grouped, Autosubmit will submit them as individual jobs. +* Mixed: will wait for MIN_WRAPPED jobs to be available to create a wrapper, except if one of the wrapped tasks had failed beforehand. In this case, Autosubmit will submit them individually. +* Strict: will always wait for MIN_WRAPPED tasks to be ready to create a wrapper. + + +.. warning: Mixed and strict policies can cause deadlocks. + +.. code-block:: YAML + + WRAPPERS: + POLICY: "flexible" + WRAPPER_0: + TYPE: "vertical" + JOBS_IN_WRAPPER: "SIM SIM1" + +.. _Vertical: + +Vertical wrapper +================ + +Vertical wrappers are suited for sequential dependent jobs (e.x. chunks of SIM tasks that depend on the previous chunk). Defining the platform’s `MAX_WALLCLOCK` is essential since the wrapper's total wallclock time will be the sum of each job and will be a limiting factor for the creation of the wrapper, which will not bundle more jobs than the ones fitting in the wallclock time. + +Autosubmit supports wrapping together vertically jobs of different types. + +.. code-block:: YAML + + WRAPPERS: + WRAPPER_V: + TYPE: "vertical" + JOBS_IN_WRAPPER: "SIM" + +.. figure:: fig/wrapper_v.png + :name: wrapper vertical :align: center - :alt: hybrid wrapper + :alt: wrapper vertical +.. _Horizontal: -Vertical-horizontal -------------------- +Horizontal wrapper +================== -- In this approach, each list is independent of each other and run in parallel; jobs within the list run one after the other -- It is particularly suitable for running many sequential ensembles +Horizontal wrappers are suited for jobs that must run parallel (e.x. members of SIM tasks). Defining the platform’s `MAX_PROCESSORS` is essential since the wrapper processor amount will be the sum of each job and will be a limiting factor for the creation of the wrapper, which will not bundle more jobs than the ones fitting in the `MAX_PROCESSORS` of the platform. +.. code-block:: YAML -.. code-block:: yaml + WRAPPERS: + WRAPPER_H: + TYPE: "horizontal" + JOBS_IN_WRAPPER: "SIM" - wrappers: - wrapper: - TYPE: vertical-horizontal - MACHINEFILES: STANDARD - JOBS_IN_WRAPPER: SIM - -.. figure:: fig/vertical-horizontal.png - :name: wrapper_vertical_horizontal - :width: 100% + +.. figure:: fig/wrapper_h.png + :name: wrapper horizontal :align: center - :alt: hybrid wrapper + :alt: wrapper horizontal -Multiple wrappers at once -------------------------- -This is an special mode that allows you to use multiple **independent** wrappers on the same experiment. By using an special variable that allows to define subwrapper sections +.. _Vertical-horizontal: -.. code-block:: yaml +Vertical-horizontal wrapper +=========================== - wrappers: - wrapper_0: - TYPE: vertical - JOBS_IN_WRAPPER: SIM +The vertical-horizontal wrapper allows bundling together a vertical sequence of tasks independent of the horizontal ones. Therefore, all horizontal tasks do not need to finish to progress to the next horizontal level. + +.. figure:: fig/wrapper_vh.png + :name: wrapper vertical-horizontal + :align: center + :alt: wrapper vertical-horizontal + + +.. _Horizontal-vertical: - wrapper_1: - TYPE: vertical - JOBS_IN_WRAPPER: DA&REDUCE +Horizontal-vertical wrapper +=========================== -.. figure:: fig/multiple_wrappers.png - :name: - :width: 100% +The horizontal-vertical wrapper allows bundling together tasks that could run simultaneously but need to communicate before progressing to the next horizontal level. + + +.. figure:: fig/wrapper_hv.png + :name: wrapper horizontal-vertical :align: center - :alt: multi wrapper + :alt: wrapper horizontal-vertical -Summary -------- -In `autosubmit_cxxx.yml`: -.. code-block:: YAML +Advanced example: Set-up an crossdate wrapper +--------------------------------------------- + +Considering the following configuration: + +.. code-block:: yaml + + experiment: + DATELIST: 20120101 20120201 + MEMBERS: "000 001" + CHUNKSIZEUNIT: day + CHUNKSIZE: '1' + NUMCHUNKS: '3' + + JOBS: + LOCAL_SETUP: + FILE: templates/local_setup.sh + PLATFORM: marenostrum_archive + RUNNING: once + NOTIFY_ON: COMPLETED + LOCAL_SEND_SOURCE: + FILE: templates/01_local_send_source.sh + PLATFORM: marenostrum_archive + DEPENDENCIES: LOCAL_SETUP + RUNNING: once + NOTIFY_ON: FAILED + LOCAL_SEND_STATIC: + FILE: templates/01b_local_send_static.sh + PLATFORM: marenostrum_archive + DEPENDENCIES: LOCAL_SETUP + RUNNING: once + NOTIFY_ON: FAILED + REMOTE_COMPILE: + FILE: templates/02_compile.sh + DEPENDENCIES: LOCAL_SEND_SOURCE + RUNNING: once + PROCESSORS: '4' + WALLCLOCK: 00:50 + NOTIFY_ON: COMPLETED + SIM: + FILE: templates/05b_sim.sh + DEPENDENCIES: + LOCAL_SEND_STATIC: + REMOTE_COMPILE: + SIM-1: + DA-1: + RUNNING: chunk + PROCESSORS: '68' + WALLCLOCK: 00:12 + NOTIFY_ON: FAILED + LOCAL_SEND_INITIAL_DA: + FILE: templates/00b_local_send_initial_DA.sh + PLATFORM: marenostrum_archive + DEPENDENCIES: LOCAL_SETUP LOCAL_SEND_INITIAL_DA-1 + RUNNING: chunk + SYNCHRONIZE: member + DELAY: '0' + COMPILE_DA: + FILE: templates/02b_compile_da.sh + DEPENDENCIES: LOCAL_SEND_SOURCE + RUNNING: once + WALLCLOCK: 00:20 + NOTIFY_ON: FAILED + DA: + FILE: templates/05c_da.sh + DEPENDENCIES: + SIM: + LOCAL_SEND_INITIAL_DA: + CHUNKS_TO: "all" + DATES_TO: "all" + MEMBERS_TO: "all" + COMPILE_DA: + DA: + DATES_FROM: + "20120201": + CHUNKS_FROM: + 1: + DATES_TO: "20120101" + CHUNKS_TO: "1" + RUNNING: chunk + SYNCHRONIZE: member + DELAY: '0' + WALLCLOCK: 00:12 + PROCESSORS: '256' + NOTIFY_ON: FAILED - # Basic Configuration of wrapper - #TYPE: {vertical,horizontal,horizontal-vertical,vertical-horizontal} # REQUIRED - # JOBS_IN_WRAPPER: Sections that should be wrapped together ex SIM - # METHOD: Select between MACHINESFILES or Shared-Memory. - # MIN_WRAPPED set the minim number of jobs that should be included in the wrapper. DEFAULT: 2 - # MAX_WRAPPED set the maxim number of jobs that should be included in the wrapper. DEFAULT: TOTALJOBS - # Policy: Select the behaviour of the inner jobs Strict/Flexible/Mixed - # EXTEND_WALLCLOCK: Allows to extend the wallclock by the max wallclock of the horizontal package (max inner job). Values are integer units (0,1,2) - # RETRIALS: Enables a retrial mechanism for vertical wrappers, or default retrial mechanism for the other wrappers - - wrapperS: - wrapper: - TYPE: Vertical #REQUIRED - JOBS_IN_WRAPPER: SIM # Job types (as defined in jobs_cxxx.yml) separated by space. REQUIRED only if vertical-mixed - MIN_WRAPPED: 2 - MAX_WRAPPED: 9999 # OPTIONAL. Integer value, overrides TOTALJOBS - CHECK_TIME_WRAPPER: # OPTIONAL. Time in seconds, overrides SAFETYSLEEPTIME - POLICY: flexible # OPTIONAL, Wrapper policy, mixed, flexible, strict - QUEUE: bsc_es # If not specified, queue will be the same of the first SECTION specified on JOBS_IN_WRAPPER - #EXPORT: Allows to run an env script or load some modules before running this wrapper. # If not specified, export value will be the same of the first SECTION specified on JOBS_IN_WRAPPER - -In `platforms_cxxx.yml`: .. code-block:: yaml - marenostrum4: - ... - MAX_WALLCLOCK: - MAX_PROCESSORS: - PROCESSORS_PER_NODE: 48 + wrappers: + wrapper_simda: + TYPE: "horizontal-vertical" + JOBS_IN_WRAPPER: "SIM DA" + +.. figure:: fig/monarch-da.png + :name: crossdate-example + :align: center + :alt: crossdate-example diff --git a/test/unit/test_wrappers.py b/test/unit/test_wrappers.py index 495030c77d5c2350262d96bcaa44ce472aaa525b..b3eed1a6073e3f6f4ccc24e11c42174e4c6110bf 100644 --- a/test/unit/test_wrappers.py +++ b/test/unit/test_wrappers.py @@ -11,6 +11,7 @@ from autosubmit.job.job_list_persistence import JobListPersistenceDb from autosubmit.job.job_common import Status from random import randrange from collections import OrderedDict +import pytest class TestWrappers(TestCase):