Newer
Older
import re
from datetime import timedelta
from log.log import AutosubmitError, AutosubmitCritical, Log
from paramiko.ssh_exception import (SSHException)
def threaded(fn):
    """Decorator that executes ``fn`` asynchronously in a background thread.

    The wrapped callable starts a ``Thread`` named after the first
    positional argument's ``name`` attribute (the platform name, suffixed
    with ``_X11``) and returns the started thread so callers may join it.

    :param fn: callable to run in its own thread
    :return: wrapper that starts the thread and returns the ``Thread`` object
    """
    from functools import wraps  # local import: module import block stays untouched

    @wraps(fn)  # bug fix: preserve fn's __name__/__doc__ for logging/debugging
    def wrapper(*args, **kwargs):
        thread = Thread(target=fn, args=args, kwargs=kwargs, name=f"{args[0].name}_X11")
        thread.start()
        return thread
    return wrapper
# noinspection PyMethodParameters
class ParamikoPlatform(Platform):
"""
Class to manage the connections to the different platforms with the Paramiko library.
"""
"""
:param config:
:param expid:
:param name:
"""
Platform.__init__(self, expid, name, config, auth_password=auth_password)
self._user_config_file = None
self._host_config = None
self._host_config_id = None
dbeltran
committed
self._ftpChannel = None
if sys.platform != "linux":
self.poller = select.kqueue()
else:
self.poller = select.poll()
self._header = None
self._wrapper = None
self.remote_log_dir = ""
#self.get_job_energy_cmd = ""
display = os.getenv('DISPLAY')
if display is None:
display = "localhost:0"
self.local_x11_display = xlib_connect.get_display(display)
Header to add to job for scheduler configuration
:return: header
:rtype: object
"""
return self._header
Joan Lopez
committed
@property
def wrapper(self):
    """Wrapper handler attached to this platform.

    Read-only accessor for the handler object stored during setup.

    :return: the wrapper handler held in ``self._wrapper``
    :rtype: object
    """
    return self._wrapper
def reset(self):
# Drop every live connection handle so restore_connection() can rebuild
# the SSH/SFTP session from scratch.
self.connected = False
self._ssh = None
self._ssh_config = None
self._ssh_output = None
self._user_config_file = None
self._host_config = None
self._host_config_id = None
self._ftpChannel = None
self.transport = None
# Platform-specific readiness poller: kqueue on BSD/macOS, poll on Linux
# (mirrors the choice made in the constructor).
if sys.platform != "linux":
self.poller = select.kqueue()
else:
self.poller = select.poll()
# X11 forwarding needs a local display; default to localhost:0 when the
# environment does not provide one.
display = os.getenv('DISPLAY')
if display is None:
display = "localhost:0"
self.local_x11_display = xlib_connect.get_display(display)
# Stop the background log-retrieval machinery bound to the old connection.
self.log_retrieval_process_active = False
terminate_child_process(self.expid, self.name)
def test_connection(self,as_conf):
"""
Test if the connection is still alive, reconnect if not.
"""
dbeltran
committed
try:
if not self.connected:
self.reset()
try:
self.restore_connection(as_conf)
dbeltran
committed
message = "OK"
except BaseException as e:
message = str(e)
if message.find("t accept remote connections") == -1:
try:
transport = self._ssh.get_transport()
transport.send_ignore()
except:
message = "Timeout connection"
dbeltran
committed
return message
dbeltran
committed
self.connected = False
raise AutosubmitError("[{0}] not alive. Host: {1}".format(
self.name, self.host), 6002, str(e))
dbeltran
committed
self.connected = False
dbeltran
committed
self.connected = False
#raise AutosubmitError("[{0}] connection failed for host: {1}".format(self.name, self.host), 6002, e.message)
def restore_connection(self, as_conf):
try:
Log.printlog("Connection Failed to {0}, will test another host".format(
self.host.split(',')[0]), 6002)
raise AutosubmitCritical(
"First connection to {0} is failed, check host configuration or try another login node ".format(self.host), 7050,str(e))
self.connect(as_conf,True)
except Exception as e:
trace = 'Can not create ssh or sftp connection to {0}: Connection could not be established to platform {1}\n Please, check your expid platform.conf to see if there are mistakes in the configuration\n Also Ensure that the login node listed on HOST parameter is available(try to connect via ssh on a terminal)\n Also you can put more than one host using a comma as separator'.format(
self.host, self.name)
raise AutosubmitCritical(
'Experiment cant no continue without unexpected behaviour, Stopping Autosubmit', 7050, trace)
raise
except SSHException as e:
raise
raise AutosubmitCritical(
'Cant connect to this platform due an unknown error', 7050, str(e))
Attempt to authenticate to the given SSH server using the most common authentication methods available. This will always try to use the SSH agent first, and will fall back to using the others methods if that fails.
:parameter port: port to connect
:return: True if authentication was successful, False otherwise
self._ssh._agent = Agent()
for key in self._ssh._agent.get_keys():
if not hasattr(key,"public_blob"):
key.public_blob = None
self._ssh.connect(self._host_config['hostname'], port=port, username=self.user, timeout=60, banner_timeout=60)
Log.debug(f'Failed to authenticate with ssh-agent due to {e}')
Log.debug('Trying to authenticate with other methods')
def interactive_auth_handler(self, title, instructions, prompt_list):
# Walk the list of prompts that the server sent that we need to answer
for prompt_, _ in prompt_list:
prompt = str(prompt_).strip().lower()
# str() used to make sure that we're dealing with a string rather than a unicode string
elif "token" in prompt or "2fa" in prompt or "otp" in prompt:
if self.two_factor_method == "push":
answers.append("")
elif self.two_factor_method == "token":
# Sometimes the server may ask for the 2FA code more than once this is to avoid asking the user again
# If it is wrong, just run again autosubmit run because the issue could be in the password step
if twofactor_nonpush is None:
twofactor_nonpush = input("Please type the 2FA/OTP/token code: ")
answers.append(twofactor_nonpush)
# This is done from the server
# if self.two_factor_method == "push":
# try:
# inputimeout(prompt='Press enter to complete the 2FA PUSH authentication', timeout=self.otp_timeout)
# except:
# pass
def connect(self, as_conf, reconnect=False):
"""
Creates ssh connection to host
:return: True if connection is created, False otherwise
:rtype: bool
"""
display = os.getenv('DISPLAY')
if display is None:
display = "localhost:0"
self.local_x11_display = xlib_connect.get_display(display)
self._ssh = paramiko.SSHClient()
self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self._ssh_config = paramiko.SSHConfig()
self._user_config_file = os.path.expanduser("~/.ssh/config")
if os.path.exists(self._user_config_file):
with open(self._user_config_file) as f:
self._ssh_config.parse(f)
self._host_config = self._ssh_config.lookup(self.host)
self._host_config['hostname'] = random.choice(
self._host_config['hostname'].split(',')[1:])
self._host_config['hostname'] = self._host_config['hostname'].split(',')[0]
if 'identityfile' in self._host_config:
self._host_config_id = self._host_config['identityfile']
# Agent Auth
if not self.agent_auth(port):
# Public Key Auth
if 'proxycommand' in self._host_config:
self._proxy = paramiko.ProxyCommand(self._host_config['proxycommand'])
try:
self._ssh.connect(self._host_config['hostname'], port, username=self.user,
key_filename=self._host_config_id, sock=self._proxy, timeout=60 , banner_timeout=60)
except Exception as e:
self._ssh.connect(self._host_config['hostname'], port, username=self.user,
key_filename=self._host_config_id, sock=self._proxy, timeout=60,
banner_timeout=60, disabled_algorithms={'pubkeys': ['rsa-sha2-256', 'rsa-sha2-512']})
else:
try:
self._ssh.connect(self._host_config['hostname'], port, username=self.user,
key_filename=self._host_config_id, timeout=60 , banner_timeout=60)
except Exception as e:
self._ssh.connect(self._host_config['hostname'], port, username=self.user,
key_filename=self._host_config_id, timeout=60 , banner_timeout=60,disabled_algorithms={'pubkeys': ['rsa-sha2-256', 'rsa-sha2-512']})
self.transport = self._ssh.get_transport()
self.transport.banner_timeout = 60
else:
Log.warning("2FA is enabled, this is an experimental feature and it may not work as expected")
Log.warning("nohup can't be used as the password will be asked")
Log.warning("If you are using a token, please type the token code when asked")
if self.pw is None:
self.pw = getpass.getpass("Password for {0}: ".format(self.name))
if self.two_factor_method == "push":
Log.warning("Please check your phone to complete the 2FA PUSH authentication")
self.transport = paramiko.Transport((self._host_config['hostname'], port))
self.transport.start_client()
try:
self.transport.auth_interactive(self.user, self.interactive_auth_handler)
except Exception as e:
Log.printlog("2FA authentication failed",7000)
if self.transport.is_authenticated():
self._ssh._transport = self.transport
self.transport.banner_timeout = 60
else:
self.transport.close()
raise SSHException
self._ftpChannel = paramiko.SFTPClient.from_transport(self.transport,window_size=pow(4, 12) ,max_packet_size=pow(4, 12) )
dbeltran
committed
self._ftpChannel.get_channel().settimeout(120)
if not self.log_retrieval_process_active and (as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS', "false")).lower() == "false"):
if as_conf and as_conf.misc_data.get("AS_COMMAND", "").lower() == "run":
raise SSHException(" {0} doesn't accept remote connections. Check if there is an typo in the hostname".format(self.host))
elif "name or service not known" in str(e.strerror).lower():
raise SSHException(" {0} doesn't accept remote connections. Check if there is an typo in the hostname".format(self.host))
else:
raise AutosubmitError("File can't be located due an slow or timeout connection", 6016, str(e))
raise AutosubmitCritical("Authentication Failed, please check the platform.conf of {0}".format(
self.restore_connection(as_conf)
raise AutosubmitError(
"Couldn't establish a connection to the specified host, wrong configuration?", 6003, str(e))
lbatista
committed
def check_completed_files(self, sections=None):
if self.host == 'localhost':
return None
command = "find %s " % self.remote_log_dir
lbatista
committed
if sections:
for i, section in enumerate(sections.split()):
command += " -name *%s_COMPLETED" % section
if i < len(sections.split()) - 1:
lbatista
committed
command += " -o "
else:
command += " -name *_COMPLETED"
if self.send_command(command, True):
return self._ssh_output
else:
return None
def remove_multiple_files(self, filenames):
    """Delete several remote files in a single remote command.

    Writes a small ``rm -f`` shell script locally, ships it to the remote
    platform via :meth:`send_file`, then executes it there.

    :param filenames: space-separated file names to delete (leading space expected)
    :type filenames: str
    :return: command output on success, empty string otherwise
    :rtype: str
    """
    log_dir = os.path.join(self.tmp_path, 'LOG_{0}'.format(self.expid))
    multiple_delete_previous_run = os.path.join(
        log_dir, "multiple_delete_previous_run.sh")
    # Resolve an encoding for the script body; fall back to UTF-8 when the
    # locale reports nothing usable.
    lang = locale.getlocale()[1]
    if lang is None:
        lang = locale.getdefaultlocale()[1]
    if lang is None:
        lang = 'UTF-8'
    # Bug fix: use a context manager so the file handle is always closed
    # (the original leaked the handle from a bare open().write()).
    with open(multiple_delete_previous_run, 'wb+') as script:
        script.write(("rm -f" + filenames).encode(lang))
    os.chmod(multiple_delete_previous_run, 0o770)
    self.send_file(multiple_delete_previous_run, False)
    command = os.path.join(self.get_files_path(),
                           "multiple_delete_previous_run.sh")
    if self.send_command(command, ignore_log=True):
        return self._ssh_output
    # Bug fix: removed the duplicated, unreachable trailing `return ""`.
    return ""
def send_file(self, filename, check=True):
:param filename: name of the file to send
:type filename: str
"""
if check:
self.check_remote_log_dir()
self.delete_file(filename)
local_path = os.path.join(os.path.join(self.tmp_path, filename))
remote_path = os.path.join(
self.get_files_path(), os.path.basename(filename))
self._ftpChannel.chmod(remote_path, os.stat(local_path).st_mode)
raise AutosubmitError('Can not send file {0} to {1}'.format(os.path.join(
self.tmp_path, filename), os.path.join(self.get_files_path(), filename)), 6004, str(e))
dbeltran
committed
except BaseException as e:
raise AutosubmitError(
'Send file failed. Connection seems to no be active', 6004)
# Fetches the remote files listing via the SFTP channel.
def get_list_of_files(self):
# NOTE(review): `self.get_files_path` is passed *uncalled* (a bound method,
# not the path string), and paramiko's SFTPClient.get() also expects a
# local destination path — this looks broken as written; confirm intent
# before relying on it.
return self._ftpChannel.get(self.get_files_path)
# Gets .err and .out
def get_file(self, filename, must_exist=True, relative_path='', ignore_log=False, wrapper_failed=False):
"""
Copies a file from the current platform to experiment's tmp folder
:param wrapper_failed:
:param ignore_log:
:param filename: file name
:type filename: str
:param must_exist: If True, raises an exception if file can not be copied
:type must_exist: bool
:param relative_path: path inside the tmp folder
:type relative_path: str
:return: True if file is copied successfully, false otherwise
local_path = os.path.join(self.tmp_path, relative_path)
if not os.path.exists(local_path):
os.makedirs(local_path)
file_path = os.path.join(local_path, filename)
if os.path.exists(file_path):
os.remove(file_path)
remote_path = os.path.join(self.get_files_path(), filename)
self._ftpChannel.get(remote_path, file_path)
if not ignore_log:
Log.printlog(
"File {0} seems to no exists (skipping)".format(filename), 5004)
if not ignore_log:
Log.printlog(
"File {0} does not exists".format(filename), 6004)
else:
if not ignore_log:
Log.printlog(
"Log file couldn't be retrieved: {0}".format(filename), 5000)
return False
def delete_file(self, filename):
"""
Deletes a file from this platform
:param filename: file name
:type filename: str
:return: True if successful or file does not exist
self._ftpChannel.remove(os.path.join(
self.get_files_path(), filename))
Log.error('Could not remove file {0} due a wrong configuration'.format(
os.path.join(self.get_files_path(), filename)))
raise AutosubmitCritical(
"Wrong User or invalid .ssh/config. Or invalid user in platform.conf or public key not set ", 7051, str(e))
def move_file(self, src, dest, must_exist=False):
"""
Moves a file on the platform (includes .err and .out)
:param src: source name
:type src: str
:param dest: destination name
:param must_exist: ignore if file exist or not
:type dest: str
"""
src = os.path.join(path_root, src)
dest = os.path.join(path_root, dest)
try:
self._ftpChannel.stat(dest)
except IOError:
self._ftpChannel.rename(src,dest)
raise AutosubmitError('File {0} does not exists, something went wrong with the platform'.format(os.path.join(path_root,src)), 6004, str(e))
raise AutosubmitError("File {0} does not exists".format(
os.path.join(path_root,src)), 6004, str(e))
else:
Log.debug("File {0} doesn't exists ".format(path_root))
return False
except Exception as e:
raise AutosubmitError('File {0} does not exists'.format(
if must_exist:
raise AutosubmitError("File {0} does not exists".format(
os.path.join(self.get_files_path(), src)), 6004, str(e))
else:
Log.printlog("Log file couldn't be moved: {0}".format(
os.path.join(self.get_files_path(), src)), 5001)
return False
def submit_job(self, job, script_name, hold=False, export="none"):
Domingo Manubens-Gil
committed
Submit a job from a given job object.
:param job: job object
:type job: autosubmit.job.job.Job
:param script_name: job script's name
dbeltran
committed
:param hold: send job hold
:type hold: boolean
Domingo Manubens-Gil
committed
:return: job id for the submitted job
:rtype: int
cmd = self.get_submit_cmd(script_name, job, hold=hold, export=export)
Log.debug(f"Submitting job with the command: {cmd}")
if self.send_command(cmd,x11=x11):
job_id = self.get_submitted_job_id(self.get_ssh_output(),x11=job.x11)
Log.debug("Job ID: {0}", job_id)
return int(job_id)
wuruchi
committed
wuruchi
committed
"""
Checks job energy and return values. Defined in child classes.
Args:
job_id (int): ID of Job
wuruchi
committed
Returns:
4-tuple (int, int, int, int): submit time, start time, finish time, energy
"""
check_energy_cmd = self.get_job_energy_cmd(job_id)
self.send_command(check_energy_cmd)
wuruchi
committed
dbeltran
committed
def submit_Script(self, hold=False):
Sends a Submitfile Script, exec in platform and retrieve the Jobs_ID.
:param hold: send job hold
:type hold: boolean
:return: job id for the submitted job
:rtype: int
def get_estimated_queue_time_cmd(self, job_id):
    """Build the scheduler command reporting a job's estimated queue time.

    Abstract stub: each platform subclass supplies its own command.

    :param job_id: id of the job to check
    :return: command to get estimated queue time
    :raises NotImplementedError: always, on the base class
    """
    raise NotImplementedError
def parse_estimated_time(self, output):
    """Extract the estimated queue time from a scheduler query's output.

    Abstract stub: each platform subclass parses its own command output
    (the string produced by ``get_estimated_queue_time_cmd``).

    :param output: raw output of ``get_estimated_queue_time_cmd``
    :type output: str
    :return: estimated queue time
    :raises NotImplementedError: always, on the base class
    """
    raise NotImplementedError
dbeltran
committed
def check_job(self, job, default_status=Status.COMPLETED, retries=5, submit_hold_check=False, is_wrapper=False):
:param is_wrapper:
:param submit_hold_check:
dbeltran
committed
:type job: autosubmit.job.job.Job
:param default_status: default status if job is not found
:param default_status: status to assign if it can be retrieved from the platform
:type default_status: autosubmit.job.job_common.Status
:return: current job status
:rtype: autosubmit.job.job_common.Status
dbeltran
committed
if type(job_id) is not int and type(job_id) is not str:
Log.error(
'check_job() The job id ({0}) is not an integer neither a string.', job_id)
sleep_time = 5
sleep(2)
self.send_command(self.get_checkjob_cmd(job_id))
while self.get_ssh_output().strip(" ") == "" and retries > 0:
Log.debug(
'Retrying check job command: {0}', self.get_checkjob_cmd(job_id))
Log.debug('retries left {0}', retries)
Log.debug('Will be retrying in {0} seconds', sleep_time)
sleep(sleep_time)
sleep_time = sleep_time + 5
self.send_command(self.get_checkjob_cmd(job_id))
Log.debug('Successful check job command: {0}', self.get_checkjob_cmd(job_id))
job_status = self.parse_job_output(
self.get_ssh_output()).strip("\n")
if job_status in self.job_status['COMPLETED'] or retries == 0:
job_status = Status.COMPLETED
elif job_status in self.job_status['RUNNING']:
job_status = Status.RUNNING
dbeltran
committed
if not is_wrapper:
if job.status != Status.RUNNING:
job.start_time = datetime.datetime.now() # URi: start time
if job.start_time is not None and str(job.wrapper_type).lower() == "none":
wallclock = job.wallclock
dbeltran
committed
if job.wallclock == "00:00" or job.wallclock is None:
wallclock = job.platform.max_wallclock
dbeltran
committed
if wallclock != "00:00" and wallclock != "00:00:00" and wallclock != "":
if job.is_over_wallclock(job.start_time,wallclock):
try:
job.platform.get_completed_files(job.name)
job_status = job.check_completion(over_wallclock=True)
except Exception as e:
dbeltran
committed
job_status = Status.FAILED
elif job_status in self.job_status['QUEUING'] and (not job.hold or job.hold.lower() != "true"):
elif job_status in self.job_status['QUEUING'] and (job.hold or job.hold.lower() == "true"):
job_status = Status.HELD
elif job_status in self.job_status['FAILED']:
job_status = Status.FAILED
else:
job_status = Status.UNKNOWN
else:
Log.error(
" check_job(), job is not on the queue system. Output was: {0}", self.get_checkjob_cmd(job_id))
Log.error(
'check_job() The job id ({0}) status is {1}.', job_id, job_status)
if job_status in [Status.FAILED, Status.COMPLETED]:
job.updated_log = False
if submit_hold_check:
return job_status
else:
job.new_status = job_status
def _check_jobid_in_queue(self, ssh_output, job_list_cmd):
for job in job_list_cmd[:-1].split(','):
if job not in ssh_output:
return False
return True
def parse_joblist(self, job_list):
    """Build the comma-separated job-id string used by the queue query.

    :param job_list: list of ``(job, previous_status)`` tuples
    :type job_list: list
    :return: comma-separated job ids; ``"0"`` stands in for jobs with no id yet
    :rtype: str
    """
    ids = []
    for job, _job_prev_status in job_list:
        # Jobs never submitted have no scheduler id; use "0" as a placeholder.
        ids.append("0" if job.id is None else str(job.id))
    # Bug fix: the original built the string but never returned it (and
    # crashed on an empty job_list when stripping the trailing comma).
    return ",".join(ids)
:param job_list: list of jobs
:type job_list: list
:param job_list_cmd: list of jobs in the queue system
:type job_list_cmd: str
:param remote_logs: remote logs
:type remote_logs: str
:type default_status: bool
:return: current job status
:rtype: autosubmit.job.job_common.Status
job_list_cmd = self.parse_joblist(job_list)
sleep_time = 5
e_msg = e.error_message
slurm_error = True
if not slurm_error:
while not self._check_jobid_in_queue(self.get_ssh_output(), job_list_cmd) and retries > 0:
try:
self.send_command(cmd)
except AutosubmitError as e:
e_msg = e.error_message
slurm_error = True
break
Log.debug('Retrying check job command: {0}', cmd)
Log.debug('retries left {0}', retries)
Log.debug('Will be retrying in {0} seconds', sleep_time)
retries -= 1
sleep(sleep_time)
sleep_time = sleep_time + 5
dbeltran
committed
job_list_status = self.get_ssh_output()
Log.debug('Successful check job command')
in_queue_jobs = []
list_queue_jobid = ""
job_status = self.parse_Alljobs_output(job_list_status, job_id)
while len(job_status) <= 0 and retries >= 0:
retries -= 1
self.send_command(cmd)
job_list_status = self.get_ssh_output()
job_status = self.parse_Alljobs_output(job_list_status, job_id)
if len(job_status) <= 0:
Log.debug('Retrying check job command: {0}', cmd)
Log.debug('retries left {0}', retries)
Log.debug('Will be retrying in {0} seconds', sleep_time)
sleep(sleep_time)
sleep_time = sleep_time + 5
# URi: define status list in HPC Queue Class
else:
job_status = job.status
if job.status != Status.RUNNING:
job.start_time = datetime.datetime.now() # URi: start time
if job.start_time is not None and str(job.wrapper_type).lower() == "none":
wallclock = job.wallclock
if job.wallclock == "00:00":
wallclock = job.platform.max_wallclock
if wallclock != "00:00" and wallclock != "00:00:00" and wallclock != "":
if job.is_over_wallclock(job.start_time,wallclock):
try:
job.platform.get_completed_files(job.name)
job_status = job.check_completion(over_wallclock=True)
if job_status is Status.FAILED:
try:
if self.cancel_cmd is not None:
job.platform.send_command(self.cancel_cmd + " " + str(job.id))
if job_status in self.job_status['COMPLETED']:
job_status = Status.COMPLETED
elif job_status in self.job_status['RUNNING']:
job_status = Status.RUNNING
elif job_status in self.job_status['QUEUING']:
job_status = Status.HELD # release?
else:
job_status = Status.QUEUING
list_queue_jobid += str(job.id) + ','
in_queue_jobs.append(job)
elif job_status in self.job_status['FAILED']:
job_status = Status.FAILED
elif retries == 0:
job_status = Status.COMPLETED
job.update_status(as_conf)
Log.error(
'check_job() The job id ({0}) status is {1}.', job.id, job_status)
job.new_status = job_status
self.get_queue_status(in_queue_jobs,list_queue_jobid,as_conf)
dbeltran
committed
else:
Bruno P. Kinoshita
committed
for job, job_prev_status in job_list:
dbeltran
committed
job_status = Status.UNKNOWN
Log.warning(
'check_job() The job id ({0}) from platform {1} has an status of {2}.', job.id, self.name, job_status)
raise AutosubmitError("Some Jobs are in Unknown status", 6008)
# job.new_status=job_status
def get_jobid_by_jobname(self,job_name,retries=2):
"""
Get the scheduler job id(s) registered under a given job name.

:param job_name: name of the job to look up
:type job_name: str
:param retries: extra query attempts to make while output is empty
:type retries: int
:return: list of job ids matching the name (header and trailing line stripped)
"""
#sleep(5)
cmd = self.get_jobid_by_jobname_cmd(job_name)
self.send_command(cmd)
job_id_name = self.get_ssh_output()
# Retry while the platform returns empty output (slow scheduler response).
while len(job_id_name) <= 0 and retries > 0:
self.send_command(cmd)
job_id_name = self.get_ssh_output()
retries -= 1
sleep(2)
if retries >= 0:
# Drop the header (first line) and the trailing empty line.
job_ids_names = job_id_name.split('\n')[1:-1]
# Keep only the id column (first comma-separated field of each row).
job_ids = [job_id.split(',')[0] for job_id in job_ids_names]
return job_ids
def get_queue_status(self, in_queue_jobs, list_queue_jobid, as_conf):
    """Refresh the status of every job currently sitting in the queue.

    Abstract stub: concrete platforms query the scheduler (e.g. for
    queueing-cancellation reasons or held state) and must update each
    ``job`` in ``in_queue_jobs`` accordingly.

    :raises NotImplementedError: always, on the base class
    """
    raise NotImplementedError
def get_checkjob_cmd(self, job_id):
    """Build the remote command that queries a single job's status.

    Abstract stub: each platform subclass supplies its own command.

    :param job_id: id of the job to check
    :type job_id: int
    :return: command to check job status
    :rtype: str
    :raises NotImplementedError: always, on the base class
    """
    raise NotImplementedError
def get_checkAlljobs_cmd(self, jobs_id):
    """Build the remote command that queries several jobs' statuses at once.

    Abstract stub: each platform subclass supplies its own command.

    :param jobs_id: comma-separated ids of the jobs to check
    :type jobs_id: str
    :return: command to check the jobs' statuses
    :rtype: str
    :raises NotImplementedError: always, on the base class
    """
    raise NotImplementedError
def get_jobid_by_jobname_cmd(self, job_name):
    """Build the remote command that looks up job ids by job name.

    :param job_name: job name to query
    :type job_name: str
    :return: command string (supplied by platform subclasses)
    :rtype: str
    :raises NotImplementedError: always, on the base class
    """
    # Bug fix: the stub previously *returned* the NotImplementedError class
    # instead of raising it, silently handing callers a class object.
    # Raise, matching the sibling stubs (e.g. get_checkjob_cmd).
    raise NotImplementedError
def get_queue_status_cmd(self, job_name):
    """Build the remote command that queries the queue status.

    :param job_name: job name to query
    :return: command string (supplied by platform subclasses)
    :rtype: str
    :raises NotImplementedError: always, on the base class
    """
    # Bug fix: the stub previously *returned* the NotImplementedError class
    # instead of raising it; raise, matching the other abstract stubs.
    raise NotImplementedError
'''handler for incoming x11 connections
for each x11 incoming connection,
- get a connection to the local display
- maintain bidirectional map of remote x11 channel to local x11 channel
- add the descriptors to the poller
- queue the channel (use transport.accept())'''
local_x11_socket = xlib_connect.get_socket(*self.local_x11_display[:4])
local_x11_socket_fileno = local_x11_socket.fileno()
self.channels[x11_chanfd] = channel, local_x11_socket
self.channels[local_x11_socket_fileno] = local_x11_socket, channel
self.poller.register(x11_chanfd, select.POLLIN)
self.poller.register(local_x11_socket, select.POLLIN)
def flush_out(self, session):
    """Drain pending stderr data from an exec-command session to our stderr.

    :param session: paramiko channel whose stderr stream is flushed
    """
    # Fall back to UTF-8 when the locale does not report an encoding
    # (locale.getlocale()[1] can be None, which would crash decode()).
    encoding = locale.getlocale()[1] or 'UTF-8'
    # Bug fix: poll the *stderr* stream we actually read; the original
    # tested recv_ready() (stdout) while consuming recv_stderr(), which
    # can spin forever or miss buffered stderr output.
    while session.recv_stderr_ready():
        sys.stderr.write(session.recv_stderr(4096).decode(encoding))
@threaded
def x11_status_checker(self, session, session_fileno):
self.transport.accept()
while not session.exit_status_ready():
if type(self.poller) is not list:
if sys.platform != "linux":
poller = self.poller.kqueue()
else:
poller = self.poller.poll()
# accept subsequent x11 connections if any
if len(self.transport.server_accepts) > 0:
self.transport.accept()
if not poller: # this should not happen, as we don't have a timeout.
if fd == session_fileno:
self.flush_out(session)
# data either on local/remote x11 socket
channel, counterpart = self.channels[fd]
try:
# forward data between local/remote x11 socket.
data = channel.recv(4096)
counterpart.sendall(data)
except socket.error:
channel.close()
counterpart.close()
del self.channels[fd]
except Exception as e:
def exec_command(self, command, bufsize=-1, timeout=30, get_pty=False,retries=3, x11=False):
"""
Execute a command on the SSH server. A new `.Channel` is opened and
the requested command is execed. The command's input and output
streams are returned as Python ``file``-like objects representing
stdin, stdout, and stderr.
:param x11:
:param retries:
:param get_pty:
:param str command: the command to execute
:param int bufsize:
interpreted the same way as by the built-in ``file()`` function in
Python
:param int timeout:
set command's channel timeout. See `Channel.settimeout`.settimeout
:return:
the stdin, stdout, and stderr of the executing command, as a
3-tuple
:raises SSHException: if the server fails to execute the command
"""
while retries > 0:
try:
display = "localhost:0"
self.local_x11_display = xlib_connect.get_display(display)
chan = self.transport.open_session()
chan.request_x11(single_connection=False,handler=self.x11_handler)
if "timeout" in command:
timeout_command = command.split("timeout ")[1].split(" ")[0]
if timeout_command == 0:
timeout_command = "infinity"
command = f'{command} ; sleep {timeout_command} 2>/dev/null'
#command = f'export display {command}'
Log.info(command)
try:
chan.exec_command(command)
except BaseException as e:
raise AutosubmitCritical("Failed to execute command: %s" % e)
chan_fileno = chan.fileno()
self.poller.register(chan_fileno, select.POLLIN)
self.x11_status_checker(chan, chan_fileno)
stdout = chan.makefile('rb', bufsize)
stderr = chan.makefile_stderr('rb', bufsize)
return stdin, stdout, stderr
except paramiko.SSHException as e:
if str(e) in "SSH session not active":
self._ssh = None
self.restore_connection(None)
timeout = timeout + 60
retries = retries - 1
if retries <= 0:
return False , False, False
def exec_command_x11(self, command, bufsize=-1, timeout=0, get_pty=False,retries=3, x11=False):