diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 560ca938eac055c03b033c4f9d2d64cfe8a050f7..913849f3503c1dcb82495b4b81adedaa99c14d4a 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -2350,7 +2350,8 @@ class Autosubmit: if p.log_recovery_process: p.cleanup_event.set() for p in platforms_to_test: - p.log_recovery_process.join() + if p.log_recovery_process: + p.log_recovery_process.join() for job in job_list.get_completed_failed_without_logs(): job_list.update_log_status(job, as_conf) job_list.save() @@ -2523,12 +2524,6 @@ class Autosubmit: if error_message != "": raise AutosubmitCritical("Submission Failed due wrong configuration:{0}".format(error_message), 7014) - if not inspect: - for package in valid_packages_to_submit: - wrapper_time = None - for job in package.jobs: # if jobs > 1 == wrapped == same submission time - job.write_submit_time(wrapper_submit_time=wrapper_time) - wrapper_time = job.submit_time_timestamp if wrapper_errors and not any_job_submitted and len(job_list.get_in_queue()) == 0: # Deadlock situation diff --git a/autosubmit/history/database_managers/experiment_history_db_manager.py b/autosubmit/history/database_managers/experiment_history_db_manager.py index b974313c5c63b9d3e140b139c062d4d2c748f250..1e8127bef01a839c7fcaec13157bf5d244a8a3e6 100644 --- a/autosubmit/history/database_managers/experiment_history_db_manager.py +++ b/autosubmit/history/database_managers/experiment_history_db_manager.py @@ -216,10 +216,10 @@ class ExperimentHistoryDbManager(DatabaseManager): self._update_job_data_by_id(job_data_dc) return self.get_job_data_dc_unique_latest_by_job_name(job_data_dc.job_name) - def update_job_data_dc_by_job_id(self, job_data_dc): + def update_job_data_dc_by_job_id_name(self, job_data_dc): """ Update JobData data class. Returns latest last=1 row from job_data by job_name. """ - self._update_job_data_by_job_id(job_data_dc) - return self.get_job_data_by_job_id(job_data_dc.job_id) + self._update_job_data_by_id(job_data_dc) + return self.get_job_data_by_job_id_name(job_data_dc.job_id, job_data_dc.job_name) def update_list_job_data_dc_by_each_id(self, job_data_dcs): """ Return length of updated list. """ @@ -324,26 +324,11 @@ class ExperimentHistoryDbManager(DatabaseManager): statement = ''' UPDATE job_data SET last=?, submit=?, start=?, finish=?, modified=?, job_id=?, status=?, energy=?, extra_data=?, nnodes=?, ncpus=?, rowstatus=?, out=?, err=?, - children=?, platform_output=? WHERE id=? ''' - arguments = (job_data_dc.last, job_data_dc.submit, job_data_dc.start, job_data_dc.finish, HUtils.get_current_datetime(), - job_data_dc.job_id, job_data_dc.status, job_data_dc.energy, job_data_dc.extra_data, - job_data_dc.nnodes, job_data_dc.ncpus, job_data_dc.rowstatus, job_data_dc.out, job_data_dc.err, - job_data_dc.children, job_data_dc.platform_output, job_data_dc._id) - self.execute_statement_with_arguments_on_dbfile(self.historicaldb_file_path, statement, arguments) - - def _update_job_data_by_job_id(self, job_data_dc): - """ - Update job_data table with data class JobData. - Update last, submit, start, finish, modified, job_id, status, energy, extra_data, nnodes, ncpus, rowstatus, out, err by id. - """ - statement = ''' UPDATE job_data SET last=?, submit=?, start=?, finish=?, modified=?, - job_id=?, status=?, energy=?, extra_data=?, - nnodes=?, ncpus=?, rowstatus=?, out=?, err=?, - children=?, platform_output=? WHERE job_id=? ''' + children=?, platform_output=?, id=? 
WHERE id=?''' arguments = (job_data_dc.last, job_data_dc.submit, job_data_dc.start, job_data_dc.finish, HUtils.get_current_datetime(), job_data_dc.job_id, job_data_dc.status, job_data_dc.energy, job_data_dc.extra_data, job_data_dc.nnodes, job_data_dc.ncpus, job_data_dc.rowstatus, job_data_dc.out, job_data_dc.err, - job_data_dc.children, job_data_dc.platform_output, job_data_dc.job_id) + job_data_dc.children, job_data_dc.platform_output, job_data_dc._id, job_data_dc._id) self.execute_statement_with_arguments_on_dbfile(self.historicaldb_file_path, statement, arguments) def _update_experiment_run(self, experiment_run_dc): @@ -366,19 +351,12 @@ class ExperimentHistoryDbManager(DatabaseManager): job_data_rows = self.get_from_statement_with_arguments(self.historicaldb_file_path, statement, arguments) return [Models.JobDataRow(*row) for row in job_data_rows] - def get_job_data_by_name(self, job_name): - """ Get List of Models.JobDataRow for job_name """ - statement = self.get_built_select_statement("job_data", "job_name=? ORDER BY counter DESC") - arguments = (job_name,) - job_data_rows = self.get_from_statement_with_arguments(self.historicaldb_file_path, statement, arguments) - return [Models.JobDataRow(*row) for row in job_data_rows] - - def get_job_data_by_job_id(self, job_id): + def get_job_data_by_job_id_name(self, job_id, job_name): """ Get List of Models.JobDataRow for job_id """ - statement = self.get_built_select_statement("job_data", "job_id=?") - arguments = (int(job_id),) + statement = self.get_built_select_statement("job_data", "job_id=? AND job_name=? ORDER BY counter") + arguments = (int(job_id), str(job_name),) job_data_rows = self.get_from_statement_with_arguments(self.historicaldb_file_path, statement, arguments) - models = [Models.JobDataRow(*row) for row in job_data_rows][0] + models = [Models.JobDataRow(*row) for row in job_data_rows][-1] return JobData.from_model(models) def get_job_data_max_counter(self): diff --git a/autosubmit/history/experiment_history.py b/autosubmit/history/experiment_history.py index 1423139cdb82331e1898fcfb871f2a0a59582b4c..8dcf5d068bb84255c9a5c86866ae9d5c8c02f6a2 100644 --- a/autosubmit/history/experiment_history.py +++ b/autosubmit/history/experiment_history.py @@ -97,7 +97,7 @@ class ExperimentHistory: member="", section="", chunk=0, platform="NA", job_id=0, wrapper_queue=None, wrapper_code=None, children=""): try: - job_data_dc_last = self.manager.get_job_data_by_job_id(job_id) + job_data_dc_last = self.manager.get_job_data_by_job_id_name(job_id, job_name) if not job_data_dc_last: raise Exception("Job {0} has not been found in the database.".format(job_name)) job_data_dc_last.start = start @@ -106,7 +106,7 @@ class ExperimentHistory: job_data_dc_last.rowtype = self._get_defined_rowtype(wrapper_code) job_data_dc_last.job_id = job_id job_data_dc_last.children = children - return self.manager.update_job_data_dc_by_job_id(job_data_dc_last) + return self.manager.update_job_data_dc_by_job_id_name(job_data_dc_last) except Exception as exp: self._log.log(str(exp), traceback.format_exc()) Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') @@ -115,7 +115,7 @@ class ExperimentHistory: member="", section="", chunk=0, platform="NA", job_id=0, out_file=None, err_file=None, wrapper_queue=None, wrapper_code=None, children=""): try: - job_data_dc_last = self.manager.get_job_data_by_job_id(job_id) + job_data_dc_last = self.manager.get_job_data_by_job_id_name(job_id, job_name) if not job_data_dc_last: raise Exception("Job {0} has not been 
found in the database.".format(job_name)) job_data_dc_last.finish = finish if finish > 0 else int(time()) @@ -124,7 +124,7 @@ class ExperimentHistory: job_data_dc_last.rowstatus = Models.RowStatus.PENDING_PROCESS job_data_dc_last.out = out_file if out_file else "" job_data_dc_last.err = err_file if err_file else "" - return self.manager.update_job_data_dc_by_job_id(job_data_dc_last) + return self.manager.update_job_data_dc_by_job_id_name(job_data_dc_last) except Exception as exp: self._log.log(str(exp), traceback.format_exc()) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 5e94b6ac0a4b801f2f7c148f40475a0fb7e83e74..23491b6dbf30ec30b4f1c58bb9461494518fcd30 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -251,12 +251,14 @@ class Job(object): self.het = None self.updated_log = False self.log_retrieved = False - self.start_time_written = False self.submit_time_timestamp = None # for wrappers, all jobs inside a wrapper are submitted at the same time + self.start_time_timestamp = None self.finish_time_timestamp = None # for wrappers, with inner_retrials, the submission time should be the last finish_time of the previous retrial self._script = None # Inline code to be executed self._log_recovery_retries = None self.ready_date = None + self.wrapper_name = None + self.is_wrapper = False def _init_runtime_parameters(self): # hetjobs @@ -271,9 +273,10 @@ class Job(object): self._memory = '' self._memory_per_task = '' self.log_retrieved = False - self.start_time_placeholder = time.time() + self.start_time_timestamp = time.time() + self.end_time_placeholder = time.time() self.processors_per_node = "" - self.stat_file = self.script_name[:-4] + "_STAT" + self.stat_file = self.script_name[:-4] + "_STAT_0" @property @@ -1127,66 +1130,77 @@ class Job(object): def retrieve_internal_retrials_logfiles(self, platform): log_retrieved = False - original = copy.deepcopy(self.local_logs) - for i in range(0, int(self.retrials + 1)): - if i > 0: - self.local_logs = (original[0][:-4] + "_{0}".format(i) + ".out", original[1][:-4] + "_{0}".format(i) + ".err") - self.remote_logs = self.get_new_remotelog_name(i) - if not self.remote_logs: - self.log_retrieved = False - else: + last_retrial = 0 + try: + for i in range(0, int(self.retrials + 1)): + self.update_local_logs(count=i, update_submit_time=False) + backup_log = copy.copy(self.remote_logs) + self.remote_logs = self.get_new_remotelog_name(i) if self.check_remote_log_exists(platform): - try: - self.synchronize_logs(platform, self.remote_logs, self.local_logs) - remote_logs = copy.deepcopy(self.local_logs) - platform.get_logs_files(self.expid, remote_logs) - log_retrieved = True - except BaseException: - log_retrieved = False - self.log_retrieved = log_retrieved - - def write_stats(self): - # Update the logs with Autosubmit Job ID Brand + self.synchronize_logs(platform, self.remote_logs, self.local_logs) + remote_logs = copy.deepcopy(self.local_logs) + platform.get_logs_files(self.expid, remote_logs) + log_retrieved = True + last_retrial = i + else: + self.remote_logs = backup_log + break + except: + pass + self.log_retrieved = log_retrieved + if self.log_retrieved: + self.platform.processed_wrapper_logs.add(self.wrapper_name) - try: - for local_log in self.local_logs: - self.platform.write_jobid(self.id, os.path.join( - self._tmp_path, 'LOG_' + str(self.expid), local_log)) - except BaseException as e: - Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format(str(e), self.name)) + return last_retrial + + def 
write_stats(self, last_retrial): # write stats if self.wrapper_type == "vertical": # Disable AS retrials for vertical wrappers to use internal ones - for i in range(0, int(self.retrials + 1)): - if self.platform.get_stat_file(self, count=i): - self.write_vertical_time(i) - self.inc_fail_count() + for i in range(0, int(last_retrial + 1)): + self.platform.get_stat_file(self, count=i) + self.write_vertical_time(i) + self.inc_fail_count() + # Update the logs with Autosubmit Job ID Brand + try: + for local_log in self.local_logs: + self.platform.write_jobid(self.id, os.path.join( + self._tmp_path, 'LOG_' + str(self.expid), local_log)) + except BaseException as e: + Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format(str(e), self.name)) else: + self.update_local_logs(update_submit_time=False) self.platform.get_stat_file(self) - self.write_start_time(from_stat_file=True) + self.write_submit_time() + self.write_start_time() self.write_end_time(self.status == Status.COMPLETED) Log.result(f"{self.fail_count} retrials of job:{self.name} and {self.id} has been inserted in the db") + # Update the logs with Autosubmit Job ID Brand + try: + for local_log in self.local_logs: + self.platform.write_jobid(self.id, os.path.join( + self._tmp_path, 'LOG_' + str(self.expid), local_log)) + except BaseException as e: + Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format(str(e), self.name)) def retrieve_logfiles(self, platform, raise_error=False): """ - Retrieves log files from remote host meant to be used inside a process. - :param platform: platform that is calling the function, already connected. - :param raise_error: boolean to raise an error if the logs are not retrieved - :return: + Retrieves log files from remote host + :param platform: HPCPlatform object + :param raise_error: boolean, if True, raises an error if the log files are not retrieved + :return: dict with finish timestamps per job """ backup_logname = copy.copy(self.local_logs) if self.wrapper_type == "vertical": - self.retrieve_internal_retrials_logfiles(platform) + last_retrial = self.retrieve_internal_retrials_logfiles(platform) else: self.retrieve_external_retrials_logfiles(platform) - + last_retrial = 0 if not self.log_retrieved: self.local_logs = backup_logname - if raise_error: + if raise_error and self.wrapper_name not in self.platform.processed_wrapper_logs: raise AutosubmitCritical("Failed to retrieve logs for job {0}".format(self.name), 6000) - else: - Log.printlog("Failed to retrieve logs for job {0}".format(self.name), 6000) else: - self.write_stats() + self.write_stats(last_retrial) def parse_time(self,wallclock): regex = re.compile(r'(((?P\d+):)((?P\d+)))(:(?P\d+))?') @@ -1292,10 +1306,6 @@ class Job(object): self.retrieve_logfiles(self.platform) else: self.platform.add_job_to_log_recover(self) - - - - return self.status @staticmethod @@ -1328,16 +1338,16 @@ class Job(object): :param default_status: status to set if job is not completed. By default, is FAILED :type default_status: Status """ - log_name = os.path.join(self._tmp_path, self.name + '_COMPLETED') + completed_file = os.path.join(self._tmp_path, self.name + '_COMPLETED') + completed_file_location = os.path.join(self._tmp_path, f"LOG_{self.expid}", self.name + '_COMPLETED') - if os.path.exists(log_name): + if os.path.exists(completed_file) or os.path.exists(completed_file_location): if not over_wallclock: self.status = Status.COMPLETED else: return Status.COMPLETED else: - Log.printlog("Job {0} completion check failed. 
There is no COMPLETED file".format( - self.name), 6009) + Log.warning(f"Couldn't find {self.name} COMPLETED file") if not over_wallclock: self.status = default_status else: @@ -1878,7 +1888,11 @@ class Job(object): self.shape = as_conf.jobs_data[self.section].get("SHAPE", "") self.script = as_conf.jobs_data[self.section].get("SCRIPT", "") self.x11 = False if str(as_conf.jobs_data[self.section].get("X11", False)).lower() == "false" else True - self.stat_file = f"{self.script_name[:-4]}_STAT_{self.fail_count}" + if self.wrapper_type != "vertical" and self.packed: + self.stat_file = f"{self.script_name[:-4]}_STAT_{self.fail_count}" + else: + self.stat_file = f"{self.script_name[:-4]}_STAT_0" + if self.checkpoint: # To activate placeholder sustitution per in the template parameters["AS_CHECKPOINT"] = self.checkpoint parameters['JOBNAME'] = self.name @@ -1960,9 +1974,6 @@ class Job(object): self._init_runtime_parameters() if hasattr(self, "start_time"): self.start_time = time.time() - for event in self.platform.worker_events: # keep alive log retrieval workers. - if not event.is_set(): - event.set() # Parameters that affect to all the rest of parameters self.update_dict_parameters(as_conf) parameters = parameters.copy() @@ -1986,6 +1997,9 @@ class Job(object): # For some reason, there is return but the assignee is also necessary self.parameters = parameters # This return is only being used by the mock , to change the mock + for event in self.platform.worker_events: # keep alive log retrieval workers. + if not event.is_set(): + event.set() return parameters def update_content_extra(self,as_conf,files): @@ -2229,29 +2243,20 @@ class Job(object): str(set(parameters) - set(variables))), 5013) return out - def write_submit_time(self, hold=False, enable_vertical_write=False, wrapper_submit_time=None): + def update_local_logs(self, count=-1, update_submit_time=True): + if update_submit_time: + self.submit_time_timestamp = date2str(datetime.datetime.now(), 'S') + if count > 0: + self.local_logs = (f"{self.name}.{self.submit_time_timestamp}.out_retrial_{count}", + f"{self.name}.{self.submit_time_timestamp}.err_retrial_{count}") + else: + self.local_logs = (f"{self.name}.{self.submit_time_timestamp}.out", + f"{self.name}.{self.submit_time_timestamp}.err") + + def write_submit_time(self): """ Writes submit date and time to TOTAL_STATS file. It doesn't write if hold is True. """ - - self.start_time_written = False - if not enable_vertical_write: - if wrapper_submit_time: - self.submit_time_timestamp = wrapper_submit_time - else: - self.submit_time_timestamp = date2str(datetime.datetime.now(), 'S') - if self.wrapper_type != "vertical": - self.local_logs = (f"{self.name}.{self.submit_time_timestamp}.out", f"{self.name}.{self.submit_time_timestamp}.err") # for wrappers with inner retrials - else: - self.local_logs = (f"{self.name}.{self.submit_time_timestamp}.out", - f"{self.name}.{self.submit_time_timestamp}.err") # for wrappers with inner retrials - return - if self.wrapper_type == "vertical" and self.fail_count > 0: - self.submit_time_timestamp = self.finish_time_timestamp - print(("Call from {} with status {}".format(self.name, self.status_str))) - if hold is True: - return # Do not write for HELD jobs. 
- data_time = ["",int(datetime.datetime.strptime(self.submit_time_timestamp, "%Y%m%d%H%M%S").timestamp())] path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') if os.path.exists(path): @@ -2268,57 +2273,56 @@ class Job(object): platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name), children=self.children_names_str) - def write_start_time(self, enable_vertical_write=False, from_stat_file=False, count=-1): + def update_start_time(self, count=-1): + start_time_ = self.check_start_time(count) # last known start time from the .cmd file + if start_time_: + self.start_time_timestamp = start_time_ + else: + Log.warning(f"Start time for job {self.name} not found in the .cmd file, using last known time.") + self.start_time_timestamp = self.start_time_timestamp if self.start_time_timestamp else time.time() + if count > 0 or self.wrapper_name in self.platform.processed_wrapper_logs: + self.submit_time_timestamp = date2str(datetime.datetime.fromtimestamp(self.start_time_timestamp),'S') + + def write_start_time(self, count=-1, vertical_wrapper=False): """ Writes start date and time to TOTAL_STATS file :return: True if successful, False otherwise :rtype: bool """ - - if not enable_vertical_write and self.wrapper_type == "vertical": - return - - self.start_time_written = True - if not from_stat_file: # last known start time from AS - self.start_time_placeholder = time.time() - elif from_stat_file: - start_time_ = self.check_start_time(count) # last known start time from the .cmd file - if start_time_: - start_time = start_time_ - else: - start_time = self.start_time_placeholder if self.start_time_placeholder else time.time() - path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') - f = open(path, 'a') - f.write(' ') - # noinspection PyTypeChecker - f.write(date2str(datetime.datetime.fromtimestamp(start_time), 'S')) - # Writing database - exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) - exp_history.write_start_time(self.name, start=start_time, status=Status.VALUE_TO_KEY.get(self.status, "UNKNOWN"), ncpus=self.processors, - wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk, - platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name), - children=self.children_names_str) + if not vertical_wrapper: + self.update_start_time(count) + path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') + f = open(path, 'a') + f.write(' ') + # noinspection PyTypeChecker + f.write(date2str(datetime.datetime.fromtimestamp(self.start_time_timestamp), 'S')) + # Writing database + exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) + exp_history.write_start_time(self.name, start=self.start_time_timestamp, status=Status.VALUE_TO_KEY.get(self.status, "UNKNOWN"), ncpus=self.processors, + wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk, + platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name), + children=self.children_names_str) return True def write_vertical_time(self, count=-1): - self.write_submit_time(enable_vertical_write=True) - 
self.write_start_time(enable_vertical_write=True, from_stat_file=True, count=count) - self.write_end_time(self.status == Status.COMPLETED, enable_vertical_write=True, count=count) + self.update_start_time(count=count) + self.update_local_logs(update_submit_time=False) + self.write_submit_time() + self.write_start_time(count=count, vertical_wrapper=True) + self.write_end_time(self.status == Status.COMPLETED, count=count) - def write_end_time(self, completed, enable_vertical_write=False, count = -1): + def write_end_time(self, completed, count=-1): """ - Writes ends date and time to TOTAL_STATS file - :param completed: True if job was completed successfully, False otherwise + Writes end timestamp to TOTAL_STATS file and jobs_data.db + :param completed: True if the job has been completed, False otherwise :type completed: bool + :param count: number of retrials + :type count: int """ - if not enable_vertical_write and self.wrapper_type == "vertical": - return end_time = self.check_end_time(count) path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') f = open(path, 'a') f.write(' ') - finish_time = None - final_status = None if end_time > 0: # noinspection PyTypeChecker f.write(date2str(datetime.datetime.fromtimestamp(float(end_time)), 'S')) @@ -2328,7 +2332,7 @@ class Job(object): else: f.write(date2str(datetime.datetime.now(), 'S')) self.finish_time_timestamp = date2str(datetime.datetime.now(), 'S') - finish_time = time.time() # date2str(datetime.datetime.now(), 'S') + finish_time = time.time() f.write(' ') if completed: final_status = "COMPLETED" @@ -2337,7 +2341,6 @@ class Job(object): final_status = "FAILED" f.write('FAILED') out, err = self.local_logs - path_out = os.path.join(self._tmp_path, 'LOG_' + str(self.expid), out) # Launch first as simple non-threaded function exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) job_data_dc = exp_history.write_finish_time(self.name, finish=finish_time, status=final_status, ncpus=self.processors, @@ -2351,6 +2354,7 @@ class Job(object): thread_write_finish.name = "JOB_data_{}".format(self.name) thread_write_finish.start() + def write_total_stat_by_retries(self, total_stats, first_retrial = False): """ Writes all data to TOTAL_STATS file @@ -2499,6 +2503,7 @@ class WrapperJob(Job): self.checked_time = datetime.datetime.now() self.hold = hold self.inner_jobs_running = list() + self.is_wrapper = True def _queuing_reason_cancel(self, reason): @@ -2535,6 +2540,7 @@ class WrapperJob(Job): self._check_running_jobs() # Check and update inner_jobs status that are eligible # Completed wrapper will always come from check function. elif self.status == Status.COMPLETED: + self._check_running_jobs() # Check and update inner_jobs status that are eligible self.check_inner_jobs_completed(self.job_list) # Fail can come from check function or running/completed checkers. 
@@ -2580,8 +2586,16 @@ class WrapperJob(Job): job.new_status = Status.COMPLETED job.updated_log = False job.update_status(self.as_config) + # for job in self.job_list: + # if job not in completed_jobs and job in self.inner_jobs_running: + # job.new_status = Status.FAILED + # job.packed = False + # else: + # job.new_status = Status.WAITING + # job.packed = False for job in completed_jobs: self.running_jobs_start.pop(job, None) + not_completed_jobs = list( set(not_completed_jobs) - set(completed_jobs)) @@ -2657,6 +2671,7 @@ class WrapperJob(Job): not_finished_jobs_names = ' '.join(list(not_finished_jobs_dict.keys())) remote_log_dir = self._platform.get_remote_log_dir() # PREPARE SCRIPT TO SEND + # When a inner_job is running? When the job has an _STAT file command = textwrap.dedent(""" cd {1} for job in {0} @@ -2679,9 +2694,13 @@ class WrapperJob(Job): os.chmod(log_dir, 0o770) open(multiple_checker_inner_jobs, 'w+').write(command) os.chmod(multiple_checker_inner_jobs, 0o770) - self._platform.send_file(multiple_checker_inner_jobs, False) - command = os.path.join( - self._platform.get_files_path(), "inner_jobs_checker.sh") + if self.platform.name != "local": # already "sent"... + self._platform.send_file(multiple_checker_inner_jobs, False) + command = f"cd {self._platform.get_files_path()}; {os.path.join( + self._platform.get_files_path(), 'inner_jobs_checker.sh')}" + else: + command = os.path.join( + self._platform.get_files_path(), "inner_jobs_checker.sh") # wait = 2 retries = 5 @@ -2722,9 +2741,9 @@ class WrapperJob(Job): if content == '': sleep(wait) retries = retries - 1 - temp_list = self.inner_jobs_running - self.inner_jobs_running = [ - job for job in temp_list if job.status == Status.RUNNING] + # temp_list = self.inner_jobs_running + # self.inner_jobs_running = [ + # job for job in temp_list if job.status == Status.RUNNING] if retries == 0 or over_wallclock: self.status = Status.FAILED @@ -2753,7 +2772,7 @@ class WrapperJob(Job): running_jobs += [job for job in self.job_list if job.status == Status.READY or job.status == Status.SUBMITTED or job.status == Status.QUEUING] self.inner_jobs_running = list() for job in running_jobs: - if job.platform.check_file_exists('{0}_FAILED'.format(job.name), wrapper_failed=True): + if job.platform.check_file_exists('{0}_FAILED'.format(job.name), wrapper_failed=True, max_retries=2): if job.platform.get_file('{0}_FAILED'.format(job.name), False, wrapper_failed=True): self._check_finished_job(job, True) else: @@ -2761,21 +2780,28 @@ class WrapperJob(Job): self.inner_jobs_running.append(job) def cancel_failed_wrapper_job(self): - Log.printlog("Cancelling job with id {0}".format(self.id), 6009) try: - self._platform.send_command( - self._platform.cancel_cmd + " " + str(self.id)) + if self.platform_name == "local": + # Check if the job is still running to avoid a misleading message in the logs + if self.platform.get_pscall(self.id): + self._platform.send_command( + self._platform.cancel_cmd + " " + str(self.id)) + else: + Log.warning(f"Wrapper {self.name} failed, cancelling it") + self._platform.send_command( + self._platform.cancel_cmd + " " + str(self.id)) except: - Log.info(f'Job with {self.id} was finished before canceling it') - + Log.debug(f'Job with {self.id} was finished before canceling it') + self._check_running_jobs() + for job in self.inner_jobs_running: + job.status = Status.FAILED for job in self.job_list: - #if job.status == Status.RUNNING: - #job.inc_fail_count() - # job.packed = False - # job.status = Status.FAILED + job.packed = False if 
job.status not in [Status.COMPLETED, Status.FAILED]: - job.packed = False job.status = Status.WAITING + else: + if job.wrapper_type == "vertical": # job is being retrieved internally by the wrapper + job.fail_count = job.retrials def _update_completed_jobs(self): diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 6aad371df6b30a11d902d8b7c17a95793e4abb49..98744ed09fe0245bff243a5d69dadf229b9cf60b 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -2579,7 +2579,6 @@ class JobList(object): if hasattr(job, "x11") and job.x11: # X11 has it log writted in the run.out file. No need to check for log files as there are none job.updated_log = True return - Log.result(f"Sending job {job.stat_file} to the log recovery thread") log_recovered = self.check_if_log_is_recovered(job) if log_recovered: @@ -2600,8 +2599,8 @@ class JobList(object): if not hasattr(job, "updated_log") or not job.updated_log: for log_recovered in self.path_to_logs.glob(f"{job.name}.*.out"): - date = int(datetime.datetime.fromtimestamp(log_recovered.stat().st_mtime).strftime("%Y%m%d%H%M%S")) - if job.ready_date and int(datetime.datetime.fromtimestamp(log_recovered.stat().st_mtime).strftime("%Y%m%d%H%M%S")) > int(job.ready_date): + file_timestamp = int(datetime.datetime.fromtimestamp(log_recovered.stat().st_mtime).strftime("%Y%m%d%H%M%S")) + if job.ready_date and file_timestamp >= int(job.ready_date): return log_recovered return None diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 0d91f3938e129a74a2aac82ea6f6aa77e0fdfd96..5dbce681e9364ea2c6a2facf9e3e6285c9737efd 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -491,14 +491,12 @@ class JobPackager(object): current_info.append(param[self.current_wrapper_section]) current_info.append(self._as_config) - if self.wrapper_type[self.current_wrapper_section] == 'vertical': - built_packages_tmp = self._build_vertical_packages(jobs, wrapper_limits,wrapper_info=current_info) - elif self.wrapper_type[self.current_wrapper_section] == 'horizontal': - built_packages_tmp = self._build_horizontal_packages(jobs, wrapper_limits, section,wrapper_info=current_info) + if self.wrapper_type[self.current_wrapper_section] == 'horizontal': + built_packages_tmp = self._build_horizontal_packages(jobs, wrapper_limits, section, wrapper_info=current_info) elif self.wrapper_type[self.current_wrapper_section] in ['vertical-horizontal', 'horizontal-vertical']: - built_packages_tmp.append(self._build_hybrid_package(jobs, wrapper_limits, section,wrapper_info=current_info)) + built_packages_tmp.append(self._build_hybrid_package(jobs, wrapper_limits, section, wrapper_info=current_info)) else: - built_packages_tmp = self._build_vertical_packages(jobs, wrapper_limits) + built_packages_tmp = self._build_vertical_packages(jobs, wrapper_limits, wrapper_info=current_info) Log.result(f"Built {len(built_packages_tmp)} wrappers for {wrapper_name}") packages_to_submit,max_jobs_to_submit = self.check_packages_respect_wrapper_policy(built_packages_tmp,packages_to_submit,max_jobs_to_submit,wrapper_limits,any_simple_packages) @@ -738,7 +736,7 @@ class JobPackagerVertical(object): if child is not None and len(str(child)) > 0: child.update_parameters(wrapper_info[-1], {}) self.total_wallclock = sum_str_hours(self.total_wallclock, child.wallclock) - if self.total_wallclock <= self.max_wallclock: + if self.total_wallclock <= self.max_wallclock or not self.max_wallclock: child.packed = True child.level = level 
self.jobs_list.append(child) @@ -850,22 +848,14 @@ class JobPackagerVerticalMixed(JobPackagerVertical): :rtype: Job Object """ sorted_jobs = self.sorted_jobs - + child = None for index in range(self.index, len(sorted_jobs)): - child = sorted_jobs[index] - if self._is_wrappable(child): + child_ = sorted_jobs[index] + if child_.name != job.name and self._is_wrappable(child_): + child = child_ self.index = index + 1 - return child - continue - return None - # Not passing tests but better wrappers result to check - # for child in job.children: - # if child.name != job.name: - # if self._is_wrappable(child): - # self.index = self.index + 1 - # return child - # continue - # return None + break + return child def _is_wrappable(self, job): """ diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index f99654fe80bd2653126e696764775263c5011f90..17bffecc51f61875b8318530c28a99fdf5aca532 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -249,6 +249,7 @@ class JobPackageSimple(JobPackageBase): if len(job_scripts) == 0: job_scripts = self._job_scripts for job in self.jobs: + job.update_local_logs() #CLEANS PREVIOUS RUN ON LOCAL log_completed = os.path.join(self._tmp_path, job.name + '_COMPLETED') log_stat = os.path.join(self._tmp_path, job.name + '_STAT') @@ -262,7 +263,9 @@ class JobPackageSimple(JobPackageBase): if job.id is None or not job.id: continue Log.info("{0} submitted", job.name) - job.status = Status.SUBMITTED + job.status = Status.SUBMITTED + job.wrapper_name = job.name + class JobPackageSimpleWrapped(JobPackageSimple): @@ -342,6 +345,7 @@ class JobPackageArray(JobPackageBase): def _do_submission(self, job_scripts=None, hold=False): for job in self.jobs: + job.update_local_logs() self.platform.remove_stat_file(job) self.platform.remove_completed_file(job.name) @@ -353,9 +357,8 @@ class JobPackageArray(JobPackageBase): for i in range(0, len(self.jobs)): # platforms without a submit.cmd Log.info("{0} submitted", self.jobs[i].name) self.jobs[i].id = str(package_id) + '[{0}]'.format(i) - self.jobs[i].status = Status.SUBMITTED - self.jobs[i].write_submit_time(hold=hold,wrapper_submit_time=wrapper_time) - wrapper_time = self.jobs[i].write_submit_time + self.jobs[i].status = Status.SUBMITTED + self.jobs[i].wrapper_name = self.name class JobPackageThread(JobPackageBase): @@ -572,12 +575,14 @@ class JobPackageThread(JobPackageBase): if callable(getattr(self.platform, 'remove_multiple_files')): filenames = str() for job in self.jobs: + job.update_local_logs() filenames += " " + self.platform.remote_log_dir + "/" + job.name + "_STAT " + \ self.platform.remote_log_dir + "/" + job.name + "_COMPLETED" self.platform.remove_multiple_files(filenames) else: for job in self.jobs: + job.update_local_logs() self.platform.remove_stat_file(job) self.platform.remove_completed_file(job.name) if hold: @@ -588,13 +593,12 @@ class JobPackageThread(JobPackageBase): if package_id is None or not package_id: return - wrapper_time = None for i in range(0, len(self.jobs)): Log.info("{0} submitted", self.jobs[i].name) self.jobs[i].id = str(package_id) self.jobs[i].status = Status.SUBMITTED - self.jobs[i].write_submit_time(hold=hold,wrapper_submit_time=wrapper_time) - wrapper_time = self.jobs[i].write_submit_time + self.jobs[i].wrapper_name = self.name + def _common_script_content(self): pass @@ -658,6 +662,7 @@ class JobPackageThreadWrapped(JobPackageThread): def _do_submission(self, job_scripts=None, hold=False): for job in self.jobs: + job.update_local_logs() 
self.platform.remove_stat_file(job) self.platform.remove_completed_file(job.name) if hold: @@ -668,13 +673,13 @@ if package_id is None or not package_id: raise Exception('Submission failed') - wrapper_time = None for i in range(0, len(self.jobs)): Log.info("{0} submitted", self.jobs[i].name) self.jobs[i].id = str(package_id) self.jobs[i].status = Status.SUBMITTED - self.jobs[i].write_submit_time(hold=hold,wrapper_submit_time=wrapper_time) - wrapper_time = self.jobs[i].write_submit_time + self.jobs[i].wrapper_name = self.name + + class JobPackageVertical(JobPackageThread): """ @@ -752,7 +757,7 @@ class JobPackageVertical(JobPackageThread): self._wallclock = "{0}:{1}".format(hh_str,mm_str) Log.info("Submitting {2} with wallclock {0}:{1}".format(hh_str,mm_str,self._name)) else: - wallclock_by_level = None + wallclock_by_level = 0 # command: "timeout 0 sleep 2" == command: "sleep 2" return self._wrapper_factory.get_wrapper(self._wrapper_factory.vertical_wrapper, name=self._name, queue=self._queue, project=self._project, wallclock=self._wallclock, diff --git a/autosubmit/platforms/locplatform.py b/autosubmit/platforms/locplatform.py index f6356ab26427a662e702da2e22fc3034bcb02353..61f96832ae217153036c4f196bf538eb2c8d55c1 100644 --- a/autosubmit/platforms/locplatform.py +++ b/autosubmit/platforms/locplatform.py @@ -18,12 +18,14 @@ # along with Autosubmit. If not, see . import locale import os +from pathlib import Path from xml.dom.minidom import parseString import subprocess from autosubmit.platforms.paramiko_platform import ParamikoPlatform from autosubmit.platforms.headers.local_header import LocalHeader +from autosubmit.platforms.wrappers.wrapper_factory import LocalWrapperFactory from autosubmitconfigparser.config.basicconfig import BasicConfig from time import sleep @@ -64,6 +67,9 @@ class LocalPlatform(ParamikoPlatform): self.job_status['RUNNING'] = ['0'] self.job_status['QUEUING'] = [] self.job_status['FAILED'] = [] + self._allow_wrappers = True + self._wrapper = LocalWrapperFactory(self) + self.update_cmds() def update_cmds(self): @@ -100,20 +106,24 @@ class LocalPlatform(ParamikoPlatform): return [int(element.firstChild.nodeValue) for element in jobs_xml] def get_submit_cmd(self, job_script, job, hold=False, export=""): - wallclock = self.parse_time(job.wallclock) - seconds = int(wallclock.days * 86400 + wallclock.seconds * 60) + if job: + wallclock = self.parse_time(job.wallclock) + seconds = int(wallclock.days * 86400 + wallclock.seconds * 60) + else: + seconds = 24 * 3600 if export == "none" or export == "None" or export is None or export == "": export = "" else: export += " ; " - return self.get_call(job_script, job, export=export,timeout=seconds) - + command = self.get_call(job_script, job, export=export,timeout=seconds) + return f"cd {self.remote_log_dir} ; {command}" def get_checkjob_cmd(self, job_id): return self.get_pscall(job_id) def connect(self, as_conf={}, reconnect=False): self.connected = True self.spawn_log_retrieval_process(as_conf) + def test_connection(self,as_conf): if not self.connected: self.connect(as_conf) @@ -144,10 +154,7 @@ class LocalPlatform(ParamikoPlatform): return True def send_file(self, filename, check=True): - self.check_remote_log_dir() - self.delete_file(filename,del_cmd=True) - command = '{0} {1} {2}'.format(self.put_cmd, os.path.join(self.tmp_path, filename), - os.path.join(self.tmp_path, 'LOG_' + self.expid, filename)) + command = f'{self.put_cmd} 
{os.path.join(self.tmp_path, Path(filename).name)} {os.path.join(self.tmp_path, "LOG_" + self.expid, Path(filename).name)}' try: subprocess.check_call(command, shell=True) except subprocess.CalledProcessError: @@ -157,6 +164,17 @@ class LocalPlatform(ParamikoPlatform): raise return True + def remove_multiple_files(self, filenames): + log_dir = os.path.join(self.tmp_path, 'LOG_{0}'.format(self.expid)) + multiple_delete_previous_run = os.path.join( + log_dir, "multiple_delete_previous_run.sh") + if os.path.exists(log_dir): + lang = locale.getlocale()[1] + if lang is None: + lang = 'UTF-8' + open(multiple_delete_previous_run, 'wb+').write(("rm -f" + filenames).encode(lang)) + os.chmod(multiple_delete_previous_run, 0o770) + return "" def get_file(self, filename, must_exist=True, relative_path='',ignore_log = False,wrapper_failed=False): local_path = os.path.join(self.tmp_path, relative_path) @@ -180,26 +198,28 @@ class LocalPlatform(ParamikoPlatform): return True # Moves .err .out - def check_file_exists(self, src, wrapper_failed=False, sleeptime=None, max_retries=None, first=None): + def check_file_exists(self, src, wrapper_failed=False, sleeptime=1, max_retries=1): """ Checks if a file exists in the platform :param src: source name :type src: str - :param wrapper_failed: if the wrapper failed + :param wrapper_failed: checks inner jobs files :type wrapper_failed: bool :param sleeptime: time to sleep :type sleeptime: int :param max_retries: maximum number of retries :type max_retries: int - :param first: if it is the first time - :type first: bool :return: True if the file exists, False otherwise :rtype: bool """ - file_exist = os.path.isfile(os.path.join(self.get_files_path(), src)) - if not file_exist: - Log.debug(f"File {self.get_files_path()}/{src} does not exist") - return file_exist + sleeptime = 1 + for i in range(max_retries): + if os.path.isfile(os.path.join(self.get_files_path(), src)): + return True + sleep(sleeptime) + Log.warning("File {0} does not exist".format(src)) + return False + def delete_file(self, filename,del_cmd = False): if del_cmd: @@ -260,3 +280,18 @@ class LocalPlatform(ParamikoPlatform): :type remote_logs: (str, str) """ return + + def check_completed_files(self, sections=None): + command = "find %s " % self.remote_log_dir + if sections: + for i, section in enumerate(sections.split()): + command += " -name *%s_COMPLETED" % section + if i < len(sections.split()) - 1: + command += " -o " + else: + command += " -name *_COMPLETED" + + if self.send_command(command, True): + return self._ssh_output + else: + return None \ No newline at end of file diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index a3be9202f72b01b1e7ea6f2569e6482f9a030fd2..8b19730a8e94a8b5d84a7c9017e8728022f10816 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -1,5 +1,6 @@ import locale from contextlib import suppress +from pathlib import Path from time import sleep import sys import socket @@ -20,6 +21,7 @@ from threading import Thread import threading import getpass from paramiko.agent import Agent +import time def threaded(fn): def wrapper(*args, **kwargs): @@ -526,7 +528,8 @@ class ParamikoPlatform(Platform): if cmd is None: return None if self.send_command(cmd,x11=x11): - job_id = self.get_submitted_job_id(self.get_ssh_output(),x11=job.x11) + x11 = False if job is None else job.x11 + job_id = self.get_submitted_job_id(self.get_ssh_output(),x11=x11) Log.debug("Job ID: {0}", job_id) return 
int(job_id) else: @@ -618,7 +621,19 @@ self.get_ssh_output()).strip("\n") # URi: define status list in HPC Queue Class if job_status in self.job_status['COMPLETED'] or retries == 0: - job_status = Status.COMPLETED + # The Local platform has only 0 or 1, so it is necessary to look for the completed file. + # Not sure why it is called over_wallclock, but it is the only way to return a value + if self.type == "local": # wrapper has a different check completion + if not job.is_wrapper: + job_status = job.check_completion(over_wallclock=True) + else: + if Path(f"{self.remote_log_dir}/WRAPPER_FAILED").exists(): + job_status = Status.FAILED + else: + job_status = Status.COMPLETED + else: + job_status = Status.COMPLETED + elif job_status in self.job_status['RUNNING']: job_status = Status.RUNNING if not is_wrapper: @@ -650,8 +665,14 @@ Log.error( 'check_job() The job id ({0}) status is {1}.', job_id, job_status) - if job_status in [Status.FAILED, Status.COMPLETED]: + if job_status in [Status.FAILED, Status.COMPLETED, Status.UNKNOWN]: job.updated_log = False + # backup for end time in case that the stat file is not found + job.end_time_placeholder = int(time.time()) + if job_status in [Status.RUNNING, Status.COMPLETED] and job.new_status in [Status.QUEUING, Status.SUBMITTED]: + # backup for start time in case that the stat file is not found + job.start_time_timestamp = int(time.time()) + if submit_hold_check: return job_status else: @@ -1216,18 +1237,23 @@ :return: command to execute script :rtype: str """ - executable = '' - if job.type == Type.BASH: - executable = 'bash' - elif job.type == Type.PYTHON: - executable = 'python3' - elif job.type == Type.PYTHON2: - executable = 'python2' - elif job.type == Type.R: - executable = 'Rscript' - if job.executable != '': - executable = '' # Alternative: use job.executable with substituted placeholders - remote_logs = (job.script_name + ".out."+str(job.fail_count), job.script_name + ".err."+str(job.fail_count)) + if job: # If job is None, it is a wrapper + executable = '' + if job.type == Type.BASH: + executable = 'bash' + elif job.type == Type.PYTHON: + executable = 'python3' + elif job.type == Type.PYTHON2: + executable = 'python2' + elif job.type == Type.R: + executable = 'Rscript' + if job.executable != '': + executable = '' # Alternative: use job.executable with substituted placeholders + remote_logs = (job.script_name + ".out."+str(job.fail_count), job.script_name + ".err."+str(job.fail_count)) + else: + executable = 'python3' # wrappers are always python3 + remote_logs = (f"{job_script}.out", f"{job_script}.err") + if timeout < 1: command = export + ' nohup ' + executable + ' {0} > {1} 2> {2} & echo $!'.format( os.path.join(self.remote_log_dir, job_script), diff --git a/autosubmit/platforms/pjmplatform.py b/autosubmit/platforms/pjmplatform.py index 4f198d81902068c6a3d6db83fc983bb7268be0d3..7291377b72a6337135698c096d4c15a6c621d4a4 100644 --- a/autosubmit/platforms/pjmplatform.py +++ b/autosubmit/platforms/pjmplatform.py @@ -151,7 +151,8 @@ class PJMPlatform(ParamikoPlatform): job.hold = hold job.id = str(jobs_id[i]) job.status = Status.SUBMITTED - job.write_submit_time(hold=hold) + job.wrapper_name = package.name + i += 1 save = True except AutosubmitError as e: @@ -483,13 +484,10 @@ class PJMPlatform(ParamikoPlatform): def allocated_nodes(): return """os.system("scontrol show hostnames $SLURM_JOB_NODELIST > node_list_{0}".format(node_id))""" - def 
check_file_exists(self, filename, wrapper_failed=False, sleeptime=5, max_retries=3, first=True): + def check_file_exists(self, filename, wrapper_failed=False, sleeptime=5, max_retries=3): file_exist = False retries = 0 - # Not first is meant for vertical_wrappers. There you have to download STAT_{MAX_LOGS} then STAT_{MAX_LOGS-1} and so on - if not first: - max_retries = 1 - sleeptime = 0 + while not file_exist and retries < max_retries: try: # This return IOError if path doesn't exist @@ -497,9 +495,6 @@ class PJMPlatform(ParamikoPlatform): self.get_files_path(), filename)) file_exist = True except IOError as e: # File doesn't exist, retry in sleeptime - if first: - Log.debug("{2} File does not exist.. waiting {0}s for a new retry (retries left: {1})", sleeptime, - max_retries - retries, os.path.join(self.get_files_path(), filename)) if not wrapper_failed: sleep(sleeptime) sleeptime = sleeptime + 5 diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 9bca119205a47f1717964e29f56a2878113b1d8e..d504d5d08d0fd1f0c1dc94c3fe9a28d9617c2680 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -25,10 +25,13 @@ class UniqueQueue( super().__init__(maxsize, ctx=multiprocessing.get_context()) def put(self, job, block=True, timeout=None): - name_with_retrial = job.name+str(job.fail_count) - if name_with_retrial not in self.all_items: - self.all_items.add(name_with_retrial) - Queue.put(self, job, block, timeout) + if job.wrapper_type == "vertical": + unique_name = job.name + else: + unique_name = job.name+str(job.fail_count) + if unique_name not in self.all_items: + self.all_items.add(unique_name) + super().put(job, block, timeout) class Platform(object): @@ -109,7 +112,8 @@ class Platform(object): self.work_event = Event() self.cleanup_event = Event() self.log_recovery_process = None - + self.keep_alive_timeout = 60 * 5 # Useful in case of kill -9 + self.processed_wrapper_logs = set() @classmethod def update_workers(cls, event_worker): cls.worker_events.append(event_worker) @@ -352,7 +356,7 @@ class Platform(object): raise except AutosubmitCritical as e: - raise AutosubmitCritical(e.message, e.code, e.trace) + raise except AutosubmitError as e: raise except Exception as e: @@ -663,7 +667,7 @@ class Platform(object): if self.check_file_exists(filename): self.delete_file(filename) - def check_file_exists(self, src, wrapper_failed=False, sleeptime=5, max_retries=3, first=True): + def check_file_exists(self, src, wrapper_failed=False, sleeptime=5, max_retries=3): return True def get_stat_file(self, job, count=-1): @@ -680,7 +684,7 @@ class Platform(object): if self.get_file(filename, True): Log.debug('{0}_STAT file have been transferred', job.name) return True - Log.debug('{0}_STAT file not found', job.name) + Log.warning('{0}_STAT file not found', job.name) return False @autosubmit_parameter(name='current_logdir') @@ -848,68 +852,63 @@ class Platform(object): self.cleanup_event.set() self.log_recovery_process.join() - def wait_for_work(self, timeout=60): + def wait_for_work(self, sleep_time=60): """ - This function, waits for the work_event to be set by the main process. If it is not set, it returns False and the log recovery process ends. + This function waits for the work_event to be set or the cleanup_event to be set. """ - keep_working = False - # The whole point of this if is to make the regression tests faster to run. 
-        if timeout >= 60:
-            unstoppable_timeout = 60
-            timeout = timeout - 60
-        else:
-            unstoppable_timeout = timeout
-            timeout = 0
-        for remaining in range(unstoppable_timeout, 0, -1):
+        process_log = False
+        for remaining in range(sleep_time, 0, -1): # Min time to wait unless clean-up signal is set
             time.sleep(1)
             if self.work_event.is_set() or not self.recovery_queue.empty():
-                keep_working = True
-            if not self.recovery_queue.empty() or self.cleanup_event.is_set():
+                process_log = True
+            if self.cleanup_event.is_set():
+                process_log = True
                 break
-        if not keep_working:
-            while timeout > 0 and self.recovery_queue.empty() and not self.cleanup_event.is_set() and not self.work_event.is_set():
+        if not process_log: # If no work, wait until the keep_alive_timeout is reached or any signal is set to end the process.
+            timeout = self.keep_alive_timeout - sleep_time
+            while timeout > 0 and self.recovery_queue.empty() and not self.cleanup_event.is_set() and not self.work_event.is_set():
                 time.sleep(1)
                 timeout -= 1
                 if not self.recovery_queue.empty() or self.cleanup_event.is_set() or self.work_event.is_set():
-                    keep_working = True
+                    process_log = True
         self.work_event.clear()
-        return keep_working
+        return process_log
 
     def recover_job_log(self, identifier, jobs_pending_to_process):
         job = None
-        # try:
-        while not self.recovery_queue.empty():
-            try:
-                job = self.recovery_queue.get(
-                    timeout=1) # Should be non-empty, but added a timeout for other possible errors.
-                job.children = set() # Children can't be serialized, so we set it to an empty set for this process.
-                job.platform = self # change the original platform to this process platform.
-                job._log_recovery_retries = 0 # reset the log recovery retries.
-                Log.debug(f"{identifier} Recovering log files for job {job.name}")
+        try:
+            while not self.recovery_queue.empty():
+                try:
+                    job = self.recovery_queue.get(
+                        timeout=1) # Should be non-empty, but added a timeout for other possible errors.
+                    job.children = set() # Children can't be serialized, so we set it to an empty set for this process.
+                    job.platform = self # change the original platform to this process platform.
+                    job._log_recovery_retries = 0 # reset the log recovery retries.
+                    Log.debug(f"{identifier} Recovering log files for job {job.name} and retrial:{job.fail_count}")
+                    job.retrieve_logfiles(self, raise_error=True)
+                    if job.status == Status.FAILED:
+                        Log.result(f"{identifier} Successfully recovered log files for job {job.name} and retrial:{job.fail_count}")
+                except queue.Empty:
+                    pass
+            # This second while is to keep retrying the failed jobs.
+            while len(jobs_pending_to_process) > 0: # jobs that had any issue during the log retrieval
+                job = jobs_pending_to_process.pop()
+                job._log_recovery_retries += 1
+                Log.debug(
+                    f"{identifier} (Retrial number: {job._log_recovery_retries}) Recovering log files for job {job.name}")
                 job.retrieve_logfiles(self, raise_error=True)
-                if job.status == Status.FAILED:
-                    Log.result(f"{identifier} Sucessfully recovered log files for job {job.name} and retrial:{job.fail_count}")
-            except queue.Empty:
+                Log.result(f"{identifier} (Retrial) Successfully recovered log files for job {job.name}")
+        except Exception as e:
+            Log.info(f"{identifier} Error while recovering logs: {str(e)}")
+            try:
+                if job and job._log_recovery_retries < 5: # If log retrieval failed, add it to the pending jobs to process. Avoids retrying the same job forever.
+ jobs_pending_to_process.add(job) + self.connected = False + Log.info(f"{identifier} Attempting to restore connection") + self.restore_connection(None) # Always restore the connection on a failure. + Log.result(f"{identifier} Sucessfully reconnected.") + except: pass - # This second while is to keep retring the failed jobs. - while len(jobs_pending_to_process) > 0: # jobs that had any issue during the log retrieval - job = jobs_pending_to_process.pop() - job._log_recovery_retries += 1 - Log.debug( - f"{identifier} (Retrial number: {job._log_recovery_retries}) Recovering log files for job {job.name}") - job.retrieve_logfiles(self, raise_error=True) - Log.result(f"{identifier} (Retrial) Successfully recovered log files for job {job.name}") - # except Exception as e: - # Log.info(f"{identifier} Error while recovering logs: {str(e)}") - # try: - # if job and job._log_recovery_retries < 5: # If log retrieval failed, add it to the pending jobs to process. Avoids to keep trying the same job forever. - # jobs_pending_to_process.add(job) - # self.connected = False - # Log.info(f"{identifier} Attempting to restore connection") - # self.restore_connection(None) # Always restore the connection on a failure. - # Log.result(f"{identifier} Sucessfully reconnected.") - # except: - # pass return jobs_pending_to_process def recover_platform_job_logs(self): @@ -925,11 +924,11 @@ class Platform(object): self.restore_connection(None) Log.get_logger("Autosubmit") # Log needs to be initialised in the new process Log.result(f"{identifier} Sucessfully connected.") - default_timeout = 60 - timeout = max(int(self.config.get("LOG_RECOVERY_TIMEOUT", default_timeout)), default_timeout) - while self.wait_for_work(timeout=timeout): + log_recovery_timeout = self.config.get("LOG_RECOVERY_TIMEOUT", 60) + self.keep_alive_timeout = max(log_recovery_timeout*5, 60*5) + while self.wait_for_work(sleep_time=max(log_recovery_timeout, 60)): jobs_pending_to_process = self.recover_job_log(identifier, jobs_pending_to_process) - if self.cleanup_event.is_set(): # Check if main process is waiting for this child to end. + if self.cleanup_event.is_set(): # Check if main process is waiting for this child to end. self.recover_job_log(identifier, jobs_pending_to_process) break Log.info(f"{identifier} Exiting.") \ No newline at end of file diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index c48b98e9cfb0478c0cd89efe9950322181fd7ddd..1884a67390f5b04d3794541e1c62fe6c7849589d 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -189,6 +189,8 @@ class SlurmPlatform(ParamikoPlatform): job.hold = hold job.id = str(jobs_id[i]) job.status = Status.SUBMITTED + job.wrapper_name = package.name + # Check if there are duplicated jobnames if not duplicated_jobs_already_checked: job_name = package.name if hasattr(package, "name") else package.jobs[0].name @@ -651,37 +653,30 @@ class SlurmPlatform(ParamikoPlatform): def allocated_nodes(): return """os.system("scontrol show hostnames $SLURM_JOB_NODELIST > node_list_{0}".format(node_id))""" - def check_file_exists(self, filename, wrapper_failed=False, sleeptime=5, max_retries=3, first=True): + def check_file_exists(self, filename, wrapper_failed=False, sleeptime=5, max_retries=3): file_exist = False retries = 0 - # Not first is meant for vertical_wrappers. 
There you have to download STAT_{MAX_LOGS} then STAT_{MAX_LOGS-1} and so on - if not first: - max_retries = 1 - sleeptime = 0 while not file_exist and retries < max_retries: try: - # This return IOError if path doesn't exist + # This return IOError if a path doesn't exist self._ftpChannel.stat(os.path.join( self.get_files_path(), filename)) file_exist = True except IOError as e: # File doesn't exist, retry in sleeptime - if first: - Log.debug("{2} File does not exist.. waiting {0}s for a new retry (retries left: {1})", sleeptime, - max_retries - retries, os.path.join(self.get_files_path(), filename)) if not wrapper_failed: sleep(sleeptime) - sleeptime = sleeptime + 5 retries = retries + 1 else: - retries = 9999 + sleep(2) + retries = retries + 1 except BaseException as e: # Unrecoverable error if str(e).lower().find("garbage") != -1: - if not wrapper_failed: - sleep(sleeptime) - sleeptime = sleeptime + 5 - retries = retries + 1 + sleep(sleeptime) + sleeptime = sleeptime + 5 + retries = retries + 1 else: - Log.printlog("remote logs {0} couldn't be recovered".format(filename), 6001) file_exist = False # won't exist retries = 999 # no more retries + if not file_exist: + Log.warning("File {0} couldn't be found".format(filename)) return file_exist diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index 7ff5552db84966a83e7ce97260abe89d49076748..0135e2976c92fce6199bb34ed5813dfdfc964227 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -60,8 +60,10 @@ class WrapperBuilder(object): self.exit_thread = '' if "wallclock_by_level" in list(kwargs.keys()): self.wallclock_by_level = kwargs['wallclock_by_level'] + def build_header(self): return textwrap.dedent(self.header_directive) + self.build_imports() + def build_imports(self): pass def build_job_thread(self): @@ -153,7 +155,6 @@ class PythonWrapperBuilder(WrapperBuilder): def run(self): jobname = self.template.replace('.cmd', '') - #os.system("echo $(date +%s) > "+jobname+"_STAT") out = str(self.template) + ".out." + str(0) err = str(self.template) + ".err." 
+ str(0) print(out+"\\n") @@ -446,8 +447,11 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): sequential_threads_launcher = textwrap.dedent(""" failed_wrapper = os.path.join(os.getcwd(),wrapper_id) retrials = {2} - total_steps = 0 - print("JOB.ID:"+ os.getenv('SLURM_JOBID')) + total_steps = 0 + try: + print("JOB.ID:"+ os.getenv('SLURM_JOBID')) + except: + print("JOB.ID") for i in range(len({0})): job_retrials = retrials completed = False @@ -455,8 +459,7 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): while fail_count <= job_retrials and not completed: current = {1} current.start() - timer = int(time.time()) - os.system("echo "+str(timer)+" >> "+scripts[i][:-4]+"_STAT_"+str(fail_count)) #Completed + start = int(time.time()) current.join({3}) total_steps = total_steps + 1 """).format(jobs_list, thread,self.retrials,str(self.wallclock_by_level),'\n'.ljust(13)) @@ -468,26 +471,42 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): failed_filename = {0}[i].replace('.cmd', '_FAILED') failed_path = os.path.join(os.getcwd(), failed_filename) failed_wrapper = os.path.join(os.getcwd(), wrapper_id) - timer = int(time.time()) - os.system("echo "+str(timer)+" >> "+scripts[i][:-4]+"_STAT_"+str(fail_count)) #Completed + finish = int(time.time()) + stat_filename = {0}[i].replace(".cmd", f"_STAT_{{fail_count}}") + stat_path_tmp = os.path.join(os.getcwd(),f"{{stat_filename}}.tmp") + print(f"Completed_file:{{completed_path}}") + print(f"Writing:{{stat_path_tmp}}") + print(f"[Start:{{start}}, Finish:{{finish}}, Fail_count:{{fail_count}}]") + with open(f"{{stat_path_tmp}}", "w") as file: + file.write(f"{{start}}\\n") + file.write(f"{{finish}}\\n") if os.path.exists(completed_path): completed = True print(datetime.now(), "The job ", current.template," has been COMPLETED") - os.system("echo COMPLETED >> " + scripts[i][:-4]+"_STAT_"+str(fail_count)) else: print(datetime.now(), "The job ", current.template," has FAILED") - os.system("echo FAILED >> " + scripts[i][:-4]+"_STAT_"+str(fail_count)) #{1} fail_count = fail_count + 1 """).format(jobs_list, self.exit_thread, '\n'.ljust(13)), 8) sequential_threads_launcher += self._indent(textwrap.dedent(""" + from pathlib import Path + fail_count = 0 + while fail_count <= job_retrials: + try: + stat_filename = {0}[i].replace(".cmd", f"_STAT_{{fail_count}}") + stat_path_tmp = os.path.join(os.getcwd(),f"{{stat_filename}}.tmp") + Path(stat_path_tmp).replace(stat_path_tmp.replace(".tmp","")) + except: + print(f"Couldn't write the stat file:{{stat_path_tmp}}") + fail_count = fail_count + 1 if not os.path.exists(completed_path): open(failed_wrapper,'wb').close() open(failed_path, 'wb').close() if os.path.exists(failed_wrapper): os.remove(os.path.join(os.getcwd(),wrapper_id)) + print("WRAPPER_FAILED") wrapper_failed = os.path.join(os.getcwd(),"WRAPPER_FAILED") open(wrapper_failed, 'wb').close() os._exit(1) @@ -505,15 +524,17 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): self.fail_count = fail_count def run(self): + print("\\n") jobname = self.template.replace('.cmd', '') out = str(self.template) + ".out." + str(self.fail_count) err = str(self.template) + ".err."
+ str(self.fail_count) - print((out+"\\n")) - command = "./" + str(self.template) + " " + str(self.id_run) + " " + os.getcwd() - print((command+"\\n")) - (self.status) = getstatusoutput("timeout {0} " + command + " > " + out + " 2> " + err) - for i in self.status: - print((str(i)+"\\n")) + out_path = os.path.join(os.getcwd(), out) + err_path = os.path.join(os.getcwd(), err) + template_path = os.path.join(os.getcwd(), self.template) + command = f"timeout {0} {{template_path}} > {{out_path}} 2> {{err_path}}" + print(command) + getstatusoutput(command) + """).format(str(self.wallclock_by_level),'\n'.ljust(13)) diff --git a/autosubmit/platforms/wrappers/wrapper_factory.py b/autosubmit/platforms/wrappers/wrapper_factory.py index 1f47996a95f4622b17cf88dc6a990572f2d45476..1bd97c831c11119bab6bcb63bd5dc0dcd494803a 100644 --- a/autosubmit/platforms/wrappers/wrapper_factory.py +++ b/autosubmit/platforms/wrappers/wrapper_factory.py @@ -140,6 +140,58 @@ class WrapperFactory(object): def threads_directive(self, threads): raise NotImplemented(self.exception) + +class LocalWrapperFactory(WrapperFactory): + + def vertical_wrapper(self, **kwargs): + return PythonVerticalWrapperBuilder(**kwargs) + + def horizontal_wrapper(self, **kwargs): + + if kwargs["method"] == 'srun': + return SrunHorizontalWrapperBuilder(**kwargs) + else: + return PythonHorizontalWrapperBuilder(**kwargs) + + def hybrid_wrapper_horizontal_vertical(self, **kwargs): + return PythonHorizontalVerticalWrapperBuilder(**kwargs) + + def hybrid_wrapper_vertical_horizontal(self, **kwargs): + if kwargs["method"] == 'srun': + return SrunVerticalHorizontalWrapperBuilder(**kwargs) + else: + return PythonVerticalHorizontalWrapperBuilder(**kwargs) + + def reservation_directive(self, reservation): + return '#' + + def dependency_directive(self, dependency): + return '#' + + def queue_directive(self, queue): + return '#' + + def processors_directive(self, processors): + return '#' + + def nodes_directive(self, nodes): + return '#' + + def tasks_directive(self, tasks): + return '#' + + def partition_directive(self, partition): + return '#' + + def exclusive_directive(self, exclusive): + return '#' + + def threads_directive(self, threads): + return '#' + + def header_directives(self, **kwargs): + return "" + class SlurmWrapperFactory(WrapperFactory): def vertical_wrapper(self, **kwargs): diff --git a/test/unit/test_database_regression.py b/test/unit/test_database_regression.py index 3e5032a7a4eae12b813791730842892488edb23c..7d12280fddff14ee8c795729b74e2ed58005a34a 100644 --- a/test/unit/test_database_regression.py +++ b/test/unit/test_database_regression.py @@ -76,16 +76,7 @@ def prepare_db(db_tmpdir): f.write(f""" PLATFORMS: dummy: - type: pjm - host: 127.0.0.1 - user: {db_tmpdir.owner} - project: whatever - scratch_dir: {db_tmpdir}/scratch - MAX_WALLCLOCK: 48:00 - TEMP_DIR: '' - MAX_PROCESSORS: 99999 - queue: dummy - DISABLE_RECOVERY_THREADS: True + type: dummy """) with main_path.open('w') as f: @@ -129,7 +120,7 @@ project: expid_dir = Path(f"{db_tmpdir.strpath}/scratch/whatever/{db_tmpdir.owner}/t000") dummy_dir = Path(f"{db_tmpdir.strpath}/scratch/whatever/{db_tmpdir.owner}/t000/dummy_dir") real_data = Path(f"{db_tmpdir.strpath}/scratch/whatever/{db_tmpdir.owner}/t000/real_data") - # write some dummy data inside scratch dir + # We write some dummy data inside the scratch_dir os.makedirs(expid_dir, exist_ok=True) os.makedirs(dummy_dir, exist_ok=True) os.makedirs(real_data, exist_ok=True) @@ -141,88 +132,100 @@ project: return db_tmpdir -@pytest.fixture 
-def success_jobs_file(db_tmpdir): - jobs_path = Path(f"{db_tmpdir.strpath}/t000/conf/jobs.yml") - with jobs_path.open('w') as f: - f.write(f""" - JOBS: - job: - SCRIPT: | - echo "Hello World" - PLATFORM: local - RUNNING: chunk - wallclock: 00:01 - """) - - -@pytest.fixture -def failure_jobs_file(db_tmpdir): - jobs_path = Path(f"{db_tmpdir.strpath}/t000/conf/jobs.yml") - with jobs_path.open('w') as f: - f.write(f""" +@pytest.mark.parametrize("jobs_data, expected_count, final_status", [ + # Success + (""" + JOBS: + job: + SCRIPT: | + echo "Hello World" + sleep 1 + PLATFORM: local + DEPENDENCIES: job-1 + RUNNING: chunk + wallclock: 00:01 + retrials: 2 + """, 1, "COMPLETED"), + # Success wrapper + (""" + JOBS: + job: + SCRIPT: | + echo "Hello World" + sleep 1 + DEPENDENCIES: job-1 + PLATFORM: local + RUNNING: chunk + wallclock: 00:01 + retrials: 2 + wrappers: + wrapper: + JOBS_IN_WRAPPER: job + TYPE: vertical + """, 1, "COMPLETED"), + # Failure + (""" JOBS: job: SCRIPT: | echo "Hello World" + sleep 1 exit 1 + DEPENDENCIES: job-1 PLATFORM: local RUNNING: chunk wallclock: 00:01 retrials: 2 - """) - + """, 3, "FAILED"), + # Failure wrappers + (""" + JOBS: + job: + SCRIPT: | + echo "Hello World" + sleep 1 + exit 1 + PLATFORM: local + DEPENDENCIES: job-1 + RUNNING: chunk + wallclock: 00:10 + retrials: 2 + wrappers: + wrapper: + JOBS_IN_WRAPPER: job + TYPE: vertical + """, 3, "FAILED"), +], ids=["Success", "Success with wrapper", "Failure", "Failure with wrapper"]) +def test_db(db_tmpdir, prepare_db, jobs_data, expected_count, final_status, mocker): + # write jobs_data + jobs_path = Path(f"{db_tmpdir.strpath}/t000/conf/jobs.yml") + with jobs_path.open('w') as f: + f.write(jobs_data) -@pytest.fixture -def run_experiment_success(prepare_db, db_tmpdir, success_jobs_file, mocker): + # Create init_expid(os.environ["AUTOSUBMIT_CONFIGURATION"], platform='local', expid='t000', create=True) - # job_list, submitter, exp_history, host, as_conf, platforms_to_test, packages_persistence, _ = Autosubmit.prepare_run("t000") + + # This is set in _init_log which is not called as_misc = Path(f"{db_tmpdir.strpath}/t000/conf/as_misc.yml") with as_misc.open('w') as f: f.write(f""" -AS_MISC: True -ASMISC: - COMMAND: run -AS_COMMAND: run - """) - # Completed - with mocker.patch('autosubmit.platforms.platform.max', return_value=50): - Autosubmit.run_experiment(expid='t000') - # Access to the job_historical.db - job_data = Path(f"{db_tmpdir.strpath}/job_data_t000.db") - autosubmit_db = Path(f"{db_tmpdir.strpath}/tests.db") - assert job_data.exists() - assert autosubmit_db.exists() - return prepare_db + AS_MISC: True + ASMISC: + COMMAND: run + AS_COMMAND: run + """) + # Run the experiment + with mocker.patch('autosubmit.platforms.platform.max', return_value=20): + Autosubmit.run_experiment(expid='t000') -@pytest.fixture -def run_experiment_failure(prepare_db, db_tmpdir, failure_jobs_file, mocker): - init_expid(os.environ["AUTOSUBMIT_CONFIGURATION"], platform='local', expid='t000', create=True) - # job_list, submitter, exp_history, host, as_conf, platforms_to_test, packages_persistence, _ = Autosubmit.prepare_run("t000") - as_misc = Path(f"{db_tmpdir.strpath}/t000/conf/as_misc.yml") - with as_misc.open('w') as f: - f.write(f""" -AS_MISC: True -ASMISC: - COMMAND: run -AS_COMMAND: run - """) - # Completed - # mock platform.localplatform.check_exists - with mocker.patch('autosubmit.platforms.platform.max', return_value=10): - with mocker.patch('autosubmit.platforms.platform.Platform.get_completed_files', return_value=False): - 
Autosubmit.run_experiment(expid='t000') - # Access to the job_historical.db + # Test database exists. job_data = Path(f"{db_tmpdir.strpath}/job_data_t000.db") autosubmit_db = Path(f"{db_tmpdir.strpath}/tests.db") assert job_data.exists() assert autosubmit_db.exists() - return prepare_db - -def test_db_success(run_experiment_success, db_tmpdir): - job_data = Path(f"{db_tmpdir.strpath}/job_data_t000.db") + # Check job_data info conn = sqlite3.connect(job_data) conn.row_factory = sqlite3.Row c = conn.cursor() @@ -234,63 +237,28 @@ def test_db_success(run_experiment_success, db_tmpdir): column_names = rows_as_dicts[0].keys() if rows_as_dicts else [] column_widths = [max(len(str(row[col])) for row in rows_as_dicts + [dict(zip(column_names, column_names))]) for col in column_names] + print(f"Experiment folder: {db_tmpdir.strpath}") header = " | ".join(f"{name:<{width}}" for name, width in zip(column_names, column_widths)) print(f"\n{header}") print("-" * len(header)) # Print the rows - for row_dict in rows_as_dicts: + for row_dict in rows_as_dicts: # always print, for debug purposes print(" | ".join(f"{str(row_dict[col]):<{width}}" for col, width in zip(column_names, column_widths))) - # Check that all fields contain data, except extra_data, children, and platform_output - # Check that submit, start and finish are > 0 - assert row_dict["submit"] > 0 and row_dict["finish"] != 1970010101 - assert row_dict["start"] > 0 and row_dict["finish"] != 1970010101 - assert row_dict["finish"] > 0 and row_dict["finish"] != 1970010101 - assert row_dict["status"] == "COMPLETED" - for key in [key for key in row_dict.keys() if - key not in ["status", "finish", "submit", "start", "extra_data", "children", "platform_output"]]: - assert str(row_dict[key]) != "" - # Check that the job_data table has the expected number of entries - c.execute("SELECT job_name, COUNT(*) as count FROM job_data GROUP BY job_name") - count_rows = c.fetchall() - for row in count_rows: - assert row["count"] == 1 - # Close the cursor and connection - c.close() - conn.close() - -def test_db_failure(run_experiment_failure, db_tmpdir): - job_data = Path(f"{db_tmpdir.strpath}/job_data_t000.db") - conn = sqlite3.connect(job_data) - conn.row_factory = sqlite3.Row - c = conn.cursor() - c.execute("SELECT * FROM job_data") - rows = c.fetchall() - # Convert rows to a list of dictionaries - rows_as_dicts = [dict(row) for row in rows] - # Tune the print so it is more readable, so it is easier to debug in case of failure - column_names = rows_as_dicts[0].keys() if rows_as_dicts else [] - column_widths = [max(len(str(row[col])) for row in rows_as_dicts + [dict(zip(column_names, column_names))]) for col - in column_names] - header = " | ".join(f"{name:<{width}}" for name, width in zip(column_names, column_widths)) - print(f"\n{header}") - print("-" * len(header)) - # Print the rows for row_dict in rows_as_dicts: - print(" | ".join(f"{str(row_dict[col]):<{width}}" for col, width in zip(column_names, column_widths))) # Check that all fields contain data, except extra_data, children, and platform_output # Check that submit, start and finish are > 0 assert row_dict["submit"] > 0 and row_dict["finish"] != 1970010101 assert row_dict["start"] > 0 and row_dict["finish"] != 1970010101 assert row_dict["finish"] > 0 and row_dict["finish"] != 1970010101 - assert row_dict["status"] == "FAILED" + assert row_dict["status"] == final_status for key in [key for key in row_dict.keys() if key not in ["status", "finish", "submit", "start", "extra_data", "children",
"platform_output"]]: - assert str(row_dict[key]) != "" + assert str(row_dict[key]) != str("") # Check that the job_data table has the expected number of entries c.execute("SELECT job_name, COUNT(*) as count FROM job_data GROUP BY job_name") count_rows = c.fetchall() for row in count_rows: - assert row["count"] == 3 # three retrials + assert row["count"] == expected_count # Close the cursor and connection c.close() conn.close() diff --git a/test/unit/test_log_recovery.py b/test/unit/test_log_recovery.py index 036cffb3174cc7b86d7a6b62892ebbeaed6ecb7b..83bc74a30949ad7a0c597deaee5705aa01fedb90 100644 --- a/test/unit/test_log_recovery.py +++ b/test/unit/test_log_recovery.py @@ -100,6 +100,7 @@ def as_conf(prepare_test, mocker): def test_log_recovery_no_keep_alive(prepare_test, local, mocker, as_conf): mocker.patch('autosubmit.platforms.platform.max', return_value=1) + local.keep_alive_timeout = 0 local.spawn_log_retrieval_process(as_conf) assert local.log_recovery_process.is_alive() time.sleep(2) @@ -109,6 +110,7 @@ def test_log_recovery_no_keep_alive(prepare_test, local, mocker, as_conf): def test_log_recovery_keep_alive(prepare_test, local, mocker, as_conf): mocker.patch('autosubmit.platforms.platform.max', return_value=1) + local.keep_alive_timeout = 0 local.spawn_log_retrieval_process(as_conf) assert local.log_recovery_process.is_alive() local.work_event.set() @@ -124,6 +126,7 @@ def test_log_recovery_keep_alive(prepare_test, local, mocker, as_conf): def test_log_recovery_keep_alive_cleanup(prepare_test, local, mocker, as_conf): mocker.patch('autosubmit.platforms.platform.max', return_value=1) + local.keep_alive_timeout = 0 local.spawn_log_retrieval_process(as_conf) assert local.log_recovery_process.is_alive() local.work_event.set() @@ -137,8 +140,9 @@ def test_log_recovery_keep_alive_cleanup(prepare_test, local, mocker, as_conf): def test_log_recovery_recover_log(prepare_test, local, mocker, as_conf): - mocker.patch('autosubmit.platforms.platform.max', return_value=20) - mocker.patch('autosubmit.job.job.Job.write_stats') # not sure how to test this + mocker.patch('autosubmit.platforms.platform.max', return_value=0) + local.keep_alive_timeout = 20 + mocker.patch('autosubmit.job.job.Job.write_stats') # Tested in test_database_regression.py local.spawn_log_retrieval_process(as_conf) local.work_event.set() job = Job('t000', '0000', Status.COMPLETED, 0)