X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=tests%2Futils.py;fp=tests%2Futils.py;h=9f9000efab1a315e619bca14de515b23caaea46c;hp=0000000000000000000000000000000000000000;hb=b09436613925b2eb334c10f219b743868e4b3fe5;hpb=421c4a23dc5e60db9596b79ea87cdc19cc463e9b diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 0000000..9f9000e --- /dev/null +++ b/tests/utils.py @@ -0,0 +1,188 @@ +#!/usr/bin/env python3 + +import logging +import n2vc.vnf +import pylxd +import os +import time +import uuid +import yaml + +# Disable InsecureRequestWarning w/LXD +import urllib3 +urllib3.disable_warnings() + + +def get_descriptor(descriptor): + desc = None + try: + tmp = yaml.load(descriptor) + + # Remove the envelope + root = list(tmp.keys())[0] + if root == "nsd:nsd-catalog": + desc = tmp['nsd:nsd-catalog']['nsd'][0] + elif root == "vnfd:vnfd-catalog": + desc = tmp['vnfd:vnfd-catalog']['vnfd'][0] + except ValueError: + assert False + return desc + +def get_n2vc(): + """Return an instance of N2VC.VNF.""" + log = logging.getLogger() + log.level = logging.DEBUG + + # Extract parameters from the environment in order to run our test + vca_host = os.getenv('VCA_HOST', '127.0.0.1') + vca_port = os.getenv('VCA_PORT', 17070) + vca_user = os.getenv('VCA_USER', 'admin') + vca_charms = os.getenv('VCA_CHARMS', None) + vca_secret = os.getenv('VCA_SECRET', None) + client = n2vc.vnf.N2VC( + log=log, + server=vca_host, + port=vca_port, + user=vca_user, + secret=vca_secret, + artifacts=vca_charms, + ) + return client + +def create_lxd_container(public_key=None): + """ + Returns a container object + + If public_key isn't set, we'll use the Juju ssh key + """ + + client = get_lxd_client() + test_machine = "test-{}-add-manual-machine-ssh".format( + uuid.uuid4().hex[-4:] + ) + + private_key_path, public_key_path = find_juju_ssh_keys() + # private_key_path = os.path.expanduser( + # "~/.local/share/juju/ssh/juju_id_rsa" + # ) + # public_key_path = os.path.expanduser( + # "~/.local/share/juju/ssh/juju_id_rsa.pub" + # ) + + # Use the self-signed cert generated by lxc on first run + crt = os.path.expanduser('~/snap/lxd/current/.config/lxc/client.crt') + assert os.path.exists(crt) + + key = os.path.expanduser('~/snap/lxd/current/.config/lxc/client.key') + assert os.path.exists(key) + + # create profile w/cloud-init and juju ssh key + if not public_key: + public_key = "" + with open(public_key_path, "r") as f: + public_key = f.readline() + + profile = client.profiles.create( + test_machine, + config={'user.user-data': '#cloud-config\nssh_authorized_keys:\n- {}'.format(public_key)}, + devices={ + 'root': {'path': '/', 'pool': 'default', 'type': 'disk'}, + 'eth0': { + 'nictype': 'bridged', + 'parent': 'lxdbr0', + 'type': 'nic' + } + } + ) + + # create lxc machine + config = { + 'name': test_machine, + 'source': { + 'type': 'image', + 'alias': 'xenial', + 'mode': 'pull', + 'protocol': 'simplestreams', + 'server': 'https://cloud-images.ubuntu.com/releases', + }, + 'profiles': [test_machine], + } + container = client.containers.create(config, wait=True) + container.start(wait=True) + + def wait_for_network(container, timeout=30): + """Wait for eth0 to have an ipv4 address.""" + starttime = time.time() + while(time.time() < starttime + timeout): + time.sleep(1) + if 'eth0' in container.state().network: + addresses = container.state().network['eth0']['addresses'] + if len(addresses) > 0: + if addresses[0]['family'] == 'inet': + return addresses[0] + return None + + host = wait_for_network(container) + + # HACK: We need to give sshd a chance to bind to the interface, + # and pylxd's container.execute seems to be broken and fails and/or + # hangs trying to properly check if the service is up. + time.sleep(5) + + return container + + +def destroy_lxd_container(container): + """Stop and delete a LXD container.""" + container.stop(wait=True) + container.delete() + + +def find_lxd_config(): + """Find the LXD configuration directory.""" + paths = [] + paths.append(os.path.expanduser("~/.config/lxc")) + paths.append(os.path.expanduser("~/snap/lxd/current/.config/lxc")) + + for path in paths: + if os.path.exists(path): + crt = os.path.expanduser("{}/client.crt".format(path)) + key = os.path.expanduser("{}/client.key".format(path)) + if os.path.exists(crt) and os.path.exists(key): + return (crt, key) + return (None, None) + + +def find_juju_ssh_keys(): + """Find the Juju ssh keys.""" + + paths = [] + paths.append(os.path.expanduser("~/.local/share/juju/ssh/")) + + for path in paths: + if os.path.exists(path): + private = os.path.expanduser("{}/juju_id_rsa".format(path)) + public = os.path.expanduser("{}/juju_id_rsa.pub".format(path)) + if os.path.exists(private) and os.path.exists(public): + return (private, public) + return (None, None) + + +def get_juju_private_key(): + keys = find_juju_ssh_keys() + return keys[0] + + +def get_lxd_client(host="127.0.0.1", port="8443", verify=False): + """ Get the LXD client.""" + client = None + (crt, key) = find_lxd_config() + + if crt and key: + client = pylxd.Client( + endpoint="https://{}:{}".format(host, port), + cert=(crt, key), + verify=verify, + ) + + return client