From da084eefbdb5e4b606be60d4bddf21781bb2e277 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 28 May 2020 11:04:39 +0200 Subject: [PATCH 1/4] Fixed prepared jobs not being recognized as ready if all they parents finishes --- autosubmit/job/job_list.py | 16 ++++++++++++++++ autosubmit/monitor/monitor.py | 2 +- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index aa9bc9702..76220f2d5 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -1151,6 +1151,22 @@ class JobList: if as_conf.get_remote_dependencies(): all_parents_completed.append(job.name) if as_conf.get_remote_dependencies(): + for job in self.get_prepared(): + tmp = [ + parent for parent in job.parents if parent.status == Status.COMPLETED] + if len(tmp) == len(job.parents): + job.status = Status.READY + job.packed = False + save = True + Log.debug( + "Resetting job: {0} status to: READY for retrial...".format(job.name)) + if len(tmp) == len(job.parents): + job.status = Status.READY + job.packed = False + job.hold = False + save = True + Log.debug( + "A job in prepared status has all parent completed, job: {0} status set to: READY ...".format(job.name)) Log.debug( 'Updating WAITING jobs eligible for be prepared') for job in self.get_waiting_remote_dependencies('slurm'.lower()): diff --git a/autosubmit/monitor/monitor.py b/autosubmit/monitor/monitor.py index 52396c55a..80ef203fd 100644 --- a/autosubmit/monitor/monitor.py +++ b/autosubmit/monitor/monitor.py @@ -41,7 +41,7 @@ from diagram import create_bar_diagram class Monitor: """Class to handle monitoring of Jobs at HPC.""" - _table = dict([(Status.UNKNOWN, 'white'), (Status.WAITING, 'gray'), (Status.READY, 'lightblue'),(Status.PREPARED, 'lightsalmon'), + _table = dict([(Status.UNKNOWN, 'white'), (Status.WAITING, 'gray'), (Status.READY, 'lightblue'),(Status.PREPARED, 'skyblue'), (Status.SUBMITTED, 'cyan'), (Status.HELD, 'salmon'), (Status.QUEUING, 'pink'), (Status.RUNNING, 'green'), (Status.COMPLETED, 'yellow'), (Status.FAILED, 'red'), (Status.SUSPENDED, 'orange')]) -- GitLab From 6a2ab5fd740b7749cc67415e6e287af872832f97 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 3 Jun 2020 16:43:37 +0200 Subject: [PATCH 2/4] more work to presubmission --- autosubmit/job/job.py | 19 ++++++++++++++++++- autosubmit/job/job_dict.py | 1 + autosubmit/job/job_packager.py | 7 +++++-- 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 119a3f867..4e9bdc950 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -124,7 +124,7 @@ class Job(object): self.check_warnings = False self.packed = False self.hold = False - + self.distance_weight = 0 def __getstate__(self): odict = self.__dict__ if '_platform' in odict: @@ -1063,6 +1063,20 @@ class Job(object): parent.children.remove(self) self.parents.remove(parent) + def compute_weight(self): + job = self + parent_not_completed = True + while job.has_parents() > 1 and parent_not_completed: + tmp_parents = list(self.parents)[1:] + for parent in tmp_parents: + Log.info("Job name is {0}, while parent is {1}", job.name,parent.name) + if parent.status == Status.COMPLETED: + parent_not_completed = False + if parent_not_completed: + self.distance_weight=self.distance_weight+1 + job = tmp_parents[0] # first parent + + def synchronize_logs(self, platform, remote_logs, local_logs): platform.move_file(remote_logs[0], local_logs[0], True) # .out platform.move_file(remote_logs[1], local_logs[1], True) # .err @@ -1370,3 +1384,6 @@ done time = int(output[index]) time = self._parse_timestamp(time) return time + + + diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index 1f6cebaf5..a412f1e74 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -370,3 +370,4 @@ class DicJobs: return self._parser.get(section, option) else: return default + diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 6852c83ce..346f31a22 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -24,7 +24,7 @@ from autosubmit.job.job_packages import JobPackageSimple, JobPackageVertical, Jo JobPackageSimpleWrapped, JobPackageHorizontalVertical, JobPackageVerticalHorizontal from operator import attrgetter from math import ceil - +import operator class JobPackager(object): """ @@ -85,7 +85,6 @@ class JobPackager(object): jobs_list.get_ready(platform))) self._maxTotalProcessors = 0 - #def build_packages(self, only_generate=False, jobs_filtered=[]): def build_packages(self): """ Returns the list of the built packages to be submitted @@ -101,6 +100,10 @@ class JobPackager(object): jobs_ready = self._jobs_list.get_ready(self._platform) if self.hold and len(jobs_ready) > 0: + for job in jobs_ready: + job.compute_weight() + sorted_jobs = sorted(jobs_ready, key=operator.attrgetter('distance_weight')) + jobs_in_held_status = self._jobs_list.get_held_jobs( ) + self._jobs_list.get_submitted(self._platform, hold=self.hold) held_by_id = dict() -- GitLab From 31a2cd829c9c68f2a8bc6b2e665567940e4ad003 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 4 Jun 2020 15:08:05 +0200 Subject: [PATCH 3/4] Another update --- autosubmit/job/job.py | 14 --------- autosubmit/job/job_packager.py | 55 +++++++++++++++++++++++++++++----- 2 files changed, 47 insertions(+), 22 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 4e9bdc950..f29cff868 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -1063,20 +1063,6 @@ class Job(object): parent.children.remove(self) self.parents.remove(parent) - def compute_weight(self): - job = self - parent_not_completed = True - while job.has_parents() > 1 and parent_not_completed: - tmp_parents = list(self.parents)[1:] - for parent in tmp_parents: - Log.info("Job name is {0}, while parent is {1}", job.name,parent.name) - if parent.status == Status.COMPLETED: - parent_not_completed = False - if parent_not_completed: - self.distance_weight=self.distance_weight+1 - job = tmp_parents[0] # first parent - - def synchronize_logs(self, platform, remote_logs, local_logs): platform.move_file(remote_logs[0], local_logs[0], True) # .out platform.move_file(remote_logs[1], local_logs[1], True) # .err diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 346f31a22..da8fc62bd 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -84,6 +84,41 @@ class JobPackager(object): Log.info("Jobs ready for {0}: {1}", self._platform.name, len( jobs_list.get_ready(platform))) self._maxTotalProcessors = 0 + def compute_weight(self,job_list): + job = self + jobs_by_section = dict() + held_jobs = self._jobs_list.get_held_jobs() + jobs_held_by_section = dict() + for job in held_jobs: + if job.section not in jobs_held_by_section: + jobs_held_by_section[job.section] = [] + jobs_held_by_section[job.section].append(job) + for job in job_list: + if job.section not in jobs_by_section: + jobs_by_section[job.section] = [] + jobs_by_section[job.section].append(job) + + for section in jobs_by_section: + if section in jobs_held_by_section.keys(): + weight=len(jobs_held_by_section[section])+1 + else: + weight = 1 + highest_completed=[] + + for job in sorted(jobs_by_section[section], key=operator.attrgetter('chunk')): + weight=weight+1 + job.distance_weight = weight + completed_jobs = 9999 + if job.has_parents() > 1: + tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] + if len(tmp) > completed_jobs: + completed_jobs=len(tmp) + highest_completed = [job] + else: + highest_completed.append(job) + for job in highest_completed: + job.distance_weight = job.distance_weight-1 + def build_packages(self): """ @@ -100,22 +135,26 @@ class JobPackager(object): jobs_ready = self._jobs_list.get_ready(self._platform) if self.hold and len(jobs_ready) > 0: - for job in jobs_ready: - job.compute_weight() + self.compute_weight(jobs_ready) sorted_jobs = sorted(jobs_ready, key=operator.attrgetter('distance_weight')) - jobs_in_held_status = self._jobs_list.get_held_jobs( ) + self._jobs_list.get_submitted(self._platform, hold=self.hold) held_by_id = dict() for held_job in jobs_in_held_status: - held_by_id[held_job.id] = held_job + if held_job.id not in held_by_id: + held_by_id[held_job.id] = [] + held_by_id[held_job.id].append(held_job) current_held_jobs = len(held_by_id.keys()) remaining_held_slots = 10 - current_held_jobs try: - while len(jobs_ready) > remaining_held_slots: - if jobs_ready[-1].packed: - jobs_ready[-1].packed = False - del jobs_ready[-1] + while len(sorted_jobs) > remaining_held_slots: + if sorted_jobs[-1].packed: + sorted_jobs[-1].packed = False + del sorted_jobs[-1] + for job in sorted_jobs: + if job.distance_weight > 4: + sorted_jobs.remove(job) + jobs_ready = sorted_jobs except IndexError: pass if len(jobs_ready) == 0: -- GitLab From 2a1b0b846fbad54b85a2ad000c9ab3dd86cc066c Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 8 Jun 2020 13:46:02 +0200 Subject: [PATCH 4/4] Broken job.held --- autosubmit/job/job.py | 2 +- autosubmit/job/job_packager.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index f29cff868..15c8e3105 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -805,7 +805,7 @@ class Job(object): template = template_file.read() else: if self.type == Type.BASH: - template = 'sleep 5' + template = 'sleep 30' elif self.type == Type.PYTHON: template = 'time.sleep(5)' elif self.type == Type.R: diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index da8fc62bd..575db7e9e 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -152,9 +152,11 @@ class JobPackager(object): sorted_jobs[-1].packed = False del sorted_jobs[-1] for job in sorted_jobs: - if job.distance_weight > 4: + if job.distance_weight > 3: sorted_jobs.remove(job) + #Log.warning("Job {1} have a weight of {0}", job.distance_weight,job.name) jobs_ready = sorted_jobs + pass except IndexError: pass if len(jobs_ready) == 0: -- GitLab