diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000000000000000000000000000000000000..28097deed077b9a955b2ae5f8e6eb304542510d5
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,20 @@
+# CHANGELOG
+
+### Pre-release v4.0.0b2 - Release date: 2023-11-20
+
+* Fixed a bug where the API allowed clients weren't read from the autosubmitrc.
+* Fixed a bug where the full AS4 configuration wasn't shown in the `/cconfig/` endpoint.
+* Added differences calculation in the `/cconfig/` endpoint.
+* Added job status in the `/quick/` endpoint.
+* Improved security by using protection levels.
+* Improved logging and code structure.
+* Updated the `Parallelization` metric to consider the platform `PROCESSORS` of the job in the `/performance/` endpoint.
+* Added a Processing Elements (PE) estimation that extends the `Parallelization` logic in the `/performance/` endpoint, improving the accuracy of the CHSY metric.
+* Fixed a bug where jobs didn't show the correct platform.
+* Fixed an error while populating the `experiment_times` table which affected the `/running/` endpoint.
+
+
+### Pre-release v4.0.0b1 - Release date: 2023-11-02
+
+* Introduced the `autosubmit_api` CLI.
+* Largely solved compatibility with autosubmit >= 4.0.0.
\ No newline at end of file
diff --git a/README.md b/README.md
index 6c612d6cb3856ebeb2199a32a73164b7e307f72d..381bf2b7a733dde0401ffd7590eee2686d142879 100644
--- a/README.md
+++ b/README.md
@@ -5,7 +5,7 @@
 1. [Overview](#overview)
 2. [Autosubmit Big Picture](#autosubmit-big-picture)
 3. [General Knowledge Requirements](#general-knowledge-requirements)
-4. [Deployment](#deployment)
+4. [Installation](#installation)
 
 ## Overview
 
@@ -40,6 +40,19 @@ In this image you can see the flow of information in the **Autosubmit environmen
 * Gunicorn
 * Unit testing
 
+## Installation
+Autosubmit API can be easily installed via pip:
+```sh
+pip install autosubmit-api # >=4.0 (recommended)
+# Check installation
+autosubmit_api start -h
+```
+
+Start the server:
+
+```sh
+autosubmit_api start
+```
diff --git a/VERSION b/VERSION
deleted file mode 100644
index c9d9681b383a1746eb074946893c6ce60c0d8090..0000000000000000000000000000000000000000
--- a/VERSION
+++ /dev/null
@@ -1 +0,0 @@
-4.0.0b1
diff --git a/autosubmit_api/__init__.py b/autosubmit_api/__init__.py
index 38aa811c261c7fab40e9abcb939223d72c96aa44..764d200bc566a0b8c65926ace874952cda60a48f 100644
--- a/autosubmit_api/__init__.py
+++ b/autosubmit_api/__init__.py
@@ -1,3 +1,3 @@
-__version__ = "4.0.0b1"
-__author__ = 'Luiggi Tenorio, Cristian Gutiérrez, Julian Berlin, Wilmer Uruchi'
+__version__ = "4.0.0b2"
+__author__ = 'Luiggi Tenorio, Bruno P. Kinoshita, Cristian Gutiérrez, Julian Berlin, Wilmer Uruchi'
 __credits__ = 'Barcelona Supercomputing Center'
diff --git a/autosubmit_api/app.py b/autosubmit_api/app.py
index 9799534bb8c1b325694317001d8527a3ea1463d7..f41f558a343723b1853823cf5677fc5f7ddbb5c2 100644
--- a/autosubmit_api/app.py
+++ b/autosubmit_api/app.py
@@ -17,20 +17,21 @@
 # You should have received a copy of the GNU General Public License
 # along with Autosubmit. If not, see <http://www.gnu.org/licenses/>.
-from functools import wraps import os import sys -import time from datetime import datetime, timedelta +from typing import Optional import requests -import logging from flask_cors import CORS, cross_origin from flask import Flask, request, session, redirect - +from autosubmit_api import __version__ as APIVersion +from autosubmit_api.auth import ProtectionLevels, with_auth_token +from autosubmit_api.bgtasks.scheduler import create_bind_scheduler from autosubmit_api.database.extended_db import ExtendedDB from autosubmit_api.database.db_common import get_current_running_exp, update_experiment_description_owner from autosubmit_api.experiment import common_requests as CommonRequests from autosubmit_api.experiment import utils as Utiles +from autosubmit_api.logger import get_app_logger, with_log_run_times from autosubmit_api.performance.performance_metrics import PerformanceMetrics from autosubmit_api.database.db_common import search_experiment_by_id from autosubmit_api.config.basicConfig import APIBasicConfig @@ -38,27 +39,8 @@ from autosubmit_api.builders.joblist_helper_builder import JobListHelperBuilder, from multiprocessing import Manager, Lock import jwt import sys -from flask_apscheduler import APScheduler -from autosubmit_api.workers import populate_details_db, populate_queue_run_times, populate_running_experiments, populate_graph, verify_complete -from autosubmit_api.config import JWT_SECRET, JWT_ALGORITHM, JWT_EXP_DELTA_SECONDS, RUN_BACKGROUND_TASKS_ON_START, CAS_LOGIN_URL, CAS_VERIFY_URL - -def with_log_run_times(_logger: logging.Logger, _tag: str): - def decorator(func): - @wraps(func) - def inner_wrapper(*args, **kwargs): - start_time = time.time() - path = "" - try: - path = request.path - except: - pass - _logger.info('{}|RECEIVED|{}'.format(_tag, path)) - response = func(*args, **kwargs) - _logger.info('{}|RTIME|{}|{:.3f}'.format(_tag, path,(time.time() - start_time))) - return response +from autosubmit_api.config import JWT_SECRET, JWT_ALGORITHM, JWT_EXP_DELTA_SECONDS, PROTECTION_LEVEL, RUN_BACKGROUND_TASKS_ON_START, CAS_LOGIN_URL, CAS_VERIFY_URL - return inner_wrapper - return decorator def create_app(): """ @@ -70,69 +52,52 @@ def create_app(): app = Flask(__name__) - D = Manager().dict() + # Multiprocessing setup + D = Manager().dict() + lock = Lock() + # CORS setup CORS(app) - gunicorn_logger = logging.getLogger('gunicorn.error') - app.logger.handlers = gunicorn_logger.handlers - app.logger.setLevel(gunicorn_logger.level) + # Logger binding + app.logger = get_app_logger() app.logger.info("PYTHON VERSION: " + sys.version) + # Enforce Language Locale + CommonRequests.enforceLocal(app.logger) + requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS += 'HIGH:!DH:!aNULL' try: requests.packages.urllib3.contrib.pyopenssl.DEFAULT_SSL_CIPHER_LIST += 'HIGH:!DH:!aNULL' except AttributeError: - # no pyopenssl support used / needed / available - pass - - lock = Lock() + app.logger.warning('No pyopenssl support used / needed / available') + + # Initial read config + APIBasicConfig.read() + app.logger.debug("API Basic config: " + str(APIBasicConfig().props())) + app.logger.debug("Env Config: "+ str({ + "PROTECTION_LEVEL": PROTECTION_LEVEL, + "CAS_LOGIN_URL": CAS_LOGIN_URL, + "CAS_VERIFY_URL": CAS_VERIFY_URL, + "RUN_BACKGROUND_TASKS_ON_START": RUN_BACKGROUND_TASKS_ON_START + })) - CommonRequests.enforceLocal(app.logger) + # Prepare DB + ext_db = ExtendedDB(APIBasicConfig.DB_DIR, + APIBasicConfig.DB_FILE, APIBasicConfig.AS_TIMES_DB) + ext_db.prepare_db() # Background Scheduler - scheduler = 
APScheduler() - scheduler.init_app(app) - scheduler.start() - - @scheduler.task('interval', id='populate_details_db', hours=4) - @with_log_run_times(app.logger, "WRKPOPDET") - def worker_populate_details_db(): - populate_details_db.main() - - @scheduler.task('interval', id='populate_queue_run_times', minutes=3) - @with_log_run_times(app.logger, "WRKPOPQUE") - def worker_populate_queue_run_times(): - populate_queue_run_times.main() - - @scheduler.task('interval', id='populate_running_experiments', minutes=5) - @with_log_run_times(app.logger, "WRKPOPREX") - def worker_populate_running_experiments(): - populate_running_experiments.main() - - @scheduler.task('interval', id='verify_complete', minutes=10) - @with_log_run_times(app.logger, "WRKVRFCMPT") - def worker_verify_complete(): - verify_complete.main() - - @scheduler.task('interval', id='populate_graph', hours=24) - @with_log_run_times(app.logger, "WRKPOPGRPH") - def worker_populate_graph(): - populate_graph.main() + create_bind_scheduler(app) - # Prepare DB - config = APIBasicConfig() - config.read() - ext_db = ExtendedDB(config.DB_DIR, config.DB_FILE, config.AS_TIMES_DB) - ext_db.prepare_db() + ################################ ROUTES ################################ - if RUN_BACKGROUND_TASKS_ON_START: - app.logger.info('Starting populate workers on init...') - worker_populate_details_db() - worker_populate_queue_run_times() - worker_populate_running_experiments() - worker_verify_complete() - worker_populate_graph() + @app.route('/') + def home(): + return { + "name": "Autosubmit API", + "version": APIVersion + } # CAS Login @app.route('/login') @@ -151,11 +116,14 @@ def create_app(): target_service = "{}{}/login".format(referrer, environment) if not ticket: - route_to_request_ticket = "{}?service={}".format(CAS_LOGIN_URL, target_service) + route_to_request_ticket = "{}?service={}".format( + CAS_LOGIN_URL, target_service) app.logger.info("Redirected to: " + str(route_to_request_ticket)) return redirect(route_to_request_ticket) - environment = environment if environment is not None else "autosubmitapp" # can be used to target the test environment - cas_verify_ticket_route = CAS_VERIFY_URL + '?service=' + target_service + '&ticket=' + ticket + # can be used to target the test environment + # environment = environment if environment is not None else "autosubmitapp" + cas_verify_ticket_route = CAS_VERIFY_URL + \ + '?service=' + target_service + '&ticket=' + ticket response = requests.get(cas_verify_ticket_route) user = None if response: @@ -171,11 +139,11 @@ def create_app(): jwt_token = jwt.encode(payload, JWT_SECRET, JWT_ALGORITHM) return {'authenticated': True, 'user': user, 'token': jwt_token, 'message': "Token generated."} - @app.route('/updatedesc', methods=['GET', 'POST']) @cross_origin(expose_headers="Authorization") @with_log_run_times(app.logger, "UDESC") - def update_description(): + @with_auth_token(threshold=ProtectionLevels.WRITEONLY) + def update_description(user_id: Optional[str] = None): """ Updates the description of an experiment. Requires authenticated user. 
""" @@ -185,140 +153,128 @@ def create_app(): body_data = request.json expid = body_data.get("expid", None) new_description = body_data.get("description", None) - current_token = request.headers.get("Authorization") - try: - jwt_token = jwt.decode(current_token, JWT_SECRET, JWT_ALGORITHM) - except jwt.ExpiredSignatureError: - jwt_token = {"user_id": None} - except Exception as exp: - jwt_token = {"user_id": None} - valid_user = jwt_token.get("user_id", None) - return update_experiment_description_owner(expid, new_description, valid_user) - + return update_experiment_description_owner(expid, new_description, user_id), 200 if user_id else 401 @app.route('/tokentest', methods=['GET', 'POST']) @cross_origin(expose_headers="Authorization") @with_log_run_times(app.logger, "TTEST") - def test_token(): + @with_auth_token(threshold=ProtectionLevels.NONE, response_on_fail=False) + def test_token(user_id: Optional[str] = None): """ Tests if a token is still valid """ - current_token = request.headers.get("Authorization") - try: - jwt_token = jwt.decode(current_token, JWT_SECRET, JWT_ALGORITHM) - except jwt.ExpiredSignatureError: - jwt_token = {"user_id": None} - except Exception as exp: - print(exp) - jwt_token = {"user_id": None} - - valid_user = jwt_token.get("user_id", None) return { - "isValid": True if valid_user else False, - "message": "Session expired" if not valid_user else None - } - + "isValid": True if user_id else False, + "message": "Unauthorized" if not user_id else None + }, 200 if user_id else 401 @app.route('/cconfig/', methods=['GET']) @cross_origin(expose_headers="Authorization") @with_log_run_times(app.logger, "CCONFIG") - def get_current_configuration(expid): - current_token = request.headers.get("Authorization") - try: - jwt_token = jwt.decode(current_token, JWT_SECRET, JWT_ALGORITHM) - except Exception as exp: - jwt_token = {"user_id": None} - valid_user = jwt_token.get("user_id", None) - result = CommonRequests.get_current_configuration_by_expid(expid, valid_user, app.logger) + @with_auth_token() + def get_current_configuration(expid: str, user_id: Optional[str] = None): + result = CommonRequests.get_current_configuration_by_expid( + expid, user_id) return result - @app.route('/expinfo/', methods=['GET']) @with_log_run_times(app.logger, "EXPINFO") - def exp_info(expid): + @with_auth_token() + def exp_info(expid: str, user_id: Optional[str] = None): result = CommonRequests.get_experiment_data(expid) return result - @app.route('/expcount/', methods=['GET']) @with_log_run_times(app.logger, "EXPCOUNT") - def exp_counters(expid): + @with_auth_token() + def exp_counters(expid: str, user_id: Optional[str] = None): result = CommonRequests.get_experiment_counters(expid) return result - @app.route('/searchowner///', methods=['GET']) @app.route('/searchowner/', methods=['GET']) @with_log_run_times(app.logger, "SOWNER") - def search_owner(owner, exptype=None, onlyactive=None): + @with_auth_token() + def search_owner(owner, exptype=None, onlyactive=None, user_id: Optional[str] = None): """ Same output format as search_expid """ - result = search_experiment_by_id(searchString=None, owner=owner, typeExp=exptype, onlyActive=onlyactive) + result = search_experiment_by_id( + searchString=None, owner=owner, typeExp=exptype, onlyActive=onlyactive) return result - @app.route('/search///', methods=['GET']) @app.route('/search/', methods=['GET']) @with_log_run_times(app.logger, "SEARCH") - def search_expid(expid, exptype=None, onlyactive=None): - result = search_experiment_by_id(expid, owner=None, 
typeExp=exptype, onlyActive=onlyactive) + @with_auth_token() + def search_expid(expid, exptype=None, onlyactive=None, user_id: Optional[str] = None): + result = search_experiment_by_id( + expid, owner=None, typeExp=exptype, onlyActive=onlyactive) return result - @app.route('/running/', methods=['GET']) @with_log_run_times(app.logger, "RUN") - def search_running(): + @with_auth_token() + def search_running(user_id: Optional[str] = None): """ Returns the list of all experiments that are currently running. """ if 'username' in session: - print(("USER {}".format(session['username']))) - app.logger.info("Active proceses: " + str(D)) - #app.logger.info("Received Currently Running query ") + app.logger.debug(("USER {}".format(session['username']))) + app.logger.debug("Active proceses: " + str(D)) + # app.logger.info("Received Currently Running query ") result = get_current_running_exp() return result - @app.route('/runs/', methods=['GET']) @with_log_run_times(app.logger, "ERUNS") - def get_runs(expid): + @with_auth_token() + def get_runs(expid, user_id: Optional[str] = None): """ Get list of runs of the same experiment from the historical db """ result = CommonRequests.get_experiment_runs(expid) return result - @app.route('/ifrun/', methods=['GET']) @with_log_run_times(app.logger, "IFRUN") - def get_if_running(expid): + @with_auth_token() + def get_if_running(expid, user_id: Optional[str] = None): result = CommonRequests.quick_test_run(expid) return result - @app.route('/logrun/', methods=['GET']) @with_log_run_times(app.logger, "LOGRUN") - def get_log_running(expid): + @with_auth_token() + def get_log_running(expid, user_id: Optional[str] = None): result = CommonRequests.get_current_status_log_plus(expid) return result - @app.route('/summary/', methods=['GET']) @with_log_run_times(app.logger, "SUMMARY") - def get_expsummary(expid): + @with_auth_token() + def get_expsummary(expid, user_id: Optional[str] = None): user = request.args.get("loggedUser", default="null", type=str) - if user != "null": lock.acquire(); D[os.getpid()] = [user, "summary", True]; lock.release(); + if user != "null": + lock.acquire() + D[os.getpid()] = [user, "summary", True] + lock.release() result = CommonRequests.get_experiment_summary(expid, app.logger) app.logger.info('Process: ' + str(os.getpid()) + " workers: " + str(D)) - if user != "null": lock.acquire(); D[os.getpid()] = [user, "summary", False]; lock.release(); - if user != "null": lock.acquire(); D.pop(os.getpid(), None); lock.release(); + if user != "null": + lock.acquire() + D[os.getpid()] = [user, "summary", False] + lock.release() + if user != "null": + lock.acquire() + D.pop(os.getpid(), None) + lock.release() return result - @app.route('/shutdown/') @with_log_run_times(app.logger, "SHUTDOWN") - def shutdown(route): + @with_auth_token() + def shutdown(route, user_id: Optional[str] = None): """ This function is invoked from the frontend (AS-GUI) to kill workers that are no longer needed. This call is common in heavy parts of the GUI such as the Tree and Graph generation or Summaries fetching. 
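Reviewer note: every read endpoint above is now wrapped by `@with_auth_token()`, which pulls the raw JWT from the `Authorization` header and injects `user_id` into the view. A minimal client-side sketch of the resulting flow; the base URL, the experiment id `a000`, and the token value are hypothetical placeholders, not part of this change:

```python
import requests

API_URL = "http://localhost:5000"  # hypothetical deployment address
TOKEN = "<jwt-from-/login>"        # placeholder: issued by the CAS /login flow

# /tokentest is declared with threshold=ProtectionLevels.NONE and
# response_on_fail=False, so it always answers and simply reports validity.
check = requests.get(API_URL + "/tokentest", headers={"Authorization": TOKEN})
print(check.status_code, check.json())  # 401 and {"isValid": False, ...} for a bad token

# Read endpoints such as /cconfig/<expid> use the default threshold (ALL);
# whether an invalid token is rejected depends on the PROTECTION_LEVEL env var.
resp = requests.get(API_URL + "/cconfig/a000", headers={"Authorization": TOKEN})
print(resp.status_code)
```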
@@ -330,13 +286,14 @@ def create_app():
             app.logger.info("Bad parameters for user and expid in route.")
 
         if user != "null":
-            app.logger.info('SHUTDOWN|DETAILS|route: ' + route + " user: " + user + " expid: " + expid)
+            app.logger.info('SHUTDOWN|DETAILS|route: ' + route +
+                            " user: " + user + " expid: " + expid)
             try:
                 # app.logger.info("user: " + user)
                 # app.logger.info("expid: " + expid)
                 app.logger.info("Workers before: " + str(D))
                 lock.acquire()
-                for k,v in list(D.items()):
+                for k, v in list(D.items()):
                     if v[0] == user and v[1] == route and v[-1] == True:
                         if v[2] == expid:
                             D[k] = [user, route, expid, False]
@@ -349,79 +306,104 @@ def create_app():
                 lock.release()
                 app.logger.info("Workers now: " + str(D))
             except Exception as exp:
-                app.logger.info("[CRITICAL] Could not shutdown process " + expid + " by user \"" + user + "\"")
+                app.logger.info(
+                    "[CRITICAL] Could not shutdown process " + expid + " by user \"" + user + "\"")
         return ""
 
-
     @app.route('/performance/<expid>', methods=['GET'])
     @with_log_run_times(app.logger, "PRF")
-    def get_exp_performance(expid):
+    @with_auth_token()
+    def get_exp_performance(expid, user_id: Optional[str] = None):
         result = {}
         try:
-            result = PerformanceMetrics(expid, JobListHelperDirector(JobListHelperBuilder(expid)).build_job_list_helper()).to_json()
-        except Exception as exp:
+            result = PerformanceMetrics(expid, JobListHelperDirector(
+                JobListHelperBuilder(expid)).build_job_list_helper()).to_json()
+        except Exception as exc:
             result = {"SYPD": None,
-                    "ASYPD": None,
-                    "RSYPD": None,
-                    "CHSY": None,
-                    "JPSY": None,
-                    "Parallelization": None,
-                    "considered": [],
-                    "error": True,
-                    "error_message": str(exp),
-                    "warnings_job_data": [],
-                    }
+                      "ASYPD": None,
+                      "RSYPD": None,
+                      "CHSY": None,
+                      "JPSY": None,
+                      "Parallelization": None,
+                      "PE": None,
+                      "considered": [],
+                      "error": True,
+                      "error_message": str(exc),
+                      "warnings_job_data": [],
+                      }
         return result
 
-
     @app.route('/graph/<expid>/<layout>/<grouped>', methods=['GET'])
     @with_log_run_times(app.logger, "GRAPH")
-    def get_list_format(expid, layout='standard', grouped='none'):
+    @with_auth_token()
+    def get_list_format(expid, layout='standard', grouped='none', user_id: Optional[str] = None):
         user = request.args.get("loggedUser", default="null", type=str)
         # app.logger.info("user: " + user)
         # app.logger.info("expid: " + expid)
-        if user != "null": lock.acquire(); D[os.getpid()] = [user, "graph", expid, True]; lock.release();
-        result = CommonRequests.get_experiment_graph(expid, app.logger, layout, grouped)
-        app.logger.info('Process: ' + str(os.getpid()) + " graph workers: " + str(D))
-        if user != "null": lock.acquire(); D[os.getpid()] = [user, "graph", expid, False]; lock.release();
-        if user != "null": lock.acquire(); D.pop(os.getpid(), None); lock.release();
+        if user != "null":
+            lock.acquire()
+            D[os.getpid()] = [user, "graph", expid, True]
+            lock.release()
+        result = CommonRequests.get_experiment_graph(
+            expid, app.logger, layout, grouped)
+        app.logger.info('Process: ' + str(os.getpid()) +
+                        " graph workers: " + str(D))
+        if user != "null":
+            lock.acquire()
+            D[os.getpid()] = [user, "graph", expid, False]
+            lock.release()
+        if user != "null":
+            lock.acquire()
+            D.pop(os.getpid(), None)
+            lock.release()
         return result
 
-
     @app.route('/tree/<expid>', methods=['GET'])
     @with_log_run_times(app.logger, "TREE")
-    def get_exp_tree(expid):
+    @with_auth_token()
+    def get_exp_tree(expid, user_id: Optional[str] = None):
         user = request.args.get("loggedUser", default="null", type=str)
         # app.logger.info("user: " + user)
         # app.logger.info("expid: " + expid)
-        if user != "null": lock.acquire(); D[os.getpid()] = [user, "tree", expid, True]; lock.release();
-        result = CommonRequests.get_experiment_tree_structured(expid, app.logger)
-        app.logger.info('Process: ' + str(os.getpid()) + " tree workers: " + str(D))
-        if user != "null": lock.acquire(); D[os.getpid()] = [user, "tree", expid, False]; lock.release();
-        if user != "null": lock.acquire(); D.pop(os.getpid(), None); lock.release();
+        if user != "null":
+            lock.acquire()
+            D[os.getpid()] = [user, "tree", expid, True]
+            lock.release()
+        result = CommonRequests.get_experiment_tree_structured(
+            expid, app.logger)
+        app.logger.info('Process: ' + str(os.getpid()) +
+                        " tree workers: " + str(D))
+        if user != "null":
+            lock.acquire()
+            D[os.getpid()] = [user, "tree", expid, False]
+            lock.release()
+        if user != "null":
+            lock.acquire()
+            D.pop(os.getpid(), None)
+            lock.release()
         return result
 
-
     @app.route('/quick/<expid>', methods=['GET'])
     @with_log_run_times(app.logger, "QUICK")
-    def get_quick_view_data(expid):
+    @with_auth_token(response_on_fail=True)
+    def get_quick_view_data(expid, user_id: Optional[str] = None):
         result = CommonRequests.get_quick_view(expid)
         return result
 
-
     @app.route('/exprun/<expid>', methods=['GET'])
     @with_log_run_times(app.logger, "LOG")
-    def get_experiment_running(expid):
+    @with_auth_token()
+    def get_experiment_running(expid, user_id: Optional[str] = None):
         """
         Finds log and gets the last 150 lines
         """
         result = CommonRequests.get_experiment_log_last_lines(expid)
         return result
 
-
     @app.route('/joblog/<logfile>', methods=['GET'])
     @with_log_run_times(app.logger, "JOBLOG")
-    def get_job_log_from_path(logfile):
+    @with_auth_token()
+    def get_job_log_from_path(logfile, user_id: Optional[str] = None):
         """
         Get log from path
         """
@@ -430,49 +412,46 @@ def create_app():
         result = CommonRequests.get_job_log(expid, logfile)
         return result
 
-
     @app.route('/pklinfo/<expid>/<timeStamp>', methods=['GET'])
     @with_log_run_times(app.logger, "GPKL")
-    def get_experiment_pklinfo(expid, timeStamp):
+    @with_auth_token()
+    def get_experiment_pklinfo(expid, timeStamp, user_id: Optional[str] = None):
         result = CommonRequests.get_experiment_pkl(expid)
         return result
 
-
     @app.route('/pkltreeinfo/<expid>/<timeStamp>', methods=['GET'])
     @with_log_run_times(app.logger, "TPKL")
-    def get_experiment_tree_pklinfo(expid, timeStamp):
+    @with_auth_token()
+    def get_experiment_tree_pklinfo(expid, timeStamp, user_id: Optional[str] = None):
         result = CommonRequests.get_experiment_tree_pkl(expid)
         return result
 
-
     @app.route('/stats/<expid>/<filter_period>/<filter_type>')
     @with_log_run_times(app.logger, "STAT")
-    def get_experiment_statistics(expid, filter_period, filter_type):
-        result = CommonRequests.get_experiment_stats(expid, filter_period, filter_type)
+    @with_auth_token()
+    def get_experiment_statistics(expid, filter_period, filter_type, user_id: Optional[str] = None):
+        result = CommonRequests.get_experiment_stats(
+            expid, filter_period, filter_type)
         return result
 
-
     @app.route('/history/<expid>/<jobname>')
     @with_log_run_times(app.logger, "HISTORY")
-    def get_exp_job_history(expid, jobname):
+    @with_auth_token()
+    def get_exp_job_history(expid, jobname, user_id: Optional[str] = None):
         result = CommonRequests.get_job_history(expid, jobname)
         return result
 
-
     @app.route('/rundetail/<expid>/<runid>')
     @with_log_run_times(app.logger, "RUNDETAIL")
-    def get_experiment_run_job_detail(expid, runid):
+    @with_auth_token()
+    def get_experiment_run_job_detail(expid, runid, user_id: Optional[str] = None):
         result = CommonRequests.get_experiment_tree_rundetail(expid, runid)
         return result
 
-
     @app.route('/filestatus/')
     @with_log_run_times(app.logger, "FSTATUS")
     def get_file_status():
result = CommonRequests.get_last_test_archive_status() return result - - return app - -app = create_app() \ No newline at end of file + return app \ No newline at end of file diff --git a/autosubmit_api/auth/__init__.py b/autosubmit_api/auth/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..dc9af150d0901f9c84753335162b4d8f89c826df --- /dev/null +++ b/autosubmit_api/auth/__init__.py @@ -0,0 +1,65 @@ +from functools import wraps +from flask import request +import jwt +from autosubmit_api.logger import logger +from autosubmit_api.config import PROTECTION_LEVEL, JWT_ALGORITHM, JWT_SECRET +from enum import IntEnum + + +class ProtectionLevels(IntEnum): + ALL = 100 + WRITEONLY = 20 + NONE = 0 + + +class AppAuthError(ValueError): + code = 401 + + +def _parse_protection_level_env(_var): + if _var == "NONE": + return ProtectionLevels.NONE + elif _var == "WRITEONLY": + return ProtectionLevels.WRITEONLY + + return ProtectionLevels.ALL + + +def with_auth_token(threshold=ProtectionLevels.ALL, response_on_fail=True, raise_on_fail=False): + """ + Decorator that validates the Authorization token in a request. + + It adds the `user_id` variable inside the arguments of the wrapped function. + + :param threshold: The minimum PROTECTION_LEVEL that needs to be set to trigger a *_on_fail + :param response_on_fail: if `True` will return a Flask response on fail + :param raise_on_fail: if `True` will raise an exception on fail + :raises AppAuthError: if raise_on_fail=True and decoding fails + """ + def decorator(func): + @wraps(func) + def inner_wrapper(*args, **kwargs): + try: + current_token = request.headers.get("Authorization") + jwt_token = jwt.decode( + current_token, JWT_SECRET, JWT_ALGORITHM) + except Exception as exc: + error_msg = "Unauthorized" + if isinstance(exc, jwt.ExpiredSignatureError): + error_msg = "Expired token" + auth_level = _parse_protection_level_env(PROTECTION_LEVEL) + if threshold <= auth_level: # If True, will trigger *_on_fail + if raise_on_fail: + raise AppAuthError(error_msg) + if response_on_fail: + return {"error": True, "message": error_msg}, 401 + jwt_token = {"user_id": None} + + user_id = jwt_token.get("user_id", None) + logger.debug("decorator user_id: " + str(user_id)) + kwargs["user_id"] = user_id + + return func(*args, **kwargs) + + return inner_wrapper + return decorator diff --git a/autosubmit_api/autosubmit_legacy/job/job_list.py b/autosubmit_api/autosubmit_legacy/job/job_list.py index 02e23242366693f7c91467c7bddd73e1547f7822..ca74674708acafe5781ac62504780f2209296df1 100644 --- a/autosubmit_api/autosubmit_legacy/job/job_list.py +++ b/autosubmit_api/autosubmit_legacy/job/job_list.py @@ -29,50 +29,43 @@ from bscearth.utils.config_parser import ConfigParserFactory import os import re import pickle -import hashlib import traceback import datetime import math -import random # Spectral imports -import numpy as np import networkx as nx from scipy import sparse from fnmatch import fnmatch from collections import deque, OrderedDict -from threading import Thread # End Spectral imports from time import localtime, strftime, time, mktime from shutil import move -from random import shuffle from dateutil.relativedelta import * -from .job import Job -from ...config.config_common import AutosubmitConfigResolver +from autosubmit_api.config.config_common import AutosubmitConfigResolver from bscearth.utils.log import Log -from .job_dict import DicJobs -from .job_utils import Dependency -from .job_utils import SubJob -from .job_utils import SubJobManager, 
job_times_to_text, datechunk_to_year -from ...performance.utils import calculate_ASYPD_perjob, calculate_SYPD_perjob -from ...components.jobs import utils as JUtils -from ...monitor.monitor import Monitor -from .job_common import Status, Type -from bscearth.utils.date import date2str, parse_date, sum_str_hours -from ...experiment import common_db_requests as DbRequests -from .job_packages import JobPackageSimple, JobPackageArray, JobPackageThread -from .job_package_persistence import JobPackagePersistence +from autosubmit_api.autosubmit_legacy.job.job_dict import DicJobs +from autosubmit_api.autosubmit_legacy.job.job_utils import Dependency +from autosubmit_api.autosubmit_legacy.job.job_utils import SubJob +from autosubmit_api.autosubmit_legacy.job.job_utils import SubJobManager, job_times_to_text, datechunk_to_year +from autosubmit_api.performance.utils import calculate_ASYPD_perjob, calculate_SYPD_perjob +from autosubmit_api.components.jobs import utils as JUtils +from autosubmit_api.monitor.monitor import Monitor +from autosubmit_api.autosubmit_legacy.job.job_common import Status +from bscearth.utils.date import date2str, parse_date +from autosubmit_api.experiment import common_db_requests as DbRequests +from autosubmit_api.autosubmit_legacy.job.job_package_persistence import JobPackagePersistence # from autosubmit_legacy.job.tree import Tree -from ...database import db_structure as DbStructure -from ...database.db_jobdata import JobDataStructure, JobRow, ExperimentGraphDrawing -from ...builders.experiment_history_builder import ExperimentHistoryDirector, ExperimentHistoryBuilder -from ...history.data_classes.job_data import JobData +from autosubmit_api.database import db_structure as DbStructure +from autosubmit_api.database.db_jobdata import JobDataStructure, JobRow, ExperimentGraphDrawing +from autosubmit_api.builders.experiment_history_builder import ExperimentHistoryDirector, ExperimentHistoryBuilder +from autosubmit_api.history.data_classes.job_data import JobData from networkx import DiGraph -from .job_utils import transitive_reduction -from ...common.utils import timestamp_to_datetime_format +from autosubmit_api.autosubmit_legacy.job.job_utils import transitive_reduction +from autosubmit_api.common.utils import timestamp_to_datetime_format from typing import List, Dict, Tuple diff --git a/autosubmit_api/bgtasks/__init__.py b/autosubmit_api/bgtasks/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/autosubmit_api/bgtasks/bgtask.py b/autosubmit_api/bgtasks/bgtask.py new file mode 100644 index 0000000000000000000000000000000000000000..30698b62d6db6ec7dc36f4ec10da18a9a9ca31b7 --- /dev/null +++ b/autosubmit_api/bgtasks/bgtask.py @@ -0,0 +1,91 @@ +from abc import ABC, abstractmethod +from autosubmit_api.experiment import common_requests +from autosubmit_api.history.experiment_status_manager import ExperimentStatusManager +from autosubmit_api.config.basicConfig import APIBasicConfig +from autosubmit_api.workers.business import populate_times, process_graph_drawings +from autosubmit_api.workers.populate_details.populate import DetailsProcessor + +class BackgroundTask(ABC): + + @staticmethod + @abstractmethod + def run(): + raise NotImplementedError + + @property + @abstractmethod + def id(self) -> dict: + raise NotImplementedError + + @property + @abstractmethod + def trigger_options(self) -> dict: + raise NotImplementedError + + +class PopulateDetailsDB(BackgroundTask): + id = "TASK_POPDET" + 
trigger_options = { + "trigger": "interval", + "hours": 4 + } + + @staticmethod + def run(): + APIBasicConfig.read() + return DetailsProcessor(APIBasicConfig).process() + + +class PopulateQueueRuntimes(BackgroundTask): + id = "TASK_POPQUE" + trigger_options = { + "trigger": "interval", + "minutes": 3 + } + + @staticmethod + def run(): + """ Process and updates queuing and running times. """ + populate_times.process_completed_times() + + +class PopulateRunningExperiments(BackgroundTask): + id = "TASK_POPRUNEX" + trigger_options = { + "trigger": "interval", + "minutes": 5 + } + + @staticmethod + def run(): + """ + Updates STATUS of experiments. + """ + ExperimentStatusManager().update_running_experiments() + + +class VerifyComplete(BackgroundTask): + id = "TASK_VRFCMPT" + trigger_options = { + "trigger": "interval", + "minutes": 10 + } + + @staticmethod + def run(): + common_requests.verify_last_completed(1800) + + +class PopulateGraph(BackgroundTask): + id = "TASK_POPGRPH" + trigger_options = { + "trigger": "interval", + "hours": 24 + } + + @staticmethod + def run(): + """ + Process coordinates of nodes in a graph drawing and saves them. + """ + process_graph_drawings.process_active_graphs() \ No newline at end of file diff --git a/autosubmit_api/bgtasks/scheduler.py b/autosubmit_api/bgtasks/scheduler.py new file mode 100644 index 0000000000000000000000000000000000000000..2f69e407659a0e0540734cc38c4a50603ce99620 --- /dev/null +++ b/autosubmit_api/bgtasks/scheduler.py @@ -0,0 +1,31 @@ +from typing import List +from flask_apscheduler import APScheduler +from autosubmit_api.bgtasks.bgtask import BackgroundTask, PopulateDetailsDB, PopulateQueueRuntimes, PopulateRunningExperiments, VerifyComplete, PopulateGraph +from autosubmit_api.config import RUN_BACKGROUND_TASKS_ON_START + +from autosubmit_api.logger import logger, with_log_run_times + +REGISTERED_TASKS: List[BackgroundTask] = [ + PopulateDetailsDB, + PopulateQueueRuntimes, + PopulateRunningExperiments, + VerifyComplete, + PopulateGraph +] + +def create_bind_scheduler(app): + scheduler = APScheduler() + scheduler.init_app(app) + scheduler.start() + + for task in REGISTERED_TASKS: + scheduler.add_job(task.id, with_log_run_times(logger, task.id, catch_exc=True)(task.run), **task.trigger_options) + + logger.info("Background tasks: " + str([str(task) for task in scheduler.get_jobs()])) + + if RUN_BACKGROUND_TASKS_ON_START: + logger.info('Starting background tasks on app init before serving...') + for task in REGISTERED_TASKS: + scheduler.run_job(task.id) + + return scheduler diff --git a/autosubmit_api/common/utils.py b/autosubmit_api/common/utils.py index fe491d21b6e063887fedb5684a27c815c247fd41..7056076fa7b6727dcd3c0d2d1045c57e0b18de4f 100644 --- a/autosubmit_api/common/utils.py +++ b/autosubmit_api/common/utils.py @@ -1,11 +1,9 @@ #!/usr/bin/env python -import os -import pickle +import statistics import subprocess import time import datetime import math -import numpy as np from collections import namedtuple from bscearth.utils.date import date2str from dateutil.relativedelta import * @@ -81,8 +79,8 @@ def get_jobs_with_no_outliers(jobs): if len(data_run_times) == 0: return jobs - mean = np.mean(data_run_times) - std = np.std(data_run_times) + mean = statistics.mean(data_run_times) + std = statistics.stdev(data_run_times) # print("mean {0} std {1}".format(mean, std)) if std == 0: @@ -91,7 +89,7 @@ def get_jobs_with_no_outliers(jobs): for job in jobs: z_score = (job.run_time - mean) / std # print("{0} {1} {2}".format(job.name, np.abs(z_score), 
job.run_time)) - if np.abs(z_score) <= THRESHOLD_OUTLIER and job.run_time > 0: + if math.fabs(z_score) <= THRESHOLD_OUTLIER and job.run_time > 0: new_list.append(job) # else: # print(" OUTLIED {0} {1} {2}".format(job.name, np.abs(z_score), job.run_time)) diff --git a/autosubmit_api/components/experiment/configuration_facade.py b/autosubmit_api/components/experiment/configuration_facade.py index 4a725c2531101bcecd8ffa3129aa5eec35a023e2..8eba4677cd08f9ef853c956ad3f9625d259b29ee 100644 --- a/autosubmit_api/components/experiment/configuration_facade.py +++ b/autosubmit_api/components/experiment/configuration_facade.py @@ -1,11 +1,14 @@ #!/usr/bin/env python +import math import os -from ...config.basicConfig import APIBasicConfig -from ..jobs.job_factory import SimJob, Job -from ...config.config_common import AutosubmitConfigResolver -from bscearth.utils.config_parser import ConfigParserFactory +from autosubmit_api.logger import logger +from autosubmit_api.components.jobs.utils import convert_int_default +from autosubmit_api.config.ymlConfigStrategy import ymlConfigStrategy +from autosubmit_api.config.basicConfig import APIBasicConfig +from autosubmit_api.components.jobs.job_factory import SimJob +from autosubmit_api.config.config_common import AutosubmitConfigResolver from abc import ABCMeta, abstractmethod -from ...common.utils import JobSection, parse_number_processors, timestamp_to_datetime_format, datechunk_to_year +from autosubmit_api.common.utils import JobSection, parse_number_processors, timestamp_to_datetime_format, datechunk_to_year from typing import List class ProjectType: @@ -123,6 +126,19 @@ class AutosubmitConfigurationFacade(ConfigurationFacade): self.chunk_size = self.autosubmit_conf.get_chunk_size() self.current_years_per_sim = datechunk_to_year(self.chunk_unit, self.chunk_size) self.sim_processors = self._get_processors_number(self.autosubmit_conf.get_processors(JobSection.SIM)) + + # Process for yml + if isinstance(self.autosubmit_conf._configWrapper, ymlConfigStrategy): + self.sim_tasks = convert_int_default(self.autosubmit_conf._configWrapper.get_tasks(JobSection.SIM)) + self.sim_nodes = convert_int_default(self.autosubmit_conf._configWrapper.get_nodes(JobSection.SIM)) + self.sim_processors_per_node = convert_int_default(self.autosubmit_conf._configWrapper.get_processors_per_node(JobSection.SIM)) + else: + self.sim_tasks = None + self.sim_nodes = None + self.sim_processors_per_node = None + + self.sim_processing_elements = self._calculate_processing_elements() + self.experiment_stat_data = os.stat(self.experiment_path) def get_pkl_last_modified_timestamp(self): @@ -242,7 +258,7 @@ class AutosubmitConfigurationFacade(ConfigurationFacade): # type: (List[SimJob]) -> None """ Update the jobs with the latest configuration values: Processors, years per sim """ for job in sim_jobs: - job.set_ncpus(self.sim_processors) + job.set_ncpus(self.sim_processing_elements) job.set_years_per_sim(self.current_years_per_sim) def _get_processors_number(self, conf_job_processors): @@ -264,3 +280,21 @@ class AutosubmitConfigurationFacade(ConfigurationFacade): def _add_warning(self, message): # type: (str) -> None self.warnings.append(message) + + def _estimate_requested_nodes(self) -> int: + if self.sim_nodes: + return self.sim_nodes + elif self.sim_tasks: + return math.ceil(self.sim_processors / self.sim_tasks) + elif self.sim_processors_per_node and self.sim_processors > self.sim_processors_per_node: + return math.ceil(self.sim_processors / self.sim_processors_per_node) + else: + return 1 + + 
def _calculate_processing_elements(self) -> int:
+        if self.sim_processors_per_node:
+            estimated_nodes = self._estimate_requested_nodes()
+            return estimated_nodes * self.sim_processors_per_node
+        elif self.sim_tasks or self.sim_nodes:
+            logger.warning('Missing PROCESSORS_PER_NODE. Should be set if TASKS or NODES are defined. The SIM PROCESSORS will be used instead.')
+        return self.sim_processors
\ No newline at end of file
diff --git a/autosubmit_api/components/jobs/job_factory.py b/autosubmit_api/components/jobs/job_factory.py
index db242e184b36402f25abfa3550d44e7209e1d28c..d4c4d5939a410c1856f7b7ed507d16d86ef60382 100644
--- a/autosubmit_api/components/jobs/job_factory.py
+++ b/autosubmit_api/components/jobs/job_factory.py
@@ -1,11 +1,10 @@
 #!/usr/bin/env python
 import collections
-import time
-from ...common import utils as util
-from . import utils as JUtils
-from ...common.utils import Status
-from ...monitor.monitor import Monitor
-from ...history.data_classes.job_data import JobData
+from autosubmit_api.common import utils as util
+from autosubmit_api.components.jobs import utils as JUtils
+from autosubmit_api.common.utils import Status
+from autosubmit_api.monitor.monitor import Monitor
+from autosubmit_api.history.data_classes.job_data import JobData
 from typing import Tuple, List, Dict, Set
 # from autosubmitAPIwu.database.db_jobdata import JobData
 from abc import ABCMeta, abstractmethod
diff --git a/autosubmit_api/components/jobs/joblist_helper.py b/autosubmit_api/components/jobs/joblist_helper.py
index 7c6820f63287e7ff22a25ed8cf6ddf0d7a384e11..997b272e7c669c7728024c1aecd49998ea02eda9 100644
--- a/autosubmit_api/components/jobs/joblist_helper.py
+++ b/autosubmit_api/components/jobs/joblist_helper.py
@@ -1,12 +1,12 @@
 #!/usr/bin/env python
-from ...autosubmit_legacy.job.job_list import JobList
-from ...database.db_jobdata import JobDataStructure, JobRow
-from ..experiment.configuration_facade import AutosubmitConfigurationFacade
-from ..experiment.pkl_organizer import PklOrganizer
-from ...config.basicConfig import APIBasicConfig
-from ...autosubmit_legacy.job.job_utils import datechunk_to_year
+from autosubmit_api.autosubmit_legacy.job.job_list import JobList
+from autosubmit_api.common.utils import datechunk_to_year
+from autosubmit_api.database.db_jobdata import JobDataStructure, JobRow
+from autosubmit_api.components.experiment.configuration_facade import AutosubmitConfigurationFacade
+from autosubmit_api.components.experiment.pkl_organizer import PklOrganizer
+from autosubmit_api.config.basicConfig import APIBasicConfig
 from typing import List, Dict
-from .job_factory import Job
+from autosubmit_api.components.jobs.job_factory import Job
 
 class JobListHelper(object):
     """ Loads time (queuing runnning) and packages. Applies the fix for queue time of jobs in wrappers.
""" diff --git a/autosubmit_api/components/jobs/joblist_loader.py b/autosubmit_api/components/jobs/joblist_loader.py index 6d66c309307cfdfd40ec41f7befd44565c5a27a5..1edc563f9fb63e9ceca08fcd02d09ac33af4e1a5 100644 --- a/autosubmit_api/components/jobs/joblist_loader.py +++ b/autosubmit_api/components/jobs/joblist_loader.py @@ -2,18 +2,13 @@ import os from fnmatch import fnmatch -from .joblist_helper import JobListHelper -from ..experiment.pkl_organizer import PklOrganizer -from ..experiment.configuration_facade import AutosubmitConfigurationFacade -from .job_factory import StandardJob, Job -from ...database.db_structure import get_structure -from ...autosubmit_legacy.job.job_common import Status -from bscearth.utils.date import date2str, parse_date -from typing import Dict, List, Set, Tuple +from autosubmit_api.components.jobs.joblist_helper import JobListHelper +from autosubmit_api.components.jobs.job_factory import StandardJob, Job +from autosubmit_api.database.db_structure import get_structure +from autosubmit_api.common.utils import Status +from bscearth.utils.date import date2str +from typing import Dict, List, Set # Builder Imports -from ...config.config_common import AutosubmitConfigResolver -from ...config.basicConfig import APIBasicConfig -import json import logging diff --git a/autosubmit_api/components/jobs/utils.py b/autosubmit_api/components/jobs/utils.py index b73eaf7b6ab5b8f09662e87f1e2b1d926bff7c72..95ec42d9ba94cb2956163afdeb7508af90cf1270 100644 --- a/autosubmit_api/components/jobs/utils.py +++ b/autosubmit_api/components/jobs/utils.py @@ -1,9 +1,10 @@ #!/usr/bin/env python -from datetime import date -from pickle import NONE -from ...autosubmit_legacy.job.job_common import Status -from typing import List, Dict +import datetime +import os +from autosubmit_api.common.utils import Status +from typing import List, Dict, Tuple +from bscearth.utils.date import parse_date wrapped_title_format = " Wrapped {0} " source_tag = " SOURCE" @@ -132,4 +133,111 @@ def get_folder_package_title(package_name, jobs_count, counters): get_folder_queuing_tag(counters[Status.QUEUING]), get_folder_held_tag(counters[Status.HELD]), get_folder_checkmark(counters[Status.COMPLETED], jobs_count) - ) \ No newline at end of file + ) + +def convert_int_default(value, default_value=None): + try: + return int(value) + except: + return default_value + +def get_job_total_stats(status_code: int, name: str, tmp_path: str) -> Tuple[datetime.datetime, datetime.datetime, datetime.datetime, str]: + """ + Receives job data and returns the data from its TOTAL_STATS file in an ordered way. 
+    Function migrated from the legacy JobList class method _job_running_check()
+    :param status_code: Status of job
+    :param name: Name of job
+    :param tmp_path: Path to the tmp folder of the experiment
+    :return: submit time, start time, end time, status
+    :rtype: 4-tuple in datetime format
+    """
+    values = list()
+    status_from_job = str(Status.VALUE_TO_KEY[status_code])
+    now = datetime.datetime.now()
+    submit_time = now
+    start_time = now
+    finish_time = now
+    current_status = status_from_job
+    path = os.path.join(tmp_path, name + '_TOTAL_STATS')
+    if os.path.exists(path):
+        request = 'tail -1 ' + path
+        last_line = os.popen(request).readline()
+        # print(last_line)
+
+        values = last_line.split()
+        # print(last_line)
+        try:
+            if status_code in [Status.RUNNING]:
+                submit_time = parse_date(
+                    values[0]) if len(values) > 0 else now
+                start_time = parse_date(values[1]) if len(
+                    values) > 1 else submit_time
+                finish_time = now
+            elif status_code in [Status.QUEUING, Status.SUBMITTED, Status.HELD]:
+                submit_time = parse_date(
+                    values[0]) if len(values) > 0 else now
+                start_time = parse_date(
+                    values[1]) if len(values) > 1 and values[0] != values[1] else now
+            elif status_code in [Status.COMPLETED]:
+                submit_time = parse_date(
+                    values[0]) if len(values) > 0 else now
+                start_time = parse_date(
+                    values[1]) if len(values) > 1 else submit_time
+                if len(values) > 3:
+                    finish_time = parse_date(values[len(values) - 2])
+                else:
+                    finish_time = submit_time
+            else:
+                submit_time = parse_date(
+                    values[0]) if len(values) > 0 else now
+                start_time = parse_date(values[1]) if len(
+                    values) > 1 else submit_time
+                finish_time = parse_date(values[2]) if len(
+                    values) > 2 else start_time
+        except Exception as exp:
+            start_time = now
+            finish_time = now
+            # NA if reading fails
+            current_status = "NA"
+
+        current_status = values[3] if (len(values) > 3 and len(
+            values[3]) != 14) else status_from_job
+    # TOTAL_STATS last line has more than 3 items, status is different from pkl, and status is not "NA"
+    if len(values) > 3 and current_status != status_from_job and current_status != "NA":
+        current_status = "SUSPICIOUS"
+    return (submit_time, start_time, finish_time, current_status)
+
+
+def job_times_to_text(minutes_queue: int, minutes_running: int, status: str):
+    """
+    Return text corresponding to queue and running time.
+    Function migrated from the legacy job.utils
+    :param minutes_queue: seconds queuing (actually using seconds)
+    :type minutes_queue: int
+    :param minutes_running: seconds running (actually using seconds)
+    :type minutes_running: int
+    :param status: current status
+    :type status: string
+    :return: string
+    """
+    if status in ["COMPLETED", "FAILED", "RUNNING"]:
+        running_text = "( " + str(datetime.timedelta(seconds=minutes_queue)) + \
+            " ) + " + \
+            str(datetime.timedelta(seconds=minutes_running))
+    elif status in ["SUBMITTED", "QUEUING", "HELD", "HOLD"]:
+        running_text = "( " + \
+            str(datetime.timedelta(seconds=minutes_queue)) + " )"
+    elif status in ["NA"]:
+        running_text = " NA"
+    else:
+        running_text = ""
+
+    if status == "SUSPICIOUS":
+        running_text = running_text + \
+            " SUSPICIOUS"
+    return running_text
+
+
+def generate_job_html_title(job_name, status_color, status_text):
+    # type: (str, str, str) -> str
+    return job_name + " <span class='badge' style='background-color:" + status_color + "'>#" + status_text + "</span>"
\ No newline at end of file
diff --git a/autosubmit_api/config/IConfigStrategy.py b/autosubmit_api/config/IConfigStrategy.py
index c1524fe55210a4881fa178c6324545051970a2e3..cf552bdfc6c7a1a0a66fa9c1a2e1450f18e08c93 100644
--- a/autosubmit_api/config/IConfigStrategy.py
+++ b/autosubmit_api/config/IConfigStrategy.py
@@ -123,7 +123,12 @@ class IConfigStrategy(ABC):
         pass
 
     @abstractmethod
-    def get_job_platform(self, section):
+    def get_job_platform(self, section: str) -> str:
+        """
+        Gets the platform for the given job type
+        :param section: job type
+        :return: platform name
+        """
         pass
 
     @abstractmethod
@@ -162,13 +167,11 @@ class IConfigStrategy(ABC):
         """
         pass
 
-    def get_processors(self, section):
+    def get_processors(self, section: str) -> str:
         """
         Gets processors needed for the given job type
         :param section: job type
-        :type section: str
         :return: wallclock time
-        :rtype: str
         """
         pass
 
@@ -794,12 +797,11 @@ class IConfigStrategy(ABC):
             return True
         else:
             return False
-    @classmethod
+
     def is_valid_communications_library(self):
         library = self.get_communications_library()
         return library in ['paramiko', 'saga']
 
-    @classmethod
     def is_valid_storage_type(self):
         storage_type = self.get_storage_type()
         return storage_type in ['pkl', 'db']
diff --git a/autosubmit_api/config/__init__.py b/autosubmit_api/config/__init__.py
index 95542ee6f93aa32f3ad3dc5eb3922f814c6ea750..c0ad1881567f2623c86843a13b2e53fce9221e70 100644
--- a/autosubmit_api/config/__init__.py
+++ b/autosubmit_api/config/__init__.py
@@ -3,13 +3,20 @@ from dotenv import load_dotenv
 
 load_dotenv()
 
-JWT_SECRET = os.environ.get("SECRET_KEY", "M87;Z$,o5?MSC(/@#-LbzgE3PH-5ki.ZvS}N.s09v>I#v8I'00THrA-:ykh3HX?") # WARNING: Always provide a SECRET_KEY for production
+# Auth
+PROTECTION_LEVEL = os.environ.get("PROTECTION_LEVEL")
+# WARNING: Always provide a SECRET_KEY for production
+JWT_SECRET = os.environ.get(
+    "SECRET_KEY", "M87;Z$,o5?MSC(/@#-LbzgE3PH-5ki.ZvS}N.s09v>I#v8I'00THrA-:ykh3HX?")
 JWT_ALGORITHM = "HS256"
 JWT_EXP_DELTA_SECONDS = 84000*5 # 5 days
 
-RUN_BACKGROUND_TASKS_ON_START = True if os.environ.get("RUN_BACKGROUND_TASKS_ON_START") in ["True", "T", "true"] else False # Default dalse
-
 # CAS Stuff
-CAS_LOGIN_URL = os.environ.get("CAS_LOGIN_URL") # e.g: 'https://cas.bsc.es/cas/login'
-CAS_VERIFY_URL = os.environ.get("CAS_VERIFY_URL") # e.g: 'https://cas.bsc.es/cas/serviceValidate'
+# e.g: 'https://cas.bsc.es/cas/login'
+CAS_LOGIN_URL = os.environ.get("CAS_LOGIN_URL")
+# e.g: 'https://cas.bsc.es/cas/serviceValidate'
+CAS_VERIFY_URL = os.environ.get("CAS_VERIFY_URL")
+# Startup options
+RUN_BACKGROUND_TASKS_ON_START = os.environ.get("RUN_BACKGROUND_TASKS_ON_START") in [
+    "True", "T", "true"]  # Default false
diff --git a/autosubmit_api/config/basicConfig.py b/autosubmit_api/config/basicConfig.py
index 616eb826b09a08eabd0f62f995d00cc7ff0e2655..fdb9e57013a7bc2281effaed95112a221afd9a07 100644
--- a/autosubmit_api/config/basicConfig.py
+++ b/autosubmit_api/config/basicConfig.py
@@ -27,14 +27,14 @@ class APIBasicConfig(BasicConfig):
     Extended class to manage configuration for Autosubmit path, database and default values for new experiments in the Autosubmit API
     """
 
-    GRAPHDATA_DIR = os.path.join('/esarchive', 'autosubmit', 'as_metadata', 'graph')
-    FILE_STATUS_DIR = os.path.join('/esarchive', 'autosubmit', 'as_metadata', 'test')
+    GRAPHDATA_DIR = os.path.join(os.path.expanduser('~'), 'autosubmit', 'as_metadata', 'graph')
+    FILE_STATUS_DIR = os.path.join(os.path.expanduser('~'), 'autosubmit', 'as_metadata', 'test')
     FILE_STATUS_DB = 'status.db'
-    ALLOWED_CLIENTS = set(['https://earth.bsc.es/'])
+    ALLOWED_CLIENTS = set([])
 
     @staticmethod
     def __read_file_config(file_path):
-        super().__read_file_config(file_path)
+        # WARNING: It is unsafe to call this method directly. Calling APIBasicConfig.__read_file_config doesn't run BasicConfig.__read_file_config
         if not os.path.isfile(file_path):
             return
 
@@ -50,4 +50,28 @@ class APIBasicConfig(BasicConfig):
         if parser.has_option('statusdb', 'filename'):
             APIBasicConfig.FILE_STATUS_DB = parser.get('statusdb', 'filename')
         if parser.has_option('clients', 'authorized'):
-            APIBasicConfig.ALLOWED_CLIENTS = set(parser.get('clients', 'authorized').split())
\ No newline at end of file
+            APIBasicConfig.ALLOWED_CLIENTS = set(parser.get('clients', 'authorized').split())
+
+
+    @staticmethod
+    def read():
+        BasicConfig.read() # This is done to run BasicConfig.__read_file_config indirectly
+
+        filename = 'autosubmitrc'
+        if 'AUTOSUBMIT_CONFIGURATION' in os.environ and os.path.exists(os.environ['AUTOSUBMIT_CONFIGURATION']):
+            config_file_path = os.environ['AUTOSUBMIT_CONFIGURATION']
+            # Call read_file_config with the value of the environment variable
+            APIBasicConfig.__read_file_config(config_file_path)
+        else:
+            if os.path.exists(os.path.join('', '.' + filename)):
+                APIBasicConfig.__read_file_config(os.path.join('', '.' + filename))
+            elif os.path.exists(os.path.join(os.path.expanduser('~'), '.' + filename)):
+                APIBasicConfig.__read_file_config(os.path.join(
+                    os.path.expanduser('~'), '.' + filename))
+            else:
+                APIBasicConfig.__read_file_config(os.path.join('/etc', filename))
+
+        # Check if the environment variable is defined
+
+        APIBasicConfig._update_config()
+        return
\ No newline at end of file
diff --git a/autosubmit_api/config/config_common.py b/autosubmit_api/config/config_common.py
index 0bf282b3b0802eb7a9efd08d846b4bf2f4771514..1f2aad5e616697cb30ac3714140252b4f7bbdf27 100644
--- a/autosubmit_api/config/config_common.py
+++ b/autosubmit_api/config/config_common.py
@@ -48,10 +48,10 @@ class AutosubmitConfigResolver(object):
         # check which type of config files (AS3 or AS4)
         expdef_conf_file = os.path.join(self.basic_config.LOCAL_ROOT_DIR, expid, "conf", "expdef_" + expid + ".conf")
         if os.path.exists(expdef_conf_file):
-            logger.info("Setting AS3 Config strategy - conf")
+            # logger.info("Setting AS3 Config strategy - conf")
             self._configWrapper = confConfigStrategy(expid, basic_config, parser_factory, ".conf")
         else:
-            logger.info("Setting AS4 Config strategy - yml")
+            # logger.info("Setting AS4 Config strategy - yml")
             self._configWrapper = ymlConfigStrategy(expid, basic_config, parser_factory, ".yml")
diff --git a/autosubmit_api/config/ymlConfigStrategy.py b/autosubmit_api/config/ymlConfigStrategy.py
index a0eb76e2879e3182cd41de609d83e8882cebf71e..bb68afdcae473da463b18feb510e84893888fba7 100644
--- a/autosubmit_api/config/ymlConfigStrategy.py
+++ b/autosubmit_api/config/ymlConfigStrategy.py
@@ -17,28 +17,12 @@
 # You should have received a copy of the GNU General Public License
 # along with Autosubmit. If not, see <http://www.gnu.org/licenses/>.
 
-try:
-    # noinspection PyCompatibility
-    from configparser import SafeConfigParser
-    from autosubmitconfigparser.config.configcommon import AutosubmitConfig as Autosubmit4Config
-except ImportError:
-    # noinspection PyCompatibility
-    from configparser import SafeConfigParser
-
-import os
-import re
-import subprocess
-import json
-import logging
-
-from pyparsing import nestedExpr
-from bscearth.utils.config_parser import ConfigParserFactory, ConfigParser
-from bscearth.utils.date import parse_date
-from bscearth.utils.log import Log
-from ..config.basicConfig import APIBasicConfig
-from ..config.IConfigStrategy import IConfigStrategy
-
-logger = logging.getLogger('gunicorn.error')
+from typing import Any
+from autosubmitconfigparser.config.configcommon import AutosubmitConfig as Autosubmit4Config
+from autosubmit_api.logger import logger
+from autosubmit_api.config.basicConfig import APIBasicConfig
+from autosubmit_api.config.IConfigStrategy import IConfigStrategy
+
 
 class ymlConfigStrategy(IConfigStrategy):
     """
@@ -48,12 +32,12 @@ class ymlConfigStrategy(IConfigStrategy):
     :type expid: str
     """
     def __init__(self, expid, basic_config = APIBasicConfig, parser_factory = None, extension=".yml"):
-        logger.info("Creating AS4 Parser !!!!!")
+        # logger.info("Creating AS4 Parser !!!!!")
         self._conf_parser = Autosubmit4Config(expid, basic_config)
         self._conf_parser.reload(True)
 
     def jobs_parser(self):
-        logger.info("Not yet implemented")
+        logger.error("Not yet implemented")
         pass
 
     #TODO: at the end of the implementation, check which methods can be moved to the top class for avoid code duplication
@@ -65,7 +49,7 @@ class ymlConfigStrategy(IConfigStrategy):
         return self._exp_parser_file
 
     def platforms_parser(self):
-        logger.info("OBSOLOTED - Not yet implemented")
+        logger.error("OBSOLETE - Not yet implemented")
         pass
 
     @property
@@ -92,35 +76,32 @@ class ymlConfigStrategy(IConfigStrategy):
         """
         return self._jobs_parser_file
 
-    def get_full_config_as_dict(self):
+    def get_full_config_as_dict(self) -> dict:
         """
         Returns full configuration as json object
         """
-        _conf = _exp = _platforms = _jobs = _proj = None
-        result = {}
-
-        def get_data( parser):
-            """
-            dictionary comprehension to get data from parser
-            """
-            logger.info(parser)
-            #res = {sec: {option: parser[sec][option] for option in parser[sec].keys()} for sec in [
-            #    section for section in parser.keys()]}
-            #return res
-            return parser
-
-        # res = {sec: {option: parser.get(sec, option) for option in parser.options(sec)} for sec in [
-        #     section for section in parser.sections()]}
-
-
-        # print(self._conf_parser)
-        #result["conf"] = get_data( self._conf_parser.experiment_data["CONF"]) if self._conf_parser else None
-        #result["exp"] = get_data( self._conf_parser.experiment_data["CONF"]) if self._exp_parser else None
-        result["platforms"] = self._conf_parser.platforms_data if self._conf_parser.platforms_data else None
-        #result["jobs"] = get_data( self._conf_parser.experiment_data["JOBS"]) if self._conf_parser.experiment_data["JOBS"] else None
-        #result["proj"] = get_data( self._conf_parser.experiment_data["CONF"] ) if self._proj_parser else None
-        return result
+        return self._conf_parser.experiment_data
+
+    def _get_platform_config(self, platform: str) -> dict:
+        return self._conf_parser.experiment_data.get("PLATFORMS", {}).get(platform, {})
 
+    def _get_from_job_or_platform(self, section: str, config_key: str, default_value = None) -> Any:
+        """
+        Helper function to get a value from a JOB section or its related PLATFORM
+        """
+        # Check in job
+        logger.debug("Checking " + config_key + " in JOBS")
+        value = self._conf_parser.jobs_data.get(section, {}).get(config_key)
+        if value:
+            return value
+
+        # Check in job platform
+        logger.debug("Checking " + config_key + " in PLATFORMS " + self.get_job_platform(section))
+        value = self._get_platform_config(self.get_job_platform(section)).get(config_key)
+        if value:
+            return value
+        else:
+            return default_value
 
     def get_full_config_as_json(self):
         return self._conf_parser.get_full_config_as_json()
@@ -128,52 +109,65 @@ class ymlConfigStrategy(IConfigStrategy):
     def get_project_dir(self):
         return self._conf_parser.get_project_dir()
 
-    def get_queue(self, section):
+    def get_queue(self, section: str) -> str:
         return self._conf_parser.jobs_data[section].get('QUEUE', "")
 
-    def get_job_platform(self, section):
-        pass
+    def get_job_platform(self, section: str) -> str:
+        # return the JOBS.<section>.PLATFORM or DEFAULT.HPCARCH
+        return self._conf_parser.jobs_data.get(section, {}).get("PLATFORM", self.get_platform())
 
-    def get_platform_queue(self, platform):
-        logger.info("get_platform_queue")
-        return self._conf_parser.platforms_data[platform]["QUEUE"]
+    def get_platform_queue(self, platform: str) -> str:
+        logger.debug("get_platform_queue")
+        return self._get_platform_config(platform).get("QUEUE")
 
-    def get_platform_serial_queue(self, platform):
-        logger.info("get_platform_serial_queue")
-        return self._conf_parser.platforms_data[platform]["SERIAL_QUEUE"]
+    def get_platform_serial_queue(self, platform: str) -> str:
+        logger.debug("get_platform_serial_queue")
+        return self._get_platform_config(platform).get("SERIAL_QUEUE")
 
-    def get_platform_project(self, platform):
-        logger.info("get_platform_project")
-        return self._conf_parser.platforms_data[platform]["PROJECT"]
+    def get_platform_project(self, platform: str) -> str:
+        logger.debug("get_platform_project")
+        return self._get_platform_config(platform).get("PROJECT")
 
-    def get_platform_wallclock(self, platform):
-        logger.info("get_platform_wallclock")
-        return self._conf_parser.platforms_data[platform].get('MAX_WALLCLOCK', "")
+    def get_platform_wallclock(self, platform: str) -> str:
+        logger.debug("get_platform_wallclock")
+        return self._get_platform_config(platform).get('MAX_WALLCLOCK', "")
 
-    def get_wallclock(self, section):
+    def get_wallclock(self, section: str) -> str:
         return self._conf_parser.jobs_data[section].get('WALLCLOCK', '')
 
+    def get_synchronize(self, section: str) -> str:
+        # return self._conf_parser.get_synchronize(section)
+        return self._get_from_job_or_platform(section, "SYNCHRONIZE", "")
 
-    def get_synchronize(self, section):
-        return self._conf_parser.get_synchronize(section)
+    def get_processors(self, section: str) -> str:
+        # return self._conf_parser.get_processors()
+        return self._get_from_job_or_platform(section, "PROCESSORS", "1")
 
-    def get_processors(self, section):
-        return self._conf_parser.jobs_data.get(section, {}).get("PROCESSORS", "1")
+    def get_threads(self, section: str) -> str:
+        # return self._conf_parser.get_threads(section)
+        return self._get_from_job_or_platform(section, "THREADS", "1")
 
-    def get_threads(self, section):
-        return self._conf_parser.get_threads(section)
+    def get_tasks(self, section: str) -> str:
+        # return self._conf_parser.get_tasks(section)
+        return self._get_from_job_or_platform(section, "TASKS", "")
+
+    def get_nodes(self, section: str) -> str:
+        return self._get_from_job_or_platform(section, "NODES", "")
 
-    def get_tasks(self, section):
-        return self._conf_parser.get_tasks(section)
+    def get_processors_per_node(self, section: str) -> str:
+        return self._get_from_job_or_platform(section, "PROCESSORS_PER_NODE", "")
 
-    def get_scratch_free_space(self, section):
-        return self._conf_parser.get_scratch_free_space(section)
+    def get_scratch_free_space(self, section: str) -> str:
+        # return self._conf_parser.get_scratch_free_space(section)
+        return self._get_from_job_or_platform(section, "SCRATCH_FREE_SPACE", "")
 
-    def get_memory(self, section):
-        return self._conf_parser.get_memory(section)
+    def get_memory(self, section: str) -> str:
+        # return self._conf_parser.get_memory(section)
+        return self._get_from_job_or_platform(section, "MEMORY", "")
 
-    def get_memory_per_task(self, section):
-        return self._conf_parser.get_memory_per_task(section)
+    def get_memory_per_task(self, section: str) -> str:
+        # return self._conf_parser.get_memory_per_task(section)
+        return self._get_from_job_or_platform(section, "MEMORY_PER_TASK", "")
self._get_from_job_or_platform(section, "MEMORY_PER_TASK", "") def get_migrate_user_to(self, section): """ @@ -324,8 +318,12 @@ class ymlConfigStrategy(IConfigStrategy): def get_chunk_list(self): return self._conf_parser.get_chunk_list() - def get_platform(self): - return self._conf_parser.get_platform() + def get_platform(self) -> str: + try: + # return DEFAULT.HPCARCH + return self._conf_parser.get_platform() + except Exception: + return "" def set_platform(self, hpc): self._conf_parser.set_platform(hpc) @@ -400,14 +398,12 @@ class ymlConfigStrategy(IConfigStrategy): return self._conf_parser.get_storage_type() @staticmethod - def is_valid_mail_address(mail_address): - return self._conf_parser.is_valid_mail_address(mail_address) + def is_valid_mail_address(mail_address: str) -> bool: + return Autosubmit4Config.is_valid_mail_address(mail_address) - @classmethod def is_valid_communications_library(self): return self._conf_parser.is_valid_communications_library() - @classmethod def is_valid_storage_type(self): return self._conf_parser.is_valid_storage_type() diff --git a/autosubmit_api/database/autosubmit.py b/autosubmit_api/database/autosubmit.py deleted file mode 100644 index 86a94261584e811b8d959b3df41793a2d83d1f4b..0000000000000000000000000000000000000000 --- a/autosubmit_api/database/autosubmit.py +++ /dev/null @@ -1,3501 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2017 Earth Sciences Department, BSC-CNS - -# This file is part of Autosubmit. - -# Autosubmit is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# Autosubmit is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. - -# You should have received a copy of the GNU General Public License -# along with Autosubmit. If not, see <http://www.gnu.org/licenses/>.
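For reviewers: the net effect of the `ymlConfigStrategy` changes above is a two-step keyword lookup, where a job keyword is resolved from its `JOBS` section first and, failing that, from the job's platform in `PLATFORMS`, with `DEFAULT.HPCARCH` as the fallback platform. Below is a minimal, self-contained sketch of that fallback logic; the `experiment_data` dict is hypothetical sample data standing in for what `AutosubmitConfig.experiment_data` provides.

```python
from typing import Any

# Hypothetical experiment data; in the API the real dict comes from
# AutosubmitConfig.experiment_data (autosubmitconfigparser).
experiment_data = {
    "DEFAULT": {"HPCARCH": "marenostrum4"},
    "PLATFORMS": {"marenostrum4": {"QUEUE": "debug", "PROCESSORS": "48"}},
    "JOBS": {"SIM": {"PLATFORM": "marenostrum4", "WALLCLOCK": "02:00"}},
}

def get_job_platform(section: str) -> str:
    # JOBS.<section>.PLATFORM, falling back to DEFAULT.HPCARCH
    default = experiment_data.get("DEFAULT", {}).get("HPCARCH", "")
    return experiment_data.get("JOBS", {}).get(section, {}).get("PLATFORM", default)

def get_from_job_or_platform(section: str, config_key: str, default_value: Any = None) -> Any:
    # 1) check the JOBS section itself
    value = experiment_data.get("JOBS", {}).get(section, {}).get(config_key)
    if value:
        return value
    # 2) fall back to the job's platform section
    platform = get_job_platform(section)
    value = experiment_data.get("PLATFORMS", {}).get(platform, {}).get(config_key)
    return value if value else default_value

print(get_from_job_or_platform("SIM", "WALLCLOCK"))        # "02:00", from the job
print(get_from_job_or_platform("SIM", "PROCESSORS", "1"))  # "48", from the platform
```

The same pattern backs `get_processors`, `get_memory`, `get_tasks`, and the other per-job getters in the patch. Note also the `is_valid_mail_address` fix: the old body referenced `self` from a `@staticmethod`, which would raise a `NameError` at call time, so delegating to `Autosubmit4Config.is_valid_mail_address` is the correct form.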
-# pipeline_test - - -import traceback -from pyparsing import nestedExpr -from collections import defaultdict -from distutils.util import strtobool -from pkg_resources import require, resource_listdir, resource_exists, resource_string -import portalocker -import datetime -import signal -import random -import re -import shutil -import sys -import pwd -import os -import copy -import time -import tarfile -import json -import subprocess -import argparse - -sys.path.insert(0, os.path.abspath('.')) -from autosubmit_api.config.basicConfig import APIBasicConfig -from autosubmit_api.config.config_common import AutosubmitConfigResolver -from bscearth.utils.config_parser import ConfigParserFactory -from autosubmit_api.autosubmit_legacy.job.job_common import Status -from autosubmit_api.git.autosubmit_git import AutosubmitGit -from autosubmit_api.autosubmit_legacy.job.job_list import JobList -from autosubmit_api.autosubmit_legacy.job.job_packages import JobPackageThread -from autosubmit_api.autosubmit_legacy.job.job_package_persistence import JobPackagePersistence -from autosubmit_api.autosubmit_legacy.job.job_list_persistence import JobListPersistenceDb -from autosubmit_api.autosubmit_legacy.job.job_list_persistence import JobListPersistencePkl -from autosubmit_api.autosubmit_legacy.job.job_grouping import JobGrouping -from bscearth.utils.log import Log -from autosubmit_api.database.db_common import create_db -from autosubmit_api.experiment.experiment_common import new_experiment -from autosubmit_api.experiment.experiment_common import copy_experiment -from autosubmit_api.database.db_common import delete_experiment -from autosubmit_api.database.db_common import get_autosubmit_version -from autosubmit_api.monitor.monitor import Monitor -from bscearth.utils.date import date2str -from autosubmit_api.notifications.mail_notifier import MailNotifier -from autosubmit_api.notifications.notifier import Notifier -from autosubmit_api.autosubmit_legacy.platforms.saga_submitter import SagaSubmitter -from autosubmit_api.autosubmit_legacy.platforms.paramiko_submitter import ParamikoSubmitter -from autosubmit_api.autosubmit_legacy.job.job_exceptions import WrongTemplateException -from autosubmit_api.autosubmit_legacy.job.job_packager import JobPackager -from autosubmit_api.autosubmit_legacy.platforms.paramiko_platform import ParamikoTimeout -""" -Main module for autosubmit. Only contains an interface class to all functionality implemented on autosubmit -""" - -try: - # noinspection PyCompatibility - from configparser import SafeConfigParser -except ImportError: - # noinspection PyCompatibility - from configparser import SafeConfigParser - -# It is Python dialog available? (optional dependency) -try: - import dialog -except Exception: - dialog = None - - -# noinspection PyPackageRequirements -# noinspection PyPackageRequirements -# noinspection PyPackageRequirements - -# noinspection PyUnusedLocal - - -def signal_handler(signal_received, frame): - """ - Used to handle interrupt signals, allowing autosubmit to clean before exit - - :param signal_received: - :param frame: - """ - Log.info('Autosubmit will interrupt at the next safe occasion') - Autosubmit.exit = True - - -class Autosubmit: - """ - Interface class for autosubmit. - """ - # sys.setrecursionlimit(500000) - # # Get the version number from the relevant file. 
If not, from autosubmit package - # scriptdir = os.path.abspath(os.path.dirname(__file__)) - - # if not os.path.exists(os.path.join(scriptdir, 'VERSION')): - # scriptdir = os.path.join(scriptdir, os.path.pardir) - - # version_path = os.path.join(scriptdir, 'VERSION') - # readme_path = os.path.join(scriptdir, 'README') - # changes_path = os.path.join(scriptdir, 'CHANGELOG') - # if os.path.isfile(version_path): - # with open(version_path) as f: - # autosubmit_version = f.read().strip() - # else: - # autosubmit_version = require("autosubmitAPIwu")[0].version - - exit = False - - @staticmethod - def parse_args(): - """ - Parse arguments given to an executable and start execution of command given - """ - try: - APIBasicConfig.read() - - parser = argparse.ArgumentParser( - description='Main executable for autosubmit. ') - parser.add_argument('-v', '--version', action='version', version=Autosubmit.autosubmit_version, - help="returns autosubmit's version number and exit") - parser.add_argument('-lf', '--logfile', choices=('EVERYTHING', 'DEBUG', 'INFO', 'RESULT', 'USER_WARNING', - 'WARNING', 'ERROR', 'CRITICAL', 'NO_LOG'), - default='DEBUG', type=str, - help="sets file's log level.") - parser.add_argument('-lc', '--logconsole', choices=('EVERYTHING', 'DEBUG', 'INFO', 'RESULT', 'USER_WARNING', - 'WARNING', 'ERROR', 'CRITICAL', 'NO_LOG'), - default='INFO', type=str, - help="sets console's log level") - - subparsers = parser.add_subparsers(dest='command') - - # Run - subparser = subparsers.add_parser( - 'run', description="runs specified experiment") - subparser.add_argument('expid', help='experiment identifier') - subparser.add_argument('-nt', '--notransitive', action='store_true', - default=False, help='Disable transitive reduction') - - # Expid - subparser = subparsers.add_parser( - 'expid', description="Creates a new experiment") - group = subparser.add_mutually_exclusive_group() - group.add_argument( - '-y', '--copy', help='makes a copy of the specified experiment') - group.add_argument('-dm', '--dummy', action='store_true', - help='creates a new experiment with default values, usually for testing') - group.add_argument('-op', '--operational', action='store_true', - help='creates a new experiment with operational experiment id') - subparser.add_argument('-H', '--HPC', required=True, - help='specifies the HPC to use for the experiment') - subparser.add_argument('-d', '--description', type=str, required=True, - help='sets a description for the experiment to store in the database.') - - # Delete - subparser = subparsers.add_parser( - 'delete', description="delete specified experiment") - subparser.add_argument('expid', help='experiment identifier') - subparser.add_argument( - '-f', '--force', action='store_true', help='deletes experiment without confirmation') - - # Monitor - subparser = subparsers.add_parser( - 'monitor', description="plots specified experiment") - subparser.add_argument('expid', help='experiment identifier') - subparser.add_argument('-o', '--output', choices=('pdf', 'png', 'ps', 'svg'), default='pdf', - help='chooses type of output for generated plot') - subparser.add_argument('-group_by', choices=('date', 'member', 'chunk', 'split', 'automatic'), default=None, - help='Groups the jobs automatically or by date, member, chunk or split') - subparser.add_argument('-expand', type=str, - help='Supply the list of dates/members/chunks to filter the list of jobs. Default = "Any". 
' - 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') - subparser.add_argument( - '-expand_status', type=str, help='Select the statuses to be expanded') - subparser.add_argument('--hide_groups', action='store_true', - default=False, help='Hides the groups from the plot') - subparser.add_argument('-cw', '--check_wrapper', action='store_true', - default=False, help='Generate possible wrapper in the current workflow') - - group2 = subparser.add_mutually_exclusive_group(required=False) - - group.add_argument('-fs', '--filter_status', type=str, - choices=('Any', 'READY', 'COMPLETED', - 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN'), - help='Select the original status to filter the list of jobs') - group = subparser.add_mutually_exclusive_group(required=False) - group.add_argument('-fl', '--list', type=str, - help='Supply the list of job names to be filtered. Default = "Any". ' - 'LIST = "b037_20101101_fc3_21_sim b037_20111101_fc4_26_sim"') - group.add_argument('-fc', '--filter_chunks', type=str, - help='Supply the list of chunks to filter the list of jobs. Default = "Any". ' - 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') - group.add_argument('-fs', '--filter_status', type=str, - choices=('Any', 'READY', 'COMPLETED', - 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN'), - help='Select the original status to filter the list of jobs') - group.add_argument('-ft', '--filter_type', type=str, - help='Select the job type to filter the list of jobs') - subparser.add_argument('--hide', action='store_true', default=False, - help='hides plot window') - group2.add_argument('--txt', action='store_true', default=False, - help='Generates only txt status file') - - group2.add_argument('-txtlog', '--txt_logfiles', action='store_true', default=False, - help='Generates only txt status file(AS < 3.12b behaviour)') - - subparser.add_argument('-nt', '--notransitive', action='store_true', - default=False, help='Disable transitive reduction') - - # Stats - subparser = subparsers.add_parser( - 'stats', description="plots statistics for specified experiment") - subparser.add_argument('expid', help='experiment identifier') - subparser.add_argument('-ft', '--filter_type', type=str, help='Select the job type to filter ' - 'the list of jobs') - subparser.add_argument('-fp', '--filter_period', type=int, help='Select the period to filter jobs ' - 'from current time to the past ' - 'in number of hours back') - subparser.add_argument('-o', '--output', choices=('pdf', 'png', 'ps', 'svg'), default='pdf', - help='type of output for generated plot') - subparser.add_argument('--hide', action='store_true', default=False, - help='hides plot window') - subparser.add_argument('-nt', '--notransitive', action='store_true', - default=False, help='Disable transitive reduction') - - # Clean - subparser = subparsers.add_parser( - 'clean', description="clean specified experiment") - subparser.add_argument('expid', help='experiment identifier') - subparser.add_argument( - '-pr', '--project', action="store_true", help='clean project') - subparser.add_argument('-p', '--plot', action="store_true", - help='clean plot, only 2 last will remain') - subparser.add_argument('-s', '--stats', action="store_true", - help='clean stats, only last will remain') - - # Recovery - subparser = subparsers.add_parser( - 'recovery', description="recover specified experiment") - subparser.add_argument( - 'expid', type=str, help='experiment identifier') - subparser.add_argument( - '-np', '--noplot', action='store_true', 
default=False, help='omit plot') - subparser.add_argument('--all', action="store_true", default=False, - help='Get completed files to synchronize pkl') - subparser.add_argument( - '-s', '--save', action="store_true", default=False, help='Save changes to disk') - subparser.add_argument('--hide', action='store_true', default=False, - help='hides plot window') - subparser.add_argument('-group_by', choices=('date', 'member', 'chunk', 'split', 'automatic'), default=None, - help='Groups the jobs automatically or by date, member, chunk or split') - subparser.add_argument('-expand', type=str, - help='Supply the list of dates/members/chunks to filter the list of jobs. Default = "Any". ' - 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') - subparser.add_argument( - '-expand_status', type=str, help='Select the statuses to be expanded') - subparser.add_argument('-nt', '--notransitive', action='store_true', - default=False, help='Disable transitive reduction') - subparser.add_argument('-nl', '--no_recover_logs', action='store_true', default=False, - help='Disable logs recovery') - # Migrate - subparser = subparsers.add_parser( - 'migrate', description="Migrate experiments from current user to another") - subparser.add_argument('expid', help='experiment identifier') - group = subparser.add_mutually_exclusive_group(required=True) - group.add_argument('-o', '--offer', action="store_true", - default=False, help='Offer experiment') - group.add_argument('-p', '--pickup', action="store_true", - default=False, help='Pick-up released experiment') - - # Inspect - subparser = subparsers.add_parser( - 'inspect', description="Generate all .cmd files") - subparser.add_argument('expid', help='experiment identifier') - subparser.add_argument('-nt', '--notransitive', action='store_true', - default=False, help='Disable transitive reduction') - subparser.add_argument( - '-f', '--force', action="store_true", help='Overwrite all cmd') - subparser.add_argument('-cw', '--check_wrapper', action='store_true', - default=False, help='Generate possible wrapper in the current workflow') - - group.add_argument('-fs', '--filter_status', type=str, - choices=('Any', 'READY', 'COMPLETED', - 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN'), - help='Select the original status to filter the list of jobs') - group = subparser.add_mutually_exclusive_group(required=False) - group.add_argument('-fl', '--list', type=str, - help='Supply the list of job names to be filtered. Default = "Any". ' - 'LIST = "b037_20101101_fc3_21_sim b037_20111101_fc4_26_sim"') - group.add_argument('-fc', '--filter_chunks', type=str, - help='Supply the list of chunks to filter the list of jobs. Default = "Any". 
' - 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') - group.add_argument('-fs', '--filter_status', type=str, - choices=('Any', 'READY', 'COMPLETED', - 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN'), - help='Select the original status to filter the list of jobs') - group.add_argument('-ft', '--filter_type', type=str, - help='Select the job type to filter the list of jobs') - - # Check - subparser = subparsers.add_parser( - 'check', description="check configuration for specified experiment") - subparser.add_argument('expid', help='experiment identifier') - subparser.add_argument('-nt', '--notransitive', action='store_true', - default=False, help='Disable transitive reduction') - # Describe - subparser = subparsers.add_parser( - 'describe', description="Show details for specified experiment") - subparser.add_argument('expid', help='experiment identifier') - - # Create - subparser = subparsers.add_parser( - 'create', description="create specified experiment joblist") - subparser.add_argument('expid', help='experiment identifier') - subparser.add_argument( - '-np', '--noplot', action='store_true', default=False, help='omit plot') - subparser.add_argument('--hide', action='store_true', default=False, - help='hides plot window') - subparser.add_argument('-o', '--output', choices=('pdf', 'png', 'ps', 'svg'), default='pdf', - help='chooses type of output for generated plot') - subparser.add_argument('-group_by', choices=('date', 'member', 'chunk', 'split', 'automatic'), default=None, - help='Groups the jobs automatically or by date, member, chunk or split') - subparser.add_argument('-expand', type=str, - help='Supply the list of dates/members/chunks to filter the list of jobs. Default = "Any". ' - 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') - subparser.add_argument( - '-expand_status', type=str, help='Select the statuses to be expanded') - subparser.add_argument('-nt', '--notransitive', action='store_true', - default=False, help='Disable transitive reduction') - subparser.add_argument('-cw', '--check_wrapper', action='store_true', - default=False, help='Generate possible wrapper in the current workflow') - - # Configure - subparser = subparsers.add_parser('configure', description="configure database and path for autosubmit. It " - "can be done at machine, user or local level." - "If no arguments specified configure will " - "display dialog boxes (if installed)") - subparser.add_argument( - '--advanced', action="store_true", help="Open advanced configuration of autosubmit") - subparser.add_argument('-db', '--databasepath', default=None, help='path to database. If not supplied, ' - 'it will prompt for it') - subparser.add_argument( - '-dbf', '--databasefilename', default=None, help='database filename') - subparser.add_argument('-lr', '--localrootpath', default=None, help='path to store experiments. If not ' - 'supplied, it will prompt for it') - subparser.add_argument('-pc', '--platformsconfpath', default=None, help='path to platforms.conf file to ' - 'use by default. Optional') - subparser.add_argument('-jc', '--jobsconfpath', default=None, help='path to jobs.conf file to use by ' - 'default. Optional') - subparser.add_argument( - '-sm', '--smtphostname', default=None, help='STMP server hostname. Optional') - subparser.add_argument( - '-mf', '--mailfrom', default=None, help='Notifications sender address. 
Optional') - group = subparser.add_mutually_exclusive_group() - group.add_argument('--all', action="store_true", - help='configure for all users') - group.add_argument('--local', action="store_true", help='configure only for using Autosubmit from this ' - 'path') - - # Install - subparsers.add_parser( - 'install', description='install database for autosubmit on the configured folder') - - # Set status - subparser = subparsers.add_parser( - 'setstatus', description="sets job status for an experiment") - subparser.add_argument('expid', help='experiment identifier') - subparser.add_argument( - '-np', '--noplot', action='store_true', default=False, help='omit plot') - subparser.add_argument( - '-s', '--save', action="store_true", default=False, help='Save changes to disk') - - subparser.add_argument('-t', '--status_final', - choices=('READY', 'COMPLETED', 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN', - 'QUEUING', 'RUNNING'), - required=True, - help='Supply the target status') - group = subparser.add_mutually_exclusive_group(required=True) - group.add_argument('-fl', '--list', type=str, - help='Supply the list of job names to be changed. Default = "Any". ' - 'LIST = "b037_20101101_fc3_21_sim b037_20111101_fc4_26_sim"') - group.add_argument('-fc', '--filter_chunks', type=str, - help='Supply the list of chunks to change the status. Default = "Any". ' - 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') - group.add_argument('-fs', '--filter_status', type=str, - help='Select the status (one or more) to filter the list of jobs.' - "Valid values = ['Any', 'READY', 'COMPLETED', 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN']") - group.add_argument('-ft', '--filter_type', type=str, - help='Select the job type to filter the list of jobs') - - subparser.add_argument('--hide', action='store_true', default=False, - help='hides plot window') - subparser.add_argument('-group_by', choices=('date', 'member', 'chunk', 'split', 'automatic'), default=None, - help='Groups the jobs automatically or by date, member, chunk or split') - subparser.add_argument('-expand', type=str, - help='Supply the list of dates/members/chunks to filter the list of jobs. Default = "Any". 
' - 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') - subparser.add_argument( - '-expand_status', type=str, help='Select the statuses to be expanded') - subparser.add_argument('-nt', '--notransitive', action='store_true', - default=False, help='Disable transitive reduction') - subparser.add_argument('-cw', '--check_wrapper', action='store_true', - default=False, help='Generate possible wrapper in the current workflow') - - # Test Case - subparser = subparsers.add_parser( - 'testcase', description='create test case experiment') - subparser.add_argument( - '-y', '--copy', help='makes a copy of the specified experiment') - subparser.add_argument( - '-d', '--description', required=True, help='description of the test case') - subparser.add_argument('-c', '--chunks', help='chunks to run') - subparser.add_argument('-m', '--member', help='member to run') - subparser.add_argument('-s', '--stardate', help='stardate to run') - subparser.add_argument( - '-H', '--HPC', required=True, help='HPC to run experiment on it') - subparser.add_argument( - '-b', '--branch', help='branch of git to run (or revision from subversion)') - - # Test - subparser = subparsers.add_parser( - 'test', description='test experiment') - subparser.add_argument('expid', help='experiment identifier') - subparser.add_argument( - '-c', '--chunks', required=True, help='chunks to run') - subparser.add_argument('-m', '--member', help='member to run') - subparser.add_argument('-s', '--stardate', help='stardate to run') - subparser.add_argument( - '-H', '--HPC', help='HPC to run experiment on it') - subparser.add_argument( - '-b', '--branch', help='branch of git to run (or revision from subversion)') - - # Refresh - subparser = subparsers.add_parser( - 'refresh', description='refresh project directory for an experiment') - subparser.add_argument('expid', help='experiment identifier') - subparser.add_argument('-mc', '--model_conf', default=False, action='store_true', - help='overwrite model conf file') - subparser.add_argument('-jc', '--jobs_conf', default=False, action='store_true', - help='overwrite jobs conf file') - - # Archive - subparser = subparsers.add_parser( - 'archive', description='archives an experiment') - subparser.add_argument('expid', help='experiment identifier') - - # Unarchive - subparser = subparsers.add_parser( - 'unarchive', description='unarchives an experiment') - subparser.add_argument('expid', help='experiment identifier') - - # Readme - subparsers.add_parser('readme', description='show readme') - - # Changelog - subparsers.add_parser('changelog', description='show changelog') - - args = parser.parse_args() - - Log.set_console_level(args.logconsole) - Log.set_file_level(args.logfile) - - if args.command == 'run': - return Autosubmit.run_experiment(args.expid, args.notransitive) - elif args.command == 'expid': - return Autosubmit.expid(args.HPC, args.description, args.copy, args.dummy, False, - args.operational) != '' - elif args.command == 'delete': - return Autosubmit.delete(args.expid, args.force) - elif args.command == 'monitor': - return Autosubmit.monitor(args.expid, args.output, args.list, args.filter_chunks, args.filter_status, - args.filter_type, args.hide, args.txt, args.group_by, args.expand, - args.expand_status, args.hide_groups, args.notransitive, args.check_wrapper, args.txt_logfiles) - elif args.command == 'stats': - return Autosubmit.statistics(args.expid, args.filter_type, args.filter_period, args.output, args.hide, - args.notransitive) - elif args.command == 'clean': 
- return Autosubmit.clean(args.expid, args.project, args.plot, args.stats) - elif args.command == 'recovery': - return Autosubmit.recovery(args.expid, args.noplot, args.save, args.all, args.hide, args.group_by, - args.expand, args.expand_status, args.notransitive, args.no_recover_logs) - elif args.command == 'check': - return Autosubmit.check(args.expid, args.notransitive) - elif args.command == 'inspect': - return Autosubmit.inspect(args.expid, args.list, args.filter_chunks, args.filter_status, - args.filter_type, args.notransitive, args.force, args.check_wrapper) - elif args.command == 'describe': - return Autosubmit.describe(args.expid) - elif args.command == 'migrate': - return Autosubmit.migrate(args.expid, args.offer, args.pickup) - elif args.command == 'create': - return Autosubmit.create(args.expid, args.noplot, args.hide, args.output, args.group_by, args.expand, - args.expand_status, args.notransitive, args.check_wrapper) - elif args.command == 'configure': - if not args.advanced or (args.advanced and dialog is None): - return Autosubmit.configure(args.advanced, args.databasepath, args.databasefilename, - args.localrootpath, args.platformsconfpath, args.jobsconfpath, - args.smtphostname, args.mailfrom, args.all, args.local) - else: - return Autosubmit.configure_dialog() - elif args.command == 'install': - return Autosubmit.install() - elif args.command == 'setstatus': - return Autosubmit.set_status(args.expid, args.noplot, args.save, args.status_final, args.list, - args.filter_chunks, args.filter_status, args.filter_type, args.hide, - args.group_by, args.expand, args.expand_status, args.notransitive, args.check_wrapper) - elif args.command == 'testcase': - return Autosubmit.testcase(args.copy, args.description, args.chunks, args.member, args.stardate, - args.HPC, args.branch) - elif args.command == 'test': - return Autosubmit.test(args.expid, args.chunks, args.member, args.stardate, args.HPC, args.branch) - elif args.command == 'refresh': - return Autosubmit.refresh(args.expid, args.model_conf, args.jobs_conf) - elif args.command == 'archive': - return Autosubmit.archive(args.expid) - elif args.command == 'unarchive': - return Autosubmit.unarchive(args.expid) - - elif args.command == 'readme': - if os.path.isfile(Autosubmit.readme_path): - with open(Autosubmit.readme_path) as f: - print(f.read()) - return True - return False - elif args.command == 'changelog': - if os.path.isfile(Autosubmit.changes_path): - with open(Autosubmit.changes_path) as f: - print(f.read()) - return True - return False - except Exception as e: - from traceback import format_exc - Log.critical( - 'Unhandled exception on Autosubmit: {0}\n{1}', e, format_exc(10)) - - return False - - @staticmethod - def _delete_expid(expid_delete): - """ - Removes an experiment from path and database - - :type expid_delete: str - :param expid_delete: identifier of the experiment to delete - """ - if expid_delete == '' or expid_delete is None and not os.path.exists(os.path.join(APIBasicConfig.LOCAL_ROOT_DIR, - expid_delete)): - Log.info("Experiment directory does not exist.") - else: - Log.info("Removing experiment directory...") - ret = False - if pwd.getpwuid(os.stat(os.path.join(APIBasicConfig.LOCAL_ROOT_DIR, expid_delete)).st_uid).pw_name == os.getlogin(): - try: - - shutil.rmtree(os.path.join( - APIBasicConfig.LOCAL_ROOT_DIR, expid_delete)) - except OSError as e: - Log.warning('Can not delete experiment folder: {0}', e) - return ret - Log.info("Deleting experiment from database...") - ret = delete_experiment(expid_delete) 
- if ret: - Log.result("Experiment {0} deleted".format(expid_delete)) - else: - Log.warning( - "Current User is not the Owner {0} can not be deleted!", expid_delete) - return ret - - @staticmethod - def expid(hpc, description, copy_id='', dummy=False, test=False, operational=False): - """ - Creates a new experiment for given HPC - - :param operational: if true, creates an operational experiment - :type operational: bool - :type hpc: str - :type description: str - :type copy_id: str - :type dummy: bool - :param hpc: name of the main HPC for the experiment - :param description: short experiment's description. - :param copy_id: experiment identifier of experiment to copy - :param dummy: if true, writes a default dummy configuration for testing - :param test: if true, creates an experiment for testing - :return: experiment identifier. If method fails, returns ''. - :rtype: str - """ - APIBasicConfig.read() - - log_path = os.path.join( - APIBasicConfig.LOCAL_ROOT_DIR, 'ASlogs', 'expid.log'.format(os.getuid())) - try: - Log.set_file(log_path) - except IOError as e: - Log.error("Can not create log file in path {0}: {1}".format( - log_path, e.message)) - exp_id = None - if description is None: - Log.error("Missing experiment description.") - return '' - if hpc is None: - Log.error("Missing HPC.") - return '' - if not copy_id: - exp_id = new_experiment( - description, Autosubmit.autosubmit_version, test, operational) - if exp_id == '': - return '' - try: - os.mkdir(os.path.join(APIBasicConfig.LOCAL_ROOT_DIR, exp_id)) - - os.mkdir(os.path.join( - APIBasicConfig.LOCAL_ROOT_DIR, exp_id, 'conf')) - Log.info("Copying config files...") - - # autosubmit config and experiment copied from AS. - files = resource_listdir('autosubmit.config', 'files') - for filename in files: - if resource_exists('autosubmit.config', 'files/' + filename): - index = filename.index('.') - new_filename = filename[:index] + \ - "_" + exp_id + filename[index:] - - if filename == 'platforms.conf' and APIBasicConfig.DEFAULT_PLATFORMS_CONF != '': - content = open(os.path.join( - APIBasicConfig.DEFAULT_PLATFORMS_CONF, filename)).read() - elif filename == 'jobs.conf' and APIBasicConfig.DEFAULT_JOBS_CONF != '': - content = open(os.path.join( - APIBasicConfig.DEFAULT_JOBS_CONF, filename)).read() - else: - content = resource_string( - 'autosubmit.config', 'files/' + filename) - - conf_new_filename = os.path.join( - APIBasicConfig.LOCAL_ROOT_DIR, exp_id, "conf", new_filename) - Log.debug(conf_new_filename) - open(conf_new_filename, 'w').write(content) - Autosubmit._prepare_conf_files( - exp_id, hpc, Autosubmit.autosubmit_version, dummy) - except (OSError, IOError) as e: - Log.error( - "Can not create experiment: {0}\nCleaning...".format(e)) - Autosubmit._delete_expid(exp_id) - return '' - else: - try: - if os.path.exists(os.path.join(APIBasicConfig.LOCAL_ROOT_DIR, copy_id)): - exp_id = copy_experiment( - copy_id, description, Autosubmit.autosubmit_version, test, operational) - if exp_id == '': - return '' - dir_exp_id = os.path.join( - APIBasicConfig.LOCAL_ROOT_DIR, exp_id) - os.mkdir(dir_exp_id) - os.mkdir(dir_exp_id + '/conf') - Log.info("Copying previous experiment config directories") - conf_copy_id = os.path.join( - APIBasicConfig.LOCAL_ROOT_DIR, copy_id, "conf") - files = os.listdir(conf_copy_id) - for filename in files: - if os.path.isfile(os.path.join(conf_copy_id, filename)): - new_filename = filename.replace(copy_id, exp_id) - content = open(os.path.join( - conf_copy_id, filename), 'r').read() - open(os.path.join(dir_exp_id, 
"conf", - new_filename), 'w').write(content) - Autosubmit._prepare_conf_files( - exp_id, hpc, Autosubmit.autosubmit_version, dummy) - ##### - autosubmit_config = AutosubmitConfigResolver( - copy_id, APIBasicConfig, ConfigParserFactory()) - if autosubmit_config.check_conf_files(): - project_type = autosubmit_config.get_project_type() - if project_type == "git": - autosubmit_config.check_proj() - autosubmit_git = AutosubmitGit(copy_id[0]) - Log.info("checking model version...") - if not autosubmit_git.check_commit(autosubmit_config): - return False - ##### - else: - Log.critical( - "The previous experiment directory does not exist") - return '' - except (OSError, IOError) as e: - Log.error( - "Can not create experiment: {0}\nCleaning...".format(e)) - Autosubmit._delete_expid(exp_id) - return '' - - Log.debug("Creating temporal directory...") - exp_id_path = os.path.join(APIBasicConfig.LOCAL_ROOT_DIR, exp_id) - tmp_path = os.path.join(exp_id_path, "tmp") - os.mkdir(tmp_path) - os.chmod(tmp_path, 0o775) - os.mkdir(os.path.join(tmp_path, APIBasicConfig.LOCAL_ASLOG_DIR)) - os.chmod(os.path.join(tmp_path, APIBasicConfig.LOCAL_ASLOG_DIR), 0o775) - Log.debug("Creating temporal remote directory...") - remote_tmp_path = os.path.join(tmp_path, "LOG_" + exp_id) - os.mkdir(remote_tmp_path) - os.chmod(remote_tmp_path, 0o775) - - Log.debug("Creating pkl directory...") - os.mkdir(os.path.join(exp_id_path, "pkl")) - - Log.debug("Creating plot directory...") - os.mkdir(os.path.join(exp_id_path, "plot")) - os.chmod(os.path.join(exp_id_path, "plot"), 0o775) - Log.result("Experiment registered successfully") - Log.user_warning("Remember to MODIFY the config files!") - return exp_id - - @staticmethod - def delete(expid, force): - """ - Deletes and experiment from database and experiment's folder - - :type force: bool - :type expid: str - :param expid: identifier of the experiment to delete - :param force: if True, does not ask for confirmation - - :returns: True if succesful, False if not - :rtype: bool - """ - log_path = os.path.join( - APIBasicConfig.LOCAL_ROOT_DIR, "ASlogs", 'delete.log'.format(os.getuid())) - try: - Log.set_file(log_path) - except IOError as e: - Log.error("Can not create log file in path {0}: {1}".format( - log_path, e.message)) - - if os.path.exists(os.path.join(APIBasicConfig.LOCAL_ROOT_DIR, expid)): - if force or Autosubmit._user_yes_no_query("Do you want to delete " + expid + " ?"): - return Autosubmit._delete_expid(expid) - else: - Log.info("Quitting...") - return False - else: - Log.error("The experiment does not exist") - return True - - @staticmethod - def _load_parameters(as_conf, job_list, platforms): - # Load parameters - Log.debug("Loading parameters...") - parameters = as_conf.load_parameters() - for platform_name in platforms: - platform = platforms[platform_name] - platform.add_parameters(parameters) - - platform = platforms[as_conf.get_platform().lower()] - platform.add_parameters(parameters, True) - - job_list.parameters = parameters - - @staticmethod - def inspect(expid, lst, filter_chunks, filter_status, filter_section, notransitive=False, force=False, check_wrapper=False): - """ - Generates cmd files experiment. 
- - :type expid: str - :param expid: identifier of experiment to be run - :return: True if run to the end, False otherwise - :rtype: bool - """ - - if expid is None: - Log.critical("Missing experiment id") - - APIBasicConfig.read() - exp_path = os.path.join(APIBasicConfig.LOCAL_ROOT_DIR, expid) - tmp_path = os.path.join(exp_path, APIBasicConfig.LOCAL_TMP_DIR) - if os.path.exists(os.path.join(tmp_path, 'autosubmit.lock')): - locked = True - else: - locked = False - - if not os.path.exists(exp_path): - Log.critical( - "The directory %s is needed and does not exist" % exp_path) - Log.warning("Does an experiment with the given id exist?") - return 1 - Log.info("Starting inspect command") - Log.set_file(os.path.join( - tmp_path, APIBasicConfig.LOCAL_ASLOG_DIR, 'generate.log')) - os.system('clear') - signal.signal(signal.SIGINT, signal_handler) - as_conf = AutosubmitConfigResolver(expid, APIBasicConfig, ConfigParserFactory()) - if not as_conf.check_conf_files(): - Log.critical('Can not generate scripts with invalid configuration') - return False - project_type = as_conf.get_project_type() - if project_type != "none": - # Check proj configuration - as_conf.check_proj() - safetysleeptime = as_conf.get_safetysleeptime() - Log.debug("The Experiment name is: {0}", expid) - Log.debug("Sleep: {0}", safetysleeptime) - packages_persistence = JobPackagePersistence(os.path.join(APIBasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), - "job_packages_" + expid) - os.chmod(os.path.join(APIBasicConfig.LOCAL_ROOT_DIR, expid, - "pkl", "job_packages_" + expid + ".db"), 0o664) - - packages_persistence.reset_table(True) - job_list_original = Autosubmit.load_job_list( - expid, as_conf, notransitive=notransitive) - job_list = copy.deepcopy(job_list_original) - job_list.packages_dict = {} - - Log.debug("Length of the jobs list: {0}", len(job_list)) - - # variables to be updated on the fly - safetysleeptime = as_conf.get_safetysleeptime() - Log.debug("Sleep: {0}", safetysleeptime) - # Generate - Log.info("Starting to generate cmd scripts") - - if not isinstance(job_list, type([])): - jobs = [] - jobs_cw = [] - if check_wrapper and (not locked or (force and locked)): - Log.info("Generating all cmd script adapted for wrappers") - jobs = job_list.get_uncompleted() - - jobs_cw = job_list.get_completed() - else: - if (force and not locked) or (force and locked): - Log.info("Overwritting all cmd scripts") - jobs = job_list.get_job_list() - elif locked: - Log.warning( - "There is a .lock file and not -f, generating only all unsubmitted cmd scripts") - jobs = job_list.get_unsubmitted() - else: - Log.info("Generating cmd scripts only for selected jobs") - if filter_chunks: - fc = filter_chunks - Log.debug(fc) - if fc == 'Any': - jobs = job_list.get_job_list() - else: - # noinspection PyTypeChecker - data = json.loads(Autosubmit._create_json(fc)) - for date_json in data['sds']: - date = date_json['sd'] - jobs_date = [j for j in job_list.get_job_list() if date2str( - j.date) == date] - - for member_json in date_json['ms']: - member = member_json['m'] - jobs_member = [j for j in jobs_date if j.member == member] - - for chunk_json in member_json['cs']: - chunk = int(chunk_json) - jobs = jobs + \ - [job for job in [j for j in jobs_member if j.chunk == chunk]] - - elif filter_status: - Log.debug( - "Filtering jobs with status {0}", filter_status) - if filter_status == 'Any': - jobs = job_list.get_job_list() - else: - fs = Autosubmit._get_status(filter_status) - jobs = [job for job in [j for j in job_list.get_job_list() if j.status == fs]] - - 
elif filter_section: - ft = filter_section - Log.debug(ft) - - if ft == 'Any': - jobs = job_list.get_job_list() - else: - for job in job_list.get_job_list(): - if job.section == ft: - jobs.append(job) - elif lst: - jobs_lst = lst.split() - - if jobs == 'Any': - jobs = job_list.get_job_list() - else: - for job in job_list.get_job_list(): - if job.name in jobs_lst: - jobs.append(job) - else: - jobs = job_list.get_job_list() - if isinstance(jobs, type([])): - referenced_jobs_to_remove = set() - for job in jobs: - for child in job.children: - if child not in jobs: - referenced_jobs_to_remove.add(child) - for parent in job.parents: - if parent not in jobs: - referenced_jobs_to_remove.add(parent) - - for job in jobs: - job.status = Status.WAITING - - Autosubmit.generate_scripts_andor_wrappers( - as_conf, job_list, jobs, packages_persistence, False) - if len(jobs_cw) > 0: - referenced_jobs_to_remove = set() - for job in jobs_cw: - for child in job.children: - if child not in jobs_cw: - referenced_jobs_to_remove.add(child) - for parent in job.parents: - if parent not in jobs_cw: - referenced_jobs_to_remove.add(parent) - - for job in jobs_cw: - job.status = Status.WAITING - Autosubmit.generate_scripts_andor_wrappers( - as_conf, job_list, jobs_cw, packages_persistence, False) - - Log.info("no more scripts to generate, now proceed to check them manually") - time.sleep(safetysleeptime) - return True - - @staticmethod - def generate_scripts_andor_wrappers(as_conf, job_list, jobs_filtered, packages_persistence, only_wrappers=False): - """ - as_conf: AutosubmitConfig object - job_list: JobList object, contains a list of jobs - jobs_filtered: list of jobs - packages_persistence: Database handler - only_wrappers: True - """ - job_list._job_list = jobs_filtered - job_list.update_list(as_conf, False) - # Identifying the submitter and loading it - submitter = Autosubmit._get_submitter(as_conf) - # Function depending on the submitter - submitter.load_platforms(as_conf) - # Identifying HPC from config files - hpcarch = as_conf.get_platform() - # - Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) - platforms_to_test = set() - for job in job_list.get_job_list(): - if job.platform_name is None: - job.platform_name = hpcarch - # noinspection PyTypeChecker - job.platform = submitter.platforms[job.platform_name.lower()] - # noinspection PyTypeChecker - platforms_to_test.add(job.platform) - # case setstatus - job_list.check_scripts(as_conf) - job_list.update_list(as_conf, False) - Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) - while job_list.get_active(): - Autosubmit.submit_ready_jobs( - as_conf, job_list, platforms_to_test, packages_persistence, True, only_wrappers) - - job_list.update_list(as_conf, False) - - @staticmethod - def run_experiment(expid, notransitive=False): - """ - Runs and experiment (submitting all the jobs properly and repeating its execution in case of failure). 
- - :type expid: str - :param expid: identifier of experiment to be run - :return: True if run to the end, False otherwise - :rtype: bool - """ - if expid is None: - Log.critical("Missing experiment id") - - APIBasicConfig.read() - exp_path = os.path.join(APIBasicConfig.LOCAL_ROOT_DIR, expid) - tmp_path = os.path.join(exp_path, APIBasicConfig.LOCAL_TMP_DIR) - aslogs_path = os.path.join(tmp_path, APIBasicConfig.LOCAL_ASLOG_DIR) - if not os.path.exists(aslogs_path): - os.mkdir(aslogs_path) - os.chmod(aslogs_path, 0o775) - if not os.path.exists(exp_path): - Log.critical( - "The directory %s is needed and does not exist" % exp_path) - Log.warning("Does an experiment with the given id exist?") - return 1 - - # checking host whitelist - import platform - host = platform.node() - print(host) - if APIBasicConfig.ALLOWED_HOSTS and host not in APIBasicConfig.ALLOWED_HOSTS: - Log.info("\n Autosubmit run command is not allowed on this host") - return False - - # checking if there is a lock file to avoid multiple running on the same expid - try: - with portalocker.Lock(os.path.join(tmp_path, 'autosubmit.lock'), timeout=1): - Log.info( - "Preparing .lock file to avoid multiple instances with same experiment id") - - Log.set_file(os.path.join(aslogs_path, 'run.log')) - os.system('clear') - - signal.signal(signal.SIGINT, signal_handler) - - as_conf = AutosubmitConfigResolver( - expid, APIBasicConfig, ConfigParserFactory()) - if not as_conf.check_conf_files(): - Log.critical('Can not run with invalid configuration') - return False - - project_type = as_conf.get_project_type() - if project_type != "none": - # Check proj configuration - as_conf.check_proj() - - hpcarch = as_conf.get_platform() - - safetysleeptime = as_conf.get_safetysleeptime() - retrials = as_conf.get_retrials() - - submitter = Autosubmit._get_submitter(as_conf) - submitter.load_platforms(as_conf) - - Log.debug("The Experiment name is: {0}", expid) - Log.debug("Sleep: {0}", safetysleeptime) - Log.debug("Default retrials: {0}", retrials) - - Log.info("Starting job submission...") - - pkl_dir = os.path.join( - APIBasicConfig.LOCAL_ROOT_DIR, expid, 'pkl') - job_list = Autosubmit.load_job_list( - expid, as_conf, notransitive=notransitive) - - Log.debug( - "Starting from job list restored from {0} files", pkl_dir) - - Log.debug("Length of the jobs list: {0}", len(job_list)) - - Autosubmit._load_parameters( - as_conf, job_list, submitter.platforms) - - # check the job list script creation - Log.debug("Checking experiment templates...") - - platforms_to_test = set() - for job in job_list.get_job_list(): - if job.platform_name is None: - job.platform_name = hpcarch - # noinspection PyTypeChecker - job.platform = submitter.platforms[job.platform_name.lower( - )] - # noinspection PyTypeChecker - platforms_to_test.add(job.platform) - - job_list.check_scripts(as_conf) - - packages_persistence = JobPackagePersistence(os.path.join(APIBasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), - "job_packages_" + expid) - - if as_conf.get_wrapper_type() != 'none': - os.chmod(os.path.join(APIBasicConfig.LOCAL_ROOT_DIR, - expid, "pkl", "job_packages_" + expid + ".db"), 0o664) - packages = packages_persistence.load() - for (exp_id, package_name, job_name) in packages: - if package_name not in job_list.packages_dict: - job_list.packages_dict[package_name] = [] - job_list.packages_dict[package_name].append( - job_list.get_job_by_name(job_name)) - - for package_name, jobs in list(job_list.packages_dict.items()): - from job.job import WrapperJob - wrapper_job = 
WrapperJob(package_name, jobs[0].id, Status.SUBMITTED, 0, jobs, - None, - None, jobs[0].platform, as_conf) - job_list.job_package_map[jobs[0].id] = wrapper_job - job_list.update_list(as_conf) - job_list.save() - ######################### - # AUTOSUBMIT - MAIN LOOP - ######################### - # Main loop. Finishing when all jobs have been submitted - while job_list.get_active(): - if Autosubmit.exit: - return 2 - # reload parameters changes - Log.debug("Reloading parameters...") - as_conf.reload() - Autosubmit._load_parameters( - as_conf, job_list, submitter.platforms) - # variables to be updated on the fly - total_jobs = len(job_list.get_job_list()) - Log.info( - "\n\n{0} of {1} jobs remaining ({2})".format(total_jobs - len(job_list.get_completed()), - total_jobs, - time.strftime("%H:%M"))) - safetysleeptime = as_conf.get_safetysleeptime() - Log.debug("Sleep: {0}", safetysleeptime) - default_retrials = as_conf.get_retrials() - Log.debug("Number of retrials: {0}", default_retrials) - - check_wrapper_jobs_sleeptime = as_conf.get_wrapper_check_time() - Log.debug('WRAPPER CHECK TIME = {0}'.format( - check_wrapper_jobs_sleeptime)) - - save = False - - slurm = [] - for platform in platforms_to_test: - list_jobid = "" - completed_joblist = [] - list_prevStatus = [] - queuing_jobs = job_list.get_in_queue_grouped_id( - platform) - for job_id, job in list(queuing_jobs.items()): - if job_list.job_package_map and job_id in job_list.job_package_map: - Log.debug( - 'Checking wrapper job with id ' + str(job_id)) - wrapper_job = job_list.job_package_map[job_id] - check_wrapper = True - if wrapper_job.status == Status.RUNNING: - check_wrapper = True if datetime.timedelta.total_seconds(datetime.datetime.now( - ) - wrapper_job.checked_time) >= check_wrapper_jobs_sleeptime else False - if check_wrapper: - wrapper_job.checked_time = datetime.datetime.now() - platform.check_job(wrapper_job) - Log.info( - 'Wrapper job ' + wrapper_job.name + ' is ' + str(Status.VALUE_TO_KEY[wrapper_job.new_status])) - wrapper_job.check_status( - wrapper_job.new_status) - save = True - else: - Log.info( - "Waiting for wrapper check time: {0}\n", check_wrapper_jobs_sleeptime) - else: - job = job[0] - prev_status = job.status - if job.status == Status.FAILED: - continue - - if platform.type == "slurm": - list_jobid += str(job_id) + ',' - list_prevStatus.append(prev_status) - completed_joblist.append(job) - else: - platform.check_job(job) - if prev_status != job.update_status(as_conf.get_copy_remote_logs() == 'true'): - if as_conf.get_notifications() == 'true': - if Status.VALUE_TO_KEY[job.status] in job.notify_on: - Notifier.notify_status_change(MailNotifier(APIBasicConfig), expid, job.name, - Status.VALUE_TO_KEY[prev_status], - Status.VALUE_TO_KEY[job.status], - as_conf.get_mails_to()) - save = True - - if platform.type == "slurm" and list_jobid != "": - slurm.append( - [platform, list_jobid, list_prevStatus, completed_joblist]) - # END LOOP - for platform_jobs in slurm: - platform = platform_jobs[0] - jobs_to_check = platform_jobs[1] - platform.check_Alljobs( - platform_jobs[3], jobs_to_check, as_conf.get_copy_remote_logs()) - - for j_Indx in range(0, len(platform_jobs[3])): - prev_status = platform_jobs[2][j_Indx] - job = platform_jobs[3][j_Indx] - - if prev_status != job.update_status(as_conf.get_copy_remote_logs() == 'true'): - if as_conf.get_notifications() == 'true': - if Status.VALUE_TO_KEY[job.status] in job.notify_on: - Notifier.notify_status_change(MailNotifier(APIBasicConfig), expid, job.name, - 
Status.VALUE_TO_KEY[prev_status], - Status.VALUE_TO_KEY[job.status], - as_conf.get_mails_to()) - save = True - - if job_list.update_list(as_conf) or save: - job_list.save() - - if Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence): - job_list.save() - - if Autosubmit.exit: - return 2 - time.sleep(safetysleeptime) - - Log.info("No more jobs to run.") - if len(job_list.get_failed()) > 0: - Log.info("Some jobs have failed and reached maximum retrials") - return False - else: - Log.result("Run successful") - return True - - except portalocker.AlreadyLocked: - Autosubmit.show_lock_warning(expid) - - except WrongTemplateException: - return False - - @staticmethod - def submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence, inspect=False, - only_wrappers=False): - """ - Gets READY jobs and send them to the platforms if there is available space on the queues - - :param as_conf: autosubmit config object. \n - :type as_conf: AutosubmitConfig Object. \n - :param job_list: JobList as a single entity. \n - :type job_list: JobList() Object. \n - :param platforms_to_test: List of platforms that will be used in the experiment. \n - :type platforms_to_test: Set() of Platform() Object. e.g. EcPlatform(), LsfPlatform(), etc. \n - :return: True if at least one job was submitted, False otherwise - :rtype: bool - """ - save = False - - for platform in platforms_to_test: - Log.debug("\nJobs ready for {1}: {0}", len( - job_list.get_ready(platform)), platform.name) - packages_to_submit, remote_dependencies_dict = JobPackager( - as_conf, platform, job_list).build_packages() - if not inspect: - platform.open_submit_script() - valid_packages_to_submit = [] - for package in packages_to_submit: - try: - if hasattr(package, "name"): - if remote_dependencies_dict and package.name in remote_dependencies_dict['dependencies']: - remote_dependency = remote_dependencies_dict['dependencies'][package.name] - remote_dependency_id = remote_dependencies_dict['name_to_id'][remote_dependency] - package.set_job_dependency(remote_dependency_id) - if not only_wrappers: - try: - package.submit( - as_conf, job_list.parameters, inspect) - valid_packages_to_submit.append(package) - except (IOError, OSError): - # write error file - continue - if only_wrappers or inspect: - for innerJob in package._jobs: - innerJob.status = Status.COMPLETED - - if hasattr(package, "name"): - job_list.packages_dict[package.name] = package.jobs - from job.job import WrapperJob - wrapper_job = WrapperJob(package.name, package.jobs[0].id, Status.READY, 0, - package.jobs, - package._wallclock, package._num_processors, - package.platform, as_conf) - job_list.job_package_map[package.jobs[0].id] = wrapper_job - if remote_dependencies_dict and package.name in remote_dependencies_dict['name_to_id']: - remote_dependencies_dict['name_to_id'][package.name] = package.jobs[0].id - if isinstance(package, JobPackageThread): - packages_persistence.save( - package.name, package.jobs, package._expid, inspect) - save = True - except WrongTemplateException as e: - Log.error( - "Invalid parameter substitution in {0} template", e.job_name) - raise - except Exception: - Log.error( - "{0} submission failed due to Unknown error", platform.name) - raise - - if platform.type == "slurm" and not inspect and not only_wrappers: - try: - save = True - if len(valid_packages_to_submit) > 0: - jobs_id = platform.submit_Script() - if jobs_id is None: - raise BaseException( - "Exiting AS being unable to get jobID") - i = 0 - for package 
in valid_packages_to_submit: - for job in package.jobs: - job.id = str(jobs_id[i]) - Log.info("{0} submitted", job.name) - job.status = Status.SUBMITTED - job.write_submit_time() - if hasattr(package, "name"): - job_list.packages_dict[package.name] = package.jobs - from job.job import WrapperJob - wrapper_job = WrapperJob(package.name, package.jobs[0].id, Status.SUBMITTED, 0, - package.jobs, - package._wallclock, package._num_processors, - package.platform, as_conf) - job_list.job_package_map[package.jobs[0].id] = wrapper_job - if remote_dependencies_dict and package.name in remote_dependencies_dict[ - 'name_to_id']: - remote_dependencies_dict['name_to_id'][package.name] = package.jobs[0].id - if isinstance(package, JobPackageThread): - packages_persistence.save( - package.name, package.jobs, package._expid, inspect) - i += 1 - - except WrongTemplateException as e: - Log.error( - "Invalid parameter substitution in {0} template", e.job_name) - raise - except Exception: - Log.error("{0} submission failed", platform.name) - raise - - return save - - @staticmethod - def monitor(expid, file_format, lst, filter_chunks, filter_status, filter_section, hide, txt_only=False, - group_by=None, expand=list(), expand_status=list(), hide_groups=False, notransitive=False, check_wrapper=False, txt_logfiles=False): - """ - Plots workflow graph for a given experiment with status of each job coded by node color. - Plot is created in experiment's plot folder with name __