diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py index 917504528807e19869151e63b9395feec8a76af5..5d9716b131636d4b5634a796e3db1e71db6a3683 100644 --- a/autosubmit/database/db_jobdata.py +++ b/autosubmit/database/db_jobdata.py @@ -1201,10 +1201,10 @@ class JobDataStructure(MainDataBase): return None def process_current_run_collection(self): - """Post-process for job_data. + """Post-process source output for job_data. - Returns: - ([job_data], [warning_messaages]): job data processes, messages + :return: job data processes, messages + :rtype: ([job_data], [warning_messaages]) """ try: @@ -1252,7 +1252,7 @@ class JobDataStructure(MainDataBase): no_process = False for job_data in jobs_in_package: # If it is a wrapper job step - if "energy" in job_data.extra_data.keys() and job_data.extra_data["energy"] != "NA": + if job_data.extra_data is not None and isinstance(job_data.extra_data, dict) and job_data.extra_data.get("energy", None) and job_data.extra_data["energy"] != "NA": name_to_current_job[job_data.job_name].energy = parse_output_number( job_data.extra_data["energy"]) sum_total_energy += name_to_current_job[job_data.job_name].energy @@ -1262,7 +1262,7 @@ class JobDataStructure(MainDataBase): jobs_in_package, key=lambda x: len(str(x.extra_data))) # Identify job steps keys_step = [ - y for y in description_job.extra_data.keys() if '.' in y and y[y.index('.') + 1:] not in ["batch", "extern"] and y != "parents"] + y for y in description_job.extra_data.keys() if '.' in y and y[y.index('.') + 1:] not in ["batch", "extern"] and y != "parents"] if description_job.extra_data and isinstance(description_job.extra_data, dict) else [] if len(keys_step) > 0: # Steps found keys_step.sort( @@ -1273,7 +1273,7 @@ class JobDataStructure(MainDataBase): if "submit" not in description_job.extra_data[key].keys(): keys_found = False break - + # Build wrapper jobs as job steps for key in keys_step: wrapper_jobs.append(JobStepExtraData( key, description_job.extra_data[key])) @@ -1285,9 +1285,10 @@ class JobDataStructure(MainDataBase): # Approximation not_1_to_1 = False else: - # Identify main step + # No jobs steps, identify main step main_step = [ - y for y in description_job.extra_data.keys() if '.' not in y and y != "parents"] + y for y in description_job.extra_data.keys() if '.' not in y and y != "parents"] if description_job.extra_data and isinstance(description_job.extra_data, dict) else [] + # For some reason, a packaged jobs can arrive as a single job slurm output if len(main_step) > 0 and main_step[0] not in ['AveRSS', 'finish', 'ncpus', 'submit', 'MaxRSS', 'start', 'nnodes', 'energy']: # Check only first one main_step = [main_step[0]] @@ -1296,7 +1297,7 @@ class JobDataStructure(MainDataBase): if key not in description_job.extra_data.keys() or "submit" not in description_job.extra_data[key].keys(): keys_found = False break - # Build wrapper jobs + # Build wrapper jobs as job steps for key in main_step: wrapper_jobs.append(JobStepExtraData( key, description_job.extra_data[key])) @@ -1320,15 +1321,10 @@ class JobDataStructure(MainDataBase): # warning_messages.append( # "Approximation | The energy results in wrapper {0} are an approximation. Total energy detected: {1}.".format(package, sum_total_energy)) # Completing job information if necessary - dropped_jobs = [] - for i in range(0, len(jobs_in_package)): - if jobs_in_package[i].running_time() <= 0: - # Needs to be completed - # Dropping job from package list - dropped_jobs.append(i) - #dropped_job = jobs_in_package.pop(i) - for j in dropped_jobs: - jobs_in_package.pop(j) + # dropped_jobs = [job_data.job_name for job_data in jobs_in_package if job_data.running_time() <= 0] + jobs_in_package = [ + job_data for job_data in jobs_in_package if job_data.running_time() > 0] + # After completion is finished, calculate total resources to be approximated resources_total = sum( z.ncpus * z.running_time() for z in jobs_in_package) * 1.0 @@ -1336,9 +1332,6 @@ class JobDataStructure(MainDataBase): for job_data in jobs_in_package: job_data_factor = ( job_data.ncpus * job_data.running_time()) - # if job_data_factor <= 0: - # warning_messages.append("Approximation | Job {0} requires {1} ncpus and has {2} running time, resulting in a 0 energy approximation. This job will be ignored.".format( - # job_data.job_name, job_data.ncpus, job_data.running_time())) name_to_current_job[job_data.job_name].energy = round(job_data_factor / resources_total * sum_total_energy, 2) # else: @@ -1346,6 +1339,8 @@ class JobDataStructure(MainDataBase): # "Approximation | Aproximation for wrapper {0} failed.".format(package)) else: # Check if it is 1 to 1 + # If it is 1 to 1, then jobs in package is equal to wrapper jobs in size, so we can assign energy based on order of jobs. + # Needs more guarantees but so far it works. if len(jobs_in_package) > 0 and len(wrapper_jobs) > 0 and len(jobs_in_package) == len(wrapper_jobs) and no_process == False: # It is 1 to 1 for i in range(0, len(jobs_in_package)): @@ -1357,15 +1352,10 @@ class JobDataStructure(MainDataBase): .job_name].start = wrapper_jobs[i].start name_to_current_job[jobs_in_package[i] .job_name].finish = wrapper_jobs[i].finish - # else: - # warning_messages.append( - # "Approximation | Wrapper {0} did not have enough or precise information to calculate an exact mapping.".format(package)) - # else: - # warning_messages.append( - # "Approximation | Wrapper {0} does not have energy information, it will be ignored.".format(package)) for job_data in current_job_data: - if job_data.rowtype == 2 and len(job_data.extra_data.keys()) > 0: + # Making VERY sure + if job_data.rowtype == 2 and job_data.extra_data and isinstance(job_data.extra_data, dict) and len(job_data.extra_data) > 0: keys = [x for x in job_data.extra_data.keys() if x != "parents" and '.' not in x] if len(keys) > 0: @@ -1379,13 +1369,16 @@ class JobDataStructure(MainDataBase): continue # warning_messages.append( # "Single Job | Job {0} has no energy information available. {1} ".format(job_data.job_name, keys)) + # Updating detected energy values self.update_energy_values( [job for job in current_job_data if job.require_update == True]) except Exception as exp: + # stack = traceback.extract_stack() + # (filename, line, procname, text) = stack[-1] Log.info(traceback.format_exc()) Log.warning( - "Autosubmit couldn't process the SLURM. Exception {0}".format(str(exp))) + "Autosubmit couldn't process the SLURM. ".format(str(exp))) pass def update_energy_values(self, update_job_data): diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 34f25a57956ec171819b773901fac4bbd78ee129..277fb8f42df856d16724bb0b6a33e3a5f3110824 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -1363,7 +1363,7 @@ class JobList(object): if m_time_job_conf: if m_time_job_conf > m_time_db: Log.info( - "File jobs_{0}.conf has been modified since the last time the structured was cached.".format(self.expid)) + "File jobs_{0}.conf has been modified since the last time the structure persistence was saved.".format(self.expid)) structure_valid = False else: Log.info(