diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py
index 4b85f66e7f92a87c013d0968a628f2976f7c5514..80292a28c51f7d0598cfe12bf82a2da1524817c1 100644
--- a/autosubmit/autosubmit.py
+++ b/autosubmit/autosubmit.py
@@ -501,6 +501,11 @@ class Autosubmit:
                                 selected from for that member will be updated for all the members. Example: all [1], will have as a result that the \
                                 chunks 1 for all the members will be updated. Follow the format: '
                                 '"[ 19601101 [ fc0 [1 2 3 4] Any [1] ] 19651101 [ fc0 [16-30] ] ],SIM,SIM2,SIM3"')
+        group.add_argument('-ftcs', '--filter_type_chunk_split', type=str,
+                           help='Supply the list of chunks & splits to change the status. Default = "Any". When the member name "all" is set, all the chunks \
+                                selected from for that member will be updated for all the members. Example: all [1], will have as a result that the \
+                                chunks 1 for all the members will be updated. Follow the format: '
+                                '"[ 19601101 [ fc0 [1 [1 2] 2 3 4] Any [1] ] 19651101 [ fc0 [16-30] ] ],SIM,SIM2,SIM3"')
         subparser.add_argument('--hide', action='store_true', default=False,
                                help='hides plot window')
@@ -689,7 +694,7 @@ class Autosubmit:
         elif args.command == 'setstatus':
             return Autosubmit.set_status(args.expid, args.noplot, args.save, args.status_final, args.list,
                                          args.filter_chunks, args.filter_status, args.filter_type,
-                                         args.filter_type_chunk, args.hide,
+                                         args.filter_type_chunk, args.filter_type_chunk_split, args.hide,
                                          args.group_by, args.expand, args.expand_status, args.notransitive,
                                          args.check_wrapper, args.detail)
         elif args.command == 'testcase':
@@ -4451,8 +4456,7 @@ class Autosubmit:
             rerun = as_conf.get_rerun()

             Log.info("\nCreating the jobs list...")
-            job_list = JobList(expid, BasicConfig, YAMLParserFactory(),
-                               Autosubmit._get_job_list_persistence(expid, as_conf), as_conf)
+            job_list = JobList(expid, BasicConfig, YAMLParserFactory(), Autosubmit._get_job_list_persistence(expid, as_conf), as_conf)
             prev_job_list = Autosubmit.load_job_list(
                 expid, as_conf, notransitive=notransitive)
@@ -4753,36 +4757,362 @@ class Autosubmit:
                 Log.status("CHANGED: job: " + job.name + " status to: " + final)

     @staticmethod
-    def set_status(expid, noplot, save, final, lst, filter_chunks, filter_status, filter_section, filter_type_chunk,
+    def _validate_section(as_conf, filter_section):
+        section_validation_error = False
+        section_error = False
+        section_not_foundList = list()
+        section_validation_message = "\n## Section Validation Message ##"
+        countStart = filter_section.count('[')
+        countEnd = filter_section.count(']')
+        if countStart > 1 or countEnd > 1:
+            section_validation_error = True
+            section_validation_message += "\n\tList of sections has a format error. Perhaps you were trying to use -fc instead."
+        if section_validation_error is False:
+            if len(str(filter_section).strip()) > 0:
+                if len(filter_section.split()) > 0:
+                    jobSections = as_conf.jobs_data
+                    for section in filter_section.split():
+                        # Provided section is not an existing section, or it is not the keyword 'Any'
+                        if section not in jobSections and (section != "Any"):
+                            section_error = True
+                            section_not_foundList.append(section)
+            else:
+                section_validation_error = True
+                section_validation_message += "\n\tEmpty input. No changes performed."
+        if section_validation_error is True or section_error is True:
+            if section_error is True:
+                section_validation_message += "\n\tSpecified section(s) : [" + str(section_not_foundList) + "] not found in the experiment " + str(as_conf.expid) + \
+                                              ".\n\tProcess stopped. Review the format of the provided input. 
Comparison is case sensitive." + \
+                                              "\n\tRemember that this option expects section names separated by a blank space as input."
+
+            raise AutosubmitCritical("Error in the supplied input for -ft.", 7011, section_validation_message)
+    @staticmethod
+    def _validate_list(as_conf, job_list, filter_list):
+        job_validation_error = False
+        job_error = False
+        job_not_foundList = list()
+        job_validation_message = "\n## Job Validation Message ##"
+        jobs = list()
+        countStart = filter_list.count('[')
+        countEnd = filter_list.count(']')
+        if countStart > 1 or countEnd > 1:
+            job_validation_error = True
+            job_validation_message += "\n\tList of jobs has a format error. Perhaps you were trying to use -fc instead."
+
+        if job_validation_error is False:
+            for job in job_list.get_job_list():
+                jobs.append(job.name)
+            if len(str(filter_list).strip()) > 0:
+                if len(filter_list.split()) > 0:
+                    for sentJob in filter_list.split():
+                        # Provided job does not exist, or it is not the keyword 'Any'
+                        if sentJob not in jobs and (sentJob != "Any"):
+                            job_error = True
+                            job_not_foundList.append(sentJob)
+            else:
+                job_validation_error = True
+                job_validation_message += "\n\tEmpty input. No changes performed."
+
+        if job_validation_error is True or job_error is True:
+            if job_error is True:
+                job_validation_message += "\n\tSpecified job(s) : [" + str(
+                    job_not_foundList) + "] not found in the experiment " + \
+                                          str(as_conf.expid) + ". \n\tProcess stopped. Review the format of the provided input. Comparison is case sensitive." + \
+                                          "\n\tRemember that this option expects job names separated by a blank space as input."
+            raise AutosubmitCritical(
+                "Error in the supplied input for -fl.", 7011, job_validation_message)
+    @staticmethod
+    def _validate_chunks(as_conf, filter_chunks):
+        fc_validation_message = "## -fc Validation Message ##"
+        fc_filter_is_correct = True
+        selected_sections = filter_chunks.split(",")[1:]
+        selected_formula = filter_chunks.split(",")[0]
+        current_sections = as_conf.jobs_data
+        fc_deserializedJson = object()
+        # Starting Validation
+        if len(str(selected_sections).strip()) == 0:
+            fc_filter_is_correct = False
+            fc_validation_message += "\n\tMust include a section (job type)."
+        else:
+            for section in selected_sections:
+                # Validating empty sections
+                if len(str(section).strip()) == 0:
+                    fc_filter_is_correct = False
+                    fc_validation_message += "\n\tEmpty sections are not accepted."
+                    break
+                # Validating existing sections
+                if section not in current_sections:
+                    fc_filter_is_correct = False
+                    fc_validation_message += "\n\tSection " + section + \
+                                             " does not exist in experiment. Remember not to include blank spaces."
+
+        # Validating chunk formula
+        if len(selected_formula) == 0:
+            fc_filter_is_correct = False
+            fc_validation_message += "\n\tA formula for chunk filtering has not been provided."
+
+        # If everything is fine until this point
+        if fc_filter_is_correct is True:
+            # Retrieve experiment data
+            current_dates = as_conf.experiment_data["EXPERIMENT"]["DATELIST"].split()
+            current_members = as_conf.get_member_list()
+            # Parse json
+            try:
+                fc_deserializedJson = json.loads(
+                    Autosubmit._create_json(selected_formula))
+            except Exception as e:
+                fc_filter_is_correct = False
+                fc_validation_message += "\n\tProvided chunk formula does not have the right format. Were you trying to use another option?"
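+            # Illustrative note (shape inferred from the accesses below, not from _create_json's definition):
+            # for a formula like "19601101 [ fc0 [1 2] ]", the deserialized JSON is expected to look like
+            #   {"sds": [{"sd": "19601101", "ms": [{"m": "fc0", "cs": [...]}]}]}
+            # i.e. a list of starting dates ("sds"), each holding its members ("ms") and their chunks ("cs").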
+ if fc_filter_is_correct is True: + for startingDate in fc_deserializedJson['sds']: + if startingDate['sd'] not in current_dates: + fc_filter_is_correct = False + fc_validation_message += "\n\tStarting date " + \ + startingDate['sd'] + \ + " does not exist in experiment." + for member in startingDate['ms']: + if member['m'] not in current_members and member['m'].lower() != "any": + fc_filter_is_correct = False + fc_validation_message += "\n\tMember " + \ + member['m'] + \ + " does not exist in experiment." + + # Ending validation + if fc_filter_is_correct is False: + raise AutosubmitCritical( + "Error in the supplied input for -fc.", 7011, fc_validation_message) + @staticmethod + def _validate_status(job_list,filter_status): + status_validation_error = False + status_validation_message = "\n## Status Validation Message ##" + # Trying to identify chunk formula + countStart = filter_status.count('[') + countEnd = filter_status.count(']') + if countStart > 1 or countEnd > 1: + status_validation_error = True + status_validation_message += "\n\tList of status provided has a format error. Perhaps you were trying to use -fc instead." + # If everything is fine until this point + if status_validation_error is False: + status_filter = filter_status.split() + status_reference = Status() + status_list = list() + for job in job_list.get_job_list(): + reference = status_reference.VALUE_TO_KEY[job.status] + if reference not in status_list: + status_list.append(reference) + for status in status_filter: + if status not in status_list: + status_validation_error = True + status_validation_message += "\n\t There are no jobs with status " + \ + status + " in this experiment." + if status_validation_error is True: + raise AutosubmitCritical("Error in the supplied input for -fs.", 7011, status_validation_message) + + @staticmethod + def _validate_type_chunk(as_conf,filter_type_chunk): + #Change status by section, member, and chunk; freely. + # Including inner validation. Trying to make it independent. + # 19601101 [ fc0 [1 2 3 4] Any [1] ] 19651101 [ fc0 [16-30] ] ],SIM,SIM2,SIM3 + validation_message = "## -ftc Validation Message ##" + filter_is_correct = True + selected_sections = filter_type_chunk.split(",")[1:] + selected_formula = filter_type_chunk.split(",")[0] + deserializedJson = object() + # Starting Validation + if len(str(selected_sections).strip()) == 0: + filter_is_correct = False + validation_message += "\n\tMust include a section (job type). If you want to apply the changes to all sections, include 'Any'." + else: + for section in selected_sections: + # Validating empty sections + if len(str(section).strip()) == 0: + filter_is_correct = False + validation_message += "\n\tEmpty sections are not accepted." + break + # Validating existing sections + # Retrieve experiment data + current_sections = as_conf.jobs_data + if section not in current_sections and section != "Any": + filter_is_correct = False + validation_message += "\n\tSection " + \ + section + " does not exist in experiment." + + # Validating chunk formula + if len(selected_formula) == 0: + filter_is_correct = False + validation_message += "\n\tA formula for chunk filtering has not been provided. If you want to change all chunks, include 'Any'." 
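+        # For reference, a well-formed -ftc value (the same example used in the help text and the comment
+        # above) looks like: "[ 19601101 [ fc0 [1 2 3 4] Any [1] ] 19651101 [ fc0 [16-30] ] ],SIM,SIM2,SIM3"
+        # i.e. the chunk formula comes first, followed by one or more comma-separated section names.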
+
+        if filter_is_correct is False:
+            raise AutosubmitCritical(
+                "Error in the supplied input for -ftc.", 7011, validation_message)
+
+    @staticmethod
+    def _validate_chunk_split(as_conf, filter_chunk_split):
+        # new filter, validation pending
+        pass
+    @staticmethod
+    def _validate_set_status_filters(as_conf, job_list, filter_list, filter_chunks, filter_status, filter_section, filter_type_chunk, filter_chunk_split):
+        if filter_section is not None:
+            Autosubmit._validate_section(as_conf, filter_section)
+        if filter_list is not None:
+            Autosubmit._validate_list(as_conf, job_list, filter_list)
+        if filter_chunks is not None:
+            Autosubmit._validate_chunks(as_conf, filter_chunks)
+        if filter_status is not None:
+            Autosubmit._validate_status(job_list, filter_status)
+        if filter_type_chunk is not None:
+            Autosubmit._validate_type_chunk(as_conf, filter_type_chunk)
+        if filter_chunk_split is not None:
+            Autosubmit._validate_chunk_split(as_conf, filter_chunk_split)
+
+    @staticmethod
+    def _apply_ftc(job_list, filter_type_chunk_split):
+        """
+        Accepts a string with the formula: "[ 19601101 [ fc0 [1 [1] 2 [2 3] 3 4] Any [1] ] 19651101 [ fc0 [16 30] ] ],SIM [ Any ] ,SIM2 [ 1 2]"
+        SIM and SIM2 are section (job type) names; they also accept the keyword "Any", in which case the changes apply to all sections.
+        The starting date (19601101) does not accept the keyword "Any", so the starting dates to be changed must be given explicitly.
+        Date ranges can also be specified, to apply the change to a range of dates.
+        Member names (fc0) accept the keyword "Any", in which case the given chunks ([1 2 3 4]) are updated for all members.
+        Chunks must be in the format "[1 2 3 4]", where "1 2 3 4" are the numbers of the chunks in the member.
+        Splits must be in the format "[ 1 2 3 4]", where "1 2 3 4" are the numbers of the splits in the section;
+        no range format is allowed.
+        For example (illustrative): "[ 19601101 [ fc0 [1 [1 2] ] ] ],SIM" selects splits 1 and 2 of chunk 1
+        of member fc0, starting date 19601101, for section SIM.
+ :param filter_type_chunk_split: string with the formula + :return: final_list + """ + # Get selected sections and formula + final_list = [] + selected_sections = filter_type_chunk_split.split(",")[1:] + selected_formula = filter_type_chunk_split.split(",")[0] + # Retrieve experiment data + # Parse json + deserializedJson = json.loads(Autosubmit._create_json(selected_formula)) + # Get current list + working_list = job_list.get_job_list() + for section in selected_sections: + if str(section).upper() == "ANY": + # Any section + section_selection = working_list + # Go through start dates + for starting_date in deserializedJson['sds']: + date = starting_date['sd'] + date_selection = [j for j in section_selection if date2str( + j.date) == date] + # Members for given start date + for member_group in starting_date['ms']: + member = member_group['m'] + if str(member).upper() == "ANY": + # Any member + member_selection = date_selection + chunk_group = member_group['cs'] + for chunk in chunk_group: + filtered_job = [j for j in member_selection if j.chunk == int(chunk)] + for job in filtered_job: + final_list.append(job) + # From date filter and sync is not None + for job in [j for j in date_selection if + j.chunk == int(chunk) and j.synchronize is not None]: + final_list.append(job) + else: + # Selected members + member_selection = [j for j in date_selection if j.member == member] + chunk_group = member_group['cs'] + for chunk in chunk_group: + filtered_job = [j for j in member_selection if j.chunk == int(chunk)] + for job in filtered_job: + final_list.append(job) + # From date filter and sync is not None + for job in [j for j in date_selection if + j.chunk == int(chunk) and j.synchronize is not None]: + final_list.append(job) + else: + # Only given section + section_splits = section.split("[") + section = section_splits[0].strip(" [") + if len(section_splits) > 1: + if "," in section_splits[1]: + splits = section_splits[1].strip(" ]").split(",") + else: + splits = section_splits[1].strip(" ]").split(" ") + else: + splits = ["ANY"] + final_splits = [] + for split in splits: + start = None + end = None + if split.find("-") != -1: + start = split.split("-")[0] + end = split.split("-")[1] + if split.find(":") != -1: + start = split.split(":")[0] + end = split.split(":")[1] + if start and end: + final_splits += [ str(i) for i in range(int(start),int(end)+1)] + else: + final_splits.append(str(split)) + splits = final_splits + jobs_filtered = [j for j in working_list if j.section == section and ( j.split is None or splits[0] == "ANY" or str(j.split) in splits ) ] + # Go through start dates + for starting_date in deserializedJson['sds']: + date = starting_date['sd'] + date_selection = [j for j in jobs_filtered if date2str( + j.date) == date] + # Members for given start date + for member_group in starting_date['ms']: + member = member_group['m'] + if str(member).upper() == "ANY": + # Any member + member_selection = date_selection + chunk_group = member_group['cs'] + for chunk in chunk_group: + filtered_job = [j for j in member_selection if + j.chunk is None or j.chunk == int(chunk)] + for job in filtered_job: + final_list.append(job) + # From date filter and sync is not None + for job in [j for j in date_selection if + j.chunk == int(chunk) and j.synchronize is not None]: + final_list.append(job) + else: + # Selected members + member_selection = [j for j in date_selection if j.member == member] + chunk_group = member_group['cs'] + for chunk in chunk_group: + filtered_job = [j for j in member_selection if 
j.chunk == int(chunk)]
+                            for job in filtered_job:
+                                final_list.append(job)
+                            # From date filter and sync is not None
+                            for job in [j for j in date_selection if
+                                        j.chunk == int(chunk) and j.synchronize is not None]:
+                                final_list.append(job)
+        return final_list
+    @staticmethod
+    def set_status(expid, noplot, save, final, filter_list, filter_chunks, filter_status, filter_section, filter_type_chunk, filter_type_chunk_split,
                    hide, group_by=None, expand=list(), expand_status=list(), notransitive=False, check_wrapper=False, detail=False):
         """
-        Set status
-
-        :param detail:
-        :param check_wrapper:
-        :param notransitive:
-        :param expand_status:
-        :param expand:
-        :param group_by:
-        :param filter_type_chunk:
-        :param noplot:
-        :param expid: experiment identifier
-        :type expid: str
-        :param save: if true, saves the new jobs list
-        :type save: bool
-        :param final: status to set on jobs
-        :type final: str
-        :param lst: list of jobs to change status
-        :type lst: str
-        :param filter_chunks: chunks to change status
-        :type filter_chunks: str
-        :param filter_status: current status of the jobs to change status
-        :type filter_status: str
-        :param filter_section: sections to change status
-        :type filter_section: str
-        :param hide: hides plot window
-        :type hide: bool
+        Set the status of a selection of jobs
+
+        :param expid: experiment identifier
+        :param noplot: if True, does not generate the plot
+        :param save: if True, saves the new job list
+        :param final: status to set on the selected jobs
+        :param filter_list: list of job names to change status
+        :param filter_chunks: chunks to change status
+        :param filter_status: current status of the jobs to change status
+        :param filter_section: sections to change status
+        :param filter_type_chunk: combined section/date/member/chunk filter (-ftc)
+        :param filter_type_chunk_split: combined section/date/member/chunk/split filter (-ftcs)
+        :param hide: hides the plot window
+        :param group_by: criteria to group jobs in the plot (date, member, chunk or split)
+        :param expand: list of groups to expand in the plot
+        :param expand_status: status(es) whose groups are expanded in the plot
+        :param notransitive: if True, skips the transitive reduction of the dependency graph
+        :param check_wrapper: if True, also generates the wrapper scripts
+        :param detail: if True, prints a detailed summary of the performed changes
+        :return: True if the process finishes properly
         """
         Autosubmit._check_ownership(expid, raise_error=True)
         exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid)
@@ -4798,10 +5128,11 @@ class Autosubmit:
         Log.debug('Exp ID: {0}', expid)
         Log.debug('Save: {0}', save)
         Log.debug('Final status: {0}', final)
-        Log.debug('List of jobs to change: {0}', lst)
+        Log.debug('List of jobs to change: {0}', filter_list)
         Log.debug('Chunks to change: {0}', filter_chunks)
         Log.debug('Status of jobs to change: {0}', filter_status)
         Log.debug('Sections to change: {0}', filter_section)
+
         wrongExpid = 0
         as_conf = AutosubmitConfig(
             expid, BasicConfig, YAMLParserFactory())
@@ -4810,46 +5141,8 @@ class Autosubmit:
         # Getting output type from configuration
         output_type = as_conf.get_output_type()
         # Getting db connections
-
-        # Validating job sections, if filter_section -ft has been set:
-        if filter_section is not None:
-            section_validation_error = False
-            section_error = False
-            section_not_foundList = list()
-            section_validation_message = "\n## Section Validation Message ##"
-            countStart = filter_section.count('[')
-            countEnd = filter_section.count(']')
-            if countStart > 1 or countEnd > 1:
-                section_validation_error = True
-                section_validation_message += "\n\tList of sections has a format error. Perhaps you were trying to use -fc instead."
-            # countUnderscore = filter_section.count('_')
-            # if countUnderscore > 1:
-            #     section_validation_error = True
-            #     section_validation_message += "\n\tList of sections provided has a format error. Perhaps you were trying to use -fl instead."
- if section_validation_error is False: - if len(str(filter_section).strip()) > 0: - if len(filter_section.split()) > 0: - jobSections = as_conf.jobs_data - for section in filter_section.split(): - # print(section) - # Provided section is not an existing section, or it is not the keyword 'Any' - if section not in jobSections and (section != "Any"): - section_error = True - section_not_foundList.append(section) - else: - section_validation_error = True - section_validation_message += "\n\tEmpty input. No changes performed." - if section_validation_error is True or section_error is True: - if section_error is True: - section_validation_message += "\n\tSpecified section(s) : [" + str(section_not_foundList) + \ - "] not found in the experiment " + str(expid) + \ - ".\n\tProcess stopped. Review the format of the provided input. Comparison is case sensitive." + \ - "\n\tRemember that this option expects section names separated by a blank space as input." - - raise AutosubmitCritical( - "Error in the supplied input for -ft.", 7011, section_validation_message+job_validation_message) - job_list = Autosubmit.load_job_list( - expid, as_conf, notransitive=notransitive) + # To be added in a function that checks which platforms must be connected to + job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=notransitive) submitter = Autosubmit._get_submitter(as_conf) submitter.load_platforms(as_conf) hpcarch = as_conf.get_platform() @@ -4868,8 +5161,7 @@ class Autosubmit: job.platform = platforms[job.platform_name] # noinspection PyTypeChecker if job.status in [Status.QUEUING, Status.SUBMITTED, Status.RUNNING]: - platforms_to_test.add( - platforms[job.platform_name]) + platforms_to_test.add(platforms[job.platform_name]) # establish the connection to all platforms definitive_platforms = list() for platform in platforms_to_test: @@ -4878,347 +5170,48 @@ class Autosubmit: definitive_platforms.append(platform.name) except Exception as e: pass - - # Validating list of jobs, if filter_list -fl has been set: - # Seems that Autosubmit.load_job_list call is necessary before verification is executed - if job_list is not None and lst is not None: - job_validation_error = False - job_error = False - job_not_foundList = list() - job_validation_message = "\n## Job Validation Message ##" - jobs = list() - countStart = lst.count('[') - countEnd = lst.count(']') - if countStart > 1 or countEnd > 1: - job_validation_error = True - job_validation_message += "\n\tList of jobs has a format error. Perhaps you were trying to use -fc instead." - - if job_validation_error is False: - for job in job_list.get_job_list(): - jobs.append(job.name) - if len(str(lst).strip()) > 0: - if len(lst.split()) > 0: - for sentJob in lst.split(): - # Provided job does not exist, or it is not the keyword 'Any' - if sentJob not in jobs and (sentJob != "Any"): - job_error = True - job_not_foundList.append(sentJob) - else: - job_validation_error = True - job_validation_message += "\n\tEmpty input. No changes performed." - - if job_validation_error is True or job_error is True: - if job_error is True: - job_validation_message += "\n\tSpecified job(s) : [" + str( - job_not_foundList) + "] not found in the experiment " + \ - str(expid) + ". \n\tProcess stopped. Review the format of the provided input. Comparison is case sensitive." + \ - "\n\tRemember that this option expects job names separated by a blank space as input." 
- raise AutosubmitCritical( - "Error in the supplied input for -ft.", 7011, section_validation_message+job_validation_message) - - # Validating fc if filter_chunks -fc has been set: - if filter_chunks is not None: - fc_validation_message = "## -fc Validation Message ##" - fc_filter_is_correct = True - selected_sections = filter_chunks.split(",")[1:] - selected_formula = filter_chunks.split(",")[0] - current_sections = as_conf.jobs_data - fc_deserializedJson = object() - # Starting Validation - if len(str(selected_sections).strip()) == 0: - fc_filter_is_correct = False - fc_validation_message += "\n\tMust include a section (job type)." - else: - for section in selected_sections: - # section = section.strip() - # Validating empty sections - if len(str(section).strip()) == 0: - fc_filter_is_correct = False - fc_validation_message += "\n\tEmpty sections are not accepted." - break - # Validating existing sections - # Retrieve experiment data - - if section not in current_sections: - fc_filter_is_correct = False - fc_validation_message += "\n\tSection " + section + \ - " does not exist in experiment. Remember not to include blank spaces." - - # Validating chunk formula - if len(selected_formula) == 0: - fc_filter_is_correct = False - fc_validation_message += "\n\tA formula for chunk filtering has not been provided." - - # If everything is fine until this point - if fc_filter_is_correct is True: - # Retrieve experiment data - current_dates = as_conf.experiment_data["EXPERIMENT"]["DATELIST"].split() - current_members = as_conf.get_member_list() - # Parse json - try: - fc_deserializedJson = json.loads( - Autosubmit._create_json(selected_formula)) - except Exception as e: - fc_filter_is_correct = False - fc_validation_message += "\n\tProvided chunk formula does not have the right format. Were you trying to use another option?" - if fc_filter_is_correct is True: - for startingDate in fc_deserializedJson['sds']: - if startingDate['sd'] not in current_dates: - fc_filter_is_correct = False - fc_validation_message += "\n\tStarting date " + \ - startingDate['sd'] + \ - " does not exist in experiment." - for member in startingDate['ms']: - if member['m'] not in current_members and member['m'].lower() != "any": - fc_filter_is_correct = False - fc_validation_message += "\n\tMember " + \ - member['m'] + \ - " does not exist in experiment." - - # Ending validation - if fc_filter_is_correct is False: - section_validation_message = fc_validation_message - raise AutosubmitCritical( - "Error in the supplied input for -fc.", 7011, section_validation_message+job_validation_message) - # Validating status, if filter_status -fs has been set: - # At this point we already have job_list from where we are getting the allows STATUS - if filter_status is not None: - status_validation_error = False - status_validation_message = "\n## Status Validation Message ##" - # Trying to identify chunk formula - countStart = filter_status.count('[') - countEnd = filter_status.count(']') - if countStart > 1 or countEnd > 1: - status_validation_error = True - status_validation_message += "\n\tList of status provided has a format error. Perhaps you were trying to use -fc instead." - # Trying to identify job names, implying status names won't use more than 1 underscore _ - # countUnderscore = filter_status.count('_') - # if countUnderscore > 1: - # status_validation_error = True - # status_validation_message += "\n\tList of status provided has a format error. Perhaps you were trying to use -fl instead." 
- # If everything is fine until this point - if status_validation_error is False: - status_filter = filter_status.split() - status_reference = Status() - status_list = list() - for job in job_list.get_job_list(): - reference = status_reference.VALUE_TO_KEY[job.status] - if reference not in status_list: - status_list.append(reference) - for status in status_filter: - if status not in status_list: - status_validation_error = True - status_validation_message += "\n\t There are no jobs with status " + \ - status + " in this experiment." - if status_validation_error is True: - raise AutosubmitCritical("Error in the supplied input for -fs.{0}".format( - status_validation_message), 7011, section_validation_message+job_validation_message) - + ##### End of the ""function"" + # This will raise an autosubmit critical if any of the filters has issues in the format specified by the user + Autosubmit._validate_set_status_filters(as_conf,job_list,filter_list,filter_chunks,filter_status,filter_section,filter_type_chunk, filter_type_chunk_split) + #### Starts the filtering process #### + final_list = [] jobs_filtered = [] + jobs_left_to_be_filtered = True final_status = Autosubmit._get_status(final) - if filter_section or filter_chunks: - if filter_section: - ft = filter_section.split() - else: - ft = filter_chunks.split(",")[1:] - if ft == 'Any': + # I have the impression that whoever did this function thought about the possibility of having multiple filters at the same time + # But, as it was, it is not possible to have multiple filters at the same time due to the way the code is written + if filter_section: + ft = filter_section.split() + if str(ft).upper() == 'ANY': for job in job_list.get_job_list(): - Autosubmit.change_status( - final, final_status, job, save) + final_list.append(job) + #Autosubmit.change_status(final, final_status, job, save) else: for section in ft: for job in job_list.get_job_list(): if job.section == section: - if filter_chunks: - jobs_filtered.append(job) - else: - Autosubmit.change_status( - final, final_status, job, save) - - # New feature : Change status by section, member, and chunk; freely. - # Including inner validation. Trying to make it independent. - # 19601101 [ fc0 [1 2 3 4] Any [1] ] 19651101 [ fc0 [16-30] ] ],SIM,SIM2,SIM3 - if filter_type_chunk: - validation_message = "## -ftc Validation Message ##" - filter_is_correct = True - selected_sections = filter_type_chunk.split(",")[1:] - selected_formula = filter_type_chunk.split(",")[0] - deserializedJson = object() - performed_changes = dict() - - # Starting Validation - if len(str(selected_sections).strip()) == 0: - filter_is_correct = False - validation_message += "\n\tMust include a section (job type). If you want to apply the changes to all sections, include 'Any'." - else: - for section in selected_sections: - # Validating empty sections - if len(str(section).strip()) == 0: - filter_is_correct = False - validation_message += "\n\tEmpty sections are not accepted." - break - # Validating existing sections - # Retrieve experiment data - current_sections = as_conf.jobs_data - if section not in current_sections and section != "Any": - filter_is_correct = False - validation_message += "\n\tSection " + \ - section + " does not exist in experiment." - - # Validating chunk formula - if len(selected_formula) == 0: - filter_is_correct = False - validation_message += "\n\tA formula for chunk filtering has not been provided. If you want to change all chunks, include 'Any'." 
- - # If everything is fine until this point - if filter_is_correct is True: - # Retrieve experiment data - current_dates = as_conf.experiment_data["EXPERIMENT"]["DATELIST"].split() - current_members = as_conf.get_member_list() - # Parse json - try: - deserializedJson = json.loads( - Autosubmit._create_json(selected_formula)) - except Exception as e: - filter_is_correct = False - validation_message += "\n\tProvided chunk formula does not have the right format. Were you trying to use another option?" - if filter_is_correct is True: - for startingDate in deserializedJson['sds']: - if startingDate['sd'] not in current_dates: - filter_is_correct = False - validation_message += "\n\tStarting date " + \ - startingDate['sd'] + \ - " does not exist in experiment." - for member in startingDate['ms']: - if member['m'] not in current_members and member['m'] != "Any": - filter_is_correct_ = False - validation_message += "\n\tMember " + \ - member['m'] + \ - " does not exist in experiment." - - # Ending validation - if filter_is_correct is False: - raise AutosubmitCritical( - "Error in the supplied input for -ftc.", 7011, section_validation_message+job_validation_message) - - # If input is valid, continue. - record = dict() - final_list = [] - # Get current list - working_list = job_list.get_job_list() - for section in selected_sections: - if section == "Any": - # Any section - section_selection = working_list - # Go through start dates - for starting_date in deserializedJson['sds']: - date = starting_date['sd'] - date_selection = [j for j in section_selection if date2str( - j.date) == date] - # Members for given start date - for member_group in starting_date['ms']: - member = member_group['m'] - if member == "Any": - # Any member - member_selection = date_selection - chunk_group = member_group['cs'] - for chunk in chunk_group: - filtered_job = [j for j in member_selection if j.chunk == int(chunk)] - for job in filtered_job: - final_list.append(job) - # From date filter and sync is not None - for job in [j for j in date_selection if - j.chunk == int(chunk) and j.synchronize is not None]: - final_list.append(job) - else: - # Selected members - member_selection = [j for j in date_selection if j.member == member] - chunk_group = member_group['cs'] - for chunk in chunk_group: - filtered_job = [j for j in member_selection if j.chunk == int(chunk)] - for job in filtered_job: - final_list.append(job) - # From date filter and sync is not None - for job in [j for j in date_selection if - j.chunk == int(chunk) and j.synchronize is not None]: - final_list.append(job) - else: - # Only given section - section_selection = [j for j in working_list if j.section == section] - # Go through start dates - for starting_date in deserializedJson['sds']: - date = starting_date['sd'] - date_selection = [j for j in section_selection if date2str( - j.date) == date] - # Members for given start date - for member_group in starting_date['ms']: - member = member_group['m'] - if member == "Any": - # Any member - member_selection = date_selection - chunk_group = member_group['cs'] - for chunk in chunk_group: - filtered_job = [j for j in member_selection if - j.chunk is None or j.chunk == int(chunk)] - for job in filtered_job: - final_list.append(job) - # From date filter and sync is not None - for job in [j for j in date_selection if - j.chunk == int(chunk) and j.synchronize is not None]: - final_list.append(job) - else: - # Selected members - member_selection = [j for j in date_selection if j.member == member] - chunk_group = 
member_group['cs']
-                                for chunk in chunk_group:
-                                    filtered_job = [j for j in member_selection if j.chunk == int(chunk)]
-                                    for job in filtered_job:
-                                        final_list.append(job)
-                                    # From date filter and sync is not None
-                                    for job in [j for j in date_selection if
-                                                j.chunk == int(chunk) and j.synchronize is not None]:
-                                        final_list.append(job)
-            status = Status()
-            for job in final_list:
-                if job.status in [Status.QUEUING, Status.RUNNING,
-                                  Status.SUBMITTED] and job.platform.name not in definitive_platforms:
-                    Log.printlog("JOB: [{1}] is ignored as the [{0}] platform is currently offline".format(
-                        job.platform.name, job.name), 6000)
-                    continue
-                if job.status != final_status:
-                    # Only real changes
-                    performed_changes[job.name] = str(
-                        Status.VALUE_TO_KEY[job.status]) + " -> " + str(final)
-                    Autosubmit.change_status(
-                        final, final_status, job, save)
-            # If changes have been performed
-            if len(list(performed_changes.keys())) > 0:
-                if detail is True:
-                    current_length = len(job_list.get_job_list())
-                    if current_length > 1000:
-                        Log.warning(
-                            "-d option: Experiment has too many jobs to be printed in the terminal. Maximum job quantity is 1000, your experiment has " + str(
-                                current_length) + " jobs.")
-                    else:
-                        Log.info(job_list.print_with_status(
-                            statusChange=performed_changes))
-            else:
-                Log.warning("No changes were performed.")
-            # End of New Feature
-
         if filter_chunks:
+            ft = filter_chunks.split(",")[1:]
+            # Any located in section part
+            if str(ft).upper() == "ANY":
+                for job in job_list.get_job_list():
+                    final_list.append(job)
+                    #Autosubmit.change_status(final, final_status, job, save)
+            # Collect the jobs belonging to the selected sections
+            for section in ft:
+                for job in job_list.get_job_list():
+                    if job.section == section:
+                        jobs_filtered.append(job)
             if len(jobs_filtered) == 0:
                 jobs_filtered = job_list.get_job_list()
             fc = filter_chunks
-            Log.debug(fc)
-
-            if fc == 'Any':
+            # Any located in chunks part
+            if str(fc).upper() == "ANY":
                 for job in jobs_filtered:
-                    Autosubmit.change_status(
-                        final, final_status, job, save)
+                    final_list.append(job)
+                    #Autosubmit.change_status(final, final_status, job, save)
             else:
-                # noinspection PyTypeChecker
                 data = json.loads(Autosubmit._create_json(fc))
                 for date_json in data['sds']:
                     date = date_json['sd']
@@ -5242,49 +5235,81 @@ class Autosubmit:
                     for chunk_json in member_json['cs']:
                         chunk = int(chunk_json)
                         for job in [j for j in jobs_date if j.chunk == chunk and j.synchronize is not None]:
-                            Autosubmit.change_status(
-                                final, final_status, job, save)
-
+                            final_list.append(job)
+                            #Autosubmit.change_status(final, final_status, job, save)
                         for job in [j for j in jobs_member if j.chunk == chunk]:
-                            Autosubmit.change_status(
-                                final, final_status, job, save)
+                            final_list.append(job)
+                            #Autosubmit.change_status(final, final_status, job, save)
         if filter_status:
             status_list = filter_status.split()
-            Log.debug("Filtering jobs with status {0}", filter_status)
-            if status_list == 'Any':
+            if str(status_list).upper() == 'ANY':
                 for job in job_list.get_job_list():
-                    Autosubmit.change_status(
-                        final, final_status, job, save)
+                    final_list.append(job)
+                    #Autosubmit.change_status(final, final_status, job, save)
             else:
                 for status in status_list:
                     fs = Autosubmit._get_status(status)
                     for job in [j for j in job_list.get_job_list() if j.status == fs]:
-                        Autosubmit.change_status(
-                            final, final_status, job, save)
+                        final_list.append(job)
+                        #Autosubmit.change_status(final, final_status, job, save)

-        if lst:
-            jobs = lst.split()
+        if filter_list:
+            jobs
= filter_list.split()
             expidJoblist = defaultdict(int)
-            for x in lst.split():
+            for x in filter_list.split():
                 expidJoblist[str(x[0:4])] += 1
-
             if str(expid) in expidJoblist:
                 wrongExpid = jobs.__len__() - expidJoblist[expid]
             if wrongExpid > 0:
                 Log.warning(
                     "There are {0} job.name with an invalid Expid", wrongExpid)
-
-            if jobs == 'Any':
+            if str(jobs).upper() == 'ANY':
                 for job in job_list.get_job_list():
-                    Autosubmit.change_status(
-                        final, final_status, job, save)
+                    final_list.append(job)
+                    #Autosubmit.change_status(final, final_status, job, save)
             else:
                 for job in job_list.get_job_list():
                     if job.name in jobs:
-                        Autosubmit.change_status(
-                            final, final_status, job, save)
+                        final_list.append(job)
+                        #Autosubmit.change_status(final, final_status, job, save)
+        # All the filters above should live in a single function, but there was no time to refactor them
+        # filter_type_chunk_split is essentially filter_type_chunk extended with splits, so both reuse _apply_ftc
+        if filter_type_chunk_split is not None:
+            final_list.extend(Autosubmit._apply_ftc(job_list, filter_type_chunk_split))
+        if filter_type_chunk:
+            final_list.extend(Autosubmit._apply_ftc(job_list, filter_type_chunk))
+        # Time to change status
+        final_list = list(set(final_list))
+        performed_changes = {}
+        for job in final_list:
+            if job.status in [Status.QUEUING, Status.RUNNING,
+                              Status.SUBMITTED] and job.platform.name not in definitive_platforms:
+                Log.printlog("JOB: [{1}] is ignored as the [{0}] platform is currently offline".format(
+                    job.platform.name, job.name), 6000)
+                continue
+            if job.status != final_status:
+                # Only real changes
+                performed_changes[job.name] = str(
+                    Status.VALUE_TO_KEY[job.status]) + " -> " + str(final)
+                Autosubmit.change_status(
+                    final, final_status, job, save)
+        # If changes have been performed
+        if len(list(performed_changes.keys())) > 0:
+            if detail is True:
+                current_length = len(job_list.get_job_list())
+                if current_length > 1000:
+                    Log.warning(
+                        "-d option: Experiment has too many jobs to be printed in the terminal. 
Maximum job quantity is 1000, your experiment has " + str( + current_length) + " jobs.") + else: + Log.info(job_list.print_with_status( + statusChange=performed_changes)) + else: + Log.warning("No changes were performed.") + job_list.update_list(as_conf, False, True) @@ -5301,37 +5326,38 @@ class Autosubmit: else: Log.printlog( "Changes NOT saved to the JobList!!!!: use -s option to save", 3000) - - if as_conf.get_wrapper_type() != 'none' and check_wrapper: - packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), - "job_packages_" + expid) - os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, - expid, "pkl", "job_packages_" + expid + ".db"), 0o775) - packages_persistence.reset_table(True) - referenced_jobs_to_remove = set() - job_list_wrappers = copy.deepcopy(job_list) - jobs_wr = copy.deepcopy(job_list.get_job_list()) - [job for job in jobs_wr if ( - job.status != Status.COMPLETED)] - for job in jobs_wr: - for child in job.children: - if child not in jobs_wr: - referenced_jobs_to_remove.add(child) - for parent in job.parents: - if parent not in jobs_wr: - referenced_jobs_to_remove.add(parent) - - for job in jobs_wr: - job.children = job.children - referenced_jobs_to_remove - job.parents = job.parents - referenced_jobs_to_remove - Autosubmit.generate_scripts_andor_wrappers(as_conf, job_list_wrappers, jobs_wr, - packages_persistence, True) - - packages = packages_persistence.load(True) - else: - packages = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), - "job_packages_" + expid).load() + #Visualization stuff that should be in a function common to monitor , create, -cw flag, inspect and so on if not noplot: + if as_conf.get_wrapper_type() != 'none' and check_wrapper: + packages_persistence = JobPackagePersistence( + os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), + "job_packages_" + expid) + os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, + expid, "pkl", "job_packages_" + expid + ".db"), 0o775) + packages_persistence.reset_table(True) + referenced_jobs_to_remove = set() + job_list_wrappers = copy.deepcopy(job_list) + jobs_wr = copy.deepcopy(job_list.get_job_list()) + [job for job in jobs_wr if ( + job.status != Status.COMPLETED)] + for job in jobs_wr: + for child in job.children: + if child not in jobs_wr: + referenced_jobs_to_remove.add(child) + for parent in job.parents: + if parent not in jobs_wr: + referenced_jobs_to_remove.add(parent) + + for job in jobs_wr: + job.children = job.children - referenced_jobs_to_remove + job.parents = job.parents - referenced_jobs_to_remove + Autosubmit.generate_scripts_andor_wrappers(as_conf, job_list_wrappers, jobs_wr, + packages_persistence, True) + + packages = packages_persistence.load(True) + else: + packages = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), + "job_packages_" + expid).load() groups_dict = dict() if group_by: status = list() @@ -5355,11 +5381,7 @@ class Autosubmit: show=not hide, groups=groups_dict, job_list_object=job_list) - - if not filter_type_chunk and detail is True: - Log.warning("-d option only works with -ftc.") return True - except (portalocker.AlreadyLocked, portalocker.LockException) as e: message = "We have detected that there is another Autosubmit instance using the experiment\n. 
Stop other Autosubmit instances that are using the experiment or delete autosubmit.lock file located on tmp folder" raise AutosubmitCritical(message, 7000) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index fc9e80e684a07f6b5e975b4694bea513d508a136..fda64d152c5ae605fff651a7023817705ef454b1 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -84,16 +84,20 @@ class Job(object): return "{0} STATUS: {1}".format(self.name, self.status) def __init__(self, name, job_id, status, priority): + self.wait = None + self.splits = None + self.rerun_only = False self.script_name_wrapper = None - self.delay_end = datetime.datetime.now() - self.delay_retrials = "0" + self.retrials = None + self.delay_end = None + self.delay_retrials = None self.wrapper_type = None self._wrapper_queue = None self._platform = None self._queue = None self._partition = None - self.retry_delay = "0" + self.retry_delay = None self.platform_name = None # type: str self.section = None # type: str self.wallclock = None # type: str @@ -120,7 +124,7 @@ class Job(object): self.long_name = name self.date_format = '' self.type = Type.BASH - self.hyperthreading = "none" + self.hyperthreading = None self.scratch_free_space = None self.custom_directives = [] self.undefined_variables = set() @@ -209,6 +213,11 @@ class Job(object): """ return Status.VALUE_TO_KEY.get(self.status, "UNKNOWN") + def __str__(self): + return self.name + + def __repr__(self): + return self.name @property def children_names_str(self): """ @@ -405,6 +414,16 @@ class Job(object): self._parents.add(new_parent) new_parent.__add_child(self) + def add_child(self, children): + """ + Add children for the job. It also adds current job as a parent for all the new children + + :param children: job's children to add + :type children: Job + """ + for child in children: + self.__add_child(child) + child._parents.add(self) def __add_child(self, new_child): """ Adds a new child to the job @@ -1029,7 +1048,7 @@ class Job(object): self.threads = str(as_conf.jobs_data[self.section].get("THREADS",as_conf.platforms_data.get(job_platform.name,{}).get("THREADS","1"))) self.tasks = str(as_conf.jobs_data[self.section].get("TASKS",as_conf.platforms_data.get(job_platform.name,{}).get("TASKS","1"))) self.nodes = str(as_conf.jobs_data[self.section].get("NODES",as_conf.platforms_data.get(job_platform.name,{}).get("NODES",""))) - self.hyperthreading = str(as_conf.jobs_data[self.section].get("HYPERTHREADING",as_conf.platforms_data.get(job_platform.name,{}).get("HYPERTHREADING","none"))) + self.hyperthreading = str(as_conf.jobs_data[self.section].get("HYPERTHREADING",as_conf.platforms_data.get(job_platform.name,{}).get("HYPERTHREADING",None))) if int(self.tasks) <= 1 and int(job_platform.processors_per_node) > 1 and int(self.processors) > int(job_platform.processors_per_node): self.tasks = job_platform.processors_per_node self.memory = str(as_conf.jobs_data[self.section].get("MEMORY",as_conf.platforms_data.get(job_platform.name,{}).get("MEMORY",""))) @@ -1113,15 +1132,14 @@ class Job(object): parameters['SDATE'] = date2str(self.date, self.date_format) parameters['MEMBER'] = self.member parameters['SPLIT'] = self.split + parameters['SPLITS'] = self.splits parameters['DELAY'] = self.delay parameters['FREQUENCY'] = self.frequency parameters['SYNCHRONIZE'] = self.synchronize parameters['PACKED'] = self.packed parameters['CHUNK'] = 1 - if hasattr(self, 'RETRIALS'): - parameters['RETRIALS'] = self.retrials - if hasattr(self, 'delay_retrials'): - parameters['DELAY_RETRIALS'] = 
self.delay_retrials + parameters['RETRIALS'] = self.retrials + parameters['DELAY_RETRIALS'] = self.delay_retrials if self.date is not None and len(str(self.date)) > 0: if self.chunk is None and len(str(self.chunk)) > 0: chunk = 1 @@ -1184,20 +1202,6 @@ class Job(object): parameters['CHUNK_LAST'] = 'FALSE' parameters['NUMMEMBERS'] = len(as_conf.get_member_list()) parameters['DEPENDENCIES'] = str(as_conf.jobs_data[self.section].get("DEPENDENCIES","")) - # This shouldn't be necessary anymore as now all sub is done in the as_conf.reload() - # if len(self.export) > 0: - # variables = re.findall('%(? 0: - # variables = [variable[1:-1] for variable in variables] - # for key in variables: - # try: - # self.export = re.sub( - # '%(?. -from autosubmit.job.job import Job +from collections.abc import Iterable +import itertools +from contextlib import suppress from bscearth.utils.date import date2str + +from autosubmit.job.job import Job from autosubmit.job.job_common import Status, Type -from log.log import Log, AutosubmitError, AutosubmitCritical -from collections.abc import Iterable +from log.log import Log, AutosubmitCritical +from collections import namedtuple + class DicJobs: """ Class to create jobs from conf file and to find jobs by start date, member and chunk @@ -42,9 +47,8 @@ class DicJobs: :type default_retrials: config_common """ - def __init__(self, jobs_list, date_list, member_list, chunk_list, date_format, default_retrials,jobs_data,experiment_data): + def __init__(self, date_list, member_list, chunk_list, date_format, default_retrials, jobs_data, experiment_data): self._date_list = date_list - self._jobs_list = jobs_list self._member_list = member_list self._chunk_list = chunk_list self._jobs_data = jobs_data @@ -69,22 +73,19 @@ class DicJobs: parameters = self.experiment_data["JOBS"] splits = int(parameters[section].get("SPLITS", -1)) - running = str(parameters[section].get('RUNNING',"once")).lower() + running = str(parameters[section].get('RUNNING', "once")).lower() frequency = int(parameters[section].get("FREQUENCY", 1)) if running == 'once': - self._create_jobs_once(section, priority, default_job_type, jobs_data,splits) + self._create_jobs_once(section, priority, default_job_type, jobs_data, splits) elif running == 'date': - self._create_jobs_startdate(section, priority, frequency, default_job_type, jobs_data,splits) + self._create_jobs_startdate(section, priority, frequency, default_job_type, jobs_data, splits) elif running == 'member': - self._create_jobs_member(section, priority, frequency, default_job_type, jobs_data,splits) + self._create_jobs_member(section, priority, frequency, default_job_type, jobs_data, splits) elif running == 'chunk': synchronize = str(parameters[section].get("SYNCHRONIZE", "")) delay = int(parameters[section].get("DELAY", -1)) - self._create_jobs_chunk(section, priority, frequency, default_job_type, synchronize, delay, splits, jobs_data) - - - - pass + self._create_jobs_chunk(section, priority, frequency, default_job_type, synchronize, delay, splits, + jobs_data) def _create_jobs_startdate(self, section, priority, frequency, default_job_type, jobs_data=dict(), splits=-1): """ @@ -99,23 +100,15 @@ class DicJobs: :type frequency: int """ self._dic[section] = dict() - tmp_dic = dict() - tmp_dic[section] = dict() count = 0 for date in self._date_list: count += 1 if count % frequency == 0 or count == len(self._date_list): - if splits <= 0: - self._dic[section][date] = self.build_job(section, priority, date, None, None, default_job_type, - jobs_data) - 
self._jobs_list.graph.add_node(self._dic[section][date].name) - else: - tmp_dic[section][date] = [] - self._create_jobs_split(splits, section, date, None, None, priority, - default_job_type, jobs_data, tmp_dic[section][date]) - self._dic[section][date] = tmp_dic[section][date] - - def _create_jobs_member(self, section, priority, frequency, default_job_type, jobs_data=dict(),splits=-1): + self._dic[section][date] = [] + self._create_jobs_split(splits, section, date, None, None, priority,default_job_type, jobs_data, self._dic[section][date]) + + + def _create_jobs_member(self, section, priority, frequency, default_job_type, jobs_data=dict(), splits=-1): """ Create jobs to be run once per member @@ -131,23 +124,16 @@ class DicJobs: """ self._dic[section] = dict() - tmp_dic = dict() - tmp_dic[section] = dict() for date in self._date_list: self._dic[section][date] = dict() count = 0 for member in self._member_list: count += 1 if count % frequency == 0 or count == len(self._member_list): - if splits <= 0: - self._dic[section][date][member] = self.build_job(section, priority, date, member, None,default_job_type, jobs_data,splits) - self._jobs_list.graph.add_node(self._dic[section][date][member].name) - else: - self._create_jobs_split(splits, section, date, member, None, priority, - default_job_type, jobs_data, tmp_dic[section][date][member]) - self._dic[section][date][member] = tmp_dic[section][date][member] - - def _create_jobs_once(self, section, priority, default_job_type, jobs_data=dict(),splits=0): + self._dic[section][date][member] = [] + self._create_jobs_split(splits, section, date, member, None, priority,default_job_type, jobs_data, self._dic[section][date][member]) + + def _create_jobs_once(self, section, priority, default_job_type, jobs_data=dict(), splits=0): """ Create jobs to be run once @@ -156,25 +142,11 @@ class DicJobs: :param priority: priority for the jobs :type priority: int """ + self._dic[section] = [] + self._create_jobs_split(splits, section, None, None, None, priority, default_job_type, jobs_data,self._dic[section]) - - if splits <= 0: - job = self.build_job(section, priority, None, None, None, default_job_type, jobs_data, -1) - self._dic[section] = job - self._jobs_list.graph.add_node(job.name) - else: - self._dic[section] = [] - total_jobs = 1 - while total_jobs <= splits: - job = self.build_job(section, priority, None, None, None, default_job_type, jobs_data, total_jobs) - self._dic[section].append(job) - self._jobs_list.graph.add_node(job.name) - total_jobs += 1 - pass - - #self._dic[section] = self.build_job(section, priority, None, None, None, default_job_type, jobs_data) - #self._jobs_list.graph.add_node(self._dic[section].name) - def _create_jobs_chunk(self, section, priority, frequency, default_job_type, synchronize=None, delay=0, splits=0, jobs_data=dict()): + def _create_jobs_chunk(self, section, priority, frequency, default_job_type, synchronize=None, delay=0, splits=0, + jobs_data=dict()): """ Create jobs to be run once per chunk @@ -189,6 +161,7 @@ class DicJobs: :param delay: if this parameter is set, the job is only created for the chunks greater than the delay :type delay: int """ + self._dic[section] = dict() # Temporally creation for unified jobs in case of synchronize tmp_dic = dict() if synchronize is not None and len(str(synchronize)) > 0: @@ -197,29 +170,17 @@ class DicJobs: count += 1 if delay == -1 or delay < chunk: if count % frequency == 0 or count == len(self._chunk_list): - if splits > 1: - if synchronize == 'date': - tmp_dic[chunk] = [] - 
self._create_jobs_split(splits, section, None, None, chunk, priority, - default_job_type, jobs_data, tmp_dic[chunk]) - elif synchronize == 'member': - tmp_dic[chunk] = dict() - for date in self._date_list: - tmp_dic[chunk][date] = [] - self._create_jobs_split(splits, section, date, None, chunk, priority, - default_job_type, jobs_data, tmp_dic[chunk][date]) - - else: - if synchronize == 'date': - tmp_dic[chunk] = self.build_job(section, priority, None, None, - chunk, default_job_type, jobs_data) - elif synchronize == 'member': - tmp_dic[chunk] = dict() - for date in self._date_list: - tmp_dic[chunk][date] = self.build_job(section, priority, date, None, - chunk, default_job_type, jobs_data) + if synchronize == 'date': + tmp_dic[chunk] = [] + self._create_jobs_split(splits, section, None, None, chunk, priority, + default_job_type, jobs_data, tmp_dic[chunk]) + elif synchronize == 'member': + tmp_dic[chunk] = dict() + for date in self._date_list: + tmp_dic[chunk][date] = [] + self._create_jobs_split(splits, section, date, None, chunk, priority, + default_job_type, jobs_data, tmp_dic[chunk][date]) # Real dic jobs assignment/creation - self._dic[section] = dict() for date in self._date_list: self._dic[section][date] = dict() for member in self._member_list: @@ -235,23 +196,22 @@ class DicJobs: elif synchronize == 'member': if chunk in tmp_dic: self._dic[section][date][member][chunk] = tmp_dic[chunk][date] - - if splits > 1 and (synchronize is None or not synchronize): + else: self._dic[section][date][member][chunk] = [] - self._create_jobs_split(splits, section, date, member, chunk, priority, default_job_type, jobs_data, self._dic[section][date][member][chunk]) - pass - elif synchronize is None or not synchronize: - self._dic[section][date][member][chunk] = self.build_job(section, priority, date, member, - chunk, default_job_type, jobs_data) - self._jobs_list.graph.add_node(self._dic[section][date][member][chunk].name) - - def _create_jobs_split(self, splits, section, date, member, chunk, priority, default_job_type, jobs_data, dict_): - total_jobs = 1 - while total_jobs <= splits: - job = self.build_job(section, priority, date, member, chunk, default_job_type, jobs_data, total_jobs) - dict_.append(job) - self._jobs_list.graph.add_node(job.name) - total_jobs += 1 + self._create_jobs_split(splits, section, date, member, chunk, priority, + default_job_type, jobs_data, + self._dic[section][date][member][chunk]) + def _create_jobs_split(self, splits, section, date, member, chunk, priority, default_job_type, jobs_data, section_data): + gen = ( job for job in jobs_data.values() if (job[6] == member or member is None) and (job[5] == date or date is None) and (job[7] == chunk or chunk is None) and (job[4] == section or section is None) ) + if splits <= 0: + self.build_job(section, priority, date, member, chunk, default_job_type, gen, section_data, -1) + else: + current_split = 1 + while current_split <= splits: + self.build_job(section, priority, date, member, chunk, default_job_type, itertools.islice(gen,0,current_split), section_data,current_split) + current_split += 1 + # clean remaining gen elements if any ( avoids GeneratorExit exception ) + for _ in gen: pass def get_jobs(self, section, date=None, member=None, chunk=None): """ @@ -276,7 +236,7 @@ class DicJobs: return jobs dic = self._dic[section] - #once jobs + # once jobs if type(dic) is list: jobs = dic elif type(dic) is not dict: @@ -287,7 +247,7 @@ class DicJobs: else: for d in self._date_list: self._get_date(jobs, dic, d, member, chunk) - if 
len(jobs) > 1 and isinstance(jobs[0], Iterable):
+        if len(jobs) > 0 and isinstance(jobs[0], Iterable):
             try:
                 jobs_flattened = [job for jobs_to_flatten in jobs for job in jobs_to_flatten]
                 jobs = jobs_flattened
@@ -330,9 +290,8 @@ class DicJobs:
                     jobs.append(dic[c])
         return jobs

-    def build_job(self, section, priority, date, member, chunk, default_job_type, jobs_data=dict(), split=-1):
-        parameters = self.experiment_data["JOBS"]
-        name = self._jobs_list.expid
+    def build_job(self, section, priority, date, member, chunk, default_job_type, jobs_generator, section_data, split=-1):
+        name = self.experiment_data.get("DEFAULT", {}).get("EXPID", "")
         if date is not None and len(str(date)) > 0:
             name += "_" + date2str(date, self._date_format)
         if member is not None and len(str(member)) > 0:
@@ -342,90 +301,18 @@ class DicJobs:
         if split > -1:
             name += "_{0}".format(split)
         name += "_" + section
-        if name in jobs_data:
-            job = Job(name, jobs_data[name][1], jobs_data[name][2], priority)
-            job.local_logs = (jobs_data[name][8], jobs_data[name][9])
-            job.remote_logs = (jobs_data[name][10], jobs_data[name][11])
-
+        for job_data in jobs_generator:
+            if job_data[0] == name:
+                job = Job(job_data[0], job_data[1], job_data[2], priority)
+                job.local_logs = (job_data[8], job_data[9])
+                job.remote_logs = (job_data[10], job_data[11])
+                break  # keep the persisted job; without it, the for/else below would overwrite it
         else:
             job = Job(name, 0, Status.WAITING, priority)
-
+        job.default_job_type = default_job_type
         job.section = section
         job.date = date
         job.member = member
         job.chunk = chunk
-        job.date_format = self._date_format
-        job.delete_when_edgeless = str(parameters[section].get("DELETE_WHEN_EDGELESS", "true")).lower()
-
-        if split > -1:
-            job.split = split
-
-        job.frequency = int(parameters[section].get( "FREQUENCY", 1))
-        job.delay = int(parameters[section].get( "DELAY", -1))
-        job.wait = str(parameters[section].get( "WAIT", True)).lower()
-        job.rerun_only = str(parameters[section].get( "RERUN_ONLY", False)).lower()
-        job_type = str(parameters[section].get( "TYPE", default_job_type)).lower()
-
-        job.dependencies = parameters[section].get( "DEPENDENCIES", "")
-        if job.dependencies and type(job.dependencies) is not dict:
-            job.dependencies = str(job.dependencies).split()
-        if job_type == 'bash':
-            job.type = Type.BASH
-        elif job_type == 'python' or job_type == 'python3':
-            job.type = Type.PYTHON3
-        elif job_type == 'python2':
-            job.type = Type.PYTHON2
-        elif job_type == 'r':
-            job.type = Type.R
-        hpcarch = self.experiment_data.get("DEFAULT",{})
-        hpcarch = hpcarch.get("HPCARCH","")
-        job.platform_name = str(parameters[section].get("PLATFORM", hpcarch)).upper()
-        if self.experiment_data["PLATFORMS"].get(job.platform_name, "") == "" and job.platform_name.upper() != "LOCAL":
-            raise AutosubmitCritical("Platform does not exists, check the value of %JOBS.{0}.PLATFORM% = {1} parameter".format(job.section,job.platform_name),7000,"List of platforms: {0} ".format(self.experiment_data["PLATFORMS"].keys()) )
-        job.file = str(parameters[section].get( "FILE", ""))
-        job.additional_files = parameters[section].get( "ADDITIONAL_FILES", [])
-
-        job.executable = str(parameters[section].get("EXECUTABLE", self.experiment_data["PLATFORMS"].get(job.platform_name,{}).get("EXECUTABLE","")))
-        job.queue = str(parameters[section].get( "QUEUE", ""))
-
-        job.ec_queue = str(parameters[section].get("EC_QUEUE", ""))
-        if job.ec_queue == "" and job.platform_name != "LOCAL":
-            job.ec_queue = str(self.experiment_data["PLATFORMS"][job.platform_name].get("EC_QUEUE","hpc"))
-
-        job.partition = str(parameters[section].get( "PARTITION", ""))
-        job.check = 
str(parameters[section].get( "CHECK", "true")).lower() - job.export = str(parameters[section].get( "EXPORT", "")) - job.processors = str(parameters[section].get( "PROCESSORS", "")) - job.threads = str(parameters[section].get( "THREADS", "")) - job.tasks = str(parameters[section].get( "TASKS", "")) - job.memory = str(parameters[section].get("MEMORY", "")) - job.memory_per_task = str(parameters[section].get("MEMORY_PER_TASK", "")) - remote_max_wallclock = self.experiment_data["PLATFORMS"].get(job.platform_name,{}) - remote_max_wallclock = remote_max_wallclock.get("MAX_WALLCLOCK",None) - job.wallclock = parameters[section].get("WALLCLOCK", remote_max_wallclock) - job.retrials = int(parameters[section].get( 'RETRIALS', 0)) - job.delay_retrials = int(parameters[section].get( 'DELAY_RETRY_TIME', "-1")) - if job.wallclock is None and job.platform_name.upper() != "LOCAL": - job.wallclock = "01:59" - elif job.wallclock is None and job.platform_name.upper() != "LOCAL": - job.wallclock = "00:00" - elif job.wallclock is None: - job.wallclock = "00:00" - if job.retrials == -1: - job.retrials = None - notify_on = parameters[section].get("NOTIFY_ON",None) - if type(notify_on) == str: - job.notify_on = [x.upper() for x in notify_on.split(' ')] - else: - job.notify_on = "" - job.synchronize = str(parameters[section].get( "SYNCHRONIZE", "")) - job.check_warnings = str(parameters[section].get("SHOW_CHECK_WARNINGS", False)).lower() - job.running = str(parameters[section].get( 'RUNNING', 'once')) - job.x11 = str(parameters[section].get( 'X11', False )).lower() - job.skippable = str(parameters[section].get( "SKIPPABLE", False)).lower() - self._jobs_list.get_job_list().append(job) - - return job - - + job.split = split + section_data.append(job) diff --git a/autosubmit/job/job_grouping.py b/autosubmit/job/job_grouping.py index bcddaf038371b1708fee4d8eb198c77e0855a136..13084bccacfc50b6f08a0b8c1311728b2a27f9cd 100644 --- a/autosubmit/job/job_grouping.py +++ b/autosubmit/job/job_grouping.py @@ -53,16 +53,12 @@ class JobGrouping(object): self.group_status_dict[group] = status final_jobs_group = dict() - for job, groups in jobs_group_dict.items(): - for group in groups: - if group not in blacklist: - while group in groups_map: - group = groups_map[group] - # to remove the jobs belonging to group that should be expanded - if group in self.group_status_dict: - if job not in final_jobs_group: - final_jobs_group[job] = list() - final_jobs_group[job].append(group) + for group, jobs in jobs_group_dict.items(): + for job in jobs: + if job not in blacklist: + if group not in final_jobs_group: + final_jobs_group[group] = list() + final_jobs_group[group].append(job) jobs_group_dict = final_jobs_group @@ -171,7 +167,8 @@ class JobGrouping(object): if self.group_by == 'split': if job.split is not None and len(str(job.split)) > 0: idx = job.name.rfind("_") - groups.append(job.name[:idx - 1] + job.name[idx + 1:]) + split_len = len(str(job.split)) + groups.append(job.name[:idx - split_len] + job.name[idx + 1:]) elif self.group_by == 'chunk': if job.chunk is not None and len(str(job.chunk)) > 0: groups.append(date2str(job.date, self.date_format) + '_' + job.member + '_' + str(job.chunk)) @@ -198,9 +195,9 @@ class JobGrouping(object): blacklist.append(group) break - if job.name not in jobs_group_dict: - jobs_group_dict[job.name] = list() - jobs_group_dict[job.name].append(group) + if group not in jobs_group_dict: + jobs_group_dict[group] = list() + jobs_group_dict[group].append(job.name) def _check_synchronized_job(self, job, 
groups): synchronized = False diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 380431d52ce4e5f0355dd06931e798e4b429d601..4cf563998a41728b2eac96e66ccc19655f853c04 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -18,6 +18,7 @@ # along with Autosubmit. If not, see . import collections import copy +import networkx as nx import re import os import pickle @@ -62,7 +63,7 @@ class JobList(object): """ - def __init__(self, expid, config, parser_factory, job_list_persistence,as_conf): + def __init__(self, expid, config, parser_factory, job_list_persistence, as_conf): self._persistence_path = os.path.join( config.LOCAL_ROOT_DIR, expid, "pkl") self._update_file = "updated_list_" + expid + ".txt" @@ -81,8 +82,6 @@ class JobList(object): self._chunk_list = [] self._dic_jobs = dict() self._persistence = job_list_persistence - self._graph = DiGraph() - self.packages_dict = dict() self._ordered_jobs_by_date_member = dict() @@ -92,6 +91,7 @@ class JobList(object): self._run_members = None self.jobs_to_run_first = list() self.rerun_job_list = list() + self.graph = DiGraph() @property def expid(self): """ @@ -102,24 +102,11 @@ class JobList(object): """ return self._expid - @property - def graph(self): - """ - Returns the graph - - :return: graph - :rtype: networkx graph - """ - return self._graph @property def jobs_data(self): return self.experiment_data["JOBS"] - @graph.setter - def graph(self, value): - self._graph = value - @property def run_members(self): return self._run_members @@ -143,10 +130,7 @@ class JobList(object): def create_dictionary(self, date_list, member_list, num_chunks, chunk_ini, date_format, default_retrials, wrapper_jobs): chunk_list = list(range(chunk_ini, num_chunks + 1)) - - jobs_parser = self._get_jobs_parser() - dic_jobs = DicJobs(self, date_list, member_list, - chunk_list, date_format, default_retrials,jobs_data={},experiment_data=self.experiment_data) + dic_jobs = DicJobs(date_list, member_list, chunk_list, date_format, default_retrials,{},self.experiment_data) self._dic_jobs = dic_jobs for wrapper_section in wrapper_jobs: if str(wrapper_jobs[wrapper_section]).lower() != 'none': @@ -202,21 +186,23 @@ class JobList(object): self._member_list = member_list chunk_list = list(range(chunk_ini, num_chunks + 1)) self._chunk_list = chunk_list - - - dic_jobs = DicJobs(self,date_list, member_list,chunk_list, date_format, default_retrials,jobs_data,experiment_data=self.experiment_data) + dic_jobs = DicJobs(date_list, member_list,chunk_list, date_format, default_retrials,jobs_data,self.experiment_data) self._dic_jobs = dic_jobs - priority = 0 if show_log: Log.info("Creating jobs...") # jobs_data includes the name of the .our and .err files of the job in LOG_expid jobs_data = dict() + recreate = True if not new: try: - jobs_data = {row[0]: row for row in self.load()} + self._job_list = self.load() + recreate = False + Log.info("Load finished") except Exception as e: try: - jobs_data = {row[0]: row for row in self.backup_load()} + self._job_list = self.backup_load() + recreate = False + Log.info("Load finished") except Exception as e: pass Log.info("Deleting previous pkl due being incompatible with current AS version") @@ -225,25 +211,14 @@ class JobList(object): if os.path.exists(os.path.join(self._persistence_path, self._persistence_file+"_backup.pkl")): os.remove(os.path.join(self._persistence_path, self._persistence_file+"_backup.pkl")) - self._create_jobs(dic_jobs, priority,default_job_type, jobs_data) - if show_log: - 
Log.info("Adding dependencies...") - self._add_dependencies(date_list, member_list,chunk_list, dic_jobs, self.graph) - - if show_log: - Log.info("Removing redundant dependencies...") - self.update_genealogy( - new, notransitive, update_structure=update_structure) - for job in self._job_list: - job.parameters = parameters - job_data = jobs_data.get(job.name,"none") - try: - if job_data != "none": - job.wrapper_type = job_data[12] - else: - job.wrapper_type = "none" - except BaseException as e: - job.wrapper_type = "none" + if recreate: + self._create_jobs(dic_jobs, 0, default_job_type) + if show_log: + Log.info("Adding dependencies to the graph..") + self._add_dependencies(date_list, member_list,chunk_list, dic_jobs) + if show_log: + Log.info("Adding dependencies to the job..") + self.update_genealogy(new, update_structure=update_structure, recreate = recreate) # Checking for member constraints if len(run_only_members) > 0: @@ -252,9 +227,9 @@ class JobList(object): Log.info("Considering only members {0}".format( str(run_only_members))) old_job_list = [job for job in self._job_list] - self._job_list = [ - job for job in old_job_list if job.member is None or job.member in run_only_members or job.status not in [Status.WAITING, Status.READY]] - for job in self._job_list: + self._job_list = [job for job in old_job_list if job.member is None or job.member in run_only_members or job.status not in [Status.WAITING, Status.READY]] + gen_joblist = [job for job in self._job_list] + for job in gen_joblist: for jobp in job.parents: if jobp in self._job_list: job.parents.add(jobp) @@ -264,6 +239,10 @@ class JobList(object): if show_log: Log.info("Looking for edgeless jobs...") self._delete_edgeless_jobs() + if new: + for job in self._job_list: + if not job.has_parents(): + job.status = Status.READY for wrapper_section in wrapper_jobs: try: if wrapper_jobs[wrapper_section] is not None and len(str(wrapper_jobs[wrapper_section])) > 0: @@ -274,36 +253,27 @@ class JobList(object): raise AutosubmitCritical("Some section jobs of the wrapper:{0} are not in the current job_list defined in jobs.conf".format(wrapper_section),7014,str(e)) - @staticmethod - def _add_dependencies(date_list, member_list, chunk_list, dic_jobs, graph, option="DEPENDENCIES"): + def _add_dependencies(self,date_list, member_list, chunk_list, dic_jobs, option="DEPENDENCIES"): jobs_data = dic_jobs._jobs_data.get("JOBS",{}) for job_section in jobs_data.keys(): Log.debug("Adding dependencies for {0} jobs".format(job_section)) - # If it does not have dependencies, do nothing - if not (job_section, option): - continue - - dependencies_keys = jobs_data[job_section].get(option,{}) - if type(dependencies_keys) is str: - if "," in dependencies_keys: - dependencies_list = dependencies_keys.split(",") - else: - dependencies_list = dependencies_keys.split(" ") - dependencies_keys = {} - for dependency in dependencies_list: - dependencies_keys[dependency] = {} - if dependencies_keys is None: - dependencies_keys = {} + # If it does not have dependencies, just append it to job_list and continue + dependencies_keys = jobs_data.get(job_section,{}).get(option,None) dependencies = JobList._manage_dependencies(dependencies_keys, dic_jobs, job_section) - + if not dependencies_keys: + Log.printlog(f"WARNING: Job Section {dependencies_keys} is not defined", Log.WARNING) for job in dic_jobs.get_jobs(job_section): + self.graph.add_node(job.name) + self.graph.nodes.get(job.name)['job'] = job + if not dependencies: + continue num_jobs = 1 if isinstance(job, list): 
diff --git a/autosubmit/job/job_grouping.py b/autosubmit/job/job_grouping.py
index bcddaf038371b1708fee4d8eb198c77e0855a136..13084bccacfc50b6f08a0b8c1311728b2a27f9cd 100644
--- a/autosubmit/job/job_grouping.py
+++ b/autosubmit/job/job_grouping.py
@@ -53,16 +53,12 @@ class JobGrouping(object):
                     self.group_status_dict[group] = status

         final_jobs_group = dict()
-        for job, groups in jobs_group_dict.items():
-            for group in groups:
-                if group not in blacklist:
-                    while group in groups_map:
-                        group = groups_map[group]
-                    # to remove the jobs belonging to group that should be expanded
-                    if group in self.group_status_dict:
-                        if job not in final_jobs_group:
-                            final_jobs_group[job] = list()
-                        final_jobs_group[job].append(group)
+        for group, jobs in jobs_group_dict.items():
+            for job in jobs:
+                if job not in blacklist:
+                    if group not in final_jobs_group:
+                        final_jobs_group[group] = list()
+                    final_jobs_group[group].append(job)

         jobs_group_dict = final_jobs_group

@@ -171,7 +167,8 @@ class JobGrouping(object):
         if self.group_by == 'split':
             if job.split is not None and len(str(job.split)) > 0:
                 idx = job.name.rfind("_")
-                groups.append(job.name[:idx - 1] + job.name[idx + 1:])
+                split_len = len(str(job.split))
+                groups.append(job.name[:idx - split_len] + job.name[idx + 1:])
         elif self.group_by == 'chunk':
             if job.chunk is not None and len(str(job.chunk)) > 0:
                 groups.append(date2str(job.date, self.date_format) + '_' + job.member + '_' + str(job.chunk))
@@ -198,9 +195,9 @@ class JobGrouping(object):
                             blacklist.append(group)
                         break

-        if job.name not in jobs_group_dict:
-            jobs_group_dict[job.name] = list()
-        jobs_group_dict[job.name].append(group)
+        if group not in jobs_group_dict:
+            jobs_group_dict[group] = list()
+        jobs_group_dict[group].append(job.name)

     def _check_synchronized_job(self, job, groups):
         synchronized = False
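The split-grouping fix above matters once split numbers reach two digits. A small sketch of the group-name computation, assuming the usual `<expid>_<date>_<member>_<chunk>_<split>_<SECTION>` job-name layout:

```python
def split_group_name(name: str, split: int) -> str:
    # Drop the split token (of any width) that sits just before the trailing section token
    idx = name.rfind("_")
    return name[:idx - len(str(split))] + name[idx + 1:]

print(split_group_name("a000_20000101_fc0_1_10_SIM", 10))  # a000_20000101_fc0_1_SIM
# the old hard-coded "idx - 1" slice kept a stray digit: a000_20000101_fc0_1_1SIM
```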
diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py
index 380431d52ce4e5f0355dd06931e798e4b429d601..4cf563998a41728b2eac96e66ccc19655f853c04 100644
--- a/autosubmit/job/job_list.py
+++ b/autosubmit/job/job_list.py
@@ -18,6 +18,7 @@
 # along with Autosubmit.  If not, see <http://www.gnu.org/licenses/>.
 import collections
 import copy
+import networkx as nx
 import re
 import os
 import pickle
@@ -62,7 +63,7 @@ class JobList(object):
     """

-    def __init__(self, expid, config, parser_factory, job_list_persistence,as_conf):
+    def __init__(self, expid, config, parser_factory, job_list_persistence, as_conf):
         self._persistence_path = os.path.join(
             config.LOCAL_ROOT_DIR, expid, "pkl")
         self._update_file = "updated_list_" + expid + ".txt"
@@ -81,8 +82,6 @@ class JobList(object):
         self._chunk_list = []
         self._dic_jobs = dict()
         self._persistence = job_list_persistence
-        self._graph = DiGraph()
-
         self.packages_dict = dict()
         self._ordered_jobs_by_date_member = dict()
@@ -92,6 +91,7 @@ class JobList(object):
         self._run_members = None
         self.jobs_to_run_first = list()
         self.rerun_job_list = list()
+        self.graph = DiGraph()
     @property
     def expid(self):
         """
@@ -102,24 +102,11 @@ class JobList(object):
         """
         return self._expid

-    @property
-    def graph(self):
-        """
-        Returns the graph
-
-        :return: graph
-        :rtype: networkx graph
-        """
-        return self._graph
     @property
     def jobs_data(self):
         return self.experiment_data["JOBS"]

-    @graph.setter
-    def graph(self, value):
-        self._graph = value
-
     @property
     def run_members(self):
         return self._run_members
@@ -143,10 +130,7 @@ class JobList(object):
     def create_dictionary(self, date_list, member_list, num_chunks, chunk_ini, date_format, default_retrials,
                           wrapper_jobs):
         chunk_list = list(range(chunk_ini, num_chunks + 1))
-
-        jobs_parser = self._get_jobs_parser()
-        dic_jobs = DicJobs(self, date_list, member_list,
-                           chunk_list, date_format, default_retrials,jobs_data={},experiment_data=self.experiment_data)
+        dic_jobs = DicJobs(date_list, member_list, chunk_list, date_format, default_retrials, {}, self.experiment_data)
         self._dic_jobs = dic_jobs
         for wrapper_section in wrapper_jobs:
             if str(wrapper_jobs[wrapper_section]).lower() != 'none':
@@ -202,21 +186,23 @@ class JobList(object):
         self._member_list = member_list
         chunk_list = list(range(chunk_ini, num_chunks + 1))
         self._chunk_list = chunk_list
-
-
-        dic_jobs = DicJobs(self,date_list, member_list,chunk_list, date_format, default_retrials,jobs_data,experiment_data=self.experiment_data)
+        dic_jobs = DicJobs(date_list, member_list, chunk_list, date_format, default_retrials, jobs_data, self.experiment_data)
         self._dic_jobs = dic_jobs
-        priority = 0
         if show_log:
             Log.info("Creating jobs...")
         # jobs_data includes the name of the .our and .err files of the job in LOG_expid
         jobs_data = dict()
+        recreate = True
         if not new:
             try:
-                jobs_data = {row[0]: row for row in self.load()}
+                self._job_list = self.load()
+                recreate = False
+                Log.info("Load finished")
             except Exception as e:
                 try:
-                    jobs_data = {row[0]: row for row in self.backup_load()}
+                    self._job_list = self.backup_load()
+                    recreate = False
+                    Log.info("Load finished")
                 except Exception as e:
                     pass
                     Log.info("Deleting previous pkl due being incompatible with current AS version")
@@ -225,25 +211,14 @@ class JobList(object):
                     if os.path.exists(os.path.join(self._persistence_path, self._persistence_file+"_backup.pkl")):
                         os.remove(os.path.join(self._persistence_path, self._persistence_file+"_backup.pkl"))

-        self._create_jobs(dic_jobs, priority,default_job_type, jobs_data)
-        if show_log:
-            Log.info("Adding dependencies...")
-        self._add_dependencies(date_list, member_list,chunk_list, dic_jobs, self.graph)
-
-        if show_log:
-            Log.info("Removing redundant dependencies...")
-        self.update_genealogy(
-            new, notransitive, update_structure=update_structure)
-        for job in self._job_list:
-            job.parameters = parameters
-            job_data = jobs_data.get(job.name,"none")
-            try:
-                if job_data != "none":
-                    job.wrapper_type = job_data[12]
-                else:
-                    job.wrapper_type = "none"
-            except BaseException as e:
-                job.wrapper_type = "none"
+        if recreate:
+            self._create_jobs(dic_jobs, 0, default_job_type)
+            if show_log:
+                Log.info("Adding dependencies to the graph..")
+            self._add_dependencies(date_list, member_list, chunk_list, dic_jobs)
+            if show_log:
+                Log.info("Adding dependencies to the job..")
+        self.update_genealogy(new, update_structure=update_structure, recreate=recreate)

         # Checking for member constraints
         if len(run_only_members) > 0:
@@ -252,9 +227,9 @@ class JobList(object):
             Log.info("Considering only members {0}".format(
                 str(run_only_members)))
             old_job_list = [job for job in self._job_list]
-            self._job_list = [
-                job for job in old_job_list if job.member is None or job.member in run_only_members or job.status not in [Status.WAITING, Status.READY]]
-            for job in self._job_list:
+            self._job_list = [job for job in old_job_list if job.member is None or job.member in run_only_members or job.status not in [Status.WAITING, Status.READY]]
+            gen_joblist = [job for job in self._job_list]
+            for job in gen_joblist:
                 for jobp in job.parents:
                     if jobp in self._job_list:
                         job.parents.add(jobp)
@@ -264,6 +239,10 @@ class JobList(object):
         if show_log:
             Log.info("Looking for edgeless jobs...")
         self._delete_edgeless_jobs()
+        if new:
+            for job in self._job_list:
+                if not job.has_parents():
+                    job.status = Status.READY
         for wrapper_section in wrapper_jobs:
             try:
                 if wrapper_jobs[wrapper_section] is not None and len(str(wrapper_jobs[wrapper_section])) > 0:
@@ -274,36 +253,27 @@ class JobList(object):
                     raise AutosubmitCritical("Some section jobs of the wrapper:{0} are not in the current job_list defined in jobs.conf".format(wrapper_section),7014,str(e))

-    @staticmethod
-    def _add_dependencies(date_list, member_list, chunk_list, dic_jobs, graph, option="DEPENDENCIES"):
+    def _add_dependencies(self, date_list, member_list, chunk_list, dic_jobs, option="DEPENDENCIES"):
         jobs_data = dic_jobs._jobs_data.get("JOBS",{})
         for job_section in jobs_data.keys():
             Log.debug("Adding dependencies for {0} jobs".format(job_section))
-            # If it does not have dependencies, do nothing
-            if not (job_section, option):
-                continue
-
-            dependencies_keys = jobs_data[job_section].get(option,{})
-            if type(dependencies_keys) is str:
-                if "," in dependencies_keys:
-                    dependencies_list = dependencies_keys.split(",")
-                else:
-                    dependencies_list = dependencies_keys.split(" ")
-                dependencies_keys = {}
-                for dependency in dependencies_list:
-                    dependencies_keys[dependency] = {}
-            if dependencies_keys is None:
-                dependencies_keys = {}
+            # If it does not have dependencies, just append it to job_list and continue
+            dependencies_keys = jobs_data.get(job_section, {}).get(option, {})
             dependencies = JobList._manage_dependencies(dependencies_keys, dic_jobs, job_section)
-
+            if not dependencies_keys:
+                Log.printlog(f"WARNING: Job Section {job_section} has no {option} defined", Log.WARNING)
             for job in dic_jobs.get_jobs(job_section):
+                self.graph.add_node(job.name)
+                self.graph.nodes.get(job.name)['job'] = job
+                if not dependencies:
+                    continue
                 num_jobs = 1
                 if isinstance(job, list):
                     num_jobs = len(job)
                 for i in range(num_jobs):
                     _job = job[i] if num_jobs > 1 else job
-                    JobList._manage_job_dependencies(dic_jobs, _job, date_list, member_list, chunk_list, dependencies_keys,
-                                                     dependencies, graph)
+                    self._manage_job_dependencies(dic_jobs, _job, date_list, member_list, chunk_list, dependencies_keys,
+                                                  dependencies)
                     pass

@@ -311,11 +281,11 @@ class JobList(object):
     def _manage_dependencies(dependencies_keys, dic_jobs, job_section):
         parameters = dic_jobs._jobs_data["JOBS"]
         dependencies = dict()
+
         for key in dependencies_keys:
             distance = None
             splits = None
             sign = None
-
            if '-' not in key and '+' not in key and '*' not in key and '?' not in key:
                 section = key
             else:
@@ -332,26 +302,11 @@ class JobList(object):
                 key_split = key.split(sign)
                 section = key_split[0]
                 distance = int(key_split[1])
-
-            if '[' in section:
-                #Todo check what is this because we never enter this
-                try:
-                    section_name = section[0:section.find("[")]
-                    splits_section = int(
-                        dic_jobs.experiment_data["JOBS"][section_name].get('SPLITS', -1))
-                    splits = JobList._calculate_splits_dependencies(
-                        section, splits_section)
-                    section = section_name
-                except Exception as e:
-                    pass
-            if parameters.get(section,None) is None:
-                Log.printlog("WARNING: SECTION {0} is not defined in jobs.conf".format(section))
-                continue
-            #raise AutosubmitCritical("Section:{0} doesn't exists.".format(section),7014)
-            dependency_running_type = str(parameters[section].get('RUNNING', 'once')).lower()
-            delay = int(parameters[section].get('DELAY', -1))
-            dependency = Dependency(section, distance, dependency_running_type, sign, delay, splits,relationships=dependencies_keys[key])
-            dependencies[key] = dependency
+            if parameters.get(section,None) is not None:
+                dependency_running_type = str(parameters[section].get('RUNNING', 'once')).lower()
+                delay = int(parameters[section].get('DELAY', -1))
+                dependency = Dependency(section, distance, dependency_running_type, sign, delay, splits,relationships=dependencies_keys[key])
+                dependencies[key] = dependency
         return dependencies

     @staticmethod
@@ -394,7 +349,7 @@ class JobList(object):
                 return False
         elif filter_value.find(",") != -1:
             aux_filter = filter_value.split(",")
-            if filter_type != "chunks":
+            if filter_type not in ["chunks", "splits"]:
                 for value in aux_filter:
                     if str(value).isdigit():
                         to_filter.append(associative_list[int(value)])
@@ -408,7 +363,7 @@ class JobList(object):
             start = start_end[0].strip("[]")
             end = start_end[1].strip("[]")
             del start_end
-            if filter_type == "chunks": # chunk directly
+            if filter_type not in ["chunks", "splits"]: # chunk directly
                 for value in range(int(start), int(end) + 1):
                     to_filter.append(value)
             else: # index
@@ -416,7 +371,8 @@ class JobList(object):
                     to_filter.append(value)
         else:
             to_filter.append(filter_value)
-        if str(parent_value).upper() in str(to_filter).upper():
+
+        if str(to_filter).find(str(parent_value).upper()) != -1:
             return True
         else:
             return False
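For readers tracking the `splits` additions through `_apply_filter` above: the `*_TO` values it receives are plain strings, and per the branches shown a value can be a keyword (`all`, `none`, `natural`), a comma list (`1,2,3`), or a range (`[2:4]`). A simplified, hedged sketch of the direct-value matching path only (the real method also handles index lookups and the `?` flag):

```python
def filter_matches(parent_value, filter_to):
    # Simplified stand-in for JobList._apply_filter on the chunks/splits path
    if filter_to == "all":
        return True
    if filter_to == "none":
        return False
    if "," in filter_to:
        return str(parent_value) in [v.strip() for v in filter_to.split(",")]
    if ":" in filter_to:
        start, end = filter_to.strip("[]").split(":")
        return int(start) <= int(parent_value) <= int(end)
    return str(parent_value) == filter_to

print(filter_matches(3, "[2:4]"))  # True
print(filter_matches(5, "1,2,3"))  # False
```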
@@ -427,8 +383,8 @@ class JobList(object):
         """
         Check if the current_job_value is included in the filter_value
         :param relationship: current filter level to check.
-        :param level_to_check: can be a date, member or chunk.
-        :param value_to_check: can be a date, member or chunk.
+        :param level_to_check: can be a date, member, chunk or split.
+        :param value_to_check: can be a date, member, chunk or split.
         :return:
         """
         optional = False
@@ -438,7 +394,7 @@ class JobList(object):
             current_filter = {}
             if filter_type.upper().find(level_to_check.upper()) != -1:
                 for filter_range in filter_data.keys():
-                    if str(value_to_check) is None or str(filter_range).upper().find(str(value_to_check).upper()) != -1:
+                    if str(filter_range).upper() == "ALL" or str(value_to_check) is None or str(filter_range).upper().find(str(value_to_check).upper()) != -1:
                         if filter_data[filter_range] is not None:
                             if filter_type.find("?") != -1 or filter_range.find("?") != -1:
                                 optional = True
@@ -448,52 +404,163 @@ class JobList(object):
         if len(filters) == 0:
             filters = [{}]
         return filters,optional
+
+    @staticmethod
+    def _check_dates(relationships, current_job):
+        filters_to_apply, optional = JobList._check_relationship(relationships, "DATES_FROM", date2str(current_job.date))
+        for filter in filters_to_apply:
+            if "MEMBERS_FROM" in filter:
+                filters_to_apply_m, optional_m = JobList._check_members(filter, current_job)
+                if len(filters_to_apply_m) > 0:
+                    filters_to_apply = filters_to_apply_m
+                    optional = optional_m
+            if "CHUNKS_FROM" in filter:
+                filters_to_apply_c, optional_c = JobList._check_chunks(filter, current_job)
+                if len(filters_to_apply_c) > 0:
+                    filters_to_apply = filters_to_apply_c
+                    optional = optional_c
+            if "SPLITS_FROM" in filter:
+                filters_to_apply_s, optional_s = JobList._check_splits(filter, current_job)
+                if len(filters_to_apply_s) > 0:
+                    filters_to_apply = filters_to_apply_s
+                    optional = optional_s
+        return filters_to_apply, optional
+
+    @staticmethod
+    def _check_members(relationships, current_job):
+        filters_to_apply, optional = JobList._check_relationship(relationships, "MEMBERS_FROM", current_job.member)
+        for filter in filters_to_apply:
+            if "CHUNKS_FROM" in filter:
+                filters_to_apply_c, optional_c = JobList._check_chunks(filter, current_job)
+                if len(filters_to_apply_c) > 0:
+                    filters_to_apply = filters_to_apply_c
+                    optional = optional_c
+            if "SPLITS_FROM" in filter:
+                filters_to_apply_s, optional_s = JobList._check_splits(filter, current_job)
+                if len(filters_to_apply_s) > 0:
+                    filters_to_apply = filters_to_apply_s
+                    optional = optional_s
+        return filters_to_apply, optional
+
+    @staticmethod
+    def _check_chunks(relationships, current_job):
+        filters_to_apply, optional = JobList._check_relationship(relationships, "CHUNKS_FROM", current_job.chunk)
+        for filter in filters_to_apply:
+            if "SPLITS_FROM" in filter:
+                filters_to_apply_s, optional_s = JobList._check_splits(filter, current_job)
+                if len(filters_to_apply_s) > 0:
+                    filters_to_apply = filters_to_apply_s
+                    optional = optional_s
+        return filters_to_apply, optional
+
+    @staticmethod
+    def _check_splits(relationships, current_job):
+        filters_to_apply, optional = JobList._check_relationship(relationships, "SPLITS_FROM", current_job.split)
+        return filters_to_apply, optional
+
     @staticmethod
     def _filter_current_job(current_job,relationships):
         '''
         Check if the current_job is included in the filter_value ( from)
-        :param current_job:
-        :param dependency:
-        :return:
+        :param current_job: current job to check
+        :param dependency: dependency to check
+        :return: filter_to_apply(dict), boolean
         '''
-        # Search all filters in dependency relationship that affect current_job
-        # First level can be Date,member or chunk or generic
-        # Second level can be member or chunk or generic
-        # Third level can only be chunked.
-        # If the filter is generic, it will be applied to all section jobs.
-        # Check Date then Member or Chunk then Chunk
+
+        # This function checks whether the given relationship is set for the given job's DATE, MEMBER, CHUNK or SPLIT ( _FROM filters ).
+        # If it is, it returns the dependencies that need to be activated ( _TO filters ).
+        # _FROM behavior:
+        # DATES_FROM can contain MEMBERS_FROM, CHUNKS_FROM and SPLITS_FROM
+        # MEMBERS_FROM can contain CHUNKS_FROM and SPLITS_FROM
+        # CHUNKS_FROM can contain SPLITS_FROM
+        # SPLITS_FROM can contain nothing
+        # _TO behavior:
+        # _TO keywords can appear in any of the _FROM filters and only affect the _FROM filter they are in.
+        # There are four keywords:
+        # 1. ALL: activates all the dependencies of the given filter type (dates, members, chunks and/or splits)
+        # 2. NONE: activates no dependencies of the given filter type (dates, members, chunks and/or splits)
+        # 3. NATURAL: the default behavior; the dependency is activated only if it would normally be activated.
+        # 4. ? : a weak-dependency flag; the dependency is activated, but the parent job may fail without blocking the workflow.
+
         optional = False
-        filters_to_apply = []
-        # this should be a list
+        filters_to_apply = [{}]
+        # Check if a filter_from - filter_to relationship is set
         if relationships is not None and len(relationships) > 0:
-            filters_to_apply,optional = JobList._check_relationship(relationships,"DATES_FROM",date2str(current_job.date))
+            # CHECK IF DATES FILTERS IS SET
+            if "DATES_FROM" in relationships:
+                filters_to_apply, optional = JobList._check_dates(relationships, current_job)
+            elif "MEMBERS_FROM" in relationships:
+                filters_to_apply, optional = JobList._check_members(relationships, current_job)
+            elif "CHUNKS_FROM" in relationships:
+                filters_to_apply, optional = JobList._check_chunks(relationships, current_job)
+            elif "SPLITS_FROM" in relationships:
+                filters_to_apply, optional = JobList._check_splits(relationships, current_job)
+            else:
+                relationships.pop("CHUNKS_FROM", None)
+                relationships.pop("MEMBERS_FROM", None)
+                relationships.pop("DATES_FROM", None)
+                relationships.pop("SPLITS_FROM", None)
+                filters_to_apply = [relationships]
+                return filters_to_apply,optional
+            # IF DATES FILTERS IS SET
             if len(filters_to_apply[0]) > 0:
-                for filter_number in range(0, len(filters_to_apply)):
+                for filter_number in range(len(filters_to_apply)):
+                    # CHECK IF MEMBERS FILTERS IS SET
                     if "MEMBERS_FROM" in filters_to_apply[filter_number]:
                         filters_to_apply_m,optional = JobList._check_relationship(filters_to_apply[filter_number],"MEMBERS_FROM",current_job.member)
-                        if len(filters_to_apply_m) > 0:
-                            filters_to_apply,optional = filters_to_apply_m
-                        else:
-                            filters_to_apply_c,optional = JobList._check_relationship(filters_to_apply[filter_number],"CHUNKS_FROM",current_job.chunk)
-                            if len(filters_to_apply_c) > 0:
-                                filters_to_apply = filters_to_apply_c
+                        # IF MEMBERS FILTERS IS SET
+                        if len(filters_to_apply_m[filter_number]) > 0:
+                            filters_to_apply = filters_to_apply_m
+                        if "CHUNKS_FROM" in filters_to_apply[filter_number]:
+                            filters_to_apply_c,optional = JobList._check_relationship(filters_to_apply[filter_number],"CHUNKS_FROM",current_job.chunk)
+                            if len(filters_to_apply_c[filter_number]) > 0:
+                                filters_to_apply = filters_to_apply_c
+                        if "SPLITS_FROM" in filters_to_apply[filter_number]:
+                            filters_to_apply_s,optional = JobList._check_relationship(filters_to_apply[filter_number],"SPLITS_FROM",current_job.split)
+                            if len(filters_to_apply_s[filter_number]) > 0:
+                                filters_to_apply = filters_to_apply_s
+                    # CHECK IF CHUNKS FILTERS IS SET (MEMBERS FILTERS IS NOT SET)
                     elif "CHUNKS_FROM" in filters_to_apply[filter_number]:
-                        filters_to_apply,optional = JobList._check_relationship(filters_to_apply[filter_number],"CHUNKS_FROM",current_job.chunk)
-            # Check Member then Chunk
+                        filters_to_apply_c,optional = JobList._check_relationship(filters_to_apply[filter_number],"CHUNKS_FROM",current_job.chunk)
+                        if len(filters_to_apply[filter_number]) > 0:
+                            filters_to_apply = filters_to_apply_c
+                        if "SPLITS_FROM" in filters_to_apply[filter_number]:
+                            filters_to_apply_s,optional = JobList._check_relationship(filters_to_apply[filter_number],"SPLITS_FROM",current_job.split)
+                            if len(filters_to_apply_s[filter_number]) > 0:
+                                filters_to_apply = filters_to_apply_s
+                    # CHECK IF SPLITS FILTERS IS SET (MEMBERS FILTERS IS NOT SET)
+                    elif "SPLITS_FROM" in filters_to_apply[filter_number]:
+                        filters_to_apply,optional = JobList._check_relationship(filters_to_apply[filter_number],"SPLITS_FROM",current_job.split)
+            # IF DATES FILTERS IS NOT SET, check if MEMBERS FILTERS IS SET
             if len(filters_to_apply[0]) == 0:
-                filters_to_apply,optional = JobList._check_relationship(relationships,"MEMBERS_FROM",current_job.member)
-                if len(filters_to_apply) > 0 and ( "CHUNKS_FROM" in filters_to_apply):
-                    filters_to_apply_c,optional = JobList._check_relationship(filters_to_apply,"CHUNKS_FROM",current_job.chunk)
-                    if len(filters_to_apply_c) > 0:
-                        filters_to_apply = filters_to_apply_c
-            #Check Chunk
+                filters_to_apply, optional = JobList._check_relationship(relationships, "MEMBERS_FROM", current_job.member)
+                for filter_number in range(len(filters_to_apply)):
+                    if "CHUNKS_FROM" in filters_to_apply:
+                        filters_to_apply_c,optional = JobList._check_relationship(filters_to_apply,"CHUNKS_FROM",current_job.chunk)
+                        if len(filters_to_apply_c) > 0:
+                            filters_to_apply = filters_to_apply_c
+                    if "SPLITS_FROM" in filters_to_apply:
+                        filters_to_apply_s,optional = JobList._check_relationship(filters_to_apply,"SPLITS_FROM",current_job.split)
+                        if len(filters_to_apply_s) > 0:
+                            filters_to_apply = filters_to_apply_s
+            # Check Chunk then splits
             if len(filters_to_apply[0]) == 0:
-                filters_to_apply,optional = JobList._check_relationship(relationships,"CHUNKS_FROM",current_job.chunk)
-            # Generic filter
+                filters_to_apply, optional = JobList._check_relationship(relationships, "CHUNKS_FROM", current_job.chunk)
+                for filter_number in range(len(filters_to_apply)):
+                    if "SPLITS_FROM" in filters_to_apply[filter_number]:
+                        filters_to_apply_s,optional = JobList._check_relationship(filters_to_apply, "SPLITS_FROM", current_job.split)
+                        if len(filters_to_apply_s) > 0:
+                            filters_to_apply = filters_to_apply_s
+            # Check Splits
+            if len(filters_to_apply[0]) == 0:
+                filters_to_apply, optional = JobList._check_relationship(relationships, "SPLITS_FROM", current_job.split)
+
+            # Global filter
             if len(filters_to_apply[0]) == 0:
                 relationships.pop("CHUNKS_FROM",None)
                 relationships.pop("MEMBERS_FROM",None)
                 relationships.pop("DATES_FROM",None)
+                relationships.pop("SPLITS_FROM",None)
                 filters_to_apply = [relationships]

         if len(filters_to_apply) == 1 and len(filters_to_apply[0]) == 0:
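To make the `*_FROM` / `*_TO` walk above concrete, here is a hedged example of a parsed relationships dict (mirroring what `dependency.relationships` is assumed to contain once the YAML has been read) and how `_filter_current_job` routes it:

```python
relationships = {
    "DATES_FROM": {
        "20200101": {                    # only jobs whose date is 20200101...
            "SPLITS_FROM": {
                "2": {"SPLITS_TO": "1"}  # ...and whose split is 2 depend on the parent's split 1
            }
        }
    }
}

# _filter_current_job dispatches to _check_dates, which recurses into _check_splits,
# so a 20200101/split-2 job would get [{"SPLITS_TO": "1"}] back, while any other job
# falls through to the default natural filters.
```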
@@ -506,14 +573,14 @@ class JobList(object):
     @staticmethod
     def _valid_parent(parent,member_list,date_list,chunk_list,is_a_natural_relation,filters_to_apply):
         '''
-        Check if the parent is valid for the current_job
+        Check if the parent is valid for the current job
         :param parent: job to check
-        :param member_list:
-        :param date_list:
-        :param chunk_list:
-        :param is_a_natural_relation:
-        :param filters_to_apply:
-        :return:
+        :param member_list: list of members
+        :param date_list: list of dates
+        :param chunk_list: list of chunks
+        :param is_a_natural_relation: if the relation is natural or not
+        :param filters_to_apply: filters to apply
+        :return: True if the parent is valid, False otherwise
         '''
         #check if current_parent is listed on dependency.relationships
         optional = False
@@ -522,6 +589,7 @@ class JobList(object):
             dates_to = str(filter_.get("DATES_TO", "natural")).lower()
             members_to = str(filter_.get("MEMBERS_TO", "natural")).lower()
             chunks_to = str(filter_.get("CHUNKS_TO", "natural")).lower()
+            splits_to = str(filter_.get("SPLITS_TO", "natural")).lower()
             if not is_a_natural_relation:
                 if dates_to == "natural":
                     dates_to = "none"
@@ -529,33 +597,39 @@ class JobList(object):
                 if members_to == "natural":
                     members_to = "none"
                 if chunks_to == "natural":
                     chunks_to = "none"
+                if splits_to == "natural":
+                    splits_to = "none"
             associative_list["dates"] = date_list
             associative_list["members"] = member_list
             associative_list["chunks"] = chunk_list
+            if parent.splits is not None:
+                associative_list["splits"] = [str(split) for split in range(1, int(parent.splits) + 1)]
+            else:
+                associative_list["splits"] = None
             if dates_to == "natural":
                 associative_list["dates"] = [date2str(parent.date)] if parent.date is not None else date_list
-
             if members_to == "natural":
                 associative_list["members"] = [parent.member] if parent.member is not None else member_list
             if chunks_to == "natural":
                 associative_list["chunks"] = [parent.chunk] if parent.chunk is not None else chunk_list
+            if splits_to == "natural":
+                associative_list["splits"] = [parent.split] if parent.split is not None else parent.splits
             parsed_parent_date = date2str(parent.date) if parent.date is not None else None
             # Apply all filters to look if this parent is an appropriated candidate for the current_job
             valid_dates = JobList._apply_filter(parsed_parent_date, dates_to, associative_list["dates"], "dates")
             valid_members = JobList._apply_filter(parent.member, members_to, associative_list["members"],"members")
             valid_chunks = JobList._apply_filter(parent.chunk, chunks_to, associative_list["chunks"], "chunks")
-
-            if valid_dates and valid_members and valid_chunks:
-                if dates_to.find("?") != -1 or members_to.find("?") != -1 or chunks_to.find("?") != -1:
+            valid_splits = JobList._apply_filter(parent.split, splits_to, associative_list["splits"], "splits")
+            if valid_dates and valid_members and valid_chunks and valid_splits:
+                if dates_to.find("?") != -1 or members_to.find("?") != -1 or chunks_to.find("?") != -1 or splits_to.find("?") != -1:
                     optional = True
                 return True,optional
         return False,optional
-    @staticmethod
-    def _manage_job_dependencies(dic_jobs, job, date_list, member_list, chunk_list, dependencies_keys, dependencies,
-                                 graph):
+
+    def _manage_job_dependencies(self, dic_jobs, job, date_list, member_list, chunk_list, dependencies_keys, dependencies):
         '''
         Manage the dependencies of a job
         :param dic_jobs:
@@ -568,6 +642,8 @@ class JobList(object):
         :param graph:
         :return:
         '''
         parsed_date_list = []
         for dat in date_list:
             parsed_date_list.append(date2str(dat))
@@ -585,73 +661,37 @@ class JobList(object):
             other_parents = dic_jobs.get_jobs(dependency.section, None, None, None)
             parents_jobs = dic_jobs.get_jobs(dependency.section, date, member, chunk)
-            #if dependency.sign in ["+", "-"]:
-            #    natural_jobs = dic_jobs.get_jobs(dependency.section, date, member,chunk)
-            #else:
-            natural_jobs = dic_jobs.get_jobs(dependency.section, date, member,chunk)
+            natural_jobs = dic_jobs.get_jobs(dependency.section, date, member, chunk)
             if dependency.sign in ['?']:
                 optional_section = True
             else:
                 optional_section = False
-            # Convert multi_array list into 1d list
-            if len(parents_jobs) > 0:
-                aux = []
-                for p_split in parents_jobs:
-                    if type(p_split) is not list:
-                        aux.append(p_split)
-                    else:
-                        for aux_job in p_split:
-                            aux.append(aux_job)
-                parents_jobs = aux
-            if len(natural_jobs) > 0:
-                aux = []
-                for p_split in natural_jobs:
-                    if type(p_split) is not list:
-                        aux.append(p_split)
-                    else:
-                        for aux_job in p_split:
-                            aux.append(aux_job)
-                natural_jobs = aux
             all_parents = list(set(other_parents + parents_jobs))
             # Get dates_to, members_to, chunks_to of the deepest level of the relationship.
             filters_to_apply,optional_from = JobList._filter_current_job(job,copy.deepcopy(dependency.relationships))
             if len(filters_to_apply) == 0:
-                filters_to_apply.append({"DATES_TO": "natural", "MEMBERS_TO": "natural", "CHUNKS_TO": "natural"})
+                filters_to_apply.append({"DATES_TO": "natural", "MEMBERS_TO": "natural", "CHUNKS_TO": "natural", "SPLITS_TO": "natural"})
             for parent in all_parents:
-                # Generic for all dependencies
-                if dependency.delay == -1 or chunk > dependency.delay:
-                    if isinstance(parent, list):
-                        if job.split is not None and len(str(job.split)) > 0:
-                            parent = list(filter(
-                                lambda _parent: _parent.split == job.split, parent))
-                            parent = parent[0]
-                        else:
-                            if dependency.splits is not None and len(str(dependency.splits)) > 0:
-                                parent = [_parent for _parent in parent if _parent.split in dependency.splits]
                 # If splits is not None, the job is a list of jobs
                 if parent.name == job.name:
                     continue
-                # Check if it is a natural relation based in autosubmit terms ( same date,member,chunk ).
-                if parent in natural_jobs and ((job.chunk is None or parent.chunk is None or parent.chunk <= job.chunk ) and (parent.split is None or job.split is None or parent.split <= job.split) ) :
+                # Check if it is a natural relation. The only difference is that a chunk may depend on chunks <= the current chunk
+                if parent in natural_jobs and (job.chunk is None or parent.chunk is None or parent.chunk <= job.chunk):
                     natural_relationship = True
                 else:
                     natural_relationship = False
-                if job.name.find("a002_20120101_0_2_SIM") != -1:
-                    print("test")
                 # Check if the current parent is a valid parent based on the dependencies set on expdef.conf
                 valid,optional_to = JobList._valid_parent(parent, member_list, parsed_date_list, chunk_list, natural_relationship,filters_to_apply)
                 if not valid:
                     continue
-                else:
-                    pass
                 # If the parent is valid, add it to the graph
-                job.add_parent(parent)
-                JobList._add_edge(graph, job, parent)
+                #job.add_parent(parent)
+                self.graph.add_edge(parent.name, job.name)
                 # Could be more variables in the future
                 if optional_to or optional_from or optional_section:
                     job.add_edge_info(parent.name,special_variables={"optional":True})
         JobList.handle_frequency_interval_dependencies(chunk, chunk_list, date, date_list, dic_jobs, job, member,
-                                                       member_list, dependency.section, graph, other_parents)
+                                                       member_list, dependency.section, other_parents)

     @staticmethod
     def _calculate_dependency_metadata(chunk, chunk_list, member, member_list, date, date_list, dependency):
@@ -705,7 +745,7 @@ class JobList(object):
     @staticmethod
     def handle_frequency_interval_dependencies(chunk, chunk_list, date, date_list, dic_jobs, job, member, member_list,
-                                               section_name, graph,visited_parents):
+                                               section_name,visited_parents):
         if job.wait and job.frequency > 1:
             if job.chunk is not None and len(str(job.chunk)) > 0:
                 max_distance = (chunk_list.index(chunk) + 1) % job.frequency
@@ -715,7 +755,6 @@ class JobList(object):
                     for parent in dic_jobs.get_jobs(section_name, date, member, chunk - distance):
                         if parent not in visited_parents:
                             job.add_parent(parent)
-                            JobList._add_edge(graph, job, parent)
             elif job.member is not None and len(str(job.member)) > 0:
                 member_index = member_list.index(job.member)
                 max_distance = (member_index + 1) % job.frequency
@@ -726,7 +765,6 @@ class JobList(object):
                                                       member_list[member_index - distance], chunk):
                         if parent not in visited_parents:
                             job.add_parent(parent)
-                            JobList._add_edge(graph, job, parent)
             elif job.date is not None and len(str(job.date)) > 0:
                 date_index = date_list.index(job.date)
                 max_distance = (date_index + 1) % job.frequency
@@ -737,17 +775,6 @@ class JobList(object):
                                                       member, chunk):
                         if parent not in visited_parents:
                             job.add_parent(parent)
-                            JobList._add_edge(graph, job, parent)
-
-    @staticmethod
-    def _add_edge(graph, job, parents):
-        num_parents = 1
-        if isinstance(parents, list):
-            num_parents = len(parents)
-        for i in range(num_parents):
-            parent = parents[i] if isinstance(parents, list) else parents
-            graph.add_edge(parent.name, job.name)
-        pass

     @staticmethod
     def _create_jobs(dic_jobs, priority, default_job_type, jobs_data=dict()):
         for section in dic_jobs._jobs_data.get("JOBS",{}).keys():
@@ -1996,92 +2023,48 @@ class JobList(object):
             Log.debug('Update finished')
         return save

-    def update_genealogy(self, new=True, notransitive=False, update_structure=False):
+    def update_genealogy(self, new=True, update_structure=False, recreate=False):
         """
         When we have created the job list, every type of job is created.
         Update genealogy remove jobs that have no templates
         :param update_structure:
-        :param notransitive:
         :param new: if it is a new job list or not
         :type new: bool
         """
+        current_structure = None
+        structure_valid = False

-        # Use a copy of job_list because original is modified along iterations
-        for job in self._job_list[:]:
-            if job.file is None or job.file == '':
-                self._remove_job(job)
-
-        # Simplifying dependencies: if a parent is already an ancestor of another parent,
-        # we remove parent dependency
-        if not notransitive:
-            # Transitive reduction required
-            current_structure = None
-            db_path = os.path.join(
-                self._config.STRUCTURES_DIR, "structure_" + self.expid + ".db")
-            m_time_db = None
-            jobs_conf_path = os.path.join(
-                self._config.LOCAL_ROOT_DIR, self.expid, "conf", "jobs_{0}.yml".format(self.expid))
-            m_time_job_conf = None
+        if not new:
+            db_path = os.path.join(self._config.STRUCTURES_DIR, "structure_" + self.expid + ".db")
             if os.path.exists(db_path):
                 try:
                     current_structure = DbStructure.get_structure(
                         self.expid, self._config.STRUCTURES_DIR)
-                    m_time_db = os.stat(db_path).st_mtime
-                    if os.path.exists(jobs_conf_path):
-                        m_time_job_conf = os.stat(jobs_conf_path).st_mtime
                 except Exception as exp:
                     pass
-            structure_valid = False
             # If there is a current structure, and the number of jobs in JobList is equal to the number of jobs in the structure
             if (current_structure) and (len(self._job_list) == len(current_structure)) and update_structure is False:
                 structure_valid = True
-                # Further validation
-                # Structure exists and is valid, use it as a source of dependencies
-                if m_time_job_conf:
-                    if m_time_job_conf > m_time_db:
-                        Log.info(
-                            "File jobs_{0}.yml has been modified since the last time the structure persistence was saved.".format(self.expid))
-                        structure_valid = False
-                else:
-                    Log.info(
-                        "File jobs_{0}.yml was not found.".format(self.expid))
-
-            if structure_valid is True:
-                for job in self._job_list:
-                    if current_structure.get(job.name, None) is None:
-                        structure_valid = False
-                        break
-
-            if structure_valid is True:
-                Log.info("Using existing valid structure.")
-                for job in self._job_list:
-                    children_to_remove = [
-                        child for child in job.children if child.name not in current_structure[job.name]]
-                    for child in children_to_remove:
-                        job.children.remove(child)
-                        child.parents.remove(job)
-            if structure_valid is False:
-                # Structure does not exist, or it is not be updated, attempt to create it.
-                Log.info("Updating structure persistence...")
-                self.graph = transitive_reduction(self.graph)  # add threads for large experiments? todo
-                if self.graph:
-                    for job in self._job_list:
-                        children_to_remove = [
-                            child for child in job.children if child.name not in self.graph.neighbors(job.name)]
-                        for child in children_to_remove:
-                            job.children.remove(child)
-                            child.parents.remove(job)
-                    try:
-                        DbStructure.save_structure(
-                            self.graph, self.expid, self._config.STRUCTURES_DIR)
-                    except Exception as exp:
-                        Log.warning(str(exp))
-                        pass
-
-        for job in self._job_list:
-            if not job.has_parents() and new:
-                job.status = Status.READY
-
+                # check loaded job_list
+                joblist_gen = (job for job in self._job_list)
+                for job in joblist_gen:
+                    if current_structure.get(job.name, None) is None:
+                        structure_valid = False
+                        break
+        if not structure_valid:
+            Log.info("Transitive reduction...")
+            self.graph = transitive_reduction(self.graph, recreate)
+            if recreate:
+                # update the job list view, as transitive_reduction also fills job._parents and job._children when recreate is set
+                self._job_list = [job["job"] for job in self.graph.nodes().values()]
+                gen_job_list = (job for job in self._job_list if not job.has_parents())
+                for job in gen_job_list:
+                    job.status = Status.READY
+                self.save()
+            try:
+                DbStructure.save_structure(self.graph, self.expid, self._config.STRUCTURES_DIR)
+            except Exception as exp:
+                Log.warning(str(exp))

     @threaded
     def check_scripts_threaded(self, as_conf):
         """
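The reworked `update_genealogy` relies on each `Job` living as a node attribute (set in `_add_dependencies` with `self.graph.nodes.get(job.name)['job'] = job`) and later rebuilds `_job_list` straight from the graph. A minimal sketch of that pattern, using only networkx:

```python
import networkx as nx

graph = nx.DiGraph()
for name in ("a000_SIM", "a000_POST"):
    graph.add_node(name)
    graph.nodes[name]["job"] = {"name": name}  # stand-in for the real Job object
graph.add_edge("a000_SIM", "a000_POST")

# DiGraph.nodes is a Mapping, so .values() yields the per-node attribute dicts --
# the same access pattern as self._job_list = [node["job"] for node in self.graph.nodes().values()]
job_list = [node["job"] for node in graph.nodes().values()]
print([job["name"] for job in job_list])  # ['a000_SIM', 'a000_POST']
```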
@@ -2242,13 +2225,13 @@ class JobList(object):
         Removes all jobs to be run only in reruns
         """
         flag = False
-        for job in set(self._job_list):
+        for job in self._job_list[:]:
             if job.rerun_only == "true":
                 self._remove_job(job)
                 flag = True

         if flag:
-            self.update_genealogy(notransitive=notransitive)
+            self.update_genealogy()
         del self._dic_jobs

     def print_with_status(self, statusChange=None, nocolor=False, existingList=None):
@@ -2294,7 +2277,7 @@ class JobList(object):

         return result

-    def __str__(self):
+    def __str__(self, nocolor=False, get_active=False):
         """
         Returns the string representation of the class.
         Usage print(class)
@@ -2302,24 +2285,34 @@ class JobList(object):
         :return: String representation.
         :rtype: String
         """
-        allJobs = self.get_all()
+        if get_active:
+            jobs = self.get_active()
+        else:
+            jobs = self.get_all()
         result = "## String representation of Job List [" + str(
-            len(allJobs)) + "] ##"
-
+            len(jobs)) + "] ##"
         # Find root
         root = None
-        for job in allJobs:
-            if job.has_parents() is False:
-                root = job
-
-        # root exists
-        if root is not None and len(str(root)) > 0:
-            result += self._recursion_print(root, 0)
+        roots = []
+        if get_active:
+            for job in jobs:
+                if len(job.parents) == 0 and job.status in (Status.READY, Status.RUNNING):
+                    roots.append(job)
         else:
-            result += "\nCannot find root."
-
+            for job in jobs:
+                if len(job.parents) == 0:
+                    roots.append(job)
+        visited = list()
+        # root exists
+        for root in roots:
+            if root is not None and len(str(root)) > 0:
+                result += self._recursion_print(root, 0, visited, nocolor=nocolor)
+            else:
+                result += "\nCannot find root."
         return result
-
+
+    def __repr__(self):
+        return self.__str__(True, True)

     def _recursion_print(self, job, level, visited=[], statusChange=None, nocolor=False):
         """
         Returns the list of children in a recursive way
diff --git a/autosubmit/job/job_list_persistence.py b/autosubmit/job/job_list_persistence.py
index 7554ddad746eeee5bcdbbb6920b78080a8024e68..38e6d42f5e652f626c1986f2ef16071d76fc2037 100644
--- a/autosubmit/job/job_list_persistence.py
+++ b/autosubmit/job/job_list_persistence.py
@@ -88,10 +88,10 @@ class JobListPersistencePkl(JobListPersistence):
         Log.debug("Saving JobList: " + path)
         jobs_data = [(job.name, job.id, job.status,
                       job.priority, job.section, job.date,
-                      job.member, job.chunk,
+                      job.member, job.chunk, job.split,
                       job.local_logs[0], job.local_logs[1],
                       job.remote_logs[0], job.remote_logs[1],job.wrapper_type) for job in job_list]
-        pickle.dump(jobs_data, fd, protocol=2)
+        pickle.dump(job_list, fd, protocol=2)
         Log.debug('Job list saved')

@@ -131,7 +131,7 @@ class JobListPersistenceDb(JobListPersistence):
         self._reset_table()
         jobs_data = [(job.name, job.id, job.status,
                       job.priority, job.section, job.date,
-                      job.member, job.chunk,
+                      job.member, job.chunk, job.split,
                       job.local_logs[0], job.local_logs[1],
                       job.remote_logs[0], job.remote_logs[1],job.wrapper_type) for job in job_list]
         self.db_manager.insertMany(self.JOB_LIST_TABLE, jobs_data)
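Note the behavioral shift in the pkl persistence above: the `jobs_data` tuples are still built but no longer written; the `Job` objects themselves are pickled now. A small round-trip sketch of what that implies (stand-in class; the real `Job` must stay picklable):

```python
import pickle

class Job:
    # stand-in: no open file handles, sockets or loggers may live on the instance
    def __init__(self, name, split):
        self.name, self.split = name, split

job_list = [Job("a000_SIM_1", 1), Job("a000_SIM_2", 2)]
restored = pickle.loads(pickle.dumps(job_list, protocol=2))
print([(job.name, job.split) for job in restored])  # attributes survive without a tuple schema
```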
diff --git a/autosubmit/job/job_utils.py b/autosubmit/job/job_utils.py
index 9782122738093e02d00be3c1df3aedd8f3840247..c5282a44594f48cab85a08ed719ea513d0070b3b 100644
--- a/autosubmit/job/job_utils.py
+++ b/autosubmit/job/job_utils.py
@@ -17,9 +17,9 @@
 # You should have received a copy of the GNU General Public License
 # along with Autosubmit.  If not, see <http://www.gnu.org/licenses/>.

-import networkx
+import networkx as nx
 import os
-
+from contextlib import suppress
 from networkx.algorithms.dag import is_directed_acyclic_graph
 from networkx import DiGraph
 from networkx import dfs_edges
@@ -29,13 +29,45 @@ from autosubmitconfigparser.config.basicconfig import BasicConfig
 from typing import Dict


-def transitive_reduction(graph):
+def transitive_reduction(graph, recreate):
+    """
+    Returns the transitive reduction of a directed graph.
+
+    The transitive reduction of G = (V,E) is a graph G- = (V,E-) such that
+    for all v,w in V there is an edge (v,w) in E- if and only if (v,w) is
+    in E and there is no path from v to w in G with length greater than 1.
+
+    :param graph: A directed acyclic graph (DAG)
+    :type graph: NetworkX DiGraph
+    :return: The transitive reduction of G
+    """
     try:
-        return networkx.algorithms.dag.transitive_reduction(graph)
+        TR = nx.DiGraph()
+        TR.add_nodes_from(graph.nodes(data=True))
+        descendants = {}
+        # count before removing set stored in descendants
+        check_count = dict(graph.in_degree)
+        for u in graph:
+            u_nbrs = set(graph[u])
+            for v in graph[u]:
+                if v in u_nbrs:
+                    if v not in descendants:
+                        descendants[v] = {y for x, y in nx.dfs_edges(graph, v)}
+                    u_nbrs -= descendants[v]
+                check_count[v] -= 1
+                if check_count[v] == 0:
+                    del descendants[v]
+            TR.add_edges_from((u, v) for v in u_nbrs)
+            # Get the JOB node attribute of every remaining neighbor of the current node
+            # and add those jobs as children of the current node's job
+            if recreate:
+                TR.nodes[u]["job"].add_child([TR.nodes[v]["job"] for v in u_nbrs])
+        return TR
     except Exception as exp:
         if not is_directed_acyclic_graph(graph):
-            raise NetworkXError(
-                "Transitive reduction only uniquely defined on directed acyclic graphs.")
+            raise NetworkXError("Transitive reduction only uniquely defined on directed acyclic graphs.")
         reduced_graph = DiGraph()
         reduced_graph.add_nodes_from(graph.nodes())
         for u in graph:
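A quick sanity check of what the hand-rolled reduction above computes; its edge set matches the stock networkx implementation on a DAG:

```python
import networkx as nx

g = nx.DiGraph([("SIM", "POST"), ("POST", "CLEAN"), ("SIM", "CLEAN")])
tr = nx.transitive_reduction(g)
print(sorted(tr.edges()))  # [('POST', 'CLEAN'), ('SIM', 'POST')] -- the SIM->CLEAN shortcut goes away
```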
diff --git a/autosubmit/monitor/monitor.py b/autosubmit/monitor/monitor.py
index 8b8bffc557396e8fe870fddb2bf478b950060ccc..0009ae7c5c606dce1788a0f81b6779c90bdb9b68 100644
--- a/autosubmit/monitor/monitor.py
+++ b/autosubmit/monitor/monitor.py
@@ -159,54 +159,45 @@ class Monitor:
             if job.has_parents():
                 continue

-            if not groups or job.name not in groups['jobs'] or (job.name in groups['jobs'] and len(groups['jobs'][job.name]) == 1):
+            if not groups:
                 node_job = pydotplus.Node(job.name, shape='box', style="filled",
                                           fillcolor=self.color_status(job.status))
-
-                if groups and job.name in groups['jobs']:
-                    group = groups['jobs'][job.name][0]
-                    node_job.obj_dict['name'] = group
-                    node_job.obj_dict['attributes']['fillcolor'] = self.color_status(
-                        groups['status'][group])
-                    node_job.obj_dict['attributes']['shape'] = 'box3d'
-
                 exp.add_node(node_job)
                 self._add_children(job, exp, node_job, groups, hide_groups)
+            else:
+                job_in_group = False
+                for group, jobs in groups.get("jobs", {}).items():
+                    if job.name in jobs:
+                        job_in_group = True
+                        node_job = pydotplus.Node(group, shape='box3d', style="filled",
+                                                  fillcolor=self.color_status(groups['status'][group]))
+                        exp.add_node(node_job)
+                        self._add_children(job, exp, node_job, groups, hide_groups)
+                if not job_in_group:
+                    node_job = pydotplus.Node(job.name, shape='box', style="filled",
+                                              fillcolor=self.color_status(job.status))
+                    exp.add_node(node_job)
+                    self._add_children(job, exp, node_job, groups, hide_groups)

         if groups:
             if not hide_groups:
-                for job, group in groups['jobs'].items():
-                    if len(group) > 1:
-                        group_name = 'cluster_' + '_'.join(group)
-                        if group_name not in graph.obj_dict['subgraphs']:
-                            subgraph = pydotplus.graphviz.Cluster(
-                                graph_name='_'.join(group))
-                            subgraph.obj_dict['attributes']['color'] = 'invis'
-                        else:
-                            subgraph = graph.get_subgraph(group_name)[0]
-
-                        previous_node = exp.get_node(group[0])[0]
-                        if len(subgraph.get_node(group[0])) == 0:
-                            subgraph.add_node(previous_node)
-
-                        for i in range(1, len(group)):
-                            node = exp.get_node(group[i])[0]
-                            if len(subgraph.get_node(group[i])) == 0:
-                                subgraph.add_node(node)
-
-                            edge = subgraph.get_edge(
-                                node.obj_dict['name'], previous_node.obj_dict['name'])
-                            if len(edge) == 0:
-                                edge = pydotplus.Edge(previous_node, node)
-                                edge.obj_dict['attributes']['dir'] = 'none'
-                                # constraint false allows the horizontal alignment
-                                edge.obj_dict['attributes']['constraint'] = 'false'
-                                edge.obj_dict['attributes']['penwidth'] = 4
-                                subgraph.add_edge(edge)
-
-                            previous_node = node
-                        if group_name not in graph.obj_dict['subgraphs']:
-                            graph.add_subgraph(subgraph)
+                for group, jobs in groups.get("jobs", {}).items():
+                    group_name = 'cluster_' + group
+                    subgraph = pydotplus.graphviz.Cluster(graph_name='_' + group)
+                    subgraph.obj_dict['attributes']['color'] = 'invis'
+                    job_node = exp.get_node(group)
+                    subgraph.add_node(job_node[0])
+                    # for p_node in previous_node:
+                    #     edge = subgraph.get_edge(job_node.obj_dict['name'], p_node.obj_dict['name'])
+                    #     if len(edge) == 0:
+                    #         edge = pydotplus.Edge(previous_node, job_node)
+                    #         edge.obj_dict['attributes']['dir'] = 'none'
+                    #         # constraint false allows the horizontal alignment
+                    #         edge.obj_dict['attributes']['constraint'] = 'false'
+                    #         edge.obj_dict['attributes']['penwidth'] = 4
+                    #         subgraph.add_edge(edge)
+                    # if group_name not in graph.obj_dict['subgraphs']:
+                    #     graph.add_subgraph(subgraph)
             else:
                 for edge in copy.deepcopy(exp.obj_dict['edges']):
                     if edge[0].replace('"', '') in groups['status']:
@@ -264,27 +255,23 @@ class Monitor:

     def _check_node_exists(self, exp, job, groups, hide_groups):
         skip = False
-        if groups and job.name in groups['jobs']:
-            group = groups['jobs'][job.name][0]
-            node = exp.get_node(group)
-            if len(groups['jobs'][job.name]) > 1 or hide_groups:
-                skip = True
-        else:
-            node = exp.get_node(job.name)
-
+        node = exp.get_node(job.name)
+        for group, jobs in groups.get('jobs', {}).items():
+            if job.name in jobs:
+                node = exp.get_node(group)
+                if hide_groups:
+                    skip = True
         return node, skip

     def _create_node(self, job, groups, hide_groups):
         node = None
-
-        if groups and job.name in groups['jobs'] and len(groups['jobs'][job.name]) == 1:
-            if not hide_groups:
-                group = groups['jobs'][job.name][0]
-                node = pydotplus.Node(group, shape='box3d', style="filled",
-                                      fillcolor=self.color_status(groups['status'][group]))
-                node.set_name(group.replace('"', ''))
-
-        elif not groups or job.name not in groups['jobs']:
+        if not hide_groups:
+            for group, jobs in groups.get("jobs", {}).items():
+                if job.name in jobs:
+                    node = pydotplus.Node(group, shape='box3d', style="filled",
+                                          fillcolor=self.color_status(groups['status'][group]))
+                    node.set_name(group.replace('"', ''))
+        if node is None:
             node = pydotplus.Node(job.name, shape='box', style="filled",
                                   fillcolor=self.color_status(job.status))
         return node
@@ -316,8 +303,7 @@ class Monitor:
         output_file = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "plot", expid +
                                    "_" + output_date + "." + output_format)

-        graph = self.create_tree_list(
-            expid, joblist, packages, groups, hide_groups)
+        graph = self.create_tree_list(expid, joblist, packages, groups, hide_groups)

         Log.debug("Saving workflow plot at '{0}'", output_file)
         if output_format == "png":