diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py index aedd7a84cc68707debd79846df1186f31da4355a..ce009355991a2b45b88ad521123f1ac4216f333c 100644 --- a/autosubmit/database/db_jobdata.py +++ b/autosubmit/database/db_jobdata.py @@ -1206,196 +1206,187 @@ class JobDataStructure(MainDataBase): Returns: ([job_data], [warning_messaages]): job data processes, messages """ - # start_time = time.time() - current_job_data = None - # warning_messages = [] - experiment_run = self.get_max_id_experiment_run() - # List of jobs from pkl -> Dictionary - # allJobsDict = { - # job.name: Status.VALUE_TO_KEY[job.status] for job in allJobs} - # None if there is no experiment header - if experiment_run: - # List of last runs of jobs - current_job_data = self.get_current_job_data(experiment_run.run_id) - if not current_job_data: - Log.warning( - "Autosubmit did not find historical database information.") - return None - # warning_messages.append( - # "Critical | This version of Autosubmit does not support the database that provides the energy information.") - # Include only those that exist in the pkl and have the same status as in the pkl - # current_job_data = [job for job in current_job_data_last if job.job_name in allJobsDict.keys( - # ) and allJobsDict[job.job_name] == job.status] if current_job_data_last else None - # Start processing - if current_job_data: - # Dropping parents key - for job in current_job_data: - job.extra_data.pop('parents', None) - # Internal map from name to object - name_to_current_job = { - job.job_name: job for job in current_job_data} - # Unique packages where rowtype > 2 - packages = set( - job.rowtype for job in current_job_data if job.rowtype > 2) - # Start by processing packages - for package in packages: - # All jobs in package - jobs_in_package = [ - job for job in current_job_data if job.rowtype == package] - # Order package by submit order - jobs_in_package.sort(key=lambda x: x._id, reverse=True) - # Internal list of single-purpose objects - wrapper_jobs = [] - sum_total_energy = 0 - not_1_to_1 = True - keys_found = False - no_process = False - for job_data in jobs_in_package: - # If it is a wrapper job step - if "energy" in job_data.extra_data.keys() and job_data.extra_data["energy"] != "NA": - name_to_current_job[job_data.job_name].energy = parse_output_number( - job_data.extra_data["energy"]) - sum_total_energy += name_to_current_job[job_data.job_name].energy - else: - # Identify best source - description_job = max( - jobs_in_package, key=lambda x: len(str(x.extra_data))) - # Identify job steps - keys_step = [ - y for y in description_job.extra_data.keys() if '.' in y and y[y.index('.') + 1:] not in ["batch", "extern"] and y != "parents"] - if len(keys_step) > 0: - # Steps found - keys_step.sort( - key=lambda x: int(x[x.index('.') + 1:])) - keys_found = True - # Find all job steps - for key in keys_step: - if "submit" not in description_job.extra_data[key].keys(): - keys_found = False - break - - for key in keys_step: - wrapper_jobs.append(JobStepExtraData( - key, description_job.extra_data[key])) - - sum_total_energy = sum( - jobp.energy for jobp in wrapper_jobs) * 1.0 - - if len(jobs_in_package) == len(wrapper_jobs) and len(wrapper_jobs) > 0: - # Approximation - not_1_to_1 = False + + try: + # start_time = time.time() + current_job_data = None + # warning_messages = [] + experiment_run = self.get_max_id_experiment_run() + # List of jobs from pkl -> Dictionary + if experiment_run: + # List of last runs of jobs + current_job_data = self.get_current_job_data( + experiment_run.run_id) + if not current_job_data: + Log.warning( + "Autosubmit did not find historical database information.") + return None + # warning_messages.append( + # "Critical | This version of Autosubmit does not support the database that provides the energy information.") + # Include only those that exist in the pkl and have the same status as in the pkl + # current_job_data = [job for job in current_job_data_last if job.job_name in allJobsDict.keys( + # ) and allJobsDict[job.job_name] == job.status] if current_job_data_last else None + # Start processing + if current_job_data: + # Dropping parents key + for job in current_job_data: + job.extra_data.pop('parents', None) + # Internal map from name to object + name_to_current_job = { + job.job_name: job for job in current_job_data} + # Unique packages where rowtype > 2 + packages = set( + job.rowtype for job in current_job_data if job.rowtype > 2) + # Start by processing packages + for package in packages: + # All jobs in package + jobs_in_package = [ + job for job in current_job_data if job.rowtype == package] + # Order package by submit order + jobs_in_package.sort(key=lambda x: x._id, reverse=True) + # Internal list of single-purpose objects + wrapper_jobs = [] + sum_total_energy = 0 + not_1_to_1 = True + keys_found = False + no_process = False + for job_data in jobs_in_package: + # If it is a wrapper job step + if "energy" in job_data.extra_data.keys() and job_data.extra_data["energy"] != "NA": + name_to_current_job[job_data.job_name].energy = parse_output_number( + job_data.extra_data["energy"]) + sum_total_energy += name_to_current_job[job_data.job_name].energy else: - # Identify main step - main_step = [ - y for y in description_job.extra_data.keys() if '.' not in y and y != "parents"] - if len(main_step) > 0 and main_step[0] not in ['AveRSS', 'finish', 'ncpus', 'submit', 'MaxRSS', 'start', 'nnodes', 'energy']: - # Check only first one - main_step = [main_step[0]] - # If main step contains submit, its valid. Else, break, not valid, - for key in main_step: - if key not in description_job.extra_data.keys() or "submit" not in description_job.extra_data[key].keys(): + # Identify best source + description_job = max( + jobs_in_package, key=lambda x: len(str(x.extra_data))) + # Identify job steps + keys_step = [ + y for y in description_job.extra_data.keys() if '.' in y and y[y.index('.') + 1:] not in ["batch", "extern"] and y != "parents"] + if len(keys_step) > 0: + # Steps found + keys_step.sort( + key=lambda x: int(x[x.index('.') + 1:])) + keys_found = True + # Find all job steps + for key in keys_step: + if "submit" not in description_job.extra_data[key].keys(): keys_found = False break - # Build wrapper jobs - for key in main_step: + + for key in keys_step: wrapper_jobs.append(JobStepExtraData( key, description_job.extra_data[key])) - # Total energy for main job + sum_total_energy = sum( jobp.energy for jobp in wrapper_jobs) * 1.0 + if len(jobs_in_package) == len(wrapper_jobs) and len(wrapper_jobs) > 0: + # Approximation + not_1_to_1 = False else: - no_process = True - # warning_messages.append( - # "Wrapper | Wrapper {0} does not have information to perform any energy approximation.".format(package)) - break - # Keys do not have enough information - # if keys_found == False: - # warning_messages.append( - # "Wrapper | Wrapper {0} does not have complete sacct data available.".format(package)) - # If it is not a 1 to 1 relationship between jobs in package and job steps - if sum_total_energy > 0: - if not_1_to_1 == True and no_process == False: - # It is not 1 to 1, so we perform approximation - # warning_messages.append( - # "Approximation | The energy results in wrapper {0} are an approximation. Total energy detected: {1}.".format(package, sum_total_energy)) - # Completing job information if necessary - dropped_jobs = [] - for i in range(0, len(jobs_in_package)): - if jobs_in_package[i].running_time() <= 0: - # Needs to be completed - # Dropping job from package list - dropped_jobs.append(i) - #dropped_job = jobs_in_package.pop(i) - for j in dropped_jobs: - jobs_in_package.pop(j) - # After completion is finished, calculate total resources to be approximated - resources_total = sum( - z.ncpus * z.running_time() for z in jobs_in_package) * 1.0 - if resources_total > 0: - for job_data in jobs_in_package: - job_data_factor = ( - job_data.ncpus * job_data.running_time()) - # if job_data_factor <= 0: - # warning_messages.append("Approximation | Job {0} requires {1} ncpus and has {2} running time, resulting in a 0 energy approximation. This job will be ignored.".format( - # job_data.job_name, job_data.ncpus, job_data.running_time())) - name_to_current_job[job_data.job_name].energy = round(job_data_factor / - resources_total * sum_total_energy, 2) - # else: - # warning_messages.append( - # "Approximation | Aproximation for wrapper {0} failed.".format(package)) - else: - # Check if it is 1 to 1 - if len(jobs_in_package) > 0 and len(wrapper_jobs) > 0 and len(jobs_in_package) == len(wrapper_jobs) and no_process == False: - # It is 1 to 1 + # Identify main step + main_step = [ + y for y in description_job.extra_data.keys() if '.' not in y and y != "parents"] + if len(main_step) > 0 and main_step[0] not in ['AveRSS', 'finish', 'ncpus', 'submit', 'MaxRSS', 'start', 'nnodes', 'energy']: + # Check only first one + main_step = [main_step[0]] + # If main step contains submit, its valid. Else, break, not valid, + for key in main_step: + if key not in description_job.extra_data.keys() or "submit" not in description_job.extra_data[key].keys(): + keys_found = False + break + # Build wrapper jobs + for key in main_step: + wrapper_jobs.append(JobStepExtraData( + key, description_job.extra_data[key])) + # Total energy for main job + sum_total_energy = sum( + jobp.energy for jobp in wrapper_jobs) * 1.0 + + else: + no_process = True + # warning_messages.append( + # "Wrapper | Wrapper {0} does not have information to perform any energy approximation.".format(package)) + break + # Keys do not have enough information + # if keys_found == False: + # warning_messages.append( + # "Wrapper | Wrapper {0} does not have complete sacct data available.".format(package)) + # If it is not a 1 to 1 relationship between jobs in package and job steps + if sum_total_energy > 0: + if not_1_to_1 == True and no_process == False: + # It is not 1 to 1, so we perform approximation + # warning_messages.append( + # "Approximation | The energy results in wrapper {0} are an approximation. Total energy detected: {1}.".format(package, sum_total_energy)) + # Completing job information if necessary + dropped_jobs = [] for i in range(0, len(jobs_in_package)): - name_to_current_job[jobs_in_package[i] - .job_name].energy = wrapper_jobs[i].energy - name_to_current_job[jobs_in_package[i] - .job_name].submit = wrapper_jobs[i].submit - name_to_current_job[jobs_in_package[i] - .job_name].start = wrapper_jobs[i].start - name_to_current_job[jobs_in_package[i] - .job_name].finish = wrapper_jobs[i].finish - # else: - # warning_messages.append( - # "Approximation | Wrapper {0} did not have enough or precise information to calculate an exact mapping.".format(package)) - # else: - # warning_messages.append( - # "Approximation | Wrapper {0} does not have energy information, it will be ignored.".format(package)) - - for job_data in current_job_data: - if job_data.rowtype == 2 and len(job_data.extra_data.keys()) > 0: - keys = [x for x in job_data.extra_data.keys() - if x != "parents" and '.' not in x] - if len(keys) > 0: - found_energy = job_data.extra_data[keys[0]]["energy"] - # Resort to batch if main is NA - found_energy = found_energy if found_energy != "NA" else ( - job_data.extra_data[keys[0] + ".batch"]["energy"] if keys[0] + ".batch" in job_data.extra_data.keys() else found_energy) - job_data.energy = parse_output_number(found_energy) - else: - continue - # warning_messages.append( - # "Single Job | Job {0} has no energy information available. {1} ".format(job_data.job_name, keys)) - self.update_energy_values( - [job for job in current_job_data if job.require_update == True]) - # for job in current_job_data: - # if job.energy == 0: - # print("Job {:30} | energy {:15} | package {:5} | status {:15}".format( - # job.job_name, job.energy, job.rowtype, job.status)) - - # for message in warning_messages: - # print(message) - - # print("Extra data query finished in {0} seconds.".format( - # time.time() - start_time)) - - # if not current_job_data: - # warning_messages.append( - # "Energy | There is not enough information to compute a reliable result.") - - # return current_job_data, warning_messages + if jobs_in_package[i].running_time() <= 0: + # Needs to be completed + # Dropping job from package list + dropped_jobs.append(i) + #dropped_job = jobs_in_package.pop(i) + for j in dropped_jobs: + jobs_in_package.pop(j) + # After completion is finished, calculate total resources to be approximated + resources_total = sum( + z.ncpus * z.running_time() for z in jobs_in_package) * 1.0 + if resources_total > 0: + for job_data in jobs_in_package: + job_data_factor = ( + job_data.ncpus * job_data.running_time()) + # if job_data_factor <= 0: + # warning_messages.append("Approximation | Job {0} requires {1} ncpus and has {2} running time, resulting in a 0 energy approximation. This job will be ignored.".format( + # job_data.job_name, job_data.ncpus, job_data.running_time())) + name_to_current_job[job_data.job_name].energy = round(job_data_factor / + resources_total * sum_total_energy, 2) + # else: + # warning_messages.append( + # "Approximation | Aproximation for wrapper {0} failed.".format(package)) + else: + # Check if it is 1 to 1 + if len(jobs_in_package) > 0 and len(wrapper_jobs) > 0 and len(jobs_in_package) == len(wrapper_jobs) and no_process == False: + # It is 1 to 1 + for i in range(0, len(jobs_in_package)): + name_to_current_job[jobs_in_package[i] + .job_name].energy = wrapper_jobs[i].energy + name_to_current_job[jobs_in_package[i] + .job_name].submit = wrapper_jobs[i].submit + name_to_current_job[jobs_in_package[i] + .job_name].start = wrapper_jobs[i].start + name_to_current_job[jobs_in_package[i] + .job_name].finish = wrapper_jobs[i].finish + # else: + # warning_messages.append( + # "Approximation | Wrapper {0} did not have enough or precise information to calculate an exact mapping.".format(package)) + # else: + # warning_messages.append( + # "Approximation | Wrapper {0} does not have energy information, it will be ignored.".format(package)) + + for job_data in current_job_data: + if job_data.rowtype == 2 and len(job_data.extra_data.keys()) > 0: + keys = [x for x in job_data.extra_data.keys() + if x != "parents" and '.' not in x] + if len(keys) > 0: + found_energy = job_data.extra_data[keys[0]]["energy"] + # Resort to batch if main is NA + found_energy = found_energy if found_energy != "NA" else ( + job_data.extra_data[keys[0] + ".batch"]["energy"] if keys[0] + ".batch" in job_data.extra_data.keys() else found_energy) + job_data.energy = parse_output_number( + found_energy) + else: + continue + # warning_messages.append( + # "Single Job | Job {0} has no energy information available. {1} ".format(job_data.job_name, keys)) + self.update_energy_values( + [job for job in current_job_data if job.require_update == True]) + + except Exception as exp: + Log.info(traceback.format_exc()) + Log.warning( + "Autosubmit couldn't process the SLURM. Exception {0}".format(str(exp))) + pass def update_energy_values(self, update_job_data): """Updating energy values