diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 8122146d550e191aada90792fe4d22993cd4c406..11e926f30607de64c34d0ad33d0149b1cfc15bb9 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -20,7 +20,7 @@ import threading import traceback import requests import collections - +import platform from .job.job_packager import JobPackager from .job.job_exceptions import WrongTemplateException from .platforms.paramiko_submitter import ParamikoSubmitter @@ -636,8 +636,7 @@ class Autosubmit: Autosubmit._init_logs(args, args.logconsole, args.logfile, expid) if args.command == 'run': - return Autosubmit.run_experiment(args.expid, args.notransitive, args.update_version, args.start_time, - args.start_after, args.run_only_members) + return Autosubmit.run_experiment(args.expid, args.notransitive,args.start_time,args.start_after, args.run_only_members) elif args.command == 'expid': return Autosubmit.expid(args.description,args.HPC,args.copy, args.dummy,args.minimal_configuration,args.git_repo,args.git_branch,args.git_as_conf,args.operational,args.use_local_minimal) != '' elif args.command == 'delete': @@ -1569,244 +1568,394 @@ class Autosubmit: return wrapper_job, save @staticmethod - def run_experiment(expid, notransitive=False, update_version=False, start_time=None, start_after=None, - run_only_members=None): + def wrapper_notify(as_conf, expid, wrapper_job): + if as_conf.get_notifications() == "true": + for inner_job in wrapper_job.job_list: + Autosubmit.job_notify(as_conf, expid, inner_job, inner_job.prev_status,{}) + @staticmethod + def job_notify(as_conf,expid,job,job_prev_status,job_changes_tracker): + job_changes_tracker[job.name] = (job_prev_status, job.status) + if as_conf.get_notifications() == "true": + if Status.VALUE_TO_KEY[job.status] in job.notify_on: + Notifier.notify_status_change(MailNotifier(BasicConfig), expid, job.name, + Status.VALUE_TO_KEY[job_prev_status], + Status.VALUE_TO_KEY[job.status], + as_conf.experiment_data["MAIL"]["TO"]) + return job_changes_tracker + @staticmethod + def check_wrappers(as_conf, job_list, platforms_to_test, expid): + """ + Check wrappers and inner jobs status also order the non-wrapped jobs to be submitted by active platforms + :param as_conf: a AutosubmitConfig object + :param job_list: a JobList object + :param platforms_to_test: a list of Platform + :param expid: a string with the experiment id + :return: non-wrapped jobs to check and a dictionary with the changes in the jobs status + """ + jobs_to_check = dict() + job_changes_tracker = dict() + for platform in platforms_to_test: + queuing_jobs = job_list.get_in_queue_grouped_id(platform) + Log.debug('Checking jobs for platform={0}'.format(platform.name)) + for job_id, job in queuing_jobs.items(): + # Check Wrappers one-by-one + if job_list.job_package_map and job_id in job_list.job_package_map: + wrapper_job, save = Autosubmit.manage_wrapper_job(as_conf, job_list, platform, + job_id) + # Notifications e-mail + Autosubmit.wrapper_notify(as_conf, expid, wrapper_job) + # Detect and store changes for the GUI + job_changes_tracker = {job.name: ( + job.prev_status, job.status) for job in wrapper_job.job_list if + job.prev_status != job.status} + else: # Adds to a list all running jobs to be checked. 
+ if job.status == Status.FAILED: + continue + job_prev_status = job.status + # If exist key has been pressed and previous status was running, do not check + if not (Autosubmit.exit is True and job_prev_status == Status.RUNNING): + if platform.name in jobs_to_check: + jobs_to_check[platform.name].append([job, job_prev_status]) + else: + jobs_to_check[platform.name] = [[job, job_prev_status]] + return jobs_to_check,job_changes_tracker + @staticmethod + def check_wrapper_stored_status(as_conf,job_list): + """ + Check if the wrapper job has been submitted and the inner jobs are in the queue. + :param as_conf: a BasicConfig object + :param job_list: a JobList object + :return: JobList object updated + """ + # if packages_dict attr is in job_list + if hasattr(job_list, "packages_dict"): + for package_name, jobs in job_list.packages_dict.items(): + from .job.job import WrapperJob + wrapper_status = Status.SUBMITTED + all_completed = True + running = False + queuing = False + failed = False + hold = False + submitted = False + if jobs[0].status == Status.RUNNING or jobs[0].status == Status.COMPLETED: + running = True + for job in jobs: + if job.status == Status.QUEUING: + queuing = True + all_completed = False + elif job.status == Status.FAILED: + failed = True + all_completed = False + elif job.status == Status.HELD: + hold = True + all_completed = False + elif job.status == Status.SUBMITTED: + submitted = True + all_completed = False + if all_completed: + wrapper_status = Status.COMPLETED + elif hold: + wrapper_status = Status.HELD + else: + if running: + wrapper_status = Status.RUNNING + elif queuing: + wrapper_status = Status.QUEUING + elif submitted: + wrapper_status = Status.SUBMITTED + elif failed: + wrapper_status = Status.FAILED + else: + wrapper_status = Status.SUBMITTED + wrapper_job = WrapperJob(package_name, jobs[0].id, wrapper_status, 0, jobs, + None, + None, jobs[0].platform, as_conf, jobs[0].hold) + job_list.job_package_map[jobs[0].id] = wrapper_job + return job_list + @staticmethod + def get_historical_database(expid, job_list, as_conf): + """ + Get the historical database for the experiment + :param expid: a string with the experiment id + :param job_list: a JobList object + :param as_conf: a AutosubmitConfig object + :return: a experiment history object + """ + exp_history = None + try: + # Historical Database: Can create a new run if there is a difference in the number of jobs or if the current run does not exist. + exp_history = ExperimentHistory(expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, + historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) + exp_history.initialize_database() + exp_history.process_status_changes(job_list.get_job_list(), as_conf.get_chunk_size_unit(), + as_conf.get_chunk_size(), + current_config=as_conf.get_full_config_as_json()) + Autosubmit.database_backup(expid) + except Exception as e: + try: + Autosubmit.database_fix(expid) + # This error is important + except Exception as e: + pass + try: + ExperimentStatus(expid).set_as_running() + except Exception as e: + # Connection to status database ec_earth.db can fail. + # API worker will fix the status. + Log.debug( + "Autosubmit couldn't set your experiment as running on the autosubmit times database: {1}. Exception: {0}".format( + str(e), os.path.join(BasicConfig.DB_DIR, BasicConfig.AS_TIMES_DB)), 7003) + return exp_history + @staticmethod + def process_historical_data_iteration(job_list,job_changes_tracker, expid): + """ + Process the historical data for the current iteration. 
+ :param job_list: a JobList object. + :param job_changes_tracker: a dictionary with the changes in the job status. + :param expid: a string with the experiment id. + :return: an ExperimentHistory object. + """ + + exp_history = ExperimentHistory(expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, + historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) + if len(job_changes_tracker) > 0: + exp_history.process_job_list_changes_to_experiment_totals(job_list.get_job_list()) + Autosubmit.database_backup(expid) + return exp_history + @staticmethod + def prepare_run(expid, notransitive=False, start_time=None, start_after=None, + run_only_members=None,recover = False): + """ + Prepare the run of the experiment. + :param expid: a string with the experiment id. + :param notransitive: a boolean to indicate for the experiment to not use transitive dependencies. + :param start_time: a string with the starting time of the experiment. + :param start_after: a string with the experiment id to start after. + :param run_only_members: a string with the members to run. + :param recover: a boolean to indicate if the experiment is recovering from a failure. + :return: a tuple + """ + host = platform.node() + # Init the autosubmitconfigparser and check that every file exists and it is a valid configuration. + as_conf = AutosubmitConfig(expid, BasicConfig, YAMLParserFactory()) + as_conf.check_conf_files(running_time=True, force_load=True) + if not recover: + # Database stuff, to check if the experiment is active or not. + try: + # Handling starting time + AutosubmitHelper.handle_start_time(start_time) + # Start after completion trigger block + AutosubmitHelper.handle_start_after(start_after, expid, BasicConfig()) + # Handling run_only_members + except AutosubmitCritical as e: + raise + except BaseException as e: + raise AutosubmitCritical("Failure during setting the start time check trace for details", 7014, str(e)) + os.system('clear') + signal.signal(signal.SIGINT, signal_handler) + # The time between running iterations, default to 10 seconds. Can be changed by the user + safetysleeptime = as_conf.get_safetysleeptime() + retrials = as_conf.get_retrials() + Log.debug("The Experiment name is: {0}", expid) + Log.debug("Sleep: {0}", safetysleeptime) + Log.debug("Default retrials: {0}", retrials) + # Is where Autosubmit stores the job_list and wrapper packages to the disc. + pkl_dir = os.path.join( + BasicConfig.LOCAL_ROOT_DIR, expid, 'pkl') + Log.debug( + "Starting from job list restored from {0} files", pkl_dir) + + # Loads the communication lib, always paramiko. + # Paramiko is the only way to communicate with the remote machines. Previously we had also Saga. + submitter = Autosubmit._get_submitter(as_conf) + submitter.load_platforms(as_conf) + # Tries to loads the job_list from disk, discarding any changes in running time ( if recovery ). + # Could also load a backup from previous iteration. 
+ # The submit ready functions will cancel all job submitted if one submitted in that iteration had issues, so it should be safe to recover from a backup without losing job ids + if recover: + Log.info("Recovering job_list") + try: + job_list = Autosubmit.load_job_list( + expid, as_conf, notransitive=notransitive) + except IOError as e: + raise AutosubmitError( + "Job_list not found", 6016, str(e)) + except AutosubmitCritical as e: + raise AutosubmitCritical( + "Corrupted job_list, backup couldn't be restored", 7040, e.message) + except BaseException as e: + raise AutosubmitCritical( + "Corrupted job_list, backup couldn't be restored", 7040, str(e)) + Log.debug("Length of the jobs list: {0}", len(job_list)) + if recover: + Log.info("Recovering parameters info") + # This function name is not clear after the transformation it recieved across years. + # What it does, is to load and transform all as_conf.experiment_data into a 1D dict stored in job_list object. + Autosubmit._load_parameters( + as_conf, job_list, submitter.platforms) + Log.debug("Checking experiment templates...") + platforms_to_test = set() + hpcarch = as_conf.get_platform() + # Load only platforms used by the experiment, by looking at JOBS.$JOB.PLATFORM. So Autosubmit only establishes connections to the machines that are used. + # Also, it ignores platforms used by "COMPLETED/FAILED" jobs as they are no need any more. ( in case of recovery or run a workflow that were already running ) + for job in job_list.get_job_list(): + if job.platform_name is None or job.platform_name == "": + job.platform_name = hpcarch + # noinspection PyTypeChecker + try: + job.platform = submitter.platforms[job.platform_name.upper()] + except Exception as e: + raise AutosubmitCritical( + "hpcarch={0} not found in the platforms configuration file".format(job.platform_name), + 7014) + # noinspection PyTypeChecker + if job.status not in (Status.COMPLETED, Status.SUSPENDED): + platforms_to_test.add(job.platform) + # This function, looks at %JOBS.$JOB.FILE% ( mandatory ) and %JOBS.$JOB.CHECK% ( default True ). + # Checks the contents of the .sh/.py/r files and looks for AS placeholders. + try: + job_list.check_scripts(as_conf) + except Exception as e: + raise AutosubmitCritical( + "Error while checking job templates", 7014, str(e)) + Log.debug("Loading job packages") + # Packages == wrappers and jobs inside wrappers. Name is also missleading. + try: + packages_persistence = JobPackagePersistence(os.path.join( + BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) + except IOError as e: + raise AutosubmitError( + "job_packages not found", 6016, str(e)) + # Check if the user wants to continuing using wrappers and loads the appropiate info. + if as_conf.experiment_data.get("WRAPPERS",None) is not None: + os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, + expid, "pkl", "job_packages_" + expid + ".db"), 0o644) + try: + packages = packages_persistence.load() + except IOError as e: + raise AutosubmitError( + "job_packages not found", 6016, str(e)) + Log.debug("Processing job packages") + try: + for (exp_id, package_name, job_name) in packages: + if package_name not in job_list.packages_dict: + job_list.packages_dict[package_name] = [] + job_list.packages_dict[package_name].append(job_list.get_job_by_name(job_name)) + # This function, checks the stored STATUS of jobs inside wrappers. Since "wrapper status" is a memory variable. 
+ job_list = Autosubmit.check_wrapper_stored_status(job_list, as_conf) + except Exception as e: + raise AutosubmitCritical( + "Autosubmit failed while processing job packages. This might be due to a change in your experiment configuration files after 'autosubmit create' was performed.", + 7014, str(e)) + if recover: + Log.info("Recovering wrappers... Done") + + Log.debug("Checking job_list current status") + job_list.update_list(as_conf, first_time=True) + job_list.save() + if not recover: + Log.info("Autosubmit is running with v{0}", Autosubmit.autosubmit_version) + # Before starting main loop, setup historical database tables and main information + # Check if the user has launch autosubmit run with -rom option ( previously named -rm ) + allowed_members = AutosubmitHelper.get_allowed_members(run_only_members, as_conf) + if allowed_members: + # Set allowed members after checks have been performed. This triggers the setter and main logic of the -rm feature. + job_list.run_members = allowed_members + Log.result( + "Only jobs with member value in {0} or no member will be allowed in this run. Also, those jobs already SUBMITTED, QUEUING, or RUNNING will be allowed to complete and will be tracked.".format( + str(allowed_members))) + if not recover: + # This function, looks at the "TWO_STEP_START" variable in the experiment configuration file. + # This may not be neccesary anymore as the same can be achieved by using the new DEPENDENCIES dict. + # I replicated the same functionality in the new DEPENDENCIES dict using crossdate wrappers of auto-monarch da ( documented in rst .) + # We can look at it when auto-monarch starts to use AS 4.0, now it is maintened for compatibility. + unparsed_two_step_start = as_conf.get_parse_two_step_start() + if unparsed_two_step_start != "": + job_list.parse_jobs_by_filter(unparsed_two_step_start) + Log.debug("Running job data structure") + exp_history = Autosubmit.get_historical_database(expid, job_list,as_conf) + # establish the connection to all platforms + # Restore is a missleading, it is actually a "connect" function when the recover flag is not set. + Autosubmit.restore_platforms(platforms_to_test) + + return job_list, submitter , exp_history, host , as_conf, platforms_to_test, packages_persistence, False + else: + return job_list, submitter , None, None, as_conf , platforms_to_test, packages_persistence, True + @staticmethod + def get_iteration_info(as_conf,job_list): + """ + Prints the current iteration information + :param as_conf: autosubmit configuration object + :param job_list: job list object + :return: common parameters for the iteration + """ + total_jobs = len(job_list.get_job_list()) + Log.info("\n\n{0} of {1} jobs remaining ({2})".format( + total_jobs - len(job_list.get_completed()), total_jobs, time.strftime("%H:%M"))) + if len(job_list.get_failed()) > 0: + Log.info("{0} jobs has been failed ({1})".format( + len(job_list.get_failed()), time.strftime("%H:%M"))) + safetysleeptime = as_conf.get_safetysleeptime() + default_retrials = as_conf.get_retrials() + check_wrapper_jobs_sleeptime = as_conf.get_wrapper_check_time() + Log.debug("Sleep: {0}", safetysleeptime) + Log.debug("Number of retrials: {0}", default_retrials) + return total_jobs, safetysleeptime, default_retrials, check_wrapper_jobs_sleeptime + @staticmethod + def run_experiment(expid, notransitive=False, start_time=None, start_after=None,run_only_members=None): """ Runs and experiment (submitting all the jobs properly and repeating its execution in case of failure). 
+ :param expid: the experiment id + :param notransitive: if True, the transitive closure of the graph is not computed + :param start_time: the time at which the experiment should start + :param start_after: the expid after which the experiment should start + :param run_only_members: the members to run + :return: None - :param run_only_ºmembers: - :param start_after: - :param start_time: - :param update_version: - :param notransitive: - :type expid: str - :param expid: identifier of experiment to be run - :return: True if run to the end, False otherwise - :rtype: bool """ + # Initialize common folders try: exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid) tmp_path = os.path.join(exp_path, BasicConfig.LOCAL_TMP_DIR) - import platform - host = platform.node() except BaseException as e: raise AutosubmitCritical("Failure during the loading of the experiment configuration, check file paths", 7014, str(e)) - as_conf = AutosubmitConfig(expid, BasicConfig, YAMLParserFactory()) - as_conf.check_conf_files(running_time=True, force_load=True) - try: - # Handling starting time - AutosubmitHelper.handle_start_time(start_time) - - # Start after completion trigger block - AutosubmitHelper.handle_start_after(start_after, expid, BasicConfig()) - - # Handling run_only_members - allowed_members = AutosubmitHelper.get_allowed_members(run_only_members, as_conf) - except AutosubmitCritical as e: - raise - except BaseException as e: - raise AutosubmitCritical("Failure during setting the start time check trace for details", 7014, str(e)) # checking if there is a lock file to avoid multiple running on the same expid try: + # Portalocker is used to avoid multiple autosubmit running on the same experiment, we have to change this system in #806 with portalocker.Lock(os.path.join(tmp_path, 'autosubmit.lock'), timeout=1): try: - Log.info( - "Preparing .lock file to avoid multiple instances with same experiment id") - os.system('clear') - signal.signal(signal.SIGINT, signal_handler) - - hpcarch = as_conf.get_platform() - safetysleeptime = as_conf.get_safetysleeptime() - retrials = as_conf.get_retrials() - submitter = Autosubmit._get_submitter(as_conf) - submitter.load_platforms(as_conf) - Log.debug("The Experiment name is: {0}", expid) - Log.debug("Sleep: {0}", safetysleeptime) - Log.debug("Default retrials: {0}", retrials) - Log.info("Starting job submission...") - pkl_dir = os.path.join( - BasicConfig.LOCAL_ROOT_DIR, expid, 'pkl') - try: - job_list = Autosubmit.load_job_list( - expid, as_conf, notransitive=notransitive) - except IOError as e: - raise AutosubmitError( - "Job_list not found", 6016, str(e)) - except AutosubmitCritical as e: - raise AutosubmitCritical( - "Corrupted job_list, backup couldn't be restored", 7040, e.message) - except BaseException as e: - raise AutosubmitCritical( - "Corrupted job_list, backup couldn't be restored", 7040, str(e)) - - Log.debug( - "Starting from job list restored from {0} files", pkl_dir) - Log.debug("Length of the jobs list: {0}", len(job_list)) - Autosubmit._load_parameters( - as_conf, job_list, submitter.platforms) - # check the job list script creation - Log.debug("Checking experiment templates...") - platforms_to_test = set() - for job in job_list.get_job_list(): - if job.platform_name is None or job.platform_name == "": - job.platform_name = hpcarch - # noinspection PyTypeChecker - try: - job.platform = submitter.platforms[job.platform_name.upper()] - except Exception as e: - raise AutosubmitCritical( - "hpcarch={0} not found in the platforms configuration 
file".format(job.platform_name), - 7014) - # noinspection PyTypeChecker - if job.status not in (Status.COMPLETED, Status.SUSPENDED): - platforms_to_test.add(job.platform) - try: - job_list.check_scripts(as_conf) - except Exception as e: - raise AutosubmitCritical( - "Error while checking job templates", 7014, str(e)) - Log.debug("Loading job packages") - try: - packages_persistence = JobPackagePersistence(os.path.join( - BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) - except IOError as e: - raise AutosubmitError( - "job_packages not found", 6016, str(e)) - except BaseException as e: - raise AutosubmitCritical( - "Corrupted job_packages, python 2.7 and sqlite doesn't allow to restore these packages", - 7040, str(e)) - if as_conf.get_wrapper_type() != 'none': - os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, - expid, "pkl", "job_packages_" + expid + ".db"), 0o644) - try: - packages = packages_persistence.load() - except IOError as e: - raise AutosubmitError( - "job_packages not found", 6016, str(e)) - except BaseException as e: - raise AutosubmitCritical( - "Corrupted job_packages, python 2.7 and sqlite doesn't allow to restore these packages(will work on autosubmit4)", - 7040, str(e)) - Log.debug("Processing job packages") - - try: - for (exp_id, package_name, job_name) in packages: - if package_name not in job_list.packages_dict: - job_list.packages_dict[package_name] = [] - job_list.packages_dict[package_name].append( - job_list.get_job_by_name(job_name)) - for package_name, jobs in job_list.packages_dict.items(): - from .job.job import WrapperJob - wrapper_status = Status.SUBMITTED - all_completed = True - running = False - queuing = False - failed = False - hold = False - submitted = False - if jobs[0].status == Status.RUNNING or jobs[0].status == Status.COMPLETED: - running = True - for job in jobs: - if job.status == Status.QUEUING: - queuing = True - all_completed = False - elif job.status == Status.FAILED: - failed = True - all_completed = False - elif job.status == Status.HELD: - hold = True - all_completed = False - elif job.status == Status.SUBMITTED: - submitted = True - all_completed = False - if all_completed: - wrapper_status = Status.COMPLETED - elif hold: - wrapper_status = Status.HELD - else: - if running: - wrapper_status = Status.RUNNING - elif queuing: - wrapper_status = Status.QUEUING - elif submitted: - wrapper_status = Status.SUBMITTED - elif failed: - wrapper_status = Status.FAILED - else: - wrapper_status = Status.SUBMITTED - wrapper_job = WrapperJob(package_name, jobs[0].id, wrapper_status, 0, jobs, - None, - None, jobs[0].platform, as_conf, jobs[0].hold) - job_list.job_package_map[jobs[0].id] = wrapper_job - except Exception as e: - raise AutosubmitCritical( - "Autosubmit failed while processing job packages. This might be due to a change in your experiment configuration files after 'autosubmit create' was performed.", - 7014, str(e)) - - Log.debug("Checking job_list current status") - job_list.update_list(as_conf, first_time=True) - job_list.save() - - Log.info("Autosubmit is running with v{0}", Autosubmit.autosubmit_version) - # Before starting main loop, setup historical database tables and main information - Log.debug("Running job data structure") - try: - # Historical Database: Can create a new run if there is a difference in the number of jobs or if the current run does not exist. 
- exp_history = ExperimentHistory(expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, - historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) - exp_history.initialize_database() - exp_history.process_status_changes(job_list.get_job_list(), as_conf.get_chunk_size_unit(), - as_conf.get_chunk_size(), - current_config=as_conf.get_full_config_as_json()) - Autosubmit.database_backup(expid) - except Exception as e: - try: - Autosubmit.database_fix(expid) - # This error is important - except Exception as e: - pass - try: - ExperimentStatus(expid).set_as_running() - except Exception as e: - # Connection to status database ec_earth.db can fail. - # API worker will fix the status. - Log.debug( - "Autosubmit couldn't set your experiment as running on the autosubmit times database: {1}. Exception: {0}".format( - str(e), os.path.join(BasicConfig.DB_DIR, BasicConfig.AS_TIMES_DB)), 7003) - if allowed_members: - # Set allowed members after checks have been performed. This triggers the setter and main logic of the -rm feature. - job_list.run_members = allowed_members - Log.result( - "Only jobs with member value in {0} or no member will be allowed in this run. Also, those jobs already SUBMITTED, QUEUING, or RUNNING will be allowed to complete and will be tracked.".format( - str(allowed_members))) + Log.debug("Preparing run") + # This function is called only once, when the experiment is started. It is used to initialize the experiment and to check the correctness of the configuration files. + # If there are issues while running, this function will be called again to reinitialize the experiment. + job_list, submitter , exp_history, host , as_conf, platforms_to_test, packages_persistence, _ = Autosubmit.prepare_run(expid, notransitive,start_time, start_after, run_only_members) except AutosubmitCritical as e: e.message += " HINT: check the CUSTOM_DIRECTIVE syntax in your jobs configuration files." raise AutosubmitCritical(e.message, 7014, e.trace) except Exception as e: - raise AutosubmitCritical( - "Error in run initialization", 7014, str(e)) # Changing default to 7014 - # Related to TWO_STEP_START new variable defined in expdef - unparsed_two_step_start = as_conf.get_parse_two_step_start() - if unparsed_two_step_start != "": - job_list.parse_jobs_by_filter(unparsed_two_step_start) - - main_loop_retrials = 3650 # Hard limit of tries 3650 tries at 15-120seconds sleep each try - # establish the connection to all platforms - - Autosubmit.restore_platforms(platforms_to_test) - save = True - # @main - Log.debug("Running main loop") + raise AutosubmitCritical("Error in run initialization", 7014, str(e)) # Changing default to 7014 + Log.debug("Running main running loop") ######################### # AUTOSUBMIT - MAIN LOOP ######################### - # Main loop. Finishing when all jobs have been submitted - + # Main loop + # Recovery retrials, when platforms have issues. Hard limit is set just in case is an Autosubmit bug or bad config and the minium duration is the weekend (72 h). + # Run experiment steps: + # 0. Prepare the experiment to start running it. + # 1. Check if there are jobs in the workflow that has to run (get_active) + # For each platform: + # 2. Check the status of all jobs in the current workflow that are queuing or running. Also updates all workflow jobs status by checking the status in the platform machines and job parent status. + # 3. Submit jobs that are on ready status. + # 4. When there are no more active jobs, wait until all log recovery threads finishes and exit Autosubmit. 
+ # In case of issues, the experiment is reinitialized and the process starts with the last non-corrupted workflow status. + # User can always stop the run, and unless force killed, Autosubmit will exit in a clean way. + # Experiment run will always start from the last known workflow status. + + max_recovery_retrials = as_conf.experiment_data.get("CONFIG",{}).get("RECOVERY_RETRIALS",3650) # (72h - 122h ) + recovery_retrials = 0 while job_list.get_active(): - # Log.info("FD: {0}".format(log.fd_show.fd_table_status_str())) try: if Autosubmit.exit: Autosubmit.terminate(threading.enumerate()) @@ -1814,95 +1963,45 @@ class Autosubmit: # reload parameters changes Log.debug("Reloading parameters...") try: + # This function name is not clear after the transformation it recieved across years. + # What it does, is to load and transform all as_conf.experiment_data into a 1D dict stored in job_list object. Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) - # Log.info("FD 2: {0}".format(log.fd_show.fd_table_status_str())) except BaseException as e: raise AutosubmitError("Config files seems to not be accessible", 6040, str(e)) - total_jobs = len(job_list.get_job_list()) - Log.info("\n\n{0} of {1} jobs remaining ({2})".format( - total_jobs - len(job_list.get_completed()), total_jobs, time.strftime("%H:%M"))) - if len(job_list.get_failed()) > 0: - Log.info("{0} jobs has been failed ({1})".format( - len(job_list.get_failed()), time.strftime("%H:%M"))) - safetysleeptime = as_conf.get_safetysleeptime() - default_retrials = as_conf.get_retrials() - check_wrapper_jobs_sleeptime = as_conf.get_wrapper_check_time() - Log.debug("Sleep: {0}", safetysleeptime) - Log.debug("Number of retrials: {0}", default_retrials) + total_jobs, safetysleeptime, default_retrials, check_wrapper_jobs_sleeptime = Autosubmit.get_iteration_info(as_conf,job_list) - if save: # previous iteration - job_list.backup_save() save = False - slurm = [] - job_changes_tracker = {} # to easily keep track of changes per iteration - Log.debug("End of checking") # End Check Current jobs - # jobs_to_check are no wrapped jobs. - jobs_to_check = dict() - # Check Wrappers and add non-wrapped jobs to be checked later. - for platform in platforms_to_test: - queuing_jobs = job_list.get_in_queue_grouped_id(platform) - Log.debug('Checking jobs for platform={0}'.format(platform.name)) - for job_id, job in queuing_jobs.items(): - # Check Wrappers one-by-one - # TODO CHECK this before continue - if job_list.job_package_map and job_id in job_list.job_package_map: - wrapper_job, save = Autosubmit.manage_wrapper_job(as_conf, job_list, platform, - job_id) - - # Notifications e-mail - if as_conf.get_notifications() == "true": - for inner_job in wrapper_job.job_list: - if inner_job.prev_status != inner_job.status: - if Status.VALUE_TO_KEY[inner_job.status] in inner_job.notify_on: - Notifier.notify_status_change(MailNotifier(BasicConfig), expid, - inner_job.name, - Status.VALUE_TO_KEY[ - inner_job.prev_status], - Status.VALUE_TO_KEY[inner_job.status], - as_conf.experiment_data["MAIL"]["TO"]) - # Detect and store changes for the GUI - job_changes_tracker = {job.name: ( - job.prev_status, job.status) for job in wrapper_job.job_list if - job.prev_status != job.status} - else: # Adds to a list all running jobs to be checked. 
- if job.status == Status.FAILED: - continue - job_prev_status = job.status - # If exist key has been pressed and previous status was running, do not check - if not (Autosubmit.exit is True and job_prev_status == Status.RUNNING): - if platform.name in jobs_to_check: - jobs_to_check[platform.name].append([job, job_prev_status]) - else: - jobs_to_check[platform.name] = [[job, job_prev_status]] + if save: # previous iteration + job_list.backup_save() + # This function name is totally missleading, yes it check the status of the wrappers, but also orders jobs the jobs that are not wrapped by platform. + jobs_to_check,job_changes_tracker = Autosubmit.check_wrappers(as_conf, job_list, platforms_to_test, expid) + # Jobs to check are grouped by platform. + # platforms_to_test could be renamed to active_platforms or something like that. for platform in platforms_to_test: platform_jobs = jobs_to_check.get(platform.name, []) - - # not all platforms are doing this check simultaneously if len(platform_jobs) == 0: + Log.info("No jobs to check for platform {0}".format(platform.name)) continue + + Log.info("Checking {0} jobs for platform {1}".format(len(platform_jobs), platform.name)) + # Check all non-wrapped jobs status for the current platform platform.check_Alljobs(platform_jobs, as_conf) - # mail notification + # mail notification ( in case of changes ) for job, job_prev_status in jobs_to_check[platform.name]: if job_prev_status != job.update_status(as_conf): - # Keeping track of changes - job_changes_tracker[job.name] = (job_prev_status, job.status) - if as_conf.get_notifications() == "true": - if Status.VALUE_TO_KEY[job.status] in job.notify_on: - Notifier.notify_status_change(MailNotifier(BasicConfig), expid, job.name, - Status.VALUE_TO_KEY[job_prev_status], - Status.VALUE_TO_KEY[job.status], - as_conf.experiment_data["MAIL"]["TO"]) - save = True - save2 = job_list.update_list( - as_conf, submitter=submitter) + Autosubmit.job_notify(as_conf,expid,job,job_prev_status,job_changes_tracker) + # Updates all workflow status with the new information. + job_list.update_list(as_conf, submitter=submitter) job_list.save() + # Submit jobs that are ready to run if len(job_list.get_ready()) > 0: - save = Autosubmit.submit_ready_jobs( - as_conf, job_list, platforms_to_test, packages_persistence, hold=False) + Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence, hold=False) job_list.update_list(as_conf, submitter=submitter) job_list.save() - + # Submit jobs that are prepared to hold (if remote dependencies parameter are enabled) + # This currently is not used as SLURM not longer allow to jobs to adquire priority while in hold state. + # This only works for SLURM. 
( Prepare status can not be achieve in other platforms ) if as_conf.get_remote_dependencies() == "true" and len(job_list.get_prepared()) > 0: Autosubmit.submit_ready_jobs( as_conf, job_list, platforms_to_test, packages_persistence, hold=True) @@ -1910,21 +2009,14 @@ class Autosubmit: job_list.save() # Safe spot to store changes try: - exp_history = ExperimentHistory(expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, - historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) - if len(job_changes_tracker) > 0: - exp_history.process_job_list_changes_to_experiment_totals(job_list.get_job_list()) - Autosubmit.database_backup(expid) + exp_history = Autosubmit.process_historical_data_iteration(job_list, job_changes_tracker, expid) except BaseException as e: Log.printlog("Historic database seems corrupted, AS will repair it and resume the run", Log.INFO) try: Autosubmit.database_fix(expid) - exp_history = ExperimentHistory(expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, - historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) - if len(job_changes_tracker) > 0: - exp_history.process_job_list_changes_to_experiment_totals(job_list.get_job_list()) - Autosubmit.database_backup(expid) + exp_history = Autosubmit.process_historical_data_iteration(job_list, + job_changes_tracker, expid) except Exception as e: Log.warning( "Couldn't recover the Historical database, AS will continue without it, GUI may be affected") @@ -1950,102 +2042,20 @@ class Autosubmit: except BaseException as e: Log.printlog("Error trying to store failed job count", Log.WARNING) Log.result("Storing failed job count...done") - while not recovery and main_loop_retrials > 0: + while not recovery and (recovery_retrials < max_recovery_retrials or max_recovery_retrials <= 0 ): + delay = min(15 * consecutive_retrials, 120) - main_loop_retrials = main_loop_retrials - 1 + recovery_retrials += 1 sleep(delay) consecutive_retrials = consecutive_retrials + 1 Log.info("Waiting {0} seconds before continue".format(delay)) try: - as_conf.reload(force_load=True) - Log.info("Recovering job_list...") - job_list = Autosubmit.load_job_list( - expid, as_conf, notransitive=notransitive) - Log.info("Recovering job_list... Done") - if allowed_members: - # Set allowed members after checks have been performed. This triggers the setter and main logic of the -rm feature. - job_list.run_members = allowed_members - Log.result( - "Only jobs with member value in {0} or no member will be allowed in this run. Also, those jobs already SUBMITTED, QUEUING, or RUNNING will be allowed to complete and will be tracked.".format( - str(allowed_members))) - platforms_to_test = set() - Log.info("Recovering platform information...") - for job in job_list.get_job_list(): - if job.platform_name is None or job.platform_name == "": - job.platform_name = hpcarch - job.platform = submitter.platforms[job.platform_name] - platforms_to_test.add(job.platform) - - Log.info("Recovering platform information... Done") - Log.info("Recovering Failure count...") - for job in job_list.get_job_list(): - if job.name in list(failed_names.keys()): - job.fail_count = failed_names[job.name] - Log.info("Recovering Failure count... 
Done") - - Log.info("Recovering parameters...") - Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) - # Recovery wrapper [Packages] - Log.info("Recovering Wrappers...") - packages_persistence = JobPackagePersistence( - os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) - packages = packages_persistence.load() - for (exp_id, package_name, job_name) in packages: - if package_name not in job_list.packages_dict: - job_list.packages_dict[package_name] = [] - job_list.packages_dict[package_name].append( - job_list.get_job_by_name(job_name)) - # Recovery wrappers [Wrapper status] - for package_name, jobs in job_list.packages_dict.items(): - from .job.job import WrapperJob - wrapper_status = Status.SUBMITTED - all_completed = True - running = False - queuing = False - failed = False - hold = False - submitted = False - if jobs[0].status == Status.RUNNING or jobs[0].status == Status.COMPLETED: - running = True - for job in jobs: - if job.status == Status.QUEUING: - queuing = True - all_completed = False - elif job.status == Status.FAILED: - failed = True - all_completed = False - elif job.status == Status.HELD: - hold = True - all_completed = False - elif job.status == Status.SUBMITTED: - submitted = True - all_completed = False - if all_completed: - wrapper_status = Status.COMPLETED - elif hold: - wrapper_status = Status.HELD - else: - if running: - wrapper_status = Status.RUNNING - elif queuing: - wrapper_status = Status.QUEUING - elif submitted: - wrapper_status = Status.SUBMITTED - elif failed: - wrapper_status = Status.FAILED - else: - wrapper_status = Status.SUBMITTED - wrapper_job = WrapperJob(package_name, jobs[0].id, wrapper_status, 0, jobs, - None, - None, jobs[0].platform, as_conf, jobs[0].hold) - job_list.job_package_map[jobs[0].id] = wrapper_job - Log.info("Recovering wrappers... Done") - job_list.update_list(as_conf) - Log.info("Saving recovered job list...") - job_list.save() - Log.info("Saving recovered job list... Done") - recovery = True - Log.result("Recover of job_list is completed") + job_list, submitter, _, _, as_conf, platforms_to_test, packages_persistence, recovery = Autosubmit.prepare_run(expid, + notransitive, + start_time, + start_after, + run_only_members, + recover=True) except AutosubmitError as e: recovery = False Log.result("Recover of job_list has fail {0}".format(e.message)) @@ -2057,14 +2067,13 @@ class Autosubmit: Log.result("Recover of job_list has fail {0}".format(str(e))) # Restore platforms and try again, to avoid endless loop with failed configuration, a hard limit is set. 
reconnected = False - mail_notify = True times = 0 max_times = 10 Log.info("Restoring the connection to all experiment platforms") consecutive_retrials = 1 delay = min(15 * consecutive_retrials, 120) - while not reconnected and main_loop_retrials > 0: - main_loop_retrials = main_loop_retrials - 1 + while not reconnected and (recovery_retrials < max_recovery_retrials or max_recovery_retrials <= 0 ) : + recovery_retrials += 1 Log.info("Recovering the remote platform connection") Log.info("Waiting {0} seconds before continue".format(delay)) sleep(delay) @@ -2089,9 +2098,8 @@ class Autosubmit: reconnected = False except BaseException: reconnected = False - if main_loop_retrials <= 0: - raise AutosubmitCritical( - "Autosubmit Encounter too much errors during running time, limit of 4hours reached", + if recovery_retrials == max_recovery_retrials and max_recovery_retrials > 0: + raise AutosubmitCritical(f"Autosubmit Encounter too much errors during running time, limit of {max_recovery_retrials*120} reached", 7051, e.message) except AutosubmitCritical as e: # Critical errors can't be recovered. Failed configuration or autosubmit error raise AutosubmitCritical(e.message, e.code, e.trace) @@ -2100,6 +2108,8 @@ class Autosubmit: raise AutosubmitCritical(message, 7000) except BaseException as e: # If this happens, there is a bug in the code or an exception not-well caught raise AutosubmitCritical("There is a bug in the code, please contact via git", 7070, str(e)) + + Log.result("No more jobs to run.") # Updating job data header with current information when experiment ends try: @@ -4123,16 +4133,14 @@ class Autosubmit: @staticmethod def database_backup(expid): try: - database_path = os.path.join(BasicConfig.JOBDATA_DIR, "job_data_{0}.db".format(expid)) + database_path= os.path.join(BasicConfig.JOBDATA_DIR, "job_data_{0}.db".format(expid)) backup_path = os.path.join(BasicConfig.JOBDATA_DIR, "job_data_{0}.sql".format(expid)) - tmp_backup_path = os.path.join(BasicConfig.JOBDATA_DIR, "job_data_{0}_tmp.sql".format(expid)) - command = "sqlite3 {0} .dump > {1} ".format(database_path, tmp_backup_path) + command = "sqlite3 {0} .dump > {1} ".format(database_path, backup_path) Log.debug("Backing up jobs_data...") out = subprocess.call(command, shell=True) Log.debug("Jobs_data database backup completed.") except BaseException as e: Log.debug("Jobs_data database backup failed.") - @staticmethod def database_fix(expid): """ @@ -4143,7 +4151,7 @@ class Autosubmit: :return: :rtype: """ - os.umask(0) # Overrides user permissions + os.umask(0) # Overrides user permissions current_time = int(time.time()) corrupted_db_path = os.path.join(BasicConfig.JOBDATA_DIR, "job_data_{0}_corrupted.db".format(expid)) @@ -4153,15 +4161,15 @@ class Autosubmit: dump_file_path = os.path.join(BasicConfig.JOBDATA_DIR, dump_file_name) bash_command = 'cat {1} | sqlite3 {0}'.format(database_path, dump_file_path) try: - if os.path.exists(database_path): - result = os.popen("mv {0} {1}".format(database_path, corrupted_db_path)).read() + if os.path.exists(database_path): + os.popen("mv {0} {1}".format(database_path, corrupted_db_path)).read() time.sleep(1) Log.info("Original database moved.") try: exp_history = ExperimentHistory(expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) Log.info("Restoring from sql") - result = os.popen(bash_command).read() + os.popen(bash_command).read() exp_history.initialize_database() except: @@ -4172,7 +4180,7 @@ class Autosubmit: 
historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) exp_history.initialize_database() except Exception as exp: - Log.warning(str(exp)) + Log.critical(str(exp)) @staticmethod def archive(expid, noclean=True, uncompress=True): diff --git a/autosubmit/history/experiment_history.py b/autosubmit/history/experiment_history.py index b92c54d1579a40be13c80b3fe7364b4da3dd8eac..7f6a496487edbd0d1b891f375fa3594326851fb9 100644 --- a/autosubmit/history/experiment_history.py +++ b/autosubmit/history/experiment_history.py @@ -193,12 +193,9 @@ class ExperimentHistory: try: try: current_experiment_run_dc = self.manager.get_experiment_run_dc_with_max_id() - except: - current_experiment_run_dc = 0 - try: update_these_changes = self._get_built_list_of_changes(job_list) except: - # May be a new experiment run, so we don't have any changes to update ( could happen due job_data issues from 3.14.0) + current_experiment_run_dc = 0 update_these_changes = [] #("no runs") should_create_new_run = self.should_we_create_a_new_run(job_list, len(update_these_changes), current_experiment_run_dc, chunk_unit, chunk_size,create) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index e8327b9ad2e125319b3e86d055ac53c97d76f3b4..d580d024dfef96049296a6c0623311c948b3a2f0 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -488,6 +488,25 @@ class ParamikoPlatform(Platform): """ raise NotImplementedError + def get_estimated_queue_time_cmd(self, job_id): + """ + Returns command to get estimated queue time on remote platforms + + :param job_id: id of job to check + :param job_id: str + :return: command to get estimated queue time + """ + raise NotImplementedError + def parse_estimated_time(self, output): + """ + Parses estimated queue time from output of get_estimated_queue_time_cmd + + :param output: output of get_estimated_queue_time_cmd + :type output: str + :return: estimated queue time + :rtype: + """ + raise NotImplementedError def check_job(self, job, default_status=Status.COMPLETED, retries=5, submit_hold_check=False, is_wrapper=False): """ Checks job running status @@ -685,7 +704,6 @@ class ParamikoPlatform(Platform): Log.error( 'check_job() The job id ({0}) status is {1}.', job.id, job_status) job.new_status = job_status - reason = str() if self.type == 'slurm' and len(in_queue_jobs) > 0: cmd = self.get_queue_status_cmd(list_queue_jobid) self.send_command(cmd) @@ -715,6 +733,10 @@ class ParamikoPlatform(Platform): job.new_status = Status.WAITING job.platform.send_command( job.platform.cancel_cmd + " {0}".format(job.id)) + else: + self.send_command(self.get_estimated_queue_time_cmd(job.id)) + estimated_time = self.parse_estimated_time(self._ssh_output) + Log.info(f"{job.name} will be elegible to run the day {estimated_time.get('date', 'Unknown')} at {estimated_time.get('time', 'Unknown')} due: {reason[1:-1]}") else: for job in job_list: diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index 53d0d115d9841d84dcb1babdf3f1b9af1cfd1b11..8968c9ed699949c4399f172022d16479a4c609f0 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -18,6 +18,7 @@ # along with Autosubmit. If not, see . 
 import locale
 import os
+from contextlib import suppress
 from time import sleep
 from time import mktime
 from time import time
@@ -308,13 +309,16 @@ class SlurmPlatform(ParamikoPlatform):
             self.send_command(cmd)
             queue_status = self._ssh_output
-            reason = str()
             reason = self.parse_queue_reason(queue_status, job.id)
-            if reason == '(JobHeldUser)':
-                return True
-            else:
+            self.send_command(self.get_estimated_queue_time_cmd(job.id))
+            estimated_time = self.parse_estimated_time(self._ssh_output)
+            if reason == '(JobHeldAdmin)':  # Job is held by the system
                 self.send_command("scancel {0}".format(job.id))
                 return False
+            else:
+                Log.info(
+                    f"{job.name} will be eligible to run on {estimated_time.get('date', 'Unknown')} at {estimated_time.get('time', 'Unknown')}\nQueuing reason is: {reason}")
+                return True
         except BaseException as e:
             try:
                 self.send_command("scancel {0}".format(job.id))
@@ -549,6 +553,8 @@ class SlurmPlatform(ParamikoPlatform):
     def get_checkAlljobs_cmd(self, jobs_id):
         return "sacct -n -X --jobs {1} -o jobid,State".format(self.host, jobs_id)
 
+    def get_estimated_queue_time_cmd(self, job_id):
+        return f"scontrol -o show JobId {job_id} | grep -Po '(?<=EligibleTime=)[0-9-:T]*'"
     def get_queue_status_cmd(self, job_id):
         return 'squeue -j {0} -o %A,%R'.format(job_id)
@@ -564,12 +570,36 @@ class SlurmPlatform(ParamikoPlatform):
         return 'sacct -n --jobs {0} -o JobId%25,State,NCPUS,NNodes,Submit,Start,End,ConsumedEnergy,MaxRSS%25,AveRSS%25'.format(job_id)
 
     def parse_queue_reason(self, output, job_id):
+        """
+        Parses the queue reason from the output of the command
+        :param output: output of the command
+        :param job_id: job id
+        :return: queue reason
+        :rtype: str
+        """
         reason = [x.split(',')[1] for x in output.splitlines() if x.split(',')[0] == str(job_id)]
-        if len(reason) > 0:
-            return reason[0]
+        if isinstance(reason,list):
+            # convert reason to str
+            return ''.join(reason)
         return reason
 
+    def parse_estimated_time(self, output):
+        """
+        Parses the estimated time from the output of the command
+        :param output: output of the command
+        :return: estimated time date and time
+        :rtype: dict
+        """
+        parsed_output = {}
+        parsed_output["date"] = "Unknown"
+        parsed_output["time"] = "Unknown"
+        with suppress(Exception):
+            output = output.split("T")
+            parsed_output["date"] = output[0]
+            parsed_output["time"] = output[1]
+        return parsed_output
+
     @staticmethod
     def wrapper_header(filename, queue, project, wallclock, num_procs, dependency, directives, threads, method="asthreads", partition=""):
         if method == 'srun':
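
Note (illustration, not part of the patch above): the new SLURM "estimated queue time" support works in two steps. get_estimated_queue_time_cmd() runs `scontrol -o show JobId <id> | grep -Po '(?<=EligibleTime=)[0-9-:T]*'`, which yields an ISO-like timestamp, and parse_estimated_time() splits that string on "T" into a date/time dict, falling back to "Unknown" when the output is empty or malformed. A minimal standalone sketch of the parsing step, assuming that scontrol output format (the timestamp value below is only an illustrative example):

    from contextlib import suppress

    def parse_estimated_time(output):
        # Parses the EligibleTime string returned by the scontrol command above,
        # e.g. "2024-03-01T12:30:00" -> {"date": "2024-03-01", "time": "12:30:00"}.
        parsed = {"date": "Unknown", "time": "Unknown"}
        with suppress(Exception):
            date_part, time_part = output.strip().split("T")
            parsed["date"] = date_part
            parsed["time"] = time_part
        return parsed

    print(parse_estimated_time("2024-03-01T12:30:00"))  # {'date': '2024-03-01', 'time': '12:30:00'}
    print(parse_estimated_time(""))                      # {'date': 'Unknown', 'time': 'Unknown'}

Unlike the patched method, the sketch unpacks both parts in a single statement, so a malformed timestamp leaves the whole dict at "Unknown" rather than a partially filled result; the suppress(Exception) fallback is otherwise the same.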