diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 119ad03aa40129655edc6c4e6b5c2b6b418bbee4..0218a8735c3a49f0fac2c1212134e18d7ea87eb7 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -66,6 +66,8 @@ from bscearth.utils.config_parser import ConfigParserFactory from job.job_common import Status from git.autosubmit_git import AutosubmitGit from job.job_list import JobList +from job.job_packages import JobPackageThread +from job.job_package_persistence import JobPackagePersistence from job.job_list_persistence import JobListPersistenceDb from job.job_list_persistence import JobListPersistencePkl # noinspection PyPackageRequirements @@ -643,6 +645,9 @@ class Autosubmit: job_list.check_scripts(as_conf) + packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), + "job_packages_" + expid) + ######################### # AUTOSUBMIT - MAIN LOOP ######################### @@ -691,7 +696,7 @@ class Autosubmit: if Autosubmit.exit: return 2 - if Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test): + if Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence): job_list.save() if Autosubmit.exit: return 2 @@ -712,7 +717,7 @@ class Autosubmit: return False @staticmethod - def submit_ready_jobs(as_conf, job_list, platforms_to_test): + def submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence): """ Gets READY jobs and send them to the platforms if there is available space on the queues @@ -726,10 +731,22 @@ class Autosubmit: save = False for platform in platforms_to_test: Log.debug("\nJobs ready for {1}: {0}", len(job_list.get_ready(platform)), platform.name) - packages_to_submit = JobPackager(as_conf, platform, job_list).build_packages() + packages_to_submit, remote_dependencies_dict = JobPackager(as_conf, platform, job_list).build_packages() for package in packages_to_submit: try: + if remote_dependencies_dict and package.name in remote_dependencies_dict['dependencies']: + remote_dependency = remote_dependencies_dict['dependencies'][package.name] + remote_dependency_id = remote_dependencies_dict['name_to_id'][remote_dependency] + package.set_job_dependency(remote_dependency_id) + package.submit(as_conf, job_list.parameters) + + if remote_dependencies_dict and package.name in remote_dependencies_dict['name_to_id']: + remote_dependencies_dict['name_to_id'][package.name] = package.jobs[0].id + + if isinstance(package, JobPackageThread): + packages_persistence.save(package.name, package.jobs, package._expid) + save = True except WrongTemplateException as e: Log.error("Invalid parameter substitution in {0} template", e.job_name) @@ -846,11 +863,17 @@ class Autosubmit: job.children = job.children - referenced_jobs_to_remove job.parents = job.parents - referenced_jobs_to_remove + packages = None + if as_conf.get_wrapper_type() != 'none': + packages = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), + "job_packages_" + expid).load() + monitor_exp = Monitor() + if txt_only: monitor_exp.generate_output_txt(expid, jobs, exp_path+"/tmp/LOG_"+expid) else: - monitor_exp.generate_output(expid, jobs, os.path.join(exp_path, "/tmp/LOG_", expid), file_format, not hide) + monitor_exp.generate_output(expid, jobs, os.path.join(exp_path, "/tmp/LOG_", expid), file_format, packages, not hide) return True @@ -1062,10 +1085,13 @@ class Autosubmit: Log.result("Recovery finalized") + packages = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), + "job_packages_" + expid).load() + if not noplot: Log.info("\nPlotting the jobs list...") monitor_exp = Monitor() - monitor_exp.generate_output(expid, job_list.get_job_list(), os.path.join(exp_path, "/tmp/LOG_", expid), show=not hide) + monitor_exp.generate_output(expid, job_list.get_job_list(), os.path.join(exp_path, "/tmp/LOG_", expid), packages=packages, show=not hide) return True @@ -1791,7 +1817,8 @@ class Autosubmit: date_format = 'M' job_list.generate(date_list, member_list, num_chunks, chunk_ini, parameters, date_format, as_conf.get_retrials(), - as_conf.get_default_job_type()) + as_conf.get_default_job_type(), + as_conf.get_wrapper_expression()) if rerun == "true": chunk_list = Autosubmit._create_json(as_conf.get_chunk_list()) job_list.rerun(chunk_list) @@ -1800,10 +1827,13 @@ class Autosubmit: Log.info("\nSaving the jobs list...") job_list.save() + + JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), + "job_packages_" + expid).reset_table() if not noplot: Log.info("\nPlotting the jobs list...") monitor_exp = Monitor() - monitor_exp.generate_output(expid, job_list.get_job_list(), os.path.join(exp_path, "/tmp/LOG_", expid), output, not hide) + monitor_exp.generate_output(expid, job_list.get_job_list(), os.path.join(exp_path, "/tmp/LOG_", expid), output, None, not hide) Log.result("\nJob list created successfully") Log.user_warning("Remember to MODIFY the MODEL config files!") @@ -2017,10 +2047,13 @@ class Autosubmit: job_list.update_list(as_conf) Log.warning("Changes NOT saved to the JobList!!!!: use -s option to save") + packages = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), + "job_packages_" + expid).load() + if not noplot: Log.info("\nPloting joblist...") monitor_exp = Monitor() - monitor_exp.generate_output(expid, job_list.get_job_list(), os.path.join(exp_path, "/tmp/LOG_", expid), show=not hide) + monitor_exp.generate_output(expid, job_list.get_job_list(), os.path.join(exp_path, "/tmp/LOG_", expid), packages=packages, show=not hide) return True @@ -2356,7 +2389,7 @@ class Autosubmit: date_format = 'M' job_list.generate(date_list, as_conf.get_member_list(), as_conf.get_num_chunks(), as_conf.get_chunk_ini(), as_conf.load_parameters(), date_format, as_conf.get_retrials(), - as_conf.get_default_job_type(), False) + as_conf.get_default_job_type(), as_conf.get_wrapper_expression(), False) return job_list @staticmethod diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 83769e5e1586df4e290fe83ea75285442977b72f..4c0381081c5a6da0655b9af4fa37f4380afb206d 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -266,9 +266,10 @@ class AutosubmitConfig(object): """ Log.info('\nChecking configuration files...') self.reload() - result = self.check_autosubmit_conf() - result = result and self.check_platforms_conf() + #result = self.check_autosubmit_conf() + result = self.check_platforms_conf() result = result and self.check_jobs_conf() + result = result and self.check_autosubmit_conf() result = result and self.check_expdef_conf() if result: Log.result("Configuration files OK\n") @@ -294,6 +295,7 @@ class AutosubmitConfig(object): result = result and self._conf_parser.check_is_boolean('mail', 'NOTIFICATIONS', False) result = result and self.is_valid_communications_library() result = result and self.is_valid_storage_type() + result = result and self.is_valid_wrapper_expression() if self.get_notifications() == 'true': for mail in self.get_mails_to(): @@ -828,7 +830,7 @@ class AutosubmitConfig(object): def get_max_wallclock(self): """ - Returns max wallclock from autosubmit's config file + Returns max wallclock :rtype: str """ @@ -918,6 +920,33 @@ class AutosubmitConfig(object): """ return self._conf_parser.get_option('wrapper', 'TYPE', 'None').lower() + def get_wrapper_expression(self): + """ + Returns the wrapper expression the user has configured in the autosubmit's config + + :return: expression (or none) + :rtype: string + """ + return self._conf_parser.get_option('wrapper', 'EXPRESSION', 'None') + + def get_max_wrapped_jobs(self): + """ + Returns the maximum number of jobs that can be wrapped together as configured in autosubmit's config file + + :return: maximum number of jobs (or total jobs) + :rtype: string + """ + return int(self._conf_parser.get_option('wrapper', 'MAXWRAPPEDJOBS', self.get_total_jobs())) + + def get_jobs_sections(self): + """ + Returns the list of sections defined in the jobs config file + + :return: sections + :rtype: list + """ + return self._jobs_parser.sections() + def get_copy_remote_logs(self): """ Returns if the user has enabled the logs local copy from autosubmit's config file @@ -969,6 +998,16 @@ class AutosubmitConfig(object): storage_type = self.get_storage_type() return storage_type in ['pkl', 'db'] + def is_valid_wrapper_expression(self): + expression = self.get_wrapper_expression() + if expression != 'None': + parser = self._jobs_parser + sections = parser.sections() + for section in expression.split(" "): + if section not in sections: + return False + return True + def is_valid_git_repository(self): origin_exists = self._exp_parser.check_exists('git', 'PROJECT_ORIGIN') branch = self.get_git_project_branch() diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index c68e989dadd88c600fe72cc42f8e4dbb7d398c40..c81a083aac8f93cabc97e13a858340a95356e325 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -91,6 +91,8 @@ class Job(object): self.write_start = False self._platform = None self.check = 'True' + self.packed = False + def __getstate__(self): odict = self.__dict__ diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index c139fdb8c7ebaf6bdc90b1906a377fe6f14b1521..74af5a1391743939c2751f63b758173d4a2d7d34 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -66,6 +66,9 @@ class JobList: self._persistence = job_list_persistence self._graph = DiGraph() + self._packages_dict = dict() + self._ordered_jobs_by_date_member = dict() + @property def expid(self): """ @@ -91,7 +94,7 @@ class JobList: self._graph = value def generate(self, date_list, member_list, num_chunks, chunk_ini, parameters, date_format, default_retrials, - default_job_type, new=True): + default_job_type, wrapper_expression, new=True): """ Creates all jobs needed for the current workflow @@ -141,6 +144,10 @@ class JobList: for job in self._job_list: job.parameters = parameters + if wrapper_expression != 'None': + self._ordered_jobs_by_date_member = self._create_sorted_dict_jobs(wrapper_expression) + + @staticmethod def _add_dependencies(date_list, member_list, chunk_list, dic_jobs, jobs_parser, graph, option="DEPENDENCIES"): for job_section in jobs_parser.sections(): @@ -288,9 +295,136 @@ class JobList: dic_jobs.read_section(section, priority, default_job_type, jobs_data) priority += 1 + def _create_sorted_dict_jobs(self, wrapper_expression): + dict_jobs = dict() + for date in self._date_list: + dict_jobs[date] = dict() + for member in self._member_list: + dict_jobs[date][member] = list() + num_chunks = len(self._chunk_list) + + filtered_jobs_list = filter(lambda job: job.section in wrapper_expression, self._job_list) + + filtered_jobs_fake_date_member, fake_original_job_map = self._create_fake_dates_members(filtered_jobs_list) + + sections_running_type_map = dict() + for section in wrapper_expression.split(" "): + sections_running_type_map[section] = self._dic_jobs.get_option(section, "RUNNING", '') + + for date in self._date_list: + str_date = self._get_date(date) + for member in self._member_list: + sorted_jobs_list = filter(lambda job: job.name.split("_")[1] == str_date and + job.name.split("_")[2] == member, filtered_jobs_fake_date_member) + + previous_job = sorted_jobs_list[0] + section_running_type = sections_running_type_map[previous_job.section] + + jobs_to_sort = [previous_job] + previous_section_running_type = None + + for index in range(1, len(sorted_jobs_list) + 1): + if index < len(sorted_jobs_list): + job = sorted_jobs_list[index] + + if previous_job.section != job.section: + previous_section_running_type = section_running_type + section_running_type = sections_running_type_map[job.section] + + if (previous_section_running_type != None and previous_section_running_type != section_running_type) \ + or index == len(sorted_jobs_list): + + jobs_to_sort = sorted(jobs_to_sort, key=lambda k: (k.name.split('_')[1], (k.name.split('_')[2]), + (int(k.name.split('_')[3]) + if len(k.name.split('_')) == 5 else num_chunks + 1))) + + for idx in range(0, len(jobs_to_sort)): + if jobs_to_sort[idx] in fake_original_job_map: + fake_job = jobs_to_sort[idx] + jobs_to_sort[idx] = fake_original_job_map[fake_job] + + dict_jobs[date][member] += jobs_to_sort + jobs_to_sort = [] + + jobs_to_sort.append(job) + previous_job = job + + return dict_jobs + + def _create_fake_dates_members(self, filtered_jobs_list): + filtered_jobs_fake_date_member = [] + fake_original_job_map = dict() + + import copy + for job in filtered_jobs_list: + fake_job = None + # running once and synchronize date + if job.date is None and job.member is None: + date = self._date_list[-1] + member = self._member_list[-1] + + fake_job = copy.deepcopy(job) + fake_job.name = fake_job.name.split('_', 1)[0] + "_" + self._get_date(date) + "_" \ + + member + "_" + fake_job.name.split("_", 1)[1] + filtered_jobs_fake_date_member.append(fake_job) + fake_original_job_map[fake_job] = job + # running date or synchronize member + elif job.member is None: + member = self._member_list[-1] + fake_job = copy.deepcopy(job) + fake_job.name = fake_job.name.split('_', 2)[0] + "_" + fake_job.name.split('_', 2)[ + 1] + "_" + member + "_" + fake_job.name.split("_", 2)[2] + filtered_jobs_fake_date_member.append(fake_job) + fake_original_job_map[fake_job] = job + + if fake_job is None: + filtered_jobs_fake_date_member.append(job) + + return filtered_jobs_fake_date_member, fake_original_job_map + + def _get_date(self, date): + date_format = '' + if self.parameters.get('CHUNKSIZEUNIT') is 'hour': + date_format = 'H' + for date in self._date_list: + if date.hour > 1: + date_format = 'H' + if date.minute > 1: + date_format = 'M' + str_date = date2str(date, date_format) + return str_date + def __len__(self): return self._job_list.__len__() + def get_date_list(self): + """ + Get inner date list + + :return: date list + :rtype: list + """ + return self._date_list + + def get_member_list(self): + + """ + Get inner member list + + :return: member list + :rtype: list + """ + return self._member_list + + def get_chunk_list(self): + """ + Get inner chunk list + + :return: chunk list + :rtype: list + """ + return self._chunk_list + def get_job_list(self): """ Get inner job list @@ -300,6 +434,15 @@ class JobList: """ return self._job_list + def get_ordered_jobs_by_date_member(self): + """ + Get the dictionary of jobs ordered according to wrapper's expression divided by date and member + + :return: jobs ordered divided by date and member + :rtype: dict + """ + return self._ordered_jobs_by_date_member + def get_completed(self, platform=None): """ Returns a list of completed jobs diff --git a/autosubmit/job/job_package_persistence.py b/autosubmit/job/job_package_persistence.py new file mode 100644 index 0000000000000000000000000000000000000000..28edbcfd9cd19dce389f5a5a2913726727d99f8c --- /dev/null +++ b/autosubmit/job/job_package_persistence.py @@ -0,0 +1,66 @@ + + +# Copyright 2017 Earth Sciences Department, BSC-CNS + +# This file is part of Autosubmit. + +# Autosubmit is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# Autosubmit is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with Autosubmit. If not, see . + +import os + +from bscearth.utils.log import Log +from autosubmit.database.db_manager import DbManager + + +class JobPackagePersistence(object): + + VERSION = 1 + JOB_PACKAGES_TABLE = 'job_package' + TABLE_FIELDS = ['exp_id', 'package_name', 'job_name'] + + def __init__(self, persistence_path, persistence_file): + self.db_manager = DbManager(persistence_path, persistence_file, self.VERSION) + self.db_manager.create_table(self.JOB_PACKAGES_TABLE, self.TABLE_FIELDS) + + def load(self): + """ + Loads package of jobs from a database + :param persistence_file: str + :param persistence_path: str + + """ + return self.db_manager.select_all(self.JOB_PACKAGES_TABLE) + + def save(self, package_name, jobs, exp_id): + """ + Persists a job list in a database + :param packages_dict: dictionary of jobs per package + :param persistence_file: str + :param persistence_path: str + + """ + #self._reset_table() + job_packages_data = [] + for job in jobs: + job_packages_data += [(exp_id, package_name, job.name)] + + self.db_manager.insertMany(self.JOB_PACKAGES_TABLE, job_packages_data) + + def reset_table(self): + """ + Drops and recreates the database + + """ + self.db_manager.drop_table(self.JOB_PACKAGES_TABLE) + self.db_manager.create_table(self.JOB_PACKAGES_TABLE, self.TABLE_FIELDS) \ No newline at end of file diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 726f509ed38ee8c96da2dff584846182ab7d468a..d348094021e0646ef66f6d52a7830f57068bb7bd 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -51,6 +51,8 @@ class JobPackager(object): :rtype list """ packages_to_submit = list() + remote_dependencies_dict = dict() + jobs_ready = self._jobs_list.get_ready(self._platform) if jobs_ready == 0: return packages_to_submit @@ -61,53 +63,59 @@ class JobPackager(object): list_of_available = sorted(available_sorted, key=lambda k: k.priority, reverse=True) num_jobs_to_submit = min(self._max_wait_jobs_to_submit, len(jobs_ready), self._max_jobs_to_submit) jobs_to_submit = list_of_available[0:num_jobs_to_submit] + jobs_to_submit_by_section = JobPackager._divide_list_by_section(jobs_to_submit) - # If wrapper allowed / well-configured - wrapper_type = self._as_config.get_wrapper_type() - if self._platform.allow_wrappers and wrapper_type in ['horizontal', 'vertical']: - remote_dependencies = self._as_config.get_remote_dependencies() - max_jobs = min(self._max_wait_jobs_to_submit, self._max_jobs_to_submit) - if wrapper_type == 'vertical': - for section_list in jobs_to_submit_by_section.values(): - built_packages, max_jobs = JobPackager._build_vertical_packages(section_list, - max_jobs, - self._platform.max_wallclock, - remote_dependencies) + for section in jobs_to_submit_by_section: + wrapper_expression = self._as_config.get_wrapper_expression() + wrapper_type = self._as_config.get_wrapper_type() + if self._platform.allow_wrappers and wrapper_type in ['horizontal', 'vertical'] and \ + (wrapper_expression == 'None' or section in wrapper_expression.split(' ')): + + remote_dependencies = self._as_config.get_remote_dependencies() + max_jobs = min(self._max_wait_jobs_to_submit, self._max_jobs_to_submit) + max_wrapped_jobs = int(self._as_config.jobs_parser.get_option(section, "MAX_WRAPPED", self._as_config.get_max_wrapped_jobs())) + + if wrapper_type == 'vertical': + built_packages, max_jobs, remote_dependencies_dict = JobPackager._build_vertical_packages( + self._jobs_list.get_ordered_jobs_by_date_member(), + wrapper_expression, + jobs_to_submit_by_section[section], + max_jobs, self._platform.max_wallclock, + max_wrapped_jobs, + remote_dependencies) packages_to_submit += built_packages - return packages_to_submit - elif wrapper_type == 'horizontal': - for section_list in jobs_to_submit_by_section.values(): - built_packages, max_jobs = JobPackager._build_horizontal_packages(section_list, - max_jobs, - self._platform.max_processors, - remote_dependencies) + elif wrapper_type == 'horizontal': + built_packages, max_jobs = JobPackager._build_horizontal_packages(jobs_to_submit_by_section[section], + max_jobs, self._platform.max_processors, + remote_dependencies) packages_to_submit += built_packages - return packages_to_submit - # No wrapper allowed / well-configured - for job in jobs_to_submit: - if job.type == Type.PYTHON and not self._platform.allow_python_jobs: - package = JobPackageSimpleWrapped([job]) else: - package = JobPackageSimple([job]) - packages_to_submit.append(package) - return packages_to_submit + # No wrapper allowed / well-configured + for job in jobs_to_submit_by_section[section]: + if job.type == Type.PYTHON and not self._platform.allow_python_jobs: + package = JobPackageSimpleWrapped([job]) + else: + package = JobPackageSimple([job]) + packages_to_submit.append(package) + + return packages_to_submit, remote_dependencies_dict @staticmethod def _divide_list_by_section(jobs_list): """ - Returns a dict() with as many keys as 'jobs_list' different sections. + Returns a dict() with as many keys as 'jobs_list' different sections The value for each key is a list() with all the jobs with the key section. :param jobs_list: list of jobs to be divided :rtype: dict """ - by_section = dict() + jobs_section = dict() for job in jobs_list: - if job.section not in by_section: - by_section[job.section] = list() - by_section[job.section].append(job) - return by_section + if job.section not in jobs_section: + jobs_section[job.section] = list() + jobs_section[job.section].append(job) + return jobs_section @staticmethod def _build_horizontal_packages(section_list, max_jobs, max_processors, remote_dependencies=False): @@ -132,50 +140,130 @@ class JobPackager(object): return packages, max_jobs @staticmethod - def _build_vertical_packages(section_list, max_jobs, max_wallclock, remote_dependencies=False): + def _build_vertical_packages(dict_jobs, wrapper_expression, section_list, max_jobs, max_wallclock, max_wrapped_jobs, + remote_dependencies=False): packages = [] potential_dependency = None + remote_dependencies_dict = dict() + if remote_dependencies: + remote_dependencies_dict['name_to_id'] = dict() + remote_dependencies_dict['dependencies'] = dict() + for job in section_list: if max_jobs > 0: - jobs_list = JobPackager._build_vertical_package(job, [job], job.wallclock, max_jobs, max_wallclock) - max_jobs -= len(jobs_list) - if job.status is Status.READY: - packages.append(JobPackageVertical(jobs_list)) - else: - packages.append(JobPackageVertical(jobs_list, potential_dependency)) - if remote_dependencies: - child = JobPackager._get_wrappable_child(jobs_list[-1], JobPackager._is_wrappable) - if child is not None: - section_list.insert(section_list.index(job) + 1, child) - potential_dependency = packages[-1].name + if job.packed == False: + Log.info("---------------PACKAGE-------------") + Log.info("Added " + job.name) + job.packed = True + + if job.section in wrapper_expression: + job_vertical_packager = JobPackagerVerticalMixed(dict_jobs, job, [job], job.wallclock, max_jobs, + max_wrapped_jobs, max_wallclock) + else: + job_vertical_packager = JobPackagerVerticalSimple([job], job.wallclock, max_jobs, + max_wrapped_jobs, max_wallclock) + + jobs_list = job_vertical_packager.build_vertical_package(job) + max_jobs -= len(jobs_list) + if job.status is Status.READY: + packages.append(JobPackageVertical(jobs_list)) + else: + package = JobPackageVertical(jobs_list, potential_dependency) + packages.append(package) + remote_dependencies_dict['name_to_id'][potential_dependency] = -1 + remote_dependencies_dict['dependencies'][package.name] = potential_dependency + Log.info("---------------END PACKAGE-------------\n") + if remote_dependencies: + child = job_vertical_packager.get_wrappable_child(jobs_list[-1]) + if child is not None: + section_list.insert(section_list.index(job) + 1, child) + potential_dependency = packages[-1].name else: break - return packages, max_jobs + return packages, max_jobs, remote_dependencies_dict - @staticmethod - def _build_vertical_package(job, jobs_list, total_wallclock, max_jobs, max_wallclock): - if len(jobs_list) >= max_jobs: - return jobs_list - child = JobPackager._get_wrappable_child(job, JobPackager._is_wrappable) +class JobPackagerVertical(object): + + def __init__(self, jobs_list, total_wallclock, max_jobs, max_wrapped_jobs, max_wallclock): + self.jobs_list = jobs_list + self.total_wallclock = total_wallclock + self.max_jobs = max_jobs + self.max_wrapped_jobs = max_wrapped_jobs + self.max_wallclock = max_wallclock + + def build_vertical_package(self, job): + if len(self.jobs_list) >= self.max_jobs or len(self.jobs_list) >= self.max_wrapped_jobs: + return self.jobs_list + child = self.get_wrappable_child(job) if child is not None: - total_wallclock = sum_str_hours(total_wallclock, child.wallclock) - if total_wallclock <= max_wallclock: - jobs_list.append(child) - return JobPackager._build_vertical_package(child, jobs_list, total_wallclock, max_jobs, max_wallclock) - return jobs_list + self.total_wallclock = sum_str_hours(self.total_wallclock, child.wallclock) + if self.total_wallclock <= self.max_wallclock: + Log.info("Added " + child.name) + child.packed = True + self.jobs_list.append(child) + return self.build_vertical_package(child) + Log.info("total wallclock "+self.total_wallclock) + return self.jobs_list - @staticmethod - def _get_wrappable_child(job, check_function): + def get_wrappable_child(self, job): + pass + + def _is_wrappable(self, job): + pass + +class JobPackagerVerticalSimple(JobPackagerVertical): + + def __init__(self, jobs_list, total_wallclock, max_jobs, max_wrapped_jobs, max_wallclock): + super(JobPackagerVerticalSimple, self).__init__(jobs_list, total_wallclock, max_jobs, max_wrapped_jobs, max_wallclock) + + def get_wrappable_child(self, job): for child in job.children: - if check_function(job, child): + if self._is_wrappable(child, job): return child continue return None - @staticmethod - def _is_wrappable(parent, child): - if child.section != parent.section: - return False - if len(child.parents) > 1: + def _is_wrappable(self, job, parent=None): + if job.section != parent.section: return False + for other_parent in job.parents: + if other_parent.status != Status.COMPLETED and other_parent not in self.jobs_list: + return False return True + +class JobPackagerVerticalMixed(JobPackagerVertical): + + def __init__(self, dict_jobs, ready_job, jobs_list, total_wallclock, max_jobs, max_wrapped_jobs, max_wallclock): + super(JobPackagerVerticalMixed, self).__init__(jobs_list, total_wallclock, max_jobs, max_wrapped_jobs, max_wallclock) + self.ready_job = ready_job + self.dict_jobs = dict_jobs + + date = dict_jobs.keys()[-1] + member = dict_jobs[date].keys()[-1] + if ready_job.date is not None: + date = ready_job.date + if ready_job.member is not None: + member = ready_job.member + + self.sorted_jobs = dict_jobs[date][member] + self.index = 0 + + def get_wrappable_child(self, job): + sorted_jobs = self.sorted_jobs + + for index in range(self.index, len(sorted_jobs)): + child = sorted_jobs[index] + if self._is_wrappable(child): + self.index = index+1 + return child + continue + return None + + def _is_wrappable(self, job): + if job.packed == False and (job.status == Status.READY or job.status == Status.WAITING): + for parent in job.parents: + if parent not in self.jobs_list and parent.status != Status.COMPLETED: + return False + return True + return False + diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index bb19285f56c01a671b9f32da19641d46cd4e310d..fb7ee606aa4aad490f184547e7e06810419fa80f 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -255,6 +255,9 @@ class JobPackageThread(JobPackageBase): def _project(self): return self._platform.project + def set_job_dependency(self, dependency): + self._job_dependency = dependency + def _create_scripts(self, configuration): for i in range(1, len(self.jobs) + 1): self._job_scripts[self.jobs[i - 1].name] = self.jobs[i - 1].create_script(configuration) @@ -374,6 +377,7 @@ class JobPackageVertical(JobPackageThread): def __init__(self, jobs, dependency=None): super(JobPackageVertical, self).__init__(jobs, dependency) + #TODO unit or regression test of the wrappers, it will fail as in issue 280 for job in jobs: if job.processors > self._num_processors: self._num_processors = job.processors diff --git a/autosubmit/monitor/monitor.py b/autosubmit/monitor/monitor.py index 048690c66cdb0c8356fcff6adce2250666180fc8..b0f4e4f0b4dd6d4b1c12b149957ea468eddbd041 100644 --- a/autosubmit/monitor/monitor.py +++ b/autosubmit/monitor/monitor.py @@ -77,7 +77,7 @@ class Monitor: else: return Monitor._table[Status.UNKNOWN] - def create_tree_list(self, expid, joblist): + def create_tree_list(self, expid, joblist, packages): """ Create graph from joblist @@ -116,6 +116,13 @@ class Monitor: exp = pydotplus.Subgraph(graph_name='Experiment', label=expid) self.nodes_ploted = set() Log.debug('Creating job graph...') + + jobs_packages_dict = dict() + if packages != None and packages: + for (exp_id, package_name, job_name) in packages: + jobs_packages_dict[job_name] = package_name + packages_subgraphs_dict = dict() + for job in joblist: if job.has_parents(): continue @@ -126,6 +133,20 @@ class Monitor: self._add_children(job, exp, node_job) graph.add_subgraph(exp) + + for node in exp.get_nodes(): + name = node.obj_dict['name'] + if name in jobs_packages_dict: + package = jobs_packages_dict[name] + if package not in packages_subgraphs_dict: + packages_subgraphs_dict[package] = pydotplus.graphviz.Cluster(graph_name=package) + packages_subgraphs_dict[package].obj_dict['attributes']['color'] = 'black' + packages_subgraphs_dict[package].obj_dict['attributes']['style'] = 'dashed' + packages_subgraphs_dict[package].add_node(node) + + for package, cluster in packages_subgraphs_dict.items(): + graph.add_subgraph(cluster) + Log.debug('Graph definition finalized') return graph @@ -148,7 +169,7 @@ class Monitor: if flag: self._add_children(child, exp, node_child) - def generate_output(self, expid, joblist, path, output_format="pdf", show=False): + def generate_output(self, expid, joblist, path, output_format="pdf", packages=None, show=False): """ Plots graph for joblist and stores it in a file @@ -167,7 +188,7 @@ class Monitor: output_file = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "plot", expid + "_" + output_date + "." + output_format) - graph = self.create_tree_list(expid, joblist) + graph = self.create_tree_list(expid, joblist, packages) Log.debug("Saving workflow plot at '{0}'", output_file) if output_format == "png": diff --git a/autosubmit/platforms/wrappers/lsf_wrapper.py b/autosubmit/platforms/wrappers/lsf_wrapper.py index ffc1dc7ad7b3f5384ec26de2b752f669a48dc1b7..3f2615bb6759361c87112728f950f7e61fd28ff3 100644 --- a/autosubmit/platforms/wrappers/lsf_wrapper.py +++ b/autosubmit/platforms/wrappers/lsf_wrapper.py @@ -71,6 +71,7 @@ class LsfWrapper(object): import sys from threading import Thread from commands import getstatusoutput + from datetime import datetime class JobThread(Thread): def __init__ (self, template, id_run): @@ -93,7 +94,7 @@ class LsfWrapper(object): completed_filename = scripts[i].replace('.cmd', '_COMPLETED') completed_path = os.path.join(os.getcwd(), completed_filename) if os.path.exists(completed_path): - print "The job ", current.template," has been COMPLETED" + print datetime.now(), " - The job ", current.template," has been COMPLETED" else: print "The job ", current.template," has FAILED" os._exit(1) @@ -168,4 +169,4 @@ class LsfWrapper(object): @classmethod def dependency_directive(cls, dependency): - return '#' if dependency is None else '#BSUB -w \'done("{0}")\' [-ti]'.format(dependency) + return '#' if dependency is None else '#BSUB -w \'done({0})\' [-ti]'.format(dependency) diff --git a/autosubmit/platforms/wrappers/slurm_wrapper.py b/autosubmit/platforms/wrappers/slurm_wrapper.py index c37f000fb41b0abdb34b7df84ed7aa5efff3fde1..5eedfff66be8c89189f27b882311719c0274f92e 100644 --- a/autosubmit/platforms/wrappers/slurm_wrapper.py +++ b/autosubmit/platforms/wrappers/slurm_wrapper.py @@ -48,6 +48,7 @@ class SlurmWrapper(object): import sys from threading import Thread from commands import getstatusoutput + from datetime import datetime class JobThread(Thread): def __init__ (self, template, id_run): @@ -70,9 +71,9 @@ class SlurmWrapper(object): completed_filename = scripts[i].replace('.cmd', '_COMPLETED') completed_path = os.path.join(os.getcwd(), completed_filename) if os.path.exists(completed_path): - print "The job ", current.template," has been COMPLETED" + print datetime.now(), "The job ", current.template," has been COMPLETED" else: - print "The job ", current.template," has FAILED" + print datetime.now(), "The job ", current.template," has FAILED" os._exit(1) """.format(filename, cls.queue_directive(queue), project, wallclock, num_procs, str(job_scripts), cls.dependency_directive(dependency), diff --git a/test/unit/test_job_list.py b/test/unit/test_job_list.py index dde5834ee29f5e820249db629e65fabc46a326ae..99426196683acb748b381b54ed399d4d74b48b86 100644 --- a/test/unit/test_job_list.py +++ b/test/unit/test_job_list.py @@ -218,7 +218,7 @@ class TestJobList(TestCase): graph_mock = Mock() job_list.graph = graph_mock # act - job_list.generate(date_list, member_list, num_chunks, 1, parameters, 'H', 9999, Type.BASH) + job_list.generate(date_list, member_list, num_chunks, 1, parameters, 'H', 9999, Type.BASH, 'None') # assert self.assertEquals(job_list.parameters, parameters) diff --git a/test/unit/test_wrappers.py b/test/unit/test_wrappers.py new file mode 100644 index 0000000000000000000000000000000000000000..dd20910f26ad2bb81b2f425e1bd7595b491c3330 --- /dev/null +++ b/test/unit/test_wrappers.py @@ -0,0 +1,1169 @@ +from unittest import TestCase +from mock import Mock +from autosubmit.job.job_packager import JobPackager +from autosubmit.job.job_packages import JobPackageVertical +from autosubmit.job.job import Job +from autosubmit.job.job_list import JobList +from autosubmit.job.job_dict import DicJobs +from autosubmit.job.job_utils import Dependency +from bscearth.utils.config_parser import ConfigParserFactory +from autosubmit.job.job_list_persistence import JobListPersistenceDb +from autosubmit.job.job_common import Status +from random import randrange +from collections import OrderedDict + +class TestWrappers(TestCase): + + @classmethod + def setUpClass(cls): + #set up different workflows to be used in the test methods + cls.workflows = dict() + cls.workflows['basic'] = dict() + cls.workflows['synchronize_date'] = dict() + cls.workflows['synchronize_member'] = dict() + cls.workflows['running_member'] = dict() + cls.workflows['running_date'] = dict() + cls.workflows['running_once'] = dict() + + cls.workflows['basic']['sections'] = OrderedDict() + cls.workflows['basic']['sections']["s1"] = dict() + cls.workflows['basic']['sections']["s1"]["RUNNING"] = "member" + cls.workflows['basic']['sections']["s1"]["WALLCLOCK"] = '00:50' + + cls.workflows['basic']['sections']["s2"] = dict() + cls.workflows['basic']['sections']["s2"]["RUNNING"] = "chunk" + cls.workflows['basic']['sections']["s2"]["WALLCLOCK"] = '00:10' + cls.workflows['basic']['sections']["s2"]["DEPENDENCIES"] = "s1 s2-1" + + cls.workflows['basic']['sections']["s3"] = dict() + cls.workflows['basic']['sections']["s3"]["RUNNING"] = "chunk" + cls.workflows['basic']['sections']["s3"]["WALLCLOCK"] = '00:20' + cls.workflows['basic']['sections']["s3"]["DEPENDENCIES"] = "s2" + + cls.workflows['basic']['sections']["s4"] = dict() + cls.workflows['basic']['sections']["s4"]["RUNNING"] = "chunk" + cls.workflows['basic']['sections']["s4"]["WALLCLOCK"] = '00:30' + cls.workflows['basic']['sections']["s4"]["DEPENDENCIES"] = "s3" + + cls.workflows['synchronize_date']['sections'] = OrderedDict() + cls.workflows['synchronize_date']['sections']["s1"] = dict() + cls.workflows['synchronize_date']['sections']["s1"]["RUNNING"] = "member" + cls.workflows['synchronize_date']['sections']["s1"]["WALLCLOCK"] = '00:50' + + cls.workflows['synchronize_date']['sections']["s2"] = dict() + cls.workflows['synchronize_date']['sections']["s2"]["RUNNING"] = "chunk" + cls.workflows['synchronize_date']['sections']["s2"]["WALLCLOCK"] = '00:10' + cls.workflows['synchronize_date']['sections']["s2"]["DEPENDENCIES"] = "s1 s2-1" + + cls.workflows['synchronize_date']['sections']["s3"] = dict() + cls.workflows['synchronize_date']['sections']["s3"]["RUNNING"] = "chunk" + cls.workflows['synchronize_date']['sections']["s3"]["WALLCLOCK"] = '00:20' + cls.workflows['synchronize_date']['sections']["s3"]["DEPENDENCIES"] = "s2" + + cls.workflows['synchronize_date']['sections']["s4"] = dict() + cls.workflows['synchronize_date']['sections']["s4"]["RUNNING"] = "chunk" + cls.workflows['synchronize_date']['sections']["s4"]["WALLCLOCK"] = '00:30' + cls.workflows['synchronize_date']['sections']["s4"]["DEPENDENCIES"] = "s3" + + cls.workflows['synchronize_date']['sections']["s5"] = dict() + cls.workflows['synchronize_date']['sections']["s5"]["RUNNING"] = "chunk" + cls.workflows['synchronize_date']['sections']["s5"]["SYNCHRONIZE"] = "date" + cls.workflows['synchronize_date']['sections']["s5"]["WALLCLOCK"] = '00:30' + cls.workflows['synchronize_date']['sections']["s5"]["DEPENDENCIES"] = "s2" + + cls.workflows['synchronize_member']['sections'] = OrderedDict() + cls.workflows['synchronize_member']['sections']["s1"] = dict() + cls.workflows['synchronize_member']['sections']["s1"]["RUNNING"] = "member" + cls.workflows['synchronize_member']['sections']["s1"]["WALLCLOCK"] = '00:50' + + cls.workflows['synchronize_member']['sections']["s2"] = dict() + cls.workflows['synchronize_member']['sections']["s2"]["RUNNING"] = "chunk" + cls.workflows['synchronize_member']['sections']["s2"]["WALLCLOCK"] = '00:10' + cls.workflows['synchronize_member']['sections']["s2"]["DEPENDENCIES"] = "s1 s2-1" + + cls.workflows['synchronize_member']['sections']["s3"] = dict() + cls.workflows['synchronize_member']['sections']["s3"]["RUNNING"] = "chunk" + cls.workflows['synchronize_member']['sections']["s3"]["WALLCLOCK"] = '00:20' + cls.workflows['synchronize_member']['sections']["s3"]["DEPENDENCIES"] = "s2" + + cls.workflows['synchronize_member']['sections']["s4"] = dict() + cls.workflows['synchronize_member']['sections']["s4"]["RUNNING"] = "chunk" + cls.workflows['synchronize_member']['sections']["s4"]["WALLCLOCK"] = '00:30' + cls.workflows['synchronize_member']['sections']["s4"]["DEPENDENCIES"] = "s3" + + cls.workflows['synchronize_member']['sections']["s5"] = dict() + cls.workflows['synchronize_member']['sections']["s5"]["RUNNING"] = "chunk" + cls.workflows['synchronize_member']['sections']["s5"]["SYNCHRONIZE"] = "member" + cls.workflows['synchronize_member']['sections']["s5"]["WALLCLOCK"] = '00:30' + cls.workflows['synchronize_member']['sections']["s5"]["DEPENDENCIES"] = "s2" + + cls.workflows['running_date']['sections'] = OrderedDict() + cls.workflows['running_date']['sections']["s1"] = dict() + cls.workflows['running_date']['sections']["s1"]["RUNNING"] = "member" + cls.workflows['running_date']['sections']["s1"]["WALLCLOCK"] = '00:50' + + cls.workflows['running_date']['sections']["s2"] = dict() + cls.workflows['running_date']['sections']["s2"]["RUNNING"] = "chunk" + cls.workflows['running_date']['sections']["s2"]["WALLCLOCK"] = '00:10' + cls.workflows['running_date']['sections']["s2"]["DEPENDENCIES"] = "s1 s2-1" + + cls.workflows['running_date']['sections']["s3"] = dict() + cls.workflows['running_date']['sections']["s3"]["RUNNING"] = "chunk" + cls.workflows['running_date']['sections']["s3"]["WALLCLOCK"] = '00:20' + cls.workflows['running_date']['sections']["s3"]["DEPENDENCIES"] = "s2" + + cls.workflows['running_date']['sections']["s4"] = dict() + cls.workflows['running_date']['sections']["s4"]["RUNNING"] = "chunk" + cls.workflows['running_date']['sections']["s4"]["WALLCLOCK"] = '00:30' + cls.workflows['running_date']['sections']["s4"]["DEPENDENCIES"] = "s3" + + cls.workflows['running_date']['sections']["s5"] = dict() + cls.workflows['running_date']['sections']["s5"]["RUNNING"] = "date" + cls.workflows['running_date']['sections']["s5"]["WALLCLOCK"] = '00:30' + cls.workflows['running_date']['sections']["s5"]["DEPENDENCIES"] = "s2" + + cls.workflows['running_once']['sections'] = OrderedDict() + cls.workflows['running_once']['sections']["s1"] = dict() + cls.workflows['running_once']['sections']["s1"]["RUNNING"] = "member" + cls.workflows['running_once']['sections']["s1"]["WALLCLOCK"] = '00:50' + + cls.workflows['running_once']['sections']["s2"] = dict() + cls.workflows['running_once']['sections']["s2"]["RUNNING"] = "chunk" + cls.workflows['running_once']['sections']["s2"]["WALLCLOCK"] = '00:10' + cls.workflows['running_once']['sections']["s2"]["DEPENDENCIES"] = "s1 s2-1" + + cls.workflows['running_once']['sections']["s3"] = dict() + cls.workflows['running_once']['sections']["s3"]["RUNNING"] = "chunk" + cls.workflows['running_once']['sections']["s3"]["WALLCLOCK"] = '00:20' + cls.workflows['running_once']['sections']["s3"]["DEPENDENCIES"] = "s2" + + cls.workflows['running_once']['sections']["s4"] = dict() + cls.workflows['running_once']['sections']["s4"]["RUNNING"] = "chunk" + cls.workflows['running_once']['sections']["s4"]["WALLCLOCK"] = '00:30' + cls.workflows['running_once']['sections']["s4"]["DEPENDENCIES"] = "s3" + + cls.workflows['running_once']['sections']["s5"] = dict() + cls.workflows['running_once']['sections']["s5"]["RUNNING"] = "once" + cls.workflows['running_once']['sections']["s5"]["WALLCLOCK"] = '00:30' + cls.workflows['running_once']['sections']["s5"]["DEPENDENCIES"] = "s2" + + def setUp(self): + self.experiment_id = 'random-id' + self.platform = Mock() + self.job_list = JobList(self.experiment_id, FakeBasicConfig, ConfigParserFactory(), + JobListPersistenceDb('.', '.')) + self.parser_mock = Mock(spec='SafeConfigParser') + + ### ONE SECTION WRAPPER ### + def test_returned_packages(self): + date_list = ["d1", "d2"] + member_list = ["m1", "m2"] + chunk_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + + self._createDummyJobs(self.workflows['basic'], date_list, member_list, chunk_list) + + self.job_list.get_job_by_name('expid_d1_m1_s1').status = Status.COMPLETED + self.job_list.get_job_by_name('expid_d1_m2_s1').status = Status.COMPLETED + + self.job_list.get_job_by_name('expid_d1_m1_1_s2').status = Status.READY + self.job_list.get_job_by_name('expid_d1_m2_1_s2').status = Status.READY + + max_jobs = 20 + max_wrapped_jobs = 20 + max_wallclock = '10:00' + + d1_m1_1_s2 = self.job_list.get_job_by_name('expid_d1_m1_1_s2') + d1_m1_2_s2 = self.job_list.get_job_by_name('expid_d1_m1_2_s2') + d1_m1_3_s2 = self.job_list.get_job_by_name('expid_d1_m1_3_s2') + d1_m1_4_s2 = self.job_list.get_job_by_name('expid_d1_m1_4_s2') + d1_m1_5_s2 = self.job_list.get_job_by_name('expid_d1_m1_5_s2') + d1_m1_6_s2 = self.job_list.get_job_by_name('expid_d1_m1_6_s2') + d1_m1_7_s2 = self.job_list.get_job_by_name('expid_d1_m1_7_s2') + d1_m1_8_s2 = self.job_list.get_job_by_name('expid_d1_m1_8_s2') + d1_m1_9_s2 = self.job_list.get_job_by_name('expid_d1_m1_9_s2') + d1_m1_10_s2 = self.job_list.get_job_by_name('expid_d1_m1_10_s2') + + d1_m2_1_s2 = self.job_list.get_job_by_name('expid_d1_m2_1_s2') + d1_m2_2_s2 = self.job_list.get_job_by_name('expid_d1_m2_2_s2') + d1_m2_3_s2 = self.job_list.get_job_by_name('expid_d1_m2_3_s2') + d1_m2_4_s2 = self.job_list.get_job_by_name('expid_d1_m2_4_s2') + d1_m2_5_s2 = self.job_list.get_job_by_name('expid_d1_m2_5_s2') + d1_m2_6_s2 = self.job_list.get_job_by_name('expid_d1_m2_6_s2') + d1_m2_7_s2 = self.job_list.get_job_by_name('expid_d1_m2_7_s2') + d1_m2_8_s2 = self.job_list.get_job_by_name('expid_d1_m2_8_s2') + d1_m2_9_s2 = self.job_list.get_job_by_name('expid_d1_m2_9_s2') + d1_m2_10_s2 = self.job_list.get_job_by_name('expid_d1_m2_10_s2') + + section_list = [d1_m1_1_s2, d1_m2_1_s2] + + returned_packages = JobPackager._build_vertical_packages(None, 'None', section_list, max_jobs, + max_wallclock, max_wrapped_jobs) + + package_m1_s2 = [d1_m1_1_s2, d1_m1_2_s2, d1_m1_3_s2, d1_m1_4_s2, d1_m1_5_s2, d1_m1_6_s2, d1_m1_7_s2, d1_m1_8_s2, + d1_m1_9_s2, d1_m1_10_s2] + package_m2_s2 = [d1_m2_1_s2, d1_m2_2_s2, d1_m2_3_s2, d1_m2_4_s2, d1_m2_5_s2, d1_m2_6_s2, d1_m2_7_s2, d1_m2_8_s2, + d1_m2_9_s2, d1_m2_10_s2] + + packages = [JobPackageVertical(package_m1_s2), JobPackageVertical(package_m2_s2)] + + returned_packages = returned_packages[0] + for i in range(0, len(returned_packages)): + self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) + + def test_returned_packages_max_jobs(self): + date_list = ["d1", "d2"] + member_list = ["m1", "m2"] + chunk_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + + self._createDummyJobs(self.workflows['basic'], date_list, member_list, chunk_list) + + self.job_list.get_job_by_name('expid_d1_m1_s1').status = Status.COMPLETED + self.job_list.get_job_by_name('expid_d1_m2_s1').status = Status.COMPLETED + + self.job_list.get_job_by_name('expid_d1_m1_1_s2').status = Status.READY + self.job_list.get_job_by_name('expid_d1_m2_1_s2').status = Status.READY + + max_jobs = 12 + max_wrapped_jobs = 10 + max_wallclock = '10:00' + + d1_m1_1_s2 = self.job_list.get_job_by_name('expid_d1_m1_1_s2') + d1_m1_2_s2 = self.job_list.get_job_by_name('expid_d1_m1_2_s2') + d1_m1_3_s2 = self.job_list.get_job_by_name('expid_d1_m1_3_s2') + d1_m1_4_s2 = self.job_list.get_job_by_name('expid_d1_m1_4_s2') + d1_m1_5_s2 = self.job_list.get_job_by_name('expid_d1_m1_5_s2') + d1_m1_6_s2 = self.job_list.get_job_by_name('expid_d1_m1_6_s2') + d1_m1_7_s2 = self.job_list.get_job_by_name('expid_d1_m1_7_s2') + d1_m1_8_s2 = self.job_list.get_job_by_name('expid_d1_m1_8_s2') + d1_m1_9_s2 = self.job_list.get_job_by_name('expid_d1_m1_9_s2') + d1_m1_10_s2 = self.job_list.get_job_by_name('expid_d1_m1_10_s2') + + d1_m2_1_s2 = self.job_list.get_job_by_name('expid_d1_m2_1_s2') + d1_m2_2_s2 = self.job_list.get_job_by_name('expid_d1_m2_2_s2') + + section_list = [d1_m1_1_s2, d1_m2_1_s2] + + returned_packages = JobPackager._build_vertical_packages(None, 'None', section_list, max_jobs, + max_wallclock, max_wrapped_jobs) + + package_m1_s2 = [d1_m1_1_s2, d1_m1_2_s2, d1_m1_3_s2, d1_m1_4_s2, d1_m1_5_s2, d1_m1_6_s2, d1_m1_7_s2, d1_m1_8_s2, + d1_m1_9_s2, d1_m1_10_s2] + package_m2_s2 = [d1_m2_1_s2, d1_m2_2_s2] + + packages = [JobPackageVertical(package_m1_s2), JobPackageVertical(package_m2_s2)] + + returned_packages = returned_packages[0] + for i in range(0, len(returned_packages)): + self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) + + def test_returned_packages_max_wrapped_jobs(self): + date_list = ["d1", "d2"] + member_list = ["m1", "m2"] + chunk_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + + self._createDummyJobs(self.workflows['basic'], date_list, member_list, chunk_list) + + self.job_list.get_job_by_name('expid_d1_m1_s1').status = Status.COMPLETED + self.job_list.get_job_by_name('expid_d1_m2_s1').status = Status.COMPLETED + + self.job_list.get_job_by_name('expid_d1_m1_1_s2').status = Status.READY + self.job_list.get_job_by_name('expid_d1_m2_1_s2').status = Status.READY + + max_jobs = 20 + max_wrapped_jobs = 5 + max_wallclock = '10:00' + + d1_m1_1_s2 = self.job_list.get_job_by_name('expid_d1_m1_1_s2') + d1_m1_2_s2 = self.job_list.get_job_by_name('expid_d1_m1_2_s2') + d1_m1_3_s2 = self.job_list.get_job_by_name('expid_d1_m1_3_s2') + d1_m1_4_s2 = self.job_list.get_job_by_name('expid_d1_m1_4_s2') + d1_m1_5_s2 = self.job_list.get_job_by_name('expid_d1_m1_5_s2') + + d1_m2_1_s2 = self.job_list.get_job_by_name('expid_d1_m2_1_s2') + d1_m2_2_s2 = self.job_list.get_job_by_name('expid_d1_m2_2_s2') + d1_m2_3_s2 = self.job_list.get_job_by_name('expid_d1_m2_3_s2') + d1_m2_4_s2 = self.job_list.get_job_by_name('expid_d1_m2_4_s2') + d1_m2_5_s2 = self.job_list.get_job_by_name('expid_d1_m2_5_s2') + + section_list = [d1_m1_1_s2, d1_m2_1_s2] + + returned_packages = JobPackager._build_vertical_packages(None, 'None', section_list, max_jobs, + max_wallclock, max_wrapped_jobs) + + package_m1_s2 = [d1_m1_1_s2, d1_m1_2_s2, d1_m1_3_s2, d1_m1_4_s2, d1_m1_5_s2] + package_m2_s2 = [d1_m2_1_s2, d1_m2_2_s2, d1_m2_3_s2, d1_m2_4_s2, d1_m2_5_s2] + + packages = [JobPackageVertical(package_m1_s2), JobPackageVertical(package_m2_s2)] + + returned_packages = returned_packages[0] + for i in range(0, len(returned_packages)): + self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) + + def test_returned_packages_max_wallclock(self): + date_list = ["d1", "d2"] + member_list = ["m1", "m2"] + chunk_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + + self._createDummyJobs(self.workflows['basic'], date_list, member_list, chunk_list) + + self.job_list.get_job_by_name('expid_d1_m1_s1').status = Status.COMPLETED + self.job_list.get_job_by_name('expid_d1_m2_s1').status = Status.COMPLETED + + self.job_list.get_job_by_name('expid_d1_m1_1_s2').status = Status.READY + self.job_list.get_job_by_name('expid_d1_m2_1_s2').status = Status.READY + + max_jobs = 20 + max_wrapped_jobs = 15 + max_wallclock = '00:50' + + d1_m1_1_s2 = self.job_list.get_job_by_name('expid_d1_m1_1_s2') + d1_m1_2_s2 = self.job_list.get_job_by_name('expid_d1_m1_2_s2') + d1_m1_3_s2 = self.job_list.get_job_by_name('expid_d1_m1_3_s2') + d1_m1_4_s2 = self.job_list.get_job_by_name('expid_d1_m1_4_s2') + d1_m1_5_s2 = self.job_list.get_job_by_name('expid_d1_m1_5_s2') + + d1_m2_1_s2 = self.job_list.get_job_by_name('expid_d1_m2_1_s2') + d1_m2_2_s2 = self.job_list.get_job_by_name('expid_d1_m2_2_s2') + d1_m2_3_s2 = self.job_list.get_job_by_name('expid_d1_m2_3_s2') + d1_m2_4_s2 = self.job_list.get_job_by_name('expid_d1_m2_4_s2') + d1_m2_5_s2 = self.job_list.get_job_by_name('expid_d1_m2_5_s2') + + section_list = [d1_m1_1_s2, d1_m2_1_s2] + + returned_packages = JobPackager._build_vertical_packages(None, 'None', section_list, max_jobs, + max_wallclock, max_wrapped_jobs) + + package_m1_s2 = [d1_m1_1_s2, d1_m1_2_s2, d1_m1_3_s2, d1_m1_4_s2, d1_m1_5_s2] + package_m2_s2 = [d1_m2_1_s2, d1_m2_2_s2, d1_m2_3_s2, d1_m2_4_s2, d1_m2_5_s2] + + packages = [JobPackageVertical(package_m1_s2), JobPackageVertical(package_m2_s2)] + + returned_packages = returned_packages[0] + for i in range(0, len(returned_packages)): + self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) + + def test_returned_packages_section_not_self_dependent(self): + date_list = ["d1", "d2"] + member_list = ["m1", "m2"] + chunk_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + + self._createDummyJobs(self.workflows['basic'], date_list, member_list, chunk_list) + + self.job_list.get_job_by_name('expid_d1_m1_s1').status = Status.COMPLETED + self.job_list.get_job_by_name('expid_d1_m2_s1').status = Status.COMPLETED + self.job_list.get_job_by_name('expid_d1_m1_1_s2').status = Status.COMPLETED + self.job_list.get_job_by_name('expid_d1_m2_1_s2').status = Status.COMPLETED + + self.job_list.get_job_by_name('expid_d1_m1_1_s3').status = Status.READY + self.job_list.get_job_by_name('expid_d1_m2_1_s3').status = Status.READY + + max_jobs = 20 + max_wrapped_jobs = 20 + max_wallclock = '10:00' + + d1_m1_1_s3 = self.job_list.get_job_by_name('expid_d1_m1_1_s3') + d1_m2_1_s3 = self.job_list.get_job_by_name('expid_d1_m2_1_s3') + + section_list = [d1_m1_1_s3, d1_m2_1_s3] + + returned_packages = JobPackager._build_vertical_packages(None, 'None', section_list, max_jobs, + max_wallclock, max_wrapped_jobs) + + package_m1_s2 = [d1_m1_1_s3] + package_m2_s2 = [d1_m2_1_s3] + + packages = [JobPackageVertical(package_m1_s2), JobPackageVertical(package_m2_s2)] + + returned_packages = returned_packages[0] + for i in range(0, len(returned_packages)): + self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) + + ### MIXED WRAPPER ### + def test_returned_packages_mixed_wrapper(self): + date_list = ["d1"] + member_list = ["m1", "m2"] + chunk_list = [1, 2, 3, 4] + + self._createDummyJobs(self.workflows['basic'], date_list, member_list, chunk_list) + + self.job_list.get_job_by_name('expid_d1_m1_s1').status = Status.COMPLETED + self.job_list.get_job_by_name('expid_d1_m2_s1').status = Status.COMPLETED + + self.job_list.get_job_by_name('expid_d1_m1_1_s2').status = Status.READY + self.job_list.get_job_by_name('expid_d1_m2_1_s2').status = Status.READY + + wrapper_expression = "s2 s3" + max_jobs = 18 + max_wrapped_jobs = 18 + max_wallclock = '10:00' + + d1_m1_1_s2 = self.job_list.get_job_by_name('expid_d1_m1_1_s2') + d1_m1_2_s2 = self.job_list.get_job_by_name('expid_d1_m1_2_s2') + d1_m1_3_s2 = self.job_list.get_job_by_name('expid_d1_m1_3_s2') + d1_m1_4_s2 = self.job_list.get_job_by_name('expid_d1_m1_4_s2') + d1_m2_1_s2 = self.job_list.get_job_by_name('expid_d1_m2_1_s2') + d1_m2_2_s2 = self.job_list.get_job_by_name('expid_d1_m2_2_s2') + d1_m2_3_s2 = self.job_list.get_job_by_name('expid_d1_m2_3_s2') + d1_m2_4_s2 = self.job_list.get_job_by_name('expid_d1_m2_4_s2') + + d1_m1_1_s3 = self.job_list.get_job_by_name('expid_d1_m1_1_s3') + d1_m1_2_s3 = self.job_list.get_job_by_name('expid_d1_m1_2_s3') + d1_m1_3_s3 = self.job_list.get_job_by_name('expid_d1_m1_3_s3') + d1_m1_4_s3 = self.job_list.get_job_by_name('expid_d1_m1_4_s3') + d1_m2_1_s3 = self.job_list.get_job_by_name('expid_d1_m2_1_s3') + d1_m2_2_s3 = self.job_list.get_job_by_name('expid_d1_m2_2_s3') + d1_m2_3_s3 = self.job_list.get_job_by_name('expid_d1_m2_3_s3') + d1_m2_4_s3 = self.job_list.get_job_by_name('expid_d1_m2_4_s3') + + self.job_list._ordered_jobs_by_date_member["d1"] = dict() + self.job_list._ordered_jobs_by_date_member["d1"]["m1"] = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3, + d1_m1_3_s2, d1_m1_3_s3, d1_m1_4_s2, d1_m1_4_s3] + + self.job_list._ordered_jobs_by_date_member["d1"]["m2"] = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, d1_m2_2_s3, + d1_m2_3_s2, d1_m2_3_s3, d1_m2_4_s2, d1_m2_4_s3] + + section_list = [d1_m1_1_s2, d1_m2_1_s2] + + returned_packages = JobPackager._build_vertical_packages(self.job_list.get_ordered_jobs_by_date_member(), + wrapper_expression, section_list, max_jobs, + max_wallclock, max_wrapped_jobs) + + package_m1_s2_s3 = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3, d1_m1_3_s2, d1_m1_3_s3, d1_m1_4_s2, + d1_m1_4_s3] + package_m2_s2_s3 = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, d1_m2_2_s3, d1_m2_3_s2, d1_m2_3_s3, d1_m2_4_s2, + d1_m2_4_s3] + + packages = [JobPackageVertical(package_m1_s2_s3), JobPackageVertical(package_m2_s2_s3)] + + returned_packages = returned_packages[0] + for i in range(0, len(returned_packages)): + self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) + + def test_returned_packages_parent_failed_mixed_wrapper(self): + date_list = ["d1"] + member_list = ["m1", "m2"] + chunk_list = [1, 2, 3, 4] + + self._createDummyJobs(self.workflows['basic'], date_list, member_list, chunk_list) + + self.job_list.get_job_by_name('expid_d1_m1_s1').status = Status.COMPLETED + self.job_list.get_job_by_name('expid_d1_m2_s1').status = Status.FAILED + + self.job_list.get_job_by_name('expid_d1_m1_1_s2').status = Status.READY + + wrapper_expression = "s2 s3" + max_jobs = 18 + max_wrapped_jobs = 18 + max_wallclock = '10:00' + + d1_m1_1_s2 = self.job_list.get_job_by_name('expid_d1_m1_1_s2') + d1_m1_2_s2 = self.job_list.get_job_by_name('expid_d1_m1_2_s2') + d1_m1_3_s2 = self.job_list.get_job_by_name('expid_d1_m1_3_s2') + d1_m1_4_s2 = self.job_list.get_job_by_name('expid_d1_m1_4_s2') + d1_m2_1_s2 = self.job_list.get_job_by_name('expid_d1_m2_1_s2') + d1_m2_2_s2 = self.job_list.get_job_by_name('expid_d1_m2_2_s2') + d1_m2_3_s2 = self.job_list.get_job_by_name('expid_d1_m2_3_s2') + d1_m2_4_s2 = self.job_list.get_job_by_name('expid_d1_m2_4_s2') + + d1_m1_1_s3 = self.job_list.get_job_by_name('expid_d1_m1_1_s3') + d1_m1_2_s3 = self.job_list.get_job_by_name('expid_d1_m1_2_s3') + d1_m1_3_s3 = self.job_list.get_job_by_name('expid_d1_m1_3_s3') + d1_m1_4_s3 = self.job_list.get_job_by_name('expid_d1_m1_4_s3') + d1_m2_1_s3 = self.job_list.get_job_by_name('expid_d1_m2_1_s3') + d1_m2_2_s3 = self.job_list.get_job_by_name('expid_d1_m2_2_s3') + d1_m2_3_s3 = self.job_list.get_job_by_name('expid_d1_m2_3_s3') + d1_m2_4_s3 = self.job_list.get_job_by_name('expid_d1_m2_4_s3') + + self.job_list._ordered_jobs_by_date_member["d1"] = dict() + self.job_list._ordered_jobs_by_date_member["d1"]["m1"] = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3, + d1_m1_3_s2, d1_m1_3_s3, d1_m1_4_s2, d1_m1_4_s3] + + self.job_list._ordered_jobs_by_date_member["d1"]["m2"] = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, d1_m2_2_s3, + d1_m2_3_s2, d1_m2_3_s3, d1_m2_4_s2, d1_m2_4_s3] + + section_list = [d1_m1_1_s2] + + returned_packages = JobPackager._build_vertical_packages(self.job_list.get_ordered_jobs_by_date_member(), + wrapper_expression, section_list, max_jobs, + max_wallclock, max_wrapped_jobs) + + package_m1_s2_s3 = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3, d1_m1_3_s2, d1_m1_3_s3, d1_m1_4_s2, + d1_m1_4_s3] + + packages = [JobPackageVertical(package_m1_s2_s3)] + + returned_packages = returned_packages[0] + for i in range(0, len(returned_packages)): + self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) + + def test_returned_packages_max_jobs_mixed_wrapper(self): + wrapper_expression = "s2 s3" + max_jobs = 10 + max_wrapped_jobs = 10 + max_wallclock = '10:00' + + date_list = ["d1"] + member_list = ["m1", "m2"] + chunk_list = [1, 2, 3, 4] + + self._createDummyJobs(self.workflows['basic'], date_list, member_list, chunk_list) + + self.job_list.get_job_by_name('expid_d1_m1_s1').status = Status.COMPLETED + self.job_list.get_job_by_name('expid_d1_m2_s1').status = Status.COMPLETED + + self.job_list.get_job_by_name('expid_d1_m1_1_s2').status = Status.READY + self.job_list.get_job_by_name('expid_d1_m2_1_s2').status = Status.READY + + d1_m1_1_s2 = self.job_list.get_job_by_name('expid_d1_m1_1_s2') + d1_m1_2_s2 = self.job_list.get_job_by_name('expid_d1_m1_2_s2') + d1_m1_3_s2 = self.job_list.get_job_by_name('expid_d1_m1_3_s2') + d1_m1_4_s2 = self.job_list.get_job_by_name('expid_d1_m1_4_s2') + d1_m2_1_s2 = self.job_list.get_job_by_name('expid_d1_m2_1_s2') + d1_m2_2_s2 = self.job_list.get_job_by_name('expid_d1_m2_2_s2') + d1_m2_3_s2 = self.job_list.get_job_by_name('expid_d1_m2_3_s2') + d1_m2_4_s2 = self.job_list.get_job_by_name('expid_d1_m2_4_s2') + + d1_m1_1_s3 = self.job_list.get_job_by_name('expid_d1_m1_1_s3') + d1_m1_2_s3 = self.job_list.get_job_by_name('expid_d1_m1_2_s3') + d1_m1_3_s3 = self.job_list.get_job_by_name('expid_d1_m1_3_s3') + d1_m1_4_s3 = self.job_list.get_job_by_name('expid_d1_m1_4_s3') + d1_m2_1_s3 = self.job_list.get_job_by_name('expid_d1_m2_1_s3') + d1_m2_2_s3 = self.job_list.get_job_by_name('expid_d1_m2_2_s3') + d1_m2_3_s3 = self.job_list.get_job_by_name('expid_d1_m2_3_s3') + d1_m2_4_s3 = self.job_list.get_job_by_name('expid_d1_m2_4_s3') + + self.job_list._ordered_jobs_by_date_member["d1"] = dict() + self.job_list._ordered_jobs_by_date_member["d1"]["m1"] = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3, + d1_m1_3_s2, d1_m1_3_s3, d1_m1_4_s2, d1_m1_4_s3] + + self.job_list._ordered_jobs_by_date_member["d1"]["m2"] = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, d1_m2_2_s3, + d1_m2_3_s2, d1_m2_3_s3, d1_m2_4_s2, d1_m2_4_s3] + + section_list = [d1_m1_1_s2, d1_m2_1_s2] + + returned_packages = JobPackager._build_vertical_packages(self.job_list.get_ordered_jobs_by_date_member(), + wrapper_expression, section_list, max_jobs, + max_wallclock, max_wrapped_jobs) + + package_m1_s2_s3 = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3, d1_m1_3_s2, d1_m1_3_s3, d1_m1_4_s2, + d1_m1_4_s3] + package_m2_s2_s3 = [d1_m2_1_s2, d1_m2_1_s3] + + packages = [JobPackageVertical(package_m1_s2_s3), JobPackageVertical(package_m2_s2_s3)] + + returned_packages = returned_packages[0] + for i in range(0, len(returned_packages)): + self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) + + def test_returned_packages_max_wrapped_jobs_mixed_wrapper(self): + wrapper_expression = "s2 s3" + max_jobs = 15 + max_wrapped_jobs = 5 + max_wallclock = '10:00' + + date_list = ["d1"] + member_list = ["m1", "m2"] + chunk_list = [1, 2, 3, 4] + + self._createDummyJobs(self.workflows['basic'], date_list, member_list, chunk_list) + + self.job_list.get_job_by_name('expid_d1_m1_s1').status = Status.COMPLETED + self.job_list.get_job_by_name('expid_d1_m2_s1').status = Status.COMPLETED + + self.job_list.get_job_by_name('expid_d1_m1_1_s2').status = Status.READY + self.job_list.get_job_by_name('expid_d1_m2_1_s2').status = Status.READY + + d1_m1_1_s2 = self.job_list.get_job_by_name('expid_d1_m1_1_s2') + d1_m1_2_s2 = self.job_list.get_job_by_name('expid_d1_m1_2_s2') + d1_m1_3_s2 = self.job_list.get_job_by_name('expid_d1_m1_3_s2') + d1_m1_4_s2 = self.job_list.get_job_by_name('expid_d1_m1_4_s2') + d1_m2_1_s2 = self.job_list.get_job_by_name('expid_d1_m2_1_s2') + d1_m2_2_s2 = self.job_list.get_job_by_name('expid_d1_m2_2_s2') + d1_m2_3_s2 = self.job_list.get_job_by_name('expid_d1_m2_3_s2') + d1_m2_4_s2 = self.job_list.get_job_by_name('expid_d1_m2_4_s2') + + d1_m1_1_s3 = self.job_list.get_job_by_name('expid_d1_m1_1_s3') + d1_m1_2_s3 = self.job_list.get_job_by_name('expid_d1_m1_2_s3') + d1_m1_3_s3 = self.job_list.get_job_by_name('expid_d1_m1_3_s3') + d1_m1_4_s3 = self.job_list.get_job_by_name('expid_d1_m1_4_s3') + d1_m2_1_s3 = self.job_list.get_job_by_name('expid_d1_m2_1_s3') + d1_m2_2_s3 = self.job_list.get_job_by_name('expid_d1_m2_2_s3') + d1_m2_3_s3 = self.job_list.get_job_by_name('expid_d1_m2_3_s3') + d1_m2_4_s3 = self.job_list.get_job_by_name('expid_d1_m2_4_s3') + + self.job_list._ordered_jobs_by_date_member["d1"] = dict() + self.job_list._ordered_jobs_by_date_member["d1"]["m1"] = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3, + d1_m1_3_s2, d1_m1_3_s3, d1_m1_4_s2, d1_m1_4_s3] + + self.job_list._ordered_jobs_by_date_member["d1"]["m2"] = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, d1_m2_2_s3, + d1_m2_3_s2, d1_m2_3_s3, d1_m2_4_s2, d1_m2_4_s3] + + section_list = [d1_m1_1_s2, d1_m2_1_s2] + + returned_packages = JobPackager._build_vertical_packages(self.job_list.get_ordered_jobs_by_date_member(), + wrapper_expression, section_list, max_jobs, + max_wallclock, max_wrapped_jobs) + + package_m1_s2_s3 = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3, d1_m1_3_s2] + package_m2_s2_s3 = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, d1_m2_2_s3, d1_m2_3_s2] + + packages = [JobPackageVertical(package_m1_s2_s3), JobPackageVertical(package_m2_s2_s3)] + + returned_packages = returned_packages[0] + for i in range(0, len(returned_packages)): + self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) + + def test_returned_packages_max_wallclock_mixed_wrapper(self): + date_list = ["d1"] + member_list = ["m1", "m2"] + chunk_list = [1, 2, 3, 4] + + self._createDummyJobs(self.workflows['basic'], date_list, member_list, chunk_list) + + self.job_list.get_job_by_name('expid_d1_m1_s1').status = Status.COMPLETED + self.job_list.get_job_by_name('expid_d1_m2_s1').status = Status.COMPLETED + + self.job_list.get_job_by_name('expid_d1_m1_1_s2').status = Status.READY + self.job_list.get_job_by_name('expid_d1_m2_1_s2').status = Status.READY + + wrapper_expression = "s2 s3" + max_jobs = 18 + max_wrapped_jobs = 18 + max_wallclock = '01:00' + + d1_m1_1_s2 = self.job_list.get_job_by_name('expid_d1_m1_1_s2') + d1_m1_2_s2 = self.job_list.get_job_by_name('expid_d1_m1_2_s2') + d1_m1_3_s2 = self.job_list.get_job_by_name('expid_d1_m1_3_s2') + d1_m1_4_s2 = self.job_list.get_job_by_name('expid_d1_m1_4_s2') + d1_m2_1_s2 = self.job_list.get_job_by_name('expid_d1_m2_1_s2') + d1_m2_2_s2 = self.job_list.get_job_by_name('expid_d1_m2_2_s2') + d1_m2_3_s2 = self.job_list.get_job_by_name('expid_d1_m2_3_s2') + d1_m2_4_s2 = self.job_list.get_job_by_name('expid_d1_m2_4_s2') + + d1_m1_1_s3 = self.job_list.get_job_by_name('expid_d1_m1_1_s3') + d1_m1_2_s3 = self.job_list.get_job_by_name('expid_d1_m1_2_s3') + d1_m1_3_s3 = self.job_list.get_job_by_name('expid_d1_m1_3_s3') + d1_m1_4_s3 = self.job_list.get_job_by_name('expid_d1_m1_4_s3') + d1_m2_1_s3 = self.job_list.get_job_by_name('expid_d1_m2_1_s3') + d1_m2_2_s3 = self.job_list.get_job_by_name('expid_d1_m2_2_s3') + d1_m2_3_s3 = self.job_list.get_job_by_name('expid_d1_m2_3_s3') + d1_m2_4_s3 = self.job_list.get_job_by_name('expid_d1_m2_4_s3') + + self.job_list._ordered_jobs_by_date_member["d1"] = dict() + self.job_list._ordered_jobs_by_date_member["d1"]["m1"] = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3, + d1_m1_3_s2, d1_m1_3_s3, d1_m1_4_s2, d1_m1_4_s3] + + self.job_list._ordered_jobs_by_date_member["d1"]["m2"] = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, d1_m2_2_s3, + d1_m2_3_s2, d1_m2_3_s3, d1_m2_4_s2, d1_m2_4_s3] + + section_list = [d1_m1_1_s2, d1_m2_1_s2] + + returned_packages = JobPackager._build_vertical_packages(self.job_list.get_ordered_jobs_by_date_member(), + wrapper_expression, section_list, max_jobs, + max_wallclock, max_wrapped_jobs) + + package_m1_s2_s3 = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3] + package_m2_s2_s3 = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, d1_m2_2_s3] + + packages = [JobPackageVertical(package_m1_s2_s3), JobPackageVertical(package_m2_s2_s3)] + + returned_packages = returned_packages[0] + for i in range(0, len(returned_packages)): + self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) + + def test_returned_packages_first_chunks_completed_mixed_wrapper(self): + date_list = ["d1"] + member_list = ["m1", "m2"] + chunk_list = [1, 2, 3, 4] + + self._createDummyJobs(self.workflows['basic'], date_list, member_list, chunk_list) + + self.job_list.get_job_by_name('expid_d1_m1_s1').status = Status.COMPLETED + self.job_list.get_job_by_name('expid_d1_m2_s1').status = Status.COMPLETED + + self.job_list.get_job_by_name('expid_d1_m1_1_s2').status = Status.COMPLETED + self.job_list.get_job_by_name('expid_d1_m1_2_s2').status = Status.COMPLETED + self.job_list.get_job_by_name('expid_d1_m1_3_s2').status = Status.COMPLETED + self.job_list.get_job_by_name('expid_d1_m2_1_s2').status = Status.COMPLETED + self.job_list.get_job_by_name('expid_d1_m2_2_s2').status = Status.COMPLETED + self.job_list.get_job_by_name('expid_d1_m1_1_s3').status = Status.COMPLETED + self.job_list.get_job_by_name('expid_d1_m2_1_s3').status = Status.COMPLETED + self.job_list.get_job_by_name('expid_d1_m2_2_s3').status = Status.COMPLETED + + + self.job_list.get_job_by_name('expid_d1_m1_4_s2').status = Status.READY + self.job_list.get_job_by_name('expid_d1_m2_3_s2').status = Status.READY + self.job_list.get_job_by_name('expid_d1_m1_2_s3').status = Status.READY + + d1_m1_1_s2 = self.job_list.get_job_by_name('expid_d1_m1_1_s2') + d1_m1_2_s2 = self.job_list.get_job_by_name('expid_d1_m1_2_s2') + d1_m1_3_s2 = self.job_list.get_job_by_name('expid_d1_m1_3_s2') + d1_m1_4_s2 = self.job_list.get_job_by_name('expid_d1_m1_4_s2') + d1_m2_1_s2 = self.job_list.get_job_by_name('expid_d1_m2_1_s2') + d1_m2_2_s2 = self.job_list.get_job_by_name('expid_d1_m2_2_s2') + d1_m2_3_s2 = self.job_list.get_job_by_name('expid_d1_m2_3_s2') + d1_m2_4_s2 = self.job_list.get_job_by_name('expid_d1_m2_4_s2') + + d1_m1_1_s3 = self.job_list.get_job_by_name('expid_d1_m1_1_s3') + d1_m1_2_s3 = self.job_list.get_job_by_name('expid_d1_m1_2_s3') + d1_m1_3_s3 = self.job_list.get_job_by_name('expid_d1_m1_3_s3') + d1_m1_4_s3 = self.job_list.get_job_by_name('expid_d1_m1_4_s3') + d1_m2_1_s3 = self.job_list.get_job_by_name('expid_d1_m2_1_s3') + d1_m2_2_s3 = self.job_list.get_job_by_name('expid_d1_m2_2_s3') + d1_m2_3_s3 = self.job_list.get_job_by_name('expid_d1_m2_3_s3') + d1_m2_4_s3 = self.job_list.get_job_by_name('expid_d1_m2_4_s3') + + self.job_list._ordered_jobs_by_date_member["d1"] = dict() + self.job_list._ordered_jobs_by_date_member["d1"]["m1"] = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3, d1_m1_3_s2, + d1_m1_3_s3, d1_m1_4_s2, d1_m1_4_s3] + + self.job_list._ordered_jobs_by_date_member["d1"]["m2"] = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, d1_m2_2_s3, d1_m2_3_s2, + d1_m2_3_s3, d1_m2_4_s2, d1_m2_4_s3] + + wrapper_expression = "s2 s3" + max_wrapped_jobs = 18 + max_jobs = 18 + max_wallclock = '10:00' + + section_list = [d1_m1_2_s3, d1_m1_4_s2, d1_m2_3_s2] + + returned_packages = JobPackager._build_vertical_packages(self.job_list.get_ordered_jobs_by_date_member(), + wrapper_expression, section_list, + max_jobs, max_wallclock, max_wrapped_jobs) + + package_m1_s2_s3 = [d1_m1_2_s3, d1_m1_3_s3, d1_m1_4_s2, d1_m1_4_s3] + package_m2_s2_s3 = [d1_m2_3_s2, d1_m2_3_s3, d1_m2_4_s2, d1_m2_4_s3] + + packages = [JobPackageVertical(package_m1_s2_s3), JobPackageVertical(package_m2_s2_s3)] + + returned_packages = returned_packages[0] + for i in range(0, len(returned_packages)): + self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) + + def test_ordered_dict_jobs_simple_workflow_mixed_wrapper(self): + date_list = ["d1"] + member_list = ["m1", "m2"] + chunk_list = [1, 2, 3, 4] + + self._createDummyJobs(self.workflows['basic'], date_list, member_list, chunk_list) + + self.job_list.get_job_by_name('expid_d1_m1_s1').status = Status.COMPLETED + self.job_list.get_job_by_name('expid_d1_m2_s1').status = Status.COMPLETED + + self.job_list.get_job_by_name('expid_d1_m1_1_s2').status = Status.READY + self.job_list.get_job_by_name('expid_d1_m2_1_s2').status = Status.READY + + d1_m1_1_s2 = self.job_list.get_job_by_name('expid_d1_m1_1_s2') + d1_m1_2_s2 = self.job_list.get_job_by_name('expid_d1_m1_2_s2') + d1_m1_3_s2 = self.job_list.get_job_by_name('expid_d1_m1_3_s2') + d1_m1_4_s2 = self.job_list.get_job_by_name('expid_d1_m1_4_s2') + d1_m2_1_s2 = self.job_list.get_job_by_name('expid_d1_m2_1_s2') + d1_m2_2_s2 = self.job_list.get_job_by_name('expid_d1_m2_2_s2') + d1_m2_3_s2 = self.job_list.get_job_by_name('expid_d1_m2_3_s2') + d1_m2_4_s2 = self.job_list.get_job_by_name('expid_d1_m2_4_s2') + + d1_m1_1_s3 = self.job_list.get_job_by_name('expid_d1_m1_1_s3') + d1_m1_2_s3 = self.job_list.get_job_by_name('expid_d1_m1_2_s3') + d1_m1_3_s3 = self.job_list.get_job_by_name('expid_d1_m1_3_s3') + d1_m1_4_s3 = self.job_list.get_job_by_name('expid_d1_m1_4_s3') + d1_m2_1_s3 = self.job_list.get_job_by_name('expid_d1_m2_1_s3') + d1_m2_2_s3 = self.job_list.get_job_by_name('expid_d1_m2_2_s3') + d1_m2_3_s3 = self.job_list.get_job_by_name('expid_d1_m2_3_s3') + d1_m2_4_s3 = self.job_list.get_job_by_name('expid_d1_m2_4_s3') + + self.parser_mock.has_option = Mock(return_value=True) + self.parser_mock.get = Mock(return_value="chunk") + self.job_list._get_date = Mock(return_value='d1') + + ordered_jobs_by_date_member = dict() + ordered_jobs_by_date_member["d1"] = dict() + ordered_jobs_by_date_member["d1"]["m1"] = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3, d1_m1_3_s2, + d1_m1_3_s3, d1_m1_4_s2, d1_m1_4_s3] + + ordered_jobs_by_date_member["d1"]["m2"] = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, d1_m2_2_s3, d1_m2_3_s2, + d1_m2_3_s3, d1_m2_4_s2, d1_m2_4_s3] + + self.assertDictEqual(self.job_list._create_sorted_dict_jobs("s2 s3"), ordered_jobs_by_date_member) + + def test_ordered_dict_jobs_running_date_mixed_wrapper(self): + date_list = ["d1", "d2"] + member_list = ["m1", "m2"] + chunk_list = [1, 2, 3, 4] + + self._createDummyJobs(self.workflows['running_date'], date_list, member_list, chunk_list) + + self.parser_mock.has_option = Mock(return_value=True) + self.parser_mock.get = Mock(side_effect=["chunk", "chunk", "date"]) + self.job_list._get_date = Mock(side_effect=['d1', 'd2']) + + d1_m1_1_s2 = self.job_list.get_job_by_name('expid_d1_m1_1_s2') + d1_m1_2_s2 = self.job_list.get_job_by_name('expid_d1_m1_2_s2') + d1_m1_3_s2 = self.job_list.get_job_by_name('expid_d1_m1_3_s2') + d1_m1_4_s2 = self.job_list.get_job_by_name('expid_d1_m1_4_s2') + d1_m2_1_s2 = self.job_list.get_job_by_name('expid_d1_m2_1_s2') + d1_m2_2_s2 = self.job_list.get_job_by_name('expid_d1_m2_2_s2') + d1_m2_3_s2 = self.job_list.get_job_by_name('expid_d1_m2_3_s2') + d1_m2_4_s2 = self.job_list.get_job_by_name('expid_d1_m2_4_s2') + + d1_m1_1_s3 = self.job_list.get_job_by_name('expid_d1_m1_1_s3') + d1_m1_2_s3 = self.job_list.get_job_by_name('expid_d1_m1_2_s3') + d1_m1_3_s3 = self.job_list.get_job_by_name('expid_d1_m1_3_s3') + d1_m1_4_s3 = self.job_list.get_job_by_name('expid_d1_m1_4_s3') + d1_m2_1_s3 = self.job_list.get_job_by_name('expid_d1_m2_1_s3') + d1_m2_2_s3 = self.job_list.get_job_by_name('expid_d1_m2_2_s3') + d1_m2_3_s3 = self.job_list.get_job_by_name('expid_d1_m2_3_s3') + d1_m2_4_s3 = self.job_list.get_job_by_name('expid_d1_m2_4_s3') + + d1_s5 = self.job_list.get_job_by_name('expid_d1_s5') + + d2_m1_1_s2 = self.job_list.get_job_by_name('expid_d2_m1_1_s2') + d2_m1_2_s2 = self.job_list.get_job_by_name('expid_d2_m1_2_s2') + d2_m1_3_s2 = self.job_list.get_job_by_name('expid_d2_m1_3_s2') + d2_m1_4_s2 = self.job_list.get_job_by_name('expid_d2_m1_4_s2') + d2_m2_1_s2 = self.job_list.get_job_by_name('expid_d2_m2_1_s2') + d2_m2_2_s2 = self.job_list.get_job_by_name('expid_d2_m2_2_s2') + d2_m2_3_s2 = self.job_list.get_job_by_name('expid_d2_m2_3_s2') + d2_m2_4_s2 = self.job_list.get_job_by_name('expid_d2_m2_4_s2') + + d2_m1_1_s3 = self.job_list.get_job_by_name('expid_d2_m1_1_s3') + d2_m1_2_s3 = self.job_list.get_job_by_name('expid_d2_m1_2_s3') + d2_m1_3_s3 = self.job_list.get_job_by_name('expid_d2_m1_3_s3') + d2_m1_4_s3 = self.job_list.get_job_by_name('expid_d2_m1_4_s3') + d2_m2_1_s3 = self.job_list.get_job_by_name('expid_d2_m2_1_s3') + d2_m2_2_s3 = self.job_list.get_job_by_name('expid_d2_m2_2_s3') + d2_m2_3_s3 = self.job_list.get_job_by_name('expid_d2_m2_3_s3') + d2_m2_4_s3 = self.job_list.get_job_by_name('expid_d2_m2_4_s3') + + d2_s5 = self.job_list.get_job_by_name('expid_d2_s5') + + ordered_jobs_by_date_member = dict() + ordered_jobs_by_date_member["d1"] = dict() + ordered_jobs_by_date_member["d1"]["m1"] = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3, d1_m1_3_s2, + d1_m1_3_s3, d1_m1_4_s2, d1_m1_4_s3] + + ordered_jobs_by_date_member["d1"]["m2"] = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, d1_m2_2_s3, d1_m2_3_s2, + d1_m2_3_s3, d1_m2_4_s2, d1_m2_4_s3, d1_s5] + ordered_jobs_by_date_member["d2"] = dict() + ordered_jobs_by_date_member["d2"]["m1"] = [d2_m1_1_s2, d2_m1_1_s3, d2_m1_2_s2, d2_m1_2_s3, d2_m1_3_s2, + d2_m1_3_s3, d2_m1_4_s2, d2_m1_4_s3] + + ordered_jobs_by_date_member["d2"]["m2"] = [d2_m2_1_s2, d2_m2_1_s3, d2_m2_2_s2, d2_m2_2_s3, d2_m2_3_s2, + d2_m2_3_s3, d2_m2_4_s2, d2_m2_4_s3, d2_s5] + + self.assertDictEqual(self.job_list._create_sorted_dict_jobs("s2 s3 s5"), ordered_jobs_by_date_member) + + def test_ordered_dict_jobs_running_once_mixed_wrapper(self): + date_list = ["d1", "d2"] + member_list = ["m1", "m2"] + chunk_list = [1, 2, 3, 4] + + self._createDummyJobs(self.workflows['running_once'], date_list, member_list, chunk_list) + + self.parser_mock.has_option = Mock(return_value=True) + self.parser_mock.get = Mock(side_effect=["chunk", "chunk", "once"]) + self.job_list._get_date = Mock(side_effect=['d2', 'd1', 'd2']) + + d1_m1_1_s2 = self.job_list.get_job_by_name('expid_d1_m1_1_s2') + d1_m1_2_s2 = self.job_list.get_job_by_name('expid_d1_m1_2_s2') + d1_m1_3_s2 = self.job_list.get_job_by_name('expid_d1_m1_3_s2') + d1_m1_4_s2 = self.job_list.get_job_by_name('expid_d1_m1_4_s2') + d1_m2_1_s2 = self.job_list.get_job_by_name('expid_d1_m2_1_s2') + d1_m2_2_s2 = self.job_list.get_job_by_name('expid_d1_m2_2_s2') + d1_m2_3_s2 = self.job_list.get_job_by_name('expid_d1_m2_3_s2') + d1_m2_4_s2 = self.job_list.get_job_by_name('expid_d1_m2_4_s2') + + d1_m1_1_s3 = self.job_list.get_job_by_name('expid_d1_m1_1_s3') + d1_m1_2_s3 = self.job_list.get_job_by_name('expid_d1_m1_2_s3') + d1_m1_3_s3 = self.job_list.get_job_by_name('expid_d1_m1_3_s3') + d1_m1_4_s3 = self.job_list.get_job_by_name('expid_d1_m1_4_s3') + d1_m2_1_s3 = self.job_list.get_job_by_name('expid_d1_m2_1_s3') + d1_m2_2_s3 = self.job_list.get_job_by_name('expid_d1_m2_2_s3') + d1_m2_3_s3 = self.job_list.get_job_by_name('expid_d1_m2_3_s3') + d1_m2_4_s3 = self.job_list.get_job_by_name('expid_d1_m2_4_s3') + + d2_m1_1_s2 = self.job_list.get_job_by_name('expid_d2_m1_1_s2') + d2_m1_2_s2 = self.job_list.get_job_by_name('expid_d2_m1_2_s2') + d2_m1_3_s2 = self.job_list.get_job_by_name('expid_d2_m1_3_s2') + d2_m1_4_s2 = self.job_list.get_job_by_name('expid_d2_m1_4_s2') + d2_m2_1_s2 = self.job_list.get_job_by_name('expid_d2_m2_1_s2') + d2_m2_2_s2 = self.job_list.get_job_by_name('expid_d2_m2_2_s2') + d2_m2_3_s2 = self.job_list.get_job_by_name('expid_d2_m2_3_s2') + d2_m2_4_s2 = self.job_list.get_job_by_name('expid_d2_m2_4_s2') + + d2_m1_1_s3 = self.job_list.get_job_by_name('expid_d2_m1_1_s3') + d2_m1_2_s3 = self.job_list.get_job_by_name('expid_d2_m1_2_s3') + d2_m1_3_s3 = self.job_list.get_job_by_name('expid_d2_m1_3_s3') + d2_m1_4_s3 = self.job_list.get_job_by_name('expid_d2_m1_4_s3') + d2_m2_1_s3 = self.job_list.get_job_by_name('expid_d2_m2_1_s3') + d2_m2_2_s3 = self.job_list.get_job_by_name('expid_d2_m2_2_s3') + d2_m2_3_s3 = self.job_list.get_job_by_name('expid_d2_m2_3_s3') + d2_m2_4_s3 = self.job_list.get_job_by_name('expid_d2_m2_4_s3') + + s5 = self.job_list.get_job_by_name('expid_s5') + + ordered_jobs_by_date_member = dict() + ordered_jobs_by_date_member["d1"] = dict() + ordered_jobs_by_date_member["d1"]["m1"] = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3, d1_m1_3_s2, + d1_m1_3_s3, d1_m1_4_s2, d1_m1_4_s3] + + ordered_jobs_by_date_member["d1"]["m2"] = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, d1_m2_2_s3, d1_m2_3_s2, + d1_m2_3_s3, d1_m2_4_s2, d1_m2_4_s3] + ordered_jobs_by_date_member["d2"] = dict() + ordered_jobs_by_date_member["d2"]["m1"] = [d2_m1_1_s2, d2_m1_1_s3, d2_m1_2_s2, d2_m1_2_s3, d2_m1_3_s2, + d2_m1_3_s3, d2_m1_4_s2, d2_m1_4_s3] + + ordered_jobs_by_date_member["d2"]["m2"] = [d2_m2_1_s2, d2_m2_1_s3, d2_m2_2_s2, d2_m2_2_s3, d2_m2_3_s2, + d2_m2_3_s3, d2_m2_4_s2, d2_m2_4_s3, s5] + + self.assertDictEqual(self.job_list._create_sorted_dict_jobs("s2 s3 s5"), ordered_jobs_by_date_member) + + def test_ordered_dict_jobs_synchronize_date_mixed_wrapper(self): + date_list = ["d1", "d2"] + member_list = ["m1", "m2"] + chunk_list = [1, 2, 3, 4] + + self._createDummyJobs(self.workflows['synchronize_date'], date_list, member_list, chunk_list) + + self.parser_mock.has_option = Mock(return_value=True) + self.parser_mock.get = Mock(return_value="chunk") + self.job_list._get_date = Mock(side_effect=['d2', 'd2', 'd2', 'd2', 'd1', 'd2']) + + d1_m1_1_s2 = self.job_list.get_job_by_name('expid_d1_m1_1_s2') + d1_m1_2_s2 = self.job_list.get_job_by_name('expid_d1_m1_2_s2') + d1_m1_3_s2 = self.job_list.get_job_by_name('expid_d1_m1_3_s2') + d1_m1_4_s2 = self.job_list.get_job_by_name('expid_d1_m1_4_s2') + d1_m2_1_s2 = self.job_list.get_job_by_name('expid_d1_m2_1_s2') + d1_m2_2_s2 = self.job_list.get_job_by_name('expid_d1_m2_2_s2') + d1_m2_3_s2 = self.job_list.get_job_by_name('expid_d1_m2_3_s2') + d1_m2_4_s2 = self.job_list.get_job_by_name('expid_d1_m2_4_s2') + + d1_m1_1_s3 = self.job_list.get_job_by_name('expid_d1_m1_1_s3') + d1_m1_2_s3 = self.job_list.get_job_by_name('expid_d1_m1_2_s3') + d1_m1_3_s3 = self.job_list.get_job_by_name('expid_d1_m1_3_s3') + d1_m1_4_s3 = self.job_list.get_job_by_name('expid_d1_m1_4_s3') + d1_m2_1_s3 = self.job_list.get_job_by_name('expid_d1_m2_1_s3') + d1_m2_2_s3 = self.job_list.get_job_by_name('expid_d1_m2_2_s3') + d1_m2_3_s3 = self.job_list.get_job_by_name('expid_d1_m2_3_s3') + d1_m2_4_s3 = self.job_list.get_job_by_name('expid_d1_m2_4_s3') + + d2_m1_1_s2 = self.job_list.get_job_by_name('expid_d2_m1_1_s2') + d2_m1_2_s2 = self.job_list.get_job_by_name('expid_d2_m1_2_s2') + d2_m1_3_s2 = self.job_list.get_job_by_name('expid_d2_m1_3_s2') + d2_m1_4_s2 = self.job_list.get_job_by_name('expid_d2_m1_4_s2') + d2_m2_1_s2 = self.job_list.get_job_by_name('expid_d2_m2_1_s2') + d2_m2_2_s2 = self.job_list.get_job_by_name('expid_d2_m2_2_s2') + d2_m2_3_s2 = self.job_list.get_job_by_name('expid_d2_m2_3_s2') + d2_m2_4_s2 = self.job_list.get_job_by_name('expid_d2_m2_4_s2') + + d2_m1_1_s3 = self.job_list.get_job_by_name('expid_d2_m1_1_s3') + d2_m1_2_s3 = self.job_list.get_job_by_name('expid_d2_m1_2_s3') + d2_m1_3_s3 = self.job_list.get_job_by_name('expid_d2_m1_3_s3') + d2_m1_4_s3 = self.job_list.get_job_by_name('expid_d2_m1_4_s3') + d2_m2_1_s3 = self.job_list.get_job_by_name('expid_d2_m2_1_s3') + d2_m2_2_s3 = self.job_list.get_job_by_name('expid_d2_m2_2_s3') + d2_m2_3_s3 = self.job_list.get_job_by_name('expid_d2_m2_3_s3') + d2_m2_4_s3 = self.job_list.get_job_by_name('expid_d2_m2_4_s3') + + _1_s5 = self.job_list.get_job_by_name('expid_1_s5') + _2_s5 = self.job_list.get_job_by_name('expid_2_s5') + _3_s5 = self.job_list.get_job_by_name('expid_3_s5') + _4_s5 = self.job_list.get_job_by_name('expid_4_s5') + + ordered_jobs_by_date_member = dict() + ordered_jobs_by_date_member["d1"] = dict() + ordered_jobs_by_date_member["d1"]["m1"] = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3, d1_m1_3_s2, + d1_m1_3_s3, d1_m1_4_s2, d1_m1_4_s3] + + ordered_jobs_by_date_member["d1"]["m2"] = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, d1_m2_2_s3, d1_m2_3_s2, + d1_m2_3_s3, d1_m2_4_s2, d1_m2_4_s3] + ordered_jobs_by_date_member["d2"] = dict() + ordered_jobs_by_date_member["d2"]["m1"] = [d2_m1_1_s2, d2_m1_1_s3, d2_m1_2_s2, d2_m1_2_s3, d2_m1_3_s2, + d2_m1_3_s3, d2_m1_4_s2, d2_m1_4_s3] + + ordered_jobs_by_date_member["d2"]["m2"] = [d2_m2_1_s2, d2_m2_1_s3, _1_s5, d2_m2_2_s2, d2_m2_2_s3, _2_s5, d2_m2_3_s2, + d2_m2_3_s3, _3_s5, d2_m2_4_s2, d2_m2_4_s3, _4_s5] + + self.assertDictEqual(self.job_list._create_sorted_dict_jobs("s2 s3 s5"), ordered_jobs_by_date_member) + + def test_ordered_dict_jobs_synchronize_member_mixed_wrapper(self): + date_list = ["d1", "d2"] + member_list = ["m1", "m2"] + chunk_list = [1, 2, 3, 4] + + self._createDummyJobs(self.workflows['synchronize_member'], date_list, member_list, chunk_list) + + self.parser_mock.has_option = Mock(return_value=True) + self.parser_mock.get = Mock(return_value="chunk") + self.job_list._get_date = Mock(side_effect=['d1', 'd2']) + + d1_m1_1_s2 = self.job_list.get_job_by_name('expid_d1_m1_1_s2') + d1_m1_2_s2 = self.job_list.get_job_by_name('expid_d1_m1_2_s2') + d1_m1_3_s2 = self.job_list.get_job_by_name('expid_d1_m1_3_s2') + d1_m1_4_s2 = self.job_list.get_job_by_name('expid_d1_m1_4_s2') + d1_m2_1_s2 = self.job_list.get_job_by_name('expid_d1_m2_1_s2') + d1_m2_2_s2 = self.job_list.get_job_by_name('expid_d1_m2_2_s2') + d1_m2_3_s2 = self.job_list.get_job_by_name('expid_d1_m2_3_s2') + d1_m2_4_s2 = self.job_list.get_job_by_name('expid_d1_m2_4_s2') + + d1_m1_1_s3 = self.job_list.get_job_by_name('expid_d1_m1_1_s3') + d1_m1_2_s3 = self.job_list.get_job_by_name('expid_d1_m1_2_s3') + d1_m1_3_s3 = self.job_list.get_job_by_name('expid_d1_m1_3_s3') + d1_m1_4_s3 = self.job_list.get_job_by_name('expid_d1_m1_4_s3') + d1_m2_1_s3 = self.job_list.get_job_by_name('expid_d1_m2_1_s3') + d1_m2_2_s3 = self.job_list.get_job_by_name('expid_d1_m2_2_s3') + d1_m2_3_s3 = self.job_list.get_job_by_name('expid_d1_m2_3_s3') + d1_m2_4_s3 = self.job_list.get_job_by_name('expid_d1_m2_4_s3') + + d2_m1_1_s2 = self.job_list.get_job_by_name('expid_d2_m1_1_s2') + d2_m1_2_s2 = self.job_list.get_job_by_name('expid_d2_m1_2_s2') + d2_m1_3_s2 = self.job_list.get_job_by_name('expid_d2_m1_3_s2') + d2_m1_4_s2 = self.job_list.get_job_by_name('expid_d2_m1_4_s2') + d2_m2_1_s2 = self.job_list.get_job_by_name('expid_d2_m2_1_s2') + d2_m2_2_s2 = self.job_list.get_job_by_name('expid_d2_m2_2_s2') + d2_m2_3_s2 = self.job_list.get_job_by_name('expid_d2_m2_3_s2') + d2_m2_4_s2 = self.job_list.get_job_by_name('expid_d2_m2_4_s2') + + d2_m1_1_s3 = self.job_list.get_job_by_name('expid_d2_m1_1_s3') + d2_m1_2_s3 = self.job_list.get_job_by_name('expid_d2_m1_2_s3') + d2_m1_3_s3 = self.job_list.get_job_by_name('expid_d2_m1_3_s3') + d2_m1_4_s3 = self.job_list.get_job_by_name('expid_d2_m1_4_s3') + d2_m2_1_s3 = self.job_list.get_job_by_name('expid_d2_m2_1_s3') + d2_m2_2_s3 = self.job_list.get_job_by_name('expid_d2_m2_2_s3') + d2_m2_3_s3 = self.job_list.get_job_by_name('expid_d2_m2_3_s3') + d2_m2_4_s3 = self.job_list.get_job_by_name('expid_d2_m2_4_s3') + + d1_1_s5 = self.job_list.get_job_by_name('expid_d1_1_s5') + d1_2_s5 = self.job_list.get_job_by_name('expid_d1_2_s5') + d1_3_s5 = self.job_list.get_job_by_name('expid_d1_3_s5') + d1_4_s5 = self.job_list.get_job_by_name('expid_d1_4_s5') + + d2_1_s5 = self.job_list.get_job_by_name('expid_d2_1_s5') + d2_2_s5 = self.job_list.get_job_by_name('expid_d2_2_s5') + d2_3_s5 = self.job_list.get_job_by_name('expid_d2_3_s5') + d2_4_s5 = self.job_list.get_job_by_name('expid_d2_4_s5') + + ordered_jobs_by_date_member = dict() + ordered_jobs_by_date_member["d1"] = dict() + ordered_jobs_by_date_member["d1"]["m1"] = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3, d1_m1_3_s2, + d1_m1_3_s3, d1_m1_4_s2, d1_m1_4_s3] + + ordered_jobs_by_date_member["d1"]["m2"] = [d1_m2_1_s2, d1_m2_1_s3, d1_1_s5, d1_m2_2_s2, d1_m2_2_s3, d1_2_s5, d1_m2_3_s2, + d1_m2_3_s3, d1_3_s5, d1_m2_4_s2, d1_m2_4_s3, d1_4_s5] + ordered_jobs_by_date_member["d2"] = dict() + ordered_jobs_by_date_member["d2"]["m1"] = [d2_m1_1_s2, d2_m1_1_s3, d2_m1_2_s2, d2_m1_2_s3, d2_m1_3_s2, + d2_m1_3_s3, d2_m1_4_s2, d2_m1_4_s3] + + ordered_jobs_by_date_member["d2"]["m2"] = [d2_m2_1_s2, d2_m2_1_s3, d2_1_s5, d2_m2_2_s2, d2_m2_2_s3, d2_2_s5, d2_m2_3_s2, + d2_m2_3_s3, d2_3_s5, d2_m2_4_s2, d2_m2_4_s3, d2_4_s5] + + self.assertDictEqual(self.job_list._create_sorted_dict_jobs("s2 s3 s5"), ordered_jobs_by_date_member) + + def _createDummyJobs(self, sections_dict, date_list, member_list, chunk_list): + for section, section_dict in sections_dict.get('sections').items(): + running = section_dict['RUNNING'] + wallclock = section_dict['WALLCLOCK'] + + if running == 'once': + name = 'expid_' + section + job = self._createDummyJob(name, wallclock, section) + self.job_list._job_list.append(job) + elif running == 'date': + for date in date_list: + name = 'expid_' + date + "_" + section + job = self._createDummyJob(name, wallclock, section, date) + self.job_list._job_list.append(job) + elif running == 'member': + for date in date_list: + for member in member_list: + name = 'expid_' + date + "_" + member + "_" + section + job = self._createDummyJob(name, wallclock, section, date, member) + self.job_list._job_list.append(job) + elif running == 'chunk': + synchronize_type = section_dict['SYNCHRONIZE'] if 'SYNCHRONIZE' in section_dict else None + if synchronize_type == 'date': + for chunk in chunk_list: + name = 'expid_' + str(chunk) + "_" + section + job = self._createDummyJob(name, wallclock, section, None, None, chunk) + self.job_list._job_list.append(job) + elif synchronize_type == 'member': + for date in date_list: + for chunk in chunk_list: + name = 'expid_' + date + "_" + str(chunk) + "_" + section + job = self._createDummyJob(name, wallclock, section, date, None, chunk) + self.job_list._job_list.append(job) + else: + for date in date_list: + for member in member_list: + for chunk in chunk_list: + name = 'expid_' + date + "_" + member + "_" + str(chunk) + "_" + section + job = self._createDummyJob(name, wallclock, section, date, member, chunk) + self.job_list._job_list.append(job) + + self.job_list._date_list = date_list + self.job_list._member_list = member_list + self.job_list._chunk_list = chunk_list + + self.job_list._dic_jobs = DicJobs(self.job_list, self.parser_mock, date_list, member_list, chunk_list, "", 0) + self._manage_dependencies(sections_dict) + + def _manage_dependencies(self, sections_dict): + for job in self.job_list.get_job_list(): + section = job.section + dependencies = sections_dict['sections'][section]['DEPENDENCIES'] if 'DEPENDENCIES' in sections_dict['sections'][section] else '' + self._manage_job_dependencies(job, dependencies, sections_dict) + + def _manage_job_dependencies(self, job, dependencies, sections_dict): + for key in dependencies.split(): + if '-' not in key: + dependency = Dependency(key) + else: + sign = '-' if '-' in key else '+' + key_split = key.split(sign) + section = key_split[0] + distance = key_split[1] + dependency_running_type = sections_dict['sections'][section]['RUNNING'] + dependency = Dependency(section, int(distance), dependency_running_type, sign) + + skip, (chunk, member, date) = self.job_list._calculate_dependency_metadata(job.chunk, self.job_list.get_chunk_list(), + job.member, self.job_list.get_member_list(), + job.date, self.job_list.get_date_list(), + dependency) + if skip: + continue + + for parent in self._filter_jobs(dependency.section, date, member, chunk): + job.add_parent(parent) + + def _filter_jobs(self, section, date=None, member=None, chunk=None): + # TODO: improve the efficiency + jobs = filter(lambda job: job.section == section and job.date == date and job.member == member and job.chunk == chunk, + self.job_list.get_job_list()) + return jobs + + def _createDummyJob(self, name, total_wallclock, section, date=None, member=None, chunk=None): + job_id = randrange(1, 999) + job = Job(name, job_id, Status.WAITING, 0) + job.type = randrange(0, 2) + job.packed = False + job.wallclock = total_wallclock + job.platform = self.platform + + job.date = date + job.member = member + job.chunk = chunk + job.section = section + + return job + +class FakeBasicConfig: + def __init__(self): + pass + + DB_DIR = '/dummy/db/dir' + DB_FILE = '/dummy/db/file' + DB_PATH = '/dummy/db/path' + LOCAL_ROOT_DIR = '/dummy/local/root/dir' + LOCAL_TMP_DIR = '/dummy/local/temp/dir' + LOCAL_PROJ_DIR = '/dummy/local/proj/dir' + DEFAULT_PLATFORMS_CONF = '' + DEFAULT_JOBS_CONF = '' \ No newline at end of file