diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 486de2669907fe259a90002423df420fe699af35..6480393c265f65cce71edab517930c28690acf3e 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -43,7 +43,7 @@ from config.config_parser import ConfigParserFactory from config.config_common import AutosubmitConfig from config.basicConfig import BasicConfig from distutils.util import strtobool -from log.log import Log, AutosubmitError,AutosubmitCritical +from log.log import Log, AutosubmitError, AutosubmitCritical try: import dialog @@ -78,6 +78,8 @@ Main module for autosubmit. Only contains an interface class to all functionalit sys.path.insert(0, os.path.abspath('.')) # noinspection PyUnusedLocal + + def signal_handler(signal_received, frame): """ Used to handle interrupt signals, allowing autosubmit to clean before exit @@ -88,6 +90,7 @@ def signal_handler(signal_received, frame): Log.info('Autosubmit will interrupt at the next safe occasion') Autosubmit.exit = True + def signal_handler_create(signal_received, frame): """ Used to handle KeyboardInterrumpt signals while the create method is being executed @@ -95,7 +98,9 @@ def signal_handler_create(signal_received, frame): :param signal_received: :param frame: """ - raise AutosubmitCritical('Autosubmit has been closed in an unexpected way. Killed or control + c.',7010) + raise AutosubmitCritical( + 'Autosubmit has been closed in an unexpected way. Killed or control + c.', 7010) + class Autosubmit: """ @@ -130,11 +135,12 @@ class Autosubmit: parser = argparse.ArgumentParser( description='Main executable for autosubmit. ') - parser.add_argument('-v', '--version', action='version', version=Autosubmit.autosubmit_version) - parser.add_argument('-lf', '--logfile', choices=('NO_LOG','INFO','WARNING', 'DEBUG'), + parser.add_argument('-v', '--version', action='version', + version=Autosubmit.autosubmit_version) + parser.add_argument('-lf', '--logfile', choices=('NO_LOG', 'INFO', 'WARNING', 'DEBUG'), default='WARNING', type=str, help="sets file's log level.") - parser.add_argument('-lc', '--logconsole', choices=('NO_LOG','INFO','WARNING', 'DEBUG'), + parser.add_argument('-lc', '--logconsole', choices=('NO_LOG', 'INFO', 'WARNING', 'DEBUG'), default='INFO', type=str, help="sets console's log level") @@ -147,6 +153,8 @@ class Autosubmit: default=False, help='Disable transitive reduction') subparser.add_argument('-v', '--update_version', action='store_true', default=False, help='Update experiment version') + subparser.add_argument('-st', '--start_time', required=False, + help='Sets the starting time for this experiment') # Expid subparser = subparsers.add_parser( @@ -487,19 +495,21 @@ class Autosubmit: except Exception as e: if type(e) is SystemExit: - if e.message == 0: # Version keyword force an exception in parse arg due and os_exit(0) but the program is succesfully finished + # Version keyword force an exception in parse arg due and os_exit(0) but the program is succesfully finished + if e.message == 0: print(Autosubmit.autosubmit_version) os._exit(0) - raise AutosubmitCritical("Incorrect arguments for this command",7011) - + raise AutosubmitCritical( + "Incorrect arguments for this command", 7011) expid = "None" if hasattr(args, 'expid'): expid = args.expid - Autosubmit._init_logs(args.command,args.logconsole,args.logfile,expid) + Autosubmit._init_logs( + args.command, args.logconsole, args.logfile, expid) if args.command == 'run': - return Autosubmit.run_experiment(args.expid, args.notransitive, args.update_version) + return Autosubmit.run_experiment(args.expid, args.notransitive, args.update_version, args.start_time) elif args.command == 'expid': return Autosubmit.expid(args.HPC, args.description, args.copy, args.dummy, False, args.operational, args.config) != '' @@ -570,7 +580,7 @@ class Autosubmit: return False @staticmethod - def _init_logs(command,console_level='INFO',log_level='DEBUG',expid='None'): + def _init_logs(command, console_level='INFO', log_level='DEBUG', expid='None'): Log.set_console_level(console_level) if expid != 'None': Autosubmit._check_ownership(expid) @@ -584,14 +594,19 @@ class Autosubmit: if not os.path.exists(aslogs_path): os.mkdir(aslogs_path) - Log.set_file(os.path.join(aslogs_path, command + '.log'), "out", log_level) - Log.set_file(os.path.join(aslogs_path, command + '_err.log'), "err") - if os.path.exists(os.path.join(aslogs_path,'jobs_status.log')): - os.remove(os.path.join(aslogs_path,'jobs_status.log')) - Log.set_file(os.path.join(aslogs_path, 'jobs_status.log'), "status") + Log.set_file(os.path.join( + aslogs_path, command + '.log'), "out", log_level) + Log.set_file(os.path.join( + aslogs_path, command + '_err.log'), "err") + if os.path.exists(os.path.join(aslogs_path, 'jobs_status.log')): + os.remove(os.path.join(aslogs_path, 'jobs_status.log')) + Log.set_file(os.path.join( + aslogs_path, 'jobs_status.log'), "status") else: - Log.set_file(os.path.join(BasicConfig.GLOBAL_LOG_DIR, command + '.log'), "out", log_level) - Log.set_file(os.path.join(BasicConfig.GLOBAL_LOG_DIR, command + '_err.log'), "err") + Log.set_file(os.path.join(BasicConfig.GLOBAL_LOG_DIR, + command + '.log'), "out", log_level) + Log.set_file(os.path.join(BasicConfig.GLOBAL_LOG_DIR, + command + '_err.log'), "err") @staticmethod def _check_ownership(expid): @@ -600,10 +615,11 @@ class Autosubmit: current_owner_id = pwd.getpwuid(os.stat(os.path.join( BasicConfig.LOCAL_ROOT_DIR, expid)).st_uid).pw_name if current_user_id != current_owner_id: - raise AutosubmitCritical("You don't own the experiment {0}.".format(expid),7012) + raise AutosubmitCritical( + "You don't own the experiment {0}.".format(expid), 7012) except BaseException as e: - raise AutosubmitCritical("User or owner does not exists",7012,e.message) - + raise AutosubmitCritical( + "User or owner does not exists", 7012, e.message) @staticmethod def _delete_expid(expid_delete, force): @@ -634,13 +650,16 @@ class Autosubmit: currentOwner_id = 0 currentOwner = "empty" try: - currentOwner_id = os.stat(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid_delete)).st_uid - currentOwner = pwd.getpwuid(os.stat(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid_delete)).st_uid).pw_name + currentOwner_id = os.stat(os.path.join( + BasicConfig.LOCAL_ROOT_DIR, expid_delete)).st_uid + currentOwner = pwd.getpwuid(os.stat(os.path.join( + BasicConfig.LOCAL_ROOT_DIR, expid_delete)).st_uid).pw_name except: pass finally: - if currentOwner_id <= 0 : - Log.info("Current owner '{0}' of experiment {1} does not exist anymore.", currentOwner, expid_delete) + if currentOwner_id <= 0: + Log.info( + "Current owner '{0}' of experiment {1} does not exist anymore.", currentOwner, expid_delete) # Deletion workflow continues as usual, a disjunction is included for the case when # force is sent, and user is eadmin @@ -664,18 +683,23 @@ class Autosubmit: except: pass except OSError as e: - raise AutosubmitCritical('Can not delete experiment folder: ',7012,e.message) + raise AutosubmitCritical( + 'Can not delete experiment folder: ', 7012, e.message) Log.info("Deleting experiment from database...") ret = delete_experiment(expid_delete) if ret: - Log.result("Experiment {0} deleted".format(expid_delete)) + Log.result( + "Experiment {0} deleted".format(expid_delete)) else: if currentOwner_id == 0: - raise AutosubmitCritical('Detected Eadmin user however, -f flag is not found. {0} can not be deleted!'.format(expid_delete), 7012) + raise AutosubmitCritical( + 'Detected Eadmin user however, -f flag is not found. {0} can not be deleted!'.format(expid_delete), 7012) else: - raise AutosubmitCritical('Current user is not the owner of the experiment. {0} can not be deleted!'.format(expid_delete), 7012) + raise AutosubmitCritical( + 'Current user is not the owner of the experiment. {0} can not be deleted!'.format(expid_delete), 7012) except Exception as e: - raise AutosubmitCritical("Couldn't delete the experiment:",7012,e.message) + raise AutosubmitCritical( + "Couldn't delete the experiment:", 7012, e.message) @staticmethod def expid(hpc, description, copy_id='', dummy=False, test=False, operational=False, root_folder=''): @@ -698,12 +722,14 @@ class Autosubmit: """ exp_id = None if description is None or hpc is None: - raise AutosubmitCritical("Check that the parameters are defined (-d and -H) ",7011) + raise AutosubmitCritical( + "Check that the parameters are defined (-d and -H) ", 7011) if not copy_id: exp_id = new_experiment( description, Autosubmit.autosubmit_version, test, operational) if exp_id == '': - raise AutosubmitCritical("Couldn't create a new experiment",7011) + raise AutosubmitCritical( + "Couldn't create a new experiment", 7011) try: os.mkdir(os.path.join(BasicConfig.LOCAL_ROOT_DIR, exp_id)) os.mkdir(os.path.join( @@ -741,7 +767,8 @@ class Autosubmit: exp_id, hpc, Autosubmit.autosubmit_version, dummy) except (OSError, IOError) as e: Autosubmit._delete_expid(exp_id) - raise AutosubmitCritical("Couldn't create a new experiment, permissions?", 7012, e.message) + raise AutosubmitCritical( + "Couldn't create a new experiment, permissions?", 7012, e.message) else: try: if root_folder == '' or root_folder is None: @@ -807,7 +834,7 @@ class Autosubmit: if filename in conf_copy_filter_folder: if os.path.isfile(os.path.join(conf_copy_id, filename)): new_filename = filename.split( - ".")[0]+"_"+exp_id+".conf" + ".")[0] + "_" + exp_id + ".conf" content = open(os.path.join( conf_copy_id, filename), 'r').read() # If autosubmitrc [conf] custom_platforms has been set and file exists, replace content @@ -830,13 +857,16 @@ class Autosubmit: autosubmit_git = AutosubmitGit(copy_id[0]) Log.info("checking model version...") if not autosubmit_git.check_commit(autosubmit_config): - raise AutosubmitCritical("Uncommitted changes",7013) + raise AutosubmitCritical( + "Uncommitted changes", 7013) else: - raise AutosubmitCritical("The experiment directory doesn't exist",7012) + raise AutosubmitCritical( + "The experiment directory doesn't exist", 7012) except (OSError, IOError) as e: Autosubmit._delete_expid(exp_id, True) - raise AutosubmitCritical("Can not create experiment", 7012,e.message) + raise AutosubmitCritical( + "Can not create experiment", 7012, e.message) Log.debug("Creating temporal directory...") exp_id_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, exp_id) @@ -846,7 +876,7 @@ class Autosubmit: os.mkdir(os.path.join(tmp_path, BasicConfig.LOCAL_ASLOG_DIR)) os.chmod(os.path.join(tmp_path, BasicConfig.LOCAL_ASLOG_DIR), 0o775) Log.debug("Creating temporal remote directory...") - remote_tmp_path = os.path.join(tmp_path, "LOG_"+exp_id) + remote_tmp_path = os.path.join(tmp_path, "LOG_" + exp_id) os.mkdir(remote_tmp_path) os.chmod(remote_tmp_path, 0o755) @@ -897,7 +927,7 @@ class Autosubmit: Log.debug('Enter Autosubmit._delete_expid {0}', expid) return Autosubmit._delete_expid(expid, force) else: - raise AutosubmitCritical("Insufficient permissions",7012) + raise AutosubmitCritical("Insufficient permissions", 7012) else: raise AutosubmitCritical("Experiment does not exist", 7012) @@ -924,13 +954,14 @@ class Autosubmit: # Platform = from DEFAULT.HPCARCH, e.g. marenostrum4 if as_conf.get_platform().lower() not in platforms.keys(): raise AutosubmitCritical("Specified platform in expdef_.conf " + str(as_conf.get_platform( - ).lower()) + " is not a valid platform defined in platforms_.conf.",7014) + ).lower()) + " is not a valid platform defined in platforms_.conf.", 7014) platform = platforms[as_conf.get_platform().lower()] platform.add_parameters(parameters, True) # Attach paramenters to JobList job_list.parameters = parameters + @staticmethod - def inspect(expid, lst, filter_chunks, filter_status, filter_section, notransitive=False, force=False, check_wrapper=False): + def inspect(expid, lst, filter_chunks, filter_status, filter_section, notransitive=False, force=False, check_wrapper=False): """ Generates cmd files experiment. @@ -1129,7 +1160,7 @@ class Autosubmit: job_list.update_list(as_conf, False) @staticmethod - def run_experiment(expid, notransitive=False, update_version=False): + def run_experiment(expid, notransitive=False, update_version=False, start_time=None): """ Runs and experiment (submitting all the jobs properly and repeating its execution in case of failure). @@ -1138,18 +1169,18 @@ class Autosubmit: :return: True if run to the end, False otherwise :rtype: bool """ - exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid) tmp_path = os.path.join(exp_path, BasicConfig.LOCAL_TMP_DIR) import platform host = platform.node() if BasicConfig.ALLOWED_HOSTS and host not in BasicConfig.ALLOWED_HOSTS: - raise AutosubmitCritical("The current host is not allowed to run Autosubmit",7004) - + raise AutosubmitCritical( + "The current host is not allowed to run Autosubmit", 7004) as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory()) as_conf.check_conf_files(True) - Log.info("Autosubmit is running with {0}", Autosubmit.autosubmit_version) + Log.info( + "Autosubmit is running with {0}", Autosubmit.autosubmit_version) if update_version: if as_conf.get_version() != Autosubmit.autosubmit_version: Log.info("The {2} experiment {0} version is being updated to {1} for match autosubmit version", @@ -1158,12 +1189,53 @@ class Autosubmit: else: if as_conf.get_version() is not None and as_conf.get_version() != Autosubmit.autosubmit_version: raise AutosubmitCritical("Current experiment uses ({0}) which is not the running Autosubmit version \nPlease, update the experiment version if you wish to continue using AutoSubmit {1}\nYou can achieve this using the command autosubmit updateversion {2} \n" - "Or with the -v parameter: autosubmit run {2} -v ".format(as_conf.get_version(), Autosubmit.autosubmit_version, expid),7067) + "Or with the -v parameter: autosubmit run {2} -v ".format(as_conf.get_version(), Autosubmit.autosubmit_version, expid), 7067) + + # Handling starting time + if start_time: + Log.info("User provided starting time has been detected.") + current_time = time.time() + datetime_now = datetime.datetime.now() + target_date = parsed_time = None + try: + # Trying first parse H:M:S + parsed_time = datetime.datetime.strptime( + start_time, "%H:%M:%S") + target_date = datetime.datetime(datetime_now.year, datetime_now.month, + datetime_now.day, parsed_time.hour, parsed_time.minute, parsed_time.second) + except: + try: + # Trying second parse y-m-d H:M:S + target_date = datetime.datetime.strptime( + start_time, "%Y-%m-%d %H:%M:%S") + except: + target_date = None + Log.critical( + "The string input provided as the starting time of your experiment must have the format 'H:M:S' or 'yyyy-mm-dd H:M:S'. Your input was '{0}'.".format(start_time)) + return + # Must be in the future + if (target_date < datetime.datetime.now()): + Log.critical("You must provide a valid date into the future. Your input was interpreted as '{0}', which is considered past.\nCurrent time {1}.".format( + target_date.strftime("%Y-%m-%d %H:%M:%S"), datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))) + return + # Starting waiting sequence + Log.info("Your experiment will start execution on {0}\n".format( + target_date.strftime("%Y-%m-%d %H:%M:%S"))) + # Check time every second + while datetime.datetime.now() < target_date: + elapsed_time = target_date - datetime.datetime.now() + sys.stdout.write( + "\r{0} until execution starts".format(elapsed_time)) + sys.stdout.flush() + sleep(1) + # End of handling starting time block + # checking if there is a lock file to avoid multiple running on the same expid try: with portalocker.Lock(os.path.join(tmp_path, 'autosubmit.lock'), timeout=1): try: - Log.info("Preparing .lock file to avoid multiple instances with same experiment id") + Log.info( + "Preparing .lock file to avoid multiple instances with same experiment id") os.system('clear') signal.signal(signal.SIGINT, signal_handler) @@ -1179,14 +1251,17 @@ class Autosubmit: pkl_dir = os.path.join( BasicConfig.LOCAL_ROOT_DIR, expid, 'pkl') try: - job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=notransitive) + job_list = Autosubmit.load_job_list( + expid, as_conf, notransitive=notransitive) except BaseException as e: - raise AutosubmitCritical("Corrupted job_list, backup couldn't be restored",7040,e.message) + raise AutosubmitCritical( + "Corrupted job_list, backup couldn't be restored", 7040, e.message) - - Log.debug("Starting from job list restored from {0} files", pkl_dir) + Log.debug( + "Starting from job list restored from {0} files", pkl_dir) Log.debug("Length of the jobs list: {0}", len(job_list)) - Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) + Autosubmit._load_parameters( + as_conf, job_list, submitter.platforms) # check the job list script creation Log.debug("Checking experiment templates...") platforms_to_test = set() @@ -1201,15 +1276,18 @@ class Autosubmit: try: job_list.check_scripts(as_conf) except Exception as e: - raise AutosubmitCritical("Error while checking job templates",7015,e.message) + raise AutosubmitCritical( + "Error while checking job templates", 7015, e.message) Log.debug("Loading job packages") try: - packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"),"job_packages_" + expid) + packages_persistence = JobPackagePersistence(os.path.join( + BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) except BaseException as e: - raise AutosubmitCritical("Corrupted job_packages, python 2.7 and sqlite doesn't allow to restore these packages",7040,e.message) + raise AutosubmitCritical( + "Corrupted job_packages, python 2.7 and sqlite doesn't allow to restore these packages", 7040, e.message) if as_conf.get_wrapper_type() != 'none': os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, - expid, "pkl", "job_packages_" + expid+".db"), 0644) + expid, "pkl", "job_packages_" + expid + ".db"), 0644) try: packages = packages_persistence.load() except BaseException as e: @@ -1232,28 +1310,33 @@ class Autosubmit: save = job_list.update_list(as_conf) job_list.save() - Log.info("Autosubmit is running with v{0}", Autosubmit.autosubmit_version) + Log.info( + "Autosubmit is running with v{0}", Autosubmit.autosubmit_version) # Before starting main loop, setup historical database tables and main information Log.debug("Running job data structure") try: job_data_structure = JobDataStructure(expid) - job_data_structure.validate_current_run(job_list.get_job_list(), as_conf.get_chunk_size_unit(), as_conf.get_chunk_size()) + job_data_structure.validate_current_run(job_list.get_job_list( + ), as_conf.get_chunk_size_unit(), as_conf.get_chunk_size()) ExperimentStatus(expid).update_running_status() except Exception as e: - raise AutosubmitCritical("Error while processing job_data_structure", 7067, e.message) + raise AutosubmitCritical( + "Error while processing job_data_structure", 7067, e.message) except AutosubmitCritical as e: raise AutosubmitCritical(e.message, 7067, e.trace) except Exception as e: - raise AutosubmitCritical("Error in run initialization", 7067, e.message) + raise AutosubmitCritical( + "Error in run initialization", 7067, e.message) ######################### # AUTOSUBMIT - MAIN LOOP ######################### # Main loop. Finishing when all jobs have been submitted - main_loop_retrials = 120 # Hard limit of tries 120 tries at 1min sleep each try - Autosubmit.restore_platforms(platforms_to_test) # establish the connection to all platforms + main_loop_retrials = 120 # Hard limit of tries 120 tries at 1min sleep each try + # establish the connection to all platforms + Autosubmit.restore_platforms(platforms_to_test) save = True Log.debug("Running main loop") while job_list.get_active(): @@ -1263,16 +1346,19 @@ class Autosubmit: # reload parameters changes Log.debug("Reloading parameters...") as_conf.reload() - Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) + Autosubmit._load_parameters( + as_conf, job_list, submitter.platforms) total_jobs = len(job_list.get_job_list()) - Log.info("\n\n{0} of {1} jobs remaining ({2})".format(total_jobs - len(job_list.get_completed()),total_jobs,time.strftime("%H:%M"))) + Log.info("\n\n{0} of {1} jobs remaining ({2})".format( + total_jobs - len(job_list.get_completed()), total_jobs, time.strftime("%H:%M"))) safetysleeptime = as_conf.get_safetysleeptime() default_retrials = as_conf.get_retrials() check_wrapper_jobs_sleeptime = as_conf.get_wrapper_check_time() Log.debug("Sleep: {0}", safetysleeptime) Log.debug("Number of retrials: {0}", default_retrials) - Log.debug('WRAPPER CHECK TIME = {0}'.format(check_wrapper_jobs_sleeptime)) - if save: # previous iteration + Log.debug('WRAPPER CHECK TIME = {0}'.format( + check_wrapper_jobs_sleeptime)) + if save: # previous iteration job_list.backup_save() save = False slurm = [] @@ -1289,7 +1375,8 @@ class Autosubmit: Log.debug( 'Checking wrapper job with id ' + str(job_id)) wrapper_job = job_list.job_package_map[job_id] - if as_conf.get_notifications() == 'true': # Setting prev_status as an easy way to check status change for inner jobs + # Setting prev_status as an easy way to check status change for inner jobs + if as_conf.get_notifications() == 'true': for inner_job in wrapper_job.job_list: inner_job.prev_status = inner_job.status check_wrapper = True @@ -1302,12 +1389,15 @@ class Autosubmit: platform.check_job(wrapper_job) try: if wrapper_job.status != wrapper_job.new_status: - Log.info('Wrapper job ' + wrapper_job.name + ' changed from ' + str(Status.VALUE_TO_KEY[wrapper_job.status]) + ' to status ' + str(Status.VALUE_TO_KEY[wrapper_job.new_status])) + Log.info('Wrapper job ' + wrapper_job.name + ' changed from ' + str( + Status.VALUE_TO_KEY[wrapper_job.status]) + ' to status ' + str(Status.VALUE_TO_KEY[wrapper_job.new_status])) except: - raise AutosubmitCritical("Wrapper is in Unknown Status couldn't get wrapper parameters",7050) + raise AutosubmitCritical( + "Wrapper is in Unknown Status couldn't get wrapper parameters", 7050) # New status will be saved and inner_jobs will be checked. - wrapper_job.check_status(wrapper_job.new_status) + wrapper_job.check_status( + wrapper_job.new_status) # Erase from packages if the wrapper failed to be queued ( Hold Admin bug ) if wrapper_job.status == Status.WAITING: for inner_job in wrapper_job.job_list: @@ -1328,8 +1418,10 @@ class Autosubmit: Status.VALUE_TO_KEY[inner_job.status], as_conf.get_mails_to()) # Detect and store changes - job_changes_tracker = {job.name: (job.prev_status, job.status) for job in wrapper_job.job_list if job.prev_status != job.status} - job_data_structure.process_status_changes(job_changes_tracker) + job_changes_tracker = {job.name: ( + job.prev_status, job.status) for job in wrapper_job.job_list if job.prev_status != job.status} + job_data_structure.process_status_changes( + job_changes_tracker) job_changes_tracker = {} else: # Prepare jobs, if slurm check all active jobs at once. job = job[0] @@ -1387,13 +1479,15 @@ class Autosubmit: if save or save2: job_list.save() if len(job_list.get_ready()) > 0: - Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence, hold=False) + Autosubmit.submit_ready_jobs( + as_conf, job_list, platforms_to_test, packages_persistence, hold=False) if as_conf.get_remote_dependencies() and len(job_list.get_prepared()) > 0: - Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence, hold=True) + Autosubmit.submit_ready_jobs( + as_conf, job_list, platforms_to_test, packages_persistence, hold=True) save = job_list.update_list(as_conf) if save: job_list.save() - # Safe spot to store changes + # Safe spot to store changes job_data_structure.process_status_changes( job_changes_tracker) job_changes_tracker = {} @@ -1401,19 +1495,21 @@ class Autosubmit: if Autosubmit.exit: job_list.save() time.sleep(safetysleeptime) - except AutosubmitError as e: #If an error is detected, restore all connections and job_list + except AutosubmitError as e: # If an error is detected, restore all connections and job_list Log.error("Trace: {0}", e.trace) Log.error("{1} [eCode={0}]", e.code, e.message) Log.info("Waiting 30 seconds before continue") sleep(30) - #Save job_list if not is a failed submitted job + # Save job_list if not is a failed submitted job recovery = True try: - job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=notransitive) + job_list = Autosubmit.load_job_list( + expid, as_conf, notransitive=notransitive) except BaseException as e: raise AutosubmitCritical("Corrupted job_list, backup couldn't be restored", 7040, e.message) - if main_loop_retrials > 0: # Restore platforms and try again, to avoid endless loop with failed configuration, a hard limit is set. + # Restore platforms and try again, to avoid endless loop with failed configuration, a hard limit is set. + if main_loop_retrials > 0: main_loop_retrials = main_loop_retrials - 1 try: Autosubmit.restore_platforms(platforms_to_test) @@ -1427,21 +1523,23 @@ class Autosubmit: # noinspection PyTypeChecker platforms_to_test.add(job.platform) except BaseException: - raise AutosubmitCritical("Autosubmit couldn't recover the platforms",7050, e.message) + raise AutosubmitCritical( + "Autosubmit couldn't recover the platforms", 7050, e.message) else: - raise AutosubmitCritical("Autosubmit Encounter too much errors during running time",7051,e.message) - except AutosubmitCritical as e: # Critical errors can't be recovered. Failed configuration or autosubmit error + raise AutosubmitCritical( + "Autosubmit Encounter too much errors during running time", 7051, e.message) + except AutosubmitCritical as e: # Critical errors can't be recovered. Failed configuration or autosubmit error raise AutosubmitCritical(e.message, e.code, e.trace) except portalocker.AlreadyLocked: message = "We have detected that there is another Autosubmit instance using the experiment\n. Stop other Autosubmit instances that are using the experiment or delete autosubmit.lock file located on tmp folder" raise AutosubmitCritical(message, 7000) - except BaseException as e: # If this happens, there is a bug in the code or an exception not-well caught + except BaseException as e: # If this happens, there is a bug in the code or an exception not-well caught raise - #############################################################################3 + # 3 Log.result("No more jobs to run.") - # Updating job data header with finish time + # Updating job data header with current information job_data_structure.validate_current_run( - job_list.get_job_list(), as_conf.get_chunk_size_unit(), as_conf.get_chunk_size()) + job_list.get_job_list(), as_conf.get_chunk_size_unit(), as_conf.get_chunk_size(), must_create=False, only_update=True) # Wait for all remaining threads of I/O, close remaining connections timeout = 0 @@ -1456,7 +1554,7 @@ class Autosubmit: if "Thread-" in thread.name: if thread.isAlive(): active_threads = True - threads_active = threads_active+1 + threads_active = threads_active + 1 sleep(10) if len(job_list.get_failed()) > 0: Log.info("Some jobs have failed and reached maximum retrials") @@ -1466,7 +1564,7 @@ class Autosubmit: job_data_structure.update_finish_time() except portalocker.AlreadyLocked: message = "We have detected that there is another Autosubmit instance using the experiment\n. Stop other Autosubmit instances that are using the experiment or delete autosubmit.lock file located on tmp folder" - raise AutosubmitCritical(message,7000) + raise AutosubmitCritical(message, 7000) except AutosubmitCritical as e: raise AutosubmitCritical(e.message, e.code, e.trace) except BaseException as e: @@ -1477,7 +1575,9 @@ class Autosubmit: Log.result("Checking the connection to all platforms in use") for platform in platform_to_test: platform.test_connection() - Log.result("[{1}] Connection successfull to host {0}",platform.host,platform.name) + Log.result("[{1}] Connection successfull to host {0}", + platform.host, platform.name) + @staticmethod def submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence, inspect=False, only_wrappers=False, hold=False): @@ -1535,7 +1635,8 @@ class Autosubmit: # If called from RUN or inspect command if not only_wrappers: try: - package.submit( as_conf, job_list.parameters, inspect, hold=hold) + package.submit( + as_conf, job_list.parameters, inspect, hold=hold) valid_packages_to_submit.append(package) except (IOError, OSError): continue @@ -1555,9 +1656,10 @@ class Autosubmit: packages_persistence.save( package.name, package.jobs, package._expid, inspect) except WrongTemplateException as e: - raise AutosubmitCritical("Invalid parameter substitution in {0} template".format(e.job_name),7014) + raise AutosubmitCritical( + "Invalid parameter substitution in {0} template".format(e.job_name), 7014) except AutosubmitCritical as e: - raise AutosubmitCritical(e.message,e.code,e.trace) + raise AutosubmitCritical(e.message, e.code, e.trace) except AutosubmitError as e: raise except Exception as e: @@ -1593,13 +1695,15 @@ class Autosubmit: i += 1 save = True except WrongTemplateException as e: - raise AutosubmitCritical("Invalid parameter substitution in {0} template".format(e.job_name),7014,e.message) + raise AutosubmitCritical("Invalid parameter substitution in {0} template".format( + e.job_name), 7014, e.message) except AutosubmitError as e: raise except AutosubmitCritical as e: raise except Exception as e: - raise AutosubmitError("{0} submission failed".format(platform.name),6015,e.message) + raise AutosubmitError("{0} submission failed".format( + platform.name), 6015, e.message) return save @@ -1760,7 +1864,7 @@ class Autosubmit: if txt_only or txt_logfiles: monitor_exp.generate_output_txt(expid, jobs, os.path.join( - exp_path, "/tmp/LOG_"+expid), txt_logfiles, job_list_object=job_list) + exp_path, "/tmp/LOG_" + expid), txt_logfiles, job_list_object=job_list) else: # if file_format is set, use file_format, otherwise use conf value monitor_exp.generate_output(expid, @@ -1805,7 +1909,6 @@ class Autosubmit: as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory()) as_conf.check_conf_files(False) - pkl_dir = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, 'pkl') job_list = Autosubmit.load_job_list( expid, as_conf, notransitive=notransitive) @@ -1841,7 +1944,8 @@ class Autosubmit: expid, job_list, file_format, period_ini, period_fi, not hide) Log.result("Stats plot ready") except Exception as e: - raise AutosubmitCritical("Stats couldn't be shown",7061,e.message) + raise AutosubmitCritical( + "Stats couldn't be shown", 7061, e.message) else: Log.info("There are no {0} jobs in the period from {1} to {2}...".format( ft, period_ini, period_fi)) @@ -1934,7 +2038,6 @@ class Autosubmit: platforms_to_test = set() - for job in job_list.get_job_list(): job.submitter = submitter if job.platform_name is None: @@ -1943,7 +2046,8 @@ class Autosubmit: job.platform = platforms[job.platform_name.lower()] # noinspection PyTypeChecker platforms_to_test.add(platforms[job.platform_name.lower()]) - Autosubmit.restore_platforms(platforms_to_test) # establish the connection to all platforms + # establish the connection to all platforms + Autosubmit.restore_platforms(platforms_to_test) if all_jobs: jobs_to_recover = job_list.get_job_list() @@ -1960,8 +2064,10 @@ class Autosubmit: if job.platform.get_completed_files(job.name, 0, True): job.status = Status.COMPLETED - Log.info("CHANGED job '{0}' status to COMPLETED".format(job.name)) - Log.status("CHANGED job '{0}' status to COMPLETED".format(job.name)) + Log.info( + "CHANGED job '{0}' status to COMPLETED".format(job.name)) + Log.status( + "CHANGED job '{0}' status to COMPLETED".format(job.name)) if not no_recover_logs: try: @@ -1974,7 +2080,6 @@ class Autosubmit: #Log.info("CHANGED job '{0}' status to WAITING".format(job.name)) #Log.status("CHANGED job '{0}' status to WAITING".format(job.name)) - end = datetime.datetime.now() Log.info("Time spent: '{0}'".format(end - start)) Log.info("Updating the jobs list") @@ -2058,7 +2163,7 @@ class Autosubmit: "Checking [{0}] from platforms configuration...", platform) if not as_conf.get_migrate_user_to(platform): Log.printlog( - "Missing directive USER_TO in [{0}]".format( platform),7014) + "Missing directive USER_TO in [{0}]".format(platform), 7014) error = True break if as_conf.get_migrate_project_to(platform): @@ -2131,7 +2236,7 @@ class Autosubmit: "The platform {0} does not contain absolute symlinks", platform) except BaseException: Log.printlog( - "Absolute symlinks failed to convert, check user in platform.conf",3000) + "Absolute symlinks failed to convert, check user in platform.conf", 3000) error = True break @@ -2142,12 +2247,12 @@ class Autosubmit: if not p.move_file(p.root_dir, os.path.join(p.temp_dir, experiment_id), True): Log.printlog( "The files/dirs on {0} cannot be moved to {1}.".format(p.root_dir, - os.path.join(p.temp_dir, experiment_id), 6012)) + os.path.join(p.temp_dir, experiment_id), 6012)) error = True break except (IOError, BaseException) as e: Log.printlog("The files/dirs on {0} cannot be moved to {1}.".format(p.root_dir, - os.path.join(p.temp_dir, experiment_id)),6012) + os.path.join(p.temp_dir, experiment_id)), 6012) error = True break @@ -2158,7 +2263,7 @@ class Autosubmit: if error: Log.printlog( - "The experiment cannot be offered, reverting changes",7012) + "The experiment cannot be offered, reverting changes", 7012) as_conf = AutosubmitConfig( experiment_id, BasicConfig, ConfigParserFactory()) as_conf.check_conf_files(False) @@ -2196,7 +2301,8 @@ class Autosubmit: Log.info('Migrating experiment {0}'.format(experiment_id)) Log.info("Moving local files/dirs") if not Autosubmit.unarchive(experiment_id, False): - raise AutosubmitCritical("The experiment cannot be picked up",7012) + raise AutosubmitCritical( + "The experiment cannot be picked up", 7012) Log.info("Local files/dirs have been successfully picked up") as_conf = AutosubmitConfig( experiment_id, BasicConfig, ConfigParserFactory()) @@ -2221,12 +2327,13 @@ class Autosubmit: try: p.send_command( "cp -rP " + os.path.join(p.temp_dir, experiment_id) + " " + p.root_dir) - p.send_command("chmod 755 -R "+p.root_dir) + p.send_command("chmod 755 -R " + p.root_dir) Log.result( "Files/dirs on {0} have been successfully picked up", platform) except (IOError, BaseException): error = True - Log.printlog("The files/dirs on {0} cannot be copied to {1}.".format(os.path.join(p.temp_dir, experiment_id), p.root_dir),6012) + Log.printlog("The files/dirs on {0} cannot be copied to {1}.".format( + os.path.join(p.temp_dir, experiment_id), p.root_dir), 6012) break backup_files.append(platform) else: @@ -2235,7 +2342,7 @@ class Autosubmit: if error: Autosubmit.archive(experiment_id, False, False) Log.printlog( - "The experiment cannot be picked,reverting changes.",7012) + "The experiment cannot be picked,reverting changes.", 7012) for platform in backup_files: p = submitter.platforms[platform] p.send_command("rm -R " + p.root_dir) @@ -2243,7 +2350,7 @@ class Autosubmit: else: for platform in backup_files: p = submitter.platforms[platform] - p.send_command("rm -R " + p.temp_dir+"/"+experiment_id) + p.send_command("rm -R " + p.temp_dir + "/" + experiment_id) Log.result("The experiment has been successfully picked up.") #Log.info("Refreshing the experiment.") # Autosubmit.refresh(experiment_id,False,False) @@ -2263,7 +2370,6 @@ class Autosubmit: experiment_id, BasicConfig, ConfigParserFactory()) as_conf.check_conf_files(False) - project_type = as_conf.get_project_type() submitter = Autosubmit._get_submitter(as_conf) @@ -2430,7 +2536,8 @@ class Autosubmit: config_file.close() Log.result("Configuration file written successfully") except (IOError, OSError) as e: - raise AutosubmitCritical("Can not write config file: {0}",7012,e.message) + raise AutosubmitCritical( + "Can not write config file: {0}", 7012, e.message) return True @staticmethod @@ -2450,9 +2557,11 @@ class Autosubmit: d = dialog.Dialog( dialog="dialog", autowidgetsize=True, screen_color='GREEN') except dialog.DialogError: - raise AutosubmitCritical("Graphical visualization failed, not enough screen size",7060) + raise AutosubmitCritical( + "Graphical visualization failed, not enough screen size", 7060) except Exception: - raise AutosubmitCritical("Dialog libs aren't found in your Operational system",7060) + raise AutosubmitCritical( + "Dialog libs aren't found in your Operational system", 7060) d.set_background_title("Autosubmit configure utility") if os.geteuid() == 0: @@ -2474,7 +2583,8 @@ class Autosubmit: os.system('clear') return False except dialog.DialogError: - raise AutosubmitCritical("Graphical visualization failed, not enough screen size",7060) + raise AutosubmitCritical( + "Graphical visualization failed, not enough screen size", 7060) filename = '.autosubmitrc' if level == 'All': @@ -2511,16 +2621,19 @@ class Autosubmit: jobs_conf_path = parser.get('conf', 'jobs') except (IOError, OSError) as e: - raise AutosubmitCritical("Can not read config file",7014,e.message) + raise AutosubmitCritical( + "Can not read config file", 7014, e.message) while True: try: code, database_path = d.dselect(database_path, width=80, height=20, title='\Zb\Z1Select path to database\Zn', colors='enable') except dialog.DialogError: - raise AutosubmitCritical("Graphical visualization failed, not enough screen size", 7060) + raise AutosubmitCritical( + "Graphical visualization failed, not enough screen size", 7060) if Autosubmit._requested_exit(code, d): - raise AutosubmitCritical("Graphical visualization failed, requested exit", 7060) + raise AutosubmitCritical( + "Graphical visualization failed, requested exit", 7060) elif code == dialog.Dialog.OK: database_path = database_path.replace('~', home_path) if not os.path.exists(database_path): @@ -2535,11 +2648,12 @@ class Autosubmit: title='\Zb\Z1Select path to experiments repository\Zn', colors='enable') except dialog.DialogError: - raise AutosubmitCritical("Graphical visualization failed, not enough screen size",7060) - + raise AutosubmitCritical( + "Graphical visualization failed, not enough screen size", 7060) if Autosubmit._requested_exit(code, d): - raise AutosubmitCritical("Graphical visualization failed,requested exit",7060) + raise AutosubmitCritical( + "Graphical visualization failed,requested exit", 7060) elif code == dialog.Dialog.OK: database_path = database_path.replace('~', home_path) if not os.path.exists(database_path): @@ -2560,10 +2674,12 @@ class Autosubmit: form_height=10, title='\Zb\Z1Just a few more options:\Zn', colors='enable') except dialog.DialogError: - raise AutosubmitCritical("Graphical visualization failed, not enough screen size",7060) + raise AutosubmitCritical( + "Graphical visualization failed, not enough screen size", 7060) if Autosubmit._requested_exit(code, d): - raise AutosubmitCritical("Graphical visualization failed, _requested_exit", 7060) + raise AutosubmitCritical( + "Graphical visualization failed, _requested_exit", 7060) elif code == dialog.Dialog.OK: database_filename = tag[0] platforms_conf_path = tag[1] @@ -2594,10 +2710,12 @@ class Autosubmit: form_height=10, title='\Zb\Z1Mail notifications configuration:\Zn', colors='enable') except dialog.DialogError: - raise AutosubmitCritical("Graphical visualization failed, not enough screen size", 7060) + raise AutosubmitCritical( + "Graphical visualization failed, not enough screen size", 7060) if Autosubmit._requested_exit(code, d): - raise AutosubmitCritical("Graphical visualization failed, requested exit", 7060) + raise AutosubmitCritical( + "Graphical visualization failed, requested exit", 7060) elif code == dialog.Dialog.OK: smtp_hostname = tag[0] mail_from = tag[1] @@ -2629,7 +2747,8 @@ class Autosubmit: width=50, height=5) os.system('clear') except (IOError, OSError) as e: - raise AutosubmitCritical("Can not write config file", 7012,e.message) + raise AutosubmitCritical( + "Can not write config file", 7012, e.message) return True @staticmethod @@ -2721,7 +2840,8 @@ class Autosubmit: # Cleaning to reduce file size. version = get_autosubmit_version(expid) if version is not None and version.startswith('3') and not Autosubmit.clean(expid, True, True, True, False): - raise AutosubmitCritical("Can not archive project. Clean not successful", 7012) + raise AutosubmitCritical( + "Can not archive project. Clean not successful", 7012) # Getting year of last completed. If not, year of expid folder year = None @@ -2756,8 +2876,7 @@ class Autosubmit: tar.close() os.chmod(os.path.join(year_path, output_filepath), 0o755) except Exception as e: - raise AutosubmitCritical("Can not write tar file", 7012,e.message) - + raise AutosubmitCritical("Can not write tar file", 7012, e.message) Log.info("Tar file created!") @@ -2770,14 +2889,15 @@ class Autosubmit: try: tmp_folder = os.path.join( BasicConfig.LOCAL_ROOT_DIR, "tmp") - tmp_expid = os.path.join(tmp_folder, expid+"_to_delete") + tmp_expid = os.path.join(tmp_folder, expid + "_to_delete") os.rename(exp_folder, tmp_expid) Log.warning("Experiment folder renamed to: {0}".format( - exp_folder+"_to_delete ")) + exp_folder + "_to_delete ")) except Exception as e: Autosubmit.unarchive(expid, compress, True) - raise AutosubmitCritical("Can not remove or rename experiments folder",7012,e.message) + raise AutosubmitCritical( + "Can not remove or rename experiments folder", 7012, e.message) Log.result("Experiment archived successfully") return True @@ -2825,7 +2945,7 @@ class Autosubmit: tar.close() except Exception as e: shutil.rmtree(exp_folder, ignore_errors=True) - Log.printlog("Can not extract tar file: {0}".format(e),6012) + Log.printlog("Can not extract tar file: {0}".format(e), 6012) return False Log.info("Unpacking finished") @@ -2833,7 +2953,8 @@ class Autosubmit: try: os.remove(archive_path) except Exception as e: - Log.printlog("Can not remove archived file folder: {0}".format(e),7012) + Log.printlog( + "Can not remove archived file folder: {0}".format(e), 7012) return False Log.result("Experiment {0} unarchived successfully", experiment_id) @@ -2896,9 +3017,11 @@ class Autosubmit: # Encapsulating the lock with portalocker.Lock(os.path.join(tmp_path, 'autosubmit.lock'), timeout=1) as fh: try: - Log.info("Preparing .lock file to avoid multiple instances with same expid.") + Log.info( + "Preparing .lock file to avoid multiple instances with same expid.") - as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory()) + as_conf = AutosubmitConfig( + expid, BasicConfig, ConfigParserFactory()) as_conf.check_conf_files(False) project_type = as_conf.get_project_type() # Getting output type provided by the user in config, 'pdf' as default @@ -2911,19 +3034,20 @@ class Autosubmit: Autosubmit._create_project_associated_conf( as_conf, False, update_job) - # Load parameters Log.info("Loading parameters...") parameters = as_conf.load_parameters() date_list = as_conf.get_date_list() if len(date_list) != len(set(date_list)): - raise AutosubmitCritical('There are repeated start dates!',7014) + raise AutosubmitCritical( + 'There are repeated start dates!', 7014) num_chunks = as_conf.get_num_chunks() chunk_ini = as_conf.get_chunk_ini() member_list = as_conf.get_member_list() if len(member_list) != len(set(member_list)): - raise AutosubmitCritical("There are repeated member names!") + raise AutosubmitCritical( + "There are repeated member names!") rerun = as_conf.get_rerun() Log.info("\nCreating the jobs list...") @@ -2956,7 +3080,8 @@ class Autosubmit: groups_dict = dict() # Setting up job historical database header. Must create a new run. - JobDataStructure(expid).validate_current_run(job_list.get_job_list(), as_conf.get_chunk_size_unit(), as_conf.get_chunk_size(), must_create=True) + JobDataStructure(expid).validate_current_run(job_list.get_job_list( + ), as_conf.get_chunk_size_unit(), as_conf.get_chunk_size(), must_create=True) if not noplot: if group_by: @@ -3032,9 +3157,9 @@ class Autosubmit: raise AutosubmitCritical("Stopped by user input", 7010) except portalocker.AlreadyLocked: message = "We have detected that there is another Autosubmit instance using the experiment\n. Stop other Autosubmit instances that are using the experiment or delete autosubmit.lock file located on tmp folder" - raise AutosubmitCritical(message,7000) + raise AutosubmitCritical(message, 7000) except AutosubmitCritical as e: - raise AutosubmitCritical(e.message,e.code) + raise AutosubmitCritical(e.message, e.code) @staticmethod def _copy_code(as_conf, expid, project_type, force): @@ -3058,7 +3183,7 @@ class Autosubmit: try: hpcarch = submitter.platforms[as_conf.get_platform()] except: - raise AutosubmitCritical("Can't set main platform",7014) + raise AutosubmitCritical("Can't set main platform", 7014) return AutosubmitGit.clone_repository(as_conf, force, hpcarch) elif project_type == "svn": svn_project_url = as_conf.get_svn_project_url() @@ -3084,7 +3209,7 @@ class Autosubmit: shutil.rmtree(project_path, ignore_errors=True) raise AutosubmitCritical("Can not check out revision {0} into {1}".format(svn_project_revision + " " + svn_project_url, - project_path),7062) + project_path), 7062) Log.debug("{0}", output) elif project_type == "local": @@ -3099,7 +3224,7 @@ class Autosubmit: if force: try: cmd = ["rsync -ach --info=progress2 " + - local_project_path+"/* "+local_destination] + local_project_path + "/* " + local_destination] subprocess.call(cmd, shell=True) except subprocess.CalledProcessError: raise AutosubmitCritical("Can not rsync {0} into {1}. Exiting...".format( @@ -3126,7 +3251,7 @@ class Autosubmit: except subprocess.CalledProcessError: shutil.rmtree(project_path) raise AutosubmitCritical( - "Can not copy {0} into {1}. Exiting...".format( local_project_path, project_path), 7063) + "Can not copy {0} into {1}. Exiting...".format(local_project_path, project_path), 7063) Log.debug("{0}", output) return True @@ -3242,7 +3367,8 @@ class Autosubmit: ".\n\tProcess stopped. Review the format of the provided input. Comparison is case sensitive." + \ "\n\tRemember that this option expects section names separated by a blank space as input." - raise AutosubmitCritical("Error in the supplied input for -ft.",7011,section_validation_message) + raise AutosubmitCritical( + "Error in the supplied input for -ft.", 7011, section_validation_message) job_list = Autosubmit.load_job_list( expid, as_conf, notransitive=notransitive) submitter = Autosubmit._get_submitter(as_conf) @@ -3264,7 +3390,8 @@ class Autosubmit: job.platform = platforms[job.platform_name.lower()] # noinspection PyTypeChecker platforms_to_test.add(platforms[job.platform_name.lower()]) - Autosubmit.restore_platforms(platforms_to_test) # establish the connection to all platforms + # establish the connection to all platforms + Autosubmit.restore_platforms(platforms_to_test) # Validating list of jobs, if filter_list -fl has been set: # Seems that Autosubmit.load_job_list call is necessary before verification is executed @@ -3299,7 +3426,8 @@ class Autosubmit: job_validation_message += "\n\tSpecified job(s) : [" + str(job_not_foundList) + "] not found in the experiment " + \ str(expid) + ". \n\tProcess stopped. Review the format of the provided input. Comparison is case sensitive." + \ "\n\tRemember that this option expects job names separated by a blank space as input." - raise AutosubmitCritical("Error in the supplied input for -ft.",7011,section_validation_message) + raise AutosubmitCritical( + "Error in the supplied input for -ft.", 7011, section_validation_message) # Validating fc if filter_chunks -fc has been set: if filter_chunks is not None: @@ -3363,7 +3491,8 @@ class Autosubmit: # Ending validation if fc_filter_is_correct == False: - raise AutosubmitCritical("Error in the supplied input for -fc.",7011,section_validation_message) + raise AutosubmitCritical( + "Error in the supplied input for -fc.", 7011, section_validation_message) # Validating status, if filter_status -fs has been set: # At this point we already have job_list from where we are getting the allows STATUS if filter_status is not None: @@ -3395,7 +3524,8 @@ class Autosubmit: status_validation_message += "\n\t There are no jobs with status " + \ status + " in this experiment." if status_validation_error == True: - raise AutosubmitCritical("Error in the supplied input for -fs.{0}".format(status_validation_message),7011,section_validation_message) + raise AutosubmitCritical("Error in the supplied input for -fs.{0}".format( + status_validation_message), 7011, section_validation_message) jobs_filtered = [] final_status = Autosubmit._get_status(final) @@ -3487,7 +3617,8 @@ class Autosubmit: # Ending validation if filter_is_correct == False: - raise AutosubmitCritical("Error in the supplied input for -ftc.", 7011, section_validation_message) + raise AutosubmitCritical( + "Error in the supplied input for -ftc.", 7011, section_validation_message) # If input is valid, continue. record = dict() @@ -3587,7 +3718,8 @@ class Autosubmit: Log.warning( "-d option: Experiment has too many jobs to be printed in the terminal. Maximum job quantity is 1000, your experiment has " + str(current_length) + " jobs.") else: - Log.info(job_list.print_with_status(statusChange = performed_changes)) + Log.info(job_list.print_with_status( + statusChange=performed_changes)) else: Log.warning("No changes were performed.") # End of New Feature @@ -3663,7 +3795,7 @@ class Autosubmit: expidJoblist[str(x[0:4])] += 1 if str(expid) in expidJoblist: - wrongExpid = jobs.__len__()-expidJoblist[expid] + wrongExpid = jobs.__len__() - expidJoblist[expid] if wrongExpid > 0: Log.warning( "There are {0} job.name with an invalid Expid", wrongExpid) @@ -3690,13 +3822,13 @@ class Autosubmit: job_tracked_changes, job_list.get_job_list(), as_conf.get_chunk_size_unit(), as_conf.get_chunk_size()) else: Log.printlog( - "Changes NOT saved to the JobList!!!!: use -s option to save",3000) + "Changes NOT saved to the JobList!!!!: use -s option to save", 3000) if as_conf.get_wrapper_type() != 'none' and check_wrapper: packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, - expid, "pkl", "job_packages_" + expid+".db"), 0775) + expid, "pkl", "job_packages_" + expid + ".db"), 0775) packages_persistence.reset_table(True) referenced_jobs_to_remove = set() job_list_wrappers = copy.deepcopy(job_list) @@ -3749,32 +3881,31 @@ class Autosubmit: Log.warning("-d option only works with -ftc.") return True - except portalocker.AlreadyLocked: message = "We have detected that there is another Autosubmit instance using the experiment\n. Stop other Autosubmit instances that are using the experiment or delete autosubmit.lock file located on tmp folder" - raise AutosubmitCritical(message,7000) + raise AutosubmitCritical(message, 7000) @staticmethod def _user_yes_no_query(question): - """ - Utility function to ask user a yes/no question + """ + Utility function to ask user a yes/no question - :param question: question to ask - :type question: str - :return: True if answer is yes, False if it is no - :rtype: bool - """ - sys.stdout.write('{0} [y/n]\n'.format(question)) - while True: - try: - if sys.version_info[0] == 3: - answer = raw_input() - else: - # noinspection PyCompatibility - answer = raw_input() - return strtobool(answer.lower()) - except ValueError: - sys.stdout.write('Please respond with \'y\' or \'n\'.\n') + :param question: question to ask + :type question: str + :return: True if answer is yes, False if it is no + :rtype: bool + """ + sys.stdout.write('{0} [y/n]\n'.format(question)) + while True: + try: + if sys.version_info[0] == 3: + answer = raw_input() + else: + # noinspection PyCompatibility + answer = raw_input() + return strtobool(answer.lower()) + except ValueError: + sys.stdout.write('Please respond with \'y\' or \'n\'.\n') @staticmethod def _prepare_conf_files(exp_id, hpc, autosubmit_version, dummy): @@ -3900,7 +4031,8 @@ class Autosubmit: if communications_library == 'paramiko': return ParamikoSubmitter() else: - return ParamikoSubmitter()# only paramiko is avaliable right now so.. + # only paramiko is avaliable right now so.. + return ParamikoSubmitter() @staticmethod def _get_job_list_persistence(expid, as_conf): @@ -3916,7 +4048,7 @@ class Autosubmit: elif storage_type == 'db': return JobListPersistenceDb(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_list_" + expid) - raise AutosubmitCritical('Storage type not known',7014) + raise AutosubmitCritical('Storage type not known', 7014) @staticmethod def _create_json(text): @@ -4024,7 +4156,7 @@ class Autosubmit: exp_parser = as_conf.get_parser( ConfigParserFactory(), as_conf.experiment_file) if exp_parser.get_bool_option('rerun', "RERUN", True): - raise AutosubmitCritical('Can not test a RERUN experiment',7014) + raise AutosubmitCritical('Can not test a RERUN experiment', 7014) content = open(as_conf.experiment_file).read() if random_select: @@ -4036,7 +4168,8 @@ class Autosubmit: if platforms_parser.get_option(section, 'TEST_SUITE', 'false').lower() == 'true': test_platforms.append(section) if len(test_platforms) == 0: - raise AutosubmitCritical("Missing hpcarch setting in expdef",7014) + raise AutosubmitCritical( + "Missing hpcarch setting in expdef", 7014) hpc = random.choice(test_platforms) if member is None: @@ -4135,7 +4268,7 @@ class Autosubmit: try: submitter.load_platforms(as_conf) if submitter.platforms is None: - raise AutosubmitCritical("platforms couldn't be loaded",7014) + raise AutosubmitCritical("platforms couldn't be loaded", 7014) except: raise AutosubmitCritical("platforms couldn't be loaded", 7014) platforms = submitter.platforms @@ -4164,7 +4297,8 @@ class Autosubmit: if job.platform.get_completed_files(job.name, 0): job.status = Status.COMPLETED - Log.info("CHANGED job '{0}' status to COMPLETED".format(job.name)) + Log.info( + "CHANGED job '{0}' status to COMPLETED".format(job.name)) job.platform.get_logs_files(expid, job.remote_logs) return job_list diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py index bc8db17cba277686e8abbfaeef97509c2feefbf0..9b8b4c20ab5f70ac601a5f6b5381012401ee0792 100644 --- a/autosubmit/database/db_jobdata.py +++ b/autosubmit/database/db_jobdata.py @@ -34,7 +34,7 @@ from autosubmit.config.basicConfig import BasicConfig from autosubmit.job.job_common import Status from autosubmit.job.job_package_persistence import JobPackagePersistence from bscearth.utils.date import date2str, parse_date, previous_day, chunk_end_date, chunk_start_date, subs_dates -from log.log import Log,AutosubmitCritical,AutosubmitError +from log.log import Log, AutosubmitCritical, AutosubmitError CURRENT_DB_VERSION = 12 # Used to be 10 # Defining RowType standard @@ -394,7 +394,9 @@ class ExperimentStatus(MainDataBase): try: if self.conn_ec: cur = self.conn_ec.cursor() - cur.execute("SELECT id FROM experiment WHERE name=?",( self.expid) ) # TODO verify changes (self.expid,) -> (self.expid) + # TODO verify changes (self.expid,) -> (self.expid) + cur.execute( + "SELECT id FROM experiment WHERE name=?", (self.expid)) row = cur.fetchone() return int(row[0]) return None @@ -603,9 +605,11 @@ class JobDataStructure(MainDataBase): CURRENT_DB_VERSION)) self.current_run_id = self.get_current_run_id() except IOError as e: - raise AutosubmitCritical("Historic Database route {0} is not accesible".format(BasicConfig.JOBDATA_DIR),7067,e.message) + raise AutosubmitCritical("Historic Database route {0} is not accesible".format( + BasicConfig.JOBDATA_DIR), 7067, e.message) except Exception as e: - raise AutosubmitCritical("Historic Database {0} due an database error".format(),7067,e.message) + raise AutosubmitCritical( + "Historic Database {0} due an database error".format(), 7067, e.message) def determine_rowtype(self, code): """ @@ -650,7 +654,7 @@ class JobDataStructure(MainDataBase): else: raise Exception("Empty header database") - def validate_current_run(self, job_list, chunk_unit="NA", chunk_size=0, must_create=False): + def validate_current_run(self, job_list, chunk_unit="NA", chunk_size=0, must_create=False, only_update=False): """[summary] :param job_list ([type]): [description] @@ -682,7 +686,7 @@ class JobDataStructure(MainDataBase): current_total, failed_count, queue_count, running_count, submit_count) self.current_run_id = self._insert_experiment_run(new_run) else: - if current_run.total != current_total: + if current_run.total != current_total and only_update == False: new_run = ExperimentRun(0, None, 0, 0, chunk_unit, chunk_size, completed_count, current_total, failed_count, queue_count, running_count, submit_count) self.current_run_id = self._insert_experiment_run(new_run) @@ -692,6 +696,7 @@ class JobDataStructure(MainDataBase): current_run.queuing = queue_count current_run.submitted = submit_count current_run.running = running_count + current_run.total = current_total if only_update == True else current_run.total current_run.finish = 0 self._update_experiment_run(current_run) self.current_run_id = current_run.run_id diff --git a/docs/source/usage/run.rst b/docs/source/usage/run.rst index 24d9bd9e1ff8de0391a94c5b818166fb008d384e..47ec68a6796e23091dba3825feb0048db248917c 100644 --- a/docs/source/usage/run.rst +++ b/docs/source/usage/run.rst @@ -17,6 +17,8 @@ Options: prevents doing the transitive reduction when plotting the workflow -v --update_version update the experiment version to match the actual autosubmit version + -s --start_time + sets the starting time for the experiment. Accepted format: 'yyyy-mm-dd HH:MM:SS' or 'HH:MM:SS' (defaults to current day) -h, --help show this help message and exit Example: