From 58cef5fa5bc21ab81930695c753b881ef9477373 Mon Sep 17 00:00:00 2001 From: ltenorio Date: Wed, 4 Sep 2024 12:19:55 +0200 Subject: [PATCH] fix inconsistency due to different chunk sizes --- .../components/representations/graph/graph.py | 31 +++++++++++++-- .../components/representations/tree/tree.py | 38 +++++++++++++++---- .../history/data_classes/job_data.py | 4 +- autosubmit_api/history/experiment_history.py | 17 ++++++++- .../performance/performance_metrics.py | 12 ++++-- 5 files changed, 84 insertions(+), 18 deletions(-) diff --git a/autosubmit_api/components/representations/graph/graph.py b/autosubmit_api/components/representations/graph/graph.py index c05b7ce..8c46004 100644 --- a/autosubmit_api/components/representations/graph/graph.py +++ b/autosubmit_api/components/representations/graph/graph.py @@ -1,6 +1,7 @@ #!/usr/bin/env python import networkx as nx +from autosubmit_api.builders.experiment_history_builder import ExperimentHistoryBuilder, ExperimentHistoryDirector from autosubmit_api.performance import utils as PUtils # import common.utils as utils from autosubmit_api.common.utils import Status, get_average_total_time @@ -226,10 +227,32 @@ class GraphRepresentation(object): self.average_post_time = get_average_total_time(post_jobs) def _generate_node_data(self): + # Get last run data for each job + try: + experiment_history = ExperimentHistoryDirector(ExperimentHistoryBuilder(self.expid)).build_reader_experiment_history() + last_jobs_run = experiment_history.get_all_jobs_last_run_dict() + except Exception: + last_jobs_run = {} + + # Generate node data for job_name in self.job_dictionary: job = self.job_dictionary[job_name] self._calculate_max_children_parent(len(job.children_names), len(job.parents_names)) - ini_date, end_date = job.get_date_ini_end(self.joblist_loader.chunk_size, self.joblist_loader.chunk_unit) + + # Get chunk size and unit + chunk_size = self.joblist_loader.chunk_size + chunk_unit = self.joblist_loader.chunk_unit + last_run = last_jobs_run.get(job_name) + if last_run and last_run.chunk_unit and last_run.chunk_size: + chunk_unit, chunk_size = last_run.chunk_unit, last_run.chunk_size + + # Calculate dates + ini_date, end_date = job.get_date_ini_end(chunk_size, chunk_unit) + + # Calculate performance metrics + SYPD = PUtils.calculate_SYPD_perjob(chunk_unit, chunk_size, job.chunk, job.run_time, job.status) + ASYPD = PUtils.calculate_ASYPD_perjob(chunk_unit, chunk_size, job.chunk, job.total_time, self.average_post_time, job.status) + self.nodes.append({ "id": job.name, "internal_id": job.name, @@ -239,13 +262,15 @@ class GraphRepresentation(object): "status_color": job.status_color, "platform_name": job.platform, "chunk": job.chunk, + "chunk_size": chunk_size, + "chunk_unit": chunk_unit, "package": job.package, "wrapper": job.package, "member": job.member, "date": ini_date, "date_plus": end_date, - "SYPD": PUtils.calculate_SYPD_perjob(self.joblist_loader.chunk_unit, self.joblist_loader.chunk_size, job.chunk, job.run_time, job.status), - "ASYPD": PUtils.calculate_ASYPD_perjob(self.joblist_loader.chunk_unit, self.joblist_loader.chunk_size, job.chunk, job.total_time, self.average_post_time, job.status), + "SYPD": SYPD, + "ASYPD": ASYPD, "minutes_queue": job.queue_time, "minutes": job.run_time, "submit": job.submit_datetime, diff --git a/autosubmit_api/components/representations/tree/tree.py b/autosubmit_api/components/representations/tree/tree.py index 1e7d4c7..cc3a65d 100644 --- a/autosubmit_api/components/representations/tree/tree.py +++ b/autosubmit_api/components/representations/tree/tree.py @@ -1,4 +1,5 @@ #!/usr/bin/env python +from autosubmit_api.builders.experiment_history_builder import ExperimentHistoryBuilder, ExperimentHistoryDirector from autosubmit_api.components.jobs import utils as JUtils from autosubmit_api.performance import utils as PUtils from autosubmit_api.components.jobs.joblist_loader import JobListLoader @@ -53,10 +54,10 @@ class TreeRepresentation(object): self._distributed_dates[job.date] = None self._distributed_members[job.member] = None elif job.date is not None and job.member is None: - parents_members = {self.joblist_loader.job_dictionary[parent_name].member for parent_name in job.parents_names} - children_members = {self.joblist_loader.job_dictionary[children_name].member for children_name in job.children_names} - intersection_member_parent = self.joblist_loader.members & parents_members - intersection_member_children = self.joblist_loader.members & children_members + # parents_members = {self.joblist_loader.job_dictionary[parent_name].member for parent_name in job.parents_names} + # children_members = {self.joblist_loader.job_dictionary[children_name].member for children_name in job.children_names} + # intersection_member_parent = self.joblist_loader.members & parents_members + # intersection_member_children = self.joblist_loader.members & children_members self._date_member_distribution.setdefault((job.date, DEFAULT_MEMBER), []).append(job) self._distributed_dates[job.date] = None self._distributed_members[DEFAULT_MEMBER] = None @@ -259,8 +260,28 @@ class TreeRepresentation(object): self.average_post_time = get_average_total_time(post_jobs) def _generate_node_data(self): + # Get last run data for each job + try: + experiment_history = ExperimentHistoryDirector(ExperimentHistoryBuilder(self.expid)).build_reader_experiment_history() + last_jobs_run = experiment_history.get_all_jobs_last_run_dict() + except Exception: + last_jobs_run = {} + for job in self.joblist_loader.jobs: - ini_date, end_date = job.get_date_ini_end(self.joblist_loader.chunk_size, self.joblist_loader.chunk_unit) + # Get chunk size and unit + chunk_size = self.joblist_loader.chunk_size + chunk_unit = self.joblist_loader.chunk_unit + last_run = last_jobs_run.get(job.name) + if last_run and last_run.chunk_unit and last_run.chunk_size: + chunk_unit, chunk_size = last_run.chunk_unit, last_run.chunk_size + + # Calculate dates + ini_date, end_date = job.get_date_ini_end(chunk_size, chunk_unit) + + # Calculate performance metrics + SYPD = PUtils.calculate_SYPD_perjob(chunk_unit, chunk_size, job.chunk, job.run_time, job.status) + ASYPD = PUtils.calculate_ASYPD_perjob(chunk_unit, chunk_size, job.chunk, job.total_time, self.average_post_time, job.status) + self.nodes.append({ "id": job.name, "internal_id": job.name, @@ -269,13 +290,14 @@ class TreeRepresentation(object): "status_code": job.status, "platform_name": job.platform, "chunk": job.chunk, + "chunk_size": chunk_size, + "chunk_unit": chunk_unit, "member": job.member, "title" : job.tree_title, "date": ini_date, "date_plus": end_date, - "SYPD": PUtils.calculate_SYPD_perjob(self.joblist_loader.chunk_unit, self.joblist_loader.chunk_size, job.chunk, job.run_time, job.status), - "ASYPD": PUtils.calculate_ASYPD_perjob(self.joblist_loader.chunk_unit, self.joblist_loader.chunk_size, job.chunk, job.total_time, self.average_post_time, job.status), - "minutes_queue": job.queue_time, + "SYPD": SYPD, + "ASYPD": ASYPD, "minutes": job.run_time, "submit": job.submit_datetime, "start": job.start_datetime, diff --git a/autosubmit_api/history/data_classes/job_data.py b/autosubmit_api/history/data_classes/job_data.py index 897db9a..ad98169 100644 --- a/autosubmit_api/history/data_classes/job_data.py +++ b/autosubmit_api/history/data_classes/job_data.py @@ -62,11 +62,11 @@ class JobData(object): self.job_id = job_id if job_id else 0 try: self.extra_data_parsed = loads(extra_data) - except Exception as exp: + except Exception: self.extra_data_parsed = {} # Fail fast self.extra_data = extra_data self.nnodes = nnodes - self.run_id = run_id + self.run_id: int = run_id self.require_update = False # DB VERSION 15 attributes self.MaxRSS = MaxRSS diff --git a/autosubmit_api/history/experiment_history.py b/autosubmit_api/history/experiment_history.py index b2643c6..b3dcbce 100644 --- a/autosubmit_api/history/experiment_history.py +++ b/autosubmit_api/history/experiment_history.py @@ -23,7 +23,7 @@ from autosubmit_api.history.data_classes.job_data import JobData from autosubmit_api.history.data_classes.experiment_run import ExperimentRun from autosubmit_api.history.internal_logging import Logging from autosubmit_api.config.basicConfig import APIBasicConfig -from typing import List, Dict, Tuple, Any +from typing import List, Dict, Optional, Tuple, Any SECONDS_WAIT_PLATFORM = 60 @@ -101,3 +101,18 @@ class ExperimentHistory(): "err": job_data_dc.err }) return result + + def get_all_jobs_last_run_dict(self) -> Dict[str, Optional[ExperimentRun]]: + """ + Gets the last run of all jobs in the experiment + """ + # Map all experiment runs by run_id + runs = self.manager.get_experiment_runs_dcs() + runs_dict = {run.run_id: run for run in runs} + + # Map last jobs data by job name + last_jobs_data = self.manager.get_all_last_job_data_dcs() + return { + job_data.job_name: runs_dict.get(job_data.run_id) + for job_data in last_jobs_data + } \ No newline at end of file diff --git a/autosubmit_api/performance/performance_metrics.py b/autosubmit_api/performance/performance_metrics.py index f7a55f8..b21dbc8 100644 --- a/autosubmit_api/performance/performance_metrics.py +++ b/autosubmit_api/performance/performance_metrics.py @@ -17,6 +17,7 @@ class PerformanceMetrics(object): self.error_message = "" self.total_sim_run_time: int = 0 self.total_sim_queue_time: int = 0 + self.valid_sim_yps_sum: float = 0.0 self.SYPD: float = 0 self.ASYPD: float = 0 self.CHSY: float = 0 @@ -61,9 +62,11 @@ class PerformanceMetrics(object): self.joblist_helper.update_with_timedata(self.pkl_organizer.post_jobs) self.joblist_helper.update_with_timedata(self.pkl_organizer.clean_jobs) self.joblist_helper.update_with_timedata(self.pkl_organizer.transfer_jobs) + # Update yps with the latest historical data self.joblist_helper.update_with_yps_per_run(self.pkl_organizer.sim_jobs) def _calculate_global_metrics(self): + self.valid_sim_yps_sum = sum(job.years_per_sim for job in self.sim_jobs_valid) self.SYPD = self._calculate_SYPD() self.ASYPD = self._calculate_ASYPD() self.RSYPD = self._calculate_RSYPD() @@ -116,21 +119,22 @@ class PerformanceMetrics(object): def _calculate_SYPD(self): if self.total_sim_run_time > 0: - SYPD = ((self.configuration_facade.current_years_per_sim * len(self._considered) * utils.SECONDS_IN_A_DAY) / - (self.total_sim_run_time)) + SYPD = ((self.valid_sim_yps_sum * utils.SECONDS_IN_A_DAY) / + (self.total_sim_run_time)) return round(SYPD, 4) return 0 def _calculate_ASYPD(self): if len(self.sim_jobs_valid) > 0 and (self.total_sim_run_time + self.total_sim_queue_time + self.post_jobs_total_time_average)>0: - ASYPD = (self.configuration_facade.current_years_per_sim * len(self.sim_jobs_valid) * utils.SECONDS_IN_A_DAY) / (self.total_sim_run_time + self.total_sim_queue_time + self.post_jobs_total_time_average) + ASYPD = ((self.valid_sim_yps_sum * utils.SECONDS_IN_A_DAY) / + (self.total_sim_run_time + self.total_sim_queue_time + self.post_jobs_total_time_average)) return round(ASYPD, 4) return 0 def _calculate_RSYPD(self): divisor = self._get_RSYPD_divisor() if len(self.sim_jobs_valid) > 0 and divisor > 0: - RSYPD = (self.configuration_facade.current_years_per_sim * len(self.sim_jobs_valid) * utils.SECONDS_IN_A_DAY) / divisor + RSYPD = (self.valid_sim_yps_sum * utils.SECONDS_IN_A_DAY) / divisor return round(RSYPD, 4) return 0 -- GitLab