diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 119a3f8676520587a79cc6006d1eea07b55aa536..15c8e3105aa7c1ed8c7cb1ef9edeb30d8e0e60ce 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -124,7 +124,7 @@ class Job(object): self.check_warnings = False self.packed = False self.hold = False - + self.distance_weight = 0 def __getstate__(self): odict = self.__dict__ if '_platform' in odict: @@ -805,7 +805,7 @@ class Job(object): template = template_file.read() else: if self.type == Type.BASH: - template = 'sleep 5' + template = 'sleep 30' elif self.type == Type.PYTHON: template = 'time.sleep(5)' elif self.type == Type.R: @@ -1370,3 +1370,6 @@ done time = int(output[index]) time = self._parse_timestamp(time) return time + + + diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index 1f6cebaf50657c4b61a5afb3c3cfe3332afab49f..a412f1e74612ebd94e89b1db73806439bb600eaf 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -370,3 +370,4 @@ class DicJobs: return self._parser.get(section, option) else: return default + diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index aa9bc9702e9c15037bc3860effd8b585b8d6a30c..76220f2d5be414739ee405f7bcf67edd297943a6 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -1151,6 +1151,22 @@ class JobList: if as_conf.get_remote_dependencies(): all_parents_completed.append(job.name) if as_conf.get_remote_dependencies(): + for job in self.get_prepared(): + tmp = [ + parent for parent in job.parents if parent.status == Status.COMPLETED] + if len(tmp) == len(job.parents): + job.status = Status.READY + job.packed = False + save = True + Log.debug( + "Resetting job: {0} status to: READY for retrial...".format(job.name)) + if len(tmp) == len(job.parents): + job.status = Status.READY + job.packed = False + job.hold = False + save = True + Log.debug( + "A job in prepared status has all parent completed, job: {0} status set to: READY ...".format(job.name)) Log.debug( 'Updating WAITING jobs eligible for be prepared') for job in self.get_waiting_remote_dependencies('slurm'.lower()): diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 6852c83ce75683ed3db1622777169a165233ddad..575db7e9e6d511ba17b38905fb390bf522caf914 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -24,7 +24,7 @@ from autosubmit.job.job_packages import JobPackageSimple, JobPackageVertical, Jo JobPackageSimpleWrapped, JobPackageHorizontalVertical, JobPackageVerticalHorizontal from operator import attrgetter from math import ceil - +import operator class JobPackager(object): """ @@ -84,8 +84,42 @@ class JobPackager(object): Log.info("Jobs ready for {0}: {1}", self._platform.name, len( jobs_list.get_ready(platform))) self._maxTotalProcessors = 0 + def compute_weight(self,job_list): + job = self + jobs_by_section = dict() + held_jobs = self._jobs_list.get_held_jobs() + jobs_held_by_section = dict() + for job in held_jobs: + if job.section not in jobs_held_by_section: + jobs_held_by_section[job.section] = [] + jobs_held_by_section[job.section].append(job) + for job in job_list: + if job.section not in jobs_by_section: + jobs_by_section[job.section] = [] + jobs_by_section[job.section].append(job) + + for section in jobs_by_section: + if section in jobs_held_by_section.keys(): + weight=len(jobs_held_by_section[section])+1 + else: + weight = 1 + highest_completed=[] + + for job in sorted(jobs_by_section[section], key=operator.attrgetter('chunk')): + weight=weight+1 + job.distance_weight = weight + completed_jobs = 9999 + if job.has_parents() > 1: + tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] + if len(tmp) > completed_jobs: + completed_jobs=len(tmp) + highest_completed = [job] + else: + highest_completed.append(job) + for job in highest_completed: + job.distance_weight = job.distance_weight-1 + - #def build_packages(self, only_generate=False, jobs_filtered=[]): def build_packages(self): """ Returns the list of the built packages to be submitted @@ -101,18 +135,28 @@ class JobPackager(object): jobs_ready = self._jobs_list.get_ready(self._platform) if self.hold and len(jobs_ready) > 0: + self.compute_weight(jobs_ready) + sorted_jobs = sorted(jobs_ready, key=operator.attrgetter('distance_weight')) jobs_in_held_status = self._jobs_list.get_held_jobs( ) + self._jobs_list.get_submitted(self._platform, hold=self.hold) held_by_id = dict() for held_job in jobs_in_held_status: - held_by_id[held_job.id] = held_job + if held_job.id not in held_by_id: + held_by_id[held_job.id] = [] + held_by_id[held_job.id].append(held_job) current_held_jobs = len(held_by_id.keys()) remaining_held_slots = 10 - current_held_jobs try: - while len(jobs_ready) > remaining_held_slots: - if jobs_ready[-1].packed: - jobs_ready[-1].packed = False - del jobs_ready[-1] + while len(sorted_jobs) > remaining_held_slots: + if sorted_jobs[-1].packed: + sorted_jobs[-1].packed = False + del sorted_jobs[-1] + for job in sorted_jobs: + if job.distance_weight > 3: + sorted_jobs.remove(job) + #Log.warning("Job {1} have a weight of {0}", job.distance_weight,job.name) + jobs_ready = sorted_jobs + pass except IndexError: pass if len(jobs_ready) == 0: diff --git a/autosubmit/monitor/monitor.py b/autosubmit/monitor/monitor.py index 52396c55a0d511b02b1fa0c3c475b9ae0ab9371e..80ef203fd8328b1e32a7fd064edfd7a0ebfc2b66 100644 --- a/autosubmit/monitor/monitor.py +++ b/autosubmit/monitor/monitor.py @@ -41,7 +41,7 @@ from diagram import create_bar_diagram class Monitor: """Class to handle monitoring of Jobs at HPC.""" - _table = dict([(Status.UNKNOWN, 'white'), (Status.WAITING, 'gray'), (Status.READY, 'lightblue'),(Status.PREPARED, 'lightsalmon'), + _table = dict([(Status.UNKNOWN, 'white'), (Status.WAITING, 'gray'), (Status.READY, 'lightblue'),(Status.PREPARED, 'skyblue'), (Status.SUBMITTED, 'cyan'), (Status.HELD, 'salmon'), (Status.QUEUING, 'pink'), (Status.RUNNING, 'green'), (Status.COMPLETED, 'yellow'), (Status.FAILED, 'red'), (Status.SUSPENDED, 'orange')])