From 212ccb89f6b6df59112346d2f54355e80f25eca9 Mon Sep 17 00:00:00 2001 From: Larissa Batista Leite Date: Wed, 13 Dec 2017 19:02:09 +0100 Subject: [PATCH 1/9] Initial solution for job SPLITS option --- autosubmit/job/job.py | 1 + autosubmit/job/job_dict.py | 58 +++++++++++++++++++++++++++++--------- autosubmit/job/job_list.py | 21 ++++++++++---- test/unit/test_dic_jobs.py | 7 +++-- 4 files changed, 65 insertions(+), 22 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index c81a083aa..4e662971e 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -69,6 +69,7 @@ class Job(object): self.member = None self.date = None self.name = name + self.split = None self._long_name = None self.long_name = name self.date_format = '' diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index 53bcdca4e..b0051577a 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -79,7 +79,8 @@ class DicJobs: elif running == 'chunk': synchronize = self.get_option(section, "SYNCHRONIZE", None) delay = int(self.get_option(section, "DELAY", -1)) - self._create_jobs_chunk(section, priority, frequency, default_job_type, synchronize, delay, jobs_data) + splits = int(self.get_option(section, "SPLITS", 0)) + self._create_jobs_chunk(section, priority, frequency, default_job_type, synchronize, delay, splits, jobs_data) def _create_jobs_once(self, section, priority, default_job_type, jobs_data=dict()): """ @@ -142,7 +143,7 @@ class DicJobs: conditional decision to the father which makes the call ''' - def _create_jobs_chunk(self, section, priority, frequency, default_job_type, synchronize=None, delay=0, jobs_data=dict()): + def _create_jobs_chunk(self, section, priority, frequency, default_job_type, synchronize=None, delay=0, splits=0, jobs_data=dict()): """ Create jobs to be run once per chunk @@ -165,14 +166,27 @@ class DicJobs: count += 1 if delay == -1 or delay < chunk: if count % frequency == 0 or count == len(self._chunk_list): - if synchronize == 'date': - tmp_dic[chunk] = self.build_job(section, priority, None, None, - chunk, default_job_type, jobs_data) - elif synchronize == 'member': - tmp_dic[chunk] = dict() - for date in self._date_list: - tmp_dic[chunk][date] = self.build_job(section, priority, date, None, - chunk, default_job_type, jobs_data) + if splits > 0: + if synchronize == 'date': + tmp_dic[chunk] = [] + self._create_jobs_split(splits, section, None, None, chunk, priority, + default_job_type, jobs_data, tmp_dic[chunk]) + elif synchronize == 'member': + tmp_dic[chunk] = dict() + for date in self._date_list: + tmp_dic[chunk][date] = [] + self._create_jobs_split(splits, section, date, None, chunk, priority, + default_job_type, jobs_data, tmp_dic[chunk][date]) + + else: + if synchronize == 'date': + tmp_dic[chunk] = self.build_job(section, priority, None, None, + chunk, default_job_type, jobs_data) + elif synchronize == 'member': + tmp_dic[chunk] = dict() + for date in self._date_list: + tmp_dic[chunk][date] = self.build_job(section, priority, date, None, + chunk, default_job_type, jobs_data) # Real dic jobs assignment/creation self._dic[section] = dict() for date in self._date_list: @@ -188,10 +202,22 @@ class DicJobs: self._dic[section][date][member][chunk] = tmp_dic[chunk] elif synchronize == 'member': self._dic[section][date][member][chunk] = tmp_dic[chunk][date] - else: + + if splits > 0 and synchronize is None: + self._dic[section][date][member][chunk] = [] + self._create_jobs_split(splits, section, date, member, chunk, priority, default_job_type, jobs_data, self._dic[section][date][member][chunk]) + elif synchronize is None: self._dic[section][date][member][chunk] = self.build_job(section, priority, date, member, - chunk, default_job_type, jobs_data) - self._jobs_list.graph.add_node(self._dic[section][date][member][chunk].name) + chunk, default_job_type, jobs_data) + self._jobs_list.graph.add_node(self._dic[section][date][member][chunk].name) + + def _create_jobs_split(self, splits, section, date, member, chunk, priority, default_job_type, jobs_data, dict): + total_jobs = 1 + while total_jobs <= splits: + job = self.build_job(section, priority, date, member, chunk, default_job_type, jobs_data, total_jobs) + dict.append(job) + self._jobs_list.graph.add_node(job.name) + total_jobs += 1 def get_jobs(self, section, date=None, member=None, chunk=None): """ @@ -257,7 +283,7 @@ class DicJobs: jobs.append(dic[c]) return jobs - def build_job(self, section, priority, date, member, chunk, default_job_type, jobs_data=dict()): + def build_job(self, section, priority, date, member, chunk, default_job_type, jobs_data=dict(), split=-1): name = self._jobs_list.expid if date is not None: name += "_" + date2str(date, self._date_format) @@ -265,6 +291,8 @@ class DicJobs: name += "_" + member if chunk is not None: name += "_{0}".format(chunk) + if split > -1: + name += "_{0}".format(split) name += "_" + section if name in jobs_data: job = Job(name, jobs_data[name][1], jobs_data[name][2], priority) @@ -277,6 +305,8 @@ class DicJobs: job.member = member job.chunk = chunk job.date_format = self._date_format + if split > -1: + job.split = split job.frequency = int(self.get_option(section, "FREQUENCY", 1)) job.delay = int(self.get_option(section, "DELAY", -1)) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 74af5a139..8f7556077 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -66,7 +66,7 @@ class JobList: self._persistence = job_list_persistence self._graph = DiGraph() - self._packages_dict = dict() + self.packages_dict = dict() self._ordered_jobs_by_date_member = dict() @property @@ -161,8 +161,13 @@ class JobList: dependencies = JobList._manage_dependencies(dependencies_keys, dic_jobs) for job in dic_jobs.get_jobs(job_section): - JobList._manage_job_dependencies(dic_jobs, job, date_list, member_list, chunk_list, dependencies_keys, - dependencies, graph) + num_jobs = 1 + if isinstance(job, list): + num_jobs = len(job) + for i in range(num_jobs): + _job = job[i] if num_jobs > 1 else job + JobList._manage_job_dependencies(dic_jobs, _job, date_list, member_list, chunk_list, dependencies_keys, + dependencies, graph) @staticmethod def _manage_dependencies(dependencies_keys, dic_jobs): @@ -189,6 +194,7 @@ class JobList: graph): for key in dependencies_keys: dependency = dependencies[key] + skip, (chunk, member, date) = JobList._calculate_dependency_metadata(job.chunk, chunk_list, job.member, member_list, job.date, date_list, @@ -199,8 +205,13 @@ class JobList: for parent in dic_jobs.get_jobs(dependency.section, date, member, chunk): # only creates the dependency in the graph if the delay is not defined or if the chunk is greater than it if dependency.delay == -1 or chunk > dependency.delay: - job.add_parent(parent) - graph.add_edge(parent.name, job.name) + num_parents = 1 + if isinstance(parent, list): + num_parents = len(parent) + for i in range(num_parents): + _parent = parent[i] if num_parents > 1 else parent + job.add_parent(_parent) + graph.add_edge(_parent.name, job.name) JobList.handle_frequency_interval_dependencies(chunk, chunk_list, date, date_list, dic_jobs, job, member, member_list, dependency.section, graph) diff --git a/test/unit/test_dic_jobs.py b/test/unit/test_dic_jobs.py index 7448c02ff..6a595479f 100644 --- a/test/unit/test_dic_jobs.py +++ b/test/unit/test_dic_jobs.py @@ -98,9 +98,10 @@ class TestDicJobs(TestCase): frequency = 123 synchronize = 'date' delay = -1 + splits = 0 self.parser_mock.has_option = Mock(return_value=True) self.parser_mock.get = Mock(return_value='chunk') - self.dictionary.get_option = Mock(side_effect=[frequency, synchronize, delay]) + self.dictionary.get_option = Mock(side_effect=[frequency, synchronize, delay, splits]) self.dictionary._create_jobs_once = Mock() self.dictionary._create_jobs_startdate = Mock() self.dictionary._create_jobs_member = Mock() @@ -113,7 +114,7 @@ class TestDicJobs(TestCase): self.dictionary._create_jobs_once.assert_not_called() self.dictionary._create_jobs_startdate.assert_not_called() self.dictionary._create_jobs_member.assert_not_called() - self.dictionary._create_jobs_chunk.assert_called_once_with(section, priority, frequency, Type.BASH, synchronize, delay, {}) + self.dictionary._create_jobs_chunk.assert_called_once_with(section, priority, frequency, Type.BASH, synchronize, delay, splits, {}) def test_dic_creates_right_jobs_by_startdate(self): # arrange @@ -158,7 +159,7 @@ class TestDicJobs(TestCase): self.dictionary.build_job = Mock(return_value=mock_section) # act - self.dictionary._create_jobs_chunk(mock_section.name, priority, frequency, Type.BASH, dict()) + self.dictionary._create_jobs_chunk(mock_section.name, priority, frequency, Type.BASH) # assert self.assertEquals(len(self.date_list) * len(self.member_list) * len(self.chunk_list), -- GitLab From d403026de101edca78113177ace22b08da056747 Mon Sep 17 00:00:00 2001 From: Larissa Batista Leite Date: Thu, 14 Dec 2017 19:09:19 +0100 Subject: [PATCH 2/9] Dependencies for SPLIT jobs --- autosubmit/job/job_dict.py | 2 +- autosubmit/job/job_list.py | 36 +++++++++++++++++++++++++++++++----- autosubmit/job/job_utils.py | 3 ++- 3 files changed, 34 insertions(+), 7 deletions(-) diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index b0051577a..5ef753436 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -140,7 +140,7 @@ class DicJobs: ''' Maybe a good choice could be split this function or ascend the - conditional decision to the father which makes the call + conditional decision to the parent which makes the call ''' def _create_jobs_chunk(self, section, priority, frequency, default_job_type, synchronize=None, delay=0, splits=0, jobs_data=dict()): diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 8f7556077..a96ea682d 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -180,15 +180,36 @@ class JobList: sign = '-' if '-' in key else '+' key_split = key.split(sign) section = key_split[0] + splits = None + if '[' in section: + section_name = section[0:section.find("[")] + splits_section = int(dic_jobs.get_option(section_name, 'SPLITS', 0)) + splits = JobList._calculate_splits_dependencies(section, splits_section) + section = section_name distance = key_split[1] dependency_running_type = dic_jobs.get_option(section, 'RUNNING', 'once').lower() - dependency = Dependency(section, int(distance), dependency_running_type, sign) + dependency = Dependency(section, int(distance), dependency_running_type, sign, splits=splits) delay = int(dic_jobs.get_option(section, 'DELAY', -1)) dependency.delay = delay dependencies[key] = dependency return dependencies + @staticmethod + def _calculate_splits_dependencies(section, max_splits): + splits_list = section[section.find("[") + 1:section.find("]")] + splits = [] + for str_split in splits_list.split(","): + if str_split.find(":") != -1: + numbers = str_split.split(":") + max_splits = min(int(numbers[1]), max_splits) + for count in range(int(numbers[0]), max_splits+1): + splits.append(int(str(count).zfill(len(numbers[0])))) + else: + if int(str_split) <= max_splits: + splits.append(int(str_split)) + return splits + @staticmethod def _manage_job_dependencies(dic_jobs, job, date_list, member_list, chunk_list, dependencies_keys, dependencies, graph): @@ -203,13 +224,18 @@ class JobList: continue for parent in dic_jobs.get_jobs(dependency.section, date, member, chunk): - # only creates the dependency in the graph if the delay is not defined or if the chunk is greater than it if dependency.delay == -1 or chunk > dependency.delay: num_parents = 1 if isinstance(parent, list): - num_parents = len(parent) + if job.split is not None: + parent = filter(lambda _parent: _parent.split == job.split, parent)[0] + else: + if dependency.splits is not None: + parent = filter(lambda _parent: _parent.split in dependency.splits, parent) + num_parents = len(parent) + for i in range(num_parents): - _parent = parent[i] if num_parents > 1 else parent + _parent = parent[i] if isinstance(parent, list) else parent job.add_parent(_parent) graph.add_edge(_parent.name, job.name) @@ -320,7 +346,7 @@ class JobList: sections_running_type_map = dict() for section in wrapper_expression.split(" "): - sections_running_type_map[section] = self._dic_jobs.get_option(section, "RUNNING", '') + sections_running_type_map[section] = self._dic_jobs.get_option(section, "RUNNING", 'once') for date in self._date_list: str_date = self._get_date(date) diff --git a/autosubmit/job/job_utils.py b/autosubmit/job/job_utils.py index 9b9c5efe6..c772f0e63 100644 --- a/autosubmit/job/job_utils.py +++ b/autosubmit/job/job_utils.py @@ -44,10 +44,11 @@ class Dependency(object): """ - def __init__(self, section, distance=None, running=None, sign=None, delay=-1): + def __init__(self, section, distance=None, running=None, sign=None, delay=-1, splits=None): self.section = section self.distance = distance self.running = running self.sign = sign self.delay = delay + self.splits = splits -- GitLab From 25b9b009f234f795a40a0769cdc9ba75cf5ba9db Mon Sep 17 00:00:00 2001 From: Larissa Batista Leite Date: Fri, 15 Dec 2017 11:59:48 +0100 Subject: [PATCH 3/9] Small refactoring --- autosubmit/job/job.py | 19 ++++++++++++------- autosubmit/job/job_list.py | 23 ++++++++++++++--------- 2 files changed, 26 insertions(+), 16 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 4e662971e..782cfd5b5 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -274,16 +274,21 @@ class Job(object): self.fail_count += 1 # Maybe should be renamed to the plural? - def add_parent(self, *new_parent): + def add_parent(self, *parents): """ Add parents for the job. It also adds current job as a child for all the new parents - :param new_parent: job's parents to add - :type new_parent: *Job - """ - for parent in new_parent: - self._parents.add(parent) - parent.__add_child(self) + :param parents: job's parents to add + :type parents: *Job + """ + for parent in parents: + num_parents = 1 + if isinstance(parent, list): + num_parents = len(parent) + for i in range(num_parents): + new_parent = parent[i] if isinstance(parent, list) else parent + self._parents.add(new_parent) + new_parent.__add_child(self) def __add_child(self, new_child): """ diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index a96ea682d..40b8ca262 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -225,19 +225,15 @@ class JobList: for parent in dic_jobs.get_jobs(dependency.section, date, member, chunk): if dependency.delay == -1 or chunk > dependency.delay: - num_parents = 1 if isinstance(parent, list): if job.split is not None: parent = filter(lambda _parent: _parent.split == job.split, parent)[0] else: if dependency.splits is not None: parent = filter(lambda _parent: _parent.split in dependency.splits, parent) - num_parents = len(parent) - for i in range(num_parents): - _parent = parent[i] if isinstance(parent, list) else parent - job.add_parent(_parent) - graph.add_edge(_parent.name, job.name) + job.add_parent(parent) + JobList._add_edge(graph, job, parent) JobList.handle_frequency_interval_dependencies(chunk, chunk_list, date, date_list, dic_jobs, job, member, member_list, dependency.section, graph) @@ -303,7 +299,7 @@ class JobList: for distance in range(1, max_distance): for parent in dic_jobs.get_jobs(section_name, date, member, chunk - distance): job.add_parent(parent) - graph.add_edge(parent.name, job.name) + JobList._add_edge(graph, job, parent) elif job.member is not None: member_index = member_list.index(job.member) max_distance = (member_index + 1) % job.frequency @@ -313,7 +309,7 @@ class JobList: for parent in dic_jobs.get_jobs(section_name, date, member_list[member_index - distance], chunk): job.add_parent(parent) - graph.add_edge(parent.name, job.name) + JobList._add_edge(graph, job, parent) elif job.date is not None: date_index = date_list.index(job.date) max_distance = (date_index + 1) % job.frequency @@ -323,7 +319,16 @@ class JobList: for parent in dic_jobs.get_jobs(section_name, date_list[date_index - distance], member, chunk): job.add_parent(parent) - graph.add_edge(parent.name, job.name) + JobList._add_edge(graph, job, parent) + + @staticmethod + def _add_edge(graph, job, parents): + num_parents = 1 + if isinstance(parents, list): + num_parents = len(parents) + for i in range(num_parents): + parent = parents[i] if isinstance(parents, list) else parents + graph.add_edge(parent.name, job.name) @staticmethod def _create_jobs(dic_jobs, parser, priority, default_job_type, jobs_data=dict()): -- GitLab From c42c34ce0ec35c65e6517db3c9581d2a9f28eaf8 Mon Sep 17 00:00:00 2001 From: Larissa Batista Leite Date: Fri, 5 Jan 2018 17:44:36 +0100 Subject: [PATCH 4/9] Grouping in visualization, missing status --- autosubmit/autosubmit.py | 76 ++++++++++++++++++++++++++++++----- autosubmit/job/job_list.py | 11 ++--- autosubmit/monitor/monitor.py | 60 ++++++++++++++++++++------- 3 files changed, 115 insertions(+), 32 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 0218a8735..803e78eb8 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -170,12 +170,14 @@ class Autosubmit: subparser.add_argument('expid', help='experiment identifier') subparser.add_argument('-o', '--output', choices=('pdf', 'png', 'ps', 'svg'), default='pdf', help='chooses type of output for generated plot') + subparser.add_argument('-group_by', choices=('date', 'member', 'chunk', 'split'), default=None, + help='Groups the jobs by date, member, chunk or split') group = subparser.add_mutually_exclusive_group(required=False) group.add_argument('-fl', '--list', type=str, help='Supply the list of job names to be changed. Default = "Any". ' 'LIST = "b037_20101101_fc3_21_sim b037_20111101_fc4_26_sim"') group.add_argument('-fc', '--filter_chunks', type=str, - help='Supply the list of chunks to change the status. Default = "Any". ' + help='Supply the list of chunks to filter the list of jobs. Default = "Any". ' 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') group.add_argument('-fs', '--filter_status', type=str, choices=('Any', 'READY', 'COMPLETED', 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN'), @@ -343,7 +345,7 @@ class Autosubmit: return Autosubmit.delete(args.expid, args.force) elif args.command == 'monitor': return Autosubmit.monitor(args.expid, args.output, args.list, args.filter_chunks, args.filter_status, - args.filter_type, args.hide, args.txt) + args.filter_type, args.hide, args.txt, args.group_by) elif args.command == 'stats': return Autosubmit.statistics(args.expid, args.filter_type, args.filter_period, args.output, args.hide) elif args.command == 'clean': @@ -741,6 +743,9 @@ class Autosubmit: package.submit(as_conf, job_list.parameters) + if hasattr(package, "name"): + job_list.packages_dict[package.name] = package.jobs + if remote_dependencies_dict and package.name in remote_dependencies_dict['name_to_id']: remote_dependencies_dict['name_to_id'][package.name] = package.jobs[0].id @@ -757,7 +762,7 @@ class Autosubmit: return save @staticmethod - def monitor(expid, file_format, lst, filter_chunks, filter_status, filter_section, hide, txt_only=False): + def monitor(expid, file_format, lst, filter_chunks, filter_status, filter_section, hide, txt_only=False, group_by=None): """ Plots workflow graph for a given experiment with status of each job coded by node color. Plot is created in experiment's plot folder with name __