From a8f5b0c94860e67c6f16d03f41fc8bbb3b708156 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 2 Dec 2021 14:14:27 +0100 Subject: [PATCH 1/5] tentative test to split once jobs (not working) --- autosubmit/job/job_dict.py | 19 +++++++++++++++---- autosubmit/job/job_list.py | 13 ++++++++----- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index ae72e50f9..99ea6e4ae 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -72,9 +72,12 @@ class DicJobs: running = self._parser.get(section, 'RUNNING').lower() frequency = int(self.get_option(section, "FREQUENCY", 1)) if running == 'once': - self._create_jobs_once(section, priority, default_job_type, jobs_data) + splits = int(self.get_option(section, "SPLITS", 0)) + self._create_jobs_once(section, priority, default_job_type, jobs_data,splits) + elif running == 'date': self._create_jobs_startdate(section, priority, frequency, default_job_type, jobs_data) + elif running == 'member': self._create_jobs_member(section, priority, frequency, default_job_type, jobs_data) elif running == 'chunk': @@ -84,7 +87,7 @@ class DicJobs: self._create_jobs_chunk(section, priority, frequency, default_job_type, synchronize, delay, splits, jobs_data) pass - def _create_jobs_once(self, section, priority, default_job_type, jobs_data=dict()): + def _create_jobs_once(self, section, priority, default_job_type, jobs_data=dict(),splits=0): """ Create jobs to be run once @@ -93,8 +96,16 @@ class DicJobs: :param priority: priority for the jobs :type priority: int """ - self._dic[section] = self.build_job(section, priority, None, None, None, default_job_type, jobs_data) - self._jobs_list.graph.add_node(self._dic[section].name) + total_jobs = 1 + self._dic[section] = [] + while total_jobs <= splits: + job = self.build_job(section, priority, None, None, None, default_job_type, jobs_data, total_jobs) + self._dic[section].append(job) + self._jobs_list.graph.add_node(self._dic[section][total_jobs-1].name) + total_jobs += 1 + pass + #self._dic[section] = self.build_job(section, priority, None, None, None, default_job_type, jobs_data) + #self._jobs_list.graph.add_node(self._dic[section].name) def _create_jobs_startdate(self, section, priority, frequency, default_job_type, jobs_data=dict()): """ diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 03254edb5..015b3068d 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -230,11 +230,14 @@ class JobList(object): job.children.add(jobc) # Perhaps this should be done by default independent of the wrapper_type supplied - for wrapper_section in wrapper_jobs: - if wrapper_jobs[wrapper_section] != 'None': - self._ordered_jobs_by_date_member[wrapper_section] = self._create_sorted_dict_jobs(wrapper_jobs[wrapper_section]) - else: - self._ordered_jobs_by_date_member[wrapper_section] = {} + try: + for wrapper_section in wrapper_jobs: + if wrapper_jobs[wrapper_section] != 'None': + self._ordered_jobs_by_date_member[wrapper_section] = self._create_sorted_dict_jobs(wrapper_jobs[wrapper_section]) + else: + self._ordered_jobs_by_date_member[wrapper_section] = {} + except BaseException as e: + raise AutosubmitCritical("Some of {0} are not in the current job_list defined in jobs.conf".format(wrapper_jobs),7000) pass -- GitLab From 2244dca301af5f9ec5b48d7dadf7728eb788c631 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 9 Dec 2021 14:29:40 +0100 Subject: [PATCH 2/5] splits working on once --- autosubmit/autosubmit.py | 6 +++--- autosubmit/job/job_dict.py | 16 +++++++++++++--- autosubmit/job/job_list.py | 3 +-- 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 678efe572..89a0473dc 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -2736,9 +2736,9 @@ class Autosubmit: backup_files = [] backup_conf = [] error = False + err_message = 'Invalid Configuration:' for platform in platforms: # Checks - err_message = 'Invalid Configuration:' Log.info( "Checking [{0}] from platforms configuration...", platform) if as_conf.get_migrate_user_to(platform) == '': @@ -2759,8 +2759,8 @@ class Autosubmit: err_message += "\nTEMP_DIR {0}, does not exists in [{1}]".format( p.temp_dir, platform) error = True - if error: - raise AutosubmitCritical(err_message, 7014) + if error: + raise AutosubmitCritical(err_message, 7014) for platform in platforms: if as_conf.get_migrate_project_to(platform) != '': Log.info("Project in platform configuration file successfully updated to {0}", diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index 99ea6e4ae..c93aca1fa 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -96,14 +96,20 @@ class DicJobs: :param priority: priority for the jobs :type priority: int """ - total_jobs = 1 + self._dic[section] = [] + if splits <= 0: + job = self.build_job(section, priority, None, None, None, default_job_type, jobs_data, -1) + self._dic[section].append(job) + self._jobs_list.graph.add_node(job.name) + total_jobs = 1 while total_jobs <= splits: job = self.build_job(section, priority, None, None, None, default_job_type, jobs_data, total_jobs) self._dic[section].append(job) - self._jobs_list.graph.add_node(self._dic[section][total_jobs-1].name) + self._jobs_list.graph.add_node(job.name) total_jobs += 1 pass + #self._dic[section] = self.build_job(section, priority, None, None, None, default_job_type, jobs_data) #self._jobs_list.graph.add_node(self._dic[section].name) @@ -220,6 +226,7 @@ class DicJobs: if splits > 1 and synchronize is None: self._dic[section][date][member][chunk] = [] self._create_jobs_split(splits, section, date, member, chunk, priority, default_job_type, jobs_data, self._dic[section][date][member][chunk]) + pass elif synchronize is None: self._dic[section][date][member][chunk] = self.build_job(section, priority, date, member, chunk, default_job_type, jobs_data) @@ -256,7 +263,10 @@ class DicJobs: return jobs dic = self._dic[section] - if type(dic) is not dict: + #once jobs + if type(dic) is list: + jobs = dic + elif type(dic) is not dict: jobs.append(dic) else: if date is not None: diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index c82f813f5..bf37a8940 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -565,8 +565,7 @@ class JobList(object): def _create_jobs(dic_jobs, parser, priority, default_job_type, jobs_data=dict()): for section in parser.sections(): Log.debug("Creating {0} jobs".format(section)) - dic_jobs.read_section( - section, priority, default_job_type, jobs_data) + dic_jobs.read_section(section, priority, default_job_type, jobs_data) priority += 1 def _create_sorted_dict_jobs(self, wrapper_jobs): -- GitLab From 2b1eb372a6f2bab47fae64d71d854af294e4f823 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 15 Dec 2021 18:30:45 +0100 Subject: [PATCH 3/5] test splits --- autosubmit/job/job_dict.py | 59 +++++++++++++++++++++++++------------- 1 file changed, 39 insertions(+), 20 deletions(-) diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index c93aca1fa..dbdab747f 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -68,22 +68,21 @@ class DicJobs: :type priority: int """ running = 'once' + splits = int(self.get_option(section, "SPLITS", 0)) + if self._parser.has_option(section, 'RUNNING'): running = self._parser.get(section, 'RUNNING').lower() frequency = int(self.get_option(section, "FREQUENCY", 1)) + if running == 'once': - splits = int(self.get_option(section, "SPLITS", 0)) self._create_jobs_once(section, priority, default_job_type, jobs_data,splits) - elif running == 'date': - self._create_jobs_startdate(section, priority, frequency, default_job_type, jobs_data) - + self._create_jobs_startdate(section, priority, frequency, default_job_type, jobs_data,splits) elif running == 'member': - self._create_jobs_member(section, priority, frequency, default_job_type, jobs_data) + self._create_jobs_member(section, priority, frequency, default_job_type, jobs_data,splits) elif running == 'chunk': synchronize = self.get_option(section, "SYNCHRONIZE", None) delay = int(self.get_option(section, "DELAY", -1)) - splits = int(self.get_option(section, "SPLITS", 0)) self._create_jobs_chunk(section, priority, frequency, default_job_type, synchronize, delay, splits, jobs_data) pass @@ -113,7 +112,7 @@ class DicJobs: #self._dic[section] = self.build_job(section, priority, None, None, None, default_job_type, jobs_data) #self._jobs_list.graph.add_node(self._dic[section].name) - def _create_jobs_startdate(self, section, priority, frequency, default_job_type, jobs_data=dict()): + def _create_jobs_startdate(self, section, priority, frequency, default_job_type, jobs_data=dict(), splits=-1): """ Create jobs to be run once per start date @@ -126,15 +125,24 @@ class DicJobs: :type frequency: int """ self._dic[section] = dict() + tmp_dic = dict() + tmp_dic[section] = dict() count = 0 for date in self._date_list: count += 1 if count % frequency == 0 or count == len(self._date_list): - self._dic[section][date] = self.build_job(section, priority, date, None, None, default_job_type, - jobs_data) - self._jobs_list.graph.add_node(self._dic[section][date].name) + if splits <= 0: + self._dic[section][date] = self.build_job(section, priority, date, None, None, default_job_type, + jobs_data) + self._jobs_list.graph.add_node(self._dic[section][date].name) + else: + tmp_dic[section] = [] + self._create_jobs_split(splits, section, date, member, None, priority, + default_job_type, jobs_data, tmp_dic[section]) + + - def _create_jobs_member(self, section, priority, frequency, default_job_type, jobs_data=dict()): + def _create_jobs_member(self, section, priority, frequency, default_job_type, jobs_data=dict(),splits=-1): """ Create jobs to be run once per member @@ -147,20 +155,24 @@ class DicJobs: :type frequency: int """ self._dic[section] = dict() + tmp_dic = {} + tmp_dic[section] = dict() for date in self._date_list: + tmp_dic[section][date] = dict() self._dic[section][date] = dict() count = 0 for member in self._member_list: count += 1 if count % frequency == 0 or count == len(self._member_list): - self._dic[section][date][member] = self.build_job(section, priority, date, member, None, - default_job_type, jobs_data) - self._jobs_list.graph.add_node(self._dic[section][date][member].name) + if splits <= 0: + self._dic[section][date][member] = self.build_job(section, priority, date, member, None,default_job_type, jobs_data,splits) + self._jobs_list.graph.add_node(self._dic[section][date][member].name) + else: + tmp_dic[section][date][member] = [] + self._create_jobs_split(splits, section, date, member, None, priority, + default_job_type, jobs_data, tmp_dic[section][date][member]) + self._dic[section] = tmp_dic[section] - ''' - Maybe a good choice could be split this function or ascend the - conditional decision to the parent which makes the call - ''' def _create_jobs_chunk(self, section, priority, frequency, default_job_type, synchronize=None, delay=0, splits=0, jobs_data=dict()): """ @@ -280,7 +292,10 @@ class DicJobs: if date not in dic: return jobs dic = dic[date] - if type(dic) is not dict: + if type(dic) is list: + for job in dic: + jobs.append(job) + elif type(dic) is not dict: jobs.append(dic) else: if member is not None: @@ -295,8 +310,12 @@ class DicJobs: if member not in dic: return jobs dic = dic[member] - if type(dic) is not dict: + if type(dic) is list: + for job in dic: + jobs.append(job) + elif type(dic) is not dict: jobs.append(dic) + else: if chunk is not None: if chunk in dic: -- GitLab From d810959d8043c44e2ae2bfb71b1e9b00f5f9efe5 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 16 Dec 2021 11:52:51 +0100 Subject: [PATCH 4/5] test splits --- autosubmit/job/job_dict.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index dbdab747f..19370025a 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -136,9 +136,10 @@ class DicJobs: jobs_data) self._jobs_list.graph.add_node(self._dic[section][date].name) else: - tmp_dic[section] = [] - self._create_jobs_split(splits, section, date, member, None, priority, - default_job_type, jobs_data, tmp_dic[section]) + tmp_dic[section][date] = [] + self._create_jobs_split(splits, section, date, None, None, priority, + default_job_type, jobs_data, tmp_dic[section][date]) + self._dic[section][date] = tmp_dic[section][date] @@ -155,7 +156,7 @@ class DicJobs: :type frequency: int """ self._dic[section] = dict() - tmp_dic = {} + tmp_dic = dict() tmp_dic[section] = dict() for date in self._date_list: tmp_dic[section][date] = dict() @@ -171,7 +172,7 @@ class DicJobs: tmp_dic[section][date][member] = [] self._create_jobs_split(splits, section, date, member, None, priority, default_job_type, jobs_data, tmp_dic[section][date][member]) - self._dic[section] = tmp_dic[section] + self._dic[section][date][member] = tmp_dic[section] def _create_jobs_chunk(self, section, priority, frequency, default_job_type, synchronize=None, delay=0, splits=0, jobs_data=dict()): -- GitLab From 2a2cf4373b1de240433c9ba4dc7728ea83340a11 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 16 Dec 2021 14:12:44 +0100 Subject: [PATCH 5/5] Fixed a bug when split is not set, fixed pipeline --- autosubmit/job/job_dict.py | 5 +++-- test/unit/test_dic_jobs.py | 27 +++++++++++++++++---------- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index 19370025a..20c769816 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -96,13 +96,14 @@ class DicJobs: :type priority: int """ - self._dic[section] = [] + if splits <= 0: job = self.build_job(section, priority, None, None, None, default_job_type, jobs_data, -1) - self._dic[section].append(job) + self._dic[section] = job self._jobs_list.graph.add_node(job.name) total_jobs = 1 while total_jobs <= splits: + self._dic[section] = [] job = self.build_job(section, priority, None, None, None, default_job_type, jobs_data, total_jobs) self._dic[section].append(job) self._jobs_list.graph.add_node(job.name) diff --git a/test/unit/test_dic_jobs.py b/test/unit/test_dic_jobs.py index 49785c712..a6a52f624 100644 --- a/test/unit/test_dic_jobs.py +++ b/test/unit/test_dic_jobs.py @@ -30,9 +30,11 @@ class TestDicJobs(TestCase): # arrange section = 'fake-section' priority = 999 + frequency = 123 + splits = -1 self.parser_mock.has_option = Mock(return_value=True) self.parser_mock.get = Mock(return_value='once') - self.dictionary.get_option = Mock(return_value=123) + self.dictionary.get_option = Mock(return_value=-1) self.dictionary._create_jobs_once = Mock() self.dictionary._create_jobs_startdate = Mock() self.dictionary._create_jobs_member = Mock() @@ -42,7 +44,7 @@ class TestDicJobs(TestCase): self.dictionary.read_section(section, priority, Type.BASH) # assert - self.dictionary._create_jobs_once.assert_called_once_with(section, priority, Type.BASH, {}) + self.dictionary._create_jobs_once.assert_called_once_with(section, priority, Type.BASH, {},splits) self.dictionary._create_jobs_startdate.assert_not_called() self.dictionary._create_jobs_member.assert_not_called() self.dictionary._create_jobs_chunk.assert_not_called() @@ -52,9 +54,12 @@ class TestDicJobs(TestCase): section = 'fake-section' priority = 999 frequency = 123 + splits = -1 self.parser_mock.has_option = Mock(return_value=True) self.parser_mock.get = Mock(return_value='date') - self.dictionary.get_option = Mock(return_value=frequency) + #self.dictionary.get_option = Mock(return_value=frequency) + self.dictionary.get_option = Mock(side_effect=[splits,frequency ]) + #self.dictionary.get_option = Mock(return_value=splits) self.dictionary._create_jobs_once = Mock() self.dictionary._create_jobs_startdate = Mock() self.dictionary._create_jobs_member = Mock() @@ -65,7 +70,7 @@ class TestDicJobs(TestCase): # assert self.dictionary._create_jobs_once.assert_not_called() - self.dictionary._create_jobs_startdate.assert_called_once_with(section, priority, frequency, Type.BASH, {}) + self.dictionary._create_jobs_startdate.assert_called_once_with(section, priority, frequency, Type.BASH, {}, splits) self.dictionary._create_jobs_member.assert_not_called() self.dictionary._create_jobs_chunk.assert_not_called() @@ -74,9 +79,10 @@ class TestDicJobs(TestCase): section = 'fake-section' priority = 999 frequency = 123 + splits = 0 self.parser_mock.has_option = Mock(return_value=True) self.parser_mock.get = Mock(return_value='member') - self.dictionary.get_option = Mock(return_value=frequency) + self.dictionary.get_option = Mock(side_effect=[splits,frequency]) self.dictionary._create_jobs_once = Mock() self.dictionary._create_jobs_startdate = Mock() self.dictionary._create_jobs_member = Mock() @@ -88,7 +94,7 @@ class TestDicJobs(TestCase): # assert self.dictionary._create_jobs_once.assert_not_called() self.dictionary._create_jobs_startdate.assert_not_called() - self.dictionary._create_jobs_member.assert_called_once_with(section, priority, frequency, Type.BASH, {}) + self.dictionary._create_jobs_member.assert_called_once_with(section, priority, frequency, Type.BASH, {},splits) self.dictionary._create_jobs_chunk.assert_not_called() def test_read_section_running_chunk_create_jobs_chunk(self): @@ -101,7 +107,7 @@ class TestDicJobs(TestCase): splits = 0 self.parser_mock.has_option = Mock(return_value=True) self.parser_mock.get = Mock(return_value='chunk') - self.dictionary.get_option = Mock(side_effect=[frequency, synchronize, delay, splits]) + self.dictionary.get_option = Mock(side_effect=[splits,frequency, synchronize, delay]) self.dictionary._create_jobs_once = Mock() self.dictionary._create_jobs_startdate = Mock() self.dictionary._create_jobs_member = Mock() @@ -507,13 +513,14 @@ class TestDicJobs(TestCase): mock_section = Mock() mock_section.name = 'fake-section' priority = 999 - self.dictionary.build_job = Mock(return_value=mock_section) + splits = -1 + self.dictionary.build_job = Mock(side_effect=[mock_section, splits]) self.job_list.graph.add_node = Mock() - self.dictionary._create_jobs_once(mock_section.name, priority, Type.BASH, dict()) + self.dictionary._create_jobs_once(mock_section.name, priority, Type.BASH, dict(),splits) self.assertEquals(mock_section, self.dictionary._dic[mock_section.name]) - self.dictionary.build_job.assert_called_once_with(mock_section.name, priority, None, None, None, Type.BASH, {}) + self.dictionary.build_job.assert_called_once_with(mock_section.name, priority, None, None, None, Type.BASH, {},splits) self.job_list.graph.add_node.assert_called_once_with(mock_section.name) -- GitLab