From 5537a43b8bc76743f7a286ee82d5c9eaf7cd3db1 Mon Sep 17 00:00:00 2001 From: Wilmer Uruchi Ticona Date: Wed, 14 Oct 2020 12:10:43 +0200 Subject: [PATCH 1/2] Implementing #484. Added job footer information --- autosubmit/autosubmit.py | 15 +++++++ autosubmit/database/db_jobdata.py | 72 +++++++++++++++++++++++++++++-- autosubmit/job/job.py | 11 +++-- autosubmit/platforms/platform.py | 39 +++++++++++++++-- 4 files changed, 124 insertions(+), 13 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index ab1cd5bfc..038761972 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1379,6 +1379,21 @@ class Autosubmit: while job_list.get_active(): try: if Autosubmit.exit: + # Closing threads on Ctrl+C + exit_timeout = 0 + exit_active_threads = True + Log.info( + "Looking for active threads before closing Autosubmit. Ending the program before these threads finish may result in unexpected behavior. This procedure will last until all threads have finished or the program has waited for more than 60 seconds.") + while exit_active_threads and exit_timeout <= 60: + exit_active_threads = False + for thread in threading.enumerate(): + if "Thread-" in thread.name: + if thread.is_alive(): + Log.info( + "{0} is still working.".format(thread.name)) + exit_active_threads = True + sleep(20) + exit_timeout += 20 return 0 # reload parameters changes Log.debug("Reloading parameters...") diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py index aa1103b7b..f537e1326 100644 --- a/autosubmit/database/db_jobdata.py +++ b/autosubmit/database/db_jobdata.py @@ -27,7 +27,7 @@ import traceback import sqlite3 import copy import collections -from datetime import datetime +from datetime import datetime, timedelta from json import dumps, loads #from networkx import DiGraph from autosubmit.config.basicConfig import BasicConfig @@ -237,6 +237,48 @@ class JobData(object): self.require_update = True self._energy = energy if energy else 0 + def delta_queue_time(self): + return str(timedelta(seconds=self.queuing_time())) + + def delta_running_time(self): + return str(timedelta(seconds=self.running_time())) + + def submit_datetime(self): + if self.submit > 0: + return datetime.fromtimestamp(self.submit) + return None + + def start_datetime(self): + if self.start > 0: + return datetime.fromtimestamp(self.start) + return None + + def finish_datetime(self): + if self.finish > 0: + return datetime.fromtimestamp(self.finish) + return None + + def submit_datetime_str(self): + o_datetime = self.submit_datetime() + if o_datetime: + return o_datetime.strftime('%Y-%m-%d-%H:%M:%S') + else: + return None + + def start_datetime_str(self): + o_datetime = self.start_datetime() + if o_datetime: + return o_datetime.strftime('%Y-%m-%d-%H:%M:%S') + else: + return None + + def finish_datetime_str(self): + o_datetime = self.finish_datetime() + if o_datetime: + return o_datetime.strftime('%Y-%m-%d-%H:%M:%S') + else: + return None + def running_time(self): """Calculates the running time of the job. @@ -265,6 +307,24 @@ class JobData(object): return queue return 0 + def get_hdata(self): + hdata = collections.OrderedDict() + hdata["name"] = self.job_name + hdata["date"] = self.date + hdata["section"] = self.section + hdata["member"] = self.member + hdata["chunk"] = self.chunk + hdata["submit"] = self.submit_datetime_str() + hdata["start"] = self.start_datetime_str() + hdata["finish"] = self.finish_datetime_str() + hdata["queue_time"] = self.delta_queue_time() + hdata["run_time"] = self.delta_running_time() + hdata["wallclock"] = self.wallclock + hdata["ncpus"] = self.ncpus + hdata["nnodes"] = self.nnodes + hdata["energy"] = self.energy + return dumps(hdata) + class JobDataList(): """Object that stores the list of jobs to be handled. @@ -978,7 +1038,7 @@ class JobDataStructure(MainDataBase): "Autosubmit couldn't write start time.") return None - def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, platform_object=None, packed=False, parent_id_list=[], no_slurm=True): + def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, platform_object=None, packed=False, parent_id_list=[], no_slurm=True, out_file_path=None): """Writes the finish time into the database Args: @@ -1027,10 +1087,16 @@ class JobDataStructure(MainDataBase): try: if type(platform_object) is not str: if platform_object.type == "slurm" and job_id > 0: - # Waiting 30 seconds for slurm data completion + # Waiting 60 seconds for slurm data completion time.sleep(60) submit_time, start_time, finish_time, energy, number_cpus, number_nodes, extra_data, is_end_of_wrapper = platform_object.check_job_energy( job_id, is_packed) + # Writing EXTRADATA + if job_id > 0 and out_file_path is not None: + if job_data_last.job_id == job_id: + # print("Writing extra info") + platform_object.write_job_extrainfo( + job_data_last.get_hdata(), out_file_path) except Exception as exp: Log.info(traceback.format_exc()) Log.warning(str(exp)) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index ea8fb1721..d4c964910 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -1064,16 +1064,15 @@ class Job(object): else: final_status = "FAILED" f.write('FAILED') - - # Launch first as simple + out, err = self.local_logs + path_out = os.path.join(self._tmp_path, 'LOG_' + str(self.expid), out) + # Launch first as simple non-threaded function JobDataStructure(self.expid).write_finish_time(self.name, finish_time, final_status, self.processors, self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents]) - # Launch second as thread + # Launch second as threaded function thread_write_finish = Thread(target=JobDataStructure(self.expid).write_finish_time, args=(self.name, finish_time, final_status, self.processors, - self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents], False)) + self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents], False, path_out)) thread_write_finish.start() - # JobDataStructure(self.expid).write_finish_time(self.name, finish_time, final_status, self.processors, - # self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents]) def check_started_after(self, date_limit): """ diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index e42884a15..71e69a051 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -1,6 +1,7 @@ import os from log.log import Log +import traceback from autosubmit.job.job_common import Status @@ -19,7 +20,8 @@ class Platform(object): self.expid = expid self.name = name self.config = config - self.tmp_path = os.path.join(self.config.LOCAL_ROOT_DIR, self.expid, self.config.LOCAL_TMP_DIR) + self.tmp_path = os.path.join( + self.config.LOCAL_ROOT_DIR, self.expid, self.config.LOCAL_TMP_DIR) self._serial_platform = None self._serial_queue = None self._default_queue = None @@ -204,7 +206,8 @@ class Platform(object): :type remote_logs: (str, str) """ (job_out_filename, job_err_filename) = remote_logs - self.get_files([job_out_filename, job_err_filename], False, 'LOG_{0}'.format(exp_id)) + self.get_files([job_out_filename, job_err_filename], + False, 'LOG_{0}'.format(exp_id)) def get_completed_files(self, job_name, retries=0, recovery=False): """ @@ -276,7 +279,8 @@ class Platform(object): :rtype: bool """ filename = job_name + '_STAT' - stat_local_path = os.path.join(self.config.LOCAL_ROOT_DIR, self.expid, self.config.LOCAL_TMP_DIR, filename) + stat_local_path = os.path.join( + self.config.LOCAL_ROOT_DIR, self.expid, self.config.LOCAL_TMP_DIR, filename) if os.path.exists(stat_local_path): os.remove(stat_local_path) if self.check_file_exists(filename): @@ -294,7 +298,8 @@ class Platform(object): :rtype: str """ if self.type == "local": - path = os.path.join(self.root_dir, self.config.LOCAL_TMP_DIR, 'LOG_{0}'.format(self.expid)) + path = os.path.join( + self.root_dir, self.config.LOCAL_TMP_DIR, 'LOG_{0}'.format(self.expid)) else: path = os.path.join(self.root_dir, 'LOG_{0}'.format(self.expid)) return path @@ -363,3 +368,29 @@ class Platform(object): except Exception as ex: Log.error("Writing Job Id Failed : " + str(ex)) + def write_job_extrainfo(self, job_hdata, complete_path): + """[summary] + + :param job_hdata: job extra data + :type job_hdata: str + :param complete_path: complete path to the file, includes filename + :type complete_path: str + :return: Modifies file and returns True, False if file could not be modified + :rtype: Boolean + """ + try: + # footer = "extra_data = {0}".format() + # print("Complete path {0}".format(complete_path)) + if os.path.exists(complete_path): + file_type = complete_path[-3:] + # print("Detected file type {0}".format(file_type)) + if file_type == "out" or file_type == "err": + with open(complete_path, "a") as f: + job_footer_info = "[INFO] HDATA={0}".format(job_hdata) + f.write(job_footer_info) + f.close() + except Exception as ex: + Log.debug(traceback.format_exc()) + Log.warning( + "Autosubmit has not written extra information into the .out log.") + pass -- GitLab From 1be4a868089c94b07c8377682c5fb28195ffd09e Mon Sep 17 00:00:00 2001 From: Wilmer Uruchi Ticona Date: Wed, 14 Oct 2020 12:37:56 +0200 Subject: [PATCH 2/2] Added platform to the format --- autosubmit/database/db_jobdata.py | 1 + 1 file changed, 1 insertion(+) diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py index f537e1326..b25fc8dfd 100644 --- a/autosubmit/database/db_jobdata.py +++ b/autosubmit/database/db_jobdata.py @@ -323,6 +323,7 @@ class JobData(object): hdata["ncpus"] = self.ncpus hdata["nnodes"] = self.nnodes hdata["energy"] = self.energy + hdata["platform"] = self.platform return dumps(hdata) -- GitLab