From ebbe34d8b17a656fb759d87499af4602c6007b9f Mon Sep 17 00:00:00 2001
From: Wilmer Uruchi Ticona
Date: Wed, 18 Nov 2020 20:28:03 +0100
Subject: [PATCH] Fixed #613. Implemented a modified-time validation for the
 structure. If the user modifies jobs_.conf after the structure has been
 created, the structure will be updated. Improved the job historical data
 retrieval algorithm.

---
Note: illustrative sketches of the main changes are appended after the patch.

 autosubmit/database/db_jobdata.py   | 75 +++++++++++-----------
 autosubmit/database/db_structure.py | 24 +++++---
 autosubmit/job/job.py               | 96 +++++++++++++++++++----------
 autosubmit/job/job_list.py          | 45 ++++++++++----
 4 files changed, 145 insertions(+), 95 deletions(-)

diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py
index 917504528..568cfb5c2 100644
--- a/autosubmit/database/db_jobdata.py
+++ b/autosubmit/database/db_jobdata.py
@@ -112,21 +112,21 @@ class ExperimentRun():
 class JobStepExtraData():
     def __init__(self, key, dict_data):
         self.key = key
-        self.ncpus = dict_data["ncpus"] if dict_data and "ncpus" in dict_data.keys(
+        self.ncpus = dict_data["ncpus"] if dict_data and isinstance(dict_data, dict) and "ncpus" in dict_data.keys(
         ) else 0
-        self.nnodes = dict_data["nnodes"] if dict_data and "nnodes" in dict_data.keys(
+        self.nnodes = dict_data["nnodes"] if dict_data and isinstance(dict_data, dict) and "nnodes" in dict_data.keys(
         ) else 0
-        self.submit = int(time.mktime(datetime.strptime(dict_data["submit"], "%Y-%m-%dT%H:%M:%S").timetuple())) if dict_data and "submit" in dict_data.keys(
+        self.submit = int(time.mktime(datetime.strptime(dict_data["submit"], "%Y-%m-%dT%H:%M:%S").timetuple())) if dict_data and isinstance(dict_data, dict) and "submit" in dict_data.keys(
+        ) else int(time.time())
+        self.start = int(time.mktime(datetime.strptime(dict_data["start"], "%Y-%m-%dT%H:%M:%S").timetuple())) if dict_data and isinstance(dict_data, dict) and "start" in dict_data.keys(
+        ) else int(time.time())
+        self.finish = int(time.mktime(datetime.strptime(dict_data["finish"], "%Y-%m-%dT%H:%M:%S").timetuple())) if dict_data and isinstance(dict_data, dict) and "finish" in dict_data.keys(
+        ) and dict_data["finish"] != "Unknown" else int(time.time())
+        self.energy = parse_output_number(dict_data["energy"]) if dict_data and isinstance(dict_data, dict) and "energy" in dict_data.keys(
         ) else 0
-        self.start = int(time.mktime(datetime.strptime(dict_data["start"], "%Y-%m-%dT%H:%M:%S").timetuple())) if dict_data and "start" in dict_data.keys(
+        self.maxRSS = dict_data["MaxRSS"] if dict_data and isinstance(dict_data, dict) and "MaxRSS" in dict_data.keys(
         ) else 0
-        self.finish = int(time.mktime(datetime.strptime(dict_data["finish"], "%Y-%m-%dT%H:%M:%S").timetuple())) if dict_data and "finish" in dict_data.keys(
-        ) and dict_data["finish"] != "Unknown" else 0
-        self.energy = parse_output_number(dict_data["energy"]) if dict_data and "energy" in dict_data.keys(
-        ) else 0
-        self.maxRSS = dict_data["MaxRSS"] if dict_data and "MaxRSS" in dict_data.keys(
-        ) else 0
-        self.aveRSS = dict_data["AveRSS"] if dict_data and "AveRSS" in dict_data.keys(
+        self.aveRSS = dict_data["AveRSS"] if dict_data and isinstance(dict_data, dict) and "AveRSS" in dict_data.keys(
         ) else 0


@@ -185,6 +185,8 @@ class JobData(object):
         try:
             self.extra_data = loads(extra_data)
         except Exception as exp:
+            # print(exp)
+            # print(extra_data)
             self.extra_data = ""
             pass
         self.nnodes = nnodes
@@ -1201,10 +1203,10 @@ class JobDataStructure(MainDataBase):
             return None

     def process_current_run_collection(self):
-        """Post-process for job_data.
+ """Post-process source output for job_data. - Returns: - ([job_data], [warning_messaages]): job data processes, messages + :return: job data processes, messages + :rtype: ([job_data], [warning_messaages]) """ try: @@ -1252,7 +1254,7 @@ class JobDataStructure(MainDataBase): no_process = False for job_data in jobs_in_package: # If it is a wrapper job step - if "energy" in job_data.extra_data.keys() and job_data.extra_data["energy"] != "NA": + if job_data.extra_data is not None and isinstance(job_data.extra_data, dict) and job_data.extra_data.get("energy", None) and job_data.extra_data["energy"] != "NA": name_to_current_job[job_data.job_name].energy = parse_output_number( job_data.extra_data["energy"]) sum_total_energy += name_to_current_job[job_data.job_name].energy @@ -1262,7 +1264,7 @@ class JobDataStructure(MainDataBase): jobs_in_package, key=lambda x: len(str(x.extra_data))) # Identify job steps keys_step = [ - y for y in description_job.extra_data.keys() if '.' in y and y[y.index('.') + 1:] not in ["batch", "extern"] and y != "parents"] + y for y in description_job.extra_data.keys() if '.' in y and y[y.index('.') + 1:] not in ["batch", "extern"] and y != "parents"] if description_job.extra_data and isinstance(description_job.extra_data, dict) else [] if len(keys_step) > 0: # Steps found keys_step.sort( @@ -1273,7 +1275,7 @@ class JobDataStructure(MainDataBase): if "submit" not in description_job.extra_data[key].keys(): keys_found = False break - + # Build wrapper jobs as job steps for key in keys_step: wrapper_jobs.append(JobStepExtraData( key, description_job.extra_data[key])) @@ -1285,9 +1287,10 @@ class JobDataStructure(MainDataBase): # Approximation not_1_to_1 = False else: - # Identify main step + # No jobs steps, identify main step main_step = [ - y for y in description_job.extra_data.keys() if '.' not in y and y != "parents"] + y for y in description_job.extra_data.keys() if '.' not in y and y != "parents"] if description_job.extra_data and isinstance(description_job.extra_data, dict) else [] + # For some reason, a packaged jobs can arrive as a single job slurm output if len(main_step) > 0 and main_step[0] not in ['AveRSS', 'finish', 'ncpus', 'submit', 'MaxRSS', 'start', 'nnodes', 'energy']: # Check only first one main_step = [main_step[0]] @@ -1296,7 +1299,7 @@ class JobDataStructure(MainDataBase): if key not in description_job.extra_data.keys() or "submit" not in description_job.extra_data[key].keys(): keys_found = False break - # Build wrapper jobs + # Build wrapper jobs as job steps for key in main_step: wrapper_jobs.append(JobStepExtraData( key, description_job.extra_data[key])) @@ -1320,15 +1323,10 @@ class JobDataStructure(MainDataBase): # warning_messages.append( # "Approximation | The energy results in wrapper {0} are an approximation. 
                 # Completing job information if necessary
-                dropped_jobs = []
-                for i in range(0, len(jobs_in_package)):
-                    if jobs_in_package[i].running_time() <= 0:
-                        # Needs to be completed
-                        # Dropping job from package list
-                        dropped_jobs.append(i)
-                        #dropped_job = jobs_in_package.pop(i)
-                for j in dropped_jobs:
-                    jobs_in_package.pop(j)
+                # dropped_jobs = [job_data.job_name for job_data in jobs_in_package if job_data.running_time() <= 0]
+                jobs_in_package = [
+                    job_data for job_data in jobs_in_package if job_data.running_time() > 0]
+
                 # After completion is finished, calculate total resources to be approximated
                 resources_total = sum(
                     z.ncpus * z.running_time() for z in jobs_in_package) * 1.0
@@ -1336,9 +1334,6 @@ class JobDataStructure(MainDataBase):
                 for job_data in jobs_in_package:
                     job_data_factor = (
                         job_data.ncpus * job_data.running_time())
-                    # if job_data_factor <= 0:
-                    #     warning_messages.append("Approximation | Job {0} requires {1} ncpus and has {2} running time, resulting in a 0 energy approximation. This job will be ignored.".format(
-                    #         job_data.job_name, job_data.ncpus, job_data.running_time()))
                     name_to_current_job[job_data.job_name].energy = round(job_data_factor /
                                                                           resources_total * sum_total_energy, 2)
                 # else:
@@ -1346,6 +1341,8 @@ class JobDataStructure(MainDataBase):
                 #     "Approximation | Aproximation for wrapper {0} failed.".format(package))
             else:
                 # Check if it is 1 to 1
+                # If it is 1 to 1, the package job list and the wrapper step list have the same length, so energy can be assigned by job order.
+                # This needs stronger guarantees, but it works so far.
                 if len(jobs_in_package) > 0 and len(wrapper_jobs) > 0 and len(jobs_in_package) == len(wrapper_jobs) and no_process == False:
                     # It is 1 to 1
                     for i in range(0, len(jobs_in_package)):
@@ -1357,15 +1354,10 @@ class JobDataStructure(MainDataBase):
                             .job_name].start = wrapper_jobs[i].start
                         name_to_current_job[jobs_in_package[i]
                                             .job_name].finish = wrapper_jobs[i].finish
-                # else:
-                #     warning_messages.append(
-                #         "Approximation | Wrapper {0} did not have enough or precise information to calculate an exact mapping.".format(package))
-            # else:
-            #     warning_messages.append(
-            #         "Approximation | Wrapper {0} does not have energy information, it will be ignored.".format(package))

             for job_data in current_job_data:
-                if job_data.rowtype == 2 and len(job_data.extra_data.keys()) > 0:
+                # Be very sure that extra_data is a usable dict before reading it
+                if job_data.rowtype == 2 and job_data.extra_data and isinstance(job_data.extra_data, dict) and len(job_data.extra_data) > 0:
                     keys = [x for x in job_data.extra_data.keys()
                             if x != "parents" and '.' not in x]
                     if len(keys) > 0:
@@ -1379,13 +1371,16 @@ class JobDataStructure(MainDataBase):
                             continue
                             # warning_messages.append(
                             #     "Single Job | Job {0} has no energy information available. {1} ".format(job_data.job_name, keys))
+            # Updating detected energy values
             self.update_energy_values(
                 [job for job in current_job_data if job.require_update == True])
         except Exception as exp:
+            # stack = traceback.extract_stack()
+            # (filename, line, procname, text) = stack[-1]
            Log.info(traceback.format_exc())
            Log.warning(
-                "Autosubmit couldn't process the SLURM. Exception {0}".format(str(exp)))
+                "Autosubmit couldn't process the SLURM output. Exception: {0}
".format(str(exp))) pass def update_energy_values(self, update_job_data): diff --git a/autosubmit/database/db_structure.py b/autosubmit/database/db_structure.py index 5c00cddad..b0b956537 100644 --- a/autosubmit/database/db_structure.py +++ b/autosubmit/database/db_structure.py @@ -27,7 +27,7 @@ import traceback import sqlite3 import copy from datetime import datetime -from log.log import Log,AutosubmitError,AutosubmitCritical +from log.log import Log, AutosubmitError, AutosubmitCritical # from networkx import DiGraph #DB_FILE_AS_TIMES = "/esarchive/autosubmit/as_times.db" @@ -60,15 +60,18 @@ def get_structure(exp_id, structures_path): current_table = _get_exp_structure(db_structure_path) # print("Current table: ") # print(current_table) - current_table_structure = dict() + current_table_structure = {} for item in current_table: _from, _to = item - if _from not in current_table_structure.keys(): - current_table_structure[_from] = list() - if _to not in current_table_structure.keys(): - current_table_structure[_to] = list() - current_table_structure[_from].append(_to) - if (len(current_table_structure.keys()) > 0): + current_table_structure.setdefault(_from, []) # .append(_to) + current_table_structure.setdefault( + _to, []).append(_from) # .append(_from) + # if _from not in current_table_structure.keys(): + # current_table_structure[_from] = list() + # if _to not in current_table_structure.keys(): + # current_table_structure[_to] = list() + # current_table_structure[_from].append(_to) + if (len(current_table_structure) > 0): # print("Return structure") return current_table_structure else: @@ -105,7 +108,7 @@ def create_table(conn, create_table_sql): c = conn.cursor() c.execute(create_table_sql) except Exception as e: - Log.printlog("Create table error {0}".format(str(e)),5000) + Log.printlog("Create table error {0}".format(str(e)), 5000) def _get_exp_structure(path): @@ -123,7 +126,8 @@ def _get_exp_structure(path): rows = cur.fetchall() return rows except Exception as exp: - Log.debug("Get structure error {0}, couldn't load from storage ".format(str(exp))) + Log.debug( + "Get structure error {0}, couldn't load from storage ".format(str(exp))) Log.debug(traceback.format_exc()) return dict() diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index e183abbf2..5730bfc83 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -129,6 +129,7 @@ class Job(object): self.hold = False self.distance_weight = 0 self.level = 0 + def __getstate__(self): odict = self.__dict__ if '_platform' in odict: @@ -520,7 +521,8 @@ class Job(object): platforms_to_test = set() if self.platform_name is None: self.platform_name = hpcarch - self._platform = submitter.platforms[self.platform_name.lower()] # serial + # serial + self._platform = submitter.platforms[self.platform_name.lower()] try: self._platform.restore_connection() except Exception as e: @@ -536,11 +538,13 @@ class Job(object): try: while (not out_exist and not err_exist) and i < retries: try: - out_exist = self._platform.check_file_exists(remote_logs[0]) # will do 5 retries + out_exist = self._platform.check_file_exists( + remote_logs[0]) # will do 5 retries except IOError as e: out_exist = False try: - err_exist = self._platform.check_file_exists(remote_logs[1]) # will do 5 retries + err_exist = self._platform.check_file_exists( + remote_logs[1]) # will do 5 retries except IOError as e: err_exists = False if not out_exist or not err_exist: @@ -549,21 +553,25 @@ class Job(object): sleep(sleeptime) if i >= retries: if not out_exist or 
not err_exist:
-                Log.printlog("Failed to retrieve log files {1} and {2} e=6001".format(retries,remote_logs[0],remote_logs[1]))
+                Log.printlog("Failed to retrieve log files {0} and {1} after {2} retries e=6001".format(
+                    remote_logs[0], remote_logs[1], retries))
                 sleep(5)  # safe wait before end a thread
                 return
             if copy_remote_logs:
                 if local_logs != remote_logs:
                     # unifying names for log files
-                    self.synchronize_logs(self._platform, remote_logs, local_logs)
+                    self.synchronize_logs(
+                        self._platform, remote_logs, local_logs)
                     remote_logs = local_logs
                 self._platform.get_logs_files(self.expid, remote_logs)
                 # Update the logs with Autosubmit Job Id Brand
                 try:
                     for local_log in local_logs:
-                        self._platform.write_jobid(self.id, os.path.join(self._tmp_path, 'LOG_' + str(self.expid), local_log))
+                        self._platform.write_jobid(self.id, os.path.join(
+                            self._tmp_path, 'LOG_' + str(self.expid), local_log))
                 except BaseException as e:
-                    Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format(e.message,self.name))
+                    Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format(
+                        e.message, self.name))
                     sleep(5)  # safe wait before end a thread
             try:
                 self._platform.closeConnection()
             except:
                 pass
             return
         except AutosubmitError as e:
-            Log.printlog("Trace {0} \nFailed to retrieve log file for job {0}".format(e.message,self.name), 6001)
+            Log.printlog("Trace {0} \nFailed to retrieve log file for job {1}".format(
+                e.message, self.name), 6001)
             sleep(5)  # safe wait before end a thread
             try:
                 self._platform.closeConnection()
             except:
                 pass
             return
         except AutosubmitCritical as e:  # Critical errors can't be recovered. Failed configuration or autosubmit error
-            Log.printlog("Trace {0} \nFailed to retrieve log file for job {0}".format(e.message,self.name), 6001)
+            Log.printlog("Trace {0} \nFailed to retrieve log file for job {1}".format(
+                e.message, self.name), 6001)
             sleep(5)  # safe wait before end a thread
             try:
                 self._platform.closeConnection()
             except:
                 pass
             return
-        sleep(5) # safe wait before end a thread
+        sleep(5)  # safe wait before end a thread
         try:
             self._platform.closeConnection()
         except:
             pass
@@ -609,8 +619,9 @@ class Job(object):
             if new_status == Status.COMPLETED:
                 Log.debug(
                     "{0} job seems to have completed: checking...".format(self.name))
-                if not self._platform.get_completed_files(self.name,wrapper_failed=self.packed):
-                    log_name = os.path.join(self._tmp_path, self.name + '_COMPLETED')
+                if not self._platform.get_completed_files(self.name, wrapper_failed=self.packed):
+                    log_name = os.path.join(
+                        self._tmp_path, self.name + '_COMPLETED')
                 self.check_completion()
             else:
@@ -625,16 +636,20 @@ class Job(object):
         elif self.status == Status.COMPLETED:
             Log.result("Job {0} is COMPLETED", self.name)
         elif self.status == Status.FAILED:
-            Log.printlog("Job {0} is FAILED. Checking completed files to confirm the failure...".format(self.name),3000)
-            self._platform.get_completed_files(self.name,wrapper_failed=self.packed)
+            Log.printlog("Job {0} is FAILED. Checking completed files to confirm the failure...".format(
+                self.name), 3000)
+            self._platform.get_completed_files(
+                self.name, wrapper_failed=self.packed)
             self.check_completion()
             if self.status == Status.COMPLETED:
                 Log.result("Job {0} is COMPLETED", self.name)
             else:
                 self.update_children_status()
         elif self.status == Status.UNKNOWN:
-            Log.printlog("Job {0} is UNKNOWN.
Checking completed files to confirm the failure...".format(self.name),3000) - self._platform.get_completed_files(self.name,wrapper_failed=self.packed) + Log.printlog("Job {0} is UNKNOWN. Checking completed files to confirm the failure...".format( + self.name), 3000) + self._platform.get_completed_files( + self.name, wrapper_failed=self.packed) self.check_completion(Status.UNKNOWN) if self.status == Status.UNKNOWN: Log.printlog("Job {0} is UNKNOWN. Checking completed files to confirm the failure...".format( @@ -693,7 +708,7 @@ class Job(object): """ log_name = os.path.join(self._tmp_path, self.name + '_COMPLETED') - if os.path.exists(log_name): #TODO + if os.path.exists(log_name): # TODO self.status = Status.COMPLETED else: Log.printlog("Job {0} completion check failed. There is no COMPLETED file".format( @@ -1234,7 +1249,8 @@ class WrapperJob(Job): if self._platform.check_file_exists('WRAPPER_FAILED', wrapper_failed=True): for job in self.inner_jobs_running: if job.platform.check_file_exists('{0}_FAILED'.format(job.name), wrapper_failed=True): - Log.info("Wrapper {0} Failed, checking inner_jobs...".format(self.name)) + Log.info( + "Wrapper {0} Failed, checking inner_jobs...".format(self.name)) self.failed = True self._platform.delete_file('WRAPPER_FAILED') break @@ -1248,7 +1264,6 @@ class WrapperJob(Job): if not still_running: self.cancel_failed_wrapper_job() - def check_inner_jobs_completed(self, jobs): not_completed_jobs = [ job for job in jobs if job.status != Status.COMPLETED] @@ -1275,12 +1290,15 @@ class WrapperJob(Job): def _check_inner_jobs_queue(self, prev_status): reason = str() if self._platform.type == 'slurm': - self._platform.send_command(self._platform.get_queue_status_cmd(self.id)) - reason = self._platform.parse_queue_reason(self._platform._ssh_output, self.id) + self._platform.send_command( + self._platform.get_queue_status_cmd(self.id)) + reason = self._platform.parse_queue_reason( + self._platform._ssh_output, self.id) if self._queuing_reason_cancel(reason): - Log.printlog("Job {0} will be cancelled and set to FAILED as it was queuing due to {1}".format(self.name,reason),6009) - #while running jobs? - self._check_running_jobs() #todo + Log.printlog("Job {0} will be cancelled and set to FAILED as it was queuing due to {1}".format( + self.name, reason), 6009) + # while running jobs? 
+ self._check_running_jobs() # todo self.update_failed_jobs(canceled_wrapper=True) self.cancel_failed_wrapper_job() @@ -1324,6 +1342,7 @@ class WrapperJob(Job): job.update_status(self.as_config.get_copy_remote_logs() == 'true') return True return False + def _check_running_jobs(self): not_finished_jobs_dict = OrderedDict() self.inner_jobs_running = list() @@ -1378,28 +1397,35 @@ done if len(out) > 1: if job not in self.running_jobs_start: start_time = self._check_time(out, 1) - Log.info("Job {0} started at {1}".format(jobname, str(parse_date(start_time)))) + Log.info("Job {0} started at {1}".format( + jobname, str(parse_date(start_time)))) self.running_jobs_start[job] = start_time job.new_status = Status.RUNNING #job.status = Status.RUNNING - job.update_status(self.as_config.get_copy_remote_logs() == 'true') + job.update_status( + self.as_config.get_copy_remote_logs() == 'true') if len(out) == 2: Log.info("Job {0} is RUNNING".format(jobname)) - over_wallclock = self._check_inner_job_wallclock(job) + over_wallclock = self._check_inner_job_wallclock( + job) if over_wallclock: - Log.printlog("Job {0} is FAILED".format(jobname),6009) + Log.printlog( + "Job {0} is FAILED".format(jobname), 6009) elif len(out) == 3: end_time = self._check_time(out, 2) self._check_finished_job(job) - Log.info("Job {0} finished at {1}".format(jobname, str(parse_date(end_time)))) + Log.info("Job {0} finished at {1}".format( + jobname, str(parse_date(end_time)))) if content == '': sleep(wait) retries = retries - 1 temp_list = self.inner_jobs_running - self.inner_jobs_running = [job for job in temp_list if job.status == Status.RUNNING] + self.inner_jobs_running = [ + job for job in temp_list if job.status == Status.RUNNING] if retries == 0 or over_wallclock: self.status = Status.FAILED - def _check_finished_job(self, job , failed_file=False): + + def _check_finished_job(self, job, failed_file=False): if not failed_file: wait = 2 retries = 2 @@ -1418,7 +1444,7 @@ done job.update_status(self.as_config.get_copy_remote_logs() == 'true') self.running_jobs_start.pop(job, None) - def update_failed_jobs(self,canceled_wrapper=False): + def update_failed_jobs(self, canceled_wrapper=False): running_jobs = self.inner_jobs_running self.inner_jobs_running = list() for job in running_jobs: @@ -1429,11 +1455,13 @@ done self.inner_jobs_running.append(job) def cancel_failed_wrapper_job(self): - Log.printlog("Cancelling job with id {0}".format(self.id),6009) - self._platform.send_command(self._platform.cancel_cmd + " " + str(self.id)) + Log.printlog("Cancelling job with id {0}".format(self.id), 6009) + self._platform.send_command( + self._platform.cancel_cmd + " " + str(self.id)) for job in self.job_list: if job.status not in [Status.COMPLETED, Status.FAILED]: job.status = Status.WAITING + def _update_completed_jobs(self): for job in self.job_list: if job.status == Status.RUNNING: diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 230f9c640..34f25a579 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -500,9 +500,9 @@ class JobList(object): sections_running_type_map = dict() if "&" in wrapper_jobs: - char="&" + char = "&" else: - char=" " + char = " " for section in wrapper_jobs.split(char): # RUNNING = once, as default. 
This value comes from jobs_.conf
             sections_running_type_map[section] = self._dic_jobs.get_option(
@@ -1171,7 +1171,7 @@ class JobList(object):
     def parameters(self, value):
         self._parameters = value

-    def update_list(self, as_conf, store_change=True, fromSetStatus=False,submitter=None):
+    def update_list(self, as_conf, store_change=True, fromSetStatus=False, submitter=None):
         """
         Updates job list, resetting failed jobs and changing to READY all WAITING jobs with all parents COMPLETED
@@ -1201,9 +1201,11 @@ class JobList(object):
                     if len(tmp) == len(job.parents):
                         job.status = Status.READY
                         if submitter is not None:
-                            job.platform = submitter.platforms[job.platform_name.lower()]
+                            job.platform = submitter.platforms[job.platform_name.lower(
+                            )]
                             job.platform.test_connection()
-                    job.platform = submitter.platforms[job.platform_name.lower()]
+                    job.platform = submitter.platforms[job.platform_name.lower(
+                    )]
                     job.platform.test_connection()
                     job.id = None
@@ -1337,21 +1339,42 @@ class JobList(object):
         if not notransitive:
             # Transitive reduction required
             current_structure = None
-            if os.path.exists(os.path.join(self._config.STRUCTURES_DIR, "structure_" + self.expid + ".db")):
+            db_path = os.path.join(
+                self._config.STRUCTURES_DIR, "structure_" + self.expid + ".db")
+            m_time_db = None
+            jobs_conf_path = os.path.join(
+                self._config.LOCAL_ROOT_DIR, self.expid, "conf", "jobs_{0}.conf".format(self.expid))
+            m_time_job_conf = None
+            if os.path.exists(db_path):
                 try:
                     current_structure = DbStructure.get_structure(
                         self.expid, self._config.STRUCTURES_DIR)
+                    m_time_db = os.stat(db_path).st_mtime
+                    if os.path.exists(jobs_conf_path):
+                        m_time_job_conf = os.stat(jobs_conf_path).st_mtime
                 except Exception as exp:
                     pass
             structure_valid = False
-            if ((current_structure) and (len(self._job_list) == len(current_structure.keys())) and update_structure == False):
+            # If there is a current structure and the number of jobs in the JobList equals the number of jobs in the structure, the structure is tentatively valid
+            if ((current_structure) and (len(self._job_list) == len(current_structure)) and update_structure == False):
                 structure_valid = True
-                # print(current_structure.keys())
+                # Further validation
                 # Structure exists and is valid, use it as a source of dependencies
-                for job in self._job_list:
-                    if job.name not in current_structure.keys():
+                if m_time_job_conf:
+                    if m_time_job_conf > m_time_db:
+                        Log.info(
+                            "File jobs_{0}.conf has been modified since the last time the structure was cached.".format(self.expid))
                         structure_valid = False
-                    continue
+                else:
+                    Log.info(
+                        "File jobs_{0}.conf was not found.".format(self.expid))

+                if structure_valid == True:
+                    for job in self._job_list:
+                        if current_structure.get(job.name, None) is None:
+                            structure_valid = False
+                            break
+
             if structure_valid == True:
                 Log.info("Using existing valid structure.")
                 for job in self._job_list:
--
GitLab
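
Appendix for reviewers (not part of the diff). The hunks above compress several ideas; the sketches below restate them in isolation. All helper names and sample values are hypothetical, written for illustration only.

First, the defensive parsing added to JobStepExtraData: every field read now tolerates extra_data that is missing, not a dict, or that carries an "Unknown" timestamp, falling back to a default. A minimal sketch of that pattern, assuming Slurm-style "%Y-%m-%dT%H:%M:%S" timestamps:

    import time
    from datetime import datetime

    def parse_step_time(dict_data, key):
        # Fall back to "now" when the step record is absent, malformed,
        # or reports an "Unknown" timestamp, as JobStepExtraData now does.
        if isinstance(dict_data, dict) and key in dict_data and dict_data[key] != "Unknown":
            return int(time.mktime(datetime.strptime(
                dict_data[key], "%Y-%m-%dT%H:%M:%S").timetuple()))
        return int(time.time())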
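Second, the reworked wrapper-energy approximation in process_current_run_collection(): jobs with no running time are dropped, and the wrapper's total energy is split in proportion to each job's ncpus x running time. A simplified sketch of the same arithmetic (the tuple layout is an assumption; the real code works on JobData objects):

    def distribute_wrapper_energy(jobs, total_energy):
        # jobs: (name, ncpus, running_time_in_seconds) tuples.
        jobs = [j for j in jobs if j[2] > 0]  # drop zero-runtime jobs
        resources_total = float(sum(ncpus * rt for _, ncpus, rt in jobs))
        if resources_total <= 0:
            return {}
        # Each job's share is proportional to the resources it consumed.
        return {name: round(ncpus * rt / resources_total * total_energy, 2)
                for name, ncpus, rt in jobs}

    # distribute_wrapper_energy([("sim", 4, 100), ("post", 8, 50)], 1000.0)
    # -> {"sim": 500.0, "post": 500.0}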
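Third, the get_structure() change in db_structure.py: the explicit key checks are replaced with dict.setdefault, and the map now records, for every node, the list of nodes it depends on (the _from side of each edge lands in the _to entry). A sketch of the resulting shape:

    def build_structure_map(edges):
        # edges: (from_name, to_name) rows as returned by _get_exp_structure().
        structure = {}
        for _from, _to in edges:
            structure.setdefault(_from, [])              # ensure every node has an entry
            structure.setdefault(_to, []).append(_from)  # record _to's parent
        return structure

    # build_structure_map([("SIM", "POST"), ("POST", "CLEAN")])
    # -> {"SIM": [], "POST": ["SIM"], "CLEAN": ["POST"]}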
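Finally, the headline fix for #613: update_list() now compares the modification time of the cached structure database against that of the experiment's jobs conf, and discards the cache when the conf is newer. The decision reduces to the sketch below (paths are placeholders, and, as in the patch, a missing conf file is reported but does not by itself invalidate the cache):

    import os

    def structure_cache_is_valid(db_path, conf_path):
        if not os.path.exists(db_path):
            return False  # nothing cached yet; the structure must be built
        m_time_db = os.stat(db_path).st_mtime
        if os.path.exists(conf_path):
            # A jobs conf edited after the cache was written invalidates it.
            if os.stat(conf_path).st_mtime > m_time_db:
                return False
        return True

    # structure_cache_is_valid("structure_a000.db", "jobs_a000.conf")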