From 5ba309dfb8d63913e054de59d771f6ebce4d9821 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 25 Feb 2021 12:45:50 +0100 Subject: [PATCH 1/3] Added debug info, Solved two bug related with failed jobs, job_Tracker and recovery --- autosubmit/autosubmit.py | 38 +++++++++++++++---------------- autosubmit/database/db_jobdata.py | 3 ++- autosubmit/job/job_list.py | 21 ++++------------- autosubmit/platforms/platform.py | 2 +- 4 files changed, 26 insertions(+), 38 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 06009b325..19fd3ef95 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1509,11 +1509,11 @@ class Autosubmit: list_prevStatus = [] queuing_jobs = job_list.get_in_queue_grouped_id( platform) + Log.debug('Checking jobs for platform={0}'.format(platform.name)) for job_id, job in queuing_jobs.items(): # Check Wrappers one-by-one if job_list.job_package_map and job_id in job_list.job_package_map: - Log.debug( - 'Checking wrapper job with id ' + str(job_id)) + Log.debug('Checking Wrapper {0}'.format(str(job_id))) wrapper_job = job_list.job_package_map[job_id] # Setting prev_status as an easy way to check status change for inner jobs if as_conf.get_notifications() == 'true': @@ -1560,10 +1560,8 @@ class Autosubmit: # Detect and store changes job_changes_tracker = {job.name: ( job.prev_status, job.status) for job in wrapper_job.job_list if job.prev_status != job.status} - # job_data_structure.process_status_changes( - # job_changes_tracker) - # job_changes_tracker = {} else: # Prepare jobs, if slurm check all active jobs at once. + # TODO: All of this should be a function, put in slurm_platform file, paramiko and ecmwf check_jobs to clean the code job = job[0] prev_status = job.status if job.status == Status.FAILED: @@ -1590,16 +1588,13 @@ class Autosubmit: as_conf.get_mails_to()) save = True - if platform.type == "slurm" and list_jobid != "": - slurm.append( - [platform, list_jobid, list_prevStatus, completed_joblist]) - # END Normal jobs + wrappers - # CHECK ALL JOBS at once if they're from slurm ( wrappers non contempled) - for platform_jobs in slurm: + if platform.type == "slurm" and list_jobid != "": # IF there are jobs in an slurm platform, prepare the check them at once + slurm.append([platform, list_jobid, list_prevStatus, completed_joblist]) + for platform_jobs in slurm: # Check slurm single jobs, the other platforms has already been checked. platform = platform_jobs[0] jobs_to_check = platform_jobs[1] - platform.check_Alljobs( - platform_jobs[3], jobs_to_check, as_conf.get_copy_remote_logs()) + Log.debug("Checking all jobs at once") + platform.check_Alljobs(platform_jobs[3], jobs_to_check, as_conf.get_copy_remote_logs()) for j_Indx in xrange(0, len(platform_jobs[3])): prev_status = platform_jobs[2][j_Indx] job = platform_jobs[3][j_Indx] @@ -1614,6 +1609,7 @@ class Autosubmit: Status.VALUE_TO_KEY[job.status], as_conf.get_mails_to()) save = True + Log.debug("End of checking") # End Check Current jobs save2 = job_list.update_list( as_conf, submitter=submitter) @@ -1645,15 +1641,22 @@ class Autosubmit: # Save job_list if not is a failed submitted job recovery = True try: + failed_jobs = job_list.get_failed() + failed_jobs += job_list.get_ready() + failed_names = {} + for job in failed_jobs: + if job.fail_count > 0: + failed_names[job.name] = job.fail_count job_list = Autosubmit.load_job_list( expid, as_conf, notransitive=notransitive) Autosubmit._load_parameters( as_conf, job_list, submitter.platforms) for job in job_list.get_job_list(): + if job in failed_names: + job.fail_count = failed_names[job.name] if job.platform_name is None: job.platform_name = hpcarch - job.platform = submitter.platforms[job.platform_name.lower( - )] + job.platform = submitter.platforms[job.platform_name.lower()] packages_persistence = JobPackagePersistence(os.path.join( BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) @@ -1680,10 +1683,7 @@ class Autosubmit: if main_loop_retrials > 0: main_loop_retrials = main_loop_retrials - 1 try: - Autosubmit.restore_platforms(platforms_to_test) platforms_to_test = set() - Autosubmit.restore_platforms(platforms_to_test) - for job in job_list.get_job_list(): if job.platform_name is None: job.platform_name = hpcarch @@ -1692,6 +1692,7 @@ class Autosubmit: )] # noinspection PyTypeChecker platforms_to_test.add(job.platform) + Autosubmit.restore_platforms(platforms_to_test) except BaseException: raise AutosubmitCritical( "Autosubmit couldn't recover the platforms", 7050, e.message) @@ -1705,7 +1706,6 @@ class Autosubmit: raise AutosubmitCritical(message, 7000) except BaseException as e: # If this happens, there is a bug in the code or an exception not-well caught raise - # 3 Log.result("No more jobs to run.") # Updating job data header with current information job_data_structure.validate_current_run( diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py index c39c91b1a..1058896e1 100644 --- a/autosubmit/database/db_jobdata.py +++ b/autosubmit/database/db_jobdata.py @@ -1221,7 +1221,8 @@ class JobDataStructure(MainDataBase): platform_object.write_job_extrainfo( job_data_last.get_hdata(), out_file_path) except Exception as exp: - Log.info(traceback.format_exc()) + #Log.info(traceback.format_exc()) #TODO Wilmer, this is stopping autosubmit, "Tuple index out of range" + Log.info("Couldn't write finish time {0}",exp.message) Log.warning(str(exp)) #energy = 0 diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index ef4e51483..ee23ac7db 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -1242,8 +1242,6 @@ class JobList(object): save = False if self.update_from_file(store_change): save = store_change - - # reset jobs that has failed less than 10 times Log.debug('Updating FAILED jobs') write_log_status = False for job in self.get_failed(): @@ -1254,20 +1252,13 @@ class JobList(object): retrials = job.retrials if job.fail_count <= retrials: - tmp = [ - parent for parent in job.parents if parent.status == Status.COMPLETED] + tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] if len(tmp) == len(job.parents): job.status = Status.READY - if submitter is not None: - job.platform = submitter.platforms[job.platform_name.lower( - )] - job.platform.test_connection() - job.platform = submitter.platforms[job.platform_name.lower( - )] - job.platform.test_connection() - + #if submitter is not None: + # job.platform = submitter.platforms[job.platform_name.lower()] + # job.platform.test_connection() job.id = None - job.packed = False save = True Log.debug( @@ -1284,10 +1275,6 @@ class JobList(object): save = True Log.debug( "Job is failed".format(job.name)) - - - - # if waiting jobs has all parents completed change its State to READY for job in self.get_completed(): if job.synchronize is not None: diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 1d40c83bd..a2af24d6d 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -290,7 +290,7 @@ class Platform(object): if self.get_file(filename, True): Log.debug('{0}_STAT file have been transfered', job_name) return True - Log.debug('Something did not work well when transferring the STAT file') + Log.debug('{0}_STAT file not found', job_name) return False def get_files_path(self): -- GitLab From 992e595f654ca4917d3278c433371eb8d00b9239 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 25 Feb 2021 13:39:44 +0100 Subject: [PATCH 2/3] Solved a bug with threads and failed jobs --- autosubmit/job/job.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 7b6d86eb8..775e32084 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -577,9 +577,9 @@ class Job(object): remote_logs = (self.script_name + ".out", self.script_name + ".err") submitter = self._get_submitter(as_conf) submitter.load_platforms(as_conf) - self._platform = submitter.platforms[platform_name.lower()] + platform = submitter.platforms[platform_name.lower()] try: - self._platform.test_connection() + platform.test_connection() except: pass except Exception as e: @@ -595,12 +595,12 @@ class Job(object): try: while (not out_exist and not err_exist) and i < retries: try: - out_exist = self._platform.check_file_exists( + out_exist = platform.check_file_exists( remote_logs[0], True) except IOError as e: out_exist = False try: - err_exist = self._platform.check_file_exists( + err_exist = platform.check_file_exists( remote_logs[1], True) except IOError as e: err_exists = False @@ -609,7 +609,7 @@ class Job(object): i = i + 1 sleep(sleeptime) try: - self._platform.restore_connection() + platform.restore_connection() except BaseException as e: Log.printlog("{0} \n Couldn't connect to the remote platform for this {1} job err/out files. ".format( e.message, self.name), 6001) @@ -622,19 +622,19 @@ class Job(object): # unifying names for log files if remote_logs != local_logs: self.synchronize_logs( - self._platform, remote_logs, local_logs) + platform, remote_logs, local_logs) remote_logs = copy.deepcopy(local_logs) - self._platform.get_logs_files(self.expid, remote_logs) + platform.get_logs_files(self.expid, remote_logs) # Update the logs with Autosubmit Job Id Brand try: for local_log in local_logs: - self._platform.write_jobid(self.id, os.path.join( + platform.write_jobid(self.id, os.path.join( self._tmp_path, 'LOG_' + str(self.expid), local_log)) except BaseException as e: Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format( e.message, self.name)) try: - self._platform.closeConnection() + platform.closeConnection() except BaseException as e: pass return @@ -642,7 +642,7 @@ class Job(object): Log.printlog("Trace {0} \nFailed to retrieve log file for job {1}".format( e.message, self.name), 6001) try: - self._platform.closeConnection() + platform.closeConnection() except BaseException as e: pass @@ -651,14 +651,14 @@ class Job(object): Log.printlog("Trace {0} \nFailed to retrieve log file for job {0}".format( e.message, self.name), 6001) try: - self._platform.closeConnection() + platform.closeConnection() except: pass return sleep(5) # safe wait before end a thread try: - self._platform.closeConnection() + platform.closeConnection() except BaseException as e: pass return -- GitLab From 636b32c6cb30f7d6b46e246c57b317ef395e7ba6 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 25 Feb 2021 15:43:52 +0100 Subject: [PATCH 3/3] Solved a bug with threads and failed jobs --- autosubmit/autosubmit.py | 5 +++- autosubmit/job/job_list.py | 55 +++++++++++++++++--------------------- 2 files changed, 29 insertions(+), 31 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 19fd3ef95..3e79ace5f 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1424,7 +1424,7 @@ class Autosubmit: None, jobs[0].platform, as_conf, jobs[0].hold) job_list.job_package_map[jobs[0].id] = wrapper_job Log.debug("Checking job_list current status") - save = job_list.update_list(as_conf) + save = job_list.update_list(as_conf, first_time=True) job_list.save() Log.info( @@ -1491,6 +1491,9 @@ class Autosubmit: total_jobs = len(job_list.get_job_list()) Log.info("\n\n{0} of {1} jobs remaining ({2})".format( total_jobs - len(job_list.get_completed()), total_jobs, time.strftime("%H:%M"))) + if len(job_list.get_failed()) > 0: + Log.info("{0} jobs has been failed ({1})".format( + len(job_list.get_failed()), time.strftime("%H:%M"))) safetysleeptime = as_conf.get_safetysleeptime() default_retrials = as_conf.get_retrials() check_wrapper_jobs_sleeptime = as_conf.get_wrapper_check_time() diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index ee23ac7db..2de651129 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -1229,7 +1229,7 @@ class JobList(object): def parameters(self, value): self._parameters = value - def update_list(self, as_conf, store_change=True, fromSetStatus=False, submitter=None): + def update_list(self, as_conf, store_change=True, fromSetStatus=False, submitter=None, first_time = False): """ Updates job list, resetting failed jobs and changing to READY all WAITING jobs with all parents COMPLETED @@ -1244,37 +1244,32 @@ class JobList(object): save = store_change Log.debug('Updating FAILED jobs') write_log_status = False - for job in self.get_failed(): - job.inc_fail_count() - if not hasattr(job, 'retrials') or job.retrials is None: - retrials = as_conf.get_retrials() - else: - retrials = job.retrials - - if job.fail_count <= retrials: - tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] - if len(tmp) == len(job.parents): - job.status = Status.READY - #if submitter is not None: - # job.platform = submitter.platforms[job.platform_name.lower()] - # job.platform.test_connection() - job.id = None - job.packed = False - save = True - Log.debug( - "Resetting job: {0} status to: READY for retrial...".format(job.name)) + if not first_time: + for job in self.get_failed(): + if not hasattr(job, 'retrials') or job.retrials is None: + retrials = as_conf.get_retrials() else: - job.status = Status.WAITING - save = True + retrials = job.retrials + if job.fail_count < retrials: + job.inc_fail_count() + tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] + if len(tmp) == len(job.parents): + job.status = Status.READY + job.id = None + job.packed = False + save = True + Log.debug( + "Resetting job: {0} status to: READY for retrial...".format(job.name)) + else: + job.status = Status.WAITING + save = True + job.packed = False + Log.debug( + "Resetting job: {0} status to: WAITING for parents completion...".format(job.name)) + else: + job.status = Status.FAILED job.packed = False - Log.debug( - "Resetting job: {0} status to: WAITING for parents completion...".format(job.name)) - else: - job.status = Status.FAILED - job.packed = False - save = True - Log.debug( - "Job is failed".format(job.name)) + save = True # if waiting jobs has all parents completed change its State to READY for job in self.get_completed(): if job.synchronize is not None: -- GitLab