From 08e4a1c8a40f15f151910c61e83b5ea0dc3a5e8b Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 13 Jun 2024 10:00:45 +0200 Subject: [PATCH 1/4] Added "final" and "final_no_skip" --- autosubmit/job/job.py | 2 +- autosubmit/job/job_list.py | 36 ++++++++++++++++++++++++++++------- autosubmit/monitor/monitor.py | 7 +++++++ 3 files changed, 37 insertions(+), 8 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 5e4d1dc27..5c3a79cf4 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -2235,7 +2235,7 @@ class Job(object): return if self.wrapper_type == "vertical" and self.fail_count > 0: self.submit_time_timestamp = self.finish_time_timestamp - print(("Call from {} with status {}".format(self.name, self.status_str))) + Log.info(f"Call from {self.name} with status {self.status_str}") if hold is True: return # Do not write for HELD jobs. diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index d369aae7e..b5b9c91ee 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -2540,29 +2540,48 @@ class JobList(object): Check if all parents of a job have the correct status for checkpointing :return: jobs that fullfill the special conditions """ jobs_to_check = [] + jobs_to_skip = [] for status, sorted_job_list in self.jobs_edges.items(): if status == "ALL": continue for job in sorted_job_list: if job.status != Status.WAITING: continue - if status in ["RUNNING", "FAILED"]: + if status.upper() in ["RUNNING", "FAILED"]: # check checkpoint if any if job.platform and job.platform.connected: # This will be true only when used under setstatus/run job.get_checkpoint_files() non_completed_parents_current = 0 completed_parents = len([parent for parent in job.parents if parent.status == Status.COMPLETED]) for parent in job.edge_info[status].values(): - if status in ["RUNNING", "FAILED"] and parent[1] and int(parent[1]) >= job.current_checkpoint_step: + if status.upper() in ["RUNNING", "FAILED"] and parent[1] and int(parent[1]) >= job.current_checkpoint_step: continue else: status_str = Status.VALUE_TO_KEY[parent[0].status] - if Status.LOGICAL_ORDER.index(status_str) >= Status.LOGICAL_ORDER.index(status): - non_completed_parents_current += 1 + if status.upper() == "FINAL": + if status_str in ["FAILED","UNKNOWN","SKIPPED"]: + non_completed_parents_current += 1 + elif status.upper() == "FINAL_NO_SKIP": + if status_str in ["FAILED","UNKNOWN"]: + non_completed_parents_current += 1 + elif status_str == "SKIPPED": + jobs_to_skip.append(job) + elif status.upper() == "FAILED": + if status_str in ["FAILED","UNKNOWN"]: + non_completed_parents_current += 1 + elif status_str == "COMPLETED": + jobs_to_skip.append(job) + elif status.upper() == "COMPLETED": + if status_str == "FAILED": + jobs_to_skip.append(job) + else: + if Status.LOGICAL_ORDER.index(status_str) >= Status.LOGICAL_ORDER.index(status.upper()): + non_completed_parents_current += 1 if (non_completed_parents_current + completed_parents) == len(job.parents): - jobs_to_check.append(job) + if job not in jobs_to_skip: + jobs_to_check.append(job) - return jobs_to_check + return jobs_to_check, jobs_to_skip def update_log_status(self, job, as_conf): """ @@ -2673,7 +2692,10 @@ class JobList(object): job.fail_count = 0 job.packed = False # Check checkpoint jobs, the status can be Any - for job in self.check_special_status(): + jobs_to_check, jobs_to_skip = self.check_special_status() + for job in jobs_to_skip: + job.status = Status.SKIPPED + for job in jobs_to_check: job.status = Status.READY # Run start time in format (YYYYMMDDHH:MM:SS) from current time job.ready_start_date = strftime("%Y%m%d%H%M%S") diff --git a/autosubmit/monitor/monitor.py b/autosubmit/monitor/monitor.py index 4b0afea1f..cf83fbc1d 100644 --- a/autosubmit/monitor/monitor.py +++ b/autosubmit/monitor/monitor.py @@ -262,6 +262,13 @@ class Monitor: color = self._table.get(Status.READY,None) elif job.name in child.edge_info.get("SUBMITTED",{}): color = self._table.get(Status.SUBMITTED,None) + elif job.name in child.edge_info.get("FINAL",{}): + color = "black" + elif job.name in child.edge_info.get("FINAL_NO_SKIP",{}): + color = "lightblack" + + elif job.name in child.edge_info.get("COMPLETED",{}): + color = "grey" else: return None, None if label and label == 0: -- GitLab From 0ee1e464e6eabc4f5c28d5c666a6338e3495524e Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 13 Jun 2024 11:49:33 +0200 Subject: [PATCH 2/4] Changing to combine --- autosubmit/job/job_common.py | 4 +- autosubmit/job/job_list.py | 44 ++++++++++++------- autosubmit/monitor/monitor.py | 9 ++-- .../userguide/defining_workflows/index.rst | 12 ++++- 4 files changed, 43 insertions(+), 26 deletions(-) diff --git a/autosubmit/job/job_common.py b/autosubmit/job/job_common.py index 92dc1b1bd..1831d7ade 100644 --- a/autosubmit/job/job_common.py +++ b/autosubmit/job/job_common.py @@ -41,8 +41,8 @@ class Status: VALUE_TO_KEY = {-3: 'SUSPENDED', -2: 'UNKNOWN', -1: 'FAILED', 0: 'WAITING', 1: 'READY', 2: 'SUBMITTED', 3: 'QUEUING', 4: 'RUNNING', 5: 'COMPLETED', 6: 'HELD', 7: 'PREPARED', 8: 'SKIPPED', 9: 'DELAYED'} KEY_TO_VALUE = {'SUSPENDED': -3, 'UNKNOWN': -2, 'FAILED': -1, 'WAITING': 0, 'READY': 1, 'SUBMITTED': 2, 'QUEUING': 3, 'RUNNING': 4, 'COMPLETED': 5, 'HELD': 6, 'PREPARED': 7, 'SKIPPED': 8, 'DELAYED': 9} - LOGICAL_ORDER = ["WAITING", "DELAYED", "PREPARED", "READY", "SUBMITTED", "HELD", "QUEUING", "RUNNING", "SKIPPED", "FAILED", "UNKNOWN", "COMPLETED", "SUSPENDED"] - + LOGICAL_ORDER = ["WAITING", "DELAYED", "PREPARED", "READY", "SUBMITTED", "HELD", "QUEUING", "RUNNING"] + COMBINABLE = ["COMPLETED","FAILED","SKIPPED","UNKNOWN","SUSPENDED"] def retval(self, value): return getattr(self, value) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index b5b9c91ee..151762cc8 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -920,11 +920,28 @@ class JobList(object): :return: """ if special_status not in self.jobs_edges: - self.jobs_edges[special_status] = set() - self.jobs_edges[special_status].add(job) - if "ALL" not in self.jobs_edges: - self.jobs_edges["ALL"] = set() - self.jobs_edges["ALL"].add(job) + multi = [] + if "," in special_status: + multi = [ status.strip(" ") for status in special_status.split(",") if status.strip(" ") != ""] + else: + multi = [ status.strip(" ") for status in special_status.split(" ") if status.strip(" ") != ""] + if len(multi) == 1: + self.jobs_edges[special_status] = set() + self.jobs_edges[special_status].add(job) + else: + self.jobs_edges["MULTI"] = set() + self.jobs_edges["MULTI"].add(job) + # self.jobs_edges["MULTI"] = {} + # for status in multi: + # if job.name not in self.jobs_edges["MULTI"]: + # self.jobs_edges["MULTI"][job.name] = dict() + # if status not in self.jobs_edges["MULTI"][job.name]: + # self.jobs_edges["MULTI"][job.name][status] = set() + # self.jobs_edges["MULTI"][job.name][status].add(job) + if "ALL" not in self.jobs_edges: + self.jobs_edges["ALL"] = set() + self.jobs_edges["ALL"].add(job) + def add_special_conditions(self, job, special_conditions, filters_to_apply, parent): """ @@ -2553,20 +2570,12 @@ class JobList(object): job.get_checkpoint_files() non_completed_parents_current = 0 completed_parents = len([parent for parent in job.parents if parent.status == Status.COMPLETED]) - for parent in job.edge_info[status].values(): - if status.upper() in ["RUNNING", "FAILED"] and parent[1] and int(parent[1]) >= job.current_checkpoint_step: + for parent in job.edge_info[status.upper()].values(): + if status in ["RUNNING", "FAILED"] and parent[1] and int(parent[1]) >= job.current_checkpoint_step: continue else: status_str = Status.VALUE_TO_KEY[parent[0].status] - if status.upper() == "FINAL": - if status_str in ["FAILED","UNKNOWN","SKIPPED"]: - non_completed_parents_current += 1 - elif status.upper() == "FINAL_NO_SKIP": - if status_str in ["FAILED","UNKNOWN"]: - non_completed_parents_current += 1 - elif status_str == "SKIPPED": - jobs_to_skip.append(job) - elif status.upper() == "FAILED": + if status.upper() == "FAILED": if status_str in ["FAILED","UNKNOWN"]: non_completed_parents_current += 1 elif status_str == "COMPLETED": @@ -2574,6 +2583,9 @@ class JobList(object): elif status.upper() == "COMPLETED": if status_str == "FAILED": jobs_to_skip.append(job) + elif status.upper() == "MULTI": + if status not in Status.COMBINABLE: + raise AutosubmitCritical(f"Invalid status {status} in special status. Combinable Status are: {Status.COMBINABLE}") else: if Status.LOGICAL_ORDER.index(status_str) >= Status.LOGICAL_ORDER.index(status.upper()): non_completed_parents_current += 1 diff --git a/autosubmit/monitor/monitor.py b/autosubmit/monitor/monitor.py index cf83fbc1d..34183e1c9 100644 --- a/autosubmit/monitor/monitor.py +++ b/autosubmit/monitor/monitor.py @@ -262,13 +262,10 @@ class Monitor: color = self._table.get(Status.READY,None) elif job.name in child.edge_info.get("SUBMITTED",{}): color = self._table.get(Status.SUBMITTED,None) - elif job.name in child.edge_info.get("FINAL",{}): - color = "black" - elif job.name in child.edge_info.get("FINAL_NO_SKIP",{}): - color = "lightblack" - + elif job.name in child.edge_info.get("MULTI",{}): + color = "grey" elif job.name in child.edge_info.get("COMPLETED",{}): - color = "grey" + color = "black" else: return None, None if label and label == 0: diff --git a/docs/source/userguide/defining_workflows/index.rst b/docs/source/userguide/defining_workflows/index.rst index 57e12ff87..1743c04e4 100644 --- a/docs/source/userguide/defining_workflows/index.rst +++ b/docs/source/userguide/defining_workflows/index.rst @@ -222,13 +222,21 @@ The ``STATUS`` keyword can be used to select the status of the dependency that y * "HELD": The task is held. * "QUEUING": The task is queuing. * "RUNNING": The task is running. + +The status are ordered, so if you select "RUNNING" status, the task will be run if the parent is in any of the following statuses: "RUNNING", "QUEUING", "HELD", "SUBMITTED", "READY", "PREPARED", "DELAYED", "WAITING". + +There are also a few "final" statuses that can be combined, the ones that can be combined are the following: + * "SKIPPED": The task is skipped. -* "FAILED": The task is failed. +* "FAILED": The task is failed and only failed. * "UNKNOWN": The task is unknown. * "COMPLETED": The task is completed. # Default * "SUSPENDED": The task is suspended. -The status are ordered, so if you select "RUNNING" status, the task will be run if the parent is in any of the following statuses: "RUNNING", "QUEUING", "HELD", "SUBMITTED", "READY", "PREPARED", "DELAYED", "WAITING". +Example: + STATUS: "COMPLETED, FAILED" + + .. code-block:: yaml -- GitLab From ada47c18a6d2f60f99a65037aa33e2c726c1193a Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 13 Jun 2024 10:00:45 +0200 Subject: [PATCH 3/4] Added "final" and "final_no_skip" --- autosubmit/job/job.py | 2 +- autosubmit/job/job_list.py | 36 ++++++++++++++++++++++++++++------- autosubmit/monitor/monitor.py | 7 +++++++ 3 files changed, 37 insertions(+), 8 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index bf102db05..6fc75323e 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -2239,7 +2239,7 @@ class Job(object): return if self.wrapper_type == "vertical" and self.fail_count > 0: self.submit_time_timestamp = self.finish_time_timestamp - print(("Call from {} with status {}".format(self.name, self.status_str))) + Log.info(f"Call from {self.name} with status {self.status_str}") if hold is True: return # Do not write for HELD jobs. diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 7040baebb..1bead7931 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -2579,29 +2579,48 @@ class JobList(object): Check if all parents of a job have the correct status for checkpointing :return: jobs that fullfill the special conditions """ jobs_to_check = [] + jobs_to_skip = [] for status, sorted_job_list in self.jobs_edges.items(): if status == "ALL": continue for job in sorted_job_list: if job.status != Status.WAITING: continue - if status in ["RUNNING", "FAILED"]: + if status.upper() in ["RUNNING", "FAILED"]: # check checkpoint if any if job.platform and job.platform.connected: # This will be true only when used under setstatus/run job.get_checkpoint_files() non_completed_parents_current = 0 completed_parents = len([parent for parent in job.parents if parent.status == Status.COMPLETED]) for parent in job.edge_info[status].values(): - if status in ["RUNNING", "FAILED"] and parent[1] and int(parent[1]) >= job.current_checkpoint_step: + if status.upper() in ["RUNNING", "FAILED"] and parent[1] and int(parent[1]) >= job.current_checkpoint_step: continue else: status_str = Status.VALUE_TO_KEY[parent[0].status] - if Status.LOGICAL_ORDER.index(status_str) >= Status.LOGICAL_ORDER.index(status): - non_completed_parents_current += 1 + if status.upper() == "FINAL": + if status_str in ["FAILED","UNKNOWN","SKIPPED"]: + non_completed_parents_current += 1 + elif status.upper() == "FINAL_NO_SKIP": + if status_str in ["FAILED","UNKNOWN"]: + non_completed_parents_current += 1 + elif status_str == "SKIPPED": + jobs_to_skip.append(job) + elif status.upper() == "FAILED": + if status_str in ["FAILED","UNKNOWN"]: + non_completed_parents_current += 1 + elif status_str == "COMPLETED": + jobs_to_skip.append(job) + elif status.upper() == "COMPLETED": + if status_str == "FAILED": + jobs_to_skip.append(job) + else: + if Status.LOGICAL_ORDER.index(status_str) >= Status.LOGICAL_ORDER.index(status.upper()): + non_completed_parents_current += 1 if (non_completed_parents_current + completed_parents) == len(job.parents): - jobs_to_check.append(job) + if job not in jobs_to_skip: + jobs_to_check.append(job) - return jobs_to_check + return jobs_to_check, jobs_to_skip def update_log_status(self, job, as_conf): """ @@ -2712,7 +2731,10 @@ class JobList(object): job.fail_count = 0 job.packed = False # Check checkpoint jobs, the status can be Any - for job in self.check_special_status(): + jobs_to_check, jobs_to_skip = self.check_special_status() + for job in jobs_to_skip: + job.status = Status.SKIPPED + for job in jobs_to_check: job.status = Status.READY # Run start time in format (YYYYMMDDHH:MM:SS) from current time job.ready_start_date = strftime("%Y%m%d%H%M%S") diff --git a/autosubmit/monitor/monitor.py b/autosubmit/monitor/monitor.py index ccdfffb02..ed4761adf 100644 --- a/autosubmit/monitor/monitor.py +++ b/autosubmit/monitor/monitor.py @@ -307,6 +307,13 @@ class Monitor: color = self._table.get(Status.READY,None) elif job.name in child.edge_info.get("SUBMITTED",{}): color = self._table.get(Status.SUBMITTED,None) + elif job.name in child.edge_info.get("FINAL",{}): + color = "black" + elif job.name in child.edge_info.get("FINAL_NO_SKIP",{}): + color = "lightblack" + + elif job.name in child.edge_info.get("COMPLETED",{}): + color = "grey" else: return None, None if label and label == 0: -- GitLab From 2b664da2058219b1d270b0efe670009caa112366 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 13 Jun 2024 11:49:33 +0200 Subject: [PATCH 4/4] Changing to combine --- autosubmit/job/job_common.py | 4 +- autosubmit/job/job_list.py | 44 ++++++++++++------- autosubmit/monitor/monitor.py | 9 ++-- .../userguide/defining_workflows/index.rst | 12 ++++- 4 files changed, 43 insertions(+), 26 deletions(-) diff --git a/autosubmit/job/job_common.py b/autosubmit/job/job_common.py index 92dc1b1bd..1831d7ade 100644 --- a/autosubmit/job/job_common.py +++ b/autosubmit/job/job_common.py @@ -41,8 +41,8 @@ class Status: VALUE_TO_KEY = {-3: 'SUSPENDED', -2: 'UNKNOWN', -1: 'FAILED', 0: 'WAITING', 1: 'READY', 2: 'SUBMITTED', 3: 'QUEUING', 4: 'RUNNING', 5: 'COMPLETED', 6: 'HELD', 7: 'PREPARED', 8: 'SKIPPED', 9: 'DELAYED'} KEY_TO_VALUE = {'SUSPENDED': -3, 'UNKNOWN': -2, 'FAILED': -1, 'WAITING': 0, 'READY': 1, 'SUBMITTED': 2, 'QUEUING': 3, 'RUNNING': 4, 'COMPLETED': 5, 'HELD': 6, 'PREPARED': 7, 'SKIPPED': 8, 'DELAYED': 9} - LOGICAL_ORDER = ["WAITING", "DELAYED", "PREPARED", "READY", "SUBMITTED", "HELD", "QUEUING", "RUNNING", "SKIPPED", "FAILED", "UNKNOWN", "COMPLETED", "SUSPENDED"] - + LOGICAL_ORDER = ["WAITING", "DELAYED", "PREPARED", "READY", "SUBMITTED", "HELD", "QUEUING", "RUNNING"] + COMBINABLE = ["COMPLETED","FAILED","SKIPPED","UNKNOWN","SUSPENDED"] def retval(self, value): return getattr(self, value) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 1bead7931..7aa1fbd2d 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -937,11 +937,28 @@ class JobList(object): :return: """ if special_status not in self.jobs_edges: - self.jobs_edges[special_status] = set() - self.jobs_edges[special_status].add(job) - if "ALL" not in self.jobs_edges: - self.jobs_edges["ALL"] = set() - self.jobs_edges["ALL"].add(job) + multi = [] + if "," in special_status: + multi = [ status.strip(" ") for status in special_status.split(",") if status.strip(" ") != ""] + else: + multi = [ status.strip(" ") for status in special_status.split(" ") if status.strip(" ") != ""] + if len(multi) == 1: + self.jobs_edges[special_status] = set() + self.jobs_edges[special_status].add(job) + else: + self.jobs_edges["MULTI"] = set() + self.jobs_edges["MULTI"].add(job) + # self.jobs_edges["MULTI"] = {} + # for status in multi: + # if job.name not in self.jobs_edges["MULTI"]: + # self.jobs_edges["MULTI"][job.name] = dict() + # if status not in self.jobs_edges["MULTI"][job.name]: + # self.jobs_edges["MULTI"][job.name][status] = set() + # self.jobs_edges["MULTI"][job.name][status].add(job) + if "ALL" not in self.jobs_edges: + self.jobs_edges["ALL"] = set() + self.jobs_edges["ALL"].add(job) + def add_special_conditions(self, job, special_conditions, filters_to_apply, parent): """ @@ -2592,20 +2609,12 @@ class JobList(object): job.get_checkpoint_files() non_completed_parents_current = 0 completed_parents = len([parent for parent in job.parents if parent.status == Status.COMPLETED]) - for parent in job.edge_info[status].values(): - if status.upper() in ["RUNNING", "FAILED"] and parent[1] and int(parent[1]) >= job.current_checkpoint_step: + for parent in job.edge_info[status.upper()].values(): + if status in ["RUNNING", "FAILED"] and parent[1] and int(parent[1]) >= job.current_checkpoint_step: continue else: status_str = Status.VALUE_TO_KEY[parent[0].status] - if status.upper() == "FINAL": - if status_str in ["FAILED","UNKNOWN","SKIPPED"]: - non_completed_parents_current += 1 - elif status.upper() == "FINAL_NO_SKIP": - if status_str in ["FAILED","UNKNOWN"]: - non_completed_parents_current += 1 - elif status_str == "SKIPPED": - jobs_to_skip.append(job) - elif status.upper() == "FAILED": + if status.upper() == "FAILED": if status_str in ["FAILED","UNKNOWN"]: non_completed_parents_current += 1 elif status_str == "COMPLETED": @@ -2613,6 +2622,9 @@ class JobList(object): elif status.upper() == "COMPLETED": if status_str == "FAILED": jobs_to_skip.append(job) + elif status.upper() == "MULTI": + if status not in Status.COMBINABLE: + raise AutosubmitCritical(f"Invalid status {status} in special status. Combinable Status are: {Status.COMBINABLE}") else: if Status.LOGICAL_ORDER.index(status_str) >= Status.LOGICAL_ORDER.index(status.upper()): non_completed_parents_current += 1 diff --git a/autosubmit/monitor/monitor.py b/autosubmit/monitor/monitor.py index ed4761adf..cb348f7be 100644 --- a/autosubmit/monitor/monitor.py +++ b/autosubmit/monitor/monitor.py @@ -307,13 +307,10 @@ class Monitor: color = self._table.get(Status.READY,None) elif job.name in child.edge_info.get("SUBMITTED",{}): color = self._table.get(Status.SUBMITTED,None) - elif job.name in child.edge_info.get("FINAL",{}): - color = "black" - elif job.name in child.edge_info.get("FINAL_NO_SKIP",{}): - color = "lightblack" - + elif job.name in child.edge_info.get("MULTI",{}): + color = "grey" elif job.name in child.edge_info.get("COMPLETED",{}): - color = "grey" + color = "black" else: return None, None if label and label == 0: diff --git a/docs/source/userguide/defining_workflows/index.rst b/docs/source/userguide/defining_workflows/index.rst index 3cd8a8974..cae1530c6 100644 --- a/docs/source/userguide/defining_workflows/index.rst +++ b/docs/source/userguide/defining_workflows/index.rst @@ -226,13 +226,21 @@ The ``STATUS`` keyword can be used to select the status of the dependency that y * "HELD": The task is held. * "QUEUING": The task is queuing. * "RUNNING": The task is running. + +The status are ordered, so if you select "RUNNING" status, the task will be run if the parent is in any of the following statuses: "RUNNING", "QUEUING", "HELD", "SUBMITTED", "READY", "PREPARED", "DELAYED", "WAITING". + +There are also a few "final" statuses that can be combined, the ones that can be combined are the following: + * "SKIPPED": The task is skipped. -* "FAILED": The task is failed. +* "FAILED": The task is failed and only failed. * "UNKNOWN": The task is unknown. * "COMPLETED": The task is completed. # Default * "SUSPENDED": The task is suspended. -The status are ordered, so if you select "RUNNING" status, the task will be run if the parent is in any of the following statuses: "RUNNING", "QUEUING", "HELD", "SUBMITTED", "READY", "PREPARED", "DELAYED", "WAITING". +Example: + STATUS: "COMPLETED, FAILED" + + .. code-block:: yaml -- GitLab