diff --git a/.coveragerc b/.coveragerc
new file mode 100644
index 0000000000000000000000000000000000000000..2ba850c90bab1159a214ec1c1479f646a96200f9
--- /dev/null
+++ b/.coveragerc
@@ -0,0 +1,2 @@
+[run]
+omit=
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index 40e7cb3ec7e789f1d55658d9559726592bd3eb72..fe2b46562e35aa2536d9abd67e2c116b1508bdc9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,8 +16,10 @@ build/
 
 # Coverage files
 coverage/
-/.coverage
+.coverage
 test/coverage.xml
+coverage.xml
+test/htmlcov
 
 # Docker
 dockerfiles/id_rsa*
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index f0f0a2895a8c4965919d7d6d8c4045ce8b0a805e..1f8da30d6bf8dd6f89155eff736cc37961e95af8 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -21,14 +21,14 @@ prepare:
     - git submodule update --init --recursive
     - conda update -n base -c defaults conda
     - conda update conda
-    - conda install -n autosubmit3 coverage=6
+    - conda install -n autosubmit3 pytest pytest-cov pytest-mock
    - conda env update -f environment.yml -n autosubmit3 python=3.7.3
 
 test_python3:
   stage: test
   script:
     - conda activate autosubmit3
-    - python3 -m 'nose' --exclude=regression --verbosity=3 test/unit --with-coverage --cover-package=autosubmit --cover-inclusive --cover-xml --cover-xml-file=test/coverage.xml
+    - pytest -m "not postgres"
   coverage: '/(?i)total.*? (100(?:\.0+)?\%|[1-9]?\d(?:\.\d+)?\%)$/'
   # These artifacts are saved with every build in GitLab and can be reviewed later. If
   # we have a folder with HTML files, as in this example, users can navigate with their
diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py
index bb7ea8fd9a6d65ac7daf4d3a6607e7753abc31ee..77ec83f6188b19a8e787300f423517add091c798 100644
--- a/autosubmit/autosubmit.py
+++ b/autosubmit/autosubmit.py
@@ -1487,8 +1487,7 @@ class Autosubmit:
             safetysleeptime = as_conf.get_safetysleeptime()
             Log.debug("The Experiment name is: {0}", expid)
             Log.debug("Sleep: {0}", safetysleeptime)
-            packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"),
-                                                         "job_packages_" + expid)
+            packages_persistence = JobPackagePersistence(expid)
 
             os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl", "job_packages_" + expid + ".db"), 0o644)
 
@@ -2014,8 +2013,7 @@ class Autosubmit:
         Log.debug("Loading job packages")  # Packages == wrappers and jobs inside wrappers. Name is also missleading.
         try:
-            packages_persistence = JobPackagePersistence(os.path.join(
-                BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid)
+            packages_persistence = JobPackagePersistence(expid)
         except IOError as e:
             raise AutosubmitError(
                 "job_packages not found", 6016, str(e))
@@ -2669,8 +2667,7 @@ class Autosubmit:
             try:
                 if len(as_conf.experiment_data.get("WRAPPERS", {})) > 0 and check_wrapper:
                     # Class constructor creates table if it does not exist
-                    packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"),
-                                                                 "job_packages_" + expid)
+                    packages_persistence = JobPackagePersistence(expid)
                     # Permissions
                     os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl", "job_packages_" + expid + ".db"), 0o644)
                     # Database modification
@@ -2682,11 +2679,9 @@ class Autosubmit:
                                                                   packages_persistence, True)
                     packages = packages_persistence.load(True)
-                    packages += JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"),
-                                                      "job_packages_" + expid).load()
+                    packages += JobPackagePersistence(expid).load()
                 else:
-                    packages = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"),
-                                                     "job_packages_" + expid).load()
+                    packages = JobPackagePersistence(expid).load()
 
             except BaseException as e:
                 raise AutosubmitCritical("Issues during the wrapper loading, may be related to IO issues", 7040, str(e))
             finally:
@@ -3001,8 +2996,7 @@ class Autosubmit:
             raise AutosubmitCritical("Couldn't restore the experiment workflow", 7040, str(e))
 
         try:
-            packages = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"),
-                                             "job_packages_" + expid).load()
+            packages = JobPackagePersistence(expid).load()
 
             groups_dict = dict()
             if group_by:
@@ -4058,15 +4052,20 @@ class Autosubmit:
         Creates a new database instance for autosubmit at the configured path
         """
 
-        if not os.path.exists(BasicConfig.DB_PATH):
-            Log.info("Creating autosubmit database...")
-            qry = resource_string('autosubmit.database', 'data/autosubmit.sql').decode(locale.getlocale()[1])
-            #qry = importlib.resources.read_text('autosubmit.database', 'data/autosubmit.sql').decode(locale.getlocale()[1])
-            if not create_db(qry):
-                raise AutosubmitCritical("Can not write database file", 7004)
-            Log.result("Autosubmit database created successfully")
+        if BasicConfig.DATABASE_BACKEND == 'sqlite':
+            if not os.path.exists(BasicConfig.DB_PATH):
+                Log.info("Creating autosubmit database...")
+                qry = resource_string('autosubmit.database', 'data/autosubmit.sql').decode(locale.getlocale()[1])
+                #qry = importlib.resources.read_text('autosubmit.database', 'data/autosubmit.sql').decode(locale.getlocale()[1])
+                if not create_db(qry):
+                    raise AutosubmitCritical("Can not write database file", 7004)
+                Log.result("Autosubmit database created successfully")
+            else:
+                raise AutosubmitCritical("Database already exists.", 7004)
         else:
-            raise AutosubmitCritical("Database already exists.", 7004)
+            Log.info("Creating autosubmit Postgres database...")
+            if not create_db(''):
+                raise AutosubmitCritical("Failed to create Postgres database", 7004)
         return True
 
     @staticmethod
@@ -4826,8 +4825,7 @@ class Autosubmit:
                 job_list.save()
                 as_conf.save()
                 try:
-                    packages_persistence = JobPackagePersistence(
-                        os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid)
+                    packages_persistence = JobPackagePersistence(expid)
                     packages_persistence.reset_table()
                     packages_persistence.reset_table(True)
                 except:
@@ -5644,9 +5642,7 @@ class Autosubmit:
        #Visualization stuff that should be in a function common to monitor , create, -cw flag, inspect and so on
        if not noplot:
            if as_conf.get_wrapper_type() != 'none' and check_wrapper:
-               packages_persistence = JobPackagePersistence(
-                   os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"),
-                   "job_packages_" + expid)
+               packages_persistence = JobPackagePersistence(expid)
                os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl", "job_packages_" + expid + ".db"), 0o775)
                packages_persistence.reset_table(True)
 
@@ -5658,8 +5654,7 @@ class Autosubmit:
                packages = packages_persistence.load(True)
            else:
-               packages = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"),
-                                                "job_packages_" + expid).load()
+               packages = JobPackagePersistence(expid).load()
 
            groups_dict = dict()
            if group_by:
                status = list()
@@ -5822,8 +5817,7 @@ class Autosubmit:
         if storage_type == 'pkl':
             return JobListPersistencePkl()
         elif storage_type == 'db':
-            return JobListPersistenceDb(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"),
-                                        "job_list_" + expid)
+            return JobListPersistenceDb(expid)
         raise AutosubmitCritical('Storage type not known', 7014)
 
     @staticmethod
diff --git a/autosubmit/database/db_common.py b/autosubmit/database/db_common.py
index 9f93e04c5ee48c9378fc0f0751b1a3a87bf01b42..7c35c450c1106a770567d107366ceb4c07f5d487 100644
--- a/autosubmit/database/db_common.py
+++ b/autosubmit/database/db_common.py
@@ -28,6 +28,11 @@ from log.log import Log, AutosubmitCritical, AutosubmitError
 Log.get_logger("Autosubmit")
 from autosubmitconfigparser.config.basicconfig import BasicConfig
 
+from autosubmit.database import tables, session
+from sqlalchemy import delete, select, Connection, insert, update, func
+from sqlalchemy.schema import CreateTable
+from typing import List, Optional, cast
+
 CURRENT_DATABASE_VERSION = 1
 TIMEOUT = 10
 
@@ -38,6 +43,9 @@ def create_db(qry):
     :param qry: query to create the new database
     :type qry: str
     """
+    if BasicConfig.DATABASE_BACKEND == 'postgres':
+        return create_db_pg()
+
     try:
         (conn, cursor) = open_conn(False)
     except DbException as e:
@@ -144,8 +152,12 @@ def save_experiment(name, description, version):
     :param description: experiment's description
     :type description: str
     """
+    fn = _save_experiment
+    if BasicConfig.DATABASE_BACKEND == 'postgres':
+        fn = _save_experiment_sqlalchemy
+
     queue = multiprocessing.Queue(1)
-    proc = multiprocessing.Process(target=fn_wrapper, args=(_save_experiment, queue, name, description, version))
+    proc = multiprocessing.Process(target=fn_wrapper, args=(fn, queue, name, description, version))
     proc.start()
 
     try:
@@ -168,8 +180,11 @@ def check_experiment_exists(name, error_on_inexistence=True):
     :return: If experiment exists returns true, if not returns false
     :rtype: bool
     """
+    fn = _check_experiment_exists
+    if BasicConfig.DATABASE_BACKEND == 'postgres':
+        fn = _check_experiment_exists_sqlalchemy
     queue = multiprocessing.Queue(1)
-    proc = multiprocessing.Process(target=fn_wrapper, args=(_check_experiment_exists, queue, name, error_on_inexistence))
+    proc = multiprocessing.Process(target=fn_wrapper, args=(fn, queue, name, error_on_inexistence))
     proc.start()
 
     try:
@@ -194,8 +209,11 @@ def update_experiment_descrip_version(name, description=None, version=None):
     :return: If description has been update, True; otherwise, False.
     :rtype: bool
     """
+    fn = _update_experiment_descrip_version
+    if BasicConfig.DATABASE_BACKEND == 'postgres':
+        fn = _update_experiment_descrip_version_sqlalchemy
     queue = multiprocessing.Queue(1)
-    proc = multiprocessing.Process(target=fn_wrapper, args=(_update_experiment_descrip_version, queue, name, description, version))
+    proc = multiprocessing.Process(target=fn_wrapper, args=(fn, queue, name, description, version))
     proc.start()
 
     try:
@@ -215,9 +233,12 @@ def get_autosubmit_version(expid):
     :type expid: str
     :return: If experiment exists returns the autosubmit version for it, if not returns None
     :rtype: str
-    """
+    """
+    fn = _get_autosubmit_version
+    if BasicConfig.DATABASE_BACKEND == 'postgres':
+        fn = _get_autosubmit_version_sqlalchemy
     queue = multiprocessing.Queue(1)
-    proc = multiprocessing.Process(target=fn_wrapper, args=(_get_autosubmit_version, queue, expid))
+    proc = multiprocessing.Process(target=fn_wrapper, args=(fn, queue, expid))
     proc.start()
 
     try:
@@ -240,8 +261,11 @@ def last_name_used(test=False, operational=False):
     :return: last experiment identifier used, 'empty' if there is none
     :rtype: str
     """
+    fn = _last_name_used
+    if BasicConfig.DATABASE_BACKEND == 'postgres':
+        fn = _last_name_used_sqlalchemy
     queue = multiprocessing.Queue(1)
-    proc = multiprocessing.Process(target=fn_wrapper, args=(_last_name_used, queue, test, operational))
+    proc = multiprocessing.Process(target=fn_wrapper, args=(fn, queue, test, operational))
     proc.start()
 
     try:
@@ -262,8 +286,11 @@ def delete_experiment(experiment_id):
     :return: True if delete is successful
     :rtype: bool
     """
+    fn = _delete_experiment
+    if BasicConfig.DATABASE_BACKEND == 'postgres':
+        fn = _delete_experiment_sqlalchemy
     queue = multiprocessing.Queue(1)
-    proc = multiprocessing.Process(target=fn_wrapper, args=(_delete_experiment, queue, experiment_id))
+    proc = multiprocessing.Process(target=fn_wrapper, args=(fn, queue, experiment_id))
     proc.start()
 
     try:
@@ -349,6 +376,8 @@ def _check_experiment_exists(name, error_on_inexistence=True):
     return True
 
 def get_experiment_descrip(expid):
+    if BasicConfig.DATABASE_BACKEND == 'postgres':
+        return get_experiment_descrip_sqlalchemy(expid)
     if not check_db():
         return False
     try:
@@ -568,3 +597,196 @@ class DbException(Exception):
 
     def __init__(self, message):
         self.message = message
+
+
+# Code added for SQLAlchemy support
+
+
+def _get_sqlalchemy_conn() -> Connection:
+    """Return the database connection.
+
+    It captures any exception and raises an ``AutosubmitCritical``
+    instead, as in the previous SQLite-only code. With this function we
+    can use a context-manager and keep the previous behaviour
+    intact.
+    """
+    try:
+        return session.create_engine().connect()
+    except Exception as e:
+        raise AutosubmitCritical("Could not establish a connection to database", 7001, str(e))
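All of the `*_sqlalchemy` helpers added below follow the same shape: open a connection through `_get_sqlalchemy_conn()`, execute one statement, and let the `with` block close the connection. A minimal sketch of that pattern, not part of the diff itself (the expid `a000` is illustrative):

```python
# Sketch only: how the helpers below use _get_sqlalchemy_conn().
from sqlalchemy import select

from autosubmit.database import tables
from autosubmit.database.db_common import _get_sqlalchemy_conn

with _get_sqlalchemy_conn() as conn:
    # Reads need no commit; writes must call conn.commit() explicitly.
    row = conn.execute(
        select(tables.ExperimentTable).where(tables.ExperimentTable.c.name == "a000")
    ).one_or_none()
```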
+ """ + tables_to_create = [tables.ExperimentTable, tables.DBVersionTable] + + try: + with _get_sqlalchemy_conn() as conn: + for table in tables_to_create: + conn.execute(CreateTable(table, if_not_exists=True)) + conn.execute(delete(tables.DBVersionTable)) + conn.execute(insert(tables.DBVersionTable).values({"version": 1})) + conn.commit() + except AutosubmitCritical: + raise + except Exception as exc: + raise AutosubmitCritical("Database can not be created", 7004, str(exc)) + + return True + + +def _save_experiment_sqlalchemy(name: str, description: str, version: str) -> bool: + with _get_sqlalchemy_conn() as conn: + try: + conn.execute(insert(tables.ExperimentTable).values( + name=name, description=description, autosubmit_version=version) + ) + conn.commit() + except AutosubmitCritical: + raise + except Exception as exc: + conn.rollback() + raise AutosubmitCritical( + 'Could not register experiment', 7005, str(exc)) + return True + + +def _check_experiment_exists_sqlalchemy(name: str, error_on_inexistence=True) -> bool: + row = None + with _get_sqlalchemy_conn() as conn: + try: + query = select(tables.ExperimentTable).where(tables.ExperimentTable.c.name == name) + row = conn.execute(query).one_or_none() + except AutosubmitCritical: + raise + except Exception as exc: + conn.rollback() + raise AutosubmitCritical( + 'Could not register experiment', 7005, str(exc)) + + if row is None: + if error_on_inexistence: + raise AutosubmitCritical( + 'The experiment name "{0}" does not exist yet!!!'.format(name), 7005) + if os.path.exists(os.path.join(BasicConfig.LOCAL_ROOT_DIR, name)): + try: + _save_experiment(name, 'No description', "3.14.0") + except BaseException as e: + pass + return True + return False + + return True + + +def get_experiment_descrip_sqlalchemy(expid) -> List[List[str]]: + with _get_sqlalchemy_conn() as conn: + query = select(tables.ExperimentTable).where(tables.ExperimentTable.c.name == expid) + row = conn.execute(query).one_or_none() + + if row: + return [[row.description]] + return [] + + +def _update_experiment_descrip_version_sqlalchemy( + name: str, + description: Optional[str] = None, + version: Optional[str] = None) -> bool: + # Conditional update statement + if description is None and version is None: + raise AutosubmitCritical("Not enough data to update {}.".format(name), 7005) + + query = update(tables.ExperimentTable).where(tables.ExperimentTable.c.name == name) + vals = {} + if isinstance(description, str): + vals["description"] = description + if isinstance(version, str): + vals["autosubmit_version"] = version + query = query.values(vals) + + with _get_sqlalchemy_conn() as conn: + result = conn.execute(query) + conn.commit() + + if result.rowcount == 0: + raise AutosubmitCritical("Update on experiment {} failed.".format(name), 7005) + return True + + +def _get_autosubmit_version_sqlalchemy(expid) -> str: + with _get_sqlalchemy_conn() as conn: + query = select(tables.ExperimentTable).where(tables.ExperimentTable.c.name == expid) + row = conn.execute(query).one_or_none() + + if row is None: + raise AutosubmitCritical( + 'The experiment "{0}" does not exist'.format(expid), 7005) + return row.autosubmit_version + + +def _last_name_used_sqlalchemy(test=False, operational=False) -> str: + if test: + condition = tables.ExperimentTable.c.name.like("t%") + elif operational: + condition = tables.ExperimentTable.c.name.like("o%") + else: + condition = tables.ExperimentTable.c.name.not_like( + "t%" + ) & tables.ExperimentTable.c.name.not_like("o%") + + sub_query = 
select(func.max(tables.ExperimentTable.c.id).label("id")).where( + condition + & tables.ExperimentTable.c.autosubmit_version.is_not(None) + & tables.ExperimentTable.c.autosubmit_version.not_like("%3.0.0b%") + ).scalar_subquery() + query = select(tables.ExperimentTable.c.name).where( + tables.ExperimentTable.c.id == sub_query + ) + + with _get_sqlalchemy_conn() as conn: + row = conn.execute(query).one_or_none() + + if row is None: + return 'empty' + + # If starts by number (during 3.0 beta some jobs starting with numbers where created), returns empty. + try: + if row.name.isnumeric(): + return 'empty' + else: + return row.name + except ValueError: + return row.name + + +def _delete_experiment_sqlalchemy(experiment_id: str) -> bool: + if not _check_experiment_exists_sqlalchemy(experiment_id, False): # Reference the no anti-lock version. + return True + + with _get_sqlalchemy_conn() as conn: + query = delete(tables.ExperimentTable).where( + tables.ExperimentTable.c.name == experiment_id + ) + result = conn.execute(query) + conn.commit() + + if cast(int, result.rowcount) > 0: + Log.debug("The experiment {0} has been deleted!!!", experiment_id) + return True + diff --git a/autosubmit/database/db_manager.py b/autosubmit/database/db_manager.py index 6e7376b9b1fbdd9c9b3081f42493c23f52ece210..fb7c39f9d9a23f7b89a89959f034e5c49d6dffc4 100644 --- a/autosubmit/database/db_manager.py +++ b/autosubmit/database/db_manager.py @@ -20,6 +20,13 @@ import sqlite3 import os +from typing import Optional, Protocol, Union, cast +from sqlalchemy import Connection, Engine, delete, insert, select +from autosubmit.database import session +from autosubmit.database.tables import get_table_from_name +from sqlalchemy.schema import CreateTable, CreateSchema, DropTable +# FIXME: what happens in DbManager when db_version is None or 0000, etc? + class DbManager(object): """ @@ -234,3 +241,200 @@ class DbManager(object): for condition in where: select_command += ' AND ' + condition return select_command + + +class DatabaseManager(Protocol): + """Common interface for database managers. + + We used a protocol here to avoid having to modify the existing + SQLite code (as we would if we used an abstract/ABC class). + + And the new database manager will "quack" like the other one does. + """ + + connection: Union[Connection, sqlite3.Connection] + + def backup(self): ... + def restore(self): ... + def disconnect(self): ... + def create_table(self, table_name, fields): ... + def drop_table(self, table_name): ... + def insert(self, table_name, columns, values): ... + def insertMany(self, table_name, data): ... + def select_first(self, table_name): ... + def select_first_where(self, table_name, where): ... + def select_all(self, table_name): ... + def select_all_where(self, table_name, where): ... + def count(self, table_name): ... + def drop(self): ... + + +class SqlAlchemyDbManager: + """A database manager using SQLAlchemy. + + It contains the same public functions as ``DbManager`` + (SQLite only), but uses SQLAlchmey instead of calling database + driver functions directly. + + It can be used with any engine supported by SQLAlchemy, such + as Postgres, Mongo, MySQL, etc. + + Static functions from ``DbManager`` were skipped, as those are + similar to what SQLAlchemy already provides for creating and + queries. + + Some operations here may raise ``NotImplemented``, as they existed in + the ``DbManager`` but were never used in Autosubmit (i.e. we can + delete that code -- for later). 
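With the dispatch above in place, callers of `db_common` stay backend-agnostic; the Postgres or SQLite implementation is picked from `BasicConfig.DATABASE_BACKEND`. A usage sketch (the expid `a000` is made up):

```python
# Usage sketch: the public API of db_common is unchanged by this diff.
from autosubmit.database import db_common

if db_common.check_experiment_exists("a000", error_on_inexistence=False):
    version = db_common.get_autosubmit_version("a000")
    db_common.update_experiment_descrip_version("a000", description="new description")
```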
diff --git a/autosubmit/database/db_manager.py b/autosubmit/database/db_manager.py
index 6e7376b9b1fbdd9c9b3081f42493c23f52ece210..fb7c39f9d9a23f7b89a89959f034e5c49d6dffc4 100644
--- a/autosubmit/database/db_manager.py
+++ b/autosubmit/database/db_manager.py
@@ -20,6 +20,13 @@
 import sqlite3
 import os
 
+from typing import Optional, Protocol, Union, cast
+from sqlalchemy import Connection, Engine, delete, insert, select
+from autosubmit.database import session
+from autosubmit.database.tables import get_table_from_name
+from sqlalchemy.schema import CreateTable, CreateSchema, DropTable
+# FIXME: what happens in DbManager when db_version is None or 0000, etc?
+
 
 class DbManager(object):
     """
@@ -234,3 +241,200 @@ class DbManager(object):
         for condition in where:
             select_command += ' AND ' + condition
         return select_command
+
+
+class DatabaseManager(Protocol):
+    """Common interface for database managers.
+
+    We used a protocol here to avoid having to modify the existing
+    SQLite code (as we would if we used an abstract/ABC class).
+
+    And the new database manager will "quack" like the other one does.
+    """
+
+    connection: Union[Connection, sqlite3.Connection]
+
+    def backup(self): ...
+    def restore(self): ...
+    def disconnect(self): ...
+    def create_table(self, table_name, fields): ...
+    def drop_table(self, table_name): ...
+    def insert(self, table_name, columns, values): ...
+    def insertMany(self, table_name, data): ...
+    def select_first(self, table_name): ...
+    def select_first_where(self, table_name, where): ...
+    def select_all(self, table_name): ...
+    def select_all_where(self, table_name, where): ...
+    def count(self, table_name): ...
+    def drop(self): ...
+
+
+class SqlAlchemyDbManager:
+    """A database manager using SQLAlchemy.
+
+    It contains the same public functions as ``DbManager``
+    (SQLite only), but uses SQLAlchemy instead of calling database
+    driver functions directly.
+
+    It can be used with any engine supported by SQLAlchemy, such
+    as Postgres, MySQL, etc.
+
+    Static functions from ``DbManager`` were skipped, as those are
+    similar to what SQLAlchemy already provides for creating tables
+    and building queries.
+
+    Some operations here may raise ``NotImplementedError``, as they existed in
+    the ``DbManager`` but were never used in Autosubmit (i.e. we can
+    delete that code -- for later).
+
+    TODO: At some point in the future, we can drop ``DbManager`` and
+          replace it with this one (or rename this to DbManager), as
+          SQLAlchemy should be able to handle SQLite too.
+    """
+
+    def __init__(self, schema: Optional[str] = None) -> None:
+        self.engine: Engine = session.create_engine()
+        self.schema = schema
+        # TODO: if we cannot do that (too long-lived conns?), then we can open for each operation...
+        self.connection: Connection = self.engine.connect()
+        # TODO: in SQLite-land, it creates the tables if they do not exist... what about here?
+
+    def backup(self):
+        pass
+
+    def restore(self):
+        pass
+
+    def disconnect(self):
+        self.connection.close()
+
+    def create_table(self, table_name, fields):
+        # NOTE: ``fields`` is ignored as they are defined in the SQLAlchemy
+        #       tables definitions (see ``.database.tables``). Kept for
+        #       backward compatibility with the old API for SQLite.
+        table = get_table_from_name(schema=self.schema, table_name=table_name)
+        self.connection.execute(CreateSchema(self.schema, if_not_exists=True))
+        self.connection.execute(CreateTable(table, if_not_exists=True))
+        self.connection.commit()
+
+    def drop_table(self, table_name):
+        table = get_table_from_name(schema=self.schema, table_name=table_name)
+        self.connection.execute(DropTable(table, if_exists=True))
+        self.connection.commit()
+
+    def insert(self, table_name, columns, values):
+        """Not implemented.
+
+        In the original ``DbManager`` (SQLite), this function is used
+        only to initialize the SQLite database (i.e. no external users).
+        """
+        raise NotImplementedError()
+
+    def insertMany(self, table_name, data):
+        """
+        N.B.: One difference between SQLite and SQLAlchemy here;
+              whereas with SQLite you insert a list of values
+              matching the columns, for SQLAlchemy you provide
+              a dictionary. For backward-compatibility, we risk
+              accepting a list or dictionary here. If a list
+              is provided, then we will use the Table metadata
+              to fetch the columns. Not the safest approach, so
+              prefer to send a dictionary!
+
+        :type table_name: str
+        :type data: Union[List, Dict]
+        """
+        if len(data) == 0:  # type: ignore
+            return 0
+        table = get_table_from_name(schema=self.schema, table_name=table_name)
+        if type(data[0]) == list:
+            # Convert into a dictionary; we know the keys from the
+            # table metadata columns.
+            columns = [column.name for column in cast(list, table.columns)]
+            data = map(lambda values: {
+                key: value
+                for key, value in
+                zip(columns, values)
+            }, cast(list, data))
+            data = list(data)
+        result = self.connection.execute(insert(table), data)
+        self.connection.commit()
+        return cast(int, result.rowcount)
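A sketch of the two payload shapes `insertMany()` accepts for backward compatibility; the `experiment_structure` table (defined in `autosubmit.database.tables`) has the columns `e_from` and `e_to`, and the schema `a000` is illustrative:

```python
# Sketch: both payload shapes accepted by insertMany().
db_manager = SqlAlchemyDbManager(schema="a000")

# Preferred: dictionaries keyed by column name.
db_manager.insertMany('experiment_structure', [{'e_from': 'JOB_A', 'e_to': 'JOB_B'}])

# Legacy: positional lists, zipped against the table's columns in order.
db_manager.insertMany('experiment_structure', [['JOB_A', 'JOB_B']])
```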
+
+    def select_first(self, table_name):
+        """Not used in the original ``DbManager``!"""
+        raise NotImplementedError()
+
+    def select_first_where(self, table_name, where):
+        """Not used in the original ``DbManager``!"""
+        raise NotImplementedError()
+
+    def select_all(self, table_name):
+        table = get_table_from_name(schema=self.schema, table_name=table_name)
+        rows = self.connection.execute(select(table)).all()
+        return rows
+
+    def select_all_where(self, table_name, where):
+        """Not used in the original ``DbManager``!"""
+        raise NotImplementedError()
+
+    def count(self, table_name):
+        """Not used in the original ``DbManager``!"""
+        raise NotImplementedError()
+
+    def drop(self):
+        """Not used in the original ``DbManager``!"""
+        raise NotImplementedError()
+
+    # New! Used for testing...
+
+    def delete_all(self, table_name: str) -> int:
+        """Deletes all the rows of the table.
+
+        :param table_name: The name of the table.
+        :return: The number of rows deleted.
+        """
+        table = get_table_from_name(schema=self.schema, table_name=table_name)
+        result = self.connection.execute(delete(table))
+        self.connection.commit()
+        return cast(int, result.rowcount)
+
+
+def create_db_manager(db_engine: str, **options) -> DatabaseManager:
+    """
+    Creates a Postgres or SQLite database manager based on the Autosubmit configuration.
+
+    Note that you must provide the options even if they are optional, in which case
+    you must provide ``options=None``, or you will get a ``KeyError``.
+
+    Later we might be able to drop the SQLite database manager. So, for the moment,
+    please call the function providing the database engine type, and the arguments
+    for both SQLite and for SQLAlchemy.
+
+    This means you do not have to do an ``if/else`` in your code, just give
+    this function the engine type, and all the valid options, and it should
+    handle choosing and building the database manager for you. e.g.
+
+    >>> from autosubmit.database.db_manager import create_db_manager
+    >>> options = {
+    >>>     # these are for sqlite
+    >>>     'root_path': '/tmp/',
+    >>>     'db_name': 'name.db',
+    >>>     'db_version': 1,
+    >>>     # and these for sqlalchemy -- not very elegant, but this is
+    >>>     # to work-effectively-with-legacy-code (as in that famous book).
+    >>>     'schema': 'a001'
+    >>> }
+    >>> db_manager = create_db_manager(db_engine='postgres', **options)
+
+    :param db_engine: The database engine type.
+    :return: A ``DatabaseManager``.
+    :raises ValueError: If the database engine type is not valid.
+    :raises KeyError: If the ``options`` dictionary is missing a required parameter for an engine.
+    """
+    if db_engine == "postgres":
+        return cast(DatabaseManager, SqlAlchemyDbManager(options['schema']))
+    elif db_engine == "sqlite":
+        return cast(DatabaseManager, DbManager(options['root_path'], options['db_name'], options['db_version']))
+    else:
+        raise ValueError(f"Invalid database engine: {db_engine}")
diff --git a/autosubmit/database/db_structure.py b/autosubmit/database/db_structure.py
index 31dc42740a56c6536355b06ce6dfaba255d4a77c..4da0149e6b003c9bf83efdfaeaad62bb5396534d 100644
--- a/autosubmit/database/db_structure.py
+++ b/autosubmit/database/db_structure.py
@@ -23,9 +23,12 @@ import textwrap
 import traceback
 import sqlite3
 
-from typing import Dict, List
+from typing import Dict, List, Optional
 
 from log.log import Log
+from autosubmitconfigparser.config.basicconfig import BasicConfig
+from autosubmit.database.db_manager import DatabaseManager, create_db_manager
+
 
 def get_structure(exp_id, structures_path):
     # type: (str, str) -> Dict[str, List[str]]
@@ -35,7 +38,9 @@ def get_structure(exp_id, structures_path):
     :return: Map from experiment name source to name destination
     :rtype: Dictionary Key: String, Value: List(of String)
     """
-    try:
+    if BasicConfig.DATABASE_BACKEND == 'postgres':
+        return get_structure_sqlalchemy(exp_id, structures_path)
+    try:
         if os.path.exists(structures_path):
             db_structure_path = os.path.join(
                 structures_path, "structure_" + exp_id + ".db")
@@ -118,6 +123,8 @@ def save_structure(graph, exp_id, structures_path):
     """
     Saves structure if path is valid
     """
+    if BasicConfig.DATABASE_BACKEND == 'postgres':
+        return save_structure_sqlalchemy(graph, exp_id, structures_path)
     conn = None
     # Path to structures folder exists
     if os.path.exists(structures_path):
@@ -175,3 +182,70 @@ def _delete_table_content(conn):
     except sqlite3.Error as e:
         Log.debug(traceback.format_exc())
         Log.warning("Error on Delete : {0}".format(str(type(e).__name__)))
+
+
+# Code added for SQLAlchemy support
+
+def get_structure_sqlalchemy(exp_id: str, structures_path: str) -> Dict[str, List[str]]:
+    """
+    Creates file of database and table of experiment structure if it does not exist. Returns current structure.
+
+    :return: Map from experiment name source to name destination
+    """
+    db_manager: Optional[DatabaseManager] = None
+    options = {'schema': exp_id}
+    try:
+        db_manager = create_db_manager('postgres', **options)
+
+        # FIXME: to verify, the old code did what's shown below;
+        #        does that also apply to the new code?
+        # if not os.path.exists(db_structure_path):
+        #     open(db_structure_path, "w")
+        # print(db_structure_path)
+        # There used to be a function ``create_table`` here, but its feature/behaviour
+        # is already implemented in the ``DbManager`` class (which handles DDL), so that
+        # can be deleted as ``DbManager`` creates SQLite and SQLAlchemy tables.
+        db_manager.create_table(
+            table_name='experiment_structure',
+            fields=['e_from text NOT NULL',
+                    'e_to text NOT NULL',
+                    'UNIQUE(e_from,e_to)']
+        )
+        # The call below is equivalent of ``_get_exp_structure`` in the old code.
+        current_table = db_manager.select_all('experiment_structure')
+        current_table_structure = dict()
+        for item in current_table:
+            _from, _to = item
+            current_table_structure.setdefault(_from, []).append(_to)
+            current_table_structure.setdefault(_to, [])
+        return current_table_structure
+    except Exception as exp:
+        Log.printlog("Get structure error: {0}".format(str(exp)), 6014)
+        Log.debug(traceback.format_exc())
+    finally:
+        if db_manager:
+            db_manager.disconnect()
+
+
+def save_structure_sqlalchemy(graph, exp_id, structures_path):
+    """
+    Saves structure if path is valid
+    """
+    db_manager: Optional[DatabaseManager] = None
+    options = {'schema': exp_id}
+    try:
+        db_manager = create_db_manager('postgres', **options)
+
+        # Save structure
+        nodes_edges = {u for u, v in graph.edges()}
+        nodes_edges.update({v for u, v in graph.edges()})
+        independent_nodes = {
+            u for u in graph.nodes() if u not in nodes_edges}
+        data = {(u, v) for u, v in graph.edges()}
+        data.update({(u, u) for u in independent_nodes})
+        # save
+        edges = [{"e_from": e[0], "e_to": e[1]} for e in data]
+        db_manager.insertMany('experiment_structure', edges)
+    finally:
+        if db_manager:
+            db_manager.disconnect()
diff --git a/autosubmit/database/session.py b/autosubmit/database/session.py
new file mode 100644
index 0000000000000000000000000000000000000000..03bff2ecd96ef1eb070aec1e4e2a7df2241617af
--- /dev/null
+++ b/autosubmit/database/session.py
@@ -0,0 +1,24 @@
+from pathlib import Path
+from sqlalchemy import Engine, NullPool, create_engine as sqlalchemy_create_engine
+from typing import Optional
+
+from autosubmitconfigparser.config.basicconfig import BasicConfig
+
+_SQLITE_IN_MEMORY_URL = "sqlite://"
+
+
+def _create_sqlite_engine(path: Optional[str] = None) -> Engine:
+    # file-based, or in-memory database?
+    sqlite_url = f"sqlite:///{Path(path).resolve()}" if path else _SQLITE_IN_MEMORY_URL
+    return sqlalchemy_create_engine(sqlite_url, poolclass=NullPool)
+
+
+def create_engine() -> Engine:
+    """Create SQLAlchemy Core engine."""
+    if BasicConfig.DATABASE_BACKEND == "postgres":
+        return sqlalchemy_create_engine(BasicConfig.DATABASE_CONN_URL)
+    else:
+        return _create_sqlite_engine()
+
+
+__all__ = ["create_engine"]
+ sqlite_url = f"sqlite:///{Path(path).resolve()}" if path else _SQLITE_IN_MEMORY_URL + return sqlalchemy_create_engine(sqlite_url, poolclass=NullPool) + + +def create_engine() -> Engine: + """Create SQLAlchemy Core engine.""" + if BasicConfig.DATABASE_BACKEND == "postgres": + return sqlalchemy_create_engine(BasicConfig.DATABASE_CONN_URL) + else: + return _create_sqlite_engine() + + +__all__ = ["create_engine"] diff --git a/autosubmit/database/tables.py b/autosubmit/database/tables.py new file mode 100644 index 0000000000000000000000000000000000000000..e21891cf1a7fd88022a2ae020e4405696c5c7eca --- /dev/null +++ b/autosubmit/database/tables.py @@ -0,0 +1,207 @@ +from sqlalchemy import ( + MetaData, + Integer, + String, + Table, + Text, + Float, + UniqueConstraint, + Column, +) +from typing import List, Optional, cast + +metadata_obj = MetaData() + + +ExperimentTable = Table( + 'experiment', + metadata_obj, + Column('id', Integer, nullable=False, primary_key=True), + Column('name', String, nullable=False), + Column('description', String, nullable=False), + Column('autosubmit_version', String) +) +"""The main table, populated by Autosubmit. Should be read-only by the API.""" + +# NOTE: In the original SQLite DB, db_version.version was the only field, +# and not a PK. +DBVersionTable = Table( + 'db_version', + metadata_obj, + Column('version', Integer, nullable=False, primary_key=True) +) + +ExperimentStructureTable = Table( + 'experiment_structure', + metadata_obj, + Column('e_from', Text, nullable=False, primary_key=True), + Column('e_to', Text, nullable=False, primary_key=True), +) +"""Table that holds the structure of the experiment jobs.""" + +ExperimentStatusTable = Table( + 'experiment_status', + metadata_obj, + Column('exp_id', Integer, primary_key=True), + Column('name', Text, nullable=False), + Column('status', Text, nullable=False), + Column('seconds_diff', Integer, nullable=False), + Column('modified', Text, nullable=False) +) +"""Stores the status of the experiments.""" + +JobPackageTable = Table( + 'job_package', + metadata_obj, + Column('package_name', Text, primary_key=True), + Column('job_name', Text, primary_key=True), + Column('exp_id', Text) +) +"""Stores a mapping between the wrapper name and the actual job in SLURM.""" + +WrapperJobPackageTable = Table( + 'wrapper_job_package', + metadata_obj, + Column('package_name', Text, primary_key=True), + Column('job_name', Text, primary_key=True), + Column('exp_id', Text) +) +"""It is a replication. + +It is only created/used when using inspect and create or monitor +with flag -cw in Autosubmit. + +This replication is used to not interfere with the current +autosubmit run of that experiment since wrapper_job_package +will contain a preview, not the real wrapper packages.""" + +# NOTE: The column ``metadata`` has a name that is reserved in +# SQLAlchemy ORM. It works for SQLAlchemy Core, here, but +# if you plan to use ORM, be warned that you will have to +# search how to workaround it (or will probably have to +# use SQLAlchemy core here). 
+ExperimentRunTable = Table(
+    "experiment_run",
+    metadata_obj,
+    Column("run_id", Integer, primary_key=True),
+    Column("created", Text, nullable=False),
+    Column("modified", Text, nullable=True),
+    Column("start", Integer, nullable=False),
+    Column("finish", Integer),
+    Column("chunk_unit", Text, nullable=False),
+    Column("chunk_size", Integer, nullable=False),
+    Column("completed", Integer, nullable=False),
+    Column("total", Integer, nullable=False),
+    Column("failed", Integer, nullable=False),
+    Column("queuing", Integer, nullable=False),
+    Column("running", Integer, nullable=False),
+    Column("submitted", Integer, nullable=False),
+    Column("suspended", Integer, nullable=False, default=0),
+    Column("metadata", Text),
+)
+
+JobDataTable = Table(
+    'job_data',
+    metadata_obj,
+    Column('id', Integer, nullable=False, primary_key=True),
+    Column('counter', Integer, nullable=False),
+    Column('job_name', Text, nullable=False, index=True),
+    Column('created', Text, nullable=False),
+    Column('modified', Text, nullable=False),
+    Column('submit', Integer, nullable=False),
+    Column('start', Integer, nullable=False),
+    Column('finish', Integer, nullable=False),
+    Column('status', Text, nullable=False),
+    Column('rowtype', Integer, nullable=False),
+    Column('ncpus', Integer, nullable=False),
+    Column('wallclock', Text, nullable=False),
+    Column('qos', Text, nullable=False),
+    Column('energy', Integer, nullable=False),
+    Column('date', Text, nullable=False),
+    Column('section', Text, nullable=False),
+    Column('member', Text, nullable=False),
+    Column('chunk', Integer, nullable=False),
+    Column('last', Integer, nullable=False),
+    Column('platform', Text, nullable=False),
+    Column('job_id', Integer, nullable=False),
+    Column('extra_data', Text, nullable=False),
+    Column('nnodes', Integer, nullable=False, default=0),
+    Column('run_id', Integer),
+    Column('MaxRSS', Float, nullable=False, default=0.0),
+    Column('AveRSS', Float, nullable=False, default=0.0),
+    Column('out', Text, nullable=False),
+    Column('err', Text, nullable=False),
+    Column('rowstatus', Integer, nullable=False, default=0),
+    Column('children', Text, nullable=True),
+    Column('platform_output', Text, nullable=True),
+    UniqueConstraint('counter', 'job_name', name='unique_counter_and_job_name')
+)
+
+JobListTable = Table(
+    'job_list',
+    metadata_obj,
+    Column('name', String, primary_key=True),
+    Column('id', Integer),
+    Column('status', Integer),
+    Column('priority', Integer),
+    Column('section', String),
+    Column('date', String),
+    Column('member', String),
+    Column('chunk', Integer),
+    Column('split', Integer),
+    Column('local_out', String),
+    Column('local_err', String),
+    Column('remote_out', String),
+    Column('remote_err', String)
+)
+
+TABLES = (ExperimentTable,
+          ExperimentStatusTable,
+          ExperimentStructureTable,
+          ExperimentRunTable,
+          DBVersionTable,
+          JobPackageTable,
+          JobDataTable,
+          JobListTable,
+          WrapperJobPackageTable)
+"""The tables available in the Autosubmit databases."""
+
+
+def get_table_with_schema(schema: str, table: Table) -> Table:
+    """Get the ``Table`` instance with the metadata modified.
+
+    The new table's metadata will use the given schema. This means you can
+    have table ``A`` with no schema, then call this function with
+    ``schema=a000``, and then a new table ``A`` with ``schema=a000``
+    will be returned.
+
+    :param schema: The target schema for the table metadata.
+    :param table: The SQLAlchemy Table.
+    :return: The same table, but with the given schema set as metadata.
+    """
+    if not isinstance(table, Table):
+        raise RuntimeError("Invalid source type on table schema change")
+
+    metadata = MetaData(schema=schema)
+    dest_table = Table(table.name, metadata)
+
+    for col in cast(List, table.columns):
+        dest_table.append_column(col.copy())
+
+    return dest_table
+
+
+def get_table_from_name(*, schema: str, table_name: str) -> Optional[Table]:
+    """Get the table from a given table name.
+
+    :param schema: The schema name.
+    :param table_name: The table name.
+    :return: The table if found, ``None`` otherwise.
+    :raises ValueError: If the table name is not provided.
+    """
+    if not table_name:
+        raise ValueError(f'Missing table name: {table_name}')
+
+    predicate = lambda table: table.name.lower() == table_name.lower()
+    table = next(filter(predicate, TABLES), None)
+    return get_table_with_schema(schema, table)
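Because every experiment gets its own Postgres schema, the shared Core `Table` objects above are re-homed per expid. A sketch of both lookup helpers (the expid `a000` is made up):

```python
# Sketch: re-homing a shared Core table into a per-experiment schema.
from autosubmit.database.tables import (
    ExperimentRunTable,
    get_table_from_name,
    get_table_with_schema,
)

run_table = get_table_with_schema("a000", ExperimentRunTable)
assert run_table.schema == "a000"

# The same table, resolved from the legacy string-based API:
same_table = get_table_from_name(schema="a000", table_name="experiment_run")
```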
+ """ + if not isinstance(table, Table): + raise RuntimeError("Invalid source type on table schema change") + + metadata = MetaData(schema=schema) + dest_table = Table(table.name, metadata) + + for col in cast(List, table.columns): + dest_table.append_column(col.copy()) + + return dest_table + + +def get_table_from_name(*, schema: str, table_name: str) -> Optional[Table]: + """Get the table from a given table name. + + :param schema: The schema name. + :param table_name: The table name. + :return: The table if found, ``None`` otherwise. + :raises ValueError: If the table name is not provided. + """ + if not table_name: + raise ValueError(f'Missing table name: {table_name}') + + predicate = lambda table: table.name.lower() == table_name.lower() + table = next(filter(predicate, TABLES), None) + return get_table_with_schema(schema, table) diff --git a/autosubmit/history/database_managers/experiment_history_db_manager.py b/autosubmit/history/database_managers/experiment_history_db_manager.py index 8df415c94682435e781588f939a850301bf784f3..4acafd4b06d8e41162afe4cedf31aec872284a94 100644 --- a/autosubmit/history/database_managers/experiment_history_db_manager.py +++ b/autosubmit/history/database_managers/experiment_history_db_manager.py @@ -23,6 +23,12 @@ from autosubmit.history.data_classes.job_data import JobData from autosubmit.history.data_classes.experiment_run import ExperimentRun from .database_manager import DatabaseManager, DEFAULT_JOBDATA_DIR +from typing import Any, Optional, Protocol, cast +from sqlalchemy import Engine, and_, inspect, desc, insert, select, update +from sqlalchemy.schema import CreateTable, CreateSchema +from autosubmit.database import session +from autosubmit.database.tables import ExperimentRunTable, JobDataTable, get_table_with_schema + CURRENT_DB_VERSION = 18 DB_EXPERIMENT_HEADER_SCHEMA_CHANGES = 14 DB_VERSION_SCHEMA_CHANGES = 12 @@ -387,3 +393,367 @@ class ExperimentHistoryDbManager(DatabaseManager): if len(pragma_result) <= 0: raise Exception("Error while getting the pragma version. This might be a signal of a deeper problem. Review previous errors.") return Models.PragmaVersion(*pragma_result[0]).version + + +class ExperimentHistoryDatabaseManager(Protocol): + def initialize(self):... + def my_database_exists(self):... + def is_header_ready_db_version(self):... + def is_current_version(self):... + def create_historical_database(self):... + def update_historical_database(self):... + def get_experiment_run_dc_with_max_id(self):... + def register_experiment_run_dc(self, experiment_run_dc):... + def update_experiment_run_dc_by_id(self, experiment_run_dc):... + def is_there_a_last_experiment_run(self):... + def get_job_data_all(self):... + def register_submitted_job_data_dc(self, job_data_dc):... + def update_job_data_dc_by_id(self, job_data_dc):... + def update_list_job_data_dc_by_each_id(self, job_data_dcs):... + def get_job_data_dc_unique_latest_by_job_name(self, job_name):... + def get_job_data_dcs_last_by_run_id(self, run_id):... + def get_job_data_dcs_last_by_wrapper_code(self, wrapper_code):... + def get_all_last_job_data_dcs(self):... + def update_many_job_data_change_status(self, changes):... + def _update_job_data_by_id(self, job_data_dc):... + def get_job_data_by_name(self, job_name):... + def get_job_data_max_counter(self):... + def delete_job_data(self, id_):... + def delete_experiment_run(self, run_id):... + + +class SqlAlchemyExperimentHistoryDbManager: + """A SQLAlchemy experiment history database manager. 
+
+    Its interface was designed based on the SQLite database manager,
+    with the following differences:
+
+    - We do not have the DB migration system that they used, as that
+      used SQLite pragmas, which are not portable across DB engines
+      (i.e. no ``_set_schema_changes()`` nor ``_set_table_queries()``).
+    """
+
+    def __init__(self, schema: Optional[str]):
+        self.engine: Engine = session.create_engine()
+        self.schema = schema
+
+    def initialize(self):
+        if self.my_database_exists():
+            if not self.is_current_version():
+                self.update_historical_database()
+        else:
+            self.create_historical_database()
+
+    def my_database_exists(self):
+        """Return ``True`` if the schema exists in the database. ``False`` otherwise."""
+        engine = session.create_engine()
+        inspector = inspect(engine)
+        return self.schema in inspector.get_schema_names()
+
+    def is_header_ready_db_version(self):
+        if self.my_database_exists():
+            return self._get_pragma_version() >= DB_EXPERIMENT_HEADER_SCHEMA_CHANGES
+        return False
+
+    def is_current_version(self):
+        if self.my_database_exists():
+            return self._get_pragma_version() == CURRENT_DB_VERSION
+        return False
+
+    def create_historical_database(self):
+        with self.engine.connect() as conn:
+            conn.execute(CreateSchema(self.schema, if_not_exists=True))
+            conn.execute(CreateTable(get_table_with_schema(self.schema, ExperimentRunTable), if_not_exists=True))
+            conn.execute(CreateTable(get_table_with_schema(self.schema, JobDataTable), if_not_exists=True))
+            conn.commit()
+        # TODO: implement db migrations?
+        # self._set_historical_pragma_version(CURRENT_DB_VERSION)
+
+    def update_historical_database(self):
+        raise NotImplementedError("This feature has not been implemented yet with SQLAlchemy / Alembic.")
+
+    def get_experiment_run_dc_with_max_id(self):
+        return ExperimentRun.from_model(self._get_experiment_run_with_max_id())
+
+    def register_experiment_run_dc(self, experiment_run_dc):
+        query = (
+            insert(get_table_with_schema(self.schema, ExperimentRunTable)).
+            values(
+                created=HUtils.get_current_datetime(),
+                modified=HUtils.get_current_datetime(),
+                start=experiment_run_dc.start,
+                finish=experiment_run_dc.finish,
+                chunk_unit=experiment_run_dc.chunk_unit,
+                chunk_size=experiment_run_dc.chunk_size,
+                completed=experiment_run_dc.completed,
+                total=experiment_run_dc.total,
+                failed=experiment_run_dc.failed,
+                queuing=experiment_run_dc.queuing,
+                running=experiment_run_dc.running,
+                submitted=experiment_run_dc.submitted,
+                suspended=experiment_run_dc.suspended,
+                metadata=experiment_run_dc.metadata
+            )
+        )
+        with self.engine.connect() as conn:
+            conn.execute(query)
+            conn.commit()
+        return ExperimentRun.from_model(self._get_experiment_run_with_max_id())
+
+    def update_experiment_run_dc_by_id(self, experiment_run_dc):
+        experiment_run_table = get_table_with_schema(self.schema, ExperimentRunTable)
+        query = (
+            update(experiment_run_table).
+            where(experiment_run_table.c.run_id == experiment_run_dc.run_id).  # type: ignore
+            values(
+                finish=experiment_run_dc.finish,
+                chunk_unit=experiment_run_dc.chunk_unit,
+                chunk_size=experiment_run_dc.chunk_size,
+                completed=experiment_run_dc.completed,
+                total=experiment_run_dc.total,
+                failed=experiment_run_dc.failed,
+                queuing=experiment_run_dc.queuing,
+                running=experiment_run_dc.running,
+                submitted=experiment_run_dc.submitted,
+                suspended=experiment_run_dc.suspended,
+                modified=HUtils.get_current_datetime()
+            )
+        )
+        with self.engine.connect() as conn:
+            conn.execute(query)
+            conn.commit()
+        return ExperimentRun.from_model(self._get_experiment_run_with_max_id())
+
+    def _get_experiment_run_with_max_id(self):
+        experiment_run_table = get_table_with_schema(self.schema, ExperimentRunTable)
+        query = (
+            select(experiment_run_table).
+            where(experiment_run_table.c.run_id > 0).
+            order_by(desc(experiment_run_table.c.run_id))
+        )
+        with self.engine.connect() as conn:
+            row = conn.execute(query).first()
+        if not row:
+            raise Exception("No Experiment Runs registered.")
+        return Models.ExperimentRunRow(**row._mapping)
+
+    def is_there_a_last_experiment_run(self):
+        experiment_run_table = get_table_with_schema(self.schema, ExperimentRunTable)
+        query = (
+            select(experiment_run_table).
+            where(experiment_run_table.c.run_id > 0).
+            order_by(desc(experiment_run_table.c.run_id))
+        )
+        with self.engine.connect() as conn:
+            result = conn.execute(query).first()
+        return result is not None
+
+    def get_job_data_all(self):
+        with self.engine.connect() as conn:
+            job_data_rows = conn.execute(select(get_table_with_schema(self.schema, JobDataTable))).all()
+        return [Models.JobDataRow(**row._mapping) for row in job_data_rows]
+
+    def register_submitted_job_data_dc(self, job_data_dc):
+        self._set_current_job_data_rows_last_to_zero_by_job_name(job_data_dc.job_name)
+        self._insert_job_data(job_data_dc)
+        return self.get_job_data_dc_unique_latest_by_job_name(job_data_dc.job_name)
+
+    def _set_current_job_data_rows_last_to_zero_by_job_name(self, job_name):
+        """ Sets the column last = 0 for all job_rows by job_name and last = 1. """
+        job_data_row_last = self._get_job_data_last_by_name(job_name)
+        job_data_dc_list = [JobData.from_model(row) for row in job_data_row_last]
+        for job_data_dc in job_data_dc_list:
+            job_data_dc.last = 0
+            self._update_job_data_by_id(job_data_dc)
+
+    def update_job_data_dc_by_id(self, job_data_dc):
+        """ Update JobData data class. Returns latest last=1 row from job_data by job_name. """
+        self._update_job_data_by_id(job_data_dc)
+        return self.get_job_data_dc_unique_latest_by_job_name(job_data_dc.job_name)
+
+    def update_list_job_data_dc_by_each_id(self, job_data_dcs):
+        """ Return length of updated list. """
+        for job_data_dc in job_data_dcs:
+            self._update_job_data_by_id(job_data_dc)
+        return len(job_data_dcs)
+
+    def get_job_data_dc_unique_latest_by_job_name(self, job_name):
+        job_data_row_last = self._get_job_data_last_by_name(job_name)
+        if len(job_data_row_last) > 0:
+            return JobData.from_model(job_data_row_last[0])
+        return None
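How this manager is meant to be obtained and bootstrapped, through the factory defined at the end of this file (mirrors the `ExperimentHistory` change further down in this diff; the expid `a000` is illustrative):

```python
# Sketch: build the history manager via the factory, then create the
# per-experiment schema plus experiment_run/job_data tables if missing.
manager = create_experiment_history_db_manager('postgres', schema='a000')
manager.initialize()
if manager.is_there_a_last_experiment_run():
    last_run = manager.get_experiment_run_dc_with_max_id()
```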
+
+    def _get_job_data_last_by_name(self, job_name):
+        job_data_table = get_table_with_schema(self.schema, JobDataTable)
+        query = (
+            select(job_data_table).
+            where(
+                and_(job_data_table.c.last == 1, job_data_table.c.job_name == job_name)
+            ).
+            order_by(desc(job_data_table.c.counter))
+        )
+        with self.engine.connect() as conn:
+            job_data_rows_last = conn.execute(query).all()
+        # if previous job didn't finish but a new create has been made
+        if not job_data_rows_last:
+            new_query = (
+                select(job_data_table).
+                where(
+                    and_(job_data_table.c.last == 0, job_data_table.c.job_name == job_name)
+                ).
+                order_by(desc(job_data_table.c.counter))
+            )
+            with self.engine.connect() as conn:
+                job_data_rows_last = conn.execute(new_query).all()
+        return [Models.JobDataRow(**row._mapping) for row in job_data_rows_last]
+
+    def get_job_data_dcs_last_by_run_id(self, run_id):
+        job_data_rows = self._get_job_data_last_by_run_id(run_id)
+        return [JobData.from_model(row) for row in job_data_rows]
+
+    def _get_job_data_last_by_run_id(self, run_id):
+        """ Get List of Models.JobDataRow for last=1 and run_id """
+        statement = self.get_built_select_statement("job_data", "run_id=? and last=1 and rowtype >= 2 ORDER BY id")
+        arguments = (run_id,)
+        job_data_rows = self.get_from_statement_with_arguments(self.historicaldb_file_path, statement, arguments)
+        return [Models.JobDataRow(*row) for row in job_data_rows]
+
+    def get_job_data_dcs_last_by_wrapper_code(self, wrapper_code):
+        if wrapper_code and wrapper_code > 2:
+            return [JobData.from_model(row) for row in self._get_job_data_last_by_wrapper_code(wrapper_code)]
+        else:
+            return []
+
+    def _get_job_data_last_by_wrapper_code(self, wrapper_code):
+        """ Get List of Models.JobDataRow for last=1 and rowtype=wrapper_code """
+        statement = self.get_built_select_statement("job_data", "rowtype = ? and last=1 ORDER BY id")
+        arguments = (wrapper_code,)
+        job_data_rows = self.get_from_statement_with_arguments(self.historicaldb_file_path, statement, arguments)
+        return [Models.JobDataRow(*row) for row in job_data_rows]
+
+    def get_all_last_job_data_dcs(self):
+        """ Gets JobData data classes in job_data for last=1. """
+        job_data_rows = self._get_all_last_job_data_rows()
+        return [JobData.from_model(row) for row in job_data_rows]
+
+    def _get_all_last_job_data_rows(self):
+        """ Get List of Models.JobDataRow for last=1. """
+        statement = self.get_built_select_statement("job_data", "last=1")
+        job_data_rows = self.get_from_statement(self.historicaldb_file_path, statement)
+        return [Models.JobDataRow(*row) for row in job_data_rows]
+
+    def _insert_job_data(self, job_data):
+        job_data_table = get_table_with_schema(self.schema, JobDataTable)
+        insert_query = (
+            insert(job_data_table).
+            values(
+                counter=job_data.counter,
+                job_name=job_data.job_name,
+                created=HUtils.get_current_datetime(),
+                modified=HUtils.get_current_datetime(),
+                submit=job_data.submit,
+                start=job_data.start,
+                finish=job_data.finish,
+                status=job_data.status,
+                rowtype=job_data.rowtype,
+                ncpus=job_data.ncpus,
+                wallclock=job_data.wallclock,
+                qos=job_data.qos,
+                energy=job_data.energy,
+                date=job_data.date,
+                section=job_data.section,
+                member=job_data.member,
+                chunk=job_data.chunk,
+                last=job_data.last,
+                platform=job_data.platform,
+                job_id=job_data.job_id,
+                extra_data=job_data.extra_data,
+                nnodes=job_data.nnodes,
+                run_id=job_data.run_id,
+                MaxRSS=job_data.MaxRSS,
+                AveRSS=job_data.AveRSS,
+                out=job_data.out,
+                err=job_data.err,
+                rowstatus=job_data.rowstatus,
+                children=job_data.children,
+                platform_output=job_data.platform_output
+            )
+        )
+        with self.engine.connect() as conn:
+            result = conn.execute(insert_query)
+            conn.commit()
+        return result.lastrowid
+
+    def update_many_job_data_change_status(self, changes):
+        # type : (List[Tuple]) -> None
+        """
+        Update many job_data rows in bulk. Requires a changes list of argument tuples.
+        Only updates finish, modified, status, and rowstatus by id.
+        """
+        statement = ''' UPDATE job_data SET modified=?, status=?, rowstatus=? WHERE id=? '''
+        self.execute_many_statement_with_arguments_on_dbfile(self.historicaldb_file_path, statement, changes)
+
+    def _update_job_data_by_id(self, job_data_dc):
+        job_data_table = get_table_with_schema(self.schema, JobDataTable)
+        query = (
+            update(job_data_table).
+            where(job_data_table.c.id == job_data_dc._id).  # type: ignore
+            values(
+                last=job_data_dc.last,
+                submit=job_data_dc.submit,
+                start=job_data_dc.start,
+                finish=job_data_dc.finish,
+                modified=HUtils.get_current_datetime(),
+                job_id=job_data_dc.job_id,
+                status=job_data_dc.status,
+                energy=job_data_dc.energy,
+                extra_data=job_data_dc.extra_data,
+                nnodes=job_data_dc.nnodes,
+                ncpus=job_data_dc.ncpus,
+                rowstatus=job_data_dc.rowstatus,
+                out=job_data_dc.out,
+                err=job_data_dc.err,
+                children=job_data_dc.children,
+                platform_output=job_data_dc.platform_output,
+            )
+        )
+        with self.engine.connect() as conn:
+            conn.execute(query)
+            conn.commit()
+
+    def get_job_data_by_name(self, job_name):
+        """ Get List of Models.JobDataRow for job_name """
+        statement = self.get_built_select_statement("job_data", "job_name=? ORDER BY counter DESC")
+        arguments = (job_name,)
+        job_data_rows = self.get_from_statement_with_arguments(self.historicaldb_file_path, statement, arguments)
+        return [Models.JobDataRow(*row) for row in job_data_rows]
+
+    def get_job_data_max_counter(self):
+        """ The max counter is the maximum count value for the count column in job_data. """
+        statement = "SELECT MAX(counter) as maxcounter FROM job_data"
+        counter_result = self.get_from_statement(self.historicaldb_file_path, statement)
+        if len(counter_result) <= 0:
+            return DEFAULT_MAX_COUNTER
+        else:
+            max_counter = Models.MaxCounterRow(*counter_result[0]).maxcounter
+            return max_counter if max_counter else DEFAULT_MAX_COUNTER
+
+    def delete_job_data(self, id_):
+        """ Deletes row from job_data by id. Useful for testing. """
+        statement = ''' DELETE FROM job_data WHERE id=? '''
+        arguments = (id_,)
+        self.execute_statement_with_arguments_on_dbfile(self.historicaldb_file_path, statement, arguments)
+
+    def delete_experiment_run(self, run_id):
+        """ Deletes row in experiment_run by run_id. Useful for testing. """
+        statement = ''' DELETE FROM experiment_run where run_id=? '''
+        arguments = (run_id,)
+        self.execute_statement_with_arguments_on_dbfile(self.historicaldb_file_path, statement, arguments)
+
+
+def create_experiment_history_db_manager(db_engine: str, **options: Any) -> ExperimentHistoryDatabaseManager:
+    if db_engine == 'postgres':
+        return cast(ExperimentHistoryDatabaseManager, SqlAlchemyExperimentHistoryDbManager(options['schema']))
+    elif db_engine == 'sqlite':
+        return cast(ExperimentHistoryDatabaseManager, ExperimentHistoryDbManager(
+            options['schema'],
+            options.get('jobdata_dir_path', DEFAULT_JOBDATA_DIR)
+        ))
+    else:
+        raise ValueError(f"Invalid database engine: {db_engine}")
diff --git a/autosubmit/history/database_managers/experiment_status_db_manager.py b/autosubmit/history/database_managers/experiment_status_db_manager.py
index ed2f9f7b8b1c7def2fac6be5aa0cb60d81752090..cda19e6b951c1d25af3290115f80b593996ad97f 100644
--- a/autosubmit/history/database_managers/experiment_status_db_manager.py
+++ b/autosubmit/history/database_managers/experiment_status_db_manager.py
@@ -25,6 +25,14 @@ from autosubmitconfigparser.config.basicconfig import BasicConfig
 import autosubmit.history.utils as HUtils
 from . import database_models as Models
 
+from typing import Optional, Protocol, cast
+from sqlalchemy import Engine, delete, insert, select, update
+from sqlalchemy.schema import CreateTable, CreateSchema
+from autosubmit.database import session
+from autosubmit.database.tables import ExperimentRunTable, ExperimentStatusTable, ExperimentTable, get_table_with_schema
+# FIXME: Why re-load the configuration globally here? Callers of this
+#        module are probably responsible for doing that; i.e. not this
+#        module's (or its classes') responsibility.
 BasicConfig.read()
 
 class ExperimentStatusDbManager(DatabaseManager):
@@ -143,4 +151,147 @@ class ExperimentStatusDbManager(DatabaseManager):
         """ Deletes experiment_status row by expid. Useful for testing purposes. """
         statement = ''' DELETE FROM experiment_status where name = ? '''
         arguments = (expid,)
-        self.execute_statement_with_arguments_on_dbfile(self._as_times_file_path, statement, arguments)
\ No newline at end of file
+        self.execute_statement_with_arguments_on_dbfile(self._as_times_file_path, statement, arguments)
+
+
+class ExperimentStatusDatabaseManager(Protocol):
+    def print_current_table(self): ...
+    def is_running(self, time_condition=600): ...
+    def set_existing_experiment_status_as_running(self, expid): ...
+    def create_experiment_status_as_running(self, experiment): ...
+    def get_experiment_status_row_by_expid(self, expid): ...
+    def get_experiment_row_by_expid(self, expid): ...
+    def get_experiment_status_row_by_exp_id(self, exp_id): ...
+    def create_exp_status(self, exp_id, expid, status): ...
+    def update_exp_status(self, expid, status="RUNNING"): ...
+    def delete_exp_status(self, expid): ...
+
+
+class SqlAlchemyExperimentStatusDbManager:
+    """An experiment status database manager using SQLAlchemy.
+
+    It contains the same public functions as ``ExperimentStatusDbManager``
+    (SQLite only), but uses SQLAlchemy instead of calling database
+    driver functions directly.
+
+    It can be used with any engine supported by SQLAlchemy, such
+    as Postgres, MySQL, etc.
+
+    Some operations here may raise ``NotImplementedError``, as they existed in
+    the ``ExperimentStatusDbManager`` but were never used in Autosubmit (i.e. we can
+    delete that code -- for later).
+    """
+
+    def __init__(self, schema: Optional[str] = None) -> None:
+        self.engine: Engine = session.create_engine()
+        self.schema = schema
+        with self.engine.connect() as conn:
+            conn.execute(CreateSchema(self.schema, if_not_exists=True))
+            conn.execute(CreateTable(get_table_with_schema(self.schema, ExperimentRunTable), if_not_exists=True))
+            conn.commit()
+
+    def print_current_table(self):
+        """Not used!"""
+        raise NotImplementedError()
+
+    def is_running(self, time_condition=600):
+        """Not used!"""
+        raise NotImplementedError()
+
+    def set_existing_experiment_status_as_running(self, expid):
+        self.update_exp_status(expid, Models.RunningStatus.RUNNING)
+
+    def create_experiment_status_as_running(self, experiment):
+        self.create_exp_status(experiment.id, experiment.name, Models.RunningStatus.RUNNING)
+
+    def get_experiment_status_row_by_expid(self, expid: str) -> Optional[Models.ExperimentRow]:
+        experiment_row = self.get_experiment_row_by_expid(expid)
+        return self.get_experiment_status_row_by_exp_id(experiment_row.id)
+
+    def get_experiment_row_by_expid(self, expid: str) -> Models.ExperimentRow:
+        query = (
+            select(ExperimentTable).
+            where(ExperimentTable.c.name == expid)
+        )
+        with session.create_engine().connect() as conn:
+            current_rows = conn.execute(query).all()
+        if len(current_rows) <= 0:
+            raise ValueError("Experiment {0} not found in Postgres {1}".format(expid, expid))
+        return Models.ExperimentRow(*current_rows[0])
+
+    def get_experiment_status_row_by_exp_id(self, exp_id: int) -> Optional[Models.ExperimentStatusRow]:
+        query = (
+            select(ExperimentStatusTable).
+            where(ExperimentStatusTable.c.exp_id == exp_id)
+        )
+        with session.create_engine().connect() as conn:
+            current_rows = conn.execute(query).all()
+        if len(current_rows) <= 0:
+            return None
+        return Models.ExperimentStatusRow(*current_rows[0])
+
+    def create_exp_status(self, exp_id: int, expid: str, status: str) -> int:
+        query = (
+            insert(ExperimentStatusTable).
+            values(
+                exp_id=exp_id,
+                name=expid,
+                status=status,
+                seconds_diff=0,
+                modified=HUtils.get_current_datetime()
+            )
+        )
+        with session.create_engine().connect() as conn:
+            result = conn.execute(query)
+            lastrow_id = result.lastrowid
+            conn.commit()
+        return lastrow_id
+
+    def update_exp_status(self, expid: str, status="RUNNING") -> None:
+        query = (
+            update(ExperimentStatusTable).
+            where(ExperimentStatusTable.c.name == expid).
+            values(
+                status=status,
+                seconds_diff=0,
+                modified=HUtils.get_current_datetime()
+            )
+        )
+        with session.create_engine().connect() as conn:
+            conn.execute(query)
+            conn.commit()
+
+    def delete_exp_status(self, expid: str):
+        query = (
+            delete(ExperimentStatusTable).
+            where(ExperimentStatusTable.c.name == expid)
+        )
+        with session.create_engine().connect() as conn:
+            conn.execute(query)
+            conn.commit()
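A sketch of the create-or-update flow for the status table, using the factory defined right below this class (the expid `a000` is illustrative):

```python
# Sketch: mirror of what ExperimentStatus does with this manager.
manager = create_experiment_status_db_manager('postgres', schema='a000')
if manager.get_experiment_status_row_by_expid('a000') is None:
    experiment = manager.get_experiment_row_by_expid('a000')
    manager.create_experiment_status_as_running(experiment)
else:
    manager.set_existing_experiment_status_as_running('a000')
```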
+ """ + if db_engine == "postgres": + return cast(ExperimentStatusDatabaseManager, SqlAlchemyExperimentStatusDbManager(options['schema'])) + elif db_engine == "sqlite": + return cast(ExperimentStatusDatabaseManager, + ExperimentStatusDbManager( + expid=options['schema'], + db_dir_path=options['root_path'], + main_db_name=options['db_name'], + local_root_dir_path=options['local_root_dir_path'])) + else: + raise ValueError(f"Invalid database engine: {db_engine}") diff --git a/autosubmit/history/experiment_history.py b/autosubmit/history/experiment_history.py index 5fd081600ba82f151b9c7e5d42a0b711ce26cf82..96ce1718b9332c48d06b8a9cc269ce3cda8c8095 100644 --- a/autosubmit/history/experiment_history.py +++ b/autosubmit/history/experiment_history.py @@ -25,7 +25,7 @@ from log.log import Log from .data_classes.experiment_run import ExperimentRun from .data_classes.job_data import JobData from .database_managers.database_manager import DEFAULT_JOBDATA_DIR, DEFAULT_HISTORICAL_LOGS_DIR -from .database_managers.experiment_history_db_manager import ExperimentHistoryDbManager +from .database_managers.experiment_history_db_manager import create_experiment_history_db_manager from .internal_logging import Logging from .platform_monitor.slurm_monitor import SlurmMonitor from .strategies import PlatformInformationHandler, SingleAssociationStrategy, StraightWrapperAssociationStrategy, \ @@ -42,7 +42,11 @@ class ExperimentHistory: self._job_data_dir_path = BasicConfig.JOBDATA_DIR self._historiclog_dir_path = BasicConfig.HISTORICAL_LOG_DIR try: - self.manager = ExperimentHistoryDbManager(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR) + options = { + 'jobdata_dir_path': BasicConfig.JOBDATA_DIR, + 'schema': self.expid + } + self.manager = create_experiment_history_db_manager(BasicConfig.DATABASE_BACKEND, options) except Exception as exp: self._log.log(str(exp), traceback.format_exc()) Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') diff --git a/autosubmit/history/experiment_status.py b/autosubmit/history/experiment_status.py index 9d4c7deb056c88931e32fc6da469c0cf20369e96..47c709b7f44530de1d4a4c32ada93757279c77ae 100644 --- a/autosubmit/history/experiment_status.py +++ b/autosubmit/history/experiment_status.py @@ -17,7 +17,7 @@ # along with Autosubmit. If not, see . 
import traceback -from .database_managers.experiment_status_db_manager import ExperimentStatusDbManager +from .database_managers.experiment_status_db_manager import create_experiment_status_db_manager from .database_managers.database_manager import DEFAULT_LOCAL_ROOT_DIR, DEFAULT_HISTORICAL_LOGS_DIR from .internal_logging import Logging from autosubmitconfigparser.config.basicconfig import BasicConfig @@ -29,7 +29,13 @@ class ExperimentStatus: self.expid = expid # type : str BasicConfig.read() try: - self.manager = ExperimentStatusDbManager(self.expid, BasicConfig.DB_DIR, BasicConfig.DB_FILE, local_root_dir_path=BasicConfig.LOCAL_ROOT_DIR) + options = { + 'root_path': BasicConfig.DB_DIR, + 'db_name': BasicConfig.DB_FILE, + 'local_root_dir_path': BasicConfig.LOCAL_ROOT_DIR, + 'schema': self.expid + } + self.manager = create_experiment_status_db_manager(BasicConfig.DATABASE_BACKEND, **options) except Exception as exp: message = "Error while trying to update {0} in experiment_status.".format(str(self.expid)) Logging(self.expid, BasicConfig.HISTORICAL_LOG_DIR).log(message, traceback.format_exc()) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 163689316fff64d4d48389874a50df9be66400b5..79a658dc6d822eaf57eae95963dbf9e875a6eb81 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -3194,14 +3194,12 @@ class JobList(object): # monitor = Monitor() packages = None try: - packages = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), - "job_packages_" + expid).load(wrapper=False) + packages = JobPackagePersistence(expid).load(wrapper=False) except Exception as ex: print("Wrapper table not found, trying packages.") packages = None try: - packages = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), - "job_packages_" + expid).load(wrapper=True) + packages = JobPackagePersistence(expid).load(wrapper=True) except Exception as exp2: packages = None pass diff --git a/autosubmit/job/job_list_persistence.py b/autosubmit/job/job_list_persistence.py index 48c638db22d4e2d33fdf875f89c966af318bb466..9403b360d7b80ec508e6d159c219c882d63e9b8d 100644 --- a/autosubmit/job/job_list_persistence.py +++ b/autosubmit/job/job_list_persistence.py @@ -20,9 +20,11 @@ import os import pickle from sys import setrecursionlimit import shutil -from autosubmit.database.db_manager import DbManager +from autosubmit.database.db_manager import create_db_manager from log.log import AutosubmitCritical, Log from contextlib import suppress +from autosubmitconfigparser.config.basicconfig import BasicConfig +from pathlib import Path class JobListPersistence(object): @@ -141,8 +143,14 @@ class JobListPersistenceDb(JobListPersistence): "wrapper_type", ] - def __init__(self, persistence_path, persistence_file): - self.db_manager = DbManager(persistence_path, persistence_file, self.VERSION) + def __init__(self, expid): + options = { + 'root_path': str(Path(BasicConfig.LOCAL_ROOT_DIR, expid, 'pkl')), + 'db_name': f'job_list_{expid}', + 'db_version': self.VERSION, + 'schema': expid + } + self.db_manager = create_db_manager(BasicConfig.DATABASE_BACKEND, **options) def load(self, persistence_path, persistence_file): """ @@ -175,4 +183,4 @@ class JobListPersistenceDb(JobListPersistence): """ self.db_manager.drop_table(self.JOB_LIST_TABLE) - self.db_manager.create_table(self.JOB_LIST_TABLE, self.TABLE_FIELDS) + self.db_manager.create_table(self.JOB_LIST_TABLE, self.TABLE_FIELDS) \ No newline at end of file diff --git 
a/autosubmit/job/job_package_persistence.py b/autosubmit/job/job_package_persistence.py index 91411274e56870175ae5657b308c15e40c9caee1..01a1ebd26f825056882fbea5bcecc2f42214b225 100644 --- a/autosubmit/job/job_package_persistence.py +++ b/autosubmit/job/job_package_persistence.py @@ -17,7 +17,9 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . -from autosubmit.database.db_manager import DbManager +from autosubmit.database.db_manager import create_db_manager +from autosubmitconfigparser.config.basicconfig import BasicConfig +from pathlib import Path class JobPackagePersistence(object): @@ -37,8 +39,14 @@ class JobPackagePersistence(object): WRAPPER_JOB_PACKAGES_TABLE = 'wrapper_job_package' TABLE_FIELDS = ['exp_id', 'package_name', 'job_name'] - def __init__(self, persistence_path, persistence_file): - self.db_manager = DbManager(persistence_path, persistence_file, self.VERSION) + def __init__(self, expid): + options = { + 'root_path': Path(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), + 'db_name': f"job_packages_{expid}", + 'db_version': self.VERSION, + 'schema': expid + } + self.db_manager = create_db_manager(BasicConfig.DATABASE_BACKEND, **options) self.db_manager.create_table(self.JOB_PACKAGES_TABLE, self.TABLE_FIELDS) self.db_manager.create_table(self.WRAPPER_JOB_PACKAGES_TABLE, self.TABLE_FIELDS) def load(self,wrapper=False): diff --git a/autosubmit/job/job_utils.py b/autosubmit/job/job_utils.py index dae0ccc5e2c2d5f8e4e3ce29b57b6a774f2464b6..93182676c573460002f642be6fd2d29481b52894 100644 --- a/autosubmit/job/job_utils.py +++ b/autosubmit/job/job_utils.py @@ -249,8 +249,8 @@ def get_job_package_code(expid, job_name): try: basic_conf = BasicConfig() basic_conf.read() - packages_wrapper = JobPackagePersistence(os.path.join(basic_conf.LOCAL_ROOT_DIR, expid, "pkl"),"job_packages_" + expid).load(wrapper=True) - packages_wrapper_plus = JobPackagePersistence(os.path.join(basic_conf.LOCAL_ROOT_DIR, expid, "pkl"),"job_packages_" + expid).load(wrapper=False) + packages_wrapper = JobPackagePersistence(expid).load(wrapper=True) + packages_wrapper_plus = JobPackagePersistence(expid).load(wrapper=False) if packages_wrapper or packages_wrapper_plus: packages = packages_wrapper if len(packages_wrapper) > len(packages_wrapper_plus) else packages_wrapper_plus for exp, package_name, _job_name in packages: @@ -451,4 +451,4 @@ class SubJobManager(object): # type: () -> Dict[str, int] """ """ - return self.subjobfixes \ No newline at end of file + return self.subjobfixes diff --git a/environment.yml b/environment.yml index 56b5d33ea8262dca0dc94b22c7a42a196e77e435..2f6d88c1b98fba4431bf187bf6a70943a0142030 100644 --- a/environment.yml +++ b/environment.yml @@ -44,4 +44,6 @@ dependencies: - configobj - psutil - setproctitle + - sqlalchemy[mypy] + - testcontainers diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000000000000000000000000000000000000..0c4433beb2f50b5411e500191f7b419bee6f00cc --- /dev/null +++ b/pytest.ini @@ -0,0 +1,14 @@ +[pytest] +addopts = + --verbose --ignore=regression + --cov=autosubmit --cov-config=.coveragerc + --cov-report=html:test/htmlcov --cov-report=xml:test/coverage.xml + --strict-markers +testpaths = + test/unit +doctest_optionflags = + NORMALIZE_WHITESPACE + IGNORE_EXCEPTION_DETAIL + ELLIPSIS +markers = + postgres diff --git a/setup.py b/setup.py index d4a763a162dc03f748edc3f1e18b57fb18efe243..422edfa36e0ce8ad72082c953b4fd52ee80edc61 100644 --- a/setup.py +++ b/setup.py @@ -27,6 +27,101 @@ here = 
path.abspath(path.dirname(__file__))
 with open(path.join(here, 'VERSION')) as f:
     version = f.read().strip()
+install_requires = [
+    'xlib==0.21',
+    'setuptools<=68.2.2',
+    'bscearth.utils<=0.5.2',
+    'requests<=2.31.0',
+    'networkx<=2.6.3',
+    'portalocker<=2.7.0',
+    'mock<=5.1.0',
+    'paramiko<=3.4',
+    'pyparsing==3.1.1',
+    'matplotlib<=3.8.3',
+    'argparse<=1.4.0',
+    'packaging<=23.2',
+    'ruamel.yaml.clib<=0.2.8',
+    'typing_extensions<=4.9.0',
+    'typing<=3.7.4.3',
+    'psutil<=5.6.1',
+    'py3dotplus==1.1.0',
+    'numpy<2',
+    'ruamel.yaml==0.17.21',
+    'rocrate==0.*',
+    'autosubmitconfigparser==1.0.62',
+    'configparser',
+    'pathlib',
+    'setproctitle'
+]
+
+python_version_less_37_require = [
+    'PyNaCl==1.5.0',
+    'pythondialog==3.5.3',
+    'xlib==0.21',
+    'setuptools==68.2.2',
+    'cryptography==41.0.5',
+    'bscearth.utils==0.5.2',
+    'requests==2.31.0',
+    'networkx==2.6.3',
+    'portalocker==2.7.0',
+    'mock==5.1.0',
+    'paramiko==3.3.1',
+    'matplotlib==3.5.3',
+    'python_dateutil==2.8.2',
+    'argparse==1.4.0',
+    'configobj==5.0.8',
+    'packaging==23.2',
+    'bcrypt==4.0.1',
+    'charset_normalizer==3.3.1',
+    'kiwisolver==1.4.5',
+    'fonttools==4.43.1',
+    'cycler==0.12.1',
+    'typing_extensions==4.8.0',
+    'psutil==5.6.1',
+    'Pygments==2.3.1',
+    'coverage==5.0',
+    'nose==1.3.7',
+    'six==1.12.0',
+    'Cython==0.29.6',
+    'cffi==1.12.2',
+    'py==1.8.0',
+    'atomicwrites==1.3.0',
+    'attrs==19.1.0',
+    'more_itertools==6.0.0',
+    'urllib3==1.24.1',
+    'idna==2.8',
+    'Pillow==6.2.1',
+    'numpy==1.17.4',
+    'typing<=3.7.4.3',
+    'sqlalchemy[mypy]',
+]
+
+pg_require = [
+    'psycopg2'
+]
+
+tests_require = [
+    'mock==5.1.0',
+    'nose==1.3.7',
+    'pytest==8.2.*',
+    'pytest-cov',
+    'pytest-mock',
+    'testcontainers'
+]
+
+# You can add more groups, e.g. all_require = tests_require + graph_require, etc...
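+# (Illustrative, not part of this change: with the extras defined below, users
+# can pick optional dependency groups at install time, e.g.
+#     pip install autosubmit[postgres]   # adds psycopg2
+#     pip install autosubmit[tests]      # adds pytest & friends
+# where the names in brackets are keys of ``extras_require``.)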
+all_require = tests_require + pg_require + +extras_require = { + 'postgres': pg_require, + 'tests': tests_require, + ':python_version <= "3.7"': python_version_less_37_require, + 'all': all_require +} + setup( name='autosubmit', license='GNU GPL v3', @@ -39,79 +134,11 @@ setup( url='http://www.bsc.es/projects/earthscience/autosubmit/', download_url='https://earth.bsc.es/wiki/doku.php?id=tools:autosubmit', keywords=['climate', 'weather', 'workflow', 'HPC'], - install_requires=[ - 'xlib==0.21', - 'setuptools<=68.2.2', - 'bscearth.utils<=0.5.2', - 'requests<=2.31.0', - 'networkx<=2.6.3', - 'portalocker<=2.7.0', - 'mock<=5.1.0', - 'paramiko<=3.4', - 'pyparsing==3.1.1', - 'matplotlib<=3.8.3', - 'argparse<=1.4.0', - 'packaging<=23.2', - 'ruamel.yaml.clib<=0.2.8', - 'typing_extensions<=4.9.0', - 'typing<=3.7.4.3', - 'psutil<=5.6.1', - 'networkx<=2.6.3', - 'py3dotplus==1.1.0', - 'matplotlib<=3.8.3', - 'numpy<2', - 'ruamel.yaml==0.17.21', - 'rocrate==0.*', - 'autosubmitconfigparser==1.0.62', - 'configparser', - 'pathlib', - 'setproctitle' - - ], - extras_require={ - ':python_version <= "3.7"': - [ - 'PyNaCl==1.5.0', - 'pythondialog==3.5.3', - 'xlib==0.21', - 'setuptools==68.2.2', - 'cryptography==41.0.5', - 'bscearth.utils==0.5.2', - 'requests==2.31.0', - 'networkx==2.6.3', - 'portalocker==2.7.0', - 'mock==5.1.0', - 'paramiko==3.3.1', - 'matplotlib==3.5.3', - 'python_dateutil==2.8.2', - 'argparse==1.4.0', - 'configobj==5.0.8', - 'packaging==23.2', - 'bcrypt==4.0.1', - 'charset_normalizer==3.3.1', - 'kiwisolver==1.4.5', - 'fonttools==4.43.1', - 'cycler==0.12.1', - 'typing_extensions==4.8.0', - 'psutil==5.6.1', - 'Pygments==2.3.1', - 'coverage==5.0', - 'nose==1.3.7', - 'six==1.12.0', - 'Cython==0.29.6', - 'cffi==1.12.2', - 'py==1.8.0', - 'atomicwrites==1.3.0', - 'attrs==19.1.0', - 'more_itertools==6.0.0', - 'urllib3==1.24.1', - 'idna==2.8', - 'Pillow==6.2.1', - 'numpy==1.17.4', - ], - }, + install_requires=install_requires, + extras_require=extras_require, classifiers=[ "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "License :: OSI Approved :: GNU General Public License (GPL)", "Operating System :: POSIX :: Linux", @@ -131,4 +158,3 @@ setup( }, scripts=['bin/autosubmit'] ) - diff --git a/test/unit/conftest.py b/test/unit/conftest.py new file mode 100644 index 0000000000000000000000000000000000000000..803781060a0956a56bf2d5a559c3cc63a0e7836c --- /dev/null +++ b/test/unit/conftest.py @@ -0,0 +1,126 @@ +import os +import pytest +from pathlib import Path +from pytest import MonkeyPatch +from sqlalchemy import Connection, text +from testcontainers.postgres import PostgresContainer +from typing import Type + +from autosubmit.database.session import create_engine +from autosubmitconfigparser.config.basicconfig import BasicConfig + +PG_USER = "postgres" +PG_PASS = "mysecretpassword" +PG_HOST = "localhost" +PG_PORT = 5432 +PG_DB = "autosubmit_test" + +DEFAULT_DATABASE_CONN_URL = f"postgresql://{PG_USER}:{PG_PASS}@{PG_HOST}:{PG_PORT}/{PG_DB}" +"""Default Postgres connection URL.""" + +_identity_value = lambda value=None: lambda *ignore_args, **ignore_kwargs: value +"""A type of identity function; returns a function that returns ``value``.""" + + +@pytest.fixture +def as_db_sqlite(monkeypatch: MonkeyPatch, tmp_path: Path) -> Type[BasicConfig]: + """Overwrites the BasicConfig to use SQLite database for testing. + + Args: + monkeypatch: Monkey Patcher. + Returns: + BasicConfig class. 
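+
+    Example (illustrative):
+
+        def test_uses_sqlite(as_db_sqlite):
+            assert as_db_sqlite.DATABASE_BACKEND == 'sqlite'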
+ """ + monkeypatch.setattr(BasicConfig, "read", _identity_value()) + monkeypatch.setattr(BasicConfig, "DATABASE_BACKEND", "sqlite") + monkeypatch.setattr(BasicConfig, 'DB_PATH', str(tmp_path / 'autosubmit.db')) + + return BasicConfig + + +@pytest.fixture(scope="session") +def run_test_pg_db() -> PostgresContainer: + """Run a TestContainer for PostgreSQL. + + It is started for the test session, and stopped at the end of such. + + Returns: + Postgres test container instance. + """ + with PostgresContainer( + image="postgres:16-bookworm", + port=PG_PORT, + username=PG_USER, + password=PG_PASS, + dbname=PG_DB, + driver=None + ).with_env( + "POSTGRES_HOST_AUTH_METHOD", "trust" + ).with_bind_ports( + 5432, 5432 + ) as postgres: + yield postgres + + +def _setup_pg_db(conn: Connection) -> None: + """Reset the database. + + Drops all schemas except the system ones and restoring the public schema. + + Args: + conn: Database connection. + """ + # Get all schema names that are not from the system + results = conn.execute( + text("""SELECT schema_name FROM information_schema.schemata + WHERE schema_name NOT LIKE 'pg_%' + AND schema_name != 'information_schema'""") + ).all() + schema_names = [res[0] for res in results] + + # Drop all schemas + for schema_name in schema_names: + conn.execute(text(f"""DROP SCHEMA IF EXISTS "{schema_name}" CASCADE""")) + + # Restore default public schema + conn.execute(text("CREATE SCHEMA public")) + conn.execute(text("GRANT ALL ON SCHEMA public TO public")) + conn.execute(text("GRANT ALL ON SCHEMA public TO postgres")) + + +@pytest.fixture +def as_db_postgres(monkeypatch: MonkeyPatch, run_test_pg_db) -> BasicConfig: + """Fixture to set up and tear down a Postgres database for testing. + + It will overwrite the ``BasicConfig`` to use Postgres. + + It uses the environment variable ``PYTEST_DATABASE_CONN_URL`` to connect to the database. + If the variable is not set, it uses the default connection URL. + + Args: + monkeypatch: Monkey Patcher. + run_test_pg_db: Fixture that starts the Postgres container. + Returns: + Autosubmit configuration for Postgres. 
+ """ + + # Apply patch BasicConfig + monkeypatch.setattr(BasicConfig, "read", _identity_value()) + monkeypatch.setattr(BasicConfig, "DATABASE_BACKEND", "postgres") + monkeypatch.setattr( + BasicConfig, + "DATABASE_CONN_URL", + os.environ.get("PYTEST_DATABASE_CONN_URL", DEFAULT_DATABASE_CONN_URL), + ) + + # Setup database + with create_engine().connect() as conn: + _setup_pg_db(conn) + conn.commit() + + yield BasicConfig + + # Teardown database + with create_engine().connect() as conn: + _setup_pg_db(conn) + conn.commit() diff --git a/test/unit/test_checkpoints.py b/test/unit/test_checkpoints.py index 35dca3350841c6a7a0d38ea7c72811893cbd569c..bfd4e2b3102beb1cad921e651d5098b9a61a36c6 100644 --- a/test/unit/test_checkpoints.py +++ b/test/unit/test_checkpoints.py @@ -3,8 +3,9 @@ from unittest import TestCase import inspect import shutil import tempfile -from mock import Mock, MagicMock +from mock import Mock, MagicMock, patch from random import randrange +from pathlib import Path from autosubmit.job.job import Job from autosubmit.job.job_common import Status @@ -12,18 +13,48 @@ from autosubmit.job.job_list import JobList from autosubmit.job.job_list_persistence import JobListPersistenceDb from autosubmitconfigparser.config.yamlparser import YAMLParserFactory +class FakeBasicConfig: + def props(self): + pr = {} + for name in dir(self): + value = getattr(self, name) + if not name.startswith('__') and not inspect.ismethod(value) and not inspect.isfunction(value): + pr[name] = value + return pr + DB_DIR = '/dummy/db/dir' + DB_FILE = '/dummy/db/file' + DB_PATH = '/dummy/db/path' + LOCAL_ROOT_DIR = '/dummy/local/root/dir' + LOCAL_TMP_DIR = '/dummy/local/temp/dir' + LOCAL_PROJ_DIR = '/dummy/local/proj/dir' + DEFAULT_PLATFORMS_CONF = '' + DEFAULT_JOBS_CONF = '' + DATABASE_BACKEND = 'sqlite' + class TestJobList(TestCase): - def setUp(self): + @patch('autosubmit.job.job_list_persistence.BasicConfig', new_callable=FakeBasicConfig) + def setUp(self, patched_basic_config): self.experiment_id = 'random-id' self.as_conf = Mock() self.as_conf.experiment_data = dict() self.as_conf.experiment_data["JOBS"] = dict() self.as_conf.jobs_data = self.as_conf.experiment_data["JOBS"] self.as_conf.experiment_data["PLATFORMS"] = dict() - self.temp_directory = tempfile.mkdtemp() - self.job_list = JobList(self.experiment_id, FakeBasicConfig, YAMLParserFactory(), - JobListPersistenceDb(self.temp_directory, 'db'), self.as_conf) + self.temp_directory = str(tempfile.mkdtemp()) + + patched_basic_config.DB_DIR = self.temp_directory + patched_basic_config.DB_FILE = Path(self.temp_directory, 'test-db.db') + patched_basic_config.DB_PATH = patched_basic_config.DB_FILE + patched_basic_config.LOCAL_PROJ_DIR = self.temp_directory + patched_basic_config.LOCAL_ROOT_DIR = self.temp_directory + patched_basic_config.LOCAL_TMP_DIR = self.temp_directory + + Path(patched_basic_config.DB_FILE).touch() + Path(patched_basic_config.LOCAL_ROOT_DIR, self.experiment_id, 'pkl').mkdir(parents=True, exist_ok=True) + + self.job_list = JobList(self.experiment_id, patched_basic_config, YAMLParserFactory(), + JobListPersistenceDb(self.experiment_id), self.as_conf) dummy_serial_platform = MagicMock() dummy_serial_platform.name = 'serial' dummy_platform = MagicMock() @@ -149,22 +180,3 @@ class TestJobList(TestCase): job = Job(job_name, job_id, status, 0) job.type = randrange(0, 2) return job - -class FakeBasicConfig: - def __init__(self): - pass - def props(self): - pr = {} - for name in dir(self): - value = getattr(self, name) - if not name.startswith('__') 
and not inspect.ismethod(value) and not inspect.isfunction(value): - pr[name] = value - return pr - DB_DIR = '/dummy/db/dir' - DB_FILE = '/dummy/db/file' - DB_PATH = '/dummy/db/path' - LOCAL_ROOT_DIR = '/dummy/local/root/dir' - LOCAL_TMP_DIR = '/dummy/local/temp/dir' - LOCAL_PROJ_DIR = '/dummy/local/proj/dir' - DEFAULT_PLATFORMS_CONF = '' - DEFAULT_JOBS_CONF = '' diff --git a/test/unit/test_database_managers.py b/test/unit/test_database_managers.py index 9999fe9488dfafa49e5ee2bcbf666150eda403af..c0bc9d3c0f1a77d3ba18b77f321f7dadc7075d70 100644 --- a/test/unit/test_database_managers.py +++ b/test/unit/test_database_managers.py @@ -17,6 +17,7 @@ # along with Autosubmit. If not, see . import unittest +import pytest import time import random import os @@ -35,6 +36,7 @@ BasicConfig.read() JOBDATA_DIR = BasicConfig.JOBDATA_DIR LOCAL_ROOT_DIR = BasicConfig.LOCAL_ROOT_DIR +@pytest.mark.skip() @unittest.skip('TODO: looks like another test that used actual experiments data') class TestExperimentStatusDatabaseManager(unittest.TestCase): """ Covers Experiment Status Database Manager """ diff --git a/test/unit/test_db_common.py b/test/unit/test_db_common.py new file mode 100644 index 0000000000000000000000000000000000000000..27b90dff2e7dca197d60a343ace4922be13fdfb2 --- /dev/null +++ b/test/unit/test_db_common.py @@ -0,0 +1,120 @@ +import locale +import pytest +from pathlib import Path +from pkg_resources import resource_string + +from autosubmit.database import db_common +from autosubmitconfigparser.config.basicconfig import BasicConfig +from log.log import AutosubmitCritical + + +@pytest.mark.parametrize( + 'db_engine', + [ + # postgres + pytest.param('postgres', marks=[pytest.mark.postgres]), + # sqlite + 'sqlite' + ] +) +def test_db_common( + tmp_path: Path, + db_engine: str, + request +): + """Regression tests for ``db_common.py``. + + Tests for regression issues in ``db_common.py`` functions, and + for compatibility issues with the new functions for SQLAlchemy. + + The parameters allow the test to be run with different engine+options. + You can also mark certain tests belonging to a group (e.g. postgres) + so that they are skipped/executed selectively in CICD environments. + """ + + # Dynamically load the fixture for that DB, + # ref: https://stackoverflow.com/a/64348247. + request.getfixturevalue(f'as_db_{db_engine}') + + create_db_query = '' + + # The only differences in this test for SQLite and SQLAlchemy are + # i) the SQL query used to create a DB, ii) we check that the sqlite + # database file was created and iii) we load a different fixture for + # sqlite and sqlalchemy (to mock ``BasicConfig`` and run a container + # for sqlalchemy). + is_sqlite = db_engine == 'sqlite' + if is_sqlite: + # Code copied from ``autosubmit.py``. 
+ create_db_query = resource_string('autosubmit.database', 'data/autosubmit.sql').decode(locale.getlocale()[1]) + + assert db_common.create_db(create_db_query) + + if is_sqlite: + assert Path(BasicConfig.DB_PATH).exists() + + # Test last name used + assert "empty" == db_common.last_name_used() + assert "empty" == db_common.last_name_used(test=True) + assert "empty" == db_common.last_name_used(operational=True) + + new_exp = { + "name": "a700", + "description": "Description", + "autosubmit_version": "4.0.0", + } + + # Experiment doesn't exist yet + with pytest.raises(Exception): + db_common.check_experiment_exists(new_exp["name"]) + + # Test save + db_common.save_experiment( + new_exp["name"], new_exp["description"], new_exp["autosubmit_version"] + ) + assert db_common.check_experiment_exists( + new_exp["name"], error_on_inexistence=False + ) + assert db_common.last_name_used() == new_exp["name"] + + # Get version + assert ( + db_common.get_autosubmit_version(new_exp["name"]) + == new_exp["autosubmit_version"] + ) + new_version = "v4.1.0" + db_common.update_experiment_descrip_version( + new_exp["name"], version=new_version + ) + assert db_common.get_autosubmit_version(new_exp["name"]) == new_version + + # Update description + assert ( + db_common.get_experiment_descrip(new_exp["name"])[0][0] + == new_exp["description"] + ) + new_desc = "New Description" + db_common.update_experiment_descrip_version( + new_exp["name"], description=new_desc + ) + assert db_common.get_experiment_descrip(new_exp["name"])[0][0] == new_desc + + # Update back both: description and version + db_common.update_experiment_descrip_version( + new_exp["name"], + description=new_exp["description"], + version=new_exp["autosubmit_version"], + ) + assert ( + db_common.get_experiment_descrip(new_exp["name"])[0][0] + == new_exp["description"] + and db_common.get_autosubmit_version(new_exp["name"]) + == new_exp["autosubmit_version"] + ) + + # Delete experiment + assert db_common.delete_experiment(new_exp["name"]) + with pytest.raises(AutosubmitCritical): + assert db_common.get_autosubmit_version(new_exp["name"]) == new_exp[ + "autosubmit_version" + ] diff --git a/test/unit/test_db_manager.py b/test/unit/test_db_manager.py index a46133c9f76b78f6184dd3c899d341d1b04bc8a7..a80dca205fb336f19c0ef272491cec2025f96be0 100644 --- a/test/unit/test_db_manager.py +++ b/test/unit/test_db_manager.py @@ -4,7 +4,16 @@ import os import sys from mock import MagicMock from mock import patch -from autosubmit.database.db_manager import DbManager +from autosubmit.database.db_manager import DbManager, DatabaseManager, SqlAlchemyDbManager, create_db_manager + +import pytest +import random +from pathlib import Path +from sqlalchemy.exc import SQLAlchemyError +from sqlite3 import OperationalError +from typing import Optional, Type, Union, cast + +from autosubmit.database.tables import DBVersionTable class TestDbManager(TestCase): @@ -60,3 +69,111 @@ class TestDbManager(TestCase): DbManager('dummy-path', 'dummy-name', 999) connection_mock.cursor.assert_not_called() sys.modules['sqlite3'].connect = original_connect + + +@pytest.mark.parametrize( + 'db_engine,options,clazz,expected_exception', + [ + # postgres + pytest.param('postgres', {'schema': 'test_schema'}, SqlAlchemyDbManager, None, marks=[pytest.mark.postgres]), + # sqlite + ('sqlite', {'db_name': 'test_db_manager.db', 'db_version': 999}, DbManager, None), + # invalid engine + ('super-duper-database', {}, None, ValueError) + ]) +def test_db_manager( + tmp_path: Path, + db_engine: str, + options: 
dict,
+    clazz: Type,
+    expected_exception: Optional[Type[BaseException]],
+    request
+):
+    """Regression tests for ``DbManager`` and ``SqlAlchemyDbManager``.
+
+    Tests for regression issues in ``DbManager``, and for compatibility issues
+    with the new ``SqlAlchemyDbManager``.
+
+    The parameters allow the test to be run with different engine+options,
+    accepting also the expected database type (``clazz``). You can also
+    mark certain tests belonging to a group (e.g. postgres) so that they
+    are skipped/executed selectively in CICD environments.
+    """
+
+    is_sqlalchemy = db_engine != 'sqlite'
+    if not is_sqlalchemy:
+        # N.B.: We do it here, as we don't know the temporary path name until the fixture exists,
+        #       and because it's harmless to the Postgres test to have the tmp_path fixture.
+        options['root_path'] = str(tmp_path)
+
+    # In this test we will create a random table, to show that it works with any table,
+    # not only the ones we have in the ``database.tables`` package.
+    table_name = DBVersionTable.name
+
+    # The database manager under test will be either the old type ``DbManager``, or
+    # of the new type ``SqlAlchemyDbManager``.
+    db_manager: Optional[Union[DbManager, SqlAlchemyDbManager]] = None
+
+    try:
+        # Is this parametrized test expected to fail?
+        if expected_exception is not None:
+            with pytest.raises(expected_exception):  # type: ignore  # TODO: [testing] how to type this?
+                create_db_manager(db_engine, **options)
+            return
+
+        # If not, then now we dynamically load the fixture for that DB,
+        # ref: https://stackoverflow.com/a/64348247.
+        request.getfixturevalue(f'as_db_{db_engine}')
+
+        database_manager: DatabaseManager = create_db_manager(db_engine, **options)
+
+        # The database manager created has the right type?
+        assert isinstance(database_manager, clazz)
+        db_manager: clazz = cast(clazz, database_manager)
+
+        # The database manager was constructed right?
+        if is_sqlalchemy:
+            assert db_engine in db_manager.engine.dialect.name
+            assert db_manager.schema == options['schema']
+        else:
+            assert db_manager.root_path == options['root_path']
+            assert db_manager.db_name == options['db_name']
+            assert db_manager.db_version == options['db_version']
+
+        # In both cases, we must have a connection set up now
+        assert db_manager.connection is not None
+
+        # NOTE: From this part forward, the behaviour MUST be the same for
+        #       SQLite and Postgres or any other engine. This is the test
+        #       that verifies that whatever we do with SQLite, works the
+        #       same with another engine.
+
+        # Create table
+        # TODO: get the fields dynamically?
+        db_manager.create_table(table_name, ['version'])
+
+        # Table should be empty
+        records = db_manager.select_all(table_name)
+        assert isinstance(records, list) and len(records) == 0
+
+        # Insert N items
+        rand_len = random.randint(10, 50)
+        # db_manager.insertMany(table_name, [{"version": i} for i in range(rand_len)])
+        db_manager.insertMany(table_name, [[i] for i in range(rand_len)])
+        records = db_manager.select_all(table_name)
+        assert isinstance(records, list) and len(records) == rand_len
+
+        # Delete table items (only available in the new SQLAlchemy DB Manager)
+        if is_sqlalchemy:
+            db_manager.delete_all(table_name)
+            records = db_manager.select_all(table_name)
+            assert isinstance(records, list) and len(records) == 0
+
+        # Drop table
+        db_manager.drop_table(table_name)
+        with pytest.raises((SQLAlchemyError, OperationalError)):  # type: ignore  # different errors, but expected...
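+            # (Assumption: the raw-sqlite3 ``DbManager`` surfaces ``sqlite3.OperationalError``,
+            #  while the SQLAlchemy-backed manager raises a ``SQLAlchemyError`` subclass.)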
+ # Table not exist + _ = db_manager.select_all(table_name) + finally: + if db_manager is not None: + db_manager.disconnect() diff --git a/test/unit/test_db_structure.py b/test/unit/test_db_structure.py new file mode 100644 index 0000000000000000000000000000000000000000..a3dce8ad553ce992073b7eb5bbcfa932d6733113 --- /dev/null +++ b/test/unit/test_db_structure.py @@ -0,0 +1,49 @@ +import networkx as nx +import pytest +from pathlib import Path +from typing import Type + +from autosubmit.database import db_structure +from autosubmit.database.db_manager import DbManager, SqlAlchemyDbManager + + +@pytest.mark.parametrize( + 'db_engine,options,clazz', + [ + # postgres + pytest.param('postgres', {'schema': 'test_schema'}, SqlAlchemyDbManager, marks=[pytest.mark.postgres]), + # sqlite + ('sqlite', {'db_name': 'test_db_manager.db', 'db_version': 999}, DbManager) + ]) +def test_db_structure( + tmp_path: Path, + db_engine: str, + options: dict, + clazz: Type, + request +): + # Load dynamically the fixture, + # ref: https://stackoverflow.com/a/64348247. + request.getfixturevalue(f'as_db_{db_engine}') + + graph = nx.DiGraph([("a", "b"), ("b", "c"), ("a", "d")]) + graph.add_node("z") + + # Creates a new SQLite db file + expid = "ut01" + + # Table not exists + assert db_structure.get_structure(expid, str(tmp_path)) == {} + + # Save table + db_structure.save_structure(graph, expid, str(tmp_path)) + + # Get correct data + structure_data = db_structure.get_structure(expid, str(tmp_path)) + assert sorted(structure_data) == sorted({ + "a": ["b", "d"], + "b": ["c"], + "c": [], + "d": [], + "z": ["z"], + }) diff --git a/test/unit/test_dependencies.py b/test/unit/test_dependencies.py index 2bb91211140c68715fdf0084864c773dcb2dacb9..f4452976381695ee10fc25a5d874eebcc11a4f3b 100644 --- a/test/unit/test_dependencies.py +++ b/test/unit/test_dependencies.py @@ -8,6 +8,7 @@ import unittest from copy import deepcopy from datetime import datetime from mock import patch +from pathlib import Path from autosubmit.job.job_dict import DicJobs from autosubmit.job.job import Job @@ -36,11 +37,12 @@ class FakeBasicConfig: LOCAL_TMP_DIR = '/dummy/local/temp/dir' LOCAL_PROJ_DIR = '/dummy/local/proj/dir' DEFAULT_PLATFORMS_CONF = '' - DEFAULT_JOBS_CONF = '' + DATABASE_BACKEND = 'sqlite' class TestJobList(unittest.TestCase): - def setUp(self): + @patch('autosubmit.job.job_list_persistence.BasicConfig', new_callable=FakeBasicConfig) + def setUp(self, patched_basic_config): self.experiment_id = 'random-id' self.as_conf = mock.Mock() self.as_conf.experiment_data = dict() @@ -48,8 +50,17 @@ class TestJobList(unittest.TestCase): self.as_conf.jobs_data = self.as_conf.experiment_data["JOBS"] self.as_conf.experiment_data["PLATFORMS"] = dict() self.temp_directory = tempfile.mkdtemp() + patched_basic_config.DB_DIR = self.temp_directory + patched_basic_config.DB_FILE = Path(self.temp_directory, 'test-db.db') + patched_basic_config.DB_PATH = patched_basic_config.DB_FILE + patched_basic_config.LOCAL_PROJ_DIR = self.temp_directory + patched_basic_config.LOCAL_ROOT_DIR = self.temp_directory + patched_basic_config.LOCAL_TMP_DIR = self.temp_directory + + Path(patched_basic_config.DB_FILE).touch() + Path(patched_basic_config.LOCAL_ROOT_DIR, self.experiment_id, 'pkl').mkdir(parents=True, exist_ok=True) self.JobList = JobList(self.experiment_id, FakeBasicConfig, YAMLParserFactory(), - JobListPersistenceDb(self.temp_directory, 'db'), self.as_conf) + JobListPersistenceDb(self.experiment_id), self.as_conf) self.date_list = ["20020201", "20020202", 
"20020203", "20020204", "20020205", "20020206", "20020207", "20020208", "20020209", "20020210"] self.member_list = ["fc1", "fc2", "fc3", "fc4", "fc5", "fc6", "fc7", "fc8", "fc9", "fc10"] diff --git a/test/unit/test_dic_jobs.py b/test/unit/test_dic_jobs.py index f8b2138e656428c4db95c65b58c4a12e101cfcda..e9ba8e1b73f5d8afc77eba31acbeff29ac42a6a5 100644 --- a/test/unit/test_dic_jobs.py +++ b/test/unit/test_dic_jobs.py @@ -7,6 +7,7 @@ from mock import Mock import math import shutil import tempfile +from pathlib import Path from autosubmit.job.job import Job from autosubmitconfigparser.config.yamlparser import YAMLParserFactory @@ -16,10 +17,32 @@ from autosubmit.job.job_dict import DicJobs from autosubmit.job.job_list import JobList from autosubmit.job.job_list_persistence import JobListPersistenceDb from unittest.mock import patch +import inspect + +class FakeBasicConfig: + def __init__(self): + pass + def props(self): + pr = {} + for name in dir(self): + value = getattr(self, name) + if not name.startswith('__') and not inspect.ismethod(value) and not inspect.isfunction(value): + pr[name] = value + return pr + DB_DIR = '/dummy/db/dir' + DB_FILE = '/dummy/db/file' + DB_PATH = '/dummy/db/path' + LOCAL_ROOT_DIR = '/dummy/local/root/dir' + LOCAL_TMP_DIR = '/dummy/local/temp/dir' + LOCAL_PROJ_DIR = '/dummy/local/proj/dir' + DEFAULT_PLATFORMS_CONF = '' + DEFAULT_JOBS_CONF = '' + DATABASE_BACKEND = 'sqlite' class TestDicJobs(TestCase): - def setUp(self): + @patch('autosubmit.job.job_list_persistence.BasicConfig', new_callable=FakeBasicConfig) + def setUp(self, patched_basic_config): self.experiment_id = 'random-id' self.as_conf = Mock() @@ -30,8 +53,19 @@ class TestDicJobs(TestCase): self.as_conf.jobs_data = self.as_conf.experiment_data["JOBS"] self.as_conf.experiment_data["PLATFORMS"] = dict() self.temp_directory = tempfile.mkdtemp() + + patched_basic_config.DB_DIR = self.temp_directory + patched_basic_config.DB_FILE = Path(self.temp_directory, 'test-db.db') + patched_basic_config.DB_PATH = patched_basic_config.DB_FILE + patched_basic_config.LOCAL_PROJ_DIR = self.temp_directory + patched_basic_config.LOCAL_ROOT_DIR = self.temp_directory + patched_basic_config.LOCAL_TMP_DIR = self.temp_directory + + Path(patched_basic_config.DB_FILE).touch() + Path(patched_basic_config.LOCAL_ROOT_DIR, self.experiment_id, 'pkl').mkdir(parents=True, exist_ok=True) + self.job_list = JobList(self.experiment_id, FakeBasicConfig, YAMLParserFactory(), - JobListPersistenceDb(self.temp_directory, 'db'),self.as_conf) + JobListPersistenceDb(self.experiment_id), self.as_conf) self.parser_mock = Mock(spec='SafeConfigParser') self.date_list = ['fake-date1', 'fake-date2'] self.member_list = ["fc1", "fc2", "fc3", "fc4", "fc5", "fc6", "fc7", "fc8", "fc9", "fc10"] @@ -600,26 +634,3 @@ class TestDicJobs(TestCase): section_data = [] self.dictionary._create_jobs_split(5,'fake-section','fake-date', 'fake-member', 'fake-chunk', 0,Type.BASH, section_data) self.assertEqual(5, len(section_data)) - - - - -import inspect -class FakeBasicConfig: - def __init__(self): - pass - def props(self): - pr = {} - for name in dir(self): - value = getattr(self, name) - if not name.startswith('__') and not inspect.ismethod(value) and not inspect.isfunction(value): - pr[name] = value - return pr - DB_DIR = '/dummy/db/dir' - DB_FILE = '/dummy/db/file' - DB_PATH = '/dummy/db/path' - LOCAL_ROOT_DIR = '/dummy/local/root/dir' - LOCAL_TMP_DIR = '/dummy/local/temp/dir' - LOCAL_PROJ_DIR = '/dummy/local/proj/dir' - DEFAULT_PLATFORMS_CONF = '' - DEFAULT_JOBS_CONF = 
'' diff --git a/test/unit/test_history.py b/test/unit/test_history.py index 68495d8e9944a3f558a3e085d285ab231e07bf79..3695e86f46ed85f0fb9591ac1593c4fc3a37df05 100644 --- a/test/unit/test_history.py +++ b/test/unit/test_history.py @@ -1,5 +1,6 @@ #!/usr/bin/python +import pytest # Copyright 2015-2020 Earth Sciences Department, BSC-CNS # This file is part of Autosubmit. @@ -36,6 +37,7 @@ JOBDATA_DIR = BasicConfig.JOBDATA_DIR LOCAL_ROOT_DIR = BasicConfig.LOCAL_ROOT_DIR job = namedtuple("Job", ["name", "date", "member", "status_str", "children"]) +@pytest.mark.skip() @unittest.skip('TODO: another test that uses actual data. See if there is anything useful, and extract into functional/integration/unit tests that run on any machine') class TestExperimentHistory(unittest.TestCase): # @classmethod @@ -290,6 +292,7 @@ class TestExperimentHistory(unittest.TestCase): +@pytest.mark.skip() class TestLogging(unittest.TestCase): def setUp(self): @@ -314,4 +317,4 @@ class TestLogging(unittest.TestCase): if __name__ == '__main__': - unittest.main() \ No newline at end of file + unittest.main() diff --git a/test/unit/test_job_graph.py b/test/unit/test_job_graph.py index 579aee5adb3bf2c82ead800e7c8552cbfb57876b..bd9ee4e8cb9d6d3c4a8a33d9aa1d551a1f2e3348 100644 --- a/test/unit/test_job_graph.py +++ b/test/unit/test_job_graph.py @@ -1,8 +1,9 @@ import shutil import tempfile +from pathlib import Path from unittest import TestCase -from mock import Mock +from mock import Mock, patch from autosubmit.job.job_common import Status from autosubmit.job.job_list import JobList @@ -12,9 +13,26 @@ from random import randrange from autosubmit.job.job import Job from autosubmit.monitor.monitor import Monitor import unittest + +class FakeBasicConfig: + def __init__(self): + pass + + DB_DIR = '/dummy/db/dir' + DB_FILE = '/dummy/db/file' + DB_PATH = '/dummy/db/path' + LOCAL_ROOT_DIR = '/dummy/local/root/dir' + LOCAL_TMP_DIR = '/dummy/local/temp/dir' + LOCAL_PROJ_DIR = '/dummy/local/proj/dir' + DEFAULT_PLATFORMS_CONF = '' + DEFAULT_JOBS_CONF = '' + DATABASE_BACKEND = 'sqlite' + + class TestJobGraph(TestCase): - def setUp(self): + @patch('autosubmit.job.job_list_persistence.BasicConfig', new_callable=FakeBasicConfig) + def setUp(self, patched_basic_config): self.experiment_id = 'random-id' self.as_conf = Mock() self.as_conf.experiment_data = dict() @@ -22,8 +40,19 @@ class TestJobGraph(TestCase): self.as_conf.jobs_data = self.as_conf.experiment_data["JOBS"] self.as_conf.experiment_data["PLATFORMS"] = dict() self.temp_directory = tempfile.mkdtemp() + + patched_basic_config.DB_DIR = self.temp_directory + patched_basic_config.DB_FILE = Path(self.temp_directory, 'test-db.db') + patched_basic_config.DB_PATH = patched_basic_config.DB_FILE + patched_basic_config.LOCAL_PROJ_DIR = self.temp_directory + patched_basic_config.LOCAL_ROOT_DIR = self.temp_directory + patched_basic_config.LOCAL_TMP_DIR = self.temp_directory + + Path(patched_basic_config.DB_FILE).touch() + Path(patched_basic_config.LOCAL_ROOT_DIR, self.experiment_id, 'pkl').mkdir(parents=True, exist_ok=True) + self.job_list = JobList(self.experiment_id, FakeBasicConfig, YAMLParserFactory(), - JobListPersistenceDb(self.temp_directory, 'db'),self.as_conf) + JobListPersistenceDb(self.experiment_id),self.as_conf) self.parser_mock = Mock(spec='SafeConfigParser') # Basic workflow with SETUP, INI, SIM, POST, CLEAN @@ -926,15 +955,3 @@ class TestJobGraph(TestCase): job.split = split return job -class FakeBasicConfig: - def __init__(self): - pass - - DB_DIR = '/dummy/db/dir' - DB_FILE = 
'/dummy/db/file' - DB_PATH = '/dummy/db/path' - LOCAL_ROOT_DIR = '/dummy/local/root/dir' - LOCAL_TMP_DIR = '/dummy/local/temp/dir' - LOCAL_PROJ_DIR = '/dummy/local/proj/dir' - DEFAULT_PLATFORMS_CONF = '' - DEFAULT_JOBS_CONF = '' \ No newline at end of file diff --git a/test/unit/test_job_grouping.py b/test/unit/test_job_grouping.py index 01b53761a2b98e72ee95dc9c0f7743da61da7e0f..d5649afaedb89b552b1a7c35c6eb065cb25c81a1 100644 --- a/test/unit/test_job_grouping.py +++ b/test/unit/test_job_grouping.py @@ -1,5 +1,6 @@ import shutil import tempfile +from pathlib import Path from unittest import TestCase from mock import Mock @@ -14,9 +15,31 @@ from mock import patch from autosubmit.job.job_grouping import JobGrouping +import inspect +class FakeBasicConfig: + def __init__(self): + pass + def props(self): + pr = {} + for name in dir(self): + value = getattr(self, name) + if not name.startswith('__') and not inspect.ismethod(value) and not inspect.isfunction(value): + pr[name] = value + return pr + DB_DIR = '/dummy/db/dir' + DB_FILE = '/dummy/db/file' + DB_PATH = '/dummy/db/path' + LOCAL_ROOT_DIR = '/dummy/local/root/dir' + LOCAL_TMP_DIR = '/dummy/local/temp/dir' + LOCAL_PROJ_DIR = '/dummy/local/proj/dir' + DEFAULT_PLATFORMS_CONF = '' + DEFAULT_JOBS_CONF = '' + DATABASE_BACKEND = 'sqlite' + class TestJobGrouping(TestCase): - def setUp(self): + @patch('autosubmit.job.job_list_persistence.BasicConfig', new_callable=FakeBasicConfig) + def setUp(self, patched_basic_config): self.experiment_id = 'random-id' self.as_conf = Mock() self.as_conf.experiment_data = dict() @@ -24,8 +47,19 @@ class TestJobGrouping(TestCase): self.as_conf.jobs_data = self.as_conf.experiment_data["JOBS"] self.as_conf.experiment_data["PLATFORMS"] = dict() self.temp_directory = tempfile.mkdtemp() + + patched_basic_config.DB_DIR = self.temp_directory + patched_basic_config.DB_FILE = Path(self.temp_directory, 'test-db.db') + patched_basic_config.DB_PATH = patched_basic_config.DB_FILE + patched_basic_config.LOCAL_PROJ_DIR = self.temp_directory + patched_basic_config.LOCAL_ROOT_DIR = self.temp_directory + patched_basic_config.LOCAL_TMP_DIR = self.temp_directory + + Path(patched_basic_config.DB_FILE).touch() + Path(patched_basic_config.LOCAL_ROOT_DIR, self.experiment_id, 'pkl').mkdir(parents=True, exist_ok=True) + self.job_list = JobList(self.experiment_id, FakeBasicConfig, YAMLParserFactory(), - JobListPersistenceDb(self.temp_directory, 'db'),self.as_conf) + JobListPersistenceDb(self.experiment_id),self.as_conf) self.parser_mock = Mock(spec='SafeConfigParser') # Basic workflow with SETUP, INI, SIM, POST, CLEAN @@ -992,25 +1026,3 @@ class TestJobGrouping(TestCase): job.split = split return job -import inspect -class FakeBasicConfig: - def __init__(self): - pass - def props(self): - pr = {} - for name in dir(self): - value = getattr(self, name) - if not name.startswith('__') and not inspect.ismethod(value) and not inspect.isfunction(value): - pr[name] = value - return pr - DB_DIR = '/dummy/db/dir' - DB_FILE = '/dummy/db/file' - DB_PATH = '/dummy/db/path' - LOCAL_ROOT_DIR = '/dummy/local/root/dir' - LOCAL_TMP_DIR = '/dummy/local/temp/dir' - LOCAL_PROJ_DIR = '/dummy/local/proj/dir' - DEFAULT_PLATFORMS_CONF = '' - DEFAULT_JOBS_CONF = '' - - - diff --git a/test/unit/test_job_package.py b/test/unit/test_job_package.py index e70f1087225bd228027490c577a692f37dcd4e34..8b41c2002dbb0d68cb0be8b9085eb9ff5946f03a 100644 --- a/test/unit/test_job_package.py +++ b/test/unit/test_job_package.py @@ -34,10 +34,12 @@ class FakeBasicConfig: LOCAL_PROJ_DIR = 
'/dummy/local/proj/dir' DEFAULT_PLATFORMS_CONF = '' DEFAULT_JOBS_CONF = '' + DATABASE_BACKEND = 'sqlite' class TestJobPackage(TestCase): - def setUpWrappers(self,options): + @patch('autosubmit.job.job_list_persistence.BasicConfig', new_callable=FakeBasicConfig) + def setUpWrappers(self,options,patched_basic_config): # reset self.as_conf = None self.job_package_wrapper = None @@ -53,8 +55,19 @@ class TestJobPackage(TestCase): self.as_conf.experiment_data["PLATFORMS"] = dict() self.as_conf.experiment_data["WRAPPERS"] = dict() self.temp_directory = tempfile.mkdtemp() + + patched_basic_config.DB_DIR = self.temp_directory + patched_basic_config.DB_FILE = Path(self.temp_directory, 'test-db.db') + patched_basic_config.DB_PATH = patched_basic_config.DB_FILE + patched_basic_config.LOCAL_PROJ_DIR = self.temp_directory + patched_basic_config.LOCAL_ROOT_DIR = self.temp_directory + patched_basic_config.LOCAL_TMP_DIR = self.temp_directory + + Path(patched_basic_config.DB_FILE).touch() + Path(patched_basic_config.LOCAL_ROOT_DIR, self.experiment_id, 'pkl').mkdir(parents=True, exist_ok=True) + self.job_list = JobList(self.experiment_id, self.config, YAMLParserFactory(), - JobListPersistenceDb(self.temp_directory, 'db'), self.as_conf) + JobListPersistenceDb(self.experiment_id), self.as_conf) self.parser_mock = MagicMock(spec='SafeConfigParser') for job in self.jobs: job._init_runtime_parameters() diff --git a/test/unit/test_wrappers.py b/test/unit/test_wrappers.py index 2c605bef3e4c25002d59ec72574d08276e83665b..e88b7c0ee30d46b053a5d52d9e2c2b9f6109b251 100644 --- a/test/unit/test_wrappers.py +++ b/test/unit/test_wrappers.py @@ -3,9 +3,10 @@ from operator import attrgetter import shutil import tempfile +from pathlib import Path from unittest import TestCase -from mock import MagicMock +from mock import MagicMock, patch import log.log from autosubmit.job.job_packager import JobPackager @@ -19,6 +20,28 @@ from autosubmit.job.job_list_persistence import JobListPersistenceDb from autosubmit.job.job_common import Status from random import randrange from collections import OrderedDict +import inspect + + +class FakeBasicConfig: + def __init__(self): + pass + def props(self): + pr = {} + for name in dir(self): + value = getattr(self, name) + if not name.startswith('__') and not inspect.ismethod(value) and not inspect.isfunction(value): + pr[name] = value + return pr + DB_DIR = '/dummy/db/dir' + DB_FILE = '/dummy/db/file' + DB_PATH = '/dummy/db/path' + LOCAL_ROOT_DIR = '/dummy/local/root/dir' + LOCAL_TMP_DIR = '/dummy/local/temp/dir' + LOCAL_PROJ_DIR = '/dummy/local/proj/dir' + DEFAULT_PLATFORMS_CONF = '' + DEFAULT_JOBS_CONF = '' + DATABASE_BACKEND = 'sqlite' class TestWrappers(TestCase): @@ -156,7 +179,8 @@ class TestWrappers(TestCase): cls.workflows['running_once']['sections']["s5"]["WALLCLOCK"] = '00:30' cls.workflows['running_once']['sections']["s5"]["DEPENDENCIES"] = "s2" - def setUp(self): + @patch('autosubmit.job.job_list_persistence.BasicConfig', new_callable=FakeBasicConfig) + def setUp(self, patched_basic_config): self.experiment_id = 'random-id' self._wrapper_factory = MagicMock() @@ -170,8 +194,19 @@ class TestWrappers(TestCase): self.as_conf.experiment_data["PLATFORMS"] = dict() self.as_conf.experiment_data["WRAPPERS"] = dict() self.temp_directory = tempfile.mkdtemp() + + patched_basic_config.DB_DIR = self.temp_directory + patched_basic_config.DB_FILE = Path(self.temp_directory, 'test-db.db') + patched_basic_config.DB_PATH = patched_basic_config.DB_FILE + patched_basic_config.LOCAL_PROJ_DIR = 
self.temp_directory + patched_basic_config.LOCAL_ROOT_DIR = self.temp_directory + patched_basic_config.LOCAL_TMP_DIR = self.temp_directory + + Path(patched_basic_config.DB_FILE).touch() + Path(patched_basic_config.LOCAL_ROOT_DIR, self.experiment_id, 'pkl').mkdir(parents=True, exist_ok=True) + self.job_list = JobList(self.experiment_id, self.config, YAMLParserFactory(), - JobListPersistenceDb(self.temp_directory, 'db'),self.as_conf) + JobListPersistenceDb(self.experiment_id),self.as_conf) self.parser_mock = MagicMock(spec='SafeConfigParser') @@ -1937,24 +1972,3 @@ class TestWrappers(TestCase): job.section = section return job - - -import inspect -class FakeBasicConfig: - def __init__(self): - pass - def props(self): - pr = {} - for name in dir(self): - value = getattr(self, name) - if not name.startswith('__') and not inspect.ismethod(value) and not inspect.isfunction(value): - pr[name] = value - return pr - DB_DIR = '/dummy/db/dir' - DB_FILE = '/dummy/db/file' - DB_PATH = '/dummy/db/path' - LOCAL_ROOT_DIR = '/dummy/local/root/dir' - LOCAL_TMP_DIR = '/dummy/local/temp/dir' - LOCAL_PROJ_DIR = '/dummy/local/proj/dir' - DEFAULT_PLATFORMS_CONF = '' - DEFAULT_JOBS_CONF = ''
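The unit-test changes above all repeat one pattern: a ``FakeBasicConfig`` stub that now carries ``DATABASE_BACKEND = 'sqlite'`` is patched into ``autosubmit.job.job_list_persistence``, pointed at a temporary directory, and the ``<root>/<expid>/pkl`` folder is pre-created before ``JobListPersistenceDb(expid)`` is built. A condensed sketch of that pattern (illustrative only; ``make_job_list_persistence`` and the expid ``t000`` are hypothetical names, not code from this diff):

    import tempfile
    from pathlib import Path
    from unittest.mock import patch

    from autosubmit.job.job_list_persistence import JobListPersistenceDb


    class FakeBasicConfig:
        """Stand-in for BasicConfig with only the attributes the tests need."""
        LOCAL_ROOT_DIR = '/dummy/local/root/dir'
        DATABASE_BACKEND = 'sqlite'


    @patch('autosubmit.job.job_list_persistence.BasicConfig', new_callable=FakeBasicConfig)
    def make_job_list_persistence(expid, patched_basic_config):
        # Point the patched config at a throwaway directory and pre-create the
        # <root>/<expid>/pkl folder that the SQLite-backed persistence writes into.
        temp_directory = tempfile.mkdtemp()
        patched_basic_config.LOCAL_ROOT_DIR = temp_directory
        Path(temp_directory, expid, 'pkl').mkdir(parents=True, exist_ok=True)
        return JobListPersistenceDb(expid)


    persistence = make_job_list_persistence('t000')

Factoring this into a shared fixture (e.g. in ``test/unit/conftest.py``) would arguably remove the copy-pasted ``FakeBasicConfig`` classes, but the diff keeps them local to each test module.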