+++ /dev/null
-#!/usr/bin/env python3
-
-import logging
-import n2vc.vnf
-import pylxd
-import os
-import shlex
-import subprocess
-import time
-import uuid
-import yaml
-
-# Disable InsecureRequestWarning w/LXD
-import urllib3
-urllib3.disable_warnings()
-
-here = os.path.dirname(os.path.realpath(__file__))
-
-
-def get_charm_path():
- return "{}/charms".format(here)
-
-
-def get_layer_path():
- return "{}/charms/layers".format(here)
-
-
-def parse_metrics(application, results):
- """Parse the returned metrics into a dict."""
-
- # We'll receive the results for all units, to look for the one we want
- # Caveat: we're grabbing results from the first unit of the application,
- # which is enough for testing, since we're only deploying a single unit.
- retval = {}
- for unit in results:
- if unit.startswith(application):
- for result in results[unit]:
- retval[result['key']] = result['value']
- return retval
-
-def collect_metrics(application):
- """Invoke Juju's metrics collector.
-
- Caveat: this shells out to the `juju collect-metrics` command, rather than
- making an API call. At the time of writing, that API is not exposed through
- the client library.
- """
-
- try:
- logging.debug("Collecting metrics")
- subprocess.check_call(['juju', 'collect-metrics', application])
- except subprocess.CalledProcessError as e:
- raise Exception("Unable to collect metrics: {}".format(e))
-
-
-def build_charm(charm):
- """Build a test charm.
-
- Builds one of the charms in tests/charms/layers and returns the path
- to the compiled charm. The calling test is responsible for removing
- the charm artifact during cleanup.
- """
- # stream_handler = logging.StreamHandler(sys.stdout)
- # log.addHandler(stream_handler)
-
- # Make sure the charm snap is installed
- try:
- logging.debug("Looking for charm-tools")
- subprocess.check_call(['which', 'charm'])
- except subprocess.CalledProcessError as e:
- raise Exception("charm snap not installed.")
-
- try:
- builds = get_charm_path()
-
- cmd = "charm build {}/{} -o {}/".format(
- get_layer_path(),
- charm,
- builds,
- )
- subprocess.check_call(shlex.split(cmd))
- return "{}/{}".format(builds, charm)
- except subprocess.CalledProcessError as e:
- raise Exception("charm build failed: {}.".format(e))
-
- return None
-
-
-def get_descriptor(descriptor):
- desc = None
- try:
- tmp = yaml.load(descriptor)
-
- # Remove the envelope
- root = list(tmp.keys())[0]
- if root == "nsd:nsd-catalog":
- desc = tmp['nsd:nsd-catalog']['nsd'][0]
- elif root == "vnfd:vnfd-catalog":
- desc = tmp['vnfd:vnfd-catalog']['vnfd'][0]
- except ValueError:
- assert False
- return desc
-
-def get_n2vc():
- """Return an instance of N2VC.VNF."""
- log = logging.getLogger()
- log.level = logging.DEBUG
-
- # Extract parameters from the environment in order to run our test
- vca_host = os.getenv('VCA_HOST', '127.0.0.1')
- vca_port = os.getenv('VCA_PORT', 17070)
- vca_user = os.getenv('VCA_USER', 'admin')
- vca_charms = os.getenv('VCA_CHARMS', None)
- vca_secret = os.getenv('VCA_SECRET', None)
- client = n2vc.vnf.N2VC(
- log=log,
- server=vca_host,
- port=vca_port,
- user=vca_user,
- secret=vca_secret,
- artifacts=vca_charms,
- )
- return client
-
-def create_lxd_container(public_key=None):
- """
- Returns a container object
-
- If public_key isn't set, we'll use the Juju ssh key
- """
-
- client = get_lxd_client()
- test_machine = "test-{}-add-manual-machine-ssh".format(
- uuid.uuid4().hex[-4:]
- )
-
- private_key_path, public_key_path = find_juju_ssh_keys()
- # private_key_path = os.path.expanduser(
- # "~/.local/share/juju/ssh/juju_id_rsa"
- # )
- # public_key_path = os.path.expanduser(
- # "~/.local/share/juju/ssh/juju_id_rsa.pub"
- # )
-
- # Use the self-signed cert generated by lxc on first run
- crt = os.path.expanduser('~/snap/lxd/current/.config/lxc/client.crt')
- assert os.path.exists(crt)
-
- key = os.path.expanduser('~/snap/lxd/current/.config/lxc/client.key')
- assert os.path.exists(key)
-
- # create profile w/cloud-init and juju ssh key
- if not public_key:
- public_key = ""
- with open(public_key_path, "r") as f:
- public_key = f.readline()
-
- profile = client.profiles.create(
- test_machine,
- config={'user.user-data': '#cloud-config\nssh_authorized_keys:\n- {}'.format(public_key)},
- devices={
- 'root': {'path': '/', 'pool': 'default', 'type': 'disk'},
- 'eth0': {
- 'nictype': 'bridged',
- 'parent': 'lxdbr0',
- 'type': 'nic'
- }
- }
- )
-
- # create lxc machine
- config = {
- 'name': test_machine,
- 'source': {
- 'type': 'image',
- 'alias': 'xenial',
- 'mode': 'pull',
- 'protocol': 'simplestreams',
- 'server': 'https://cloud-images.ubuntu.com/releases',
- },
- 'profiles': [test_machine],
- }
- container = client.containers.create(config, wait=True)
- container.start(wait=True)
-
- def wait_for_network(container, timeout=30):
- """Wait for eth0 to have an ipv4 address."""
- starttime = time.time()
- while(time.time() < starttime + timeout):
- time.sleep(1)
- if 'eth0' in container.state().network:
- addresses = container.state().network['eth0']['addresses']
- if len(addresses) > 0:
- if addresses[0]['family'] == 'inet':
- return addresses[0]
- return None
-
- host = wait_for_network(container)
-
- # HACK: We need to give sshd a chance to bind to the interface,
- # and pylxd's container.execute seems to be broken and fails and/or
- # hangs trying to properly check if the service is up.
- time.sleep(5)
-
- return container
-
-
-def destroy_lxd_container(container):
- """Stop and delete a LXD container."""
- container.stop(wait=True)
- container.delete()
-
-
-def find_lxd_config():
- """Find the LXD configuration directory."""
- paths = []
- paths.append(os.path.expanduser("~/.config/lxc"))
- paths.append(os.path.expanduser("~/snap/lxd/current/.config/lxc"))
-
- for path in paths:
- if os.path.exists(path):
- crt = os.path.expanduser("{}/client.crt".format(path))
- key = os.path.expanduser("{}/client.key".format(path))
- if os.path.exists(crt) and os.path.exists(key):
- return (crt, key)
- return (None, None)
-
-
-def find_juju_ssh_keys():
- """Find the Juju ssh keys."""
-
- paths = []
- paths.append(os.path.expanduser("~/.local/share/juju/ssh/"))
-
- for path in paths:
- if os.path.exists(path):
- private = os.path.expanduser("{}/juju_id_rsa".format(path))
- public = os.path.expanduser("{}/juju_id_rsa.pub".format(path))
- if os.path.exists(private) and os.path.exists(public):
- return (private, public)
- return (None, None)
-
-
-def get_juju_private_key():
- keys = find_juju_ssh_keys()
- return keys[0]
-
-
-def get_lxd_client(host="127.0.0.1", port="8443", verify=False):
- """ Get the LXD client."""
- client = None
- (crt, key) = find_lxd_config()
-
- if crt and key:
- client = pylxd.Client(
- endpoint="https://{}:{}".format(host, port),
- cert=(crt, key),
- verify=verify,
- )
-
- return client