diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 06009b32560dd6de54f35cdbadd21414581ecccb..3e79ace5fc68998a5b35091ca596cdf4005c407a 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1424,7 +1424,7 @@ class Autosubmit: None, jobs[0].platform, as_conf, jobs[0].hold) job_list.job_package_map[jobs[0].id] = wrapper_job Log.debug("Checking job_list current status") - save = job_list.update_list(as_conf) + save = job_list.update_list(as_conf, first_time=True) job_list.save() Log.info( @@ -1491,6 +1491,9 @@ class Autosubmit: total_jobs = len(job_list.get_job_list()) Log.info("\n\n{0} of {1} jobs remaining ({2})".format( total_jobs - len(job_list.get_completed()), total_jobs, time.strftime("%H:%M"))) + if len(job_list.get_failed()) > 0: + Log.info("{0} jobs has been failed ({1})".format( + len(job_list.get_failed()), time.strftime("%H:%M"))) safetysleeptime = as_conf.get_safetysleeptime() default_retrials = as_conf.get_retrials() check_wrapper_jobs_sleeptime = as_conf.get_wrapper_check_time() @@ -1509,11 +1512,11 @@ class Autosubmit: list_prevStatus = [] queuing_jobs = job_list.get_in_queue_grouped_id( platform) + Log.debug('Checking jobs for platform={0}'.format(platform.name)) for job_id, job in queuing_jobs.items(): # Check Wrappers one-by-one if job_list.job_package_map and job_id in job_list.job_package_map: - Log.debug( - 'Checking wrapper job with id ' + str(job_id)) + Log.debug('Checking Wrapper {0}'.format(str(job_id))) wrapper_job = job_list.job_package_map[job_id] # Setting prev_status as an easy way to check status change for inner jobs if as_conf.get_notifications() == 'true': @@ -1560,10 +1563,8 @@ class Autosubmit: # Detect and store changes job_changes_tracker = {job.name: ( job.prev_status, job.status) for job in wrapper_job.job_list if job.prev_status != job.status} - # job_data_structure.process_status_changes( - # job_changes_tracker) - # job_changes_tracker = {} else: # Prepare jobs, if slurm check all active jobs at once. + # TODO: All of this should be a function, put in slurm_platform file, paramiko and ecmwf check_jobs to clean the code job = job[0] prev_status = job.status if job.status == Status.FAILED: @@ -1590,16 +1591,13 @@ class Autosubmit: as_conf.get_mails_to()) save = True - if platform.type == "slurm" and list_jobid != "": - slurm.append( - [platform, list_jobid, list_prevStatus, completed_joblist]) - # END Normal jobs + wrappers - # CHECK ALL JOBS at once if they're from slurm ( wrappers non contempled) - for platform_jobs in slurm: + if platform.type == "slurm" and list_jobid != "": # IF there are jobs in an slurm platform, prepare the check them at once + slurm.append([platform, list_jobid, list_prevStatus, completed_joblist]) + for platform_jobs in slurm: # Check slurm single jobs, the other platforms has already been checked. platform = platform_jobs[0] jobs_to_check = platform_jobs[1] - platform.check_Alljobs( - platform_jobs[3], jobs_to_check, as_conf.get_copy_remote_logs()) + Log.debug("Checking all jobs at once") + platform.check_Alljobs(platform_jobs[3], jobs_to_check, as_conf.get_copy_remote_logs()) for j_Indx in xrange(0, len(platform_jobs[3])): prev_status = platform_jobs[2][j_Indx] job = platform_jobs[3][j_Indx] @@ -1614,6 +1612,7 @@ class Autosubmit: Status.VALUE_TO_KEY[job.status], as_conf.get_mails_to()) save = True + Log.debug("End of checking") # End Check Current jobs save2 = job_list.update_list( as_conf, submitter=submitter) @@ -1645,15 +1644,22 @@ class Autosubmit: # Save job_list if not is a failed submitted job recovery = True try: + failed_jobs = job_list.get_failed() + failed_jobs += job_list.get_ready() + failed_names = {} + for job in failed_jobs: + if job.fail_count > 0: + failed_names[job.name] = job.fail_count job_list = Autosubmit.load_job_list( expid, as_conf, notransitive=notransitive) Autosubmit._load_parameters( as_conf, job_list, submitter.platforms) for job in job_list.get_job_list(): + if job in failed_names: + job.fail_count = failed_names[job.name] if job.platform_name is None: job.platform_name = hpcarch - job.platform = submitter.platforms[job.platform_name.lower( - )] + job.platform = submitter.platforms[job.platform_name.lower()] packages_persistence = JobPackagePersistence(os.path.join( BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) @@ -1680,10 +1686,7 @@ class Autosubmit: if main_loop_retrials > 0: main_loop_retrials = main_loop_retrials - 1 try: - Autosubmit.restore_platforms(platforms_to_test) platforms_to_test = set() - Autosubmit.restore_platforms(platforms_to_test) - for job in job_list.get_job_list(): if job.platform_name is None: job.platform_name = hpcarch @@ -1692,6 +1695,7 @@ class Autosubmit: )] # noinspection PyTypeChecker platforms_to_test.add(job.platform) + Autosubmit.restore_platforms(platforms_to_test) except BaseException: raise AutosubmitCritical( "Autosubmit couldn't recover the platforms", 7050, e.message) @@ -1705,7 +1709,6 @@ class Autosubmit: raise AutosubmitCritical(message, 7000) except BaseException as e: # If this happens, there is a bug in the code or an exception not-well caught raise - # 3 Log.result("No more jobs to run.") # Updating job data header with current information job_data_structure.validate_current_run( diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py index c39c91b1a13bca586a2bf9546c9ed8a05bc9f646..1058896e1e70e3d0d687eda55789303064729ab0 100644 --- a/autosubmit/database/db_jobdata.py +++ b/autosubmit/database/db_jobdata.py @@ -1221,7 +1221,8 @@ class JobDataStructure(MainDataBase): platform_object.write_job_extrainfo( job_data_last.get_hdata(), out_file_path) except Exception as exp: - Log.info(traceback.format_exc()) + #Log.info(traceback.format_exc()) #TODO Wilmer, this is stopping autosubmit, "Tuple index out of range" + Log.info("Couldn't write finish time {0}",exp.message) Log.warning(str(exp)) #energy = 0 diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 7b6d86eb84a82215c1c2ec163d11a6d1b091ca0b..775e32084f5b9d3216ec025cf4bed84703eb07d0 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -577,9 +577,9 @@ class Job(object): remote_logs = (self.script_name + ".out", self.script_name + ".err") submitter = self._get_submitter(as_conf) submitter.load_platforms(as_conf) - self._platform = submitter.platforms[platform_name.lower()] + platform = submitter.platforms[platform_name.lower()] try: - self._platform.test_connection() + platform.test_connection() except: pass except Exception as e: @@ -595,12 +595,12 @@ class Job(object): try: while (not out_exist and not err_exist) and i < retries: try: - out_exist = self._platform.check_file_exists( + out_exist = platform.check_file_exists( remote_logs[0], True) except IOError as e: out_exist = False try: - err_exist = self._platform.check_file_exists( + err_exist = platform.check_file_exists( remote_logs[1], True) except IOError as e: err_exists = False @@ -609,7 +609,7 @@ class Job(object): i = i + 1 sleep(sleeptime) try: - self._platform.restore_connection() + platform.restore_connection() except BaseException as e: Log.printlog("{0} \n Couldn't connect to the remote platform for this {1} job err/out files. ".format( e.message, self.name), 6001) @@ -622,19 +622,19 @@ class Job(object): # unifying names for log files if remote_logs != local_logs: self.synchronize_logs( - self._platform, remote_logs, local_logs) + platform, remote_logs, local_logs) remote_logs = copy.deepcopy(local_logs) - self._platform.get_logs_files(self.expid, remote_logs) + platform.get_logs_files(self.expid, remote_logs) # Update the logs with Autosubmit Job Id Brand try: for local_log in local_logs: - self._platform.write_jobid(self.id, os.path.join( + platform.write_jobid(self.id, os.path.join( self._tmp_path, 'LOG_' + str(self.expid), local_log)) except BaseException as e: Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format( e.message, self.name)) try: - self._platform.closeConnection() + platform.closeConnection() except BaseException as e: pass return @@ -642,7 +642,7 @@ class Job(object): Log.printlog("Trace {0} \nFailed to retrieve log file for job {1}".format( e.message, self.name), 6001) try: - self._platform.closeConnection() + platform.closeConnection() except BaseException as e: pass @@ -651,14 +651,14 @@ class Job(object): Log.printlog("Trace {0} \nFailed to retrieve log file for job {0}".format( e.message, self.name), 6001) try: - self._platform.closeConnection() + platform.closeConnection() except: pass return sleep(5) # safe wait before end a thread try: - self._platform.closeConnection() + platform.closeConnection() except BaseException as e: pass return diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index ef4e514835a1a048ea0b1b936f98b203af0c224a..2de65112952d5ad640e84b06aa08f08d31a3a116 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -1229,7 +1229,7 @@ class JobList(object): def parameters(self, value): self._parameters = value - def update_list(self, as_conf, store_change=True, fromSetStatus=False, submitter=None): + def update_list(self, as_conf, store_change=True, fromSetStatus=False, submitter=None, first_time = False): """ Updates job list, resetting failed jobs and changing to READY all WAITING jobs with all parents COMPLETED @@ -1242,52 +1242,34 @@ class JobList(object): save = False if self.update_from_file(store_change): save = store_change - - # reset jobs that has failed less than 10 times Log.debug('Updating FAILED jobs') write_log_status = False - for job in self.get_failed(): - job.inc_fail_count() - if not hasattr(job, 'retrials') or job.retrials is None: - retrials = as_conf.get_retrials() - else: - retrials = job.retrials - - if job.fail_count <= retrials: - tmp = [ - parent for parent in job.parents if parent.status == Status.COMPLETED] - if len(tmp) == len(job.parents): - job.status = Status.READY - if submitter is not None: - job.platform = submitter.platforms[job.platform_name.lower( - )] - job.platform.test_connection() - job.platform = submitter.platforms[job.platform_name.lower( - )] - job.platform.test_connection() - - job.id = None - - job.packed = False - save = True - Log.debug( - "Resetting job: {0} status to: READY for retrial...".format(job.name)) + if not first_time: + for job in self.get_failed(): + if not hasattr(job, 'retrials') or job.retrials is None: + retrials = as_conf.get_retrials() else: - job.status = Status.WAITING - save = True + retrials = job.retrials + if job.fail_count < retrials: + job.inc_fail_count() + tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] + if len(tmp) == len(job.parents): + job.status = Status.READY + job.id = None + job.packed = False + save = True + Log.debug( + "Resetting job: {0} status to: READY for retrial...".format(job.name)) + else: + job.status = Status.WAITING + save = True + job.packed = False + Log.debug( + "Resetting job: {0} status to: WAITING for parents completion...".format(job.name)) + else: + job.status = Status.FAILED job.packed = False - Log.debug( - "Resetting job: {0} status to: WAITING for parents completion...".format(job.name)) - else: - job.status = Status.FAILED - job.packed = False - save = True - Log.debug( - "Job is failed".format(job.name)) - - - - + save = True # if waiting jobs has all parents completed change its State to READY for job in self.get_completed(): if job.synchronize is not None: diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 1d40c83bd37c4c33640ca69b900d26162df07939..a2af24d6d95b213045c617c64595671b9d19bfad 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -290,7 +290,7 @@ class Platform(object): if self.get_file(filename, True): Log.debug('{0}_STAT file have been transfered', job_name) return True - Log.debug('Something did not work well when transferring the STAT file') + Log.debug('{0}_STAT file not found', job_name) return False def get_files_path(self):