From d68d5dd2fb72d678be3b5985e80fbea984387d07 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 13 Mar 2023 16:45:02 +0100 Subject: [PATCH 01/12] Refactored run_experiment into multiple functions before starting the main issue --- autosubmit/autosubmit.py | 691 ++++++++++++++++++--------------------- 1 file changed, 312 insertions(+), 379 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 8122146d5..31ca13e71 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -20,7 +20,7 @@ import threading import traceback import requests import collections - +import platform from .job.job_packager import JobPackager from .job.job_exceptions import WrongTemplateException from .platforms.paramiko_submitter import ParamikoSubmitter @@ -1569,242 +1569,315 @@ class Autosubmit: return wrapper_job, save @staticmethod + def wrapper_notify(as_conf, expid, wrapper_job): + if as_conf.get_notifications() == "true": + for inner_job in wrapper_job.job_list: + Autosubmit.job_notify(as_conf, expid, inner_job, inner_job.prev_status,{}) + @staticmethod + def job_notify(as_conf,expid,job,job_prev_status,job_changes_tracker): + job_changes_tracker[job.name] = (job_prev_status, job.status) + if as_conf.get_notifications() == "true": + if Status.VALUE_TO_KEY[job.status] in job.notify_on: + Notifier.notify_status_change(MailNotifier(BasicConfig), expid, job.name, + Status.VALUE_TO_KEY[job_prev_status], + Status.VALUE_TO_KEY[job.status], + as_conf.experiment_data["MAIL"]["TO"]) + return job_changes_tracker + @staticmethod + def check_wrappers(as_conf, job_list, platforms_to_test, expid): + jobs_to_check = dict() + job_changes_tracker = dict() + save = False + for platform in platforms_to_test: + queuing_jobs = job_list.get_in_queue_grouped_id(platform) + Log.debug('Checking jobs for platform={0}'.format(platform.name)) + for job_id, job in queuing_jobs.items(): + # Check Wrappers one-by-one + if job_list.job_package_map and job_id in 
job_list.job_package_map: + wrapper_job, save = Autosubmit.manage_wrapper_job(as_conf, job_list, platform, + job_id) + # Notifications e-mail + Autosubmit.wrapper_notify(as_conf, expid, wrapper_job) + # Detect and store changes for the GUI + job_changes_tracker = {job.name: ( + job.prev_status, job.status) for job in wrapper_job.job_list if + job.prev_status != job.status} + else: # Adds to a list all running jobs to be checked. + if job.status == Status.FAILED: + continue + job_prev_status = job.status + # If exist key has been pressed and previous status was running, do not check + if not (Autosubmit.exit is True and job_prev_status == Status.RUNNING): + if platform.name in jobs_to_check: + jobs_to_check[platform.name].append([job, job_prev_status]) + else: + jobs_to_check[platform.name] = [[job, job_prev_status]] + return jobs_to_check,job_changes_tracker + @staticmethod + def check_wrapper_stored_status(as_conf,job_list): + + # if packages_dict attr is in job_list + if hasattr(job_list, "packages_dict"): + for package_name, jobs in job_list.packages_dict.items(): + from .job.job import WrapperJob + wrapper_status = Status.SUBMITTED + all_completed = True + running = False + queuing = False + failed = False + hold = False + submitted = False + if jobs[0].status == Status.RUNNING or jobs[0].status == Status.COMPLETED: + running = True + for job in jobs: + if job.status == Status.QUEUING: + queuing = True + all_completed = False + elif job.status == Status.FAILED: + failed = True + all_completed = False + elif job.status == Status.HELD: + hold = True + all_completed = False + elif job.status == Status.SUBMITTED: + submitted = True + all_completed = False + if all_completed: + wrapper_status = Status.COMPLETED + elif hold: + wrapper_status = Status.HELD + else: + if running: + wrapper_status = Status.RUNNING + elif queuing: + wrapper_status = Status.QUEUING + elif submitted: + wrapper_status = Status.SUBMITTED + elif failed: + wrapper_status = Status.FAILED + else: 
+ wrapper_status = Status.SUBMITTED + wrapper_job = WrapperJob(package_name, jobs[0].id, wrapper_status, 0, jobs, + None, + None, jobs[0].platform, as_conf, jobs[0].hold) + job_list.job_package_map[jobs[0].id] = wrapper_job + return job_list + @staticmethod + def get_historical_database(expid, job_list, as_conf): + exp_history = None + try: + # Historical Database: Can create a new run if there is a difference in the number of jobs or if the current run does not exist. + exp_history = ExperimentHistory(expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, + historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) + exp_history.initialize_database() + exp_history.process_status_changes(job_list.get_job_list(), as_conf.get_chunk_size_unit(), + as_conf.get_chunk_size(), + current_config=as_conf.get_full_config_as_json()) + Autosubmit.database_backup(expid) + except Exception as e: + try: + Autosubmit.database_fix(expid) + # This error is important + except Exception as e: + pass + try: + ExperimentStatus(expid).set_as_running() + except Exception as e: + # Connection to status database ec_earth.db can fail. + # API worker will fix the status. + Log.debug( + "Autosubmit couldn't set your experiment as running on the autosubmit times database: {1}. 
Exception: {0}".format( + str(e), os.path.join(BasicConfig.DB_DIR, BasicConfig.AS_TIMES_DB)), 7003) + return exp_history + @staticmethod + def process_historical_data_iteration(job_list,job_changes_tracker, exp_history, expid): + exp_history = ExperimentHistory(expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, + historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) + if len(job_changes_tracker) > 0: + exp_history.process_job_list_changes_to_experiment_totals(job_list.get_job_list()) + Autosubmit.database_backup(expid) + return exp_history + @staticmethod + def prepare_run(expid, notransitive=False, update_version=False, start_time=None, start_after=None, + run_only_members=None,recover = False): + + host = platform.node() + as_conf = AutosubmitConfig(expid, BasicConfig, YAMLParserFactory()) + as_conf.check_conf_files(running_time=True, force_load=True) + if not recover: + try: + # Handling starting time + AutosubmitHelper.handle_start_time(start_time) + # Start after completion trigger block + AutosubmitHelper.handle_start_after(start_after, expid, BasicConfig()) + # Handling run_only_members + except AutosubmitCritical as e: + raise + except BaseException as e: + raise AutosubmitCritical("Failure during setting the start time check trace for details", 7014, str(e)) + os.system('clear') + signal.signal(signal.SIGINT, signal_handler) + safetysleeptime = as_conf.get_safetysleeptime() + retrials = as_conf.get_retrials() + Log.debug("The Experiment name is: {0}", expid) + Log.debug("Sleep: {0}", safetysleeptime) + Log.debug("Default retrials: {0}", retrials) + pkl_dir = os.path.join( + BasicConfig.LOCAL_ROOT_DIR, expid, 'pkl') + Log.debug( + "Starting from job list restored from {0} files", pkl_dir) + + submitter = Autosubmit._get_submitter(as_conf) + submitter.load_platforms(as_conf) + if recover: + Log.info("Recovering job_list") + try: + job_list = Autosubmit.load_job_list( + expid, as_conf, notransitive=notransitive) + except IOError as e: + raise AutosubmitError( + 
"Job_list not found", 6016, str(e)) + except AutosubmitCritical as e: + raise AutosubmitCritical( + "Corrupted job_list, backup couldn't be restored", 7040, e.message) + except BaseException as e: + raise AutosubmitCritical( + "Corrupted job_list, backup couldn't be restored", 7040, str(e)) + Log.debug("Length of the jobs list: {0}", len(job_list)) + if recover: + Log.info("Recovering parameters info") + Autosubmit._load_parameters( + as_conf, job_list, submitter.platforms) + # check the job list script creation + Log.debug("Checking experiment templates...") + platforms_to_test = set() + hpcarch = as_conf.get_platform() + for job in job_list.get_job_list(): + if job.platform_name is None or job.platform_name == "": + job.platform_name = hpcarch + # noinspection PyTypeChecker + try: + job.platform = submitter.platforms[job.platform_name.upper()] + except Exception as e: + raise AutosubmitCritical( + "hpcarch={0} not found in the platforms configuration file".format(job.platform_name), + 7014) + # noinspection PyTypeChecker + if job.status not in (Status.COMPLETED, Status.SUSPENDED): + platforms_to_test.add(job.platform) + try: + job_list.check_scripts(as_conf) + except Exception as e: + raise AutosubmitCritical( + "Error while checking job templates", 7014, str(e)) + Log.debug("Loading job packages") + try: + packages_persistence = JobPackagePersistence(os.path.join( + BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) + except IOError as e: + raise AutosubmitError( + "job_packages not found", 6016, str(e)) + if as_conf.experiment_data.get("WRAPPERS",None) is not None: + os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, + expid, "pkl", "job_packages_" + expid + ".db"), 0o644) + try: + packages = packages_persistence.load() + except IOError as e: + raise AutosubmitError( + "job_packages not found", 6016, str(e)) + Log.debug("Processing job packages") + try: + for (exp_id, package_name, job_name) in packages: + if package_name not in 
job_list.packages_dict: + job_list.packages_dict[package_name] = [] + job_list.packages_dict[package_name].append(job_list.get_job_by_name(job_name)) + job_list = Autosubmit.check_wrapper_stored_status(job_list, as_conf) + except Exception as e: + raise AutosubmitCritical( + "Autosubmit failed while processing job packages. This might be due to a change in your experiment configuration files after 'autosubmit create' was performed.", + 7014, str(e)) + if recover: + Log.info("Recovering wrappers... Done") + + Log.debug("Checking job_list current status") + job_list.update_list(as_conf, first_time=True) + job_list.save() + if not recover: + Log.info("Autosubmit is running with v{0}", Autosubmit.autosubmit_version) + # Before starting main loop, setup historical database tables and main information + + allowed_members = AutosubmitHelper.get_allowed_members(run_only_members, as_conf) + if allowed_members: + # Set allowed members after checks have been performed. This triggers the setter and main logic of the -rm feature. + job_list.run_members = allowed_members + Log.result( + "Only jobs with member value in {0} or no member will be allowed in this run. 
Also, those jobs already SUBMITTED, QUEUING, or RUNNING will be allowed to complete and will be tracked.".format( + str(allowed_members))) + if not recover: + # Related to TWO_STEP_START new variable defined in expdef + unparsed_two_step_start = as_conf.get_parse_two_step_start() + if unparsed_two_step_start != "": + job_list.parse_jobs_by_filter(unparsed_two_step_start) + Log.debug("Running job data structure") + exp_history = Autosubmit.get_historical_database(expid, as_conf,job_list) + # establish the connection to all platforms + Autosubmit.restore_platforms(platforms_to_test) + + return job_list, submitter , exp_history, host , as_conf, platforms_to_test, packages_persistence + else: + return job_list, submitter , as_conf , platforms_to_test, packages_persistence, True + @staticmethod + def get_iteration_info(as_conf,job_list): + total_jobs = len(job_list.get_job_list()) + Log.info("\n\n{0} of {1} jobs remaining ({2})".format( + total_jobs - len(job_list.get_completed()), total_jobs, time.strftime("%H:%M"))) + if len(job_list.get_failed()) > 0: + Log.info("{0} jobs has been failed ({1})".format( + len(job_list.get_failed()), time.strftime("%H:%M"))) + safetysleeptime = as_conf.get_safetysleeptime() + default_retrials = as_conf.get_retrials() + check_wrapper_jobs_sleeptime = as_conf.get_wrapper_check_time() + Log.debug("Sleep: {0}", safetysleeptime) + Log.debug("Number of retrials: {0}", default_retrials) + return total_jobs, safetysleeptime, default_retrials, check_wrapper_jobs_sleeptime + @staticmethod def run_experiment(expid, notransitive=False, update_version=False, start_time=None, start_after=None, run_only_members=None): """ Runs and experiment (submitting all the jobs properly and repeating its execution in case of failure). 
+ :param expid: the experiment id + :param notransitive: if True, the transitive closure of the graph is not computed + :param update_version: if True, the version of the experiment is updated + :param start_time: the time at which the experiment should start + :param start_after: the expid after which the experiment should start + :param run_only_members: the members to run + :return: None - :param run_only_ºmembers: - :param start_after: - :param start_time: - :param update_version: - :param notransitive: - :type expid: str - :param expid: identifier of experiment to be run - :return: True if run to the end, False otherwise - :rtype: bool """ try: exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid) tmp_path = os.path.join(exp_path, BasicConfig.LOCAL_TMP_DIR) - import platform - host = platform.node() except BaseException as e: raise AutosubmitCritical("Failure during the loading of the experiment configuration, check file paths", 7014, str(e)) - as_conf = AutosubmitConfig(expid, BasicConfig, YAMLParserFactory()) - as_conf.check_conf_files(running_time=True, force_load=True) - try: - # Handling starting time - AutosubmitHelper.handle_start_time(start_time) - - # Start after completion trigger block - AutosubmitHelper.handle_start_after(start_after, expid, BasicConfig()) - - # Handling run_only_members - allowed_members = AutosubmitHelper.get_allowed_members(run_only_members, as_conf) - except AutosubmitCritical as e: - raise - except BaseException as e: - raise AutosubmitCritical("Failure during setting the start time check trace for details", 7014, str(e)) # checking if there is a lock file to avoid multiple running on the same expid try: with portalocker.Lock(os.path.join(tmp_path, 'autosubmit.lock'), timeout=1): try: - Log.info( - "Preparing .lock file to avoid multiple instances with same experiment id") - os.system('clear') - signal.signal(signal.SIGINT, signal_handler) - - hpcarch = as_conf.get_platform() - safetysleeptime = 
as_conf.get_safetysleeptime() - retrials = as_conf.get_retrials() - submitter = Autosubmit._get_submitter(as_conf) - submitter.load_platforms(as_conf) - Log.debug("The Experiment name is: {0}", expid) - Log.debug("Sleep: {0}", safetysleeptime) - Log.debug("Default retrials: {0}", retrials) - Log.info("Starting job submission...") - pkl_dir = os.path.join( - BasicConfig.LOCAL_ROOT_DIR, expid, 'pkl') - try: - job_list = Autosubmit.load_job_list( - expid, as_conf, notransitive=notransitive) - except IOError as e: - raise AutosubmitError( - "Job_list not found", 6016, str(e)) - except AutosubmitCritical as e: - raise AutosubmitCritical( - "Corrupted job_list, backup couldn't be restored", 7040, e.message) - except BaseException as e: - raise AutosubmitCritical( - "Corrupted job_list, backup couldn't be restored", 7040, str(e)) - - Log.debug( - "Starting from job list restored from {0} files", pkl_dir) - Log.debug("Length of the jobs list: {0}", len(job_list)) - Autosubmit._load_parameters( - as_conf, job_list, submitter.platforms) - # check the job list script creation - Log.debug("Checking experiment templates...") - platforms_to_test = set() - for job in job_list.get_job_list(): - if job.platform_name is None or job.platform_name == "": - job.platform_name = hpcarch - # noinspection PyTypeChecker - try: - job.platform = submitter.platforms[job.platform_name.upper()] - except Exception as e: - raise AutosubmitCritical( - "hpcarch={0} not found in the platforms configuration file".format(job.platform_name), - 7014) - # noinspection PyTypeChecker - if job.status not in (Status.COMPLETED, Status.SUSPENDED): - platforms_to_test.add(job.platform) - try: - job_list.check_scripts(as_conf) - except Exception as e: - raise AutosubmitCritical( - "Error while checking job templates", 7014, str(e)) - Log.debug("Loading job packages") - try: - packages_persistence = JobPackagePersistence(os.path.join( - BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) - except 
IOError as e: - raise AutosubmitError( - "job_packages not found", 6016, str(e)) - except BaseException as e: - raise AutosubmitCritical( - "Corrupted job_packages, python 2.7 and sqlite doesn't allow to restore these packages", - 7040, str(e)) - if as_conf.get_wrapper_type() != 'none': - os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, - expid, "pkl", "job_packages_" + expid + ".db"), 0o644) - try: - packages = packages_persistence.load() - except IOError as e: - raise AutosubmitError( - "job_packages not found", 6016, str(e)) - except BaseException as e: - raise AutosubmitCritical( - "Corrupted job_packages, python 2.7 and sqlite doesn't allow to restore these packages(will work on autosubmit4)", - 7040, str(e)) - Log.debug("Processing job packages") - - try: - for (exp_id, package_name, job_name) in packages: - if package_name not in job_list.packages_dict: - job_list.packages_dict[package_name] = [] - job_list.packages_dict[package_name].append( - job_list.get_job_by_name(job_name)) - for package_name, jobs in job_list.packages_dict.items(): - from .job.job import WrapperJob - wrapper_status = Status.SUBMITTED - all_completed = True - running = False - queuing = False - failed = False - hold = False - submitted = False - if jobs[0].status == Status.RUNNING or jobs[0].status == Status.COMPLETED: - running = True - for job in jobs: - if job.status == Status.QUEUING: - queuing = True - all_completed = False - elif job.status == Status.FAILED: - failed = True - all_completed = False - elif job.status == Status.HELD: - hold = True - all_completed = False - elif job.status == Status.SUBMITTED: - submitted = True - all_completed = False - if all_completed: - wrapper_status = Status.COMPLETED - elif hold: - wrapper_status = Status.HELD - else: - if running: - wrapper_status = Status.RUNNING - elif queuing: - wrapper_status = Status.QUEUING - elif submitted: - wrapper_status = Status.SUBMITTED - elif failed: - wrapper_status = Status.FAILED - else: - wrapper_status = 
Status.SUBMITTED - wrapper_job = WrapperJob(package_name, jobs[0].id, wrapper_status, 0, jobs, - None, - None, jobs[0].platform, as_conf, jobs[0].hold) - job_list.job_package_map[jobs[0].id] = wrapper_job - except Exception as e: - raise AutosubmitCritical( - "Autosubmit failed while processing job packages. This might be due to a change in your experiment configuration files after 'autosubmit create' was performed.", - 7014, str(e)) - - Log.debug("Checking job_list current status") - job_list.update_list(as_conf, first_time=True) - job_list.save() - - Log.info("Autosubmit is running with v{0}", Autosubmit.autosubmit_version) - # Before starting main loop, setup historical database tables and main information - Log.debug("Running job data structure") - try: - # Historical Database: Can create a new run if there is a difference in the number of jobs or if the current run does not exist. - exp_history = ExperimentHistory(expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, - historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) - exp_history.initialize_database() - exp_history.process_status_changes(job_list.get_job_list(), as_conf.get_chunk_size_unit(), - as_conf.get_chunk_size(), - current_config=as_conf.get_full_config_as_json()) - Autosubmit.database_backup(expid) - except Exception as e: - try: - Autosubmit.database_fix(expid) - # This error is important - except Exception as e: - pass - try: - ExperimentStatus(expid).set_as_running() - except Exception as e: - # Connection to status database ec_earth.db can fail. - # API worker will fix the status. - Log.debug( - "Autosubmit couldn't set your experiment as running on the autosubmit times database: {1}. Exception: {0}".format( - str(e), os.path.join(BasicConfig.DB_DIR, BasicConfig.AS_TIMES_DB)), 7003) - if allowed_members: - # Set allowed members after checks have been performed. This triggers the setter and main logic of the -rm feature. 
- job_list.run_members = allowed_members - Log.result( - "Only jobs with member value in {0} or no member will be allowed in this run. Also, those jobs already SUBMITTED, QUEUING, or RUNNING will be allowed to complete and will be tracked.".format( - str(allowed_members))) + Log.debug("Preparing run") + job_list, submitter , exp_history, host , as_conf, platforms_to_test, packages_persistence = Autosubmit.prepare_run(expid, notransitive, update_version, start_time, start_after, run_only_members) except AutosubmitCritical as e: e.message += " HINT: check the CUSTOM_DIRECTIVE syntax in your jobs configuration files." raise AutosubmitCritical(e.message, 7014, e.trace) except Exception as e: - raise AutosubmitCritical( - "Error in run initialization", 7014, str(e)) # Changing default to 7014 - # Related to TWO_STEP_START new variable defined in expdef - unparsed_two_step_start = as_conf.get_parse_two_step_start() - if unparsed_two_step_start != "": - job_list.parse_jobs_by_filter(unparsed_two_step_start) - - main_loop_retrials = 3650 # Hard limit of tries 3650 tries at 15-120seconds sleep each try - # establish the connection to all platforms - - Autosubmit.restore_platforms(platforms_to_test) - save = True - # @main - Log.debug("Running main loop") + raise AutosubmitCritical("Error in run initialization", 7014, str(e)) # Changing default to 7014 + Log.debug("Running main running loop") ######################### # AUTOSUBMIT - MAIN LOOP ######################### - # Main loop. 
Finishing when all jobs have been submitted - + # Main loop + main_loop_retrials = 3650 # Hard limit of tries 3650 tries at 15-120seconds sleep each try while job_list.get_active(): # Log.info("FD: {0}".format(log.fd_show.fd_table_status_str())) try: @@ -1818,88 +1891,34 @@ class Autosubmit: # Log.info("FD 2: {0}".format(log.fd_show.fd_table_status_str())) except BaseException as e: raise AutosubmitError("Config files seems to not be accessible", 6040, str(e)) - total_jobs = len(job_list.get_job_list()) - Log.info("\n\n{0} of {1} jobs remaining ({2})".format( - total_jobs - len(job_list.get_completed()), total_jobs, time.strftime("%H:%M"))) - if len(job_list.get_failed()) > 0: - Log.info("{0} jobs has been failed ({1})".format( - len(job_list.get_failed()), time.strftime("%H:%M"))) - safetysleeptime = as_conf.get_safetysleeptime() - default_retrials = as_conf.get_retrials() - check_wrapper_jobs_sleeptime = as_conf.get_wrapper_check_time() - Log.debug("Sleep: {0}", safetysleeptime) - Log.debug("Number of retrials: {0}", default_retrials) + total_jobs, safetysleeptime, default_retrials, check_wrapper_jobs_sleeptime = Autosubmit.get_iteration_info(as_conf,job_list) - if save: # previous iteration - job_list.backup_save() save = False slurm = [] job_changes_tracker = {} # to easily keep track of changes per iteration - Log.debug("End of checking") # End Check Current jobs # jobs_to_check are no wrapped jobs. jobs_to_check = dict() + if save: # previous iteration + job_list.backup_save() # Check Wrappers and add non-wrapped jobs to be checked later. + jobs_to_check,job_changes_tracker = Autosubmit.check_wrappers(as_conf, job_list, platforms_to_test, expid) + # Jobs to check are grouped by platform. 
for platform in platforms_to_test: - queuing_jobs = job_list.get_in_queue_grouped_id(platform) - Log.debug('Checking jobs for platform={0}'.format(platform.name)) - for job_id, job in queuing_jobs.items(): - # Check Wrappers one-by-one - # TODO CHECK this before continue - if job_list.job_package_map and job_id in job_list.job_package_map: - wrapper_job, save = Autosubmit.manage_wrapper_job(as_conf, job_list, platform, - job_id) - - # Notifications e-mail - if as_conf.get_notifications() == "true": - for inner_job in wrapper_job.job_list: - if inner_job.prev_status != inner_job.status: - if Status.VALUE_TO_KEY[inner_job.status] in inner_job.notify_on: - Notifier.notify_status_change(MailNotifier(BasicConfig), expid, - inner_job.name, - Status.VALUE_TO_KEY[ - inner_job.prev_status], - Status.VALUE_TO_KEY[inner_job.status], - as_conf.experiment_data["MAIL"]["TO"]) - # Detect and store changes for the GUI - job_changes_tracker = {job.name: ( - job.prev_status, job.status) for job in wrapper_job.job_list if - job.prev_status != job.status} - else: # Adds to a list all running jobs to be checked. 
- if job.status == Status.FAILED: - continue - job_prev_status = job.status - # If exist key has been pressed and previous status was running, do not check - if not (Autosubmit.exit is True and job_prev_status == Status.RUNNING): - if platform.name in jobs_to_check: - jobs_to_check[platform.name].append([job, job_prev_status]) - else: - jobs_to_check[platform.name] = [[job, job_prev_status]] - for platform in platforms_to_test: + Log.info("Obtaining the list of jobs to check for platform {0}".format(platform.name)) platform_jobs = jobs_to_check.get(platform.name, []) - - # not all platforms are doing this check simultaneously if len(platform_jobs) == 0: continue + # Check all running/queuing jobs of current platform + Log.info("Checking {0} jobs for platform {1}".format(len(platform_jobs), platform.name)) platform.check_Alljobs(platform_jobs, as_conf) - # mail notification + # mail notification ( in case of changes) for job, job_prev_status in jobs_to_check[platform.name]: - if job_prev_status != job.update_status(as_conf): - # Keeping track of changes - job_changes_tracker[job.name] = (job_prev_status, job.status) - if as_conf.get_notifications() == "true": - if Status.VALUE_TO_KEY[job.status] in job.notify_on: - Notifier.notify_status_change(MailNotifier(BasicConfig), expid, job.name, - Status.VALUE_TO_KEY[job_prev_status], - Status.VALUE_TO_KEY[job.status], - as_conf.experiment_data["MAIL"]["TO"]) - save = True - save2 = job_list.update_list( - as_conf, submitter=submitter) + Autosubmit.job_notify(as_conf,expid,job,job_prev_status,job_changes_tracker) + job_list.update_list(as_conf, submitter=submitter) job_list.save() if len(job_list.get_ready()) > 0: - save = Autosubmit.submit_ready_jobs( - as_conf, job_list, platforms_to_test, packages_persistence, hold=False) + Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence, hold=False) job_list.update_list(as_conf, submitter=submitter) job_list.save() @@ -1910,21 +1929,15 @@ class 
Autosubmit: job_list.save() # Safe spot to store changes try: - exp_history = ExperimentHistory(expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, - historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) - if len(job_changes_tracker) > 0: - exp_history.process_job_list_changes_to_experiment_totals(job_list.get_job_list()) - Autosubmit.database_backup(expid) + exp_history = Autosubmit.process_historical_data_iteration(job_list, job_changes_tracker, exp_history, expid) except BaseException as e: Log.printlog("Historic database seems corrupted, AS will repair it and resume the run", Log.INFO) try: Autosubmit.database_fix(expid) - exp_history = ExperimentHistory(expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, - historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) - if len(job_changes_tracker) > 0: - exp_history.process_job_list_changes_to_experiment_totals(job_list.get_job_list()) - Autosubmit.database_backup(expid) + exp_history = Autosubmit.process_historical_data_iteration(job_list, + job_changes_tracker, + exp_history, expid) except Exception as e: Log.warning( "Couldn't recover the Historical database, AS will continue without it, GUI may be affected") @@ -1951,101 +1964,20 @@ class Autosubmit: Log.printlog("Error trying to store failed job count", Log.WARNING) Log.result("Storing failed job count...done") while not recovery and main_loop_retrials > 0: + delay = min(15 * consecutive_retrials, 120) main_loop_retrials = main_loop_retrials - 1 sleep(delay) consecutive_retrials = consecutive_retrials + 1 Log.info("Waiting {0} seconds before continue".format(delay)) try: - as_conf.reload(force_load=True) - Log.info("Recovering job_list...") - job_list = Autosubmit.load_job_list( - expid, as_conf, notransitive=notransitive) - Log.info("Recovering job_list... Done") - if allowed_members: - # Set allowed members after checks have been performed. This triggers the setter and main logic of the -rm feature. 
- job_list.run_members = allowed_members - Log.result( - "Only jobs with member value in {0} or no member will be allowed in this run. Also, those jobs already SUBMITTED, QUEUING, or RUNNING will be allowed to complete and will be tracked.".format( - str(allowed_members))) - platforms_to_test = set() - Log.info("Recovering platform information...") - for job in job_list.get_job_list(): - if job.platform_name is None or job.platform_name == "": - job.platform_name = hpcarch - job.platform = submitter.platforms[job.platform_name] - platforms_to_test.add(job.platform) - - Log.info("Recovering platform information... Done") - Log.info("Recovering Failure count...") - for job in job_list.get_job_list(): - if job.name in list(failed_names.keys()): - job.fail_count = failed_names[job.name] - Log.info("Recovering Failure count... Done") - - Log.info("Recovering parameters...") - Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) - # Recovery wrapper [Packages] - Log.info("Recovering Wrappers...") - packages_persistence = JobPackagePersistence( - os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) - packages = packages_persistence.load() - for (exp_id, package_name, job_name) in packages: - if package_name not in job_list.packages_dict: - job_list.packages_dict[package_name] = [] - job_list.packages_dict[package_name].append( - job_list.get_job_by_name(job_name)) - # Recovery wrappers [Wrapper status] - for package_name, jobs in job_list.packages_dict.items(): - from .job.job import WrapperJob - wrapper_status = Status.SUBMITTED - all_completed = True - running = False - queuing = False - failed = False - hold = False - submitted = False - if jobs[0].status == Status.RUNNING or jobs[0].status == Status.COMPLETED: - running = True - for job in jobs: - if job.status == Status.QUEUING: - queuing = True - all_completed = False - elif job.status == Status.FAILED: - failed = True - all_completed = False - elif job.status == 
Status.HELD: - hold = True - all_completed = False - elif job.status == Status.SUBMITTED: - submitted = True - all_completed = False - if all_completed: - wrapper_status = Status.COMPLETED - elif hold: - wrapper_status = Status.HELD - else: - if running: - wrapper_status = Status.RUNNING - elif queuing: - wrapper_status = Status.QUEUING - elif submitted: - wrapper_status = Status.SUBMITTED - elif failed: - wrapper_status = Status.FAILED - else: - wrapper_status = Status.SUBMITTED - wrapper_job = WrapperJob(package_name, jobs[0].id, wrapper_status, 0, jobs, - None, - None, jobs[0].platform, as_conf, jobs[0].hold) - job_list.job_package_map[jobs[0].id] = wrapper_job - Log.info("Recovering wrappers... Done") - job_list.update_list(as_conf) - Log.info("Saving recovered job list...") - job_list.save() - Log.info("Saving recovered job list... Done") - recovery = True - Log.result("Recover of job_list is completed") + job_list, submitter, as_conf, platforms_to_test, packages_persistence, recovery = Autosubmit.prepare_run(expid, + notransitive, + update_version, + start_time, + start_after, + run_only_members, + recover=True) except AutosubmitError as e: recovery = False Log.result("Recover of job_list has fail {0}".format(e.message)) @@ -2057,7 +1989,6 @@ class Autosubmit: Log.result("Recover of job_list has fail {0}".format(str(e))) # Restore platforms and try again, to avoid endless loop with failed configuration, a hard limit is set. 
reconnected = False - mail_notify = True times = 0 max_times = 10 Log.info("Restoring the connection to all experiment platforms") @@ -2100,6 +2031,8 @@ class Autosubmit: raise AutosubmitCritical(message, 7000) except BaseException as e: # If this happens, there is a bug in the code or an exception not-well caught raise AutosubmitCritical("There is a bug in the code, please contact via git", 7070, str(e)) + + Log.result("No more jobs to run.") # Updating job data header with current information when experiment ends try: -- GitLab From 7bf87f76deade858e3fcee9a8771d31cd9320c96 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 14 Mar 2023 11:03:58 +0100 Subject: [PATCH 02/12] refactored run_experiment ( split giga function into smaller ones ) --- autosubmit/autosubmit.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 31ca13e71..61aa1d2b8 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1914,7 +1914,8 @@ class Autosubmit: platform.check_Alljobs(platform_jobs, as_conf) # mail notification ( in case of changes) for job, job_prev_status in jobs_to_check[platform.name]: - Autosubmit.job_notify(as_conf,expid,job,job_prev_status,job_changes_tracker) + if job_prev_status != job.update_status(as_conf): + Autosubmit.job_notify(as_conf,expid,job,job_prev_status,job_changes_tracker) job_list.update_list(as_conf, submitter=submitter) job_list.save() if len(job_list.get_ready()) > 0: -- GitLab From 25fe7a0ae6349938d4c20906e4f20c9c7ce7b9d7 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 14 Mar 2023 11:16:51 +0100 Subject: [PATCH 03/12] New parameter: Autosubmit.RECOVERY_RETRIALS. 
Default value 3650 ( 72h-122h) Added 0 or -1 will disable the hard_limit and autosubmit will try to recover itself forever ( we may need it in the future) --- autosubmit/autosubmit.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 61aa1d2b8..73e3ec6da 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1877,7 +1877,8 @@ class Autosubmit: # AUTOSUBMIT - MAIN LOOP ######################### # Main loop - main_loop_retrials = 3650 # Hard limit of tries 3650 tries at 15-120seconds sleep each try + max_recovery_retrials = as_conf.experiment_data.get("AUTOSUBMIT",{}).get("RECOVERY_RETRIALS",3650) # Hard limit of tries 3650 tries at 15-120seconds sleep each try + recovery_retrials = 0 while job_list.get_active(): # Log.info("FD: {0}".format(log.fd_show.fd_table_status_str())) try: @@ -1964,10 +1965,10 @@ class Autosubmit: except BaseException as e: Log.printlog("Error trying to store failed job count", Log.WARNING) Log.result("Storing failed job count...done") - while not recovery and main_loop_retrials > 0: + while not recovery and (recovery_retrials < max_recovery_retrials or max_recovery_retrials <= 0 ): delay = min(15 * consecutive_retrials, 120) - main_loop_retrials = main_loop_retrials - 1 + recovery_retrials += 1 sleep(delay) consecutive_retrials = consecutive_retrials + 1 Log.info("Waiting {0} seconds before continue".format(delay)) @@ -1995,8 +1996,8 @@ class Autosubmit: Log.info("Restoring the connection to all experiment platforms") consecutive_retrials = 1 delay = min(15 * consecutive_retrials, 120) - while not reconnected and main_loop_retrials > 0: - main_loop_retrials = main_loop_retrials - 1 + while not reconnected and (recovery_retrials < max_recovery_retrials or max_recovery_retrials <= 0 ) : + recovery_retrials += 1 Log.info("Recovering the remote platform connection") Log.info("Waiting {0} seconds before continue".format(delay)) 
sleep(delay) @@ -2021,9 +2022,8 @@ class Autosubmit: reconnected = False except BaseException: reconnected = False - if main_loop_retrials <= 0: - raise AutosubmitCritical( - "Autosubmit Encounter too much errors during running time, limit of 4hours reached", + if recovery_retrials == max_recovery_retrials and max_recovery_retrials > 0: + raise AutosubmitCritical(f"Autosubmit Encounter too much errors during running time, limit of {max_recovery_retrials*120} reached", 7051, e.message) except AutosubmitCritical as e: # Critical errors can't be recovered. Failed configuration or autosubmit error raise AutosubmitCritical(e.message, e.code, e.trace) -- GitLab From 930a5f4e66e7c03ee6169719c6e5e3ad3f904ea3 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 14 Mar 2023 11:17:53 +0100 Subject: [PATCH 04/12] * change AUTOSUBMIT.RECOVERY_RETRIALS -> CONFIG.RECOVERY_RETRIALS --- autosubmit/autosubmit.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 73e3ec6da..6caeece6f 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1877,7 +1877,7 @@ class Autosubmit: # AUTOSUBMIT - MAIN LOOP ######################### # Main loop - max_recovery_retrials = as_conf.experiment_data.get("AUTOSUBMIT",{}).get("RECOVERY_RETRIALS",3650) # Hard limit of tries 3650 tries at 15-120seconds sleep each try + max_recovery_retrials = as_conf.experiment_data.get("CONFIG",{}).get("RECOVERY_RETRIALS",3650) # Hard limit of tries 3650 tries at 15-120seconds sleep each try recovery_retrials = 0 while job_list.get_active(): # Log.info("FD: {0}".format(log.fd_show.fd_table_status_str())) -- GitLab From 1dfc22619999bad17c990814a57cc3f9ae04fe25 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 14 Mar 2023 15:14:46 +0100 Subject: [PATCH 05/12] Added job_start_info #758 --- autosubmit/autosubmit.py | 28 ++++++++++------------ autosubmit/history/experiment_history.py | 5 +--- autosubmit/platforms/paramiko_platform.py | 
5 +++- autosubmit/platforms/slurmplatform.py | 29 ++++++++++++++++++----- 4 files changed, 41 insertions(+), 26 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 6caeece6f..781ccf68a 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1819,7 +1819,7 @@ class Autosubmit: if unparsed_two_step_start != "": job_list.parse_jobs_by_filter(unparsed_two_step_start) Log.debug("Running job data structure") - exp_history = Autosubmit.get_historical_database(expid, as_conf,job_list) + exp_history = Autosubmit.get_historical_database(expid, job_list,as_conf) # establish the connection to all platforms Autosubmit.restore_platforms(platforms_to_test) @@ -1877,7 +1877,8 @@ class Autosubmit: # AUTOSUBMIT - MAIN LOOP ######################### # Main loop - max_recovery_retrials = as_conf.experiment_data.get("CONFIG",{}).get("RECOVERY_RETRIALS",3650) # Hard limit of tries 3650 tries at 15-120seconds sleep each try + # Recovery retrials, when platforms have issues. Hard limit is set just in case is an Autosubmit bug or bad config and the minium duration is the weekend (72 h). + max_recovery_retrials = as_conf.experiment_data.get("CONFIG",{}).get("RECOVERY_RETRIALS",3650) # (72h - 122h ) recovery_retrials = 0 while job_list.get_active(): # Log.info("FD: {0}".format(log.fd_show.fd_table_status_str())) @@ -1895,8 +1896,6 @@ class Autosubmit: total_jobs, safetysleeptime, default_retrials, check_wrapper_jobs_sleeptime = Autosubmit.get_iteration_info(as_conf,job_list) save = False - slurm = [] - job_changes_tracker = {} # to easily keep track of changes per iteration # End Check Current jobs # jobs_to_check are no wrapped jobs. jobs_to_check = dict() @@ -1906,12 +1905,13 @@ class Autosubmit: jobs_to_check,job_changes_tracker = Autosubmit.check_wrappers(as_conf, job_list, platforms_to_test, expid) # Jobs to check are grouped by platform. 
for platform in platforms_to_test: - Log.info("Obtaining the list of jobs to check for platform {0}".format(platform.name)) platform_jobs = jobs_to_check.get(platform.name, []) if len(platform_jobs) == 0: + Log.info("No jobs to check for platform {0}".format(platform.name)) continue - # Check all running/queuing jobs of current platform + Log.info("Checking {0} jobs for platform {1}".format(len(platform_jobs), platform.name)) + # Check all running/queuing jobs of current platform platform.check_Alljobs(platform_jobs, as_conf) # mail notification ( in case of changes) for job, job_prev_status in jobs_to_check[platform.name]: @@ -4057,16 +4057,14 @@ class Autosubmit: @staticmethod def database_backup(expid): try: - database_path = os.path.join(BasicConfig.JOBDATA_DIR, "job_data_{0}.db".format(expid)) + database_path= os.path.join(BasicConfig.JOBDATA_DIR, "job_data_{0}.db".format(expid)) backup_path = os.path.join(BasicConfig.JOBDATA_DIR, "job_data_{0}.sql".format(expid)) - tmp_backup_path = os.path.join(BasicConfig.JOBDATA_DIR, "job_data_{0}_tmp.sql".format(expid)) - command = "sqlite3 {0} .dump > {1} ".format(database_path, tmp_backup_path) + command = "sqlite3 {0} .dump > {1} ".format(database_path, backup_path) Log.debug("Backing up jobs_data...") out = subprocess.call(command, shell=True) Log.debug("Jobs_data database backup completed.") except BaseException as e: Log.debug("Jobs_data database backup failed.") - @staticmethod def database_fix(expid): """ @@ -4077,7 +4075,7 @@ class Autosubmit: :return: :rtype: """ - os.umask(0) # Overrides user permissions + os.umask(0) # Overrides user permissions current_time = int(time.time()) corrupted_db_path = os.path.join(BasicConfig.JOBDATA_DIR, "job_data_{0}_corrupted.db".format(expid)) @@ -4087,15 +4085,15 @@ class Autosubmit: dump_file_path = os.path.join(BasicConfig.JOBDATA_DIR, dump_file_name) bash_command = 'cat {1} | sqlite3 {0}'.format(database_path, dump_file_path) try: - if os.path.exists(database_path): - 
result = os.popen("mv {0} {1}".format(database_path, corrupted_db_path)).read() + if os.path.exists(database_path): + os.popen("mv {0} {1}".format(database_path, corrupted_db_path)).read() time.sleep(1) Log.info("Original database moved.") try: exp_history = ExperimentHistory(expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) Log.info("Restoring from sql") - result = os.popen(bash_command).read() + os.popen(bash_command).read() exp_history.initialize_database() except: @@ -4106,7 +4104,7 @@ class Autosubmit: historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) exp_history.initialize_database() except Exception as exp: - Log.warning(str(exp)) + Log.critical(str(exp)) @staticmethod def archive(expid, noclean=True, uncompress=True): diff --git a/autosubmit/history/experiment_history.py b/autosubmit/history/experiment_history.py index b92c54d15..7f6a49648 100644 --- a/autosubmit/history/experiment_history.py +++ b/autosubmit/history/experiment_history.py @@ -193,12 +193,9 @@ class ExperimentHistory: try: try: current_experiment_run_dc = self.manager.get_experiment_run_dc_with_max_id() - except: - current_experiment_run_dc = 0 - try: update_these_changes = self._get_built_list_of_changes(job_list) except: - # May be a new experiment run, so we don't have any changes to update ( could happen due job_data issues from 3.14.0) + current_experiment_run_dc = 0 update_these_changes = [] #("no runs") should_create_new_run = self.should_we_create_a_new_run(job_list, len(update_these_changes), current_experiment_run_dc, chunk_unit, chunk_size,create) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index e8327b9ad..5c562d7b6 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -685,7 +685,6 @@ class ParamikoPlatform(Platform): Log.error( 'check_job() The job id ({0}) status is {1}.', job.id, job_status) job.new_status = 
job_status - reason = str() if self.type == 'slurm' and len(in_queue_jobs) > 0: cmd = self.get_queue_status_cmd(list_queue_jobid) self.send_command(cmd) @@ -715,6 +714,10 @@ class ParamikoPlatform(Platform): job.new_status = Status.WAITING job.platform.send_command( job.platform.cancel_cmd + " {0}".format(job.id)) + else: + self.send_command(self.get_estimated_queue_time_cmd(job.id)) + estimated_time = self.parse_estimated_time(self._ssh_output) + Log.info(f"{job.name} will be elegible to run the day {estimated_time.get('date', 'Unknown')} at {estimated_time.get('time', 'Unknown')} due: {reason[1:-1]}") else: for job in job_list: diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index 53d0d115d..96e71de6a 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -18,6 +18,7 @@ # along with Autosubmit. If not, see . import locale import os +from contextlib import suppress from time import sleep from time import mktime from time import time @@ -308,13 +309,16 @@ class SlurmPlatform(ParamikoPlatform): self.send_command(cmd) queue_status = self._ssh_output - reason = str() reason = self.parse_queue_reason(queue_status, job.id) - if reason == '(JobHeldUser)': - return True - else: + self.send_command(self.get_estimated_queue_time_cmd(job.id)) + estimated_time = self.parse_estimated_time(self._ssh_output) + if reason == '(JobHeldAdmin)': # Job is held by the system self.send_command("scancel {0}".format(job.id)) return False + else: + Log.info( + f"The {job.name} will be elegible to run the day {estimated_time.get('date', 'Unknown')} at {estimated_time.get('time', 'Unknown')}\nQueuing reason is: {reason}") + return True except BaseException as e: try: self.send_command("scancel {0}".format(job.id)) @@ -549,6 +553,8 @@ class SlurmPlatform(ParamikoPlatform): def get_checkAlljobs_cmd(self, jobs_id): return "sacct -n -X --jobs {1} -o jobid,State".format(self.host, jobs_id) + def 
get_estimated_queue_time_cmd(self, job_id): + return f"scontrol -o show JobId {job_id} | grep -Po '(?<=EligibleTime=)[0-9-:T]*'" def get_queue_status_cmd(self, job_id): return 'squeue -j {0} -o %A,%R'.format(job_id) @@ -566,10 +572,21 @@ class SlurmPlatform(ParamikoPlatform): def parse_queue_reason(self, output, job_id): reason = [x.split(',')[1] for x in output.splitlines() if x.split(',')[0] == str(job_id)] - if len(reason) > 0: - return reason[0] + if type(reason) is list: + # convert reason to str + return ''.join(reason) return reason + def parse_estimated_time(self, output): + parsed_output = {} + parsed_output["date"] = "Unknown" + parsed_output["time"] = "Unknown" + with suppress(Exception): + output = output.split("T") + parsed_output["date"] = output[0] + parsed_output["time"] = output[1] + return parsed_output + @staticmethod def wrapper_header(filename, queue, project, wallclock, num_procs, dependency, directives, threads, method="asthreads", partition=""): if method == 'srun': -- GitLab From b5c3a7cb90d6132210f32e2131a5e74f6c75d5e5 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 21 Mar 2023 14:01:50 +0100 Subject: [PATCH 06/12] added to paramiko_platform --- autosubmit/platforms/paramiko_platform.py | 20 ++++++++++++++++++++ autosubmit/platforms/slurmplatform.py | 13 +++++++++++++ 2 files changed, 33 insertions(+) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 5c562d7b6..2579480eb 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -488,6 +488,26 @@ class ParamikoPlatform(Platform): """ raise NotImplementedError + def get_estimated_queue_time_cmd(self, job_id): + """ + Returns command to get estimated queue time on remote platforms + + :param job_id: id of job to check + :param job_id: int + :return: command to get estimated queue time + :rtype: str + """ + raise NotImplementedError + def parse_estimated_time(self, output): + """ + Parses 
estimated queue time from output of get_estimated_queue_time_cmd + + :param output: output of get_estimated_queue_time_cmd + :type output: str + :return: estimated queue time + :rtype: + """ + raise NotImplementedError def check_job(self, job, default_status=Status.COMPLETED, retries=5, submit_hold_check=False, is_wrapper=False): """ Checks job running status diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index 96e71de6a..aa7df00f7 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -570,6 +570,13 @@ class SlurmPlatform(ParamikoPlatform): return 'sacct -n --jobs {0} -o JobId%25,State,NCPUS,NNodes,Submit,Start,End,ConsumedEnergy,MaxRSS%25,AveRSS%25'.format(job_id) def parse_queue_reason(self, output, job_id): + """ + Parses the queue reason from the output of the command + :param output: output of the command + :param job_id: job id + :return: queue reason + :rtype: str + """ reason = [x.split(',')[1] for x in output.splitlines() if x.split(',')[0] == str(job_id)] if type(reason) is list: @@ -578,6 +585,12 @@ class SlurmPlatform(ParamikoPlatform): return reason def parse_estimated_time(self, output): + """ + Parses the estimated time from the output of the command + :param output: output of the command + :return: estimated time date and time + :rtype: dict + """ parsed_output = {} parsed_output["date"] = "Unknown" parsed_output["time"] = "Unknown" -- GitLab From 4e61355c53d1aa4bb32519749d486a6c16bec4e8 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 21 Mar 2023 14:02:16 +0100 Subject: [PATCH 07/12] added to paramiko_platform --- autosubmit/platforms/paramiko_platform.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 2579480eb..d580d024d 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -493,9 +493,8 @@ class 
ParamikoPlatform(Platform): Returns command to get estimated queue time on remote platforms :param job_id: id of job to check - :param job_id: int + :param job_id: str :return: command to get estimated queue time - :rtype: str """ raise NotImplementedError def parse_estimated_time(self, output): -- GitLab From 6bec81a72228b19af6310af97b11a10d35c20f2b Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 21 Mar 2023 14:06:43 +0100 Subject: [PATCH 08/12] isinstance --- autosubmit/platforms/slurmplatform.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index aa7df00f7..8968c9ed6 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -579,7 +579,7 @@ class SlurmPlatform(ParamikoPlatform): """ reason = [x.split(',')[1] for x in output.splitlines() if x.split(',')[0] == str(job_id)] - if type(reason) is list: + if isinstance(reason,list): # convert reason to str return ''.join(reason) return reason -- GitLab From f7a9aab602ef78a4ae60206ff6bad080a3b09f9c Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 21 Mar 2023 14:12:45 +0100 Subject: [PATCH 09/12] update_version --- autosubmit/autosubmit.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 781ccf68a..8d0595b2d 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1698,7 +1698,7 @@ class Autosubmit: Autosubmit.database_backup(expid) return exp_history @staticmethod - def prepare_run(expid, notransitive=False, update_version=False, start_time=None, start_after=None, + def prepare_run(expid, notransitive=False, start_time=None, start_after=None, run_only_members=None,recover = False): host = platform.node() @@ -1866,7 +1866,7 @@ class Autosubmit: with portalocker.Lock(os.path.join(tmp_path, 'autosubmit.lock'), timeout=1): try: Log.debug("Preparing run") - job_list, submitter , 
exp_history, host , as_conf, platforms_to_test, packages_persistence = Autosubmit.prepare_run(expid, notransitive, update_version, start_time, start_after, run_only_members) + job_list, submitter , exp_history, host , as_conf, platforms_to_test, packages_persistence = Autosubmit.prepare_run(expid, notransitive,start_time, start_after, run_only_members) except AutosubmitCritical as e: e.message += " HINT: check the CUSTOM_DIRECTIVE syntax in your jobs configuration files." raise AutosubmitCritical(e.message, 7014, e.trace) @@ -1975,7 +1975,6 @@ class Autosubmit: try: job_list, submitter, as_conf, platforms_to_test, packages_persistence, recovery = Autosubmit.prepare_run(expid, notransitive, - update_version, start_time, start_after, run_only_members, -- GitLab From 2d9efe17cbf1f828e34403a29334819900dce151 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 21 Mar 2023 14:15:49 +0100 Subject: [PATCH 10/12] update_version removed from run_experiment as it is handled in init_logs --- autosubmit/autosubmit.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 8d0595b2d..b7c7915b5 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -636,8 +636,7 @@ class Autosubmit: Autosubmit._init_logs(args, args.logconsole, args.logfile, expid) if args.command == 'run': - return Autosubmit.run_experiment(args.expid, args.notransitive, args.update_version, args.start_time, - args.start_after, args.run_only_members) + return Autosubmit.run_experiment(args.expid, args.notransitive,args.start_time,args.start_after, args.run_only_members) elif args.command == 'expid': return Autosubmit.expid(args.description,args.HPC,args.copy, args.dummy,args.minimal_configuration,args.git_repo,args.git_branch,args.git_as_conf,args.operational,args.use_local_minimal) != '' elif args.command == 'delete': @@ -1841,13 +1840,11 @@ class Autosubmit: Log.debug("Number of retrials: {0}", default_retrials) return 
total_jobs, safetysleeptime, default_retrials, check_wrapper_jobs_sleeptime @staticmethod - def run_experiment(expid, notransitive=False, update_version=False, start_time=None, start_after=None, - run_only_members=None): + def run_experiment(expid, notransitive=False, start_time=None, start_after=None,run_only_members=None): """ Runs and experiment (submitting all the jobs properly and repeating its execution in case of failure). :param expid: the experiment id :param notransitive: if True, the transitive closure of the graph is not computed - :param update_version: if True, the version of the experiment is updated :param start_time: the time at which the experiment should start :param start_after: the expid after which the experiment should start :param run_only_members: the members to run -- GitLab From 3ef5867a1224ae6a7d0e6ccf5ab4186917b0880a Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 21 Mar 2023 16:21:19 +0100 Subject: [PATCH 11/12] normalized function return --- autosubmit/autosubmit.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index b7c7915b5..bee56eb63 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1822,9 +1822,9 @@ class Autosubmit: # establish the connection to all platforms Autosubmit.restore_platforms(platforms_to_test) - return job_list, submitter , exp_history, host , as_conf, platforms_to_test, packages_persistence + return job_list, submitter , exp_history, host , as_conf, platforms_to_test, packages_persistence, False else: - return job_list, submitter , as_conf , platforms_to_test, packages_persistence, True + return job_list, submitter , None, None, as_conf , platforms_to_test, packages_persistence, True @staticmethod def get_iteration_info(as_conf,job_list): total_jobs = len(job_list.get_job_list()) @@ -1863,7 +1863,7 @@ class Autosubmit: with portalocker.Lock(os.path.join(tmp_path, 'autosubmit.lock'), timeout=1): try: 
Log.debug("Preparing run") - job_list, submitter , exp_history, host , as_conf, platforms_to_test, packages_persistence = Autosubmit.prepare_run(expid, notransitive,start_time, start_after, run_only_members) + job_list, submitter , exp_history, host , as_conf, platforms_to_test, packages_persistence, _ = Autosubmit.prepare_run(expid, notransitive,start_time, start_after, run_only_members) except AutosubmitCritical as e: e.message += " HINT: check the CUSTOM_DIRECTIVE syntax in your jobs configuration files." raise AutosubmitCritical(e.message, 7014, e.trace) @@ -1970,7 +1970,7 @@ class Autosubmit: consecutive_retrials = consecutive_retrials + 1 Log.info("Waiting {0} seconds before continue".format(delay)) try: - job_list, submitter, as_conf, platforms_to_test, packages_persistence, recovery = Autosubmit.prepare_run(expid, + job_list, submitter, _, _, as_conf, platforms_to_test, packages_persistence, recovery = Autosubmit.prepare_run(expid, notransitive, start_time, start_after, -- GitLab From e25c2462144ff1b69b0c4914b2027d614eb6c58b Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 22 Mar 2023 11:56:06 +0100 Subject: [PATCH 12/12] Comments and docstrings --- autosubmit/autosubmit.py | 116 +++++++++++++++++++++++++++++++++------ 1 file changed, 98 insertions(+), 18 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index bee56eb63..11e926f30 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1584,9 +1584,16 @@ class Autosubmit: return job_changes_tracker @staticmethod def check_wrappers(as_conf, job_list, platforms_to_test, expid): + """ + Check wrappers and inner jobs status also order the non-wrapped jobs to be submitted by active platforms + :param as_conf: a AutosubmitConfig object + :param job_list: a JobList object + :param platforms_to_test: a list of Platform + :param expid: a string with the experiment id + :return: non-wrapped jobs to check and a dictionary with the changes in the jobs status + """ 
jobs_to_check = dict() job_changes_tracker = dict() - save = False for platform in platforms_to_test: queuing_jobs = job_list.get_in_queue_grouped_id(platform) Log.debug('Checking jobs for platform={0}'.format(platform.name)) @@ -1614,7 +1621,12 @@ class Autosubmit: return jobs_to_check,job_changes_tracker @staticmethod def check_wrapper_stored_status(as_conf,job_list): - + """ + Check if the wrapper job has been submitted and the inner jobs are in the queue. + :param as_conf: a BasicConfig object + :param job_list: a JobList object + :return: JobList object updated + """ # if packages_dict attr is in job_list if hasattr(job_list, "packages_dict"): for package_name, jobs in job_list.packages_dict.items(): @@ -1663,6 +1675,13 @@ class Autosubmit: return job_list @staticmethod def get_historical_database(expid, job_list, as_conf): + """ + Get the historical database for the experiment + :param expid: a string with the experiment id + :param job_list: a JobList object + :param as_conf: a AutosubmitConfig object + :return: a experiment history object + """ exp_history = None try: # Historical Database: Can create a new run if there is a difference in the number of jobs or if the current run does not exist. @@ -1689,7 +1708,15 @@ class Autosubmit: str(e), os.path.join(BasicConfig.DB_DIR, BasicConfig.AS_TIMES_DB)), 7003) return exp_history @staticmethod - def process_historical_data_iteration(job_list,job_changes_tracker, exp_history, expid): + def process_historical_data_iteration(job_list,job_changes_tracker, expid): + """ + Process the historical data for the current iteration. + :param job_list: a JobList object. + :param job_changes_tracker: a dictionary with the changes in the job status. + :param expid: a string with the experiment id. + :return: an ExperimentHistory object. 
+ """ + exp_history = ExperimentHistory(expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) if len(job_changes_tracker) > 0: @@ -1699,11 +1726,22 @@ class Autosubmit: @staticmethod def prepare_run(expid, notransitive=False, start_time=None, start_after=None, run_only_members=None,recover = False): - + """ + Prepare the run of the experiment. + :param expid: a string with the experiment id. + :param notransitive: a boolean to indicate for the experiment to not use transitive dependencies. + :param start_time: a string with the starting time of the experiment. + :param start_after: a string with the experiment id to start after. + :param run_only_members: a string with the members to run. + :param recover: a boolean to indicate if the experiment is recovering from a failure. + :return: a tuple + """ host = platform.node() + # Init the autosubmitconfigparser and check that every file exists and it is a valid configuration. as_conf = AutosubmitConfig(expid, BasicConfig, YAMLParserFactory()) as_conf.check_conf_files(running_time=True, force_load=True) if not recover: + # Database stuff, to check if the experiment is active or not. try: # Handling starting time AutosubmitHelper.handle_start_time(start_time) @@ -1716,18 +1754,25 @@ class Autosubmit: raise AutosubmitCritical("Failure during setting the start time check trace for details", 7014, str(e)) os.system('clear') signal.signal(signal.SIGINT, signal_handler) + # The time between running iterations, default to 10 seconds. Can be changed by the user safetysleeptime = as_conf.get_safetysleeptime() retrials = as_conf.get_retrials() Log.debug("The Experiment name is: {0}", expid) Log.debug("Sleep: {0}", safetysleeptime) Log.debug("Default retrials: {0}", retrials) + # Is where Autosubmit stores the job_list and wrapper packages to the disc. 
pkl_dir = os.path.join( BasicConfig.LOCAL_ROOT_DIR, expid, 'pkl') Log.debug( "Starting from job list restored from {0} files", pkl_dir) + # Loads the communication lib, always paramiko. + # Paramiko is the only way to communicate with the remote machines. Previously we had also Saga. submitter = Autosubmit._get_submitter(as_conf) submitter.load_platforms(as_conf) + # Tries to loads the job_list from disk, discarding any changes in running time ( if recovery ). + # Could also load a backup from previous iteration. + # The submit ready functions will cancel all job submitted if one submitted in that iteration had issues, so it should be safe to recover from a backup without losing job ids if recover: Log.info("Recovering job_list") try: @@ -1745,12 +1790,15 @@ class Autosubmit: Log.debug("Length of the jobs list: {0}", len(job_list)) if recover: Log.info("Recovering parameters info") + # This function name is not clear after the transformation it recieved across years. + # What it does, is to load and transform all as_conf.experiment_data into a 1D dict stored in job_list object. Autosubmit._load_parameters( as_conf, job_list, submitter.platforms) - # check the job list script creation Log.debug("Checking experiment templates...") platforms_to_test = set() hpcarch = as_conf.get_platform() + # Load only platforms used by the experiment, by looking at JOBS.$JOB.PLATFORM. So Autosubmit only establishes connections to the machines that are used. + # Also, it ignores platforms used by "COMPLETED/FAILED" jobs as they are no need any more. 
( in case of recovery or run a workflow that were already running ) for job in job_list.get_job_list(): if job.platform_name is None or job.platform_name == "": job.platform_name = hpcarch @@ -1764,18 +1812,22 @@ class Autosubmit: # noinspection PyTypeChecker if job.status not in (Status.COMPLETED, Status.SUSPENDED): platforms_to_test.add(job.platform) + # This function, looks at %JOBS.$JOB.FILE% ( mandatory ) and %JOBS.$JOB.CHECK% ( default True ). + # Checks the contents of the .sh/.py/r files and looks for AS placeholders. try: job_list.check_scripts(as_conf) except Exception as e: raise AutosubmitCritical( "Error while checking job templates", 7014, str(e)) Log.debug("Loading job packages") + # Packages == wrappers and jobs inside wrappers. Name is also missleading. try: packages_persistence = JobPackagePersistence(os.path.join( BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) except IOError as e: raise AutosubmitError( "job_packages not found", 6016, str(e)) + # Check if the user wants to continuing using wrappers and loads the appropiate info. if as_conf.experiment_data.get("WRAPPERS",None) is not None: os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl", "job_packages_" + expid + ".db"), 0o644) @@ -1790,6 +1842,7 @@ class Autosubmit: if package_name not in job_list.packages_dict: job_list.packages_dict[package_name] = [] job_list.packages_dict[package_name].append(job_list.get_job_by_name(job_name)) + # This function, checks the stored STATUS of jobs inside wrappers. Since "wrapper status" is a memory variable. 
job_list = Autosubmit.check_wrapper_stored_status(job_list, as_conf) except Exception as e: raise AutosubmitCritical( @@ -1804,7 +1857,7 @@ class Autosubmit: if not recover: Log.info("Autosubmit is running with v{0}", Autosubmit.autosubmit_version) # Before starting main loop, setup historical database tables and main information - + # Check if the user has launch autosubmit run with -rom option ( previously named -rm ) allowed_members = AutosubmitHelper.get_allowed_members(run_only_members, as_conf) if allowed_members: # Set allowed members after checks have been performed. This triggers the setter and main logic of the -rm feature. @@ -1813,13 +1866,17 @@ class Autosubmit: "Only jobs with member value in {0} or no member will be allowed in this run. Also, those jobs already SUBMITTED, QUEUING, or RUNNING will be allowed to complete and will be tracked.".format( str(allowed_members))) if not recover: - # Related to TWO_STEP_START new variable defined in expdef + # This function, looks at the "TWO_STEP_START" variable in the experiment configuration file. + # This may not be neccesary anymore as the same can be achieved by using the new DEPENDENCIES dict. + # I replicated the same functionality in the new DEPENDENCIES dict using crossdate wrappers of auto-monarch da ( documented in rst .) + # We can look at it when auto-monarch starts to use AS 4.0, now it is maintened for compatibility. unparsed_two_step_start = as_conf.get_parse_two_step_start() if unparsed_two_step_start != "": job_list.parse_jobs_by_filter(unparsed_two_step_start) Log.debug("Running job data structure") exp_history = Autosubmit.get_historical_database(expid, job_list,as_conf) # establish the connection to all platforms + # Restore is a missleading, it is actually a "connect" function when the recover flag is not set. 
Autosubmit.restore_platforms(platforms_to_test) return job_list, submitter , exp_history, host , as_conf, platforms_to_test, packages_persistence, False @@ -1827,6 +1884,12 @@ class Autosubmit: return job_list, submitter , None, None, as_conf , platforms_to_test, packages_persistence, True @staticmethod def get_iteration_info(as_conf,job_list): + """ + Prints the current iteration information + :param as_conf: autosubmit configuration object + :param job_list: job list object + :return: common parameters for the iteration + """ total_jobs = len(job_list.get_job_list()) Log.info("\n\n{0} of {1} jobs remaining ({2})".format( total_jobs - len(job_list.get_completed()), total_jobs, time.strftime("%H:%M"))) @@ -1851,6 +1914,7 @@ class Autosubmit: :return: None """ + # Initialize common folders try: exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid) tmp_path = os.path.join(exp_path, BasicConfig.LOCAL_TMP_DIR) @@ -1860,9 +1924,12 @@ class Autosubmit: # checking if there is a lock file to avoid multiple running on the same expid try: + # Portalocker is used to avoid multiple autosubmit running on the same experiment, we have to change this system in #806 with portalocker.Lock(os.path.join(tmp_path, 'autosubmit.lock'), timeout=1): try: Log.debug("Preparing run") + # This function is called only once, when the experiment is started. It is used to initialize the experiment and to check the correctness of the configuration files. + # If there are issues while running, this function will be called again to reinitialize the experiment. job_list, submitter , exp_history, host , as_conf, platforms_to_test, packages_persistence, _ = Autosubmit.prepare_run(expid, notransitive,start_time, start_after, run_only_members) except AutosubmitCritical as e: e.message += " HINT: check the CUSTOM_DIRECTIVE syntax in your jobs configuration files." @@ -1875,10 +1942,20 @@ class Autosubmit: ######################### # Main loop # Recovery retrials, when platforms have issues. 
Hard limit is set just in case is an Autosubmit bug or bad config and the minium duration is the weekend (72 h). + # Run experiment steps: + # 0. Prepare the experiment to start running it. + # 1. Check if there are jobs in the workflow that has to run (get_active) + # For each platform: + # 2. Check the status of all jobs in the current workflow that are queuing or running. Also updates all workflow jobs status by checking the status in the platform machines and job parent status. + # 3. Submit jobs that are on ready status. + # 4. When there are no more active jobs, wait until all log recovery threads finishes and exit Autosubmit. + # In case of issues, the experiment is reinitialized and the process starts with the last non-corrupted workflow status. + # User can always stop the run, and unless force killed, Autosubmit will exit in a clean way. + # Experiment run will always start from the last known workflow status. + max_recovery_retrials = as_conf.experiment_data.get("CONFIG",{}).get("RECOVERY_RETRIALS",3650) # (72h - 122h ) recovery_retrials = 0 while job_list.get_active(): - # Log.info("FD: {0}".format(log.fd_show.fd_table_status_str())) try: if Autosubmit.exit: Autosubmit.terminate(threading.enumerate()) @@ -1886,21 +1963,21 @@ class Autosubmit: # reload parameters changes Log.debug("Reloading parameters...") try: + # This function name is not clear after the transformation it recieved across years. + # What it does, is to load and transform all as_conf.experiment_data into a 1D dict stored in job_list object. Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) - # Log.info("FD 2: {0}".format(log.fd_show.fd_table_status_str())) except BaseException as e: raise AutosubmitError("Config files seems to not be accessible", 6040, str(e)) total_jobs, safetysleeptime, default_retrials, check_wrapper_jobs_sleeptime = Autosubmit.get_iteration_info(as_conf,job_list) save = False # End Check Current jobs - # jobs_to_check are no wrapped jobs. 
- jobs_to_check = dict() if save: # previous iteration job_list.backup_save() - # Check Wrappers and add non-wrapped jobs to be checked later. + # This function name is totally missleading, yes it check the status of the wrappers, but also orders jobs the jobs that are not wrapped by platform. jobs_to_check,job_changes_tracker = Autosubmit.check_wrappers(as_conf, job_list, platforms_to_test, expid) # Jobs to check are grouped by platform. + # platforms_to_test could be renamed to active_platforms or something like that. for platform in platforms_to_test: platform_jobs = jobs_to_check.get(platform.name, []) if len(platform_jobs) == 0: @@ -1908,19 +1985,23 @@ class Autosubmit: continue Log.info("Checking {0} jobs for platform {1}".format(len(platform_jobs), platform.name)) - # Check all running/queuing jobs of current platform + # Check all non-wrapped jobs status for the current platform platform.check_Alljobs(platform_jobs, as_conf) - # mail notification ( in case of changes) + # mail notification ( in case of changes ) for job, job_prev_status in jobs_to_check[platform.name]: if job_prev_status != job.update_status(as_conf): Autosubmit.job_notify(as_conf,expid,job,job_prev_status,job_changes_tracker) + # Updates all workflow status with the new information. job_list.update_list(as_conf, submitter=submitter) job_list.save() + # Submit jobs that are ready to run if len(job_list.get_ready()) > 0: Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence, hold=False) job_list.update_list(as_conf, submitter=submitter) job_list.save() - + # Submit jobs that are prepared to hold (if remote dependencies parameter are enabled) + # This currently is not used as SLURM not longer allow to jobs to adquire priority while in hold state. + # This only works for SLURM. 
( Prepare status cannot be achieved in other platforms )