diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 8900f7aa2fc1dda0d85518adebaf83d80a3e5975..b5d7d2ea9a4981f818233e72d234d82abe08615a 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -23,13 +23,12 @@ except ImportError: # noinspection PyCompatibility from ConfigParser import SafeConfigParser import json - +import re import os import pickle from time import localtime, strftime from sys import setrecursionlimit from shutil import move - from autosubmit.job.job import Job from bscearth.utils.log import Log from autosubmit.job.job_dict import DicJobs @@ -208,19 +207,40 @@ class JobList: dependency_running_type = dic_jobs.get_option(section, 'RUNNING', 'once').lower() delay = int(dic_jobs.get_option(section, 'DELAY', -1)) select_chunks_opt = dic_jobs.get_option(job_section, 'SELECT_CHUNKS', None) - select_chunks = [] + selected_chunks = [] if select_chunks_opt is not None: if '*' in select_chunks_opt: - if ',' in select_chunks_opt: - sections_chunks= select_chunks_opt.split(' ') - else: - sections_chunks = select_chunks_opt.split(',') + sections_chunks = select_chunks_opt.split(' ') for section_chunk in sections_chunks: info=section_chunk.split('*') if info[0] in key: - select_chunks.append(info[1]) - - dependency = Dependency(section, distance, dependency_running_type, sign, delay, splits, select_chunks) + for relation in range(1,len(info)): + auxiliar_relation_list=[] + for location in info[relation].split('-'): + auxiliar_chunk_list = [] + location = location.strip('[').strip(']') + if ':' in location: + if len(location) == 3: + for chunk_number in range(int(location[0]),int(location[2])+1): + auxiliar_chunk_list.append(chunk_number) + elif len(location) == 2: + if ':' == location[0]: + for chunk_number in range(0, int(location[1])+1): + auxiliar_chunk_list.append(chunk_number) + elif ':' == location[1]: + for chunk_number in range(int(location[0])+1,len(dic_jobs._chunk_list)-1): + auxiliar_chunk_list.append(chunk_number) + elif ',' in location: + for chunk in location.split(','): + auxiliar_chunk_list.append(int(chunk)) + elif re.match('^[0-9]+$',location): + auxiliar_chunk_list.append(int(location)) + auxiliar_relation_list.append(auxiliar_chunk_list) + selected_chunks.append(auxiliar_relation_list) + if len(selected_chunks) >= 1: + dependency = Dependency(section, distance, dependency_running_type, sign, delay, splits,selected_chunks) #[]select_chunks_dest,select_chunks_orig + else: + dependency = Dependency(section, distance, dependency_running_type, sign, delay, splits,[]) #[]select_chunks_dest,select_chunks_orig dependencies[key] = dependency return dependencies @@ -246,30 +266,45 @@ class JobList: graph): for key in dependencies_keys: dependency = dependencies[key] - skip, (chunk, member, date) = JobList._calculate_dependency_metadata(job.chunk, chunk_list, job.member, member_list, job.date, date_list, dependency) if skip: continue - parents_jobs=dic_jobs.get_jobs(dependency.section, date, member, chunk) - for parent in parents_jobs: - if dependency.delay == -1 or chunk > dependency.delay: - if isinstance(parent, list): - if job.split is not None: - parent = filter(lambda _parent: _parent.split == job.split, parent)[0] + chunk_relations_to_add=list() + if len(dependency.select_chunks_orig) > 0: # find chunk relation + relation_indx = 0 + while relation_indx < len(dependency.select_chunks_orig): + if len(dependency.select_chunks_orig[relation_indx]) == 0 or job.chunk in dependency.select_chunks_orig[relation_indx] or job.chunk is None: + chunk_relations_to_add.append(relation_indx) + relation_indx+=1 + relation_indx -= 1 + + if len(dependency.select_chunks_orig) <= 0 or job.chunk is None or len(chunk_relations_to_add) > 0 : #If doesn't contain select_chunks or running isn't chunk . ... + parents_jobs=dic_jobs.get_jobs(dependency.section, date, member, chunk) + for parent in parents_jobs: + if dependency.delay == -1 or chunk > dependency.delay: + if isinstance(parent, list): + if job.split is not None: + parent = filter(lambda _parent: _parent.split == job.split, parent)[0] + else: + if dependency.splits is not None: + parent = filter(lambda _parent: _parent.split in dependency.splits, parent) + if len(dependency.select_chunks_dest) <= 0 or parent.chunk is None: + job.add_parent(parent) + JobList._add_edge(graph, job, parent) else: - if dependency.splits is not None: - parent = filter(lambda _parent: _parent.split in dependency.splits, parent) - if len(dependency.select_chunks) == 0 or str(parent.chunk) in dependency.select_chunks: - job.add_parent(parent) - JobList._add_edge(graph, job, parent) - + visited_parents=set() + for relation_indx in chunk_relations_to_add: + if parent.chunk in dependency.select_chunks_dest[relation_indx] or len(dependency.select_chunks_dest[relation_indx]) == 0: + if parent not in visited_parents: + job.add_parent(parent) + JobList._add_edge(graph, job, parent) + visited_parents.add(parent) JobList.handle_frequency_interval_dependencies(chunk, chunk_list, date, date_list, dic_jobs, job, member, member_list, dependency.section, graph) - @staticmethod def _calculate_dependency_metadata(chunk, chunk_list, member, member_list, date, date_list, dependency): skip = False diff --git a/autosubmit/job/job_utils.py b/autosubmit/job/job_utils.py index 941bd8c425f78b06446e4c5c89c91f9d42d02835..14cc0030ee06c9ea0cb97a438b2c076e5c06601a 100644 --- a/autosubmit/job/job_utils.py +++ b/autosubmit/job/job_utils.py @@ -51,5 +51,15 @@ class Dependency(object): self.sign = sign self.delay = delay self.splits = splits - self.select_chunks = select_chunks + self.select_chunks_dest = list() + self.select_chunks_orig = list() + for chunk_relation in select_chunks: + self.select_chunks_dest.append(chunk_relation[0]) + if len(chunk_relation) > 1: + self.select_chunks_orig.append(chunk_relation[1]) + else: + self.select_chunks_orig.append([]) + + +