diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py
index e7f3613ef3f4b089a4270aaaba6df2e828af82b0..138fe5b60b35a2b451cafb3c2cd48c6e6395933d 100644
--- a/autosubmit/config/config_common.py
+++ b/autosubmit/config/config_common.py
@@ -1306,7 +1306,19 @@ class AutosubmitConfig(object):
         :rtype: str
         """
         return self._conf_parser.get_option('config', 'MAX_WALLCLOCK', '')
-
+
+    def get_disable_recovery_threads(self, section):
+        """
+        Returns whether log recovery threads are disabled for a platform
+
+        :param section: platform section name in the platforms configuration
+        :type section: str
+        :return: lowercased DISABLE_RECOVERY_THREADS option (requested with default 'FALSE')
+        :rtype: str
+        """
+        return self._platforms_parser.get_option(
+            section, 'DISABLE_RECOVERY_THREADS', 'FALSE').lower()
+
     def get_max_processors(self):
         """
         Returns max processors from autosubmit's config file
diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py
index 604784dbb8e01d1d50daeba2deb887532b6e31f0..67e524b333b7ef0820483b5ed9b26c9f8fb0a6d2 100644
--- a/autosubmit/job/job.py
+++ b/autosubmit/job/job.py
@@ -515,25 +515,75 @@ class Job(object):
             retrials_list.insert(0, retrial_dates)
         return retrials_list
 
+    def retrieve_logfiles_unthreaded(self, copy_remote_logs, local_logs):
+        """Check for and fetch this job's .out/.err logs in the calling thread (not @threaded)."""
+        remote_logs = (self.script_name + ".out", self.script_name + ".err")
+        out_exist = False
+        err_exist = False
+        retries = 3
+        sleeptime = 0
+        i = 0
+        try:
+            while (not out_exist and not err_exist) and i < retries:
+                try:
+                    out_exist = self._platform.check_file_exists(
+                        remote_logs[0], True)
+                except IOError as e:
+                    out_exist = False
+                try:
+                    err_exist = self._platform.check_file_exists(
+                        remote_logs[1], True)
+                except IOError as e:
+                    err_exist = False
+                if not out_exist or not err_exist:
+                    sleeptime = sleeptime + 5
+                    i = i + 1
+                    sleep(sleeptime)
+            if i >= retries:
+                if not out_exist or not err_exist:
+                    Log.printlog("Failed to retrieve log files {1} and {2} e=6001".format(
+                        retries, remote_logs[0], remote_logs[1]))
+                    return
+            if copy_remote_logs:
+                # unifying names for log files
+                if remote_logs != local_logs:
+                    self.synchronize_logs(self._platform, remote_logs, local_logs)
+                    remote_logs = copy.deepcopy(local_logs)
+                self._platform.get_logs_files(self.expid, remote_logs)
+                # Update the logs with Autosubmit Job Id Brand
+                try:
+                    for local_log in local_logs:
+                        self._platform.write_jobid(self.id, os.path.join(
+                            self._tmp_path, 'LOG_' + str(self.expid), local_log))
+                except BaseException as e:
+                    Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format(
+                        e.message, self.name))
+
+        except AutosubmitError as e:
+            Log.printlog("Trace {0} \nFailed to retrieve log file for job {1}".format(
+                e.message, self.name), 6001)
+
+        except AutosubmitCritical as e:  # Critical errors can't be recovered. Failed configuration or autosubmit error
+            Log.printlog("Trace {0} \nFailed to retrieve log file for job {1}".format(
+                e.message, self.name), 6001)
+        return
+
     @threaded
     def retrieve_logfiles(self, copy_remote_logs, local_logs, remote_logs, expid, platform_name):
-        remote_logs = (self.script_name + ".out", self.script_name + ".err")
-        as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory())
-        as_conf.reload()
-
-        submitter = self._get_submitter(as_conf)
-        submitter.load_platforms(as_conf)
-        hpcarch = as_conf.get_platform()
-        platforms_to_test = set()
-        if self.platform_name is None:
-            self.platform_name = hpcarch
-        # serial
-        self._platform = submitter.platforms[self.platform_name.lower()]
         try:
-            self._platform.restore_connection()
+            as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory())
+            as_conf.reload()
+            remote_logs = (self.script_name + ".out", self.script_name + ".err")
+            submitter = self._get_submitter(as_conf)
+            submitter.load_platforms(as_conf)
+            self._platform = submitter.platforms[platform_name.lower()]
+            try:
+                self._platform.test_connection()
+            except Exception:  # best effort: do not swallow KeyboardInterrupt/SystemExit
+                pass
         except Exception as e:
             Log.printlog(
-                "{0} \n Couldn't connect to the remote platform for this {1} job err/out files. ".format(str(e), self.name), 6001)
+                "{0} \n Couldn't connect to the remote platform for this {1} job err/out files. ".format(e.message, self.name), 6001)
         out_exist = False
         err_exist = False
         retries = 5
@@ -559,7 +609,7 @@ class Job(object):
                     sleep(sleeptime)
                     try:
                         self._platform.restore_connection()
-                    except:
+                    except BaseException as e:
                         Log.printlog("{0} \n Couldn't connect to the remote platform for this {1} job err/out files. ".format(
                             e.message, self.name), 6001)
             if i >= retries:
@@ -584,7 +634,7 @@ class Job(object):
                         e.message, self.name))
             try:
                 self._platform.closeConnection()
-            except:
+            except BaseException as e:
                 pass
             return
         except AutosubmitError as e:
@@ -592,7 +642,7 @@ class Job(object):
                 e.message, self.name), 6001)
             try:
                 self._platform.closeConnection()
-            except:
+            except BaseException as e:
                 pass
             return
 
@@ -608,7 +658,7 @@ class Job(object):
         sleep(5)  # safe wait before end a thread
         try:
             self._platform.closeConnection()
-        except:
+        except BaseException as e:
             pass
         return
 
@@ -679,8 +729,12 @@ class Job(object):
             platform_name = copy.deepcopy(self.platform_name.lower())
             local_logs = copy.deepcopy(self.local_logs)
             remote_logs = copy.deepcopy(self.remote_logs)
-            self.retrieve_logfiles(
-                copy_remote_logs, local_logs, remote_logs, expid, platform_name)
+            as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory())
+            as_conf.reload()
+            if as_conf.get_disable_recovery_threads(self.platform.name) == "true":
+                self.retrieve_logfiles_unthreaded(copy_remote_logs, local_logs)
+            else:
+                self.retrieve_logfiles(copy_remote_logs, local_logs, remote_logs, expid, platform_name)
 
         return self.status
 
@@ -880,7 +934,8 @@ class Job(object):
         :return: script code
         :rtype: str
         """
-        if self.parameters['PROJECT_TYPE'].lower() != "none":
+        parameters = self.parameters
+        if parameters['PROJECT_TYPE'].lower() != "none":
             template_file = open(os.path.join(
                 as_conf.get_project_dir(), self.file), 'r')
             template = template_file.read()
diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py
index 46cac5e8969531a6950b12239bbcc65bceee79fe..ff99edc21bedcfa7459e9b8b869e8186662074f9 100644
--- a/autosubmit/platforms/paramiko_platform.py
+++ b/autosubmit/platforms/paramiko_platform.py
@@ -77,7 +77,10 @@ class ParamikoPlatform(Platform):
         """
         try:
             self.reset()
-            self.restore_connection()
+            try:
+                self.restore_connection()
+            except Exception:  # best effort: the transport check below still runs
+                pass
             transport = self._ssh.get_transport()
             transport.send_ignore()
         except EOFError as e:
@@ -118,7 +121,7 @@ class ParamikoPlatform(Platform):
             raise
         except Exception as e:
             raise AutosubmitCritical(
-                'Cant connect to this platform due an unknown error', 7050, str(e))
+                'Cant connect to this platform due an unknown error', 7050, e.message)
 
     def connect(self, reconnect=False):
         """
@@ -152,10 +155,10 @@ class ParamikoPlatform(Platform):
                 self._proxy = paramiko.ProxyCommand(
                     self._host_config['proxycommand'])
                 self._ssh.connect(self._host_config['hostname'], 22, username=self.user,
-                                  key_filename=self._host_config_id, sock=self._proxy)
+                                  key_filename=self._host_config_id, sock=self._proxy, timeout=120, banner_timeout=120)
             else:
                 self._ssh.connect(self._host_config['hostname'], 22, username=self.user,
-                                  key_filename=self._host_config_id)
+                                  key_filename=self._host_config_id, timeout=120, banner_timeout=120)
             self.transport = paramiko.Transport(
                 (self._host_config['hostname'], 22))
             self.transport.connect(username=self.user)
@@ -258,7 +261,6 @@ class ParamikoPlatform(Platform):
             return True
         except Exception as e:
             if str(e) in "Garbage":
-                #raise AutosubmitError("Files couldn't be retrieved, session not active".format(filename),6004,e.message)
                 if not ignore_log:
                     Log.printlog(
                         "File {0} seems to no exists (skipping)".format(filename), 5004)
@@ -288,7 +290,6 @@ class ParamikoPlatform(Platform):
                 self.get_files_path(), filename))
             return True
         except IOError as e:
-            #Log.printlog("{0} couldn't be retrieved, session not active".format(os.path.join(self.get_files_path(), filename)),6004)
             return False
         except BaseException as e:
             Log.error('Could not remove file {0} due a wrong configuration'.format(
@@ -325,10 +326,10 @@ class ParamikoPlatform(Platform):
         except Exception as e:
             if str(e) in "Garbage":
                 raise AutosubmitError('File {0} does not exists'.format(
-                    os.path.join(self.get_files_path(), src)), 6004, str(e))
+                    os.path.join(self.get_files_path(), src)), 6004, e.message)
             if must_exist:
                 raise AutosubmitError("A critical file couldn't be retrieved, File {0} does not exists".format(
-                    os.path.join(self.get_files_path(), src)), 6004, str(e))
+                    os.path.join(self.get_files_path(), src)), 6004, e.message)
             else:
                 Log.printlog("Log file couldn't be moved: {0}".format(
                     os.path.join(self.get_files_path(), src)), 5001)
@@ -460,7 +461,8 @@ class ParamikoPlatform(Platform):
         :return: current job status
         :rtype: autosubmit.job.job_common.Status
         """
-
+        if job_list_cmd.endswith(","):
+            job_list_cmd = job_list_cmd[:-1]
         cmd = self.get_checkAlljobs_cmd(job_list_cmd)
 
         sleep_time = 5
@@ -857,7 +859,7 @@ class ParamikoPlatform(Platform):
         try:
             # Test if remote_path exists
             self._ftpChannel.chdir(self.remote_log_dir)
-        except IOError:
+        except IOError as e:
             try:
                 if self.send_command(self.get_mkdir_cmd()):
                     Log.debug('{0} has been created on {1} .',
diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py
index c2595b9e3edfd2a333d68bc3a81eca531841fcbd..9402a1498c28bb26be50da46b0822b00b58b63eb 100644
--- a/autosubmit/platforms/slurmplatform.py
+++ b/autosubmit/platforms/slurmplatform.py
@@ -315,7 +315,7 @@ class SlurmPlatform(ParamikoPlatform):
             status = [x.split()[1] for x in output.splitlines()
                       if x.split()[0] == str(job_id)]
         except BaseException as e:
-            return status
+            pass
         if len(status) == 0:
             return status
         return status[0]
@@ -347,16 +347,16 @@ class SlurmPlatform(ParamikoPlatform):
                 self._submit_hold_cmd + job_script + "\n")
 
     def get_checkjob_cmd(self, job_id):
-        return 'sacct -n -X -j {1} -o "State"'.format(self.host, job_id)
+        return 'sacct -n -X --jobs {1} -o "State"'.format(self.host, job_id)
 
     def get_checkAlljobs_cmd(self, jobs_id):
-        return "sacct -n -X -j {1} -o jobid,State".format(self.host, jobs_id)
+        return "sacct -n -X --jobs {1} -o jobid,State".format(self.host, jobs_id)
 
     def get_queue_status_cmd(self, job_id):
         return 'squeue -j {0} -o %A,%R'.format(job_id)
 
     def get_job_energy_cmd(self, job_id):
-        return 'sacct -n -j {0} -o JobId%25,State,NCPUS,NNodes,Submit,Start,End,ConsumedEnergy,MaxRSS%25,AveRSS%25'.format(job_id)
+        return 'sacct -n --jobs {0} -o JobId%25,State,NCPUS,NNodes,Submit,Start,End,ConsumedEnergy,MaxRSS%25,AveRSS%25'.format(job_id)
 
     def parse_queue_reason(self, output, job_id):
         reason = [x.split(',')[1] for x in output.splitlines()
diff --git a/docs/source/troubleshoot.rst b/docs/source/troubleshoot.rst
index ada93ce2b3e697562a56658024f3791f447f5c66..d64d677a53caae126b394549cf2da4767620c6b7 100644
--- a/docs/source/troubleshoot.rst
+++ b/docs/source/troubleshoot.rst
@@ -19,6 +19,12 @@ My project parameters are not being substituted in the templates.
 
 *Solution*: Don't repeat section names and parameters names until Autosubmit 4.0 release.
 
+Unable to recover remote log files.
+========================================================
+
+*Explanation*: If there are limitations on the remote platform regarding multiple connections, the threaded recovery of the remote log files can fail.
+*Solution*: You can try DISABLE_RECOVERY_THREADS = TRUE under the [platform_name] section in the platform.conf.
+
 Other possible errors
 =====================
 