From 0f193346bb8c5f1bff82de8e0a221829826e019c Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 8 Nov 2019 16:33:17 +0100 Subject: [PATCH 01/13] Work In progress, there are some issues first approach will be only direct childs job --- autosubmit/autosubmit.py | 81 ++++++++++++++++++----- autosubmit/config/config_common.py | 3 +- autosubmit/config/files/autosubmit.conf | 11 ++- autosubmit/job/job.py | 1 + autosubmit/job/job_common.py | 6 +- autosubmit/job/job_list.py | 41 ++++++++++-- autosubmit/job/job_packager.py | 54 ++++----------- autosubmit/job/job_packages.py | 26 ++++---- autosubmit/monitor/monitor.py | 8 ++- autosubmit/platforms/ecplatform.py | 2 +- autosubmit/platforms/locplatform.py | 2 +- autosubmit/platforms/paramiko_platform.py | 21 ++++-- autosubmit/platforms/platform.py | 2 +- autosubmit/platforms/psplatform.py | 2 +- autosubmit/platforms/saga_platform.py | 4 +- autosubmit/platforms/slurmplatform.py | 18 ++--- test/unit/test_job_common.py | 2 + 17 files changed, 184 insertions(+), 100 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index fccf961ea..08374452f 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1219,20 +1219,17 @@ class Autosubmit: save = False for platform in platforms_to_test: Log.debug("\nJobs ready for {1}: {0}", len(job_list.get_ready(platform)), platform.name) - packages_to_submit, remote_dependencies_dict = JobPackager(as_conf, platform, job_list).build_packages() + packages_to_submit = JobPackager(as_conf, platform, job_list).build_packages(hold=False) + if not inspect: platform.open_submit_script() valid_packages_to_submit = [] + valid_packages_to_submit_held =[] for package in packages_to_submit: try: - if hasattr(package, "name"): - if remote_dependencies_dict and package.name in remote_dependencies_dict['dependencies']: - remote_dependency = remote_dependencies_dict['dependencies'][package.name] - remote_dependency_id = remote_dependencies_dict['name_to_id'][remote_dependency] - package.set_job_dependency(remote_dependency_id) if not only_wrappers: try: - package.submit(as_conf, job_list.parameters, inspect) + package.submit(as_conf, job_list.parameters, inspect,hold=False) valid_packages_to_submit.append(package) except (IOError,OSError): #write error file @@ -1250,8 +1247,7 @@ class Autosubmit: package._wallclock, package._num_processors, package.platform, as_conf) job_list.job_package_map[package.jobs[0].id] = wrapper_job - if remote_dependencies_dict and package.name in remote_dependencies_dict['name_to_id']: - remote_dependencies_dict['name_to_id'][package.name] = package.jobs[0].id + if isinstance(package, JobPackageThread): # If it is instance of JobPackageThread, then it is JobPackageVertical. packages_persistence.save(package.name, package.jobs, package._expid, inspect) @@ -1262,12 +1258,46 @@ class Autosubmit: except Exception: Log.error("{0} submission failed due to Unknown error", platform.name) raise + if as_conf.get_remote_dependencies: + packages_remote_dependencies = JobPackager(as_conf, platform, job_list).build_packages(hold=True) + for package in packages_remote_dependencies: + try: + if not only_wrappers: + try: + package.submit(as_conf, job_list.parameters, inspect,hold=True) + valid_packages_to_submit_held.append(package) + except (IOError, OSError): + # write error file + continue + if only_wrappers or inspect: + for innerJob in package._jobs: + # Setting status to COMPLETED so it does not get stuck in the loop that calls this function + innerJob.status = Status.COMPLETED + if hasattr(package, "name"): + job_list.packages_dict[package.name] = package.jobs + from job.job import WrapperJob + wrapper_job = WrapperJob(package.name, package.jobs[0].id, Status.READY, 0, + package.jobs, + package._wallclock, package._num_processors, + package.platform, as_conf) + job_list.job_package_map[package.jobs[0].id] = wrapper_job + + if isinstance(package, JobPackageThread): + # If it is instance of JobPackageThread, then it is JobPackageVertical. + packages_persistence.save(package.name, package.jobs, package._expid, inspect) + save = True + except WrongTemplateException as e: + Log.error("Invalid parameter substitution in {0} template", e.job_name) + raise + except Exception: + Log.error("{0} submission failed due to Unknown error", platform.name) + raise if platform.type == "slurm" and not inspect and not only_wrappers: try: save = True if len(valid_packages_to_submit) > 0: - jobs_id = platform.submit_Script() + jobs_id = platform.submit_Script(hold=False) if jobs_id is None: raise BaseException("Exiting AS being unable to get jobID") i = 0 @@ -1285,23 +1315,40 @@ class Autosubmit: package._wallclock, package._num_processors, package.platform, as_conf) job_list.job_package_map[package.jobs[0].id] = wrapper_job - if remote_dependencies_dict and package.name in remote_dependencies_dict[ - 'name_to_id']: - remote_dependencies_dict['name_to_id'][package.name] = package.jobs[0].id if isinstance(package, JobPackageThread): # Saving only when it is a real multi job package packages_persistence.save(package.name, package.jobs, package._expid, inspect) i += 1 - - + save = True + if len(valid_packages_to_submit_held) > 0: + jobs_id = platform.submit_Script(hold=True) + if jobs_id is None: + raise BaseException("Exiting AS being unable to get jobID") + i = 0 + for package in valid_packages_to_submit_held: + for job in package.jobs: + job.id = str(jobs_id[i]) + Log.info("{0} submitted", job.name) + job.status = Status.SUBMITTED + job.write_submit_time() + if hasattr(package, "name"): + job_list.packages_dict[package.name] = package.jobs + from job.job import WrapperJob + wrapper_job = WrapperJob(package.name, package.jobs[0].id, Status.SUBMITTED, 0, + package.jobs, + package._wallclock, package._num_processors, + package.platform, as_conf) + job_list.job_package_map[package.jobs[0].id] = wrapper_job + if isinstance(package, JobPackageThread): + # Saving only when it is a real multi job package + packages_persistence.save(package.name, package.jobs, package._expid, inspect) + i += 1 except WrongTemplateException as e: Log.error("Invalid parameter substitution in {0} template", e.job_name) raise except Exception: Log.error("{0} submission failed", platform.name) raise - - return save @staticmethod diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 4c5681b82..3fbd88890 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -1049,8 +1049,7 @@ class AutosubmitConfig(object): :return: if remote dependencies :rtype: bool """ - return self._conf_parser.get_option('wrapper', 'DEPENDENCIES', 'false').lower() == 'true' - + return self._conf_parser.get('config', 'DEPENDENCIES', 'false').lower() == 'true' def get_wrapper_type(self): """ Returns what kind of wrapper (VERTICAL, MIXED-VERTICAL, HORIZONTAL, HYBRID, NONE) the user has configured in the autosubmit's config diff --git a/autosubmit/config/files/autosubmit.conf b/autosubmit/config/files/autosubmit.conf index 7c5bf3c43..03209656a 100644 --- a/autosubmit/config/files/autosubmit.conf +++ b/autosubmit/config/files/autosubmit.conf @@ -14,12 +14,21 @@ TOTALJOBS = 6 # Time (seconds) between connections to the HPC queue scheduler to poll already submitted jobs status # Default = 10 SAFETYSLEEPTIME = 10 -# Number of retrials if a job fails. Can ve override at job level +# Number of retrials if a job fails. Can be override at job level # Default = 0 RETRIALS = 0 # Default output type for CREATE, MONITOR, SET STATUS, RECOVERY. Available options: pdf, svg, png, ps, txt # Default = pdf OUTPUT = pdf +# Allow to send jobs earlier +# Default = False +DEPENDENCIES = FALSE +# Basic Configuration of wrapper +# Types avaliable: Horizontal,vertical,vertical-mixed,horizontal-vertical +#[wrapper] +#TYPE = Vertical +#JOBS_IN_WRAPPER = SIM + [mail] # Enable mail notifications diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 9edb4ce08..56bb77709 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -100,6 +100,7 @@ class Job(object): self._platform = None self.check = 'True' self.packed = False + self.hold = False def __getstate__(self): diff --git a/autosubmit/job/job_common.py b/autosubmit/job/job_common.py index f779b453b..5a2a09bbd 100644 --- a/autosubmit/job/job_common.py +++ b/autosubmit/job/job_common.py @@ -29,13 +29,14 @@ class Status: QUEUING = 3 RUNNING = 4 COMPLETED = 5 + HELD = 6 FAILED = -1 UNKNOWN = -2 SUSPENDED = -3 ####### # Note: any change on constants must be applied on the dict below!!! VALUE_TO_KEY = {-3: 'SUSPENDED', -2: 'UNKNOWN', -1: 'FAILED', 0: 'WAITING', 1: 'READY', - 2: 'SUBMITTED', 3: 'QUEUING', 4: 'RUNNING', 5: 'COMPLETED'} + 2: 'SUBMITTED', 3: 'QUEUING', 4: 'RUNNING', 5: 'COMPLETED', 6: 'HELD'} def retval(self, value): return getattr(self, value) @@ -57,9 +58,10 @@ class bcolors: QUEUING = '\033[35;1m' RUNNING = '\033[32m' COMPLETED = '\033[33m' + HELD = '\033[35;1m' FAILED = '\033[31m' SUSPENDED = '\033[31;1m' - CODE_TO_COLOR = {-3: SUSPENDED, -2: UNKNOWN, -1: FAILED, 0: WAITING, 1: READY, 2: SUBMITTED, 3: QUEUING, 4: RUNNING, 5: COMPLETED} + CODE_TO_COLOR = {-3: SUSPENDED, -2: UNKNOWN, -1: FAILED, 0: WAITING, 1: READY, 2: SUBMITTED, 3: QUEUING, 4: RUNNING, 5: COMPLETED, 6: HELD} diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index e9ceca8df..b852758b8 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -643,7 +643,7 @@ class JobList: :rtype: list """ return [job for job in self._job_list] - def get_ready(self, platform=None): + def get_ready(self, platform=None, hold=False): """ Returns a list of ready jobs @@ -653,7 +653,7 @@ class JobList: :rtype: list """ return [job for job in self._job_list if (platform is None or job.platform is platform) and - job.status == Status.READY] + job.status == Status.READY and job.hold is hold] def get_waiting(self, platform=None): """ @@ -666,7 +666,29 @@ class JobList: """ return [job for job in self._job_list if (platform is None or job.platform is platform) and job.status == Status.WAITING] + def get_waiting(self, platform=None): + """ + Returns a list of jobs waiting + :param platform: job platform + :type platform: HPCPlatform + :return: waiting jobs + :rtype: list + """ + return [job for job in self._job_list if (platform is None or job.platform is platform) and + job.status == Status.WAITING] + + def get_waiting_remote_dependencies(self, platform_type='slurm'): + """ + Returns a list of jobs waiting on slurm scheduler + + :param platform: job platform + :type platform: HPCPlatform + :return: waiting jobs + :rtype: list + """ + return [job for job in self._job_list if (job.platform.type is platform_type and + job.status == Status.WAITING)] def get_unknown(self, platform=None): """ Returns a list of jobs on unknown state @@ -926,13 +948,24 @@ class JobList: Log.debug('Updating WAITING jobs') - for job in self.get_waiting(): - if not fromSetStatus: + if not fromSetStatus: + for job in self.get_waiting(): tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] if len(tmp) == len(job.parents): job.status = Status.READY save = True Log.debug("Setting job: {0} status to: READY (all parents completed)...".format(job.name)) + if as_conf.get_remote_dependencies(): + Log.debug('Updating WAITING jobs eligible for remote_dependencies') + for job in self.get_waiting_remote_dependencies(): + tmp = [parent for parent in job.parents if (parent.status == Status.COMPLETED or parent.status == Status.QUEUING or parent.status == Status.RUNNING) and (parent.section is job.section or parent.check.lower() is not 'on_submission')] + if len(tmp) == len(job.parents): + job.status = Status.READY + job.hold = True + save = True + Log.debug("Setting job: {0} status to: READY for be held (all parents queuing, running or completed)...".format(job.name)) + Log.debug('Updating WAITING jobs eligible for remote_dependencies') + Log.debug('Update finished') return save diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 5a7603150..2920b4bd2 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -54,7 +54,6 @@ class JobPackager(object): # These are defined in the [wrapper] section of autosubmit_,conf self.wrapper_type = self._as_config.get_wrapper_type() # True or False - self.remote_dependencies = self._as_config.get_remote_dependencies() self.jobs_in_wrapper = self._as_config.get_wrapper_jobs() Log.debug("Number of jobs ready: {0}", len(jobs_list.get_ready(platform))) @@ -63,7 +62,7 @@ class JobPackager(object): Log.info("Jobs ready for {0}: {1}", self._platform.name, len(jobs_list.get_ready(platform))) self._maxTotalProcessors = 0 - def build_packages(self,only_generate=False, jobs_filtered=[]): + def build_packages(self,only_generate=False, jobs_filtered=[],hold=False): """ Returns the list of the built packages to be submitted @@ -71,18 +70,17 @@ class JobPackager(object): :rtype: List() of JobPackageVertical """ packages_to_submit = list() - remote_dependencies_dict = dict() # only_wrappers = False when coming from Autosubmit.submit_ready_jobs, jobs_filtered empty if only_generate: jobs_to_submit = jobs_filtered else: - jobs_ready = self._jobs_list.get_ready(self._platform) + jobs_ready = self._jobs_list.get_ready(self._platform,hold) if jobs_ready == 0: # If there are no jobs ready, result is tuple of empty - return packages_to_submit, remote_dependencies_dict + return packages_to_submit if not (self._max_wait_jobs_to_submit > 0 and self._max_jobs_to_submit > 0): # If there is no more space in platform, result is tuple of empty - return packages_to_submit, remote_dependencies_dict + return packages_to_submit # Sort by 6 first digits of date available_sorted = sorted(jobs_ready, key=lambda k: k.long_name.split('_')[1][:6]) @@ -103,11 +101,11 @@ class JobPackager(object): max_wrapped_jobs = int(self._as_config.jobs_parser.get_option(section, "MAX_WRAPPED", self._as_config.get_max_wrapped_jobs())) if self.wrapper_type in ['vertical', 'vertical-mixed']: - built_packages, remote_dependencies_dict = self._build_vertical_packages(jobs_to_submit_by_section[section], + built_packages = self._build_vertical_packages(jobs_to_submit_by_section[section], max_wrapped_jobs) packages_to_submit += built_packages elif self.wrapper_type == 'horizontal': - built_packages, remote_dependencies_dict = self._build_horizontal_packages(jobs_to_submit_by_section[section], + built_packages = self._build_horizontal_packages(jobs_to_submit_by_section[section], max_wrapped_jobs, section) packages_to_submit += built_packages @@ -123,7 +121,7 @@ class JobPackager(object): package = JobPackageSimple([job]) packages_to_submit.append(package) - return packages_to_submit, remote_dependencies_dict + return packages_to_submit def _divide_list_by_section(self, jobs_list): """ @@ -149,8 +147,6 @@ class JobPackager(object): def _build_horizontal_packages(self, section_list, max_wrapped_jobs, section): packages = [] - remote_dependencies_dict = dict() - horizontal_packager = JobPackagerHorizontal(section_list, self._platform.max_processors, max_wrapped_jobs, self.max_jobs, self._platform.processors_per_node) @@ -167,13 +163,9 @@ class JobPackager(object): current_package = JobPackageHorizontal(package_jobs, jobs_resources=jobs_resources) packages.append(current_package) - if self.remote_dependencies and current_package: - remote_dependencies_dict['name_to_id'] = dict() - remote_dependencies_dict['dependencies'] = dict() - packages += horizontal_packager.get_next_packages(section, potential_dependency=current_package.name, - remote_dependencies_dict=remote_dependencies_dict) - return packages, remote_dependencies_dict + + return packages def _build_vertical_packages(self, section_list, max_wrapped_jobs): """ @@ -187,13 +179,6 @@ class JobPackager(object): :rtype: List() of JobPackageVertical(), Dictionary Key: String, Value: (Dictionary Key: Variable Name, Value: String/Int) """ packages = [] - potential_dependency = None - remote_dependencies_dict = dict() - # True when autosubmit_.conf value [wrapper].DEPENDENCIES has been set to true - if self.remote_dependencies: - remote_dependencies_dict['name_to_id'] = dict() - remote_dependencies_dict['dependencies'] = dict() - for job in section_list: if self.max_jobs > 0: if job.packed is False: @@ -213,20 +198,12 @@ class JobPackager(object): if job.status is Status.READY: packages.append(JobPackageVertical(jobs_list)) else: - package = JobPackageVertical(jobs_list, potential_dependency) + package = JobPackageVertical(jobs_list, None) packages.append(package) - # Possible need of "if self.remote_dependencies here" - remote_dependencies_dict['name_to_id'][potential_dependency] = -1 - remote_dependencies_dict['dependencies'][package.name] = potential_dependency - if self.remote_dependencies: - # Sending last item in list of packaged - child = job_vertical_packager.get_wrappable_child(jobs_list[-1]) - if child is not None: - section_list.insert(section_list.index(job) + 1, child) - potential_dependency = packages[-1].name + else: break - return packages, remote_dependencies_dict + return packages def _build_hybrid_package(self, jobs_list, max_wrapped_jobs, section): jobs_resources = dict() @@ -541,7 +518,7 @@ class JobPackagerHorizontal(object): jobname = jobname.split('_')[-1] return self._sort_order_dict[jobname] - def get_next_packages(self, jobs_sections, max_wallclock=None, potential_dependency=None, remote_dependencies_dict=dict(),horizontal_vertical=False,max_procs=0): + def get_next_packages(self, jobs_sections, max_wallclock=None, potential_dependency=None, packages_remote_dependencies=list(),horizontal_vertical=False,max_procs=0): packages = [] job = max(self.job_list, key=attrgetter('total_wallclock')) wallclock = job.wallclock @@ -573,10 +550,7 @@ class JobPackagerHorizontal(object): if total_wallclock > max_wallclock: return packages packages.append(package_jobs) - if remote_dependencies_dict: - current_package = JobPackageHorizontal(package_jobs, potential_dependency) - remote_dependencies_dict['name_to_id'][potential_dependency] = -1 - remote_dependencies_dict['dependencies'][current_package.name] = potential_dependency + else: break diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 4303f7bbc..fa7c6d56c 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -75,7 +75,7 @@ class JobPackageBase(object): """ return self._platform - def submit(self, configuration, parameters,only_generate=False): + def submit(self, configuration, parameters,only_generate=False,hold=False): """ :para configuration: Autosubmit basic configuration \n :type configuration: AutosubmitConfig object \n @@ -102,7 +102,7 @@ class JobPackageBase(object): else: self._create_scripts(configuration) self._send_files() - self._do_submission() + self._do_submission(job_scripts=None,hold=hold) def _create_scripts(self, configuration): @@ -111,7 +111,7 @@ class JobPackageBase(object): def _send_files(self): raise Exception('Not implemented') - def _do_submission(self): + def _do_submission(self,job_scripts=None, hold=False): raise Exception('Not implemented') @@ -133,13 +133,13 @@ class JobPackageSimple(JobPackageBase): for job in self.jobs: self.platform.send_file(self._job_scripts[job.name]) - def _do_submission(self, job_scripts=None): + def _do_submission(self, job_scripts=None, hold=False): if job_scripts is None: job_scripts = self._job_scripts for job in self.jobs: self.platform.remove_stat_file(job.name) self.platform.remove_completed_file(job.name) - job.id = self.platform.submit_job(job, job_scripts[job.name]) + job.id = self.platform.submit_job(job, job_scripts[job.name], hold) if job.id is None: continue Log.info("{0} submitted", job.name) @@ -166,10 +166,10 @@ class JobPackageSimpleWrapped(JobPackageSimple): for job in self.jobs: self.platform.send_file(self._job_wrapped_scripts[job.name]) - def _do_submission(self, job_scripts=None): + def _do_submission(self, job_scripts=None, hold=False): if job_scripts is None: job_scripts = self._job_wrapped_scripts - super(JobPackageSimpleWrapped, self)._do_submission(job_scripts) + super(JobPackageSimpleWrapped, self)._do_submission(job_scripts, hold=False) class JobPackageArray(JobPackageBase): @@ -221,12 +221,12 @@ class JobPackageArray(JobPackageBase): self.platform.send_file(self._job_inputs[job.name]) self.platform.send_file(self._common_script) - def _do_submission(self): + def _do_submission(self, job_scripts=None, hold=False): for job in self.jobs: self.platform.remove_stat_file(job.name) self.platform.remove_completed_file(job.name) - package_id = self.platform.submit_job(None, self._common_script) + package_id = self.platform.submit_job(None, self._common_script, hold) if package_id is None: return @@ -316,7 +316,7 @@ class JobPackageThread(JobPackageBase): self.platform.send_file(self._job_scripts[job.name], check=False) self.platform.send_file(self._common_script) - def _do_submission(self): + def _do_submission(self, job_scripts=None, hold=False): if callable(getattr(self.platform, 'remove_multiple_files')): filenames = str() for job in self.jobs: @@ -328,7 +328,7 @@ class JobPackageThread(JobPackageBase): self.platform.remove_stat_file(job.name) self.platform.remove_completed_file(job.name) - package_id = self.platform.submit_job(None, self._common_script) + package_id = self.platform.submit_job(None, self._common_script, hold) if package_id is None: return @@ -400,12 +400,12 @@ class JobPackageThreadWrapped(JobPackageThread): self.platform.send_file(self._job_scripts[job.name]) self.platform.send_file(self._common_script) - def _do_submission(self): + def _do_submission(self, job_scripts=None, hold=False): for job in self.jobs: self.platform.remove_stat_file(job.name) self.platform.remove_completed_file(job.name) - package_id = self.platform.submit_job(None, self._common_script) + package_id = self.platform.submit_job(None, self._common_script, hold) if package_id is None: raise Exception('Submission failed') diff --git a/autosubmit/monitor/monitor.py b/autosubmit/monitor/monitor.py index eec9a3652..7d17e9d94 100644 --- a/autosubmit/monitor/monitor.py +++ b/autosubmit/monitor/monitor.py @@ -42,7 +42,7 @@ from diagram import create_bar_diagram class Monitor: """Class to handle monitoring of Jobs at HPC.""" _table = dict([(Status.UNKNOWN, 'white'), (Status.WAITING, 'gray'), (Status.READY, 'lightblue'), - (Status.SUBMITTED, 'cyan'), (Status.QUEUING, 'lightpink'), (Status.RUNNING, 'green'), + (Status.SUBMITTED, 'cyan'), (Status.HELD, 'pink'), (Status.QUEUING, 'lightpink'), (Status.RUNNING, 'green'), (Status.COMPLETED, 'yellow'), (Status.FAILED, 'red'), (Status.SUSPENDED, 'orange')]) @staticmethod @@ -61,12 +61,15 @@ class Monitor: return Monitor._table[Status.READY] elif status == Status.SUBMITTED: return Monitor._table[Status.SUBMITTED] + elif status == Status.HELD: + return Monitor._table[Status.HELD] elif status == Status.QUEUING: return Monitor._table[Status.QUEUING] elif status == Status.RUNNING: return Monitor._table[Status.RUNNING] elif status == Status.COMPLETED: return Monitor._table[Status.COMPLETED] + elif status == Status.FAILED: return Monitor._table[Status.FAILED] elif status == Status.SUSPENDED: @@ -98,12 +101,15 @@ class Monitor: fillcolor=self._table[Status.READY])) legend.add_node(pydotplus.Node(name='SUBMITTED', shape='box', style="filled", fillcolor=self._table[Status.SUBMITTED])) + legend.add_node(pydotplus.Node(name='HELD', shape='box', style="filled", + fillcolor=self._table[Status.HELD])) legend.add_node(pydotplus.Node(name='QUEUING', shape='box', style="filled", fillcolor=self._table[Status.QUEUING])) legend.add_node(pydotplus.Node(name='RUNNING', shape='box', style="filled", fillcolor=self._table[Status.RUNNING])) legend.add_node(pydotplus.Node(name='COMPLETED', shape='box', style="filled", fillcolor=self._table[Status.COMPLETED])) + legend.add_node(pydotplus.Node(name='FAILED', shape='box', style="filled", fillcolor=self._table[Status.FAILED])) legend.add_node(pydotplus.Node(name='SUSPENDED', shape='box', style="filled", diff --git a/autosubmit/platforms/ecplatform.py b/autosubmit/platforms/ecplatform.py index 7900d8787..bff993360 100644 --- a/autosubmit/platforms/ecplatform.py +++ b/autosubmit/platforms/ecplatform.py @@ -111,7 +111,7 @@ class EcPlatform(ParamikoPlatform): def get_checkjob_cmd(self, job_id): return self._checkjob_cmd + str(job_id) - def get_submit_cmd(self, job_script, job): + def get_submit_cmd(self, job_script, job, hold=False): return self._submit_cmd + job_script def connect(self): diff --git a/autosubmit/platforms/locplatform.py b/autosubmit/platforms/locplatform.py index f2766be55..0f3d1852b 100644 --- a/autosubmit/platforms/locplatform.py +++ b/autosubmit/platforms/locplatform.py @@ -80,7 +80,7 @@ class LocalPlatform(ParamikoPlatform): jobs_xml = dom.getElementsByTagName("JB_job_number") return [int(element.firstChild.nodeValue) for element in jobs_xml] - def get_submit_cmd(self, job_script, job): + def get_submit_cmd(self, job_script, job, hold=False): return self.get_call(job_script, job) def get_checkjob_cmd(self, job_id): diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index b6f3d6372..1f4cdfacc 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -256,7 +256,7 @@ class ParamikoPlatform(Platform): os.path.join(self.get_files_path(), dest))) return False - def submit_job(self, job, script_name): + def submit_job(self, job, script_name, hold=False): """ Submit a job from a given job object. @@ -264,11 +264,13 @@ class ParamikoPlatform(Platform): :type job: autosubmit.job.job.Job :param script_name: job script's name :rtype scriptname: str + :param hold: send job hold + :type hold: boolean :return: job id for the submitted job :rtype: int """ if self.type == 'slurm': - self.get_submit_cmd(script_name, job) + self.get_submit_cmd(script_name, job, hold) return None else: if self.send_command(self.get_submit_cmd(script_name, job)): @@ -277,7 +279,7 @@ class ParamikoPlatform(Platform): return int(job_id) else: return None - def submit_Script(self): + def submit_Script(self, hold=False): """ Sends a SubmitfileScript, exec in platform and retrieve the Jobs_ID. @@ -325,6 +327,7 @@ class ParamikoPlatform(Platform): job_status = Status.RUNNING elif job_status in self.job_status['QUEUING']: job_status = Status.QUEUING + elif job_status in self.job_status['FAILED']: job_status = Status.FAILED else: @@ -392,11 +395,15 @@ class ParamikoPlatform(Platform): job.new_status=Status.FAILED job.update_status(remote_logs) return - Log.info("Job {0} is QUEUING {1}", job.name, reason) + if reason is "(JobHeldUser)": + job.new_status=Status.HELD + Log.info("Job {0} is HELD", job.name) + else: + Log.info("Job {0} is QUEUING {1}", job.name, reason) else: for job in job_list: job_status = Status.UNKNOWN - Log.warning('check_job() The job id ({0}) from platform {1} has an status of {1}.', job_id, self.name, job_status) + Log.warning('check_job() The job id ({0}) from platform {1} has an status of {1}.', job.id, self.name, job_status) job.new_status=job_status def get_checkjob_cmd(self, job_id): @@ -514,13 +521,15 @@ class ParamikoPlatform(Platform): pass - def get_submit_cmd(self, job_script, job_type): + def get_submit_cmd(self, job_script, job_type,hold=False): """ Get command to add job to scheduler :param job_type: :param job_script: path to job script :param job_script: str + :param hold: submit a job in a held status + :param hold: boolean :return: command to submit job to platforms :rtype: str """ diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index b117f88ce..0a2a0c79a 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -301,7 +301,7 @@ class Platform(object): path = os.path.join(self.root_dir, 'LOG_{0}'.format(self.expid)) return path - def submit_job(self, job, scriptname): + def submit_job(self, job, script_name, hold=False): """ Submit a job from a given job object. diff --git a/autosubmit/platforms/psplatform.py b/autosubmit/platforms/psplatform.py index 83fc6a18a..9861db07e 100644 --- a/autosubmit/platforms/psplatform.py +++ b/autosubmit/platforms/psplatform.py @@ -74,7 +74,7 @@ class PsPlatform(ParamikoPlatform): jobs_xml = dom.getElementsByTagName("JB_job_number") return [int(element.firstChild.nodeValue) for element in jobs_xml] - def get_submit_cmd(self, job_script, job): + def get_submit_cmd(self, job_script, job, hold=False): return self.get_call(job_script, job) def get_checkjob_cmd(self, job_id): diff --git a/autosubmit/platforms/saga_platform.py b/autosubmit/platforms/saga_platform.py index 3d1f1e88b..6495a7abd 100644 --- a/autosubmit/platforms/saga_platform.py +++ b/autosubmit/platforms/saga_platform.py @@ -199,7 +199,7 @@ class SagaPlatform(Platform): except saga.DoesNotExist: return True - def submit_job(self, job, scriptname): + def submit_job(self, job, script_name, hold=False): """ Submit a job from a given job object. @@ -210,7 +210,7 @@ class SagaPlatform(Platform): :return: job id for the submitted job :rtype: int """ - saga_job = self.create_saga_job(job, scriptname) + saga_job = self.create_saga_job(job, script_name) saga_job.run() return saga_job.id diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index c40850616..757b0cf76 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -63,20 +63,16 @@ class SlurmPlatform(ParamikoPlatform): return os.path.join(BasicConfig.LOCAL_ASLOG_DIR,os.path.basename(self._submit_script_path)) - def submit_Script(self): + def submit_Script(self,hold=False): """ - Sends a SubmitfileScript, execute it in the platform and retrieves the Jobs_ID of all jobs at once. + Sends a Submit file Script, execute it in the platform and retrieves the Jobs_ID of all jobs at once. :param job: job object :type job: autosubmit.job.job.Job :return: job id for submitted jobs :rtype: list(int) """ - self.send_file(self.get_submit_script(),False) - - #cmd = '(cd '+self.get_files_path()+';'+' ./'+os.path.basename(self._submit_script_path)+')' - cmd = os.path.join(self.get_files_path(),os.path.basename(self._submit_script_path)) if self.send_command(cmd): jobs_id = self.get_submitted_job_id(self.get_ssh_output()) @@ -93,6 +89,8 @@ class SlurmPlatform(ParamikoPlatform): self.cancel_cmd = "scancel" self._checkhost_cmd = "echo 1" self._submit_cmd = 'sbatch -D {1} {1}/'.format(self.host, self.remote_log_dir) + self._submit_hold_cmd = 'sbatch -H -D {1} {1}/'.format(self.host, self.remote_log_dir) + self.put_cmd = "scp" self.get_cmd = "scp" self.mkdir_cmd = "mkdir -p " + self.remote_log_dir @@ -130,8 +128,12 @@ class SlurmPlatform(ParamikoPlatform): jobs_xml = dom.getElementsByTagName("JB_job_number") return [int(element.firstChild.nodeValue) for element in jobs_xml] - def get_submit_cmd(self, job_script, job): - self._submit_script_file.write(self._submit_cmd + job_script + "\n") + def get_submit_cmd(self, job_script, job, hold=False): + if not hold: + self._submit_script_file.write(self._submit_cmd + job_script + "\n") + else: + self._submit_script_file.write(self._submit_hold_cmd + job_script + "\n" ) + def get_checkjob_cmd(self, job_id): diff --git a/test/unit/test_job_common.py b/test/unit/test_job_common.py index a2b7b86a6..76ed7a62e 100644 --- a/test/unit/test_job_common.py +++ b/test/unit/test_job_common.py @@ -15,6 +15,8 @@ class TestJobCommon(TestCase): self.assertEquals('WAITING', Status.VALUE_TO_KEY[Status.WAITING]) self.assertEquals('READY', Status.VALUE_TO_KEY[Status.READY]) self.assertEquals('SUBMITTED', Status.VALUE_TO_KEY[Status.SUBMITTED]) + self.assertEquals('HELD', Status.VALUE_TO_KEY[Status.HELD]) self.assertEquals('QUEUING', Status.VALUE_TO_KEY[Status.QUEUING]) self.assertEquals('RUNNING', Status.VALUE_TO_KEY[Status.RUNNING]) self.assertEquals('COMPLETED', Status.VALUE_TO_KEY[Status.COMPLETED]) + -- GitLab From c281abf6559a0d19025aa6345b3feb00876cd964 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 12 Nov 2019 12:53:16 +0100 Subject: [PATCH 02/13] Simple package workflow working, need to test with a real case --- autosubmit/autosubmit.py | 80 ++++------------------- autosubmit/job/job.py | 3 + autosubmit/job/job_common.py | 2 +- autosubmit/job/job_list.py | 55 ++++++++++------ autosubmit/job/job_packager.py | 5 +- autosubmit/job/job_packages.py | 10 +-- autosubmit/monitor/monitor.py | 2 +- autosubmit/platforms/paramiko_platform.py | 33 ++++++---- 8 files changed, 78 insertions(+), 112 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 08374452f..e4b96b5d7 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -963,7 +963,7 @@ class Autosubmit: Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) while job_list.get_active(): # Sending only_wrappers = True - Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence,True,only_wrappers) + Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence,True,only_wrappers,hold=False) job_list.update_list(as_conf, False) @@ -1168,15 +1168,16 @@ class Autosubmit: Status.VALUE_TO_KEY[job.status], as_conf.get_mails_to()) save = True - if job_list.update_list(as_conf) or save: job_list.save() - - - if Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence): + if Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence,hold=False): job_list.save() + if as_conf.get_remote_dependencies(): + if Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence,hold=True): + job_list.save() + if Autosubmit.exit: return 2 time.sleep(safetysleeptime) @@ -1197,7 +1198,7 @@ class Autosubmit: @staticmethod def submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence, inspect=False, - only_wrappers=False): + only_wrappers=False,hold=False): """ Gets READY jobs and send them to the platforms if there is available space on the queues @@ -1218,18 +1219,17 @@ class Autosubmit: """ save = False for platform in platforms_to_test: - Log.debug("\nJobs ready for {1}: {0}", len(job_list.get_ready(platform)), platform.name) - packages_to_submit = JobPackager(as_conf, platform, job_list).build_packages(hold=False) + Log.debug("\nJobs ready for {1}: {0}", len(job_list.get_ready(platform,hold=hold)), platform.name) + packages_to_submit = JobPackager(as_conf, platform, job_list).build_packages(hold=hold) if not inspect: platform.open_submit_script() valid_packages_to_submit = [] - valid_packages_to_submit_held =[] for package in packages_to_submit: try: if not only_wrappers: try: - package.submit(as_conf, job_list.parameters, inspect,hold=False) + package.submit(as_conf, job_list.parameters, inspect,hold=hold) valid_packages_to_submit.append(package) except (IOError,OSError): #write error file @@ -1258,53 +1258,18 @@ class Autosubmit: except Exception: Log.error("{0} submission failed due to Unknown error", platform.name) raise - if as_conf.get_remote_dependencies: - packages_remote_dependencies = JobPackager(as_conf, platform, job_list).build_packages(hold=True) - for package in packages_remote_dependencies: - try: - if not only_wrappers: - try: - package.submit(as_conf, job_list.parameters, inspect,hold=True) - valid_packages_to_submit_held.append(package) - except (IOError, OSError): - # write error file - continue - if only_wrappers or inspect: - for innerJob in package._jobs: - # Setting status to COMPLETED so it does not get stuck in the loop that calls this function - innerJob.status = Status.COMPLETED - if hasattr(package, "name"): - job_list.packages_dict[package.name] = package.jobs - from job.job import WrapperJob - wrapper_job = WrapperJob(package.name, package.jobs[0].id, Status.READY, 0, - package.jobs, - package._wallclock, package._num_processors, - package.platform, as_conf) - job_list.job_package_map[package.jobs[0].id] = wrapper_job - - if isinstance(package, JobPackageThread): - # If it is instance of JobPackageThread, then it is JobPackageVertical. - packages_persistence.save(package.name, package.jobs, package._expid, inspect) - save = True - except WrongTemplateException as e: - Log.error("Invalid parameter substitution in {0} template", e.job_name) - raise - except Exception: - Log.error("{0} submission failed due to Unknown error", platform.name) - raise if platform.type == "slurm" and not inspect and not only_wrappers: try: save = True if len(valid_packages_to_submit) > 0: - jobs_id = platform.submit_Script(hold=False) + jobs_id = platform.submit_Script(hold=hold) if jobs_id is None: raise BaseException("Exiting AS being unable to get jobID") i = 0 for package in valid_packages_to_submit: for job in package.jobs: job.id = str(jobs_id[i]) - Log.info("{0} submitted", job.name) job.status = Status.SUBMITTED job.write_submit_time() if hasattr(package, "name"): @@ -1320,29 +1285,6 @@ class Autosubmit: packages_persistence.save(package.name, package.jobs, package._expid, inspect) i += 1 save = True - if len(valid_packages_to_submit_held) > 0: - jobs_id = platform.submit_Script(hold=True) - if jobs_id is None: - raise BaseException("Exiting AS being unable to get jobID") - i = 0 - for package in valid_packages_to_submit_held: - for job in package.jobs: - job.id = str(jobs_id[i]) - Log.info("{0} submitted", job.name) - job.status = Status.SUBMITTED - job.write_submit_time() - if hasattr(package, "name"): - job_list.packages_dict[package.name] = package.jobs - from job.job import WrapperJob - wrapper_job = WrapperJob(package.name, package.jobs[0].id, Status.SUBMITTED, 0, - package.jobs, - package._wallclock, package._num_processors, - package.platform, as_conf) - job_list.job_package_map[package.jobs[0].id] = wrapper_job - if isinstance(package, JobPackageThread): - # Saving only when it is a real multi job package - packages_persistence.save(package.name, package.jobs, package._expid, inspect) - i += 1 except WrongTemplateException as e: Log.error("Invalid parameter substitution in {0} template", e.job_name) raise diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 56bb77709..d9386156f 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -506,6 +506,8 @@ class Job(object): self.update_status(copy_remote_logs,True) return Log.info("Job {0} is QUEUING {1}", self.name, reason) + elif self.status is Status.HELD: + Log.info("Job {0} is HELD", self.name) elif self.status is Status.RUNNING: Log.info("Job {0} is RUNNING", self.name) elif self.status is Status.COMPLETED: @@ -1038,6 +1040,7 @@ class WrapperJob(Job): reason) self.cancel_failed_wrapper_job() return + Log.info("Job {0} is QUEUING {1}", self.name, reason) self.status = status if status in [Status.FAILED, Status.UNKNOWN]: diff --git a/autosubmit/job/job_common.py b/autosubmit/job/job_common.py index 5a2a09bbd..0522cfd6a 100644 --- a/autosubmit/job/job_common.py +++ b/autosubmit/job/job_common.py @@ -58,7 +58,7 @@ class bcolors: QUEUING = '\033[35;1m' RUNNING = '\033[32m' COMPLETED = '\033[33m' - HELD = '\033[35;1m' + HELD = '\033[34;1m' FAILED = '\033[31m' SUSPENDED = '\033[31;1m' CODE_TO_COLOR = {-3: SUSPENDED, -2: UNKNOWN, -1: FAILED, 0: WAITING, 1: READY, 2: SUBMITTED, 3: QUEUING, 4: RUNNING, 5: COMPLETED, 6: HELD} diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index b852758b8..3c2bdbd80 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -664,31 +664,35 @@ class JobList: :return: waiting jobs :rtype: list """ - return [job for job in self._job_list if (platform is None or job.platform is platform) and + waiting_jobs= [job for job in self._job_list if (platform is None or job.platform is platform) and job.status == Status.WAITING] - def get_waiting(self, platform=None): + return waiting_jobs + + def get_waiting_remote_dependencies(self, platform_type='slurm'.lower()): """ - Returns a list of jobs waiting + Returns a list of jobs waiting on slurm scheduler :param platform: job platform :type platform: HPCPlatform :return: waiting jobs :rtype: list """ - return [job for job in self._job_list if (platform is None or job.platform is platform) and - job.status == Status.WAITING] + waiting_jobs= [job for job in self._job_list if (job.platform.type == platform_type and + job.status == Status.WAITING)] + return waiting_jobs - def get_waiting_remote_dependencies(self, platform_type='slurm'): + def get_held_jobs(self,platform = None): """ - Returns a list of jobs waiting on slurm scheduler + Returns a list of jobs in the platforms (Held) :param platform: job platform :type platform: HPCPlatform - :return: waiting jobs + :return: jobs in platforms :rtype: list """ - return [job for job in self._job_list if (job.platform.type is platform_type and - job.status == Status.WAITING)] + return [job for job in self._job_list if (platform is None or job.platform == platform) and + job.status == Status.HELD] + def get_unknown(self, platform=None): """ Returns a list of jobs on unknown state @@ -715,7 +719,7 @@ class JobList: def get_in_queue(self, platform=None): """ - Returns a list of jobs in the platforms (Submitted, Running, Queuing, Unknown) + Returns a list of jobs in the platforms (Submitted, Running, Queuing, Unknown,Held) :param platform: job platform :type platform: HPCPlatform @@ -723,7 +727,7 @@ class JobList: :rtype: list """ return self.get_submitted(platform) + self.get_running(platform) + self.get_queuing( - platform) + self.get_unknown(platform) + platform) + self.get_unknown(platform) + self.get_held_jobs(platform) def get_not_in_queue(self, platform=None): """ @@ -933,10 +937,7 @@ class JobList: Log.debug("Resetting job: {0} status to: WAITING for parents completion...".format(job.name)) # if waiting jobs has all parents completed change its State to READY - - for job in self.get_completed(): - if job.synchronize is not None: #and job in self.get_active(): Log.debug('Updating SYNC jobs') tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] @@ -945,26 +946,40 @@ class JobList: save = True Log.debug("Resetting sync job: {0} status to: WAITING for parents completion...".format(job.name)) Log.debug('Update finished') - - Log.debug('Updating WAITING jobs') + if as_conf.get_remote_dependencies(): + all_parents_completed=[] if not fromSetStatus: for job in self.get_waiting(): tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] if len(tmp) == len(job.parents): job.status = Status.READY + job.hold = False save = True Log.debug("Setting job: {0} status to: READY (all parents completed)...".format(job.name)) + if as_conf.get_remote_dependencies(): + all_parents_completed.append(job.name) if as_conf.get_remote_dependencies(): Log.debug('Updating WAITING jobs eligible for remote_dependencies') - for job in self.get_waiting_remote_dependencies(): - tmp = [parent for parent in job.parents if (parent.status == Status.COMPLETED or parent.status == Status.QUEUING or parent.status == Status.RUNNING) and (parent.section is job.section or parent.check.lower() is not 'on_submission')] + for job in self.get_waiting_remote_dependencies('slurm'.lower()): + if job.name not in all_parents_completed: + tmp = [parent for parent in job.parents if (parent.status == Status.COMPLETED or (parent.status == Status.QUEUING and not parent.hold) or parent.status == Status.RUNNING) and (parent.section is job.section or parent.check.lower() is not 'on_submission')] if len(tmp) == len(job.parents): job.status = Status.READY job.hold = True save = True Log.debug("Setting job: {0} status to: READY for be held (all parents queuing, running or completed)...".format(job.name)) - Log.debug('Updating WAITING jobs eligible for remote_dependencies') + Log.debug('Updating Held jobs') + for job in self.get_held_jobs(): + tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] + if len(tmp) == len(job.parents): + job.hold = False + Log.debug( + "Setting job: {0} status to: Queuing (all parents completed)...".format( + job.name)) + else: + job.hold = True + save= True Log.debug('Update finished') diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 2920b4bd2..185689ca8 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -48,9 +48,8 @@ class JobPackager(object): self._max_wait_jobs_to_submit = platform.max_waiting_jobs - waiting_jobs # .total_jobs is defined in each section of platforms_.conf, if not from there, it comes form autosubmit_.conf # .total_jobs Maximum number of jobs at the same time - self._max_jobs_to_submit = platform.total_jobs - len(jobs_list.get_in_queue(platform)) + self._max_jobs_to_submit = platform.total_jobs - len(jobs_list.get_in_queue(platform)) self.max_jobs = min(self._max_wait_jobs_to_submit, self._max_jobs_to_submit) - # These are defined in the [wrapper] section of autosubmit_,conf self.wrapper_type = self._as_config.get_wrapper_type() # True or False @@ -74,7 +73,7 @@ class JobPackager(object): if only_generate: jobs_to_submit = jobs_filtered else: - jobs_ready = self._jobs_list.get_ready(self._platform,hold) + jobs_ready = self._jobs_list.get_ready(self._platform,hold=hold) if jobs_ready == 0: # If there are no jobs ready, result is tuple of empty return packages_to_submit diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index fa7c6d56c..af0101fbb 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -139,7 +139,7 @@ class JobPackageSimple(JobPackageBase): for job in self.jobs: self.platform.remove_stat_file(job.name) self.platform.remove_completed_file(job.name) - job.id = self.platform.submit_job(job, job_scripts[job.name], hold) + job.id = self.platform.submit_job(job, job_scripts[job.name], hold=hold) if job.id is None: continue Log.info("{0} submitted", job.name) @@ -169,7 +169,7 @@ class JobPackageSimpleWrapped(JobPackageSimple): def _do_submission(self, job_scripts=None, hold=False): if job_scripts is None: job_scripts = self._job_wrapped_scripts - super(JobPackageSimpleWrapped, self)._do_submission(job_scripts, hold=False) + super(JobPackageSimpleWrapped, self)._do_submission(job_scripts, hold=hold) class JobPackageArray(JobPackageBase): @@ -226,7 +226,7 @@ class JobPackageArray(JobPackageBase): self.platform.remove_stat_file(job.name) self.platform.remove_completed_file(job.name) - package_id = self.platform.submit_job(None, self._common_script, hold) + package_id = self.platform.submit_job(None, self._common_script, hold=hold) if package_id is None: return @@ -328,7 +328,7 @@ class JobPackageThread(JobPackageBase): self.platform.remove_stat_file(job.name) self.platform.remove_completed_file(job.name) - package_id = self.platform.submit_job(None, self._common_script, hold) + package_id = self.platform.submit_job(None, self._common_script, hold=hold) if package_id is None: return @@ -405,7 +405,7 @@ class JobPackageThreadWrapped(JobPackageThread): self.platform.remove_stat_file(job.name) self.platform.remove_completed_file(job.name) - package_id = self.platform.submit_job(None, self._common_script, hold) + package_id = self.platform.submit_job(None, self._common_script, hold=hold) if package_id is None: raise Exception('Submission failed') diff --git a/autosubmit/monitor/monitor.py b/autosubmit/monitor/monitor.py index 7d17e9d94..a2d35c7fc 100644 --- a/autosubmit/monitor/monitor.py +++ b/autosubmit/monitor/monitor.py @@ -42,7 +42,7 @@ from diagram import create_bar_diagram class Monitor: """Class to handle monitoring of Jobs at HPC.""" _table = dict([(Status.UNKNOWN, 'white'), (Status.WAITING, 'gray'), (Status.READY, 'lightblue'), - (Status.SUBMITTED, 'cyan'), (Status.HELD, 'pink'), (Status.QUEUING, 'lightpink'), (Status.RUNNING, 'green'), + (Status.SUBMITTED, 'cyan'), (Status.HELD, 'salmon'), (Status.QUEUING, 'pink'), (Status.RUNNING, 'green'), (Status.COMPLETED, 'yellow'), (Status.FAILED, 'red'), (Status.SUSPENDED, 'orange')]) @staticmethod diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 1f4cdfacc..89613f05e 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -270,7 +270,7 @@ class ParamikoPlatform(Platform): :rtype: int """ if self.type == 'slurm': - self.get_submit_cmd(script_name, job, hold) + self.get_submit_cmd(script_name, job, hold=hold) return None else: if self.send_command(self.get_submit_cmd(script_name, job)): @@ -325,9 +325,10 @@ class ParamikoPlatform(Platform): job_status = Status.COMPLETED elif job_status in self.job_status['RUNNING']: job_status = Status.RUNNING - elif job_status in self.job_status['QUEUING']: + elif job_status in self.job_status['QUEUING'] and job.hold is False: job_status = Status.QUEUING - + elif job_status in self.job_status['QUEUING'] and job.hold is True: + job_status = Status.HELD elif job_status in self.job_status['FAILED']: job_status = Status.FAILED else: @@ -356,7 +357,7 @@ class ParamikoPlatform(Platform): Log.warning('Retrying check job command: {0}', cmd) Log.warning('Can not get job status for all jobs, retrying in 3 sec') sleep(3) - Log.debug('Successful check job command: {0}', cmd) + Log.debug('Successful check job command: {0}, \n output: {1}', cmd, self._ssh_output) if retries >= 0: in_queue_jobs=[] list_queue_jobid="" @@ -373,6 +374,10 @@ class ParamikoPlatform(Platform): elif job_status in self.job_status['QUEUING']: job_status = Status.QUEUING if self.type == "slurm": + if job.status == Status.HELD: + job_status = Status.HELD + if not job.hold: + self.send_command("scontrol release "+"{0}".format(job_id)) # SHOULD BE MORE CLASS (GET_scontrol realease but not sure if this can be implemented on others PLATFORMS list_queue_jobid += str(job.id) + ',' in_queue_jobs.append(job) elif job_status in self.job_status['FAILED']: @@ -392,18 +397,22 @@ class ParamikoPlatform(Platform): if job.queuing_reason_cancel(reason): Log.error("Job {0} will be cancelled and set to FAILED as it was queuing due to {1}", job.name, reason) self.send_command(self.platform.cancel_cmd + " {0}".format(job.id)) - job.new_status=Status.FAILED + job.new_status = Status.FAILED job.update_status(remote_logs) return - if reason is "(JobHeldUser)": + elif reason == '(JobHeldUser)': job.new_status=Status.HELD Log.info("Job {0} is HELD", job.name) + elif reason == '(JobHeldAdmin)': + Log.info("Job {0} Failed to be HELD, canceling... ", job.name) + job.new_status = Status.WAITING + job.platform.send_command(job.platform.cancel_cmd + " {0}".format(job.id)) else: Log.info("Job {0} is QUEUING {1}", job.name, reason) else: for job in job_list: job_status = Status.UNKNOWN - Log.warning('check_job() The job id ({0}) from platform {1} has an status of {1}.', job.id, self.name, job_status) + Log.warning('check_job() The job id ({0}) from platform {1} has an status of {2}.', job.id, self.name, job_status) job.new_status=job_status def get_checkjob_cmd(self, job_id): @@ -440,9 +449,8 @@ class ParamikoPlatform(Platform): if self._ssh is None: if not self.connect(): return None - timeout = 120.0 + timeout = 600.0 try: - stdin, stdout, stderr = self._ssh.exec_command(command) channel = stdout.channel channel.settimeout(timeout) @@ -476,10 +484,9 @@ class ParamikoPlatform(Platform): stdout.close() stderr.close() self._ssh_output = "" - if len(stdout_chunks) > 0: - for s in stdout_chunks: - if s is not None: - self._ssh_output += s + for s in stdout_chunks: + if s is not '': + self._ssh_output += s for errorLine in stderr_readlines: if errorLine.find("submission failed") != -1: Log.critical('Command {0} in {1} warning: {2}', command, self.host, '\n'.join(stderr_readlines)) -- GitLab From 7207413ae7e949481bd338f9d5633ed26e4e02a6 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 15 Nov 2019 09:57:56 +0100 Subject: [PATCH 03/13] Working on wrappers, still some bugs and pipeline not working --- autosubmit/autosubmit.py | 17 ++-- autosubmit/job/job.py | 98 ++++++++++++++++------- autosubmit/job/job_list.py | 2 +- autosubmit/job/job_packager.py | 8 +- autosubmit/job/job_packages.py | 4 + autosubmit/platforms/paramiko_platform.py | 10 ++- 6 files changed, 94 insertions(+), 45 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index e4b96b5d7..118e78531 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -938,7 +938,7 @@ class Autosubmit: :rtype: \n """ job_list._job_list=jobs_filtered - job_list.update_list(as_conf,False) + # Current choice is Paramiko Submitter submitter = Autosubmit._get_submitter(as_conf) # Load platforms saves a dictionary Key: Platform Name, Value: Corresponding Platform Object @@ -1219,25 +1219,26 @@ class Autosubmit: """ save = False for platform in platforms_to_test: - Log.debug("\nJobs ready for {1}: {0}", len(job_list.get_ready(platform,hold=hold)), platform.name) - packages_to_submit = JobPackager(as_conf, platform, job_list).build_packages(hold=hold) - + Log.debug("\nJobs ready for {1}: {0}", len(job_list.get_ready(platform, hold=hold)), platform.name) + packages_to_submit = JobPackager(as_conf, platform, job_list, hold=hold).build_packages(hold=hold) if not inspect: platform.open_submit_script() + else: + packages_to_submit += JobPackager(as_conf, platform, job_list, hold=True).build_packages(hold=True) valid_packages_to_submit = [] for package in packages_to_submit: try: if not only_wrappers: try: - package.submit(as_conf, job_list.parameters, inspect,hold=hold) + package.submit(as_conf, job_list.parameters, inspect, hold=hold) valid_packages_to_submit.append(package) - except (IOError,OSError): - #write error file + except (IOError, OSError): + # write error file continue if only_wrappers or inspect: for innerJob in package._jobs: # Setting status to COMPLETED so it does not get stuck in the loop that calls this function - innerJob.status=Status.COMPLETED + innerJob.status = Status.COMPLETED if hasattr(package, "name"): job_list.packages_dict[package.name] = package.jobs diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index d9386156f..957343ffc 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -493,21 +493,35 @@ class Job(object): self.check_completion() else: self.status = new_status - if self.status is Status.QUEUING: + if self.status is Status.QUEUING or self.status is Status.HELD: if iswrapper: reason = str() if self.platform.type == 'slurm': self.platform.send_command(self.platform.get_queue_status_cmd(self.id)) - reason = self.platform.parse_queue_reason(self.platform._ssh_output) + reason = self.platform.parse_queue_reason(self.platform._ssh_output,self.id) if self._queuing_reason_cancel(reason): Log.error("Job {0} will be cancelled and set to FAILED as it was queuing due to {1}", self.name, reason) self.platform.send_command(self.platform.cancel_cmd + " {0}".format(self.id)) self.new_status =Status.FAILED self.update_status(copy_remote_logs,True) return - Log.info("Job {0} is QUEUING {1}", self.name, reason) - elif self.status is Status.HELD: - Log.info("Job {0} is HELD", self.name) + elif reason == '(JobHeldUser)': + self.new_status = Status.HELD + self.hold = True + Log.info("Job {0} is HELD", self.name) + elif reason == '(JobHeldAdmin)': + Log.info("Job {0} Failed to be HELD, canceling... ", self.name) + self.hold = True + self.new_status = Status.READY + self.platform.send_command(self.platform.cancel_cmd + " {0}".format(self.id)) + self.update_failed_held() + else: + self.hold = False + if self.status is Status.QUEUING: + Log.info("Job {0} is QUEUING {1}", self.name, reason) + else: + if self.status is Status.HELD: + Log.info("Job {0} is HELD", self.name) elif self.status is Status.RUNNING: Log.info("Job {0} is RUNNING", self.name) elif self.status is Status.COMPLETED: @@ -552,7 +566,7 @@ class Job(object): def update_children_status(self): children = list(self.children) for child in children: - if child.status in [Status.SUBMITTED, Status.RUNNING, Status.QUEUING, Status.UNKNOWN]: + if child.status in [Status.SUBMITTED, Status.RUNNING, Status.QUEUING, Status.UNKNOWN]: #TODO add held? child.status = Status.FAILED children += list(child.children) @@ -1012,6 +1026,7 @@ class WrapperJob(Job): self.as_config = as_config # save start time, wallclock and processors?! self.checked_time = datetime.datetime.now() + self.hold = False def _queuing_reason_cancel(self, reason): try: if len(reason.split('(', 1)) > 1: @@ -1028,30 +1043,43 @@ class WrapperJob(Job): return False def check_status(self, status): - if status != self.status: - if status == Status.QUEUING: - reason = str() - if self.platform.type == 'slurm': - self.platform.send_command(self.platform.get_queue_status_cmd(self.id)) - reason = self.platform.parse_queue_reason(self.platform._ssh_output,self.id) - - if self._queuing_reason_cancel(reason): - Log.error("Job {0} will be cancelled and set to FAILED as it was queuing due to {1}", self.name, - reason) - self.cancel_failed_wrapper_job() - return - + self.status = status + if status == Status.QUEUING or status == Status.HELD: + reason = str() + if self.platform.type == 'slurm': + self.platform.send_command(self.platform.get_queue_status_cmd(self.id)) + reason = self.platform.parse_queue_reason(self.platform._ssh_output,self.id) + if self._queuing_reason_cancel(reason): + Log.error("Job {0} will be cancelled and set to FAILED as it was queuing due to {1}", self.name, + reason) + self.cancel_failed_wrapper_job() + return + elif reason == '(JobHeldUser)': + if self.hold is False: + self.platform.send_command("scontrol release " + "{0}".format(self.id)) # SHOULD BE MORE CLASS (GET_scontrol realease but not sure if this can be implemented on others PLATFORMS + self.status = Status.QUEUING + else: + Log.info("Job {0} is HELD", self.name) + elif reason == '(JobHeldAdmin)': + Log.info("Job {0} Failed to be HELD, canceling... ", self.name) + self.hold = False + self.status = Status.WAITING + self.platform.send_command(self.platform.cancel_cmd + " {0}".format(self.id)) + else: + self.hold = False + self.status = Status.QUEUING Log.info("Job {0} is QUEUING {1}", self.name, reason) - self.status = status - if status in [Status.FAILED, Status.UNKNOWN]: - self.cancel_failed_wrapper_job() - self.update_failed_jobs() - elif status == Status.COMPLETED: - self.check_inner_jobs_completed(self.job_list) - elif status == Status.RUNNING: - time.sleep(3) - Log.debug('Checking inner jobs status') - self.check_inner_job_status() + self.update_inner_jobs_held(self.hold) + else: + if status in [Status.FAILED, Status.UNKNOWN]: + self.cancel_failed_wrapper_job() + self.update_failed_jobs() + elif status == Status.COMPLETED: + self.check_inner_jobs_completed(self.job_list) + elif status == Status.RUNNING or status == Status.SUBMITTED: + time.sleep(3) + Log.debug('Checking inner jobs status') + self.check_inner_job_status() def check_inner_job_status(self): self._check_running_jobs() @@ -1143,6 +1171,18 @@ class WrapperJob(Job): not_finished_jobs = [job for job in self.job_list if job.status not in [Status.FAILED, Status.COMPLETED]] for job in not_finished_jobs: self._check_finished_job(job) + def update_inner_jobs_held(self,hold): + jobs = [job for job in self.job_list if job.status in [ Status.SUBMITTED or Status.Held or Status.QUEUING]] + for job in jobs: + if hold: + job.status = Status.HELD + else: + job.status = Status.QUEUING + jobs_held_admin = [job for job in self.job_list if job.status in [ Status.WAITING ]] + for job in jobs_held_admin: + if hold: + job.status = Status.WAITING + job.hold = False def _check_wrapper_status(self): not_finished_jobs = [job for job in self.job_list if job.status not in [Status.FAILED, Status.COMPLETED]] diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 3c2bdbd80..024f793e2 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -963,7 +963,7 @@ class JobList: Log.debug('Updating WAITING jobs eligible for remote_dependencies') for job in self.get_waiting_remote_dependencies('slurm'.lower()): if job.name not in all_parents_completed: - tmp = [parent for parent in job.parents if (parent.status == Status.COMPLETED or (parent.status == Status.QUEUING and not parent.hold) or parent.status == Status.RUNNING) and (parent.section is job.section or parent.check.lower() is not 'on_submission')] + tmp = [parent for parent in job.parents if (parent.status == Status.COMPLETED or (parent.status == Status.QUEUING and not parent.hold and parent.name.lower() not in "setup") or parent.status == Status.RUNNING)] if len(tmp) == len(job.parents): job.status = Status.READY job.hold = True diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 185689ca8..183683564 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -38,7 +38,7 @@ class JobPackager(object): :type jobs_list: JobList object. """ - def __init__(self, as_config, platform, jobs_list): + def __init__(self, as_config, platform, jobs_list,hold=False): self._as_config = as_config self._platform = platform self._jobs_list = jobs_list @@ -55,10 +55,10 @@ class JobPackager(object): # True or False self.jobs_in_wrapper = self._as_config.get_wrapper_jobs() - Log.debug("Number of jobs ready: {0}", len(jobs_list.get_ready(platform))) + Log.debug("Number of jobs ready: {0}", len(jobs_list.get_ready(platform,hold=hold))) Log.debug("Number of jobs available: {0}", self._max_wait_jobs_to_submit) - if len(jobs_list.get_ready(platform)) > 0: - Log.info("Jobs ready for {0}: {1}", self._platform.name, len(jobs_list.get_ready(platform))) + if len(jobs_list.get_ready(platform,hold=hold)) > 0: + Log.info("Jobs ready for {0}: {1}", self._platform.name, len(jobs_list.get_ready(platform,hold=hold))) self._maxTotalProcessors = 0 def build_packages(self,only_generate=False, jobs_filtered=[],hold=False): diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index af0101fbb..95aae9c36 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -327,6 +327,8 @@ class JobPackageThread(JobPackageBase): for job in self.jobs: self.platform.remove_stat_file(job.name) self.platform.remove_completed_file(job.name) + if hold: + job.hold = hold package_id = self.platform.submit_job(None, self._common_script, hold=hold) @@ -404,6 +406,8 @@ class JobPackageThreadWrapped(JobPackageThread): for job in self.jobs: self.platform.remove_stat_file(job.name) self.platform.remove_completed_file(job.name) + if hold: + job.hold = hold package_id = self.platform.submit_job(None, self._common_script, hold=hold) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 89613f05e..999ff74fa 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -372,12 +372,15 @@ class ParamikoPlatform(Platform): elif job_status in self.job_status['RUNNING']: job_status = Status.RUNNING elif job_status in self.job_status['QUEUING']: - job_status = Status.QUEUING + if self.type == "slurm": if job.status == Status.HELD: - job_status = Status.HELD if not job.hold: self.send_command("scontrol release "+"{0}".format(job_id)) # SHOULD BE MORE CLASS (GET_scontrol realease but not sure if this can be implemented on others PLATFORMS + job_status = Status.QUEUING + job_status = Status.HELD + else: + job_status = Status.QUEUING list_queue_jobid += str(job.id) + ',' in_queue_jobs.append(job) elif job_status in self.job_status['FAILED']: @@ -492,7 +495,8 @@ class ParamikoPlatform(Platform): Log.critical('Command {0} in {1} warning: {2}', command, self.host, '\n'.join(stderr_readlines)) return False if not ignore_log: - Log.warning('Command {0} in {1} warning: {2}', command, self.host, '\n'.join(stderr_readlines)) + if len(stderr_readlines) > 0: + Log.warning('Command {0} in {1} warning: {2}', command, self.host, '\n'.join(stderr_readlines)) Log.debug('Command {0} in {1} successful with out message: {2}', command, self.host, self._ssh_output) return True -- GitLab From ec0053b35aca7a72b62c67d398ab8edba3b26c28 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 18 Nov 2019 10:40:10 +0100 Subject: [PATCH 04/13] change hold --- autosubmit/job/job.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 957343ffc..181dd0ee3 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -1069,14 +1069,15 @@ class WrapperJob(Job): self.hold = False self.status = Status.QUEUING Log.info("Job {0} is QUEUING {1}", self.name, reason) - self.update_inner_jobs_held(self.hold) + self.update_inner_jobs_queue(self.hold) else: - if status in [Status.FAILED, Status.UNKNOWN]: + + if self.status in [Status.FAILED, Status.UNKNOWN]: self.cancel_failed_wrapper_job() self.update_failed_jobs() - elif status == Status.COMPLETED: + elif self.status == Status.COMPLETED: self.check_inner_jobs_completed(self.job_list) - elif status == Status.RUNNING or status == Status.SUBMITTED: + elif self.status == Status.RUNNING or status == Status.SUBMITTED: time.sleep(3) Log.debug('Checking inner jobs status') self.check_inner_job_status() @@ -1171,13 +1172,14 @@ class WrapperJob(Job): not_finished_jobs = [job for job in self.job_list if job.status not in [Status.FAILED, Status.COMPLETED]] for job in not_finished_jobs: self._check_finished_job(job) - def update_inner_jobs_held(self,hold): - jobs = [job for job in self.job_list if job.status in [ Status.SUBMITTED or Status.Held or Status.QUEUING]] + def update_inner_jobs_queue(self,hold): + jobs = [job for job in self.job_list if job.status in [ Status.SUBMITTED or Status.HELD or Status.QUEUING]] for job in jobs: if hold: job.status = Status.HELD else: job.status = Status.QUEUING + job.hold = hold jobs_held_admin = [job for job in self.job_list if job.status in [ Status.WAITING ]] for job in jobs_held_admin: if hold: -- GitLab From c44a43aa3491193544ddfe40cd63f49d77d78f2d Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 19 Nov 2019 10:01:07 +0100 Subject: [PATCH 05/13] change config --- autosubmit/config/config_common.py | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 3fbd88890..9c5947403 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -467,18 +467,18 @@ class AutosubmitConfig(object): if parser.has_option(section, 'PLATFORM'): result = result and parser.check_is_choice(section, 'PLATFORM', False, platforms) - if parser.has_option(section, 'DEPENDENCIES'): - for dependency in str(parser.get_option(section, 'DEPENDENCIES', '')).split(' '): - if '-' in dependency: - dependency = dependency.split('-')[0] - elif '+' in dependency: - dependency = dependency.split('+')[0] - if '[' in dependency: - dependency = dependency[:dependency.find('[')] - if dependency not in sections: - Log.error( - 'Job {0} depends on job {1} that is not defined. It will be ignored.'.format(section, - dependency)) + # if parser.has_option(section, 'DEPENDENCIES'): #TODO if any breaks is there + # for dependency in str(parser.get_option(section, 'DEPENDENCIES', '')).split(' '): + # if '-' in dependency: + # dependency = dependency.split('-')[0] + # elif '+' in dependency: + # dependency = dependency.split('+')[0] + # if '[' in dependency: + # dependency = dependency[:dependency.find('[')] + # if dependency not in sections: + # Log.error( + # 'Job {0} depends on job {1} that is not defined. It will be ignored.'.format(section, + # dependency)) if parser.has_option(section, 'RERUN_DEPENDENCIES'): for dependency in str(parser.get_option(section, 'RERUN_DEPENDENCIES', @@ -1049,7 +1049,11 @@ class AutosubmitConfig(object): :return: if remote dependencies :rtype: bool """ - return self._conf_parser.get('config', 'DEPENDENCIES', 'false').lower() == 'true' + config_value = self._conf_parser.get_option('config', 'DEPENDENCIES', 'false').lower() + if config_value is "true": + return True + else: + return False def get_wrapper_type(self): """ Returns what kind of wrapper (VERTICAL, MIXED-VERTICAL, HORIZONTAL, HYBRID, NONE) the user has configured in the autosubmit's config -- GitLab From bc09413465504dc45d70054331b361f181587c02 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 21 Nov 2019 09:21:47 +0100 Subject: [PATCH 06/13] More fixes now appear to work with wrappers but the workflow differs a bit, Last a design issue --- autosubmit/autosubmit.py | 3 +-- autosubmit/config/config_common.py | 2 +- autosubmit/platforms/paramiko_platform.py | 12 ++++++------ autosubmit/platforms/slurmplatform.py | 2 +- 4 files changed, 9 insertions(+), 10 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 118e78531..3fa20dcde 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1149,6 +1149,7 @@ class Autosubmit: if platform.type == "slurm" and list_jobid!="": slurm.append([platform,list_jobid,list_prevStatus,completed_joblist]) #END LOOP + #CHECK ALL JOBS for platform_jobs in slurm: platform = platform_jobs[0] jobs_to_check = platform_jobs[1] @@ -1158,8 +1159,6 @@ class Autosubmit: prev_status = platform_jobs[2][j_Indx] job = platform_jobs[3][j_Indx] - - if prev_status != job.update_status(as_conf.get_copy_remote_logs() == 'true'): if as_conf.get_notifications() == 'true': if Status.VALUE_TO_KEY[job.status] in job.notify_on: diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 9c5947403..5c385d0a3 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -1050,7 +1050,7 @@ class AutosubmitConfig(object): :rtype: bool """ config_value = self._conf_parser.get_option('config', 'DEPENDENCIES', 'false').lower() - if config_value is "true": + if config_value == "true": return True else: return False diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 999ff74fa..42f8c78ce 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -357,6 +357,7 @@ class ParamikoPlatform(Platform): Log.warning('Retrying check job command: {0}', cmd) Log.warning('Can not get job status for all jobs, retrying in 3 sec') sleep(3) + job_list_status = self.get_ssh_output() Log.debug('Successful check job command: {0}, \n output: {1}', cmd, self._ssh_output) if retries >= 0: in_queue_jobs=[] @@ -364,21 +365,19 @@ class ParamikoPlatform(Platform): for job in job_list: job_id=job.id job_status = Status.UNKNOWN - - job_status = self.parse_Alljobs_output(self.get_ssh_output(),job_id) + job_status = self.parse_Alljobs_output(job_list_status,job_id) # URi: define status list in HPC Queue Class if job_status in self.job_status['COMPLETED']: job_status = Status.COMPLETED elif job_status in self.job_status['RUNNING']: job_status = Status.RUNNING elif job_status in self.job_status['QUEUING']: - if self.type == "slurm": if job.status == Status.HELD: + job_status = Status.HELD if not job.hold: self.send_command("scontrol release "+"{0}".format(job_id)) # SHOULD BE MORE CLASS (GET_scontrol realease but not sure if this can be implemented on others PLATFORMS job_status = Status.QUEUING - job_status = Status.HELD else: job_status = Status.QUEUING list_queue_jobid += str(job.id) + ',' @@ -395,8 +394,9 @@ class ParamikoPlatform(Platform): if self.type == 'slurm' and len(in_queue_jobs) > 0: cmd=self.get_queue_status_cmd(list_queue_jobid) self.send_command(cmd) + queue_status=self._ssh_output for job in in_queue_jobs: - reason = self.parse_queue_reason(self._ssh_output,job.id) + reason = self.parse_queue_reason(queue_status,job.id) if job.queuing_reason_cancel(reason): Log.error("Job {0} will be cancelled and set to FAILED as it was queuing due to {1}", job.name, reason) self.send_command(self.platform.cancel_cmd + " {0}".format(job.id)) @@ -488,7 +488,7 @@ class ParamikoPlatform(Platform): stderr.close() self._ssh_output = "" for s in stdout_chunks: - if s is not '': + if s != '': self._ssh_output += s for errorLine in stderr_readlines: if errorLine.find("submission failed") != -1: diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index 757b0cf76..cdacf2090 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -140,7 +140,7 @@ class SlurmPlatform(ParamikoPlatform): return 'sacct -n -j {1} -o "State"'.format(self.host, job_id) def get_checkAlljobs_cmd(self, jobs_id): - return "sacct -n -X -j {1} -o 'jobid,State'".format(self.host, jobs_id) + return "sacct -n -X -j {1} -o jobid,State".format(self.host, jobs_id) def get_queue_status_cmd(self, job_id): return 'squeue -j {0} -o %A,%R'.format(job_id) -- GitLab From fcdab4f4e52735ce2c3bb88fb532ade36d09c01b Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 21 Nov 2019 09:38:07 +0100 Subject: [PATCH 07/13] Fixing Test-pipeline --- test/unit/test_wrappers.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/test/unit/test_wrappers.py b/test/unit/test_wrappers.py index 9dc6daf7c..b0604c42d 100644 --- a/test/unit/test_wrappers.py +++ b/test/unit/test_wrappers.py @@ -218,7 +218,7 @@ class TestWrappers(TestCase): packages = [JobPackageVertical(package_m1_s2), JobPackageVertical(package_m2_s2)] - returned_packages = returned_packages[0] + #returned_packages = returned_packages[] for i in range(0, len(returned_packages)): self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) @@ -267,7 +267,7 @@ class TestWrappers(TestCase): packages = [JobPackageVertical(package_m1_s2), JobPackageVertical(package_m2_s2)] - returned_packages = returned_packages[0] + #returned_packages = returned_packages[0] for i in range(0, len(returned_packages)): self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) @@ -313,7 +313,7 @@ class TestWrappers(TestCase): packages = [JobPackageVertical(package_m1_s2), JobPackageVertical(package_m2_s2)] - returned_packages = returned_packages[0] + #returned_packages = returned_packages[0] for i in range(0, len(returned_packages)): self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) @@ -359,7 +359,7 @@ class TestWrappers(TestCase): packages = [JobPackageVertical(package_m1_s2), JobPackageVertical(package_m2_s2)] - returned_packages = returned_packages[0] + #returned_packages = returned_packages[0] for i in range(0, len(returned_packages)): self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) @@ -398,7 +398,7 @@ class TestWrappers(TestCase): packages = [JobPackageVertical(package_m1_s2), JobPackageVertical(package_m2_s2)] - returned_packages = returned_packages[0] + #returned_packages = returned_packages[0] for i in range(0, len(returned_packages)): self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) @@ -462,7 +462,7 @@ class TestWrappers(TestCase): packages = [JobPackageVertical(package_m1_s2_s3), JobPackageVertical(package_m2_s2_s3)] - returned_packages = returned_packages[0] + #returned_packages = returned_packages[0] for i in range(0, len(returned_packages)): self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) @@ -522,7 +522,7 @@ class TestWrappers(TestCase): packages = [JobPackageVertical(package_m1_s2_s3)] - returned_packages = returned_packages[0] + #returned_packages = returned_packages[0] for i in range(0, len(returned_packages)): self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) @@ -584,7 +584,7 @@ class TestWrappers(TestCase): packages = [JobPackageVertical(package_m1_s2_s3), JobPackageVertical(package_m2_s2_s3)] - returned_packages = returned_packages[0] + #returned_packages = returned_packages[0] for i in range(0, len(returned_packages)): self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) @@ -645,7 +645,7 @@ class TestWrappers(TestCase): packages = [JobPackageVertical(package_m1_s2_s3), JobPackageVertical(package_m2_s2_s3)] - returned_packages = returned_packages[0] + #returned_packages = returned_packages[0] for i in range(0, len(returned_packages)): self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) @@ -706,7 +706,7 @@ class TestWrappers(TestCase): packages = [JobPackageVertical(package_m1_s2_s3), JobPackageVertical(package_m2_s2_s3)] - returned_packages = returned_packages[0] + #returned_packages = returned_packages[0] for i in range(0, len(returned_packages)): self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) @@ -778,7 +778,7 @@ class TestWrappers(TestCase): packages = [JobPackageVertical(package_m1_s2_s3), JobPackageVertical(package_m2_s2_s3)] - returned_packages = returned_packages[0] + #returned_packages = returned_packages[0] for i in range(0, len(returned_packages)): self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) -- GitLab From 9a260ab47e0e2671c25c522c94078038fac1e827 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 22 Nov 2019 11:08:19 +0100 Subject: [PATCH 08/13] updates to wrappers --- autosubmit/config/config_common.py | 12 +++++++++--- autosubmit/config/files/autosubmit.conf | 10 ++++++++-- autosubmit/job/job_packager.py | 4 +++- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 5c385d0a3..d27f15db1 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -1071,16 +1071,22 @@ class AutosubmitConfig(object): :rtype: string """ return self._conf_parser.get_option('wrapper', 'JOBS_IN_WRAPPER', 'None') + def get_min_wrapper_jobs(self): + """ + Returns the minim number of jobs that can be wrapped together as configured in autosubmit's config file + + :return: minim number of jobs (or total jobs) + :rtype: int + """ + return int(self._conf_parser.get_option('wrapper', 'MIN_WRAPPED', 2)) def get_max_wrapped_jobs(self): """ Returns the maximum number of jobs that can be wrapped together as configured in autosubmit's config file :return: maximum number of jobs (or total jobs) - :rtype: string + :rtype: int """ - #return int(self._conf_parser.get_option('wrapper', 'MAXWRAPPEDJOBS', self.get_total_jobs())) - return int(self._conf_parser.get_option('wrapper', 'MAX_WRAPPED', self.get_total_jobs())) def get_wrapper_check_time(self): """ diff --git a/autosubmit/config/files/autosubmit.conf b/autosubmit/config/files/autosubmit.conf index 03209656a..12c1d9fc8 100644 --- a/autosubmit/config/files/autosubmit.conf +++ b/autosubmit/config/files/autosubmit.conf @@ -23,12 +23,18 @@ OUTPUT = pdf # Allow to send jobs earlier # Default = False DEPENDENCIES = FALSE + # Basic Configuration of wrapper -# Types avaliable: Horizontal,vertical,vertical-mixed,horizontal-vertical +# Types available: Horizontal,vertical,vertical-mixed,horizontal-vertical +# JOBS_IN_WRAPPER = Sections that should be wrapped together ex SIM +# MIN_WRAPPED set the minim number of jobs that should be included in the wrapper. DEFAULT = 2 +# MAX_WRAPPED set the maxim number of jobs that should be included in the wrapper. DEFAULT = TOTALJOBS + #[wrapper] #TYPE = Vertical #JOBS_IN_WRAPPER = SIM - +#MIN_WRAPPED = 2 +#MAX_WRAPPED = 9999 [mail] # Enable mail notifications diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 183683564..98dc5e5e4 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -98,6 +98,7 @@ class JobPackager(object): and (self.jobs_in_wrapper == 'None' or section in self.jobs_in_wrapper): # Trying to find the value in jobs_parser, if not, default to an autosubmit_.conf value (Looks first in [wrapper] section) max_wrapped_jobs = int(self._as_config.jobs_parser.get_option(section, "MAX_WRAPPED", self._as_config.get_max_wrapped_jobs())) + min_wrapped_jobs = int(self._as_config.jobs_parser.get_option(section, "MIN_WRAPPED", self._as_config.get_min_wrapped_jobs())) if self.wrapper_type in ['vertical', 'vertical-mixed']: built_packages = self._build_vertical_packages(jobs_to_submit_by_section[section], @@ -119,7 +120,6 @@ class JobPackager(object): else: package = JobPackageSimple([job]) packages_to_submit.append(package) - return packages_to_submit def _divide_list_by_section(self, jobs_list): @@ -174,6 +174,8 @@ class JobPackager(object): :type section_list: List() of Job Objects. \n :param max_wrapped_jobs: Number of maximum jobs that can be wrapped (Can be user defined), per section. \n :type max_wrapped_jobs: Integer. \n + :param min_wrapped_jobs: Number of maximum jobs that can be wrapped (Can be user defined), per section. \n + :type min_wrapped_jobs: Integer. \n :return: List of Wrapper Packages, Dictionary that details dependencies. \n :rtype: List() of JobPackageVertical(), Dictionary Key: String, Value: (Dictionary Key: Variable Name, Value: String/Int) """ -- GitLab From 437b1a1411883a76a3fdb2037fc408795ba8780e Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 22 Nov 2019 16:38:31 +0100 Subject: [PATCH 09/13] Wrapper processors are bugged, wrappers now have a minim number of innerjobs inside --- autosubmit/autosubmit.py | 2 -- autosubmit/config/config_common.py | 2 +- autosubmit/job/job_packager.py | 37 ++++++++++++++++++++++++------ 3 files changed, 31 insertions(+), 10 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 3fa20dcde..3ded1b8ba 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1222,8 +1222,6 @@ class Autosubmit: packages_to_submit = JobPackager(as_conf, platform, job_list, hold=hold).build_packages(hold=hold) if not inspect: platform.open_submit_script() - else: - packages_to_submit += JobPackager(as_conf, platform, job_list, hold=True).build_packages(hold=True) valid_packages_to_submit = [] for package in packages_to_submit: try: diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index d27f15db1..e27e1e8a7 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -1071,7 +1071,7 @@ class AutosubmitConfig(object): :rtype: string """ return self._conf_parser.get_option('wrapper', 'JOBS_IN_WRAPPER', 'None') - def get_min_wrapper_jobs(self): + def get_min_wrapped_jobs(self): """ Returns the minim number of jobs that can be wrapped together as configured in autosubmit's config file diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 98dc5e5e4..0b93d3c4b 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -90,6 +90,7 @@ class JobPackager(object): jobs_to_submit = list_of_available[0:num_jobs_to_submit] # print(len(jobs_to_submit)) jobs_to_submit_by_section = self._divide_list_by_section(jobs_to_submit) + for section in jobs_to_submit_by_section: # Only if platform allows wrappers, wrapper type has been correctly defined, and job names for wrappers have been correctly defined # ('None' is a default value) or the correct section is included in the corresponding sections in [wrappers] @@ -98,20 +99,42 @@ class JobPackager(object): and (self.jobs_in_wrapper == 'None' or section in self.jobs_in_wrapper): # Trying to find the value in jobs_parser, if not, default to an autosubmit_.conf value (Looks first in [wrapper] section) max_wrapped_jobs = int(self._as_config.jobs_parser.get_option(section, "MAX_WRAPPED", self._as_config.get_max_wrapped_jobs())) - min_wrapped_jobs = int(self._as_config.jobs_parser.get_option(section, "MIN_WRAPPED", self._as_config.get_min_wrapped_jobs())) + + dependencies_keys = self._as_config.jobs_parser.get(section, "DEPENDENCIES").split() + hard_limit_wrapper = max_wrapped_jobs + for k in dependencies_keys: + if "-" in k: + k_divided = k.split("-") + if k_divided[0] not in self.jobs_in_wrapper: + number = int(k_divided[1].strip(" ")) + if number < hard_limit_wrapper: + hard_limit_wrapper = number + min_wrapped_jobs = min(self._as_config.jobs_parser.get_option(section, "MIN_WRAPPED",self._as_config.get_min_wrapped_jobs()),hard_limit_wrapper) + + + if self.wrapper_type in ['vertical', 'vertical-mixed']: - built_packages = self._build_vertical_packages(jobs_to_submit_by_section[section], + built_packages_tmp = self._build_vertical_packages(jobs_to_submit_by_section[section], max_wrapped_jobs) - packages_to_submit += built_packages elif self.wrapper_type == 'horizontal': - built_packages = self._build_horizontal_packages(jobs_to_submit_by_section[section], + built_packages_tmp = self._build_horizontal_packages(jobs_to_submit_by_section[section], max_wrapped_jobs, section) - packages_to_submit += built_packages elif self.wrapper_type in ['vertical-horizontal', 'horizontal-vertical']: - built_packages = self._build_hybrid_package(jobs_to_submit_by_section[section], max_wrapped_jobs, section) - packages_to_submit.append(built_packages) + built_packages_tmp = self._build_hybrid_package(jobs_to_submit_by_section[section], max_wrapped_jobs, section) + + built_packages = [] + for p in built_packages_tmp: + if len(p.jobs) >= min_wrapped_jobs: # if the quantity is not enough, don't make the wrapper + built_packages.append(p) + elif p.jobs[0].chunk > self._jobs_list._chunk_list[-1] - ( int(self._jobs_list._chunk_list[-1]) % min_wrapped_jobs): #Last case, wrap remaining jobs + built_packages.append(p) + else: # If a package is discarded, let their innerjob be wrapped again. + for job in p.jobs: + job.packed = False + packages_to_submit += built_packages + else: # No wrapper allowed / well-configured for job in jobs_to_submit_by_section[section]: -- GitLab From fbc4e413a31cd1b4de88a3a63d09837257b65066 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 29 Nov 2019 10:06:33 +0100 Subject: [PATCH 10/13] A lot of fixes for inspect and -cw command and wrappers in general, also retry system fixed and dependencies finished? --- autosubmit/autosubmit.py | 18 +++++++-- autosubmit/job/job.py | 45 +++++++++++++---------- autosubmit/job/job_list.py | 4 +- autosubmit/job/job_packager.py | 4 +- autosubmit/job/job_packages.py | 2 +- autosubmit/platforms/paramiko_platform.py | 7 ++-- 6 files changed, 49 insertions(+), 31 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 3ded1b8ba..7880f7633 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -960,11 +960,11 @@ class Autosubmit: job_list.check_scripts(as_conf) job_list.update_list(as_conf, False) # Loading parameters again - Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) + Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) while job_list.get_active(): # Sending only_wrappers = True Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence,True,only_wrappers,hold=False) - job_list.update_list(as_conf, False) + job_list.update_list(as_conf, False,False) @@ -1113,7 +1113,10 @@ class Autosubmit: if job_list.job_package_map and job_id in job_list.job_package_map: Log.debug('Checking wrapper job with id ' + str(job_id)) wrapper_job = job_list.job_package_map[job_id] + if as_conf.get_remote_dependencies(): + wrapper_job.hold = wrapper_job.job_list[0].hold check_wrapper = True + if wrapper_job.status == Status.RUNNING: check_wrapper = True if datetime.timedelta.total_seconds(datetime.datetime.now() - wrapper_job.checked_time) >= check_wrapper_jobs_sleeptime else False if check_wrapper: @@ -1121,7 +1124,11 @@ class Autosubmit: platform.check_job(wrapper_job) Log.info( 'Wrapper job ' + wrapper_job.name + ' is ' + str(Status.VALUE_TO_KEY[wrapper_job.new_status])) + wrapper_job.check_status(wrapper_job.new_status) + if wrapper_job.status == Status.WAITING: # if job failed to be held, delete it from packages table + job_list.job_package_map.pop(job_id, None) + job_list.packages_dict.pop(job_id, None) save = True else: Log.info("Waiting for wrapper check time: {0}\n", check_wrapper_jobs_sleeptime) @@ -1422,8 +1429,9 @@ class Autosubmit: packages_persistence.reset_table(True) referenced_jobs_to_remove = set() job_list_wrappers = copy.deepcopy(job_list) - jobs_wr = copy.deepcopy(jobs) - [job for job in jobs_wr if (job.status != Status.COMPLETED)] + jobs_wr_aux = copy.deepcopy(jobs) + jobs_wr = [] + [jobs_wr.append(job) for job in jobs_wr_aux if (job.status == Status.READY or job.status == Status.WAITING)] for job in jobs_wr: for child in job.children: if child not in jobs_wr: @@ -1439,6 +1447,8 @@ class Autosubmit: packages_persistence, True) packages = packages_persistence.load(True) + packages+= JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), + "job_packages_" + expid).load() else: packages = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid).load() diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 181dd0ee3..06d2d699c 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -512,9 +512,8 @@ class Job(object): elif reason == '(JobHeldAdmin)': Log.info("Job {0} Failed to be HELD, canceling... ", self.name) self.hold = True - self.new_status = Status.READY + self.new_status = Status.WAITING self.platform.send_command(self.platform.cancel_cmd + " {0}".format(self.id)) - self.update_failed_held() else: self.hold = False if self.status is Status.QUEUING: @@ -727,11 +726,11 @@ class Job(object): template = template_file.read() else: if self.type == Type.BASH: - template = 'sleep 5' + template = 'sleep 8' elif self.type == Type.PYTHON: - template = 'time.sleep(5)' + template = 'time.sleep(8)' elif self.type == Type.R: - template = 'Sys.sleep(5)' + template = 'Sys.sleep(8)' else: template = '' @@ -1058,18 +1057,21 @@ class WrapperJob(Job): if self.hold is False: self.platform.send_command("scontrol release " + "{0}".format(self.id)) # SHOULD BE MORE CLASS (GET_scontrol realease but not sure if this can be implemented on others PLATFORMS self.status = Status.QUEUING + Log.info("Job {0} is QUEUING {1}", self.name, reason) else: + self.status = Status.HELD Log.info("Job {0} is HELD", self.name) elif reason == '(JobHeldAdmin)': Log.info("Job {0} Failed to be HELD, canceling... ", self.name) self.hold = False self.status = Status.WAITING self.platform.send_command(self.platform.cancel_cmd + " {0}".format(self.id)) + else: self.hold = False self.status = Status.QUEUING Log.info("Job {0} is QUEUING {1}", self.name, reason) - self.update_inner_jobs_queue(self.hold) + self.update_inner_jobs_queue(self.hold) else: if self.status in [Status.FAILED, Status.UNKNOWN]: @@ -1077,11 +1079,13 @@ class WrapperJob(Job): self.update_failed_jobs() elif self.status == Status.COMPLETED: self.check_inner_jobs_completed(self.job_list) - elif self.status == Status.RUNNING or status == Status.SUBMITTED: - time.sleep(3) + elif self.status == Status.RUNNING: + #time.sleep(3) + self.update_inner_jobs_queue(self.hold) Log.debug('Checking inner jobs status') self.check_inner_job_status() + def check_inner_job_status(self): self._check_running_jobs() self.check_inner_jobs_completed(self.running_jobs_start.keys()) @@ -1156,7 +1160,7 @@ class WrapperJob(Job): Log.info("Job {0} finished at {1}".format(jobname, str(parse_date(end_time)))) self._check_finished_job(job) else: - Log.debug("Job {0} is SUBMITTED and waiting for dependencies".format(jobname)) + Log.debug("Job {0} is {1} and waiting for dependencies".format(jobname,Status.VALUE_TO_KEY[job.status])) def _check_finished_job(self, job): if self.platform.check_completed_files(job.name): @@ -1173,18 +1177,21 @@ class WrapperJob(Job): for job in not_finished_jobs: self._check_finished_job(job) def update_inner_jobs_queue(self,hold): - jobs = [job for job in self.job_list if job.status in [ Status.SUBMITTED or Status.HELD or Status.QUEUING]] - for job in jobs: - if hold: - job.status = Status.HELD - else: - job.status = Status.QUEUING - job.hold = hold - jobs_held_admin = [job for job in self.job_list if job.status in [ Status.WAITING ]] - for job in jobs_held_admin: - if hold: + + if self.status == Status.WAITING: + for job in self.job_list: job.status = Status.WAITING job.hold = False + job.packed = False + else: + jobs = [job for job in self.job_list if job.status in [Status.SUBMITTED or Status.HELD or Status.QUEUING]] + for job in jobs: + job.hold = hold + if hold: + job.status = Status.HELD + else: + job.status = Status.QUEUING + def _check_wrapper_status(self): not_finished_jobs = [job for job in self.job_list if job.status not in [Status.FAILED, Status.COMPLETED]] diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 024f793e2..88b01e521 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -900,7 +900,7 @@ class JobList: def parameters(self, value): self._parameters = value - def update_list(self, as_conf,store_change=True,fromSetStatus=False): + def update_list(self, as_conf,store_change=True,fromSetStatus=False,ignoreRemoteDependency=False): """ Updates job list, resetting failed jobs and changing to READY all WAITING jobs with all parents COMPLETED @@ -938,7 +938,7 @@ class JobList: # if waiting jobs has all parents completed change its State to READY for job in self.get_completed(): - if job.synchronize is not None: #and job in self.get_active(): + if job.synchronize is not None: Log.debug('Updating SYNC jobs') tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] if len(tmp) != len(job.parents): diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 0b93d3c4b..799062022 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -128,9 +128,9 @@ class JobPackager(object): for p in built_packages_tmp: if len(p.jobs) >= min_wrapped_jobs: # if the quantity is not enough, don't make the wrapper built_packages.append(p) - elif p.jobs[0].chunk > self._jobs_list._chunk_list[-1] - ( int(self._jobs_list._chunk_list[-1]) % min_wrapped_jobs): #Last case, wrap remaining jobs + elif p.jobs[0].chunk >= self._jobs_list._chunk_list[-1] - ( int(self._jobs_list._chunk_list[-1]) % min_wrapped_jobs): #Last case, wrap remaining jobs built_packages.append(p) - else: # If a package is discarded, let their innerjob be wrapped again. + else: # If a package is discarded, allow to wrap their inner jobs again. for job in p.jobs: job.packed = False packages_to_submit += built_packages diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 95aae9c36..add805e38 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -47,7 +47,7 @@ class JobPackageBase(object): self._platform = jobs[0].platform self._custom_directives = set() for job in jobs: - if job.platform != self._platform or job.platform is None: + if job.platform.name != self._platform.name or job.platform is None: raise Exception('Only one valid platform per package') except IndexError: raise Exception('No jobs given') diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 42f8c78ce..6789228a8 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -311,14 +311,15 @@ class ParamikoPlatform(Platform): # URi: value ? job.new_status= job_status - while not self.send_command(self.get_checkjob_cmd(job_id)) and retries >= 0: + while not ( self.send_command(self.get_checkjob_cmd(job_id)) and retries >= 0 ) or (self.get_ssh_output() == "" and retries >= 0): retries -= 1 Log.warning('Retrying check job command: {0}', self.get_checkjob_cmd(job_id)) Log.error('Can not get job status for job id ({0}), retrying in 10 sec', job_id) sleep(5) - + output=self.get_ssh_output() + output_stripped=output.strip() if retries >= 0: - Log.debug('Successful check job command: {0}', self.get_checkjob_cmd(job_id)) + #Log.debug('Successful check job command: {0}', self.get_checkjob_cmd(job_id)) job_status = self.parse_job_output(self.get_ssh_output()).strip("\n") # URi: define status list in HPC Queue Class if job_status in self.job_status['COMPLETED'] or retries == 0: -- GitLab From 9f6a9450e608a3d87349baf69eb2fa7c7e95b1d4 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 2 Dec 2019 12:31:44 +0100 Subject: [PATCH 11/13] & section fixed --- autosubmit/autosubmit.py | 2 +- autosubmit/config/config_common.py | 24 ++++++------- autosubmit/job/job_packager.py | 56 +++++++++++++++++++++--------- 3 files changed, 53 insertions(+), 29 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 7880f7633..732dbceb6 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -2716,7 +2716,7 @@ class Autosubmit: os.fsync(fh.fileno()) # Detail after lock has been closed. - if (detail == True): + if detail == True: current_length = len(job_list.get_job_list()) if current_length > 1000: Log.warning("-d option: Experiment has too many jobs to be printed in the terminal. Maximum job quantity is 1000, your experiment has " + str(current_length) + " jobs.") diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index e27e1e8a7..12dda2758 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -467,18 +467,18 @@ class AutosubmitConfig(object): if parser.has_option(section, 'PLATFORM'): result = result and parser.check_is_choice(section, 'PLATFORM', False, platforms) - # if parser.has_option(section, 'DEPENDENCIES'): #TODO if any breaks is there - # for dependency in str(parser.get_option(section, 'DEPENDENCIES', '')).split(' '): - # if '-' in dependency: - # dependency = dependency.split('-')[0] - # elif '+' in dependency: - # dependency = dependency.split('+')[0] - # if '[' in dependency: - # dependency = dependency[:dependency.find('[')] - # if dependency not in sections: - # Log.error( - # 'Job {0} depends on job {1} that is not defined. It will be ignored.'.format(section, - # dependency)) + if parser.has_option(section, 'DEPENDENCIES'): + for dependency in str(parser.get_option(section, 'DEPENDENCIES', '')).split(' '): + if '-' in dependency: + dependency = dependency.split('-')[0] + elif '+' in dependency: + dependency = dependency.split('+')[0] + if '[' in dependency: + dependency = dependency[:dependency.find('[')] + if dependency not in sections: + Log.error( + 'Job {0} depends on job {1} that is not defined. It will be ignored.'.format(section, + dependency)) if parser.has_option(section, 'RERUN_DEPENDENCIES'): for dependency in str(parser.get_option(section, 'RERUN_DEPENDENCIES', diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 799062022..8466800bf 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -99,8 +99,14 @@ class JobPackager(object): and (self.jobs_in_wrapper == 'None' or section in self.jobs_in_wrapper): # Trying to find the value in jobs_parser, if not, default to an autosubmit_.conf value (Looks first in [wrapper] section) max_wrapped_jobs = int(self._as_config.jobs_parser.get_option(section, "MAX_WRAPPED", self._as_config.get_max_wrapped_jobs())) + if '&' not in section: + dependencies_keys = self._as_config.jobs_parser.get(section, "DEPENDENCIES").split() + else: + multiple_sections = section.split('&') + dependencies_keys=[] + for sectionN in multiple_sections: + dependencies_keys += self._as_config.jobs_parser.get(sectionN, "DEPENDENCIES").split() - dependencies_keys = self._as_config.jobs_parser.get(section, "DEPENDENCIES").split() hard_limit_wrapper = max_wrapped_jobs for k in dependencies_keys: if "-" in k: @@ -111,28 +117,46 @@ class JobPackager(object): hard_limit_wrapper = number min_wrapped_jobs = min(self._as_config.jobs_parser.get_option(section, "MIN_WRAPPED",self._as_config.get_min_wrapped_jobs()),hard_limit_wrapper) - - - + built_packages = [] if self.wrapper_type in ['vertical', 'vertical-mixed']: built_packages_tmp = self._build_vertical_packages(jobs_to_submit_by_section[section], max_wrapped_jobs) + for p in built_packages_tmp: + if len(p.jobs) >= min_wrapped_jobs: # if the quantity is not enough, don't make the wrapper + built_packages.append(p) + elif self._jobs_list._chunk_list.index(p.jobs[0].chunk) >= len(self._jobs_list._chunk_list) - ( + len(self._jobs_list._chunk_list) % min_wrapped_jobs): # Last case, wrap remaining jobs + built_packages.append(p) + else: # If a package is discarded, allow to wrap their inner jobs again. + for job in p.jobs: + job.packed = False elif self.wrapper_type == 'horizontal': built_packages_tmp = self._build_horizontal_packages(jobs_to_submit_by_section[section], max_wrapped_jobs, section) - + for p in built_packages_tmp: + if len(p.jobs) >= self._as_config.jobs_parser.get_option(section, "MIN_WRAPPED",self._as_config.get_min_wrapped_jobs()): # if the quantity is not enough, don't make the wrapper + built_packages.append(p) + elif self._jobs_list._member_list.index(p.jobs[0].member) >= len( + self._jobs_list._member_list) - (len(self._jobs_list._member_list) % min_wrapped_jobs): # Last case, wrap remaining jobs + built_packages.append(p) + else: # If a package is discarded, allow to wrap their inner jobs again. + for job in p.jobs: + job.packed = False elif self.wrapper_type in ['vertical-horizontal', 'horizontal-vertical']: - built_packages_tmp = self._build_hybrid_package(jobs_to_submit_by_section[section], max_wrapped_jobs, section) - - built_packages = [] - for p in built_packages_tmp: - if len(p.jobs) >= min_wrapped_jobs: # if the quantity is not enough, don't make the wrapper - built_packages.append(p) - elif p.jobs[0].chunk >= self._jobs_list._chunk_list[-1] - ( int(self._jobs_list._chunk_list[-1]) % min_wrapped_jobs): #Last case, wrap remaining jobs - built_packages.append(p) - else: # If a package is discarded, allow to wrap their inner jobs again. - for job in p.jobs: - job.packed = False + built_packages_tmp =[] + built_packages_tmp.append(self._build_hybrid_package(jobs_to_submit_by_section[section], max_wrapped_jobs, section)) + for p in built_packages_tmp: + if len(p.jobs) >= min_wrapped_jobs: # if the quantity is not enough, don't make the wrapper + built_packages.append(p) + elif self._jobs_list._chunk_list.index(p.jobs[0].chunk) >= len(self._jobs_list._chunk_list) - ( + len(self._jobs_list._chunk_list) % min_wrapped_jobs): # Last case, wrap remaining jobs + built_packages.append(p) + else: # If a package is discarded, allow to wrap their inner jobs again. + for job in p.jobs: + job.packed = False + built_packages=built_packages_tmp + else: + built_packages=built_packages_tmp packages_to_submit += built_packages else: -- GitLab From 4db94f258eaafe1a9638b41f5f099ebfb963cb92 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 3 Dec 2019 11:05:08 +0100 Subject: [PATCH 12/13] All working --- autosubmit/job/job.py | 39 +++++++++-------------- autosubmit/platforms/paramiko_platform.py | 2 +- 2 files changed, 16 insertions(+), 25 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 06d2d699c..48c8d2f51 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -509,6 +509,7 @@ class Job(object): self.new_status = Status.HELD self.hold = True Log.info("Job {0} is HELD", self.name) + elif reason == '(JobHeldAdmin)': Log.info("Job {0} Failed to be HELD, canceling... ", self.name) self.hold = True @@ -516,11 +517,7 @@ class Job(object): self.platform.send_command(self.platform.cancel_cmd + " {0}".format(self.id)) else: self.hold = False - if self.status is Status.QUEUING: - Log.info("Job {0} is QUEUING {1}", self.name, reason) - else: - if self.status is Status.HELD: - Log.info("Job {0} is HELD", self.name) + Log.info("Job {0} is QUEUING {1}", self.name, reason) elif self.status is Status.RUNNING: Log.info("Job {0} is RUNNING", self.name) elif self.status is Status.COMPLETED: @@ -846,12 +843,12 @@ class Job(object): :rtype: bool """ - + valid=False out=False parameters = self.update_parameters(as_conf, parameters) template_content = self.update_content(as_conf) if template_content is not False: - + valid = True variables = re.findall('%(? Date: Tue, 3 Dec 2019 11:10:52 +0100 Subject: [PATCH 13/13] reverting check "fix" will work on it in another branch --- autosubmit/job/job.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 48c8d2f51..90fd84624 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -843,12 +843,10 @@ class Job(object): :rtype: bool """ - valid=False out=False parameters = self.update_parameters(as_conf, parameters) template_content = self.update_content(as_conf) if template_content is not False: - valid = True variables = re.findall('%(?