diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py
index ab1cd5bfce72391894e2946de191c2ad0784bd6f..03876197252c1d856628ec977080cadbb583c890 100644
--- a/autosubmit/autosubmit.py
+++ b/autosubmit/autosubmit.py
@@ -1379,6 +1379,26 @@ class Autosubmit:
                 while job_list.get_active():
                     try:
                         if Autosubmit.exit:
+                            # Closing threads on Ctrl+C: poll every 20 seconds until no
+                            # tracked thread is alive or the timeout budget is used up.
+                            exit_timeout = 0
+                            Log.info(
+                                "Looking for active threads before closing Autosubmit. Ending the program before these threads finish may result in unexpected behavior. This procedure will last until all threads have finished or the program has waited for more than 60 seconds.")
+                            while exit_timeout <= 60:
+                                # Only threads whose name contains "Thread-" are
+                                # tracked (presumably the default threading.Thread
+                                # names - confirm against how workers are created).
+                                active_threads = [
+                                    thread for thread in threading.enumerate()
+                                    if "Thread-" in thread.name and thread.is_alive()]
+                                if not active_threads:
+                                    # Nothing to wait for: leave without sleeping.
+                                    break
+                                for thread in active_threads:
+                                    Log.info(
+                                        "{0} is still working.".format(thread.name))
+                                sleep(20)
+                                exit_timeout += 20
                             return 0
                         # reload parameters changes
                         Log.debug("Reloading parameters...")
diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py
index aa1103b7b665b733a57605a1cd82ab510ed5eb65..b25fc8dfd5f19adffca6a02a35b14149fa9e5be2 100644
--- a/autosubmit/database/db_jobdata.py
+++ b/autosubmit/database/db_jobdata.py
@@ -27,7 +27,7 @@ import traceback
 import sqlite3
 import copy
 import collections
-from datetime import datetime
+from datetime import datetime, timedelta
 from json import dumps, loads
 #from networkx import DiGraph
 from autosubmit.config.basicConfig import BasicConfig
@@ -237,6 +237,48 @@ class JobData(object):
             self.require_update = True
         self._energy = energy if energy else 0
 
+    def delta_queue_time(self):
+        """Return the queuing time as a ``str(timedelta)`` string (e.g. '0:01:30')."""
+        return str(timedelta(seconds=self.queuing_time()))
+
+    def delta_running_time(self):
+        """Return the running time as a ``str(timedelta)`` string (e.g. '0:01:30')."""
+        return str(timedelta(seconds=self.running_time()))
+
+    @staticmethod
+    def _timestamp_to_datetime(timestamp):
+        """Convert a positive timestamp with datetime.fromtimestamp; else None."""
+        return datetime.fromtimestamp(timestamp) if timestamp > 0 else None
+
+    @staticmethod
+    def _datetime_to_str(o_datetime):
+        """Format a datetime as '%Y-%m-%d-%H:%M:%S'; None when there is no datetime."""
+        return o_datetime.strftime('%Y-%m-%d-%H:%M:%S') if o_datetime else None
+
+    def submit_datetime(self):
+        """Return the submit time as a datetime, or None when it is not positive."""
+        return self._timestamp_to_datetime(self.submit)
+
+    def start_datetime(self):
+        """Return the start time as a datetime, or None when it is not positive."""
+        return self._timestamp_to_datetime(self.start)
+
+    def finish_datetime(self):
+        """Return the finish time as a datetime, or None when it is not positive."""
+        return self._timestamp_to_datetime(self.finish)
+
+    def submit_datetime_str(self):
+        """Return the submit time as a formatted string, or None when unavailable."""
+        return self._datetime_to_str(self.submit_datetime())
+
+    def start_datetime_str(self):
+        """Return the start time as a formatted string, or None when unavailable."""
+        return self._datetime_to_str(self.start_datetime())
+
+    def finish_datetime_str(self):
+        """Return the finish time as a formatted string, or None when unavailable."""
+        return self._datetime_to_str(self.finish_datetime())
+
     def running_time(self):
         """Calculates the running time of the job.
 
@@ -265,6 +307,25 @@ class JobData(object):
             return queue
         return 0
 
+    def get_hdata(self):
+        hdata = collections.OrderedDict()
+        hdata["name"] = self.job_name
+        hdata["date"] = self.date
+        hdata["section"] = self.section
+        hdata["member"] = self.member
+        hdata["chunk"] = self.chunk
+        hdata["submit"] = self.submit_datetime_str()
+        hdata["start"] = self.start_datetime_str()
+        hdata["finish"] = self.finish_datetime_str()
+        hdata["queue_time"] = self.delta_queue_time()
+        hdata["run_time"] = self.delta_running_time()
+        hdata["wallclock"] = self.wallclock
+        hdata["ncpus"] = self.ncpus
+        hdata["nnodes"] = self.nnodes
+        hdata["energy"] = self.energy
+        hdata["platform"] = self.platform
+        return dumps(hdata)
+
 
 class JobDataList():
     """Object that stores the list of jobs to be handled.
@@ -978,7 +1039,7 @@ class JobDataStructure(MainDataBase):
                 "Autosubmit couldn't write start time.")
             return None
 
-    def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, platform_object=None, packed=False, parent_id_list=[], no_slurm=True):
+    def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, platform_object=None, packed=False, parent_id_list=[], no_slurm=True, out_file_path=None):
         """Writes the finish time into the database
 
         Args:
@@ -1027,10 +1088,16 @@ class JobDataStructure(MainDataBase):
                     try:
                         if type(platform_object) is not str:
                             if platform_object.type == "slurm" and job_id > 0:
-                                # Waiting 30 seconds for slurm data completion
+                                # Waiting 60 seconds for slurm data completion
                                 time.sleep(60)
                                 submit_time, start_time, finish_time, energy, number_cpus, number_nodes, extra_data, is_end_of_wrapper = platform_object.check_job_energy(
                                     job_id, is_packed)
+                                # Writing EXTRADATA
+                                if job_id > 0 and out_file_path is not None:
+                                    if job_data_last.job_id == job_id:
+                                        # print("Writing extra info")
+                                        platform_object.write_job_extrainfo(
+                                            job_data_last.get_hdata(), out_file_path)
                     except Exception as exp:
                         Log.info(traceback.format_exc())
                         Log.warning(str(exp))
diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py
index ea8fb17218ab1ef903e36c1eaa2efc91ce288e5b..d4c96491067e60d5bac6e21737e3ca4fd0ae91d2 100644
--- a/autosubmit/job/job.py
+++ b/autosubmit/job/job.py
@@ -1064,16 +1064,15 @@ class Job(object):
             else:
                 final_status = "FAILED"
                 f.write('FAILED')
-
-        # Launch first as simple
+        out, err = self.local_logs
+        path_out = os.path.join(self._tmp_path, 'LOG_' + str(self.expid), out)
+        # Launch first as simple non-threaded function
         JobDataStructure(self.expid).write_finish_time(self.name, finish_time, final_status, self.processors,
                                                        self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents])
-        # Launch second as thread
+        # Launch second as threaded function
         thread_write_finish = Thread(target=JobDataStructure(self.expid).write_finish_time, args=(self.name, finish_time, final_status, self.processors,
-                                                                                                  self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents], False))
+                                                                                                  self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents], False, path_out))
         thread_write_finish.start()
-        # JobDataStructure(self.expid).write_finish_time(self.name, finish_time, final_status, self.processors,
-        #                                                self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents])
 
     def check_started_after(self, date_limit):
         """
diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py
index e42884a15b965c52dfb858d9431fee219a499596..71e69a0519f608b69b0d019764feb18d8c676951 100644
--- a/autosubmit/platforms/platform.py
+++ b/autosubmit/platforms/platform.py
@@ -1,6 +1,7 @@
 import os
 
 from log.log import Log
+import traceback
 from autosubmit.job.job_common import Status
 
 
@@ -19,7 +20,8 @@ class Platform(object):
         self.expid = expid
         self.name = name
         self.config = config
-        self.tmp_path = os.path.join(self.config.LOCAL_ROOT_DIR, self.expid, self.config.LOCAL_TMP_DIR)
+        self.tmp_path = os.path.join(
+            self.config.LOCAL_ROOT_DIR, self.expid, self.config.LOCAL_TMP_DIR)
         self._serial_platform = None
         self._serial_queue = None
         self._default_queue = None
@@ -204,7 +206,8 @@ class Platform(object):
         :type remote_logs: (str, str)
         """
         (job_out_filename, job_err_filename) = remote_logs
-        self.get_files([job_out_filename, job_err_filename], False, 'LOG_{0}'.format(exp_id))
+        self.get_files([job_out_filename, job_err_filename],
+                       False, 'LOG_{0}'.format(exp_id))
 
     def get_completed_files(self, job_name, retries=0, recovery=False):
         """
@@ -276,7 +279,8 @@ class Platform(object):
         :rtype: bool
         """
         filename = job_name + '_STAT'
-        stat_local_path = os.path.join(self.config.LOCAL_ROOT_DIR, self.expid, self.config.LOCAL_TMP_DIR, filename)
+        stat_local_path = os.path.join(
+            self.config.LOCAL_ROOT_DIR, self.expid, self.config.LOCAL_TMP_DIR, filename)
         if os.path.exists(stat_local_path):
             os.remove(stat_local_path)
         if self.check_file_exists(filename):
@@ -294,7 +298,8 @@ class Platform(object):
         :rtype: str
         """
         if self.type == "local":
-            path = os.path.join(self.root_dir, self.config.LOCAL_TMP_DIR, 'LOG_{0}'.format(self.expid))
+            path = os.path.join(
+                self.root_dir, self.config.LOCAL_TMP_DIR, 'LOG_{0}'.format(self.expid))
         else:
             path = os.path.join(self.root_dir, 'LOG_{0}'.format(self.expid))
         return path
@@ -363,3 +368,27 @@ class Platform(object):
         except Exception as ex:
             Log.error("Writing Job Id Failed : " + str(ex))
 
+    def write_job_extrainfo(self, job_hdata, complete_path):
+        """Appends the job extra data as a footer to an existing job log file.
+
+        The footer is only written when the file already exists and its path
+        ends in "out" or "err". Any error is logged and swallowed (best effort).
+
+        :param job_hdata: job extra data
+        :type job_hdata: str
+        :param complete_path: complete path to the file, includes filename
+        :type complete_path: str
+        :return: True if the footer was written, False if the file was not modified
+        :rtype: Boolean
+        """
+        try:
+            if os.path.exists(complete_path) and complete_path.endswith(("out", "err")):
+                # The context manager closes the file; no explicit close() needed.
+                with open(complete_path, "a") as f:
+                    f.write("[INFO] HDATA={0}".format(job_hdata))
+                return True
+        except Exception:
+            Log.debug(traceback.format_exc())
+            Log.warning(
+                "Autosubmit has not written extra information into the .out log.")
+        return False