From 2826d6589070d07f0309a65e472293fd5fa3a147 Mon Sep 17 00:00:00 2001 From: Deli Zhang Date: Sat, 15 Jul 2023 17:35:12 +0800 Subject: [PATCH] CP-35235: Replace paramiko by ssh command Also made below changes: - Install iperf rpm in dom0 - Rename package to auto-cert-kit Signed-off-by: Deli Zhang --- autocertkit/common.py | 158 ++++++++++++++++++++++ autocertkit/network_tests.py | 20 +-- autocertkit/ssh.py | 245 ----------------------------------- autocertkit/testbase.py | 3 +- autocertkit/utils.py | 34 +---- plugins/autocertkit | 56 +------- 6 files changed, 182 insertions(+), 334 deletions(-) create mode 100644 autocertkit/common.py delete mode 100644 autocertkit/ssh.py diff --git a/autocertkit/common.py b/autocertkit/common.py new file mode 100644 index 0000000..42397cb --- /dev/null +++ b/autocertkit/common.py @@ -0,0 +1,158 @@ +# Copyright (c) 2023-07-20 Cloud Software Group Holdings, Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, +# with or without modification, are permitted provided +# that the following conditions are met: +# +# * Redistributions of source code must retain the above +# copyright notice, this list of conditions and the +# following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the +# following disclaimer in the documentation and/or other +# materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND +# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. + +import os +import time +import subprocess +import uuid + + +SSH = '/usr/bin/ssh' +SCP = '/usr/bin/scp' +EXPECT = '/usr/bin/expect' +ACK_HOME = '/opt/xensource/packages/files/auto-cert-kit/' + + +def set_logger(logger): + global log + log = logger + + +def make_local_call(call, logging=True, std_out=subprocess.PIPE, + std_err=subprocess.PIPE, shell=False, timeout=None): + """Function wrapper for making a simple call to shell""" + if logging: + log.debug(f"make_local_call: {call}") + process = subprocess.Popen(call, stdout=std_out, stderr=std_err, + shell=shell, universal_newlines=True) + stdout, stderr = process.communicate(timeout=timeout) + res = {"returncode": process.returncode, "stdout": stdout.strip(), + "stderr": str(stderr).strip()} + if logging: + log.debug("returncode: %d" % process.returncode) + log.debug("stdout: %s" % str(stdout)) + log.debug("stderr: %s" % str(stderr)) + if res["returncode"] != 0: + log.error(f"ERR: Could not make local call: {call}") + + return res + + +class SecureChannel: + """Wrap of ssh and scp""" + + def __init__(self, ip, user, password, timeout=300): + self.ip = ip + self.user = user + self.password = password + self.timeout = timeout + + def _wrap_cmd(self, cmd): + escape_cmd = cmd.replace('$', '\$') + return fr'''{EXPECT} << EOF +set timeout {self.timeout} +spawn {escape_cmd} +expect {{ + "continue connecting (yes/no)?" {{send "yes\n"; exp_continue}} + "password:" {{send "{self.password}\n"; exp_continue}} + eof {{catch wait result; exit [lindex \$result 3]}} + timeout {{exit 250}} +}} +EOF +''' + + def _wrap_ssh(self, cmd): + return self._wrap_cmd(f'{SSH} {self.user}@{self.ip} {{ {cmd} }}') + + def run_cmd(self, cmd): + """Run command simply and ignore stderr""" + return make_local_call(self._wrap_ssh(cmd), shell=True, timeout=self.timeout) + + def run_cmd_ext(self, cmd): + """Run command and capture stdout and stderr separately""" + id = str(uuid.uuid4()) + fcmd = f'.ack_cmd.{id}' + frc = f'.ack_rc.{id}' + fout = f'.ack_out.{id}' + ferr = f'.ack_err.{id}' + + with open(f'/tmp/{fcmd}', 'w') as f: + f.write(cmd) + self.put_file(f'/tmp/{fcmd}') + + self.run_cmd(fr'sh {fcmd} >{fout} 2>{ferr}; echo "$?" >{frc}') + + self.get_file(f'.ack_*.{id}', '/tmp/') + contents = [] + for f in [frc, fout, ferr]: + with open(f'/tmp/{f}', 'r') as f: + contents.append(f.read().strip()) + + self.run_cmd(fr'rm -f .ack_*.{id}') + for f in [fcmd, frc, fout, ferr]: + os.remove(f'/tmp/{f}') + + res = {'returncode': int(contents[0]), 'stdout': contents[1], 'stderr': contents[2]} + log.debug(f'Real result: {res}') + + return res + + def _wrap_scp(self, src, dst): + return self._wrap_cmd(f'{SCP} {src} {dst}') + + def put_file(self, src, dst=''): + cmd = self._wrap_scp(src, f'{self.user}@{self.ip}:{dst}') + return make_local_call(cmd, shell=True, timeout=self.timeout) + + def get_file(self, src, dst='.'): + cmd = self._wrap_scp(f'{self.user}@{self.ip}:{src}', dst) + return make_local_call(cmd, shell=True, timeout=self.timeout) + + +def ssh_command(ip, username, password, cmd_str, dbg_str=None, attempts=10, timeout=900): + """execute an SSH command, return both exit code, stdout and stderr.""" + if dbg_str: + log.debug(dbg_str) + + for i in range(0, attempts): + log.debug("Attempt %d/%d: %s" % (i, attempts, cmd_str)) + + try: + result = SecureChannel(ip, username, password, timeout).run_cmd_ext(cmd_str) + except Exception as e: + log.debug("Exception: %s" % str(e)) + # Sleep before next attempt + time.sleep(20) + else: + return result + + log.debug("Max attempt reached %d/%d" % (attempts, attempts)) + return {"returncode": -1, "stdout": "", "stderr": "An unkown error has occured!"} + diff --git a/autocertkit/network_tests.py b/autocertkit/network_tests.py index 808d4fb..f6b7ccb 100644 --- a/autocertkit/network_tests.py +++ b/autocertkit/network_tests.py @@ -81,9 +81,6 @@ def __init__(self, session, self.config = opt.get('config', self.default_config) - # Store pool master in order to make plugin calls - self.host = get_pool_master(self.session) - self.timeout = 60 # Validate the references and setup run method @@ -234,11 +231,16 @@ def run(self): def deploy_iperf(self): """deploy iPerf on both client and server""" def deploy(vm_ref): - self.plugin_call('deploy_iperf', - {'vm_ref': vm_ref, - 'mip': self.vm_info[vm_ref]['ip_m'], - 'username': self.username, - 'password': self.password}) + host = None + if self.session.xenapi.VM.get_is_control_domain(vm_ref): + host = self.session.xenapi.VM.get_resident_on(vm_ref) + + call_ack_plugin(self.session, 'deploy_iperf', + {'vm_ref': vm_ref, + 'mip': self.vm_info[vm_ref]['ip_m'], + 'username': self.username, + 'password': self.password}, + host) deploy(self.client) deploy(self.server) @@ -296,7 +298,7 @@ def parse_iperf_line(self, data): def plugin_call(self, method, args): """Make a plugin call to autocertkit""" - return call_ack_plugin(self.session, method, args, self.host) + return call_ack_plugin(self.session, method, args) def get_iperf_client_cmd(self): params = [] diff --git a/autocertkit/ssh.py b/autocertkit/ssh.py deleted file mode 100644 index 8264f01..0000000 --- a/autocertkit/ssh.py +++ /dev/null @@ -1,245 +0,0 @@ -# Copyright (c) 2005-2022 Citrix Systems Inc. -# Copyright (c) 2022-12-01 Cloud Software Group Holdings, Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, -# with or without modification, are permitted provided -# that the following conditions are met: -# -# * Redistributions of source code must retain the above -# copyright notice, this list of conditions and the -# following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the -# following disclaimer in the documentation and/or other -# materials provided with the distribution. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND -# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, -# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF -# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR -# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, -# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, -# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING -# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. - -import socket -import string -import sys -import os -import os.path -import traceback -import time -import gc -import paramiko -from utils import * - -SSHPORT = 22 - - -class SSHSession: - - def __init__(self, - ip, - log, - username="root", - timeout=300, - password=None, - nowarn=False): - self.toreply = 0 - self.log = log - self.debug = False - self.trans = None - for retry in range(3): - self.trans = None - try: - self.connect(ip, username, password, timeout) - except Exception as e: - log.error(traceback.format_exc()) - desc = str(e) - log.error("SSH retry %d exception %s" % (retry, desc)) - if string.find(desc, "Signature verification") > -1 or \ - string.find(desc, - "Error reading SSH protocol banner") > -1: - # Have another go - log.warn("Retrying SSH connection after '%s'" % (desc)) - try: - self.close() - except: - pass - time.sleep(1) - continue - elif string.find(desc, "Authentication failed") > -1: - self.reply = "SSH authentication failed" - self.toreply = 1 - self.close() - break - else: - # Probably a legitimate exception - pass - self.reply = "SSH connection failed" - self.toreply = 1 - self.close() - break - # If we get here we have successfully opened a connection - return - # Even after retry(s) we didn't get a connection - self.reply = "SSH connection all tries failed" - self.toreply = 1 - self.close() - - def connect(self, ip, username, password, timeout): - self.log.debug("%s %s %s %d" % (ip, username, password, timeout)) - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.settimeout(timeout) - sock.connect((ip, SSHPORT)) - - # Create SSH transport. - self.trans = paramiko.Transport(sock) - self.trans.set_log_channel("") - - # Negotiate SSH session synchronously. - goes = 3 - while goes > 0: - try: - self.trans.start_client() - goes = 0 - except Exception as e: - goes = goes - 1 - if goes > 0: - self.log.debug("Retrying SSHSession connection %d" % goes) - time.sleep(10) - else: - raise e - - # Load DSS key. - k = None - try: - dsskey = ".ssh/id_dsa" - k = paramiko.DSSKey.from_private_key_file(dsskey) - except: - pass - - # Authenticate session. No host key checking is performed. - if password: - if password == "": - password = "" - self.trans.auth_password(username, password) - else: - if not k: - raise RuntimeError("No password given and no key read") - self.log.debug("Using SSH public key %s" % (dsskey)) - self.trans.auth_publickey(username, k) - if not self.trans.is_authenticated(): - raise RuntimeError("Problem with SSH authentication") - - def open_session(self): - return self.trans.open_session() - - def close(self): - if self.trans: - self.trans.sock.shutdown(socket.SHUT_RDWR) - self.trans.close() - self.trans = None - gc.collect() - time.sleep(5) - - def __del__(self): - self.close() - - -class SSHCommand(SSHSession): - """An SSH session guarded for target lockups.""" - - def __init__(self, ip, command, username, password, opt): - log = opt['log'] - timeout = opt.get('timeout', 300) - nowarn = opt.get('nowarn', False) - nolog = opt.get('nolog', False) - combine_stderr = opt.get('combine_stderr', False) - - self.log = log - if not nolog: - log.debug("ssh %s@%s %s" % (username, ip, command)) - SSHSession.__init__(self, - ip, - log, - username=username, - timeout=timeout, - password=password, - nowarn=nowarn) - self.command = command - self.nolog = nolog - - try: - self.client = self.open_session() - self.client.settimeout(timeout) - self.client.set_combine_stderr(combine_stderr) - self.client.exec_command(command) - self.client.shutdown(1) - self.hStdout = self.client.makefile() - self.hStderr = None if combine_stderr else self.client.makefile_stderr() - except Exception as e: - self.reply = "SSH command executed failed: %s" % str(e), - self.toreply = 1 - self.close() - - def read_file(self, inf, outf=None, label="stdout"): - reply = "" - while True: - try: - if outf: - output = inf.read(4096) - else: - output = inf.readline() - except socket.timeout: - if not self.nolog: - self.log.error("Error: SSH read time out!") - return reply - - if len(output) == 0: - break - if outf: - outf.write(output) - else: - reply += output - - self.log_output(label, output) - return reply - - def log_output(self, label, output): - if not self.nolog: - self.log.debug("%s: %s" % - (label, (output[:-1] if output and output[-1] == '\n' else output))) - - def read(self, out_file=None, err_file=None): - """Process the output and result of the command. - @:param out_file/err_file: Whether to write stdout/stderr to the file - None (Default) : just return stdout/stderr content - Not None : write stdout/stderr content to the file, which is used for large content - @:return dict including exit status, stdout and stderr - """ - - if self.toreply: - raise Exception(self.reply) - - self.exit_status = self.client.recv_exit_status() - if not self.nolog: - self.log.debug("returncode: %d" % self.exit_status) - self.stdout = self.read_file(self.hStdout, out_file) - self.stderr = self.read_file(self.hStderr, err_file, label="stderr") \ - if self.hStderr else "" - - # Local clean up. - self.close() - - return {"returncode": self.exit_status, "stdout": self.stdout, "stderr": self.stderr} - - def __del__(self): - SSHSession.__del__(self) diff --git a/autocertkit/testbase.py b/autocertkit/testbase.py index 6cf58aa..6b8dd0c 100644 --- a/autocertkit/testbase.py +++ b/autocertkit/testbase.py @@ -556,7 +556,8 @@ def _create_test_threads(self, session, vm_ref_list): threads.append(create_test_thread(lambda vm=vm_ref: TimeoutFunction(ssh_command(get_context_vm_mip(vm), self.username, self.password, - self.cmd_str), + self.cmd_str, + timeout=self.timeout), self.timeout, '%s test timed out %d' % (self.test, self.timeout)))) return threads diff --git a/autocertkit/utils.py b/autocertkit/utils.py index 0ef2830..31002be 100644 --- a/autocertkit/utils.py +++ b/autocertkit/utils.py @@ -30,13 +30,11 @@ # SUCH DAMAGE. """A module for utility functions shared with multiple test cases""" -import logging import subprocess import datetime import XenAPI import sys import time -import ssh import signal from datetime import datetime @@ -48,6 +46,8 @@ import socket import struct import ctypes + +from common import * sys.path.append("/opt/xensource/packages/files/auto-cert-kit/pypackages") from acktools.net import route, generate_mac import acktools.log @@ -93,6 +93,7 @@ def configure_logging(): configure_logging() +set_logger(log) def release_logging(): @@ -1289,31 +1290,6 @@ def ping_with_retry(session, vm_ref, mip, dst_vm_ip, interface, timeout=20, retr return False -@log_exceptions -def ssh_command(ip, username, password, cmd_str, dbg_str=None, attempts=10, timeout=900): - """execute an SSH command using the parimiko library, return both - exit code, stdout and stderr.""" - if dbg_str: - log.debug(dbg_str) - - for i in range(0, attempts): - log.debug("Attempt %d/%d: %s" % (i, attempts, cmd_str)) - - try: - sshcmd = ssh.SSHCommand(ip, cmd_str, username, password, - {'log': log, 'timeout': timeout}) - result = sshcmd.read() - except Exception as e: - log.debug("Exception: %s" % str(e)) - # Sleep before next attempt - time.sleep(20) - else: - return result - - log.debug("Max attempt reached %d/%d" % (attempts, attempts)) - return {"returncode": -1, "stdout": "", "stderr": "An unkown error has occured!"} - - def plug_pif(session, pif): """ Plug given pif""" log.debug("Plugging PIF: %s" % pif) @@ -2322,7 +2298,7 @@ def get_vm_interface(session, host, vm_ref, mip): ifs = {} # cmd output: "eth0: ec:f4:bb:ce:91:9c" - cmd = b"""ip -o link | awk '{if($2 ~ /^eth/) print $2,$17}'""" + cmd = """ip -o link | awk '{if($2 ~ /^eth/) print $2,$17}'""" res = ssh_command(mip, 'root', DEFAULT_PASSWORD, cmd) mac_re = re.compile(r"(?P.*): (?P.*)") # NOSONAR for line in res['stdout'].strip().split('\n'): @@ -2332,7 +2308,7 @@ def get_vm_interface(session, host, vm_ref, mip): ifs[device] = [device, mac, ''] # cmd output: "eth0 10.62.114.80/21" - cmd = b"""ip -o -f inet addr | awk '{if($2 ~ /^eth/) print $2,$4}'""" + cmd = """ip -o -f inet addr | awk '{if($2 ~ /^eth/) print $2,$4}'""" res = ssh_command(mip, 'root', DEFAULT_PASSWORD, cmd) ip_re = re.compile(r"(?P.*) (?P.*)") # NOSONAR for line in res['stdout'].strip().split('\n'): diff --git a/plugins/autocertkit b/plugins/autocertkit index 2c99e9e..188fd95 100755 --- a/plugins/autocertkit +++ b/plugins/autocertkit @@ -38,15 +38,11 @@ import XenAPI import XenAPIPlugin import subprocess -import xmlrpc import os import sys import shutil import time -import logging -import logging.handlers import re -import base64 import tarfile import datetime import json @@ -74,7 +70,7 @@ INSTALL_DIR = '/opt/xensource/packages/files/auto-cert-kit' if INSTALL_DIR not in sys.path: sys.path.insert(0, INSTALL_DIR) -import ssh # nopep8 +from common import * XE = '/opt/xensource/bin/xe' @@ -150,7 +146,7 @@ def configure_logging(): configure_logging() - +set_logger(log) def release_logging(): global log @@ -272,30 +268,6 @@ def get_from_xensource_inventory(key): ############# SSH KEY SETUP ########################## -@log_exceptions -def ssh_command(ip, username, password, cmd_str, dbg_str=None, attempts=10, timeout=900): - """execute an SSH command using the parimiko library, return both - exit code, stdout and stderr.""" - if dbg_str: - log.debug(dbg_str) - - for i in range(0, attempts): - log.debug("Attempt %d/%d: %s" % (i, attempts, cmd_str)) - - try: - sshcmd = ssh.SSHCommand(ip, cmd_str, username, password, - {'log': log, 'timeout': timeout}) - result = sshcmd.read() - except Exception as e: - log.debug("Exception: %s" % str(e)) - # Sleep before next attempt - time.sleep(20) - else: - return result - - log.debug("Max attempt reached %d/%d" % (attempts, attempts)) - return {"returncode": -1, "stdout": "", "stderr": "An unkown error has occured!"} - def install_ssh_key(session, vm_ref, ip, username, password): """Install Dom0 public key in droid VM""" @@ -446,25 +418,6 @@ def install_iperf(session, vm_ref, ip, username, password): ssh_command(ip, username, password, cmd_str)["stdout"] -def make_local_call(call, logging=True, std_out=subprocess.PIPE, shell=False): - """Function wrapper for making a simple call to shell""" - if logging: - log.debug("make_local_call: %s" % (" ".join(call))) - process = subprocess.Popen(call, stdout=std_out, shell=shell, universal_newlines=True) - stdout, stderr = process.communicate() - res = {"returncode": process.returncode, "stdout": - stdout.strip(), "stderr": str(stderr).strip()} - if logging: - log.debug("returncode: %d" % process.returncode) - log.debug("stdout: %s" % str(stdout)) - log.debug("stderr: %s" % str(stderr)) - if res["returncode"] != 0: - msg = "ERR: Could not make local call %s" % ' '.join(call) - log.error(msg) - - return res - - @log_exceptions def upload_file(session, vm_ref, ip, username, password, file): """upload one file to VM""" @@ -550,6 +503,9 @@ def deploy_iperf(session, args): i = i + 1 # Increment counter if i == limit: raise PluginError("Plugin Error: Cannot deploy Iperf") + else: + call = fr'''rpm -q iperf || rpm -i {INSTALL_DIR}/{IPERF_RPM}''' + make_local_call(call, shell=True) log.debug("iPerf installation completed") return json_dumps("OK") @@ -2050,7 +2006,7 @@ def get_kernel_version(session, args): @log_exceptions def get_ack_version(session, args): """Check ACK version using `rpm -qi`""" - call = ['rpm', '-qi', 'xenserver-auto-cert-kit'] + call = ['rpm', '-qi', 'auto-cert-kit'] out = make_local_call(call) if out['returncode']: return json_dumps(None)