From 3a4e1645c3a327c802c15ed9eb093e91a7f2ddd7 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 18 Jan 2023 15:52:25 +0100 Subject: [PATCH 1/4] ECACCESS - WIP #917 --- autosubmit/autosubmit.py | 2 ++ autosubmit/job/job_packager.py | 2 +- autosubmit/platforms/ecplatform.py | 47 +++++++++++++++++++++--------- 3 files changed, 36 insertions(+), 15 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index d852e8fdd..e71a0a936 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -2244,6 +2244,8 @@ class Autosubmit: elif message.find("private key file is encrypted") != -1: if private_key_error not in ssh_config_issues: ssh_config_issues += private_key_error + elif message.find("Invalid certificate") != -1: + ssh_config_issues += message + ".Please, the eccert expiration date" else: ssh_config_issues += message + " this is an PARAMIKO SSHEXCEPTION: indicates that there is something incompatible in the ssh_config for host:{0}\n maybe you need to contact your sysadmin".format( platform.host) diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index f0eeeb1fe..222942bdb 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -219,7 +219,7 @@ class JobPackager(object): self.current_wrapper_section = wrapper_name for section,jobs in section_jobs.items(): if len(jobs) > 0: - if not self._platform.allow_wrappers: + if self.current_wrapper_section != "SIMPLE" and not self._platform.allow_wrappers: Log.warning("Platform {0} does not allow wrappers, submitting jobs individually".format(self._platform.name)) if wrapper_name != "SIMPLE" and self._platform.allow_wrappers and self.wrapper_type[self.current_wrapper_section] in ['horizontal', 'vertical','vertical-horizontal', 'horizontal-vertical'] : # Trying to find the value in jobs_parser, if not, default to an autosubmit_.yml value (Looks first in [wrapper] section) diff --git a/autosubmit/platforms/ecplatform.py b/autosubmit/platforms/ecplatform.py index 8b3849715..1db32b96b 100644 --- a/autosubmit/platforms/ecplatform.py +++ b/autosubmit/platforms/ecplatform.py @@ -51,6 +51,7 @@ class EcPlatform(ParamikoPlatform): def __init__(self, expid, name, config, scheduler): ParamikoPlatform.__init__(self, expid, name, config) + #version=scheduler if scheduler == 'pbs': self._header = EcCcaHeader() elif scheduler == 'loadleveler': @@ -93,7 +94,8 @@ class EcPlatform(ParamikoPlatform): self.cancel_cmd = "eceaccess-job-delete" self._checkjob_cmd = "ecaccess-job-list " self._checkhost_cmd = "ecaccess-certificate-list" - self._submit_cmd = ("ecaccess-job-submit -distant -queueName " + self.host + " " + self.host + ":" + + self._checkvalidcert_cmd = "ecaccess-gateway-connected" + self._submit_cmd = ("ecaccess-job-submit -distant -queueName " + self.queue + " " + self.host + ":" + self.remote_log_dir + "/") self._submit_command_name = "ecaccess-job-submit" self.put_cmd = "ecaccess-file-put" @@ -155,7 +157,11 @@ class EcPlatform(ParamikoPlatform): :return: True :rtype: bool """ - self.connected = True + output = subprocess.check_output(self._checkvalidcert_cmd, shell=True).decode(locale.getlocale()[1]) + if output.lower().find("yes") != -1: + self.connected = True + else: + self.connected = False def restore_connection(self): """ In this case, it does nothing because connection is established for each command @@ -163,7 +169,11 @@ class EcPlatform(ParamikoPlatform): :return: True :rtype: bool """ - self.connected = True + output = subprocess.check_output(self._checkvalidcert_cmd, shell=True).decode(locale.getlocale()[1]) + if output.lower().find("yes") != -1: + self.connected = True + else: + self.connected = False def test_connection(self): """ In this case, it does nothing because connection is established for each command @@ -171,34 +181,43 @@ class EcPlatform(ParamikoPlatform): :return: True :rtype: bool """ - self.connected = True + output = subprocess.check_output(self._checkvalidcert_cmd, shell=True).decode(locale.getlocale()[1]) + if output.lower().find("yes") != -1: + self.connected = True + return "OK" + else: + self.connected = False + return "Invalid certificate" + def check_remote_permissions(self): try: try: - output = subprocess.check_output(self.check_remote_permissions_remove_cmd, shell=True) + subprocess.check_output(self.check_remote_permissions_remove_cmd, shell=False) except Exception as e: pass - output = subprocess.check_output(self.check_remote_permissions_cmd, shell=True) + subprocess.check_output(self.check_remote_permissions_cmd, shell=True) pass - output = subprocess.check_output(self.check_remote_permissions_remove_cmd, shell=True) + subprocess.check_output(self.check_remote_permissions_remove_cmd, shell=True) return True except Exception as e: return False def send_command(self, command, ignore_log=False, x11 = False): - try: - output = subprocess.check_output(command, shell=True) - except subprocess.CalledProcessError as e: - if not ignore_log: - raise AutosubmitError('Could not execute command {0} on {1}'.format(e.cmd, self.host),7500,str(e)) - return False lang = locale.getlocale()[1] if lang is None: lang = locale.getdefaultlocale()[1] if lang is None: lang = 'UTF-8' - self._ssh_output = output.decode(lang) + try: + output = subprocess.check_output(command, shell=True).decode(lang) + except subprocess.CalledProcessError as e: + if command.find("ecaccess-job-submit") != -1: + raise AutosubmitError("bad parameters. Error submitting job.") + if not ignore_log: + raise AutosubmitError('Could not execute command {0} on {1}'.format(e.cmd, self.host),7500,str(e)) + return False + self._ssh_output = output return True def send_file(self, filename, check=True): -- GitLab From 76c967a58b090bc66e06d1fa021ab5d19eb43206 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 25 Jan 2023 16:07:35 +0100 Subject: [PATCH 2/4] Ec-access working but the tests are randomly crashing due a bug, debugin --- autosubmit/platforms/ecplatform.py | 6 ++++-- autosubmit/platforms/paramiko_submitter.py | 2 ++ autosubmit/platforms/platform.py | 3 +++ 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/autosubmit/platforms/ecplatform.py b/autosubmit/platforms/ecplatform.py index 1db32b96b..4f67d82c7 100644 --- a/autosubmit/platforms/ecplatform.py +++ b/autosubmit/platforms/ecplatform.py @@ -26,7 +26,7 @@ from autosubmit.platforms.headers.ec_cca_header import EcCcaHeader from autosubmit.platforms.headers.slurm_header import SlurmHeader from autosubmit.platforms.wrappers.wrapper_factory import EcWrapperFactory from time import sleep - +import locale class EcPlatform(ParamikoPlatform): """ Class to manage queues with ecaccess @@ -95,7 +95,7 @@ class EcPlatform(ParamikoPlatform): self._checkjob_cmd = "ecaccess-job-list " self._checkhost_cmd = "ecaccess-certificate-list" self._checkvalidcert_cmd = "ecaccess-gateway-connected" - self._submit_cmd = ("ecaccess-job-submit -distant -queueName " + self.queue + " " + self.host + ":" + + self._submit_cmd = ("ecaccess-job-submit -distant -queueName " + self.ec_queue + " " + self.host + ":" + self.remote_log_dir + "/") self._submit_command_name = "ecaccess-job-submit" self.put_cmd = "ecaccess-file-put" @@ -241,6 +241,7 @@ class EcPlatform(ParamikoPlatform): while not process_ok and retries < 5: process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE,stderr=FNULL) out, _ = process.communicate() + out=out.decode(locale.getlocale()[1]) if 'No such file' in out or process.returncode != 0: retries = retries + 1 process_ok = False @@ -273,6 +274,7 @@ class EcPlatform(ParamikoPlatform): while not process_ok and retries < 5: process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE,stderr=FNULL) out, _ = process.communicate() + out = out.decode(locale.getlocale()[1]) if 'No such file' in out or process.returncode != 0: retries = retries + 1 process_ok = False diff --git a/autosubmit/platforms/paramiko_submitter.py b/autosubmit/platforms/paramiko_submitter.py index b2c31ea58..1f89466f2 100644 --- a/autosubmit/platforms/paramiko_submitter.py +++ b/autosubmit/platforms/paramiko_submitter.py @@ -180,6 +180,8 @@ class ParamikoSubmitter(Submitter): remote_platform.temp_dir = platform_data[section].get('TEMP_DIR', "") remote_platform._default_queue = platform_data[section].get('QUEUE', "") remote_platform._serial_queue = platform_data[section].get('SERIAL_QUEUE', "") + remote_platform.ec_queue = platform_data[section].get('EC_QUEUE', "hpc") + remote_platform.processors_per_node = platform_data[section].get('PROCESSORS_PER_NODE',"1") remote_platform.custom_directives = platform_data[section].get('CUSTOM_DIRECTIVES',"") if len(remote_platform.custom_directives) > 0: diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 6b463947e..4a8cce357 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -28,6 +28,7 @@ class Platform(object): self._serial_platform = None self._serial_queue = None self._default_queue = None + self.ec_queue = "hpc" self.processors_per_node = "1" self.scratch_free_space = None self.custom_directives = None @@ -252,6 +253,8 @@ class Platform(object): parameters['{0}ARCH'.format(prefix)] = self.name parameters['{0}HOST'.format(prefix)] = self.host parameters['{0}QUEUE'.format(prefix)] = self.queue + parameters['{0}EC_QUEUE'.format(prefix)] = self.ec_queue + parameters['{0}USER'.format(prefix)] = self.user parameters['{0}PROJ'.format(prefix)] = self.project parameters['{0}BUDG'.format(prefix)] = self.budget -- GitLab From 50a2a49559087fcb1c6f62b9dbada2c8aa71344e Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 30 Jan 2023 12:58:57 +0100 Subject: [PATCH 3/4] Ecaccess support --- autosubmit/autosubmit.py | 2 +- autosubmit/job/job_dict.py | 6 +++++- autosubmit/platforms/ecplatform.py | 4 ++++ autosubmit/platforms/paramiko_submitter.py | 2 ++ 4 files changed, 12 insertions(+), 2 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index e71a0a936..8342e54c6 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -2322,7 +2322,7 @@ class Autosubmit: if not inspect and len(valid_packages_to_submit) > 0: job_list.save() save_2 = False - if platform.type == "slurm" and not inspect and not only_wrappers: # return to == + if platform.type == "slurm" and not inspect and not only_wrappers: # Process the script generated in submit_ready_jobs save_2, valid_packages_to_submit = platform.process_batch_ready_jobs(valid_packages_to_submit, failed_packages, diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index 85b81bbc9..84103762b 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -379,11 +379,15 @@ class DicJobs: elif job_type == 'r': job.type = Type.R job.executable = str(parameters[section].get( "EXECUTABLE", "")) - job.platform_name = str(parameters[section].get( "PLATFORM", "")).upper() + job.platform_name = str(parameters[section].get( "PLATFORM", self.experiment_data["DEFAULT"].get("HPCARCH",''))).upper() job.file = str(parameters[section].get( "FILE", "")) job.additional_files = parameters[section].get( "ADDITIONAL_FILES", []) job.queue = str(parameters[section].get( "QUEUE", "")) + job.ec_queue = str(parameters[section].get("EC_QUEUE", "")) + if job.ec_queue == "": + job.ec_queue = str(self.experiment_data["PLATFORMS"][job.platform_name].get("EC_QUEUE","hpc")) + job.partition = str(parameters[section].get( "PARTITION", "")) if job.partition == "" and job.platform_name.upper() not in ["LOCAL",""]: job.partition = str(self.experiment_data["PLATFORMS"][job.platform_name].get("PARTITION","")) diff --git a/autosubmit/platforms/ecplatform.py b/autosubmit/platforms/ecplatform.py index 4f67d82c7..2c014008d 100644 --- a/autosubmit/platforms/ecplatform.py +++ b/autosubmit/platforms/ecplatform.py @@ -115,6 +115,9 @@ class EcPlatform(ParamikoPlatform): def get_mkdir_cmd(self): return self.mkdir_cmd + def set_submit_cmd(self,ec_queue="hpc"): + self._submit_cmd = ("ecaccess-job-submit -distant -queueName " + ec_queue + " " + self.host + ":" + + self.remote_log_dir + "/") def check_Alljobs(self, job_list, as_conf, retries=5): for job,prev_status in job_list: @@ -144,6 +147,7 @@ class EcPlatform(ParamikoPlatform): return self._checkjob_cmd + str(job_id) def get_submit_cmd(self, job_script, job, hold=False, export=""): + self.set_submit_cmd(job.ec_queue) if (export is None or export == "none") or len(export) == 0: export = "" else: diff --git a/autosubmit/platforms/paramiko_submitter.py b/autosubmit/platforms/paramiko_submitter.py index 1f89466f2..46f436eb6 100644 --- a/autosubmit/platforms/paramiko_submitter.py +++ b/autosubmit/platforms/paramiko_submitter.py @@ -182,6 +182,8 @@ class ParamikoSubmitter(Submitter): remote_platform._serial_queue = platform_data[section].get('SERIAL_QUEUE', "") remote_platform.ec_queue = platform_data[section].get('EC_QUEUE', "hpc") + remote_platform.ec_queue = platform_data[section].get('EC_QUEUE', "hpc") + remote_platform.processors_per_node = platform_data[section].get('PROCESSORS_PER_NODE',"1") remote_platform.custom_directives = platform_data[section].get('CUSTOM_DIRECTIVES',"") if len(remote_platform.custom_directives) > 0: -- GitLab From ed8a42c6283ea08b66511b035a2d17bea82228b0 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 30 Jan 2023 13:20:38 +0100 Subject: [PATCH 4/4] Ecaccess support -pipeline --- autosubmit/job/job_dict.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index 84103762b..d81fbe456 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -379,7 +379,8 @@ class DicJobs: elif job_type == 'r': job.type = Type.R job.executable = str(parameters[section].get( "EXECUTABLE", "")) - job.platform_name = str(parameters[section].get( "PLATFORM", self.experiment_data["DEFAULT"].get("HPCARCH",''))).upper() + default_data = self.experiment_data.get("DEFAULT",{}) + job.platform_name = str(parameters[section].get( "PLATFORM", default_data.get("HPCARCH",''))).upper() job.file = str(parameters[section].get( "FILE", "")) job.additional_files = parameters[section].get( "ADDITIONAL_FILES", []) -- GitLab