diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 03876197252c1d856628ec977080cadbb583c890..736be3a0694bee4e5c0032ba889c4d0d14255040 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -157,6 +157,8 @@ class Autosubmit: help='Sets the starting time for this experiment') subparser.add_argument('-sa', '--start_after', required=False, help='Sets a experiment expid which completion will trigger the start of this experiment.') + subparser.add_argument('-rm', '--run_members', required=False, + help='Sets members allowed on this run.') # Expid subparser = subparsers.add_parser( @@ -511,7 +513,7 @@ class Autosubmit: args.command, args.logconsole, args.logfile, expid) if args.command == 'run': - return Autosubmit.run_experiment(args.expid, args.notransitive, args.update_version, args.start_time, args.start_after) + return Autosubmit.run_experiment(args.expid, args.notransitive, args.update_version, args.start_time, args.start_after, args.run_members) elif args.command == 'expid': return Autosubmit.expid(args.HPC, args.description, args.copy, args.dummy, False, args.operational, args.config) != '' @@ -1160,7 +1162,7 @@ class Autosubmit: job_list.update_list(as_conf, False) @staticmethod - def run_experiment(expid, notransitive=False, update_version=False, start_time=None, start_after=None): + def run_experiment(expid, notransitive=False, update_version=False, start_time=None, start_after=None, run_members=None): """ Runs and experiment (submitting all the jobs properly and repeating its execution in case of failure). @@ -1267,6 +1269,22 @@ class Autosubmit: sleep(60) # End of completion trigger block + # Handling run_members + allowed_members = None + + if run_members: + allowed_members = run_members.split() + rmember = [ + rmember for rmember in allowed_members if rmember not in as_conf.get_member_list()] + if len(rmember) > 0: + Log.critical( + "Some of the members ({0}) in the list of allowed members you supplied do not exist in the current list of members specified in the conf files.\nCurrent list of members: {1}".format(str(rmember), str(as_conf.get_member_list()))) + return + if len(allowed_members) == 0: + Log.critical( + "Not a valid -rm --run_members input: {0}".format(str(run_members))) + return + # checking if there is a lock file to avoid multiple running on the same expid try: with portalocker.Lock(os.path.join(tmp_path, 'autosubmit.lock'), timeout=1): @@ -1360,6 +1378,11 @@ class Autosubmit: except Exception as e: raise AutosubmitCritical( "Error while processing job_data_structure", 7067, e.message) + if allowed_members: + # Set allowed members after checks have been performed. This triggers the setter and main logic of the -rm feature. + job_list.run_members = allowed_members + Log.result("Only jobs with member value in {0} or no member will be allowed in this run. Also, those jobs already SUBMITTED, QUEUING, or RUNNING will be allowed to complete and will be tracked.".format( + str(allowed_members))) except AutosubmitCritical as e: raise AutosubmitCritical(e.message, 7067, e.trace) except Exception as e: @@ -1369,7 +1392,6 @@ class Autosubmit: ######################### # AUTOSUBMIT - MAIN LOOP ######################### - # Main loop. Finishing when all jobs have been submitted main_loop_retrials = 120 # Hard limit of tries 120 tries at 1min sleep each try # establish the connection to all platforms @@ -1392,8 +1414,8 @@ class Autosubmit: Log.info( "{0} is still working.".format(thread.name)) exit_active_threads = True - sleep(20) - exit_timeout += 20 + sleep(10) + exit_timeout += 10 return 0 # reload parameters changes Log.debug("Reloading parameters...") diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 2d04111c38741f96d2e8c8f99d0c3c4b7e04682c..c5203ce703d6cf0d44b947a9c3485a237a33e00a 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -29,10 +29,12 @@ import subprocess from pyparsing import nestedExpr from bscearth.utils.date import parse_date -from log.log import Log, AutosubmitError,AutosubmitCritical +from log.log import Log, AutosubmitError, AutosubmitCritical from autosubmit.config.basicConfig import BasicConfig from collections import defaultdict + + class AutosubmitConfig(object): """ Class to handle experiment configuration coming from file or database @@ -65,7 +67,6 @@ class AutosubmitConfig(object): self.wrong_config = defaultdict(list) self.warn_config = defaultdict(list) - @property def jobs_parser(self): return self._jobs_parser @@ -114,7 +115,8 @@ class AutosubmitConfig(object): if not re.match('^\[[^\[\]\# \t\n]*\][ \t]*$|^[ \t]+\[[^\[\]# \t\n]*\]', first_line): content = f.read() f.seek(0, 0) - f.write('[DEFAULT]'.rstrip('\r\n') + '\n' + first_line + content) + f.write('[DEFAULT]'.rstrip('\r\n') + + '\n' + first_line + content) @property def jobs_file(self): @@ -154,6 +156,7 @@ class AutosubmitConfig(object): :rtype: str """ return self._jobs_parser.get_option(section, 'SYNCHRONIZE', '') + def get_processors(self, section): """ Gets processors needed for the given job type @@ -222,6 +225,7 @@ class AutosubmitConfig(object): :rtype: str """ return self._platforms_parser.get_option(section, 'USER_TO', '').lower() + def get_current_user(self, section): """ Returns the user to be changed from platform config file. @@ -239,6 +243,7 @@ class AutosubmitConfig(object): :rtype: str """ return self._platforms_parser.get_option(section, 'HOST', '').lower() + def get_current_project(self, section): """ Returns the project to be changed from platform config file. @@ -247,6 +252,7 @@ class AutosubmitConfig(object): :rtype: str """ return self._platforms_parser.get_option(section, 'PROJECT', '').lower() + def set_new_user(self, section, new_user): """ Sets new user for given platform @@ -256,12 +262,12 @@ class AutosubmitConfig(object): """ with open(self._platforms_parser_file) as p_file: contentLine = p_file.readline() - contentToMod="" - content="" - mod=False + contentToMod = "" + content = "" + mod = False while contentLine: if re.search(section, contentLine): - mod=True + mod = True if mod: contentToMod += contentLine else: @@ -269,10 +275,13 @@ class AutosubmitConfig(object): contentLine = p_file.readline() if mod: old_user = self.get_current_user(section) - contentToMod = contentToMod.replace(re.search(r'[^#]\bUSER\b =.*', contentToMod).group(0)[1:], "USER = " + new_user) - contentToMod = contentToMod.replace(re.search(r'[^#]\bUSER_TO\b =.*', contentToMod).group(0)[1:], "USER_TO = " + old_user) + contentToMod = contentToMod.replace(re.search( + r'[^#]\bUSER\b =.*', contentToMod).group(0)[1:], "USER = " + new_user) + contentToMod = contentToMod.replace(re.search( + r'[^#]\bUSER_TO\b =.*', contentToMod).group(0)[1:], "USER_TO = " + old_user) open(self._platforms_parser_file, 'w').write(content) open(self._platforms_parser_file, 'a').write(contentToMod) + def set_new_host(self, section, new_host): """ Sets new host for given platform @@ -282,12 +291,12 @@ class AutosubmitConfig(object): """ with open(self._platforms_parser_file) as p_file: contentLine = p_file.readline() - contentToMod="" - content="" - mod=False + contentToMod = "" + content = "" + mod = False while contentLine: if re.search(section, contentLine): - mod=True + mod = True if mod: contentToMod += contentLine else: @@ -295,10 +304,13 @@ class AutosubmitConfig(object): contentLine = p_file.readline() if mod: old_host = self.get_current_host(section) - contentToMod = contentToMod.replace(re.search(r'[^#]\bHOST\b =.*', contentToMod).group(0)[1:], "HOST = " + new_host) - contentToMod = contentToMod.replace(re.search(r'[^#]\bHOST_TO\b =.*', contentToMod).group(0)[1:], "HOST_TO = " + old_host) + contentToMod = contentToMod.replace(re.search( + r'[^#]\bHOST\b =.*', contentToMod).group(0)[1:], "HOST = " + new_host) + contentToMod = contentToMod.replace(re.search( + r'[^#]\bHOST_TO\b =.*', contentToMod).group(0)[1:], "HOST_TO = " + old_host) open(self._platforms_parser_file, 'w').write(content) open(self._platforms_parser_file, 'a').write(contentToMod) + def get_migrate_project_to(self, section): """ Returns the project to change to from platform config file. @@ -307,6 +319,7 @@ class AutosubmitConfig(object): :rtype: str """ return self._platforms_parser.get_option(section, 'PROJECT_TO', '').lower() + def get_migrate_host_to(self, section): """ Returns the host to change to from platform config file. @@ -315,6 +328,7 @@ class AutosubmitConfig(object): :rtype: str """ return self._platforms_parser.get_option(section, 'HOST_TO', "none").lower() + def set_new_project(self, section, new_project): """ Sets new project for given platform @@ -324,12 +338,12 @@ class AutosubmitConfig(object): """ with open(self._platforms_parser_file) as p_file: contentLine = p_file.readline() - contentToMod="" - content="" - mod=False + contentToMod = "" + content = "" + mod = False while contentLine: if re.search(section, contentLine): - mod=True + mod = True if mod: contentToMod += contentLine else: @@ -337,8 +351,10 @@ class AutosubmitConfig(object): contentLine = p_file.readline() if mod: old_project = self.get_current_project(section) - contentToMod = contentToMod.replace(re.search(r"[^#]\bPROJECT\b =.*", contentToMod).group(0)[1:], "PROJECT = " + new_project) - contentToMod = contentToMod.replace(re.search(r"[^#]\bPROJECT_TO\b =.*", contentToMod).group(0)[1:], "PROJECT_TO = " + old_project) + contentToMod = contentToMod.replace(re.search( + r"[^#]\bPROJECT\b =.*", contentToMod).group(0)[1:], "PROJECT = " + new_project) + contentToMod = contentToMod.replace(re.search( + r"[^#]\bPROJECT_TO\b =.*", contentToMod).group(0)[1:], "PROJECT_TO = " + old_project) open(self._platforms_parser_file, 'w').write(content) open(self._platforms_parser_file, 'a').write(contentToMod) @@ -351,6 +367,7 @@ class AutosubmitConfig(object): :rtype: str """ return str(self._jobs_parser.get_option(section, 'CUSTOM_DIRECTIVES', '')) + def show_messages(self): if len(self.warn_config.keys()) == 0 and len(self.wrong_config.keys()) == 0: @@ -362,21 +379,24 @@ class AutosubmitConfig(object): for section in self.warn_config: message += "Issues in [{0}] config file:".format(section) for parameter in self.warn_config[section]: - message += "\n[{0}] {1} ".format(parameter[0],parameter[1]) + message += "\n[{0}] {1} ".format(parameter[0], + parameter[1]) message += "\n" - Log.printlog(message,6013) + Log.printlog(message, 6013) if len(self.wrong_config.keys()) > 0: message = "On Configuration files:\n" for section in self.wrong_config: - message += "Critical Issues on [{0}] config file:".format(section) + message += "Critical Issues on [{0}] config file:".format( + section) for parameter in self.wrong_config[section]: message += "\n[{0}] {1}".format(parameter[0], parameter[1]) message += "\n" - raise AutosubmitCritical(message,7014) + raise AutosubmitCritical(message, 7014) else: return True - def check_conf_files(self,check_file=False): + + def check_conf_files(self, check_file=False): """ Checks configuration files (autosubmit, experiment jobs and platforms), looking for invalid values, missing required options. Prints results in log @@ -387,7 +407,7 @@ class AutosubmitConfig(object): Log.info('\nChecking configuration files...') self.ignore_file_path = check_file self.reload() - #Annotates all errors found in the configuration files in dictionaries self.warn_config and self.wrong_config. + # Annotates all errors found in the configuration files in dictionaries self.warn_config and self.wrong_config. self.check_expdef_conf() self.check_platforms_conf() self.check_jobs_conf() @@ -397,7 +417,8 @@ class AutosubmitConfig(object): # Check proj configuration self.check_proj() except: - pass # This exception is in case that the experiment doesn't contains any file ( usefull for test the workflow with None Option) + # This exception is in case that the experiment doesn't contains any file ( usefull for test the workflow with None Option) + pass # End of checkers. # This Try/Except is in charge of print all the info gathered by all the checkers and stop the program if any critical error is found. @@ -405,9 +426,11 @@ class AutosubmitConfig(object): result = self.show_messages() return result except AutosubmitCritical as e: - raise AutosubmitCritical(e.message,e.code,e.trace) # In case that there are critical errors in the configuration, Autosubmit won't continue. + # In case that there are critical errors in the configuration, Autosubmit won't continue. + raise AutosubmitCritical(e.message, e.code, e.trace) except Exception as e: - raise AutosubmitCritical("There was an error while showing the config log messages",7014,e.message) + raise AutosubmitCritical( + "There was an error while showing the config log messages", 7014, e.message) def check_autosubmit_conf(self): """ @@ -419,32 +442,43 @@ class AutosubmitConfig(object): self._conf_parser.read(self._conf_parser_file) if not self._conf_parser.check_exists('config', 'AUTOSUBMIT_VERSION'): - self.wrong_config["Autosubmit"]+=[['config', "AUTOSUBMIT_VERSION parameter not found"]] + self.wrong_config["Autosubmit"] += [['config', + "AUTOSUBMIT_VERSION parameter not found"]] if not self._conf_parser.check_is_int('config', 'MAXWAITINGJOBS', True): - self.wrong_config["Autosubmit"]+=[['config', "MAXWAITINGJOBS parameter not found or non-integer"]] - if not self._conf_parser.check_is_int('config', 'TOTALJOBS', True): - self.wrong_config["Autosubmit"]+=[['config', "TOTALJOBS parameter not found or non-integer"]] - if not self._conf_parser.check_is_int('config', 'SAFETYSLEEPTIME', True): - self.wrong_config["Autosubmit"]+=[['config', "SAFETYSLEEPTIME parameter not found or non-integer"]] - if not self._conf_parser.check_is_int('config', 'RETRIALS', True): - self.wrong_config["Autosubmit"]+=[['config', "RETRIALS parameter not found or non-integer"]] - if not self._conf_parser.check_is_boolean('mail', 'NOTIFICATIONS', False): - self.wrong_config["Autosubmit"]+=[['mail', "NOTIFICATIONS parameter not found or non-boolean"]] - if not self.is_valid_communications_library(): - self.wrong_config["Autosubmit"]+=[['config', "LIBRARY parameter not found or is not paramiko"]] - if not self.is_valid_storage_type(): - self.wrong_config["Autosubmit"]+=[['storage', "TYPE parameter not found"]] + self.wrong_config["Autosubmit"] += [['config', + "MAXWAITINGJOBS parameter not found or non-integer"]] + if not self._conf_parser.check_is_int('config', 'TOTALJOBS', True): + self.wrong_config["Autosubmit"] += [['config', + "TOTALJOBS parameter not found or non-integer"]] + if not self._conf_parser.check_is_int('config', 'SAFETYSLEEPTIME', True): + self.wrong_config["Autosubmit"] += [['config', + "SAFETYSLEEPTIME parameter not found or non-integer"]] + if not self._conf_parser.check_is_int('config', 'RETRIALS', True): + self.wrong_config["Autosubmit"] += [['config', + "RETRIALS parameter not found or non-integer"]] + if not self._conf_parser.check_is_boolean('mail', 'NOTIFICATIONS', False): + self.wrong_config["Autosubmit"] += [['mail', + "NOTIFICATIONS parameter not found or non-boolean"]] + if not self.is_valid_communications_library(): + self.wrong_config["Autosubmit"] += [['config', + "LIBRARY parameter not found or is not paramiko"]] + if not self.is_valid_storage_type(): + self.wrong_config["Autosubmit"] += [['storage', + "TYPE parameter not found"]] if self.get_wrapper_type() != 'None': self.check_wrapper_conf() if self.get_notifications() == 'true': for mail in self.get_mails_to(): if not self.is_valid_mail_address(mail): - self.wrong_config["Autosubmit"]+=[['mail', "invalid e-mail"]] + self.wrong_config["Autosubmit"] += [['mail', + "invalid e-mail"]] if "Autosubmit" not in self.wrong_config: - Log.result('{0} OK'.format(os.path.basename(self._conf_parser_file))) + Log.result('{0} OK'.format( + os.path.basename(self._conf_parser_file))) return True else: - Log.warning('{0} Issues'.format(os.path.basename(self._conf_parser_file))) + Log.warning('{0} Issues'.format( + os.path.basename(self._conf_parser_file))) return True return False @@ -453,40 +487,55 @@ class AutosubmitConfig(object): Checks experiment's queues configuration file. """ if len(self._platforms_parser.sections()) == 0: - self.wrong_config["Platform"] += [["Global","Platform file is not well-configured or found"]] + self.wrong_config["Platform"] += [["Global", + "Platform file is not well-configured or found"]] if len(self._platforms_parser.sections()) != len(set(self._platforms_parser.sections())): - self.wrong_config["Platform"]+=[["Global", "There are repeated platforms"]] + self.wrong_config["Platform"] += [["Global", + "There are repeated platforms"]] main_platform_found = False for section in self._platforms_parser.sections(): if section in self.hpcarch: - main_platform_found= True + main_platform_found = True if not self._platforms_parser.check_exists(section, 'TYPE'): - self.wrong_config["Platform"]+=[[section, "Mandatory TYPE parameter not found"]] - platform_type = self._platforms_parser.get_option(section, 'TYPE', '').lower() + self.wrong_config["Platform"] += [[section, + "Mandatory TYPE parameter not found"]] + platform_type = self._platforms_parser.get_option( + section, 'TYPE', '').lower() if platform_type != 'ps': - if not self._platforms_parser.check_exists(section, 'PROJECT'): - self.wrong_config["Platform"]+=[[ section, "Mandatory PROJECT parameter not found"]] - if not self._platforms_parser.check_exists(section, 'USER'): - self.wrong_config["Platform"]+=[[ section, "Mandatory USER parameter not found"]] - if not self._platforms_parser.check_exists(section, 'HOST'): - self.wrong_config["Platform"]+=[[ section, "Mandatory HOST parameter not found"]] - if not self._platforms_parser.check_exists(section, 'SCRATCH_DIR'): - self.wrong_config["Platform"]+=[[ section, "Mandatory SCRATCH_DIR parameter not found"]] - if not self._platforms_parser.check_is_boolean(section,'ADD_PROJECT_TO_HOST', False): - self.wrong_config["Platform"]+=[[ section, "Mandatory ADD_PROJECT_TO_HOST parameter not found or non-boolean"]] - if not self._platforms_parser.check_is_boolean(section, 'TEST_SUITE', False): - self.wrong_config["Platform"]+=[[ section, "Mandatory TEST_SUITE parameter not found or non-boolean"]] - if not self._platforms_parser.check_is_int(section, 'MAX_WAITING_JOBS',False): - self.wrong_config["Platform"]+=[[ section, "Mandatory MAX_WAITING_JOBS parameter not found or non-integer"]] - if not self._platforms_parser.check_is_int(section, 'TOTAL_JOBS', False): - self.wrong_config["Platform"]+=[[ section, "Mandatory TOTAL_JOBS parameter not found or non-integer"]] + if not self._platforms_parser.check_exists(section, 'PROJECT'): + self.wrong_config["Platform"] += [[section, + "Mandatory PROJECT parameter not found"]] + if not self._platforms_parser.check_exists(section, 'USER'): + self.wrong_config["Platform"] += [[section, + "Mandatory USER parameter not found"]] + if not self._platforms_parser.check_exists(section, 'HOST'): + self.wrong_config["Platform"] += [[section, + "Mandatory HOST parameter not found"]] + if not self._platforms_parser.check_exists(section, 'SCRATCH_DIR'): + self.wrong_config["Platform"] += [[section, + "Mandatory SCRATCH_DIR parameter not found"]] + if not self._platforms_parser.check_is_boolean(section, 'ADD_PROJECT_TO_HOST', False): + self.wrong_config["Platform"] += [ + [section, "Mandatory ADD_PROJECT_TO_HOST parameter not found or non-boolean"]] + if not self._platforms_parser.check_is_boolean(section, 'TEST_SUITE', False): + self.wrong_config["Platform"] += [[section, + "Mandatory TEST_SUITE parameter not found or non-boolean"]] + if not self._platforms_parser.check_is_int(section, 'MAX_WAITING_JOBS', False): + self.wrong_config["Platform"] += [ + [section, "Mandatory MAX_WAITING_JOBS parameter not found or non-integer"]] + if not self._platforms_parser.check_is_int(section, 'TOTAL_JOBS', False): + self.wrong_config["Platform"] += [[section, + "Mandatory TOTAL_JOBS parameter not found or non-integer"]] if not main_platform_found: - self.wrong_config["Expdef"] += [["Default", "Main platform is not defined! check if [HPCARCH = {0}] has any typo".format(self.hpcarch)]] + self.wrong_config["Expdef"] += [["Default", + "Main platform is not defined! check if [HPCARCH = {0}] has any typo".format(self.hpcarch)]] if "Platform" not in self.wrong_config: - Log.result('{0} OK'.format(os.path.basename(self._platforms_parser_file))) + Log.result('{0} OK'.format( + os.path.basename(self._platforms_parser_file))) return True return False + def check_jobs_conf(self): """ Checks experiment's jobs configuration file. @@ -500,28 +549,34 @@ class AutosubmitConfig(object): platforms.append('LOCAL') platforms.append('local') if len(sections) != len(set(sections)): - self.wrong_config["Jobs"] += [["Global", "There are repeated job names"]] + self.wrong_config["Jobs"] += [["Global", + "There are repeated job names"]] for section in sections: if not parser.check_exists(section, 'FILE'): - self.wrong_config["Jobs"] += [[ section, "Mandatory FILE parameter not found"]] + self.wrong_config["Jobs"] += [[section, + "Mandatory FILE parameter not found"]] else: - section_file_path = parser.get_option(section,'FILE') + section_file_path = parser.get_option(section, 'FILE') try: - if self.ignore_file_path: - if not os.path.exists(os.path.join(self.get_project_dir(),section_file_path)): + if self.ignore_file_path: + if not os.path.exists(os.path.join(self.get_project_dir(), section_file_path)): if parser.check_exists(section, 'CHECK'): if not parser.get_option(section, 'CHECK') in "on_submission": - self.wrong_config["Jobs"] += [[section, "FILE {0} doesn't exist and check parameter is not set on_submission value".format(section_file_path)]] + self.wrong_config["Jobs"] += [ + [section, "FILE {0} doesn't exist and check parameter is not set on_submission value".format(section_file_path)]] else: - self.wrong_config["Jobs"] += [[section, "FILE {0} doesn't exist".format(os.path.join(self.get_project_dir(),section_file_path))]] + self.wrong_config["Jobs"] += [[section, "FILE {0} doesn't exist".format( + os.path.join(self.get_project_dir(), section_file_path))]] except BaseException: - pass # tests conflict quick-patch - if not parser.check_is_boolean(section, 'RERUN_ONLY', False): - self.wrong_config["Jobs"]+=[[ section, "Mandatory RERUN_ONLY parameter not found or non-bool"]] + pass # tests conflict quick-patch + if not parser.check_is_boolean(section, 'RERUN_ONLY', False): + self.wrong_config["Jobs"] += [[section, + "Mandatory RERUN_ONLY parameter not found or non-bool"]] if parser.has_option(section, 'PLATFORM'): - if not parser.check_is_choice(section, 'PLATFORM', False, platforms): - self.wrong_config["Jobs"] += [[section, "PLATFORM parameter is invalid, this platform is not configured"]] + if not parser.check_is_choice(section, 'PLATFORM', False, platforms): + self.wrong_config["Jobs"] += [ + [section, "PLATFORM parameter is invalid, this platform is not configured"]] if parser.has_option(section, 'DEPENDENCIES'): for dependency in str(parser.get_option(section, 'DEPENDENCIES', '')).upper().split(' '): @@ -534,21 +589,25 @@ class AutosubmitConfig(object): if '[' in dependency: dependency = dependency[:dependency.find('[')] if dependency not in sections: - self.warn_config["Jobs"].append([section, "Dependency parameter is invalid, job {0} is not configured".format(dependency)]) + self.warn_config["Jobs"].append( + [section, "Dependency parameter is invalid, job {0} is not configured".format(dependency)]) if parser.has_option(section, 'RERUN_DEPENDENCIES'): - for dependency in str(parser.get_option(section, 'RERUN_DEPENDENCIES','')).split(' '): + for dependency in str(parser.get_option(section, 'RERUN_DEPENDENCIES', '')).split(' '): if '-' in dependency: dependency = dependency.split('-')[0] if '[' in dependency: dependency = dependency[:dependency.find('[')] if dependency not in sections: - self.warn_config["Jobs"]+=[[section, "RERUN_DEPENDENCIES parameter is invalid, job {0} is not configured".format(dependency)]] + self.warn_config["Jobs"] += [ + [section, "RERUN_DEPENDENCIES parameter is invalid, job {0} is not configured".format(dependency)]] if not parser.check_is_choice(section, 'RUNNING', False, ['once', 'date', 'member', 'chunk']): - self.wrong_config["Jobs"]+=[[section, "Mandatory RUNNING parameter is invalid"]] + self.wrong_config["Jobs"] += [[section, + "Mandatory RUNNING parameter is invalid"]] if "Jobs" not in self.wrong_config: - Log.result('{0} OK'.format(os.path.basename(self._jobs_parser_file))) + Log.result('{0} OK'.format( + os.path.basename(self._jobs_parser_file))) return True return False @@ -560,69 +619,85 @@ class AutosubmitConfig(object): :rtype: bool """ parser = self._exp_parser - if not parser.check_exists('DEFAULT', 'EXPID'): - self.wrong_config["Expdef"]+=[['DEFAULT', "Mandatory EXPID parameter is invalid"]] + if not parser.check_exists('DEFAULT', 'EXPID'): + self.wrong_config["Expdef"] += [['DEFAULT', + "Mandatory EXPID parameter is invalid"]] - if not parser.check_exists('DEFAULT', 'HPCARCH'): - self.wrong_config["Expdef"]+=[['DEFAULT', "Mandatory HPCARCH parameter is invalid"]] + if not parser.check_exists('DEFAULT', 'HPCARCH'): + self.wrong_config["Expdef"] += [['DEFAULT', + "Mandatory HPCARCH parameter is invalid"]] else: try: self.hpcarch = self.get_platform() except: - self.wrong_config["Expdef"] += [['Default', "HPCARCH value is not a valid platform (check typo)"]] - if not parser.check_exists('experiment', 'DATELIST'): - self.wrong_config["Expdef"]+=[['DEFAULT', "Mandatory DATELIST parameter is invalid"]] - if not parser.check_exists('experiment', 'MEMBERS'): - self.wrong_config["Expdef"]+=[['DEFAULT', "Mandatory MEMBERS parameter is invalid"]] - if not parser.check_is_choice('experiment', 'CHUNKSIZEUNIT', True,['year', 'month', 'day', 'hour']): - self.wrong_config["Expdef"]+=[['experiment', "Mandatory CHUNKSIZEUNIT choice is invalid"]] - - if not parser.check_is_int('experiment', 'CHUNKSIZE', True): - self.wrong_config["Expdef"]+=[['experiment', "Mandatory CHUNKSIZE is not an integer"]] - if not parser.check_is_int('experiment', 'NUMCHUNKS', True): - self.wrong_config["Expdef"]+=[['experiment', "Mandatory NUMCHUNKS is not an integer"]] - - if not parser.check_is_choice('experiment', 'CALENDAR', True, - ['standard', 'noleap']): - self.wrong_config["Expdef"]+=[['experiment', "Mandatory CALENDAR choice is invalid"]] - - if not parser.check_is_boolean('rerun', 'RERUN', True): - self.wrong_config["Expdef"]+=[['experiment', "Mandatory RERUN choice is not a boolean"]] + self.wrong_config["Expdef"] += [['Default', + "HPCARCH value is not a valid platform (check typo)"]] + if not parser.check_exists('experiment', 'DATELIST'): + self.wrong_config["Expdef"] += [['DEFAULT', + "Mandatory DATELIST parameter is invalid"]] + if not parser.check_exists('experiment', 'MEMBERS'): + self.wrong_config["Expdef"] += [['DEFAULT', + "Mandatory MEMBERS parameter is invalid"]] + if not parser.check_is_choice('experiment', 'CHUNKSIZEUNIT', True, ['year', 'month', 'day', 'hour']): + self.wrong_config["Expdef"] += [['experiment', + "Mandatory CHUNKSIZEUNIT choice is invalid"]] + + if not parser.check_is_int('experiment', 'CHUNKSIZE', True): + self.wrong_config["Expdef"] += [['experiment', + "Mandatory CHUNKSIZE is not an integer"]] + if not parser.check_is_int('experiment', 'NUMCHUNKS', True): + self.wrong_config["Expdef"] += [['experiment', + "Mandatory NUMCHUNKS is not an integer"]] + + if not parser.check_is_choice('experiment', 'CALENDAR', True, + ['standard', 'noleap']): + self.wrong_config["Expdef"] += [['experiment', + "Mandatory CALENDAR choice is invalid"]] + + if not parser.check_is_boolean('rerun', 'RERUN', True): + self.wrong_config["Expdef"] += [['experiment', + "Mandatory RERUN choice is not a boolean"]] if parser.check_is_choice('project', 'PROJECT_TYPE', True, ['none', 'git', 'svn', 'local']): project_type = parser.get_option('project', 'PROJECT_TYPE', '') if project_type == 'git': - if not parser.check_exists('git', 'PROJECT_ORIGIN'): - self.wrong_config["Expdef"]+=[['git', "PROJECT_ORIGIN parameter is invalid"]] - if not parser.check_exists('git', 'PROJECT_BRANCH'): - self.wrong_config["Expdef"]+=[['git', "PROJECT_BRANCH parameter is invalid"]] + if not parser.check_exists('git', 'PROJECT_ORIGIN'): + self.wrong_config["Expdef"] += [['git', + "PROJECT_ORIGIN parameter is invalid"]] + if not parser.check_exists('git', 'PROJECT_BRANCH'): + self.wrong_config["Expdef"] += [['git', + "PROJECT_BRANCH parameter is invalid"]] elif project_type == 'svn': - if not parser.check_exists('svn', 'PROJECT_URL'): - self.wrong_config["Expdef"]+=[['svn', "PROJECT_URL parameter is invalid"]] - if not parser.check_exists('svn', 'PROJECT_REVISION'): - self.wrong_config["Expdef"]+=[['svn', "PROJECT_REVISION parameter is invalid"]] + if not parser.check_exists('svn', 'PROJECT_URL'): + self.wrong_config["Expdef"] += [['svn', + "PROJECT_URL parameter is invalid"]] + if not parser.check_exists('svn', 'PROJECT_REVISION'): + self.wrong_config["Expdef"] += [['svn', + "PROJECT_REVISION parameter is invalid"]] elif project_type == 'local': - if not parser.check_exists('local', 'PROJECT_PATH'): - self.wrong_config["Expdef"]+=[['local', "PROJECT_PATH parameter is invalid"]] - elif project_type == 'none': #debug propouses + if not parser.check_exists('local', 'PROJECT_PATH'): + self.wrong_config["Expdef"] += [['local', + "PROJECT_PATH parameter is invalid"]] + elif project_type == 'none': # debug propouses self.ignore_file_path = False if project_type != 'none': if not parser.check_exists('project_files', 'FILE_PROJECT_CONF'): - self.wrong_config["Expdef"]+=[['project_files', "FILE_PROJECT_CONF parameter is invalid"]] + self.wrong_config["Expdef"] += [['project_files', + "FILE_PROJECT_CONF parameter is invalid"]] else: - self.wrong_config["Expdef"]+=[['project', "Mandatory project choice is invalid"]] - + self.wrong_config["Expdef"] += [['project', + "Mandatory project choice is invalid"]] if "Expdef" not in self.wrong_config: - Log.result('{0} OK'.format(os.path.basename(self._exp_parser_file))) + Log.result('{0} OK'.format( + os.path.basename(self._exp_parser_file))) return True return False - def check_proj(self): - + def check_proj(self): """ Checks project config file @@ -633,43 +708,54 @@ class AutosubmitConfig(object): if self._proj_parser_file == '': self._proj_parser = None else: - self._proj_parser = AutosubmitConfig.get_parser(self.parser_factory, self._proj_parser_file) + self._proj_parser = AutosubmitConfig.get_parser( + self.parser_factory, self._proj_parser_file) return True except Exception as e: - self.wrong_config["Proj"]+=[['project_files', "FILE_PROJECT_CONF parameter is invalid"]] + self.wrong_config["Proj"] += [['project_files', + "FILE_PROJECT_CONF parameter is invalid"]] return False + def check_wrapper_conf(self): if not self.is_valid_jobs_in_wrapper(): - self.wrong_config["Wrapper"]+=[['wrapper', "JOBS_IN_WRAPPER contains non-defined jobs. parameter is invalid"]] + self.wrong_config["Wrapper"] += [['wrapper', + "JOBS_IN_WRAPPER contains non-defined jobs. parameter is invalid"]] if 'horizontal' in self.get_wrapper_type(): - if not self._platforms_parser.check_exists(self.get_platform(), 'PROCESSORS_PER_NODE'): - self.wrong_config["Wrapper"]+=[['wrapper', "PROCESSORS_PER_NODE no exist in the horizontal-wrapper platform"]] - if not self._platforms_parser.check_exists(self.get_platform(), 'MAX_PROCESSORS'): - self.wrong_config["Wrapper"]+=[['wrapper', "MAX_PROCESSORS no exist in the horizontal-wrapper platform"]] + if not self._platforms_parser.check_exists(self.get_platform(), 'PROCESSORS_PER_NODE'): + self.wrong_config["Wrapper"] += [ + ['wrapper', "PROCESSORS_PER_NODE no exist in the horizontal-wrapper platform"]] + if not self._platforms_parser.check_exists(self.get_platform(), 'MAX_PROCESSORS'): + self.wrong_config["Wrapper"] += [['wrapper', + "MAX_PROCESSORS no exist in the horizontal-wrapper platform"]] if 'vertical' in self.get_wrapper_type(): if not self._platforms_parser.check_exists(self.get_platform(), 'MAX_WALLCLOCK'): - self.wrong_config["Wrapper"]+=[['wrapper', "MAX_WALLCLOCK no exist in the vertical-wrapper platform"]] - if "Wrapper" not in self.wrong_config: + self.wrong_config["Wrapper"] += [['wrapper', + "MAX_WALLCLOCK no exist in the vertical-wrapper platform"]] + if "Wrapper" not in self.wrong_config: Log.result('wrappers OK') return True - - def reload(self): """ Creates parser objects for configuration files """ try: - self._conf_parser = AutosubmitConfig.get_parser(self.parser_factory, self._conf_parser_file) - self._platforms_parser = AutosubmitConfig.get_parser(self.parser_factory, self._platforms_parser_file) - self._jobs_parser = AutosubmitConfig.get_parser(self.parser_factory, self._jobs_parser_file) - self._exp_parser = AutosubmitConfig.get_parser(self.parser_factory, self._exp_parser_file) + self._conf_parser = AutosubmitConfig.get_parser( + self.parser_factory, self._conf_parser_file) + self._platforms_parser = AutosubmitConfig.get_parser( + self.parser_factory, self._platforms_parser_file) + self._jobs_parser = AutosubmitConfig.get_parser( + self.parser_factory, self._jobs_parser_file) + self._exp_parser = AutosubmitConfig.get_parser( + self.parser_factory, self._exp_parser_file) except Exception as e: - raise AutosubmitCritical("{0} \n Repeated parameter, check if you have any uncommented value that should be commented".format(e.message),7014) + raise AutosubmitCritical( + "{0} \n Repeated parameter, check if you have any uncommented value that should be commented".format(e.message), 7014) if self._proj_parser_file == '': self._proj_parser = None else: - self._proj_parser = AutosubmitConfig.get_parser(self.parser_factory, self._proj_parser_file) + self._proj_parser = AutosubmitConfig.get_parser( + self.parser_factory, self._proj_parser_file) def load_parameters(self): """ @@ -724,12 +810,14 @@ class AutosubmitConfig(object): # Experiment conf content = open(self._exp_parser_file).read() if re.search('EXPID =.*', content): - content = content.replace(re.search('EXPID =.*', content).group(0), "EXPID = " + exp_id) + content = content.replace( + re.search('EXPID =.*', content).group(0), "EXPID = " + exp_id) open(self._exp_parser_file, 'w').write(content) content = open(self._conf_parser_file).read() if re.search('EXPID =.*', content): - content = content.replace(re.search('EXPID =.*', content).group(0), "EXPID = " + exp_id) + content = content.replace( + re.search('EXPID =.*', content).group(0), "EXPID = " + exp_id) open(self._conf_parser_file, 'w').write(content) def get_project_type(self): @@ -794,6 +882,7 @@ class AutosubmitConfig(object): :rtype: str """ return self._exp_parser.get_option('git', 'REMOTE_CLONE_ROOT', '') + def get_submodules_list(self): """ Returns submodules list from experiment's config file @@ -801,7 +890,7 @@ class AutosubmitConfig(object): :return: submodules to load :rtype: list """ - return ' '.join(self._exp_parser.get_option('git', 'PROJECT_SUBMODULES','').split()).split() + return ' '.join(self._exp_parser.get_option('git', 'PROJECT_SUBMODULES', '').split()).split() def get_fetch_single_branch(self): """ @@ -811,6 +900,7 @@ class AutosubmitConfig(object): :rtype: boolean """ return self._exp_parser.get_option('git', 'FETCH_SINGLE_BRANCH', 'False').lower() + def get_project_destination(self): """ Returns git commit from experiment's config file @@ -825,7 +915,8 @@ class AutosubmitConfig(object): elif self.get_project_type().lower() == "svn": value = self.get_svn_project_url().split('/')[-1] elif self.get_project_type().lower() == "git": - value = self.get_git_project_origin().split('/')[-1].split('.')[-2] + value = self.get_git_project_origin().split( + '/')[-1].split('.')[-2] return value def set_git_project_commit(self, as_conf): @@ -834,19 +925,22 @@ class AutosubmitConfig(object): :param as_conf: Configuration class for exteriment :type as_conf: AutosubmitConfig """ - full_project_path=as_conf.get_project_dir() + full_project_path = as_conf.get_project_dir() try: output = subprocess.check_output("cd {0}; git rev-parse --abbrev-ref HEAD".format(full_project_path), shell=True) except subprocess.CalledProcessError as e: - raise AutosubmitCritical("Failed to retrieve project branch...",7014,e.message) + raise AutosubmitCritical( + "Failed to retrieve project branch...", 7014, e.message) project_branch = output Log.debug("Project branch is: " + project_branch) try: - output = subprocess.check_output("cd {0}; git rev-parse HEAD".format(full_project_path), shell=True) + output = subprocess.check_output( + "cd {0}; git rev-parse HEAD".format(full_project_path), shell=True) except subprocess.CalledProcessError as e: - raise AutosubmitCritical("Failed to retrieve project commit SHA...", 7014,e.message) + raise AutosubmitCritical( + "Failed to retrieve project commit SHA...", 7014, e.message) project_sha = output Log.debug("Project commit SHA is: " + project_sha) @@ -859,7 +953,8 @@ class AutosubmitConfig(object): content = content.replace(re.search('PROJECT_COMMIT =.*', content).group(0), "PROJECT_COMMIT = " + project_sha) open(self._exp_parser_file, 'w').write(content) - Log.debug("Project commit SHA succesfully registered to the configuration file.") + Log.debug( + "Project commit SHA succesfully registered to the configuration file.") return True def get_svn_project_url(self): @@ -908,7 +1003,8 @@ class AutosubmitConfig(object): if split_in.find("-") != -1: numbers = split_in.split("-") for count in range(int(numbers[0]), int(numbers[1]) + 1): - date_list.append(parse_date(string_date + str(count).zfill(len(numbers[0])))) + date_list.append(parse_date( + string_date + str(count).zfill(len(numbers[0])))) else: date_list.append(parse_date(string_date + split_in)) string_date = None @@ -937,7 +1033,8 @@ class AutosubmitConfig(object): :return: initial chunk :rtype: int """ - chunk_ini = self._exp_parser.get_option('experiment', 'CHUNKINI', default) + chunk_ini = self._exp_parser.get_option( + 'experiment', 'CHUNKINI', default) if chunk_ini == '': return default return int(chunk_ini) @@ -958,7 +1055,8 @@ class AutosubmitConfig(object): :return: Chunksize, 1 as default. :rtype: int """ - chunk_size = self._exp_parser.get_option('experiment', 'CHUNKSIZE', default) + chunk_size = self._exp_parser.get_option( + 'experiment', 'CHUNKSIZE', default) if chunk_size == '': return default return int(chunk_size) @@ -982,7 +1080,8 @@ class AutosubmitConfig(object): if split_in.find("-") != -1: numbers = split_in.split("-") for count in range(int(numbers[0]), int(numbers[1]) + 1): - member_list.append(string_member + str(count).zfill(len(numbers[0]))) + member_list.append( + string_member + str(count).zfill(len(numbers[0]))) else: member_list.append(string_member + split_in) string_member = None @@ -1031,7 +1130,8 @@ class AutosubmitConfig(object): """ content = open(self._exp_parser_file).read() if re.search('HPCARCH =.*', content): - content = content.replace(re.search('HPCARCH =.*', content).group(0), "HPCARCH = " + hpc) + content = content.replace( + re.search('HPCARCH =.*', content).group(0), "HPCARCH = " + hpc) open(self._exp_parser_file, 'w').write(content) def set_version(self, autosubmit_version): @@ -1054,7 +1154,8 @@ class AutosubmitConfig(object): :return: version :rtype: str """ - return self._conf_parser.get_option('config', 'AUTOSUBMIT_VERSION' , 'None') + return self._conf_parser.get_option('config', 'AUTOSUBMIT_VERSION', 'None') + def get_total_jobs(self): """ Returns max number of running jobs from autosubmit's config file @@ -1063,7 +1164,7 @@ class AutosubmitConfig(object): :rtype: int """ return int(self._conf_parser.get('config', 'TOTALJOBS')) - + def get_output_type(self): """ Returns default output type, pdf if none @@ -1071,7 +1172,7 @@ class AutosubmitConfig(object): :return: output type :rtype: string """ - return self._conf_parser.get_option('config', 'OUTPUT','pdf') + return self._conf_parser.get_option('config', 'OUTPUT', 'pdf') def get_max_wallclock(self): """ @@ -1087,7 +1188,8 @@ class AutosubmitConfig(object): :rtype: str """ - config_value = self._conf_parser.get_option('config', 'MAX_PROCESSORS', None) + config_value = self._conf_parser.get_option( + 'config', 'MAX_PROCESSORS', None) return int(config_value) if config_value is not None else config_value def get_max_waiting_jobs(self): @@ -1154,11 +1256,13 @@ class AutosubmitConfig(object): :return: if remote dependencies :rtype: bool """ - config_value = self._conf_parser.get_option('config', 'PRESUBMISSION', 'false').lower() + config_value = self._conf_parser.get_option( + 'config', 'PRESUBMISSION', 'false').lower() if config_value == "true": return True else: return False + def get_wrapper_type(self): """ Returns what kind of wrapper (VERTICAL, MIXED-VERTICAL, HORIZONTAL, HYBRID, NONE) the user has configured in the autosubmit's config @@ -1167,6 +1271,7 @@ class AutosubmitConfig(object): :rtype: string """ return self._conf_parser.get_option('wrapper', 'TYPE', 'None').lower() + def get_wrapper_policy(self): """ Returns what kind of wrapper (VERTICAL, MIXED-VERTICAL, HORIZONTAL, HYBRID, NONE) the user has configured in the autosubmit's config @@ -1184,6 +1289,7 @@ class AutosubmitConfig(object): :rtype: string """ return self._conf_parser.get_option('wrapper', 'JOBS_IN_WRAPPER', 'None') + def get_wrapper_queue(self): """ Returns the wrapper queue if not defined, will be the one of the first job wrapped @@ -1192,6 +1298,7 @@ class AutosubmitConfig(object): :rtype: string """ return self._conf_parser.get_option('wrapper', 'QUEUE', 'None') + def get_min_wrapped_jobs(self): """ Returns the minim number of jobs that can be wrapped together as configured in autosubmit's config file @@ -1209,6 +1316,7 @@ class AutosubmitConfig(object): :rtype: int """ return int(self._conf_parser.get_option('wrapper', 'MAX_WRAPPED', self.get_total_jobs())) + def get_wrapper_method(self): """ Returns the method of make the wrapper @@ -1217,6 +1325,7 @@ class AutosubmitConfig(object): :rtype: string """ return self._conf_parser.get_option('wrapper', 'METHOD', 'ASThread') + def get_wrapper_check_time(self): """ Returns time to check the status of jobs in the wrapper diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 55937055822346ef21821792cce8d4a8c955346b..26efd72b2bd9dca1f16ca2faee82f2b32533e560 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -37,9 +37,11 @@ import autosubmit.database.db_structure as DbStructure from networkx import DiGraph from autosubmit.job.job_utils import transitive_reduction -from log.log import AutosubmitCritical,AutosubmitError,Log -#Log.get_logger("Log.Autosubmit") -class JobList: +from log.log import AutosubmitCritical, AutosubmitError, Log +# Log.get_logger("Log.Autosubmit") + + +class JobList(object): """ Class to manage the list of jobs to be run by autosubmit @@ -70,6 +72,7 @@ class JobList: self.packages_id = dict() self.job_package_map = dict() self.sections_checked = set() + self._run_members = None @property def expid(self): @@ -95,6 +98,26 @@ class JobList: def graph(self, value): self._graph = value + @property + def run_members(self): + return self._run_members + + @run_members.setter + def run_members(self, value): + if value is not None: + self._run_members = value + old_job_list = [job for job in self._job_list] + self._job_list = [ + job for job in old_job_list if job.member is None or job.member in self._run_members or job.status not in [Status.WAITING, Status.READY]] + old_job_list = [job for job in self._job_list] + old_job_list_names = [job.name for job in old_job_list] + self._job_list = [job for job in old_job_list if len( + job.parents) == 0 or len(set(old_job_list_names).intersection(set([jobp.name for jobp in job.parents]))) == len(job.parents)] + # for job in self._job_list: + # print("{0} {1}".format( + # job.name, Status.VALUE_TO_KEY[job.status])) + # print(job.parents) + def generate(self, date_list, member_list, num_chunks, chunk_ini, parameters, date_format, default_retrials, default_job_type, wrapper_type=None, wrapper_jobs=None, new=True, notransitive=False, update_structure=False): """ @@ -172,7 +195,8 @@ class JobList: if not jobs_parser.has_option(job_section, option): continue - dependencies_keys = jobs_parser.get(job_section, option).upper().split() + dependencies_keys = jobs_parser.get( + job_section, option).upper().split() dependencies = JobList._manage_dependencies( dependencies_keys, dic_jobs, job_section) @@ -829,7 +853,8 @@ class JobList: :return: waiting jobs :rtype: list """ - waiting_jobs = [job for job in self._job_list if (job.platform.type == platform_type and job.status == Status.WAITING)] + waiting_jobs = [job for job in self._job_list if ( + job.platform.type == platform_type and job.status == Status.WAITING)] return waiting_jobs def get_held_jobs(self, platform=None): @@ -939,7 +964,8 @@ class JobList: Status.SUBMITTED and not job.status == Status.READY] if len(tmp) == len(active): # IF only held jobs left without dependencies satisfied if len(tmp) != 0 and len(active) != 0: - raise AutosubmitCritical("Only Held Jobs active. Exiting Autosubmit (TIP: This can happen if suspended or/and Failed jobs are found on the workflow)",7066) + raise AutosubmitCritical( + "Only Held Jobs active. Exiting Autosubmit (TIP: This can happen if suspended or/and Failed jobs are found on the workflow)", 7066) active = [] return active @@ -1030,7 +1056,8 @@ class JobList: else: return list() except IOError: - Log.printlog("Autosubmit will use a backup for recover the job_list",6010) + Log.printlog( + "Autosubmit will use a backup for recover the job_list", 6010) return list() def load(self): @@ -1042,6 +1069,7 @@ class JobList: """ Log.info("Loading JobList") return self._persistence.load(self._persistence_path, self._persistence_file) + def backup_load(self): """ Recreates an stored job list from the persistence @@ -1050,31 +1078,36 @@ class JobList: :rtype: JobList """ Log.info("Loading backup JobList") - return self._persistence.load(self._persistence_path, self._persistence_file+"_backup") + return self._persistence.load(self._persistence_path, self._persistence_file + "_backup") + def save(self): """ Persists the job list """ self.update_status_log() - self._persistence.save(self._persistence_path,self._persistence_file, self._job_list) + self._persistence.save(self._persistence_path, + self._persistence_file, self._job_list) + def backup_save(self): """ Persists the job list """ self._persistence.save(self._persistence_path, - self._persistence_file+"_backup", self._job_list) + self._persistence_file + "_backup", self._job_list) + def update_status_log(self): job_list = self.get_completed()[-5:] + self.get_in_queue() - Log.status("\n{0:<35}{1:<15}{2:<15}{3:<20}{4:<15}", "Job Name", "Job Id", "Job Status", "Job Platform", "Job Queue") + Log.status("\n{0:<35}{1:<15}{2:<15}{3:<20}{4:<15}", "Job Name", + "Job Id", "Job Status", "Job Platform", "Job Queue") for job in job_list: if len(job.queue) < 1: queue = "no-scheduler" else: queue = job.queue - Log.status("{0:<35}{1:<15}{2:<15}{3:<20}{4:<15}", job.name, job.id, Status().VALUE_TO_KEY[job.status],job.platform.name,queue) - + Log.status("{0:<35}{1:<15}{2:<15}{3:<20}{4:<15}", job.name, job.id, Status( + ).VALUE_TO_KEY[job.status], job.platform.name, queue) def update_from_file(self, store_change=True): """ @@ -1167,7 +1200,8 @@ class JobList: if not fromSetStatus: all_parents_completed = [] for job in self.get_waiting(): - tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] + tmp = [ + parent for parent in job.parents if parent.status == Status.COMPLETED] if job.parents is None or len(tmp) == len(job.parents): job.status = Status.READY job.hold = False @@ -1338,7 +1372,8 @@ class JobList: if out: Log.result("Scripts OK") else: - Log.printlog("Scripts check failed\n Running after failed scripts is at your own risk!",3000) + Log.printlog( + "Scripts check failed\n Running after failed scripts is at your own risk!", 3000) return out def _remove_job(self, job): diff --git a/docs/source/usage/run.rst b/docs/source/usage/run.rst index 19b0af783a8b23b92188113a448848069a18fccb..9dbff161d1549242559b5b361bf83ff1c7999a04 100644 --- a/docs/source/usage/run.rst +++ b/docs/source/usage/run.rst @@ -21,6 +21,8 @@ Options: Sets the starting time for the experiment. Accepted format: 'yyyy-mm-dd HH:MM:SS' or 'HH:MM:SS' (defaults to current day). -sa --start_after Sets a experiment expid that will be tracked for completion. When this experiment is completed, the current instance of Autosubmit run will start. + -rm --run_members + Sets a list of members allowed to run. The list must have the format '### ###' where '###' represents the name of the member as set in the conf files. -h, --help show this help message and exit Example: