diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 6bc39f8627db2dba04db63e5d975ad57b756e3e4..e1d1d011b5beb45738844b6bd1ca9e7abc104784 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -74,8 +74,8 @@ import re import random import signal import datetime -# import log.fd_show as fd_show -import portalocker +import log.fd_show as fd_show +from autosubmit.helpers.utils import ASLock from pkg_resources import require, resource_listdir, resource_string, resource_filename #import importlib from collections import defaultdict @@ -2133,8 +2133,8 @@ class Autosubmit: # checking if there is a lock file to avoid multiple running on the same expid try: - # Portalocker is used to avoid multiple autosubmit running on the same experiment, we have to change this system in #806 - with portalocker.Lock(os.path.join(tmp_path, 'autosubmit.lock'), timeout=1): + # ASLock is used to avoid multiple autosubmit running on the same experiment. #806 + with ASLock(expid): try: Log.debug("Preparing run") # This function is called only once, when the experiment is started. It is used to initialize the experiment and to check the correctness of the configuration files. @@ -2325,9 +2325,6 @@ class Autosubmit: 7051, e.message) except AutosubmitCritical as e: # Critical errors can't be recovered. Failed configuration or autosubmit error raise AutosubmitCritical(e.message, e.code, e.trace) - except (portalocker.AlreadyLocked, portalocker.LockException) as e: - message = "We have detected that there is another Autosubmit instance using the experiment\n. 
Stop other Autosubmit instances that are using the experiment or delete autosubmit.lock file located on tmp folder" - raise AutosubmitCritical(message, 7000) except BaseException as e: raise # If this happens, there is a bug in the code or an exception not-well caught Log.result("No more jobs to run.") @@ -2381,10 +2378,6 @@ class Autosubmit: exp_history.finish_current_experiment_run() except: Log.warning("Database is locked") - except (portalocker.AlreadyLocked, portalocker.LockException) as e: - message = "We have detected that there is another Autosubmit instance using the experiment\n. Stop other Autosubmit instances that are using the experiment or delete autosubmit.lock file located on tmp folder" - terminate_child_process(expid) - raise AutosubmitCritical(message, 7000) except AutosubmitCritical as e: terminate_child_process(expid) raise @@ -4020,7 +4013,7 @@ class Autosubmit: backup_pkl_path = os.path.join( pkl_folder_path, "job_list_{}_backup.pkl".format(expid)) try: - with portalocker.Lock(os.path.join(tmp_path, 'autosubmit.lock'), timeout=1): + with ASLock(expid): # Not locked Log.info("Looking for backup file {}".format(backup_pkl_path)) if os.path.exists(backup_pkl_path): @@ -4076,9 +4069,6 @@ class Autosubmit: else: Log.info( "Backup file not found. Pkl restore operation stopped. No changes have been made.") - except (portalocker.AlreadyLocked, portalocker.LockException) as e: - message = "Another Autosubmit instance using the experiment\n. Stop other Autosubmit instances that are using the experiment or delete autosubmit.lock file located on the /tmp folder." 
- raise AutosubmitCritical(message, 7000) except AutosubmitCritical as e: raise AutosubmitCritical(e.message, e.code, e.trace) except BaseException as e: @@ -4451,7 +4441,7 @@ class Autosubmit: exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid) tmp_path = os.path.join(exp_path, BasicConfig.LOCAL_TMP_DIR) # Encapsulating the lock - with portalocker.Lock(os.path.join(tmp_path, 'autosubmit.lock'), timeout=1) as fh: + with ASLock(expid): try: Log.info( "Preparing .lock file to avoid multiple instances with same expid.") @@ -4607,8 +4597,6 @@ class Autosubmit: Log.result("\nJob list created successfully") Log.warning( "Remember to MODIFY the MODEL config files!") - fh.flush() - os.fsync(fh.fileno()) if detail: Autosubmit.detail(job_list) return True @@ -4616,14 +4604,9 @@ class Autosubmit: except KeyboardInterrupt as e: # Setting signal handler to handle subsequent CTRL-C signal.signal(signal.SIGINT, signal_handler_create) - fh.flush() - os.fsync(fh.fileno()) raise AutosubmitCritical("Stopped by user input", 7010) except BaseException as e: raise - except (portalocker.AlreadyLocked, portalocker.LockException) as e: - message = "We have detected that there is another Autosubmit instance using the experiment\n. 
Stop other Autosubmit instances that are using the experiment or delete autosubmit.lock file located on tmp folder" - raise AutosubmitCritical(message, 7000) except AutosubmitError as e: raise except AutosubmitCritical as e: @@ -5160,7 +5143,7 @@ class Autosubmit: job_validation_message = " " # checking if there is a lock file to avoid multiple running on the same expid try: - with portalocker.Lock(os.path.join(tmp_path, 'autosubmit.lock'), timeout=1): + with ASLock(expid): Log.info( "Preparing .lock file to avoid multiple instances with same expid.") @@ -5399,9 +5382,6 @@ class Autosubmit: groups=groups_dict, job_list_object=job_list) return True - except (portalocker.AlreadyLocked, portalocker.LockException) as e: - message = "We have detected that there is another Autosubmit instance using the experiment\n. Stop other Autosubmit instances that are using the experiment or delete autosubmit.lock file located on tmp folder" - raise AutosubmitCritical(message, 7000) except (AutosubmitError, AutosubmitCritical): raise except BaseException as e: diff --git a/autosubmit/database/repositories/__init__.py b/autosubmit/database/repositories/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/autosubmit/database/repositories/locks.py b/autosubmit/database/repositories/locks.py new file mode 100644 index 0000000000000000000000000000000000000000..c3551ae5cf223ca8ffad3b9c7bcaab902208cca8 --- /dev/null +++ b/autosubmit/database/repositories/locks.py @@ -0,0 +1,204 @@ +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from datetime import datetime, timezone +import os +import json +import sqlite3 +from typing import Optional +from autosubmitconfigparser.config.basicconfig import BasicConfig + + +@dataclass +class LockModel: + """ + Data model of the Autosubmit Lock + """ + + lock_id: str + hostname: str + pid: int + created: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) # computed per instance, not once at import time + + +class
LocksRepository(ABC): + """ + Interface for Autosubmit Locks Repository + """ + + @abstractmethod + def get_lock(self, lock_id: str) -> Optional[LockModel]: + """ + Get the lock by lock_id. Return None if the lock does not exist. + """ + + @abstractmethod + def insert_lock_from_model(self, lock: LockModel): + """ + Insert the lock + """ + + @abstractmethod + def insert_lock(self, lock_id: str, hostname: str, pid: int): + """ + Insert the lock. Create a LockModel object first. + """ + + @abstractmethod + def delete_lock(self, lock_id: str): + """ + Delete the lock + """ + + +class LocksFileRepository(LocksRepository): + """ + Repository of Autosubmit Locks using files. + It creates a lock file for each lock which is stored in the directory. + """ + + def __init__(self, dir_path: str): + self.dir_path = dir_path + self._initialize() + + def _initialize(self): + """ + Create the locks directory if it does not exist + """ + os.makedirs(self.dir_path, exist_ok=True) + + def get_lock(self, lock_id: str) -> Optional[LockModel]: + """ + Get the lock by lock_id. Return None if the lock does not exist. + """ + lock_file = os.path.join(self.dir_path, lock_id) + if not os.path.exists(lock_file): + return None + + # Load from JSON format + with open(lock_file, "r") as f: + lock_data = json.load(f) + return LockModel( + lock_data["lock_id"], + lock_data["hostname"], + lock_data["pid"], + datetime.fromisoformat(lock_data["created"]), + ) + + def insert_lock_from_model(self, lock: LockModel): + """ + Insert the lock into the directory. Save content in JSON format. + """ + lock_file = os.path.join(self.dir_path, lock.lock_id) + # Save in JSON format + with open(lock_file, "w") as f: + json.dump( + { + "lock_id": lock.lock_id, + "hostname": lock.hostname, + "pid": lock.pid, + "created": lock.created.isoformat(), + }, + f, + ) + + def insert_lock(self, lock_id: str, hostname: str, pid: int): + """ + Insert the lock into the directory. Create a LockModel object first. 
+ """ + new_lock = LockModel(lock_id, hostname, pid, datetime.now(timezone.utc)) + self.insert_lock_from_model(new_lock) + + def delete_lock(self, lock_id: str): + """ + Delete the lock from the directory + """ + lock_file = os.path.join(self.dir_path, lock_id) + os.remove(lock_file) + + +class LocksDBRepository(LocksRepository): + """ + Repository of Autosubmit Locks using SQLite + """ + + # TODO: Generalize with SQLAlchemy + + def __init__(self, db_path: str): + self.db_path = db_path + self._initialize() + + def _initialize(self): + """ + Create the locks table if it does not exist + """ + with sqlite3.connect(self.db_path) as conn: + conn.execute(""" + CREATE TABLE IF NOT EXISTS locks ( + lock_id TEXT PRIMARY KEY, + hostname TEXT, + pid INTEGER, + created TEXT + ) + """) + conn.commit() + + def get_lock(self, lock_id: str) -> Optional[LockModel]: + """ + Get the lock by lock_id + """ + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + cursor.execute( + "SELECT lock_id, hostname, pid, created FROM locks WHERE lock_id = ?", + (lock_id,), + ) + row = cursor.fetchone() + if row is None: + return None + return LockModel( + row[0], # lock_id + row[1], # hostname + row[2], # pid + datetime.fromisoformat(row[3]), # created + ) + + def insert_lock_from_model(self, lock: LockModel): + """ + Insert the lock into the database + """ + with sqlite3.connect(self.db_path) as conn: + conn.execute( + "INSERT INTO locks VALUES (?, ?, ?, ?)", + (lock.lock_id, lock.hostname, lock.pid, lock.created.isoformat()), + ) + conn.commit() + + def insert_lock(self, lock_id: str, hostname: str, pid: int): + """ + Insert the lock into the database. Create a LockModel object first. 
+ """ + new_lock = LockModel(lock_id, hostname, pid, datetime.now(timezone.utc)) + self.insert_lock_from_model(new_lock) + + def delete_lock(self, lock_id: str): + """ + Delete the lock from the database + """ + with sqlite3.connect(self.db_path) as conn: + conn.execute("DELETE FROM locks WHERE lock_id = ?", (lock_id,)) + conn.commit() + + +def create_locks_repository() -> LocksRepository: + """ + Factory function to create the LocksRepository. + + """ + basic_config = BasicConfig() + basic_config.read() + + # TODO: Use the BasicConfig to determine the repository type. For now, it is hardcoded to use the file repository. + + locks_dir_path = os.path.join(basic_config.LOCAL_ROOT_DIR, "locks") + + return LocksFileRepository(locks_dir_path) diff --git a/autosubmit/helpers/utils.py b/autosubmit/helpers/utils.py index 6d13cb53b32215864e8036ec223a2689dbb6efe2..adf5b0131b2f41806a648763beda50dc18e5a36a 100644 --- a/autosubmit/helpers/utils.py +++ b/autosubmit/helpers/utils.py @@ -5,12 +5,69 @@ import re import signal import subprocess from itertools import zip_longest - from autosubmitconfigparser.config.basicconfig import BasicConfig - +from autosubmit.database.repositories.locks import create_locks_repository from autosubmit.notifications.mail_notifier import MailNotifier from autosubmit.notifications.notifier import Notifier from log.log import AutosubmitCritical, Log +import socket +import psutil + + +class ASLock: + def __init__(self, lock_id: str): + """ + Autosubmit Lock implementation + + :param lock_id: The lock identifier + + ### Usage: + ```python + with ASLock("lock_id"): + # Do something + ``` + """ + self.locks_repository = create_locks_repository() + + # Lock data + self.lock_id = lock_id + self.current_hostname = socket.gethostname() + self.current_pid = os.getpid() + + def __enter__(self): + """ + Acquire the lock + """ + # Check whether a lock with the same lock_id already exists + curr_lock = self.locks_repository.get_lock(self.lock_id) + if curr_lock: +
if curr_lock.hostname == self.current_hostname: + if psutil.pid_exists(curr_lock.pid): + raise AutosubmitCritical( + f"Lock {self.lock_id} already exists and is being used by " + f"process with PID {curr_lock.pid} on host {curr_lock.hostname}", + 7000, + ) + else: + raise AutosubmitCritical( + f"Lock {self.lock_id} already exists and is being used by process with PID {curr_lock.pid} " + f"on host {curr_lock.hostname}.\nPlease, use the previous hostname and try again", + 7000, + ) + # TODO: Include a way to force the lock release e.g. autosubmit force-release-lock + + # Create the lock + self.locks_repository.insert_lock( + self.lock_id, self.current_hostname, self.current_pid + ) + Log.info(f"Lock {self.lock_id} created") + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """ + Release the lock + """ + self.locks_repository.delete_lock(self.lock_id) def check_jobs_file_exists(as_conf, current_section_name=None): diff --git a/bin/autosubmit b/bin/autosubmit index d08f47593a4bffef1fe577076a6ffe24c394b856..717ab109f13467d26b66010a7f430b88ff1650d8 100755 --- a/bin/autosubmit +++ b/bin/autosubmit @@ -39,8 +39,6 @@ def exit_from_error(e: BaseException): Log.debug(trace) except: print(trace) - with suppress(FileNotFoundError, PermissionError): - os.remove(os.path.join(Log.file_path, "autosubmit.lock")) if isinstance(e, (AutosubmitCritical, AutosubmitError)): e: Union[AutosubmitError, AutosubmitCritical] = e if e.trace: @@ -57,8 +55,6 @@ def exit_from_error(e: BaseException): def main(): try: return_value = Autosubmit.parse_args() - if os.path.exists(os.path.join(Log.file_path, "autosubmit.lock")): - os.remove(os.path.join(Log.file_path, "autosubmit.lock")) if type(return_value) is int: os._exit(return_value) os._exit(0) diff --git a/docs/source/conf.py b/docs/source/conf.py index e9c6c69d9caf0375df94609da64a7b332fabfa4f..0c89769f21c6faee1844b9ab9bf5b90d95516bfc 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -121,7 +121,7 @@ pygments_style =
'sphinx' # If true, `todo` and `todoList` produce output, else they produce nothing. todo_include_todos = False -autodoc_mock_imports = ["portalocker", "argparse", "python-dateutil", "py3dotplus", "pyparsing", +autodoc_mock_imports = ["argparse", "python-dateutil", "py3dotplus", "pyparsing", 'numpy', 'matplotlib', 'matplotlib.pyplot', 'matplotlib.gridspec', 'matplotlib.patches', 'paramiko', 'mock', "networkx", 'networkx.algorithms.dag', 'bscearth.utils', 'bscearth.utils.config_parser', 'bscearth.utils.date'] diff --git a/docs/source/installation/index.rst b/docs/source/installation/index.rst index f8a68532dbc087fd64e43aacaa848fcd7cfb784c..f10569a4b12745952fe608d14af18dba0c84c295 100644 --- a/docs/source/installation/index.rst +++ b/docs/source/installation/index.rst @@ -11,7 +11,7 @@ The Autosubmit code is hosted in Git, at the BSC GitLab public repository. The A .. important:: (SYSTEM) Graphviz version must be >= 2.38 except 2.40(not working). You can check the version using dot -v. -- Python dependencies: configobj>=5.0.6, argparse>=1.4.0 , python-dateutil>=2.8.2, matplotlib==3.4.3, numpy==1.21.6, py3dotplus>=1.1.0, pyparsing>=3.0.7, paramiko>=2.9.2, mock>=4.0.3, six>=1.10, portalocker>=2.3.2, networkx==2.6.3, requests>=2.27.1, bscearth.utils>=0.5.2, cryptography>=36.0.1, setuptools>=60.8.2, xlib>=0.21, pip>=22.0.3, ruamel.yaml, pythondialog, pytest, nose, coverage, PyNaCl==1.4.0, six>=1.10.0, requests, xlib, Pygments, packaging==19, typing>=3.7, autosubmitconfigparser +- Python dependencies: configobj>=5.0.6, argparse>=1.4.0 , python-dateutil>=2.8.2, matplotlib==3.4.3, numpy==1.21.6, py3dotplus>=1.1.0, pyparsing>=3.0.7, paramiko>=2.9.2, mock>=4.0.3, six>=1.10, networkx==2.6.3, requests>=2.27.1, bscearth.utils>=0.5.2, cryptography>=36.0.1, setuptools>=60.8.2, xlib>=0.21, pip>=22.0.3, ruamel.yaml, pythondialog, pytest, nose, coverage, PyNaCl==1.4.0, six>=1.10.0, requests, xlib, Pygments, packaging==19, typing>=3.7, autosubmitconfigparser .. 
important:: ``dot -v`` command should contain "dot", pdf, png, SVG, Xlib in the device section. diff --git a/setup.py b/setup.py index 8d1f5b1001c304d12a82508756b162fc54ba7284..bc9d7d2ad9f4be3adcbd9ce012019acf3ac4d61d 100644 --- a/setup.py +++ b/setup.py @@ -33,7 +33,6 @@ install_requires = [ 'bscearth.utils<=0.5.2', 'requests<=2.31.0', 'networkx<=2.6.3', - 'portalocker<=2.7.0', 'paramiko<=3.4', 'pyparsing==3.1.1', 'matplotlib<=3.8.3', diff --git a/test/regression/local_check_details.py b/test/regression/local_check_details.py index ad757806369bea9702c6ac27cce5bdb24303d5a6..5445c1498ecda2cde36ea41972403289672fadb1 100644 --- a/test/regression/local_check_details.py +++ b/test/regression/local_check_details.py @@ -8,6 +8,7 @@ Works under local_computer TODO introduce in CI import os import subprocess +import time BIN_PATH = '../../bin' ACTIVE_DOCS = True # Use autosubmit_docs database VERSION = 4.1 # 4.0 or 4.1 @@ -68,4 +69,8 @@ for experiment in os.listdir(f"{EXPERIMENTS_PATH}"): expids.append(experiment) # Force # expids = ["a001"] -perform_test(expids) \ No newline at end of file +#General time +start = time.time() +perform_test(expids) +end = time.time() +print(f"Acc time taken: {end-start}") \ No newline at end of file diff --git a/test/unit/test_utils.py b/test/unit/test_utils.py new file mode 100644 index 0000000000000000000000000000000000000000..3e903a28c0b1c6a0864c37745bd77488710612ad --- /dev/null +++ b/test/unit/test_utils.py @@ -0,0 +1,91 @@ +import os +import socket +import pytest +from pathlib import Path +from autosubmit.database.repositories.locks import ( + create_locks_repository, + LocksRepository, + LocksFileRepository, +) +from autosubmit.helpers.utils import ASLock +from autosubmitconfigparser.config.basicconfig import BasicConfig +from log.log import AutosubmitCritical + + +@pytest.fixture +def utils_tmp_path(tmp_path_factory): + return tmp_path_factory.mktemp("utils") + + +class MockBasicConfig(BasicConfig): + def read(self): + pass + + 
+@pytest.fixture +def mock_basic_config(monkeypatch, utils_tmp_path): + MockBasicConfig.LOCAL_ROOT_DIR = utils_tmp_path + monkeypatch.setattr("autosubmit.database.repositories.locks.BasicConfig", MockBasicConfig) # patch where create_locks_repository looks the name up + return MockBasicConfig + + +def test_aslock(mock_basic_config): + """ + Test acquiring and releasing the lock + """ + lock_id = "test_aslock" + current_hostname = socket.gethostname() + current_pid = os.getpid() + + # Test acquiring the lock + lock_type = None + with ASLock(lock_id) as locker: + assert isinstance(locker.locks_repository, LocksRepository) + + if isinstance(locker.locks_repository, LocksFileRepository): + # Check the lock file + lock_type = "file" + lock_file_path = Path(locker.locks_repository.dir_path) / lock_id + assert lock_file_path.exists(), "Lock file should exist" + assert lock_file_path.is_file(), "Lock file should be a file" + # TODO: Add more lock types (SQLAlchemy) + + # Get and check the lock + curr_lock = locker.locks_repository.get_lock(lock_id) + assert curr_lock.hostname == current_hostname, "Hostname should match" + assert curr_lock.pid == current_pid, "PID should match" + + # Test releasing the lock + if lock_type == "file": + assert not lock_file_path.exists(), "Lock file should not exist" + # TODO: Add more lock types (SQLAlchemy) + + locks_repository = create_locks_repository() + curr_lock = locks_repository.get_lock(lock_id) + assert curr_lock is None + + +def test_aslock_error(mock_basic_config): + """ + Test acquiring the lock multiple times should raise an error + """ + lock_id = "test_aslock_error" + + # Test acquiring the lock + with ASLock(lock_id): + # Snapshot the current lock + locks_repository = create_locks_repository() + lock_snapshot = locks_repository.get_lock(lock_id) + + # Try creating lock multiple times + for i in range(10): + with pytest.raises(AutosubmitCritical): + with ASLock(lock_id): + pass + + curr_lock = locks_repository.get_lock(lock_id) + assert curr_lock.hostname == lock_snapshot.hostname, "Hostname
should match" + assert curr_lock.pid == lock_snapshot.pid, "PID should match" + assert ( + curr_lock.created == lock_snapshot.created + ), "Created datetime should match"