diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 2de65112952d5ad640e84b06aa08f08d31a3a116..b27ba1da1bb38d3445657392e2cce5906395cac1 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -38,9 +38,11 @@ import autosubmit.database.db_structure as DbStructure from networkx import DiGraph from autosubmit.job.job_utils import transitive_reduction from log.log import AutosubmitCritical, AutosubmitError, Log -from threading import Thread,Lock +from threading import Thread, Lock import multiprocessing # Log.get_logger("Log.Autosubmit") + + def threaded(fn): def wrapper(*args, **kwargs): thread = Thread(target=fn, args=args, kwargs=kwargs) @@ -49,6 +51,7 @@ def threaded(fn): return thread return wrapper + class JobList(object): """ Class to manage the list of jobs to be run by autosubmit @@ -193,7 +196,8 @@ class JobList(object): # Checking for member constraints if len(run_only_members) > 0: # Found - Log.info("Considering only members {0}".format(str(run_only_members))) + Log.info("Considering only members {0}".format( + str(run_only_members))) old_job_list = [job for job in self._job_list] self._job_list = [ job for job in old_job_list if job.member is None or job.member in run_only_members or job.status not in [Status.WAITING, Status.READY]] @@ -205,7 +209,6 @@ class JobList(object): if jobc in self._job_list: job.children.add(jobc) - # Perhaps this should be done by default independent of the wrapper_type supplied if wrapper_type == 'vertical-mixed': self._ordered_jobs_by_date_member = self._create_sorted_dict_jobs( @@ -838,10 +841,10 @@ class JobList(object): """ logs = dict() for job in self._job_list: - logs[job.name] = (job.local_logs,job.remote_logs) + logs[job.name] = (job.local_logs, job.remote_logs) return logs - def add_logs(self,logs): + def add_logs(self, logs): """ add logs to the current job_list @@ -856,7 +859,6 @@ class JobList(object): job.local_logs = logs[job.name][0] job.remote_logs = logs[job.name][1] - def get_ready(self, platform=None, hold=False, wrapper=False): """ Returns a list of ready jobs @@ -886,6 +888,7 @@ class JobList(object): prepared = [job for job in self._job_list if (platform is None or job.platform.name.lower() == platform.name.lower()) and job.status == Status.PREPARED] return prepared + def get_skipped(self, platform=None): """ Returns a list of prepared jobs @@ -896,8 +899,9 @@ class JobList(object): :rtype: list """ skipped = [job for job in self._job_list if (platform is None or job.platform.name.lower() == platform.name.lower()) and - job.status == Status.SKIPPED] + job.status == Status.SKIPPED] return skipped + def get_waiting(self, platform=None, wrapper=False): """ Returns a list of jobs waiting @@ -1207,8 +1211,10 @@ class JobList(object): move(os.path.join(self._persistence_path, self._update_file), os.path.join(self._persistence_path, self._update_file + "_" + output_date)) + def get_skippable_jobs(self, jobs_in_wrapper): - job_list_skip = [job for job in self.get_job_list() if job.skippable is True and ( job.status == Status.QUEUING or job.status == Status.RUNNING or job.status == Status.COMPLETED or job.status == Status.READY) and jobs_in_wrapper.find(job.section) == -1 ] + job_list_skip = [job for job in self.get_job_list() if job.skippable is True and (job.status == Status.QUEUING or job.status == + Status.RUNNING or job.status == Status.COMPLETED or job.status == Status.READY) and jobs_in_wrapper.find(job.section) == -1] skip_by_section = dict() for job in job_list_skip: if job.section not in skip_by_section: @@ -1216,6 +1222,7 @@ class JobList(object): else: skip_by_section[job.section].append(job) return skip_by_section + @property def parameters(self): """ @@ -1229,7 +1236,7 @@ class JobList(object): def parameters(self, value): self._parameters = value - def update_list(self, as_conf, store_change=True, fromSetStatus=False, submitter=None, first_time = False): + def update_list(self, as_conf, store_change=True, fromSetStatus=False, submitter=None, first_time=False): """ Updates job list, resetting failed jobs and changing to READY all WAITING jobs with all parents COMPLETED @@ -1252,7 +1259,8 @@ class JobList(object): retrials = job.retrials if job.fail_count < retrials: job.inc_fail_count() - tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] + tmp = [ + parent for parent in job.parents if parent.status == Status.COMPLETED] if len(tmp) == len(job.parents): job.status = Status.READY job.id = None @@ -1285,7 +1293,8 @@ class JobList(object): if not fromSetStatus: all_parents_completed = [] for job in self.get_waiting(): - tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED or parent.status == Status.SKIPPED] + tmp = [parent for parent in job.parents if parent.status == + Status.COMPLETED or parent.status == Status.SKIPPED] if job.parents is None or len(tmp) == len(job.parents): job.status = Status.READY job.hold = False @@ -1363,7 +1372,8 @@ class JobList(object): for section in jobs_to_skip: for job in jobs_to_skip[section]: - if job.status == Status.READY or job.status == Status.QUEUING: # Check only jobs to be pending of canceled if not started + # Check only jobs to be pending of canceled if not started + if job.status == Status.READY or job.status == Status.QUEUING: jobdate = date2str(job.date, job.date_format) if job.running == 'chunk': for related_job in jobs_to_skip[section]: @@ -1375,7 +1385,7 @@ class JobList(object): job.platform.send_command(job.platform.cancel_cmd + " " + str(job.id), ignore_log=True) except: - pass #jobid finished already + pass # jobid finished already job.status = Status.SKIPPED save = True elif job.running == 'member': @@ -1389,7 +1399,7 @@ class JobList(object): job.platform.send_command(job.platform.cancel_cmd + " " + str(job.id), ignore_log=True) except: - pass #job_id finished already + pass # job_id finished already job.status = Status.SKIPPED save = True #save = True @@ -1479,6 +1489,7 @@ class JobList(object): for job in self._job_list: if not job.has_parents() and new: job.status = Status.READY + @threaded def check_scripts_threaded(self, as_conf): """ @@ -1505,15 +1516,22 @@ class JobList(object): """ Log.info("Checking scripts...") out = True - + # Implementing checking scripts feedback to the users in a minimum of 4 messages + count = stage = 0 for job in self._job_list: + count += 1 + if (count >= len(self._job_list) / 4 * (stage + 1)) or count == len(self._job_list): + stage += 1 + Log.info("{} of {} checked".format(count, len(self._job_list))) + show_logs = job.check_warnings if job.check.lower() == 'on_submission': Log.info( 'Template {0} will be checked in running time'.format(job.section)) continue elif job.check.lower() != 'true': - Log.info('Template {0} will not be checked'.format(job.section)) + Log.info( + 'Template {0} will not be checked'.format(job.section)) continue else: if job.section in self.sections_checked: