From 5d0eb6f7c043dd572871457b92526dcd756afefe Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 6 Oct 2023 10:29:21 +0200 Subject: [PATCH 1/8] added a debug function --- autosubmit/autosubmit.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 75f6c130a..ef9ed1d59 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -73,7 +73,7 @@ import re import random import signal import datetime - +import log.fd_show as fd_show import portalocker from pkg_resources import require, resource_listdir, resource_string, resource_filename from collections import defaultdict @@ -2107,6 +2107,7 @@ class Autosubmit: job_list.update_list(as_conf, submitter=submitter) job_list.save() # Submit jobs that are ready to run + Log.debug(f"FD submit: {fd_show.fd_table_status_str()}") if len(job_list.get_ready()) > 0: Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence, hold=False) job_list.update_list(as_conf, submitter=submitter) @@ -2136,6 +2137,8 @@ class Autosubmit: if Autosubmit.exit: job_list.save() time.sleep(safetysleeptime) + Log.debug(f"FD endsubmit: {fd_show.fd_table_status_str()}") + except AutosubmitError as e: # If an error is detected, restore all connections and job_list Log.error("Trace: {0}", e.trace) -- GitLab From 6e47ee82d533a8a3f29f883e1d7131d23c2aa141 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 6 Oct 2023 15:44:34 +0200 Subject: [PATCH 2/8] more fixes --- autosubmit/job/job.py | 15 +++---- autosubmit/platforms/paramiko_platform.py | 50 +++++++++++++++++------ autosubmit/platforms/platform.py | 1 + 3 files changed, 45 insertions(+), 21 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index f92c438ee..85e4d8766 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -939,7 +939,6 @@ class Job(object): max_retrials = self.retrials max_logs = 0 last_log = 0 - sleep(5) stat_file = self.script_name[:-4] + 
"_STAT_" lang = locale.getlocale()[1] if lang is None: @@ -965,7 +964,7 @@ class Job(object): success = True except BaseException as e: error_message = str(e) - sleep(60 * 5) + sleep(10) pass count = count + 1 if not success: @@ -1084,6 +1083,10 @@ class Job(object): except BaseException as e: Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format( str(e), self.name)) + try: + platform.closeConnection() + except: + pass except AutosubmitError as e: Log.printlog("Trace {0} \nFailed to retrieve log file for job {1}".format( e.message, self.name), 6001) @@ -1091,7 +1094,6 @@ class Job(object): platform.closeConnection() except BaseException as e: pass - return except AutosubmitCritical as e: # Critical errors can't be recovered. Failed configuration or autosubmit error Log.printlog("Trace {0} \nFailed to retrieve log file for job {0}".format( e.message, self.name), 6001) @@ -1099,12 +1101,7 @@ class Job(object): platform.closeConnection() except Exception as e: pass - return - try: - platform.closeConnection() - except BaseException as e: - pass - return + def parse_time(self,wallclock): regex = re.compile(r'(((?P\d+):)((?P\d+)))(:(?P\d+))?') parts = regex.match(wallclock) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 0fda2f42c..1c9e7721a 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -1302,18 +1302,44 @@ class ParamikoPlatform(Platform): return timedelta(**time_params) def closeConnection(self): - if self._ftpChannel is not None and len(str(self._ftpChannel)) > 0: - self._ftpChannel.close() - if self._ssh is not None and len(str(self._ssh)) > 0: - self._ssh.close() - self.transport.close() - self.transport.stop_thread() - try: - del self._ssh - del self._ftpChannel - del self.transport - except Exception as e: - pass + try: + if self._ftpChannel: + self._ftpChannel.close() + except: + pass + try: + if self._ssh._agent: + 
self._ssh._agent.close() + + except: + pass + try: + if self._ssh: + self._ssh.close() + except: + pass + try: + if self.transport: + self.transport.close() + self.transport.stop_thread() + except: + pass + try: + del self._ssh._agent + except: + pass + try: + del self._ssh + except Exception as e: + pass + try: + del self._ftpChannel + except: + pass + try: + del self.transport + except: + pass def check_tmp_exists(self): try: diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 50a68bd3e..f022a6f16 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -820,3 +820,4 @@ class Platform(object): Sends a Submit file Script, execute it in the platform and retrieves the Jobs_ID of all jobs at once. """ raise NotImplementedError + -- GitLab From e1b808eba7ed4bfde71156990e2806425a0b1929 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 9 Oct 2023 10:30:45 +0200 Subject: [PATCH 3/8] Changes to the function, fix a bug with the connection, added a close for ._transport of ssh --- autosubmit/job/job.py | 13 ++++++------- autosubmit/platforms/locplatform.py | 4 +--- autosubmit/platforms/paramiko_platform.py | 19 ++++++++++++++----- autosubmit/platforms/pjmplatform.py | 4 +--- autosubmit/platforms/platform.py | 2 +- autosubmit/platforms/slurmplatform.py | 4 +--- 6 files changed, 24 insertions(+), 22 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 85e4d8766..a27e6d19e 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -950,7 +950,7 @@ class Job(object): success = False error_message = "" platform = None - while (count < retries) or not success: + while (count < retries) and not success: try: as_conf = AutosubmitConfig(expid, BasicConfig, YAMLParserFactory()) as_conf.reload(force_load=True) @@ -964,7 +964,7 @@ class Job(object): success = True except BaseException as e: error_message = str(e) - sleep(10) + sleep(5) pass count = count + 1 if not success: @@ -995,24 
+995,22 @@ class Job(object): out_exist = False err_exist = False retries = 3 - sleeptime = 0 i = 0 try: while (not out_exist and not err_exist) and i < retries: try: out_exist = platform.check_file_exists( - remote_logs[0], False) + remote_logs[0], False, sleeptime=0, max_retries=1) except IOError as e: out_exist = False try: err_exist = platform.check_file_exists( - remote_logs[1], False) + remote_logs[1], False,sleeptime=0,max_retries=1) except IOError as e: err_exist = False if not out_exist or not err_exist: - sleeptime = sleeptime + 5 i = i + 1 - sleep(sleeptime) + sleep(5) try: platform.restore_connection() except BaseException as e: @@ -1101,6 +1099,7 @@ class Job(object): platform.closeConnection() except Exception as e: pass + return def parse_time(self,wallclock): regex = re.compile(r'(((?P\d+):)((?P\d+)))(:(?P\d+))?') diff --git a/autosubmit/platforms/locplatform.py b/autosubmit/platforms/locplatform.py index 0d5e097a4..ca44d37b5 100644 --- a/autosubmit/platforms/locplatform.py +++ b/autosubmit/platforms/locplatform.py @@ -175,7 +175,7 @@ class LocalPlatform(ParamikoPlatform): return True # Moves .err .out - def check_file_exists(self, src,wrapper_failed=False): + def check_file_exists(self, src,wrapper_failed=False,sleeptime=5, max_retries = 3): """ Moves a file on the platform :param src: source name @@ -185,10 +185,8 @@ class LocalPlatform(ParamikoPlatform): """ file_exist = False - sleeptime = 5 remote_path = os.path.join(self.get_files_path(), src) retries = 0 - max_retries = 3 while not file_exist and retries < max_retries: try: file_exist = os.path.isfile(os.path.join(self.get_files_path(),src)) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 1c9e7721a..51ff49165 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -1308,9 +1308,14 @@ class ParamikoPlatform(Platform): except: pass try: - if self._ssh._agent: + if self._ssh._agent: # May 
not be in all runs self._ssh._agent.close() - + except: + pass + try: + if self._ssh._transport: + self._ssh._transport.close() + self._ssh._transport.stop_thread() except: pass try: @@ -1324,12 +1329,13 @@ class ParamikoPlatform(Platform): self.transport.stop_thread() except: pass + try: - del self._ssh._agent + del self._ssh._agent # May not be in all runs except: pass try: - del self._ssh + del self._ssh._transport except Exception as e: pass try: @@ -1340,7 +1346,10 @@ class ParamikoPlatform(Platform): del self.transport except: pass - + try: + del self._ssh + except: + pass def check_tmp_exists(self): try: if self.send_command("ls {0}".format(self.temp_dir)): diff --git a/autosubmit/platforms/pjmplatform.py b/autosubmit/platforms/pjmplatform.py index 52ae05131..7b136935c 100644 --- a/autosubmit/platforms/pjmplatform.py +++ b/autosubmit/platforms/pjmplatform.py @@ -463,11 +463,9 @@ class PJMPlatform(ParamikoPlatform): def allocated_nodes(): return """os.system("scontrol show hostnames $SLURM_JOB_NODELIST > node_list_{0}".format(node_id))""" - def check_file_exists(self, filename,wrapper_failed=False): + def check_file_exists(self, filename,wrapper_failed=False,sleeptime=5, max_retries = 3): file_exist = False - sleeptime = 5 retries = 0 - max_retries = 3 while not file_exist and retries < max_retries: try: # This return IOError if path doesn't exist diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index f022a6f16..d1404ccfb 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -624,7 +624,7 @@ class Platform(object): if self.check_file_exists(filename): self.delete_file(filename) - def check_file_exists(self, src, wrapper_failed=False): + def check_file_exists(self, src, wrapper_failed=False,sleeptime=5, max_retries = 3): return True def get_stat_file(self, job_name, retries=0): diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index 00c7293f6..55a1cafdd 
100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -606,11 +606,9 @@ class SlurmPlatform(ParamikoPlatform): def allocated_nodes(): return """os.system("scontrol show hostnames $SLURM_JOB_NODELIST > node_list_{0}".format(node_id))""" - def check_file_exists(self, filename,wrapper_failed=False): + def check_file_exists(self, filename,wrapper_failed=False,sleeptime=5, max_retries = 3): file_exist = False - sleeptime = 5 retries = 0 - max_retries = 3 while not file_exist and retries < max_retries: try: # This return IOError if path doesn't exist -- GitLab From b3d0141015904aab59189a276c7ee1226831aedb Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 9 Oct 2023 10:52:02 +0200 Subject: [PATCH 4/8] commented the debug line Changed version --- VERSION | 2 +- autosubmit/autosubmit.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/VERSION b/VERSION index 7dd09a1bf..e176f5208 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -4.0.96 +4.0.97 diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index ef9ed1d59..2d245ebed 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -2107,7 +2107,7 @@ class Autosubmit: job_list.update_list(as_conf, submitter=submitter) job_list.save() # Submit jobs that are ready to run - Log.debug(f"FD submit: {fd_show.fd_table_status_str()}") + #Log.debug(f"FD submit: {fd_show.fd_table_status_str()}") if len(job_list.get_ready()) > 0: Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence, hold=False) job_list.update_list(as_conf, submitter=submitter) @@ -2137,7 +2137,7 @@ class Autosubmit: if Autosubmit.exit: job_list.save() time.sleep(safetysleeptime) - Log.debug(f"FD endsubmit: {fd_show.fd_table_status_str()}") + #Log.debug(f"FD endsubmit: {fd_show.fd_table_status_str()}") except AutosubmitError as e: # If an error is detected, restore all connections and job_list -- GitLab From 37b3d1470f0ad5c6e1773fb5f27820fd772ae34c 
Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 9 Oct 2023 10:53:46 +0200 Subject: [PATCH 5/8] - --- autosubmit/job/job.py | 2 +- autosubmit/platforms/locplatform.py | 2 +- autosubmit/platforms/pjmplatform.py | 2 +- autosubmit/platforms/platform.py | 2 +- autosubmit/platforms/slurmplatform.py | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index a27e6d19e..ba011ef55 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -1005,7 +1005,7 @@ class Job(object): out_exist = False try: err_exist = platform.check_file_exists( - remote_logs[1], False,sleeptime=0,max_retries=1) + remote_logs[1], False, sleeptime=0, max_retries=1) except IOError as e: err_exist = False if not out_exist or not err_exist: diff --git a/autosubmit/platforms/locplatform.py b/autosubmit/platforms/locplatform.py index ca44d37b5..7f41060eb 100644 --- a/autosubmit/platforms/locplatform.py +++ b/autosubmit/platforms/locplatform.py @@ -175,7 +175,7 @@ class LocalPlatform(ParamikoPlatform): return True # Moves .err .out - def check_file_exists(self, src,wrapper_failed=False,sleeptime=5, max_retries = 3): + def check_file_exists(self, src, wrapper_failed=False, sleeptime=5, max_retries=3): """ Moves a file on the platform :param src: source name diff --git a/autosubmit/platforms/pjmplatform.py b/autosubmit/platforms/pjmplatform.py index 7b136935c..36b03d799 100644 --- a/autosubmit/platforms/pjmplatform.py +++ b/autosubmit/platforms/pjmplatform.py @@ -463,7 +463,7 @@ class PJMPlatform(ParamikoPlatform): def allocated_nodes(): return """os.system("scontrol show hostnames $SLURM_JOB_NODELIST > node_list_{0}".format(node_id))""" - def check_file_exists(self, filename,wrapper_failed=False,sleeptime=5, max_retries = 3): + def check_file_exists(self, filename, wrapper_failed=False, sleeptime=5, max_retries=3): file_exist = False retries = 0 while not file_exist and retries < max_retries: diff --git 
a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index d1404ccfb..95fea2bcd 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -624,7 +624,7 @@ class Platform(object): if self.check_file_exists(filename): self.delete_file(filename) - def check_file_exists(self, src, wrapper_failed=False,sleeptime=5, max_retries = 3): + def check_file_exists(self, src, wrapper_failed=False, sleeptime=5, max_retries=3): return True def get_stat_file(self, job_name, retries=0): diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index 55a1cafdd..acfaaf7ba 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -606,7 +606,7 @@ class SlurmPlatform(ParamikoPlatform): def allocated_nodes(): return """os.system("scontrol show hostnames $SLURM_JOB_NODELIST > node_list_{0}".format(node_id))""" - def check_file_exists(self, filename,wrapper_failed=False,sleeptime=5, max_retries = 3): + def check_file_exists(self, filename, wrapper_failed=False, sleeptime=5, max_retries=3): file_exist = False retries = 0 while not file_exist and retries < max_retries: -- GitLab From cde7b2086ffc249b0ca0640579ac7fc74956f153 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 9 Oct 2023 10:57:22 +0200 Subject: [PATCH 6/8] changed try: except for suppress --- autosubmit/platforms/paramiko_platform.py | 42 ++++++----------------- 1 file changed, 11 insertions(+), 31 deletions(-) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 51ff49165..a906986b9 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -1302,54 +1302,34 @@ class ParamikoPlatform(Platform): return timedelta(**time_params) def closeConnection(self): - try: + with suppress(Exception): if self._ftpChannel: self._ftpChannel.close() - except: - pass - try: + with suppress(Exception): if self._ssh._agent: # May 
not be in all runs self._ssh._agent.close() - except: - pass - try: + with suppress(Exception): if self._ssh._transport: self._ssh._transport.close() self._ssh._transport.stop_thread() - except: - pass - try: + with suppress(Exception): if self._ssh: self._ssh.close() - except: - pass - try: + with suppress(Exception): if self.transport: self.transport.close() self.transport.stop_thread() - except: - pass - - try: + with suppress(Exception): del self._ssh._agent # May not be in all runs - except: - pass - try: + with suppress(Exception): del self._ssh._transport - except Exception as e: - pass - try: + with suppress(Exception): del self._ftpChannel - except: - pass - try: + with suppress(Exception): del self.transport - except: - pass - try: + with suppress(Exception): del self._ssh - except: - pass + def check_tmp_exists(self): try: if self.send_command("ls {0}".format(self.temp_dir)): -- GitLab From 1529b8036612db665243bfaad9342e1387f90eea Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 9 Oct 2023 10:58:22 +0200 Subject: [PATCH 7/8] added comment --- autosubmit/platforms/paramiko_platform.py | 1 + 1 file changed, 1 insertion(+) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index a906986b9..916c95698 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -1302,6 +1302,7 @@ class ParamikoPlatform(Platform): return timedelta(**time_params) def closeConnection(self): + # Ensure to delete all references to the ssh connection, so that it frees all the file descriptors with suppress(Exception): if self._ftpChannel: self._ftpChannel.close() -- GitLab From 21357e812f9d56772a7153cb371e8e5a1f22c88d Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 9 Oct 2023 11:08:08 +0200 Subject: [PATCH 8/8] added another suppress --- autosubmit/job/job.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py 
index ba011ef55..75ba6ddcb 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -22,7 +22,7 @@ Main module for Autosubmit. Only contains an interface class to all functionalit """ from collections import OrderedDict - +from contextlib import suppress import copy import datetime import json @@ -1081,24 +1081,18 @@ class Job(object): except BaseException as e: Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format( str(e), self.name)) - try: + with suppress(Exception): platform.closeConnection() - except: - pass except AutosubmitError as e: Log.printlog("Trace {0} \nFailed to retrieve log file for job {1}".format( e.message, self.name), 6001) - try: + with suppress(Exception): platform.closeConnection() - except BaseException as e: - pass except AutosubmitCritical as e: # Critical errors can't be recovered. Failed configuration or autosubmit error Log.printlog("Trace {0} \nFailed to retrieve log file for job {0}".format( e.message, self.name), 6001) - try: + with suppress(Exception): platform.closeConnection() - except Exception as e: - pass return def parse_time(self,wallclock): -- GitLab