diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py
index 733d35328b9cdb0b0ba1109923dfb2ba62260b11..4a3f85cf7e292550b1766c584c0f747f7312c3dd 100644
--- a/autosubmit/autosubmit.py
+++ b/autosubmit/autosubmit.py
@@ -2578,12 +2578,6 @@ class Autosubmit:
         if error_message != "":
             raise AutosubmitCritical("Submission Failed due wrong configuration:{0}".format(error_message), 7014)
 
-        if not inspect:
-            for package in valid_packages_to_submit:
-                wrapper_time = None
-                for job in package.jobs:  # if jobs > 1 == wrapped == same submission time
-                    job.write_submit_time(wrapper_submit_time=wrapper_time)
-                    wrapper_time = job.submit_time_timestamp
 
         if wrapper_errors and not any_job_submitted and len(job_list.get_in_queue()) == 0:
             # Deadlock situation
diff --git a/autosubmit/history/database_managers/experiment_history_db_manager.py b/autosubmit/history/database_managers/experiment_history_db_manager.py
index 8df415c94682435e781588f939a850301bf784f3..1e8127bef01a839c7fcaec13157bf5d244a8a3e6 100644
--- a/autosubmit/history/database_managers/experiment_history_db_manager.py
+++ b/autosubmit/history/database_managers/experiment_history_db_manager.py
@@ -215,6 +215,11 @@ class ExperimentHistoryDbManager(DatabaseManager):
         """ Update JobData data class. Returns latest last=1 row from job_data by job_name. """
         self._update_job_data_by_id(job_data_dc)
         return self.get_job_data_dc_unique_latest_by_job_name(job_data_dc.job_name)
+
+    def update_job_data_dc_by_job_id_name(self, job_data_dc):
+        """ Update JobData data class. Returns the latest last=1 row from job_data by job_id and job_name. """
+        self._update_job_data_by_id(job_data_dc)
+        return self.get_job_data_by_job_id_name(job_data_dc.job_id, job_data_dc.job_name)
 
     def update_list_job_data_dc_by_each_id(self, job_data_dcs):
         """ Return length of updated list. """
@@ -319,11 +324,11 @@ class ExperimentHistoryDbManager(DatabaseManager):
         statement = ''' UPDATE job_data SET last=?, submit=?, start=?, finish=?,
         modified=?, job_id=?, status=?, energy=?, extra_data=?,
         nnodes=?, ncpus=?, rowstatus=?, out=?, err=?,
-        children=?, platform_output=? WHERE id=? '''
-        arguments = (job_data_dc.last, job_data_dc.submit, job_data_dc.start, job_data_dc.finish, HUtils.get_current_datetime(),
-                     job_data_dc.job_id, job_data_dc.status, job_data_dc.energy, job_data_dc.extra_data,
-                     job_data_dc.nnodes, job_data_dc.ncpus, job_data_dc.rowstatus, job_data_dc.out, job_data_dc.err,
-                     job_data_dc.children, job_data_dc.platform_output, job_data_dc._id)
+        children=?, platform_output=?, id=? WHERE id=?'''
+        arguments = (job_data_dc.last, job_data_dc.submit, job_data_dc.start, job_data_dc.finish, HUtils.get_current_datetime(),
+                     job_data_dc.job_id, job_data_dc.status, job_data_dc.energy, job_data_dc.extra_data,
+                     job_data_dc.nnodes, job_data_dc.ncpus, job_data_dc.rowstatus, job_data_dc.out, job_data_dc.err,
+                     job_data_dc.children, job_data_dc.platform_output, job_data_dc._id, job_data_dc._id)
         self.execute_statement_with_arguments_on_dbfile(self.historicaldb_file_path, statement, arguments)
 
     def _update_experiment_run(self, experiment_run_dc):
@@ -346,13 +351,14 @@ class ExperimentHistoryDbManager(DatabaseManager):
         job_data_rows = self.get_from_statement_with_arguments(self.historicaldb_file_path, statement, arguments)
         return [Models.JobDataRow(*row) for row in job_data_rows]
 
-    def get_job_data_by_name(self, job_name):
-        """ Get List of Models.JobDataRow for job_name """
-        statement = self.get_built_select_statement("job_data", "job_name=? ORDER BY counter DESC")
-        arguments = (job_name,)
+    def get_job_data_by_job_id_name(self, job_id, job_name):
+        """ Get the latest Models.JobDataRow for job_id and job_name, returned as a JobData data class. """
+        statement = self.get_built_select_statement("job_data", "job_id=? AND job_name=? ORDER BY counter")
+        arguments = (int(job_id), str(job_name),)
         job_data_rows = self.get_from_statement_with_arguments(self.historicaldb_file_path, statement, arguments)
-        return [Models.JobDataRow(*row) for row in job_data_rows]
-
+        models = [Models.JobDataRow(*row) for row in job_data_rows][-1]
+        return JobData.from_model(models)
+
     def get_job_data_max_counter(self):
         """ The max counter is the maximum count value for the count column in job_data. """
         statement = "SELECT MAX(counter) as maxcounter FROM job_data"
diff --git a/autosubmit/history/experiment_history.py b/autosubmit/history/experiment_history.py
index fbf4403cb3ada66e21ff644c356637845e5cc4d8..e657d3729b4be3b5c7089bb138f6eef77b02b09e 100644
--- a/autosubmit/history/experiment_history.py
+++ b/autosubmit/history/experiment_history.py
@@ -97,23 +97,7 @@ class ExperimentHistory:
                          member="", section="", chunk=0, platform="NA", job_id=0, wrapper_queue=None, wrapper_code=None, children=""):
         try:
-            job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name)
-            if not job_data_dc_last:
-                job_data_dc_last = self.write_submit_time(job_name=job_name,
-                                                          status=status,
-                                                          ncpus=ncpus,
-                                                          wallclock=wallclock,
-                                                          qos=qos,
-                                                          date=date,
-                                                          member=member,
-                                                          section=section,
-                                                          chunk=chunk,
-                                                          platform=platform,
-                                                          job_id=job_id,
-                                                          wrapper_queue=wrapper_queue,
-                                                          wrapper_code=wrapper_code)
-                self._log.log("write_start_time {0} start not found.".format(job_name))
-            job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name)
+            job_data_dc_last = self.manager.get_job_data_by_job_id_name(job_id, job_name)
             if not job_data_dc_last:
                 raise Exception("Job {0} has not been found in the database.".format(job_name))
             job_data_dc_last.start = start
@@ -122,7 +106,7 @@ class ExperimentHistory:
             job_data_dc_last.rowtype = self._get_defined_rowtype(wrapper_code)
             job_data_dc_last.job_id = job_id
             job_data_dc_last.children = children
-            return self.manager.update_job_data_dc_by_id(job_data_dc_last)
+            return self.manager.update_job_data_dc_by_job_id_name(job_data_dc_last)
         except Exception as exp:
             self._log.log(str(exp), traceback.format_exc())
             Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}')
@@ -131,24 +115,7 @@ class ExperimentHistory:
                           member="", section="", chunk=0, platform="NA", job_id=0, out_file=None, err_file=None, wrapper_queue=None, wrapper_code=None, children=""):
         try:
-            job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name)
-            if not job_data_dc_last:
-                job_data_dc_last = self.write_submit_time(job_name=job_name,
-                                                          status=status,
-                                                          ncpus=ncpus,
-                                                          wallclock=wallclock,
-                                                          qos=qos,
-                                                          date=date,
-                                                          member=member,
-                                                          section=section,
-                                                          chunk=chunk,
-                                                          platform=platform,
-                                                          job_id=job_id,
-                                                          wrapper_queue=wrapper_queue,
-                                                          wrapper_code=wrapper_code,
-                                                          children=children)
-                self._log.log("write_finish_time {0} submit not found.".format(job_name))
-            job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name)
+            job_data_dc_last = self.manager.get_job_data_by_job_id_name(job_id, job_name)
             if not job_data_dc_last:
                 raise Exception("Job {0} has not been found in the database.".format(job_name))
             job_data_dc_last.finish = finish if finish > 0 else int(time())
@@ -157,7 +124,8 @@ class ExperimentHistory:
             job_data_dc_last.rowstatus = Models.RowStatus.PENDING_PROCESS
             job_data_dc_last.out = out_file if out_file else ""
             job_data_dc_last.err = err_file if err_file else ""
-            return self.manager.update_job_data_dc_by_id(job_data_dc_last)
+            return self.manager.update_job_data_dc_by_job_id_name(job_data_dc_last)
+
         except Exception as exp:
             self._log.log(str(exp), traceback.format_exc())
             Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}')
@@ -223,14 +191,14 @@ class ExperimentHistory:
         try:
             current_experiment_run_dc = self.manager.get_experiment_run_dc_with_max_id()
             update_these_changes = self._get_built_list_of_changes(job_list)
-        except Exception:
+        except:
             current_experiment_run_dc = 0
             update_these_changes = []
             # ("no runs")
         should_create_new_run = self.should_we_create_a_new_run(job_list, len(update_these_changes),
                                                                 current_experiment_run_dc, chunk_unit, chunk_size, create)
-        if len(update_these_changes) > 0 and should_create_new_run is False:
+        if len(update_these_changes) > 0 and not should_create_new_run:
             self.manager.update_many_job_data_change_status(update_these_changes)
         if should_create_new_run:
             return self.create_new_experiment_run(chunk_unit, chunk_size, current_config, job_list)
diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py
index bf102db054058bc58288628c8a9980e23de27b58..ba4d704079cc82ffe5560d752feab7eb759da7ef 100644
--- a/autosubmit/job/job.py
+++ b/autosubmit/job/job.py
@@ -22,6 +22,7 @@ Main module for Autosubmit. Only contains an interface class to all functionalit
 """
 from collections import OrderedDict
+from pathlib import Path
 from autosubmit.job import job_utils
 from contextlib import suppress
@@ -207,6 +208,7 @@ class Job(object):
         self._local_logs = ('', '')
         self._remote_logs = ('', '')
         self.script_name = self.name + ".cmd"
+        self.stat_file = self.script_name[:-4] + "_STAT_0"
         self.status = status
         self.prev_status = status
         self.old_status = self.status
@@ -248,12 +250,16 @@ class Job(object):
         # hetjobs
         self.het = None
         self.updated_log = False
-        self.ready_start_date = None
         self.log_retrieved = False
-        self.start_time_written = False
         self.submit_time_timestamp = None  # for wrappers, all jobs inside a wrapper are submitted at the same time
+        self.start_time_timestamp = None
         self.finish_time_timestamp = None  # for wrappers, with inner_retrials, the submission time should be the last finish_time of the previous retrial
         self._script = None  # Inline code to be executed
+        self._log_recovery_retries = None
+        self.ready_date = None
+        self.wrapper_name = None
+        self.is_wrapper = False
+
     def _init_runtime_parameters(self):
         # hetjobs
         self.het = {'HETSIZE': 0}
@@ -267,8 +273,10 @@ class Job(object):
         self._memory = ''
         self._memory_per_task = ''
         self.log_retrieved = False
-        self.start_time_placeholder = ""
+        self.start_time_timestamp = time.time()
+        self.end_time_placeholder = time.time()
         self.processors_per_node = ""
+        self.stat_file = self.script_name[:-4] + "_STAT_0"
 
     @property
@@ -871,6 +879,13 @@ class Job(object):
         """Number of processors per node that the job can use."""
         self._processors_per_node = value
 
+    def set_ready_date(self):
+        """
+        Sets the ready start date for the job
+        """
+        self.updated_log = False
+        self.ready_date = int(time.strftime("%Y%m%d%H%M%S"))
+
     def inc_fail_count(self):
         """
         Increments fail count
@@ -974,7 +989,7 @@ class Job(object):
         :rtype: int
         """
         if fail_count == -1:
-            logname = os.path.join(self._tmp_path, self.name + '_STAT')
+            logname = os.path.join(self._tmp_path, self.stat_file)
         else:
             fail_count = str(fail_count)
             logname = os.path.join(self._tmp_path, self.name + '_STAT_' + fail_count)
@@ -1115,64 +1130,77 @@ class Job(object):
 
     def retrieve_internal_retrials_logfiles(self, platform):
         log_retrieved = False
-        original = copy.deepcopy(self.local_logs)
-        for i in range(0, int(self.retrials + 1)):
-            if i > 0:
-                self.local_logs = (original[0][:-4] + "_{0}".format(i) + ".out", original[1][:-4] + "_{0}".format(i) + ".err")
-            self.remote_logs = self.get_new_remotelog_name(i)
-            if not self.remote_logs:
-                self.log_retrieved = False
-            else:
+        last_retrial = 0
+        try:
+            for i in range(0, int(self.retrials + 1)):
+                self.update_local_logs(count=i, update_submit_time=False)
+                backup_log = copy.copy(self.remote_logs)
+                self.remote_logs = self.get_new_remotelog_name(i)
                 if self.check_remote_log_exists(platform):
-                    try:
-                        self.synchronize_logs(platform, self.remote_logs, self.local_logs)
-                        remote_logs = copy.deepcopy(self.local_logs)
-                        platform.get_logs_files(self.expid, remote_logs)
-                        log_retrieved = True
-                    except BaseException:
-                        log_retrieved = False
-            self.log_retrieved = log_retrieved
+                    self.synchronize_logs(platform, self.remote_logs, self.local_logs)
+                    remote_logs = copy.deepcopy(self.local_logs)
+                    platform.get_logs_files(self.expid, remote_logs)
+                    log_retrieved = True
+                    last_retrial = i
+                else:
+                    self.remote_logs = backup_log
+                    break
+        except:
+            pass
+        self.log_retrieved = log_retrieved
+        if self.log_retrieved:
+            self.platform.processed_wrapper_logs.add(self.wrapper_name)
+
+        return last_retrial
+
+    def write_stats(self, last_retrial):
+        # write stats
+        if self.wrapper_type == "vertical":  # Disable AS retrials for vertical wrappers to use internal ones
+            for i in range(0, int(last_retrial + 1)):
+                self.platform.get_stat_file(self, count=i)
+                self.write_vertical_time(i)
+                self.inc_fail_count()
+            # Update the logs with Autosubmit Job ID Brand
+            try:
+                for local_log in self.local_logs:
+                    self.platform.write_jobid(self.id, os.path.join(
+                        self._tmp_path, 'LOG_' + str(self.expid), local_log))
+            except BaseException as e:
+                Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format(str(e), self.name))
+        else:
+            self.update_local_logs(update_submit_time=False)
+            self.platform.get_stat_file(self)
+            self.write_submit_time()
+            self.write_start_time()
+            self.write_end_time(self.status == Status.COMPLETED)
+            Log.result(f"{self.fail_count} retrials of job {self.name} with id {self.id} have been inserted in the db")
+            # Update the logs with Autosubmit Job ID Brand
+            try:
+                for local_log in self.local_logs:
+                    self.platform.write_jobid(self.id, os.path.join(
+                        self._tmp_path, 'LOG_' + str(self.expid), local_log))
+            except BaseException as e:
+                Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format(str(e), self.name))
+
     def retrieve_logfiles(self, platform, raise_error=False):
         """
-        Retrieves log files from remote host meant to be used inside a process.
-        :param platform: platform that is calling the function, already connected.
-        :param raise_error: boolean to raise an error if the logs are not retrieved
-        :return:
+        Retrieves log files from remote host
+        :param platform: HPCPlatform object
+        :param raise_error: boolean, if True, raises an error if the log files are not retrieved
+        :return: None
         """
         backup_logname = copy.copy(self.local_logs)
-
         if self.wrapper_type == "vertical":
-            stat_file = self.script_name[:-4] + "_STAT_"
-            self.retrieve_internal_retrials_logfiles(platform)
+            last_retrial = self.retrieve_internal_retrials_logfiles(platform)
         else:
-            stat_file = self.script_name[:-4] + "_STAT"
             self.retrieve_external_retrials_logfiles(platform)
-
+            last_retrial = 0
         if not self.log_retrieved:
             self.local_logs = backup_logname
-            if raise_error:
+            if raise_error and self.wrapper_name not in self.platform.processed_wrapper_logs:
                 raise AutosubmitCritical("Failed to retrieve logs for job {0}".format(self.name), 6000)
-            else:
-                Log.printlog("Failed to retrieve logs for job {0}".format(self.name), 6000)
-        else:
-            # Update the logs with Autosubmit Job ID Brand
-            try:
-                for local_log in self.local_logs:
-                    platform.write_jobid(self.id, os.path.join(
-                        self._tmp_path, 'LOG_' + str(self.expid), local_log))
-            except BaseException as e:
-                Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format(str(e), self.name))
-            # write stats
-            if self.wrapper_type == "vertical":  # Disable AS retrials for vertical wrappers to use internal ones
-                for i in range(0,int(self.retrials+1)):
-                    if self.platform.get_stat_file(self.name, stat_file, count=i):
-                        self.write_vertical_time(i)
-                        self.inc_fail_count()
-            else:
-                self.platform.get_stat_file(self.name, stat_file)
-                self.write_start_time(from_stat_file=True)
-                self.write_end_time(self.status == Status.COMPLETED)
+        self.write_stats(last_retrial)
 
     def parse_time(self,wallclock):
         regex = re.compile(r'(((?P<hours>\d+):)((?P<minutes>\d+)))(:(?P<seconds>\d+))?')
@@ -1252,11 +1280,6 @@ class Job(object):
                 Log.result("Job {0} is COMPLETED", self.name)
         elif self.status == Status.FAILED:
             if not failed_file:
-                Log.printlog("Job {0} is FAILED. Checking completed files to confirm the failure...".format(
-                    self.name), 3000)
-                self._platform.get_completed_files(
-                    self.name, wrapper_failed=self.packed)
-                self.check_completion()
                 if self.status == Status.COMPLETED:
                     Log.result("Job {0} is COMPLETED", self.name)
                 else:
@@ -1276,12 +1299,6 @@ class Job(object):
             # after checking the jobs , no job should have the status "submitted"
             Log.printlog("Job {0} in SUBMITTED status. This should never happen on this step..".format(
                 self.name), 6008)
-        if self.status in [Status.COMPLETED, Status.FAILED]:
-            self.updated_log = False
-
-        # # Write start_time() if not already written and job is running, completed or failed
-        # if self.status in [Status.RUNNING, Status.COMPLETED, Status.FAILED] and not self.start_time_written:
-        #     self.write_start_time()
 
         # Updating logs
@@ -1289,10 +1306,6 @@ class Job(object):
         if self.status in [Status.COMPLETED, Status.FAILED, Status.UNKNOWN]:
             self.retrieve_logfiles(self.platform)
         else:
             self.platform.add_job_to_log_recover(self)
-
-
-
-
         return self.status
 
     @staticmethod
@@ -1325,16 +1338,16 @@ class Job(object):
         :param default_status: status to set if job is not completed. By default, is FAILED
         :type default_status: Status
         """
-        log_name = os.path.join(self._tmp_path, self.name + '_COMPLETED')
+        completed_file = os.path.join(self._tmp_path, self.name + '_COMPLETED')
+        completed_file_location = os.path.join(self._tmp_path, f"LOG_{self.expid}", self.name + '_COMPLETED')
 
-        if os.path.exists(log_name):
+        if os.path.exists(completed_file) or os.path.exists(completed_file_location):
             if not over_wallclock:
                 self.status = Status.COMPLETED
             else:
                 return Status.COMPLETED
         else:
-            Log.printlog("Job {0} completion check failed. There is no COMPLETED file".format(
-                self.name), 6009)
+            Log.warning(f"Couldn't find {self.name} COMPLETED file")
             if not over_wallclock:
                 self.status = default_status
             else:
                 return default_status
@@ -1875,6 +1888,11 @@ class Job(object):
         self.shape = as_conf.jobs_data[self.section].get("SHAPE", "")
         self.script = as_conf.jobs_data[self.section].get("SCRIPT", "")
         self.x11 = False if str(as_conf.jobs_data[self.section].get("X11", False)).lower() == "false" else True
+        if self.wrapper_type != "vertical" and self.packed:
+            self.stat_file = f"{self.script_name[:-4]}_STAT_{self.fail_count}"
+        else:
+            self.stat_file = f"{self.script_name[:-4]}_STAT_0"
+
         if self.checkpoint:  # To activate placeholder sustitution per in the template
             parameters["AS_CHECKPOINT"] = self.checkpoint
         parameters['JOBNAME'] = self.name
@@ -1954,6 +1972,8 @@ class Job(object):
         """
         as_conf.reload()
         self._init_runtime_parameters()
+        if hasattr(self, "start_time"):
+            self.start_time = time.time()
         # Parameters that affect to all the rest of parameters
         self.update_dict_parameters(as_conf)
         parameters = parameters.copy()
@@ -1977,6 +1997,9 @@ class Job(object):
         # For some reason, there is return but the assignee is also necessary
         self.parameters = parameters
         # This return is only being used by the mock , to change the mock
+        for event in self.platform.worker_events:  # keep the log retrieval workers alive
+            if not event.is_set():
+                event.set()
         return parameters
 
     def update_content_extra(self,as_conf,files):
@@ -2138,7 +2161,7 @@ class Job(object):
                     filename = os.path.basename(os.path.splitext(additional_file)[0])
                     full_path = os.path.join(self._tmp_path,filename ) + "_" + self.name[5:]
                     open(full_path, 'wb').write(additional_template_content.encode(lang))
-            except Exception:
+            except:
                 pass
         for key, value in parameters.items():
             # parameters[key] can have '\\' characters that are interpreted as escape characters
@@ -2220,29 +2243,20 @@ class Job(object):
                                str(set(parameters) - set(variables))), 5013)
         return out
 
-    def write_submit_time(self, hold=False, enable_vertical_write=False, wrapper_submit_time=None):
+    def update_local_logs(self, count=-1, update_submit_time=True):
+        if update_submit_time:
+            self.submit_time_timestamp = date2str(datetime.datetime.now(), 'S')
+        if count > 0:
+            self.local_logs = (f"{self.name}.{self.submit_time_timestamp}.out_retrial_{count}",
+                               f"{self.name}.{self.submit_time_timestamp}.err_retrial_{count}")
+        else:
+            self.local_logs = (f"{self.name}.{self.submit_time_timestamp}.out",
+                               f"{self.name}.{self.submit_time_timestamp}.err")
+
+    def write_submit_time(self):
         """
         Writes submit date and time to TOTAL_STATS file. It doesn't write if hold is True.
         """
-
-        self.start_time_written = False
-        if not enable_vertical_write:
-            if wrapper_submit_time:
-                self.submit_time_timestamp = wrapper_submit_time
-            else:
-                self.submit_time_timestamp = date2str(datetime.datetime.now(), 'S')
-            if self.wrapper_type != "vertical":
-                self.local_logs = (f"{self.name}.{self.submit_time_timestamp}.out", f"{self.name}.{self.submit_time_timestamp}.err")  # for wrappers with inner retrials
-            else:
-                self.local_logs = (f"{self.name}.{self.submit_time_timestamp}.out",
-                                   f"{self.name}.{self.submit_time_timestamp}.err")  # for wrappers with inner retrials
-            return
-        if self.wrapper_type == "vertical" and self.fail_count > 0:
-            self.submit_time_timestamp = self.finish_time_timestamp
-        print(("Call from {} with status {}".format(self.name, self.status_str)))
-        if hold is True:
-            return # Do not write for HELD jobs.
-
         data_time = ["",int(datetime.datetime.strptime(self.submit_time_timestamp, "%Y%m%d%H%M%S").timestamp())]
         path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS')
         if os.path.exists(path):
@@ -2259,57 +2273,56 @@ class Job(object):
                                      platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name),
                                      children=self.children_names_str)
 
-    def write_start_time(self, enable_vertical_write=False, from_stat_file=False, count=-1):
+    def update_start_time(self, count=-1):
+        start_time_ = self.check_start_time(count)  # last known start time from the .cmd file
+        if start_time_:
+            self.start_time_timestamp = start_time_
+        else:
+            Log.warning(f"Start time for job {self.name} not found in the .cmd file, using last known time.")
+            self.start_time_timestamp = self.start_time_timestamp if self.start_time_timestamp else time.time()
+        if count > 0 or self.wrapper_name in self.platform.processed_wrapper_logs:
+            self.submit_time_timestamp = date2str(datetime.datetime.fromtimestamp(self.start_time_timestamp),'S')
+
+    def write_start_time(self, count=-1, vertical_wrapper=False):
         """
         Writes start date and time to TOTAL_STATS file
         :return: True if successful, False otherwise
         :rtype: bool
         """
-
-        if not enable_vertical_write and self.wrapper_type == "vertical":
-            return
-
-        self.start_time_written = True
-        if not from_stat_file:  # last known start time from AS
-            self.start_time_placeholder = time.time()
-        elif from_stat_file:
-            start_time_ = self.check_start_time(count)  # last known start time from the .cmd file
-            if start_time_:
-                start_time = start_time_
-            else:
-                start_time = self.start_time_placeholder if self.start_time_placeholder else time.time()
-            path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS')
-            f = open(path, 'a')
-            f.write(' ')
-            # noinspection PyTypeChecker
-            f.write(date2str(datetime.datetime.fromtimestamp(start_time), 'S'))
-            # Writing database
-            exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR)
-            exp_history.write_start_time(self.name, start=start_time, status=Status.VALUE_TO_KEY.get(self.status, "UNKNOWN"), ncpus=self.processors,
-                                         wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk,
-                                         platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name),
-                                         children=self.children_names_str)
+        if not vertical_wrapper:
+            self.update_start_time(count)
+        path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS')
+        f = open(path, 'a')
+        f.write(' ')
+        # noinspection PyTypeChecker
+        f.write(date2str(datetime.datetime.fromtimestamp(self.start_time_timestamp), 'S'))
+        # Writing database
+        exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR)
+        exp_history.write_start_time(self.name, start=self.start_time_timestamp, status=Status.VALUE_TO_KEY.get(self.status, "UNKNOWN"), ncpus=self.processors,
+                                     wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk,
+                                     platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name),
+                                     children=self.children_names_str)
         return True
 
     def write_vertical_time(self, count=-1):
-        self.write_submit_time(enable_vertical_write=True)
-        self.write_start_time(enable_vertical_write=True, from_stat_file=True, count=count)
-        self.write_end_time(self.status == Status.COMPLETED, enable_vertical_write=True, count=count)
+        self.update_start_time(count=count)
+        self.update_local_logs(update_submit_time=False)
+        self.write_submit_time()
+        self.write_start_time(count=count, vertical_wrapper=True)
+        self.write_end_time(self.status == Status.COMPLETED, count=count)
 
-    def write_end_time(self, completed, enable_vertical_write=False, count = -1):
+    def write_end_time(self, completed, count=-1):
         """
-        Writes ends date and time to TOTAL_STATS file
-        :param completed: True if job was completed successfully, False otherwise
+        Writes end timestamp to TOTAL_STATS file and jobs_data.db
+        :param completed: True if the job has been completed, False otherwise
         :type completed: bool
+        :param count: number of retrials
+        :type count: int
         """
-        if not enable_vertical_write and self.wrapper_type == "vertical":
-            return
         end_time = self.check_end_time(count)
         path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS')
         f = open(path, 'a')
         f.write(' ')
-        finish_time = None
-        final_status = None
         if end_time > 0:
             # noinspection PyTypeChecker
             f.write(date2str(datetime.datetime.fromtimestamp(float(end_time)), 'S'))
@@ -2319,7 +2332,7 @@ class Job(object):
         else:
             f.write(date2str(datetime.datetime.now(), 'S'))
             self.finish_time_timestamp = date2str(datetime.datetime.now(), 'S')
-            finish_time = time.time()  # date2str(datetime.datetime.now(), 'S')
+            finish_time = time.time()
         f.write(' ')
         if completed:
             final_status = "COMPLETED"
@@ -2328,7 +2341,6 @@ class Job(object):
             final_status = "FAILED"
             f.write('FAILED')
         out, err = self.local_logs
-        path_out = os.path.join(self._tmp_path, 'LOG_' + str(self.expid), out)
         # Launch first as simple non-threaded function
         exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR)
         job_data_dc = exp_history.write_finish_time(self.name, finish=finish_time, status=final_status, ncpus=self.processors,
@@ -2342,6 +2354,7 @@ class Job(object):
         thread_write_finish.name = "JOB_data_{}".format(self.name)
         thread_write_finish.start()
 
+
     def write_total_stat_by_retries(self, total_stats, first_retrial = False):
         """
         Writes all data to TOTAL_STATS file
@@ -2490,6 +2503,8 @@ class WrapperJob(Job):
         self.checked_time = datetime.datetime.now()
         self.hold = hold
         self.inner_jobs_running = list()
+        self.is_wrapper = True
+
 
     def _queuing_reason_cancel(self, reason):
         try:
@@ -2525,6 +2540,7 @@ class WrapperJob(Job):
             self._check_running_jobs()  # Check and update inner_jobs status that are eligible
         # Completed wrapper will always come from check function.
         elif self.status == Status.COMPLETED:
+            self._check_running_jobs()  # Check and update inner_jobs status that are eligible
             self.check_inner_jobs_completed(self.job_list)
 
         # Fail can come from check function or running/completed checkers.
@@ -2572,6 +2588,7 @@ class WrapperJob(Job):
                 job.update_status(self.as_config)
         for job in completed_jobs:
             self.running_jobs_start.pop(job, None)
+
         not_completed_jobs = list(
             set(not_completed_jobs) - set(completed_jobs))
 
@@ -2647,6 +2664,7 @@ class WrapperJob(Job):
         not_finished_jobs_names = ' '.join(list(not_finished_jobs_dict.keys()))
         remote_log_dir = self._platform.get_remote_log_dir()
         # PREPARE SCRIPT TO SEND
+        # When is an inner_job running? When the job has an _STAT file.
         command = textwrap.dedent("""
         cd {1}
         for job in {0}
@@ -2669,9 +2687,12 @@ class WrapperJob(Job):
         os.chmod(log_dir, 0o770)
         open(multiple_checker_inner_jobs, 'w+').write(command)
         os.chmod(multiple_checker_inner_jobs, 0o770)
-        self._platform.send_file(multiple_checker_inner_jobs, False)
-        command = os.path.join(
-            self._platform.get_files_path(), "inner_jobs_checker.sh")
+        if self.platform.name != "local":  # already "sent" for the local platform
+            self._platform.send_file(multiple_checker_inner_jobs, False)
+            command = f"cd {self._platform.get_files_path()}; {os.path.join(self._platform.get_files_path(), 'inner_jobs_checker.sh')}"
+        else:
+            command = os.path.join(
+                self._platform.get_files_path(), "inner_jobs_checker.sh")
         #
         wait = 2
         retries = 5
@@ -2712,9 +2733,6 @@ class WrapperJob(Job):
             if content == '':
                 sleep(wait)
             retries = retries - 1
-        temp_list = self.inner_jobs_running
-        self.inner_jobs_running = [
-            job for job in temp_list if job.status == Status.RUNNING]
         if retries == 0 or over_wallclock:
             self.status = Status.FAILED
@@ -2743,7 +2761,7 @@ class WrapperJob(Job):
             running_jobs += [job for job in self.job_list if job.status == Status.READY or job.status == Status.SUBMITTED or job.status == Status.QUEUING]
         self.inner_jobs_running = list()
         for job in running_jobs:
-            if job.platform.check_file_exists('{0}_FAILED'.format(job.name), wrapper_failed=True):
+            if job.platform.check_file_exists('{0}_FAILED'.format(job.name), wrapper_failed=True, max_retries=2):
                 if job.platform.get_file('{0}_FAILED'.format(job.name), False, wrapper_failed=True):
                     self._check_finished_job(job, True)
                 else:
@@ -2751,21 +2769,28 @@ class WrapperJob(Job):
                     self.inner_jobs_running.append(job)
 
     def cancel_failed_wrapper_job(self):
-        Log.printlog("Cancelling job with id {0}".format(self.id), 6009)
         try:
-            self._platform.send_command(
-                self._platform.cancel_cmd + " " + str(self.id))
-        except Exception:
-            Log.info(f'Job with {self.id} was finished before canceling it')
-
+            if self.platform_name == "local":
+                # Check if the job is still running to avoid a misleading message in the logs
+                if self.platform.get_pscall(self.id):
+                    self._platform.send_command(
+                        self._platform.cancel_cmd + " " + str(self.id))
+            else:
+                Log.warning(f"Wrapper {self.name} failed, cancelling it")
+                self._platform.send_command(
+                    self._platform.cancel_cmd + " " + str(self.id))
+        except:
+            Log.debug(f'Job with id {self.id} finished before it could be cancelled')
+        self._check_running_jobs()
+        for job in self.inner_jobs_running:
+            job.status = Status.FAILED
         for job in self.job_list:
-            #if job.status == Status.RUNNING:
-                #job.inc_fail_count()
-            #    job.packed = False
-            #    job.status = Status.FAILED
+            job.packed = False
             if job.status not in [Status.COMPLETED, Status.FAILED]:
-                job.packed = False
                 job.status = Status.WAITING
+            else:
+                if job.wrapper_type == "vertical":  # job is being retrieved internally by the wrapper
+                    job.fail_count = job.retrials
 
     def _update_completed_jobs(self):
diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py
index 0e5d32eb410a9ac62fc9dcd1f2544e3a2c9a0050..17bffecc51f61875b8318530c28a99fdf5aca532 100644
--- a/autosubmit/job/job_packages.py
+++ b/autosubmit/job/job_packages.py
@@ -27,6 +27,7 @@ from datetime import timedelta
 
 from autosubmit.job.job_common import Status
 from log.log import Log, AutosubmitCritical
+Log.get_logger("Autosubmit")
 from autosubmit.job.job import Job
 from bscearth.utils.date import sum_str_hours
 from threading import Thread, Lock
@@ -36,9 +37,6 @@ import tarfile
 import datetime
 import re
 import locale
-
-Log.get_logger("Autosubmit")
-
 lock = Lock()
 def threaded(fn):
     def wrapper(*args, **kwargs):
@@ -56,7 +54,7 @@ def jobs_in_wrapper_str(as_conf, current_wrapper):
     else:
         jobs_in_wrapper = jobs_in_wrapper.split(" ")
     jobs_in_wrapper = [job.strip(" ,") for job in jobs_in_wrapper]
-    return "_".join(jobs_in_wrapper)
+    return "&".join(jobs_in_wrapper)
 class JobPackageBase(object):
     """ Class to manage the package of jobs to be submitted by autosubmit
@@ -251,6 +249,7 @@ class JobPackageSimple(JobPackageBase):
         if len(job_scripts) == 0:
             job_scripts = self._job_scripts
         for job in self.jobs:
+            job.update_local_logs()
             #CLEANS PREVIOUS RUN ON LOCAL
             log_completed = os.path.join(self._tmp_path, job.name + '_COMPLETED')
             log_stat = os.path.join(self._tmp_path, job.name + '_STAT')
@@ -258,14 +257,15 @@ class JobPackageSimple(JobPackageBase):
                 os.remove(log_completed)
             if os.path.exists(log_stat):
                 os.remove(log_stat)
-            self.platform.remove_stat_file(job.name)
+            self.platform.remove_stat_file(job)
             self.platform.remove_completed_file(job.name)
             job.id = self.platform.submit_job(job, job_scripts[job.name], hold=hold, export = self.export)
             if job.id is None or not job.id:
                 continue
             Log.info("{0} submitted", job.name)
-            job.status = Status.SUBMITTED
-            job.write_submit_time(hold=self.hold)
+            job.status = Status.SUBMITTED
+            job.wrapper_name = job.name
+
 
 class JobPackageSimpleWrapped(JobPackageSimple):
@@ -345,7 +345,8 @@ class JobPackageArray(JobPackageBase):
 
     def _do_submission(self, job_scripts=None, hold=False):
         for job in self.jobs:
-            self.platform.remove_stat_file(job.name)
+            job.update_local_logs()
+            self.platform.remove_stat_file(job)
             self.platform.remove_completed_file(job.name)
 
         package_id = self.platform.submit_job(None, self._common_script, hold=hold, export = self.export)
@@ -356,9 +357,8 @@ class JobPackageArray(JobPackageBase):
         for i in range(0, len(self.jobs)):  # platforms without a submit.cmd
             Log.info("{0} submitted", self.jobs[i].name)
             self.jobs[i].id = str(package_id) + '[{0}]'.format(i)
-            self.jobs[i].status = Status.SUBMITTED
-            self.jobs[i].write_submit_time(hold=hold,wrapper_submit_time=wrapper_time)
-            wrapper_time = self.jobs[i].write_submit_time
+            self.jobs[i].status = Status.SUBMITTED
+            self.jobs[i].wrapper_name = self.name
 
 
 class JobPackageThread(JobPackageBase):
@@ -575,13 +575,15 @@ class JobPackageThread(JobPackageBase):
         if callable(getattr(self.platform, 'remove_multiple_files')):
             filenames = str()
             for job in self.jobs:
+                job.update_local_logs()
                 filenames += " " + self.platform.remote_log_dir + "/" + job.name + "_STAT " + \
                              self.platform.remote_log_dir + "/" + job.name + "_COMPLETED"
             self.platform.remove_multiple_files(filenames)
         else:
             for job in self.jobs:
-                self.platform.remove_stat_file(job.name)
+                job.update_local_logs()
+                self.platform.remove_stat_file(job)
                 self.platform.remove_completed_file(job.name)
                 if hold:
                     job.hold = hold
@@ -591,13 +593,12 @@ class JobPackageThread(JobPackageBase):
         if package_id is None or not package_id:
             return
 
-        wrapper_time = None
         for i in range(0, len(self.jobs)):
             Log.info("{0} submitted", self.jobs[i].name)
             self.jobs[i].id = str(package_id)
             self.jobs[i].status = Status.SUBMITTED
-            self.jobs[i].write_submit_time(hold=hold,wrapper_submit_time=wrapper_time)
-            wrapper_time = self.jobs[i].write_submit_time
+            self.jobs[i].wrapper_name = self.name
+
 
     def _common_script_content(self):
         pass
@@ -661,7 +662,8 @@ class JobPackageThreadWrapped(JobPackageThread):
 
     def _do_submission(self, job_scripts=None, hold=False):
         for job in self.jobs:
-            self.platform.remove_stat_file(job.name)
+            job.update_local_logs()
+            self.platform.remove_stat_file(job)
             self.platform.remove_completed_file(job.name)
             if hold:
                 job.hold = hold
@@ -671,13 +673,13 @@ class JobPackageThreadWrapped(JobPackageThread):
         if package_id is None or not package_id:
             raise Exception('Submission failed')
 
-        wrapper_time = None
         for i in range(0, len(self.jobs)):
             Log.info("{0} submitted", self.jobs[i].name)
             self.jobs[i].id = str(package_id)
             self.jobs[i].status = Status.SUBMITTED
-            self.jobs[i].write_submit_time(hold=hold,wrapper_submit_time=wrapper_time)
-            wrapper_time = self.jobs[i].write_submit_time
+            self.jobs[i].wrapper_name = self.name
+
+
 class JobPackageVertical(JobPackageThread):
 
     """
@@ -755,7 +757,7 @@ class JobPackageVertical(JobPackageThread):
                 self._wallclock = "{0}:{1}".format(hh_str,mm_str)
             Log.info("Submitting {2} with wallclock {0}:{1}".format(hh_str,mm_str,self._name))
         else:
-            wallclock_by_level = None
+            wallclock_by_level = 0  # command: "timeout 0 sleep 2" == command: "sleep 2"
         return self._wrapper_factory.get_wrapper(self._wrapper_factory.vertical_wrapper, name=self._name, queue=self._queue,
                                                  project=self._project, wallclock=self._wallclock,
diff --git a/autosubmit/platforms/locplatform.py b/autosubmit/platforms/locplatform.py
index 3d9296705d5731faf34b7d053791dc880018321e..61f96832ae217153036c4f196bf538eb2c8d55c1 100644
--- a/autosubmit/platforms/locplatform.py
+++ b/autosubmit/platforms/locplatform.py
@@ -18,12 +18,14 @@
 # along with Autosubmit.  If not, see <http://www.gnu.org/licenses/>.
 
 import locale
 import os
+from pathlib import Path
 from xml.dom.minidom import parseString
 import subprocess
 
 from autosubmit.platforms.paramiko_platform import ParamikoPlatform
 from autosubmit.platforms.headers.local_header import LocalHeader
+from autosubmit.platforms.wrappers.wrapper_factory import LocalWrapperFactory
 from autosubmitconfigparser.config.basicconfig import BasicConfig
 from time import sleep
@@ -64,6 +67,9 @@ class LocalPlatform(ParamikoPlatform):
         self.job_status['RUNNING'] = ['0']
         self.job_status['QUEUING'] = []
         self.job_status['FAILED'] = []
+        self._allow_wrappers = True
+        self._wrapper = LocalWrapperFactory(self)
+
         self.update_cmds()
 
     def update_cmds(self):
@@ -100,28 +106,25 @@ class LocalPlatform(ParamikoPlatform):
         return [int(element.firstChild.nodeValue) for element in jobs_xml]
 
     def get_submit_cmd(self, job_script, job, hold=False, export=""):
-        wallclock = self.parse_time(job.wallclock)
-        seconds = int(wallclock.days * 86400 + wallclock.seconds * 60)
+        if job:
+            wallclock = self.parse_time(job.wallclock)
+            seconds = int(wallclock.days * 86400 + wallclock.seconds * 60)
+        else:
+            seconds = 24 * 3600
         if export == "none" or export == "None" or export is None or export == "":
             export = ""
         else:
             export += " ; "
-        return self.get_call(job_script, job, export=export,timeout=seconds)
-
+        command = self.get_call(job_script, job, export=export,timeout=seconds)
+        return f"cd {self.remote_log_dir} ; {command}"
 
     def get_checkjob_cmd(self, job_id):
         return self.get_pscall(job_id)
 
-    def connect(self, as_conf, reconnect=False):
+    def connect(self, as_conf={}, reconnect=False):
         self.connected = True
-        if not self.log_retrieval_process_active and (
-                as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS',"false")).lower() == "false"):
-            self.log_retrieval_process_active = True
-            if as_conf and as_conf.misc_data.get("AS_COMMAND","").lower() == "run":
-                self.recover_job_logs()
-
+        self.spawn_log_retrieval_process(as_conf)
 
     def test_connection(self,as_conf):
-        self.main_process_id = os.getpid()
         if not self.connected:
             self.connect(as_conf)
@@ -151,10 +154,7 @@ class LocalPlatform(ParamikoPlatform):
         return True
 
     def send_file(self, filename, check=True):
-        self.check_remote_log_dir()
-        self.delete_file(filename,del_cmd=True)
-        command = '{0} {1} {2}'.format(self.put_cmd, os.path.join(self.tmp_path, filename),
-                                       os.path.join(self.tmp_path, 'LOG_' + self.expid, filename))
+        command = f'{self.put_cmd} {os.path.join(self.tmp_path, Path(filename).name)} {os.path.join(self.tmp_path, "LOG_" + self.expid, Path(filename).name)}'
         try:
             subprocess.check_call(command, shell=True)
         except subprocess.CalledProcessError:
@@ -164,6 +164,17 @@ class LocalPlatform(ParamikoPlatform):
             raise
         return True
 
+    def remove_multiple_files(self, filenames):
+        log_dir = os.path.join(self.tmp_path, 'LOG_{0}'.format(self.expid))
+        multiple_delete_previous_run = os.path.join(
+            log_dir, "multiple_delete_previous_run.sh")
+        if os.path.exists(log_dir):
+            lang = locale.getlocale()[1]
+            if lang is None:
+                lang = 'UTF-8'
+            open(multiple_delete_previous_run, 'wb+').write(("rm -f" + filenames).encode(lang))
+            os.chmod(multiple_delete_previous_run, 0o770)
+        return ""
 
     def get_file(self, filename, must_exist=True, relative_path='',ignore_log = False,wrapper_failed=False):
         local_path = os.path.join(self.tmp_path, relative_path)
@@ -187,40 +198,28 @@ class LocalPlatform(ParamikoPlatform):
         return True
 
     # Moves .err .out
-    def check_file_exists(self, src, wrapper_failed=False, sleeptime=5, max_retries=3, first=True):
+    def check_file_exists(self, src, wrapper_failed=False, sleeptime=1, max_retries=1):
         """
-        Moves a file on the platform
+        Checks if a file exists in the platform
         :param src: source name
         :type src: str
-        :param: wrapper_failed: if True, the wrapper failed.
+        :param wrapper_failed: checks inner jobs files
         :type wrapper_failed: bool
-
+        :param sleeptime: time to sleep
+        :type sleeptime: int
+        :param max_retries: maximum number of retries
+        :type max_retries: int
+        :return: True if the file exists, False otherwise
+        :rtype: bool
         """
-        file_exist = False
-        remote_path = os.path.join(self.get_files_path(), src)
-        retries = 0
-        # Not first is meant for vertical_wrappers. There you have to download STAT_{MAX_LOGS} then STAT_{MAX_LOGS-1} and so on
-        if not first:
-            max_retries = 1
-            sleeptime = 0
-        while not file_exist and retries < max_retries:
-            try:
-                file_exist = os.path.isfile(os.path.join(self.get_files_path(),src))
-                if not file_exist:  # File doesn't exist, retry in sleep-time
-                    if first:
-                        Log.debug("{2} File does not exist.. waiting {0}s for a new retry (retries left: {1})", sleeptime,
-                                  max_retries - retries, remote_path)
-                    if not wrapper_failed:
-                        sleep(sleeptime)
-                        sleeptime = sleeptime + 5
-                        retries = retries + 1
-                    else:
-                        retries = 9999
-            except BaseException as e:  # Unrecoverable error
-                Log.printlog("File does not exist, logs {0} {1}".format(self.get_files_path(),src),6001)
-                file_exist = False  # won't exist
-                retries = 999  # no more retries
-        return file_exist
+        sleeptime = 1
+        for i in range(max_retries):
+            if os.path.isfile(os.path.join(self.get_files_path(), src)):
+                return True
+            sleep(sleeptime)
+        Log.warning("File {0} does not exist".format(src))
+        return False
 
     def delete_file(self, filename,del_cmd = False):
         if del_cmd:
@@ -281,3 +280,18 @@ class LocalPlatform(ParamikoPlatform):
         :type remote_logs: (str, str)
         """
         return
+
+    def check_completed_files(self, sections=None):
+        command = "find %s " % self.remote_log_dir
+        if sections:
+            for i, section in enumerate(sections.split()):
+                command += " -name *%s_COMPLETED" % section
+                if i < len(sections.split()) - 1:
+                    command += " -o "
+        else:
+            command += " -name *_COMPLETED"
+
+        if self.send_command(command, True):
+            return self._ssh_output
+        else:
+            return None
\ No newline at end of file
diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py
index b6b4cdeaafccdd87ad4f9d28e8ca4b9d2d5b2ce3..8b19730a8e94a8b5d84a7c9017e8728022f10816 100644
--- a/autosubmit/platforms/paramiko_platform.py
+++ b/autosubmit/platforms/paramiko_platform.py
@@ -1,8 +1,6 @@
-import copy
-import threading
-
 import locale
 from contextlib import suppress
+from pathlib import Path
 from time import sleep
 import sys
 import socket
@@ -13,9 +11,6 @@ import select
 import re
 from datetime import timedelta
 import random
-
-from pathlib import Path
-
 from autosubmit.job.job_common import Status
 from autosubmit.job.job_common import Type
 from autosubmit.platforms.platform import Platform
@@ -26,7 +21,7 @@ from threading import Thread
 import threading
 import getpass
 from paramiko.agent import Agent
-from autosubmit.helpers.utils import terminate_child_process
+import time
 
 def threaded(fn):
     def wrapper(*args, **kwargs):
@@ -118,13 +113,10 @@ class ParamikoPlatform(Platform):
         if display is None:
             display = "localhost:0"
         self.local_x11_display = xlib_connect.get_display(display)
-        self.log_retrieval_process_active = False
-        terminate_child_process(self.expid, self.name)
 
     def test_connection(self,as_conf):
         """
         Test if the connection is still alive, reconnect if not.
         """
-        self.main_process_id = os.getpid()
         try:
             if not self.connected:
@@ -138,7 +130,7 @@ class ParamikoPlatform(Platform):
             try:
                 transport = self._ssh.get_transport()
                 transport.send_ignore()
-            except Exception:
+            except:
                 message = "Timeout connection"
             return message
@@ -154,6 +146,7 @@ class ParamikoPlatform(Platform):
             raise AutosubmitCritical(str(e),7051)
             #raise AutosubmitError("[{0}] connection failed for host: {1}".format(self.name, self.host), 6002, e.message)
 
+
     def restore_connection(self, as_conf):
         try:
             self.connected = False
@@ -309,10 +302,9 @@ class ParamikoPlatform(Platform):
             self._ftpChannel = paramiko.SFTPClient.from_transport(self.transport,window_size=pow(4, 12) ,max_packet_size=pow(4, 12) )
             self._ftpChannel.get_channel().settimeout(120)
             self.connected = True
-            if not self.log_retrieval_process_active and (as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS', "false")).lower() == "false"):
-                self.log_retrieval_process_active = True
-                if as_conf and as_conf.misc_data.get("AS_COMMAND", "").lower() == "run":
-                    self.recover_job_logs()
+            self.spawn_log_retrieval_process(as_conf)
+
+
         except SSHException:
             raise
         except IOError as e:
@@ -371,33 +363,31 @@ class ParamikoPlatform(Platform):
             return ""
         return ""
 
-    def send_file(self, filename: str, check: bool = True) -> bool:
+    def send_file(self, filename, check=True):
         """
-        Sends a local file to the platform.
-
-        :param filename: Name of the file to send.
+        Sends a local file to the platform
+        :param check:
+        :param filename: name of the file to send
         :type filename: str
-        :param check: Whether to check and delete the remote file before sending. Defaults to True.
-        :type check: bool
-        :return: True if the file was sent successfully, False otherwise.
-        :rtype: bool
-        :raises AutosubmitError: If the file cannot be sent or the connection is not active.
         """
-        local_path = Path(self.tmp_path) / filename
-        remote_path = Path(self.get_files_path()) / local_path.name
+
         if check:
-            Log.debug(f"Cleaning remote file: {remote_path}")
             self.check_remote_log_dir()
-            self.delete_file(local_path.name)
+            self.delete_file(filename)
         try:
-            self._ftpChannel.put(str(local_path), str(remote_path))
-            self._ftpChannel.chmod(str(remote_path), local_path.stat().st_mode)
-            Log.debug(f"File {local_path} sent to {remote_path}")
+            local_path = os.path.join(os.path.join(self.tmp_path, filename))
+            remote_path = os.path.join(
+                self.get_files_path(), os.path.basename(filename))
+            self._ftpChannel.put(local_path, remote_path)
+            self._ftpChannel.chmod(remote_path, os.stat(local_path).st_mode)
            return True
         except IOError as e:
-            raise AutosubmitError(f'Can not send file {local_path} to {remote_path}', 6004, str(e))
+            raise AutosubmitError('Can not send file {0} to {1}'.format(os.path.join(
+                self.tmp_path, filename), os.path.join(self.get_files_path(), filename)), 6004, str(e))
         except BaseException as e:
-            raise AutosubmitError('Send file failed. Connection seems to no be active', 6004, str(e))
+            raise AutosubmitError(
+                'Send file failed. Connection seems to not be active', 6004)
+
 
     def get_list_of_files(self):
         return self._ftpChannel.get(self.get_files_path)
@@ -461,7 +451,6 @@ class ParamikoPlatform(Platform):
         """
         try:
-            Log.debug(f"Deleting file {filename}, full_path: {os.path.join(self.get_files_path(), filename)}")
             self._ftpChannel.remove(os.path.join(
                 self.get_files_path(), filename))
             return True
@@ -539,7 +528,8 @@ class ParamikoPlatform(Platform):
         if cmd is None:
             return None
         if self.send_command(cmd,x11=x11):
-            job_id = self.get_submitted_job_id(self.get_ssh_output(),x11=job.x11)
+            x11 = False if job is None else job.x11
+            job_id = self.get_submitted_job_id(self.get_ssh_output(),x11=x11)
             Log.debug("Job ID: {0}", job_id)
             return int(job_id)
         else:
@@ -631,7 +621,19 @@ class ParamikoPlatform(Platform):
                 self.get_ssh_output()).strip("\n")
             # URi: define status list in HPC Queue Class
             if job_status in self.job_status['COMPLETED'] or retries == 0:
-                job_status = Status.COMPLETED
+                # The Local platform has only 0 or 1, so it is necessary to look for the completed file.
+                # Not sure why it is called over_wallclock but it is the only way to return a value
+                if self.type == "local":  # wrapper has a different check completion
+                    if not job.is_wrapper:
+                        job_status = job.check_completion(over_wallclock=True)
+                    else:
+                        if Path(f"{self.remote_log_dir}/WRAPPER_FAILED").exists():
+                            job_status = Status.FAILED
+                        else:
+                            job_status = Status.COMPLETED
+                else:
+                    job_status = Status.COMPLETED
+
             elif job_status in self.job_status['RUNNING']:
                 job_status = Status.RUNNING
                 if not is_wrapper:
@@ -663,8 +665,14 @@ class ParamikoPlatform(Platform):
             Log.error(
                 'check_job() The job id ({0}) status is {1}.', job_id, job_status)
 
-        if job_status in [Status.FAILED, Status.COMPLETED]:
+        if job_status in [Status.FAILED, Status.COMPLETED, Status.UNKNOWN]:
             job.updated_log = False
+            # backup for end time in case that the stat file is not found
+            job.end_time_placeholder = int(time.time())
+        if job_status in [Status.RUNNING, Status.COMPLETED] and job.new_status in [Status.QUEUING, Status.SUBMITTED]:
+            # backup for start time in case that the stat file is not found
+            job.start_time_timestamp = int(time.time())
+
         if submit_hold_check:
             return job_status
         else:
@@ -776,9 +784,9 @@ class ParamikoPlatform(Platform):
                     try:
                         if self.cancel_cmd is not None:
                             job.platform.send_command(self.cancel_cmd + " " + str(job.id))
-                    except Exception:
+                    except:
                         pass
-            except Exception:
+            except:
                 job_status = Status.FAILED
         if job_status in self.job_status['COMPLETED']:
             job_status = Status.COMPLETED
@@ -1229,18 +1237,23 @@ class ParamikoPlatform(Platform):
         :return: command to execute script
         :rtype: str
         """
-        executable = ''
-        if job.type == Type.BASH:
-            executable = 'bash'
-        elif job.type == Type.PYTHON:
-            executable = 'python3'
-        elif job.type == Type.PYTHON2:
-            executable = 'python2'
-        elif job.type == Type.R:
-            executable = 'Rscript'
-        if job.executable != '':
-            executable = ''  # Alternative: use job.executable with substituted placeholders
-        remote_logs = (job.script_name + ".out."+str(job.fail_count), job.script_name + ".err."+str(job.fail_count))
+        if job:  # If job is None, it is a wrapper
+            executable = ''
+            if job.type == Type.BASH:
+                executable = 'bash'
+            elif job.type == Type.PYTHON:
+                executable = 'python3'
+            elif job.type == Type.PYTHON2:
+                executable = 'python2'
+            elif job.type == Type.R:
+                executable = 'Rscript'
+            if job.executable != '':
+                executable = ''  # Alternative: use job.executable with substituted placeholders
+            remote_logs = (job.script_name + ".out."+str(job.fail_count), job.script_name + ".err."+str(job.fail_count))
+        else:
+            executable = 'python3'  # wrappers are always python3
+            remote_logs = (f"{job_script}.out", f"{job_script}.err")
+
         if timeout < 1:
             command = export + ' nohup ' + executable + ' {0} > {1} 2> {2} & echo $!'.format(
                 os.path.join(self.remote_log_dir, job_script),
@@ -1438,7 +1451,7 @@ class ParamikoPlatform(Platform):
                 return True
             else:
                 return False
-        except Exception:
+        except:
             return False
 class ParamikoPlatformException(Exception):
     """
diff --git a/autosubmit/platforms/pjmplatform.py b/autosubmit/platforms/pjmplatform.py
index 4f198d81902068c6a3d6db83fc983bb7268be0d3..7291377b72a6337135698c096d4c15a6c621d4a4 100644
--- a/autosubmit/platforms/pjmplatform.py
+++ b/autosubmit/platforms/pjmplatform.py
@@ -151,7 +151,8 @@ class PJMPlatform(ParamikoPlatform):
                     job.hold = hold
                     job.id = str(jobs_id[i])
                     job.status = Status.SUBMITTED
-                    job.write_submit_time(hold=hold)
+                    job.wrapper_name = package.name
+
                 i += 1
             save = True
         except AutosubmitError as e:
@@ -483,13 +484,10 @@ class PJMPlatform(ParamikoPlatform):
     def allocated_nodes():
         return """os.system("scontrol show hostnames $SLURM_JOB_NODELIST > node_list_{0}".format(node_id))"""
 
-    def check_file_exists(self, filename, wrapper_failed=False, sleeptime=5, max_retries=3, first=True):
+    def check_file_exists(self, filename, wrapper_failed=False, sleeptime=5, max_retries=3):
         file_exist = False
         retries = 0
-        # Not first is meant for vertical_wrappers. There you have to download STAT_{MAX_LOGS} then STAT_{MAX_LOGS-1} and so on
-        if not first:
-            max_retries = 1
-            sleeptime = 0
+
         while not file_exist and retries < max_retries:
             try:
                 # This return IOError if path doesn't exist
                 self._ftpChannel.stat(os.path.join(
                     self.get_files_path(), filename))
                 file_exist = True
             except IOError as e:  # File doesn't exist, retry in sleeptime
-                if first:
-                    Log.debug("{2} File does not exist.. waiting {0}s for a new retry (retries left: {1})", sleeptime,
-                              max_retries - retries, os.path.join(self.get_files_path(), filename))
                 if not wrapper_failed:
                     sleep(sleeptime)
                     sleeptime = sleeptime + 5
diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py
index 67e36792730621a6ea78480ce3a254d1389ac7ba..9af2ca4845d306c41239343eaaffe6bed7575ae5 100644
--- a/autosubmit/platforms/platform.py
+++ b/autosubmit/platforms/platform.py
@@ -1,53 +1,55 @@
 import atexit
-
-import queue
+import multiprocessing
+import queue  # only for the exception
+import psutil
 import setproctitle
 import locale
 import os
-
 import traceback
 from autosubmit.job.job_common import Status
-from typing import List, Union
-
+from typing import List, Union, Callable
 from autosubmit.helpers.parameters import autosubmit_parameter
 from log.log import AutosubmitCritical, AutosubmitError, Log
-from multiprocessing import Process, Queue, Event
-
+from multiprocessing import Process, Event
+from multiprocessing.queues import Queue
 import time
 
-# stop the background task gracefully before exit
-def stop_background(stop_event, process):
-    # request the background thread stop
-    stop_event.set()
-    # wait for the background thread to stop
-    process.join()
 
-def processed(fn):
-    def wrapper(*args, **kwargs):
-        stop_event = Event()
-        args = (args[0], stop_event)
-        process = Process(target=fn, args=args, kwargs=kwargs, name=f"{args[0].name}_platform")
-        process.daemon = True  # Set the process as a daemon process
-        process.start()
-        atexit.register(stop_background, stop_event, process)
-        return process
-
-    return wrapper
+
+class UniqueQueue(
+    Queue):  # The reason for this class is to avoid duplicates in the queue during the same run. That can happen if the log retrieval process hasn't processed it yet.
+
+    def __init__(self, maxsize=-1, block=True, timeout=None):
+        self.block = block
+        self.timeout = timeout
+        self.all_items = set()  # Won't be popped, so even if it is being processed by the log retrieval process, it won't be added again.
+        super().__init__(maxsize, ctx=multiprocessing.get_context())
+
+    def put(self, job, block=True, timeout=None):
+        if job.wrapper_type == "vertical":
+            unique_name = job.name
+        else:
+            unique_name = job.name+str(job.fail_count)
+        if unique_name not in self.all_items:
+            self.all_items.add(unique_name)
+            super().put(job, block, timeout)
+
 
 class Platform(object):
     """
     Class to manage the connections to the different platforms.
     """
+    worker_events = list()
+    lock = multiprocessing.Lock()
 
-    def __init__(self, expid, name, config, auth_password = None):
+    def __init__(self, expid, name, config, auth_password=None):
         """
         :param config:
         :param expid:
         :param name:
         """
         self.connected = False
-        self.expid = expid # type: str
-        self._name = name # type: str
+        self.expid = expid  # type: str
+        self._name = name  # type: str
         self.config = config
         self.tmp_path = os.path.join(
             self.config.get("LOCAL_ROOT_DIR"), self.expid, self.config.get("LOCAL_TMP_DIR"))
@@ -92,23 +94,32 @@ class Platform(object):
         self.cancel_cmd = None
         self.otp_timeout = None
         self.two_factor_auth = None
-        self.otp_timeout = self.config.get("PLATFORMS", {}).get(self.name.upper(),{}).get("2FA_TIMEOUT", 60*5)
-        self.two_factor_auth = self.config.get("PLATFORMS", {}).get(self.name.upper(),{}).get("2FA", False)
-        self.two_factor_method = self.config.get("PLATFORMS", {}).get(self.name.upper(),{}).get("2FA_METHOD", "token")
+        self.otp_timeout = self.config.get("PLATFORMS", {}).get(self.name.upper(), {}).get("2FA_TIMEOUT", 60 * 5)
+        self.two_factor_auth = self.config.get("PLATFORMS", {}).get(self.name.upper(), {}).get("2FA", False)
+        self.two_factor_method = self.config.get("PLATFORMS", {}).get(self.name.upper(), {}).get("2FA_METHOD", "token")
         if not self.two_factor_auth:
             self.pw = None
         elif auth_password is not None and self.two_factor_auth:
-            if type(auth_password) is list:
+            if type(auth_password) == list:
                 self.pw = auth_password[0]
             else:
                 self.pw = auth_password
         else:
             self.pw = None
-        self.recovery_queue = Queue()
+        self.recovery_queue = UniqueQueue()
        self.log_retrieval_process_active = False
         self.main_process_id = None
+        self.work_event = Event()
+        self.cleanup_event = Event()
+        self.log_recovery_process = None
+        self.keep_alive_timeout = 60 * 5  # Useful in case of kill -9
+        self.processed_wrapper_logs = set()
         self.max_waiting_jobs = 20
 
+    @classmethod
+    def update_workers(cls, event_worker):
+        cls.worker_events.append(event_worker)
+
     @property
     @autosubmit_parameter(name='current_arch')
     def name(self):
@@ -239,9 +250,10 @@ class Platform(object):
         """
         # only implemented for slurm
         return ""
-    def get_multiple_jobids(self,job_list,valid_packages_to_submit,failed_packages,error_message="",hold=False):
-        return False,valid_packages_to_submit
-        #raise NotImplementedError
+
+    def get_multiple_jobids(self, job_list, valid_packages_to_submit, failed_packages, error_message="", hold=False):
+        return False, valid_packages_to_submit
+        # raise NotImplementedError
 
     def process_batch_ready_jobs(self, valid_packages_to_submit, failed_packages, error_message="", hold=False):
         return True, valid_packages_to_submit
@@ -346,7 +358,7 @@ class Platform(object):
             raise
         except AutosubmitCritical as e:
-            raise AutosubmitCritical(e.message, e.code, e.trace)
+            raise
         except AutosubmitError as e:
             raise
         except Exception as e:
@@ -471,7 +483,6 @@ class Platform(object):
         parameters['{0}EC_QUEUE'.format(prefix)] = self.ec_queue
         parameters['{0}PARTITION'.format(prefix)] = self.partition
 
-        parameters['{0}USER'.format(prefix)] = self.user
         parameters['{0}PROJ'.format(prefix)] = self.project
         parameters['{0}BUDG'.format(prefix)] = self.budget
@@ -574,10 +585,12 @@ class Platform(object):
         if job.current_checkpoint_step < job.max_checkpoint_step:
             remote_checkpoint_path = f'{self.get_files_path()}/CHECKPOINT_'
             self.get_file(f'{remote_checkpoint_path}{str(job.current_checkpoint_step)}', False, ignore_log=True)
-            while self.check_file_exists(f'{remote_checkpoint_path}{str(job.current_checkpoint_step)}') and job.current_checkpoint_step < job.max_checkpoint_step:
     def get_completed_files(self, job_name, retries=0, recovery=False, wrapper_failed=False):
         """
         Get the COMPLETED file of the given job
@@ -606,18 +619,14 @@ class Platform(object):
         else:
             return False
 
-    def remove_stat_file(self, job_name):
+    def remove_stat_file(self, job):
         """
         Removes *STAT* files from remote
-
-        :param job_name: name of job to check
-        :type job_name: str
-        :return: True if successful, False otherwise
-        :rtype: bool
+        :param job: job to check
+        :type job: Job
         """
-        filename = job_name + '_STAT'
-        if self.delete_file(filename):
-            Log.debug('{0}_STAT have been removed', job_name)
+        if self.delete_file(job.stat_file):
+            Log.debug(f"{job.stat_file} has been removed")
             return True
         return False
 
@@ -649,6 +658,7 @@ class Platform(object):
             Log.debug('{0} been removed', filename)
             return True
         return False
+
     def remove_checkpoint_file(self, filename):
         """
         Removes *CHECKPOINT* files from remote
@@ -659,36 +669,26 @@ class Platform(object):
         if self.check_file_exists(filename):
             self.delete_file(filename)
 
-    def check_file_exists(self, src, wrapper_failed=False, sleeptime=5, max_retries=3, first=True):
+    def check_file_exists(self, src, wrapper_failed=False, sleeptime=5, max_retries=3):
         return True
 
-    def get_stat_file(self, job_name, retries=0, count = -1):
-        """
-        Copies *STAT* files from remote to local
+    def get_stat_file(self, job, count=-1):
+        """ Copies the *STAT* file of the given job from remote to local. """
 
-        :param retries: number of intents to get the completed files
-        :type retries: int
-        :param job_name: name of job to check
-        :type job_name: str
-        :return: True if successful, False otherwise
-        :rtype: bool
-        """
-        if count == -1: # No internal retrials
-            filename = job_name + '_STAT'
+        if count == -1:  # No internal retrials
+            filename = job.stat_file
         else:
-            filename = job_name + '_STAT_{0}'.format(str(count))
+            filename = job.name + '_STAT_{0}'.format(str(count))
         stat_local_path = os.path.join(
             self.config.get("LOCAL_ROOT_DIR"), self.expid, self.config.get("LOCAL_TMP_DIR"), filename)
         if os.path.exists(stat_local_path):
             os.remove(stat_local_path)
         if self.check_file_exists(filename):
             if self.get_file(filename, True):
-                Log.debug('{0}_STAT file have been transferred', job_name)
+                Log.debug('{0}_STAT file has been transferred', job.name)
                 return True
-        Log.debug('{0}_STAT file not found', job_name)
+        Log.warning('{0}_STAT file not found', job.name)
         return False
-
     @autosubmit_parameter(name='current_logdir')
     def get_files_path(self):
         """
@@ -720,9 +720,11 @@ class Platform(object):
         :rtype: int
         """
         raise NotImplementedError
+
     def check_Alljobs(self, job_list, as_conf, retries=5):
-        for job,job_prev_status in job_list:
+        for job, job_prev_status in job_list:
             self.check_job(job)
+
     def check_job(self, job, default_status=Status.COMPLETED, retries=5, submit_hold_check=False, is_wrapper=False):
         """
         Checks job running status
@@ -811,7 +813,7 @@ class Platform(object):
         # type: () -> None
         """ Opens Submit script file """
         raise NotImplementedError
-    
+
     def submit_Script(self, hold=False):
         # type: (bool) -> Union[List[str], str]
         """
@@ -820,54 +822,115 @@ class Platform(object):
         raise NotImplementedError
 
     def add_job_to_log_recover(self, job):
-        self.recovery_queue.put((job,job.children))
+        self.recovery_queue.put(job)
 
     def connect(self, as_conf, reconnect=False):
         raise NotImplementedError
 
-    def restore_connection(self,as_conf):
+    def restore_connection(self, as_conf):
         raise NotImplementedError
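On the producer side, the main process now enqueues whole Job objects through add_job_to_log_recover, and the UniqueQueue takes care of duplicates. A hedged sketch of that call site; whether the main loop wakes the worker through work_event directly or via Platform.worker_events is an assumption here, not something this hunk shows:

    def queue_for_log_recovery(platform, job):
        """Illustrative producer: hand a finished retrial to the log-recovery
        process. Calling this twice for the same retrial is harmless because
        UniqueQueue keys on name (vertical wrappers) or name + fail_count."""
        platform.add_job_to_log_recover(job)
        platform.work_event.set()  # assumed wake-up call, see note above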
-    @processed
-    def recover_job_logs(self, event):
+    def spawn_log_retrieval_process(self, as_conf):
+        """
+        Spawns a process that recovers the logs of the jobs submitted to this platform.
+        """
+        if not self.log_retrieval_process_active and (
+                as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS',
+                                                                                     "false")).lower() == "false"):
+            if as_conf and as_conf.misc_data.get("AS_COMMAND", "").lower() == "run":
+                self.log_retrieval_process_active = True
+                Platform.update_workers(self.work_event)
+                self.log_recovery_process = Process(target=self.recover_platform_job_logs, args=(),
+                                                    name=f"{self.name}_log_recovery")
+                self.log_recovery_process.daemon = True
+                self.log_recovery_process.start()
+                os.waitpid(self.log_recovery_process.pid, os.WNOHANG)
+                Log.result(f"Process {self.log_recovery_process.name} started with pid {self.log_recovery_process.pid}")
+                atexit.register(self.send_cleanup_signal)
+
+    def send_cleanup_signal(self):
+        if self.log_recovery_process and self.log_recovery_process.is_alive():
+            self.work_event.clear()
+            self.cleanup_event.set()
+            self.log_recovery_process.join()
+
+    def wait_for_work(self, sleep_time=60):
+        """
+        Waits until there is work to process (work_event set or queue not empty) or the cleanup_event is set.
+        """
+        process_log = False
+        for _ in range(sleep_time, 0, -1):  # Minimum time to watch for work unless the clean-up signal arrives
+            time.sleep(1)
+            if self.work_event.is_set() or not self.recovery_queue.empty():
+                process_log = True
+            if self.cleanup_event.is_set():
+                process_log = True
+                break
+        if not process_log:  # No work yet: linger until keep_alive_timeout is reached or any signal arrives
+            timeout = self.keep_alive_timeout - sleep_time
+            while timeout > 0:
+                time.sleep(1)
+                timeout -= 1
+                if not self.recovery_queue.empty() or self.cleanup_event.is_set() or self.work_event.is_set():
+                    process_log = True
+                    break
+        self.work_event.clear()
+        return process_log
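wait_for_work has two phases: a mandatory watch window of sleep_time seconds, then an optional linger of up to keep_alive_timeout when nothing arrived. The same logic in a standalone, thread-based sketch (threading.Event stands in for multiprocessing.Event, and the queue check is folded into has_work):

    import threading
    import time

    def wait_for_work(work, cleanup, has_work, sleep_time=3, keep_alive=6):
        process = False
        for _ in range(sleep_time):      # phase 1: always watch for sleep_time seconds
            time.sleep(1)
            if work.is_set() or has_work():
                process = True
            if cleanup.is_set():
                process = True
                break
        if not process:                  # phase 2: linger up to the keep-alive budget
            budget = keep_alive - sleep_time
            while budget > 0:
                time.sleep(1)
                budget -= 1
                if work.is_set() or cleanup.is_set() or has_work():
                    process = True
                    break
        work.clear()
        return process                   # False tells the worker it may shut down

    # With no work and no signals, this returns False after ~keep_alive seconds.
    print(wait_for_work(threading.Event(), threading.Event(), has_work=lambda: False))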
+    def recover_job_log(self, identifier, jobs_pending_to_process):
+        job = None
+        try:
+            while not self.recovery_queue.empty():
+                try:
+                    job = self.recovery_queue.get(
+                        timeout=1)  # Should be non-empty, but a timeout covers other possible errors.
+                    job.children = set()  # Children can't be serialized, so use an empty set in this process.
+                    job.platform = self  # Switch the original platform for this process's platform.
+                    job._log_recovery_retries = 0  # Reset the log recovery retries.
+                    Log.debug(f"{identifier} Recovering log files for job {job.name} and retrial:{job.fail_count}")
+                    job.retrieve_logfiles(self, raise_error=True)
+                    if job.status == Status.FAILED:
+                        Log.result(f"{identifier} Successfully recovered log files for job {job.name} and retrial:{job.fail_count}")
+                except queue.Empty:
+                    pass
+            # This second loop keeps retrying the failed jobs.
+            while len(jobs_pending_to_process) > 0:  # Jobs that had any issue during the log retrieval
+                job = jobs_pending_to_process.pop()
+                job._log_recovery_retries += 1
+                Log.debug(
+                    f"{identifier} (Retrial number: {job._log_recovery_retries}) Recovering log files for job {job.name}")
+                job.retrieve_logfiles(self, raise_error=True)
+                Log.result(f"{identifier} (Retrial) Successfully recovered log files for job {job.name}")
+        except Exception as e:
+            Log.info(f"{identifier} Error while recovering logs: {str(e)}")
+            try:
+                if job and job._log_recovery_retries < 5:  # Requeue the failed job, but avoid retrying the same job forever.
+                    jobs_pending_to_process.add(job)
+                self.connected = False
+                Log.info(f"{identifier} Attempting to restore connection")
+                self.restore_connection(None)  # Always restore the connection on a failure.
+                Log.result(f"{identifier} Successfully reconnected.")
+            except Exception:
+                pass
+        return jobs_pending_to_process
+
+    def recover_platform_job_logs(self):
+        """
+        Recovers the logs of the jobs that have been submitted.
+        The exit of this process is controlled by the work_event and cleanup_event of the main process.
+        """
         setproctitle.setproctitle(f"autosubmit log {self.expid} recovery {self.name.lower()}")
-        job_names_processed = set()
+        identifier = f"{self.name.lower()}(log_recovery):"
+        Log.info(f"{identifier} Starting...")
+        jobs_pending_to_process = set()
         self.connected = False
         self.restore_connection(None)
-        # check if id of self.main_process exists with ps ax | grep self.main_process_id
-        max_logs_to_process = 60
-        while not event.is_set() and os.system(f"ps ax | grep {str(self.main_process_id)} | grep -v grep > /dev/null 2>&1") == 0:
-            time.sleep(60)
-            logs_processed = 0 # avoid deadlocks just in case
-            try:
-                while not self.recovery_queue.empty() and logs_processed < max_logs_to_process:
-                    logs_processed += 1
-                    job,children = self.recovery_queue.get(block=False)
-                    if job.wrapper_type != "vertical":
-                        if f'{job.name}_{job.fail_count}' in job_names_processed:
-                            continue
-                    else:
-                        if f'{job.name}' in job_names_processed:
-                            continue
-                    job.children = children
-                    job.platform = self
-                    if job.x11:
-                        Log.debug("Job {0} is an X11 job, skipping log retrieval as they're written in the ASLOGS".format(job.name))
-                        continue
-                    try:
-                        job.retrieve_logfiles(self, raise_error=True)
-                        if job.wrapper_type != "vertical":
-                            job_names_processed.add(f'{job.name}_{job.fail_count}')
-                        else:
-                            job_names_processed.add(f'{job.name}')
-                    except Exception:
-                        pass
-            except queue.Empty:
-                pass
-            except (IOError, OSError):
-                pass
-            except Exception as e:
-                try:
-                    self.restore_connection(None)
-                except Exception:
-                    pass
+        Log.get_logger("Autosubmit")  # Log needs to be initialised in the new process
+        Log.result(f"{identifier} Successfully connected.")
+        log_recovery_timeout = self.config.get("LOG_RECOVERY_TIMEOUT", 60)
+        self.keep_alive_timeout = max(log_recovery_timeout * 5, 60 * 5)
+        while self.wait_for_work(sleep_time=max(log_recovery_timeout, 60)):
+            jobs_pending_to_process = self.recover_job_log(identifier, jobs_pending_to_process)
+            if self.cleanup_event.is_set():  # Check whether the main process is waiting for this child to end
+                self.recover_job_log(identifier, jobs_pending_to_process)
+                break
+        Log.info(f"{identifier} Exiting.")
\ No newline at end of file
diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py
index 3a84e028317f3c64955c7e2fc0e3536e6d7e26ca..1884a67390f5b04d3794541e1c62fe6c7849589d 100644
--- a/autosubmit/platforms/slurmplatform.py
+++ b/autosubmit/platforms/slurmplatform.py
@@ -99,7 +99,7 @@ class SlurmPlatform(ParamikoPlatform):
         """
         try:
-            valid_packages_to_submit = [ package for package in valid_packages_to_submit if package.x11 is not True]
+            valid_packages_to_submit = [package for package in valid_packages_to_submit if package.x11 is not True]
             if len(valid_packages_to_submit) > 0:
                 duplicated_jobs_already_checked = False
                 platform = valid_packages_to_submit[0].jobs[0].platform
@@ -189,6 +189,8 @@ class SlurmPlatform(ParamikoPlatform):
                     job.hold = hold
                     job.id = str(jobs_id[i])
                     job.status = Status.SUBMITTED
+                    job.wrapper_name = package.name
+
                 # Check if there are duplicated jobnames
                 if not duplicated_jobs_already_checked:
                     job_name = package.name if hasattr(package, "name") else package.jobs[0].name
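For context, the surrounding submission code compensates for half-submitted batches: when submission fails midway, the ids that did go through are cancelled before retrying. A generic sketch of that pattern, with made-up submit and cancel callables rather than the platform's real commands:

    def submit_all_or_cancel(submit, cancel, scripts):
        """Submit a batch; on any failure, cancel what already went through."""
        submitted = []
        try:
            for script in scripts:
                submitted.append(submit(script))  # returns a job id
            return submitted
        except Exception:
            for job_id in submitted:
                try:
                    cancel(job_id)  # best effort, like self.cancel_job(id_)
                except Exception:
                    pass  # already gone or unreachable
            raise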
@@ -651,37 +653,30 @@ class SlurmPlatform(ParamikoPlatform):
     def allocated_nodes():
         return """os.system("scontrol show hostnames $SLURM_JOB_NODELIST > node_list_{0}".format(node_id))"""
 
-    def check_file_exists(self, filename, wrapper_failed=False, sleeptime=5, max_retries=3, first=True):
+    def check_file_exists(self, filename, wrapper_failed=False, sleeptime=5, max_retries=3):
         file_exist = False
         retries = 0
-        # Not first is meant for vertical_wrappers. There you have to download STAT_{MAX_LOGS} then STAT_{MAX_LOGS-1} and so on
-        if not first:
-            max_retries = 1
-            sleeptime = 0
         while not file_exist and retries < max_retries:
             try:
-                # This return IOError if path doesn't exist
+                # This raises IOError if the path doesn't exist
                 self._ftpChannel.stat(os.path.join(
                     self.get_files_path(), filename))
                 file_exist = True
             except IOError as e:  # File doesn't exist, retry in sleeptime
-                if first:
-                    Log.debug("{2} File does not exist.. waiting {0}s for a new retry (retries left: {1})", sleeptime,
-                              max_retries - retries, os.path.join(self.get_files_path(), filename))
                 if not wrapper_failed:
                     sleep(sleeptime)
-                    sleeptime = sleeptime + 5
                     retries = retries + 1
                 else:
-                    retries = 9999
+                    sleep(2)
+                    retries = retries + 1
             except BaseException as e:  # Unrecoverable error
                 if str(e).lower().find("garbage") != -1:
-                    if not wrapper_failed:
-                        sleep(sleeptime)
-                        sleeptime = sleeptime + 5
-                        retries = retries + 1
+                    sleep(sleeptime)
+                    sleeptime = sleeptime + 5
+                    retries = retries + 1
                 else:
-                    Log.printlog("remote logs {0} couldn't be recovered".format(filename), 6001)
                     file_exist = False  # won't exist
                     retries = 999  # no more retries
+        if not file_exist:
+            Log.warning("File {0} couldn't be found".format(filename))
         return file_exist
diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py
index 7ff5552db84966a83e7ce97260abe89d49076748..0135e2976c92fce6199bb34ed5813dfdfc964227 100644
--- a/autosubmit/platforms/wrappers/wrapper_builder.py
+++ b/autosubmit/platforms/wrappers/wrapper_builder.py
@@ -60,8 +60,10 @@ class WrapperBuilder(object):
         self.exit_thread = ''
         if "wallclock_by_level" in list(kwargs.keys()):
             self.wallclock_by_level = kwargs['wallclock_by_level']
+
     def build_header(self):
         return textwrap.dedent(self.header_directive) + self.build_imports()
+
     def build_imports(self):
         pass
     def build_job_thread(self):
@@ -153,7 +155,6 @@ class PythonWrapperBuilder(WrapperBuilder):
 
         def run(self):
             jobname = self.template.replace('.cmd', '')
-            #os.system("echo $(date +%s) > "+jobname+"_STAT")
             out = str(self.template) + ".out." + str(0)
             err = str(self.template) + ".err." + str(0)
             print(out+"\\n")
@@ -446,8 +447,11 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder):
         sequential_threads_launcher = textwrap.dedent("""
         failed_wrapper = os.path.join(os.getcwd(),wrapper_id)
         retrials = {2}
-        total_steps = 0
-        print("JOB.ID:"+ os.getenv('SLURM_JOBID'))
+        total_steps = 0
+        try:
+            print("JOB.ID:"+ os.getenv('SLURM_JOBID'))
+        except Exception:
+            print("JOB.ID")
         for i in range(len({0})):
             job_retrials = retrials
             completed = False
@@ -455,8 +459,7 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder):
             while fail_count <= job_retrials and not completed:
                 current = {1}
                 current.start()
-                timer = int(time.time())
-                os.system("echo "+str(timer)+" >> "+scripts[i][:-4]+"_STAT_"+str(fail_count)) #Completed
+                start = int(time.time())
                 current.join({3})
                 total_steps = total_steps + 1
         """).format(jobs_list, thread,self.retrials,str(self.wallclock_by_level),'\n'.ljust(13))
@@ -468,26 +471,42 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder):
             failed_filename = {0}[i].replace('.cmd', '_FAILED')
             failed_path = os.path.join(os.getcwd(), failed_filename)
             failed_wrapper = os.path.join(os.getcwd(), wrapper_id)
-            timer = int(time.time())
-            os.system("echo "+str(timer)+" >> "+scripts[i][:-4]+"_STAT_"+str(fail_count)) #Completed
+            finish = int(time.time())
+            stat_filename = {0}[i].replace(".cmd", f"_STAT_{{fail_count}}")
+            stat_path_tmp = os.path.join(os.getcwd(),f"{{stat_filename}}.tmp")
+            print(f"Completed_file:{{completed_path}}")
+            print(f"Writing:{{stat_path_tmp}}")
+            print(f"[Start:{{start}}, Finish:{{finish}}, Fail_count:{{fail_count}}]")
+            with open(f"{{stat_path_tmp}}", "w") as file:
+                file.write(f"{{start}}\\n")
+                file.write(f"{{finish}}\\n")
scripts[i][:-4]+"_STAT_"+str(fail_count)) else: print(datetime.now(), "The job ", current.template," has FAILED") - os.system("echo FAILED >> " + scripts[i][:-4]+"_STAT_"+str(fail_count)) #{1} fail_count = fail_count + 1 """).format(jobs_list, self.exit_thread, '\n'.ljust(13)), 8) sequential_threads_launcher += self._indent(textwrap.dedent(""" + from pathlib import Path + fail_count = 0 + while fail_count <= job_retrials: + try: + stat_filename = {0}[i].replace(".cmd", f"_STAT_{{fail_count}}") + stat_path_tmp = os.path.join(os.getcwd(),f"{{stat_filename}}.tmp") + Path(stat_path_tmp).replace(stat_path_tmp.replace(".tmp","")) + except: + print(f"Couldn't write the stat file:{{stat_path_tmp}}") + fail_count = fail_count + 1 if not os.path.exists(completed_path): open(failed_wrapper,'wb').close() open(failed_path, 'wb').close() if os.path.exists(failed_wrapper): os.remove(os.path.join(os.getcwd(),wrapper_id)) + print("WRAPPER_FAILED") wrapper_failed = os.path.join(os.getcwd(),"WRAPPER_FAILED") open(wrapper_failed, 'wb').close() os._exit(1) @@ -505,15 +524,17 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): self.fail_count = fail_count def run(self): + print("\\n") jobname = self.template.replace('.cmd', '') out = str(self.template) + ".out." + str(self.fail_count) err = str(self.template) + ".err." + str(self.fail_count) - print((out+"\\n")) - command = "./" + str(self.template) + " " + str(self.id_run) + " " + os.getcwd() - print((command+"\\n")) - (self.status) = getstatusoutput("timeout {0} " + command + " > " + out + " 2> " + err) - for i in self.status: - print((str(i)+"\\n")) + out_path = os.path.join(os.getcwd(), out) + err_path = os.path.join(os.getcwd(), err) + template_path = os.path.join(os.getcwd(), self.template) + command = f"timeout {0} {{template_path}} > {{out_path}} 2> {{err_path}}" + print(command) + getstatusoutput(command) + """).format(str(self.wallclock_by_level),'\n'.ljust(13)) diff --git a/autosubmit/platforms/wrappers/wrapper_factory.py b/autosubmit/platforms/wrappers/wrapper_factory.py index 613dcdaa1ce0ba82fcf26b5d689385f85bc8d192..1bd97c831c11119bab6bcb63bd5dc0dcd494803a 100644 --- a/autosubmit/platforms/wrappers/wrapper_factory.py +++ b/autosubmit/platforms/wrappers/wrapper_factory.py @@ -72,16 +72,16 @@ class WrapperFactory(object): return wrapper_cmd def vertical_wrapper(self, **kwargs): - raise NotImplementedError(self.exception) + raise NotImplemented(self.exception) def horizontal_wrapper(self, **kwargs): - raise NotImplementedError(self.exception) + raise NotImplemented(self.exception) def hybrid_wrapper_horizontal_vertical(self, **kwargs): - raise NotImplementedError(self.exception) + raise NotImplemented(self.exception) def hybrid_wrapper_vertical_horizontal(self, **kwargs): - raise NotImplementedError(self.exception) + raise NotImplemented(self.exception) def header_directives(self, **kwargs): pass @@ -124,21 +124,73 @@ class WrapperFactory(object): def reservation_directive(self, reservation): return '#' def dependency_directive(self, dependency): - raise NotImplementedError(self.exception) + raise NotImplemented(self.exception) def queue_directive(self, queue): - raise NotImplementedError(self.exception) + raise NotImplemented(self.exception) def processors_directive(self, processors): - raise NotImplementedError(self.exception) + raise NotImplemented(self.exception) def nodes_directive(self, nodes): - raise NotImplementedError(self.exception) + raise NotImplemented(self.exception) def tasks_directive(self, tasks): - raise 
diff --git a/autosubmit/platforms/wrappers/wrapper_factory.py b/autosubmit/platforms/wrappers/wrapper_factory.py
index 613dcdaa1ce0ba82fcf26b5d689385f85bc8d192..1bd97c831c11119bab6bcb63bd5dc0dcd494803a 100644
--- a/autosubmit/platforms/wrappers/wrapper_factory.py
+++ b/autosubmit/platforms/wrappers/wrapper_factory.py
@@ -124,21 +124,73 @@ class WrapperFactory(object):
     def reservation_directive(self, reservation):
         return '#'
 
     def dependency_directive(self, dependency):
         raise NotImplementedError(self.exception)
     def queue_directive(self, queue):
         raise NotImplementedError(self.exception)
     def processors_directive(self, processors):
         raise NotImplementedError(self.exception)
     def nodes_directive(self, nodes):
         raise NotImplementedError(self.exception)
     def tasks_directive(self, tasks):
         raise NotImplementedError(self.exception)
     def partition_directive(self, partition):
         raise NotImplementedError(self.exception)
     def exclusive_directive(self, exclusive):
         raise NotImplementedError(self.exception)
     def threads_directive(self, threads):
         raise NotImplementedError(self.exception)
+
+
+class LocalWrapperFactory(WrapperFactory):
+
+    def vertical_wrapper(self, **kwargs):
+        return PythonVerticalWrapperBuilder(**kwargs)
+
+    def horizontal_wrapper(self, **kwargs):
+
+        if kwargs["method"] == 'srun':
+            return SrunHorizontalWrapperBuilder(**kwargs)
+        else:
+            return PythonHorizontalWrapperBuilder(**kwargs)
+
+    def hybrid_wrapper_horizontal_vertical(self, **kwargs):
+        return PythonHorizontalVerticalWrapperBuilder(**kwargs)
+
+    def hybrid_wrapper_vertical_horizontal(self, **kwargs):
+        if kwargs["method"] == 'srun':
+            return SrunVerticalHorizontalWrapperBuilder(**kwargs)
+        else:
+            return PythonVerticalHorizontalWrapperBuilder(**kwargs)
+
+    def reservation_directive(self, reservation):
+        return '#'
+
+    def dependency_directive(self, dependency):
+        return '#'
+
+    def queue_directive(self, queue):
+        return '#'
+
+    def processors_directive(self, processors):
+        return '#'
+
+    def nodes_directive(self, nodes):
+        return '#'
+
+    def tasks_directive(self, tasks):
+        return '#'
+
+    def partition_directive(self, partition):
+        return '#'
+
+    def exclusive_directive(self, exclusive):
+        return '#'
+
+    def threads_directive(self, threads):
+        return '#'
+
+    def header_directives(self, **kwargs):
+        return ""
 
 
 class SlurmWrapperFactory(WrapperFactory):
diff --git a/test/unit/test_database_regression.py b/test/unit/test_database_regression.py
new file mode 100644
index 0000000000000000000000000000000000000000..44079537c5be3e1a520227af51fc0d7f7afd82ad
--- /dev/null
+++ b/test/unit/test_database_regression.py
@@ -0,0 +1,264 @@
+import shutil
+
+import pytest
+from pathlib import Path
+from autosubmit.autosubmit import Autosubmit
+from log.log import Log
+import os
+import pwd
+from autosubmit.platforms.locplatform import LocalPlatform
+
+from test.unit.utils.common import create_database, init_expid
+import sqlite3
+
+
+def _get_script_files_path() -> Path:
+    return Path(__file__).resolve().parent / 'files'
+
+
+# Maybe this should be a regression test
+
+@pytest.fixture
+def db_tmpdir(tmpdir_factory):
+    folder = tmpdir_factory.mktemp('db_tests')
+    os.mkdir(folder.join('scratch'))
+    os.mkdir(folder.join('db_tmp_dir'))
+    file_stat = os.stat(f"{folder.strpath}")
+    file_owner_id = file_stat.st_uid
+    file_owner = pwd.getpwuid(file_owner_id).pw_name
+    folder.owner = file_owner
+
+    # Write an autosubmitrc file in the temporary directory
+    autosubmitrc = folder.join('autosubmitrc')
+    autosubmitrc.write(f'''
+[database]
+path = {folder}
+filename = tests.db
+
+[local]
+path = {folder}
+
+[globallogs]
+path = {folder}
+
+[structures]
+path = {folder}
+
+[historicdb]
+path = {folder}
+
+[historiclog]
+path = {folder}
+
+[defaultstats]
+path = {folder}
+
+''')
+    os.environ['AUTOSUBMIT_CONFIGURATION'] = str(folder.join('autosubmitrc'))
+    create_database(str(folder.join('autosubmitrc')))
+    assert "tests.db" in [Path(f).name for f in folder.listdir()]
+    init_expid(str(folder.join('autosubmitrc')), platform='local', create=False, test_type='test')
+    assert "t000" in [Path(f).name for f in folder.listdir()]
+    return folder
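The fixture relies on AUTOSUBMIT_CONFIGURATION to point every Autosubmit path at the pytest sandbox. A hedged helper that mirrors the autosubmitrc layout written above; the section names are copied from the fixture and may differ in other Autosubmit versions:

    from pathlib import Path

    def write_autosubmitrc(root: Path) -> Path:
        """Build an autosubmitrc that confines Autosubmit to one sandbox dir."""
        sections = ["local", "globallogs", "structures",
                    "historicdb", "historiclog", "defaultstats"]
        text = f"[database]\npath = {root}\nfilename = tests.db\n\n"
        text += "".join(f"[{name}]\npath = {root}\n\n" for name in sections)
        rc = root / "autosubmitrc"
        rc.write_text(text)
        return rc

    # Usage: os.environ['AUTOSUBMIT_CONFIGURATION'] = str(write_autosubmitrc(tmp_root))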
+
+@pytest.fixture
+def prepare_db(db_tmpdir):
+    # Replace the generated files under t000/conf with the test configuration
+    conf_folder = Path(f"{db_tmpdir.strpath}/t000/conf")
+    shutil.rmtree(conf_folder)
+    os.makedirs(conf_folder)
+    platforms_path = Path(f"{db_tmpdir.strpath}/t000/conf/platforms.yml")
+    main_path = Path(f"{db_tmpdir.strpath}/t000/conf/main.yml")
+    # Add each platform to test
+    with platforms_path.open('w') as f:
+        f.write(f"""
+PLATFORMS:
+    dummy:
+        type: dummy
+    """)
+
+    with main_path.open('w') as f:
+        f.write(f"""
+EXPERIMENT:
+    # List of start dates
+    DATELIST: '20000101'
+    # List of members.
+    MEMBERS: fc0
+    # Unit of the chunk size. Can be hour, day, month, or year.
+    CHUNKSIZEUNIT: month
+    # Size of each chunk.
+    CHUNKSIZE: '4'
+    # Number of chunks of the experiment.
+    NUMCHUNKS: '3'
+    CHUNKINI: ''
+    # Calendar used for the experiment. Can be standard or noleap.
+    CALENDAR: standard
+
+CONFIG:
+    # Current version of Autosubmit.
+    AUTOSUBMIT_VERSION: ""
+    # Total number of jobs in the workflow.
+    TOTALJOBS: 20
+    # Maximum number of jobs permitted in the waiting status.
+    MAXWAITINGJOBS: 20
+    SAFETYSLEEPTIME: 1
+DEFAULT:
+    # Job experiment ID.
+    EXPID: "t000"
+    # Default HPC platform name.
+    HPCARCH: "local"
+    # hint: use %PROJDIR% to point to the project folder (where the project is cloned)
+    # Custom configuration location.
+project:
+    # Type of the project.
+    PROJECT_TYPE: None
+    # Folder to hold the project sources.
+    PROJECT_DESTINATION: local_project
+""")
+    expid_dir = Path(f"{db_tmpdir.strpath}/scratch/whatever/{db_tmpdir.owner}/t000")
+    dummy_dir = Path(f"{db_tmpdir.strpath}/scratch/whatever/{db_tmpdir.owner}/t000/dummy_dir")
+    real_data = Path(f"{db_tmpdir.strpath}/scratch/whatever/{db_tmpdir.owner}/t000/real_data")
+    # We write some dummy data inside the scratch_dir
+    os.makedirs(expid_dir, exist_ok=True)
+    os.makedirs(dummy_dir, exist_ok=True)
+    os.makedirs(real_data, exist_ok=True)
+
+    with open(dummy_dir.joinpath('dummy_file'), 'w') as f:
+        f.write('dummy data')
+    # create some dummy absolute symlinks in expid_dir to test migrate function
+    (real_data / 'dummy_symlink').symlink_to(dummy_dir / 'dummy_file')
+    return db_tmpdir
+
+
+@pytest.mark.parametrize("jobs_data, expected_count, final_status", [
+    # Success
+    ("""
+    JOBS:
+        job:
+            SCRIPT: |
+                echo "Hello World"
+                sleep 1
+            PLATFORM: local
+            DEPENDENCIES: job-1
+            RUNNING: chunk
+            wallclock: 00:01
+            retrials: 2
+    """, 1, "COMPLETED"),
+    # Success wrapper
+    ("""
+    JOBS:
+        job:
+            SCRIPT: |
+                echo "Hello World"
+                sleep 1
+            DEPENDENCIES: job-1
+            PLATFORM: local
+            RUNNING: chunk
+            wallclock: 00:01
+            retrials: 2
+    wrappers:
+        wrapper:
+            JOBS_IN_WRAPPER: job
+            TYPE: vertical
+    """, 1, "COMPLETED"),
+    # Failure
+    ("""
+    JOBS:
+        job:
+            SCRIPT: |
+                echo "Hello World"
+                sleep 1
+                exit 1
+            DEPENDENCIES: job-1
+            PLATFORM: local
+            RUNNING: chunk
+            wallclock: 00:01
+            retrials: 2
+    """, 3, "FAILED"),
+    # Failure wrappers
+    ("""
+    JOBS:
+        job:
+            SCRIPT: |
+                echo "Hello World"
+                sleep 1
+                exit 1
+            PLATFORM: local
+            DEPENDENCIES: job-1
+            RUNNING: chunk
+            wallclock: 00:10
+            retrials: 2
+    wrappers:
+        wrapper:
+            JOBS_IN_WRAPPER: job
+            TYPE: vertical
+    """, 3, "FAILED"),
+], ids=["Success", "Success with wrapper", "Failure", "Failure with wrapper"])
+def test_db(db_tmpdir, prepare_db, jobs_data, expected_count, final_status, mocker):
+    # write jobs_data
+    jobs_path = Path(f"{db_tmpdir.strpath}/t000/conf/jobs.yml")
+    with jobs_path.open('w') as f:
+        f.write(jobs_data)
+
+    # Create
+    init_expid(os.environ["AUTOSUBMIT_CONFIGURATION"], platform='local', expid='t000', create=True, test_type='test')
+
+    # This is set in _init_log, which is not called by this test
+    as_misc = Path(f"{db_tmpdir.strpath}/t000/conf/as_misc.yml")
+    with as_misc.open('w') as f:
+        f.write(f"""
+AS_MISC: True
+ASMISC:
+    COMMAND: run
+AS_COMMAND: run
+    """)
+
+    # Run the experiment (shorten the log-recovery waits by shadowing max in the platform module)
+    mocker.patch('autosubmit.platforms.platform.max', return_value=20, create=True)
+    Autosubmit.run_experiment(expid='t000')
+
+    # Check that the databases exist
+    job_data = Path(f"{db_tmpdir.strpath}/job_data_t000.db")
+    autosubmit_db = Path(f"{db_tmpdir.strpath}/tests.db")
+    assert job_data.exists()
+    assert autosubmit_db.exists()
+
+    # Check job_data info
+    conn = sqlite3.connect(job_data)
+    conn.row_factory = sqlite3.Row
+    c = conn.cursor()
+    c.execute("SELECT * FROM job_data")
+    rows = c.fetchall()
+    # Convert rows to a list of dictionaries
+    rows_as_dicts = [dict(row) for row in rows]
+    # Tune the print so it is more readable, to make failures easier to debug
+    column_names = rows_as_dicts[0].keys() if rows_as_dicts else []
+    column_widths = [max(len(str(row[col])) for row in rows_as_dicts + [dict(zip(column_names, column_names))]) for col
+                     in column_names]
+    print(f"Experiment folder: {db_tmpdir.strpath}")
+    header = " | ".join(f"{name:<{width}}" for name, width in zip(column_names, column_widths))
+    print(f"\n{header}")
+    print("-" * len(header))
+    # Print the rows (always, for debugging purposes)
+    for row_dict in rows_as_dicts:
+        print(" | ".join(f"{str(row_dict[col]):<{width}}" for col, width in zip(column_names, column_widths)))
+    for row_dict in rows_as_dicts:
+        # Check that all fields contain data, except extra_data, children, and platform_output
+        # Check that submit, start and finish are > 0 and not the epoch placeholder
+        assert int(row_dict["submit"]) > 0 and int(row_dict["submit"]) != 1970010101
+        assert int(row_dict["start"]) > 0 and int(row_dict["start"]) != 1970010101
+        assert int(row_dict["finish"]) > 0 and int(row_dict["finish"]) != 1970010101
+        assert row_dict["status"] == final_status
+        for key in [key for key in row_dict.keys() if
+                    key not in ["status", "finish", "submit", "start", "extra_data", "children", "platform_output"]]:
+            assert str(row_dict[key]) != ""
+    # Check that the job_data table has the expected number of entries
+    c.execute("SELECT job_name, COUNT(*) as count FROM job_data GROUP BY job_name")
+    count_rows = c.fetchall()
+    for row in count_rows:
+        assert int(row["count"]) == expected_count
+    # Close the cursor and connection
+    c.close()
+    conn.close()
diff --git a/test/unit/test_job_package.py b/test/unit/test_job_package.py
index 3808dba35bf988240d0511fadafb7da96b0ff546..6aa3723541e56d24c75b8452e4a4ac82fa2aec54 100644
--- a/test/unit/test_job_package.py
+++ b/test/unit/test_job_package.py
@@ -226,4 +226,4 @@ def test_jobs_in_wrapper_str(mock_as_conf):
     # Arrange
     current_wrapper = "current_wrapper"
     result = jobs_in_wrapper_str(mock_as_conf, current_wrapper)
-    assert result == "job1_job2_job3"
\ No newline at end of file
+    assert result == "job1&job2&job3"
\ No newline at end of file
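For context on the last hunk: the expectation now matches a '&'-separated join of the wrapper's job sections. A simplified stand-in consistent with that expectation; the real jobs_in_wrapper_str lives in Autosubmit and may differ in details:

    def jobs_in_wrapper_str(as_conf, current_wrapper):  # illustrative stand-in only
        jobs = as_conf.experiment_data["WRAPPERS"][current_wrapper]["JOBS_IN_WRAPPER"]
        return "&".join(str(jobs).split())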