diff --git a/autosubmit_api/__init__.py b/autosubmit_api/__init__.py index 034db579f116ab1b705a9d07077d6f6ed689f0b5..2ca700a7ad8fe263a6eca75dcd24e162391a779d 100644 --- a/autosubmit_api/__init__.py +++ b/autosubmit_api/__init__.py @@ -19,4 +19,4 @@ __version__ = "4.0.1b2" __author__ = "Luiggi Tenorio, Bruno P. Kinoshita, Cristian GutiƩrrez, Julian Berlin, Wilmer Uruchi" -__credits__ = "Barcelona Supercomputing Center" \ No newline at end of file +__credits__ = "Barcelona Supercomputing Center" diff --git a/autosubmit_api/app.py b/autosubmit_api/app.py index 2d46cdb0074301aaa4c2790aceea59339f847000..eb747403fd5585285e0f0059fa6571e069bffd8e 100644 --- a/autosubmit_api/app.py +++ b/autosubmit_api/app.py @@ -12,15 +12,15 @@ from autosubmit_api.logger import get_app_logger from autosubmit_api.config.basicConfig import APIBasicConfig from autosubmit_api.config import ( PROTECTION_LEVEL, - RUN_BACKGROUND_TASKS_ON_START, CAS_LOGIN_URL, CAS_VERIFY_URL, + get_run_background_tasks_on_start, + get_disable_background_tasks, ) from autosubmit_api.views import handle_HTTP_exception, home from werkzeug.exceptions import HTTPException - def create_app(): """ Autosubmit Flask application factory @@ -59,14 +59,15 @@ def create_app(): "PROTECTION_LEVEL": PROTECTION_LEVEL, "CAS_LOGIN_URL": CAS_LOGIN_URL, "CAS_VERIFY_URL": CAS_VERIFY_URL, - "RUN_BACKGROUND_TASKS_ON_START": RUN_BACKGROUND_TASKS_ON_START, + "DISABLE_BACKGROUND_TASKS": get_disable_background_tasks(), + "RUN_BACKGROUND_TASKS_ON_START": get_run_background_tasks_on_start(), } ) ) # Prepare DB prepare_db() - + # Background Scheduler create_bind_scheduler(app) @@ -75,7 +76,9 @@ def create_app(): app.route("/")(home) v3_blueprint = create_v3_blueprint() - app.register_blueprint(v3_blueprint, name="root") # Add v3 to root but will be DEPRECATED + app.register_blueprint( + v3_blueprint, name="root" + ) # Add v3 to root but will be DEPRECATED app.register_blueprint(v3_blueprint, url_prefix="/v3") v4_blueprint = create_v4_blueprint() diff --git a/autosubmit_api/autosubmit_legacy/job/job_list.py b/autosubmit_api/autosubmit_legacy/job/job_list.py index dfc6fe57cefafc02a16f3bab1a647f1967dba5d7..828f2f78b04ae35240aaa52ec8ff93f2d5b3a4c1 100644 --- a/autosubmit_api/autosubmit_legacy/job/job_list.py +++ b/autosubmit_api/autosubmit_legacy/job/job_list.py @@ -35,7 +35,7 @@ from autosubmit_api.performance.utils import calculate_ASYPD_perjob, calculate_S from autosubmit_api.components.jobs import utils as JUtils from autosubmit_api.monitor.monitor import Monitor from autosubmit_api.common.utils import Status -from bscearth.utils.date import date2str, parse_date +from bscearth.utils.date import date2str # from autosubmit_legacy.job.tree import Tree from autosubmit_api.database import db_structure as DbStructure from autosubmit_api.database.db_jobdata import JobDataStructure, JobRow @@ -645,61 +645,7 @@ class JobList: :return: submit time, start time, end time, status :rtype: 4-tuple in datetime format """ - values = list() - status_from_job = str(Status.VALUE_TO_KEY[status_code]) - now = datetime.datetime.now() - submit_time = now - start_time = now - finish_time = now - current_status = status_from_job - path = os.path.join(tmp_path, name + '_TOTAL_STATS') - if os.path.exists(path): - request = 'tail -1 ' + path - last_line = os.popen(request).readline() - # print(last_line) - - values = last_line.split() - # print(last_line) - try: - if status_code in [Status.RUNNING]: - submit_time = parse_date( - values[0]) if len(values) > 0 else now - start_time = parse_date(values[1]) if len( - values) > 1 else submit_time - finish_time = now - elif status_code in [Status.QUEUING, Status.SUBMITTED, Status.HELD]: - submit_time = parse_date( - values[0]) if len(values) > 0 else now - start_time = parse_date( - values[1]) if len(values) > 1 and values[0] != values[1] else now - elif status_code in [Status.COMPLETED]: - submit_time = parse_date( - values[0]) if len(values) > 0 else now - start_time = parse_date( - values[1]) if len(values) > 1 else submit_time - if len(values) > 3: - finish_time = parse_date(values[len(values) - 2]) - else: - finish_time = submit_time - else: - submit_time = parse_date( - values[0]) if len(values) > 0 else now - start_time = parse_date(values[1]) if len( - values) > 1 else submit_time - finish_time = parse_date(values[2]) if len( - values) > 2 else start_time - except Exception: - start_time = now - finish_time = now - # NA if reading fails - current_status = "NA" - - current_status = values[3] if (len(values) > 3 and len( - values[3]) != 14) else status_from_job - # TOTAL_STATS last line has more than 3 items, status is different from pkl, and status is not "NA" - if len(values) > 3 and current_status != status_from_job and current_status != "NA": - current_status = "SUSPICIOUS" - return (submit_time, start_time, finish_time, current_status) + return JUtils.get_job_total_stats(status_code, name, tmp_path) @staticmethod def retrieve_times( diff --git a/autosubmit_api/autosubmit_legacy/job/job_utils.py b/autosubmit_api/autosubmit_legacy/job/job_utils.py index 9795b1ab50e24f0539628400a2bcb4dea55689bb..7c613d8e09b29a66f66a32e19facdf16af68c62d 100644 --- a/autosubmit_api/autosubmit_legacy/job/job_utils.py +++ b/autosubmit_api/autosubmit_legacy/job/job_utils.py @@ -17,37 +17,7 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . -import networkx import datetime -import time - - - -def transitive_reduction(graph): - try: - return networkx.algorithms.dag.transitive_reduction(graph) - except Exception: - return None - # if not is_directed_acyclic_graph(graph): - # raise NetworkXError("Transitive reduction only uniquely defined on directed acyclic graphs.") - # reduced_graph = DiGraph() - # reduced_graph.add_nodes_from(graph.nodes()) - # for u in graph: - # u_edges = set(graph[u]) - # for v in graph[u]: - # u_edges -= {y for x, y in dfs_edges(graph, v)} - # reduced_graph.add_edges_from((u, v) for v in u_edges) - # return reduced_graph - -class SimpleJob(object): - """ - A simple replacement for jobs - """ - - def __init__(self, name, tmppath, statuscode): - self.name = name - self._tmp_path = tmppath - self.status = statuscode class SubJob(object): @@ -209,38 +179,6 @@ class SubJobManager(object): return self.subjobList -def parse_output_number(self, string_number): - """ - Parses number in format 1.0K 1.0M 1.0G - - :param string_number: String representation of number - :type string_number: str - :return: number in float format - :rtype: float - """ - number = 0.0 - if (string_number): - last_letter = string_number.strip()[-1] - multiplier = 1 - if last_letter == "G": - multiplier = 1000000000 - number = string_number[:-1] - elif last_letter == "M": - multiplier = 1000000 - number = string_number[:-1] - elif last_letter == "K": - multiplier = 1000 - number = string_number[:-1] - else: - number = string_number - try: - number = float(number) * multiplier - except Exception: - number = 0.0 - pass - return number - - def job_times_to_text(minutes_queue, minutes_running, status): """ Return text correpsonding to queue and running time @@ -268,35 +206,3 @@ def job_times_to_text(minutes_queue, minutes_running, status): running_text = running_text + \ " SUSPICIOUS" return running_text - - -def datechunk_to_year(chunk_unit: str, chunk_size: int) -> float: - """ - Gets chunk unit and size and returns the value in years - - :return: years - :rtype: float - """ - chunk_size = chunk_size * 1.0 - # options = ["year", "month", "day", "hour"] - if (chunk_unit == "year"): - return chunk_size - elif (chunk_unit == "month"): - return chunk_size / 12 - elif (chunk_unit == "day"): - return chunk_size / 365 - elif (chunk_unit == "hour"): - return chunk_size / 8760 - else: - return 0.0 - - -def tostamp(string_date: str) -> int: - """ - String datetime to timestamp - """ - if string_date and len(string_date) > 0: - return int(time.mktime(datetime.datetime.strptime(string_date, - "%Y-%m-%d %H:%M:%S").timetuple())) - else: - return 0 diff --git a/autosubmit_api/bgtasks/scheduler.py b/autosubmit_api/bgtasks/scheduler.py index 6e890a6453211040c0f78a48b0ddb6c86d3b1ca1..bbca554eed925601027a0028f5f111d5247ec4d3 100644 --- a/autosubmit_api/bgtasks/scheduler.py +++ b/autosubmit_api/bgtasks/scheduler.py @@ -7,8 +7,8 @@ from autosubmit_api.bgtasks.bgtask import ( ) from autosubmit_api.bgtasks.tasks.status_updater import StatusUpdater from autosubmit_api.config import ( - DISABLE_BACKGROUND_TASKS, - RUN_BACKGROUND_TASKS_ON_START, + get_disable_background_tasks, + get_run_background_tasks_on_start, ) from autosubmit_api.logger import logger, with_log_run_times @@ -25,7 +25,7 @@ def create_bind_scheduler(app): scheduler.init_app(app) scheduler.start() - if not DISABLE_BACKGROUND_TASKS: + if not get_disable_background_tasks(): for task in REGISTERED_TASKS: scheduler.add_job( task.id, @@ -37,7 +37,7 @@ def create_bind_scheduler(app): "Background tasks: " + str([str(task) for task in scheduler.get_jobs()]) ) - if RUN_BACKGROUND_TASKS_ON_START: + if get_run_background_tasks_on_start(): logger.info("Starting background tasks on app init before serving...") for task in REGISTERED_TASKS: scheduler.run_job(task.id) diff --git a/autosubmit_api/builders/experiment_builder.py b/autosubmit_api/builders/experiment_builder.py index c21e53d872af953c856ce61a51dc2730e61c4e0b..88866a2add8e04c8904f5a55d2a1b9f3c63da285 100644 --- a/autosubmit_api/builders/experiment_builder.py +++ b/autosubmit_api/builders/experiment_builder.py @@ -1,17 +1,24 @@ import datetime from autosubmit_api.builders import BaseBuilder -from autosubmit_api.builders.configuration_facade_builder import ( - AutosubmitConfigurationFacadeBuilder, - ConfigurationFacadeDirector, -) from autosubmit_api.database import tables from autosubmit_api.database.common import ( create_autosubmit_db_engine, ) from autosubmit_api.database.models import ExperimentModel +from autosubmit_api.persistance.pkl_reader import PklReader class ExperimentBuilder(BaseBuilder): + def produce_pkl_modified_time(self): + """ + Get the modified time of the pkl file. + """ + try: + self._product.modified = datetime.datetime.fromtimestamp( + PklReader(self._product.name).get_modified_time() + ).isoformat() + except Exception: + self._product.modified = None def produce_base_from_dict(self, obj: dict): """ @@ -58,29 +65,6 @@ class ExperimentBuilder(BaseBuilder): self._product.branch = result.branch self._product.hpc = result.hpc - def produce_config_data(self): - """ - Produce data from the files - """ - expid = self._product.name - autosubmit_config_facade = ConfigurationFacadeDirector( - AutosubmitConfigurationFacadeBuilder(expid) - ).build_autosubmit_configuration_facade() - - # Set config props - self._product.autosubmit_version = ( - autosubmit_config_facade.get_autosubmit_version() - ) - self._product.user = autosubmit_config_facade.get_owner_name() - self._product.hpc = autosubmit_config_facade.get_main_platform() - self._product.wrapper = autosubmit_config_facade.get_wrapper_type() - try: - self._product.modified = datetime.datetime.fromtimestamp( - autosubmit_config_facade.get_pkl_last_modified_timestamp() - ).isoformat() - except Exception: - self._product.modified = None - @property def product(self) -> ExperimentModel: """ diff --git a/autosubmit_api/builders/joblist_helper_builder.py b/autosubmit_api/builders/joblist_helper_builder.py index 1641344f444812f50028e10e76aacf53c4802dda..90b19796884a8a334bca401a4aefa77ff802e594 100644 --- a/autosubmit_api/builders/joblist_helper_builder.py +++ b/autosubmit_api/builders/joblist_helper_builder.py @@ -1,9 +1,9 @@ #!/usr/bin/env python from typing import Optional +from autosubmit_api.components.experiment.pkl_organizer import PklOrganizer from autosubmit_api.config.basicConfig import APIBasicConfig from autosubmit_api.builders.configuration_facade_builder import AutosubmitConfigurationFacadeBuilder, ConfigurationFacadeDirector from autosubmit_api.builders.basic_builder import BasicBuilder -from autosubmit_api.builders.pkl_organizer_builder import PklOrganizerBuilder, PklOrganizerDirector from autosubmit_api.components.jobs.joblist_helper import JobListHelper from abc import ABCMeta, abstractmethod @@ -42,7 +42,7 @@ class JobListHelperBuilder(Builder): def generate_pkl_organizer(self): self._validate_autosubmit_configuration_facade() - self.pkl_organizer = PklOrganizerDirector(PklOrganizerBuilder(self.expid)).build_pkl_organizer_with_configuration_provided(self.configuration_facade) + self.pkl_organizer = PklOrganizer(self.expid) def make_joblist_helper(self) -> JobListHelper: self._validate_basic_config() diff --git a/autosubmit_api/builders/pkl_organizer_builder.py b/autosubmit_api/builders/pkl_organizer_builder.py deleted file mode 100644 index ad1d9e224fcee3f7fb4a2fc8a4187308ed54a815..0000000000000000000000000000000000000000 --- a/autosubmit_api/builders/pkl_organizer_builder.py +++ /dev/null @@ -1,47 +0,0 @@ -#!/usr/bin/env python -from typing import Optional -from autosubmit_api.config.basicConfig import APIBasicConfig -from autosubmit_api.components.experiment.pkl_organizer import PklOrganizer -from autosubmit_api.builders.configuration_facade_builder import AutosubmitConfigurationFacadeBuilder, ConfigurationFacadeDirector -from autosubmit_api.builders.basic_builder import BasicBuilder -from autosubmit_api.components.experiment.configuration_facade import AutosubmitConfigurationFacade - -class PklOrganizerBuilder(BasicBuilder): - def __init__(self, expid: str): - super(PklOrganizerBuilder, self).__init__(expid) - - def set_autosubmit_configuration_facade(self, configuration_facade: AutosubmitConfigurationFacade): - self.configuration_facade = configuration_facade - - def generate_autosubmit_configuration_facade(self): - self._validate_basic_config() - self.configuration_facade = ConfigurationFacadeDirector(AutosubmitConfigurationFacadeBuilder(self.expid)).build_autosubmit_configuration_facade(self.basic_config) - - def _validate_autosubmit_configuration_facade(self): - if not self.configuration_facade: - raise Exception("AutosubmitConfigurationFacade is missing.") - - def make_pkl_organizer(self) -> PklOrganizer: - self._validate_basic_config() - self._validate_autosubmit_configuration_facade() - return PklOrganizer(self.configuration_facade) - -class PklOrganizerDirector: - def __init__(self, builder: PklOrganizerBuilder): - self.builder = builder - - def _set_basic_config(self, basic_config=None): - if basic_config: - self.builder.set_basic_config(basic_config) - else: - self.builder.generate_basic_config() - - def build_pkl_organizer(self, basic_config: Optional[APIBasicConfig] = None) -> PklOrganizer: - self._set_basic_config(basic_config) - self.builder.generate_autosubmit_configuration_facade() - return self.builder.make_pkl_organizer() - - def build_pkl_organizer_with_configuration_provided(self, configuration_facade: AutosubmitConfigurationFacade) -> PklOrganizer: - self._set_basic_config(configuration_facade.basic_configuration) - self.builder.set_autosubmit_configuration_facade(configuration_facade) - return self.builder.make_pkl_organizer() \ No newline at end of file diff --git a/autosubmit_api/components/experiment/pkl_organizer.py b/autosubmit_api/components/experiment/pkl_organizer.py index d640861b09dd3f0d1e0553daf4e532bdd80d147a..68c8f36f93b151c0306f11ea0c226a7c9cdd764f 100644 --- a/autosubmit_api/components/experiment/pkl_organizer.py +++ b/autosubmit_api/components/experiment/pkl_organizer.py @@ -1,9 +1,7 @@ #!/usr/bin/env python -import os from autosubmit_api.components.jobs import job_factory as factory from autosubmit_api.common.utils import JobSection, PklJob, PklJob14, Status -from autosubmit_api.components.experiment.configuration_facade import AutosubmitConfigurationFacade from autosubmit_api.components.jobs.job_factory import Job, SimpleJob from typing import List, Dict, Set, Union @@ -17,20 +15,18 @@ class PklOrganizer(object): Warnings are stored in self.warnings. """ - def __init__(self, configuration_facade: AutosubmitConfigurationFacade): + def __init__(self, expid: str): self.current_content: List[Union[PklJob,PklJob14]] = [] - self.configuration_facade: AutosubmitConfigurationFacade = configuration_facade + self.expid = expid self.sim_jobs: List[Job] = [] self.post_jobs: List[Job] = [] self.transfer_jobs: List[Job] = [] self.clean_jobs: List[Job] = [] - self.pkl_path: str = configuration_facade.pkl_path self.warnings: List[str] = [] self.dates: Set[str] = set() self.members: Set[str] = set() self.sections: Set[str] = set() self.section_jobs_map: Dict[str, List[Job]] = {} - # self.is_wrapper_type_in_pkl = is_wrapper_type_in_pkl_version(configuration_facade.get_autosubmit_version()) self._process_pkl() def prepare_jobs_for_performance_metrics(self): @@ -51,13 +47,10 @@ class PklOrganizer(object): return [SimpleJob(job.name, tmp_path, job.status) for job in self.current_content] def _process_pkl(self): - if os.path.exists(self.pkl_path): - try: - self.current_content = PklReader(self.configuration_facade.expid).parse_job_list() - except Exception as exc: - raise Exception("Exception while reading the pkl content: {}".format(str(exc))) - else: - raise Exception("Pkl file {0} not found.".format(self.pkl_path)) + try: + self.current_content = PklReader(self.expid).parse_job_list() + except Exception as exc: + raise Exception("Exception while reading the pkl content: {}".format(str(exc))) def identify_dates_members_sections(self): for job in self.current_content: @@ -115,11 +108,10 @@ class PklOrganizer(object): jobs.sort(key = lambda x: x.start, reverse=False) def __repr__(self): - return "Path: {5} \nTotal {0}\nSIM {1}\nPOST {2}\nTRANSFER {3}\nCLEAN {4}".format( + return "Total {0}\nSIM {1}\nPOST {2}\nTRANSFER {3}\nCLEAN {4}".format( len(self.current_content), len(self.sim_jobs), len(self.post_jobs), len(self.transfer_jobs), - len(self.clean_jobs), - self.pkl_path + len(self.clean_jobs) ) diff --git a/autosubmit_api/components/experiment/test.py b/autosubmit_api/components/experiment/test.py deleted file mode 100644 index 8dc766396fd4b92ca735817dafa073f3dee03786..0000000000000000000000000000000000000000 --- a/autosubmit_api/components/experiment/test.py +++ /dev/null @@ -1,93 +0,0 @@ -#!/usr/bin/env python - -import unittest -from mock import Mock -from autosubmit_api.components.jobs.job_factory import SimJob -from autosubmit_api.components.experiment.pkl_organizer import PklOrganizer -from autosubmit_api.builders.configuration_facade_builder import ConfigurationFacadeDirector, AutosubmitConfigurationFacadeBuilder -from autosubmit_api.common.utils_for_testing import get_mock_basic_config - -class TestConfigurationFacade(unittest.TestCase): - def setUp(self): - pass - - def test_configfacade_build_object(self): - basic_config = get_mock_basic_config() - - sut = ConfigurationFacadeDirector(AutosubmitConfigurationFacadeBuilder("a28v")).build_autosubmit_configuration_facade(basic_config) - - self.assertTrue(sut.chunk_size == 1) - self.assertTrue(sut.chunk_unit == "month") - self.assertTrue(sut.current_years_per_sim == float(1.0/12.0)) - - self.assertTrue(sut.sim_processors == 5040) - self.assertTrue(sut.get_owner_id() > 0) - self.assertTrue(sut.get_autosubmit_version() == "3.13.0") - self.assertTrue(sut.get_main_platform() == "marenostrum4") - self.assertTrue(sut.get_project_type() == "git") - self.assertTrue(sut.get_model() == "https://earth.bsc.es/gitlab/es/auto-ecearth3.git") - self.assertTrue(sut.get_branch() == "3.2.2_Primavera_Stream2_production_T1279-ORCA12") - self.assertTrue(sut.get_platform_qos("marenostrum4", 5040) == "class_a") - - - def test_configfacade_update_sims_updated(self): - sim_jobs = [SimJob(), SimJob(), SimJob(), SimJob(), SimJob()] - basic_config = get_mock_basic_config() - - sut = ConfigurationFacadeDirector(AutosubmitConfigurationFacadeBuilder("a28v")).build_autosubmit_configuration_facade(basic_config) - sut.update_sim_jobs(sim_jobs) - - for job in sim_jobs: - self.assertTrue(job.ncpus == 5040) - self.assertTrue(job.years_per_sim == float(1.0/12.0)) - - def test_get_queue_serial(self): - basic_config = get_mock_basic_config() - - sut = ConfigurationFacadeDirector(AutosubmitConfigurationFacadeBuilder("a28v")).build_autosubmit_configuration_facade(basic_config) - - self.assertTrue(sut.get_platform_qos("cca-intel", 5040) == "np") - self.assertTrue(sut.get_platform_qos("cca-intel", 1) == "ns") - - -class TestPklOrganizer(unittest.TestCase): - - def setUp(self): - self.configuration_facade = Mock() # - self.configuration_facade.pkl_path = "autosubmit_api/components/experiment/test_case/a29z/pkl/job_list_a29z.pkl" - self.configuration_facade.get_autosubmit_version.return_value = "3.13.0" - self.pkl_organizer = PklOrganizer(self.configuration_facade) - self.assertTrue(len(self.pkl_organizer.current_content) == 590) - self.assertTrue(len(self.pkl_organizer.sim_jobs) == 0) - self.assertTrue(len(self.pkl_organizer.post_jobs) == 0) - self.assertTrue(len(self.pkl_organizer.transfer_jobs) == 0) - self.assertTrue(len(self.pkl_organizer.clean_jobs) == 0) - self.assertTrue(len(self.pkl_organizer.dates) == 0) - self.assertTrue(len(self.pkl_organizer.members) == 0) - self.assertTrue(len(self.pkl_organizer.sections) == 0) - - def tearDown(self): - del self.pkl_organizer - - def test_identify_configuration(self): - self.pkl_organizer.identify_dates_members_sections() - self.assertTrue(len(self.pkl_organizer.dates) == 2) - self.assertTrue(len(self.pkl_organizer.members) == 7) - self.assertTrue(len(self.pkl_organizer.sections) == 9) - - def test_distribute_jobs(self): - self.pkl_organizer.distribute_jobs() - self.assertTrue(len(self.pkl_organizer.sim_jobs) == 168) - self.assertTrue(len(self.pkl_organizer.post_jobs) == 168) - self.assertTrue(len(self.pkl_organizer.transfer_jobs) == 42) - self.assertTrue(len(self.pkl_organizer.clean_jobs) == 168) - - def test_validate_warnings(self): - self.pkl_organizer.distribute_jobs() - self.assertTrue(len(self.pkl_organizer.get_completed_section_jobs("TRANSFER")) == 0) # There are no COMPLETED TRANSFER Jobs - self.pkl_organizer._validate_current() - self.assertTrue(self.pkl_organizer.warnings[0].startswith("RSYPD")) - - -if __name__ == '__main__': - unittest.main() diff --git a/autosubmit_api/components/representations/graph/test.py b/autosubmit_api/components/representations/graph/test.py deleted file mode 100644 index c89557fa0b70fbef33a995ce9bb39436df8a0443..0000000000000000000000000000000000000000 --- a/autosubmit_api/components/representations/graph/test.py +++ /dev/null @@ -1,196 +0,0 @@ -#!/usr/bin/env python - -import unittest -import math -import autosubmit_api.common.utils_for_testing as TestUtils - -from autosubmit_api.common.utils import Status -from autosubmit_api.components.representations.graph.graph import GraphRepresentation, Layout -from autosubmit_api.builders.joblist_loader_builder import JobListLoaderBuilder, JobListLoaderDirector - -CASE_NO_WRAPPERS = "a3tb" # Job count = 55 -CASE_WITH_WRAPPERS = "a28v" -class TestGraph(unittest.TestCase): - def setUp(self): - pass - - def tearDown(self): - pass - - def get_loader(self, expid): - return JobListLoaderDirector(JobListLoaderBuilder(expid)).build_loaded_joblist_loader(TestUtils.get_mock_basic_config()) - - def get_standard_case_with_no_calculations(self) -> GraphRepresentation: - """ """ - loader = JobListLoaderDirector(JobListLoaderBuilder(CASE_NO_WRAPPERS)).build_loaded_joblist_loader(TestUtils.get_mock_basic_config()) - return GraphRepresentation(CASE_NO_WRAPPERS, loader, Layout.STANDARD) - - def get_wrapper_case_with_no_calculations(self) -> GraphRepresentation: - loader = JobListLoaderDirector(JobListLoaderBuilder(CASE_WITH_WRAPPERS)).build_loaded_joblist_loader(TestUtils.get_mock_basic_config()) - return GraphRepresentation(CASE_WITH_WRAPPERS, loader, Layout.STANDARD) - - def test_jobs_exist_and_equal_to_known_count(self): - # Arrange - sut = self.get_standard_case_with_no_calculations() - # Act: Graph constructos gets the jobs from the loader - # Assert - self.assertGreater(sut.job_count, 0) - self.assertEqual(sut.job_count, 55) - - def test_added_edges_same_as_parent_chilren_relations(self): - # Arrange - sut = self.get_standard_case_with_no_calculations() - relations_count = 0 - for job in sut.jobs: - relations_count += len(job.children_names) - # Act - sut.add_normal_edges() - # Assert - self.assertTrue(sut.edge_count == relations_count) - - - def test_level_updated(self): - # Arrange - sut = self.get_standard_case_with_no_calculations() - # Act - sut.add_normal_edges() - sut.update_jobs_level() - # Assert - self.assertEqual(sut.job_dictionary["a3tb_LOCAL_SETUP"].level, 1) - self.assertEqual(sut.job_dictionary["a3tb_REMOTE_SETUP"].level, 3) - self.assertEqual(sut.job_dictionary["a3tb_19930501_fc01_1_SAVEIC"].level, 6) - - def test_graphviz_coordinates_are_added(self): - sut = self.get_standard_case_with_no_calculations() - - sut.add_normal_edges() - sut.reset_jobs_coordinates() - sut.assign_graphviz_coordinates_to_jobs() - - for job in sut.jobs: - self.assertTrue(job.x_coordinate != 0 or job.y_coordinate != 0) - - def test_graphviz_generated_coordinates(self): - sut = self.get_standard_case_with_no_calculations() - - sut.add_normal_edges() - sut.reset_jobs_coordinates() - sut.assign_graphviz_calculated_coordinates_to_jobs() - - for job in sut.jobs: - self.assertTrue(job.x_coordinate != 0 or job.y_coordinate != 0) - - - def test_laplacian_generates_coordinates(self): - sut = self.get_standard_case_with_no_calculations() - - sut.add_normal_edges() - sut.reset_jobs_coordinates() - sut.assign_laplacian_coordinates_to_jobs() - center_count = 0 - for job in sut.jobs: - if job.x_coordinate == 0 and job.y_coordinate == 0: - center_count += 1 - - self.assertTrue(center_count <= math.ceil(sut.job_count/2)) - - def test_barycentric_generates_unique_coordinates(self): - sut = self.get_standard_case_with_no_calculations() - - sut.add_normal_edges() - sut.reset_jobs_coordinates() - sut.update_jobs_level() - sut.assign_barycentric_coordinates_to_jobs() - - unique_coordinates = set() - for job in sut.jobs: - self.assertTrue(job.x_coordinate > 0 or job.y_coordinate > 0) - self.assertNotIn((job.x_coordinate, job.y_coordinate), unique_coordinates) - #self.assertTrue((job.x_coordinate, job.y_coordinate) not in unique_coordinates) - unique_coordinates.add((job.x_coordinate, job.y_coordinate)) - - def test_wrong_layout_fails(self): - with self.assertRaises(ValueError): - # Arrange - loader = JobListLoaderDirector(JobListLoaderBuilder("a28v")).build_loaded_joblist_loader(TestUtils.get_mock_basic_config()) - graph = GraphRepresentation("a29z", loader, "otherlayout") - # Act - graph.perform_calculations() - # Assert - - def test_calculate_average_post_time_is_zero(self): - sut = self.get_standard_case_with_no_calculations() - - sut._calculate_average_post_time() - - self.assertEqual(sut.average_post_time, 0.0) - - def test_calculated_average_post_time_is_defined_value(self): - # sut = self.get_wrapper_case_with_no_calculations() - - # sut._calculate_average_post_time() - - # self.assertEqual() - # TODO: Add a case that includes COMPLETED POST sections - pass - - def test_generates_node_data(self): - sut = self.get_standard_case_with_no_calculations() - - sut.perform_calculations() - - self.assertGreater(len(sut.nodes), 0) - for node in sut.nodes: - self.assertIsNotNone(node["status"]) - self.assertIsNotNone(node["label"]) - self.assertIsNotNone(node["platform_name"]) - self.assertGreater(int(node["level"]), 0) - if node["status_code"] == Status.COMPLETED: - self.assertGreater(int(node["minutes"]), 0) - self.assertGreater(sut.max_children_count, 0) - self.assertGreater(sut.max_parent_count, 0) - - def test_generates_date_members_groups_correct_number(self): - sut = self.get_standard_case_with_no_calculations() - - sut.perform_calculations() - groups = sut._get_grouped_by_date_member_dict() - - self.assertGreater(len(groups), 0) - self.assertEqual(len(groups), int(len(sut.joblist_loader.dates)*len(sut.joblist_loader.members))) - - def test_grouped_by_status_generates_correct_groups(self): - sut = self.get_standard_case_with_no_calculations() - - sut.perform_calculations() - groups = sut._get_grouped_by_status_dict() - - self.assertTrue('WAITING' in groups) - self.assertTrue('COMPLETED' in groups) - self.assertTrue('SUSPENDED' in groups) - - def test_grouped_by_wrong_parameter_fails(self): - with self.assertRaises(ValueError): - loader = self.get_loader(CASE_NO_WRAPPERS) - sut = GraphRepresentation(CASE_NO_WRAPPERS, loader, Layout.STANDARD, "wrong-parameter") - - sut.perform_calculations() - - def test_out_err_files_are_generated_for_completed_jobs(self): - sut = self.get_standard_case_with_no_calculations() - basic_config = sut.joblist_helper.basic_config - - sut.perform_calculations() - - for job in sut.jobs: - if job.name in ['a3tb_19930101_fc01_1_SIM']: - self.assertTrue(job.out_file_path.startswith(basic_config.LOCAL_ROOT_DIR)) - self.assertTrue(job.err_file_path.startswith(basic_config.LOCAL_ROOT_DIR)) - else: - self.assertIsNone(job.out_file_path) - self.assertIsNone(job.err_file_path) - - - -if __name__ == '__main__': - unittest.main() diff --git a/autosubmit_api/components/representations/tree/test.py b/autosubmit_api/components/representations/tree/test.py deleted file mode 100644 index 2f1f24dbf6b5acaa7583f393d5f0bc0755d49f36..0000000000000000000000000000000000000000 --- a/autosubmit_api/components/representations/tree/test.py +++ /dev/null @@ -1,51 +0,0 @@ -#!/usr/bin/env python - -import unittest -import common.utils_for_testing as UtilsForTesting -from components.representations.tree.tree import TreeRepresentation -from components.jobs.joblist_loader import JobListLoader -from components.experiment.pkl_organizer import PklOrganizer -from components.experiment.configuration_facade import AutosubmitConfigurationFacade -# from autosubmitAPIwu.config.basicConfig import BasicConfig -from bscearth.utils.config_parser import ConfigParserFactory -from config.config_common import AutosubmitConfigResolver - -from components.jobs.joblist_helper import JobListHelper - -class TestTreeRepresentation(unittest.TestCase): - def setUp(self): - # BasicConfig.read() - basic_config = UtilsForTesting.get_mock_basic_config() - self.EXPID = "a28v" - self.autosubmit_config = AutosubmitConfigResolver(self.EXPID, basic_config, ConfigParserFactory()) - self.autosubmit_config.reload() - self.configuration_facade = AutosubmitConfigurationFacade(self.EXPID, basic_config, self.autosubmit_config) - self.pkl_organizer = PklOrganizer(self.configuration_facade) - self.pkl_organizer.identify_dates_members_sections() - self.simple_jobs = self.pkl_organizer.get_simple_jobs(self.configuration_facade.tmp_path) - self.job_list_helper = JobListHelper(self.EXPID, self.simple_jobs, basic_config) - self.job_list_loader = JobListLoader(self.EXPID, self.configuration_facade, self.pkl_organizer, self.job_list_helper) - self.job_list_loader.load_jobs() - - def tearDown(self): - pass - - def test_full_tree_representation(self): - tree_representation = TreeRepresentation(self.EXPID, self.job_list_loader) - tree_representation.perform_calculations() - self.assertTrue(len(tree_representation.nodes) == 783) - self.assertTrue(len(tree_representation.joblist_loader.package_names) == 10) - - def test_date_member_distribution(self): - tree_representation = TreeRepresentation(self.EXPID, self.job_list_loader) - tree_representation._distribute_into_date_member_groups() - distribution_count = sum(len(tree_representation._date_member_distribution[item]) for item in tree_representation._date_member_distribution) - self.assertTrue(distribution_count + len(tree_representation._no_date_no_member_jobs) == 783) - self.assertTrue(len(tree_representation._date_member_distribution) == 1) - self.assertTrue(len(self.job_list_loader.dates) == len(tree_representation._distributed_dates)) - self.assertTrue(len(self.job_list_loader.members) == len(tree_representation._distributed_members)) - - - -if __name__ == '__main__': - unittest.main() diff --git a/autosubmit_api/config/__init__.py b/autosubmit_api/config/__init__.py index 50ad74c2c98e2b49726bfb1b5297a70f2ce886db..e09cf1bd31477c07e90d932725610b7841a49f38 100644 --- a/autosubmit_api/config/__init__.py +++ b/autosubmit_api/config/__init__.py @@ -30,15 +30,19 @@ GITHUB_OAUTH_CLIENT_SECRET = os.environ.get("GITHUB_OAUTH_CLIENT_SECRET") GITHUB_OAUTH_WHITELIST_ORGANIZATION = os.environ.get("GITHUB_OAUTH_WHITELIST_ORGANIZATION") GITHUB_OAUTH_WHITELIST_TEAM = os.environ.get("GITHUB_OAUTH_WHITELIST_TEAM") + # Startup options -RUN_BACKGROUND_TASKS_ON_START = os.environ.get("RUN_BACKGROUND_TASKS_ON_START") in [ - "True", - "T", - "true", -] # Default false - -DISABLE_BACKGROUND_TASKS = os.environ.get("DISABLE_BACKGROUND_TASKS") in [ - "True", - "T", - "true", -] # Default false +def get_run_background_tasks_on_start(): + return os.environ.get("RUN_BACKGROUND_TASKS_ON_START") in [ + "True", + "T", + "true", + ] # Default false + + +def get_disable_background_tasks(): + return os.environ.get("DISABLE_BACKGROUND_TASKS") in [ + "True", + "T", + "true", + ] # Default false diff --git a/autosubmit_api/database/db_manager.py b/autosubmit_api/database/db_manager.py deleted file mode 100644 index ad46394e8d1f6954d6cc00b4b99fa48ed7b35eec..0000000000000000000000000000000000000000 --- a/autosubmit_api/database/db_manager.py +++ /dev/null @@ -1,272 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2015 Earth Sciences Department, BSC-CNS - -# This file is part of Autosubmit. - -# Autosubmit is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# Autosubmit is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. - -# You should have received a copy of the GNU General Public License -# along with Autosubmit. If not, see . - -import sqlite3 -import os -from typing import List - -class DbManager(object): - """ - Class to manage an SQLite database. - """ - - def __init__(self, root_path: str, db_name: str, db_version: int = 1): - self.root_path = root_path - self.db_name = db_name - self.db_version = db_version - # is_new = not - if os.path.exists(self._get_db_filepath()): - self.connection = sqlite3.connect(self._get_db_filepath()) - elif os.path.exists(self._get_db_filepath() + ".db"): - self.connection = sqlite3.connect(self._get_db_filepath() + ".db") - else: - self.connection = None - # if is_new: - # self._initialize_database() - - def disconnect(self): - """ - Closes the manager connection - """ - if self.connection: - self.connection.close() - - def create_table(self, table_name: str, fields: List[str]): - """ - Creates a new table with the given fields - :param table_name: str - :param fields: List[str] - """ - if self.connection: - cursor = self.connection.cursor() - create_command = self.generate_create_table_command( - table_name, fields) - # print(create_command) - cursor.execute(create_command) - self.connection.commit() - - def create_view(self, view_name: str, statement: str): - """ - Creates a new view with the given statement - - Parameters - ---------- - view_name : str - Name of the view to create - statement : str - SQL statement - """ - if self.connection: - cursor = self.connection.cursor() - create_command = self.generate_create_view_command(view_name, statement) - # print(create_command) - cursor.execute(create_command) - self.connection.commit() - - def drop_table(self, table_name): - """ - Drops the given table - :param table_name: str - - """ - if self.connection: - cursor = self.connection.cursor() - drop_command = self.generate_drop_table_command(table_name) - cursor.execute(drop_command) - self.connection.commit() - - def insert(self, table_name, columns, values): - """ - Inserts a new row on the given table - :param table_name: str - :param columns: [str] - :param values: [str] - - """ - if self.connection: - cursor = self.connection.cursor() - insert_command = self.generate_insert_command( - table_name, columns[:], values[:]) - cursor.execute(insert_command) - self.connection.commit() - - def insertMany(self, table_name, data): - """ - Inserts multiple new rows on the given table - :param table_name: str - :param data: [()] - - """ - if self.connection: - cursor = self.connection.cursor() - insert_many_command = self.generate_insert_many_command( - table_name, len(data[0])) - cursor.executemany(insert_many_command, data) - self.connection.commit() - - def select_first(self, table_name): - """ - Returns the first row of the given table - :param table_name: str - :return row: [] - """ - if self.connection: - cursor = self._select_with_all_fields(table_name) - return cursor.fetchone() - - def select_first_where(self, table_name, where): - """ - Returns the first row of the given table that matches the given where conditions - :param table_name: str - :param where: [str] - :return row: [] - """ - if self.connection: - cursor = self._select_with_all_fields(table_name, where) - return cursor.fetchone() - - def select_all(self, table_name): - """ - Returns all the rows of the given table - :param table_name: str - :return rows: [[]] - """ - if self.connection: - cursor = self._select_with_all_fields(table_name) - return cursor.fetchall() - - def select_all_where(self, table_name, where): - """ - Returns all the rows of the given table that matches the given where conditions - :param table_name: str - :param where: [str] - :return rows: [[]] - """ - if self.connection: - cursor = self._select_with_all_fields(table_name, where) - return cursor.fetchall() - - def count(self, table_name): - """ - Returns the number of rows of the given table - :param table_name: str - :return int - """ - if self.connection: - cursor = self.connection.cursor() - count_command = self.generate_count_command(table_name) - cursor.execute(count_command) - return cursor.fetchone()[0] - - def drop(self): - """ - Drops the database (deletes the .db file) - - """ - if self.connection: - self.connection.close() - if os.path.exists(self._get_db_filepath()): - os.remove(self._get_db_filepath()) - - def _get_db_filepath(self) -> str: - """ - Returns the path of the .db file - """ - return os.path.join(self.root_path, self.db_name) - - def _initialize_database(self): - """ - Initialize the database with an options table - with the name and the version of the DB - - """ - if self.connection: - options_table_name = 'db_options' - columns = ['option_name', 'option_value'] - self.create_table(options_table_name, columns) - self.insert(options_table_name, columns, ['name', self.db_name]) - self.insert(options_table_name, columns, - ['version', self.db_version]) - - def _select_with_all_fields(self, table_name, where=[]): - """ - Returns the cursor of the select command with the given parameters - :param table_name: str - :param where: [str] - :return cursor: Cursor - """ - if self.connection: - cursor = self.connection.cursor() - count_command = self.generate_select_command(table_name, where[:]) - cursor.execute(count_command) - return cursor - - """ - Static methods that generates the SQLite commands to make the queries - """ - - @staticmethod - def generate_create_table_command(table_name: str, fields: List[str]) -> str: - create_command = f'CREATE TABLE IF NOT EXISTS {table_name} ( {", ".join(fields)} )' - return create_command - - @staticmethod - def generate_create_view_command(view_name: str, statement: str) -> str: - create_command = f'CREATE VIEW IF NOT EXISTS {view_name} as {statement}' - return create_command - - @staticmethod - def generate_drop_table_command(table_name: str): - drop_command = f'DROP TABLE IF EXISTS {table_name}' - return drop_command - - @staticmethod - def generate_insert_command(table_name, columns, values): - insert_command = 'INSERT INTO ' + table_name + '(' + columns.pop(0) - for column in columns: - insert_command += (', ' + column) - insert_command += (') VALUES ("' + str(values.pop(0)) + '"') - for value in values: - insert_command += (', "' + str(value) + '"') - insert_command += ')' - return insert_command - - @staticmethod - def generate_insert_many_command(table_name, num_of_values): - insert_command = 'INSERT INTO ' + table_name + ' VALUES (?' - num_of_values -= 1 - while num_of_values > 0: - insert_command += ',?' - num_of_values -= 1 - insert_command += ')' - return insert_command - - @staticmethod - def generate_count_command(table_name): - count_command = 'SELECT count(*) FROM ' + table_name - return count_command - - @staticmethod - def generate_select_command(table_name, where=[]): - basic_select = 'SELECT * FROM ' + table_name - select_command = basic_select if len( - where) == 0 else basic_select + ' WHERE ' + where.pop(0) - for condition in where: - select_command += ' AND ' + condition - return select_command diff --git a/autosubmit_api/experiment/common_requests.py b/autosubmit_api/experiment/common_requests.py index e7f055bcd6bc38fa4738a84b4efc6790c00dbcbf..f7a901c560d627a6497b2e2772c037ffd918e693 100644 --- a/autosubmit_api/experiment/common_requests.py +++ b/autosubmit_api/experiment/common_requests.py @@ -46,6 +46,7 @@ from autosubmit_api.monitor.monitor import Monitor from autosubmit_api.persistance.experiment import ExperimentPaths from autosubmit_api.persistance.job_package_reader import JobPackageReader +from autosubmit_api.persistance.pkl_reader import PklReader from autosubmit_api.statistics.statistics import Statistics from autosubmit_api.config.basicConfig import APIBasicConfig @@ -292,7 +293,7 @@ def _is_exp_running(expid: str, time_condition=300) -> Tuple[bool, str, bool, in return (error, error_message, is_running, timediff, definite_log_path) -def get_experiment_summary(expid, log): +def get_experiment_summary(expid: str, log): """ Gets job summary for the experiment. Consider seconds. :param expid: Name of experiment @@ -329,9 +330,7 @@ def get_experiment_summary(expid, log): fakeAllJobs = list() # Read PKL - autosubmit_config_facade = ConfigurationFacadeDirector( - AutosubmitConfigurationFacadeBuilder(expid)).build_autosubmit_configuration_facade() - pkl_organizer = PklOrganizer(autosubmit_config_facade) + pkl_organizer = PklOrganizer(expid) for job_item in pkl_organizer.current_content: status_code = job_item.status job_name = job_item.name @@ -571,120 +570,131 @@ def get_job_log(expid, logfile, nlines=150): 'logcontent': logcontent} -def get_experiment_pkl(expid: str) -> Dict: +def _retrieve_pkl_data(expid: str, out_format: str = "tree"): """ - Gets the current state of the pkl in a format proper for graph update. + Retrieves pkl data for the experiment. """ pkl_file_path = "" error = False error_message = "" pkl_content = list() - pkl_timestamp = 0 package_to_jobs = dict() + pkl_timestamp = 0 + try: - autosubmit_config_facade = ConfigurationFacadeDirector(AutosubmitConfigurationFacadeBuilder(expid)).build_autosubmit_configuration_facade() - pkl_file_path = autosubmit_config_facade.pkl_path - pkl_timestamp = autosubmit_config_facade.get_pkl_last_modified_timestamp() + pkl_reader = PklReader(expid) + pkl_file_path = pkl_reader.pkl_path + pkl_timestamp = pkl_reader.get_modified_time() - if not os.path.exists(autosubmit_config_facade.pkl_path): - raise Exception("Pkl file {} not found.".format(autosubmit_config_facade.pkl_path)) + if not os.path.exists(pkl_file_path): + raise Exception("Pkl file {} not found.".format(pkl_file_path)) - job_list_loader = JobListLoaderDirector(JobListLoaderBuilder(expid)).build_loaded_joblist_loader() + # Get last run data for each job + try: + experiment_history = ExperimentHistoryDirector( + ExperimentHistoryBuilder(expid) + ).build_reader_experiment_history() + last_jobs_run = experiment_history.get_all_jobs_last_run_dict() + except Exception: + last_jobs_run = {} + + job_list_loader = JobListLoaderDirector( + JobListLoaderBuilder(expid) + ).build_loaded_joblist_loader() package_to_jobs = job_list_loader.joblist_helper.package_to_jobs for job in job_list_loader.jobs: - pkl_content.append({'name': job.name, - 'rm_id': job.rm_id, - 'status_code': job.status, - 'SYPD': calculate_SYPD_perjob(autosubmit_config_facade.chunk_unit, autosubmit_config_facade.chunk_size, job.chunk, job.run_time, job.status), - 'minutes': job.run_time, - 'minutes_queue': job.queue_time, - 'submit': common_utils.timestamp_to_datetime_format(job.submit), - 'start': common_utils.timestamp_to_datetime_format(job.start), - 'finish': common_utils.timestamp_to_datetime_format(job.finish), - 'running_text': job.running_time_text, - 'dashed': True if job.package else False, - 'shape': job_list_loader.joblist_helper.package_to_symbol.get(job.package, "dot"), - 'package': job.package, - 'status': job.status_text, - 'status_color': job.status_color, - 'out': job.out_file_path, - 'err': job.err_file_path, - 'priority': job.priority}) + # Calculate SYPD + SYPD = None + last_run = last_jobs_run.get(job.name) + if last_run and last_run.chunk_unit and last_run.chunk_size: + SYPD = calculate_SYPD_perjob( + last_run.chunk_unit, + last_run.chunk_size, + job.chunk, + job.run_time, + job.status, + ) + + formatted_job_data = { + "name": job.name, + "rm_id": job.rm_id, + "status_code": job.status, + "SYPD": SYPD, + "minutes": job.run_time, + "minutes_queue": job.queue_time, + "submit": common_utils.timestamp_to_datetime_format(job.submit), + "start": common_utils.timestamp_to_datetime_format(job.start), + "finish": common_utils.timestamp_to_datetime_format(job.finish), + "running_text": job.running_time_text, + "status": job.status_text, + "status_color": job.status_color, + "out": job.out_file_path, + "err": job.err_file_path, + "priority": job.priority, + } - except Exception as e: + if out_format == "tree": + formatted_job_data.update( + { + "wrapper": job.package, + "wrapper_tag": job.package_tag, + "wrapper_id": job.package_code, + "title": job.tree_title, + } + ) + elif out_format == "graph": + formatted_job_data.update( + { + "dashed": True if job.package else False, + "shape": job_list_loader.joblist_helper.package_to_symbol.get( + job.package, "dot" + ), + "package": job.package, + } + ) + + pkl_content.append(formatted_job_data) + + except Exception as exc: error = True - error_message = str(e) + error_message = str(exc) - return { - 'pkl_file_name': pkl_file_path, - 'error': error, - 'error_message': error_message, - 'has_changed': True, - 'pkl_content': pkl_content, - 'pkl_timestamp': pkl_timestamp, - 'packages': package_to_jobs, + result = { + "pkl_file_name": pkl_file_path, + "error": error, + "error_message": error_message, + "has_changed": True, + "pkl_content": pkl_content, + "pkl_timestamp": pkl_timestamp, + "packages": package_to_jobs, } + if out_format == "tree": + result.update( + { + "source_tag": JUtils.source_tag, + "target_tag": JUtils.target_tag, + "sync_tag": JUtils.sync_tag, + "check_mark": JUtils.checkmark_tag, + } + ) -def get_experiment_tree_pkl(expid: str) -> Dict[str, Any]: - """ - Gets the current state of the pkl in a format for tree update - """ - pkl_file_path = "" - error = False - error_message = "" - pkl_content = list() - package_to_jobs = {} - pkl_timestamp = 0 + return result - try: - autosubmit_config_facade = ConfigurationFacadeDirector(AutosubmitConfigurationFacadeBuilder(expid)).build_autosubmit_configuration_facade() - pkl_file_path = autosubmit_config_facade.pkl_path - pkl_timestamp = autosubmit_config_facade.get_pkl_last_modified_timestamp() - if not os.path.exists(autosubmit_config_facade.pkl_path): - raise Exception("Pkl file {} not found.".format(autosubmit_config_facade.pkl_path)) +def get_experiment_pkl(expid: str) -> Dict[str, Any]: + """ + Gets the current state of the pkl in a format proper for graph update. + """ + return _retrieve_pkl_data(expid, out_format="graph") - job_list_loader = JobListLoaderDirector(JobListLoaderBuilder(expid)).build_loaded_joblist_loader() - package_to_jobs = job_list_loader.joblist_helper.package_to_jobs - for job in job_list_loader.jobs: - pkl_content.append({'name': job.name, - 'rm_id': job.rm_id, - 'status_code': job.status, - 'SYPD': calculate_SYPD_perjob(autosubmit_config_facade.chunk_unit, autosubmit_config_facade.chunk_size, job.chunk, job.run_time, job.status), - 'minutes': job.run_time, - 'minutes_queue': job.queue_time, - 'submit': common_utils.timestamp_to_datetime_format(job.submit), - 'start': common_utils.timestamp_to_datetime_format(job.start), - 'finish': common_utils.timestamp_to_datetime_format(job.finish), - 'running_text': job.running_time_text, - 'status': job.status_text, - 'status_color': job.status_color, - 'wrapper': job.package, - 'wrapper_tag': job.package_tag, - 'wrapper_id': job.package_code, - 'out': job.out_file_path, - 'err': job.err_file_path, - 'title': job.tree_title, - 'priority': job.priority}) - except Exception as e: - error = True - error_message = str(e) - return { - 'pkl_file_name': pkl_file_path, - 'error': error, - 'error_message': error_message, - 'has_changed': True, - 'pkl_content': pkl_content, - 'packages': list(package_to_jobs.keys()), - 'pkl_timestamp': pkl_timestamp, - 'source_tag': JUtils.source_tag, - 'target_tag': JUtils.target_tag, - 'sync_tag': JUtils.sync_tag, - 'check_mark': JUtils.checkmark_tag, - } +def get_experiment_tree_pkl(expid: str) -> Dict[str, Any]: + """ + Gets the current state of the pkl in a format for tree update + """ + return _retrieve_pkl_data(expid, out_format="tree") def get_experiment_graph(expid, log, layout=Layout.STANDARD, grouped=GroupedBy.NO_GROUP): @@ -782,7 +792,7 @@ def get_experiment_tree_structured(expid, log): return {'tree': [], 'jobs': [], 'total': 0, 'reference': [], 'error': True, 'error_message': str(e), 'pkl_timestamp': 0} -def get_experiment_counters(expid): +def get_experiment_counters(expid: str): """ Returns status counters of the experiment. """ @@ -794,9 +804,7 @@ def get_experiment_counters(expid): # Default counter per status experiment_counters = {name: 0 for name in common_utils.Status.STRING_TO_CODE} try: - autosubmit_config_facade = ConfigurationFacadeDirector( - AutosubmitConfigurationFacadeBuilder(expid)).build_autosubmit_configuration_facade() - pkl_organizer = PklOrganizer(autosubmit_config_facade) + pkl_organizer = PklOrganizer(expid) for job_item in pkl_organizer.current_content: status_code = int(job_item.status) total += 1 @@ -812,7 +820,7 @@ def get_experiment_counters(expid): # TODO: Update to current representation standards and classes -def get_quick_view(expid): +def get_quick_view(expid: str): """ Lighter View """ error = False error_message = "" @@ -844,9 +852,7 @@ def get_quick_view(expid): # Reading PKL try: - autosubmit_config_facade = ConfigurationFacadeDirector( - AutosubmitConfigurationFacadeBuilder(expid)).build_autosubmit_configuration_facade() - pkl_organizer = PklOrganizer(autosubmit_config_facade) + pkl_organizer = PklOrganizer(expid) for job_item in pkl_organizer.current_content: status_code = int(job_item.status) # counters diff --git a/autosubmit_api/persistance/pkl_reader.py b/autosubmit_api/persistance/pkl_reader.py index 6bc5ceccf8a4c2c7b6cfff9dc48485527bcf00b9..446ff368ea62d85caefbcb9c26a9e55d89484b24 100644 --- a/autosubmit_api/persistance/pkl_reader.py +++ b/autosubmit_api/persistance/pkl_reader.py @@ -1,3 +1,4 @@ +import os from typing import List, Union import pickle from networkx import DiGraph @@ -7,7 +8,6 @@ from autosubmit_api.persistance.experiment import ExperimentPaths class PklReader: - def __init__(self, expid: str) -> None: self.expid = expid APIBasicConfig.read() @@ -17,6 +17,9 @@ class PklReader: with open(self.pkl_path, "rb") as f: return pickle.load(f, encoding="latin1") + def get_modified_time(self) -> int: + return int(os.stat(self.pkl_path).st_mtime) + def parse_job_list(self) -> List[PklJobModel]: job_list = [] obj = self.read_pkl() diff --git a/autosubmit_api/views/v4.py b/autosubmit_api/views/v4.py index a772856508bed9504461ce2aa2beaccac07595b5..138364bec482e092938c2a4d5b5ce22e424c5940 100644 --- a/autosubmit_api/views/v4.py +++ b/autosubmit_api/views/v4.py @@ -263,16 +263,7 @@ class ExperimentView(MethodView): for raw_exp in query_result: exp_builder = ExperimentBuilder() exp_builder.produce_base_from_dict(raw_exp._mapping) - - # Get additional data from config files - try: - exp_builder.produce_config_data() - except Exception as exc: - logger.warning( - f"Config files params were unable to get on search: {exc}" - ) - logger.warning(traceback.format_exc()) - + exp_builder.produce_pkl_modified_time() exp = exp_builder.product # Get current run data from history