diff --git a/.vscode/settings.json b/.vscode/settings.json
index 97648ef47cdd77c80c7046c3460f8b80f26d802b..e90a26767ae6535ce55eb5352ccb9b0aba54ff21 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -1,3 +1,4 @@
 {
-    "restructuredtext.confPath": "${workspaceFolder}/docs/source"
+    "restructuredtext.confPath": "${workspaceFolder}/docs/source",
+    "python.pythonPath": "/Library/Frameworks/Python.framework/Versions/2.7/bin/python2.7"
 }
\ No newline at end of file
diff --git a/a29z_20200324_1027.pdfn b/a29z_20200324_1027.pdfn
deleted file mode 100644
index 830d8fbd51b4978d420876ca927549b12e5b4020..0000000000000000000000000000000000000000
Binary files a/a29z_20200324_1027.pdfn and /dev/null differ
diff --git a/autosubmit/database/db_structure.py b/autosubmit/database/db_structure.py
new file mode 100644
index 0000000000000000000000000000000000000000..f9d34c7e30886d691a8961d2dbdc4405f70925ed
--- /dev/null
+++ b/autosubmit/database/db_structure.py
@@ -0,0 +1,153 @@
+import os
+import sys
+import string
+import time
+import pickle
+import textwrap
+import traceback
+import sqlite3
+import copy
+from datetime import datetime
+from networkx import DiGraph
+
+#DB_FILE_AS_TIMES = "/esarchive/autosubmit/as_times.db"
+
+
+def get_structure(exp_id, exp_path):
+    """
+    Creates the structure database file and table if they do not exist, then returns the current structure.
+
+    :return: Map from source job name to the list of destination job names
+    :rtype: Dictionary Key: String, Value: List(of String)
+    """
+    try:
+        pkl_path = os.path.join(exp_path, exp_id, "pkl")
+        if os.path.exists(pkl_path):
+            db_structure_path = os.path.join(
+                pkl_path, "structure_" + exp_id + ".db")
+            if not os.path.exists(db_structure_path):
+                open(db_structure_path, "w").close()
+            # print(db_structure_path)
+            conn = create_connection(db_structure_path)
+            create_table_query = textwrap.dedent(
+                '''CREATE TABLE
+                IF NOT EXISTS experiment_structure (
+                e_from text NOT NULL,
+                e_to text NOT NULL,
+                UNIQUE(e_from,e_to)
+                );''')
+            create_table(conn, create_table_query)
+            current_table = _get_exp_structure(db_structure_path)
+            # print("Current table: ")
+            # print(current_table)
+            current_table_structure = dict()
+            for item in current_table:
+                _from, _to = item
+                if _from not in current_table_structure.keys():
+                    current_table_structure[_from] = list()
+                if _to not in current_table_structure.keys():
+                    current_table_structure[_to] = list()
+                current_table_structure[_from].append(_to)
+            if (len(current_table_structure.keys()) > 0):
+                # print("Return structure")
+                return current_table_structure
+            else:
+                return None
+        else:
+            # pkl folder not found
+            raise Exception("pkl folder not found " + str(pkl_path))
+    except Exception as exp:
+        print(traceback.format_exc())
+
+
+def create_connection(db_file):
+    """
+    Create a database connection to the SQLite database specified by db_file.
+    :param db_file: database file name
+    :return: Connection object or None
+    """
+    try:
+        conn = sqlite3.connect(db_file)
+        return conn
+    except sqlite3.Error:
+        return None
+
+
+def create_table(conn, create_table_sql):
+    """ create a table from the create_table_sql statement
+    :param conn: Connection object
+    :param create_table_sql: a CREATE TABLE statement
+    :return:
+    """
+    try:
+        c = conn.cursor()
+        c.execute(create_table_sql)
+    except Exception as e:
+        print(e)
+
+
+def _get_exp_structure(path):
+    """
+    Get all edges registered in experiment_structure.\n
+    :return: row content: e_from, e_to
+    :rtype: list of 2-tuple (str, str)
+    """
+    try:
+        conn = create_connection(path)
+        conn.text_factory = str
+        cur = conn.cursor()
+        cur.execute(
+            "SELECT e_from, e_to FROM experiment_structure")
+        rows = cur.fetchall()
+        return rows
+    except Exception as exp:
+        print(traceback.format_exc())
+        return list()
+
+
+def save_structure(graph, exp_id, exp_path):
+    """
+    Saves structure if path is valid
+    """
+    pkl_path = os.path.join(exp_path, exp_id, "pkl")
+    if os.path.exists(pkl_path):
+        db_structure_path = os.path.join(
+            pkl_path, "structure_" + exp_id + ".db")
+        # with open(db_structure_path, "w"):
+        conn = create_connection(db_structure_path)
+        _delete_table_content(conn)
+        for u, v in graph.edges():
+            # save
+            _create_edge(conn, u, v)
+            #print("Created edge " + str(u) + str(v))
+        conn.commit()
+    else:
+        # pkl folder not found
+        raise Exception("pkl folder not found " + str(pkl_path))
+
+
+def _create_edge(conn, u, v):
+    """
+    Create edge
+    """
+    try:
+        sql = ''' INSERT INTO experiment_structure(e_from, e_to) VALUES(?,?) '''
+        cur = conn.cursor()
+        cur.execute(sql, (u, v))
+        # return cur.lastrowid
+    except sqlite3.Error as e:
+        print("Error on Insert : " + str(type(e).__name__))
+
+
+def _delete_table_content(conn):
+    """
+    Deletes table content
+    """
+    try:
+        sql = ''' DELETE FROM experiment_structure '''
+        cur = conn.cursor()
+        cur.execute(sql)
+        conn.commit()
+    except sqlite3.Error as e:
+        print(traceback.format_exc())
+        print("Error on Delete : " + str(type(e).__name__))
diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py
index 59abd809c72677096b1d24612be840a0e99fc74e..8d4707c562e217a9ecbb047cc7846988e979f149 100644
--- a/autosubmit/job/job_list.py
+++ b/autosubmit/job/job_list.py
@@ -36,6 +36,7 @@ from autosubmit.job.job_utils import Dependency
 from autosubmit.job.job_common import Status, Type, bcolors
 from bscearth.utils.date import date2str, parse_date, sum_str_hours
 from autosubmit.job.job_packages import JobPackageSimple, JobPackageArray, JobPackageThread
+import autosubmit.database.db_structure as DbStructure
 from networkx import DiGraph
 from autosubmit.job.job_utils import transitive_reduction
 
@@ -48,7 +49,8 @@ class JobList:
     """
 
    def __init__(self, expid, config, parser_factory, job_list_persistence):
-        self._persistence_path = os.path.join(config.LOCAL_ROOT_DIR, expid, "pkl")
+        self._persistence_path = os.path.join(
+            config.LOCAL_ROOT_DIR, expid, "pkl")
         self._update_file = "updated_list_" + expid + ".txt"
         self._failed_file = "failed_job_list_" + expid + ".pkl"
         self._persistence_file = "job_list_" + expid
@@ -71,6 +73,7 @@ JobList:
         self.packages_id = dict()
         self.job_package_map = dict()
         self.sections_checked = set()
+
     @property
     def expid(self):
         """
@@ -96,7 +99,7 @@
         self._graph = value
 
     def generate(self, date_list, member_list, num_chunks, chunk_ini, parameters, date_format, default_retrials,
-                 default_job_type, wrapper_type=None,
wrapper_jobs=None,new=True, notransitive=False): + default_job_type, wrapper_type=None, wrapper_jobs=None, new=True, notransitive=False): """ Creates all jobs needed for the current workflow @@ -132,7 +135,8 @@ class JobList: jobs_parser = self._get_jobs_parser() - dic_jobs = DicJobs(self, jobs_parser, date_list, member_list, chunk_list, date_format, default_retrials) + dic_jobs = DicJobs(self, jobs_parser, date_list, member_list, + chunk_list, date_format, default_retrials) self._dic_jobs = dic_jobs priority = 0 @@ -141,9 +145,11 @@ class JobList: # jobs_data includes the name of the .our and .err files of the job in LOG_expid if not new: jobs_data = {str(row[0]): row for row in self.load()} - self._create_jobs(dic_jobs, jobs_parser, priority, default_job_type, jobs_data) + self._create_jobs(dic_jobs, jobs_parser, priority, + default_job_type, jobs_data) Log.info("Adding dependencies...") - self._add_dependencies(date_list, member_list, chunk_list, dic_jobs, jobs_parser, self.graph) + self._add_dependencies(date_list, member_list, + chunk_list, dic_jobs, jobs_parser, self.graph) Log.info("Removing redundant dependencies...") self.update_genealogy(new, notransitive) @@ -151,9 +157,9 @@ class JobList: job.parameters = parameters # Perhaps this should be done by default independent of the wrapper_type supplied - if wrapper_type == 'vertical-mixed': - self._ordered_jobs_by_date_member = self._create_sorted_dict_jobs(wrapper_jobs) - + if wrapper_type == 'vertical-mixed': + self._ordered_jobs_by_date_member = self._create_sorted_dict_jobs( + wrapper_jobs) @staticmethod def _add_dependencies(date_list, member_list, chunk_list, dic_jobs, jobs_parser, graph, option="DEPENDENCIES"): @@ -165,7 +171,8 @@ class JobList: continue dependencies_keys = jobs_parser.get(job_section, option).split() - dependencies = JobList._manage_dependencies(dependencies_keys, dic_jobs,job_section) + dependencies = JobList._manage_dependencies( + dependencies_keys, dic_jobs, job_section) for job in dic_jobs.get_jobs(job_section): num_jobs = 1 @@ -177,14 +184,14 @@ class JobList: dependencies, graph) @staticmethod - def _manage_dependencies(dependencies_keys, dic_jobs,job_section): + def _manage_dependencies(dependencies_keys, dic_jobs, job_section): dependencies = dict() for key in dependencies_keys: distance = None splits = None sign = None - if '-' not in key and '+' not in key and '*' not in key: + if '-' not in key and '+' not in key and '*' not in key: section = key else: if '-' in key: @@ -199,47 +206,61 @@ class JobList: if '[' in section: section_name = section[0:section.find("[")] - splits_section = int(dic_jobs.get_option(section_name, 'SPLITS', 0)) - splits = JobList._calculate_splits_dependencies(section, splits_section) + splits_section = int( + dic_jobs.get_option(section_name, 'SPLITS', 0)) + splits = JobList._calculate_splits_dependencies( + section, splits_section) section = section_name - dependency_running_type = dic_jobs.get_option(section, 'RUNNING', 'once').lower() + dependency_running_type = dic_jobs.get_option( + section, 'RUNNING', 'once').lower() delay = int(dic_jobs.get_option(section, 'DELAY', -1)) - select_chunks_opt = dic_jobs.get_option(job_section, 'SELECT_CHUNKS', None) + select_chunks_opt = dic_jobs.get_option( + job_section, 'SELECT_CHUNKS', None) selected_chunks = [] if select_chunks_opt is not None: if '*' in select_chunks_opt: sections_chunks = select_chunks_opt.split(' ') for section_chunk in sections_chunks: - info=section_chunk.split('*') + info = section_chunk.split('*') if 
info[0] in key: - for relation in range(1,len(info)): - auxiliar_relation_list=[] + for relation in range(1, len(info)): + auxiliar_relation_list = [] for location in info[relation].split('-'): auxiliar_chunk_list = [] location = location.strip('[').strip(']') if ':' in location: if len(location) == 3: - for chunk_number in range(int(location[0]),int(location[2])+1): - auxiliar_chunk_list.append(chunk_number) + for chunk_number in range(int(location[0]), int(location[2])+1): + auxiliar_chunk_list.append( + chunk_number) elif len(location) == 2: if ':' == location[0]: for chunk_number in range(0, int(location[1])+1): - auxiliar_chunk_list.append(chunk_number) + auxiliar_chunk_list.append( + chunk_number) elif ':' == location[1]: - for chunk_number in range(int(location[0])+1,len(dic_jobs._chunk_list)-1): - auxiliar_chunk_list.append(chunk_number) + for chunk_number in range(int(location[0])+1, len(dic_jobs._chunk_list)-1): + auxiliar_chunk_list.append( + chunk_number) elif ',' in location: for chunk in location.split(','): - auxiliar_chunk_list.append(int(chunk)) - elif re.match('^[0-9]+$',location): - auxiliar_chunk_list.append(int(location)) - auxiliar_relation_list.append(auxiliar_chunk_list) + auxiliar_chunk_list.append( + int(chunk)) + elif re.match('^[0-9]+$', location): + auxiliar_chunk_list.append( + int(location)) + auxiliar_relation_list.append( + auxiliar_chunk_list) selected_chunks.append(auxiliar_relation_list) if len(selected_chunks) >= 1: - dependency = Dependency(section, distance, dependency_running_type, sign, delay, splits,selected_chunks) #[]select_chunks_dest,select_chunks_orig + # []select_chunks_dest,select_chunks_orig + dependency = Dependency( + section, distance, dependency_running_type, sign, delay, splits, selected_chunks) else: - dependency = Dependency(section, distance, dependency_running_type, sign, delay, splits,[]) #[]select_chunks_dest,select_chunks_orig + # []select_chunks_dest,select_chunks_orig + dependency = Dependency( + section, distance, dependency_running_type, sign, delay, splits, []) dependencies[key] = dependency return dependencies @@ -271,30 +292,34 @@ class JobList: dependency) if skip: continue - chunk_relations_to_add=list() - if len(dependency.select_chunks_orig) > 0: # find chunk relation + chunk_relations_to_add = list() + if len(dependency.select_chunks_orig) > 0: # find chunk relation relation_indx = 0 while relation_indx < len(dependency.select_chunks_orig): if len(dependency.select_chunks_orig[relation_indx]) == 0 or job.chunk in dependency.select_chunks_orig[relation_indx] or job.chunk is None: chunk_relations_to_add.append(relation_indx) - relation_indx+=1 + relation_indx += 1 relation_indx -= 1 - if len(dependency.select_chunks_orig) <= 0 or job.chunk is None or len(chunk_relations_to_add) > 0 : #If doesn't contain select_chunks or running isn't chunk . ... - parents_jobs=dic_jobs.get_jobs(dependency.section, date, member, chunk) + # If doesn't contain select_chunks or running isn't chunk . ... 
+ if len(dependency.select_chunks_orig) <= 0 or job.chunk is None or len(chunk_relations_to_add) > 0: + parents_jobs = dic_jobs.get_jobs( + dependency.section, date, member, chunk) for parent in parents_jobs: if dependency.delay == -1 or chunk > dependency.delay: if isinstance(parent, list): if job.split is not None: - parent = filter(lambda _parent: _parent.split == job.split, parent)[0] + parent = filter( + lambda _parent: _parent.split == job.split, parent)[0] else: if dependency.splits is not None: - parent = filter(lambda _parent: _parent.split in dependency.splits, parent) + parent = filter( + lambda _parent: _parent.split in dependency.splits, parent) if len(dependency.select_chunks_dest) <= 0 or parent.chunk is None: job.add_parent(parent) JobList._add_edge(graph, job, parent) else: - visited_parents=set() + visited_parents = set() for relation_indx in chunk_relations_to_add: if parent.chunk in dependency.select_chunks_dest[relation_indx] or len(dependency.select_chunks_dest[relation_indx]) == 0: if parent not in visited_parents: @@ -304,6 +329,7 @@ class JobList: JobList.handle_frequency_interval_dependencies(chunk, chunk_list, date, date_list, dic_jobs, job, member, member_list, dependency.section, graph) + @staticmethod def _calculate_dependency_metadata(chunk, chunk_list, member, member_list, date, date_list, dependency): skip = False @@ -400,20 +426,21 @@ class JobList: def _create_jobs(dic_jobs, parser, priority, default_job_type, jobs_data=dict()): for section in parser.sections(): Log.debug("Creating {0} jobs".format(section)) - dic_jobs.read_section(section, priority, default_job_type, jobs_data) + dic_jobs.read_section( + section, priority, default_job_type, jobs_data) priority += 1 def _create_sorted_dict_jobs(self, wrapper_jobs): """ Creates a sorting of the jobs whose job.section is in wrapper_jobs, according to the following filters in order of importance: - date, member, RUNNING, and chunk number; where RUNNING is defined in jobs_.conf for each section. + date, member, RUNNING, and chunk number; where RUNNING is defined in jobs_.conf for each section. If the job does not have a chunk number, the total number of chunks configured for the experiment is used. :param wrapper_jobs: User defined job types in autosubmit_,conf [wrapper] section to be wrapped. \n :type wrapper_jobs: String \n :return: Sorted Dictionary of Dictionary of List that represents the jobs included in the wrapping process. 
\n - :rtype: Dictionary Key: date, Value: (Dictionary Key: Member, Value: List of jobs that belong to the date, member, and are ordered by chunk number if it is a chunk job otherwise num_chunks from JOB TYPE (section) + :rtype: Dictionary Key: date, Value: (Dictionary Key: Member, Value: List of jobs that belong to the date, member, and are ordered by chunk number if it is a chunk job otherwise num_chunks from JOB TYPE (section) """ # Dictionary Key: date, Value: (Dictionary Key: Member, Value: List) dict_jobs = dict() @@ -423,21 +450,24 @@ class JobList: dict_jobs[date][member] = list() num_chunks = len(self._chunk_list) # Select only relevant jobs, those belonging to the sections defined in the wrapper - filtered_jobs_list = filter(lambda job: job.section in wrapper_jobs, self._job_list) + filtered_jobs_list = filter( + lambda job: job.section in wrapper_jobs, self._job_list) - filtered_jobs_fake_date_member, fake_original_job_map = self._create_fake_dates_members(filtered_jobs_list) + filtered_jobs_fake_date_member, fake_original_job_map = self._create_fake_dates_members( + filtered_jobs_list) - sections_running_type_map = dict() + sections_running_type_map = dict() for section in wrapper_jobs.split(" "): # RUNNING = once, as default. This value comes from jobs_.conf - sections_running_type_map[section] = self._dic_jobs.get_option(section, "RUNNING", 'once') - + sections_running_type_map[section] = self._dic_jobs.get_option( + section, "RUNNING", 'once') + for date in self._date_list: str_date = self._get_date(date) for member in self._member_list: # Filter list of fake jobs according to date and member, result not sorted at this point sorted_jobs_list = filter(lambda job: job.name.split("_")[1] == str_date and - job.name.split("_")[2] == member, filtered_jobs_fake_date_member) + job.name.split("_")[2] == member, filtered_jobs_fake_date_member) previous_job = sorted_jobs_list[0] @@ -445,7 +475,7 @@ class JobList: section_running_type = sections_running_type_map[previous_job.section] jobs_to_sort = [previous_job] - previous_section_running_type = None + previous_section_running_type = None # Index starts at 1 because 0 has been taken in a previous step for index in range(1, len(sorted_jobs_list) + 1): # If not last item @@ -457,10 +487,10 @@ class JobList: section_running_type = sections_running_type_map[job.section] # Test if RUNNING is different between sections, or if we have reached the last item in sorted_jobs_list if (previous_section_running_type != None and previous_section_running_type != section_running_type) \ - or index == len(sorted_jobs_list): + or index == len(sorted_jobs_list): - # Sorting by date, member, chunk number if it is a chunk job otherwise num_chunks from JOB TYPE (section) - # Important to note that the only differentiating factor would be chunk OR num_chunks + # Sorting by date, member, chunk number if it is a chunk job otherwise num_chunks from JOB TYPE (section) + # Important to note that the only differentiating factor would be chunk OR num_chunks jobs_to_sort = sorted(jobs_to_sort, key=lambda k: (k.name.split('_')[1], (k.name.split('_')[2]), (int(k.name.split('_')[3]) if len(k.name.split('_')) == 5 else num_chunks + 1))) @@ -484,7 +514,7 @@ class JobList: def _create_fake_dates_members(self, filtered_jobs_list): """ - Using the list of jobs provided, creates clones of these jobs and modifies names conditionted on job.date, job.member values (testing None). 
+ Using the list of jobs provided, creates clones of these jobs and modifies names conditionted on job.date, job.member values (testing None). The purpose is that all jobs share the same name structure. :param filtered_jobs_list: A list of jobs of only those that comply with certain criteria, e.g. those belonging to a user defined job type for wrapping. \n @@ -508,7 +538,7 @@ class JobList: fake_job = copy.deepcopy(job) # Use previous values to modify name of fake job fake_job.name = fake_job.name.split('_', 1)[0] + "_" + self._get_date(date) + "_" \ - + member + "_" + fake_job.name.split("_", 1)[1] + + member + "_" + fake_job.name.split("_", 1)[1] # Filling list of fake jobs, only difference is the name filtered_jobs_fake_date_member.append(fake_job) # Mapping fake jobs to orignal ones @@ -561,7 +591,6 @@ class JobList: return self._date_list def get_member_list(self): - """ Get inner member list @@ -606,7 +635,7 @@ class JobList: """ return self._ordered_jobs_by_date_member - def get_completed(self, platform=None,wrapper=False): + def get_completed(self, platform=None, wrapper=False): """ Returns a list of completed jobs @@ -617,7 +646,7 @@ class JobList: """ completed_jobs = [job for job in self._job_list if (platform is None or job.platform == platform) and - job.status == Status.COMPLETED] + job.status == Status.COMPLETED] if wrapper: return [job for job in completed_jobs if job.packed is False] @@ -634,13 +663,14 @@ class JobList: :rtype: list """ uncompleted_jobs = [job for job in self._job_list if (platform is None or job.platform == platform) and - job.status != Status.COMPLETED] + job.status != Status.COMPLETED] if wrapper: return [job for job in uncompleted_jobs if job.packed is False] else: return uncompleted_jobs - def get_submitted(self, platform=None, hold =False , wrapper=False): + + def get_submitted(self, platform=None, hold=False, wrapper=False): """ Returns a list of submitted jobs @@ -651,17 +681,17 @@ class JobList: """ submitted = list() if hold: - submitted= [job for job in self._job_list if (platform is None or job.platform == platform) and - job.status == Status.SUBMITTED and job.hold == hold ] + submitted = [job for job in self._job_list if (platform is None or job.platform == platform) and + job.status == Status.SUBMITTED and job.hold == hold] else: - submitted= [job for job in self._job_list if (platform is None or job.platform == platform) and - job.status == Status.SUBMITTED ] + submitted = [job for job in self._job_list if (platform is None or job.platform == platform) and + job.status == Status.SUBMITTED] if wrapper: return [job for job in submitted if job.packed is False] else: return submitted - def get_running(self, platform=None,wrapper=False): + def get_running(self, platform=None, wrapper=False): """ Returns a list of jobs running @@ -670,13 +700,14 @@ class JobList: :return: running jobs :rtype: list """ - running= [job for job in self._job_list if (platform is None or job.platform == platform) and - job.status == Status.RUNNING] + running = [job for job in self._job_list if (platform is None or job.platform == platform) and + job.status == Status.RUNNING] if wrapper: return [job for job in running if job.packed is False] else: return running - def get_queuing(self, platform=None,wrapper=False): + + def get_queuing(self, platform=None, wrapper=False): """ Returns a list of jobs queuing @@ -685,13 +716,14 @@ class JobList: :return: queuedjobs :rtype: list """ - queuing= [job for job in self._job_list if (platform is None or job.platform == platform) 
and - job.status == Status.QUEUING] + queuing = [job for job in self._job_list if (platform is None or job.platform == platform) and + job.status == Status.QUEUING] if wrapper: return [job for job in queuing if job.packed is False] else: return queuing - def get_failed(self, platform=None,wrapper=False): + + def get_failed(self, platform=None, wrapper=False): """ Returns a list of failed jobs @@ -700,13 +732,14 @@ class JobList: :return: failed jobs :rtype: list """ - failed= [job for job in self._job_list if (platform is None or job.platform == platform) and - job.status == Status.FAILED] + failed = [job for job in self._job_list if (platform is None or job.platform == platform) and + job.status == Status.FAILED] if wrapper: return [job for job in failed if job.packed is False] else: return failed - def get_unsubmitted(self, platform=None,wrapper=False): + + def get_unsubmitted(self, platform=None, wrapper=False): """ Returns a list of unsummited jobs @@ -715,15 +748,15 @@ class JobList: :return: all jobs :rtype: list """ - unsubmitted= [job for job in self._job_list if (platform is None or job.platform == platform) and - ( job.status != Status.SUBMITTED and job.status != Status.QUEUING and job.status == Status.RUNNING and job.status == Status.COMPLETED ) ] + unsubmitted = [job for job in self._job_list if (platform is None or job.platform == platform) and + (job.status != Status.SUBMITTED and job.status != Status.QUEUING and job.status == Status.RUNNING and job.status == Status.COMPLETED)] if wrapper: return [job for job in unsubmitted if job.packed is False] else: return unsubmitted - def get_all(self, platform=None,wrapper=False): + def get_all(self, platform=None, wrapper=False): """ Returns a list of all jobs @@ -739,7 +772,7 @@ class JobList: else: return all - def get_ready(self, platform=None, hold=False , wrapper=False ): + def get_ready(self, platform=None, hold=False, wrapper=False): """ Returns a list of ready jobs @@ -749,14 +782,14 @@ class JobList: :rtype: list """ ready = [job for job in self._job_list if (platform is None or job.platform == platform) and - job.status == Status.READY and job.hold is hold] + job.status == Status.READY and job.hold is hold] if wrapper: return [job for job in ready if job.packed is False] else: return ready - def get_waiting(self, platform=None,wrapper=False): + def get_waiting(self, platform=None, wrapper=False): """ Returns a list of jobs waiting @@ -765,8 +798,8 @@ class JobList: :return: waiting jobs :rtype: list """ - waiting_jobs= [job for job in self._job_list if (platform is None or job.platform == platform) and - job.status == Status.WAITING] + waiting_jobs = [job for job in self._job_list if (platform is None or job.platform == platform) and + job.status == Status.WAITING] if wrapper: return [job for job in waiting_jobs if job.packed is False] else: @@ -781,11 +814,11 @@ class JobList: :return: waiting jobs :rtype: list """ - waiting_jobs= [job for job in self._job_list if (job.platform.type == platform_type and - job.status == Status.WAITING)] + waiting_jobs = [job for job in self._job_list if (job.platform.type == platform_type and + job.status == Status.WAITING)] return waiting_jobs - def get_held_jobs(self,platform = None): + def get_held_jobs(self, platform=None): """ Returns a list of jobs in the platforms (Held) @@ -797,8 +830,7 @@ class JobList: return [job for job in self._job_list if (platform is None or job.platform == platform) and job.status == Status.HELD] - - def get_unknown(self, platform=None,wrapper=False): + def 
get_unknown(self, platform=None, wrapper=False): """ Returns a list of jobs on unknown state @@ -807,13 +839,14 @@ class JobList: :return: unknown state jobs :rtype: list """ - submitted= [job for job in self._job_list if (platform is None or job.platform == platform) and - job.status == Status.UNKNOWN] + submitted = [job for job in self._job_list if (platform is None or job.platform == platform) and + job.status == Status.UNKNOWN] if wrapper: return [job for job in submitted if job.packed is False] else: return submitted - def get_suspended(self, platform=None,wrapper=False): + + def get_suspended(self, platform=None, wrapper=False): """ Returns a list of jobs on unknown state @@ -822,12 +855,13 @@ class JobList: :return: unknown state jobs :rtype: list """ - suspended= [job for job in self._job_list if (platform is None or job.platform == platform) and - job.status == Status.SUSPENDED] + suspended = [job for job in self._job_list if (platform is None or job.platform == platform) and + job.status == Status.SUSPENDED] if wrapper: return [job for job in suspended if job.packed is False] else: return suspended + def get_in_queue(self, platform=None, wrapper=False): """ Returns a list of jobs in the platforms (Submitted, Running, Queuing, Unknown,Held) @@ -844,7 +878,8 @@ class JobList: return [job for job in in_queue if job.packed is False] else: return in_queue - def get_not_in_queue(self, platform=None,wrapper=False): + + def get_not_in_queue(self, platform=None, wrapper=False): """ Returns a list of jobs NOT in the platforms (Ready, Waiting) @@ -853,12 +888,13 @@ class JobList: :return: jobs not in platforms :rtype: list """ - not_queued= self.get_ready(platform) + self.get_waiting(platform) + not_queued = self.get_ready(platform) + self.get_waiting(platform) if wrapper: return [job for job in not_queued if job.packed is False] else: return not_queued - def get_finished(self, platform=None,wrapper=False): + + def get_finished(self, platform=None, wrapper=False): """ Returns a list of jobs finished (Completed, Failed) @@ -868,7 +904,7 @@ class JobList: :return: finished jobs :rtype: list """ - finished= self.get_completed(platform) + self.get_failed(platform) + finished = self.get_completed(platform) + self.get_failed(platform) if wrapper: return [job for job in finished if job.packed is False] else: @@ -886,11 +922,14 @@ class JobList: :return: active jobs :rtype: list """ - active = self.get_in_queue(platform) + self.get_ready(platform=platform,hold=True) + self.get_ready(platform=platform,hold=False) - tmp = [job for job in active if job.hold and not job.status == Status.SUBMITTED and not job.status == Status.READY] - if len(tmp) == len(active): # IF only held jobs left without dependencies satisfied + active = self.get_in_queue(platform) + self.get_ready( + platform=platform, hold=True) + self.get_ready(platform=platform, hold=False) + tmp = [job for job in active if job.hold and not job.status == + Status.SUBMITTED and not job.status == Status.READY] + if len(tmp) == len(active): # IF only held jobs left without dependencies satisfied if len(tmp) != 0 and len(active) != 0: - Log.warning("Only Held Jobs active,Exiting Autosubmit (TIP: This can happen if suspended or/and Failed jobs are found on the workflow) ") + Log.warning( + "Only Held Jobs active,Exiting Autosubmit (TIP: This can happen if suspended or/and Failed jobs are found on the workflow) ") active = [] return active @@ -919,10 +958,10 @@ class JobList: jobs_by_id[job.id].append(job) return jobs_by_id - def 
get_in_ready_grouped_id(self, platform): - jobs=[] - [jobs.append(job) for job in jobs if (platform is None or job._platform.name is platform.name)] + jobs = [] + [jobs.append(job) for job in jobs if ( + platform is None or job._platform.name is platform.name)] jobs_by_id = dict() for job in jobs: @@ -998,7 +1037,8 @@ class JobList: """ Persists the job list """ - self._persistence.save(self._persistence_path, self._persistence_file, self._job_list) + self._persistence.save(self._persistence_path, + self._persistence_file, self._job_list) def update_from_file(self, store_change=True): """ @@ -1006,7 +1046,8 @@ class JobList: :param store_change: if True, renames the update file to avoid reloading it at the next iteration """ if os.path.exists(os.path.join(self._persistence_path, self._update_file)): - Log.info("Loading updated list: {0}".format(os.path.join(self._persistence_path, self._update_file))) + Log.info("Loading updated list: {0}".format( + os.path.join(self._persistence_path, self._update_file))) for line in open(os.path.join(self._persistence_path, self._update_file)): if line.strip() == '': continue @@ -1034,7 +1075,7 @@ class JobList: def parameters(self, value): self._parameters = value - def update_list(self, as_conf,store_change=True,fromSetStatus=False): + def update_list(self, as_conf, store_change=True, fromSetStatus=False): """ Updates job list, resetting failed jobs and changing to READY all WAITING jobs with all parents COMPLETED @@ -1058,62 +1099,76 @@ class JobList: retrials = job.retrials if job.fail_count <= retrials: - tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] + tmp = [ + parent for parent in job.parents if parent.status == Status.COMPLETED] if len(tmp) == len(job.parents): job.status = Status.READY job.packed = False save = True - Log.debug("Resetting job: {0} status to: READY for retrial...".format(job.name)) + Log.debug( + "Resetting job: {0} status to: READY for retrial...".format(job.name)) else: job.status = Status.WAITING save = True job.packed = False - Log.debug("Resetting job: {0} status to: WAITING for parents completion...".format(job.name)) + Log.debug( + "Resetting job: {0} status to: WAITING for parents completion...".format(job.name)) # if waiting jobs has all parents completed change its State to READY for job in self.get_completed(): if job.synchronize is not None: Log.debug('Updating SYNC jobs') - tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] + tmp = [ + parent for parent in job.parents if parent.status == Status.COMPLETED] if len(tmp) != len(job.parents): job.status = Status.WAITING save = True - Log.debug("Resetting sync job: {0} status to: WAITING for parents completion...".format(job.name)) + Log.debug( + "Resetting sync job: {0} status to: WAITING for parents completion...".format(job.name)) Log.debug('Update finished') Log.debug('Updating WAITING jobs') if not fromSetStatus: all_parents_completed = [] for job in self.get_waiting(): - tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] + tmp = [ + parent for parent in job.parents if parent.status == Status.COMPLETED] if job.parents is None or len(tmp) == len(job.parents): job.status = Status.READY job.hold = False - Log.debug("Setting job: {0} status to: READY (all parents completed)...".format(job.name)) + Log.debug( + "Setting job: {0} status to: READY (all parents completed)...".format(job.name)) if as_conf.get_remote_dependencies(): all_parents_completed.append(job.name) if 
as_conf.get_remote_dependencies(): - Log.debug('Updating WAITING jobs eligible for remote_dependencies') + Log.debug( + 'Updating WAITING jobs eligible for remote_dependencies') for job in self.get_waiting_remote_dependencies('slurm'.lower()): if job.name not in all_parents_completed: - tmp = [parent for parent in job.parents if ( (parent.status == Status.COMPLETED or parent.status == Status.QUEUING or parent.status == Status.RUNNING) and "setup" not in parent.name.lower() )] + tmp = [parent for parent in job.parents if ( + (parent.status == Status.COMPLETED or parent.status == Status.QUEUING or parent.status == Status.RUNNING) and "setup" not in parent.name.lower())] if len(tmp) == len(job.parents): job.status = Status.READY job.hold = True - Log.debug("Setting job: {0} status to: READY for be held (all parents queuing, running or completed)...".format(job.name)) + Log.debug( + "Setting job: {0} status to: READY for be held (all parents queuing, running or completed)...".format(job.name)) Log.debug('Updating Held jobs') if self.job_package_map: - held_jobs = [job for job in self.get_held_jobs() if ( job.id not in self.job_package_map.keys() ) ] - held_jobs += [wrapper_job for wrapper_job in self.job_package_map.values() if wrapper_job.status == Status.HELD ] + held_jobs = [job for job in self.get_held_jobs() if ( + job.id not in self.job_package_map.keys())] + held_jobs += [wrapper_job for wrapper_job in self.job_package_map.values() + if wrapper_job.status == Status.HELD] else: held_jobs = self.get_held_jobs() for job in held_jobs: - if self.job_package_map and job.id in self.job_package_map.keys(): # Wrappers and inner jobs + if self.job_package_map and job.id in self.job_package_map.keys(): # Wrappers and inner jobs hold_wrapper = False for inner_job in job.job_list: - valid_parents = [ parent for parent in inner_job.parents if parent not in job.job_list] - tmp = [parent for parent in valid_parents if parent.status == Status.COMPLETED ] + valid_parents = [ + parent for parent in inner_job.parents if parent not in job.job_list] + tmp = [ + parent for parent in valid_parents if parent.status == Status.COMPLETED] if len(tmp) < len(valid_parents): hold_wrapper = True job.hold = hold_wrapper @@ -1123,8 +1178,9 @@ class JobList: Log.debug( "Setting job: {0} status to: Queuing (all parents completed)...".format( job.name)) - else: # Non-wrapped jobs - tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] + else: # Non-wrapped jobs + tmp = [ + parent for parent in job.parents if parent.status == Status.COMPLETED] if len(tmp) == len(job.parents): job.hold = False Log.debug( @@ -1154,12 +1210,49 @@ class JobList: # Simplifying dependencies: if a parent is already an ancestor of another parent, # we remove parent dependency if not notransitive: - self.graph = transitive_reduction(self.graph) - for job in self._job_list: - children_to_remove = [child for child in job.children if child.name not in self.graph.neighbors(job.name)] - for child in children_to_remove: - job.children.remove(child) - child.parents.remove(job) + # Transitive reduction required + current_structure = None + try: + current_structure = DbStructure.get_structure( + self.expid, self._config.LOCAL_ROOT_DIR) + except Exception as exp: + pass + # print("Lengths : " + str(len(self._job_list)) + "\t" + + # str(len(current_structure.keys()))) + structure_valid = False + if ((current_structure) and (len(self._job_list) == len(current_structure.keys()))): + structure_valid = True + # 
print(current_structure.keys()) + # Structure exists and is valid, use it as a source of dependencies + for job in self._job_list: + if job.name not in current_structure.keys(): + structure_valid = False + continue + if structure_valid == True: + Log.info("Using existing valid structure.") + for job in self._job_list: + children_to_remove = [ + child for child in job.children if child.name not in current_structure[job.name]] + for child in children_to_remove: + job.children.remove(child) + child.parents.remove(job) + if structure_valid == False: + # Structure does not exist or it is not be updated, attempt to create it. + # print("Current: ") + # print(current_structure) + Log.info("Updating structure persistence...") + self.graph = transitive_reduction(self.graph) + for job in self._job_list: + children_to_remove = [ + child for child in job.children if child.name not in self.graph.neighbors(job.name)] + for child in children_to_remove: + job.children.remove(child) + child.parents.remove(job) + try: + DbStructure.save_structure( + self.graph, self.expid, self._config.LOCAL_ROOT_DIR) + except Exception as exp: + pass for job in self._job_list: if not job.has_parents() and new: @@ -1179,22 +1272,25 @@ class JobList: for job in self._job_list: show_logs = job.check_warnings if job.check.lower() == 'on_submission': - Log.info('Template {0} will be checked in running time'.format(job.section)) + Log.info( + 'Template {0} will be checked in running time'.format(job.section)) continue elif job.check.lower() != 'true': - Log.info('Template {0} will not be checked'.format(job.section)) + Log.info( + 'Template {0} will not be checked'.format(job.section)) continue else: if job.section in self.sections_checked: show_logs = False - if not job.check_script(as_conf, self.parameters,show_logs): + if not job.check_script(as_conf, self.parameters, show_logs): out = False self.sections_checked.add(job.section) if out: Log.result("Scripts OK") else: Log.warning("Scripts check failed") - Log.user_warning("Running after failed scripts check is at your own risk!") + Log.user_warning( + "Running after failed scripts check is at your own risk!") return out def _remove_job(self, job): @@ -1214,7 +1310,7 @@ class JobList: self._job_list.remove(job) - def rerun(self, chunk_list, notransitive=False,monitor=False): + def rerun(self, chunk_list, notransitive=False, monitor=False): """ Updates job list to rerun the jobs specified by chunk_list @@ -1226,14 +1322,17 @@ class JobList: Log.info("Adding dependencies...") dependencies = dict() for job_section in jobs_parser.sections(): - Log.debug("Reading rerun dependencies for {0} jobs".format(job_section)) + Log.debug( + "Reading rerun dependencies for {0} jobs".format(job_section)) # If does not has rerun dependencies, do nothing if not jobs_parser.has_option(job_section, "RERUN_DEPENDENCIES"): continue - dependencies_keys = jobs_parser.get(job_section, "RERUN_DEPENDENCIES").split() - dependencies = JobList._manage_dependencies(dependencies_keys, self._dic_jobs,job_section) + dependencies_keys = jobs_parser.get( + job_section, "RERUN_DEPENDENCIES").split() + dependencies = JobList._manage_dependencies( + dependencies_keys, self._dic_jobs, job_section) for job in self._job_list: job.status = Status.COMPLETED @@ -1249,7 +1348,7 @@ class JobList: for c in m['cs']: Log.debug("Chunk: " + c) chunk = int(c) - for job in [i for i in self._job_list if i.date == date and i.member == member and (i.chunk == chunk ) ]: + for job in [i for i in self._job_list if i.date == date and 
i.member == member and (i.chunk == chunk)]: if not job.rerun_only or chunk != previous_chunk + 1: job.status = Status.WAITING @@ -1261,7 +1360,7 @@ class JobList: for key in dependencies_keys: skip, (current_chunk, current_member, current_date) = JobList._calculate_dependency_metadata(chunk, member, date, - dependencies[key]) + dependencies[key]) if skip: continue @@ -1271,13 +1370,12 @@ class JobList: parent.status = Status.WAITING Log.debug("Parent: " + parent.name) - for job in [j for j in self._job_list if j.status == Status.COMPLETED]: if job.synchronize is None: self._remove_job(job) self.update_genealogy(notransitive=notransitive) - for job in [j for j in self._job_list if j.synchronize !=None]: + for job in [j for j in self._job_list if j.synchronize != None]: if job.status == Status.COMPLETED: job.status = Status.WAITING else: @@ -1304,9 +1402,9 @@ class JobList: self.update_genealogy(notransitive=notransitive) del self._dic_jobs - def print_with_status(self, statusChange = None, nocolor = False, existingList = None): + def print_with_status(self, statusChange=None, nocolor=False, existingList=None): """ - Returns the string representation of the dependency tree of + Returns the string representation of the dependency tree of the Job List :param statusChange: List of changes in the list, supplied in set status @@ -1321,11 +1419,13 @@ class JobList: # nocolor = True allJobs = self.get_all() if existingList is None else existingList # Header - result = (bcolors.BOLD if nocolor == False else '') + "## String representation of Job List [" + str(len(allJobs)) + "] " + result = (bcolors.BOLD if nocolor == False else '') + \ + "## String representation of Job List [" + str(len(allJobs)) + "] " if statusChange is not None: - result += "with " + (bcolors.OKGREEN if nocolor == False else '') + str(len(statusChange.keys())) + " Change(s) ##" + (bcolors.ENDC + bcolors.ENDC if nocolor == False else '') + result += "with " + (bcolors.OKGREEN if nocolor == False else '') + str(len(statusChange.keys()) + ) + " Change(s) ##" + (bcolors.ENDC + bcolors.ENDC if nocolor == False else '') else: - result += " ## " + result += " ## " # Find root root = None @@ -1336,8 +1436,9 @@ class JobList: print(root) # root exists if root is not None: - result += self._recursion_print(root, 0, visited, statusChange = statusChange, nocolor = nocolor) - else: + result += self._recursion_print(root, 0, visited, + statusChange=statusChange, nocolor=nocolor) + else: result += "\nCannot find root." return result @@ -1351,7 +1452,8 @@ class JobList: :rtype: String """ allJobs = self.get_all() - result = "## String representation of Job List [" + str(len(allJobs)) + "] ##" + result = "## String representation of Job List [" + str( + len(allJobs)) + "] ##" # Find root root = None @@ -1361,13 +1463,13 @@ class JobList: # root exists if root is not None: - result += self._recursion_print(root, 0) - else: + result += self._recursion_print(root, 0) + else: result += "\nCannot find root." 
         return result
 
-    def _recursion_print(self, job, level, visited, statusChange = None, nocolor = False):
+    def _recursion_print(self, job, level, visited, statusChange=None, nocolor=False):
         """
         Returns the list of children in a recursive way
         Traverses the dependency tree
@@ -1377,32 +1479,38 @@
         result = ""
         if job.name not in visited:
-            visited.append(job.name)
+            visited.append(job.name)
             prefix = ""
             for i in range(level):
-                prefix += "| "
+                prefix += "| "
             # Prefix + Job Name
-            result = "\n"+ prefix + \
-                (bcolors.BOLD + bcolors.CODE_TO_COLOR[job.status] if nocolor == False else '') + \
-                job.name + \
-                (bcolors.ENDC + bcolors.ENDC if nocolor == False else '')
-            if len(job._children) > 0:
+            result = "\n" + prefix + \
+                (bcolors.BOLD + bcolors.CODE_TO_COLOR[job.status] if nocolor == False else '') + \
+                job.name + \
+                (bcolors.ENDC + bcolors.ENDC if nocolor == False else '')
+            if len(job._children) > 0:
                 level += 1
                 children = job._children
                 total_children = len(job._children)
                 # Writes children number and status if color are not being showed
                 result += " ~ [" + str(total_children) + (" children] " if total_children > 1 else " child] ") + \
-                    ("[" +Status.VALUE_TO_KEY[job.status] + "] " if nocolor == True else "")
+                    ("[" + Status.VALUE_TO_KEY[job.status] +
+                     "] " if nocolor == True else "")
                 if statusChange is not None:
                     # Writes change if performed
-                    result += (bcolors.BOLD + bcolors.OKGREEN if nocolor == False else '')
-                    result += (statusChange[job.name] if job.name in statusChange else "")
-                    result += (bcolors.ENDC + bcolors.ENDC if nocolor == False else "")
-
+                    result += (bcolors.BOLD +
+                               bcolors.OKGREEN if nocolor == False else '')
+                    result += (statusChange[job.name]
+                               if job.name in statusChange else "")
+                    result += (bcolors.ENDC +
+                               bcolors.ENDC if nocolor == False else "")
+
                 for child in children:
                     # Continues recursion
-                    result += self._recursion_print(child, level, visited, statusChange=statusChange, nocolor = nocolor)
+                    result += self._recursion_print(
+                        child, level, visited, statusChange=statusChange, nocolor=nocolor)
             else:
-                result += (" [" + Status.VALUE_TO_KEY[job.status] + "] " if nocolor == True else "")
+                result += (" [" + Status.VALUE_TO_KEY[job.status] +
+                           "] " if nocolor == True else "")
 
-        return result
\ No newline at end of file
+        return result
diff --git a/autosubmit/job/job_utils.py b/autosubmit/job/job_utils.py
index 14cc0030ee06c9ea0cb97a438b2c076e5c06601a..b73e21fbad52dcbc459083b62a4604ea113c7fd0 100644
--- a/autosubmit/job/job_utils.py
+++ b/autosubmit/job/job_utils.py
@@ -26,16 +26,21 @@ from networkx import NetworkXError
 
+import networkx
 
 def transitive_reduction(graph):
-    if not is_directed_acyclic_graph(graph):
-        raise NetworkXError("Transitive reduction only uniquely defined on directed acyclic graphs.")
-    reduced_graph = DiGraph()
-    reduced_graph.add_nodes_from(graph.nodes())
-    for u in graph:
-        u_edges = set(graph[u])
-        for v in graph[u]:
-            u_edges -= {y for x, y in dfs_edges(graph, v)}
-        reduced_graph.add_edges_from((u, v) for v in u_edges)
-    return reduced_graph
+    try:
+        return networkx.algorithms.dag.transitive_reduction(graph)
+    except Exception as exp:
+        return None
+    # if not is_directed_acyclic_graph(graph):
+    #     raise NetworkXError("Transitive reduction only uniquely defined on directed acyclic graphs.")
+    # reduced_graph = DiGraph()
+    # reduced_graph.add_nodes_from(graph.nodes())
+    # for u in graph:
+    #     u_edges = set(graph[u])
+    #     for v in graph[u]:
+    #         u_edges -= {y for x, y in dfs_edges(graph, v)}
+    #     reduced_graph.add_edges_from((u, v) for v in u_edges)
+    # return reduced_graph
 
 
 class Dependency(object):
@@ -60,6 +65,3 @@ class Dependency(object):
         else:
             self.select_chunks_orig.append([])
 
-
-
-
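
The new autosubmit/database/db_structure.py above persists the experiment graph as edge rows in a one-table SQLite file and reads them back as an adjacency map. A minimal, self-contained sketch of that round trip (roundtrip_structure and the /tmp path are hypothetical, used only to illustrate the pattern):

import sqlite3

from networkx import DiGraph


def roundtrip_structure(graph, db_path):
    """Persist graph edges, then read them back as {job_name: [child names]}."""
    conn = sqlite3.connect(db_path)
    conn.execute('''CREATE TABLE IF NOT EXISTS experiment_structure (
                        e_from text NOT NULL,
                        e_to text NOT NULL,
                        UNIQUE(e_from, e_to));''')
    conn.execute("DELETE FROM experiment_structure")  # full rewrite, as in save_structure
    conn.executemany("INSERT INTO experiment_structure(e_from, e_to) VALUES(?,?)",
                     list(graph.edges()))
    conn.commit()
    adjacency = {}
    for e_from, e_to in conn.execute("SELECT e_from, e_to FROM experiment_structure"):
        adjacency.setdefault(e_from, []).append(e_to)
        adjacency.setdefault(e_to, [])  # sinks get an empty list, as in get_structure
    conn.close()
    return adjacency


g = DiGraph()
g.add_edges_from([("SIM_1", "POST_1"), ("POST_1", "CLEAN_1")])
print(roundtrip_structure(g, "/tmp/structure_demo.db"))
# e.g. {'SIM_1': ['POST_1'], 'POST_1': ['CLEAN_1'], 'CLEAN_1': []}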
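The SELECT_CHUNKS handling added to JobList._manage_dependencies parses expressions such as SIM*[1:3]-[1]: a section name followed by one or more '*'-separated relations, where '-' splits each relation into origin and destination chunk lists, and a bracketed location is a single chunk, a comma-separated list, or a colon range. A condensed re-implementation of that grammar as read from the patch; parse_select_chunks and parse_location are hypothetical names, and multi-digit range bounds are an assumption beyond the single-character indexing the patch performs:

import re


def parse_location(location):
    """'[1:3]' -> [1, 2, 3]; '[4,6]' -> [4, 6]; '[5]' -> [5]."""
    location = location.strip('[').strip(']')
    if ':' in location:
        start, end = location.split(':')
        if start and end:
            return list(range(int(start), int(end) + 1))
        if end:   # '[:N]' means 0..N
            return list(range(0, int(end) + 1))
        return []  # '[N:]' is resolved against the experiment's chunk count in the patch
    if ',' in location:
        return [int(chunk) for chunk in location.split(',')]
    if re.match('^[0-9]+$', location):
        return [int(location)]
    return []


def parse_select_chunks(expression):
    """'SIM*[1:3]-[1]' -> ('SIM', [[[1, 2, 3], [1]]])."""
    info = expression.split('*')
    return info[0], [[parse_location(loc) for loc in info[i].split('-')]
                     for i in range(1, len(info))]


print(parse_select_chunks("SIM*[1:3]-[1]"))
# ('SIM', [[[1, 2, 3], [1]]])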
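update_genealogy now prefers the persisted structure over recomputing the reduction: the stored adjacency map is trusted only when it covers exactly the current job names, and otherwise the graph is reduced again and re-persisted through DbStructure.save_structure. A condensed sketch of that decision, assuming job objects that expose .name, .children and .parents as in job_list.py (_Job and prune_redundant_edges are hypothetical stand-ins):

import networkx
from networkx.algorithms.dag import transitive_reduction


class _Job(object):
    """Stand-in for the autosubmit Job: only the fields this logic touches."""

    def __init__(self, name):
        self.name, self.children, self.parents = name, [], []


def prune_redundant_edges(job_list, graph, stored_structure):
    names = set(job.name for job in job_list)
    structure_valid = (bool(stored_structure)
                       and len(stored_structure) == len(job_list)
                       and all(name in stored_structure for name in names))
    if structure_valid:
        allowed = stored_structure  # reuse the persisted adjacency map
    else:
        graph = transitive_reduction(graph)  # recompute; the patch re-persists here
        allowed = dict((n, list(graph.neighbors(n))) for n in graph)
    for job in job_list:
        for child in [c for c in job.children
                      if c.name not in allowed.get(job.name, [])]:
            job.children.remove(child)
            child.parents.remove(job)
    return graph


a, b, c = _Job("a"), _Job("b"), _Job("c")
a.children = [b, c]
b.children = [c]
b.parents = [a]
c.parents = [a, b]
prune_redundant_edges([a, b, c],
                      networkx.DiGraph([("a", "b"), ("b", "c"), ("a", "c")]),
                      None)
print([child.name for child in a.children])  # ['b'] - the redundant a->c edge is gone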
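job_utils.transitive_reduction now delegates to the library implementation: networkx.algorithms.dag.transitive_reduction, available in networkx 2.x, raises NetworkXError on graphs with cycles, and the new wrapper maps any failure (including an older networkx without that helper) to None, so callers such as update_genealogy see None instead of an exception. A quick check of the delegated call:

import networkx

g = networkx.DiGraph([("a", "b"), ("b", "c"), ("a", "c")])
reduced = networkx.algorithms.dag.transitive_reduction(g)  # assumes networkx >= 2.x
print(sorted(reduced.edges()))
# [('a', 'b'), ('b', 'c')] - the redundant edge a->c is dropped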