diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 168edb2560c0add01c63a77236786019e4de1d7c..2e6e15d99dc091c5e98187a3fba59cf323b1dd31 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -2473,7 +2473,7 @@ class JobList(object): continue if status in ["RUNNING", "FAILED"]: # check checkpoint if any - if job.platform.connected: # This will be true only when used under setstatus/run + if job.platform and job.platform.connected: # This will be true only when used under setstatus/run job.get_checkpoint_files() non_completed_parents_current = 0 completed_parents = len([parent for parent in job.parents if parent.status == Status.COMPLETED]) diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 67e833e2719049822cd43220dc0ea68a96bc7ad5..45fa740def9a94266bdc172a926bc9cdc01654af 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -26,8 +26,7 @@ from operator import attrgetter from math import ceil import operator from typing import List -import copy - +from contextlib import suppress class JobPackager(object): @@ -42,44 +41,6 @@ class JobPackager(object): :type jobs_list: JobList object. """ - def calculate_job_limits(self,platform,job=None): - jobs_list = self._jobs_list - # Submitted + Queuing Jobs for specific Platform - queuing_jobs = jobs_list.get_queuing(platform) - # We now consider the running jobs count - running_jobs = jobs_list.get_running(platform) - running_by_id = dict() - for running_job in running_jobs: - running_by_id[running_job.id] = running_job - running_jobs_len = len(running_by_id.keys()) - - queued_by_id = dict() - for queued_job in queuing_jobs: - queued_by_id[queued_job.id] = queued_job - queuing_jobs_len = len(list(queued_by_id.keys())) - - submitted_jobs = jobs_list.get_submitted(platform) - submitted_by_id = dict() - for submitted_job in submitted_jobs: - submitted_by_id[submitted_job.id] = submitted_job - submitted_jobs_len = len(list(submitted_by_id.keys())) - - waiting_jobs = submitted_jobs_len + queuing_jobs_len - # Calculate available space in Platform Queue - if job is not None and job.max_waiting_jobs and platform.max_waiting_jobs and int(job.max_waiting_jobs) != int(platform.max_waiting_jobs): - self._max_wait_jobs_to_submit = int(job.max_waiting_jobs) - int(waiting_jobs) - else: - self._max_wait_jobs_to_submit = int(platform.max_waiting_jobs) - int(waiting_jobs) - # .total_jobs is defined in each section of platforms_.yml, if not from there, it comes form autosubmit_.yml - # .total_jobs Maximum number of jobs at the same time - if job is not None and job.total_jobs != platform.total_jobs: - self._max_jobs_to_submit = job.total_jobs - queuing_jobs_len - else: - self._max_jobs_to_submit = platform.total_jobs - queuing_jobs_len - # Subtracting running jobs - self._max_jobs_to_submit = self._max_jobs_to_submit - running_jobs_len - self._max_jobs_to_submit = self._max_jobs_to_submit if self._max_jobs_to_submit > 0 else 0 - self.max_jobs = min(self._max_wait_jobs_to_submit,self._max_jobs_to_submit) def __init__(self, as_config, platform, jobs_list, hold=False): self.current_wrapper_section = "WRAPPERS" @@ -88,6 +49,8 @@ class JobPackager(object): self._jobs_list = jobs_list self._max_wait_jobs_to_submit = 9999999 self.hold = hold + self.max_jobs = None + self._max_jobs_to_submit = None # These are defined in the [wrapper] section of autosubmit_,conf self.wrapper_type = dict() self.wrapper_policy = dict() @@ -159,6 +122,54 @@ class JobPackager(object): highest_completed.append(job) for job in highest_completed: job.distance_weight = job.distance_weight - 1 + + def calculate_wrapper_bounds(self, section_list): + + """ + Returns the minimum and maximum number of jobs that can be wrapped + + :param section_list: List of sections to be wrapped + :type section_list: List of strings + :return: Minimum and Maximum number of jobs that can be wrapped + :rtype: Dictionary with keys: min, max, min_v, max_v, min_h, max_h, max_by_section + """ + wrapper_limits = {'min': 1, 'max': 9999999, 'min_v': 1, 'max_v': 9999999, 'min_h': 1, 'max_h': 9999999, 'max_by_section': dict()} + + # Calculate the min and max based in the wrapper_section wrappers: min_wrapped:2, max_wrapped: 2 { wrapper_section: {min_wrapped: 6, max_wrapped: 6} } + wrapper_data = self._as_config.experiment_data.get("WRAPPERS",{}) + current_wrapper_data = wrapper_data.get(self.current_wrapper_section,{}) + if len(self._jobs_list.jobs_to_run_first) == 0: + wrapper_limits['min'] = int(current_wrapper_data.get("MIN_WRAPPED", wrapper_data.get("MIN_WRAPPED", 2))) + wrapper_limits['max'] = int(current_wrapper_data.get("MAX_WRAPPED", wrapper_data.get("MAX_WRAPPED", 9999999))) + wrapper_limits['min_v'] = int(current_wrapper_data.get("MIN_WRAPPED_V", wrapper_data.get("MIN_WRAPPED_V", 1))) + wrapper_limits['max_v'] = int(current_wrapper_data.get("MAX_WRAPPED_V", wrapper_data.get("MAX_WRAPPED_V", 1))) + wrapper_limits['min_h'] = int(current_wrapper_data.get("MIN_WRAPPED_H", wrapper_data.get("MIN_WRAPPED_H", 1))) + wrapper_limits['max_h'] = int(current_wrapper_data.get("MAX_WRAPPED_H", wrapper_data.get("MAX_WRAPPED_H", 1))) + # Max and min calculations + if wrapper_limits['max'] < wrapper_limits['max_v'] * wrapper_limits['max_h']: + wrapper_limits['max'] = wrapper_limits['max_v'] * wrapper_limits['max_h'] + if wrapper_limits['min'] < wrapper_limits['min_v'] * wrapper_limits['min_h']: + wrapper_limits['min'] = max(wrapper_limits['min_v'],wrapper_limits['min_h']) + # if one dimensional wrapper or value is the default + if wrapper_limits['max_v'] == 1 or current_wrapper_data.get("TYPE", "") == "vertical": + wrapper_limits['max_v'] = wrapper_limits['max'] + + if wrapper_limits['max_h'] == 1 or current_wrapper_data.get("TYPE", "") == "horizontal": + wrapper_limits['max_h'] = wrapper_limits['max'] + + if wrapper_limits['min_v'] == 1 and current_wrapper_data.get("TYPE", "") == "vertical": + wrapper_limits['min_v'] = wrapper_limits['min'] + + if wrapper_limits['min_h'] == 1 and current_wrapper_data.get("TYPE", "") == "horizontal": + wrapper_limits['min_h'] = wrapper_limits['min'] + + + # Calculate the max by section by looking at jobs_data[section].max_wrapped + for section in section_list: + wrapper_limits['max_by_section'][section] = self._as_config.jobs_data.get(section,{}).get("MAX_WRAPPED",wrapper_limits['max']) + + return wrapper_limits + def _special_variables(self,job): special_variables = dict() if job.section not in self.special_variables: @@ -166,16 +177,241 @@ class JobPackager(object): if job.total_jobs != self._platform.total_jobs: special_variables[job.section]["TOTAL_JOBS"] = job self.special_variables.update(special_variables) - def build_packages(self): - # type: () -> List[JobPackageBase] + + def check_jobs_to_run_first(self, package): """ - Returns the list of the built packages to be submitted + Check if the jobs to run first are in the package + :param package: + :return: + """ + run_first = False + if len(self._jobs_list.jobs_to_run_first) > 0: + for job in package.jobs[:]: + job.wrapper_type = package.wrapper_type + if job in self._jobs_list.jobs_to_run_first: + job.packed = False + run_first = True + else: + job.packed = False + package.jobs.remove(job) + if self.wrapper_type[self.current_wrapper_section] not in ["horizontal", "vertical", "vertical-mixed"]: + for seq in range(0, len(package.jobs_lists)): + with suppress(ValueError): + package.jobs_lists[seq].remove(job) + return package, run_first + + def check_real_package_wrapper_limits(self,package): + balanced = True + if self.wrapper_type[self.current_wrapper_section] == 'vertical-horizontal': + i = 0 + min_h = len(package.jobs_lists) + min_v = len(package.jobs_lists[0]) + for list_of_jobs in package.jobs_lists[1:-1]: + min_v = min(min_v, len(list_of_jobs)) + for list_of_jobs in package.jobs_lists[:]: + i = i + 1 + if min_v != len(list_of_jobs) and i < len(package.jobs_lists): + balanced = False + elif self.wrapper_type[self.current_wrapper_section] == 'horizontal-vertical': + min_v = len(package.jobs_lists) + min_h = len(package.jobs_lists[0]) + i = 0 + for list_of_jobs in package.jobs_lists[1:-1]: + min_h = min(min_h, len(list_of_jobs)) + for list_of_jobs in package.jobs_lists[:]: + i = i + 1 + if min_h != len(list_of_jobs) and i < len(package.jobs_lists): + balanced = False + elif self.wrapper_type[self.current_wrapper_section] == 'horizontal': + min_h = len(package.jobs) + min_v = 1 + elif self.wrapper_type[self.current_wrapper_section] == 'vertical': + min_v = len(package.jobs) + min_h = 1 + else: + min_v = len(package.jobs) + min_h = len(package.jobs) + return min_v, min_h, balanced - :return: List of packages depending on type of package, JobPackageVertical Object for 'vertical'. - :rtype: List() of JobPackageVertical + def check_packages_respect_wrapper_policy(self,built_packages_tmp,packages_to_submit,max_jobs_to_submit,wrapper_limits): + """ + Check if the packages respect the wrapper policy and act in base of it ( submit wrapper, submit sequential, wait for more jobs to form a wrapper) + :param built_packages_tmp: List of packages to be submitted + :param packages_to_submit: List of packages to be submitted + :param max_jobs_to_submit: Maximum number of jobs to submit + :param wrapper_limits: Dictionary with keys: min, max, min_v, max_v, min_h, max_h, max_by_section + :return: packages_to_submit, max_jobs_to_submit + :rtype: List of packages to be submitted, int + :return: packages_to_submit, max_jobs_to_submit + """ + for p in built_packages_tmp: + if max_jobs_to_submit == 0: + break + infinite_deadlock = False # This will raise an autosubmit critical if true, infinite deadlock is when there are no more non-wrapped jobs in waiting or ready status + failed_innerjobs = False + # Check if the user is using the option to run first some jobs. if so, remove non-first jobs from the package and submit them sequentially following a flexible policy + if len(self._jobs_list.jobs_to_run_first) > 0: + p,run_first = self.check_jobs_to_run_first(p) + if run_first: + for job in p.jobs: + if max_jobs_to_submit == 0: + break + job.packed = False + if job.status == Status.READY: + if job.type == Type.PYTHON and not self._platform.allow_python_jobs: + package = JobPackageSimpleWrapped([job]) + else: + package = JobPackageSimple([job]) + packages_to_submit.append(package) + max_jobs_to_submit = max_jobs_to_submit - 1 + continue + for job in p.jobs: + if job.fail_count > 0: + failed_innerjobs = True + break + min_v, min_h, balanced = self.check_real_package_wrapper_limits(p) + # if the quantity is enough, make the wrapper + if len(p.jobs) >= wrapper_limits["min"] and min_v >= wrapper_limits["min_v"] and min_h >= wrapper_limits["min_h"] and not failed_innerjobs: + for job in p.jobs: + job.packed = True + packages_to_submit.append(p) + max_jobs_to_submit = max_jobs_to_submit - 1 + else: # Check if there is a deadlock or an infinite deadlock. Once checked, act in base of the wrapper policy. + deadlock = True + if deadlock: # Remaining jobs if chunk is the last one + for job in p.jobs: + if (job.running == "chunk" and job.chunk == int( + job.parameters["EXPERIMENT.NUMCHUNKS"])) and balanced: + deadlock = False + break + if not deadlock: # Submit package if deadlock has been liberated + for job in p.jobs: + job.packed = True + packages_to_submit.append(p) + max_jobs_to_submit = max_jobs_to_submit - 1 + else: + wallclock_sum = p.jobs[0].wallclock + for seq in range(1, min_v): + wallclock_sum = sum_str_hours(wallclock_sum, p.jobs[0].wallclock) + next_wrappable_jobs = self._jobs_list.get_jobs_by_section(self.jobs_in_wrapper[self.current_wrapper_section]) + next_wrappable_jobs = [job for job in next_wrappable_jobs if + job.status == Status.WAITING and job not in p.jobs] # Get only waiting jobs + active_jobs = list() + aux_active_jobs = list() + for job in next_wrappable_jobs: # Prone tree by looking only the closest children + direct_children = False + for related in job.parents: + if related in p.jobs: + direct_children = True + break + if direct_children: # Get parent of direct children that aren't in wrapper + aux_active_jobs += [aux_parent for aux_parent in job.parents if ( + aux_parent.status != Status.COMPLETED and aux_parent.status != Status.FAILED) and ( + aux_parent.section not in self.jobs_in_wrapper[ + self.current_wrapper_section] or ( + aux_parent.section in self.jobs_in_wrapper[ + self.current_wrapper_section] and aux_parent.status != Status.COMPLETED and aux_parent.status != Status.FAILED and aux_parent.status != Status.WAITING and aux_parent.status != Status.READY))] + aux_active_jobs = list(set(aux_active_jobs)) + track = [] # Tracker to prone tree for avoid the checking of the same parent from different nodes. + active_jobs_names = [job.name for job in + p.jobs] # We want to search if the actual wrapped jobs needs to run for add more jobs to this wrapper + hard_deadlock = False + for job in aux_active_jobs: + parents_to_check = [] + if job.status == Status.WAITING: # We only want to check uncompleted parents + aux_job = job + for parent in aux_job.parents: # First case + if parent.name in active_jobs_names: + hard_deadlock = True + infinite_deadlock = True + break + if (parent.status == Status.WAITING) and parent.name != aux_job.name: + parents_to_check.append(parent) + track.extend(parents_to_check) + while len( + parents_to_check) > 0 and not infinite_deadlock: # We want to look deeper on the tree until all jobs are completed, or we find an unresolvable deadlock. + aux_job = parents_to_check.pop(0) + for parent in aux_job.parents: + if parent.name in active_jobs_names: + hard_deadlock = True + infinite_deadlock = True + break + if ( + parent.status == Status.WAITING) and parent.name != aux_job.name and parent not in track: + parents_to_check.append(parent) + track.extend(parents_to_check) + if not infinite_deadlock: + active_jobs.append(job) # List of jobs that can continue to run without run this wrapper + + # Act in base of active_jobs and Policies + if self.wrapper_policy[self.current_wrapper_section] == "strict": + for job in p.jobs: + job.packed = False + if len(active_jobs) > 0: + Log.printlog(f'Wrapper policy is set to STRICT and there are not enough jobs to form a wrapper.[wrappable:{wrapper_limits["min"]} <= defined_min:{wrapper_limits["min"]}] [wrappeable_h:{min_h} <= defined_min_h:{wrapper_limits["min_h"]}]|[wrappeable_v:{min_v} <= defined_min_v:{wrapper_limits["min_v"]}] waiting until the wrapper can be formed.\nIf all values are <=, some innerjob has failed under strict policy', 6013) + else: + if len(self._jobs_list.get_in_queue()) == 0: + raise AutosubmitCritical(self.error_message_policy(min_h, min_v, wrapper_limits, hard_deadlock, wallclock_sum, balanced), 7014) + elif self.wrapper_policy[self.current_wrapper_section] == "mixed": + error = True + show_log = True + for job in p.jobs: + if max_jobs_to_submit == 0: + break + if job.fail_count > 0 and job.status == Status.READY: + job.packed = False + Log.printlog( + "Wrapper policy is set to mixed, there is a failed job that will be sent sequential") + error = False + show_log = False + if job.type == Type.PYTHON and not self._platform.allow_python_jobs: + package = JobPackageSimpleWrapped( + [job]) + else: + package = JobPackageSimple([job]) + packages_to_submit.append(package) + max_jobs_to_submit = max_jobs_to_submit - 1 + if error: + if len(active_jobs) > 0: + if show_log: + Log.printlog(f'Wrapper policy is set to MIXED and there are not enough jobs to form a wrapper.[wrappable:{wrapper_limits["min"]} < defined_min:{wrapper_limits["min"]}] [wrappable_h:{min_h} < defined_min_h:{wrapper_limits["min_h"]}]|[wrappeable_v:{min_v} < defined_min_v:{wrapper_limits["min_v"]}] waiting until the wrapper can be formed.', 6013) + else: + if len(self._jobs_list.get_in_queue()) == 0: # When there are not more possible jobs, autosubmit will stop the execution + raise AutosubmitCritical(self.error_message_policy(min_h, min_v, wrapper_limits, hard_deadlock, wallclock_sum, balanced), 7014) + else: + Log.info( + "Wrapper policy is set to flexible and there is a deadlock, Autosubmit will submit the jobs sequentially") + for job in p.jobs: + if max_jobs_to_submit == 0: + break + job.packed = False + if job.status == Status.READY: + if job.type == Type.PYTHON and not self._platform.allow_python_jobs: + package = JobPackageSimpleWrapped( + [job]) + else: + package = JobPackageSimple([job]) + packages_to_submit.append(package) + max_jobs_to_submit = max_jobs_to_submit - 1 + return packages_to_submit, max_jobs_to_submit + + def error_message_policy(self,min_h,min_v,wrapper_limits,hard_deadlock,wallclock_sum,balanced): + message = f"Wrapper couldn't be formed under {self.wrapper_policy[self.current_wrapper_section]} POLICY due minimum limit not being reached: [wrappable:{wrapper_limits['min']} < defined_min:{wrapper_limits['min']}] [wrappable_h:{min_h} < defined_min_h:{wrapper_limits['min_h']}]|[wrappeable_v:{min_v} < defined_min_v:{wrapper_limits['min_v']}] " + if hard_deadlock: + message += "\nCheck your configuration: The next wrappable job can't be wrapped until some of inner jobs of current packages finishes which is impossible" + if min_v > 1: + message += f"\nCheck your configuration: Check if current {wallclock_sum} vertical wallclock has reached the max defined on platforms.conf." + else: + message += "\nCheck your configuration: Only jobs_in_wrappers are active, check your jobs_in_wrapper dependencies." + if not balanced: + message += "\nPackages are not well balanced! (This is not the main cause of the Critical error)" + return message + + def check_if_packages_are_ready_to_build(self): + """ + Check if the packages are ready to be built + :return: List of jobs ready to be built, boolean indicating if packages can't be built for other reasons ( max_total_jobs...) """ - packages_to_submit = list() - # only_wrappers = False when coming from Autosubmit.submit_ready_jobs, jobs_filtered empty jobs_ready = list() if len(self._jobs_list.jobs_to_run_first) > 0: jobs_ready = [job for job in self._jobs_list.jobs_to_run_first if @@ -214,7 +450,7 @@ class JobPackager(object): pass if len(jobs_ready) == 0: # If there are no jobs ready, result is tuple of empty - return packages_to_submit + return jobs_ready,False #check if there are jobs listed on calculate_job_limits for job in jobs_ready: self._special_variables(job) @@ -226,7 +462,7 @@ class JobPackager(object): # If there is no more space in platform, result is tuple of empty Log.debug("No more space in platform {0} for jobs {1}".format(self._platform.name, [job.name for job in jobs_ready])) - return packages_to_submit + return jobs_ready,False self.calculate_job_limits(self._platform) else: @@ -234,319 +470,116 @@ class JobPackager(object): if not (self._max_wait_jobs_to_submit > 0 and self._max_jobs_to_submit > 0): # If there is no more space in platform, result is tuple of empty Log.debug("No more space in platform {0} for jobs {1}".format(self._platform.name, [job.name for job in jobs_ready])) - return packages_to_submit - - - # Sort by 6 first digits of date - available_sorted = sorted( - jobs_ready, key=lambda k: k.long_name.split('_')[1][:6]) - # Sort by Priority, the highest first - list_of_available = sorted( - available_sorted, key=lambda k: k.priority, reverse=True) - num_jobs_to_submit = min(self._max_wait_jobs_to_submit, len(jobs_ready), self._max_jobs_to_submit) - # Take the first num_jobs_to_submit from the list of available - jobs_to_submit_tmp = list_of_available[0:num_jobs_to_submit] - #jobs_to_submit = [ - # fresh_job for fresh_job in jobs_to_submit_tmp if fresh_job.fail_count == 0] - jobs_to_submit = [fresh_job for fresh_job in jobs_to_submit_tmp] - failed_wrapped_jobs = [failed_job for failed_job in jobs_to_submit_tmp if failed_job.fail_count > 0] - for job in failed_wrapped_jobs: + return jobs_ready,False + return jobs_ready,True + + def calculate_job_limits(self,platform,job=None): + jobs_list = self._jobs_list + # Submitted + Queuing Jobs for specific Platform + queuing_jobs = jobs_list.get_queuing(platform) + # We now consider the running jobs count + running_jobs = jobs_list.get_running(platform) + running_by_id = dict() + for running_job in running_jobs: + running_by_id[running_job.id] = running_job + running_jobs_len = len(running_by_id.keys()) + + queued_by_id = dict() + for queued_job in queuing_jobs: + queued_by_id[queued_job.id] = queued_job + queuing_jobs_len = len(list(queued_by_id.keys())) + + submitted_jobs = jobs_list.get_submitted(platform) + submitted_by_id = dict() + for submitted_job in submitted_jobs: + submitted_by_id[submitted_job.id] = submitted_job + submitted_jobs_len = len(list(submitted_by_id.keys())) + + waiting_jobs = submitted_jobs_len + queuing_jobs_len + # Calculate available space in Platform Queue + if job is not None and job.max_waiting_jobs and platform.max_waiting_jobs and int(job.max_waiting_jobs) != int(platform.max_waiting_jobs): + self._max_wait_jobs_to_submit = int(job.max_waiting_jobs) - int(waiting_jobs) + else: + self._max_wait_jobs_to_submit = int(platform.max_waiting_jobs) - int(waiting_jobs) + # .total_jobs is defined in each section of platforms_.yml, if not from there, it comes form autosubmit_.yml + # .total_jobs Maximum number of jobs at the same time + if job is not None and job.total_jobs != platform.total_jobs: + self._max_jobs_to_submit = job.total_jobs - queuing_jobs_len + else: + self._max_jobs_to_submit = platform.total_jobs - queuing_jobs_len + # Subtracting running jobs + self._max_jobs_to_submit = self._max_jobs_to_submit - running_jobs_len + self._max_jobs_to_submit = self._max_jobs_to_submit if self._max_jobs_to_submit > 0 else 0 + self.max_jobs = min(self._max_wait_jobs_to_submit,self._max_jobs_to_submit) + + def build_packages(self): + # type: () -> List[JobPackageBase] + """ + Returns the list of the built packages to be submitted + + :return: List of packages depending on type of package, JobPackageVertical Object for 'vertical'. + :rtype: List() of JobPackageVertical + """ + packages_to_submit = list() + jobs_ready,ready = self.check_if_packages_are_ready_to_build() + if not ready: + return [] + max_jobs_to_submit = min(self._max_wait_jobs_to_submit, self._max_jobs_to_submit) + jobs_to_submit = sorted( + jobs_ready, key=lambda k: k.priority, reverse=True) + for job in [failed_job for failed_job in jobs_to_submit if failed_job.fail_count > 0]: job.packed = False - jobs_to_submit_by_section = self._divide_list_by_section(jobs_to_submit) - # create wrapped package jobs Wrapper building starts here - for wrapper_name,section_jobs in jobs_to_submit_by_section.items(): + jobs_to_wrap = self._divide_list_by_section(jobs_to_submit) + non_wrapped_jobs = jobs_to_wrap.pop("SIMPLE",[]) + + # Prepare packages for wrapped jobs + for wrapper_name, jobs in jobs_to_wrap.items(): + if max_jobs_to_submit == 0: + break self.current_wrapper_section = wrapper_name - for section,jobs in section_jobs.items(): - if len(jobs) > 0: - if self.current_wrapper_section != "SIMPLE" and not self._platform.allow_wrappers: - Log.warning("Platform {0} does not allow wrappers, submitting jobs individually".format(self._platform.name)) - if wrapper_name != "SIMPLE" and self._platform.allow_wrappers and self.wrapper_type[self.current_wrapper_section] in ['horizontal', 'vertical','vertical-horizontal', 'horizontal-vertical'] : - # Trying to find the value in jobs_parser, if not, default to an autosubmit_.yml value (Looks first in [wrapper] section) - wrapper_limits = dict() - wrapper_limits["max_by_section"] = dict() - wrapper_limits["max"] = int(self._as_config.get_max_wrapped_jobs(self._as_config.experiment_data["WRAPPERS"][self.current_wrapper_section])) - wrapper_limits["max_v"] = int(self._as_config.get_max_wrapped_jobs_vertical(self._as_config.experiment_data["WRAPPERS"][self.current_wrapper_section])) - wrapper_limits["max_h"] = int(self._as_config.get_max_wrapped_jobs_horizontal(self._as_config.experiment_data["WRAPPERS"][self.current_wrapper_section])) - if wrapper_limits["max"] < wrapper_limits["max_v"] * wrapper_limits["max_h"]: - wrapper_limits["max"] = wrapper_limits["max_v"] * wrapper_limits["max_h"] - if wrapper_limits["max_v"] == -1: - wrapper_limits["max_v"] = wrapper_limits["max"] - if wrapper_limits["max_h"] == -1: - wrapper_limits["max_h"] = wrapper_limits["max"] - if '&' not in section: - dependencies_keys = self._as_config.jobs_data[section].get('DEPENDENCIES', "") - wrapper_limits["max_by_section"][section] = wrapper_limits["max"] - wrapper_limits["min"] = min(self._as_config.jobs_data[section].get( - "MIN_WRAPPED", 99999999), 0) - else: - multiple_sections = section.split('&') - dependencies_keys = [] - min_value = int(self._as_config.get_min_wrapped_jobs(self._as_config.experiment_data["WRAPPERS"][self.current_wrapper_section])) - for sectionN in multiple_sections: - if self._as_config.jobs_data[sectionN].get('DEPENDENCIES',"") != "": - dependencies_keys += self._as_config.jobs_data.get("DEPENDENCIES", "").split() - if self._as_config.jobs_data[sectionN].get('MAX_WRAPPED',None) is not None and len(str(self._as_config.jobs_data[sectionN].get('MAX_WRAPPED',None))) > 0: - wrapper_limits["max_by_section"][sectionN] = int(self._as_config.jobs_data[sectionN].get("MAX_WRAPPED")) - else: - wrapper_limits["max_by_section"][sectionN] = wrapper_limits["max"] - wrapper_limits["min"] = min(self._as_config.jobs_data[sectionN].get("MIN_WRAPPED",min_value),min_value) - hard_limit_wrapper = wrapper_limits["max"] - wrapper_limits["min"] = min(wrapper_limits["min"], hard_limit_wrapper) - wrapper_limits["min_v"] = self._as_config.get_min_wrapped_jobs_vertical(self._as_config.experiment_data["WRAPPERS"][self.current_wrapper_section]) - wrapper_limits["min_h"] = self._as_config.get_min_wrapped_jobs_horizontal(self._as_config.experiment_data["WRAPPERS"][self.current_wrapper_section]) - wrapper_limits["max"] = hard_limit_wrapper - if wrapper_limits["min"] < wrapper_limits["min_v"] * wrapper_limits["min_h"]: - wrapper_limits["min"] = max(wrapper_limits["min_v"],wrapper_limits["min_h"]) - if len(self._jobs_list.jobs_to_run_first) > 0: - wrapper_limits["min"] = 2 - current_info = list() - for param in self.wrapper_info: - current_info.append(param[self.current_wrapper_section]) - if self.wrapper_type[self.current_wrapper_section] == 'vertical': - built_packages_tmp = self._build_vertical_packages(jobs, wrapper_limits,wrapper_info=current_info) - elif self.wrapper_type[self.current_wrapper_section] == 'horizontal': - built_packages_tmp = self._build_horizontal_packages(jobs, wrapper_limits, section,wrapper_info=current_info) - elif self.wrapper_type[self.current_wrapper_section] in ['vertical-horizontal', 'horizontal-vertical']: - built_packages_tmp = list() - built_packages_tmp.append(self._build_hybrid_package(jobs, wrapper_limits, section,wrapper_info=current_info)) - else: - built_packages_tmp = self._build_vertical_packages(jobs, wrapper_limits) - - for p in built_packages_tmp: - infinite_deadlock = False # This will raise an autosubmit critical if true - failed_innerjobs = False - job_has_to_run_first = False - aux_jobs = [] - # Check failed jobs first - for job in p.jobs: - job.wrapper_type = p.wrapper_type - if len(self._jobs_list.jobs_to_run_first) > 0: - if job not in self._jobs_list.jobs_to_run_first: - job.packed = False - aux_jobs.append(job) - if job.fail_count > 0: - failed_innerjobs = True - if len(self._jobs_list.jobs_to_run_first) > 0: - job_has_to_run_first = True - for job in aux_jobs: - job.packed = False - p.jobs.remove(job) - if self.wrapper_type[self.current_wrapper_section] != "horizontal" and self.wrapper_type[self.current_wrapper_section] != "vertical" and self.wrapper_type[self.current_wrapper_section] != "vertical-mixed": - for seq in range(0,len(p.jobs_lists)): - try: - p.jobs_lists[seq].remove(job) - except Exception as e: - pass - if self.wrapper_type[self.current_wrapper_section] != "horizontal" and self.wrapper_type[self.current_wrapper_section] != "vertical" and self.wrapper_type[self.current_wrapper_section] != "vertical-mixed": - aux = p.jobs_lists - p.jobs_lists = [] - for seq in range(0,len(aux)): - if len(aux[seq]) > 0: - p.jobs_lists.append(aux[seq]) - if len(p.jobs) > 0: - balanced = True - if self.wrapper_type[self.current_wrapper_section] == 'vertical-horizontal': - min_h = len(p.jobs_lists) - min_v = len(p.jobs_lists[0]) - for list_of_jobs in p.jobs_lists[1:-1]: - min_v = min(min_v, len(list_of_jobs)) - - elif self.wrapper_type[self.current_wrapper_section] == 'horizontal-vertical': - min_v = len(p.jobs_lists) - min_h = len(p.jobs_lists[0]) - i = 0 - for list_of_jobs in p.jobs_lists[1:-1]: - min_h = min(min_h, len(list_of_jobs)) - for list_of_jobs in p.jobs_lists[:]: - i = i+1 - if min_h != len(list_of_jobs) and i < len(p.jobs_lists): - balanced = False - elif min_h != len(list_of_jobs) and i == len(p.jobs_lists): - if balanced: - for job in list_of_jobs: - job.packed = False - p.jobs.remove(job) - package = JobPackageSimple([job]) - packages_to_submit.append(package) - p.jobs_lists = p.jobs_lists[:-1] - - - - elif self.wrapper_type[self.current_wrapper_section] == 'horizontal': - min_h = len(p.jobs) - min_v = 1 - elif self.wrapper_type[self.current_wrapper_section] == 'vertical': - min_v = len(p.jobs) - min_h = 1 - else: - min_v = len(p.jobs) - min_h = len(p.jobs) - # if the quantity is enough, make the wrapper - - if (len(p.jobs) >= wrapper_limits["min"] and min_v >= wrapper_limits["min_v"] and min_h >= wrapper_limits["min_h"] and (not failed_innerjobs or self.wrapper_policy[self.current_wrapper_section] not in ["mixed","strict"] ) ) or job_has_to_run_first: - for job in p.jobs: - job.packed = True - packages_to_submit.append(p) - else: - deadlock = True - if deadlock: # Remaining jobs if chunk is the last one - for job in p.jobs: - if ( job.running == "chunk" and job.chunk == int(job.parameters["EXPERIMENT.NUMCHUNKS"]) ) and balanced: - deadlock = False - break - if not deadlock: # Submit package if deadlock has been liberated - for job in p.jobs: - job.packed = True - packages_to_submit.append(p) - else: - wallclock_sum = p.jobs[0].wallclock - for seq in range(1, min_v): - wallclock_sum = sum_str_hours(wallclock_sum, p.jobs[0].wallclock) - next_wrappable_jobs = self._jobs_list.get_jobs_by_section(self.jobs_in_wrapper[self.current_wrapper_section]) - next_wrappable_jobs = [job for job in next_wrappable_jobs if job.status == Status.WAITING and job not in p.jobs ] # Get only waiting jobs - active_jobs = list() - aux_active_jobs = list() - for job in next_wrappable_jobs: # Prone tree by looking only the closest children - direct_children = False - for related in job.parents: - if related in p.jobs: - direct_children = True - break - if direct_children: # Get parent of direct children that aren't in wrapper - aux_active_jobs += [aux_parent for aux_parent in job.parents if ( aux_parent.status != Status.COMPLETED and aux_parent.status != Status.FAILED) and ( aux_parent.section not in self.jobs_in_wrapper[self.current_wrapper_section] or ( aux_parent.section in self.jobs_in_wrapper[self.current_wrapper_section] and aux_parent.status != Status.COMPLETED and aux_parent.status != Status.FAILED and aux_parent.status != Status.WAITING and aux_parent.status != Status.READY ) ) ] - aux_active_jobs = list(set(aux_active_jobs)) - track = [] # Tracker to prone tree for avoid the checking of the same parent from different nodes. - active_jobs_names = [ job.name for job in p.jobs ] # We want to search if the actual wrapped jobs needs to run for add more jobs to this wrapper - hard_deadlock = False - for job in aux_active_jobs: - parents_to_check = [] - if job.status == Status.WAITING: # We only want to check uncompleted parents - aux_job = job - for parent in aux_job.parents: # First case - if parent.name in active_jobs_names: - hard_deadlock = True - infinite_deadlock = True - break - if (parent.status == Status.WAITING ) and parent.name != aux_job.name: - parents_to_check.append(parent) - track.extend(parents_to_check) - while len(parents_to_check) > 0 and not infinite_deadlock: # We want to look deeper on the tree until all jobs are completed, or we find an unresolvable deadlock. - aux_job = parents_to_check.pop(0) - for parent in aux_job.parents: - if parent.name in active_jobs_names: - hard_deadlock = True - infinite_deadlock = True - break - if (parent.status == Status.WAITING ) and parent.name != aux_job.name and parent not in track: - parents_to_check.append(parent) - track.extend(parents_to_check) - if not infinite_deadlock: - active_jobs.append(job) # List of jobs that can continue to run without run this wrapper - # Act in base of active_jobs and Policies - if self.wrapper_policy[self.current_wrapper_section] == "strict": - error = True - for job in p.jobs: - job.packed = False - if job in self._jobs_list.jobs_to_run_first: - error = False - if job.status == Status.READY: - if job.type == Type.PYTHON and not self._platform.allow_python_jobs: - package = JobPackageSimpleWrapped( - [job]) - else: - package = JobPackageSimple([job]) - packages_to_submit.append(package) - if error: - if len(active_jobs) > 0: - Log.printlog( - "Wrapper policy is set to MIXED and there are not enough jobs to form a wrapper.[wrappable:{4} <= defined_min:{5}] [wrappeable_h:{0} <= defined_min_h:{1}]|[wrappeable_v:{2} <= defined_min_v:{3}] waiting until the wrapper can be formed.\nIf all values are <=, some innerjob has failed under strict policy".format( - min_h, wrapper_limits["min_h"], min_v, - wrapper_limits["min_v"], wrapper_limits["min"], len(active_jobs)), - 6013) - else: - message = "Wrapper couldn't be formed under {0} POLICY due minimum limit not being reached: [wrappable:{4} < defined_min:{5}] [wrappable_h:{1} < defined_min_h:{2}]|[wrappeable_v:{3} < defined_min_v:{4}] ".format( - self.wrapper_policy[self.current_wrapper_section], min_h, - wrapper_limits["min_h"], min_v, wrapper_limits["min_v"], - wrapper_limits["min"], len(active_jobs)) - if hard_deadlock: - message += "\nCheck your configuration: The next wrappable job can't be wrapped until some of inner jobs of current packages finishes which is imposible" - if min_v > 1: - message += "\nCheck your configuration: Check if current {0} vertical wallclock has reached the max defined on platforms.conf.".format(wallclock_sum) - else: - message += "\nCheck your configuration: Only jobs_in_wrappers are active, check their dependencies." - if not balanced: - message += "\nPackages are not well balanced: Check your dependencies(This is not the main cause of the Critical error)" - if len(self._jobs_list.get_in_queue()) == 0: - raise AutosubmitCritical(message, 7014) - elif self.wrapper_policy[self.current_wrapper_section] == "mixed": - error = True - show_log = True - for job in p.jobs: - if job in self._jobs_list.jobs_to_run_first: - job.packed = False - error = False - if job.status == Status.READY: - if job.type == Type.PYTHON and not self._platform.allow_python_jobs: - package = JobPackageSimpleWrapped( - [job]) - else: - package = JobPackageSimple([job]) - packages_to_submit.append(package) - if job.fail_count > 0 and job.status == Status.READY: - job.packed = False - Log.printlog( - "Wrapper policy is set to mixed, there is a failed job that will be sent sequential") - error = False - show_log = False - if job.type == Type.PYTHON and not self._platform.allow_python_jobs: - package = JobPackageSimpleWrapped( - [job]) - else: - package = JobPackageSimple([job]) - packages_to_submit.append(package) - if error: - if len(active_jobs) > 0: - if show_log: - Log.printlog( - "Wrapper policy is set to MIXED and there are not enough jobs to form a wrapper.[wrappable:{4} < defined_min:{5}] [wrappable_h:{0} < defined_min_h:{1}]|[wrappeable_v:{2} < defined_min_v:{3}] waiting until the wrapper can be formed.".format( - min_h, wrapper_limits["min_h"], min_v, - wrapper_limits["min_v"],wrapper_limits["min"],len(active_jobs)), 6013) - else: - message = "Wrapper couldn't be formed under {0} POLICY due minimum limit not being reached: [wrappable:{4} < defined_min:{5}] [wrappable_h:{1} < defined_min_h:{2}]|[wrappeable_v:{3} < defined_min_v:{4}] ".format( - self.wrapper_policy[self.current_wrapper_section], min_h, - wrapper_limits["min_h"], min_v, wrapper_limits["min_v"],wrapper_limits["min"],len(active_jobs)) - if hard_deadlock: - message += "\nCheck your configuration: The next wrappable job can't be wrapped until some of inner jobs of current packages finishes which is impossible" - if min_v > 1: - message += "\nCheck your configuration: Check if current {0} vertical wallclock has reached the max defined on platforms.conf.".format( - wallclock_sum) - else: - message += "\nCheck your configuration: Only jobs_in_wrappers are active, check your jobs_in_wrapper dependencies." - if not balanced: - message += "\nPackages are not well balanced! (This is not the main cause of the Critical error)" - - if len(self._jobs_list.get_in_queue()) == 0: # When there are not more possible jobs, autosubmit will stop the execution - raise AutosubmitCritical(message, 7014) - else: - for job in p.jobs: - job.packed = False - if job.status == Status.READY: - if job.type == Type.PYTHON and not self._platform.allow_python_jobs: - package = JobPackageSimpleWrapped( - [job]) - else: - package = JobPackageSimple([job]) - packages_to_submit.append(package) - Log.info("Wrapper policy is set to flexible and there is a deadlock, Autosubmit will submit the jobs sequentially") - else: - for job in jobs: - job.packed = False - if job.type == Type.PYTHON and not self._platform.allow_python_jobs: - package = JobPackageSimpleWrapped([job]) - else: - package = JobPackageSimple([job]) - packages_to_submit.append(package) + section = self._as_config.experiment_data.get("WRAPPERS",{}).get(self.current_wrapper_section,{}).get("JOBS_IN_WRAPPER", "") + if not self._platform.allow_wrappers and self.wrapper_type[self.current_wrapper_section] in ['horizontal', 'vertical','vertical-horizontal', 'horizontal-vertical']: + Log.warning( + "Platform {0} does not allow wrappers, submitting jobs individually".format(self._platform.name)) + for job in jobs: + non_wrapped_jobs.append(job) + continue + if "&" in section: + section_list = section.split("&") + elif "," in section: + section_list = section.split(",") + else: + section_list = section.split(" ") + wrapper_limits = self.calculate_wrapper_bounds(section_list) + current_info = list() + built_packages_tmp = list() + for param in self.wrapper_info: + current_info.append(param[self.current_wrapper_section]) + if self.wrapper_type[self.current_wrapper_section] == 'vertical': + built_packages_tmp = self._build_vertical_packages(jobs, wrapper_limits,wrapper_info=current_info) + elif self.wrapper_type[self.current_wrapper_section] == 'horizontal': + if len(jobs) >= wrapper_limits["min_h"]: + built_packages_tmp = self._build_horizontal_packages(jobs, wrapper_limits, section,wrapper_info=current_info) + elif self.wrapper_type[self.current_wrapper_section] in ['vertical-horizontal', 'horizontal-vertical']: + if len(jobs) >= wrapper_limits["min_h"]: + built_packages_tmp.append(self._build_hybrid_package(jobs, wrapper_limits, section,wrapper_info=current_info)) + else: + built_packages_tmp = self._build_vertical_packages(jobs, wrapper_limits) + packages_to_submit,max_jobs_to_submit = self.check_packages_respect_wrapper_policy(built_packages_tmp,packages_to_submit,max_jobs_to_submit,wrapper_limits) + # Now, prepare the packages for non-wrapper jobs + for job in non_wrapped_jobs: + if max_jobs_to_submit == 0: + break + if len(self._jobs_list.jobs_to_run_first) > 0: # if user wants to run first some jobs, submit them first + if job not in self._jobs_list.jobs_to_run_first: + continue + job.packed = False + if job.type == Type.PYTHON and not self._platform.allow_python_jobs: + package = JobPackageSimpleWrapped([job]) + else: + package = JobPackageSimple([job]) + packages_to_submit.append(package) for package in packages_to_submit: self.max_jobs = self.max_jobs - 1 @@ -572,21 +605,20 @@ class JobPackager(object): section_name += section+"&" section_name = section_name[:-1] sections_split[wrapper_name] = section_name - jobs_by_section[wrapper_name] = dict() - jobs_by_section[wrapper_name][section_name] = list() + jobs_by_section[wrapper_name] = list() - jobs_by_section["SIMPLE"] = collections.defaultdict(list) - remaining_jobs = copy.copy(jobs_list) + jobs_by_section["SIMPLE"] = [] for wrapper_name,section_name in sections_split.items(): - for job in jobs_list: + for job in jobs_list[:]: if job.section.upper() in section_name.split("&"): - jobs_by_section[wrapper_name][section_name].append(job) - try: - remaining_jobs.remove(job) - except ValueError: - pass - for job in remaining_jobs: - jobs_by_section["SIMPLE"][job.section].append(job) + jobs_by_section[wrapper_name].append(job) + jobs_list.remove(job) + # jobs not in wrapper + for job in (job for job in jobs_list): + jobs_by_section["SIMPLE"].append(job) + for wrappers in list(jobs_by_section.keys()): + if len(jobs_by_section[wrappers]) == 0: + del jobs_by_section[wrappers] return jobs_by_section diff --git a/test/unit/test_wrappers.py b/test/unit/test_wrappers.py index 052b87fecd4712528e739cb27000b072338a6e42..c005020b87149a6862fff5447a2315d7c440b2ae 100644 --- a/test/unit/test_wrappers.py +++ b/test/unit/test_wrappers.py @@ -1,10 +1,15 @@ +from bscearth.utils.date import sum_str_hours +from operator import attrgetter + import shutil import tempfile from unittest import TestCase from mock import MagicMock + +import log.log from autosubmit.job.job_packager import JobPackager -from autosubmit.job.job_packages import JobPackageVertical +from autosubmit.job.job_packages import JobPackageVertical, JobPackageHorizontal, JobPackageHorizontalVertical , JobPackageVerticalHorizontal, JobPackageSimple from autosubmit.job.job import Job from autosubmit.job.job_list import JobList from autosubmit.job.job_dict import DicJobs @@ -1418,6 +1423,407 @@ class TestWrappers(TestCase): self.assertDictEqual(self.job_list._create_sorted_dict_jobs( "s2 s3 s5"), ordered_jobs_by_date_member) + def test_check_real_package_wrapper_limits(self): + # want to test self.job_packager.check_real_package_wrapper_limits(package,max_jobs_to_submit,packages_to_submit) + date_list = ["d1"] + member_list = ["m1", "m2"] + chunk_list = [1, 2, 3, 4] + for section,s_value in self.workflows['basic']['sections'].items(): + self.as_conf.jobs_data[section] = s_value + self._createDummyJobs( + self.workflows['basic'], date_list, member_list, chunk_list) + + self.job_list.get_job_by_name( + 'expid_d1_m1_s1').status = Status.COMPLETED + self.job_list.get_job_by_name( + 'expid_d1_m2_s1').status = Status.COMPLETED + + self.job_list.get_job_by_name('expid_d1_m1_1_s2').status = Status.READY + self.job_list.get_job_by_name('expid_d1_m2_1_s2').status = Status.READY + + wrapper_expression = "s2 s3" + d1_m1_1_s2 = self.job_list.get_job_by_name('expid_d1_m1_1_s2') + d1_m1_2_s2 = self.job_list.get_job_by_name('expid_d1_m1_2_s2') + d1_m1_3_s2 = self.job_list.get_job_by_name('expid_d1_m1_3_s2') + d1_m1_4_s2 = self.job_list.get_job_by_name('expid_d1_m1_4_s2') + d1_m2_1_s2 = self.job_list.get_job_by_name('expid_d1_m2_1_s2') + d1_m2_2_s2 = self.job_list.get_job_by_name('expid_d1_m2_2_s2') + d1_m2_3_s2 = self.job_list.get_job_by_name('expid_d1_m2_3_s2') + d1_m2_4_s2 = self.job_list.get_job_by_name('expid_d1_m2_4_s2') + + d1_m1_1_s3 = self.job_list.get_job_by_name('expid_d1_m1_1_s3') + d1_m1_2_s3 = self.job_list.get_job_by_name('expid_d1_m1_2_s3') + d1_m1_3_s3 = self.job_list.get_job_by_name('expid_d1_m1_3_s3') + d1_m1_4_s3 = self.job_list.get_job_by_name('expid_d1_m1_4_s3') + d1_m2_1_s3 = self.job_list.get_job_by_name('expid_d1_m2_1_s3') + d1_m2_2_s3 = self.job_list.get_job_by_name('expid_d1_m2_2_s3') + d1_m2_3_s3 = self.job_list.get_job_by_name('expid_d1_m2_3_s3') + d1_m2_4_s3 = self.job_list.get_job_by_name('expid_d1_m2_4_s3') + + self.job_list._ordered_jobs_by_date_member["WRAPPERS"]["d1"] = dict() + self.job_list._ordered_jobs_by_date_member["WRAPPERS"]["d1"]["m1"] = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3, + d1_m1_3_s2, d1_m1_3_s3, d1_m1_4_s2, d1_m1_4_s3] + + self.job_list._ordered_jobs_by_date_member["WRAPPERS"]["d1"]["m2"] = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, d1_m2_2_s3, + d1_m2_3_s2, d1_m2_3_s3, d1_m2_4_s2, d1_m2_4_s3] + + self.job_packager.jobs_in_wrapper = wrapper_expression + + self.job_packager.retrials = 0 + # test vertical-wrapper + self.job_packager.wrapper_type["WRAPPER_V"] = 'vertical' + self.job_packager.current_wrapper_section = "WRAPPER_V" + self.as_conf.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section] = {} + self.as_conf.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section]["TYPE"] = "vertical" + self.as_conf.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section]["JOBS_IN_WRAPPER"] = "S2 S3" + package_m1_s2_s3 = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3] + package_m2_s2_s3 = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, d1_m2_2_s3] + + packages_v = [JobPackageVertical( + package_m1_s2_s3, configuration=self.as_conf), + JobPackageVertical(package_m2_s2_s3, configuration=self.as_conf)] + + for package in packages_v: + min_v, min_h, balanced = self.job_packager.check_real_package_wrapper_limits(package) + self.assertTrue(balanced) + self.assertEqual(min_v, 4) + self.assertEqual(min_h, 1) + # test horizontal-wrapper + + self.job_packager.wrapper_type["WRAPPER_H"] = 'horizontal' + self.job_packager.current_wrapper_section = "WRAPPER_H" + self.as_conf.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section] = {} + self.as_conf.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section]["TYPE"] = "horizontal" + self.as_conf.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section]["JOBS_IN_WRAPPER"] = "S2 S3" + packages_h = [JobPackageHorizontal( + package_m1_s2_s3, configuration=self.as_conf), + JobPackageHorizontal(package_m2_s2_s3, configuration=self.as_conf)] + for package in packages_h: + min_v, min_h, balanced = self.job_packager.check_real_package_wrapper_limits(package) + self.assertTrue(balanced) + self.assertEqual(min_v, 1) + self.assertEqual(min_h, 4) + # test horizontal-vertical + self.job_packager.wrapper_type["WRAPPER_HV"] = 'horizontal-vertical' + self.job_packager.current_wrapper_section = "WRAPPER_HV" + self.as_conf.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section] = {} + self.as_conf.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section]["TYPE"] = "horizontal-vertical" + self.as_conf.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section]["JOBS_IN_WRAPPER"] = "S2 S3" + jobs_resources = dict() + #### + total_wallclock = '00:00' + self._current_processors = 0 + current_package = [package_m1_s2_s3,package_m2_s2_s3] + max_procs = 99999 + #### + packages_hv = [JobPackageHorizontalVertical(current_package, max_procs, total_wallclock,jobs_resources=jobs_resources, configuration=self.as_conf, wrapper_section=self.job_packager.current_wrapper_section)] + + for package in packages_hv: + min_v, min_h, balanced = self.job_packager.check_real_package_wrapper_limits(package) + self.assertTrue(balanced) + self.assertEqual(min_v, 2) + self.assertEqual(min_h, 4) + # unbalanced package + unbalanced_package = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2] + current_package = [package_m1_s2_s3,unbalanced_package,package_m2_s2_s3] + packages_hv_unbalanced = [JobPackageHorizontalVertical(current_package, max_procs, total_wallclock, jobs_resources=jobs_resources, configuration=self.as_conf, wrapper_section=self.job_packager.current_wrapper_section)] + for package in packages_hv_unbalanced: + min_v, min_h, balanced = self.job_packager.check_real_package_wrapper_limits(package) + self.assertFalse(balanced) + self.assertEqual(min_v, 3) + self.assertEqual(min_h, 3) + # test vertical-horizontal + self.job_packager.wrapper_type["WRAPPER_VH"] = 'vertical-horizontal' + self.job_packager.current_wrapper_section = "WRAPPER_VH" + self.as_conf.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section] = {} + self.as_conf.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section]["TYPE"] = "vertical-horizontal" + self.as_conf.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section]["JOBS_IN_WRAPPER"] = "S2 S3" + current_package = [package_m1_s2_s3,package_m2_s2_s3] + packages_vh = [JobPackageVerticalHorizontal( + current_package, max_procs, total_wallclock, jobs_resources=jobs_resources, configuration=self.as_conf, wrapper_section=self.job_packager.current_wrapper_section)] + for package in packages_vh: + min_v, min_h, balanced = self.job_packager.check_real_package_wrapper_limits(package) + self.assertTrue(balanced) + self.assertEqual(min_v, 4) + self.assertEqual(min_h, 2) + current_package = [package_m1_s2_s3,unbalanced_package,package_m2_s2_s3] + packages_vh_unbalanced = [JobPackageVerticalHorizontal( + current_package, max_procs, total_wallclock, jobs_resources=jobs_resources, configuration=self.as_conf, wrapper_section=self.job_packager.current_wrapper_section)] + for package in packages_vh_unbalanced: + min_v, min_h, balanced = self.job_packager.check_real_package_wrapper_limits(package) + self.assertFalse(balanced) + self.assertEqual(min_v, 3) + self.assertEqual(min_h, 3) + + + def test_check_jobs_to_run_first(self): + # want to test self.job_packager.check_jobs_to_run_first(package) + date_list = ["d1"] + member_list = ["m1", "m2"] + chunk_list = [1, 2, 3, 4] + for section, s_value in self.workflows['basic']['sections'].items(): + self.as_conf.jobs_data[section] = s_value + self._createDummyJobs( + self.workflows['basic'], date_list, member_list, chunk_list) + + self.job_list.get_job_by_name( + 'expid_d1_m1_s1').status = Status.COMPLETED + self.job_list.get_job_by_name( + 'expid_d1_m2_s1').status = Status.COMPLETED + + self.job_list.get_job_by_name('expid_d1_m1_1_s2').status = Status.READY + self.job_list.get_job_by_name('expid_d1_m2_1_s2').status = Status.READY + + wrapper_expression = "s2 s3" + d1_m1_1_s2 = self.job_list.get_job_by_name('expid_d1_m1_1_s2') + d1_m1_2_s2 = self.job_list.get_job_by_name('expid_d1_m1_2_s2') + d1_m1_3_s2 = self.job_list.get_job_by_name('expid_d1_m1_3_s2') + d1_m1_4_s2 = self.job_list.get_job_by_name('expid_d1_m1_4_s2') + d1_m2_1_s2 = self.job_list.get_job_by_name('expid_d1_m2_1_s2') + d1_m2_2_s2 = self.job_list.get_job_by_name('expid_d1_m2_2_s2') + d1_m2_3_s2 = self.job_list.get_job_by_name('expid_d1_m2_3_s2') + d1_m2_4_s2 = self.job_list.get_job_by_name('expid_d1_m2_4_s2') + + d1_m1_1_s3 = self.job_list.get_job_by_name('expid_d1_m1_1_s3') + d1_m1_2_s3 = self.job_list.get_job_by_name('expid_d1_m1_2_s3') + d1_m1_3_s3 = self.job_list.get_job_by_name('expid_d1_m1_3_s3') + d1_m1_4_s3 = self.job_list.get_job_by_name('expid_d1_m1_4_s3') + d1_m2_1_s3 = self.job_list.get_job_by_name('expid_d1_m2_1_s3') + d1_m2_2_s3 = self.job_list.get_job_by_name('expid_d1_m2_2_s3') + d1_m2_3_s3 = self.job_list.get_job_by_name('expid_d1_m2_3_s3') + d1_m2_4_s3 = self.job_list.get_job_by_name('expid_d1_m2_4_s3') + + self.job_list._ordered_jobs_by_date_member["WRAPPERS"]["d1"] = dict() + self.job_list._ordered_jobs_by_date_member["WRAPPERS"]["d1"]["m1"] = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, + d1_m1_2_s3, + d1_m1_3_s2, d1_m1_3_s3, d1_m1_4_s2, + d1_m1_4_s3] + + self.job_list._ordered_jobs_by_date_member["WRAPPERS"]["d1"]["m2"] = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, + d1_m2_2_s3, + d1_m2_3_s2, d1_m2_3_s3, d1_m2_4_s2, + d1_m2_4_s3] + + self.job_packager.jobs_in_wrapper = wrapper_expression + + self.job_packager.retrials = 0 + # test vertical-wrapper + self.job_packager.wrapper_type["WRAPPER_V"] = 'vertical' + self.job_packager.current_wrapper_section = "WRAPPER_V" + self.as_conf.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section] = {} + self.as_conf.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section]["TYPE"] = "vertical" + self.as_conf.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section]["JOBS_IN_WRAPPER"] = "S2 S3" + package_m1_s2_s3 = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3] + + packages_v = [JobPackageVertical(package_m1_s2_s3, configuration=self.as_conf)] + self.job_packager._jobs_list.jobs_to_run_first = [] + for p in packages_v: + p2, run_first = self.job_packager.check_jobs_to_run_first(p) + self.assertEqual(p2.jobs, p.jobs) + self.assertEqual(run_first, False) + self.job_packager._jobs_list.jobs_to_run_first = [d1_m1_1_s2, d1_m1_1_s3] + for p in packages_v: + p2, run_first = self.job_packager.check_jobs_to_run_first(p) + self.assertEqual(p2.jobs, [d1_m1_1_s2, d1_m1_1_s3]) + self.assertEqual(run_first, True) + + def test_calculate_wrapper_bounds(self): + # want to test self.job_packager.calculate_wrapper_bounds(section_list) + self.job_packager.current_wrapper_section = "WRAPPER" + self.job_packager._as_config.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section] = {} + self.job_packager._as_config.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section]["TYPE"] = "vertical" + self.job_packager._as_config.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section]["JOBS_IN_WRAPPER"] = "S2 S3" + section_list = ["S2", "S3"] + # default wrapper limits + wrapper_limits = {'max': 9999999, + 'max_by_section': {'S2': 9999999, 'S3': 9999999}, + 'max_h': 9999999, + 'max_v': 9999999, + 'min': 2, + 'min_h': 1, + 'min_v': 2 + } + returned_wrapper_limits = self.job_packager.calculate_wrapper_bounds(section_list) + self.assertDictEqual(returned_wrapper_limits, wrapper_limits) + self.job_packager._as_config.experiment_data["WRAPPERS"]["MIN_WRAPPED"] = 3 + self.job_packager._as_config.experiment_data["WRAPPERS"]["MAX_WRAPPED"] = 5 + self.job_packager._as_config.experiment_data["WRAPPERS"]["MIN_WRAPPED_H"] = 2 + self.job_packager._as_config.experiment_data["WRAPPERS"]["MIN_WRAPPED_V"] = 3 + self.job_packager._as_config.experiment_data["WRAPPERS"]["MAX_WRAPPED_H"] = 4 + self.job_packager._as_config.experiment_data["WRAPPERS"]["MAX_WRAPPED_V"] = 5 + + wrapper_limits = {'max': 5*4, + 'max_by_section': {'S2': 5*4, 'S3': 5*4}, + 'max_h': 4, + 'max_v': 5*4, + 'min': 3, + 'min_h': 2, + 'min_v': 3 + } + returned_wrapper_limits = self.job_packager.calculate_wrapper_bounds(section_list) + self.assertDictEqual(returned_wrapper_limits, wrapper_limits) + + self.job_packager._as_config.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section]["TYPE"] = "horizontal" + wrapper_limits = {'max': 5*4, + 'max_by_section': {'S2': 5*4, 'S3': 5*4}, + 'max_h': 4*5, + 'max_v': 5, + 'min': 3, + 'min_h': 2, + 'min_v': 3 + } + returned_wrapper_limits = self.job_packager.calculate_wrapper_bounds(section_list) + self.assertDictEqual(returned_wrapper_limits, wrapper_limits) + + self.job_packager._as_config.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section]["TYPE"] = "horizontal-vertical" + wrapper_limits = {'max': 5*4, + 'max_by_section': {'S2': 5*4, 'S3': 5*4}, + 'max_h': 4, + 'max_v': 5, + 'min': 3, + 'min_h': 2, + 'min_v': 3 + } + returned_wrapper_limits = self.job_packager.calculate_wrapper_bounds(section_list) + self.assertDictEqual(returned_wrapper_limits, wrapper_limits) + + self.job_packager._as_config.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section]["TYPE"] = "vertical-horizontal" + wrapper_limits = {'max': 5*4, + 'max_by_section': {'S2': 5*4, 'S3': 5*4}, + 'max_h': 4, + 'max_v': 5, + 'min': 3, + 'min_h': 2, + 'min_v': 3 + } + returned_wrapper_limits = self.job_packager.calculate_wrapper_bounds(section_list) + self.assertDictEqual(returned_wrapper_limits, wrapper_limits) + self.job_packager._as_config.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section]["MIN_WRAPPED"] = 3 + self.job_packager._as_config.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section]["MAX_WRAPPED"] = 5 + self.job_packager._as_config.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section]["MIN_WRAPPED_H"] = 2 + self.job_packager._as_config.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section]["MIN_WRAPPED_V"] = 3 + self.job_packager._as_config.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section]["MAX_WRAPPED_H"] = 4 + self.job_packager._as_config.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section]["MAX_WRAPPED_V"] = 5 + returned_wrapper_limits = self.job_packager.calculate_wrapper_bounds(section_list) + self.assertDictEqual(returned_wrapper_limits, wrapper_limits) + del self.job_packager._as_config.experiment_data["WRAPPERS"]["MIN_WRAPPED"] + del self.job_packager._as_config.experiment_data["WRAPPERS"]["MAX_WRAPPED"] + del self.job_packager._as_config.experiment_data["WRAPPERS"]["MIN_WRAPPED_H"] + del self.job_packager._as_config.experiment_data["WRAPPERS"]["MIN_WRAPPED_V"] + del self.job_packager._as_config.experiment_data["WRAPPERS"]["MAX_WRAPPED_H"] + del self.job_packager._as_config.experiment_data["WRAPPERS"]["MAX_WRAPPED_V"] + returned_wrapper_limits = self.job_packager.calculate_wrapper_bounds(section_list) + self.assertDictEqual(returned_wrapper_limits, wrapper_limits) + + def test_check_packages_respect_wrapper_policy(self): + # want to test self.job_packager.check_packages_respect_wrapper_policy(built_packages_tmp,packages_to_submit,max_jobs_to_submit,wrapper_limits) + date_list = ["d1"] + member_list = ["m1", "m2"] + chunk_list = [1, 2, 3, 4] + for section, s_value in self.workflows['basic']['sections'].items(): + self.as_conf.jobs_data[section] = s_value + self._createDummyJobs( + self.workflows['basic'], date_list, member_list, chunk_list) + + self.job_list.get_job_by_name( + 'expid_d1_m1_s1').status = Status.COMPLETED + self.job_list.get_job_by_name( + 'expid_d1_m2_s1').status = Status.COMPLETED + + self.job_list.get_job_by_name('expid_d1_m1_1_s2').status = Status.READY + self.job_list.get_job_by_name('expid_d1_m2_1_s2').status = Status.READY + + wrapper_expression = "s2 s3" + d1_m1_1_s2 = self.job_list.get_job_by_name('expid_d1_m1_1_s2') + d1_m1_2_s2 = self.job_list.get_job_by_name('expid_d1_m1_2_s2') + d1_m1_3_s2 = self.job_list.get_job_by_name('expid_d1_m1_3_s2') + d1_m1_4_s2 = self.job_list.get_job_by_name('expid_d1_m1_4_s2') + d1_m2_1_s2 = self.job_list.get_job_by_name('expid_d1_m2_1_s2') + d1_m2_2_s2 = self.job_list.get_job_by_name('expid_d1_m2_2_s2') + d1_m2_3_s2 = self.job_list.get_job_by_name('expid_d1_m2_3_s2') + d1_m2_4_s2 = self.job_list.get_job_by_name('expid_d1_m2_4_s2') + + d1_m1_1_s3 = self.job_list.get_job_by_name('expid_d1_m1_1_s3') + d1_m1_2_s3 = self.job_list.get_job_by_name('expid_d1_m1_2_s3') + d1_m1_3_s3 = self.job_list.get_job_by_name('expid_d1_m1_3_s3') + d1_m1_4_s3 = self.job_list.get_job_by_name('expid_d1_m1_4_s3') + d1_m2_1_s3 = self.job_list.get_job_by_name('expid_d1_m2_1_s3') + d1_m2_2_s3 = self.job_list.get_job_by_name('expid_d1_m2_2_s3') + d1_m2_3_s3 = self.job_list.get_job_by_name('expid_d1_m2_3_s3') + d1_m2_4_s3 = self.job_list.get_job_by_name('expid_d1_m2_4_s3') + + self.job_list._ordered_jobs_by_date_member["WRAPPERS"]["d1"] = dict() + self.job_list._ordered_jobs_by_date_member["WRAPPERS"]["d1"]["m1"] = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, + d1_m1_2_s3, + d1_m1_3_s2, d1_m1_3_s3, d1_m1_4_s2, + d1_m1_4_s3] + + self.job_list._ordered_jobs_by_date_member["WRAPPERS"]["d1"]["m2"] = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, + d1_m2_2_s3, + d1_m2_3_s2, d1_m2_3_s3, d1_m2_4_s2, + d1_m2_4_s3] + + self.job_packager.jobs_in_wrapper = wrapper_expression + + self.job_packager.retrials = 0 + # test vertical-wrapper + self.job_packager.wrapper_type["WRAPPER_V"] = 'vertical' + self.job_packager.current_wrapper_section = "WRAPPER_V" + self.as_conf.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section] = {} + self.as_conf.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section]["TYPE"] = "horizontal" + self.as_conf.experiment_data["WRAPPERS"][self.job_packager.current_wrapper_section]["JOBS_IN_WRAPPER"] = "S2 S3" + packages_to_submit = [] + max_jobs_to_submit = 2 + wrapper_limits = {'max': 9999999, + 'max_by_section': {'S2': 9999999, 'S3': 9999999}, + 'max_h': 9999999, + 'max_v': 9999999, + 'min': 2, + 'min_h': 1, + 'min_v': 2 + } + package = [d1_m1_1_s2, d1_m1_1_s2, d1_m1_1_s2, d1_m1_1_s2, d1_m1_1_s2] + packages_h = [JobPackageHorizontal( + package, configuration=self.as_conf)] + + self.job_packager.wrapper_policy = {} + self.job_packager.wrapper_policy["WRAPPER_V"] = "flexible" + packages_to_submit2, max_jobs_to_submit2 = self.job_packager.check_packages_respect_wrapper_policy(packages_h, packages_to_submit, + max_jobs_to_submit, wrapper_limits) + self.assertEqual(max_jobs_to_submit2, max_jobs_to_submit-1) + self.assertEqual(packages_to_submit2, packages_h) + + wrapper_limits = {'max': 2, + 'max_by_section': {'S2': 2, 'S3': 2}, + 'max_h': 2, + 'max_v': 2, + 'min': 2, + 'min_h': 2, + 'min_v': 2 + } + self.job_packager.jobs_in_wrapper = {self.job_packager.current_wrapper_section: {'S2': 2, 'S3': 2}} + packages_to_submit = [] + packages_to_submit2, max_jobs_to_submit2 = self.job_packager.check_packages_respect_wrapper_policy(packages_h, packages_to_submit, + max_jobs_to_submit, wrapper_limits) + self.assertEqual(max_jobs_to_submit2, 0) + self.assertEqual(len(packages_to_submit2),2) + for p in packages_to_submit2: + self.assertEqual(type(p), JobPackageSimple) + + self.job_packager.wrapper_policy["WRAPPER_V"] = "mixed" + packages_to_submit = [] + with self.assertRaises(log.log.AutosubmitCritical): + self.job_packager.check_packages_respect_wrapper_policy(packages_h, packages_to_submit, max_jobs_to_submit, wrapper_limits) + + self.job_packager.wrapper_policy["WRAPPER_V"] = "strict" + packages_to_submit = [] + with self.assertRaises(log.log.AutosubmitCritical): + self.job_packager.check_packages_respect_wrapper_policy(packages_h, packages_to_submit, max_jobs_to_submit, wrapper_limits) + + #def test_build_packages(self): + # want to test self.job_packager.build_packages() + # TODO: implement this test in the future + def _createDummyJobs(self, sections_dict, date_list, member_list, chunk_list): for section, section_dict in sections_dict.get('sections').items(): running = section_dict['RUNNING']