diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 9b39c1ff8a482ae81b9f71fc2347bef2721b04ca..9a4e961cbda4918ec270f6a02e9823eac935ed93 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1886,8 +1886,8 @@ class Autosubmit: for platform in platform_to_test: try: platform.test_connection() - except BaseException: - issues += "\n[{1}] Connection Unsuccessful to host {0}".format( + except BaseException as e : + issues += "\n[{1}] Connection Unsuccessful to host {0} trace".format( platform.host, platform.name) continue Log.result("[{1}] Connection successful to host {0}", diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 68fef5e09791ffc21c00d7de842bd908a89d0d62..94e1a58380e5e037813621e9545e964b30b32f86 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -80,6 +80,7 @@ class Job(object): return "{0} STATUS: {1}".format(self.name, self.status) def __init__(self, name, job_id, status, priority): + self.wrapper_type = "none" self._wrapper_queue = None self._platform = None self._queue = None @@ -576,24 +577,20 @@ class Job(object): except BaseException as e: Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format( e.message, self.name)) - except AutosubmitError as e: Log.printlog("Trace {0} \nFailed to retrieve log file for job {1}".format( e.message, self.name), 6001) - except AutosubmitCritical as e: # Critical errors can't be recovered. Failed configuration or autosubmit error Log.printlog("Trace {0} \nFailed to retrieve log file for job {0}".format( e.message, self.name), 6001) return - @threaded - def retrieve_logfiles(self, copy_remote_logs, local_logs, remote_logs, expid, platform_name): + def retrieve_logfiles(self, copy_remote_logs, local_logs, remote_logs, expid, platform_name,fail_count = 0): + max_logs = 0 + sleep(5) try: - as_conf = AutosubmitConfig( - expid, BasicConfig, ConfigParserFactory()) + as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory()) as_conf.reload() - remote_logs = (self.script_name + ".out", - self.script_name + ".err") submitter = self._get_submitter(as_conf) submitter.load_platforms(as_conf) platform = submitter.platforms[platform_name.lower()] @@ -601,6 +598,27 @@ class Job(object): platform.test_connection() except: pass + max_logs = int(as_conf.get_retrials()) - fail_count + last_log = int(as_conf.get_retrials()) - fail_count + if self.wrapper_type == "vertical": + stat_file = self.script_name[:-4] + "_STAT_" + found = False + retrials = 0 + while retrials < 3 and not found: + sleep(2) + if platform.check_stat_file_by_retrials(stat_file + str(max_logs)): + found = True + retrials = retrials - 1 + for i in range(max_logs-1,-1,-1): + if platform.check_stat_file_by_retrials(stat_file + str(i)): + last_log = i + else: + break + remote_logs = (self.script_name + ".out." + str(last_log), self.script_name + ".err." + str(last_log)) + + else: + remote_logs = (self.script_name + ".out", self.script_name + ".err") + except Exception as e: Log.printlog( "{0} \n Couldn't connect to the remote platform for this {1} job err/out files. ".format(e.message, self.name), 6001) @@ -609,7 +627,6 @@ class Job(object): retries = 5 sleeptime = 0 i = 0 - sleep(5) no_continue = False try: while (not out_exist and not err_exist) and i < retries: @@ -638,25 +655,70 @@ class Job(object): retries, remote_logs[0], remote_logs[1])) return if copy_remote_logs: + l_log = copy.deepcopy(local_logs) + r_log = copy.deepcopy(remote_logs) # unifying names for log files if remote_logs != local_logs: - self.synchronize_logs( - platform, remote_logs, local_logs) - remote_logs = copy.deepcopy(local_logs) - platform.get_logs_files(self.expid, remote_logs) - # Update the logs with Autosubmit Job Id Brand + if self.wrapper_type == "vertical": # internal_Retrial mechanism + log_start = last_log + exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid) + tmp_path = os.path.join(exp_path, BasicConfig.LOCAL_TMP_DIR) + time_stamp = "1970" + total_stats = ["", "","FAILED"] + while log_start <= max_logs: + try: + if platform.get_stat_file_by_retrials(stat_file+str(max_logs)): + with open(os.path.join(tmp_path,stat_file+str(max_logs)), 'r+') as f: + total_stats = [f.readline()[:-1],f.readline()[:-1],f.readline()[:-1]] + try: + total_stats[0] = float(total_stats[0]) + total_stats[1] = float(total_stats[1]) + except: + total_stats[0] = int(str(total_stats[0]).split('.')[0]) + total_stats[1] = int(str(total_stats[1]).split('.')[0]) + if max_logs != ( int(as_conf.get_retrials()) - fail_count ): + time_stamp = date2str(datetime.datetime.fromtimestamp(total_stats[0]), 'S') + else: + with open(os.path.join(self._tmp_path, self.name + '_TOTAL_STATS_TMP'), 'r+') as f2: + for line in f2.readlines(): + if len(line) > 0: + time_stamp = line.split(" ")[0] + + self.write_total_stat_by_retries(total_stats,max_logs == ( int(as_conf.get_retrials()) - fail_count )) + platform.remove_stat_file_by_retrials(stat_file+str(max_logs)) + l_log = (self.script_name[:-4] + "." + time_stamp + ".out",self.script_name[:-4] + "." + time_stamp + ".err") + r_log = ( remote_logs[0][:-1]+str(max_logs) , remote_logs[1][:-1]+str(max_logs) ) + self.synchronize_logs(platform, r_log, l_log,last = False) + platform.get_logs_files(self.expid, l_log) + try: + for local_log in l_log: + platform.write_jobid(self.id, os.path.join(self._tmp_path, 'LOG_' + str(self.expid), local_log)) + except BaseException as e: + pass + max_logs = max_logs - 1 + else: + max_logs = -1 # exit, no more logs + except BaseException as e: + max_logs = -1 # exit + local_logs = copy.deepcopy(l_log) + remote_logs = copy.deepcopy(local_logs) + if self.wrapper_type != "vertical": + self.synchronize_logs(platform, remote_logs, local_logs) + remote_logs = copy.deepcopy(local_logs) + platform.get_logs_files(self.expid, remote_logs) + # Update the logs with Autosubmit Job Id Brand + try: + for local_log in local_logs: + platform.write_jobid(self.id, os.path.join( + self._tmp_path, 'LOG_' + str(self.expid), local_log)) + except BaseException as e: + Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format( + e.message, self.name)) try: - for local_log in local_logs: - platform.write_jobid(self.id, os.path.join( - self._tmp_path, 'LOG_' + str(self.expid), local_log)) + platform.closeConnection() except BaseException as e: - Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format( - e.message, self.name)) - try: - platform.closeConnection() - except BaseException as e: - pass - return + pass + return except AutosubmitError as e: Log.printlog("Trace {0} \nFailed to retrieve log file for job {1}".format( e.message, self.name), 6001) @@ -681,7 +743,6 @@ class Job(object): except BaseException as e: pass return - def update_status(self, copy_remote_logs=False, failed_file=False): """ Updates job status, checking COMPLETED file if needed @@ -703,7 +764,6 @@ class Job(object): self.check_completion() else: self.status = new_status - if self.status == Status.RUNNING: Log.info("Job {0} is RUNNING", self.name) elif self.status == Status.QUEUING: @@ -723,7 +783,6 @@ class Job(object): Log.result("Job {0} is COMPLETED", self.name) else: self.update_children_status() - elif self.status == Status.UNKNOWN: Log.printlog("Job {0} is UNKNOWN. Checking completed files to confirm the failure...".format( self.name), 3000) @@ -739,13 +798,11 @@ class Job(object): # after checking the jobs , no job should have the status "submitted" Log.printlog("Job {0} in SUBMITTED status. This should never happen on this step..".format( self.name), 6008) - if previous_status != Status.RUNNING and self.status in [Status.COMPLETED, Status.FAILED, Status.UNKNOWN, Status.RUNNING]: self.write_start_time() # Updating logs if self.status in [Status.COMPLETED, Status.FAILED, Status.UNKNOWN]: - self.write_end_time(self.status == Status.COMPLETED) # New thread, check if file exist expid = copy.deepcopy(self.expid) platform_name = copy.deepcopy(self.platform_name.lower()) @@ -757,9 +814,13 @@ class Job(object): if as_conf.get_disable_recovery_threads(self.platform.name) == "true": self.retrieve_logfiles_unthreaded(copy_remote_logs, local_logs) else: - self.retrieve_logfiles( - copy_remote_logs, local_logs, remote_logs, expid, platform_name) - + self.retrieve_logfiles(copy_remote_logs, local_logs, remote_logs, expid, platform_name,fail_count = copy.copy(self.fail_count)) + if self.wrapper_type == "vertical": + max_logs = int(as_conf.get_retrials()) + for i in range(0,max_logs): + self.inc_fail_count() + else: + self.write_end_time(self.status == Status.COMPLETED) return self.status @staticmethod @@ -987,11 +1048,11 @@ class Job(object): template += template_file.read() else: if self.type == Type.BASH: - template = 'sleep 30' + template = 'sleep 10' elif self.type == Type.PYTHON: - template = 'time.sleep(30)' + template = 'time.sleep(10)' elif self.type == Type.R: - template = 'Sys.sleep(30)' + template = 'Sys.sleep(10)' else: template = '' except: @@ -1140,87 +1201,155 @@ class Job(object): str(set(parameters) - set(variables))), 6013) return out - def write_submit_time(self): + def write_submit_time(self,enabled = False): """ Writes submit date and time to TOTAL_STATS file """ - path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') + data_time = ["",time.time()] + if self.wrapper_type != "vertical" or enabled: + path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') + else: + path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS_TMP') if os.path.exists(path): f = open(path, 'a') f.write('\n') else: f = open(path, 'w') - f.write(date2str(datetime.datetime.now(), 'S')) + if not enabled: + f.write(date2str(datetime.datetime.now(), 'S')) + if self.wrapper_type == "vertical": + f.write(" "+str(time.time())) + else: + path2 = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS_TMP') + f2 = open(path2, 'r') + for line in f2.readlines(): + if len(line) > 0: + data_time = line.split(" ") + try: + data_time[1] = float(data_time[1]) + except: + data_time[1] = int(data_time[1]) + f.write(data_time[0]) + f2.close() + try: + os.remove(path2) + except: + pass # Get # Writing database - JobDataStructure(self.expid).write_submit_time(self.name, time.time(), Status.VALUE_TO_KEY[self.status] if self.status in Status.VALUE_TO_KEY.keys() else "UNKNOWN", self.processors, - self.wallclock, self.queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.packed, self._wrapper_queue) + if self.wrapper_type != "vertical" or enabled: + JobDataStructure(self.expid).write_submit_time(self.name, data_time[1], Status.VALUE_TO_KEY[self.status] if self.status in Status.VALUE_TO_KEY.keys() else "UNKNOWN", self.processors, + self.wallclock, self.queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.packed, self._wrapper_queue) - def write_start_time(self): + + def write_start_time(self, enabled = False): """ Writes start date and time to TOTAL_STATS file :return: True if succesful, False otherwise :rtype: bool """ - if self._platform.get_stat_file(self.name, retries=5): - start_time = self.check_start_time() - else: - Log.printlog('Could not get start time for {0}. Using current time as an approximation'.format( - self.name), 3000) - start_time = time.time() - timestamp = date2str(datetime.datetime.now(), 'S') + if self.wrapper_type != "vertical" or enabled: + if self._platform.get_stat_file(self.name, retries=5): #fastlook + start_time = self.check_start_time() + else: + Log.printlog('Could not get start time for {0}. Using current time as an approximation'.format( + self.name), 3000) + start_time = time.time() + timestamp = date2str(datetime.datetime.now(), 'S') - self.local_logs = (self.name + "." + timestamp + - ".out", self.name + "." + timestamp + ".err") + self.local_logs = (self.name + "." + timestamp + + ".out", self.name + "." + timestamp + ".err") - path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') - f = open(path, 'a') - f.write(' ') - # noinspection PyTypeChecker - f.write(date2str(datetime.datetime.fromtimestamp(start_time), 'S')) - # Writing database - JobDataStructure(self.expid).write_start_time(self.name, start_time, Status.VALUE_TO_KEY[self.status] if self.status in Status.VALUE_TO_KEY.keys() else "UNKNOWN", self.processors, - self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.packed, self._wrapper_queue) + path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') + f = open(path, 'a') + f.write(' ') + # noinspection PyTypeChecker + f.write(date2str(datetime.datetime.fromtimestamp(start_time), 'S')) + # Writing database + JobDataStructure(self.expid).write_start_time(self.name, start_time, Status.VALUE_TO_KEY[self.status] if self.status in Status.VALUE_TO_KEY.keys() else "UNKNOWN", self.processors, + self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.packed, self._wrapper_queue) return True - def write_end_time(self, completed): + def write_end_time(self, completed,enabled = False): """ Writes ends date and time to TOTAL_STATS file :param completed: True if job was completed successfully, False otherwise :type completed: bool """ - self._platform.get_stat_file(self.name, retries=5) - end_time = self.check_end_time() + if self.wrapper_type != "vertical" or enabled: + self._platform.get_stat_file(self.name, retries=5) + end_time = self.check_end_time() + path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') + f = open(path, 'a') + f.write(' ') + finish_time = None + final_status = None + if end_time > 0: + # noinspection PyTypeChecker + f.write(date2str(datetime.datetime.fromtimestamp(end_time), 'S')) + # date2str(datetime.datetime.fromtimestamp(end_time), 'S') + finish_time = end_time + else: + f.write(date2str(datetime.datetime.now(), 'S')) + finish_time = time.time() # date2str(datetime.datetime.now(), 'S') + f.write(' ') + if completed: + final_status = "COMPLETED" + f.write('COMPLETED') + else: + final_status = "FAILED" + f.write('FAILED') + out, err = self.local_logs + path_out = os.path.join(self._tmp_path, 'LOG_' + str(self.expid), out) + # Launch first as simple non-threaded function + JobDataStructure(self.expid).write_finish_time(self.name, finish_time, final_status, self.processors, self.wallclock, self._queue, self.date, + self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents], True, None, out, err, self._wrapper_queue) + # Launch second as threaded function + thread_write_finish = Thread(target=JobDataStructure(self.expid).write_finish_time, args=(self.name, finish_time, final_status, self.processors, + self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents], False, path_out, out, err, self._wrapper_queue)) + thread_write_finish.name = "JOB_data_{}".format(self.name) + thread_write_finish.start() + + def write_total_stat_by_retries_fix_newline(self): path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') f = open(path, 'a') - f.write(' ') - finish_time = None - final_status = None - if end_time > 0: - # noinspection PyTypeChecker - f.write(date2str(datetime.datetime.fromtimestamp(end_time), 'S')) - # date2str(datetime.datetime.fromtimestamp(end_time), 'S') - finish_time = end_time - else: - f.write(date2str(datetime.datetime.now(), 'S')) - finish_time = time.time() # date2str(datetime.datetime.now(), 'S') - f.write(' ') - if completed: - final_status = "COMPLETED" - f.write('COMPLETED') + f.write('\n') + f.close() + + def write_total_stat_by_retries(self,total_stats, first_retrial = False): + """ + Writes all data to TOTAL_STATS file + :param total_stats: data gathered by the wrapper + :type completed: str + """ + if first_retrial: + self.write_submit_time(True) + path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') + f = open(path, 'a') + if first_retrial: + f.write(" " + date2str(datetime.datetime.fromtimestamp(total_stats[0]), 'S') + ' ' + date2str(datetime.datetime.fromtimestamp(total_stats[1]), 'S') + ' ' + total_stats[2]) else: - final_status = "FAILED" - f.write('FAILED') + f.write('\n' + date2str(datetime.datetime.fromtimestamp(total_stats[0]), 'S') + ' ' + date2str(datetime.datetime.fromtimestamp(total_stats[0]), 'S') + ' ' + date2str(datetime.datetime.fromtimestamp(total_stats[1]), 'S') + ' ' + total_stats[2]) out, err = self.local_logs path_out = os.path.join(self._tmp_path, 'LOG_' + str(self.expid), out) # Launch first as simple non-threaded function - JobDataStructure(self.expid).write_finish_time(self.name, finish_time, final_status, self.processors, self.wallclock, self._queue, self.date, - self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents], True, None, out, err, self._wrapper_queue) - # Launch second as threaded function - thread_write_finish = Thread(target=JobDataStructure(self.expid).write_finish_time, args=(self.name, finish_time, final_status, self.processors, - self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents], False, path_out, out, err, self._wrapper_queue)) - thread_write_finish.name = "JOB_data_{}".format(self.name) - thread_write_finish.start() + if not first_retrial: + JobDataStructure(self.expid).write_submit_time(self.name, total_stats[0], Status.VALUE_TO_KEY[ + self.status] if self.status in Status.VALUE_TO_KEY.keys() else "UNKNOWN", self.processors, + self.wallclock, self.queue, self.date, self.member, self.section, + self.chunk, self.platform_name, self.id, self.packed, + self._wrapper_queue) + JobDataStructure(self.expid).write_start_time(self.name, total_stats[0], Status.VALUE_TO_KEY[ + self.status] if self.status in Status.VALUE_TO_KEY.keys() else "UNKNOWN", self.processors, + self.wallclock, self._queue, self.date, self.member, + self.section, self.chunk, self.platform_name, self.id, + self.packed, self._wrapper_queue) + JobDataStructure(self.expid).write_finish_time(self.name, total_stats[1], total_stats[2], self.processors, + self.wallclock, self._queue, self.date, + self.member, self.section, self.chunk, self.platform_name, + self.id, self.platform, self.packed, + [job.id for job in self._parents], True, None, out, err, + self._wrapper_queue) def check_started_after(self, date_limit): """ @@ -1281,11 +1410,12 @@ class Job(object): parent.children.remove(self) self.parents.remove(parent) - def synchronize_logs(self, platform, remote_logs, local_logs): + def synchronize_logs(self, platform, remote_logs, local_logs, last = True): platform.move_file(remote_logs[0], local_logs[0], True) # .out platform.move_file(remote_logs[1], local_logs[1], True) # .err - self.local_logs = local_logs - self.remote_logs = copy.deepcopy(local_logs) + if last: + self.local_logs = local_logs + self.remote_logs = copy.deepcopy(local_logs) class WrapperJob(Job): @@ -1577,7 +1707,7 @@ class WrapperJob(Job): running_jobs = self.inner_jobs_running real_running = copy.deepcopy(self.inner_jobs_running) if check_ready_jobs: - running_jobs += [job for job in self.job_list if job.status == Status.READY or job.status == Status.SUBMITTED] + running_jobs += [job for job in self.job_list if job.status == Status.READY or job.status == Status.SUBMITTED or job.status == Status.QUEUING] self.inner_jobs_running = list() for job in running_jobs: if job.platform.check_file_exists('{0}_FAILED'.format(job.name), wrapper_failed=True): diff --git a/autosubmit/job/job_common.py b/autosubmit/job/job_common.py index f80bd67364ad9f4f7ef31371a328990a38a05d2c..f97dcb355e3445f960c20704f2f7c6aa330e92ef 100644 --- a/autosubmit/job/job_common.py +++ b/autosubmit/job/job_common.py @@ -108,7 +108,7 @@ class StatisticsSnippetBash: ################### set -xuve job_name_ptrn='%CURRENT_LOGDIR%/%JOBNAME%' - echo $(date +%s) > ${job_name_ptrn}_STAT + echo $(date +%s) >> ${job_name_ptrn}_STAT ################### # Autosubmit job diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 595cfe6a484f79879acf19157a0dae8afc351187..ea27ac6a22a0592ad2fab20d48d26b1ca6754588 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -582,9 +582,9 @@ class JobList(object): # By adding to the result at this step, only those with the same RUNNIN have been added. dict_jobs[date][member] += jobs_to_sort jobs_to_sort = [] - - jobs_to_sort.append(job) - previous_job = job + if len(sorted_jobs_list) > 1 : + jobs_to_sort.append(job) + previous_job = job return dict_jobs diff --git a/autosubmit/job/job_list_persistence.py b/autosubmit/job/job_list_persistence.py index 3065905a93f2e5c9ddba1d725aadd4555f08ccc1..32c5348af2ea7b091d04ca5630c532f9e67ad915 100644 --- a/autosubmit/job/job_list_persistence.py +++ b/autosubmit/job/job_list_persistence.py @@ -91,7 +91,7 @@ class JobListPersistencePkl(JobListPersistence): job.priority, job.section, job.date, job.member, job.chunk, job.local_logs[0], job.local_logs[1], - job.remote_logs[0], job.remote_logs[1]) for job in job_list] + job.remote_logs[0], job.remote_logs[1],job.wrapper_type) for job in job_list] pickle.dump(jobs_data, fd) Log.debug('Job list saved') @@ -134,7 +134,7 @@ class JobListPersistenceDb(JobListPersistence): job.priority, job.section, job.date, job.member, job.chunk, job.local_logs[0], job.local_logs[1], - job.remote_logs[0], job.remote_logs[1]) for job in job_list] + job.remote_logs[0], job.remote_logs[1],job.wrapper_type) for job in job_list] self.db_manager.insertMany(self.JOB_LIST_TABLE, jobs_data) def _reset_table(self): diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index f5bcad57c6b205be23de62c43ae99d3d0efed03f..c917a508dbba7cfca253d5a9781f999d35b5afb4 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -283,7 +283,6 @@ class JobPackager(object): if self.wrapper_type[self.current_wrapper_section] == 'vertical': built_packages_tmp = self._build_vertical_packages(jobs_to_submit_by_section[section], wrapper_limits) elif self.wrapper_type[self.current_wrapper_section] == 'horizontal': - built_packages_tmp = self._build_horizontal_packages(jobs_to_submit_by_section[section], wrapper_limits, section) elif self.wrapper_type[self.current_wrapper_section] in ['vertical-horizontal', 'horizontal-vertical']: @@ -299,6 +298,7 @@ class JobPackager(object): aux_jobs = [] # Check failed jobs first for job in p.jobs: + job.wrapper_type = self.wrapper_type[self.current_wrapper_section] if len(self._jobs_list.jobs_to_run_first) > 0: if job not in self._jobs_list.jobs_to_run_first: job.packed = False diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index c8e1ff8ab99c4bfbfc1e9d2761c41e9d664ebd80..8fcdd26d7138a009265efc74bc203182227dc0b5 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -347,7 +347,7 @@ class JobPackageThread(JobPackageBase): self._wrapper_factory = self.platform.wrapper self.current_wrapper_section = wrapper_section if configuration is not None: - self.inner_retrials = configuration.get_wrapper_retrials(self.current_wrapper_section) + self.inner_retrials = configuration.get_retrials() self.export = configuration.get_wrapper_export(self.current_wrapper_section) if self.export != "none" and self.export != "None": for job in self.jobs: diff --git a/autosubmit/platforms/headers/lsf_header.py b/autosubmit/platforms/headers/lsf_header.py index 06e6a83f9b2bdbb7c42309ac7732a2fc374c4c92..c47acb515d577cb855fea5b8f022fb1a845dadcd 100644 --- a/autosubmit/platforms/headers/lsf_header.py +++ b/autosubmit/platforms/headers/lsf_header.py @@ -128,7 +128,6 @@ class LsfHeader(object): def run(self): out = str(self.template) + '.' + str(self.id_run) + '.out' err = str(self.template) + '.' + str(self.id_run) + '.err' - command = str(self.template) + ' ' + str(self.id_run) + ' ' + os.getcwd() (self.status) = getstatusoutput(command + ' > ' + out + ' 2> ' + err) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 5a38cbdd85ad16549ff5a53c661ed69ff47762f5..0d2b387c7b3e5f1ed5c48b7f242da3ecf0e17bcd 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -256,6 +256,10 @@ class ParamikoPlatform(Platform): raise AutosubmitError( 'Send file failed. Connection seems to no be active', 6004) + + def get_list_of_files(self): + return self._ftpChannel.get(self.get_files_path) + # Gets .err and .out def get_file(self, filename, must_exist=True, relative_path='', ignore_log=False, wrapper_failed=False): """ @@ -336,8 +340,9 @@ class ParamikoPlatform(Platform): """ try: path_root = self.get_files_path() - self._ftpChannel.rename(os.path.join(path_root, src), - os.path.join(path_root, dest)) + src = os.path.join(path_root, src) + dest = os.path.join(path_root, dest) + self._ftpChannel.rename(src,dest) return True except IOError as e: diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 48089f04fcbb17c877e82db09d7b8e5946d5a5df..11b3013739a2bd63b2bd779578fa4675132ce6e1 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -211,7 +211,16 @@ class Platform(object): (job_out_filename, job_err_filename) = remote_logs self.get_files([job_out_filename, job_err_filename], False, 'LOG_{0}'.format(exp_id)) + def get_stat_file(self, exp_id, job_name): + """ + Get the given stat files for all retrials + :param exp_id: experiment id + :type exp_id: str + :param remote_logs: names of the log files + :type remote_logs: (str, str) + """ + self.get_files(job_name,False, 'LOG_{0}'.format(exp_id)) def get_completed_files(self, job_name, retries=0, recovery=False, wrapper_failed=False): """ Get the COMPLETED file of the given job @@ -251,7 +260,19 @@ class Platform(object): Log.debug('{0}_STAT have been removed', job_name) return True return False + def remove_stat_file_by_retrials(self, job_name): + """ + Removes *STAT* files from remote + :param job_name: name of job to check + :type job_name: str + :return: True if successful, False otherwise + :rtype: bool + """ + filename = job_name + if self.delete_file(filename): + return True + return False def remove_completed_file(self, job_name): """ Removes *COMPLETED* files from remote @@ -293,6 +314,46 @@ class Platform(object): Log.debug('{0}_STAT file not found', job_name) return False + def check_stat_file_by_retrials(self, job_name, retries=0): + """ + check *STAT* file + + :param retries: number of intents to get the completed files + :type retries: int + :param job_name: name of job to check + :type job_name: str + :return: True if succesful, False otherwise + :rtype: bool + """ + filename = job_name + if self.check_file_exists(filename): + return True + else: + return False + def get_stat_file_by_retrials(self, job_name, retries=0): + """ + Copies *STAT* files from remote to local + + :param retries: number of intents to get the completed files + :type retries: int + :param job_name: name of job to check + :type job_name: str + :return: True if succesful, False otherwise + :rtype: bool + """ + filename = job_name + stat_local_path = os.path.join( + self.config.LOCAL_ROOT_DIR, self.expid, self.config.LOCAL_TMP_DIR, filename) + if os.path.exists(stat_local_path): + os.remove(stat_local_path) + if self.check_file_exists(filename): + if self.get_file(filename, True): + return True + else: + return False + else: + return False + def get_files_path(self): """ Get the path to the platform's LOG directory diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index 2a11d876514cbc015d68d0c6839d5237ef006308..838b3b0d150f5f44c35b137e69cb80fe7aa24249 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -112,9 +112,11 @@ class PythonWrapperBuilder(WrapperBuilder): return textwrap.dedent(""" import os import sys + from bscearth.utils.date import date2str from threading import Thread from commands import getstatusoutput from datetime import datetime + import time from math import ceil from collections import OrderedDict import copy @@ -427,10 +429,14 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): failed_wrapper = os.path.join(os.getcwd(),wrapper_id) retrials = {2} for i in range(len({0})): - current = {1} - while (current.retrials >= 0) + job_retrials = retrials + completed = False + while job_retrials >= 0 and not completed: + current = {1} current.start() + os.system("echo "+str(time.time())+" > "+scripts[i][:-4]+"_STAT_"+str(job_retrials)) #Start/submit running current.join() + job_retrials = job_retrials - 1 """).format(jobs_list, thread,self.retrials,'\n'.ljust(13)) if footer: @@ -440,23 +446,26 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): failed_filename = {0}[i].replace('.cmd', '_FAILED') failed_path = os.path.join(os.getcwd(), failed_filename) failed_wrapper = os.path.join(os.getcwd(), wrapper_id) + os.system("echo "+str(time.time())+" >> "+scripts[i][:-4]+"_STAT_"+str(job_retrials+1)) #Completed if os.path.exists(completed_path): + completed = True print datetime.now(), "The job ", current.template," has been COMPLETED" + os.system("echo COMPLETED >> " + scripts[i][:-4]+"_STAT_"+str(job_retrials+1)) else: - open(failed_wrapper,'w').close() - open(failed_path, 'w').close() print datetime.now(), "The job ", current.template," has FAILED" + os.system("echo FAILED >> " + scripts[i][:-4]+"_STAT_"+str(job_retrials+1)) #{1} """).format(jobs_list, self.exit_thread, '\n'.ljust(13)), 8) sequential_threads_launcher += self._indent(textwrap.dedent(""" + if not os.path.exists(completed_path): + open(failed_wrapper,'w').close() + open(failed_path, 'w').close() + if os.path.exists(failed_wrapper): os.remove(os.path.join(os.getcwd(),wrapper_id)) wrapper_failed = os.path.join(os.getcwd(),"WRAPPER_FAILED") open(wrapper_failed, 'w').close() os._exit(1) - - - """).format(jobs_list, self.exit_thread, '\n'.ljust(13)), 4) return sequential_threads_launcher @@ -472,16 +481,16 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): def run(self): jobname = self.template.replace('.cmd', '') os.system("echo $(date +%s) > "+jobname+"_STAT") - out = str(self.template) + ".out" - err = str(self.template) + ".err" + out = str(self.template) + ".out." + str(self.retrials) + err = str(self.template) + ".err." + str(self.retrials) print(out+"\\n") command = "bash " + str(self.template) + " " + str(self.id_run) + " " + os.getcwd() (self.status) = getstatusoutput(command + " > " + out + " 2> " + err) - self.retrials = self.retrials - 1 + """).format('\n'.ljust(13)) def build_main(self): self.exit_thread = "os._exit(1)" - return self.build_sequential_threads_launcher("scripts", "JobThread(scripts[i], i, retrials)") + return self.build_sequential_threads_launcher("scripts", "JobThread(scripts[i], i, job_retrials)") class PythonHorizontalWrapperBuilder(PythonWrapperBuilder): def build_main(self): diff --git a/test/unit/test_wrappers.py b/test/unit/test_wrappers.py index ec0c26b793c1b3b5a0f692883a814e6d0a8d35dd..b8460452aabaddbaf3231886a83e16ff1df08047 100644 --- a/test/unit/test_wrappers.py +++ b/test/unit/test_wrappers.py @@ -166,7 +166,7 @@ class TestWrappers(TestCase): self.config.get_wrapper_method = Mock(return_value='ASThread') self.config.get_wrapper_queue = Mock(return_value='debug') self.config.get_wrapper_policy = Mock(return_value='flexible') - self.config.get_wrapper_retrials = Mock(return_value=0) + self.config.get_retrials = Mock(return_value=0) self.config.get = Mock(return_value='flexible') self.job_packager = JobPackager( self.config, self._platform, self.job_list)