# paramiko_platform.py
import copy
import datetime
import functools
import getpass
import locale
import os
import random
import re
import select
import socket
import sys
import threading
from contextlib import suppress
from datetime import timedelta
from threading import Thread
from time import sleep

import paramiko
import Xlib.support.connect as xlib_connect
from paramiko.agent import Agent
from paramiko.ssh_exception import SSHException

from autosubmit.helpers.utils import terminate_child_process
from autosubmit.job.job_common import Status
from autosubmit.job.job_common import Type
from autosubmit.platforms.platform import Platform
from log.log import AutosubmitError, AutosubmitCritical, Log


def threaded(fn):
    """Decorator that runs ``fn`` in a new background thread.

    The thread is named ``"<args[0].name>_X11"``, so the first positional
    argument of the decorated callable must expose a ``name`` attribute
    (in this module it is the platform instance passed as ``self``).

    :param fn: callable to run in the background.
    :return: a wrapper that starts the thread and returns the started
        :class:`threading.Thread` (not ``fn``'s result).
    """
    # wraps() keeps fn's __name__/__doc__ so introspection and logging
    # still see the original function.
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        thread = Thread(target=fn, args=args, kwargs=kwargs, name=f"{args[0].name}_X11")
        thread.start()
        return thread

    return wrapper

# noinspection PyMethodParameters
class ParamikoPlatform(Platform):
    """
    Class to manage the connections to the different platforms with the Paramiko library.
    """

    def __init__(self, expid, name, config, auth_password=None):
        """
        Initialise a platform whose remote operations go through Paramiko.

        This method itself opens no SSH connection; it only prepares local
        state (``connect`` is what creates the connection).

        :param expid: experiment identifier, forwarded to ``Platform.__init__``.
        :param name: platform name, forwarded to ``Platform.__init__``.
        :param config: configuration object, forwarded to ``Platform.__init__``.
        :param auth_password: optional password, forwarded to ``Platform.__init__``.
        """
        Platform.__init__(self, expid, name, config, auth_password=auth_password)
        self._ssh_output_err = ""
        self.connected = False
        self._default_queue = None
        self.job_status = None
        self._ssh = None
        self._ssh_config = None
        self._ssh_output = None
        self._user_config_file = None
        self._host_config = None
        self._host_config_id = None
        self.submit_cmd = ""
        self.transport = None
        self.channels = {}
        # kqueue on non-Linux systems, poll on Linux.
        if sys.platform != "linux":
            self.poller = select.kqueue()
        else:
            self.poller = select.poll()
        self._header = None
        self._wrapper = None
        self.remote_log_dir = ""
        # Fall back to a default local display when DISPLAY is unset so X11
        # forwarding always has a target handle.
        display = os.getenv('DISPLAY')
        if display is None:
            display = "localhost:0"
        self.local_x11_display = xlib_connect.get_display(display)

    @property
    def header(self):
        """
        Header to add to job for scheduler configuration
jlope2's avatar
jlope2 committed

        :return: header
        :rtype: object
        """
        return self._header

    @property
    def wrapper(self):
        """
        Handler to manage wrappers

        :return: wrapper-handler
        :rtype: object
        """
        return self._wrapper
    def reset(self):
        """
        Forget the current connection state so a later ``connect`` starts fresh.

        Drops (without closing) the SSH client, its configuration, the SFTP
        channel and the transport. It then recreates the readiness poller and
        the local X11 display handle, marks log retrieval as inactive and
        finally calls ``terminate_child_process(self.expid, self.name)``.
        """
        self.connected = False
        self._ssh = None
        self._ssh_config = None
        self._ssh_output = None
        self._user_config_file = None
        self._host_config = None
        self._host_config_id = None
        self._ftpChannel = None
        self.transport = None
        self.channels = {}
        # kqueue on non-Linux systems, poll on Linux.
        if sys.platform != "linux":
            self.poller = select.kqueue()
        else:
            self.poller = select.poll()
        # Same DISPLAY fallback as __init__ and connect.
        display = os.getenv('DISPLAY')
        if display is None:
            display = "localhost:0"
        self.local_x11_display = xlib_connect.get_display(display)
        self.log_retrieval_process_active = False
        terminate_child_process(self.expid, self.name)

    def test_connection(self,as_conf):
dbeltran's avatar
dbeltran committed
        """
        Test if the connection is still alive, reconnect if not.

        Visible behaviour:
          * Calls ``restore_connection(as_conf)`` and stores ``"OK"``, or the
            raised exception's text, in ``message``.
          * Unless that text mentions refused remote connections, sends an SSH
            "ignore" packet over the current transport.
          * If the ignore packet cannot be sent, ``message`` becomes
            ``"Timeout connection"``.

        :param as_conf: experiment configuration, forwarded to ``restore_connection``.
        :raises AutosubmitError: (code 6002) on ``EOFError``.
        :raises AutosubmitError, AutosubmitCritical, IOError: re-raised unchanged.
        :raises AutosubmitCritical: (code 7051) wrapping any other exception.

        NOTE(review): the following lines are missing from this view of the file:
          * the enclosing ``try`` blocks, and presumably the condition guarding
            the reconnect, between this docstring and the ``restore_connection``
            call;
          * the statement that returns ``message``.
        """
                    self.restore_connection(as_conf)
                    message = "OK"
                except BaseException as e:
                    message = str(e)
                # connect() raises "... doesn't accept remote connections ..." for
                # refused/unknown hosts; in that case there is no transport to probe.
                if message.find("t accept remote connections") == -1:
dbeltran's avatar
dbeltran committed
                    # If either step below raises, the connection is treated as dead.
                    try:
                        transport = self._ssh.get_transport()
                        transport.send_ignore()
                    except:
                        # NOTE(review): bare except also catches KeyboardInterrupt.
                        message = "Timeout connection"
dbeltran's avatar
dbeltran committed

dbeltran's avatar
dbeltran committed
        except EOFError as e:
            raise AutosubmitError("[{0}] not alive. Host: {1}".format(
                self.name, self.host), 6002, str(e))
dbeltran's avatar
dbeltran committed
        except (AutosubmitError,AutosubmitCritical,IOError):
dbeltran's avatar
dbeltran committed
            raise
        except BaseException as e:
dbeltran's avatar
dbeltran committed
            raise AutosubmitCritical(str(e),7051)
            #raise AutosubmitError("[{0}] connection failed for host: {1}".format(self.name, self.host), 6002, e.message)
    def restore_connection(self, as_conf):
        """
        Re-establish the SSH/SFTP connection to this platform.

        Visible behaviour:
          * Marks the platform as disconnected and calls ``connect(as_conf)``.
          * If that first attempt fails and ``self.host`` lists several
            comma-separated hosts, the failure is logged (6002). Otherwise a
            "First connection ... failed" error with code 7050 is built.
          * While still disconnected, up to two more attempts are made with
            ``connect(as_conf, True)``. That makes ``connect`` pick a random
            host among the alternatives after the first one.
          * If the platform is still not connected, ``AutosubmitCritical``
            (7050) is raised.

        :param as_conf: experiment configuration, forwarded to ``connect``.
        :raises AutosubmitCritical: when no connection could be established.
        :raises SSHException: re-raised unchanged.

        NOTE(review): several lines are missing from this view:
          * the opening ``try:``;
          * the call (presumably ``raise AutosubmitCritical(``) that the
            "First connection" message belongs to;
          * the body of ``except AutosubmitCritical``;
          * the call opening the final 'Cant connect...' error.
        """
dbeltran's avatar
dbeltran committed
            self.connected = False
dbeltran's avatar
dbeltran committed
            retries = 2
            retry = 0
dbeltran's avatar
dbeltran committed
            try:
                self.connect(as_conf)
dbeltran's avatar
dbeltran committed
            except Exception as e:
                if ',' in self.host:
                    Log.printlog("Connection Failed to {0}, will test another host".format(
                        self.host.split(',')[0]), 6002)
dbeltran's avatar
dbeltran committed
                else:
                        "First connection to {0} is failed, check host configuration or try another login node ".format(self.host), 7050,str(e))
dbeltran's avatar
dbeltran committed
            # Errors on these retries are ignored; success is judged by self.connected.
            while self.connected is False and retry < retries:
dbeltran's avatar
dbeltran committed
                try:
                    self.connect(as_conf,True)
                except Exception as e:
dbeltran's avatar
dbeltran committed
                    pass
dbeltran's avatar
dbeltran committed
                retry += 1
            if not self.connected:
                trace = 'Can not create ssh or sftp connection to {0}: Connection could not be established to platform {1}\n Please, check your expid platform.conf to see if there are mistakes in the configuration\n Also Ensure that the login node listed on HOST parameter is available(try to connect via ssh on a terminal)\n Also you can put more than one host using a comma as separator'.format(
                    self.host, self.name)
                raise AutosubmitCritical(
                    'Experiment cant no continue without unexpected behaviour, Stopping Autosubmit', 7050, trace)
dbeltran's avatar
dbeltran committed

dbeltran's avatar
dbeltran committed
        except AutosubmitCritical as e:
        except SSHException as e:
            raise
dbeltran's avatar
dbeltran committed
        except Exception as e:
                'Cant connect to this platform due an unknown error', 7050, str(e))
    def agent_auth(self, port):
        """
        Try to connect to ``self._host_config['hostname']`` using the SSH agent.

        Creates a :class:`paramiko.agent.Agent`, attaches it to the SSH client
        and calls ``connect`` without an explicit key file. Any failure is
        logged at debug level and reported through the return value, so the
        caller can fall back to key-file authentication.

        :param port: port to connect to.
        :type port: int
        :return: True if the connection succeeded, False otherwise.
        :rtype: bool
        """
        try:
            # Pre-set the client's agent (private attribute), presumably so the
            # client reuses this instance with the patched keys below.
            self._ssh._agent = Agent()
            # Some agent key objects lack ``public_blob``; give them an explicit None.
            for key in self._ssh._agent.get_keys():
                if not hasattr(key, "public_blob"):
                    key.public_blob = None
            self._ssh.connect(self._host_config['hostname'], port=port, username=self.user, timeout=60, banner_timeout=60)
        except Exception as e:
            # Exception rather than BaseException, so Ctrl-C / SystemExit are not
            # silently turned into "try another authentication method".
            Log.debug(f'Failed to authenticate with ssh-agent due to {e}')
            Log.debug('Trying to authenticate with other methods')
            return False
        return True

    def interactive_auth_handler(self, title, instructions, prompt_list):
dbeltran's avatar
dbeltran committed
        answers = []
dbeltran's avatar
dbeltran committed
        # Walk the list of prompts that the server sent that we need to answer
dbeltran's avatar
dbeltran committed
        twofactor_nonpush = None
dbeltran's avatar
dbeltran committed
        for prompt_, _ in prompt_list:
            prompt = str(prompt_).strip().lower()
            # str() used to make sure that we're dealing with a string rather than a unicode string
dbeltran's avatar
dbeltran committed
            # strip() used to get rid of any padding spaces sent by the server
dbeltran's avatar
dbeltran committed
            if "password" in prompt:
dbeltran's avatar
dbeltran committed
                answers.append(self.pw)
dbeltran's avatar
dbeltran committed
            elif "token" in prompt or "2fa" in prompt or "otp" in prompt:
dbeltran's avatar
dbeltran committed
                if self.two_factor_method == "push":
                    answers.append("")
dbeltran's avatar
dbeltran committed
                elif self.two_factor_method == "token":
                    # Sometimes the server may ask for the 2FA code more than once this is to avoid asking the user again
                    # If it is wrong, just run again autosubmit run because the issue could be in the password step
                    if twofactor_nonpush is None:
dbeltran's avatar
dbeltran committed
                        twofactor_nonpush = input("Please type the 2FA/OTP/token code: ")
                    answers.append(twofactor_nonpush)
dbeltran's avatar
dbeltran committed
        # This is done from the server
        # if self.two_factor_method == "push":
        #     try:
        #         inputimeout(prompt='Press enter to complete the 2FA PUSH authentication', timeout=self.otp_timeout)
        #     except:
        #         pass
dbeltran's avatar
dbeltran committed
        return tuple(answers)
dbeltran's avatar
dbeltran committed

    def connect(self, as_conf, reconnect=False):
jlope2's avatar
jlope2 committed
        """
        Creates ssh connection to host

        Host resolution:
          * ``~/.ssh/config`` is read, if present, to resolve ``self.host``.
          * When the resolved hostname is a comma-separated list, the first
            entry is used on a normal connect. When ``reconnect`` is True, a
            random entry among the remaining ones is used.

        Authentication:
          * Without 2FA, the SSH agent is tried first (``agent_auth``), then the
            configured identity file, optionally through a ``ProxyCommand``.
          * With 2FA, keyboard-interactive authentication is driven by
            ``interactive_auth_handler``.

        On success:
          * An SFTP channel is opened and ``self.connected`` is set to True.
          * Log retrieval is marked active once, unless the platform's
            ``DISABLE_RECOVERY_THREADS`` is something other than "false".
          * ``recover_job_logs`` is called when ``AS_COMMAND`` is ``run``.

        :param as_conf: experiment configuration, or None.
        :param reconnect: pick an alternative host from a comma-separated list.
        :type reconnect: bool
        :raises SSHException: when the host refuses connections or is unknown.
        :raises AutosubmitError: (6016) on other IO errors.
        :raises AutosubmitCritical: (7050) when authentication fails.

        NOTE(review): no ``return`` statement is visible, so the method
        returns None; ``restore_connection`` checks ``self.connected`` instead.
        NOTE(review): the body of ``except SSHException`` and the opening of
        the final error call are missing from this view of the file.
        """
dbeltran's avatar
dbeltran committed

jlope2's avatar
jlope2 committed
        try:
            # Refresh the local X11 display handle; DISPLAY falls back to localhost:0.
            display = os.getenv('DISPLAY')
            if display is None:
                display = "localhost:0"
            self.local_x11_display = xlib_connect.get_display(display)
jlope2's avatar
jlope2 committed
            self._ssh = paramiko.SSHClient()
            self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self._ssh_config = paramiko.SSHConfig()
            self._user_config_file = os.path.expanduser("~/.ssh/config")
            if os.path.exists(self._user_config_file):
                with open(self._user_config_file) as f:
                    self._ssh_config.parse(f)
jlope2's avatar
jlope2 committed
            self._host_config = self._ssh_config.lookup(self.host)
dbeltran's avatar
dbeltran committed
            # HOST may list several login nodes separated by commas.
            if "," in self._host_config['hostname']:
dbeltran's avatar
dbeltran committed
                if reconnect:
                    self._host_config['hostname'] = random.choice(
                        self._host_config['hostname'].split(',')[1:])
dbeltran's avatar
dbeltran committed
                else:
dbeltran's avatar
dbeltran committed
                    self._host_config['hostname'] = self._host_config['hostname'].split(',')[0]
jlope2's avatar
jlope2 committed
            if 'identityfile' in self._host_config:
                self._host_config_id = self._host_config['identityfile']
dbeltran's avatar
dbeltran committed
            port = int(self._host_config.get('port',22))
dbeltran's avatar
dbeltran committed
            if not self.two_factor_auth:
dbeltran's avatar
dbeltran committed
                # Agent Auth
                if not self.agent_auth(port):
                    # Public Key Auth
                    # Each branch retries once with rsa-sha2-256/512 disabled,
                    # presumably for servers that only accept legacy ssh-rsa signatures.
                    if 'proxycommand' in self._host_config:
                        self._proxy = paramiko.ProxyCommand(self._host_config['proxycommand'])
                        try:
                            self._ssh.connect(self._host_config['hostname'], port, username=self.user,
                                              key_filename=self._host_config_id, sock=self._proxy, timeout=60 , banner_timeout=60)
                        except Exception as e:
                            self._ssh.connect(self._host_config['hostname'], port, username=self.user,
                                              key_filename=self._host_config_id, sock=self._proxy, timeout=60,
dbeltran's avatar
dbeltran committed
                                              banner_timeout=60, disabled_algorithms={'pubkeys': ['rsa-sha2-256', 'rsa-sha2-512']})
dbeltran's avatar
dbeltran committed
                    else:
                        try:
                            self._ssh.connect(self._host_config['hostname'], port, username=self.user,
                                              key_filename=self._host_config_id, timeout=60 , banner_timeout=60)
                        except Exception as e:
                            self._ssh.connect(self._host_config['hostname'], port, username=self.user,
                                              key_filename=self._host_config_id, timeout=60 , banner_timeout=60,disabled_algorithms={'pubkeys': ['rsa-sha2-256', 'rsa-sha2-512']})
                self.transport = self._ssh.get_transport()
                self.transport.banner_timeout = 60
            else:
                Log.warning("2FA is enabled, this is an experimental feature and it may not work as expected")
                Log.warning("nohup can't be used as the password will be asked")
dbeltran's avatar
dbeltran committed
                Log.warning("If you are using a token, please type the token code when asked")
dbeltran's avatar
dbeltran committed
                # NOTE(review): requires ``import getpass`` at module level — confirm it is present.
                if self.pw is None:
                    self.pw = getpass.getpass("Password for {0}: ".format(self.name))
dbeltran's avatar
dbeltran committed
                if self.two_factor_method == "push":
                    Log.warning("Please check your phone to complete the 2FA PUSH authentication")
dbeltran's avatar
dbeltran committed
                self.transport = paramiko.Transport((self._host_config['hostname'], port))
                self.transport.start_client()
dbeltran's avatar
dbeltran committed
                try:
                    self.transport.auth_interactive(self.user, self.interactive_auth_handler)
                except Exception as e:
                    Log.printlog("2FA authentication failed",7000)
dbeltran's avatar
dbeltran committed
                    raise
dbeltran's avatar
dbeltran committed
                if self.transport.is_authenticated():
                    # Attach the manually authenticated transport to the SSHClient (private attribute).
                    self._ssh._transport = self.transport
                    self.transport.banner_timeout = 60
                else:
                    self.transport.close()
                    raise SSHException
dbeltran's avatar
dbeltran committed
            # 4**12 bytes (16 MiB) window and max packet size; channel timeout 120 s.
            self._ftpChannel = paramiko.SFTPClient.from_transport(self.transport,window_size=pow(4, 12) ,max_packet_size=pow(4, 12) )
            self._ftpChannel.get_channel().settimeout(120)
dbeltran's avatar
dbeltran committed
            self.connected = True
dbeltran's avatar
dbeltran committed
            if not self.log_retrieval_process_active and (as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS', "false")).lower() == "false"):
dbeltran's avatar
dbeltran committed
                self.log_retrieval_process_active = True
                if as_conf and as_conf.misc_data.get("AS_COMMAND", "").lower() == "run":
                    self.recover_job_logs()
dbeltran's avatar
dbeltran committed
        except SSHException:
dbeltran's avatar
dbeltran committed
        except IOError as e:
dbeltran's avatar
dbeltran committed
            if "refused" in str(e.strerror).lower():
dbeltran's avatar
dbeltran committed
                raise SSHException(" {0} doesn't accept remote connections. Check if there is an typo in the hostname".format(self.host))
dbeltran's avatar
dbeltran committed
            elif "name or service not known" in str(e.strerror).lower():
                raise SSHException(" {0} doesn't accept remote connections. Check if there is an typo in the hostname".format(self.host))
            else:
dbeltran's avatar
dbeltran committed
                raise AutosubmitError("File can't be located due an slow or timeout connection", 6016, str(e))
dbeltran's avatar
dbeltran committed
        except BaseException as e:
dbeltran's avatar
dbeltran committed
            self.connected = False
dbeltran's avatar
dbeltran committed
            if "Authentication failed." in str(e):
                raise AutosubmitCritical("Authentication Failed, please check the platform.conf of {0}".format(
dbeltran's avatar
dbeltran committed
                    self._host_config['hostname']), 7050, str(e))
dbeltran's avatar
dbeltran committed
            # With several hosts, fall back to restore_connection(), which retries with reconnect=True.
            if not reconnect and "," in self._host_config['hostname']:
                self.restore_connection(as_conf)
dbeltran's avatar
dbeltran committed
            else:
dbeltran's avatar
dbeltran committed
                    "Couldn't establish a connection to the specified host, wrong configuration?", 6003, str(e))
    def check_completed_files(self, sections=None):
        """
        List ``*_COMPLETED`` marker files in the remote log directory.

        Builds a ``find`` command over ``self.remote_log_dir`` and runs it with
        ``send_command``. It adds one ``-name *<section>_COMPLETED`` test per
        whitespace-separated entry of ``sections``; with no sections, ``find``
        lists the whole directory.

        :param sections: whitespace-separated section names, or None.
        :type sections: str
        :return: the command output, or None for ``localhost`` platforms or
            when the command fails.

        NOTE(review): the statement belonging under ``if i < len(...) - 1:``
        (presumably appending an ``-o`` separator between tests) is missing
        from this view.
        NOTE(review): the ``*`` patterns are not quoted, so the remote shell
        may expand them before ``find`` sees them.
        """
        if self.host == 'localhost':
            return None
        command = "find %s " % self.remote_log_dir
        if sections:
            for i, section in enumerate(sections.split()):
                command += " -name *%s_COMPLETED" % section
                if i < len(sections.split()) - 1:
        if self.send_command(command, True):
            return self._ssh_output
        else:
            return None

    def remove_multiple_files(self, filenames):
dbeltran's avatar
dbeltran committed
        #command = "rm"
        log_dir = os.path.join(self.tmp_path, 'LOG_{0}'.format(self.expid))
        multiple_delete_previous_run = os.path.join(
            log_dir, "multiple_delete_previous_run.sh")
        if os.path.exists(log_dir):
            lang = locale.getlocale()[1]
            if lang is None:
                lang = locale.getdefaultlocale()[1]
                if lang is None:
                    lang = 'UTF-8'
            open(multiple_delete_previous_run, 'wb+').write( ("rm -f" + filenames).encode(lang))
            os.chmod(multiple_delete_previous_run, 0o770)
            self.send_file(multiple_delete_previous_run, False)
            command = os.path.join(self.get_files_path(),
                                   "multiple_delete_previous_run.sh")
            if self.send_command(command, ignore_log=True):
                return self._ssh_output
            else:
                return ""
        return ""
    def send_file(self, filename, check=True):
jlope2's avatar
jlope2 committed
        """
        Sends a local file to the platform

        Uploads ``tmp_path/filename`` over SFTP to
        ``get_files_path()/basename(filename)``, then copies the local
        permission bits to the remote copy.

        :param filename: name of the file to send (if absolute, os.path.join
            uses it unchanged as the local path)
        :type filename: str
        :param check: when True, call ``check_remote_log_dir`` and
            ``delete_file(filename)`` before uploading.
        :type check: bool
        :return: True on success.
        :rtype: bool
        :raises AutosubmitError: (6004) when the upload fails with ``IOError``.

        NOTE(review): ``delete_file`` receives ``filename`` while the upload
        targets ``basename(filename)``; they differ when ``filename`` has a
        directory part.
        NOTE(review): the second ``raise AutosubmitError`` below is unreachable
        as shown; the ``except`` clause that should precede it appears to be
        missing from this view.
        """

        if check:
            self.check_remote_log_dir()
            self.delete_file(filename)
jlope2's avatar
jlope2 committed
        try:
dbeltran's avatar
dbeltran committed
            # NOTE(review): the nested os.path.join is redundant.
            local_path = os.path.join(os.path.join(self.tmp_path, filename))
            remote_path = os.path.join(
                self.get_files_path(), os.path.basename(filename))
dbeltran's avatar
dbeltran committed
            self._ftpChannel.put(local_path, remote_path)
            self._ftpChannel.chmod(remote_path, os.stat(local_path).st_mode)
jlope2's avatar
jlope2 committed
            return True
dbeltran's avatar
dbeltran committed
        except IOError as e:
            raise AutosubmitError('Can not send file {0} to {1}'.format(os.path.join(
                self.tmp_path, filename), os.path.join(self.get_files_path(), filename)), 6004, str(e))
            raise AutosubmitError(
                'Send file failed. Connection seems to no be active', 6004)

    def get_list_of_files(self):
        return self._ftpChannel.get(self.get_files_path)

    def get_file(self, filename, must_exist=True, relative_path='', ignore_log=False, wrapper_failed=False):
jlope2's avatar
jlope2 committed
        """
        Copies a file from the current platform to experiment's tmp folder

        The remote file is ``get_files_path()/filename``. The local target is
        ``tmp_path/relative_path/filename``. The local directory is created if
        needed, and any existing local copy is removed before downloading.

        :param filename: file name
        :type filename: str
        :param must_exist: if True, a missing file is reported as an error
        :type must_exist: bool
        :param relative_path: path inside the tmp folder
        :type relative_path: str
        :param ignore_log: not referenced by the visible lines (the lines that
            use it appear to be missing from this view)
        :param wrapper_failed: not referenced by the visible lines
        :return: True if file is copied successfully, false otherwise
        :rtype: bool

        NOTE(review): this view is missing lines:
          * the ``try:`` before the SFTP ``get`` call;
          * the ``except`` of the inner clean-up ``try``;
          * several conditions guarding the ``Log.printlog`` calls.
        NOTE(review): ``str(e) in "Garbage"`` tests whether the error text is a
        substring of "Garbage". ``"garbage" in str(e).lower()`` (as used in
        ``delete_file``) was probably intended.
        """
        local_path = os.path.join(self.tmp_path, relative_path)
        if not os.path.exists(local_path):
            os.makedirs(local_path)
dbeltran's avatar
dbeltran committed

        file_path = os.path.join(local_path, filename)
        if os.path.exists(file_path):
            os.remove(file_path)
        remote_path = os.path.join(self.get_files_path(), filename)
            self._ftpChannel.get(remote_path, file_path)
jlope2's avatar
jlope2 committed
            return True
dbeltran's avatar
dbeltran committed
        except Exception as e:
dbeltran's avatar
dbeltran committed
            try:
                os.remove(file_path)
dbeltran's avatar
dbeltran committed
                pass
            if str(e) in "Garbage":
                    Log.printlog(
                        "File {0} seems to no exists (skipping)".format(filename), 5004)
jlope2's avatar
jlope2 committed
            if must_exist:
                    Log.printlog(
                        "File {0} does not exists".format(filename), 6004)
dbeltran's avatar
dbeltran committed
                return False
                    Log.printlog(
                        "Log file couldn't be retrieved: {0}".format(filename), 5000)

    def delete_file(self, filename):
        """
        Deletes a file from this platform

        Removes ``get_files_path()/filename`` over SFTP.

        :param filename: file name
        :type filename: str
        :return: True if successful or file does not exist
        :rtype: bool

        Other (non-IOError) failures are logged. When their text contains
        "garbage", a 7051 error is built, presumably raised as
        AutosubmitCritical.

        NOTE(review): the body of ``except IOError`` and the call that opens
        the final 'Wrong User...' error are missing from this view. The
        return value for a missing file therefore cannot be confirmed here.
        """
dbeltran's avatar
dbeltran committed

jlope2's avatar
jlope2 committed
        try:
            self._ftpChannel.remove(os.path.join(
                self.get_files_path(), filename))
jlope2's avatar
jlope2 committed
            return True
dbeltran's avatar
dbeltran committed
        except IOError as e:
        except BaseException as e:
            Log.error('Could not remove file {0} due a wrong configuration'.format(
                os.path.join(self.get_files_path(), filename)))
dbeltran's avatar
dbeltran committed
            if str(e).lower().find("garbage") != -1:
dbeltran's avatar
dbeltran committed
                    "Wrong User or invalid .ssh/config. Or invalid user in platform.conf or public key not set ", 7051, str(e))
    def move_file(self, src, dest, must_exist=False):
        """
        Moves a file on the platform (includes .err and .out)

        Both names are resolved relative to ``get_files_path()``. The SFTP
        rename is only attempted when ``dest`` does not exist yet (``stat``
        raises ``IOError``). An existing destination is left untouched, and
        the method still returns True.

        :param src: source name
        :type src: str
        :param dest: destination name
        :type dest: str
        :param must_exist: if True, an ``IOError`` from the rename raises
            AutosubmitError (6004); otherwise it is logged at debug level and
            False is returned.
        :type must_exist: bool
        :return: True when the destination already existed or the rename succeeded.
        :rtype: bool

        NOTE(review): the opening ``try:`` and some lines of the generic
        ``except Exception`` handler are missing from this view.
        NOTE(review): ``str(e) in "Garbage"`` checks the reverse containment of
        what was probably intended (``"garbage" in str(e).lower()``).
        NOTE(review): the debug message logs ``path_root``, not the missing file.
        """
dbeltran's avatar
dbeltran committed
            path_root = self.get_files_path()
dbeltran's avatar
dbeltran committed
            src = os.path.join(path_root, src)
            dest = os.path.join(path_root, dest)
            try:
                self._ftpChannel.stat(dest)
            except IOError:
                self._ftpChannel.rename(src,dest)
            return True
dbeltran's avatar
dbeltran committed

dbeltran's avatar
dbeltran committed
        except IOError as e:
            if str(e) in "Garbage":
                raise AutosubmitError('File {0} does not exists, something went wrong with the platform'.format(os.path.join(path_root,src)), 6004, str(e))
dbeltran's avatar
dbeltran committed
            if must_exist:
dbeltran's avatar
dbeltran committed
                raise AutosubmitError("File {0} does not exists".format(
                    os.path.join(path_root,src)), 6004, str(e))
dbeltran's avatar
dbeltran committed
            else:
                Log.debug("File {0} doesn't exists ".format(path_root))
                return False
        except Exception as e:
dbeltran's avatar
dbeltran committed
            if str(e) in "Garbage":
                raise AutosubmitError('File {0} does not exists'.format(
dbeltran's avatar
dbeltran committed
                    os.path.join(self.get_files_path(), src)), 6004, str(e))
dbeltran's avatar
dbeltran committed
                raise AutosubmitError("File {0} does not exists".format(
                    os.path.join(self.get_files_path(), src)), 6004, str(e))
                Log.printlog("Log file couldn't be moved: {0}".format(
                    os.path.join(self.get_files_path(), src)), 5001)
    def submit_job(self, job, script_name, hold=False, export="none"):
        """
        Submit a job script through the platform's scheduler.

        :param job: job object; when it is None (or falsy), X11 forwarding is disabled
        :type job: autosubmit.job.job.Job
        :param script_name: job script's name
        :type script_name: str
        :param hold: send job hold
        :type hold: boolean
        :param export: forwarded to ``get_submit_cmd``
        :type export: str
        :return: id of the submitted job, or None if no submit command was
            produced or the command failed
        :rtype: int or None
        """
        # Resolve the X11 flag once and reuse it. Reading ``job.x11`` again
        # later would raise AttributeError when job is None.
        x11 = job.x11 if job else False
        cmd = self.get_submit_cmd(script_name, job, hold=hold, export=export)
        if cmd is None:
            return None
        Log.debug(f"Submitting job with the command: {cmd}")
        if not self.send_command(cmd, x11=x11):
            return None
        job_id = self.get_submitted_job_id(self.get_ssh_output(), x11=x11)
        Log.debug("Job ID: {0}", job_id)
        return int(job_id)

    def get_job_energy_cmd(self, job_id):
        return self.get_ssh_output()
    def check_job_energy(self, job_id):
        """
        Checks job energy and return values. Defined in child classes.

        Args:

        Returns:
            4-tuple (int, int, int, int): submit time, start time, finish time, energy
        """
        check_energy_cmd = self.get_job_energy_cmd(job_id)
        self.send_command(check_energy_cmd)
        return self.get_ssh_output()
dbeltran's avatar
dbeltran committed
        """
        Sends a Submitfile Script, exec in platform and retrieve the Jobs_ID.
        :param hold: send job hold
        :type hold: boolean
        :return: job id for the submitted job
        :rtype: int
dbeltran's avatar
dbeltran committed
        """
        raise NotImplementedError
    def get_estimated_queue_time_cmd(self, job_id):
        """
        Returns command to get estimated queue time on remote platforms

        :param job_id: id of job to check
dbeltran's avatar
dbeltran committed
        :param job_id: str
dbeltran's avatar
dbeltran committed
        :return: command to get estimated queue time
        """
        raise NotImplementedError
    def parse_estimated_time(self, output):
        """
        Parses estimated queue time from output of get_estimated_queue_time_cmd

        :param output: output of get_estimated_queue_time_cmd
        :type output: str
        :return: estimated queue time
        :rtype:
        """
        raise NotImplementedError
    def check_job(self, job, default_status=Status.COMPLETED, retries=5, submit_hold_check=False, is_wrapper=False):
jlope2's avatar
jlope2 committed
        """
        Checks job running status

        Runs ``get_checkjob_cmd(job_id)``, retrying while the output is empty.
        The scheduler state is mapped through ``self.job_status`` onto a
        :class:`Status`. Running non-wrapper jobs get a start time the first
        time they are seen running, and are checked against their wallclock.
        A FAILED or COMPLETED result resets ``job.updated_log``.

        :param job: job
        :type job: autosubmit.job.job.Job
        :param default_status: status to assign if it can't be retrieved from
            the platform (not referenced by the visible lines)
        :type default_status: autosubmit.job.job_common.Status
        :param retries: how many times to re-run the check command on empty output
        :type retries: int
        :param submit_hold_check: if True, return the status instead of storing
            it in ``job.new_status``
        :type submit_hold_check: bool
        :param is_wrapper: if True, skip the start-time and wallclock handling
        :type is_wrapper: bool
        :return: current job status when ``submit_hold_check`` is True, else None
        :rtype: autosubmit.job.job_common.Status

        NOTE(review): lines appear to be missing from this view:
          * an early exit after an invalid job id;
          * the definition of ``sleep_time``;
          * the body of the wallclock ``except``;
          * the body of the held-job ``elif``.
        """
dbeltran's avatar
dbeltran committed
        job_id = job.id
jlope2's avatar
jlope2 committed
        job_status = Status.UNKNOWN
        if type(job_id) is not int and type(job_id) is not str:
            Log.error(
                'check_job() The job id ({0}) is not an integer neither a string.', job_id)
            job.new_status = job_status
        sleep(2)
        self.send_command(self.get_checkjob_cmd(job_id))
dbeltran's avatar
dbeltran committed
        while self.get_ssh_output().strip(" ") == "" and retries > 0:
            retries = retries - 1
            Log.debug(
                'Retrying check job command: {0}', self.get_checkjob_cmd(job_id))
            Log.debug('retries left {0}', retries)
dbeltran's avatar
dbeltran committed
            Log.debug('Will be retrying in {0} seconds', sleep_time)
            sleep(sleep_time)
            self.send_command(self.get_checkjob_cmd(job_id))
jlope2's avatar
jlope2 committed
        # The loop leaves retries >= 0 for any non-negative start value, so the
        # else branch below is only reached when called with negative retries.
        if retries >= 0:
dbeltran's avatar
dbeltran committed
            Log.debug('Successful check job command: {0}', self.get_checkjob_cmd(job_id))
            job_status = self.parse_job_output(
                self.get_ssh_output()).strip("\n")
jlope2's avatar
jlope2 committed
            # URi: define status list in HPC Queue Class
jlope2's avatar
jlope2 committed
            # NOTE(review): ``retries == 0`` forces COMPLETED once every retry was
            # used, even if the last retry returned a real scheduler state.
            if job_status in self.job_status['COMPLETED'] or retries == 0:
jlope2's avatar
jlope2 committed
                job_status = Status.COMPLETED
            elif job_status in self.job_status['RUNNING']:
                job_status = Status.RUNNING
                if not is_wrapper:
                    if job.status != Status.RUNNING:
                        job.start_time = datetime.datetime.now() # URi: start time
                    if job.start_time is not None and str(job.wrapper_type).lower() == "none":
                        wallclock = job.wallclock
                        if job.wallclock == "00:00" or job.wallclock is None:
                            wallclock = job.platform.max_wallclock
                        if wallclock != "00:00" and wallclock != "00:00:00" and wallclock != "":
                            if job.is_over_wallclock(job.start_time,wallclock):
                                try:
                                    job.platform.get_completed_files(job.name)
                                    job_status = job.check_completion(over_wallclock=True)
                                except Exception as e:
dbeltran's avatar
dbeltran committed
            elif job_status in self.job_status['QUEUING'] and (not job.hold or job.hold.lower() != "true"):
jlope2's avatar
jlope2 committed
                job_status = Status.QUEUING
dbeltran's avatar
dbeltran committed
            elif job_status in self.job_status['QUEUING'] and (job.hold or job.hold.lower() == "true"):
jlope2's avatar
jlope2 committed
            elif job_status in self.job_status['FAILED']:
                job_status = Status.FAILED
            else:
                job_status = Status.UNKNOWN
        else:
            # NOTE(review): this message logs the check command, not the output.
            Log.error(
                " check_job(), job is not on the queue system. Output was: {0}", self.get_checkjob_cmd(job_id))
jlope2's avatar
jlope2 committed
            job_status = Status.UNKNOWN
            Log.error(
                'check_job() The job id ({0}) status is {1}.', job_id, job_status)
dbeltran's avatar
dbeltran committed

        if job_status in [Status.FAILED, Status.COMPLETED]:
            job.updated_log = False
        if submit_hold_check:
            return job_status
        else:
            job.new_status = job_status

    def _check_jobid_in_queue(self, ssh_output, job_list_cmd):
        for job in job_list_cmd[:-1].split(','):
            if job not in ssh_output:
                return False
        return True
    def parse_joblist(self, job_list):
        """
        Build the comma-separated job id string sent to ``get_checkAlljobs_cmd``.

        :param job_list: pairs of ``(job, previous_status)``; only ``job.id`` is read
        :type job_list: list
        :return: the job ids joined by ``","``; a job whose id is ``None``
            contributes ``"0"``, and an empty ``job_list`` yields ``""``
        :rtype: str
        """
        # str.join needs no trailing-comma cleanup and handles an empty list.
        return ",".join(
            "0" if job.id is None else str(job.id) for job, _prev_status in job_list
        )
dbeltran's avatar
dbeltran committed
    def check_Alljobs(self, job_list, as_conf, retries=5):
dbeltran's avatar
dbeltran committed
        """
        Check the queue status of several jobs with a single remote command.

        Builds the id list with ``parse_joblist``, sends ``get_checkAlljobs_cmd``
        for it and re-sends while not every id shows up in the output
        (``_check_jobid_in_queue``). Each job's raw status, from
        ``parse_Alljobs_output``, is then mapped through ``self.job_status`` to a
        ``Status`` value. Jobs classified as queued/held are passed to
        ``get_queue_status``.

        :param job_list: pairs of ``(job, previous_status)``
        :type job_list: list
        :param as_conf: experiment configuration; ``get_copy_remote_logs()`` is
            read, and it is passed to ``job.update_status`` and ``get_queue_status``
        :param retries: how many times the check command may be re-sent
        :type retries: int
        :raises AutosubmitError: code 6000 (message ``e_msg``) if sending the
            command failed; code 6008 "Some Jobs are in Unknown status" --
            NOTE(review): the branch that raises 6008 is not intact in this view.

        NOTE(review): no visible line stores the mapped status on the job
        (only the commented-out ``job.new_status=job_status``); confirm against
        the full file.
        """
dbeltran's avatar
dbeltran committed
        job_status = Status.UNKNOWN
dbeltran's avatar
dbeltran committed
        # NOTE(review): remote_logs is not used anywhere in the visible body.
        remote_logs = as_conf.get_copy_remote_logs()
        job_list_cmd = self.parse_joblist(job_list)
dbeltran's avatar
dbeltran committed
        cmd = self.get_checkAlljobs_cmd(job_list_cmd)
dbeltran's avatar
dbeltran committed
        # NOTE(review): sleep_time has no visible assignment before this use; an
        # initialisation line seems to be missing from this view.
        sleep(sleep_time)
dbeltran's avatar
dbeltran committed
        # slurm_error records a failed send_command; e_msg is the message raised
        # with code 6000 at the end.
        slurm_error = False
        e_msg = ""
        try:
dbeltran's avatar
dbeltran committed
            self.send_command(cmd)
dbeltran's avatar
dbeltran committed
        except AutosubmitError as e:
dbeltran's avatar
dbeltran committed
            # NOTE(review): e_msg is never set from e in the visible handlers, so the
            # final AutosubmitError(e_msg, 6000) would carry an empty message.
            slurm_error = True
        if not slurm_error:
            # Re-send until every requested id appears in the output; the pause grows
            # by 5 seconds after each attempt.
            while not self._check_jobid_in_queue(self.get_ssh_output(), job_list_cmd) and retries > 0:
                try:
                    self.send_command(cmd)
                except AutosubmitError as e:
dbeltran's avatar
dbeltran committed
                    slurm_error = True
                    break
                Log.debug('Retrying check job command: {0}', cmd)
                Log.debug('retries left {0}', retries)
                Log.debug('Will be retrying in {0} seconds', sleep_time)
                retries -= 1
                sleep(sleep_time)
                sleep_time = sleep_time + 5

dbeltran's avatar
dbeltran committed
        # The loop above only decrements retries while it is positive, so this holds
        # unless a negative retries value was passed in.
        if retries >= 0:
            Log.debug('Successful check job command')
            # Queued/held jobs and their ids ("id," each) for get_queue_status.
            in_queue_jobs = []
            list_queue_jobid = ""
dbeltran's avatar
dbeltran committed
            for job,job_prev_status in job_list:
dbeltran's avatar
dbeltran committed
                if not slurm_error:
                    job_id = job.id
                    # NOTE(review): job_list_status has no visible assignment before
                    # this first use.
                    job_status = self.parse_Alljobs_output(job_list_status, job_id)
dbeltran's avatar
dbeltran committed
                    # No status found for this id: re-query the whole list. This loop
                    # can drive retries down to -1, which also affects the
                    # "retries == 0" branch below for later jobs.
                    while len(job_status) <= 0 and retries >= 0:
                        retries -= 1
                        self.send_command(cmd)
                        job_list_status = self.get_ssh_output()
                        job_status = self.parse_Alljobs_output(job_list_status, job_id)
                        if len(job_status) <= 0:
                            Log.debug('Retrying check job command: {0}', cmd)
                            Log.debug('retries left {0}', retries)
                            Log.debug('Will be retrying in {0} seconds', sleep_time)
                            sleep(sleep_time)
                            sleep_time = sleep_time + 5
                    # URi: define status list in HPC Queue Class
                else:
                    # The command could not be sent: keep the job's current status.
                    job_status = job.status
                # NOTE(review): start_time is reset to "now" on every check while
                # job.status is not RUNNING.
                if job.status != Status.RUNNING:
                    job.start_time = datetime.datetime.now() # URi: start time
                if job.start_time is not None and str(job.wrapper_type).lower() == "none":
                    wallclock = job.wallclock
                    # NOTE(review): unlike check_job, a None wallclock is not mapped
                    # to max_wallclock here.
                    if job.wallclock == "00:00":
                        wallclock = job.platform.max_wallclock
dbeltran's avatar
dbeltran committed
                    if wallclock != "00:00" and wallclock != "00:00:00" and wallclock != "":
                        # Past its wallclock: fetch the completion files and let
                        # check_completion decide; if it says FAILED, cancel the job
                        # remotely with cancel_cmd.
                        if job.is_over_wallclock(job.start_time,wallclock):
                            try:
                                job.platform.get_completed_files(job.name)
                                job_status = job.check_completion(over_wallclock=True)
                                if job_status is Status.FAILED:
                                    # NOTE(review): this inner try has no visible
                                    # except clause in this view.
                                    try:
dbeltran's avatar
dbeltran committed
                                        if self.cancel_cmd is not None:
                                            job.platform.send_command(self.cancel_cmd + " " + str(job.id))
dbeltran's avatar
dbeltran committed
                            # Any error during the over-wallclock check marks the job FAILED.
                            except:
                                job_status = Status.FAILED
                # Map the platform's raw status through the self.job_status lists.
                if job_status in self.job_status['COMPLETED']:
dbeltran's avatar
dbeltran committed
                    job_status = Status.COMPLETED
                elif job_status in self.job_status['RUNNING']:
                    job_status = Status.RUNNING
                elif job_status in self.job_status['QUEUING']:
                    if job.hold:
                        job_status = Status.HELD  # release?
dbeltran's avatar
dbeltran committed
                    else:
                        job_status = Status.QUEUING
                    list_queue_jobid += str(job.id) + ','
                    in_queue_jobs.append(job)
dbeltran's avatar
dbeltran committed
                elif job_status in self.job_status['FAILED']:
                    job_status = Status.FAILED
                # Unrecognised status with retries used up exactly: assume COMPLETED.
                elif retries == 0:
                    job_status = Status.COMPLETED
                    job.update_status(as_conf)
dbeltran's avatar
dbeltran committed
                else:
                    job_status = Status.UNKNOWN
                    # NOTE(review): the argument line below has lost the call it
                    # belongs to in this view.
dbeltran's avatar
dbeltran committed
                        'check_job() The job id ({0}) status is {1}.', job.id, job_status)
            self.get_queue_status(in_queue_jobs,list_queue_jobid,as_conf)
                # NOTE(review): the Log.warning and raise below have lost their
                # enclosing branch in this view.
                Log.warning(
                    'check_job() The job id ({0}) from platform {1} has an status of {2}.', job.id, self.name, job_status)
            raise AutosubmitError("Some Jobs are in Unknown status", 6008)
            # job.new_status=job_status
dbeltran's avatar
dbeltran committed
        # A failed send_command is reported only after the jobs were processed.
        if slurm_error:
            raise AutosubmitError(e_msg,6000)
dbeltran's avatar
dbeltran committed
    def get_jobid_by_jobname(self,job_name,retries=2):
        """
        Get the ids of the remote jobs that carry ``job_name``.

        Sends ``get_jobid_by_jobname_cmd(job_name)``. While the output is empty,
        it waits 2 seconds and re-sends it, at most ``retries`` more times. The
        first output line is treated as a header and skipped. Every remaining
        non-blank line contributes the text before its first comma.

        :param job_name: name of the job to look up
        :type job_name: str
        :param retries: extra attempts while the command output is empty
        :type retries: int
        :return: job ids as strings; empty if none were reported
        :rtype: list
        """
        cmd = self.get_jobid_by_jobname_cmd(job_name)
        self.send_command(cmd)
        job_id_name = self.get_ssh_output()
        while len(job_id_name) <= 0 and retries > 0:
            # Wait before re-sending rather than after, so a successful retry
            # returns immediately.
            sleep(2)
            self.send_command(cmd)
            job_id_name = self.get_ssh_output()
            retries -= 1
        # Skip the header line. splitlines() keeps a last row that lacks a trailing
        # newline, and blank lines are ignored. The result is always bound, even for
        # a negative ``retries``.
        return [line.split(',')[0] for line in job_id_name.splitlines()[1:] if line.strip()]

    def get_queue_status(self, in_queue_jobs, list_queue_jobid, as_conf):
        """
        Refresh the status of jobs that are still waiting in the remote queue.

        Abstract hook: every concrete platform must override it. The
        implementation is expected to query the remote platform, for example to
        find why a job is still queued, whether it was cancelled, or whether it
        is held, and then update each job in ``in_queue_jobs`` accordingly.

        :param in_queue_jobs: queued jobs; each one must be updated
        :type in_queue_jobs: list
        :param list_queue_jobid: the ids of those jobs, each followed by a comma
        :type list_queue_jobid: str
        :param as_conf: experiment configuration
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError()


    def get_checkjob_cmd(self, job_id):
        """
        Build the remote command that reports the status of a single job.

        Abstract hook for platform subclasses.

        :param job_id: id of the job to check
        :type job_id: int or str
        :return: command to check job status
        :rtype: str
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError()

dbeltran's avatar
dbeltran committed
    def get_checkAlljobs_cmd(self, jobs_id):
        """
        Build the remote command that reports the status of several jobs at once.

        Abstract hook for platform subclasses.

        :param jobs_id: ids of the jobs to check, comma-separated
        :type jobs_id: str
        :return: command to check job status
        :rtype: str
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError()
    def get_jobid_by_jobname_cmd(self, job_name):
        """
        Build the remote command that lists job ids for a given job name.

        Abstract hook for platform subclasses. It must raise rather than
        *return* ``NotImplementedError``. A returned exception class would be
        handed to ``send_command`` as if it were a command string.

        :param job_name: name of the job to look up
        :type job_name: str
        :return: command to get the job ids
        :rtype: str
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError

    def get_queue_status_cmd(self, job_name):
        """
        Build the remote command that reports queue status.

        Abstract hook for platform subclasses. It raises ``NotImplementedError``
        instead of returning the exception class, which callers would otherwise
        treat as a command.

        :param job_name: value supplied by the caller (presumably job name(s)
            or ids -- confirm against the platform implementations)
        :return: command to get the queue status
        :rtype: str
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError

dbeltran's avatar
dbeltran committed
    def x11_handler(self, channel, xxx_todo_changeme):
        """
        Handle one incoming X11 channel opened by the remote host.

        For each such connection:

        - open a socket to the local X display (``self.local_x11_display``);
        - record both directions in ``self.channels`` as
          ``fd -> (source, counterpart)``;
        - register both descriptors with ``self.poller`` for ``POLLIN``;
        - queue the channel on the transport so ``transport.accept()`` returns it.

        :param channel: X11 channel opened by the remote host
        :param xxx_todo_changeme: ``(src_addr, src_port)`` of the originator
        """
        _src_addr, _src_port = xxx_todo_changeme
        remote_fd = channel.fileno()
        display_socket = xlib_connect.get_socket(*self.local_x11_display[:4])
        display_fd = display_socket.fileno()
        # Bidirectional map: whichever side becomes readable, the other end is
        # where its data must be forwarded.
        self.channels[remote_fd] = channel, display_socket
        self.channels[display_fd] = display_socket, channel
        for watched in (remote_fd, display_socket):
            self.poller.register(watched, select.POLLIN)
        # Private paramiko API: makes the channel available to transport.accept().
        self.transport._queue_incoming_channel(channel)
dbeltran's avatar
dbeltran committed

    def flush_out(self,session):
        """
        Drain the output currently buffered on an SSH channel.

        Pending stdout data is written to ``sys.stdout``, then pending stderr
        data to ``sys.stderr``, reading 4096 bytes at a time and stopping as
        soon as nothing more is ready.

        :param session: channel providing ``recv_ready``/``recv`` and
            ``recv_stderr_ready``/``recv_stderr``
        """
        # locale.getlocale() reports no encoding under the C/POSIX locale, and
        # bytes.decode(None) raises TypeError, so fall back to UTF-8.
        encoding = locale.getlocale()[1] or "utf-8"
        # Chunks are cut at arbitrary byte offsets and may split a multibyte
        # character. Replace undecodable bytes instead of raising inside the
        # X11 status-checker thread that calls this.
        while session.recv_ready():
            sys.stdout.write(session.recv(4096).decode(encoding, errors="replace"))
        while session.recv_stderr_ready():
            sys.stderr.write(session.recv_stderr(4096).decode(encoding, errors="replace"))
dbeltran's avatar
dbeltran committed

dbeltran's avatar
dbeltran committed
    @threaded
    def x11_status_checker(self, session, session_fileno):
        """
        Forward X11 traffic and session output until the remote command exits.

        ``@threaded`` runs this body in a new ``Thread`` named
        ``<platform name>_X11`` and returns that thread to the caller.

        While ``session.exit_status_ready()`` is false, the loop:

        - polls ``self.poller``;
        - accepts further X11 channels waiting on ``self.transport``;
        - writes pending session output via ``flush_out`` when
          ``session_fileno`` is readable;
        - copies up to 4096 bytes from each readable X11 endpoint to its
          counterpart in ``self.channels``.

        On ``socket.error`` both endpoints are closed, and only the entry for
        the failing fd is removed from ``self.channels``.

        :param session: channel running the X11-enabled command
        :param session_fileno: ``session.fileno()``, as registered in ``self.poller``

        NOTE(review): in this view the loop body is indented one level deeper
        than the ``while`` and ends in ``except Exception as e:`` with no body.
        A ``try:`` line and the handler body appear to be missing. The
        ``if not poller:`` guard also has no visible body.
        """
dbeltran's avatar
dbeltran committed
        poller = None
dbeltran's avatar
dbeltran committed
        # Accept the first X11 channel queued by x11_handler.
        self.transport.accept()
        while not session.exit_status_ready():
dbeltran's avatar
dbeltran committed
                # NOTE(review): self.poller is registered with select.POLLIN in
                # exec_command, so it looks like a select.poll object, which has no
                # kqueue() method. The non-linux branch would then fail; confirm.
                if type(self.poller) is not list:
                    if sys.platform != "linux":
                        poller = self.poller.kqueue()
                    else:
                        poller = self.poller.poll()
                # accept subsequent x11 connections if any
                if len(self.transport.server_accepts) > 0:
                    self.transport.accept()
dbeltran's avatar
dbeltran committed
                if not poller:  # this should not happen, as we don't have a timeout.
dbeltran's avatar
dbeltran committed
                # poll() yields (fd, eventmask) pairs.
                for fd, event in poller:
                    if fd == session_fileno:
                        self.flush_out(session)
                    # data either on local/remote x11 socket
dbeltran's avatar
dbeltran committed
                    if fd in list(self.channels.keys()):
                        channel, counterpart = self.channels[fd]
                        # NOTE(review): an empty recv() result (peer closed) is
                        # forwarded as an empty sendall and does not close the pair.
                        try:
                            # forward data between local/remote x11 socket.
                            data = channel.recv(4096)
                            counterpart.sendall(data)
                        except socket.error:
                            channel.close()
                            counterpart.close()
                            del self.channels[fd]
            except Exception as e:
dbeltran's avatar
dbeltran committed

dbeltran's avatar
dbeltran committed

dbeltran's avatar
dbeltran committed
    def exec_command(self, command, bufsize=-1, timeout=30, get_pty=False,retries=3, x11=False):
        """
        Execute a command on the SSH server.  A new `.Channel` is opened and
        the requested command is execed.  The command's input and output
        streams are returned as Python ``file``-like objects representing
        stdin, stdout, and stderr.

        When ``x11`` is true, X11 forwarding is requested on the channel, with
        incoming X11 connections handled by ``x11_handler``. If the command
        contains ``timeout <duration>``, ``sleep <duration>`` is appended so the
        session, and with it the forwarding, stays open that long. A duration of
        ``0``, which GNU ``timeout`` treats as "no limit", becomes
        ``sleep infinity``. The channel is then watched by a background
        ``x11_status_checker`` thread.

        :param str command: the command to execute
        :param int bufsize:
            interpreted the same way as by the built-in ``file()`` function in
            Python
        :param int timeout:
            increased by 60 after every failed attempt. NOTE(review): it is not
            applied to the channel in this body (no ``chan.settimeout`` call).
        :param bool get_pty: NOTE(review): accepted but not used in this body.
        :param int retries: number of attempts when ``SSHException`` is raised
        :param bool x11: request X11 forwarding for this command
        :return:
            the stdin, stdout, and stderr of the executing command, as a
            3-tuple, or ``(False, False, False)`` if every attempt raised
            ``SSHException``
        :raises AutosubmitCritical: if starting an X11 command fails
        """
        while retries > 0:
            try:
                if x11:
                    display = os.getenv('DISPLAY') or "localhost:0"
                    self.local_x11_display = xlib_connect.get_display(display)
                    chan = self.transport.open_session()
                    chan.request_x11(single_connection=False,handler=self.x11_handler)
                    # Keep the session alive for the command's timeout so X11 windows
                    # are not torn down early. The word-boundary regex avoids the
                    # IndexError the old split("timeout ")[1] raised when "timeout"
                    # was not followed by a space.
                    timeout_match = re.search(r'\btimeout\s+(\S+)', command)
                    if timeout_match:
                        timeout_command = timeout_match.group(1)
                        # The duration is text; comparing it with the int 0 never matched.
                        if timeout_command == "0":
                            timeout_command = "infinity"
                        command = f'{command} ; sleep {timeout_command} 2>/dev/null'
                    Log.info(command)
                    try:
                        chan.exec_command(command)
                    except BaseException as e:
                        raise AutosubmitCritical("Failed to execute command: %s" % e) from e
                    chan_fileno = chan.fileno()
                    self.poller.register(chan_fileno, select.POLLIN)
                    self.x11_status_checker(chan, chan_fileno)
                else:
                    chan = self.transport.open_session()
                    chan.exec_command(command)
                stdin = chan.makefile('wb', bufsize)
                stdout = chan.makefile('rb', bufsize)
                stderr = chan.makefile_stderr('rb', bufsize)
                return stdin, stdout, stderr
            except paramiko.SSHException as e:
                # Look for the known message inside the error text. The reversed
                # test also matched empty or partial messages.
                if "SSH session not active" in str(e):
                    self._ssh = None
                    self.restore_connection(None)
                timeout = timeout + 60
                retries = retries - 1
        return False , False, False
dbeltran's avatar
dbeltran committed
    def exec_command_x11(self, command, bufsize=-1, timeout=0, get_pty=False,retries=3, x11=False):
dbeltran's avatar
dbeltran committed
        session = self.transport.open_session()