diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 4d499e3823f0a7005049de5063a0e61d057f9997..94fbfcd184faa76ff02c510f36c20610b100c963 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -2219,7 +2219,7 @@ class Autosubmit: message = "We have detected that there is another Autosubmit instance using the experiment\n. Stop other Autosubmit instances that are using the experiment or delete autosubmit.lock file located on tmp folder" raise AutosubmitCritical(message, 7000) except AutosubmitCritical as e: - raise AutosubmitCritical(e.message, e.code, e.trace) + raise except BaseException as e: raise AutosubmitCritical("This seems like a bug in the code, please contact AS developers", 7070, str(e)) @@ -2272,14 +2272,17 @@ class Autosubmit: Log.printlog("[{1}] Connection successful to host {0}".format(platform.host, platform.name), Log.RESULT) else: - platform.connected = False - Log.printlog("[{1}] Connection failed to host {0}".format(platform.host, platform.name), Log.WARNING) + if platform.connected: + platform.connected = False + Log.printlog("[{1}] Connection sucessful to host {0}, however there are issues with %HPCROOT%".format(platform.host, platform.name), + Log.WARNING) + else: + Log.printlog("[{1}] Connection failed to host {0}".format(platform.host, platform.name), Log.WARNING) if issues != "": if ssh_config_issues.find(private_key_error[:-2]) != -1: raise AutosubmitCritical("Private key is encrypted, Autosubmit does not run in interative mode.\nPlease, add the key to the ssh agent(ssh-add ).\nIt will remain open as long as session is active, for force clean you can prompt ssh-add -D",7073, issues + "\n" + ssh_config_issues) else: - raise AutosubmitCritical( - "Issues while checking the connectivity of platforms.", 7010, issues + "\n" + ssh_config_issues) + raise AutosubmitCritical("Issues while checking the connectivity of platforms.", 7010, issues + "\n" + ssh_config_issues) @staticmethod def submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence, inspect=False, @@ -4510,7 +4513,7 @@ class Autosubmit: jobs_data=as_conf.experiment_data, as_conf=as_conf) if str(rerun).lower() == "true": - job_list.rerun(as_conf.get_rerun_jobs()) + job_list.rerun(as_conf.get_rerun_jobs(),as_conf) else: job_list.remove_rerun_only_jobs(notransitive) Log.info("\nSaving the jobs list...") @@ -5778,7 +5781,7 @@ class Autosubmit: jobs_data=as_conf.experiment_data, as_conf=as_conf) if str(rerun).lower() == "true": rerun_jobs = as_conf.get_rerun_jobs() - job_list.rerun(rerun_jobs, monitor=monitor) + job_list.rerun(rerun_jobs,as_conf, monitor=monitor) else: job_list.remove_rerun_only_jobs(notransitive) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index bfdadc95d6ec10da703ee6145a0117e990f7fea2..ce76e333dce268ac6a4b525ef2a83adcf07fc16c 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -97,6 +97,7 @@ class Job(object): self.wallclock = None # type: str self.wchunkinc = None self.tasks = '1' + self.nodes = "" self.default_parameters = {'d': '%d%', 'd_': '%d_%', 'Y': '%Y%', 'Y_': '%Y_%', 'M': '%M%', 'M_': '%M_%', 'm': '%m%', 'm_': '%m_%'} self.threads = '1' @@ -1077,6 +1078,8 @@ class Job(object): self.processors = str(as_conf.jobs_data[self.section].get("PROCESSORS","1")) self.threads = str(as_conf.jobs_data[self.section].get("THREADS","1")) self.tasks = str(as_conf.jobs_data[self.section].get("TASKS","1")) + self.nodes = str(as_conf.jobs_data[self.section].get("NODES","")) + self.hyperthreading = str(as_conf.jobs_data[self.section].get("HYPERTHREADING","none")) if self.hyperthreading == 'none' and len(self.hyperthreading) > 0: self.hyperthreading = job_platform.hyperthreading @@ -1084,7 +1087,9 @@ class Job(object): self.tasks = job_platform.processors_per_node self.memory = str(as_conf.jobs_data[self.section].get("MEMORY","")) self.memory_per_task = str(as_conf.jobs_data[self.section].get("MEMORY_PER_TASK","")) - self.wallclock = as_conf.jobs_data[self.section].get("WALLCLOCK",None) + remote_max_wallclock = as_conf.experiment_data["PLATFORMS"].get(self.platform_name,{}) + remote_max_wallclock = remote_max_wallclock.get("MAX_WALLCLOCK",None) + self.wallclock = as_conf.jobs_data[self.section].get("WALLCLOCK",remote_max_wallclock) self.wchunkinc = str(as_conf.jobs_data[self.section].get("WCHUNKINC","")) if self.wallclock is None and job_platform.type not in ['ps',"local","PS","LOCAL"]: self.wallclock = "01:59" @@ -1123,6 +1128,8 @@ class Job(object): parameters['CPUS_PER_TASK'] = self.threads parameters['NUMTASK'] = self.tasks parameters['TASKS'] = self.tasks + parameters['NODES'] = self.nodes + parameters['TASKS_PER_NODE'] = self.tasks parameters['WALLCLOCK'] = self.wallclock parameters['TASKTYPE'] = self.section diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index d453cd51c5105074018ea072a7abb7ff837f56cb..ff8b607344629c3434c612ba2a1228484f23e1e9 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -402,7 +402,9 @@ class DicJobs: job.tasks = str(parameters[section].get( "TASKS", "")) job.memory = str(parameters[section].get("MEMORY", "")) job.memory_per_task = str(parameters[section].get("MEMORY_PER_TASK", "")) - job.wallclock = parameters[section].get("WALLCLOCK", None) + remote_max_wallclock = self.experiment_data["PLATFORMS"].get(job.platform_name,{}) + remote_max_wallclock = remote_max_wallclock.get("MAX_WALLCLOCK",None) + job.wallclock = parameters[section].get("WALLCLOCK", remote_max_wallclock) job.retrials = int(parameters[section].get( 'RETRIALS', 0)) job.delay_retrials = int(parameters[section].get( 'DELAY_RETRY_TIME', "-1")) if job.wallclock is None and job.platform_name.upper() != "LOCAL": diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 4480dfed7e81551abd37dff25d5b00a97b6c80c3..b92c892609cd8da6ea6f31f59f7dc1c6e7090428 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -2165,13 +2165,16 @@ class JobList(object): self._job_list.remove(job) - def rerun(self, job_list_unparsed, monitor=False): + def rerun(self, job_list_unparsed,as_conf, monitor=False): """ Updates job list to rerun the jobs specified by a job list :param job_list_unparsed: list of jobs to rerun :type job_list_unparsed: str + :param as_conf: experiment configuration + :type as_conf: AutosubmitConfig :param monitor: if True, the job list will be monitored :type monitor: bool + """ self.parse_jobs_by_filter(job_list_unparsed,two_step_start=False) member_list = set() @@ -2196,15 +2199,18 @@ class JobList(object): self._member_list = list(member_list) self._chunk_list = list(chunk_list) self._date_list = list(date_list) - jobs_parser = self._get_jobs_parser() Log.info("Adding dependencies...") dependencies = dict() for job_section in job_sections: Log.debug( "Reading rerun dependencies for {0} jobs".format(job_section)) - if jobs_parser.has_option(job_section, 'DEPENDENCIES'): - dependencies_keys = jobs_parser.get(job_section, "DEPENDENCIES").upper().split() + if as_conf.jobs_data[job_section].get('DEPENDENCIES',None) is not None: + dependencies_keys = as_conf.jobs_data[job_section].get('DEPENDENCIES',{}) + if type(dependencies_keys) is str: + dependencies_keys = dependencies_keys.upper().split() + if dependencies_keys is None: + dependencies_keys = [] dependencies = JobList._manage_dependencies(dependencies_keys, self._dic_jobs, job_section) for job in self.get_jobs_by_section(job_section): for key in dependencies_keys: diff --git a/autosubmit/platforms/headers/slurm_header.py b/autosubmit/platforms/headers/slurm_header.py index 4a88305196cea31880cc7922f912db99b71fe23e..9e55da58c14abb590c92060514b09132483d18f8 100644 --- a/autosubmit/platforms/headers/slurm_header.py +++ b/autosubmit/platforms/headers/slurm_header.py @@ -68,6 +68,19 @@ class SlurmHeader(object): return "SBATCH -A {0}".format(job.parameters['CURRENT_PROJ']) return "" + def get_nodes_directive(self, job): + """ + Returns nodes directive for the specified job + :param job: job to create nodes directive for + :type job: Job + :return: nodes directive + :rtype: str + """ + # There is no account, so directive is empty + nodes = job.parameters.get('NODES',"") + if nodes != '': + return "SBATCH -N {0}".format(nodes) + return "" # noinspection PyMethodMayBeStatic,PyUnusedLocal def get_memory_directive(self, job): """ @@ -143,9 +156,9 @@ class SlurmHeader(object): #%PARTITION_DIRECTIVE% #%ACCOUNT_DIRECTIVE% #%MEMORY_DIRECTIVE% - #%THREADS_PER_TASK_DIRECTIVE% #%TASKS_PER_NODE_DIRECTIVE% +#%NODES_DIRECTIVE% #SBATCH -n %NUMPROC% #SBATCH -t %WALLCLOCK%:00 #SBATCH -J %JOBNAME% @@ -168,6 +181,7 @@ class SlurmHeader(object): #%MEMORY_DIRECTIVE% #%MEMORY_PER_TASK_DIRECTIVE% #%THREADS_PER_TASK_DIRECTIVE% +#%NODES_DIRECTIVE% #SBATCH -n %NUMPROC% #%TASKS_PER_NODE_DIRECTIVE% #SBATCH -t %WALLCLOCK%:00 diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index a09b93f7a6e1768a25e9236d345fa28a7c1ed76e..c50eb133eaaf01cbcf6f39c5a8b1ddaacd8f12b9 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -125,8 +125,11 @@ class ParamikoPlatform(Platform): except BaseException as e: message = str(e) if message.find("t accept remote connections") == -1: - transport = self._ssh.get_transport() - transport.send_ignore() + try: + transport = self._ssh.get_transport() + transport.send_ignore() + except: + message = "Timeout connection" return message except EOFError as e: self.connected = False @@ -147,8 +150,6 @@ class ParamikoPlatform(Platform): retry = 0 try: self.connect() - except SSHException as e: - raise except Exception as e: if ',' in self.host: Log.printlog("Connection Failed to {0}, will test another host".format( @@ -168,7 +169,7 @@ class ParamikoPlatform(Platform): raise AutosubmitCritical( 'Experiment cant no continue without unexpected behaviour, Stopping Autosubmit', 7050, trace) - except AutosubmitCritical: + except AutosubmitCritical as e: raise except SSHException as e: raise @@ -207,24 +208,25 @@ class ParamikoPlatform(Platform): 0] if 'identityfile' in self._host_config: self._host_config_id = self._host_config['identityfile'] - + #pkey = paramiko.Ed25519Key.from_private_key_file(self._host_config_id[0]) + port = int(self._host_config.get('port',22)) if 'proxycommand' in self._host_config: self._proxy = paramiko.ProxyCommand( self._host_config['proxycommand']) try: - self._ssh.connect(self._host_config['hostname'], 22, username=self.user, + self._ssh.connect(self._host_config['hostname'], port, username=self.user, key_filename=self._host_config_id, sock=self._proxy, timeout=120 , banner_timeout=120) except Exception as e: - self._ssh.connect(self._host_config['hostname'], 22, username=self.user, + self._ssh.connect(self._host_config['hostname'], port, username=self.user, key_filename=self._host_config_id, sock=self._proxy, timeout=120, banner_timeout=120,disabled_algorithms={'pubkeys': ['rsa-sha2-256', 'rsa-sha2-512']}) else: try: - self._ssh.connect(self._host_config['hostname'], 22, username=self.user, - key_filename=self._host_config_id, timeout=120 , banner_timeout=120) + self._ssh.connect(self._host_config['hostname'], port, username=self.user, + key_filename=self._host_config_id, timeout=60 , banner_timeout=60) except Exception as e: - self._ssh.connect(self._host_config['hostname'], 22, username=self.user, - key_filename=self._host_config_id, timeout=120 , banner_timeout=120,disabled_algorithms={'pubkeys': ['rsa-sha2-256', 'rsa-sha2-512']}) + self._ssh.connect(self._host_config['hostname'], port, username=self.user, + key_filename=self._host_config_id, timeout=60 , banner_timeout=60,disabled_algorithms={'pubkeys': ['rsa-sha2-256', 'rsa-sha2-512']}) self.transport = self._ssh.get_transport() #self.transport = paramiko.Transport((self._host_config['hostname'], 22)) #self.transport.connect(username=self.user) @@ -236,12 +238,12 @@ class ParamikoPlatform(Platform): except SSHException as e: raise except IOError as e: - if "refused" in e.strerror.lower(): + if "refused" in str(e.strerror).lower(): raise SSHException(" {0} doesn't accept remote connections. Check if there is an typo in the hostname".format(self.host)) - elif "name or service not known" in e.strerror.lower(): + elif "name or service not known" in str(e.strerror).lower(): raise SSHException(" {0} doesn't accept remote connections. Check if there is an typo in the hostname".format(self.host)) else: - raise AutosubmitError("File can't be located due an slow connection", 6016, str(e)) + raise AutosubmitError("File can't be located due an slow or timeout connection", 6016, str(e)) except BaseException as e: self.connected = False if "Authentication failed." in str(e): @@ -1174,6 +1176,9 @@ class ParamikoPlatform(Platform): if hasattr(self.header, 'get_account_directive'): header = header.replace( '%ACCOUNT_DIRECTIVE%', self.header.get_account_directive(job)) + if hasattr(self.header, 'get_nodes_directive'): + header = header.replace( + '%NODES_DIRECTIVE%', self.header.get_nodes_directive(job)) if hasattr(self.header, 'get_memory_directive'): header = header.replace( '%MEMORY_DIRECTIVE%', self.header.get_memory_directive(job)) diff --git a/docs/source/devguide/variables.rst b/docs/source/devguide/variables.rst index fa46bbaa57ac8b42aec601fa27bac946f853b662..f52454899f49c93e46499e8089c8aad31fc45b70 100644 --- a/docs/source/devguide/variables.rst +++ b/docs/source/devguide/variables.rst @@ -43,6 +43,7 @@ This variables are relatives to the current job. - **NUMPROC**: Number of processors that the job will use. - **NUMTHREADS**: Number of threads that the job will use. - **NUMTASKS**: Number of tasks that the job will use. +- **NODES**: Number of nodes that the job will use. - **HYPERTHREADING**: Detects if hyperthreading is enabled or not. - **WALLCLOCK**: Number of processors that the job will use. - **SCRATCH_FREE_SPACE**: Percentage of free space required on the ``scratch``. diff --git a/docs/source/userguide/configure/index.rst b/docs/source/userguide/configure/index.rst index 09679c890f489a4d36e9c56a440c1ba5246aa6a5..ad6414e92022519cdc0b4c3dc15a6664e3b89a44 100644 --- a/docs/source/userguide/configure/index.rst +++ b/docs/source/userguide/configure/index.rst @@ -110,6 +110,9 @@ To do this use: * TASKS: tasks number to be submitted to the HPC. If not specified, defaults to 1. +* NODES: nodes number to be submitted to the HPC. If not specified, the directive is not added. + + * HYPERTHREADING: Enables Hyper-threading, this will double the max amount of threads. defaults to false. ( Not available on slurm platforms ) * QUEUE: queue to add the job to. If not specified, uses PLATFORM default. diff --git a/requeriments.txt b/requeriments.txt index 7db6509346f9ab18a9fdaf7349fe9f2f18f8c2fa..4a580ac4ed830157de8c5558ce4bf05bd06c190a 100644 --- a/requeriments.txt +++ b/requeriments.txt @@ -28,4 +28,4 @@ xlib Pygments packaging==19 typing>=3.7 -autosubmitconfigparser==0.0.74 +autosubmitconfigparser==0.0.75