diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 65f3921bfda9585deed128f66de5634f73a46bfa..1d51a09bbab2b14205fd577c7ce8f36b66fa462d 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -36,6 +36,7 @@ from bscearth.utils.date import date2str from monitor.monitor import Monitor from database.db_common import get_autosubmit_version, check_experiment_exists from database.db_common import delete_experiment, update_experiment_descrip_version +from database.db_structure import get_structure from experiment.experiment_common import copy_experiment from experiment.experiment_common import new_experiment from database.db_common import create_db @@ -45,6 +46,7 @@ from job.job_list_persistence import JobListPersistenceDb from job.job_package_persistence import JobPackagePersistence from job.job_packages import JobPackageThread, JobPackageBase from job.job_list import JobList +from job.job_utils import SubJob, SubJobManager from job.job import Job from git.autosubmit_git import AutosubmitGit from job.job_common import Status @@ -86,6 +88,7 @@ from history.experiment_history import ExperimentHistory from typing import List import history.utils as HUtils import helpers.autosubmit_helper as AutosubmitHelper +import statistics.utils as StatisticsUtils """ Main module for autosubmit. Only contains an interface class to all functionality implemented on autosubmit """ @@ -2360,51 +2363,48 @@ class Autosubmit: :param hide: hides plot window :type hide: bool """ - exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid) Log.info("Loading jobs...") as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory()) as_conf.check_conf_files(False) pkl_dir = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, 'pkl') - job_list = Autosubmit.load_job_list( - expid, as_conf, notransitive=notransitive) - Log.debug("Job list restored from {0} files", pkl_dir) - # Filter by job section - if filter_type: - ft = filter_type - Log.debug(ft) - if ft == 'Any': - job_list = job_list.get_job_list() - else: - job_list = [job for job in job_list.get_job_list() - if job.section == ft] - else: - ft = 'Any' - job_list = job_list.get_job_list() - # Filter by time (hours before) - period_fi = datetime.datetime.now().replace(second=0, microsecond=0) - if filter_period: - period_ini = period_fi - datetime.timedelta(hours=filter_period) - Log.debug(str(period_ini)) - job_list = [job for job in job_list if - job.check_started_after(period_ini) or job.check_running_after(period_ini)] - else: - period_ini = None - - if len(job_list) > 0: + job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=notransitive) + Log.debug("Job list restored from {0} files", pkl_dir) + jobs = StatisticsUtils.filter_by_section(job_list.get_job_list(), filter_type) + jobs, period_ini, period_fi = StatisticsUtils.filter_by_time_period(jobs, filter_period) + # print("After time {} {} {}".format(len(jobs), period_ini, period_fi)) + # Package information + job_to_package, package_to_jobs, _, _ = JobList.retrieve_packages(BasicConfig, expid, [job.name for job in job_list.get_job_list()]) + queue_time_fixes = {} + if (job_to_package): + current_table_structure = get_structure(expid, BasicConfig.STRUCTURES_DIR) + subjobs = [] + for job in job_list.get_job_list(): + job_info = JobList.retrieve_times(job.status, job.name, job._tmp_path, make_exception=False, job_times=None, seconds=True, job_data_collection=None) + time_total = (job_info.queue_time + job_info.run_time) if job_info else 0 + subjobs.append( + 
SubJob(job.name, + job_to_package.get(job.name, None), + job_info.queue_time if job_info else 0, + job_info.run_time if job_info else 0, + time_total, + job_info.status if job_info else Status.UNKNOWN) + ) + queue_time_fixes = SubJobManager(subjobs, job_to_package, package_to_jobs, current_table_structure).get_collection_of_fixes_applied() + + if len(jobs) > 0: try: Log.info("Plotting stats...") monitor_exp = Monitor() # noinspection PyTypeChecker - monitor_exp.generate_output_stats( - expid, job_list, file_format, period_ini, period_fi, not hide) + monitor_exp.generate_output_stats(expid, jobs, file_format, period_ini, period_fi, not hide, queue_time_fixes) Log.result("Stats plot ready") except Exception as e: raise AutosubmitCritical( "Stats couldn't be shown", 7061, str(e)) else: Log.info("There are no {0} jobs in the period from {1} to {2}...".format( - ft, period_ini, period_fi)) + filter_type, period_ini, period_fi)) return True @staticmethod diff --git a/autosubmit/database/db_structure.py b/autosubmit/database/db_structure.py index 484f015fbd6ed63a783d37cc9866e4d328ce2413..13c522a86d971bdee0aaa8e9d6c11bd5982a426a 100644 --- a/autosubmit/database/db_structure.py +++ b/autosubmit/database/db_structure.py @@ -27,6 +27,7 @@ import traceback import sqlite3 import copy from datetime import datetime +from typing import Dict, List from log.log import Log, AutosubmitError, AutosubmitCritical # from networkx import DiGraph @@ -34,14 +35,14 @@ from log.log import Log, AutosubmitError, AutosubmitCritical def get_structure(exp_id, structures_path): + # type: (str, str) -> Dict[str, List[str]] """ Creates file of database and table of experiment structure if it does not exist. Returns current structure. :return: Map from experiment name source to name destination :rtype: Dictionary Key: String, Value: List(of String) """ - try: - #pkl_path = os.path.join(exp_path, exp_id, "pkl") + try: if os.path.exists(structures_path): db_structure_path = os.path.join( structures_path, "structure_" + exp_id + ".db") @@ -57,29 +58,20 @@ def get_structure(exp_id, structures_path): UNIQUE(e_from,e_to) );''') create_table(conn, create_table_query) - current_table = _get_exp_structure(db_structure_path) - # print("Current table: ") - # print(current_table) + current_table = _get_exp_structure(db_structure_path) current_table_structure = dict() for item in current_table: _from, _to = item - if _from not in current_table_structure.keys(): - current_table_structure[_from] = list() - if _to not in current_table_structure.keys(): - current_table_structure[_to] = list() - current_table_structure[_from].append(_to) - if (len(current_table_structure.keys()) > 0): - # print("Return structure") - return current_table_structure - else: - return None - else: - # pkl folder not found + current_table_structure.setdefault(_from, []).append(_to) + current_table_structure.setdefault(_to, []) + return current_table_structure + else: raise Exception("Structures folder not found " + str(structures_path)) except Exception as exp: Log.printlog("Get structure error: {0}".format(str(exp)), 6014) Log.debug(traceback.format_exc()) + def create_connection(db_file): diff --git a/autosubmit/helpers/data_transfer.py b/autosubmit/helpers/data_transfer.py new file mode 100644 index 0000000000000000000000000000000000000000..01d7992b9e464743099a7124053c7e1c2e9f0f78 --- /dev/null +++ b/autosubmit/helpers/data_transfer.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python + +import collections + +JobRow = collections.namedtuple( + 'JobRow', ['name', 
'queue_time', 'run_time', 'status', 'energy', 'submit', 'start', 'finish', 'ncpus', 'run_id']) \ No newline at end of file diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index d65b2d491bbba600aa8f97100e3c6601fb841226..47ca12f61c3dff72615a708f9caff6702cbf3939 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -45,6 +45,7 @@ from time import sleep from threading import Thread from autosubmit.platforms.paramiko_submitter import ParamikoSubmitter from log.log import Log, AutosubmitCritical, AutosubmitError +from typing import List, Union Log.get_logger("Autosubmit") # A wrapper for encapsulate threads , TODO: Python 3+ to be replaced by the < from concurrent.futures > @@ -529,6 +530,7 @@ class Job(object): return self._get_from_total_stats(1) def get_last_retrials(self): + # type: () -> List[List[Union[datetime.datetime, str]]] """ Returns the retrials of a job, including the last COMPLETED run. The selection stops, and does not include, when the previous COMPLETED job is located or the list of registers is exhausted. @@ -1296,7 +1298,7 @@ class Job(object): exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) exp_history.write_submit_time(self.name, submit=data_time[1], status=Status.VALUE_TO_KEY.get(self.status, "UNKNOWN"), ncpus=self.processors, wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk, - platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.name), + platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name), children=self.children_names_str) def write_start_time(self, enabled = False): @@ -1326,7 +1328,7 @@ class Job(object): exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) exp_history.write_start_time(self.name, start=start_time, status=Status.VALUE_TO_KEY.get(self.status, "UNKNOWN"), ncpus=self.processors, wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk, - platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.name), + platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name), children=self.children_names_str) return True @@ -1366,7 +1368,7 @@ class Job(object): job_data_dc = exp_history.write_finish_time(self.name, finish=finish_time, status=final_status, ncpus=self.processors, wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk, platform=self.platform_name, job_id=self.id, out_file=out, err_file=err, wrapper_queue=self._wrapper_queue, - wrapper_code=get_job_package_code(self.name), children=self.children_names_str) + wrapper_code=get_job_package_code(self.expid, self.name), children=self.children_names_str) # Launch second as threaded function only for slurm if job_data_dc and type(self.platform) is not str and self.platform.type == "slurm": @@ -1401,19 +1403,19 @@ class Job(object): exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) exp_history.write_submit_time(self.name, submit=total_stats[0], 
status=Status.VALUE_TO_KEY.get(self.status, "UNKNOWN"), ncpus=self.processors, wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk, - platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.name), + platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name), children=self.children_names_str) exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) exp_history.write_start_time(self.name, start=total_stats[0], status=Status.VALUE_TO_KEY.get(self.status, "UNKNOWN"), ncpus=self.processors, wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk, - platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.name), + platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name), children=self.children_names_str) exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) job_data_dc = exp_history.write_finish_time(self.name, finish=total_stats[1], status=total_stats[2], ncpus=self.processors, wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk, platform=self.platform_name, job_id=self.id, out_file=out, err_file=err, wrapper_queue=self._wrapper_queue, - wrapper_code=get_job_package_code(self.name), children=self.children_names_str) + wrapper_code=get_job_package_code(self.expid, self.name), children=self.children_names_str) # Launch second as threaded function only for slurm if job_data_dc and type(self.platform) is not str and self.platform.type == "slurm": thread_write_finish = Thread(target=ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR).write_platform_data_after_finish, args=(job_data_dc, self.platform)) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 394388890d5e8de43246a15a1d98377938dcbc06..1f63648193dfeebd3e19cb70f15300dd81c6b7c4 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -26,9 +26,13 @@ import json import re import os import pickle -from time import localtime, strftime +import traceback +import math + +from time import localtime, strftime, mktime from shutil import move from autosubmit.job.job import Job +from autosubmit.job.job_package_persistence import JobPackagePersistence from autosubmit.job.job_dict import DicJobs from autosubmit.job.job_utils import Dependency from autosubmit.job.job_common import Status, bcolors @@ -42,6 +46,7 @@ from threading import Thread, Lock import multiprocessing from autosubmit.config.basicConfig import BasicConfig from autosubmit.config.config_common import AutosubmitConfig +from autosubmit.helpers.data_transfer import JobRow from typing import List, Dict import log.fd_show # Log.get_logger("Log.Autosubmit") @@ -2084,3 +2089,279 @@ class JobList(object): "] " if nocolor == True else "") return result + + @staticmethod + def retrieve_packages(BasicConfig, expid, current_jobs=None): + """ + Retrieves dictionaries that map the collection of packages in the experiment + + :param BasicConfig: Basic 
configuration + :type BasicConfig: Configuration Object + :param expid: Experiment Id + :type expid: String + :param current_jobs: list of names of current jobs + :type current_jobs: list + :return: job to package, package to jobs, package to package_id, package to symbol + :rtype: Dictionary(Job Object, Package), Dictionary(Package, List of Job Objects), Dictionary(String, String), Dictionary(String, String) + """ + # monitor = Monitor() + packages = None + try: + packages = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), + "job_packages_" + expid).load(wrapper=False) + except Exception as ex: + print("Wrapper table not found, trying packages.") + packages = None + try: + packages = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), + "job_packages_" + expid).load(wrapper=True) + except Exception as exp2: + packages = None + pass + pass + + job_to_package = dict() + package_to_jobs = dict() + package_to_package_id = dict() + package_to_symbol = dict() + if (packages): + try: + for exp, package_name, job_name in packages: + if len(str(package_name).strip()) > 0: + if (current_jobs): + if job_name in current_jobs: + job_to_package[job_name] = package_name + else: + job_to_package[job_name] = package_name + # list_packages.add(package_name) + for name in job_to_package: + package_name = job_to_package[name] + package_to_jobs.setdefault(package_name, []).append(name) + # if package_name not in package_to_jobs.keys(): + # package_to_jobs[package_name] = list() + # package_to_jobs[package_name].append(name) + for key in package_to_jobs: + package_to_package_id[key] = key.split("_")[2] + list_packages = job_to_package.values() + for i in range(len(list_packages)): + if i % 2 == 0: + package_to_symbol[list_packages[i]] = 'square' + else: + package_to_symbol[list_packages[i]] = 'hexagon' + except Exception as ex: + print(traceback.format_exc()) + + return (job_to_package, package_to_jobs, package_to_package_id, package_to_symbol) + + @staticmethod + def retrieve_times(status_code, name, tmp_path, make_exception=False, job_times=None, seconds=False, job_data_collection=None): + """ + Retrieve job timestamps from database. 
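+ Sources are tried in order: the historical job database (job_data_collection), the job's TOTAL_STATS file for jobs that are still active or failed, and the as_times.job_times cache for completed ones.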
+ :param status_code: Code of the Status of the job + :type status_code: Integer + :param name: Name of the job + :type name: String + :param tmp_path: Path to the tmp folder of the experiment + :type tmp_path: String + :param make_exception: flag for testing purposes + :type make_exception: Boolean + :param job_times: Detail from as_times.job_times for the experiment + :type job_times: Dictionary Key: job name, Value: 5-tuple (submit time, start time, finish time, status, detail id) + :param seconds: if True, queue and run times are returned in seconds instead of minutes + :type seconds: Boolean + :param job_data_collection: job data records from the historical database, if available + :type job_data_collection: List of JobData objects, or None + :return: row with the job's queue time, run time, status, energy, and submit/start/finish timestamps + :rtype: JobRow + """ + status = "NA" + energy = 0 + seconds_queued = 0 + seconds_running = 0 + queue_time = running_time = 0 + submit_time = datetime.timedelta() + start_time = datetime.timedelta() + finish_time = datetime.timedelta() + running_for_min = datetime.timedelta() + queuing_for_min = datetime.timedelta() + + try: + # Getting data from new job database + if job_data_collection is not None: + job_data = next( + (job for job in job_data_collection if job.job_name == name), None) + if job_data: + status = Status.VALUE_TO_KEY[status_code] + if status == job_data.status: + energy = job_data.energy + t_submit = job_data.submit + t_start = job_data.start + t_finish = job_data.finish + # Test if start time does not make sense + if t_start >= t_finish: + if job_times: + _, c_start, c_finish, _, _ = job_times.get( + name, (0, t_start, t_finish, 0, 0)) + t_start = c_start if t_start > c_start else t_start + job_data.start = t_start + + if seconds == False: + queue_time = math.ceil( + job_data.queuing_time() / 60) + running_time = math.ceil( + job_data.running_time() / 60) + else: + queue_time = job_data.queuing_time() + running_time = job_data.running_time() + + if status_code in [Status.SUSPENDED]: + t_submit = t_start = t_finish = 0 + + return JobRow(job_data.job_name, int(queue_time), int(running_time), status, energy, JobList.ts_to_datetime(t_submit), JobList.ts_to_datetime(t_start), JobList.ts_to_datetime(t_finish), job_data.ncpus, job_data.run_id) + + # Using standard procedure + if status_code in [Status.RUNNING, Status.SUBMITTED, Status.QUEUING, Status.FAILED] or make_exception == True: + # COMPLETED adds too much overhead so these values are now stored in a database and retrieved separately + submit_time, start_time, finish_time, status = JobList._job_running_check( + status_code, name, tmp_path) + if status_code in [Status.RUNNING, Status.FAILED]: + running_for_min = (finish_time - start_time) + queuing_for_min = (start_time - submit_time) + submit_time = mktime(submit_time.timetuple()) + start_time = mktime(start_time.timetuple()) + finish_time = mktime(finish_time.timetuple()) if status_code in [ + Status.FAILED] else 0 + else: + queuing_for_min = ( + datetime.datetime.now() - submit_time) + running_for_min = datetime.timedelta() # zero: the job has not started running yet + submit_time = mktime(submit_time.timetuple()) + start_time = 0 + finish_time = 0 + + submit_time = int(submit_time) + start_time = int(start_time) + finish_time = int(finish_time) + seconds_queued = queuing_for_min.total_seconds() + seconds_running = running_for_min.total_seconds() + + else: + # For job times completed we no longer use timedeltas, but timestamps + status = Status.VALUE_TO_KEY[status_code] + if (job_times) and status_code not in [Status.READY, Status.WAITING, Status.SUSPENDED]: + if name in job_times.keys(): + submit_time, start_time, finish_time, status, detail_id = job_times[ + name] + seconds_running = 
finish_time - start_time + seconds_queued = start_time - submit_time + submit_time = int(submit_time) + start_time = int(start_time) + finish_time = int(finish_time) + else: + submit_time = 0 + start_time = 0 + finish_time = 0 + + except Exception as exp: + print(traceback.format_exc()) + return + + seconds_queued = seconds_queued * \ + (-1) if seconds_queued < 0 else seconds_queued + seconds_running = seconds_running * \ + (-1) if seconds_running < 0 else seconds_running + if seconds == False: + queue_time = math.ceil( + seconds_queued / 60) if seconds_queued > 0 else 0 + running_time = math.ceil( + seconds_running / 60) if seconds_running > 0 else 0 + else: + queue_time = seconds_queued + running_time = seconds_running + + return JobRow(name, + int(queue_time), + int(running_time), + status, + energy, + JobList.ts_to_datetime(submit_time), + JobList.ts_to_datetime(start_time), + JobList.ts_to_datetime(finish_time), + 0, + 0) + + @staticmethod + def _job_running_check(status_code, name, tmp_path): + """ + Receives job data and returns the data from its TOTAL_STATS file in an ordered way. + :param status_code: Status of job + :type status_code: Integer + :param name: Name of job + :type name: String + :param tmp_path: Path to the tmp folder of the experiment + :type tmp_path: String + :return: submit time, start time, end time, status + :rtype: 4-tuple in datetime format + """ + # name = "a2d0_20161226_001_124_ARCHIVE" + values = list() + status_from_job = str(Status.VALUE_TO_KEY[status_code]) + now = datetime.datetime.now() + submit_time = now + start_time = now + finish_time = now + current_status = status_from_job + path = os.path.join(tmp_path, name + '_TOTAL_STATS') + # print("Looking in " + path) + if os.path.exists(path): + request = 'tail -1 ' + path + last_line = os.popen(request).readline() + # print(last_line) + + values = last_line.split() + # print(last_line) + try: + if status_code in [Status.RUNNING]: + submit_time = parse_date( + values[0]) if len(values) > 0 else now + start_time = parse_date(values[1]) if len( + values) > 1 else submit_time + finish_time = now + elif status_code in [Status.QUEUING, Status.SUBMITTED, Status.HELD]: + submit_time = parse_date( + values[0]) if len(values) > 0 else now + start_time = parse_date( + values[1]) if len(values) > 1 and values[0] != values[1] else now + elif status_code in [Status.COMPLETED]: + submit_time = parse_date( + values[0]) if len(values) > 0 else now + start_time = parse_date( + values[1]) if len(values) > 1 else submit_time + if len(values) > 3: + finish_time = parse_date(values[len(values) - 2]) + else: + finish_time = submit_time + else: + submit_time = parse_date( + values[0]) if len(values) > 0 else now + start_time = parse_date(values[1]) if len( + values) > 1 else submit_time + finish_time = parse_date(values[2]) if len( + values) > 2 else start_time + except Exception as exp: + start_time = now + finish_time = now + # NA if reading fails + current_status = "NA" + + current_status = values[3] if (len(values) > 3 and len( + values[3]) != 14) else status_from_job + # TOTAL_STATS last line has more than 3 items, status is different from pkl, and status is not "NA" + if len(values) > 3 and current_status != status_from_job and current_status != "NA": + current_status = "SUSPICIOUS" + return (submit_time, start_time, finish_time, current_status) + + @staticmethod + def ts_to_datetime(timestamp): + if timestamp and timestamp > 0: + # print(datetime.datetime.utcfromtimestamp( + # timestamp).strftime('%Y-%m-%d %H:%M:%S')) + 
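# Note: fromtimestamp formats the timestamp in the machine's local timezone, whereas the commented-out debug line above used utcfromtimestamp (UTC). + 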
return datetime.datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S') + else: + return None \ No newline at end of file diff --git a/autosubmit/job/job_utils.py b/autosubmit/job/job_utils.py index 7d7d8a438c651d9e9ac9b5dc4e31dd9b56a2a574..9e9cda943ecc833fc21fd63279bbab1abc78fe77 100644 --- a/autosubmit/job/job_utils.py +++ b/autosubmit/job/job_utils.py @@ -25,6 +25,8 @@ from networkx import DiGraph from networkx import dfs_edges from networkx import NetworkXError from autosubmit.job.job_package_persistence import JobPackagePersistence +from autosubmit.config.basicConfig import BasicConfig +from typing import Dict def transitive_reduction(graph): @@ -43,7 +45,8 @@ def transitive_reduction(graph): reduced_graph.add_edges_from((u, v) for v in u_edges) return reduced_graph -def get_job_package_code(job_name): +def get_job_package_code(expid, job_name): + # type: (str, str) -> int """ Finds the package code and retrieves it. None if no package. @@ -57,8 +60,10 @@ def get_job_package_code(job_name): :rtype: int or None """ try: - packages_wrapper = JobPackagePersistence(os.path.join(self.basic_conf.LOCAL_ROOT_DIR, self.expid, "pkl"),"job_packages_" + self.expid).load(wrapper=True) - packages_wrapper_plus = JobPackagePersistence(os.path.join(self.basic_conf.LOCAL_ROOT_DIR, self.expid, "pkl"),"job_packages_" + self.expid).load(wrapper=False) + basic_conf = BasicConfig() + basic_conf.read() + packages_wrapper = JobPackagePersistence(os.path.join(basic_conf.LOCAL_ROOT_DIR, expid, "pkl"),"job_packages_" + expid).load(wrapper=True) + packages_wrapper_plus = JobPackagePersistence(os.path.join(basic_conf.LOCAL_ROOT_DIR, expid, "pkl"),"job_packages_" + expid).load(wrapper=False) if (packages_wrapper or packages_wrapper_plus): packages = packages_wrapper if len(packages_wrapper) > len(packages_wrapper_plus) else packages_wrapper_plus for exp, package_name, _job_name in packages: @@ -69,6 +74,7 @@ def get_job_package_code(job_name): pass return 0 + class Dependency(object): """ Class to manage the metadata related with a dependency @@ -98,3 +104,179 @@ class Dependency(object): self.select_members_orig.append(member_relation[1]) else: self.select_members_orig.append([]) + + +class SimpleJob(object): + """ + A simple replacement for jobs + """ + + def __init__(self, name, tmppath, statuscode): + self.name = name + self._tmp_path = tmppath + self.status = statuscode + + +class SubJob(object): + """ + Class to manage package times + """ + + def __init__(self, name, package=None, queue=0, run=0, total=0, status="UNKNOWN"): + self.name = name + self.package = package + self.queue = queue + self.run = run + self.total = total + self.status = status + self.transit = 0 + self.parents = list() + self.children = list() + + +class SubJobManager(object): + """ + Class to manage list of SubJobs + """ + + def __init__(self, subjoblist, job_to_package=None, package_to_jobs=None, current_structure=None): + self.subjobList = subjoblist + # print("Number of jobs in SubManager : {}".format(len(self.subjobList))) + self.job_to_package = job_to_package + self.package_to_jobs = package_to_jobs + self.current_structure = current_structure + self.subjobindex = dict() + self.subjobfixes = dict() + self.process_index() + self.process_times() + + def process_index(self): + """ + Builds a dictionary of jobname -> SubJob object. 
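+ The index gives process_times constant-time access to any SubJob by name.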
+ """ + for subjob in self.subjobList: + self.subjobindex[subjob.name] = subjob + + def process_times(self): + """ + """ + if (self.job_to_package) and (self.package_to_jobs): + if(self.current_structure) and len(self.current_structure.keys()) > 0: + # Structure exists + new_queues = dict() + fixes_applied = dict() + for package in self.package_to_jobs: + # SubJobs in Package + local_structure = dict() + # SubJob Name -> SubJob Object + local_index = dict() + subjobs_in_package = filter(lambda x: x.package == + package, self.subjobList) + local_jobs_in_package = [job for job in subjobs_in_package] + # Build index + for sub in local_jobs_in_package: + local_index[sub.name] = sub + # Build structure + for sub_job in local_jobs_in_package: + # If job in current_structure, store children names in dictionary + # local_structure: Job Name -> Children (if present in the Job package) + local_structure[sub_job.name] = [v for v in self.current_structure[sub_job.name] + if v in self.package_to_jobs[package]] if sub_job.name in self.current_structure else list() + # Assign children to SubJob in local_jobs_in_package + sub_job.children = local_structure[sub_job.name] + # Assign sub_job Name as a parent of each of its children + for child in local_structure[sub_job.name]: + local_index[child].parents.append(sub_job.name) + + # Identify root as the job with no parents in the package + roots = [sub for sub in local_jobs_in_package if len( + sub.parents) == 0] + + # While roots exists (consider pop) + while(len(roots) > 0): + sub = roots.pop(0) + if len(sub.children) > 0: + for sub_children_name in sub.children: + if sub_children_name not in new_queues: + # Add children to root to continue the sequence of fixes + roots.append( + local_index[sub_children_name]) + fix_size = max(self.subjobindex[sub.name].queue + + self.subjobindex[sub.name].run, 0) + # fixes_applied.setdefault(sub_children_name, []).append(fix_size) # If we care about repetition + # Retain the greater fix size + if fix_size > fixes_applied.get(sub_children_name, 0): + fixes_applied[sub_children_name] = fix_size + fixed_queue_time = max( + self.subjobindex[sub_children_name].queue - fix_size, 0) + new_queues[sub_children_name] = fixed_queue_time + # print(new_queues[sub_name]) + + for key, value in new_queues.items(): + self.subjobindex[key].queue = value + # print("{} : {}".format(key, value)) + for name in fixes_applied: + self.subjobfixes[name] = fixes_applied[name] + + else: + # There is no structure + for package in self.package_to_jobs: + # Filter only jobs in the current package + filtered = filter(lambda x: x.package == + package, self.subjobList) + # Order jobs by total time (queue + run) + filtered = sorted( + filtered, key=lambda x: x.total, reverse=False) + # Sizes of fixes + fixes_applied = dict() + if len(filtered) > 1: + temp_index = 0 + filtered[0].transit = 0 + # Reverse for + for i in range(len(filtered) - 1, 0, -1): + # Assume that the total time of the next job is always smaller than + # the queue time of the current job + # because the queue time of the current also considers the + # total time of the previous (next because of reversed for) job by default + # Confusing? It is. 
+ # Assign to transit the adjusted queue time + filtered[i].transit = max(filtered[i].queue - + filtered[i - 1].total, 0) + + # Positive or zero transit time + positive = len( + [job for job in filtered if job.transit >= 0]) + + if (positive > 1): + for i in range(0, len(filtered)): + if filtered[i].transit >= 0: + temp_index = i + if i > 0: + # Only consider after the first job + filtered[i].queue = max(filtered[i].queue - + filtered[i - 1].total, 0) + fixes_applied[filtered[i].name] = filtered[i - 1].total + else: + filtered[i].queue = max(filtered[i].queue - + filtered[temp_index].total, 0) + fixes_applied[filtered[i].name] = filtered[temp_index].total + # it is the start of a level + + for sub in filtered: + self.subjobindex[sub.name].queue = sub.queue + # print("{} : {}".format(sub.name, sub.queue)) + for name in fixes_applied: + self.subjobfixes[name] = fixes_applied[name] + + def get_subjoblist(self): + """ + Returns the list of SubJob objects with their corrected queue times + in the case of jobs that belong to a wrapper. + """ + return self.subjobList + + def get_collection_of_fixes_applied(self): + # type: () -> Dict[str, int] + """ + Returns the map of job name to the queue-time correction (in seconds) + that was applied to it. + """ + return self.subjobfixes \ No newline at end of file diff --git a/autosubmit/monitor/diagram.py b/autosubmit/monitor/diagram.py index 27253191c7da45a7842b197edbaf25e1c424b8f9..11e57522343e29701f864aef106b79d21c331e1a 100644 --- a/autosubmit/monitor/diagram.py +++ b/autosubmit/monitor/diagram.py @@ -17,21 +17,32 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . +import traceback import numpy as np +import matplotlib as mtp +mtp.use('Agg') import matplotlib.pyplot as plt import matplotlib.gridspec as gridspec import matplotlib.patches as mpatches -from autosubmit.experiment.statistics import ExperimentStats +# from autosubmit.experiment.statistics import ExperimentStats +from autosubmit.statistics.statistics import Statistics +from autosubmit.job.job import Job from log.log import Log, AutosubmitCritical, AutosubmitError +from datetime import datetime +from typing import Dict, List Log.get_logger("Autosubmit") # Autosubmit stats constants RATIO = 4 MAX_JOBS_PER_PLOT = 12.0 -MAX_NUM_PLOTS = 20 +MAX_NUM_PLOTS = 40 - -def create_bar_diagram(experiment_id, jobs_list, general_stats, output_file, period_ini=None, period_fi=None): + +def create_bar_diagram(experiment_id, jobs_list, general_stats, output_file, period_ini=None, period_fi=None, queue_time_fixes=None): + # type: (str, List[Job], List[str], str, datetime, datetime, Dict[str, int]) -> None """ Creates a bar diagram of the statistics. 
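+ Completed runs and failed attempts are drawn as separate groups of subplots, and the queue times of wrapped jobs arrive already corrected through queue_time_fixes.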
@@ -50,59 +61,115 @@ def create_bar_diagram(experiment_id, jobs_list, general_stats, output_file, per """ # Error prevention plt.close('all') + try: + exp_stats = Statistics(jobs_list, period_ini, period_fi, queue_time_fixes) + exp_stats.calculate_statistics() + exp_stats.calculate_summary() + exp_stats.make_old_format() + failed_jobs_dict = exp_stats.build_failed_jobs_only_list() + except Exception as exp: + print(exp) + print(traceback.format_exc()) + raise # exp_stats and failed_jobs_dict are undefined past this point, so surface the error + # Stats variables definition - num_plots = int(np.ceil(len(jobs_list) / MAX_JOBS_PER_PLOT)) - ind = np.arange(int(MAX_JOBS_PER_PLOT)) + normal_plots_count = int(np.ceil(len(exp_stats.jobs_stat) / MAX_JOBS_PER_PLOT)) + failed_jobs_plots_count = int(np.ceil(len(failed_jobs_dict) / MAX_JOBS_PER_PLOT)) + total_plots_count = normal_plots_count + failed_jobs_plots_count width = 0.16 # Creating stats figure + sanity check - if num_plots > MAX_NUM_PLOTS: + if total_plots_count > MAX_NUM_PLOTS: message = "The results are too large to be shown, try narrowing your query. \n Use a filter like -ft where you supply a list of job types, e.g. INI, SIM; \ -or -fp where you supply an integer that represents the number of hours into the past that should be queried: \ -suppose it is noon, if you supply -fp 5 the query will consider changes starting from 7:00 am. If you really wish to query the whole experiment, refer to Autosubmit GUI." + or -fp where you supply an integer that represents the number of hours into the past that should be queried: \ + suppose it is noon, if you supply -fp 5 the query will consider changes starting from 7:00 am. If you really wish to query the whole experiment, refer to Autosubmit GUI." Log.info(message) raise AutosubmitCritical("Stats query out of bounds", 7061, message) - exp_stats = ExperimentStats(jobs_list, period_ini, period_fi) - fig = plt.figure(figsize=(RATIO * 4, 3 * RATIO * num_plots)) + + + fig = plt.figure(figsize=(RATIO * 4, 3 * RATIO * total_plots_count)) fig.suptitle('STATS - ' + experiment_id, fontsize=24, fontweight='bold') # Variables initialization ax, ax2 = [], [] - rects = [None] * 6 - - grid_spec = gridspec.GridSpec(RATIO * num_plots + 2, 1) - - for plot in xrange(1, num_plots + 1): - # Calculating jobs inside the given plot - l1 = int((plot - 1) * MAX_JOBS_PER_PLOT) - l2 = int(plot * MAX_JOBS_PER_PLOT) - # Building plot axis - ax.append(fig.add_subplot( - grid_spec[RATIO * plot - RATIO + 2:RATIO * plot + 1])) - ax[plot - 1].set_ylabel('hours') - ax[plot - 1].set_xticks(ind + width) - ax[plot - 1].set_xticklabels( - [job.name for job in jobs_list[l1:l2]], rotation='vertical') - ax[plot - 1].set_title(experiment_id, fontsize=20) - ax[plot - 1].set_ylim(0, float(1.10 * exp_stats.max_time)) - # Axis 2 - ax2.append(ax[plot - 1].twinx()) - ax2[plot - 1].set_ylabel('# failed jobs') - ax2[plot - 1].set_yticks(range(0, exp_stats.max_fail + 2)) - ax2[plot - 1].set_ylim(0, exp_stats.max_fail + 1) - # Building rects - rects[0] = ax[plot - - 1].bar(ind, exp_stats.queued[l1:l2], width, color='orchid') - rects[1] = ax[plot - 1].bar(ind + width, - exp_stats.run[l1:l2], width, color='limegreen') - rects[2] = ax2[plot - 1].bar(ind + width * 2, - exp_stats.failed_jobs[l1:l2], width, color='red') - rects[3] = ax[plot - 1].bar(ind + width * 3, - exp_stats.fail_queued[l1:l2], width, color='purple') - rects[4] = ax[plot - 1].bar(ind + width * 4, - exp_stats.fail_run[l1:l2], width, color='tomato') - rects[5] = ax[plot - 1].plot([0., width * 6 * MAX_JOBS_PER_PLOT], 
[exp_stats.threshold, exp_stats.threshold], - "k--", label='wallclock sim') + rects = [None] * 5 + # print("Normal plots: {}".format(normal_plots_count)) + # print("Failed jobs plots: {}".format(failed_jobs_plots_count)) + # print("Total plots: {}".format(total_plots_count)) + grid_spec = gridspec.GridSpec(RATIO * total_plots_count + 2, 1) + i_plot = 0 + for plot in xrange(1, normal_plots_count + 1): + try: + # Calculating jobs inside the given plot + l1 = int((plot - 1) * MAX_JOBS_PER_PLOT) + l2 = min(int(plot * MAX_JOBS_PER_PLOT), len(exp_stats.jobs_stat)) + if l2 - l1 <= 0: + continue + ind = np.arange(l2 - l1) + # print("TOTAL {}".format(len(exp_stats.jobs_stat))) + # print("{} -> {}".format(l1,l2)) + # print(ind) + # Building plot axis + ax.append(fig.add_subplot(grid_spec[RATIO * plot - RATIO + 2:RATIO * plot + 1])) + ax[plot - 1].set_ylabel('hours') + ax[plot - 1].set_xticks(ind + width) + ax[plot - 1].set_xticklabels( + [job.name for job in jobs_list[l1:l2]], rotation='vertical') + ax[plot - 1].set_title(experiment_id, fontsize=20) + upper_limit = round(1.10 * exp_stats.max_time, 4) + ax[plot - 1].set_yticks(np.arange(0, upper_limit, round(upper_limit/10, 4))) + ax[plot - 1].set_ylim(0, float(1.10 * exp_stats.max_time)) + # Axis 2 + # ax2.append(ax[plot - 1].twinx()) + # ax2[plot - 1].set_ylabel('# failed jobs') + # ax2[plot - 1].set_yticks(range(0, exp_stats.max_fail + 2)) + # ax2[plot - 1].set_ylim(0, exp_stats.max_fail + 1) + # Building rects + rects[0] = ax[plot - 1].bar(ind, exp_stats.queued[l1:l2], width, color='lightpink') + rects[1] = ax[plot - 1].bar(ind + width, exp_stats.run[l1:l2], width, color='green') + # rects[2] = ax2[plot - 1].bar(ind + width * 2, exp_stats.failed_jobs[l1:l2], width, color='red') + rects[2] = ax[plot - 1].bar(ind + width * 3, exp_stats.fail_queued[l1:l2], width, color='lightsalmon') + rects[3] = ax[plot - 1].bar(ind + width * 4, exp_stats.fail_run[l1:l2], width, color='salmon') + rects[4] = ax[plot - 1].plot([0., width * 6 * MAX_JOBS_PER_PLOT], [exp_stats.threshold, exp_stats.threshold], "k--", label='wallclock sim') + i_plot = plot + except Exception as exp: + print(traceback.format_exc()) + print(exp) + + job_names_in_failed = [name for name in exp_stats.failed_jobs_dict] + failed_jobs_rects = [None] + for j_plot in range(1, failed_jobs_plots_count + 1): + try: + l1 = int((j_plot - 1) * MAX_JOBS_PER_PLOT) + l2 = min(int(j_plot * MAX_JOBS_PER_PLOT), len(job_names_in_failed)) + if l2 - l1 <= 0: + continue + ind = np.arange(l2 - l1) + # print("TOTAL {}".format(len(job_names_in_failed))) + # print("{} -> {}".format(l1,l2)) + # print(ind) + # print("i {}, j {}".format(i_plot, j_plot)) + plot = i_plot + j_plot + ax.append(fig.add_subplot(grid_spec[RATIO * plot - RATIO + 2:RATIO * plot + 1])) + # print("len {}, plot {}".format(len(ax), plot)) + ax[plot - 1].set_ylabel('# failed attempts') + ax[plot - 1].set_xticks(ind + width) + ax[plot - 1].set_xticklabels([name for name in job_names_in_failed[l1:l2]], rotation='vertical') + ax[plot - 1].set_title(experiment_id, fontsize=20) + ax[plot - 1].set_ylim(0, float(1.10 * exp_stats.max_fail)) + ax[plot - 1].set_yticks(range(0, exp_stats.max_fail + 2)) + # Axis 2 + # ax2.append(ax[plot - 1].twinx()) + # ax2[plot - 1].set_ylabel('# failed attempts') + # ax2[plot - 1].set_yticks(range(0, exp_stats.max_fail + 2)) + # ax2[plot - 1].set_ylim(0, exp_stats.max_fail + 1) + failed_jobs_rects[0] = ax[plot - 1].bar(ind + width * 2, [exp_stats.failed_jobs_dict[name] for name in job_names_in_failed[l1:l2]], width, 
color='red') + except Exception as exp: + print(traceback.format_exc()) + print(exp) + + # Building legends subplot legends_plot = fig.add_subplot(grid_spec[0, 0]) @@ -110,14 +177,19 @@ suppose it is noon, if you supply -fp 5 the query will consider changes starting legends_plot.axes.get_xaxis().set_visible(False) legends_plot.axes.get_yaxis().set_visible(False) - # Building legends - build_legends(legends_plot, rects, exp_stats, general_stats) - - # Saving output figure - grid_spec.tight_layout(fig, rect=[0, 0.03, 1, 0.97]) - plt.savefig(output_file) + try: + # Building legends + # print("Legends") + build_legends(legends_plot, rects, exp_stats, general_stats) + + # Saving output figure + grid_spec.tight_layout(fig, rect=[0, 0.03, 1, 0.97]) + plt.savefig(output_file) - create_csv_stats(exp_stats, jobs_list, output_file) + create_csv_stats(exp_stats, jobs_list, output_file) + except Exception as exp: + print(exp) + print(traceback.format_exc()) def create_csv_stats(exp_stats, jobs_list, output_file): @@ -137,11 +209,11 @@ def create_csv_stats(exp_stats, jobs_list, output_file): def build_legends(plot, rects, experiment_stats, general_stats): + # type: (plt.figure, List[plt.bar], Statistics, List[str]) -> None # Main legend with colourful rectangles legend_rects = [[rect[0] for rect in rects]] legend_titles = [ - ['Queued (h)', 'Run (h)', 'Failed jobs (#)', - 'Fail Queued (h)', 'Fail Run (h)', 'Max wallclock (h)'] + ['Queued (h)', 'Run (h)', 'Fail Queued (h)', 'Fail Run (h)', 'Max wallclock (h)'] ] legend_locs = ["upper right"] legend_handlelengths = [None] @@ -149,20 +221,19 @@ def build_legends(plot, rects, experiment_stats, general_stats): # General stats legends, if exists if len(general_stats) > 0: legend_rects.append(get_whites_array(len(general_stats))) - legend_titles.append([str(key) + ': ' + str(value) - for key, value in general_stats]) + legend_titles.append([str(key) + ': ' + str(value) for key, value in general_stats]) legend_locs.append("upper center") legend_handlelengths.append(0) # Total stats legend - legend_rects.append(get_whites_array(len(experiment_stats.totals))) - legend_titles.append(experiment_stats.totals) + stats_summary_as_list = experiment_stats.get_summary_as_list() + legend_rects.append(get_whites_array(len(stats_summary_as_list))) + legend_titles.append(stats_summary_as_list) legend_locs.append("upper left") legend_handlelengths.append(0) # Creating the legends - legends = create_legends( - plot, legend_rects, legend_titles, legend_locs, legend_handlelengths) + legends = create_legends(plot, legend_rects, legend_titles, legend_locs, legend_handlelengths) for legend in legends: plt.gca().add_artist(legend) diff --git a/autosubmit/monitor/monitor.py b/autosubmit/monitor/monitor.py index 025d5db42b02c8a3ae0810f394ace77d1d257a2d..2301967e83219001976d86700a030e724c2016a6 100644 --- a/autosubmit/monitor/monitor.py +++ b/autosubmit/monitor/monitor.py @@ -31,12 +31,14 @@ import copy import subprocess from autosubmit.job.job_common import Status +from autosubmit.job.job import Job from autosubmit.config.basicConfig import BasicConfig from autosubmit.config.config_common import AutosubmitConfig from log.log import Log, AutosubmitError, AutosubmitCritical from bscearth.utils.config_parser import ConfigParserFactory from diagram import create_bar_diagram +from typing import Dict, List class Monitor: @@ -394,7 +396,8 @@ class Monitor: self.write_output_txt_recursive( child, output_file, "_" + level, path) - def generate_output_stats(self, expid, joblist, 
output_format="pdf", period_ini=None, period_fi=None, show=False): + def generate_output_stats(self, expid, joblist, output_format="pdf", period_ini=None, period_fi=None, show=False, queue_time_fixes=None): + # type: (str, List[Job], str, datetime.datetime, datetime.datetime, bool, Dict[str, int]) -> None """ Plots stats for joblist and stores it in a file @@ -422,8 +425,7 @@ class Monitor: output_file = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "stats", expid + "_statistics_" + output_date + "." + output_format) - create_bar_diagram(expid, joblist, self.get_general_stats( - expid), output_file, period_ini, period_fi) + create_bar_diagram(expid, joblist, self.get_general_stats(expid), output_file, period_ini, period_fi, queue_time_fixes) Log.result('Stats created at {0}', output_file) if show: try: @@ -475,6 +477,7 @@ class Monitor: @staticmethod def get_general_stats(expid): + # type: (str) -> List[str] """ Returns all the options in the sections of the %expid%_GENERAL_STATS @@ -486,9 +489,10 @@ class Monitor: general_stats = [] general_stats_path = os.path.join( BasicConfig.LOCAL_ROOT_DIR, expid, "tmp", expid + "_GENERAL_STATS") - parser = AutosubmitConfig.get_parser( - ConfigParserFactory(), general_stats_path) - for section in parser.sections(): - general_stats.append((section, '')) - general_stats += parser.items(section) + if os.path.exists(general_stats_path): + parser = AutosubmitConfig.get_parser( + ConfigParserFactory(), general_stats_path) + for section in parser.sections(): + general_stats.append((section, '')) + general_stats += parser.items(section) return general_stats diff --git a/autosubmit/statistics/__init__.py b/autosubmit/statistics/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/autosubmit/statistics/jobs_stat.py b/autosubmit/statistics/jobs_stat.py new file mode 100644 index 0000000000000000000000000000000000000000..5f5a0af6c6aabc81b2844373fdc098a0306e6c86 --- /dev/null +++ b/autosubmit/statistics/jobs_stat.py @@ -0,0 +1,70 @@ +#!/bin/env/python +from datetime import datetime, timedelta +from utils import timedelta2hours + +class JobStat(object): + def __init__(self, name, processors, wallclock): + # type: (str, int, float) -> None + self._name = name + self._processors = processors + self._wallclock = wallclock + self.submit_time = None # type: datetime + self.start_time = None # type: datetime + self.finish_time = None # type: datetime + self.completed_queue_time = timedelta() + self.completed_run_time = timedelta() + self.failed_queue_time = timedelta() + self.failed_run_time = timedelta() + self.retrial_count = 0 + self.completed_retrial_count = 0 + self.failed_retrial_count = 0 + + def inc_retrial_count(self): + self.retrial_count += 1 + + def inc_completed_retrial_count(self): + self.completed_retrial_count += 1 + + def inc_failed_retrial_count(self): + self.failed_retrial_count += 1 + + @property + def cpu_consumption(self): + return timedelta2hours(self._processors * self.completed_run_time) + timedelta2hours(self._processors * self.failed_run_time) + + @property + def failed_cpu_consumption(self): + return timedelta2hours(self._processors * self.failed_run_time) + + @property + def real_consumption(self): + return timedelta2hours(self.failed_run_time + self.completed_run_time) + + @property + def expected_real_consumption(self): + return self._wallclock + + @property + def expected_cpu_consumption(self): + return self._wallclock * self._processors + + def 
get_as_dict(self): + return { + "name": self._name, + "processors": self._processors, + "wallclock": self._wallclock, + "completedQueueTime": timedelta2hours(self.completed_queue_time), + "completedRunTime": timedelta2hours(self.completed_run_time), + "failedQueueTime": timedelta2hours(self.failed_queue_time), + "failedRunTime": timedelta2hours(self.failed_run_time), + "cpuConsumption": self.cpu_consumption, + "failedCpuConsumption": self.failed_cpu_consumption, + "expectedCpuConsumption": self.expected_cpu_consumption, + "realConsumption": self.real_consumption, + "failedRealConsumption": timedelta2hours(self.failed_run_time), + "expectedConsumption": self.expected_real_consumption, + "retrialCount": self.retrial_count, + "submittedCount": self.retrial_count, + "completedCount": self.completed_retrial_count, + "failedCount": self.failed_retrial_count + } diff --git a/autosubmit/statistics/statistics.py b/autosubmit/statistics/statistics.py new file mode 100644 index 0000000000000000000000000000000000000000..db7f2ceb3420a20786b8339c1dcd1ab44ae05efa --- /dev/null +++ b/autosubmit/statistics/statistics.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python + +from datetime import datetime, timedelta +from autosubmit.job.job import Job +from jobs_stat import JobStat +from stats_summary import StatsSummary +from utils import timedelta2hours +from typing import List, Union, Dict +# from collections import namedtuple + +_COMPLETED_RETRIAL = 1 +_FAILED_RETRIAL = 0 + +class Statistics(object): + + def __init__(self, jobs, start, end, queue_time_fix): + # type: (List[Job], datetime, datetime, Dict[str, int]) -> None + """ + Computes per-job and aggregated statistics for the given jobs between start and end; queue_time_fix maps job names to the queue-time correction (in seconds) for jobs that ran inside wrappers. + """ + self._jobs = jobs + self._start = start + self._end = end + self._queue_time_fixes = queue_time_fix + self._name_to_jobstat_dict = dict() # type: Dict[str, JobStat] + self.jobs_stat = [] # type: List[JobStat] + # Old format + self.max_time = 0.0 # type: float + self.max_fail = 0 # type: int + self.start_times = [] # type: List[Union[datetime, None]] + self.end_times = [] # type: List[Union[datetime, None]] + self.queued = [] # type: List[timedelta] + self.run = [] # type: List[timedelta] + self.failed_jobs = [] # type: List[int] + self.fail_queued = [] # type: List[timedelta] + self.fail_run = [] # type: List[timedelta] + self.wallclocks = [] # type: List[float] + self.threshold = 0.0 # type: float + self.failed_jobs_dict = {} # type: Dict[str, int] + self.summary = StatsSummary() + self.totals = [" Description text \n", "Line 1"] + + def calculate_statistics(self): + # type: () -> List[JobStat] + for index, job in enumerate(self._jobs): + retrials = job.get_last_retrials() + for retrial in retrials: + # print(retrial) + job_stat = self._name_to_jobstat_dict.setdefault( + job.name, JobStat(job.name, job.total_processors, job.total_wallclock)) + job_stat.inc_retrial_count() + if Job.is_a_completed_retrial(retrial): + job_stat.inc_completed_retrial_count() + job_stat.submit_time = retrial[0] + job_stat.start_time = retrial[1] + job_stat.finish_time = retrial[2] + adjusted_queue = max(job_stat.start_time - job_stat.submit_time, timedelta()) - timedelta(seconds=self._queue_time_fixes.get(job.name, 0)) + job_stat.completed_queue_time += max(adjusted_queue, timedelta()) + job_stat.completed_run_time += max(job_stat.finish_time - job_stat.start_time, timedelta()) + else: + job_stat.inc_failed_retrial_count() + job_stat.submit_time = retrial[0] if len(retrial) >= 1 and type(retrial[0]) == datetime else None + job_stat.start_time = retrial[1] if len(retrial) >= 2 and type(retrial[1]) == datetime 
else None + job_stat.finish_time = retrial[2] if len(retrial) >= 3 and type(retrial[2]) == datetime else None + if job_stat.finish_time and job_stat.start_time: + job_stat.failed_run_time += max(job_stat.finish_time - job_stat.start_time, timedelta()) + if job_stat.start_time and job_stat.submit_time: + adjusted_failed_queue = max(job_stat.start_time - job_stat.submit_time, timedelta()) - timedelta(seconds=self._queue_time_fixes.get(job.name, 0)) + job_stat.failed_queue_time += max(adjusted_failed_queue, timedelta()) + self.jobs_stat = list(self._name_to_jobstat_dict.values()) + return self.jobs_stat + + def calculate_summary(self): + # type: () -> None + stat_summary = StatsSummary() + for job in self.jobs_stat: + job_stat_dict = job.get_as_dict() + # Counters + stat_summary.submitted_count += job_stat_dict["submittedCount"] + stat_summary.run_count += job_stat_dict["retrialCount"] + stat_summary.completed_count += job_stat_dict["completedCount"] + stat_summary.failed_count += job_stat_dict["failedCount"] + # Consumption, accumulated across jobs like the counters above + stat_summary.expected_consumption += job_stat_dict["expectedConsumption"] + stat_summary.real_consumption += job_stat_dict["realConsumption"] + stat_summary.failed_real_consumption += job_stat_dict["failedRealConsumption"] + # CPU Consumption, accumulated across jobs + stat_summary.expected_cpu_consumption += job_stat_dict["expectedCpuConsumption"] + stat_summary.cpu_consumption += job_stat_dict["cpuConsumption"] + stat_summary.failed_cpu_consumption += job_stat_dict["failedCpuConsumption"] + stat_summary.total_queue_time += job_stat_dict["completedQueueTime"] + job_stat_dict["failedQueueTime"] + stat_summary.calculate_consumption_percentage() + self.summary = stat_summary + + def get_summary_as_list(self): + return self.summary.get_as_list() + + def get_statistics(self): + job_stat_list = self.calculate_statistics() + return { + "Period": {"From": str(self._start), "To": str(self._end)}, + "JobStatistics": [job.get_as_dict() for job in job_stat_list] + } + + def make_old_format(self): + # type: () -> None + """ Fills the flat per-job lists consumed by the legacy plotting code """ + self.start_times = [job.start_time for job in self.jobs_stat] + self.end_times = [job.finish_time for job in self.jobs_stat] + self.queued = [timedelta2hours(job.completed_queue_time) for job in self.jobs_stat] + self.run = [timedelta2hours(job.completed_run_time) for job in self.jobs_stat] + self.failed_jobs = [job.failed_retrial_count for job in self.jobs_stat] + self.max_fail = max(self.failed_jobs) + self.fail_run = [timedelta2hours(job.failed_run_time) for job in self.jobs_stat] + self.fail_queued = [timedelta2hours(job.failed_queue_time) for job in self.jobs_stat] + self.wallclocks = [job.expected_real_consumption for job in self.jobs_stat] + self.threshold = max(self.wallclocks) + max_queue = max(self.queued) + max_run = max(self.run) + max_fail_queue = max(self.fail_queued) + max_fail_run = max(self.fail_run) + self.max_time = max(max_queue, max_run, max_fail_queue, max_fail_run, self.threshold) + + def build_failed_jobs_only_list(self): + # type: () -> Dict[str, int] + for i, job in enumerate(self.jobs_stat): + if self.failed_jobs[i] > 0: + self.failed_jobs_dict[job._name] = self.failed_jobs[i] + return self.failed_jobs_dict diff --git a/autosubmit/statistics/stats_summary.py b/autosubmit/statistics/stats_summary.py new file mode 100644 index 0000000000000000000000000000000000000000..f96f6109df2ad630f33425d9a42df24997d3b0ce --- /dev/null +++ b/autosubmit/statistics/stats_summary.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python + +class StatsSummary(object): + 
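+ """Aggregates job counters and consumption totals for the whole experiment; get_as_list formats them for the stats report legend."""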
+ def __init__(self): + # Counters + self.submitted_count = 0 + self.run_count = 0 + self.completed_count = 0 + self.failed_count = 0 + # Consumption + self.expected_consumption = 0.0 + self.real_consumption = 0.0 + self.failed_real_consumption = 0.0 + # CPU Consumption + self.expected_cpu_consumption = 0.0 + self.cpu_consumption = 0.0 + self.failed_cpu_consumption = 0.0 + self.total_queue_time = 0.0 + self.cpu_consumption_percentage = 0.0 + + def calculate_consumption_percentage(self): + if self.expected_cpu_consumption > 0.0: + self.cpu_consumption_percentage = round((self.cpu_consumption / self.expected_cpu_consumption) * 100, 2) + + def get_as_list(self): + return [ + "Summary: ", + "{} : {}".format("CPU Consumption Percentage", str(self.cpu_consumption_percentage) + "%"), + "{} : {} hrs.".format("Total Queue Time", round(self.total_queue_time, 2)), + "{} : {}".format("Submitted Count", self.submitted_count), + "{} : {}".format("Run Count", self.run_count), + "{} : {}".format("Completed Count", self.completed_count), + "{} : {}".format("Failed Count", self.failed_count), + "{} : {} hrs.".format("Expected Consumption", round(self.expected_consumption, 4)), + "{} : {} hrs.".format("Real Consumption", round(self.real_consumption, 4)), + "{} : {} hrs.".format("Failed Real Consumption", round(self.failed_real_consumption, 4)), + "{} : {} hrs.".format("Expected CPU Consumption", round(self.expected_cpu_consumption, 4)), + "{} : {} hrs.".format("CPU Consumption", round(self.cpu_consumption, 4)), + "{} : {} hrs.".format("Failed CPU Consumption", round(self.failed_cpu_consumption, 4)) + ] \ No newline at end of file diff --git a/autosubmit/statistics/utils.py b/autosubmit/statistics/utils.py new file mode 100644 index 0000000000000000000000000000000000000000..5c20446b77d48dc5445691856e6e888da8d63ec3 --- /dev/null +++ b/autosubmit/statistics/utils.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python + +from autosubmit.job.job import Job +from datetime import datetime, timedelta +from autosubmit.job.job_common import Status +from typing import List, Tuple + +from log.log import AutosubmitCritical + +def filter_by_section(jobs, section): + # type: (List[Job], str) -> List[Job] + """ Filter jobs by the provided section """ + if section and section != "Any": + return [job for job in jobs if job.section == section] + return jobs + +def discard_ready_and_waiting(jobs): + # type: (List[Job]) -> List[Job] + if jobs and len(jobs) > 0: + return [job for job in jobs if job.status not in [Status.READY, Status.WAITING]] + return jobs + +def filter_by_time_period(jobs, hours_span): + # type: (List[Job], int) -> Tuple[List[Job], datetime, datetime] + current_time = datetime.now().replace(second=0, microsecond=0) + start_time = None + if hours_span: + if hours_span <= 0: + raise AutosubmitCritical("{} is not a valid input for the statistics filter -fp.".format(hours_span)) + start_time = current_time - timedelta(hours=int(hours_span)) + return ([job for job in jobs if job.check_started_after(start_time) or job.check_running_after(start_time)], start_time, current_time) + return (jobs, start_time, current_time) + + +def timedelta2hours(deltatime): + # type: (timedelta) -> float
    return deltatime.days * 24 + deltatime.seconds / 3600.0
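For reference, a minimal sketch (not part of the patch) of how the pieces added above compose; it assumes an already-loaded job_list for an experiment and uses only calls introduced in this diff:

from autosubmit.statistics import utils as StatisticsUtils
from autosubmit.statistics.statistics import Statistics

# Filter the jobs the way `autosubmit stats` now does, then build the statistics
jobs = StatisticsUtils.filter_by_section(job_list.get_job_list(), "SIM")
jobs, start, end = StatisticsUtils.filter_by_time_period(jobs, 24)
exp_stats = Statistics(jobs, start, end, {})  # empty dict: no wrapper queue-time fixes
exp_stats.calculate_statistics()
exp_stats.calculate_summary()
print("\n".join(exp_stats.get_summary_as_list()))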