diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 24439f359ce36c79d55be92003e631b2c667d765..1fa7c95abe469698447ad3d708019512cdccf386 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -75,6 +75,8 @@ from job.job_grouping import JobGrouping # noinspection PyPackageRequirements from bscearth.utils.log import Log from database.db_common import create_db +from database.db_common import create_details +from database.db_common import get_id_db from experiment.experiment_common import new_experiment from experiment.experiment_common import copy_experiment from database.db_common import delete_experiment @@ -110,6 +112,9 @@ def signal_handler_create(signal_received, frame): """ Log.info('Autosubmit has been closed in an unexpected way. If problems with your experiment arise, review the FAQ.') + + + class Autosubmit: """ Interface class for autosubmit. @@ -1285,7 +1290,7 @@ class Autosubmit: job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=notransitive,monitor=True) Log.debug("Job list restored from {0} files", pkl_dir) - + if not isinstance(job_list, type([])): jobs = [] @@ -2486,6 +2491,8 @@ class Autosubmit: Log.critical("The directory %s is needed and does not exist." % exp_path) Log.warning("Does an experiment with the given id exist?") return 1 + + # checking if there is a lock file to avoid multiple running on the same expid try: @@ -2603,6 +2610,17 @@ class Autosubmit: fh.flush() os.fsync(fh.fileno()) + # Saving experiment details after lock has been released + try: + # From custom describe + det_expid, det_user, det_created, det_model, det_branch, det_hpc = Autosubmit.simple_describe(expid) + # From db_common + create_details(det_expid, det_user, det_created, det_model, det_branch, det_hpc) + except Exception as ex: + # If details were not saved, don't stop process. + Log.info("Some details were not saved." + str(ex)) + pass + return True # catching Exception except (KeyboardInterrupt, Exception) as e: @@ -3646,4 +3664,65 @@ class Autosubmit: def show_lock_warning(expid): Log.warning("We have detected that there is another Autosubmit instance using the experiment {0}.", expid) Log.warning("We have stopped this execution in order to prevent incoherency errors.") - Log.warning("Stop other Autosubmit instances that are using the experiment {0} and try it again.", expid) \ No newline at end of file + Log.warning("Stop other Autosubmit instances that are using the experiment {0} and try it again.", expid) + + @staticmethod + def simple_describe(experiment_id): + """ + Simplified version of Autosubmit.describe. Show details for specified experiment. + + :param experiment_id: experiment identifier: + :type experiment_id: str + :return: id, user, created, model, branch, hpc + :rtype: 6-tuple + """ + # Default values + id_exp = 0 + user = "" + created = datetime.datetime.fromtimestamp(time.time()) + model = "" + branch = "" + hpc = "" + + BasicConfig.read() + exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, experiment_id) + if not os.path.exists(exp_path): + raise Exception("Experiment not found") + as_conf = AutosubmitConfig(experiment_id, BasicConfig, ConfigParserFactory()) + if not as_conf.check_conf_files(): + return False + user = os.stat(as_conf.experiment_file).st_uid + user = pwd.getpwuid(user).pw_name + created = datetime.datetime.fromtimestamp(os.path.getmtime(as_conf.experiment_file)) + project_type = as_conf.get_project_type() + if project_type != "none": + if not as_conf.check_proj(): + return False + if (as_conf.get_svn_project_url()): + model = as_conf.get_svn_project_url() + branch = as_conf.get_svn_project_url() + else: + model = as_conf.get_git_project_origin() + branch = as_conf.get_git_project_branch() + if model is "": + model = "Not Found" + if branch is "": + branch = "Not Found" + + submitter = Autosubmit._get_submitter(as_conf) + submitter.load_platforms(as_conf) + if len(submitter.platforms) == 0: + return False + hpc = as_conf.get_platform() + # Getting id in details + id_exp = get_id_db(experiment_id) + # Log.result("Job Id: {0}", id_exp) + # Log.result("Owner: {0}", user) + # Log.result("Created: {0}", created) + # Log.result("Model: {0}", model) + # Log.result("Branch: {0}", branch) + # Log.result("HPC: {0}", hpc) + + return id_exp, user, created, model, branch, hpc + + diff --git a/autosubmit/database/db_common.py b/autosubmit/database/db_common.py index 5d717e669ba6f7beee073c441f29dc421afdb2af..ec0454e52185fd7664996153182850541a5da02c 100644 --- a/autosubmit/database/db_common.py +++ b/autosubmit/database/db_common.py @@ -191,6 +191,38 @@ def check_experiment_exists(name, error_on_inexistence=True): return False return True +def check_details_exists(expid, error_on_inexistence=True): + """ + Checks if details for the experiment with the given name have been already registered + + :param error_on_inexistence: if True, adds an error log if experiment does not exists + :type error_on_inexistence: bool + :param exp_id: Id of experiment in table details + :type exp_id: int + :return: If experiment exists returns true, if not returns false + :rtype: bool + """ + if not check_db(): + return False + try: + (conn, cursor) = open_conn() + except DbException as e: + Log.error('Connection to database could not be established: {0}', e.message) + return False + conn.isolation_level = None + + # SQLite always return a unicode object, but we can change this + # behaviour with the next sentence + conn.text_factory = str + cursor.execute('select exp_id from details where exp_id=:exp_id', {'exp_id': expid}) + row = cursor.fetchone() + close_conn(conn, cursor) + if row is None: + if error_on_inexistence: + Log.error('The details for this experiment could not be found') + return False + return True + def get_autosubmit_version(expid): """ @@ -291,12 +323,29 @@ def delete_experiment(experiment_id): except DbException as e: Log.error('Connection to database could not be established: {0}', e.message) return False + + # Get id of experiment in details + id_exp = get_id_db(experiment_id) + # Deletes details first in case there some FK restriction. There isn't but anyway. + cursor.execute('DELETE FROM details ' + 'WHERE exp_id=:exp_id', {'exp_id': id_exp}) + row = cursor.fetchone() + if row is None: + Log.debug('Details for {0} have been deleted.', experiment_id) + else: + Log.info('Details were not deleted!') + + # Deletes experiment cursor.execute('DELETE FROM experiment ' - 'WHERE name=:name', {'name': experiment_id}) + 'WHERE name=:name', {'name': experiment_id}) row = cursor.fetchone() if row is None: Log.debug('The experiment {0} has been deleted!!!', experiment_id) + + + close_conn(conn, cursor) + return True @@ -330,6 +379,85 @@ def _update_database(version, cursor): Log.info("Update completed") return True +def get_id_db(expid): + """ + Get experiment id from db (not the same as experiment name e.g. a25s) + :param expid: Experiment name (e.g. a25s) + :type expid: string + :return: Id of expid in detials table of autosubmit database + :rtype: int + """ + if not check_db(): + return False + + try: + (conn, cursor) = open_conn() + except DbException as e: + Log.error('Connection to database could not be established: {0}', e.message) + return False + conn.isolation_level = None + + # SQLite always return a unicode object, but we can change this + # behaviour with the next sentence + conn.text_factory = str + # id is an integer while expid is an string + cursor.execute('SELECT id FROM experiment WHERE name=:expid', {'expid': expid}) + row = cursor.fetchone() + close_conn(conn, cursor) + if row is None: + Log.error('The experiment "{0}" does not exist yet!!!', expid) + return None + return row[0] + + + +def create_details(expid, user, created, model, branch, hpc): + """ + Create new details for experiment, or if details exists update details. + :param expid: Experiment Id + :type expid: string + :param user: Creator user + :type user: string + :param created: Date created + :type created: string + :param model: Model used in experiment + :type model: string + :param branch: Branch of model used in experiment + :type branch: string + :param hpc: HPC target + :type hcp: string + :return: + """ + if not check_db(): + return False + try: + (conn, cursor) = open_conn() + except DbException as e: + Log.error('Connection to database could not be established: {0}', e.message) + return False + try: + if check_details_exists(expid, False): + # An update should not happen, but it might have some future usage + cursor.execute('UPDATE details SET user = :user, created = :created , model = :model , branch = :branch, hpc = :hpc WHERE exp_id = :exp_id', + {'exp_id': expid, 'user': user, 'created': created, 'model': model, 'branch': branch, 'hpc': hpc}) + Log.info("Details were updated.") + else: + # Insertion in table details + cursor.execute('INSERT INTO details(exp_id,user,created,model,branch,hpc) VALUES (:exp_id, :user, ' + ':created, :model, :branch, :hpc)', + {'exp_id': expid, 'user': user, 'created': created, 'model': model, 'branch': branch, 'hpc': hpc}) + Log.info("Details were registered.") + except sqlite3.IntegrityError as e: + close_conn(conn, cursor) + Log.error('Could not register detail: {0}'.format(e)) + return False + + conn.commit() + close_conn(conn, cursor) + return True + + + class DbException(Exception): """ @@ -338,3 +466,4 @@ class DbException(Exception): def __init__(self, message): self.message = message + diff --git a/autosubmit/job/job_utils.py b/autosubmit/job/job_utils.py index c772f0e6385361ad94811a9360ad35911894ebae..272dfbd2ee4c4b1cf8b2c4fdb61a829bc325fdb4 100644 --- a/autosubmit/job/job_utils.py +++ b/autosubmit/job/job_utils.py @@ -18,25 +18,22 @@ # along with Autosubmit. If not, see . import networkx +import time +import sys from networkx.algorithms.dag import is_directed_acyclic_graph from networkx import DiGraph from networkx import dfs_edges from networkx import NetworkXError - def transitive_reduction(graph): - if not is_directed_acyclic_graph(graph): - raise NetworkXError("Transitive reduction only uniquely defined on directed acyclic graphs.") - reduced_graph = DiGraph() - reduced_graph.add_nodes_from(graph.nodes()) - for u in graph: - u_edges = set(graph[u]) - for v in graph[u]: - u_edges -= {y for x, y in dfs_edges(graph, v)} - reduced_graph.add_edges_from((u, v) for v in u_edges) - return reduced_graph - + """ + Simplified version of transitive reduction. Uses algorithm from networkx. + Although previous implementation was very similar, it lacked important + optimizations like avoiding repeat DFS on the same node multiple time, + that made it very slow. This one works 50% faster. + """ + return networkx.algorithms.dag.transitive_reduction(graph) class Dependency(object): """