| #!/usr/bin/env python3 |
| |
| import logging |
| import n2vc.vnf |
| import pylxd |
| import os |
| import time |
| import uuid |
| import yaml |
| |
| # Disable InsecureRequestWarning w/LXD |
| import urllib3 |
| urllib3.disable_warnings() |
| |
| |
| def get_descriptor(descriptor): |
| desc = None |
| try: |
| tmp = yaml.load(descriptor) |
| |
| # Remove the envelope |
| root = list(tmp.keys())[0] |
| if root == "nsd:nsd-catalog": |
| desc = tmp['nsd:nsd-catalog']['nsd'][0] |
| elif root == "vnfd:vnfd-catalog": |
| desc = tmp['vnfd:vnfd-catalog']['vnfd'][0] |
| except ValueError: |
| assert False |
| return desc |
| |
| def get_n2vc(): |
| """Return an instance of N2VC.VNF.""" |
| log = logging.getLogger() |
| log.level = logging.DEBUG |
| |
| # Extract parameters from the environment in order to run our test |
| vca_host = os.getenv('VCA_HOST', '127.0.0.1') |
| vca_port = os.getenv('VCA_PORT', 17070) |
| vca_user = os.getenv('VCA_USER', 'admin') |
| vca_charms = os.getenv('VCA_CHARMS', None) |
| vca_secret = os.getenv('VCA_SECRET', None) |
| client = n2vc.vnf.N2VC( |
| log=log, |
| server=vca_host, |
| port=vca_port, |
| user=vca_user, |
| secret=vca_secret, |
| artifacts=vca_charms, |
| ) |
| return client |
| |
| def create_lxd_container(public_key=None): |
| """ |
| Returns a container object |
| |
| If public_key isn't set, we'll use the Juju ssh key |
| """ |
| |
| client = get_lxd_client() |
| test_machine = "test-{}-add-manual-machine-ssh".format( |
| uuid.uuid4().hex[-4:] |
| ) |
| |
| private_key_path, public_key_path = find_juju_ssh_keys() |
| # private_key_path = os.path.expanduser( |
| # "~/.local/share/juju/ssh/juju_id_rsa" |
| # ) |
| # public_key_path = os.path.expanduser( |
| # "~/.local/share/juju/ssh/juju_id_rsa.pub" |
| # ) |
| |
| # Use the self-signed cert generated by lxc on first run |
| crt = os.path.expanduser('~/snap/lxd/current/.config/lxc/client.crt') |
| assert os.path.exists(crt) |
| |
| key = os.path.expanduser('~/snap/lxd/current/.config/lxc/client.key') |
| assert os.path.exists(key) |
| |
| # create profile w/cloud-init and juju ssh key |
| if not public_key: |
| public_key = "" |
| with open(public_key_path, "r") as f: |
| public_key = f.readline() |
| |
| profile = client.profiles.create( |
| test_machine, |
| config={'user.user-data': '#cloud-config\nssh_authorized_keys:\n- {}'.format(public_key)}, |
| devices={ |
| 'root': {'path': '/', 'pool': 'default', 'type': 'disk'}, |
| 'eth0': { |
| 'nictype': 'bridged', |
| 'parent': 'lxdbr0', |
| 'type': 'nic' |
| } |
| } |
| ) |
| |
| # create lxc machine |
| config = { |
| 'name': test_machine, |
| 'source': { |
| 'type': 'image', |
| 'alias': 'xenial', |
| 'mode': 'pull', |
| 'protocol': 'simplestreams', |
| 'server': 'https://cloud-images.ubuntu.com/releases', |
| }, |
| 'profiles': [test_machine], |
| } |
| container = client.containers.create(config, wait=True) |
| container.start(wait=True) |
| |
| def wait_for_network(container, timeout=30): |
| """Wait for eth0 to have an ipv4 address.""" |
| starttime = time.time() |
| while(time.time() < starttime + timeout): |
| time.sleep(1) |
| if 'eth0' in container.state().network: |
| addresses = container.state().network['eth0']['addresses'] |
| if len(addresses) > 0: |
| if addresses[0]['family'] == 'inet': |
| return addresses[0] |
| return None |
| |
| host = wait_for_network(container) |
| |
| # HACK: We need to give sshd a chance to bind to the interface, |
| # and pylxd's container.execute seems to be broken and fails and/or |
| # hangs trying to properly check if the service is up. |
| time.sleep(5) |
| |
| return container |
| |
| |
| def destroy_lxd_container(container): |
| """Stop and delete a LXD container.""" |
| container.stop(wait=True) |
| container.delete() |
| |
| |
| def find_lxd_config(): |
| """Find the LXD configuration directory.""" |
| paths = [] |
| paths.append(os.path.expanduser("~/.config/lxc")) |
| paths.append(os.path.expanduser("~/snap/lxd/current/.config/lxc")) |
| |
| for path in paths: |
| if os.path.exists(path): |
| crt = os.path.expanduser("{}/client.crt".format(path)) |
| key = os.path.expanduser("{}/client.key".format(path)) |
| if os.path.exists(crt) and os.path.exists(key): |
| return (crt, key) |
| return (None, None) |
| |
| |
| def find_juju_ssh_keys(): |
| """Find the Juju ssh keys.""" |
| |
| paths = [] |
| paths.append(os.path.expanduser("~/.local/share/juju/ssh/")) |
| |
| for path in paths: |
| if os.path.exists(path): |
| private = os.path.expanduser("{}/juju_id_rsa".format(path)) |
| public = os.path.expanduser("{}/juju_id_rsa.pub".format(path)) |
| if os.path.exists(private) and os.path.exists(public): |
| return (private, public) |
| return (None, None) |
| |
| |
| def get_juju_private_key(): |
| keys = find_juju_ssh_keys() |
| return keys[0] |
| |
| |
| def get_lxd_client(host="127.0.0.1", port="8443", verify=False): |
| """ Get the LXD client.""" |
| client = None |
| (crt, key) = find_lxd_config() |
| |
| if crt and key: |
| client = pylxd.Client( |
| endpoint="https://{}:{}".format(host, port), |
| cert=(crt, key), |
| verify=verify, |
| ) |
| |
| return client |