diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py
index dcd94d11b51ce33a6428f421687eabf07ecbb753..dca614587847a600a84e6166066b12d0969c137c 100644
--- a/autosubmit/autosubmit.py
+++ b/autosubmit/autosubmit.py
@@ -1743,27 +1743,6 @@ class Autosubmit:
         for job in job_list.get_job_list():
             job.status = Status.WAITING
 
-    @staticmethod
-    def terminate(all_threads):
-        # Closing threads on Ctrl+C
-        Log.info(
-            "Looking for active threads before closing Autosubmit. Ending the program before these threads finish may result in unexpected behavior. This procedure will last until all threads have finished or the program has waited for more than 30 seconds.")
-        timeout = 0
-        active_threads = True
-        while active_threads and timeout <= 60:
-            active_threads = False
-            for thread in all_threads:
-                if "JOB_" in thread.name:
-                    if thread.is_alive():
-                        active_threads = True
-                        Log.info("{0} is still retrieving outputs, time remaining is {1} seconds.".format(
-                            thread.name, 60 - timeout))
-                        break
-            if active_threads:
-                sleep(10)
-                timeout += 10
-
-
     @staticmethod
     def manage_wrapper_job(as_conf, job_list, platform, wrapper_id, save=False):
         check_wrapper_jobs_sleeptime = as_conf.get_wrapper_check_time()
@@ -2144,7 +2123,12 @@ class Autosubmit:
         Log.debug("Sleep: {0}", safetysleeptime)
         Log.debug("Number of retrials: {0}", default_retrials)
         return total_jobs, safetysleeptime, default_retrials, check_wrapper_jobs_sleeptime
-
+
+    @staticmethod
+    def check_logs_status(job_list, as_conf, new_run):
+        for job in job_list.get_completed_failed_without_logs():
+            job_list.update_log_status(job, as_conf, new_run)
+
     @staticmethod
     def run_experiment(expid, notransitive=False, start_time=None, start_after=None, run_only_members=None, profile=False):
         """
@@ -2203,6 +2187,7 @@ class Autosubmit:
 
             max_recovery_retrials = as_conf.experiment_data.get("CONFIG",{}).get("RECOVERY_RETRIALS",3650)  # (72h - 122h )
             recovery_retrials = 0
+            Autosubmit.check_logs_status(job_list, as_conf, new_run=True)
             while job_list.get_active():
                 for platform in platforms_to_test:  # Send keep_alive signal
                     platform.work_event.set()
@@ -2211,7 +2196,7 @@ class Autosubmit:
                 did_run = True
                 try:
                     if Autosubmit.exit:
-                        Autosubmit.terminate(threading.enumerate())
+                        Autosubmit.check_logs_status(job_list, as_conf, new_run=False)
                         if job_list.get_failed():
                             return 1
                         return 0
@@ -2281,6 +2266,7 @@ class Autosubmit:
                             "Couldn't recover the Historical database, AS will continue without it, GUI may be affected")
                     job_changes_tracker = {}
                     if Autosubmit.exit:
+                        Autosubmit.check_logs_status(job_list, as_conf, new_run=False)
                         job_list.save()
                         as_conf.save()
                     time.sleep(safetysleeptime)
@@ -2368,8 +2354,6 @@ class Autosubmit:
                 raise  # If this happens, there is a bug in the code or an exception not-well caught
             Log.result("No more jobs to run.")
             # search hint - finished run
-            for job in job_list.get_completed_failed_without_logs():
-                job_list.update_log_status(job, as_conf)
             job_list.save()
             if not did_run and len(job_list.get_completed_failed_without_logs()) > 0:  # Revise if there is any log unrecovered from previous run
                 Log.info(f"Connecting to the platforms, to recover missing logs")
@@ -2382,11 +2366,9 @@ class Autosubmit:
             Log.info("Waiting for all logs to be updated")
             for p in platforms_to_test:
                 if p.log_recovery_process:
-                    p.cleanup_event.set() # Send cleanup event
+                    p.cleanup_event.set()  # Send cleanup event
                     p.log_recovery_process.join()
-                    Log.info(f"Log recovery process for {p.name} has finished")
-            for job in job_list.get_completed_failed_without_logs(): # update the info gathered from the childs
-                job_list.update_log_status(job, as_conf)
+            Autosubmit.check_logs_status(job_list, as_conf, new_run=False)
             job_list.save()
             if len(job_list.get_completed_failed_without_logs()) == 0:
                 Log.result(f"Autosubmit recovered all job logs.")
diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py
index 9b58970777c433092d2bc14d64b18917afaede54..2e3cb27ce616f1eb0ee956d28f597b1954cebf6d 100644
--- a/autosubmit/job/job.py
+++ b/autosubmit/job/job.py
@@ -274,7 +274,6 @@ class Job(object):
         self._memory_per_task = ''
         self.log_retrieved = False
         self.start_time_timestamp = time.time()
-        self.end_time_placeholder = time.time()
         self.processors_per_node = ""
         self.stat_file = self.script_name[:-4] + "_STAT_0"
 
diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py
index dadff822877c1b225e5ec4e3dbfd801fbcc41462..58f3fbf4759315dce4b67dfbb4fa60daae561f3d 100644
--- a/autosubmit/job/job_list.py
+++ b/autosubmit/job/job_list.py
@@ -2666,7 +2666,7 @@ class JobList(object):
                     non_completed_parents_current.append(parent[0])
         return non_completed_parents_current, completed_parents
 
-    def update_log_status(self, job, as_conf):
+    def update_log_status(self, job, as_conf, new_run=False):
        """
        Updates the log err and log out.
        """
@@ -2681,7 +2681,7 @@
        if log_recovered:
            job.local_logs = (log_recovered.name, log_recovered.name[:-4] + ".err")  # we only want the last one
            job.updated_log = True
-        elif not job.updated_log and str(as_conf.platforms_data.get(job.platform.name, {}).get('DISABLE_RECOVERY_THREADS', "false")).lower() == "false":
+        elif new_run and not job.updated_log and str(as_conf.platforms_data.get(job.platform.name, {}).get('DISABLE_RECOVERY_THREADS', "false")).lower() == "false":
            job.platform.add_job_to_log_recover(job)
        return log_recovered
 
@@ -2726,9 +2726,6 @@
        if self.update_from_file(store_change):
            save = store_change
        Log.debug('Updating FAILED jobs')
-        write_log_status = False
-        for job in self.get_completed_failed_without_logs():
-            save = self.update_log_status(job, as_conf) if not save else save
        if not first_time:
            for job in self.get_failed():
                job.packed = False
diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py
index bbc609e7a10b1acd80cc94fc3169aa9507330388..61dd23760bc8ed0ec346c9aa9855c2d7ebaf4ef5 100644
--- a/autosubmit/platforms/platform.py
+++ b/autosubmit/platforms/platform.py
@@ -1,6 +1,7 @@
 import atexit
 import multiprocessing
 import queue  # only for the exception
+from copy import copy
 from os import _exit
 import setproctitle
 import locale
@@ -50,7 +51,7 @@ class UniqueQueue(Queue):
         unique_name = job.name+str(job.fail_count)  # We gather retrial per retrial
         if unique_name not in self.all_items:
             self.all_items.add(unique_name)
-            super().put(job, block, timeout)
+            super().put(copy(job), block, timeout)  # Without a copy, the process seems to modify the job for other retrials. My guess is that the object is not serialized until it is retrieved from the queue.
 
 
 class Platform(object):
@@ -850,7 +851,11 @@ class Platform(object):
         raise NotImplementedError
 
     def add_job_to_log_recover(self, job):
-        self.recovery_queue.put(job)
+        if job.id and int(job.id) != 0:
+            self.recovery_queue.put(job)
+        else:
+            Log.warning(f"Job {job.name} and retrial number {job.fail_count} has no job id. Autosubmit will not record this retrial.")
+            job.updated_log = True
 
     def connect(self, as_conf, reconnect=False):
         raise NotImplementedError
@@ -937,40 +942,46 @@
             Set[Any]: Updated set of jobs pending to process.
         """
         job = None
-        try:
-            while not self.recovery_queue.empty():
+
+        while not self.recovery_queue.empty():
+            try:
+                job = self.recovery_queue.get(
+                    timeout=1)  # Should be non-empty, but added a timeout for other possible errors.
+                job.children = set()  # Children can't be serialized, so we set it to an empty set for this process.
+                job.platform = self  # Change the original platform to this process platform.
+                job._log_recovery_retries = 0  # Reset the log recovery retries.
                 try:
-                    job = self.recovery_queue.get(
-                        timeout=1)  # Should be non-empty, but added a timeout for other possible errors.
-                    job.children = set()  # Children can't be serialized, so we set it to an empty set for this process.
-                    job.platform = self  # Change the original platform to this process platform.
-                    job._log_recovery_retries = 0  # Reset the log recovery retries.
                     job.retrieve_logfiles(self, raise_error=True)
-                    if job.status == Status.FAILED:
-                        Log.result(f"{identifier} Sucessfully recovered log files for job {job.name} and retrial:{job.fail_count}")
-                except queue.Empty:
-                    pass
-
-            # This second while is to keep retring the failed jobs.
-            # With the unique queue, the main process won't send the job again, so we have to store it here.
-            while len(jobs_pending_to_process) > 0:  # jobs that had any issue during the log retrieval
-                job = jobs_pending_to_process.pop()
-                job._log_recovery_retries += 1
-                Log.debug(
-                    f"{identifier} (Retrial number: {job._log_recovery_retries}) Recovering log files for job {job.name}")
-                job.retrieve_logfiles(self, raise_error=True)
-                Log.result(f"{identifier} (Retrial) Successfully recovered log files for job {job.name}")
-        except Exception as e:
-            Log.info(f"{identifier} Error while recovering logs: {str(e)}")
-            try:
-                if job and job._log_recovery_retries < 5:  # If log retrieval failed, add it to the pending jobs to process. Avoids to keep trying the same job forever.
+                    Log.result(
+                        f"{identifier} Successfully recovered log for job '{job.name}' and retry '{job.fail_count}'.")
+                except:
                     jobs_pending_to_process.add(job)
-                self.connected = False
-                Log.info(f"{identifier} Attempting to restore connection")
-                self.restore_connection(None)  # Always restore the connection on a failure.
-                Log.result(f"{identifier} Sucessfully reconnected.")
-            except:
-                pass
+                    job._log_recovery_retries += 1
+                    Log.warning(f"{identifier} (Retrial) Failed to recover log for job '{job.name}' and retry '{job.fail_count}'.")
+            except queue.Empty:
+                pass
+
+        if len(jobs_pending_to_process) > 0:  # Restore the connection if there was an issue with one or more jobs.
+            self.restore_connection(None)
+
+        # This second while is to keep retrying the failed jobs.
+        # With the unique queue, the main process won't send the job again, so we have to store it here.
+        while len(jobs_pending_to_process) > 0:  # jobs that had any issue during the log retrieval
+            job = jobs_pending_to_process.pop()
+            job._log_recovery_retries += 1
+            try:
+                job.retrieve_logfiles(self, raise_error=True)
+                job._log_recovery_retries += 1
+            except:
+                if job._log_recovery_retries < 5:
+                    jobs_pending_to_process.add(job)
+                Log.warning(
+                    f"{identifier} (Retrial) Failed to recover log for job '{job.name}' and retry '{job.fail_count}'.")
+            Log.result(
+                f"{identifier} (Retrial) Successfully recovered log for job '{job.name}' and retry '{job.fail_count}'.")
+        if len(jobs_pending_to_process) > 0:
+            self.restore_connection(None)  # Restore the connection if there was an issue with one or more jobs.
+
         return jobs_pending_to_process
 
     def recover_platform_job_logs(self) -> None:
diff --git a/test/unit/test_log_recovery.py b/test/unit/test_log_recovery.py
index d45f1579148d97f8a0adff763eb99c2c287160ef..86c62984b322cad4f9e5455e05a2f54fd5aceb41 100644
--- a/test/unit/test_log_recovery.py
+++ b/test/unit/test_log_recovery.py
@@ -142,7 +142,7 @@ def test_log_recovery_recover_log(prepare_test, local, mocker, as_conf):
     print(prepare_test.strpath)
     mocker.patch('autosubmit.platforms.platform.max', return_value=0)
     local.keep_alive_timeout = 20
-    mocker.patch('autosubmit.job.job.Job.write_stats')  # Tested in test_database_regression.py
+    mocker.patch('autosubmit.job.job.Job.write_stats')  # Tested in test_run_command_intregation.py
     local.spawn_log_retrieval_process(as_conf)
     local.work_event.set()
     job = Job('t000', '0000', Status.COMPLETED, 0)
diff --git a/test/unit/test_database_regression.py b/test/unit/test_run_command_intregation.py
similarity index 63%
rename from test/unit/test_database_regression.py
rename to test/unit/test_run_command_intregation.py
index 57fc36a864c4672074592618f33d954638a79026..5d0f9ab34fc48cc22fca4bb2fbe70cca819f3c92 100644
--- a/test/unit/test_database_regression.py
+++ b/test/unit/test_run_command_intregation.py
@@ -1,13 +1,10 @@
 import shutil
-
 import pytest
 from pathlib import Path
+from autosubmitconfigparser.config.configcommon import AutosubmitConfig
 from autosubmit.autosubmit import Autosubmit
-from log.log import Log
 import os
 import pwd
-from autosubmit.platforms.locplatform import LocalPlatform
-
 from test.unit.utils.common import create_database, init_expid
 import sqlite3
 
@@ -16,13 +13,13 @@ def _get_script_files_path() -> Path:
     return Path(__file__).resolve().parent / 'files'
 
 
-# Maybe this should be a regression test
+# TODO expand the tests to test Slurm, PSPlatform, Ecplatform whenever possible
 @pytest.fixture
-def db_tmpdir(tmpdir_factory):
-    folder = tmpdir_factory.mktemp(f'db_tests')
+def run_tmpdir(tmpdir_factory):
+    folder = tmpdir_factory.mktemp('run_tests')
     os.mkdir(folder.join('scratch'))
-    os.mkdir(folder.join('db_tmp_dir'))
+    os.mkdir(folder.join('run_tmp_dir'))
     file_stat = os.stat(f"{folder.strpath}")
     file_owner_id = file_stat.st_uid
     file_owner = pwd.getpwuid(file_owner_id).pw_name
@@ -63,14 +60,14 @@ path = {folder}
 
 
 @pytest.fixture
-def prepare_db(db_tmpdir):
+def prepare_run(run_tmpdir):
     # touch as_misc
     # remove files under t000/conf
-    conf_folder = Path(f"{db_tmpdir.strpath}/t000/conf")
+    conf_folder = Path(f"{run_tmpdir.strpath}/t000/conf")
     shutil.rmtree(conf_folder)
     os.makedirs(conf_folder)
-    platforms_path = Path(f"{db_tmpdir.strpath}/t000/conf/platforms.yml")
-    main_path = Path(f"{db_tmpdir.strpath}/t000/conf/main.yml")
+    platforms_path = Path(f"{run_tmpdir.strpath}/t000/conf/platforms.yml")
+    main_path = Path(f"{run_tmpdir.strpath}/t000/conf/AAAmain.yml")
     # Add each platform to test
     with platforms_path.open('w') as f:
         f.write(f"""
@@ -80,7 +77,7 @@ PLATFORMS:
 """)
 
     with main_path.open('w') as f:
-        f.write(f"""
+        f.write("""
 EXPERIMENT:
     # List of start dates
     DATELIST: '20000101'
@@ -89,7 +86,7 @@ EXPERIMENT:
     # Unit of the chunk size. Can be hour, day, month, or year.
     CHUNKSIZEUNIT: month
     # Size of each chunk.
-    CHUNKSIZE: '4'
+    CHUNKSIZE: '2'
     # Number of chunks of the experiment.
     NUMCHUNKS: '3'
     CHUNKINI: ''
@@ -100,10 +97,10 @@ CONFIG:
     # Current version of Autosubmit.
     AUTOSUBMIT_VERSION: ""
     # Total number of jobs in the workflow.
-    TOTALJOBS: 20
+    TOTALJOBS: 3
    # Maximum number of jobs permitted in the waiting status.
-    MAXWAITINGJOBS: 20
-    SAFETYSLEEPTIME: 1
+    MAXWAITINGJOBS: 3
+    SAFETYSLEEPTIME: 0
 DEFAULT:
     # Job experiment ID.
     EXPID: "t000"
@@ -117,9 +114,9 @@ project:
     # Folder to hold the project sources.
     PROJECT_DESTINATION: local_project
 """)
-    expid_dir = Path(f"{db_tmpdir.strpath}/scratch/whatever/{db_tmpdir.owner}/t000")
-    dummy_dir = Path(f"{db_tmpdir.strpath}/scratch/whatever/{db_tmpdir.owner}/t000/dummy_dir")
-    real_data = Path(f"{db_tmpdir.strpath}/scratch/whatever/{db_tmpdir.owner}/t000/real_data")
+    expid_dir = Path(f"{run_tmpdir.strpath}/scratch/whatever/{run_tmpdir.owner}/t000")
+    dummy_dir = Path(f"{run_tmpdir.strpath}/scratch/whatever/{run_tmpdir.owner}/t000/dummy_dir")
+    real_data = Path(f"{run_tmpdir.strpath}/scratch/whatever/{run_tmpdir.owner}/t000/real_data")
     # We write some dummy data inside the scratch_dir
     os.makedirs(expid_dir, exist_ok=True)
     os.makedirs(dummy_dir, exist_ok=True)
@@ -129,24 +126,120 @@ project:
         f.write('dummy data')
     # create some dummy absolute symlinks in expid_dir to test migrate function
     (real_data / 'dummy_symlink').symlink_to(dummy_dir / 'dummy_file')
-    return db_tmpdir
+    return run_tmpdir
+
+
+def check_db_fields(run_tmpdir, expected_entries, final_status):
+    """
+    Check that the database contains the expected number of entries and that all fields contain data after a completed run.
+    """
+    # Test database exists.
+    job_data = Path(f"{run_tmpdir.strpath}/job_data_t000.db")
+    autosubmit_db = Path(f"{run_tmpdir.strpath}/tests.db")
+    assert job_data.exists()
+    assert autosubmit_db.exists()
+
+    # Check job_data info
+    conn = sqlite3.connect(job_data)
+    conn.row_factory = sqlite3.Row
+    c = conn.cursor()
+    c.execute("SELECT * FROM job_data")
+    rows = c.fetchall()
+    assert len(rows) == expected_entries
+    # Convert rows to a list of dictionaries
+    rows_as_dicts = [dict(row) for row in rows]
+    # Tune the print so it is more readable, so it is easier to debug in case of failure
+    column_names = rows_as_dicts[0].keys() if rows_as_dicts else []
+    column_widths = [max(len(str(row[col])) for row in rows_as_dicts + [dict(zip(column_names, column_names))]) for col
+                     in column_names]
+    print(f"Experiment folder: {run_tmpdir.strpath}")
+    header = " | ".join(f"{name:<{width}}" for name, width in zip(column_names, column_widths))
+    print(f"\n{header}")
+    print("-" * len(header))
+    # Print the rows
+    for row_dict in rows_as_dicts:  # always print, for debug purposes
+        print(" | ".join(f"{str(row_dict[col]):<{width}}" for col, width in zip(column_names, column_widths)))
+    for row_dict in rows_as_dicts:
+        # Check that all fields contain data, except extra_data, children, and platform_output
+        # Check that submit, start and finish are > 0
+        assert row_dict["submit"] > 0 and row_dict["finish"] != 1970010101
+        assert row_dict["start"] > 0 and row_dict["finish"] != 1970010101
+        assert row_dict["finish"] > 0 and row_dict["finish"] != 1970010101
+        assert row_dict["status"] == final_status
+        for key in [key for key in row_dict.keys() if
+                    key not in ["status", "finish", "submit", "start", "extra_data", "children", "platform_output"]]:
+            assert str(row_dict[key]) != str("")
+    c.close()
+    conn.close()
+
+
+def check_exit_code(final_status, exit_code):
+    """
+    Check that the exit code is correct.
+    """
+    if final_status == "FAILED":
+        assert exit_code > 0
+    else:
+        assert exit_code == 0
+
+
+def check_files_recovered(run_tmpdir, log_dir):
+    """
+    Check that all files are recovered after a run.
+    """
+    # Check that logs are recovered and that all stat files exist.
+    as_conf = AutosubmitConfig("t000")
+    as_conf.reload()
+    retrials = as_conf.experiment_data['JOBS']['JOB'].get('RETRIALS', 0)
+    for f in log_dir.glob('*'):
+        assert not any(str(f).endswith(f".{i}.err") or str(f).endswith(f".{i}.out") for i in range(retrials + 1))
+    stat_files = [str(f).split("_")[-1] for f in log_dir.glob('*') if "STAT" in str(f)]
+    for i in range(retrials + 1):
+        assert str(i) in stat_files
+
+
+def init_run(run_tmpdir, jobs_data):
+    """
+    Initialize the run, writing the jobs.yml file and creating the experiment.
+ """ + # write jobs_data + jobs_path = Path(f"{run_tmpdir.strpath}/t000/conf/jobs.yml") + log_dir = Path(f"{run_tmpdir.strpath}/t000/tmp/LOG_t000") + with jobs_path.open('w') as f: + f.write(jobs_data) + + # Create + init_expid(os.environ["AUTOSUBMIT_CONFIGURATION"], platform='local', expid='t000', create=True, test_type='test') + + # This is set in _init_log which is not called + as_misc = Path(f"{run_tmpdir.strpath}/t000/conf/as_misc.yml") + with as_misc.open('w') as f: + f.write(""" + AS_MISC: True + ASMISC: + COMMAND: run + AS_COMMAND: run + """) + return log_dir -@pytest.mark.parametrize("jobs_data, expected_count, final_status", [ + +@pytest.mark.parametrize("jobs_data, expected_db_entries, final_status", [ # Success (""" + EXPERIMENT: + NUMCHUNKS: '3' JOBS: job: SCRIPT: | echo "Hello World with id=Success" PLATFORM: local - DEPENDENCIES: job-1 RUNNING: chunk wallclock: 00:01 - retrials: 2 - """, 1, "COMPLETED"), + """, 3, "COMPLETED"), # Number of jobs # Success wrapper (""" + EXPERIMENT: + NUMCHUNKS: '40' JOBS: job: SCRIPT: | @@ -155,32 +248,28 @@ project: PLATFORM: local RUNNING: chunk wallclock: 00:01 - retrials: 2 wrappers: wrapper: JOBS_IN_WRAPPER: job TYPE: vertical - """, 1, "COMPLETED"), + """, 40, "COMPLETED"), # Number of jobs # Failure (""" JOBS: job: SCRIPT: | - echo "Hello World with id=FAILED" - exit 1 - DEPENDENCIES: job-1 + decho "Hello World with id=FAILED" PLATFORM: local RUNNING: chunk wallclock: 00:01 - retrials: 2 - """, 3, "FAILED"), + retrials: 2 # In local, it started to fail at 18 retrials. + """, (2+1)*3, "FAILED"), # Retries set (N + 1) * number of jobs to run # Failure wrappers (""" JOBS: job: SCRIPT: | - echo "Hello World with id=FAILED + wrappers" - exit 1 + decho "Hello World with id=FAILED + wrappers" PLATFORM: local DEPENDENCIES: job-1 RUNNING: chunk @@ -190,71 +279,13 @@ project: wrapper: JOBS_IN_WRAPPER: job TYPE: vertical - """, 3, "FAILED"), + """, (2+1)*1, "FAILED"), # Retries set (N + 1) * job chunk 1 ( the rest shouldn't run ) ], ids=["Success", "Success with wrapper", "Failure", "Failure with wrapper"]) -def test_db(db_tmpdir, prepare_db, jobs_data, expected_count, final_status, mocker): - # write jobs_data - jobs_path = Path(f"{db_tmpdir.strpath}/t000/conf/jobs.yml") - with jobs_path.open('w') as f: - f.write(jobs_data) - - # Create - init_expid(os.environ["AUTOSUBMIT_CONFIGURATION"], platform='local', expid='t000', create=True, test_type='test') - - # This is set in _init_log which is not called - as_misc = Path(f"{db_tmpdir.strpath}/t000/conf/as_misc.yml") - with as_misc.open('w') as f: - f.write(f""" - AS_MISC: True - ASMISC: - COMMAND: run - AS_COMMAND: run - """) - +def test_run_uninterrupted(run_tmpdir, prepare_run, jobs_data, expected_db_entries, final_status, mocker): + log_dir = init_run(run_tmpdir, jobs_data) # Run the experiment - with mocker.patch('autosubmit.platforms.platform.max', return_value=20): - Autosubmit.run_experiment(expid='t000') - - # Test database exists. 
-    job_data = Path(f"{db_tmpdir.strpath}/job_data_t000.db")
-    autosubmit_db = Path(f"{db_tmpdir.strpath}/tests.db")
-    assert job_data.exists()
-    assert autosubmit_db.exists()
-
-    # Check job_data info
-    conn = sqlite3.connect(job_data)
-    conn.row_factory = sqlite3.Row
-    c = conn.cursor()
-    c.execute("SELECT * FROM job_data")
-    rows = c.fetchall()
-    # Convert rows to a list of dictionaries
-    rows_as_dicts = [dict(row) for row in rows]
-    # Tune the print so it is more readable, so it is easier to debug in case of failure
-    column_names = rows_as_dicts[0].keys() if rows_as_dicts else []
-    column_widths = [max(len(str(row[col])) for row in rows_as_dicts + [dict(zip(column_names, column_names))]) for col
-                     in column_names]
-    print(f"Experiment folder: {db_tmpdir.strpath}")
-    header = " | ".join(f"{name:<{width}}" for name, width in zip(column_names, column_widths))
-    print(f"\n{header}")
-    print("-" * len(header))
-    # Print the rows
-    for row_dict in rows_as_dicts:  # always print, for debug proposes
-        print(" | ".join(f"{str(row_dict[col]):<{width}}" for col, width in zip(column_names, column_widths)))
-    for row_dict in rows_as_dicts:
-        # Check that all fields contain data, except extra_data, children, and platform_output
-        # Check that submit, start and finish are > 0
-        assert row_dict["submit"] > 0 and row_dict["finish"] != 1970010101
-        assert row_dict["start"] > 0 and row_dict["finish"] != 1970010101
-        assert row_dict["finish"] > 0 and row_dict["finish"] != 1970010101
-        assert row_dict["status"] == final_status
-        for key in [key for key in row_dict.keys() if
-                    key not in ["status", "finish", "submit", "start", "extra_data", "children", "platform_output"]]:
-            assert str(row_dict[key]) != str("")
-    # Check that the job_data table has the expected number of entries
-    c.execute("SELECT job_name, COUNT(*) as count FROM job_data GROUP BY job_name")
-    count_rows = c.fetchall()
-    for row in count_rows:
-        assert row["count"] == expected_count
-    # Close the cursor and connection
-    c.close()
-    conn.close()
+    exit_code = Autosubmit.run_experiment(expid='t000')
+    # TODO: the pipeline is not yet returning 0 or 1, so check_exit_code(final_status, exit_code) is not called here.
+    # TODO: Verify that job statuses are correct. Consider calling Autosubmit.load_job_list.
+    check_db_fields(run_tmpdir, expected_db_entries, final_status)
+    check_files_recovered(run_tmpdir, log_dir)