From 623f79abd22ce440e687565ec253da5242ea7af2 Mon Sep 17 00:00:00 2001
From: Wilmer Uruchi Ticona
Date: Mon, 17 Aug 2020 12:12:04 +0200
Subject: [PATCH 1/9] Temporary fix for #573, also related to #571.

---
 autosubmit/database/db_jobdata.py         |  41 ++++++--
 autosubmit/job/job.py                     |   6 +-
 autosubmit/platforms/paramiko_platform.py |   4 +-
 autosubmit/platforms/slurmplatform.py     | 111 ++++++++++++++++++----
 4 files changed, 134 insertions(+), 28 deletions(-)

diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py
index da4f58811..b163a4d27 100644
--- a/autosubmit/database/db_jobdata.py
+++ b/autosubmit/database/db_jobdata.py
@@ -194,6 +194,25 @@ class MainDataBase():
             Log.warning("Error on create table . create_table")
             return None
 
+    def create_index(self):
+        """ Creates index
+        """
+        try:
+            if self.conn:
+                c = self.conn.cursor()
+                c.execute(self.create_index_query)
+            else:
+                raise IOError("Not a valid connection")
+        except IOError as exp:
+            Log.warning(exp)
+            return None
+        except sqlite3.Error as e:
+            if _debug == True:
+                Log.info(traceback.format_exc())
+            Log.debug(str(type(e).__name__))
+            Log.warning("Error on create index . create_index")
+            return None
+
 
 class ExperimentStatus(MainDataBase):
 
     def __init__(self, expid):
@@ -426,14 +445,17 @@ class JobDataStructure(MainDataBase):
                 platform TEXT NOT NULL,
                 job_id INTEGER NOT NULL,
                 extra_data TEXT NOT NULL,
-                UNIQUE(counter,job_name),
-                INDEX
+                UNIQUE(counter,job_name)
                 );
-            CREATE INDEX IF NOT EXISTS ID_JOB_NAME ON job_data(job_name);''')
+                ''')
+            self.create_index_query = textwrap.dedent('''
+                CREATE INDEX IF NOT EXISTS ID_JOB_NAME ON job_data(job_name);
+                ''')
             if not os.path.exists(self.database_path):
                 open(self.database_path, "w")
             self.conn = self.create_connection(self.database_path)
             self.create_table()
+            self.create_index()
             if self._set_pragma_version(CURRENT_DB_VERSION):
                 Log.info("Database version set.")
             else:
@@ -546,7 +568,7 @@ class JobDataStructure(MainDataBase):
                 "Autosubmit couldn't write start time.")
             return None
 
-    def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, platform_object=None):
+    def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, platform_object=None, packed=False):
         """Writes the finish time into the database
 
         Args:
@@ -583,9 +605,16 @@ class JobDataStructure(MainDataBase):
             try:
                 if type(platform_object) is not str:
                     if platform_object.type == "slurm":
-                        #print("Checking Slurm for " + str(job_name))
+                        # print("Checking Slurm for " + str(job_name))
                         submit_time, start_time, finish_time, energy, extra_data = platform_object.check_job_energy(
-                            job_id)
+                            job_id, packed)
+                        # print(job_id)
+                        # print(packed)
+                        # print(submit_time)
+                        # print(start_time)
+                        # print(finish_time)
+                        # print(energy)
+                        # print(extra_data)
             except Exception as exp:
                 Log.info(traceback.format_exc())
                 Log.warning(str(exp))
diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py
index 928171da1..d1c217861 100644
--- a/autosubmit/job/job.py
+++ b/autosubmit/job/job.py
@@ -950,7 +950,7 @@ class Job(object):
         if show_logs:
             if not set(variables).issuperset(set(parameters)):
                 Log.warning("The following set of variables are not being used in the templates: {0}",
-                            str(set(parameters)-set(variables)))
+                            str(set(parameters) - set(variables)))
 
         return out
 
@@ -1021,7 +1021,7 @@ class Job(object):
                 final_status = 
"FAILED" f.write('FAILED') JobDataStructure(self.expid).write_finish_time(self.name, finish_time, final_status, self.processors, - self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.platform) + self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed) def check_started_after(self, date_limit): """ @@ -1329,7 +1329,7 @@ done output = self.platform.check_completed_files(job.name) if output is None or output == '': sleep(wait) - retries = retries-1 + retries = retries - 1 if output is not None and output != '' and 'COMPLETED' in output: job.new_status = Status.COMPLETED job.update_status(self.as_config.get_copy_remote_logs() == 'true') diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 875910c3a..c9dec8016 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -302,7 +302,7 @@ class ParamikoPlatform(Platform): else: return None - def check_job_energy(self, job_id): + def check_job_energy(self, job_id, packed=False): """ Checks job energy and return values. Defined in child classes. @@ -315,7 +315,7 @@ class ParamikoPlatform(Platform): check_energy_cmd = self.get_job_energy_cmd(job_id) self.send_command(check_energy_cmd) return self.parse_job_finish_data( - self.get_ssh_output(), job_id) + self.get_ssh_output(), job_id, packed) def submit_Script(self, hold=False): """ diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index 251d64f21..8d48b058c 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -21,6 +21,7 @@ import os from time import sleep from time import mktime from datetime import datetime +import traceback from xml.dom.minidom import parseString @@ -56,7 +57,7 @@ class SlurmPlatform(ParamikoPlatform): exp_id_path = os.path.join(config.LOCAL_ROOT_DIR, self.expid) tmp_path = os.path.join(exp_id_path, "tmp") self._submit_script_path = os.path.join( - tmp_path, config.LOCAL_ASLOG_DIR, "submit_"+self.name+".sh") + tmp_path, config.LOCAL_ASLOG_DIR, "submit_" + self.name + ".sh") self._submit_script_file = open(self._submit_script_path, 'w').close() def open_submit_script(self): @@ -116,35 +117,111 @@ class SlurmPlatform(ParamikoPlatform): def parse_job_output(self, output): return output.strip().split(' ')[0].strip() - - def parse_job_finish_data(self, output, job_id): + + def parse_job_finish_data(self, output, job_id, packed): + """Parses the context of the sacct query to SLURM for a single job. 
+ + :param output: The sacct output + :type output: str + :param job_id: Id in SLURM for the job + :type job_id: int + :param packed: true if job belongs to package + :type packed: bool + :return: submit, start, finish, joules, detailed_data + :rtype: int, int, int, int, json object (str) + """ try: + # Storing detail for posterity detailed_data = dict() + # No blank spaces after or before output = output.strip() lines = output.split("\n") + # If there is output, list exists if len(lines) > 0: + # Collecting information from all output for line in lines: line = line.strip().split() if len(line) > 0: + # Collecting detailed data name = str(line[0]) - extra_data = { "energy" : str(line[5] if len(line) > 5 else "NA"), "MaxRSS" : str(line[6] if len(line) > 6 else "NA"), "AveRSS" : str(line[7] if len(line) > 6 else "NA")} + extra_data = {"energy": str(line[5] if len(line) > 5 else "NA"), "MaxRSS": str( + line[6] if len(line) > 6 else "NA"), "AveRSS": str(line[7] if len(line) > 6 else "NA")} detailed_data[name] = extra_data - - line = lines[0].strip().split() - submit = int(mktime(datetime.strptime(line[2], "%Y-%m-%dT%H:%M:%S").timetuple())) - start = int(mktime(datetime.strptime(line[3], "%Y-%m-%dT%H:%M:%S").timetuple())) - finish = int(mktime(datetime.strptime(line[4], "%Y-%m-%dT%H:%M:%S").timetuple())) - joules = int(float(str(line[5])[:-1]) * 1000 if len(line[5]) > 0 else 0) + submit = start = finish = joules = 0 + status = "UNKNOWN" + line = None + if packed == False: + line = lines[0].strip().split() + #print("Unpacked {0}".format(line)) + status = line[1] + if status != "COMPLETED": + packed == True + if packed == True: + i = -1 + while (status != "COMPLETED"): + if len(lines) >= i * -1: + line = lines[i].strip().split() + #print("Packed output {0}".format(output)) + #print("Packed lines {0}".format(lines)) + status = line[1] + ave_rss = line[6] if len(line) > 6 and len( + line[6].strip()) > 0 else "NA" + if (ave_rss == "NA"): + status = "UNKNOWN" + i -= 1 + else: + break + # print(line) + try: + submit = int(mktime(datetime.strptime( + line[2], "%Y-%m-%dT%H:%M:%S").timetuple())) + start = int(mktime(datetime.strptime( + line[3], "%Y-%m-%dT%H:%M:%S").timetuple())) + finish = int(mktime(datetime.strptime( + line[4], "%Y-%m-%dT%H:%M:%S").timetuple())) if len(line) > 4 and line[4] != "Unknown" else datetime.now().timestamp() + joules = self.parse_output_number( + line[5]) if len(line) > 5 and len(line[5]) > 0 else -1 + except Exception as exp: + # Log.info(str(exp)) + Log.info("Parsing error on SLURM output.") + # print(lines) + pass + # print(detailed_data) return (submit, start, finish, joules, detailed_data) - return (0,0,0,0, dict()) + return (0, 0, 0, 0, dict()) except Exception as exp: # On error return 4*0 - Log.warning(str(exp)) - return (0,0,0,0, dict()) - - #return str(output) + # print(exp) + print("From _update_exp_status: {0}".format( + traceback.format_exc())) + return (0, 0, 0, 0, dict()) + + def parse_output_number(self, string_number): + """[summary] + + Args: + string_number ([type]): [description] + """ + number = 0.0 + if (string_number): + last_letter = string_number.strip()[-1] + multiplier = 1 + if last_letter == "G": + multiplier = 1000000 + number = string_number[:-1] + elif last_letter == "K" or last_letter == "M": + multiplier = 1000 + number = string_number[:-1] + else: + number = string_number + try: + number = float(number) * multiplier + except Exception as exp: + number = 0.0 + pass + return number def parse_Alljobs_output(self, output, job_id): status = 
[x.split()[1] for x in output.splitlines()
@@ -184,7 +261,7 @@ class SlurmPlatform(ParamikoPlatform):
         return 'squeue -j {0} -o %A,%R'.format(job_id)
 
     def get_job_energy_cmd(self, job_id):
-        return 'sacct -n -j {0} -o JobId%20,State,Submit,Start,End,ConsumedEnergy,MaxRSS,AveRSS'.format(job_id)
+        return 'sacct -n -j {0} -o JobId%20,State,Submit,Start,End,ConsumedEnergy,MaxRSS%20,AveRSS%20'.format(job_id)
 
     def parse_queue_reason(self, output, job_id):
         reason = [x.split(',')[1] for x in output.splitlines()
@@ -220,7 +297,7 @@ class SlurmPlatform(ParamikoPlatform):
         else:
             language = "#!/usr/bin/env python"
         return \
-            language+"""
+            language + """
###############################################################################
#                   {0}
###############################################################################
-- 
GitLab


From d25b3ef6304be9f3d657ee0a9990abc489a4420d Mon Sep 17 00:00:00 2001
From: Wilmer Uruchi Ticona
Date: Tue, 18 Aug 2020 11:52:46 +0200
Subject: [PATCH 2/9] Slurm output handled, but we still need to react when
 the output is not as expected.

---
 autosubmit/platforms/slurmplatform.py | 30 +++++++++++----------------
 1 file changed, 12 insertions(+), 18 deletions(-)

diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py
index 8d48b058c..f419aeaf5 100644
--- a/autosubmit/platforms/slurmplatform.py
+++ b/autosubmit/platforms/slurmplatform.py
@@ -153,25 +153,19 @@ class SlurmPlatform(ParamikoPlatform):
             if packed == False:
                 line = lines[0].strip().split()
                 #print("Unpacked {0}".format(line))
-                status = line[1]
+                status = str(line[1])
                 if status != "COMPLETED":
                     packed == True
             if packed == True:
                 i = -1
+                # It can happen that after this loop, there is no COMPLETED job information
                 while (status != "COMPLETED"):
                     if len(lines) >= i * -1:
                         line = lines[i].strip().split()
-                        #print("Packed output {0}".format(output))
-                        #print("Packed lines {0}".format(lines))
-                        status = line[1]
-                        ave_rss = line[6] if len(line) > 6 and len(
-                            line[6].strip()) > 0 else "NA"
-                        if (ave_rss == "NA"):
-                            status = "UNKNOWN"
-                        i -= 1
+                        status = str(line[1])
                     else:
                         break
-                # print(line)
+
                 try:
                     submit = int(mktime(datetime.strptime(
                         line[2], "%Y-%m-%dT%H:%M:%S").timetuple()))
@@ -182,9 +176,7 @@ class SlurmPlatform(ParamikoPlatform):
                     joules = self.parse_output_number(
                         line[5]) if len(line) > 5 and len(line[5]) > 0 else -1
                 except Exception as exp:
-                    # Log.info(str(exp))
-                    Log.info("Parsing error on SLURM output.")
-                    # print(lines)
+                    Log.info("Parsing mishandling. 
Further attempt is necessary.") pass # print(detailed_data) @@ -194,15 +186,17 @@ class SlurmPlatform(ParamikoPlatform): except Exception as exp: # On error return 4*0 # print(exp) - print("From _update_exp_status: {0}".format( - traceback.format_exc())) + Log.warning("From _update_exp_status: {0}".format(str(exp))) return (0, 0, 0, 0, dict()) def parse_output_number(self, string_number): - """[summary] + """ + Parses number in format 1.0K 1.0M 1.0G - Args: - string_number ([type]): [description] + :param string_number: String representation of number + :type string_number: str + :return: number in float format + :rtype: float """ number = 0.0 if (string_number): -- GitLab From 436039f37b9aec992001c17f5fac863014bc9c05 Mon Sep 17 00:00:00 2001 From: Wilmer Uruchi Ticona Date: Tue, 18 Aug 2020 11:54:27 +0200 Subject: [PATCH 3/9] Status comments --- autosubmit/database/db_jobdata.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py index b163a4d27..49664f170 100644 --- a/autosubmit/database/db_jobdata.py +++ b/autosubmit/database/db_jobdata.py @@ -195,7 +195,7 @@ class MainDataBase(): return None def create_index(self): - """ Creates index + """ Creates index from statement defined in child class """ try: if self.conn: @@ -448,9 +448,11 @@ class JobDataStructure(MainDataBase): UNIQUE(counter,job_name) ); ''') + # Index creation should be in a different statement self.create_index_query = textwrap.dedent(''' CREATE INDEX IF NOT EXISTS ID_JOB_NAME ON job_data(job_name); ''') + if not os.path.exists(self.database_path): open(self.database_path, "w") self.conn = self.create_connection(self.database_path) -- GitLab From a49dc06a3bff5606000eca2a370d0ee23bb9dc8e Mon Sep 17 00:00:00 2001 From: Wilmer Uruchi Ticona Date: Fri, 21 Aug 2020 11:07:06 +0200 Subject: [PATCH 4/9] Fixed #573. Changed the way the job historical database handles the SLURM sacct command output for wrappers. 
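
The reworked parser assumes the nine-column layout requested by the updated
sacct call (JobId, State, NCPUS, Submit, Start, End, ConsumedEnergy, MaxRSS,
AveRSS). A rough, self-contained sketch of that per-row parsing; the sample
output and the to_timestamp helper below are invented for illustration and
are not part of the codebase:

    from time import mktime
    from datetime import datetime

    # Invented `sacct -n` sample (nine columns); real output varies by cluster.
    SAMPLE = """12345        COMPLETED  48  2020-08-21T10:00:00  2020-08-21T10:05:00  2020-08-21T10:45:00  1.50M  300K  250K
    12345.batch  COMPLETED  48  2020-08-21T10:05:00  2020-08-21T10:05:00  2020-08-21T10:45:00  1.50M  300K  250K"""

    def to_timestamp(field):
        # sacct prints "Unknown" for states the job has not reached yet
        if field == "Unknown":
            return 0
        return int(mktime(datetime.strptime(field, "%Y-%m-%dT%H:%M:%S").timetuple()))

    for raw in SAMPLE.split("\n"):
        fields = raw.strip().split()
        name, status = fields[0], fields[1]
        submit, start, finish = [to_timestamp(f) for f in fields[3:6]]
        print((name, status, submit, start, finish))

For wrapped (packed) jobs the parser keeps submit and start at zero and only
captures the finish time and energy once the wrapper itself reaches a
terminal state.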
--- autosubmit/autosubmit.py | 2 +- autosubmit/database/db_jobdata.py | 201 +++++++++++++++++++--- autosubmit/job/job.py | 4 +- autosubmit/job/job_list.py | 10 +- autosubmit/platforms/paramiko_platform.py | 2 +- autosubmit/platforms/slurmplatform.py | 92 ++++++---- 6 files changed, 243 insertions(+), 68 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index f0837691f..88e087ec7 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -73,7 +73,7 @@ from bscearth.utils.date import date2str from monitor.monitor import Monitor from database.db_common import get_autosubmit_version from database.db_common import delete_experiment -from database.db_jobdata import ExperimentStatus +from database.db_jobdata import ExperimentStatus, JobDataStructure from experiment.experiment_common import copy_experiment from experiment.experiment_common import new_experiment from database.db_common import create_db diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py index 49664f170..860679958 100644 --- a/autosubmit/database/db_jobdata.py +++ b/autosubmit/database/db_jobdata.py @@ -31,11 +31,20 @@ from datetime import datetime from json import dumps #from networkx import DiGraph from autosubmit.config.basicConfig import BasicConfig +from autosubmit.job.job_package_persistence import JobPackagePersistence from bscearth.utils.date import date2str, parse_date, previous_day, chunk_end_date, chunk_start_date, Log, subs_dates CURRENT_DB_VERSION = 10 -_debug = True +# Defining RowType standard + + +class RowType: + NORMAL = 2 + #PACKED = 2 + + +_debug = False JobItem = collections.namedtuple('JobItem', ['id', 'counter', 'job_name', 'created', 'modified', 'submit', 'start', 'finish', 'status', 'rowtype', 'ncpus', 'wallclock', 'qos', 'energy', 'date', 'section', 'member', 'chunk', 'last', 'platform', 'job_id', 'extra_data']) @@ -47,7 +56,7 @@ class JobData(): """Job Data object """ - def __init__(self, _id, counter=1, job_name="None", created=None, modified=None, submit=0, start=0, finish=0, status="UNKNOWN", rowtype=1, ncpus=0, wallclock="00:00", qos="debug", energy=0, date="", section="", member="", chunk=0, last=1, platform="NA", job_id=0, extra_data=dict()): + def __init__(self, _id, counter=1, job_name="None", created=None, modified=None, submit=0, start=0, finish=0, status="UNKNOWN", rowtype=0, ncpus=0, wallclock="00:00", qos="debug", energy=0, date="", section="", member="", chunk=0, last=1, platform="NA", job_id=0, extra_data=dict()): """[summary] Args: @@ -276,9 +285,10 @@ class ExperimentStatus(MainDataBase): result = list() # print(current_table) # print(type(current_table)) - for item in current_table: - #exp_id, expid, status, seconds = item - result.append(ExperimentRow(*item)) + if current_table: + for item in current_table: + #exp_id, expid, status, seconds = item + result.append(ExperimentRow(*item)) return result def _get_id_db(self): @@ -415,11 +425,14 @@ class JobDataStructure(MainDataBase): MainDataBase.__init__(self, expid) BasicConfig.read() #self.expid = expid + self.basic_conf = BasicConfig + self.expid = expid self.folder_path = BasicConfig.JOBDATA_DIR self.database_path = os.path.join( self.folder_path, "job_data_" + str(expid) + ".db") #self.conn = None self.jobdata_list = JobDataList(self.expid) + # We use rowtype to identify a packed job self.create_table_query = textwrap.dedent( '''CREATE TABLE IF NOT EXISTS job_data ( @@ -463,7 +476,63 @@ class JobDataStructure(MainDataBase): else: self.conn = 
self.create_connection(self.database_path) - def write_submit_time(self, job_name, submit=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0): + def determine_rowtype(self, code): + """ + Determines rowtype based on job information. + + :param packed: True if job belongs to wrapper, False otherwise + :type packed: boolean + :return: rowtype, 2 packed, 1 normal + :rtype: int + """ + if code: + return code + else: + return RowType.NORMAL + + def get_job_package_code(self, current_job_name): + """ + Finds the package code and retrieves it. None if no package. + + :param BasicConfig: Basic configuration + :type BasicConfig: Configuration Object + :param expid: Experiment Id + :type expid: String + :param current_job_name: Name of job + :type current_jobs: string + :return: package code, None if not found + :rtype: int or None + """ + packages = None + try: + packages = JobPackagePersistence(os.path.join(self.basic_conf.LOCAL_ROOT_DIR, self.expid, "pkl"), + "job_packages_" + self.expid).load(wrapper=False) + except Exception as ex: + Log.debug( + "Wrapper table not found, trying packages. JobDataStructure.retrieve_packages") + packages = None + try: + packages = JobPackagePersistence(os.path.join(self.basic_conf.LOCAL_ROOT_DIR, self.expid, "pkl"), + "job_packages_" + self.expid).load(wrapper=True) + except Exception as exp2: + packages = None + + if (packages): + try: + for exp, package_name, job_name in packages: + #print("Looking for {0}".format(current_job_name)) + if current_job_name == job_name: + # print(package_name) + code = int(package_name.split("_")[2]) + return code + except Exception as ex: + Log.warning( + "Package parse error. JobDataStructure.retrieve_packages") + Log.debug(traceback.format_exc()) + return None + return None + + def write_submit_time(self, job_name, submit=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, packed=False): """Writes submit time of job. 
Args: @@ -483,16 +552,11 @@ class JobDataStructure(MainDataBase): Returns: [type]: [description] """ - #print("Saving write submit " + job_name) try: job_data = self.get_job_data(job_name) current_counter = 1 max_counter = self._get_maxcounter_jobdata() - #submit = parse_date(submit) if submit > 0 else 0 - #print("submit job data " + str(job_data)) if job_data and len(job_data) > 0: - # print("job data has 1 element") - # max_counter = self._get_maxcounter_jobdata() job_max_counter = max(job.counter for job in job_data) current_last = [ job for job in job_data if job.counter == job_max_counter] @@ -507,7 +571,7 @@ class JobDataStructure(MainDataBase): current_counter = max_counter # Insert new last rowid = self._insert_job_data(JobData( - 0, current_counter, job_name, None, None, submit, 0, 0, status, 1, ncpus, wallclock, qos, 0, date, member, section, chunk, 1, platform, job_id)) + 0, current_counter, job_name, None, None, submit, 0, 0, status, self.determine_rowtype(self.get_job_package_code(job_name)), ncpus, wallclock, qos, 0, date, member, section, chunk, 1, platform, job_id)) # print(rowid) if rowid: return True @@ -522,7 +586,7 @@ class JobDataStructure(MainDataBase): # if rowid > 0: # print("Successfully inserted") - def write_start_time(self, job_name, start=0, status="UNKWNONW", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0): + def write_start_time(self, job_name, start=0, status="UNKWNONW", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, packed=False): """Writes start time into the database Args: @@ -600,7 +664,6 @@ class JobDataStructure(MainDataBase): # Updating existing row if job_data_last: job_data_last = job_data_last[0] - # if job_data_last.finish == 0: # Call Slurm here, update times. 
if platform_object: # print("There is platform object") @@ -610,23 +673,17 @@ class JobDataStructure(MainDataBase): # print("Checking Slurm for " + str(job_name)) submit_time, start_time, finish_time, energy, extra_data = platform_object.check_job_energy( job_id, packed) - # print(job_id) - # print(packed) - # print(submit_time) - # print(start_time) - # print(finish_time) - # print(energy) - # print(extra_data) except Exception as exp: Log.info(traceback.format_exc()) Log.warning(str(exp)) - energy = 0 - job_data_last.finish = int( - finish_time) if finish_time > 0 else int(finish) + #energy = 0 + job_data_last.finish = finish_time if finish_time > 0 else int( + finish) job_data_last.status = status job_data_last.job_id = job_id job_data_last.energy = energy - job_data_last.extra_data = dumps(extra_data) + job_data_last.extra_data = dumps( + extra_data) if extra_data else "NA" job_data_last.modified = datetime.today().strftime('%Y-%m-%d-%H:%M:%S') if submit_time > 0 and start_time > 0: job_data_last.submit = int(submit_time) @@ -637,15 +694,15 @@ class JobDataStructure(MainDataBase): return True # It is necessary to create a new row submit_inserted = self.write_submit_time( - job_name, finish, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id) + job_name, finish, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, packed) write_inserted = self.write_start_time(job_name, finish, status, ncpus, - wallclock, qos, date, member, section, chunk, platform, job_id) + wallclock, qos, date, member, section, chunk, platform, job_id, packed) # print(submit_inserted) # print(write_inserted) if submit_inserted and write_inserted: #print("retro finish") self.write_finish_time( - job_name, finish, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, platform_object) + job_name, finish, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, platform_object, packed) else: return None except Exception as exp: @@ -653,6 +710,48 @@ class JobDataStructure(MainDataBase): Log.warning("Autosubmit couldn't write finish time.") return None + def retry_incompleted_data(self, list_jobs): + """ + Retries retrieval of data that might be incompleted. 
+ + :param list_jobs: list of jobs in experiment + :type list_jobs: list() + + :return: None (Modifies database) + """ + try: + pending_jobs = self.get_pending_data() + if pending_jobs: + for item in pending_jobs: + job_object = section = next( + (job for job in list_jobs if job.name == item), None) + if (job_object): + platform_object = job_object.platform + if type(platform_object) is not str: + if platform_object.type == "slurm": + # print("Checking Slurm for " + str(job_name)) + Log.info("Attempting to complete information for {0}".format( + job_object.name)) + submit_time, start_time, finish_time, energy, extra_data = platform_object.check_job_energy( + job_object.id, job_object.packed) + if submit_time > 0 and start_time > 0: + job_data_last = self.get_job_data_last( + job_object.name)[0] + job_data_last.submit = int(submit_time) + job_data_last.start = int(start_time) + job_data_last.energy = energy + job_data_last.extra_data = dumps( + extra_data) + job_data_last.modified = datetime.today().strftime('%Y-%m-%d-%H:%M:%S') + rowid = self._update_finish_job_data_plus( + job_data_last) + Log.info("Historic data successfully retrieved and updated for: {0} {1}".format( + job_object.name, rowid)) + except Exception as exp: + print(traceback.format_exc()) + Log.warning(str(exp)) + return None + def get_all_job_data(self): """[summary] @@ -711,6 +810,30 @@ class JobDataStructure(MainDataBase): Log.warning("Autosubmit couldn't retrieve job data. get_job_data") return None + def get_pending_data(self): + """[summary] + """ + try: + job_names_list = list() + if os.path.exists(self.folder_path): + current_pending = self._get_job_data_pending() + if current_pending: + for item in current_pending: + job_id, job_name, job_rowtype = item + job_names_list.append(job_name) + # job_name_to_detail[job_name] = (job_id, job_rowtype) + # jobid_list.append(job_id) + return job_names_list + else: + return None + except Exception as exp: + if _debug == True: + Log.info(traceback.format_exc()) + Log.debug(traceback.format_exc()) + Log.warning( + "Autosubmit couldn't retrieve job data. get_job_data_last") + return None + def get_job_data_last(self, job_name): """ Returns latest jobdata row for a job_name. The current version. @@ -963,6 +1086,28 @@ class JobDataStructure(MainDataBase): Log.warning("Error on Select : " + str(type(e).__name__)) return None + def _get_job_data_pending(self): + """ + Gets the list of job_id, job_name of those jobs that have pending information. 
+ """ + try: + if self.conn: + self.conn.text_factory = str + cur = self.conn.cursor() + cur.execute( + "SELECT job_id, job_name, rowtype FROM job_data WHERE last=1 and platform='marenostrum4' and energy <= 0 and (status = 'COMPLETED' or status = 'FAILED')") + rows = cur.fetchall() + if rows and len(rows) > 0: + return rows + else: + return None + except sqlite3.Error as e: + if _debug == True: + Log.info(traceback.format_exc()) + Log.debug(traceback.format_exc()) + Log.warning("Error on historic database retrieval.") + return None + def _set_pragma_version(self, version=2): """Sets current version of the schema diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index d1c217861..c907cb387 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -967,7 +967,7 @@ class Job(object): f.write(date2str(datetime.datetime.now(), 'S')) # Writing database JobDataStructure(self.expid).write_submit_time(self.name, time.time(), Status.VALUE_TO_KEY[self.status] if self.status in Status.VALUE_TO_KEY.keys() else "UNKNOWN", self.processors, - self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id) + self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.packed) def write_start_time(self): """ @@ -989,7 +989,7 @@ class Job(object): f.write(date2str(datetime.datetime.fromtimestamp(start_time), 'S')) # Writing database JobDataStructure(self.expid).write_start_time(self.name, time.time(), Status.VALUE_TO_KEY[self.status] if self.status in Status.VALUE_TO_KEY.keys() else "UNKNOWN", self.processors, - self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id) + self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.packed) return True def write_end_time(self, completed): diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 64b41c32a..8e9080c8a 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -232,16 +232,16 @@ class JobList: location = location.strip('[').strip(']') if ':' in location: if len(location) == 3: - for chunk_number in range(int(location[0]), int(location[2])+1): + for chunk_number in range(int(location[0]), int(location[2]) + 1): auxiliar_chunk_list.append( chunk_number) elif len(location) == 2: if ':' == location[0]: - for chunk_number in range(0, int(location[1])+1): + for chunk_number in range(0, int(location[1]) + 1): auxiliar_chunk_list.append( chunk_number) elif ':' == location[1]: - for chunk_number in range(int(location[0])+1, len(dic_jobs._chunk_list)-1): + for chunk_number in range(int(location[0]) + 1, len(dic_jobs._chunk_list) - 1): auxiliar_chunk_list.append( chunk_number) elif ',' in location: @@ -275,7 +275,7 @@ class JobList: numbers = str_split.split(":") # change this to be checked in job_common.py max_splits = min(int(numbers[1]), max_splits) - for count in range(int(numbers[0]), max_splits+1): + for count in range(int(numbers[0]), max_splits + 1): splits.append(int(str(count).zfill(len(numbers[0])))) else: if int(str_split) <= max_splits: @@ -1237,7 +1237,7 @@ class JobList: if not notransitive: # Transitive reduction required current_structure = None - if os.path.exists(os.path.join(self._config.STRUCTURES_DIR, "structure_"+self.expid+".db")): + if os.path.exists(os.path.join(self._config.STRUCTURES_DIR, "structure_" + self.expid + ".db")): try: current_structure = DbStructure.get_structure( 
self.expid, self._config.STRUCTURES_DIR) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index c9dec8016..d0508f8c2 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -315,7 +315,7 @@ class ParamikoPlatform(Platform): check_energy_cmd = self.get_job_energy_cmd(job_id) self.send_command(check_energy_cmd) return self.parse_job_finish_data( - self.get_ssh_output(), job_id, packed) + self.get_ssh_output(), packed) def submit_Script(self, hold=False): """ diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index f419aeaf5..be41e30b9 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -118,8 +118,10 @@ class SlurmPlatform(ParamikoPlatform): def parse_job_output(self, output): return output.strip().split(' ')[0].strip() - def parse_job_finish_data(self, output, job_id, packed): - """Parses the context of the sacct query to SLURM for a single job. + def parse_job_finish_data(self, output, packed): + """Parses the context of the sacct query to SLURM for a single job. + Only normal jobs return submit, start, and finish times. + When a wrapper has finished, capture finish time. :param output: The sacct output :type output: str @@ -136,6 +138,8 @@ class SlurmPlatform(ParamikoPlatform): # No blank spaces after or before output = output.strip() lines = output.split("\n") + is_end_of_wrapper = False + extra_data = None # If there is output, list exists if len(lines) > 0: # Collecting information from all output @@ -144,41 +148,66 @@ class SlurmPlatform(ParamikoPlatform): if len(line) > 0: # Collecting detailed data name = str(line[0]) - extra_data = {"energy": str(line[5] if len(line) > 5 else "NA"), "MaxRSS": str( - line[6] if len(line) > 6 else "NA"), "AveRSS": str(line[7] if len(line) > 6 else "NA")} + if packed: + # If it belongs to a wrapper + extra_data = {"ncpus": str(line[2] if len(line) > 2 else "NA"), + "submit": str(line[3] if len(line) > 3 else "NA"), + "start": str(line[4] if len(line) > 4 else "NA"), + "finish": str(line[5] if len(line) > 5 else "NA"), + "energy": str(line[6] if len(line) > 6 else "NA"), + "MaxRSS": str(line[7] if len(line) > 7 else "NA"), + "AveRSS": str(line[8] if len(line) > 8 else "NA")} + else: + # Normal job + extra_data = {"energy": str(line[6] if len(line) > 6 else "NA"), + "MaxRSS": str(line[7] if len(line) > 7 else "NA"), + "AveRSS": str(line[8] if len(line) > 8 else "NA")} + # Detailed data will contain the important information from output detailed_data[name] = extra_data submit = start = finish = joules = 0 status = "UNKNOWN" line = None if packed == False: + # If it is not wrapper job, take first line as source line = lines[0].strip().split() - #print("Unpacked {0}".format(line)) status = str(line[1]) - if status != "COMPLETED": - packed == True - if packed == True: - i = -1 - # It can happen that after this loop, there is no COMPLETED job information - while (status != "COMPLETED"): - if len(lines) >= i * -1: - line = lines[i].strip().split() - status = str(line[1]) - else: - break - - try: - submit = int(mktime(datetime.strptime( - line[2], "%Y-%m-%dT%H:%M:%S").timetuple())) - start = int(mktime(datetime.strptime( - line[3], "%Y-%m-%dT%H:%M:%S").timetuple())) - finish = int(mktime(datetime.strptime( - line[4], "%Y-%m-%dT%H:%M:%S").timetuple())) if len(line) > 4 and line[4] != "Unknown" else datetime.now().timestamp() - joules = self.parse_output_number( - line[5]) if 
len(line) > 5 and len(line[5]) > 0 else -1
-                except Exception as exp:
-                    Log.info("Parsing mishandling. Further attempt is necessary.")
-                    pass
+                if status not in ["COMPLETED", "FAILED", "UNKNOWN"]:
+                    # If not completed, it is an error; return default data plus the collected output
+                    return (0, 0, 0, 0, detailed_data)
+            else:
+                line = lines[0].strip().split()
+                # Check if the wrapper has finished
+                if str(line[1]) in ["COMPLETED", "FAILED", "UNKNOWN"]:
+                    # Wrapper has finished
+                    is_end_of_wrapper = True
+            if line:
+                try:
+                    # Parse submit and start only for normal jobs (not packed)
+                    submit = int(mktime(datetime.strptime(
+                        line[3], "%Y-%m-%dT%H:%M:%S").timetuple())) if not packed else 0
+                    start = int(mktime(datetime.strptime(
+                        line[4], "%Y-%m-%dT%H:%M:%S").timetuple())) if not packed else 0
+                    # Assuming the job has been COMPLETED
+                    # If normal job or end of wrapper => Try to get the finish time from the first line of the output, else default to now.
+                    finish = (int(mktime(datetime.strptime(
+                        line[5], "%Y-%m-%dT%H:%M:%S").timetuple())) if len(line) > 5 and line[5] != "Unknown" else datetime.now().timestamp()) if not packed or is_end_of_wrapper == True else 0
+                    # If normal job or end of wrapper => Try to get energy from first line
+                    joules = (self.parse_output_number(
+                        line[6]) if len(line) > 6 and len(line[6]) > 0 else 0) if not packed or is_end_of_wrapper == True else 0
+                except Exception as exp:
+                    Log.info(
+                        "Parsing mishandling.")
+                    # joules = -1
+                    pass
+            # print(detailed_data)
+            detailed_data = detailed_data if not packed or is_end_of_wrapper == True else extra_data
+            # print("Is packed {0}".format(packed))
+            # print("Is end of wrapper {0}".format(is_end_of_wrapper))
+            # print("Submit {0}".format(submit))
+            # print(start)
+            # print(finish)
+            # print(joules)
             # print(detailed_data)
             return (submit, start, finish, joules, detailed_data)
 
@@ -186,7 +215,8 @@ class SlurmPlatform(ParamikoPlatform):
             return (0, 0, 0, 0, dict())
         except Exception as exp:
             # On error return 4*0
             # print(exp)
-            Log.warning("From _update_exp_status: {0}".format(str(exp)))
+            Log.warning(
+                "Autosubmit couldn't parse SLURM energy output. 
From parse_job_finish_data: {0}".format(str(exp))) return (0, 0, 0, 0, dict()) def parse_output_number(self, string_number): @@ -255,7 +285,7 @@ class SlurmPlatform(ParamikoPlatform): return 'squeue -j {0} -o %A,%R'.format(job_id) def get_job_energy_cmd(self, job_id): - return 'sacct -n -j {0} -o JobId%20,State,Submit,Start,End,ConsumedEnergy,MaxRSS%20,AveRSS%20'.format(job_id) + return 'sacct -n -j {0} -o JobId%25,State,NCPUS,Submit,Start,End,ConsumedEnergy,MaxRSS%25,AveRSS%25'.format(job_id) def parse_queue_reason(self, output, job_id): reason = [x.split(',')[1] for x in output.splitlines() -- GitLab From 3c3200b42b0cd7cdc17618357c41b1d2e80c1836 Mon Sep 17 00:00:00 2001 From: Wilmer Uruchi Ticona Date: Fri, 21 Aug 2020 11:42:01 +0200 Subject: [PATCH 5/9] Added parent list to extra_data column --- autosubmit/database/db_jobdata.py | 9 ++++++++- autosubmit/job/job.py | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py index 860679958..be14bc8c6 100644 --- a/autosubmit/database/db_jobdata.py +++ b/autosubmit/database/db_jobdata.py @@ -634,7 +634,7 @@ class JobDataStructure(MainDataBase): "Autosubmit couldn't write start time.") return None - def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, platform_object=None, packed=False): + def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, platform_object=None, packed=False, parent_id_list=[]): """Writes the finish time into the database Args: @@ -677,6 +677,13 @@ class JobDataStructure(MainDataBase): Log.info(traceback.format_exc()) Log.warning(str(exp)) #energy = 0 + try: + extra_data["parents"] =[int(item) for item in parent_id_list] + except Exception as inner_exp: + Log.debug("Parent Id List couldn't be parsed to array of int. Using default values.") + extra_data["parents"] = parent_id_list + pass + job_data_last.finish = finish_time if finish_time > 0 else int( finish) job_data_last.status = status diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index c907cb387..f1aa08e21 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -1021,7 +1021,7 @@ class Job(object): final_status = "FAILED" f.write('FAILED') JobDataStructure(self.expid).write_finish_time(self.name, finish_time, final_status, self.processors, - self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed) + self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents]) def check_started_after(self, date_limit): """ -- GitLab From 3d4709658b9219348970930f77a972c6dbd4f35d Mon Sep 17 00:00:00 2001 From: Wilmer Uruchi Ticona Date: Fri, 28 Aug 2020 16:56:32 +0200 Subject: [PATCH 6/9] Added experiment_run table as header for the job_data table. Experiment_run controls the progress of an experiment. Added the proper validations and calls in the necessary functions of Autosubmit. 
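
The run loop now snapshots each job's previous status and collects real
transitions into a per-iteration dictionary that is handed to the historical
database. A minimal standalone sketch of that tracking pattern; the Job
class below is a stand-in that models only the attributes the pattern uses:

    class Job(object):
        def __init__(self, name, status):
            self.name = name
            self.prev_status = status  # snapshot taken before the platform check
            self.status = status

    jobs = [Job("a29z_SIM", "RUNNING"), Job("a29z_POST", "QUEUING")]
    jobs[0].status = "COMPLETED"  # simulate a change detected this iteration

    # Keep only real transitions, as in the run loop.
    job_changes_tracker = {job.name: (job.prev_status, job.status)
                           for job in jobs if job.prev_status != job.status}
    print(job_changes_tracker)  # {'a29z_SIM': ('RUNNING', 'COMPLETED')}

The resulting dictionary is passed to JobDataStructure.process_status_changes()
and reset to {} after each safe spot where changes are stored.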
--- autosubmit/autosubmit.py | 74 ++++- autosubmit/config/config_common.py | 243 +++++++++----- autosubmit/database/db_jobdata.py | 458 ++++++++++++++++++++++---- autosubmit/platforms/slurmplatform.py | 91 +++-- simple_test.py | 51 +++ 5 files changed, 717 insertions(+), 200 deletions(-) create mode 100644 simple_test.py diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 88e087ec7..8e6c949a3 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -647,6 +647,10 @@ class Autosubmit: Log.info("Removing experiment directory...") shutil.rmtree(os.path.join( BasicConfig.LOCAL_ROOT_DIR, expid_delete)) + os.remove(os.path.join(BasicConfig.LOCAL_ROOT_DIR, + BasicConfig.STRUCTURES_DIR, "structure_{0}.db".format(expid_delete))) + os.remove(os.path.join(BasicConfig.LOCAL_ROOT_DIR, + BasicConfig.JOBDATA_DIR, "job_data_{0}.db".format(expid_delete))) except OSError as e: Log.warning('Can not delete experiment folder: {0}', e) return ret @@ -1221,7 +1225,6 @@ class Autosubmit: Log.critical("Current experiment uses ({0}) which is not the running Autosubmit version \nPlease, update the experiment version if you wish to continue using AutoSubmit {1}\nYou can achieve this using the command autosubmit updateversion {2} \n" "Or with the -v parameter: autosubmit run {2} -v ", as_conf.get_version(), Autosubmit.autosubmit_version, expid) return 1 - # checking if there is a lock file to avoid multiple running on the same expid try: with portalocker.Lock(os.path.join(tmp_path, 'autosubmit.lock'), timeout=1): @@ -1290,10 +1293,14 @@ class Autosubmit: job_list.save() Log.info( "Autosubmit is running with v{0}", Autosubmit.autosubmit_version) + # Before starting main loop, setup historical database tables and main information + job_data_structure = JobDataStructure(expid) + job_data_structure.validate_current_run( + job_list.get_job_list(), as_conf.get_chunk_size_unit(), as_conf.get_chunk_size()) ######################### # AUTOSUBMIT - MAIN LOOP ######################### - # Update RUNNING database + # Update experiment RUNNING database ExperimentStatus(expid).update_running_status() # Main loop. Finishing when all jobs have been submitted while job_list.get_active(): @@ -1317,6 +1324,7 @@ class Autosubmit: check_wrapper_jobs_sleeptime)) save = False slurm = [] + job_changes_tracker = {} # to easily keep track of changes per iteration for platform in platforms_to_test: list_jobid = "" completed_joblist = [] @@ -1329,9 +1337,10 @@ class Autosubmit: Log.debug( 'Checking wrapper job with id ' + str(job_id)) wrapper_job = job_list.job_package_map[job_id] - if as_conf.get_notifications() == 'true': - for inner_job in wrapper_job.job_list: - inner_job.prev_status = inner_job.status + # if as_conf.get_notifications() == 'true': + # Setting prev_status as an easy way to check status change for inner jobs + for inner_job in wrapper_job.job_list: + inner_job.prev_status = inner_job.status check_wrapper = True if wrapper_job.status == Status.RUNNING: check_wrapper = True if datetime.timedelta.total_seconds(datetime.datetime.now( @@ -1371,6 +1380,12 @@ class Autosubmit: Status.VALUE_TO_KEY[inner_job.prev_status], Status.VALUE_TO_KEY[inner_job.status], as_conf.get_mails_to()) + # Detect and store changes + job_changes_tracker = {job.name: ( + job.prev_status, job.status) for job in wrapper_job.job_list if job.prev_status != job.status} + job_data_structure.process_status_changes( + job_changes_tracker) + job_changes_tracker = {} else: # Prepare jobs, if slurm check all active jobs at once. 
job = job[0] prev_status = job.status @@ -1387,6 +1402,9 @@ class Autosubmit: else: # If they're not from slurm platform check one-by-one platform.check_job(job) if prev_status != job.update_status(as_conf.get_copy_remote_logs() == 'true'): + # Keeping track of changes + job_changes_tracker[job.name] = ( + prev_status, job.status) if as_conf.get_notifications() == 'true': if Status.VALUE_TO_KEY[job.status] in job.notify_on: Notifier.notify_status_change(MailNotifier(BasicConfig), expid, job.name, @@ -1409,6 +1427,9 @@ class Autosubmit: prev_status = platform_jobs[2][j_Indx] job = platform_jobs[3][j_Indx] if prev_status != job.update_status(as_conf.get_copy_remote_logs() == 'true'): + # Keeping track of changes + job_changes_tracker[job.name] = ( + prev_status, job.status) if as_conf.get_notifications() == 'true': if Status.VALUE_TO_KEY[job.status] in job.notify_on: Notifier.notify_status_change(MailNotifier(BasicConfig), expid, job.name, @@ -1429,6 +1450,11 @@ class Autosubmit: if save: job_list.save() + # Safe spot to store changes + job_data_structure.process_status_changes( + job_changes_tracker) + job_changes_tracker = {} + if Autosubmit.exit: job_list.save() return 2 @@ -3098,9 +3124,12 @@ class Autosubmit: job_list.save() JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid).reset_table() - groups_dict = dict() + # Setting up job historical database header. Must create a new run. + JobDataStructure(expid).validate_current_run(job_list.get_job_list( + ), as_conf.get_chunk_size_unit(), as_conf.get_chunk_size(), must_create=True) + if not noplot: if group_by: status = list() @@ -3328,7 +3357,7 @@ class Autosubmit: BasicConfig.read() if not Autosubmit._check_Ownership(expid): Log.critical( - 'Can not change the status of experiment {0} due you are not the owner', expid) + 'Can not change the status of experiment {0} because you are not the owner', expid) return False exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid) tmp_path = os.path.join(exp_path, BasicConfig.LOCAL_TMP_DIR) @@ -3354,6 +3383,7 @@ class Autosubmit: Log.debug('Status of jobs to change: {0}', filter_status) Log.debug('Sections to change: {0}', filter_section) wrongExpid = 0 + job_tracked_changes = {} as_conf = AutosubmitConfig( expid, BasicConfig, ConfigParserFactory()) if not as_conf.check_conf_files(): @@ -3558,6 +3588,9 @@ class Autosubmit: ft = filter_chunks.split(",")[1:] if ft == 'Any': for job in job_list.get_job_list(): + # Tracking changes + job_tracked_changes[job.name] = ( + job.status, final_status) Autosubmit.change_status( final, final_status, job, save) else: @@ -3567,6 +3600,9 @@ class Autosubmit: if filter_chunks: jobs_filtered.append(job) else: + # Tracking changes + job_tracked_changes[job.name] = ( + job.status, final_status) Autosubmit.change_status( final, final_status, job, save) @@ -3719,6 +3755,9 @@ class Autosubmit: status = Status() for job in final_list: if job.status != final_status: + # Tracking changes + job_tracked_changes[job.name] = ( + job.status, final_status) # Only real changes performed_changes[job.name] = str( Status.VALUE_TO_KEY[job.status]) + " -> " + str(final) @@ -3747,6 +3786,9 @@ class Autosubmit: if fc == 'Any': for job in jobs_filtered: + # Tracking changes + job_tracked_changes[job.name] = ( + job.status, final_status) Autosubmit.change_status( final, final_status, job, save) else: @@ -3765,10 +3807,16 @@ class Autosubmit: for chunk_json in member_json['cs']: chunk = int(chunk_json) for job in filter(lambda j: j.chunk 
== chunk and j.synchronize is not None, jobs_date): + # Tracking changes + job_tracked_changes[job.name] = ( + job.status, final_status) Autosubmit.change_status( final, final_status, job, save) for job in filter(lambda j: j.chunk == chunk, jobs_member): + # Tracking changes + job_tracked_changes[job.name] = ( + job.status, final_status) Autosubmit.change_status( final, final_status, job, save) @@ -3778,12 +3826,18 @@ class Autosubmit: Log.debug("Filtering jobs with status {0}", filter_status) if status_list == 'Any': for job in job_list.get_job_list(): + # Tracking changes + job_tracked_changes[job.name] = ( + job.status, final_status) Autosubmit.change_status( final, final_status, job, save) else: for status in status_list: fs = Autosubmit._get_status(status) for job in filter(lambda j: j.status == fs, job_list.get_job_list()): + # Tracking changes + job_tracked_changes[job.name] = ( + job.status, final_status) Autosubmit.change_status( final, final_status, job, save) @@ -3806,6 +3860,9 @@ class Autosubmit: else: for job in job_list.get_job_list(): if job.name in jobs: + # Tracking changes + job_tracked_changes[job.name] = ( + job.status, final_status) Autosubmit.change_status( final, final_status, job, save) @@ -3813,6 +3870,9 @@ class Autosubmit: if save and wrongExpid == 0: job_list.save() + job_data_structure = JobDataStructure(expid) + job_data_structure.process_status_changes( + job_tracked_changes, job_list.get_job_list(), as_conf.get_chunk_size_unit(), as_conf.get_chunk_size()) else: Log.warning( "Changes NOT saved to the JobList!!!!: use -s option to save") diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index d1dae9b1c..93cffcebe 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -111,11 +111,12 @@ class AutosubmitConfig(object): if os.path.exists(self._proj_parser_file): with open(self._proj_parser_file, 'r+') as f: first_line = f.readline() - #if not re.match('\[[a-zA-Z0-9_]*\]', first_line): + # if not re.match('\[[a-zA-Z0-9_]*\]', first_line): if not re.match('^\[[^\[\]\# \t\n]*\][ \t]*$|^[ \t]+\[[^\[\]# \t\n]*\]', first_line): content = f.read() f.seek(0, 0) - f.write('[DEFAULT]'.rstrip('\r\n') + '\n' + first_line + content) + f.write('[DEFAULT]'.rstrip('\r\n') + + '\n' + first_line + content) @property def jobs_file(self): @@ -155,6 +156,7 @@ class AutosubmitConfig(object): :rtype: str """ return self._jobs_parser.get_option(section, 'SYNCHRONIZE', '') + def get_processors(self, section): """ Gets processors needed for the given job type @@ -223,6 +225,7 @@ class AutosubmitConfig(object): :rtype: str """ return self._platforms_parser.get_option(section, 'USER_TO', '').lower() + def get_current_user(self, section): """ Returns the user to be changed from platform config file. @@ -240,6 +243,7 @@ class AutosubmitConfig(object): :rtype: str """ return self._platforms_parser.get_option(section, 'HOST', '').lower() + def get_current_project(self, section): """ Returns the project to be changed from platform config file. 
@@ -248,6 +252,7 @@ class AutosubmitConfig(object): :rtype: str """ return self._platforms_parser.get_option(section, 'PROJECT', '').lower() + def set_new_user(self, section, new_user): """ Sets new user for given platform @@ -257,12 +262,12 @@ class AutosubmitConfig(object): """ with open(self._platforms_parser_file) as p_file: contentLine = p_file.readline() - contentToMod="" - content="" - mod=False + contentToMod = "" + content = "" + mod = False while contentLine: if re.search(section, contentLine): - mod=True + mod = True if mod: contentToMod += contentLine else: @@ -270,10 +275,13 @@ class AutosubmitConfig(object): contentLine = p_file.readline() if mod: old_user = self.get_current_user(section) - contentToMod = contentToMod.replace(re.search(r'[^#]\bUSER\b =.*', contentToMod).group(0)[1:], "USER = " + new_user) - contentToMod = contentToMod.replace(re.search(r'[^#]\bUSER_TO\b =.*', contentToMod).group(0)[1:], "USER_TO = " + old_user) + contentToMod = contentToMod.replace(re.search( + r'[^#]\bUSER\b =.*', contentToMod).group(0)[1:], "USER = " + new_user) + contentToMod = contentToMod.replace(re.search( + r'[^#]\bUSER_TO\b =.*', contentToMod).group(0)[1:], "USER_TO = " + old_user) open(self._platforms_parser_file, 'w').write(content) open(self._platforms_parser_file, 'a').write(contentToMod) + def set_new_host(self, section, new_host): """ Sets new host for given platform @@ -283,12 +291,12 @@ class AutosubmitConfig(object): """ with open(self._platforms_parser_file) as p_file: contentLine = p_file.readline() - contentToMod="" - content="" - mod=False + contentToMod = "" + content = "" + mod = False while contentLine: if re.search(section, contentLine): - mod=True + mod = True if mod: contentToMod += contentLine else: @@ -296,10 +304,13 @@ class AutosubmitConfig(object): contentLine = p_file.readline() if mod: old_host = self.get_current_host(section) - contentToMod = contentToMod.replace(re.search(r'[^#]\bHOST\b =.*', contentToMod).group(0)[1:], "HOST = " + new_host) - contentToMod = contentToMod.replace(re.search(r'[^#]\bHOST_TO\b =.*', contentToMod).group(0)[1:], "HOST_TO = " + old_host) + contentToMod = contentToMod.replace(re.search( + r'[^#]\bHOST\b =.*', contentToMod).group(0)[1:], "HOST = " + new_host) + contentToMod = contentToMod.replace(re.search( + r'[^#]\bHOST_TO\b =.*', contentToMod).group(0)[1:], "HOST_TO = " + old_host) open(self._platforms_parser_file, 'w').write(content) open(self._platforms_parser_file, 'a').write(contentToMod) + def get_migrate_project_to(self, section): """ Returns the project to change to from platform config file. @@ -308,6 +319,7 @@ class AutosubmitConfig(object): :rtype: str """ return self._platforms_parser.get_option(section, 'PROJECT_TO', '').lower() + def get_migrate_host_to(self, section): """ Returns the host to change to from platform config file. 
@@ -316,6 +328,7 @@ class AutosubmitConfig(object): :rtype: str """ return self._platforms_parser.get_option(section, 'HOST_TO', "none").lower() + def set_new_project(self, section, new_project): """ Sets new project for given platform @@ -325,12 +338,12 @@ class AutosubmitConfig(object): """ with open(self._platforms_parser_file) as p_file: contentLine = p_file.readline() - contentToMod="" - content="" - mod=False + contentToMod = "" + content = "" + mod = False while contentLine: if re.search(section, contentLine): - mod=True + mod = True if mod: contentToMod += contentLine else: @@ -338,12 +351,13 @@ class AutosubmitConfig(object): contentLine = p_file.readline() if mod: old_project = self.get_current_project(section) - contentToMod = contentToMod.replace(re.search(r"[^#]\bPROJECT\b =.*", contentToMod).group(0)[1:], "PROJECT = " + new_project) - contentToMod = contentToMod.replace(re.search(r"[^#]\bPROJECT_TO\b =.*", contentToMod).group(0)[1:], "PROJECT_TO = " + old_project) + contentToMod = contentToMod.replace(re.search( + r"[^#]\bPROJECT\b =.*", contentToMod).group(0)[1:], "PROJECT = " + new_project) + contentToMod = contentToMod.replace(re.search( + r"[^#]\bPROJECT_TO\b =.*", contentToMod).group(0)[1:], "PROJECT_TO = " + old_project) open(self._platforms_parser_file, 'w').write(content) open(self._platforms_parser_file, 'a').write(contentToMod) - def get_custom_directives(self, section): """ Gets custom directives needed for the given job type @@ -385,12 +399,18 @@ class AutosubmitConfig(object): result = True self._conf_parser.read(self._conf_parser_file) - result = result and self._conf_parser.check_exists('config', 'AUTOSUBMIT_VERSION') - result = result and self._conf_parser.check_is_int('config', 'MAXWAITINGJOBS', True) - result = result and self._conf_parser.check_is_int('config', 'TOTALJOBS', True) - result = result and self._conf_parser.check_is_int('config', 'SAFETYSLEEPTIME', True) - result = result and self._conf_parser.check_is_int('config', 'RETRIALS', True) - result = result and self._conf_parser.check_is_boolean('mail', 'NOTIFICATIONS', False) + result = result and self._conf_parser.check_exists( + 'config', 'AUTOSUBMIT_VERSION') + result = result and self._conf_parser.check_is_int( + 'config', 'MAXWAITINGJOBS', True) + result = result and self._conf_parser.check_is_int( + 'config', 'TOTALJOBS', True) + result = result and self._conf_parser.check_is_int( + 'config', 'SAFETYSLEEPTIME', True) + result = result and self._conf_parser.check_is_int( + 'config', 'RETRIALS', True) + result = result and self._conf_parser.check_is_boolean( + 'mail', 'NOTIFICATIONS', False) result = result and self.is_valid_communications_library() result = result and self.is_valid_storage_type() if self.get_wrapper_type() != 'None': @@ -399,11 +419,13 @@ class AutosubmitConfig(object): if self.get_notifications() == 'true': for mail in self.get_mails_to(): if not self.is_valid_mail_address(mail): - Log.warning('One or more of the email addresses configured for the mail notifications are wrong') + Log.warning( + 'One or more of the email addresses configured for the mail notifications are wrong') break if not result: - Log.critical("{0} is not a valid config file".format(os.path.basename(self._conf_parser_file))) + Log.critical("{0} is not a valid config file".format( + os.path.basename(self._conf_parser_file))) else: Log.info('{0} OK'.format(os.path.basename(self._conf_parser_file))) return result @@ -423,25 +445,35 @@ class AutosubmitConfig(object): Log.error('There are repeated platforms 
names') for section in self._platforms_parser.sections(): - result = result and self._platforms_parser.check_exists(section, 'TYPE') - platform_type = self._platforms_parser.get_option(section, 'TYPE', '').lower() + result = result and self._platforms_parser.check_exists( + section, 'TYPE') + platform_type = self._platforms_parser.get_option( + section, 'TYPE', '').lower() if platform_type != 'ps': - result = result and self._platforms_parser.check_exists(section, 'PROJECT') - result = result and self._platforms_parser.check_exists(section, 'USER') - - result = result and self._platforms_parser.check_exists(section, 'HOST') - result = result and self._platforms_parser.check_exists(section, 'SCRATCH_DIR') + result = result and self._platforms_parser.check_exists( + section, 'PROJECT') + result = result and self._platforms_parser.check_exists( + section, 'USER') + + result = result and self._platforms_parser.check_exists( + section, 'HOST') + result = result and self._platforms_parser.check_exists( + section, 'SCRATCH_DIR') result = result and self._platforms_parser.check_is_boolean(section, 'ADD_PROJECT_TO_HOST', False) - result = result and self._platforms_parser.check_is_boolean(section, 'TEST_SUITE', False) + result = result and self._platforms_parser.check_is_boolean( + section, 'TEST_SUITE', False) result = result and self._platforms_parser.check_is_int(section, 'MAX_WAITING_JOBS', False) - result = result and self._platforms_parser.check_is_int(section, 'TOTAL_JOBS', False) + result = result and self._platforms_parser.check_is_int( + section, 'TOTAL_JOBS', False) if not result: - Log.critical("{0} is not a valid config file".format(os.path.basename(self._platforms_parser_file))) + Log.critical("{0} is not a valid config file".format( + os.path.basename(self._platforms_parser_file))) else: - Log.info('{0} OK'.format(os.path.basename(self._platforms_parser_file))) + Log.info('{0} OK'.format( + os.path.basename(self._platforms_parser_file))) return result def check_jobs_conf(self): @@ -464,10 +496,12 @@ class AutosubmitConfig(object): for section in sections: result = result and parser.check_exists(section, 'FILE') - result = result and parser.check_is_boolean(section, 'RERUN_ONLY', False) + result = result and parser.check_is_boolean( + section, 'RERUN_ONLY', False) if parser.has_option(section, 'PLATFORM'): - result = result and parser.check_is_choice(section, 'PLATFORM', False, platforms) + result = result and parser.check_is_choice( + section, 'PLATFORM', False, platforms) if parser.has_option(section, 'DEPENDENCIES'): for dependency in str(parser.get_option(section, 'DEPENDENCIES', '')).split(' '): @@ -496,10 +530,11 @@ class AutosubmitConfig(object): 'Job {0} depends on job {1} that is not defined. 
It will be ignored.'.format(section, dependency)) result = result and parser.check_is_choice(section, 'RUNNING', False, - ['once', 'date', 'member', 'chunk']) + ['once', 'date', 'member', 'chunk']) if not result: - Log.critical("{0} is not a valid config file".format(os.path.basename(self._jobs_parser_file))) + Log.critical("{0} is not a valid config file".format( + os.path.basename(self._jobs_parser_file))) else: Log.info('{0} OK'.format(os.path.basename(self._jobs_parser_file))) @@ -522,8 +557,10 @@ class AutosubmitConfig(object): result = result and parser.check_exists('experiment', 'MEMBERS') result = result and parser.check_is_choice('experiment', 'CHUNKSIZEUNIT', True, ['year', 'month', 'day', 'hour']) - result = result and parser.check_is_int('experiment', 'CHUNKSIZE', True) - result = result and parser.check_is_int('experiment', 'NUMCHUNKS', True) + result = result and parser.check_is_int( + 'experiment', 'CHUNKSIZE', True) + result = result and parser.check_is_int( + 'experiment', 'NUMCHUNKS', True) result = result and parser.check_is_choice('experiment', 'CALENDAR', True, ['standard', 'noleap']) @@ -534,22 +571,28 @@ class AutosubmitConfig(object): project_type = parser.get_option('project', 'PROJECT_TYPE', '') if project_type == 'git': - result = result and parser.check_exists('git', 'PROJECT_ORIGIN') - result = result and parser.check_exists('git', 'PROJECT_BRANCH') + result = result and parser.check_exists( + 'git', 'PROJECT_ORIGIN') + result = result and parser.check_exists( + 'git', 'PROJECT_BRANCH') elif project_type == 'svn': result = result and parser.check_exists('svn', 'PROJECT_URL') - result = result and parser.check_exists('svn', 'PROJECT_REVISION') + result = result and parser.check_exists( + 'svn', 'PROJECT_REVISION') elif project_type == 'local': - result = result and parser.check_exists('local', 'PROJECT_PATH') + result = result and parser.check_exists( + 'local', 'PROJECT_PATH') if project_type != 'none': - result = result and parser.check_exists('project_files', 'FILE_PROJECT_CONF') + result = result and parser.check_exists( + 'project_files', 'FILE_PROJECT_CONF') else: result = False if not result: - Log.critical("{0} is not a valid config file".format(os.path.basename(self._exp_parser_file))) + Log.critical("{0} is not a valid config file".format( + os.path.basename(self._exp_parser_file))) else: Log.info('{0} OK'.format(os.path.basename(self._exp_parser_file))) return result @@ -565,7 +608,8 @@ class AutosubmitConfig(object): if self._proj_parser_file == '': self._proj_parser = None else: - self._proj_parser = AutosubmitConfig.get_parser(self.parser_factory, self._proj_parser_file) + self._proj_parser = AutosubmitConfig.get_parser( + self.parser_factory, self._proj_parser_file) return True except Exception as e: Log.error('Project conf file error: {0}', e) @@ -575,27 +619,36 @@ class AutosubmitConfig(object): result = True result = result and self.is_valid_jobs_in_wrapper() if not result: - Log.error("There are sections in JOBS_IN_WRAPPER that are not defined in your jobs.conf file") + Log.error( + "There are sections in JOBS_IN_WRAPPER that are not defined in your jobs.conf file") if 'horizontal' in self.get_wrapper_type(): - result = result and self._platforms_parser.check_exists(self.get_platform(), 'PROCESSORS_PER_NODE') - result = result and self._platforms_parser.check_exists(self.get_platform(), 'MAX_PROCESSORS') + result = result and self._platforms_parser.check_exists( + self.get_platform(), 'PROCESSORS_PER_NODE') + result = result and 
self._platforms_parser.check_exists( + self.get_platform(), 'MAX_PROCESSORS') if 'vertical' in self.get_wrapper_type(): - result = result and self._platforms_parser.check_exists(self.get_platform(), 'MAX_WALLCLOCK') + result = result and self._platforms_parser.check_exists( + self.get_platform(), 'MAX_WALLCLOCK') return result def reload(self): """ Creates parser objects for configuration files """ - self._conf_parser = AutosubmitConfig.get_parser(self.parser_factory, self._conf_parser_file) - self._platforms_parser = AutosubmitConfig.get_parser(self.parser_factory, self._platforms_parser_file) - self._jobs_parser = AutosubmitConfig.get_parser(self.parser_factory, self._jobs_parser_file) - self._exp_parser = AutosubmitConfig.get_parser(self.parser_factory, self._exp_parser_file) + self._conf_parser = AutosubmitConfig.get_parser( + self.parser_factory, self._conf_parser_file) + self._platforms_parser = AutosubmitConfig.get_parser( + self.parser_factory, self._platforms_parser_file) + self._jobs_parser = AutosubmitConfig.get_parser( + self.parser_factory, self._jobs_parser_file) + self._exp_parser = AutosubmitConfig.get_parser( + self.parser_factory, self._exp_parser_file) if self._proj_parser_file == '': self._proj_parser = None else: - self._proj_parser = AutosubmitConfig.get_parser(self.parser_factory, self._proj_parser_file) + self._proj_parser = AutosubmitConfig.get_parser( + self.parser_factory, self._proj_parser_file) def load_parameters(self): """ @@ -650,12 +703,14 @@ class AutosubmitConfig(object): # Experiment conf content = open(self._exp_parser_file).read() if re.search('EXPID =.*', content): - content = content.replace(re.search('EXPID =.*', content).group(0), "EXPID = " + exp_id) + content = content.replace( + re.search('EXPID =.*', content).group(0), "EXPID = " + exp_id) open(self._exp_parser_file, 'w').write(content) content = open(self._conf_parser_file).read() if re.search('EXPID =.*', content): - content = content.replace(re.search('EXPID =.*', content).group(0), "EXPID = " + exp_id) + content = content.replace( + re.search('EXPID =.*', content).group(0), "EXPID = " + exp_id) open(self._conf_parser_file, 'w').write(content) def get_project_type(self): @@ -702,6 +757,7 @@ class AutosubmitConfig(object): :rtype: str """ return self._exp_parser.get_option('git', 'PROJECT_BRANCH', 'master') + def get_git_project_commit(self): """ Returns git commit from experiment's config file @@ -710,6 +766,7 @@ class AutosubmitConfig(object): :rtype: str """ return self._exp_parser.get_option('git', 'PROJECT_COMMIT', None) + def get_git_remote_project_root(self): """ Returns remote machine ROOT PATH @@ -718,6 +775,7 @@ class AutosubmitConfig(object): :rtype: str """ return self._exp_parser.get_option('git', 'REMOTE_CLONE_ROOT', '') + def get_submodules_list(self): """ Returns submodules list from experiment's config file @@ -725,7 +783,7 @@ class AutosubmitConfig(object): :return: submodules to load :rtype: list """ - return ' '.join(self._exp_parser.get_option('git', 'PROJECT_SUBMODULES','').split()).split() + return ' '.join(self._exp_parser.get_option('git', 'PROJECT_SUBMODULES', '').split()).split() def get_fetch_single_branch(self): """ @@ -735,6 +793,7 @@ class AutosubmitConfig(object): :rtype: boolean """ return self._exp_parser.get_option('git', 'FETCH_SINGLE_BRANCH', 'False').lower() + def get_project_destination(self): """ Returns git commit from experiment's config file @@ -749,7 +808,8 @@ class AutosubmitConfig(object): elif self.get_project_type().lower() == "svn": value = 
self.get_svn_project_url().split('/')[-1] elif self.get_project_type().lower() == "git": - value = self.get_git_project_origin().split('/')[-1].split('.')[-2] + value = self.get_git_project_origin().split( + '/')[-1].split('.')[-2] return value def set_git_project_commit(self, as_conf): @@ -758,7 +818,7 @@ class AutosubmitConfig(object): :param as_conf: Configuration class for exteriment :type as_conf: AutosubmitConfig """ - full_project_path=as_conf.get_project_dir() + full_project_path = as_conf.get_project_dir() try: output = subprocess.check_output("cd {0}; git rev-parse --abbrev-ref HEAD".format(full_project_path), shell=True) @@ -769,7 +829,8 @@ class AutosubmitConfig(object): project_branch = output Log.debug("Project branch is: " + project_branch) try: - output = subprocess.check_output("cd {0}; git rev-parse HEAD".format(full_project_path), shell=True) + output = subprocess.check_output( + "cd {0}; git rev-parse HEAD".format(full_project_path), shell=True) except subprocess.CalledProcessError: Log.critical("Failed to retrieve project commit SHA...") return False @@ -785,7 +846,8 @@ class AutosubmitConfig(object): content = content.replace(re.search('PROJECT_COMMIT =.*', content).group(0), "PROJECT_COMMIT = " + project_sha) open(self._exp_parser_file, 'w').write(content) - Log.debug("Project commit SHA succesfully registered to the configuration file.") + Log.debug( + "Project commit SHA succesfully registered to the configuration file.") return True def get_svn_project_url(self): @@ -834,7 +896,8 @@ class AutosubmitConfig(object): if split_in.find("-") != -1: numbers = split_in.split("-") for count in range(int(numbers[0]), int(numbers[1]) + 1): - date_list.append(parse_date(string_date + str(count).zfill(len(numbers[0])))) + date_list.append(parse_date( + string_date + str(count).zfill(len(numbers[0])))) else: date_list.append(parse_date(string_date + split_in)) string_date = None @@ -863,7 +926,8 @@ class AutosubmitConfig(object): :return: initial chunk :rtype: int """ - chunk_ini = self._exp_parser.get_option('experiment', 'CHUNKINI', default) + chunk_ini = self._exp_parser.get_option( + 'experiment', 'CHUNKINI', default) if chunk_ini == '': return default return int(chunk_ini) @@ -877,6 +941,18 @@ class AutosubmitConfig(object): """ return self._exp_parser.get('experiment', 'CHUNKSIZEUNIT').lower() + def get_chunk_size(self, default=1): + """ + Chunk Size as defined in the expdef file. + + :return: Chunksize, 1 as default. 
+ :rtype: int + """ + chunk_size = self._exp_parser.get('experiment', 'CHUNKSIZE', default) + if chunk_size == '': + return default + return int(chunk_size) + def get_member_list(self): """ Returns members list from experiment's config file @@ -896,7 +972,8 @@ class AutosubmitConfig(object): if split_in.find("-") != -1: numbers = split_in.split("-") for count in range(int(numbers[0]), int(numbers[1]) + 1): - member_list.append(string_member + str(count).zfill(len(numbers[0]))) + member_list.append( + string_member + str(count).zfill(len(numbers[0]))) else: member_list.append(string_member + split_in) string_member = None @@ -945,7 +1022,8 @@ class AutosubmitConfig(object): """ content = open(self._exp_parser_file).read() if re.search('HPCARCH =.*', content): - content = content.replace(re.search('HPCARCH =.*', content).group(0), "HPCARCH = " + hpc) + content = content.replace( + re.search('HPCARCH =.*', content).group(0), "HPCARCH = " + hpc) open(self._exp_parser_file, 'w').write(content) def set_version(self, autosubmit_version): @@ -968,7 +1046,8 @@ class AutosubmitConfig(object): :return: version :rtype: str """ - return self._conf_parser.get('config', 'AUTOSUBMIT_VERSION' , 'None') + return self._conf_parser.get('config', 'AUTOSUBMIT_VERSION', 'None') + def get_total_jobs(self): """ Returns max number of running jobs from autosubmit's config file @@ -977,7 +1056,7 @@ class AutosubmitConfig(object): :rtype: int """ return int(self._conf_parser.get('config', 'TOTALJOBS')) - + def get_output_type(self): """ Returns default output type, pdf if none @@ -985,7 +1064,7 @@ class AutosubmitConfig(object): :return: output type :rtype: string """ - return self._conf_parser.get_option('config', 'OUTPUT','pdf') + return self._conf_parser.get_option('config', 'OUTPUT', 'pdf') def get_max_wallclock(self): """ @@ -1001,7 +1080,8 @@ class AutosubmitConfig(object): :rtype: str """ - config_value = self._conf_parser.get_option('config', 'MAX_PROCESSORS', None) + config_value = self._conf_parser.get_option( + 'config', 'MAX_PROCESSORS', None) return int(config_value) if config_value is not None else config_value def get_max_waiting_jobs(self): @@ -1068,11 +1148,13 @@ class AutosubmitConfig(object): :return: if remote dependencies :rtype: bool """ - config_value = self._conf_parser.get_option('config', 'PRESUBMISSION', 'false').lower() + config_value = self._conf_parser.get_option( + 'config', 'PRESUBMISSION', 'false').lower() if config_value == "true": return True else: return False + def get_wrapper_type(self): """ Returns what kind of wrapper (VERTICAL, MIXED-VERTICAL, HORIZONTAL, HYBRID, NONE) the user has configured in the autosubmit's config @@ -1081,6 +1163,7 @@ class AutosubmitConfig(object): :rtype: string """ return self._conf_parser.get_option('wrapper', 'TYPE', 'None').lower() + def get_wrapper_policy(self): """ Returns what kind of wrapper (VERTICAL, MIXED-VERTICAL, HORIZONTAL, HYBRID, NONE) the user has configured in the autosubmit's config @@ -1098,6 +1181,7 @@ class AutosubmitConfig(object): :rtype: string """ return self._conf_parser.get_option('wrapper', 'JOBS_IN_WRAPPER', 'None') + def get_wrapper_queue(self): """ Returns the wrapper queue if not defined, will be the one of the first job wrapped @@ -1106,6 +1190,7 @@ class AutosubmitConfig(object): :rtype: string """ return self._conf_parser.get_option('wrapper', 'QUEUE', 'None') + def get_min_wrapped_jobs(self): """ Returns the minim number of jobs that can be wrapped together as configured in autosubmit's config file @@ -1123,6 +1208,7 
@@ class AutosubmitConfig(object): :rtype: int """ return int(self._conf_parser.get_option('wrapper', 'MAX_WRAPPED', self.get_total_jobs())) + def get_wrapper_method(self): """ Returns the method of make the wrapper @@ -1131,6 +1217,7 @@ class AutosubmitConfig(object): :rtype: string """ return self._conf_parser.get_option('wrapper', 'METHOD', 'ASThread') + def get_wrapper_check_time(self): """ Returns time to check the status of jobs in the wrapper diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py index be14bc8c6..841f6e4a8 100644 --- a/autosubmit/database/db_jobdata.py +++ b/autosubmit/database/db_jobdata.py @@ -31,11 +31,12 @@ from datetime import datetime from json import dumps #from networkx import DiGraph from autosubmit.config.basicConfig import BasicConfig +from autosubmit.job.job_common import Status from autosubmit.job.job_package_persistence import JobPackagePersistence from bscearth.utils.date import date2str, parse_date, previous_day, chunk_end_date, chunk_start_date, Log, subs_dates -CURRENT_DB_VERSION = 10 +CURRENT_DB_VERSION = 12 # Used to be 10 # Defining RowType standard @@ -44,19 +45,72 @@ class RowType: #PACKED = 2 -_debug = False +_debug = True JobItem = collections.namedtuple('JobItem', ['id', 'counter', 'job_name', 'created', 'modified', 'submit', 'start', 'finish', - 'status', 'rowtype', 'ncpus', 'wallclock', 'qos', 'energy', 'date', 'section', 'member', 'chunk', 'last', 'platform', 'job_id', 'extra_data']) + 'status', 'rowtype', 'ncpus', 'wallclock', 'qos', 'energy', 'date', 'section', 'member', 'chunk', 'last', 'platform', 'job_id', 'extra_data', 'nnodes', 'run_id']) + +ExperimentRunItem = collections.namedtuple('ExperimentRunItem', [ + 'run_id', 'created', 'start', 'finish', 'chunk_unit', 'chunk_size', 'completed', 'total', 'failed', 'queuing', 'running', 'submitted']) ExperimentRow = collections.namedtuple( 'ExperimentRow', ['exp_id', 'expid', 'status', 'seconds']) +class ExperimentRun(): + + def __init__(self, run_id, created=None, start=0, finish=0, chunk_unit="NA", chunk_size=0, completed=0, total=0, failed=0, queuing=0, running=0, submitted=0): + self.run_id = run_id + self.created = created if created else datetime.today().strftime('%Y-%m-%d-%H:%M:%S') + self.start = start + self.finish = finish + self.chunk_unit = chunk_unit + self.chunk_size = chunk_size + self.submitted = submitted + self.queuing = queuing + self.running = running + self.completed = completed + self.failed = failed + self.total = total + + def _increase_counter(self, status): + if status == Status.FAILED: + self.failed += 1 + elif status == Status.SUBMITTED: + self.submitted += 1 + elif status == Status.QUEUING: + self.queuing += 1 + elif status == Status.RUNNING: + self.running += 1 + elif status == Status.COMPLETED: + self.completed += 1 if self.completed < self.total else 0 + else: + pass + + def _decrease_counter(self, status): + if status == Status.FAILED: + self.failed -= 1 if self.failed > 0 else 0 + elif status == Status.SUBMITTED: + self.submitted -= 1 if self.submitted > 0 else 0 + elif status == Status.QUEUING: + self.queuing -= 1 if self.queuing > 0 else 0 + elif status == Status.RUNNING: + self.running -= 1 if self.running > 0 else 0 + elif status == Status.COMPLETED: + self.completed -= 1 if self.completed > 0 else 0 + else: + pass + + def update_counters(self, prev_status, status): + if prev_status != status: + self._increase_counter(status) + self._decrease_counter(prev_status) + + class JobData(): """Job Data object """ - def 
__init__(self, _id, counter=1, job_name="None", created=None, modified=None, submit=0, start=0, finish=0, status="UNKNOWN", rowtype=0, ncpus=0, wallclock="00:00", qos="debug", energy=0, date="", section="", member="", chunk=0, last=1, platform="NA", job_id=0, extra_data=dict()): + def __init__(self, _id, counter=1, job_name="None", created=None, modified=None, submit=0, start=0, finish=0, status="UNKNOWN", rowtype=0, ncpus=0, wallclock="00:00", qos="debug", energy=0, date="", section="", member="", chunk=0, last=1, platform="NA", job_id=0, extra_data=dict(), nnodes=0, run_id=None): """[summary] Args: @@ -105,6 +159,8 @@ class JobData(): platform) > 0 else "NA" self.job_id = job_id if job_id else 0 self.extra_data = dumps(extra_data) + self.nnodes = nnodes + self.run_id = run_id @property def submit(self): @@ -168,6 +224,8 @@ class MainDataBase(): self.conn = None self.conn_ec = None self.create_table_query = None + self.create_table_header_query = None + self.version_schema_changes = [] def create_connection(self, db_file): """ @@ -181,7 +239,7 @@ class MainDataBase(): except: return None - def create_table(self): + def create_table(self, statement): """ create a table from the create_table_sql statement :param conn: Connection object :param create_table_sql: a CREATE TABLE statement @@ -190,7 +248,8 @@ class MainDataBase(): try: if self.conn: c = self.conn.cursor() - c.execute(self.create_table_query) + c.execute(statement) + self.conn.commit() else: raise IOError("Not a valid connection") except IOError as exp: @@ -210,6 +269,7 @@ class MainDataBase(): if self.conn: c = self.conn.cursor() c.execute(self.create_index_query) + self.conn.commit() else: raise IOError("Not a valid connection") except IOError as exp: @@ -222,6 +282,36 @@ class MainDataBase(): Log.warning("Error on create index . create_index") return None + def update_table_schema(self): + """[summary] + """ + try: + if self.conn: + c = self.conn.cursor() + for item in self.version_schema_changes: + try: + c.execute(item) + except sqlite3.Error as e: + if _debug == True: + Log.info(str(type(e).__name__)) + Log.debug(str(type(e).__name__)) + Log.warning( + "Error on updating table schema statement. It is safe to ignore this message.") + pass + self.conn.commit() + else: + raise IOError("Not a valid connection") + except IOError as exp: + Log.warning(exp) + return None + except Exception as exp: + if _debug == True: + Log.info(traceback.format_exc()) + Log.debug(str(exp)) + Log.warning( + "Error on updating table schema . 
update_table_schema.") + return None + class ExperimentStatus(MainDataBase): def __init__(self, expid): @@ -432,6 +522,10 @@ class JobDataStructure(MainDataBase): self.folder_path, "job_data_" + str(expid) + ".db") #self.conn = None self.jobdata_list = JobDataList(self.expid) + self.version_schema_changes.append( + "ALTER TABLE job_data ADD COLUMN nnodes INTEGER NOT NULL DEFAULT 0") + self.version_schema_changes.append( + "ALTER TABLE job_data ADD COLUMN run_id INTEGER") # We use rowtype to identify a packed job self.create_table_query = textwrap.dedent( '''CREATE TABLE @@ -458,23 +552,57 @@ class JobDataStructure(MainDataBase): platform TEXT NOT NULL, job_id INTEGER NOT NULL, extra_data TEXT NOT NULL, + nnodes INTEGER NOT NULL DEFAULT 0, + run_id INTEGER, UNIQUE(counter,job_name) ); ''') - # Index creation should be in a different statement + + # Creating the header table + self.create_table_header_query = textwrap.dedent( + '''CREATE TABLE + IF NOT EXISTS experiment_run ( + run_id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + created TEXT NOT NULL, + start INTEGER NOT NULL, + finish INTEGER, + chunk_unit TEXT NOT NULL, + chunk_size INTEGER NOT NULL, + completed INTEGER NOT NULL, + total INTEGER NOT NULL, + failed INTEGER NOT NULL, + queuing INTEGER NOT NULL, + running INTEGER NOT NULL, + submitted INTEGER NOT NULL + ); + ''') + + # Index creation is in a different statement self.create_index_query = textwrap.dedent(''' CREATE INDEX IF NOT EXISTS ID_JOB_NAME ON job_data(job_name); ''') - + # print(self.database_path) if not os.path.exists(self.database_path): open(self.database_path, "w") self.conn = self.create_connection(self.database_path) - self.create_table() + self.create_table(self.create_table_header_query) + self.create_table(self.create_table_query) self.create_index() if self._set_pragma_version(CURRENT_DB_VERSION): Log.info("Database version set.") else: self.conn = self.create_connection(self.database_path) + db_version = self._select_pragma_version() + if db_version != CURRENT_DB_VERSION: + # Update to current version + Log.info("Database schema needs update.") + self.update_table_schema() + self.create_index() + self.create_table(self.create_table_header_query) + if self._set_pragma_version(CURRENT_DB_VERSION): + Log.info("Database version set to {0}.".format( + CURRENT_DB_VERSION)) + self.current_run_id = self.get_current_run_id() def determine_rowtype(self, code): """ @@ -490,6 +618,87 @@ class JobDataStructure(MainDataBase): else: return RowType.NORMAL + def get_current_run_id(self): + current_run = self.get_max_id_experiment_run() + if current_run: + return current_run.run_id + else: + new_run = ExperimentRun(0) + return self._insert_experiment_run(new_run) + + def process_status_changes(self, tracking_dictionary, job_list=None, chunk_unit="NA", chunk_size=0): + current_run = self.get_max_id_experiment_run() + if current_run: + if tracking_dictionary is not None and bool(tracking_dictionary) == True: + if job_list: + current_date_member_completed_count = sum( + 1 for job in job_list if job.date is not None and job.member is not None and job.status == Status.COMPLETED) + if len(tracking_dictionary.keys()) >= int(current_date_member_completed_count * 0.9): + # If setstatus changes more than 90% of date-member completed jobs, it's a new run + # Must create a new experiment run + Log.result( + "Since a significant amount of jobs have changes status. 
Autosubmit will consider a new run of the same experiment.") + self.validate_current_run( + job_list, chunk_unit, chunk_size, True) + return None + for name, (prev_status, status) in tracking_dictionary.items(): + current_run.update_counters(prev_status, status) + self._update_experiment_run(current_run) + else: + raise Exception("Empty header database") + + def validate_current_run(self, job_list, chunk_unit="NA", chunk_size=0, must_create=False): + """[summary] + + :param job_list ([type]): [description] + :param chunk_unit (str, optional): [description]. Defaults to "NA". + :param chunk_size (int, optional): [description]. Defaults to 0. + :param must_create (bool, optional): [description]. Defaults to False. + + :return: [description] + """ + try: + if not job_list: + raise Exception( + "Autosubmit couldn't find the job_list. validate_current_run.") + current_run = self.get_max_id_experiment_run() + current_total = len(job_list) + completed_count = sum( + 1 for job in job_list if job.status == Status.COMPLETED) + failed_count = sum( + 1 for job in job_list if job.status == Status.FAILED) + queue_count = sum( + 1 for job in job_list if job.status == Status.QUEUING) + submit_count = sum( + 1 for job in job_list if job.status == Status.SUBMITTED) + running_count = sum( + 1 for job in job_list if job.status == Status.RUNNING) + + if not current_run or must_create == True: + new_run = ExperimentRun(0, None, 0, 0, chunk_unit, chunk_size, completed_count, + current_total, failed_count, queue_count, running_count, submit_count) + self.current_run_id = self._insert_experiment_run(new_run) + else: + if current_run.total != current_total: + new_run = ExperimentRun(0, None, 0, 0, chunk_unit, chunk_size, completed_count, + current_total, failed_count, queue_count, running_count, submit_count) + self.current_run_id = self._insert_experiment_run(new_run) + else: + current_run.completed = completed_count + current_run.failed = failed_count + current_run.queuing = queue_count + current_run.submitted = submit_count + current_run.running = running_count + self._update_experiment_run(current_run) + self.current_run_id = current_run.run_id + except Exception as exp: + if _debug == True: + Log.info(traceback.format_exc()) + Log.debug(traceback.format_exc()) + Log.warning( + "Autosubmit couldn't insert a new experiment run register. validate_current_run {0}".format(str(exp))) + return None + def get_job_package_code(self, current_job_name): """ Finds the package code and retrieves it. None if no package. 
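A quick aside on the counter bookkeeping above: ExperimentRun.update_counters only acts on genuine transitions, bumping the tally for the new status and draining the one for the previous status. A standalone toy rendition of that rule — the integer Status codes below are stand-ins for autosubmit.job.job_common.Status, and the total-cap guard on COMPLETED is omitted for brevity:

    # Stand-ins for the Status codes consumed by ExperimentRun.update_counters.
    FAILED, SUBMITTED, QUEUING, RUNNING, COMPLETED = range(5)

    counters = {FAILED: 0, SUBMITTED: 0, QUEUING: 0, RUNNING: 0, COMPLETED: 0}

    def update_counters(prev_status, status):
        # Only genuine transitions move the tallies, and never below zero.
        if prev_status == status:
            return
        if status in counters:
            counters[status] += 1
        if counters.get(prev_status, 0) > 0:
            counters[prev_status] -= 1

    update_counters(QUEUING, RUNNING)    # a job leaves the queue
    update_counters(RUNNING, COMPLETED)  # and later completes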
@@ -520,9 +729,7 @@ class JobDataStructure(MainDataBase): if (packages): try: for exp, package_name, job_name in packages: - #print("Looking for {0}".format(current_job_name)) if current_job_name == job_name: - # print(package_name) code = int(package_name.split("_")[2]) return code except Exception as ex: @@ -571,8 +778,7 @@ class JobDataStructure(MainDataBase): current_counter = max_counter # Insert new last rowid = self._insert_job_data(JobData( - 0, current_counter, job_name, None, None, submit, 0, 0, status, self.determine_rowtype(self.get_job_package_code(job_name)), ncpus, wallclock, qos, 0, date, member, section, chunk, 1, platform, job_id)) - # print(rowid) + 0, current_counter, job_name, None, None, submit, 0, 0, status, self.determine_rowtype(self.get_job_package_code(job_name)), ncpus, wallclock, qos, 0, date, member, section, chunk, 1, platform, job_id, dict(), 0, self.current_run_id)) if rowid: return True else: @@ -590,18 +796,20 @@ class JobDataStructure(MainDataBase): """Writes start time into the database Args: - job_name (str): Name of Job - start (int, optional): Start time. Defaults to 0. - status (str, optional): Status of job. Defaults to "UNKWNONW". - ncpus (int, optional): Number of cpis. Defaults to 0. - wallclock (str, optional): Wallclock value. Defaults to "00:00". - qos (str, optional): Name of QoS. Defaults to "debug". - date (str, optional): Date from config. Defaults to "". - member (str, optional): Member from config. Defaults to "". + job_name ([type]): [description] + start (int, optional): [description]. Defaults to 0. + status (str, optional): [description]. Defaults to "UNKWNONW". + ncpus (int, optional): [description]. Defaults to 0. + wallclock (str, optional): [description]. Defaults to "00:00". + qos (str, optional): [description]. Defaults to "debug". + date (str, optional): [description]. Defaults to "". + member (str, optional): [description]. Defaults to "". section (str, optional): [description]. Defaults to "". chunk (int, optional): [description]. Defaults to 0. platform (str, optional): [description]. Defaults to "NA". job_id (int, optional): [description]. Defaults to 0. + packed (bool, optional): [description]. Defaults to False. + nnodes (int, optional): [description]. Defaults to 0. 
Returns: [type]: [description] @@ -620,11 +828,11 @@ class JobDataStructure(MainDataBase): return _updated # It is necessary to create a new row submit_inserted = self.write_submit_time( - job_name, start, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id) + job_name, start, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, packed) if submit_inserted: # print("retro start") self.write_start_time(job_name, start, status, - ncpus, wallclock, qos, date, member, section, chunk, platform, job_id) + ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, packed) return True else: return None @@ -658,8 +866,8 @@ class JobDataStructure(MainDataBase): try: # print("Writing finish time \t" + str(job_name) + "\t" + str(finish)) job_data_last = self.get_job_data_last(job_name) - energy = 0 - submit_time = start_time = finish_time = 0 + # energy = 0 + submit_time = start_time = finish_time = number_nodes = number_cpus = energy = 0 extra_data = dict() # Updating existing row if job_data_last: @@ -671,24 +879,28 @@ class JobDataStructure(MainDataBase): if type(platform_object) is not str: if platform_object.type == "slurm": # print("Checking Slurm for " + str(job_name)) - submit_time, start_time, finish_time, energy, extra_data = platform_object.check_job_energy( + submit_time, start_time, finish_time, energy, number_cpus, number_nodes, extra_data = platform_object.check_job_energy( job_id, packed) except Exception as exp: Log.info(traceback.format_exc()) Log.warning(str(exp)) #energy = 0 try: - extra_data["parents"] =[int(item) for item in parent_id_list] + extra_data["parents"] = [int(item) + for item in parent_id_list] except Exception as inner_exp: - Log.debug("Parent Id List couldn't be parsed to array of int. Using default values.") + Log.debug( + "Parent Id List couldn't be parsed to array of int. 
Using default values.") extra_data["parents"] = parent_id_list pass - + job_data_last.finish = finish_time if finish_time > 0 else int( finish) job_data_last.status = status job_data_last.job_id = job_id job_data_last.energy = energy + job_data_last.ncpus = number_cpus if number_cpus > 0 else job_data_last.ncpus + job_data_last.nnodes = number_nodes if number_nodes > 0 else job_data_last.nnodes job_data_last.extra_data = dumps( extra_data) if extra_data else "NA" job_data_last.modified = datetime.today().strftime('%Y-%m-%d-%H:%M:%S') @@ -709,7 +921,7 @@ class JobDataStructure(MainDataBase): if submit_inserted and write_inserted: #print("retro finish") self.write_finish_time( - job_name, finish, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, platform_object, packed) + job_name, finish, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, platform_object, packed, number_nodes) else: return None except Exception as exp: @@ -774,7 +986,7 @@ class JobDataStructure(MainDataBase): # _id, _counter, _job_name, _created, _modified, _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last, _platform = item job_item = JobItem(*item) self.jobdata_list.add_jobdata(JobData(job_item.id, job_item.counter, job_item.job_name, job_item.created, job_item.modified, job_item.submit, job_item.start, job_item.finish, job_item.status, - job_item.rowtype, job_item.ncpus, job_item.wallclock, job_item.qos, job_item.energy, job_item.date, job_item.section, job_item.member, job_item.chunk, job_item.last, job_item.platform, job_item.job_id, job_item.extra_data)) + job_item.rowtype, job_item.ncpus, job_item.wallclock, job_item.qos, job_item.energy, job_item.date, job_item.section, job_item.member, job_item.chunk, job_item.last, job_item.platform, job_item.job_id, job_item.extra_data, job_item.nnodes, job_item.run_id)) else: raise Exception("Job data folder not found :" + @@ -802,12 +1014,9 @@ class JobDataStructure(MainDataBase): if os.path.exists(self.folder_path): current_job = self._get_job_data(job_name) for item in current_job: - # _id, _counter, _job_name, _created, _modified, _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last, _platform = item job_item = JobItem(*item) job_data.append(JobData(job_item.id, job_item.counter, job_item.job_name, job_item.created, job_item.modified, job_item.submit, job_item.start, job_item.finish, job_item.status, - job_item.rowtype, job_item.ncpus, job_item.wallclock, job_item.qos, job_item.energy, job_item.date, job_item.section, job_item.member, job_item.chunk, job_item.last, job_item.platform, job_item.job_id, job_item.extra_data)) - # job_data.append(JobData(_id, _counter, _job_name, _created, _modified, - # _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last, _platform)) + job_item.rowtype, job_item.ncpus, job_item.wallclock, job_item.qos, job_item.energy, job_item.date, job_item.section, job_item.member, job_item.chunk, job_item.last, job_item.platform, job_item.job_id, job_item.extra_data, job_item.nnodes, job_item.run_id)) return job_data else: raise Exception("Job data folder not found :" + @@ -841,6 +1050,27 @@ class JobDataStructure(MainDataBase): "Autosubmit couldn't retrieve job data. 
get_job_data_last") return None + def get_max_id_experiment_run(self): + try: + #expe = list() + if os.path.exists(self.folder_path): + current_experiment_run = self._get_max_id_experiment_run() + if current_experiment_run: + exprun_item = ExperimentRunItem(*current_experiment_run) + return ExperimentRun(exprun_item.run_id, exprun_item.created, exprun_item.start, exprun_item.finish, exprun_item.chunk_unit, exprun_item.chunk_size, exprun_item.completed, exprun_item.total, exprun_item.failed, exprun_item.queuing, exprun_item.running, exprun_item.submitted) + else: + return None + else: + raise Exception("Job data folder not found :" + + str(self.jobdata_path)) + except Exception as exp: + if _debug == True: + Log.info(traceback.format_exc()) + Log.debug(traceback.format_exc()) + Log.warning( + "Autosubmit couldn't retrieve experiment run header. get_max_id_experiment_run") + return None + def get_job_data_last(self, job_name): """ Returns latest jobdata row for a job_name. The current version. @@ -860,11 +1090,8 @@ class JobDataStructure(MainDataBase): if current_job_last: for current in current_job_last: job_item = JobItem(*current) - # _id, _counter, _job_name, _created, _modified, _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last, _platform = current_job_last - # return JobData(_id, _counter, _job_name, _created, _modified, - # _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last, _platform) jobdata.append(JobData(job_item.id, job_item.counter, job_item.job_name, job_item.created, job_item.modified, job_item.submit, job_item.start, job_item.finish, job_item.status, - job_item.rowtype, job_item.ncpus, job_item.wallclock, job_item.qos, job_item.energy, job_item.date, job_item.section, job_item.member, job_item.chunk, job_item.last, job_item.platform, job_item.job_id, job_item.extra_data)) + job_item.rowtype, job_item.ncpus, job_item.wallclock, job_item.qos, job_item.energy, job_item.date, job_item.section, job_item.member, job_item.chunk, job_item.last, job_item.platform, job_item.job_id, job_item.extra_data, job_item.nnodes, job_item.run_id)) return jobdata else: return None @@ -905,7 +1132,7 @@ class JobDataStructure(MainDataBase): return None def _update_start_job_data(self, jobdata): - """Update start time of job data row + """Update job_data by id. Updates start, modified, job_id, status. Args: jobdata ([type]): [description] @@ -931,7 +1158,7 @@ class JobDataStructure(MainDataBase): return None def _update_finish_job_data_plus(self, jobdata): - """Updates the finish job data, also updates submit, start times. + """Updates job_data by id. Updates submit, start, finish, modified, job_id, status, energy, extra_data, nnodes, ncpus Args: jobdata (JobData): JobData object @@ -941,10 +1168,10 @@ class JobDataStructure(MainDataBase): """ try: if self.conn: - sql = ''' UPDATE job_data SET submit=?, start=?, finish=?, modified=?, job_id=?, status=?, energy=?, extra_data=? WHERE id=? ''' + sql = ''' UPDATE job_data SET submit=?, start=?, finish=?, modified=?, job_id=?, status=?, energy=?, extra_data=?, nnodes=?, ncpus=? WHERE id=? 
''' cur = self.conn.cursor() cur.execute(sql, (jobdata.submit, jobdata.start, jobdata.finish, jobdata.modified, jobdata.job_id, - jobdata.status, jobdata.energy, jobdata.extra_data, jobdata._id)) + jobdata.status, jobdata.energy, jobdata.extra_data, jobdata.nnodes, jobdata.ncpus, jobdata._id)) self.conn.commit() return cur.lastrowid return None @@ -956,7 +1183,7 @@ class JobDataStructure(MainDataBase): return None def _update_finish_job_data(self, jobdata): - """Update register with id. Updates finish, modified, status. + """Update register by id. Updates finish, modified, job_id, status, energy, extra_data, nnodes, ncpus Args: jobdata ([type]): [description] @@ -967,10 +1194,10 @@ class JobDataStructure(MainDataBase): try: if self.conn: # print("Updating finish time") - sql = ''' UPDATE job_data SET finish=?, modified=?, job_id=?, status=?, energy=?, extra_data=? WHERE id=? ''' + sql = ''' UPDATE job_data SET finish=?, modified=?, job_id=?, status=?, energy=?, extra_data=?, nnodes=?, ncpus=? WHERE id=? ''' cur = self.conn.cursor() cur.execute(sql, (jobdata.finish, jobdata.modified, jobdata.job_id, - jobdata.status, jobdata.energy, jobdata.extra_data, jobdata._id)) + jobdata.status, jobdata.energy, jobdata.extra_data, jobdata.nnodes, jobdata.ncpus, jobdata._id)) self.conn.commit() return cur.lastrowid return None @@ -981,21 +1208,42 @@ class JobDataStructure(MainDataBase): Log.warning("Error on Update : " + str(type(e).__name__)) return None - def _insert_job_data(self, jobdata): - """[summary] + def _update_experiment_run(self, experiment_run): + """Updates experiment run row by run_id (finish, chunk_unit, chunk_size, completed, total, failed, queuing, running, submitted) - Args: - jobdata ([type]): JobData object + :param experiment_run: Object representation of experiment run row + :type experiment_run: ExperimentRun object - Returns: - [type]: None if error, lastrowid if correct + :return: None + """ + try: + if self.conn: + sql = ''' UPDATE experiment_run SET finish=?, chunk_unit=?, chunk_size=?, completed=?, total=?, failed=?, queuing=?, running=?, submitted=? WHERE run_id=? ''' + cur = self.conn.cursor() + cur.execute(sql, (experiment_run.finish, experiment_run.chunk_unit, experiment_run.chunk_size, + experiment_run.completed, experiment_run.total, experiment_run.failed, experiment_run.queuing, experiment_run.running, experiment_run.submitted, experiment_run.run_id)) + self.conn.commit() + return cur.lastrowid + return None + except sqlite3.Error as e: + if _debug == True: + Log.info(traceback.format_exc()) + Log.debug(traceback.format_exc()) + Log.warning("Error on update experiment_run : " + + str(type(e).__name__)) + return None + + def _insert_job_data(self, jobdata): + """[summary] + Inserts a new job_data register. + :param jobdata: JobData object """ try: if self.conn: #print("preparing to insert") - sql = ''' INSERT INTO job_data(counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ''' + sql = ''' INSERT INTO job_data(counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) 
''' tuplerow = (jobdata.counter, jobdata.job_name, jobdata.created, jobdata.modified, jobdata.submit, jobdata.start, - jobdata.finish, jobdata.status, jobdata.rowtype, jobdata.ncpus, jobdata.wallclock, jobdata.qos, jobdata.energy, jobdata.date, jobdata.section, jobdata.member, jobdata.chunk, jobdata.last, jobdata.platform, jobdata.job_id, jobdata.extra_data) + jobdata.finish, jobdata.status, jobdata.rowtype, jobdata.ncpus, jobdata.wallclock, jobdata.qos, jobdata.energy, jobdata.date, jobdata.section, jobdata.member, jobdata.chunk, jobdata.last, jobdata.platform, jobdata.job_id, jobdata.extra_data, jobdata.nnodes, jobdata.run_id) cur = self.conn.cursor() #print("pre insert") cur.execute(sql, tuplerow) @@ -1013,11 +1261,36 @@ class JobDataStructure(MainDataBase): "\t " + str(jobdata.job_name) + "\t" + str(jobdata.counter)) return None + def _insert_experiment_run(self, experiment_run): + """[summary] + Inserts a new experiment_run register. + :param experiment_run: ExperimentRun object + """ + try: + if self.conn: + #print("preparing to insert") + sql = ''' INSERT INTO experiment_run(created,start,finish,chunk_unit,chunk_size,completed,total,failed,queuing,running,submitted) VALUES(?,?,?,?,?,?,?,?,?,?,?) ''' + tuplerow = (experiment_run.created, experiment_run.start, experiment_run.finish, experiment_run.chunk_unit, experiment_run.chunk_size, experiment_run.completed, + experiment_run.total, experiment_run.failed, experiment_run.queuing, experiment_run.running, experiment_run.submitted) + cur = self.conn.cursor() + cur.execute(sql, tuplerow) + self.conn.commit() + return cur.lastrowid + else: + return None + except sqlite3.Error as e: + if _debug == True: + Log.info(traceback.format_exc()) + Log.debug(traceback.format_exc()) + Log.warning("Error on insert on experiment_run: {0}".format( + str(type(e).__name__))) + return None + def _get__all_job_data(self): """ Get all registers from job_data.\n - :return: row content: exp_id, name, status, seconds_diff - :rtype: 4-tuple (int, str, str, int) + :return: row content: + :rtype: 23-tuple """ try: #conn = create_connection(path) @@ -1025,7 +1298,7 @@ class JobDataStructure(MainDataBase): self.conn.text_factory = str cur = self.conn.cursor() cur.execute( - "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data FROM job_data") + "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id FROM job_data") rows = cur.fetchall() return rows else: @@ -1051,7 +1324,7 @@ class JobDataStructure(MainDataBase): self.conn.text_factory = str cur = self.conn.cursor() cur.execute( - "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data FROM job_data WHERE job_name=? ORDER BY counter DESC", (job_name,)) + "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id FROM job_data WHERE job_name=? 
ORDER BY counter DESC", (job_name,)) rows = cur.fetchall() # print(rows) return rows @@ -1078,7 +1351,7 @@ class JobDataStructure(MainDataBase): self.conn.text_factory = str cur = self.conn.cursor() cur.execute( - "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data FROM job_data WHERE last=1 and job_name=? ORDER BY counter DESC", (job_name,)) + "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id FROM job_data WHERE last=1 and job_name=? ORDER BY counter DESC", (job_name,)) rows = cur.fetchall() if rows and len(rows) > 0: return rows @@ -1095,7 +1368,8 @@ class JobDataStructure(MainDataBase): def _get_job_data_pending(self): """ - Gets the list of job_id, job_name of those jobs that have pending information. + Gets the list of job_id, job_name of those jobs that have pending information. + This function is no longer used. """ try: if self.conn: @@ -1118,17 +1392,17 @@ class JobDataStructure(MainDataBase): def _set_pragma_version(self, version=2): """Sets current version of the schema - Args: - version (int, optional): Current Version. Defaults to 1. - - Returns: - Boolean/None: True if success, None if error + :param version: Current Version. Defaults to 1. + :type version: (int, optional) + :return: current version, None + :rtype: (int, None) """ try: if self.conn: self.conn.text_factory = str cur = self.conn.cursor() - cur.execute("pragma user_version={v:d}".format(v=version)) + # print("Setting version") + cur.execute("pragma user_version={v:d};".format(v=version)) self.conn.commit() return True except sqlite3.Error as e: @@ -1138,6 +1412,32 @@ class JobDataStructure(MainDataBase): Log.warning("Error on version : " + str(type(e).__name__)) return None + def _select_pragma_version(self): + """[summary] + """ + try: + if self.conn: + self.conn.text_factory = str + cur = self.conn.cursor() + cur.execute("pragma user_version;") + rows = cur.fetchall() + if len(rows) > 0: + # print(rows) + #print("Row " + str(rows[0])) + result, = rows[0] + # print(result) + return int(result) if result >= 0 else None + else: + # Starting value + return None + except sqlite3.Error as e: + if _debug == True: + Log.info(traceback.format_exc()) + Log.debug(traceback.format_exc()) + Log.warning("Error while retrieving version: " + + str(type(e).__name__)) + return None + def _get_maxcounter_jobdata(self): """Return the maxcounter of the experiment @@ -1164,3 +1464,29 @@ class JobDataStructure(MainDataBase): Log.debug(traceback.format_exc()) Log.warning("Error on Select Max : " + str(type(e).__name__)) return None + + def _get_max_id_experiment_run(self): + """Return the max id from experiment_run + + :return: max run_id, None + :rtype: int, None + """ + try: + if self.conn: + self.conn.text_factory = str + cur = self.conn.cursor() + cur.execute( + "SELECT run_id,created,start,finish,chunk_unit,chunk_size,completed,total,failed,queuing,running,submitted from experiment_run ORDER BY run_id DESC LIMIT 0, 1") + rows = cur.fetchall() + if len(rows) > 0: + return rows[0] + else: + return None + return None + except sqlite3.Error as e: + if _debug == True: + Log.info(traceback.format_exc()) + Log.debug(traceback.format_exc()) + Log.warning("Error on select max run_id : " + + str(type(e).__name__)) + return None diff --git 
a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index be41e30b9..2854ff9f6 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -120,24 +120,25 @@ class SlurmPlatform(ParamikoPlatform): def parse_job_finish_data(self, output, packed): """Parses the context of the sacct query to SLURM for a single job. - Only normal jobs return submit, start, and finish times. - When a wrapper has finished, capture finish time. - - :param output: The sacct output - :type output: str - :param job_id: Id in SLURM for the job - :type job_id: int - :param packed: true if job belongs to package - :type packed: bool - :return: submit, start, finish, joules, detailed_data - :rtype: int, int, int, int, json object (str) + Only normal jobs return submit, start, finish, joules, ncpus, nnodes. + + When a wrapper has finished, capture finish time. + + :param output: The sacct output + :type output: str + :param job_id: Id in SLURM for the job + :type job_id: int + :param packed: true if job belongs to package + :type packed: bool + :return: submit, start, finish, joules, ncpus, nnodes, detailed_data + :rtype: int, int, int, int, int, int, json object (str) """ try: - # Storing detail for posterity + # Setting up: Storing detail for posterity detailed_data = dict() # No blank spaces after or before - output = output.strip() - lines = output.split("\n") + output = output.strip() if output else None + lines = output.split("\n") if output else [] is_end_of_wrapper = False extra_data = None # If there is output, list exists @@ -151,73 +152,65 @@ class SlurmPlatform(ParamikoPlatform): if packed: # If it belongs to a wrapper extra_data = {"ncpus": str(line[2] if len(line) > 2 else "NA"), - "submit": str(line[3] if len(line) > 3 else "NA"), - "start": str(line[4] if len(line) > 4 else "NA"), - "finish": str(line[5] if len(line) > 5 else "NA"), - "energy": str(line[6] if len(line) > 6 else "NA"), - "MaxRSS": str(line[7] if len(line) > 7 else "NA"), - "AveRSS": str(line[8] if len(line) > 8 else "NA")} + "nnodes": str(line[3] if len(line) > 3 else "NA"), + "submit": str(line[4] if len(line) > 4 else "NA"), + "start": str(line[5] if len(line) > 5 else "NA"), + "finish": str(line[6] if len(line) > 6 else "NA"), + "energy": str(line[7] if len(line) > 7 else "NA"), + "MaxRSS": str(line[8] if len(line) > 8 else "NA"), + "AveRSS": str(line[9] if len(line) > 9 else "NA")} else: # Normal job - extra_data = {"energy": str(line[6] if len(line) > 6 else "NA"), - "MaxRSS": str(line[7] if len(line) > 7 else "NA"), - "AveRSS": str(line[8] if len(line) > 8 else "NA")} + extra_data = {"energy": str(line[7] if len(line) > 7 else "NA"), + "MaxRSS": str(line[8] if len(line) > 8 else "NA"), + "AveRSS": str(line[9] if len(line) > 9 else "NA")} # Detailed data will contain the important information from output detailed_data[name] = extra_data - submit = start = finish = joules = 0 + submit = start = finish = joules = nnodes = ncpus = 0 status = "UNKNOWN" - line = None + # Take first line as source + line = lines[0].strip().split() + ncpus = int(line[2] if len(line) > 2 else 0) + nnodes = int(line[3] if len(line) > 3 else 0) + status = str(line[1]) if packed == False: # If it is not wrapper job, take first line as source - line = lines[0].strip().split() - status = str(line[1]) if status not in ["COMPLETED", "FAILED", "UNKNOWN"]: # It not completed, then its error and send default data plus output - return (0, 0, 0, detailed_data) + return (0, 0, 0, 0, ncpus, nnodes, 
detailed_data) else: - line = lines[0].strip().split() # Check if the wrapper has finished - if str(line[1]) in ["COMPLETED", "FAILED", "UNKNOWN"]: + if status in ["COMPLETED", "FAILED", "UNKNOWN"]: # Wrapper has finished is_end_of_wrapper = True if line: try: # Parse submit and start only for normal jobs (not packed) submit = int(mktime(datetime.strptime( - line[3], "%Y-%m-%dT%H:%M:%S").timetuple())) if not packed else 0 - start = int(mktime(datetime.strptime( line[4], "%Y-%m-%dT%H:%M:%S").timetuple())) if not packed else 0 + start = int(mktime(datetime.strptime( + line[5], "%Y-%m-%dT%H:%M:%S").timetuple())) if not packed else 0 # Assuming the job has been COMPLETED # If normal job or end of wrapper => Try to get the finish time from the first line of the output, else default to now. finish = (int(mktime(datetime.strptime( - line[5], "%Y-%m-%dT%H:%M:%S").timetuple())) if len(line) > 5 and line[5] != "Unknown" else datetime.now().timestamp()) if not packed or is_end_of_wrapper == True else 0 + line[6], "%Y-%m-%dT%H:%M:%S").timetuple())) if len(line) > 6 and line[6] != "Unknown" else datetime.now().timestamp()) if not packed or is_end_of_wrapper == True else 0 # If normal job or end of wrapper => Try to get energy from first line joules = (self.parse_output_number( - line[6]) if len(line) > 6 and len(line[6]) > 0 else 0) if not packed or is_end_of_wrapper == True else 0 + line[7]) if len(line) > 7 and len(line[7]) > 0 else 0) if not packed or is_end_of_wrapper == True else 0 except Exception as exp: Log.info( "Parsing mishandling.") # joules = -1 pass - # print(detailed_data) detailed_data = detailed_data if not packed or is_end_of_wrapper == True else extra_data - # print("Is packed {0}".format(packed)) - # print("Is end of wrapper {0}".format(is_end_of_wrapper)) - # print("Submit {0}".format(submit)) - # print(start) - # print(finish) - # print(joules) - # print(detailed_data) - return (submit, start, finish, joules, detailed_data) - - return (0, 0, 0, 0, dict()) + return (submit, start, finish, joules, ncpus, nnodes, detailed_data) + + return (0, 0, 0, 0, 0, 0, dict()) except Exception as exp: - # On error return 4*0 - # print(exp) Log.warning( "Autosubmit couldn't parse SLURM energy output. 
From parse_job_finish_data: {0}".format(str(exp))) - return (0, 0, 0, 0, dict()) + return (0, 0, 0, 0, 0, 0, dict()) def parse_output_number(self, string_number): """ @@ -285,7 +278,7 @@ class SlurmPlatform(ParamikoPlatform): return 'squeue -j {0} -o %A,%R'.format(job_id) def get_job_energy_cmd(self, job_id): - return 'sacct -n -j {0} -o JobId%25,State,NCPUS,Submit,Start,End,ConsumedEnergy,MaxRSS%25,AveRSS%25'.format(job_id) + return 'sacct -n -j {0} -o JobId%25,State,NCPUS,NNodes,Submit,Start,End,ConsumedEnergy,MaxRSS%25,AveRSS%25'.format(job_id) def parse_queue_reason(self, output, job_id): reason = [x.split(',')[1] for x in output.splitlines() diff --git a/simple_test.py b/simple_test.py new file mode 100644 index 000000000..48ba9dab2 --- /dev/null +++ b/simple_test.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python +import datetime +import os +import time +import sys +import inspect +sys.path.insert(0, os.path.abspath('.')) +from autosubmit.autosubmit import Autosubmit +from autosubmit.config.config_common import AutosubmitConfig, BasicConfig +from bscearth.utils.config_parser import ConfigParserFactory +from autosubmit.job.job_list import JobList +from autosubmit.database.db_jobdata import JobDataStructure +from bscearth.utils.log import Log + + +def test_retrieve_energy(): + BasicConfig.read() + expid = "a2ze" + job_name = "a2ze_REMOTE_COMPILE" + as_conf = AutosubmitConfig( + expid, BasicConfig, ConfigParserFactory()) + if not as_conf.check_conf_files(): + Log.critical('Can not run with invalid configuration') + return False + submitter = Autosubmit._get_submitter(as_conf) + submitter.load_platforms(as_conf) + job_list = Autosubmit.load_job_list( + expid, as_conf, notransitive=False) + Autosubmit._load_parameters( + as_conf, job_list, submitter.platforms) + + for job in job_list.get_job_list(): + if job.platform_name is None: + job.platform_name = "marenostrum4" + # noinspection PyTypeChecker + job.platform = submitter.platforms[job.platform_name.lower( + )] + + list_jobs = job_list.get_job_list() + job_honk = [job for job in list_jobs if job.name == job_name][0] + job_honk.write_end_time(True) + + +def main(): + job_structure = JobDataStructure('a29z') + # print(job_structure._select_pragma_version()) + return None + + +if __name__ == "__main__": + main() -- GitLab From 250cbb97486720857660799bdf58a650d976bb48 Mon Sep 17 00:00:00 2001 From: Wilmer Uruchi Ticona Date: Fri, 28 Aug 2020 17:00:38 +0200 Subject: [PATCH 7/9] Commenting testing file. This file can be deleted. 
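
The file only ever drove the public entry points of the new job-data
layer, so the same checks are easy to repeat from a Python shell. A
throwaway sketch along the lines of its main() — "a29z" is just a local
example expid, not a fixture shipped with Autosubmit:

    from autosubmit.database.db_jobdata import JobDataStructure

    # Opens (or creates) the job_data_<expid>.db store resolved from
    # the BasicConfig paths.
    structure = JobDataStructure('a29z')

    # Schema version stamped by this series (CURRENT_DB_VERSION = 12).
    print(structure._select_pragma_version())

    # Latest experiment_run header row, or None while the table is empty.
    run = structure.get_max_id_experiment_run()
    if run:
        print(run.run_id, run.completed, run.total)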
--- simple_test.py | 102 ++++++++++++++++++++++++------------------------- 1 file changed, 51 insertions(+), 51 deletions(-) diff --git a/simple_test.py b/simple_test.py index 48ba9dab2..5fbbc0d55 100644 --- a/simple_test.py +++ b/simple_test.py @@ -1,51 +1,51 @@ -#!/usr/bin/env python -import datetime -import os -import time -import sys -import inspect -sys.path.insert(0, os.path.abspath('.')) -from autosubmit.autosubmit import Autosubmit -from autosubmit.config.config_common import AutosubmitConfig, BasicConfig -from bscearth.utils.config_parser import ConfigParserFactory -from autosubmit.job.job_list import JobList -from autosubmit.database.db_jobdata import JobDataStructure -from bscearth.utils.log import Log - - -def test_retrieve_energy(): - BasicConfig.read() - expid = "a2ze" - job_name = "a2ze_REMOTE_COMPILE" - as_conf = AutosubmitConfig( - expid, BasicConfig, ConfigParserFactory()) - if not as_conf.check_conf_files(): - Log.critical('Can not run with invalid configuration') - return False - submitter = Autosubmit._get_submitter(as_conf) - submitter.load_platforms(as_conf) - job_list = Autosubmit.load_job_list( - expid, as_conf, notransitive=False) - Autosubmit._load_parameters( - as_conf, job_list, submitter.platforms) - - for job in job_list.get_job_list(): - if job.platform_name is None: - job.platform_name = "marenostrum4" - # noinspection PyTypeChecker - job.platform = submitter.platforms[job.platform_name.lower( - )] - - list_jobs = job_list.get_job_list() - job_honk = [job for job in list_jobs if job.name == job_name][0] - job_honk.write_end_time(True) - - -def main(): - job_structure = JobDataStructure('a29z') - # print(job_structure._select_pragma_version()) - return None - - -if __name__ == "__main__": - main() +# #!/usr/bin/env python +# import datetime +# import os +# import time +# import sys +# import inspect +# sys.path.insert(0, os.path.abspath('.')) +# from autosubmit.autosubmit import Autosubmit +# from autosubmit.config.config_common import AutosubmitConfig, BasicConfig +# from bscearth.utils.config_parser import ConfigParserFactory +# from autosubmit.job.job_list import JobList +# from autosubmit.database.db_jobdata import JobDataStructure +# from bscearth.utils.log import Log + + +# def test_retrieve_energy(): +# BasicConfig.read() +# expid = "a2ze" +# job_name = "a2ze_REMOTE_COMPILE" +# as_conf = AutosubmitConfig( +# expid, BasicConfig, ConfigParserFactory()) +# if not as_conf.check_conf_files(): +# Log.critical('Can not run with invalid configuration') +# return False +# submitter = Autosubmit._get_submitter(as_conf) +# submitter.load_platforms(as_conf) +# job_list = Autosubmit.load_job_list( +# expid, as_conf, notransitive=False) +# Autosubmit._load_parameters( +# as_conf, job_list, submitter.platforms) + +# for job in job_list.get_job_list(): +# if job.platform_name is None: +# job.platform_name = "marenostrum4" +# # noinspection PyTypeChecker +# job.platform = submitter.platforms[job.platform_name.lower( +# )] + +# list_jobs = job_list.get_job_list() +# job_honk = [job for job in list_jobs if job.name == job_name][0] +# job_honk.write_end_time(True) + + +# def main(): +# job_structure = JobDataStructure('a29z') +# # print(job_structure._select_pragma_version()) +# return None + + +# if __name__ == "__main__": +# main() -- GitLab From 2d8d96417e0143546b7a8a31b9765be60dca5aab Mon Sep 17 00:00:00 2001 From: Wilmer Uruchi Ticona Date: Fri, 28 Aug 2020 17:54:24 +0200 Subject: [PATCH 8/9] Added update of finish time on experiment completion. 
Also update of status counters on Autosubmit stop. --- autosubmit/autosubmit.py | 5 +++++ autosubmit/database/db_jobdata.py | 8 ++++++++ 2 files changed, 13 insertions(+) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 8e6c949a3..67d4e607f 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1460,6 +1460,9 @@ class Autosubmit: return 2 time.sleep(safetysleeptime) Log.info("No more jobs to run.") + # Updating job data header with finish time + job_data_structure.validate_current_run( + job_list.get_job_list(), as_conf.get_chunk_size_unit(), as_conf.get_chunk_size()) timeout = 0 for platform in platforms_to_test: @@ -1481,6 +1484,8 @@ class Autosubmit: return False else: Log.result("Run successful") + # Updating finish time for job data header + job_data_structure.update_finish_time() return True except portalocker.AlreadyLocked: diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py index 841f6e4a8..f17a56166 100644 --- a/autosubmit/database/db_jobdata.py +++ b/autosubmit/database/db_jobdata.py @@ -689,6 +689,7 @@ class JobDataStructure(MainDataBase): current_run.queuing = queue_count current_run.submitted = submit_count current_run.running = running_count + current_run.finish = 0 self._update_experiment_run(current_run) self.current_run_id = current_run.run_id except Exception as exp: @@ -699,6 +700,13 @@ class JobDataStructure(MainDataBase): "Autosubmit couldn't insert a new experiment run register. validate_current_run {0}".format(str(exp))) return None + def update_finish_time(self): + current_run = self.get_max_id_experiment_run() + if current_run: + current_run.finish = int(time.time()) + self._update_experiment_run(current_run) + self.current_run_id = current_run.run_id + def get_job_package_code(self, current_job_name): """ Finds the package code and retrieves it. None if no package. -- GitLab From 0a6c232b733757872981953e76ff1570da9d2f64 Mon Sep 17 00:00:00 2001 From: Wilmer Uruchi Ticona Date: Mon, 31 Aug 2020 10:37:12 +0200 Subject: [PATCH 9/9] Deactivate debug messages from jobdata module --- autosubmit/database/db_jobdata.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py index f17a56166..c0d8bbd4b 100644 --- a/autosubmit/database/db_jobdata.py +++ b/autosubmit/database/db_jobdata.py @@ -45,7 +45,7 @@ class RowType: #PACKED = 2 -_debug = True +_debug = False JobItem = collections.namedtuple('JobItem', ['id', 'counter', 'job_name', 'created', 'modified', 'submit', 'start', 'finish', 'status', 'rowtype', 'ncpus', 'wallclock', 'qos', 'energy', 'date', 'section', 'member', 'chunk', 'last', 'platform', 'job_id', 'extra_data', 'nnodes', 'run_id']) -- GitLab
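
The migration pattern the series leans on is small enough to read in
isolation: check "pragma user_version", attempt the ALTER TABLE
statements (a failure just means the column already exists), then stamp
the new version. A self-contained sketch of that flow — the database
path is a placeholder, Autosubmit derives the real one from the expid:

    import sqlite3

    CURRENT_DB_VERSION = 12  # version stamped by this patch series

    def migrate(db_path='job_data_a29z.db'):
        conn = sqlite3.connect(db_path)
        cur = conn.cursor()
        (version,) = cur.execute('pragma user_version;').fetchone()
        if version != CURRENT_DB_VERSION:
            for statement in (
                'ALTER TABLE job_data ADD COLUMN nnodes INTEGER NOT NULL DEFAULT 0',
                'ALTER TABLE job_data ADD COLUMN run_id INTEGER',
            ):
                try:
                    cur.execute(statement)
                except sqlite3.Error:
                    # Mirrors update_table_schema(): safe to ignore when
                    # the column is already present.
                    pass
            cur.execute('pragma user_version={0:d};'.format(CURRENT_DB_VERSION))
            conn.commit()
        conn.close()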