X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=tests%2Fbase.py;h=ce95056d7e436253c3658edf3f7375c39e2cf902;hp=e1ec45238228e284ba41294b6f08161d89cae0f9;hb=cda5f41db80360640c9665018325f29b57fd8878;hpb=dcdf82bbc1ef310379f746518b2dd3b006353cb3 diff --git a/tests/base.py b/tests/base.py index e1ec452..ce95056 100644 --- a/tests/base.py +++ b/tests/base.py @@ -1,67 +1,1263 @@ -import mock +#!/usr/bin/env python3 +import asyncio +import datetime +import logging +import n2vc.vnf +import pylxd +import pytest +import os +import shlex import subprocess +import time import uuid - -import pytest +import yaml from juju.controller import Controller -from juju.client.connection import JujuData - -def is_bootstrapped(): - result = subprocess.run(['juju', 'switch'], stdout=subprocess.PIPE) - return ( - result.returncode == 0 and - len(result.stdout.decode().strip()) > 0) +# Disable InsecureRequestWarning w/LXD +import urllib3 +urllib3.disable_warnings() +logging.getLogger("urllib3").setLevel(logging.WARNING) -bootstrapped = pytest.mark.skipif( - not is_bootstrapped(), - reason='bootstrapped Juju environment required') +here = os.path.dirname(os.path.realpath(__file__)) class CleanController(): + """ + Context manager that automatically connects and disconnects from + the currently active controller. + + Note: Unlike CleanModel, this will not create a new controller for you, + and an active controller must already be available. + """ def __init__(self): - self.controller = None + self._controller = None async def __aenter__(self): - self.controller = Controller() - await self.controller.connect_current() - return self.controller + self._controller = Controller() + await self._controller.connect() + return self._controller async def __aexit__(self, exc_type, exc, tb): - await self.controller.disconnect() + await self._controller.disconnect() -class CleanModel(): - def __init__(self): - self.controller = None - self.model = None +def debug(msg): + """Format debug messages in a consistent way.""" + now = datetime.datetime.now() - async def __aenter__(self): - self.controller = Controller() - await self.controller.connect_current() + # TODO: Decide on the best way to log. Output from `logging.debug` shows up + # when a test fails, but print() will always show up when running tox with + # `-s`, which is really useful for debugging single tests without having to + # insert a False assert to see the log. + logging.debug( + "[{}] {}".format(now.strftime('%Y-%m-%dT%H:%M:%S'), msg) + ) + print( + "[{}] {}".format(now.strftime('%Y-%m-%dT%H:%M:%S'), msg) + ) - model_name = 'model-{}'.format(uuid.uuid4()) - self.model = await self.controller.add_model(model_name) - # save the model UUID in case test closes model - self.model_uuid = self.model.info.uuid +def get_charm_path(): + return "{}/charms".format(here) - # Ensure that we connect to the new model by default. This also - # prevents failures if test was started with no current model. - self._patch_cm = mock.patch.object(JujuData, 'current_model', - return_value=model_name) - self._patch_cm.start() - return self.model +def get_layer_path(): + return "{}/charms/layers".format(here) - async def __aexit__(self, exc_type, exc, tb): - self._patch_cm.stop() - await self.model.disconnect() - await self.controller.destroy_model(self.model_uuid) - await self.controller.disconnect() +def collect_metrics(application): + """Invoke Juju's metrics collector. + + Caveat: this shells out to the `juju collect-metrics` command, rather than + making an API call. At the time of writing, that API is not exposed through + the client library. + """ + + try: + subprocess.check_call(['juju', 'collect-metrics', application]) + except subprocess.CalledProcessError as e: + raise Exception("Unable to collect metrics: {}".format(e)) + + +def has_metrics(charm): + """Check if a charm has metrics defined.""" + metricsyaml = "{}/{}/metrics.yaml".format( + get_layer_path(), + charm, + ) + if os.path.exists(metricsyaml): + return True + return False + + +def get_descriptor(descriptor): + desc = None + try: + tmp = yaml.load(descriptor) + + # Remove the envelope + root = list(tmp.keys())[0] + if root == "nsd:nsd-catalog": + desc = tmp['nsd:nsd-catalog']['nsd'][0] + elif root == "vnfd:vnfd-catalog": + desc = tmp['vnfd:vnfd-catalog']['vnfd'][0] + except ValueError: + assert False + return desc + + +def get_n2vc(loop=None): + """Return an instance of N2VC.VNF.""" + log = logging.getLogger() + log.level = logging.DEBUG + + # Extract parameters from the environment in order to run our test + vca_host = os.getenv('VCA_HOST', '127.0.0.1') + vca_port = os.getenv('VCA_PORT', 17070) + vca_user = os.getenv('VCA_USER', 'admin') + vca_charms = os.getenv('VCA_CHARMS', None) + vca_secret = os.getenv('VCA_SECRET', None) + + # Get the Juju Public key + juju_public_key = get_juju_public_key() + if juju_public_key: + debug("Reading Juju public key @ {}".format(juju_public_key)) + with open(juju_public_key, 'r') as f: + juju_public_key = f.read() + debug("Found public key: {}".format(juju_public_key)) + else: + raise Exception("No Juju Public Key found") + + # Get the ca-cert + # os.path.expanduser("~/.config/lxc") + # with open("{}/agent.conf".format(AGENT_PATH), "r") as f: + # try: + # y = yaml.safe_load(f) + # self.cacert = y['cacert'] + # except yaml.YAMLError as exc: + # log("Unable to find Juju ca-cert.") + # raise exc + + client = n2vc.vnf.N2VC( + log=log, + server=vca_host, + port=vca_port, + user=vca_user, + secret=vca_secret, + artifacts=vca_charms, + loop=loop, + juju_public_key=juju_public_key, + ) + return client + + +def create_lxd_container(public_key=None, name="test_name"): + """ + Returns a container object + + If public_key isn't set, we'll use the Juju ssh key + + :param public_key: The public key to inject into the container + :param name: The name of the test being run + """ + container = None + + # Format name so it's valid + name = name.replace("_", "-").replace(".", "") + + client = get_lxd_client() + if not client: + raise Exception("Unable to connect to LXD") + + test_machine = "test-{}-{}".format( + uuid.uuid4().hex[-4:], + name, + ) + + private_key_path, public_key_path = find_n2vc_ssh_keys() + + try: + # create profile w/cloud-init and juju ssh key + if not public_key: + public_key = "" + with open(public_key_path, "r") as f: + public_key = f.readline() + + client.profiles.create( + test_machine, + config={ + 'user.user-data': '#cloud-config\nssh_authorized_keys:\n- {}'.format(public_key)}, + devices={ + 'root': {'path': '/', 'pool': 'default', 'type': 'disk'}, + 'eth0': { + 'nictype': 'bridged', + 'parent': 'lxdbr0', + 'type': 'nic' + } + } + ) + except Exception as ex: + debug("Error creating lxd profile {}: {}".format(test_machine, ex)) + raise ex + + try: + # create lxc machine + config = { + 'name': test_machine, + 'source': { + 'type': 'image', + 'alias': 'xenial', + 'mode': 'pull', + 'protocol': 'simplestreams', + 'server': 'https://cloud-images.ubuntu.com/releases', + }, + 'profiles': [test_machine], + } + container = client.containers.create(config, wait=True) + container.start(wait=True) + except Exception as ex: + debug("Error creating lxd container {}: {}".format(test_machine, ex)) + # This is a test-ending failure. + raise ex + + def wait_for_network(container, timeout=30): + """Wait for eth0 to have an ipv4 address.""" + starttime = time.time() + while(time.time() < starttime + timeout): + time.sleep(1) + if 'eth0' in container.state().network: + addresses = container.state().network['eth0']['addresses'] + if len(addresses) > 0: + if addresses[0]['family'] == 'inet': + return addresses[0] + return None + + try: + wait_for_network(container) + except Exception as ex: + debug( + "Error waiting for container {} network: {}".format( + test_machine, + ex, + ) + ) + + try: + waitcount = 0 + while waitcount <= 5: + if is_sshd_running(container): + break + waitcount += 1 + time.sleep(1) + if waitcount >= 5: + debug("couldn't detect sshd running") + raise Exception("Unable to verify container sshd") + + except Exception as ex: + debug( + "Error checking sshd status on {}: {}".format( + test_machine, + ex, + ) + ) + + # HACK: We need to give sshd a chance to bind to the interface, + # and pylxd's container.execute seems to be broken and fails and/or + # hangs trying to properly check if the service is up. + (exit_code, stdout, stderr) = container.execute([ + 'ping', + '-c', '5', # Wait for 5 ECHO_REPLY + '8.8.8.8', # Ping Google's public DNS + '-W', '15', # Set a 15 second deadline + ]) + if exit_code > 0: + # The network failed + raise Exception("Unable to verify container network") + + return container + + +def is_sshd_running(container): + """Check if sshd is running in the container. + + Check to see if the sshd process is running and listening on port 22. + + :param container: The container to check + :return boolean: True if sshd is running. + """ + debug("Container: {}".format(container)) + try: + (rc, stdout, stderr) = container.execute( + ["service", "ssh", "status"] + ) + # If the status is a) found and b) running, the exit code will be 0 + if rc == 0: + return True + except Exception as ex: + debug("Failed to check sshd service status: {}".format(ex)) + + return False + + +def destroy_lxd_container(container): + """Stop and delete a LXD container. + + Sometimes we see errors talking to LXD -- ephemerial issues like + load or a bug that's killed the API. We'll do our best to clean + up here, and we should run a cleanup after all tests are finished + to remove any extra containers and profiles belonging to us. + """ + + if type(container) is bool: + return + + name = container.name + debug("Destroying container {}".format(name)) + + client = get_lxd_client() + + def wait_for_stop(timeout=30): + """Wait for eth0 to have an ipv4 address.""" + starttime = time.time() + while(time.time() < starttime + timeout): + time.sleep(1) + if container.state == "Stopped": + return + + def wait_for_delete(timeout=30): + starttime = time.time() + while(time.time() < starttime + timeout): + time.sleep(1) + if client.containers.exists(name) is False: + return + + try: + container.stop(wait=False) + wait_for_stop() + except Exception as ex: + debug( + "Error stopping container {}: {}".format( + name, + ex, + ) + ) + + try: + container.delete(wait=False) + wait_for_delete() + except Exception as ex: + debug( + "Error deleting container {}: {}".format( + name, + ex, + ) + ) + + try: + # Delete the profile created for this container + profile = client.profiles.get(name) + if profile: + profile.delete() + except Exception as ex: + debug( + "Error deleting profile {}: {}".format( + name, + ex, + ) + ) + + +def find_lxd_config(): + """Find the LXD configuration directory.""" + paths = [] + paths.append(os.path.expanduser("~/.config/lxc")) + paths.append(os.path.expanduser("~/snap/lxd/current/.config/lxc")) + + for path in paths: + if os.path.exists(path): + crt = os.path.expanduser("{}/client.crt".format(path)) + key = os.path.expanduser("{}/client.key".format(path)) + if os.path.exists(crt) and os.path.exists(key): + return (crt, key) + return (None, None) + + +def find_n2vc_ssh_keys(): + """Find the N2VC ssh keys.""" + + paths = [] + paths.append(os.path.expanduser("~/.ssh/")) + + for path in paths: + if os.path.exists(path): + private = os.path.expanduser("{}/id_n2vc_rsa".format(path)) + public = os.path.expanduser("{}/id_n2vc_rsa.pub".format(path)) + if os.path.exists(private) and os.path.exists(public): + return (private, public) + return (None, None) + + +def find_juju_ssh_keys(): + """Find the Juju ssh keys.""" + + paths = [] + paths.append(os.path.expanduser("~/.local/share/juju/ssh")) + + for path in paths: + if os.path.exists(path): + private = os.path.expanduser("{}/juju_id_rsa".format(path)) + public = os.path.expanduser("{}/juju_id_rsa.pub".format(path)) + if os.path.exists(private) and os.path.exists(public): + return (private, public) + return (None, None) + + +def get_juju_private_key(): + keys = find_juju_ssh_keys() + return keys[0] + + +def get_juju_public_key(): + """Find the Juju public key.""" + paths = [] + + if 'VCA_PATH' in os.environ: + paths.append("{}/ssh".format(os.environ["VCA_PATH"])) + + paths.append(os.path.expanduser("~/.local/share/juju/ssh")) + paths.append("/root/.local/share/juju/ssh") + + for path in paths: + if os.path.exists(path): + public = os.path.expanduser("{}/juju_id_rsa.pub".format(path)) + if os.path.exists(public): + return public + return None + + +def get_lxd_client(host=None, port="8443", verify=False): + """ Get the LXD client.""" + + if host is None: + if 'LXD_HOST' in os.environ: + host = os.environ['LXD_HOST'] + else: + host = '127.0.0.1' + + passwd = None + if 'LXD_SECRET' in os.environ: + passwd = os.environ['LXD_SECRET'] + + # debug("Connecting to LXD remote {} w/authentication ({})".format( + # host, + # passwd + # )) + client = None + (crt, key) = find_lxd_config() + + if crt and key: + client = pylxd.Client( + endpoint="https://{}:{}".format(host, port), + cert=(crt, key), + verify=verify, + ) + + # If the LXD server has a pasword set, authenticate with it. + if not client.trusted and passwd: + try: + client.authenticate(passwd) + if not client.trusted: + raise Exception("Unable to authenticate with LXD remote") + except pylxd.exceptions.LXDAPIException as ex: + if 'Certificate already in trust store' in ex: + pass + + return client + + +# TODO: This is marked serial but can be run in parallel with work, including: +# - Fixing an event loop issue; seems that all tests stop when one test stops? + + +@pytest.mark.serial +class TestN2VC(object): + """TODO: + 1. Validator Validation + + Automatically validate the descriptors we're using here, unless the test + author explicitly wants to skip them. Useful to make sure tests aren't + being run against invalid descriptors, validating functionality that may + fail against a properly written descriptor. + + We need to have a flag (instance variable) that controls this behavior. It + may be necessary to skip validation and run against a descriptor + implementing features that have not yet been released in the Information + Model. + """ + + """ + The six phases of integration testing, for the test itself and each charm?: + + setup/teardown_class: + 1. Prepare - Verify the environment and create a new model + 2. Deploy - Mark the test as ready to execute + 3. Configure - Configuration to reach Active state + 4. Test - Execute primitive(s) to verify success + 5. Collect - Collect any useful artifacts for debugging (charm, logs) + 6. Destroy - Destroy the model + + + 1. Prepare - Building of charm + 2. Deploy - Deploying charm + 3. Configure - Configuration to reach Active state + 4. Test - Execute primitive(s) to verify success + 5. Collect - Collect any useful artifacts for debugging (charm, logs) + 6. Destroy - Destroy the charm + + """ + @classmethod + def setup_class(self): + """ setup any state specific to the execution of the given class (which + usually contains tests). + """ + # Initialize instance variable(s) + self.n2vc = None + + # Track internal state for each test run + self.state = {} + + # Parse the test's descriptors + self.nsd = get_descriptor(self.NSD_YAML) + self.vnfd = get_descriptor(self.VNFD_YAML) + + self.ns_name = self.nsd['name'] + self.vnf_name = self.vnfd['name'] + + self.charms = {} + self.parse_vnf_descriptor() + assert self.charms is not {} + + # Track artifacts, like compiled charms, that will need to be removed + self.artifacts = {} + + # Build the charm(s) needed for this test + for charm in self.get_charm_names(): + # debug("Building charm {}".format(charm)) + self.get_charm(charm) + + # A bit of a hack, in order to allow the N2VC callback to run parallel + # to pytest. Test(s) should wait for this flag to change to False + # before returning. + self._running = True + self._stopping = False + + @classmethod + def teardown_class(self): + """ teardown any state that was previously setup with a call to + setup_class. + """ + debug("Running teardown_class...") + try: + + debug("Destroying LXD containers...") + for application in self.state: + if self.state[application]['container']: + destroy_lxd_container(self.state[application]['container']) + debug("Destroying LXD containers...done.") + + # Logout of N2VC + if self.n2vc: + debug("teardown_class(): Logging out of N2VC...") + yield from self.n2vc.logout() + debug("teardown_class(): Logging out of N2VC...done.") + + debug("Running teardown_class...done.") + except Exception as ex: + debug("Exception in teardown_class: {}".format(ex)) + + @classmethod + def all_charms_active(self): + """Determine if the all deployed charms are active.""" + active = 0 + + for application in self.state: + if 'status' in self.state[application]: + debug("status of {} is '{}'".format( + application, + self.state[application]['status'], + )) + if self.state[application]['status'] == 'active': + active += 1 + + debug("Active charms: {}/{}".format( + active, + len(self.charms), + )) + + if active == len(self.charms): + return True + + return False + + @classmethod + def are_tests_finished(self): + appcount = len(self.state) + + # If we don't have state yet, keep running. + if appcount == 0: + debug("No applications") + return False + + if self._stopping: + debug("_stopping is True") + return True + + appdone = 0 + for application in self.state: + if self.state[application]['done']: + appdone += 1 + + debug("{}/{} charms tested".format(appdone, appcount)) + + if appcount == appdone: + return True + + return False + + @classmethod + async def running(self, timeout=600): + """Returns if the test is still running. + + @param timeout The time, in seconds, to wait for the test to complete. + """ + if self.are_tests_finished(): + await self.stop() + return False + + await asyncio.sleep(30) + + return self._running + + @classmethod + def get_charm(self, charm): + """Build and return the path to the test charm. + + Builds one of the charms in tests/charms/layers and returns the path + to the compiled charm. The charm will automatically be removed when + when the test is complete. + + Returns: The path to the built charm or None if `charm build` failed. + """ + # Make sure the charm snap is installed + charm_cmd = None + try: + subprocess.check_call(['which', 'charm']) + charm_cmd = "charm build" + except subprocess.CalledProcessError: + # charm_cmd = "charm-build" + # debug("Using legacy charm-build") + raise Exception("charm snap not installed.") + + if charm not in self.artifacts: + try: + # Note: This builds the charm under N2VC/tests/charms/builds/ + # Currently, the snap-installed command only has write access + # to the $HOME (changing in an upcoming release) so writing to + # /tmp isn't possible at the moment. + + builds = get_charm_path() + if not os.path.exists("{}/builds/{}".format(builds, charm)): + cmd = "{} --no-local-layers {}/{} -o {}/".format( + charm_cmd, + get_layer_path(), + charm, + builds, + ) + # debug(cmd) + + env = os.environ.copy() + env["CHARM_BUILD_DIR"] = builds + + subprocess.check_call(shlex.split(cmd), env=env) + + except subprocess.CalledProcessError as e: + # charm build will return error code 100 if the charm fails + # the auto-run of charm proof, which we can safely ignore for + # our CI charms. + if e.returncode != 100: + raise Exception("charm build failed: {}.".format(e)) + + self.artifacts[charm] = { + 'tmpdir': builds, + 'charm': "{}/builds/{}".format(builds, charm), + } + + return self.artifacts[charm]['charm'] + + @classmethod + async def deploy(self, vnf_index, charm, params, loop): + """An inner function to do the deployment of a charm from + either a vdu or vnf. + """ + + if not self.n2vc: + self.n2vc = get_n2vc(loop=loop) + + debug("Creating model for Network Service {}".format(self.ns_name)) + await self.n2vc.CreateNetworkService(self.ns_name) + + application = self.n2vc.FormatApplicationName( + self.ns_name, + self.vnf_name, + str(vnf_index), + ) + + # Initialize the state of the application + self.state[application] = { + 'status': None, # Juju status + 'container': None, # lxd container, for proxy charms + 'actions': {}, # Actions we've executed + 'done': False, # Are we done testing this charm? + 'phase': "deploy", # What phase is this application in? + } + + debug("Deploying charm at {}".format(self.artifacts[charm])) + + # If this is a native charm, we need to provision the underlying + # machine ala an LXC container. + machine_spec = {} + + if not self.isproxy(application): + debug("Creating container for native charm") + # args = ("default", application, None, None) + self.state[application]['container'] = create_lxd_container( + name=os.path.basename(__file__) + ) + + hostname = self.get_container_ip( + self.state[application]['container'], + ) + + machine_spec = { + 'hostname': hostname, + 'username': 'ubuntu', + } + + await self.n2vc.DeployCharms( + self.ns_name, + application, + self.vnfd, + self.get_charm(charm), + params, + machine_spec, + self.n2vc_callback, + ) + + @classmethod + def parse_vnf_descriptor(self): + """Parse the VNF descriptor to make running tests easier. + + Parse the charm information in the descriptor to make it easy to write + tests to run again it. + + Each charm becomes a dictionary in a list: + [ + 'is-proxy': True, + 'vnf-member-index': 1, + 'vnf-name': '', + 'charm-name': '', + 'initial-config-primitive': {}, + 'config-primitive': {} + ] + - charm name + - is this a proxy charm? + - what are the initial-config-primitives (day 1)? + - what are the config primitives (day 2)? + + """ + charms = {} + + # You'd think this would be explicit, but it's just an incremental + # value that should be consistent. + vnf_member_index = 0 + + """Get all vdu and/or vdu config in a descriptor.""" + config = self.get_config() + for cfg in config: + if 'juju' in cfg: + + # Get the name to be used for the deployed application + application_name = n2vc.vnf.N2VC().FormatApplicationName( + self.ns_name, + self.vnf_name, + str(vnf_member_index), + ) + + charm = { + 'application-name': application_name, + 'proxy': True, + 'vnf-member-index': vnf_member_index, + 'vnf-name': self.vnf_name, + 'name': None, + 'initial-config-primitive': {}, + 'config-primitive': {}, + } + + juju = cfg['juju'] + charm['name'] = juju['charm'] + + if 'proxy' in juju: + charm['proxy'] = juju['proxy'] + + if 'initial-config-primitive' in cfg: + charm['initial-config-primitive'] = \ + cfg['initial-config-primitive'] + + if 'config-primitive' in cfg: + charm['config-primitive'] = cfg['config-primitive'] + + charms[application_name] = charm + + # Increment the vnf-member-index + vnf_member_index += 1 + + self.charms = charms + + @classmethod + def isproxy(self, application_name): + + assert application_name in self.charms + assert 'proxy' in self.charms[application_name] + assert type(self.charms[application_name]['proxy']) is bool + + # debug(self.charms[application_name]) + return self.charms[application_name]['proxy'] + + @classmethod + def get_config(self): + """Return an iterable list of config items (vdu and vnf). + + As far as N2VC is concerned, the config section for vdu and vnf are + identical. This joins them together so tests only need to iterate + through one list. + """ + configs = [] + + """Get all vdu and/or vdu config in a descriptor.""" + vnf_config = self.vnfd.get("vnf-configuration") + if vnf_config: + juju = vnf_config['juju'] + if juju: + configs.append(vnf_config) + + for vdu in self.vnfd['vdu']: + vdu_config = vdu.get('vdu-configuration') + if vdu_config: + juju = vdu_config['juju'] + if juju: + configs.append(vdu_config) + + return configs + + @classmethod + def get_charm_names(self): + """Return a list of charms used by the test descriptor.""" + + charms = {} + + # Check if the VDUs in this VNF have a charm + for config in self.get_config(): + juju = config['juju'] + + name = juju['charm'] + if name not in charms: + charms[name] = 1 + + return charms.keys() + + @classmethod + def get_phase(self, application): + return self.state[application]['phase'] + + @classmethod + def set_phase(self, application, phase): + self.state[application]['phase'] = phase + + @classmethod + async def configure_proxy_charm(self, *args): + """Configure a container for use via ssh.""" + (model, application, _, _) = args + + try: + if self.get_phase(application) == "deploy": + self.set_phase(application, "configure") + + debug("Start CreateContainer for {}".format(application)) + self.state[application]['container'] = \ + await self.CreateContainer(*args) + debug("Done CreateContainer for {}".format(application)) + + if self.state[application]['container']: + debug("Configure {} for container".format(application)) + if await self.configure_ssh_proxy(application): + await asyncio.sleep(0.1) + return True + else: + debug("Failed to configure container for {}".format(application)) + else: + debug("skipping CreateContainer for {}: {}".format( + application, + self.get_phase(application), + )) + + except Exception as ex: + debug("configure_proxy_charm exception: {}".format(ex)) + finally: + await asyncio.sleep(0.1) + + return False + + @classmethod + async def execute_charm_tests(self, *args): + (model, application, _, _) = args + + debug("Executing charm test(s) for {}".format(application)) + + if self.state[application]['done']: + debug("Trying to execute tests against finished charm...aborting") + return False + + try: + phase = self.get_phase(application) + # We enter the test phase when after deploy (for native charms) or + # configure, for proxy charms. + if phase in ["deploy", "configure"]: + self.set_phase(application, "test") + if self.are_tests_finished(): + raise Exception("Trying to execute init-config on finished test") + + if await self.execute_initial_config_primitives(application): + # check for metrics + await self.check_metrics(application) + + debug("Done testing {}".format(application)) + self.state[application]['done'] = True + + except Exception as ex: + debug("Exception in execute_charm_tests: {}".format(ex)) + finally: + await asyncio.sleep(0.1) + + return True + + @classmethod + async def CreateContainer(self, *args): + """Create a LXD container for use with a proxy charm.abs + + 1. Get the public key from the charm via `get-ssh-public-key` action + 2. Create container with said key injected for the ubuntu user + + Returns a Container object + """ + # Create and configure a LXD container for use with a proxy charm. + (model, application, _, _) = args + + debug("[CreateContainer] {}".format(args)) + container = None + + try: + # Execute 'get-ssh-public-key' primitive and get returned value + uuid = await self.n2vc.ExecutePrimitive( + model, + application, + "get-ssh-public-key", + None, + ) + + result = await self.n2vc.GetPrimitiveOutput(model, uuid) + pubkey = result['pubkey'] + + container = create_lxd_container( + public_key=pubkey, + name=os.path.basename(__file__) + ) + + return container + except Exception as ex: + debug("Error creating container: {}".format(ex)) + pass + + return None + + @classmethod + async def stop(self): + """Stop the test. + + - Remove charms + - Stop and delete containers + - Logout of N2VC + + TODO: Clean up duplicate code between teardown_class() and stop() + """ + debug("stop() called") + + if self.n2vc and self._running and not self._stopping: + self._running = False + self._stopping = True + + # Destroy the network service + try: + await self.n2vc.DestroyNetworkService(self.ns_name) + except Exception as e: + debug( + "Error Destroying Network Service \"{}\": {}".format( + self.ns_name, + e, + ) + ) + + # Wait for the applications to be removed and delete the containers + for application in self.charms: + try: + + while True: + # Wait for the application to be removed + await asyncio.sleep(10) + if not await self.n2vc.HasApplication( + self.ns_name, + application, + ): + break + + # Need to wait for the charm to finish, because native charms + if self.state[application]['container']: + debug("Deleting LXD container...") + destroy_lxd_container( + self.state[application]['container'] + ) + self.state[application]['container'] = None + debug("Deleting LXD container...done.") + else: + debug("No container found for {}".format(application)) + except Exception as e: + debug("Error while deleting container: {}".format(e)) + + # Logout of N2VC + try: + debug("stop(): Logging out of N2VC...") + await self.n2vc.logout() + self.n2vc = None + debug("stop(): Logging out of N2VC...Done.") + except Exception as ex: + debug(ex) + + # Let the test know we're finished. + debug("Marking test as finished.") + # self._running = False + else: + debug("Skipping stop()") + + @classmethod + def get_container_ip(self, container): + """Return the IPv4 address of container's eth0 interface.""" + ipaddr = None + if container: + addresses = container.state().network['eth0']['addresses'] + # The interface may have more than one address, but we only need + # the first one for testing purposes. + ipaddr = addresses[0]['address'] + + return ipaddr + + @classmethod + async def configure_ssh_proxy(self, application, task=None): + """Configure the proxy charm to use the lxd container. + + Configure the charm to use a LXD container as it's VNF. + """ + debug("Configuring ssh proxy for {}".format(application)) + + mgmtaddr = self.get_container_ip( + self.state[application]['container'], + ) + + debug( + "Setting ssh-hostname for {} to {}".format( + application, + mgmtaddr, + ) + ) + + await self.n2vc.ExecutePrimitive( + self.ns_name, + application, + "config", + None, + params={ + 'ssh-hostname': mgmtaddr, + 'ssh-username': 'ubuntu', + } + ) + + return True + + @classmethod + async def execute_initial_config_primitives(self, application, task=None): + debug("Executing initial_config_primitives for {}".format(application)) + try: + init_config = self.charms[application] + + """ + The initial-config-primitive is run during deploy but may fail + on some steps because proxy charm access isn't configured. + + Re-run those actions so we can inspect the status. + """ + uuids = await self.n2vc.ExecuteInitialPrimitives( + self.ns_name, + application, + init_config, + ) + + """ + ExecutePrimitives will return a list of uuids. We need to check the + status of each. The test continues if all Actions succeed, and + fails if any of them fail. + """ + await self.wait_for_uuids(application, uuids) + debug("Primitives for {} finished.".format(application)) + + return True + except Exception as ex: + debug("execute_initial_config_primitives exception: {}".format(ex)) + raise ex + + return False + + @classmethod + async def check_metrics(self, application, task=None): + """Check and run metrics, if present. + + Checks to see if metrics are specified by the charm. If so, collects + the metrics. + + If no metrics, then mark the test as finished. + """ + if has_metrics(self.charms[application]['name']): + debug("Collecting metrics for {}".format(application)) + + metrics = await self.n2vc.GetMetrics( + self.ns_name, + application, + ) + + return await self.verify_metrics(application, metrics) + + @classmethod + async def verify_metrics(self, application, metrics): + """Verify the charm's metrics. + + Verify that the charm has sent metrics successfully. + + Stops the test when finished. + """ + debug("Verifying metrics for {}: {}".format(application, metrics)) + + if len(metrics): + return True + + else: + # TODO: Ran into a case where it took 9 attempts before metrics + # were available; the controller is slow sometimes. + await asyncio.sleep(30) + return await self.check_metrics(application) + + @classmethod + async def wait_for_uuids(self, application, uuids): + """Wait for primitives to execute. + + The task will provide a list of uuids representing primitives that are + queued to run. + """ + debug("Waiting for uuids for {}: {}".format(application, uuids)) + waitfor = len(uuids) + finished = 0 + + while waitfor > finished: + for uid in uuids: + await asyncio.sleep(10) + + if uuid not in self.state[application]['actions']: + self.state[application]['actions'][uid] = "pending" + + status = self.state[application]['actions'][uid] + + # Have we already marked this as done? + if status in ["pending", "running"]: + + debug("Getting status of {} ({})...".format(uid, status)) + status = await self.n2vc.GetPrimitiveStatus( + self.ns_name, + uid, + ) + debug("...state of {} is {}".format(uid, status)) + self.state[application]['actions'][uid] = status + + if status in ['completed', 'failed']: + finished += 1 + + debug("{}/{} actions complete".format(finished, waitfor)) + + # Wait for the primitive to finish and try again + if waitfor > finished: + debug("Waiting 10s for action to finish...") + await asyncio.sleep(10) + + @classmethod + def n2vc_callback(self, *args, **kwargs): + (model, application, status, message) = args + # debug("callback: {}".format(args)) + + if application not in self.state: + # Initialize the state of the application + self.state[application] = { + 'status': None, # Juju status + 'container': None, # lxd container, for proxy charms + 'actions': {}, # Actions we've executed + 'done': False, # Are we done testing this charm? + 'phase': "deploy", # What phase is this application in? + } + + self.state[application]['status'] = status + + if status in ['waiting', 'maintenance', 'unknown']: + # Nothing to do for these + return + + debug("callback: {}".format(args)) + + if self.state[application]['done']: + debug("{} is done".format(application)) + return + + if status in ['error']: + # To test broken charms, if a charm enters an error state we should + # end the test + debug("{} is in an error state, stop the test.".format(application)) + # asyncio.ensure_future(self.stop()) + self.state[application]['done'] = True + assert False + + if status in ["blocked"] and self.isproxy(application): + if self.state[application]['phase'] == "deploy": + debug("Configuring proxy charm for {}".format(application)) + asyncio.ensure_future(self.configure_proxy_charm(*args)) + + elif status in ["active"]: + """When a charm is active, we can assume that it has been properly + configured (not blocked), regardless of if it's a proxy or not. -class AsyncMock(mock.MagicMock): - async def __call__(self, *args, **kwargs): - return super().__call__(*args, **kwargs) + All primitives should be complete by init_config_primitive + """ + asyncio.ensure_future(self.execute_charm_tests(*args))