diff --git a/VERSION b/VERSION index 7dd09a1bf745b4e8203c6db3a16da25d08061cc5..e176f520809fc0bc0051684a2c83a39fb556960e 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -4.0.96 +4.0.97 diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 75f6c130ae47dd0ef0eef57ad4634f56dbd31d58..2d245ebed934322eaf534cf3d69981c30e4c9f87 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -73,7 +73,7 @@ import re import random import signal import datetime - +import log.fd_show as fd_show import portalocker from pkg_resources import require, resource_listdir, resource_string, resource_filename from collections import defaultdict @@ -2107,6 +2107,7 @@ class Autosubmit: job_list.update_list(as_conf, submitter=submitter) job_list.save() # Submit jobs that are ready to run + #Log.debug(f"FD submit: {fd_show.fd_table_status_str()}") if len(job_list.get_ready()) > 0: Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence, hold=False) job_list.update_list(as_conf, submitter=submitter) @@ -2136,6 +2137,8 @@ class Autosubmit: if Autosubmit.exit: job_list.save() time.sleep(safetysleeptime) + #Log.debug(f"FD endsubmit: {fd_show.fd_table_status_str()}") + except AutosubmitError as e: # If an error is detected, restore all connections and job_list Log.error("Trace: {0}", e.trace) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index f92c438ee0192b312f9f3cf1e7d8347ed568ebb6..75ba6ddcbe3b24f94cc89c44a5f1a6b09bfc82bf 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -22,7 +22,7 @@ Main module for Autosubmit. Only contains an interface class to all functionalit """ from collections import OrderedDict - +from contextlib import suppress import copy import datetime import json @@ -939,7 +939,6 @@ class Job(object): max_retrials = self.retrials max_logs = 0 last_log = 0 - sleep(5) stat_file = self.script_name[:-4] + "_STAT_" lang = locale.getlocale()[1] if lang is None: @@ -951,7 +950,7 @@ class Job(object): success = False error_message = "" platform = None - while (count < retries) or not success: + while (count < retries) and not success: try: as_conf = AutosubmitConfig(expid, BasicConfig, YAMLParserFactory()) as_conf.reload(force_load=True) @@ -965,7 +964,7 @@ class Job(object): success = True except BaseException as e: error_message = str(e) - sleep(60 * 5) + sleep(5) pass count = count + 1 if not success: @@ -996,24 +995,22 @@ class Job(object): out_exist = False err_exist = False retries = 3 - sleeptime = 0 i = 0 try: while (not out_exist and not err_exist) and i < retries: try: out_exist = platform.check_file_exists( - remote_logs[0], False) + remote_logs[0], False, sleeptime=0, max_retries=1) except IOError as e: out_exist = False try: err_exist = platform.check_file_exists( - remote_logs[1], False) + remote_logs[1], False, sleeptime=0, max_retries=1) except IOError as e: err_exist = False if not out_exist or not err_exist: - sleeptime = sleeptime + 5 i = i + 1 - sleep(sleeptime) + sleep(5) try: platform.restore_connection() except BaseException as e: @@ -1084,27 +1081,20 @@ class Job(object): except BaseException as e: Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format( str(e), self.name)) + with suppress(Exception): + platform.closeConnection() except AutosubmitError as e: Log.printlog("Trace {0} \nFailed to retrieve log file for job {1}".format( e.message, self.name), 6001) - try: + with suppress(Exception): platform.closeConnection() - except BaseException as e: - pass - return except AutosubmitCritical as e: # Critical errors can't be recovered. Failed configuration or autosubmit error Log.printlog("Trace {0} \nFailed to retrieve log file for job {0}".format( e.message, self.name), 6001) - try: + with suppress(Exception): platform.closeConnection() - except Exception as e: - pass - return - try: - platform.closeConnection() - except BaseException as e: - pass return + def parse_time(self,wallclock): regex = re.compile(r'(((?P\d+):)((?P\d+)))(:(?P\d+))?') parts = regex.match(wallclock) diff --git a/autosubmit/platforms/locplatform.py b/autosubmit/platforms/locplatform.py index 0d5e097a4a00ec4bbfcc4cf06547be52e204dfe4..7f41060eb80398eddce42dca098ca6260a49fa5b 100644 --- a/autosubmit/platforms/locplatform.py +++ b/autosubmit/platforms/locplatform.py @@ -175,7 +175,7 @@ class LocalPlatform(ParamikoPlatform): return True # Moves .err .out - def check_file_exists(self, src,wrapper_failed=False): + def check_file_exists(self, src, wrapper_failed=False, sleeptime=5, max_retries=3): """ Moves a file on the platform :param src: source name @@ -185,10 +185,8 @@ class LocalPlatform(ParamikoPlatform): """ file_exist = False - sleeptime = 5 remote_path = os.path.join(self.get_files_path(), src) retries = 0 - max_retries = 3 while not file_exist and retries < max_retries: try: file_exist = os.path.isfile(os.path.join(self.get_files_path(),src)) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 0fda2f42cf65db29883d8eec8ddcc5cc1c5ff093..916c95698db4c3af6389d159badd875850cc88ae 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -1302,18 +1302,34 @@ class ParamikoPlatform(Platform): return timedelta(**time_params) def closeConnection(self): - if self._ftpChannel is not None and len(str(self._ftpChannel)) > 0: - self._ftpChannel.close() - if self._ssh is not None and len(str(self._ssh)) > 0: - self._ssh.close() - self.transport.close() - self.transport.stop_thread() - try: - del self._ssh - del self._ftpChannel - del self.transport - except Exception as e: - pass + # Ensure to delete all references to the ssh connection, so that it frees all the file descriptors + with suppress(Exception): + if self._ftpChannel: + self._ftpChannel.close() + with suppress(Exception): + if self._ssh._agent: # May not be in all runs + self._ssh._agent.close() + with suppress(Exception): + if self._ssh._transport: + self._ssh._transport.close() + self._ssh._transport.stop_thread() + with suppress(Exception): + if self._ssh: + self._ssh.close() + with suppress(Exception): + if self.transport: + self.transport.close() + self.transport.stop_thread() + with suppress(Exception): + del self._ssh._agent # May not be in all runs + with suppress(Exception): + del self._ssh._transport + with suppress(Exception): + del self._ftpChannel + with suppress(Exception): + del self.transport + with suppress(Exception): + del self._ssh def check_tmp_exists(self): try: diff --git a/autosubmit/platforms/pjmplatform.py b/autosubmit/platforms/pjmplatform.py index 52ae05131c9e5b539d0354b17d1860ae8b81940c..36b03d799a18e6cedd712a706b1e31becc583340 100644 --- a/autosubmit/platforms/pjmplatform.py +++ b/autosubmit/platforms/pjmplatform.py @@ -463,11 +463,9 @@ class PJMPlatform(ParamikoPlatform): def allocated_nodes(): return """os.system("scontrol show hostnames $SLURM_JOB_NODELIST > node_list_{0}".format(node_id))""" - def check_file_exists(self, filename,wrapper_failed=False): + def check_file_exists(self, filename, wrapper_failed=False, sleeptime=5, max_retries=3): file_exist = False - sleeptime = 5 retries = 0 - max_retries = 3 while not file_exist and retries < max_retries: try: # This return IOError if path doesn't exist diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 50a68bd3e002be4373b113e1a7b840325671f32a..95fea2bcda04e3993c69f3936503dc22b5f3431e 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -624,7 +624,7 @@ class Platform(object): if self.check_file_exists(filename): self.delete_file(filename) - def check_file_exists(self, src, wrapper_failed=False): + def check_file_exists(self, src, wrapper_failed=False, sleeptime=5, max_retries=3): return True def get_stat_file(self, job_name, retries=0): @@ -820,3 +820,4 @@ class Platform(object): Sends a Submit file Script, execute it in the platform and retrieves the Jobs_ID of all jobs at once. """ raise NotImplementedError + diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index 00c7293f66f77bb4bd48812ec14dd9f16019beb2..acfaaf7baad1fccec44a06ce1f1a2965917fd15b 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -606,11 +606,9 @@ class SlurmPlatform(ParamikoPlatform): def allocated_nodes(): return """os.system("scontrol show hostnames $SLURM_JOB_NODELIST > node_list_{0}".format(node_id))""" - def check_file_exists(self, filename,wrapper_failed=False): + def check_file_exists(self, filename, wrapper_failed=False, sleeptime=5, max_retries=3): file_exist = False - sleeptime = 5 retries = 0 - max_retries = 3 while not file_exist and retries < max_retries: try: # This return IOError if path doesn't exist