From c30009f4ad0a99fce89a3b86ff27d4fc4fb163a6 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 10 Sep 2021 17:22:04 +0200 Subject: [PATCH 01/14] Added some improvements to internal retrial and a job recovery mechanism for multiple retrials in for the same job TODO timestamp is set to the sameone+`retrial number I need to change it TODO stat file? need to check in a real model --- autosubmit/autosubmit.py | 4 +- autosubmit/job/job.py | 57 ++++++++++++++----- autosubmit/job/job_list.py | 6 +- autosubmit/platforms/paramiko_platform.py | 4 ++ .../platforms/wrappers/wrapper_builder.py | 17 ++++-- 5 files changed, 62 insertions(+), 26 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 9b39c1ff8..9a4e961cb 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1886,8 +1886,8 @@ class Autosubmit: for platform in platform_to_test: try: platform.test_connection() - except BaseException: - issues += "\n[{1}] Connection Unsuccessful to host {0}".format( + except BaseException as e : + issues += "\n[{1}] Connection Unsuccessful to host {0} trace".format( platform.host, platform.name) continue Log.result("[{1}] Connection successful to host {0}", diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 68fef5e09..44f13c8c9 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -592,8 +592,19 @@ class Job(object): as_conf = AutosubmitConfig( expid, BasicConfig, ConfigParserFactory()) as_conf.reload() - remote_logs = (self.script_name + ".out", - self.script_name + ".err") + max_logs = 0 + wrapper_type = "none" + for wrapper_section in as_conf.get_wrapper_multi(): + if self.section in as_conf.get_wrapper_jobs(wrapper_section): + wrapper_type = as_conf.get_wrapper_type(wrapper_section) + if wrapper_type == "vertical": + max_logs = as_conf.get_wrapper_retrials(wrapper_section) + break + if wrapper_type != "vertical": + remote_logs = (self.script_name + ".out", self.script_name + ".err") + else: + remote_logs = 
(self.script_name + ".out." + max_logs , self.script_name + ".err." + max_logs) + submitter = self._get_submitter(as_conf) submitter.load_platforms(as_conf) platform = submitter.platforms[platform_name.lower()] @@ -640,8 +651,24 @@ class Job(object): if copy_remote_logs: # unifying names for log files if remote_logs != local_logs: - self.synchronize_logs( - platform, remote_logs, local_logs) + if wrapper_type != "vertical": + other_logs = max_logs - 1 + while other_logs >= 0: # perhaps the order is reversed TODO + try: + r_log = (remote_logs[0][:-1]+other_logs,remote_logs[1][:-1]+other_logs) + l_log = (local_logs[0]+"_"+other_logs,local_logs[1]+"_"+other_logs) + self.synchronize_logs(platform, r_log, l_log) + platform.get_logs_files(self.expid, l_log) + try: + for local_log in l_log: + platform.write_jobid(self.id, os.path.join( + self._tmp_path, 'LOG_' + str(self.expid), local_log)) + except BaseException as e: + pass + other_logs = other_logs -1 + except: # no more retrials + other_logs = other_logs -1 + self.synchronize_logs(platform, remote_logs, local_logs) remote_logs = copy.deepcopy(local_logs) platform.get_logs_files(self.expid, remote_logs) # Update the logs with Autosubmit Job Id Brand @@ -652,11 +679,11 @@ class Job(object): except BaseException as e: Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format( e.message, self.name)) - try: - platform.closeConnection() - except BaseException as e: - pass - return + try: + platform.closeConnection() + except BaseException as e: + pass + return except AutosubmitError as e: Log.printlog("Trace {0} \nFailed to retrieve log file for job {1}".format( e.message, self.name), 6001) @@ -979,19 +1006,19 @@ class Job(object): template = '' if as_conf.get_remote_dependencies(): if self.type == Type.BASH: - template = 'sleep 30' + "\n" + template = 'sleep 1' + "\n" elif self.type == Type.PYTHON: - template = 'time.sleep(30)' + "\n" + template = 'time.sleep(1)' + "\n" elif self.type == Type.R: - template = 
'Sys.sleep(30)' + "\n" + template = 'Sys.sleep(1)' + "\n" template += template_file.read() else: if self.type == Type.BASH: - template = 'sleep 30' + template = 'sleep 1' elif self.type == Type.PYTHON: - template = 'time.sleep(30)' + template = 'time.sleep(1)' elif self.type == Type.R: - template = 'Sys.sleep(30)' + template = 'Sys.sleep(1)' else: template = '' except: diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 595cfe6a4..ea27ac6a2 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -582,9 +582,9 @@ class JobList(object): # By adding to the result at this step, only those with the same RUNNIN have been added. dict_jobs[date][member] += jobs_to_sort jobs_to_sort = [] - - jobs_to_sort.append(job) - previous_job = job + if len(sorted_jobs_list) > 1 : + jobs_to_sort.append(job) + previous_job = job return dict_jobs diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 5a38cbdd8..ab5d3a8fd 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -256,6 +256,10 @@ class ParamikoPlatform(Platform): raise AutosubmitError( 'Send file failed. 
Connection seems to no be active', 6004) + + def get_list_of_files(self): + return self._ftpChannel.get(self.get_files_path) + # Gets .err and .out def get_file(self, filename, must_exist=True, relative_path='', ignore_log=False, wrapper_failed=False): """ diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index 2a11d8765..ae58296d8 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -427,10 +427,13 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): failed_wrapper = os.path.join(os.getcwd(),wrapper_id) retrials = {2} for i in range(len({0})): - current = {1} - while (current.retrials >= 0) + job_retrials = retrials + completed = False + while job_retrials >= 0 or completed: + current = {1} current.start() current.join() + job_retrials = job_retrials - 1 """).format(jobs_list, thread,self.retrials,'\n'.ljust(13)) if footer: @@ -441,6 +444,7 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): failed_path = os.path.join(os.getcwd(), failed_filename) failed_wrapper = os.path.join(os.getcwd(), wrapper_id) if os.path.exists(completed_path): + completed = True print datetime.now(), "The job ", current.template," has been COMPLETED" else: open(failed_wrapper,'w').close() @@ -472,16 +476,17 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): def run(self): jobname = self.template.replace('.cmd', '') os.system("echo $(date +%s) > "+jobname+"_STAT") - out = str(self.template) + ".out" - err = str(self.template) + ".err" + if self.retrials = + out = str(self.template) + ".out." + str(self.retrials) + err = str(self.template) + ".err." 
+ str(self.retrials) print(out+"\\n") command = "bash " + str(self.template) + " " + str(self.id_run) + " " + os.getcwd() (self.status) = getstatusoutput(command + " > " + out + " 2> " + err) - self.retrials = self.retrials - 1 + """).format('\n'.ljust(13)) def build_main(self): self.exit_thread = "os._exit(1)" - return self.build_sequential_threads_launcher("scripts", "JobThread(scripts[i], i, retrials)") + return self.build_sequential_threads_launcher("scripts", "JobThread(scripts[i], i, job_retrials)") class PythonHorizontalWrapperBuilder(PythonWrapperBuilder): def build_main(self): -- GitLab From ab2fbcb44661086f46a0bb769f6585feb2f8305d Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 10 Sep 2021 17:38:50 +0200 Subject: [PATCH 02/14] typo fix --- autosubmit/platforms/wrappers/wrapper_builder.py | 1 - 1 file changed, 1 deletion(-) diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index ae58296d8..43fe291d7 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -476,7 +476,6 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): def run(self): jobname = self.template.replace('.cmd', '') os.system("echo $(date +%s) > "+jobname+"_STAT") - if self.retrials = out = str(self.template) + ".out." + str(self.retrials) err = str(self.template) + ".err." 
+ str(self.retrials) print(out+"\\n") -- GitLab From 629aa1eb8e44d15fd4c9c64f9f1bc2743093b0bc Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 13 Sep 2021 11:20:39 +0200 Subject: [PATCH 03/14] log retrials added --- autosubmit/job/job.py | 33 ++++++++++--------- .../platforms/wrappers/wrapper_builder.py | 2 +- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 44f13c8c9..e9b06bfb9 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -588,22 +588,25 @@ class Job(object): @threaded def retrieve_logfiles(self, copy_remote_logs, local_logs, remote_logs, expid, platform_name): + max_logs = 0 + wrapper_type = "none" + try: - as_conf = AutosubmitConfig( - expid, BasicConfig, ConfigParserFactory()) + as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory()) as_conf.reload() - max_logs = 0 - wrapper_type = "none" - for wrapper_section in as_conf.get_wrapper_multi(): + list_of_wrappers = as_conf.get_wrapper_multi() + if len(list_of_wrappers) == 0: + list_of_wrappers.append("wrapper") + for wrapper_section in list_of_wrappers: if self.section in as_conf.get_wrapper_jobs(wrapper_section): wrapper_type = as_conf.get_wrapper_type(wrapper_section) if wrapper_type == "vertical": - max_logs = as_conf.get_wrapper_retrials(wrapper_section) + max_logs = int(as_conf.get_wrapper_retrials(wrapper_section)) break if wrapper_type != "vertical": remote_logs = (self.script_name + ".out", self.script_name + ".err") else: - remote_logs = (self.script_name + ".out." + max_logs , self.script_name + ".err." + max_logs) + remote_logs = (self.script_name + ".out." + str(max_logs) , self.script_name + ".err." 
+ str(max_logs)) submitter = self._get_submitter(as_conf) submitter.load_platforms(as_conf) @@ -651,18 +654,17 @@ class Job(object): if copy_remote_logs: # unifying names for log files if remote_logs != local_logs: - if wrapper_type != "vertical": + if wrapper_type == "vertical": other_logs = max_logs - 1 while other_logs >= 0: # perhaps the order is reversed TODO try: - r_log = (remote_logs[0][:-1]+other_logs,remote_logs[1][:-1]+other_logs) - l_log = (local_logs[0]+"_"+other_logs,local_logs[1]+"_"+other_logs) + r_log = (remote_logs[0][:-1]+str(other_logs),remote_logs[1][:-1]+str(other_logs)) + l_log = (local_logs[0]+"_"+str(other_logs),local_logs[1]+"_"+str(other_logs)) self.synchronize_logs(platform, r_log, l_log) platform.get_logs_files(self.expid, l_log) try: for local_log in l_log: - platform.write_jobid(self.id, os.path.join( - self._tmp_path, 'LOG_' + str(self.expid), local_log)) + platform.write_jobid(self.id, os.path.join(self._tmp_path, 'LOG_' + str(self.expid), local_log)) except BaseException as e: pass other_logs = other_logs -1 @@ -1006,17 +1008,18 @@ class Job(object): template = '' if as_conf.get_remote_dependencies(): if self.type == Type.BASH: - template = 'sleep 1' + "\n" + template = 'sleep 30' + "\n" elif self.type == Type.PYTHON: - template = 'time.sleep(1)' + "\n" + template = 'time.sleep(30)' + "\n" elif self.type == Type.R: - template = 'Sys.sleep(1)' + "\n" + template = 'Sys.sleep(30)' + "\n" template += template_file.read() else: if self.type == Type.BASH: template = 'sleep 1' elif self.type == Type.PYTHON: template = 'time.sleep(1)' + template += 'crash' elif self.type == Type.R: template = 'Sys.sleep(1)' else: diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index 43fe291d7..721714365 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -429,7 +429,7 @@ class 
PythonVerticalWrapperBuilder(PythonWrapperBuilder): for i in range(len({0})): job_retrials = retrials completed = False - while job_retrials >= 0 or completed: + while job_retrials >= 0 and not completed: current = {1} current.start() current.join() -- GitLab From f412e53a0794ca4bd3e4d3ca332618b5af81da55 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 13 Sep 2021 14:46:42 +0200 Subject: [PATCH 04/14] Fixed few bugs --- autosubmit/job/job.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index e9b06bfb9..b875d74db 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -1017,6 +1017,7 @@ class Job(object): else: if self.type == Type.BASH: template = 'sleep 1' + template += 'crash' elif self.type == Type.PYTHON: template = 'time.sleep(1)' template += 'crash' @@ -1607,7 +1608,7 @@ class WrapperJob(Job): running_jobs = self.inner_jobs_running real_running = copy.deepcopy(self.inner_jobs_running) if check_ready_jobs: - running_jobs += [job for job in self.job_list if job.status == Status.READY or job.status == Status.SUBMITTED] + running_jobs += [job for job in self.job_list if job.status == Status.READY or job.status == Status.SUBMITTED or job.status == Status.QUEUING] self.inner_jobs_running = list() for job in running_jobs: if job.platform.check_file_exists('{0}_FAILED'.format(job.name), wrapper_failed=True): -- GitLab From 75d1df5b8623ba0e43311041504cf80853b2edab Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 14 Sep 2021 19:29:00 +0200 Subject: [PATCH 05/14] stat files and retrials change --- autosubmit/job/job.py | 22 ++++++------------- autosubmit/job/job_common.py | 2 +- autosubmit/job/job_packages.py | 2 +- .../platforms/wrappers/wrapper_builder.py | 9 ++++---- 4 files changed, 14 insertions(+), 21 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index b875d74db..ab8dfd9e3 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -576,16 +576,13 @@ 
class Job(object): except BaseException as e: Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format( e.message, self.name)) - except AutosubmitError as e: Log.printlog("Trace {0} \nFailed to retrieve log file for job {1}".format( e.message, self.name), 6001) - except AutosubmitCritical as e: # Critical errors can't be recovered. Failed configuration or autosubmit error Log.printlog("Trace {0} \nFailed to retrieve log file for job {0}".format( e.message, self.name), 6001) return - @threaded def retrieve_logfiles(self, copy_remote_logs, local_logs, remote_logs, expid, platform_name): max_logs = 0 @@ -597,11 +594,11 @@ class Job(object): list_of_wrappers = as_conf.get_wrapper_multi() if len(list_of_wrappers) == 0: list_of_wrappers.append("wrapper") - for wrapper_section in list_of_wrappers: + for wrapper_section in list_of_wrappers: #fastlook if self.section in as_conf.get_wrapper_jobs(wrapper_section): wrapper_type = as_conf.get_wrapper_type(wrapper_section) if wrapper_type == "vertical": - max_logs = int(as_conf.get_wrapper_retrials(wrapper_section)) + max_logs = int(as_conf.get_retrials()) break if wrapper_type != "vertical": remote_logs = (self.script_name + ".out", self.script_name + ".err") @@ -656,7 +653,7 @@ class Job(object): if remote_logs != local_logs: if wrapper_type == "vertical": other_logs = max_logs - 1 - while other_logs >= 0: # perhaps the order is reversed TODO + while other_logs >= 0: # perhaps the order is reversed TODO try: r_log = (remote_logs[0][:-1]+str(other_logs),remote_logs[1][:-1]+str(other_logs)) l_log = (local_logs[0]+"_"+str(other_logs),local_logs[1]+"_"+str(other_logs)) @@ -667,9 +664,9 @@ class Job(object): platform.write_jobid(self.id, os.path.join(self._tmp_path, 'LOG_' + str(self.expid), local_log)) except BaseException as e: pass - other_logs = other_logs -1 + other_logs = other_logs - 1 except: # no more retrials - other_logs = other_logs -1 + other_logs = other_logs - 1 self.synchronize_logs(platform, remote_logs, 
local_logs) remote_logs = copy.deepcopy(local_logs) platform.get_logs_files(self.expid, remote_logs) @@ -710,7 +707,6 @@ class Job(object): except BaseException as e: pass return - def update_status(self, copy_remote_logs=False, failed_file=False): """ Updates job status, checking COMPLETED file if needed @@ -732,7 +728,6 @@ class Job(object): self.check_completion() else: self.status = new_status - if self.status == Status.RUNNING: Log.info("Job {0} is RUNNING", self.name) elif self.status == Status.QUEUING: @@ -752,7 +747,6 @@ class Job(object): Log.result("Job {0} is COMPLETED", self.name) else: self.update_children_status() - elif self.status == Status.UNKNOWN: Log.printlog("Job {0} is UNKNOWN. Checking completed files to confirm the failure...".format( self.name), 3000) @@ -768,7 +762,6 @@ class Job(object): # after checking the jobs , no job should have the status "submitted" Log.printlog("Job {0} in SUBMITTED status. This should never happen on this step..".format( self.name), 6008) - if previous_status != Status.RUNNING and self.status in [Status.COMPLETED, Status.FAILED, Status.UNKNOWN, Status.RUNNING]: self.write_start_time() @@ -786,8 +779,7 @@ class Job(object): if as_conf.get_disable_recovery_threads(self.platform.name) == "true": self.retrieve_logfiles_unthreaded(copy_remote_logs, local_logs) else: - self.retrieve_logfiles( - copy_remote_logs, local_logs, remote_logs, expid, platform_name) + self.retrieve_logfiles(copy_remote_logs, local_logs, remote_logs, expid, platform_name) return self.status @@ -1193,7 +1185,7 @@ class Job(object): :return: True if succesful, False otherwise :rtype: bool """ - if self._platform.get_stat_file(self.name, retries=5): + if self._platform.get_stat_file(self.name, retries=5): #fastlook start_time = self.check_start_time() else: Log.printlog('Could not get start time for {0}. 
Using current time as an approximation'.format( diff --git a/autosubmit/job/job_common.py b/autosubmit/job/job_common.py index f80bd6736..f97dcb355 100644 --- a/autosubmit/job/job_common.py +++ b/autosubmit/job/job_common.py @@ -108,7 +108,7 @@ class StatisticsSnippetBash: ################### set -xuve job_name_ptrn='%CURRENT_LOGDIR%/%JOBNAME%' - echo $(date +%s) > ${job_name_ptrn}_STAT + echo $(date +%s) >> ${job_name_ptrn}_STAT ################### # Autosubmit job diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index c8e1ff8ab..8fcdd26d7 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -347,7 +347,7 @@ class JobPackageThread(JobPackageBase): self._wrapper_factory = self.platform.wrapper self.current_wrapper_section = wrapper_section if configuration is not None: - self.inner_retrials = configuration.get_wrapper_retrials(self.current_wrapper_section) + self.inner_retrials = configuration.get_retrials() self.export = configuration.get_wrapper_export(self.current_wrapper_section) if self.export != "none" and self.export != "None": for job in self.jobs: diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index 721714365..544172170 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -432,7 +432,9 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): while job_retrials >= 0 and not completed: current = {1} current.start() + os.system("echo $(date +%s) > "+scripts[i][:-4]+"_STAT_"+str(job_retrials)) current.join() + os.system("echo $(date +%s) >> "+scripts[i][:-4]+"_STAT_"+str(job_retrials)) job_retrials = job_retrials - 1 """).format(jobs_list, thread,self.retrials,'\n'.ljust(13)) @@ -447,6 +449,8 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): completed = True print datetime.now(), "The job ", current.template," has been COMPLETED" else: + os.system("echo 
$(date +%s) >> "+scripts[i][:-4]+"_STAT_"+str(job_retrials)) + os.system("$(echo FAILED)) >> "+scripts[i][:-4]+"_STAT_"+str(job_retrials)) open(failed_wrapper,'w').close() open(failed_path, 'w').close() print datetime.now(), "The job ", current.template," has FAILED" @@ -458,9 +462,6 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): wrapper_failed = os.path.join(os.getcwd(),"WRAPPER_FAILED") open(wrapper_failed, 'w').close() os._exit(1) - - - """).format(jobs_list, self.exit_thread, '\n'.ljust(13)), 4) return sequential_threads_launcher @@ -481,7 +482,7 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): print(out+"\\n") command = "bash " + str(self.template) + " " + str(self.id_run) + " " + os.getcwd() (self.status) = getstatusoutput(command + " > " + out + " 2> " + err) - + os.system("echo $(date +%s) >> "+jobname+"_STAT_"+str(self.retrials)) """).format('\n'.ljust(13)) def build_main(self): self.exit_thread = "os._exit(1)" -- GitLab From 71613bba63ab15014db136e87c14da4a0a7107eb Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 15 Sep 2021 15:38:40 +0200 Subject: [PATCH 06/14] Added stat file on wrapper itself , only last to do store the logs in the jobdatestructure --- autosubmit/job/job.py | 77 +++++++++++++++---- autosubmit/platforms/platform.py | 42 ++++++++++ .../platforms/wrappers/wrapper_builder.py | 11 ++- 3 files changed, 110 insertions(+), 20 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index ab8dfd9e3..c8bb490bb 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -598,7 +598,7 @@ class Job(object): if self.section in as_conf.get_wrapper_jobs(wrapper_section): wrapper_type = as_conf.get_wrapper_type(wrapper_section) if wrapper_type == "vertical": - max_logs = int(as_conf.get_retrials()) + max_logs = int(as_conf.get_retrials()) - self.fail_count # - job.fail count break if wrapper_type != "vertical": remote_logs = (self.script_name + ".out", self.script_name + ".err") @@ -651,22 +651,36 @@ 
class Job(object): if copy_remote_logs: # unifying names for log files if remote_logs != local_logs: - if wrapper_type == "vertical": - other_logs = max_logs - 1 - while other_logs >= 0: # perhaps the order is reversed TODO + if wrapper_type == "vertical": # internal_Retrial mechanism + other_logs = max_logs - 1 # All except the first log ran + stat_file=self.script_name[:-4]+"_STAT_" + while other_logs >= 0: # pick all except the last retrial, last retrial is always the max_logs value try: - r_log = (remote_logs[0][:-1]+str(other_logs),remote_logs[1][:-1]+str(other_logs)) - l_log = (local_logs[0]+"_"+str(other_logs),local_logs[1]+"_"+str(other_logs)) - self.synchronize_logs(platform, r_log, l_log) - platform.get_logs_files(self.expid, l_log) - try: - for local_log in l_log: - platform.write_jobid(self.id, os.path.join(self._tmp_path, 'LOG_' + str(self.expid), local_log)) - except BaseException as e: - pass - other_logs = other_logs - 1 - except: # no more retrials - other_logs = other_logs - 1 + exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid) + tmp_path = os.path.join(exp_path, BasicConfig.LOCAL_TMP_DIR) + time_stamp = "1970" + total_stats = (0,0,0,"FAILED") + if platform.get_stat_file_by_retrials(stat_file+str(other_logs)): # if stat_{retrial} exist, the job has failed at least once + + with open(os.path.join(tmp_path,stat_file+str(other_logs)), 'r+') as f: + time_stamp = str(f.readline()[:-1]) + total_stats = (f.readline()[:-1],f.readline()[:-1],f.readline()[:-1],f.readline()[:-1]) + self.write_total_stat_by_retries(total_stats) + platform.remove_stat_file_by_retrials(stat_file+str(other_logs)) + l_log = (self.script_name[:-4] +"."+ time_stamp +".out", self.script_name[:-4] +"."+ time_stamp + ".err") + r_log = (remote_logs[0][:-1]+str(other_logs),remote_logs[1][:-1]+str(other_logs)) + self.synchronize_logs(platform, r_log, l_log) + platform.get_logs_files(self.expid, l_log) + try: + for local_log in l_log: + platform.write_jobid(self.id, 
os.path.join(self._tmp_path, 'LOG_' + str(self.expid), local_log)) + except BaseException as e: + pass + other_logs = other_logs - 1 # no more retrials + else: + other_logs = 0 # exit, no more logs + except: + other_logs = 0 # exit self.synchronize_logs(platform, remote_logs, local_logs) remote_logs = copy.deepcopy(local_logs) platform.get_logs_files(self.expid, remote_logs) @@ -780,6 +794,17 @@ class Job(object): self.retrieve_logfiles_unthreaded(copy_remote_logs, local_logs) else: self.retrieve_logfiles(copy_remote_logs, local_logs, remote_logs, expid, platform_name) + list_of_wrappers = as_conf.get_wrapper_multi() + if len(list_of_wrappers) == 0: + list_of_wrappers.append("wrapper") + for wrapper_section in list_of_wrappers: # fastlook + if self.section in as_conf.get_wrapper_jobs(wrapper_section): + wrapper_type = as_conf.get_wrapper_type(wrapper_section) + if wrapper_type == "vertical": + max_logs = int(as_conf.get_retrials()) - self.fail_count # - job.fail count + for i in range(max_logs): + self.inc_fail_count() + break return self.status @@ -1245,6 +1270,26 @@ class Job(object): thread_write_finish.name = "JOB_data_{}".format(self.name) thread_write_finish.start() + def write_total_stat_by_retries(self,total_stats): + """ + Writes all data to TOTAL_STATS file + :param total_stats: data gathered by the wrapper + :type completed: str + """ + path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') #todo jobdatastructure + f = open(path, 'a') + f.write(total_stats[0]+' '+total_stats[1]+' '+total_stats[2]+' '+total_stats[3]) + out, err = self.local_logs + path_out = os.path.join(self._tmp_path, 'LOG_' + str(self.expid), out) + # Launch first as simple non-threaded function + #JobDataStructure(self.expid).write_finish_time(self.name, finish_time, final_status, self.processors, self.wallclock, self._queue, self.date, + # self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents], True, 
None, out, err, self._wrapper_queue) + # Launch second as threaded function + #thread_write_finish = Thread(target=JobDataStructure(self.expid).write_finish_time, args=(self.name, finish_time, final_status, self.processors, + # self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents], False, path_out, out, err, self._wrapper_queue)) + #thread_write_finish.name = "JOB_data_{}".format(self.name) + #thread_write_finish.start() + def check_started_after(self, date_limit): """ Checks if the job started after the given date diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 48089f04f..aa3fdc9fa 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -211,7 +211,16 @@ class Platform(object): (job_out_filename, job_err_filename) = remote_logs self.get_files([job_out_filename, job_err_filename], False, 'LOG_{0}'.format(exp_id)) + def get_stat_file(self, exp_id, job_name): + """ + Get the given stat files for all retrials + :param exp_id: experiment id + :type exp_id: str + :param remote_logs: names of the log files + :type remote_logs: (str, str) + """ + self.get_files(job_name,False, 'LOG_{0}'.format(exp_id)) def get_completed_files(self, job_name, retries=0, recovery=False, wrapper_failed=False): """ Get the COMPLETED file of the given job @@ -251,7 +260,19 @@ class Platform(object): Log.debug('{0}_STAT have been removed', job_name) return True return False + def remove_stat_file_by_retrials(self, job_name): + """ + Removes *STAT* files from remote + :param job_name: name of job to check + :type job_name: str + :return: True if successful, False otherwise + :rtype: bool + """ + filename = job_name + if self.delete_file(filename): + return True + return False def remove_completed_file(self, job_name): """ Removes *COMPLETED* files from remote @@ -293,6 +314,27 @@ class Platform(object): 
Log.debug('{0}_STAT file not found', job_name) return False + def get_stat_file_by_retrials(self, job_name, retries=0): + """ + Copies *STAT* files from remote to local + + :param retries: number of intents to get the completed files + :type retries: int + :param job_name: name of job to check + :type job_name: str + :return: True if succesful, False otherwise + :rtype: bool + """ + filename = job_name + stat_local_path = os.path.join( + self.config.LOCAL_ROOT_DIR, self.expid, self.config.LOCAL_TMP_DIR, filename) + if os.path.exists(stat_local_path): + os.remove(stat_local_path) + if self.check_file_exists(filename): + if self.get_file(filename, True): + return True + return False + def get_files_path(self): """ Get the path to the platform's LOG directory diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index 544172170..6b306b0a3 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -112,6 +112,7 @@ class PythonWrapperBuilder(WrapperBuilder): return textwrap.dedent(""" import os import sys + from bscearth.utils.date import date2str from threading import Thread from commands import getstatusoutput from datetime import datetime @@ -432,7 +433,9 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): while job_retrials >= 0 and not completed: current = {1} current.start() - os.system("echo $(date +%s) > "+scripts[i][:-4]+"_STAT_"+str(job_retrials)) + os.system("echo "+date2str(datetime.now(), 'S')+" > "+scripts[i][:-4]+"_STAT_"+str(job_retrials)) + + os.system("echo $(date +%s) >> "+scripts[i][:-4]+"_STAT_"+str(job_retrials)) current.join() os.system("echo $(date +%s) >> "+scripts[i][:-4]+"_STAT_"+str(job_retrials)) job_retrials = job_retrials - 1 @@ -449,8 +452,8 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): completed = True print datetime.now(), "The job ", current.template," has been COMPLETED" else: - 
os.system("echo $(date +%s) >> "+scripts[i][:-4]+"_STAT_"+str(job_retrials)) - os.system("$(echo FAILED)) >> "+scripts[i][:-4]+"_STAT_"+str(job_retrials)) + os.system("echo $(date +%s) >> "+scripts[i][:-4]+"_STAT_"+str(job_retrials+1)) + os.system("echo FAILED >> " + scripts[i][:-4]+"_STAT_"+str(job_retrials+1)) open(failed_wrapper,'w').close() open(failed_path, 'w').close() print datetime.now(), "The job ", current.template," has FAILED" @@ -482,7 +485,7 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): print(out+"\\n") command = "bash " + str(self.template) + " " + str(self.id_run) + " " + os.getcwd() (self.status) = getstatusoutput(command + " > " + out + " 2> " + err) - os.system("echo $(date +%s) >> "+jobname+"_STAT_"+str(self.retrials)) + """).format('\n'.ljust(13)) def build_main(self): self.exit_thread = "os._exit(1)" -- GitLab From 9749b17bea788f4c3a4b8984745e9d1157195cf3 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 15 Sep 2021 17:25:53 +0200 Subject: [PATCH 07/14] Corrected the run orders, corrected Stat files --- autosubmit/job/job.py | 36 ++++++++++--------- autosubmit/platforms/headers/lsf_header.py | 1 - .../platforms/wrappers/wrapper_builder.py | 5 ++- 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index c8bb490bb..12d871c55 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -584,7 +584,7 @@ class Job(object): e.message, self.name), 6001) return @threaded - def retrieve_logfiles(self, copy_remote_logs, local_logs, remote_logs, expid, platform_name): + def retrieve_logfiles(self, copy_remote_logs, local_logs, remote_logs, expid, platform_name,fail_count = 0): max_logs = 0 wrapper_type = "none" @@ -598,7 +598,7 @@ class Job(object): if self.section in as_conf.get_wrapper_jobs(wrapper_section): wrapper_type = as_conf.get_wrapper_type(wrapper_section) if wrapper_type == "vertical": - max_logs = int(as_conf.get_retrials()) - self.fail_count # - job.fail count + 
max_logs = int(as_conf.get_retrials()) - fail_count # - job.fail count break if wrapper_type != "vertical": remote_logs = (self.script_name + ".out", self.script_name + ".err") @@ -620,7 +620,7 @@ class Job(object): retries = 5 sleeptime = 0 i = 0 - sleep(5) + sleep(1) no_continue = False try: while (not out_exist and not err_exist) and i < retries: @@ -652,23 +652,26 @@ class Job(object): # unifying names for log files if remote_logs != local_logs: if wrapper_type == "vertical": # internal_Retrial mechanism - other_logs = max_logs - 1 # All except the first log ran + last_ran = 0 stat_file=self.script_name[:-4]+"_STAT_" - while other_logs >= 0: # pick all except the last retrial, last retrial is always the max_logs value + for i in range(max_logs+1): + if platform.get_stat_file_by_retrials(stat_file + str(i)): + last_ran = i + 1 + break + while last_ran <= max_logs: try: exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid) tmp_path = os.path.join(exp_path, BasicConfig.LOCAL_TMP_DIR) time_stamp = "1970" total_stats = (0,0,0,"FAILED") - if platform.get_stat_file_by_retrials(stat_file+str(other_logs)): # if stat_{retrial} exist, the job has failed at least once - - with open(os.path.join(tmp_path,stat_file+str(other_logs)), 'r+') as f: + if platform.get_stat_file_by_retrials(stat_file+str(max_logs)): + with open(os.path.join(tmp_path,stat_file+str(max_logs)), 'r+') as f: time_stamp = str(f.readline()[:-1]) total_stats = (f.readline()[:-1],f.readline()[:-1],f.readline()[:-1],f.readline()[:-1]) self.write_total_stat_by_retries(total_stats) - platform.remove_stat_file_by_retrials(stat_file+str(other_logs)) + platform.remove_stat_file_by_retrials(stat_file+str(max_logs)) l_log = (self.script_name[:-4] +"."+ time_stamp +".out", self.script_name[:-4] +"."+ time_stamp + ".err") - r_log = (remote_logs[0][:-1]+str(other_logs),remote_logs[1][:-1]+str(other_logs)) + r_log = (remote_logs[0][:-1]+str(max_logs),remote_logs[1][:-1]+str(max_logs)) 
self.synchronize_logs(platform, r_log, l_log) platform.get_logs_files(self.expid, l_log) try: @@ -676,11 +679,11 @@ class Job(object): platform.write_jobid(self.id, os.path.join(self._tmp_path, 'LOG_' + str(self.expid), local_log)) except BaseException as e: pass - other_logs = other_logs - 1 # no more retrials + max_logs = max_logs - 1 # no more retrials else: - other_logs = 0 # exit, no more logs + max_logs = last_ran - 1 # exit, no more logs except: - other_logs = 0 # exit + max_logs = 0 # exit self.synchronize_logs(platform, remote_logs, local_logs) remote_logs = copy.deepcopy(local_logs) platform.get_logs_files(self.expid, remote_logs) @@ -793,7 +796,7 @@ class Job(object): if as_conf.get_disable_recovery_threads(self.platform.name) == "true": self.retrieve_logfiles_unthreaded(copy_remote_logs, local_logs) else: - self.retrieve_logfiles(copy_remote_logs, local_logs, remote_logs, expid, platform_name) + self.retrieve_logfiles(copy_remote_logs, local_logs, remote_logs, expid, platform_name,fail_count = copy.copy(self.fail_count)) list_of_wrappers = as_conf.get_wrapper_multi() if len(list_of_wrappers) == 0: list_of_wrappers.append("wrapper") @@ -801,11 +804,10 @@ class Job(object): if self.section in as_conf.get_wrapper_jobs(wrapper_section): wrapper_type = as_conf.get_wrapper_type(wrapper_section) if wrapper_type == "vertical": - max_logs = int(as_conf.get_retrials()) - self.fail_count # - job.fail count - for i in range(max_logs): + max_logs = int(as_conf.get_retrials()) + for i in range(0,max_logs): self.inc_fail_count() break - return self.status @staticmethod diff --git a/autosubmit/platforms/headers/lsf_header.py b/autosubmit/platforms/headers/lsf_header.py index 06e6a83f9..c47acb515 100644 --- a/autosubmit/platforms/headers/lsf_header.py +++ b/autosubmit/platforms/headers/lsf_header.py @@ -128,7 +128,6 @@ class LsfHeader(object): def run(self): out = str(self.template) + '.' + str(self.id_run) + '.out' err = str(self.template) + '.' 
+ str(self.id_run) + '.err' - command = str(self.template) + ' ' + str(self.id_run) + ' ' + os.getcwd() (self.status) = getstatusoutput(command + ' > ' + out + ' 2> ' + err) diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index 6b306b0a3..3ac2b8418 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -434,7 +434,6 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): current = {1} current.start() os.system("echo "+date2str(datetime.now(), 'S')+" > "+scripts[i][:-4]+"_STAT_"+str(job_retrials)) - os.system("echo $(date +%s) >> "+scripts[i][:-4]+"_STAT_"+str(job_retrials)) current.join() os.system("echo $(date +%s) >> "+scripts[i][:-4]+"_STAT_"+str(job_retrials)) @@ -452,8 +451,8 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): completed = True print datetime.now(), "The job ", current.template," has been COMPLETED" else: - os.system("echo $(date +%s) >> "+scripts[i][:-4]+"_STAT_"+str(job_retrials+1)) - os.system("echo FAILED >> " + scripts[i][:-4]+"_STAT_"+str(job_retrials+1)) + os.system("echo $(date +%s) >> "+scripts[i][:-4]+"_STAT_"+str(job_retrials)) + os.system("echo FAILED >> " + scripts[i][:-4]+"_STAT_"+str(job_retrials)) open(failed_wrapper,'w').close() open(failed_path, 'w').close() print datetime.now(), "The job ", current.template," has FAILED" -- GitLab From 749cd22f99a7bed421505c9ac2accb4832a98d37 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 15 Sep 2021 18:18:16 +0200 Subject: [PATCH 08/14] weird error on rename --- autosubmit/job/job.py | 56 +++++++++++-------- autosubmit/platforms/paramiko_platform.py | 5 +- .../platforms/wrappers/wrapper_builder.py | 4 +- 3 files changed, 38 insertions(+), 27 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 12d871c55..5ae41cfaf 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -591,6 +591,13 @@ class Job(object): try: 
as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory()) as_conf.reload() + submitter = self._get_submitter(as_conf) + submitter.load_platforms(as_conf) + platform = submitter.platforms[platform_name.lower()] + try: + platform.test_connection() + except: + pass list_of_wrappers = as_conf.get_wrapper_multi() if len(list_of_wrappers) == 0: list_of_wrappers.append("wrapper") @@ -598,20 +605,19 @@ class Job(object): if self.section in as_conf.get_wrapper_jobs(wrapper_section): wrapper_type = as_conf.get_wrapper_type(wrapper_section) if wrapper_type == "vertical": + max_logs = int(as_conf.get_retrials()) - fail_count # - job.fail count + last_log = max_logs + stat_file = self.script_name[:-4] + "_STAT_" + for i in range(max_logs+1): + if platform.get_stat_file_by_retrials(stat_file + str(i)): + last_log = i + break break if wrapper_type != "vertical": remote_logs = (self.script_name + ".out", self.script_name + ".err") else: - remote_logs = (self.script_name + ".out." + str(max_logs) , self.script_name + ".err." + str(max_logs)) - - submitter = self._get_submitter(as_conf) - submitter.load_platforms(as_conf) - platform = submitter.platforms[platform_name.lower()] - try: - platform.test_connection() - except: - pass + remote_logs = (self.script_name + ".out." + str(last_log) , self.script_name + ".err." + str(last_log)) except Exception as e: Log.printlog( "{0} \n Couldn't connect to the remote platform for this {1} job err/out files. 
".format(e.message, self.name), 6001) @@ -652,13 +658,12 @@ class Job(object): # unifying names for log files if remote_logs != local_logs: if wrapper_type == "vertical": # internal_Retrial mechanism - last_ran = 0 - stat_file=self.script_name[:-4]+"_STAT_" - for i in range(max_logs+1): - if platform.get_stat_file_by_retrials(stat_file + str(i)): - last_ran = i + 1 - break - while last_ran <= max_logs: + log_start = last_log + 1 + while log_start <= max_logs: + try: + platform.restore_connection() + except: + pass try: exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid) tmp_path = os.path.join(exp_path, BasicConfig.LOCAL_TMP_DIR) @@ -672,18 +677,22 @@ class Job(object): platform.remove_stat_file_by_retrials(stat_file+str(max_logs)) l_log = (self.script_name[:-4] +"."+ time_stamp +".out", self.script_name[:-4] +"."+ time_stamp + ".err") r_log = (remote_logs[0][:-1]+str(max_logs),remote_logs[1][:-1]+str(max_logs)) - self.synchronize_logs(platform, r_log, l_log) + self.synchronize_logs(platform, r_log, l_log,last = False) platform.get_logs_files(self.expid, l_log) try: for local_log in l_log: platform.write_jobid(self.id, os.path.join(self._tmp_path, 'LOG_' + str(self.expid), local_log)) except BaseException as e: pass - max_logs = max_logs - 1 # no more retrials + max_logs = max_logs - 1 else: - max_logs = last_ran - 1 # exit, no more logs + max_logs = -1 # exit, no more logs except: - max_logs = 0 # exit + max_logs = -1 # exit + try: + platform.restore_connection() + except: + pass self.synchronize_logs(platform, remote_logs, local_logs) remote_logs = copy.deepcopy(local_logs) platform.get_logs_files(self.expid, remote_logs) @@ -1351,11 +1360,12 @@ class Job(object): parent.children.remove(self) self.parents.remove(parent) - def synchronize_logs(self, platform, remote_logs, local_logs): + def synchronize_logs(self, platform, remote_logs, local_logs, last = True): platform.move_file(remote_logs[0], local_logs[0], True) # .out platform.move_file(remote_logs[1], 
local_logs[1], True) # .err - self.local_logs = local_logs - self.remote_logs = copy.deepcopy(local_logs) + if last: + self.local_logs = local_logs + self.remote_logs = copy.deepcopy(local_logs) class WrapperJob(Job): diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index ab5d3a8fd..0d2b387c7 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -340,8 +340,9 @@ class ParamikoPlatform(Platform): """ try: path_root = self.get_files_path() - self._ftpChannel.rename(os.path.join(path_root, src), - os.path.join(path_root, dest)) + src = os.path.join(path_root, src) + dest = os.path.join(path_root, dest) + self._ftpChannel.rename(src,dest) return True except IOError as e: diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index 3ac2b8418..eaea190c5 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -451,8 +451,8 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): completed = True print datetime.now(), "The job ", current.template," has been COMPLETED" else: - os.system("echo $(date +%s) >> "+scripts[i][:-4]+"_STAT_"+str(job_retrials)) - os.system("echo FAILED >> " + scripts[i][:-4]+"_STAT_"+str(job_retrials)) + os.system("echo $(date +%s) >> "+scripts[i][:-4]+"_STAT_"+str(job_retrials+1)) + os.system("echo FAILED >> " + scripts[i][:-4]+"_STAT_"+str(job_retrials+1)) open(failed_wrapper,'w').close() open(failed_path, 'w').close() print datetime.now(), "The job ", current.template," has FAILED" -- GitLab From 03fd2f437df85629754234c3c163061728bcb1b2 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 16 Sep 2021 11:44:17 +0200 Subject: [PATCH 09/14] All works, but there is a bug in the order of the stat file --- autosubmit/job/job.py | 34 ++++++++++--------- .../platforms/wrappers/wrapper_builder.py | 6 ++-- 2 files changed, 21 
insertions(+), 19 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 5ae41cfaf..c50141f2c 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -605,7 +605,6 @@ class Job(object): if self.section in as_conf.get_wrapper_jobs(wrapper_section): wrapper_type = as_conf.get_wrapper_type(wrapper_section) if wrapper_type == "vertical": - max_logs = int(as_conf.get_retrials()) - fail_count # - job.fail count last_log = max_logs stat_file = self.script_name[:-4] + "_STAT_" @@ -626,7 +625,7 @@ class Job(object): retries = 5 sleeptime = 0 i = 0 - sleep(1) + sleep(4) no_continue = False try: while (not out_exist and not err_exist) and i < retries: @@ -659,24 +658,22 @@ class Job(object): if remote_logs != local_logs: if wrapper_type == "vertical": # internal_Retrial mechanism log_start = last_log + 1 + exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid) + tmp_path = os.path.join(exp_path, BasicConfig.LOCAL_TMP_DIR) + time_stamp = "1970" + total_stats = (0, 0, 0, "FAILED") while log_start <= max_logs: try: - platform.restore_connection() - except: - pass - try: - exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid) - tmp_path = os.path.join(exp_path, BasicConfig.LOCAL_TMP_DIR) - time_stamp = "1970" - total_stats = (0,0,0,"FAILED") if platform.get_stat_file_by_retrials(stat_file+str(max_logs)): + #self.write_total_stat_by_retries_fix_newline(total_stats) with open(os.path.join(tmp_path,stat_file+str(max_logs)), 'r+') as f: - time_stamp = str(f.readline()[:-1]) + time_stamp = str(f.readline()[:-1])+"_"+str(max_logs) total_stats = (f.readline()[:-1],f.readline()[:-1],f.readline()[:-1],f.readline()[:-1]) + self.write_total_stat_by_retries(total_stats) platform.remove_stat_file_by_retrials(stat_file+str(max_logs)) l_log = (self.script_name[:-4] +"."+ time_stamp +".out", self.script_name[:-4] +"."+ time_stamp + ".err") - r_log = (remote_logs[0][:-1]+str(max_logs),remote_logs[1][:-1]+str(max_logs)) + r_log = ( 
remote_logs[0][:-1]+str(max_logs) , remote_logs[1][:-1]+str(max_logs) ) self.synchronize_logs(platform, r_log, l_log,last = False) platform.get_logs_files(self.expid, l_log) try: @@ -793,7 +790,7 @@ class Job(object): self.write_start_time() # Updating logs if self.status in [Status.COMPLETED, Status.FAILED, Status.UNKNOWN]: - self.write_end_time(self.status == Status.COMPLETED) + # New thread, check if file exist expid = copy.deepcopy(self.expid) platform_name = copy.deepcopy(self.platform_name.lower()) @@ -807,6 +804,7 @@ class Job(object): else: self.retrieve_logfiles(copy_remote_logs, local_logs, remote_logs, expid, platform_name,fail_count = copy.copy(self.fail_count)) list_of_wrappers = as_conf.get_wrapper_multi() + wrapper_type = "none" if len(list_of_wrappers) == 0: list_of_wrappers.append("wrapper") for wrapper_section in list_of_wrappers: # fastlook @@ -817,6 +815,7 @@ class Job(object): for i in range(0,max_logs): self.inc_fail_count() break + self.write_end_time(self.status == Status.COMPLETED) return self.status @staticmethod @@ -1045,10 +1044,8 @@ class Job(object): else: if self.type == Type.BASH: template = 'sleep 1' - template += 'crash' elif self.type == Type.PYTHON: template = 'time.sleep(1)' - template += 'crash' elif self.type == Type.R: template = 'Sys.sleep(1)' else: @@ -1281,6 +1278,11 @@ class Job(object): thread_write_finish.name = "JOB_data_{}".format(self.name) thread_write_finish.start() + def write_total_stat_by_retries_fix_newline(self): + path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') + f = open(path, 'a') + f.write('\n') + f.close() def write_total_stat_by_retries(self,total_stats): """ Writes all data to TOTAL_STATS file @@ -1289,7 +1291,7 @@ class Job(object): """ path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') #todo jobdatastructure f = open(path, 'a') - f.write(total_stats[0]+' '+total_stats[1]+' '+total_stats[2]+' '+total_stats[3]) + f.write('\n'+total_stats[0]+' '+total_stats[1]+' 
'+total_stats[2]+' '+total_stats[3]) out, err = self.local_logs path_out = os.path.join(self._tmp_path, 'LOG_' + str(self.expid), out) # Launch first as simple non-threaded function diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index eaea190c5..bb2ad023a 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -434,9 +434,9 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): current = {1} current.start() os.system("echo "+date2str(datetime.now(), 'S')+" > "+scripts[i][:-4]+"_STAT_"+str(job_retrials)) - os.system("echo $(date +%s) >> "+scripts[i][:-4]+"_STAT_"+str(job_retrials)) + os.system("echo "+date2str(datetime.now(), 'S')+" >> "+scripts[i][:-4]+"_STAT_"+str(job_retrials)) current.join() - os.system("echo $(date +%s) >> "+scripts[i][:-4]+"_STAT_"+str(job_retrials)) + os.system("echo "+date2str(datetime.now(), 'S')+" >> "+scripts[i][:-4]+"_STAT_"+str(job_retrials)) job_retrials = job_retrials - 1 """).format(jobs_list, thread,self.retrials,'\n'.ljust(13)) @@ -451,7 +451,7 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): completed = True print datetime.now(), "The job ", current.template," has been COMPLETED" else: - os.system("echo $(date +%s) >> "+scripts[i][:-4]+"_STAT_"+str(job_retrials+1)) + os.system("echo "+date2str(datetime.now(), 'S')+" >> "+scripts[i][:-4]+"_STAT_"+str(job_retrials+1)) os.system("echo FAILED >> " + scripts[i][:-4]+"_STAT_"+str(job_retrials+1)) open(failed_wrapper,'w').close() open(failed_path, 'w').close() -- GitLab From f799223e48b1a4f5fc7529a2aa68c28d6054c107 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 16 Sep 2021 12:16:47 +0200 Subject: [PATCH 10/14] pipeline --- test/unit/test_wrappers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/unit/test_wrappers.py b/test/unit/test_wrappers.py index ec0c26b79..b8460452a 100644 --- a/test/unit/test_wrappers.py +++ 
b/test/unit/test_wrappers.py @@ -166,7 +166,7 @@ class TestWrappers(TestCase): self.config.get_wrapper_method = Mock(return_value='ASThread') self.config.get_wrapper_queue = Mock(return_value='debug') self.config.get_wrapper_policy = Mock(return_value='flexible') - self.config.get_wrapper_retrials = Mock(return_value=0) + self.config.get_retrials = Mock(return_value=0) self.config.get = Mock(return_value='flexible') self.job_packager = JobPackager( self.config, self._platform, self.job_list) -- GitLab From 289a08175c836f4246a53720a3e7eda2c12689aa Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 21 Sep 2021 16:53:56 +0200 Subject: [PATCH 11/14] Fix for total_stat file order Fix for non-vertical wrappers jobs, ready to merge --- autosubmit/job/job.py | 270 +++++++++++++------------ autosubmit/job/job_list_persistence.py | 4 +- autosubmit/job/job_packager.py | 2 +- 3 files changed, 149 insertions(+), 127 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index c50141f2c..65704b15e 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -80,6 +80,7 @@ class Job(object): return "{0} STATUS: {1}".format(self.name, self.status) def __init__(self, name, job_id, status, priority): + self.wrapper_type = "none" self._wrapper_queue = None self._platform = None self._queue = None @@ -586,8 +587,6 @@ class Job(object): @threaded def retrieve_logfiles(self, copy_remote_logs, local_logs, remote_logs, expid, platform_name,fail_count = 0): max_logs = 0 - wrapper_type = "none" - try: as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory()) as_conf.reload() @@ -598,25 +597,20 @@ class Job(object): platform.test_connection() except: pass - list_of_wrappers = as_conf.get_wrapper_multi() - if len(list_of_wrappers) == 0: - list_of_wrappers.append("wrapper") - for wrapper_section in list_of_wrappers: #fastlook - if self.section in as_conf.get_wrapper_jobs(wrapper_section): - wrapper_type = as_conf.get_wrapper_type(wrapper_section) - if 
wrapper_type == "vertical": - max_logs = int(as_conf.get_retrials()) - fail_count # - job.fail count - last_log = max_logs - stat_file = self.script_name[:-4] + "_STAT_" - for i in range(max_logs+1): - if platform.get_stat_file_by_retrials(stat_file + str(i)): - last_log = i - break - break - if wrapper_type != "vertical": - remote_logs = (self.script_name + ".out", self.script_name + ".err") + max_logs = 1 + last_log = 1 + if self.wrapper_type == "vertical": + max_logs = int(as_conf.get_retrials()) - fail_count # - job.fail count + last_log = max_logs + stat_file = self.script_name[:-4] + "_STAT_" + for i in range(max_logs+1): + if platform.get_stat_file_by_retrials(stat_file + str(i)): + last_log = i + break + remote_logs = (self.script_name + ".out." + str(last_log), self.script_name + ".err." + str(last_log)) else: - remote_logs = (self.script_name + ".out." + str(last_log) , self.script_name + ".err." + str(last_log)) + remote_logs = (self.script_name + ".out", self.script_name + ".err") + except Exception as e: Log.printlog( "{0} \n Couldn't connect to the remote platform for this {1} job err/out files. 
".format(e.message, self.name), 6001) @@ -625,7 +619,7 @@ class Job(object): retries = 5 sleeptime = 0 i = 0 - sleep(4) + #sleep(4) no_continue = False try: while (not out_exist and not err_exist) and i < retries: @@ -656,40 +650,35 @@ class Job(object): if copy_remote_logs: # unifying names for log files if remote_logs != local_logs: - if wrapper_type == "vertical": # internal_Retrial mechanism - log_start = last_log + 1 + if self.wrapper_type == "vertical": # internal_Retrial mechanism + log_start = last_log exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid) tmp_path = os.path.join(exp_path, BasicConfig.LOCAL_TMP_DIR) time_stamp = "1970" - total_stats = (0, 0, 0, "FAILED") + total_stats = ("", "", "", "FAILED") while log_start <= max_logs: try: if platform.get_stat_file_by_retrials(stat_file+str(max_logs)): - #self.write_total_stat_by_retries_fix_newline(total_stats) with open(os.path.join(tmp_path,stat_file+str(max_logs)), 'r+') as f: time_stamp = str(f.readline()[:-1])+"_"+str(max_logs) total_stats = (f.readline()[:-1],f.readline()[:-1],f.readline()[:-1],f.readline()[:-1]) - - self.write_total_stat_by_retries(total_stats) + self.write_total_stat_by_retries(total_stats,max_logs == last_log) platform.remove_stat_file_by_retrials(stat_file+str(max_logs)) l_log = (self.script_name[:-4] +"."+ time_stamp +".out", self.script_name[:-4] +"."+ time_stamp + ".err") r_log = ( remote_logs[0][:-1]+str(max_logs) , remote_logs[1][:-1]+str(max_logs) ) - self.synchronize_logs(platform, r_log, l_log,last = False) - platform.get_logs_files(self.expid, l_log) - try: - for local_log in l_log: - platform.write_jobid(self.id, os.path.join(self._tmp_path, 'LOG_' + str(self.expid), local_log)) - except BaseException as e: - pass + if max_logs != last_log: + self.synchronize_logs(platform, r_log, l_log,last = False) + platform.get_logs_files(self.expid, l_log) + try: + for local_log in l_log: + platform.write_jobid(self.id, os.path.join(self._tmp_path, 'LOG_' + str(self.expid), 
local_log)) + except BaseException as e: + pass max_logs = max_logs - 1 else: max_logs = -1 # exit, no more logs except: max_logs = -1 # exit - try: - platform.restore_connection() - except: - pass self.synchronize_logs(platform, remote_logs, local_logs) remote_logs = copy.deepcopy(local_logs) platform.get_logs_files(self.expid, remote_logs) @@ -790,7 +779,6 @@ class Job(object): self.write_start_time() # Updating logs if self.status in [Status.COMPLETED, Status.FAILED, Status.UNKNOWN]: - # New thread, check if file exist expid = copy.deepcopy(self.expid) platform_name = copy.deepcopy(self.platform_name.lower()) @@ -803,19 +791,12 @@ class Job(object): self.retrieve_logfiles_unthreaded(copy_remote_logs, local_logs) else: self.retrieve_logfiles(copy_remote_logs, local_logs, remote_logs, expid, platform_name,fail_count = copy.copy(self.fail_count)) - list_of_wrappers = as_conf.get_wrapper_multi() - wrapper_type = "none" - if len(list_of_wrappers) == 0: - list_of_wrappers.append("wrapper") - for wrapper_section in list_of_wrappers: # fastlook - if self.section in as_conf.get_wrapper_jobs(wrapper_section): - wrapper_type = as_conf.get_wrapper_type(wrapper_section) - if wrapper_type == "vertical": - max_logs = int(as_conf.get_retrials()) - for i in range(0,max_logs): - self.inc_fail_count() - break - self.write_end_time(self.status == Status.COMPLETED) + if self.wrapper_type == "vertical": + max_logs = int(as_conf.get_retrials()) + for i in range(0,max_logs): + self.inc_fail_count() + else: + self.write_end_time(self.status == Status.COMPLETED) return self.status @staticmethod @@ -1043,9 +1024,9 @@ class Job(object): template += template_file.read() else: if self.type == Type.BASH: - template = 'sleep 1' + template = 'sleep 1crash' elif self.type == Type.PYTHON: - template = 'time.sleep(1)' + template = 'time.sleep(1)crash' elif self.type == Type.R: template = 'Sys.sleep(1)' else: @@ -1196,112 +1177,153 @@ class Job(object): str(set(parameters) - set(variables))), 6013) 
return out - def write_submit_time(self): + def write_submit_time(self,enabled = False): """ Writes submit date and time to TOTAL_STATS file """ - path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') + data_time = ["",time.time()] + if self.wrapper_type != "vertical" or enabled: + path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') + else: + path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS_TMP') if os.path.exists(path): f = open(path, 'a') f.write('\n') else: f = open(path, 'w') - f.write(date2str(datetime.datetime.now(), 'S')) + if not enabled: + f.write(date2str(datetime.datetime.now(), 'S')) + if self.wrapper_type == "vertical": + f.write(" "+time.time()) + else: + path2 = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS_TMP') + f2 = open(path, 'r') + data_time = "" + for line in f2.readline(): + if len(line) > 0: + data_time = line.split(" ") + f.write(data_time[0]) + f2.close() + try: + os.remove(path2) + except: + pass + # Get # Writing database - JobDataStructure(self.expid).write_submit_time(self.name, time.time(), Status.VALUE_TO_KEY[self.status] if self.status in Status.VALUE_TO_KEY.keys() else "UNKNOWN", self.processors, - self.wallclock, self.queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.packed, self._wrapper_queue) + if self.wrapper_type != "vertical" or enabled: + JobDataStructure(self.expid).write_submit_time(self.name, data_time[1], Status.VALUE_TO_KEY[self.status] if self.status in Status.VALUE_TO_KEY.keys() else "UNKNOWN", self.processors, + self.wallclock, self.queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.packed, self._wrapper_queue) - def write_start_time(self): + def write_start_time(self, enabled = False): """ Writes start date and time to TOTAL_STATS file :return: True if succesful, False otherwise :rtype: bool """ - if self._platform.get_stat_file(self.name, retries=5): #fastlook - start_time = 
self.check_start_time() - else: - Log.printlog('Could not get start time for {0}. Using current time as an approximation'.format( - self.name), 3000) - start_time = time.time() - timestamp = date2str(datetime.datetime.now(), 'S') + if self.wrapper_type != "vertical" or enabled: + if self._platform.get_stat_file(self.name, retries=5): #fastlook + start_time = self.check_start_time() + else: + Log.printlog('Could not get start time for {0}. Using current time as an approximation'.format( + self.name), 3000) + start_time = time.time() + timestamp = date2str(datetime.datetime.now(), 'S') - self.local_logs = (self.name + "." + timestamp + - ".out", self.name + "." + timestamp + ".err") + self.local_logs = (self.name + "." + timestamp + + ".out", self.name + "." + timestamp + ".err") - path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') - f = open(path, 'a') - f.write(' ') - # noinspection PyTypeChecker - f.write(date2str(datetime.datetime.fromtimestamp(start_time), 'S')) - # Writing database - JobDataStructure(self.expid).write_start_time(self.name, start_time, Status.VALUE_TO_KEY[self.status] if self.status in Status.VALUE_TO_KEY.keys() else "UNKNOWN", self.processors, - self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.packed, self._wrapper_queue) + path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') + f = open(path, 'a') + f.write(' ') + # noinspection PyTypeChecker + f.write(date2str(datetime.datetime.fromtimestamp(start_time), 'S')) + # Writing database + JobDataStructure(self.expid).write_start_time(self.name, start_time, Status.VALUE_TO_KEY[self.status] if self.status in Status.VALUE_TO_KEY.keys() else "UNKNOWN", self.processors, + self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.packed, self._wrapper_queue) return True - def write_end_time(self, completed): + def write_end_time(self, completed,enabled = False): 
""" Writes ends date and time to TOTAL_STATS file :param completed: True if job was completed successfully, False otherwise :type completed: bool """ - self._platform.get_stat_file(self.name, retries=5) - end_time = self.check_end_time() - path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') - f = open(path, 'a') - f.write(' ') - finish_time = None - final_status = None - if end_time > 0: - # noinspection PyTypeChecker - f.write(date2str(datetime.datetime.fromtimestamp(end_time), 'S')) - # date2str(datetime.datetime.fromtimestamp(end_time), 'S') - finish_time = end_time - else: - f.write(date2str(datetime.datetime.now(), 'S')) - finish_time = time.time() # date2str(datetime.datetime.now(), 'S') - f.write(' ') - if completed: - final_status = "COMPLETED" - f.write('COMPLETED') - else: - final_status = "FAILED" - f.write('FAILED') - out, err = self.local_logs - path_out = os.path.join(self._tmp_path, 'LOG_' + str(self.expid), out) - # Launch first as simple non-threaded function - JobDataStructure(self.expid).write_finish_time(self.name, finish_time, final_status, self.processors, self.wallclock, self._queue, self.date, - self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents], True, None, out, err, self._wrapper_queue) - # Launch second as threaded function - thread_write_finish = Thread(target=JobDataStructure(self.expid).write_finish_time, args=(self.name, finish_time, final_status, self.processors, - self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents], False, path_out, out, err, self._wrapper_queue)) - thread_write_finish.name = "JOB_data_{}".format(self.name) - thread_write_finish.start() + if self.wrapper_type != "vertical" or enabled: + self._platform.get_stat_file(self.name, retries=5) + end_time = self.check_end_time() + path = os.path.join(self._tmp_path, 
self.name + '_TOTAL_STATS') + f = open(path, 'a') + f.write(' ') + finish_time = None + final_status = None + if end_time > 0: + # noinspection PyTypeChecker + f.write(date2str(datetime.datetime.fromtimestamp(end_time), 'S')) + # date2str(datetime.datetime.fromtimestamp(end_time), 'S') + finish_time = end_time + else: + f.write(date2str(datetime.datetime.now(), 'S')) + finish_time = time.time() # date2str(datetime.datetime.now(), 'S') + f.write(' ') + if completed: + final_status = "COMPLETED" + f.write('COMPLETED') + else: + final_status = "FAILED" + f.write('FAILED') + out, err = self.local_logs + path_out = os.path.join(self._tmp_path, 'LOG_' + str(self.expid), out) + # Launch first as simple non-threaded function + JobDataStructure(self.expid).write_finish_time(self.name, finish_time, final_status, self.processors, self.wallclock, self._queue, self.date, + self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents], True, None, out, err, self._wrapper_queue) + # Launch second as threaded function + thread_write_finish = Thread(target=JobDataStructure(self.expid).write_finish_time, args=(self.name, finish_time, final_status, self.processors, + self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents], False, path_out, out, err, self._wrapper_queue)) + thread_write_finish.name = "JOB_data_{}".format(self.name) + thread_write_finish.start() def write_total_stat_by_retries_fix_newline(self): path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') f = open(path, 'a') f.write('\n') f.close() - def write_total_stat_by_retries(self,total_stats): + + def write_total_stat_by_retries(self,total_stats, last_retrial = False): """ Writes all data to TOTAL_STATS file :param total_stats: data gathered by the wrapper :type completed: str """ - path = os.path.join(self._tmp_path, self.name + 
'_TOTAL_STATS') #todo jobdatastructure - f = open(path, 'a') - f.write('\n'+total_stats[0]+' '+total_stats[1]+' '+total_stats[2]+' '+total_stats[3]) - out, err = self.local_logs - path_out = os.path.join(self._tmp_path, 'LOG_' + str(self.expid), out) - # Launch first as simple non-threaded function - #JobDataStructure(self.expid).write_finish_time(self.name, finish_time, final_status, self.processors, self.wallclock, self._queue, self.date, - # self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents], True, None, out, err, self._wrapper_queue) - # Launch second as threaded function - #thread_write_finish = Thread(target=JobDataStructure(self.expid).write_finish_time, args=(self.name, finish_time, final_status, self.processors, - # self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents], False, path_out, out, err, self._wrapper_queue)) - #thread_write_finish.name = "JOB_data_{}".format(self.name) - #thread_write_finish.start() + if last_retrial: + self.write_submit_time(True) + else: + path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') + f = open(path, 'a') + if last_retrial: + f.write('\n' + total_stats[1] + ' ' + total_stats[2] + ' ' + total_stats[3]) + else: + f.write('\n' + total_stats[0] + ' ' + total_stats[1] + ' ' + total_stats[2] + ' ' + total_stats[3]) + out, err = self.local_logs + path_out = os.path.join(self._tmp_path, 'LOG_' + str(self.expid), out) + # Launch first as simple non-threaded function + if not last_retrial: + JobDataStructure(self.expid).write_submit_time(self.name, total_stats[0], Status.VALUE_TO_KEY[ + self.status] if self.status in Status.VALUE_TO_KEY.keys() else "UNKNOWN", self.processors, + self.wallclock, self.queue, self.date, self.member, self.section, + self.chunk, self.platform_name, self.id, self.packed, + self._wrapper_queue) + 
JobDataStructure(self.expid).write_start_time(self.name, total_stats[1], Status.VALUE_TO_KEY[ + self.status] if self.status in Status.VALUE_TO_KEY.keys() else "UNKNOWN", self.processors, + self.wallclock, self._queue, self.date, self.member, + self.section, self.chunk, self.platform_name, self.id, + self.packed, self._wrapper_queue) + JobDataStructure(self.expid).write_finish_time(self.name, total_stats[2], total_stats[3], self.processors, + self.wallclock, self._queue, self.date, + self.member, self.section, self.chunk, self.platform_name, + self.id, self.platform, self.packed, + [job.id for job in self._parents], True, None, out, err, + self._wrapper_queue) def check_started_after(self, date_limit): """ diff --git a/autosubmit/job/job_list_persistence.py b/autosubmit/job/job_list_persistence.py index 3065905a9..32c5348af 100644 --- a/autosubmit/job/job_list_persistence.py +++ b/autosubmit/job/job_list_persistence.py @@ -91,7 +91,7 @@ class JobListPersistencePkl(JobListPersistence): job.priority, job.section, job.date, job.member, job.chunk, job.local_logs[0], job.local_logs[1], - job.remote_logs[0], job.remote_logs[1]) for job in job_list] + job.remote_logs[0], job.remote_logs[1],job.wrapper_type) for job in job_list] pickle.dump(jobs_data, fd) Log.debug('Job list saved') @@ -134,7 +134,7 @@ class JobListPersistenceDb(JobListPersistence): job.priority, job.section, job.date, job.member, job.chunk, job.local_logs[0], job.local_logs[1], - job.remote_logs[0], job.remote_logs[1]) for job in job_list] + job.remote_logs[0], job.remote_logs[1],job.wrapper_type) for job in job_list] self.db_manager.insertMany(self.JOB_LIST_TABLE, jobs_data) def _reset_table(self): diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index f5bcad57c..c917a508d 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -283,7 +283,6 @@ class JobPackager(object): if self.wrapper_type[self.current_wrapper_section] == 'vertical': 
built_packages_tmp = self._build_vertical_packages(jobs_to_submit_by_section[section], wrapper_limits) elif self.wrapper_type[self.current_wrapper_section] == 'horizontal': - built_packages_tmp = self._build_horizontal_packages(jobs_to_submit_by_section[section], wrapper_limits, section) elif self.wrapper_type[self.current_wrapper_section] in ['vertical-horizontal', 'horizontal-vertical']: @@ -299,6 +298,7 @@ class JobPackager(object): aux_jobs = [] # Check failed jobs first for job in p.jobs: + job.wrapper_type = self.wrapper_type[self.current_wrapper_section] if len(self._jobs_list.jobs_to_run_first) > 0: if job not in self._jobs_list.jobs_to_run_first: job.packed = False -- GitLab From 80a86faa2b0b01965ac567aba6c3f2317c2110e3 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 22 Sep 2021 19:15:50 +0200 Subject: [PATCH 12/14] Fixed some bugs related with jobdatastrucute , changed datatime to time.time , improved loop mechanism, fixed an issue with logs printing wrong times --- autosubmit/job/job.py | 109 ++++++++++-------- .../platforms/wrappers/wrapper_builder.py | 13 ++- 2 files changed, 70 insertions(+), 52 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 65704b15e..657c16d09 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -655,41 +655,56 @@ class Job(object): exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid) tmp_path = os.path.join(exp_path, BasicConfig.LOCAL_TMP_DIR) time_stamp = "1970" - total_stats = ("", "", "", "FAILED") + total_stats = ["", "","FAILED"] while log_start <= max_logs: try: if platform.get_stat_file_by_retrials(stat_file+str(max_logs)): with open(os.path.join(tmp_path,stat_file+str(max_logs)), 'r+') as f: - time_stamp = str(f.readline()[:-1])+"_"+str(max_logs) - total_stats = (f.readline()[:-1],f.readline()[:-1],f.readline()[:-1],f.readline()[:-1]) - self.write_total_stat_by_retries(total_stats,max_logs == last_log) + total_stats = 
[f.readline()[:-1],f.readline()[:-1],f.readline()[:-1]] + try: + total_stats[0] = float(total_stats[0]) + total_stats[1] = float(total_stats[1]) + except: + total_stats[0] = int(str(total_stats[0]).split('.')[0]) + total_stats[1] = int(str(total_stats[1]).split('.')[0]) + if max_logs != ( int(as_conf.get_retrials()) - fail_count ): + time_stamp = date2str(datetime.datetime.fromtimestamp(total_stats[0]), 'S') + else: + with open(os.path.join(self._tmp_path, self.name + '_TOTAL_STATS_TMP'), 'r+') as f2: + for line in f2.readlines(): + if len(line) > 0: + time_stamp = line.split(" ")[0] + + self.write_total_stat_by_retries(total_stats,max_logs == ( int(as_conf.get_retrials()) - fail_count )) platform.remove_stat_file_by_retrials(stat_file+str(max_logs)) - l_log = (self.script_name[:-4] +"."+ time_stamp +".out", self.script_name[:-4] +"."+ time_stamp + ".err") + l_log = (self.script_name[:-4] + "." + time_stamp + ".out",self.script_name[:-4] + "." + time_stamp + ".err") r_log = ( remote_logs[0][:-1]+str(max_logs) , remote_logs[1][:-1]+str(max_logs) ) - if max_logs != last_log: - self.synchronize_logs(platform, r_log, l_log,last = False) - platform.get_logs_files(self.expid, l_log) - try: - for local_log in l_log: - platform.write_jobid(self.id, os.path.join(self._tmp_path, 'LOG_' + str(self.expid), local_log)) - except BaseException as e: - pass + self.synchronize_logs(platform, r_log, l_log,last = False) + platform.get_logs_files(self.expid, l_log) + try: + for local_log in l_log: + platform.write_jobid(self.id, os.path.join(self._tmp_path, 'LOG_' + str(self.expid), local_log)) + except BaseException as e: + pass max_logs = max_logs - 1 else: max_logs = -1 # exit, no more logs - except: + except BaseException as e: max_logs = -1 # exit - self.synchronize_logs(platform, remote_logs, local_logs) - remote_logs = copy.deepcopy(local_logs) - platform.get_logs_files(self.expid, remote_logs) - # Update the logs with Autosubmit Job Id Brand - try: - for local_log in 
local_logs: - platform.write_jobid(self.id, os.path.join( - self._tmp_path, 'LOG_' + str(self.expid), local_log)) - except BaseException as e: - Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format( - e.message, self.name)) + local_logs = copy.deepcopy(l_log) + remote_logs = copy.deepcopy(local_logs) + if self.wrapper_type != "vertical": + self.synchronize_logs(platform, remote_logs, local_logs) + remote_logs = copy.deepcopy(local_logs) + platform.get_logs_files(self.expid, remote_logs) + # Update the logs with Autosubmit Job Id Brand + try: + for local_log in local_logs: + platform.write_jobid(self.id, os.path.join( + self._tmp_path, 'LOG_' + str(self.expid), local_log)) + except BaseException as e: + Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format( + e.message, self.name)) try: platform.closeConnection() except BaseException as e: @@ -1024,9 +1039,9 @@ class Job(object): template += template_file.read() else: if self.type == Type.BASH: - template = 'sleep 1crash' + template = 'sleep 1 ; crash' elif self.type == Type.PYTHON: - template = 'time.sleep(1)crash' + template = 'time.sleep(1)' elif self.type == Type.R: template = 'Sys.sleep(1)' else: @@ -1194,27 +1209,30 @@ class Job(object): if not enabled: f.write(date2str(datetime.datetime.now(), 'S')) if self.wrapper_type == "vertical": - f.write(" "+time.time()) + f.write(" "+str(time.time())) else: path2 = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS_TMP') - f2 = open(path, 'r') - data_time = "" - for line in f2.readline(): + f2 = open(path2, 'r') + for line in f2.readlines(): if len(line) > 0: data_time = line.split(" ") + try: + data_time[1] = float(data_time[1]) + except: + data_time[1] = int(data_time[1]) f.write(data_time[0]) f2.close() try: os.remove(path2) except: pass - # Get # Writing database if self.wrapper_type != "vertical" or enabled: JobDataStructure(self.expid).write_submit_time(self.name, data_time[1], Status.VALUE_TO_KEY[self.status] if self.status in 
Status.VALUE_TO_KEY.keys() else "UNKNOWN", self.processors, self.wallclock, self.queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.packed, self._wrapper_queue) + def write_start_time(self, enabled = False): """ Writes start date and time to TOTAL_STATS file @@ -1289,36 +1307,35 @@ class Job(object): f.write('\n') f.close() - def write_total_stat_by_retries(self,total_stats, last_retrial = False): + def write_total_stat_by_retries(self,total_stats, first_retrial = False): """ Writes all data to TOTAL_STATS file :param total_stats: data gathered by the wrapper :type completed: str """ - if last_retrial: + if first_retrial: self.write_submit_time(True) + path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') + f = open(path, 'a') + if first_retrial: + f.write(" " + date2str(datetime.datetime.fromtimestamp(total_stats[0]), 'S') + ' ' + date2str(datetime.datetime.fromtimestamp(total_stats[1]), 'S') + ' ' + total_stats[2]) else: - path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') - f = open(path, 'a') - if last_retrial: - f.write('\n' + total_stats[1] + ' ' + total_stats[2] + ' ' + total_stats[3]) - else: - f.write('\n' + total_stats[0] + ' ' + total_stats[1] + ' ' + total_stats[2] + ' ' + total_stats[3]) - out, err = self.local_logs - path_out = os.path.join(self._tmp_path, 'LOG_' + str(self.expid), out) - # Launch first as simple non-threaded function - if not last_retrial: + f.write('\n' + date2str(datetime.datetime.fromtimestamp(total_stats[0]), 'S') + ' ' + date2str(datetime.datetime.fromtimestamp(total_stats[0]), 'S') + ' ' + date2str(datetime.datetime.fromtimestamp(total_stats[1]), 'S') + ' ' + total_stats[2]) + out, err = self.local_logs + path_out = os.path.join(self._tmp_path, 'LOG_' + str(self.expid), out) + # Launch first as simple non-threaded function + if not first_retrial: JobDataStructure(self.expid).write_submit_time(self.name, total_stats[0], Status.VALUE_TO_KEY[ self.status] if 
self.status in Status.VALUE_TO_KEY.keys() else "UNKNOWN", self.processors, self.wallclock, self.queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.packed, self._wrapper_queue) - JobDataStructure(self.expid).write_start_time(self.name, total_stats[1], Status.VALUE_TO_KEY[ + JobDataStructure(self.expid).write_start_time(self.name, total_stats[0], Status.VALUE_TO_KEY[ self.status] if self.status in Status.VALUE_TO_KEY.keys() else "UNKNOWN", self.processors, self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.packed, self._wrapper_queue) - JobDataStructure(self.expid).write_finish_time(self.name, total_stats[2], total_stats[3], self.processors, + JobDataStructure(self.expid).write_finish_time(self.name, total_stats[1], total_stats[2], self.processors, self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index bb2ad023a..39ede5539 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -116,6 +116,7 @@ class PythonWrapperBuilder(WrapperBuilder): from threading import Thread from commands import getstatusoutput from datetime import datetime + import time from math import ceil from collections import OrderedDict import copy @@ -433,10 +434,8 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): while job_retrials >= 0 and not completed: current = {1} current.start() - os.system("echo "+date2str(datetime.now(), 'S')+" > "+scripts[i][:-4]+"_STAT_"+str(job_retrials)) - os.system("echo "+date2str(datetime.now(), 'S')+" >> "+scripts[i][:-4]+"_STAT_"+str(job_retrials)) + os.system("echo "+str(time.time())+" > "+scripts[i][:-4]+"_STAT_"+str(job_retrials)) #Start/submit running current.join() - os.system("echo 
"+date2str(datetime.now(), 'S')+" >> "+scripts[i][:-4]+"_STAT_"+str(job_retrials)) job_retrials = job_retrials - 1 """).format(jobs_list, thread,self.retrials,'\n'.ljust(13)) @@ -451,14 +450,16 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): completed = True print datetime.now(), "The job ", current.template," has been COMPLETED" else: - os.system("echo "+date2str(datetime.now(), 'S')+" >> "+scripts[i][:-4]+"_STAT_"+str(job_retrials+1)) + os.system("echo "+str(time.time())+" >> "+scripts[i][:-4]+"_STAT_"+str(job_retrials+1)) #Completed os.system("echo FAILED >> " + scripts[i][:-4]+"_STAT_"+str(job_retrials+1)) - open(failed_wrapper,'w').close() - open(failed_path, 'w').close() print datetime.now(), "The job ", current.template," has FAILED" #{1} """).format(jobs_list, self.exit_thread, '\n'.ljust(13)), 8) sequential_threads_launcher += self._indent(textwrap.dedent(""" + if not os.path.exists(completed_path): + open(failed_wrapper,'w').close() + open(failed_path, 'w').close() + if os.path.exists(failed_wrapper): os.remove(os.path.join(os.getcwd(),wrapper_id)) wrapper_failed = os.path.join(os.getcwd(),"WRAPPER_FAILED") -- GitLab From 84753e7fdb35cf83ed68cba1804b5d485b8c394c Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 22 Sep 2021 19:16:50 +0200 Subject: [PATCH 13/14] Restored correct sleep values when using none template and disable the crash flag --- autosubmit/job/job.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 657c16d09..548cc5ab0 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -1039,11 +1039,11 @@ class Job(object): template += template_file.read() else: if self.type == Type.BASH: - template = 'sleep 1 ; crash' + template = 'sleep 10' elif self.type == Type.PYTHON: - template = 'time.sleep(1)' + template = 'time.sleep(10)' elif self.type == Type.R: - template = 'Sys.sleep(1)' + template = 'Sys.sleep(10)' else: template = '' except: -- GitLab 
From 1902efd4119207e117ef4e1b18978e8886fbf7e7 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 22 Sep 2021 19:58:37 +0200 Subject: [PATCH 14/14] Fix when job finishes earlier, added retrial mechanism and improved the speed --- autosubmit/job/job.py | 23 +++++++++++++------ autosubmit/platforms/platform.py | 21 ++++++++++++++++- .../platforms/wrappers/wrapper_builder.py | 5 ++-- 3 files changed, 39 insertions(+), 10 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 548cc5ab0..94e1a5838 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -587,6 +587,7 @@ class Job(object): @threaded def retrieve_logfiles(self, copy_remote_logs, local_logs, remote_logs, expid, platform_name,fail_count = 0): max_logs = 0 + sleep(5) try: as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory()) as_conf.reload() @@ -597,17 +598,24 @@ class Job(object): platform.test_connection() except: pass - max_logs = 1 - last_log = 1 + max_logs = int(as_conf.get_retrials()) - fail_count + last_log = int(as_conf.get_retrials()) - fail_count if self.wrapper_type == "vertical": - max_logs = int(as_conf.get_retrials()) - fail_count # - job.fail count - last_log = max_logs stat_file = self.script_name[:-4] + "_STAT_" - for i in range(max_logs+1): - if platform.get_stat_file_by_retrials(stat_file + str(i)): + found = False + retrials = 0 + while retrials < 3 and not found: + sleep(2) + if platform.check_stat_file_by_retrials(stat_file + str(max_logs)): + found = True + retrials = retrials - 1 + for i in range(max_logs-1,-1,-1): + if platform.check_stat_file_by_retrials(stat_file + str(i)): last_log = i + else: break remote_logs = (self.script_name + ".out." + str(last_log), self.script_name + ".err." 
+ str(last_log)) + else: remote_logs = (self.script_name + ".out", self.script_name + ".err") @@ -619,7 +627,6 @@ class Job(object): retries = 5 sleeptime = 0 i = 0 - #sleep(4) no_continue = False try: while (not out_exist and not err_exist) and i < retries: @@ -648,6 +655,8 @@ class Job(object): retries, remote_logs[0], remote_logs[1])) return if copy_remote_logs: + l_log = copy.deepcopy(local_logs) + r_log = copy.deepcopy(remote_logs) # unifying names for log files if remote_logs != local_logs: if self.wrapper_type == "vertical": # internal_Retrial mechanism diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index aa3fdc9fa..11b301373 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -314,6 +314,22 @@ class Platform(object): Log.debug('{0}_STAT file not found', job_name) return False + def check_stat_file_by_retrials(self, job_name, retries=0): + """ + check *STAT* file + + :param retries: number of intents to get the completed files + :type retries: int + :param job_name: name of job to check + :type job_name: str + :return: True if succesful, False otherwise + :rtype: bool + """ + filename = job_name + if self.check_file_exists(filename): + return True + else: + return False def get_stat_file_by_retrials(self, job_name, retries=0): """ Copies *STAT* files from remote to local @@ -333,7 +349,10 @@ class Platform(object): if self.check_file_exists(filename): if self.get_file(filename, True): return True - return False + else: + return False + else: + return False def get_files_path(self): """ diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index 39ede5539..838b3b0d1 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -446,13 +446,14 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): failed_filename = {0}[i].replace('.cmd', '_FAILED') failed_path = 
os.path.join(os.getcwd(), failed_filename) failed_wrapper = os.path.join(os.getcwd(), wrapper_id) + os.system("echo "+str(time.time())+" >> "+scripts[i][:-4]+"_STAT_"+str(job_retrials+1)) #Completed if os.path.exists(completed_path): completed = True print datetime.now(), "The job ", current.template," has been COMPLETED" + os.system("echo COMPLETED >> " + scripts[i][:-4]+"_STAT_"+str(job_retrials+1)) else: - os.system("echo "+str(time.time())+" >> "+scripts[i][:-4]+"_STAT_"+str(job_retrials+1)) #Completed - os.system("echo FAILED >> " + scripts[i][:-4]+"_STAT_"+str(job_retrials+1)) print datetime.now(), "The job ", current.template," has FAILED" + os.system("echo FAILED >> " + scripts[i][:-4]+"_STAT_"+str(job_retrials+1)) #{1} """).format(jobs_list, self.exit_thread, '\n'.ljust(13)), 8) sequential_threads_launcher += self._indent(textwrap.dedent(""" -- GitLab