diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py
index 6dc85a87e5731fbce4ebe5e4e430a05429503182..d213aa6604a3607a5addef15b75518d23745ad8c 100644
--- a/autosubmit/job/job.py
+++ b/autosubmit/job/job.py
@@ -221,6 +221,8 @@ class Job(object):
         self.total_jobs = None
         self.max_waiting_jobs = None
         self.exclusive = ""
+        self._retrials = 0
+        # internal
         self.current_checkpoint_step = 0
         self.max_checkpoint_step = 0
@@ -255,6 +257,16 @@
     def fail_count(self, value):
         self._fail_count = value
 
+    @property
+    @autosubmit_parameter(name='retrials')
+    def retrials(self):
+        """Max amount of retrials to run this job."""
+        return self._retrials
+
+    @retrials.setter
+    def retrials(self, value):
+        self._retrials = int(value)
+
     @property
     @autosubmit_parameter(name='checkpoint')
     def checkpoint(self):
@@ -917,6 +929,9 @@ class Job(object):
 
     @threaded
     def retrieve_logfiles(self, copy_remote_logs, local_logs, remote_logs, expid, platform_name,fail_count = 0,job_id=""):
+        as_conf = AutosubmitConfig(expid, BasicConfig, YAMLParserFactory())
+        as_conf.reload(force_load=True)
+        max_retrials = self.retrials
         max_logs = 0
         last_log = 0
         sleep(5)
@@ -931,14 +946,13 @@
         success = False
         error_message = ""
         platform = None
-        max_retrials = 0
         while (count < retries) or not success:
             try:
                 as_conf = AutosubmitConfig(expid, BasicConfig, YAMLParserFactory())
                 as_conf.reload(force_load=True)
-                max_retrials = as_conf.get_retrials()
-                max_logs = int(as_conf.get_retrials()) - fail_count
-                last_log = int(as_conf.get_retrials()) - fail_count
+                max_retrials = self.retrials
+                max_logs = int(max_retrials) - fail_count
+                last_log = int(max_retrials) - fail_count
                 submitter = self._get_submitter(as_conf)
                 submitter.load_platforms(as_conf)
                 platform = submitter.platforms[platform_name]
@@ -953,34 +967,32 @@
             raise AutosubmitError(
                 "Couldn't load the autosubmit platforms, seems that the local platform has some issue\n:{0}".format(
                     error_message), 6006)
-        else:
-            try:
-                if self.wrapper_type is not None and self.wrapper_type == "vertical":
-                    found = False
-                    retrials = 0
-                    while retrials < 3 and not found:
-                        if platform.check_stat_file_by_retrials(stat_file + str(max_logs)):
-                            found = True
-                        retrials = retrials + 1
-                    for i in range(max_logs-1,-1,-1):
-                        if platform.check_stat_file_by_retrials(stat_file + str(i)):
-                            last_log = i
-                        else:
-                            break
-                    remote_logs = (self.script_name + ".out." + str(last_log), self.script_name + ".err." + str(last_log))
+        try:
+            if self.wrapper_type is not None and self.wrapper_type == "vertical":
+                found = False
+                retrials = 0
+                while retrials < 3 and not found:
+                    if platform.check_stat_file_by_retrials(stat_file + str(max_logs)):
+                        found = True
+                    retrials = retrials + 1
+                for i in range(max_logs-1,-1,-1):
+                    if platform.check_stat_file_by_retrials(stat_file + str(i)):
+                        last_log = i
+                    else:
+                        break
+                remote_logs = (self.script_name + ".out." + str(last_log), self.script_name + ".err." + str(last_log))
 
-                else:
-                    remote_logs = (self.script_name + ".out."+str(fail_count), self.script_name + ".err." + str(fail_count))
+            else:
+                remote_logs = (self.script_name + ".out."+str(fail_count), self.script_name + ".err." + str(fail_count))
 
-            except BaseException as e:
-                Log.printlog(
-                    "{0} \n Couldn't connect to the remote platform for {1} job err/out files. ".format(str(e), self.name), 6001)
+        except BaseException as e:
+            Log.printlog(
+                "{0} \n Couldn't connect to the remote platform for {1} job err/out files. ".format(str(e), self.name), 6001)
         out_exist = False
         err_exist = False
         retries = 3
         sleeptime = 0
         i = 0
-        no_continue = False
         try:
             while (not out_exist and not err_exist) and i < retries:
                 try:
@@ -992,7 +1004,7 @@
                     err_exist = platform.check_file_exists(
                         remote_logs[1], False)
                 except IOError as e:
-                    err_exists = False
+                    err_exist = False
                 if not out_exist or not err_exist:
                     sleeptime = sleeptime + 5
                     i = i + 1
@@ -1009,7 +1021,6 @@
                 return
             if copy_remote_logs:
                 l_log = copy.deepcopy(local_logs)
-                r_log = copy.deepcopy(remote_logs)
                 # unifying names for log files
                 if remote_logs != local_logs:
                     if self.wrapper_type == "vertical": # internal_Retrial mechanism
@@ -1208,7 +1219,7 @@
             else:
                 self.retrieve_logfiles(copy_remote_logs, local_logs, remote_logs, expid, platform_name,fail_count = copy.copy(self.fail_count),job_id=self.id)
                 if self.wrapper_type == "vertical":
-                    max_logs = int(as_conf.get_retrials())
+                    max_logs = int(self.retrials)
                     for i in range(0,max_logs):
                         self.inc_fail_count()
                 else:
diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py
index e2f673563efe1b6859cdfdf37350b05b8c5a08f7..440ddf63045286f9b1ee1ed9683baa40637e625e 100644
--- a/autosubmit/job/job_dict.py
+++ b/autosubmit/job/job_dict.py
@@ -405,7 +405,12 @@ class DicJobs:
             remote_max_wallclock = self.experiment_data["PLATFORMS"].get(job.platform_name,{})
             remote_max_wallclock = remote_max_wallclock.get("MAX_WALLCLOCK",None)
             job.wallclock = parameters[section].get("WALLCLOCK", remote_max_wallclock)
-        job.retrials = int(parameters[section].get( 'RETRIALS', 0))
+        for wrapper_section in self.experiment_data.get("WRAPPERS",{}).values():
+            if job.section in wrapper_section.get("JOBS_IN_WRAPPER",""):
+                job.retrials = int(wrapper_section.get("RETRIALS", wrapper_section.get("INNER_RETRIALS",parameters[section].get('RETRIALS',self.experiment_data["CONFIG"].get("RETRIALS", 0)))))
+                break
+        else:
+            job.retrials = int(parameters[section].get('RETRIALS', self.experiment_data["CONFIG"].get("RETRIALS", 0)))
         job.delay_retrials = int(parameters[section].get( 'DELAY_RETRY_TIME', "-1"))
         if job.wallclock is None and job.platform_name.upper() != "LOCAL":
             job.wallclock = "01:59"
diff --git a/test/unit/test_dic_jobs.py b/test/unit/test_dic_jobs.py
index 16deb576bb9c35ce167318b1278c6b79ae24099d..fd8b459d72c2a9c168e62f2112b8d75ebec75b7d 100644
--- a/test/unit/test_dic_jobs.py
+++ b/test/unit/test_dic_jobs.py
@@ -26,6 +26,7 @@ class TestDicJobs(TestCase):
                                  JobListPersistenceDb(self.temp_directory, 'db'),self.as_conf)
         self.parser_mock = Mock(spec='SafeConfigParser')
         self.date_list = ['fake-date1', 'fake-date2']
+        self.member_list = ["fc1", "fc2", "fc3", "fc4", "fc5", "fc6", "fc7", "fc8", "fc9", "fc10"]
         self.member_list = ['fake-member1', 'fake-member2']
         self.num_chunks = 99
         self.chunk_list = list(range(1, self.num_chunks + 1))
@@ -213,18 +214,6 @@
 
         frequency = 1
         self.dictionary.build_job = Mock(return_value=mock_section)
-        # act
-        self.dictionary._create_jobs_chunk(mock_section.name, priority, frequency, Type.BASH)
-
-        # assert
-        self.assertEqual(len(self.date_list) * len(self.member_list) * len(self.chunk_list),
-                         self.dictionary.build_job.call_count)
-        self.assertEqual(len(self.dictionary._dic[mock_section.name]), len(self.date_list))
-        for date in self.date_list:
-            for member in self.member_list:
-                for chunk in self.chunk_list:
-                    self.assertEqual(self.dictionary._dic[mock_section.name][date][member][chunk], mock_section)
-
     def test_dic_creates_right_jobs_by_chunk_with_frequency_3(self):
         # arrange
         mock_section = Mock()
@@ -333,7 +322,6 @@
         self.assertEqual(len(self.dictionary._dic[mock_section.name]), len(self.date_list))
 
     def test_create_job_creates_a_job_with_right_parameters(self):
-
         section = 'test'
         priority = 99
         date = datetime(2016, 1, 1)
@@ -359,6 +347,7 @@
         self.dictionary.experiment_data = dict()
         self.dictionary.experiment_data["JOBS"] = self.job_list.jobs_data
         self.dictionary.experiment_data["PLATFORMS"] = {}
+        self.dictionary.experiment_data["CONFIG"] = {}
         self.dictionary.experiment_data["PLATFORMS"]["FAKE-PLATFORM"] = {}
         job_list_mock = Mock()
         job_list_mock.append = Mock()
@@ -391,9 +380,29 @@
         self.assertEqual(options['TASKS'], created_job.tasks)
         self.assertEqual(options['MEMORY'], created_job.memory)
         self.assertEqual(options['WALLCLOCK'], created_job.wallclock)
-        self.assertEqual(0,created_job.retrials)
+        self.assertEqual(str(options['SYNCHRONIZE']), created_job.synchronize)
+        self.assertEqual(str(options['RERUN_ONLY']).lower(), created_job.rerun_only)
+        self.assertEqual(0, created_job.retrials)
         job_list_mock.append.assert_called_once_with(created_job)
+        # Test retrials
+        self.dictionary.experiment_data["CONFIG"]["RETRIALS"] = 2
+        created_job = self.dictionary.build_job(section, priority, date, member, chunk, 'bash',self.as_conf.experiment_data)
+        self.assertEqual(2, created_job.retrials)
+        options['RETRIALS'] = 23
+        # act
+        created_job = self.dictionary.build_job(section, priority, date, member, chunk, 'bash',self.as_conf.experiment_data)
+        self.assertEqual(options['RETRIALS'], created_job.retrials)
+        self.dictionary.experiment_data["CONFIG"] = {}
+        self.dictionary.experiment_data["CONFIG"]["RETRIALS"] = 2
+        created_job = self.dictionary.build_job(section, priority, date, member, chunk, 'bash',self.as_conf.experiment_data)
+        self.assertEqual(options["RETRIALS"], created_job.retrials)
+        self.dictionary.experiment_data["WRAPPERS"] = dict()
+        self.dictionary.experiment_data["WRAPPERS"]["TEST"] = dict()
+        self.dictionary.experiment_data["WRAPPERS"]["TEST"]["RETRIALS"] = 3
+        self.dictionary.experiment_data["WRAPPERS"]["TEST"]["JOBS_IN_WRAPPER"] = section
+        created_job = self.dictionary.build_job(section, priority, date, member, chunk, 'bash',self.as_conf.experiment_data)
+        self.assertEqual(self.dictionary.experiment_data["WRAPPERS"]["TEST"]["RETRIALS"], created_job.retrials)
 
     def test_get_member_returns_the_jobs_if_no_member(self):
         # arrange
         jobs = 'fake-jobs'
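
Not part of the patch: below is a minimal standalone sketch of the RETRIALS precedence that the job_dict.py hunk implements, for reviewers who want to check the lookup order in isolation. The helper name resolve_retrials, its signature, and the literal values are illustrative assumptions, not Autosubmit API.

# Illustrative sketch only: mirrors the new lookup order in DicJobs.build_job --
# wrapper RETRIALS, else wrapper INNER_RETRIALS, else the job section's RETRIALS,
# else the global CONFIG RETRIALS, else 0.
def resolve_retrials(wrappers, job_section, section_params, config):
    section_default = section_params.get('RETRIALS', config.get('RETRIALS', 0))
    for wrapper in wrappers.values():
        # A job listed in JOBS_IN_WRAPPER takes the wrapper-level setting first.
        if job_section in wrapper.get('JOBS_IN_WRAPPER', ''):
            return int(wrapper.get('RETRIALS', wrapper.get('INNER_RETRIALS', section_default)))
    return int(section_default)

# Wrapped job: the wrapper RETRIALS wins over the section and global values.
assert resolve_retrials({'TEST': {'JOBS_IN_WRAPPER': 'SIM', 'RETRIALS': 3}},
                        'SIM', {'RETRIALS': 23}, {'RETRIALS': 2}) == 3
# Unwrapped job: falls back to the section value, then CONFIG, then 0.
assert resolve_retrials({}, 'POST', {}, {'RETRIALS': 2}) == 2
assert resolve_retrials({}, 'POST', {}, {}) == 0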