--- /dev/null
+#!/usr/bin/env python3
+
+import logging
+import n2vc.vnf
+import pylxd
+import os
+import time
+import uuid
+import yaml
+
+# Disable InsecureRequestWarning w/LXD
+import urllib3
+urllib3.disable_warnings()
+
+
+def get_descriptor(descriptor):
+ desc = None
+ try:
+ tmp = yaml.load(descriptor)
+
+ # Remove the envelope
+ root = list(tmp.keys())[0]
+ if root == "nsd:nsd-catalog":
+ desc = tmp['nsd:nsd-catalog']['nsd'][0]
+ elif root == "vnfd:vnfd-catalog":
+ desc = tmp['vnfd:vnfd-catalog']['vnfd'][0]
+ except ValueError:
+ assert False
+ return desc
+
+def get_n2vc():
+ """Return an instance of N2VC.VNF."""
+ log = logging.getLogger()
+ log.level = logging.DEBUG
+
+ # Extract parameters from the environment in order to run our test
+ vca_host = os.getenv('VCA_HOST', '127.0.0.1')
+ vca_port = os.getenv('VCA_PORT', 17070)
+ vca_user = os.getenv('VCA_USER', 'admin')
+ vca_charms = os.getenv('VCA_CHARMS', None)
+ vca_secret = os.getenv('VCA_SECRET', None)
+ client = n2vc.vnf.N2VC(
+ log=log,
+ server=vca_host,
+ port=vca_port,
+ user=vca_user,
+ secret=vca_secret,
+ artifacts=vca_charms,
+ )
+ return client
+
+def create_lxd_container(public_key=None):
+ """
+ Returns a container object
+
+ If public_key isn't set, we'll use the Juju ssh key
+ """
+
+ client = get_lxd_client()
+ test_machine = "test-{}-add-manual-machine-ssh".format(
+ uuid.uuid4().hex[-4:]
+ )
+
+ private_key_path, public_key_path = find_juju_ssh_keys()
+ # private_key_path = os.path.expanduser(
+ # "~/.local/share/juju/ssh/juju_id_rsa"
+ # )
+ # public_key_path = os.path.expanduser(
+ # "~/.local/share/juju/ssh/juju_id_rsa.pub"
+ # )
+
+ # Use the self-signed cert generated by lxc on first run
+ crt = os.path.expanduser('~/snap/lxd/current/.config/lxc/client.crt')
+ assert os.path.exists(crt)
+
+ key = os.path.expanduser('~/snap/lxd/current/.config/lxc/client.key')
+ assert os.path.exists(key)
+
+ # create profile w/cloud-init and juju ssh key
+ if not public_key:
+ public_key = ""
+ with open(public_key_path, "r") as f:
+ public_key = f.readline()
+
+ profile = client.profiles.create(
+ test_machine,
+ config={'user.user-data': '#cloud-config\nssh_authorized_keys:\n- {}'.format(public_key)},
+ devices={
+ 'root': {'path': '/', 'pool': 'default', 'type': 'disk'},
+ 'eth0': {
+ 'nictype': 'bridged',
+ 'parent': 'lxdbr0',
+ 'type': 'nic'
+ }
+ }
+ )
+
+ # create lxc machine
+ config = {
+ 'name': test_machine,
+ 'source': {
+ 'type': 'image',
+ 'alias': 'xenial',
+ 'mode': 'pull',
+ 'protocol': 'simplestreams',
+ 'server': 'https://cloud-images.ubuntu.com/releases',
+ },
+ 'profiles': [test_machine],
+ }
+ container = client.containers.create(config, wait=True)
+ container.start(wait=True)
+
+ def wait_for_network(container, timeout=30):
+ """Wait for eth0 to have an ipv4 address."""
+ starttime = time.time()
+ while(time.time() < starttime + timeout):
+ time.sleep(1)
+ if 'eth0' in container.state().network:
+ addresses = container.state().network['eth0']['addresses']
+ if len(addresses) > 0:
+ if addresses[0]['family'] == 'inet':
+ return addresses[0]
+ return None
+
+ host = wait_for_network(container)
+
+ # HACK: We need to give sshd a chance to bind to the interface,
+ # and pylxd's container.execute seems to be broken and fails and/or
+ # hangs trying to properly check if the service is up.
+ time.sleep(5)
+
+ return container
+
+
+def destroy_lxd_container(container):
+ """Stop and delete a LXD container."""
+ container.stop(wait=True)
+ container.delete()
+
+
+def find_lxd_config():
+ """Find the LXD configuration directory."""
+ paths = []
+ paths.append(os.path.expanduser("~/.config/lxc"))
+ paths.append(os.path.expanduser("~/snap/lxd/current/.config/lxc"))
+
+ for path in paths:
+ if os.path.exists(path):
+ crt = os.path.expanduser("{}/client.crt".format(path))
+ key = os.path.expanduser("{}/client.key".format(path))
+ if os.path.exists(crt) and os.path.exists(key):
+ return (crt, key)
+ return (None, None)
+
+
+def find_juju_ssh_keys():
+ """Find the Juju ssh keys."""
+
+ paths = []
+ paths.append(os.path.expanduser("~/.local/share/juju/ssh/"))
+
+ for path in paths:
+ if os.path.exists(path):
+ private = os.path.expanduser("{}/juju_id_rsa".format(path))
+ public = os.path.expanduser("{}/juju_id_rsa.pub".format(path))
+ if os.path.exists(private) and os.path.exists(public):
+ return (private, public)
+ return (None, None)
+
+
+def get_juju_private_key():
+ keys = find_juju_ssh_keys()
+ return keys[0]
+
+
+def get_lxd_client(host="127.0.0.1", port="8443", verify=False):
+ """ Get the LXD client."""
+ client = None
+ (crt, key) = find_lxd_config()
+
+ if crt and key:
+ client = pylxd.Client(
+ endpoint="https://{}:{}".format(host, port),
+ cert=(crt, key),
+ verify=verify,
+ )
+
+ return client