diff --git a/.coveragerc b/.coveragerc
new file mode 100644
index 0000000000000000000000000000000000000000..2ba850c90bab1159a214ec1c1479f646a96200f9
--- /dev/null
+++ b/.coveragerc
@@ -0,0 +1,2 @@
+[run]
+omit=
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index 40e7cb3ec7e789f1d55658d9559726592bd3eb72..fe2b46562e35aa2536d9abd67e2c116b1508bdc9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,8 +16,10 @@ build/
# Coverage files
coverage/
-/.coverage
+.coverage
test/coverage.xml
+coverage.xml
+test/htmlcov
# Docker
dockerfiles/id_rsa*
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index f0f0a2895a8c4965919d7d6d8c4045ce8b0a805e..1f8da30d6bf8dd6f89155eff736cc37961e95af8 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -21,14 +21,14 @@ prepare:
- git submodule update --init --recursive
- conda update -n base -c defaults conda
- conda update conda
- - conda install -n autosubmit3 coverage=6
+ - conda install -n autosubmit3 pytest pytest-cov pytest-mock
- conda env update -f environment.yml -n autosubmit3 python=3.7.3
test_python3:
stage: test
script:
- conda activate autosubmit3
- - python3 -m 'nose' --exclude=regression --verbosity=3 test/unit --with-coverage --cover-package=autosubmit --cover-inclusive --cover-xml --cover-xml-file=test/coverage.xml
+ - pytest -m "not postgres"
coverage: '/(?i)total.*? (100(?:\.0+)?\%|[1-9]?\d(?:\.\d+)?\%)$/'
# These artifacts are saved with every build in GitLab and can be reviewed later. If
# we have a folder with HTML files, as in this example, users can navigate with their
diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py
index bb7ea8fd9a6d65ac7daf4d3a6607e7753abc31ee..77ec83f6188b19a8e787300f423517add091c798 100644
--- a/autosubmit/autosubmit.py
+++ b/autosubmit/autosubmit.py
@@ -1487,8 +1487,7 @@ class Autosubmit:
safetysleeptime = as_conf.get_safetysleeptime()
Log.debug("The Experiment name is: {0}", expid)
Log.debug("Sleep: {0}", safetysleeptime)
- packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"),
- "job_packages_" + expid)
+ packages_persistence = JobPackagePersistence(expid)
os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid,
"pkl", "job_packages_" + expid + ".db"), 0o644)
@@ -2014,8 +2013,7 @@ class Autosubmit:
Log.debug("Loading job packages")
# Packages == wrappers and jobs inside wrappers. Name is also missleading.
try:
- packages_persistence = JobPackagePersistence(os.path.join(
- BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid)
+ packages_persistence = JobPackagePersistence(expid)
except IOError as e:
raise AutosubmitError(
"job_packages not found", 6016, str(e))
@@ -2669,8 +2667,7 @@ class Autosubmit:
try:
if len(as_conf.experiment_data.get("WRAPPERS", {})) > 0 and check_wrapper:
# Class constructor creates table if it does not exist
- packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"),
- "job_packages_" + expid)
+ packages_persistence = JobPackagePersistence(expid)
# Permissions
os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl", "job_packages_" + expid + ".db"), 0o644)
# Database modification
@@ -2682,11 +2679,9 @@ class Autosubmit:
packages_persistence, True)
packages = packages_persistence.load(True)
- packages += JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"),
- "job_packages_" + expid).load()
+ packages += JobPackagePersistence(expid).load()
else:
- packages = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"),
- "job_packages_" + expid).load()
+ packages = JobPackagePersistence(expid).load()
except BaseException as e:
raise AutosubmitCritical("Issues during the wrapper loading, may be related to IO issues", 7040, str(e))
finally:
@@ -3001,8 +2996,7 @@ class Autosubmit:
raise AutosubmitCritical("Couldn't restore the experiment workflow", 7040, str(e))
try:
- packages = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"),
- "job_packages_" + expid).load()
+ packages = JobPackagePersistence(expid).load()
groups_dict = dict()
if group_by:
@@ -4058,15 +4052,20 @@ class Autosubmit:
Creates a new database instance for autosubmit at the configured path
"""
- if not os.path.exists(BasicConfig.DB_PATH):
- Log.info("Creating autosubmit database...")
- qry = resource_string('autosubmit.database', 'data/autosubmit.sql').decode(locale.getlocale()[1])
- #qry = importlib.resources.read_text('autosubmit.database', 'data/autosubmit.sql').decode(locale.getlocale()[1])
- if not create_db(qry):
- raise AutosubmitCritical("Can not write database file", 7004)
- Log.result("Autosubmit database created successfully")
+ if BasicConfig.DATABASE_BACKEND == 'sqlite':
+ if not os.path.exists(BasicConfig.DB_PATH):
+ Log.info("Creating autosubmit database...")
+ qry = resource_string('autosubmit.database', 'data/autosubmit.sql').decode(locale.getlocale()[1])
+ #qry = importlib.resources.read_text('autosubmit.database', 'data/autosubmit.sql').decode(locale.getlocale()[1])
+ if not create_db(qry):
+ raise AutosubmitCritical("Can not write database file", 7004)
+ Log.result("Autosubmit database created successfully")
+ else:
+ raise AutosubmitCritical("Database already exists.", 7004)
else:
- raise AutosubmitCritical("Database already exists.", 7004)
+ Log.info("Creating autosubmit Postgres database...")
+ if not create_db(''):
+ raise AutosubmitCritical("Failed to create Postgres database", 7004)
return True
@staticmethod
@@ -4826,8 +4825,7 @@ class Autosubmit:
job_list.save()
as_conf.save()
try:
- packages_persistence = JobPackagePersistence(
- os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid)
+ packages_persistence = JobPackagePersistence(expid)
packages_persistence.reset_table()
packages_persistence.reset_table(True)
except:
@@ -5644,9 +5642,7 @@ class Autosubmit:
#Visualization stuff that should be in a function common to monitor , create, -cw flag, inspect and so on
if not noplot:
if as_conf.get_wrapper_type() != 'none' and check_wrapper:
- packages_persistence = JobPackagePersistence(
- os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"),
- "job_packages_" + expid)
+ packages_persistence = JobPackagePersistence(expid)
os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR,
expid, "pkl", "job_packages_" + expid + ".db"), 0o775)
packages_persistence.reset_table(True)
@@ -5658,8 +5654,7 @@ class Autosubmit:
packages = packages_persistence.load(True)
else:
- packages = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"),
- "job_packages_" + expid).load()
+ packages = JobPackagePersistence(expid).load()
groups_dict = dict()
if group_by:
status = list()
@@ -5822,8 +5817,7 @@ class Autosubmit:
if storage_type == 'pkl':
return JobListPersistencePkl()
elif storage_type == 'db':
- return JobListPersistenceDb(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"),
- "job_list_" + expid)
+ return JobListPersistenceDb(expid)
raise AutosubmitCritical('Storage type not known', 7014)
@staticmethod
diff --git a/autosubmit/database/db_common.py b/autosubmit/database/db_common.py
index 9f93e04c5ee48c9378fc0f0751b1a3a87bf01b42..7c35c450c1106a770567d107366ceb4c07f5d487 100644
--- a/autosubmit/database/db_common.py
+++ b/autosubmit/database/db_common.py
@@ -28,6 +28,11 @@ from log.log import Log, AutosubmitCritical, AutosubmitError
Log.get_logger("Autosubmit")
from autosubmitconfigparser.config.basicconfig import BasicConfig
+from autosubmit.database import tables, session
+from sqlalchemy import delete, select, Connection, insert, update, func
+from sqlalchemy.schema import CreateTable
+from typing import List, Optional, cast
+
CURRENT_DATABASE_VERSION = 1
TIMEOUT = 10
@@ -38,6 +43,9 @@ def create_db(qry):
:param qry: query to create the new database
:type qry: str """
+ if BasicConfig.DATABASE_BACKEND == 'postgres':
+ return create_db_pg()
+
try:
(conn, cursor) = open_conn(False)
except DbException as e:
@@ -144,8 +152,12 @@ def save_experiment(name, description, version):
:param description: experiment's description
:type description: str
"""
+ fn = _save_experiment
+ if BasicConfig.DATABASE_BACKEND == 'postgres':
+ fn = _save_experiment_sqlalchemy
+
queue = multiprocessing.Queue(1)
- proc = multiprocessing.Process(target=fn_wrapper, args=(_save_experiment, queue, name, description, version))
+ proc = multiprocessing.Process(target=fn_wrapper, args=(fn, queue, name, description, version))
proc.start()
try:
@@ -168,8 +180,11 @@ def check_experiment_exists(name, error_on_inexistence=True):
:return: If experiment exists returns true, if not returns false
:rtype: bool
"""
+ fn = _check_experiment_exists
+ if BasicConfig.DATABASE_BACKEND == 'postgres':
+ fn = _check_experiment_exists_sqlalchemy
queue = multiprocessing.Queue(1)
- proc = multiprocessing.Process(target=fn_wrapper, args=(_check_experiment_exists, queue, name, error_on_inexistence))
+ proc = multiprocessing.Process(target=fn_wrapper, args=(fn, queue, name, error_on_inexistence))
proc.start()
try:
@@ -194,8 +209,11 @@ def update_experiment_descrip_version(name, description=None, version=None):
:return: If description has been update, True; otherwise, False.
:rtype: bool
"""
+ fn = _update_experiment_descrip_version
+ if BasicConfig.DATABASE_BACKEND == 'postgres':
+ fn = _update_experiment_descrip_version_sqlalchemy
queue = multiprocessing.Queue(1)
- proc = multiprocessing.Process(target=fn_wrapper, args=(_update_experiment_descrip_version, queue, name, description, version))
+ proc = multiprocessing.Process(target=fn_wrapper, args=(fn, queue, name, description, version))
proc.start()
try:
@@ -215,9 +233,12 @@ def get_autosubmit_version(expid):
:type expid: str
:return: If experiment exists returns the autosubmit version for it, if not returns None
:rtype: str
- """
+ """
+ fn = _get_autosubmit_version
+ if BasicConfig.DATABASE_BACKEND == 'postgres':
+ fn = _get_autosubmit_version_sqlalchemy
queue = multiprocessing.Queue(1)
- proc = multiprocessing.Process(target=fn_wrapper, args=(_get_autosubmit_version, queue, expid))
+ proc = multiprocessing.Process(target=fn_wrapper, args=(fn, queue, expid))
proc.start()
try:
@@ -240,8 +261,11 @@ def last_name_used(test=False, operational=False):
:return: last experiment identifier used, 'empty' if there is none
:rtype: str
"""
+ fn = _last_name_used
+ if BasicConfig.DATABASE_BACKEND == 'postgres':
+ fn = _last_name_used_sqlalchemy
queue = multiprocessing.Queue(1)
- proc = multiprocessing.Process(target=fn_wrapper, args=(_last_name_used, queue, test, operational))
+ proc = multiprocessing.Process(target=fn_wrapper, args=(fn, queue, test, operational))
proc.start()
try:
@@ -262,8 +286,11 @@ def delete_experiment(experiment_id):
:return: True if delete is successful
:rtype: bool
"""
+ fn = _delete_experiment
+ if BasicConfig.DATABASE_BACKEND == 'postgres':
+ fn = _delete_experiment_sqlalchemy
queue = multiprocessing.Queue(1)
- proc = multiprocessing.Process(target=fn_wrapper, args=(_delete_experiment, queue, experiment_id))
+ proc = multiprocessing.Process(target=fn_wrapper, args=(fn, queue, experiment_id))
proc.start()
try:
@@ -349,6 +376,8 @@ def _check_experiment_exists(name, error_on_inexistence=True):
return True
def get_experiment_descrip(expid):
+ if BasicConfig.DATABASE_BACKEND == 'postgres':
+ return get_experiment_descrip_sqlalchemy(expid)
if not check_db():
return False
try:
@@ -568,3 +597,196 @@ class DbException(Exception):
def __init__(self, message):
self.message = message
+
+
+# Code added for SQLAlchemy support
+
+
+def _get_sqlalchemy_conn() -> Connection:
+ """Return the database connection.
+
+ It captures any exception, returning an ``AutosubmitCritical``
+ as in the previous SQLite-only code. With this function we
+ can use a context-manager and keep the previous behaviour
+ intact.
+ """
+ try:
+ return session.create_engine().connect()
+ except Exception as e:
+ raise AutosubmitCritical("Could not establish a connection to database", 7001, str(e))
+
+
+def create_db_pg() -> bool:
+ """Create the Postgres tables (not really the database).
+
+ This function is the equivalent to the old ``create_db`` function
+ for SQLite, with the difference that that function has a parameter
+ with the query.
+
+ However, at the moment of writing, the only use of that function is
+ to execute the contents of ``autosubmit.sql``, which create the
+ ``experiment`` and the ``db_version`` tables.
+
+ This whole module was not well-designed, and is up for a refactoring
+ at some point in the future, where both code might be superseded by
+ a better version.
+
+ :raise AutosubmitCritical: If there are any issues with the database.
+ """
+ tables_to_create = [tables.ExperimentTable, tables.DBVersionTable]
+
+ try:
+ with _get_sqlalchemy_conn() as conn:
+ for table in tables_to_create:
+ conn.execute(CreateTable(table, if_not_exists=True))
+ conn.execute(delete(tables.DBVersionTable))
+ conn.execute(insert(tables.DBVersionTable).values({"version": 1}))
+ conn.commit()
+ except AutosubmitCritical:
+ raise
+ except Exception as exc:
+ raise AutosubmitCritical("Database can not be created", 7004, str(exc))
+
+ return True
+
+
+def _save_experiment_sqlalchemy(name: str, description: str, version: str) -> bool:
+ with _get_sqlalchemy_conn() as conn:
+ try:
+ conn.execute(insert(tables.ExperimentTable).values(
+ name=name, description=description, autosubmit_version=version)
+ )
+ conn.commit()
+ except AutosubmitCritical:
+ raise
+ except Exception as exc:
+ conn.rollback()
+ raise AutosubmitCritical(
+ 'Could not register experiment', 7005, str(exc))
+ return True
+
+
+def _check_experiment_exists_sqlalchemy(name: str, error_on_inexistence=True) -> bool:
+ row = None
+ with _get_sqlalchemy_conn() as conn:
+ try:
+ query = select(tables.ExperimentTable).where(tables.ExperimentTable.c.name == name)
+ row = conn.execute(query).one_or_none()
+ except AutosubmitCritical:
+ raise
+ except Exception as exc:
+ conn.rollback()
+ raise AutosubmitCritical(
+ 'Could not register experiment', 7005, str(exc))
+
+ if row is None:
+ if error_on_inexistence:
+ raise AutosubmitCritical(
+ 'The experiment name "{0}" does not exist yet!!!'.format(name), 7005)
+ if os.path.exists(os.path.join(BasicConfig.LOCAL_ROOT_DIR, name)):
+ try:
+                _save_experiment_sqlalchemy(name, 'No description', "3.14.0")
+ except BaseException as e:
+ pass
+ return True
+ return False
+
+ return True
+
+
+def get_experiment_descrip_sqlalchemy(expid) -> List[List[str]]:
+ with _get_sqlalchemy_conn() as conn:
+ query = select(tables.ExperimentTable).where(tables.ExperimentTable.c.name == expid)
+ row = conn.execute(query).one_or_none()
+
+ if row:
+ return [[row.description]]
+ return []
+
+
+def _update_experiment_descrip_version_sqlalchemy(
+ name: str,
+ description: Optional[str] = None,
+ version: Optional[str] = None) -> bool:
+ # Conditional update statement
+ if description is None and version is None:
+ raise AutosubmitCritical("Not enough data to update {}.".format(name), 7005)
+
+ query = update(tables.ExperimentTable).where(tables.ExperimentTable.c.name == name)
+ vals = {}
+ if isinstance(description, str):
+ vals["description"] = description
+ if isinstance(version, str):
+ vals["autosubmit_version"] = version
+ query = query.values(vals)
+
+ with _get_sqlalchemy_conn() as conn:
+ result = conn.execute(query)
+ conn.commit()
+
+ if result.rowcount == 0:
+ raise AutosubmitCritical("Update on experiment {} failed.".format(name), 7005)
+ return True
+
+
+def _get_autosubmit_version_sqlalchemy(expid) -> str:
+ with _get_sqlalchemy_conn() as conn:
+ query = select(tables.ExperimentTable).where(tables.ExperimentTable.c.name == expid)
+ row = conn.execute(query).one_or_none()
+
+ if row is None:
+ raise AutosubmitCritical(
+ 'The experiment "{0}" does not exist'.format(expid), 7005)
+ return row.autosubmit_version
+
+
+def _last_name_used_sqlalchemy(test=False, operational=False) -> str:
+ if test:
+ condition = tables.ExperimentTable.c.name.like("t%")
+ elif operational:
+ condition = tables.ExperimentTable.c.name.like("o%")
+ else:
+ condition = tables.ExperimentTable.c.name.not_like(
+ "t%"
+ ) & tables.ExperimentTable.c.name.not_like("o%")
+
+ sub_query = select(func.max(tables.ExperimentTable.c.id).label("id")).where(
+ condition
+ & tables.ExperimentTable.c.autosubmit_version.is_not(None)
+ & tables.ExperimentTable.c.autosubmit_version.not_like("%3.0.0b%")
+ ).scalar_subquery()
+ query = select(tables.ExperimentTable.c.name).where(
+ tables.ExperimentTable.c.id == sub_query
+ )
+
+ with _get_sqlalchemy_conn() as conn:
+ row = conn.execute(query).one_or_none()
+
+ if row is None:
+ return 'empty'
+
+ # If starts by number (during 3.0 beta some jobs starting with numbers where created), returns empty.
+ try:
+ if row.name.isnumeric():
+ return 'empty'
+ else:
+ return row.name
+ except ValueError:
+ return row.name
+
+
+def _delete_experiment_sqlalchemy(experiment_id: str) -> bool:
+ if not _check_experiment_exists_sqlalchemy(experiment_id, False): # Reference the no anti-lock version.
+ return True
+
+ with _get_sqlalchemy_conn() as conn:
+ query = delete(tables.ExperimentTable).where(
+ tables.ExperimentTable.c.name == experiment_id
+ )
+ result = conn.execute(query)
+ conn.commit()
+
+ if cast(int, result.rowcount) > 0:
+ Log.debug("The experiment {0} has been deleted!!!", experiment_id)
+ return True
+
diff --git a/autosubmit/database/db_manager.py b/autosubmit/database/db_manager.py
index 6e7376b9b1fbdd9c9b3081f42493c23f52ece210..fb7c39f9d9a23f7b89a89959f034e5c49d6dffc4 100644
--- a/autosubmit/database/db_manager.py
+++ b/autosubmit/database/db_manager.py
@@ -20,6 +20,13 @@
import sqlite3
import os
+from typing import Optional, Protocol, Union, cast
+from sqlalchemy import Connection, Engine, delete, insert, select
+from autosubmit.database import session
+from autosubmit.database.tables import get_table_from_name
+from sqlalchemy.schema import CreateTable, CreateSchema, DropTable
+# FIXME: what happens in DbManager when db_version is None or 0000, etc?
+
class DbManager(object):
"""
@@ -234,3 +241,200 @@ class DbManager(object):
for condition in where:
select_command += ' AND ' + condition
return select_command
+
+
+class DatabaseManager(Protocol):
+ """Common interface for database managers.
+
+ We used a protocol here to avoid having to modify the existing
+ SQLite code (as we would if we used an abstract/ABC class).
+
+ And the new database manager will "quack" like the other one does.
+ """
+
+ connection: Union[Connection, sqlite3.Connection]
+
+ def backup(self): ...
+ def restore(self): ...
+ def disconnect(self): ...
+ def create_table(self, table_name, fields): ...
+ def drop_table(self, table_name): ...
+ def insert(self, table_name, columns, values): ...
+ def insertMany(self, table_name, data): ...
+ def select_first(self, table_name): ...
+ def select_first_where(self, table_name, where): ...
+ def select_all(self, table_name): ...
+ def select_all_where(self, table_name, where): ...
+ def count(self, table_name): ...
+ def drop(self): ...
+
+
+class SqlAlchemyDbManager:
+ """A database manager using SQLAlchemy.
+
+ It contains the same public functions as ``DbManager``
+    (SQLite only), but uses SQLAlchemy instead of calling database
+ driver functions directly.
+
+ It can be used with any engine supported by SQLAlchemy, such
+ as Postgres, Mongo, MySQL, etc.
+
+ Static functions from ``DbManager`` were skipped, as those are
+ similar to what SQLAlchemy already provides for creating and
+ queries.
+
+    Some operations here may raise ``NotImplementedError``, as they existed in
+ the ``DbManager`` but were never used in Autosubmit (i.e. we can
+ delete that code -- for later).
+
+ TODO: At some point in the future, we can drop ``DbManager`` and
+ replace it with this one (or rename this to DbManager), as
+ SQLAlchemy should be able to handle SQLite too.
+ """
+
+ def __init__(self, schema: Optional[str] = None) -> None:
+ self.engine: Engine = session.create_engine()
+ self.schema = schema
+ # TODO: if we cannot do that (too long-lived conns?), then we can open for each operation...
+ self.connection: Connection = self.engine.connect()
+ # TODO: in SQLite-land, it creates the tables if they do not exist... what about here?
+
+ def backup(self):
+ pass
+
+ def restore(self):
+ pass
+
+ def disconnect(self):
+ self.connection.close()
+
+ def create_table(self, table_name, fields):
+ # NOTE: ``fields`` is ignored as they are defined in the SQLAlchemy
+ # tables definitions (see ``.database.tables``). Kept for
+ # backward compatibility with the old API for SQLite.
+ table = get_table_from_name(schema=self.schema, table_name=table_name)
+ self.connection.execute(CreateSchema(self.schema, if_not_exists=True))
+ self.connection.execute(CreateTable(table, if_not_exists=True))
+ self.connection.commit()
+
+ def drop_table(self, table_name):
+ table = get_table_from_name(schema=self.schema, table_name=table_name)
+ self.connection.execute(DropTable(table, if_exists=True))
+ self.connection.commit()
+
+ def insert(self, table_name, columns, values):
+ """Not implemented.
+
+ In the original ``DbManager`` (SQLite), this function is used
+ only to initialize the SQLite database (i.e. no external users).
+ """
+ raise NotImplementedError()
+
+ def insertMany(self, table_name, data):
+ """
+ N.B.: One difference between SQLite and SQLAlchemy here;
+ whereas with SQLite you insert a list of values
+ matching the columns, for SQLAlchemy you provide
+ a dictionary. For backward-compatibility, we risk
+ accepting a list or dictionary here. If a list
+ is provided, then we will use the Table metadata
+ to fetch the columns. Not the safest approach, so
+ prefer to send a dictionary!
+
+ :type table_name: str
+ :type data: Union[List, Dict]
+ """
+ if len(data) == 0: # type: ignore
+ return 0
+ table = get_table_from_name(schema=self.schema, table_name=table_name)
+ if type(data[0]) == list:
+ # Convert into a dictionary; we know the keys from the
+ # table metadata columns.
+ columns = [column.name for column in cast(list, table.columns)]
+ data = map(lambda values: {
+ key: value
+ for key, value in
+ zip(columns, values)
+ }, cast(list, data))
+ data = list(data)
+ result = self.connection.execute(insert(table), data)
+ self.connection.commit()
+ return cast(int, result.rowcount)
+
+ def select_first(self, table_name):
+ """Not used in the original ``DbManager``!"""
+ raise NotImplementedError()
+
+ def select_first_where(self, table_name, where):
+ """Not used in the original ``DbManager``!"""
+ raise NotImplementedError()
+
+ def select_all(self, table_name):
+ table = get_table_from_name(schema=self.schema, table_name=table_name)
+ rows = self.connection.execute(select(table)).all()
+ return rows
+
+ def select_all_where(self, table_name, where):
+ """Not used in the original ``DbManager``!"""
+ raise NotImplementedError()
+
+ def count(self, table_name):
+ """Not used in the original ``DbManager``!"""
+ raise NotImplementedError()
+
+ def drop(self):
+ """Not used in the original ``DbManager``!"""
+ raise NotImplementedError()
+
+ # New! Used for testing...
+
+ def delete_all(self, table_name: str) -> int:
+ """Deletes all the rows of the table.
+
+        :param table_name: The name of the table.
+        :return: The number of rows deleted.
+ """
+ table = get_table_from_name(schema=self.schema, table_name=table_name)
+ result = self.connection.execute(delete(table))
+ self.connection.commit()
+ return cast(int, result.rowcount)
+
+
+def create_db_manager(db_engine: str, **options) -> DatabaseManager:
+ """
+ Creates a Postgres or SQLite database manager based on the Autosubmit configuration.
+
+ Note that you must provide the options even if they are optional, in which case
+ you must provide ``options=None``, or you will get a ``KeyError``.
+
+ Later we might be able to drop the SQLite database manager. So, for the moment,
+ please call the function providing the database engine type, and the arguments
+ for both SQLite and for SQLAlchemy.
+
+ This means you do not have to do an ``if/else`` in your code, just give
+ this function the engine type, and all the valid options, and it should
+ handle choosing and building the database manager for you. e.g.
+
+ >>> from autosubmit.database.db_manager import create_db_manager
+ >>> options = {
+ >>> # these are for sqlite
+ >>> 'root_path': '/tmp/',
+ >>> 'db_name': 'name.db',
+ >>> 'db_version': 1,
+ >>> # and these for sqlalchemy -- not very elegant, but this is
+ >>> # to work-effectively-with-legacy-code (as in that famous book).
+ >>> 'schema': 'a001'
+ >>> }
+ >>> db_manager = create_db_manager(db_engine='postgres', **options)
+
+ :param db_engine: The database engine type.
+ :return: A ``DatabaseManager``.
+ :raises ValueError: If the database engine type is not valid.
+ :raises KeyError: If the ``options`` dictionary is missing a required parameter for an engine.
+ """
+ if db_engine == "postgres":
+ return cast(DatabaseManager, SqlAlchemyDbManager(options['schema']))
+ elif db_engine == "sqlite":
+ return cast(DatabaseManager, DbManager(options['root_path'], options['db_name'], options['db_version']))
+ else:
+ raise ValueError(f"Invalid database engine: {db_engine}")
diff --git a/autosubmit/database/db_structure.py b/autosubmit/database/db_structure.py
index 31dc42740a56c6536355b06ce6dfaba255d4a77c..4da0149e6b003c9bf83efdfaeaad62bb5396534d 100644
--- a/autosubmit/database/db_structure.py
+++ b/autosubmit/database/db_structure.py
@@ -23,9 +23,12 @@ import textwrap
import traceback
import sqlite3
-from typing import Dict, List
+from typing import Dict, List, Optional
from log.log import Log
+from autosubmitconfigparser.config.basicconfig import BasicConfig
+from autosubmit.database.db_manager import DatabaseManager, create_db_manager
+
def get_structure(exp_id, structures_path):
# type: (str, str) -> Dict[str, List[str]]
@@ -35,7 +38,9 @@ def get_structure(exp_id, structures_path):
:return: Map from experiment name source to name destination
:rtype: Dictionary Key: String, Value: List(of String)
"""
- try:
+ if BasicConfig.DATABASE_BACKEND == 'postgres':
+ return get_structure_sqlalchemy(exp_id, structures_path)
+ try:
if os.path.exists(structures_path):
db_structure_path = os.path.join(
structures_path, "structure_" + exp_id + ".db")
@@ -118,6 +123,8 @@ def save_structure(graph, exp_id, structures_path):
"""
Saves structure if path is valid
"""
+ if BasicConfig.DATABASE_BACKEND == 'postgres':
+        return save_structure_sqlalchemy(graph, exp_id, structures_path)
conn = None
# Path to structures folder exists
if os.path.exists(structures_path):
@@ -175,3 +182,70 @@ def _delete_table_content(conn):
except sqlite3.Error as e:
Log.debug(traceback.format_exc())
Log.warning("Error on Delete : {0}".format(str(type(e).__name__)))
+
+
+# Code added for SQLAlchemy support
+
+def get_structure_sqlalchemy(exp_id: str, structures_path: str) -> Dict[str, List[str]]:
+ """
+ Creates file of database and table of experiment structure if it does not exist. Returns current structure.
+
+ :return: Map from experiment name source to name destination
+ """
+ db_manager: Optional[DatabaseManager] = None
+ options = {'schema': exp_id}
+ try:
+ db_manager = create_db_manager('postgres', **options)
+
+ # FIXME: to verify, the old code did what's shown below;
+ # does that also apply to the new code?
+ # if not os.path.exists(db_structure_path):
+ # open(db_structure_path, "w")
+ # print(db_structure_path)
+ # There used to be a function ``create_table`` here, but its feature/behaviour
+ # is already implemented in the ``DbManager`` class (which handles DDL), so that
+ # can be deleted as ``DbManager`` creates SQLite and SQLAlchemy tables.
+ db_manager.create_table(
+ table_name='experiment_structure',
+ fields=['e_from text NOT NULL',
+ 'e_to text NOT NULL',
+ 'UNIQUE(e_from,e_to)']
+ )
+ # The call below is equivalent of ``_get_exp_structure`` in the old code.
+ current_table = db_manager.select_all('experiment_structure')
+ current_table_structure = dict()
+ for item in current_table:
+ _from, _to = item
+ current_table_structure.setdefault(_from, []).append(_to)
+ current_table_structure.setdefault(_to, [])
+ return current_table_structure
+ except Exception as exp:
+ Log.printlog("Get structure error: {0}".format(str(exp)), 6014)
+ Log.debug(traceback.format_exc())
+ finally:
+ if db_manager:
+ db_manager.disconnect()
+
+
+def save_structure_sqlalchemy(graph, exp_id, structures_path):
+ """
+ Saves structure if path is valid
+ """
+ db_manager: Optional[DatabaseManager] = None
+ options = {'schema': exp_id}
+ try:
+ db_manager = create_db_manager('postgres', **options)
+
+ # Save structure
+ nodes_edges = {u for u, v in graph.edges()}
+ nodes_edges.update({v for u, v in graph.edges()})
+ independent_nodes = {
+ u for u in graph.nodes() if u not in nodes_edges}
+ data = {(u, v) for u, v in graph.edges()}
+ data.update({(u, u) for u in independent_nodes})
+ # save
+ edges = [{"e_from": e[0], "e_to": e[1]} for e in data]
+ db_manager.insertMany('experiment_structure', edges)
+ finally:
+ if db_manager:
+ db_manager.disconnect()
diff --git a/autosubmit/database/session.py b/autosubmit/database/session.py
new file mode 100644
index 0000000000000000000000000000000000000000..03bff2ecd96ef1eb070aec1e4e2a7df2241617af
--- /dev/null
+++ b/autosubmit/database/session.py
@@ -0,0 +1,24 @@
+from pathlib import Path
+from sqlalchemy import Engine, NullPool, create_engine as sqlalchemy_create_engine
+from typing import Optional
+
+from autosubmitconfigparser.config.basicconfig import BasicConfig
+
+_SQLITE_IN_MEMORY_URL = "sqlite://"
+
+
+def _create_sqlite_engine(path: Optional[str] = None) -> Engine:
+ # file-based, or in-memory database?
+ sqlite_url = f"sqlite:///{Path(path).resolve()}" if path else _SQLITE_IN_MEMORY_URL
+ return sqlalchemy_create_engine(sqlite_url, poolclass=NullPool)
+
+
+def create_engine() -> Engine:
+ """Create SQLAlchemy Core engine."""
+ if BasicConfig.DATABASE_BACKEND == "postgres":
+ return sqlalchemy_create_engine(BasicConfig.DATABASE_CONN_URL)
+ else:
+ return _create_sqlite_engine()
+
+
+__all__ = ["create_engine"]
diff --git a/autosubmit/database/tables.py b/autosubmit/database/tables.py
new file mode 100644
index 0000000000000000000000000000000000000000..e21891cf1a7fd88022a2ae020e4405696c5c7eca
--- /dev/null
+++ b/autosubmit/database/tables.py
@@ -0,0 +1,207 @@
+from sqlalchemy import (
+ MetaData,
+ Integer,
+ String,
+ Table,
+ Text,
+ Float,
+ UniqueConstraint,
+ Column,
+)
+from typing import List, Optional, cast
+
+metadata_obj = MetaData()
+
+
+ExperimentTable = Table(
+ 'experiment',
+ metadata_obj,
+ Column('id', Integer, nullable=False, primary_key=True),
+ Column('name', String, nullable=False),
+ Column('description', String, nullable=False),
+ Column('autosubmit_version', String)
+)
+"""The main table, populated by Autosubmit. Should be read-only by the API."""
+
+# NOTE: In the original SQLite DB, db_version.version was the only field,
+# and not a PK.
+DBVersionTable = Table(
+ 'db_version',
+ metadata_obj,
+ Column('version', Integer, nullable=False, primary_key=True)
+)
+
+ExperimentStructureTable = Table(
+ 'experiment_structure',
+ metadata_obj,
+ Column('e_from', Text, nullable=False, primary_key=True),
+ Column('e_to', Text, nullable=False, primary_key=True),
+)
+"""Table that holds the structure of the experiment jobs."""
+
+ExperimentStatusTable = Table(
+ 'experiment_status',
+ metadata_obj,
+ Column('exp_id', Integer, primary_key=True),
+ Column('name', Text, nullable=False),
+ Column('status', Text, nullable=False),
+ Column('seconds_diff', Integer, nullable=False),
+ Column('modified', Text, nullable=False)
+)
+"""Stores the status of the experiments."""
+
+JobPackageTable = Table(
+ 'job_package',
+ metadata_obj,
+ Column('package_name', Text, primary_key=True),
+ Column('job_name', Text, primary_key=True),
+ Column('exp_id', Text)
+)
+"""Stores a mapping between the wrapper name and the actual job in SLURM."""
+
+WrapperJobPackageTable = Table(
+ 'wrapper_job_package',
+ metadata_obj,
+ Column('package_name', Text, primary_key=True),
+ Column('job_name', Text, primary_key=True),
+ Column('exp_id', Text)
+)
+"""It is a replication.
+
+It is only created/used when using inspect and create or monitor
+with flag -cw in Autosubmit.
+
+This replication is used to not interfere with the current
+autosubmit run of that experiment since wrapper_job_package
+will contain a preview, not the real wrapper packages."""
+
+# NOTE: The column ``metadata`` has a name that is reserved in
+# SQLAlchemy ORM. It works for SQLAlchemy Core, here, but
+# if you plan to use ORM, be warned that you will have to
+# search how to workaround it (or will probably have to
+# use SQLAlchemy core here).
+ExperimentRunTable = Table(
+ "experiment_run",
+ metadata_obj,
+ Column("run_id", Integer, primary_key=True),
+ Column("created", Text, nullable=False),
+ Column("modified", Text, nullable=True),
+ Column("start", Integer, nullable=False),
+ Column("finish", Integer),
+ Column("chunk_unit", Text, nullable=False),
+ Column("chunk_size", Integer, nullable=False),
+ Column("completed", Integer, nullable=False),
+ Column("total", Integer, nullable=False),
+ Column("failed", Integer, nullable=False),
+ Column("queuing", Integer, nullable=False),
+ Column("running", Integer, nullable=False),
+ Column("submitted", Integer, nullable=False),
+ Column("suspended", Integer, nullable=False, default=0),
+ Column("metadata", Text),
+)
+
+JobDataTable = Table(
+ 'job_data',
+ metadata_obj,
+ Column('id', Integer, nullable=False, primary_key=True),
+ Column('counter', Integer, nullable=False),
+ Column('job_name', Text, nullable=False, index=True),
+ Column('created', Text, nullable=False),
+ Column('modified', Text, nullable=False),
+ Column('submit', Integer, nullable=False),
+ Column('start', Integer, nullable=False),
+ Column('finish', Integer, nullable=False),
+ Column('status', Text, nullable=False),
+ Column('rowtype', Integer, nullable=False),
+ Column('ncpus', Integer, nullable=False),
+ Column('wallclock', Text, nullable=False),
+ Column('qos', Text, nullable=False),
+ Column('energy', Integer, nullable=False),
+ Column('date', Text, nullable=False),
+ Column('section', Text, nullable=False),
+ Column('member', Text, nullable=False),
+ Column('chunk', Integer, nullable=False),
+ Column('last', Integer, nullable=False),
+ Column('platform', Text, nullable=False),
+ Column('job_id', Integer, nullable=False),
+ Column('extra_data', Text, nullable=False),
+ Column('nnodes', Integer, nullable=False, default=0),
+ Column('run_id', Integer),
+ Column('MaxRSS', Float, nullable=False, default=0.0),
+ Column('AveRSS', Float, nullable=False, default=0.0),
+ Column('out', Text, nullable=False),
+ Column('err', Text, nullable=False),
+ Column('rowstatus', Integer, nullable=False, default=0),
+ Column('children', Text, nullable=True),
+ Column('platform_output', Text, nullable=True),
+ UniqueConstraint('counter', 'job_name', name='unique_counter_and_job_name')
+)
+
+JobListTable = Table(
+ 'job_list',
+ metadata_obj,
+ Column('name', String, primary_key=True),
+ Column('id', Integer),
+ Column('status', Integer),
+ Column('priority', Integer),
+ Column('section', String),
+ Column('date', String),
+ Column('member', String),
+ Column('chunk', Integer),
+ Column('split', Integer),
+ Column('local_out', String),
+ Column('local_err', String),
+ Column('remote_out', String),
+ Column('remote_err', String)
+)
+
+TABLES = (ExperimentTable,
+ ExperimentStatusTable,
+ ExperimentStructureTable,
+ ExperimentRunTable,
+ DBVersionTable,
+ JobPackageTable,
+ JobDataTable,
+ JobListTable,
+ WrapperJobPackageTable)
+"""The tables available in the Autosubmit databases."""
+
+
+def get_table_with_schema(schema: str, table: Table) -> Table:
+ """Get the ``Table`` instance with the metadata modified.
+
+ The metadata will use the given container. This means you can
+ have table ``A`` with no schema, then call this function with
+ ``schema=a000``, and then a new table ``A`` with ``schema=a000``
+ will be returned.
+
+ :param schema: The target schema for the table metadata.
+ :param table: The SQLAlchemy Table.
+ :return: The same table, but with the given schema set as metadata.
+ """
+ if not isinstance(table, Table):
+ raise RuntimeError("Invalid source type on table schema change")
+
+    metadata = MetaData(schema=schema)
+
+    # Table.to_metadata() copies the columns AND the table-level
+    # constraints/indexes into the new MetaData; the previous per-column
+    # copy loop silently dropped constraints (e.g. job_data's
+    # UniqueConstraint) and relied on the deprecated Column.copy() API.
+    return table.to_metadata(metadata, schema=schema)
+
+
+def get_table_from_name(*, schema: str, table_name: str) -> Optional[Table]:
+ """Get the table from a given table name.
+
+ :param schema: The schema name.
+ :param table_name: The table name.
+ :return: The table if found, ``None`` otherwise.
+ :raises ValueError: If the table name is not provided.
+ """
+ if not table_name:
+ raise ValueError(f'Missing table name: {table_name}')
+
+    predicate = lambda table: table.name.lower() == table_name.lower()
+    table = next(filter(predicate, TABLES), None)
+    return get_table_with_schema(schema, table) if table is not None else None
diff --git a/autosubmit/history/database_managers/experiment_history_db_manager.py b/autosubmit/history/database_managers/experiment_history_db_manager.py
index 8df415c94682435e781588f939a850301bf784f3..4acafd4b06d8e41162afe4cedf31aec872284a94 100644
--- a/autosubmit/history/database_managers/experiment_history_db_manager.py
+++ b/autosubmit/history/database_managers/experiment_history_db_manager.py
@@ -23,6 +23,12 @@ from autosubmit.history.data_classes.job_data import JobData
from autosubmit.history.data_classes.experiment_run import ExperimentRun
from .database_manager import DatabaseManager, DEFAULT_JOBDATA_DIR
+from typing import Any, Optional, Protocol, cast
+from sqlalchemy import Engine, and_, bindparam, delete, desc, func, insert, inspect, select, update
+from sqlalchemy.schema import CreateTable, CreateSchema
+from autosubmit.database import session
+from autosubmit.database.tables import ExperimentRunTable, JobDataTable, get_table_with_schema
+
CURRENT_DB_VERSION = 18
DB_EXPERIMENT_HEADER_SCHEMA_CHANGES = 14
DB_VERSION_SCHEMA_CHANGES = 12
@@ -387,3 +393,367 @@ class ExperimentHistoryDbManager(DatabaseManager):
if len(pragma_result) <= 0:
raise Exception("Error while getting the pragma version. This might be a signal of a deeper problem. Review previous errors.")
return Models.PragmaVersion(*pragma_result[0]).version
+
+
+class ExperimentHistoryDatabaseManager(Protocol):
+ def initialize(self):...
+ def my_database_exists(self):...
+ def is_header_ready_db_version(self):...
+ def is_current_version(self):...
+ def create_historical_database(self):...
+ def update_historical_database(self):...
+ def get_experiment_run_dc_with_max_id(self):...
+ def register_experiment_run_dc(self, experiment_run_dc):...
+ def update_experiment_run_dc_by_id(self, experiment_run_dc):...
+ def is_there_a_last_experiment_run(self):...
+ def get_job_data_all(self):...
+ def register_submitted_job_data_dc(self, job_data_dc):...
+ def update_job_data_dc_by_id(self, job_data_dc):...
+ def update_list_job_data_dc_by_each_id(self, job_data_dcs):...
+ def get_job_data_dc_unique_latest_by_job_name(self, job_name):...
+ def get_job_data_dcs_last_by_run_id(self, run_id):...
+ def get_job_data_dcs_last_by_wrapper_code(self, wrapper_code):...
+ def get_all_last_job_data_dcs(self):...
+ def update_many_job_data_change_status(self, changes):...
+ def _update_job_data_by_id(self, job_data_dc):...
+ def get_job_data_by_name(self, job_name):...
+ def get_job_data_max_counter(self):...
+ def delete_job_data(self, id_):...
+ def delete_experiment_run(self, run_id):...
+
+
+class SqlAlchemyExperimentHistoryDbManager:
+ """A SQLAlchemy experiment history database manager.
+
+ Its interface was designed based on the SQLite database manager,
+ with the following differences:
+
+ - We do not have the DB migration system that they used, as that
+ used SQLite pragmas, which are not portable across DB engines
+ (i.e. no ``_set_schema_changes()`` nor ``_set_table_queries()``.
+ """
+
+ def __init__(self, schema: Optional[str]):
+ self.engine: Engine = session.create_engine()
+ self.schema = schema
+
+ def initialize(self):
+ if self.my_database_exists():
+ if not self.is_current_version():
+ self.update_historical_database()
+ else:
+ self.create_historical_database()
+
+ def my_database_exists(self):
+ """Return ``True`` if the schema exists in the database. ``False`` otherwise."""
+        engine = self.engine
+ inspector = inspect(engine)
+ return self.schema in inspector.get_schema_names()
+
+ def is_header_ready_db_version(self):
+ if self.my_database_exists():
+ return self._get_pragma_version() >= DB_EXPERIMENT_HEADER_SCHEMA_CHANGES
+ return False
+
+ def is_current_version(self):
+ if self.my_database_exists():
+ return self._get_pragma_version() == CURRENT_DB_VERSION
+ return False
+
+ def create_historical_database(self):
+        with self.engine.begin() as conn:
+ conn.execute(CreateSchema(self.schema, if_not_exists=True))
+ conn.execute(CreateTable(get_table_with_schema(self.schema, ExperimentRunTable), if_not_exists=True))
+ conn.execute(CreateTable(get_table_with_schema(self.schema, JobDataTable), if_not_exists=True))
+ # TODO: implement db migrations?
+ # self._set_historical_pragma_version(CURRENT_DB_VERSION)
+
+ def update_historical_database(self):
+ raise NotImplementedError("This feature has not been implemented yet with SQLAlchemy / Alembic.")
+
+ def get_experiment_run_dc_with_max_id(self):
+ return ExperimentRun.from_model(self._get_experiment_run_with_max_id())
+
+ def register_experiment_run_dc(self, experiment_run_dc):
+ query = (
+ insert(get_table_with_schema(self.schema, ExperimentRunTable)).
+ values(
+ created=HUtils.get_current_datetime(),
+ modified=HUtils.get_current_datetime(),
+ start=experiment_run_dc.start,
+ finish=experiment_run_dc.finish,
+ chunk_unit=experiment_run_dc.chunk_unit,
+ chunk_size=experiment_run_dc.chunk_size,
+ completed=experiment_run_dc.completed,
+ total=experiment_run_dc.total,
+ failed=experiment_run_dc.failed,
+                queuing=experiment_run_dc.queuing,
+ running=experiment_run_dc.running,
+ submitted=experiment_run_dc.submitted,
+ suspended=experiment_run_dc.suspended,
+ metadata=experiment_run_dc.metadata
+ )
+ )
+        with self.engine.begin() as conn:
+ conn.execute(query)
+ return ExperimentRun.from_model(self._get_experiment_run_with_max_id())
+
+ def update_experiment_run_dc_by_id(self, experiment_run_dc):
+ experiment_run_table = get_table_with_schema(self.schema, ExperimentRunTable)
+ query = (
+ update(experiment_run_table).
+ where(experiment_run_table.c.run_id == experiment_run_dc.run_id). # type: ignore
+ values(
+ finish=experiment_run_dc.finish,
+ chunk_unit=experiment_run_dc.chunk_unit,
+ chunk_size=experiment_run_dc.chunk_size,
+ completed=experiment_run_dc.completed,
+ total=experiment_run_dc.total,
+ failed=experiment_run_dc.failed,
+ queuing=experiment_run_dc.queuing,
+ running=experiment_run_dc.running,
+ submitted=experiment_run_dc.submitted,
+ suspended=experiment_run_dc.suspended,
+ modified=HUtils.get_current_datetime()
+ )
+ )
+        with self.engine.begin() as conn:
+ conn.execute(query)
+ return ExperimentRun.from_model(self._get_experiment_run_with_max_id())
+
+ def _get_experiment_run_with_max_id(self):
+ experiment_run_table = get_table_with_schema(self.schema, ExperimentRunTable)
+ query = (
+ select(experiment_run_table).
+ where(experiment_run_table.c.run_id > 0).
+ order_by(desc(experiment_run_table.c.run_id))
+ )
+ with self.engine.connect() as conn:
+            row = conn.execute(query).first()
+        if not row:
+            raise Exception("No Experiment Runs registered.")
+        return Models.ExperimentRunRow(*row)
+
+ def is_there_a_last_experiment_run(self):
+ experiment_run_table = get_table_with_schema(self.schema, ExperimentRunTable)
+ query = (
+ select(experiment_run_table).
+ where(experiment_run_table.c.run_id > 0).
+ order_by(desc(experiment_run_table.c.run_id))
+ )
+ with self.engine.connect() as conn:
+            result = conn.execute(query).first()
+ return result is not None
+
+ def get_job_data_all(self):
+ with self.engine.connect() as conn:
+ job_data_rows = conn.execute(select(get_table_with_schema(self.schema, JobDataTable))).all()
+        return [Models.JobDataRow(*row) for row in job_data_rows]
+
+ def register_submitted_job_data_dc(self, job_data_dc):
+ self._set_current_job_data_rows_last_to_zero_by_job_name(job_data_dc.job_name)
+ self._insert_job_data(job_data_dc)
+ return self.get_job_data_dc_unique_latest_by_job_name(job_data_dc.job_name)
+
+ def _set_current_job_data_rows_last_to_zero_by_job_name(self, job_name):
+ """ Sets the column last = 0 for all job_rows by job_name and last = 1. """
+ job_data_row_last = self._get_job_data_last_by_name(job_name)
+ job_data_dc_list = [JobData.from_model(row) for row in job_data_row_last]
+ for job_data_dc in job_data_dc_list:
+ job_data_dc.last = 0
+ self._update_job_data_by_id(job_data_dc)
+
+ def update_job_data_dc_by_id(self, job_data_dc):
+ """ Update JobData data class. Returns latest last=1 row from job_data by job_name. """
+ self._update_job_data_by_id(job_data_dc)
+ return self.get_job_data_dc_unique_latest_by_job_name(job_data_dc.job_name)
+
+ def update_list_job_data_dc_by_each_id(self, job_data_dcs):
+ """ Return length of updated list. """
+ for job_data_dc in job_data_dcs:
+ self._update_job_data_by_id(job_data_dc)
+ return len(job_data_dcs)
+
+ def get_job_data_dc_unique_latest_by_job_name(self, job_name):
+ job_data_row_last = self._get_job_data_last_by_name(job_name)
+ if len(job_data_row_last) > 0:
+ return JobData.from_model(job_data_row_last[0])
+ return None
+
+ def _get_job_data_last_by_name(self, job_name):
+ job_data_table = get_table_with_schema(self.schema, JobDataTable)
+ query = (
+ select(job_data_table).
+ where(
+ and_(job_data_table.c.last == 1, job_data_table.c.job_name == job_name)
+ ).
+ order_by(desc(job_data_table.c.counter))
+ )
+ with self.engine.connect() as conn:
+ job_data_rows_last = conn.execute(query).all()
+ # if previous job didn't finished but a new create has been made
+ if not job_data_rows_last:
+ new_query = (
+ select(job_data_table).
+ where(
+ and_(job_data_table.c.last == 0, job_data_table.c.job_name == job_name)
+ ).
+ order_by(desc(job_data_table.c.counter))
+ )
+ with self.engine.connect() as conn:
+ job_data_rows_last = conn.execute(new_query).all()
+        return [Models.JobDataRow(*row) for row in job_data_rows_last]
+
+ def get_job_data_dcs_last_by_run_id(self, run_id):
+ job_data_rows = self._get_job_data_last_by_run_id(run_id)
+ return [JobData.from_model(row) for row in job_data_rows]
+
+ def _get_job_data_last_by_run_id(self, run_id):
+ """ Get List of Models.JobDataRow for last=1 and run_id """
+        job_data_table = get_table_with_schema(self.schema, JobDataTable)
+        query = select(job_data_table).where(and_(job_data_table.c.run_id == run_id, job_data_table.c.last == 1, job_data_table.c.rowtype >= 2)).order_by(job_data_table.c.id)
+        with self.engine.connect() as conn:
+            return [Models.JobDataRow(*row) for row in conn.execute(query).all()]
+
+ def get_job_data_dcs_last_by_wrapper_code(self, wrapper_code):
+ if wrapper_code and wrapper_code > 2:
+ return [JobData.from_model(row) for row in self._get_job_data_last_by_wrapper_code(wrapper_code)]
+ else:
+ return []
+
+ def _get_job_data_last_by_wrapper_code(self, wrapper_code):
+ """ Get List of Models.JobDataRow for last=1 and rowtype=wrapper_code """
+        job_data_table = get_table_with_schema(self.schema, JobDataTable)
+        query = select(job_data_table).where(and_(job_data_table.c.rowtype == wrapper_code, job_data_table.c.last == 1)).order_by(job_data_table.c.id)
+        with self.engine.connect() as conn:
+            return [Models.JobDataRow(*row) for row in conn.execute(query).all()]
+
+ def get_all_last_job_data_dcs(self):
+ """ Gets JobData data classes in job_data for last=1. """
+ job_data_rows = self._get_all_last_job_data_rows()
+ return [JobData.from_model(row) for row in job_data_rows]
+
+ def _get_all_last_job_data_rows(self):
+ """ Get List of Models.JobDataRow for last=1. """
+        job_data_table = get_table_with_schema(self.schema, JobDataTable)
+        with self.engine.connect() as conn:
+            return [Models.JobDataRow(*row) for row in conn.execute(select(job_data_table).where(job_data_table.c.last == 1)).all()]
+
+ def _insert_job_data(self, job_data):
+ job_data_table = get_table_with_schema(self.schema, JobDataTable)
+ insert_query = (
+ insert(job_data_table).
+ values(
+ counter=job_data.counter,
+ job_name=job_data.job_name,
+ created=HUtils.get_current_datetime(),
+ modified=HUtils.get_current_datetime(),
+ submit=job_data.submit,
+ start=job_data.start,
+ finish=job_data.finish,
+ status=job_data.status,
+ rowtype=job_data.rowtype,
+ ncpus=job_data.ncpus,
+ wallclock=job_data.wallclock,
+ qos=job_data.qos,
+ energy=job_data.energy,
+ date=job_data.date,
+ section=job_data.section,
+ member=job_data.member,
+ chunk=job_data.chunk,
+ last=job_data.last,
+ platform=job_data.platform,
+ job_id=job_data.job_id,
+ extra_data=job_data.extra_data,
+ nnodes=job_data.nnodes,
+ run_id=job_data.run_id,
+ MaxRSS=job_data.MaxRSS,
+ AveRSS=job_data.AveRSS,
+ out=job_data.out,
+ err=job_data.err,
+ rowstatus=job_data.rowstatus,
+ children=job_data.children,
+ platform_output=job_data.platform_output
+ )
+ )
+ with self.engine.connect() as conn:
+ result = conn.execute(insert_query)
+ conn.commit()
+ return result.lastrowid
+
+    def update_many_job_data_change_status(self, changes):
+        # type : (List[Tuple]) -> None
+        """Bulk-update modified, status and rowstatus of job_data rows by id; changes is a list of (modified, status, rowstatus, id) tuples."""
+        job_data_table = get_table_with_schema(self.schema, JobDataTable)
+        query = update(job_data_table).where(job_data_table.c.id == bindparam("b_id")).values(modified=bindparam("b_modified"), status=bindparam("b_status"), rowstatus=bindparam("b_rowstatus"))
+        changes_params = [{"b_modified": c[0], "b_status": c[1], "b_rowstatus": c[2], "b_id": c[3]} for c in changes]
+        with self.engine.begin() as conn:
+            conn.execute(query, changes_params)
+
+ def _update_job_data_by_id(self, job_data_dc):
+ job_data_table = get_table_with_schema(self.schema, JobDataTable)
+ query = (
+ update(job_data_table).
+ where(job_data_table.c.id == job_data_dc._id). # type: ignore
+ values(
+ last=job_data_dc.last,
+ submit=job_data_dc.submit,
+ start=job_data_dc.start,
+ finish=job_data_dc.finish,
+ modified=HUtils.get_current_datetime(),
+ job_id=job_data_dc.job_id,
+ status=job_data_dc.status,
+ energy=job_data_dc.energy,
+ extra_data=job_data_dc.extra_data,
+ nnodes=job_data_dc.nnodes,
+ ncpus=job_data_dc.ncpus,
+ rowstatus=job_data_dc.rowstatus,
+ out=job_data_dc.out,
+ err=job_data_dc.err,
+ children=job_data_dc.children,
+ platform_output=job_data_dc.platform_output,
+ )
+ )
+        with self.engine.begin() as conn:
+ conn.execute(query)
+
+    def get_job_data_by_name(self, job_name):
+        """ Get List of Models.JobDataRow for job_name """
+        job_data_table = get_table_with_schema(self.schema, JobDataTable)
+        query = select(job_data_table).where(job_data_table.c.job_name == job_name).order_by(desc(job_data_table.c.counter))
+        with self.engine.connect() as conn:
+            return [Models.JobDataRow(*row) for row in conn.execute(query).all()]
+
+    def get_job_data_max_counter(self):
+        """ The max counter is the maximum count value for the count column in job_data. """
+        job_data_table = get_table_with_schema(self.schema, JobDataTable)
+        query = select(func.max(job_data_table.c.counter))
+        with self.engine.connect() as conn:
+            max_counter = conn.execute(query).scalar()
+        # MAX() over an empty table yields NULL (None); mirror the SQLite
+        # implementation by falling back to the default counter.
+        return max_counter if max_counter else DEFAULT_MAX_COUNTER
+
+    def delete_job_data(self, id_):
+        """ Deletes row from job_data by id. Useful for testing. """
+        job_data_table = get_table_with_schema(self.schema, JobDataTable)
+        with self.engine.begin() as conn:
+            conn.execute(delete(job_data_table).where(job_data_table.c.id == id_))
+
+    def delete_experiment_run(self, run_id):
+        """ Deletes row in experiment_run by run_id. Useful for testing. """
+        experiment_run_table = get_table_with_schema(self.schema, ExperimentRunTable)
+        with self.engine.begin() as conn:
+            conn.execute(delete(experiment_run_table).where(experiment_run_table.c.run_id == run_id))
+
+
+def create_experiment_history_db_manager(db_engine: str, **options: Any) -> ExperimentHistoryDatabaseManager:
+ if db_engine == 'postgres':
+ return cast(ExperimentHistoryDatabaseManager, SqlAlchemyExperimentHistoryDbManager(options['schema']))
+ elif db_engine == 'sqlite':
+ return cast(ExperimentHistoryDatabaseManager, ExperimentHistoryDbManager(
+ options['schema'],
+ options.get('jobdata_dir_path', DEFAULT_JOBDATA_DIR)
+ ))
+ else:
+ raise ValueError(f"Invalid database engine: {db_engine}")
diff --git a/autosubmit/history/database_managers/experiment_status_db_manager.py b/autosubmit/history/database_managers/experiment_status_db_manager.py
index ed2f9f7b8b1c7def2fac6be5aa0cb60d81752090..cda19e6b951c1d25af3290115f80b593996ad97f 100644
--- a/autosubmit/history/database_managers/experiment_status_db_manager.py
+++ b/autosubmit/history/database_managers/experiment_status_db_manager.py
@@ -25,6 +25,14 @@ from autosubmitconfigparser.config.basicconfig import BasicConfig
import autosubmit.history.utils as HUtils
from . import database_models as Models
+from typing import Optional, Protocol, cast
+from sqlalchemy import Engine, delete, insert, select, update
+from sqlalchemy.schema import CreateTable, CreateSchema
+from autosubmit.database import session
+from autosubmit.database.tables import ExperimentRunTable, ExperimentStatusTable, ExperimentTable, get_table_with_schema
+# FIXME: Why re-load the configuration globally here? Callers of this
+# module are probably responsible for doing that; i.e. not this
+# module's (or its classes') responsibility.
BasicConfig.read()
class ExperimentStatusDbManager(DatabaseManager):
@@ -143,4 +151,147 @@ class ExperimentStatusDbManager(DatabaseManager):
""" Deletes experiment_status row by expid. Useful for testing purposes. """
statement = ''' DELETE FROM experiment_status where name = ? '''
arguments = (expid,)
- self.execute_statement_with_arguments_on_dbfile(self._as_times_file_path, statement, arguments)
\ No newline at end of file
+ self.execute_statement_with_arguments_on_dbfile(self._as_times_file_path, statement, arguments)
+
+
+class ExperimentStatusDatabaseManager(Protocol):
+ def print_current_table(self): ...
+ def is_running(self, time_condition=600): ...
+ def set_existing_experiment_status_as_running(self, expid): ...
+ def create_experiment_status_as_running(self, experiment): ...
+ def get_experiment_status_row_by_expid(self, expid): ...
+ def get_experiment_row_by_expid(self, expid): ...
+ def get_experiment_status_row_by_exp_id(self, exp_id): ...
+ def create_exp_status(self, exp_id, expid, status): ...
+ def update_exp_status(self, expid, status="RUNNING"): ...
+ def delete_exp_status(self, expid): ...
+
+
+class SqlAlchemyExperimentStatusDbManager:
+ """An experiment status database manager using SQLAlchemy.
+
+ It contains the same public functions as ``ExperimentStatusDbManager``
+    (SQLite only), but uses SQLAlchemy instead of calling database
+ driver functions directly.
+
+ It can be used with any engine supported by SQLAlchemy, such
+ as Postgres, Mongo, MySQL, etc.
+
+ Some operations here may raise ``NotImplemented``, as they existed in
+ the ``ExperimentStatusDbManager`` but were never used in Autosubmit (i.e. we can
+ delete that code -- for later).
+ """
+
+ def __init__(self, schema: Optional[str] = None) -> None:
+ self.engine: Engine = session.create_engine()
+ self.schema = schema
+        with self.engine.begin() as conn:
+            conn.execute(CreateSchema(self.schema, if_not_exists=True))
+            conn.execute(CreateTable(get_table_with_schema(self.schema, ExperimentStatusTable), if_not_exists=True))
+
+ def print_current_table(self):
+ """Not used!"""
+ raise NotImplementedError()
+
+ def is_running(self, time_condition=600):
+ """Not used!"""
+ raise NotImplementedError()
+
+ def set_existing_experiment_status_as_running(self, expid):
+ self.update_exp_status(expid, Models.RunningStatus.RUNNING)
+
+ def create_experiment_status_as_running(self, experiment):
+ self.create_exp_status(experiment.id, experiment.name, Models.RunningStatus.RUNNING)
+
+ def get_experiment_status_row_by_expid(self, expid: str) -> Optional[Models.ExperimentRow]:
+ experiment_row = self.get_experiment_row_by_expid(expid)
+ return self.get_experiment_status_row_by_exp_id(experiment_row.id)
+
+ def get_experiment_row_by_expid(self, expid: str) -> Models.ExperimentRow:
+ query = (
+ select(ExperimentTable).
+ where(ExperimentTable.c.name == expid)
+ )
+        with self.engine.connect() as conn:
+ current_rows = conn.execute(query).all()
+ if len(current_rows) <= 0:
+ raise ValueError("Experiment {0} not found in Postgres {1}".format(expid, expid))
+ return Models.ExperimentRow(*current_rows[0])
+
+ def get_experiment_status_row_by_exp_id(self, exp_id: int) -> Optional[Models.ExperimentStatusRow]:
+ query = (
+ select(ExperimentStatusTable).
+ where(ExperimentStatusTable.c.exp_id == exp_id)
+ )
+        with self.engine.connect() as conn:
+ current_rows = conn.execute(query).all()
+ if len(current_rows) <= 0:
+ return None
+ return Models.ExperimentStatusRow(*current_rows[0])
+
+ def create_exp_status(self, exp_id: int, expid: str, status: str) -> int:
+ query = (
+ insert(ExperimentStatusTable).
+ values(
+ exp_id=exp_id,
+ name=expid,
+ status=status,
+ seconds_diff=0,
+ modified=HUtils.get_current_datetime()
+ )
+ )
+        with self.engine.connect() as conn:
+ result = conn.execute(query)
+ lastrow_id = result.lastrowid
+ conn.commit()
+ return lastrow_id
+
+ def update_exp_status(self, expid: str, status="RUNNING") -> None:
+ query = (
+ update(ExperimentStatusTable).
+ where(ExperimentStatusTable.c.name == expid).
+ values(
+ status=status,
+ seconds_diff=0,
+ modified=HUtils.get_current_datetime()
+ )
+ )
+        with self.engine.connect() as conn:
+ conn.execute(query)
+ conn.commit()
+
+ def delete_exp_status(self, expid: str):
+ query = (
+ delete(ExperimentStatusTable).
+ where(ExperimentStatusTable.c.name == expid)
+ )
+        with self.engine.connect() as conn:
+ conn.execute(query)
+ conn.commit()
+
+
+def create_experiment_status_db_manager(db_engine: str, **options) -> ExperimentStatusDatabaseManager:
+ """
+ Creates a Postgres or SQLite database manager based on the Autosubmit configuration.
+
+ Note that you must provide the options even if they are optional, in which case
+ you must provide ``options=None``, or you will get a ``KeyError``.
+
+ TODO: better example and/or link to DbManager.
+
+ :param db_engine: The database engine type.
+ :return: An ``ExperimentStatusDatabaseManager``.
+ :raises ValueError: If the database engine type is not valid.
+ :raises KeyError: If the ``options`` dictionary is missing a required parameter for an engine.
+ """
+ if db_engine == "postgres":
+ return cast(ExperimentStatusDatabaseManager, SqlAlchemyExperimentStatusDbManager(options['schema']))
+ elif db_engine == "sqlite":
+ return cast(ExperimentStatusDatabaseManager,
+ ExperimentStatusDbManager(
+ expid=options['schema'],
+ db_dir_path=options['root_path'],
+ main_db_name=options['db_name'],
+ local_root_dir_path=options['local_root_dir_path']))
+ else:
+ raise ValueError(f"Invalid database engine: {db_engine}")
diff --git a/autosubmit/history/experiment_history.py b/autosubmit/history/experiment_history.py
index 5fd081600ba82f151b9c7e5d42a0b711ce26cf82..96ce1718b9332c48d06b8a9cc269ce3cda8c8095 100644
--- a/autosubmit/history/experiment_history.py
+++ b/autosubmit/history/experiment_history.py
@@ -25,7 +25,7 @@ from log.log import Log
from .data_classes.experiment_run import ExperimentRun
from .data_classes.job_data import JobData
from .database_managers.database_manager import DEFAULT_JOBDATA_DIR, DEFAULT_HISTORICAL_LOGS_DIR
-from .database_managers.experiment_history_db_manager import ExperimentHistoryDbManager
+from .database_managers.experiment_history_db_manager import create_experiment_history_db_manager
from .internal_logging import Logging
from .platform_monitor.slurm_monitor import SlurmMonitor
from .strategies import PlatformInformationHandler, SingleAssociationStrategy, StraightWrapperAssociationStrategy, \
@@ -42,7 +42,11 @@ class ExperimentHistory:
self._job_data_dir_path = BasicConfig.JOBDATA_DIR
self._historiclog_dir_path = BasicConfig.HISTORICAL_LOG_DIR
try:
- self.manager = ExperimentHistoryDbManager(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR)
+ options = {
+ 'jobdata_dir_path': BasicConfig.JOBDATA_DIR,
+ 'schema': self.expid
+ }
+            self.manager = create_experiment_history_db_manager(BasicConfig.DATABASE_BACKEND, **options)
except Exception as exp:
self._log.log(str(exp), traceback.format_exc())
Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}')
diff --git a/autosubmit/history/experiment_status.py b/autosubmit/history/experiment_status.py
index 9d4c7deb056c88931e32fc6da469c0cf20369e96..47c709b7f44530de1d4a4c32ada93757279c77ae 100644
--- a/autosubmit/history/experiment_status.py
+++ b/autosubmit/history/experiment_status.py
@@ -17,7 +17,7 @@
# along with Autosubmit. If not, see .
import traceback
-from .database_managers.experiment_status_db_manager import ExperimentStatusDbManager
+from .database_managers.experiment_status_db_manager import create_experiment_status_db_manager
from .database_managers.database_manager import DEFAULT_LOCAL_ROOT_DIR, DEFAULT_HISTORICAL_LOGS_DIR
from .internal_logging import Logging
from autosubmitconfigparser.config.basicconfig import BasicConfig
@@ -29,7 +29,13 @@ class ExperimentStatus:
self.expid = expid # type : str
BasicConfig.read()
try:
- self.manager = ExperimentStatusDbManager(self.expid, BasicConfig.DB_DIR, BasicConfig.DB_FILE, local_root_dir_path=BasicConfig.LOCAL_ROOT_DIR)
+ options = {
+ 'root_path': BasicConfig.DB_DIR,
+ 'db_name': BasicConfig.DB_FILE,
+ 'local_root_dir_path': BasicConfig.LOCAL_ROOT_DIR,
+ 'schema': self.expid
+ }
+ self.manager = create_experiment_status_db_manager(BasicConfig.DATABASE_BACKEND, **options)
except Exception as exp:
message = "Error while trying to update {0} in experiment_status.".format(str(self.expid))
Logging(self.expid, BasicConfig.HISTORICAL_LOG_DIR).log(message, traceback.format_exc())
diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py
index 163689316fff64d4d48389874a50df9be66400b5..79a658dc6d822eaf57eae95963dbf9e875a6eb81 100644
--- a/autosubmit/job/job_list.py
+++ b/autosubmit/job/job_list.py
@@ -3194,14 +3194,12 @@ class JobList(object):
# monitor = Monitor()
packages = None
try:
- packages = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"),
- "job_packages_" + expid).load(wrapper=False)
+ packages = JobPackagePersistence(expid).load(wrapper=False)
except Exception as ex:
print("Wrapper table not found, trying packages.")
packages = None
try:
- packages = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"),
- "job_packages_" + expid).load(wrapper=True)
+ packages = JobPackagePersistence(expid).load(wrapper=True)
except Exception as exp2:
packages = None
pass
diff --git a/autosubmit/job/job_list_persistence.py b/autosubmit/job/job_list_persistence.py
index 48c638db22d4e2d33fdf875f89c966af318bb466..9403b360d7b80ec508e6d159c219c882d63e9b8d 100644
--- a/autosubmit/job/job_list_persistence.py
+++ b/autosubmit/job/job_list_persistence.py
@@ -20,9 +20,11 @@ import os
import pickle
from sys import setrecursionlimit
import shutil
-from autosubmit.database.db_manager import DbManager
+from autosubmit.database.db_manager import create_db_manager
from log.log import AutosubmitCritical, Log
from contextlib import suppress
+from autosubmitconfigparser.config.basicconfig import BasicConfig
+from pathlib import Path
class JobListPersistence(object):
@@ -141,8 +143,14 @@ class JobListPersistenceDb(JobListPersistence):
"wrapper_type",
]
- def __init__(self, persistence_path, persistence_file):
- self.db_manager = DbManager(persistence_path, persistence_file, self.VERSION)
+ def __init__(self, expid):
+ options = {
+ 'root_path': str(Path(BasicConfig.LOCAL_ROOT_DIR, expid, 'pkl')),
+ 'db_name': f'job_list_{expid}',
+ 'db_version': self.VERSION,
+ 'schema': expid
+ }
+ self.db_manager = create_db_manager(BasicConfig.DATABASE_BACKEND, **options)
def load(self, persistence_path, persistence_file):
"""
@@ -175,4 +183,4 @@ class JobListPersistenceDb(JobListPersistence):
"""
self.db_manager.drop_table(self.JOB_LIST_TABLE)
- self.db_manager.create_table(self.JOB_LIST_TABLE, self.TABLE_FIELDS)
+ self.db_manager.create_table(self.JOB_LIST_TABLE, self.TABLE_FIELDS)
\ No newline at end of file
diff --git a/autosubmit/job/job_package_persistence.py b/autosubmit/job/job_package_persistence.py
index 91411274e56870175ae5657b308c15e40c9caee1..01a1ebd26f825056882fbea5bcecc2f42214b225 100644
--- a/autosubmit/job/job_package_persistence.py
+++ b/autosubmit/job/job_package_persistence.py
@@ -17,7 +17,9 @@
# You should have received a copy of the GNU General Public License
# along with Autosubmit. If not, see <http://www.gnu.org/licenses/>.
-from autosubmit.database.db_manager import DbManager
+from autosubmit.database.db_manager import create_db_manager
+from autosubmitconfigparser.config.basicconfig import BasicConfig
+from pathlib import Path
class JobPackagePersistence(object):
@@ -37,8 +39,14 @@ class JobPackagePersistence(object):
WRAPPER_JOB_PACKAGES_TABLE = 'wrapper_job_package'
TABLE_FIELDS = ['exp_id', 'package_name', 'job_name']
- def __init__(self, persistence_path, persistence_file):
- self.db_manager = DbManager(persistence_path, persistence_file, self.VERSION)
+ def __init__(self, expid):
+ options = {
+ 'root_path': Path(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"),
+ 'db_name': f"job_packages_{expid}",
+ 'db_version': self.VERSION,
+ 'schema': expid
+ }
+ self.db_manager = create_db_manager(BasicConfig.DATABASE_BACKEND, **options)
self.db_manager.create_table(self.JOB_PACKAGES_TABLE, self.TABLE_FIELDS)
self.db_manager.create_table(self.WRAPPER_JOB_PACKAGES_TABLE, self.TABLE_FIELDS)
def load(self,wrapper=False):
diff --git a/autosubmit/job/job_utils.py b/autosubmit/job/job_utils.py
index dae0ccc5e2c2d5f8e4e3ce29b57b6a774f2464b6..93182676c573460002f642be6fd2d29481b52894 100644
--- a/autosubmit/job/job_utils.py
+++ b/autosubmit/job/job_utils.py
@@ -249,8 +249,8 @@ def get_job_package_code(expid, job_name):
try:
basic_conf = BasicConfig()
basic_conf.read()
- packages_wrapper = JobPackagePersistence(os.path.join(basic_conf.LOCAL_ROOT_DIR, expid, "pkl"),"job_packages_" + expid).load(wrapper=True)
- packages_wrapper_plus = JobPackagePersistence(os.path.join(basic_conf.LOCAL_ROOT_DIR, expid, "pkl"),"job_packages_" + expid).load(wrapper=False)
+ packages_wrapper = JobPackagePersistence(expid).load(wrapper=True)
+ packages_wrapper_plus = JobPackagePersistence(expid).load(wrapper=False)
if packages_wrapper or packages_wrapper_plus:
packages = packages_wrapper if len(packages_wrapper) > len(packages_wrapper_plus) else packages_wrapper_plus
for exp, package_name, _job_name in packages:
@@ -451,4 +451,4 @@ class SubJobManager(object):
# type: () -> Dict[str, int]
"""
"""
- return self.subjobfixes
\ No newline at end of file
+ return self.subjobfixes
diff --git a/environment.yml b/environment.yml
index 56b5d33ea8262dca0dc94b22c7a42a196e77e435..2f6d88c1b98fba4431bf187bf6a70943a0142030 100644
--- a/environment.yml
+++ b/environment.yml
@@ -44,4 +44,6 @@ dependencies:
- configobj
- psutil
- setproctitle
+ - sqlalchemy[mypy]
+ - testcontainers
diff --git a/pytest.ini b/pytest.ini
new file mode 100644
index 0000000000000000000000000000000000000000..0c4433beb2f50b5411e500191f7b419bee6f00cc
--- /dev/null
+++ b/pytest.ini
@@ -0,0 +1,14 @@
+[pytest]
+addopts =
+ --verbose --ignore=regression
+ --cov=autosubmit --cov-config=.coveragerc
+ --cov-report=html:test/htmlcov --cov-report=xml:test/coverage.xml
+ --strict-markers
+testpaths =
+ test/unit
+doctest_optionflags =
+ NORMALIZE_WHITESPACE
+ IGNORE_EXCEPTION_DETAIL
+ ELLIPSIS
+markers =
+ postgres
diff --git a/setup.py b/setup.py
index d4a763a162dc03f748edc3f1e18b57fb18efe243..422edfa36e0ce8ad72082c953b4fd52ee80edc61 100644
--- a/setup.py
+++ b/setup.py
@@ -27,6 +27,101 @@ here = path.abspath(path.dirname(__file__))
with open(path.join(here, 'VERSION')) as f:
version = f.read().strip()
+install_requires = [
+ 'xlib==0.21',
+ 'setuptools<=68.2.2',
+ 'bscearth.utils<=0.5.2',
+ 'requests<=2.31.0',
+ 'networkx<=2.6.3',
+ 'portalocker<=2.7.0',
+ 'mock<=5.1.0',
+ 'paramiko<=3.4',
+ 'pyparsing==3.1.1',
+ 'matplotlib<=3.8.3',
+ 'argparse<=1.4.0',
+ 'packaging<=23.2',
+ 'ruamel.yaml.clib<=0.2.8',
+ 'typing_extensions<=4.9.0',
+ 'typing<=3.7.4.3',
+ 'psutil<=5.6.1',
+ 'networkx<=2.6.3',
+ 'py3dotplus==1.1.0',
+ 'matplotlib<=3.8.3',
+ 'numpy<2',
+ 'ruamel.yaml==0.17.21',
+ 'rocrate==0.*',
+ 'autosubmitconfigparser==1.0.62',
+ 'configparser',
+ 'pathlib',
+ 'setproctitle'
+]
+
+python_version_less_37_require = [
+ 'PyNaCl==1.5.0',
+ 'pythondialog==3.5.3',
+ 'xlib==0.21',
+ 'setuptools==68.2.2',
+ 'cryptography==41.0.5',
+ 'bscearth.utils==0.5.2',
+ 'requests==2.31.0',
+ 'networkx==2.6.3',
+ 'portalocker==2.7.0',
+ 'mock==5.1.0',
+ 'paramiko==3.3.1',
+ 'matplotlib==3.5.3',
+ 'python_dateutil==2.8.2',
+ 'argparse==1.4.0',
+ 'configobj==5.0.8',
+ 'packaging==23.2',
+ 'bcrypt==4.0.1',
+ 'charset_normalizer==3.3.1',
+ 'kiwisolver==1.4.5',
+ 'fonttools==4.43.1',
+ 'cycler==0.12.1',
+ 'typing_extensions==4.8.0',
+ 'psutil==5.6.1',
+ 'Pygments==2.3.1',
+ 'coverage==5.0',
+ 'nose==1.3.7',
+ 'six==1.12.0',
+ 'Cython==0.29.6',
+ 'cffi==1.12.2',
+ 'py==1.8.0',
+ 'atomicwrites==1.3.0',
+ 'attrs==19.1.0',
+ 'more_itertools==6.0.0',
+ 'urllib3==1.24.1',
+ 'idna==2.8',
+ 'Pillow==6.2.1',
+ 'numpy==1.17.4',
+ 'typing_extensions<=4.9.0',
+ 'typing<=3.7.4.3',
+ 'sqlalchemy[mypy]',
+]
+
+pg_require = [
+ 'psycopg2'
+]
+
+tests_require = [
+ 'mock==5.1.0',
+ 'nose==1.3.7',
+ 'pytest==8.2.*',
+ 'pytest-cov',
+ 'pytest-mock',
+ 'testcontainers'
+]
+
+# You can add more groups, e.g. all_require = tests_require + graph_require, etc...
+all_require = tests_require + pg_require
+
+extras_require = {
+ 'postgres': pg_require,
+ 'tests': tests_require,
+ ':python_version <= "3.7"': python_version_less_37_require,
+ 'all': all_require
+}
+
setup(
name='autosubmit',
license='GNU GPL v3',
@@ -39,79 +134,11 @@ setup(
url='http://www.bsc.es/projects/earthscience/autosubmit/',
download_url='https://earth.bsc.es/wiki/doku.php?id=tools:autosubmit',
keywords=['climate', 'weather', 'workflow', 'HPC'],
- install_requires=[
- 'xlib==0.21',
- 'setuptools<=68.2.2',
- 'bscearth.utils<=0.5.2',
- 'requests<=2.31.0',
- 'networkx<=2.6.3',
- 'portalocker<=2.7.0',
- 'mock<=5.1.0',
- 'paramiko<=3.4',
- 'pyparsing==3.1.1',
- 'matplotlib<=3.8.3',
- 'argparse<=1.4.0',
- 'packaging<=23.2',
- 'ruamel.yaml.clib<=0.2.8',
- 'typing_extensions<=4.9.0',
- 'typing<=3.7.4.3',
- 'psutil<=5.6.1',
- 'networkx<=2.6.3',
- 'py3dotplus==1.1.0',
- 'matplotlib<=3.8.3',
- 'numpy<2',
- 'ruamel.yaml==0.17.21',
- 'rocrate==0.*',
- 'autosubmitconfigparser==1.0.62',
- 'configparser',
- 'pathlib',
- 'setproctitle'
-
- ],
- extras_require={
- ':python_version <= "3.7"':
- [
- 'PyNaCl==1.5.0',
- 'pythondialog==3.5.3',
- 'xlib==0.21',
- 'setuptools==68.2.2',
- 'cryptography==41.0.5',
- 'bscearth.utils==0.5.2',
- 'requests==2.31.0',
- 'networkx==2.6.3',
- 'portalocker==2.7.0',
- 'mock==5.1.0',
- 'paramiko==3.3.1',
- 'matplotlib==3.5.3',
- 'python_dateutil==2.8.2',
- 'argparse==1.4.0',
- 'configobj==5.0.8',
- 'packaging==23.2',
- 'bcrypt==4.0.1',
- 'charset_normalizer==3.3.1',
- 'kiwisolver==1.4.5',
- 'fonttools==4.43.1',
- 'cycler==0.12.1',
- 'typing_extensions==4.8.0',
- 'psutil==5.6.1',
- 'Pygments==2.3.1',
- 'coverage==5.0',
- 'nose==1.3.7',
- 'six==1.12.0',
- 'Cython==0.29.6',
- 'cffi==1.12.2',
- 'py==1.8.0',
- 'atomicwrites==1.3.0',
- 'attrs==19.1.0',
- 'more_itertools==6.0.0',
- 'urllib3==1.24.1',
- 'idna==2.8',
- 'Pillow==6.2.1',
- 'numpy==1.17.4',
- ],
- },
+ install_requires=install_requires,
+ extras_require=extras_require,
classifiers=[
"Programming Language :: Python :: 3.7",
+ "Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"License :: OSI Approved :: GNU General Public License (GPL)",
"Operating System :: POSIX :: Linux",
@@ -131,4 +158,3 @@ setup(
},
scripts=['bin/autosubmit']
)
-
diff --git a/test/unit/conftest.py b/test/unit/conftest.py
new file mode 100644
index 0000000000000000000000000000000000000000..803781060a0956a56bf2d5a559c3cc63a0e7836c
--- /dev/null
+++ b/test/unit/conftest.py
@@ -0,0 +1,126 @@
+import os
+import pytest
+from pathlib import Path
+from pytest import MonkeyPatch
+from sqlalchemy import Connection, text
+from testcontainers.postgres import PostgresContainer
+from typing import Type
+
+from autosubmit.database.session import create_engine
+from autosubmitconfigparser.config.basicconfig import BasicConfig
+
+PG_USER = "postgres"
+PG_PASS = "mysecretpassword"
+PG_HOST = "localhost"
+PG_PORT = 5432
+PG_DB = "autosubmit_test"
+
+DEFAULT_DATABASE_CONN_URL = f"postgresql://{PG_USER}:{PG_PASS}@{PG_HOST}:{PG_PORT}/{PG_DB}"
+"""Default Postgres connection URL."""
+
+_identity_value = lambda value=None: lambda *ignore_args, **ignore_kwargs: value
+"""A type of identity function; returns a function that returns ``value``."""
+
+
+@pytest.fixture
+def as_db_sqlite(monkeypatch: MonkeyPatch, tmp_path: Path) -> Type[BasicConfig]:
+ """Overwrites the BasicConfig to use SQLite database for testing.
+
+ Args:
+ monkeypatch: Monkey Patcher.
+ Returns:
+ BasicConfig class.
+ """
+ monkeypatch.setattr(BasicConfig, "read", _identity_value())
+ monkeypatch.setattr(BasicConfig, "DATABASE_BACKEND", "sqlite")
+ monkeypatch.setattr(BasicConfig, 'DB_PATH', str(tmp_path / 'autosubmit.db'))
+
+ return BasicConfig
+
+
+@pytest.fixture(scope="session")
+def run_test_pg_db() -> PostgresContainer:
+ """Run a TestContainer for PostgreSQL.
+
+ It is started for the test session, and stopped at the end of such.
+
+ Returns:
+ Postgres test container instance.
+ """
+ with PostgresContainer(
+ image="postgres:16-bookworm",
+ port=PG_PORT,
+ username=PG_USER,
+ password=PG_PASS,
+ dbname=PG_DB,
+ driver=None
+ ).with_env(
+ "POSTGRES_HOST_AUTH_METHOD", "trust"
+ ).with_bind_ports(
+ 5432, 5432
+ ) as postgres:
+ yield postgres
+
+
+def _setup_pg_db(conn: Connection) -> None:
+ """Reset the database.
+
+ Drops all schemas except the system ones and restores the public schema.
+
+ Args:
+ conn: Database connection.
+ """
+ # Get all schema names that are not from the system
+ results = conn.execute(
+ text("""SELECT schema_name FROM information_schema.schemata
+ WHERE schema_name NOT LIKE 'pg_%'
+ AND schema_name != 'information_schema'""")
+ ).all()
+ schema_names = [res[0] for res in results]
+
+ # Drop all schemas
+ for schema_name in schema_names:
+ conn.execute(text(f"""DROP SCHEMA IF EXISTS "{schema_name}" CASCADE"""))
+
+ # Restore default public schema
+ conn.execute(text("CREATE SCHEMA public"))
+ conn.execute(text("GRANT ALL ON SCHEMA public TO public"))
+ conn.execute(text("GRANT ALL ON SCHEMA public TO postgres"))
+
+
+@pytest.fixture
+def as_db_postgres(monkeypatch: MonkeyPatch, run_test_pg_db) -> BasicConfig:
+ """Fixture to set up and tear down a Postgres database for testing.
+
+ It will overwrite the ``BasicConfig`` to use Postgres.
+
+ It uses the environment variable ``PYTEST_DATABASE_CONN_URL`` to connect to the database.
+ If the variable is not set, it uses the default connection URL.
+
+ Args:
+ monkeypatch: Monkey Patcher.
+ run_test_pg_db: Fixture that starts the Postgres container.
+ Returns:
+ Autosubmit configuration for Postgres.
+ """
+
+ # Apply patch BasicConfig
+ monkeypatch.setattr(BasicConfig, "read", _identity_value())
+ monkeypatch.setattr(BasicConfig, "DATABASE_BACKEND", "postgres")
+ monkeypatch.setattr(
+ BasicConfig,
+ "DATABASE_CONN_URL",
+ os.environ.get("PYTEST_DATABASE_CONN_URL", DEFAULT_DATABASE_CONN_URL),
+ )
+
+ # Setup database
+ with create_engine().connect() as conn:
+ _setup_pg_db(conn)
+ conn.commit()
+
+ yield BasicConfig
+
+ # Teardown database
+ with create_engine().connect() as conn:
+ _setup_pg_db(conn)
+ conn.commit()
diff --git a/test/unit/test_checkpoints.py b/test/unit/test_checkpoints.py
index 35dca3350841c6a7a0d38ea7c72811893cbd569c..bfd4e2b3102beb1cad921e651d5098b9a61a36c6 100644
--- a/test/unit/test_checkpoints.py
+++ b/test/unit/test_checkpoints.py
@@ -3,8 +3,9 @@ from unittest import TestCase
import inspect
import shutil
import tempfile
-from mock import Mock, MagicMock
+from mock import Mock, MagicMock, patch
from random import randrange
+from pathlib import Path
from autosubmit.job.job import Job
from autosubmit.job.job_common import Status
@@ -12,18 +13,48 @@ from autosubmit.job.job_list import JobList
from autosubmit.job.job_list_persistence import JobListPersistenceDb
from autosubmitconfigparser.config.yamlparser import YAMLParserFactory
+class FakeBasicConfig:
+ def props(self):
+ pr = {}
+ for name in dir(self):
+ value = getattr(self, name)
+ if not name.startswith('__') and not inspect.ismethod(value) and not inspect.isfunction(value):
+ pr[name] = value
+ return pr
+ DB_DIR = '/dummy/db/dir'
+ DB_FILE = '/dummy/db/file'
+ DB_PATH = '/dummy/db/path'
+ LOCAL_ROOT_DIR = '/dummy/local/root/dir'
+ LOCAL_TMP_DIR = '/dummy/local/temp/dir'
+ LOCAL_PROJ_DIR = '/dummy/local/proj/dir'
+ DEFAULT_PLATFORMS_CONF = ''
+ DEFAULT_JOBS_CONF = ''
+ DATABASE_BACKEND = 'sqlite'
+
class TestJobList(TestCase):
- def setUp(self):
+ @patch('autosubmit.job.job_list_persistence.BasicConfig', new_callable=FakeBasicConfig)
+ def setUp(self, patched_basic_config):
self.experiment_id = 'random-id'
self.as_conf = Mock()
self.as_conf.experiment_data = dict()
self.as_conf.experiment_data["JOBS"] = dict()
self.as_conf.jobs_data = self.as_conf.experiment_data["JOBS"]
self.as_conf.experiment_data["PLATFORMS"] = dict()
- self.temp_directory = tempfile.mkdtemp()
- self.job_list = JobList(self.experiment_id, FakeBasicConfig, YAMLParserFactory(),
- JobListPersistenceDb(self.temp_directory, 'db'), self.as_conf)
+ self.temp_directory = str(tempfile.mkdtemp())
+
+ patched_basic_config.DB_DIR = self.temp_directory
+ patched_basic_config.DB_FILE = Path(self.temp_directory, 'test-db.db')
+ patched_basic_config.DB_PATH = patched_basic_config.DB_FILE
+ patched_basic_config.LOCAL_PROJ_DIR = self.temp_directory
+ patched_basic_config.LOCAL_ROOT_DIR = self.temp_directory
+ patched_basic_config.LOCAL_TMP_DIR = self.temp_directory
+
+ Path(patched_basic_config.DB_FILE).touch()
+ Path(patched_basic_config.LOCAL_ROOT_DIR, self.experiment_id, 'pkl').mkdir(parents=True, exist_ok=True)
+
+ self.job_list = JobList(self.experiment_id, patched_basic_config, YAMLParserFactory(),
+ JobListPersistenceDb(self.experiment_id), self.as_conf)
dummy_serial_platform = MagicMock()
dummy_serial_platform.name = 'serial'
dummy_platform = MagicMock()
@@ -149,22 +180,3 @@ class TestJobList(TestCase):
job = Job(job_name, job_id, status, 0)
job.type = randrange(0, 2)
return job
-
-class FakeBasicConfig:
- def __init__(self):
- pass
- def props(self):
- pr = {}
- for name in dir(self):
- value = getattr(self, name)
- if not name.startswith('__') and not inspect.ismethod(value) and not inspect.isfunction(value):
- pr[name] = value
- return pr
- DB_DIR = '/dummy/db/dir'
- DB_FILE = '/dummy/db/file'
- DB_PATH = '/dummy/db/path'
- LOCAL_ROOT_DIR = '/dummy/local/root/dir'
- LOCAL_TMP_DIR = '/dummy/local/temp/dir'
- LOCAL_PROJ_DIR = '/dummy/local/proj/dir'
- DEFAULT_PLATFORMS_CONF = ''
- DEFAULT_JOBS_CONF = ''
diff --git a/test/unit/test_database_managers.py b/test/unit/test_database_managers.py
index 9999fe9488dfafa49e5ee2bcbf666150eda403af..c0bc9d3c0f1a77d3ba18b77f321f7dadc7075d70 100644
--- a/test/unit/test_database_managers.py
+++ b/test/unit/test_database_managers.py
@@ -17,6 +17,7 @@
# along with Autosubmit. If not, see <http://www.gnu.org/licenses/>.
import unittest
+import pytest
import time
import random
import os
@@ -35,6 +36,7 @@ BasicConfig.read()
JOBDATA_DIR = BasicConfig.JOBDATA_DIR
LOCAL_ROOT_DIR = BasicConfig.LOCAL_ROOT_DIR
+@pytest.mark.skip()
@unittest.skip('TODO: looks like another test that used actual experiments data')
class TestExperimentStatusDatabaseManager(unittest.TestCase):
""" Covers Experiment Status Database Manager """
diff --git a/test/unit/test_db_common.py b/test/unit/test_db_common.py
new file mode 100644
index 0000000000000000000000000000000000000000..27b90dff2e7dca197d60a343ace4922be13fdfb2
--- /dev/null
+++ b/test/unit/test_db_common.py
@@ -0,0 +1,120 @@
+import locale
+import pytest
+from pathlib import Path
+from pkg_resources import resource_string
+
+from autosubmit.database import db_common
+from autosubmitconfigparser.config.basicconfig import BasicConfig
+from log.log import AutosubmitCritical
+
+
+@pytest.mark.parametrize(
+ 'db_engine',
+ [
+ # postgres
+ pytest.param('postgres', marks=[pytest.mark.postgres]),
+ # sqlite
+ 'sqlite'
+ ]
+)
+def test_db_common(
+ tmp_path: Path,
+ db_engine: str,
+ request
+):
+ """Regression tests for ``db_common.py``.
+
+ Tests for regression issues in ``db_common.py`` functions, and
+ for compatibility issues with the new functions for SQLAlchemy.
+
+ The parameters allow the test to be run with different engine+options.
+ You can also mark certain tests belonging to a group (e.g. postgres)
+ so that they are skipped/executed selectively in CICD environments.
+ """
+
+ # Dynamically load the fixture for that DB,
+ # ref: https://stackoverflow.com/a/64348247.
+ request.getfixturevalue(f'as_db_{db_engine}')
+
+ create_db_query = ''
+
+ # The only differences in this test for SQLite and SQLAlchemy are
+ # i) the SQL query used to create a DB, ii) we check that the sqlite
+ # database file was created and iii) we load a different fixture for
+ # sqlite and sqlalchemy (to mock ``BasicConfig`` and run a container
+ # for sqlalchemy).
+ is_sqlite = db_engine == 'sqlite'
+ if is_sqlite:
+ # Code copied from ``autosubmit.py``.
+ create_db_query = resource_string('autosubmit.database', 'data/autosubmit.sql').decode(locale.getlocale()[1])
+
+ assert db_common.create_db(create_db_query)
+
+ if is_sqlite:
+ assert Path(BasicConfig.DB_PATH).exists()
+
+ # Test last name used
+ assert "empty" == db_common.last_name_used()
+ assert "empty" == db_common.last_name_used(test=True)
+ assert "empty" == db_common.last_name_used(operational=True)
+
+ new_exp = {
+ "name": "a700",
+ "description": "Description",
+ "autosubmit_version": "4.0.0",
+ }
+
+ # Experiment doesn't exist yet
+ with pytest.raises(Exception):
+ db_common.check_experiment_exists(new_exp["name"])
+
+ # Test save
+ db_common.save_experiment(
+ new_exp["name"], new_exp["description"], new_exp["autosubmit_version"]
+ )
+ assert db_common.check_experiment_exists(
+ new_exp["name"], error_on_inexistence=False
+ )
+ assert db_common.last_name_used() == new_exp["name"]
+
+ # Get version
+ assert (
+ db_common.get_autosubmit_version(new_exp["name"])
+ == new_exp["autosubmit_version"]
+ )
+ new_version = "v4.1.0"
+ db_common.update_experiment_descrip_version(
+ new_exp["name"], version=new_version
+ )
+ assert db_common.get_autosubmit_version(new_exp["name"]) == new_version
+
+ # Update description
+ assert (
+ db_common.get_experiment_descrip(new_exp["name"])[0][0]
+ == new_exp["description"]
+ )
+ new_desc = "New Description"
+ db_common.update_experiment_descrip_version(
+ new_exp["name"], description=new_desc
+ )
+ assert db_common.get_experiment_descrip(new_exp["name"])[0][0] == new_desc
+
+ # Update back both: description and version
+ db_common.update_experiment_descrip_version(
+ new_exp["name"],
+ description=new_exp["description"],
+ version=new_exp["autosubmit_version"],
+ )
+ assert (
+ db_common.get_experiment_descrip(new_exp["name"])[0][0]
+ == new_exp["description"]
+ and db_common.get_autosubmit_version(new_exp["name"])
+ == new_exp["autosubmit_version"]
+ )
+
+ # Delete experiment
+ assert db_common.delete_experiment(new_exp["name"])
+ with pytest.raises(AutosubmitCritical):
+ assert db_common.get_autosubmit_version(new_exp["name"]) == new_exp[
+ "autosubmit_version"
+ ]
diff --git a/test/unit/test_db_manager.py b/test/unit/test_db_manager.py
index a46133c9f76b78f6184dd3c899d341d1b04bc8a7..a80dca205fb336f19c0ef272491cec2025f96be0 100644
--- a/test/unit/test_db_manager.py
+++ b/test/unit/test_db_manager.py
@@ -4,7 +4,16 @@ import os
import sys
from mock import MagicMock
from mock import patch
-from autosubmit.database.db_manager import DbManager
+from autosubmit.database.db_manager import DbManager, DatabaseManager, SqlAlchemyDbManager, create_db_manager
+
+import pytest
+import random
+from pathlib import Path
+from sqlalchemy.exc import SQLAlchemyError
+from sqlite3 import OperationalError
+from typing import Optional, Type, Union, cast
+
+from autosubmit.database.tables import DBVersionTable
class TestDbManager(TestCase):
@@ -60,3 +69,111 @@ class TestDbManager(TestCase):
DbManager('dummy-path', 'dummy-name', 999)
connection_mock.cursor.assert_not_called()
sys.modules['sqlite3'].connect = original_connect
+
+
+@pytest.mark.parametrize(
+ 'db_engine,options,clazz,expected_exception',
+ [
+ # postgres
+ pytest.param('postgres', {'schema': 'test_schema'}, SqlAlchemyDbManager, None, marks=[pytest.mark.postgres]),
+ # sqlite
+ ('sqlite', {'db_name': 'test_db_manager.db', 'db_version': 999}, DbManager, None),
+ # invalid engine
+ ('super-duper-database', {}, None, ValueError)
+ ])
+def test_db_manager(
+ tmp_path: Path,
+ db_engine: str,
+ options: dict,
+ clazz: Type,
+ expected_exception: BaseException,
+ request
+):
+ """Regression tests for ``DbManager`` and ``SqlAlchemy``.
+
+ Tests for regression issues in ``DbManager``, and for compatibility issues
+ with the new ``SqlAlchemyDbManager``.
+
+ The parameters allow the test to be run with different engine+options,
+ accepting also the expected database type (``clazz``). You can also
+ mark certain tests belonging to a group (e.g. postgres) so that they
+ are skipped/executed selectively in CICD environments.
+ """
+
+ is_sqlalchemy = db_engine != 'sqlite'
+ if not is_sqlalchemy:
+ # N.B.: We do it here, as we don't know the temporary path name until the fixture exists,
+ # and because it's harmless to the Postgres test to have the tmp_path fixture.
+ options['root_path'] = str(tmp_path)
+
+ # In this test we will create a random table, to show that it works with any table,
+ # not only the ones we have in the ``database.tables`` package.
+ table_name = DBVersionTable.name
+
+ # The database manager under test will be either the old type ``DbManager``, or
+ # of the new type ``SqlAlchemyManager``.
+ db_manager: Optional[Union[DbManager, SqlAlchemyDbManager]] = None
+
+ try:
+ # Is this parametrized test expected to fail?
+ if expected_exception is not None:
+ with pytest.raises(expected_exception): # type: ignore # TODO: [testing] how to type this?
+ create_db_manager(db_engine, **options)
+ return
+
+ # If not, then now we dynamically load the fixture for that DB,
+ # ref: https://stackoverflow.com/a/64348247.
+ request.getfixturevalue(f'as_db_{db_engine}')
+
+ database_manager: DatabaseManager = create_db_manager(db_engine, **options)
+
+ # The database manager created has the right type?
+ assert isinstance(database_manager, clazz)
+ db_manager: clazz = cast(clazz, database_manager)
+
+ # The database manager was constructed right?
+ if is_sqlalchemy:
+ assert db_engine in db_manager.engine.dialect.name
+ assert db_manager.schema == options['schema']
+ else:
+ assert db_manager.root_path == options['root_path']
+ assert db_manager.db_name == options['db_name']
+ assert db_manager.db_version == options['db_version']
+
+ # In both cases, we must have a connection set up now
+ assert db_manager.connection is not None
+
+ # NOTE: From this part forward, the behaviour MUST be the same for
+ # SQLite and Postgres or any other engine. This is the test
+ # that verifies that whatever we do with SQLite, works the
+ # same with another engine.
+
+ # Create table
+ # TODO: get the fields dynamically?
+ db_manager.create_table(table_name, ['version'])
+
+ # Table should be empty
+ records = db_manager.select_all(table_name)
+ assert isinstance(records, list) and len(records) == 0
+
+ # Insert N items
+ rand_len = random.randint(10, 50)
+ # db_manager.insertMany(table_name, [{"version": i} for i in range(rand_len)])
+ db_manager.insertMany(table_name, [[i] for i in range(rand_len)])
+ records = db_manager.select_all(table_name)
+ assert isinstance(records, list) and len(records) == rand_len
+
+ # Delete table items (only available in the new SQLAlchemy DB Manager)
+ if is_sqlalchemy:
+ db_manager.delete_all(table_name)
+ records = db_manager.select_all(table_name)
+ assert isinstance(records, list) and len(records) == 0
+
+ # Drop table
+ db_manager.drop_table(table_name)
+ with pytest.raises((SQLAlchemyError, OperationalError)): # type: ignore # different errors, but expected...
+ # Table does not exist anymore
+ _ = db_manager.select_all(table_name)
+ finally:
+ if db_manager is not None:
+ db_manager.disconnect()
diff --git a/test/unit/test_db_structure.py b/test/unit/test_db_structure.py
new file mode 100644
index 0000000000000000000000000000000000000000..a3dce8ad553ce992073b7eb5bbcfa932d6733113
--- /dev/null
+++ b/test/unit/test_db_structure.py
@@ -0,0 +1,49 @@
+import networkx as nx
+import pytest
+from pathlib import Path
+from typing import Type
+
+from autosubmit.database import db_structure
+from autosubmit.database.db_manager import DbManager, SqlAlchemyDbManager
+
+
+@pytest.mark.parametrize(
+ 'db_engine,options,clazz',
+ [
+ # postgres
+ pytest.param('postgres', {'schema': 'test_schema'}, SqlAlchemyDbManager, marks=[pytest.mark.postgres]),
+ # sqlite
+ ('sqlite', {'db_name': 'test_db_manager.db', 'db_version': 999}, DbManager)
+ ])
+def test_db_structure(
+ tmp_path: Path,
+ db_engine: str,
+ options: dict,
+ clazz: Type,
+ request
+):
+ # Load dynamically the fixture,
+ # ref: https://stackoverflow.com/a/64348247.
+ request.getfixturevalue(f'as_db_{db_engine}')
+
+ graph = nx.DiGraph([("a", "b"), ("b", "c"), ("a", "d")])
+ graph.add_node("z")
+
+ # Creates a new SQLite db file
+ expid = "ut01"
+
+ # Table does not exist yet
+ assert db_structure.get_structure(expid, str(tmp_path)) == {}
+
+ # Save table
+ db_structure.save_structure(graph, expid, str(tmp_path))
+
+ # Get correct data
+ structure_data = db_structure.get_structure(expid, str(tmp_path))
+ assert sorted(structure_data) == sorted({
+ "a": ["b", "d"],
+ "b": ["c"],
+ "c": [],
+ "d": [],
+ "z": ["z"],
+ })
diff --git a/test/unit/test_dependencies.py b/test/unit/test_dependencies.py
index 2bb91211140c68715fdf0084864c773dcb2dacb9..f4452976381695ee10fc25a5d874eebcc11a4f3b 100644
--- a/test/unit/test_dependencies.py
+++ b/test/unit/test_dependencies.py
@@ -8,6 +8,7 @@ import unittest
from copy import deepcopy
from datetime import datetime
from mock import patch
+from pathlib import Path
from autosubmit.job.job_dict import DicJobs
from autosubmit.job.job import Job
@@ -36,11 +37,12 @@ class FakeBasicConfig:
LOCAL_TMP_DIR = '/dummy/local/temp/dir'
LOCAL_PROJ_DIR = '/dummy/local/proj/dir'
DEFAULT_PLATFORMS_CONF = ''
- DEFAULT_JOBS_CONF = ''
+ DATABASE_BACKEND = 'sqlite'
class TestJobList(unittest.TestCase):
- def setUp(self):
+ @patch('autosubmit.job.job_list_persistence.BasicConfig', new_callable=FakeBasicConfig)
+ def setUp(self, patched_basic_config):
self.experiment_id = 'random-id'
self.as_conf = mock.Mock()
self.as_conf.experiment_data = dict()
@@ -48,8 +50,17 @@ class TestJobList(unittest.TestCase):
self.as_conf.jobs_data = self.as_conf.experiment_data["JOBS"]
self.as_conf.experiment_data["PLATFORMS"] = dict()
self.temp_directory = tempfile.mkdtemp()
+ patched_basic_config.DB_DIR = self.temp_directory
+ patched_basic_config.DB_FILE = Path(self.temp_directory, 'test-db.db')
+ patched_basic_config.DB_PATH = patched_basic_config.DB_FILE
+ patched_basic_config.LOCAL_PROJ_DIR = self.temp_directory
+ patched_basic_config.LOCAL_ROOT_DIR = self.temp_directory
+ patched_basic_config.LOCAL_TMP_DIR = self.temp_directory
+
+ Path(patched_basic_config.DB_FILE).touch()
+ Path(patched_basic_config.LOCAL_ROOT_DIR, self.experiment_id, 'pkl').mkdir(parents=True, exist_ok=True)
self.JobList = JobList(self.experiment_id, FakeBasicConfig, YAMLParserFactory(),
- JobListPersistenceDb(self.temp_directory, 'db'), self.as_conf)
+ JobListPersistenceDb(self.experiment_id), self.as_conf)
self.date_list = ["20020201", "20020202", "20020203", "20020204", "20020205", "20020206", "20020207",
"20020208", "20020209", "20020210"]
self.member_list = ["fc1", "fc2", "fc3", "fc4", "fc5", "fc6", "fc7", "fc8", "fc9", "fc10"]
diff --git a/test/unit/test_dic_jobs.py b/test/unit/test_dic_jobs.py
index f8b2138e656428c4db95c65b58c4a12e101cfcda..e9ba8e1b73f5d8afc77eba31acbeff29ac42a6a5 100644
--- a/test/unit/test_dic_jobs.py
+++ b/test/unit/test_dic_jobs.py
@@ -7,6 +7,7 @@ from mock import Mock
import math
import shutil
import tempfile
+from pathlib import Path
from autosubmit.job.job import Job
from autosubmitconfigparser.config.yamlparser import YAMLParserFactory
@@ -16,10 +17,32 @@ from autosubmit.job.job_dict import DicJobs
from autosubmit.job.job_list import JobList
from autosubmit.job.job_list_persistence import JobListPersistenceDb
from unittest.mock import patch
+import inspect
+
+class FakeBasicConfig:
+ def __init__(self):
+ pass
+ def props(self):
+ pr = {}
+ for name in dir(self):
+ value = getattr(self, name)
+ if not name.startswith('__') and not inspect.ismethod(value) and not inspect.isfunction(value):
+ pr[name] = value
+ return pr
+ DB_DIR = '/dummy/db/dir'
+ DB_FILE = '/dummy/db/file'
+ DB_PATH = '/dummy/db/path'
+ LOCAL_ROOT_DIR = '/dummy/local/root/dir'
+ LOCAL_TMP_DIR = '/dummy/local/temp/dir'
+ LOCAL_PROJ_DIR = '/dummy/local/proj/dir'
+ DEFAULT_PLATFORMS_CONF = ''
+ DEFAULT_JOBS_CONF = ''
+ DATABASE_BACKEND = 'sqlite'
class TestDicJobs(TestCase):
- def setUp(self):
+ @patch('autosubmit.job.job_list_persistence.BasicConfig', new_callable=FakeBasicConfig)
+ def setUp(self, patched_basic_config):
self.experiment_id = 'random-id'
self.as_conf = Mock()
@@ -30,8 +53,19 @@ class TestDicJobs(TestCase):
self.as_conf.jobs_data = self.as_conf.experiment_data["JOBS"]
self.as_conf.experiment_data["PLATFORMS"] = dict()
self.temp_directory = tempfile.mkdtemp()
+
+ patched_basic_config.DB_DIR = self.temp_directory
+ patched_basic_config.DB_FILE = Path(self.temp_directory, 'test-db.db')
+ patched_basic_config.DB_PATH = patched_basic_config.DB_FILE
+ patched_basic_config.LOCAL_PROJ_DIR = self.temp_directory
+ patched_basic_config.LOCAL_ROOT_DIR = self.temp_directory
+ patched_basic_config.LOCAL_TMP_DIR = self.temp_directory
+
+ Path(patched_basic_config.DB_FILE).touch()
+ Path(patched_basic_config.LOCAL_ROOT_DIR, self.experiment_id, 'pkl').mkdir(parents=True, exist_ok=True)
+
self.job_list = JobList(self.experiment_id, FakeBasicConfig, YAMLParserFactory(),
- JobListPersistenceDb(self.temp_directory, 'db'),self.as_conf)
+ JobListPersistenceDb(self.experiment_id), self.as_conf)
self.parser_mock = Mock(spec='SafeConfigParser')
self.date_list = ['fake-date1', 'fake-date2']
self.member_list = ["fc1", "fc2", "fc3", "fc4", "fc5", "fc6", "fc7", "fc8", "fc9", "fc10"]
@@ -600,26 +634,3 @@ class TestDicJobs(TestCase):
section_data = []
self.dictionary._create_jobs_split(5,'fake-section','fake-date', 'fake-member', 'fake-chunk', 0,Type.BASH, section_data)
self.assertEqual(5, len(section_data))
-
-
-
-
-import inspect
-class FakeBasicConfig:
- def __init__(self):
- pass
- def props(self):
- pr = {}
- for name in dir(self):
- value = getattr(self, name)
- if not name.startswith('__') and not inspect.ismethod(value) and not inspect.isfunction(value):
- pr[name] = value
- return pr
- DB_DIR = '/dummy/db/dir'
- DB_FILE = '/dummy/db/file'
- DB_PATH = '/dummy/db/path'
- LOCAL_ROOT_DIR = '/dummy/local/root/dir'
- LOCAL_TMP_DIR = '/dummy/local/temp/dir'
- LOCAL_PROJ_DIR = '/dummy/local/proj/dir'
- DEFAULT_PLATFORMS_CONF = ''
- DEFAULT_JOBS_CONF = ''
diff --git a/test/unit/test_history.py b/test/unit/test_history.py
index 68495d8e9944a3f558a3e085d285ab231e07bf79..3695e86f46ed85f0fb9591ac1593c4fc3a37df05 100644
--- a/test/unit/test_history.py
+++ b/test/unit/test_history.py
@@ -1,5 +1,6 @@
#!/usr/bin/python
+import pytest
# Copyright 2015-2020 Earth Sciences Department, BSC-CNS
# This file is part of Autosubmit.
@@ -36,6 +37,7 @@ JOBDATA_DIR = BasicConfig.JOBDATA_DIR
LOCAL_ROOT_DIR = BasicConfig.LOCAL_ROOT_DIR
job = namedtuple("Job", ["name", "date", "member", "status_str", "children"])
+@pytest.mark.skip()
@unittest.skip('TODO: another test that uses actual data. See if there is anything useful, and extract into functional/integration/unit tests that run on any machine')
class TestExperimentHistory(unittest.TestCase):
# @classmethod
@@ -290,6 +292,7 @@ class TestExperimentHistory(unittest.TestCase):
+@pytest.mark.skip()
class TestLogging(unittest.TestCase):
def setUp(self):
@@ -314,4 +317,4 @@ class TestLogging(unittest.TestCase):
if __name__ == '__main__':
- unittest.main()
\ No newline at end of file
+ unittest.main()
diff --git a/test/unit/test_job_graph.py b/test/unit/test_job_graph.py
index 579aee5adb3bf2c82ead800e7c8552cbfb57876b..bd9ee4e8cb9d6d3c4a8a33d9aa1d551a1f2e3348 100644
--- a/test/unit/test_job_graph.py
+++ b/test/unit/test_job_graph.py
@@ -1,8 +1,9 @@
import shutil
import tempfile
+from pathlib import Path
from unittest import TestCase
-from mock import Mock
+from mock import Mock, patch
from autosubmit.job.job_common import Status
from autosubmit.job.job_list import JobList
@@ -12,9 +13,26 @@ from random import randrange
from autosubmit.job.job import Job
from autosubmit.monitor.monitor import Monitor
import unittest
+
+class FakeBasicConfig:
+ def __init__(self):
+ pass
+
+ DB_DIR = '/dummy/db/dir'
+ DB_FILE = '/dummy/db/file'
+ DB_PATH = '/dummy/db/path'
+ LOCAL_ROOT_DIR = '/dummy/local/root/dir'
+ LOCAL_TMP_DIR = '/dummy/local/temp/dir'
+ LOCAL_PROJ_DIR = '/dummy/local/proj/dir'
+ DEFAULT_PLATFORMS_CONF = ''
+ DEFAULT_JOBS_CONF = ''
+ DATABASE_BACKEND = 'sqlite'
+
+
class TestJobGraph(TestCase):
- def setUp(self):
+ @patch('autosubmit.job.job_list_persistence.BasicConfig', new_callable=FakeBasicConfig)
+ def setUp(self, patched_basic_config):
self.experiment_id = 'random-id'
self.as_conf = Mock()
self.as_conf.experiment_data = dict()
@@ -22,8 +40,19 @@ class TestJobGraph(TestCase):
self.as_conf.jobs_data = self.as_conf.experiment_data["JOBS"]
self.as_conf.experiment_data["PLATFORMS"] = dict()
self.temp_directory = tempfile.mkdtemp()
+
+ patched_basic_config.DB_DIR = self.temp_directory
+ patched_basic_config.DB_FILE = Path(self.temp_directory, 'test-db.db')
+ patched_basic_config.DB_PATH = patched_basic_config.DB_FILE
+ patched_basic_config.LOCAL_PROJ_DIR = self.temp_directory
+ patched_basic_config.LOCAL_ROOT_DIR = self.temp_directory
+ patched_basic_config.LOCAL_TMP_DIR = self.temp_directory
+
+ Path(patched_basic_config.DB_FILE).touch()
+ Path(patched_basic_config.LOCAL_ROOT_DIR, self.experiment_id, 'pkl').mkdir(parents=True, exist_ok=True)
+
self.job_list = JobList(self.experiment_id, FakeBasicConfig, YAMLParserFactory(),
- JobListPersistenceDb(self.temp_directory, 'db'),self.as_conf)
+ JobListPersistenceDb(self.experiment_id),self.as_conf)
self.parser_mock = Mock(spec='SafeConfigParser')
# Basic workflow with SETUP, INI, SIM, POST, CLEAN
@@ -926,15 +955,3 @@ class TestJobGraph(TestCase):
job.split = split
return job
-class FakeBasicConfig:
- def __init__(self):
- pass
-
- DB_DIR = '/dummy/db/dir'
- DB_FILE = '/dummy/db/file'
- DB_PATH = '/dummy/db/path'
- LOCAL_ROOT_DIR = '/dummy/local/root/dir'
- LOCAL_TMP_DIR = '/dummy/local/temp/dir'
- LOCAL_PROJ_DIR = '/dummy/local/proj/dir'
- DEFAULT_PLATFORMS_CONF = ''
- DEFAULT_JOBS_CONF = ''
\ No newline at end of file
diff --git a/test/unit/test_job_grouping.py b/test/unit/test_job_grouping.py
index 01b53761a2b98e72ee95dc9c0f7743da61da7e0f..d5649afaedb89b552b1a7c35c6eb065cb25c81a1 100644
--- a/test/unit/test_job_grouping.py
+++ b/test/unit/test_job_grouping.py
@@ -1,5 +1,6 @@
import shutil
import tempfile
+from pathlib import Path
from unittest import TestCase
from mock import Mock
@@ -14,9 +15,31 @@ from mock import patch
from autosubmit.job.job_grouping import JobGrouping
+import inspect
+class FakeBasicConfig:
+ def __init__(self):
+ pass
+ def props(self):
+ pr = {}
+ for name in dir(self):
+ value = getattr(self, name)
+ if not name.startswith('__') and not inspect.ismethod(value) and not inspect.isfunction(value):
+ pr[name] = value
+ return pr
+ DB_DIR = '/dummy/db/dir'
+ DB_FILE = '/dummy/db/file'
+ DB_PATH = '/dummy/db/path'
+ LOCAL_ROOT_DIR = '/dummy/local/root/dir'
+ LOCAL_TMP_DIR = '/dummy/local/temp/dir'
+ LOCAL_PROJ_DIR = '/dummy/local/proj/dir'
+ DEFAULT_PLATFORMS_CONF = ''
+ DEFAULT_JOBS_CONF = ''
+ DATABASE_BACKEND = 'sqlite'
+
class TestJobGrouping(TestCase):
- def setUp(self):
+ @patch('autosubmit.job.job_list_persistence.BasicConfig', new_callable=FakeBasicConfig)
+ def setUp(self, patched_basic_config):
self.experiment_id = 'random-id'
self.as_conf = Mock()
self.as_conf.experiment_data = dict()
@@ -24,8 +47,19 @@ class TestJobGrouping(TestCase):
self.as_conf.jobs_data = self.as_conf.experiment_data["JOBS"]
self.as_conf.experiment_data["PLATFORMS"] = dict()
self.temp_directory = tempfile.mkdtemp()
+
+ patched_basic_config.DB_DIR = self.temp_directory
+ patched_basic_config.DB_FILE = Path(self.temp_directory, 'test-db.db')
+ patched_basic_config.DB_PATH = patched_basic_config.DB_FILE
+ patched_basic_config.LOCAL_PROJ_DIR = self.temp_directory
+ patched_basic_config.LOCAL_ROOT_DIR = self.temp_directory
+ patched_basic_config.LOCAL_TMP_DIR = self.temp_directory
+
+ Path(patched_basic_config.DB_FILE).touch()
+ Path(patched_basic_config.LOCAL_ROOT_DIR, self.experiment_id, 'pkl').mkdir(parents=True, exist_ok=True)
+
self.job_list = JobList(self.experiment_id, FakeBasicConfig, YAMLParserFactory(),
- JobListPersistenceDb(self.temp_directory, 'db'),self.as_conf)
+ JobListPersistenceDb(self.experiment_id),self.as_conf)
self.parser_mock = Mock(spec='SafeConfigParser')
# Basic workflow with SETUP, INI, SIM, POST, CLEAN
@@ -992,25 +1026,3 @@ class TestJobGrouping(TestCase):
job.split = split
return job
-import inspect
-class FakeBasicConfig:
- def __init__(self):
- pass
- def props(self):
- pr = {}
- for name in dir(self):
- value = getattr(self, name)
- if not name.startswith('__') and not inspect.ismethod(value) and not inspect.isfunction(value):
- pr[name] = value
- return pr
- DB_DIR = '/dummy/db/dir'
- DB_FILE = '/dummy/db/file'
- DB_PATH = '/dummy/db/path'
- LOCAL_ROOT_DIR = '/dummy/local/root/dir'
- LOCAL_TMP_DIR = '/dummy/local/temp/dir'
- LOCAL_PROJ_DIR = '/dummy/local/proj/dir'
- DEFAULT_PLATFORMS_CONF = ''
- DEFAULT_JOBS_CONF = ''
-
-
-
diff --git a/test/unit/test_job_package.py b/test/unit/test_job_package.py
index e70f1087225bd228027490c577a692f37dcd4e34..8b41c2002dbb0d68cb0be8b9085eb9ff5946f03a 100644
--- a/test/unit/test_job_package.py
+++ b/test/unit/test_job_package.py
@@ -34,10 +34,12 @@ class FakeBasicConfig:
LOCAL_PROJ_DIR = '/dummy/local/proj/dir'
DEFAULT_PLATFORMS_CONF = ''
DEFAULT_JOBS_CONF = ''
+ DATABASE_BACKEND = 'sqlite'
class TestJobPackage(TestCase):
- def setUpWrappers(self,options):
+ @patch('autosubmit.job.job_list_persistence.BasicConfig', new_callable=FakeBasicConfig)
+ def setUpWrappers(self,options,patched_basic_config):
# reset
self.as_conf = None
self.job_package_wrapper = None
@@ -53,8 +55,19 @@ class TestJobPackage(TestCase):
self.as_conf.experiment_data["PLATFORMS"] = dict()
self.as_conf.experiment_data["WRAPPERS"] = dict()
self.temp_directory = tempfile.mkdtemp()
+
+ patched_basic_config.DB_DIR = self.temp_directory
+ patched_basic_config.DB_FILE = Path(self.temp_directory, 'test-db.db')
+ patched_basic_config.DB_PATH = patched_basic_config.DB_FILE
+ patched_basic_config.LOCAL_PROJ_DIR = self.temp_directory
+ patched_basic_config.LOCAL_ROOT_DIR = self.temp_directory
+ patched_basic_config.LOCAL_TMP_DIR = self.temp_directory
+
+ Path(patched_basic_config.DB_FILE).touch()
+ Path(patched_basic_config.LOCAL_ROOT_DIR, self.experiment_id, 'pkl').mkdir(parents=True, exist_ok=True)
+
self.job_list = JobList(self.experiment_id, self.config, YAMLParserFactory(),
- JobListPersistenceDb(self.temp_directory, 'db'), self.as_conf)
+ JobListPersistenceDb(self.experiment_id), self.as_conf)
self.parser_mock = MagicMock(spec='SafeConfigParser')
for job in self.jobs:
job._init_runtime_parameters()
diff --git a/test/unit/test_wrappers.py b/test/unit/test_wrappers.py
index 2c605bef3e4c25002d59ec72574d08276e83665b..e88b7c0ee30d46b053a5d52d9e2c2b9f6109b251 100644
--- a/test/unit/test_wrappers.py
+++ b/test/unit/test_wrappers.py
@@ -3,9 +3,10 @@ from operator import attrgetter
import shutil
import tempfile
+from pathlib import Path
from unittest import TestCase
-from mock import MagicMock
+from mock import MagicMock, patch
import log.log
from autosubmit.job.job_packager import JobPackager
@@ -19,6 +20,28 @@ from autosubmit.job.job_list_persistence import JobListPersistenceDb
from autosubmit.job.job_common import Status
from random import randrange
from collections import OrderedDict
+import inspect
+
+
+class FakeBasicConfig:
+ def __init__(self):
+ pass
+ def props(self):
+ pr = {}
+ for name in dir(self):
+ value = getattr(self, name)
+ if not name.startswith('__') and not inspect.ismethod(value) and not inspect.isfunction(value):
+ pr[name] = value
+ return pr
+ DB_DIR = '/dummy/db/dir'
+ DB_FILE = '/dummy/db/file'
+ DB_PATH = '/dummy/db/path'
+ LOCAL_ROOT_DIR = '/dummy/local/root/dir'
+ LOCAL_TMP_DIR = '/dummy/local/temp/dir'
+ LOCAL_PROJ_DIR = '/dummy/local/proj/dir'
+ DEFAULT_PLATFORMS_CONF = ''
+ DEFAULT_JOBS_CONF = ''
+ DATABASE_BACKEND = 'sqlite'
class TestWrappers(TestCase):
@@ -156,7 +179,8 @@ class TestWrappers(TestCase):
cls.workflows['running_once']['sections']["s5"]["WALLCLOCK"] = '00:30'
cls.workflows['running_once']['sections']["s5"]["DEPENDENCIES"] = "s2"
- def setUp(self):
+ @patch('autosubmit.job.job_list_persistence.BasicConfig', new_callable=FakeBasicConfig)
+ def setUp(self, patched_basic_config):
self.experiment_id = 'random-id'
self._wrapper_factory = MagicMock()
@@ -170,8 +194,19 @@ class TestWrappers(TestCase):
self.as_conf.experiment_data["PLATFORMS"] = dict()
self.as_conf.experiment_data["WRAPPERS"] = dict()
self.temp_directory = tempfile.mkdtemp()
+
+ patched_basic_config.DB_DIR = self.temp_directory
+ patched_basic_config.DB_FILE = Path(self.temp_directory, 'test-db.db')
+ patched_basic_config.DB_PATH = patched_basic_config.DB_FILE
+ patched_basic_config.LOCAL_PROJ_DIR = self.temp_directory
+ patched_basic_config.LOCAL_ROOT_DIR = self.temp_directory
+ patched_basic_config.LOCAL_TMP_DIR = self.temp_directory
+
+ Path(patched_basic_config.DB_FILE).touch()
+ Path(patched_basic_config.LOCAL_ROOT_DIR, self.experiment_id, 'pkl').mkdir(parents=True, exist_ok=True)
+
self.job_list = JobList(self.experiment_id, self.config, YAMLParserFactory(),
- JobListPersistenceDb(self.temp_directory, 'db'),self.as_conf)
+ JobListPersistenceDb(self.experiment_id),self.as_conf)
self.parser_mock = MagicMock(spec='SafeConfigParser')
@@ -1937,24 +1972,3 @@ class TestWrappers(TestCase):
job.section = section
return job
-
-
-import inspect
-class FakeBasicConfig:
- def __init__(self):
- pass
- def props(self):
- pr = {}
- for name in dir(self):
- value = getattr(self, name)
- if not name.startswith('__') and not inspect.ismethod(value) and not inspect.isfunction(value):
- pr[name] = value
- return pr
- DB_DIR = '/dummy/db/dir'
- DB_FILE = '/dummy/db/file'
- DB_PATH = '/dummy/db/path'
- LOCAL_ROOT_DIR = '/dummy/local/root/dir'
- LOCAL_TMP_DIR = '/dummy/local/temp/dir'
- LOCAL_PROJ_DIR = '/dummy/local/proj/dir'
- DEFAULT_PLATFORMS_CONF = ''
- DEFAULT_JOBS_CONF = ''