diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index ad70399082e592680e11c970bb3c49e698f6f9a7..20289520767f3b087ffd4ce4dbad6045c0495aba 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -513,7 +513,8 @@ class Autosubmit: @staticmethod def _check_Ownership(expid): BasicConfig.read() - currentUser_id = os.getlogin() + #currentUser_id = os.getlogin() + currentUser_id = pwd.getpwuid(os.getuid())[0] currentOwner_id = pwd.getpwuid(os.stat(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid)).st_uid).pw_name if currentUser_id == currentOwner_id: return True @@ -1077,48 +1078,33 @@ class Autosubmit: try: with portalocker.Lock(os.path.join(tmp_path, 'autosubmit.lock'), timeout=1): Log.info("Preparing .lock file to avoid multiple instances with same experiment id") - Log.set_file(os.path.join(aslogs_path, 'run.log')) os.system('clear') - signal.signal(signal.SIGINT, signal_handler) - as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory()) if not as_conf.check_conf_files(): Log.critical('Can not run with invalid configuration') return False - project_type = as_conf.get_project_type() if project_type != "none": # Check proj configuration as_conf.check_proj() - hpcarch = as_conf.get_platform() - safetysleeptime = as_conf.get_safetysleeptime() retrials = as_conf.get_retrials() - submitter = Autosubmit._get_submitter(as_conf) submitter.load_platforms(as_conf) - Log.debug("The Experiment name is: {0}", expid) Log.debug("Sleep: {0}", safetysleeptime) Log.debug("Default retrials: {0}", retrials) - Log.info("Starting job submission...") - pkl_dir = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, 'pkl') job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=notransitive) - Log.debug("Starting from job list restored from {0} files", pkl_dir) - Log.debug("Length of the jobs list: {0}", len(job_list)) - Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) - # check the job list script creation Log.debug("Checking experiment templates...") - platforms_to_test = set() for job in job_list.get_job_list(): if job.platform_name is None: @@ -1127,12 +1113,9 @@ class Autosubmit: job.platform = submitter.platforms[job.platform_name.lower()] # noinspection PyTypeChecker platforms_to_test.add(job.platform) - job_list.check_scripts(as_conf) - packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) - if as_conf.get_wrapper_type() != 'none': os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl","job_packages_" + expid+".db"), 0644) packages = packages_persistence.load() @@ -1140,23 +1123,20 @@ class Autosubmit: if package_name not in job_list.packages_dict: job_list.packages_dict[package_name] = [] job_list.packages_dict[package_name].append(job_list.get_job_by_name(job_name)) - for package_name, jobs in job_list.packages_dict.items(): from job.job import WrapperJob wrapper_job = WrapperJob(package_name, jobs[0].id, Status.SUBMITTED, 0, jobs, None, - None, jobs[0].platform, as_conf) + None, jobs[0].platform, as_conf, jobs[0].hold) job_list.job_package_map[jobs[0].id] = wrapper_job job_list.update_list(as_conf) job_list.save() + Log.info("Autosubmit is running with v{0}", Autosubmit.autosubmit_version) ######################### # AUTOSUBMIT - MAIN LOOP ######################### # Main loop. Finishing when all jobs have been submitted - Log.info("Autosubmit is running with v{0}", Autosubmit.autosubmit_version) while job_list.get_active(): - if Autosubmit.exit: - return 2 # reload parameters changes Log.debug("Reloading parameters...") as_conf.reload() @@ -1168,27 +1148,23 @@ class Autosubmit: total_jobs, time.strftime("%H:%M"))) safetysleeptime = as_conf.get_safetysleeptime() - Log.debug("Sleep: {0}", safetysleeptime) default_retrials = as_conf.get_retrials() - Log.debug("Number of retrials: {0}", default_retrials) - check_wrapper_jobs_sleeptime = as_conf.get_wrapper_check_time() + Log.debug("Sleep: {0}", safetysleeptime) + Log.debug("Number of retrials: {0}", default_retrials) Log.debug('WRAPPER CHECK TIME = {0}'.format(check_wrapper_jobs_sleeptime)) - save = False - slurm = [] for platform in platforms_to_test: list_jobid = "" completed_joblist = [] - list_prevStatus =[] + list_prevStatus = [] queuing_jobs = job_list.get_in_queue_grouped_id(platform) for job_id, job in queuing_jobs.items(): + # Check Wrappers one-by-one if job_list.job_package_map and job_id in job_list.job_package_map: Log.debug('Checking wrapper job with id ' + str(job_id)) wrapper_job = job_list.job_package_map[job_id] - if as_conf.get_remote_dependencies(): - wrapper_job.hold = wrapper_job.job_list[0].hold if as_conf.get_notifications() == 'true': for inner_job in wrapper_job.job_list: inner_job.prev_status= inner_job.status @@ -1197,15 +1173,23 @@ class Autosubmit: check_wrapper = True if datetime.timedelta.total_seconds(datetime.datetime.now() - wrapper_job.checked_time) >= check_wrapper_jobs_sleeptime else False if check_wrapper: wrapper_job.checked_time = datetime.datetime.now() - platform.check_job(wrapper_job) - Log.info( - 'Wrapper job ' + wrapper_job.name + ' is ' + str(Status.VALUE_TO_KEY[wrapper_job.new_status])) - - wrapper_job.check_status(wrapper_job.new_status) - if wrapper_job.status == Status.WAITING: # if job failed to be held, delete it from packages table + platform.check_job(wrapper_job) # This is where wrapper will be checked on the slurm platform, update takes place. + try: + if wrapper_job.status != wrapper_job.new_status: + Log.info( + 'Wrapper job ' + wrapper_job.name + ' changed from ' + str(Status.VALUE_TO_KEY[wrapper_job.status]) + ' to status ' + str(Status.VALUE_TO_KEY[wrapper_job.new_status]) ) + except: + Log.critical("Status Is UNKNOWN, (NONE) exiting autosubmit") + exit(1) + + wrapper_job.check_status(wrapper_job.new_status) # New status will be saved and inner_jobs will be checked. + # Erase from packages if the wrapper failed to be queued ( Hold Admin bug ) + if wrapper_job.status == Status.WAITING: job_list.job_package_map.pop(job_id, None) job_list.packages_dict.pop(job_id, None) save = True + + #Notifications e-mail if as_conf.get_notifications() == 'true': for inner_job in wrapper_job.job_list: if inner_job.prev_status != inner_job.status: @@ -1214,19 +1198,16 @@ class Autosubmit: Status.VALUE_TO_KEY[inner_job.prev_status], Status.VALUE_TO_KEY[inner_job.status], as_conf.get_mails_to()) - else: - Log.info("Waiting for wrapper check time: {0}\n", check_wrapper_jobs_sleeptime) - else: + else: # Prepare jobs, if slurm check all active jobs at once. job = job[0] prev_status = job.status if job.status == Status.FAILED: continue - - if platform.type == "slurm": + if platform.type == "slurm": # List for add all jobs that will be checked list_jobid += str(job_id) + ',' list_prevStatus.append(prev_status) completed_joblist.append(job) - else: + else: # If they're not from slurm platform check one-by-one platform.check_job(job) if prev_status != job.update_status(as_conf.get_copy_remote_logs() == 'true'): if as_conf.get_notifications() == 'true': @@ -1237,19 +1218,17 @@ class Autosubmit: as_conf.get_mails_to()) save = True - if platform.type == "slurm" and list_jobid!="": + if platform.type == "slurm" and list_jobid != "": slurm.append([platform,list_jobid,list_prevStatus,completed_joblist]) - #END LOOP - #CHECK ALL JOBS + #END Normal jobs + wrappers + #CHECK ALL JOBS at once if they're from slurm ( wrappers non contempled) for platform_jobs in slurm: platform = platform_jobs[0] jobs_to_check = platform_jobs[1] platform.check_Alljobs(platform_jobs[3],jobs_to_check,as_conf.get_copy_remote_logs()) - for j_Indx in xrange(0,len(platform_jobs[3])): prev_status = platform_jobs[2][j_Indx] job = platform_jobs[3][j_Indx] - if prev_status != job.update_status(as_conf.get_copy_remote_logs() == 'true'): if as_conf.get_notifications() == 'true': if Status.VALUE_TO_KEY[job.status] in job.notify_on: @@ -1258,20 +1237,21 @@ class Autosubmit: Status.VALUE_TO_KEY[job.status], as_conf.get_mails_to()) save = True - if job_list.update_list(as_conf) or save: + #End Check Current jobs + save2 = job_list.update_list(as_conf) + if save or save2: job_list.save() - - if Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence,hold=False): + Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence,hold=False) + if as_conf.get_remote_dependencies() and len(job_list.get_ready(hold=True)) > 0: + Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence,hold=True) + save = job_list.update_list(as_conf) + if save: job_list.save() - if as_conf.get_remote_dependencies(): - if Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence,hold=True): - job_list.save() - if Autosubmit.exit: + job_list.save() return 2 time.sleep(safetysleeptime) - Log.info("No more jobs to run.") if len(job_list.get_failed()) > 0: Log.info("Some jobs have failed and reached maximum retrials") @@ -1310,32 +1290,32 @@ class Autosubmit: save = False for platform in platforms_to_test: Log.debug("\nJobs ready for {1}: {0}", len(job_list.get_ready(platform, hold=hold)), platform.name) - packages_to_submit = JobPackager(as_conf, platform, job_list, hold=hold).build_packages(hold=hold) + packages_to_submit = JobPackager(as_conf, platform, job_list, hold=hold).build_packages() if not inspect: platform.open_submit_script() valid_packages_to_submit = [] for package in packages_to_submit: try: + # If called from inspect command or -cw + if only_wrappers or inspect: + for innerJob in package._jobs: + # Setting status to COMPLETED so it does not get stuck in the loop that calls this function + innerJob.status = Status.COMPLETED + # If called from RUN or inspect command if not only_wrappers: try: package.submit(as_conf, job_list.parameters, inspect, hold=hold) valid_packages_to_submit.append(package) except (IOError, OSError): - # write error file continue - if only_wrappers or inspect: - for innerJob in package._jobs: - # Setting status to COMPLETED so it does not get stuck in the loop that calls this function - innerJob.status = Status.COMPLETED - if hasattr(package, "name"): job_list.packages_dict[package.name] = package.jobs from job.job import WrapperJob wrapper_job = WrapperJob(package.name, package.jobs[0].id, Status.READY, 0, package.jobs, package._wallclock, package._num_processors, - package.platform, as_conf) + package.platform, as_conf, hold) job_list.job_package_map[package.jobs[0].id] = wrapper_job if isinstance(package, JobPackageThread): @@ -1361,6 +1341,7 @@ class Autosubmit: for job in package.jobs: job.id = str(jobs_id[i]) job.status = Status.SUBMITTED + job.hold = hold job.write_submit_time() if hasattr(package, "name"): job_list.packages_dict[package.name] = package.jobs @@ -1368,7 +1349,7 @@ class Autosubmit: wrapper_job = WrapperJob(package.name, package.jobs[0].id, Status.SUBMITTED, 0, package.jobs, package._wallclock, package._num_processors, - package.platform, as_conf) + package.platform, as_conf, hold) job_list.job_package_map[package.jobs[0].id] = wrapper_job if isinstance(package, JobPackageThread): # Saving only when it is a real multi job package diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 7f089c2ff9bfb86fed56b2c2e15bd20c97d25f39..1deb980f474586da95f87d8d63853acec338735b 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -1089,6 +1089,14 @@ class AutosubmitConfig(object): :rtype: string """ return self._conf_parser.get_option('wrapper', 'JOBS_IN_WRAPPER', 'None') + def get_wrapper_queue(self): + """ + Returns the wrapper queue if not defined, will be the one of the first job wrapped + + :return: expression (or none) + :rtype: string + """ + return self._conf_parser.get_option('wrapper', 'QUEUE', 'None') def get_min_wrapped_jobs(self): """ Returns the minim number of jobs that can be wrapped together as configured in autosubmit's config file diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 414e7daceb44269d88331b63cfda8f2706a73f19..38d9dfdda1b8d69057b591f17c630dad7f559b2f 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -25,6 +25,7 @@ import re import time import json import datetime +import textwrap from collections import OrderedDict from autosubmit.job.job_common import Status, Type @@ -32,7 +33,7 @@ from autosubmit.job.job_common import StatisticsSnippetBash, StatisticsSnippetPy from autosubmit.job.job_common import StatisticsSnippetR, StatisticsSnippetEmpty from autosubmit.config.basicConfig import BasicConfig from bscearth.utils.date import date2str, parse_date, previous_day, chunk_end_date, chunk_start_date, Log, subs_dates - +from time import sleep class Job(object): """ @@ -480,7 +481,7 @@ class Job(object): retrials_list.insert(0, retrial_dates) return retrials_list - def update_status(self, copy_remote_logs=False,iswrapper=False): + def update_status(self, copy_remote_logs=False): """ Updates job status, checking COMPLETED file if needed @@ -496,54 +497,33 @@ class Job(object): self.check_completion() else: self.status = new_status - if self.status == Status.QUEUING or self.status == Status.HELD: - if iswrapper: - reason = str() - if self.platform.type == 'slurm': - self.platform.send_command(self.platform.get_queue_status_cmd(self.id)) - reason = self.platform.parse_queue_reason(self.platform._ssh_output,self.id) - if self._queuing_reason_cancel(reason): - Log.error("Job {0} will be cancelled and set to FAILED as it was queuing due to {1}", self.name, reason) - self.platform.send_command(self.platform.cancel_cmd + " {0}".format(self.id)) - self.new_status =Status.FAILED - self.update_status(copy_remote_logs,True) - return - if reason == '(JobHeldUser)': - self.new_status = Status.HELD - self.hold = True - Log.info("Job {0} is HELD", self.name) - - elif reason == '(JobHeldAdmin)': - Log.info("Job {0} Failed to be HELD, canceling... ", self.name) - self.hold = True - self.new_status = Status.WAITING - self.platform.send_command(self.platform.cancel_cmd + " {0}".format(self.id)) - else: - self.hold = False - self.new_status = Status.QUEUING - Log.info("Job {0} is QUEUING {1}", self.name, reason) - elif self.status is Status.RUNNING: + + if self.status == Status.RUNNING: Log.info("Job {0} is RUNNING", self.name) - elif self.status is Status.COMPLETED: + elif self.status == Status.QUEUING: + Log.info("Job {0} is QUEUING", self.name) + elif self.status == Status.HELD: + Log.info("Job {0} is HELD", self.name) + elif self.status == Status.COMPLETED: Log.result("Job {0} is COMPLETED", self.name) - elif self.status is Status.FAILED: + elif self.status == Status.FAILED: Log.user_warning("Job {0} is FAILED. Checking completed files to confirm the failure...", self.name) self.platform.get_completed_files(self.name) self.check_completion() - if self.status is Status.COMPLETED: + if self.status == Status.COMPLETED: Log.warning('Job {0} seems to have failed but there is a COMPLETED file', self.name) Log.result("Job {0} is COMPLETED", self.name) else: self.update_children_status() - elif self.status is Status.UNKNOWN: + elif self.status == Status.UNKNOWN: Log.debug("Job {0} in UNKNOWN status. Checking completed files...", self.name) self.platform.get_completed_files(self.name) self.check_completion(Status.UNKNOWN) - if self.status is Status.UNKNOWN: + if self.status == Status.UNKNOWN: Log.warning('Job {0} in UNKNOWN status', self.name) - elif self.status is Status.COMPLETED: + elif self.status == Status.COMPLETED: Log.result("Job {0} is COMPLETED", self.name) - elif self.status is Status.SUBMITTED: + elif self.status == Status.SUBMITTED: # after checking the jobs , no job should have the status "submitted" Log.warning('Job {0} in SUBMITTED status after checking.', self.name) @@ -619,7 +599,7 @@ class Job(object): parameters['SPLIT'] = self.split parameters['DELAY'] = self.delay parameters['SYNCHRONIZE'] = self.synchronize - + parameters['PACKED'] = self.packed total_chunk = int(parameters['NUMCHUNKS']) chunk_length = int(parameters['CHUNKSIZE']) chunk_unit = parameters['CHUNKSIZEUNIT'].lower() @@ -728,11 +708,11 @@ class Job(object): template = template_file.read() else: if self.type == Type.BASH: - template = 'sleep 30' + template = 'sleep 60' elif self.type == Type.PYTHON: - template = 'time.sleep(30)' + template = 'time.sleep(60)' elif self.type == Type.R: - template = 'Sys.sleep(30)' + template = 'Sys.sleep(60)' else: template = '' @@ -889,7 +869,7 @@ class Job(object): :return: True if succesful, False otherwise :rtype: bool """ - if self.platform.get_stat_file(self.name, retries=0): + if self.platform.get_stat_file(self.name, retries=5): start_time = self.check_start_time() else: Log.warning('Could not get start time for {0}. Using current time as an approximation', self.name) @@ -1015,7 +995,7 @@ class WrapperJob(Job): :type as_config: AutosubmitConfig object \n """ - def __init__(self, name, job_id, status, priority, job_list, total_wallclock, num_processors, platform, as_config): + def __init__(self, name, job_id, status, priority, job_list, total_wallclock, num_processors, platform, as_config,hold): super(WrapperJob, self).__init__(name, job_id, status, priority) self.job_list = job_list # divide jobs in dictionary by state? @@ -1026,7 +1006,7 @@ class WrapperJob(Job): self.as_config = as_config # save start time, wallclock and processors?! self.checked_time = datetime.datetime.now() - self.hold = False + self.hold = hold def _queuing_reason_cancel(self, reason): try: if len(reason.split('(', 1)) > 1: @@ -1043,74 +1023,128 @@ class WrapperJob(Job): return False def check_status(self, status): + prev_status = self.status self.status = status - if self.status in [Status.FAILED, Status.UNKNOWN]: + Log.debug('Checking inner jobs status') + if self.status in [ Status.HELD, Status.QUEUING ]: # If WRAPPER is QUEUED OR HELD + self._check_inner_jobs_queue(prev_status) # This will update the inner jobs to QUEUE or HELD (normal behaviour) or WAITING ( if they fails to be held) + elif self.status == Status.RUNNING: # If wrapper is running + if prev_status in [ Status.SUBMITTED ]: # This will update the status from submitted or hold to running (if safety timer is high enough or queue is fast enough) + for job in self.job_list: + job.status = Status.QUEUING + self._check_running_jobs() #Check and update inner_jobs status that are elegible + + elif self.status == Status.COMPLETED: # Completed wrapper will always come from check function. + self.check_inner_jobs_completed(self.job_list) + + if self.status in [ Status.FAILED, Status.UNKNOWN ]: # Fail can come from check function or running/completed checkers. self.status = Status.FAILED self.cancel_failed_wrapper_job() self.update_failed_jobs() - elif self.status == Status.COMPLETED: - self.check_inner_jobs_completed(self.job_list) - elif self.status == Status.RUNNING: - time.sleep(3) - self.update_inner_jobs_queue() - Log.debug('Checking inner jobs status') - self.check_inner_job_status() - - - def check_inner_job_status(self): - self._check_running_jobs() - self.check_inner_jobs_completed(self.running_jobs_start.keys()) - self._check_wrapper_status() - + def check_inner_jobs_completed(self, jobs): not_completed_jobs = [job for job in jobs if job.status != Status.COMPLETED] not_completed_job_names = [job.name for job in not_completed_jobs] job_names = ' '.join(not_completed_job_names) - if job_names: completed_files = self.platform.check_completed_files(job_names) - completed_jobs = [] for job in not_completed_jobs: if completed_files and len(completed_files) > 0: if job.name in completed_files: completed_jobs.append(job) job.new_status=Status.COMPLETED - job.update_status(self.as_config.get_copy_remote_logs() == 'true',True) - if job.status != Status.COMPLETED and job in self.running_jobs_start: - self._check_inner_job_wallclock(job) + job.update_status(self.as_config.get_copy_remote_logs() == 'true') for job in completed_jobs: self.running_jobs_start.pop(job, None) + not_completed_jobs = list(set(not_completed_jobs) - set(completed_jobs)) - if self.status == Status.COMPLETED: - not_completed_jobs = list(set(not_completed_jobs) - set(completed_jobs)) - for job in not_completed_jobs: - self._check_finished_job(job) + for job in not_completed_jobs: + self._check_finished_job(job) + + def _check_inner_jobs_queue(self,prev_status): + reason = str() + if self.platform.type == 'slurm': + self.platform.send_command(self.platform.get_queue_status_cmd(self.id)) + reason = self.platform.parse_queue_reason(self.platform._ssh_output,self.id) + if self._queuing_reason_cancel(reason): + Log.error("Job {0} will be cancelled and set to FAILED as it was queuing due to {1}", self.name, + reason) + self.cancel_failed_wrapper_job() + self.update_failed_jobs() + return + if reason == '(JobHeldUser)': + if self.hold is False: + self.platform.send_command("scontrol release " + "{0}".format(self.id)) # SHOULD BE MORE CLASS (GET_scontrol realease but not sure if this can be implemented on others PLATFORMS + self.status = Status.QUEUING + for job in self.job_list: + job.hold = self.hold + job.status = self.status + Log.info("Job {0} is QUEUING {1}", self.name, reason) + else: + self.status = Status.HELD + Log.info("Job {0} is HELD", self.name) + elif reason == '(JobHeldAdmin)': + Log.debug("Job {0} Failed to be HELD, canceling... ", self.name) + self.platform.send_command(self.platform.cancel_cmd + " {0}".format(self.id)) + self.status = Status.WAITING + else: + Log.info("Job {0} is QUEUING {1}", self.name, reason) + if prev_status != self.status: + for job in self.job_list: + job.hold = self.hold + job.status = self.status + if self.status == Status.WAITING: + for job in self.job_list: + job.packed = False def _check_inner_job_wallclock(self, job): start_time = self.running_jobs_start[job] if self._is_over_wallclock(start_time, job.wallclock): - if self.as_config.get_wrapper_type() in ['vertical', 'horizontal']: - Log.error("Job {0} inside wrapper {1} is running for longer than its wallclock! Cancelling...".format(job.name, self.name)) - self.cancel_failed_wrapper_job() - else: - Log.error("Job {0} inside wrapper {1} is running for longer than its wallclock! Setting to FAILED...".format(job.name, self.name)) - self._check_finished_job(job) + #if self.as_config.get_wrapper_type() in ['vertical', 'horizontal']: + Log.error("Job {0} inside wrapper {1} is running for longer than it's wallclock! Cancelling...".format(job.name, self.name)) + job.new_status = Status.FAILED + job.update_status(self.as_config.get_copy_remote_logs() == 'true') + return True + return False def _check_running_jobs(self): - not_finished_jobs = [job for job in self.job_list if job.status not in [Status.COMPLETED, Status.FAILED]] - if not_finished_jobs: - not_finished_jobs_dict = OrderedDict() - for job in not_finished_jobs: + not_finished_jobs_dict = OrderedDict() + not_finished_jobs = [ job for job in self.job_list if job.status not in [ Status.COMPLETED, Status.FAILED ] ] + for job in not_finished_jobs: + tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED or self.status == Status.COMPLETED] + if job.parents is None or len(tmp) == len(job.parents): not_finished_jobs_dict[job.name] = job - + if len(not_finished_jobs_dict.keys()) > 0: # Only running jobs will enter there not_finished_jobs_names = ' '.join(not_finished_jobs_dict.keys()) - remote_log_dir = self.platform.get_remote_log_dir() - command = 'cd ' + remote_log_dir + '; for job in ' + not_finished_jobs_names + '; do echo ${job} $(head ${job}_STAT); done' - output = self.platform.send_command(command, ignore_log=True) - - if output: + # PREPARE SCRIPT TO SEND + command = textwrap.dedent(""" +cd {1} +for job in {0} +do + if [ -f "${{job}}_STAT" ] + then + echo ${{job}} $(head ${{job}}_STAT) + else + echo ${{job}} + fi +done +""").format(str(not_finished_jobs_names),str(remote_log_dir), '\n'.ljust(13)) + + log_dir = os.path.join(self._tmp_path, 'LOG_{0}'.format(self.expid)) + multiple_checker_inner_jobs = os.path.join(log_dir, "inner_jobs_checker.sh") + open(multiple_checker_inner_jobs, 'w+').write(command) + os.chmod(multiple_checker_inner_jobs, 0o770) + self._platform.send_file(multiple_checker_inner_jobs, False) + command = os.path.join(self._platform.get_files_path(), "inner_jobs_checker.sh") + # + wait = 2 + retries = 5 + over_wallclock = False + content = '' + while content == '' and retries > 0: + self._platform.send_command(command,False) content = self.platform._ssh_output for line in content.split('\n'): out = line.split() @@ -1123,74 +1157,47 @@ class WrapperJob(Job): Log.info("Job {0} started at {1}".format(jobname, str(parse_date(start_time)))) self.running_jobs_start[job] = start_time job.new_status=Status.RUNNING - job.update_status(self.as_config.get_copy_remote_logs() == 'true',True) - elif len(out) == 2: + job.update_status(self.as_config.get_copy_remote_logs() == 'true') + + if len(out) == 2: Log.info("Job {0} is RUNNING".format(jobname)) - else: + over_wallclock = self._check_inner_job_wallclock(job) + if over_wallclock: + Log.error("Job {0} is FAILED".format(jobname)) + elif len(out) == 3: end_time = self._check_time(out, 2) - Log.info("Job {0} finished at {1}".format(jobname, str(parse_date(end_time)))) self._check_finished_job(job) - else: - Log.debug("Job {0} is {1} and waiting for dependencies".format(jobname,Status.VALUE_TO_KEY[job.status])) + Log.info("Job {0} finished at {1}".format(jobname, str(parse_date(end_time)))) + sleep(wait) + retries= retries -1 + + if retries == 0 or over_wallclock: + self.status = Status.FAILED def _check_finished_job(self, job): - if self.platform.check_completed_files(job.name): + wait = 2 + retries = 5 + output = '' + while output == '' and retries > 0: + output = self.platform.check_completed_files(job.name) + sleep(wait) + retries=retries-1 + if output != '': job.new_status=Status.COMPLETED - job.update_status(self.as_config.get_copy_remote_logs() == 'true',True) + job.update_status(self.as_config.get_copy_remote_logs() == 'true') else: Log.info("No completed filed found, setting {0} to FAILED...".format(job.name)) job.new_status=Status.FAILED - job.update_status(self.as_config.get_copy_remote_logs() == 'true',True) + job.update_status(self.as_config.get_copy_remote_logs() == 'true') self.running_jobs_start.pop(job, None) def update_failed_jobs(self): not_finished_jobs = [job for job in self.job_list if job.status not in [Status.FAILED, Status.COMPLETED]] for job in not_finished_jobs: self._check_finished_job(job) - def update_inner_jobs_queue(self): - jobs= [filter_job for filter_job in self.job_list if filter_job.status in [Status.SUBMITTED,Status.HELD ,Status.QUEUING]] #UPDATE STATUS OF JOBS - if self.status == Status.QUEUING or self.status == Status.HELD: - reason = str() - if self.platform.type == 'slurm': - self.platform.send_command(self.platform.get_queue_status_cmd(self.id)) - reason = self.platform.parse_queue_reason(self.platform._ssh_output,self.id) - if self._queuing_reason_cancel(reason): - Log.error("Job {0} will be cancelled and set to FAILED as it was queuing due to {1}", self.name, - reason) - self.cancel_failed_wrapper_job() - return - if reason == '(JobHeldUser)': - if self.hold is False: - self.platform.send_command("scontrol release " + "{0}".format(self.id)) # SHOULD BE MORE CLASS (GET_scontrol realease but not sure if this can be implemented on others PLATFORMS - self.status = Status.QUEUING - Log.info("Job {0} is QUEUING {1}", self.name, reason) - else: - self.status = Status.HELD - Log.info("Job {0} is HELD", self.name) - elif reason == '(JobHeldAdmin)': - Log.debug("Job {0} Failed to be HELD, canceling... ", self.name) - self.hold = True - self.status = Status.WAITING - self.platform.send_command(self.platform.cancel_cmd + " {0}".format(self.id)) - else: - Log.info("Job {0} is QUEUING {1}", self.name, reason) - for job in jobs: - job.status = self.status - def _check_wrapper_status(self): - not_finished_jobs = [job for job in self.job_list if job.status not in [Status.FAILED, Status.COMPLETED]] - if not self.running_jobs_start and not_finished_jobs: - self.status = self.platform.check_job(self) - if self.status == Status.RUNNING: - self._check_running_jobs() - if not self.running_jobs_start: - Log.error("It seems there are no inner jobs running in the wrapper. Cancelling...") - self.cancel_failed_wrapper_job() - elif self.status == Status.COMPLETED: - Log.info("Wrapper job {0} COMPLETED. Setting all jobs to COMPLETED...".format(self.name)) - self._update_completed_jobs() def cancel_failed_wrapper_job(self): - Log.info("Cancelling job with id {0}".format(self.id)) + Log.error("Cancelling job with id {0}".format(self.id)) self.platform.send_command(self.platform.cancel_cmd + " " + str(self.id)) def _update_completed_jobs(self): diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index d02b023049538636deff4d7eb5e8c852a64ab09b..32869e53964d75b0890c2ef49b9dd94ebf7aa6df 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -606,7 +606,7 @@ class JobList: """ return self._ordered_jobs_by_date_member - def get_completed(self, platform=None): + def get_completed(self, platform=None,wrapper=False): """ Returns a list of completed jobs @@ -615,9 +615,16 @@ class JobList: :return: completed jobs :rtype: list """ - return [job for job in self._job_list if (platform is None or job.platform is platform) and + + completed_jobs = [job for job in self._job_list if (platform is None or job.platform == platform) and job.status == Status.COMPLETED] - def get_uncompleted(self, platform=None): + if wrapper: + return [job for job in completed_jobs if job.packed is False] + + else: + return completed_jobs + + def get_uncompleted(self, platform=None, wrapper=False): """ Returns a list of completed jobs @@ -626,9 +633,14 @@ class JobList: :return: completed jobs :rtype: list """ - return [job for job in self._job_list if (platform is None or job.platform is platform) and + uncompleted_jobs = [job for job in self._job_list if (platform is None or job.platform == platform) and job.status != Status.COMPLETED] - def get_submitted(self, platform=None, hold =False): + + if wrapper: + return [job for job in uncompleted_jobs if job.packed is False] + else: + return uncompleted_jobs + def get_submitted(self, platform=None, hold =False , wrapper=False): """ Returns a list of submitted jobs @@ -637,13 +649,19 @@ class JobList: :return: submitted jobs :rtype: list """ + submitted = list() if hold: - return [job for job in self._job_list if (platform is None or job.platform is platform) and + submitted= [job for job in self._job_list if (platform is None or job.platform == platform) and job.status == Status.SUBMITTED and job.hold == hold ] else: - return [job for job in self._job_list if (platform is None or job.platform is platform) and + submitted= [job for job in self._job_list if (platform is None or job.platform == platform) and job.status == Status.SUBMITTED ] - def get_running(self, platform=None): + if wrapper: + return [job for job in submitted if job.packed is False] + else: + return submitted + + def get_running(self, platform=None,wrapper=False): """ Returns a list of jobs running @@ -652,10 +670,13 @@ class JobList: :return: running jobs :rtype: list """ - return [job for job in self._job_list if (platform is None or job.platform is platform) and + running= [job for job in self._job_list if (platform is None or job.platform == platform) and job.status == Status.RUNNING] - - def get_queuing(self, platform=None): + if wrapper: + return [job for job in running if job.packed is False] + else: + return running + def get_queuing(self, platform=None,wrapper=False): """ Returns a list of jobs queuing @@ -664,10 +685,13 @@ class JobList: :return: queuedjobs :rtype: list """ - return [job for job in self._job_list if (platform is None or job.platform is platform) and + queuing= [job for job in self._job_list if (platform is None or job.platform == platform) and job.status == Status.QUEUING] - - def get_failed(self, platform=None): + if wrapper: + return [job for job in queuing if job.packed is False] + else: + return queuing + def get_failed(self, platform=None,wrapper=False): """ Returns a list of failed jobs @@ -676,10 +700,13 @@ class JobList: :return: failed jobs :rtype: list """ - return [job for job in self._job_list if (platform is None or job.platform is platform) and + failed= [job for job in self._job_list if (platform is None or job.platform == platform) and job.status == Status.FAILED] - - def get_unsubmitted(self, platform=None): + if wrapper: + return [job for job in failed if job.packed is False] + else: + return failed + def get_unsubmitted(self, platform=None,wrapper=False): """ Returns a list of unsummited jobs @@ -688,9 +715,15 @@ class JobList: :return: all jobs :rtype: list """ - return [job for job in self._job_list if (platform is None or job.platform is platform) and + unsubmitted= [job for job in self._job_list if (platform is None or job.platform == platform) and ( job.status != Status.SUBMITTED and job.status != Status.QUEUING and job.status == Status.RUNNING and job.status == Status.COMPLETED ) ] - def get_all(self, platform=None): + + if wrapper: + return [job for job in unsubmitted if job.packed is False] + else: + return unsubmitted + + def get_all(self, platform=None,wrapper=False): """ Returns a list of all jobs @@ -699,8 +732,14 @@ class JobList: :return: all jobs :rtype: list """ - return [job for job in self._job_list] - def get_ready(self, platform=None, hold=False): + all = [job for job in self._job_list] + + if wrapper: + return [job for job in all if job.packed is False] + else: + return all + + def get_ready(self, platform=None, hold=False , wrapper=False ): """ Returns a list of ready jobs @@ -709,10 +748,15 @@ class JobList: :return: ready jobs :rtype: list """ - return [job for job in self._job_list if (platform is None or job.platform is platform) and + ready = [job for job in self._job_list if (platform is None or job.platform == platform) and job.status == Status.READY and job.hold is hold] - def get_waiting(self, platform=None): + if wrapper: + return [job for job in ready if job.packed is False] + else: + return ready + + def get_waiting(self, platform=None,wrapper=False): """ Returns a list of jobs waiting @@ -721,9 +765,12 @@ class JobList: :return: waiting jobs :rtype: list """ - waiting_jobs= [job for job in self._job_list if (platform is None or job.platform is platform) and + waiting_jobs= [job for job in self._job_list if (platform is None or job.platform == platform) and job.status == Status.WAITING] - return waiting_jobs + if wrapper: + return [job for job in waiting_jobs if job.packed is False] + else: + return waiting_jobs def get_waiting_remote_dependencies(self, platform_type='slurm'.lower()): """ @@ -751,7 +798,7 @@ class JobList: job.status == Status.HELD] - def get_unknown(self, platform=None): + def get_unknown(self, platform=None,wrapper=False): """ Returns a list of jobs on unknown state @@ -760,10 +807,13 @@ class JobList: :return: unknown state jobs :rtype: list """ - return [job for job in self._job_list if (platform is None or job.platform is platform) and + submitted= [job for job in self._job_list if (platform is None or job.platform == platform) and job.status == Status.UNKNOWN] - - def get_suspended(self, platform=None): + if wrapper: + return [job for job in submitted if job.packed is False] + else: + return submitted + def get_suspended(self, platform=None,wrapper=False): """ Returns a list of jobs on unknown state @@ -772,10 +822,13 @@ class JobList: :return: unknown state jobs :rtype: list """ - return [job for job in self._job_list if (platform is None or job.platform is platform) and + suspended= [job for job in self._job_list if (platform is None or job.platform == platform) and job.status == Status.SUSPENDED] - - def get_in_queue(self, platform=None): + if wrapper: + return [job for job in suspended if job.packed is False] + else: + return suspended + def get_in_queue(self, platform=None, wrapper=False): """ Returns a list of jobs in the platforms (Submitted, Running, Queuing, Unknown,Held) @@ -784,10 +837,14 @@ class JobList: :return: jobs in platforms :rtype: list """ - return self.get_submitted(platform) + self.get_running(platform) + self.get_queuing( - platform) + self.get_unknown(platform) + self.get_held_jobs(platform) - def get_not_in_queue(self, platform=None): + in_queue = self.get_submitted(platform) + self.get_running(platform) + self.get_queuing( + platform) + self.get_unknown(platform) + self.get_held_jobs(platform) + if wrapper: + return [job for job in in_queue if job.packed is False] + else: + return in_queue + def get_not_in_queue(self, platform=None,wrapper=False): """ Returns a list of jobs NOT in the platforms (Ready, Waiting) @@ -796,9 +853,12 @@ class JobList: :return: jobs not in platforms :rtype: list """ - return self.get_ready(platform) + self.get_waiting(platform) - - def get_finished(self, platform=None): + not_queued= self.get_ready(platform) + self.get_waiting(platform) + if wrapper: + return [job for job in not_queued if job.packed is False] + else: + return not_queued + def get_finished(self, platform=None,wrapper=False): """ Returns a list of jobs finished (Completed, Failed) @@ -808,9 +868,12 @@ class JobList: :return: finished jobs :rtype: list """ - return self.get_completed(platform) + self.get_failed(platform) - - def get_active(self, platform=None): + finished= self.get_completed(platform) + self.get_failed(platform) + if wrapper: + return [job for job in finished if job.packed is False] + else: + return finished + def get_active(self, platform=None, wrapper=False): """ Returns a list of active jobs (In platforms queue + Ready) @@ -819,7 +882,14 @@ class JobList: :return: active jobs :rtype: list """ - return self.get_in_queue(platform) + self.get_ready(platform) + active = self.get_in_queue(platform) + self.get_ready(platform=platform,hold=True) + self.get_ready(platform=platform,hold=False) + tmp = [job for job in active if job.hold and not job.status == Status.SUBMITTED and not job.status == Status.READY] + if len(tmp) == len(active): # IF only held jobs left without dependencies satisfied + if len(tmp) != 0 and len(active) != 0: + Log.warning("Only Held Jobs active,Exiting Autosubmit (TIP: This can happen if suspended or/and Failed jobs are found on the workflow) ") + active = [] + return active + def get_job_by_name(self, name): """ @@ -844,6 +914,7 @@ class JobList: jobs_by_id[job.id].append(job) return jobs_by_id + def get_in_ready_grouped_id(self, platform): jobs=[] [jobs.append(job) for job in jobs if (platform is None or job._platform.name is platform.name)] @@ -1009,10 +1080,9 @@ class JobList: all_parents_completed = [] for job in self.get_waiting(): tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] - if len(tmp) == len(job.parents): + if job.parents is None or len(tmp) == len(job.parents): job.status = Status.READY job.hold = False - save = True Log.debug("Setting job: {0} status to: READY (all parents completed)...".format(job.name)) if as_conf.get_remote_dependencies(): all_parents_completed.append(job.name) @@ -1020,26 +1090,45 @@ class JobList: Log.debug('Updating WAITING jobs eligible for remote_dependencies') for job in self.get_waiting_remote_dependencies('slurm'.lower()): if job.name not in all_parents_completed: - tmp = [parent for parent in job.parents if (parent.status == Status.COMPLETED or (parent.status == Status.QUEUING and not parent.hold and parent.name.lower() not in "setup") or parent.status == Status.RUNNING)] + tmp = [parent for parent in job.parents if ( (parent.status == Status.COMPLETED or parent.status == Status.QUEUING or parent.status == Status.RUNNING) and "setup" not in parent.name.lower() )] if len(tmp) == len(job.parents): job.status = Status.READY job.hold = True - save = True Log.debug("Setting job: {0} status to: READY for be held (all parents queuing, running or completed)...".format(job.name)) + Log.debug('Updating Held jobs') - for job in self.get_held_jobs(): - tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] - if len(tmp) == len(job.parents): - job.hold = False - Log.debug( - "Setting job: {0} status to: Queuing (all parents completed)...".format( - job.name)) - else: - job.hold = True - save= True - if as_conf.get_wrapper_type() is not None: - for wrapper_id in self.job_package_map: - self.job_package_map[wrapper_id].update_inner_jobs_queue() + if self.job_package_map: + held_jobs = [job for job in self.get_held_jobs() if ( job.id not in self.job_package_map.keys() ) ] + held_jobs += [wrapper_job for wrapper_job in self.job_package_map.values() if wrapper_job.status == Status.HELD ] + else: + held_jobs = self.get_held_jobs() + + for job in held_jobs: + if self.job_package_map and job.id in self.job_package_map.keys(): # Wrappers and inner jobs + hold_wrapper = False + for inner_job in job.job_list: + valid_parents = [ parent for parent in inner_job.parents if parent not in job.job_list] + tmp = [parent for parent in valid_parents if parent.status == Status.COMPLETED ] + if len(tmp) < len(valid_parents): + hold_wrapper = True + job.hold = hold_wrapper + if not job.hold: + for inner_job in job.job_list: + inner_job.hold = False + Log.debug( + "Setting job: {0} status to: Queuing (all parents completed)...".format( + job.name)) + else: # Non-wrapped jobs + tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] + if len(tmp) == len(job.parents): + job.hold = False + Log.debug( + "Setting job: {0} status to: Queuing (all parents completed)...".format( + job.name)) + else: + job.hold = True + + save = True Log.debug('Update finished') return save diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 2c3c4c7209a0ba930f547abe84c343942b41cd45..ca828517d490bc71e0865b07f828038975e6ca65 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -42,13 +42,26 @@ class JobPackager(object): self._as_config = as_config self._platform = platform self._jobs_list = jobs_list + self.hold = hold # Submitted + Queuing Jobs for specific Platform - waiting_jobs = len(jobs_list.get_submitted(platform) + jobs_list.get_queuing(platform)) + queuing_jobs = jobs_list.get_queuing(platform) + queued_by_id = dict() + for queued_job in queuing_jobs: + queued_by_id[queued_job.id] = queued_job + queuing_jobs_len = len(queued_by_id.keys()) + + submitted_jobs = jobs_list.get_submitted(platform) + submitted_by_id = dict() + for submitted_job in submitted_jobs: + submitted_by_id[submitted_job.id] = submitted_job + submitted_jobs_len = len(submitted_by_id.keys()) + + waiting_jobs = submitted_jobs_len + queuing_jobs_len # Calculate available space in Platform Queue self._max_wait_jobs_to_submit = platform.max_waiting_jobs - waiting_jobs # .total_jobs is defined in each section of platforms_.conf, if not from there, it comes form autosubmit_.conf # .total_jobs Maximum number of jobs at the same time - self._max_jobs_to_submit = platform.total_jobs - len(jobs_list.get_in_queue(platform)) + self._max_jobs_to_submit = platform.total_jobs - queuing_jobs_len self.max_jobs = min(self._max_wait_jobs_to_submit, self._max_jobs_to_submit) # These are defined in the [wrapper] section of autosubmit_,conf self.wrapper_type = self._as_config.get_wrapper_type() @@ -56,13 +69,14 @@ class JobPackager(object): # True or False self.jobs_in_wrapper = self._as_config.get_wrapper_jobs() - Log.debug("Number of jobs ready: {0}", len(jobs_list.get_ready(platform,hold=hold))) + Log.debug("Number of jobs ready: {0}", len(jobs_list.get_ready(platform,hold=self.hold))) Log.debug("Number of jobs available: {0}", self._max_wait_jobs_to_submit) - if len(jobs_list.get_ready(platform,hold=hold)) > 0: - Log.info("Jobs ready for {0}: {1}", self._platform.name, len(jobs_list.get_ready(platform,hold=hold))) + if len(jobs_list.get_ready(platform,hold=self.hold)) > 0: + Log.info("Jobs ready for {0}: {1}", self._platform.name, len(jobs_list.get_ready(platform,hold=self.hold))) self._maxTotalProcessors = 0 - def build_packages(self,only_generate=False, jobs_filtered=[],hold=False): + + def build_packages(self,only_generate=False, jobs_filtered=[]): """ Returns the list of the built packages to be submitted @@ -74,22 +88,20 @@ class JobPackager(object): if only_generate: jobs_to_submit = jobs_filtered else: - jobs_ready = self._jobs_list.get_ready(self._platform,hold=hold) - if hold and len(jobs_ready) > 0 : - jobs_in_held_status = self._jobs_list.get_held_jobs() + self._jobs_list.get_submitted(hold) - current_held_jobs = 0 - wrapper_jobs_visited = [] + jobs_ready = self._jobs_list.get_ready(self._platform,self.hold) + if self.hold and len(jobs_ready) > 0 : + jobs_in_held_status = self._jobs_list.get_held_jobs() + self._jobs_list.get_submitted(self._platform,hold=self.hold) + held_by_id = dict() for held_job in jobs_in_held_status: - if self._jobs_list.job_package_map and held_job.id in self._jobs_list.job_package_map: - if held_job.id not in wrapper_jobs_visited: - current_held_jobs += 1 - wrapper_jobs_visited.append(held_job.id) - else: - current_held_jobs += 1 + held_by_id[held_job.id] = held_job + current_held_jobs = len(held_by_id.keys()) + remaining_held_slots = 10 - current_held_jobs try: while len(jobs_ready) > remaining_held_slots: + if jobs_ready[-1].packed: + jobs_ready[-1].packed = False del jobs_ready[-1] except IndexError: pass @@ -176,6 +188,7 @@ class JobPackager(object): built_packages=built_packages_tmp else: built_packages=built_packages_tmp + self.max_jobs = self.max_jobs -1 packages_to_submit += built_packages else: @@ -185,7 +198,10 @@ class JobPackager(object): package = JobPackageSimpleWrapped([job]) else: package = JobPackageSimple([job]) + self.max_jobs = self.max_jobs - 1 packages_to_submit.append(package) + for package in packages_to_submit: + package.hold = self.hold return packages_to_submit def _divide_list_by_section(self, jobs_list): @@ -225,7 +241,7 @@ class JobPackager(object): if machinefile_function == 'COMPONENTS': jobs_resources = horizontal_packager.components_dict jobs_resources['MACHINEFILES'] = machinefile_function - current_package = JobPackageHorizontal(package_jobs, jobs_resources=jobs_resources,method=self.wrapper_method) + current_package = JobPackageHorizontal(package_jobs, jobs_resources=jobs_resources,method=self.wrapper_method,configuration=self._as_config) packages.append(current_package) @@ -261,9 +277,9 @@ class JobPackager(object): jobs_list = job_vertical_packager.build_vertical_package(job) # update max_jobs, potential_dependency is None - self.max_jobs -= len(jobs_list) + #self.max_jobs -= len(jobs_list) if job.status is Status.READY: - packages.append(JobPackageVertical(jobs_list)) + packages.append(JobPackageVertical(jobs_list,configuration=self._as_config)) else: package = JobPackageVertical(jobs_list, None) packages.append(package) @@ -305,7 +321,7 @@ class JobPackager(object): total_wallclock = sum_str_hours(total_wallclock, wallclock) return JobPackageHorizontalVertical(current_package, max_procs, total_wallclock, - jobs_resources=jobs_resources) + jobs_resources=jobs_resources,configuration=self._as_config) def _build_vertical_horizontal_package(self, horizontal_packager, max_wrapped_jobs, jobs_resources): total_wallclock = '00:00' @@ -325,7 +341,7 @@ class JobPackager(object): total_wallclock = sum_str_hours(total_wallclock, job.wallclock) return JobPackageVerticalHorizontal(current_package, total_processors, total_wallclock, - jobs_resources=jobs_resources,method=self.wrapper_method) + jobs_resources=jobs_resources,method=self.wrapper_method,configuration=self._as_config) class JobPackagerVertical(object): @@ -542,7 +558,7 @@ class JobPackagerHorizontal(object): self._current_processors = 0 for job in self.job_list: if self.max_jobs > 0 and len(current_package) < self.max_wrapped_jobs: - self.max_jobs -= 1 + #self.max_jobs -= 1 if int(job.tasks) != 0 and int(job.tasks) != int(self.processors_node) and \ int(job.tasks) < job.total_processors: nodes = int(ceil(job.total_processors / float(job.tasks))) diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 916c43f302e82b3ae6d30811c4e8aa6bfd590945..a489d86304e15b16e0bbc65ced9a1fee6c8cb5f9 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -42,6 +42,7 @@ class JobPackageBase(object): def __init__(self, jobs): self._jobs = jobs self._expid = jobs[0].expid + self.hold = False try: self._tmp_path = jobs[0]._tmp_path self._platform = jobs[0].platform @@ -129,7 +130,6 @@ class JobPackageSimple(JobPackageBase): def __init__(self, jobs): super(JobPackageSimple, self).__init__(jobs) self._job_scripts = {} - def _create_scripts(self, configuration): for job in self.jobs: self._job_scripts[job.name] = job.create_script(configuration) @@ -252,7 +252,7 @@ class JobPackageThread(JobPackageBase): """ FILE_PREFIX = 'ASThread' - def __init__(self, jobs, dependency=None, jobs_resources=dict(),method='ASThread'): + def __init__(self, jobs, dependency=None, jobs_resources=dict(),method='ASThread',configuration=None): super(JobPackageThread, self).__init__(jobs) self._job_scripts = {} # Seems like this one is not used at all in the class @@ -262,7 +262,13 @@ class JobPackageThread(JobPackageBase): self._num_processors = '0' self._jobs_resources = jobs_resources self._wrapper_factory = self.platform.wrapper - self.queue = jobs[0]._queue + if configuration is not None: + if configuration.get_wrapper_queue() != 'None': + self.queue = configuration.get_wrapper_queue() + else: + self.queue = jobs[0]._queue + else: + self.queue = jobs[0]._queue self.method = method #pipeline @property @@ -364,8 +370,8 @@ class JobPackageThreadWrapped(JobPackageThread): """ FILE_PREFIX = 'ASThread' - def __init__(self, jobs, dependency=None): - super(JobPackageThreadWrapped, self).__init__(jobs) + def __init__(self, jobs, dependency=None,configuration=None): + super(JobPackageThreadWrapped, self).__init__(jobs,configuration) self._job_scripts = {} self._job_dependency = dependency self._common_script = None @@ -447,8 +453,8 @@ class JobPackageVertical(JobPackageThread): :param: dependency: """ - def __init__(self, jobs, dependency=None): - super(JobPackageVertical, self).__init__(jobs, dependency) + def __init__(self, jobs, dependency=None,configuration=None): + super(JobPackageVertical, self).__init__(jobs, dependency,configuration=configuration) #TODO unit or regression test of the wrappers, it will fail as in issue 280 for job in jobs: @@ -476,9 +482,10 @@ class JobPackageHorizontal(JobPackageThread): Class to manage a horizontal thread-based package of jobs to be submitted by autosubmit """ - def __init__(self, jobs, dependency=None, jobs_resources=dict(),method='ASThread'): - super(JobPackageHorizontal, self).__init__(jobs, dependency, jobs_resources) + def __init__(self, jobs, dependency=None, jobs_resources=dict(),method='ASThread',configuration=None): + super(JobPackageHorizontal, self).__init__(jobs, dependency, jobs_resources,configuration=configuration) self.method = method + self._queue = self.queue for job in jobs: if job.wallclock > self._wallclock: @@ -504,9 +511,9 @@ class JobPackageHybrid(JobPackageThread): Class to manage a hybrid (horizontal and vertical) thread-based package of jobs to be submitted by autosubmit """ - def __init__(self, jobs, num_processors, total_wallclock, dependency=None, jobs_resources=dict(),method="ASThread"): + def __init__(self, jobs, num_processors, total_wallclock, dependency=None, jobs_resources=dict(),method="ASThread",configuration=None): all_jobs = [item for sublist in jobs for item in sublist] #flatten list - super(JobPackageHybrid, self).__init__(all_jobs, dependency, jobs_resources,method) + super(JobPackageHybrid, self).__init__(all_jobs, dependency, jobs_resources,method,configuration=configuration) self.jobs_lists = jobs self.method=method self._num_processors = int(num_processors) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index ee8237174887946084714ee0cb777b95380b82fb..d2e78c68893a4fe274a2a7162e7a13400c72e4f2 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -121,32 +121,31 @@ class ParamikoPlatform(Platform): command = "find %s " % self.remote_log_dir if sections: for i, section in enumerate(sections.split()): - command += " -name \*%s_COMPLETED" % section + command += " -name *%s_COMPLETED" % section if i < len(sections.split())-1: command += " -o " else: - command += " -name \*_COMPLETED" + command += " -name *_COMPLETED" - if self.send_command(command): + if self.send_command(command,True): return self._ssh_output else: return None def remove_multiple_files(self, filenames): - #command = "rm " + filenames - command = "rm" - - if "cmd" in filenames: - command += " "+self.remote_log_dir+"/"+"*.cmd" - if "COMPLETED" in filenames or "STAT" in filenames: - #command += " "+self.remote_log_dir+"/"+"*COMPLETED" - command+= " "+filenames - #if "STAT" in filenames: - # command += " "+self.remote_log_dir+"/"+"*STAT" + #command = "rm" + + log_dir = os.path.join(self.tmp_path, 'LOG_{0}'.format(self.expid)) + multiple_delete_previous_run = os.path.join(log_dir,"multiple_delete_previous_run.sh") + open(multiple_delete_previous_run, 'w+').write("rm -f "+filenames) + os.chmod(multiple_delete_previous_run, 0o770) + self.send_file(multiple_delete_previous_run, False) + command = os.path.join(self.get_files_path(),"multiple_delete_previous_run.sh") + if self.send_command(command, ignore_log=True): return self._ssh_output else: - return None + return "" def send_file(self, filename, check=True): """ @@ -262,8 +261,9 @@ class ParamikoPlatform(Platform): try: #self._ftpChannel.chdir((os.path.join(self.get_files_path(), src))) self._ftpChannel.rename(os.path.join(self.get_files_path(), src), os.path.join(self.get_files_path(),dest)) + return True except (IOError): - pass + return False #ftp.close() return True @@ -320,18 +320,17 @@ class ParamikoPlatform(Platform): """ job_id = job.id job_status = Status.UNKNOWN - if type(job_id) is not int and type(job_id) is not str: - # URi: logger Log.error('check_job() The job id ({0}) is not an integer neither a string.', job_id) - # URi: value ? - job.new_status= job_status - - while not ( self.send_command(self.get_checkjob_cmd(job_id)) and retries >= 0 ) or (self.get_ssh_output() == "" and retries >= 0): + job.new_status = job_status + sleep_time=5 + while not (self.send_command(self.get_checkjob_cmd(job_id)) and retries >= 0) or (self.get_ssh_output() == "" and retries >= 0): retries -= 1 - Log.warning('Retrying check job command: {0}', self.get_checkjob_cmd(job_id)) - Log.error('Can not get job status for job id ({0}), retrying in 10 sec', job_id) - sleep(10) + Log.debug('Retrying check job command: {0}', self.get_checkjob_cmd(job_id)) + Log.debug('retries left {0}', retries) + Log.debug('Will be retrying in {0} seconds', sleep_time) + sleep(sleep_time) + sleep_time = sleep_time+5 if retries >= 0: Log.debug('Successful check job command: {0}', self.get_checkjob_cmd(job_id)) job_status = self.parse_job_output(self.get_ssh_output()).strip("\n") @@ -349,10 +348,15 @@ class ParamikoPlatform(Platform): else: job_status = Status.UNKNOWN else: + Log.error(" check_job(), job is not on the queue system. Output was: {0}", self.get_checkjob_cmd(job_id)) job_status = Status.UNKNOWN Log.error('check_job() The job id ({0}) status is {1}.', job_id, job_status) job.new_status = job_status - + def _check_jobid_in_queue(self,ssh_output,job_list_cmd): + for job in job_list_cmd[:-1].split(','): + if job not in ssh_output: + return False + return True def check_Alljobs(self, job_list,job_list_cmd,remote_logs, retries=5): """ Checks jobs running status @@ -365,19 +369,24 @@ class ParamikoPlatform(Platform): :return: current job status :rtype: autosubmit.job.job_common.Status """ + cmd = self.get_checkAlljobs_cmd(job_list_cmd) - while not self.send_command(cmd) and retries >= 0: + sleep_time=5 + while not (self.send_command(cmd) and retries >= 0) or ( not self._check_jobid_in_queue(self.get_ssh_output(),job_list_cmd) and retries >= 0): retries -= 1 - Log.warning('Retrying check job command: {0}', cmd) - Log.warning('Can not get job status for all jobs, retrying in 3 sec') - sleep(3) + Log.debug('Retrying check job command: {0}', cmd) + Log.debug('retries left {0}', retries) + Log.debug('Will be retrying in {0} seconds', sleep_time) + + sleep(sleep_time) + sleep_time=sleep_time+5 job_list_status = self.get_ssh_output() Log.debug('Successful check job command: {0}, \n output: {1}', cmd, self._ssh_output) if retries >= 0: - in_queue_jobs=[] - list_queue_jobid="" + in_queue_jobs = [] + list_queue_jobid = "" for job in job_list: - job_id=job.id + job_id = job.id job_status = self.parse_Alljobs_output(job_list_status,job_id) # URi: define status list in HPC Queue Class if job_status in self.job_status['COMPLETED']: @@ -385,14 +394,8 @@ class ParamikoPlatform(Platform): elif job_status in self.job_status['RUNNING']: job_status = Status.RUNNING elif job_status in self.job_status['QUEUING']: - if job.status == Status.QUEUING: - job_status = Status.QUEUING - elif job.status == Status.HELD: - if not job.hold: - self.send_command("scontrol release "+"{0}".format(job_id)) # SHOULD BE MORE CLASS (GET_scontrol realease but not sure if this can be implemented on others PLATFORMS - job_status = Status.QUEUING - else: - job_status = Status.HELD + if job.hold: + job_status = Status.HELD # release? else: job_status = Status.QUEUING list_queue_jobid += str(job.id) + ',' @@ -401,6 +404,8 @@ class ParamikoPlatform(Platform): job_status = Status.FAILED elif retries == 0: job_status = Status.COMPLETED + job.update_status(remote_logs) + else: job_status = Status.UNKNOWN Log.error('check_job() The job id ({0}) status is {1}.', job_id, job_status) @@ -420,19 +425,23 @@ class ParamikoPlatform(Platform): return elif reason == '(JobHeldUser)': job.new_status=Status.HELD - Log.info("Job {0} is HELD", job.name) + if not job.hold: + self.send_command("scontrol release "+"{0}".format(job.id)) # SHOULD BE MORE CLASS (GET_scontrol realease but not sure if this can be implemented on others PLATFORMS + Log.info("Job {0} is being released (id:{1}) ", job.name,job.id) + else: + Log.info("Job {0} is HELD", job.name) elif reason == '(JobHeldAdmin)': Log.info("Job {0} Failed to be HELD, canceling... ", job.name) job.new_status = Status.WAITING job.platform.send_command(job.platform.cancel_cmd + " {0}".format(job.id)) - else: - Log.info("Job {0} is QUEUING {1}", job.name, reason) + else: for job in job_list: job_status = Status.UNKNOWN Log.warning('check_job() The job id ({0}) from platform {1} has an status of {2}.', job.id, self.name, job_status) job.new_status=job_status + def get_checkjob_cmd(self, job_id): """ Returns command to check job status on remote platforms @@ -466,12 +475,12 @@ class ParamikoPlatform(Platform): if not self.restore_connection(): return False - if "-rP" or "mv" or "find" or "convertLink" in command: - timeout = 3600.0 # Max Wait 1hour if the command is a copy or simbolic links ( migrate can trigger long times) + if "-rP" in command or "find" in command or "convertLink" in command: + timeout = 60*60 # Max Wait 1hour if the command is a copy or simbolic links ( migrate can trigger long times) elif "rm" in command: - timeout = 20.0 + timeout = 60/2 else: - timeout = 150.0 + timeout = 60*2 try: stdin, stdout, stderr = self._ssh.exec_command(command) channel = stdout.channel @@ -621,7 +630,7 @@ class ParamikoPlatform(Platform): :return: command to check job status script :rtype: str """ - return 'nohup kill -0 {0} >& /dev/null; echo $?'.format(job_id) + return 'nohup kill -0 {0} > /dev/null 2>&1; echo $?'.format(job_id) def get_submitted_job_id(self, output): """ diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index aa97d3bd556610910ece82bea36b22e25925e11a..ade701c6069fb8697cba9141b2851753b855568c 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -42,7 +42,7 @@ class SlurmPlatform(ParamikoPlatform): self.job_status['COMPLETED'] = ['COMPLETED'] self.job_status['RUNNING'] = ['RUNNING'] self.job_status['QUEUING'] = ['PENDING', 'CONFIGURING', 'RESIZING'] - self.job_status['FAILED'] = ['FAILED', 'CANCELLED', 'NODE_FAIL', 'PREEMPTED', 'SUSPENDED', 'TIMEOUT','OUT_OF_MEMORY','OUT_OF_ME+','OUT_OF_ME'] + self.job_status['FAILED'] = ['FAILED', 'CANCELLED','CANCELLED+', 'NODE_FAIL', 'PREEMPTED', 'SUSPENDED', 'TIMEOUT','OUT_OF_MEMORY','OUT_OF_ME+','OUT_OF_ME'] self._pathdir = "\$HOME/LOG_" + self.expid self._allow_arrays = False self._allow_wrappers = True @@ -70,7 +70,7 @@ class SlurmPlatform(ParamikoPlatform): :param job: job object :type job: autosubmit.job.job.Job :return: job id for submitted jobs - :rtype: list(int) + :rtype: list(str) """ self.send_file(self.get_submit_script(),False) cmd = os.path.join(self.get_files_path(),os.path.basename(self._submit_script_path)) @@ -137,7 +137,7 @@ class SlurmPlatform(ParamikoPlatform): def get_checkjob_cmd(self, job_id): - return 'sacct -n -j {1} -o "State"'.format(self.host, job_id) + return 'sacct -n -X -j {1} -o "State"'.format(self.host, job_id) def get_checkAlljobs_cmd(self, jobs_id): return "sacct -n -X -j {1} -o jobid,State".format(self.host, jobs_id) @@ -150,11 +150,6 @@ class SlurmPlatform(ParamikoPlatform): if len(reason) > 0: return reason[0] return reason - # output = output.split('\n') - # if len(output) > 1: - # return output[1] - # else: - # return output @staticmethod diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index 5b49013b631774d398f9c067ce3478d8427b74a6..05816f45c51196085982aa3adf1d6f1094c4129f 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -688,44 +688,41 @@ class SrunVerticalHorizontalWrapperBuilder(SrunWrapperBuilder): aux_scripts=("${{{0}[@]}}") while [ "${{#aux_scripts[@]}}" -gt 0 ]; do i_list=0 - prev_completed_path="" for script_list in "${{{0}[@]}}"; do - declare -i i=${{scripts_index[$i_list]}} - if [ $i -ne -1 ]; then - declare -n scripts=$script_list - template=${{scripts[$i]}} - jobname=${{template%"$suffix"}} - out="${{template}}.${{i}}.out" - err="${{template}}.${{i}}.err" - if [ $i -eq 0 ]; then - completed_filename=${{template%"$suffix"}} - prev_template=$template - else - prev_template_index=$((i-1)) - prev_template=${{scripts[$prev_template_index]}} - completed_filename=${{prev_template%"$suffix"}} - fi - completed_filename="$completed_filename"_COMPLETED - completed_path=${{PWD}}/$completed_filename - - if [ -f "$completed_path" ]; - then - echo "`date '+%d/%m/%Y_%H:%M:%S'` $prev_template has been COMPLETED" - if [ $i -ge "${{#scripts[@]}}" ]; then - unset aux_scripts[$i_list] - i="-1" + declare -i job_index=${{scripts_index[$i_list]}} + declare -n scripts=$script_list + if [ $job_index -ne -1 ]; then + if [ $job_index -lt "${{#scripts[@]}}" ]; then + template=${{scripts[$job_index]}} + jobname=${{template%"$suffix"}} + out="${{template}}.${{job_index}}.out" + err="${{template}}.${{job_index}}.err" + if [ $i_list -eq 0 ]; then + prev_template=$template + else + prev_template=${{prev_horizontal_scripts[$job_index]}} fi - fi - if [ $i -lt "${{#scripts[@]}}" ]; then - if [ $i -eq 0 ] || [ -f "$completed_path" ] ; then + completed_filename=${{prev_template%"$suffix"}} + completed_filename="$completed_filename"_COMPLETED + completed_path=${{PWD}}/$completed_filename + #if [ -f "$completed_path" ]; + #then + # echo "`date '+%d/%m/%Y_%H:%M:%S'` $prev_template has been COMPLETED" + #fi + if [ $i_list -eq 0 ] || [ -f "$completed_path" ]; then #If first horizontal wrapper or last wrapper is completed srun --ntasks=1 --cpus-per-task={1} $template > $out 2> $err & - ((i=i+1)) + ((job_index=job_index+1)) + if [ $job_index -ge "${{#scripts[@]}}" ]; then + unset aux_scripts[$i_list] + job_index=-1 + fi fi + sleep "0.2" fi - sleep "0.2" - scripts_index[$i_list]=$i - ((i_list=i_list+1)) - fi + fi + declare -n prev_horizontal_scripts=$script_list + scripts_index[$i_list]=$job_index + ((i_list=i_list+1)) # check next list ( needed for save list index ) done done wait