diff --git a/VERSION b/VERSION index 95ecedbba34c550e4bbe7a8c9a32e9cc489ab427..654cee2e4e8375b1ed23aef936f9701b8b238932 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -3.15.14 +3.15.15 diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 5dba3bb14200b02563c1a9a8ce8a3a89f0ed37bd..e5f07216a9c915c28243f09b3288ba9401a09f82 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -17,9 +17,11 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . from __future__ import print_function + +import requests import threading import traceback -import requests + try: # noinspection PyCompatibility from configparser import SafeConfigParser @@ -57,11 +59,10 @@ import locale from distutils.util import strtobool from log.log import Log, AutosubmitError, AutosubmitCritical from typing import Set -import sqlite3 -#try: +# try: # import dialog -#except Exception: +# except Exception: # dialog = None dialog = None from time import sleep @@ -72,7 +73,6 @@ import tarfile import time import copy import os -import glob import pwd import sys import shutil @@ -92,13 +92,14 @@ import history.utils as HUtils import helpers.autosubmit_helper as AutosubmitHelper import helpers.utils as HelperUtils import statistics.utils as StatisticsUtils + """ Main module for autosubmit. Only contains an interface class to all functionality implemented on autosubmit """ - sys.path.insert(0, os.path.abspath('.')) + # noinspection PyUnusedLocal @@ -123,11 +124,13 @@ def signal_handler_create(signal_received, frame): raise AutosubmitCritical( 'Autosubmit has been closed in an unexpected way. Killed or control + c.', 7010) + class MyParser(argparse.ArgumentParser): - def error(self, message): - sys.stderr.write('error: %s\n' % message) - self.print_help() - sys.exit(2) + def error(self, message): + sys.stderr.write('error: %s\n' % message) + self.print_help() + sys.exit(2) + class Autosubmit: """ @@ -220,7 +223,7 @@ class Autosubmit: help='Groups the jobs automatically or by date, member, chunk or split') subparser.add_argument('-expand', type=str, help='Supply the list of dates/members/chunks to filter the list of jobs. Default = "Any". ' - 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') + 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') subparser.add_argument( '-expand_status', type=str, help='Select the stat uses to be expanded') subparser.add_argument('--hide_groups', action='store_true', @@ -388,7 +391,8 @@ class Autosubmit: subparser.add_argument( '-fp', '--folder_path', type=str, help='Allows to select a non-default folder.') subparser.add_argument( - '-p', '--placeholders', default=False, action='store_true', help='disables the sustitution of placeholders by -') + '-p', '--placeholders', default=False, action='store_true', + help='disables the sustitution of placeholders by -') subparser.add_argument('-v', '--update_version', action='store_true', default=False, help='Update experiment version') # Create @@ -602,7 +606,8 @@ class Autosubmit: Autosubmit._init_logs(args, args.logconsole, args.logfile, expid) if args.command == 'run': - return Autosubmit.run_experiment(args.expid, args.notransitive, args.update_version, args.start_time, args.start_after, args.run_members) + return Autosubmit.run_experiment(args.expid, args.notransitive, args.update_version, args.start_time, + args.start_after, args.run_members) elif args.command == 'expid': return Autosubmit.expid(args.HPC, args.description, args.copy, args.dummy, False, args.operational, args.config) != '' @@ -611,7 +616,8 @@ class Autosubmit: elif args.command == 'monitor': return Autosubmit.monitor(args.expid, args.output, args.list, args.filter_chunks, args.filter_status, args.filter_type, args.hide, args.text, args.group_by, args.expand, - args.expand_status, args.hide_groups, args.notransitive, args.check_wrapper, args.txt_logfiles, detail=False) + args.expand_status, args.hide_groups, args.notransitive, args.check_wrapper, + args.txt_logfiles, detail=False) elif args.command == 'stats': return Autosubmit.statistics(args.expid, args.filter_type, args.filter_period, args.output, args.hide, args.notransitive) @@ -619,14 +625,16 @@ class Autosubmit: return Autosubmit.clean(args.expid, args.project, args.plot, args.stats) elif args.command == 'recovery': return Autosubmit.recovery(args.expid, args.noplot, args.save, args.all, args.hide, args.group_by, - args.expand, args.expand_status, args.notransitive, args.no_recover_logs, args.detail, args.force) + args.expand, args.expand_status, args.notransitive, args.no_recover_logs, + args.detail, args.force) elif args.command == 'check': return Autosubmit.check(args.expid, args.notransitive) elif args.command == 'inspect': return Autosubmit.inspect(args.expid, args.list, args.filter_chunks, args.filter_status, args.filter_type, args.notransitive, args.force, args.check_wrapper) elif args.command == 'report': - return Autosubmit.report(args.expid, args.template, args.show_all_parameters, args.folder_path, args.placeholders) + return Autosubmit.report(args.expid, args.template, args.show_all_parameters, args.folder_path, + args.placeholders) elif args.command == 'describe': return Autosubmit.describe(args.expid) elif args.command == 'migrate': @@ -645,8 +653,10 @@ class Autosubmit: return Autosubmit.install() elif args.command == 'setstatus': return Autosubmit.set_status(args.expid, args.noplot, args.save, args.status_final, args.list, - args.filter_chunks, args.filter_status, args.filter_type, args.filter_type_chunk, args.hide, - args.group_by, args.expand, args.expand_status, args.notransitive, args.check_wrapper, args.detail) + args.filter_chunks, args.filter_status, args.filter_type, + args.filter_type_chunk, args.hide, + args.group_by, args.expand, args.expand_status, args.notransitive, + args.check_wrapper, args.detail) elif args.command == 'testcase': return Autosubmit.testcase(args.copy, args.description, args.chunks, args.member, args.stardate, args.HPC, args.branch) @@ -701,19 +711,23 @@ class Autosubmit: host = fullhost forbidden = BasicConfig.DENIED_HOSTS authorized = BasicConfig.ALLOWED_HOSTS - message = "Command: {0} is not allowed to run in host: {1}.\n".format(args.command.upper(),fullhost) + message = "Command: {0} is not allowed to run in host: {1}.\n".format(args.command.upper(), fullhost) message += "List of permissions as follows:Command | hosts \nAllowed hosts\n" for command in BasicConfig.ALLOWED_HOSTS: - message += " {0}:{1} \n".format(command,BasicConfig.ALLOWED_HOSTS[command]) + message += " {0}:{1} \n".format(command, BasicConfig.ALLOWED_HOSTS[command]) message += "Denied hosts\n" for command in BasicConfig.DENIED_HOSTS: - message += " {0}:{1} \n".format(command,BasicConfig.DENIED_HOSTS[command]) - message += "[Command: autosubmit {0}] is not allowed to run in [host: {1}].".format(args.command.upper(), fullhost) + message += " {0}:{1} \n".format(command, BasicConfig.DENIED_HOSTS[command]) + message += "[Command: autosubmit {0}] is not allowed to run in [host: {1}].".format(args.command.upper(), + fullhost) if args.command in BasicConfig.DENIED_HOSTS: - if 'all' in BasicConfig.DENIED_HOSTS[args.command] or host in BasicConfig.DENIED_HOSTS[args.command] or fullhost in BasicConfig.DENIED_HOSTS[args.command]: + if 'all' in BasicConfig.DENIED_HOSTS[args.command] or host in BasicConfig.DENIED_HOSTS[ + args.command] or fullhost in BasicConfig.DENIED_HOSTS[args.command]: raise AutosubmitCritical(message, 7071) if args.command in BasicConfig.ALLOWED_HOSTS: - if 'all' not in BasicConfig.ALLOWED_HOSTS[args.command] and not (host in BasicConfig.ALLOWED_HOSTS[args.command] or fullhost in BasicConfig.ALLOWED_HOSTS[args.command]): + if 'all' not in BasicConfig.ALLOWED_HOSTS[args.command] and not ( + host in BasicConfig.ALLOWED_HOSTS[args.command] or fullhost in BasicConfig.ALLOWED_HOSTS[ + args.command]): raise AutosubmitCritical(message, 7071) if expid != 'None' and args.command not in expid_less and args.command not in global_log_command: as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory()) @@ -725,9 +739,9 @@ class Autosubmit: raise AutosubmitCritical("Experiment does not exist", 7012) # delete is treated differently if args.command not in ["monitor", "describe", "delete", "report", "stats", "dbfix"]: - owner,eadmin,currentOwner = Autosubmit._check_ownership(expid,raise_error=True) #fastlook + owner, eadmin, currentOwner = Autosubmit._check_ownership(expid, raise_error=True) # fastlook else: - owner,eadmin,currentOwner = Autosubmit._check_ownership(expid,raise_error=False) #fastlook + owner, eadmin, currentOwner = Autosubmit._check_ownership(expid, raise_error=False) # fastlook if not os.path.exists(tmp_path): os.mkdir(tmp_path) @@ -747,7 +761,7 @@ class Autosubmit: else: st = os.stat(tmp_path) oct_perm = str(oct(st.st_mode))[-3:] - if int(oct_perm[1]) in [6,7] or int(oct_perm[2]) in [6,7]: + if int(oct_perm[1]) in [6, 7] or int(oct_perm[2]) in [6, 7]: Log.set_file(os.path.join(tmp_path, args.command + '.log'), "out", log_level) Log.set_file(os.path.join(tmp_path, args.command + '_err.log'), "err") else: @@ -755,25 +769,29 @@ class Autosubmit: args.command + expid + '.log'), "out", log_level) Log.set_file(os.path.join(BasicConfig.GLOBAL_LOG_DIR, args.command + expid + '_err.log'), "err") - Log.printlog("Permissions of {0} are {1}. The log is being written in the {2} path instead of {1}. Please tell to the owner to fix the permissions".format(tmp_path,oct_perm,BasicConfig.GLOBAL_LOG_DIR)) + Log.printlog( + "Permissions of {0} are {1}. The log is being written in the {2} path instead of {1}. Please tell to the owner to fix the permissions".format( + tmp_path, oct_perm, BasicConfig.GLOBAL_LOG_DIR)) Log.file_path = tmp_path if owner: if "update_version" in args: force_update_version = args.update_version else: force_update_version = False - if args.command not in ["upgrade","updateversion"]: + if args.command not in ["upgrade", "updateversion"]: if force_update_version: if as_conf.get_version() != Autosubmit.autosubmit_version: - Log.info("The {2} experiment {0} version is being updated to {1} for match autosubmit version", - as_conf.get_version(), Autosubmit.autosubmit_version, expid) + Log.info( + "The {2} experiment {0} version is being updated to {1} for match autosubmit version", + as_conf.get_version(), Autosubmit.autosubmit_version, expid) as_conf.set_version(Autosubmit.autosubmit_version) else: if as_conf.get_version() is not None and as_conf.get_version() != Autosubmit.autosubmit_version: raise AutosubmitCritical( "Current experiment uses ({0}) which is not the running Autosubmit version \nPlease, update the experiment version if you wish to continue using AutoSubmit {1}\nYou can achieve this using the command autosubmit updateversion {2} \n" "Or with the -v parameter: autosubmit {3} {2} -v ".format(as_conf.get_version(), - Autosubmit.autosubmit_version, expid,args.command), + Autosubmit.autosubmit_version, + expid, args.command), 7014) else: if expid == 'None': @@ -807,7 +825,7 @@ class Autosubmit: "Autosubmit is running with {0}", Autosubmit.autosubmit_version) @staticmethod - def _check_ownership(expid,raise_error=False): + def _check_ownership(expid, raise_error=False): """ Check if user owns or if it is edamin :return: owner,eadmin @@ -830,8 +848,8 @@ class Autosubmit: :rtype: boolean """ message = "The {0} experiment was removed from the local disk and from the database.".format(expid_delete) - message+= " Note that this action does not delete any data written by the experiment.\n" - message+= "Complete list of files/directories deleted:\n" + message += " Note that this action does not delete any data written by the experiment.\n" + message += "Complete list of files/directories deleted:\n" for root, dirs, files in os.walk(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid_delete)): for dir_ in dirs: message += os.path.join(root, dir_) + "\n" @@ -839,9 +857,10 @@ class Autosubmit: "structure_{0}.db".format(expid_delete)) + "\n" message += os.path.join(BasicConfig.LOCAL_ROOT_DIR, BasicConfig.JOBDATA_DIR, "job_data_{0}.db".format(expid_delete)) + "\n" - owner,eadmin,currentOwner = Autosubmit._check_ownership(expid_delete) - if expid_delete == '' or expid_delete is None and not os.path.exists(os.path.join(BasicConfig.LOCAL_ROOT_DIR,expid_delete)): - Log.printlog("Experiment directory does not exist.",Log.WARNING) + owner, eadmin, currentOwner = Autosubmit._check_ownership(expid_delete) + if expid_delete == '' or expid_delete is None and not os.path.exists( + os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid_delete)): + Log.printlog("Experiment directory does not exist.", Log.WARNING) else: # Deletion workflow continues as usual, a disjunction is included for the case when # force is sent, and user is eadmin @@ -849,7 +868,8 @@ class Autosubmit: try: if owner or (force and eadmin): if force and eadmin: - Log.info("Preparing deletion of experiment {0} from owner: {1}, as eadmin.", expid_delete, currentOwner) + Log.info("Preparing deletion of experiment {0} from owner: {1}, as eadmin.", expid_delete, + currentOwner) try: Log.info("Deleting experiment from database...") try: @@ -865,14 +885,16 @@ class Autosubmit: error_message += 'Can not delete directory: {0}\n'.format(str(e)) try: Log.info("Removing Structure db...") - structures_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, BasicConfig.STRUCTURES_DIR, "structure_{0}.db".format(expid_delete)) + structures_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, BasicConfig.STRUCTURES_DIR, + "structure_{0}.db".format(expid_delete)) if os.path.exists(structures_path): os.remove(structures_path) except BaseException as e: error_message += 'Can not delete structure: {0}\n'.format(str(e)) try: Log.info("Removing job_data db...") - job_data_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, BasicConfig.JOBDATA_DIR, "job_data_{0}.db".format(expid_delete)) + job_data_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, BasicConfig.JOBDATA_DIR, + "job_data_{0}.db".format(expid_delete)) if os.path.exists(job_data_path): os.remove(job_data_path) except BaseException as e: @@ -882,16 +904,20 @@ class Autosubmit: else: if not eadmin: raise AutosubmitCritical( - 'Detected Eadmin user however, -f flag is not found. {0} can not be deleted!'.format(expid_delete), 7012) + 'Detected Eadmin user however, -f flag is not found. {0} can not be deleted!'.format( + expid_delete), 7012) else: raise AutosubmitCritical( - 'Current user is not the owner of the experiment. {0} can not be deleted!'.format(expid_delete), 7012) + 'Current user is not the owner of the experiment. {0} can not be deleted!'.format( + expid_delete), 7012) Log.printlog(message, Log.RESULT) except Exception as e: # Avoid calling Log at this point since it is possible that tmp folder is already deleted. error_message += "Couldn't delete the experiment".format(e.message) if error_message != "": - raise AutosubmitError("Some experiment files weren't correctly deleted\nPlease if the trace shows DATABASE IS LOCKED, report it to git\nIf there are I/O issues, wait until they're solved and then use this command again.\n",error_message,6004) + raise AutosubmitError( + "Some experiment files weren't correctly deleted\nPlease if the trace shows DATABASE IS LOCKED, report it to git\nIf there are I/O issues, wait until they're solved and then use this command again.\n", + error_message, 6004) @staticmethod def expid(hpc, description, copy_id='', dummy=False, test=False, operational=False, root_folder=''): @@ -933,7 +959,7 @@ class Autosubmit: if resource_exists('autosubmit.config', 'files/' + filename): index = filename.index('.') new_filename = filename[:index] + \ - "_" + exp_id + filename[index:] + "_" + exp_id + filename[index:] if filename == 'platforms.conf' and BasicConfig.DEFAULT_PLATFORMS_CONF != '': content = open(os.path.join( @@ -1013,7 +1039,8 @@ class Autosubmit: conf_copy_id, filename), 'r').readlines() # If autosubmitrc [conf] custom_platforms has been set and file exists, replace content - if filename.startswith("platforms") and os.path.isfile(BasicConfig.CUSTOM_PLATFORMS_PATH): + if filename.startswith("platforms") and os.path.isfile( + BasicConfig.CUSTOM_PLATFORMS_PATH): content = open( BasicConfig.CUSTOM_PLATFORMS_PATH, 'r').readlines() # Setting email notifications to false @@ -1105,7 +1132,7 @@ class Autosubmit: os.chmod(os.path.join(exp_id_path, "conf/proj_" + str(exp_id) + ".conf"), 0o755) except: - pass #some folder may no exists, like proj + pass # some folder may no exists, like proj return exp_id @staticmethod @@ -1131,7 +1158,7 @@ class Autosubmit: except AutosubmitCritical as e: raise except BaseException as e: - raise AutosubmitCritical("Seems that something went wrong, please check the trace", 7012,e.message) + raise AutosubmitCritical("Seems that something went wrong, please check the trace", 7012, e.message) else: raise AutosubmitCritical("Insufficient permissions", 7012) else: @@ -1172,7 +1199,8 @@ class Autosubmit: job_list.parameters = parameters @staticmethod - def inspect(expid, lst, filter_chunks, filter_status, filter_section, notransitive=False, force=False, check_wrapper=False): + def inspect(expid, lst, filter_chunks, filter_status, filter_section, notransitive=False, force=False, + check_wrapper=False): """ Generates cmd files experiment. @@ -1182,7 +1210,7 @@ class Autosubmit: :rtype: bool """ try: - Autosubmit._check_ownership(expid,raise_error=True) + Autosubmit._check_ownership(expid, raise_error=True) exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid) tmp_path = os.path.join(exp_path, BasicConfig.LOCAL_TMP_DIR) if os.path.exists(os.path.join(tmp_path, 'autosubmit.lock')): @@ -1257,8 +1285,8 @@ class Autosubmit: for chunk_json in member_json['cs']: chunk = int(chunk_json) jobs = jobs + \ - [job for job in filter( - lambda j: j.chunk == chunk, jobs_member)] + [job for job in filter( + lambda j: j.chunk == chunk, jobs_member)] elif filter_status: Log.debug( @@ -1328,7 +1356,9 @@ class Autosubmit: except AutosubmitError as e: raise except BaseException as e: - raise AutosubmitCritical("There are issues that occurred during the templates generation, please check that job parameters are well set and the template path exists.",7014,e.message) + raise AutosubmitCritical( + "There are issues that occurred during the templates generation, please check that job parameters are well set and the template path exists.", + 7014, e.message) return True @staticmethod @@ -1393,16 +1423,19 @@ class Autosubmit: unparsed_two_step_start = as_conf.get_parse_two_step_start() if unparsed_two_step_start != "": job_list.parse_jobs_by_filter(unparsed_two_step_start) - job_list.create_dictionary(date_list, member_list, num_chunks, chunk_ini, date_format, as_conf.get_retrials(), wrapper_jobs ) + job_list.create_dictionary(date_list, member_list, num_chunks, chunk_ini, date_format, as_conf.get_retrials(), + wrapper_jobs) while job_list.get_active(): - Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence, True, only_wrappers, hold=False) - #for job in job_list.get_uncompleted_and_not_waiting(): + Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence, True, + only_wrappers, hold=False) + # for job in job_list.get_uncompleted_and_not_waiting(): # job.status = Status.COMPLETED job_list.update_list(as_conf, False) @staticmethod - def run_experiment(expid, notransitive=False, update_version=False, start_time=None, start_after=None, run_members=None): + def run_experiment(expid, notransitive=False, update_version=False, start_time=None, start_after=None, + run_members=None): """ Runs and experiment (submitting all the jobs properly and repeating its execution in case of failure). @@ -1422,7 +1455,8 @@ class Autosubmit: except BaseException as e: - raise AutosubmitCritical("Failure during the loading of the experiment configuration, check file paths",7014,e.message) + raise AutosubmitCritical("Failure during the loading of the experiment configuration, check file paths", + 7014, e.message) try: # Handling starting time @@ -1483,7 +1517,9 @@ class Autosubmit: try: job.platform = submitter.platforms[job.platform_name.lower()] except: - raise AutosubmitCritical("hpcarch={0} not found in the platforms configuration file".format(job.platform_name), 7014) + raise AutosubmitCritical( + "hpcarch={0} not found in the platforms configuration file".format(job.platform_name), + 7014) # noinspection PyTypeChecker if job.status not in (Status.COMPLETED, Status.SUSPENDED): platforms_to_test.add(job.platform) @@ -1501,7 +1537,8 @@ class Autosubmit: "job_packages not found", 6016, e.message) except BaseException as e: raise AutosubmitCritical( - "Corrupted job_packages, python 2.7 and sqlite doesn't allow to restore these packages", 7040, e.message) + "Corrupted job_packages, python 2.7 and sqlite doesn't allow to restore these packages", + 7040, e.message) if as_conf.get_wrapper_type() != 'none': os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl", "job_packages_" + expid + ".db"), 0644) @@ -1567,7 +1604,8 @@ class Autosubmit: job_list.job_package_map[jobs[0].id] = wrapper_job except Exception as e: raise AutosubmitCritical( - "Autosubmit failed while processing job packages. This might be due to a change in your experiment configuration files after 'autosubmit create' was performed.", 7014, str(e)) + "Autosubmit failed while processing job packages. This might be due to a change in your experiment configuration files after 'autosubmit create' was performed.", + 7014, str(e)) Log.debug("Checking job_list current status") job_list.update_list(as_conf, first_time=True) @@ -1578,9 +1616,12 @@ class Autosubmit: Log.debug("Running job data structure") try: # Historical Database: Can create a new run if there is a difference in the number of jobs or if the current run does not exist. - exp_history = ExperimentHistory(expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) + exp_history = ExperimentHistory(expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, + historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) exp_history.initialize_database() - exp_history.process_status_changes(job_list.get_job_list(), as_conf.get_chunk_size_unit(), as_conf.get_chunk_size(), current_config=as_conf.get_full_config_as_json()) + exp_history.process_status_changes(job_list.get_job_list(), as_conf.get_chunk_size_unit(), + as_conf.get_chunk_size(), + current_config=as_conf.get_full_config_as_json()) Autosubmit.database_backup(expid) except Exception as e: try: @@ -1593,12 +1634,15 @@ class Autosubmit: except Exception as e: # Connection to status database ec_earth.db can fail. # API worker will fix the status. - Log.debug("Autosubmit couldn't set your experiment as running on the autosubmit times database: {1}. Exception: {0}".format(str(e), os.path.join(BasicConfig.DB_DIR, BasicConfig.AS_TIMES_DB)), 7003) + Log.debug( + "Autosubmit couldn't set your experiment as running on the autosubmit times database: {1}. Exception: {0}".format( + str(e), os.path.join(BasicConfig.DB_DIR, BasicConfig.AS_TIMES_DB)), 7003) if allowed_members: # Set allowed members after checks have been performed. This triggers the setter and main logic of the -rm feature. job_list.run_members = allowed_members - Log.result("Only jobs with member value in {0} or no member will be allowed in this run. Also, those jobs already SUBMITTED, QUEUING, or RUNNING will be allowed to complete and will be tracked.".format( - str(allowed_members))) + Log.result( + "Only jobs with member value in {0} or no member will be allowed in this run. Also, those jobs already SUBMITTED, QUEUING, or RUNNING will be allowed to complete and will be tracked.".format( + str(allowed_members))) except AutosubmitCritical as e: e.message += " HINT: check the CUSTOM_DIRECTIVE syntax in your jobs configuration files." raise AutosubmitCritical(e.message, 7014, e.trace) @@ -1610,12 +1654,12 @@ class Autosubmit: if unparsed_two_step_start != "": job_list.parse_jobs_by_filter(unparsed_two_step_start) - main_loop_retrials = 11250*2 # Hard limit of tries ( 48h min 72h max), 2 retrials per stop + main_loop_retrials = 11250 * 2 # Hard limit of tries ( 48h min 72h max), 2 retrials per stop # establish the connection to all platforms Autosubmit.restore_platforms(platforms_to_test) save = True - #@main + # @main Log.debug("Running main loop") ######################### # AUTOSUBMIT - MAIN LOOP @@ -1623,7 +1667,7 @@ class Autosubmit: # Main loop. Finishing when all jobs have been submitted while job_list.get_active(): - #Log.info("FD: {0}".format(log.fd_show.fd_table_status_str())) + # Log.info("FD: {0}".format(log.fd_show.fd_table_status_str())) try: if Autosubmit.exit: # Closing threads on Ctrl+C @@ -1638,8 +1682,9 @@ class Autosubmit: if "JOB_" in thread.name: if thread.isAlive(): active_threads = True - Log.info("{0} is still retrieving outputs, time remaining is {1} seconds.".format( - thread.name, 60 - timeout)) + Log.info( + "{0} is still retrieving outputs, time remaining is {1} seconds.".format( + thread.name, 60 - timeout)) break if active_threads: sleep(10) @@ -1649,9 +1694,9 @@ class Autosubmit: Log.debug("Reloading parameters...") try: Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) - #Log.info("FD 2: {0}".format(log.fd_show.fd_table_status_str())) - except BaseException as e : - raise AutosubmitError("Config files seems to not be accesible",6040,e.message) + # Log.info("FD 2: {0}".format(log.fd_show.fd_table_status_str())) + except BaseException as e: + raise AutosubmitError("Config files seems to not be accesible", 6040, e.message) total_jobs = len(job_list.get_job_list()) Log.info("\n\n{0} of {1} jobs remaining ({2})".format( total_jobs - len(job_list.get_completed()), total_jobs, time.strftime("%H:%M"))) @@ -1693,15 +1738,18 @@ class Autosubmit: wrapper_job.checked_time = datetime.datetime.now() # This is where wrapper will be checked on the slurm platform, update takes place. try: - platform.check_job(wrapper_job,is_wrapper=True) + platform.check_job(wrapper_job, is_wrapper=True) except BaseException as e: job_list.save() - raise AutosubmitError("The communication with {0} went wrong while checking wrapper {1}\n{2}".format(platform.name,wrapper_job.id,str(e))) - #Log.info("FD 3Wrapper checked: {0}".format(log.fd_show.fd_table_status_str())) + raise AutosubmitError( + "The communication with {0} went wrong while checking wrapper {1}\n{2}".format( + platform.name, wrapper_job.id, str(e))) + # Log.info("FD 3Wrapper checked: {0}".format(log.fd_show.fd_table_status_str())) try: if wrapper_job.status != wrapper_job.new_status: Log.info('Wrapper job ' + wrapper_job.name + ' changed from ' + str( - Status.VALUE_TO_KEY[wrapper_job.status]) + ' to status ' + str(Status.VALUE_TO_KEY[wrapper_job.new_status])) + Status.VALUE_TO_KEY[wrapper_job.status]) + ' to status ' + str( + Status.VALUE_TO_KEY[wrapper_job.new_status])) save = True except: raise AutosubmitCritical( @@ -1712,7 +1760,9 @@ class Autosubmit: wrapper_job.check_status(wrapper_job.new_status) except: job_list.save() - raise AutosubmitError("The communication with {0} went wrong while checking the inner_jobs of {1}\n{2}".format(platform.name,wrapper_job.id,str(e))) + raise AutosubmitError( + "The communication with {0} went wrong while checking the inner_jobs of {1}\n{2}".format( + platform.name, wrapper_job.id, str(e))) # Erase from packages if the wrapper failed to be queued ( Hold Admin bug ) if wrapper_job.status == Status.WAITING: @@ -1729,13 +1779,16 @@ class Autosubmit: for inner_job in wrapper_job.job_list: if inner_job.prev_status != inner_job.status: if Status.VALUE_TO_KEY[inner_job.status] in inner_job.notify_on: - Notifier.notify_status_change(MailNotifier(BasicConfig), expid, inner_job.name, - Status.VALUE_TO_KEY[inner_job.prev_status], + Notifier.notify_status_change(MailNotifier(BasicConfig), expid, + inner_job.name, + Status.VALUE_TO_KEY[ + inner_job.prev_status], Status.VALUE_TO_KEY[inner_job.status], as_conf.get_mails_to()) # Detect and store changes job_changes_tracker = {job.name: ( - job.prev_status, job.status) for job in wrapper_job.job_list if job.prev_status != job.status} + job.prev_status, job.status) for job in wrapper_job.job_list if + job.prev_status != job.status} else: # Prepare jobs, if slurm check all active jobs at once. # TODO: All of this should be a function, put in slurm_platform file, paramiko and ecmwf check_jobs to clean the code job = job[0] @@ -1752,14 +1805,16 @@ class Autosubmit: completed_joblist.append(job) else: # If they're not from slurm platform check one-by-one TODO: Implement ecwmf future platform and mnX, abstract this part platform.check_job(job) - #Log.info("FD 4 check job: {0}".format(log.fd_show.fd_table_status_str())) - if prev_status != job.update_status(as_conf.get_copy_remote_logs() == 'true'): + # Log.info("FD 4 check job: {0}".format(log.fd_show.fd_table_status_str())) + if prev_status != job.update_status( + as_conf.get_copy_remote_logs() == 'true'): # Keeping track of changes job_changes_tracker[job.name] = ( prev_status, job.status) if as_conf.get_notifications() == 'true': if Status.VALUE_TO_KEY[job.status] in job.notify_on: - Notifier.notify_status_change(MailNotifier(BasicConfig), expid, job.name, + Notifier.notify_status_change(MailNotifier(BasicConfig), expid, + job.name, Status.VALUE_TO_KEY[prev_status], Status.VALUE_TO_KEY[job.status], as_conf.get_mails_to()) @@ -1775,7 +1830,7 @@ class Autosubmit: Log.debug("Checking all jobs at once") platform.check_Alljobs( platform_jobs[3], as_conf) - #Log.info("FD slurm jobs: {0}".format(log.fd_show.fd_table_status_str())) + # Log.info("FD slurm jobs: {0}".format(log.fd_show.fd_table_status_str())) for j_Indx in xrange(0, len(platform_jobs[3])): prev_status = platform_jobs[2][j_Indx] @@ -1825,7 +1880,8 @@ class Autosubmit: exp_history.process_job_list_changes_to_experiment_totals(job_list.get_job_list()) Autosubmit.database_backup(expid) except: - Log.warning("Couldn't recover the Historical database, AS will continue without it, GUI may be affected") + Log.warning( + "Couldn't recover the Historical database, AS will continue without it, GUI may be affected") job_changes_tracker = {} if Autosubmit.exit: job_list.save() @@ -1834,7 +1890,7 @@ class Autosubmit: except AutosubmitError as e: # If an error is detected, restore all connections and job_list Log.error("Trace: {0}", e.trace) Log.error("{1} [eCode={0}]", e.code, e.message) - #Log.debug("FD recovery: {0}".format(log.fd_show.fd_table_status_str())) + # Log.debug("FD recovery: {0}".format(log.fd_show.fd_table_status_str())) # No need to wait until the remote platform reconnection recovery = False as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory()) @@ -1846,7 +1902,7 @@ class Autosubmit: if job.fail_count > 0: failed_names[job.name] = job.fail_count except BaseException as e: - Log.printlog("Error trying to store failed job count",Log.WARNING) + Log.printlog("Error trying to store failed job count", Log.WARNING) Log.result("Storing failed job count...done") while not recovery and main_loop_retrials > 0: delay = min(15 * consecutive_retrials, 30) @@ -1885,7 +1941,8 @@ class Autosubmit: Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) # Recovery wrapper [Packages] Log.info("Recovering Wrappers...") - packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) + packages_persistence = JobPackagePersistence( + os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) packages = packages_persistence.load() for (exp_id, package_name, job_name) in packages: if package_name not in job_list.packages_dict: @@ -1959,7 +2016,7 @@ class Autosubmit: max = 10 Log.info("Restoring the connection to all experiment platforms") consecutive_retrials = 1 - delay = min(15*consecutive_retrials,120) + delay = min(15 * consecutive_retrials, 120) while not reconnected and main_loop_retrials > 0: main_loop_retrials = main_loop_retrials - 1 Log.info("Recovering the remote platform connection") @@ -1974,7 +2031,8 @@ class Autosubmit: else: mail_notify = False times = times + 1 - Autosubmit.restore_platforms(platforms_to_test,mail_notify=mail_notify,as_conf=as_conf,expid=expid) + Autosubmit.restore_platforms(platforms_to_test, mail_notify=mail_notify, + as_conf=as_conf, expid=expid) reconnected = True except AutosubmitCritical as e: # Message prompt by restore_platforms. @@ -1986,18 +2044,21 @@ class Autosubmit: except BaseException: reconnected = False if main_loop_retrials <= 0: - raise AutosubmitCritical("Autosubmit Encounter too much errors during running time, limit of {0} retrials reached".format(main_loop_retrials), 7051, e.message) + raise AutosubmitCritical( + "Autosubmit Encounter too much errors during running time, limit of {0} retrials reached".format( + main_loop_retrials), 7051, e.message) except AutosubmitCritical as e: # Critical errors can't be recovered. Failed configuration or autosubmit error raise AutosubmitCritical(e.message, e.code, e.trace) except portalocker.AlreadyLocked: message = "We have detected that there is another Autosubmit instance using the experiment\n. Stop other Autosubmit instances that are using the experiment or delete autosubmit.lock file located on tmp folder" raise AutosubmitCritical(message, 7000) except BaseException as e: # If this happens, there is a bug in the code or an exception not-well caught - raise AutosubmitCritical("There is a bug in the code, please contact via gitlab",7070,str(e)) + raise AutosubmitCritical("There is a bug in the code, please contact via gitlab", 7070, str(e)) Log.result("No more jobs to run.") # Updating job data header with current information when experiment ends try: - exp_history = ExperimentHistory(expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) + exp_history = ExperimentHistory(expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, + historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) exp_history.process_job_list_changes_to_experiment_totals(job_list.get_job_list()) Autosubmit.database_backup(expid) except: @@ -2035,10 +2096,10 @@ class Autosubmit: except AutosubmitCritical as e: raise except BaseException as e: - raise AutosubmitCritical("This seems like a bug in the code, please contact AS developers", 7070,str(e)) + raise AutosubmitCritical("This seems like a bug in the code, please contact AS developers", 7070, str(e)) @staticmethod - def restore_platforms(platform_to_test,mail_notify=False,as_conf=None,expid=expid): + def restore_platforms(platform_to_test, mail_notify=False, as_conf=None, expid=expid): Log.info("Checking the connection to all platforms in use") issues = "" platform_issues = "" @@ -2062,7 +2123,7 @@ class Autosubmit: if mail_notify: email = as_conf.get_mails_to() if "@" in email[0]: - Notifier.notify_experiment_status(MailNotifier(BasicConfig),expid,email,platform) + Notifier.notify_experiment_status(MailNotifier(BasicConfig), expid, email, platform) except: pass platform_issues += "\n[{1}] Connection Unsuccessful to host {0} ".format( @@ -2074,27 +2135,33 @@ class Autosubmit: platform.host, platform.name) else: platform_issues += "\n[{0}] has configuration issues.\n Check that the connection is passwd-less.(ssh {1}@{4})\n Check the parameters that build the root_path are correct:{{scratch_dir/project/user}} = {{{3}/{2}/{1}}}".format( - platform.name, platform.user, platform.project, platform.scratch,platform.host) + platform.name, platform.user, platform.project, platform.scratch, platform.host) issues += platform_issues # Checks if bashrc is provinding output that could mess with Autosubmit remote pooling, if so, warns the user but continues as Autosubmit should be able to strip the output platform.get_bashrc_output() if platform.bashrc_output != "" or platform.bashrc_err != "": - Log.warning("Bashrc is providing output that could mess with Autosubmit remote pooling\nHINT: add [ -z \"$PS1\" ] && return. at the header of {1}:~/.bashrc".format(platform.name,platform.host)) + Log.warning( + "Bashrc is providing output that could mess with Autosubmit remote pooling\nHINT: add [ -z \"$PS1\" ] && return. at the header of {1}:~/.bashrc".format( + platform.name, platform.host)) if platform_issues == "": Log.result("[{1}] Connection successful to host {0}", platform.host, platform.name) else: if platform.connected: platform.connected = False - Log.printlog("[{1}] Connection sucessful to host {0}, however there are issues with %HPCROOT%".format(platform.host, platform.name), - Log.WARNING) + Log.printlog( + "[{1}] Connection sucessful to host {0}, however there are issues with %HPCROOT%".format( + platform.host, platform.name), + Log.WARNING) else: - Log.printlog("[{1}] Connection failed to host {0}".format(platform.host, platform.name), Log.WARNING) + Log.printlog("[{1}] Connection failed to host {0}".format(platform.host, platform.name), + Log.WARNING) if issues != "": raise AutosubmitCritical( - "Issues while checking the connectivity of platforms.", 7010, issues+"\n"+ssh_config_issues) + "Issues while checking the connectivity of platforms.", 7010, issues + "\n" + ssh_config_issues) @staticmethod - def submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence, inspect=False, only_wrappers=False, hold=False): + def submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence, inspect=False, + only_wrappers=False, hold=False): # type: (AutosubmitConfig, JobList, Set[Platform], JobPackagePersistence, bool, bool, bool) -> bool """ Gets READY jobs and send them to the platforms if there is available space on the queues @@ -2132,7 +2199,7 @@ class Autosubmit: as_conf, platform, job_list, hold=hold).build_packages() if not inspect: platform.open_submit_script() - valid_packages_to_submit = [] # type: List[JobPackageBase] + valid_packages_to_submit = [] # type: List[JobPackageBase] for package in packages_to_submit: try: @@ -2156,7 +2223,7 @@ class Autosubmit: if not only_wrappers: try: package.submit(as_conf, job_list.parameters, inspect, hold=hold) - save=True + save = True if not inspect: if str(platform.type).lower() != "slurm": job_list.update_list(as_conf) @@ -2170,7 +2237,8 @@ class Autosubmit: if package.jobs[0].id != 0: failed_packages.append(package.jobs[0].id) platform.connected = False - if str(e.trace).lower().find("bad parameters") != -1 or str(e.message).lower().find("scheduler is not installed") != -1: + if str(e.trace).lower().find("bad parameters") != -1 or str(e.message).lower().find( + "scheduler is not installed") != -1: error_msg = "" for package_tmp in valid_packages_to_submit: for job_tmp in package_tmp.jobs: @@ -2180,11 +2248,11 @@ class Autosubmit: if job_tmp.section not in error_msg: error_msg += job_tmp.section + "&" if str(e.trace).lower().find("bad parameters") != -1: - error_message+="\ncheck job and queue specified in jobs.conf. Sections that could be affected: {0}".format( - error_msg[:-1]) + error_message += "\ncheck job and queue specified in jobs.conf. Sections that could be affected: {0}".format( + error_msg[:-1]) else: - error_message+="\ncheck that {1} platform has set the correct scheduler. Sections that could be affected: {0}".format( - error_msg[:-1],platform.name) + error_message += "\ncheck that {1} platform has set the correct scheduler. Sections that could be affected: {0}".format( + error_msg[:-1], platform.name) except WrongTemplateException as e: raise AutosubmitCritical("Invalid parameter substitution in {0} template".format( e.job_name), 7014, e.message) @@ -2192,8 +2260,9 @@ class Autosubmit: raise except Exception as e: platform.connected = False - raise AutosubmitError("{0} submission failed. May be related to running a job with check=on_submission and another that affect this job template".format( - platform.name), 6015, str(e)) + raise AutosubmitError( + "{0} submission failed. May be related to running a job with check=on_submission and another that affect this job template".format( + platform.name), 6015, str(e)) except WrongTemplateException as e: raise AutosubmitCritical( "Invalid parameter substitution in {0} template".format(e.job_name), 7014) @@ -2205,7 +2274,7 @@ class Autosubmit: raise if str(platform.type).lower() in ["slurm", "pjm"] and not inspect and not only_wrappers: try: - valid_packages_to_submit = [ package for package in valid_packages_to_submit if package.x11 != True] + valid_packages_to_submit = [package for package in valid_packages_to_submit if package.x11 != True] if len(valid_packages_to_submit) > 0: submit_time = int(time.time() / 60) try: @@ -2216,17 +2285,19 @@ class Autosubmit: try: for package in valid_packages_to_submit: try: - elapsed_time_minutes = str(int(round(int(time.time() / 60) - submit_time)+1)) - job_historic = platform.get_jobid_by_jobname(package.jobs[0].name,minutes=elapsed_time_minutes) + elapsed_time_minutes = str(int(round(int(time.time() / 60) - submit_time) + 1)) + job_historic = platform.get_jobid_by_jobname(package.jobs[0].name, + minutes=elapsed_time_minutes) except: job_historic = [] - #Recover jobid from jobname + # Recover jobid from jobname if len(job_historic) > 0 and isinstance(job_historic, list): job_id = job_historic[-1] for job_id_historic in job_historic: if job_id_historic != job_id: try: - platform.send_command(platform.cancel_cmd + " {0}".format(job_id_historic)) + platform.send_command( + platform.cancel_cmd + " {0}".format(job_id_historic)) except: pass for job in package.jobs: @@ -2237,7 +2308,7 @@ class Autosubmit: except: pass job_list.save() - job_list.update_list(as_conf,store_change=True) + job_list.update_list(as_conf, store_change=True) jobs_id = None platform.connected = False if e.trace is not None: @@ -2245,26 +2316,33 @@ class Autosubmit: else: e.trace = "" has_trace_bad_parameters = False - if has_trace_bad_parameters or e.message.lower().find("invalid partition") != -1 or e.message.lower().find("invalid qos") != -1 or e.message.lower().find("scheduler is not installed") != -1 or e.message.lower().find("failed") != -1 or e.message.lower().find("not available") != -1: + if has_trace_bad_parameters or e.message.lower().find( + "invalid partition") != -1 or e.message.lower().find( + "invalid qos") != -1 or e.message.lower().find( + "scheduler is not installed") != -1 or e.message.lower().find( + "failed") != -1 or e.message.lower().find("not available") != -1: error_msg = "" for package_tmp in valid_packages_to_submit: for job_tmp in package_tmp.jobs: if job_tmp.section not in error_msg: error_msg += job_tmp.section + "&" if has_trace_bad_parameters: - error_message+="Check job and queue specified in jobs.conf. Sections that could be affected: {0}".format(error_msg[:-1]) + error_message += "Check job and queue specified in jobs.conf. Sections that could be affected: {0}".format( + error_msg[:-1]) else: - error_message+="Check that {1} platform has set the correct scheduler. Sections that could be affected: {0}".format( - error_msg[:-1], platform.name) + error_message += "Check that {1} platform has set the correct scheduler. Sections that could be affected: {0}".format( + error_msg[:-1], platform.name) if e.trace is None: e.trace = "" - raise AutosubmitCritical(error_message,7014,e.message+"\n"+str(e.trace)) + raise AutosubmitCritical(error_message, 7014, e.message + "\n" + str(e.trace)) except IOError as e: raise AutosubmitError( "IO issues ", 6016, e.message) except BaseException as e: if e.message.find("scheduler") != -1: - raise AutosubmitCritical("Are you sure that [{0}] scheduler is the correct type for platform [{1}]?.\n Please, double check that {0} is loaded for {1} before autosubmit launch any job.".format(platform.type.upper(),platform.name.upper()),7070) + raise AutosubmitCritical( + "Are you sure that [{0}] scheduler is the correct type for platform [{1}]?.\n Please, double check that {0} is loaded for {1} before autosubmit launch any job.".format( + platform.type.upper(), platform.name.upper()), 7070) raise AutosubmitError( "Submission failed, this can be due a failure on the platform", 6015, e.message) if jobs_id is None or len(jobs_id) <= 0: @@ -2274,7 +2352,7 @@ class Autosubmit: jobnames += [job.name for job in package.jobs] for jobname in jobnames: jobid = platform.get_jobid_by_jobname(jobname) - #cancel bad submitted job if jobid is encountered + # cancel bad submitted job if jobid is encountered for id in jobid: platform.cancel_job(id) except: @@ -2282,7 +2360,8 @@ class Autosubmit: platform.connected = False raise AutosubmitError( - "Submission failed, this can be due a failure on the platform\n{0}\n{1}".format(str(platform.name),""), 6015) + "Submission failed, this can be due a failure on the platform\n{0}\n{1}".format( + str(platform.name), ""), 6015) i = 0 if hold: sleep(10) @@ -2306,7 +2385,8 @@ class Autosubmit: sleep(5) retries = retries - 1 if not can_continue: - package.jobs[0].platform.send_command(package.jobs[0].platform.cancel_cmd + " {0}".format(jobs_id[i])) + package.jobs[0].platform.send_command( + package.jobs[0].platform.cancel_cmd + " {0}".format(jobs_id[i])) i = i + 1 continue if not platform.hold_job(package.jobs[0]): @@ -2326,7 +2406,7 @@ class Autosubmit: if len(failed_packages) > 0: try: for job_id in failed_packages: - platform.send_command( platform.cancel_cmd + " {0}".format(job_id)) + platform.send_command(platform.cancel_cmd + " {0}".format(job_id)) except: pass raise AutosubmitError( @@ -2358,12 +2438,13 @@ class Autosubmit: if not inspect: job_list.save() if error_message != "": - raise AutosubmitCritical("Submission Failed due wrong configuration:{0}".format(error_message),7015) + raise AutosubmitCritical("Submission Failed due wrong configuration:{0}".format(error_message), 7015) return save @staticmethod def monitor(expid, file_format, lst, filter_chunks, filter_status, filter_section, hide, txt_only=False, - group_by=None, expand="", expand_status=list(), hide_groups=False, notransitive=False, check_wrapper=False, txt_logfiles=False, detail=False): + group_by=None, expand="", expand_status=list(), hide_groups=False, notransitive=False, + check_wrapper=False, txt_logfiles=False, detail=False): """ Plots workflow graph for a given experiment with status of each job coded by node color. Plot is created in experiment's plot folder with name __