From 844d80a0341f5983149fadaceebb16be97589557 Mon Sep 17 00:00:00 2001
From: ltenorio
Date: Wed, 10 Jul 2024 15:43:30 +0200
Subject: [PATCH 1/3] include not considered in performance response

---
 autosubmit_api/common/utils.py          | 58 +++++++++++--------
 .../performance/performance_metrics.py  | 36 +++++++-----
 2 files changed, 55 insertions(+), 39 deletions(-)

diff --git a/autosubmit_api/common/utils.py b/autosubmit_api/common/utils.py
index e0ce68f..2162f09 100644
--- a/autosubmit_api/common/utils.py
+++ b/autosubmit_api/common/utils.py
@@ -7,7 +7,7 @@ import math
 from collections import namedtuple
 from bscearth.utils.date import date2str
 from dateutil.relativedelta import *
-from typing import List
+from typing import List, Tuple
 
 class Section:
     CONFIG = "CONFIG"
@@ -71,29 +71,39 @@ def parse_number_processors(processors_str):
     except:
         return 1
 
-def get_jobs_with_no_outliers(jobs: List):
-    """ Detects outliers and removes them from the returned list """
-    new_list = []
-    data_run_times = [job.run_time for job in jobs]
-    # print(data_run_times)
-    if len(data_run_times) <= 1:
-        return jobs
-
-    mean = statistics.mean(data_run_times)
-    std = statistics.stdev(data_run_times)
-
-    # print("mean {0} std {1}".format(mean, std))
-    if std == 0:
-        return jobs
-
-    for job in jobs:
-        z_score = (job.run_time - mean) / std
-        # print("{0} {1} {2}".format(job.name, np.abs(z_score), job.run_time))
-        if math.fabs(z_score) <= THRESHOLD_OUTLIER and job.run_time > 0:
-            new_list.append(job)
-        # else:
-        #     print(" OUTLIED {0} {1} {2}".format(job.name, np.abs(z_score), job.run_time))
-    return new_list
+
+def separate_job_outliers(jobs: List) -> Tuple[List, List]:
+    """
+    Detect job outliers and separate them from the job list
+    """
+    new_list = []
+    outliers = []
+    data_run_times = [job.run_time for job in jobs]
+
+    if len(data_run_times) <= 1:
+        return (jobs, outliers)
+
+    mean = statistics.mean(data_run_times)
+    std = statistics.stdev(data_run_times)
+
+    if std == 0:
+        return (jobs, outliers)
+
+    for job in jobs:
+        z_score = (job.run_time - mean) / std
+        if math.fabs(z_score) <= THRESHOLD_OUTLIER and job.run_time > 0:
+            new_list.append(job)
+        else:
+            outliers.append(job)
+    return (new_list, outliers)
+
+
+def get_jobs_with_no_outliers(jobs: List) -> List:
+    """
+    Returns a list of jobs without outliers
+    """
+    return separate_job_outliers(jobs)[0]
+
 
 def date_plus(date, chunk_unit, chunk, chunk_size=1):
     if not date:
diff --git a/autosubmit_api/performance/performance_metrics.py b/autosubmit_api/performance/performance_metrics.py
index e1be227..a60caea 100644
--- a/autosubmit_api/performance/performance_metrics.py
+++ b/autosubmit_api/performance/performance_metrics.py
@@ -1,5 +1,4 @@
 #!/usr/bin/env python
-import math
 import traceback
 from autosubmit_api.logger import logger
 from autosubmit_api.common import utils as utils
@@ -10,8 +9,7 @@ from typing import List, Dict
 class PerformanceMetrics(object):
     """ Manages Performance Metrics """
 
-    def __init__(self, expid, joblist_helper):
-        # type: (str, JobListHelper) -> None
+    def __init__(self, expid: str, joblist_helper: JobListHelper):
         self.expid = expid
         self.error = False
         self.error_message = ""
@@ -23,10 +21,13 @@
         self.JPSY = 0 # type: float
         self.RSYPD = 0 # type: float
         self.processing_elements = 1
-        self._considered = [] # type : List
+        self._considered: List[Dict] = []
+        self._not_considered: List[Dict] = []
         self._sim_processors = 1 # type : int
         self.warnings = [] # type : List
         self.post_jobs_total_time_average = 0 # type : int
+        self.sim_jobs_valid: List[SimJob] = []
+        self.sim_jobs_invalid: List[SimJob] = []
         try:
             self.joblist_helper = joblist_helper # type: JobListHelper
             self.configuration_facade = self.joblist_helper.configuration_facade # type : AutosubmitConfigurationFacade
@@ -39,14 +40,14 @@
             self.error_message = "Error while preparing data sources: {0}".format(str(exp))
             logger.error((traceback.format_exc()))
             logger.error((str(exp)))
-        if self.error == False:
+        if self.error is False:
             self.configuration_facade.update_sim_jobs(self.pkl_organizer.sim_jobs) # This will assign self.configuration_facade.sim_processors to all the SIM jobs
             self._update_jobs_with_time_data()
             self._calculate_post_jobs_total_time_average()
-            self.sim_jobs_valid: List[SimJob] = utils.get_jobs_with_no_outliers(self.pkl_organizer.get_completed_section_jobs(utils.JobSection.SIM))
+            self.sim_jobs_valid, self.sim_jobs_invalid = utils.separate_job_outliers(self.pkl_organizer.get_completed_section_jobs(utils.JobSection.SIM))
             self._identify_outlied_jobs()
             self._update_valid_sim_jobs_with_post_data()
-            self._add_valid_sim_jobs_to_considered()
+            self._populate_considered_jobs()
             self._calculate_total_sim_queue_time()
             self._calculate_total_sim_run_time()
             self._calculate_global_metrics()
@@ -94,9 +95,15 @@
             simjob.set_post_jobs_total_average(self.post_jobs_total_time_average)
             # self._add_to_considered(simjob)
 
-    def _add_valid_sim_jobs_to_considered(self):
+    def _populate_considered_jobs(self):
+        """
+        Format valid and invalid jobs to be added to the final JSON
+        """
         for simjob in self.sim_jobs_valid:
-            self._add_to_considered(simjob)
+            self._considered.append(self._sim_job_to_dict(simjob))
+
+        for simjob in self.sim_jobs_invalid:
+            self._not_considered.append(self._sim_job_to_dict(simjob))
 
     def _calculate_total_sim_run_time(self):
         self.total_sim_run_time = sum(job.run_time for job in self.sim_jobs_valid)
@@ -139,8 +146,7 @@
             return round(CHSY, 4)
         return 0
 
-    def _get_RSYPD_support_list(self):
-        # type: () -> List[Job]
+    def _get_RSYPD_support_list(self) -> List[Job]:
         """ The support list for the divisor can have a different source """
         completed_transfer_jobs = self.pkl_organizer.get_completed_section_jobs(utils.JobSection.TRANSFER)
         completed_clean_jobs = self.pkl_organizer.get_completed_section_jobs(utils.JobSection.CLEAN)
@@ -159,9 +165,8 @@
             divisor = max(support_list[-1].finish_ts - self.sim_jobs_valid[0].start_ts, 0.0)
         return divisor
 
-
-    def _add_to_considered(self, simjob: SimJob):
-        self._considered.append({
+    def _sim_job_to_dict(self, simjob: SimJob):
+        return {
             "name": simjob.name,
             "queue": simjob.queue_time,
             "running": simjob.run_time,
@@ -173,7 +178,7 @@
             "yps": simjob.years_per_sim,
             "ncpus": simjob.ncpus,
             "chunk": simjob.chunk
-        })
+        }
 
     def to_json(self):
         # type: () -> Dict
@@ -184,6 +189,7 @@
             "JPSY": self.JPSY,
             "Parallelization": self.processing_elements,
             "considered": self._considered,
+            "not_considered": self._not_considered,
             "error": self.error,
             "error_message": self.error_message,
             "warnings_job_data": self.warnings,
--
GitLab
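Taken together, this first patch makes the performance endpoint report the jobs excluded from the metrics instead of silently dropping them: valid and outlier jobs go through the same _sim_job_to_dict formatting and end up under "considered" and the new "not_considered" key of to_json(). As a rough illustration of the resulting payload shape (the values and job names below are invented, and the per-job entries are abridged to a few of the fields shown in this patch):

    {
        "JPSY": 0,
        "Parallelization": 8,
        "considered": [
            {"name": "a007_19900101_fc0_1_SIM", "queue": 120, "running": 300,
             "yps": 1.0, "ncpus": 8, "chunk": 1},
        ],
        "not_considered": [
            {"name": "a007_19900101_fc0_2_SIM", "queue": 90, "running": 4,
             "yps": 1.0, "ncpus": 8, "chunk": 2},
        ],
        "error": False,
        "error_message": "",
        "warnings_job_data": [],
    }
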
From bdef706f18af39fb5d4ea78340c636e96151d08f Mon Sep 17 00:00:00 2001
From: ltenorio
Date: Mon, 17 Jun 2024 11:06:49 +0200
Subject: [PATCH 2/3] modified z-score implementation

---
 autosubmit_api/common/utils.py | 59 +++++++++++++++++++++-------------
 1 file changed, 36 insertions(+), 23 deletions(-)

diff --git a/autosubmit_api/common/utils.py b/autosubmit_api/common/utils.py
index 2162f09..8321097 100644
--- a/autosubmit_api/common/utils.py
+++ b/autosubmit_api/common/utils.py
@@ -73,29 +73,42 @@ def parse_number_processors(processors_str):
 
 def separate_job_outliers(jobs: List) -> Tuple[List, List]:
-    """
-    Detect job outliers and separate them from the job list
-    """
-    new_list = []
-    outliers = []
-    data_run_times = [job.run_time for job in jobs]
-
-    if len(data_run_times) <= 1:
-        return (jobs, outliers)
-
-    mean = statistics.mean(data_run_times)
-    std = statistics.stdev(data_run_times)
-
-    if std == 0:
-        return (jobs, outliers)
-
-    for job in jobs:
-        z_score = (job.run_time - mean) / std
-        if math.fabs(z_score) <= THRESHOLD_OUTLIER and job.run_time > 0:
-            new_list.append(job)
-        else:
-            outliers.append(job)
-    return (new_list, outliers)
+    """
+    Detect job outliers and separate them from the job list
+
+    Method: https://www.ibm.com/docs/en/cognos-analytics/11.1.0?topic=terms-modified-z-score
+    """
+    MAD_K = 1.4826 # = 1/(CDF-1(3/4)) https://en.wikipedia.org/wiki/Median_absolute_deviation#Derivation
+    MEANAD_K = 1.2533 # Ratio STD / MeanAD - Geary (1935) = 1/sqrt(2/pi)
+
+    new_list = []
+    outliers = []
+    data_run_times = [job.run_time for job in jobs]
+
+    if len(data_run_times) <= 1:
+        return (jobs, outliers)
+
+    mean = statistics.mean(data_run_times)
+    mean_ad = statistics.mean([abs(x - mean) for x in data_run_times])
+
+    median = statistics.median(data_run_times)
+    mad = statistics.median([abs(x - median) for x in data_run_times])
+
+    if mad == 0 and mean_ad == 0:
+        return (jobs, outliers)
+
+    for job in jobs:
+        if mad == 0:
+            modified_z_score = (job.run_time - median) / (MEANAD_K*mean_ad)
+        else:
+            modified_z_score = (job.run_time - median) / (MAD_K*mad)
+
+        if math.fabs(modified_z_score) <= THRESHOLD_OUTLIER and job.run_time > 0:
+            new_list.append(job)
+        else:
+            outliers.append(job)
+
+    return (new_list, outliers)
 
 
 def get_jobs_with_no_outliers(jobs: List) -> List:
     """
--
GitLab
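On the method itself: the modified z-score replaces the mean and standard deviation of the plain z-score with the median and the median absolute deviation (MAD), which are far less sensitive to the very outliers being hunted; 1.4826 rescales the MAD into a consistent estimate of the standard deviation for normally distributed data, and 1.2533 does the same for the mean absolute deviation, which the code falls back to when the MAD is zero. The following standalone sketch reproduces that scoring outside the API; the sample run times and the helper name are made up for illustration, and the real function additionally compares each score against THRESHOLD_OUTLIER (defined elsewhere in the module) and rejects non-positive run times.

    import statistics

    def modified_z_scores(values):
        # Median-based scores, as in separate_job_outliers:
        # (x - median) / (1.4826 * MAD), or (x - median) / (1.2533 * MeanAD) when MAD == 0.
        median = statistics.median(values)
        mad = statistics.median([abs(x - median) for x in values])
        mean = statistics.mean(values)
        mean_ad = statistics.mean([abs(x - mean) for x in values])
        if mad == 0 and mean_ad == 0:
            return [0.0 for _ in values]  # separate_job_outliers returns early in this case
        scale = 1.4826 * mad if mad != 0 else 1.2533 * mean_ad
        return [(x - median) / scale for x in values]

    run_times = [200, 300, 250, 4]  # illustrative SIM run times in seconds
    print(modified_z_scores(run_times))  # the 4-second job scores around -3, the others stay within about 1
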
From 744f0ffd620cc2a2f8866214aac8d108029150c7 Mon Sep 17 00:00:00 2001
From: ltenorio
Date: Mon, 15 Jul 2024 11:17:04 +0200
Subject: [PATCH 3/3] add outlier test

---
 autosubmit_api/common/utils.py | 19 +++++++++++-----
 tests/test_common.py           | 41 ++++++++++++++++++++++++++++++++++
 tests/test_endpoints_v3.py     | 15 ++++++++++---
 3 files changed, 66 insertions(+), 9 deletions(-)
 create mode 100644 tests/test_common.py

diff --git a/autosubmit_api/common/utils.py b/autosubmit_api/common/utils.py
index 8321097..bcd209b 100644
--- a/autosubmit_api/common/utils.py
+++ b/autosubmit_api/common/utils.py
@@ -74,19 +74,21 @@ def parse_number_processors(processors_str):
 
 def separate_job_outliers(jobs: List) -> Tuple[List, List]:
     """
-    Detect job outliers and separate them from the job list
+    Detect job outliers and separate them from the job list.
+    Zero (and negative) run times are considered outliers.
 
     Method: https://www.ibm.com/docs/en/cognos-analytics/11.1.0?topic=terms-modified-z-score
     """
     MAD_K = 1.4826 # = 1/(CDF-1(3/4)) https://en.wikipedia.org/wiki/Median_absolute_deviation#Derivation
     MEANAD_K = 1.2533 # Ratio STD / MeanAD - Geary (1935) = 1/sqrt(2/pi)
 
-    new_list = []
-    outliers = []
-    data_run_times = [job.run_time for job in jobs]
+    data_run_times = [job.run_time for job in jobs if job.run_time > 0]
 
     if len(data_run_times) <= 1:
-        return (jobs, outliers)
+        return (
+            [job for job in jobs if job.run_time > 0],
+            [job for job in jobs if job.run_time <= 0]
+        )
 
     mean = statistics.mean(data_run_times)
     mean_ad = statistics.mean([abs(x - mean) for x in data_run_times])
@@ -95,8 +97,13 @@
     mad = statistics.median([abs(x - median) for x in data_run_times])
 
     if mad == 0 and mean_ad == 0:
-        return (jobs, outliers)
+        return (
+            [job for job in jobs if job.run_time > 0],
+            [job for job in jobs if job.run_time <= 0]
+        )
 
+    new_list = []
+    outliers = []
     for job in jobs:
         if mad == 0:
             modified_z_score = (job.run_time - median) / (MEANAD_K*mean_ad)
diff --git a/tests/test_common.py b/tests/test_common.py
new file mode 100644
index 0000000..9d365e8
--- /dev/null
+++ b/tests/test_common.py
@@ -0,0 +1,41 @@
+from typing import List
+import pytest
+from autosubmit_api.common import utils
+from autosubmit_api.components.jobs.job_factory import SimJob
+
+
+@pytest.mark.parametrize(
+    "valid_run_times, outlier_run_times, zeros",
+    [
+        ([200, 300, 250], [4], 2),
+        ([240, 300, 250, 320, 280], [4, 1500], 0),
+        ([240, 300, 250, 320, 280], [4, 1500], 200),
+        ([], [], 0),
+        ([1], [], 0),
+        ([1], [], 20),
+    ],
+)
+def test_outlier_detection(
+    valid_run_times: List[int], outlier_run_times: List[int], zeros: int
+):
+    """
+    Test outlier detection method with different run times.
+
+    :param valid_run_times: List of valid run times.
+    :param outlier_run_times: List of outlier run times.
+    :param zeros: Number of jobs with run time equal to 0.
+    """
+
+    zeros_run_times = [0] * zeros
+
+    # Mock jobs with run times
+    jobs = []
+    for run_time in valid_run_times + outlier_run_times + zeros_run_times:
+        aux_job = SimJob()
+        aux_job._run_time = run_time
+        jobs.append(aux_job)
+
+    valid_jobs, outliers = utils.separate_job_outliers(jobs)
+
+    assert len(valid_jobs) == len(valid_run_times)
+    assert len(outliers) == (len(outlier_run_times) + zeros)
+ """ + + zeros_run_times = [0] * zeros + + # Mock jobs with run times + jobs = [] + for run_time in valid_run_times + outlier_run_times + zeros_run_times: + aux_job = SimJob() + aux_job._run_time = run_time + jobs.append(aux_job) + + valid_jobs, outliers = utils.separate_job_outliers(jobs) + + assert len(valid_jobs) == len(valid_run_times) + assert len(outliers) == (len(outlier_run_times) + zeros) diff --git a/tests/test_endpoints_v3.py b/tests/test_endpoints_v3.py index e38c75b..b5d0113 100644 --- a/tests/test_endpoints_v3.py +++ b/tests/test_endpoints_v3.py @@ -116,15 +116,21 @@ class TestPerformance: response = fixture_client.get(self.endpoint.format(expid=expid)) resp_obj: dict = response.get_json() assert resp_obj["error_message"] == "" - assert resp_obj["error"] == False + assert resp_obj["error"] is False assert resp_obj["Parallelization"] == 8 + assert isinstance(resp_obj["considered"], list) and isinstance( + resp_obj["not_considered"], list + ) expid = "a3tb" response = fixture_client.get(self.endpoint.format(expid=expid)) resp_obj: dict = response.get_json() assert resp_obj["error_message"] == "" - assert resp_obj["error"] == False + assert resp_obj["error"] is False assert resp_obj["Parallelization"] == 768 + assert isinstance(resp_obj["considered"], list) and isinstance( + resp_obj["not_considered"], list + ) def test_parallelization_platforms(self, fixture_client: FlaskClient): """ @@ -134,8 +140,11 @@ class TestPerformance: response = fixture_client.get(self.endpoint.format(expid=expid)) resp_obj: dict = response.get_json() assert resp_obj["error_message"] == "" - assert resp_obj["error"] == False + assert resp_obj["error"] is False assert resp_obj["Parallelization"] == 16 + assert isinstance(resp_obj["considered"], list) and isinstance( + resp_obj["not_considered"], list + ) class TestTree: -- GitLab