diff --git a/autosubmit_api/app.py b/autosubmit_api/app.py
index 053f4bd6c01b07e1046bfa08d285041447e5a24a..fdac797ead2520b2a24076f154cbcc052b22f3b8 100644
--- a/autosubmit_api/app.py
+++ b/autosubmit_api/app.py
@@ -25,8 +25,6 @@ from datetime import datetime, timedelta
 import requests
 import logging
 from flask_cors import CORS, cross_origin
-# from flask_restful import Resource, Api
-# from flask_restful.utils import cors
 from flask import Flask, request, session, redirect, url_for
 from bscearth.utils.log import Log
 from database.db_common import get_current_running_exp, update_experiment_description_owner
@@ -36,7 +34,7 @@ from performance.performance_metrics import PerformanceMetrics
 from database.db_common import search_experiment_by_id
 from config.basicConfig import BasicConfig
 from builders.joblist_helper_builder import JobListHelperBuilder, JobListHelperDirector
-
+from multiprocessing import Manager
 
 JWT_SECRET = os.environ.get("SECRET_KEY")
 JWT_ALGORITHM = "HS256"
@@ -44,9 +42,10 @@ JWT_EXP_DELTA_SECONDS = 84000*5  # 5 days
 
 sys.path.insert(0, os.path.abspath('.'))
 
-
 app = Flask(__name__)
 
+D = Manager().dict()
+
 # CAS Stuff
 CAS_LOGIN_URL = os.environ.get("CAS_LOGIN_URL")  # 'https://cas.bsc.es/cas/login'
 CAS_VERIFY_URL = os.environ.get("CAS_VERIFY_URL")  # 'https://cas.bsc.es/cas/serviceValidate'
@@ -56,6 +55,12 @@ gunicorn_logger = logging.getLogger('gunicorn.error')
 app.logger.handlers = gunicorn_logger.handlers
 app.logger.setLevel(gunicorn_logger.level)
 
+requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS += 'HIGH:!DH:!aNULL'
+try:
+    requests.packages.urllib3.contrib.pyopenssl.DEFAULT_SSL_CIPHER_LIST += 'HIGH:!DH:!aNULL'
+except AttributeError:
+    # no pyopenssl support used / needed / available
+    pass
 
 # CAS Login
 @app.route('/login')
@@ -71,18 +76,19 @@ def login():
     is_allowed = True
     if is_allowed == False:
         return {'authenticated': False, 'user': None, 'token': None, 'message': "Your client is not authorized for this operation. The API admin needs to add your URL to the list of allowed clients."}
-
+    target_service = "{}{}/login".format(referrer, environment)
     if not ticket:
-        route_to_request_ticket = "{}?service={}".format(CAS_LOGIN_URL, target_service)
+        route_to_request_ticket = "{}?service={}".format(CAS_LOGIN_URL, target_service)
+        app.logger.info("Redirected to: " + str(route_to_request_ticket))
         return redirect(route_to_request_ticket)
-    environment = environment if environment is not None else "autosubmitapp"  # can be used to target the test environment
-    cas_verify_ticket_route = CAS_VERIFY_URL + '?service=' + target_service + '&ticket=' + ticket
+    environment = environment if environment is not None else "autosubmitapp"  # can be used to target the test environment
+    cas_verify_ticket_route = CAS_VERIFY_URL + '?service=' + target_service + '&ticket=' + ticket
     response = requests.get(cas_verify_ticket_route)
     user = None
     if response:
         user = Utiles.get_cas_user_from_xml(response.content)
-    app.logger.info('CAS verify ticket response: user %s', user)
+    app.logger.info('CAS verify ticket response: user %s', user)
     if not user:
         return {'authenticated': False, 'user': None, 'token': None, 'message': "Can't verify user."}
     else:  # Login successful
@@ -112,7 +118,7 @@ def update_description():
         jwt_token = jwt.decode(current_token, JWT_SECRET, JWT_ALGORITHM)
     except jwt.ExpiredSignatureError:
         jwt_token = {"user_id": None}
-    except Exception as exp:
+    except Exception as exp:
         jwt_token = {"user_id": None}
     valid_user = jwt_token.get("user_id", None)
     app.logger.info('UDESC|RECEIVED|')
@@ -147,7 +153,7 @@ def test_token():
 
 @app.route('/cconfig/<expid>', methods=['GET'])
 @cross_origin(expose_headers="Authorization")
-def get_current_configuration(expid):
+def get_current_configuration(expid):
     start_time = time.time()
     current_token = request.headers.get("Authorization")
     try:
@@ -156,7 +162,7 @@ def get_current_configuration(expid):
         jwt_token = {"user_id": None}
     valid_user = jwt_token.get("user_id", None)
     app.logger.info('CCONFIG|RECEIVED|' + str(expid))
-    result = CommonRequests.get_current_configuration_by_expid(expid, valid_user)
+    result = CommonRequests.get_current_configuration_by_expid(expid, valid_user, app.logger)
     app.logger.info('CCONFIG|RTIME|' + str(expid) + "|" + str(time.time() - start_time))
     return result
@@ -196,7 +202,7 @@ def search_owner(owner, exptype=None, onlyactive=None):
 
 @app.route('/search/<expid>', methods=['GET'])
 def search_expid(expid, exptype=None, onlyactive=None):
     start_time = time.time()
-    app.logger.info('SEARCH|RECEIVED|' + str(expid) + "|" + str(exptype) + "|" + str(onlyactive))
+    app.logger.info('SEARCH|RECEIVED|' + str(expid) + "|" + str(exptype) + "|" + str(onlyactive))
     result = search_experiment_by_id(expid, owner=None, typeExp=exptype, onlyActive=onlyactive)
     app.logger.info('SEARCH|RTIME|' + str(expid) + "|" + str(exptype) + "|" + str(onlyactive) + "|" + str(time.time() - start_time))
     return result
@@ -210,6 +216,7 @@ def search_running():
     if 'username' in session:
         print("USER {}".format(session['username']))
     start_time = time.time()
+    app.logger.info("Active processes: " + str(D))
     app.logger.info('RUN|RECEIVED|')
     #app.logger.info("Received Currently Running query ")
     result = get_current_running_exp()
@@ -250,19 +257,61 @@ def get_log_running(expid):
 
 @app.route('/summary/<expid>', methods=['GET'])
 def get_expsummary(expid):
     start_time = time.time()
+    user = request.args.get("loggedUser", default="null", type=str)
     app.logger.info('SUMMARY|RECEIVED|' + str(expid))
+    if user != "null": D[os.getpid()] = [user, "summary", True]
     result = CommonRequests.get_experiment_summary(expid)
+    app.logger.info('Process: ' + str(os.getpid()) + " workers: " + str(D))
     app.logger.info('SUMMARY|RTIME|' + str(expid) + "|" + str(time.time() - start_time))
+    if user != "null": D[os.getpid()] = [user, "summary", False]
+    if user != "null": D.pop(os.getpid(), None)
     return result
 
 
+@app.route('/shutdown/<route>')
+def shutdown(route):
+    """
+    This function is invoked from the frontend (AS-GUI) to kill workers that are no longer needed.
+    This call is common in heavy parts of the GUI such as the Tree and Graph generation or Summaries fetching.
+    """
+    start_time = time.time()
+
+    try:
+        user = request.args.get("loggedUser", default="null", type=str)
+        expid = request.args.get("expid", default="null", type=str)
+    except Exception as exp:
+        app.logger.info("Bad parameters for user and expid in route.")
+
+    if user != "null":
+        app.logger.info('SHUTDOWN|RECEIVED for route: ' + route + " user: " + user + " expid: " + expid)
+        try:
+            # app.logger.info("user: " + user)
+            # app.logger.info("expid: " + expid)
+            app.logger.info("Workers before: " + str(D))
+            for k, v in D.items():
+                if v[0] == user and v[1] == route and v[-1] == True:
+                    if v[2] == expid:
+                        D[k] = [user, route, expid, False]
+                    else:
+                        D[k] = [user, route, False]
+                    D.pop(k, None)
+                    # reboot the worker
+                    os.system('kill -HUP ' + str(k))
+                    app.logger.info("killed worker " + str(k))
+            app.logger.info("Workers now: " + str(D))
+        except Exception as exp:
+            app.logger.info("[CRITICAL] Could not shutdown process " + expid + " by user \"" + user + "\"")
+    app.logger.info('SHUTDOWN|DONE|RTIME' + "|" + str(time.time() - start_time))
+    return ""
+
+
 @app.route('/performance/<expid>', methods=['GET'])
 def get_exp_performance(expid):
     start_time = time.time()
     app.logger.info('PRF|RECEIVED|' + str(expid))
     result = {}
-    try:
-        result = PerformanceMetrics(expid, JobListHelperDirector(JobListHelperBuilder(expid)).build_job_list_helper()).to_json()
+    try:
+        result = PerformanceMetrics(expid, JobListHelperDirector(JobListHelperBuilder(expid)).build_job_list_helper()).to_json()
     except Exception as exp:
         result = {"SYPD": None,
                   "ASYPD": None,
@@ -274,7 +323,7 @@ def get_exp_performance(expid):
                   "error": True,
                   "error_message": str(exp),
                   "warnings_job_data": [],
-                  }
+                  }
     app.logger.info('PRF|RTIME|' + str(expid) + "|" + str(time.time() - start_time))
     return result
@@ -282,18 +331,32 @@ def get_exp_performance(expid):
 
 @app.route('/graph/<expid>/<layout>/<grouped>', methods=['GET'])
 def get_list_format(expid, layout='standard', grouped='none'):
     start_time = time.time()
+    user = request.args.get("loggedUser", default="null", type=str)
+    # app.logger.info("user: " + user)
+    # app.logger.info("expid: " + expid)
     app.logger.info('GRAPH|RECEIVED|' + str(expid) + "~" + str(grouped) + "~" + str(layout))
-    result = CommonRequests.get_experiment_graph(expid, layout, grouped)
+    if user != "null": D[os.getpid()] = [user, "graph", expid, True]
+    result = CommonRequests.get_experiment_graph(expid, app.logger, layout, grouped)
+    app.logger.info('Process: ' + str(os.getpid()) + " graph workers: " + str(D))
     app.logger.info('GRAPH|RTIME|' + str(expid) + "|" + str(time.time() - start_time))
+    if user != "null": D[os.getpid()] = [user, "graph", expid, False]
+    if user != "null": D.pop(os.getpid(), None)
     return result
 
 
 @app.route('/tree/<expid>', methods=['GET'])
 def get_exp_tree(expid):
     start_time = time.time()
-    app.logger.info('TREE|RECEIVED|' + str(expid))
-    result = CommonRequests.get_experiment_tree_structured(expid)
+    user = request.args.get("loggedUser", default="null", type=str)
+    # app.logger.info("user: " + user)
+    # app.logger.info("expid: " + expid)
+    app.logger.info('TREE|RECEIVED|' + str(expid))
+    if user != "null": D[os.getpid()] = [user, "tree", expid, True]
+    result = CommonRequests.get_experiment_tree_structured(expid, app.logger)
+    app.logger.info('Process: ' + str(os.getpid()) + " tree workers: " + str(D))
     app.logger.info('TREE|RTIME|' + str(expid) + "|" + str(time.time() - start_time))
+    if user != "null": D[os.getpid()] = [user, "tree", expid, False]
+    if user != "null": D.pop(os.getpid(), None)
     return result
@@ -312,7 +375,7 @@ def get_experiment_running(expid):
     Finds log and gets the last 150 lines
     """
     start_time = time.time()
-    app.logger.info('LOG|RECEIVED|' + str(expid))
+    app.logger.info('LOG|RECEIVED|' + str(expid))
     result = CommonRequests.get_experiment_log_last_lines(expid)
     app.logger.info('LOG|RTIME|' + str(expid) + "|" + str(time.time() - start_time))
     return result
@@ -324,7 +387,7 @@ def get_job_log_from_path(logfile):
     Get log from path
     """
     expid = logfile.split('_') if logfile is not None else ""
-    expid = expid[0] if len(expid) > 0 else ""
+    expid = expid[0] if len(expid) > 0 else ""
     start_time = time.time()
     app.logger.info('JOBLOG|RECEIVED|{0}'.format(expid))
     result = CommonRequests.get_job_log(expid, logfile)
@@ -335,7 +398,7 @@ def get_job_log_from_path(logfile):
 
 @app.route('/pklinfo/<expid>/<timeStamp>', methods=['GET'])
 def get_experiment_pklinfo(expid, timeStamp):
     start_time = time.time()
-    app.logger.info('GPKL|RECEIVED|' + str(expid) + "~" + str(timeStamp))
+    app.logger.info('GPKL|RECEIVED|' + str(expid) + "~" + str(timeStamp))
     result = CommonRequests.get_experiment_pkl(expid)
     app.logger.info('GPKL|RTIME|' + str(expid) + "|" + str(time.time() - start_time))
     return result
@@ -371,7 +434,7 @@ def get_exp_job_history(expid, jobname):
 
 @app.route('/rundetail/<expid>/<runid>')
 def get_experiment_run_job_detail(expid, runid):
     start_time = time.time()
-    app.logger.info('RUNDETAIL|RECEIVED|' + str(expid) + "~" + str(runid))
+    app.logger.info('RUNDETAIL|RECEIVED|' + str(expid) + "~" + str(runid))
     result = CommonRequests.get_experiment_tree_rundetail(expid, runid)
     app.logger.info('RUNDETAIL|RTIME|' + str(expid) + "|" + str(time.time() - start_time))
     return result
@@ -384,4 +447,3 @@ def get_file_status():
     result = CommonRequests.get_last_test_archive_status()
     app.logger.info('FSTATUS|RTIME|' + str(time.time() - start_time))
     return result
-
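Note on the app.py changes: each heavy request handler now registers its own PID in the Manager-backed dict D before doing work, and /shutdown/<route> recycles matching workers with kill -HUP (the gunicorn master starts a replacement for any worker that dies, so a HUP effectively recycles it). Below is a minimal, self-contained sketch of that pattern; REGISTRY and heavy_work are illustrative names, not part of the API, and like D the dict must be created before the workers fork.

import os
import signal
from multiprocessing import Manager

REGISTRY = Manager().dict()  # shared across forked workers, like D in app.py

def heavy_work(user, route, expid):
    REGISTRY[os.getpid()] = [user, route, expid, True]  # mark this worker as busy
    try:
        pass  # expensive tree/graph/summary generation would run here
    finally:
        REGISTRY.pop(os.getpid(), None)  # deregister once the response is ready

def shutdown_workers(user, route, expid):
    # Recycle every worker still marked busy for this (user, route, expid).
    for pid, entry in list(REGISTRY.items()):
        if entry[0] == user and entry[1] == route and entry[-1]:
            REGISTRY.pop(pid, None)
            os.kill(pid, signal.SIGHUP)  # same effect as os.system('kill -HUP <pid>')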
diff --git a/autosubmit_api/autosubmit_legacy/job/job_list.py b/autosubmit_api/autosubmit_legacy/job/job_list.py
index 6bf81146abcb90b61849efb47ab42ad6be93b32b..cba04ffca7600f15e821834bf6f205299e985352 100644
--- a/autosubmit_api/autosubmit_legacy/job/job_list.py
+++ b/autosubmit_api/autosubmit_legacy/job/job_list.py
@@ -2340,12 +2340,12 @@ class JobList:
     def get_graph_representation(self, BasicConfig, layout='standard', grouped='none', chunk_unit=None, chunk_size=1):
         """
         Return graph representation in JSON format.\n
-        :param layout: established the type of layour to generate: 'standard', 'laplacian'.
-        :type layout: string
-        :param grouped: type of grouping to be applied: 'date-member', 'status'.
-        :type grouped: string
-        :return: list of edges, list of nodes
-        :rtype: JSON format
+        :param layout: establishes the type of layout to generate: 'standard', 'laplacian'.
+        :type layout: string
+        :param grouped: type of grouping to be applied: 'date-member', 'status'.
+        :type grouped: string
+        :return: list of edges, list of nodes
+        :rtype: JSON format
         """
         dateformat = self.get_date_format
         node_id = dict()
@@ -2374,7 +2374,7 @@ class JobList:
         # Update Level
         allJobs = self.get_all()
         # Validate if the graph data should be updated
-        graph_drawing_data = ExperimentGraphDrawing(self.expid).get_validated_data(self.get_all())
+        graph_drawing_data = ExperimentGraphDrawing(self.expid, BasicConfig).get_validated_data(self.get_all())
         if not graph_drawing_data or len(allJobs) > 1000:
             # print('Start Traverse Update.')
             start_time = time()
@@ -2851,7 +2851,7 @@ class JobList:
 
     def job_list_traverse_update(self):
         """
-        Traverses current job list and updates attribute 'level' to
+        Traverses current job list and updates attribute 'level' to
         reflect the hierarchical position of each job according to its dependencies
         :return: list of jobs
         """
@@ -2930,7 +2930,7 @@ class JobList:
 
     def print_with_status(self, statusChange=None):
         """
-        Returns the string representation of the dependency tree of
+        Returns the string representation of the dependency tree of
         the Job List
 
         :return: String representation
@@ -2991,8 +2991,8 @@ class JobList:
 
     def _recursion_print(self, job, level, statusChange=None):
         """
-        Returns the list of children in a recursive way. Traverses the dependency tree.
-        :return: parent + list of children
+        Returns the list of children in a recursive way. Traverses the dependency tree.
+        :return: parent + list of children
         :rtype: String
         """
         result = ""
@@ -3038,11 +3038,11 @@ class JobList:
         # job_data = None
         # Job information from worker database
         job_times = DbRequests.get_times_detail_by_expid(conn, expid)
-        conn.close()
+        conn.close()
         # Job information from job historic data
-        # print("Get current job data structure...")
-        experiment_history = ExperimentHistoryDirector(ExperimentHistoryBuilder(expid)).build_reader_experiment_history()
-        job_data = experiment_history.manager.get_all_last_job_data_dcs() if experiment_history.is_header_ready() else None
+        # print("Get current job data structure...")
+        experiment_history = ExperimentHistoryDirector(ExperimentHistoryBuilder(expid)).build_reader_experiment_history()
+        job_data = experiment_history.manager.get_all_last_job_data_dcs() if experiment_history.is_header_ready() else None
         # Result variables
         job_running_time_seconds = dict()
         job_running_to_runtext = dict()
@@ -3082,14 +3082,14 @@ class JobList:
     def _job_running_check(status_code, name, tmp_path):
         # type: (int, str, str) -> Tuple[datetime.datetime, datetime.datetime, datetime.datetime, str]
         """
-        Receives job data and returns the data from its TOTAL_STATS file in an ordered way.
-        :param status_code: Status of job
-        :type status_code: Integer
-        :param name: Name of job
-        :type name: String
-        :param tmp_path: Path to the tmp folder of the experiment
-        :type tmp_path: String
-        :return: submit time, start time, end time, status
+        Receives job data and returns the data from its TOTAL_STATS file in an ordered way.
+        :param status_code: Status of job
+        :type status_code: Integer
+        :param name: Name of job
+        :type name: String
+        :param tmp_path: Path to the tmp folder of the experiment
+        :type tmp_path: String
+        :return: submit time, start time, end time, status
         :rtype: 4-tuple in datetime format
         """
         values = list()
@@ -3152,18 +3152,18 @@ class JobList:
     def retrieve_times(status_code, name, tmp_path, make_exception=False, job_times=None, seconds=False, job_data_collection=None):
         # type: (int, str, str, bool, Dict[str, Tuple[int, int, int, int, int]], bool, List[JobData]) -> JobRow
         """
-        Retrieve job timestamps from database.
-        :param status_code: Code of the Status of the job
-        :type status_code: Integer
-        :param name: Name of the job
-        :type name: String
-        :param tmp_path: Path to the tmp folder of the experiment
-        :type tmp_path: String
-        :param make_exception: flag for testing purposes
+        Retrieve job timestamps from database.
+        :param status_code: Code of the Status of the job
+        :type status_code: Integer
+        :param name: Name of the job
+        :type name: String
+        :param tmp_path: Path to the tmp folder of the experiment
+        :type tmp_path: String
+        :param make_exception: flag for testing purposes
         :type make_exception: Boolean
         :param job_times: Detail from as_times.job_times for the experiment
         :type job_times: Dictionary Key: job name, Value: 5-tuple (submit time, start time, finish time, status, detail id)
-        :return: minutes the job has been queuing, minutes the job has been running, and the text that represents it
+        :return: minutes the job has been queuing, minutes the job has been running, and the text that represents it
         :rtype: int, int, str
         """
         status = "NA"
@@ -3189,7 +3189,7 @@ class JobList:
                     if status == job_data.status:
                         energy = job_data.energy
                         if job_times:
-                            t_submit, t_start, t_finish, _, _ = job_times.get(name, (0, 0, 0, 0, 0))
+                            t_submit, t_start, t_finish, _, _ = job_times.get(name, (0, 0, 0, 0, 0))
                             if t_finish - t_start > job_data.running_time:
                                 t_submit = t_submit if t_submit > 0 else job_data.submit
                                 t_start = t_start if t_start > 0 else job_data.start
@@ -3209,7 +3209,7 @@ class JobList:
                         if t_start >= t_finish:
                             if job_times:
                                 _, c_start, _, _, _ = job_times.get(name, (0, t_start, t_finish, 0, 0))
-                                job_data.start = c_start if t_start > c_start else t_start
+                                job_data.start = c_start if t_start > c_start else t_start
 
                         if seconds == False:
                             queue_time = math.ceil(job_data.queuing_time / 60)
@@ -3249,12 +3249,12 @@ class JobList:
 
         else:
             # For job times completed we no longer use timedeltas, but timestamps
-            status = Status.VALUE_TO_KEY[status_code]
-            if job_times and status_code not in [Status.READY, Status.WAITING, Status.SUSPENDED]:
-                if name in job_times:
+            status = Status.VALUE_TO_KEY[status_code]
+            if job_times and status_code not in [Status.READY, Status.WAITING, Status.SUSPENDED]:
+                if name in job_times:
                     submit_time, start_time, finish_time, status, detail_id = job_times[name]
                     seconds_running = finish_time - start_time
-                    seconds_queued = start_time - submit_time
+                    seconds_queued = start_time - submit_time
                     submit_time = int(submit_time)
                     start_time = int(start_time)
                     finish_time = int(finish_time)
@@ -3279,15 +3279,15 @@ class JobList:
         else:
             queue_time = seconds_queued
             running_time = seconds_running
-        # print(name + "\t" + str(queue_time) + "\t" + str(running_time))
-        return JobRow(name,
-                      int(queue_time),
-                      int(running_time),
-                      status,
-                      energy,
-                      int(submit_time),
-                      int(start_time),
-                      int(finish_time),
+        # print(name + "\t" + str(queue_time) + "\t" + str(running_time))
+        return JobRow(name,
+                      int(queue_time),
+                      int(running_time),
+                      status,
+                      energy,
+                      int(submit_time),
+                      int(start_time),
+                      int(finish_time),
                       None,
                       None)
 
@@ -3296,13 +3296,13 @@ class JobList:
         """
         Retrieves dictionaries that map the collection of packages in the experiment
 
-        :param basic_config: Basic configuration
+        :param basic_config: Basic configuration
         :type basic_config: Configuration Object
         :param expid: Experiment Id
         :type expid: String
        :param current_jobs: list of names of current jobs
         :type current_jobs: list
-        :return: job to package, package to jobs, package to package_id, package to symbol
+        :return: job to package, package to jobs, package to package_id, package to symbol
         :rtype: Dictionary(Job Object, Package_name), Dictionary(Package_name, List of Job Objects), Dictionary(String, String), Dictionary(String, String)
         """
         monitor = Monitor()
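The retrieve_times hunks above are mostly whitespace cleanup, but the underlying arithmetic for finished jobs is worth a sketch: queuing and running times are plain differences of the submit/start/finish timestamps from as_times.job_times, returned in seconds or in whole minutes. A condensed, hedged sketch under those assumptions (the function name is illustrative; rounding up with math.ceil mirrors the seconds == False branch above):

import math

def queue_and_run_times(job_times, name, seconds=False):
    # job_times: {job_name: (submit, start, finish, status, detail_id)}, epoch seconds
    submit, start, finish, _status, _detail = job_times.get(name, (0, 0, 0, 0, 0))
    seconds_queued = start - submit
    seconds_running = finish - start
    if seconds:
        return int(seconds_queued), int(seconds_running)
    # whole minutes, rounded up, as in the math.ceil(... / 60) branch above
    return int(math.ceil(seconds_queued / 60.0)), int(math.ceil(seconds_running / 60.0))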
diff --git a/autosubmit_api/components/representations/graph/graph.py b/autosubmit_api/components/representations/graph/graph.py
index 00683a72eb6a91da7b7d676954343fc2f62a2240..8f957a0dff1b5a4e42fe4a71d909d317e38a1e5e 100644
--- a/autosubmit_api/components/representations/graph/graph.py
+++ b/autosubmit_api/components/representations/graph/graph.py
@@ -35,7 +35,7 @@ class GraphRepresentation(object):
         # type: (str, JobListLoader, str, str) -> None
         self.expid = expid
         self.layout = layout
-        self.grouped_by = grouped
+        self.grouped_by = grouped
         self.joblist_loader = job_list_loader
         self.joblist_helper = self.joblist_loader.joblist_helper
         self.jobs = self.joblist_loader.jobs
@@ -48,16 +48,16 @@ class GraphRepresentation(object):
         self.nodes = []  # type: List[Dict[str, Any]]
         self.groups = {}  # type: Dict[str, Dict[str, Any]]
         self.max_children_count = 0  # type: int
-        self.max_parent_count = 0  # type: int
-
+        self.max_parent_count = 0  # type: int
+
     @property
     def job_count(self):
         return len(self.jobs)
-
+
     @property
     def edge_count(self):
         return len(self.edges)
-
+
     def perform_calculations(self):
         # type: () -> None
         """ Calculate Graph Representation """
@@ -87,17 +87,17 @@ class GraphRepresentation(object):
 
     def calculate_valid_drawing(self):
         if len(self.edges) <= 0:
-            raise ValueError("The generation of a drawing requires that the graph model includes edges.")
+            raise ValueError("The generation of a drawing requires that the graph model includes edges.")
         self.update_jobs_level()
         if self.layout == Layout.STANDARD:
-            self.assign_graphviz_coordinates_to_jobs()
+            self.assign_graphviz_coordinates_to_jobs()
         elif self.layout == Layout.LAPLACIAN:
             self.assign_laplacian_coordinates_to_jobs()
         else:
             raise ValueError("You have requested a {0} layout, which is not implemented.".format(self.layout))
         if not self.we_have_valid_graph_drawing:
             self.assign_barycentric_coordinates_to_jobs()
-
+
     def _calculate_groups(self):
         if self.grouped_by == GroupedBy.STATUS:
             self.groups = self._get_grouped_by_status_dict()
@@ -107,7 +107,7 @@ class GraphRepresentation(object):
             self.groups = dict()
         else:
             raise ValueError("You have provided an invalid grouping selection: {}".format(self.grouped_by))
-
+
     def _get_grouped_by_status_dict(self):
         # type: () -> Dict[str, Dict[str, Any]]
         groups = {}
@@ -127,13 +127,13 @@ class GraphRepresentation(object):
         for date in self.joblist_loader.dates:
             formatted_date = self.joblist_loader.dates_formatted_dict.get(date, None)
             for member in self.joblist_loader.members:
-                status_counters = {}
+                status_counters = {}
                 group_name = "{}_{}_{}_".format(self.expid, formatted_date, member)
-                jobs_in_date_member = filter(lambda x: x.name.startswith(group_name), self.jobs)
+                jobs_in_date_member = filter(lambda x: x.name.startswith(group_name), self.jobs)
                 if len(jobs_in_date_member) == 0:
                     raise Exception("You have configured date {} and member {} in your experiment but there are no jobs that use these settings. \
                         Review your configuration, something might be wrong.".format(formatted_date, member))
-                for job in jobs_in_date_member:
+                for job in jobs_in_date_member:
                     status_counters[job.status] = status_counters.setdefault(job.status, 0) + 1
                 group_color[group_name] = self._get_defined_group_color(status_counters)
                 group_coordinates.append((group_name,
@@ -156,8 +156,8 @@ class GraphRepresentation(object):
             visited.add(group_name)
             for group_triple_compared in group_coordinates:
                 group_name_compared, x_j_coordinate, y_j_coordinate = group_triple_compared
-                if group_name_compared not in visited:
-                    if abs(x_i_coordinate - x_j_coordinate) <= 250 and abs(y_i_coordinate - y_j_coordinate) <= 250:
+                if group_name_compared not in visited:
+                    if abs(x_i_coordinate - x_j_coordinate) <= 250 and abs(y_i_coordinate - y_j_coordinate) <= 250:
                         if y_i_coordinate > y_j_coordinate:
                             y_i_coordinate = y_i_coordinate + (250 - abs(y_i_coordinate - y_j_coordinate))
                         else:
@@ -187,18 +187,18 @@ class GraphRepresentation(object):
         self.we_have_valid_graph_drawing = self._assign_coordinates_to_jobs(self._get_graph_drawing_data())
         self.we_have_valid_graphviz_drawing = self.we_have_valid_graph_drawing
         if not self.we_have_valid_graph_drawing and len(self.jobs) <= SMALL_EXPERIMENT_THRESHOLD:
-            self.assign_graphviz_calculated_coordinates_to_jobs()
-
+            self.assign_graphviz_calculated_coordinates_to_jobs()
+
     def assign_graphviz_calculated_coordinates_to_jobs(self):
         """ Runs GraphViz to get the coordinates """
-        self.we_have_valid_graph_drawing = self._assign_coordinates_to_jobs(self._get_calculated_graph_drawing())
+        self.we_have_valid_graph_drawing = self._assign_coordinates_to_jobs(self._get_calculated_graph_drawing())
         self.we_have_valid_graphviz_drawing = self.we_have_valid_graph_drawing
-
+
     def assign_laplacian_coordinates_to_jobs(self):
         """ Calculates Laplacian """
         self.we_have_valid_graph_drawing = self._assign_coordinates_to_jobs(self._get_calculated_graph_laplacian_drawing())
         self.we_have_valid_graphviz_drawing = False
-
+
     def assign_barycentric_coordinates_to_jobs(self):
         """ Calculates coordinates """
         self.we_have_valid_graph_drawing = self._assign_coordinates_to_jobs(self._get_calculated_hierarchical_drawing())
@@ -216,12 +216,12 @@ class GraphRepresentation(object):
                     if self.job_dictionary[children_job_name].level == 0:
                         self.job_dictionary[children_job_name].level = current.level
                         stack.append(self.job_dictionary[children_job_name])
-
+
         job_roots_names = [job.name for job in self.jobs if len(job.parents_names) == 0]
-        for job_name in job_roots_names:
+        for job_name in job_roots_names:
             stack = []
             update_level(self.job_dictionary[job_name])
-
+
     def reset_jobs_coordinates(self):
         """ Mainly for testing purposes """
         for job in self.jobs:
@@ -229,15 +229,16 @@ class GraphRepresentation(object):
 
     def add_normal_edges(self):
         for job in self.jobs:
-            for child_name in job.children_names:
-                self.edges.append(RealEdge(job.name, child_name, self.joblist_loader.are_these_in_same_package(job.name, child_name)))
-
+            for child_name in job.children_names:
+                if job.name != child_name:
+                    self.edges.append(RealEdge(job.name, child_name, self.joblist_loader.are_these_in_same_package(job.name, child_name)))
+
     def _calculate_average_post_time(self):
-        post_jobs = [job for job in self.jobs if job.section == "POST" and job.status in {Status.COMPLETED}]
+        post_jobs = [job for job in self.jobs if job.section == "POST" and job.status in {Status.COMPLETED}]
         self.average_post_time = get_average_total_time(post_jobs)
 
     def _generate_node_data(self):
-        for job_name in self.job_dictionary:
+        for job_name in self.job_dictionary:
             job = self.job_dictionary[job_name]
             self._calculate_max_children_parent(len(job.children_names), len(job.parents_names))
             ini_date, end_date = job.get_date_ini_end(self.joblist_loader.chunk_size, self.joblist_loader.chunk_unit)
@@ -251,7 +252,7 @@ class GraphRepresentation(object):
                 "platform_name": job.platform,
                 "chunk": job.chunk,
                 "package": job.package,
-                "member": job.member,
+                "member": job.member,
                 "date": ini_date,
                 "date_plus": end_date,
                 "SYPD": PUtils.calculate_SYPD_perjob(self.joblist_loader.chunk_unit, self.joblist_loader.chunk_size, job.chunk, job.run_time, job.status),
@@ -267,27 +268,27 @@ class GraphRepresentation(object):
                 "dashed": True if job.package else False,
                 "shape": self.joblist_helper.package_to_symbol.get(job.package, "dot"),
                 "processors": job.ncpus,
-                "wallclock": job.wallclock,
+                "wallclock": job.wallclock,
                 "children": len(job.children_names),
                 "children_list": list(job.children_names),
                 "parents": len(job.parents_names),
                 "parent_list": list(job.parents_names),
                 "out": job.out_file_path,
-                "err": job.err_file_path,
+                "err": job.err_file_path,
                 "custom_directives": None,
                 "rm_id": job.rm_id,
                 "x": job.x_coordinate,
-                "y": job.y_coordinate
+                "y": job.y_coordinate
             })
 
     def _calculate_max_children_parent(self, children_count, parent_count):
         # type: (int, int) -> None
         self.max_children_count = max(self.max_children_count, children_count)
         self.max_parent_count = max(self.max_parent_count, parent_count)
-
+
     def _assign_coordinates_to_jobs(self, valid_coordinates):
         """ False if valid_coordinates is None OR empty"""
-        # type: (Dict[str, Tuple[int, int]] | None) -> bool
+        # type: (Dict[str, Tuple[int, int]] | None) -> bool
         if valid_coordinates and len(valid_coordinates) > 0:
             for job_name in self.job_dictionary:
                 self.job_dictionary[job_name].x_coordinate, self.job_dictionary[job_name].y_coordinate = valid_coordinates[job_name]
@@ -308,7 +309,7 @@ class GraphRepresentation(object):
             if len(node_data) > 1 and node_data[0] == "node":
                 coordinates[str(node_data[1])] = (int(float(node_data[2])) * GRAPHVIZ_MULTIPLIER, int(float(node_data[3])) * -GRAPHVIZ_MULTIPLIER)
         return coordinates
-
+
     def _get_calculated_graph_laplacian_drawing(self):
         # type: () -> Dict[str, Tuple[int, int]]
         coordinates = dict()
@@ -318,7 +319,7 @@ class GraphRepresentation(object):
         for edge in self.edges:
            nx_graph.add_edge(edge._from, edge._to, weight=(3 if edge._is_in_wrapper else 1))
         laplacian_matrix = nx.normalized_laplacian_matrix(nx_graph)
-        eigval, eigvec = sparse.linalg.eigsh(laplacian_matrix, k=4, which="SM")
+        eigval, eigvec = sparse.linalg.eigsh(laplacian_matrix, k=4, which="SM")
         eigval1 = float(eigval[1])
         eigval2 = float(eigval[2])
         x_coords = eigvec[:, 1] * (self.job_count / eigval1) * 10.0
@@ -326,7 +327,7 @@ class GraphRepresentation(object):
         for i, job_name in enumerate(nx_graph.nodes):
             coordinates[job_name] = (int(x_coords[i]), int(y_coords[i]))
         return coordinates
-
+
     def _get_calculated_hierarchical_drawing(self):
         # type: () -> Dict[str, Tuple[int, int]]
         coordinates = {}
@@ -334,31 +335,31 @@ class GraphRepresentation(object):
         max_level = max(job.level for job in self.jobs)
         for i in range(2, max_level+1):
             if i == 2:
-                jobs_in_previous_layer = filter(lambda x: x.level == i-1, self.jobs)
+                jobs_in_previous_layer = filter(lambda x: x.level == i-1, self.jobs)
                 for k, job in enumerate(jobs_in_previous_layer):
-                    self.job_dictionary[job.name].horizontal_order = (k+1)
+                    self.job_dictionary[job.name].horizontal_order = (k+1)
             jobs_in_layer = filter(lambda x: x.level == i, self.jobs)
             for job in jobs_in_layer:
                 sum_order = sum(self.job_dictionary[job_name].horizontal_order for job_name in job.parents_names)
                 if len(job.parents_names) > 0:
                     self.job_dictionary[job.name].barycentric_value = sum_order/len(job.parents_names)
-
+
             jobs_in_layer.sort(key=lambda x: x.barycentric_value)
             job_names_in_layer = {job.name for job in jobs_in_layer}
             already_assigned_order = set()
             for job in jobs_in_layer:
-                if job.name not in already_assigned_order:
+                if job.name not in already_assigned_order:
                     self.job_dictionary[job.name].horizontal_order = len(already_assigned_order) + 1
                     already_assigned_order.add(job.name)
                 if job.package and (job.package, job.level) not in processed_packages:
                     processed_packages.add((job.package, job.level))
-                    job_names_in_package_and_same_level = [job.name for job in jobs_in_layer if job.name in self.joblist_helper.package_to_jobs.get(job.package, [])]
-                    for job_name in job_names_in_package_and_same_level:
+                    job_names_in_package_and_same_level = [job.name for job in jobs_in_layer if job.name in self.joblist_helper.package_to_jobs.get(job.package, [])]
+                    for job_name in job_names_in_package_and_same_level:
                         if self.job_dictionary[job_name].name in job_names_in_layer and job_name not in already_assigned_order:
                             self.job_dictionary[job_name].horizontal_order = len(already_assigned_order) + 1
                             already_assigned_order.add(job_name)
-
+
         for job_name in self.job_dictionary:
             # print("{} {} {}".format(job_name, self.job_dictionary[job_name].horizontal_order, self.job_dictionary[job_name].level))
             coordinates[job_name] = (int(self.job_dictionary[job_name].horizontal_order*BARYCENTRIC_X_MULTIPLIER), int(self.job_dictionary[job_name].level*BARYCENTRIC_Y_MULTIPLIER))
@@ -372,10 +373,3 @@ class GraphRepresentation(object):
         # pairs = set()
         # for job_name_from in self.joblist_helper.package_to_jobs[package]:
         #     for job_name_to in self.joblist_helper.package_to_jobs[package]:
-
-
-
-
-
-
-
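The Laplacian branch in graph.py above is a standard spectral embedding: nodes are positioned with the second and third eigenvectors of the normalized graph Laplacian, scaled by node count over the corresponding eigenvalues. A condensed sketch of the same computation follows; it assumes a connected graph with more than three nodes (otherwise the small eigenvalues can be zero), and the function name is illustrative only.

import networkx as nx
from scipy.sparse import linalg as splinalg

def laplacian_layout(edges, scale=10.0):
    # edges: iterable of (from_name, to_name) pairs
    graph = nx.Graph()
    graph.add_edges_from(edges)
    lap = nx.normalized_laplacian_matrix(graph)
    # Smallest-magnitude eigenpairs; eigenvectors 1 and 2 (skipping the trivial
    # constant eigenvector) give the spectral coordinates used above.
    eigval, eigvec = splinalg.eigsh(lap, k=3, which="SM")
    n = graph.number_of_nodes()
    xs = eigvec[:, 1] * (n / float(eigval[1])) * scale
    ys = eigvec[:, 2] * (n / float(eigval[2])) * scale
    return {name: (int(xs[i]), int(ys[i])) for i, name in enumerate(graph.nodes)}

The weight=3 given to in-wrapper edges in the original pulls packaged jobs closer together in this embedding; the new self-loop guard in add_normal_edges also keeps zero-length edges out of the Laplacian.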
diff --git a/autosubmit_api/components/representations/tree/tree.py b/autosubmit_api/components/representations/tree/tree.py
index 4be1c15f0819bbababe1ec975874bd8b2b7052fd..263aecc928aa4d66da2a573864862171a9fb8c77 100644
--- a/autosubmit_api/components/representations/tree/tree.py
+++ b/autosubmit_api/components/representations/tree/tree.py
@@ -10,7 +10,7 @@ from typing import List, Dict, Tuple, Set, Any
 DEFAULT_MEMBER = "DEFAULT"
 
 class TreeRepresentation(object):
-    def __init__(self, expid, job_list_loader):
+    def __init__(self, expid, job_list_loader):
         # type: (str, JobListLoader) -> None
         self.expid = expid  # type: str
         # self.jobs = []  # type: List[Job]
@@ -24,18 +24,18 @@ class TreeRepresentation(object):
         self.nodes = []  # type: List[Dict]
         self._distributed_dates = OrderedDict()  # type: OrderedDict[str, None]
         self._distributed_members = OrderedDict()  # type: OrderedDict[str, None]
-
+
     def perform_calculations(self):
         # type: () -> None
-        self._distribute_into_date_member_groups()
+        self._distribute_into_date_member_groups()
         self._generate_date_member_tree_folders()
         self._generate_no_date_no_member_tree_folder()
         self._generate_package_tree_folders()
         self._complement_result_header()
         self._calculate_average_post_time()
         self._generate_node_data()
-
+
     def get_tree_structure(self):
         # type: () -> Dict[str, Any]
         return {
@@ -47,7 +47,7 @@ class TreeRepresentation(object):
             "error_message": "",
             "pkl_timestamp": get_current_timestamp()
         }
-
+
     def _distribute_into_date_member_groups(self):
         # type: () -> None
         for job in self.joblist_loader.jobs:
@@ -61,17 +61,17 @@ class TreeRepresentation(object):
                 intersection_member_parent = self.joblist_loader.members & parents_members
                 intersection_member_children = self.joblist_loader.members & children_members
                 if len(intersection_member_parent) > 0 or len(intersection_member_children) > 0:
-                    member = intersection_member_parent.pop() if len(intersection_member_parent) > 0 else intersection_member_children.pop()
+                    member = intersection_member_parent.pop() if len(intersection_member_parent) > 0 else intersection_member_children.pop()
                    self._date_member_distribution.setdefault((job.date, member), []).append(job)
                     self._distributed_dates[job.date] = None
                     self._distributed_members[member] = None
-                else:
+                else:
                     self._date_member_distribution.setdefault((job.date, DEFAULT_MEMBER), []).append(job)
                     self._distributed_dates[job.date] = None
                     self._distributed_members[DEFAULT_MEMBER] = None
             else:
                 self._no_date_no_member_jobs.append(job)
-
+
     def _generate_date_member_tree_folders(self):
         # type: () -> None
         for date in self._distributed_dates:
@@ -90,7 +90,7 @@ class TreeRepresentation(object):
                     Status.HELD: 0 }
                 jobs_in_date_member = self._date_member_distribution.get((date, member), [])
                 sections = {job.section for job in jobs_in_date_member}
-                section_to_dm_jobs_dict = {section: [job for job in jobs_in_date_member if job.section == section] for section in sections}
+                section_to_dm_jobs_dict = {section: [job for job in jobs_in_date_member if job.section == section] for section in sections}
                 sections_folder_open = set()
                 jobs_in_section = OrderedDict()
                 jobs_and_folders_in_member = deque()
@@ -99,7 +99,7 @@ class TreeRepresentation(object):
                     all_waiting = all_waiting and job.status is Status.WAITING
                     all_completed = all_completed and job.status is Status.COMPLETED
                     if job.status in status_counters:
-                        status_counters[job.status] += 1
+                        status_counters[job.status] += 1
                     if len(section_to_dm_jobs_dict[job.section]) > 1:
                         if job.status in self._normal_status:
                             jobs_in_section.setdefault(job.section, deque()).append(job.leaf)
@@ -109,10 +109,10 @@ class TreeRepresentation(object):
                     else:
                         if job.status in self._normal_status:
                             jobs_and_folders_in_member.append(job.leaf)
-                        else:
+                        else:
                             jobs_and_folders_in_member.appendleft(job.leaf)
                     job.tree_parent.append("{0}_{1}_{2}".format(self.expid, self.joblist_loader.dates_formatted_dict.get(date, None), member))
-
+
                 for section in jobs_in_section:
                     jobs_and_folders_in_member.append({
                         'title': section,
@@ -122,16 +122,16 @@ class TreeRepresentation(object):
                         'expanded': True if section in sections_folder_open else False,
                         'children': list(jobs_in_section.get(section, []))
                     })
-
-
+
+
                 if len(jobs_in_date_member) > 0:
                     # If there is something inside the date-member group, we create it.
                     total_jobs_startdate += len(jobs_in_date_member)
                     ref_key = "{0}_{1}_{2}".format(self.expid, formatted_date, member)
                     folders_in_date.append({
-                        "title": JUtils.get_folder_date_member_title(self.expid,
-                                                                     formatted_date,
-                                                                     member,
-                                                                     len(jobs_in_date_member),
+                        "title": JUtils.get_folder_date_member_title(self.expid,
+                                                                     formatted_date,
+                                                                     member,
+                                                                     len(jobs_in_date_member),
                                                                      status_counters),
                         "folder": True,
                         "refKey": ref_key,
@@ -177,7 +177,7 @@ class TreeRepresentation(object):
 
     def _generate_no_date_no_member_tree_folder(self):
         """ Generates folder for job with no date and no member """
-        if len(self._no_date_no_member_jobs) > 0:
+        if len(self._no_date_no_member_jobs) > 0:
             self.result_tree.append({
                 "title": "Keys",
                 "folder": True,
@@ -209,12 +209,12 @@ class TreeRepresentation(object):
                     status_counters[job.status] += 1
             result_exp_wrappers.append({
                 "title": JUtils.get_folder_package_title(package_name, total_count, status_counters),
-                "folder": True,
+                "folder": True,
                 "refKey": simple_title,
-                "data": {'completed': status_counters[Status.COMPLETED],
-                         'failed': status_counters[Status.FAILED],
-                         'running': status_counters[Status.RUNNING],
-                         'queuing': status_counters[Status.QUEUING],
+                "data": {'completed': status_counters[Status.COMPLETED],
+                         'failed': status_counters[Status.FAILED],
+                         'running': status_counters[Status.RUNNING],
+                         'queuing': status_counters[Status.QUEUING],
                          'held': status_counters[Status.HELD],
                         'total': total_count },
                 "expanded": False,
@@ -249,16 +249,16 @@ class TreeRepresentation(object):
     def _complement_result_header(self):
         self.result_header["completed_tag"] = JUtils.completed_tag_with_anchors
         self.result_header["running_tag"] = JUtils.running_tag_with_anchors
-        self.result_header["queuing_tag"] = JUtils.queuing_tag_with_anchors
+        self.result_header["queuing_tag"] = JUtils.queuing_tag_with_anchors
         self.result_header["failed_tag"] = JUtils.failed_tag_with_anchors
         self.result_header["held_tag"] = JUtils.held_tag_with_anchors
         self.result_header["checkmark"] = JUtils.checkmark_tag
-        self.result_header["packages"] = [package_name for package_name in self.joblist_loader.package_names]
+        self.result_header["packages"] = [package_name for package_name in self.joblist_loader.package_names]
         self.result_header["chunk_unit"] = self.joblist_loader.chunk_unit
         self.result_header["chunk_size"] = self.joblist_loader.chunk_size
 
     def _calculate_average_post_time(self):
-        post_jobs = [job for job in self.joblist_loader.jobs if job.section == "POST" and job.status in {Status.COMPLETED}]
+        post_jobs = [job for job in self.joblist_loader.jobs if job.section == "POST" and job.status in {Status.COMPLETED}]
         self.average_post_time = get_average_total_time(post_jobs)
 
     def _generate_node_data(self):
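The tree.py hunks above are almost entirely whitespace cleanup, but the distribution step they touch is the core of the representation: jobs are bucketed by (date, member) with setdefault, and per-status counters feed each folder title. A simplified sketch of that idea, hedged: the real _distribute_into_date_member_groups also infers a member from parents/children when job.member is unset, and the real counters use Status enum constants rather than the strings used here.

from collections import OrderedDict

def distribute_by_date_member(jobs, default_member="DEFAULT"):
    # jobs: objects with .date, .member and .status, as produced by the job list loader
    buckets = OrderedDict()  # (date, member) -> [jobs], insertion order preserved
    for job in jobs:
        member = job.member if job.member else default_member
        buckets.setdefault((job.date, member), []).append(job)
    return buckets

def count_statuses(jobs_in_group, tracked=("COMPLETED", "FAILED", "RUNNING", "QUEUING", "HELD")):
    # Counters that feed the folder title, as status_counters does above
    counters = {status: 0 for status in tracked}
    for job in jobs_in_group:
        if job.status in counters:
            counters[job.status] += 1
    return counters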
diff --git a/autosubmit_api/config/config_common.py b/autosubmit_api/config/config_common.py
index 206bb76731ced017784614e9ddbb36aa77ffb6bb..83ae5d958162e8812b7fb0b2a42e25632a1f0e70 100644
--- a/autosubmit_api/config/config_common.py
+++ b/autosubmit_api/config/config_common.py
@@ -42,24 +42,56 @@ class AutosubmitConfig(object):
     :type expid: str
     """
 
-    def __init__(self, expid, basic_config, parser_factory):
-        # type: (str, BasicConfig, ConfigParserFactory) -> None
+    def __init__(self, expid, basic_config, parser_factory, extension=".yml"):
+        # type: (str, BasicConfig, ConfigParserFactory, Extension) -> None
         self.expid = expid
         self.basic_config = basic_config
         self.parser_factory = parser_factory
+        # By default check for .yml files first as it is the new standard for AS 4.0
+
         self._conf_parser = None  # type: ConfigParser
-        self._conf_parser_file = os.path.join(self.basic_config.LOCAL_ROOT_DIR, expid, "conf", "autosubmit_" + expid + ".conf")
+        self._conf_parser_file = os.path.join(self.basic_config.LOCAL_ROOT_DIR, expid, "conf", "autosubmit_" + expid + extension)
+        if os.path.exists(self._conf_parser_file) == False:
+            if extension == ".yml":
+                self.__init__(expid, basic_config, parser_factory, ".conf")
+            elif extension == ".conf":
+                return None
+
         self._exp_parser = None  # type: ConfigParser
-        self._exp_parser_file = os.path.join(self.basic_config.LOCAL_ROOT_DIR, expid, "conf", "expdef_" + expid + ".conf")
+        self._exp_parser_file = os.path.join(self.basic_config.LOCAL_ROOT_DIR, expid, "conf", "expdef_" + expid + extension)
+        if os.path.exists(self._exp_parser_file) == False:
+            if extension == ".yml":
+                self.__init__(expid, basic_config, parser_factory, ".conf")
+            elif extension == ".conf":
+                return None
+
         self._platforms_parser = None  # type: ConfigParser
-        self._platforms_parser_file = os.path.join(self.basic_config.LOCAL_ROOT_DIR, expid, "conf", "platforms_" + expid + ".conf")
+        self._platforms_parser_file = os.path.join(self.basic_config.LOCAL_ROOT_DIR, expid, "conf", "platforms_" + expid + extension)
+        if os.path.exists(self._platforms_parser_file) == False:
+            if extension == ".yml":
+                self.__init__(expid, basic_config, parser_factory, ".conf")
+            elif extension == ".conf":
+                return None
+
         self._jobs_parser = None  # type: ConfigParser
-        self._jobs_parser_file = os.path.join(self.basic_config.LOCAL_ROOT_DIR, expid, "conf", "jobs_" + expid + ".conf")
+        self._jobs_parser_file = os.path.join(self.basic_config.LOCAL_ROOT_DIR, expid, "conf", "jobs_" + expid + extension)
+        if os.path.exists(self._jobs_parser_file) == False:
+            if extension == ".yml":
+                self.__init__(expid, basic_config, parser_factory, ".conf")
+            elif extension == ".conf":
+                return None
+
         self._proj_parser = None  # type: ConfigParser
-        self._proj_parser_file = os.path.join(self.basic_config.LOCAL_ROOT_DIR, expid, "conf", "proj_" + expid + ".conf")
+        self._proj_parser_file = os.path.join(self.basic_config.LOCAL_ROOT_DIR, expid, "conf", "proj_" + expid + extension)
+        if os.path.exists(self._proj_parser_file) == False:
+            if extension == ".yml":
+                self.__init__(expid, basic_config, parser_factory, ".conf")
+            elif extension == ".conf":
+                return None
+
         self.check_proj_file()
 
     @property
@@ -185,7 +217,7 @@ class AutosubmitConfig(object):
 
     def get_platform_queue(self, platform):
         return self._platforms_parser.get_option(platform, 'QUEUE', '')
-
+
     def get_platform_serial_queue(self, platform):
         return self._platforms_parser.get_option(platform, 'SERIAL_QUEUE', '')
@@ -305,7 +337,7 @@ class AutosubmitConfig(object):
     def set_new_user(self, section, new_user):
         """
         Sets new user for given platform
-        :param new_user:
+        :param new_user:
         :param section: platform name
         :type: str
         """
@@ -343,7 +375,7 @@ class AutosubmitConfig(object):
     def set_new_project(self, section, new_project):
         """
         Sets new project for given platform
-        :param new_project:
+        :param new_project:
         :param section: platform name
         :type: str
         """
@@ -945,7 +977,7 @@ class AutosubmitConfig(object):
         :return: Unit for the chunk length  Options: {hour, day, month, year}
         :rtype: str
         """
-
+
         return self._exp_parser.get('experiment', 'CHUNKSIZEUNIT').lower()
 
     def get_chunk_size(self, default=1):
@@ -953,9 +985,9 @@ class AutosubmitConfig(object):
         """
         Chunk Size as defined in the expdef file.
 
-        :return: Chunksize, 1 as default.
+        :return: Chunksize, 1 as default.
         :rtype: int
-        """
+        """
         try:
             chunk_size = self._exp_parser.get_option(
                 'experiment', 'CHUNKSIZE', default)
@@ -1114,7 +1146,7 @@ class AutosubmitConfig(object):
 
         :return: safety sleep time
         :rtype: int
-        """
+        """
         return int(self._conf_parser.get_option('config', 'SAFETYSLEEPTIME', 10))
 
     def set_safetysleeptime(self, sleep_time):
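A note on the config_common.py change: it probes for the AS 4.0 .yml layout and re-invokes __init__ with ".conf" whenever a file is missing, returning early if the .conf file is absent too. Re-entering __init__ works but can leave an object half-initialized (attributes set by the outer call are overwritten by the recursive one, and a missing file on the .conf pass returns with later parsers never set). A hedged alternative sketch that resolves the extension once up front; the helper name is hypothetical and not part of the API:

import os

def resolve_conf_extension(local_root_dir, expid, preferred=".yml", fallback=".conf"):
    # Pick .yml when the AS 4.0-style main config exists, else fall back to .conf;
    # return None when neither layout is present so the caller can fail cleanly.
    conf_dir = os.path.join(local_root_dir, expid, "conf")
    for extension in (preferred, fallback):
        if os.path.exists(os.path.join(conf_dir, "autosubmit_" + expid + extension)):
            return extension
    return None

With the extension decided once, each of the five parser-file paths can be built unconditionally, and the per-file recursion disappears.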
get_experiment_log_last_lines(expid): path = BasicConfig.LOCAL_ROOT_DIR + '/' + expid + '/' + BasicConfig.LOCAL_TMP_DIR reading = os.popen('ls -t ' + path + ' | grep "run.log"').read() if (os.path.exists(path)) else "" - if len(reading) > 0: - log_file_name = reading.split()[0] + if len(reading) > 0: + log_file_name = reading.split()[0] current_stat = os.stat(path + '/' + log_file_name) timest = int(current_stat.st_mtime) - log_file_lastmodified = common_utils.timestamp_to_datetime_format(timest) - found = True - request = 'tail -150 ' + path + '/' + log_file_name - last_lines = os.popen(request) + log_file_lastmodified = common_utils.timestamp_to_datetime_format(timest) + found = True + request = 'tail -150 ' + path + '/' + log_file_name + last_lines = os.popen(request) for i, item in enumerate(last_lines.readlines()): - logcontent.append({'index': i, 'content': item[0:-1]}) + logcontent.append({'index': i, 'content': item[0:-1]}) except Exception as e: error = True error_message = str(e) @@ -533,9 +531,9 @@ def get_experiment_log_last_lines(expid): def get_job_log(expid, logfile, nlines=150): """ Returns the last 150 lines of the log file. Targets out or err. - :param logfilepath: path to the log file + :param logfilepath: path to the log file :type logfilepath: str - :return: List of string + :return: List of string :rtype: list """ # Initializing results: @@ -554,8 +552,8 @@ def get_job_log(expid, logfile, nlines=150): timest = int(current_stat.st_mtime) log_file_lastmodified = common_utils.timestamp_to_datetime_format(timest) found = True - request = "tail -{0} {1}".format(nlines, logfilepath) - last50 = os.popen(request) + request = "tail -{0} {1}".format(nlines, logfilepath) + last50 = os.popen(request) i = 0 for item in last50.readlines(): logcontent.append({'index': i, 'content': item[0:-1]}) @@ -577,11 +575,11 @@ def get_job_log(expid, logfile, nlines=150): def get_experiment_pkl(expid): # type: (str) -> Dict[str, Any] """ - Gets the current state of the pkl in a format proper for graph update. + Gets the current state of the pkl in a format proper for graph update. 
""" pkl_file_path = "" error = False - error_message = "" + error_message = "" pkl_content = list() pkl_timestamp = 0 package_to_jobs = dict() @@ -590,11 +588,11 @@ def get_experiment_pkl(expid): pkl_file_path = autosubmit_config_facade.pkl_path pkl_timestamp = autosubmit_config_facade.get_pkl_last_modified_timestamp() - if not os.path.exists(autosubmit_config_facade.pkl_path): + if not os.path.exists(autosubmit_config_facade.pkl_path): raise Exception("Pkl file {} not found.".format(autosubmit_config_facade.pkl_path)) - job_list_loader = JobListLoaderDirector(JobListLoaderBuilder(expid)).build_loaded_joblist_loader() - package_to_jobs = job_list_loader.joblist_helper.package_to_jobs + job_list_loader = JobListLoaderDirector(JobListLoaderBuilder(expid)).build_loaded_joblist_loader() + package_to_jobs = job_list_loader.joblist_helper.package_to_jobs for job in job_list_loader.jobs: pkl_content.append({'name': job.name, @@ -608,12 +606,12 @@ def get_experiment_pkl(expid): 'finish': common_utils.timestamp_to_datetime_format(job.finish), 'running_text': job.running_time_text, 'dashed': True if job.package else False, - 'shape': job_list_loader.joblist_helper.package_to_symbol.get(job.package, "dot"), + 'shape': job_list_loader.joblist_helper.package_to_symbol.get(job.package, "dot"), 'package': job.package, 'status': job.status_text, 'status_color': job.status_color, 'out': job.out_file_path, - 'err': job.err_file_path, + 'err': job.err_file_path, 'priority': job.priority}) except Exception as e: @@ -638,7 +636,7 @@ def get_experiment_tree_pkl(expid): """ pkl_file_path = "" error = False - error_message = "" + error_message = "" pkl_content = list() package_to_jobs = {} pkl_timestamp = 0 @@ -647,11 +645,11 @@ def get_experiment_tree_pkl(expid): autosubmit_config_facade = ConfigurationFacadeDirector(AutosubmitConfigurationFacadeBuilder(expid)).build_autosubmit_configuration_facade() pkl_file_path = autosubmit_config_facade.pkl_path pkl_timestamp = autosubmit_config_facade.get_pkl_last_modified_timestamp() - - if not os.path.exists(autosubmit_config_facade.pkl_path): + + if not os.path.exists(autosubmit_config_facade.pkl_path): raise Exception("Pkl file {} not found.".format(autosubmit_config_facade.pkl_path)) - job_list_loader = JobListLoaderDirector(JobListLoaderBuilder(expid)).build_loaded_joblist_loader() + job_list_loader = JobListLoaderDirector(JobListLoaderBuilder(expid)).build_loaded_joblist_loader() package_to_jobs = job_list_loader.joblist_helper.package_to_jobs for job in job_list_loader.jobs: pkl_content.append({'name': job.name, @@ -692,53 +690,62 @@ def get_experiment_tree_pkl(expid): } -def get_experiment_graph(expid, layout=Layout.STANDARD, grouped=GroupedBy.NO_GROUP): +def get_experiment_graph(expid, log, layout=Layout.STANDARD, grouped=GroupedBy.NO_GROUP): """ Gets graph representation """ base_list = dict() pkl_timestamp = 10000000 try: - autosubmit_configuration_facade = ConfigurationFacadeDirector(AutosubmitConfigurationFacadeBuilder(expid)).build_autosubmit_configuration_facade() + # autosubmit_configuration_facade = ConfigurationFacadeDirector(AutosubmitConfigurationFacadeBuilder(expid)).build_autosubmit_configuration_facade() + BasicConfig.read() + autosubmit_configuration_facade = AutosubmitConfig( + expid, BasicConfig, ConfigParserFactory()) + autosubmit_configuration_facade.reload() + # raise Exception("json config autosubmitgraph: " + str(autosubmit_configuration_facade.__dict__)) try: - if 
common_utils.is_version_historical_ready(autosubmit_configuration_facade.get_autosubmit_version()): - job_list_loader = JobListLoaderDirector(JobListLoaderBuilder(expid)).build_loaded_joblist_loader() + if common_utils.is_version_historical_ready(autosubmit_configuration_facade.get_version()): + job_list_loader = JobListLoaderDirector(JobListLoaderBuilder(expid)).build_loaded_joblist_loader() graph = GraphRepresentation(expid, job_list_loader, layout, grouped) - graph.perform_calculations() + graph.perform_calculations() return graph.get_graph_representation_data() except Exception as exp: - print(traceback.format_exc()) + # print(traceback.format_exc()) print("New Graph Representation failed: {0}".format(exp)) + log.info("Could not generate graph with faster method") # Getting platform data + hpcarch = autosubmit_configuration_facade.get_platform() # Submitter - submitter = Autosubmit._get_submitter(autosubmit_configuration_facade.autosubmit_conf) - submitter.load_platforms(autosubmit_configuration_facade.autosubmit_conf) + submitter = Autosubmit._get_submitter(autosubmit_configuration_facade) + submitter.load_platforms(autosubmit_configuration_facade) # JobList construction - job_list = Autosubmit.load_job_list(expid, autosubmit_configuration_facade.autosubmit_conf, notransitive=False) + job_list = Autosubmit.load_job_list(expid, autosubmit_configuration_facade, notransitive=False) if job_list.graph == None: raise Exception("Graph generation is not possible for this experiment.") # Platform update - hpcarch = autosubmit_configuration_facade.get_main_platform() for job in job_list.get_job_list(): if job.platform_name is None: job.platform_name = hpcarch - job.platform = submitter.platforms[job.platform_name.lower( - )] + job.platform = submitter.platforms[job.platform_name.lower()] # Chunk unit and chunk size - chunk_unit = autosubmit_configuration_facade.chunk_unit - chunk_size = autosubmit_configuration_facade.chunk_size + chunk_unit = autosubmit_configuration_facade.get_chunk_size_unit() + chunk_size = autosubmit_configuration_facade.get_chunk_size() job_list.sort_by_id() + base_list = job_list.get_graph_representation( - BasicConfig, layout, grouped, chunk_unit=chunk_unit, chunk_size=chunk_size) + BasicConfig, layout, grouped, chunk_unit=chunk_unit, chunk_size=chunk_size + ) + # raise Exception("Base list graph: ", str(base_list)) except Exception as e: print(traceback.format_exc()) + log.info("Could not generate Graph and recieved the following exception: " + str(e)) return {'nodes': [], 'edges': [], 'fake_edges': [], @@ -763,7 +770,7 @@ def get_experiment_tree_rundetail(expid, run_id): """ base_list = dict() pkl_timestamp = 10000000 - try: + try: print("Received Tree RunDetail " + str(expid)) BasicConfig.read() tree_structure, current_collection, reference = JobList.get_tree_structured_from_previous_run(expid, BasicConfig, run_id=run_id) @@ -780,7 +787,7 @@ def get_experiment_tree_rundetail(expid, run_id): return base_list -def get_experiment_tree_structured(expid): +def get_experiment_tree_structured(expid, log): """ Current version of the tree visualization algorithm. 
:param expid: Name of experiment @@ -792,21 +799,34 @@ def get_experiment_tree_structured(expid): pkl_timestamp = 10000000 try: notransitive = False - print("Received Tree Request " + str(expid)) BasicConfig.read() - as_conf = AutosubmitConfig( - expid, BasicConfig, ConfigParserFactory()) - as_conf.reload() + + # TODO: Encapsulate this following 2 lines or move to the parent function in app.py + curr_exp_as_version = db_common.get_autosubmit_version(expid) + main, secondary = common_utils.parse_version_number(curr_exp_as_version) + if main >= "4": + # TODO: new YAML parser + test = 1 + else: + log.info("EXPERIMENT VERSION = " + str(curr_exp_as_version)) + as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory()) + as_conf.reload() + + # If version is higher than 3.13, we can perform the new tree representation algorithm try: - if common_utils.is_version_historical_ready(as_conf.get_version()): + if common_utils.is_version_historical_ready(as_conf.get_version()): job_list_loader = JobListLoaderDirector(JobListLoaderBuilder(expid)).build_loaded_joblist_loader() - tree = TreeRepresentation(expid, job_list_loader) + tree = TreeRepresentation(expid, job_list_loader) tree.perform_calculations() + # este return return tree.get_tree_structure() + else: + log.info("TREE|Not using first method|autosubmit_version=" + as_conf.get_version()) except Exception as exp: print(traceback.format_exc()) print("New Tree Representation failed: {0}".format(exp)) + log.info("New Tree Representation failed: {0}".format(exp)) # Getting platform data # Main taget HPC @@ -876,15 +896,15 @@ def _get_hpcarch_project_from_experiment_run_metadata(run_id, experiment_runs_di main_platform = "" run = experiment_runs_dict.get(run_id, None) if run and run.metadata: - data = json.loads(run.metadata) - main_platform = data["exp"]["experiment"].get("HPCARCH", "") - for platform in data["platforms"]: + data = json.loads(run.metadata) + main_platform = data["exp"]["experiment"].get("HPCARCH", "") + for platform in data["platforms"]: platform_projects[platform] = data["platforms"][platform].get("PROJECT", "") else: raise Exception("NO METADATA ON RUN {0}".format(run_id)) print("PLATFORMS") print(platform_projects) - return (main_platform, platform_projects) + return (main_platform, platform_projects) except Exception as exp: print(exp) return ("", {}) @@ -924,7 +944,7 @@ def generate_all_experiment_data(exp_path, job_path): created) + "|" + format_model(str(model)) + "|" + str(hpc) + "|" + str(wrapper_type) + "|" + str(maxwrapped) + "\n") valid_id[_id] = name file1.close() - + # First step was successful, prepare to process jobs all_job_times = DbRequests.get_completed_times_detail() # if (all_job_times): @@ -934,9 +954,9 @@ def generate_all_experiment_data(exp_path, job_path): expid = valid_id.get(exp_id, None) historical_data = None # JobDataStructure(expid).get_all_current_job_data() TODO: Replace for new implementation experiment_runs = None # JobDataStructure(expid).get_experiment_runs() TODO: Replace for new implementation - experiment_runs = experiment_runs if experiment_runs else [] - # print(experiment_runs) - experiment_runs_dict = {run.run_id: run for run in experiment_runs} + experiment_runs = experiment_runs if experiment_runs else [] + # print(experiment_runs) + experiment_runs_dict = {run.run_id: run for run in experiment_runs} # print("run id -> (,)") experiment_runs_main_info = {run.run_id: _get_hpcarch_project_from_experiment_run_metadata(run.run_id, experiment_runs_dict) for run in experiment_runs} # 
        # print(experiment_runs_main_info)
@@ -947,9 +967,9 @@ def generate_all_experiment_data(exp_path, job_path):
                 job_conf = get_job_conf_list(expid)
             except:
                 pass
-            job_conf_type = {}
-            for job in historical_data:
-                # Starting from DB VERSION 17, we go back to calling section -> section, and member -> member; instead of the previous erroneous assignment.
+            job_conf_type = {}
+            for job in historical_data:
+                # Starting from DB VERSION 17, we go back to calling section -> section, and member -> member; instead of the previous erroneous assignment.
                 if job.member not in job_conf_type:
                     # Member was confused by section in DB version <= 15
                     if job_conf:
@@ -958,10 +978,10 @@ def generate_all_experiment_data(exp_path, job_path):
                 project = ""
                 if job.run_id:
                     main_platform, platforms = experiment_runs_main_info.get(job.run_id, ("", {}))
-                    project = platforms.get(job.platform, "")
+                    project = platforms.get(job.platform, "")
                 if len(project) == 0:
                     try:
-                        if job.member in job_conf_type:
+                        if job.member in job_conf_type:
                             job_conf_info, job_type = job_conf_type[job.member]
                             wallclock, processors, threads, tasks, memory, mem_task, queue, platform, main_platform, project = job_conf_info
                     except:
@@ -1094,7 +1114,7 @@ def get_auto_conf_data(expid):
             max_wrapped = as_conf.get_max_wrapped_jobs()
         return (wrapper_type, max_wrapped)
     except Exception as ex:
-        print("Couldn't retrieve conf data (wrapper info) from {0}. Exception {1}.".format(expid, str(ex)))
+        print("Couldn't retrieve conf data (wrapper info) from {0}. Exception {1}.".format(expid, str(ex)))
         return ("None", 0)
@@ -1286,9 +1306,9 @@ def get_job_history(expid, job_name):
     path_to_job_logs = ""
     result = None
     try:
-        BasicConfig.read()
-        path_to_job_logs = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "tmp", "LOG_" + expid)
-        result = ExperimentHistoryDirector(ExperimentHistoryBuilder(expid)).build_reader_experiment_history().get_historic_job_data(job_name)
+        BasicConfig.read()
+        path_to_job_logs = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "tmp", "LOG_" + expid)
+        result = ExperimentHistoryDirector(ExperimentHistoryBuilder(expid)).build_reader_experiment_history().get_historic_job_data(job_name)
     except Exception as exp:
         print(traceback.format_exc())
         error = True
@@ -1297,12 +1317,12 @@ def get_job_history(expid, job_name):
     return {"error": error, "error_message": error_message, "history": result, "path_to_logs": path_to_job_logs}


-def get_current_configuration_by_expid(expid, valid_user):
+def get_current_configuration_by_expid(expid, valid_user, log):
     """
     Gets the current configuration by expid. The procedure queries the historical database and the filesystem.
-    :param expid: Experiment Identifier
-    :type expdi: str
-    :return: configuration content formatted as a JSON object
+    :param expid: Experiment Identifier
+    :type expid: str
+    :return: configuration content formatted as a JSON object
     :rtype: Dictionary
     """
     error = False
@@ -1311,6 +1331,18 @@ def get_current_configuration_by_expid(expid, valid_user):
     warning_message = ""
     currentRunConfig = {}
     currentFileSystemConfig = {}
+
+    def removeParameterDuplication(currentDict, keyToRemove, exceptionsKeys=[]):
+        if currentDict and isinstance(currentDict, dict):
+            try:
+                for k, nested_d in currentDict.items():
+                    if k not in exceptionsKeys and isinstance(nested_d, dict):
+                        nested_d.pop(keyToRemove, None)
+            except Exception as exp:
+                log.info("Error while trying to eliminate duplicated key from config.")
+                pass
+        return currentDict
+
     try:
         if not valid_user:
             raise Exception(
@@ -1350,6 +1382,9 @@ def get_current_configuration_by_expid(expid, valid_user):
             currentFileSystemConfig["contains_nones"] = True
             pass

+        removeParameterDuplication(currentRunConfig['exp'], "EXPID", ["experiment"])
+        removeParameterDuplication(currentFileSystemConfig['exp'], "EXPID", ["experiment"])
+
     except Exception as exp:
         error = True
         error_message = str(exp)
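# Editor's note: a minimal sketch of what the removeParameterDuplication helper added
# above does, on a made-up config dict (not real experiment data). The duplicated
# EXPID key is dropped from every nested section except those listed as exceptions:
config = {"experiment": {"EXPID": "a000", "DATELIST": "19600101"},
          "jobs": {"EXPID": "a000", "SIM": {"WALLCLOCK": "02:00"}}}
for key, nested in config.items():
    if key not in ["experiment"] and isinstance(nested, dict):
        nested.pop("EXPID", None)
# config is now {"experiment": {"EXPID": "a000", "DATELIST": "19600101"},
#                "jobs": {"SIM": {"WALLCLOCK": "02:00"}}}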
@@ -1361,7 +1396,7 @@ def get_experiment_runs(expid):
-    """
+    """
     Get runs of the same experiment from historical db
     """
     error = False
@@ -1379,7 +1414,7 @@ def get_experiment_runs(expid):

     try:
         # TODO: TEST TEST TEST TEST
-        # Current data
+        # Current data
         joblist_loader = JobListLoaderDirector(JobListLoaderBuilder(expid)).build_loaded_joblist_loader()
         experiment_history = ExperimentHistoryDirector(ExperimentHistoryBuilder(expid)).build_reader_experiment_history()
         # time_0 = time.time()
@@ -1390,43 +1425,43 @@ def get_experiment_runs(expid):
         for job_dc in experiment_history.manager.get_job_data_dcs_all():
             if job_dc.status_code == common_utils.Status.COMPLETED:
                 run_id_job_name_to_job_data_dc_COMPLETED[(job_dc.run_id, job_dc.job_name)] = job_dc
-        run_id_wrapper_code_to_job_dcs = {}
+        run_id_wrapper_code_to_job_dcs = {}
         for key, job_dc in run_id_job_name_to_job_data_dc_COMPLETED.items():
             if job_dc.wrapper_code:
                 run_id, _ = key
                 run_id_wrapper_code_to_job_dcs.setdefault((run_id, job_dc.wrapper_code), []).append(job_dc)
-
-        run_dict_SIM = {}
+
+        run_dict_SIM = {}
         for job_data_dc in sim_jobs:
             run_dict_SIM.setdefault(job_data_dc.run_id, []).append(job_data_dc)
-        run_dict_POST = {}
+        run_dict_POST = {}
         for job_data_dc in post_jobs:
             run_dict_POST.setdefault(job_data_dc.run_id, []).append(job_data_dc)
         max_run_id = 0
         # print("Time spent in data retrieval and pre-process: {}".format(time.time() - time_0))
         if experiment_runs:
-            for experiment_run in experiment_runs:
+            for experiment_run in experiment_runs:
                 max_run_id = max(experiment_run.run_id, max_run_id)
-                valid_SIM_in_run = run_dict_SIM.get(experiment_run.run_id, [])
+                valid_SIM_in_run = run_dict_SIM.get(experiment_run.run_id, [])
                 valid_POST_in_run = run_dict_POST.get(experiment_run.run_id, [])
                 # The content of the if block tries to correct the lack of finish time information in the historical database.
                 # It may not be necessary in the future.
                 if max_run_id == experiment_run.run_id:
                     assign_current(joblist_loader.job_dictionary, valid_SIM_in_run, experiment_history)
                     assign_current(joblist_loader.job_dictionary, valid_POST_in_run, experiment_history)
-                result.append({"run_id": experiment_run.run_id,
-                               "created": experiment_run.created,
-                               "finish": common_utils.timestamp_to_datetime_format(experiment_run.finish),
-                               "chunk_unit": experiment_run.chunk_unit,
+                result.append({"run_id": experiment_run.run_id,
+                               "created": experiment_run.created,
+                               "finish": common_utils.timestamp_to_datetime_format(experiment_run.finish),
+                               "chunk_unit": experiment_run.chunk_unit,
                                "chunk_size": experiment_run.chunk_size,
-                               "submitted": experiment_run.submitted,
-                               "queuing": experiment_run.queuing,
-                               "running": experiment_run.running,
-                               "completed": experiment_run.completed,
-                               "failed": experiment_run.failed,
-                               "total": experiment_run.total,
-                               "suspended": experiment_run.suspended,
-                               "SYPD": experiment_run.getSYPD(valid_SIM_in_run),
+                               "submitted": experiment_run.submitted,
+                               "queuing": experiment_run.queuing,
+                               "running": experiment_run.running,
+                               "completed": experiment_run.completed,
+                               "failed": experiment_run.failed,
+                               "total": experiment_run.total,
+                               "suspended": experiment_run.suspended,
+                               "SYPD": experiment_run.getSYPD(valid_SIM_in_run),
                                "ASYPD": experiment_run.getASYPD(valid_SIM_in_run, valid_POST_in_run, run_id_wrapper_code_to_job_dcs)})
             result.sort(key=lambda x: x["run_id"], reverse=True)
         else:
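# Editor's note: the get_experiment_runs hunk above groups job rows by run_id with
# dict.setdefault before computing per-run SYPD/ASYPD. A self-contained sketch of that
# grouping pattern (FakeJob and the sample data are illustrative only):
from collections import namedtuple

FakeJob = namedtuple("FakeJob", ["run_id", "name"])
sim_jobs = [FakeJob(1, "SIM_1"), FakeJob(1, "SIM_2"), FakeJob(2, "SIM_1")]
run_dict_sim = {}
for job in sim_jobs:
    run_dict_sim.setdefault(job.run_id, []).append(job)
# run_dict_sim -> {1: [FakeJob(1, 'SIM_1'), FakeJob(1, 'SIM_2')], 2: [FakeJob(2, 'SIM_1')]}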
diff --git a/build/lib/autosubmit_api/app.py b/build/lib/autosubmit_api/app.py
index 053f4bd6c01b07e1046bfa08d285041447e5a24a..fdac797ead2520b2a24076f154cbcc052b22f3b8 100644
--- a/build/lib/autosubmit_api/app.py
+++ b/build/lib/autosubmit_api/app.py
@@ -25,8 +25,6 @@ from datetime import datetime, timedelta
 import requests
 import logging
 from flask_cors import CORS, cross_origin
-# from flask_restful import Resource, Api
-# from flask_restful.utils import cors
 from flask import Flask, request, session, redirect, url_for
 from bscearth.utils.log import Log
 from database.db_common import get_current_running_exp, update_experiment_description_owner
@@ -36,7 +34,7 @@ from performance.performance_metrics import PerformanceMetrics
 from database.db_common import search_experiment_by_id
 from config.basicConfig import BasicConfig
 from builders.joblist_helper_builder import JobListHelperBuilder, JobListHelperDirector
-
+from multiprocessing import Manager

 JWT_SECRET = os.environ.get("SECRET_KEY")
 JWT_ALGORITHM = "HS256"
@@ -44,9 +42,10 @@ JWT_EXP_DELTA_SECONDS = 84000*5  # 5 days

 sys.path.insert(0, os.path.abspath('.'))

-
 app = Flask(__name__)

+D = Manager().dict()
+
 # CAS Stuff
 CAS_LOGIN_URL = os.environ.get("CAS_LOGIN_URL")  # 'https://cas.bsc.es/cas/login'
 CAS_VERIFY_URL = os.environ.get("CAS_VERIFY_URL")  # 'https://cas.bsc.es/cas/serviceValidate'
@@ -56,6 +55,12 @@ gunicorn_logger = logging.getLogger('gunicorn.error')
 app.logger.handlers = gunicorn_logger.handlers
 app.logger.setLevel(gunicorn_logger.level)

+requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS += 'HIGH:!DH:!aNULL'
+try:
+    requests.packages.urllib3.contrib.pyopenssl.DEFAULT_SSL_CIPHER_LIST += 'HIGH:!DH:!aNULL'
+except AttributeError:
+    # no pyopenssl support used / needed / available
+    pass

 # CAS Login
 @app.route('/login')
@@ -71,18 +76,19 @@ def login():
         is_allowed = True
     if is_allowed == False:
         return {'authenticated': False, 'user': None, 'token': None, 'message': "Your client is not authorized for this operation. The API admin needs to add your URL to the list of allowed clients."}
-
+
     target_service = "{}{}/login".format(referrer, environment)
     if not ticket:
-        route_to_request_ticket = "{}?service={}".format(CAS_LOGIN_URL, target_service)
+        route_to_request_ticket = "{}?service={}".format(CAS_LOGIN_URL, target_service)
+        app.logger.info("Redirected to: " + str(route_to_request_ticket))
         return redirect(route_to_request_ticket)
-    environment = environment if environment is not None else "autosubmitapp"  # can be used to target the test environment
-    cas_verify_ticket_route = CAS_VERIFY_URL + '?service=' + target_service + '&ticket=' + ticket
+    environment = environment if environment is not None else "autosubmitapp"  # can be used to target the test environment
+    cas_verify_ticket_route = CAS_VERIFY_URL + '?service=' + target_service + '&ticket=' + ticket
     response = requests.get(cas_verify_ticket_route)
     user = None
     if response:
         user = Utiles.get_cas_user_from_xml(response.content)
-    app.logger.info('CAS verify ticket response: user %s', user)
+    app.logger.info('CAS verify ticket response: user %s', user)
     if not user:
         return {'authenticated': False, 'user': None, 'token': None, 'message': "Can't verify user."}
     else:  # Login successful
@@ -112,7 +118,7 @@ def update_description():
         jwt_token = jwt.decode(current_token, JWT_SECRET, JWT_ALGORITHM)
     except jwt.ExpiredSignatureError:
         jwt_token = {"user_id": None}
-    except Exception as exp:
+    except Exception as exp:
         jwt_token = {"user_id": None}
     valid_user = jwt_token.get("user_id", None)
     app.logger.info('UDESC|RECEIVED|')
@@ -147,7 +153,7 @@ def test_token():

 @app.route('/cconfig/<expid>', methods=['GET'])
 @cross_origin(expose_headers="Authorization")
-def get_current_configuration(expid):
+def get_current_configuration(expid):
     start_time = time.time()
     current_token = request.headers.get("Authorization")
     try:
@@ -156,7 +162,7 @@ def get_current_configuration(expid):
         jwt_token = {"user_id": None}
     valid_user = jwt_token.get("user_id", None)
     app.logger.info('CCONFIG|RECEIVED|' + str(expid))
-    result = CommonRequests.get_current_configuration_by_expid(expid, valid_user)
+    result = CommonRequests.get_current_configuration_by_expid(expid, valid_user, app.logger)
     app.logger.info('CCONFIG|RTIME|' + str(expid) + "|" + str(time.time() - start_time))
     return result

@@ -196,7 +202,7 @@ def search_owner(owner, exptype=None, onlyactive=None):
 @app.route('/search/<expid>', methods=['GET'])
 def search_expid(expid, exptype=None, onlyactive=None):
     start_time = time.time()
-    app.logger.info('SEARCH|RECEIVED|' + str(expid) + "|" + str(exptype) + "|" + str(onlyactive))
+    app.logger.info('SEARCH|RECEIVED|' + str(expid) + "|" + str(exptype) + "|" + str(onlyactive))
     result = search_experiment_by_id(expid, owner=None, typeExp=exptype, onlyActive=onlyactive)
     app.logger.info('SEARCH|RTIME|' + str(expid) + "|" + str(exptype) + "|" + str(onlyactive) + "|" + str(time.time() - start_time))
     return result

@@ -210,6 +216,7 @@ def search_running():
     if 'username' in session:
         print("USER {}".format(session['username']))
     start_time = time.time()
+    app.logger.info("Active processes: " + str(D))
     app.logger.info('RUN|RECEIVED|')
     #app.logger.info("Received Currently Running query ")
     result = get_current_running_exp()

@@ -250,19 +257,61 @@ def get_log_running(expid):
 @app.route('/summary/<expid>', methods=['GET'])
 def get_expsummary(expid):
     start_time = time.time()
+    user = request.args.get("loggedUser", default="null", type=str)
     app.logger.info('SUMMARY|RECEIVED|' + str(expid))
+    if user != "null": D[os.getpid()] = [user, "summary", True]
"summary", True] result = CommonRequests.get_experiment_summary(expid) + app.logger.info('Process: ' + str(os.getpid()) + " workers: " + str(D)) app.logger.info('SUMMARY|RTIME|' + str(expid) + "|" + str(time.time() - start_time)) + if user != "null": D[os.getpid()] = [user, "summary", False] + if user != "null": D.pop(os.getpid(), None) return result +@app.route('/shutdown/') +def shutdown(route): + """ + This function is invoked from the frontend (AS-GUI) to kill workers that are no longer needed. + This call is common in heavy parts of the GUI such as the Tree and Graph generation or Summaries fetching. + """ + start_time = time.time() + + try: + user = request.args.get("loggedUser", default="null", type=str) + expid = request.args.get("expid", default="null", type=str) + except Exception as exp: + app.logger.info("Bad parameters for user and expid in route.") + + if user != "null": + app.logger.info('SHUTDOWN|RECEIVED for route: ' + route + " user: " + user + " expid: " + expid) + try: + # app.logger.info("user: " + user) + # app.logger.info("expid: " + expid) + app.logger.info("Workers before: " + str(D)) + for k,v in D.items(): + if v[0] == user and v[1] == route and v[-1] == True: + if v[2] == expid: + D[k] = [user, route, expid, False] + else: + D[k] = [user, route, False] + D.pop(k, None) + # reboot the worker + os.system('kill -HUP ' + str(k)) + app.logger.info("killed worker " + str(k)) + app.logger.info("Workers now: " + str(D)) + except Exception as exp: + app.logger.info("[CRITICAL] Could not shutdown process " + expid + " by user \"" + user + "\"") + app.logger.info('SHUTDOWN|DONE|RTIME' + "|" + str(time.time() - start_time)) + return "" + + @app.route('/performance/', methods=['GET']) def get_exp_performance(expid): start_time = time.time() app.logger.info('PRF|RECEIVED|' + str(expid)) result = {} - try: - result = PerformanceMetrics(expid, JobListHelperDirector(JobListHelperBuilder(expid)).build_job_list_helper()).to_json() + try: + result = PerformanceMetrics(expid, JobListHelperDirector(JobListHelperBuilder(expid)).build_job_list_helper()).to_json() except Exception as exp: result = {"SYPD": None, "ASYPD": None, @@ -274,7 +323,7 @@ def get_exp_performance(expid): "error": True, "error_message": str(exp), "warnings_job_data": [], - } + } app.logger.info('PRF|RTIME|' + str(expid) + "|" + str(time.time() - start_time)) return result @@ -282,18 +331,32 @@ def get_exp_performance(expid): @app.route('/graph///', methods=['GET']) def get_list_format(expid, layout='standard', grouped='none'): start_time = time.time() + user = request.args.get("loggedUser", default="null", type=str) + # app.logger.info("user: " + user) + # app.logger.info("expid: " + expid) app.logger.info('GRAPH|RECEIVED|' + str(expid) + "~" + str(grouped) + "~" + str(layout)) - result = CommonRequests.get_experiment_graph(expid, layout, grouped) + if user != "null": D[os.getpid()] = [user, "graph", expid, True] + result = CommonRequests.get_experiment_graph(expid, app.logger, layout, grouped) + app.logger.info('Process: ' + str(os.getpid()) + " graph workers: " + str(D)) app.logger.info('GRAPH|RTIME|' + str(expid) + "|" + str(time.time() - start_time)) + if user != "null": D[os.getpid()] = [user, "graph", expid, False] + if user != "null": D.pop(os.getpid(), None) return result @app.route('/tree/', methods=['GET']) def get_exp_tree(expid): start_time = time.time() - app.logger.info('TREE|RECEIVED|' + str(expid)) - result = CommonRequests.get_experiment_tree_structured(expid) + user = 
request.args.get("loggedUser", default="null", type=str) + # app.logger.info("user: " + user) + # app.logger.info("expid: " + expid) + app.logger.info('TREE|RECEIVED|' + str(expid)) + if user != "null": D[os.getpid()] = [user, "tree", expid, True] + result = CommonRequests.get_experiment_tree_structured(expid, app.logger) + app.logger.info('Process: ' + str(os.getpid()) + " tree workers: " + str(D)) app.logger.info('TREE|RTIME|' + str(expid) + "|" + str(time.time() - start_time)) + if user != "null": D[os.getpid()] = [user, "tree", expid, False] + if user != "null": D.pop(os.getpid(), None) return result @@ -312,7 +375,7 @@ def get_experiment_running(expid): Finds log and gets the last 150 lines """ start_time = time.time() - app.logger.info('LOG|RECEIVED|' + str(expid)) + app.logger.info('LOG|RECEIVED|' + str(expid)) result = CommonRequests.get_experiment_log_last_lines(expid) app.logger.info('LOG|RTIME|' + str(expid) + "|" + str(time.time() - start_time)) return result @@ -324,7 +387,7 @@ def get_job_log_from_path(logfile): Get log from path """ expid = logfile.split('_') if logfile is not None else "" - expid = expid[0] if len(expid) > 0 else "" + expid = expid[0] if len(expid) > 0 else "" start_time = time.time() app.logger.info('JOBLOG|RECEIVED|{0}'.format(expid)) result = CommonRequests.get_job_log(expid, logfile) @@ -335,7 +398,7 @@ def get_job_log_from_path(logfile): @app.route('/pklinfo//', methods=['GET']) def get_experiment_pklinfo(expid, timeStamp): start_time = time.time() - app.logger.info('GPKL|RECEIVED|' + str(expid) + "~" + str(timeStamp)) + app.logger.info('GPKL|RECEIVED|' + str(expid) + "~" + str(timeStamp)) result = CommonRequests.get_experiment_pkl(expid) app.logger.info('GPKL|RTIME|' + str(expid) + "|" + str(time.time() - start_time)) return result @@ -371,7 +434,7 @@ def get_exp_job_history(expid, jobname): @app.route('/rundetail//') def get_experiment_run_job_detail(expid, runid): start_time = time.time() - app.logger.info('RUNDETAIL|RECEIVED|' + str(expid) + "~" + str(runid)) + app.logger.info('RUNDETAIL|RECEIVED|' + str(expid) + "~" + str(runid)) result = CommonRequests.get_experiment_tree_rundetail(expid, runid) app.logger.info('RUNDETAIL|RTIME|' + str(expid) + "|" + str(time.time() - start_time)) return result @@ -384,4 +447,3 @@ def get_file_status(): result = CommonRequests.get_last_test_archive_status() app.logger.info('FSTATUS|RTIME|' + str(time.time() - start_time)) return result - diff --git a/build/lib/autosubmit_api/components/representations/graph/graph.py b/build/lib/autosubmit_api/components/representations/graph/graph.py index 00683a72eb6a91da7b7d676954343fc2f62a2240..8f957a0dff1b5a4e42fe4a71d909d317e38a1e5e 100644 --- a/build/lib/autosubmit_api/components/representations/graph/graph.py +++ b/build/lib/autosubmit_api/components/representations/graph/graph.py @@ -35,7 +35,7 @@ class GraphRepresentation(object): # type: (str, JobListLoader, str, str) -> None self.expid = expid self.layout = layout - self.grouped_by = grouped + self.grouped_by = grouped self.joblist_loader = job_list_loader self.joblist_helper = self.joblist_loader.joblist_helper self.jobs = self.joblist_loader.jobs @@ -48,16 +48,16 @@ class GraphRepresentation(object): self.nodes = [] # type: List[Dict[str, Any]] self.groups = {} # type: Dict[str, Dict[str, Any]] self.max_children_count = 0 # type: int - self.max_parent_count = 0 # type: int - + self.max_parent_count = 0 # type: int + @property def job_count(self): return len(self.jobs) - + @property def edge_count(self): return 
         return len(self.edges)
-
+
     def perform_calculations(self):
         # type: () -> None
         """ Calculate Graph Representation """
@@ -87,17 +87,17 @@ class GraphRepresentation(object):

     def calculate_valid_drawing(self):
         if len(self.edges) <= 0:
-            raise ValueError("The generation of a drawing requires that the graph model includes edges.")
+            raise ValueError("The generation of a drawing requires that the graph model includes edges.")
         self.update_jobs_level()
         if self.layout == Layout.STANDARD:
-            self.assign_graphviz_coordinates_to_jobs()
+            self.assign_graphviz_coordinates_to_jobs()
         elif self.layout == Layout.LAPLACIAN:
             self.assign_laplacian_coordinates_to_jobs()
         else:
             raise ValueError("You have requested a {0} layout, which is not implemented.".format(self.layout))
         if not self.we_have_valid_graph_drawing:
             self.assign_barycentric_coordinates_to_jobs()
-
+
     def _calculate_groups(self):
         if self.grouped_by == GroupedBy.STATUS:
             self.groups = self._get_grouped_by_status_dict()
@@ -107,7 +107,7 @@ class GraphRepresentation(object):
             self.groups = dict()
         else:
             raise ValueError("You have provided an invalid grouping selection: {}".format(self.grouped_by))
-
+
     def _get_grouped_by_status_dict(self):
         # type: () -> Dict[str, Dict[str, Any]]
         groups = {}
@@ -127,13 +127,13 @@ class GraphRepresentation(object):
         for date in self.joblist_loader.dates:
             formatted_date = self.joblist_loader.dates_formatted_dict.get(date, None)
             for member in self.joblist_loader.members:
-                status_counters = {}
+                status_counters = {}
                 group_name = "{}_{}_{}_".format(self.expid, formatted_date, member)
-                jobs_in_date_member = filter(lambda x: x.name.startswith(group_name), self.jobs)
+                jobs_in_date_member = filter(lambda x: x.name.startswith(group_name), self.jobs)
                 if len(jobs_in_date_member) == 0:
                     raise Exception("You have configured date {} and member {} in your experiment but there are no jobs that use these settings. \
Review your configuration, something might be wrong.".format(formatted_date, member))
-                for job in jobs_in_date_member:
+                for job in jobs_in_date_member:
                     status_counters[job.status] = status_counters.setdefault(job.status, 0) + 1
                 group_color[group_name] = self._get_defined_group_color(status_counters)
                 group_coordinates.append((group_name,
@@ -156,8 +156,8 @@ class GraphRepresentation(object):
             visited.add(group_name)
             for group_triple_compared in group_coordinates:
                 group_name_compared, x_j_coordinate, y_j_coordinate = group_triple_compared
-                if group_name_compared not in visited:
-                    if abs(x_i_coordinate - x_j_coordinate) <= 250 and abs(y_i_coordinate - y_j_coordinate) <= 250:
+                if group_name_compared not in visited:
+                    if abs(x_i_coordinate - x_j_coordinate) <= 250 and abs(y_i_coordinate - y_j_coordinate) <= 250:
                         if y_i_coordinate > y_j_coordinate:
                             y_i_coordinate = y_i_coordinate + (250 - abs(y_i_coordinate - y_j_coordinate))
                         else:
@@ -187,18 +187,18 @@ class GraphRepresentation(object):
         self.we_have_valid_graph_drawing = self._assign_coordinates_to_jobs(self._get_graph_drawing_data())
         self.we_have_valid_graphviz_drawing = self.we_have_valid_graph_drawing
         if not self.we_have_valid_graph_drawing and len(self.jobs) <= SMALL_EXPERIMENT_THRESHOLD:
-            self.assign_graphviz_calculated_coordinates_to_jobs()
-
+            self.assign_graphviz_calculated_coordinates_to_jobs()
+
     def assign_graphviz_calculated_coordinates_to_jobs(self):
         """ Runs GraphViz to get the coordinates """
-        self.we_have_valid_graph_drawing = self._assign_coordinates_to_jobs(self._get_calculated_graph_drawing())
+        self.we_have_valid_graph_drawing = self._assign_coordinates_to_jobs(self._get_calculated_graph_drawing())
         self.we_have_valid_graphviz_drawing = self.we_have_valid_graph_drawing
-
+
     def assign_laplacian_coordinates_to_jobs(self):
         """ Calculates Laplacian """
         self.we_have_valid_graph_drawing = self._assign_coordinates_to_jobs(self._get_calculated_graph_laplacian_drawing())
         self.we_have_valid_graphviz_drawing = False
-
+
     def assign_barycentric_coordinates_to_jobs(self):
         """ Calculates coordinates """
         self.we_have_valid_graph_drawing = self._assign_coordinates_to_jobs(self._get_calculated_hierarchical_drawing())
@@ -216,12 +216,12 @@ class GraphRepresentation(object):
                     if self.job_dictionary[children_job_name].level == 0:
                         self.job_dictionary[children_job_name].level = current.level
                         stack.append(self.job_dictionary[children_job_name])
-
+
         job_roots_names = [job.name for job in self.jobs if len(job.parents_names) == 0]
-        for job_name in job_roots_names:
+        for job_name in job_roots_names:
             stack = []
             update_level(self.job_dictionary[job_name])
-
+
     def reset_jobs_coordinates(self):
         """ Mainly for testing purposes """
         for job in self.jobs:
@@ -229,15 +229,16 @@ class GraphRepresentation(object):

     def add_normal_edges(self):
         for job in self.jobs:
-            for child_name in job.children_names:
-                self.edges.append(RealEdge(job.name, child_name, self.joblist_loader.are_these_in_same_package(job.name, child_name)))
-
+            for child_name in job.children_names:
+                if job.name != child_name:
+                    self.edges.append(RealEdge(job.name, child_name, self.joblist_loader.are_these_in_same_package(job.name, child_name)))
+
     def _calculate_average_post_time(self):
-        post_jobs = [job for job in self.jobs if job.section == "POST" and job.status in {Status.COMPLETED}]
+        post_jobs = [job for job in self.jobs if job.section == "POST" and job.status in {Status.COMPLETED}]
         self.average_post_time = get_average_total_time(post_jobs)

     def _generate_node_data(self):
-        for job_name in self.job_dictionary:
+        for job_name in self.job_dictionary:
            job = self.job_dictionary[job_name]
            self._calculate_max_children_parent(len(job.children_names), len(job.parents_names))
            ini_date, end_date = job.get_date_ini_end(self.joblist_loader.chunk_size, self.joblist_loader.chunk_unit)
@@ -251,7 +252,7 @@ class GraphRepresentation(object):
                "platform_name": job.platform,
                "chunk": job.chunk,
                "package": job.package,
-               "member": job.member,
+               "member": job.member,
                "date": ini_date,
                "date_plus": end_date,
                "SYPD": PUtils.calculate_SYPD_perjob(self.joblist_loader.chunk_unit, self.joblist_loader.chunk_size, job.chunk, job.run_time, job.status),
@@ -267,27 +268,27 @@ class GraphRepresentation(object):
                "dashed": True if job.package else False,
                "shape": self.joblist_helper.package_to_symbol.get(job.package, "dot"),
                "processors": job.ncpus,
-               "wallclock": job.wallclock,
+               "wallclock": job.wallclock,
                "children": len(job.children_names),
                "children_list": list(job.children_names),
                "parents": len(job.parents_names),
                "parent_list": list(job.parents_names),
                "out": job.out_file_path,
-               "err": job.err_file_path,
+               "err": job.err_file_path,
                "custom_directives": None,
                "rm_id": job.rm_id,
                "x": job.x_coordinate,
-               "y": job.y_coordinate
+               "y": job.y_coordinate
            })

    def _calculate_max_children_parent(self, children_count, parent_count):
        # type: (int, int) -> None
        self.max_children_count = max(self.max_children_count, children_count)
        self.max_parent_count = max(self.max_parent_count, parent_count)
-
+
    def _assign_coordinates_to_jobs(self, valid_coordinates):
        """ False if valid_coordinates is None OR empty"""
-        # type: (Dict[str, Tuple[int, int]] | None) -> bool
+        # type: (Dict[str, Tuple[int, int]] | None) -> bool
        if valid_coordinates and len(valid_coordinates) > 0:
            for job_name in self.job_dictionary:
                self.job_dictionary[job_name].x_coordinate, self.job_dictionary[job_name].y_coordinate = valid_coordinates[job_name]
@@ -308,7 +309,7 @@ class GraphRepresentation(object):
            if len(node_data) > 1 and node_data[0] == "node":
                coordinates[str(node_data[1])] = (int(float(node_data[2])) * GRAPHVIZ_MULTIPLIER, int(float(node_data[3])) * -GRAPHVIZ_MULTIPLIER)
        return coordinates
-
+
    def _get_calculated_graph_laplacian_drawing(self):
        # type: () -> Dict[str, Tuple[int, int]]
        coordinates = dict()
@@ -318,7 +319,7 @@ class GraphRepresentation(object):
        for edge in self.edges:
            nx_graph.add_edge(edge._from, edge._to, weight=(3 if edge._is_in_wrapper else 1))
        laplacian_matrix = nx.normalized_laplacian_matrix(nx_graph)
-        eigval, eigvec = sparse.linalg.eigsh(laplacian_matrix, k=4, which="SM")
+        eigval, eigvec = sparse.linalg.eigsh(laplacian_matrix, k=4, which="SM")
        eigval1 = float(eigval[1])
        eigval2 = float(eigval[2])
        x_coords = eigvec[:, 1] * (self.job_count / eigval1) * 10.0
@@ -326,7 +327,7 @@ class GraphRepresentation(object):
        for i, job_name in enumerate(nx_graph.nodes):
            coordinates[job_name] = (int(x_coords[i]), int(y_coords[i]))
        return coordinates
-
+
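# Editor's note: a minimal sketch of the spectral ("Laplacian") layout used by
# _get_calculated_graph_laplacian_drawing above -- coordinates come from eigenvectors
# of the normalized graph Laplacian. A toy graph stands in for the job DAG (assumes
# networkx and scipy are installed, as the surrounding code already does):
import networkx as nx
from scipy import sparse

g = nx.Graph()
g.add_edges_from([("a", "b"), ("b", "c"), ("c", "d"), ("a", "d")])
lap = nx.normalized_laplacian_matrix(g)
# smallest-magnitude eigenpairs; eigenvectors 1 and 2 give the x/y embedding
eigval, eigvec = sparse.linalg.eigsh(lap.asfptype(), k=3, which="SM")
coords = {node: (float(eigvec[i, 1]), float(eigvec[i, 2])) for i, node in enumerate(g.nodes)}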
    def _get_calculated_hierarchical_drawing(self):
        # type: () -> Dict[str, Tuple[int, int]]
        coordinates = {}
        processed_packages = set()
        max_level = max(job.level for job in self.jobs)
        for i in range(2, max_level+1):
            if i == 2:
-                jobs_in_previous_layer = filter(lambda x: x.level == i-1, self.jobs)
+                jobs_in_previous_layer = filter(lambda x: x.level == i-1, self.jobs)
                for k, job in enumerate(jobs_in_previous_layer):
-                    self.job_dictionary[job.name].horizontal_order = (k+1)
+                    self.job_dictionary[job.name].horizontal_order = (k+1)
            jobs_in_layer = filter(lambda x: x.level == i, self.jobs)
            for job in jobs_in_layer:
                sum_order = sum(self.job_dictionary[job_name].horizontal_order for job_name in job.parents_names)
                if len(job.parents_names) > 0:
                    self.job_dictionary[job.name].barycentric_value = sum_order/len(job.parents_names)
-
+
            jobs_in_layer.sort(key=lambda x: x.barycentric_value)
            job_names_in_layer = {job.name for job in jobs_in_layer}
            already_assigned_order = set()
            for job in jobs_in_layer:
-                if job.name not in already_assigned_order:
+                if job.name not in already_assigned_order:
                    self.job_dictionary[job.name].horizontal_order = len(already_assigned_order) + 1
                    already_assigned_order.add(job.name)
                if job.package and (job.package, job.level) not in processed_packages:
                    processed_packages.add((job.package, job.level))
-                    job_names_in_package_and_same_level = [job.name for job in jobs_in_layer if job.name in self.joblist_helper.package_to_jobs.get(job.package, [])]
-                    for job_name in job_names_in_package_and_same_level:
+                    job_names_in_package_and_same_level = [job.name for job in jobs_in_layer if job.name in self.joblist_helper.package_to_jobs.get(job.package, [])]
+                    for job_name in job_names_in_package_and_same_level:
                        if self.job_dictionary[job_name].name in job_names_in_layer and job_name not in already_assigned_order:
                            self.job_dictionary[job_name].horizontal_order = len(already_assigned_order) + 1
                            already_assigned_order.add(job_name)
-
+
        for job_name in self.job_dictionary:
            # print("{} {} {}".format(job_name, self.job_dictionary[job_name].horizontal_order, self.job_dictionary[job_name].level))
            coordinates[job_name] = (int(self.job_dictionary[job_name].horizontal_order*BARYCENTRIC_X_MULTIPLIER), int(self.job_dictionary[job_name].level*BARYCENTRIC_Y_MULTIPLIER))
@@ -372,10 +373,3 @@ class GraphRepresentation(object):
        #     pairs = set()
        #     for job_name_from in self.joblist_helper.package_to_jobs[package]:
        #         for job_name_to in self.joblist_helper.package_to_jobs[package]:
-
-
-
-
-
-
-
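# Editor's note: a small sketch of the barycentric ordering applied by
# _get_calculated_hierarchical_drawing above -- each node in a layer is sorted by the
# average (barycenter) of its parents' horizontal positions. Data is illustrative:
parents_order = {"SIM_1": 1, "SIM_2": 2, "SIM_3": 3}
children_parents = {"POST_A": ["SIM_2", "SIM_3"], "POST_B": ["SIM_1"]}
barycenter = {child: sum(parents_order[p] for p in parents) / len(parents)
              for child, parents in children_parents.items()}
layer_order = sorted(children_parents, key=lambda c: barycenter[c])
# barycenter -> {"POST_A": 2.5, "POST_B": 1.0}; layer_order -> ["POST_B", "POST_A"]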
diff --git a/build/lib/autosubmit_api/components/representations/tree/tree.py b/build/lib/autosubmit_api/components/representations/tree/tree.py
index 7f5d971789dd86e09a3dac64b5fe488f4e97e5b6..df021314f71c869deeb338553c791a341f9152bc 100644
--- a/build/lib/autosubmit_api/components/representations/tree/tree.py
+++ b/build/lib/autosubmit_api/components/representations/tree/tree.py
@@ -10,32 +10,32 @@ from typing import List, Dict, Tuple, Set, Any
 DEFAULT_MEMBER = "DEFAULT"

 class TreeRepresentation(object):
-    def __init__(self, expid, job_list_loader):
+    def __init__(self, expid, job_list_loader):
         # type: (str, JobListLoader) -> None
         self.expid = expid  # type: str
         # self.jobs = []  # type: List[Job]
         self.joblist_loader = job_list_loader
         self._date_member_distribution = {}  # type: Dict[Tuple[str, str], List[Job]]
         self._no_date_no_member_jobs = []  # type: List[Job]
-        self._normal_status = {Status.COMPLETED, Status.WAITING, Status.READY}  # type: Set
+        self._normal_status = {Status.COMPLETED, Status.WAITING, Status.READY, Status.SUSPENDED}  # type: Set
         self.result_tree = list()  # type: List
         self.result_header = dict()  # type: Dict
         self.average_post_time = 0.0  # type: float
         self.nodes = []  # type: List[Dict]
         self._distributed_dates = OrderedDict()  # type: OrderedDict[str, None]
         self._distributed_members = OrderedDict()  # type: OrderedDict[str, None]
-
+
     def perform_calculations(self):
         # type: () -> None
-        self._distribute_into_date_member_groups()
+        self._distribute_into_date_member_groups()
         self._generate_date_member_tree_folders()
         self._generate_no_date_no_member_tree_folder()
         self._generate_package_tree_folders()
         self._complement_result_header()
         self._calculate_average_post_time()
         self._generate_node_data()
-
+
     def get_tree_structure(self):
         # type: () -> Dict[str, Any]
         return {
@@ -47,7 +47,7 @@ class TreeRepresentation(object):
             "error_message": "",
             "pkl_timestamp": get_current_timestamp()
         }
-
+
     def _distribute_into_date_member_groups(self):
         # type: () -> None
         for job in self.joblist_loader.jobs:
@@ -61,22 +61,26 @@ class TreeRepresentation(object):
                 intersection_member_parent = self.joblist_loader.members & parents_members
                 intersection_member_children = self.joblist_loader.members & children_members
                 if len(intersection_member_parent) > 0 or len(intersection_member_children) > 0:
-                    member = intersection_member_parent.pop() if len(intersection_member_parent) > 0 else intersection_member_children.pop()
+                    member = intersection_member_parent.pop() if len(intersection_member_parent) > 0 else intersection_member_children.pop()
                     self._date_member_distribution.setdefault((job.date, member), []).append(job)
                     self._distributed_dates[job.date] = None
                     self._distributed_members[member] = None
-                else:
+                else:
                     self._date_member_distribution.setdefault((job.date, DEFAULT_MEMBER), []).append(job)
                     self._distributed_dates[job.date] = None
                     self._distributed_members[DEFAULT_MEMBER] = None
             else:
                 self._no_date_no_member_jobs.append(job)
-
+
     def _generate_date_member_tree_folders(self):
         # type: () -> None
         for date in self._distributed_dates:
             folders_in_date = list()
             formatted_date = self.joblist_loader.dates_formatted_dict.get(date, None)
+            all_suspended = True
+            all_waiting = True
+            all_completed = True
+            total_jobs_startdate = 0
             for member in self._distributed_members:
                 status_counters = {
                     Status.COMPLETED: 0,
@@ -86,13 +90,16 @@ class TreeRepresentation(object):
                     Status.HELD: 0
                 }
                 jobs_in_date_member = self._date_member_distribution.get((date, member), [])
                 sections = {job.section for job in jobs_in_date_member}
-                section_to_dm_jobs_dict = {section: [job for job in jobs_in_date_member if job.section == section] for section in sections}
+                section_to_dm_jobs_dict = {section: [job for job in jobs_in_date_member if job.section == section] for section in sections}
                 sections_folder_open = set()
                 jobs_in_section = OrderedDict()
                 jobs_and_folders_in_member = deque()
                 for job in jobs_in_date_member:
+                    all_suspended = all_suspended and job.status is Status.SUSPENDED
+                    all_waiting = all_waiting and job.status is Status.WAITING
+                    all_completed = all_completed and job.status is Status.COMPLETED
                     if job.status in status_counters:
-                        status_counters[job.status] += 1
+                        status_counters[job.status] += 1
                     if len(section_to_dm_jobs_dict[job.section]) > 1:
                         if job.status in self._normal_status:
                             jobs_in_section.setdefault(job.section, deque()).append(job.leaf)
@@ -102,10 +109,10 @@ class TreeRepresentation(object):
                     else:
                         if job.status in self._normal_status:
                             jobs_and_folders_in_member.append(job.leaf)
-                        else:
+                        else:
                             jobs_and_folders_in_member.appendleft(job.leaf)
                     job.tree_parent.append("{0}_{1}_{2}".format(self.expid, self.joblist_loader.dates_formatted_dict.get(date, None), member))
-
+
                 for section in jobs_in_section:
                     jobs_and_folders_in_member.append({
                         'title': section,
@@ -115,15 +122,16 @@ class TreeRepresentation(object):
                         'expanded': True if section in sections_folder_open else False,
                         'children': list(jobs_in_section.get(section, []))
                     })
-
-
+
+
                 if len(jobs_in_date_member) > 0:  # If there is something inside the date-member group, we create it.
+                    total_jobs_startdate += len(jobs_in_date_member)
                     ref_key = "{0}_{1}_{2}".format(self.expid, formatted_date, member)
                     folders_in_date.append({
-                        "title": JUtils.get_folder_date_member_title(self.expid,
-                                                                     formatted_date,
-                                                                     member,
-                                                                     len(jobs_in_date_member),
+                        "title": JUtils.get_folder_date_member_title(self.expid,
+                                                                     formatted_date,
+                                                                     member,
+                                                                     len(jobs_in_date_member),
                                                                      status_counters),
                         "folder": True,
                         "refKey": ref_key,
@@ -140,20 +148,38 @@ class TreeRepresentation(object):
                         "total": len(jobs_in_date_member)
                     })

+            # Folders for start dates whose jobs are all WAITING, COMPLETED or SUSPENDED are
+            # displayed collapsed; if any job has a failed status, the folder is displayed expanded.
+            # TODO: add threshold variable
+
             if len(folders_in_date) > 0:  # If there is something inside the date folder, we create it.
-                date_folder_title = "{0}_{1}".format(self.expid, formatted_date)
+                # date_folder_title = "{0}_{1}".format(self.expid, formatted_date)
+
+                if all_suspended or all_waiting or all_completed:
+                    date_tag = JUtils.get_date_folder_tag("WAITING", total_jobs_startdate) if all_waiting else JUtils.get_date_folder_tag("SUSPENDED", total_jobs_startdate)
+                    if all_completed:
+                        date_tag = JUtils.get_date_folder_tag("COMPLETED", total_jobs_startdate)
+                    date_folder_title = "{0}_{1} {2}".format(
+                        self.expid,
+                        formatted_date,
+                        date_tag
+                    )
+                else:
+                    date_folder_title = "{0}_{1}".format(self.expid, formatted_date)
+
+
                 self.result_tree.append({
                     "title": date_folder_title,
                     "folder": True,
                     "refKey": date_folder_title,
                     "data": "Empty",
-                    "expanded": True,
+                    "expanded": False if len(self._distributed_dates) > 5 and (all_waiting or all_suspended or all_completed) else True,
                     "children": list(folders_in_date)
                 })
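# Editor's note: a compact sketch of the folder-collapsing rule added above -- a start
# date folder is rendered collapsed when every job in it shares an inactive status
# (WAITING, SUSPENDED or COMPLETED) and the experiment has many start dates. Values
# below are illustrative:
statuses = ["COMPLETED", "COMPLETED", "COMPLETED"]
all_completed = all(s == "COMPLETED" for s in statuses)
num_dates = 12
expanded = not (num_dates > 5 and all_completed)  # -> False: folder starts collapsed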
""" + # sort the list before iterating + result_exp_wrappers = [] + sorted_wrappers = sorted(self.joblist_loader.package_names) + for package_name in sorted_wrappers: jobs_in_package = sorted(self.joblist_loader.get_all_jobs_in_package(package_name), key=lambda x: x.chunk) simple_title = "Wrapper: {0}".format(package_name) total_count = len(jobs_in_package) @@ -175,23 +204,25 @@ class TreeRepresentation(object): Status.QUEUING: 0, Status.FAILED: 0, Status.HELD: 0 } + if total_count > 0: for job in jobs_in_package: if job.status in status_counters: status_counters[job.status] += 1 - self.result_tree.append({ + result_exp_wrappers.append({ "title": JUtils.get_folder_package_title(package_name, total_count, status_counters), - "folder": True, + "folder": True, "refKey": simple_title, - "data": {'completed': status_counters[Status.COMPLETED], - 'failed': status_counters[Status.FAILED], - 'running': status_counters[Status.RUNNING], - 'queuing': status_counters[Status.QUEUING], + "data": {'completed': status_counters[Status.COMPLETED], + 'failed': status_counters[Status.FAILED], + 'running': status_counters[Status.RUNNING], + 'queuing': status_counters[Status.QUEUING], 'held': status_counters[Status.HELD], 'total': total_count }, "expanded": False, "children": [job.leaf for job in jobs_in_package] }) + self.result_header[simple_title] = ({ "completed" : status_counters[Status.COMPLETED], "running": status_counters[Status.RUNNING], @@ -200,20 +231,36 @@ class TreeRepresentation(object): "held": status_counters[Status.HELD], "total": total_count }) - + + # add root folder to enclose all the wrappers + # If there is something inside the date-member group, we create it. + if len(sorted_wrappers) > 0: + self.result_tree.append({ + "title": "Wrappers", + "folder": True, + "refKey": "Wrappers_{0}".format(self.expid), + "data": "Empty", + "expanded": False, + "children": list(result_exp_wrappers) + }) + + + + + def _complement_result_header(self): self.result_header["completed_tag"] = JUtils.completed_tag_with_anchors self.result_header["running_tag"] = JUtils.running_tag_with_anchors - self.result_header["queuing_tag"] = JUtils.queuing_tag_with_anchors + self.result_header["queuing_tag"] = JUtils.queuing_tag_with_anchors self.result_header["failed_tag"] = JUtils.failed_tag_with_anchors self.result_header["held_tag"] = JUtils.held_tag_with_anchors self.result_header["checkmark"] = JUtils.checkmark_tag - self.result_header["packages"] = [package_name for package_name in self.joblist_loader.package_names] + self.result_header["packages"] = [package_name for package_name in self.joblist_loader.package_names] self.result_header["chunk_unit"] = self.joblist_loader.chunk_unit self.result_header["chunk_size"] = self.joblist_loader.chunk_size def _calculate_average_post_time(self): - post_jobs = [job for job in self.joblist_loader.jobs if job.section == "POST" and job.status in {Status.COMPLETED}] + post_jobs = [job for job in self.joblist_loader.jobs if job.section == "POST" and job.status in {Status.COMPLETED}] self.average_post_time = get_average_total_time(post_jobs) def _generate_node_data(self): @@ -254,25 +301,4 @@ class TreeRepresentation(object): "custom_directives": None, "rm_id": job.rm_id, "status_color": job.status_color - }) - - - - - - - - - - - - - - - - - - - - - + }) \ No newline at end of file diff --git a/build/lib/autosubmit_api/database/db_jobdata.py b/build/lib/autosubmit_api/database/db_jobdata.py index 6b308a454e3bbef5a1783092bde1ec210768babf..5cc149abccfd5911998b78c1d1584a0879e5b7ad 
     def _complement_result_header(self):
         self.result_header["completed_tag"] = JUtils.completed_tag_with_anchors
         self.result_header["running_tag"] = JUtils.running_tag_with_anchors
-        self.result_header["queuing_tag"] = JUtils.queuing_tag_with_anchors
+        self.result_header["queuing_tag"] = JUtils.queuing_tag_with_anchors
         self.result_header["failed_tag"] = JUtils.failed_tag_with_anchors
         self.result_header["held_tag"] = JUtils.held_tag_with_anchors
         self.result_header["checkmark"] = JUtils.checkmark_tag
-        self.result_header["packages"] = [package_name for package_name in self.joblist_loader.package_names]
+        self.result_header["packages"] = [package_name for package_name in self.joblist_loader.package_names]
         self.result_header["chunk_unit"] = self.joblist_loader.chunk_unit
         self.result_header["chunk_size"] = self.joblist_loader.chunk_size

     def _calculate_average_post_time(self):
-        post_jobs = [job for job in self.joblist_loader.jobs if job.section == "POST" and job.status in {Status.COMPLETED}]
+        post_jobs = [job for job in self.joblist_loader.jobs if job.section == "POST" and job.status in {Status.COMPLETED}]
         self.average_post_time = get_average_total_time(post_jobs)

     def _generate_node_data(self):
@@ -254,25 +301,4 @@ class TreeRepresentation(object):
                 "custom_directives": None,
                 "rm_id": job.rm_id,
                 "status_color": job.status_color
-            })
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+            })
\ No newline at end of file
diff --git a/build/lib/autosubmit_api/database/db_jobdata.py b/build/lib/autosubmit_api/database/db_jobdata.py
index 6b308a454e3bbef5a1783092bde1ec210768babf..5cc149abccfd5911998b78c1d1584a0879e5b7ad 100644
--- a/build/lib/autosubmit_api/database/db_jobdata.py
+++ b/build/lib/autosubmit_api/database/db_jobdata.py
@@ -106,8 +106,8 @@ class ExperimentRun():
         # print(len(job_list))
         total_run_time = sum(job.run_time for job in outlier_free_list)
         # print("run {3} yps {0} n {1} run_time {2}".format(years_per_sim, number_SIM, total_run_time, self.run_id))
-        if total_run_time > 0:
-            return round((years_per_sim * number_SIM * seconds_per_day) / total_run_time, 2)
+        if total_run_time > 0:
+            return round((years_per_sim * number_SIM * seconds_per_day) / total_run_time, 2)
         return None

     def getASYPD(self, job_sim_list, job_post_list, package_jobs):
@@ -241,17 +241,17 @@ class JobData(object):

     def calculateASYPD(self, chunk_unit, chunk_size, job_package_data, average_post_time):
         """
-        Calculates ASYPD for a job in a run
-
-        :param chunk_unit: chunk unit of the experiment
-        :type chunk_unit: str
-        :param chunk_size: chunk size of the experiment
-        :type chunk_size: str
-        :param job_package_data: jobs in the package (if self belongs to a package)
-        :type: list()
-        :param average_post_time: average queuing + running time of the post jobs in the run of self.
-        :type average_post_time: float
-        :return: void
+        Calculates ASYPD for a job in a run
+
+        :param chunk_unit: chunk unit of the experiment
+        :type chunk_unit: str
+        :param chunk_size: chunk size of the experiment
+        :type chunk_size: str
+        :param job_package_data: jobs in the package (if self belongs to a package)
+        :type: list()
+        :param average_post_time: average queuing + running time of the post jobs in the run of self.
+        :type average_post_time: float
+        :return: void
         :rtype: void
         """
         result_ASYPD = calculate_ASYPD_perjob(
@@ -542,27 +542,27 @@ class MainDataBase():

 class ExperimentGraphDrawing(MainDataBase):
-    def __init__(self, expid):
+    def __init__(self, expid, basic_config=None):
         """
         Sets and validates graph drawing.
-        :param expid: Name of experiment
-        :type expid: str
-        :param allJobs: list of all jobs objects (usually from job_list)
+        :param expid: Name of experiment
+        :type expid: str
+        :param allJobs: list of all jobs objects (usually from job_list)
         :type allJobs: list()
         """
         MainDataBase.__init__(self, expid)
         BasicConfig.read()
         self.expid = expid
-        self.folder_path = BasicConfig.GRAPHDATA_DIR
+        self.folder_path = BasicConfig.LOCAL_ROOT_DIR if basic_config is None else basic_config.LOCAL_ROOT_DIR
         self.database_path = os.path.join(
-            self.folder_path, "graph_data_" + str(expid) + ".db")
+            self.folder_path, "as_metadata", "graph", "graph_data_" + str(expid) + ".db")
         self.create_table_query = textwrap.dedent(
             '''CREATE TABLE
-            IF NOT EXISTS experiment_graph_draw (
+            IF NOT EXISTS experiment_graph_draw (
             id INTEGER PRIMARY KEY,
             job_name text NOT NULL,
             x INTEGER NOT NULL,
-            y INTEGER NOT NULL
+            y INTEGER NOT NULL
             );''')

         if not os.path.exists(self.database_path):
@@ -594,10 +594,10 @@ class ExperimentGraphDrawing(MainDataBase):
         except Exception as exp:
             self.locked = True

-    def get_validated_data(self, allJobs):
+    def get_validated_data(self, allJobs):
         """
-        Validates whether the current graph drawing should be updated.
-        :return: None if the graph drawing should be updated; otherwise, it returns the position data.
+        Validates whether the current graph drawing should be updated.
+        :return: None if the graph drawing should be updated; otherwise, it returns the position data.
        :rtype: None or dict()
        """
        job_names = {job.name for job in allJobs}
@@ -608,16 +608,16 @@ class ExperimentGraphDrawing(MainDataBase):
            self.should_update = True
            # Clear database
            return None
-        return self.current_position_dictionary
+        return self.current_position_dictionary
        # return None if self.should_update == True else self.current_position_dictionary

    def calculate_drawing(self, allJobs, independent=False, num_chunks=48, job_dictionary=None):
        """
        Called in a thread.
-        :param allJobs: list of jobs (usually from job_list object)
-        :type allJobs: list()
-        :return: Last row Id
-        :rtype: int
+        :param allJobs: list of jobs (usually from job_list object)
+        :type allJobs: list()
+        :return: Last row Id
+        :rtype: int
        """
        lock_name = "calculation_{}_in_progress.lock".format(self.expid) if independent == True else self.lock_name
        lock_path_file = os.path.join(self.folder_path, lock_name)
@@ -677,8 +677,8 @@ class ExperimentGraphDrawing(MainDataBase):

    def set_current_position(self):
        """
-        Sets all registers in the proper variables.
-        current_position_dictionary: JobName -> (x, y)
+        Sets all registers in the proper variables.
+        current_position_dictionary: JobName -> (x, y)
        current_jobs_set: JobName
        """
        current_table = self._get_current_position()
@@ -689,7 +689,7 @@ class ExperimentGraphDrawing(MainDataBase):
    def _get_current_position(self):
        """
        Get all registers from experiment_graph_draw.\n
-        :return: row content: id, job_name, x, y
+        :return: row content: id, job_name, x, y
        :rtype: 4-tuple (int, str, int, int)
        """
        try:
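# Editor's note: calculate_drawing above guards the GraphViz computation with a lock
# file so concurrent workers do not redraw the same experiment. A stdlib-only sketch
# of that idea (the path is hypothetical; the real code builds it from folder_path
# and may use a locking library rather than raw os.open):
import os

lock_path = "/tmp/calculation_a000_in_progress.lock"
try:
    fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)  # atomic create
except FileExistsError:
    fd = None  # another worker owns the drawing; skip
if fd is not None:
    try:
        pass  # expensive drawing happens here
    finally:
        os.close(fd)
        os.remove(lock_path)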
@@ -796,7 +796,7 @@ class JobDataStructure(MainDataBase):
             return self.get_current_job_data_last(), []
         else:
             job_data, warnings = self.process_current_run_collection(allJobs, job_worker_database)
-            return job_data, warnings
+            return job_data, warnings

     def process_current_run_collection(self, allJobs, job_worker_database=None):
         """Post-process for job_data.
@@ -1137,11 +1137,11 @@ class JobDataStructure(MainDataBase):

     def get_historic_job_data(self, job_name):
         """
-        Get the historic job data for a certain job
+        Get the historic job data for a certain job

-        :param job_name: Name of Job
-        :type job_name: str
-        :return: JobData rows that match the job_name
+        :param job_name: Name of Job
+        :type job_name: str
+        :return: JobData rows that match the job_name
         :rtype: list() of JobData objects
         """
         jobdata = []
@@ -1163,15 +1163,15 @@ class JobDataStructure(MainDataBase):

     def get_max_id_experiment_run(self):
         """
-        Get last (max) experiment run object.
-        :return: ExperimentRun data
-        :rtype: ExperimentRun object
+        Get last (max) experiment run object.
+        :return: ExperimentRun data
+        :rtype: ExperimentRun object
         """
         try:
             # expe = list()
             if not os.path.exists(self.database_path):
                 raise Exception("Job data folder not found {0} or the database version is outdated.".format(str(self.database_path)))
-            if self.db_version < DB_VERSION_SCHEMA_CHANGES:
+            if self.db_version < DB_VERSION_SCHEMA_CHANGES:
                 print("Job database version {0} outdated.".format(str(self.db_version)))
             if os.path.exists(self.database_path) and self.db_version >= DB_VERSION_SCHEMA_CHANGES:
                 modified_time = int(os.stat(self.database_path).st_mtime)
@@ -1186,13 +1186,13 @@ class JobDataStructure(MainDataBase):
                 raise Exception("Job data folder not found {0} or the database version is outdated.".format(
                     str(self.database_path)))
         except Exception as exp:
-            print(str(exp))
-            print(traceback.format_exc())
+            print(str(exp))
+            print(traceback.format_exc())
             return None

     def get_experiment_runs(self):
         # type: () -> List[ExperimentRun]
-        """
+        """
         Get list of experiment runs stored in database
         """
         try:
@@ -1221,7 +1221,7 @@ class JobDataStructure(MainDataBase):
             return None

     def get_experiment_run_by_id(self, run_id):
-        """
+        """
         Get experiment run stored in database by run_id
         """
         try:
@@ -1318,11 +1318,11 @@ class JobDataStructure(MainDataBase):

     def get_current_job_data(self, run_id, all_states=False):
         """
-        Gets the job historical data for a run_id.
-        :param run_id: Run identifier
-        :type run_id: int
-        :param all_states: False if only last=1 should be included, otherwise all rows
-        :return: List of jobdata rows
+        Gets the job historical data for a run_id.
+        :param run_id: Run identifier
+        :type run_id: int
+        :param all_states: False if only last=1 should be included, otherwise all rows
+        :return: List of jobdata rows
         :rtype: list() of JobData objects
         """
         try:
@@ -1384,11 +1384,11 @@ class JobDataStructure(MainDataBase):
                     if (jobitem.job_name, jobitem.run_id) not in included_set:
                         included_set.add(
                             (jobitem.job_name, jobitem.run_id))
-
+
                         current_collection.append(JobData(jobitem.id, jobitem.counter, jobitem.job_name, jobitem.created, jobitem.modified, jobitem.submit, jobitem.start, jobitem.finish, jobitem.status, jobitem.rowtype, jobitem.ncpus, jobitem.wallclock, jobitem.qos, jobitem.energy, jobitem.date, jobitem.section, jobitem.member, jobitem.chunk, jobitem.last, jobitem.platform, jobitem.job_id, jobitem.extra_data, jobitem.nnodes, jobitem.run_id))
             # Outlier detection
-
+
             # data = {run_id: [y.running_time() for y in current_collection if y.run_id == run_id]
             #         for run_id in set([job.run_id for job in current_collection])}
             # mean_sd = {run_id: (np.mean(data.get(run_id, [0])), np.std(data.get(run_id, [0])))
@@ -1539,9 +1539,9 @@ class JobDataStructure(MainDataBase):

     def _get_experiment_run_by_id(self, run_id):
         """
-        :param run_id: Run Identifier
-        :type run_id: int
-        :return: First row that matches the run_id
+        :param run_id: Run Identifier
+        :type run_id: int
+        :return: First row that matches the run_id
         :rtype: Row as Tuple
         """
         try:
@@ -1627,11 +1627,11 @@ class JobDataStructure(MainDataBase):

     def _get_current_job_data(self, run_id, all_states=False):
         """
-        Get JobData by run_id.
-        :param run_id: Run Identifier
-        :type run_id: int
-        :param all_states: False if only last=1, True all
-        :type all_states: bool
+        Get JobData by run_id.
+        :param run_id: Run Identifier
+        :type run_id: int
+        :param all_states: False if only last=1, True all
+        :type all_states: bool
         """
         try:
             if self.conn:
diff --git a/build/lib/autosubmit_api/experiment/common_requests.py b/build/lib/autosubmit_api/experiment/common_requests.py
index e4c6275d82b0e06217eeef289d17e88cda685f41..8b746cc349f877fd2ef90d0cf1e3235ab4c718bc 100644
--- a/build/lib/autosubmit_api/experiment/common_requests.py
+++ b/build/lib/autosubmit_api/experiment/common_requests.py
@@ -29,14 +29,12 @@ import json
 import multiprocessing
 import subprocess
 from collections import deque
-
-
 from autosubmit_api.autosubmit_legacy.autosubmit import Autosubmit
 import autosubmit_api.database.db_common as db_common
 import autosubmit_api.experiment.common_db_requests as DbRequests
 import autosubmit_api.database.db_jobdata as JobData
 import autosubmit_api.autosubmit_legacy.job.job_utils as LegacyJobUtils
-import autosubmit_api.common.utils as common_utils
+import autosubmit_api.common.utils as common_utils
 import autosubmit_api.components.jobs.utils as JUtils

 from autosubmit_api.autosubmit_legacy.job.job_list import JobList
@@ -74,8 +72,8 @@ def get_experiment_stats(expid, filter_period, filter_type):
     error = False
     error_message = ""
     period_fi = ""
-    period_ini = ""
-    considered_jobs = list()
+    period_ini = ""
+    considered_jobs = list()
     result = None
     summary = None
     try:
@@ -84,8 +82,8 @@ def get_experiment_stats(expid, filter_period, filter_type):
         job_list_loader = JobListLoaderDirector(JobListLoaderBuilder(expid)).build_loaded_joblist_loader()
         considered_jobs = job_list_loader.jobs
-        if filter_type and filter_type != 'Any':
-            considered_jobs = [job for job in job_list_loader.jobs if job.section == filter_type]
+        if filter_type and filter_type != 'Any':
+            considered_jobs = [job for job in job_list_loader.jobs if job.section == filter_type]

         period_fi = datetime.datetime.now().replace(second=0, microsecond=0)
         if filter_period and filter_period > 0:
@@ -147,7 +145,7 @@ def get_experiment_data(expid):
               'completed_jobs': 0,
               'db_historic_version': "NA"}
     try:
-        autosubmit_config_facade = ConfigurationFacadeDirector(AutosubmitConfigurationFacadeBuilder(expid)).build_autosubmit_configuration_facade()
+        autosubmit_config_facade = ConfigurationFacadeDirector(AutosubmitConfigurationFacadeBuilder(expid)).build_autosubmit_configuration_facade()
         result["path"] = autosubmit_config_facade.experiment_path
         result["owner_id"] = autosubmit_config_facade.get_owner_id()
         result["owner"] = autosubmit_config_facade.get_owner_name()
@@ -174,7 +172,7 @@ def get_experiment_data(expid):

     except Exception as exp:
         result["error"] = True
-        result["error_message"] = str(exp)
+        result["error_message"] = str(exp)
         pass
     return result
@@ -469,7 +467,7 @@ def test_run(expid):
     """
     running = False
     error = False
-    error_message = ""
+    error_message = ""
     try:
         error, error_message, running, _, _ = _is_exp_running(expid, time_condition=120)
@@ -498,7 +496,7 @@ def get_experiment_log_last_lines(expid):

     try:
         BasicConfig.read()
-        path = BasicConfig.LOCAL_ROOT_DIR + '/' + expid + '/' + BasicConfig.LOCAL_TMP_DIR + '/' + BasicConfig.LOCAL_ASLOG_DIR
+        path = BasicConfig.LOCAL_ROOT_DIR + '/' + expid + '/' + BasicConfig.LOCAL_TMP_DIR + '/' + BasicConfig.LOCAL_ASLOG_DIR
         reading = os.popen('ls -t ' + path + ' | grep "run.log"').read() if (os.path.exists(path)) else ""

         # Finding log files
         path = BasicConfig.LOCAL_ROOT_DIR + '/' + expid + '/' + BasicConfig.LOCAL_TMP_DIR
         reading = os.popen('ls -t ' + path + ' | grep "run.log"').read() if (os.path.exists(path)) else ""

-        if len(reading) > 0:
-            log_file_name = reading.split()[0]
+        if len(reading) > 0:
+            log_file_name = reading.split()[0]
             current_stat = os.stat(path + '/' + log_file_name)
             timest = int(current_stat.st_mtime)
-            log_file_lastmodified = common_utils.timestamp_to_datetime_format(timest)
-            found = True
-            request = 'tail -150 ' + path + '/' + log_file_name
-            last_lines = os.popen(request)
+            log_file_lastmodified = common_utils.timestamp_to_datetime_format(timest)
+            found = True
+            request = 'tail -150 ' + path + '/' + log_file_name
+            last_lines = os.popen(request)
             for i, item in enumerate(last_lines.readlines()):
-                logcontent.append({'index': i, 'content': item[0:-1]})
+                logcontent.append({'index': i, 'content': item[0:-1]})
     except Exception as e:
         error = True
         error_message = str(e)
@@ -533,9 +531,9 @@ def get_experiment_log_last_lines(expid):
 def get_job_log(expid, logfile, nlines=150):
     """
     Returns the last 150 lines of the log file. Targets out or err.
-    :param logfilepath: path to the log file
+    :param logfilepath: path to the log file
     :type logfilepath: str
-    :return: List of string
+    :return: List of string
     :rtype: list
     """
     # Initializing results:
@@ -554,8 +552,8 @@ def get_job_log(expid, logfile, nlines=150):
             timest = int(current_stat.st_mtime)
             log_file_lastmodified = common_utils.timestamp_to_datetime_format(timest)
             found = True
-            request = "tail -{0} {1}".format(nlines, logfilepath)
-            last50 = os.popen(request)
+            request = "tail -{0} {1}".format(nlines, logfilepath)
+            last50 = os.popen(request)
             i = 0
             for item in last50.readlines():
                 logcontent.append({'index': i, 'content': item[0:-1]})
@@ -577,11 +575,11 @@ def get_job_log(expid, logfile, nlines=150):
 def get_experiment_pkl(expid):
     # type: (str) -> Dict[str, Any]
     """
-    Gets the current state of the pkl in a format proper for graph update.
+    Gets the current state of the pkl in a format suitable for graph updates.
""" pkl_file_path = "" error = False - error_message = "" + error_message = "" pkl_content = list() pkl_timestamp = 0 package_to_jobs = dict() @@ -590,11 +588,11 @@ def get_experiment_pkl(expid): pkl_file_path = autosubmit_config_facade.pkl_path pkl_timestamp = autosubmit_config_facade.get_pkl_last_modified_timestamp() - if not os.path.exists(autosubmit_config_facade.pkl_path): + if not os.path.exists(autosubmit_config_facade.pkl_path): raise Exception("Pkl file {} not found.".format(autosubmit_config_facade.pkl_path)) - job_list_loader = JobListLoaderDirector(JobListLoaderBuilder(expid)).build_loaded_joblist_loader() - package_to_jobs = job_list_loader.joblist_helper.package_to_jobs + job_list_loader = JobListLoaderDirector(JobListLoaderBuilder(expid)).build_loaded_joblist_loader() + package_to_jobs = job_list_loader.joblist_helper.package_to_jobs for job in job_list_loader.jobs: pkl_content.append({'name': job.name, @@ -608,12 +606,12 @@ def get_experiment_pkl(expid): 'finish': common_utils.timestamp_to_datetime_format(job.finish), 'running_text': job.running_time_text, 'dashed': True if job.package else False, - 'shape': job_list_loader.joblist_helper.package_to_symbol.get(job.package, "dot"), + 'shape': job_list_loader.joblist_helper.package_to_symbol.get(job.package, "dot"), 'package': job.package, 'status': job.status_text, 'status_color': job.status_color, 'out': job.out_file_path, - 'err': job.err_file_path, + 'err': job.err_file_path, 'priority': job.priority}) except Exception as e: @@ -638,7 +636,7 @@ def get_experiment_tree_pkl(expid): """ pkl_file_path = "" error = False - error_message = "" + error_message = "" pkl_content = list() package_to_jobs = {} pkl_timestamp = 0 @@ -647,11 +645,11 @@ def get_experiment_tree_pkl(expid): autosubmit_config_facade = ConfigurationFacadeDirector(AutosubmitConfigurationFacadeBuilder(expid)).build_autosubmit_configuration_facade() pkl_file_path = autosubmit_config_facade.pkl_path pkl_timestamp = autosubmit_config_facade.get_pkl_last_modified_timestamp() - - if not os.path.exists(autosubmit_config_facade.pkl_path): + + if not os.path.exists(autosubmit_config_facade.pkl_path): raise Exception("Pkl file {} not found.".format(autosubmit_config_facade.pkl_path)) - job_list_loader = JobListLoaderDirector(JobListLoaderBuilder(expid)).build_loaded_joblist_loader() + job_list_loader = JobListLoaderDirector(JobListLoaderBuilder(expid)).build_loaded_joblist_loader() package_to_jobs = job_list_loader.joblist_helper.package_to_jobs for job in job_list_loader.jobs: pkl_content.append({'name': job.name, @@ -692,53 +690,62 @@ def get_experiment_tree_pkl(expid): } -def get_experiment_graph(expid, layout=Layout.STANDARD, grouped=GroupedBy.NO_GROUP): +def get_experiment_graph(expid, log, layout=Layout.STANDARD, grouped=GroupedBy.NO_GROUP): """ Gets graph representation """ base_list = dict() pkl_timestamp = 10000000 try: - autosubmit_configuration_facade = ConfigurationFacadeDirector(AutosubmitConfigurationFacadeBuilder(expid)).build_autosubmit_configuration_facade() + # autosubmit_configuration_facade = ConfigurationFacadeDirector(AutosubmitConfigurationFacadeBuilder(expid)).build_autosubmit_configuration_facade() + BasicConfig.read() + autosubmit_configuration_facade = AutosubmitConfig( + expid, BasicConfig, ConfigParserFactory()) + autosubmit_configuration_facade.reload() + # raise Exception("json config autosubmitgraph: " + str(autosubmit_configuration_facade.__dict__)) try: - if 
@@ -692,53 +690,62 @@ def get_experiment_tree_pkl(expid):
     }
-def get_experiment_graph(expid, layout=Layout.STANDARD, grouped=GroupedBy.NO_GROUP):
+def get_experiment_graph(expid, log, layout=Layout.STANDARD, grouped=GroupedBy.NO_GROUP):
     """
     Gets graph representation
     """
     base_list = dict()
     pkl_timestamp = 10000000
     try:
-        autosubmit_configuration_facade = ConfigurationFacadeDirector(AutosubmitConfigurationFacadeBuilder(expid)).build_autosubmit_configuration_facade()
+        # autosubmit_configuration_facade = ConfigurationFacadeDirector(AutosubmitConfigurationFacadeBuilder(expid)).build_autosubmit_configuration_facade()
+        BasicConfig.read()
+        autosubmit_configuration_facade = AutosubmitConfig(
+            expid, BasicConfig, ConfigParserFactory())
+        autosubmit_configuration_facade.reload()
+        # raise Exception("json config autosubmitgraph: " + str(autosubmit_configuration_facade.__dict__))
         try:
-            if common_utils.is_version_historical_ready(autosubmit_configuration_facade.get_autosubmit_version()):
-                job_list_loader = JobListLoaderDirector(JobListLoaderBuilder(expid)).build_loaded_joblist_loader()
+            if common_utils.is_version_historical_ready(autosubmit_configuration_facade.get_version()):
+                job_list_loader = JobListLoaderDirector(JobListLoaderBuilder(expid)).build_loaded_joblist_loader()
                 graph = GraphRepresentation(expid, job_list_loader, layout, grouped)
-                graph.perform_calculations()
+                graph.perform_calculations()
                 return graph.get_graph_representation_data()
         except Exception as exp:
-            print(traceback.format_exc())
+            # print(traceback.format_exc())
             print("New Graph Representation failed: {0}".format(exp))
+            log.info("Could not generate graph with faster method")
         # Getting platform data
+        hpcarch = autosubmit_configuration_facade.get_platform()
         # Submitter
-        submitter = Autosubmit._get_submitter(autosubmit_configuration_facade.autosubmit_conf)
-        submitter.load_platforms(autosubmit_configuration_facade.autosubmit_conf)
+        submitter = Autosubmit._get_submitter(autosubmit_configuration_facade)
+        submitter.load_platforms(autosubmit_configuration_facade)
         # JobList construction
-        job_list = Autosubmit.load_job_list(expid, autosubmit_configuration_facade.autosubmit_conf, notransitive=False)
+        job_list = Autosubmit.load_job_list(expid, autosubmit_configuration_facade, notransitive=False)
         if job_list.graph == None:
             raise Exception("Graph generation is not possible for this experiment.")
         # Platform update
-        hpcarch = autosubmit_configuration_facade.get_main_platform()
         for job in job_list.get_job_list():
             if job.platform_name is None:
                 job.platform_name = hpcarch
-            job.platform = submitter.platforms[job.platform_name.lower(
-            )]
+            job.platform = submitter.platforms[job.platform_name.lower()]
         # Chunk unit and chunk size
-        chunk_unit = autosubmit_configuration_facade.chunk_unit
-        chunk_size = autosubmit_configuration_facade.chunk_size
+        chunk_unit = autosubmit_configuration_facade.get_chunk_size_unit()
+        chunk_size = autosubmit_configuration_facade.get_chunk_size()
         job_list.sort_by_id()
+
         base_list = job_list.get_graph_representation(
-            BasicConfig, layout, grouped, chunk_unit=chunk_unit, chunk_size=chunk_size)
+            BasicConfig, layout, grouped, chunk_unit=chunk_unit, chunk_size=chunk_size
+        )
+        # raise Exception("Base list graph: ", str(base_list))
     except Exception as e:
         print(traceback.format_exc())
+        log.info("Could not generate Graph and received the following exception: " + str(e))
         return {'nodes': [],
                 'edges': [],
                 'fake_edges': [],
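The rewritten get_experiment_graph now has a two-tier shape: try the new GraphRepresentation for historical-ready experiment versions, and fall back to rebuilding the job list through the legacy Autosubmit calls when that fails. A condensed sketch of that control flow (illustrative only; the real function above carries the full platform and chunk handling, and legacy_graph_representation is a hypothetical stand-in for the slow path):

    def graph_data_with_fallback(expid, log, layout, grouped):
        try:
            # Fast path: available once the experiment version is historical-ready.
            loader = JobListLoaderDirector(JobListLoaderBuilder(expid)).build_loaded_joblist_loader()
            graph = GraphRepresentation(expid, loader, layout, grouped)
            graph.perform_calculations()
            return graph.get_graph_representation_data()
        except Exception:
            log.info("Could not generate graph with faster method")
        # Slow path: legacy reconstruction (submitter, job list, chunk data).
        return legacy_graph_representation(expid, layout, grouped)  # hypothetical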
@@ -763,7 +770,7 @@ def get_experiment_tree_rundetail(expid, run_id):
     """
     base_list = dict()
     pkl_timestamp = 10000000
-    try:
+    try:
         print("Received Tree RunDetail " + str(expid))
         BasicConfig.read()
         tree_structure, current_collection, reference = JobList.get_tree_structured_from_previous_run(expid, BasicConfig, run_id=run_id)
@@ -780,7 +787,7 @@ def get_experiment_tree_rundetail(expid, run_id):
     return base_list
-def get_experiment_tree_structured(expid):
+def get_experiment_tree_structured(expid, log):
     """
     Current version of the tree visualization algorithm.
     :param expid: Name of experiment
@@ -792,21 +799,34 @@ def get_experiment_tree_structured(expid):
     pkl_timestamp = 10000000
     try:
         notransitive = False
-        print("Received Tree Request " + str(expid))
         BasicConfig.read()
-        as_conf = AutosubmitConfig(
-            expid, BasicConfig, ConfigParserFactory())
-        as_conf.reload()
+
+        # TODO: Encapsulate the following two lines or move them to the parent function in app.py
+        curr_exp_as_version = db_common.get_autosubmit_version(expid)
+        main, secondary = common_utils.parse_version_number(curr_exp_as_version)
+        if main >= "4":
+            # TODO: new YAML parser
+            test = 1
+        else:
+            log.info("EXPERIMENT VERSION = " + str(curr_exp_as_version))
+            as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory())
+            as_conf.reload()
+
+        # If the version is higher than 3.13, we can perform the new tree representation algorithm
         try:
-            if common_utils.is_version_historical_ready(as_conf.get_version()):
+            if common_utils.is_version_historical_ready(as_conf.get_version()):
                 job_list_loader = JobListLoaderDirector(JobListLoaderBuilder(expid)).build_loaded_joblist_loader()
-                tree = TreeRepresentation(expid, job_list_loader)
+                tree = TreeRepresentation(expid, job_list_loader)
                 tree.perform_calculations()
+                # Early return with the new tree representation
                 return tree.get_tree_structure()
+            else:
+                log.info("TREE|Not using first method|autosubmit_version=" + as_conf.get_version())
         except Exception as exp:
             print(traceback.format_exc())
             print("New Tree Representation failed: {0}".format(exp))
+            log.info("New Tree Representation failed: {0}".format(exp))
     # Getting platform data
     # Main taget HPC
@@ -876,15 +896,15 @@ def _get_hpcarch_project_from_experiment_run_metadata(run_id, experiment_runs_di
         main_platform = ""
         run = experiment_runs_dict.get(run_id, None)
         if run and run.metadata:
-            data = json.loads(run.metadata)
-            main_platform = data["exp"]["experiment"].get("HPCARCH", "")
-            for platform in data["platforms"]:
+            data = json.loads(run.metadata)
+            main_platform = data["exp"]["experiment"].get("HPCARCH", "")
+            for platform in data["platforms"]:
                 platform_projects[platform] = data["platforms"][platform].get("PROJECT", "")
         else:
             raise Exception("NO METADATA ON RUN {0}".format(run_id))
         print("PLATFORMS")
         print(platform_projects)
-        return (main_platform, platform_projects)
+        return (main_platform, platform_projects)
     except Exception as exp:
         print(exp)
         return ("", {})
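_get_hpcarch_project_from_experiment_run_metadata pulls the main platform and the per-platform projects out of the run's JSON metadata. A self-contained sketch with an invented metadata blob, using the same field names as the code above (the platform and project values are made up for illustration):

    import json

    metadata = json.dumps({
        "exp": {"experiment": {"HPCARCH": "marenostrum4"}},
        "platforms": {"marenostrum4": {"PROJECT": "bsc32"}},
    })

    data = json.loads(metadata)
    main_platform = data["exp"]["experiment"].get("HPCARCH", "")
    platform_projects = {name: conf.get("PROJECT", "") for name, conf in data["platforms"].items()}
    print(main_platform, platform_projects)  # marenostrum4 {'marenostrum4': 'bsc32'}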
@@ -924,7 +944,7 @@ def generate_all_experiment_data(exp_path, job_path):
                 created) + "|" + format_model(str(model)) + "|" + str(hpc) + "|" + str(wrapper_type) + "|" + str(maxwrapped) + "\n")
             valid_id[_id] = name
     file1.close()
-
+
     # First step was successful, prepare to process jobs
     all_job_times = DbRequests.get_completed_times_detail()
     # if (all_job_times):
@@ -934,9 +954,9 @@ def generate_all_experiment_data(exp_path, job_path):
         expid = valid_id.get(exp_id, None)
         historical_data = None  # JobDataStructure(expid).get_all_current_job_data() TODO: Replace for new implementation
         experiment_runs = None  # JobDataStructure(expid).get_experiment_runs() TODO: Replace for new implementation
-        experiment_runs = experiment_runs if experiment_runs else []
-        # print(experiment_runs)
-        experiment_runs_dict = {run.run_id: run for run in experiment_runs}
+        experiment_runs = experiment_runs if experiment_runs else []
+        # print(experiment_runs)
+        experiment_runs_dict = {run.run_id: run for run in experiment_runs}
         # print("run id -> (,)")
         experiment_runs_main_info = {run.run_id: _get_hpcarch_project_from_experiment_run_metadata(run.run_id, experiment_runs_dict) for run in experiment_runs}
         # print(experiment_runs_main_info)
@@ -947,9 +967,9 @@ def generate_all_experiment_data(exp_path, job_path):
                 job_conf = get_job_conf_list(expid)
             except:
                 pass
-            job_conf_type = {}
-            for job in historical_data:
-                # Starting from DB VERSION 17, we go back to calling section -> section, and member -> member; instead of the previous erronous assignment.
+            job_conf_type = {}
+            for job in historical_data:
+                # Starting from DB VERSION 17, we go back to calling section -> section, and member -> member; instead of the previous erroneous assignment.
                 if job.member not in job_conf_type:
                     # Member was confused by section in DB version <= 15
                     if job_conf:
@@ -958,10 +978,10 @@ def generate_all_experiment_data(exp_path, job_path):
             project = ""
             if job.run_id:
                 main_platform, platforms = experiment_runs_main_info.get(job.run_id, ("", {}))
-                project = platforms.get(job.platform, "")
+                project = platforms.get(job.platform, "")
             if len(project) == 0:
                 try:
-                    if job.member in job_conf_type:
+                    if job.member in job_conf_type:
                         job_conf_info, job_type = job_conf_type[job.member]
                         wallclock, processors, threads, tasks, memory, mem_task, queue, platform, main_platform, project = job_conf_info
                 except:
@@ -1094,7 +1114,7 @@ def get_auto_conf_data(expid):
         max_wrapped = as_conf.get_max_wrapped_jobs()
         return (wrapper_type, max_wrapped)
     except Exception as ex:
-        print("Couldn't retrieve conf data (wrapper info) from {0}. Exception {1}.".format(expid, str(ex)))
+        print("Couldn't retrieve conf data (wrapper info) from {0}. Exception {1}.".format(expid, str(ex)))
         return ("None", 0)
@@ -1286,9 +1306,9 @@ def get_job_history(expid, job_name):
     path_to_job_logs = ""
     result = None
     try:
-        BasicConfig.read()
-        path_to_job_logs = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "tmp", "LOG_" + expid)
-        result = ExperimentHistoryDirector(ExperimentHistoryBuilder(expid)).build_reader_experiment_history().get_historic_job_data(job_name)
+        BasicConfig.read()
+        path_to_job_logs = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "tmp", "LOG_" + expid)
+        result = ExperimentHistoryDirector(ExperimentHistoryBuilder(expid)).build_reader_experiment_history().get_historic_job_data(job_name)
     except Exception as exp:
         print(traceback.format_exc())
         error = True
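generate_all_experiment_data builds each "|"-separated export row by chained string concatenation, which makes it easy to drop or double a separator. A hedged sketch of the same trailing fields built with join (field names taken from the write call above; format_model is the module's own helper, the surrounding fields are assumed):

    fields = [created, format_model(str(model)), hpc, wrapper_type, maxwrapped]
    file1.write("|".join(str(field) for field in fields) + "\n")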
@@ -1297,12 +1317,12 @@ def get_job_history(expid, job_name):
     return {"error": error, "error_message": error_message, "history": result, "path_to_logs": path_to_job_logs}
-def get_current_configuration_by_expid(expid, valid_user):
+def get_current_configuration_by_expid(expid, valid_user, log):
     """
     Gets the current configuration by expid. The procedure queries the historical database and the filesystem.
-    :param expid: Experiment Identifier
-    :type expdi: str
-    :return: configuration content formatted as a JSON object
+    :param expid: Experiment Identifier
+    :type expid: str
+    :return: configuration content formatted as a JSON object
     :rtype: Dictionary
     """
     error = False
@@ -1311,6 +1331,18 @@ def get_current_configuration_by_expid(expid, valid_user):
     warning_message = ""
     currentRunConfig = {}
     currentFileSystemConfig = {}
+
+    def removeParameterDuplication(currentDict, keyToRemove, exceptionsKeys=[]):
+        if currentDict and isinstance(currentDict, dict):
+            try:
+                for k, nested_d in currentDict.items():
+                    if k not in exceptionsKeys and isinstance(nested_d, dict):
+                        nested_d.pop(keyToRemove, None)
+            except Exception as exp:
+                log.info("Error while trying to eliminate duplicated key from config.")
+                pass
+        return currentDict
+
     try:
         if not valid_user:
             raise Exception(
@@ -1350,6 +1382,9 @@ def get_current_configuration_by_expid(expid, valid_user):
             currentFileSystemConfig["contains_nones"] = True
             pass
+        removeParameterDuplication(currentRunConfig['exp'], "EXPID", ["experiment"])
+        removeParameterDuplication(currentFileSystemConfig['exp'], "EXPID", ["experiment"])
+
     except Exception as exp:
         error = True
         error_message = str(exp)
@@ -1361,7 +1396,7 @@ def get_current_configuration_by_expid(expid, valid_user):
 def get_experiment_runs(expid):
-    """
+    """
     Get runs of the same experiment from historical db
     """
     error = False
@@ -1379,7 +1414,7 @@ def get_experiment_runs(expid):
     try:
         # TODO: TEST TEST TEST TEST
-        # Current data
+        # Current data
         joblist_loader = JobListLoaderDirector(JobListLoaderBuilder(expid)).build_loaded_joblist_loader()
         experiment_history = ExperimentHistoryDirector(ExperimentHistoryBuilder(expid)).build_reader_experiment_history()
         # time_0 = time.time()
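The new removeParameterDuplication helper strips a duplicated key (EXPID here) from every nested section except the exempted ones. A standalone sketch of the same logic with a toy config, so the effect is easy to verify; it deliberately uses a tuple default instead of the mutable exceptionsKeys=[] default in the patch, and the dict contents are invented:

    def remove_parameter_duplication(config, key_to_remove, exception_keys=()):
        # Drop `key_to_remove` from every nested dict except the exempted sections.
        if isinstance(config, dict):
            for section, nested in config.items():
                if section not in exception_keys and isinstance(nested, dict):
                    nested.pop(key_to_remove, None)
        return config

    toy = {"experiment": {"EXPID": "a29z"}, "local": {"EXPID": "a29z", "PATH": "/tmp"}}
    print(remove_parameter_duplication(toy, "EXPID", ("experiment",)))
    # {'experiment': {'EXPID': 'a29z'}, 'local': {'PATH': '/tmp'}}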
@@ -1390,43 +1425,43 @@ def get_experiment_runs(expid):
         for job_dc in experiment_history.manager.get_job_data_dcs_all():
             if job_dc.status_code == common_utils.Status.COMPLETED:
                 run_id_job_name_to_job_data_dc_COMPLETED[(job_dc.run_id, job_dc.job_name)] = job_dc
-        run_id_wrapper_code_to_job_dcs = {}
+        run_id_wrapper_code_to_job_dcs = {}
         for key, job_dc in run_id_job_name_to_job_data_dc_COMPLETED.items():
             if job_dc.wrapper_code:
                 run_id, _ = key
                 run_id_wrapper_code_to_job_dcs.setdefault((run_id, job_dc.wrapper_code), []).append(job_dc)
-
-        run_dict_SIM = {}
+
+        run_dict_SIM = {}
         for job_data_dc in sim_jobs:
             run_dict_SIM.setdefault(job_data_dc.run_id, []).append(job_data_dc)
-        run_dict_POST = {}
+        run_dict_POST = {}
         for job_data_dc in post_jobs:
             run_dict_POST.setdefault(job_data_dc.run_id, []).append(job_data_dc)
         max_run_id = 0
         # print("Time spent in data retrieval and pre-process: {}".format(time.time() - time_0))
         if experiment_runs:
-            for experiment_run in experiment_runs:
+            for experiment_run in experiment_runs:
                 max_run_id = max(experiment_run.run_id, max_run_id)
-                valid_SIM_in_run = run_dict_SIM.get(experiment_run.run_id, [])
+                valid_SIM_in_run = run_dict_SIM.get(experiment_run.run_id, [])
                 valid_POST_in_run = run_dict_POST.get(experiment_run.run_id, [])
                 # The content of the if block try to correct lack of finish time information in the Historical database
                 # It may not be necessary in the future.
                 if max_run_id == experiment_run.run_id:
                     assign_current(joblist_loader.job_dictionary, valid_SIM_in_run, experiment_history)
                     assign_current(joblist_loader.job_dictionary, valid_POST_in_run, experiment_history)
-                result.append({"run_id": experiment_run.run_id,
-                               "created": experiment_run.created,
-                               "finish": common_utils.timestamp_to_datetime_format(experiment_run.finish),
-                               "chunk_unit": experiment_run.chunk_unit,
+                result.append({"run_id": experiment_run.run_id,
+                               "created": experiment_run.created,
+                               "finish": common_utils.timestamp_to_datetime_format(experiment_run.finish),
+                               "chunk_unit": experiment_run.chunk_unit,
                                "chunk_size": experiment_run.chunk_size,
-                               "submitted": experiment_run.submitted,
-                               "queuing": experiment_run.queuing,
-                               "running": experiment_run.running,
-                               "completed": experiment_run.completed,
-                               "failed": experiment_run.failed,
-                               "total": experiment_run.total,
-                               "suspended": experiment_run.suspended,
-                               "SYPD": experiment_run.getSYPD(valid_SIM_in_run),
+                               "submitted": experiment_run.submitted,
+                               "queuing": experiment_run.queuing,
+                               "running": experiment_run.running,
+                               "completed": experiment_run.completed,
+                               "failed": experiment_run.failed,
+                               "total": experiment_run.total,
+                               "suspended": experiment_run.suspended,
+                               "SYPD": experiment_run.getSYPD(valid_SIM_in_run),
                                "ASYPD": experiment_run.getASYPD(valid_SIM_in_run, valid_POST_in_run, run_id_wrapper_code_to_job_dcs)})
             result.sort(key=lambda x: x["run_id"], reverse=True)
         else: