From 3eb86505710905d32085ba24bd0112a1fb3ba180 Mon Sep 17 00:00:00 2001
From: dbeltran
Date: Wed, 1 Feb 2023 11:57:45 +0100
Subject: [PATCH 1/9] Non-related to Meluxa, rerun was not working with the new dependencies

---
 autosubmit/autosubmit.py   |  4 ++--
 autosubmit/job/job_list.py | 11 +++++++----
 2 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py
index 4d499e382..51bc0e1a0 100644
--- a/autosubmit/autosubmit.py
+++ b/autosubmit/autosubmit.py
@@ -4510,7 +4510,7 @@ class Autosubmit:
                                 jobs_data=as_conf.experiment_data, as_conf=as_conf)
             if str(rerun).lower() == "true":
-                job_list.rerun(as_conf.get_rerun_jobs())
+                job_list.rerun(as_conf.get_rerun_jobs(),as_conf)
             else:
                 job_list.remove_rerun_only_jobs(notransitive)
             Log.info("\nSaving the jobs list...")
@@ -5778,7 +5778,7 @@ class Autosubmit:
                                 jobs_data=as_conf.experiment_data, as_conf=as_conf)
             if str(rerun).lower() == "true":
                 rerun_jobs = as_conf.get_rerun_jobs()
-                job_list.rerun(rerun_jobs, monitor=monitor)
+                job_list.rerun(rerun_jobs,as_conf, monitor=monitor)
             else:
                 job_list.remove_rerun_only_jobs(notransitive)
diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py
index 4480dfed7..55db17103 100644
--- a/autosubmit/job/job_list.py
+++ b/autosubmit/job/job_list.py
@@ -2165,7 +2165,7 @@ class JobList(object):
 
         self._job_list.remove(job)
 
-    def rerun(self, job_list_unparsed, monitor=False):
+    def rerun(self, job_list_unparsed,as_conf, monitor=False):
         """
         Updates job list to rerun the jobs specified by a job list
         :param job_list_unparsed: list of jobs to rerun
@@ -2196,15 +2196,18 @@ class JobList(object):
         self._member_list = list(member_list)
         self._chunk_list = list(chunk_list)
         self._date_list = list(date_list)
-        jobs_parser = self._get_jobs_parser()
 
         Log.info("Adding dependencies...")
         dependencies = dict()
         for job_section in job_sections:
             Log.debug(
                 "Reading rerun dependencies for {0} jobs".format(job_section))
-            if jobs_parser.has_option(job_section, 'DEPENDENCIES'):
-                dependencies_keys = jobs_parser.get(job_section, "DEPENDENCIES").upper().split()
+            if len(as_conf.jobs_data[job_section].get('DEPENDENCIES',{})) > 0:
+                dependencies_keys = as_conf.jobs_data[job_section].get('DEPENDENCIES',{})
+                if type(dependencies_keys) is str:
+                    dependencies_keys = dependencies_keys.upper().split()
+                if dependencies_keys is None:
+                    dependencies_keys = []
                 dependencies = JobList._manage_dependencies(dependencies_keys, self._dic_jobs, job_section)
                 for job in self.get_jobs_by_section(job_section):
                     for key in dependencies_keys:
--
GitLab
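
Patch 1 stops reading DEPENDENCIES through the old INI-style jobs parser and reads it
from the dictionary-based configuration instead. A minimal standalone sketch (not code
from the repository) of that normalisation, assuming jobs_data is a plain nested dict
loaded from the YAML configuration; the section name and value below are invented:

    # Sketch: normalise a DEPENDENCIES value to a list of section keys.
    def get_dependencies_keys(jobs_data, job_section):
        dependencies_keys = jobs_data[job_section].get('DEPENDENCIES', {})
        if isinstance(dependencies_keys, str):
            # Old-style value: a space-separated string such as "INI SIM-1"
            dependencies_keys = dependencies_keys.upper().split()
        if dependencies_keys is None:
            dependencies_keys = []
        return dependencies_keys

    print(get_dependencies_keys({"SIM": {"DEPENDENCIES": "ini sim-1"}}, "SIM"))
    # -> ['INI', 'SIM-1']
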
From ef59fc1e0f80bb33be7c2bb7f01f92554a41971f Mon Sep 17 00:00:00 2001
From: dbeltran
Date: Wed, 1 Feb 2023 12:58:29 +0100
Subject: [PATCH 2/9] conection working #931

---
 autosubmit/autosubmit.py                  | 13 +++++----
 autosubmit/platforms/paramiko_platform.py | 32 ++++++++++++-----------
 requeriments.txt                          |  2 +-
 3 files changed, 26 insertions(+), 21 deletions(-)

diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py
index 51bc0e1a0..0bf76e047 100644
--- a/autosubmit/autosubmit.py
+++ b/autosubmit/autosubmit.py
@@ -2219,7 +2219,7 @@ class Autosubmit:
                 message = "We have detected that there is another Autosubmit instance using the experiment\n. Stop other Autosubmit instances that are using the experiment or delete autosubmit.lock file located on tmp folder"
                 raise AutosubmitCritical(message, 7000)
             except AutosubmitCritical as e:
-                raise AutosubmitCritical(e.message, e.code, e.trace)
+                raise
             except BaseException as e:
                 raise AutosubmitCritical("This seems like a bug in the code, please contact AS developers", 7070, str(e))
@@ -2272,14 +2272,17 @@ class Autosubmit:
                    Log.printlog("[{1}] Connection successful to host {0}".format(platform.host, platform.name),
                                 Log.RESULT)
                 else:
-                    platform.connected = False
-                    Log.printlog("[{1}] Connection failed to host {0}".format(platform.host, platform.name), Log.WARNING)
+                    if platform.connected:
+                        platform.connected = False
+                        Log.printlog("[{1}] Connection sucessful to host {0}, however there are issues with %hpcroot%".format(platform.host, platform.name),
+                                     Log.WARNING)
+                    else:
+                        Log.printlog("[{1}] Connection failed to host {0}".format(platform.host, platform.name), Log.WARNING)
         if issues != "":
             if ssh_config_issues.find(private_key_error[:-2]) != -1:
                 raise AutosubmitCritical("Private key is encrypted, Autosubmit does not run in interative mode.\nPlease, add the key to the ssh agent(ssh-add ).\nIt will remain open as long as session is active, for force clean you can prompt ssh-add -D",7073, issues + "\n" + ssh_config_issues)
            else:
-                raise AutosubmitCritical(
-                    "Issues while checking the connectivity of platforms.", 7010, issues + "\n" + ssh_config_issues)
+                raise AutosubmitCritical("Issues while checking the connectivity of platforms.", 7010, issues + "\n" + ssh_config_issues)
 
    @staticmethod
    def submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence, inspect=False,
diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py
index a09b93f7a..d4676423a 100644
--- a/autosubmit/platforms/paramiko_platform.py
+++ b/autosubmit/platforms/paramiko_platform.py
@@ -125,8 +125,11 @@ class ParamikoPlatform(Platform):
         except BaseException as e:
             message = str(e)
             if message.find("t accept remote connections") == -1:
-                transport = self._ssh.get_transport()
-                transport.send_ignore()
+                try:
+                    transport = self._ssh.get_transport()
+                    transport.send_ignore()
+                except:
+                    message = "Timeout connection"
             return message
         except EOFError as e:
             self.connected = False
@@ -147,8 +150,6 @@
         retry = 0
         try:
             self.connect()
-        except SSHException as e:
-            raise
         except Exception as e:
             if ',' in self.host:
                 Log.printlog("Connection Failed to {0}, will test another host".format(
@@ -168,7 +169,7 @@
                 raise AutosubmitCritical(
                     'Experiment cant no continue without unexpected behaviour, Stopping Autosubmit', 7050, trace)
-        except AutosubmitCritical:
+        except AutosubmitCritical as e:
             raise
         except SSHException as e:
             raise
@@ -207,24 +208,25 @@
                     0]
             if 'identityfile' in self._host_config:
                 self._host_config_id = self._host_config['identityfile']
-
+            #pkey = paramiko.Ed25519Key.from_private_key_file(self._host_config_id[0])
+            port = int(self._host_config.get('port',22))
             if 'proxycommand' in self._host_config:
                 self._proxy = paramiko.ProxyCommand(
                     self._host_config['proxycommand'])
                 try:
-                    self._ssh.connect(self._host_config['hostname'], 22, username=self.user,
+                    self._ssh.connect(self._host_config['hostname'], port, username=self.user,
                                       key_filename=self._host_config_id, sock=self._proxy, timeout=120 , banner_timeout=120)
                 except Exception as e:
-                    self._ssh.connect(self._host_config['hostname'], 22, username=self.user,
+                    self._ssh.connect(self._host_config['hostname'], port, username=self.user,
                                       key_filename=self._host_config_id, sock=self._proxy, timeout=120,
                                       banner_timeout=120,disabled_algorithms={'pubkeys': ['rsa-sha2-256', 'rsa-sha2-512']})
             else:
                 try:
-                    self._ssh.connect(self._host_config['hostname'], 22, username=self.user,
-                                      key_filename=self._host_config_id, timeout=120 , banner_timeout=120)
+                    self._ssh.connect(self._host_config['hostname'], port, username=self.user,
+                                      key_filename=self._host_config_id, timeout=60 , banner_timeout=60)
                 except Exception as e:
-                    self._ssh.connect(self._host_config['hostname'], 22, username=self.user,
-                                      key_filename=self._host_config_id, timeout=120 , banner_timeout=120,disabled_algorithms={'pubkeys': ['rsa-sha2-256', 'rsa-sha2-512']})
+                    self._ssh.connect(self._host_config['hostname'], port, username=self.user,
+                                      key_filename=self._host_config_id, timeout=60 , banner_timeout=60,disabled_algorithms={'pubkeys': ['rsa-sha2-256', 'rsa-sha2-512']})
             self.transport = self._ssh.get_transport()
             #self.transport = paramiko.Transport((self._host_config['hostname'], 22))
             #self.transport.connect(username=self.user)
@@ -236,12 +238,12 @@
         except SSHException as e:
             raise
         except IOError as e:
-            if "refused" in e.strerror.lower():
+            if "refused" in str(e.strerror).lower():
                 raise SSHException(" {0} doesn't accept remote connections. Check if there is an typo in the hostname".format(self.host))
-            elif "name or service not known" in e.strerror.lower():
+            elif "name or service not known" in str(e.strerror).lower():
                 raise SSHException(" {0} doesn't accept remote connections. Check if there is an typo in the hostname".format(self.host))
             else:
-                raise AutosubmitError("File can't be located due an slow connection", 6016, str(e))
+                raise AutosubmitError("File can't be located due an slow or timeout connection", 6016, str(e))
         except BaseException as e:
             self.connected = False
             if "Authentication failed." in str(e):
diff --git a/requeriments.txt b/requeriments.txt
index 7db650934..4a580ac4e 100644
--- a/requeriments.txt
+++ b/requeriments.txt
@@ -28,4 +28,4 @@ xlib
 Pygments
 packaging==19
 typing>=3.7
-autosubmitconfigparser==0.0.74
+autosubmitconfigparser==0.0.75
--
GitLab
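
Patch 2 starts honouring a per-host Port entry from the user's SSH configuration instead
of hard-coding 22. A small sketch (not repository code) of that lookup with paramiko,
where the config path and the host alias "my-remote-host" are placeholders invented for
the example:

    import paramiko

    ssh_config = paramiko.SSHConfig()
    with open("/home/user/.ssh/config") as f:          # placeholder path
        ssh_config.parse(f)
    host_config = ssh_config.lookup("my-remote-host")  # placeholder alias
    port = int(host_config.get('port', 22))            # falls back to 22 when no Port entry exists
    print(host_config.get('hostname'), port)
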
From ffb5db5ee1bdaa213f9d2e28b1d8bfc4b452dff0 Mon Sep 17 00:00:00 2001
From: dbeltran
Date: Wed, 1 Feb 2023 13:22:51 +0100
Subject: [PATCH 3/9] AS dummy working #931

---
 autosubmit/autosubmit.py   | 2 +-
 autosubmit/job/job.py      | 4 +++-
 autosubmit/job/job_dict.py | 4 +++-
 3 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py
index 0bf76e047..94fbfcd18 100644
--- a/autosubmit/autosubmit.py
+++ b/autosubmit/autosubmit.py
@@ -2274,7 +2274,7 @@ class Autosubmit:
                 else:
                     if platform.connected:
                         platform.connected = False
-                        Log.printlog("[{1}] Connection sucessful to host {0}, however there are issues with %hpcroot%".format(platform.host, platform.name),
+                        Log.printlog("[{1}] Connection sucessful to host {0}, however there are issues with %HPCROOT%".format(platform.host, platform.name),
                                      Log.WARNING)
                     else:
                         Log.printlog("[{1}] Connection failed to host {0}".format(platform.host, platform.name), Log.WARNING)
diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py
index bfdadc95d..a99dc8c05 100644
--- a/autosubmit/job/job.py
+++ b/autosubmit/job/job.py
@@ -1084,7 +1084,9 @@ class Job(object):
         self.tasks = job_platform.processors_per_node
         self.memory = str(as_conf.jobs_data[self.section].get("MEMORY",""))
         self.memory_per_task = str(as_conf.jobs_data[self.section].get("MEMORY_PER_TASK",""))
-        self.wallclock = as_conf.jobs_data[self.section].get("WALLCLOCK",None)
+        remote_max_wallclock = as_conf.experiment_data["PLATFORMS"].get(self.platform_name,{})
+        remote_max_wallclock = remote_max_wallclock.get("MAX_WALLCLOCK",None)
+        self.wallclock = as_conf.jobs_data[self.section].get("WALLCLOCK",remote_max_wallclock)
         self.wchunkinc = str(as_conf.jobs_data[self.section].get("WCHUNKINC",""))
         if self.wallclock is None and job_platform.type not in ['ps',"local","PS","LOCAL"]:
             self.wallclock = "01:59"
diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py
index d453cd51c..ff8b60734 100644
--- a/autosubmit/job/job_dict.py
+++ b/autosubmit/job/job_dict.py
@@ -402,7 +402,9 @@ class DicJobs:
         job.tasks = str(parameters[section].get( "TASKS", ""))
         job.memory = str(parameters[section].get("MEMORY", ""))
         job.memory_per_task = str(parameters[section].get("MEMORY_PER_TASK", ""))
-        job.wallclock = parameters[section].get("WALLCLOCK", None)
+        remote_max_wallclock = self.experiment_data["PLATFORMS"].get(job.platform_name,{})
+        remote_max_wallclock = remote_max_wallclock.get("MAX_WALLCLOCK",None)
+        job.wallclock = parameters[section].get("WALLCLOCK", remote_max_wallclock)
         job.retrials = int(parameters[section].get( 'RETRIALS', 0))
         job.delay_retrials = int(parameters[section].get( 'DELAY_RETRY_TIME', "-1"))
         if job.wallclock is None and job.platform_name.upper() != "LOCAL":
--
GitLab
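
Patch 3 makes a job's wallclock fall back to the platform's MAX_WALLCLOCK when the job
section does not define WALLCLOCK. A condensed sketch of that fallback chain; the
function name and arguments are invented for the example, while the "01:59" default
comes from the existing code path:

    def resolve_wallclock(jobs_data, platforms, section, platform_name, platform_type):
        # Platform-level ceiling, if the platform defines one
        remote_max_wallclock = platforms.get(platform_name, {}).get("MAX_WALLCLOCK", None)
        # A job-level value wins; otherwise inherit the platform maximum
        wallclock = jobs_data[section].get("WALLCLOCK", remote_max_wallclock)
        if wallclock is None and platform_type not in ['ps', "local", "PS", "LOCAL"]:
            wallclock = "01:59"
        return wallclock
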
From e74dc321138ec02075c867c0622fb2aadfdf57a2 Mon Sep 17 00:00:00 2001
From: dbeltran
Date: Wed, 1 Feb 2023 14:22:05 +0100
Subject: [PATCH 4/9] Added NODES parameter to jobs

---
 autosubmit/job/job.py                        |  5 +++++
 autosubmit/platforms/headers/slurm_header.py | 15 ++++++++++++++-
 autosubmit/platforms/paramiko_platform.py    |  3 +++
 3 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py
index a99dc8c05..ce76e333d 100644
--- a/autosubmit/job/job.py
+++ b/autosubmit/job/job.py
@@ -97,6 +97,7 @@ class Job(object):
         self.wallclock = None # type: str
         self.wchunkinc = None
         self.tasks = '1'
+        self.nodes = ""
         self.default_parameters = {'d': '%d%', 'd_': '%d_%', 'Y': '%Y%', 'Y_': '%Y_%', 'M': '%M%', 'M_': '%M_%',
                                    'm': '%m%', 'm_': '%m_%'}
         self.threads = '1'
@@ -1077,6 +1078,8 @@ class Job(object):
         self.processors = str(as_conf.jobs_data[self.section].get("PROCESSORS","1"))
         self.threads = str(as_conf.jobs_data[self.section].get("THREADS","1"))
         self.tasks = str(as_conf.jobs_data[self.section].get("TASKS","1"))
+        self.nodes = str(as_conf.jobs_data[self.section].get("NODES",""))
+
         self.hyperthreading = str(as_conf.jobs_data[self.section].get("HYPERTHREADING","none"))
         if self.hyperthreading == 'none' and len(self.hyperthreading) > 0:
             self.hyperthreading = job_platform.hyperthreading
@@ -1125,6 +1128,8 @@ class Job(object):
         parameters['CPUS_PER_TASK'] = self.threads
         parameters['NUMTASK'] = self.tasks
         parameters['TASKS'] = self.tasks
+        parameters['NODES'] = self.nodes
+        parameters['TASKS_PER_NODE'] = self.tasks
         parameters['WALLCLOCK'] = self.wallclock
         parameters['TASKTYPE'] = self.section
 
diff --git a/autosubmit/platforms/headers/slurm_header.py b/autosubmit/platforms/headers/slurm_header.py
index 4a8830519..c953e9a4e 100644
--- a/autosubmit/platforms/headers/slurm_header.py
+++ b/autosubmit/platforms/headers/slurm_header.py
@@ -68,6 +68,18 @@ class SlurmHeader(object):
             return "SBATCH -A {0}".format(job.parameters['CURRENT_PROJ'])
         return ""
 
+    def get_nodes_directive(self, job):
+        """
+        Returns nodes directive for the specified job
+        :param job: job to create nodes directive for
+        :type job: Job
+        :return: nodes directive
+        :rtype: str
+        """
+        # There is no account, so directive is empty
+        if job.parameters.get('NODES',"") != '':
+            return "SBATCH -N {0}".format(job.parameters.get('NODES',""))
+        return ""
     # noinspection PyMethodMayBeStatic,PyUnusedLocal
     def get_memory_directive(self, job):
         """
@@ -143,9 +155,9 @@ class SlurmHeader(object):
 #%PARTITION_DIRECTIVE%
 #%ACCOUNT_DIRECTIVE%
 #%MEMORY_DIRECTIVE%
-
 #%THREADS_PER_TASK_DIRECTIVE%
 #%TASKS_PER_NODE_DIRECTIVE%
+#%NODES_DIRECTIVE%
 #SBATCH -n %NUMPROC%
 #SBATCH -t %WALLCLOCK%:00
 #SBATCH -J %JOBNAME%
@@ -168,6 +180,7 @@ class SlurmHeader(object):
 #%MEMORY_DIRECTIVE%
 #%MEMORY_PER_TASK_DIRECTIVE%
 #%THREADS_PER_TASK_DIRECTIVE%
+#%NODES_DIRECTIVE%
 #SBATCH -n %NUMPROC%
 #%TASKS_PER_NODE_DIRECTIVE%
 #SBATCH -t %WALLCLOCK%:00
diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py
index d4676423a..c50eb133e 100644
--- a/autosubmit/platforms/paramiko_platform.py
+++ b/autosubmit/platforms/paramiko_platform.py
@@ -1176,6 +1176,9 @@ class ParamikoPlatform(Platform):
         if hasattr(self.header, 'get_account_directive'):
             header = header.replace(
                 '%ACCOUNT_DIRECTIVE%', self.header.get_account_directive(job))
+        if hasattr(self.header, 'get_nodes_directive'):
+            header = header.replace(
+                '%NODES_DIRECTIVE%', self.header.get_nodes_directive(job))
         if hasattr(self.header, 'get_memory_directive'):
             header = header.replace(
                 '%MEMORY_DIRECTIVE%', self.header.get_memory_directive(job))
--
GitLab
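
Patch 4 threads a new NODES option from the job configuration through to a
%NODES_DIRECTIVE% placeholder in the Slurm headers. A simplified sketch of how the
placeholder ends up as an #SBATCH line; the template is trimmed to two lines and the
value 4 is invented for the example:

    def get_nodes_directive(parameters):
        nodes = parameters.get('NODES', "")
        if nodes != '':
            return "SBATCH -N {0}".format(nodes)
        return ""

    template = "#%NODES_DIRECTIVE%\n#SBATCH -n %NUMPROC%"
    header = template.replace('%NODES_DIRECTIVE%', get_nodes_directive({'NODES': 4}))
    # header -> "#SBATCH -N 4\n#SBATCH -n %NUMPROC%"
    # With NODES unset the line collapses to a bare "#", i.e. a harmless comment.
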
From 5a5f663b5176b9d103b5451b8be4f051acc9a3e3 Mon Sep 17 00:00:00 2001
From: dbeltran
Date: Wed, 1 Feb 2023 15:15:58 +0100
Subject: [PATCH 5/9] Update slurm_header.py

---
 autosubmit/platforms/headers/slurm_header.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/autosubmit/platforms/headers/slurm_header.py b/autosubmit/platforms/headers/slurm_header.py
index c953e9a4e..9e55da58c 100644
--- a/autosubmit/platforms/headers/slurm_header.py
+++ b/autosubmit/platforms/headers/slurm_header.py
@@ -77,8 +77,9 @@ class SlurmHeader(object):
         :rtype: str
         """
         # There is no account, so directive is empty
-        if job.parameters.get('NODES',"") != '':
-            return "SBATCH -N {0}".format(job.parameters.get('NODES',""))
+        nodes = job.parameters.get('NODES',"")
+        if nodes != '':
+            return "SBATCH -N {0}".format(nodes)
         return ""
     # noinspection PyMethodMayBeStatic,PyUnusedLocal
     def get_memory_directive(self, job):
         """
--
GitLab
From 3ebbe17f8fadf8badab5aa36cf737971e6d08680 Mon Sep 17 00:00:00 2001
From: dbeltran
Date: Wed, 1 Feb 2023 15:20:45 +0100
Subject: [PATCH 6/9] Added nodes to docs

---
 docs/source/devguide/variables.rst        | 1 +
 docs/source/userguide/configure/index.rst | 3 +++
 2 files changed, 4 insertions(+)

diff --git a/docs/source/devguide/variables.rst b/docs/source/devguide/variables.rst
index fa46bbaa5..f52454899 100644
--- a/docs/source/devguide/variables.rst
+++ b/docs/source/devguide/variables.rst
@@ -43,6 +43,7 @@ This variables are relatives to the current job.
 - **NUMPROC**: Number of processors that the job will use.
 - **NUMTHREADS**: Number of threads that the job will use.
 - **NUMTASKS**: Number of tasks that the job will use.
+- **NODES**: Number of nodes that the job will use.
 - **HYPERTHREADING**: Detects if hyperthreading is enabled or not.
 - **WALLCLOCK**: Number of processors that the job will use.
 - **SCRATCH_FREE_SPACE**: Percentage of free space required on the ``scratch``.
diff --git a/docs/source/userguide/configure/index.rst b/docs/source/userguide/configure/index.rst
index 09679c890..ad6414e92 100644
--- a/docs/source/userguide/configure/index.rst
+++ b/docs/source/userguide/configure/index.rst
@@ -110,6 +110,9 @@ To do this use:
 
 * TASKS: tasks number to be submitted to the HPC. If not specified, defaults to 1.
 
+* NODES: nodes number to be submitted to the HPC. If not specified, the directive is not added.
+
+
 * HYPERTHREADING: Enables Hyper-threading, this will double the max amount of threads. defaults to false. ( Not available on slurm platforms )
 
 * QUEUE: queue to add the job to. If not specified, uses PLATFORM default.
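
The documented options map onto the per-section dictionaries that end up in
as_conf.jobs_data. An illustrative entry for a job requesting two nodes; the section
name and numbers are invented for the example:

    jobs_data = {
        "SIM": {
            "PROCESSORS": 96,
            "TASKS": 48,
            "NODES": 2,          # new option: when omitted, no "#SBATCH -N" line is emitted
            "WALLCLOCK": "02:00",
        }
    }
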
--
GitLab

From a2ec9979b3861c0e41fa2424350aa6771c61284f Mon Sep 17 00:00:00 2001
From: dbeltran
Date: Wed, 1 Feb 2023 15:22:20 +0100
Subject: [PATCH 7/9] rerun pydoc https://earth.bsc.es/gitlab/es/autosubmit/-/merge_requests/306/diffs#cc15adebb72d3bb0b219d0f8fae0d8aed65ed739

---
 autosubmit/job/job_list.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py
index 55db17103..832e1e536 100644
--- a/autosubmit/job/job_list.py
+++ b/autosubmit/job/job_list.py
@@ -2170,8 +2170,11 @@ class JobList(object):
         Updates job list to rerun the jobs specified by a job list
         :param job_list_unparsed: list of jobs to rerun
         :type job_list_unparsed: str
+        :param as_conf: experiment configuration
+        :type as_conf: AutosubmitConfig
         :param monitor: if True, the job list will be monitored
         :type monitor: bool
+
         """
         self.parse_jobs_by_filter(job_list_unparsed,two_step_start=False)
         member_list = set()
--
GitLab

From 801cdb80bd6bf7d040c65b8be56bf334f3ff502d Mon Sep 17 00:00:00 2001
From: Bruno de Paula Kinoshita <777-bdepaula@users.noreply.earth.bsc.es>
Date: Wed, 1 Feb 2023 15:24:06 +0100
Subject: [PATCH 8/9] Apply 1 suggestion(s) to 1 file(s)

---
 autosubmit/job/job_list.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py
index 832e1e536..7e87c8fbe 100644
--- a/autosubmit/job/job_list.py
+++ b/autosubmit/job/job_list.py
@@ -2205,7 +2205,7 @@ class JobList(object):
         for job_section in job_sections:
             Log.debug(
                 "Reading rerun dependencies for {0} jobs".format(job_section))
-            if len(as_conf.jobs_data[job_section].get('DEPENDENCIES',{})) > 0:
+            if as_conf.jobs_data[job_section].get('DEPENDENCIES')) is not None:
                 dependencies_keys = as_conf.jobs_data[job_section].get('DEPENDENCIES',{})
                 if type(dependencies_keys) is str:
                     dependencies_keys = dependencies_keys.upper().split()
--
GitLab

From 3e7d1f0fe92bcb1d101a234d6fc8ec34da8ed5b5 Mon Sep 17 00:00:00 2001
From: dbeltran
Date: Wed, 1 Feb 2023 15:41:37 +0100
Subject: [PATCH 9/9] Syntax fix

---
 autosubmit/job/job_list.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py
index 7e87c8fbe..b92c89260 100644
--- a/autosubmit/job/job_list.py
+++ b/autosubmit/job/job_list.py
@@ -2205,7 +2205,7 @@ class JobList(object):
         for job_section in job_sections:
             Log.debug(
                 "Reading rerun dependencies for {0} jobs".format(job_section))
-            if as_conf.jobs_data[job_section].get('DEPENDENCIES')) is not None:
+            if as_conf.jobs_data[job_section].get('DEPENDENCIES',None) is not None:
                 dependencies_keys = as_conf.jobs_data[job_section].get('DEPENDENCIES',{})
                 if type(dependencies_keys) is str:
                     dependencies_keys = dependencies_keys.upper().split()
--
GitLab
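
Patches 8 and 9 replace the length check from patch 1 with an explicit presence check:
a section with no DEPENDENCIES key is skipped, while any declared value still goes
through the normalisation. A short sketch of the final guard; the two sections below
are invented for the example:

    jobs_data = {"SIM": {"DEPENDENCIES": "INI SIM-1"}, "CLEAN": {}}

    for job_section, section_data in jobs_data.items():
        if section_data.get('DEPENDENCIES', None) is not None:
            dependencies_keys = section_data.get('DEPENDENCIES', {})
            if type(dependencies_keys) is str:
                dependencies_keys = dependencies_keys.upper().split()
            print(job_section, dependencies_keys)
    # prints: SIM ['INI', 'SIM-1']   (CLEAN has no DEPENDENCIES key and is skipped)
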