diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 3acd40d47651213831ea223a42e15f364608b252..94b0da49130c27168b95ec5d6c35c337d0a3e52b 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -625,7 +625,7 @@ class Autosubmit: def _delete_expid(expid_delete, force): """ Removes an experiment from path and database - If current user is eadmin and -f has been sent, it deletes regardless + If current user is eadmin and -f has been sent, it deletes regardless of experiment owner :type expid_delete: str @@ -1120,7 +1120,7 @@ class Autosubmit: :type as_conf: AutosubmitConfig() Object \n :param job_list: Representation of the jobs of the experiment, keeps the list of jobs inside. \n :type job_list: JobList() Object \n - :param jobs_filtered: list of jobs that are relevant to the process. \n + :param jobs_filtered: list of jobs that are relevant to the process. \n :type jobs_filtered: List() of Job Objects \n :param packages_persistence: Object that handles local db persistence. \n :type packages_persistence: JobPackagePersistence() Object \n @@ -1130,8 +1130,6 @@ class Autosubmit: :rtype: \n """ job_list._job_list = jobs_filtered - job_list.update_list(as_conf, False) - # Current choice is Paramiko Submitter submitter = Autosubmit._get_submitter(as_conf) # Load platforms saves a dictionary Key: Platform Name, Value: Corresponding Platform Object @@ -1503,9 +1501,15 @@ class Autosubmit: # Save job_list if not is a failed submitted job recovery = True try: - job_list = Autosubmit.load_job_list( expid, as_conf, notransitive=notransitive) + Autosubmit._load_parameters( + as_conf, job_list, submitter.platforms) + for job in job_list.get_job_list(): + if job.platform_name is None: + job.platform_name = hpcarch + job.platform = submitter.platforms[job.platform_name.lower()] + packages_persistence = JobPackagePersistence(os.path.join( BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) packages = packages_persistence.load() @@ -1525,7 +1529,7 @@ class Autosubmit: save = job_list.update_list(as_conf) job_list.save() except BaseException as e: - raise AutosubmitCritical("Corrupted job_list, backup couldn't be restored", 7040, + raise AutosubmitCritical("Job_list couldn't be restored", 7040, e.message) # Restore platforms and try again, to avoid endless loop with failed configuration, a hard limit is set. if main_loop_retrials > 0: @@ -1683,22 +1687,52 @@ class Autosubmit: raise except Exception as e: raise - if platform.type == "slurm" and not inspect and not only_wrappers: + failed_packages = list() try: save = True if len(valid_packages_to_submit) > 0: jobs_id = platform.submit_Script(hold=hold) + if jobs_id is None: raise BaseException( "Exiting AS, AS is unable to get jobID this can be due a failure on the platform or a bad parameter on job.conf(check that queue parameter is valid for your current platform(CNS,BSC32,PRACE...)") i = 0 + if hold: + sleep(10) for package in valid_packages_to_submit: + if hold: + retries = 5 + package.jobs[0].id = str(jobs_id[i]) + try: + can_continue = True + while can_continue and retries > 0: + cmd = package.jobs[0].platform.get_queue_status_cmd(jobs_id[i]) + package.jobs[0].platform.send_command(cmd) + queue_status = package.jobs[0].platform._ssh_output + reason = package.jobs[0].platform.parse_queue_reason(queue_status, jobs_id[i]) + if reason == '(JobHeldAdmin)': + can_continue = False + elif reason == '(JobHeldUser)': + can_continue = True + else: + can_continue = False + sleep(5) + retries = retries - 1 + if not can_continue: + package.jobs[0].platform.send_command(package.jobs[0].platform.cancel_cmd + " {0}".format(jobs_id[i])) + i = i + 1 + continue # skip job if is bug by the admin bug. + if not platform.hold_job(package.jobs[0]): + i = i + 1 + continue + except Exception as e: + failed_packages.append(jobs_id) + continue for job in package.jobs: + job.hold = hold job.id = str(jobs_id[i]) job.status = Status.SUBMITTED - job.hold = hold - job.write_submit_time() if hasattr(package, "name"): job_list.packages_dict[package.name] = package.jobs from job.job import WrapperJob @@ -1709,10 +1743,13 @@ class Autosubmit: job_list.job_package_map[package.jobs[0].id] = wrapper_job if isinstance(package, JobPackageThread): # Saving only when it is a real multi job package - packages_persistence.save( - package.name, package.jobs, package._expid, inspect) + packages_persistence.save(package.name, package.jobs, package._expid, inspect) i += 1 save = True + if len(failed_packages) > 0: + for job_id in failed_packages: + package.jobs[0].platform.send_command(package.jobs[0].platform.cancel_cmd + " {0}".format(job_id)) + raise AutosubmitError("{0} submission failed, some hold jobs failed to be held".format(platform.name), 6015) except WrongTemplateException as e: raise AutosubmitCritical("Invalid parameter substitution in {0} template".format( e.job_name), 7014, e.message) @@ -1721,8 +1758,7 @@ class Autosubmit: except AutosubmitCritical as e: raise except Exception as e: - raise AutosubmitError("{0} submission failed".format( - platform.name), 6015, e.message) + raise AutosubmitError("{0} submission failed".format(platform.name), 6015, e.message) return save @@ -3030,6 +3066,7 @@ class Autosubmit: exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid) tmp_path = os.path.join(exp_path, BasicConfig.LOCAL_TMP_DIR) + # checking if there is a lock file to avoid multiple running on the same expid try: # Encapsulating the lock @@ -3049,8 +3086,7 @@ class Autosubmit: return False update_job = not os.path.exists(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl", "job_list_" + expid + ".pkl")) - Autosubmit._create_project_associated_conf( - as_conf, False, update_job) + Autosubmit._create_project_associated_conf(as_conf, False, update_job) # Load parameters Log.info("Loading parameters...") @@ -3131,8 +3167,7 @@ class Autosubmit: for job in jobs_wr: job.children = job.children - referenced_jobs_to_remove job.parents = job.parents - referenced_jobs_to_remove - Autosubmit.generate_scripts_andor_wrappers( - as_conf, job_list_wrappers, jobs_wr, packages_persistence, True) + Autosubmit.generate_scripts_andor_wrappers(as_conf, job_list_wrappers, jobs_wr, packages_persistence, True) packages = packages_persistence.load(True) else: diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index fbde35011063f1122d4068dc672880b4809a83fc..b768766a4289ac51caa8cbea52bb3781ef763f12 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -511,14 +511,18 @@ class Job(object): def retrieve_logfiles(self, copy_remote_logs, local_logs, remote_logs, expid, platform_name): as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory()) as_conf.reload() + submitter = self._get_submitter(as_conf) submitter.load_platforms(as_conf) - platform = submitter.platforms[platform_name] + hpcarch = as_conf.get_platform() + platforms_to_test = set() + if self.platform_name is None: + self.platform_name = hpcarch + self.platform = submitter.platforms[self.platform_name.lower()] try: - platform.restore_connection() + self.platform.restore_connection() except Exception as e: - Log.printlog("{0} \n Failed to connect to the platform, can't recover the logs ".format(e.message), 6001) - + Log.printlog("{0} \n Couldn't connect to the remote platform for this {1} job err/out files. ".format(e.message,self.name), 6001) out_exist = False err_exist = False retries = 3 @@ -529,11 +533,11 @@ class Job(object): while (not out_exist and not err_exist) and i < retries: try: try: - out_exist = platform.check_file_exists(remote_logs[0]) # will do 5 retries + out_exist = self.platform.check_file_exists(remote_logs[0]) # will do 5 retries except IOError as e: out_exist = False try: - err_exist = platform.check_file_exists(remote_logs[1]) # will do 5 retries + err_exist = self.platform.check_file_exists(remote_logs[1]) # will do 5 retries except IOError as e: err_exists = False except Exception as e: @@ -546,27 +550,28 @@ class Job(object): sleep(sleeptime) if i >= retries: if not out_exist or not err_exist: - raise AutosubmitError("Retries = {0}, Failed to retrieve log files {1} and {2}".format(retries,remote_logs[0],remote_logs[1]),6001) + Log.printlog("Retries = {0}, Failed to retrieve log files {1} and {2}".format(retries,remote_logs[0],remote_logs[1]), 6001) + if copy_remote_logs: if local_logs != remote_logs: # unifying names for log files - self.synchronize_logs(platform, remote_logs, local_logs) + self.synchronize_logs(self.platform, remote_logs, local_logs) remote_logs = local_logs - platform.get_logs_files(self.expid, remote_logs) + self.platform.get_logs_files(self.expid, remote_logs) # Update the logs with Autosubmit Job Id Brand try: for local_log in local_logs: - platform.write_jobid(self.id, os.path.join(self._tmp_path, 'LOG_' + str(self.expid), local_log)) - except Exception as e: - Log.printlog("Failed to write the jobid".format(self.name), 6001) + self.platform.write_jobid(self.id, os.path.join(self._tmp_path, 'LOG_' + str(self.expid), local_log)) + except BaseException as e: + Log.printlog("Trace {0} \n Failed to write the {1}".format(e.message,self.name), 6001) except AutosubmitError as e: - Log.printlog("Failed to retrieve log file for job {0}".format(self.name), 6001) + Log.printlog("Trace {0} \nFailed to retrieve log file for job {0}".format(e.message,self.name), 6001) except AutosubmitCritical as e: # Critical errors can't be recovered. Failed configuration or autosubmit error - Log.printlog("Failed to retrieve log file for job {0}".format(self.name), 6001) + Log.printlog("Trace {0} \nFailed to retrieve log file for job {0}".format(e.message,self.name), 6001) try: - platform.closeConnection() + self.platform.closeConnection() except: pass sleep(5) # safe wait before end a thread @@ -831,7 +836,7 @@ class Job(object): template = template_file.read() else: if self.type == Type.BASH: - template = 'sleep 5' + template = 'sleep 60' elif self.type == Type.PYTHON: template = 'time.sleep(5)' elif self.type == Type.R: diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 16be7477e96f592b6e55bd53441b7f60fdab5362..55937055822346ef21821792cce8d4a8c955346b 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -829,8 +829,7 @@ class JobList: :return: waiting jobs :rtype: list """ - waiting_jobs = [job for job in self._job_list if (job.platform.type == platform_type and - job.status == Status.WAITING)] + waiting_jobs = [job for job in self._job_list if (job.platform.type == platform_type and job.status == Status.WAITING)] return waiting_jobs def get_held_jobs(self, platform=None): @@ -1168,8 +1167,7 @@ class JobList: if not fromSetStatus: all_parents_completed = [] for job in self.get_waiting(): - tmp = [ - parent for parent in job.parents if parent.status == Status.COMPLETED] + tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] if job.parents is None or len(tmp) == len(job.parents): job.status = Status.READY job.hold = False diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index c999091733df4e7b2490e4ae9dc4ae3bbc10aa74..e6b4891b960dfceb86a6592b5600b4adb9a3bc6f 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -345,7 +345,7 @@ class ParamikoPlatform(Platform): """ raise NotImplementedError - def check_job(self, job, default_status=Status.COMPLETED, retries=5): + def check_job(self, job, default_status=Status.COMPLETED, retries=5,submit_hold_check=False): """ Checks job running status @@ -390,7 +390,11 @@ class ParamikoPlatform(Platform): Log.error(" check_job(), job is not on the queue system. Output was: {0}", self.get_checkjob_cmd(job_id)) job_status = Status.UNKNOWN Log.error('check_job() The job id ({0}) status is {1}.', job_id, job_status) - job.new_status = job_status + if submit_hold_check: + return job_status + else: + job.new_status = job_status + def _check_jobid_in_queue(self,ssh_output,job_list_cmd): for job in job_list_cmd[:-1].split(','): if job not in ssh_output: @@ -465,12 +469,11 @@ class ParamikoPlatform(Platform): elif reason == '(JobHeldUser)': job.new_status=Status.HELD if not job.hold: - self.send_command("scontrol release "+"{0}".format(job.id)) # SHOULD BE MORE CLASS (GET_scontrol realease but not sure if this can be implemented on others PLATFORMS - Log.info("Job {0} is being released (id:{1}) ", job.name,job.id) + self.send_command("scontrol release {0}".format(job.id)) # SHOULD BE MORE CLASS (GET_scontrol realease but not sure if this can be implemented on others PLATFORMS else: - Log.info("Job {0} is HELD", job.name) - elif reason == '(JobHeldAdmin)': - Log.info("Job {0} Failed to be HELD, canceling... ", job.name) + pass + elif reason == '(JobHeldAdmin)': #This shouldn't happen anymore TODO delete + Log.debug("Job {0} Failed to be HELD, canceling... ", job.name) job.new_status = Status.WAITING job.platform.send_command(job.platform.cancel_cmd + " {0}".format(job.id)) diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index f0e9dff58c31499164509c6de5dae453d0486b2f..e42884a15b965c52dfb858d9431fee219a499596 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -151,7 +151,7 @@ class Platform(object): """ raise NotImplementedError - def get_file(self, filename, must_exist=True, relative_path='',ignore_log = False): + def get_file(self, filename, must_exist=True, relative_path='', ignore_log=False): """ Copies a file from the current platform to experiment's tmp folder @@ -192,12 +192,12 @@ class Platform(object): :rtype: bool """ raise NotImplementedError - + # Executed when calling from Job def get_logs_files(self, exp_id, remote_logs): """ Get the given LOGS files - + :param exp_id: experiment id :type exp_id: str :param remote_logs: names of the log files @@ -206,7 +206,7 @@ class Platform(object): (job_out_filename, job_err_filename) = remote_logs self.get_files([job_out_filename, job_err_filename], False, 'LOG_{0}'.format(exp_id)) - def get_completed_files(self, job_name, retries=0,recovery=False): + def get_completed_files(self, job_name, retries=0, recovery=False): """ Get the COMPLETED file of the given job @@ -219,7 +219,7 @@ class Platform(object): :rtype: bool """ if recovery: - if self.get_file('{0}_COMPLETED'.format(job_name), False,ignore_log=recovery): + if self.get_file('{0}_COMPLETED'.format(job_name), False, ignore_log=recovery): return True else: return False @@ -231,7 +231,6 @@ class Platform(object): else: return False - def remove_stat_file(self, job_name): """ Removes *STAT* files from remote @@ -261,6 +260,7 @@ class Platform(object): Log.debug('{0} been removed', filename) return True return False + def check_file_exists(self, src): return True @@ -325,8 +325,10 @@ class Platform(object): :rtype: autosubmit.job.job_common.Status """ raise NotImplementedError + def closeConnection(self): return + def write_jobid(self, jobid, complete_path): """ Writes Job id in an out file. @@ -340,7 +342,7 @@ class Platform(object): """ try: title_job = "[INFO] JOBID=" + str(jobid) - if os.path.exists(complete_path): + if os.path.exists(complete_path): file_type = complete_path[-3:] if file_type == "out" or file_type == "err": with open(complete_path, "r+") as f: @@ -348,16 +350,16 @@ class Platform(object): first_line = f.readline() # Not rewrite if not first_line.startswith("[INFO] JOBID="): - content = f.read() + content = f.read() # Write again (Potentially slow) - #start = time() - #Log.info("Attempting job identification of " + str(jobid)) - f.seek(0,0) - f.write(title_job + "\n\n" + first_line + content) - f.close() - #finish = time() - #Log.info("Job correctly identified in " + str(finish - start) + " seconds") + # start = time() + # Log.info("Attempting job identification of " + str(jobid)) + f.seek(0, 0) + f.write(title_job + "\n\n" + first_line + content) + f.close() + # finish = time() + # Log.info("Job correctly identified in " + str(finish - start) + " seconds") except Exception as ex: - Log.error("Writing Job Id Failed : " + str(ex)) - + Log.error("Writing Job Id Failed : " + str(ex)) + diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index 26132d8283fea06aced5cb74fd0e8b2195d1c8cc..051820af1acc1d4fb0fc0844d67c2bd5a1cc8cb3 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -25,6 +25,7 @@ import traceback from xml.dom.minidom import parseString +from autosubmit.job.job_common import Status from autosubmit.platforms.paramiko_platform import ParamikoPlatform from autosubmit.platforms.headers.slurm_header import SlurmHeader from autosubmit.platforms.wrappers.wrapper_factory import SlurmWrapperFactory @@ -117,11 +118,39 @@ class SlurmPlatform(ParamikoPlatform): self.host, self.remote_log_dir) self._submit_hold_cmd = 'sbatch -H -D {1} {1}/'.format( self.host, self.remote_log_dir) - + #jobid =$(sbatch WOA_run_mn4.sh 2 > & 1 | grep -o "[0-9]*"); scontrol hold $jobid; self.put_cmd = "scp" self.get_cmd = "scp" self.mkdir_cmd = "mkdir -p " + self.remote_log_dir + def hold_job(self,job): + try: + cmd = "scontrol release {0} ; scontrol hold {0} ".format(job.id) + self.send_command(cmd) + job_status = self.check_job(job, submit_hold_check=True) + if job_status == Status.RUNNING: + self.send_command("scancel {0}".format(job.id)) + return False + cmd=self.get_queue_status_cmd(job.id) + self.send_command(cmd) + + queue_status = self._ssh_output + reason = str() + reason = self.parse_queue_reason(queue_status, job.id) + if reason == '(JobHeldUser)': + return True + else: + self.send_command("scancel {0}".format(job.id)) + return False + except BaseException as e: + try: + self.send_command("scancel {0}".format(job.id)) + raise AutosubmitError("Can't hold jobid:{0}, canceling job".format(job.id), 6000, e.message) + except BaseException as e: + raise AutosubmitError("Can't cancel the jobid: {0}".format(job.id),6000,e.message) + except AutosubmitError as e: + raise + def get_checkhost_cmd(self): return self._checkhost_cmd