From c150913b1f500faf54e0a6f66b02d3c0b173df89 Mon Sep 17 00:00:00 2001 From: Wilmer Uruchi Ticona Date: Fri, 9 Oct 2020 14:55:53 +0200 Subject: [PATCH 1/3] Implementing #569. Added start after experiment completion trigger. --- autosubmit/autosubmit.py | 82 +++++++++++++++++++++++++------ autosubmit/database/db_common.py | 54 ++++++++++++-------- autosubmit/database/db_jobdata.py | 71 ++++++++++++++++---------- 3 files changed, 145 insertions(+), 62 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 94b0da491..ab1cd5bfc 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -26,7 +26,7 @@ from notifications.notifier import Notifier from notifications.mail_notifier import MailNotifier from bscearth.utils.date import date2str from monitor.monitor import Monitor -from database.db_common import get_autosubmit_version +from database.db_common import get_autosubmit_version, check_experiment_exists from database.db_common import delete_experiment from experiment.experiment_common import copy_experiment from experiment.experiment_common import new_experiment @@ -155,6 +155,8 @@ class Autosubmit: default=False, help='Update experiment version') subparser.add_argument('-st', '--start_time', required=False, help='Sets the starting time for this experiment') + subparser.add_argument('-sa', '--start_after', required=False, + help='Sets a experiment expid which completion will trigger the start of this experiment.') # Expid subparser = subparsers.add_parser( @@ -509,7 +511,7 @@ class Autosubmit: args.command, args.logconsole, args.logfile, expid) if args.command == 'run': - return Autosubmit.run_experiment(args.expid, args.notransitive, args.update_version, args.start_time) + return Autosubmit.run_experiment(args.expid, args.notransitive, args.update_version, args.start_time, args.start_after) elif args.command == 'expid': return Autosubmit.expid(args.HPC, args.description, args.copy, args.dummy, False, args.operational, 
args.config) != '' @@ -1158,7 +1160,7 @@ class Autosubmit: job_list.update_list(as_conf, False) @staticmethod - def run_experiment(expid, notransitive=False, update_version=False, start_time=None): + def run_experiment(expid, notransitive=False, update_version=False, start_time=None, start_after=None): """ Runs and experiment (submitting all the jobs properly and repeating its execution in case of failure). @@ -1228,6 +1230,43 @@ class Autosubmit: sleep(1) # End of handling starting time block + # Start start after completion trigger block + if start_after: + Log.info("User provided expid completion trigger has been detected.") + # The user tries to be tricky + if str(start_after) == str(expid): + Log.info( + "Hey! What do you think is going to happen? In theory, your experiment will run again after it has been completed. Good luck!") + # Check if experiment exists. If False or None, it does not exist + if not check_experiment_exists(start_after): + return None + # JobStructure object, check_only flag to avoid updating remote experiment + jobStructure = JobDataStructure(start_after, check_only=True) + # Check if database exists + if jobStructure.database_exists == False: + Log.critical( + "Experiment {0} does not have a valid database. Make sure that it is running under the latest version of Autosubmit.".format(start_after)) + return + # Check if database version is correct + if jobStructure.is_header_ready_db_version() == False: + Log.critical("Experiment {0} is running DB version {1} which is not supported by the completion trigger function. An updated DB version is needed.".format( + start_after, jobStructure.db_version)) + return + Log.info("Autosubmit will start monitoring experiment {0}. When the number of completed jobs plus suspended jobs becomes equal to the total number of jobs of experiment {0}, experiment {1} will start. Querying every 60 seconds. 
Status format Completed/Queuing/Running/Suspended/Failed.".format( + start_after, expid)) + while True: + # Query current run + current_run = jobStructure.get_max_id_experiment_run() + if current_run and current_run.finish > 0 and current_run.total > 0 and current_run.completed + current_run.suspended == current_run.total: + break + else: + sys.stdout.write( + "\rExperiment {0} ({1} total jobs) status {2}/{3}/{4}/{5}/{6}".format(start_after, current_run.total, current_run.completed, current_run.queuing, current_run.running, current_run.suspended, current_run.failed)) + sys.stdout.flush() + # Update every 60 seconds + sleep(60) + # End of completion trigger block + # checking if there is a lock file to avoid multiple running on the same expid try: with portalocker.Lock(os.path.join(tmp_path, 'autosubmit.lock'), timeout=1): @@ -1508,7 +1547,8 @@ class Autosubmit: for job in job_list.get_job_list(): if job.platform_name is None: job.platform_name = hpcarch - job.platform = submitter.platforms[job.platform_name.lower()] + job.platform = submitter.platforms[job.platform_name.lower( + )] packages_persistence = JobPackagePersistence(os.path.join( BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) @@ -1707,10 +1747,13 @@ class Autosubmit: try: can_continue = True while can_continue and retries > 0: - cmd = package.jobs[0].platform.get_queue_status_cmd(jobs_id[i]) - package.jobs[0].platform.send_command(cmd) + cmd = package.jobs[0].platform.get_queue_status_cmd( + jobs_id[i]) + package.jobs[0].platform.send_command( + cmd) queue_status = package.jobs[0].platform._ssh_output - reason = package.jobs[0].platform.parse_queue_reason(queue_status, jobs_id[i]) + reason = package.jobs[0].platform.parse_queue_reason( + queue_status, jobs_id[i]) if reason == '(JobHeldAdmin)': can_continue = False elif reason == '(JobHeldUser)': @@ -1720,9 +1763,11 @@ class Autosubmit: sleep(5) retries = retries - 1 if not can_continue: - 
package.jobs[0].platform.send_command(package.jobs[0].platform.cancel_cmd + " {0}".format(jobs_id[i])) + package.jobs[0].platform.send_command( + package.jobs[0].platform.cancel_cmd + " {0}".format(jobs_id[i])) i = i + 1 - continue # skip job if is bug by the admin bug. + # skip job if is bug by the admin bug. + continue if not platform.hold_job(package.jobs[0]): i = i + 1 continue @@ -1743,13 +1788,16 @@ class Autosubmit: job_list.job_package_map[package.jobs[0].id] = wrapper_job if isinstance(package, JobPackageThread): # Saving only when it is a real multi job package - packages_persistence.save(package.name, package.jobs, package._expid, inspect) + packages_persistence.save( + package.name, package.jobs, package._expid, inspect) i += 1 save = True if len(failed_packages) > 0: for job_id in failed_packages: - package.jobs[0].platform.send_command(package.jobs[0].platform.cancel_cmd + " {0}".format(job_id)) - raise AutosubmitError("{0} submission failed, some hold jobs failed to be held".format(platform.name), 6015) + package.jobs[0].platform.send_command( + package.jobs[0].platform.cancel_cmd + " {0}".format(job_id)) + raise AutosubmitError( + "{0} submission failed, some hold jobs failed to be held".format(platform.name), 6015) except WrongTemplateException as e: raise AutosubmitCritical("Invalid parameter substitution in {0} template".format( e.job_name), 7014, e.message) @@ -1758,7 +1806,8 @@ class Autosubmit: except AutosubmitCritical as e: raise except Exception as e: - raise AutosubmitError("{0} submission failed".format(platform.name), 6015, e.message) + raise AutosubmitError("{0} submission failed".format( + platform.name), 6015, e.message) return save @@ -3066,7 +3115,6 @@ class Autosubmit: exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid) tmp_path = os.path.join(exp_path, BasicConfig.LOCAL_TMP_DIR) - # checking if there is a lock file to avoid multiple running on the same expid try: # Encapsulating the lock @@ -3086,7 +3134,8 @@ class 
Autosubmit: return False update_job = not os.path.exists(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl", "job_list_" + expid + ".pkl")) - Autosubmit._create_project_associated_conf(as_conf, False, update_job) + Autosubmit._create_project_associated_conf( + as_conf, False, update_job) # Load parameters Log.info("Loading parameters...") @@ -3167,7 +3216,8 @@ class Autosubmit: for job in jobs_wr: job.children = job.children - referenced_jobs_to_remove job.parents = job.parents - referenced_jobs_to_remove - Autosubmit.generate_scripts_andor_wrappers(as_conf, job_list_wrappers, jobs_wr, packages_persistence, True) + Autosubmit.generate_scripts_andor_wrappers( + as_conf, job_list_wrappers, jobs_wr, packages_persistence, True) packages = packages_persistence.load(True) else: diff --git a/autosubmit/database/db_common.py b/autosubmit/database/db_common.py index cfc1c7bed..af0e7149b 100644 --- a/autosubmit/database/db_common.py +++ b/autosubmit/database/db_common.py @@ -23,7 +23,7 @@ Module containing functions to manage autosubmit's database. 
import os import sqlite3 -from log.log import Log,AutosubmitCritical,AutosubmitError +from log.log import Log, AutosubmitCritical, AutosubmitError Log.get_logger("Autosubmit") from autosubmit.config.basicConfig import BasicConfig @@ -40,14 +40,15 @@ def create_db(qry): try: (conn, cursor) = open_conn(False) except DbException as e: - raise AutosubmitCritical("Could not establish a connection to database",7001,e.message) - + raise AutosubmitCritical( + "Could not establish a connection to database", 7001, e.message) try: cursor.executescript(qry) except sqlite3.Error as e: close_conn(conn, cursor) - raise AutosubmitCritical('Database can not be created',7004,e.message) + raise AutosubmitCritical( + 'Database can not be created', 7004, e.message) conn.commit() close_conn(conn, cursor) @@ -62,7 +63,8 @@ def check_db(): """ if not os.path.exists(BasicConfig.DB_PATH): - raise AutosubmitCritical('DB path does not exists: {0}'.format(BasicConfig.DB_PATH),7003) + raise AutosubmitCritical( + 'DB path does not exists: {0}'.format(BasicConfig.DB_PATH), 7003) return True @@ -100,12 +102,13 @@ def open_conn(check_version=True): # If database version is not the expected, update database.... if version < CURRENT_DATABASE_VERSION: if not _update_database(version, cursor): - raise AutosubmitCritical('Database version doesn''t match', 7001) + raise AutosubmitCritical( + 'Database version doesn''t match', 7001) # ... or ask for autosubmit upgrade elif version > CURRENT_DATABASE_VERSION: raise AutosubmitCritical('Database version is not compatible with this autosubmit version. 
Please execute pip install ' - 'autosubmit --upgrade', 7002) + 'autosubmit --upgrade', 7002) return conn, cursor @@ -140,15 +143,16 @@ def save_experiment(name, description, version): try: (conn, cursor) = open_conn() except DbException as e: - raise AutosubmitCritical("Could not establish a connection to database",7001,e.message) + raise AutosubmitCritical( + "Could not establish a connection to database", 7001, e.message) try: cursor.execute('INSERT INTO experiment (name, description, autosubmit_version) VALUES (:name, :description, ' ':version)', {'name': name, 'description': description, 'version': version}) except sqlite3.IntegrityError as e: close_conn(conn, cursor) - raise AutosubmitCritical('Couldn''t register experiment',7005,e.message) - + raise AutosubmitCritical( + 'Couldn''t register experiment', 7005, e.message) conn.commit() close_conn(conn, cursor) @@ -171,18 +175,21 @@ def check_experiment_exists(name, error_on_inexistence=True): try: (conn, cursor) = open_conn() except DbException as e: - raise AutosubmitCritical("Could not establish a connection to database",7001,e.message) + raise AutosubmitCritical( + "Could not establish a connection to database", 7001, e.message) conn.isolation_level = None # SQLite always return a unicode object, but we can change this # behaviour with the next sentence conn.text_factory = str - cursor.execute('select name from experiment where name=:name', {'name': name}) + cursor.execute( + 'select name from experiment where name=:name', {'name': name}) row = cursor.fetchone() close_conn(conn, cursor) if row is None: if error_on_inexistence: - raise AutosubmitCritical('The experiment name "{0}" does not exist yet!!!', 7005) + raise AutosubmitCritical( + 'The experiment name "{0}" does not exist yet!!!'.format(name), 7005) return False return True @@ -202,17 +209,20 @@ def get_autosubmit_version(expid): try: (conn, cursor) = open_conn() except DbException as e: - raise AutosubmitCritical("Could not establish a connection to 
database",7001,e.message) + raise AutosubmitCritical( + "Could not establish a connection to database", 7001, e.message) conn.isolation_level = None # SQLite always return a unicode object, but we can change this # behaviour with the next sentence conn.text_factory = str - cursor.execute('SELECT autosubmit_version FROM experiment WHERE name=:expid', {'expid': expid}) + cursor.execute('SELECT autosubmit_version FROM experiment WHERE name=:expid', { + 'expid': expid}) row = cursor.fetchone() close_conn(conn, cursor) if row is None: - raise AutosubmitCritical('The experiment "{0}" does not exist'.format(expid),7005) + raise AutosubmitCritical( + 'The experiment "{0}" does not exist'.format(expid), 7005) return row[0] @@ -232,7 +242,8 @@ def last_name_used(test=False, operational=False): try: (conn, cursor) = open_conn() except DbException as e: - raise AutosubmitCritical("Could not establish a connection to database",7001,e.message) + raise AutosubmitCritical( + "Could not establish a connection to database", 7001, e.message) conn.text_factory = str if test: cursor.execute('SELECT name ' @@ -281,7 +292,8 @@ def delete_experiment(experiment_id): try: (conn, cursor) = open_conn() except DbException as e: - raise AutosubmitCritical("Could not establish a connection to database",7001,e.message) + raise AutosubmitCritical( + "Could not establish a connection to database", 7001, e.message) return False cursor.execute('DELETE FROM experiment ' 'WHERE name=:name', {'name': experiment_id}) @@ -315,9 +327,11 @@ def _update_database(version, cursor): 'ALTER TABLE experiment ADD COLUMN autosubmit_version VARCHAR;' 'UPDATE experiment SET autosubmit_version = "3.0.0b" ' 'WHERE autosubmit_version NOT NULL;') - cursor.execute('UPDATE db_version SET version={0};'.format(CURRENT_DATABASE_VERSION)) + cursor.execute('UPDATE db_version SET version={0};'.format( + CURRENT_DATABASE_VERSION)) except sqlite3.Error as e: - raise AutosubmitCritical('unable to update database version', 
7001,e.message) + raise AutosubmitCritical( + 'unable to update database version', 7001, e.message) Log.info("Update completed") return True diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py index 35775760b..aa1103b7b 100644 --- a/autosubmit/database/db_jobdata.py +++ b/autosubmit/database/db_jobdata.py @@ -37,6 +37,7 @@ from bscearth.utils.date import date2str, parse_date, previous_day, chunk_end_da from log.log import Log, AutosubmitCritical, AutosubmitError CURRENT_DB_VERSION = 14 # Used to be 10 +EXPERIMENT_HEADER_CHANGES_DB_VERSION = 14 # Defining RowType standard @@ -567,9 +568,15 @@ class ExperimentStatus(MainDataBase): return None +def check_if_database_exists(expid): + BasicConfig.read() + folder_path = BasicConfig.JOBDATA_DIR + database_path = os.path.join(folder_path, "job_data_" + str(expid) + ".db") + + class JobDataStructure(MainDataBase): - def __init__(self, expid): + def __init__(self, expid, check_only=False): """Initializes the object based on the unique identifier of the experiment. 
Args: @@ -647,29 +654,41 @@ class JobDataStructure(MainDataBase): self.create_index_query = textwrap.dedent(''' CREATE INDEX IF NOT EXISTS ID_JOB_NAME ON job_data(job_name); ''') - # print(self.database_path) + + self.database_exists = True + self.db_version = 0 try: - if not os.path.exists(self.database_path): - open(self.database_path, "w") - self.conn = self.create_connection(self.database_path) - self.create_table(self.create_table_header_query) - self.create_table(self.create_table_query) - self.create_index() - if self._set_pragma_version(CURRENT_DB_VERSION): - Log.info("Database version set.") - else: - self.conn = self.create_connection(self.database_path) - db_version = self._select_pragma_version() - if db_version != CURRENT_DB_VERSION: - # Update to current version - Log.info("Database schema needs update.") - self.update_table_schema() - self.create_index() + if check_only == False: + if not os.path.exists(self.database_path): + open(self.database_path, "w") + self.conn = self.create_connection(self.database_path) self.create_table(self.create_table_header_query) + self.create_table(self.create_table_query) + self.create_index() if self._set_pragma_version(CURRENT_DB_VERSION): - Log.info("Database version set to {0}.".format( - CURRENT_DB_VERSION)) - self.current_run_id = self.get_current_run_id() + Log.info("Database version set.") + self.db_version = CURRENT_DB_VERSION + else: + self.conn = self.create_connection(self.database_path) + self.db_version = self._select_pragma_version() + if self.db_version != CURRENT_DB_VERSION: + # Update to current version + Log.info("Database schema needs update.") + self.update_table_schema() + self.create_index() + self.create_table(self.create_table_header_query) + if self._set_pragma_version(CURRENT_DB_VERSION): + Log.info("Database version set to {0}.".format( + CURRENT_DB_VERSION)) + self.db_version = CURRENT_DB_VERSION + self.current_run_id = self.get_current_run_id() + else: + if not 
os.path.exists(self.database_path): + self.database_exists = False + else: + self.conn = self.create_connection(self.database_path) + self.db_version = self._select_pragma_version() + except IOError as e: raise AutosubmitCritical("Historic Database route {0} is not accesible".format( BasicConfig.JOBDATA_DIR), 7067, e.message) @@ -677,6 +696,9 @@ class JobDataStructure(MainDataBase): raise AutosubmitCritical( "Historic Database {0} due an database error".format(), 7067, e.message) + def is_header_ready_db_version(self): + return True if self.db_version >= EXPERIMENT_HEADER_CHANGES_DB_VERSION else False + def determine_rowtype(self, code): """ Determines rowtype based on job information. @@ -1425,11 +1447,8 @@ class JobDataStructure(MainDataBase): def get_max_id_experiment_run(self): """Get Max experiment run object (last experiment run) - Raises: - Exception: [description] - - Returns: - [type]: [description] + :return: ExperimentRun object + :rtype: Object """ try: #expe = list() -- GitLab From 37daf1354c378a2104738d896bf5061ba8c42582 Mon Sep 17 00:00:00 2001 From: Wilmer Uruchi Ticona Date: Fri, 9 Oct 2020 14:58:58 +0200 Subject: [PATCH 2/3] Implementing #569. Added documentation. --- docs/source/usage/run.rst | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/source/usage/run.rst b/docs/source/usage/run.rst index 47ec68a67..832edfc5c 100644 --- a/docs/source/usage/run.rst +++ b/docs/source/usage/run.rst @@ -17,8 +17,10 @@ Options: prevents doing the transitive reduction when plotting the workflow -v --update_version update the experiment version to match the actual autosubmit version - -s --start_time + -st --start_time sets the starting time for the experiment. Accepted format: 'yyyy-mm-dd HH:MM:SS' or 'HH:MM:SS' (defaults to current day) + -sa --start_after + sets a experiment expid that will be tracked for completion. When this experiment is completed, the current instance of Autosubmit run will start. 
-h, --help show this help message and exit Example: -- GitLab From 24371c06207893bf0f1f1679e434bc51c8742e42 Mon Sep 17 00:00:00 2001 From: Wilmer Uruchi Ticona Date: Fri, 9 Oct 2020 19:27:29 +0200 Subject: [PATCH 3/3] Documentation fix --- docs/source/usage/run.rst | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/source/usage/run.rst b/docs/source/usage/run.rst index 832edfc5c..19b0af783 100644 --- a/docs/source/usage/run.rst +++ b/docs/source/usage/run.rst @@ -18,9 +18,9 @@ Options: -v --update_version update the experiment version to match the actual autosubmit version -st --start_time - sets the starting time for the experiment. Accepted format: 'yyyy-mm-dd HH:MM:SS' or 'HH:MM:SS' (defaults to current day) - -sa --start_after - sets a experiment expid that will be tracked for completion. When this experiment is completed, the current instance of Autosubmit run will start. + Sets the starting time for the experiment. Accepted format: 'yyyy-mm-dd HH:MM:SS' or 'HH:MM:SS' (defaults to current day). + -sa --start_after + Sets an experiment expid that will be tracked for completion. When the tracked experiment is completed, the current instance of Autosubmit run will start. -h, --help show this help message and exit Example: -- GitLab