diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 89e76f1701aa0068ec0d13c7b24fee8078963834..a26f4078d967116ef2d67af338d69b56c897751d 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -741,16 +741,28 @@ class Autosubmit: @staticmethod def _load_parameters(as_conf, job_list, platforms): + """ + Add parameters from configuration files into platform objects, and into the job_list object. + + :param as_conf: Basic configuration handler.\n + :type as_conf: AutosubmitConfig object\n + :param job_list: Handles the list as a unique entity.\n + :type job_list: JobList() object\n + :param platforms: List of platforms related to the experiment.\n + :type platforms: List() of Platform Objects. e.g EcPlatform(), SgePlatform(). + :return: Nothing, modifies input. + """ # Load parameters Log.debug("Loading parameters...") parameters = as_conf.load_parameters() for platform_name in platforms: platform = platforms[platform_name] + # Call method from platform.py parent object platform.add_parameters(parameters) - + # Platform = from DEFAULT.HPCARCH, e.g. marenostrum4 platform = platforms[as_conf.get_platform().lower()] platform.add_parameters(parameters, True) - + # Attach paramenters to JobList job_list.parameters = parameters @staticmethod def inspect(expid, lst, filter_chunks, filter_status, filter_section , notransitive=False, force=False, check_wrapper=False): @@ -911,27 +923,47 @@ class Autosubmit: @staticmethod def generate_scripts_andor_wrappers(as_conf,job_list,jobs_filtered,packages_persistence,only_wrappers=False): + """ + :param as_conf: Class that handles basic configuration parameters of Autosubmit. \n + :type as_conf: AutosubmitConfig() Object \n + :param job_list: Representation of the jobs of the experiment, keeps the list of jobs inside. \n + :type job_list: JobList() Object \n + :param jobs_filtered: list of jobs that are relevant to the process. \n + :type jobs_filtered: List() of Job Objects \n + :param packages_persistence: Object that handles local db persistence. \n + :type packages_persistence: JobPackagePersistence() Object \n + :param only_wrappers: True when coming from Autosubmit.create(). False when coming from Autosubmit.inspect(), \n + :type only_wrappers: Boolean \n + :return: Nothing\n + :rtype: \n + """ job_list._job_list=jobs_filtered job_list.update_list(as_conf,False) + # Current choice is Paramiko Submitter submitter = Autosubmit._get_submitter(as_conf) + # Load platforms saves a dictionary Key: Platform Name, Value: Corresponding Platform Object submitter.load_platforms(as_conf) + # The value is retrieved from DEFAULT.HPCARCH hpcarch = as_conf.get_platform() Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) - platforms_to_test = set() + platforms_to_test = set() for job in job_list.get_job_list(): if job.platform_name is None: job.platform_name = hpcarch - # noinspection PyTypeChecker + # Assign platform objects to each job + # noinspection PyTypeChecker job.platform = submitter.platforms[job.platform_name.lower()] + # Add object to set # noinspection PyTypeChecker platforms_to_test.add(job.platform) - ## case setstatus + # case setstatus job_list.check_scripts(as_conf) job_list.update_list(as_conf, False) - Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) + # Loading parameters again + Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) while job_list.get_active(): + # Sending only_wrappers = True Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence,True,only_wrappers) - job_list.update_list(as_conf, False) @@ -1169,15 +1201,22 @@ class Autosubmit: """ Gets READY jobs and send them to the platforms if there is available space on the queues - :param as_conf: autosubmit config object - :param job_list: job list to check - :param platforms_to_test: platforms used - :type platforms_to_test: set - :return: True if at least one job was submitted, False otherwise - :rtype: bool + :param as_conf: autosubmit config object \n + :type as_conf: AutosubmitConfig object \n + :param job_list: job list to check \n + :type job_list: JobList object \n + :param platforms_to_test: platforms used \n + :type platforms_to_test: set of Platform Objects, e.g. SgePlatform(), LsfPlatform(). \n + :param packages_persistence: Handles database per experiment. \n + :type packages_persistence: JobPackagePersistence object \n + :param inspect: True if coming from generate_scripts_andor_wrappers(). \n + :type inspect: Boolean \n + :param only_wrappers: True if it comes from create -cw, False if it comes from inspect -cw. \n + :type only_wrappers: Boolean \n + :return: True if at least one job was submitted, False otherwise \n + :rtype: Boolean """ save = False - for platform in platforms_to_test: Log.debug("\nJobs ready for {1}: {0}", len(job_list.get_ready(platform)), platform.name) packages_to_submit, remote_dependencies_dict = JobPackager(as_conf, platform, job_list).build_packages() @@ -1200,6 +1239,7 @@ class Autosubmit: continue if only_wrappers or inspect: for innerJob in package._jobs: + # Setting status to COMPLETED so it does not get stuck in the loop that calls this function innerJob.status=Status.COMPLETED if hasattr(package, "name"): @@ -1213,6 +1253,7 @@ class Autosubmit: if remote_dependencies_dict and package.name in remote_dependencies_dict['name_to_id']: remote_dependencies_dict['name_to_id'][package.name] = package.jobs[0].id if isinstance(package, JobPackageThread): + # If it is instance of JobPackageThread, then it is JobPackageVertical. packages_persistence.save(package.name, package.jobs, package._expid, inspect) save = True except WrongTemplateException as e: @@ -1248,6 +1289,7 @@ class Autosubmit: 'name_to_id']: remote_dependencies_dict['name_to_id'][package.name] = package.jobs[0].id if isinstance(package, JobPackageThread): + # Saving only when it is a real multi job package packages_persistence.save(package.name, package.jobs, package._expid, inspect) i += 1 @@ -1379,11 +1421,17 @@ class Autosubmit: for job in jobs: job.children = job.children - referenced_jobs_to_remove job.parents = job.parents - referenced_jobs_to_remove + # for job in jobs: + # print(job.name + " from " + str(job.platform_name)) + # return False #WRAPPERS if as_conf.get_wrapper_type() != 'none' and check_wrapper: + # Class constructor creates table if it does not exist packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) - os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl", "job_packages_" + expid + ".db"), 0664) + # Permissons + os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl", "job_packages_" + expid + ".db"), 0664) + #Database modification packages_persistence.reset_table(True) referenced_jobs_to_remove = set() job_list_wrappers = copy.deepcopy(job_list) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index a298bcb721e238af46e0a62bdd33f9da8c33ec38..9edb4ce08b4b321ab3ea36b7a3f5452fbfc57f67 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -972,7 +972,31 @@ class Job(object): self.remote_logs = self.local_logs -class WrapperJob(Job): +class WrapperJob(Job): + """ + Defines a wrapper from a package. + + Calls Job constructor. + + :param name: Name of the Package \n + :type name: String \n + :param job_id: Id of the first Job of the package \n + :type job_id: Integer \n + :param status: 'READY' when coming from submit_ready_jobs() \n + :type status: String \n + :param priority: 0 when coming from submit_ready_jobs() \n + :type priority: Integer \n + :param job_list: List of jobs in the package \n + :type job_list: List() of Job() objects \n + :param total_wallclock: Wallclock of the package \n + :type total_wallclock: String Formatted \n + :param num_processors: Number of processors for the package \n + :type num_processors: Integer \n + :param platform: Platform object defined for the package \n + :type platform: Platform Object. e.g. EcPlatform() \n + :param as_config: Autosubmit basic configuration object \n + :type as_config: AutosubmitConfig object \n + """ def __init__(self, name, job_id, status, priority, job_list, total_wallclock, num_processors, platform, as_config): super(WrapperJob, self).__init__(name, job_id, status, priority) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 0c259862946ebacc6d55a6aeb7d1c6f26d33ac3b..e9ceca8df0f2f6b5ef39ff139216eec51f43dfc2 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -119,7 +119,11 @@ class JobList: :param default_retrials: default retrials for ech job :type default_retrials: int :param new: is it a new generation? - :type new: bool + :type new: bool \n + :param wrapper_type: Type of wrapper defined by the user in autosubmit_.conf [wrapper] section. \n + :type wrapper type: String \n + :param wrapper_jobs: Job types defined in autosubmit_.conf [wrapper sections] to be wrapped. \n + :type wrapper_jobs: String \n """ self._parameters = parameters self._date_list = date_list @@ -148,7 +152,8 @@ class JobList: for job in self._job_list: job.parameters = parameters - if wrapper_type == 'vertical-mixed': + # Perhaps this should be done by default independent of the wrapper_type supplied + if wrapper_type == 'vertical-mixed': self._ordered_jobs_by_date_member = self._create_sorted_dict_jobs(wrapper_jobs) @@ -345,53 +350,76 @@ class JobList: priority += 1 def _create_sorted_dict_jobs(self, wrapper_jobs): + """ + Creates a sorting of the jobs whose job.section is in wrapper_jobs, according to the following filters in order of importance: + date, member, RUNNING, and chunk number; where RUNNING is defined in jobs_.conf for each section. + + If the job does not have a chunk number, the total number of chunks configured for the experiment is used. + + :param wrapper_jobs: User defined job types in autosubmit_,conf [wrapper] section to be wrapped. \n + :type wrapper_jobs: String \n + :return: Sorted Dictionary of Dictionary of List that represents the jobs included in the wrapping process. \n + :rtype: Dictionary Key: date, Value: (Dictionary Key: Member, Value: List of jobs that belong to the date, member, and are ordered by chunk number if it is a chunk job otherwise num_chunks from JOB TYPE (section) + """ + # Dictionary Key: date, Value: (Dictionary Key: Member, Value: List) dict_jobs = dict() for date in self._date_list: dict_jobs[date] = dict() for member in self._member_list: dict_jobs[date][member] = list() num_chunks = len(self._chunk_list) - + # Select only relevant jobs, those belonging to the sections defined in the wrapper filtered_jobs_list = filter(lambda job: job.section in wrapper_jobs, self._job_list) - filtered_jobs_fake_date_member, fake_original_job_map = self._create_fake_dates_members(filtered_jobs_list) + filtered_jobs_fake_date_member, fake_original_job_map = self._create_fake_dates_members(filtered_jobs_list) - sections_running_type_map = dict() + sections_running_type_map = dict() for section in wrapper_jobs.split(" "): + # RUNNING = once, as default. This value comes from jobs_.conf sections_running_type_map[section] = self._dic_jobs.get_option(section, "RUNNING", 'once') - + for date in self._date_list: str_date = self._get_date(date) for member in self._member_list: + # Filter list of fake jobs according to date and member, result not sorted at this point sorted_jobs_list = filter(lambda job: job.name.split("_")[1] == str_date and job.name.split("_")[2] == member, filtered_jobs_fake_date_member) previous_job = sorted_jobs_list[0] + + # get RUNNING for this section section_running_type = sections_running_type_map[previous_job.section] jobs_to_sort = [previous_job] - previous_section_running_type = None - + previous_section_running_type = None + # Index starts at 1 because 0 has been taken in a previous step for index in range(1, len(sorted_jobs_list) + 1): + # If not last item if index < len(sorted_jobs_list): job = sorted_jobs_list[index] - + # Test if section has changed. e.g. from INI to SIM if previous_job.section != job.section: previous_section_running_type = section_running_type section_running_type = sections_running_type_map[job.section] - + # Test if RUNNING is different between sections, or if we have reached the last item in sorted_jobs_list if (previous_section_running_type != None and previous_section_running_type != section_running_type) \ or index == len(sorted_jobs_list): + # Sorting by date, member, chunk number if it is a chunk job otherwise num_chunks from JOB TYPE (section) + # Important to note that the only differentiating factor would be chunk OR num_chunks jobs_to_sort = sorted(jobs_to_sort, key=lambda k: (k.name.split('_')[1], (k.name.split('_')[2]), (int(k.name.split('_')[3]) if len(k.name.split('_')) == 5 else num_chunks + 1))) + # Bringing back original job if identified for idx in range(0, len(jobs_to_sort)): + # Test if it is a fake job if jobs_to_sort[idx] in fake_original_job_map: fake_job = jobs_to_sort[idx] + # Get original jobs_to_sort[idx] = fake_original_job_map[fake_job] - + # Add to result, and reset jobs_to_sort + # By adding to the result at this step, only those with the same RUNNIN have been added. dict_jobs[date][member] += jobs_to_sort jobs_to_sort = [] @@ -401,6 +429,17 @@ class JobList: return dict_jobs def _create_fake_dates_members(self, filtered_jobs_list): + """ + Using the list of jobs provided, creates clones of these jobs and modifies names conditionted on job.date, job.member values (testing None). + The purpose is that all jobs share the same name structure. + + :param filtered_jobs_list: A list of jobs of only those that comply with certain criteria, e.g. those belonging to a user defined job type for wrapping. \n + :type filetered_jobs_list: List() of Job Objects \n + :return filtered_jobs_fake_date_member: List of fake jobs. \n + :rtype filtered_jobs_fake_date_member: List of Job Objects \n + :return fake_original_job_map: Dictionary that maps fake job to original one. \n + :rtype fake_original_job_map: Dictionary Key: Job Object, Value: Job Object + """ filtered_jobs_fake_date_member = [] fake_original_job_map = dict() @@ -409,29 +448,44 @@ class JobList: fake_job = None # running once and synchronize date if job.date is None and job.member is None: + # Declare None values as if they were the last items in corresponding list date = self._date_list[-1] member = self._member_list[-1] - fake_job = copy.deepcopy(job) + # Use previous values to modify name of fake job fake_job.name = fake_job.name.split('_', 1)[0] + "_" + self._get_date(date) + "_" \ + member + "_" + fake_job.name.split("_", 1)[1] + # Filling list of fake jobs, only difference is the name filtered_jobs_fake_date_member.append(fake_job) + # Mapping fake jobs to orignal ones fake_original_job_map[fake_job] = job # running date or synchronize member elif job.member is None: + # Declare None value as if it were the last items in corresponding list member = self._member_list[-1] fake_job = copy.deepcopy(job) + # Use it to modify name of fake job fake_job.name = fake_job.name.split('_', 2)[0] + "_" + fake_job.name.split('_', 2)[ 1] + "_" + member + "_" + fake_job.name.split("_", 2)[2] + # Filling list of fake jobs, only difference is the name filtered_jobs_fake_date_member.append(fake_job) + # Mapping fake jobs to orignal ones fake_original_job_map[fake_job] = job - + # There was no result if fake_job is None: filtered_jobs_fake_date_member.append(job) return filtered_jobs_fake_date_member, fake_original_job_map def _get_date(self, date): + """ + Parses a user defined Date (from [experiment] DATELIST) to return a special String representation of that Date + + :param date: String representation of a date in format YYYYYMMdd. \n + :type date: String \n + :return: String representation of date according to format. \n + :rtype: String \n + """ date_format = '' if date.hour > 1: date_format = 'H' @@ -674,7 +728,7 @@ class JobList: def get_active(self, platform=None): """ - Returns a list of active jobs (In platforms, Ready) + Returns a list of active jobs (In platforms queue + Ready) :param platform: job platform :type platform: HPCPlatform diff --git a/autosubmit/job/job_package_persistence.py b/autosubmit/job/job_package_persistence.py index 2208263b2528f077c22aefbaeaca6b697cb48bb4..d17e2d5f018599239a82a474ccf235a2b04dbf44 100644 --- a/autosubmit/job/job_package_persistence.py +++ b/autosubmit/job/job_package_persistence.py @@ -24,6 +24,16 @@ from autosubmit.database.db_manager import DbManager class JobPackagePersistence(object): + """ + Class that handles packages workflow. + + Creates Packages Table, Wrappers Table. + + :param persistence_path: Path to the persistence folder pkl. \n + :type persistence_path: String \n + :param persistence_file: Name of the persistence pkl file. \n + :type persistence_file: String + """ VERSION = 1 JOB_PACKAGES_TABLE = 'job_package' diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index a8b242fdcbed6a9f5dabfe4701dd073caea87484..5a7603150e1c0ffa997905111e944cecf050ed38 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -28,20 +28,32 @@ from math import ceil class JobPackager(object): """ - The main responsibility of this class is to manage the packages of jobs that have to be submitted. + Main class that manages Job wrapping. + + :param as_config: Autosubmit basic configuration.\n + :type as_config: AutosubmitConfig object.\n + :param platform: A particular platform we are dealing with, e.g. Lsf Platform.\n + :type platform: Specific Platform Object, e.g. LsfPlatform(), EcPlatform(), ...\n + :param jobs_list: Contains the list of the jobs, along other properties.\n + :type jobs_list: JobList object. """ - def __init__(self, as_config, platform, jobs_list): + def __init__(self, as_config, platform, jobs_list): self._as_config = as_config self._platform = platform self._jobs_list = jobs_list - + # Submitted + Queuing Jobs for specific Platform waiting_jobs = len(jobs_list.get_submitted(platform) + jobs_list.get_queuing(platform)) + # Calculate available space in Platform Queue self._max_wait_jobs_to_submit = platform.max_waiting_jobs - waiting_jobs - self._max_jobs_to_submit = platform.total_jobs - len(jobs_list.get_in_queue(platform)) + # .total_jobs is defined in each section of platforms_.conf, if not from there, it comes form autosubmit_.conf + # .total_jobs Maximum number of jobs at the same time + self._max_jobs_to_submit = platform.total_jobs - len(jobs_list.get_in_queue(platform)) self.max_jobs = min(self._max_wait_jobs_to_submit, self._max_jobs_to_submit) + # These are defined in the [wrapper] section of autosubmit_,conf self.wrapper_type = self._as_config.get_wrapper_type() + # True or False self.remote_dependencies = self._as_config.get_remote_dependencies() self.jobs_in_wrapper = self._as_config.get_wrapper_jobs() @@ -55,33 +67,39 @@ class JobPackager(object): """ Returns the list of the built packages to be submitted - :return: list of packages - :rtype list + :return: List of packages depending on type of package, JobPackageVertical Object for 'vertical-mixed' or 'vertical'. \n + :rtype: List() of JobPackageVertical """ packages_to_submit = list() remote_dependencies_dict = dict() - + # only_wrappers = False when coming from Autosubmit.submit_ready_jobs, jobs_filtered empty if only_generate: jobs_to_submit = jobs_filtered else: jobs_ready = self._jobs_list.get_ready(self._platform) if jobs_ready == 0: + # If there are no jobs ready, result is tuple of empty return packages_to_submit, remote_dependencies_dict if not (self._max_wait_jobs_to_submit > 0 and self._max_jobs_to_submit > 0): + # If there is no more space in platform, result is tuple of empty return packages_to_submit, remote_dependencies_dict + # Sort by 6 first digits of date available_sorted = sorted(jobs_ready, key=lambda k: k.long_name.split('_')[1][:6]) - list_of_available = sorted(available_sorted, key=lambda k: k.priority, reverse=True) + # Sort by Priority, highest first + list_of_available = sorted(available_sorted, key=lambda k: k.priority, reverse=True) num_jobs_to_submit = min(self._max_wait_jobs_to_submit, len(jobs_ready), self._max_jobs_to_submit) + # Take the first num_jobs_to_submit from the list of available jobs_to_submit = list_of_available[0:num_jobs_to_submit] - + # print(len(jobs_to_submit)) jobs_to_submit_by_section = self._divide_list_by_section(jobs_to_submit) - for section in jobs_to_submit_by_section: + # Only if platform allows wrappers, wrapper type has been correctly defined, and job names for wrappers have been correctly defined + # ('None' is a default value) or the correct section is included in the corresponding sections in [wrappers] if self._platform.allow_wrappers and self.wrapper_type in ['horizontal', 'vertical', 'vertical-mixed', 'vertical-horizontal', 'horizontal-vertical'] \ and (self.jobs_in_wrapper == 'None' or section in self.jobs_in_wrapper): - + # Trying to find the value in jobs_parser, if not, default to an autosubmit_.conf value (Looks first in [wrapper] section) max_wrapped_jobs = int(self._as_config.jobs_parser.get_option(section, "MAX_WRAPPED", self._as_config.get_max_wrapped_jobs())) if self.wrapper_type in ['vertical', 'vertical-mixed']: @@ -113,13 +131,15 @@ class JobPackager(object): The value for each key is a list() with all the jobs with the key section. :param jobs_list: list of jobs to be divided - :rtype: dict + :rtype: Dictionary Key: Section Name, Value: List(Job Object) """ + # .jobs_in_wrapper defined in .conf, see constructor. sections_split = self.jobs_in_wrapper.split() jobs_section = dict() for job in jobs_list: - section = next((s for s in sections_split if job.section in s and '&' in s), None) + # This iterator will always return None if there is no '&' defined in the section name + section = next((s for s in sections_split if job.section in s and '&' in s), None) if section is None: section = job.section if section not in jobs_section: @@ -156,9 +176,20 @@ class JobPackager(object): return packages, remote_dependencies_dict def _build_vertical_packages(self, section_list, max_wrapped_jobs): + """ + Builds Vertical-Mixed or Vertical + + :param section_list: Jobs defined as wrappable belonging to a common section.\n + :type section_list: List() of Job Objects. \n + :param max_wrapped_jobs: Number of maximum jobs that can be wrapped (Can be user defined), per section. \n + :type max_wrapped_jobs: Integer. \n + :return: List of Wrapper Packages, Dictionary that details dependencies. \n + :rtype: List() of JobPackageVertical(), Dictionary Key: String, Value: (Dictionary Key: Variable Name, Value: String/Int) + """ packages = [] potential_dependency = None remote_dependencies_dict = dict() + # True when autosubmit_.conf value [wrapper].DEPENDENCIES has been set to true if self.remote_dependencies: remote_dependencies_dict['name_to_id'] = dict() remote_dependencies_dict['dependencies'] = dict() @@ -177,15 +208,18 @@ class JobPackager(object): max_wrapped_jobs, self._platform.max_wallclock) jobs_list = job_vertical_packager.build_vertical_package(job) + # update max_jobs, potential_dependency is None self.max_jobs -= len(jobs_list) if job.status is Status.READY: packages.append(JobPackageVertical(jobs_list)) - else: + else: package = JobPackageVertical(jobs_list, potential_dependency) packages.append(package) + # Possible need of "if self.remote_dependencies here" remote_dependencies_dict['name_to_id'][potential_dependency] = -1 remote_dependencies_dict['dependencies'][package.name] = potential_dependency if self.remote_dependencies: + # Sending last item in list of packaged child = job_vertical_packager.get_wrappable_child(jobs_list[-1]) if child is not None: section_list.insert(section_list.index(job) + 1, child) @@ -251,6 +285,21 @@ class JobPackager(object): class JobPackagerVertical(object): + """ + Vertical Packager Parent Class + + :param jobs_list: Usually there is only 1 job in this list. \n + :type jobs_list: List() of Job Objects \n + :param total_wallclock: Wallclock per object. \n + :type total_wallclock: String \n + :param max_jobs: Maximum number of jobs per platform. \n + :type max_jobs: Integer \n + :param max_wrapped_jobs: Value from jobs_parser, if not found default to an autosubmit_.conf value (Looks first in [wrapper] section). \n + :type max_wrapped_jobs: Integer \n + :param max_wallclock: Value from Platform. \n + :type max_wallclock: Integer + + """ def __init__(self, jobs_list, total_wallclock, max_jobs, max_wrapped_jobs, max_wallclock): self.jobs_list = jobs_list @@ -260,15 +309,31 @@ class JobPackagerVertical(object): self.max_wallclock = max_wallclock def build_vertical_package(self, job): + """ + Goes trough the job and all the related jobs (children, or part of the same date member ordered group), finds those suitable + and groups them together into a wrapper. + + :param job: Job to be wrapped. \n + :type job: Job Object \n + :return: List of jobs that are wrapped together. \n + :rtype: List() of Job Object \n + """ + # self.jobs_list starts as only 1 member, but wrapped jobs are added in the recursion if len(self.jobs_list) >= self.max_jobs or len(self.jobs_list) >= self.max_wrapped_jobs: return self.jobs_list child = self.get_wrappable_child(job) + # If not None, it is wrappable if child is not None: + # Calculate total wallclock per possible wrapper self.total_wallclock = sum_str_hours(self.total_wallclock, child.wallclock) + # Testing against max from platform if self.total_wallclock <= self.max_wallclock: + # Marking, this is later tested in the main loop child.packed = True self.jobs_list.append(child) + # Recursive call return self.build_vertical_package(child) + # Wrapped jobs are accumulated and returned in this list return self.jobs_list def get_wrappable_child(self, job): @@ -279,11 +344,33 @@ class JobPackagerVertical(object): class JobPackagerVerticalSimple(JobPackagerVertical): + """ + Vertical Packager Class. First statement of the constructor builds JobPackagerVertical. + + :param jobs_list: List of jobs, usually only receives one job. \n + :type jobs_list: List() of Job Objects \n + :param total_wallclock: Wallclock from Job. \n + :type total_wallclock: String \n + :param max_jobs: Maximum number of jobs per platform. \n + :type max_jobs: Integer \n + :param max_wrapped_jobs: Value from jobs_parser, if not found default to an autosubmit_.conf value (Looks first in [wrapper] section). \n + :type max_wrapped_jobs: Integer \n + :param max_wallclock: Value from Platform. \n + :type max_wallclock: Integer + """ def __init__(self, jobs_list, total_wallclock, max_jobs, max_wrapped_jobs, max_wallclock): super(JobPackagerVerticalSimple, self).__init__(jobs_list, total_wallclock, max_jobs, max_wrapped_jobs, max_wallclock) def get_wrappable_child(self, job): + """ + Goes through the children jobs of job, tests if wrappable using self._is_wrappable. + + :param job: job to be evaluated. \n + :type job: Job Object \n + :return: job (children) that is wrappable. \n + :rtype: Job Object + """ for child in job.children: if self._is_wrappable(child, job): return child @@ -291,32 +378,73 @@ class JobPackagerVerticalSimple(JobPackagerVertical): return None def _is_wrappable(self, job, parent=None): + """ + Determines if a job (children) is wrappable. Basic condition is that the parent should have the same section as the child. + Also, test that the parents of the job (children) are COMPLETED. + + :param job: Children Job to be tested. \n + :type job: Job Object \n + :param parent: Original Job whose children are tested. \n + :type parent: Job Object \n + :return: True if wrappable, False otherwise. \n + :rtype: Boolean + """ if job.section != parent.section: return False for other_parent in job.parents: + # First part, parents should be COMPLETED. + # Second part, no cycles. if other_parent.status != Status.COMPLETED and other_parent not in self.jobs_list: return False return True class JobPackagerVerticalMixed(JobPackagerVertical): - + """ + Vertical Mixed Class. First statement of the constructor builds JobPackagerVertical. + + :param dict_jobs: Jobs sorted by date, member, RUNNING, and chunk number. Only those relevant to the wrapper. \n + :type dict_jobs: Dictionary Key: date, Value: (Dictionary Key: Member, Value: List of jobs sorted) \n + :param ready_job: Job to be wrapped. \n + :type ready_job: Job Object \n + :param jobs_list: ready_job as a list. \n + :type jobs_list: List() of Job Object \n + :param total_wallclock: wallclock time per job. \n + :type total_wallclock: String \n + :param max_jobs: Maximum number of jobs per platform. \n + :type max_jobs: Integer \n + :param max_wrapped_jobs: Value from jobs_parser, if not found default to an autosubmit_.conf value (Looks first in [wrapper] section). \n + :type max_wrapped_jobs: Integer \n + :param max_wallclock: Value from Platform. \n + :type max_wallclock: String \n + """ def __init__(self, dict_jobs, ready_job, jobs_list, total_wallclock, max_jobs, max_wrapped_jobs, max_wallclock): super(JobPackagerVerticalMixed, self).__init__(jobs_list, total_wallclock, max_jobs, max_wrapped_jobs, max_wallclock) self.ready_job = ready_job self.dict_jobs = dict_jobs - + # Last date from the ordering date = dict_jobs.keys()[-1] + # Last member from the last date from the ordering member = dict_jobs[date].keys()[-1] + # If job to be wrapped has date and member, use those if ready_job.date is not None: date = ready_job.date if ready_job.member is not None: member = ready_job.member - + # Extract list of sorted jobs per date and member self.sorted_jobs = dict_jobs[date][member] self.index = 0 def get_wrappable_child(self, job): + """ + Goes through the jobs with the same date and member than the input job, and return the first that satisfies self._is_wrappable() + + :param job: job to be evaluated. \n + :type job: Job Object \n + :return: job that is wrappable. \n + :rtype: Job Object + """ + # Unnecessary assignment sorted_jobs = self.sorted_jobs for index in range(self.index, len(sorted_jobs)): @@ -328,8 +456,19 @@ class JobPackagerVerticalMixed(JobPackagerVertical): return None def _is_wrappable(self, job): + """ + Determines if a job is wrappable. Basically, the job shouldn't have been packed already and the status must be READY or WAITING, + Its parents should be COMPLETED. + + :param job: job to be evaluated. \n + :type job: Job Object \n + :return: True if wrappable, False otherwise. \n + :rtype: Boolean + """ if job.packed is False and (job.status == Status.READY or job.status == Status.WAITING): for parent in job.parents: + # First part of this conditional is always going to be true because otherwise there would be a cycle + # Second part is actually relevant, parents of a wrapper should be COMPLETED if parent not in self.jobs_list and parent.status != Status.COMPLETED: return False return True diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index cecd1c13081b79edf7f4371ae81758155082986a..4303f7bbc3c9c91ae5710aa1ac21d5c463e516f1 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -76,6 +76,14 @@ class JobPackageBase(object): return self._platform def submit(self, configuration, parameters,only_generate=False): + """ + :para configuration: Autosubmit basic configuration \n + :type configuration: AutosubmitConfig object \n + :param parameters; Parameters from joblist \n + :type parameters: JobList,parameters \n + :param only_generate: True if coming from generate_scripts_andor_wrappers(). If true, only generates scripts; otherwise, submits. \n + :type only_generate: Boolean + """ exit=False for job in self.jobs: if job.check.lower() == Job.CHECK_ON_SUBMISSION: @@ -233,12 +241,16 @@ class JobPackageArray(JobPackageBase): class JobPackageThread(JobPackageBase): """ Class to manage a thread-based package of jobs to be submitted by autosubmit + + :param dependency: Name of potential dependency + :type dependency: String """ FILE_PREFIX = 'ASThread' def __init__(self, jobs, dependency=None, jobs_resources=dict()): super(JobPackageThread, self).__init__(jobs) self._job_scripts = {} + # Seems like this one is not used at all in the class self._job_dependency = dependency self._common_script = None self._wallclock = '00:00' @@ -408,6 +420,10 @@ class JobPackageThreadWrapped(JobPackageThread): class JobPackageVertical(JobPackageThread): """ Class to manage a vertical thread-based package of jobs to be submitted by autosubmit + + :param jobs: + :type jobs: + :param: dependency: """ def __init__(self, jobs, dependency=None): diff --git a/autosubmit/platforms/paramiko_submitter.py b/autosubmit/platforms/paramiko_submitter.py index 404ab9eedbe5217a985bbf97a324bfee0530d35a..46196dbc67f9ec49a36e20d4b9bc76888026eda8 100644 --- a/autosubmit/platforms/paramiko_submitter.py +++ b/autosubmit/platforms/paramiko_submitter.py @@ -56,6 +56,7 @@ class ParamikoSubmitter(Submitter): platforms_used = list() hpcarch = asconf.get_platform() + # Traverse jobs defined in jobs_.conf and add platforms found if not already included job_parser = asconf.jobs_parser for job in job_parser.sections(): hpc = job_parser.get_option(job, 'PLATFORM', hpcarch).lower() @@ -63,8 +64,10 @@ class ParamikoSubmitter(Submitter): platforms_used.append(hpc) parser = asconf.platforms_parser - + # Declare platforms dictionary, key: Platform Name, Value: Platform Object platforms = dict() + + # Build Local Platform Object local_platform = LocalPlatform(asconf.expid, 'local', BasicConfig) local_platform.max_wallclock = asconf.get_max_wallclock() local_platform.max_processors = asconf.get_max_processors() @@ -74,11 +77,15 @@ class ParamikoSubmitter(Submitter): local_platform.temp_dir = os.path.join(BasicConfig.LOCAL_ROOT_DIR, 'ASlogs') local_platform.root_dir = os.path.join(BasicConfig.LOCAL_ROOT_DIR, local_platform.expid) local_platform.host = 'localhost' + # Add object to entry in dictionary platforms['local'] = local_platform platforms['LOCAL'] = local_platform + # parser is the platforms parser that represents platforms_.conf + # Traverse sections [] for section in parser.sections(): - + + # Consider only those included in the list of jobs if section.lower() not in platforms_used: continue @@ -103,10 +110,11 @@ class ParamikoSubmitter(Submitter): except ParamikoPlatformException as e: Log.error("Queue exception: {0}".format(e.message)) return None - + # Set the type and version of the platform found remote_platform.type = platform_type remote_platform._version = platform_version + # Concatenating host + project and adding to the object if parser.get_option(section, 'ADD_PROJECT_TO_HOST', '').lower() == 'true': host = '{0}-{1}'.format(parser.get_option(section, 'HOST', None), parser.get_option(section, 'PROJECT', None)) @@ -114,6 +122,7 @@ class ParamikoSubmitter(Submitter): host = parser.get_option(section, 'HOST', None) remote_platform.host = host + # Retrieve more configurations settings and save them in the object remote_platform.max_wallclock = parser.get_option(section, 'MAX_WALLCLOCK', asconf.get_max_wallclock()) remote_platform.max_processors = parser.get_option(section, 'MAX_PROCESSORS', @@ -142,10 +151,13 @@ class ParamikoSubmitter(Submitter): None) remote_platform.root_dir = os.path.join(remote_platform.scratch, remote_platform.project, remote_platform.user, remote_platform.expid) + # Executes update_cmds() from corresponding Platform Object remote_platform.update_cmds() + # Save platform into result dictionary platforms[section.lower()] = remote_platform for section in parser.sections(): + # if this section is included in platforms if parser.has_option(section, 'SERIAL_PLATFORM'): platforms[section.lower()].serial_platform = platforms[parser.get_option(section, 'SERIAL_PLATFORM',