Improved Primitive support and better testing
[osm/N2VC.git] / tests / utils.py
diff --git a/tests/utils.py b/tests/utils.py
new file mode 100644 (file)
index 0000000..9f9000e
--- /dev/null
@@ -0,0 +1,188 @@
+#!/usr/bin/env python3
+
+import logging
+import n2vc.vnf
+import pylxd
+import os
+import time
+import uuid
+import yaml
+
+# Disable InsecureRequestWarning w/LXD
+import urllib3
+urllib3.disable_warnings()
+
+
+def get_descriptor(descriptor):
+    desc = None
+    try:
+        tmp = yaml.load(descriptor)
+
+        # Remove the envelope
+        root = list(tmp.keys())[0]
+        if root == "nsd:nsd-catalog":
+            desc = tmp['nsd:nsd-catalog']['nsd'][0]
+        elif root == "vnfd:vnfd-catalog":
+            desc = tmp['vnfd:vnfd-catalog']['vnfd'][0]
+    except ValueError:
+        assert False
+    return desc
+
+def get_n2vc():
+    """Return an instance of N2VC.VNF."""
+    log = logging.getLogger()
+    log.level = logging.DEBUG
+
+    # Extract parameters from the environment in order to run our test
+    vca_host = os.getenv('VCA_HOST', '127.0.0.1')
+    vca_port = os.getenv('VCA_PORT', 17070)
+    vca_user = os.getenv('VCA_USER', 'admin')
+    vca_charms = os.getenv('VCA_CHARMS', None)
+    vca_secret = os.getenv('VCA_SECRET', None)
+    client = n2vc.vnf.N2VC(
+        log=log,
+        server=vca_host,
+        port=vca_port,
+        user=vca_user,
+        secret=vca_secret,
+        artifacts=vca_charms,
+    )
+    return client
+
+def create_lxd_container(public_key=None):
+    """
+    Returns a container object
+
+    If public_key isn't set, we'll use the Juju ssh key
+    """
+
+    client = get_lxd_client()
+    test_machine = "test-{}-add-manual-machine-ssh".format(
+        uuid.uuid4().hex[-4:]
+    )
+
+    private_key_path, public_key_path = find_juju_ssh_keys()
+    # private_key_path = os.path.expanduser(
+    #     "~/.local/share/juju/ssh/juju_id_rsa"
+    # )
+    # public_key_path = os.path.expanduser(
+    #     "~/.local/share/juju/ssh/juju_id_rsa.pub"
+    # )
+
+    # Use the self-signed cert generated by lxc on first run
+    crt = os.path.expanduser('~/snap/lxd/current/.config/lxc/client.crt')
+    assert os.path.exists(crt)
+
+    key = os.path.expanduser('~/snap/lxd/current/.config/lxc/client.key')
+    assert os.path.exists(key)
+
+    # create profile w/cloud-init and juju ssh key
+    if not public_key:
+        public_key = ""
+        with open(public_key_path, "r") as f:
+            public_key = f.readline()
+
+    profile = client.profiles.create(
+        test_machine,
+        config={'user.user-data': '#cloud-config\nssh_authorized_keys:\n- {}'.format(public_key)},
+        devices={
+            'root': {'path': '/', 'pool': 'default', 'type': 'disk'},
+            'eth0': {
+                'nictype': 'bridged',
+                'parent': 'lxdbr0',
+                'type': 'nic'
+            }
+        }
+    )
+
+    # create lxc machine
+    config = {
+        'name': test_machine,
+        'source': {
+            'type': 'image',
+            'alias': 'xenial',
+            'mode': 'pull',
+            'protocol': 'simplestreams',
+            'server': 'https://cloud-images.ubuntu.com/releases',
+        },
+        'profiles': [test_machine],
+    }
+    container = client.containers.create(config, wait=True)
+    container.start(wait=True)
+
+    def wait_for_network(container, timeout=30):
+        """Wait for eth0 to have an ipv4 address."""
+        starttime = time.time()
+        while(time.time() < starttime + timeout):
+            time.sleep(1)
+            if 'eth0' in container.state().network:
+                addresses = container.state().network['eth0']['addresses']
+                if len(addresses) > 0:
+                    if addresses[0]['family'] == 'inet':
+                        return addresses[0]
+        return None
+
+    host = wait_for_network(container)
+
+    # HACK: We need to give sshd a chance to bind to the interface,
+    # and pylxd's container.execute seems to be broken and fails and/or
+    # hangs trying to properly check if the service is up.
+    time.sleep(5)
+
+    return container
+
+
+def destroy_lxd_container(container):
+    """Stop and delete a LXD container."""
+    container.stop(wait=True)
+    container.delete()
+
+
+def find_lxd_config():
+    """Find the LXD configuration directory."""
+    paths = []
+    paths.append(os.path.expanduser("~/.config/lxc"))
+    paths.append(os.path.expanduser("~/snap/lxd/current/.config/lxc"))
+
+    for path in paths:
+        if os.path.exists(path):
+            crt = os.path.expanduser("{}/client.crt".format(path))
+            key = os.path.expanduser("{}/client.key".format(path))
+            if os.path.exists(crt) and os.path.exists(key):
+                return (crt, key)
+    return (None, None)
+
+
+def find_juju_ssh_keys():
+    """Find the Juju ssh keys."""
+
+    paths = []
+    paths.append(os.path.expanduser("~/.local/share/juju/ssh/"))
+
+    for path in paths:
+        if os.path.exists(path):
+            private = os.path.expanduser("{}/juju_id_rsa".format(path))
+            public = os.path.expanduser("{}/juju_id_rsa.pub".format(path))
+            if os.path.exists(private) and os.path.exists(public):
+                return (private, public)
+    return (None, None)
+
+
+def get_juju_private_key():
+    keys = find_juju_ssh_keys()
+    return keys[0]
+
+
+def get_lxd_client(host="127.0.0.1", port="8443", verify=False):
+    """ Get the LXD client."""
+    client = None
+    (crt, key) = find_lxd_config()
+
+    if crt and key:
+        client = pylxd.Client(
+            endpoint="https://{}:{}".format(host, port),
+            cert=(crt, key),
+            verify=verify,
+        )
+
+    return client