From 0e330f1a267f97291846fda38bfc8aa172b2bc6c Mon Sep 17 00:00:00 2001 From: Wilmer Uruchi Ticona Date: Tue, 15 Oct 2019 10:32:26 +0200 Subject: [PATCH 1/7] Initial documentation --- autosubmit/autosubmit.py | 37 +++++++++++++++++----- autosubmit/job/job_packager.py | 17 ++++++++-- autosubmit/platforms/paramiko_submitter.py | 18 +++++++++-- 3 files changed, 58 insertions(+), 14 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 89e76f170..af3e4ac8a 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -741,16 +741,20 @@ class Autosubmit: @staticmethod def _load_parameters(as_conf, job_list, platforms): + """ + + """ # Load parameters Log.debug("Loading parameters...") parameters = as_conf.load_parameters() for platform_name in platforms: platform = platforms[platform_name] + # Call method from platform.py parent object platform.add_parameters(parameters) - + # Platform = from DEFAULT.HPCARCH, e.g. marenostrum4 platform = platforms[as_conf.get_platform().lower()] platform.add_parameters(parameters, True) - + # Attach paramenters to JobList job_list.parameters = parameters @staticmethod def inspect(expid, lst, filter_chunks, filter_status, filter_section , notransitive=False, force=False, check_wrapper=False): @@ -911,27 +915,40 @@ class Autosubmit: @staticmethod def generate_scripts_andor_wrappers(as_conf,job_list,jobs_filtered,packages_persistence,only_wrappers=False): + """ + as_conf: AutosubmitConfig object + job_list: JobList object + jobs_filtered: list of jobs + packages_persistence: JobPackagePersistence object + only_wrappers: True + """ job_list._job_list=jobs_filtered job_list.update_list(as_conf,False) submitter = Autosubmit._get_submitter(as_conf) + # Current choice is Paramiko Submitter + # Load platforms saves a dictionary Key: Platform Name, Value: Corresponding Platform Object submitter.load_platforms(as_conf) + # The value is retrieves from DEFAULT.HPCARCH hpcarch = as_conf.get_platform() 
Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) - platforms_to_test = set() + platforms_to_test = set() for job in job_list.get_job_list(): if job.platform_name is None: job.platform_name = hpcarch - # noinspection PyTypeChecker + # Assign platform objects to each job + # noinspection PyTypeChecker job.platform = submitter.platforms[job.platform_name.lower()] + # Add object to set # noinspection PyTypeChecker platforms_to_test.add(job.platform) ## case setstatus job_list.check_scripts(as_conf) job_list.update_list(as_conf, False) - Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) + # Loading parameters again + Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) while job_list.get_active(): + # Sending only_wrappers = True Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence,True,only_wrappers) - job_list.update_list(as_conf, False) @@ -1170,14 +1187,18 @@ class Autosubmit: Gets READY jobs and send them to the platforms if there is available space on the queues :param as_conf: autosubmit config object + :type as_conf: AutosubmitConfig object :param job_list: job list to check + :type job_list: JobList object :param platforms_to_test: platforms used - :type platforms_to_test: set + :type platforms_to_test: set of Platform Objects, e.g. SgePlatform(), LsfPlatform(). 
+ :param packages_persistence: Handles database per experiment + :type packages_persistence: JobPackagePersistence object :return: True if at least one job was submitted, False otherwise :rtype: bool """ save = False - + for platform in platforms_to_test: Log.debug("\nJobs ready for {1}: {0}", len(job_list.get_ready(platform)), platform.name) packages_to_submit, remote_dependencies_dict = JobPackager(as_conf, platform, job_list).build_packages() diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index a8b242fdc..6d978de62 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -29,18 +29,29 @@ from math import ceil class JobPackager(object): """ The main responsibility of this class is to manage the packages of jobs that have to be submitted. + + :param as_config: Autosubmit basic configuration.\n + :type as_config: AutosubmitConfig object.\n + :param platform: A particular platform we are dealing with, e.g. Lsf Platform.\n + :type platform: Specific Platform Object, e.g. 
LsfPlatform(), EcPlatform(), ...\n + :param jobs_list: Contains the list of the jobs, along other properties.\n + :type jobs_list: JobList object.\n """ - def __init__(self, as_config, platform, jobs_list): + def __init__(self, as_config, platform, jobs_list): self._as_config = as_config self._platform = platform self._jobs_list = jobs_list - + # Submitted + Queuing Jobs for specific Platform waiting_jobs = len(jobs_list.get_submitted(platform) + jobs_list.get_queuing(platform)) + # Calculate available space in Platform Queue self._max_wait_jobs_to_submit = platform.max_waiting_jobs - waiting_jobs - self._max_jobs_to_submit = platform.total_jobs - len(jobs_list.get_in_queue(platform)) + # .total_jobs is defined in each section of platforms_.conf, if not from there, it comes form autosubmit_.conf + # .total_jobs Maximum number of jobs at the same time + self._max_jobs_to_submit = platform.total_jobs - len(jobs_list.get_in_queue(platform)) self.max_jobs = min(self._max_wait_jobs_to_submit, self._max_jobs_to_submit) + # These are defined in the [wrapper] section of autosubmit_,conf self.wrapper_type = self._as_config.get_wrapper_type() self.remote_dependencies = self._as_config.get_remote_dependencies() self.jobs_in_wrapper = self._as_config.get_wrapper_jobs() diff --git a/autosubmit/platforms/paramiko_submitter.py b/autosubmit/platforms/paramiko_submitter.py index 404ab9eed..46196dbc6 100644 --- a/autosubmit/platforms/paramiko_submitter.py +++ b/autosubmit/platforms/paramiko_submitter.py @@ -56,6 +56,7 @@ class ParamikoSubmitter(Submitter): platforms_used = list() hpcarch = asconf.get_platform() + # Traverse jobs defined in jobs_.conf and add platforms found if not already included job_parser = asconf.jobs_parser for job in job_parser.sections(): hpc = job_parser.get_option(job, 'PLATFORM', hpcarch).lower() @@ -63,8 +64,10 @@ class ParamikoSubmitter(Submitter): platforms_used.append(hpc) parser = asconf.platforms_parser - + # Declare platforms dictionary, key: 
Platform Name, Value: Platform Object platforms = dict() + + # Build Local Platform Object local_platform = LocalPlatform(asconf.expid, 'local', BasicConfig) local_platform.max_wallclock = asconf.get_max_wallclock() local_platform.max_processors = asconf.get_max_processors() @@ -74,11 +77,15 @@ class ParamikoSubmitter(Submitter): local_platform.temp_dir = os.path.join(BasicConfig.LOCAL_ROOT_DIR, 'ASlogs') local_platform.root_dir = os.path.join(BasicConfig.LOCAL_ROOT_DIR, local_platform.expid) local_platform.host = 'localhost' + # Add object to entry in dictionary platforms['local'] = local_platform platforms['LOCAL'] = local_platform + # parser is the platforms parser that represents platforms_.conf + # Traverse sections [] for section in parser.sections(): - + + # Consider only those included in the list of jobs if section.lower() not in platforms_used: continue @@ -103,10 +110,11 @@ class ParamikoSubmitter(Submitter): except ParamikoPlatformException as e: Log.error("Queue exception: {0}".format(e.message)) return None - + # Set the type and version of the platform found remote_platform.type = platform_type remote_platform._version = platform_version + # Concatenating host + project and adding to the object if parser.get_option(section, 'ADD_PROJECT_TO_HOST', '').lower() == 'true': host = '{0}-{1}'.format(parser.get_option(section, 'HOST', None), parser.get_option(section, 'PROJECT', None)) @@ -114,6 +122,7 @@ class ParamikoSubmitter(Submitter): host = parser.get_option(section, 'HOST', None) remote_platform.host = host + # Retrieve more configurations settings and save them in the object remote_platform.max_wallclock = parser.get_option(section, 'MAX_WALLCLOCK', asconf.get_max_wallclock()) remote_platform.max_processors = parser.get_option(section, 'MAX_PROCESSORS', @@ -142,10 +151,13 @@ class ParamikoSubmitter(Submitter): None) remote_platform.root_dir = os.path.join(remote_platform.scratch, remote_platform.project, remote_platform.user, remote_platform.expid) 
+ # Executes update_cmds() from corresponding Platform Object remote_platform.update_cmds() + # Save platform into result dictionary platforms[section.lower()] = remote_platform for section in parser.sections(): + # if this section is included in platforms if parser.has_option(section, 'SERIAL_PLATFORM'): platforms[section.lower()].serial_platform = platforms[parser.get_option(section, 'SERIAL_PLATFORM', -- GitLab From 7d1218781bef46773f71326acdb1e5520c4df258 Mon Sep 17 00:00:00 2001 From: Wilmer Uruchi Ticona Date: Tue, 15 Oct 2019 12:34:31 +0200 Subject: [PATCH 2/7] More comments on wrapper relevant functions --- autosubmit/job/job_list.py | 9 +++++++-- autosubmit/job/job_packager.py | 29 +++++++++++++++++++++++------ 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 0c2598629..ab78c957f 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -119,7 +119,11 @@ class JobList: :param default_retrials: default retrials for ech job :type default_retrials: int :param new: is it a new generation? - :type new: bool + :type new: bool \n + :param wrapper_type: Type of wrapper defined by the user in autosubmit_.conf [wrapper] section. \n + :type wrapper type: String \n + :param wrapper_jobs: Job types defined in autosubmit_.conf [wrapper sections] to be wrapped. 
\n + :type wrapper_jobs: String \n """ self._parameters = parameters self._date_list = date_list @@ -148,7 +152,8 @@ class JobList: for job in self._job_list: job.parameters = parameters - if wrapper_type == 'vertical-mixed': + # Perhaps this should be done by default independent of the wrapper_type supplied + if wrapper_type == 'vertical-mixed': self._ordered_jobs_by_date_member = self._create_sorted_dict_jobs(wrapper_jobs) diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 6d978de62..29ed141b7 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -35,7 +35,7 @@ class JobPackager(object): :param platform: A particular platform we are dealing with, e.g. Lsf Platform.\n :type platform: Specific Platform Object, e.g. LsfPlatform(), EcPlatform(), ...\n :param jobs_list: Contains the list of the jobs, along other properties.\n - :type jobs_list: JobList object.\n + :type jobs_list: JobList object. """ def __init__(self, as_config, platform, jobs_list): @@ -71,28 +71,35 @@ class JobPackager(object): """ packages_to_submit = list() remote_dependencies_dict = dict() - + # only_wrappers = False when coming from Autosubmit.submit_ready_jobs, jobs_filtered empty if only_generate: jobs_to_submit = jobs_filtered else: jobs_ready = self._jobs_list.get_ready(self._platform) if jobs_ready == 0: + # If there are no jobs ready, result is tuple of empty return packages_to_submit, remote_dependencies_dict if not (self._max_wait_jobs_to_submit > 0 and self._max_jobs_to_submit > 0): + # If there is no more space in platform, result is tuple of empty return packages_to_submit, remote_dependencies_dict + # Sort by 6 first digits of date available_sorted = sorted(jobs_ready, key=lambda k: k.long_name.split('_')[1][:6]) - list_of_available = sorted(available_sorted, key=lambda k: k.priority, reverse=True) + # Sort by Priority, highest first + list_of_available = sorted(available_sorted, key=lambda k: k.priority, reverse=True) 
num_jobs_to_submit = min(self._max_wait_jobs_to_submit, len(jobs_ready), self._max_jobs_to_submit) + # Take the first num_jobs_to_submit from the list of available jobs_to_submit = list_of_available[0:num_jobs_to_submit] jobs_to_submit_by_section = self._divide_list_by_section(jobs_to_submit) for section in jobs_to_submit_by_section: + # Only if platform allows wrappers, wrapper type has been correctly defined, and job names for wrappers have been correctly defined + # ('None' is a default value) or the correct section is included in the corresponding sections in [wrappers] if self._platform.allow_wrappers and self.wrapper_type in ['horizontal', 'vertical', 'vertical-mixed', 'vertical-horizontal', 'horizontal-vertical'] \ and (self.jobs_in_wrapper == 'None' or section in self.jobs_in_wrapper): - + # Trying to find the value in jobs_parser, if not, default to an autosubmit_.conf value (Looks first in [wrapper] section) max_wrapped_jobs = int(self._as_config.jobs_parser.get_option(section, "MAX_WRAPPED", self._as_config.get_max_wrapped_jobs())) if self.wrapper_type in ['vertical', 'vertical-mixed']: @@ -124,13 +131,15 @@ class JobPackager(object): The value for each key is a list() with all the jobs with the key section. :param jobs_list: list of jobs to be divided - :rtype: dict + :rtype: Dictionary Key: Section Name, Value: List(Job Object) """ + # .jobs_in_wrapper defined in .conf, see constructor. 
sections_split = self.jobs_in_wrapper.split() jobs_section = dict() for job in jobs_list: - section = next((s for s in sections_split if job.section in s and '&' in s), None) + # This iterator (questionable choice of operator) will always return None if there is no '&' defined in the section name + section = next((s for s in sections_split if job.section in s and '&' in s), None) if section is None: section = job.section if section not in jobs_section: @@ -167,6 +176,14 @@ class JobPackager(object): return packages, remote_dependencies_dict def _build_vertical_packages(self, section_list, max_wrapped_jobs): + """ + + :param section_list: Jobs belonging to a section defined as wrappable.\n + :type section_list: List() of Job Objects. \n + :param max_wrapped_jobs: Number of maximum jobs that can be wrapped (Can be user defined). \n + :type max_wrapped_jobs: Integer. \n + + """ packages = [] potential_dependency = None remote_dependencies_dict = dict() -- GitLab From a9ac148662d7c864b1b6d6a91b99cdab52b34923 Mon Sep 17 00:00:00 2001 From: Wilmer Uruchi Ticona Date: Thu, 17 Oct 2019 10:30:09 +0200 Subject: [PATCH 3/7] More comments and documentation on the wrappers workflow --- autosubmit/job/job_list.py | 70 +++++++++++++++++++++++------ autosubmit/job/job_packager.py | 82 +++++++++++++++++++++++++++++++--- 2 files changed, 134 insertions(+), 18 deletions(-) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index ab78c957f..80257ad93 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -350,53 +350,71 @@ class JobList: priority += 1 def _create_sorted_dict_jobs(self, wrapper_jobs): + """ + Creates a sorting of the jobs whose job.section is in wrapper_jobs, according to the following filters in order of importance: + date, member, RUNNING, and chunk number; where RUNNING is defined in jobs_.conf for each section. + + If the job does not have a chunk number, the total number of chunks configured for the experiment is used. 
+ + :param wrapper_jobs: User defined job types to be wrapped in autosubmit_,conf [wrapper] section. \n + :type wrapper_jobs: String \n + :return: Sorted Dictionary of Dictionary of List that represents the jobs included in the wrapping process. \n + :rtype: Dictionary Key: date, Value: (Dictionary Key: Member, Value: List of jobs that belong to the date, member, and are ordered by RUNNING, and chunk number) + """ + # Dictionary Key: date, Value: (Dictionary Key: Member, Value: List) dict_jobs = dict() for date in self._date_list: dict_jobs[date] = dict() for member in self._member_list: dict_jobs[date][member] = list() num_chunks = len(self._chunk_list) - + # Select only relevant jobs, those belonging to the sections defined in the wrapper filtered_jobs_list = filter(lambda job: job.section in wrapper_jobs, self._job_list) - filtered_jobs_fake_date_member, fake_original_job_map = self._create_fake_dates_members(filtered_jobs_list) + filtered_jobs_fake_date_member, fake_original_job_map = self._create_fake_dates_members(filtered_jobs_list) - sections_running_type_map = dict() + sections_running_type_map = dict() for section in wrapper_jobs.split(" "): + # RUNNING = once, as default. 
This value comes from jobs_.conf sections_running_type_map[section] = self._dic_jobs.get_option(section, "RUNNING", 'once') - + for date in self._date_list: str_date = self._get_date(date) for member in self._member_list: + # Filter list of fake jobs according to date and member, result not sorted at this point sorted_jobs_list = filter(lambda job: job.name.split("_")[1] == str_date and job.name.split("_")[2] == member, filtered_jobs_fake_date_member) - + previous_job = sorted_jobs_list[0] + # get RUNNING for this section section_running_type = sections_running_type_map[previous_job.section] jobs_to_sort = [previous_job] - previous_section_running_type = None - + previous_section_running_type = None + # Index starts at 1 because 0 has been taken in a previous step for index in range(1, len(sorted_jobs_list) + 1): + # If not last item if index < len(sorted_jobs_list): job = sorted_jobs_list[index] - + # Test if section has changed. e.g. from INI to SIM if previous_job.section != job.section: previous_section_running_type = section_running_type section_running_type = sections_running_type_map[job.section] - + # Test if RUNNING is different between sections, or if we have reached the last item in sorted_jobs_list if (previous_section_running_type != None and previous_section_running_type != section_running_type) \ or index == len(sorted_jobs_list): - + # Sorting by date, member, chunk (JOB TYPE) jobs_to_sort = sorted(jobs_to_sort, key=lambda k: (k.name.split('_')[1], (k.name.split('_')[2]), (int(k.name.split('_')[3]) if len(k.name.split('_')) == 5 else num_chunks + 1))) + # Bringing back original job if identified for idx in range(0, len(jobs_to_sort)): if jobs_to_sort[idx] in fake_original_job_map: fake_job = jobs_to_sort[idx] jobs_to_sort[idx] = fake_original_job_map[fake_job] - + # Add to result, and reset jobs_to_sort + # By adding to the result at this step, only those with the same RUNNIN have been added. 
dict_jobs[date][member] += jobs_to_sort jobs_to_sort = [] @@ -406,6 +424,17 @@ class JobList: return dict_jobs def _create_fake_dates_members(self, filtered_jobs_list): + """ + Using the list of jobs provided, creates clones of these jobs and modifies names conditionted on job.date, job.member values (testing None). + The purpose is that all jobs share the same name structure. + + :param filtered_jobs_list: A list of jobs of only those that comply with certain criteria, e.g. those belonging to a user defined job type for wrapping. \n + :type filetered_jobs_list: List() of Job Objects \n + :return filtered_jobs_fake_date_member: List of fake jobs. \n + :rtype filtered_jobs_fake_date_member: List of Job Objects \n + :return fake_original_job_map: Dictionary that maps fake job to original one. \n + :rtype fake_original_job_map: Dictionary Key: Job Object, Value: Job Object + """ filtered_jobs_fake_date_member = [] fake_original_job_map = dict() @@ -414,29 +443,44 @@ class JobList: fake_job = None # running once and synchronize date if job.date is None and job.member is None: + # Declare None values as if they were the last items in corresponding list date = self._date_list[-1] member = self._member_list[-1] - fake_job = copy.deepcopy(job) + # Use previous values to modify name of fake job fake_job.name = fake_job.name.split('_', 1)[0] + "_" + self._get_date(date) + "_" \ + member + "_" + fake_job.name.split("_", 1)[1] + # Filling list of fake jobs, only difference is the name filtered_jobs_fake_date_member.append(fake_job) + # Mapping fake jobs to orignal ones fake_original_job_map[fake_job] = job # running date or synchronize member elif job.member is None: + # Declare None value as if it were the last items in corresponding list member = self._member_list[-1] fake_job = copy.deepcopy(job) + # Use it to modify name of fake job fake_job.name = fake_job.name.split('_', 2)[0] + "_" + fake_job.name.split('_', 2)[ 1] + "_" + member + "_" + fake_job.name.split("_", 2)[2] + 
# Filling list of fake jobs, only difference is the name filtered_jobs_fake_date_member.append(fake_job) + # Mapping fake jobs to orignal ones fake_original_job_map[fake_job] = job - + # There was no result if fake_job is None: filtered_jobs_fake_date_member.append(job) return filtered_jobs_fake_date_member, fake_original_job_map def _get_date(self, date): + """ + Parses a user defined Date (from [experiment] DATELIST) to return a special String representation of that Date + + :param date: String representation of a date in format YYYYYMMdd. \n + :type date: String \n + :return: String representation of date according to format. \n + :rtype: String \n + """ date_format = '' if date.hour > 1: date_format = 'H' diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 29ed141b7..cb682ba56 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -138,7 +138,7 @@ class JobPackager(object): jobs_section = dict() for job in jobs_list: - # This iterator (questionable choice of operator) will always return None if there is no '&' defined in the section name + # This iterator will always return None if there is no '&' defined in the section name section = next((s for s in sections_split if job.section in s and '&' in s), None) if section is None: section = job.section @@ -177,10 +177,11 @@ class JobPackager(object): def _build_vertical_packages(self, section_list, max_wrapped_jobs): """ + Builds Vertical-Mixed or Vertical :param section_list: Jobs belonging to a section defined as wrappable.\n :type section_list: List() of Job Objects. \n - :param max_wrapped_jobs: Number of maximum jobs that can be wrapped (Can be user defined). \n + :param max_wrapped_jobs: Number of maximum jobs that can be wrapped (Can be user defined), per section. \n :type max_wrapped_jobs: Integer. 
\n """ @@ -279,6 +280,21 @@ class JobPackager(object): class JobPackagerVertical(object): + """ + Vertical Packager Parent Class + + :param jobs_list: Usually there is only 1 job in this list. \n + :type jobs_list: List() of Job Objects \n + :param total_wallclock: Wallclock per object. \n + :type total_wallclock: String \n + :param max_jobs: Maximum number of jobs per platform. \n + :type max_jobs: Integer \n + :param max_wrapped_jobs: Value from jobs_parser, if not found default to an autosubmit_.conf value (Looks first in [wrapper] section). \n + :type max_wrapped_jobs: Integer \n + :param max_wallclock: Value from Platform. \n + :type max_wallclock: Integer + + """ def __init__(self, jobs_list, total_wallclock, max_jobs, max_wrapped_jobs, max_wallclock): self.jobs_list = jobs_list @@ -288,6 +304,9 @@ class JobPackagerVertical(object): self.max_wallclock = max_wallclock def build_vertical_package(self, job): + """ + + """ if len(self.jobs_list) >= self.max_jobs or len(self.jobs_list) >= self.max_wrapped_jobs: return self.jobs_list child = self.get_wrappable_child(job) @@ -307,6 +326,20 @@ class JobPackagerVertical(object): class JobPackagerVerticalSimple(JobPackagerVertical): + """ + Vertical Packager Class. First statement of the constructor builds JobPackagerVertical. + + :param jobs_list: List of jobs, usually only receives one job. \n + :type jobs_list: List() of Job Objects \n + :param total_wallclock: Wallclock from Job. \n + :type total_wallclock: String \n + :param max_jobs: Maximum number of jobs per platform. \n + :type max_jobs: Integer \n + :param max_wrapped_jobs: Value from jobs_parser, if not found default to an autosubmit_.conf value (Looks first in [wrapper] section). \n + :type max_wrapped_jobs: Integer \n + :param max_wallclock: Value from Platform. 
\n + :type max_wallclock: Integer + """ def __init__(self, jobs_list, total_wallclock, max_jobs, max_wrapped_jobs, max_wallclock): super(JobPackagerVerticalSimple, self).__init__(jobs_list, total_wallclock, max_jobs, max_wrapped_jobs, max_wallclock) @@ -328,23 +361,51 @@ class JobPackagerVerticalSimple(JobPackagerVertical): class JobPackagerVerticalMixed(JobPackagerVertical): - + """ + Vertical Mixed Class. First statement of the constructor builds JobPackagerVertical. + + :param dict_jobs: Jobs sorted by date, member, RUNNING, and chunk number. Only those relevant to the wrapper. \n + :type dict_jobs: Dictionary Key: date, Value: (Dictionary Key: Member, Value: List of jobs sorted) \n + :param ready_job: Job to be wrapped. \n + :type ready_job: Job Object \n + :param jobs_list: ready_job as a list. \n + :type jobs_list: List() of Job Object \n + :param total_wallclock: wallclock time per job. \n + :type total_wallclock: String \n + :param max_jobs: Maximum number of jobs per platform. \n + :type max_jobs: Integer \n + :param max_wrapped_jobs: Value from jobs_parser, if not found default to an autosubmit_.conf value (Looks first in [wrapper] section). \n + :type max_wrapped_jobs: Integer \n + :param max_wallclock: Value from Platform. 
\n + :type max_wallclock: String \n + """ def __init__(self, dict_jobs, ready_job, jobs_list, total_wallclock, max_jobs, max_wrapped_jobs, max_wallclock): super(JobPackagerVerticalMixed, self).__init__(jobs_list, total_wallclock, max_jobs, max_wrapped_jobs, max_wallclock) self.ready_job = ready_job self.dict_jobs = dict_jobs - + # Last date from the ordering date = dict_jobs.keys()[-1] + # Last member from the last date from the ordering member = dict_jobs[date].keys()[-1] + # If job to be wrapped has date and member, use those if ready_job.date is not None: date = ready_job.date if ready_job.member is not None: member = ready_job.member - + # Extract list of sorted jobs per date and member self.sorted_jobs = dict_jobs[date][member] self.index = 0 def get_wrappable_child(self, job): + """ + Goes through the jobs with the same date and member than the input jub, and return the first that satisfies self._is_wrappable() + + :param job: job to be evaluated. \n + :type job: Job Object \n + :return: job that is wrappable. \n + :rtype: Job Object + """ + # Unnecessary assignment sorted_jobs = self.sorted_jobs for index in range(self.index, len(sorted_jobs)): @@ -356,8 +417,19 @@ class JobPackagerVerticalMixed(JobPackagerVertical): return None def _is_wrappable(self, job): + """ + Determines if a job is wrappable. Basically, the job shouldn't have been packed already and the status must be READY or WAITING, + Its parents should be COMPLETED. + + :param job: job to be evaluated. \n + :type job: Job Object \n + :return: True if wrappable, False otherwise. 
\n + :rtype: Boolean + """ if job.packed is False and (job.status == Status.READY or job.status == Status.WAITING): for parent in job.parents: + # First part of this conditional is always going to be true because otherwise there would be a cycle + # Second part is actually relevant, parents of a wrapper should be COMPLETED if parent not in self.jobs_list and parent.status != Status.COMPLETED: return False return True -- GitLab From 9786bcd3ba3ac5dd5ca54ab2a79b70cd8bfd24e4 Mon Sep 17 00:00:00 2001 From: Wilmer Uruchi Ticona Date: Fri, 18 Oct 2019 17:22:04 +0200 Subject: [PATCH 4/7] More comments, testing workflow on vertical-mixed --- autosubmit/autosubmit.py | 55 +++++++++++++++-------- autosubmit/job/job_list.py | 2 +- autosubmit/job/job_package_persistence.py | 10 +++++ autosubmit/job/job_packager.py | 55 +++++++++++++++++++---- autosubmit/job/job_packages.py | 4 ++ 5 files changed, 98 insertions(+), 28 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index af3e4ac8a..6e58b3ae3 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -916,16 +916,23 @@ class Autosubmit: @staticmethod def generate_scripts_andor_wrappers(as_conf,job_list,jobs_filtered,packages_persistence,only_wrappers=False): """ - as_conf: AutosubmitConfig object - job_list: JobList object - jobs_filtered: list of jobs - packages_persistence: JobPackagePersistence object - only_wrappers: True + :param as_conf: Class that handles basic configuration paramenters of Autosubmit. \n + :type as_conf: AutosubmitConfig() Object \n + :param job_list: Representation of the jobs of the experiment, keeps the list of jobs inside. \n + :type job_list: JobList() Object \n + :param jobs_filtered: list of jobs that are relevant to the process. \n + :type jobs_filtered: List() of Job Objects \n + :param packages_persistence: Object that handles local db persistence. 
\n + :type packages_persistence: JobPackagePersistence() Object \n + :param only_wrappers: True when coming from Autosubmit.create(). \n + :type only_wrappers: Boolean \n + :return: Nothing\n + :rtype: \n """ job_list._job_list=jobs_filtered job_list.update_list(as_conf,False) - submitter = Autosubmit._get_submitter(as_conf) # Current choice is Paramiko Submitter + submitter = Autosubmit._get_submitter(as_conf) # Load platforms saves a dictionary Key: Platform Name, Value: Corresponding Platform Object submitter.load_platforms(as_conf) # The value is retrieves from DEFAULT.HPCARCH @@ -941,7 +948,7 @@ class Autosubmit: # Add object to set # noinspection PyTypeChecker platforms_to_test.add(job.platform) - ## case setstatus + # case setstatus job_list.check_scripts(as_conf) job_list.update_list(as_conf, False) # Loading parameters again @@ -1186,19 +1193,20 @@ class Autosubmit: """ Gets READY jobs and send them to the platforms if there is available space on the queues - :param as_conf: autosubmit config object - :type as_conf: AutosubmitConfig object - :param job_list: job list to check - :type job_list: JobList object - :param platforms_to_test: platforms used - :type platforms_to_test: set of Platform Objects, e.g. SgePlatform(), LsfPlatform(). - :param packages_persistence: Handles database per experiment - :type packages_persistence: JobPackagePersistence object - :return: True if at least one job was submitted, False otherwise - :rtype: bool + :param as_conf: autosubmit config object \n + :type as_conf: AutosubmitConfig object \n + :param job_list: job list to check \n + :type job_list: JobList object \n + :param platforms_to_test: platforms used \n + :type platforms_to_test: set of Platform Objects, e.g. SgePlatform(), LsfPlatform(). 
\n + :param packages_persistence: Handles database per experiment \n + :type packages_persistence: JobPackagePersistence object \n + :param only_wrappers: True if it comes from create -cw \n + :type only_wrappers: Boolean \n + :return: True if at least one job was submitted, False otherwise \n + :rtype: Boolean """ save = False - for platform in platforms_to_test: Log.debug("\nJobs ready for {1}: {0}", len(job_list.get_ready(platform)), platform.name) packages_to_submit, remote_dependencies_dict = JobPackager(as_conf, platform, job_list).build_packages() @@ -1221,6 +1229,7 @@ class Autosubmit: continue if only_wrappers or inspect: for innerJob in package._jobs: + # Setting status to COMPLETED for some reason. innerJob.status=Status.COMPLETED if hasattr(package, "name"): @@ -1234,6 +1243,7 @@ class Autosubmit: if remote_dependencies_dict and package.name in remote_dependencies_dict['name_to_id']: remote_dependencies_dict['name_to_id'][package.name] = package.jobs[0].id if isinstance(package, JobPackageThread): + # If it is instance of JobPackageThread, then it is JobPackageVertical. 
packages_persistence.save(package.name, package.jobs, package._expid, inspect) save = True except WrongTemplateException as e: @@ -1269,6 +1279,7 @@ class Autosubmit: 'name_to_id']: remote_dependencies_dict['name_to_id'][package.name] = package.jobs[0].id if isinstance(package, JobPackageThread): + # Saving only when it is a real multi job package packages_persistence.save(package.name, package.jobs, package._expid, inspect) i += 1 @@ -1400,11 +1411,17 @@ class Autosubmit: for job in jobs: job.children = job.children - referenced_jobs_to_remove job.parents = job.parents - referenced_jobs_to_remove + # for job in jobs: + # print(job.name + " from " + str(job.platform_name)) + # return False #WRAPPERS if as_conf.get_wrapper_type() != 'none' and check_wrapper: + # Class constructor creates table if it does not exist packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) - os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl", "job_packages_" + expid + ".db"), 0664) + # Permissons + os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl", "job_packages_" + expid + ".db"), 0664) + #Database modification packages_persistence.reset_table(True) referenced_jobs_to_remove = set() job_list_wrappers = copy.deepcopy(job_list) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 80257ad93..54d7960c9 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -723,7 +723,7 @@ class JobList: def get_active(self, platform=None): """ - Returns a list of active jobs (In platforms, Ready) + Returns a list of active jobs (In platforms queue + Ready) :param platform: job platform :type platform: HPCPlatform diff --git a/autosubmit/job/job_package_persistence.py b/autosubmit/job/job_package_persistence.py index 2208263b2..d17e2d5f0 100644 --- a/autosubmit/job/job_package_persistence.py +++ b/autosubmit/job/job_package_persistence.py @@ -24,6 +24,16 @@ from 
autosubmit.database.db_manager import DbManager class JobPackagePersistence(object): + """ + Class that handles packages workflow. + + Creates Packages Table, Wrappers Table. + + :param persistence_path: Path to the persistence folder pkl. \n + :type persistence_path: String \n + :param persistence_file: Name of the persistence pkl file. \n + :type persistence_file: String + """ VERSION = 1 JOB_PACKAGES_TABLE = 'job_package' diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index cb682ba56..8d6b93c07 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -28,7 +28,7 @@ from math import ceil class JobPackager(object): """ - The main responsibility of this class is to manage the packages of jobs that have to be submitted. + Main class that manages Job wrapping. :param as_config: Autosubmit basic configuration.\n :type as_config: AutosubmitConfig object.\n @@ -53,6 +53,7 @@ class JobPackager(object): # These are defined in the [wrapper] section of autosubmit_,conf self.wrapper_type = self._as_config.get_wrapper_type() + # True or False self.remote_dependencies = self._as_config.get_remote_dependencies() self.jobs_in_wrapper = self._as_config.get_wrapper_jobs() @@ -66,8 +67,8 @@ class JobPackager(object): """ Returns the list of the built packages to be submitted - :return: list of packages - :rtype list + :return: List of packages depending on type of package, JobPackageVertical Object for 'vertical-mixed' or 'vertical'. 
\n + :rtype: List() of JobPackageVertical """ packages_to_submit = list() remote_dependencies_dict = dict() @@ -90,9 +91,8 @@ class JobPackager(object): num_jobs_to_submit = min(self._max_wait_jobs_to_submit, len(jobs_ready), self._max_jobs_to_submit) # Take the first num_jobs_to_submit from the list of available jobs_to_submit = list_of_available[0:num_jobs_to_submit] - + # print(len(jobs_to_submit)) jobs_to_submit_by_section = self._divide_list_by_section(jobs_to_submit) - for section in jobs_to_submit_by_section: # Only if platform allows wrappers, wrapper type has been correctly defined, and job names for wrappers have been correctly defined # ('None' is a default value) or the correct section is included in the corresponding sections in [wrappers] @@ -179,15 +179,17 @@ class JobPackager(object): """ Builds Vertical-Mixed or Vertical - :param section_list: Jobs belonging to a section defined as wrappable.\n + :param section_list: Jobs defined as wrappable belonging to a common section.\n :type section_list: List() of Job Objects. \n :param max_wrapped_jobs: Number of maximum jobs that can be wrapped (Can be user defined), per section. \n :type max_wrapped_jobs: Integer. \n - + :return: List of Wrapper Packages, Dictionary that details dependencies. 
\n + :rtype: List() of JobPackageVertical(), Dictionary Key: String, (Dictionary Key: Variable Name, Value: String/Int) """ packages = [] potential_dependency = None remote_dependencies_dict = dict() + # True when autosubmit_.conf value [wrapper].DEPENDENCIES has been set to true if self.remote_dependencies: remote_dependencies_dict['name_to_id'] = dict() remote_dependencies_dict['dependencies'] = dict() @@ -206,15 +208,18 @@ class JobPackager(object): max_wrapped_jobs, self._platform.max_wallclock) jobs_list = job_vertical_packager.build_vertical_package(job) + # update max_jobs, potential_dependency is None self.max_jobs -= len(jobs_list) if job.status is Status.READY: packages.append(JobPackageVertical(jobs_list)) - else: + else: package = JobPackageVertical(jobs_list, potential_dependency) packages.append(package) + # Possible need of "if self.remote_dependencies here" remote_dependencies_dict['name_to_id'][potential_dependency] = -1 remote_dependencies_dict['dependencies'][package.name] = potential_dependency if self.remote_dependencies: + # Sending last item in list of packaged child = job_vertical_packager.get_wrappable_child(jobs_list[-1]) if child is not None: section_list.insert(section_list.index(job) + 1, child) @@ -305,17 +310,30 @@ class JobPackagerVertical(object): def build_vertical_package(self, job): """ + Goes trough the job and all the related jobs (children, or part of the same date, member ordering group), finds those suitable + and groups them together into a wrapper. + :param job: Job to be wrapped. \n + :type job: Job Object \n + :return: List of jobs that are wrapped together. 
\n + :rtype: List() of Job Object \n """ + # self.jobs_list starts as 1, with only, but wrapped jobs are added in the recursion if len(self.jobs_list) >= self.max_jobs or len(self.jobs_list) >= self.max_wrapped_jobs: return self.jobs_list child = self.get_wrappable_child(job) + # If not None, it is wrappable if child is not None: + # Calculate total wallclock per possible wrapper self.total_wallclock = sum_str_hours(self.total_wallclock, child.wallclock) + # Testing against max from platform if self.total_wallclock <= self.max_wallclock: + # Signaling, this is later tested in the main loop child.packed = True self.jobs_list.append(child) + # Recursive call return self.build_vertical_package(child) + # Wrapped jobs are accumulated and returned in this list return self.jobs_list def get_wrappable_child(self, job): @@ -345,6 +363,14 @@ class JobPackagerVerticalSimple(JobPackagerVertical): super(JobPackagerVerticalSimple, self).__init__(jobs_list, total_wallclock, max_jobs, max_wrapped_jobs, max_wallclock) def get_wrappable_child(self, job): + """ + Goes through the children jobs of job, tests if wrappable using self._is_wrappable. + + :param job: job to be evaluated. \n + :type job: Job Object \n + :return: job (children) that is wrappable. \n + :rtype: Job Object + """ for child in job.children: if self._is_wrappable(child, job): return child @@ -352,9 +378,22 @@ class JobPackagerVerticalSimple(JobPackagerVertical): return None def _is_wrappable(self, job, parent=None): + """ + Determines if a job (children) is wrappable. Basic condition is that the parent should have the same section as the child. + Also, test that the parents of the job (children) are COMPLETED. + + :param job: Children Job to be tested. \n + :type job: Job Object \n + :param parent: Original Job whose children are tested. \n + :type parent: Job Object \n + :return: True if wrappable, False otherwise. 
\n + :rtype: Boolean + """ if job.section != parent.section: return False for other_parent in job.parents: + # First part, parents should be COMPLETED. + # Second part, no cycles. if other_parent.status != Status.COMPLETED and other_parent not in self.jobs_list: return False return True diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index cecd1c130..b52805c88 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -233,12 +233,16 @@ class JobPackageArray(JobPackageBase): class JobPackageThread(JobPackageBase): """ Class to manage a thread-based package of jobs to be submitted by autosubmit + + :param dependency: Name of potential dependency + :type dependency: String """ FILE_PREFIX = 'ASThread' def __init__(self, jobs, dependency=None, jobs_resources=dict()): super(JobPackageThread, self).__init__(jobs) self._job_scripts = {} + # Seems like this one is not used at all in the class self._job_dependency = dependency self._common_script = None self._wallclock = '00:00' -- GitLab From 66914c10a81c7c0b7d1b25194fa60e0169ab84c2 Mon Sep 17 00:00:00 2001 From: Wilmer Uruchi Ticona Date: Tue, 22 Oct 2019 10:58:01 +0200 Subject: [PATCH 5/7] Comments until WrapperJob --- autosubmit/autosubmit.py | 18 ++++++++++++---- autosubmit/job/job.py | 26 ++++++++++++++++++++++- autosubmit/job/job_list.py | 39 +++++++++++++++++++++++++++++----- autosubmit/job/job_packager.py | 10 ++++----- 4 files changed, 78 insertions(+), 15 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 6e58b3ae3..46667b138 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -742,7 +742,15 @@ class Autosubmit: @staticmethod def _load_parameters(as_conf, job_list, platforms): """ + Add parameters from configuration files into platform objects, and into the job_list object. 
+ :param as_conf: Basic configuration handler.\n + :type as_conf: AutosubmitConfig object\n + :param job_list: Handles the list as a unique entity.\n + :type job_list: JobList() object\n + :param platforms: List of platforms related to the experiment.\n + :type platforms: List() of Platform Objects. e.g EcPlatform(), SgePlatform(). + :return: Nothing, modifies input. """ # Load parameters Log.debug("Loading parameters...") @@ -916,7 +924,7 @@ class Autosubmit: @staticmethod def generate_scripts_andor_wrappers(as_conf,job_list,jobs_filtered,packages_persistence,only_wrappers=False): """ - :param as_conf: Class that handles basic configuration paramenters of Autosubmit. \n + :param as_conf: Class that handles basic configuration parameters of Autosubmit. \n :type as_conf: AutosubmitConfig() Object \n :param job_list: Representation of the jobs of the experiment, keeps the list of jobs inside. \n :type job_list: JobList() Object \n @@ -935,7 +943,7 @@ class Autosubmit: submitter = Autosubmit._get_submitter(as_conf) # Load platforms saves a dictionary Key: Platform Name, Value: Corresponding Platform Object submitter.load_platforms(as_conf) - # The value is retrieves from DEFAULT.HPCARCH + # The value is retrieved from DEFAULT.HPCARCH hpcarch = as_conf.get_platform() Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) platforms_to_test = set() @@ -952,7 +960,7 @@ class Autosubmit: job_list.check_scripts(as_conf) job_list.update_list(as_conf, False) # Loading parameters again - Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) + Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) while job_list.get_active(): # Sending only_wrappers = True Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence,True,only_wrappers) @@ -1199,8 +1207,10 @@ class Autosubmit: :type job_list: JobList object \n :param platforms_to_test: platforms used \n :type platforms_to_test: set of Platform Objects, e.g. 
SgePlatform(), LsfPlatform(). \n - :param packages_persistence: Handles database per experiment \n + :param packages_persistence: Handles database per experiment. \n :type packages_persistence: JobPackagePersistence object \n + :param inspect: True if coming from generate_scripts_andor_wrappers(). \n + :type inspect: Boolean \n :param only_wrappers: True if it comes from create -cw \n :type only_wrappers: Boolean \n :return: True if at least one job was submitted, False otherwise \n diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index a298bcb72..9edb4ce08 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -972,7 +972,31 @@ class Job(object): self.remote_logs = self.local_logs -class WrapperJob(Job): +class WrapperJob(Job): + """ + Defines a wrapper from a package. + + Calls Job constructor. + + :param name: Name of the Package \n + :type name: String \n + :param job_id: Id of the first Job of the package \n + :type job_id: Integer \n + :param status: 'READY' when coming from submit_ready_jobs() \n + :type status: String \n + :param priority: 0 when coming from submit_ready_jobs() \n + :type priority: Integer \n + :param job_list: List of jobs in the package \n + :type job_list: List() of Job() objects \n + :param total_wallclock: Wallclock of the package \n + :type total_wallclock: String Formatted \n + :param num_processors: Number of processors for the package \n + :type num_processors: Integer \n + :param platform: Platform object defined for the package \n + :type platform: Platform Object. e.g. 
EcPlatform() \n + :param as_config: Autosubmit basic configuration object \n + :type as_config: AutosubmitConfig object \n + """ def __init__(self, name, job_id, status, priority, job_list, total_wallclock, num_processors, platform, as_config): super(WrapperJob, self).__init__(name, job_id, status, priority) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 54d7960c9..a1b490a40 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -356,10 +356,10 @@ class JobList: If the job does not have a chunk number, the total number of chunks configured for the experiment is used. - :param wrapper_jobs: User defined job types to be wrapped in autosubmit_,conf [wrapper] section. \n + :param wrapper_jobs: User defined job types in autosubmit_,conf [wrapper] section to be wrapped. \n :type wrapper_jobs: String \n :return: Sorted Dictionary of Dictionary of List that represents the jobs included in the wrapping process. \n - :rtype: Dictionary Key: date, Value: (Dictionary Key: Member, Value: List of jobs that belong to the date, member, and are ordered by RUNNING, and chunk number) + :rtype: Dictionary Key: date, Value: (Dictionary Key: Member, Value: List of jobs that belong to the date, member, and are ordered by chunk number if it is a chunk job otherwise num_chunks from JOB TYPE (section) """ # Dictionary Key: date, Value: (Dictionary Key: Member, Value: List) dict_jobs = dict() @@ -372,6 +372,12 @@ class JobList: filtered_jobs_list = filter(lambda job: job.section in wrapper_jobs, self._job_list) filtered_jobs_fake_date_member, fake_original_job_map = self._create_fake_dates_members(filtered_jobs_list) + # print("Fake Jobs: ") + # for i in filtered_jobs_fake_date_member: + # print(i) + # print("Fake Mapping: ") + # for k,v in fake_original_job_map: + # print("Fake " + str(k) + " to " + str(v)) sections_running_type_map = dict() for section in wrapper_jobs.split(" "): @@ -384,8 +390,12 @@ class JobList: # Filter list of fake jobs 
according to date and member, result not sorted at this point sorted_jobs_list = filter(lambda job: job.name.split("_")[1] == str_date and job.name.split("_")[2] == member, filtered_jobs_fake_date_member) - + # print("Sort of Sorted") + # for i in sorted_jobs_list: + # print(i) + previous_job = sorted_jobs_list[0] + # print("Previous job " + str(previous_job)) # get RUNNING for this section section_running_type = sections_running_type_map[previous_job.section] @@ -403,15 +413,29 @@ class JobList: # Test if RUNNING is different between sections, or if we have reached the last item in sorted_jobs_list if (previous_section_running_type != None and previous_section_running_type != section_running_type) \ or index == len(sorted_jobs_list): - # Sorting by date, member, chunk (JOB TYPE) + + # print("Jobs pre sort: ") + # for i in jobs_to_sort: + # print(str(i.name) + " -> " + str(i.name.split('_')[1]) + " OR " + str(i.name.split('_')[2]) + " OR " + str(int(i.name.split('_')[3]) + # if len(i.name.split('_')) == 5 else num_chunks + 1)) + + + # Sorting by date, member, chunk number if it is a chunk job otherwise num_chunks from JOB TYPE (section) + # Important to note that the only differentiating factor would be chunk OR num_chunks jobs_to_sort = sorted(jobs_to_sort, key=lambda k: (k.name.split('_')[1], (k.name.split('_')[2]), (int(k.name.split('_')[3]) if len(k.name.split('_')) == 5 else num_chunks + 1))) + # print("Jobs to sort: ") + # for i in jobs_to_sort: + # print(str(i.name) + " -> " + str(i.name.split('_')[1]) + " OR " + str(i.name.split('_')[2]) + " OR " + str(int(i.name.split('_')[3]) + # if len(i.name.split('_')) == 5 else num_chunks + 1)) # Bringing back original job if identified for idx in range(0, len(jobs_to_sort)): + # Test if it is a fake job if jobs_to_sort[idx] in fake_original_job_map: fake_job = jobs_to_sort[idx] + # Get original jobs_to_sort[idx] = fake_original_job_map[fake_job] # Add to result, and reset jobs_to_sort # By adding to the result at 
this step, only those with the same RUNNIN have been added. @@ -420,7 +444,12 @@ class JobList: jobs_to_sort.append(job) previous_job = job - + # print("Result") + # for k in dict_jobs.keys(): + # for k2 in dict_jobs[k].keys(): + # for i in dict_jobs[k][k2]: + # print(str(k) + " -> " + str(k2) + " i: " + str(i)) + # return False return dict_jobs def _create_fake_dates_members(self, filtered_jobs_list): diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 8d6b93c07..5a7603150 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -184,7 +184,7 @@ class JobPackager(object): :param max_wrapped_jobs: Number of maximum jobs that can be wrapped (Can be user defined), per section. \n :type max_wrapped_jobs: Integer. \n :return: List of Wrapper Packages, Dictionary that details dependencies. \n - :rtype: List() of JobPackageVertical(), Dictionary Key: String, (Dictionary Key: Variable Name, Value: String/Int) + :rtype: List() of JobPackageVertical(), Dictionary Key: String, Value: (Dictionary Key: Variable Name, Value: String/Int) """ packages = [] potential_dependency = None @@ -310,7 +310,7 @@ class JobPackagerVertical(object): def build_vertical_package(self, job): """ - Goes trough the job and all the related jobs (children, or part of the same date, member ordering group), finds those suitable + Goes trough the job and all the related jobs (children, or part of the same date member ordered group), finds those suitable and groups them together into a wrapper. :param job: Job to be wrapped. \n @@ -318,7 +318,7 @@ class JobPackagerVertical(object): :return: List of jobs that are wrapped together. 
\n :rtype: List() of Job Object \n """ - # self.jobs_list starts as 1, with only, but wrapped jobs are added in the recursion + # self.jobs_list starts with only 1 member, but wrapped jobs are added in the recursion if len(self.jobs_list) >= self.max_jobs or len(self.jobs_list) >= self.max_wrapped_jobs: return self.jobs_list child = self.get_wrappable_child(job) @@ -328,7 +328,7 @@ class JobPackagerVertical(object): self.total_wallclock = sum_str_hours(self.total_wallclock, child.wallclock) # Testing against max from platform if self.total_wallclock <= self.max_wallclock: - # Signaling, this is later tested in the main loop + # Marking, this is later tested in the main loop child.packed = True self.jobs_list.append(child) # Recursive call @@ -437,7 +437,7 @@ class JobPackagerVerticalMixed(JobPackagerVertical): def get_wrappable_child(self, job): """ - Goes through the jobs with the same date and member than the input jub, and return the first that satisfies self._is_wrappable() + Goes through the jobs with the same date and member as the input job, and returns the first that satisfies self._is_wrappable()
\n :type job: Job Object \n -- GitLab From 4d22a6fdaac7f5745fc4fa0daaf78a315f884350 Mon Sep 17 00:00:00 2001 From: Wilmer Uruchi Ticona Date: Tue, 22 Oct 2019 14:37:47 +0200 Subject: [PATCH 6/7] Cleaned some comments --- autosubmit/autosubmit.py | 5 ++++- autosubmit/job/job_list.py | 28 ++-------------------------- 2 files changed, 6 insertions(+), 27 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 46667b138..2ab5fd62f 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -962,6 +962,9 @@ class Autosubmit: # Loading parameters again Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) while job_list.get_active(): + # print("Printing Active from iter: ") + # for j in job_list.get_active(): + # print(j.name) # Sending only_wrappers = True Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence,True,only_wrappers) job_list.update_list(as_conf, False) @@ -1239,7 +1242,7 @@ class Autosubmit: continue if only_wrappers or inspect: for innerJob in package._jobs: - # Setting status to COMPLETED for some reason. 
+ # Setting status to COMPLETED so it does not get stuck in the loop that calls this function innerJob.status=Status.COMPLETED if hasattr(package, "name"): diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index a1b490a40..e9ceca8df 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -372,12 +372,6 @@ class JobList: filtered_jobs_list = filter(lambda job: job.section in wrapper_jobs, self._job_list) filtered_jobs_fake_date_member, fake_original_job_map = self._create_fake_dates_members(filtered_jobs_list) - # print("Fake Jobs: ") - # for i in filtered_jobs_fake_date_member: - # print(i) - # print("Fake Mapping: ") - # for k,v in fake_original_job_map: - # print("Fake " + str(k) + " to " + str(v)) sections_running_type_map = dict() for section in wrapper_jobs.split(" "): @@ -390,12 +384,9 @@ class JobList: # Filter list of fake jobs according to date and member, result not sorted at this point sorted_jobs_list = filter(lambda job: job.name.split("_")[1] == str_date and job.name.split("_")[2] == member, filtered_jobs_fake_date_member) - # print("Sort of Sorted") - # for i in sorted_jobs_list: - # print(i) previous_job = sorted_jobs_list[0] - # print("Previous job " + str(previous_job)) + # get RUNNING for this section section_running_type = sections_running_type_map[previous_job.section] @@ -413,22 +404,12 @@ class JobList: # Test if RUNNING is different between sections, or if we have reached the last item in sorted_jobs_list if (previous_section_running_type != None and previous_section_running_type != section_running_type) \ or index == len(sorted_jobs_list): - - # print("Jobs pre sort: ") - # for i in jobs_to_sort: - # print(str(i.name) + " -> " + str(i.name.split('_')[1]) + " OR " + str(i.name.split('_')[2]) + " OR " + str(int(i.name.split('_')[3]) - # if len(i.name.split('_')) == 5 else num_chunks + 1)) - # Sorting by date, member, chunk number if it is a chunk job otherwise num_chunks from JOB TYPE (section) # 
Important to note that the only differentiating factor would be chunk OR num_chunks jobs_to_sort = sorted(jobs_to_sort, key=lambda k: (k.name.split('_')[1], (k.name.split('_')[2]), (int(k.name.split('_')[3]) if len(k.name.split('_')) == 5 else num_chunks + 1))) - # print("Jobs to sort: ") - # for i in jobs_to_sort: - # print(str(i.name) + " -> " + str(i.name.split('_')[1]) + " OR " + str(i.name.split('_')[2]) + " OR " + str(int(i.name.split('_')[3]) - # if len(i.name.split('_')) == 5 else num_chunks + 1)) # Bringing back original job if identified for idx in range(0, len(jobs_to_sort)): @@ -444,12 +425,7 @@ class JobList: jobs_to_sort.append(job) previous_job = job - # print("Result") - # for k in dict_jobs.keys(): - # for k2 in dict_jobs[k].keys(): - # for i in dict_jobs[k][k2]: - # print(str(k) + " -> " + str(k2) + " i: " + str(i)) - # return False + return dict_jobs def _create_fake_dates_members(self, filtered_jobs_list): -- GitLab From 49337f28ffdd2b5c503800b1d07a25c1035ef191 Mon Sep 17 00:00:00 2001 From: Wilmer Uruchi Ticona Date: Tue, 22 Oct 2019 15:17:57 +0200 Subject: [PATCH 7/7] More comments on Vertical-Mixed --- autosubmit/autosubmit.py | 7 ++----- autosubmit/job/job_packages.py | 12 ++++++++++++ 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 2ab5fd62f..a26f4078d 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -932,7 +932,7 @@ class Autosubmit: :type jobs_filtered: List() of Job Objects \n :param packages_persistence: Object that handles local db persistence. \n :type packages_persistence: JobPackagePersistence() Object \n - :param only_wrappers: True when coming from Autosubmit.create(). \n + :param only_wrappers: True when coming from Autosubmit.create(). 
False when coming from Autosubmit.inspect(). \n :type only_wrappers: Boolean \n :return: Nothing\n :rtype: \n @@ -962,9 +962,6 @@ class Autosubmit: # Loading parameters again Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) while job_list.get_active(): - # print("Printing Active from iter: ") - # for j in job_list.get_active(): - # print(j.name) # Sending only_wrappers = True Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence,True,only_wrappers) job_list.update_list(as_conf, False) @@ -1214,7 +1211,7 @@ class Autosubmit: :type packages_persistence: JobPackagePersistence object \n :param inspect: True if coming from generate_scripts_andor_wrappers(). \n :type inspect: Boolean \n - :param only_wrappers: True if it comes from create -cw \n + :param only_wrappers: True if it comes from create -cw, False if it comes from inspect -cw. \n :type only_wrappers: Boolean \n :return: True if at least one job was submitted, False otherwise \n :rtype: Boolean diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index b52805c88..4303f7bbc 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -76,6 +76,14 @@ class JobPackageBase(object): return self._platform def submit(self, configuration, parameters,only_generate=False): + """ + :param configuration: Autosubmit basic configuration \n + :type configuration: AutosubmitConfig object \n + :param parameters: Parameters from joblist \n + :type parameters: JobList.parameters \n + :param only_generate: True if coming from generate_scripts_andor_wrappers(). If true, only generates scripts; otherwise, submits. 
\n + :type only_generate: Boolean + """ exit=False for job in self.jobs: if job.check.lower() == Job.CHECK_ON_SUBMISSION: @@ -412,6 +420,10 @@ class JobPackageThreadWrapped(JobPackageThread): class JobPackageVertical(JobPackageThread): """ Class to manage a vertical thread-based package of jobs to be submitted by autosubmit + + :param jobs: + :type jobs: + :param: dependency: """ def __init__(self, jobs, dependency=None): -- GitLab