X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fprovisioner.py;h=c4d8b5b1df1db724b8aaabe5c946723d6f3da1f1;hp=510724255b8a011b133c4df3ddf67c4967bf5d64;hb=a4f57d6260e6520aa6a89e86f9d1b2ca5e0a3a08;hpb=f52cb7cfeb4e24febe7c66af3d5bb275a50d7f99;ds=inline diff --git a/n2vc/provisioner.py b/n2vc/provisioner.py index 5107242..c4d8b5b 100644 --- a/n2vc/provisioner.py +++ b/n2vc/provisioner.py @@ -14,16 +14,12 @@ import logging import os import re -import shlex from subprocess import CalledProcessError import tempfile -import time import uuid from juju.client import client -import n2vc.exceptions -import paramiko - +import asyncio arches = [ [re.compile(r"amd64|x86_64"), "amd64"], @@ -63,7 +59,7 @@ temp=$(mktemp) echo 'ubuntu ALL=(ALL) NOPASSWD:ALL' > $temp install -m 0440 $temp /etc/sudoers.d/90-juju-ubuntu rm $temp -su ubuntu -c 'install -D -m 0600 /dev/null ~/.ssh/authorized_keys' +su ubuntu -c '[ -f ~/.ssh/authorized_keys ] || install -D -m 0600 /dev/null ~/.ssh/authorized_keys' export authorized_keys="{}" if [ ! -z "$authorized_keys" ]; then su ubuntu -c 'echo $authorized_keys >> ~/.ssh/authorized_keys' @@ -72,163 +68,108 @@ fi IPTABLES_SCRIPT = """#!/bin/bash set -e -apt-get update -DEBIAN_FRONTEND=noninteractive apt-get install -yqq iptables-persistent +[ -v `which netfilter-persistent` ] && apt update \ + && DEBIAN_FRONTEND=noninteractive apt-get install -yqq iptables-persistent iptables -t nat -A OUTPUT -p tcp -d {} -j DNAT --to-destination {} netfilter-persistent save """ -class SSHProvisioner: +class AsyncSSHProvisioner: """Provision a manually created machine via SSH.""" - def __init__(self, user, host, private_key_path, log=None): + user = "" + host = "" + private_key_path = "" + def __init__(self, user, host, private_key_path, log=None): self.host = host self.user = user self.private_key_path = private_key_path + self.log = log if log else logging.getLogger(__name__) - if log: - self.log = log - else: - self.log = logging.getLogger(__name__) + async def _scp(self, source_file, destination_file): + """Execute an scp command. Requires a fully qualified source and + destination. - def _get_ssh_client(self, host=None, user=None, private_key_path=None): - """Return a connected Paramiko ssh object. - - :param str host: The host to connect to. - :param str user: The user to connect as. - :param str key: The private key to authenticate with. - - :return: object: A paramiko.SSHClient - :raises: :class:`paramiko.ssh_exception.SSHException` if the - connection failed + :param str source_file: Path to the source file + :param str destination_file: Path to the destination file """ + cmd = [ + "scp", + "-i", + os.path.expanduser(self.private_key_path), + "-o", + "StrictHostKeyChecking=no", + "-q", + "-B", + ] + destination = "{}@{}:{}".format(self.user, self.host, destination_file) + cmd.extend([source_file, destination]) + process = await asyncio.create_subprocess_exec(*cmd) + await process.wait() + if process.returncode != 0: + raise CalledProcessError(returncode=process.returncode, cmd=cmd) + + async def _ssh(self, command): + """Run a command remotely via SSH. - if not host: - host = self.host - - if not user: - user = self.user - - if not private_key_path: - private_key_path = self.private_key_path - - ssh = paramiko.SSHClient() - ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + :param str command: The command to execute + :return: tuple: The stdout and stderr of the command execution + :raises: :class:`CalledProcessError` if the command fails + """ - pkey = None + destination = "{}@{}".format(self.user, self.host) + cmd = [ + "ssh", + "-i", + os.path.expanduser(self.private_key_path), + "-o", + "StrictHostKeyChecking=no", + "-q", + destination, + ] + cmd.extend([command]) + process = await asyncio.create_subprocess_exec( + *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await process.communicate() - # Read the private key into a paramiko.RSAKey - if os.path.exists(private_key_path): - with open(private_key_path, "r") as f: - pkey = paramiko.RSAKey.from_private_key(f) + if process.returncode != 0: + output = stderr.decode("utf-8").strip() + raise CalledProcessError( + returncode=process.returncode, cmd=cmd, output=output + ) + return (stdout.decode("utf-8").strip(), stderr.decode("utf-8").strip()) - ####################################################################### - # There is a bug in some versions of OpenSSH 4.3 (CentOS/RHEL5) where # - # the server may not send the SSH_MSG_USERAUTH_BANNER message except # - # when responding to an auth_none request. For example, paramiko will # - # attempt to use password authentication when a password is set, but # - # the server could deny that, instead requesting keyboard-interactive.# - # The hack to workaround this is to attempt a reconnect, which will # - # receive the right banner, and authentication can proceed. See the # - # following for more info: # - # https://github.com/paramiko/paramiko/issues/432 # - # https://github.com/paramiko/paramiko/pull/438 # - ####################################################################### + async def _init_ubuntu_user(self): + """Initialize the ubuntu user. + :return: bool: If the initialization was successful + :raises: :class:`CalledProcessError` if the _ssh command fails + """ retry = 10 attempts = 0 delay = 15 while attempts <= retry: try: attempts += 1 - # Attempt to establish a SSH connection - ssh.connect( - host, - port=22, - username=user, - pkey=pkey, - # allow_agent=False, - # look_for_keys=False, - ) + stdout, stderr = await self._ssh("sudo -n true") break - except paramiko.ssh_exception.SSHException as e: - if "Error reading SSH protocol banner" == str(e): - # Once more, with feeling - ssh.connect(host, port=22, username=user, pkey=pkey) - else: - # Reraise the original exception - self.log.debug("Unhandled exception caught: {}".format(e)) + except CalledProcessError as e: + self.log.debug( + "Waiting for VM to boot, sleeping {} seconds".format(delay) + ) + if attempts > retry: raise e - except Exception as e: - if "Unable to connect to port" in str(e): - self.log.debug( - "Waiting for VM to boot, sleeping {} seconds".format(delay) - ) - if attempts > retry: - raise e - else: - time.sleep(delay) - # Slowly back off the retry - delay += 15 else: - self.log.debug(e) - raise e - return ssh - - def _run_command(self, ssh, cmd, pty=True): - """Run a command remotely via SSH. - - :param object ssh: The SSHClient - :param str cmd: The command to execute - :param list cmd: The `shlex.split` command to execute - :param bool pty: Whether to allocate a pty - - :return: tuple: The stdout and stderr of the command execution - :raises: :class:`CalledProcessError` if the command fails - """ - - if isinstance(cmd, str): - cmd = shlex.split(cmd) - - if type(cmd) is not list: - cmd = [cmd] - - cmds = " ".join(cmd) - _, stdout, stderr = ssh.exec_command(cmds, get_pty=pty) - retcode = stdout.channel.recv_exit_status() - - if retcode > 0: - output = stderr.read().strip() - raise CalledProcessError(returncode=retcode, cmd=cmd, output=output) - return ( - stdout.read().decode("utf-8").strip(), - stderr.read().decode("utf-8").strip(), - ) - - def _init_ubuntu_user(self): - """Initialize the ubuntu user. - - :return: bool: If the initialization was successful - :raises: :class:`paramiko.ssh_exception.AuthenticationException` - if the authentication fails - """ - ssh = None - try: - # Run w/o allocating a pty, so we fail if sudo prompts for a passwd - ssh = self._get_ssh_client() - self._run_command(ssh, "sudo -n true", pty=False) - except paramiko.ssh_exception.AuthenticationException: - raise n2vc.exceptions.AuthenticationFailed(self.user) - except paramiko.ssh_exception.NoValidConnectionsError: - raise n2vc.exceptions.NoRouteToHost(self.host) - finally: - if ssh: - ssh.close() + await asyncio.sleep(delay) + # Slowly back off the retry + delay += 15 # Infer the public key + public_key = None public_key_path = "{}.pub".format(self.private_key_path) if not os.path.exists(public_key_path): @@ -241,23 +182,13 @@ class SSHProvisioner: script = INITIALIZE_UBUNTU_SCRIPT.format(public_key) - try: - ssh = self._get_ssh_client() - - self._run_command( - ssh, ["sudo", "/bin/bash -c " + shlex.quote(script)], pty=True - ) - except paramiko.ssh_exception.AuthenticationException as e: - raise e - finally: - ssh.close() + stdout, stderr = await self._run_configure_script(script) return True - def _detect_hardware_and_os(self, ssh): + async def _detect_hardware_and_os(self): """Detect the target hardware capabilities and OS series. - :param object ssh: The SSHClient :return: str: A raw string containing OS and hardware information. """ @@ -268,17 +199,9 @@ class SSHProvisioner: "mem": "", } - stdout, _ = self._run_command( - ssh, ["sudo", "/bin/bash -c " + shlex.quote(DETECTION_SCRIPT)], pty=True, - ) + stdout, stderr = await self._run_configure_script(DETECTION_SCRIPT) lines = stdout.split("\n") - - # Remove extraneous line if DNS resolution of hostname famils - # i.e. sudo: unable to resolve host test-1-mgmtvm-1: Connection timed out - if "unable to resolve host" in lines[0]: - lines = lines[1:] - info["series"] = lines[0].strip() info["arch"] = normalize_arch(lines[1].strip()) @@ -291,6 +214,7 @@ class SSHProvisioner: recorded = {} for line in lines[3:]: physical_id = "" + print(line) if line.find("physical id") == 0: physical_id = line.split(":")[1].strip() @@ -303,46 +227,35 @@ class SSHProvisioner: return info - def provision_machine(self): + async def provision_machine(self): """Perform the initial provisioning of the target machine. :return: bool: The client.AddMachineParams - :raises: :class:`paramiko.ssh_exception.AuthenticationException` - if the upload fails """ params = client.AddMachineParams() - if self._init_ubuntu_user(): - try: - ssh = self._get_ssh_client() - - hw = self._detect_hardware_and_os(ssh) - params.series = hw["series"] - params.instance_id = "manual:{}".format(self.host) - params.nonce = "manual:{}:{}".format( - self.host, str(uuid.uuid4()), # a nop for Juju w/manual machines - ) - params.hardware_characteristics = { - "arch": hw["arch"], - "mem": int(hw["mem"]), - "cpu-cores": int(hw["cpu-cores"]), - } - params.addresses = [ - {"value": self.host, "type": "ipv4", "scope": "public"} - ] - - except paramiko.ssh_exception.AuthenticationException as e: - raise e - finally: - ssh.close() + if await self._init_ubuntu_user(): + hw = await self._detect_hardware_and_os() + params.series = hw["series"] + params.instance_id = "manual:{}".format(self.host) + params.nonce = "manual:{}:{}".format( + self.host, str(uuid.uuid4()), + ) # a nop for Juju w/manual machines + params.hardware_characteristics = { + "arch": hw["arch"], + "mem": int(hw["mem"]), + "cpu-cores": int(hw["cpu-cores"]), + } + params.addresses = [{"value": self.host, "type": "ipv4", "scope": "public"}] return params - async def install_agent(self, connection, nonce, machine_id, api): + async def install_agent(self, connection, nonce, machine_id, proxy=None): """ :param object connection: Connection to Juju API :param str nonce: The nonce machine specification :param str machine_id: The id assigned to the machine + :param str proxy: IP of the API_PROXY :return: bool: If the initialization was successful """ @@ -371,72 +284,59 @@ class SSHProvisioner: - 127.0.0.1:17070 - '[::1]:17070' """ - m = re.search(r"apiaddresses:\n- (\d+\.\d+\.\d+\.\d+):17070", results.script) - apiaddress = m.group(1) - - """Add IP Table rule - - In order to route the traffic to the private ip of the Juju controller - we use a DNAT rule to tell the machine that the destination for the - private address is the public address of the machine where the Juju - controller is running in LXD. That machine will have a complimentary - iptables rule, routing traffic to the appropriate LXD container. - """ - - script = IPTABLES_SCRIPT.format(apiaddress, api) - - # Run this in a retry loop, because dpkg may be running and cause the - # script to fail. - retry = 10 - attempts = 0 - delay = 15 - - while attempts <= retry: - try: - attempts += 1 - - self._run_configure_script(script) - break - except Exception as e: - self.log.debug("Waiting for dpkg, sleeping {} seconds".format(delay)) - if attempts > retry: - raise e - else: - time.sleep(delay) - # Slowly back off the retry - delay += 15 + if proxy: + m = re.search(r"apiaddresses:\n- (\d+\.\d+\.\d+\.\d+):17070", results.script) + apiaddress = m.group(1) + + """Add IP Table rule + + In order to route the traffic to the private ip of the Juju controller + we use a DNAT rule to tell the machine that the destination for the + private address is the public address of the machine where the Juju + controller is running in LXD. That machine will have a complimentary + iptables rule, routing traffic to the appropriate LXD container. + """ + + script = IPTABLES_SCRIPT.format(apiaddress, proxy) + + # Run this in a retry loop, because dpkg may be running and cause the + # script to fail. + retry = 10 + attempts = 0 + delay = 15 + + while attempts <= retry: + try: + attempts += 1 + stdout, stderr = await self._run_configure_script(script) + break + except Exception as e: + self.log.debug("Waiting for dpkg, sleeping {} seconds".format(delay)) + if attempts > retry: + raise e + else: + await asyncio.sleep(delay) + # Slowly back off the retry + delay += 15 # self.log.debug("Running configure script") - self._run_configure_script(results.script) + await self._run_configure_script(results.script) # self.log.debug("Configure script finished") - def _run_configure_script(self, script: str): + async def _run_configure_script(self, script, root=True): """Run the script to install the Juju agent on the target machine. - :param str script: The script returned by the ProvisioningScript API - :raises: :class:`paramiko.ssh_exception.AuthenticationException` - if the upload fails + :param str script: The script to be executed """ _, tmpFile = tempfile.mkstemp() with open(tmpFile, "w") as f: f.write(script) - try: - # get ssh client - ssh = self._get_ssh_client(user="ubuntu",) - - # copy the local copy of the script to the remote machine - sftp = paramiko.SFTPClient.from_transport(ssh.get_transport()) - sftp.put( - tmpFile, tmpFile, - ) + f.close() - # run the provisioning script - self._run_command( - ssh, "sudo /bin/bash {}".format(tmpFile), - ) + # copy the local copy of the script to the remote machine + await self._scp(tmpFile, tmpFile) - except paramiko.ssh_exception.AuthenticationException as e: - raise e - finally: - os.remove(tmpFile) - ssh.close() + # run the provisioning script + return await self._ssh( + "{} /bin/bash {}".format("sudo" if root else "", tmpFile) + )