diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 50b1fb9559115871d44ecd5a5708881cb659fad5..4960fa0f986ec85e0c012540c036dd717cefd188 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -937,7 +937,7 @@ class Autosubmit: :rtype: \n """ job_list._job_list=jobs_filtered - job_list.update_list(as_conf,False) + # Current choice is Paramiko Submitter submitter = Autosubmit._get_submitter(as_conf) # Load platforms saves a dictionary Key: Platform Name, Value: Corresponding Platform Object @@ -959,11 +959,11 @@ class Autosubmit: job_list.check_scripts(as_conf) job_list.update_list(as_conf, False) # Loading parameters again - Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) + Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) while job_list.get_active(): # Sending only_wrappers = True - Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence,True,only_wrappers) - job_list.update_list(as_conf, False) + Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence,True,only_wrappers,hold=False) + job_list.update_list(as_conf, False,False) @@ -1112,7 +1112,10 @@ class Autosubmit: if job_list.job_package_map and job_id in job_list.job_package_map: Log.debug('Checking wrapper job with id ' + str(job_id)) wrapper_job = job_list.job_package_map[job_id] + if as_conf.get_remote_dependencies(): + wrapper_job.hold = wrapper_job.job_list[0].hold check_wrapper = True + if wrapper_job.status == Status.RUNNING: check_wrapper = True if datetime.timedelta.total_seconds(datetime.datetime.now() - wrapper_job.checked_time) >= check_wrapper_jobs_sleeptime else False if check_wrapper: @@ -1120,7 +1123,11 @@ class Autosubmit: platform.check_job(wrapper_job) Log.info( 'Wrapper job ' + wrapper_job.name + ' is ' + str(Status.VALUE_TO_KEY[wrapper_job.new_status])) + wrapper_job.check_status(wrapper_job.new_status) + if wrapper_job.status == Status.WAITING: # if job failed to be held, delete it from packages table + job_list.job_package_map.pop(job_id, None) + job_list.packages_dict.pop(job_id, None) save = True else: Log.info("Waiting for wrapper check time: {0}\n", check_wrapper_jobs_sleeptime) @@ -1148,6 +1155,7 @@ class Autosubmit: if platform.type == "slurm" and list_jobid!="": slurm.append([platform,list_jobid,list_prevStatus,completed_joblist]) #END LOOP + #CHECK ALL JOBS for platform_jobs in slurm: platform = platform_jobs[0] jobs_to_check = platform_jobs[1] @@ -1157,8 +1165,6 @@ class Autosubmit: prev_status = platform_jobs[2][j_Indx] job = platform_jobs[3][j_Indx] - - if prev_status != job.update_status(as_conf.get_copy_remote_logs() == 'true'): if as_conf.get_notifications() == 'true': if Status.VALUE_TO_KEY[job.status] in job.notify_on: @@ -1167,15 +1173,16 @@ class Autosubmit: Status.VALUE_TO_KEY[job.status], as_conf.get_mails_to()) save = True - if job_list.update_list(as_conf) or save: job_list.save() - - - if Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence): + if Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence,hold=False): job_list.save() + if as_conf.get_remote_dependencies(): + if Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence,hold=True): + job_list.save() + if Autosubmit.exit: return 2 time.sleep(safetysleeptime) @@ -1196,7 +1203,7 @@ class Autosubmit: @staticmethod def submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence, inspect=False, - only_wrappers=False): + only_wrappers=False,hold=False): """ Gets READY jobs and send them to the platforms if there is available space on the queues @@ -1217,29 +1224,24 @@ class Autosubmit: """ save = False for platform in platforms_to_test: - Log.debug("\nJobs ready for {1}: {0}", len(job_list.get_ready(platform)), platform.name) - packages_to_submit, remote_dependencies_dict = JobPackager(as_conf, platform, job_list).build_packages() + Log.debug("\nJobs ready for {1}: {0}", len(job_list.get_ready(platform, hold=hold)), platform.name) + packages_to_submit = JobPackager(as_conf, platform, job_list, hold=hold).build_packages(hold=hold) if not inspect: platform.open_submit_script() valid_packages_to_submit = [] for package in packages_to_submit: try: - if hasattr(package, "name"): - if remote_dependencies_dict and package.name in remote_dependencies_dict['dependencies']: - remote_dependency = remote_dependencies_dict['dependencies'][package.name] - remote_dependency_id = remote_dependencies_dict['name_to_id'][remote_dependency] - package.set_job_dependency(remote_dependency_id) if not only_wrappers: try: - package.submit(as_conf, job_list.parameters, inspect) + package.submit(as_conf, job_list.parameters, inspect, hold=hold) valid_packages_to_submit.append(package) - except (IOError,OSError): - #write error file + except (IOError, OSError): + # write error file continue if only_wrappers or inspect: for innerJob in package._jobs: # Setting status to COMPLETED so it does not get stuck in the loop that calls this function - innerJob.status=Status.COMPLETED + innerJob.status = Status.COMPLETED if hasattr(package, "name"): job_list.packages_dict[package.name] = package.jobs @@ -1249,8 +1251,7 @@ class Autosubmit: package._wallclock, package._num_processors, package.platform, as_conf) job_list.job_package_map[package.jobs[0].id] = wrapper_job - if remote_dependencies_dict and package.name in remote_dependencies_dict['name_to_id']: - remote_dependencies_dict['name_to_id'][package.name] = package.jobs[0].id + if isinstance(package, JobPackageThread): # If it is instance of JobPackageThread, then it is JobPackageVertical. packages_persistence.save(package.name, package.jobs, package._expid, inspect) @@ -1266,14 +1267,13 @@ class Autosubmit: try: save = True if len(valid_packages_to_submit) > 0: - jobs_id = platform.submit_Script() + jobs_id = platform.submit_Script(hold=hold) if jobs_id is None: raise BaseException("Exiting AS being unable to get jobID") i = 0 for package in valid_packages_to_submit: for job in package.jobs: job.id = str(jobs_id[i]) - Log.info("{0} submitted", job.name) job.status = Status.SUBMITTED job.write_submit_time() if hasattr(package, "name"): @@ -1284,23 +1284,17 @@ class Autosubmit: package._wallclock, package._num_processors, package.platform, as_conf) job_list.job_package_map[package.jobs[0].id] = wrapper_job - if remote_dependencies_dict and package.name in remote_dependencies_dict[ - 'name_to_id']: - remote_dependencies_dict['name_to_id'][package.name] = package.jobs[0].id if isinstance(package, JobPackageThread): # Saving only when it is a real multi job package packages_persistence.save(package.name, package.jobs, package._expid, inspect) i += 1 - - + save = True except WrongTemplateException as e: Log.error("Invalid parameter substitution in {0} template", e.job_name) raise except Exception: Log.error("{0} submission failed", platform.name) raise - - return save @staticmethod @@ -1434,8 +1428,9 @@ class Autosubmit: packages_persistence.reset_table(True) referenced_jobs_to_remove = set() job_list_wrappers = copy.deepcopy(job_list) - jobs_wr = copy.deepcopy(jobs) - [job for job in jobs_wr if (job.status != Status.COMPLETED)] + jobs_wr_aux = copy.deepcopy(jobs) + jobs_wr = [] + [jobs_wr.append(job) for job in jobs_wr_aux if (job.status == Status.READY or job.status == Status.WAITING)] for job in jobs_wr: for child in job.children: if child not in jobs_wr: @@ -1451,6 +1446,8 @@ class Autosubmit: packages_persistence, True) packages = packages_persistence.load(True) + packages+= JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), + "job_packages_" + expid).load() else: packages = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid).load() @@ -2725,7 +2722,7 @@ class Autosubmit: os.fsync(fh.fileno()) # Detail after lock has been closed. - if (detail == True): + if detail == True: current_length = len(job_list.get_job_list()) if current_length > 1000: Log.warning("-d option: Experiment has too many jobs to be printed in the terminal. Maximum job quantity is 1000, your experiment has " + str(current_length) + " jobs.") diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index a0f441454c14e7f0874e22a01fb07f3dd00146e6..5f9b41668c0f6faa31d3bd644a2e8bc2530f60ac 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -1049,8 +1049,11 @@ class AutosubmitConfig(object): :return: if remote dependencies :rtype: bool """ - return self._conf_parser.get_option('wrapper', 'DEPENDENCIES', 'false').lower() == 'true' - + config_value = self._conf_parser.get_option('config', 'DEPENDENCIES', 'false').lower() + if config_value == "true": + return True + else: + return False def get_wrapper_type(self): """ Returns what kind of wrapper (VERTICAL, MIXED-VERTICAL, HORIZONTAL, HYBRID, NONE) the user has configured in the autosubmit's config @@ -1068,16 +1071,22 @@ class AutosubmitConfig(object): :rtype: string """ return self._conf_parser.get_option('wrapper', 'JOBS_IN_WRAPPER', 'None') + def get_min_wrapped_jobs(self): + """ + Returns the minim number of jobs that can be wrapped together as configured in autosubmit's config file + + :return: minim number of jobs (or total jobs) + :rtype: int + """ + return int(self._conf_parser.get_option('wrapper', 'MIN_WRAPPED', 2)) def get_max_wrapped_jobs(self): """ Returns the maximum number of jobs that can be wrapped together as configured in autosubmit's config file :return: maximum number of jobs (or total jobs) - :rtype: string + :rtype: int """ - #return int(self._conf_parser.get_option('wrapper', 'MAXWRAPPEDJOBS', self.get_total_jobs())) - return int(self._conf_parser.get_option('wrapper', 'MAX_WRAPPED', self.get_total_jobs())) def get_wrapper_check_time(self): """ diff --git a/autosubmit/config/files/autosubmit.conf b/autosubmit/config/files/autosubmit.conf index 7c5bf3c4306a38e09145b010d3cb55745fe343e7..12c1d9fc8433bc375a26390d2a065ffb938f303a 100644 --- a/autosubmit/config/files/autosubmit.conf +++ b/autosubmit/config/files/autosubmit.conf @@ -14,12 +14,27 @@ TOTALJOBS = 6 # Time (seconds) between connections to the HPC queue scheduler to poll already submitted jobs status # Default = 10 SAFETYSLEEPTIME = 10 -# Number of retrials if a job fails. Can ve override at job level +# Number of retrials if a job fails. Can be override at job level # Default = 0 RETRIALS = 0 # Default output type for CREATE, MONITOR, SET STATUS, RECOVERY. Available options: pdf, svg, png, ps, txt # Default = pdf OUTPUT = pdf +# Allow to send jobs earlier +# Default = False +DEPENDENCIES = FALSE + +# Basic Configuration of wrapper +# Types available: Horizontal,vertical,vertical-mixed,horizontal-vertical +# JOBS_IN_WRAPPER = Sections that should be wrapped together ex SIM +# MIN_WRAPPED set the minim number of jobs that should be included in the wrapper. DEFAULT = 2 +# MAX_WRAPPED set the maxim number of jobs that should be included in the wrapper. DEFAULT = TOTALJOBS + +#[wrapper] +#TYPE = Vertical +#JOBS_IN_WRAPPER = SIM +#MIN_WRAPPED = 2 +#MAX_WRAPPED = 9999 [mail] # Enable mail notifications diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 9edb4ce08b4b321ab3ea36b7a3f5452fbfc57f67..90fd84624c8288cce27d9a374b384490cf4c2170 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -100,6 +100,7 @@ class Job(object): self._platform = None self.check = 'True' self.packed = False + self.hold = False def __getstate__(self): @@ -492,19 +493,31 @@ class Job(object): self.check_completion() else: self.status = new_status - if self.status is Status.QUEUING: + if self.status is Status.QUEUING or self.status is Status.HELD: if iswrapper: reason = str() if self.platform.type == 'slurm': self.platform.send_command(self.platform.get_queue_status_cmd(self.id)) - reason = self.platform.parse_queue_reason(self.platform._ssh_output) + reason = self.platform.parse_queue_reason(self.platform._ssh_output,self.id) if self._queuing_reason_cancel(reason): Log.error("Job {0} will be cancelled and set to FAILED as it was queuing due to {1}", self.name, reason) self.platform.send_command(self.platform.cancel_cmd + " {0}".format(self.id)) self.new_status =Status.FAILED self.update_status(copy_remote_logs,True) return - Log.info("Job {0} is QUEUING {1}", self.name, reason) + elif reason == '(JobHeldUser)': + self.new_status = Status.HELD + self.hold = True + Log.info("Job {0} is HELD", self.name) + + elif reason == '(JobHeldAdmin)': + Log.info("Job {0} Failed to be HELD, canceling... ", self.name) + self.hold = True + self.new_status = Status.WAITING + self.platform.send_command(self.platform.cancel_cmd + " {0}".format(self.id)) + else: + self.hold = False + Log.info("Job {0} is QUEUING {1}", self.name, reason) elif self.status is Status.RUNNING: Log.info("Job {0} is RUNNING", self.name) elif self.status is Status.COMPLETED: @@ -549,7 +562,7 @@ class Job(object): def update_children_status(self): children = list(self.children) for child in children: - if child.status in [Status.SUBMITTED, Status.RUNNING, Status.QUEUING, Status.UNKNOWN]: + if child.status in [Status.SUBMITTED, Status.RUNNING, Status.QUEUING, Status.UNKNOWN]: #TODO add held? child.status = Status.FAILED children += list(child.children) @@ -710,11 +723,11 @@ class Job(object): template = template_file.read() else: if self.type == Type.BASH: - template = 'sleep 5' + template = 'sleep 8' elif self.type == Type.PYTHON: - template = 'time.sleep(5)' + template = 'time.sleep(8)' elif self.type == Type.R: - template = 'Sys.sleep(5)' + template = 'Sys.sleep(8)' else: template = '' @@ -830,12 +843,10 @@ class Job(object): :rtype: bool """ - out=False parameters = self.update_parameters(as_conf, parameters) template_content = self.update_content(as_conf) if template_content is not False: - variables = re.findall('%(? 1: @@ -1025,29 +1038,45 @@ class WrapperJob(Job): return False def check_status(self, status): - if status != self.status: - if status == Status.QUEUING: - reason = str() - if self.platform.type == 'slurm': - self.platform.send_command(self.platform.get_queue_status_cmd(self.id)) - reason = self.platform.parse_queue_reason(self.platform._ssh_output,self.id) - - if self._queuing_reason_cancel(reason): - Log.error("Job {0} will be cancelled and set to FAILED as it was queuing due to {1}", self.name, - reason) - self.cancel_failed_wrapper_job() - return + self.status = status + if status == Status.QUEUING or status == Status.HELD: + reason = str() + if self.platform.type == 'slurm': + self.platform.send_command(self.platform.get_queue_status_cmd(self.id)) + reason = self.platform.parse_queue_reason(self.platform._ssh_output,self.id) + if self._queuing_reason_cancel(reason): + Log.error("Job {0} will be cancelled and set to FAILED as it was queuing due to {1}", self.name, + reason) + self.cancel_failed_wrapper_job() + return + elif reason == '(JobHeldUser)': + if self.hold is False: + self.platform.send_command("scontrol release " + "{0}".format(self.id)) # SHOULD BE MORE CLASS (GET_scontrol realease but not sure if this can be implemented on others PLATFORMS + self.status = Status.QUEUING + Log.info("Job {0} is QUEUING {1}", self.name, reason) + else: + self.status = Status.HELD + Log.info("Job {0} is HELD", self.name) + elif reason == '(JobHeldAdmin)': + Log.info("Job {0} Failed to be HELD, canceling... ", self.name) + self.hold = True + self.status = Status.WAITING + self.platform.send_command(self.platform.cancel_cmd + " {0}".format(self.id)) + else: Log.info("Job {0} is QUEUING {1}", self.name, reason) - self.status = status - if status in [Status.FAILED, Status.UNKNOWN]: - self.cancel_failed_wrapper_job() - self.update_failed_jobs() - elif status == Status.COMPLETED: - self.check_inner_jobs_completed(self.job_list) - elif status == Status.RUNNING: - time.sleep(3) - Log.debug('Checking inner jobs status') - self.check_inner_job_status() + self.update_inner_jobs_queue() + else: + if self.status in [Status.FAILED, Status.UNKNOWN]: + self.cancel_failed_wrapper_job() + self.update_failed_jobs() + elif self.status == Status.COMPLETED: + self.check_inner_jobs_completed(self.job_list) + elif self.status == Status.RUNNING: + #time.sleep(3) + #self.update_inner_jobs_queue() + Log.debug('Checking inner jobs status') + self.check_inner_job_status() + def check_inner_job_status(self): self._check_running_jobs() @@ -1123,7 +1152,7 @@ class WrapperJob(Job): Log.info("Job {0} finished at {1}".format(jobname, str(parse_date(end_time)))) self._check_finished_job(job) else: - Log.debug("Job {0} is SUBMITTED and waiting for dependencies".format(jobname)) + Log.debug("Job {0} is {1} and waiting for dependencies".format(jobname,Status.VALUE_TO_KEY[job.status])) def _check_finished_job(self, job): if self.platform.check_completed_files(job.name): @@ -1139,6 +1168,19 @@ class WrapperJob(Job): not_finished_jobs = [job for job in self.job_list if job.status not in [Status.FAILED, Status.COMPLETED]] for job in not_finished_jobs: self._check_finished_job(job) + def update_inner_jobs_queue(self): + if self.status == Status.WAITING: + for job in self.job_list: + job.status = self.status + job.hold = True + job.packed = False + else: + jobs = [job for job in self.job_list if job.status in [Status.SUBMITTED or Status.HELD or Status.QUEUING]] + for job in jobs: + job.status = self.status + if self.status == Status.QUEUING: + job.hold = False + def _check_wrapper_status(self): not_finished_jobs = [job for job in self.job_list if job.status not in [Status.FAILED, Status.COMPLETED]] diff --git a/autosubmit/job/job_common.py b/autosubmit/job/job_common.py index f779b453bfdbb4f0c2de253a160b52153e79dbbb..0522cfd6adeae6e43fe36127a358e69c96d34918 100644 --- a/autosubmit/job/job_common.py +++ b/autosubmit/job/job_common.py @@ -29,13 +29,14 @@ class Status: QUEUING = 3 RUNNING = 4 COMPLETED = 5 + HELD = 6 FAILED = -1 UNKNOWN = -2 SUSPENDED = -3 ####### # Note: any change on constants must be applied on the dict below!!! VALUE_TO_KEY = {-3: 'SUSPENDED', -2: 'UNKNOWN', -1: 'FAILED', 0: 'WAITING', 1: 'READY', - 2: 'SUBMITTED', 3: 'QUEUING', 4: 'RUNNING', 5: 'COMPLETED'} + 2: 'SUBMITTED', 3: 'QUEUING', 4: 'RUNNING', 5: 'COMPLETED', 6: 'HELD'} def retval(self, value): return getattr(self, value) @@ -57,9 +58,10 @@ class bcolors: QUEUING = '\033[35;1m' RUNNING = '\033[32m' COMPLETED = '\033[33m' + HELD = '\033[34;1m' FAILED = '\033[31m' SUSPENDED = '\033[31;1m' - CODE_TO_COLOR = {-3: SUSPENDED, -2: UNKNOWN, -1: FAILED, 0: WAITING, 1: READY, 2: SUBMITTED, 3: QUEUING, 4: RUNNING, 5: COMPLETED} + CODE_TO_COLOR = {-3: SUSPENDED, -2: UNKNOWN, -1: FAILED, 0: WAITING, 1: READY, 2: SUBMITTED, 3: QUEUING, 4: RUNNING, 5: COMPLETED, 6: HELD} diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index e9ceca8df0f2f6b5ef39ff139216eec51f43dfc2..88b01e521dd5eb95fb563b1f0e436765a1f22a46 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -643,7 +643,7 @@ class JobList: :rtype: list """ return [job for job in self._job_list] - def get_ready(self, platform=None): + def get_ready(self, platform=None, hold=False): """ Returns a list of ready jobs @@ -653,7 +653,7 @@ class JobList: :rtype: list """ return [job for job in self._job_list if (platform is None or job.platform is platform) and - job.status == Status.READY] + job.status == Status.READY and job.hold is hold] def get_waiting(self, platform=None): """ @@ -664,8 +664,34 @@ class JobList: :return: waiting jobs :rtype: list """ - return [job for job in self._job_list if (platform is None or job.platform is platform) and + waiting_jobs= [job for job in self._job_list if (platform is None or job.platform is platform) and job.status == Status.WAITING] + return waiting_jobs + + def get_waiting_remote_dependencies(self, platform_type='slurm'.lower()): + """ + Returns a list of jobs waiting on slurm scheduler + + :param platform: job platform + :type platform: HPCPlatform + :return: waiting jobs + :rtype: list + """ + waiting_jobs= [job for job in self._job_list if (job.platform.type == platform_type and + job.status == Status.WAITING)] + return waiting_jobs + + def get_held_jobs(self,platform = None): + """ + Returns a list of jobs in the platforms (Held) + + :param platform: job platform + :type platform: HPCPlatform + :return: jobs in platforms + :rtype: list + """ + return [job for job in self._job_list if (platform is None or job.platform == platform) and + job.status == Status.HELD] def get_unknown(self, platform=None): """ @@ -693,7 +719,7 @@ class JobList: def get_in_queue(self, platform=None): """ - Returns a list of jobs in the platforms (Submitted, Running, Queuing, Unknown) + Returns a list of jobs in the platforms (Submitted, Running, Queuing, Unknown,Held) :param platform: job platform :type platform: HPCPlatform @@ -701,7 +727,7 @@ class JobList: :rtype: list """ return self.get_submitted(platform) + self.get_running(platform) + self.get_queuing( - platform) + self.get_unknown(platform) + platform) + self.get_unknown(platform) + self.get_held_jobs(platform) def get_not_in_queue(self, platform=None): """ @@ -874,7 +900,7 @@ class JobList: def parameters(self, value): self._parameters = value - def update_list(self, as_conf,store_change=True,fromSetStatus=False): + def update_list(self, as_conf,store_change=True,fromSetStatus=False,ignoreRemoteDependency=False): """ Updates job list, resetting failed jobs and changing to READY all WAITING jobs with all parents COMPLETED @@ -911,11 +937,8 @@ class JobList: Log.debug("Resetting job: {0} status to: WAITING for parents completion...".format(job.name)) # if waiting jobs has all parents completed change its State to READY - - for job in self.get_completed(): - - if job.synchronize is not None: #and job in self.get_active(): + if job.synchronize is not None: Log.debug('Updating SYNC jobs') tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] if len(tmp) != len(job.parents): @@ -923,16 +946,41 @@ class JobList: save = True Log.debug("Resetting sync job: {0} status to: WAITING for parents completion...".format(job.name)) Log.debug('Update finished') - - Log.debug('Updating WAITING jobs') - for job in self.get_waiting(): - if not fromSetStatus: + if as_conf.get_remote_dependencies(): + all_parents_completed=[] + if not fromSetStatus: + for job in self.get_waiting(): tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] if len(tmp) == len(job.parents): job.status = Status.READY + job.hold = False save = True Log.debug("Setting job: {0} status to: READY (all parents completed)...".format(job.name)) + if as_conf.get_remote_dependencies(): + all_parents_completed.append(job.name) + if as_conf.get_remote_dependencies(): + Log.debug('Updating WAITING jobs eligible for remote_dependencies') + for job in self.get_waiting_remote_dependencies('slurm'.lower()): + if job.name not in all_parents_completed: + tmp = [parent for parent in job.parents if (parent.status == Status.COMPLETED or (parent.status == Status.QUEUING and not parent.hold and parent.name.lower() not in "setup") or parent.status == Status.RUNNING)] + if len(tmp) == len(job.parents): + job.status = Status.READY + job.hold = True + save = True + Log.debug("Setting job: {0} status to: READY for be held (all parents queuing, running or completed)...".format(job.name)) + Log.debug('Updating Held jobs') + for job in self.get_held_jobs(): + tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] + if len(tmp) == len(job.parents): + job.hold = False + Log.debug( + "Setting job: {0} status to: Queuing (all parents completed)...".format( + job.name)) + else: + job.hold = True + save= True + Log.debug('Update finished') return save diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 206be38cca6d2de39904961b3cfac45d033eaabb..8466800bf3b06bbb36a399982a79c2f67bfaa514 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -38,7 +38,7 @@ class JobPackager(object): :type jobs_list: JobList object. """ - def __init__(self, as_config, platform, jobs_list): + def __init__(self, as_config, platform, jobs_list,hold=False): self._as_config = as_config self._platform = platform self._jobs_list = jobs_list @@ -48,22 +48,20 @@ class JobPackager(object): self._max_wait_jobs_to_submit = platform.max_waiting_jobs - waiting_jobs # .total_jobs is defined in each section of platforms_.conf, if not from there, it comes form autosubmit_.conf # .total_jobs Maximum number of jobs at the same time - self._max_jobs_to_submit = platform.total_jobs - len(jobs_list.get_in_queue(platform)) + self._max_jobs_to_submit = platform.total_jobs - len(jobs_list.get_in_queue(platform)) self.max_jobs = min(self._max_wait_jobs_to_submit, self._max_jobs_to_submit) - # These are defined in the [wrapper] section of autosubmit_,conf self.wrapper_type = self._as_config.get_wrapper_type() # True or False - self.remote_dependencies = self._as_config.get_remote_dependencies() self.jobs_in_wrapper = self._as_config.get_wrapper_jobs() - Log.debug("Number of jobs ready: {0}", len(jobs_list.get_ready(platform))) + Log.debug("Number of jobs ready: {0}", len(jobs_list.get_ready(platform,hold=hold))) Log.debug("Number of jobs available: {0}", self._max_wait_jobs_to_submit) - if len(jobs_list.get_ready(platform)) > 0: - Log.info("Jobs ready for {0}: {1}", self._platform.name, len(jobs_list.get_ready(platform))) + if len(jobs_list.get_ready(platform,hold=hold)) > 0: + Log.info("Jobs ready for {0}: {1}", self._platform.name, len(jobs_list.get_ready(platform,hold=hold))) self._maxTotalProcessors = 0 - def build_packages(self,only_generate=False, jobs_filtered=[]): + def build_packages(self,only_generate=False, jobs_filtered=[],hold=False): """ Returns the list of the built packages to be submitted @@ -71,18 +69,17 @@ class JobPackager(object): :rtype: List() of JobPackageVertical """ packages_to_submit = list() - remote_dependencies_dict = dict() # only_wrappers = False when coming from Autosubmit.submit_ready_jobs, jobs_filtered empty if only_generate: jobs_to_submit = jobs_filtered else: - jobs_ready = self._jobs_list.get_ready(self._platform) + jobs_ready = self._jobs_list.get_ready(self._platform,hold=hold) if jobs_ready == 0: # If there are no jobs ready, result is tuple of empty - return packages_to_submit, remote_dependencies_dict + return packages_to_submit if not (self._max_wait_jobs_to_submit > 0 and self._max_jobs_to_submit > 0): # If there is no more space in platform, result is tuple of empty - return packages_to_submit, remote_dependencies_dict + return packages_to_submit # Sort by 6 first digits of date available_sorted = sorted(jobs_ready, key=lambda k: k.long_name.split('_')[1][:6]) @@ -93,6 +90,7 @@ class JobPackager(object): jobs_to_submit = list_of_available[0:num_jobs_to_submit] # print(len(jobs_to_submit)) jobs_to_submit_by_section = self._divide_list_by_section(jobs_to_submit) + for section in jobs_to_submit_by_section: # Only if platform allows wrappers, wrapper type has been correctly defined, and job names for wrappers have been correctly defined # ('None' is a default value) or the correct section is included in the corresponding sections in [wrappers] @@ -101,19 +99,66 @@ class JobPackager(object): and (self.jobs_in_wrapper == 'None' or section in self.jobs_in_wrapper): # Trying to find the value in jobs_parser, if not, default to an autosubmit_.conf value (Looks first in [wrapper] section) max_wrapped_jobs = int(self._as_config.jobs_parser.get_option(section, "MAX_WRAPPED", self._as_config.get_max_wrapped_jobs())) - + if '&' not in section: + dependencies_keys = self._as_config.jobs_parser.get(section, "DEPENDENCIES").split() + else: + multiple_sections = section.split('&') + dependencies_keys=[] + for sectionN in multiple_sections: + dependencies_keys += self._as_config.jobs_parser.get(sectionN, "DEPENDENCIES").split() + + hard_limit_wrapper = max_wrapped_jobs + for k in dependencies_keys: + if "-" in k: + k_divided = k.split("-") + if k_divided[0] not in self.jobs_in_wrapper: + number = int(k_divided[1].strip(" ")) + if number < hard_limit_wrapper: + hard_limit_wrapper = number + min_wrapped_jobs = min(self._as_config.jobs_parser.get_option(section, "MIN_WRAPPED",self._as_config.get_min_wrapped_jobs()),hard_limit_wrapper) + + built_packages = [] if self.wrapper_type in ['vertical', 'vertical-mixed']: - built_packages, remote_dependencies_dict = self._build_vertical_packages(jobs_to_submit_by_section[section], + built_packages_tmp = self._build_vertical_packages(jobs_to_submit_by_section[section], max_wrapped_jobs) - packages_to_submit += built_packages + for p in built_packages_tmp: + if len(p.jobs) >= min_wrapped_jobs: # if the quantity is not enough, don't make the wrapper + built_packages.append(p) + elif self._jobs_list._chunk_list.index(p.jobs[0].chunk) >= len(self._jobs_list._chunk_list) - ( + len(self._jobs_list._chunk_list) % min_wrapped_jobs): # Last case, wrap remaining jobs + built_packages.append(p) + else: # If a package is discarded, allow to wrap their inner jobs again. + for job in p.jobs: + job.packed = False elif self.wrapper_type == 'horizontal': - built_packages, remote_dependencies_dict = self._build_horizontal_packages(jobs_to_submit_by_section[section], + built_packages_tmp = self._build_horizontal_packages(jobs_to_submit_by_section[section], max_wrapped_jobs, section) - packages_to_submit += built_packages - + for p in built_packages_tmp: + if len(p.jobs) >= self._as_config.jobs_parser.get_option(section, "MIN_WRAPPED",self._as_config.get_min_wrapped_jobs()): # if the quantity is not enough, don't make the wrapper + built_packages.append(p) + elif self._jobs_list._member_list.index(p.jobs[0].member) >= len( + self._jobs_list._member_list) - (len(self._jobs_list._member_list) % min_wrapped_jobs): # Last case, wrap remaining jobs + built_packages.append(p) + else: # If a package is discarded, allow to wrap their inner jobs again. + for job in p.jobs: + job.packed = False elif self.wrapper_type in ['vertical-horizontal', 'horizontal-vertical']: - built_packages = self._build_hybrid_package(jobs_to_submit_by_section[section], max_wrapped_jobs, section) - packages_to_submit.append(built_packages) + built_packages_tmp =[] + built_packages_tmp.append(self._build_hybrid_package(jobs_to_submit_by_section[section], max_wrapped_jobs, section)) + for p in built_packages_tmp: + if len(p.jobs) >= min_wrapped_jobs: # if the quantity is not enough, don't make the wrapper + built_packages.append(p) + elif self._jobs_list._chunk_list.index(p.jobs[0].chunk) >= len(self._jobs_list._chunk_list) - ( + len(self._jobs_list._chunk_list) % min_wrapped_jobs): # Last case, wrap remaining jobs + built_packages.append(p) + else: # If a package is discarded, allow to wrap their inner jobs again. + for job in p.jobs: + job.packed = False + built_packages=built_packages_tmp + else: + built_packages=built_packages_tmp + packages_to_submit += built_packages + else: # No wrapper allowed / well-configured for job in jobs_to_submit_by_section[section]: @@ -122,8 +167,7 @@ class JobPackager(object): else: package = JobPackageSimple([job]) packages_to_submit.append(package) - - return packages_to_submit, remote_dependencies_dict + return packages_to_submit def _divide_list_by_section(self, jobs_list): """ @@ -149,8 +193,6 @@ class JobPackager(object): def _build_horizontal_packages(self, section_list, max_wrapped_jobs, section): packages = [] - remote_dependencies_dict = dict() - horizontal_packager = JobPackagerHorizontal(section_list, self._platform.max_processors, max_wrapped_jobs, self.max_jobs, self._platform.processors_per_node) @@ -167,13 +209,9 @@ class JobPackager(object): current_package = JobPackageHorizontal(package_jobs, jobs_resources=jobs_resources) packages.append(current_package) - if self.remote_dependencies and current_package: - remote_dependencies_dict['name_to_id'] = dict() - remote_dependencies_dict['dependencies'] = dict() - packages += horizontal_packager.get_next_packages(section, potential_dependency=current_package.name, - remote_dependencies_dict=remote_dependencies_dict) - return packages, remote_dependencies_dict + + return packages def _build_vertical_packages(self, section_list, max_wrapped_jobs): """ @@ -183,17 +221,12 @@ class JobPackager(object): :type section_list: List() of Job Objects. \n :param max_wrapped_jobs: Number of maximum jobs that can be wrapped (Can be user defined), per section. \n :type max_wrapped_jobs: Integer. \n + :param min_wrapped_jobs: Number of maximum jobs that can be wrapped (Can be user defined), per section. \n + :type min_wrapped_jobs: Integer. \n :return: List of Wrapper Packages, Dictionary that details dependencies. \n :rtype: List() of JobPackageVertical(), Dictionary Key: String, Value: (Dictionary Key: Variable Name, Value: String/Int) """ packages = [] - potential_dependency = None - remote_dependencies_dict = dict() - # True when autosubmit_.conf value [wrapper].DEPENDENCIES has been set to true - if self.remote_dependencies: - remote_dependencies_dict['name_to_id'] = dict() - remote_dependencies_dict['dependencies'] = dict() - for job in section_list: if self.max_jobs > 0: if job.packed is False: @@ -212,21 +245,13 @@ class JobPackager(object): self.max_jobs -= len(jobs_list) if job.status is Status.READY: packages.append(JobPackageVertical(jobs_list)) - else: - if self.remote_dependencies: - # Sending last item in list of packaged - child = job_vertical_packager.get_wrappable_child(jobs_list[-1]) - if child is not None: - section_list.insert(section_list.index(job) + 1, child) - potential_dependency = packages[-1].name - package = JobPackageVertical(jobs_list, potential_dependency) - packages.append(package) - # Possible need of "if self.remote_dependencies here" - remote_dependencies_dict['name_to_id'][potential_dependency] = -1 - remote_dependencies_dict['dependencies'][package.name] = potential_dependency + else: + package = JobPackageVertical(jobs_list, None) + packages.append(package) + else: break - return packages, remote_dependencies_dict + return packages def _build_hybrid_package(self, jobs_list, max_wrapped_jobs, section): jobs_resources = dict() @@ -541,7 +566,7 @@ class JobPackagerHorizontal(object): jobname = jobname.split('_')[-1] return self._sort_order_dict[jobname] - def get_next_packages(self, jobs_sections, max_wallclock=None, potential_dependency=None, remote_dependencies_dict=dict(),horizontal_vertical=False,max_procs=0): + def get_next_packages(self, jobs_sections, max_wallclock=None, potential_dependency=None, packages_remote_dependencies=list(),horizontal_vertical=False,max_procs=0): packages = [] job = max(self.job_list, key=attrgetter('total_wallclock')) wallclock = job.wallclock @@ -573,10 +598,7 @@ class JobPackagerHorizontal(object): if total_wallclock > max_wallclock: return packages packages.append(package_jobs) - if remote_dependencies_dict: - current_package = JobPackageHorizontal(package_jobs, potential_dependency) - remote_dependencies_dict['name_to_id'][potential_dependency] = -1 - remote_dependencies_dict['dependencies'][current_package.name] = potential_dependency + else: break diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 4303f7bbc3c9c91ae5710aa1ac21d5c463e516f1..add805e389735fff771e92a640d2d613f10eacc7 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -47,7 +47,7 @@ class JobPackageBase(object): self._platform = jobs[0].platform self._custom_directives = set() for job in jobs: - if job.platform != self._platform or job.platform is None: + if job.platform.name != self._platform.name or job.platform is None: raise Exception('Only one valid platform per package') except IndexError: raise Exception('No jobs given') @@ -75,7 +75,7 @@ class JobPackageBase(object): """ return self._platform - def submit(self, configuration, parameters,only_generate=False): + def submit(self, configuration, parameters,only_generate=False,hold=False): """ :para configuration: Autosubmit basic configuration \n :type configuration: AutosubmitConfig object \n @@ -102,7 +102,7 @@ class JobPackageBase(object): else: self._create_scripts(configuration) self._send_files() - self._do_submission() + self._do_submission(job_scripts=None,hold=hold) def _create_scripts(self, configuration): @@ -111,7 +111,7 @@ class JobPackageBase(object): def _send_files(self): raise Exception('Not implemented') - def _do_submission(self): + def _do_submission(self,job_scripts=None, hold=False): raise Exception('Not implemented') @@ -133,13 +133,13 @@ class JobPackageSimple(JobPackageBase): for job in self.jobs: self.platform.send_file(self._job_scripts[job.name]) - def _do_submission(self, job_scripts=None): + def _do_submission(self, job_scripts=None, hold=False): if job_scripts is None: job_scripts = self._job_scripts for job in self.jobs: self.platform.remove_stat_file(job.name) self.platform.remove_completed_file(job.name) - job.id = self.platform.submit_job(job, job_scripts[job.name]) + job.id = self.platform.submit_job(job, job_scripts[job.name], hold=hold) if job.id is None: continue Log.info("{0} submitted", job.name) @@ -166,10 +166,10 @@ class JobPackageSimpleWrapped(JobPackageSimple): for job in self.jobs: self.platform.send_file(self._job_wrapped_scripts[job.name]) - def _do_submission(self, job_scripts=None): + def _do_submission(self, job_scripts=None, hold=False): if job_scripts is None: job_scripts = self._job_wrapped_scripts - super(JobPackageSimpleWrapped, self)._do_submission(job_scripts) + super(JobPackageSimpleWrapped, self)._do_submission(job_scripts, hold=hold) class JobPackageArray(JobPackageBase): @@ -221,12 +221,12 @@ class JobPackageArray(JobPackageBase): self.platform.send_file(self._job_inputs[job.name]) self.platform.send_file(self._common_script) - def _do_submission(self): + def _do_submission(self, job_scripts=None, hold=False): for job in self.jobs: self.platform.remove_stat_file(job.name) self.platform.remove_completed_file(job.name) - package_id = self.platform.submit_job(None, self._common_script) + package_id = self.platform.submit_job(None, self._common_script, hold=hold) if package_id is None: return @@ -316,7 +316,7 @@ class JobPackageThread(JobPackageBase): self.platform.send_file(self._job_scripts[job.name], check=False) self.platform.send_file(self._common_script) - def _do_submission(self): + def _do_submission(self, job_scripts=None, hold=False): if callable(getattr(self.platform, 'remove_multiple_files')): filenames = str() for job in self.jobs: @@ -327,8 +327,10 @@ class JobPackageThread(JobPackageBase): for job in self.jobs: self.platform.remove_stat_file(job.name) self.platform.remove_completed_file(job.name) + if hold: + job.hold = hold - package_id = self.platform.submit_job(None, self._common_script) + package_id = self.platform.submit_job(None, self._common_script, hold=hold) if package_id is None: return @@ -400,12 +402,14 @@ class JobPackageThreadWrapped(JobPackageThread): self.platform.send_file(self._job_scripts[job.name]) self.platform.send_file(self._common_script) - def _do_submission(self): + def _do_submission(self, job_scripts=None, hold=False): for job in self.jobs: self.platform.remove_stat_file(job.name) self.platform.remove_completed_file(job.name) + if hold: + job.hold = hold - package_id = self.platform.submit_job(None, self._common_script) + package_id = self.platform.submit_job(None, self._common_script, hold=hold) if package_id is None: raise Exception('Submission failed') diff --git a/autosubmit/monitor/monitor.py b/autosubmit/monitor/monitor.py index eec9a36523b554c48a6eae0e772bb9c5ff6591b1..a2d35c7fc13ac5a360f3bb280fdcb35457df3da1 100644 --- a/autosubmit/monitor/monitor.py +++ b/autosubmit/monitor/monitor.py @@ -42,7 +42,7 @@ from diagram import create_bar_diagram class Monitor: """Class to handle monitoring of Jobs at HPC.""" _table = dict([(Status.UNKNOWN, 'white'), (Status.WAITING, 'gray'), (Status.READY, 'lightblue'), - (Status.SUBMITTED, 'cyan'), (Status.QUEUING, 'lightpink'), (Status.RUNNING, 'green'), + (Status.SUBMITTED, 'cyan'), (Status.HELD, 'salmon'), (Status.QUEUING, 'pink'), (Status.RUNNING, 'green'), (Status.COMPLETED, 'yellow'), (Status.FAILED, 'red'), (Status.SUSPENDED, 'orange')]) @staticmethod @@ -61,12 +61,15 @@ class Monitor: return Monitor._table[Status.READY] elif status == Status.SUBMITTED: return Monitor._table[Status.SUBMITTED] + elif status == Status.HELD: + return Monitor._table[Status.HELD] elif status == Status.QUEUING: return Monitor._table[Status.QUEUING] elif status == Status.RUNNING: return Monitor._table[Status.RUNNING] elif status == Status.COMPLETED: return Monitor._table[Status.COMPLETED] + elif status == Status.FAILED: return Monitor._table[Status.FAILED] elif status == Status.SUSPENDED: @@ -98,12 +101,15 @@ class Monitor: fillcolor=self._table[Status.READY])) legend.add_node(pydotplus.Node(name='SUBMITTED', shape='box', style="filled", fillcolor=self._table[Status.SUBMITTED])) + legend.add_node(pydotplus.Node(name='HELD', shape='box', style="filled", + fillcolor=self._table[Status.HELD])) legend.add_node(pydotplus.Node(name='QUEUING', shape='box', style="filled", fillcolor=self._table[Status.QUEUING])) legend.add_node(pydotplus.Node(name='RUNNING', shape='box', style="filled", fillcolor=self._table[Status.RUNNING])) legend.add_node(pydotplus.Node(name='COMPLETED', shape='box', style="filled", fillcolor=self._table[Status.COMPLETED])) + legend.add_node(pydotplus.Node(name='FAILED', shape='box', style="filled", fillcolor=self._table[Status.FAILED])) legend.add_node(pydotplus.Node(name='SUSPENDED', shape='box', style="filled", diff --git a/autosubmit/platforms/ecplatform.py b/autosubmit/platforms/ecplatform.py index 7900d87874cb5f260c2c819a8a696bdf013f30d1..bff993360a38b5e79eb45fdc84e7d7845a5b9893 100644 --- a/autosubmit/platforms/ecplatform.py +++ b/autosubmit/platforms/ecplatform.py @@ -111,7 +111,7 @@ class EcPlatform(ParamikoPlatform): def get_checkjob_cmd(self, job_id): return self._checkjob_cmd + str(job_id) - def get_submit_cmd(self, job_script, job): + def get_submit_cmd(self, job_script, job, hold=False): return self._submit_cmd + job_script def connect(self): diff --git a/autosubmit/platforms/locplatform.py b/autosubmit/platforms/locplatform.py index f2766be55608566a9a197669ee8dc12c17787f5f..0f3d1852b335ded671c6651941c7e8cda73b9dd7 100644 --- a/autosubmit/platforms/locplatform.py +++ b/autosubmit/platforms/locplatform.py @@ -80,7 +80,7 @@ class LocalPlatform(ParamikoPlatform): jobs_xml = dom.getElementsByTagName("JB_job_number") return [int(element.firstChild.nodeValue) for element in jobs_xml] - def get_submit_cmd(self, job_script, job): + def get_submit_cmd(self, job_script, job, hold=False): return self.get_call(job_script, job) def get_checkjob_cmd(self, job_id): diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 4d7b826b8aedc607e57f30ba01ed729264c8a5eb..ad6e359a4e0dd1cafbae9a94ecfaa35a982f921a 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -248,7 +248,7 @@ class ParamikoPlatform(Platform): os.path.join(self.get_files_path(), dest))) return False - def submit_job(self, job, script_name): + def submit_job(self, job, script_name, hold=False): """ Submit a job from a given job object. @@ -256,11 +256,13 @@ class ParamikoPlatform(Platform): :type job: autosubmit.job.job.Job :param script_name: job script's name :rtype scriptname: str + :param hold: send job hold + :type hold: boolean :return: job id for the submitted job :rtype: int """ if self.type == 'slurm': - self.get_submit_cmd(script_name, job) + self.get_submit_cmd(script_name, job, hold=hold) return None else: if self.send_command(self.get_submit_cmd(script_name, job)): @@ -269,7 +271,7 @@ class ParamikoPlatform(Platform): return int(job_id) else: return None - def submit_Script(self): + def submit_Script(self, hold=False): """ Sends a SubmitfileScript, exec in platform and retrieve the Jobs_ID. @@ -301,22 +303,25 @@ class ParamikoPlatform(Platform): # URi: value ? job.new_status= job_status - while not self.send_command(self.get_checkjob_cmd(job_id)) and retries >= 0: + while not ( self.send_command(self.get_checkjob_cmd(job_id)) and retries >= 0 ) or (self.get_ssh_output() == "" and retries >= 0): retries -= 1 Log.warning('Retrying check job command: {0}', self.get_checkjob_cmd(job_id)) Log.error('Can not get job status for job id ({0}), retrying in 10 sec', job_id) sleep(5) - + output=self.get_ssh_output() + output_stripped=output.strip() if retries >= 0: - Log.debug('Successful check job command: {0}', self.get_checkjob_cmd(job_id)) + #Log.debug('Successful check job command: {0}', self.get_checkjob_cmd(job_id)) job_status = self.parse_job_output(self.get_ssh_output()).strip("\n") # URi: define status list in HPC Queue Class if job_status in self.job_status['COMPLETED'] or retries == 0: job_status = Status.COMPLETED elif job_status in self.job_status['RUNNING']: job_status = Status.RUNNING - elif job_status in self.job_status['QUEUING']: + elif job_status in self.job_status['QUEUING'] and job.hold is False: job_status = Status.QUEUING + elif job_status in self.job_status['QUEUING'] and job.hold is True: + job_status = Status.HELD elif job_status in self.job_status['FAILED']: job_status = Status.FAILED else: @@ -345,23 +350,29 @@ class ParamikoPlatform(Platform): Log.warning('Retrying check job command: {0}', cmd) Log.warning('Can not get job status for all jobs, retrying in 3 sec') sleep(3) - Log.debug('Successful check job command: {0}', cmd) + job_list_status = self.get_ssh_output() + Log.debug('Successful check job command: {0}, \n output: {1}', cmd, self._ssh_output) if retries >= 0: in_queue_jobs=[] list_queue_jobid="" for job in job_list: job_id=job.id job_status = Status.UNKNOWN - - job_status = self.parse_Alljobs_output(self.get_ssh_output(),job_id) + job_status = self.parse_Alljobs_output(job_list_status,job_id) # URi: define status list in HPC Queue Class if job_status in self.job_status['COMPLETED']: job_status = Status.COMPLETED elif job_status in self.job_status['RUNNING']: job_status = Status.RUNNING elif job_status in self.job_status['QUEUING']: - job_status = Status.QUEUING if self.type == "slurm": + if job.status == Status.HELD: + job_status = Status.HELD + if not job.hold: + self.send_command("scontrol release "+"{0}".format(job_id)) # SHOULD BE MORE CLASS (GET_scontrol realease but not sure if this can be implemented on others PLATFORMS + job_status = Status.QUEUING + else: + job_status = Status.QUEUING list_queue_jobid += str(job.id) + ',' in_queue_jobs.append(job) elif job_status in self.job_status['FAILED']: @@ -376,19 +387,28 @@ class ParamikoPlatform(Platform): if self.type == 'slurm' and len(in_queue_jobs) > 0: cmd=self.get_queue_status_cmd(list_queue_jobid) self.send_command(cmd) + queue_status=self._ssh_output for job in in_queue_jobs: - reason = self.parse_queue_reason(self._ssh_output,job.id) + reason = self.parse_queue_reason(queue_status,job.id) if job.queuing_reason_cancel(reason): Log.error("Job {0} will be cancelled and set to FAILED as it was queuing due to {1}", job.name, reason) self.send_command(self.platform.cancel_cmd + " {0}".format(job.id)) - job.new_status=Status.FAILED + job.new_status = Status.FAILED job.update_status(remote_logs) return - Log.info("Job {0} is QUEUING {1}", job.name, reason) + elif reason == '(JobHeldUser)': + job.new_status=Status.HELD + Log.info("Job {0} is HELD", job.name) + elif reason == '(JobHeldAdmin)': + Log.info("Job {0} Failed to be HELD, canceling... ", job.name) + job.new_status = Status.WAITING + job.platform.send_command(job.platform.cancel_cmd + " {0}".format(job.id)) + else: + Log.info("Job {0} is QUEUING {1}", job.name, reason) else: for job in job_list: job_status = Status.UNKNOWN - Log.warning('check_job() The job id ({0}) from platform {1} has an status of {1}.', job_id, self.name, job_status) + Log.warning('check_job() The job id ({0}) from platform {1} has an status of {2}.', job.id, self.name, job_status) job.new_status=job_status def get_checkjob_cmd(self, job_id): @@ -463,16 +483,16 @@ class ParamikoPlatform(Platform): stdout.close() stderr.close() self._ssh_output = "" - if len(stdout_chunks) > 0: - for s in stdout_chunks: - if s is not None: - self._ssh_output += s + for s in stdout_chunks: + if s != '': + self._ssh_output += s for errorLine in stderr_readlines: if errorLine.find("submission failed") != -1: Log.critical('Command {0} in {1} warning: {2}', command, self.host, '\n'.join(stderr_readlines)) return False if not ignore_log: - Log.warning('Command {0} in {1} warning: {2}', command, self.host, '\n'.join(stderr_readlines)) + if len(stderr_readlines) > 0: + Log.warning('Command {0} in {1} warning: {2}', command, self.host, '\n'.join(stderr_readlines)) Log.debug('Command {0} in {1} successful with out message: {2}', command, self.host, self._ssh_output) return True @@ -508,13 +528,15 @@ class ParamikoPlatform(Platform): pass - def get_submit_cmd(self, job_script, job_type): + def get_submit_cmd(self, job_script, job_type,hold=False): """ Get command to add job to scheduler :param job_type: :param job_script: path to job script :param job_script: str + :param hold: submit a job in a held status + :param hold: boolean :return: command to submit job to platforms :rtype: str """ diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index b117f88ce2c469b1520ea602eb6ee0c59f44d538..0a2a0c79a7ddaf9ff922807a628f96796b7ead9c 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -301,7 +301,7 @@ class Platform(object): path = os.path.join(self.root_dir, 'LOG_{0}'.format(self.expid)) return path - def submit_job(self, job, scriptname): + def submit_job(self, job, script_name, hold=False): """ Submit a job from a given job object. diff --git a/autosubmit/platforms/psplatform.py b/autosubmit/platforms/psplatform.py index 83fc6a18a4b3e665b1b11489fa2dab72df3f8596..9861db07e1e1986c2a46553868fb8b19d47f12ab 100644 --- a/autosubmit/platforms/psplatform.py +++ b/autosubmit/platforms/psplatform.py @@ -74,7 +74,7 @@ class PsPlatform(ParamikoPlatform): jobs_xml = dom.getElementsByTagName("JB_job_number") return [int(element.firstChild.nodeValue) for element in jobs_xml] - def get_submit_cmd(self, job_script, job): + def get_submit_cmd(self, job_script, job, hold=False): return self.get_call(job_script, job) def get_checkjob_cmd(self, job_id): diff --git a/autosubmit/platforms/saga_platform.py b/autosubmit/platforms/saga_platform.py index 3d1f1e88bbb67b0a1a1383fa70b1629e3fec4557..6495a7abd9aaae3bc25b42e364dd2025b5f92895 100644 --- a/autosubmit/platforms/saga_platform.py +++ b/autosubmit/platforms/saga_platform.py @@ -199,7 +199,7 @@ class SagaPlatform(Platform): except saga.DoesNotExist: return True - def submit_job(self, job, scriptname): + def submit_job(self, job, script_name, hold=False): """ Submit a job from a given job object. @@ -210,7 +210,7 @@ class SagaPlatform(Platform): :return: job id for the submitted job :rtype: int """ - saga_job = self.create_saga_job(job, scriptname) + saga_job = self.create_saga_job(job, script_name) saga_job.run() return saga_job.id diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index c40850616b9a4d23d1f203050e8acb45c27fea83..cdacf2090e195be0fd67bd32ceaa30359fccc852 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -63,20 +63,16 @@ class SlurmPlatform(ParamikoPlatform): return os.path.join(BasicConfig.LOCAL_ASLOG_DIR,os.path.basename(self._submit_script_path)) - def submit_Script(self): + def submit_Script(self,hold=False): """ - Sends a SubmitfileScript, execute it in the platform and retrieves the Jobs_ID of all jobs at once. + Sends a Submit file Script, execute it in the platform and retrieves the Jobs_ID of all jobs at once. :param job: job object :type job: autosubmit.job.job.Job :return: job id for submitted jobs :rtype: list(int) """ - self.send_file(self.get_submit_script(),False) - - #cmd = '(cd '+self.get_files_path()+';'+' ./'+os.path.basename(self._submit_script_path)+')' - cmd = os.path.join(self.get_files_path(),os.path.basename(self._submit_script_path)) if self.send_command(cmd): jobs_id = self.get_submitted_job_id(self.get_ssh_output()) @@ -93,6 +89,8 @@ class SlurmPlatform(ParamikoPlatform): self.cancel_cmd = "scancel" self._checkhost_cmd = "echo 1" self._submit_cmd = 'sbatch -D {1} {1}/'.format(self.host, self.remote_log_dir) + self._submit_hold_cmd = 'sbatch -H -D {1} {1}/'.format(self.host, self.remote_log_dir) + self.put_cmd = "scp" self.get_cmd = "scp" self.mkdir_cmd = "mkdir -p " + self.remote_log_dir @@ -130,15 +128,19 @@ class SlurmPlatform(ParamikoPlatform): jobs_xml = dom.getElementsByTagName("JB_job_number") return [int(element.firstChild.nodeValue) for element in jobs_xml] - def get_submit_cmd(self, job_script, job): - self._submit_script_file.write(self._submit_cmd + job_script + "\n") + def get_submit_cmd(self, job_script, job, hold=False): + if not hold: + self._submit_script_file.write(self._submit_cmd + job_script + "\n") + else: + self._submit_script_file.write(self._submit_hold_cmd + job_script + "\n" ) + def get_checkjob_cmd(self, job_id): return 'sacct -n -j {1} -o "State"'.format(self.host, job_id) def get_checkAlljobs_cmd(self, jobs_id): - return "sacct -n -X -j {1} -o 'jobid,State'".format(self.host, jobs_id) + return "sacct -n -X -j {1} -o jobid,State".format(self.host, jobs_id) def get_queue_status_cmd(self, job_id): return 'squeue -j {0} -o %A,%R'.format(job_id) diff --git a/test/unit/test_job_common.py b/test/unit/test_job_common.py index a2b7b86a68fed42fd3b53b866cce0482f42dca6a..76ed7a62e08322d737e751d34686f3e34228aeb9 100644 --- a/test/unit/test_job_common.py +++ b/test/unit/test_job_common.py @@ -15,6 +15,8 @@ class TestJobCommon(TestCase): self.assertEquals('WAITING', Status.VALUE_TO_KEY[Status.WAITING]) self.assertEquals('READY', Status.VALUE_TO_KEY[Status.READY]) self.assertEquals('SUBMITTED', Status.VALUE_TO_KEY[Status.SUBMITTED]) + self.assertEquals('HELD', Status.VALUE_TO_KEY[Status.HELD]) self.assertEquals('QUEUING', Status.VALUE_TO_KEY[Status.QUEUING]) self.assertEquals('RUNNING', Status.VALUE_TO_KEY[Status.RUNNING]) self.assertEquals('COMPLETED', Status.VALUE_TO_KEY[Status.COMPLETED]) + diff --git a/test/unit/test_wrappers.py b/test/unit/test_wrappers.py index 9dc6daf7c393a0302180a0e394ee3245af3c02b9..b0604c42d8612c77ad9be54e296f7114d971fe00 100644 --- a/test/unit/test_wrappers.py +++ b/test/unit/test_wrappers.py @@ -218,7 +218,7 @@ class TestWrappers(TestCase): packages = [JobPackageVertical(package_m1_s2), JobPackageVertical(package_m2_s2)] - returned_packages = returned_packages[0] + #returned_packages = returned_packages[] for i in range(0, len(returned_packages)): self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) @@ -267,7 +267,7 @@ class TestWrappers(TestCase): packages = [JobPackageVertical(package_m1_s2), JobPackageVertical(package_m2_s2)] - returned_packages = returned_packages[0] + #returned_packages = returned_packages[0] for i in range(0, len(returned_packages)): self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) @@ -313,7 +313,7 @@ class TestWrappers(TestCase): packages = [JobPackageVertical(package_m1_s2), JobPackageVertical(package_m2_s2)] - returned_packages = returned_packages[0] + #returned_packages = returned_packages[0] for i in range(0, len(returned_packages)): self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) @@ -359,7 +359,7 @@ class TestWrappers(TestCase): packages = [JobPackageVertical(package_m1_s2), JobPackageVertical(package_m2_s2)] - returned_packages = returned_packages[0] + #returned_packages = returned_packages[0] for i in range(0, len(returned_packages)): self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) @@ -398,7 +398,7 @@ class TestWrappers(TestCase): packages = [JobPackageVertical(package_m1_s2), JobPackageVertical(package_m2_s2)] - returned_packages = returned_packages[0] + #returned_packages = returned_packages[0] for i in range(0, len(returned_packages)): self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) @@ -462,7 +462,7 @@ class TestWrappers(TestCase): packages = [JobPackageVertical(package_m1_s2_s3), JobPackageVertical(package_m2_s2_s3)] - returned_packages = returned_packages[0] + #returned_packages = returned_packages[0] for i in range(0, len(returned_packages)): self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) @@ -522,7 +522,7 @@ class TestWrappers(TestCase): packages = [JobPackageVertical(package_m1_s2_s3)] - returned_packages = returned_packages[0] + #returned_packages = returned_packages[0] for i in range(0, len(returned_packages)): self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) @@ -584,7 +584,7 @@ class TestWrappers(TestCase): packages = [JobPackageVertical(package_m1_s2_s3), JobPackageVertical(package_m2_s2_s3)] - returned_packages = returned_packages[0] + #returned_packages = returned_packages[0] for i in range(0, len(returned_packages)): self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) @@ -645,7 +645,7 @@ class TestWrappers(TestCase): packages = [JobPackageVertical(package_m1_s2_s3), JobPackageVertical(package_m2_s2_s3)] - returned_packages = returned_packages[0] + #returned_packages = returned_packages[0] for i in range(0, len(returned_packages)): self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) @@ -706,7 +706,7 @@ class TestWrappers(TestCase): packages = [JobPackageVertical(package_m1_s2_s3), JobPackageVertical(package_m2_s2_s3)] - returned_packages = returned_packages[0] + #returned_packages = returned_packages[0] for i in range(0, len(returned_packages)): self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) @@ -778,7 +778,7 @@ class TestWrappers(TestCase): packages = [JobPackageVertical(package_m1_s2_s3), JobPackageVertical(package_m2_s2_s3)] - returned_packages = returned_packages[0] + #returned_packages = returned_packages[0] for i in range(0, len(returned_packages)): self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs)