X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=tests%2Fbase.py;fp=tests%2Fbase.py;h=0000000000000000000000000000000000000000;hp=aabf35972c464d814db05f2fe217bcb2c31d333a;hb=bb60728e5f59728c2905ccf5dad174ddac5d66b6;hpb=c1cf1b9a33dd1929fb834424626661f9c9dce64d diff --git a/tests/base.py b/tests/base.py deleted file mode 100644 index aabf359..0000000 --- a/tests/base.py +++ /dev/null @@ -1,1279 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2019 Canonical Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import asyncio -import datetime -import logging -import n2vc.vnf -import pylxd -import pytest -import os -import shlex -import subprocess -import time -import uuid -import yaml - -from juju.controller import Controller - -# Disable InsecureRequestWarning w/LXD -import urllib3 -urllib3.disable_warnings() -logging.getLogger("urllib3").setLevel(logging.WARNING) - -here = os.path.dirname(os.path.realpath(__file__)) - - -class CleanController(): - """ - Context manager that automatically connects and disconnects from - the currently active controller. - - Note: Unlike CleanModel, this will not create a new controller for you, - and an active controller must already be available. - """ - def __init__(self): - self._controller = None - - async def __aenter__(self): - self._controller = Controller() - await self._controller.connect() - return self._controller - - async def __aexit__(self, exc_type, exc, tb): - await self._controller.disconnect() - - -def debug(msg): - """Format debug messages in a consistent way.""" - now = datetime.datetime.now() - - # TODO: Decide on the best way to log. Output from `logging.debug` shows up - # when a test fails, but print() will always show up when running tox with - # `-s`, which is really useful for debugging single tests without having to - # insert a False assert to see the log. - logging.debug( - "[{}] {}".format(now.strftime('%Y-%m-%dT%H:%M:%S'), msg) - ) - print( - "[{}] {}".format(now.strftime('%Y-%m-%dT%H:%M:%S'), msg) - ) - - -def get_charm_path(): - return "{}/charms".format(here) - - -def get_layer_path(): - return "{}/charms/layers".format(here) - - -def collect_metrics(application): - """Invoke Juju's metrics collector. - - Caveat: this shells out to the `juju collect-metrics` command, rather than - making an API call. At the time of writing, that API is not exposed through - the client library. - """ - - try: - subprocess.check_call(['juju', 'collect-metrics', application]) - except subprocess.CalledProcessError as e: - raise Exception("Unable to collect metrics: {}".format(e)) - - -def has_metrics(charm): - """Check if a charm has metrics defined.""" - metricsyaml = "{}/{}/metrics.yaml".format( - get_layer_path(), - charm, - ) - if os.path.exists(metricsyaml): - return True - return False - - -def get_descriptor(descriptor): - desc = None - try: - tmp = yaml.safe_load(descriptor) - - # Remove the envelope - root = list(tmp.keys())[0] - if root == "nsd:nsd-catalog": - desc = tmp['nsd:nsd-catalog']['nsd'][0] - elif root == "vnfd:vnfd-catalog": - desc = tmp['vnfd:vnfd-catalog']['vnfd'][0] - except ValueError: - assert False - return desc - - -def get_n2vc(loop=None): - """Return an instance of N2VC.VNF.""" - log = logging.getLogger() - log.level = logging.DEBUG - - # Extract parameters from the environment in order to run our test - vca_host = os.getenv('VCA_HOST', '127.0.0.1') - vca_port = os.getenv('VCA_PORT', 17070) - vca_user = os.getenv('VCA_USER', 'admin') - vca_charms = os.getenv('VCA_CHARMS', None) - vca_secret = os.getenv('VCA_SECRET', None) - vca_cacert = os.getenv('VCA_CACERT', None) - - # Get the Juju Public key - juju_public_key = get_juju_public_key() - if juju_public_key: - debug("Reading Juju public key @ {}".format(juju_public_key)) - with open(juju_public_key, 'r') as f: - juju_public_key = f.read() - debug("Found public key: {}".format(juju_public_key)) - else: - raise Exception("No Juju Public Key found") - - # Get the ca-cert - # os.path.expanduser("~/.config/lxc") - # with open("{}/agent.conf".format(AGENT_PATH), "r") as f: - # try: - # y = yaml.safe_load(f) - # self.cacert = y['cacert'] - # except yaml.YAMLError as exc: - # log("Unable to find Juju ca-cert.") - # raise exc - - client = n2vc.vnf.N2VC( - log=log, - server=vca_host, - port=vca_port, - user=vca_user, - secret=vca_secret, - artifacts=vca_charms, - loop=loop, - juju_public_key=juju_public_key, - ca_cert=vca_cacert, - ) - return client - - -def create_lxd_container(public_key=None, name="test_name"): - """ - Returns a container object - - If public_key isn't set, we'll use the Juju ssh key - - :param public_key: The public key to inject into the container - :param name: The name of the test being run - """ - container = None - - # Format name so it's valid - name = name.replace("_", "-").replace(".", "") - - client = get_lxd_client() - if not client: - raise Exception("Unable to connect to LXD") - - test_machine = "test-{}-{}".format( - uuid.uuid4().hex[-4:], - name, - ) - - private_key_path, public_key_path = find_n2vc_ssh_keys() - - try: - # create profile w/cloud-init and juju ssh key - if not public_key: - public_key = "" - with open(public_key_path, "r") as f: - public_key = f.readline() - - client.profiles.create( - test_machine, - config={ - 'user.user-data': '#cloud-config\nssh_authorized_keys:\n- {}'.format(public_key)}, - devices={ - 'root': {'path': '/', 'pool': 'default', 'type': 'disk'}, - 'eth0': { - 'nictype': 'bridged', - 'parent': 'lxdbr0', - 'type': 'nic' - } - } - ) - except Exception as ex: - debug("Error creating lxd profile {}: {}".format(test_machine, ex)) - raise ex - - try: - # create lxc machine - config = { - 'name': test_machine, - 'source': { - 'type': 'image', - 'alias': 'xenial', - 'mode': 'pull', - 'protocol': 'simplestreams', - 'server': 'https://cloud-images.ubuntu.com/releases', - }, - 'profiles': [test_machine], - } - container = client.containers.create(config, wait=True) - container.start(wait=True) - except Exception as ex: - debug("Error creating lxd container {}: {}".format(test_machine, ex)) - # This is a test-ending failure. - raise ex - - def wait_for_network(container, timeout=30): - """Wait for eth0 to have an ipv4 address.""" - starttime = time.time() - while(time.time() < starttime + timeout): - time.sleep(1) - if 'eth0' in container.state().network: - addresses = container.state().network['eth0']['addresses'] - if len(addresses) > 0: - if addresses[0]['family'] == 'inet': - return addresses[0] - return None - - try: - wait_for_network(container) - except Exception as ex: - debug( - "Error waiting for container {} network: {}".format( - test_machine, - ex, - ) - ) - - try: - waitcount = 0 - while waitcount <= 5: - if is_sshd_running(container): - break - waitcount += 1 - time.sleep(1) - if waitcount >= 5: - debug("couldn't detect sshd running") - raise Exception("Unable to verify container sshd") - - except Exception as ex: - debug( - "Error checking sshd status on {}: {}".format( - test_machine, - ex, - ) - ) - - # HACK: We need to give sshd a chance to bind to the interface, - # and pylxd's container.execute seems to be broken and fails and/or - # hangs trying to properly check if the service is up. - (exit_code, stdout, stderr) = container.execute([ - 'ping', - '-c', '5', # Wait for 5 ECHO_REPLY - '8.8.8.8', # Ping Google's public DNS - '-W', '15', # Set a 15 second deadline - ]) - if exit_code > 0: - # The network failed - raise Exception("Unable to verify container network") - - return container - - -def is_sshd_running(container): - """Check if sshd is running in the container. - - Check to see if the sshd process is running and listening on port 22. - - :param container: The container to check - :return boolean: True if sshd is running. - """ - debug("Container: {}".format(container)) - try: - (rc, stdout, stderr) = container.execute( - ["service", "ssh", "status"] - ) - # If the status is a) found and b) running, the exit code will be 0 - if rc == 0: - return True - except Exception as ex: - debug("Failed to check sshd service status: {}".format(ex)) - - return False - - -def destroy_lxd_container(container): - """Stop and delete a LXD container. - - Sometimes we see errors talking to LXD -- ephemerial issues like - load or a bug that's killed the API. We'll do our best to clean - up here, and we should run a cleanup after all tests are finished - to remove any extra containers and profiles belonging to us. - """ - - if type(container) is bool: - return - - name = container.name - debug("Destroying container {}".format(name)) - - client = get_lxd_client() - - def wait_for_stop(timeout=30): - """Wait for eth0 to have an ipv4 address.""" - starttime = time.time() - while(time.time() < starttime + timeout): - time.sleep(1) - if container.state == "Stopped": - return - - def wait_for_delete(timeout=30): - starttime = time.time() - while(time.time() < starttime + timeout): - time.sleep(1) - if client.containers.exists(name) is False: - return - - try: - container.stop(wait=False) - wait_for_stop() - except Exception as ex: - debug( - "Error stopping container {}: {}".format( - name, - ex, - ) - ) - - try: - container.delete(wait=False) - wait_for_delete() - except Exception as ex: - debug( - "Error deleting container {}: {}".format( - name, - ex, - ) - ) - - try: - # Delete the profile created for this container - profile = client.profiles.get(name) - if profile: - profile.delete() - except Exception as ex: - debug( - "Error deleting profile {}: {}".format( - name, - ex, - ) - ) - - -def find_lxd_config(): - """Find the LXD configuration directory.""" - paths = [] - paths.append(os.path.expanduser("~/.config/lxc")) - paths.append(os.path.expanduser("~/snap/lxd/current/.config/lxc")) - - for path in paths: - if os.path.exists(path): - crt = os.path.expanduser("{}/client.crt".format(path)) - key = os.path.expanduser("{}/client.key".format(path)) - if os.path.exists(crt) and os.path.exists(key): - return (crt, key) - return (None, None) - - -def find_n2vc_ssh_keys(): - """Find the N2VC ssh keys.""" - - paths = [] - paths.append(os.path.expanduser("~/.ssh/")) - - for path in paths: - if os.path.exists(path): - private = os.path.expanduser("{}/id_n2vc_rsa".format(path)) - public = os.path.expanduser("{}/id_n2vc_rsa.pub".format(path)) - if os.path.exists(private) and os.path.exists(public): - return (private, public) - return (None, None) - - -def find_juju_ssh_keys(): - """Find the Juju ssh keys.""" - - paths = [] - paths.append(os.path.expanduser("~/.local/share/juju/ssh")) - - for path in paths: - if os.path.exists(path): - private = os.path.expanduser("{}/juju_id_rsa".format(path)) - public = os.path.expanduser("{}/juju_id_rsa.pub".format(path)) - if os.path.exists(private) and os.path.exists(public): - return (private, public) - return (None, None) - - -def get_juju_private_key(): - keys = find_juju_ssh_keys() - return keys[0] - - -def get_juju_public_key(): - """Find the Juju public key.""" - paths = [] - - if 'VCA_PATH' in os.environ: - paths.append("{}/ssh".format(os.environ["VCA_PATH"])) - - paths.append(os.path.expanduser("~/.local/share/juju/ssh")) - paths.append("/root/.local/share/juju/ssh") - - for path in paths: - if os.path.exists(path): - public = os.path.expanduser("{}/juju_id_rsa.pub".format(path)) - if os.path.exists(public): - return public - return None - - -def get_lxd_client(host=None, port="8443", verify=False): - """ Get the LXD client.""" - - if host is None: - if 'LXD_HOST' in os.environ: - host = os.environ['LXD_HOST'] - else: - host = '127.0.0.1' - - passwd = None - if 'LXD_SECRET' in os.environ: - passwd = os.environ['LXD_SECRET'] - - # debug("Connecting to LXD remote {} w/authentication ({})".format( - # host, - # passwd - # )) - client = None - (crt, key) = find_lxd_config() - - if crt and key: - client = pylxd.Client( - endpoint="https://{}:{}".format(host, port), - cert=(crt, key), - verify=verify, - ) - - # If the LXD server has a pasword set, authenticate with it. - if not client.trusted and passwd: - try: - client.authenticate(passwd) - if not client.trusted: - raise Exception("Unable to authenticate with LXD remote") - except pylxd.exceptions.LXDAPIException as ex: - if 'Certificate already in trust store' in ex: - pass - - return client - - -# TODO: This is marked serial but can be run in parallel with work, including: -# - Fixing an event loop issue; seems that all tests stop when one test stops? - - -@pytest.mark.serial -class TestN2VC(object): - """TODO: - 1. Validator Validation - - Automatically validate the descriptors we're using here, unless the test - author explicitly wants to skip them. Useful to make sure tests aren't - being run against invalid descriptors, validating functionality that may - fail against a properly written descriptor. - - We need to have a flag (instance variable) that controls this behavior. It - may be necessary to skip validation and run against a descriptor - implementing features that have not yet been released in the Information - Model. - """ - - """ - The six phases of integration testing, for the test itself and each charm?: - - setup/teardown_class: - 1. Prepare - Verify the environment and create a new model - 2. Deploy - Mark the test as ready to execute - 3. Configure - Configuration to reach Active state - 4. Test - Execute primitive(s) to verify success - 5. Collect - Collect any useful artifacts for debugging (charm, logs) - 6. Destroy - Destroy the model - - - 1. Prepare - Building of charm - 2. Deploy - Deploying charm - 3. Configure - Configuration to reach Active state - 4. Test - Execute primitive(s) to verify success - 5. Collect - Collect any useful artifacts for debugging (charm, logs) - 6. Destroy - Destroy the charm - - """ - @classmethod - def setup_class(self): - """ setup any state specific to the execution of the given class (which - usually contains tests). - """ - # Initialize instance variable(s) - self.n2vc = None - - # Track internal state for each test run - self.state = {} - - # Parse the test's descriptors - self.nsd = get_descriptor(self.NSD_YAML) - self.vnfd = get_descriptor(self.VNFD_YAML) - - self.ns_name = self.nsd['name'] - self.vnf_name = self.vnfd['name'] - - self.charms = {} - self.parse_vnf_descriptor() - assert self.charms is not {} - - # Track artifacts, like compiled charms, that will need to be removed - self.artifacts = {} - - # Build the charm(s) needed for this test - for charm in self.get_charm_names(): - # debug("Building charm {}".format(charm)) - self.get_charm(charm) - - # A bit of a hack, in order to allow the N2VC callback to run parallel - # to pytest. Test(s) should wait for this flag to change to False - # before returning. - self._running = True - self._stopping = False - - @classmethod - def teardown_class(self): - """ teardown any state that was previously setup with a call to - setup_class. - """ - debug("Running teardown_class...") - try: - - debug("Destroying LXD containers...") - for application in self.state: - if self.state[application]['container']: - destroy_lxd_container(self.state[application]['container']) - debug("Destroying LXD containers...done.") - - # Logout of N2VC - if self.n2vc: - debug("teardown_class(): Logging out of N2VC...") - yield from self.n2vc.logout() - debug("teardown_class(): Logging out of N2VC...done.") - - debug("Running teardown_class...done.") - except Exception as ex: - debug("Exception in teardown_class: {}".format(ex)) - - @classmethod - def all_charms_active(self): - """Determine if the all deployed charms are active.""" - active = 0 - - for application in self.state: - if 'status' in self.state[application]: - debug("status of {} is '{}'".format( - application, - self.state[application]['status'], - )) - if self.state[application]['status'] == 'active': - active += 1 - - debug("Active charms: {}/{}".format( - active, - len(self.charms), - )) - - if active == len(self.charms): - return True - - return False - - @classmethod - def are_tests_finished(self): - appcount = len(self.state) - - # If we don't have state yet, keep running. - if appcount == 0: - debug("No applications") - return False - - if self._stopping: - debug("_stopping is True") - return True - - appdone = 0 - for application in self.state: - if self.state[application]['done']: - appdone += 1 - - debug("{}/{} charms tested".format(appdone, appcount)) - - if appcount == appdone: - return True - - return False - - @classmethod - async def running(self, timeout=600): - """Returns if the test is still running. - - @param timeout The time, in seconds, to wait for the test to complete. - """ - if self.are_tests_finished(): - await self.stop() - return False - - await asyncio.sleep(30) - - return self._running - - @classmethod - def get_charm(self, charm): - """Build and return the path to the test charm. - - Builds one of the charms in tests/charms/layers and returns the path - to the compiled charm. The charm will automatically be removed when - when the test is complete. - - Returns: The path to the built charm or None if `charm build` failed. - """ - # Make sure the charm snap is installed - charm_cmd = None - try: - subprocess.check_call(['which', 'charm']) - charm_cmd = "charm build" - except subprocess.CalledProcessError: - # charm_cmd = "charm-build" - # debug("Using legacy charm-build") - raise Exception("charm snap not installed.") - - if charm not in self.artifacts: - try: - # Note: This builds the charm under N2VC/tests/charms/builds/ - # Currently, the snap-installed command only has write access - # to the $HOME (changing in an upcoming release) so writing to - # /tmp isn't possible at the moment. - - builds = get_charm_path() - if not os.path.exists("{}/builds/{}".format(builds, charm)): - cmd = "{} --no-local-layers {}/{} -o {}/".format( - charm_cmd, - get_layer_path(), - charm, - builds, - ) - # debug(cmd) - - env = os.environ.copy() - env["CHARM_BUILD_DIR"] = builds - - subprocess.check_call(shlex.split(cmd), env=env) - - except subprocess.CalledProcessError as e: - # charm build will return error code 100 if the charm fails - # the auto-run of charm proof, which we can safely ignore for - # our CI charms. - if e.returncode != 100: - raise Exception("charm build failed: {}.".format(e)) - - self.artifacts[charm] = { - 'tmpdir': builds, - 'charm': "{}/builds/{}".format(builds, charm), - } - - return self.artifacts[charm]['charm'] - - @classmethod - async def deploy(self, vnf_index, charm, params, loop): - """An inner function to do the deployment of a charm from - either a vdu or vnf. - """ - - if not self.n2vc: - self.n2vc = get_n2vc(loop=loop) - - debug("Creating model for Network Service {}".format(self.ns_name)) - await self.n2vc.CreateNetworkService(self.ns_name) - - application = self.n2vc.FormatApplicationName( - self.ns_name, - self.vnf_name, - str(vnf_index), - ) - - # Initialize the state of the application - self.state[application] = { - 'status': None, # Juju status - 'container': None, # lxd container, for proxy charms - 'actions': {}, # Actions we've executed - 'done': False, # Are we done testing this charm? - 'phase': "deploy", # What phase is this application in? - } - - debug("Deploying charm at {}".format(self.artifacts[charm])) - - # If this is a native charm, we need to provision the underlying - # machine ala an LXC container. - machine_spec = {} - - if not self.isproxy(application): - debug("Creating container for native charm") - # args = ("default", application, None, None) - self.state[application]['container'] = create_lxd_container( - name=os.path.basename(__file__) - ) - - hostname = self.get_container_ip( - self.state[application]['container'], - ) - - machine_spec = { - 'hostname': hostname, - 'username': 'ubuntu', - } - - await self.n2vc.DeployCharms( - self.ns_name, - application, - self.vnfd, - self.get_charm(charm), - params, - machine_spec, - self.n2vc_callback, - ) - - @classmethod - def parse_vnf_descriptor(self): - """Parse the VNF descriptor to make running tests easier. - - Parse the charm information in the descriptor to make it easy to write - tests to run again it. - - Each charm becomes a dictionary in a list: - [ - 'is-proxy': True, - 'vnf-member-index': 1, - 'vnf-name': '', - 'charm-name': '', - 'initial-config-primitive': {}, - 'config-primitive': {} - ] - - charm name - - is this a proxy charm? - - what are the initial-config-primitives (day 1)? - - what are the config primitives (day 2)? - - """ - charms = {} - - # You'd think this would be explicit, but it's just an incremental - # value that should be consistent. - vnf_member_index = 0 - - """Get all vdu and/or vdu config in a descriptor.""" - config = self.get_config() - for cfg in config: - if 'juju' in cfg: - - # Get the name to be used for the deployed application - application_name = n2vc.vnf.N2VC().FormatApplicationName( - self.ns_name, - self.vnf_name, - str(vnf_member_index), - ) - - charm = { - 'application-name': application_name, - 'proxy': True, - 'vnf-member-index': vnf_member_index, - 'vnf-name': self.vnf_name, - 'name': None, - 'initial-config-primitive': {}, - 'config-primitive': {}, - } - - juju = cfg['juju'] - charm['name'] = juju['charm'] - - if 'proxy' in juju: - charm['proxy'] = juju['proxy'] - - if 'initial-config-primitive' in cfg: - charm['initial-config-primitive'] = \ - cfg['initial-config-primitive'] - - if 'config-primitive' in cfg: - charm['config-primitive'] = cfg['config-primitive'] - - charms[application_name] = charm - - # Increment the vnf-member-index - vnf_member_index += 1 - - self.charms = charms - - @classmethod - def isproxy(self, application_name): - - assert application_name in self.charms - assert 'proxy' in self.charms[application_name] - assert type(self.charms[application_name]['proxy']) is bool - - # debug(self.charms[application_name]) - return self.charms[application_name]['proxy'] - - @classmethod - def get_config(self): - """Return an iterable list of config items (vdu and vnf). - - As far as N2VC is concerned, the config section for vdu and vnf are - identical. This joins them together so tests only need to iterate - through one list. - """ - configs = [] - - """Get all vdu and/or vdu config in a descriptor.""" - vnf_config = self.vnfd.get("vnf-configuration") - if vnf_config: - juju = vnf_config['juju'] - if juju: - configs.append(vnf_config) - - for vdu in self.vnfd['vdu']: - vdu_config = vdu.get('vdu-configuration') - if vdu_config: - juju = vdu_config['juju'] - if juju: - configs.append(vdu_config) - - return configs - - @classmethod - def get_charm_names(self): - """Return a list of charms used by the test descriptor.""" - - charms = {} - - # Check if the VDUs in this VNF have a charm - for config in self.get_config(): - juju = config['juju'] - - name = juju['charm'] - if name not in charms: - charms[name] = 1 - - return charms.keys() - - @classmethod - def get_phase(self, application): - return self.state[application]['phase'] - - @classmethod - def set_phase(self, application, phase): - self.state[application]['phase'] = phase - - @classmethod - async def configure_proxy_charm(self, *args): - """Configure a container for use via ssh.""" - (model, application, _, _) = args - - try: - if self.get_phase(application) == "deploy": - self.set_phase(application, "configure") - - debug("Start CreateContainer for {}".format(application)) - self.state[application]['container'] = \ - await self.CreateContainer(*args) - debug("Done CreateContainer for {}".format(application)) - - if self.state[application]['container']: - debug("Configure {} for container".format(application)) - if await self.configure_ssh_proxy(application): - await asyncio.sleep(0.1) - return True - else: - debug("Failed to configure container for {}".format(application)) - else: - debug("skipping CreateContainer for {}: {}".format( - application, - self.get_phase(application), - )) - - except Exception as ex: - debug("configure_proxy_charm exception: {}".format(ex)) - finally: - await asyncio.sleep(0.1) - - return False - - @classmethod - async def execute_charm_tests(self, *args): - (model, application, _, _) = args - - debug("Executing charm test(s) for {}".format(application)) - - if self.state[application]['done']: - debug("Trying to execute tests against finished charm...aborting") - return False - - try: - phase = self.get_phase(application) - # We enter the test phase when after deploy (for native charms) or - # configure, for proxy charms. - if phase in ["deploy", "configure"]: - self.set_phase(application, "test") - if self.are_tests_finished(): - raise Exception("Trying to execute init-config on finished test") - - if await self.execute_initial_config_primitives(application): - # check for metrics - await self.check_metrics(application) - - debug("Done testing {}".format(application)) - self.state[application]['done'] = True - - except Exception as ex: - debug("Exception in execute_charm_tests: {}".format(ex)) - finally: - await asyncio.sleep(0.1) - - return True - - @classmethod - async def CreateContainer(self, *args): - """Create a LXD container for use with a proxy charm.abs - - 1. Get the public key from the charm via `get-ssh-public-key` action - 2. Create container with said key injected for the ubuntu user - - Returns a Container object - """ - # Create and configure a LXD container for use with a proxy charm. - (model, application, _, _) = args - - debug("[CreateContainer] {}".format(args)) - container = None - - try: - # Execute 'get-ssh-public-key' primitive and get returned value - uuid = await self.n2vc.ExecutePrimitive( - model, - application, - "get-ssh-public-key", - None, - ) - - result = await self.n2vc.GetPrimitiveOutput(model, uuid) - pubkey = result['pubkey'] - - container = create_lxd_container( - public_key=pubkey, - name=os.path.basename(__file__) - ) - - return container - except Exception as ex: - debug("Error creating container: {}".format(ex)) - pass - - return None - - @classmethod - async def stop(self): - """Stop the test. - - - Remove charms - - Stop and delete containers - - Logout of N2VC - - TODO: Clean up duplicate code between teardown_class() and stop() - """ - debug("stop() called") - - if self.n2vc and self._running and not self._stopping: - self._running = False - self._stopping = True - - # Destroy the network service - try: - await self.n2vc.DestroyNetworkService(self.ns_name) - except Exception as e: - debug( - "Error Destroying Network Service \"{}\": {}".format( - self.ns_name, - e, - ) - ) - - # Wait for the applications to be removed and delete the containers - for application in self.charms: - try: - - while True: - # Wait for the application to be removed - await asyncio.sleep(10) - if not await self.n2vc.HasApplication( - self.ns_name, - application, - ): - break - - # Need to wait for the charm to finish, because native charms - if self.state[application]['container']: - debug("Deleting LXD container...") - destroy_lxd_container( - self.state[application]['container'] - ) - self.state[application]['container'] = None - debug("Deleting LXD container...done.") - else: - debug("No container found for {}".format(application)) - except Exception as e: - debug("Error while deleting container: {}".format(e)) - - # Logout of N2VC - try: - debug("stop(): Logging out of N2VC...") - await self.n2vc.logout() - self.n2vc = None - debug("stop(): Logging out of N2VC...Done.") - except Exception as ex: - debug(ex) - - # Let the test know we're finished. - debug("Marking test as finished.") - # self._running = False - else: - debug("Skipping stop()") - - @classmethod - def get_container_ip(self, container): - """Return the IPv4 address of container's eth0 interface.""" - ipaddr = None - if container: - addresses = container.state().network['eth0']['addresses'] - # The interface may have more than one address, but we only need - # the first one for testing purposes. - ipaddr = addresses[0]['address'] - - return ipaddr - - @classmethod - async def configure_ssh_proxy(self, application, task=None): - """Configure the proxy charm to use the lxd container. - - Configure the charm to use a LXD container as it's VNF. - """ - debug("Configuring ssh proxy for {}".format(application)) - - mgmtaddr = self.get_container_ip( - self.state[application]['container'], - ) - - debug( - "Setting ssh-hostname for {} to {}".format( - application, - mgmtaddr, - ) - ) - - await self.n2vc.ExecutePrimitive( - self.ns_name, - application, - "config", - None, - params={ - 'ssh-hostname': mgmtaddr, - 'ssh-username': 'ubuntu', - } - ) - - return True - - @classmethod - async def execute_initial_config_primitives(self, application, task=None): - debug("Executing initial_config_primitives for {}".format(application)) - try: - init_config = self.charms[application] - - """ - The initial-config-primitive is run during deploy but may fail - on some steps because proxy charm access isn't configured. - - Re-run those actions so we can inspect the status. - """ - uuids = await self.n2vc.ExecuteInitialPrimitives( - self.ns_name, - application, - init_config, - ) - - """ - ExecutePrimitives will return a list of uuids. We need to check the - status of each. The test continues if all Actions succeed, and - fails if any of them fail. - """ - await self.wait_for_uuids(application, uuids) - debug("Primitives for {} finished.".format(application)) - - return True - except Exception as ex: - debug("execute_initial_config_primitives exception: {}".format(ex)) - raise ex - - return False - - @classmethod - async def check_metrics(self, application, task=None): - """Check and run metrics, if present. - - Checks to see if metrics are specified by the charm. If so, collects - the metrics. - - If no metrics, then mark the test as finished. - """ - if has_metrics(self.charms[application]['name']): - debug("Collecting metrics for {}".format(application)) - - metrics = await self.n2vc.GetMetrics( - self.ns_name, - application, - ) - - return await self.verify_metrics(application, metrics) - - @classmethod - async def verify_metrics(self, application, metrics): - """Verify the charm's metrics. - - Verify that the charm has sent metrics successfully. - - Stops the test when finished. - """ - debug("Verifying metrics for {}: {}".format(application, metrics)) - - if len(metrics): - return True - - else: - # TODO: Ran into a case where it took 9 attempts before metrics - # were available; the controller is slow sometimes. - await asyncio.sleep(30) - return await self.check_metrics(application) - - @classmethod - async def wait_for_uuids(self, application, uuids): - """Wait for primitives to execute. - - The task will provide a list of uuids representing primitives that are - queued to run. - """ - debug("Waiting for uuids for {}: {}".format(application, uuids)) - waitfor = len(uuids) - finished = 0 - - while waitfor > finished: - for uid in uuids: - await asyncio.sleep(10) - - if uuid not in self.state[application]['actions']: - self.state[application]['actions'][uid] = "pending" - - status = self.state[application]['actions'][uid] - - # Have we already marked this as done? - if status in ["pending", "running"]: - - debug("Getting status of {} ({})...".format(uid, status)) - status = await self.n2vc.GetPrimitiveStatus( - self.ns_name, - uid, - ) - debug("...state of {} is {}".format(uid, status)) - self.state[application]['actions'][uid] = status - - if status in ['completed', 'failed']: - finished += 1 - - debug("{}/{} actions complete".format(finished, waitfor)) - - # Wait for the primitive to finish and try again - if waitfor > finished: - debug("Waiting 10s for action to finish...") - await asyncio.sleep(10) - - @classmethod - def n2vc_callback(self, *args, **kwargs): - (model, application, status, message) = args - # debug("callback: {}".format(args)) - - if application not in self.state: - # Initialize the state of the application - self.state[application] = { - 'status': None, # Juju status - 'container': None, # lxd container, for proxy charms - 'actions': {}, # Actions we've executed - 'done': False, # Are we done testing this charm? - 'phase': "deploy", # What phase is this application in? - } - - self.state[application]['status'] = status - - if status in ['waiting', 'maintenance', 'unknown']: - # Nothing to do for these - return - - debug("callback: {}".format(args)) - - if self.state[application]['done']: - debug("{} is done".format(application)) - return - - if status in ['error']: - # To test broken charms, if a charm enters an error state we should - # end the test - debug("{} is in an error state, stop the test.".format(application)) - # asyncio.ensure_future(self.stop()) - self.state[application]['done'] = True - assert False - - if status in ["blocked"] and self.isproxy(application): - if self.state[application]['phase'] == "deploy": - debug("Configuring proxy charm for {}".format(application)) - asyncio.ensure_future(self.configure_proxy_charm(*args)) - - elif status in ["active"]: - """When a charm is active, we can assume that it has been properly - configured (not blocked), regardless of if it's a proxy or not. - - All primitives should be complete by init_config_primitive - """ - asyncio.ensure_future(self.execute_charm_tests(*args))