diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 6bc39f8627db2dba04db63e5d975ad57b756e3e4..e27d4dfd4dde927b00853c6467a48b619bc50149 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -86,7 +86,7 @@ from typing import List import autosubmit.history.utils as HUtils import autosubmit.helpers.autosubmit_helper as AutosubmitHelper import autosubmit.statistics.utils as StatisticsUtils -from autosubmit.helpers.utils import proccess_id, terminate_child_process, check_jobs_file_exists +from autosubmit.helpers.utils import proccess_id, check_jobs_file_exists from contextlib import suppress @@ -191,7 +191,7 @@ class Autosubmit: default='DEBUG', type=str, help="sets file's log level.") parser.add_argument('-lc', '--logconsole', choices=('NO_LOG', 'INFO', 'WARNING', 'DEBUG'), - default='INFO', type=str, + default='DEBUG', type=str, help="sets console's log level") subparsers = parser.add_subparsers(dest='command') @@ -1686,25 +1686,6 @@ class Autosubmit: for job in job_list.get_job_list(): job.status = Status.WAITING - - @staticmethod - def terminate_child_process(expid, platform = None): - # get pid of the main process - pid = os.getpid() - # In case some one used 4.1.6 or 4.1.5 - process_ids = proccess_id(expid,"run", single_instance = False, platform = platform) - if process_ids: - for process_id in [ process_id for process_id in process_ids if process_id != pid]: - # force kill - os.kill(process_id, signal.SIGKILL) - process_ids = proccess_id(expid,"log", single_instance = False, platform = platform) - # 4.1.7 + - if process_ids: - for process_id in [ process_id for process_id in process_ids if process_id != pid]: - # force kill - os.kill(process_id, signal.SIGKILL) - - @staticmethod def terminate(all_threads): # Closing threads on Ctrl+C @@ -1926,7 +1907,7 @@ class Autosubmit: return exp_history @staticmethod def prepare_run(expid, notransitive=False, start_time=None, start_after=None, - run_only_members=None, recover = False, check_scripts= False): + run_only_members=None, recover = False, check_scripts= False, submitter=None): """ Prepare the run of the experiment. :param expid: a string with the experiment id. @@ -1935,6 +1916,7 @@ class Autosubmit: :param start_after: a string with the experiment id to start after. :param run_only_members: a string with the members to run. :param recover: a boolean to indicate if the experiment is recovering from a failure. + :param submitter: the actual loaded platforms if any :return: a tuple """ host = platform.node() @@ -1969,8 +1951,9 @@ class Autosubmit: # Loads the communication lib, always paramiko. # Paramiko is the only way to communicate with the remote machines. Previously we had also Saga. - submitter = Autosubmit._get_submitter(as_conf) - submitter.load_platforms(as_conf) + if not submitter: + submitter = Autosubmit._get_submitter(as_conf) + submitter.load_platforms(as_conf) # Tries to load the job_list from disk, discarding any changes in running time ( if recovery ). # Could also load a backup from previous iteration. 
# The submit ready functions will cancel all job submitted if one submitted in that iteration had issues, so it should be safe to recover from a backup without losing job ids @@ -2083,7 +2066,7 @@ class Autosubmit: Autosubmit.restore_platforms(platforms_to_test,as_conf=as_conf) return job_list, submitter , exp_history, host , as_conf, platforms_to_test, packages_persistence, False else: - return job_list, submitter , None, None, as_conf , platforms_to_test, packages_persistence, True + return job_list, submitter, None, None, as_conf, platforms_to_test, packages_persistence, True @staticmethod def get_iteration_info(as_conf,job_list): """ @@ -2166,12 +2149,13 @@ class Autosubmit: max_recovery_retrials = as_conf.experiment_data.get("CONFIG",{}).get("RECOVERY_RETRIALS",3650) # (72h - 122h ) recovery_retrials = 0 while job_list.get_active(): + for platform in platforms_to_test: # Main loop, there will be logs to recover + platform.work_event.set() for job in [job for job in job_list.get_job_list() if job.status == Status.READY]: job.update_parameters(as_conf, {}) did_run = True try: if Autosubmit.exit: - terminate_child_process(expid) Autosubmit.terminate(threading.enumerate()) if job_list.get_failed(): return 1 @@ -2277,7 +2261,7 @@ class Autosubmit: start_time, start_after, run_only_members, - recover=True) + recover=True, submitter = submitter) except AutosubmitError as e: recovery = False Log.result("Recover of job_list has fail {0}".format(e.message)) @@ -2331,33 +2315,33 @@ class Autosubmit: except BaseException as e: raise # If this happens, there is a bug in the code or an exception not-well caught Log.result("No more jobs to run.") - if not did_run and len(job_list.get_completed_without_logs()) > 0: - #connect to platforms + # search hint - finished run + for job in job_list.get_completed_failed_without_logs(): + job_list.update_log_status(job, as_conf) + job_list.save() + if not did_run and len(job_list.get_completed_failed_without_logs()) > 0: Log.info(f"Connecting to the platforms, to recover missing logs") submitter = Autosubmit._get_submitter(as_conf) submitter.load_platforms(as_conf) if submitter.platforms is None: raise AutosubmitCritical("No platforms configured!!!", 7014) - platforms = [value for value in submitter.platforms.values()] - Autosubmit.restore_platforms(platforms, as_conf=as_conf, expid=expid) - # Wait for all remaining threads of I/O, close remaining connections - # search hint - finished run + platforms_to_test = [value for value in submitter.platforms.values()] + Autosubmit.restore_platforms(platforms_to_test, as_conf=as_conf, expid=expid) Log.info("Waiting for all logs to be updated") - # get all threads - threads = threading.enumerate() - # print name - timeout = as_conf.experiment_data.get("CONFIG",{}).get("LAST_LOGS_TIMEOUT", 180) - for remaining in range(timeout, 0, -1): - if len(job_list.get_completed_without_logs()) == 0: - break - for job in job_list.get_completed_without_logs(): - job.platform = submitter.platforms[job.platform_name.upper()] - job_list.update_log_status(job, as_conf) - sleep(1) - if remaining % 10 == 0: - Log.info(f"Timeout: {remaining}") - - # Updating job data header with current information when experiment ends + if len(job_list.get_completed_failed_without_logs()) > 0: + for p in platforms_to_test: + if p.log_recovery_process: + p.cleanup_event.set() + for p in platforms_to_test: + if p.log_recovery_process: + p.log_recovery_process.join() + for job in job_list.get_completed_failed_without_logs(): + job_list.update_log_status(job, 
as_conf) + job_list.save() + if len(job_list.get_completed_failed_without_logs()) == 0: + Log.result(f"Autosubmit recovered all job logs.") + else: + Log.warning(f"Autosubmit couldn't recover the following job logs: {[job.name for job in job_list.get_completed_failed_without_logs()]}") try: exp_history = ExperimentHistory(expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) @@ -2368,7 +2352,6 @@ class Autosubmit: Autosubmit.database_fix(expid) except Exception as e: pass - terminate_child_process(expid) for platform in platforms_to_test: platform.closeConnection() if len(job_list.get_failed()) > 0: @@ -2383,13 +2366,10 @@ class Autosubmit: Log.warning("Database is locked") except (portalocker.AlreadyLocked, portalocker.LockException) as e: message = "We have detected that there is another Autosubmit instance using the experiment\n. Stop other Autosubmit instances that are using the experiment or delete autosubmit.lock file located on tmp folder" - terminate_child_process(expid) raise AutosubmitCritical(message, 7000) except AutosubmitCritical as e: - terminate_child_process(expid) raise except BaseException as e: - terminate_child_process(expid) raise finally: if profile: @@ -2527,12 +2507,6 @@ class Autosubmit: if error_message != "": raise AutosubmitCritical("Submission Failed due wrong configuration:{0}".format(error_message), 7014) - if not inspect: - for package in valid_packages_to_submit: - wrapper_time = None - for job in package.jobs: # if jobs > 1 == wrapped == same submission time - job.write_submit_time(wrapper_submit_time=wrapper_time) - wrapper_time = job.submit_time_timestamp if wrapper_errors and not any_job_submitted and len(job_list.get_in_queue()) == 0: # Deadlock situation @@ -6094,7 +6068,6 @@ class Autosubmit: if status in Status.VALUE_TO_KEY.values(): job.status = Status.KEY_TO_VALUE[status] job_list.save() - terminate_child_process(expid) diff --git a/autosubmit/helpers/utils.py b/autosubmit/helpers/utils.py index 7ccca9cf89a6e60664192a305baa158ce13aa0f5..4f05c7676049e9e5ffafe5498426ace74b3d862f 100644 --- a/autosubmit/helpers/utils.py +++ b/autosubmit/helpers/utils.py @@ -34,21 +34,6 @@ def check_jobs_file_exists(as_conf, current_section_name=None): if missing_files: raise AutosubmitCritical(f"Templates not found:\n{missing_files}", 7011) -def terminate_child_process(expid, platform=None): - # get pid of the main process - pid = os.getpid() - # In case someone used 4.1.6 or 4.1.5 - process_ids = proccess_id(expid, "run", single_instance=False, platform=platform) - if process_ids: - for process_id in [process_id for process_id in process_ids if process_id != pid]: - # force kill - os.kill(process_id, signal.SIGKILL) - process_ids = proccess_id(expid, "log", single_instance=False, platform=platform) - # 4.1.7 + - if process_ids: - for process_id in [process_id for process_id in process_ids if process_id != pid]: - # force kill - os.kill(process_id, signal.SIGKILL) def proccess_id(expid=None, command="run", single_instance=True, platform=None): # Retrieve the process id of the autosubmit process diff --git a/autosubmit/history/database_managers/experiment_history_db_manager.py b/autosubmit/history/database_managers/experiment_history_db_manager.py index 8df415c94682435e781588f939a850301bf784f3..1e8127bef01a839c7fcaec13157bf5d244a8a3e6 100644 --- a/autosubmit/history/database_managers/experiment_history_db_manager.py +++ b/autosubmit/history/database_managers/experiment_history_db_manager.py @@ -215,6 +215,11 @@ class 
ExperimentHistoryDbManager(DatabaseManager): """ Update JobData data class. Returns latest last=1 row from job_data by job_name. """ self._update_job_data_by_id(job_data_dc) return self.get_job_data_dc_unique_latest_by_job_name(job_data_dc.job_name) + + def update_job_data_dc_by_job_id_name(self, job_data_dc): + """ Update JobData data class. Returns latest last=1 row from job_data by job_name. """ + self._update_job_data_by_id(job_data_dc) + return self.get_job_data_by_job_id_name(job_data_dc.job_id, job_data_dc.job_name) def update_list_job_data_dc_by_each_id(self, job_data_dcs): """ Return length of updated list. """ @@ -319,11 +324,11 @@ class ExperimentHistoryDbManager(DatabaseManager): statement = ''' UPDATE job_data SET last=?, submit=?, start=?, finish=?, modified=?, job_id=?, status=?, energy=?, extra_data=?, nnodes=?, ncpus=?, rowstatus=?, out=?, err=?, - children=?, platform_output=? WHERE id=? ''' - arguments = (job_data_dc.last, job_data_dc.submit, job_data_dc.start, job_data_dc.finish, HUtils.get_current_datetime(), - job_data_dc.job_id, job_data_dc.status, job_data_dc.energy, job_data_dc.extra_data, - job_data_dc.nnodes, job_data_dc.ncpus, job_data_dc.rowstatus, job_data_dc.out, job_data_dc.err, - job_data_dc.children, job_data_dc.platform_output, job_data_dc._id) + children=?, platform_output=?, id=? WHERE id=?''' + arguments = (job_data_dc.last, job_data_dc.submit, job_data_dc.start, job_data_dc.finish, HUtils.get_current_datetime(), + job_data_dc.job_id, job_data_dc.status, job_data_dc.energy, job_data_dc.extra_data, + job_data_dc.nnodes, job_data_dc.ncpus, job_data_dc.rowstatus, job_data_dc.out, job_data_dc.err, + job_data_dc.children, job_data_dc.platform_output, job_data_dc._id, job_data_dc._id) self.execute_statement_with_arguments_on_dbfile(self.historicaldb_file_path, statement, arguments) def _update_experiment_run(self, experiment_run_dc): @@ -346,13 +351,14 @@ class ExperimentHistoryDbManager(DatabaseManager): job_data_rows = self.get_from_statement_with_arguments(self.historicaldb_file_path, statement, arguments) return [Models.JobDataRow(*row) for row in job_data_rows] - def get_job_data_by_name(self, job_name): - """ Get List of Models.JobDataRow for job_name """ - statement = self.get_built_select_statement("job_data", "job_name=? ORDER BY counter DESC") - arguments = (job_name,) + def get_job_data_by_job_id_name(self, job_id, job_name): + """ Get List of Models.JobDataRow for job_id """ + statement = self.get_built_select_statement("job_data", "job_id=? AND job_name=? ORDER BY counter") + arguments = (int(job_id), str(job_name),) job_data_rows = self.get_from_statement_with_arguments(self.historicaldb_file_path, statement, arguments) - return [Models.JobDataRow(*row) for row in job_data_rows] - + models = [Models.JobDataRow(*row) for row in job_data_rows][-1] + return JobData.from_model(models) + def get_job_data_max_counter(self): """ The max counter is the maximum count value for the count column in job_data. 
""" statement = "SELECT MAX(counter) as maxcounter FROM job_data" diff --git a/autosubmit/history/experiment_history.py b/autosubmit/history/experiment_history.py index 5fd081600ba82f151b9c7e5d42a0b711ce26cf82..8dcf5d068bb84255c9a5c86866ae9d5c8c02f6a2 100644 --- a/autosubmit/history/experiment_history.py +++ b/autosubmit/history/experiment_history.py @@ -97,23 +97,7 @@ class ExperimentHistory: member="", section="", chunk=0, platform="NA", job_id=0, wrapper_queue=None, wrapper_code=None, children=""): try: - job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name) - if not job_data_dc_last: - job_data_dc_last = self.write_submit_time(job_name=job_name, - status=status, - ncpus=ncpus, - wallclock=wallclock, - qos=qos, - date=date, - member=member, - section=section, - chunk=chunk, - platform=platform, - job_id=job_id, - wrapper_queue=wrapper_queue, - wrapper_code=wrapper_code) - self._log.log("write_start_time {0} start not found.".format(job_name)) - job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name) + job_data_dc_last = self.manager.get_job_data_by_job_id_name(job_id, job_name) if not job_data_dc_last: raise Exception("Job {0} has not been found in the database.".format(job_name)) job_data_dc_last.start = start @@ -122,7 +106,7 @@ class ExperimentHistory: job_data_dc_last.rowtype = self._get_defined_rowtype(wrapper_code) job_data_dc_last.job_id = job_id job_data_dc_last.children = children - return self.manager.update_job_data_dc_by_id(job_data_dc_last) + return self.manager.update_job_data_dc_by_job_id_name(job_data_dc_last) except Exception as exp: self._log.log(str(exp), traceback.format_exc()) Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') @@ -131,24 +115,7 @@ class ExperimentHistory: member="", section="", chunk=0, platform="NA", job_id=0, out_file=None, err_file=None, wrapper_queue=None, wrapper_code=None, children=""): try: - job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name) - if not job_data_dc_last: - job_data_dc_last = self.write_submit_time(job_name=job_name, - status=status, - ncpus=ncpus, - wallclock=wallclock, - qos=qos, - date=date, - member=member, - section=section, - chunk=chunk, - platform=platform, - job_id=job_id, - wrapper_queue=wrapper_queue, - wrapper_code=wrapper_code, - children=children) - self._log.log("write_finish_time {0} submit not found.".format(job_name)) - job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name) + job_data_dc_last = self.manager.get_job_data_by_job_id_name(job_id, job_name) if not job_data_dc_last: raise Exception("Job {0} has not been found in the database.".format(job_name)) job_data_dc_last.finish = finish if finish > 0 else int(time()) @@ -157,7 +124,8 @@ class ExperimentHistory: job_data_dc_last.rowstatus = Models.RowStatus.PENDING_PROCESS job_data_dc_last.out = out_file if out_file else "" job_data_dc_last.err = err_file if err_file else "" - return self.manager.update_job_data_dc_by_id(job_data_dc_last) + return self.manager.update_job_data_dc_by_job_id_name(job_data_dc_last) + except Exception as exp: self._log.log(str(exp), traceback.format_exc()) Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 0f5085f1f084658ada59d93a739f2f95622a990e..f9ce861d7c797b92629bcc50c5054c3ff5f1582e 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -22,6 +22,7 @@ Main module for Autosubmit. 
Only contains an interface class to all functionalit """ from collections import OrderedDict +from pathlib import Path from autosubmit.job import job_utils from contextlib import suppress @@ -207,6 +208,7 @@ class Job(object): self._local_logs = ('', '') self._remote_logs = ('', '') self.script_name = self.name + ".cmd" + self.stat_file = self.script_name[:-4] + "_STAT_0" self.status = status self.prev_status = status self.old_status = self.status @@ -248,12 +250,16 @@ class Job(object): # hetjobs self.het = None self.updated_log = False - self.ready_start_date = None self.log_retrieved = False - self.start_time_written = False self.submit_time_timestamp = None # for wrappers, all jobs inside a wrapper are submitted at the same time + self.start_time_timestamp = None self.finish_time_timestamp = None # for wrappers, with inner_retrials, the submission time should be the last finish_time of the previous retrial self._script = None # Inline code to be executed + self._log_recovery_retries = None + self.ready_date = None + self.wrapper_name = None + self.is_wrapper = False + def _init_runtime_parameters(self): # hetjobs self.het = {'HETSIZE': 0} @@ -267,8 +273,10 @@ class Job(object): self._memory = '' self._memory_per_task = '' self.log_retrieved = False - self.start_time_placeholder = "" + self.start_time_timestamp = time.time() + self.end_time_placeholder = time.time() self.processors_per_node = "" + self.stat_file = self.script_name[:-4] + "_STAT_0" @property @@ -871,6 +879,13 @@ class Job(object): """Number of processors per node that the job can use.""" self._processors_per_node = value + def set_ready_date(self): + """ + Sets the ready start date for the job + """ + self.updated_log = False + self.ready_date = int(time.strftime("%Y%m%d%H%M%S")) + def inc_fail_count(self): """ Increments fail count @@ -974,7 +989,7 @@ class Job(object): :rtype: int """ if fail_count == -1: - logname = os.path.join(self._tmp_path, self.name + '_STAT') + logname = os.path.join(self._tmp_path, self.stat_file) else: fail_count = str(fail_count) logname = os.path.join(self._tmp_path, self.name + '_STAT_' + fail_count) @@ -1115,64 +1130,77 @@ class Job(object): def retrieve_internal_retrials_logfiles(self, platform): log_retrieved = False - original = copy.deepcopy(self.local_logs) - for i in range(0, int(self.retrials + 1)): - if i > 0: - self.local_logs = (original[0][:-4] + "_{0}".format(i) + ".out", original[1][:-4] + "_{0}".format(i) + ".err") - self.remote_logs = self.get_new_remotelog_name(i) - if not self.remote_logs: - self.log_retrieved = False - else: + last_retrial = 0 + try: + for i in range(0, int(self.retrials + 1)): + self.update_local_logs(count=i, update_submit_time=False) + backup_log = copy.copy(self.remote_logs) + self.remote_logs = self.get_new_remotelog_name(i) if self.check_remote_log_exists(platform): - try: - self.synchronize_logs(platform, self.remote_logs, self.local_logs) - remote_logs = copy.deepcopy(self.local_logs) - platform.get_logs_files(self.expid, remote_logs) - log_retrieved = True - except BaseException: - log_retrieved = False - self.log_retrieved = log_retrieved + self.synchronize_logs(platform, self.remote_logs, self.local_logs) + remote_logs = copy.deepcopy(self.local_logs) + platform.get_logs_files(self.expid, remote_logs) + log_retrieved = True + last_retrial = i + else: + self.remote_logs = backup_log + break + except: + pass + self.log_retrieved = log_retrieved + if self.log_retrieved: + self.platform.processed_wrapper_logs.add(self.wrapper_name) + + return 
last_retrial + + def write_stats(self, last_retrial): + # write stats + if self.wrapper_type == "vertical": # Disable AS retrials for vertical wrappers to use internal ones + for i in range(0, int(last_retrial + 1)): + self.platform.get_stat_file(self, count=i) + self.write_vertical_time(i) + self.inc_fail_count() + # Update the logs with Autosubmit Job ID Brand + try: + for local_log in self.local_logs: + self.platform.write_jobid(self.id, os.path.join( + self._tmp_path, 'LOG_' + str(self.expid), local_log)) + except BaseException as e: + Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format(str(e), self.name)) + else: + self.update_local_logs(update_submit_time=False) + self.platform.get_stat_file(self) + self.write_submit_time() + self.write_start_time() + self.write_end_time(self.status == Status.COMPLETED) + Log.result(f"{self.fail_count} retrials of job:{self.name} and {self.id} has been inserted in the db") + # Update the logs with Autosubmit Job ID Brand + try: + for local_log in self.local_logs: + self.platform.write_jobid(self.id, os.path.join( + self._tmp_path, 'LOG_' + str(self.expid), local_log)) + except BaseException as e: + Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format(str(e), self.name)) + def retrieve_logfiles(self, platform, raise_error=False): """ - Retrieves log files from remote host meant to be used inside a process. - :param platform: platform that is calling the function, already connected. - :param raise_error: boolean to raise an error if the logs are not retrieved - :return: + Retrieves log files from remote host + :param platform: HPCPlatform object + :param raise_error: boolean, if True, raises an error if the log files are not retrieved + :return: dict with finish timestamps per job """ backup_logname = copy.copy(self.local_logs) - if self.wrapper_type == "vertical": - stat_file = self.script_name[:-4] + "_STAT_" - self.retrieve_internal_retrials_logfiles(platform) + last_retrial = self.retrieve_internal_retrials_logfiles(platform) else: - stat_file = self.script_name[:-4] + "_STAT" self.retrieve_external_retrials_logfiles(platform) - + last_retrial = 0 if not self.log_retrieved: self.local_logs = backup_logname - if raise_error: + if raise_error and self.wrapper_name not in self.platform.processed_wrapper_logs: raise AutosubmitCritical("Failed to retrieve logs for job {0}".format(self.name), 6000) - else: - Log.printlog("Failed to retrieve logs for job {0}".format(self.name), 6000) - else: - # Update the logs with Autosubmit Job ID Brand - try: - for local_log in self.local_logs: - platform.write_jobid(self.id, os.path.join( - self._tmp_path, 'LOG_' + str(self.expid), local_log)) - except BaseException as e: - Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format(str(e), self.name)) - # write stats - if self.wrapper_type == "vertical": # Disable AS retrials for vertical wrappers to use internal ones - for i in range(0,int(self.retrials+1)): - if self.platform.get_stat_file(self.name, stat_file, count=i): - self.write_vertical_time(i) - self.inc_fail_count() - else: - self.platform.get_stat_file(self.name, stat_file) - self.write_start_time(from_stat_file=True) - self.write_end_time(self.status == Status.COMPLETED) + self.write_stats(last_retrial) def parse_time(self,wallclock): regex = re.compile(r'(((?P\d+):)((?P\d+)))(:(?P\d+))?') @@ -1252,11 +1280,6 @@ class Job(object): Log.result("Job {0} is COMPLETED", self.name) elif self.status == Status.FAILED: if not failed_file: - Log.printlog("Job {0} is FAILED. 
Checking completed files to confirm the failure...".format( - self.name), 3000) - self._platform.get_completed_files( - self.name, wrapper_failed=self.packed) - self.check_completion() if self.status == Status.COMPLETED: Log.result("Job {0} is COMPLETED", self.name) else: @@ -1276,12 +1299,6 @@ class Job(object): # after checking the jobs , no job should have the status "submitted" Log.printlog("Job {0} in SUBMITTED status. This should never happen on this step..".format( self.name), 6008) - if self.status in [Status.COMPLETED, Status.FAILED]: - self.updated_log = False - - # # Write start_time() if not already written and job is running, completed or failed - # if self.status in [Status.RUNNING, Status.COMPLETED, Status.FAILED] and not self.start_time_written: - # self.write_start_time() # Updating logs if self.status in [Status.COMPLETED, Status.FAILED, Status.UNKNOWN]: @@ -1289,10 +1306,6 @@ class Job(object): self.retrieve_logfiles(self.platform) else: self.platform.add_job_to_log_recover(self) - - - - return self.status @staticmethod @@ -1325,16 +1338,16 @@ class Job(object): :param default_status: status to set if job is not completed. By default, is FAILED :type default_status: Status """ - log_name = os.path.join(self._tmp_path, self.name + '_COMPLETED') + completed_file = os.path.join(self._tmp_path, self.name + '_COMPLETED') + completed_file_location = os.path.join(self._tmp_path, f"LOG_{self.expid}", self.name + '_COMPLETED') - if os.path.exists(log_name): + if os.path.exists(completed_file) or os.path.exists(completed_file_location): if not over_wallclock: self.status = Status.COMPLETED else: return Status.COMPLETED else: - Log.printlog("Job {0} completion check failed. There is no COMPLETED file".format( - self.name), 6009) + Log.warning(f"Couldn't find {self.name} COMPLETED file") if not over_wallclock: self.status = default_status else: @@ -1875,6 +1888,11 @@ class Job(object): self.shape = as_conf.jobs_data[self.section].get("SHAPE", "") self.script = as_conf.jobs_data[self.section].get("SCRIPT", "") self.x11 = False if str(as_conf.jobs_data[self.section].get("X11", False)).lower() == "false" else True + if self.wrapper_type != "vertical" and self.packed: + self.stat_file = f"{self.script_name[:-4]}_STAT_{self.fail_count}" + else: + self.stat_file = f"{self.script_name[:-4]}_STAT_0" + if self.checkpoint: # To activate placeholder sustitution per in the template parameters["AS_CHECKPOINT"] = self.checkpoint parameters['JOBNAME'] = self.name @@ -1954,6 +1972,8 @@ class Job(object): """ as_conf.reload() self._init_runtime_parameters() + if hasattr(self, "start_time"): + self.start_time = time.time() # Parameters that affect to all the rest of parameters self.update_dict_parameters(as_conf) parameters = parameters.copy() @@ -1977,6 +1997,9 @@ class Job(object): # For some reason, there is return but the assignee is also necessary self.parameters = parameters # This return is only being used by the mock , to change the mock + for event in self.platform.worker_events: # keep alive log retrieval workers. 
+ if not event.is_set(): + event.set() return parameters def update_content_extra(self,as_conf,files): @@ -2220,29 +2243,20 @@ class Job(object): str(set(parameters) - set(variables))), 5013) return out - def write_submit_time(self, hold=False, enable_vertical_write=False, wrapper_submit_time=None): + def update_local_logs(self, count=-1, update_submit_time=True): + if update_submit_time: + self.submit_time_timestamp = date2str(datetime.datetime.now(), 'S') + if count > 0: + self.local_logs = (f"{self.name}.{self.submit_time_timestamp}.out_retrial_{count}", + f"{self.name}.{self.submit_time_timestamp}.err_retrial_{count}") + else: + self.local_logs = (f"{self.name}.{self.submit_time_timestamp}.out", + f"{self.name}.{self.submit_time_timestamp}.err") + + def write_submit_time(self): """ Writes submit date and time to TOTAL_STATS file. It doesn't write if hold is True. """ - - self.start_time_written = False - if not enable_vertical_write: - if wrapper_submit_time: - self.submit_time_timestamp = wrapper_submit_time - else: - self.submit_time_timestamp = date2str(datetime.datetime.now(), 'S') - if self.wrapper_type != "vertical": - self.local_logs = (f"{self.name}.{self.submit_time_timestamp}.out", f"{self.name}.{self.submit_time_timestamp}.err") # for wrappers with inner retrials - else: - self.local_logs = (f"{self.name}.{self.submit_time_timestamp}.out", - f"{self.name}.{self.submit_time_timestamp}.err") # for wrappers with inner retrials - return - if self.wrapper_type == "vertical" and self.fail_count > 0: - self.submit_time_timestamp = self.finish_time_timestamp - print(("Call from {} with status {}".format(self.name, self.status_str))) - if hold is True: - return # Do not write for HELD jobs. - data_time = ["",int(datetime.datetime.strptime(self.submit_time_timestamp, "%Y%m%d%H%M%S").timestamp())] path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') if os.path.exists(path): @@ -2259,57 +2273,56 @@ class Job(object): platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name), children=self.children_names_str) - def write_start_time(self, enable_vertical_write=False, from_stat_file=False, count=-1): + def update_start_time(self, count=-1): + start_time_ = self.check_start_time(count) # last known start time from the .cmd file + if start_time_: + self.start_time_timestamp = start_time_ + else: + Log.warning(f"Start time for job {self.name} not found in the .cmd file, using last known time.") + self.start_time_timestamp = self.start_time_timestamp if self.start_time_timestamp else time.time() + if count > 0 or self.wrapper_name in self.platform.processed_wrapper_logs: + self.submit_time_timestamp = date2str(datetime.datetime.fromtimestamp(self.start_time_timestamp),'S') + + def write_start_time(self, count=-1, vertical_wrapper=False): """ Writes start date and time to TOTAL_STATS file :return: True if successful, False otherwise :rtype: bool """ - - if not enable_vertical_write and self.wrapper_type == "vertical": - return - - self.start_time_written = True - if not from_stat_file: # last known start time from AS - self.start_time_placeholder = time.time() - elif from_stat_file: - start_time_ = self.check_start_time(count) # last known start time from the .cmd file - if start_time_: - start_time = start_time_ - else: - start_time = self.start_time_placeholder if self.start_time_placeholder else time.time() - path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') - f = open(path, 'a') - 
f.write(' ') - # noinspection PyTypeChecker - f.write(date2str(datetime.datetime.fromtimestamp(start_time), 'S')) - # Writing database - exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) - exp_history.write_start_time(self.name, start=start_time, status=Status.VALUE_TO_KEY.get(self.status, "UNKNOWN"), ncpus=self.processors, - wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk, - platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name), - children=self.children_names_str) + if not vertical_wrapper: + self.update_start_time(count) + path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') + f = open(path, 'a') + f.write(' ') + # noinspection PyTypeChecker + f.write(date2str(datetime.datetime.fromtimestamp(self.start_time_timestamp), 'S')) + # Writing database + exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) + exp_history.write_start_time(self.name, start=self.start_time_timestamp, status=Status.VALUE_TO_KEY.get(self.status, "UNKNOWN"), ncpus=self.processors, + wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk, + platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name), + children=self.children_names_str) return True def write_vertical_time(self, count=-1): - self.write_submit_time(enable_vertical_write=True) - self.write_start_time(enable_vertical_write=True, from_stat_file=True, count=count) - self.write_end_time(self.status == Status.COMPLETED, enable_vertical_write=True, count=count) + self.update_start_time(count=count) + self.update_local_logs(update_submit_time=False) + self.write_submit_time() + self.write_start_time(count=count, vertical_wrapper=True) + self.write_end_time(self.status == Status.COMPLETED, count=count) - def write_end_time(self, completed, enable_vertical_write=False, count = -1): + def write_end_time(self, completed, count=-1): """ - Writes ends date and time to TOTAL_STATS file - :param completed: True if job was completed successfully, False otherwise + Writes end timestamp to TOTAL_STATS file and jobs_data.db + :param completed: True if the job has been completed, False otherwise :type completed: bool + :param count: number of retrials + :type count: int """ - if not enable_vertical_write and self.wrapper_type == "vertical": - return end_time = self.check_end_time(count) path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') f = open(path, 'a') f.write(' ') - finish_time = None - final_status = None if end_time > 0: # noinspection PyTypeChecker f.write(date2str(datetime.datetime.fromtimestamp(float(end_time)), 'S')) @@ -2319,7 +2332,7 @@ class Job(object): else: f.write(date2str(datetime.datetime.now(), 'S')) self.finish_time_timestamp = date2str(datetime.datetime.now(), 'S') - finish_time = time.time() # date2str(datetime.datetime.now(), 'S') + finish_time = time.time() f.write(' ') if completed: final_status = "COMPLETED" @@ -2328,7 +2341,6 @@ class Job(object): final_status = "FAILED" f.write('FAILED') out, err = self.local_logs - path_out = os.path.join(self._tmp_path, 'LOG_' + str(self.expid), out) # Launch first as simple non-threaded function exp_history = 
ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) job_data_dc = exp_history.write_finish_time(self.name, finish=finish_time, status=final_status, ncpus=self.processors, @@ -2342,6 +2354,7 @@ class Job(object): thread_write_finish.name = "JOB_data_{}".format(self.name) thread_write_finish.start() + def write_total_stat_by_retries(self, total_stats, first_retrial = False): """ Writes all data to TOTAL_STATS file @@ -2490,6 +2503,8 @@ class WrapperJob(Job): self.checked_time = datetime.datetime.now() self.hold = hold self.inner_jobs_running = list() + self.is_wrapper = True + def _queuing_reason_cancel(self, reason): try: @@ -2525,6 +2540,7 @@ class WrapperJob(Job): self._check_running_jobs() # Check and update inner_jobs status that are eligible # Completed wrapper will always come from check function. elif self.status == Status.COMPLETED: + self._check_running_jobs() # Check and update inner_jobs status that are eligible self.check_inner_jobs_completed(self.job_list) # Fail can come from check function or running/completed checkers. @@ -2570,8 +2586,16 @@ class WrapperJob(Job): job.new_status = Status.COMPLETED job.updated_log = False job.update_status(self.as_config) + # for job in self.job_list: + # if job not in completed_jobs and job in self.inner_jobs_running: + # job.new_status = Status.FAILED + # job.packed = False + # else: + # job.new_status = Status.WAITING + # job.packed = False for job in completed_jobs: self.running_jobs_start.pop(job, None) + not_completed_jobs = list( set(not_completed_jobs) - set(completed_jobs)) @@ -2647,6 +2671,7 @@ class WrapperJob(Job): not_finished_jobs_names = ' '.join(list(not_finished_jobs_dict.keys())) remote_log_dir = self._platform.get_remote_log_dir() # PREPARE SCRIPT TO SEND + # When a inner_job is running? When the job has an _STAT file command = textwrap.dedent(""" cd {1} for job in {0} @@ -2669,9 +2694,12 @@ class WrapperJob(Job): os.chmod(log_dir, 0o770) open(multiple_checker_inner_jobs, 'w+').write(command) os.chmod(multiple_checker_inner_jobs, 0o770) - self._platform.send_file(multiple_checker_inner_jobs, False) - command = os.path.join( - self._platform.get_files_path(), "inner_jobs_checker.sh") + if self.platform.name != "local": # already "sent"... 
+ self._platform.send_file(multiple_checker_inner_jobs, False) + command = f"cd {self._platform.get_files_path()}; {os.path.join(self._platform.get_files_path(), 'inner_jobs_checker.sh')}" + else: + command = os.path.join( + self._platform.get_files_path(), "inner_jobs_checker.sh") # wait = 2 retries = 5 @@ -2712,9 +2740,9 @@ class WrapperJob(Job): if content == '': sleep(wait) retries = retries - 1 - temp_list = self.inner_jobs_running - self.inner_jobs_running = [ - job for job in temp_list if job.status == Status.RUNNING] + # temp_list = self.inner_jobs_running + # self.inner_jobs_running = [ + # job for job in temp_list if job.status == Status.RUNNING] if retries == 0 or over_wallclock: self.status = Status.FAILED @@ -2743,7 +2771,7 @@ class WrapperJob(Job): running_jobs += [job for job in self.job_list if job.status == Status.READY or job.status == Status.SUBMITTED or job.status == Status.QUEUING] self.inner_jobs_running = list() for job in running_jobs: - if job.platform.check_file_exists('{0}_FAILED'.format(job.name), wrapper_failed=True): + if job.platform.check_file_exists('{0}_FAILED'.format(job.name), wrapper_failed=True, max_retries=2): if job.platform.get_file('{0}_FAILED'.format(job.name), False, wrapper_failed=True): self._check_finished_job(job, True) else: @@ -2751,21 +2779,28 @@ class WrapperJob(Job): self.inner_jobs_running.append(job) def cancel_failed_wrapper_job(self): - Log.printlog("Cancelling job with id {0}".format(self.id), 6009) try: - self._platform.send_command( - self._platform.cancel_cmd + " " + str(self.id)) + if self.platform_name == "local": + # Check if the job is still running to avoid a misleading message in the logs + if self.platform.get_pscall(self.id): + self._platform.send_command( + self._platform.cancel_cmd + " " + str(self.id)) + else: + Log.warning(f"Wrapper {self.name} failed, cancelling it") + self._platform.send_command( + self._platform.cancel_cmd + " " + str(self.id)) except: - Log.info(f'Job with {self.id} was finished before canceling it') - + Log.debug(f'Job with {self.id} was finished before canceling it') + self._check_running_jobs() + for job in self.inner_jobs_running: + job.status = Status.FAILED for job in self.job_list: - #if job.status == Status.RUNNING: - #job.inc_fail_count() - # job.packed = False - # job.status = Status.FAILED + job.packed = False if job.status not in [Status.COMPLETED, Status.FAILED]: - job.packed = False job.status = Status.WAITING + else: + if job.wrapper_type == "vertical": # job is being retrieved internally by the wrapper + job.fail_count = job.retrials def _update_completed_jobs(self): diff --git a/autosubmit/job/job_common.py b/autosubmit/job/job_common.py index 92dc1b1bd7e8dc05b0ca4cd986ae244dd96a2a5f..5897ec0fabcf69c184cd84e50b9482f51dad0abb 100644 --- a/autosubmit/job/job_common.py +++ b/autosubmit/job/job_common.py @@ -127,7 +127,7 @@ class StatisticsSnippetBash: set -xuve job_name_ptrn='%CURRENT_LOGDIR%/%JOBNAME%' - echo $(date +%s) > ${job_name_ptrn}_STAT + echo $(date +%s) > ${job_name_ptrn}_STAT_%FAIL_COUNT% ################### # AS CHECKPOINT FUNCTION @@ -154,7 +154,7 @@ class StatisticsSnippetBash: # Autosubmit tailer ################### set -xuve - echo $(date +%s) >> ${job_name_ptrn}_STAT + echo $(date +%s) >> ${job_name_ptrn}_STAT_%FAIL_COUNT% touch ${job_name_ptrn}_COMPLETED exit 0 diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 22245c23aa12b858e1537e613cb7f00aecfb7c01..98744ed09fe0245bff243a5d69dadf229b9cf60b 100644 --- a/autosubmit/job/job_list.py +++ 
b/autosubmit/job/job_list.py @@ -37,7 +37,7 @@ import math import networkx as nx from bscearth.utils.date import date2str, parse_date from networkx import DiGraph -from time import localtime, strftime, mktime, time +from time import localtime, mktime, time import autosubmit.database.db_structure as DbStructure from autosubmit.helpers.data_transfer import JobRow @@ -1725,35 +1725,20 @@ class JobList(object): else: return completed_jobs - def get_completed_without_logs(self, platform=None): + def get_completed_failed_without_logs(self, platform=None): """ Returns a list of completed jobs without updated logs :param platform: job platform :type platform: HPCPlatform - :return: completed jobs - :rtype: list - """ - - completed_jobs = [job for job in self._job_list if (platform is None or job.platform.name == platform.name) and - job.status == Status.COMPLETED and job.updated_log is False ] - - return completed_jobs - - def get_completed_without_logs(self, platform=None): - """ - Returns a list of completed jobs without updated logs - - :param platform: job platform - :type platform: HPCPlatform - :return: completed jobs + :return: completed and failed jobs without logs :rtype: list """ - completed_jobs = [job for job in self._job_list if (platform is None or job.platform.name == platform.name) and - job.status == Status.COMPLETED and job.updated_log is False ] + completed_failed_jobs = [job for job in self._job_list if (platform is None or job.platform.name == platform.name) and + (job.status == Status.COMPLETED or job.status == Status.FAILED) and job.updated_log is False ] - return completed_jobs + return completed_failed_jobs def get_uncompleted(self, platform=None, wrapper=False): """ @@ -2587,29 +2572,37 @@ class JobList(object): """ Updates the log err and log out. """ - if not hasattr(job,"updated_log") or not job.updated_log: # hasattr for backward compatibility (job.updated_logs is only for newer jobs, as the loaded ones may not have this set yet) - # order path_to_logs by name and get the two last element - log_file = False - if hasattr(job, "x11") and job.x11: - job.updated_log = True - return - if job.wrapper_type == "vertical" and job.fail_count > 0: - for log_recovered in self.path_to_logs.glob(f"{job.name}.*._{job.fail_count}.out"): - if job.local_logs[0][-4] in log_recovered.name: - log_file = True - break - else: - for log_recovered in self.path_to_logs.glob(f"{job.name}.*.out"): - if job.local_logs[0] == log_recovered.name: - log_file = True - break + if not hasattr(job, "updated_log"): # hasattr for backward compatibility (job.updated_logs is only for newer jobs, as the loaded ones may not have this set yet) + job.updated_log = False + elif job.updated_log: + return + if hasattr(job, "x11") and job.x11: # X11 has it log writted in the run.out file. No need to check for log files as there are none + job.updated_log = True + return + + log_recovered = self.check_if_log_is_recovered(job) + if log_recovered: + job.local_logs = (log_recovered.name, log_recovered.name[:-4] + ".err") # we only want the last one + job.updated_log = True + elif not job.updated_log and str(as_conf.platforms_data.get(job.platform.name, {}).get('DISABLE_RECOVERY_THREADS', "false")).lower() == "false": + job.platform.add_job_to_log_recover(job) + return log_recovered + + def check_if_log_is_recovered(self, job): + """ + Check if the log is recovered. + Conditions: + - File must exist + - File timestamp should be greater than the job ready_date, otherwise is from a previous run. 
+ + """ - if log_file: - if not hasattr(job, "ready_start_date") or not job.ready_start_date or job.local_logs[0] >= job.ready_start_date: # hasattr for backward compatibility - job.local_logs = (log_recovered.name, log_recovered.name[:-4] + ".err") - job.updated_log = True - if not job.updated_log and str(as_conf.platforms_data.get(job.platform.name, {}).get('DISABLE_RECOVERY_THREADS', "false")).lower() == "false": - job.platform.add_job_to_log_recover(job) + if not hasattr(job, "updated_log") or not job.updated_log: + for log_recovered in self.path_to_logs.glob(f"{job.name}.*.out"): + file_timestamp = int(datetime.datetime.fromtimestamp(log_recovered.stat().st_mtime).strftime("%Y%m%d%H%M%S")) + if job.ready_date and file_timestamp >= int(job.ready_date): + return log_recovered + return None def update_list(self, as_conf, store_change=True, fromSetStatus=False, submitter=None, first_time=False): # type: (AutosubmitConfig, bool, bool, object, bool) -> bool @@ -2631,6 +2624,8 @@ class JobList(object): save = store_change Log.debug('Updating FAILED jobs') write_log_status = False + for job in self.get_completed_failed_without_logs(): + save = self.update_log_status(job, as_conf) if not save else save if not first_time: for job in self.get_failed(): job.packed = False @@ -2695,7 +2690,6 @@ class JobList(object): for job in self.check_special_status(): job.status = Status.READY # Run start time in format (YYYYMMDDHH:MM:SS) from current time - job.ready_start_date = strftime("%Y%m%d%H%M%S") job.id = None job.packed = False job.wrapper_type = None @@ -2707,8 +2701,6 @@ class JobList(object): # Log name has this format: # a02o_20000101_fc0_2_SIM.20240212115021.err # $jobname.$(YYYYMMDDHHMMSS).err or .out - if not first_time: - self.update_log_status(job, as_conf) if job.synchronize is not None and len(str(job.synchronize)) > 0: tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] if len(tmp) != len(job.parents): @@ -2797,7 +2789,6 @@ class JobList(object): job.status = Status.READY job.packed = False # Run start time in format (YYYYMMDDHH:MM:SS) from current time - job.ready_start_date = strftime("%Y%m%d%H%M%S") job.packed = False job.hold = False save = True diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 16e78bb87fde4afab430663df3784186b3555bf8..2604f656fe7171a9e8b9efa0122b246d6eeec058 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -491,14 +491,12 @@ class JobPackager(object): current_info.append(param[self.current_wrapper_section]) current_info.append(self._as_config) - if self.wrapper_type[self.current_wrapper_section] == 'vertical': - built_packages_tmp = self._build_vertical_packages(jobs, wrapper_limits,wrapper_info=current_info) - elif self.wrapper_type[self.current_wrapper_section] == 'horizontal': - built_packages_tmp = self._build_horizontal_packages(jobs, wrapper_limits, section,wrapper_info=current_info) + if self.wrapper_type[self.current_wrapper_section] == 'horizontal': + built_packages_tmp = self._build_horizontal_packages(jobs, wrapper_limits, section, wrapper_info=current_info) elif self.wrapper_type[self.current_wrapper_section] in ['vertical-horizontal', 'horizontal-vertical']: - built_packages_tmp.append(self._build_hybrid_package(jobs, wrapper_limits, section,wrapper_info=current_info)) + built_packages_tmp.append(self._build_hybrid_package(jobs, wrapper_limits, section, wrapper_info=current_info)) else: - built_packages_tmp = self._build_vertical_packages(jobs, wrapper_limits) 
+ built_packages_tmp = self._build_vertical_packages(jobs, wrapper_limits, wrapper_info=current_info) Log.result(f"Built {len(built_packages_tmp)} wrappers for {wrapper_name}") packages_to_submit,max_jobs_to_submit = self.check_packages_respect_wrapper_policy(built_packages_tmp,packages_to_submit,max_jobs_to_submit,wrapper_limits,any_simple_packages) @@ -526,6 +524,8 @@ class JobPackager(object): for package in packages_to_submit: self.max_jobs = self.max_jobs - 1 package.hold = self.hold + for job in package.jobs: + job.set_ready_date() return packages_to_submit @@ -726,7 +726,7 @@ class JobPackagerVertical(object): stack = [(job, 1)] while stack: job, level = stack.pop() - if level % 10 == 0 and level > 0: + if level % 50 == 0 and level > 0: Log.info(f"Wrapper package creation is still ongoing. So far {level} jobs have been wrapped.") if len(self.jobs_list) >= self.wrapper_limits["max_v"] or len(self.jobs_list) >= \ self.wrapper_limits["max_by_section"][job.section] or len(self.jobs_list) >= self.wrapper_limits[ @@ -736,7 +736,7 @@ class JobPackagerVertical(object): if child is not None and len(str(child)) > 0: child.update_parameters(wrapper_info[-1], {}) self.total_wallclock = sum_str_hours(self.total_wallclock, child.wallclock) - if self.total_wallclock <= self.max_wallclock: + if self.total_wallclock <= self.max_wallclock or not self.max_wallclock: child.packed = True child.level = level self.jobs_list.append(child) @@ -848,22 +848,14 @@ class JobPackagerVerticalMixed(JobPackagerVertical): :rtype: Job Object """ sorted_jobs = self.sorted_jobs - + child = None for index in range(self.index, len(sorted_jobs)): - child = sorted_jobs[index] - if self._is_wrappable(child): + child_ = sorted_jobs[index] + if child_.name != job.name and self._is_wrappable(child_): + child = child_ self.index = index + 1 - return child - continue - return None - # Not passing tests but better wrappers result to check - # for child in job.children: - # if child.name != job.name: - # if self._is_wrappable(child): - # self.index = self.index + 1 - # return child - # continue - # return None + break + return child def _is_wrappable(self, job): """ @@ -919,7 +911,7 @@ class JobPackagerHorizontal(object): for section in jobs_by_section: current_package_by_section[section] = 0 for job in jobs_by_section[section]: - if jobs_processed % 10 == 0 and jobs_processed > 0: + if jobs_processed % 50 == 0 and jobs_processed > 0: Log.info(f"Wrapper package creation is still ongoing. 
So far {jobs_processed} jobs have been wrapped.") job.update_parameters(wrapper_info[-1], {}) if str(job.processors).isdigit() and str(job.nodes).isdigit() and int(job.nodes) > 0 and int(job.processors) <= 1: diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 313f9cfa3a19c0fcec7cff8debd13237687868c3..9532e658322e71da6001d5d38951f8bed3415cd8 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -249,6 +249,7 @@ class JobPackageSimple(JobPackageBase): if len(job_scripts) == 0: job_scripts = self._job_scripts for job in self.jobs: + job.update_local_logs() #CLEANS PREVIOUS RUN ON LOCAL log_completed = os.path.join(self._tmp_path, job.name + '_COMPLETED') log_stat = os.path.join(self._tmp_path, job.name + '_STAT') @@ -256,14 +257,15 @@ class JobPackageSimple(JobPackageBase): os.remove(log_completed) if os.path.exists(log_stat): os.remove(log_stat) - self.platform.remove_stat_file(job.name) + self.platform.remove_stat_file(job) self.platform.remove_completed_file(job.name) job.id = self.platform.submit_job(job, job_scripts[job.name], hold=hold, export = self.export) if job.id is None or not job.id: continue Log.info("{0} submitted", job.name) - job.status = Status.SUBMITTED - job.write_submit_time(hold=self.hold) + job.status = Status.SUBMITTED + job.wrapper_name = job.name + class JobPackageSimpleWrapped(JobPackageSimple): @@ -343,7 +345,8 @@ class JobPackageArray(JobPackageBase): def _do_submission(self, job_scripts=None, hold=False): for job in self.jobs: - self.platform.remove_stat_file(job.name) + job.update_local_logs() + self.platform.remove_stat_file(job) self.platform.remove_completed_file(job.name) package_id = self.platform.submit_job(None, self._common_script, hold=hold, export = self.export) @@ -354,9 +357,8 @@ class JobPackageArray(JobPackageBase): for i in range(0, len(self.jobs)): # platforms without a submit.cmd Log.info("{0} submitted", self.jobs[i].name) self.jobs[i].id = str(package_id) + '[{0}]'.format(i) - self.jobs[i].status = Status.SUBMITTED - self.jobs[i].write_submit_time(hold=hold,wrapper_submit_time=wrapper_time) - wrapper_time = self.jobs[i].write_submit_time + self.jobs[i].status = Status.SUBMITTED + self.jobs[i].wrapper_name = self.name class JobPackageThread(JobPackageBase): @@ -573,13 +575,15 @@ class JobPackageThread(JobPackageBase): if callable(getattr(self.platform, 'remove_multiple_files')): filenames = str() for job in self.jobs: + job.update_local_logs() filenames += " " + self.platform.remote_log_dir + "/" + job.name + "_STAT " + \ self.platform.remote_log_dir + "/" + job.name + "_COMPLETED" self.platform.remove_multiple_files(filenames) else: for job in self.jobs: - self.platform.remove_stat_file(job.name) + job.update_local_logs() + self.platform.remove_stat_file(job) self.platform.remove_completed_file(job.name) if hold: job.hold = hold @@ -589,13 +593,12 @@ class JobPackageThread(JobPackageBase): if package_id is None or not package_id: return - wrapper_time = None for i in range(0, len(self.jobs)): Log.info("{0} submitted", self.jobs[i].name) self.jobs[i].id = str(package_id) self.jobs[i].status = Status.SUBMITTED - self.jobs[i].write_submit_time(hold=hold,wrapper_submit_time=wrapper_time) - wrapper_time = self.jobs[i].write_submit_time + self.jobs[i].wrapper_name = self.name + def _common_script_content(self): pass @@ -659,7 +662,8 @@ class JobPackageThreadWrapped(JobPackageThread): def _do_submission(self, job_scripts=None, hold=False): for job in self.jobs: - 
self.platform.remove_stat_file(job.name) + job.update_local_logs() + self.platform.remove_stat_file(job) self.platform.remove_completed_file(job.name) if hold: job.hold = hold @@ -669,13 +673,13 @@ class JobPackageThreadWrapped(JobPackageThread): if package_id is None or not package_id: raise Exception('Submission failed') - wrapper_time = None for i in range(0, len(self.jobs)): Log.info("{0} submitted", self.jobs[i].name) self.jobs[i].id = str(package_id) self.jobs[i].status = Status.SUBMITTED - self.jobs[i].write_submit_time(hold=hold,wrapper_submit_time=wrapper_time) - wrapper_time = self.jobs[i].write_submit_time + self.jobs[i].wrapper_name = self.name + + class JobPackageVertical(JobPackageThread): """ @@ -753,7 +757,7 @@ class JobPackageVertical(JobPackageThread): self._wallclock = "{0}:{1}".format(hh_str,mm_str) Log.info("Submitting {2} with wallclock {0}:{1}".format(hh_str,mm_str,self._name)) else: - wallclock_by_level = None + wallclock_by_level = 0 # command: "timeout 0 sleep 2" == command: "sleep 2" return self._wrapper_factory.get_wrapper(self._wrapper_factory.vertical_wrapper, name=self._name, queue=self._queue, project=self._project, wallclock=self._wallclock, diff --git a/autosubmit/platforms/ecplatform.py b/autosubmit/platforms/ecplatform.py index 713cb17eb9df3c7192177461067d98b0f8606081..36f694c3cf62f07a07f3729df81792026086d31b 100644 --- a/autosubmit/platforms/ecplatform.py +++ b/autosubmit/platforms/ecplatform.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 - # Copyright 2017-2020 Earth Sciences Department, BSC-CNS # This file is part of Autosubmit. @@ -169,12 +168,8 @@ class EcPlatform(ParamikoPlatform): self.connected = False except: self.connected = False - if not self.log_retrieval_process_active and ( - as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS', - "false")).lower() == "false"): - self.log_retrieval_process_active = True - if as_conf and as_conf.misc_data.get("AS_COMMAND","").lower() == "run": - self.recover_job_logs() + self.spawn_log_retrieval_process(as_conf) + def restore_connection(self,as_conf): """ @@ -201,19 +196,14 @@ class EcPlatform(ParamikoPlatform): :return: True :rtype: bool """ - self.main_process_id = os.getpid() output = subprocess.check_output(self._checkvalidcert_cmd, shell=True).decode(locale.getlocale()[1]) if not output: output = "" try: if output.lower().find("yes") != -1: self.connected = True - if not self.log_retrieval_process_active and ( - as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS', - "false")).lower() == "false"): - self.log_retrieval_process_active = True - if as_conf and as_conf.misc_data.get("AS_COMMAND", "").lower() == "run": - self.recover_job_logs() + self.spawn_log_retrieval_process(as_conf) + return "OK" else: self.connected = False diff --git a/autosubmit/platforms/locplatform.py b/autosubmit/platforms/locplatform.py index 3d9296705d5731faf34b7d053791dc880018321e..61f96832ae217153036c4f196bf538eb2c8d55c1 100644 --- a/autosubmit/platforms/locplatform.py +++ b/autosubmit/platforms/locplatform.py @@ -18,12 +18,15 @@ # along with Autosubmit. If not, see . 
import locale import os +from pathlib import Path from xml.dom.minidom import parseString import subprocess +from matplotlib.patches import PathPatch from autosubmit.platforms.paramiko_platform import ParamikoPlatform from autosubmit.platforms.headers.local_header import LocalHeader +from autosubmit.platforms.wrappers.wrapper_factory import LocalWrapperFactory from autosubmitconfigparser.config.basicconfig import BasicConfig from time import sleep @@ -64,6 +67,9 @@ class LocalPlatform(ParamikoPlatform): self.job_status['RUNNING'] = ['0'] self.job_status['QUEUING'] = [] self.job_status['FAILED'] = [] + self._allow_wrappers = True + self._wrapper = LocalWrapperFactory(self) + self.update_cmds() def update_cmds(self): @@ -100,28 +106,25 @@ class LocalPlatform(ParamikoPlatform): return [int(element.firstChild.nodeValue) for element in jobs_xml] def get_submit_cmd(self, job_script, job, hold=False, export=""): - wallclock = self.parse_time(job.wallclock) - seconds = int(wallclock.days * 86400 + wallclock.seconds * 60) + if job: + wallclock = self.parse_time(job.wallclock) + seconds = int(wallclock.days * 86400 + wallclock.seconds * 60) + else: + seconds = 24 * 3600 if export == "none" or export == "None" or export is None or export == "": export = "" else: export += " ; " - return self.get_call(job_script, job, export=export,timeout=seconds) - + command = self.get_call(job_script, job, export=export,timeout=seconds) + return f"cd {self.remote_log_dir} ; {command}" def get_checkjob_cmd(self, job_id): return self.get_pscall(job_id) - def connect(self, as_conf, reconnect=False): + def connect(self, as_conf={}, reconnect=False): self.connected = True - if not self.log_retrieval_process_active and ( - as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS',"false")).lower() == "false"): - self.log_retrieval_process_active = True - if as_conf and as_conf.misc_data.get("AS_COMMAND","").lower() == "run": - self.recover_job_logs() - + self.spawn_log_retrieval_process(as_conf) def test_connection(self,as_conf): - self.main_process_id = os.getpid() if not self.connected: self.connect(as_conf) @@ -151,10 +154,7 @@ class LocalPlatform(ParamikoPlatform): return True def send_file(self, filename, check=True): - self.check_remote_log_dir() - self.delete_file(filename,del_cmd=True) - command = '{0} {1} {2}'.format(self.put_cmd, os.path.join(self.tmp_path, filename), - os.path.join(self.tmp_path, 'LOG_' + self.expid, filename)) + command = f'{self.put_cmd} {os.path.join(self.tmp_path, Path(filename).name)} {os.path.join(self.tmp_path, "LOG_" + self.expid, Path(filename).name)}' try: subprocess.check_call(command, shell=True) except subprocess.CalledProcessError: @@ -164,6 +164,17 @@ class LocalPlatform(ParamikoPlatform): raise return True + def remove_multiple_files(self, filenames): + log_dir = os.path.join(self.tmp_path, 'LOG_{0}'.format(self.expid)) + multiple_delete_previous_run = os.path.join( + log_dir, "multiple_delete_previous_run.sh") + if os.path.exists(log_dir): + lang = locale.getlocale()[1] + if lang is None: + lang = 'UTF-8' + open(multiple_delete_previous_run, 'wb+').write(("rm -f" + filenames).encode(lang)) + os.chmod(multiple_delete_previous_run, 0o770) + return "" def get_file(self, filename, must_exist=True, relative_path='',ignore_log = False,wrapper_failed=False): local_path = os.path.join(self.tmp_path, relative_path) @@ -187,40 +198,28 @@ class LocalPlatform(ParamikoPlatform): return True # Moves .err .out - def check_file_exists(self, src, 
wrapper_failed=False, sleeptime=5, max_retries=3, first=True): + def check_file_exists(self, src, wrapper_failed=False, sleeptime=1, max_retries=1): """ - Moves a file on the platform + Checks if a file exists in the platform :param src: source name :type src: str - :param: wrapper_failed: if True, the wrapper failed. + :param wrapper_failed: checks inner jobs files :type wrapper_failed: bool - + :param sleeptime: time to sleep + :type sleeptime: int + :param max_retries: maximum number of retries + :type max_retries: int + :return: True if the file exists, False otherwise + :rtype: bool """ - file_exist = False - remote_path = os.path.join(self.get_files_path(), src) - retries = 0 - # Not first is meant for vertical_wrappers. There you have to download STAT_{MAX_LOGS} then STAT_{MAX_LOGS-1} and so on - if not first: - max_retries = 1 - sleeptime = 0 - while not file_exist and retries < max_retries: - try: - file_exist = os.path.isfile(os.path.join(self.get_files_path(),src)) - if not file_exist: # File doesn't exist, retry in sleep-time - if first: - Log.debug("{2} File does not exist.. waiting {0}s for a new retry (retries left: {1})", sleeptime, - max_retries - retries, remote_path) - if not wrapper_failed: - sleep(sleeptime) - sleeptime = sleeptime + 5 - retries = retries + 1 - else: - retries = 9999 - except BaseException as e: # Unrecoverable error - Log.printlog("File does not exist, logs {0} {1}".format(self.get_files_path(),src),6001) - file_exist = False # won't exist - retries = 999 # no more retries - return file_exist + sleeptime = 1 + for i in range(max_retries): + if os.path.isfile(os.path.join(self.get_files_path(), src)): + return True + sleep(sleeptime) + Log.warning("File {0} does not exist".format(src)) + return False + def delete_file(self, filename,del_cmd = False): if del_cmd: @@ -281,3 +280,18 @@ class LocalPlatform(ParamikoPlatform): :type remote_logs: (str, str) """ return + + def check_completed_files(self, sections=None): + command = "find %s " % self.remote_log_dir + if sections: + for i, section in enumerate(sections.split()): + command += " -name *%s_COMPLETED" % section + if i < len(sections.split()) - 1: + command += " -o " + else: + command += " -name *_COMPLETED" + + if self.send_command(command, True): + return self._ssh_output + else: + return None \ No newline at end of file diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 288a4f00bc5f99d1d613afe360d9c9000997bdd2..8b19730a8e94a8b5d84a7c9017e8728022f10816 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -1,8 +1,6 @@ -import copy -import threading - import locale from contextlib import suppress +from pathlib import Path from time import sleep import sys import socket @@ -23,7 +21,7 @@ from threading import Thread import threading import getpass from paramiko.agent import Agent -from autosubmit.helpers.utils import terminate_child_process +import time def threaded(fn): def wrapper(*args, **kwargs): @@ -115,13 +113,10 @@ class ParamikoPlatform(Platform): if display is None: display = "localhost:0" self.local_x11_display = xlib_connect.get_display(display) - self.log_retrieval_process_active = False - terminate_child_process(self.expid, self.name) def test_connection(self,as_conf): """ Test if the connection is still alive, reconnect if not. 
""" - self.main_process_id = os.getpid() try: if not self.connected: @@ -151,6 +146,7 @@ class ParamikoPlatform(Platform): raise AutosubmitCritical(str(e),7051) #raise AutosubmitError("[{0}] connection failed for host: {1}".format(self.name, self.host), 6002, e.message) + def restore_connection(self, as_conf): try: self.connected = False @@ -306,10 +302,9 @@ class ParamikoPlatform(Platform): self._ftpChannel = paramiko.SFTPClient.from_transport(self.transport,window_size=pow(4, 12) ,max_packet_size=pow(4, 12) ) self._ftpChannel.get_channel().settimeout(120) self.connected = True - if not self.log_retrieval_process_active and (as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS', "false")).lower() == "false"): - self.log_retrieval_process_active = True - if as_conf and as_conf.misc_data.get("AS_COMMAND", "").lower() == "run": - self.recover_job_logs() + self.spawn_log_retrieval_process(as_conf) + + except SSHException: raise except IOError as e: @@ -533,7 +528,8 @@ class ParamikoPlatform(Platform): if cmd is None: return None if self.send_command(cmd,x11=x11): - job_id = self.get_submitted_job_id(self.get_ssh_output(),x11=job.x11) + x11 = False if job is None else job.x11 + job_id = self.get_submitted_job_id(self.get_ssh_output(),x11=x11) Log.debug("Job ID: {0}", job_id) return int(job_id) else: @@ -625,7 +621,19 @@ class ParamikoPlatform(Platform): self.get_ssh_output()).strip("\n") # URi: define status list in HPC Queue Class if job_status in self.job_status['COMPLETED'] or retries == 0: - job_status = Status.COMPLETED + # The Local platform has only 0 or 1, so it neccesary to look for the completed file. + # Not sure why it is called over_wallclock but is the only way to return a value + if self.type == "local": # wrapper has a different check completion + if not job.is_wrapper: + job_status = job.check_completion(over_wallclock=True) + else: + if Path(f"{self.remote_log_dir}/WRAPPER_FAILED").exists(): + job_status = Status.FAILED + else: + job_status = Status.COMPLETED + else: + job_status = Status.COMPLETED + elif job_status in self.job_status['RUNNING']: job_status = Status.RUNNING if not is_wrapper: @@ -657,8 +665,14 @@ class ParamikoPlatform(Platform): Log.error( 'check_job() The job id ({0}) status is {1}.', job_id, job_status) - if job_status in [Status.FAILED, Status.COMPLETED]: + if job_status in [Status.FAILED, Status.COMPLETED, Status.UNKNOWN]: job.updated_log = False + # backup for end time in case that the stat file is not found + job.end_time_placeholder = int(time.time()) + if job_status in [Status.RUNNING, Status.COMPLETED] and job.new_status in [Status.QUEUING, Status.SUBMITTED]: + # backup for start time in case that the stat file is not found + job.start_time_timestamp = int(time.time()) + if submit_hold_check: return job_status else: @@ -1223,18 +1237,23 @@ class ParamikoPlatform(Platform): :return: command to execute script :rtype: str """ - executable = '' - if job.type == Type.BASH: - executable = 'bash' - elif job.type == Type.PYTHON: - executable = 'python3' - elif job.type == Type.PYTHON2: - executable = 'python2' - elif job.type == Type.R: - executable = 'Rscript' - if job.executable != '': - executable = '' # Alternative: use job.executable with substituted placeholders - remote_logs = (job.script_name + ".out."+str(job.fail_count), job.script_name + ".err."+str(job.fail_count)) + if job: # If job is None, it is a wrapper + executable = '' + if job.type == Type.BASH: + executable = 'bash' + elif job.type == 
Type.PYTHON: + executable = 'python3' + elif job.type == Type.PYTHON2: + executable = 'python2' + elif job.type == Type.R: + executable = 'Rscript' + if job.executable != '': + executable = '' # Alternative: use job.executable with substituted placeholders + remote_logs = (job.script_name + ".out."+str(job.fail_count), job.script_name + ".err."+str(job.fail_count)) + else: + executable = 'python3' # wrappers are always python3 + remote_logs = (f"{job_script}.out", f"{job_script}.err") + if timeout < 1: command = export + ' nohup ' + executable + ' {0} > {1} 2> {2} & echo $!'.format( os.path.join(self.remote_log_dir, job_script), diff --git a/autosubmit/platforms/pjmplatform.py b/autosubmit/platforms/pjmplatform.py index 4f198d81902068c6a3d6db83fc983bb7268be0d3..7291377b72a6337135698c096d4c15a6c621d4a4 100644 --- a/autosubmit/platforms/pjmplatform.py +++ b/autosubmit/platforms/pjmplatform.py @@ -151,7 +151,8 @@ class PJMPlatform(ParamikoPlatform): job.hold = hold job.id = str(jobs_id[i]) job.status = Status.SUBMITTED - job.write_submit_time(hold=hold) + job.wrapper_name = package.name + i += 1 save = True except AutosubmitError as e: @@ -483,13 +484,10 @@ class PJMPlatform(ParamikoPlatform): def allocated_nodes(): return """os.system("scontrol show hostnames $SLURM_JOB_NODELIST > node_list_{0}".format(node_id))""" - def check_file_exists(self, filename, wrapper_failed=False, sleeptime=5, max_retries=3, first=True): + def check_file_exists(self, filename, wrapper_failed=False, sleeptime=5, max_retries=3): file_exist = False retries = 0 - # Not first is meant for vertical_wrappers. There you have to download STAT_{MAX_LOGS} then STAT_{MAX_LOGS-1} and so on - if not first: - max_retries = 1 - sleeptime = 0 + while not file_exist and retries < max_retries: try: # This return IOError if path doesn't exist @@ -497,9 +495,6 @@ class PJMPlatform(ParamikoPlatform): self.get_files_path(), filename)) file_exist = True except IOError as e: # File doesn't exist, retry in sleeptime - if first: - Log.debug("{2} File does not exist.. 
waiting {0}s for a new retry (retries left: {1})", sleeptime, - max_retries - retries, os.path.join(self.get_files_path(), filename)) if not wrapper_failed: sleep(sleeptime) sleeptime = sleeptime + 5 diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 80a2d497191baf71021f441e8c3eeadd4e7b5fe5..9ec8828cc6fadaef28199c0cdd0cf7556d7b2cf6 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -1,53 +1,55 @@ import atexit - -import queue +import multiprocessing +import queue # only for the exception +import psutil import setproctitle import locale import os - import traceback from autosubmit.job.job_common import Status -from typing import List, Union - +from typing import List, Union, Callable from autosubmit.helpers.parameters import autosubmit_parameter from log.log import AutosubmitCritical, AutosubmitError, Log -from multiprocessing import Process, Queue, Event - +from multiprocessing import Process, Event +from multiprocessing.queues import Queue import time -# stop the background task gracefully before exit -def stop_background(stop_event, process): - # request the background thread stop - stop_event.set() - # wait for the background thread to stop - process.join() - -def processed(fn): - def wrapper(*args, **kwargs): - stop_event = Event() - args = (args[0], stop_event) - process = Process(target=fn, args=args, kwargs=kwargs, name=f"{args[0].name}_platform") - process.daemon = True # Set the process as a daemon process - process.start() - atexit.register(stop_background, stop_event, process) - return process - - return wrapper + +class UniqueQueue( + Queue): # The reason of this class is to avoid duplicates in the queue during the same run. That can happen if the log retrieval process didn't process it yet. + + def __init__(self, maxsize=-1, block=True, timeout=None): + self.block = block + self.timeout = timeout + self.all_items = set() # Won't be popped, so even if it is being processed by the log retrieval process, it won't be added again. + super().__init__(maxsize, ctx=multiprocessing.get_context()) + + def put(self, job, block=True, timeout=None): + if job.wrapper_type == "vertical": + unique_name = job.name + else: + unique_name = job.name+str(job.fail_count) + if unique_name not in self.all_items: + self.all_items.add(unique_name) + super().put(job, block, timeout) + class Platform(object): """ Class to manage the connections to the different platforms. 
""" + worker_events = list() + lock = multiprocessing.Lock() - def __init__(self, expid, name, config, auth_password = None): + def __init__(self, expid, name, config, auth_password=None): """ :param config: :param expid: :param name: """ self.connected = False - self.expid = expid # type: str - self._name = name # type: str + self.expid = expid # type: str + self._name = name # type: str self.config = config self.tmp_path = os.path.join( self.config.get("LOCAL_ROOT_DIR"), self.expid, self.config.get("LOCAL_TMP_DIR")) @@ -92,9 +94,9 @@ class Platform(object): self.cancel_cmd = None self.otp_timeout = None self.two_factor_auth = None - self.otp_timeout = self.config.get("PLATFORMS", {}).get(self.name.upper(),{}).get("2FA_TIMEOUT", 60*5) - self.two_factor_auth = self.config.get("PLATFORMS", {}).get(self.name.upper(),{}).get("2FA", False) - self.two_factor_method = self.config.get("PLATFORMS", {}).get(self.name.upper(),{}).get("2FA_METHOD", "token") + self.otp_timeout = self.config.get("PLATFORMS", {}).get(self.name.upper(), {}).get("2FA_TIMEOUT", 60 * 5) + self.two_factor_auth = self.config.get("PLATFORMS", {}).get(self.name.upper(), {}).get("2FA", False) + self.two_factor_method = self.config.get("PLATFORMS", {}).get(self.name.upper(), {}).get("2FA_METHOD", "token") if not self.two_factor_auth: self.pw = None elif auth_password is not None and self.two_factor_auth: @@ -104,9 +106,17 @@ class Platform(object): self.pw = auth_password else: self.pw = None - self.recovery_queue = Queue() + self.recovery_queue = UniqueQueue() self.log_retrieval_process_active = False self.main_process_id = None + self.work_event = Event() + self.cleanup_event = Event() + self.log_recovery_process = None + self.keep_alive_timeout = 60 * 5 # Useful in case of kill -9 + self.processed_wrapper_logs = set() + @classmethod + def update_workers(cls, event_worker): + cls.worker_events.append(event_worker) @property @autosubmit_parameter(name='current_arch') @@ -238,9 +248,10 @@ class Platform(object): """ # only implemented for slurm return "" - def get_multiple_jobids(self,job_list,valid_packages_to_submit,failed_packages,error_message="",hold=False): - return False,valid_packages_to_submit - #raise NotImplementedError + + def get_multiple_jobids(self, job_list, valid_packages_to_submit, failed_packages, error_message="", hold=False): + return False, valid_packages_to_submit + # raise NotImplementedError def process_batch_ready_jobs(self, valid_packages_to_submit, failed_packages, error_message="", hold=False): return True, valid_packages_to_submit @@ -345,7 +356,7 @@ class Platform(object): raise except AutosubmitCritical as e: - raise AutosubmitCritical(e.message, e.code, e.trace) + raise except AutosubmitError as e: raise except Exception as e: @@ -470,7 +481,6 @@ class Platform(object): parameters['{0}EC_QUEUE'.format(prefix)] = self.ec_queue parameters['{0}PARTITION'.format(prefix)] = self.partition - parameters['{0}USER'.format(prefix)] = self.user parameters['{0}PROJ'.format(prefix)] = self.project parameters['{0}BUDG'.format(prefix)] = self.budget @@ -573,10 +583,12 @@ class Platform(object): if job.current_checkpoint_step < job.max_checkpoint_step: remote_checkpoint_path = f'{self.get_files_path()}/CHECKPOINT_' self.get_file(f'{remote_checkpoint_path}{str(job.current_checkpoint_step)}', False, ignore_log=True) - while self.check_file_exists(f'{remote_checkpoint_path}{str(job.current_checkpoint_step)}') and job.current_checkpoint_step < job.max_checkpoint_step: + while self.check_file_exists( + 
f'{remote_checkpoint_path}{str(job.current_checkpoint_step)}') and job.current_checkpoint_step < job.max_checkpoint_step: self.remove_checkpoint_file(f'{remote_checkpoint_path}{str(job.current_checkpoint_step)}') job.current_checkpoint_step += 1 self.get_file(f'{remote_checkpoint_path}{str(job.current_checkpoint_step)}', False, ignore_log=True) + def get_completed_files(self, job_name, retries=0, recovery=False, wrapper_failed=False): """ Get the COMPLETED file of the given job @@ -605,18 +617,14 @@ class Platform(object): else: return False - def remove_stat_file(self, job_name): + def remove_stat_file(self, job): """ Removes *STAT* files from remote - - :param job_name: name of job to check - :type job_name: str - :return: True if successful, False otherwise - :rtype: bool + :param job: job to check + :type job: Job """ - filename = job_name + '_STAT' - if self.delete_file(filename): - Log.debug('{0}_STAT have been removed', job_name) + if self.delete_file(job.stat_file): + Log.debug(f"{job.stat_file} has been removed") return True return False @@ -648,6 +656,7 @@ class Platform(object): Log.debug('{0} been removed', filename) return True return False + def remove_checkpoint_file(self, filename): """ Removes *CHECKPOINT* files from remote @@ -658,36 +667,26 @@ class Platform(object): if self.check_file_exists(filename): self.delete_file(filename) - def check_file_exists(self, src, wrapper_failed=False, sleeptime=5, max_retries=3, first=True): + def check_file_exists(self, src, wrapper_failed=False, sleeptime=5, max_retries=3): return True - def get_stat_file(self, job_name, retries=0, count = -1): - """ - Copies *STAT* files from remote to local + def get_stat_file(self, job, count=-1): - :param retries: number of intents to get the completed files - :type retries: int - :param job_name: name of job to check - :type job_name: str - :return: True if successful, False otherwise - :rtype: bool - """ - if count == -1: # No internal retrials - filename = job_name + '_STAT' + if count == -1: # No internal retrials + filename = job.stat_file else: - filename = job_name + '_STAT_{0}'.format(str(count)) + filename = job.name + '_STAT_{0}'.format(str(count)) stat_local_path = os.path.join( self.config.get("LOCAL_ROOT_DIR"), self.expid, self.config.get("LOCAL_TMP_DIR"), filename) if os.path.exists(stat_local_path): os.remove(stat_local_path) if self.check_file_exists(filename): if self.get_file(filename, True): - Log.debug('{0}_STAT file have been transferred', job_name) + Log.debug('{0}_STAT file has been transferred', job.name) return True - Log.debug('{0}_STAT file not found', job_name) + Log.warning('{0}_STAT file not found', job.name) return False - @autosubmit_parameter(name='current_logdir') def get_files_path(self): """ @@ -719,9 +718,11 @@ class Platform(object): :rtype: int """ raise NotImplementedError + def check_Alljobs(self, job_list, as_conf, retries=5): - for job,job_prev_status in job_list: + for job, job_prev_status in job_list: self.check_job(job) + def check_job(self, job, default_status=Status.COMPLETED, retries=5, submit_hold_check=False, is_wrapper=False): """ Checks job running status @@ -810,7 +811,7 @@ class Platform(object): # type: () -> None """ Opens Submit script file """ raise NotImplementedError - + def submit_Script(self, hold=False): # type: (bool) -> Union[List[str], str] """ @@ -819,54 +820,117 @@ class Platform(object): raise NotImplementedError def add_job_to_log_recover(self, job): - self.recovery_queue.put((job,job.children)) + self.recovery_queue.put(job)
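The queueing change above pushes each job straight onto the platform's recovery_queue, and the UniqueQueue introduced earlier in this patch silently drops duplicates within the same run. A minimal, self-contained sketch of that de-duplication rule (the Job namedtuple and dedup_key helper below are illustrative stand-ins, not the patched classes):

```python
# Minimal sketch (not the patched UniqueQueue) of the de-duplication rule applied when
# jobs are pushed through add_job_to_log_recover(). Job here is a stand-in namedtuple.
from collections import namedtuple

Job = namedtuple("Job", "name fail_count wrapper_type")

def dedup_key(job):
    # Vertical-wrapper jobs are keyed by name only; every other job is keyed by
    # name plus fail_count, so each retrial is queued once.
    return job.name if job.wrapper_type == "vertical" else f"{job.name}{job.fail_count}"

seen, queued = set(), []
for job in [Job("a_job", 0, None), Job("a_job", 0, None), Job("a_job", 1, None)]:
    key = dedup_key(job)
    if key not in seen:          # duplicates within the same run are dropped
        seen.add(key)
        queued.append(job)

print([f"{j.name}/{j.fail_count}" for j in queued])  # ['a_job/0', 'a_job/1']
```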
def connect(self, as_conf, reconnect=False): raise NotImplementedError - def restore_connection(self,as_conf): + def restore_connection(self, as_conf): raise NotImplementedError - @processed - def recover_job_logs(self, event): + def spawn_log_retrieval_process(self, as_conf): + """ + This function spawns a process to recover the logs of the jobs that have been submitted to this platform. + """ + if not self.log_retrieval_process_active and ( + as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS', + "false")).lower() == "false"): + if as_conf and as_conf.misc_data.get("AS_COMMAND", "").lower() == "run": + self.log_retrieval_process_active = True + Platform.update_workers(self.work_event) + self.log_recovery_process = Process(target=self.recover_platform_job_logs, args=(), + name=f"{self.name}_log_recovery") + self.log_recovery_process.daemon = True + self.log_recovery_process.start() + os.waitpid(self.log_recovery_process.pid, os.WNOHANG) + Log.result(f"Process {self.log_recovery_process.name} started with pid {self.log_recovery_process.pid}") + atexit.register(self.send_cleanup_signal) + + def send_cleanup_signal(self): + if self.log_recovery_process and self.log_recovery_process.is_alive(): + self.work_event.clear() + self.cleanup_event.set() + self.log_recovery_process.join() + + def wait_for_work(self, sleep_time=60): + """ + This function waits until the work_event or the cleanup_event is set, or until the keep-alive timeout expires. + """ + process_log = False + for remaining in range(sleep_time, 0, -1): # Min time to wait unless clean-up signal is set + time.sleep(1) + if self.work_event.is_set() or not self.recovery_queue.empty(): + process_log = True + if self.cleanup_event.is_set(): + process_log = True + break + if not process_log: # If no work, wait until the keep_alive_timeout is reached or any signal is set to end the process. + timeout = self.keep_alive_timeout - sleep_time + while timeout > 0: + if not self.recovery_queue.empty() or self.cleanup_event.is_set() or self.work_event.is_set(): + break + time.sleep(1) + timeout -= 1 + if not self.recovery_queue.empty() or self.cleanup_event.is_set() or self.work_event.is_set(): + process_log = True + self.work_event.clear() + return process_log + + def recover_job_log(self, identifier, jobs_pending_to_process): + job = None + try: + while not self.recovery_queue.empty(): + try: + job = self.recovery_queue.get( + timeout=1) # Should be non-empty, but added a timeout for other possible errors. + job.children = set() # Children can't be serialized, so we set it to an empty set for this process. + job.platform = self # change the original platform to this process platform. + job._log_recovery_retries = 0 # reset the log recovery retries. + Log.debug(f"{identifier} Recovering log files for job {job.name} and retrial:{job.fail_count}") + job.retrieve_logfiles(self, raise_error=True) + if job.status == Status.FAILED: + Log.result(f"{identifier} Successfully recovered log files for job {job.name} and retrial:{job.fail_count}") + except queue.Empty: + pass + # This second while is to keep retrying the failed jobs.
+ while len(jobs_pending_to_process) > 0: # jobs that had any issue during the log retrieval + job = jobs_pending_to_process.pop() + job._log_recovery_retries += 1 + Log.debug( + f"{identifier} (Retrial number: {job._log_recovery_retries}) Recovering log files for job {job.name}") + job.retrieve_logfiles(self, raise_error=True) + Log.result(f"{identifier} (Retrial) Successfully recovered log files for job {job.name}") + except Exception as e: + Log.info(f"{identifier} Error while recovering logs: {str(e)}") + try: + if job and job._log_recovery_retries < 5: # If log retrieval failed, add it to the pending jobs to process. This avoids retrying the same job forever. + jobs_pending_to_process.add(job) + self.connected = False + Log.info(f"{identifier} Attempting to restore connection") + self.restore_connection(None) # Always restore the connection on a failure. + Log.result(f"{identifier} Successfully reconnected.") + except: + pass + return jobs_pending_to_process + + def recover_platform_job_logs(self): + """ + This function recovers the logs of the jobs that have been submitted. + The exit of this process is controlled by the work_event and cleanup_event of the main process. + """ setproctitle.setproctitle(f"autosubmit log {self.expid} recovery {self.name.lower()}") - job_names_processed = set() + identifier = f"{self.name.lower()}(log_recovery):" + Log.info(f"{identifier} Starting...") + jobs_pending_to_process = set() self.connected = False self.restore_connection(None) - # check if id of self.main_process exists with ps ax | grep self.main_process_id - max_logs_to_process = 60 - while not event.is_set() and os.system(f"ps ax | grep {str(self.main_process_id)} | grep -v grep > /dev/null 2>&1") == 0: - time.sleep(60) - logs_processed = 0 # avoid deadlocks just in case - try: - while not self.recovery_queue.empty() and logs_processed < max_logs_to_process: - logs_processed += 1 - job,children = self.recovery_queue.get(block=False) - if job.wrapper_type != "vertical": - if f'{job.name}_{job.fail_count}' in job_names_processed: - continue - else: - if f'{job.name}' in job_names_processed: - continue - job.children = children - job.platform = self - if job.x11: - Log.debug("Job {0} is an X11 job, skipping log retrieval as they're written in the ASLOGS".format(job.name)) - continue - try: - job.retrieve_logfiles(self, raise_error=True) - if job.wrapper_type != "vertical": - job_names_processed.add(f'{job.name}_{job.fail_count}') - else: - job_names_processed.add(f'{job.name}') - except: - pass - except queue.Empty: - pass - except (IOError, OSError): - pass - except Exception as e: - try: - self.restore_connection(None) - except: - pass + Log.get_logger("Autosubmit") # Log needs to be initialised in the new process + Log.result(f"{identifier} Successfully connected.") + log_recovery_timeout = self.config.get("LOG_RECOVERY_TIMEOUT", 60) + self.keep_alive_timeout = max(log_recovery_timeout*5, 60*5) + while self.wait_for_work(sleep_time=max(log_recovery_timeout, 60)): + jobs_pending_to_process = self.recover_job_log(identifier, jobs_pending_to_process) + if self.cleanup_event.is_set(): # Check if main process is waiting for this child to end.
+ self.recover_job_log(identifier, jobs_pending_to_process) + break + Log.info(f"{identifier} Exiting.") \ No newline at end of file diff --git a/autosubmit/platforms/sgeplatform.py b/autosubmit/platforms/sgeplatform.py index f44fb66294006e247974014f03c100326dd5b712..1b956ff9971fc8c4f05afafd0f99cf76dd486a7c 100644 --- a/autosubmit/platforms/sgeplatform.py +++ b/autosubmit/platforms/sgeplatform.py @@ -122,12 +122,9 @@ class SgePlatform(ParamikoPlatform): :rtype: bool """ self.connected = True - if not self.log_retrieval_process_active and ( - as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS', - "false")).lower() == "false"): - self.log_retrieval_process_active = True - if as_conf and as_conf.misc_data.get("AS_COMMAND","").lower() == "run": - self.recover_job_logs() + self.spawn_log_retrieval_process(as_conf) + + def restore_connection(self,as_conf): """ In this case, it does nothing because connection is established for each command @@ -144,7 +141,6 @@ class SgePlatform(ParamikoPlatform): :return: True :rtype: bool """ - self.main_process_id = os.getpid() self.connected = True self.connected(as_conf,True) diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index c48b98e9cfb0478c0cd89efe9950322181fd7ddd..1884a67390f5b04d3794541e1c62fe6c7849589d 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -189,6 +189,8 @@ class SlurmPlatform(ParamikoPlatform): job.hold = hold job.id = str(jobs_id[i]) job.status = Status.SUBMITTED + job.wrapper_name = package.name + # Check if there are duplicated jobnames if not duplicated_jobs_already_checked: job_name = package.name if hasattr(package, "name") else package.jobs[0].name @@ -651,37 +653,30 @@ class SlurmPlatform(ParamikoPlatform): def allocated_nodes(): return """os.system("scontrol show hostnames $SLURM_JOB_NODELIST > node_list_{0}".format(node_id))""" - def check_file_exists(self, filename, wrapper_failed=False, sleeptime=5, max_retries=3, first=True): + def check_file_exists(self, filename, wrapper_failed=False, sleeptime=5, max_retries=3): file_exist = False retries = 0 - # Not first is meant for vertical_wrappers. There you have to download STAT_{MAX_LOGS} then STAT_{MAX_LOGS-1} and so on - if not first: - max_retries = 1 - sleeptime = 0 while not file_exist and retries < max_retries: try: - # This return IOError if path doesn't exist + # This return IOError if a path doesn't exist self._ftpChannel.stat(os.path.join( self.get_files_path(), filename)) file_exist = True except IOError as e: # File doesn't exist, retry in sleeptime - if first: - Log.debug("{2} File does not exist.. 
waiting {0}s for a new retry (retries left: {1})", sleeptime, - max_retries - retries, os.path.join(self.get_files_path(), filename)) if not wrapper_failed: sleep(sleeptime) - sleeptime = sleeptime + 5 retries = retries + 1 else: - retries = 9999 + sleep(2) + retries = retries + 1 except BaseException as e: # Unrecoverable error if str(e).lower().find("garbage") != -1: - if not wrapper_failed: - sleep(sleeptime) - sleeptime = sleeptime + 5 - retries = retries + 1 + sleep(sleeptime) + sleeptime = sleeptime + 5 + retries = retries + 1 else: - Log.printlog("remote logs {0} couldn't be recovered".format(filename), 6001) file_exist = False # won't exist retries = 999 # no more retries + if not file_exist: + Log.warning("File {0} couldn't be found".format(filename)) return file_exist diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index 7ff5552db84966a83e7ce97260abe89d49076748..0135e2976c92fce6199bb34ed5813dfdfc964227 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -60,8 +60,10 @@ class WrapperBuilder(object): self.exit_thread = '' if "wallclock_by_level" in list(kwargs.keys()): self.wallclock_by_level = kwargs['wallclock_by_level'] + def build_header(self): return textwrap.dedent(self.header_directive) + self.build_imports() + def build_imports(self): pass def build_job_thread(self): @@ -153,7 +155,6 @@ class PythonWrapperBuilder(WrapperBuilder): def run(self): jobname = self.template.replace('.cmd', '') - #os.system("echo $(date +%s) > "+jobname+"_STAT") out = str(self.template) + ".out." + str(0) err = str(self.template) + ".err." + str(0) print(out+"\\n") @@ -446,8 +447,11 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): sequential_threads_launcher = textwrap.dedent(""" failed_wrapper = os.path.join(os.getcwd(),wrapper_id) retrials = {2} - total_steps = 0 - print("JOB.ID:"+ os.getenv('SLURM_JOBID')) + total_steps = 0 + try: + print("JOB.ID:"+ os.getenv('SLURM_JOBID')) + except: + print("JOB.ID") for i in range(len({0})): job_retrials = retrials completed = False @@ -455,8 +459,7 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): while fail_count <= job_retrials and not completed: current = {1} current.start() - timer = int(time.time()) - os.system("echo "+str(timer)+" >> "+scripts[i][:-4]+"_STAT_"+str(fail_count)) #Completed + start = int(time.time()) current.join({3}) total_steps = total_steps + 1 """).format(jobs_list, thread,self.retrials,str(self.wallclock_by_level),'\n'.ljust(13)) @@ -468,26 +471,42 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): failed_filename = {0}[i].replace('.cmd', '_FAILED') failed_path = os.path.join(os.getcwd(), failed_filename) failed_wrapper = os.path.join(os.getcwd(), wrapper_id) - timer = int(time.time()) - os.system("echo "+str(timer)+" >> "+scripts[i][:-4]+"_STAT_"+str(fail_count)) #Completed + finish = int(time.time()) + stat_filename = {0}[i].replace(".cmd", f"_STAT_{{fail_count}}") + stat_path_tmp = os.path.join(os.getcwd(),f"{{stat_filename}}.tmp") + print(f"Completed_file:{{completed_path}}") + print(f"Writing:{{stat_path_tmp}}") + print(f"[Start:{{start}}, Finish:{{finish}}, Fail_count:{{fail_count}}]") + with open(f"{{stat_path_tmp}}", "w") as file: + file.write(f"{{start}}\\n") + file.write(f"{{finish}}\\n") if os.path.exists(completed_path): completed = True print(datetime.now(), "The job ", current.template," has been COMPLETED") - os.system("echo COMPLETED >> " +
scripts[i][:-4]+"_STAT_"+str(fail_count)) else: print(datetime.now(), "The job ", current.template," has FAILED") - os.system("echo FAILED >> " + scripts[i][:-4]+"_STAT_"+str(fail_count)) #{1} fail_count = fail_count + 1 """).format(jobs_list, self.exit_thread, '\n'.ljust(13)), 8) sequential_threads_launcher += self._indent(textwrap.dedent(""" + from pathlib import Path + fail_count = 0 + while fail_count <= job_retrials: + try: + stat_filename = {0}[i].replace(".cmd", f"_STAT_{{fail_count}}") + stat_path_tmp = os.path.join(os.getcwd(),f"{{stat_filename}}.tmp") + Path(stat_path_tmp).replace(stat_path_tmp.replace(".tmp","")) + except: + print(f"Couldn't write the stat file:{{stat_path_tmp}}") + fail_count = fail_count + 1 if not os.path.exists(completed_path): open(failed_wrapper,'wb').close() open(failed_path, 'wb').close() if os.path.exists(failed_wrapper): os.remove(os.path.join(os.getcwd(),wrapper_id)) + print("WRAPPER_FAILED") wrapper_failed = os.path.join(os.getcwd(),"WRAPPER_FAILED") open(wrapper_failed, 'wb').close() os._exit(1) @@ -505,15 +524,17 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): self.fail_count = fail_count def run(self): + print("\\n") jobname = self.template.replace('.cmd', '') out = str(self.template) + ".out." + str(self.fail_count) err = str(self.template) + ".err." + str(self.fail_count) - print((out+"\\n")) - command = "./" + str(self.template) + " " + str(self.id_run) + " " + os.getcwd() - print((command+"\\n")) - (self.status) = getstatusoutput("timeout {0} " + command + " > " + out + " 2> " + err) - for i in self.status: - print((str(i)+"\\n")) + out_path = os.path.join(os.getcwd(), out) + err_path = os.path.join(os.getcwd(), err) + template_path = os.path.join(os.getcwd(), self.template) + command = f"timeout {0} {{template_path}} > {{out_path}} 2> {{err_path}}" + print(command) + getstatusoutput(command) + """).format(str(self.wallclock_by_level),'\n'.ljust(13)) diff --git a/autosubmit/platforms/wrappers/wrapper_factory.py b/autosubmit/platforms/wrappers/wrapper_factory.py index 1f47996a95f4622b17cf88dc6a990572f2d45476..1bd97c831c11119bab6bcb63bd5dc0dcd494803a 100644 --- a/autosubmit/platforms/wrappers/wrapper_factory.py +++ b/autosubmit/platforms/wrappers/wrapper_factory.py @@ -140,6 +140,58 @@ class WrapperFactory(object): def threads_directive(self, threads): raise NotImplemented(self.exception) + +class LocalWrapperFactory(WrapperFactory): + + def vertical_wrapper(self, **kwargs): + return PythonVerticalWrapperBuilder(**kwargs) + + def horizontal_wrapper(self, **kwargs): + + if kwargs["method"] == 'srun': + return SrunHorizontalWrapperBuilder(**kwargs) + else: + return PythonHorizontalWrapperBuilder(**kwargs) + + def hybrid_wrapper_horizontal_vertical(self, **kwargs): + return PythonHorizontalVerticalWrapperBuilder(**kwargs) + + def hybrid_wrapper_vertical_horizontal(self, **kwargs): + if kwargs["method"] == 'srun': + return SrunVerticalHorizontalWrapperBuilder(**kwargs) + else: + return PythonVerticalHorizontalWrapperBuilder(**kwargs) + + def reservation_directive(self, reservation): + return '#' + + def dependency_directive(self, dependency): + return '#' + + def queue_directive(self, queue): + return '#' + + def processors_directive(self, processors): + return '#' + + def nodes_directive(self, nodes): + return '#' + + def tasks_directive(self, tasks): + return '#' + + def partition_directive(self, partition): + return '#' + + def exclusive_directive(self, exclusive): + return '#' + + def threads_directive(self, threads): + return '#' + 
+ def header_directives(self, **kwargs): + return "" + class SlurmWrapperFactory(WrapperFactory): def vertical_wrapper(self, **kwargs): diff --git a/bin/autosubmit b/bin/autosubmit index d08f47593a4bffef1fe577076a6ffe24c394b856..b60ce0a7408a3ca2d5b331e0106a9a894398298a 100755 --- a/bin/autosubmit +++ b/bin/autosubmit @@ -51,7 +51,7 @@ def exit_from_error(e: BaseException): args = [str(e)] Log.critical(msg.format(*args)) Log.info("More info at https://autosubmit.readthedocs.io/en/master/troubleshooting/error-codes.html") - os._exit(1) + return 1 # noinspection PyProtectedMember def main(): @@ -60,8 +60,8 @@ def main(): if os.path.exists(os.path.join(Log.file_path, "autosubmit.lock")): os.remove(os.path.join(Log.file_path, "autosubmit.lock")) if type(return_value) is int: - os._exit(return_value) - os._exit(0) + return return_value + return 0 except AutosubmitError as e: exit_from_error(e) except AutosubmitCritical as e: @@ -70,4 +70,5 @@ def main(): exit_from_error(e) if __name__ == "__main__": - main() + exit_code = main() + sys.exit(exit_code) # Sys.exit ensures a proper cleanup of the program, while os._exit() does not. \ No newline at end of file diff --git a/setup.py b/setup.py index 8d1f5b1001c304d12a82508756b162fc54ba7284..013dcfc2181ef790d62c2e4fa4f953d220d1b601 100644 --- a/setup.py +++ b/setup.py @@ -48,7 +48,7 @@ install_requires = [ 'numpy<2', 'ruamel.yaml==0.17.21', 'rocrate==0.*', - 'autosubmitconfigparser==1.0.67', + 'autosubmitconfigparser==1.0.70', 'configparser', 'setproctitle', # 'sqlalchemy[mypy]' # TODO: pending Postgres MR diff --git a/test/unit/test_database_regression.py b/test/unit/test_database_regression.py new file mode 100644 index 0000000000000000000000000000000000000000..58116b581e164b35aff5818be0fe37a6f7e3ac2c --- /dev/null +++ b/test/unit/test_database_regression.py @@ -0,0 +1,258 @@ +import shutil +import pytest +from pathlib import Path +from autosubmit.autosubmit import Autosubmit +import os +import pwd +from test.unit.utils.common import create_database, init_expid +import sqlite3 + + +def _get_script_files_path() -> Path: + return Path(__file__).resolve().parent / 'files' + + +@pytest.fixture +def db_tmpdir(tmpdir_factory): + folder = tmpdir_factory.mktemp(f'db_tests') + os.mkdir(folder.join('scratch')) + os.mkdir(folder.join('db_tmp_dir')) + file_stat = os.stat(f"{folder.strpath}") + file_owner_id = file_stat.st_uid + file_owner = pwd.getpwuid(file_owner_id).pw_name + folder.owner = file_owner + + # Write an autosubmitrc file in the temporary directory + autosubmitrc = folder.join('autosubmitrc') + autosubmitrc.write(f''' +[database] +path = {folder} +filename = tests.db + +[local] +path = {folder} + +[globallogs] +path = {folder} + +[structures] +path = {folder} + +[historicdb] +path = {folder} + +[historiclog] +path = {folder} + +[defaultstats] +path = {folder} + +''') + os.environ['AUTOSUBMIT_CONFIGURATION'] = str(folder.join('autosubmitrc')) + create_database(str(folder.join('autosubmitrc'))) + assert "tests.db" in [Path(f).name for f in folder.listdir()] + init_expid(str(folder.join('autosubmitrc')), platform='local', create=False) + assert "t000" in [Path(f).name for f in folder.listdir()] + return folder + + +@pytest.fixture +def prepare_db(db_tmpdir): + # touch as_misc + # remove files under t000/conf + conf_folder = Path(f"{db_tmpdir.strpath}/t000/conf") + shutil.rmtree(conf_folder) + os.makedirs(conf_folder) + platforms_path = Path(f"{db_tmpdir.strpath}/t000/conf/platforms.yml") + main_path = Path(f"{db_tmpdir.strpath}/t000/conf/main.yml") + 
# Add each platform to test + with platforms_path.open('w') as f: + f.write(f""" +PLATFORMS: + dummy: + type: dummy + """) + + with main_path.open('w') as f: + f.write(f""" +EXPERIMENT: + # List of start dates + DATELIST: '20000101' + # List of members. + MEMBERS: fc0 + # Unit of the chunk size. Can be hour, day, month, or year. + CHUNKSIZEUNIT: month + # Size of each chunk. + CHUNKSIZE: '4' + # Number of chunks of the experiment. + NUMCHUNKS: '3' + CHUNKINI: '' + # Calendar used for the experiment. Can be standard or noleap. + CALENDAR: standard + +CONFIG: + # Current version of Autosubmit. + AUTOSUBMIT_VERSION: "" + # Total number of jobs in the workflow. + TOTALJOBS: 20 + # Maximum number of jobs permitted in the waiting status. + MAXWAITINGJOBS: 20 + SAFETYSLEEPTIME: 1 +DEFAULT: + # Job experiment ID. + EXPID: "t000" + # Default HPC platform name. + HPCARCH: "local" + #hint: use %PROJDIR% to point to the project folder (where the project is cloned) + # Custom configuration location. +project: + # Type of the project. + PROJECT_TYPE: None + # Folder to hold the project sources. + PROJECT_DESTINATION: local_project +""") + expid_dir = Path(f"{db_tmpdir.strpath}/scratch/whatever/{db_tmpdir.owner}/t000") + dummy_dir = Path(f"{db_tmpdir.strpath}/scratch/whatever/{db_tmpdir.owner}/t000/dummy_dir") + real_data = Path(f"{db_tmpdir.strpath}/scratch/whatever/{db_tmpdir.owner}/t000/real_data") + # We write some dummy data inside the scratch_dir + os.makedirs(expid_dir, exist_ok=True) + os.makedirs(dummy_dir, exist_ok=True) + os.makedirs(real_data, exist_ok=True) + + with open(dummy_dir.joinpath('dummy_file'), 'w') as f: + f.write('dummy data') + # create some dummy absolute symlinks in expid_dir to test migrate function + (real_data / 'dummy_symlink').symlink_to(dummy_dir / 'dummy_file') + return db_tmpdir + + +@pytest.mark.parametrize("jobs_data, expected_count, final_status", [ + # Success + (""" + JOBS: + job: + SCRIPT: | + echo "Hello World" + sleep 1 + PLATFORM: local + DEPENDENCIES: job-1 + RUNNING: chunk + wallclock: 00:01 + retrials: 2 + """, 1, "COMPLETED"), + # Success wrapper + (""" + JOBS: + job: + SCRIPT: | + echo "Hello World" + sleep 1 + DEPENDENCIES: job-1 + PLATFORM: local + RUNNING: chunk + wallclock: 00:01 + retrials: 2 + wrappers: + wrapper: + JOBS_IN_WRAPPER: job + TYPE: vertical + """, 1, "COMPLETED"), + # Failure + (""" + JOBS: + job: + SCRIPT: | + echo "Hello World" + sleep 1 + exit 1 + DEPENDENCIES: job-1 + PLATFORM: local + RUNNING: chunk + wallclock: 00:01 + retrials: 2 + """, 3, "FAILED"), + # Failure wrappers + (""" + JOBS: + job: + SCRIPT: | + echo "Hello World" + sleep 1 + exit 1 + PLATFORM: local + DEPENDENCIES: job-1 + RUNNING: chunk + wallclock: 00:10 + retrials: 2 + wrappers: + wrapper: + JOBS_IN_WRAPPER: job + TYPE: vertical + """, 3, "FAILED"), +], ids=["Success", "Success with wrapper", "Failure", "Failure with wrapper"]) +def test_db(db_tmpdir, prepare_db, jobs_data, expected_count, final_status, mocker): + # write jobs_data + jobs_path = Path(f"{db_tmpdir.strpath}/t000/conf/jobs.yml") + with jobs_path.open('w') as f: + f.write(jobs_data) + + # Create + init_expid(os.environ["AUTOSUBMIT_CONFIGURATION"], platform='local', expid='t000', create=True) + + # This is set in _init_log which is not called + as_misc = Path(f"{db_tmpdir.strpath}/t000/conf/as_misc.yml") + with as_misc.open('w') as f: + f.write(f""" + AS_MISC: True + ASMISC: + COMMAND: run + AS_COMMAND: run + """) + + # Run the experiment + mocker.patch('autosubmit.platforms.platform.max', 
return_value=60) + Autosubmit.run_experiment(expid='t000') + + # Test database exists. + job_data = Path(f"{db_tmpdir.strpath}/job_data_t000.db") + autosubmit_db = Path(f"{db_tmpdir.strpath}/tests.db") + assert job_data.exists() + assert autosubmit_db.exists() + + # Check job_data info + conn = sqlite3.connect(job_data) + conn.row_factory = sqlite3.Row + c = conn.cursor() + c.execute("SELECT * FROM job_data") + rows = c.fetchall() + # Convert rows to a list of dictionaries + rows_as_dicts = [dict(row) for row in rows] + # Tune the print so it is more readable and easier to debug in case of failure + column_names = rows_as_dicts[0].keys() if rows_as_dicts else [] + column_widths = [max(len(str(row[col])) for row in rows_as_dicts + [dict(zip(column_names, column_names))]) for col + in column_names] + print(f"Experiment folder: {db_tmpdir.strpath}") + header = " | ".join(f"{name:<{width}}" for name, width in zip(column_names, column_widths)) + print(f"\n{header}") + print("-" * len(header)) + # Print the rows + for row_dict in rows_as_dicts: # always print, for debugging purposes + print(" | ".join(f"{str(row_dict[col]):<{width}}" for col, width in zip(column_names, column_widths))) + for row_dict in rows_as_dicts: + # Check that all fields contain data, except extra_data, children, and platform_output + # Check that submit, start and finish are > 0 + assert row_dict["submit"] > 0 and row_dict["submit"] != 1970010101 + assert row_dict["start"] > 0 and row_dict["start"] != 1970010101 + assert row_dict["finish"] > 0 and row_dict["finish"] != 1970010101 + assert row_dict["status"] == final_status + for key in [key for key in row_dict.keys() if + key not in ["status", "finish", "submit", "start", "extra_data", "children", "platform_output"]]: + assert str(row_dict[key]) != str("") + # Check that the job_data table has the expected number of entries + c.execute("SELECT job_name, COUNT(*) as count FROM job_data GROUP BY job_name") + count_rows = c.fetchall() + for row in count_rows: + assert row["count"] == expected_count + # Close the cursor and connection + c.close() + conn.close() diff --git a/test/unit/test_job_package.py b/test/unit/test_job_package.py index 3808dba35bf988240d0511fadafb7da96b0ff546..a11323df1839b49edbe9c95bea313a12723d2191 100644 --- a/test/unit/test_job_package.py +++ b/test/unit/test_job_package.py @@ -222,8 +222,9 @@ def mock_as_conf(): } return MockAsConf() + def test_jobs_in_wrapper_str(mock_as_conf): # Arrange current_wrapper = "current_wrapper" result = jobs_in_wrapper_str(mock_as_conf, current_wrapper) - assert result == "job1_job2_job3" \ No newline at end of file + assert result == "job1_job2_job3" diff --git a/test/unit/test_log_recovery.py b/test/unit/test_log_recovery.py new file mode 100644 index 0000000000000000000000000000000000000000..d45f1579148d97f8a0adff763eb99c2c287160ef --- /dev/null +++ b/test/unit/test_log_recovery.py @@ -0,0 +1,160 @@ +import time +import pytest +from pathlib import Path +import os +import pwd +from autosubmit.job.job_common import Status +from autosubmitconfigparser.config.configcommon import AutosubmitConfig +from autosubmit.job.job import Job + + +def _get_script_files_path() -> Path: + return Path(__file__).resolve().parent / 'files' + + +@pytest.fixture +def current_tmpdir(tmpdir_factory): + folder = tmpdir_factory.mktemp(f'tests') + os.mkdir(folder.join('scratch')) + file_stat = os.stat(f"{folder.strpath}") + file_owner_id = file_stat.st_uid + file_owner = pwd.getpwuid(file_owner_id).pw_name + folder.owner = file_owner +
return folder + + +@pytest.fixture +def prepare_test(current_tmpdir): + # touch as_misc + platforms_path = Path(f"{current_tmpdir.strpath}/platforms_t000.yml") + jobs_path = Path(f"{current_tmpdir.strpath}/jobs_t000.yml") + project = "whatever" + scratch_dir = f"{current_tmpdir.strpath}/scratch" + Path(f"{scratch_dir}/{project}/{current_tmpdir.owner}").mkdir(parents=True, exist_ok=True) + Path(f"{scratch_dir}/LOG_t000").mkdir(parents=True, exist_ok=True) + Path(f"{scratch_dir}/LOG_t000/t000.cmd.out.0").touch() + Path(f"{scratch_dir}/LOG_t000/t000.cmd.err.0").touch() + + # Add each platform to test + with platforms_path.open('w') as f: + f.write(f""" +PLATFORMS: + pytest-ps: + type: ps + host: 127.0.0.1 + user: {current_tmpdir.owner} + project: {project} + scratch_dir: {scratch_dir} + """) + # add a job of each platform type + with jobs_path.open('w') as f: + f.write(f""" +JOBS: + base: + SCRIPT: | + echo "Hello World" + echo sleep 5 + QUEUE: hpc + PLATFORM: pytest-ps + RUNNING: once + wallclock: 00:01 +EXPERIMENT: + # List of start dates + DATELIST: '20000101' + # List of members. + MEMBERS: fc0 + # Unit of the chunk size. Can be hour, day, month, or year. + CHUNKSIZEUNIT: month + # Size of each chunk. + CHUNKSIZE: '4' + # Number of chunks of the experiment. + NUMCHUNKS: '2' + CHUNKINI: '' + # Calendar used for the experiment. Can be standard or noleap. + CALENDAR: standard + """) + return current_tmpdir + + +@pytest.fixture +def local(prepare_test): + # Init Local platform + from autosubmit.platforms.locplatform import LocalPlatform + config = { + 'LOCAL_ROOT_DIR': f"{prepare_test}/scratch", + 'LOCAL_TMP_DIR': f"{prepare_test}/scratch", + } + local = LocalPlatform(expid='t000', name='local', config=config) + return local + + +@pytest.fixture +def as_conf(prepare_test, mocker): + mocker.patch('pathlib.Path.exists', return_value=True) + as_conf = AutosubmitConfig("test") + as_conf.experiment_data = as_conf.load_config_file(as_conf.experiment_data, + Path(prepare_test.join('platforms_t000.yml'))) + as_conf.misc_data = {"AS_COMMAND": "run"} + return as_conf + + +def test_log_recovery_no_keep_alive(prepare_test, local, mocker, as_conf): + mocker.patch('autosubmit.platforms.platform.max', return_value=1) + local.spawn_log_retrieval_process(as_conf) + assert local.log_recovery_process.is_alive() + time.sleep(2) + assert local.log_recovery_process.is_alive() is False + local.cleanup_event.set() + + +def test_log_recovery_keep_alive(prepare_test, local, mocker, as_conf): + mocker.patch('autosubmit.platforms.platform.max', return_value=1) + local.keep_alive_timeout = 0 + local.spawn_log_retrieval_process(as_conf) + assert local.log_recovery_process.is_alive() + local.work_event.set() + time.sleep(1) + assert local.log_recovery_process.is_alive() + local.work_event.set() + time.sleep(1) + assert local.log_recovery_process.is_alive() + time.sleep(1) + assert local.log_recovery_process.is_alive() is False + local.cleanup_event.set() + + +def test_log_recovery_keep_alive_cleanup(prepare_test, local, mocker, as_conf): + mocker.patch('autosubmit.platforms.platform.max', return_value=1) + local.keep_alive_timeout = 0 + local.spawn_log_retrieval_process(as_conf) + assert local.log_recovery_process.is_alive() + local.work_event.set() + time.sleep(1) + assert local.log_recovery_process.is_alive() + local.work_event.set() + local.cleanup_event.set() + time.sleep(1) + assert local.log_recovery_process.is_alive() is False + local.cleanup_event.set() + + +def test_log_recovery_recover_log(prepare_test, local, 
mocker, as_conf): + print(prepare_test.strpath) + mocker.patch('autosubmit.platforms.platform.max', return_value=0) + local.keep_alive_timeout = 20 + mocker.patch('autosubmit.job.job.Job.write_stats') # Tested in test_database_regression.py + local.spawn_log_retrieval_process(as_conf) + local.work_event.set() + job = Job('t000', '0000', Status.COMPLETED, 0) + job.name = 'test_job' + job.platform = local + job.platform_name = 'local' + job.local_logs = ("t000.cmd.out.moved", "t000.cmd.err.moved") + job._init_runtime_parameters() + local.work_event.set() + local.add_job_to_log_recover(job) + local.cleanup_event.set() + local.log_recovery_process.join(30) # should exit earlier. + assert local.log_recovery_process.is_alive() is False + assert Path(f"{prepare_test.strpath}/scratch/LOG_t000/t000.cmd.out.moved").exists() + assert Path(f"{prepare_test.strpath}/scratch/LOG_t000/t000.cmd.err.moved").exists()
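Taken together, these tests pin down the lifecycle the new log-recovery process is expected to follow: work_event keeps it alive while the main loop has active jobs, cleanup_event asks it to drain its queue and exit, and with neither set it times out on its own. A self-contained sketch of that event protocol using plain multiprocessing primitives (a simplified stand-in, not the Platform implementation; the timings are arbitrary):

```python
# Hedged, self-contained sketch of the event protocol exercised by these tests: the main
# process sets work_event while jobs are active and cleanup_event when it wants the
# log-recovery child to finish. This is a simplified stand-in, not the Platform code.
import time
from multiprocessing import Process, Event

def recovery_loop(work_event, cleanup_event, keep_alive_timeout=2.0):
    idle = 0.0
    while True:
        time.sleep(0.1)
        if cleanup_event.is_set():      # final drain requested by the parent
            break
        if work_event.is_set():
            work_event.clear()          # "process logs", then keep waiting for more work
            idle = 0.0
        else:
            idle += 0.1
            if idle >= keep_alive_timeout:
                break                   # no work for too long: exit on our own

if __name__ == "__main__":
    work, cleanup = Event(), Event()
    child = Process(target=recovery_loop, args=(work, cleanup), daemon=True)
    child.start()
    work.set()                          # simulate the main loop signalling pending work
    time.sleep(0.5)
    cleanup.set()                       # ask the child to finish, as send_cleanup_signal() does
    child.join(5)
    print("child alive:", child.is_alive())   # expected: False
```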