From d3a2ff274e79da71beaf36879f84b4a9447cd7f2 Mon Sep 17 00:00:00 2001 From: Wilmer Uruchi Ticona Date: Thu, 8 Oct 2020 14:45:50 +0200 Subject: [PATCH] Improved job historical database. Added post-processing operation for end of wrapper operation --- autosubmit/database/db_jobdata.py | 463 ++++++++++++++++++++++++-- autosubmit/job/job.py | 110 +++--- autosubmit/job/job_common.py | 37 +- autosubmit/platforms/slurmplatform.py | 116 ++++--- 4 files changed, 605 insertions(+), 121 deletions(-) diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py index 2b9b1f290..35775760b 100644 --- a/autosubmit/database/db_jobdata.py +++ b/autosubmit/database/db_jobdata.py @@ -28,15 +28,15 @@ import sqlite3 import copy import collections from datetime import datetime -from json import dumps +from json import dumps, loads #from networkx import DiGraph from autosubmit.config.basicConfig import BasicConfig -from autosubmit.job.job_common import Status +from autosubmit.job.job_common import Status, parse_output_number from autosubmit.job.job_package_persistence import JobPackagePersistence from bscearth.utils.date import date2str, parse_date, previous_day, chunk_end_date, chunk_start_date, subs_dates from log.log import Log, AutosubmitCritical, AutosubmitError -CURRENT_DB_VERSION = 12 # Used to be 10 +CURRENT_DB_VERSION = 14 # Used to be 10 # Defining RowType standard @@ -50,7 +50,7 @@ JobItem = collections.namedtuple('JobItem', ['id', 'counter', 'job_name', 'creat 'status', 'rowtype', 'ncpus', 'wallclock', 'qos', 'energy', 'date', 'section', 'member', 'chunk', 'last', 'platform', 'job_id', 'extra_data', 'nnodes', 'run_id']) ExperimentRunItem = collections.namedtuple('ExperimentRunItem', [ - 'run_id', 'created', 'start', 'finish', 'chunk_unit', 'chunk_size', 'completed', 'total', 'failed', 'queuing', 'running', 'submitted']) + 'run_id', 'created', 'start', 'finish', 'chunk_unit', 'chunk_size', 'completed', 'total', 'failed', 'queuing', 'running', 'submitted', 'suspended', 'metadata']) ExperimentRow = collections.namedtuple( 'ExperimentRow', ['exp_id', 'expid', 'status', 'seconds']) @@ -58,7 +58,7 @@ ExperimentRow = collections.namedtuple( class ExperimentRun(): - def __init__(self, run_id, created=None, start=0, finish=0, chunk_unit="NA", chunk_size=0, completed=0, total=0, failed=0, queuing=0, running=0, submitted=0): + def __init__(self, run_id, created=None, start=0, finish=0, chunk_unit="NA", chunk_size=0, completed=0, total=0, failed=0, queuing=0, running=0, submitted=0, suspended=0, metadata=""): self.run_id = run_id self.created = created if created else datetime.today().strftime('%Y-%m-%d-%H:%M:%S') self.start = start @@ -71,6 +71,8 @@ class ExperimentRun(): self.completed = completed self.failed = failed self.total = total + self.suspended = suspended + self.metadata = metadata def _increase_counter(self, status): if status == Status.FAILED: @@ -106,7 +108,28 @@ class ExperimentRun(): self._decrease_counter(prev_status) -class JobData(): +class JobStepExtraData(): + def __init__(self, key, dict_data): + self.key = key + self.ncpus = dict_data["ncpus"] if dict_data and "ncpus" in dict_data.keys( + ) else 0 + self.nnodes = dict_data["nnodes"] if dict_data and "nnodes" in dict_data.keys( + ) else 0 + self.submit = int(time.mktime(datetime.strptime(dict_data["submit"], "%Y-%m-%dT%H:%M:%S").timetuple())) if dict_data and "submit" in dict_data.keys( + ) else 0 + self.start = int(time.mktime(datetime.strptime(dict_data["start"], "%Y-%m-%dT%H:%M:%S").timetuple())) if dict_data and 
"start" in dict_data.keys( + ) else 0 + self.finish = int(time.mktime(datetime.strptime(dict_data["finish"], "%Y-%m-%dT%H:%M:%S").timetuple())) if dict_data and "finish" in dict_data.keys( + ) and dict_data["finish"] != "Unknown" else 0 + self.energy = parse_output_number(dict_data["energy"]) if dict_data and "energy" in dict_data.keys( + ) else 0 + self.maxRSS = dict_data["MaxRSS"] if dict_data and "MaxRSS" in dict_data.keys( + ) else 0 + self.aveRSS = dict_data["AveRSS"] if dict_data and "AveRSS" in dict_data.keys( + ) else 0 + + +class JobData(object): """Job Data object """ @@ -158,9 +181,14 @@ class JobData(): self._platform = platform if platform and len( platform) > 0 else "NA" self.job_id = job_id if job_id else 0 - self.extra_data = dumps(extra_data) + try: + self.extra_data = loads(extra_data) + except Exception as exp: + self.extra_data = "" + pass self.nnodes = nnodes self.run_id = run_id + self.require_update = False @property def submit(self): @@ -200,7 +228,41 @@ class JobData(): @energy.setter def energy(self, energy): - self._energy = energy if energy else 0 + # print("Energy {0}".format(energy)) + if energy > 0: + if (energy != self._energy): + # print("Updating energy to {0} from {1}.".format( + # energy, self._energy)) + self.require_update = True + self._energy = energy if energy else 0 + + def running_time(self): + """Calculates the running time of the job. + + Returns: + int: running time + """ + if self.status in ["RUNNING", "COMPLETED", "FAILED"]: + # print("Finish: {0}".format(self.finish)) + run = int((self.finish if self.finish > + 0 else time.time()) - self.start) + # print("RUN {0}".format(run)) + if run > 0: + return run + return 0 + + def queuing_time(self): + """Calculates the queuing time of the job. + + Returns: + int: queueing time + """ + if self.status in ["SUBMITTED", "QUEUING", "RUNNING", "COMPLETED", "HELD", "PREPARED", "FAILED"]: + queue = int((self.start if self.start > + 0 else time.time()) - self.submit) + if queue > 0: + return queue + return 0 class JobDataList(): @@ -526,6 +588,10 @@ class JobDataStructure(MainDataBase): "ALTER TABLE job_data ADD COLUMN nnodes INTEGER NOT NULL DEFAULT 0") self.version_schema_changes.append( "ALTER TABLE job_data ADD COLUMN run_id INTEGER") + self.version_schema_changes.append( + "ALTER TABLE experiment_run ADD COLUMN suspended INTEGER NOT NULL DEFAULT 0") + self.version_schema_changes.append( + "ALTER TABLE experiment_run ADD COLUMN metadata TEXT") # We use rowtype to identify a packed job self.create_table_query = textwrap.dedent( '''CREATE TABLE @@ -637,6 +703,7 @@ class JobDataStructure(MainDataBase): current_run = self.get_max_id_experiment_run() if current_run: if tracking_dictionary is not None and bool(tracking_dictionary) == True: + # print("Changes {0}".format(tracking_dictionary)) if job_list and check_run == True: current_date_member_completed_count = sum( 1 for job in job_list if job.date is not None and job.member is not None and job.status == Status.COMPLETED) @@ -648,7 +715,7 @@ class JobDataStructure(MainDataBase): self.validate_current_run( job_list, chunk_unit, chunk_size, True) return None - if job_list and check_run == False: + if job_list: if len(tracking_dictionary.items()) > 0: # Changes exist completed_count = sum( @@ -661,12 +728,16 @@ class JobDataStructure(MainDataBase): 1 for job in job_list if job.status == Status.SUBMITTED) running_count = sum( 1 for job in job_list if job.status == Status.RUNNING) + suspended_count = sum( + 1 for job in job_list if job.status == 
Status.SUSPENDED) current_run.completed = completed_count current_run.failed = failed_count current_run.queuing = queue_count current_run.submitted = submit_count current_run.running = running_count + current_run.suspended = suspended_count self._update_experiment_run(current_run) + return None # for name, (prev_status, status) in tracking_dictionary.items(): # current_run.update_counters(prev_status, status) @@ -699,22 +770,30 @@ class JobDataStructure(MainDataBase): 1 for job in job_list if job.status == Status.SUBMITTED) running_count = sum( 1 for job in job_list if job.status == Status.RUNNING) + suspended_count = sum( + 1 for job in job_list if job.status == Status.SUSPENDED) if not current_run or must_create == True: new_run = ExperimentRun(0, None, 0, 0, chunk_unit, chunk_size, completed_count, - current_total, failed_count, queue_count, running_count, submit_count) + current_total, failed_count, queue_count, running_count, submit_count, suspended_count, None) self.current_run_id = self._insert_experiment_run(new_run) else: + # print("Current run {0}".format(current_run.total)) + # print("Current total {0}".format(len(job_list))) if current_run.total != current_total and only_update == False: + # print("Creating new run") new_run = ExperimentRun(0, None, 0, 0, chunk_unit, chunk_size, completed_count, - current_total, failed_count, queue_count, running_count, submit_count) + current_total, failed_count, queue_count, running_count, submit_count, suspended_count, None) self.current_run_id = self._insert_experiment_run(new_run) else: + # print("Updating current run") current_run.completed = completed_count current_run.failed = failed_count current_run.queuing = queue_count current_run.submitted = submit_count + # print("New suspended count {0}".format(suspended_count)) current_run.running = running_count + current_run.suspended = suspended_count current_run.total = current_total if only_update == True else current_run.total current_run.finish = 0 self._update_experiment_run(current_run) @@ -877,7 +956,7 @@ class JobDataStructure(MainDataBase): "Autosubmit couldn't write start time.") return None - def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, platform_object=None, packed=False, parent_id_list=[]): + def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, platform_object=None, packed=False, parent_id_list=[], no_slurm=True): """Writes the finish time into the database Args: @@ -899,29 +978,42 @@ class JobDataStructure(MainDataBase): Boolean/None: True if success, None if exception. 
""" try: + # Current thread: + BasicConfig.read() + # self.expid = expid + # self.basic_conf = BasicConfig + self.folder_path = BasicConfig.JOBDATA_DIR + self.database_path = os.path.join( + self.folder_path, "job_data_" + str(self.expid) + ".db") + self.conn = self.create_connection(self.database_path) + # print("Writing finish time \t" + str(job_name) + "\t" + str(finish)) job_data_last = self.get_job_data_last(job_name) # energy = 0 is_packed = False + is_end_of_wrapper = False submit_time = start_time = finish_time = number_nodes = number_cpus = energy = 0 extra_data = dict() # Updating existing row - if job_data_last: + if job_data_last and len(job_data_last) > 0: job_data_last = job_data_last[0] is_packed = True if job_data_last.rowtype > 1000 else False + # Call Slurm here, update times. - if platform_object: + if platform_object and no_slurm == False: # print("There is platform object") try: if type(platform_object) is not str: - if platform_object.type == "slurm": - # print("Checking Slurm for " + str(job_name)) - submit_time, start_time, finish_time, energy, number_cpus, number_nodes, extra_data = platform_object.check_job_energy( + if platform_object.type == "slurm" and job_id > 0: + # Waiting 30 seconds for slurm data completion + time.sleep(60) + submit_time, start_time, finish_time, energy, number_cpus, number_nodes, extra_data, is_end_of_wrapper = platform_object.check_job_energy( job_id, is_packed) except Exception as exp: Log.info(traceback.format_exc()) Log.warning(str(exp)) #energy = 0 + try: extra_data["parents"] = [int(item) for item in parent_id_list] @@ -930,23 +1022,29 @@ class JobDataStructure(MainDataBase): "Parent Id List couldn't be parsed to array of int. Using default values.") extra_data["parents"] = parent_id_list pass - - job_data_last.finish = finish_time if finish_time > 0 else int( - finish) + current_timestamp = int(time.time()) + job_data_last.finish = current_timestamp if no_slurm == True else ( + job_data_last.finish if job_data_last.finish > 0 else current_timestamp) + # job_data_last.finish = finish_time if finish_time > 0 and finish_time >= job_data_last.start else ( + # current_timestamp if no_slurm == True else job_data_last.finish) + #print("Job data finish time {0}".format(job_data_last.finish)) job_data_last.status = status job_data_last.job_id = job_id job_data_last.energy = energy job_data_last.ncpus = number_cpus if number_cpus > 0 else job_data_last.ncpus job_data_last.nnodes = number_nodes if number_nodes > 0 else job_data_last.nnodes job_data_last.extra_data = dumps( - extra_data) if extra_data else "NA" + extra_data) if extra_data else None job_data_last.modified = datetime.today().strftime('%Y-%m-%d-%H:%M:%S') - if submit_time > 0 and start_time > 0: + if is_packed == False and submit_time > 0 and start_time > 0: job_data_last.submit = int(submit_time) job_data_last.start = int(start_time) rowid = self._update_finish_job_data_plus(job_data_last) else: - rowid = self._update_finish_job_data(job_data_last) + job_data_last.start = job_data_last.start if job_data_last.start > 0 else start_time + rowid = self._update_finish_job_data_plus(job_data_last) + if no_slurm == False and is_end_of_wrapper == True: + self.process_current_run_collection() return True # It is necessary to create a new row submit_inserted = self.write_submit_time( @@ -958,7 +1056,7 @@ class JobDataStructure(MainDataBase): if submit_inserted and write_inserted: #print("retro finish") self.write_finish_time( - job_name, finish, status, ncpus, wallclock, qos, date, 
member, section, chunk, platform, job_id, platform_object, is_packed, number_nodes) + job_name, time.time(), status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, platform_object, is_packed, parent_id_list) else: return None except Exception as exp: @@ -1008,6 +1106,218 @@ class JobDataStructure(MainDataBase): Log.warning(str(exp)) return None + def process_current_run_collection(self): + """Post-process for job_data. + + Returns: + ([job_data], [warning_messaages]): job data processes, messages + """ + # start_time = time.time() + current_job_data = None + # warning_messages = [] + experiment_run = self.get_max_id_experiment_run() + # List of jobs from pkl -> Dictionary + # allJobsDict = { + # job.name: Status.VALUE_TO_KEY[job.status] for job in allJobs} + # None if there is no experiment header + if experiment_run: + # List of last runs of jobs + current_job_data = self.get_current_job_data(experiment_run.run_id) + if not current_job_data: + Log.warning( + "Autosubmit did not find historical database information.") + return None + # warning_messages.append( + # "Critical | This version of Autosubmit does not support the database that provides the energy information.") + # Include only those that exist in the pkl and have the same status as in the pkl + # current_job_data = [job for job in current_job_data_last if job.job_name in allJobsDict.keys( + # ) and allJobsDict[job.job_name] == job.status] if current_job_data_last else None + # Start processing + if current_job_data: + # Dropping parents key + for job in current_job_data: + job.extra_data.pop('parents', None) + # Internal map from name to object + name_to_current_job = { + job.job_name: job for job in current_job_data} + # Unique packages where rowtype > 2 + packages = set( + job.rowtype for job in current_job_data if job.rowtype > 2) + # Start by processing packages + for package in packages: + # All jobs in package + jobs_in_package = [ + job for job in current_job_data if job.rowtype == package] + # Order package by submit order + jobs_in_package.sort(key=lambda x: x._id, reverse=True) + # Internal list of single-purpose objects + wrapper_jobs = [] + sum_total_energy = 0 + not_1_to_1 = True + keys_found = False + no_process = False + for job_data in jobs_in_package: + # If it is a wrapper job step + if "energy" in job_data.extra_data.keys() and job_data.extra_data["energy"] != "NA": + name_to_current_job[job_data.job_name].energy = parse_output_number( + job_data.extra_data["energy"]) + sum_total_energy += name_to_current_job[job_data.job_name].energy + else: + # Identify best source + description_job = max( + jobs_in_package, key=lambda x: len(x.extra_data)) + # Identify job steps + keys_step = [ + y for y in description_job.extra_data.keys() if '.' in y and y[y.index('.') + 1:] not in ["batch", "extern"] and y != "parents"] + if len(keys_step) > 0: + # Steps found + keys_step.sort( + key=lambda x: int(x[x.index('.') + 1:])) + keys_found = True + # Find all job steps + for key in keys_step: + if "submit" not in description_job.extra_data[key].keys(): + keys_found = False + break + + for key in keys_step: + wrapper_jobs.append(JobStepExtraData( + key, description_job.extra_data[key])) + + sum_total_energy = sum( + jobp.energy for jobp in wrapper_jobs) * 1.0 + + if len(jobs_in_package) == len(wrapper_jobs) and len(wrapper_jobs) > 0: + # Approximation + not_1_to_1 = False + else: + # Identify main step + main_step = [ + y for y in description_job.extra_data.keys() if '.' 
not in y and y != "parents"] + if len(main_step) > 0: + # Check only first one + main_step = [main_step[0]] + # If main step contains submit, its valid. Else, break, not valid, + for key in main_step: + if "submit" not in description_job.extra_data[key].keys(): + keys_found = False + break + # Build wrapper jobs + for key in main_step: + wrapper_jobs.append(JobStepExtraData( + key, description_job.extra_data[key])) + # Total energy for main job + sum_total_energy = sum( + jobp.energy for jobp in wrapper_jobs) * 1.0 + + else: + no_process = True + # warning_messages.append( + # "Wrapper | Wrapper {0} does not have information to perform any energy approximation.".format(package)) + break + # Keys do not have enough information + # if keys_found == False: + # warning_messages.append( + # "Wrapper | Wrapper {0} does not have complete sacct data available.".format(package)) + # If it is not a 1 to 1 relationship between jobs in package and job steps + if sum_total_energy > 0: + if not_1_to_1 == True and no_process == False: + # It is not 1 to 1, so we perform approximation + # warning_messages.append( + # "Approximation | The energy results in wrapper {0} are an approximation. Total energy detected: {1}.".format(package, sum_total_energy)) + # Completing job information if necessary + for i in range(0, len(jobs_in_package)): + if jobs_in_package[i].running_time() <= 0: + # Needs to be completed + # Dropping job from package list + dropped_job = jobs_in_package.pop(i) + # After completion is finished, calculate total resources to be approximated + resources_total = sum( + z.ncpus * z.running_time() for z in jobs_in_package) * 1.0 + if resources_total > 0: + for job_data in jobs_in_package: + job_data_factor = ( + job_data.ncpus * job_data.running_time()) + # if job_data_factor <= 0: + # warning_messages.append("Approximation | Job {0} requires {1} ncpus and has {2} running time, resulting in a 0 energy approximation. This job will be ignored.".format( + # job_data.job_name, job_data.ncpus, job_data.running_time())) + name_to_current_job[job_data.job_name].energy = round(job_data_factor / + resources_total * sum_total_energy, 2) + # else: + # warning_messages.append( + # "Approximation | Aproximation for wrapper {0} failed.".format(package)) + else: + # Check if it is 1 to 1 + if len(jobs_in_package) > 0 and len(wrapper_jobs) > 0 and len(jobs_in_package) == len(wrapper_jobs) and no_process == False: + # It is 1 to 1 + for i in range(0, len(jobs_in_package)): + name_to_current_job[jobs_in_package[i] + .job_name].energy = wrapper_jobs[i].energy + name_to_current_job[jobs_in_package[i] + .job_name].submit = wrapper_jobs[i].submit + name_to_current_job[jobs_in_package[i] + .job_name].start = wrapper_jobs[i].start + name_to_current_job[jobs_in_package[i] + .job_name].finish = wrapper_jobs[i].finish + # else: + # warning_messages.append( + # "Approximation | Wrapper {0} did not have enough or precise information to calculate an exact mapping.".format(package)) + # else: + # warning_messages.append( + # "Approximation | Wrapper {0} does not have energy information, it will be ignored.".format(package)) + + for job_data in current_job_data: + if job_data.rowtype == 2 and len(job_data.extra_data.keys()) > 0: + keys = [x for x in job_data.extra_data.keys() + if x != "parents" and '.' 
not in x] + if len(keys) > 0: + found_energy = job_data.extra_data[keys[0]]["energy"] + # Resort to batch if main is NA + found_energy = found_energy if found_energy != "NA" else ( + job_data.extra_data[keys[0] + ".batch"]["energy"] if keys[0] + ".batch" in job_data.extra_data.keys() else found_energy) + job_data.energy = parse_output_number(found_energy) + else: + continue + # warning_messages.append( + # "Single Job | Job {0} has no energy information available. {1} ".format(job_data.job_name, keys)) + self.update_energy_values( + [job for job in current_job_data if job.require_update == True]) + # for job in current_job_data: + # if job.energy == 0: + # print("Job {:30} | energy {:15} | package {:5} | status {:15}".format( + # job.job_name, job.energy, job.rowtype, job.status)) + + # for message in warning_messages: + # print(message) + + # print("Extra data query finished in {0} seconds.".format( + # time.time() - start_time)) + + # if not current_job_data: + # warning_messages.append( + # "Energy | There is not enough information to compute a reliable result.") + + # return current_job_data, warning_messages + + def update_energy_values(self, update_job_data): + """Updating energy values + + :param update_job_data: list JobData object + :type update_job_data: List of JobData + """ + try: + #print("Updating {0}".format(len(update_job_data))) + for jobdata in update_job_data: + # print("Job {0} requires update. Energy {1}.".format( + # jobdata.job_name, jobdata.energy)) + self._update_job_data(jobdata) + self.conn.commit() + except Exception as exp: + Log.info(traceback.format_exc()) + Log.warning( + "Autosubmit couldn't retrieve experiment run header. update_energy_values. Exception {0}".format(str(exp))) + pass + def get_all_job_data(self): """[summary] @@ -1063,6 +1373,31 @@ class JobDataStructure(MainDataBase): Log.warning("Autosubmit couldn't retrieve job data. get_job_data") return None + def get_current_job_data(self, run_id): + """[summary] + + Args: + run_id ([type]): [description] + """ + try: + current_collection = [] + # if self.db_version < DB_VERSION_SCHEMA_CHANGES: + # raise Exception("This function requieres a newer DB version.") + if os.path.exists(self.folder_path): + current_job_data = self._get_current_job_data(run_id) + if current_job_data: + for job_data in current_job_data: + jobitem = JobItem(*job_data) + current_collection.append(JobData(jobitem.id, jobitem.counter, jobitem.job_name, jobitem.created, jobitem.modified, jobitem.submit, jobitem.start, jobitem.finish, jobitem.status, jobitem.rowtype, jobitem.ncpus, + jobitem.wallclock, jobitem.qos, jobitem.energy, jobitem.date, jobitem.section, jobitem.member, jobitem.chunk, jobitem.last, jobitem.platform, jobitem.job_id, jobitem.extra_data, jobitem.nnodes, jobitem.run_id)) + return current_collection + return None + except Exception as exp: + print(traceback.format_exc()) + print( + "Error on returning current job data. 
run_id {0}".format(run_id)) + return None + def get_pending_data(self): """[summary] """ @@ -1088,13 +1423,21 @@ class JobDataStructure(MainDataBase): return None def get_max_id_experiment_run(self): + """Get Max experiment run object (last experiment run) + + Raises: + Exception: [description] + + Returns: + [type]: [description] + """ try: #expe = list() if os.path.exists(self.folder_path): current_experiment_run = self._get_max_id_experiment_run() if current_experiment_run: exprun_item = ExperimentRunItem(*current_experiment_run) - return ExperimentRun(exprun_item.run_id, exprun_item.created, exprun_item.start, exprun_item.finish, exprun_item.chunk_unit, exprun_item.chunk_size, exprun_item.completed, exprun_item.total, exprun_item.failed, exprun_item.queuing, exprun_item.running, exprun_item.submitted) + return ExperimentRun(exprun_item.run_id, exprun_item.created, exprun_item.start, exprun_item.finish, exprun_item.chunk_unit, exprun_item.chunk_size, exprun_item.completed, exprun_item.total, exprun_item.failed, exprun_item.queuing, exprun_item.running, exprun_item.submitted, exprun_item.suspended, exprun_item.metadata) else: return None else: @@ -1245,6 +1588,30 @@ class JobDataStructure(MainDataBase): Log.warning("Error on Update : " + str(type(e).__name__)) return None + def _update_job_data(self, job_data): + """Updating processed job_data + + :param job_data: JobData object with changes + :type job_data: JobData object + :return: True if succesful, None otherwise + :rtype: Boolean - None + """ + try: + if self.conn: + sql = ''' UPDATE job_data SET energy=?, modified=? WHERE id=? ''' + cur = self.conn.cursor() + cur.execute(sql, (job_data.energy, datetime.today().strftime( + '%Y-%m-%d-%H:%M:%S'), job_data._id)) + # self.conn.commit() + return True + return None + except sqlite3.Error as e: + if _debug == True: + print(traceback.format_exc()) + Log.info(traceback.format_exc()) + Log.warning("Error on Insert : {}".format(str(type(e).__name__))) + return None + def _update_experiment_run(self, experiment_run): """Updates experiment run row by run_id (finish, chunk_unit, chunk_size, completed, total, failed, queuing, running, submitted) @@ -1255,10 +1622,10 @@ class JobDataStructure(MainDataBase): """ try: if self.conn: - sql = ''' UPDATE experiment_run SET finish=?, chunk_unit=?, chunk_size=?, completed=?, total=?, failed=?, queuing=?, running=?, submitted=? WHERE run_id=? ''' + sql = ''' UPDATE experiment_run SET finish=?, chunk_unit=?, chunk_size=?, completed=?, total=?, failed=?, queuing=?, running=?, submitted=?, suspended=? WHERE run_id=? 
''' cur = self.conn.cursor() cur.execute(sql, (experiment_run.finish, experiment_run.chunk_unit, experiment_run.chunk_size, - experiment_run.completed, experiment_run.total, experiment_run.failed, experiment_run.queuing, experiment_run.running, experiment_run.submitted, experiment_run.run_id)) + experiment_run.completed, experiment_run.total, experiment_run.failed, experiment_run.queuing, experiment_run.running, experiment_run.submitted, experiment_run.suspended, experiment_run.run_id)) self.conn.commit() return cur.lastrowid return None @@ -1285,7 +1652,7 @@ class JobDataStructure(MainDataBase): #print("pre insert") cur.execute(sql, tuplerow) self.conn.commit() - #print("Inserted " + str(jobdata.job_name)) + # print("Inserted " + str(jobdata.job_name)) return cur.lastrowid else: #print("Not a valid connection.") @@ -1306,9 +1673,9 @@ class JobDataStructure(MainDataBase): try: if self.conn: #print("preparing to insert") - sql = ''' INSERT INTO experiment_run(created,start,finish,chunk_unit,chunk_size,completed,total,failed,queuing,running,submitted) VALUES(?,?,?,?,?,?,?,?,?,?,?) ''' + sql = ''' INSERT INTO experiment_run(created,start,finish,chunk_unit,chunk_size,completed,total,failed,queuing,running,submitted,suspended,metadata) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?) ''' tuplerow = (experiment_run.created, experiment_run.start, experiment_run.finish, experiment_run.chunk_unit, experiment_run.chunk_size, experiment_run.completed, - experiment_run.total, experiment_run.failed, experiment_run.queuing, experiment_run.running, experiment_run.submitted) + experiment_run.total, experiment_run.failed, experiment_run.queuing, experiment_run.running, experiment_run.submitted, experiment_run.suspended, experiment_run.metadata) cur = self.conn.cursor() cur.execute(sql, tuplerow) self.conn.commit() @@ -1318,7 +1685,7 @@ class JobDataStructure(MainDataBase): except sqlite3.Error as e: if _debug == True: Log.info(traceback.format_exc()) - Log.debug(traceback.format_exc()) + print(traceback.format_exc()) Log.warning("Error on insert on experiment_run: {0}".format( str(type(e).__name__))) return None @@ -1347,6 +1714,29 @@ class JobDataStructure(MainDataBase): Log.warning("Error on Select : " + str(type(e).__name__)) return list() + def _get_current_job_data(self, run_id): + """[summary] + + Args: + run_id ([type]): [description] + """ + try: + if self.conn: + self.conn.text_factory = str + cur = self.conn.cursor() + cur.execute("SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id from job_data WHERE run_id=? 
and last=1 and finish > 0 and rowtype >= 2 ORDER BY id", (run_id,))
+            rows = cur.fetchall()
+            if len(rows) > 0:
+                return rows
+            else:
+                return None
+        except sqlite3.Error as e:
+            if _debug == True:
+                print(traceback.format_exc())
+            print("Error on select job data: {0}".format(
+                str(type(e).__name__)))
+            return None
+
     def _get_job_data(self, job_name):
         """[summary]
 
@@ -1513,7 +1903,7 @@
             self.conn.text_factory = str
             cur = self.conn.cursor()
             cur.execute(
-                "SELECT run_id,created,start,finish,chunk_unit,chunk_size,completed,total,failed,queuing,running,submitted from experiment_run ORDER BY run_id DESC LIMIT 0, 1")
+                "SELECT run_id,created,start,finish,chunk_unit,chunk_size,completed,total,failed,queuing,running,submitted, suspended, metadata from experiment_run ORDER BY run_id DESC LIMIT 0, 1")
             rows = cur.fetchall()
             if len(rows) > 0:
                 return rows[0]
@@ -1527,3 +1917,12 @@
             Log.warning("Error on select max run_id : "
                         + str(type(e).__name__))
         return None
+
+    def _retry_database_operation(self):
+        completed = False
+        tries = 0
+        while completed == False and tries <= 3:
+            try:
+                completed = True  # placeholder for the actual retried database operation
+            except sqlite3.Error as e:
+                tries += 1
diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py
index b768766a4..ea8fb1721 100644
--- a/autosubmit/job/job.py
+++ b/autosubmit/job/job.py
@@ -42,7 +42,7 @@ from bscearth.utils.date import date2str, parse_date, previous_day, chunk_end_da
 from time import sleep
 from threading import Thread
 from autosubmit.platforms.paramiko_submitter import ParamikoSubmitter
-from log.log import Log,AutosubmitCritical,AutosubmitError
+from log.log import Log, AutosubmitCritical, AutosubmitError
 
 Log.get_logger("Autosubmit")
@@ -53,6 +53,7 @@ def threaded(fn):
         return thread
     return wrapper
 
+
 class Job(object):
     """
     Class to handle all the tasks with Jobs at HPC.
@@ -127,6 +128,7 @@ class Job(object):
         self.packed = False
         self.hold = False
         self.distance_weight = 0
+
     def __getstate__(self):
         odict = self.__dict__
         if '_platform' in odict:
@@ -522,7 +524,8 @@ class Job(object):
             try:
                 self.platform.restore_connection()
             except Exception as e:
-                Log.printlog("{0} \n Couldn't connect to the remote platform for this {1} job err/out files. ".format(e.message,self.name), 6001)
+                Log.printlog(
+                    "{0} \n Couldn't connect to the remote platform for this {1} job err/out files. 
".format(e.message, self.name), 6001) out_exist = False err_exist = False retries = 3 @@ -533,11 +536,13 @@ class Job(object): while (not out_exist and not err_exist) and i < retries: try: try: - out_exist = self.platform.check_file_exists(remote_logs[0]) # will do 5 retries + out_exist = self.platform.check_file_exists( + remote_logs[0]) # will do 5 retries except IOError as e: out_exist = False try: - err_exist = self.platform.check_file_exists(remote_logs[1]) # will do 5 retries + err_exist = self.platform.check_file_exists( + remote_logs[1]) # will do 5 retries except IOError as e: err_exists = False except Exception as e: @@ -550,31 +555,36 @@ class Job(object): sleep(sleeptime) if i >= retries: if not out_exist or not err_exist: - Log.printlog("Retries = {0}, Failed to retrieve log files {1} and {2}".format(retries,remote_logs[0],remote_logs[1]), 6001) - + Log.printlog("Retries = {0}, Failed to retrieve log files {1} and {2}".format( + retries, remote_logs[0], remote_logs[1]), 6001) if copy_remote_logs: if local_logs != remote_logs: # unifying names for log files - self.synchronize_logs(self.platform, remote_logs, local_logs) + self.synchronize_logs( + self.platform, remote_logs, local_logs) remote_logs = local_logs self.platform.get_logs_files(self.expid, remote_logs) # Update the logs with Autosubmit Job Id Brand try: for local_log in local_logs: - self.platform.write_jobid(self.id, os.path.join(self._tmp_path, 'LOG_' + str(self.expid), local_log)) + self.platform.write_jobid(self.id, os.path.join( + self._tmp_path, 'LOG_' + str(self.expid), local_log)) except BaseException as e: - Log.printlog("Trace {0} \n Failed to write the {1}".format(e.message,self.name), 6001) + Log.printlog("Trace {0} \n Failed to write the {1}".format( + e.message, self.name), 6001) except AutosubmitError as e: - Log.printlog("Trace {0} \nFailed to retrieve log file for job {0}".format(e.message,self.name), 6001) + Log.printlog("Trace {0} \nFailed to retrieve log file for job {0}".format( + e.message, self.name), 6001) except AutosubmitCritical as e: # Critical errors can't be recovered. Failed configuration or autosubmit error - Log.printlog("Trace {0} \nFailed to retrieve log file for job {0}".format(e.message,self.name), 6001) + Log.printlog("Trace {0} \nFailed to retrieve log file for job {0}".format( + e.message, self.name), 6001) try: self.platform.closeConnection() except: pass - sleep(5) # safe wait before end a thread + sleep(5) # safe wait before end a thread return def update_status(self, copy_remote_logs=False): @@ -589,10 +599,12 @@ class Job(object): self.prev_status = previous_status new_status = self.new_status if new_status == Status.COMPLETED: - Log.debug("{0} job seems to have completed: checking...".format(self.name)) + Log.debug( + "{0} job seems to have completed: checking...".format(self.name)) if not self.platform.get_completed_files(self.name): - log_name = os.path.join(self._tmp_path, self.name + '_COMPLETED') + log_name = os.path.join( + self._tmp_path, self.name + '_COMPLETED') self.check_completion() else: @@ -607,25 +619,29 @@ class Job(object): elif self.status == Status.COMPLETED: Log.result("Job {0} is COMPLETED", self.name) elif self.status == Status.FAILED: - Log.printlog("Job {0} is FAILED. Checking completed files to confirm the failure...".format(self.name),3000) + Log.printlog("Job {0} is FAILED. 
Checking completed files to confirm the failure...".format( + self.name), 3000) self.platform.get_completed_files(self.name) self.check_completion() if self.status == Status.COMPLETED: - Log.printlog(" there is a COMPLETED file.",3000) + Log.printlog(" there is a COMPLETED file.", 3000) Log.result("Job {0} is COMPLETED", self.name) else: self.update_children_status() elif self.status == Status.UNKNOWN: - Log.printlog("Job {0} is UNKNOWN. Checking completed files to confirm the failure...".format(self.name),3000) + Log.printlog("Job {0} is UNKNOWN. Checking completed files to confirm the failure...".format( + self.name), 3000) self.platform.get_completed_files(self.name) self.check_completion(Status.UNKNOWN) if self.status == Status.UNKNOWN: - Log.printlog("Job {0} is UNKNOWN. Checking completed files to confirm the failure...".format(self.name),6009) + Log.printlog("Job {0} is UNKNOWN. Checking completed files to confirm the failure...".format( + self.name), 6009) elif self.status == Status.COMPLETED: Log.result("Job {0} is COMPLETED", self.name) elif self.status == Status.SUBMITTED: # after checking the jobs , no job should have the status "submitted" - Log.printlog("Job {0} in SUBMITTED status. This should never happen on this step..".format(self.name),6008) + Log.printlog("Job {0} in SUBMITTED status. This should never happen on this step..".format( + self.name), 6008) if previous_status != Status.RUNNING and self.status in [Status.COMPLETED, Status.FAILED, Status.UNKNOWN, Status.RUNNING]: @@ -655,7 +671,8 @@ class Job(object): if communications_library == 'paramiko': return ParamikoSubmitter() # communications library not known - raise AutosubmitCritical( 'You have defined a not valid communications library on the configuration file', 7014) + raise AutosubmitCritical( + 'You have defined a not valid communications library on the configuration file', 7014) def update_children_status(self): children = list(self.children) @@ -671,12 +688,13 @@ class Job(object): :param default_status: status to set if job is not completed. By default is FAILED :type default_status: Status """ - log_name = os.path.join(self._tmp_path,self.name + '_COMPLETED') + log_name = os.path.join(self._tmp_path, self.name + '_COMPLETED') if os.path.exists(log_name): self.status = Status.COMPLETED else: - Log.printlog("Job {0} completion check failed. There is no COMPLETED file".format(self.name),6009) + Log.printlog("Job {0} completion check failed. 
There is no COMPLETED file".format( + self.name), 6009) self.status = default_status def update_parameters(self, as_conf, parameters, @@ -871,7 +889,8 @@ class Job(object): if communications_library == 'paramiko': return self._get_paramiko_template(snippet, template) else: - raise AutosubmitCritical("Job {0} does not have an correct template// template not found".format(self.name),7014) + raise AutosubmitCritical( + "Job {0} does not have an correct template// template not found".format(self.name), 7014) def _get_paramiko_template(self, snippet, template): current_platform = self.platform @@ -969,13 +988,14 @@ class Job(object): if not out: self.undefined_variables = set(variables) - set(parameters) if show_logs: - Log.printlog("The following set of variables to be substituted in template script is not part of parameters set, and will be replaced by a blank value: {0}".format(self.undefined_variables),3000) - + Log.printlog("The following set of variables to be substituted in template script is not part of parameters set, and will be replaced by a blank value: {0}".format( + self.undefined_variables), 3000) # Check which variables in the proj.conf are not being used in the templates if show_logs: if not set(variables).issuperset(set(parameters)): - Log.printlog("The following set of variables are not being used in the templates: {0}".format(str(set(parameters)-set(variables))),3000) + Log.printlog("The following set of variables are not being used in the templates: {0}".format( + str(set(parameters) - set(variables))), 3000) return out def write_submit_time(self): @@ -1002,7 +1022,8 @@ class Job(object): if self.platform.get_stat_file(self.name, retries=5): start_time = self.check_start_time() else: - Log.printlog('Could not get start time for {0}. Using current time as an approximation'.format(self.name),3000) + Log.printlog('Could not get start time for {0}. 
Using current time as an approximation'.format( + self.name), 3000) start_time = time.time() path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') @@ -1043,8 +1064,16 @@ class Job(object): else: final_status = "FAILED" f.write('FAILED') - JobDataStructure(self.expid).write_finish_time(self.name, finish_time, final_status, self.processors, - self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents]) + + # Launch first as simple + JobDataStructure(self.expid).write_finish_time(self.name, finish_time, final_status, self.processors, self.wallclock, self._queue, self.date, + self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents]) + # Launch second as thread + thread_write_finish = Thread(target=JobDataStructure(self.expid).write_finish_time, args=(self.name, finish_time, final_status, self.processors, + self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents], False)) + thread_write_finish.start() + # JobDataStructure(self.expid).write_finish_time(self.name, finish_time, final_status, self.processors, + # self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents]) def check_started_after(self, date_limit): """ @@ -1223,7 +1252,8 @@ class WrapperJob(Job): reason = self.platform.parse_queue_reason( self.platform._ssh_output, self.id) if self._queuing_reason_cancel(reason): - Log.printlog("Job {0} will be cancelled and set to FAILED as it was queuing due to {1}".format(self.name,reason),6009) + Log.printlog("Job {0} will be cancelled and set to FAILED as it was queuing due to {1}".format( + self.name, reason), 6009) self.cancel_failed_wrapper_job() self.update_failed_jobs() return @@ -1260,7 +1290,8 @@ class WrapperJob(Job): start_time = self.running_jobs_start[job] if self._is_over_wallclock(start_time, job.wallclock): # if self.as_config.get_wrapper_type() in ['vertical', 'horizontal']: - Log.printlog("Job {0} inside wrapper {1} is running for longer than it's wallclock! Cancelling...".format(job.name,self.name),6009) + Log.printlog("Job {0} inside wrapper {1} is running for longer than it's wallclock! 
Cancelling...".format( + job.name, self.name), 6009) job.new_status = Status.FAILED job.update_status(self.as_config.get_copy_remote_logs() == 'true') return True @@ -1318,7 +1349,8 @@ done if len(out) > 1: if job not in self.running_jobs_start: start_time = self._check_time(out, 1) - Log.debug("Job {0} started at {1}".format(jobname, str(parse_date(start_time)))) + Log.debug("Job {0} started at {1}".format( + jobname, str(parse_date(start_time)))) self.running_jobs_start[job] = start_time job.new_status = Status.RUNNING @@ -1331,7 +1363,7 @@ done job) if over_wallclock: Log.printlog( - "Job {0} is FAILED".format(jobname),6009) + "Job {0} is FAILED".format(jobname), 6009) elif len(out) == 3: end_time = self._check_time(out, 2) @@ -1371,7 +1403,7 @@ done self._check_finished_job(job) def cancel_failed_wrapper_job(self): - Log.printlog("Cancelling job with id {0}".format(self.id),6009) + Log.printlog("Cancelling job with id {0}".format(self.id), 6009) self.platform.send_command( self.platform.cancel_cmd + " " + str(self.id)) @@ -1391,13 +1423,14 @@ done if wallclock.hour > 0: total = wallclock.hour if wallclock.minute > 0: - total += wallclock.minute/60.0 + total += wallclock.minute / 60.0 if wallclock.second > 0: - total += wallclock.second/60.0/60.0 + total += wallclock.second / 60.0 / 60.0 total = total * 1.15 hour = int(total) minute = int((total - int(total)) * 60.0) - second = int(((total - int(total)) * 60 - int((total - int(total)) * 60.0)) * 60.0) + second = int(((total - int(total)) * 60 - + int((total - int(total)) * 60.0)) * 60.0) wallclock_delta = datetime.timedelta(hours=hour, minutes=minute, seconds=second) if elapsed > wallclock_delta: @@ -1413,6 +1446,3 @@ done time = int(output[index]) time = self._parse_timestamp(time) return time - - - diff --git a/autosubmit/job/job_common.py b/autosubmit/job/job_common.py index 2cd92e05d..1455f37b5 100644 --- a/autosubmit/job/job_common.py +++ b/autosubmit/job/job_common.py @@ -42,6 +42,7 @@ class Status: def retval(self, value): return getattr(self, value) + class bcolors: HEADER = '\033[95m' OKBLUE = '\033[94m' @@ -64,8 +65,8 @@ class bcolors: HELD = '\033[34;1m' FAILED = '\033[31m' SUSPENDED = '\033[31;1m' - CODE_TO_COLOR = {-3: SUSPENDED, -2: UNKNOWN, -1: FAILED, 0: WAITING, 1: READY, 2: SUBMITTED, 3: QUEUING, 4: RUNNING, 5: COMPLETED, 6: HELD, 7: PREPARED} - + CODE_TO_COLOR = {-3: SUSPENDED, -2: UNKNOWN, -1: FAILED, 0: WAITING, 1: READY, + 2: SUBMITTED, 3: QUEUING, 4: RUNNING, 5: COMPLETED, 6: HELD, 7: PREPARED} class Type: @@ -238,3 +239,35 @@ class StatisticsSnippetEmpty: @staticmethod def as_tailer(): return '' + + +def parse_output_number(string_number): + """ + Parses number in format 1.0K 1.0M 1.0G + + :param string_number: String representation of number + :type string_number: str + :return: number in float format + :rtype: float + """ + number = 0.0 + if (string_number): + last_letter = string_number.strip()[-1] + multiplier = 1 + if last_letter == "G": + multiplier = 1000000000 + number = string_number[:-1] + elif last_letter == "M": + multiplier = 1000000 + number = string_number[:-1] + elif last_letter == "K": + multiplier = 1000 + number = string_number[:-1] + else: + number = string_number + try: + number = float(number) * multiplier + except Exception as exp: + number = 0.0 + pass + return number diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index 051820af1..bcd6fc204 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -20,12 
+20,13 @@ import os from time import sleep from time import mktime +from time import time from datetime import datetime import traceback from xml.dom.minidom import parseString -from autosubmit.job.job_common import Status +from autosubmit.job.job_common import Status, parse_output_number from autosubmit.platforms.paramiko_platform import ParamikoPlatform from autosubmit.platforms.headers.slurm_header import SlurmHeader from autosubmit.platforms.wrappers.wrapper_factory import SlurmWrapperFactory @@ -118,12 +119,12 @@ class SlurmPlatform(ParamikoPlatform): self.host, self.remote_log_dir) self._submit_hold_cmd = 'sbatch -H -D {1} {1}/'.format( self.host, self.remote_log_dir) - #jobid =$(sbatch WOA_run_mn4.sh 2 > & 1 | grep -o "[0-9]*"); scontrol hold $jobid; + # jobid =$(sbatch WOA_run_mn4.sh 2 > & 1 | grep -o "[0-9]*"); scontrol hold $jobid; self.put_cmd = "scp" self.get_cmd = "scp" self.mkdir_cmd = "mkdir -p " + self.remote_log_dir - def hold_job(self,job): + def hold_job(self, job): try: cmd = "scontrol release {0} ; scontrol hold {0} ".format(job.id) self.send_command(cmd) @@ -131,7 +132,7 @@ class SlurmPlatform(ParamikoPlatform): if job_status == Status.RUNNING: self.send_command("scancel {0}".format(job.id)) return False - cmd=self.get_queue_status_cmd(job.id) + cmd = self.get_queue_status_cmd(job.id) self.send_command(cmd) queue_status = self._ssh_output @@ -145,9 +146,11 @@ class SlurmPlatform(ParamikoPlatform): except BaseException as e: try: self.send_command("scancel {0}".format(job.id)) - raise AutosubmitError("Can't hold jobid:{0}, canceling job".format(job.id), 6000, e.message) + raise AutosubmitError( + "Can't hold jobid:{0}, canceling job".format(job.id), 6000, e.message) except BaseException as e: - raise AutosubmitError("Can't cancel the jobid: {0}".format(job.id),6000,e.message) + raise AutosubmitError( + "Can't cancel the jobid: {0}".format(job.id), 6000, e.message) except AutosubmitError as e: raise @@ -181,6 +184,7 @@ class SlurmPlatform(ParamikoPlatform): try: # Setting up: Storing detail for posterity detailed_data = dict() + steps = [] # No blank spaces after or before output = output.strip() if output else None lines = output.split("\n") if output else [] @@ -214,7 +218,8 @@ class SlurmPlatform(ParamikoPlatform): "AveRSS": str(line[9] if len(line) > 9 else "NA")} # Detailed data will contain the important information from output detailed_data[name] = extra_data - submit = start = finish = joules = nnodes = ncpus = 0 + steps.append(name) + submit = start = finish = energy = nnodes = ncpus = 0 status = "UNKNOWN" # Take first line as source line = lines[0].strip().split() @@ -225,12 +230,14 @@ class SlurmPlatform(ParamikoPlatform): # If it is not wrapper job, take first line as source if status not in ["COMPLETED", "FAILED", "UNKNOWN"]: # It not completed, then its error and send default data plus output - return (0, 0, 0, 0, ncpus, nnodes, detailed_data) + return (0, 0, 0, 0, ncpus, nnodes, detailed_data, False) else: + # If it is a wrapped job # Check if the wrapper has finished if status in ["COMPLETED", "FAILED", "UNKNOWN"]: # Wrapper has finished is_end_of_wrapper = True + # Continue with first line as source if line: try: # Parse submit and start only for normal jobs (not packed) @@ -240,56 +247,71 @@ class SlurmPlatform(ParamikoPlatform): line[5], "%Y-%m-%dT%H:%M:%S").timetuple())) if not packed else 0 # Assuming the job has been COMPLETED # If normal job or end of wrapper => Try to get the finish time from the first line of the output, else default to now. 
- finish = (int(mktime(datetime.strptime( - line[6], "%Y-%m-%dT%H:%M:%S").timetuple())) if len(line) > 6 and line[6] != "Unknown" else datetime.now().timestamp()) if not packed or is_end_of_wrapper == True else 0 - # If normal job or end of wrapper => Try to get energy from first line - joules = (self.parse_output_number( - line[7]) if len(line) > 7 and len(line[7]) > 0 else 0) if not packed or is_end_of_wrapper == True else 0 + finish = 0 + + if not packed: + # If normal job, take finish time from first line + finish = (int(mktime(datetime.strptime(line[6], "%Y-%m-%dT%H:%M:%S").timetuple( + ))) if len(line) > 6 and line[6] != "Unknown" else int(time())) + energy = parse_output_number(line[7]) if len( + line) > 7 and len(line[7]) > 0 else 0 + else: + # If it is a wrapper job + # If end of wrapper, take data from first line + if is_end_of_wrapper == True: + finish = (int(mktime(datetime.strptime(line[6], "%Y-%m-%dT%H:%M:%S").timetuple( + ))) if len(line) > 6 and line[6] != "Unknown" else int(time())) + energy = parse_output_number(line[7]) if len( + line) > 7 and len(line[7]) > 0 else 0 + else: + # If packed but not end of wrapper, try to get info from current data. + if "finish" in extra_data.keys() and extra_data["finish"] != "Unknown": + # finish data exists + finish = int(mktime(datetime.strptime( + extra_data["finish"], "%Y-%m-%dT%H:%M:%S").timetuple())) + else: + # if finish date does not exist, query previous step. + if len(steps) >= 2 and detailed_data.__contains__(steps[-2]): + new_extra_data = detailed_data[steps[-2]] + if "finish" in new_extra_data.keys() and new_extra_data["finish"] != "Unknown": + # This might result in an job finish < start, need to handle that in the caller function + finish = int(mktime(datetime.strptime( + new_extra_data["finish"], "%Y-%m-%dT%H:%M:%S").timetuple())) + else: + finish = int(time()) + else: + finish = int(time()) + if "energy" in extra_data.keys() and extra_data["energy"] != "NA": + # energy exists + energy = parse_output_number( + extra_data["energy"]) + else: + # if energy does not exist, query previous step + if len(steps) >= 2 and detailed_data.__contains__(steps[-2]): + new_extra_data = detailed_data[steps[-2]] + if "energy" in new_extra_data.keys() and new_extra_data["energy"] != "NA": + energy = parse_output_number( + new_extra_data["energy"]) + else: + energy = 0 + else: + energy = 0 except Exception as exp: + # print(line) + # Log.info(traceback.format_exc()) Log.info( "Parsing mishandling.") # joules = -1 pass detailed_data = detailed_data if not packed or is_end_of_wrapper == True else extra_data - return (submit, start, finish, joules, ncpus, nnodes, detailed_data) + return (submit, start, finish, energy, ncpus, nnodes, detailed_data, is_end_of_wrapper) - return (0, 0, 0, 0, 0, 0, dict()) + return (0, 0, 0, 0, 0, 0, dict(), False) except Exception as exp: Log.warning( "Autosubmit couldn't parse SLURM energy output. 
From parse_job_finish_data: {0}".format(str(exp))) - return (0, 0, 0, 0, 0, 0, dict()) - - def parse_output_number(self, string_number): - """ - Parses number in format 1.0K 1.0M 1.0G - - :param string_number: String representation of number - :type string_number: str - :return: number in float format - :rtype: float - """ - number = 0.0 - if (string_number): - last_letter = string_number.strip()[-1] - multiplier = 1 - if last_letter == "G": - multiplier = 1000000000 - number = string_number[:-1] - elif last_letter == "M": - multiplier = 1000000 - number = string_number[:-1] - elif last_letter == "K": - multiplier = 1000 - number = string_number[:-1] - else: - number = string_number - try: - number = float(number) * multiplier - except Exception as exp: - number = 0.0 - pass - return number + return (0, 0, 0, 0, 0, 0, dict(), False) def parse_Alljobs_output(self, output, job_id): try: -- GitLab
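The helper moved into job_common.py by this patch converts sacct-style suffixed numbers (K/M/G) into floats, falling back to 0.0 on unparseable input. A minimal usage sketch, assuming autosubmit is installed and importable:

    from autosubmit.job.job_common import parse_output_number

    print(parse_output_number("1.5K"))  # 1500.0
    print(parse_output_number("2M"))    # 2000000.0
    print(parse_output_number("0.5G"))  # 500000000.0
    print(parse_output_number("NA"))    # 0.0, unparseable input falls back to 0.0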
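When process_current_run_collection cannot map job steps one-to-one to the jobs of a wrapper package, it splits the measured total energy across the jobs proportionally to each job's ncpus * running_time, rounded to two decimals. A standalone sketch of that approximation (the function name is illustrative, not part of the patch):

    def approximate_energy(jobs, total_energy):
        # jobs: list of (ncpus, running_time) tuples for the jobs in the package.
        resources_total = sum(ncpus * runtime for ncpus, runtime in jobs) * 1.0
        if resources_total <= 0:
            return [0.0 for _ in jobs]
        return [round(ncpus * runtime / resources_total * total_energy, 2)
                for ncpus, runtime in jobs]

    # Example: three wrapped jobs sharing 9000 J of measured wrapper energy.
    print(approximate_energy([(4, 600), (4, 300), (8, 150)], 9000.0))
    # -> [4500.0, 2250.0, 2250.0]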