diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 1983dbce5f2b8b58f4be7cd76b794acfaf9975ff..493d3ea03cf09ff81f9f4e889e3587e39d6e5611 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -71,6 +71,7 @@ from job.job_package_persistence import JobPackagePersistence from job.job_list_persistence import JobListPersistenceDb from job.job_list_persistence import JobListPersistencePkl from job.job_grouping import JobGrouping +# from API.testAPI import Monitor # noinspection PyPackageRequirements from bscearth.utils.log import Log from database.db_common import create_db @@ -355,6 +356,11 @@ class Autosubmit: "Valid values = ['Any', 'READY', 'COMPLETED', 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN']") group.add_argument('-ft', '--filter_type', type=str, help='Select the job type to filter the list of jobs') + group.add_argument('-ftc', '--filter_type_chunk', type=str, + help='Supply the list of chunks to change the status. Default = "Any". When the member name "all" is set, all the chunks \ + selected from for that member will be updated for all the members. Example: all [1], will have as a result that the \ + chunks 1 for all the members will be updated. Follow the format: ' + '"[ 19601101 [ fc0 [1 2 3 4] Any [1] ] 19651101 [ fc0 [16-30] ] ],SIM,SIM2,SIM3"') subparser.add_argument('--hide', action='store_true', default=False, help='hides plot window') @@ -366,6 +372,7 @@ class Autosubmit: subparser.add_argument('-expand_status', type=str, help='Select the statuses to be expanded') subparser.add_argument('-nt', '--notransitive', action='store_true', default=False, help='Disable transitive reduction') subparser.add_argument('-cw', '--check_wrapper', action='store_true', default=False, help='Generate possible wrapper in the current workflow') + subparser.add_argument('-d', '--detail', action='store_true', default=False, help='Generate detailed view of changes') # Test Case @@ -456,8 +463,8 @@ class Autosubmit: return Autosubmit.install() elif args.command == 'setstatus': return Autosubmit.set_status(args.expid, args.noplot, args.save, args.status_final, args.list, - args.filter_chunks, args.filter_status, args.filter_type, args.hide, - args.group_by, args.expand, args.expand_status, args.notransitive,args.check_wrapper) + args.filter_chunks, args.filter_status, args.filter_type, args.filter_type_chunk, args.hide, + args.group_by, args.expand, args.expand_status, args.notransitive,args.check_wrapper, args.detail) elif args.command == 'testcase': return Autosubmit.testcase(args.copy, args.description, args.chunks, args.member, args.stardate, args.HPC, args.branch) @@ -2619,9 +2626,11 @@ class Autosubmit: job.status = final_status Log.info("CHANGED: job: " + job.name + " status to: " + final) + + @staticmethod - def set_status(expid, noplot, save, final, lst, filter_chunks, filter_status, filter_section, hide, group_by=None, - expand=list(), expand_status=list(), notransitive=False,check_wrapper=False): + def set_status(expid, noplot, save, final, lst, filter_chunks, filter_status, filter_section, filter_type_chunk, hide, group_by=None, + expand=list(), expand_status=list(), notransitive=False,check_wrapper=False, detail=False): """ Set status @@ -2669,7 +2678,181 @@ class Autosubmit: Log.critical('Can not run with invalid configuration') return False + + # Validating job sections, if filter_section -ft has been set: + if filter_section is not None: + section_validation_error = False + section_error = False + section_not_foundList = list() + section_validation_message = "\n## Section Validation Message ##" + countStart = filter_section.count('[') + countEnd = filter_section.count(']') + if countStart > 1 or countEnd > 1: + section_validation_error = True + section_validation_message += "\n\tList of sections has a format error. Perhaps you were trying to use -fc instead." + countUnderscore = filter_section.count('_') + if countUnderscore > 1: + section_validation_error = True + section_validation_message += "\n\tList of sections provided has a format error. Perhaps you were trying to use -fl instead." + if section_validation_error == False: + if len(str(filter_section).strip()) > 0: + if len(filter_section.split()) > 0: + jobSections = as_conf.get_jobs_sections() + for section in filter_section.split(): + # print(section) + # Provided section is not an existing section or it is not the keyword 'Any' + if section not in jobSections and (section != "Any"): + section_error = True + section_not_foundList.append(section) + else: + section_validation_error = True + section_validation_message += "\n\tEmpty input. No changes performed." + if section_validation_error == True or section_error == True: + if section_error == True: + section_validation_message += "\n\tSpecified section(s) : [" + str(section_not_foundList) + \ + "] not found in the experiment " + str(expid) + \ + ".\n\tProcess stopped. Review the format of the provided input. Comparison is case sensitive." + \ + "\n\tRemember that this option expects section names separated by a blank space as input." + + Log.info(section_validation_message) + Log.critical("Error in the supplied input for -ft.") + return False + + job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=notransitive) + + # Validating list of jobs, if filter_list -fl has been set: + # Seems that Autosubmit.load_job_list call is necessary before verification is executed + if job_list is not None and lst is not None: + job_validation_error = False + job_error = False + job_not_foundList = list() + job_validation_message = "\n## Job Validation Message ##" + jobs = list() + countStart = lst.count('[') + countEnd = lst.count(']') + if countStart > 1 or countEnd > 1: + job_validation_error = True + job_validation_message += "\n\tList of jobs has a format error. Perhaps you were trying to use -fc instead." + + if job_validation_error == False: + for job in job_list.get_job_list(): + jobs.append(job.name) + if len(str(lst).strip()) > 0: + if len(lst.split()) > 0: + for sentJob in lst.split(): + #print(sentJob) + # Provided job does not exist or it is not the keyword 'Any' + if sentJob not in jobs and (sentJob != "Any"): + job_error = True + job_not_foundList.append(sentJob) + else: + job_validation_error = True + job_validation_message += "\n\tEmpty input. No changes performed." + + if job_validation_error == True or job_error == True: + if job_error == True: + job_validation_message += "\n\tSpecified job(s) : [" + str(job_not_foundList) + "] not found in the experiment " + \ + str(expid) + ". \n\tProcess stopped. Review the format of the provided input. Comparison is case sensitive." + \ + "\n\tRemember that this option expects job names separated by a blank space as input." + Log.info(job_validation_message) + Log.critical("Error in the supplied input for -fl.") + return False + + # Validating fc if filter_chunks -fc has been set: + if filter_chunks is not None: + fc_validation_message = "## -fc Validation Message ##" + fc_filter_is_correct = True + selected_sections = filter_chunks.split(",")[1:] + selected_formula = filter_chunks.split(",")[0] + current_sections = as_conf.get_jobs_sections() + fc_deserializedJson = object() + # Starting Validation + if len(str(selected_sections).strip()) == 0: + fc_filter_is_correct = False + fc_validation_message += "\n\tMust include a section (job type)." + else: + for section in selected_sections: + # section = section.strip() + # Validating empty sections + if len(str(section).strip()) == 0: + fc_filter_is_correct = False + fc_validation_message += "\n\tEmpty sections are not accepted." + break + # Validating existing sections + # Retrieve experiment data + + if section not in current_sections: + fc_filter_is_correct = False + fc_validation_message += "\n\tSection " + section + " does not exist in experiment. Remember not to include blank spaces." + + # Validating chunk formula + if len(selected_formula) == 0: + fc_filter_is_correct = False + fc_validation_message += "\n\tA formula for chunk filtering has not been provided." + + # If everything is fine until this point + if fc_filter_is_correct == True: + # Retrieve experiment data + current_dates = as_conf._exp_parser.get_option('experiment','DATELIST','').split() + current_members = as_conf.get_member_list() + # Parse json + try: + fc_deserializedJson = json.loads(Autosubmit._create_json(selected_formula)) + except: + fc_filter_is_correct = False + fc_validation_message += "\n\tProvided chunk formula does not have the right format. Were you trying to use another option?" + if fc_filter_is_correct == True: + for startingDate in fc_deserializedJson['sds']: + if startingDate['sd'] not in current_dates: + fc_filter_is_correct = False + fc_validation_message += "\n\tStarting date " + startingDate['sd'] + " does not exist in experiment." + for member in startingDate['ms']: + if member['m'] not in current_members: + fc_filter_is_correct = False + fc_validation_message += "\n\tMember " + member['m'] + " does not exist in experiment." + + # Ending validation + if fc_filter_is_correct == False: + Log.info(fc_validation_message) + Log.critical("Error in the supplied input for -fc.") + return False + + # Validating status, if filter_status -fs has been set: + # At this point we already have job_list from where we are getting the allows STATUS + if filter_status is not None: + status_validation_error = False + status_validation_message = "\n## Status Validation Message ##" + # Trying to identify chunk formula + countStart = filter_status.count('[') + countEnd = filter_status.count(']') + if countStart > 1 or countEnd > 1: + status_validation_error = True + status_validation_message += "\n\tList of status provided has a format error. Perhaps you were trying to use -fc instead." + # Trying to identify job names, implying status names won't use more than 1 underscore _ + countUnderscore = filter_status.count('_') + if countUnderscore > 1: + status_validation_error = True + status_validation_message += "\n\tList of status provided has a format error. Perhaps you were trying to use -fl instead." + # If everything is fine until this point + if status_validation_error == False: + status_filter = filter_status.split() + status_reference = Status() + status_list = list() + for job in job_list.get_job_list(): + reference = status_reference.VALUE_TO_KEY[job.status] + if reference not in status_list: + status_list.append(reference) + for status in status_filter: + if status not in status_list: + status_validation_error = True + status_validation_message += "\n\t There are no jobs with status " + status + " in this experiment." + if status_validation_error == True: + Log.info(status_validation_message) + Log.critical("Error in the supplied input for -fs.") + return False + + jobs_filtered =[] final_status = Autosubmit._get_status(final) if filter_section or filter_chunks: @@ -2688,6 +2871,151 @@ class Autosubmit: jobs_filtered.append(job) else: Autosubmit.change_status(final, final_status, job) + + # New feature : Change status by section, member, and chunk; freely. + # Including inner validation. Trying to make it independent. + if filter_type_chunk: + validation_message = "## -ftc Validation Message ##" + filter_is_correct = True + selected_sections = filter_type_chunk.split(",")[1:] + selected_formula = filter_type_chunk.split(",")[0] + deserializedJson = object() + performed_changes = dict() + + # Starting Validation + if len(str(selected_sections).strip()) == 0: + filter_is_correct = False + validation_message += "\n\tMust include a section (job type). If you want to apply the changes to all sections, include 'Any'." + else: + for section in selected_sections: + # Validating empty sections + if len(str(section).strip()) == 0: + filter_is_correct = False + validation_message += "\n\tEmpty sections are not accepted." + break + # Validating existing sections + # Retrieve experiment data + current_sections = as_conf.get_jobs_sections() + if section not in current_sections and section != "Any": + filter_is_correct = False + validation_message += "\n\tSection " + section + " does not exist in experiment." + + # Validating chunk formula + if len(selected_formula) == 0: + filter_is_correct = False + validation_message += "\n\tA formula for chunk filtering has not been provided. If you want to change all chunks, include 'Any'." + + # If everything is fine until this point + if filter_is_correct == True: + # Retrieve experiment data + current_dates = as_conf._exp_parser.get_option('experiment','DATELIST','').split() + current_members = as_conf.get_member_list() + # Parse json + try: + deserializedJson = json.loads(Autosubmit._create_json(selected_formula)) + except: + filter_is_correct = False + validation_message += "\n\tProvided chunk formula does not have the right format. Were you trying to use another option?" + if filter_is_correct == True: + for startingDate in deserializedJson['sds']: + if startingDate['sd'] not in current_dates: + filter_is_correct = False + validation_message += "\n\tStarting date " + startingDate['sd'] + " does not exist in experiment." + for member in startingDate['ms']: + if member['m'] not in current_members and member['m'] != "Any": + filter_is_correct_ = False + validation_message += "\n\tMember " + member['m'] + " does not exist in experiment." + + + # Ending validation + if filter_is_correct == False: + Log.info(validation_message) + Log.critical("Error in the supplied input for -ftc.") + return False + + # If input is valid, continue. + record = dict() + final_list = [] + # Get current list + working_list = job_list.get_job_list() + for section in selected_sections: + if section == "Any": + # Any section + section_selection = working_list + # Go through start dates + for starting_date in deserializedJson['sds']: + date = starting_date['sd'] + date_selection = filter(lambda j: date2str(j.date) == date, section_selection) + # Members for given start date + for member_group in starting_date['ms']: + member = member_group['m'] + if member == "Any": + # Any member + member_selection = date_selection + chunk_group = member_group['cs'] + for chunk in chunk_group: + filtered_job = filter(lambda j: j.chunk == int(chunk), member_selection) + for job in filtered_job: + final_list.append(job) + # From date filter and sync is not None + for job in filter(lambda j: j.chunk == int(chunk) and j.synchronize is not None, date_selection): + final_list.append(job) + else: + # Selected members + member_selection = filter(lambda j: j.member == member, date_selection) + chunk_group = member_group['cs'] + for chunk in chunk_group: + filtered_job = filter(lambda j: j.chunk == int(chunk), member_selection) + for job in filtered_job: + final_list.append(job) + # From date filter and sync is not None + for job in filter(lambda j: j.chunk == int(chunk) and j.synchronize is not None, date_selection): + final_list.append(job) + else: + # Only given section + section_selection = filter(lambda j: j.section == section, working_list) + # Go through start dates + for starting_date in deserializedJson['sds']: + date = starting_date['sd'] + date_selection = filter(lambda j: date2str(j.date) == date, section_selection) + # Members for given start date + for member_group in starting_date['ms']: + member = member_group['m'] + if member == "Any": + # Any member + member_selection = date_selection + chunk_group = member_group['cs'] + for chunk in chunk_group: + filtered_job = filter(lambda j: j.chunk == int(chunk), member_selection) + for job in filtered_job: + final_list.append(job) + # From date filter and sync is not None + for job in filter(lambda j: j.chunk == int(chunk) and j.synchronize is not None, date_selection): + final_list.append(job) + else: + # Selected members + member_selection = filter(lambda j: j.member == member, date_selection) + chunk_group = member_group['cs'] + for chunk in chunk_group: + filtered_job = filter(lambda j: j.chunk == int(chunk), member_selection) + for job in filtered_job: + final_list.append(job) + # From date filter and sync is not None + for job in filter(lambda j: j.chunk == int(chunk) and j.synchronize is not None, date_selection): + final_list.append(job) + status = Status() + for job in final_list: + if job.status != final_status: + # Only real changes + performed_changes[job.name] = str(Status.VALUE_TO_KEY[job.status]) + " -> " + str(final) + Autosubmit.change_status(final, final_status, job) + # If changes have been performed + if len(performed_changes.keys()) > 0: + if detail == True: + Log.info(job_list.print_with_status(statusChange = performed_changes)) + else: + Log.warning("No changes were performed.") + # End of New Feature if filter_chunks: if len(jobs_filtered) == 0: @@ -2745,7 +3073,6 @@ class Autosubmit: if wrongExpid > 0: Log.warning("There are {0} job.name with an invalid Expid",wrongExpid) - if jobs == 'Any': for job in job_list.get_job_list(): Autosubmit.change_status(final, final_status, job) @@ -2754,6 +3081,8 @@ class Autosubmit: if job.name in jobs: Autosubmit.change_status(final, final_status, job) + + job_list.update_list(as_conf,False,True) if save and wrongExpid == 0: @@ -3127,6 +3456,7 @@ class Autosubmit: @staticmethod def load_job_list(expid, as_conf, notransitive=False,monitor=False): + BasicConfig.read() rerun = as_conf.get_rerun() job_list = JobList(expid, BasicConfig, ConfigParserFactory(), Autosubmit._get_job_list_persistence(expid, as_conf)) diff --git a/autosubmit/job/job_common.py b/autosubmit/job/job_common.py index b2a1fdd96e40566450b346b756f06d996eff2716..51205c98d13b131d7fe353c1070f007a165f77c6 100644 --- a/autosubmit/job/job_common.py +++ b/autosubmit/job/job_common.py @@ -40,6 +40,15 @@ class Status: def retval(self, value): return getattr(self, value) +class bcolors: + HEADER = '\033[95m' + OKBLUE = '\033[94m' + OKGREEN = '\033[92m' + WARNING = '\033[93m' + FAIL = '\033[91m' + ENDC = '\033[0m' + BOLD = '\033[1m' + UNDERLINE = '\033[4m' class Type: """ diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index e933cc96fe488bb9b323c5afbab4245959b02803..f7d75d408a391208c2322f4f65351fc715863f66 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -34,7 +34,7 @@ from autosubmit.job.job import Job from bscearth.utils.log import Log from autosubmit.job.job_dict import DicJobs from autosubmit.job.job_utils import Dependency -from autosubmit.job.job_common import Status, Type +from autosubmit.job.job_common import Status, Type, bcolors from bscearth.utils.date import date2str, parse_date, sum_str_hours from autosubmit.job.job_packages import JobPackageSimple, JobPackageArray, JobPackageThread @@ -1050,3 +1050,88 @@ class JobList: if flag: self.update_genealogy(notransitive=notransitive) del self._dic_jobs + + def print_with_status(self, statusChange = None): + """ + Returns the string representation of the dependency tree of + the Job List + + :return: String representation + :rtype: String + """ + allJobs = self.get_all() + # Header + result = bcolors.BOLD + "## String representation of Job List [" + str(len(allJobs)) + "] with " + \ + bcolors.OKGREEN + str(len(statusChange.keys())) + " Change(s) ##" + bcolors.ENDC + bcolors.ENDC + + # Find root + root = None + for job in allJobs: + if job.has_parents() == False: + root = job + + # root exists + if root is not None: + result += self._recursion_print(root, 0, statusChange = statusChange) + else: + result += "\nCannot find root." + + return result + + def __str__(self): + """ + Returns the string representation of the class. + Usage print(class) + + :return: String representation. + :rtype: String + """ + allJobs = self.get_all() + result = bcolors.BOLD + "## String representation of Job List [" + str(len(allJobs)) + "] ##" + bcolors.ENDC + + # Find root + root = None + for job in allJobs: + if job.has_parents() == False: + root = job + + # root exists + if root is not None: + result += self._recursion_print(root, 0) + else: + result += "\nCannot find root." + + return result + + def _recursion_print(self, job, level, statusChange = None): + """ + Returns the list of children in a recursive way + Traverses the dependency tree + + :return: parent + list of children + :rtype: String + """ + result = "" + prefix = "" + for i in range(level): + prefix += "| " + # Prefix + Job Name + result = "\n"+ prefix + bcolors.BOLD + job.name + bcolors.ENDC + if len(job._children) > 0: + level += 1 + children = job._children + total_children = len(job._children) + # Writes children number + result += " ~ [" + str(total_children) + (" children] " if total_children > 1 else " child] ") + if statusChange is not None: + # Writes change if performed + result += bcolors.BOLD + bcolors.OKGREEN + statusChange[job.name] if job.name in statusChange else "" + result += bcolors.ENDC + bcolors.ENDC + + for child in children: + # Continues recursion + result += self._recursion_print(child, level, statusChange=statusChange) + else: + pass + + return result \ No newline at end of file diff --git a/docs/source/usage/setstatus.rst b/docs/source/usage/setstatus.rst index ed8b4fecb8ae978bb89bdbcb9e85ff699fefba68..6d8975f9faa2c30f790d4fed8b8de0152535f61f 100644 --- a/docs/source/usage/setstatus.rst +++ b/docs/source/usage/setstatus.rst @@ -28,6 +28,15 @@ Options: List of status to be changed -ft FILTER_TYPE, --filter_type List of types to be changed + -ftc FILTER_TYPE_CHUNK --filter_type_chunk + Accepts a string with the formula: "[ 19601101 [ fc0 [1 2 3 4] Any [1] ] 19651101 [ fc0 [16 30] ] ],SIM,SIM2" + Where SIM, SIM2 are section (or job type) names that also accept the keyword "Any" so the changes apply to all sections. + Starting Date (19601101) does not accept the keyword "Any". + Member names (fc0) accept the keyword "Any", so the chunks ([1 2 3 4]) given will be updated in all members. + Chunks must be in the format "[1 2 3 4 n]" where "n" is an integer representing the number of the chunk in the member, + no range format is allowed. + -d When using the option -ftc and sending this flag, a tree view of the experiment with markers indicating which jobs + have been changed will be generated. --hide, hide the plot -group_by {date,member,chunk,split,automatic} criteria to use for grouping jobs