diff --git a/VERSION b/VERSION index bd8bf882d06184bb54615a59477e3c5e35f522fc..adb7b04cb2fa210fbe2ce1b09f9926f7bedea01e 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.7.0 +1.0.27 diff --git a/autosubmit_api.egg-info/PKG-INFO b/autosubmit_api.egg-info/PKG-INFO index 88a03dc4e7ac07331136d0cf4c94685dac893aec..31c238680622f81390d66c097e13a3e991728e12 100644 --- a/autosubmit_api.egg-info/PKG-INFO +++ b/autosubmit_api.egg-info/PKG-INFO @@ -1,17 +1,14 @@ -Metadata-Version: 1.2 +Metadata-Version: 2.1 Name: autosubmit-api -Version: 1.7.0 +Version: 1.0.27 Summary: An extension to the Autosubmit package that serves its information as an API -Home-page: https://earth.bsc.es/gitlab/es/autosubmit_api +Home-page: https://earth.bsc.es/gitlab/wuruchi/autosubmit_api Author: Wilmer Uruchi Author-email: wilmer.uruchi@bsc.es License: GNU GPL -Description: UNKNOWN Keywords: autosubmit,API -Platform: UNKNOWN Classifier: Development Status :: 4 - Beta Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: POSIX :: Linux -Classifier: Programming Language :: Python :: 2.7 -Requires-Python: >=2.6, !=3.* +Classifier: Programming Language :: Python :: 3.7 diff --git a/autosubmit_api.egg-info/SOURCES.txt b/autosubmit_api.egg-info/SOURCES.txt index 26ff4c148adeb31cf29ba388e2be6e4593dd8c3e..f0ccb7254c04d36adeeb15db22c11d7fdce57470 100644 --- a/autosubmit_api.egg-info/SOURCES.txt +++ b/autosubmit_api.egg-info/SOURCES.txt @@ -1,7 +1,7 @@ -README.md setup.py autosubmit_api/__init__.py autosubmit_api/app.py +autosubmit_api/get-pip.py autosubmit_api.egg-info/PKG-INFO autosubmit_api.egg-info/SOURCES.txt autosubmit_api.egg-info/dependency_links.txt @@ -85,6 +85,7 @@ autosubmit_api/config/__init__.py autosubmit_api/config/basicConfig.py autosubmit_api/config/config_common.py autosubmit_api/database/__init__.py +autosubmit_api/database/autosubmit.py autosubmit_api/database/db_common.py autosubmit_api/database/db_jobdata.py autosubmit_api/database/db_manager.py diff --git a/autosubmit_api.egg-info/requires.txt b/autosubmit_api.egg-info/requires.txt index 15b6d7fbeca5b021bb06bceb7eb35ea8204763d9..d8ff9e05a6fd86e529fee22dcb82571fd4c49046 100644 --- a/autosubmit_api.egg-info/requires.txt +++ b/autosubmit_api.egg-info/requires.txt @@ -1,21 +1,11 @@ +jwt==1.3.1 +requests==2.28.1 +flask_cors==3.0.10 bscearth.utils==0.5.2 -PyJWT==1.7.1 -Flask==1.1.1 -Flask-Cors==3.0.8 -Flask-Jsonpify==1.5.0 -Flask-RESTful==0.3.7 -gunicorn==19.9.0 -mock==3.0.5 -networkx==2.2 -numpy==1.16.4 -paramiko==1.15.0 -portalocker==0.5.7 +pysqlite3==0.4.7 +numpy==1.21.6 pydotplus==2.0.2 -pydot==1.4.1 -regex==2019.6.8 -requests==2.22.0 -graphviz==0.13 -enum34==1.1.6 -typing==3.7.4.3 -radical.saga==0.70.0 -scipy==1.2.2 +portalocker==2.6.0 +networkx==2.6.3 +scipy==1.7.3 +paramiko==2.12.0 diff --git a/autosubmit_api/__init__.py b/autosubmit_api/__init__.py index 6cf5d99625b918be31d74e9976c64ac3435d22e5..95eb0feb76c72b5058951af785851049aa255ad3 100644 --- a/autosubmit_api/__init__.py +++ b/autosubmit_api/__init__.py @@ -1,3 +1,3 @@ __version__ = "1.0.0" __author__ = 'Wilmer Uruchi' -__credits__ = 'Barcelona Supercomputing Center' \ No newline at end of file +__credits__ = 'Barcelona Supercomputing Center' diff --git a/autosubmit_api/app.py b/autosubmit_api/app.py index fdac797ead2520b2a24076f154cbcc052b22f3b8..8ea8659fbf2cb60a918b7d98e26480c2e5b1372d 100644 --- a/autosubmit_api/app.py +++ b/autosubmit_api/app.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/python3.7 
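# [A hedged sketch, not part of the patch: the requires.txt hunk above swaps
# "PyJWT==1.7.1" for "jwt==1.3.1", while app.py below still defines the
# JWT_SECRET / JWT_ALGORITHM constants and "import jwt" that a PyJWT-style
# module-level API implies (the PyPI package actually named "jwt" ships a
# different, class-based API). The round-trip those constants suggest, with
# illustrative claim names and TTL:]
import os
from datetime import datetime, timedelta
import jwt  # PyJWT-style module API assumed

JWT_SECRET = os.environ.get("SECRET_KEY")
JWT_ALGORITHM = "HS256"

def issue_token(username, ttl_seconds=84000):
    # "user_id" and ttl_seconds are illustrative, not taken from this patch.
    payload = {"user_id": username,
               "exp": datetime.utcnow() + timedelta(seconds=ttl_seconds)}
    return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)

def verify_token(token):
    # Raises jwt.ExpiredSignatureError / jwt.InvalidTokenError on bad tokens.
    return jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])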
# Copyright 2017 Earth Sciences Department, BSC-CNS @@ -18,7 +18,6 @@ # along with Autosubmit. If not, see <http://www.gnu.org/licenses/>. import os -import jwt import sys import time from datetime import datetime, timedelta @@ -27,14 +26,16 @@ import logging from flask_cors import CORS, cross_origin from flask import Flask, request, session, redirect, url_for from bscearth.utils.log import Log -from database.db_common import get_current_running_exp, update_experiment_description_owner -import experiment.common_requests as CommonRequests -import experiment.utils as Utiles -from performance.performance_metrics import PerformanceMetrics -from database.db_common import search_experiment_by_id -from config.basicConfig import BasicConfig -from builders.joblist_helper_builder import JobListHelperBuilder, JobListHelperDirector -from multiprocessing import Manager +from .database.db_common import get_current_running_exp, update_experiment_description_owner +from .experiment import common_requests as CommonRequests +from .experiment import utils as Utiles +from .performance.performance_metrics import PerformanceMetrics +from .database.db_common import search_experiment_by_id +from .config.basicConfig import BasicConfig +from .builders.joblist_helper_builder import JobListHelperBuilder, JobListHelperDirector +from multiprocessing import Manager, Lock +import jwt +import sys JWT_SECRET = os.environ.get("SECRET_KEY") JWT_ALGORITHM = "HS256" @@ -55,6 +56,8 @@ gunicorn_logger = logging.getLogger('gunicorn.error') app.logger.handlers = gunicorn_logger.handlers app.logger.setLevel(gunicorn_logger.level) +app.logger.info("PYTHON VERSION: " + sys.version) + requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS += 'HIGH:!DH:!aNULL' try: requests.packages.urllib3.contrib.pyopenssl.DEFAULT_SSL_CIPHER_LIST += 'HIGH:!DH:!aNULL' @@ -62,6 +65,10 @@ except AttributeError: # no pyopenssl support used / needed / available pass +lock = Lock() + +CommonRequests.enforceLocal(app.logger) + # CAS Login @app.route('/login') def login(): @@ -214,7 +221,7 @@ def search_running(): Returns the list of all experiments that are currently running.
""" if 'username' in session: - print("USER {}".format(session['username'])) + print(("USER {}".format(session['username']))) start_time = time.time() app.logger.info("Active proceses: " + str(D)) app.logger.info('RUN|RECEIVED|') @@ -259,12 +266,12 @@ def get_expsummary(expid): start_time = time.time() user = request.args.get("loggedUser", default="null", type=str) app.logger.info('SUMMARY|RECEIVED|' + str(expid)) - if user != "null": D[os.getpid()] = [user, "summary", True] - result = CommonRequests.get_experiment_summary(expid) + if user != "null": lock.acquire(); D[os.getpid()] = [user, "summary", True]; lock.release(); + result = CommonRequests.get_experiment_summary(expid, app.logger) app.logger.info('Process: ' + str(os.getpid()) + " workers: " + str(D)) app.logger.info('SUMMARY|RTIME|' + str(expid) + "|" + str(time.time() - start_time)) - if user != "null": D[os.getpid()] = [user, "summary", False] - if user != "null": D.pop(os.getpid(), None) + if user != "null": lock.acquire(); D[os.getpid()] = [user, "summary", False]; lock.release(); + if user != "null": lock.acquire(); D.pop(os.getpid(), None); lock.release(); return result @@ -288,7 +295,8 @@ def shutdown(route): # app.logger.info("user: " + user) # app.logger.info("expid: " + expid) app.logger.info("Workers before: " + str(D)) - for k,v in D.items(): + lock.acquire() + for k,v in list(D.items()): if v[0] == user and v[1] == route and v[-1] == True: if v[2] == expid: D[k] = [user, route, expid, False] @@ -298,6 +306,7 @@ def shutdown(route): # reboot the worker os.system('kill -HUP ' + str(k)) app.logger.info("killed worker " + str(k)) + lock.release() app.logger.info("Workers now: " + str(D)) except Exception as exp: app.logger.info("[CRITICAL] Could not shutdown process " + expid + " by user \"" + user + "\"") @@ -335,12 +344,12 @@ def get_list_format(expid, layout='standard', grouped='none'): # app.logger.info("user: " + user) # app.logger.info("expid: " + expid) app.logger.info('GRAPH|RECEIVED|' + str(expid) + "~" + str(grouped) + "~" + str(layout)) - if user != "null": D[os.getpid()] = [user, "graph", expid, True] + if user != "null": lock.acquire(); D[os.getpid()] = [user, "graph", expid, True]; lock.release(); result = CommonRequests.get_experiment_graph(expid, app.logger, layout, grouped) app.logger.info('Process: ' + str(os.getpid()) + " graph workers: " + str(D)) app.logger.info('GRAPH|RTIME|' + str(expid) + "|" + str(time.time() - start_time)) - if user != "null": D[os.getpid()] = [user, "graph", expid, False] - if user != "null": D.pop(os.getpid(), None) + if user != "null": lock.acquire(); D[os.getpid()] = [user, "graph", expid, False]; lock.release(); + if user != "null": lock.acquire(); D.pop(os.getpid(), None); lock.release(); return result @@ -351,12 +360,12 @@ def get_exp_tree(expid): # app.logger.info("user: " + user) # app.logger.info("expid: " + expid) app.logger.info('TREE|RECEIVED|' + str(expid)) - if user != "null": D[os.getpid()] = [user, "tree", expid, True] + if user != "null": lock.acquire(); D[os.getpid()] = [user, "tree", expid, True]; lock.release(); result = CommonRequests.get_experiment_tree_structured(expid, app.logger) app.logger.info('Process: ' + str(os.getpid()) + " tree workers: " + str(D)) app.logger.info('TREE|RTIME|' + str(expid) + "|" + str(time.time() - start_time)) - if user != "null": D[os.getpid()] = [user, "tree", expid, False] - if user != "null": D.pop(os.getpid(), None) + if user != "null": lock.acquire(); D[os.getpid()] = [user, "tree", expid, False]; lock.release(); + if 
user != "null": lock.acquire(); D.pop(os.getpid(), None); lock.release(); return result diff --git a/autosubmit_api/autosubmit_api.egg-info/PKG-INFO b/autosubmit_api/autosubmit_api.egg-info/PKG-INFO new file mode 100644 index 0000000000000000000000000000000000000000..754166785604aece4732b5dbe08990418dfd8e2c --- /dev/null +++ b/autosubmit_api/autosubmit_api.egg-info/PKG-INFO @@ -0,0 +1,16 @@ +Metadata-Version: 1.1 +Name: autosubmit-api +Version: 1.0.0 +Summary: An extension to the Autosubmit package that serves its information as an API +Home-page: https://earth.bsc.es/gitlab/wuruchi/autosubmit_api +Author: Wilmer Uruchi +Author-email: wilmer.uruchi@bsc.es +License: GNU GPL +Description: UNKNOWN +Keywords: autosubmit,API +Platform: UNKNOWN +Classifier: Development Status :: 4 - Beta +Classifier: Intended Audience :: Developers +Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) +Classifier: Operating System :: POSIX :: Linux +Classifier: Programming Language :: Python :: 3.7 diff --git a/autosubmit_api/autosubmit_api.egg-info/SOURCES.txt b/autosubmit_api/autosubmit_api.egg-info/SOURCES.txt new file mode 100644 index 0000000000000000000000000000000000000000..861769ff0cfc65c3d1facd06b41e6c194716d681 --- /dev/null +++ b/autosubmit_api/autosubmit_api.egg-info/SOURCES.txt @@ -0,0 +1,156 @@ +.egg-info/PKG-INFO +.egg-info/SOURCES.txt +.egg-info/dependency_links.txt +.egg-info/requires.txt +.egg-info/top_level.txt +autosubmit_legacy/__init__.py +autosubmit_legacy/autosubmit.py +autosubmit_legacy/job/__init__.py +autosubmit_legacy/job/job.py +autosubmit_legacy/job/job_common.py +autosubmit_legacy/job/job_dict.py +autosubmit_legacy/job/job_exceptions.py +autosubmit_legacy/job/job_grouping.py +autosubmit_legacy/job/job_list.py +autosubmit_legacy/job/job_list_persistence.py +autosubmit_legacy/job/job_package_persistence.py +autosubmit_legacy/job/job_packager.py +autosubmit_legacy/job/job_packages.py +autosubmit_legacy/job/job_utils.py +autosubmit_legacy/platforms/__init__.py +autosubmit_legacy/platforms/ecmwf_adaptor.py +autosubmit_legacy/platforms/ecplatform.py +autosubmit_legacy/platforms/locplatform.py +autosubmit_legacy/platforms/lsfplatform.py +autosubmit_legacy/platforms/mn_adaptor.py +autosubmit_legacy/platforms/paramiko_platform.py +autosubmit_legacy/platforms/paramiko_submitter.py +autosubmit_legacy/platforms/pbsplatform.py +autosubmit_legacy/platforms/platform.py +autosubmit_legacy/platforms/psplatform.py +autosubmit_legacy/platforms/saga_platform.py +autosubmit_legacy/platforms/saga_submitter.py +autosubmit_legacy/platforms/sgeplatform.py +autosubmit_legacy/platforms/slurmplatform.py +autosubmit_legacy/platforms/submitter.py +autosubmit_legacy/platforms/headers/__init__.py +autosubmit_legacy/platforms/headers/ec_cca_header.py +autosubmit_legacy/platforms/headers/ec_header.py +autosubmit_legacy/platforms/headers/local_header.py +autosubmit_legacy/platforms/headers/lsf_header.py +autosubmit_legacy/platforms/headers/pbs10_header.py +autosubmit_legacy/platforms/headers/pbs11_header.py +autosubmit_legacy/platforms/headers/pbs12_header.py +autosubmit_legacy/platforms/headers/ps_header.py +autosubmit_legacy/platforms/headers/sge_header.py +autosubmit_legacy/platforms/headers/slurm_header.py +autosubmit_legacy/platforms/wrappers/__init__.py +autosubmit_legacy/platforms/wrappers/wrapper_builder.py +autosubmit_legacy/platforms/wrappers/wrapper_factory.py +builders/__init__.py +builders/basic_builder.py +builders/configuration_facade_builder.py 
+builders/experiment_history_builder.py +builders/joblist_helper_builder.py +builders/joblist_loader_builder.py +builders/pkl_organizer_builder.py +common/__init__.py +common/utils.py +common/utils_for_testing.py +components/__init__.py +components/experiment/__init__.py +components/experiment/configuration_facade.py +components/experiment/pkl_organizer.py +components/experiment/test.py +components/jobs/__init__.py +components/jobs/job_factory.py +components/jobs/job_support.py +components/jobs/joblist_helper.py +components/jobs/joblist_loader.py +components/jobs/test.py +components/jobs/utils.py +components/representations/__init__.py +components/representations/graph/__init__.py +components/representations/graph/edge.py +components/representations/graph/graph.py +components/representations/graph/test.py +components/representations/tree/__init__.py +components/representations/tree/test.py +components/representations/tree/tree.py +config/__init__.py +config/basicConfig.py +config/config_common.py +database/__init__.py +database/db_common.py +database/db_jobdata.py +database/db_manager.py +database/db_structure.py +experiment/__init__.py +experiment/as_times_db_manager.py +experiment/common_db_requests.py +experiment/common_requests.py +experiment/experiment_common.py +experiment/experiment_db_manager.py +experiment/test.py +experiment/utils.py +git/__init__.py +git/autosubmit_git.py +history/__init__.py +history/experiment_history.py +history/experiment_status.py +history/experiment_status_manager.py +history/internal_logging.py +history/strategies.py +history/test.py +history/test_job_history.py +history/test_strategies.py +history/test_utils.py +history/utils.py +history/data_classes/__init__.py +history/data_classes/experiment_run.py +history/data_classes/job_data.py +history/database_managers/__init__.py +history/database_managers/database_manager.py +history/database_managers/database_models.py +history/database_managers/experiment_history_db_manager.py +history/database_managers/experiment_status_db_manager.py +history/database_managers/test.py +history/platform_monitor/__init__.py +history/platform_monitor/platform_monitor.py +history/platform_monitor/platform_utils.py +history/platform_monitor/slurm_monitor.py +history/platform_monitor/slurm_monitor_item.py +history/platform_monitor/test.py +monitor/__init__.py +monitor/diagram.py +monitor/monitor.py +monitor/utils.py +notifications/__init__.py +notifications/mail_notifier.py +notifications/notifier.py +performance/__init__.py +performance/performance_metrics.py +performance/utils.py +statistics/__init__.py +statistics/job_stat.py +statistics/statistics.py +statistics/stats_summary.py +statistics/test.py +statistics/utils.py +workers/__init__.py +workers/populate_details_db.py +workers/populate_graph.py +workers/populate_queue_run_times.py +workers/populate_running_experiments.py +workers/test.py +workers/test_esarchive.py +workers/verify_complete.py +workers/business/__init__.py +workers/business/populate_times.py +workers/business/process_graph_drawings.py +workers/deprecated/__init__.py +workers/deprecated/fix_historic.py +workers/deprecated/fix_historic_energy.py +workers/populate_details/__init__.py +workers/populate_details/populate.py +workers/populate_details/test.py \ No newline at end of file diff --git a/autosubmit_api/autosubmit_api.egg-info/dependency_links.txt b/autosubmit_api/autosubmit_api.egg-info/dependency_links.txt new file mode 100644 index 
0000000000000000000000000000000000000000..8b137891791fe96927ad78e64b0aad7bded08bdc --- /dev/null +++ b/autosubmit_api/autosubmit_api.egg-info/dependency_links.txt @@ -0,0 +1 @@ + diff --git a/autosubmit_api/autosubmit_api.egg-info/requires.txt b/autosubmit_api/autosubmit_api.egg-info/requires.txt new file mode 100644 index 0000000000000000000000000000000000000000..d48668615a82bcef4d7bdaf81c0326244541e470 --- /dev/null +++ b/autosubmit_api/autosubmit_api.egg-info/requires.txt @@ -0,0 +1,37 @@ +bscearth.utils==0.5.2 +argparse<2,>=1.2 +python-dateutil>2 +pydotplus>=2 +pyparsing>=2.0.1 +numpy +matplotlib +paramiko==2.6.0 +mock>=1.3.0 +portalocker>=0.5.7 +networkx +bscearth.utils +Flask==1.0.4 +Flask-Cors==3.0.8 +Flask-Jsonpify==1.5.0 +Flask-RESTful==0.3.7 +SQLAlchemy==1.3.11 +PyJWT==1.7.1 +Flask==1.1.1 +Flask-Cors==3.0.8 +Flask-Jsonpify==1.5.0 +Flask-RESTful==0.3.7 +gunicorn==19.9.0 +mock==3.0.5 +networkx==2.2 +numpy==1.16.4 +paramiko==1.15.0 +portalocker==0.5.7 +pydotplus==2.0.2 +pydot==1.4.1 +regex==2019.6.8 +requests==2.22.0 +graphviz==0.13 +enum34==1.1.6 +typing==3.7.4.3 +radical.saga==0.70.0 +scipy==1.2.2 diff --git a/autosubmit_api/autosubmit_api.egg-info/top_level.txt b/autosubmit_api/autosubmit_api.egg-info/top_level.txt new file mode 100644 index 0000000000000000000000000000000000000000..25b178aab7083c3934c3d3f0b90bdc5ac4e5988d --- /dev/null +++ b/autosubmit_api/autosubmit_api.egg-info/top_level.txt @@ -0,0 +1,14 @@ +autosubmit_legacy +builders +common +components +config +database +experiment +git +history +monitor +notifications +performance +statistics +workers diff --git a/autosubmit_api/autosubmit_legacy/autosubmit.py b/autosubmit_api/autosubmit_legacy/autosubmit.py index f9281d2e8c2ba430580ace44339d6dfe26c3edd9..4a7c4dba46018119fbda5636654d4babf5aac686 100644 --- a/autosubmit_api/autosubmit_legacy/autosubmit.py +++ b/autosubmit_api/autosubmit_legacy/autosubmit.py @@ -18,7 +18,7 @@ # along with Autosubmit. If not, see <http://www.gnu.org/licenses/>.
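# [A condensed sketch, separate from the patch itself: the autosubmit.py
# hunks that follow apply the same Python 2 -> 3 rewrites over and over.
# The recurring patterns, with illustrative names:]
jobs = [1, 2, 3, None]

# filter() returns a lazy iterator in Python 3, so call sites that index,
# slice or len() the result are rewritten as list comprehensions:
selected = [j for j in jobs if j is not None]   # was: filter(lambda j: ..., jobs)

# xrange() is gone; range() is its lazy replacement:
for i in range(len(selected)):                  # was: xrange(len(selected))
    pass

# Octal literals need the 0o prefix:
mode = 0o664                                    # was: 0664

# raw_input() was renamed to input(), and dict views must be materialized
# before the dict is mutated mid-iteration:
d = {"a": 1, "b": 2}
for k in list(d.keys()):                        # was: d.keys()
    d.pop(k)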
# pipeline_test -from __future__ import print_function + import traceback from pyparsing import nestedExpr from collections import defaultdict @@ -39,35 +39,34 @@ import tarfile import json import subprocess import argparse -import radical.saga as saga sys.path.insert(0, os.path.abspath('.')) -from autosubmit_api.config.basicConfig import BasicConfig -from autosubmit_api.config.config_common import AutosubmitConfig +from ..config.basicConfig import BasicConfig +from ..config.config_common import AutosubmitConfig from bscearth.utils.config_parser import ConfigParserFactory -from autosubmit_api.autosubmit_legacy.job.job_common import Status -from autosubmit_api.git.autosubmit_git import AutosubmitGit -from autosubmit_api.autosubmit_legacy.job.job_list import JobList -from autosubmit_api.autosubmit_legacy.job.job_packages import JobPackageThread -from autosubmit_api.autosubmit_legacy.job.job_package_persistence import JobPackagePersistence -from autosubmit_api.autosubmit_legacy.job.job_list_persistence import JobListPersistenceDb -from autosubmit_api.autosubmit_legacy.job.job_list_persistence import JobListPersistencePkl -from autosubmit_api.autosubmit_legacy.job.job_grouping import JobGrouping +from .job.job_common import Status +from ..git.autosubmit_git import AutosubmitGit +from .job.job_list import JobList +from .job.job_packages import JobPackageThread +from .job.job_package_persistence import JobPackagePersistence +from .job.job_list_persistence import JobListPersistenceDb +from .job.job_list_persistence import JobListPersistencePkl +from .job.job_grouping import JobGrouping from bscearth.utils.log import Log -from autosubmit_api.database.db_common import create_db -from autosubmit_api.experiment.experiment_common import new_experiment -from autosubmit_api.experiment.experiment_common import copy_experiment -from autosubmit_api.database.db_common import delete_experiment -from autosubmit_api.database.db_common import get_autosubmit_version -from autosubmit_api.monitor.monitor import Monitor +from ..database.db_common import create_db +from ..experiment.experiment_common import new_experiment +from ..experiment.experiment_common import copy_experiment +from ..database.db_common import delete_experiment +from ..database.db_common import get_autosubmit_version +from ..monitor.monitor import Monitor from bscearth.utils.date import date2str -from autosubmit_api.notifications.mail_notifier import MailNotifier -from autosubmit_api.notifications.notifier import Notifier -from autosubmit_api.autosubmit_legacy.platforms.saga_submitter import SagaSubmitter -from autosubmit_api.autosubmit_legacy.platforms.paramiko_submitter import ParamikoSubmitter -from autosubmit_api.autosubmit_legacy.job.job_exceptions import WrongTemplateException -from autosubmit_api.autosubmit_legacy.job.job_packager import JobPackager -from autosubmit_api.autosubmit_legacy.platforms.paramiko_platform import ParamikoTimeout +from ..notifications.mail_notifier import MailNotifier +from ..notifications.notifier import Notifier +from .platforms.saga_submitter import SagaSubmitter +from .platforms.paramiko_submitter import ParamikoSubmitter +from .job.job_exceptions import WrongTemplateException +from .job.job_packager import JobPackager +from .platforms.paramiko_platform import ParamikoTimeout """ Main module for autosubmit. 
Only contains an interface class to all functionality implemented on autosubmit """ @@ -77,7 +76,7 @@ try: from configparser import SafeConfigParser except ImportError: # noinspection PyCompatibility - from ConfigParser import SafeConfigParser + from configparser import SafeConfigParser # It is Python dialog available? (optional dependency) try: @@ -820,7 +819,7 @@ class Autosubmit: packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, - "pkl", "job_packages_" + expid + ".db"), 0664) + "pkl", "job_packages_" + expid + ".db"), 0o664) packages_persistence.reset_table(True) job_list_original = Autosubmit.load_job_list( @@ -864,19 +863,17 @@ class Autosubmit: data = json.loads(Autosubmit._create_json(fc)) for date_json in data['sds']: date = date_json['sd'] - jobs_date = filter(lambda j: date2str( - j.date) == date, job_list.get_job_list()) + jobs_date = [j for j in job_list.get_job_list() if date2str( + j.date) == date] for member_json in date_json['ms']: member = member_json['m'] - jobs_member = filter( - lambda j: j.member == member, jobs_date) + jobs_member = [j for j in jobs_date if j.member == member] for chunk_json in member_json['cs']: chunk = int(chunk_json) jobs = jobs + \ - [job for job in filter( - lambda j: j.chunk == chunk, jobs_member)] + [job for job in [j for j in jobs_member if j.chunk == chunk]] elif filter_status: Log.debug( @@ -885,8 +882,7 @@ class Autosubmit: jobs = job_list.get_job_list() else: fs = Autosubmit._get_status(filter_status) - jobs = [job for job in filter( - lambda j: j.status == fs, job_list.get_job_list())] + jobs = [job for job in [j for j in job_list.get_job_list() if j.status == fs]] elif filter_section: ft = filter_section @@ -1083,7 +1079,7 @@ class Autosubmit: if as_conf.get_wrapper_type() != 'none': os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, - expid, "pkl", "job_packages_" + expid + ".db"), 0664) + expid, "pkl", "job_packages_" + expid + ".db"), 0o664) packages = packages_persistence.load() for (exp_id, package_name, job_name) in packages: if package_name not in job_list.packages_dict: @@ -1091,7 +1087,7 @@ class Autosubmit: job_list.packages_dict[package_name].append( job_list.get_job_by_name(job_name)) - for package_name, jobs in job_list.packages_dict.items(): + for package_name, jobs in list(job_list.packages_dict.items()): from job.job import WrapperJob wrapper_job = WrapperJob(package_name, jobs[0].id, Status.SUBMITTED, 0, jobs, None, @@ -1135,7 +1131,7 @@ class Autosubmit: list_prevStatus = [] queuing_jobs = job_list.get_in_queue_grouped_id( platform) - for job_id, job in queuing_jobs.items(): + for job_id, job in list(queuing_jobs.items()): if job_list.job_package_map and job_id in job_list.job_package_map: Log.debug( 'Checking wrapper job with id ' + str(job_id)) @@ -1186,7 +1182,7 @@ class Autosubmit: platform.check_Alljobs( platform_jobs[3], jobs_to_check, as_conf.get_copy_remote_logs()) - for j_Indx in xrange(0, len(platform_jobs[3])): + for j_Indx in range(0, len(platform_jobs[3])): prev_status = platform_jobs[2][j_Indx] job = platform_jobs[3][j_Indx] @@ -1391,19 +1387,17 @@ class Autosubmit: data = json.loads(Autosubmit._create_json(fc)) for date_json in data['sds']: date = date_json['sd'] - jobs_date = filter(lambda j: date2str( - j.date) == date, job_list.get_job_list()) + jobs_date = [j for j in job_list.get_job_list() if date2str( + j.date) == date] for member_json in date_json['ms']: member = 
member_json['m'] - jobs_member = filter( - lambda j: j.member == member, jobs_date) + jobs_member = [j for j in jobs_date if j.member == member] for chunk_json in member_json['cs']: chunk = int(chunk_json) jobs = jobs + \ - [job for job in filter( - lambda j: j.chunk == chunk, jobs_member)] + [job for job in [j for j in jobs_member if j.chunk == chunk]] elif filter_status: Log.debug("Filtering jobs with status {0}", filter_status) @@ -1411,8 +1405,7 @@ class Autosubmit: jobs = job_list.get_job_list() else: fs = Autosubmit._get_status(filter_status) - jobs = [job for job in filter( - lambda j: j.status == fs, job_list.get_job_list())] + jobs = [job for job in [j for j in job_list.get_job_list() if j.status == fs]] elif filter_section: ft = filter_section @@ -1454,7 +1447,7 @@ class Autosubmit: packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, - "pkl", "job_packages_" + expid + ".db"), 0664) + "pkl", "job_packages_" + expid + ".db"), 0o664) packages_persistence.reset_table(True) referenced_jobs_to_remove = set() job_list_wrappers = copy.deepcopy(job_list) @@ -1782,8 +1775,8 @@ class Autosubmit: return False Log.info("Checking remote platforms") - platforms = filter(lambda x: x not in [ - 'local', 'LOCAL'], submitter.platforms) + platforms = [x for x in submitter.platforms if x not in [ + 'local', 'LOCAL']] already_moved = set() backup_files = [] backup_conf = [] @@ -1918,8 +1911,8 @@ class Autosubmit: if submitter.platforms is None: return False - platforms = filter(lambda x: x not in [ - 'local', 'LOCAL'], submitter.platforms) + platforms = [x for x in submitter.platforms if x not in [ + 'local', 'LOCAL']] already_moved = set() for platform in platforms: p = submitter.platforms[platform] @@ -2014,7 +2007,7 @@ class Autosubmit: :param experiment_id: experiment identifier: :type experiment_id: str - :return: user, date created, model, branch, hpc + :return: user, date created, model, branch, hpc :rtype: str, str, str, str, str """ user = "" @@ -2098,14 +2091,14 @@ class Autosubmit: database_filename = 'autosubmit.db' while database_path is None: - database_path = raw_input("Introduce Database path: ") + database_path = input("Introduce Database path: ") database_path = database_path.replace('~', home_path) if not os.path.exists(database_path): Log.error("Database path does not exist.") return False while local_root_path is None: - local_root_path = raw_input("Introduce path to experiments: ") + local_root_path = input("Introduce path to experiments: ") local_root_path = local_root_path.replace('~', home_path) if not os.path.exists(local_root_path): Log.error("Local Root path does not exist.") @@ -2936,24 +2929,23 @@ class Autosubmit: data = json.loads(Autosubmit._create_json(fc)) for date_json in data['sds']: date = date_json['sd'] - jobs_date = filter(lambda j: date2str( - j.date) == date, jobs_filtered) + jobs_date = [j for j in jobs_filtered if date2str( + j.date) == date] for member_json in date_json['ms']: member = member_json['m'] - jobs_member = filter( - lambda j: j.member == member, jobs_date) + jobs_member = [j for j in jobs_date if j.member == member] # for job in filter(lambda j: j.chunk is None, jobs_member): # Autosubmit.change_status(final, final_status, job) for chunk_json in member_json['cs']: chunk = int(chunk_json) - for job in filter(lambda j: j.chunk == chunk and j.synchronize is not None, jobs_date): + for job in [j for j in jobs_date if 
j.chunk == chunk and j.synchronize is not None]: Autosubmit.change_status( final, final_status, job) - for job in filter(lambda j: j.chunk == chunk, jobs_member): + for job in [j for j in jobs_member if j.chunk == chunk]: Autosubmit.change_status( final, final_status, job) @@ -2967,7 +2959,7 @@ class Autosubmit: else: for status in status_list: fs = Autosubmit._get_status(status) - for job in filter(lambda j: j.status == fs, job_list.get_job_list()): + for job in [j for j in job_list.get_job_list() if j.status == fs]: Autosubmit.change_status( final, final_status, job) @@ -3008,7 +3000,7 @@ class Autosubmit: packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, - expid, "pkl", "job_packages_" + expid + ".db"), 0664) + expid, "pkl", "job_packages_" + expid + ".db"), 0o664) packages_persistence.reset_table(True) referenced_jobs_to_remove = set() job_list_wrappers = copy.deepcopy(job_list) @@ -3069,10 +3061,10 @@ class Autosubmit: while True: try: if sys.version_info[0] == 3: - answer = raw_input() + answer = input() else: # noinspection PyCompatibility - answer = raw_input() + answer = input() return strtobool(answer.lower()) except ValueError: sys.stdout.write('Please respond with \'y\' or \'n\'.\n') @@ -3383,7 +3375,7 @@ class Autosubmit: @staticmethod def load_job_list(expid, as_conf, notransitive=False, monitor=False): """ - Process that builds the job list. + Process that builds the job list. """ # print("Load Job List Start") try: @@ -3429,7 +3421,8 @@ class Autosubmit: return None except Exception as e: print(traceback.format_exc()) - print('Exception: ' + str(e)) + print(str(e)) + raise Exception(str(e)) return None # print("Load Job List End") return job_list diff --git a/autosubmit_api/autosubmit_legacy/job/job.py b/autosubmit_api/autosubmit_legacy/job/job.py index 5e588b845886fe05cf00996a02129290c606fdd5..59ca6164a8fcedf65f9669851c17e7971307b0d5 100644 --- a/autosubmit_api/autosubmit_legacy/job/job.py +++ b/autosubmit_api/autosubmit_legacy/job/job.py @@ -28,13 +28,14 @@ import datetime from collections import OrderedDict #from bscearth.utils.config_parser import ConfigParserFactory -from autosubmit_api.monitor.monitor import Monitor -from autosubmit_api.autosubmit_legacy.job.job_common import Status, Type -from autosubmit_api.config.basicConfig import BasicConfig -from autosubmit_api.autosubmit_legacy.job.job_common import StatisticsSnippetBash, StatisticsSnippetPython -from autosubmit_api.autosubmit_legacy.job.job_common import StatisticsSnippetR, StatisticsSnippetEmpty +from ...monitor.monitor import Monitor +from .job_common import Status, Type +from ...config.basicConfig import BasicConfig +from .job_common import StatisticsSnippetBash, StatisticsSnippetPython +from .job_common import StatisticsSnippetR, StatisticsSnippetEmpty #from autosubmitAPIwu.config.config_common import AutosubmitConfig from bscearth.utils.date import date2str, parse_date, previous_day, chunk_end_date, chunk_start_date, Log, subs_dates +from functools import reduce class Job(object): @@ -498,8 +499,7 @@ class Job(object): if already_completed: break already_completed = True - retrial_dates = map(lambda y: parse_date(y) if y != 'COMPLETED' and y != 'FAILED' else y, - retrial_fields) + retrial_dates = [parse_date(y) if y != 'COMPLETED' and y != 'FAILED' else y for y in retrial_fields] retrials_list.insert(0, retrial_dates) return retrials_list @@ -839,7 +839,7 @@ class Job(object): """ 
parameters = self.parameters template_content = self.update_content(as_conf) - for key, value in parameters.items(): + for key, value in list(parameters.items()): template_content = re.sub( '%(?. -from autosubmit_api.autosubmit_legacy.job.job import Job +from .job import Job from bscearth.utils.date import date2str -from autosubmit_api.autosubmit_legacy.job.job_common import Status, Type +from .job_common import Status, Type class DicJobs: diff --git a/autosubmit_api/autosubmit_legacy/job/job_grouping.py b/autosubmit_api/autosubmit_legacy/job/job_grouping.py index 2e3d65ffa11b6ac65f3814f36596a2b4e63b2621..df0d22601cc0c1673e96c4790a9171da24afdb91 100644 --- a/autosubmit_api/autosubmit_legacy/job/job_grouping.py +++ b/autosubmit_api/autosubmit_legacy/job/job_grouping.py @@ -17,7 +17,7 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see <http://www.gnu.org/licenses/>. -from autosubmit_api.autosubmit_legacy.job.job_common import Status +from .job_common import Status from bscearth.utils.date import date2str import copy @@ -31,7 +31,7 @@ class JobGrouping(object): job_list: JobList instance.\n expand_list: A formula that specifies the jobs that should not be grouped (or that should be expanded) e.g. "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]".\n espanded_status: List of status that should not be grouped (e.g. "RUNNING, FAILED") (or that should be expanded). Comma separated or single.\n - """ + """ self.group_by = group_by self.jobs = jobs self.job_list = job_list @@ -40,8 +40,8 @@ class JobGrouping(object): self.expand_status = expanded_status self.automatic = False # Key: Group, Value: Starts as a List(of String) but at the end of the process turns into String. - self.group_status_dict = dict() - # List of jobs that should not be grouped as a result of parsing expand_list + self.group_status_dict = dict() + # List of jobs that should not be grouped as a result of parsing expand_list self.ungrouped_jobs = list() def group_jobs(self): @@ -73,16 +73,16 @@ class JobGrouping(object): # Retrieve dictionary of group -> status list (set) in that group # This dictionary was filled in self._create_groups() - for group, statuses in self.group_status_dict.items(): + for group, statuses in list(self.group_status_dict.items()): # Function that returns the status of the group - # Provides a hierarchy, returns int + # Provides a hierarchy, returns int status = self._set_group_status(statuses) self.group_status_dict[group] = status final_jobs_group = dict() # Iterating the items in the Dictionary that # associates job -> group, dictionary filled in self._create_groups() - for job, groups in jobs_group_dict.items(): + for job, groups in list(jobs_group_dict.items()): for group in groups: # This blacklist is always empty if group not in blacklist: @@ -91,7 +91,7 @@ class JobGrouping(object): # Iterating through old small groups while group in groups_map: # group = new bigger group - group = groups_map[group] + group = groups_map[group] # Testing if group exists # Status is now unique from the process above if group in self.group_status_dict: @@ -109,7 +109,7 @@ class JobGrouping(object): return groups_dict - # Fill self.ungrouped_jobs + # Fill self.ungrouped_jobs def _set_expanded_jobs(self): """ Uses self.expand_list to produce a list of jobs that should not be grouped (or that must be expanded).\n @@ -135,7 +135,7 @@ class JobGrouping(object): out = nestedExpr('[', ']').parseString(text).asList() - depth = lambda L: isinstance(L, list) and max(map(depth, L)) + 1 
+ depth = lambda L: isinstance(L, list) and max(list(map(depth, L))) + 1 if self.group_by == 'date': if depth(out) == 2: @@ -187,7 +187,7 @@ class JobGrouping(object): def _set_group_status(self, statuses): """ - Receives a collection of status.\n + Receives a collection of status.\n :return: Final status.\n :rtype: int """ @@ -227,7 +227,7 @@ class JobGrouping(object): Modifies: self.jobs, self.group_status_dict, [parameter] jobs_group_dict """ # Reverse iteration for some reason, bottom-up approach perhaps - for i in reversed(range(len(self.jobs))): + for i in reversed(list(range(len(self.jobs)))): job = self.jobs[i] groups = [] @@ -237,7 +237,7 @@ class JobGrouping(object): if self.group_by == 'split': if job.split is not None: idx = job.name.rfind("_") - groups.append(job.name[:idx - 1] + job.name[idx + 1:]) + groups.append(job.name[:idx - 1] + job.name[idx + 1:]) elif self.group_by == 'chunk': if job.chunk is not None: # Building group name @@ -246,7 +246,7 @@ class JobGrouping(object): if job.member is not None: groups.append(date2str(job.date, self.date_format) + '_' + job.member) elif self.group_by == 'date': - if job.date is not None: + if job.date is not None: groups.append(date2str(job.date, self.date_format)) # If a group has been created, then current job is used, so it is taken out of the original list. # Modifying an object while iterating it is not really recommended in my opinion @@ -262,9 +262,9 @@ class JobGrouping(object): if group not in self.group_status_dict: self.group_status_dict[group] = set() # Dictionary of group name to a list of status codes (0 -> WAITING) of its jobs - self.group_status_dict[group].add(job.status) - # If status code of job in expand_status list (from input command), then it should not be grouped - # OR + self.group_status_dict[group].add(job.status) + # If status code of job in expand_status list (from input command), then it should not be grouped + # OR # If automatic grouping and group already in the dictionary from above (which is redundant) # AND the length of the value for the key 'group' is greater than 1, meaning that more than 1 status has been added, not allowed for automatic if job.status in self.expand_status or \ @@ -274,12 +274,12 @@ class JobGrouping(object): # Adding the group to the blacklist blacklist.append(group) break - # Is this job in the list of jobs grouped? + # Is this job in the list of jobs grouped? if job.name not in jobs_group_dict: jobs_group_dict[job.name] = list() # Dictionary of jobs to the groups they belong jobs_group_dict[job.name].append(group) - + # This is always false @@ -290,8 +290,8 @@ class JobGrouping(object): """ synchronized = False # Making sure job is a chunk - if job.chunk is not None: - # job.chunk exists but .date and .member don't. Is that possible? + if job.chunk is not None: + # job.chunk exists but .date and .member don't. Is that possible? 
if job.date is None and job.member is None: # Rule: If job.chunk exists, and job.date and job.member are None # then this job is sync @@ -299,7 +299,7 @@ class JobGrouping(object): for date in self.job_list.get_date_list(): # Create group name for every date in experiment group_name = date2str(date, self.date_format) - if self.group_by in ['member', 'chunk']: + if self.group_by in ['member', 'chunk']: for member in self.job_list.get_member_list(): # If group_by is member, add +member to group name group_name += '_' + member @@ -316,7 +316,7 @@ class JobGrouping(object): elif job.member is None: # Rule: If job.chunk exists, and job.member is None, meaning that job.date is not None # then this job is sync - synchronized = True + synchronized = True if self.group_by == 'date': # Adding groups to list groups.append(date2str(job.date, self.date_format)) @@ -343,7 +343,7 @@ class JobGrouping(object): all_jobs = copy.deepcopy(self.jobs) # Try running as split split_groups, split_groups_status = self._create_splits_groups() - + blacklist = list() jobs_group_dict = dict() self.group_status_dict = dict() @@ -351,8 +351,8 @@ class JobGrouping(object): self.group_by = 'chunk' self.jobs = all_jobs self._create_groups(jobs_group_dict, blacklist) - - for group, statuses in self.group_status_dict.items(): + + for group, statuses in list(self.group_status_dict.items()): # Concludes a status for the group status = self._set_group_status(statuses) # Assign that status to the key group in the dictionary @@ -360,9 +360,9 @@ class JobGrouping(object): # At this point groups_map is empty - + # Enter high level thing - self._create_higher_level_group(self.group_status_dict.keys(), groups_map) + self._create_higher_level_group(list(self.group_status_dict.keys()), groups_map) # Using split_groups, split_groups_status that were extracted at the start of the function # Also uses jobs_group_dict that is a result from chunk grouping @@ -372,9 +372,9 @@ class JobGrouping(object): # Check if remaining jobs can be grouped, reversed so it performs bottom-up # Since the last grouping was done by chunk, the list self.jobs still contains those jobs that can be merged into bigger groups # See self._cheate_higher_level_group - for i in reversed(range(len(self.jobs))): - job = self.jobs[i] - for group, status in self.group_status_dict.items(): + for i in reversed(list(range(len(self.jobs)))): + job = self.jobs[i] + for group, status in list(self.group_status_dict.items()): # If the name of the group is contained in the name of the job and they have the same status if group in job.name and status == job.status: # Add the job and assign it to the new group @@ -404,17 +404,17 @@ class JobGrouping(object): """ if split_groups and split_groups_status: group_maps = dict() - for group in self.group_status_dict.keys(): - matching_groups = [split_group for split_group in split_groups_status.keys() if group in split_group] + for group in list(self.group_status_dict.keys()): + matching_groups = [split_group for split_group in list(split_groups_status.keys()) if group in split_group] for matching_group in matching_groups: group_maps[matching_group] = group split_groups_status.pop(matching_group) - for split_group, statuses in split_groups_status.items(): + for split_group, statuses in list(split_groups_status.items()): status = self._set_group_status(statuses) self.group_status_dict[split_group] = status - for job, groups in split_groups.items(): + for job, groups in list(split_groups.items()): final_groups = list() for group in groups: if 
group in group_maps: @@ -440,12 +440,12 @@ class JobGrouping(object): # Iterating starting at second item for group in groups_list[1:]: status = self.group_status_dict[group] - # If the status of the first item in the list of existing groups - # is not the same as the other existing groups, then not valid + # If the status of the first item in the list of existing groups + # is not the same as the other existing groups, then not valid if status != group_status: return False - - # At this point the assignment is valid + + # At this point the assignment is valid for group in groups_list: # Take out of dict the groups that will be merged self.group_status_dict.pop(group) @@ -465,7 +465,7 @@ class JobGrouping(object): groups_map: Empty. Dictionary Key: Old small group, Value: New bigger group. \n Also uses: self.group_status_dict, self.job_list """ - + checked_groups = list() for group in groups_to_check: # This if is not needed, groups_to_check === self.group_status_dict.keys() @@ -486,7 +486,7 @@ class JobGrouping(object): checked_groups.append(new_group) # This one takes the prize for weirdest one yet # Selects from self.group_status_dict (group names) if key contains new_group+'_' - # Meaning that we are counting the number of elements in the list of groups already defined (self.group_status_dict) + # Meaning that we are counting the number of elements in the list of groups already defined (self.group_status_dict) # that a are a subset of new_group+'_' possible_groups = [existing_group for existing_group in list(self.group_status_dict.keys()) if new_group+'_' in existing_group] @@ -499,5 +499,5 @@ class JobGrouping(object): if self._check_valid_group(possible_groups, new_group, groups_map): # Remember that groups_to_check === self.group_status_dict.keys() # So, adding a new key to self.group_status_dict - # An attempt to make recursive, bottom-up + # An attempt to make recursive, bottom-up groups_to_check.append(new_group) \ No newline at end of file diff --git a/autosubmit_api/autosubmit_legacy/job/job_list.py b/autosubmit_api/autosubmit_legacy/job/job_list.py index b2f887de1da78fd0b3f6d7012cab7fd76d94f50a..256fa09dd13a6af78cbf3ad44dc8dfa0d2be0aba 100644 --- a/autosubmit_api/autosubmit_legacy/job/job_list.py +++ b/autosubmit_api/autosubmit_legacy/job/job_list.py @@ -21,7 +21,7 @@ try: from configparser import SafeConfigParser except ImportError: # noinspection PyCompatibility - from ConfigParser import SafeConfigParser + from configparser import SafeConfigParser import json from bscearth.utils.config_parser import ConfigParserFactory @@ -49,30 +49,30 @@ from shutil import move from random import shuffle from dateutil.relativedelta import * -from autosubmit_api.autosubmit_legacy.job.job import Job -from autosubmit_api.config.config_common import AutosubmitConfig +from .job import Job +from ...config.config_common import AutosubmitConfig from bscearth.utils.log import Log -from autosubmit_api.autosubmit_legacy.job.job_dict import DicJobs -from autosubmit_api.autosubmit_legacy.job.job_utils import Dependency -from autosubmit_api.autosubmit_legacy.job.job_utils import SubJob -from autosubmit_api.autosubmit_legacy.job.job_utils import SubJobManager, job_times_to_text, datechunk_to_year -from autosubmit_api.performance.utils import calculate_ASYPD_perjob, calculate_SYPD_perjob -import autosubmit_api.components.jobs.utils as JUtils -from autosubmit_api.monitor.monitor import Monitor -from autosubmit_api.autosubmit_legacy.job.job_common import Status, Type +from .job_dict import DicJobs +from 
.job_utils import Dependency +from .job_utils import SubJob +from .job_utils import SubJobManager, job_times_to_text, datechunk_to_year +from ...performance.utils import calculate_ASYPD_perjob, calculate_SYPD_perjob +from ...components.jobs import utils as JUtils +from ...monitor.monitor import Monitor +from .job_common import Status, Type from bscearth.utils.date import date2str, parse_date, sum_str_hours -import autosubmit_api.experiment.common_db_requests as DbRequests -from autosubmit_api.autosubmit_legacy.job.job_packages import JobPackageSimple, JobPackageArray, JobPackageThread -from autosubmit_api.autosubmit_legacy.job.job_package_persistence import JobPackagePersistence -# from autosubmit_api.autosubmit_legacy.job.tree import Tree -import autosubmit_api.database.db_structure as DbStructure -from autosubmit_api.database.db_jobdata import JobDataStructure, JobRow, ExperimentGraphDrawing -from autosubmit_api.builders.experiment_history_builder import ExperimentHistoryDirector, ExperimentHistoryBuilder -from autosubmit_api.history.data_classes.job_data import JobData +from ...experiment import common_db_requests as DbRequests +from .job_packages import JobPackageSimple, JobPackageArray, JobPackageThread +from .job_package_persistence import JobPackagePersistence +# from autosubmit_legacy.job.tree import Tree +from ...database import db_structure as DbStructure +from ...database.db_jobdata import JobDataStructure, JobRow, ExperimentGraphDrawing +from ...builders.experiment_history_builder import ExperimentHistoryDirector, ExperimentHistoryBuilder +from ...history.data_classes.job_data import JobData from networkx import DiGraph -from autosubmit_api.autosubmit_legacy.job.job_utils import transitive_reduction -from autosubmit_api.common.utils import timestamp_to_datetime_format +from .job_utils import transitive_reduction +from ...common.utils import timestamp_to_datetime_format from typing import List, Dict, Tuple @@ -171,7 +171,7 @@ class JobList: self._date_list = date_list self._member_list = member_list - chunk_list = range(chunk_ini, num_chunks + 1) + chunk_list = list(range(chunk_ini, num_chunks + 1)) self._chunk_list = chunk_list jobs_parser = self._get_jobs_parser() @@ -221,9 +221,10 @@ class JobList: self._ordered_jobs_by_date_member = self._create_sorted_dict_jobs( wrapper_jobs) except AssertionError as e: - raise AssertionError(str(e)) + raise AssertionError("Assertion err:::" + str(e)) except Exception as e: print(e) + raise Exception("here: " + str(e)) @staticmethod def _add_dependencies(date_list, member_list, chunk_list, dic_jobs, jobs_parser, graph, option="DEPENDENCIES"): @@ -243,7 +244,7 @@ class JobList: num_jobs = 1 if isinstance(job, list): num_jobs = len(job) - for i in xrange(num_jobs): + for i in range(num_jobs): _job = job[i] if num_jobs > 1 else job JobList._manage_job_dependencies(dic_jobs, _job, date_list, member_list, chunk_list, dependencies_keys, dependencies, graph) @@ -289,23 +290,23 @@ class JobList: for section_chunk in sections_chunks: info = section_chunk.split('*') if info[0] in key: - for relation in xrange(1, len(info)): + for relation in range(1, len(info)): auxiliar_relation_list = [] for location in info[relation].split('-'): auxiliar_chunk_list = [] location = location.strip('[').strip(']') if ':' in location: if len(location) == 3: - for chunk_number in xrange(int(location[0]), int(location[2]) + 1): + for chunk_number in range(int(location[0]), int(location[2]) + 1): auxiliar_chunk_list.append( chunk_number) elif len(location) == 2: if ':' 
== location[0]: - for chunk_number in xrange(0, int(location[1]) + 1): + for chunk_number in range(0, int(location[1]) + 1): auxiliar_chunk_list.append( chunk_number) elif ':' == location[1]: - for chunk_number in xrange(int(location[0]) + 1, len(dic_jobs._chunk_list) - 1): + for chunk_number in range(int(location[0]) + 1, len(dic_jobs._chunk_list) - 1): auxiliar_chunk_list.append( chunk_number) elif ',' in location: @@ -374,12 +375,10 @@ class JobList: if dependency.delay == -1 or chunk > dependency.delay: if isinstance(parent, list): if job.split is not None: - parent = filter( - lambda _parent: _parent.split == job.split, parent)[0] + parent = [_parent for _parent in parent if _parent.split == job.split][0] else: if dependency.splits is not None: - parent = filter( - lambda _parent: _parent.split in dependency.splits, parent) + parent = [_parent for _parent in parent if _parent.split in dependency.splits] if len(dependency.select_chunks_dest) <= 0 or parent.chunk is None: job.add_parent(parent) JobList._add_edge(graph, job, parent) @@ -503,8 +502,7 @@ class JobList: dict_jobs[date][member] = list() num_chunks = len(self._chunk_list) - filtered_jobs_list = filter( - lambda job: job.section in wrapper_jobs, self._job_list) + filtered_jobs_list = [job for job in self._job_list if job.section in wrapper_jobs] filtered_jobs_fake_date_member, fake_original_job_map = self._create_fake_dates_members( filtered_jobs_list) @@ -517,8 +515,8 @@ class JobList: for date in self._date_list: str_date = self._get_date(date) for member in self._member_list: - sorted_jobs_list = filter(lambda job: job.name.split("_")[1] == str_date and - job.name.split("_")[2] == member, filtered_jobs_fake_date_member) + sorted_jobs_list = [job for job in filtered_jobs_fake_date_member if job.name.split("_")[1] == str_date and + job.name.split("_")[2] == member] previous_job = sorted_jobs_list[0] section_running_type = sections_running_type_map[previous_job.section] @@ -923,7 +921,7 @@ class JobList: """ if os.path.exists(filename): fd = open(filename, 'rw') - return pickle.load(fd) + return pickle.load(fd, encoding="latin1") else: Log.critical('File {0} does not exist'.format(filename)) return list() @@ -935,7 +933,6 @@ class JobList: :return: loaded job list object :rtype: JobList """ - # Log.info("Loading JobList") return self._persistence.load(self._persistence_path, self._persistence_file) def save(self): @@ -1077,7 +1074,7 @@ class JobList: # str(len(current_structure.keys()))) structure_valid = False - if ((current_structure) and (len(self._job_list) == len(current_structure.keys()))): + if ((current_structure) and (len(self._job_list) == len(list(current_structure.keys())))): structure_valid = True # print(current_structure.keys()) # Structure exists and is valid, use it as a source of dependencies @@ -1380,7 +1377,7 @@ class JobList: for job in job_list: original_job_name = job.job_name job.job_name = job.job_name + \ - ("+" * len(filter(lambda x: x == job.job_name, already_included)) + ("+" * len([x for x in already_included if x == job.job_name]) if job.job_name in already_included else "") already_included.append(original_job_name) @@ -1442,21 +1439,21 @@ class JobList: (key, member), []) # local_short_list = filter( # lambda x: x.date == key and x.member == member, jobs) - local_list = filter(lambda x: ( - str(x.date) == str(key) and str(x.section) == str(member)) or x in current_list, job_list) - print("Local list {} for {} - {}".format(len(local_list), key, member)) + local_list = [x for x in job_list if ( + 
str(x.date) == str(key) and str(x.section) == str(member)) or x in current_list] + print(("Local list {} for {} - {}".format(len(local_list), key, member))) date_member_groups[(key, member)] = sorted( - local_list, key=lambda x: x.chunk) + local_list, key=lambda x: x.chunk if x.chunk is not None else 0) added_job_names.update({job.job_name for job in local_list}) # print(local_list[0].name) # jobs = [job for job in jobs if job not in local_short_list] # jobs.extend(date_member_repetition[(date,member)]) # jobs -= local_list - print("Spent in main: " + str(time() - start_time)) + print(("Spent in main: " + str(time() - start_time))) # Printing date - member groups / date - chunk syncs # Working with date-member groups - for date in dates.keys(): + for date in list(dates.keys()): date_member = list() all_suspended = True all_waiting = True @@ -1566,7 +1563,7 @@ class JobList: running = 0 failed = 0 date_member = [] - local_list = filter(lambda x: x.date == date, jobs) + local_list = [x for x in jobs if x.date == date] if len(local_list) > 0: # already_included = [] for job in local_list: @@ -1641,7 +1638,7 @@ class JobList: # current_title = current_title + source job_name_to_job_title[job.job_name] = current_title job.job_name = job.job_name + \ - ("+" * len(filter(lambda x: x == job.job_name, already_included)) + ("+" * len([x for x in already_included if x == job.job_name]) if job.job_name in already_included else "") floating_around.append( {'title': current_title, @@ -1665,7 +1662,7 @@ class JobList: running = 0 failed = 0 job_objects = sorted([job_name_to_job[name] for name in jobs_in_package if job_name_to_job.get( - name, None)], key=lambda x: x.chunk) + name, None)], key=lambda x: x.chunk if x.chunk is not None else 0) # job_objects = sorted([job for k, job in job_name_to_job.items( # ) if k in jobs_in_package], key=lambda x: x.chunk) jobs_in_wrapper = [] @@ -1692,7 +1689,7 @@ class JobList: # current_title = current_title + source # Individual Job in wrapper jobs_in_wrapper.append({'title': current_title, - 'refKey': job.job_name + ("+" * len(filter(lambda x: x == job.job_name, already_included)) if job.job_name in already_included else ""), + 'refKey': job.job_name + ("+" * len([x for x in already_included if x == job.job_name]) if job.job_name in already_included else ""), 'data': 'Empty', 'children': []}) already_included.append(job.job_name) @@ -1838,7 +1835,7 @@ class JobList: print("Start update job logs.") time_0 = time() self.update_job_logs(path_to_logs) - print("Update logs time {0}".format(time() - time_0)) + print(("Update logs time {0}".format(time() - time_0))) allJobs = self._job_list # path_local_root = BasicConfig.LOCAL_ROOT_DIR # db_file = os.path.join(path_local_root, "ecearth.db") @@ -1878,8 +1875,8 @@ class JobList: # Determine if some jobs with date but no member belong to a date-member group for date in dates: # dates.keys(): - local_list = filter(lambda x: x.date == - date and x.member == None, jobs) + local_list = [x for x in jobs if x.date == + date and x.member == None] for job in local_list: # Perhaps I exaggerated in my search for optimization parents_members = {parent.member for parent in job._parents} @@ -1901,15 +1898,15 @@ class JobList: (key, member), []) # local_short_list = filter( # lambda x: x.date == key and x.member == member, jobs) - local_list = filter(lambda x: ( - x.date == key and x.member == member) or x in current_list, jobs) + local_list = [x for x in jobs if ( + x.date == key and x.member == member) or x in current_list] 
date_member_groups[(key, member)] = sorted( - local_list, key=lambda x: x.chunk) + local_list, key=lambda x: x.chunk if x.chunk is not None else 0) added_job_names.update({job.name for job in local_list}) # print("Spent in main: " + str(time() - start_time)) # Printing date - member groups / date - chunk syncs # Working with date-member groups - for date in dates.keys(): + for date in list(dates.keys()): date_member = list() all_suspended = True all_waiting = True @@ -1997,7 +1994,7 @@ class JobList: # added_job_names.add(job.name) # If there are section folders, we add them to children member to the left reversed_date_member_section_jobs = OrderedDict( - reversed(date_member_section_jobs.items())) + reversed(list(date_member_section_jobs.items()))) for section_folder in reversed_date_member_section_jobs: children_member.appendleft({ 'title': section_folder, @@ -2060,7 +2057,7 @@ class JobList: running = 0 failed = 0 date_member = [] - local_list = filter(lambda x: x.date == date, jobs) + local_list = [x for x in jobs if x.date == date] if len(local_list) > 0: for job in local_list: if job.status == Status.COMPLETED: @@ -2148,7 +2145,7 @@ class JobList: running = 0 failed = 0 job_objects = sorted([job_name_to_job[name] for name in jobs_in_package if job_name_to_job.get( - name, None)], key=lambda x: x.chunk) + name, None)], key=lambda x: x.chunk if x.chunk is not None else 0) # job_objects = sorted([job for k, job in job_name_to_job.items( # ) if k in jobs_in_package], key=lambda x: x.chunk) jobs_in_wrapper = [] @@ -2418,7 +2415,7 @@ class JobList: BasicConfig, self.expid, [job.name for job in allJobs]) group = 1 - for package in package_to_symbol.keys(): + for package in list(package_to_symbol.keys()): package_to_group_number[package] = group group += 1 @@ -2430,7 +2427,7 @@ class JobList: job_running_to_min, job_running_to_runtext, _ = JobList.get_job_times_collection( BasicConfig, allJobs, self.expid, job_to_package, package_to_jobs) - print("Spent in times: " + str(time() - start_time_operation)) + print(("Spent in times: " + str(time() - start_time_operation))) # Adding edges for job in allJobs: @@ -2440,7 +2437,7 @@ class JobList: maxChildren = num_children if num_parent > maxParent: maxParent = num_parent - if job._children > 0: + if len(job._children) > 0: # current_group = job_to_package.get(job.name, None) # job_to_package[job.name] if job.name in job_to_package else None for child in job._children: if ((node_id[job.name], node_id[child.name]) not in raw_edges): @@ -2506,21 +2503,20 @@ class JobList: start_time = time() # Testing Barycentric max_level = max([job.level for job in allJobs]) - print("Levels " + str(max_level)) + print(("Levels " + str(max_level))) # min_level = min([job.level for job in allJobs]) # Assuming level starts at 1 for i in range(2, max_level + 1): # Order for first layer if i == 2: - jobs_layer_previous = filter( - lambda x: x.level == i - 1, allJobs) + jobs_layer_previous = [x for x in allJobs if x.level == i - 1] k = 1 for job_prev in jobs_layer_previous: job_prev.h_order = k k = k + 1 - jobs_layer = filter(lambda x: x.level == i, allJobs) + jobs_layer = [x for x in allJobs if x.level == i] # print("Level " + str(i) + " ~ " + str(len(jobs_layer))) for job in jobs_layer: @@ -2541,11 +2537,10 @@ class JobList: already_assigned_order.append(jobs_layer[j - 1].name) jobs_layer[j - 1].h_order = len(already_assigned_order) + 1 - if jobs_layer[j - 1].name in job_to_package.keys(): + if jobs_layer[j - 1].name in list(job_to_package.keys()): jobs_in_wrapper = 
package_to_jobs[job_to_package[jobs_layer[j - 1].name]] # jobs_in_wrapper.sort(key=lambda x: x.barycentric_value) - jobs_obj_in_wrapper = filter( - lambda x: x.name in jobs_in_wrapper, allJobs) + jobs_obj_in_wrapper = [x for x in allJobs if x.name in jobs_in_wrapper] jobs_obj_in_wrapper.sort( key=lambda x: x.barycentric_value) subcount = len(already_assigned_order) + 2 @@ -2565,7 +2560,7 @@ class JobList: mainCoordinates[job.name] = ( job.h_order * resize_x, job.level * resize_y) # print(job.name + ": " + str(job.h_order) + ", x: " + str(job.h_order * resize) + ", y: " + str(job.level * resize) + " ~ baryval " + str(job.barycentric_value)) - print("Seconds spent in Barycentric: " + str(time() - start_time)) + print(("Seconds spent in Barycentric: " + str(time() - start_time))) elif (layout == 'standard' or layout == 'laplacian'): # Spectral Drawing of coordinates print("Start Construction Laplacian") @@ -2589,8 +2584,8 @@ class JobList: for i in range(len(x_coords)): mainCoordinates[allJobs[i].name] = (x_coords[i], y_coords[i]) - print("Seconds Spent in Laplacian: " + - str(time() - start_time_operation)) + print(("Seconds Spent in Laplacian: " + + str(time() - start_time_operation))) # ASYPD : POST jobs in experiment post_jobs = [job for job in allJobs if job.section == @@ -2601,7 +2596,7 @@ class JobList: .run_time for job in post_jobs if job_running_to_min.get(job.name, None) is not None) / len(post_jobs), 2) for job in allJobs: - if (len(mainCoordinates.keys())) > 0: + if (len(list(mainCoordinates.keys()))) > 0: x, y = mainCoordinates[job.name] else: x, y = 0, 0 @@ -2619,8 +2614,8 @@ class JobList: path_to_logs, job.err) if job.err != "NA" else None # min_q, min_r, status_retrieved, energy = job_running_to_min[job.name] if job.name in list( # job_running_to_min.keys()) else (-1, -1, "UNKNOWN", 0) - job_info = job_running_to_min[job.name] if job.name in job_running_to_min.keys( - ) else None + job_info = job_running_to_min[job.name] if job.name in list(job_running_to_min.keys( + )) else None ini_date, end_date = JobList.date_plus(job.date, chunk_unit, job.chunk, chunk_size) if job.date is not None else ( date2str(job.date, self.get_date_format), "") nodes.append({'id': job.name, @@ -2639,7 +2634,7 @@ class JobList: 'section': job.section, 'queue': job.queue, 'level': job.level, - 'dashed': True if job.name in job_to_package.keys() else False, + 'dashed': True if job.name in list(job_to_package.keys()) else False, 'shape': package_to_symbol[job_to_package[job.name]] if job.name in job_to_package else 'dot', 'processors': job.processors, 'wallclock': job.wallclock, @@ -2677,8 +2672,7 @@ class JobList: final_color = Monitor.color_status(Status.WAITING) group_name = self._expid + "_" + \ str(dates[date]) + "_" + member + "_" - local_list = filter( - lambda x: x.name.startswith(group_name), allJobs) + local_list = [x for x in allJobs if x.name.startswith(group_name)] # if group_name not in list(list_groups.keys()): # list_groups[group_name] = list() # list_groups.setdefault(group_name, []) @@ -2757,8 +2751,7 @@ class JobList: str(dates[date]) + "_" + \ member + "_" + str(chunk) + "_" # print("Group Name: {}".format(group_name)) - specific_list = filter( - lambda x: x.name.startswith(group_name), allJobs) + specific_list = [x for x in allJobs if x.name.startswith(group_name)] if len(specific_list) > 0: failed_count = sum( 1 for x in specific_list if x.status == Status.FAILED) @@ -2822,7 +2815,7 @@ class JobList: # for item in list_groups: # print("{} -> x: {}, y: {}".format(item, 
list_groups[item]["x"], list_groups[item]["y"])) except Exception as exp: - print(traceback.format_exc()) + print((traceback.format_exc())) print(exp) elif grouped == 'status': # status_list_waiting = filter(lambda x: x.status == Status.WAITING and x.packed == False, allJobs) @@ -2943,7 +2936,7 @@ class JobList: if statusChange is not None: result += " with " + bcolors.OKGREEN + \ - str(len(statusChange.keys())) + \ + str(len(list(statusChange.keys()))) + \ " Changes ##" + bcolors.ENDC + bcolors.ENDC else: result += "## " + bcolors.ENDC @@ -3264,7 +3257,7 @@ class JobList: finish_time = 0 except Exception as exp: - print(traceback.format_exc()) + print((traceback.format_exc())) return seconds_queued = seconds_queued * \ @@ -3351,14 +3344,14 @@ class JobList: # package_to_jobs[package_name].append(name) for key in package_to_jobs: package_to_package_id[key] = key.split("_")[2] - list_packages = job_to_package.values() + list_packages = list(job_to_package.values()) for i in range(len(list_packages)): if i % 2 == 0: package_to_symbol[list_packages[i]] = 'square' else: package_to_symbol[list_packages[i]] = 'hexagon' except Exception as ex: - print(traceback.format_exc()) + print((traceback.format_exc())) return (job_to_package, package_to_jobs, package_to_package_id, package_to_symbol) diff --git a/autosubmit_api/autosubmit_legacy/job/job_list_persistence.py b/autosubmit_api/autosubmit_legacy/job/job_list_persistence.py index decb973749f8441e36018552fb57e36331000c32..af410f89ea02473de29230d32cec3a016251e4ea 100644 --- a/autosubmit_api/autosubmit_legacy/job/job_list_persistence.py +++ b/autosubmit_api/autosubmit_legacy/job/job_list_persistence.py @@ -22,7 +22,7 @@ import pickle import os from bscearth.utils.log import Log -from autosubmit_api.database.db_manager import DbManager +from ...database.db_manager import DbManager class JobListPersistence(object): @@ -73,8 +73,8 @@ class JobListPersistencePkl(JobListPersistence): if file_size <= 6: raise AssertionError( "Your pkl file seems to be empty. Make sure that you are running Autosubmit. See the FAQ tab for more information.") - fd = open(path, 'r') - return pickle.load(fd) + fd = open(path, 'rb') + return pickle.load(fd, encoding="latin1") else: raise AssertionError( "The pkl file of your experiment does not exist. Make sure that you are running Autosubmit. Try again, if the error keeps happening see the FAQ tab for more information.") diff --git a/autosubmit_api/autosubmit_legacy/job/job_package_persistence.py b/autosubmit_api/autosubmit_legacy/job/job_package_persistence.py index 89d8ed423fdbc621951200d4bda4a90a1128ba34..acc5bbb5bdec8ab178d2d4843ca139d520979171 100644 --- a/autosubmit_api/autosubmit_legacy/job/job_package_persistence.py +++ b/autosubmit_api/autosubmit_legacy/job/job_package_persistence.py @@ -20,7 +20,7 @@ import os from bscearth.utils.log import Log -from autosubmit_api.database.db_manager import DbManager +from ...database.db_manager import DbManager class JobPackagePersistence(object): diff --git a/autosubmit_api/autosubmit_legacy/job/job_packager.py b/autosubmit_api/autosubmit_legacy/job/job_packager.py index 215f5cdbb1b8fedc9f15ab6fae1ee08e5dd4dc62..8cb1fce1f0045befba36929fbfa7777e7321263b 100644 --- a/autosubmit_api/autosubmit_legacy/job/job_packager.py +++ b/autosubmit_api/autosubmit_legacy/job/job_packager.py @@ -18,9 +18,9 @@ # along with Autosubmit. If not, see . 
from bscearth.utils.log import Log -from autosubmit_api.autosubmit_legacy.job.job_common import Status, Type +from .job_common import Status, Type from bscearth.utils.date import sum_str_hours -from autosubmit_api.autosubmit_legacy.job.job_packages import JobPackageSimple, JobPackageVertical, JobPackageHorizontal, \ +from .job_packages import JobPackageSimple, JobPackageVertical, JobPackageHorizontal, \ JobPackageSimpleWrapped, JobPackageHorizontalVertical, JobPackageVerticalHorizontal from operator import attrgetter from math import ceil @@ -308,8 +308,8 @@ class JobPackagerVerticalMixed(JobPackagerVertical): self.ready_job = ready_job self.dict_jobs = dict_jobs - date = dict_jobs.keys()[-1] - member = dict_jobs[date].keys()[-1] + date = list(dict_jobs.keys())[-1] + member = list(dict_jobs[date].keys())[-1] if ready_job.date is not None: date = ready_job.date if ready_job.member is not None: @@ -462,5 +462,5 @@ class JobPackagerHorizontal(object): self._components_dict[job.section] = dict() self._components_dict[job.section]['COMPONENTS'] = {parameter: job.parameters[parameter] - for parameter in job.parameters.keys() + for parameter in list(job.parameters.keys()) if '_NUMPROC' in parameter } diff --git a/autosubmit_api/autosubmit_legacy/job/job_packages.py b/autosubmit_api/autosubmit_legacy/job/job_packages.py index e0a13552205da27c236926713fef4a037b32ce35..7255779f065779b2fbb42e93d86d956f769875ef 100644 --- a/autosubmit_api/autosubmit_legacy/job/job_packages.py +++ b/autosubmit_api/autosubmit_legacy/job/job_packages.py @@ -22,15 +22,12 @@ -try: - from configparser import SafeConfigParser -except ImportError: - # noinspection PyCompatibility - from ConfigParser import SafeConfigParser +# configparser is the Python 3 name of Python 2's ConfigParser module, so the try/except fallback is no longer needed +from configparser import SafeConfigParser import os import time import random -from autosubmit_api.autosubmit_legacy.job.job_common import Status +from .job_common import Status from bscearth.utils.log import Log -from autosubmit_api.autosubmit_legacy.job.job_exceptions import WrongTemplateException -from autosubmit_api.autosubmit_legacy.job.job import Job +from .job_exceptions import WrongTemplateException +from .job import Job from bscearth.utils.date import sum_str_hours diff --git a/autosubmit_api/autosubmit_legacy/job/job_utils.py b/autosubmit_api/autosubmit_legacy/job/job_utils.py index 7cd98800c8ebdc982a05ce156c104f45abbb3614..fd80be04f1c63d46247e6cc0042b6f1c6d6f25ac 100644 --- a/autosubmit_api/autosubmit_legacy/job/job_utils.py +++ b/autosubmit_api/autosubmit_legacy/job/job_utils.py @@ -26,8 +26,8 @@ from networkx.algorithms.dag import is_directed_acyclic_graph from networkx import DiGraph from networkx import dfs_edges from networkx import NetworkXError -from autosubmit_api.autosubmit_legacy.job.job_package_persistence import JobPackagePersistence -from autosubmit_api.config.basicConfig import BasicConfig +from .job_package_persistence import JobPackagePersistence +from ...config.basicConfig import BasicConfig def transitive_reduction(graph): @@ -51,7 +51,7 @@ def get_job_package_code(expid, job_name): """ Finds the package code and retrieves it. None if no package. 
- :param BasicConfig: Basic configuration + :param BasicConfig: Basic configuration :type BasicConfig: Configuration Object :param expid: Experiment Id :type expid: String @@ -70,7 +70,7 @@ def get_job_package_code(expid, job_name): for exp, package_name, _job_name in packages: if job_name == _job_name: code = int(package_name.split("_")[2]) - return code + return code except: pass return 0 @@ -143,7 +143,7 @@ class SubJobManager(object): def process_index(self): """ - Builds a dictionary of jobname -> SubJob object. + Builds a dictionary of jobname -> SubJob object. """ for subjob in self.subjobList: self.subjobindex[subjob.name] = subjob @@ -152,7 +152,7 @@ class SubJobManager(object): """ """ if (self.job_to_package) and (self.package_to_jobs): - if(self.current_structure) and len(self.current_structure.keys()) > 0: + if(self.current_structure) and len(list(self.current_structure.keys())) > 0: # Structure exists new_queues = dict() fixes_applied = dict() @@ -161,8 +161,8 @@ class SubJobManager(object): local_structure = dict() # SubJob Name -> SubJob Object local_index = dict() - subjobs_in_package = filter(lambda x: x.package == - package, self.subjobList) + subjobs_in_package = [x for x in self.subjobList if x.package == + package] local_jobs_in_package = [job for job in subjobs_in_package] # Build index for sub in local_jobs_in_package: @@ -203,7 +203,7 @@ class SubJobManager(object): new_queues[sub_children_name] = fixed_queue_time # print(new_queues[sub_name]) - for key, value in new_queues.items(): + for key, value in list(new_queues.items()): self.subjobindex[key].queue = value # print("{} : {}".format(key, value)) for name in fixes_applied: @@ -213,8 +213,8 @@ class SubJobManager(object): # There is no structure for package in self.package_to_jobs: # Filter only jobs in the current package - filtered = filter(lambda x: x.package == - package, self.subjobList) + filtered = [x for x in self.subjobList if x.package == + package] # Order jobs by total time (queue + run) filtered = sorted( filtered, key=lambda x: x.total, reverse=False) @@ -308,9 +308,9 @@ def parse_output_number(self, string_number): def job_times_to_text(minutes_queue, minutes_running, status): """ Return text correpsonding to queue and running time - :param minutes_queue: seconds queuing (actually using seconds) + :param minutes_queue: seconds queuing (actually using seconds) :type minutes_queue: int - :param minutes_running: seconds running (actually using seconds) + :param minutes_running: seconds running (actually using seconds) :type minutes_running: int :param status: current status :type status: string @@ -339,9 +339,9 @@ def datechunk_to_year(chunk_unit, chunk_size): """ Gets chunk unit and size and returns the value in years - :return: years + :return: years :rtype: float - """ + """ chunk_size = chunk_size * 1.0 options = ["year", "month", "day", "hour"] if (chunk_unit == "year"): @@ -371,9 +371,9 @@ def tostamp(string_date): # def calculate_SYPD_perjob(chunk_unit, chunk_size, job_chunk, run_time): # # type: (str, int, int, int) -> float # """ -# :param chunk_unit: -# :param chunk_size: -# :param job_chunk: +# :param chunk_unit: +# :param chunk_size: +# :param job_chunk: # :param run_time: # """ # if job_chunk and job_chunk > 0: @@ -385,10 +385,10 @@ def tostamp(string_date): # def calculate_ASYPD_perjob(chunk_unit, chunk_size, job_chunk, queue_run_time, average_post): # # type: (str, int, int, int, float) -> float # """ -# :param chunk_unit: -# :param chunk_size: -# :param job_chunk: -# :param 
queue_run_time: +# :param chunk_unit: +# :param chunk_size: +# :param job_chunk: +# :param queue_run_time: # :param average_post: # """ # if job_chunk and job_chunk > 0: diff --git a/autosubmit_api/autosubmit_legacy/platforms/ecmwf_adaptor.py b/autosubmit_api/autosubmit_legacy/platforms/ecmwf_adaptor.py index e8509a23b9a2c40a1ab92f9d1f39adb94c486d1c..792d7c0f4a6c99dd0e24413af1449e033591e35e 100644 --- a/autosubmit_api/autosubmit_legacy/platforms/ecmwf_adaptor.py +++ b/autosubmit_api/autosubmit_legacy/platforms/ecmwf_adaptor.py @@ -5,7 +5,6 @@ import re import os import time import threading -import radical.saga as saga # noinspection PyPackageRequirements import radical.utils.threads as sut @@ -53,7 +52,7 @@ class _job_state_monitor(threading.Thread): # do bulk updates here! we don't want to pull information # job by job. that would be too inefficient! jobs = self.js.jobs - job_keys = jobs.keys() + job_keys = list(jobs.keys()) for job in job_keys: # if the job hasn't been started, we can't update its @@ -365,7 +364,7 @@ class ECMWFJobService(saga.adaptors.cpi.job.Service): log_error_and_raise(message, saga.NoSuccess, self._logger) else: lines = out.split("\n") - lines = filter(lambda l: l != '', lines) # remove empty + lines = [l for l in lines if l != ''] # remove empty self._logger.info('ecaccess-job-submit: %s' % ''.join(lines)) @@ -403,7 +402,7 @@ class ECMWFJobService(saga.adaptors.cpi.job.Service): if jd.environment is not None: variable_list = '' - for key in jd.environment.keys(): + for key in list(jd.environment.keys()): variable_list += "%s=%s;" % (key, jd.environment[key]) loadl_params += "#@ environment = %s \n" % variable_list diff --git a/autosubmit_api/autosubmit_legacy/platforms/ecplatform.py b/autosubmit_api/autosubmit_legacy/platforms/ecplatform.py index 88d68f8ecc3545f78ab6c1a4b36eca33fd623986..6d0b091255323bd5c910d398d5c5bbcf453566a5 100644 --- a/autosubmit_api/autosubmit_legacy/platforms/ecplatform.py +++ b/autosubmit_api/autosubmit_legacy/platforms/ecplatform.py @@ -20,13 +20,13 @@ import os import subprocess -from autosubmit_api.autosubmit_legacy.platforms.paramiko_platform import ParamikoPlatform, ParamikoPlatformException +from .paramiko_platform import ParamikoPlatform, ParamikoPlatformException from bscearth.utils.log import Log -from autosubmit_api.autosubmit_legacy.platforms.headers.ec_header import EcHeader -from autosubmit_api.autosubmit_legacy.platforms.headers.ec_cca_header import EcCcaHeader -from autosubmit_api.autosubmit_legacy.platforms.headers.slurm_header import SlurmHeader -from autosubmit_api.autosubmit_legacy.platforms.wrappers.wrapper_factory import EcWrapperFactory +from .headers.ec_header import EcHeader +from .headers.ec_cca_header import EcCcaHeader +from .headers.slurm_header import SlurmHeader +from .wrappers.wrapper_factory import EcWrapperFactory class EcPlatform(ParamikoPlatform): diff --git a/autosubmit_api/autosubmit_legacy/platforms/locplatform.py b/autosubmit_api/autosubmit_legacy/platforms/locplatform.py index cac2a42bb9e3e5576948e523348bac261e40d542..043f1c5b0b394bdea8bc3a59f45c039cebd65587 100644 --- a/autosubmit_api/autosubmit_legacy/platforms/locplatform.py +++ b/autosubmit_api/autosubmit_legacy/platforms/locplatform.py @@ -21,10 +21,10 @@ import os from xml.dom.minidom import parseString import subprocess -from autosubmit_api.autosubmit_legacy.platforms.paramiko_platform import ParamikoPlatform -from autosubmit_api.autosubmit_legacy.platforms.headers.local_header import LocalHeader +from .paramiko_platform import 
ParamikoPlatform +from .headers.local_header import LocalHeader -from autosubmit_api.config.basicConfig import BasicConfig +from ...config.basicConfig import BasicConfig from bscearth.utils.log import Log diff --git a/autosubmit_api/autosubmit_legacy/platforms/lsfplatform.py b/autosubmit_api/autosubmit_legacy/platforms/lsfplatform.py index 62167bcf5f8ba5d6c071a8855d1642c6bcc933c7..96a94bd3085efdf64b72d27454b8cad37a0a10ea 100644 --- a/autosubmit_api/autosubmit_legacy/platforms/lsfplatform.py +++ b/autosubmit_api/autosubmit_legacy/platforms/lsfplatform.py @@ -19,9 +19,9 @@ import os -from autosubmit_api.autosubmit_legacy.platforms.paramiko_platform import ParamikoPlatform -from autosubmit_api.autosubmit_legacy.platforms.headers.lsf_header import LsfHeader -from autosubmit_api.autosubmit_legacy.platforms.wrappers.wrapper_factory import LSFWrapperFactory +from .paramiko_platform import ParamikoPlatform +from .headers.lsf_header import LsfHeader +from .wrappers.wrapper_factory import LSFWrapperFactory class LsfPlatform(ParamikoPlatform): @@ -85,7 +85,7 @@ class LsfPlatform(ParamikoPlatform): return output.split('<')[1].split('>')[0] def jobs_in_queue(self): - return zip(*[line.split() for line in ''.split('\n')])[0][1:] + return list(zip(*[line.split() for line in ''.split('\n')]))[0][1:] def get_checkjob_cmd(self, job_id): return self._checkjob_cmd + str(job_id) diff --git a/autosubmit_api/autosubmit_legacy/platforms/mn_adaptor.py b/autosubmit_api/autosubmit_legacy/platforms/mn_adaptor.py index 0138e9f3882cfbb33599a4009c17227daf8cda4d..42bb2cb0ea0b7c3e45a134e397dcc06fbbebe7c1 100644 --- a/autosubmit_api/autosubmit_legacy/platforms/mn_adaptor.py +++ b/autosubmit_api/autosubmit_legacy/platforms/mn_adaptor.py @@ -10,7 +10,6 @@ from cgi import parse_qs # noinspection PyPackageRequirements import radical.utils.threads as sut -import radical.saga as saga import saga.url as surl import saga.utils.pty_shell import saga.adaptors.base @@ -53,7 +52,7 @@ class _job_state_monitor(threading.Thread): # do bulk updates here! we don't want to pull information # job by job. that would be too inefficient! jobs = self.js.jobs - job_keys = jobs.keys() + job_keys = list(jobs.keys()) for job in job_keys: # if the job hasn't been started, we can't update its @@ -134,7 +133,7 @@ def _mnscript_generator(jd, queue=None): if jd.environment is not None: env_variable_list = "export " - for key in jd.environment.keys(): + for key in list(jd.environment.keys()): env_variable_list += " %s=%s " % (key, jd.environment[key]) else: env_variable_list = "" @@ -409,7 +408,7 @@ class MNJobService(saga.adaptors.cpi.job.Service): # 'query' component of the job service URL. if rm_url.query is not None: # noinspection PyDeprecation - for key, val in parse_qs(rm_url.query).iteritems(): + for key, val in parse_qs(rm_url.query).items(): if key == 'queue': self.queue = val[0] elif key == 'span': @@ -549,7 +548,7 @@ class MNJobService(saga.adaptors.cpi.job.Service): # parse the job id. 
bsub's output looks like this: # Job <901545> is submitted to queue lines = out.split("\n") - lines = filter(lambda l: l != '', lines) # remove empty + lines = [l for l in lines if l != ''] # remove empty self._logger.info('bsub: %s' % ''.join(lines)) diff --git a/autosubmit_api/autosubmit_legacy/platforms/paramiko_platform.py b/autosubmit_api/autosubmit_legacy/platforms/paramiko_platform.py index 82f2fd4b24250e08994635a10cc882d5c54d9ec7..0105876a3f21b94fd9ca98368b8c11a635d9f683 100644 --- a/autosubmit_api/autosubmit_legacy/platforms/paramiko_platform.py +++ b/autosubmit_api/autosubmit_legacy/platforms/paramiko_platform.py @@ -7,9 +7,9 @@ import time import select from bscearth.utils.log import Log -from autosubmit_api.autosubmit_legacy.job.job_common import Status -from autosubmit_api.autosubmit_legacy.job.job_common import Type -from autosubmit_api.autosubmit_legacy.platforms.platform import Platform +from ..job.job_common import Status +from ..job.job_common import Type +from .platform import Platform from bscearth.utils.date import date2str class ParamikoTimeout(Exception): @@ -426,7 +426,7 @@ class ParamikoPlatform(Platform): :return: True if executed, False if failed :rtype: bool """ - + if self._ssh is None: if not self.connect(): return None diff --git a/autosubmit_api/autosubmit_legacy/platforms/paramiko_submitter.py b/autosubmit_api/autosubmit_legacy/platforms/paramiko_submitter.py index abd51b4e13b7f34a0449be3cc08c1af823c9a015..1290b9641bc947d9b80a0fc1462a03f07dba1f2f 100644 --- a/autosubmit_api/autosubmit_legacy/platforms/paramiko_submitter.py +++ b/autosubmit_api/autosubmit_legacy/platforms/paramiko_submitter.py @@ -24,17 +24,17 @@ import os from bscearth.utils.log import Log -from autosubmit_api.config.basicConfig import BasicConfig -from autosubmit_api.config.config_common import AutosubmitConfig -from submitter import Submitter -from autosubmit_api.autosubmit_legacy.platforms.psplatform import PsPlatform -from autosubmit_api.autosubmit_legacy.platforms.lsfplatform import LsfPlatform -from autosubmit_api.autosubmit_legacy.platforms.pbsplatform import PBSPlatform -from autosubmit_api.autosubmit_legacy.platforms.sgeplatform import SgePlatform -from autosubmit_api.autosubmit_legacy.platforms.ecplatform import EcPlatform -from autosubmit_api.autosubmit_legacy.platforms.slurmplatform import SlurmPlatform -from autosubmit_api.autosubmit_legacy.platforms.locplatform import LocalPlatform -from autosubmit_api.autosubmit_legacy.platforms.paramiko_platform import ParamikoPlatformException +from ...config.basicConfig import BasicConfig +from ...config.config_common import AutosubmitConfig +from .submitter import Submitter +from .psplatform import PsPlatform +from .lsfplatform import LsfPlatform +from .pbsplatform import PBSPlatform +from .sgeplatform import SgePlatform +from .ecplatform import EcPlatform +from .slurmplatform import SlurmPlatform +from .locplatform import LocalPlatform +from .paramiko_platform import ParamikoPlatformException class ParamikoSubmitter(Submitter): diff --git a/autosubmit_api/autosubmit_legacy/platforms/pbsplatform.py b/autosubmit_api/autosubmit_legacy/platforms/pbsplatform.py index 96126d6586d365d8f14d81b4dff4f02a882eeda0..c68740b04ac55a5e6d397c8ab760aeb497c739fc 100644 --- a/autosubmit_api/autosubmit_legacy/platforms/pbsplatform.py +++ b/autosubmit_api/autosubmit_legacy/platforms/pbsplatform.py @@ -19,12 +19,12 @@ import os -from autosubmit_api.autosubmit_legacy.platforms.paramiko_platform import ParamikoPlatform, ParamikoPlatformException +from 
.paramiko_platform import ParamikoPlatform, ParamikoPlatformException from bscearth.utils.log import Log -from autosubmit_api.autosubmit_legacy.platforms.headers.pbs10_header import Pbs10Header -from autosubmit_api.autosubmit_legacy.platforms.headers.pbs11_header import Pbs11Header -from autosubmit_api.autosubmit_legacy.platforms.headers.pbs12_header import Pbs12Header +from .headers.pbs10_header import Pbs10Header +from .headers.pbs11_header import Pbs11Header +from .headers.pbs12_header import Pbs12Header class PBSPlatform(ParamikoPlatform): diff --git a/autosubmit_api/autosubmit_legacy/platforms/platform.py b/autosubmit_api/autosubmit_legacy/platforms/platform.py index 6880a532c8044123a0863684dcebe5c872da16b9..80bb01804460d401dc5ad24ee45cee991b25ae79 100644 --- a/autosubmit_api/autosubmit_legacy/platforms/platform.py +++ b/autosubmit_api/autosubmit_legacy/platforms/platform.py @@ -3,7 +3,7 @@ from time import sleep import os from bscearth.utils.log import Log -from autosubmit_api.autosubmit_legacy.job.job_common import Status +from ..job.job_common import Status class Platform(object): @@ -198,7 +198,7 @@ class Platform(object): def get_logs_files(self, exp_id, remote_logs): """ Get the given LOGS files - + :param exp_id: experiment id :type exp_id: str :param remote_logs: names of the log files diff --git a/autosubmit_api/autosubmit_legacy/platforms/psplatform.py b/autosubmit_api/autosubmit_legacy/platforms/psplatform.py index fafaaa5bdf1f02edead24795ed4ed08f2e48f1d0..91fdb7137c757a3adfa540f9658273dc8f1424c5 100644 --- a/autosubmit_api/autosubmit_legacy/platforms/psplatform.py +++ b/autosubmit_api/autosubmit_legacy/platforms/psplatform.py @@ -20,8 +20,8 @@ import os from xml.dom.minidom import parseString -from autosubmit_api.autosubmit_legacy.platforms.paramiko_platform import ParamikoPlatform -from autosubmit_api.autosubmit_legacy.platforms.headers.ps_header import PsHeader +from .paramiko_platform import ParamikoPlatform +from .headers.ps_header import PsHeader class PsPlatform(ParamikoPlatform): diff --git a/autosubmit_api/autosubmit_legacy/platforms/saga_platform.py b/autosubmit_api/autosubmit_legacy/platforms/saga_platform.py index 227620024024968c78ad11021438dc8f4d3c6739..d9bc9eb31a2050f0bb6374ab701c27ded76c47e8 100644 --- a/autosubmit_api/autosubmit_legacy/platforms/saga_platform.py +++ b/autosubmit_api/autosubmit_legacy/platforms/saga_platform.py @@ -3,12 +3,11 @@ import subprocess from time import sleep import os -import radical.saga as saga from bscearth.utils.log import Log from bscearth.utils.date import date2str -from autosubmit_api.autosubmit_legacy.job.job_common import Status, Type -from autosubmit_api.autosubmit_legacy.platforms.platform import Platform +from ..job.job_common import Status, Type +from ..platforms.platform import Platform class SagaPlatform(Platform): diff --git a/autosubmit_api/autosubmit_legacy/platforms/saga_submitter.py b/autosubmit_api/autosubmit_legacy/platforms/saga_submitter.py index df70aacd4acb0d83bce878a26ea2e4347fae92bd..8bccf9d089511888ad9fd5907a69be9d808742db 100644 --- a/autosubmit_api/autosubmit_legacy/platforms/saga_submitter.py +++ b/autosubmit_api/autosubmit_legacy/platforms/saga_submitter.py @@ -21,12 +21,11 @@ import time import os -import radical.saga as saga -from autosubmit_api.config.basicConfig import BasicConfig -from autosubmit_api.config.config_common import AutosubmitConfig -from saga_platform import SagaPlatform -from submitter import Submitter +from ...config.basicConfig import BasicConfig +from ...config.config_common 
import AutosubmitConfig +from .saga_platform import SagaPlatform +from .submitter import Submitter class SagaSubmitter(Submitter): diff --git a/autosubmit_api/autosubmit_legacy/platforms/sgeplatform.py b/autosubmit_api/autosubmit_legacy/platforms/sgeplatform.py index cfac5f1f9fc901968de66d031bcdb13690872201..063de289360c57b775015efcfe91bcc4c0f9c372 100644 --- a/autosubmit_api/autosubmit_legacy/platforms/sgeplatform.py +++ b/autosubmit_api/autosubmit_legacy/platforms/sgeplatform.py @@ -22,8 +22,8 @@ import subprocess from xml.dom.minidom import parseString -from autosubmit_api.autosubmit_legacy.platforms.paramiko_platform import ParamikoPlatform -from autosubmit_api.autosubmit_legacy.platforms.headers.sge_header import SgeHeader +from .paramiko_platform import ParamikoPlatform +from .headers.sge_header import SgeHeader class SgePlatform(ParamikoPlatform): diff --git a/autosubmit_api/autosubmit_legacy/platforms/slurmplatform.py b/autosubmit_api/autosubmit_legacy/platforms/slurmplatform.py index bcd29bf6eaa8e7a40bbd24069fcaf6e3836c81cf..25d803ecc310b4eccc8007402261499ec8a72b26 100644 --- a/autosubmit_api/autosubmit_legacy/platforms/slurmplatform.py +++ b/autosubmit_api/autosubmit_legacy/platforms/slurmplatform.py @@ -21,10 +21,10 @@ import os from xml.dom.minidom import parseString -from autosubmit_api.autosubmit_legacy.platforms.paramiko_platform import ParamikoPlatform -from autosubmit_api.autosubmit_legacy.platforms.headers.slurm_header import SlurmHeader -from autosubmit_api.autosubmit_legacy.platforms.wrappers.wrapper_factory import SlurmWrapperFactory -from autosubmit_api.config.basicConfig import BasicConfig +from .paramiko_platform import ParamikoPlatform +from .headers.slurm_header import SlurmHeader +from .wrappers.wrapper_factory import SlurmWrapperFactory +from ...config.basicConfig import BasicConfig class SlurmPlatform(ParamikoPlatform): diff --git a/autosubmit_api/autosubmit_legacy/platforms/submitter.py b/autosubmit_api/autosubmit_legacy/platforms/submitter.py index c61e82fddb9adab184c0b7b435ce7a6ff43ed6b6..89d4523e71b155e764c6692ca3676a87bf74badc 100644 --- a/autosubmit_api/autosubmit_legacy/platforms/submitter.py +++ b/autosubmit_api/autosubmit_legacy/platforms/submitter.py @@ -18,7 +18,7 @@ # along with Autosubmit. If not, see <http://www.gnu.org/licenses/>. -from autosubmit_api.config.config_common import AutosubmitConfig +from ...config.config_common import AutosubmitConfig class Submitter: diff --git a/autosubmit_api/autosubmit_legacy/platforms/wrappers/wrapper_factory.py b/autosubmit_api/autosubmit_legacy/platforms/wrappers/wrapper_factory.py index df709522b0a48bc450f0c86a63306aae622520e5..9b48bab70b329eca5b59b87fcc27209395a89d5f 100644 --- a/autosubmit_api/autosubmit_legacy/platforms/wrappers/wrapper_factory.py +++ b/autosubmit_api/autosubmit_legacy/platforms/wrappers/wrapper_factory.py @@ -17,7 +17,7 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see <http://www.gnu.org/licenses/>. 
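All of the platform modules above get the same rewiring: absolute autosubmit_api imports become relative ones, so the legacy subpackage keeps resolving even if the top-level package is renamed or vendored. Each leading dot climbs one package level; a sketch of how the dot counts line up, assuming the layout this patch implies:

    # Implied layout:
    #   autosubmit_api/
    #       config/basicConfig.py
    #       autosubmit_legacy/job/job_common.py
    #       autosubmit_legacy/platforms/locplatform.py
    # From a module inside autosubmit_legacy/platforms/:
    from .paramiko_platform import ParamikoPlatform  # sibling module in platforms/
    from ..job.job_common import Status              # one level up, then into job/
    from ...config.basicConfig import BasicConfig    # two levels up, then into config/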
-from autosubmit_api.autosubmit_legacy.platforms.wrappers.wrapper_builder import WrapperDirector, PythonVerticalWrapperBuilder, \ +from .wrapper_builder import WrapperDirector, PythonVerticalWrapperBuilder, \ PythonHorizontalWrapperBuilder, PythonHorizontalVerticalWrapperBuilder, PythonVerticalHorizontalWrapperBuilder, \ BashHorizontalWrapperBuilder, BashVerticalWrapperBuilder diff --git a/build/lib/autosubmit_api/autosubmit_legacy/__init__.py b/autosubmit_api/build/lib/autosubmit_legacy/__init__.py similarity index 100% rename from build/lib/autosubmit_api/autosubmit_legacy/__init__.py rename to autosubmit_api/build/lib/autosubmit_legacy/__init__.py diff --git a/autosubmit_api/build/lib/autosubmit_legacy/autosubmit.py b/autosubmit_api/build/lib/autosubmit_legacy/autosubmit.py new file mode 100644 index 0000000000000000000000000000000000000000..4a7c4dba46018119fbda5636654d4babf5aac686 --- /dev/null +++ b/autosubmit_api/build/lib/autosubmit_legacy/autosubmit.py @@ -0,0 +1,3498 @@ +#!/usr/bin/env python + +# Copyright 2017 Earth Sciences Department, BSC-CNS + +# This file is part of Autosubmit. + +# Autosubmit is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# Autosubmit is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with Autosubmit. If not, see . +# pipeline_test + + +import traceback +from pyparsing import nestedExpr +from collections import defaultdict +from distutils.util import strtobool +from pkg_resources import require, resource_listdir, resource_exists, resource_string +import portalocker +import datetime +import signal +import random +import re +import shutil +import sys +import pwd +import os +import copy +import time +import tarfile +import json +import subprocess +import argparse + +sys.path.insert(0, os.path.abspath('.')) +from ..config.basicConfig import BasicConfig +from ..config.config_common import AutosubmitConfig +from bscearth.utils.config_parser import ConfigParserFactory +from .job.job_common import Status +from ..git.autosubmit_git import AutosubmitGit +from .job.job_list import JobList +from .job.job_packages import JobPackageThread +from .job.job_package_persistence import JobPackagePersistence +from .job.job_list_persistence import JobListPersistenceDb +from .job.job_list_persistence import JobListPersistencePkl +from .job.job_grouping import JobGrouping +from bscearth.utils.log import Log +from ..database.db_common import create_db +from ..experiment.experiment_common import new_experiment +from ..experiment.experiment_common import copy_experiment +from ..database.db_common import delete_experiment +from ..database.db_common import get_autosubmit_version +from ..monitor.monitor import Monitor +from bscearth.utils.date import date2str +from ..notifications.mail_notifier import MailNotifier +from ..notifications.notifier import Notifier +from .platforms.saga_submitter import SagaSubmitter +from .platforms.paramiko_submitter import ParamikoSubmitter +from .job.job_exceptions import WrongTemplateException +from .job.job_packager import JobPackager +from .platforms.paramiko_platform import ParamikoTimeout +""" +Main module 
for autosubmit. Only contains an interface class to all functionality implemented on autosubmit +""" + +# configparser is the Python 3 name of Python 2's ConfigParser module, so no fallback import is needed +from configparser import SafeConfigParser + +# Is the Python dialog library available? (optional dependency) +try: + import dialog +except Exception: + dialog = None + + +# noinspection PyPackageRequirements +# noinspection PyPackageRequirements +# noinspection PyPackageRequirements + +# noinspection PyUnusedLocal + + +def signal_handler(signal_received, frame): + """ + Handles interrupt signals, allowing autosubmit to clean up before exiting + + :param signal_received: + :param frame: + """ + Log.info('Autosubmit will interrupt at the next safe occasion') + Autosubmit.exit = True + + +class Autosubmit: + """ + Interface class for autosubmit. + """ + # sys.setrecursionlimit(500000) + # # Get the version number from the relevant file. If not, from autosubmit package + # scriptdir = os.path.abspath(os.path.dirname(__file__)) + + # if not os.path.exists(os.path.join(scriptdir, 'VERSION')): + # scriptdir = os.path.join(scriptdir, os.path.pardir) + + # version_path = os.path.join(scriptdir, 'VERSION') + # readme_path = os.path.join(scriptdir, 'README') + # changes_path = os.path.join(scriptdir, 'CHANGELOG') + # if os.path.isfile(version_path): + # with open(version_path) as f: + # autosubmit_version = f.read().strip() + # else: + # autosubmit_version = require("autosubmitAPIwu")[0].version + + exit = False + + @staticmethod + def parse_args(): + """ + Parses the arguments given to the executable and starts execution of the requested command + """ + try: + BasicConfig.read() + + parser = argparse.ArgumentParser( + description='Main executable for autosubmit. 
') + parser.add_argument('-v', '--version', action='version', version=Autosubmit.autosubmit_version, + help="returns autosubmit's version number and exit") + parser.add_argument('-lf', '--logfile', choices=('EVERYTHING', 'DEBUG', 'INFO', 'RESULT', 'USER_WARNING', + 'WARNING', 'ERROR', 'CRITICAL', 'NO_LOG'), + default='DEBUG', type=str, + help="sets file's log level.") + parser.add_argument('-lc', '--logconsole', choices=('EVERYTHING', 'DEBUG', 'INFO', 'RESULT', 'USER_WARNING', + 'WARNING', 'ERROR', 'CRITICAL', 'NO_LOG'), + default='INFO', type=str, + help="sets console's log level") + + subparsers = parser.add_subparsers(dest='command') + + # Run + subparser = subparsers.add_parser( + 'run', description="runs specified experiment") + subparser.add_argument('expid', help='experiment identifier') + subparser.add_argument('-nt', '--notransitive', action='store_true', + default=False, help='Disable transitive reduction') + + # Expid + subparser = subparsers.add_parser( + 'expid', description="Creates a new experiment") + group = subparser.add_mutually_exclusive_group() + group.add_argument( + '-y', '--copy', help='makes a copy of the specified experiment') + group.add_argument('-dm', '--dummy', action='store_true', + help='creates a new experiment with default values, usually for testing') + group.add_argument('-op', '--operational', action='store_true', + help='creates a new experiment with operational experiment id') + subparser.add_argument('-H', '--HPC', required=True, + help='specifies the HPC to use for the experiment') + subparser.add_argument('-d', '--description', type=str, required=True, + help='sets a description for the experiment to store in the database.') + + # Delete + subparser = subparsers.add_parser( + 'delete', description="delete specified experiment") + subparser.add_argument('expid', help='experiment identifier') + subparser.add_argument( + '-f', '--force', action='store_true', help='deletes experiment without confirmation') + + # Monitor + subparser = subparsers.add_parser( + 'monitor', description="plots specified experiment") + subparser.add_argument('expid', help='experiment identifier') + subparser.add_argument('-o', '--output', choices=('pdf', 'png', 'ps', 'svg'), default='pdf', + help='chooses type of output for generated plot') + subparser.add_argument('-group_by', choices=('date', 'member', 'chunk', 'split', 'automatic'), default=None, + help='Groups the jobs automatically or by date, member, chunk or split') + subparser.add_argument('-expand', type=str, + help='Supply the list of dates/members/chunks to filter the list of jobs. Default = "Any". ' + 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') + subparser.add_argument( + '-expand_status', type=str, help='Select the statuses to be expanded') + subparser.add_argument('--hide_groups', action='store_true', + default=False, help='Hides the groups from the plot') + subparser.add_argument('-cw', '--check_wrapper', action='store_true', + default=False, help='Generate possible wrapper in the current workflow') + + group2 = subparser.add_mutually_exclusive_group(required=False) + + group.add_argument('-fs', '--filter_status', type=str, + choices=('Any', 'READY', 'COMPLETED', + 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN'), + help='Select the original status to filter the list of jobs') + group = subparser.add_mutually_exclusive_group(required=False) + group.add_argument('-fl', '--list', type=str, + help='Supply the list of job names to be filtered. Default = "Any". 
' + 'LIST = "b037_20101101_fc3_21_sim b037_20111101_fc4_26_sim"') + group.add_argument('-fc', '--filter_chunks', type=str, + help='Supply the list of chunks to filter the list of jobs. Default = "Any". ' + 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') + group.add_argument('-fs', '--filter_status', type=str, + choices=('Any', 'READY', 'COMPLETED', + 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN'), + help='Select the original status to filter the list of jobs') + group.add_argument('-ft', '--filter_type', type=str, + help='Select the job type to filter the list of jobs') + subparser.add_argument('--hide', action='store_true', default=False, + help='hides plot window') + group2.add_argument('--txt', action='store_true', default=False, + help='Generates only txt status file') + + group2.add_argument('-txtlog', '--txt_logfiles', action='store_true', default=False, + help='Generates only txt status file(AS < 3.12b behaviour)') + + subparser.add_argument('-nt', '--notransitive', action='store_true', + default=False, help='Disable transitive reduction') + + # Stats + subparser = subparsers.add_parser( + 'stats', description="plots statistics for specified experiment") + subparser.add_argument('expid', help='experiment identifier') + subparser.add_argument('-ft', '--filter_type', type=str, help='Select the job type to filter ' + 'the list of jobs') + subparser.add_argument('-fp', '--filter_period', type=int, help='Select the period to filter jobs ' + 'from current time to the past ' + 'in number of hours back') + subparser.add_argument('-o', '--output', choices=('pdf', 'png', 'ps', 'svg'), default='pdf', + help='type of output for generated plot') + subparser.add_argument('--hide', action='store_true', default=False, + help='hides plot window') + subparser.add_argument('-nt', '--notransitive', action='store_true', + default=False, help='Disable transitive reduction') + + # Clean + subparser = subparsers.add_parser( + 'clean', description="clean specified experiment") + subparser.add_argument('expid', help='experiment identifier') + subparser.add_argument( + '-pr', '--project', action="store_true", help='clean project') + subparser.add_argument('-p', '--plot', action="store_true", + help='clean plot, only 2 last will remain') + subparser.add_argument('-s', '--stats', action="store_true", + help='clean stats, only last will remain') + + # Recovery + subparser = subparsers.add_parser( + 'recovery', description="recover specified experiment") + subparser.add_argument( + 'expid', type=str, help='experiment identifier') + subparser.add_argument( + '-np', '--noplot', action='store_true', default=False, help='omit plot') + subparser.add_argument('--all', action="store_true", default=False, + help='Get completed files to synchronize pkl') + subparser.add_argument( + '-s', '--save', action="store_true", default=False, help='Save changes to disk') + subparser.add_argument('--hide', action='store_true', default=False, + help='hides plot window') + subparser.add_argument('-group_by', choices=('date', 'member', 'chunk', 'split', 'automatic'), default=None, + help='Groups the jobs automatically or by date, member, chunk or split') + subparser.add_argument('-expand', type=str, + help='Supply the list of dates/members/chunks to filter the list of jobs. Default = "Any". 
' + 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') + subparser.add_argument( + '-expand_status', type=str, help='Select the statuses to be expanded') + subparser.add_argument('-nt', '--notransitive', action='store_true', + default=False, help='Disable transitive reduction') + subparser.add_argument('-nl', '--no_recover_logs', action='store_true', default=False, + help='Disable logs recovery') + # Migrate + subparser = subparsers.add_parser( + 'migrate', description="Migrate experiments from current user to another") + subparser.add_argument('expid', help='experiment identifier') + group = subparser.add_mutually_exclusive_group(required=True) + group.add_argument('-o', '--offer', action="store_true", + default=False, help='Offer experiment') + group.add_argument('-p', '--pickup', action="store_true", + default=False, help='Pick-up released experiment') + + # Inspect + subparser = subparsers.add_parser( + 'inspect', description="Generate all .cmd files") + subparser.add_argument('expid', help='experiment identifier') + subparser.add_argument('-nt', '--notransitive', action='store_true', + default=False, help='Disable transitive reduction') + subparser.add_argument( + '-f', '--force', action="store_true", help='Overwrite all cmd') + subparser.add_argument('-cw', '--check_wrapper', action='store_true', + default=False, help='Generate possible wrapper in the current workflow') + + group.add_argument('-fs', '--filter_status', type=str, + choices=('Any', 'READY', 'COMPLETED', + 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN'), + help='Select the original status to filter the list of jobs') + group = subparser.add_mutually_exclusive_group(required=False) + group.add_argument('-fl', '--list', type=str, + help='Supply the list of job names to be filtered. Default = "Any". ' + 'LIST = "b037_20101101_fc3_21_sim b037_20111101_fc4_26_sim"') + group.add_argument('-fc', '--filter_chunks', type=str, + help='Supply the list of chunks to filter the list of jobs. Default = "Any". 
' + 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') + group.add_argument('-fs', '--filter_status', type=str, + choices=('Any', 'READY', 'COMPLETED', + 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN'), + help='Select the original status to filter the list of jobs') + group.add_argument('-ft', '--filter_type', type=str, + help='Select the job type to filter the list of jobs') + + # Check + subparser = subparsers.add_parser( + 'check', description="check configuration for specified experiment") + subparser.add_argument('expid', help='experiment identifier') + subparser.add_argument('-nt', '--notransitive', action='store_true', + default=False, help='Disable transitive reduction') + # Describe + subparser = subparsers.add_parser( + 'describe', description="Show details for specified experiment") + subparser.add_argument('expid', help='experiment identifier') + + # Create + subparser = subparsers.add_parser( + 'create', description="create specified experiment joblist") + subparser.add_argument('expid', help='experiment identifier') + subparser.add_argument( + '-np', '--noplot', action='store_true', default=False, help='omit plot') + subparser.add_argument('--hide', action='store_true', default=False, + help='hides plot window') + subparser.add_argument('-o', '--output', choices=('pdf', 'png', 'ps', 'svg'), default='pdf', + help='chooses type of output for generated plot') + subparser.add_argument('-group_by', choices=('date', 'member', 'chunk', 'split', 'automatic'), default=None, + help='Groups the jobs automatically or by date, member, chunk or split') + subparser.add_argument('-expand', type=str, + help='Supply the list of dates/members/chunks to filter the list of jobs. Default = "Any". ' + 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') + subparser.add_argument( + '-expand_status', type=str, help='Select the statuses to be expanded') + subparser.add_argument('-nt', '--notransitive', action='store_true', + default=False, help='Disable transitive reduction') + subparser.add_argument('-cw', '--check_wrapper', action='store_true', + default=False, help='Generate possible wrapper in the current workflow') + + # Configure + subparser = subparsers.add_parser('configure', description="configure database and path for autosubmit. It " + "can be done at machine, user or local level." + "If no arguments specified configure will " + "display dialog boxes (if installed)") + subparser.add_argument( + '--advanced', action="store_true", help="Open advanced configuration of autosubmit") + subparser.add_argument('-db', '--databasepath', default=None, help='path to database. If not supplied, ' + 'it will prompt for it') + subparser.add_argument( + '-dbf', '--databasefilename', default=None, help='database filename') + subparser.add_argument('-lr', '--localrootpath', default=None, help='path to store experiments. If not ' + 'supplied, it will prompt for it') + subparser.add_argument('-pc', '--platformsconfpath', default=None, help='path to platforms.conf file to ' + 'use by default. Optional') + subparser.add_argument('-jc', '--jobsconfpath', default=None, help='path to jobs.conf file to use by ' + 'default. Optional') + subparser.add_argument( + '-sm', '--smtphostname', default=None, help='STMP server hostname. Optional') + subparser.add_argument( + '-mf', '--mailfrom', default=None, help='Notifications sender address. 
Optional') + group = subparser.add_mutually_exclusive_group() + group.add_argument('--all', action="store_true", + help='configure for all users') + group.add_argument('--local', action="store_true", help='configure only for using Autosubmit from this ' + 'path') + + # Install + subparsers.add_parser( + 'install', description='install database for autosubmit on the configured folder') + + # Set status + subparser = subparsers.add_parser( + 'setstatus', description="sets job status for an experiment") + subparser.add_argument('expid', help='experiment identifier') + subparser.add_argument( + '-np', '--noplot', action='store_true', default=False, help='omit plot') + subparser.add_argument( + '-s', '--save', action="store_true", default=False, help='Save changes to disk') + + subparser.add_argument('-t', '--status_final', + choices=('READY', 'COMPLETED', 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN', + 'QUEUING', 'RUNNING'), + required=True, + help='Supply the target status') + group = subparser.add_mutually_exclusive_group(required=True) + group.add_argument('-fl', '--list', type=str, + help='Supply the list of job names to be changed. Default = "Any". ' + 'LIST = "b037_20101101_fc3_21_sim b037_20111101_fc4_26_sim"') + group.add_argument('-fc', '--filter_chunks', type=str, + help='Supply the list of chunks to change the status. Default = "Any". ' + 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') + group.add_argument('-fs', '--filter_status', type=str, + help='Select the status (one or more) to filter the list of jobs.' + "Valid values = ['Any', 'READY', 'COMPLETED', 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN']") + group.add_argument('-ft', '--filter_type', type=str, + help='Select the job type to filter the list of jobs') + + subparser.add_argument('--hide', action='store_true', default=False, + help='hides plot window') + subparser.add_argument('-group_by', choices=('date', 'member', 'chunk', 'split', 'automatic'), default=None, + help='Groups the jobs automatically or by date, member, chunk or split') + subparser.add_argument('-expand', type=str, + help='Supply the list of dates/members/chunks to filter the list of jobs. Default = "Any". 
' + 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') + subparser.add_argument( + '-expand_status', type=str, help='Select the statuses to be expanded') + subparser.add_argument('-nt', '--notransitive', action='store_true', + default=False, help='Disable transitive reduction') + subparser.add_argument('-cw', '--check_wrapper', action='store_true', + default=False, help='Generate possible wrapper in the current workflow') + + # Test Case + subparser = subparsers.add_parser( + 'testcase', description='create test case experiment') + subparser.add_argument( + '-y', '--copy', help='makes a copy of the specified experiment') + subparser.add_argument( + '-d', '--description', required=True, help='description of the test case') + subparser.add_argument('-c', '--chunks', help='chunks to run') + subparser.add_argument('-m', '--member', help='member to run') + subparser.add_argument('-s', '--stardate', help='stardate to run') + subparser.add_argument( + '-H', '--HPC', required=True, help='HPC to run experiment on it') + subparser.add_argument( + '-b', '--branch', help='branch of git to run (or revision from subversion)') + + # Test + subparser = subparsers.add_parser( + 'test', description='test experiment') + subparser.add_argument('expid', help='experiment identifier') + subparser.add_argument( + '-c', '--chunks', required=True, help='chunks to run') + subparser.add_argument('-m', '--member', help='member to run') + subparser.add_argument('-s', '--stardate', help='stardate to run') + subparser.add_argument( + '-H', '--HPC', help='HPC to run experiment on it') + subparser.add_argument( + '-b', '--branch', help='branch of git to run (or revision from subversion)') + + # Refresh + subparser = subparsers.add_parser( + 'refresh', description='refresh project directory for an experiment') + subparser.add_argument('expid', help='experiment identifier') + subparser.add_argument('-mc', '--model_conf', default=False, action='store_true', + help='overwrite model conf file') + subparser.add_argument('-jc', '--jobs_conf', default=False, action='store_true', + help='overwrite jobs conf file') + + # Archive + subparser = subparsers.add_parser( + 'archive', description='archives an experiment') + subparser.add_argument('expid', help='experiment identifier') + + # Unarchive + subparser = subparsers.add_parser( + 'unarchive', description='unarchives an experiment') + subparser.add_argument('expid', help='experiment identifier') + + # Readme + subparsers.add_parser('readme', description='show readme') + + # Changelog + subparsers.add_parser('changelog', description='show changelog') + + args = parser.parse_args() + + Log.set_console_level(args.logconsole) + Log.set_file_level(args.logfile) + + if args.command == 'run': + return Autosubmit.run_experiment(args.expid, args.notransitive) + elif args.command == 'expid': + return Autosubmit.expid(args.HPC, args.description, args.copy, args.dummy, False, + args.operational) != '' + elif args.command == 'delete': + return Autosubmit.delete(args.expid, args.force) + elif args.command == 'monitor': + return Autosubmit.monitor(args.expid, args.output, args.list, args.filter_chunks, args.filter_status, + args.filter_type, args.hide, args.txt, args.group_by, args.expand, + args.expand_status, args.hide_groups, args.notransitive, args.check_wrapper, args.txt_logfiles) + elif args.command == 'stats': + return Autosubmit.statistics(args.expid, args.filter_type, args.filter_period, args.output, args.hide, + args.notransitive) + elif args.command == 'clean': 
+ return Autosubmit.clean(args.expid, args.project, args.plot, args.stats) + elif args.command == 'recovery': + return Autosubmit.recovery(args.expid, args.noplot, args.save, args.all, args.hide, args.group_by, + args.expand, args.expand_status, args.notransitive, args.no_recover_logs) + elif args.command == 'check': + return Autosubmit.check(args.expid, args.notransitive) + elif args.command == 'inspect': + return Autosubmit.inspect(args.expid, args.list, args.filter_chunks, args.filter_status, + args.filter_type, args.notransitive, args.force, args.check_wrapper) + elif args.command == 'describe': + return Autosubmit.describe(args.expid) + elif args.command == 'migrate': + return Autosubmit.migrate(args.expid, args.offer, args.pickup) + elif args.command == 'create': + return Autosubmit.create(args.expid, args.noplot, args.hide, args.output, args.group_by, args.expand, + args.expand_status, args.notransitive, args.check_wrapper) + elif args.command == 'configure': + if not args.advanced or (args.advanced and dialog is None): + return Autosubmit.configure(args.advanced, args.databasepath, args.databasefilename, + args.localrootpath, args.platformsconfpath, args.jobsconfpath, + args.smtphostname, args.mailfrom, args.all, args.local) + else: + return Autosubmit.configure_dialog() + elif args.command == 'install': + return Autosubmit.install() + elif args.command == 'setstatus': + return Autosubmit.set_status(args.expid, args.noplot, args.save, args.status_final, args.list, + args.filter_chunks, args.filter_status, args.filter_type, args.hide, + args.group_by, args.expand, args.expand_status, args.notransitive, args.check_wrapper) + elif args.command == 'testcase': + return Autosubmit.testcase(args.copy, args.description, args.chunks, args.member, args.stardate, + args.HPC, args.branch) + elif args.command == 'test': + return Autosubmit.test(args.expid, args.chunks, args.member, args.stardate, args.HPC, args.branch) + elif args.command == 'refresh': + return Autosubmit.refresh(args.expid, args.model_conf, args.jobs_conf) + elif args.command == 'archive': + return Autosubmit.archive(args.expid) + elif args.command == 'unarchive': + return Autosubmit.unarchive(args.expid) + + elif args.command == 'readme': + if os.path.isfile(Autosubmit.readme_path): + with open(Autosubmit.readme_path) as f: + print(f.read()) + return True + return False + elif args.command == 'changelog': + if os.path.isfile(Autosubmit.changes_path): + with open(Autosubmit.changes_path) as f: + print(f.read()) + return True + return False + except Exception as e: + from traceback import format_exc + Log.critical( + 'Unhandled exception on Autosubmit: {0}\n{1}', e, format_exc(10)) + + return False + + @staticmethod + def _delete_expid(expid_delete): + """ + Removes an experiment from path and database + + :type expid_delete: str + :param expid_delete: identifier of the experiment to delete + """ + if expid_delete == '' or expid_delete is None and not os.path.exists(os.path.join(BasicConfig.LOCAL_ROOT_DIR, + expid_delete)): + Log.info("Experiment directory does not exist.") + else: + Log.info("Removing experiment directory...") + ret = False + if pwd.getpwuid(os.stat(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid_delete)).st_uid).pw_name == os.getlogin(): + try: + + shutil.rmtree(os.path.join( + BasicConfig.LOCAL_ROOT_DIR, expid_delete)) + except OSError as e: + Log.warning('Can not delete experiment folder: {0}', e) + return ret + Log.info("Deleting experiment from database...") + ret = delete_experiment(expid_delete) + if 
+    @staticmethod
+    def expid(hpc, description, copy_id='', dummy=False, test=False, operational=False):
+        """
+        Creates a new experiment for given HPC
+
+        :param operational: if true, creates an operational experiment
+        :type operational: bool
+        :type hpc: str
+        :type description: str
+        :type copy_id: str
+        :type dummy: bool
+        :param hpc: name of the main HPC for the experiment
+        :param description: short description of the experiment
+        :param copy_id: identifier of the experiment to copy
+        :param dummy: if true, writes a default dummy configuration for testing
+        :param test: if true, creates an experiment for testing
+        :return: experiment identifier. If method fails, returns ''.
+        :rtype: str
+        """
+        BasicConfig.read()
+
+        log_path = os.path.join(
+            BasicConfig.LOCAL_ROOT_DIR, 'ASlogs', 'expid_{0}.log'.format(os.getuid()))
+        try:
+            Log.set_file(log_path)
+        except IOError as e:
+            Log.error("Cannot create log file in path {0}: {1}".format(
+                log_path, str(e)))
+        exp_id = None
+        if description is None:
+            Log.error("Missing experiment description.")
+            return ''
+        if hpc is None:
+            Log.error("Missing HPC.")
+            return ''
+        if not copy_id:
+            exp_id = new_experiment(
+                description, Autosubmit.autosubmit_version, test, operational)
+            if exp_id == '':
+                return ''
+            try:
+                os.mkdir(os.path.join(BasicConfig.LOCAL_ROOT_DIR, exp_id))
+                os.mkdir(os.path.join(
+                    BasicConfig.LOCAL_ROOT_DIR, exp_id, 'conf'))
+                Log.info("Copying config files...")
+
+                # autosubmit config and experiment copied from AS.
+                files = resource_listdir('autosubmit.config', 'files')
+                for filename in files:
+                    if resource_exists('autosubmit.config', 'files/' + filename):
+                        index = filename.index('.')
+                        new_filename = filename[:index] + \
+                            "_" + exp_id + filename[index:]
+
+                        if filename == 'platforms.conf' and BasicConfig.DEFAULT_PLATFORMS_CONF != '':
+                            content = open(os.path.join(
+                                BasicConfig.DEFAULT_PLATFORMS_CONF, filename)).read()
+                        elif filename == 'jobs.conf' and BasicConfig.DEFAULT_JOBS_CONF != '':
+                            content = open(os.path.join(
+                                BasicConfig.DEFAULT_JOBS_CONF, filename)).read()
+                        else:
+                            # resource_string returns bytes on Python 3
+                            content = resource_string(
+                                'autosubmit.config', 'files/' + filename).decode('utf-8')
+
+                        conf_new_filename = os.path.join(
+                            BasicConfig.LOCAL_ROOT_DIR, exp_id, "conf", new_filename)
+                        Log.debug(conf_new_filename)
+                        open(conf_new_filename, 'w').write(content)
+                Autosubmit._prepare_conf_files(
+                    exp_id, hpc, Autosubmit.autosubmit_version, dummy)
+            except (OSError, IOError) as e:
+                Log.error(
+                    "Cannot create experiment: {0}\nCleaning...".format(e))
+                Autosubmit._delete_expid(exp_id)
+                return ''
+        else:
+            try:
+                if os.path.exists(os.path.join(BasicConfig.LOCAL_ROOT_DIR, copy_id)):
+                    exp_id = copy_experiment(
+                        copy_id, description, Autosubmit.autosubmit_version, test, operational)
+                    if exp_id == '':
+                        return ''
+                    dir_exp_id = os.path.join(
+                        BasicConfig.LOCAL_ROOT_DIR, exp_id)
+                    os.mkdir(dir_exp_id)
+                    os.mkdir(os.path.join(dir_exp_id, 'conf'))
+                    Log.info("Copying previous experiment config directories")
+                    conf_copy_id = os.path.join(
+                        BasicConfig.LOCAL_ROOT_DIR, copy_id, "conf")
+                    files = os.listdir(conf_copy_id)
+                    for filename in files:
+                        if os.path.isfile(os.path.join(conf_copy_id, filename)):
+                            new_filename = filename.replace(copy_id, exp_id)
+                            content = open(os.path.join(
+                                conf_copy_id, filename), 'r').read()
+                            open(os.path.join(dir_exp_id, "conf",
+                                              new_filename), 'w').write(content)
+                    Autosubmit._prepare_conf_files(
+                        exp_id, hpc, Autosubmit.autosubmit_version, dummy)
+                    #####
+                    autosubmit_config = AutosubmitConfig(
+                        copy_id, BasicConfig, ConfigParserFactory())
+                    if autosubmit_config.check_conf_files():
+                        project_type = autosubmit_config.get_project_type()
+                        if project_type == "git":
+                            autosubmit_config.check_proj()
+                            autosubmit_git = AutosubmitGit(copy_id[0])
+                            Log.info("checking model version...")
+                            if not autosubmit_git.check_commit(autosubmit_config):
+                                return False
+                    #####
+                else:
+                    Log.critical(
+                        "The previous experiment directory does not exist")
+                    return ''
+            except (OSError, IOError) as e:
+                Log.error(
+                    "Cannot create experiment: {0}\nCleaning...".format(e))
+                Autosubmit._delete_expid(exp_id)
+                return ''
+
+        Log.debug("Creating temporary directory...")
+        exp_id_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, exp_id)
+        tmp_path = os.path.join(exp_id_path, "tmp")
+        os.mkdir(tmp_path)
+        os.chmod(tmp_path, 0o775)
+        os.mkdir(os.path.join(tmp_path, BasicConfig.LOCAL_ASLOG_DIR))
+        os.chmod(os.path.join(tmp_path, BasicConfig.LOCAL_ASLOG_DIR), 0o775)
+        Log.debug("Creating temporary remote directory...")
+        remote_tmp_path = os.path.join(tmp_path, "LOG_" + exp_id)
+        os.mkdir(remote_tmp_path)
+        os.chmod(remote_tmp_path, 0o775)
+
+        Log.debug("Creating pkl directory...")
+        os.mkdir(os.path.join(exp_id_path, "pkl"))
+
+        Log.debug("Creating plot directory...")
+        os.mkdir(os.path.join(exp_id_path, "plot"))
+        os.chmod(os.path.join(exp_id_path, "plot"), 0o775)
+        Log.result("Experiment registered successfully")
+        Log.user_warning("Remember to MODIFY the config files!")
+        return exp_id
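# When a new experiment is created, each template config file is renamed by
# splicing the experiment id in front of the extension, e.g.
# 'platforms.conf' -> 'platforms_a000.conf'. A sketch of that rename rule as
# used above (str.index raises ValueError if there is no dot, which the code
# above implicitly assumes never happens):
def with_expid(filename, exp_id):
    index = filename.index('.')
    return filename[:index] + '_' + exp_id + filename[index:]

# with_expid('platforms.conf', 'a000') == 'platforms_a000.conf'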
+    @staticmethod
+    def delete(expid, force):
+        """
+        Deletes an experiment from the database and the experiment's folder
+
+        :type force: bool
+        :type expid: str
+        :param expid: identifier of the experiment to delete
+        :param force: if True, does not ask for confirmation
+
+        :returns: True if successful, False if not
+        :rtype: bool
+        """
+        log_path = os.path.join(
+            BasicConfig.LOCAL_ROOT_DIR, "ASlogs", 'delete_{0}.log'.format(os.getuid()))
+        try:
+            Log.set_file(log_path)
+        except IOError as e:
+            Log.error("Cannot create log file in path {0}: {1}".format(
+                log_path, str(e)))
+
+        if os.path.exists(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid)):
+            if force or Autosubmit._user_yes_no_query("Do you want to delete " + expid + " ?"):
+                return Autosubmit._delete_expid(expid)
+            else:
+                Log.info("Quitting...")
+                return False
+        else:
+            Log.error("The experiment does not exist")
+            return True
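# Autosubmit.delete asks for confirmation through _user_yes_no_query, which
# is defined elsewhere in this class. A plausible minimal implementation is
# sketched below for reference only; the real helper may differ.
def user_yes_no_query(question):
    while True:
        answer = input('{0} [y/n] '.format(question)).strip().lower()
        if answer in ('y', 'yes'):
            return True
        if answer in ('n', 'no'):
            return False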
+    @staticmethod
+    def _load_parameters(as_conf, job_list, platforms):
+        # Load parameters
+        Log.debug("Loading parameters...")
+        parameters = as_conf.load_parameters()
+        for platform_name in platforms:
+            platform = platforms[platform_name]
+            platform.add_parameters(parameters)
+
+        platform = platforms[as_conf.get_platform().lower()]
+        platform.add_parameters(parameters, True)
+
+        job_list.parameters = parameters
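# _load_parameters pushes one flat parameter dictionary into every platform
# and into the job list itself; the main platform receives it a second time,
# flagged as the primary one. A toy version with dicts standing in for the
# real Platform objects (all names here are illustrative):
parameters = {'EXPID': 'a000', 'NUMCHUNKS': '10'}
platforms = {'marenostrum4': {}, 'local': {}}

for name in platforms:
    platforms[name].update(parameters)  # platform.add_parameters(parameters)

main = platforms['marenostrum4']        # as_conf.get_platform().lower()
main.update(parameters)                 # add_parameters(parameters, True)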
+    @staticmethod
+    def inspect(expid, lst, filter_chunks, filter_status, filter_section, notransitive=False, force=False, check_wrapper=False):
+        """
+        Generates the cmd files of the experiment.
+
+        :type expid: str
+        :param expid: identifier of experiment to be run
+        :return: True if run to the end, False otherwise
+        :rtype: bool
+        """
+        if expid is None:
+            Log.critical("Missing experiment id")
+
+        BasicConfig.read()
+        exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid)
+        tmp_path = os.path.join(exp_path, BasicConfig.LOCAL_TMP_DIR)
+        if os.path.exists(os.path.join(tmp_path, 'autosubmit.lock')):
+            locked = True
+        else:
+            locked = False
+
+        if not os.path.exists(exp_path):
+            Log.critical(
+                "The directory %s is needed and does not exist" % exp_path)
+            Log.warning("Does an experiment with the given id exist?")
+            return 1
+        Log.info("Starting inspect command")
+        Log.set_file(os.path.join(
+            tmp_path, BasicConfig.LOCAL_ASLOG_DIR, 'generate.log'))
+        os.system('clear')
+        signal.signal(signal.SIGINT, signal_handler)
+        as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory())
+        if not as_conf.check_conf_files():
+            Log.critical('Cannot generate scripts with invalid configuration')
+            return False
+        project_type = as_conf.get_project_type()
+        if project_type != "none":
+            # Check proj configuration
+            as_conf.check_proj()
+        safetysleeptime = as_conf.get_safetysleeptime()
+        Log.debug("The Experiment name is: {0}", expid)
+        Log.debug("Sleep: {0}", safetysleeptime)
+        packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"),
+                                                     "job_packages_" + expid)
+        os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid,
+                              "pkl", "job_packages_" + expid + ".db"), 0o664)
+
+        packages_persistence.reset_table(True)
+        job_list_original = Autosubmit.load_job_list(
+            expid, as_conf, notransitive=notransitive)
+        job_list = copy.deepcopy(job_list_original)
+        job_list.packages_dict = {}
+
+        Log.debug("Length of the jobs list: {0}", len(job_list))
+
+        # variables to be updated on the fly
+        safetysleeptime = as_conf.get_safetysleeptime()
+        Log.debug("Sleep: {0}", safetysleeptime)
+        # Generate
+        Log.info("Starting to generate cmd scripts")
+
+        if not isinstance(job_list, list):
+            jobs = []
+            jobs_cw = []
+            if check_wrapper and (not locked or (force and locked)):
+                Log.info("Generating all cmd scripts adapted for wrappers")
+                jobs = job_list.get_uncompleted()
+                jobs_cw = job_list.get_completed()
+            else:
+                if force:
+                    Log.info("Overwriting all cmd scripts")
+                    jobs = job_list.get_job_list()
+                elif locked:
+                    Log.warning(
+                        "There is a .lock file and no -f flag; generating only the unsubmitted cmd scripts")
+                    jobs = job_list.get_unsubmitted()
+                else:
+                    Log.info("Generating cmd scripts only for selected jobs")
+                    if filter_chunks:
+                        fc = filter_chunks
+                        Log.debug(fc)
+                        if fc == 'Any':
+                            jobs = job_list.get_job_list()
+                        else:
+                            # noinspection PyTypeChecker
+                            data = json.loads(Autosubmit._create_json(fc))
+                            for date_json in data['sds']:
+                                date = date_json['sd']
+                                jobs_date = [j for j in job_list.get_job_list() if date2str(
+                                    j.date) == date]
+
+                                for member_json in date_json['ms']:
+                                    member = member_json['m']
+                                    jobs_member = [j for j in jobs_date if j.member == member]
+
+                                    for chunk_json in member_json['cs']:
+                                        chunk = int(chunk_json)
+                                        jobs += [j for j in jobs_member if j.chunk == chunk]
+
+                    elif filter_status:
+                        Log.debug(
+                            "Filtering jobs with status {0}", filter_status)
+                        if filter_status == 'Any':
+                            jobs = job_list.get_job_list()
+                        else:
+                            fs = Autosubmit._get_status(filter_status)
+                            jobs = [j for j in job_list.get_job_list() if j.status == fs]
+
+                    elif filter_section:
+                        ft = filter_section
+                        Log.debug(ft)
+
+                        if ft == 'Any':
+                            jobs = job_list.get_job_list()
+                        else:
+                            for job in job_list.get_job_list():
+                                if job.section == ft:
+                                    jobs.append(job)
+                    elif lst:
+                        jobs_lst = lst.split()
+
+                        if lst == 'Any':
+                            jobs = job_list.get_job_list()
+                        else:
+                            for job in job_list.get_job_list():
+                                if job.name in jobs_lst:
+                                    jobs.append(job)
+                    else:
+                        jobs = job_list.get_job_list()
+        if isinstance(jobs, list):
+            referenced_jobs_to_remove = set()
+            for job in jobs:
+                for child in job.children:
+                    if child not in jobs:
+                        referenced_jobs_to_remove.add(child)
+                for parent in job.parents:
+                    if parent not in jobs:
+                        referenced_jobs_to_remove.add(parent)
+
+            for job in jobs:
+                job.status = Status.WAITING
+
+            Autosubmit.generate_scripts_andor_wrappers(
+                as_conf, job_list, jobs, packages_persistence, False)
+        if len(jobs_cw) > 0:
+            referenced_jobs_to_remove = set()
+            for job in jobs_cw:
+                for child in job.children:
+                    if child not in jobs_cw:
+                        referenced_jobs_to_remove.add(child)
+                for parent in job.parents:
+                    if parent not in jobs_cw:
+                        referenced_jobs_to_remove.add(parent)
+
+            for job in jobs_cw:
+                job.status = Status.WAITING
+            Autosubmit.generate_scripts_andor_wrappers(
+                as_conf, job_list, jobs_cw, packages_persistence, False)
+
+        Log.info("No more scripts to generate; you can now check them manually")
+        time.sleep(safetysleeptime)
+        return True
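# The -fc filter parsed above is converted (by Autosubmit._create_json,
# defined elsewhere) into a nested structure of start dates ('sds'),
# members ('ms') and chunks ('cs'). The walk below mirrors the loops in
# inspect() on a hand-written example of that structure:
import json

data = json.loads(
    '{"sds": [{"sd": "19601101",'
    ' "ms": [{"m": "fc0", "cs": ["1", "2"]}]}]}')
for date_json in data['sds']:
    for member_json in date_json['ms']:
        for chunk_json in member_json['cs']:
            print(date_json['sd'], member_json['m'], int(chunk_json))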
+    @staticmethod
+    def generate_scripts_andor_wrappers(as_conf, job_list, jobs_filtered, packages_persistence, only_wrappers=False):
+        """
+        :param as_conf: AutosubmitConfig object
+        :param job_list: JobList object, contains the list of jobs
+        :param jobs_filtered: list of jobs to process
+        :param packages_persistence: database handler for job packages
+        :param only_wrappers: if True, only generates the wrappers
+        """
+        job_list._job_list = jobs_filtered
+        job_list.update_list(as_conf, False)
+        # Identifying the submitter and loading it
+        submitter = Autosubmit._get_submitter(as_conf)
+        # Load the platforms depending on the submitter
+        submitter.load_platforms(as_conf)
+        # Identifying the HPC from the config files
+        hpcarch = as_conf.get_platform()
+        Autosubmit._load_parameters(as_conf, job_list, submitter.platforms)
+        platforms_to_test = set()
+        for job in job_list.get_job_list():
+            if job.platform_name is None:
+                job.platform_name = hpcarch
+            # noinspection PyTypeChecker
+            job.platform = submitter.platforms[job.platform_name.lower()]
+            # noinspection PyTypeChecker
+            platforms_to_test.add(job.platform)
+        # case setstatus
+        job_list.check_scripts(as_conf)
+        job_list.update_list(as_conf, False)
+        Autosubmit._load_parameters(as_conf, job_list, submitter.platforms)
+        while job_list.get_active():
+            Autosubmit.submit_ready_jobs(
+                as_conf, job_list, platforms_to_test, packages_persistence, True, only_wrappers)
+            job_list.update_list(as_conf, False)
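# generate_scripts_andor_wrappers drives the whole workflow to completion
# for script generation: while any job is still active, ready jobs are
# "submitted" (scripts written, statuses flipped) and the list is updated.
# A toy version of that drain loop, with plain dicts standing in for the
# real JobList/Status machinery (all names here are illustrative):
jobs = [{'name': 'sim_1', 'done': False}, {'name': 'post_1', 'done': False}]

def get_active(job_list):
    return [j for j in job_list if not j['done']]

while get_active(jobs):
    for job in get_active(jobs):
        job['done'] = True  # stands in for submit + COMPLETED transition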
+    @staticmethod
+    def run_experiment(expid, notransitive=False):
+        """
+        Runs an experiment (submitting all the jobs properly and repeating its execution in case of failure).
+
+        :type expid: str
+        :param expid: identifier of experiment to be run
+        :return: True if run to the end, False otherwise
+        :rtype: bool
+        """
+        if expid is None:
+            Log.critical("Missing experiment id")
+
+        BasicConfig.read()
+        exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid)
+        tmp_path = os.path.join(exp_path, BasicConfig.LOCAL_TMP_DIR)
+        aslogs_path = os.path.join(tmp_path, BasicConfig.LOCAL_ASLOG_DIR)
+        if not os.path.exists(aslogs_path):
+            os.mkdir(aslogs_path)
+            os.chmod(aslogs_path, 0o775)
+        if not os.path.exists(exp_path):
+            Log.critical(
+                "The directory %s is needed and does not exist" % exp_path)
+            Log.warning("Does an experiment with the given id exist?")
+            return 1
+
+        # checking host whitelist
+        import platform
+        host = platform.node()
+        print(host)
+        if BasicConfig.ALLOWED_HOSTS and host not in BasicConfig.ALLOWED_HOSTS:
+            Log.info("\n Autosubmit run command is not allowed on this host")
+            return False
+
+        # checking if there is a lock file to avoid multiple running on the same expid
+        try:
+            with portalocker.Lock(os.path.join(tmp_path, 'autosubmit.lock'), timeout=1):
+                Log.info(
+                    "Preparing .lock file to avoid multiple instances with same experiment id")
+
+                Log.set_file(os.path.join(aslogs_path, 'run.log'))
+                os.system('clear')
+
+                signal.signal(signal.SIGINT, signal_handler)
+
+                as_conf = AutosubmitConfig(
+                    expid, BasicConfig, ConfigParserFactory())
+                if not as_conf.check_conf_files():
+                    Log.critical('Cannot run with invalid configuration')
+                    return False
+
+                project_type = as_conf.get_project_type()
+                if project_type != "none":
+                    # Check proj configuration
+                    as_conf.check_proj()
+
+                hpcarch = as_conf.get_platform()
+
+                safetysleeptime = as_conf.get_safetysleeptime()
+                retrials = as_conf.get_retrials()
+
+                submitter = Autosubmit._get_submitter(as_conf)
+                submitter.load_platforms(as_conf)
+
+                Log.debug("The Experiment name is: {0}", expid)
+                Log.debug("Sleep: {0}", safetysleeptime)
+                Log.debug("Default retrials: {0}", retrials)
+
+                Log.info("Starting job submission...")
+
+                pkl_dir = os.path.join(
+                    BasicConfig.LOCAL_ROOT_DIR, expid, 'pkl')
+                job_list = Autosubmit.load_job_list(
+                    expid, as_conf, notransitive=notransitive)
+
+                Log.debug(
+                    "Starting from job list restored from {0} files", pkl_dir)
+
+                Log.debug("Length of the jobs list: {0}", len(job_list))
+
+                Autosubmit._load_parameters(
+                    as_conf, job_list, submitter.platforms)
+
+                # check the job list script creation
+                Log.debug("Checking experiment templates...")
+
+                platforms_to_test = set()
+                for job in job_list.get_job_list():
+                    if job.platform_name is None:
+                        job.platform_name = hpcarch
+                    # noinspection PyTypeChecker
+                    job.platform = submitter.platforms[job.platform_name.lower()]
+                    # noinspection PyTypeChecker
+                    platforms_to_test.add(job.platform)
+
+                job_list.check_scripts(as_conf)
+
+                packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"),
+                                                             "job_packages_" + expid)
+
+                if as_conf.get_wrapper_type() != 'none':
+                    os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR,
+                                          expid, "pkl", "job_packages_" + expid + ".db"), 0o664)
+                    packages = packages_persistence.load()
+                    for (exp_id, package_name, job_name) in packages:
+                        if package_name not in job_list.packages_dict:
+                            job_list.packages_dict[package_name] = []
+                        job_list.packages_dict[package_name].append(
+                            job_list.get_job_by_name(job_name))
+
+                    for package_name, jobs in list(job_list.packages_dict.items()):
+                        from job.job import WrapperJob
+                        wrapper_job = WrapperJob(package_name, jobs[0].id, Status.SUBMITTED, 0, jobs,
+                                                 None,
+                                                 None, jobs[0].platform, as_conf)
+                        job_list.job_package_map[jobs[0].id] = wrapper_job
+                job_list.update_list(as_conf)
+                job_list.save()
+                #########################
+                # AUTOSUBMIT - MAIN LOOP
+                #########################
+                # Main loop. Finishing when all jobs have been submitted
+                while job_list.get_active():
+                    if Autosubmit.exit:
+                        return 2
+                    # reload parameters changes
+                    Log.debug("Reloading parameters...")
+                    as_conf.reload()
+                    Autosubmit._load_parameters(
+                        as_conf, job_list, submitter.platforms)
+                    # variables to be updated on the fly
+                    total_jobs = len(job_list.get_job_list())
+                    Log.info(
+                        "\n\n{0} of {1} jobs remaining ({2})".format(total_jobs - len(job_list.get_completed()),
+                                                                     total_jobs,
+                                                                     time.strftime("%H:%M")))
+                    safetysleeptime = as_conf.get_safetysleeptime()
+                    Log.debug("Sleep: {0}", safetysleeptime)
+                    default_retrials = as_conf.get_retrials()
+                    Log.debug("Number of retrials: {0}", default_retrials)
+
+                    check_wrapper_jobs_sleeptime = as_conf.get_wrapper_check_time()
+                    Log.debug('WRAPPER CHECK TIME = {0}'.format(
+                        check_wrapper_jobs_sleeptime))
+
+                    save = False
+
+                    slurm = []
+                    for platform in platforms_to_test:
+                        list_jobid = ""
+                        completed_joblist = []
+                        list_prevStatus = []
+                        queuing_jobs = job_list.get_in_queue_grouped_id(
+                            platform)
+                        for job_id, job in list(queuing_jobs.items()):
+                            if job_list.job_package_map and job_id in job_list.job_package_map:
+                                Log.debug(
+                                    'Checking wrapper job with id ' + str(job_id))
+                                wrapper_job = job_list.job_package_map[job_id]
+                                check_wrapper = True
+                                if wrapper_job.status == Status.RUNNING:
+                                    check_wrapper = (datetime.datetime.now() -
+                                                     wrapper_job.checked_time).total_seconds() >= check_wrapper_jobs_sleeptime
+                                if check_wrapper:
+                                    wrapper_job.checked_time = datetime.datetime.now()
+                                    platform.check_job(wrapper_job)
+                                    Log.info(
+                                        'Wrapper job ' + wrapper_job.name + ' is ' + str(Status.VALUE_TO_KEY[wrapper_job.new_status]))
+                                    wrapper_job.check_status(
+                                        wrapper_job.new_status)
+                                    save = True
+                                else:
+                                    Log.info(
+                                        "Waiting for wrapper check time: {0}\n", check_wrapper_jobs_sleeptime)
+                            else:
+                                job = job[0]
+                                prev_status = job.status
+                                if job.status == Status.FAILED:
+                                    continue
+
+                                if platform.type == "slurm":
+                                    list_jobid += str(job_id) + ','
+                                    list_prevStatus.append(prev_status)
+                                    completed_joblist.append(job)
+                                else:
+                                    platform.check_job(job)
+                                    if prev_status != job.update_status(as_conf.get_copy_remote_logs() == 'true'):
+                                        if as_conf.get_notifications() == 'true':
+                                            if Status.VALUE_TO_KEY[job.status] in job.notify_on:
+                                                Notifier.notify_status_change(MailNotifier(BasicConfig), expid, job.name,
+                                                                              Status.VALUE_TO_KEY[prev_status],
+                                                                              Status.VALUE_TO_KEY[job.status],
+                                                                              as_conf.get_mails_to())
+                                        save = True
+
+                        if platform.type == "slurm" and list_jobid != "":
+                            slurm.append(
+                                [platform, list_jobid, list_prevStatus, completed_joblist])
+                    # END LOOP
+                    for platform_jobs in slurm:
+                        platform = platform_jobs[0]
+                        jobs_to_check = platform_jobs[1]
+                        platform.check_Alljobs(
+                            platform_jobs[3], jobs_to_check, as_conf.get_copy_remote_logs())
+
+                        for j_Indx in range(len(platform_jobs[3])):
+                            prev_status = platform_jobs[2][j_Indx]
+                            job = platform_jobs[3][j_Indx]
+
+                            if prev_status != job.update_status(as_conf.get_copy_remote_logs() == 'true'):
+                                if as_conf.get_notifications() == 'true':
+                                    if Status.VALUE_TO_KEY[job.status] in job.notify_on:
+                                        Notifier.notify_status_change(MailNotifier(BasicConfig), expid, job.name,
+                                                                      Status.VALUE_TO_KEY[prev_status],
+                                                                      Status.VALUE_TO_KEY[job.status],
+                                                                      as_conf.get_mails_to())
+                                save = True
+
+                    if job_list.update_list(as_conf) or save:
+                        job_list.save()
+
+                    if Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence):
+                        job_list.save()
+
+                    if Autosubmit.exit:
+                        return 2
+                    time.sleep(safetysleeptime)
+
+                Log.info("No more jobs to run.")
+                if len(job_list.get_failed()) > 0:
+                    Log.info("Some jobs have failed and reached maximum retrials")
+                    return False
+                else:
+                    Log.result("Run successful")
+                    return True
+
+        except portalocker.AlreadyLocked:
+            Autosubmit.show_lock_warning(expid)
+
+        except WrongTemplateException:
+            return False
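# run_experiment re-checks a RUNNING wrapper job only after
# wrapper_check_time seconds have elapsed since the last check, to avoid
# hammering the scheduler. The same throttling rule in isolation (the
# helper name is illustrative):
import datetime

def should_check(last_checked, check_interval_seconds, now=None):
    now = now or datetime.datetime.now()
    return (now - last_checked).total_seconds() >= check_interval_seconds

# e.g. should_check(datetime.datetime.now(), 60) -> False (just checked)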
+    @staticmethod
+    def submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence, inspect=False,
+                          only_wrappers=False):
+        """
+        Gets READY jobs and sends them to the platforms if there is available space in the queues
+
+        :param as_conf: autosubmit config object
+        :type as_conf: AutosubmitConfig object
+        :param job_list: JobList as a single entity
+        :type job_list: JobList() object
+        :param platforms_to_test: list of platforms that will be used in the experiment
+        :type platforms_to_test: set() of Platform() objects, e.g. EcPlatform(), LsfPlatform()
+        :return: True if at least one job was submitted, False otherwise
+        :rtype: bool
+        """
+        save = False
+
+        for platform in platforms_to_test:
+            Log.debug("\nJobs ready for {1}: {0}", len(
+                job_list.get_ready(platform)), platform.name)
+            packages_to_submit, remote_dependencies_dict = JobPackager(
+                as_conf, platform, job_list).build_packages()
+            if not inspect:
+                platform.open_submit_script()
+            valid_packages_to_submit = []
+            for package in packages_to_submit:
+                try:
+                    if hasattr(package, "name"):
+                        if remote_dependencies_dict and package.name in remote_dependencies_dict['dependencies']:
+                            remote_dependency = remote_dependencies_dict['dependencies'][package.name]
+                            remote_dependency_id = remote_dependencies_dict['name_to_id'][remote_dependency]
+                            package.set_job_dependency(remote_dependency_id)
+                    if not only_wrappers:
+                        try:
+                            package.submit(
+                                as_conf, job_list.parameters, inspect)
+                            valid_packages_to_submit.append(package)
+                        except (IOError, OSError):
+                            # write error file
+                            continue
+                    if only_wrappers or inspect:
+                        for innerJob in package._jobs:
+                            innerJob.status = Status.COMPLETED
+
+                    if hasattr(package, "name"):
+                        job_list.packages_dict[package.name] = package.jobs
+                        from job.job import WrapperJob
+                        wrapper_job = WrapperJob(package.name, package.jobs[0].id, Status.READY, 0,
+                                                 package.jobs,
+                                                 package._wallclock, package._num_processors,
+                                                 package.platform, as_conf)
+                        job_list.job_package_map[package.jobs[0].id] = wrapper_job
+                        if remote_dependencies_dict and package.name in remote_dependencies_dict['name_to_id']:
+                            remote_dependencies_dict['name_to_id'][package.name] = package.jobs[0].id
+                        if isinstance(package, JobPackageThread):
+                            packages_persistence.save(
+                                package.name, package.jobs, package._expid, inspect)
+                    save = True
+                except WrongTemplateException as e:
+                    Log.error(
+                        "Invalid parameter substitution in {0} template", e.job_name)
+                    raise
+                except Exception:
+                    Log.error(
+                        "{0} submission failed due to unknown error", platform.name)
+                    raise
+
+            if platform.type == "slurm" and not inspect and not only_wrappers:
+                try:
+                    save = True
+                    if len(valid_packages_to_submit) > 0:
+                        jobs_id = platform.submit_Script()
+                        if jobs_id is None:
+                            raise BaseException(
+                                "Exiting AS: unable to get the job IDs")
+                        i = 0
+                        for package in valid_packages_to_submit:
+                            for job in package.jobs:
+                                job.id = str(jobs_id[i])
+                                Log.info("{0} submitted", job.name)
+                                job.status = Status.SUBMITTED
+                                job.write_submit_time()
+                            if hasattr(package, "name"):
+                                job_list.packages_dict[package.name] = package.jobs
+                                from job.job import WrapperJob
+                                wrapper_job = WrapperJob(package.name, package.jobs[0].id, Status.SUBMITTED, 0,
+                                                         package.jobs,
+                                                         package._wallclock, package._num_processors,
+                                                         package.platform, as_conf)
+                                job_list.job_package_map[package.jobs[0].id] = wrapper_job
+                                if remote_dependencies_dict and package.name in remote_dependencies_dict[
+                                        'name_to_id']:
+                                    remote_dependencies_dict['name_to_id'][package.name] = package.jobs[0].id
+                                if isinstance(package, JobPackageThread):
+                                    packages_persistence.save(
+                                        package.name, package.jobs, package._expid, inspect)
+                            i += 1
+
+                except WrongTemplateException as e:
+                    Log.error(
+                        "Invalid parameter substitution in {0} template", e.job_name)
+                    raise
+                except Exception:
+                    Log.error("{0} submission failed", platform.name)
+                    raise
+
+        return save
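# For Slurm platforms the statuses above are checked in bulk: job ids are
# accumulated into one comma-separated string and resolved with a single
# check_Alljobs call per platform instead of one query per job. The
# accumulation step on its own (values are illustrative):
queued = {1001: 'job_a', 1002: 'job_b', 1003: 'job_c'}

list_jobid = ''
for job_id in queued:
    list_jobid += str(job_id) + ','
# list_jobid == '1001,1002,1003,' (trailing comma, as in the code above)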
+    @staticmethod
+    def monitor(expid, file_format, lst, filter_chunks, filter_status, filter_section, hide, txt_only=False,
+                group_by=None, expand=list(), expand_status=list(), hide_groups=False, notransitive=False, check_wrapper=False, txt_logfiles=False):
+        """
+        Plots the workflow graph for a given experiment, with the status of each job coded by node color.
+        Plot is created in experiment's plot folder with name __