diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py
index abf18b5d6949cdc30b8382cd1c66bc07b82ffadf..339f8c11f78cf86304378897eaa2fa7fc5d2b35d 100644
--- a/autosubmit/platforms/slurmplatform.py
+++ b/autosubmit/platforms/slurmplatform.py
@@ -28,7 +28,9 @@ from xml.dom.minidom import parseString
 from autosubmit.platforms.paramiko_platform import ParamikoPlatform
 from autosubmit.platforms.headers.slurm_header import SlurmHeader
 from autosubmit.platforms.wrappers.wrapper_factory import SlurmWrapperFactory
-from log.log import AutosubmitCritical,AutosubmitError,Log
+from log.log import AutosubmitCritical, AutosubmitError, Log
+
+
 class SlurmPlatform(ParamikoPlatform):
     """
     Class to manage jobs to host using SLURM scheduler
@@ -45,7 +47,8 @@ class SlurmPlatform(ParamikoPlatform):
         self.job_status['COMPLETED'] = ['COMPLETED']
         self.job_status['RUNNING'] = ['RUNNING']
         self.job_status['QUEUING'] = ['PENDING', 'CONFIGURING', 'RESIZING']
-        self.job_status['FAILED'] = ['FAILED', 'CANCELLED','CANCELLED+', 'NODE_FAIL', 'PREEMPTED', 'SUSPENDED', 'TIMEOUT','OUT_OF_MEMORY','OUT_OF_ME+','OUT_OF_ME']
+        self.job_status['FAILED'] = ['FAILED', 'CANCELLED', 'CANCELLED+', 'NODE_FAIL',
+                                     'PREEMPTED', 'SUSPENDED', 'TIMEOUT', 'OUT_OF_MEMORY', 'OUT_OF_ME+', 'OUT_OF_ME']
         self._pathdir = "\$HOME/LOG_" + self.expid
         self._allow_arrays = False
         self._allow_wrappers = True
@@ -53,7 +56,8 @@ class SlurmPlatform(ParamikoPlatform):
         self.config = config
         exp_id_path = os.path.join(config.LOCAL_ROOT_DIR, self.expid)
         tmp_path = os.path.join(exp_id_path, "tmp")
-        self._submit_script_path = os.path.join(tmp_path , config.LOCAL_ASLOG_DIR,"submit_"+self.name+".sh")
+        self._submit_script_path = os.path.join(
+            tmp_path, config.LOCAL_ASLOG_DIR, "submit_" + self.name + ".sh")
         self._submit_script_file = open(self._submit_script_path, 'w').close()
 
     def open_submit_script(self):
@@ -63,10 +67,9 @@ class SlurmPlatform(ParamikoPlatform):
     def get_submit_script(self):
         self._submit_script_file.close()
         os.chmod(self._submit_script_path, 0o750)
-        return os.path.join(self.config.LOCAL_ASLOG_DIR,os.path.basename(self._submit_script_path))
+        return os.path.join(self.config.LOCAL_ASLOG_DIR, os.path.basename(self._submit_script_path))
 
-
-    def submit_Script(self,hold=False):
+    def submit_Script(self, hold=False):
         """
         Sends a Submit file Script, execute it in the platform and retrieves the Jobs_ID of all jobs at once.
 
@@ -76,8 +79,9 @@ class SlurmPlatform(ParamikoPlatform):
         :rtype: list(str)
         """
         try:
-            self.send_file(self.get_submit_script(),False)
-            cmd = os.path.join(self.get_files_path(),os.path.basename(self._submit_script_path))
+            self.send_file(self.get_submit_script(), False)
+            cmd = os.path.join(self.get_files_path(),
+                               os.path.basename(self._submit_script_path))
             try:
                 self.send_command(cmd)
             except AutosubmitError as e:
@@ -90,31 +94,34 @@ class SlurmPlatform(ParamikoPlatform):
             jobs_id = self.get_submitted_job_id(self.get_ssh_output())
             return jobs_id
         except IOError as e:
-            raise AutosubmitError("Submit script is not found, retry again in next AS iteration", 6008, e.message)
+            raise AutosubmitError(
+                "Submit script is not found, retry again in next AS iteration", 6008, e.message)
         except AutosubmitError as e:
             raise
        except AutosubmitCritical as e:
             raise
         except BaseException as e:
-            raise AutosubmitError("Job couldn't be submitted, retry again in next AS iteration", 6008, e.message)
-
+            raise AutosubmitError(
+                "Job couldn't be submitted, retry again in next AS iteration", 6008, e.message)
 
     def update_cmds(self):
         """
         Updates commands for platforms
         """
-        self.root_dir = os.path.join(self.scratch, self.project, self.user, self.expid)
+        self.root_dir = os.path.join(
+            self.scratch, self.project, self.user, self.expid)
         self.remote_log_dir = os.path.join(self.root_dir, "LOG_" + self.expid)
         self.cancel_cmd = "scancel"
         self._checkhost_cmd = "echo 1"
-        self._submit_cmd = 'sbatch -D {1} {1}/'.format(self.host, self.remote_log_dir)
-        self._submit_hold_cmd = 'sbatch -H -D {1} {1}/'.format(self.host, self.remote_log_dir)
+        self._submit_cmd = 'sbatch -D {1} {1}/'.format(
+            self.host, self.remote_log_dir)
+        self._submit_hold_cmd = 'sbatch -H -D {1} {1}/'.format(
+            self.host, self.remote_log_dir)
         self.put_cmd = "scp"
         self.get_cmd = "scp"
         self.mkdir_cmd = "mkdir -p " + self.remote_log_dir
 
 
-
     def get_checkhost_cmd(self):
         return self._checkhost_cmd
 
@@ -235,9 +242,12 @@ class SlurmPlatform(ParamikoPlatform):
         last_letter = string_number.strip()[-1]
         multiplier = 1
         if last_letter == "G":
+            multiplier = 1000000000
+            number = string_number[:-1]
+        elif last_letter == "M":
             multiplier = 1000000
             number = string_number[:-1]
-        elif last_letter == "K" or last_letter == "M":
+        elif last_letter == "K":
             multiplier = 1000
             number = string_number[:-1]
         else:
@@ -249,27 +259,29 @@ class SlurmPlatform(ParamikoPlatform):
             pass
         return number
 
-    def parse_Alljobs_output(self, output,job_id):
+    def parse_Alljobs_output(self, output, job_id):
         try:
-            status = [x.split()[1] for x in output.splitlines() if x.split()[0] == str(job_id)]
+            status = [x.split()[1] for x in output.splitlines()
+                      if x.split()[0] == str(job_id)]
         except BaseException as e:
             return status
         if len(status) == 0:
             return status
         return status[0]
 
-
-
     def get_submitted_job_id(self, outputlines):
         try:
             if outputlines.find("failed") != -1:
-                raise AutosubmitCritical("Submission failed. Command Failed", 7014)
+                raise AutosubmitCritical(
+                    "Submission failed. Command Failed", 7014)
             jobs_id = []
             for output in outputlines.splitlines():
                 jobs_id.append(int(output.split(' ')[3]))
             return jobs_id
         except IndexError:
-            raise AutosubmitCritical("Submission failed. There are issues on your config file",7014)
+            raise AutosubmitCritical(
+                "Submission failed. There are issues on your config file", 7014)
+
     def jobs_in_queue(self):
         dom = parseString('')
         jobs_xml = dom.getElementsByTagName("JB_job_number")
@@ -277,33 +289,34 @@ class SlurmPlatform(ParamikoPlatform):
 
     def get_submit_cmd(self, job_script, job, hold=False):
         if not hold:
-            self._submit_script_file.write(self._submit_cmd + job_script + "\n")
+            self._submit_script_file.write(
+                self._submit_cmd + job_script + "\n")
         else:
-            self._submit_script_file.write(self._submit_hold_cmd + job_script + "\n" )
-
-
+            self._submit_script_file.write(
+                self._submit_hold_cmd + job_script + "\n")
 
     def get_checkjob_cmd(self, job_id):
         return 'sacct -n -X -j {1} -o "State"'.format(self.host, job_id)
 
     def get_checkAlljobs_cmd(self, jobs_id):
         return "sacct -n -X -j {1} -o jobid,State".format(self.host, jobs_id)
+
     def get_queue_status_cmd(self, job_id):
         return 'squeue -j {0} -o %A,%R'.format(job_id)
 
     def get_job_energy_cmd(self, job_id):
         return 'sacct -n -j {0} -o JobId%25,State,NCPUS,NNodes,Submit,Start,End,ConsumedEnergy,MaxRSS%25,AveRSS%25'.format(job_id)
 
-    def parse_queue_reason(self, output,job_id):
-        reason =[x.split(',')[1] for x in output.splitlines() if x.split(',')[0] == str(job_id)]
+    def parse_queue_reason(self, output, job_id):
+        reason = [x.split(',')[1] for x in output.splitlines()
+                  if x.split(',')[0] == str(job_id)]
         if len(reason) > 0:
             return reason[0]
         return reason
 
-
     @staticmethod
-    def wrapper_header(filename, queue, project, wallclock, num_procs, dependency, directives, threads,method="asthreads"):
-        if method =='srun':
+    def wrapper_header(filename, queue, project, wallclock, num_procs, dependency, directives, threads, method="asthreads"):
+        if method == 'srun':
             language = "#!/bin/bash"
             return \
                 language + """
@@ -328,7 +341,7 @@ class SlurmPlatform(ParamikoPlatform):
         else:
             language = "#!/usr/bin/env python"
             return \
-                language+"""
+                language + """
 ###############################################################################
 # {0}
 ###############################################################################
@@ -346,30 +359,33 @@ class SlurmPlatform(ParamikoPlatform):
 #
 ###############################################################################
     """.format(filename, queue, project, wallclock, num_procs, dependency,
-               '\n'.ljust(13).join(str(s) for s in directives),threads)
+               '\n'.ljust(13).join(str(s) for s in directives), threads)
 
 
     @staticmethod
     def allocated_nodes():
         return """os.system("scontrol show hostnames $SLURM_JOB_NODELIST > node_list")"""
 
-    def check_file_exists(self,filename):
+    def check_file_exists(self, filename):
         file_exist = False
         sleeptime = 5
         retries = 0
         max_retries = 3
         while not file_exist and retries < max_retries:
             try:
-                self._ftpChannel.stat(os.path.join(self.get_files_path(), filename)) # This return IOError if path doesn't exist
+                # This return IOError if path doesn't exist
+                self._ftpChannel.stat(os.path.join(
+                    self.get_files_path(), filename))
                 file_exist = True
             except IOError:  # File doesn't exist, retry in sleeptime
                 Log.debug("{2} File still no exists.. waiting {0}s for a new retry ( retries left: {1})", sleeptime,
-                          max_retries - retries, os.path.join(self.get_files_path(),filename))
+                          max_retries - retries, os.path.join(self.get_files_path(), filename))
                 sleep(sleeptime)
                 sleeptime = sleeptime + 5
                 retries = retries + 1
             except BaseException as e:  # Unrecoverable error
-                Log.critical("Crashed while retrieving remote logs",6001,e.message)
+                Log.critical(
+                    "Crashed while retrieving remote logs", 6001, e.message)
                 file_exist = False  # won't exist
                 retries = 999  # no more retries
-        return file_exist
\ No newline at end of file
+        return file_exist
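Besides the autopep8-style reflow, the one behavioural change in this patch is the unit parsing: a "G" suffix now expands with its own multiplier (1e9) and "M" with 1e6, where previously "G" mapped to 1e6 and both "K" and "M" collapsed to 1e3. Below is a minimal standalone sketch of the corrected logic; the helper name and the simplified float conversion are illustrative assumptions, not taken verbatim from the patched method.

```python
# Illustrative sketch of the corrected suffix handling (assumed helper name;
# the real method's surrounding try/except is simplified here).
def parse_suffixed_number(string_number):
    last_letter = string_number.strip()[-1]
    multiplier = 1
    number = string_number
    if last_letter == "G":        # "G" now gets its own multiplier
        multiplier = 1000000000
        number = string_number[:-1]
    elif last_letter == "M":      # "M" no longer shares the "K" multiplier
        multiplier = 1000000
        number = string_number[:-1]
    elif last_letter == "K":
        multiplier = 1000
        number = string_number[:-1]
    try:
        return float(number) * multiplier
    except ValueError:
        return 0.0


print(parse_suffixed_number("1.5G"))  # 1500000000.0
print(parse_suffixed_number("300K"))  # 300000.0
print(parse_suffixed_number("42"))    # 42.0
```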