diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 4de01df598da2b7f7b9136ad1271715fb83699e2..a459d2d914d6a24bf8830e61cbb5b306340abc8a 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -21,33 +21,33 @@ Main module for Autosubmit. Only contains an interface class to all functionality implemented on Autosubmit """ -import os -import re -import time -import json -import datetime -import textwrap from collections import OrderedDict -import copy +import copy +import datetime +import json import locale +import os +import re +import textwrap +import time +from bscearth.utils.date import date2str, parse_date, previous_day, chunk_end_date, chunk_start_date, Log, subs_dates +from functools import reduce +from threading import Thread +from time import sleep +from typing import List, Union -from autosubmitconfigparser.config.configcommon import AutosubmitConfig -from autosubmit.job.job_common import Status, Type, increase_wallclock_by_chunk +from autosubmit.helpers.parameters import autosubmit_parameter, autosubmit_parameters +from autosubmit.history.experiment_history import ExperimentHistory from autosubmit.job.job_common import StatisticsSnippetBash, StatisticsSnippetPython from autosubmit.job.job_common import StatisticsSnippetR, StatisticsSnippetEmpty +from autosubmit.job.job_common import Status, Type, increase_wallclock_by_chunk from autosubmit.job.job_utils import get_job_package_code -from autosubmitconfigparser.config.basicconfig import BasicConfig -from autosubmit.history.experiment_history import ExperimentHistory -from bscearth.utils.date import date2str, parse_date, previous_day, chunk_end_date, chunk_start_date, Log, subs_dates -from time import sleep -from threading import Thread from autosubmit.platforms.paramiko_submitter import ParamikoSubmitter -from log.log import Log, AutosubmitCritical, AutosubmitError -from typing import List, Union -from functools import reduce +from autosubmitconfigparser.config.basicconfig import BasicConfig +from autosubmitconfigparser.config.configcommon import AutosubmitConfig from autosubmitconfigparser.config.yamlparser import YAMLParserFactory -from autosubmit.helpers.parameters import autosubmit_parameter, autosubmit_parameters +from log.log import Log, AutosubmitCritical, AutosubmitError Log.get_logger("Autosubmit") @@ -221,6 +221,9 @@ class Job(object): self.total_jobs = None self.max_waiting_jobs = None self.exclusive = "" + # internal + self.current_checkpoint_step = 0 + self.max_checkpoint_step = 0 @property @autosubmit_parameter(name='tasktype') @@ -252,6 +255,23 @@ class Job(object): def fail_count(self, value): self._fail_count = value + @property + @autosubmit_parameter(name='checkpoint') + def checkpoint(self): + '''Generates a checkpoint step for this job based on job.type.''' + if self.type == Type.PYTHON: + return "checkpoint()" + elif self.type == Type.R: + return "checkpoint()" + else: # bash + return "as_checkpoint" + + def get_checkpoint_files(self): + """ + Check if there is a file on the remote host that contains the checkpoint + """ + return self.platform.get_checkpoint_files(self) + @property @autosubmit_parameter(name='sdate') def sdate(self): @@ -570,6 +590,7 @@ class Job(object): :type value: HPCPlatform """ self._partition = value + @property def children(self): """ @@ -690,20 +711,20 @@ class Job(object): """ self.children.add(new_child) - def add_edge_info(self,parent_name, special_variables): + def add_edge_info(self, parent, special_variables): """ Adds edge information to the job - :param 
parent_name: parent name
-        :type parent_name: str
+        :param parent: parent job
+        :type parent: Job
         :param special_variables: special variables
         :type special_variables: dict
         """
-        if parent_name not in self.edge_info:
-            self.edge_info[parent_name] = special_variables
-        else:
-            self.edge_info[parent_name].update(special_variables)
-        pass
+        if special_variables["STATUS"] not in self.edge_info:
+            self.edge_info[special_variables["STATUS"]] = {}
+
+        self.edge_info[special_variables["STATUS"]][parent.name] = (parent,special_variables.get("FROM_STEP", 0))
+
     def delete_parent(self, parent):
         """
         Remove a parent from the job
@@ -1346,6 +1367,8 @@ class Job(object):
         return parameters

     def update_job_parameters(self,as_conf, parameters):
+        if self.checkpoint:  # To activate placeholder substitution in the template
+            parameters["AS_CHECKPOINT"] = self.checkpoint
         parameters['JOBNAME'] = self.name
         parameters['FAIL_COUNT'] = str(self.fail_count)
         parameters['SDATE'] = self.sdate
@@ -1427,6 +1450,8 @@ class Job(object):
         parameters['EXPORT'] = self.export
         parameters['PROJECT_TYPE'] = as_conf.get_project_type()
         self.wchunkinc = as_conf.get_wchunkinc(self.section)
+        for key,value in as_conf.jobs_data[self.section].items():
+            parameters["CURRENT_"+key.upper()] = value
         return parameters

     def update_parameters(self, as_conf, parameters,
@@ -1613,8 +1638,10 @@ class Job(object):
         except:
             pass
         for key, value in parameters.items():
+            # parameters[key] can have '\\' characters that are interpreted as escape characters
+            # by re.sub. To avoid this, we use re.escape
             template_content = re.sub(
-                '%(?<!%%)' + key + '%(?!%%)', str(parameters[key]), template_content, flags=re.I)
+                '%(?<!%%)' + key + '%(?!%%)', re.escape(str(parameters[key])), template_content, flags=re.I)
diff --git a/autosubmit/job/job_common.py b/autosubmit/job/job_common.py
--- a/autosubmit/job/job_common.py
+++ b/autosubmit/job/job_common.py
 # along with Autosubmit. If not, see <http://www.gnu.org/licenses/>.
-import textwrap
 import datetime
+import textwrap


 class Status:
@@ -41,6 +40,7 @@ class Status:
     # Note: any change on constants must be applied on the dict below!!!
     VALUE_TO_KEY = {-3: 'SUSPENDED', -2: 'UNKNOWN', -1: 'FAILED', 0: 'WAITING', 1: 'READY', 2: 'SUBMITTED', 3: 'QUEUING', 4: 'RUNNING', 5: 'COMPLETED', 6: 'HELD', 7: 'PREPARED', 8: 'SKIPPED', 9: 'DELAYED'}
+    LOGICAL_ORDER = ["WAITING", "DELAYED", "PREPARED", "READY", "SUBMITTED", "HELD", "QUEUING", "RUNNING", "SKIPPED", "FAILED", "UNKNOWN", "COMPLETED", "SUSPENDED"]

     def retval(self, value):
         return getattr(self, value)
@@ -128,6 +128,16 @@ class StatisticsSnippetBash:
             job_name_ptrn='%CURRENT_LOGDIR%/%JOBNAME%'
             echo $(date +%s) > ${job_name_ptrn}_STAT
+            ###################
+            # AS CHECKPOINT FUNCTION
+            ###################
+            # Creates a new checkpoint file upon call based on the current number of calls to the function
+
+            AS_CHECKPOINT_CALLS=0
+            function as_checkpoint {
+                AS_CHECKPOINT_CALLS=$((AS_CHECKPOINT_CALLS+1))
+                touch ${job_name_ptrn}_CHECKPOINT_${AS_CHECKPOINT_CALLS}
+            }
             ###################
             # Autosubmit job
             ###################
@@ -190,11 +200,20 @@ class StatisticsSnippetPython:
             stat_file = open(job_name_ptrn + '_STAT', 'w')
             stat_file.write('{0:.0f}\\n'.format(time.time()))
             stat_file.close()
-
-
+            ###################
+            # Autosubmit Checkpoint
+            ###################
+            # Creates a new checkpoint file upon call based on the current number of calls to the function
+            AS_CHECKPOINT_CALLS = 0
+            def as_checkpoint():
+                global AS_CHECKPOINT_CALLS
+                global job_name_ptrn
+                AS_CHECKPOINT_CALLS = AS_CHECKPOINT_CALLS + 1
+                open(job_name_ptrn + '_CHECKPOINT_' + str(AS_CHECKPOINT_CALLS), 'w').close()
             ###################
             # Autosubmit job
             ###################
+
             """)
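The three header snippets (Bash, Python, R) expose one common primitive. As a minimal sketch, assuming a Python job template, the body below shows the intended call pattern; `run_model_step` and the step count are invented for illustration and are not part of this patch:

# Hypothetical template body; the StatisticsSnippetPython header above has
# already defined as_checkpoint() and job_name_ptrn by the time this runs.
for step in range(1, 4):
    run_model_step(step)  # invented placeholder for one unit of real work
    as_checkpoint()       # touches <job_name>_CHECKPOINT_<step> beside the STAT file

Each numbered `_CHECKPOINT_<n>` file is what `Job.get_checkpoint_files()` polls through the platform, and is presumably what lets a dependency declared with `FROM_STEP: 2` be satisfied as soon as the second checkpoint appears, rather than at job completion.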
@@ -254,7 +273,16 @@ class StatisticsSnippetR:
             fileConn<-file(paste(job_name_ptrn,"_STAT", sep = ''),"w")
             writeLines(toString(trunc(as.numeric(Sys.time()))), fileConn)
             close(fileConn)
-
+            ###################
+            # Autosubmit Checkpoint
+            ###################
+            # Creates a new checkpoint file upon call based on the current number of calls to the function
+            AS_CHECKPOINT_CALLS = 0
+            as_checkpoint <- function() {
+                AS_CHECKPOINT_CALLS <<- AS_CHECKPOINT_CALLS + 1
+                fileConn<-file(paste(job_name_ptrn,"_CHECKPOINT_",AS_CHECKPOINT_CALLS, sep = ''),"w")
+                close(fileConn)
+            }
             ###################
             # Autosubmit job
             ###################
diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py
index edd67d1c5ff9d1d3461f6420840614069352ee51..2304cc0969d4b3360c9f979c4f191b26d60febe8 100644
--- a/autosubmit/job/job_list.py
+++ b/autosubmit/job/job_list.py
@@ -13,37 +13,36 @@
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
-
 # You should have received a copy of the GNU General Public License
 # along with Autosubmit. If not, see <http://www.gnu.org/licenses/>.
-import collections
 import copy
-import re
+import datetime
+import math
 import os
 import pickle
+import re
 import traceback
-import math
-import copy
-from time import localtime, strftime, mktime
+from bscearth.utils.date import date2str, parse_date
+from networkx import DiGraph
 from shutil import move
+from threading import Thread
+from time import localtime, strftime, mktime
+from typing import List, Dict
+
+import autosubmit.database.db_structure as DbStructure
+from autosubmit.helpers.data_transfer import JobRow
 from autosubmit.job.job import Job
-from autosubmit.job.job_package_persistence import JobPackagePersistence
+from autosubmit.job.job_common import Status, bcolors
 from autosubmit.job.job_dict import DicJobs
+from autosubmit.job.job_package_persistence import JobPackagePersistence
 from autosubmit.job.job_packages import JobPackageThread
 from autosubmit.job.job_utils import Dependency
-from autosubmit.job.job_common import Status, bcolors
-from bscearth.utils.date import date2str, parse_date
-import autosubmit.database.db_structure as DbStructure
-import datetime
-from networkx import DiGraph
 from autosubmit.job.job_utils import transitive_reduction
-from log.log import AutosubmitCritical, AutosubmitError, Log
-from threading import Thread
 from autosubmitconfigparser.config.basicconfig import BasicConfig
 from autosubmitconfigparser.config.configcommon import AutosubmitConfig
-from autosubmit.helpers.data_transfer import JobRow
-from typing import List, Dict
-import log.fd_show
+from log.log import AutosubmitCritical, AutosubmitError, Log
+
+
 # Log.get_logger("Log.Autosubmit")
@@ -53,6 +52,7 @@ def threaded(fn):
         thread.name = "data_processing"
         thread.start()
         return thread
+
     return wrapper
@@ -62,7 +62,7 @@ class JobList(object):

     """

-    def __init__(self, expid, config, parser_factory, job_list_persistence,as_conf):
+    def __init__(self, expid, config, parser_factory, job_list_persistence, as_conf):
         self._persistence_path = os.path.join(
             config.LOCAL_ROOT_DIR, expid, "pkl")
         self._update_file = "updated_list_" + expid + ".txt"
@@ -70,6 +70,7 @@ class JobList(object):
         self._persistence_file = "job_list_" + expid
         self._job_list = list()
         self._base_job_list = list()
+        self.jobs_edges = {}
         self._expid = expid
         self._config = config
         self.experiment_data = as_conf.experiment_data
@@ -92,6 +93,7 @@ class JobList(object):
         self._run_members = None
         self.jobs_to_run_first = list()
         self.rerun_job_list = list()
+
     @property
     def expid(self):
         """
@@ -126,46 +128,56 @@ class JobList(object):

     @run_members.setter
     def run_members(self, value):
-        if value is not None
and len(str(value)) > 0 : + if value is not None and len(str(value)) > 0: self._run_members = value - self._base_job_list = [job for job in self._job_list] + self._base_job_list = [job for job in self._job_list] found_member = False processed_job_list = [] - for job in self._job_list: # We are assuming that the jobs are sorted in topological order (which is the default) - if (job.member is None and found_member is False) or job.member in self._run_members or job.status not in [Status.WAITING, Status.READY]: + for job in self._job_list: # We are assuming that the jobs are sorted in topological order (which is the default) + if ( + job.member is None and not found_member) or job.member in self._run_members or job.status not in [ + Status.WAITING, Status.READY]: processed_job_list.append(job) if job.member is not None and len(str(job.member)) > 0: found_member = True - self._job_list = processed_job_list + self._job_list = processed_job_list # Old implementation that also considered children of the members. # self._job_list = [job for job in old_job_list if len( # job.parents) == 0 or len(set(old_job_list_names).intersection(set([jobp.name for jobp in job.parents]))) == len(job.parents)] - def create_dictionary(self, date_list, member_list, num_chunks, chunk_ini, date_format, default_retrials, wrapper_jobs): + def create_dictionary(self, date_list, member_list, num_chunks, chunk_ini, date_format, default_retrials, + wrapper_jobs): chunk_list = list(range(chunk_ini, num_chunks + 1)) jobs_parser = self._get_jobs_parser() dic_jobs = DicJobs(self, date_list, member_list, - chunk_list, date_format, default_retrials,jobs_data={},experiment_data=self.experiment_data) + chunk_list, date_format, default_retrials, jobs_data={}, + experiment_data=self.experiment_data) self._dic_jobs = dic_jobs for wrapper_section in wrapper_jobs: if str(wrapper_jobs[wrapper_section]).lower() != 'none': - self._ordered_jobs_by_date_member[wrapper_section] = self._create_sorted_dict_jobs(wrapper_jobs[wrapper_section]) + self._ordered_jobs_by_date_member[wrapper_section] = self._create_sorted_dict_jobs( + wrapper_jobs[wrapper_section]) else: self._ordered_jobs_by_date_member[wrapper_section] = {} pass + def _delete_edgeless_jobs(self): jobs_to_delete = [] # indices to delete for i, job in enumerate(self._job_list): if job.dependencies is not None: - if ( ( len(job.dependencies) > 0 and not job.has_parents()) and not job.has_children()) and job.delete_when_edgeless in ["true",True,1]: + if (( + len(job.dependencies) > 0 and not job.has_parents()) and not job.has_children()) and job.delete_when_edgeless in [ + "true", True, 1]: jobs_to_delete.append(job) # delete jobs by indices for i in jobs_to_delete: self._job_list.remove(i) + def generate(self, date_list, member_list, num_chunks, chunk_ini, parameters, date_format, default_retrials, - default_job_type, wrapper_type=None, wrapper_jobs=dict(), new=True, notransitive=False, update_structure=False, run_only_members=[],show_log=True,jobs_data={},as_conf=""): + default_job_type, wrapper_type=None, wrapper_jobs=dict(), new=True, notransitive=False, + update_structure=False, run_only_members=[], show_log=True, jobs_data={}, as_conf=""): """ Creates all jobs needed for the current workflow @@ -203,8 +215,8 @@ class JobList(object): chunk_list = list(range(chunk_ini, num_chunks + 1)) self._chunk_list = chunk_list - - dic_jobs = DicJobs(self,date_list, member_list,chunk_list, date_format, default_retrials,jobs_data,experiment_data=self.experiment_data) + dic_jobs = DicJobs(self, 
date_list, member_list, chunk_list, date_format, default_retrials, jobs_data, + experiment_data=self.experiment_data) self._dic_jobs = dic_jobs priority = 0 if show_log: @@ -220,15 +232,15 @@ class JobList(object): except Exception as e: pass Log.info("Deleting previous pkl due being incompatible with current AS version") - if os.path.exists(os.path.join(self._persistence_path, self._persistence_file+".pkl")): - os.remove(os.path.join(self._persistence_path, self._persistence_file+".pkl")) - if os.path.exists(os.path.join(self._persistence_path, self._persistence_file+"_backup.pkl")): - os.remove(os.path.join(self._persistence_path, self._persistence_file+"_backup.pkl")) + if os.path.exists(os.path.join(self._persistence_path, self._persistence_file + ".pkl")): + os.remove(os.path.join(self._persistence_path, self._persistence_file + ".pkl")) + if os.path.exists(os.path.join(self._persistence_path, self._persistence_file + "_backup.pkl")): + os.remove(os.path.join(self._persistence_path, self._persistence_file + "_backup.pkl")) - self._create_jobs(dic_jobs, priority,default_job_type, jobs_data) + self._create_jobs(dic_jobs, priority, default_job_type, jobs_data) if show_log: Log.info("Adding dependencies...") - self._add_dependencies(date_list, member_list,chunk_list, dic_jobs, self.graph) + self._add_dependencies(date_list, member_list, chunk_list, dic_jobs, self.graph) if show_log: Log.info("Removing redundant dependencies...") @@ -236,7 +248,7 @@ class JobList(object): new, notransitive, update_structure=update_structure) for job in self._job_list: job.parameters = parameters - job_data = jobs_data.get(job.name,"none") + job_data = jobs_data.get(job.name, "none") try: if job_data != "none": job.wrapper_type = job_data[12] @@ -253,7 +265,9 @@ class JobList(object): str(run_only_members))) old_job_list = [job for job in self._job_list] self._job_list = [ - job for job in old_job_list if job.member is None or job.member in run_only_members or job.status not in [Status.WAITING, Status.READY]] + job for job in old_job_list if + job.member is None or job.member in run_only_members or job.status not in [Status.WAITING, + Status.READY]] for job in self._job_list: for jobp in job.parents: if jobp in self._job_list: @@ -267,23 +281,24 @@ class JobList(object): for wrapper_section in wrapper_jobs: try: if wrapper_jobs[wrapper_section] is not None and len(str(wrapper_jobs[wrapper_section])) > 0: - self._ordered_jobs_by_date_member[wrapper_section] = self._create_sorted_dict_jobs(wrapper_jobs[wrapper_section]) + self._ordered_jobs_by_date_member[wrapper_section] = self._create_sorted_dict_jobs( + wrapper_jobs[wrapper_section]) else: self._ordered_jobs_by_date_member[wrapper_section] = {} except BaseException as e: - raise AutosubmitCritical("Some section jobs of the wrapper:{0} are not in the current job_list defined in jobs.conf".format(wrapper_section),7014,str(e)) - + raise AutosubmitCritical( + "Some section jobs of the wrapper:{0} are not in the current job_list defined in jobs.conf".format( + wrapper_section), 7014, str(e)) - @staticmethod - def _add_dependencies(date_list, member_list, chunk_list, dic_jobs, graph, option="DEPENDENCIES"): - jobs_data = dic_jobs._jobs_data.get("JOBS",{}) + def _add_dependencies(self, date_list, member_list, chunk_list, dic_jobs, graph, option="DEPENDENCIES"): + jobs_data = dic_jobs._jobs_data.get("JOBS", {}) for job_section in jobs_data.keys(): Log.debug("Adding dependencies for {0} jobs".format(job_section)) # If it does not have dependencies, do nothing if 
not (job_section, option): continue - dependencies_keys = jobs_data[job_section].get(option,{}) + dependencies_keys = jobs_data[job_section].get(option, {}) if type(dependencies_keys) is str: if "," in dependencies_keys: dependencies_list = dependencies_keys.split(",") @@ -294,7 +309,7 @@ class JobList(object): dependencies_keys[dependency] = {} if dependencies_keys is None: dependencies_keys = {} - dependencies = JobList._manage_dependencies(dependencies_keys, dic_jobs, job_section) + dependencies = self._manage_dependencies(dependencies_keys, dic_jobs, job_section) for job in dic_jobs.get_jobs(job_section): num_jobs = 1 @@ -302,11 +317,10 @@ class JobList(object): num_jobs = len(job) for i in range(num_jobs): _job = job[i] if num_jobs > 1 else job - JobList._manage_job_dependencies(dic_jobs, _job, date_list, member_list, chunk_list, dependencies_keys, - dependencies, graph) + self._manage_job_dependencies(dic_jobs, _job, date_list, member_list, chunk_list, dependencies_keys, + dependencies, graph) pass - @staticmethod def _manage_dependencies(dependencies_keys, dic_jobs, job_section): parameters = dic_jobs._jobs_data["JOBS"] @@ -334,7 +348,7 @@ class JobList(object): distance = int(key_split[1]) if '[' in section: - #Todo check what is this because we never enter this + # Todo check what is this because we never enter this try: section_name = section[0:section.find("[")] splits_section = int( @@ -344,13 +358,13 @@ class JobList(object): section = section_name except Exception as e: pass - if parameters.get(section,None) is None: - Log.printlog("WARNING: SECTION {0} is not defined in jobs.conf".format(section)) + if parameters.get(section, None) is None: continue - #raise AutosubmitCritical("Section:{0} doesn't exists.".format(section),7014) + # raise AutosubmitCritical("Section:{0} doesn't exists.".format(section),7014) dependency_running_type = str(parameters[section].get('RUNNING', 'once')).lower() delay = int(parameters[section].get('DELAY', -1)) - dependency = Dependency(section, distance, dependency_running_type, sign, delay, splits,relationships=dependencies_keys[key]) + dependency = Dependency(section, distance, dependency_running_type, sign, delay, splits, + relationships=dependencies_keys[key]) dependencies[key] = dependency return dependencies @@ -370,193 +384,331 @@ class JobList(object): splits.append(int(str_split)) return splits + @staticmethod - def _apply_filter(parent_value,filter_value,associative_list,filter_type="dates"): + def _apply_filter(parent_value, filter_value, associative_list, level_to_check="DATES_FROM", child=None, parent=None): """ Check if the current_job_value is included in the filter_value :param parent_value: :param filter_value: filter - :param associative_list: dates, members, chunks. - :param is_chunk: True if the filter_value is a chunk. - :return: boolean - """ - to_filter = [] - # strip special chars if any - filter_value = filter_value.strip("?") - if str(parent_value).lower().find("none") != -1: + :param associative_list: dates, members, chunks, splits. + :param filter_type: dates, members, chunks, splits . + :param level_to_check: Can be dates,members, chunks, splits. 
+ :return: + """ + if "NONE".casefold() in str(parent_value).casefold(): return True - if filter_value.lower().find("all") != -1: + if parent and child and level_to_check.casefold() == "splits".casefold(): + if not parent.splits: + parent_splits = -1 + else: + parent_splits = parent.splits + if not child.splits: + child_splits = -1 + else: + child_splits = child.splits + if parent_splits == child_splits: + to_look_at_lesser = associative_list + lesser_group = -1 + lesser = str(parent_splits) + greater = str(child_splits) + lesser_value = "parent" + else: + if parent_splits > child_splits: + lesser = str(child_splits) + greater = str(parent_splits) + lesser_value = "child" + else: + lesser = str(parent_splits) + greater = str(child_splits) + lesser_value = "parent" + to_look_at_lesser = [associative_list[i:i + 1] for i in range(0, int(lesser), 1)] + for lesser_group in range(len(to_look_at_lesser)): + if lesser_value == "parent": + if str(parent_value) in to_look_at_lesser[lesser_group]: + break + else: + if str(child.split) in to_look_at_lesser[lesser_group]: + break + else: + to_look_at_lesser = associative_list + lesser_group = -1 + if "?" in filter_value: + # replace all ? for "" + filter_value = filter_value.replace("?", "") + if "*" in filter_value: + aux_filter = filter_value + filter_value = "" + for filter_ in aux_filter.split(","): + if "*" in filter_: + filter_,split_info = filter_.split("*") + if "\\" in split_info: + split_info = int(split_info.split("\\")[-1]) + else: + split_info = 1 + # split_info: if a value is 1, it means that the filter is 1-to-1, if it is 2, it means that the filter is 1-to-2, etc. + if child and parent: + if (split_info == 1 or level_to_check.casefold() != "splits".casefold()) and str(parent_value).casefold() == str(filter_).casefold(): + if child.split == parent_value: + return True + elif split_info > 1 and level_to_check.casefold() == "splits".casefold(): + # 1-to-X filter + to_look_at_greater = [associative_list[i:i + split_info] for i in + range(0, int(greater), split_info)] + if lesser_value == "parent": + if str(child.split) in to_look_at_greater[lesser_group]: + return True + else: + if str(parent_value) in to_look_at_greater[lesser_group]: + return True + else: + filter_value += filter_ + "," + else: + filter_value += filter_ + "," + filter_value = filter_value[:-1] + to_filter = JobList._parse_filters_to_check(filter_value,associative_list,level_to_check) + if to_filter is None: + return False + elif len(to_filter) == 0: + return False + elif "ALL".casefold() == str(to_filter[0]).casefold(): return True - elif filter_value.lower().find("natural") != -1: + elif "NATURAL".casefold() == str(to_filter[0]).casefold(): if parent_value is None or parent_value in associative_list: return True - elif filter_value.lower().find("none") != -1: + elif "NONE".casefold() == str(to_filter[0]).casefold(): return False - elif filter_value.find(",") != -1: - aux_filter = filter_value.split(",") - if filter_type not in ["chunks", "splits"]: - for value in aux_filter: - if str(value).isdigit(): - to_filter.append(associative_list[int(value)]) - else: - to_filter.append(value) - else: - to_filter = aux_filter - del aux_filter - elif filter_value.find(":") != -1: - start_end = filter_value.split(":") - start = start_end[0].strip("[]") - end = start_end[1].strip("[]") - del start_end - if filter_type not in ["chunks", "splits"]: # chunk directly - for value in range(int(start), int(end) + 1): - to_filter.append(value) - else: # index - for value in 
range(int(start+1), int(end) + 1): - to_filter.append(value) - else: - to_filter.append(filter_value) - - if str(parent_value).upper() in str(to_filter).upper(): + elif len( [ filter_ for filter_ in to_filter if str(parent_value).casefold() == str(filter_).casefold() ] )>0: return True else: return False + + @staticmethod + def _parse_filters_to_check(list_of_values_to_check,value_list=[],level_to_check="DATES_FROM"): + final_values = [] + list_of_values_to_check = str(list_of_values_to_check).upper() + if list_of_values_to_check is None: + return None + elif list_of_values_to_check.casefold() == "ALL".casefold() : + return ["ALL"] + elif list_of_values_to_check.casefold() == "NONE".casefold(): + return ["NONE"] + elif list_of_values_to_check.casefold() == "NATURAL".casefold(): + return ["NATURAL"] + elif "," in list_of_values_to_check: + for value_to_check in list_of_values_to_check.split(","): + final_values.extend(JobList._parse_filter_to_check(value_to_check,value_list,level_to_check)) + else: + final_values = JobList._parse_filter_to_check(list_of_values_to_check,value_list,level_to_check) + return final_values + + @staticmethod - def _check_relationship(relationships,level_to_check,value_to_check): + def _parse_filter_to_check(value_to_check,value_list=[],level_to_check="DATES_FROM"): + """ + Parse the filter to check and return the value to check. + Selection process: + value_to_check can be: + a range: [0:], [:N], [0:N], [:-1], [0:N:M] ... + a value: N. + a range with step: [0::M], [::2], [0::3], [::3] ... + :param value_to_check: value to check. + :param value_list: list of values to check. Dates, members, chunks or splits. + :return: parsed value to check. + """ + step = 1 + if value_to_check.count(":") == 1: + # range + if value_to_check[1] == ":": + # [:N] + # Find N index in the list + start = None + end = value_to_check.split(":")[1].strip("[]") + if level_to_check in ["CHUNKS_FROM","SPLITS_FROM"]: + end = int(end) + elif value_to_check[-2] == ":": + # [N:] + # Find N index in the list + start = value_to_check.split(":")[0].strip("[]") + if level_to_check in ["CHUNKS_FROM","SPLITS_FROM"]: + start = int(start) + end = None + else: + # [N:M] + # Find N index in the list + start = value_to_check.split(":")[0].strip("[]") + end = value_to_check.split(":")[1].strip("[]") + step = 1 + if level_to_check in ["CHUNKS_FROM","SPLITS_FROM"]: + start = int(start) + end = int(end) + elif value_to_check.count(":") == 2: + # range with step + if value_to_check[-2] == ":" and value_to_check[-3] == ":": # [N::] + # Find N index in the list + start = value_to_check.split(":")[0].strip("[]") + end = None + step = 1 + if level_to_check in ["CHUNKS_FROM","SPLITS_FROM"]: + start = int(start) + elif value_to_check[1] == ":" and value_to_check[2] == ":": # [::S] + # Find N index in the list + start = None + end = None + step = value_to_check.split(":")[-1].strip("[]") + # get index in the value_list + step = int(step) + elif value_to_check[1] == ":" and value_to_check[-2] == ":": # [:M:] + # Find N index in the list + start = None + end = value_to_check.split(":")[1].strip("[]") + if level_to_check in ["CHUNKS_FROM","SPLITS_FROM"]: + end = int(end) + step = 1 + else: # [N:M:S] + # Find N index in the list + start = value_to_check.split(":")[0].strip("[]") + end = value_to_check.split(":")[1].strip("[]") + step = value_to_check.split(":")[2].strip("[]") + step = int(step) + if level_to_check in ["CHUNKS_FROM","SPLITS_FROM"]: + start = int(start) + end = int(end) + else: + # value + return 
[value_to_check] + ## values to return + if len(value_list) > 0: + if start is None: + start = value_list[0] + if end is None: + end = value_list[-1] + try: + if level_to_check == "CHUNKS_TO": + start = int(start) + end = int(end) + return value_list[slice(value_list.index(start), value_list.index(end)+1, int(step))] + except ValueError: + return value_list[slice(0,len(value_list)-1,int(step))] + else: + if not start: + start = 0 + if end is None: + return [] + return [number_gen for number_gen in range(int(start), int(end)+1, int(step))] + + def _check_relationship(self, relationships, level_to_check, value_to_check): """ Check if the current_job_value is included in the filter_value - :param relationship: current filter level to check. - :param level_to_check: can be a date, member, chunk or split. + :param relationships: current filter level to check. + :param level_to_check: Can be dates_from, members_from, chunks_from, splits_from. :param value_to_check: Can be None, a date, a member, a chunk or a split. :return: """ filters = [] - for filter_range,filter_data in relationships.get(level_to_check,{}).items(): - if not value_to_check or str(filter_range).upper() in "ALL" or str(value_to_check).upper() in str(filter_range).upper(): - if filter_data: - if "?" in filter_range: - filter_data["OPTIONAL"] = True - else: - filter_data["OPTIONAL"] = relationships["OPTIONAL"] + if level_to_check == "DATES_FROM": + try: + value_to_check = date2str(value_to_check, "%Y%m%d") # need to convert in some cases + except: + pass + try: + values_list = [date2str(date_, "%Y%m%d") for date_ in self._date_list] # need to convert in some cases + except: + values_list = self._date_list + elif level_to_check == "MEMBERS_FROM": + values_list = self._member_list # Str list + elif level_to_check == "CHUNKS_FROM": + values_list = self._chunk_list # int list + else: + values_list = [] # splits, int list ( artificially generated later ) + + relationship = relationships.get(level_to_check, {}) + status = relationship.pop("STATUS", relationships.get("STATUS", None)) + from_step = relationship.pop("FROM_STEP", relationships.get("FROM_STEP", None)) + for filter_range, filter_data in relationship.items(): + if filter_range.casefold() in ["ALL".casefold(),"NATURAL".casefold()] or ( not value_to_check or str(value_to_check).upper() in str( + JobList._parse_filters_to_check(filter_range,values_list,level_to_check)).upper()): + if not filter_data.get("STATUS", None): + filter_data["STATUS"] = status + if not filter_data.get("FROM_STEP", None): + filter_data["FROM_STEP"] = from_step filters.append(filter_data) # Normalize the filter return if len(filters) == 0: filters = [{}] return filters - @staticmethod - def _check_dates(relationships, current_job): + + def _check_dates(self, relationships, current_job): """ Check if the current_job_value is included in the filter_from and retrieve filter_to value :param relationships: Remaining filters to apply. :param current_job: Current job to check. :return: filters_to_apply """ - optional = False - filters_to_apply = JobList._check_relationship(relationships, "DATES_FROM", date2str(current_job.date)) - # there could be multiple filters that apply... per example - # Current task date is 20020201, and member is fc2 - # Dummy example, not specially usefull in a real case - #DATES_FROM: - #all: - #MEMBERS_FROM: - #ALL: ... - #CHUNKS_FROM: - #ALL: ... 
- #20020201: - #MEMBERS_FROM: - #fc2: - #DATES_TO: "20020201" - #MEMBERS_TO: "fc2" - #CHUNKS_TO: "ALL" - #SPLITS_FROM: - #ALL: - #SPLITS_TO: "1" - # this "for" iterates for ALL and fc2 as current task is selected in both filters - # The dict in this step is: - # [{MEMBERS_FROM{..},CHUNKS_FROM{...}},{MEMBERS_FROM{..},SPLITS_FROM{...}}] - for i,filter in enumerate(filters_to_apply): - # {MEMBERS_FROM{..},CHUNKS_FROM{...}} I want too look ALL filters not only one, but I want to go recursivily until get the _TO filter - optional = filter.pop("OPTIONAL", False) - # This is not an if_else, because the current level ( dates ) could have two different filters. - # Second case commented: ( date_from 20020201 ) - # Will enter, go recursivily to the similar methods and in the end it will do: - # Will enter members_from, and obtain [{DATES_TO: "20020201", MEMBERS_TO: "fc2", CHUNKS_TO: "ALL", CHUNKS_FROM{...}] + # Check the test_dependencies.py to see how to use this function + filters_to_apply = self._check_relationship(relationships, "DATES_FROM", date2str(current_job.date)) + for i, filter in enumerate(filters_to_apply): if "MEMBERS_FROM" in filter: - filters_to_apply_m = JobList._check_members({"MEMBERS_FROM": (filter.pop("MEMBERS_FROM")),"OPTIONAL":optional}, current_job) + filters_to_apply_m = self._check_members({"MEMBERS_FROM": (filter.pop("MEMBERS_FROM"))}, current_job) if len(filters_to_apply_m) > 0: filters_to_apply[i].update(filters_to_apply_m) # Will enter chunks_from, and obtain [{DATES_TO: "20020201", MEMBERS_TO: "fc2", CHUNKS_TO: "ALL", SPLITS_TO: "2"] if "CHUNKS_FROM" in filter: - filters_to_apply_c = JobList._check_chunks({"CHUNKS_FROM": (filter.pop("CHUNKS_FROM")),"OPTIONAL":optional}, current_job) + filters_to_apply_c = self._check_chunks({"CHUNKS_FROM": (filter.pop("CHUNKS_FROM"))}, current_job) if len(filters_to_apply_c) > 0 and len(filters_to_apply_c[0]) > 0: filters_to_apply[i].update(filters_to_apply_c) - #IGNORED + # IGNORED if "SPLITS_FROM" in filter: - filters_to_apply_s = JobList._check_splits({"SPLITS_FROM": (filter.pop("SPLITS_FROM")),"OPTIONAL":optional}, current_job) + filters_to_apply_s = self._check_splits({"SPLITS_FROM": (filter.pop("SPLITS_FROM"))}, current_job) if len(filters_to_apply_s) > 0: filters_to_apply[i].update(filters_to_apply_s) # Unify filters from all filters_from where the current job is included to have a single SET of filters_to - if optional: - for i,filter in enumerate(filters_to_apply): - filters_to_apply[i]["OPTIONAL"] = True - filters_to_apply = JobList._unify_to_filters(filters_to_apply) + filters_to_apply = self._unify_to_filters(filters_to_apply) # {DATES_TO: "20020201", MEMBERS_TO: "fc2", CHUNKS_TO: "ALL", SPLITS_TO: "2"} return filters_to_apply - @staticmethod - def _check_members(relationships, current_job): + + def _check_members(self,relationships, current_job): """ Check if the current_job_value is included in the filter_from and retrieve filter_to value :param relationships: Remaining filters to apply. :param current_job: Current job to check. 
:return: filters_to_apply """ - filters_to_apply = JobList._check_relationship(relationships, "MEMBERS_FROM", current_job.member) - optional = False - for i,filter in enumerate(filters_to_apply): - optional = filter.pop("OPTIONAL", False) - if "CHUNKS_FROM" in filter: - filters_to_apply_c = JobList._check_chunks({"CHUNKS_FROM": (filter.pop("CHUNKS_FROM")),"OPTIONAL":optional}, current_job) + filters_to_apply = self._check_relationship(relationships, "MEMBERS_FROM", current_job.member) + for i, filter_ in enumerate(filters_to_apply): + if "CHUNKS_FROM" in filter_: + filters_to_apply_c = self._check_chunks({"CHUNKS_FROM": (filter_.pop("CHUNKS_FROM"))}, current_job) if len(filters_to_apply_c) > 0: filters_to_apply[i].update(filters_to_apply_c) - - if "SPLITS_FROM" in filter: - filters_to_apply_s = JobList._check_splits({"SPLITS_FROM": (filter.pop("SPLITS_FROM")),"OPTIONAL":optional}, current_job) + if "SPLITS_FROM" in filter_: + filters_to_apply_s = self._check_splits({"SPLITS_FROM": (filter_.pop("SPLITS_FROM"))}, current_job) if len(filters_to_apply_s) > 0: filters_to_apply[i].update(filters_to_apply_s) - if optional: - for i,filter in enumerate(filters_to_apply): - filters_to_apply[i]["OPTIONAL"] = True - filters_to_apply = JobList._unify_to_filters(filters_to_apply) + filters_to_apply = self._unify_to_filters(filters_to_apply) return filters_to_apply - @staticmethod - def _check_chunks(relationships, current_job): + def _check_chunks(self,relationships, current_job): """ Check if the current_job_value is included in the filter_from and retrieve filter_to value :param relationships: Remaining filters to apply. :param current_job: Current job to check. :return: filters_to_apply """ - optional = False - filters_to_apply = JobList._check_relationship(relationships, "CHUNKS_FROM", current_job.chunk) - for i,filter in enumerate(filters_to_apply): - optional = filter.pop("OPTIONAL", False) + + filters_to_apply = self._check_relationship(relationships, "CHUNKS_FROM", current_job.chunk) + for i, filter in enumerate(filters_to_apply): if "SPLITS_FROM" in filter: - filters_to_apply_s = JobList._check_splits({"SPLITS_FROM": (filter.pop("SPLITS_FROM")),"OPTIONAL":optional}, current_job) + filters_to_apply_s = self._check_splits({"SPLITS_FROM": (filter.pop("SPLITS_FROM"))}, current_job) if len(filters_to_apply_s) > 0: filters_to_apply[i].update(filters_to_apply_s) - if optional: - for i,filter in enumerate(filters_to_apply): - filters_to_apply[i]["OPTIONAL"] = True - filters_to_apply = JobList._unify_to_filters(filters_to_apply) + filters_to_apply = self._unify_to_filters(filters_to_apply) return filters_to_apply - @staticmethod - def _check_splits(relationships, current_job): + def _check_splits(self,relationships, current_job): """ Check if the current_job_value is included in the filter_from and retrieve filter_to value :param relationships: Remaining filters to apply. 
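Before the next hunk: the structure these `_check_*` helpers traverse is easiest to see on a concrete value. A hedged sketch, reusing the 20020201/fc2 dummy example from the docstring removed earlier in this diff; the dictionary contents are illustrative, not taken from any real experiment:

# Invented DEPENDENCIES relationships dict, as _filter_current_job would receive it:
relationships = {
    "DATES_FROM": {
        "20020201": {
            "MEMBERS_FROM": {
                "fc2": {"DATES_TO": "20020201", "MEMBERS_TO": "fc2", "CHUNKS_TO": "all"},
            },
        },
        "STATUS": "RUNNING",   # special condition: parent only needs to reach RUNNING
        "FROM_STEP": 2,        # ...and to have produced checkpoint file number 2
    },
}
# For a job with date 20020201 and member fc2, _check_dates() recurses through
# MEMBERS_FROM and _unify_to_filters() collapses the result into a single dict:
# {"DATES_TO": "20020201", "MEMBERS_TO": "fc2", "CHUNKS_TO": "all",
#  "STATUS": "RUNNING", "FROM_STEP": 2}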
@@ -564,13 +716,12 @@ class JobList(object): :return: filters_to_apply """ - filters_to_apply = JobList._check_relationship(relationships, "SPLITS_FROM", current_job.split) + filters_to_apply = self._check_relationship(relationships, "SPLITS_FROM", current_job.split) # No more FROM sections to check, unify _to FILTERS and return - filters_to_apply = JobList._unify_to_filters(filters_to_apply) + filters_to_apply = self._unify_to_filters(filters_to_apply) return filters_to_apply - @staticmethod - def _unify_to_filter(unified_filter,filter_to,filter_type): + def _unify_to_filter(self,unified_filter, filter_to, filter_type): """ Unify filter_to filters into a single dictionary :param unified_filter: Single dictionary with all filters_to @@ -578,60 +729,99 @@ class JobList(object): :param filter_type: "DATES_TO", "MEMBERS_TO", "CHUNKS_TO", "SPLITS_TO" :return: unified_filter """ - if "all" not in unified_filter[filter_type]: + if len(unified_filter[filter_type]) > 0 and unified_filter[filter_type][-1] != ",": + unified_filter[filter_type] += "," + if filter_type == "DATES_TO": + value_list = self._date_list + level_to_check = "DATES_FROM" + elif filter_type == "MEMBERS_TO": + value_list = self._member_list + level_to_check = "MEMBERS_FROM" + elif filter_type == "CHUNKS_TO": + value_list = self._chunk_list + level_to_check = "CHUNKS_FROM" + elif filter_type == "SPLITS_TO": + value_list = [] + level_to_check = "SPLITS_FROM" + if "all".casefold() not in unified_filter[filter_type].casefold(): aux = filter_to.pop(filter_type, None) if aux: aux = aux.split(",") for element in aux: - if element.lower().strip("?") in ["natural","none"] and len(unified_filter[filter_type]) > 0: + if element == "": continue + # Get only the first alphanumeric part and [:] chars + parsed_element = re.findall(r"([\[:\]a-zA-Z0-9]+)", element)[0].lower() + extra_data = element[len(parsed_element):] + parsed_element = JobList._parse_filter_to_check(parsed_element, value_list = value_list, level_to_check = filter_type) + # convert list to str + skip = False + if isinstance(parsed_element, list): + # check if any element is natural or none + for ele in parsed_element: + if type(ele) is str and ele.lower() in ["natural", "none"]: + skip = True else: - if filter_to.get("OPTIONAL",False) and element[-1] != "?": - element += "?" 
- unified_filter[filter_type].add(element) + if type(parsed_element) is str and parsed_element.lower() in ["natural", "none"]: + skip = True + if skip and len(unified_filter[filter_type]) > 0: + continue + else: + for ele in parsed_element: + if extra_data: + check_whole_string = str(ele)+extra_data+"," + else: + check_whole_string = str(ele)+"," + if str(check_whole_string) not in unified_filter[filter_type]: + unified_filter[filter_type] += check_whole_string + return unified_filter + @staticmethod - def _normalize_to_filters(filter_to,filter_type): + def _normalize_to_filters(filter_to, filter_type): """ Normalize filter_to filters to a single string or "all" :param filter_to: Unified filter_to dictionary :param filter_type: "DATES_TO", "MEMBERS_TO", "CHUNKS_TO", "SPLITS_TO" :return: """ - if len(filter_to[filter_type]) == 0: + if len(filter_to[filter_type]) == 0 or ("," in filter_to[filter_type] and len(filter_to[filter_type]) == 1): filter_to.pop(filter_type, None) - elif "all" in filter_to[filter_type]: + elif "all".casefold() in filter_to[filter_type]: filter_to[filter_type] = "all" else: - # transform to str separated by commas if multiple elements - filter_to[filter_type] = ",".join(filter_to[filter_type]) + # delete last comma + if "," in filter_to[filter_type][-1]: + filter_to[filter_type] = filter_to[filter_type][:-1] + # delete first comma + if "," in filter_to[filter_type][0]: + filter_to[filter_type] = filter_to[filter_type][1:] - @staticmethod - def _unify_to_filters(filter_to_apply): + def _unify_to_filters(self,filter_to_apply): """ Unify all filter_to filters into a single dictionary ( of current selection ) :param filter_to_apply: Filters to apply :return: Single dictionary with all filters_to """ - unified_filter = {"DATES_TO": set(), "MEMBERS_TO": set(), "CHUNKS_TO": set(), "SPLITS_TO": set()} + unified_filter = {"DATES_TO": "", "MEMBERS_TO": "", "CHUNKS_TO": "", "SPLITS_TO": ""} for filter_to in filter_to_apply: + if "STATUS" not in unified_filter and filter_to.get("STATUS", None): + unified_filter["STATUS"] = filter_to["STATUS"] + if "FROM_STEP" not in unified_filter and filter_to.get("FROM_STEP", None): + unified_filter["FROM_STEP"] = filter_to["FROM_STEP"] if len(filter_to) > 0: - JobList._unify_to_filter(unified_filter,filter_to,"DATES_TO") - JobList._unify_to_filter(unified_filter,filter_to,"MEMBERS_TO") - JobList._unify_to_filter(unified_filter,filter_to,"CHUNKS_TO") - JobList._unify_to_filter(unified_filter,filter_to,"SPLITS_TO") - filter_to.pop("OPTIONAL", None) - - JobList._normalize_to_filters(unified_filter,"DATES_TO") - JobList._normalize_to_filters(unified_filter,"MEMBERS_TO") - JobList._normalize_to_filters(unified_filter,"CHUNKS_TO") - JobList._normalize_to_filters(unified_filter,"SPLITS_TO") - + self._unify_to_filter(unified_filter, filter_to, "DATES_TO") + self._unify_to_filter(unified_filter, filter_to, "MEMBERS_TO") + self._unify_to_filter(unified_filter, filter_to, "CHUNKS_TO") + self._unify_to_filter(unified_filter, filter_to, "SPLITS_TO") + + JobList._normalize_to_filters(unified_filter, "DATES_TO") + JobList._normalize_to_filters(unified_filter, "MEMBERS_TO") + JobList._normalize_to_filters(unified_filter, "CHUNKS_TO") + JobList._normalize_to_filters(unified_filter, "SPLITS_TO") return unified_filter - @staticmethod - def _filter_current_job(current_job,relationships): - ''' - This function will filter the current job based on the relationships given + def _filter_current_job(self,current_job, relationships): + ''' This function will filter the 
current job based on the relationships given :param current_job: Current job to filter :param relationships: Relationships to apply :return: dict() with the filters to apply, or empty dict() if no filters to apply @@ -655,19 +845,16 @@ class JobList(object): filters_to_apply = {} # Check if filter_from-filter_to relationship is set if relationships is not None and len(relationships) > 0: - if "OPTIONAL" not in relationships: - relationships["OPTIONAL"] = False # Look for a starting point, this can be if else becasue they're exclusive as a DATE_FROM can't be in a MEMBER_FROM and so on if "DATES_FROM" in relationships: - filters_to_apply = JobList._check_dates(relationships, current_job) + filters_to_apply = self._check_dates(relationships, current_job) elif "MEMBERS_FROM" in relationships: - filters_to_apply = JobList._check_members(relationships, current_job) + filters_to_apply = self._check_members(relationships, current_job) elif "CHUNKS_FROM" in relationships: - filters_to_apply = JobList._check_chunks(relationships, current_job) + filters_to_apply = self._check_chunks(relationships, current_job) elif "SPLITS_FROM" in relationships: - filters_to_apply = JobList._check_splits(relationships, current_job) + filters_to_apply = self._check_splits(relationships, current_job) else: - relationships.pop("OPTIONAL", None) relationships.pop("CHUNKS_FROM", None) relationships.pop("MEMBERS_FROM", None) relationships.pop("DATES_FROM", None) @@ -675,10 +862,8 @@ class JobList(object): filters_to_apply = relationships return filters_to_apply - - @staticmethod - def _valid_parent(parent,member_list,date_list,chunk_list,is_a_natural_relation,filter_): + def _valid_parent(parent, member_list, date_list, chunk_list, is_a_natural_relation, filter_,child): ''' Check if the parent is valid for the current job :param parent: job to check @@ -686,23 +871,32 @@ class JobList(object): :param date_list: list of dates :param chunk_list: list of chunks :param is_a_natural_relation: if the relation is natural or not - :param filters_to_apply: filters to apply :return: True if the parent is valid, False otherwise ''' - #check if current_parent is listed on dependency.relationships + # check if current_parent is listed on dependency.relationships associative_list = {} associative_list["dates"] = date_list associative_list["members"] = member_list associative_list["chunks"] = chunk_list - if parent.splits is not None: - associative_list["splits"] = [ str(split) for split in range(1,int(parent.splits)+1) ] + if not child.splits: + child_splits = 0 + else: + child_splits = int(child.splits) + if not parent.splits: + parent_splits = 0 + else: + parent_splits = int(parent.splits) + splits = max(child_splits, parent_splits) + if splits > 0: + associative_list["splits"] = [str(split) for split in range(1, int(splits) + 1)] else: associative_list["splits"] = None dates_to = str(filter_.get("DATES_TO", "natural")).lower() members_to = str(filter_.get("MEMBERS_TO", "natural")).lower() chunks_to = str(filter_.get("CHUNKS_TO", "natural")).lower() splits_to = str(filter_.get("SPLITS_TO", "natural")).lower() + if not is_a_natural_relation: if dates_to == "natural": dates_to = "none" @@ -712,28 +906,39 @@ class JobList(object): chunks_to = "none" if splits_to == "natural": splits_to = "none" - if dates_to == "natural": + if "natural" in dates_to: associative_list["dates"] = [date2str(parent.date)] if parent.date is not None else date_list - if members_to == "natural": + if "natural" in members_to: associative_list["members"] = 
[parent.member] if parent.member is not None else member_list - if chunks_to == "natural": + if "natural" in chunks_to: associative_list["chunks"] = [parent.chunk] if parent.chunk is not None else chunk_list - if splits_to == "natural": + if "natural" in splits_to: associative_list["splits"] = [parent.split] if parent.split is not None else parent.splits parsed_parent_date = date2str(parent.date) if parent.date is not None else None - # Apply all filters to look if this parent is an appropriated candidate for the current_job - valid_dates = JobList._apply_filter(parsed_parent_date, dates_to, associative_list["dates"], "dates") + valid_dates = JobList._apply_filter(parsed_parent_date, dates_to, associative_list["dates"], "dates") valid_members = JobList._apply_filter(parent.member, members_to, associative_list["members"], "members") - valid_chunks = JobList._apply_filter(parent.chunk, chunks_to, associative_list["chunks"], "chunks") - valid_splits = JobList._apply_filter(parent.split, splits_to, associative_list["splits"], "splits") + valid_chunks = JobList._apply_filter(parent.chunk, chunks_to, associative_list["chunks"], "chunks") + valid_splits = JobList._apply_filter(parent.split, splits_to, associative_list["splits"], "splits", child, parent) if valid_dates and valid_members and valid_chunks and valid_splits: - for value in [dates_to, members_to, chunks_to, splits_to]: - if "?" in value: - return True, True - return True, False - return False,False - @staticmethod - def _manage_job_dependencies(dic_jobs, job, date_list, member_list, chunk_list, dependencies_keys, dependencies, + return True + return False + + def _add_edge_info(self, job, special_status): + """ + Special relations to be check in the update_list method + :param job: Current job + :param parent: parent jobs to check + :return: + """ + if special_status not in self.jobs_edges: + self.jobs_edges[special_status] = set() + self.jobs_edges[special_status].add(job) + if "ALL" not in self.jobs_edges: + self.jobs_edges["ALL"] = set() + self.jobs_edges["ALL"].add(job) + + def _manage_job_dependencies(self, dic_jobs, job, date_list, member_list, chunk_list, dependencies_keys, + dependencies, graph): ''' Manage the dependencies of a job @@ -750,10 +955,12 @@ class JobList(object): parsed_date_list = [] for dat in date_list: parsed_date_list.append(date2str(dat)) + special_conditions = dict() for key in dependencies_keys: - dependency = dependencies.get(key,None) + dependency = dependencies.get(key, None) if dependency is None: - Log.printlog("WARNING: SECTION {0} is not defined in jobs.conf. Dependency skipped".format(key),Log.WARNING) + Log.printlog("WARNING: SECTION {0} is not defined in jobs.conf. Dependency skipped".format(key), + Log.WARNING) continue skip, (chunk, member, date) = JobList._calculate_dependency_metadata(job.chunk, chunk_list, job.member, member_list, @@ -767,25 +974,49 @@ class JobList(object): natural_jobs = dic_jobs.get_jobs(dependency.section, date, member, chunk) all_parents = list(set(other_parents + parents_jobs)) # Get dates_to, members_to, chunks_to of the deepest level of the relationship. - filters_to_apply = JobList._filter_current_job(job,copy.deepcopy(dependency.relationships)) + filters_to_apply = self._filter_current_job(job, copy.deepcopy(dependency.relationships)) + if "?" in filters_to_apply.get("SPLITS_TO", "") or "?" in filters_to_apply.get("DATES_TO","") or "?" in filters_to_apply.get("MEMBERS_TO", "") or "?" 
in filters_to_apply.get("CHUNKS_TO", ""): + only_marked_status = True + else: + only_marked_status = False + special_conditions["STATUS"] = filters_to_apply.pop("STATUS", None) + special_conditions["FROM_STEP"] = filters_to_apply.pop("FROM_STEP", None) for parent in all_parents: # If splits is not None, the job is a list of jobs if parent.name == job.name: continue # Check if it is a natural relation. The only difference is that a chunk can depend on a chunks <= than the current chunk - if parent in natural_jobs and (job.chunk is None or parent.chunk is None or parent.chunk <= job.chunk ): + if parent in natural_jobs and (job.chunk is None or parent.chunk is None or parent.chunk <= job.chunk): natural_relationship = True else: natural_relationship = False # Check if the current parent is a valid parent based on the dependencies set on expdef.conf - valid,optional = JobList._valid_parent(parent, member_list, parsed_date_list, chunk_list, natural_relationship,filters_to_apply) # If the parent is valid, add it to the graph - if valid: + + if JobList._valid_parent(parent, member_list, parsed_date_list, chunk_list, natural_relationship, + filters_to_apply,job): job.add_parent(parent) - JobList._add_edge(graph, job, parent) + self._add_edge(graph, job, parent) # Could be more variables in the future - if optional: - job.add_edge_info(parent.name,special_variables={"optional":True}) + # Do parse checkpoint + if special_conditions.get("STATUS", None): + if only_marked_status: + if str(job.split) + "?" in filters_to_apply.get("SPLITS_TO", "") or str( + job.chunk) + "?" in filters_to_apply.get("CHUNKS_TO", "") or str( + job.member) + "?" in filters_to_apply.get("MEMBERS_TO", "") or str( + job.date) + "?" in filters_to_apply.get("DATES_TO", ""): + selected = True + else: + selected = False + else: + selected = True + if selected: + if special_conditions.get("FROM_STEP", None): + job.max_checkpoint_step = int(special_conditions.get("FROM_STEP", 0)) if int( + special_conditions.get("FROM_STEP", + 0)) > job.max_checkpoint_step else job.max_checkpoint_step + self._add_edge_info(job, special_conditions["STATUS"]) + job.add_edge_info(parent, special_conditions) JobList.handle_frequency_interval_dependencies(chunk, chunk_list, date, date_list, dic_jobs, job, member, member_list, dependency.section, graph, other_parents) @@ -841,7 +1072,7 @@ class JobList(object): @staticmethod def handle_frequency_interval_dependencies(chunk, chunk_list, date, date_list, dic_jobs, job, member, member_list, - section_name, graph,visited_parents): + section_name, graph, visited_parents): if job.wait and job.frequency > 1: if job.chunk is not None and len(str(job.chunk)) > 0: max_distance = (chunk_list.index(chunk) + 1) % job.frequency @@ -884,9 +1115,10 @@ class JobList(object): parent = parents[i] if isinstance(parents, list) else parents graph.add_edge(parent.name, job.name) pass + @staticmethod def _create_jobs(dic_jobs, priority, default_job_type, jobs_data=dict()): - for section in dic_jobs._jobs_data.get("JOBS",{}).keys(): + for section in dic_jobs._jobs_data.get("JOBS", {}).keys(): Log.debug("Creating {0} jobs".format(section)) dic_jobs.read_section(section, priority, default_job_type, jobs_data) priority += 1 @@ -903,7 +1135,10 @@ class JobList(object): :return: Sorted Dictionary of List that represents the jobs included in the wrapping process. 
\n :rtype: Dictionary Key: date, Value: (Dictionary Key: Member, Value: List of jobs that belong to the date, member, and are ordered by chunk number if it is a chunk job otherwise num_chunks from JOB TYPE (section) """ + # Dictionary Key: date, Value: (Dictionary Key: Member, Value: List) + job = None + dict_jobs = dict() for date in self._date_list: dict_jobs[date] = dict() @@ -911,7 +1146,6 @@ class JobList(object): dict_jobs[date][member] = list() num_chunks = len(self._chunk_list) - sections_running_type_map = dict() if wrapper_jobs is not None and len(str(wrapper_jobs)) > 0: if type(wrapper_jobs) is not list: @@ -921,13 +1155,12 @@ class JobList(object): char = " " wrapper_jobs = wrapper_jobs.split(char) - for section in wrapper_jobs: # RUNNING = once, as default. This value comes from jobs_.yml try: sections_running_type_map[section] = str(self.jobs_data[section].get("RUNNING", 'once')) except BaseException as e: - raise AutosubmitCritical("Key {0} doesn't exists.".format(section),7014,str(e)) + raise AutosubmitCritical("Key {0} doesn't exists.".format(section), 7014, str(e)) # Select only relevant jobs, those belonging to the sections defined in the wrapper @@ -945,20 +1178,19 @@ class JobList(object): for member in self._member_list: # Filter list of fake jobs according to date and member, result not sorted at this point sorted_jobs_list = list(filter(lambda job: job.name.split("_")[1] == str_date and - job.name.split("_")[2] == member, filtered_jobs_fake_date_member)) - #sorted_jobs_list = [job for job in filtered_jobs_fake_date_member if job.name.split("_")[1] == str_date and + job.name.split("_")[2] == member, + filtered_jobs_fake_date_member)) + # sorted_jobs_list = [job for job in filtered_jobs_fake_date_member if job.name.split("_")[1] == str_date and # job.name.split("_")[2] == member] - #There can be no jobs for this member when select chunk/member is enabled + # There can be no jobs for this member when select chunk/member is enabled if not sorted_jobs_list or len(sorted_jobs_list) == 0: continue - previous_job = sorted_jobs_list[0] # get RUNNING for this section section_running_type = sections_running_type_map[previous_job.section] - jobs_to_sort = [previous_job] previous_section_running_type = None # Index starts at 1 because 0 has been taken in a previous step @@ -971,14 +1203,16 @@ class JobList(object): previous_section_running_type = section_running_type section_running_type = sections_running_type_map[job.section] # Test if RUNNING is different between sections, or if we have reached the last item in sorted_jobs_list - if (previous_section_running_type is not None and previous_section_running_type != section_running_type) \ + if ( + previous_section_running_type is not None and previous_section_running_type != section_running_type) \ or index == len(sorted_jobs_list): # Sorting by date, member, chunk number if it is a chunk job otherwise num_chunks from JOB TYPE (section) # Important to note that the only differentiating factor would be chunk OR num_chunks jobs_to_sort = sorted(jobs_to_sort, key=lambda k: (k.name.split('_')[1], (k.name.split('_')[2]), (int(k.name.split('_')[3]) - if len(k.name.split('_')) == 5 else num_chunks + 1))) + if len(k.name.split( + '_')) == 5 else num_chunks + 1))) # Bringing back original job if identified for idx in range(0, len(jobs_to_sort)): @@ -991,7 +1225,7 @@ class JobList(object): # By adding to the result at this step, only those with the same RUNNING have been added. 
dict_jobs[date][member] += jobs_to_sort jobs_to_sort = [] - if len(sorted_jobs_list) > 1 : + if len(sorted_jobs_list) > 1: jobs_to_sort.append(job) previous_job = job @@ -1023,7 +1257,7 @@ class JobList(object): fake_job = copy.deepcopy(job) # Use previous values to modify name of fake job fake_job.name = fake_job.name.split('_', 1)[0] + "_" + self._get_date(date) + "_" \ - + member + "_" + fake_job.name.split("_", 1)[1] + + member + "_" + fake_job.name.split("_", 1)[1] # Filling list of fake jobs, only difference is the name filtered_jobs_fake_date_member.append(fake_job) # Mapping fake jobs to original ones @@ -1113,7 +1347,8 @@ class JobList(object): def copy_ordered_jobs_by_date_member(self): pass - def get_ordered_jobs_by_date_member(self,section): + + def get_ordered_jobs_by_date_member(self, section): """ Get the dictionary of jobs ordered according to wrapper's expression divided by date and member @@ -1152,7 +1387,8 @@ class JobList(object): :return: completed jobs :rtype: list """ - uncompleted_jobs = [job for job in self._job_list if (platform is None or job.platform.name == platform.name) and + uncompleted_jobs = [job for job in self._job_list if + (platform is None or job.platform.name == platform.name) and job.status != Status.COMPLETED] if wrapper: @@ -1264,7 +1500,8 @@ class JobList(object): :rtype: list """ unsubmitted = [job for job in self._job_list if (platform is None or job.platform.name == platform.name) and - (job.status != Status.SUBMITTED and job.status != Status.QUEUING and job.status == Status.RUNNING and job.status == Status.COMPLETED)] + ( + job.status != Status.SUBMITTED and job.status != Status.QUEUING and job.status == Status.RUNNING and job.status == Status.COMPLETED)] if wrapper: return [job for job in unsubmitted if job.packed is False] @@ -1288,7 +1525,7 @@ class JobList(object): else: return all_jobs - def get_job_names(self,lower_case=False): + def get_job_names(self, lower_case=False): """ Returns a list of all job names :param: lower_case: if true, returns lower case job names @@ -1309,21 +1546,24 @@ class JobList(object): def update_two_step_jobs(self): prev_jobs_to_run_first = self.jobs_to_run_first if len(self.jobs_to_run_first) > 0: - self.jobs_to_run_first = [ job for job in self.jobs_to_run_first if job.status != Status.COMPLETED ] + self.jobs_to_run_first = [job for job in self.jobs_to_run_first if job.status != Status.COMPLETED] keep_running = False for job in self.jobs_to_run_first: - running_parents = [parent for parent in job.parents if parent.status != Status.WAITING and parent.status != Status.FAILED ] #job is parent of itself + running_parents = [parent for parent in job.parents if + parent.status != Status.WAITING and parent.status != Status.FAILED] # job is parent of itself if len(running_parents) == len(job.parents): keep_running = True if len(self.jobs_to_run_first) > 0 and keep_running is False: - raise AutosubmitCritical("No more jobs to run first, there were still pending jobs but they're unable to run without their parents or there are failed jobs.",7014) + raise AutosubmitCritical( + "No more jobs to run first, there were still pending jobs but they're unable to run without their parents or there are failed jobs.", + 7014) - def parse_jobs_by_filter(self, unparsed_jobs,two_step_start = True): + def parse_jobs_by_filter(self, unparsed_jobs, two_step_start=True): jobs_to_run_first = list() - select_jobs_by_name = "" #job_name - select_all_jobs_by_section = "" # all + select_jobs_by_name = "" # job_name + 
select_all_jobs_by_section = "" # all filter_jobs_by_section = "" # Select, chunk / member - if "&" in unparsed_jobs: # If there are explicit jobs add them + if "&" in unparsed_jobs: # If there are explicit jobs add them jobs_to_check = unparsed_jobs.split("&") select_jobs_by_name = jobs_to_check[0] unparsed_jobs = jobs_to_check[1] @@ -1340,20 +1580,26 @@ class JobList(object): filter_jobs_by_section = aux[1] if two_step_start: try: - self.jobs_to_run_first = self.get_job_related(select_jobs_by_name=select_jobs_by_name,select_all_jobs_by_section=select_all_jobs_by_section,filter_jobs_by_section=filter_jobs_by_section) + self.jobs_to_run_first = self.get_job_related(select_jobs_by_name=select_jobs_by_name, + select_all_jobs_by_section=select_all_jobs_by_section, + filter_jobs_by_section=filter_jobs_by_section) except Exception as e: - raise AutosubmitCritical("Check the {0} format.\nFirst filter is optional ends with '&'.\nSecond filter ends with ';'.\nThird filter must contain '['. ".format(unparsed_jobs)) + raise AutosubmitCritical( + "Check the {0} format.\nFirst filter is optional ends with '&'.\nSecond filter ends with ';'.\nThird filter must contain '['. ".format( + unparsed_jobs)) else: try: self.rerun_job_list = self.get_job_related(select_jobs_by_name=select_jobs_by_name, - select_all_jobs_by_section=select_all_jobs_by_section, - filter_jobs_by_section=filter_jobs_by_section,two_step_start=two_step_start) + select_all_jobs_by_section=select_all_jobs_by_section, + filter_jobs_by_section=filter_jobs_by_section, + two_step_start=two_step_start) except Exception as e: raise AutosubmitCritical( "Check the {0} format.\nFirst filter is optional ends with '&'.\nSecond filter ends with ';'.\nThird filter must contain '['. ".format( unparsed_jobs)) - def get_job_related(self, select_jobs_by_name="",select_all_jobs_by_section="",filter_jobs_by_section="",two_step_start=True): + def get_job_related(self, select_jobs_by_name="", select_all_jobs_by_section="", filter_jobs_by_section="", + two_step_start=True): """ :param two_step_start: :param select_jobs_by_name: job name @@ -1367,25 +1613,29 @@ class JobList(object): jobs_date = [] # First Filter {select job by name} if select_jobs_by_name != "": - jobs_by_name = [ job for job in self._job_list if re.search("(^|[^0-9a-z_])"+job.name.lower()+"([^a-z0-9_]|$)",select_jobs_by_name.lower()) is not None ] - jobs_by_name_no_expid = [job for job in self._job_list if - re.search("(^|[^0-9a-z_])" + job.name.lower()[5:] + "([^a-z0-9_]|$)", + jobs_by_name = [job for job in self._job_list if + re.search("(^|[^0-9a-z_])" + job.name.lower() + "([^a-z0-9_]|$)", select_jobs_by_name.lower()) is not None] + jobs_by_name_no_expid = [job for job in self._job_list if + re.search("(^|[^0-9a-z_])" + job.name.lower()[5:] + "([^a-z0-9_]|$)", + select_jobs_by_name.lower()) is not None] ultimate_jobs_list.extend(jobs_by_name) ultimate_jobs_list.extend(jobs_by_name_no_expid) # Second Filter { select all } if select_all_jobs_by_section != "": - all_jobs_by_section = [ job for job in self._job_list if re.search("(^|[^0-9a-z_])"+job.section.upper()+"([^a-z0-9_]|$)",select_all_jobs_by_section.upper()) is not None ] + all_jobs_by_section = [job for job in self._job_list if + re.search("(^|[^0-9a-z_])" + job.section.upper() + "([^a-z0-9_]|$)", + select_all_jobs_by_section.upper()) is not None] ultimate_jobs_list.extend(all_jobs_by_section) # Third Filter N section { date , member? 
, chunk?} # Section[date[member][chunk]] # filter_jobs_by_section="SIM[20[C:000][M:1]],DA[20 21[M:000 001][C:1]]" if filter_jobs_by_section != "": - section_name="" - section_dates="" - section_chunks="" - section_members="" + section_name = "" + section_dates = "" + section_chunks = "" + section_members = "" jobs_final = list() for complete_filter_by_section in filter_jobs_by_section.split(','): section_list = complete_filter_by_section.split('[') @@ -1401,20 +1651,27 @@ class JobList(object): elif 'm' in section_list[3].lower(): section_members = section_list[3].strip('mM:[]') - if section_name != "": jobs_filtered = [job for job in self._job_list if - re.search("(^|[^0-9a-z_])" + job.section.upper() + "([^a-z0-9_]|$)", - section_name.upper()) is not None] + re.search("(^|[^0-9a-z_])" + job.section.upper() + "([^a-z0-9_]|$)", + section_name.upper()) is not None] if section_dates != "": - jobs_date = [ job for job in jobs_filtered if re.search("(^|[^0-9a-z_])" + date2str(job.date, job.date_format) + "([^a-z0-9_]|$)", section_dates.lower()) is not None or job.date is None ] + jobs_date = [job for job in jobs_filtered if + re.search("(^|[^0-9a-z_])" + date2str(job.date, job.date_format) + "([^a-z0-9_]|$)", + section_dates.lower()) is not None or job.date is None] if section_chunks != "" or section_members != "": - jobs_final = [job for job in jobs_date if ( section_chunks == "" or re.search("(^|[^0-9a-z_])" + str(job.chunk) + "([^a-z0-9_]|$)",section_chunks) is not None ) and ( section_members == "" or re.search("(^|[^0-9a-z_])" + str(job.member) + "([^a-z0-9_]|$)",section_members.lower()) is not None ) ] + jobs_final = [job for job in jobs_date if ( + section_chunks == "" or re.search("(^|[^0-9a-z_])" + str(job.chunk) + "([^a-z0-9_]|$)", + section_chunks) is not None) and ( + section_members == "" or re.search( + "(^|[^0-9a-z_])" + str(job.member) + "([^a-z0-9_]|$)", + section_members.lower()) is not None)] ultimate_jobs_list.extend(jobs_final) # Duplicates out ultimate_jobs_list = list(set(ultimate_jobs_list)) - Log.debug("List of jobs filtered by TWO_STEP_START parameter:\n{0}".format([job.name for job in ultimate_jobs_list])) + Log.debug( + "List of jobs filtered by TWO_STEP_START parameter:\n{0}".format([job.name for job in ultimate_jobs_list])) return ultimate_jobs_list def get_logs(self): @@ -1452,7 +1709,8 @@ class JobList(object): :return: ready jobs :rtype: list """ - ready = [job for job in self._job_list if ( platform is None or platform == "" or job.platform.name == platform.name ) and + ready = [job for job in self._job_list if + (platform is None or platform == "" or job.platform.name == platform.name) and job.status == Status.READY and job.hold is hold] if wrapper: @@ -1472,6 +1730,7 @@ class JobList(object): prepared = [job for job in self._job_list if (platform is None or job.platform.name == platform.name) and job.status == Status.PREPARED] return prepared + def get_delayed(self, platform=None): """ Returns a list of delayed jobs @@ -1482,8 +1741,9 @@ class JobList(object): :rtype: list """ delayed = [job for job in self._job_list if (platform is None or job.platform.name == platform.name) and - job.status == Status.DELAYED] + job.status == Status.DELAYED] return delayed + def get_skipped(self, platform=None): """ Returns a list of skipped jobs @@ -1524,7 +1784,7 @@ class JobList(object): """ waiting_jobs = [job for job in self._job_list if ( - job.platform.type == platform_type and job.status == Status.WAITING)] + job.platform.type == platform_type and job.status == 
Status.WAITING)] return waiting_jobs def get_held_jobs(self, platform=None): @@ -1636,13 +1896,15 @@ class JobList(object): """ active = self.get_in_queue(platform) + self.get_ready( - platform=platform, hold=True) + self.get_ready(platform=platform, hold=False) + self.get_delayed(platform=platform) + platform=platform, hold=True) + self.get_ready(platform=platform, hold=False) + self.get_delayed( + platform=platform) tmp = [job for job in active if job.hold and not (job.status == - Status.SUBMITTED or job.status == Status.READY or job.status == Status.DELAYED) ] + Status.SUBMITTED or job.status == Status.READY or job.status == Status.DELAYED)] if len(tmp) == len(active): # IF only held jobs left without dependencies satisfied if len(tmp) != 0 and len(active) != 0: raise AutosubmitCritical( - "Only Held Jobs active. Exiting Autosubmit (TIP: This can happen if suspended or/and Failed jobs are found on the workflow)", 7066) + "Only Held Jobs active. Exiting Autosubmit (TIP: This can happen if suspended or/and Failed jobs are found on the workflow)", + 7066) active = [] return active @@ -1658,6 +1920,7 @@ class JobList(object): for job in self._job_list: if job.name == name: return job + def get_jobs_by_section(self, section_list): """ Returns the job that its name matches parameter section @@ -1688,7 +1951,7 @@ class JobList(object): def get_in_ready_grouped_id(self, platform): jobs = [] [jobs.append(job) for job in jobs if ( - platform is None or job.platform.name is platform.name)] + platform is None or job.platform.name is platform.name)] jobs_by_id = dict() for job in jobs: @@ -1791,15 +2054,15 @@ class JobList(object): try: self._persistence.save(self._persistence_path, - self._persistence_file, self._job_list if self.run_members is None or job_list is None else job_list) + self._persistence_file, + self._job_list if self.run_members is None or job_list is None else job_list) pass except BaseException as e: - raise AutosubmitError(str(e),6040,"Failure while saving the job_list") + raise AutosubmitError(str(e), 6040, "Failure while saving the job_list") except AutosubmitError as e: raise except BaseException as e: - raise AutosubmitError(str(e),6040,"Unknown failure while saving the job_list") - + raise AutosubmitError(str(e), 6040, "Unknown failure while saving the job_list") def backup_save(self): """ @@ -1813,8 +2076,8 @@ class JobList(object): exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, self.expid) tmp_path = os.path.join(exp_path, BasicConfig.LOCAL_TMP_DIR) aslogs_path = os.path.join(tmp_path, BasicConfig.LOCAL_ASLOG_DIR) - Log.reset_status_file(os.path.join(aslogs_path,"jobs_active_status.log"),"status") - Log.reset_status_file(os.path.join(aslogs_path,"jobs_failed_status.log"),"status_failed") + Log.reset_status_file(os.path.join(aslogs_path, "jobs_active_status.log"), "status") + Log.reset_status_file(os.path.join(aslogs_path, "jobs_failed_status.log"), "status_failed") job_list = self.get_completed()[-5:] + self.get_in_queue() failed_job_list = self.get_failed() if len(job_list) > 0: @@ -1822,7 +2085,7 @@ class JobList(object): "Job Id", "Job Status", "Job Platform", "Job Queue") if len(failed_job_list) > 0: Log.status_failed("\n{0:<35}{1:<15}{2:<15}{3:<20}{4:<15}", "Job Name", - "Job Id", "Job Status", "Job Platform", "Job Queue") + "Job Id", "Job Status", "Job Platform", "Job Queue") for job in job_list: if len(job.queue) > 0 and str(job.platform.queue).lower() != "none": queue = job.queue @@ -1863,8 +2126,10 @@ class JobList(object): "_" + output_date)) def 
get_skippable_jobs(self, jobs_in_wrapper):
-        job_list_skip = [job for job in self.get_job_list() if job.skippable == "true" and (job.status == Status.QUEUING or job.status ==
-                                                                                            Status.RUNNING or job.status == Status.COMPLETED or job.status == Status.READY) and jobs_in_wrapper.find(job.section) == -1]
+        job_list_skip = [job for job in self.get_job_list() if
+                         job.skippable == "true" and (job.status == Status.QUEUING or job.status ==
+                                                      Status.RUNNING or job.status == Status.COMPLETED or job.status == Status.READY) and jobs_in_wrapper.find(
+                             job.section) == -1]
         skip_by_section = dict()
         for job in job_list_skip:
             if job.section not in skip_by_section:
@@ -1886,6 +2151,40 @@
     def parameters(self, value):
         self._parameters = value

+    def check_checkpoint(self, job, parent):
+        """ Check if a checkpoint step exists for this edge """
+        # Job.get_checkpoint_files takes no arguments: the checkpoint counters are
+        # tracked on the job itself, so the parent does not need to be passed down
+        return job.get_checkpoint_files()
+
+    def check_special_status(self):
+        """
+        Check if all parents of a job have the correct status for checkpointing.
+
+        :return: jobs that fulfill the special conditions
+        """
+        jobs_to_check = []
+        for status, sorted_job_list in self.jobs_edges.items():
+            if status == "ALL":
+                continue
+            for job in sorted_job_list:
+                if job.status != Status.WAITING:
+                    continue
+                if status in ["RUNNING", "FAILED"]:
+                    # Retrieve the checkpoint files, if any. platform.connected is
+                    # only True when this runs under the run/setstatus commands
+                    if job.platform.connected:
+                        job.get_checkpoint_files()
+                non_completed_parents_current = 0
+                completed_parents = len([parent for parent in job.parents if parent.status == Status.COMPLETED])
+                for parent in job.edge_info[status].values():
+                    # parent is a tuple (parent job, FROM_STEP); a FROM_STEP condition
+                    # only counts once the required checkpoint step has been reached
+                    if status in ["RUNNING", "FAILED"] and parent[1] and int(parent[1]) >= job.current_checkpoint_step:
+                        continue
+                    else:
+                        status_str = Status.VALUE_TO_KEY[parent[0].status]
+                        # The condition is fulfilled once the parent has reached the selected
+                        # status, or any status that comes after it in Status.LOGICAL_ORDER
+                        if Status.LOGICAL_ORDER.index(status_str) >= Status.LOGICAL_ORDER.index(status):
+                            non_completed_parents_current += 1
+                if (non_completed_parents_current + completed_parents) == len(job.parents):
+                    jobs_to_check.append(job)
+
+        return jobs_to_check
+
     def update_list(self, as_conf, store_change=True, fromSetStatus=False, submitter=None, first_time=False):
         # type: (AutosubmitConfig, bool, bool, object, bool) -> bool
         """
@@ -1908,7 +2207,7 @@
         write_log_status = False
         if not first_time:
             for job in self.get_failed():
-                if self.jobs_data[job.section].get("RETRIALS",None) is None:
+                if self.jobs_data[job.section].get("RETRIALS", None) is None:
                     retrials = int(as_conf.get_retrials())
                 else:
                     retrials = int(job.retrials)
@@ -1922,7 +2221,7 @@
                 else:
                     aux_job_delay = int(job.delay_retrials)

-                if self.jobs_data[job.section].get("DELAY_RETRY_TIME",None) or aux_job_delay <= 0:
+                if self.jobs_data[job.section].get("DELAY_RETRY_TIME", None) or aux_job_delay <= 0:
                     delay_retry_time = str(as_conf.get_delay_retry_time())
                 else:
                     delay_retry_time = job.retry_delay
@@ -1930,7 +2229,7 @@
                     retry_delay = job.fail_count * int(delay_retry_time[:-1]) + int(delay_retry_time[:-1])
                 elif "*" in delay_retry_time:
                     retry_delay = int(delay_retry_time[1:])
-                    for retrial_amount in range(0,job.fail_count):
+                    for retrial_amount in range(0, job.fail_count):
                         retry_delay += retry_delay * 10
                 else:
                     retry_delay = int(delay_retry_time)
@@ -1957,6 +2256,14 @@
                     job.status = Status.FAILED
                     job.packed = False
                     save = True
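+            # Start conditions: jobs whose special STATUS/FROM_STEP conditions are already
+            # fulfilled by their parents are released below even though those parents are
+            # not COMPLETED, e.g. STATUS: RUNNING frees the child as soon as its parent runs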
+            # Check the jobs with special (start-condition) status; their parents can be in any status
+            for job in self.check_special_status():
+                job.status = Status.READY
+                job.id = None
+                job.packed = False
+                job.wrapper_type = None
+                save = True
+                Log.debug(f"Special condition fulfilled for job {job.name}")
         # if waiting jobs has all parents completed change its State to READY
         for job in self.get_completed():
             if job.synchronize is not None and len(str(job.synchronize)) > 0:
@@ -1979,8 +2286,6 @@
                         Log.debug(
                             "Resetting sync job: {0} status to: WAITING for parents completion...".format(
                                 job.name))
-
-
         Log.debug('Updating WAITING jobs')
         if not fromSetStatus:
             all_parents_completed = []
@@ -1988,9 +2293,12 @@
                 if datetime.datetime.now() >= job.delay_end:
                     job.status = Status.READY
             for job in self.get_waiting():
-                tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED or parent.status == Status.SKIPPED]
-                tmp2 = [parent for parent in job.parents if parent.status == Status.COMPLETED or parent.status == Status.SKIPPED or parent.status == Status.FAILED]
-                tmp3 = [parent for parent in job.parents if parent.status == Status.SKIPPED or parent.status == Status.FAILED]
+                tmp = [parent for parent in job.parents if
+                       parent.status == Status.COMPLETED or parent.status == Status.SKIPPED]
+                tmp2 = [parent for parent in job.parents if
+                        parent.status == Status.COMPLETED or parent.status == Status.SKIPPED or parent.status == Status.FAILED]
+                tmp3 = [parent for parent in job.parents if
+                        parent.status == Status.SKIPPED or parent.status == Status.FAILED]
                 failed_ones = [parent for parent in job.parents if parent.status == Status.FAILED]
                 if job.parents is None or len(tmp) == len(job.parents):
                     job.status = Status.READY
@@ -2008,14 +2316,15 @@
                             if parent.name in job.edge_info and job.edge_info[parent.name].get('optional', False):
                                 weak_dependencies_failure = True
                             elif parent.section in job.dependencies:
-                                if parent.status not in [Status.COMPLETED,Status.SKIPPED]:
+                                if parent.status not in [Status.COMPLETED, Status.SKIPPED]:
                                     strong_dependencies_failure = True
                                     break
                         if not strong_dependencies_failure and weak_dependencies_failure:
                             job.status = Status.READY
                             job.hold = False
                             Log.debug(
-                                "Setting job: {0} status to: READY (conditional jobs are completed/failed)...".format(job.name))
+                                "Setting job: {0} status to: READY (conditional jobs are completed/failed)...".format(
+                                    job.name))
                             break
                     if as_conf.get_remote_dependencies() == "true":
                         all_parents_completed.append(job.name)
@@ -2043,23 +2352,26 @@
                         job.hold = False
                         save = True
                         Log.debug(
-                            "A job in prepared status has all parent completed, job: {0} status set to: READY ...".format(job.name))
+                            "A job in prepared status has all parent completed, job: {0} status set to: READY ...".format(
+                                job.name))
                 Log.debug('Updating WAITING jobs eligible for be prepared')
                 # Setup job name should be a variable
                 for job in self.get_waiting_remote_dependencies('slurm'):
                     if job.name not in all_parents_completed:
                         tmp = [parent for parent in job.parents if (
-                                (parent.status == Status.SKIPPED or parent.status == Status.COMPLETED or parent.status == Status.QUEUING or parent.status == Status.RUNNING) and "setup" not in parent.name.lower())]
+                                (
+                                        parent.status == Status.SKIPPED or parent.status == Status.COMPLETED or parent.status == Status.QUEUING or parent.status == Status.RUNNING) and "setup" not in parent.name.lower())]
                         if len(tmp) == len(job.parents):
                             job.status = Status.PREPARED
                             job.hold = True
                             Log.debug(
-                                "Setting job: {0} status to: Prepared for be held (all 
parents queuing, running or completed)...".format( + job.name)) Log.debug('Updating Held jobs') if self.job_package_map: held_jobs = [job for job in self.get_held_jobs() if ( - job.id not in list(self.job_package_map.keys()))] + job.id not in list(self.job_package_map.keys()))] held_jobs += [wrapper_job for wrapper_job in list(self.job_package_map.values()) if wrapper_job.status == Status.HELD] else: @@ -2078,7 +2390,7 @@ class JobList(object): job.hold = hold_wrapper if not job.hold: for inner_job in job.job_list: - inner_job.hold = False + inner_job.hold = False Log.debug( "Setting job: {0} status to: Queuing (all parents completed)...".format( job.name)) @@ -2086,7 +2398,7 @@ class JobList(object): tmp = [ parent for parent in job.parents if parent.status == Status.COMPLETED] if len(tmp) == len(job.parents): - job.hold = False + job.hold = False Log.debug( "Setting job: {0} status to: Queuing (all parents completed)...".format( job.name)) @@ -2118,7 +2430,7 @@ class JobList(object): for related_job in jobs_to_skip[section]: if members.index(job.member) < members.index( related_job.member) and job.chunk == related_job.chunk and jobdate == date2str( - related_job.date, related_job.date_format): + related_job.date, related_job.date_format): try: if job.status == Status.QUEUING: job.platform.send_command(job.platform.cancel_cmd + " " + str(job.id), @@ -2127,7 +2439,7 @@ class JobList(object): pass # job_id finished already job.status = Status.SKIPPED save = True - #save = True + # save = True self.update_two_step_jobs() Log.debug('Update finished') return save @@ -2176,7 +2488,8 @@ class JobList(object): if m_time_job_conf: if m_time_job_conf > m_time_db: Log.info( - "File jobs_{0}.yml has been modified since the last time the structure persistence was saved.".format(self.expid)) + "File jobs_{0}.yml has been modified since the last time the structure persistence was saved.".format( + self.expid)) structure_valid = False else: Log.info( @@ -2199,7 +2512,7 @@ class JobList(object): if structure_valid is False: # Structure does not exist, or it is not be updated, attempt to create it. Log.info("Updating structure persistence...") - self.graph = transitive_reduction(self.graph) # add threads for large experiments? todo + self.graph = transitive_reduction(self.graph) # add threads for large experiments? todo if self.graph: for job in self._job_list: children_to_remove = [ @@ -2235,7 +2548,8 @@ class JobList(object): out = False return out - def save_wrappers(self,packages_to_save,failed_packages,as_conf,packages_persistence,hold=False,inspect=False): + def save_wrappers(self, packages_to_save, failed_packages, as_conf, packages_persistence, hold=False, + inspect=False): for package in packages_to_save: if package.jobs[0].id not in failed_packages: if hasattr(package, "name"): @@ -2250,6 +2564,7 @@ class JobList(object): # Saving only when it is a real multi job package packages_persistence.save( package.name, package.jobs, package._expid, inspect) + def check_scripts(self, as_conf): """ When we have created the scripts, all parameters should have been substituted. 
@@ -2307,7 +2622,7 @@ class JobList(object): self._job_list.remove(job) - def rerun(self, job_list_unparsed,as_conf, monitor=False): + def rerun(self, job_list_unparsed, as_conf, monitor=False): """ Updates job list to rerun the jobs specified by a job list :param job_list_unparsed: list of jobs to rerun @@ -2318,7 +2633,7 @@ class JobList(object): :type monitor: bool """ - self.parse_jobs_by_filter(job_list_unparsed,two_step_start=False) + self.parse_jobs_by_filter(job_list_unparsed, two_step_start=False) member_list = set() chunk_list = set() date_list = set() @@ -2347,8 +2662,8 @@ class JobList(object): for job_section in job_sections: Log.debug( "Reading rerun dependencies for {0} jobs".format(job_section)) - if as_conf.jobs_data[job_section].get('DEPENDENCIES',None) is not None: - dependencies_keys = as_conf.jobs_data[job_section].get('DEPENDENCIES',{}) + if as_conf.jobs_data[job_section].get('DEPENDENCIES', None) is not None: + dependencies_keys = as_conf.jobs_data[job_section].get('DEPENDENCIES', {}) if type(dependencies_keys) is str: dependencies_keys = dependencies_keys.upper().split() if dependencies_keys is None: @@ -2357,11 +2672,16 @@ class JobList(object): for job in self.get_jobs_by_section(job_section): for key in dependencies_keys: dependency = dependencies[key] - skip, (chunk, member, date) = JobList._calculate_dependency_metadata(job.chunk, self._chunk_list, job.member, self._member_list, job.date, self._date_list, dependency) + skip, (chunk, member, date) = JobList._calculate_dependency_metadata(job.chunk, + self._chunk_list, + job.member, + self._member_list, + job.date, self._date_list, + dependency) if skip: continue section_name = dependencies[key].section - for parent in self._dic_jobs.get_jobs(section_name, job.date, job.member,job.chunk): + for parent in self._dic_jobs.get_jobs(section_name, job.date, job.member, job.chunk): if not monitor: parent.status = Status.WAITING Log.debug("Parent: " + parent.name) @@ -2405,10 +2725,11 @@ class JobList(object): allJobs = self.get_all() if existingList is None else existingList # Header result = (bcolors.BOLD if nocolor is False else '') + \ - "## String representation of Job List [" + str(len(allJobs)) + "] " + "## String representation of Job List [" + str(len(allJobs)) + "] " if statusChange is not None and len(str(statusChange)) > 0: result += "with " + (bcolors.OKGREEN if nocolor is False else '') + str(len(list(statusChange.keys())) - ) + " Change(s) ##" + (bcolors.ENDC + bcolors.ENDC if nocolor is False else '') + ) + " Change(s) ##" + ( + bcolors.ENDC + bcolors.ENDC if nocolor is False else '') else: result += " ## " @@ -2419,7 +2740,7 @@ class JobList(object): if len(job.parents) == 0: roots.append(job) visited = list() - #print(root) + # print(root) # root exists for root in roots: if root is not None and len(str(root)) > 0: @@ -2480,17 +2801,17 @@ class JobList(object): prefix += "| " # Prefix + Job Name result = "\n" + prefix + \ - (bcolors.BOLD + bcolors.CODE_TO_COLOR[job.status] if nocolor is False else '') + \ - job.name + \ - (bcolors.ENDC + bcolors.ENDC if nocolor is False else '') + (bcolors.BOLD + bcolors.CODE_TO_COLOR[job.status] if nocolor is False else '') + \ + job.name + \ + (bcolors.ENDC + bcolors.ENDC if nocolor is False else '') if len(job._children) > 0: level += 1 children = job._children total_children = len(job._children) # Writes children number and status if color are not being showed result += " ~ [" + str(total_children) + (" children] " if total_children > 1 else " child] ") + \ - ("[" + 
Status.VALUE_TO_KEY[job.status] + - "] " if nocolor is True else "") + ("[" + Status.VALUE_TO_KEY[job.status] + + "] " if nocolor is True else "") if statusChange is not None and len(str(statusChange)) > 0: # Writes change if performed result += (bcolors.BOLD + @@ -2509,7 +2830,7 @@ class JobList(object): "] " if nocolor is True else "") return result - + @staticmethod def retrieve_packages(BasicConfig, expid, current_jobs=None): """ @@ -2574,7 +2895,8 @@ class JobList(object): return job_to_package, package_to_jobs, package_to_package_id, package_to_symbol @staticmethod - def retrieve_times(status_code, name, tmp_path, make_exception=False, job_times=None, seconds=False, job_data_collection=None): + def retrieve_times(status_code, name, tmp_path, make_exception=False, job_times=None, seconds=False, + job_data_collection=None): """ Retrieve job timestamps from database. :param job_data_collection: @@ -2635,10 +2957,13 @@ class JobList(object): if status_code in [Status.SUSPENDED]: t_submit = t_start = t_finish = 0 - return JobRow(job_data.job_name, int(queue_time), int(running_time), status, energy, JobList.ts_to_datetime(t_submit), JobList.ts_to_datetime(t_start), JobList.ts_to_datetime(t_finish), job_data.ncpus, job_data.run_id) + return JobRow(job_data.job_name, int(queue_time), int(running_time), status, energy, + JobList.ts_to_datetime(t_submit), JobList.ts_to_datetime(t_start), + JobList.ts_to_datetime(t_finish), job_data.ncpus, job_data.run_id) # Using standard procedure - if status_code in [Status.RUNNING, Status.SUBMITTED, Status.QUEUING, Status.FAILED] or make_exception is True: + if status_code in [Status.RUNNING, Status.SUBMITTED, Status.QUEUING, + Status.FAILED] or make_exception is True: # COMPLETED adds too much overhead so these values are now stored in a database and retrieved separately submit_time, start_time, finish_time, status = JobList._job_running_check( status_code, name, tmp_path) @@ -2651,7 +2976,7 @@ class JobList(object): Status.FAILED] else 0 else: queuing_for_min = ( - datetime.datetime.now() - submit_time) + datetime.datetime.now() - submit_time) running_for_min = datetime.datetime.now() - datetime.datetime.now() submit_time = mktime(submit_time.timetuple()) start_time = 0 @@ -2685,9 +3010,9 @@ class JobList(object): return seconds_queued = seconds_queued * \ - (-1) if seconds_queued < 0 else seconds_queued + (-1) if seconds_queued < 0 else seconds_queued seconds_running = seconds_running * \ - (-1) if seconds_running < 0 else seconds_running + (-1) if seconds_running < 0 else seconds_running if seconds is False: queue_time = math.ceil( seconds_queued / 60) if seconds_queued > 0 else 0 @@ -2697,17 +3022,17 @@ class JobList(object): queue_time = seconds_queued running_time = seconds_running - return JobRow(name, - int(queue_time), - int(running_time), - status, - energy, - JobList.ts_to_datetime(submit_time), - JobList.ts_to_datetime(start_time), - JobList.ts_to_datetime(finish_time), - 0, - 0) - + return JobRow(name, + int(queue_time), + int(running_time), + status, + energy, + JobList.ts_to_datetime(submit_time), + JobList.ts_to_datetime(start_time), + JobList.ts_to_datetime(finish_time), + 0, + 0) + @staticmethod def _job_running_check(status_code, name, tmp_path): """ @@ -2778,7 +3103,7 @@ class JobList(object): if len(values) > 3 and current_status != status_from_job and current_status != "NA": current_status = "SUSPICIOUS" return submit_time, start_time, finish_time, current_status - + @staticmethod def ts_to_datetime(timestamp): if timestamp and 
timestamp > 0:
@@ -2786,4 +3111,4 @@
             #     timestamp).strftime('%Y-%m-%d %H:%M:%S'))
             return datetime.datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
         else:
-            return None
\ No newline at end of file
+            return None
diff --git a/autosubmit/monitor/monitor.py b/autosubmit/monitor/monitor.py
index 8b8bffc557396e8fe870fddb2bf478b950060ccc..bb6b841ca911d0f782e13c21d64f8864187144bb 100644
--- a/autosubmit/monitor/monitor.py
+++ b/autosubmit/monitor/monitor.py
@@ -233,6 +233,48 @@ class Monitor:
         Log.debug('Graph definition finalized')
         return graph

+    def _check_final_status(self, job, child):
+        """
+        Returns the edge (color, label) for a start-condition dependency between job and child,
+        or (None, None) when the edge has no special status attached.
+        child.edge_info maps a status key to {parent name: (parent job, FROM_STEP)},
+        and self._table maps a status to its plot color.
+        """
+        label = None
+        if len(child.edge_info) > 0:
+            if job.name in child.edge_info.get("FAILED", {}):
+                color = self._table.get(Status.FAILED, None)
+                label = child.edge_info["FAILED"].get(job.name, 0)[1]
+            elif job.name in child.edge_info.get("RUNNING", {}):
+                color = self._table.get(Status.RUNNING, None)
+                label = child.edge_info["RUNNING"].get(job.name, 0)[1]
+            elif job.name in child.edge_info.get("QUEUING", {}):
+                color = self._table.get(Status.QUEUING, None)
+            elif job.name in child.edge_info.get("HELD", {}):
+                color = self._table.get(Status.HELD, None)
+            elif job.name in child.edge_info.get("DELAYED", {}):
+                color = self._table.get(Status.DELAYED, None)
+            elif job.name in child.edge_info.get("UNKNOWN", {}):
+                color = self._table.get(Status.UNKNOWN, None)
+            elif job.name in child.edge_info.get("SUSPENDED", {}):
+                color = self._table.get(Status.SUSPENDED, None)
+            elif job.name in child.edge_info.get("SKIPPED", {}):
+                color = self._table.get(Status.SKIPPED, None)
+            elif job.name in child.edge_info.get("WAITING", {}):
+                color = self._table.get(Status.WAITING, None)
+            elif job.name in child.edge_info.get("READY", {}):
+                color = self._table.get(Status.READY, None)
+            elif job.name in child.edge_info.get("SUBMITTED", {}):
+                color = self._table.get(Status.SUBMITTED, None)
+            else:
+                return None, None
+            if label == 0:
+                # FROM_STEP == 0 means no checkpoint step was requested, so no label is drawn
+                label = None
+            return color, label
+        else:
+            return None, None
+
     def _add_children(self, job, exp, node_job, groups, hide_groups):
         if job in self.nodes_plotted:
             return
@@ -241,20 +283,29 @@
         for child in sorted(job.children, key=lambda k: k.name):
             node_child, skip = self._check_node_exists(
                 exp, child, groups, hide_groups)
+            color, label = self._check_final_status(job, child)
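+            # color is the plot color of the parent status the child waits for, and label
+            # is the FROM_STEP checkpoint value (when set); both are None for plain edges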
node_child, style="dashed", color=color)) else: exp.add_edge(pydotplus.Edge(node_job, node_child)) skip = True diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 1f23cc6fcaf1998ca852dcc004db972e347a9e38..79ea7919aa9b2ecdf427f732ac5961075397c44f 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -1,5 +1,6 @@ import locale import os +from pathlib import Path import traceback from autosubmit.job.job_common import Status @@ -512,6 +513,22 @@ class Platform(object): (job_out_filename, job_err_filename) = remote_logs self.get_files([job_out_filename, job_err_filename], False, 'LOG_{0}'.format(exp_id)) + def get_checkpoint_files(self, job): + """ + Get all the checkpoint files of a job + :param job: Get the checkpoint files + :type job: Job + :param max_step: max step possible + :type max_step: int + """ + + if job.current_checkpoint_step < job.max_checkpoint_step: + remote_checkpoint_path = f'{self.get_files_path()}/CHECKPOINT_' + self.get_file(f'{remote_checkpoint_path}{str(job.current_checkpoint_step)}', False, ignore_log=True) + while self.check_file_exists(f'{remote_checkpoint_path}{str(job.current_checkpoint_step)}') and job.current_checkpoint_step < job.max_checkpoint_step: + self.remove_checkpoint_file(f'{remote_checkpoint_path}{str(job.current_checkpoint_step)}') + job.current_checkpoint_step += 1 + self.get_file(f'{remote_checkpoint_path}{str(job.current_checkpoint_step)}', False, ignore_log=True) def get_completed_files(self, job_name, retries=0, recovery=False, wrapper_failed=False): """ Get the COMPLETED file of the given job @@ -582,6 +599,15 @@ class Platform(object): Log.debug('{0} been removed', filename) return True return False + def remove_checkpoint_file(self, filename): + """ + Removes *CHECKPOINT* files from remote + + :param job_name: name of job to check + :return: True if successful, False otherwise + """ + if self.check_file_exists(filename): + self.delete_file(filename) def check_file_exists(self, src, wrapper_failed=False): return True diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index e867ff062cbcd573a3f6aca57f51116de788b53a..a64d386e8375f9e19a4f01f1b2a5b76751503b0a 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -18,18 +18,16 @@ # along with Autosubmit. If not, see . 
 import locale
 import os
-from contextlib import suppress
-from time import sleep
+from datetime import datetime
 from time import mktime
+from time import sleep
 from time import time
-from datetime import datetime
 from typing import List, Union
-
 from xml.dom.minidom import parseString

 from autosubmit.job.job_common import Status, parse_output_number
-from autosubmit.platforms.paramiko_platform import ParamikoPlatform
 from autosubmit.platforms.headers.slurm_header import SlurmHeader
+from autosubmit.platforms.paramiko_platform import ParamikoPlatform
 from autosubmit.platforms.wrappers.wrapper_factory import SlurmWrapperFactory
 from log.log import AutosubmitCritical, AutosubmitError, Log
@@ -88,8 +86,8 @@ class SlurmPlatform(ParamikoPlatform):
         try:
             jobs_id = self.submit_Script(hold=hold)
         except AutosubmitError as e:
-            Log.error(f'TRACE:{e.trace}\n{e.message}')
             jobnames = [job.name for job in valid_packages_to_submit[0].jobs]
+            Log.error(f'TRACE:{e.trace}\n{e.message} JOBS:{jobnames}')
             for jobname in jobnames:
                 jobid = self.get_jobid_by_jobname(jobname)
                 #cancel bad submitted job if jobid is encountered
diff --git a/docs/source/userguide/defining_workflows/fig/splits_1_to_1.png b/docs/source/userguide/defining_workflows/fig/splits_1_to_1.png
new file mode 100644
index 0000000000000000000000000000000000000000..0e85db4e25fbbf3ba36316155235d02377299630
Binary files /dev/null and b/docs/source/userguide/defining_workflows/fig/splits_1_to_1.png differ
diff --git a/docs/source/userguide/defining_workflows/fig/splits_1_to_n.png b/docs/source/userguide/defining_workflows/fig/splits_1_to_n.png
new file mode 100644
index 0000000000000000000000000000000000000000..5e700a40087500a6c39f93e00e9354cf45bb5e60
Binary files /dev/null and b/docs/source/userguide/defining_workflows/fig/splits_1_to_n.png differ
diff --git a/docs/source/userguide/defining_workflows/fig/splits_n_to_1.png b/docs/source/userguide/defining_workflows/fig/splits_n_to_1.png
new file mode 100644
index 0000000000000000000000000000000000000000..ca7f21c5967712d37b2ae1f09e7f36d16bbeb9a0
Binary files /dev/null and b/docs/source/userguide/defining_workflows/fig/splits_n_to_1.png differ
diff --git a/docs/source/userguide/defining_workflows/index.rst b/docs/source/userguide/defining_workflows/index.rst
index 35b4e3859bc877023e0f0ca22d403f1429e6dbe6..257178ee73bedfff9480f23271172a5d05832058 100644
--- a/docs/source/userguide/defining_workflows/index.rst
+++ b/docs/source/userguide/defining_workflows/index.rst
@@ -204,6 +204,123 @@ For the new format, consider that the priority is hierarchy and goes like this D
 * You can define a MEMBERS_FROM inside the DEPENDENCY and DEPENDENCY.DATES_FROM.
 * You can define a CHUNKS_FROM inside the DEPENDENCY, DEPENDENCY.DATES_FROM, DEPENDENCY.MEMBERS_FROM, DEPENDENCY.DATES_FROM.MEMBERS_FROM

+Start conditions
+~~~~~~~~~~~~~~~~
+
+Sometimes you want to run a job only when a certain condition is met. For example, you may want to run a job only while another task is running.
+This can be achieved with the START_CONDITIONS feature, which is built on top of the reworked dependencies system.
+
+Start conditions are declared by adding the ``STATUS`` keyword, and optionally the ``FROM_STEP`` keyword, to any dependency that you want.
+
+The ``STATUS`` keyword selects the status of the dependency that you want to check. The possible values (case-insensitive) are:
+
+* "WAITING": The task is waiting for its dependencies to be completed.
+* "DELAYED": The task is delayed by a delay condition.
+* "PREPARED": The task is prepared to be submitted.
+* "READY": The task is ready to be submitted.
+* "SUBMITTED": The task is submitted.
+* "HELD": The task is held.
+* "QUEUING": The task is queuing.
+* "RUNNING": The task is running.
+* "SKIPPED": The task is skipped.
+* "FAILED": The task has failed.
+* "UNKNOWN": The task is unknown.
+* "COMPLETED": The task is completed. This is the default.
+* "SUSPENDED": The task is suspended.
+
+The statuses are ordered as in the list above, and a condition is considered fulfilled as soon as the parent reaches the selected status or any status that comes after it. For example, with ``STATUS: "RUNNING"`` the dependent task is allowed to start once the parent is in ``RUNNING``, ``SKIPPED``, ``FAILED``, ``UNKNOWN``, ``COMPLETED`` or ``SUSPENDED``.
+
+.. code-block:: yaml
+
+    ini:
+        FILE: ini.sh
+        RUNNING: member
+
+    sim:
+        FILE: sim.sh
+        DEPENDENCIES: ini sim-1
+        RUNNING: chunk
+
+    postprocess:
+        FILE: postprocess.sh
+        DEPENDENCIES:
+            SIM:
+                STATUS: "RUNNING"
+        RUNNING: chunk
+
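+Any status in the list can be used. For instance, a recovery task that should start as soon as its parent reports FAILED could be declared as in the following sketch (section and file names are illustrative):
+
+.. code-block:: yaml
+
+    sim:
+        FILE: sim.sh
+        RUNNING: chunk
+
+    recover_sim:
+        FILE: recover_sim.sh
+        RUNNING: chunk
+        DEPENDENCIES:
+            SIM:
+                STATUS: "FAILED"
+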
+* "PREPARED": The task is prepared to be submitted. +* "READY": The task is ready to be submitted. +* "SUBMITTED": The task is submitted. +* "HELD": The task is held. +* "QUEUING": The task is queuing. +* "RUNNING": The task is running. +* "SKIPPED": The task is skipped. +* "FAILED": The task is failed. +* "UNKNOWN": The task is unknown. +* "COMPLETED": The task is completed. # Default +* "SUSPENDED": The task is suspended. + +The status are ordered, so if you select "RUNNING" status, the task will be run if the parent is in any of the following statuses: "RUNNING", "QUEUING", "HELD", "SUBMITTED", "READY", "PREPARED", "DELAYED", "WAITING". + +.. code-block:: yaml + + ini: + FILE: ini.sh + RUNNING: member + + sim: + FILE: sim.sh + DEPENDENCIES: ini sim-1 + RUNNING: chunk + + postprocess: + FILE: postprocess.sh + DEPENDENCIES: + SIM: + STATUS: "RUNNING" + RUNNING: chunk + + +The ``FROM_STEP`` keyword can be used to select the **internal** step of the dependency that you want to check. The possible value is an integer. Additionally, the target dependency, must call to `%AS_CHECKPOINT%` inside their scripts. This will create a checkpoint that will be used to check the amount of steps processed. + +.. code-block:: yaml + + A: + FILE: a.sh + RUNNING: once + SPLITS: 2 + A_2: + FILE: a_2.sh + RUNNING: once + DEPENDENCIES: + A: + SPLIT_TO: "2" + STATUS: "RUNNING" + FROM_STEP: 2 + +There is now a new function that is automatically added in your scripts which is called ``as_checkpoint``. This is the function that is generating the checkpoint file. You can see the function below: + +.. code-block:: bash + + ################### + # AS CHECKPOINT FUNCTION + ################### + # Creates a new checkpoint file upon call based on the current numbers of calls to the function + + AS_CHECKPOINT_CALLS=0 + function as_checkpoint { + AS_CHECKPOINT_CALLS=$((AS_CHECKPOINT_CALLS+1)) + touch ${job_name_ptrn}_CHECKPOINT_${AS_CHECKPOINT_CALLS} + } + +And what you would have to include in your target dependency or dependencies is the call to this function which in this example is a.sh. + +The amount of calls is strongly related to the ``FROM_STEP`` value. + +``$expid/proj/$projname/as.sh`` + +.. code-block:: bash + + ##compute somestuff + as_checkpoint + ## compute some more stuff + as_checkpoint + + +To select an specific task, you have to combine the ``STATUS`` and ``CHUNKS_TO`` , ``MEMBERS_TO`` and ``DATES_TO``, ``SPLITS_TO`` keywords. + +.. code-block:: yaml + + A: + FILE: a + RUNNING: once + SPLITS: 1 + B: + FILE: b + RUNNING: once + SPLITS: 2 + DEPENDENCIES: A + C: + FILE: c + RUNNING: once + SPLITS: 1 + DEPENDENCIES: B + RECOVER_B_2: + FILE: fix_b + RUNNING: once + DEPENDENCIES: + B: + SPLIT_TO: "2" + STATUS: "RUNNING" + Job frequency ~~~~~~~~~~~~~ @@ -318,46 +435,100 @@ The resulting workflow of setting SYNCHRONIZE parameter to 'date' can be seen in Job split ~~~~~~~~~ -For jobs running at chunk level, it may be useful to split each chunk into different parts. +For jobs running at any level, it may be useful to split each task into different parts. This behaviour can be achieved using the SPLITS attribute to specify the number of parts. -It is possible to define dependencies to specific splits within [], as well as to a list/range of splits, -in the format [1:3,7,10] or [1,2,3] +It is also possible to specify the splits for each task using the SPLITS_FROM and SPLITS_TO attributes. -.. hint:: - This job parameter works with jobs with RUNNING parameter equals to 'chunk'. 
+There is also a special character, '*', that can be used to declare a 1-to-1 dependency between splits. In order to use this character, you have to specify both the SPLITS_FROM and SPLITS_TO attributes.

 .. code-block:: yaml

     ini:
         FILE: ini.sh
-        RUNNING: member
+        RUNNING: once

     sim:
         FILE: sim.sh
         DEPENDENCIES: ini sim-1
-        RUNNING: chunk
+        RUNNING: once

     asim:
         FILE: asim.sh
         DEPENDENCIES: sim
-        RUNNING: chunk
+        RUNNING: once
         SPLITS: 3

     post:
         FILE: post.sh
-        RUNNING: chunk
-        DEPENDENCIES: asim1: asim1:+1
+        RUNNING: once
+        DEPENDENCIES:
+            asim:
+                SPLITS_FROM:
+                    2,3: # [2:3] is also valid
+                        splits_to: 1,2*,3* # 1,[2:3]* is also valid, you can also specify the step with [2:3:step]
         SPLITS: 3

-The resulting workflow can be seen in Figure :numref:`split`
+In this example:

-.. figure:: fig/split.png
-   :name: split
+The post job is split into three parts.
+Splits 2 and 3 of the post job take their dependencies from the SPLITS_FROM block: each of them depends on the 1st part of the asim job.
+In addition, the '*' entries declare 1-to-1 links: the 2nd part of the post job depends on the 2nd part of the asim job, and the 3rd part on the 3rd part of the asim job.
+
+.. figure:: fig/splits_1_to_1.png
+   :name: split_1_to_1
    :width: 100%
    :align: center
-   :alt: simple workflow plot
+   :alt: 1-to-1

-   Example showing the job ASIM divided into 3 parts for each chunk.
+Example 2: N-to-1 dependency
+
+.. code-block:: yaml
+
+    TEST:
+        FILE: TEST.sh
+        RUNNING: once
+        SPLITS: '4'
+    TEST2:
+        FILE: TEST2.sh
+        DEPENDENCIES:
+            TEST:
+                SPLITS_FROM:
+                    "[1:2]":
+                        SPLITS_TO: "[1:4]*\\2"
+        RUNNING: once
+        SPLITS: '2'
+
+.. figure:: fig/splits_n_to_1.png
+   :name: N_to_1
+   :width: 100%
+   :align: center
+   :alt: N_to_1
+
+Example 3: 1-to-N dependency
+
+.. code-block:: yaml
+
+    TEST:
+        FILE: TEST.sh
+        RUNNING: once
+        SPLITS: '2'
+    TEST2:
+        FILE: TEST2.sh
+        DEPENDENCIES:
+            TEST:
+                SPLITS_FROM:
+                    "[1:4]":
+                        SPLITS_TO: "[1:2]*\\2"
+        RUNNING: once
+        SPLITS: '4'
+
+.. 
figure:: fig/splits_1_to_n.png + :name: 1_to_N + :width: 100% + :align: center + :alt: 1_to_N Job delay ~~~~~~~~~ diff --git a/test/unit/test_checkpoints.py b/test/unit/test_checkpoints.py new file mode 100644 index 0000000000000000000000000000000000000000..35dca3350841c6a7a0d38ea7c72811893cbd569c --- /dev/null +++ b/test/unit/test_checkpoints.py @@ -0,0 +1,170 @@ +from unittest import TestCase + +import inspect +import shutil +import tempfile +from mock import Mock, MagicMock +from random import randrange + +from autosubmit.job.job import Job +from autosubmit.job.job_common import Status +from autosubmit.job.job_list import JobList +from autosubmit.job.job_list_persistence import JobListPersistenceDb +from autosubmitconfigparser.config.yamlparser import YAMLParserFactory + + +class TestJobList(TestCase): + def setUp(self): + self.experiment_id = 'random-id' + self.as_conf = Mock() + self.as_conf.experiment_data = dict() + self.as_conf.experiment_data["JOBS"] = dict() + self.as_conf.jobs_data = self.as_conf.experiment_data["JOBS"] + self.as_conf.experiment_data["PLATFORMS"] = dict() + self.temp_directory = tempfile.mkdtemp() + self.job_list = JobList(self.experiment_id, FakeBasicConfig, YAMLParserFactory(), + JobListPersistenceDb(self.temp_directory, 'db'), self.as_conf) + dummy_serial_platform = MagicMock() + dummy_serial_platform.name = 'serial' + dummy_platform = MagicMock() + dummy_platform.serial_platform = dummy_serial_platform + dummy_platform.name = 'dummy_platform' + # creating jobs for self list + self.completed_job = self._createDummyJobWithStatus(Status.COMPLETED) + self.completed_job.platform = dummy_platform + self.completed_job2 = self._createDummyJobWithStatus(Status.COMPLETED) + self.completed_job2.platform = dummy_platform + self.completed_job3 = self._createDummyJobWithStatus(Status.COMPLETED) + self.completed_job3.platform = dummy_platform + self.completed_job4 = self._createDummyJobWithStatus(Status.COMPLETED) + self.completed_job4.platform = dummy_platform + self.submitted_job = self._createDummyJobWithStatus(Status.SUBMITTED) + self.submitted_job.platform = dummy_platform + self.submitted_job2 = self._createDummyJobWithStatus(Status.SUBMITTED) + self.submitted_job2.platform = dummy_platform + self.submitted_job3 = self._createDummyJobWithStatus(Status.SUBMITTED) + self.submitted_job3.platform = dummy_platform + + self.running_job = self._createDummyJobWithStatus(Status.RUNNING) + self.running_job.platform = dummy_platform + self.running_job2 = self._createDummyJobWithStatus(Status.RUNNING) + self.running_job2.platform = dummy_platform + + self.queuing_job = self._createDummyJobWithStatus(Status.QUEUING) + self.queuing_job.platform = dummy_platform + + self.failed_job = self._createDummyJobWithStatus(Status.FAILED) + self.failed_job.platform = dummy_platform + self.failed_job2 = self._createDummyJobWithStatus(Status.FAILED) + self.failed_job2.platform = dummy_platform + self.failed_job3 = self._createDummyJobWithStatus(Status.FAILED) + self.failed_job3.platform = dummy_platform + self.failed_job4 = self._createDummyJobWithStatus(Status.FAILED) + self.failed_job4.platform = dummy_platform + self.ready_job = self._createDummyJobWithStatus(Status.READY) + self.ready_job.platform = dummy_platform + self.ready_job2 = self._createDummyJobWithStatus(Status.READY) + self.ready_job2.platform = dummy_platform + self.ready_job3 = self._createDummyJobWithStatus(Status.READY) + self.ready_job3.platform = dummy_platform + + self.waiting_job = 
self._createDummyJobWithStatus(Status.WAITING)
+        self.waiting_job.platform = dummy_platform
+        self.waiting_job2 = self._createDummyJobWithStatus(Status.WAITING)
+        self.waiting_job2.platform = dummy_platform
+
+        self.unknown_job = self._createDummyJobWithStatus(Status.UNKNOWN)
+        self.unknown_job.platform = dummy_platform
+
+        self.job_list._job_list = [self.completed_job, self.completed_job2, self.completed_job3, self.completed_job4,
+                                   self.submitted_job, self.submitted_job2, self.submitted_job3, self.running_job,
+                                   self.running_job2, self.queuing_job, self.failed_job, self.failed_job2,
+                                   self.failed_job3, self.failed_job4, self.ready_job, self.ready_job2,
+                                   self.ready_job3, self.waiting_job, self.waiting_job2, self.unknown_job]
+        self.waiting_job.parents.add(self.ready_job)
+        self.waiting_job.parents.add(self.completed_job)
+        self.waiting_job.parents.add(self.failed_job)
+        self.waiting_job.parents.add(self.submitted_job)
+        self.waiting_job.parents.add(self.running_job)
+        self.waiting_job.parents.add(self.queuing_job)
+
+    def tearDown(self) -> None:
+        shutil.rmtree(self.temp_directory)
+
+    def test_add_edge_job(self):
+        special_variables = dict()
+        special_variables["STATUS"] = Status.VALUE_TO_KEY[Status.COMPLETED]
+        special_variables["FROM_STEP"] = 0
+        for p in self.waiting_job.parents:
+            self.waiting_job.add_edge_info(p, special_variables)
+        for parent in self.waiting_job.parents:
+            self.assertEqual(self.waiting_job.edge_info[special_variables["STATUS"]][parent.name],
+                             (parent, special_variables.get("FROM_STEP", 0)))
+
+    def test_add_edge_info_joblist(self):
+        special_conditions = dict()
+        special_conditions["STATUS"] = Status.VALUE_TO_KEY[Status.COMPLETED]
+        special_conditions["FROM_STEP"] = 0
+        self.job_list._add_edge_info(self.waiting_job, special_conditions["STATUS"])
+        self.assertEqual(len(self.job_list.jobs_edges.get(Status.VALUE_TO_KEY[Status.COMPLETED], [])), 1)
+        self.job_list._add_edge_info(self.waiting_job2, special_conditions["STATUS"])
+        self.assertEqual(len(self.job_list.jobs_edges.get(Status.VALUE_TO_KEY[Status.COMPLETED], [])), 2)
+
+    def test_check_special_status(self):
+        self.waiting_job.edge_info = dict()
+
+        self.job_list.jobs_edges = dict()
+        # Adds edge info for waiting_job in the list
+        self.job_list._add_edge_info(self.waiting_job, Status.VALUE_TO_KEY[Status.COMPLETED])
+        self.job_list._add_edge_info(self.waiting_job, Status.VALUE_TO_KEY[Status.READY])
+        self.job_list._add_edge_info(self.waiting_job, Status.VALUE_TO_KEY[Status.RUNNING])
+        self.job_list._add_edge_info(self.waiting_job, Status.VALUE_TO_KEY[Status.SUBMITTED])
+        self.job_list._add_edge_info(self.waiting_job, Status.VALUE_TO_KEY[Status.QUEUING])
+        self.job_list._add_edge_info(self.waiting_job, Status.VALUE_TO_KEY[Status.FAILED])
+        # Adds edge info for waiting_job
+        special_variables = dict()
+        for p in self.waiting_job.parents:
+            special_variables["STATUS"] = Status.VALUE_TO_KEY[p.status]
+            special_variables["FROM_STEP"] = 0
+            self.waiting_job.add_edge_info(p, special_variables)
+        # call to special status
+        jobs_to_check = self.job_list.check_special_status()
+        for job in jobs_to_check:
+            tmp = [parent for parent in job.parents if
+                   parent.status == Status.COMPLETED or parent in self.job_list.jobs_edges["ALL"]]
+            assert len(tmp) == len(job.parents)
+        self.waiting_job.add_parent(self.waiting_job2)
+        for job in jobs_to_check:
+            tmp = [parent for parent in job.parents if
+                   parent.status == Status.COMPLETED or parent in self.job_list.jobs_edges["ALL"]]
+            assert len(tmp) == len(job.parents)
+
+    def 
_createDummyJobWithStatus(self, status): + job_name = str(randrange(999999, 999999999)) + job_id = randrange(1, 999) + job = Job(job_name, job_id, status, 0) + job.type = randrange(0, 2) + return job + +class FakeBasicConfig: + def __init__(self): + pass + def props(self): + pr = {} + for name in dir(self): + value = getattr(self, name) + if not name.startswith('__') and not inspect.ismethod(value) and not inspect.isfunction(value): + pr[name] = value + return pr + DB_DIR = '/dummy/db/dir' + DB_FILE = '/dummy/db/file' + DB_PATH = '/dummy/db/path' + LOCAL_ROOT_DIR = '/dummy/local/root/dir' + LOCAL_TMP_DIR = '/dummy/local/temp/dir' + LOCAL_PROJ_DIR = '/dummy/local/proj/dir' + DEFAULT_PLATFORMS_CONF = '' + DEFAULT_JOBS_CONF = '' diff --git a/test/unit/test_dependencies.py b/test/unit/test_dependencies.py index a08a4a73d713e0ce3bd103eb2bba5be1ab698edf..ab8b4e35726bfc60b5a44f1039d677062428e7f9 100644 --- a/test/unit/test_dependencies.py +++ b/test/unit/test_dependencies.py @@ -1,23 +1,67 @@ -import unittest - +import copy +import inspect import mock +import tempfile +import unittest from copy import deepcopy -from autosubmit.job.job_list import JobList +from datetime import datetime + from autosubmit.job.job import Job from autosubmit.job.job_common import Status -from datetime import datetime +from autosubmit.job.job_list import JobList +from autosubmit.job.job_list_persistence import JobListPersistenceDb +from autosubmitconfigparser.config.yamlparser import YAMLParserFactory + + +class FakeBasicConfig: + def __init__(self): + pass + def props(self): + pr = {} + for name in dir(self): + value = getattr(self, name) + if not name.startswith('__') and not inspect.ismethod(value) and not inspect.isfunction(value): + pr[name] = value + return pr + DB_DIR = '/dummy/db/dir' + DB_FILE = '/dummy/db/file' + DB_PATH = '/dummy/db/path' + LOCAL_ROOT_DIR = '/dummy/local/root/dir' + LOCAL_TMP_DIR = '/dummy/local/temp/dir' + LOCAL_PROJ_DIR = '/dummy/local/proj/dir' + DEFAULT_PLATFORMS_CONF = '' + DEFAULT_JOBS_CONF = '' + class TestJobList(unittest.TestCase): def setUp(self): + self.experiment_id = 'random-id' + self.as_conf = mock.Mock() + self.as_conf.experiment_data = dict() + self.as_conf.experiment_data["JOBS"] = dict() + self.as_conf.jobs_data = self.as_conf.experiment_data["JOBS"] + self.as_conf.experiment_data["PLATFORMS"] = dict() + self.temp_directory = tempfile.mkdtemp() + self.JobList = JobList(self.experiment_id, FakeBasicConfig, YAMLParserFactory(), + JobListPersistenceDb(self.temp_directory, 'db'), self.as_conf) + self.date_list = ["20020201", "20020202", "20020203", "20020204", "20020205", "20020206", "20020207", "20020208", "20020209", "20020210"] + self.member_list = ["fc1", "fc2", "fc3", "fc4", "fc5", "fc6", "fc7", "fc8", "fc9", "fc10"] + self.chunk_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + self.split_list = [1, 2, 3, 4, 5] + self.JobList._date_list = self.date_list + self.JobList._member_list = self.member_list + self.JobList._chunk_list = self.chunk_list + self.JobList._split_list = self.split_list + + # Define common test case inputs here self.relationships_dates = { - "OPTIONAL": False, "DATES_FROM": { "20020201": { "MEMBERS_FROM": { "fc2": { - "DATES_TO": "20020201", + "DATES_TO": "[20020201:20020202]*,20020203", "MEMBERS_TO": "fc2", - "CHUNKS_TO": "ALL" + "CHUNKS_TO": "all" } }, "SPLITS_FROM": { @@ -29,18 +73,17 @@ class TestJobList(unittest.TestCase): } } self.relationships_dates_optional = deepcopy(self.relationships_dates) - 
self.relationships_dates_optional["DATES_FROM"]["20020201"]["MEMBERS_FROM"] = { "fc2?": { "DATES_TO": "20020201", "MEMBERS_TO": "fc2", "CHUNKS_TO": "ALL", "SPLITS_TO": "5" } } + self.relationships_dates_optional["DATES_FROM"]["20020201"]["MEMBERS_FROM"] = { "fc2?": { "DATES_TO": "20020201", "MEMBERS_TO": "fc2", "CHUNKS_TO": "all", "SPLITS_TO": "5" } } self.relationships_dates_optional["DATES_FROM"]["20020201"]["SPLITS_FROM"] = { "ALL": { "SPLITS_TO": "1?" } } self.relationships_members = { - "OPTIONAL": False, "MEMBERS_FROM": { "fc2": { "SPLITS_FROM": { "ALL": { "DATES_TO": "20020201", "MEMBERS_TO": "fc2", - "CHUNKS_TO": "ALL", + "CHUNKS_TO": "all", "SPLITS_TO": "1" } } @@ -48,23 +91,21 @@ class TestJobList(unittest.TestCase): } } self.relationships_chunks = { - "OPTIONAL": False, "CHUNKS_FROM": { "1": { "DATES_TO": "20020201", "MEMBERS_TO": "fc2", - "CHUNKS_TO": "ALL", + "CHUNKS_TO": "all", "SPLITS_TO": "1" } } } self.relationships_chunks2 = { - "OPTIONAL": False, "CHUNKS_FROM": { "1": { "DATES_TO": "20020201", "MEMBERS_TO": "fc2", - "CHUNKS_TO": "ALL", + "CHUNKS_TO": "all", "SPLITS_TO": "1" }, "2": { @@ -77,14 +118,12 @@ class TestJobList(unittest.TestCase): } } - self.relationships_splits = { - "OPTIONAL": False, "SPLITS_FROM": { "1": { "DATES_TO": "20020201", "MEMBERS_TO": "fc2", - "CHUNKS_TO": "ALL", + "CHUNKS_TO": "all", "SPLITS_TO": "1" } } @@ -93,9 +132,15 @@ class TestJobList(unittest.TestCase): self.relationships_general = { "DATES_TO": "20020201", "MEMBERS_TO": "fc2", - "CHUNKS_TO": "ALL", + "CHUNKS_TO": "all", "SPLITS_TO": "1" } + self.relationships_general_1_to_1 = { + "DATES_TO": "20020201", + "MEMBERS_TO": "fc2", + "CHUNKS_TO": "all", + "SPLITS_TO": "1*,2*,3*,4*,5*" + } # Create a mock Job object self.mock_job = mock.MagicMock(spec=Job) @@ -108,103 +153,191 @@ class TestJobList(unittest.TestCase): self.mock_job.member = None self.mock_job.chunk = None self.mock_job.split = None + + def test_unify_to_filter(self): + """Test the _unify_to_fitler function""" + # :param unified_filter: Single dictionary with all filters_to + # :param filter_to: Current dictionary that contains the filters_to + # :param filter_type: "DATES_TO", "MEMBERS_TO", "CHUNKS_TO", "SPLITS_TO" + # :return: unified_filter + unified_filter = \ + { + "DATES_TO": "20020201", + "MEMBERS_TO": "fc2", + "CHUNKS_TO": "all", + "SPLITS_TO": "1" + } + filter_to = \ + { + "DATES_TO": "20020205,[20020207:20020208]", + "MEMBERS_TO": "fc2,fc3", + "CHUNKS_TO": "all" + } + filter_type = "DATES_TO" + result = self.JobList._unify_to_filter(unified_filter, filter_to, filter_type) + expected_output = \ + { + "DATES_TO": "20020201,20020205,20020207,20020208,", + "MEMBERS_TO": "fc2", + "CHUNKS_TO": "all", + "SPLITS_TO": "1" + } + self.assertEqual(result, expected_output) + def test_simple_dependency(self): - result_d = JobList._check_dates({}, self.mock_job) - result_m = JobList._check_members({}, self.mock_job) - result_c = JobList._check_chunks({}, self.mock_job) - result_s = JobList._check_splits({}, self.mock_job) + result_d = self.JobList._check_dates({}, self.mock_job) + result_m = self.JobList._check_members({}, self.mock_job) + result_c = self.JobList._check_chunks({}, self.mock_job) + result_s = self.JobList._check_splits({}, self.mock_job) self.assertEqual(result_d, {}) self.assertEqual(result_m, {}) self.assertEqual(result_c, {}) self.assertEqual(result_s, {}) - def test_check_dates_optional(self): - self.mock_job.date = datetime.strptime("20020201", "%Y%m%d") - self.mock_job.member = "fc2" - self.mock_job.chunk = 1 - 
self.mock_job.split = 1 - result = JobList._check_dates(self.relationships_dates_optional, self.mock_job) - expected_output = { - "DATES_TO": "20020201?", - "MEMBERS_TO": "fc2?", - "CHUNKS_TO": "ALL?", - "SPLITS_TO": "1?" - } + + def test_parse_filters_to_check(self): + """Test the _parse_filters_to_check function""" + result = self.JobList._parse_filters_to_check("20020201,20020202,20020203",self.date_list) + expected_output = ["20020201","20020202","20020203"] + self.assertEqual(result, expected_output) + result = self.JobList._parse_filters_to_check("20020201,[20020203:20020205]",self.date_list) + expected_output = ["20020201","20020203","20020204","20020205"] + self.assertEqual(result, expected_output) + result = self.JobList._parse_filters_to_check("[20020201:20020203],[20020205:20020207]",self.date_list) + expected_output = ["20020201","20020202","20020203","20020205","20020206","20020207"] + self.assertEqual(result, expected_output) + result = self.JobList._parse_filters_to_check("20020201",self.date_list) + expected_output = ["20020201"] + self.assertEqual(result, expected_output) + + def test_parse_filter_to_check(self): + # Call the function to get the result + # Value can have the following formats: + # a range: [0:], [:N], [0:N], [:-1], [0:N:M] ... + # a value: N + # a range with step: [0::M], [::2], [0::3], [::3] ... + result = self.JobList._parse_filter_to_check("20020201",self.date_list) + expected_output = ["20020201"] + self.assertEqual(result, expected_output) + result = self.JobList._parse_filter_to_check("[20020201:20020203]",self.date_list) + expected_output = ["20020201","20020202","20020203"] + self.assertEqual(result, expected_output) + result = self.JobList._parse_filter_to_check("[20020201:20020203:2]",self.date_list) + expected_output = ["20020201","20020203"] + self.assertEqual(result, expected_output) + result = self.JobList._parse_filter_to_check("[20020202:]",self.date_list) + expected_output = self.date_list[1:] + self.assertEqual(result, expected_output) + result = self.JobList._parse_filter_to_check("[:20020203]",self.date_list) + expected_output = self.date_list[:3] + self.assertEqual(result, expected_output) + result = self.JobList._parse_filter_to_check("[::2]",self.date_list) + expected_output = self.date_list[::2] + self.assertEqual(result, expected_output) + result = self.JobList._parse_filter_to_check("[20020203::]",self.date_list) + expected_output = self.date_list[2:] + self.assertEqual(result, expected_output) + result = self.JobList._parse_filter_to_check("[:20020203:]",self.date_list) + expected_output = self.date_list[:3] + self.assertEqual(result, expected_output) + # test with a member N:N + result = self.JobList._parse_filter_to_check("[fc2:fc3]",self.member_list) + expected_output = ["fc2","fc3"] self.assertEqual(result, expected_output) + # test with a chunk + result = self.JobList._parse_filter_to_check("[1:2]",self.chunk_list,level_to_check="CHUNKS_FROM") + expected_output = [1,2] + self.assertEqual(result, expected_output) + # test with a split + result = self.JobList._parse_filter_to_check("[1:2]",self.split_list,level_to_check="SPLITS_FROM") + expected_output = [1,2] + self.assertEqual(result, expected_output) + + def test_check_dates(self): # Call the function to get the result self.mock_job.date = datetime.strptime("20020201", "%Y%m%d") self.mock_job.member = "fc2" self.mock_job.chunk = 1 self.mock_job.split = 1 - result = JobList._check_dates(self.relationships_dates, self.mock_job) + result = 
self.JobList._check_dates(self.relationships_dates, self.mock_job) expected_output = { - "DATES_TO": "20020201", + "DATES_TO": "20020201*,20020202*,20020203", "MEMBERS_TO": "fc2", - "CHUNKS_TO": "ALL", + "CHUNKS_TO": "all", "SPLITS_TO": "1" } self.assertEqual(result, expected_output) - self.mock_job.date = datetime.strptime("20020202", "%Y%m%d") - result = JobList._check_dates(self.relationships_dates, self.mock_job) + # failure: 20020301 is not covered by DATES_FROM + self.mock_job.date = datetime.strptime("20020301", "%Y%m%d") + result = self.JobList._check_dates(self.relationships_dates, self.mock_job) self.assertEqual(result, {}) + + def test_check_members(self): # Call the function to get the result self.mock_job.date = datetime.strptime("20020201", "%Y%m%d") self.mock_job.member = "fc2" - result = JobList._check_members(self.relationships_members, self.mock_job) + result = self.JobList._check_members(self.relationships_members, self.mock_job) expected_output = { "DATES_TO": "20020201", "MEMBERS_TO": "fc2", - "CHUNKS_TO": "ALL", + "CHUNKS_TO": "all", "SPLITS_TO": "1" } self.assertEqual(result, expected_output) self.mock_job.member = "fc3" - result = JobList._check_members(self.relationships_members, self.mock_job) + result = self.JobList._check_members(self.relationships_members, self.mock_job) + self.assertEqual(result, {}) + # failure: fc99 is not covered by MEMBERS_FROM + self.mock_job.member = "fc99" + result = self.JobList._check_members(self.relationships_members, self.mock_job) self.assertEqual(result, {}) + def test_check_splits(self): # Call the function to get the result self.mock_job.split = 1 - result = JobList._check_splits(self.relationships_splits, self.mock_job) + result = self.JobList._check_splits(self.relationships_splits, self.mock_job) expected_output = { "DATES_TO": "20020201", "MEMBERS_TO": "fc2", - "CHUNKS_TO": "ALL", + "CHUNKS_TO": "all", "SPLITS_TO": "1" } self.assertEqual(result, expected_output) self.mock_job.split = 2 - result = JobList._check_splits(self.relationships_splits, self.mock_job) + result = self.JobList._check_splits(self.relationships_splits, self.mock_job) + self.assertEqual(result, {}) + # failure: split 99 is not covered by SPLITS_FROM + self.mock_job.split = 99 + result = self.JobList._check_splits(self.relationships_splits, self.mock_job) self.assertEqual(result, {}) + def test_check_chunks(self): # Call the function to get the result self.mock_job.chunk = 1 - result = JobList._check_chunks(self.relationships_chunks, self.mock_job) + result = self.JobList._check_chunks(self.relationships_chunks, self.mock_job) expected_output = { "DATES_TO": "20020201", "MEMBERS_TO": "fc2", - "CHUNKS_TO": "ALL", + "CHUNKS_TO": "all", "SPLITS_TO": "1" } self.assertEqual(result, expected_output) self.mock_job.chunk = 2 - result = JobList._check_chunks(self.relationships_chunks, self.mock_job) + result = self.JobList._check_chunks(self.relationships_chunks, self.mock_job) self.assertEqual(result, {}) - # test splits_from - self.mock_job.split = 5 - result = JobList._check_chunks(self.relationships_chunks2, self.mock_job) - expected_output2 = { - "SPLITS_TO": "2" - } - self.assertEqual(result, expected_output2) - self.mock_job.split = 1 - result = JobList._check_chunks(self.relationships_chunks2, self.mock_job) + # failure: chunk 99 is not covered by CHUNKS_FROM + self.mock_job.chunk = 99 + result = self.JobList._check_chunks(self.relationships_chunks, self.mock_job) self.assertEqual(result, {}) + + + def test_check_general(self): # Call the function to get the result @@ -212,28 +345,29 @@ class TestJobList(unittest.TestCase): self.mock_job.member = "fc2" self.mock_job.chunk = 1 self.mock_job.split = 1 - result 
= JobList._filter_current_job(self.mock_job,self.relationships_general) + result = self.JobList._filter_current_job(self.mock_job,self.relationships_general) expected_output = { "DATES_TO": "20020201", "MEMBERS_TO": "fc2", - "CHUNKS_TO": "ALL", + "CHUNKS_TO": "all", "SPLITS_TO": "1" } self.assertEqual(result, expected_output) def test_valid_parent(self): - # Call the function to get the result - date_list = ["20020201"] + # Call the function to get the result + date_list = ["20020201", "20020202", "20020203", "20020204", "20020205", "20020206", "20020207", "20020208", "20020209", "20020210"] member_list = ["fc1", "fc2", "fc3"] chunk_list = [1, 2, 3] + self.mock_job.splits = 10 is_a_natural_relation = False # Filter_to values filter_ = { "DATES_TO": "20020201", "MEMBERS_TO": "fc2", - "CHUNKS_TO": "ALL", + "CHUNKS_TO": "all", "SPLITS_TO": "1" } # Parent job values @@ -241,18 +375,192 @@ class TestJobList(unittest.TestCase): self.mock_job.member = "fc2" self.mock_job.chunk = 1 self.mock_job.split = 1 - result = JobList._valid_parent(self.mock_job, date_list, member_list, chunk_list, is_a_natural_relation, filter_) + child = copy.deepcopy(self.mock_job) + result = self.JobList._valid_parent(self.mock_job, member_list, date_list, chunk_list, is_a_natural_relation, filter_,child) - # it returns a tuple, the first element is the result, the second is the optional flag - self.assertEqual(result, (True,False)) + # it now returns a single boolean: True when the parent passes every *_TO filter + self.assertEqual(result, True) filter_ = { "DATES_TO": "20020201", "MEMBERS_TO": "fc2", - "CHUNKS_TO": "ALL", + "CHUNKS_TO": "all", "SPLITS_TO": "1?" } - result = JobList._valid_parent(self.mock_job, date_list, member_list, chunk_list, is_a_natural_relation, filter_) - self.assertEqual(result, (True,True)) + result = self.JobList._valid_parent(self.mock_job, member_list, date_list, chunk_list, is_a_natural_relation, filter_,child) + self.assertEqual(result, True) + filter_ = { + "DATES_TO": "20020201", + "MEMBERS_TO": "fc2", + "CHUNKS_TO": "all", + "SPLITS_TO": "1?" 
+ } + self.mock_job.split = 2 + + result = self.JobList._valid_parent(self.mock_job, member_list, date_list, chunk_list, is_a_natural_relation, filter_,child) + self.assertEqual(result, False) + filter_ = { + "DATES_TO": "[20020201:20020205]", + "MEMBERS_TO": "fc2", + "CHUNKS_TO": "all", + "SPLITS_TO": "1" + } + self.mock_job.split = 1 + result = self.JobList._valid_parent(self.mock_job, member_list, date_list, chunk_list, is_a_natural_relation, filter_,child) + self.assertEqual(result, True) + filter_ = { + "DATES_TO": "[20020201:20020205]", + "MEMBERS_TO": "fc2", + "CHUNKS_TO": "all", + "SPLITS_TO": "1" + } + self.mock_job.date = datetime.strptime("20020206", "%Y%m%d") + result = self.JobList._valid_parent(self.mock_job, member_list, date_list, chunk_list, is_a_natural_relation, filter_,child) + self.assertEqual(result, False) + filter_ = { + "DATES_TO": "[20020201:20020205]", + "MEMBERS_TO": "fc2", + "CHUNKS_TO": "[2:4]", + "SPLITS_TO": "[1:5]" + } + self.mock_job.date = datetime.strptime("20020201", "%Y%m%d") + self.mock_job.chunk = 2 + self.mock_job.split = 1 + result = self.JobList._valid_parent(self.mock_job, member_list, date_list, chunk_list, is_a_natural_relation, filter_,child) + self.assertEqual(result, True) + + + def test_valid_parent_1_to_1(self): + child = copy.deepcopy(self.mock_job) + child.splits = 6 + date_list = ["20020201", "20020202", "20020203", "20020204", "20020205", "20020206", "20020207", "20020208", "20020209", "20020210"] + member_list = ["fc1", "fc2", "fc3"] + chunk_list = [1, 2, 3] + is_a_natural_relation = False + + # Test 1_to_1 + filter_ = { + "DATES_TO": "[20020201:20020202],20020203,20020204,20020205", + "MEMBERS_TO": "fc2", + "CHUNKS_TO": "1,2,3,4,5,6", + "SPLITS_TO": "1*,2*,3*,4*,5*,6" + } + self.mock_job.splits = 6 + self.mock_job.split = 1 + self.mock_job.date = datetime.strptime("20020204", "%Y%m%d") + self.mock_job.chunk = 5 + child.split = 1 + self.mock_job.split = 1 + result = self.JobList._valid_parent(self.mock_job, member_list, date_list, chunk_list, is_a_natural_relation, filter_,child) + self.assertEqual(result, True) + child.split = 2 + result = self.JobList._valid_parent(self.mock_job, member_list, date_list, chunk_list, is_a_natural_relation, filter_,child) + self.assertEqual(result, False) + + def test_valid_parent_1_to_n(self): + self.mock_job.date = datetime.strptime("20020204", "%Y%m%d") + self.mock_job.chunk = 5 + child = copy.deepcopy(self.mock_job) + child.splits = 4 + self.mock_job.splits = 2 + + date_list = ["20020201", "20020202", "20020203", "20020204", "20020205", "20020206", "20020207", "20020208", "20020209", "20020210"] + member_list = ["fc1", "fc2", "fc3"] + chunk_list = [1, 2, 3] + is_a_natural_relation = False + + # Test 1_to_N + filter_ = { + "DATES_TO": "[20020201:20020202],20020203,20020204,20020205", + "MEMBERS_TO": "fc2", + "CHUNKS_TO": "1,2,3,4,5,6", + "SPLITS_TO": "1*\\2,2*\\2" + } + child.split = 1 + self.mock_job.split = 1 + result = self.JobList._valid_parent(self.mock_job, member_list, date_list, chunk_list, is_a_natural_relation, filter_,child) + self.assertEqual(result, True) + child.split = 2 + self.mock_job.split = 1 + result = self.JobList._valid_parent(self.mock_job, member_list, date_list, chunk_list, is_a_natural_relation, filter_,child) + self.assertEqual(result, True) + child.split = 3 + self.mock_job.split = 1 + result = self.JobList._valid_parent(self.mock_job, member_list, date_list, chunk_list, is_a_natural_relation, filter_,child) + self.assertEqual(result, False) + child.split = 4 + 
self.mock_job.split = 1 + result = self.JobList._valid_parent(self.mock_job, member_list, date_list, chunk_list, is_a_natural_relation, filter_,child) + self.assertEqual(result, False) + + child.split = 1 + self.mock_job.split = 2 + result = self.JobList._valid_parent(self.mock_job, member_list, date_list, chunk_list, is_a_natural_relation, filter_,child) + self.assertEqual(result, False) + child.split = 2 + self.mock_job.split = 2 + result = self.JobList._valid_parent(self.mock_job, member_list, date_list, chunk_list, is_a_natural_relation, filter_,child) + self.assertEqual(result, False) + child.split = 3 + self.mock_job.split = 2 + result = self.JobList._valid_parent(self.mock_job, member_list, date_list, chunk_list, is_a_natural_relation, filter_,child) + self.assertEqual(result, True) + child.split = 4 + self.mock_job.split = 2 + result = self.JobList._valid_parent(self.mock_job, member_list, date_list, chunk_list, is_a_natural_relation, filter_,child) + self.assertEqual(result, True) + + def test_valid_parent_n_to_1(self): + self.mock_job.date = datetime.strptime("20020204", "%Y%m%d") + self.mock_job.chunk = 5 + child = copy.deepcopy(self.mock_job) + child.splits = 2 + self.mock_job.splits = 4 + + date_list = ["20020201", "20020202", "20020203", "20020204", "20020205", "20020206", "20020207", "20020208", "20020209", "20020210"] + member_list = ["fc1", "fc2", "fc3"] + chunk_list = [1, 2, 3] + is_a_natural_relation = False + + # Test N_to_1 + filter_ = { + "DATES_TO": "[20020201:20020202],20020203,20020204,20020205", + "MEMBERS_TO": "fc2", + "CHUNKS_TO": "1,2,3,4,5,6", + "SPLITS_TO": "1*\\2,2*\\2,3*\\2,4*\\2" + } + child.split = 1 + self.mock_job.split = 1 + result = self.JobList._valid_parent(self.mock_job, member_list, date_list, chunk_list, is_a_natural_relation, filter_,child) + self.assertEqual(result, True) + child.split = 1 + self.mock_job.split = 2 + result = self.JobList._valid_parent(self.mock_job, member_list, date_list, chunk_list, is_a_natural_relation, filter_,child) + self.assertEqual(result, True) + child.split = 1 + self.mock_job.split = 3 + result = self.JobList._valid_parent(self.mock_job, member_list, date_list, chunk_list, is_a_natural_relation, filter_,child) + self.assertEqual(result, False) + child.split = 1 + self.mock_job.split = 4 + result = self.JobList._valid_parent(self.mock_job, member_list, date_list, chunk_list, is_a_natural_relation, filter_,child) + self.assertEqual(result, False) + + child.split = 2 + self.mock_job.split = 1 + result = self.JobList._valid_parent(self.mock_job, member_list, date_list, chunk_list, is_a_natural_relation, filter_,child) + self.assertEqual(result, False) + child.split = 2 + self.mock_job.split = 2 + result = self.JobList._valid_parent(self.mock_job, member_list, date_list, chunk_list, is_a_natural_relation, filter_,child) + self.assertEqual(result, False) + child.split = 2 + self.mock_job.split = 3 + result = self.JobList._valid_parent(self.mock_job, member_list, date_list, chunk_list, is_a_natural_relation, filter_,child) + self.assertEqual(result, True) + child.split = 2 + self.mock_job.split = 4 + result = self.JobList._valid_parent(self.mock_job, member_list, date_list, chunk_list, is_a_natural_relation, filter_,child) + self.assertEqual(result, True) if __name__ == '__main__': unittest.main() diff --git a/test/unit/test_job.py b/test/unit/test_job.py index caaf9c60a2da9478839ee8cc400f03422ad731f5..f1bfbcbac688a596d7a835e5444d2b6566a3e618 100644 --- a/test/unit/test_job.py +++ b/test/unit/test_job.py @@ -1,17 +1,19 @@ 
from unittest import TestCase + +import datetime +import inspect import os import sys -from autosubmitconfigparser.config.configcommon import AutosubmitConfig -from autosubmit.job.job_common import Status -from autosubmit.job.job import Job -from autosubmit.platforms.platform import Platform from mock import Mock, MagicMock from mock import patch -import datetime - # compatibility with both versions (2 & 3) from sys import version_info +from autosubmit.job.job import Job +from autosubmit.job.job_common import Status +from autosubmit.platforms.platform import Platform +from autosubmitconfigparser.config.configcommon import AutosubmitConfig + if version_info.major == 2: import builtins as builtins else: @@ -362,7 +364,6 @@ class TestJob(TestCase): self.job.date_format = test[1] self.assertEquals(test[2], self.job.sdate) -import inspect class FakeBasicConfig: def __init__(self): pass diff --git a/test/unit/test_job_list.py b/test/unit/test_job_list.py index 0a3f6b3b414b58da78f50b9bedf830c1f99c28f8..e546b764d73f6c1301c9beb694a9d93bbd12af4b 100644 --- a/test/unit/test_job_list.py +++ b/test/unit/test_job_list.py @@ -275,7 +275,6 @@ class TestJobList(TestCase): job.type = randrange(0, 2) return job -import inspect class FakeBasicConfig: def __init__(self): pass
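
The inclusive [start:end:step] expressions exercised by test_parse_filter_to_check resolve against the experiment's date/member/chunk/split lists by value, with both endpoints kept. A minimal sketch of that resolution, assuming a list of string values; parse_inclusive_range is an illustrative name, not the API under test (the production code is JobList._parse_filter_to_check):

def parse_inclusive_range(expr, values):
    # "[start:end:step]" with any part omissible; start/end are list values,
    # not indices, and end is inclusive: "[20020201:20020203]" keeps three dates.
    start, end, step = (expr.strip("[]").split(":") + ["", ""])[:3]
    lo = values.index(start) if start else 0
    hi = values.index(end) + 1 if end else len(values)
    return values[lo:hi:int(step) if step else 1]

dates = ["20020201", "20020202", "20020203", "20020204", "20020205"]
assert parse_inclusive_range("[20020201:20020203]", dates) == dates[:3]
assert parse_inclusive_range("[20020202:]", dates) == dates[1:]
assert parse_inclusive_range("[::2]", dates) == dates[::2]

The same tests show the chunk and split variants returning integers (via level_to_check="CHUNKS_FROM" or "SPLITS_FROM"), which this string-only sketch deliberately leaves out.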
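The test_valid_parent_1_to_1, _1_to_n and _n_to_1 cases pin down the starred SPLITS_TO syntax: "P*" pairs parent split P with the same child split, and "P*\\G" groups consecutive splits on whichever side has more of them. A sketch of the relation those assertions imply; splits_match and its signature are hypothetical, the production logic lives inside JobList._valid_parent:

import math

def splits_match(parent_split, parent_total, child_split, child_total, group=1):
    # group == 1 covers "P*": strict one-to-one, split P to split P.
    # group == G covers "P*\\G": the side with more splits is cut into
    # consecutive runs of G splits that all attach to one split on the other side.
    if child_total >= parent_total:  # one-to-one, or one-to-N fan-out
        return math.ceil(child_split / group) == parent_split
    return math.ceil(parent_split / group) == child_split  # N-to-1 fan-in

# Mirrors test_valid_parent_1_to_n (parent.splits=2, child.splits=4,
# filter "1*\\2,2*\\2"): parent 1 feeds child splits 1-2, parent 2 feeds 3-4.
assert splits_match(1, 2, 2, 4, group=2)
assert not splits_match(2, 2, 2, 4, group=2)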