--- /dev/null
+# Copyright 2019 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+import os
+import re
+import shlex
+import tempfile
+import time
+import uuid
+from subprocess import CalledProcessError
+
+import paramiko
+import n2vc.exceptions
+
+from juju.client import client
+
+arches = [
+ [re.compile(r"amd64|x86_64"), "amd64"],
+ [re.compile(r"i?[3-9]86"), "i386"],
+ [re.compile(r"(arm$)|(armv.*)"), "armhf"],
+ [re.compile(r"aarch64"), "arm64"],
+ [re.compile(r"ppc64|ppc64el|ppc64le"), "ppc64el"],
+ [re.compile(r"s390x?"), "s390x"],
+
+]
+
+
+def normalize_arch(rawArch):
+ """Normalize the architecture string."""
+ for arch in arches:
+ if arch[0].match(rawArch):
+ return arch[1]
+
+
+DETECTION_SCRIPT = """#!/bin/bash
+set -e
+os_id=$(grep '^ID=' /etc/os-release | tr -d '"' | cut -d= -f2)
+if [ "$os_id" = 'centos' ]; then
+ os_version=$(grep '^VERSION_ID=' /etc/os-release | tr -d '"' | cut -d= -f2)
+ echo "centos$os_version"
+else
+ lsb_release -cs
+fi
+uname -m
+grep MemTotal /proc/meminfo
+cat /proc/cpuinfo
+"""
+
+INITIALIZE_UBUNTU_SCRIPT = """set -e
+(id ubuntu &> /dev/null) || useradd -m ubuntu -s /bin/bash
+umask 0077
+temp=$(mktemp)
+echo 'ubuntu ALL=(ALL) NOPASSWD:ALL' > $temp
+install -m 0440 $temp /etc/sudoers.d/90-juju-ubuntu
+rm $temp
+su ubuntu -c 'install -D -m 0600 /dev/null ~/.ssh/authorized_keys'
+export authorized_keys="{}"
+if [ ! -z "$authorized_keys" ]; then
+ su ubuntu -c 'echo $authorized_keys >> ~/.ssh/authorized_keys'
+fi
+"""
+
+IPTABLES_SCRIPT = """#!/bin/bash
+set -e
+DEBIAN_FRONTEND=noninteractive apt-get install -yqq iptables-persistent
+iptables -t nat -A OUTPUT -p tcp -d {} -j DNAT --to-destination {}
+netfilter-persistent save
+"""
+
+class SSHProvisioner:
+ """Provision a manually created machine via SSH."""
+
+ def __init__(self, user, host, private_key_path, log=None):
+
+ self.host = host
+ self.user = user
+ self.private_key_path = private_key_path
+
+ if log:
+ self.log = log
+ else:
+ self.log = logging.getLogger(__name__)
+
+ def _get_ssh_client(self, host=None, user=None, private_key_path=None):
+ """Return a connected Paramiko ssh object.
+
+ :param str host: The host to connect to.
+ :param str user: The user to connect as.
+ :param str key: The private key to authenticate with.
+
+ :return: object: A paramiko.SSHClient
+ :raises: :class:`paramiko.ssh_exception.SSHException` if the
+ connection failed
+ """
+
+ if not host:
+ host = self.host
+
+ if not user:
+ user = self.user
+
+ if not private_key_path:
+ private_key_path = self.private_key_path
+
+ ssh = paramiko.SSHClient()
+ ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+
+ pkey = None
+
+ # Read the private key into a paramiko.RSAKey
+ if os.path.exists(private_key_path):
+ with open(private_key_path, 'r') as f:
+ pkey = paramiko.RSAKey.from_private_key(f)
+
+ #######################################################################
+ # There is a bug in some versions of OpenSSH 4.3 (CentOS/RHEL5) where #
+ # the server may not send the SSH_MSG_USERAUTH_BANNER message except #
+ # when responding to an auth_none request. For example, paramiko will #
+ # attempt to use password authentication when a password is set, but #
+ # the server could deny that, instead requesting keyboard-interactive.#
+ # The hack to workaround this is to attempt a reconnect, which will #
+ # receive the right banner, and authentication can proceed. See the #
+ # following for more info: #
+ # https://github.com/paramiko/paramiko/issues/432 #
+ # https://github.com/paramiko/paramiko/pull/438 #
+ #######################################################################
+
+ retry = 10
+ attempts = 0
+ delay = 15
+ while attempts <= retry:
+ try:
+ attempts += 1
+
+ # Attempt to establish a SSH connection
+ ssh.connect(
+ host,
+ port=22,
+ username=user,
+ pkey=pkey,
+ # allow_agent=False,
+ # look_for_keys=False,
+ )
+ break
+ except paramiko.ssh_exception.SSHException as e:
+ if 'Error reading SSH protocol banner' == str(e):
+ # Once more, with feeling
+ ssh.connect(host, port=22, username=user, pkey=pkey)
+ else:
+ # Reraise the original exception
+ self.log.debug("Unhandled exception caught: {}".format(e))
+ raise e
+ except Exception as e:
+ if 'Unable to connect to port' in str(e):
+ self.log.debug("Waiting for VM to boot, sleeping {} seconds".format(delay))
+ if attempts > retry:
+ raise e
+ else:
+ time.sleep(delay)
+ # Slowly back off the retry
+ delay += 15
+ else:
+ self.log.debug(e)
+ raise e
+ return ssh
+
+ def _run_command(self, ssh, cmd, pty=True):
+ """Run a command remotely via SSH.
+
+ :param object ssh: The SSHClient
+ :param str cmd: The command to execute
+ :param list cmd: The `shlex.split` command to execute
+ :param bool pty: Whether to allocate a pty
+
+ :return: tuple: The stdout and stderr of the command execution
+ :raises: :class:`CalledProcessError` if the command fails
+ """
+
+ if isinstance(cmd, str):
+ cmd = shlex.split(cmd)
+
+ if type(cmd) is not list:
+ cmd = [cmd]
+
+ cmds = ' '.join(cmd)
+ stdin, stdout, stderr = ssh.exec_command(cmds, get_pty=pty)
+ retcode = stdout.channel.recv_exit_status()
+
+ if retcode > 0:
+ output = stderr.read().strip()
+ raise CalledProcessError(returncode=retcode, cmd=cmd,
+ output=output)
+ return (
+ stdout.read().decode('utf-8').strip(),
+ stderr.read().decode('utf-8').strip()
+ )
+
+ def _init_ubuntu_user(self):
+ """Initialize the ubuntu user.
+
+ :return: bool: If the initialization was successful
+ :raises: :class:`paramiko.ssh_exception.AuthenticationException`
+ if the authentication fails
+ """
+ ssh = None
+ try:
+ # Run w/o allocating a pty, so we fail if sudo prompts for a passwd
+ ssh = self._get_ssh_client()
+ stdout, stderr = self._run_command(ssh, "sudo -n true", pty=False)
+ except paramiko.ssh_exception.AuthenticationException:
+ raise n2vc.exceptions.AuthenticationFailed(self.user)
+ except paramiko.ssh_exception.NoValidConnectionsError:
+ raise n2vc.exceptions.NoRouteToHost(self.host)
+ finally:
+ if ssh:
+ ssh.close()
+
+ # Infer the public key
+ public_key = None
+ public_key_path = "{}.pub".format(self.private_key_path)
+
+ if not os.path.exists(public_key_path):
+ raise FileNotFoundError(
+ "Public key '{}' doesn't exist.".format(public_key_path)
+ )
+
+ with open(public_key_path, "r") as f:
+ public_key = f.readline()
+
+ script = INITIALIZE_UBUNTU_SCRIPT.format(public_key)
+
+ try:
+ ssh = self._get_ssh_client()
+
+ self._run_command(
+ ssh,
+ ["sudo", "/bin/bash -c " + shlex.quote(script)],
+ pty=True
+ )
+ except paramiko.ssh_exception.AuthenticationException as e:
+ raise e
+ finally:
+ ssh.close()
+
+ return True
+
+ def _detect_hardware_and_os(self, ssh):
+ """Detect the target hardware capabilities and OS series.
+
+ :param object ssh: The SSHClient
+ :return: str: A raw string containing OS and hardware information.
+ """
+
+ info = {
+ 'series': '',
+ 'arch': '',
+ 'cpu-cores': '',
+ 'mem': '',
+ }
+
+ stdout, stderr = self._run_command(
+ ssh,
+ ["sudo", "/bin/bash -c " + shlex.quote(DETECTION_SCRIPT)],
+ pty=True,
+ )
+
+ lines = stdout.split("\n")
+
+ # Remove extraneous line if DNS resolution of hostname famils
+ # i.e. sudo: unable to resolve host test-1-mgmtvm-1: Connection timed out
+ if 'unable to resolve host' in lines[0]:
+ lines = lines[1:]
+
+ info['series'] = lines[0].strip()
+ info['arch'] = normalize_arch(lines[1].strip())
+
+ memKb = re.split(r'\s+', lines[2])[1]
+
+ # Convert megabytes -> kilobytes
+ info['mem'] = round(int(memKb) / 1024)
+
+ # Detect available CPUs
+ recorded = {}
+ for line in lines[3:]:
+ physical_id = ""
+
+ if line.find("physical id") == 0:
+ physical_id = line.split(":")[1].strip()
+ elif line.find("cpu cores") == 0:
+ cores = line.split(":")[1].strip()
+
+ if physical_id not in recorded.keys():
+ info['cpu-cores'] += cores
+ recorded[physical_id] = True
+
+ return info
+
+ def provision_machine(self):
+ """Perform the initial provisioning of the target machine.
+
+ :return: bool: The client.AddMachineParams
+ :raises: :class:`paramiko.ssh_exception.AuthenticationException`
+ if the upload fails
+ """
+ params = client.AddMachineParams()
+
+ if self._init_ubuntu_user():
+ try:
+ ssh = self._get_ssh_client()
+
+ hw = self._detect_hardware_and_os(ssh)
+ params.series = hw['series']
+ params.instance_id = "manual:{}".format(self.host)
+ params.nonce = "manual:{}:{}".format(
+ self.host,
+ str(uuid.uuid4()), # a nop for Juju w/manual machines
+ )
+ params.hardware_characteristics = {
+ 'arch': hw['arch'],
+ 'mem': int(hw['mem']),
+ 'cpu-cores': int(hw['cpu-cores']),
+ }
+ params.addresses = [{
+ 'value': self.host,
+ 'type': 'ipv4',
+ 'scope': 'public',
+ }]
+
+ except paramiko.ssh_exception.AuthenticationException as e:
+ raise e
+ finally:
+ ssh.close()
+
+ return params
+
+ async def install_agent(self, connection, nonce, machine_id, api):
+ """
+ :param object connection: Connection to Juju API
+ :param str nonce: The nonce machine specification
+ :param str machine_id: The id assigned to the machine
+
+ :return: bool: If the initialization was successful
+ """
+ # The path where the Juju agent should be installed.
+ data_dir = "/var/lib/juju"
+
+ # Disabling this prevents `apt-get update` from running initially, so
+ # charms will fail to deploy
+ disable_package_commands = False
+
+ client_facade = client.ClientFacade.from_connection(connection)
+ results = await client_facade.ProvisioningScript(
+ data_dir=data_dir,
+ disable_package_commands=disable_package_commands,
+ machine_id=machine_id,
+ nonce=nonce,
+ )
+
+ """Get the IP of the controller
+
+ Parse the provisioning script, looking for the first apiaddress.
+
+ Example:
+ apiaddresses:
+ - 10.195.8.2:17070
+ - 127.0.0.1:17070
+ - '[::1]:17070'
+ """
+ m = re.search('apiaddresses:\n- (\d+\.\d+\.\d+\.\d+):17070', results.script)
+ apiaddress = m.group(1)
+
+ """Add IP Table rule
+
+ In order to route the traffic to the private ip of the Juju controller
+ we use a DNAT rule to tell the machine that the destination for the
+ private address is the public address of the machine where the Juju
+ controller is running in LXD. That machine will have a complimentary
+ iptables rule, routing traffic to the appropriate LXD container.
+ """
+
+ script = IPTABLES_SCRIPT.format(apiaddress, api)
+
+ # Run this in a retry loop, because dpkg may be running and cause the
+ # script to fail.
+ retry = 10
+ attempts = 0
+ delay = 15
+ while attempts <= retry:
+ try:
+ attempts += 1
+
+ self._run_configure_script(script)
+ break
+ except Exception as e:
+ self.log.debug("Waiting for dpkg, sleeping {} seconds".format(delay))
+ if attempts > retry:
+ raise e
+ else:
+ time.sleep(delay)
+ # Slowly back off the retry
+ delay += 15
+
+ # self.log.debug("Running configure script")
+ self._run_configure_script(results.script)
+ # self.log.debug("Configure script finished")
+
+
+
+ def _run_configure_script(self, script: str):
+ """Run the script to install the Juju agent on the target machine.
+
+ :param str script: The script returned by the ProvisioningScript API
+ :raises: :class:`paramiko.ssh_exception.AuthenticationException`
+ if the upload fails
+ """
+ _, tmpFile = tempfile.mkstemp()
+ with open(tmpFile, 'w') as f:
+ f.write(script)
+ try:
+ # get ssh client
+ ssh = self._get_ssh_client(
+ user="ubuntu",
+ )
+
+ # copy the local copy of the script to the remote machine
+ sftp = paramiko.SFTPClient.from_transport(ssh.get_transport())
+ sftp.put(
+ tmpFile,
+ tmpFile,
+ )
+
+ # run the provisioning script
+ stdout, stderr = self._run_command(
+ ssh,
+ "sudo /bin/bash {}".format(tmpFile),
+ )
+
+ except paramiko.ssh_exception.AuthenticationException as e:
+ raise e
+ finally:
+ os.remove(tmpFile)
+ ssh.close()
+# Copyright 2019 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
import asyncio
import logging
import os
import subprocess
import sys
# import time
+from n2vc.provisioner import SSHProvisioner
# FIXME: this should load the juju inside or modules without having to
# explicitly install it. Check why it's not working.
if path not in sys.path:
sys.path.insert(1, path)
+from juju.client import client
from juju.controller import Controller
from juju.model import ModelObserver
from juju.errors import JujuAPIError, JujuError
+
# We might need this to connect to the websocket securely, but test and verify.
try:
ssl._create_default_https_context = ssl._create_unverified_context
# Custom exceptions
+# Deprecated. Please use n2vc.exceptions namespace.
class JujuCharmNotFound(Exception):
"""The Charm can't be found or is not readable."""
class PrimitiveDoesNotExist(Exception):
"""The Primitive being executed does not exist."""
+
# Quiet the debug logging
logging.getLogger('websockets.protocol').setLevel(logging.INFO)
logging.getLogger('juju.client.connection').setLevel(logging.WARN)
loop=None,
juju_public_key=None,
ca_cert=None,
+ api_proxy=None
):
"""Initialize N2VC
+
+ Initializes the N2VC object, allowing the caller to interoperate with the VCA.
+
+
:param log obj: The logging object to log to
:param server str: The IP Address or Hostname of the Juju controller
:param port int: The port of the Juju Controller
:param loop obj: The loop to use.
:param juju_public_key str: The contents of the Juju public SSH key
:param ca_cert str: The CA certificate to use to authenticate
-
+ :param api_proxy str: The IP of the host machine
:Example:
client = n2vc.vnf.N2VC(
loop=loop,
juju_public_key='<contents of the juju public key>',
ca_cert='<contents of CA certificate>',
+ api_proxy='192.168.1.155'
)
"""
self.controller = None
self.connecting = False
self.authenticated = False
+ self.api_proxy = api_proxy
# For debugging
self.refcount = {
self.port = 17070
self.username = ""
self.secret = ""
-
+
self.juju_public_key = juju_public_key
if juju_public_key:
self._create_juju_public_key(juju_public_key)
- self.ca_cert = ca_cert
+ # TODO: Verify ca_cert is valid before using. VCA will crash
+ # if the ca_cert isn't formatted correctly.
+ # self.ca_cert = ca_cert
+ self.ca_cert = None
if log:
self.log = log
))
await self.Subscribe(model_name, application_name, callback, *callback_args)
- ########################################################
- # Check for specific machine placement (native charms) #
- ########################################################
- to = ""
- if machine_spec.keys():
- if all(k in machine_spec for k in ['host', 'user']):
- # Enlist an existing machine as a Juju unit
- machine = await model.add_machine(spec='ssh:{}@{}:{}'.format(
- machine_spec['username'],
- machine_spec['hostname'],
- self.GetPrivateKeyPath(),
- ))
- to = machine.id
-
#######################################
# Get the initial charm configuration #
#######################################
{'<rw_mgmt_ip>': rw_mgmt_ip}
)
- self.log.debug("JujuApi: Deploying charm ({}/{}) from {}".format(
+ ########################################################
+ # Check for specific machine placement (native charms) #
+ ########################################################
+ to = ""
+ series = "xenial"
+
+ if machine_spec.keys():
+ if all(k in machine_spec for k in ['hostname', 'username']):
+
+ # Allow series to be derived from the native charm
+ series = None
+
+ self.log.debug("Provisioning manual machine {}@{}".format(
+ machine_spec['username'],
+ machine_spec['hostname'],
+ ))
+
+ """Native Charm support
+
+ Taking a bare VM (assumed to be an Ubuntu cloud image),
+ the provisioning process will:
+ - Create an ubuntu user w/sudo access
+ - Detect hardware
+ - Detect architecture
+ - Download and install Juju agent from controller
+ - Enable Juju agent
+ - Add an iptables rule to route traffic to the API proxy
+ """
+
+ to = await self.provision_machine(
+ model_name=model_name,
+ username=machine_spec['username'],
+ hostname=machine_spec['hostname'],
+ private_key_path=self.GetPrivateKeyPath(),
+ )
+ self.log.debug("Provisioned machine id {}".format(to))
+
+ # TODO: If to is none, raise an exception
+
+ # The native charm won't have the sshproxy layer, typically, but LCM uses the config primitive
+ # to interpret what the values are. That's a gap to fill.
+
+ """
+ The ssh-* config parameters are unique to the sshproxy layer,
+ which most native charms will not be aware of.
+
+ Setting invalid config parameters will cause the deployment to
+ fail.
+
+ For the moment, we will strip the ssh-* parameters from native
+ charms, until the feature gap is addressed in the information
+ model.
+ """
+
+ # Native charms don't include the ssh-* config values, so strip them
+ # from the initial_config, otherwise the deploy will raise an error.
+ # self.log.debug("Removing ssh-* from initial-config")
+ for k in ['ssh-hostname', 'ssh-username', 'ssh-password']:
+ if k in initial_config:
+ self.log.debug("Removing parameter {}".format(k))
+ del initial_config[k]
+
+ self.log.debug("JujuApi: Deploying charm ({}/{}) from {} to {}".format(
model_name,
application_name,
charm_path,
- to=to,
+ to,
))
########################################################
application_name=application_name,
# Proxy charms should use the current LTS. This will need to be
# changed for native charms.
- series='xenial',
+ series=series,
# Apply the initial 'config' primitive during deployment
config=initial_config,
# Where to deploy the charm to.
to=to,
)
+
#############################
# Map the vdu id<->app name #
#############################
)
await app.remove()
- await self.disconnect_model(self.monitors[model_name])
+ # await self.disconnect_model(self.monitors[model_name])
self.notify_callback(
model_name,
return False
if not self.authenticated:
- self.log.debug("Authenticating with Juju")
await self.login()
+ models = await self.controller.list_models()
+ if ns_uuid in models:
+ model = await self.controller.get_model(ns_uuid)
+
+ for application in model.applications:
+ app = model.applications[application]
+
+ await self.RemoveCharms(ns_uuid, application)
+
+ self.log.debug("Unsubscribing Watcher for {}".format(application))
+ await self.Unsubscribe(ns_uuid, application)
+
+ self.log.debug("Waiting for application to terminate")
+ timeout = 30
+ try:
+ await model.block_until(
+ lambda: all(
+ unit.workload_status in ['terminated'] for unit in app.units
+ ),
+ timeout=timeout
+ )
+ except Exception as e:
+ self.log.debug("Timed out waiting for {} to terminate.".format(application))
+
+ for machine in model.machines:
+ try:
+ self.log.debug("Destroying machine {}".format(machine))
+ await model.machines[machine].destroy(force=True)
+ except JujuAPIError as e:
+ if 'does not exist' in str(e):
+ # Our cached model may be stale, because the machine
+ # has already been removed. It's safe to continue.
+ continue
+ else:
+ self.log.debug("Caught exception: {}".format(e))
+ raise e
+
# Disconnect from the Model
if ns_uuid in self.models:
- await self.disconnect_model(self.models[ns_uuid])
+ self.log.debug("Disconnecting model {}".format(ns_uuid))
+ # await self.disconnect_model(self.models[ns_uuid])
+ await self.disconnect_model(ns_uuid)
try:
+ self.log.debug("Destroying model {}".format(ns_uuid))
await self.controller.destroy_models(ns_uuid)
except JujuError:
raise NetworkServiceDoesNotExist(
async def disconnect_model(self, model):
self.log.debug("Disconnecting model {}".format(model))
if model in self.models:
- print("Disconnecting model")
- await self.models[model].disconnect()
- self.refcount['model'] -= 1
- self.models[model] = None
+ try:
+ await self.models[model].disconnect()
+ self.refcount['model'] -= 1
+ self.models[model] = None
+ except Exception as e:
+ self.log.debug("Caught exception: {}".format(e))
+
+ async def provision_machine(self, model_name: str,
+ hostname: str, username: str,
+ private_key_path: str) -> int:
+ """Provision a machine.
+
+ This executes the SSH provisioner, which will log in to a machine via
+ SSH and prepare it for use with the Juju model
+
+ :param model_name str: The name of the model
+ :param hostname str: The IP or hostname of the target VM
+ :param user str: The username to login to
+ :param private_key_path str: The path to the private key that's been injected to the VM via cloud-init
+ :return machine_id int: Returns the id of the machine or None if provisioning fails
+ """
+ if not self.authenticated:
+ await self.login()
+
+ machine_id = None
+
+ if self.api_proxy:
+ self.log.debug("Instantiating SSH Provisioner for {}@{} ({})".format(
+ username,
+ hostname,
+ private_key_path
+ ))
+ provisioner = SSHProvisioner(
+ host=hostname,
+ user=username,
+ private_key_path=private_key_path,
+ log=self.log,
+ )
+
+ params = None
+ try:
+ params = provisioner.provision_machine()
+ except Exception as ex:
+ self.log.debug("caught exception from provision_machine: {}".format(ex))
+ return None
+
+ if params:
+ params.jobs = ['JobHostUnits']
+
+ model = await self.get_model(model_name)
+
+ connection = model.connection()
+
+ # Submit the request.
+ self.log.debug("Adding machine to model")
+ client_facade = client.ClientFacade.from_connection(connection)
+ results = await client_facade.AddMachines(params=[params])
+ error = results.machines[0].error
+ if error:
+ raise ValueError("Error adding machine: %s" % error.message)
+
+ machine_id = results.machines[0].machine
+
+ # Need to run this after AddMachines has been called,
+ # as we need the machine_id
+ self.log.debug("Installing Juju agent")
+ await provisioner.install_agent(
+ connection,
+ params.nonce,
+ machine_id,
+ self.api_proxy,
+ )
+ else:
+ self.log.debug("Missing API Proxy")
+ return machine_id
# async def remove_application(self, name):
# """Remove the application."""