From 949bee5d2905ce5901d6b6a834e54fd9c7defb64 Mon Sep 17 00:00:00 2001 From: Wilmer Uruchi Ticona Date: Thu, 21 Oct 2021 11:34:49 +0200 Subject: [PATCH] Added anti lock mechanism on database functions. Added some validation to the improved delete experiment function. Modified a condition on the main exception handling mechanism, trace can be an integer and len(trace) can raise a further exception. --- autosubmit/autosubmit.py | 25 ++++- autosubmit/database/db_common.py | 179 ++++++++++++++++++++++++++++--- bin/autosubmit | 4 +- 3 files changed, 187 insertions(+), 21 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 73a18c2dc..4cfd0135e 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -790,14 +790,16 @@ class Autosubmit: error_message += 'Can not delete directory: {0}\n'.format(e.message) try: Log.info("Removing Structure db...") - os.remove(os.path.join(BasicConfig.LOCAL_ROOT_DIR, - BasicConfig.STRUCTURES_DIR, "structure_{0}.db".format(expid_delete))) + structures_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, BasicConfig.STRUCTURES_DIR, "structure_{0}.db".format(expid_delete)) + if os.path.exists(structures_path): + os.remove(structures_path) except BaseException as e: error_message += 'Can not delete structure: {0}\n'.format(e.message) try: Log.info("Removing job_data db...") - os.remove(os.path.join(BasicConfig.LOCAL_ROOT_DIR, - BasicConfig.JOBDATA_DIR, "job_data_{0}.db".format(expid_delete))) + job_data_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, BasicConfig.JOBDATA_DIR, "job_data_{0}.db".format(expid_delete)) + if os.path.exists(job_data_path): + os.remove(job_data_path) except BaseException as e: error_message += 'Can not delete job_data: {0}\n'.format(e.message) except OSError as e: @@ -3537,6 +3539,21 @@ class Autosubmit: expid, as_conf.get_version(), Autosubmit.autosubmit_version) as_conf.set_version(Autosubmit.autosubmit_version) return True + + @staticmethod + def update_description(expid, new_description): + Log.info("Checking if experiment exists...") + check_experiment_exists(expid) + Log.info("Experiment found.") + Log.info("Setting {0} description to '{1}'".format( + expid, new_description)) + result = update_experiment_descrip_version( + expid, description=new_description) + if result: + Log.info("Update completed successfully.") + else: + Log.critical("Update failed.") + return True @staticmethod def pkl_fix(expid): diff --git a/autosubmit/database/db_common.py b/autosubmit/database/db_common.py index e9fe114d9..aaaf4875a 100644 --- a/autosubmit/database/db_common.py +++ b/autosubmit/database/db_common.py @@ -22,13 +22,15 @@ Module containing functions to manage autosubmit's database. """ import os import sqlite3 +import multiprocessing +import Queue from log.log import Log, AutosubmitCritical, AutosubmitError Log.get_logger("Autosubmit") from autosubmit.config.basicConfig import BasicConfig CURRENT_DATABASE_VERSION = 1 - +TIMEOUT = 10 def create_db(qry): """ @@ -126,8 +128,155 @@ def close_conn(conn, cursor): conn.close() return +def fn_wrapper(database_fn, queue, *args): + # TODO: We can also implement the anti-lock mechanism as function decorators in a next iteration. + result = database_fn(*args) + queue.put(result) + queue.close() def save_experiment(name, description, version): + """ + Stores experiment in database. Anti-lock version. + + :param version: + :type version: str + :param name: experiment's name + :type name: str + :param description: experiment's description + :type description: str + """ + queue = multiprocessing.Queue(1) + proc = multiprocessing.Process(target=fn_wrapper, args=(_save_experiment, queue, name, description, version)) + proc.start() + + try: + result = queue.get(True, TIMEOUT) + except Queue.Empty: + raise AutosubmitCritical( + "The database process exceeded the timeout limit {0}s. Your experiment {1} couldn't be stored in the database.".format(TIMEOUT, name)) + finally: + proc.terminate() + return result + +def check_experiment_exists(name, error_on_inexistence=True): + """ + Checks if exist an experiment with the given name. Anti-lock version. + + :param error_on_inexistence: if True, adds an error log if experiment does not exists + :type error_on_inexistence: bool + :param name: Experiment name + :type name: str + :return: If experiment exists returns true, if not returns false + :rtype: bool + """ + queue = multiprocessing.Queue(1) + proc = multiprocessing.Process(target=fn_wrapper, args=(_check_experiment_exists, queue, name, error_on_inexistence)) + proc.start() + + try: + result = queue.get(True, TIMEOUT) + except Queue.Empty: + raise AutosubmitCritical( + "The database process exceeded the timeout limit {0}s. Check if experiment {1} exists failed to complete.".format(TIMEOUT, name)) + finally: + proc.terminate() + return result + +def update_experiment_descrip_version(name, description=None, version=None): + """ + Updates the experiment's description and/or version. Anti-lock version. + + :param name: experiment name (expid) + :rtype name: str + :param description: experiment new description + :rtype description: str + :param version: experiment autosubmit version + :rtype version: str + :return: If description has been update, True; otherwise, False. + :rtype: bool + """ + queue = multiprocessing.Queue(1) + proc = multiprocessing.Process(target=fn_wrapper, args=(_update_experiment_descrip_version, queue, name, description, version)) + proc.start() + + try: + result = queue.get(True, TIMEOUT) + except Queue.Empty: + raise AutosubmitCritical( + "The database process exceeded the timeout limit {0}s. Update experiment {1} version failed to complete.".format(TIMEOUT, name)) + finally: + proc.terminate() + return result + +def get_autosubmit_version(expid): + """ + Get the minimun autosubmit version needed for the experiment. Anti-lock version. + + :param expid: Experiment name + :type expid: str + :return: If experiment exists returns the autosubmit version for it, if not returns None + :rtype: str + """ + queue = multiprocessing.Queue(1) + proc = multiprocessing.Process(target=fn_wrapper, args=(_get_autosubmit_version, queue, expid)) + proc.start() + + try: + result = queue.get(True, TIMEOUT) + except Queue.Empty: + raise AutosubmitCritical( + "The database process exceeded the timeout limit {0}s. Get experiment {1} version failed to complete.".format(TIMEOUT, expid)) + finally: + proc.terminate() + return result + +def last_name_used(test=False, operational=False): + """ + Gets last experiment identifier used. Anti-lock version. + + :param test: flag for test experiments + :type test: bool + :param operational: flag for operational experiments + :type test: bool + :return: last experiment identifier used, 'empty' if there is none + :rtype: str + """ + queue = multiprocessing.Queue(1) + proc = multiprocessing.Process(target=fn_wrapper, args=(_last_name_used, queue, test, operational)) + proc.start() + + try: + result = queue.get(True, TIMEOUT) + except Queue.Empty: + raise AutosubmitCritical( + "The database process exceeded the timeout limit {0}s. Get last named used failed to complete.".format(TIMEOUT)) + finally: + proc.terminate() + return result + +def delete_experiment(experiment_id): + """ + Removes experiment from database. Anti-lock version. + + :param experiment_id: experiment identifier + :type experiment_id: str + :return: True if delete is succesful + :rtype: bool + """ + queue = multiprocessing.Queue(1) + proc = multiprocessing.Process(target=fn_wrapper, args=(_delete_experiment, queue, experiment_id)) + proc.start() + + try: + result = queue.get(True, TIMEOUT) + except Queue.Empty: + raise AutosubmitCritical( + "The database process exceeded the timeout limit {0}s. Delete experiment {1} failed to complete.".format(TIMEOUT, experiment_id)) + finally: + proc.terminate() + return result + +def _save_experiment(name, description, version): """ Stores experiment in database @@ -159,7 +308,7 @@ def save_experiment(name, description, version): return True -def check_experiment_exists(name, error_on_inexistence=True): +def _check_experiment_exists(name, error_on_inexistence=True): """ Checks if exist an experiment with the given name. @@ -194,17 +343,17 @@ def check_experiment_exists(name, error_on_inexistence=True): return True -def update_experiment_descrip_version(name, description=None, version=None): +def _update_experiment_descrip_version(name, description=None, version=None): """ Updates the experiment's description and/or version - :param name: experiment name (expid) - :rtype name: str - :param description: experiment new description - :rtype description: str - :param version: experiment autosubmit version - :rtype version: str - :return: If description has been update, True; otherwise, False. + :param name: experiment name (expid) + :rtype name: str + :param description: experiment new description + :rtype description: str + :param version: experiment autosubmit version + :rtype version: str + :return: If description has been update, True; otherwise, False. :rtype: bool """ if not check_db(): @@ -240,7 +389,7 @@ def update_experiment_descrip_version(name, description=None, version=None): return True -def get_autosubmit_version(expid): +def _get_autosubmit_version(expid): """ Get the minimun autosubmit version needed for the experiment @@ -272,7 +421,7 @@ def get_autosubmit_version(expid): return row[0] -def last_name_used(test=False, operational=False): +def _last_name_used(test=False, operational=False): """ Gets last experiment identifier used @@ -282,7 +431,7 @@ def last_name_used(test=False, operational=False): :type test: bool :return: last experiment identifier used, 'empty' if there is none :rtype: str - """ + """ if not check_db(): return '' try: @@ -322,7 +471,7 @@ def last_name_used(test=False, operational=False): return row[0] -def delete_experiment(experiment_id): +def _delete_experiment(experiment_id): """ Removes experiment from database @@ -333,7 +482,7 @@ def delete_experiment(experiment_id): """ if not check_db(): return False - if not check_experiment_exists(experiment_id, False): + if not _check_experiment_exists(experiment_id, False): # Reference the no anti-lock version. return True try: (conn, cursor) = open_conn() diff --git a/bin/autosubmit b/bin/autosubmit index 2667fa82a..fe1547a25 100755 --- a/bin/autosubmit +++ b/bin/autosubmit @@ -42,7 +42,7 @@ def main(): except AutosubmitError as e: if os.path.exists(os.path.join(Log.file_path, "autosubmit.lock")): os.remove(os.path.join(Log.file_path, "autosubmit.lock")) - if e.trace is not None and len(e.trace) > 0: + if e.trace is not None: # trace might be int. Log.error("Trace: {0}", e.trace) Log.critical("{1} [eCode={0}]", e.code, e.message) Log.info("More info at https://autosubmit.readthedocs.io/en/latest/faq.html") @@ -50,7 +50,7 @@ def main(): except AutosubmitCritical as e: if os.path.exists(os.path.join(Log.file_path, "autosubmit.lock")): os.remove(os.path.join(Log.file_path, "autosubmit.lock")) - if e.trace is not None and len(e.trace) > 0: + if e.trace is not None: Log.error("Trace: {0}", e.trace) Log.critical("{1} [eCode={0}]", e.code, e.message) Log.info("More info at https://autosubmit.readthedocs.io/en/latest/faq.html") -- GitLab