#!/usr/bin/env python3
import asyncio
-import functools
-
+import datetime
import logging
import n2vc.vnf
import pylxd
import pytest
import os
import shlex
-import shutil
import subprocess
-import tempfile
import time
import uuid
import yaml
await self._controller.disconnect()
+def debug(msg):
+ """Format debug messages in a consistent way."""
+ now = datetime.datetime.now()
+
+ # TODO: Decide on the best way to log. Output from `logging.debug` shows up
+ # when a test fails, but print() will always show up when running tox with
+ # `-s`, which is really useful for debugging single tests without having to
+ # insert a False assert to see the log.
+ logging.debug(
+ "[{}] {}".format(now.strftime('%Y-%m-%dT%H:%M:%S'), msg)
+ )
+ # print(
+ # "[{}] {}".format(now.strftime('%Y-%m-%dT%H:%M:%S'), msg)
+ # )
+
+
def get_charm_path():
return "{}/charms".format(here)
return "{}/charms/layers".format(here)
-def parse_metrics(application, results):
- """Parse the returned metrics into a dict."""
-
- # We'll receive the results for all units, to look for the one we want
- # Caveat: we're grabbing results from the first unit of the application,
- # which is enough for testing, since we're only deploying a single unit.
- retval = {}
- for unit in results:
- if unit.startswith(application):
- for result in results[unit]:
- retval[result['key']] = result['value']
- return retval
-
-
def collect_metrics(application):
"""Invoke Juju's metrics collector.
log = logging.getLogger()
log.level = logging.DEBUG
- # Running under tox/pytest makes getting env variables harder.
-
# Extract parameters from the environment in order to run our test
vca_host = os.getenv('VCA_HOST', '127.0.0.1')
vca_port = os.getenv('VCA_PORT', 17070)
private_key_path, public_key_path = find_juju_ssh_keys()
- # create profile w/cloud-init and juju ssh key
- if not public_key:
- public_key = ""
- with open(public_key_path, "r") as f:
- public_key = f.readline()
-
- client.profiles.create(
- test_machine,
- config={
- 'user.user-data': '#cloud-config\nssh_authorized_keys:\n- {}'.format(public_key)},
- devices={
- 'root': {'path': '/', 'pool': 'default', 'type': 'disk'},
- 'eth0': {
- 'nictype': 'bridged',
- 'parent': 'lxdbr0',
- 'type': 'nic'
+ try:
+ # create profile w/cloud-init and juju ssh key
+ if not public_key:
+ public_key = ""
+ with open(public_key_path, "r") as f:
+ public_key = f.readline()
+
+ client.profiles.create(
+ test_machine,
+ config={
+ 'user.user-data': '#cloud-config\nssh_authorized_keys:\n- {}'.format(public_key)},
+ devices={
+ 'root': {'path': '/', 'pool': 'default', 'type': 'disk'},
+ 'eth0': {
+ 'nictype': 'bridged',
+ 'parent': 'lxdbr0',
+ 'type': 'nic'
+ }
}
- }
- )
+ )
+ except Exception as ex:
+ debug("Error creating lxd profile {}: {}".format(test_machine, ex))
+ raise ex
- # create lxc machine
- config = {
- 'name': test_machine,
- 'source': {
- 'type': 'image',
- 'alias': 'xenial',
- 'mode': 'pull',
- 'protocol': 'simplestreams',
- 'server': 'https://cloud-images.ubuntu.com/releases',
- },
- 'profiles': [test_machine],
- }
- container = client.containers.create(config, wait=True)
- container.start(wait=True)
+ try:
+ # create lxc machine
+ config = {
+ 'name': test_machine,
+ 'source': {
+ 'type': 'image',
+ 'alias': 'xenial',
+ 'mode': 'pull',
+ 'protocol': 'simplestreams',
+ 'server': 'https://cloud-images.ubuntu.com/releases',
+ },
+ 'profiles': [test_machine],
+ }
+ container = client.containers.create(config, wait=True)
+ container.start(wait=True)
+ except Exception as ex:
+ debug("Error creating lxd container {}: {}".format(test_machine, ex))
+ # This is a test-ending failure.
+ raise ex
def wait_for_network(container, timeout=30):
"""Wait for eth0 to have an ipv4 address."""
return addresses[0]
return None
- wait_for_network(container)
+ try:
+ wait_for_network(container)
+ except Exception as ex:
+ debug(
+ "Error waiting for container {} network: {}".format(
+ test_machine,
+ ex,
+ )
+ )
# HACK: We need to give sshd a chance to bind to the interface,
# and pylxd's container.execute seems to be broken and fails and/or
# hangs trying to properly check if the service is up.
- time.sleep(5)
- client = None
+ (exit_code, stdout, stderr) = container.execute([
+ 'ping',
+ '-c', '5', # Wait for 5 ECHO_REPLY
+ '8.8.8.8', # Ping Google's public DNS
+ '-W', '15', # Set a 15 second deadline
+ ])
+ if exit_code > 0:
+ # The network failed
+ raise Exception("Unable to verify container network")
return container
def destroy_lxd_container(container):
- """Stop and delete a LXD container."""
+ """Stop and delete a LXD container.
+
+ Sometimes we see errors talking to LXD -- ephemeral issues like
+ load or a bug that's killed the API. We'll do our best to clean
+ up here, and we should run a cleanup after all tests are finished
+ to remove any extra containers and profiles belonging to us.
+ """
+
+ if type(container) is bool:
+ return
+
name = container.name
+ debug("Destroying container {}".format(name))
+
client = get_lxd_client()
def wait_for_stop(timeout=30):
if client.containers.exists(name) is False:
return
- container.stop(wait=False)
- wait_for_stop()
+ try:
+ container.stop(wait=False)
+ wait_for_stop()
+ except Exception as ex:
+ debug(
+ "Error stopping container {}: {}".format(
+ name,
+ ex,
+ )
+ )
- container.delete(wait=False)
- wait_for_delete()
+ try:
+ container.delete(wait=False)
+ wait_for_delete()
+ except Exception as ex:
+ debug(
+ "Error deleting container {}: {}".format(
+ name,
+ ex,
+ )
+ )
- # Delete the profile created for this container
- profile = client.profiles.get(name)
- if profile:
- profile.delete()
+ try:
+ # Delete the profile created for this container
+ profile = client.profiles.get(name)
+ if profile:
+ profile.delete()
+ except Exception as ex:
+ debug(
+ "Error deleting profile {}: {}".format(
+ name,
+ ex,
+ )
+ )
def find_lxd_config():
return client
+
# TODO: This is marked serial but can be run in parallel with work, including:
# - Fixing an event loop issue; seems that all tests stop when one test stops?
We need to have a flag (instance variable) that controls this behavior. It may be necessary to skip validation and run against a descriptor implementing features that have not yet been released in the Information Model.
"""
+ """
+ The six phases of integration testing, for the test itself and for each charm:
+
+ setup/teardown_class:
+ 1. Prepare - Verify the environment and create a new model
+ 2. Deploy - Mark the test as ready to execute
+ 3. Configure - Configuration to reach Active state
+ 4. Test - Execute primitive(s) to verify success
+ 5. Collect - Collect any useful artifacts for debugging (charm, logs)
+ 6. Destroy - Destroy the model
+
+ For each charm:
+ 1. Prepare - Building of charm
+ 2. Deploy - Deploying charm
+ 3. Configure - Configuration to reach Active state
+ 4. Test - Execute primitive(s) to verify success
+ 5. Collect - Collect any useful artifacts for debugging (charm, logs)
+ 6. Destroy - Destroy the charm
+
+ """
@classmethod
def setup_class(self):
""" setup any state specific to the execution of the given class (which
usually contains tests).
"""
# Initialize instance variable(s)
- # self.container = None
+ self.n2vc = None
# Track internal state for each test run
self.state = {}
self.ns_name = self.nsd['name']
self.vnf_name = self.vnfd['name']
+ # Hard-coded to default for now, but this may change in the future.
+ self.model = "default"
+
self.charms = {}
self.parse_vnf_descriptor()
assert self.charms is not {}
# to pytest. Test(s) should wait for this flag to change to False
# before returning.
self._running = True
+ self._stopping = False
@classmethod
def teardown_class(self):
""" teardown any state that was previously setup with a call to
setup_class.
"""
- for application in self.state:
- logging.warn(
- "Destroying container for application {}".format(application)
- )
- if self.state[application]['container']:
- destroy_lxd_container(self.state[application]['container'])
-
- # Clean up any artifacts created during the test
- logging.debug("Artifacts: {}".format(self.artifacts))
- for charm in self.artifacts:
- artifact = self.artifacts[charm]
- if os.path.exists(artifact['tmpdir']):
- logging.debug("Removing directory '{}'".format(artifact))
- shutil.rmtree(artifact['tmpdir'])
- #
- # Logout of N2VC
- if self.n2vc:
- asyncio.ensure_future(self.n2vc.logout())
- logging.debug("Tearing down")
- pass
+ debug("Running teardown_class...")
+ try:
+ debug("Destroying LXD containers...")
+ for application in self.state:
+ if self.state[application]['container']:
+ destroy_lxd_container(self.state[application]['container'])
+ debug("Destroying LXD containers...done.")
+
+ # Logout of N2VC
+ if self.n2vc:
+ debug("Logging out of N2VC...")
+ yield from self.n2vc.logout()
+ debug("Logging out of N2VC...done.")
+ debug("Running teardown_class...done.")
+ except Exception as ex:
+ debug("Exception in teardown_class: {}".format(ex))
@classmethod
def all_charms_active(self):
"""Determine if the all deployed charms are active."""
active = 0
- for application in self.charms:
- if self.charms[application]['status'] == 'active':
- active += 1
+
+ for application in self.state:
+ if 'status' in self.state[application]:
+ debug("status of {} is '{}'".format(
+ application,
+ self.state[application]['status'],
+ ))
+ if self.state[application]['status'] == 'active':
+ active += 1
+
+ debug("Active charms: {}/{}".format(
+ active,
+ len(self.charms),
+ ))
if active == len(self.charms):
- logging.warn("All charms active!")
return True
return False
@classmethod
- def running(self, timeout=600):
+ def are_tests_finished(self):
+ appcount = len(self.state)
+
+ # If we don't have state yet, keep running.
+ if appcount == 0:
+ debug("No applications")
+ return False
+
+ if self._stopping:
+ debug("_stopping is True")
+ return True
+
+ appdone = 0
+ for application in self.state:
+ if self.state[application]['done']:
+ appdone += 1
+
+ debug("{}/{} charms tested".format(appdone, appcount))
+
+ if appcount == appdone:
+ return True
+
+ return False
+
+ @classmethod
+ async def running(self, timeout=600):
"""Returns if the test is still running.
@param timeout The time, in seconds, to wait for the test to complete.
"""
+ if self.are_tests_finished():
+ await self.stop()
+ return False
+
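+ # Sleep here so the asyncio tasks driving the test (scheduled from
+ # n2vc_callback) get time to run before we report our status.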
+ await asyncio.sleep(30)
- # if start + now > start > timeout:
- # self.stop_test()
return self._running
@classmethod
if charm not in self.artifacts:
try:
- # Note: This builds the charm under N2VC/tests/charms/
- # The snap-installed command only has write access to the users $HOME
- # so writing to /tmp isn't possible at the moment.
- builds = tempfile.mkdtemp(dir=get_charm_path())
-
- cmd = "charm build {}/{} -o {}/".format(
- get_layer_path(),
- charm,
- builds,
- )
- logging.debug(cmd)
-
- subprocess.check_call(shlex.split(cmd))
+ # Note: This builds the charm under N2VC/tests/charms/builds/
+ # Currently, the snap-installed command only has write access
+ # to the $HOME (changing in an upcoming release) so writing to
+ # /tmp isn't possible at the moment.
+ builds = get_charm_path()
+
+ if not os.path.exists("{}/builds/{}".format(builds, charm)):
+ cmd = "charm build {}/{} -o {}/".format(
+ get_layer_path(),
+ charm,
+ builds,
+ )
+ subprocess.check_call(shlex.split(cmd))
self.artifacts[charm] = {
'tmpdir': builds,
either a vdu or vnf.
"""
- self.n2vc = get_n2vc(loop=loop)
+ if not self.n2vc:
+ self.n2vc = get_n2vc(loop=loop)
vnf_name = self.n2vc.FormatApplicationName(
self.ns_name,
self.vnf_name,
str(vnf_index),
)
- logging.debug("Deploying charm at {}".format(self.artifacts[charm]))
+ debug("Deploying charm at {}".format(self.artifacts[charm]))
await self.n2vc.DeployCharms(
self.ns_name,
self.get_charm(charm),
params,
{},
- self.n2vc_callback
+ self.n2vc_callback,
)
@classmethod
assert 'proxy' in self.charms[application_name]
assert type(self.charms[application_name]['proxy']) is bool
- # logging.debug(self.charms[application_name])
+ # debug(self.charms[application_name])
return self.charms[application_name]['proxy']
@classmethod
return charms.keys()
+ @classmethod
+ def get_phase(self, application):
+ return self.state[application]['phase']
+
+ @classmethod
+ def set_phase(self, application, phase):
+ self.state[application]['phase'] = phase
+
+ @classmethod
+ async def configure_proxy_charm(self, *args):
+ (model, application, _, _) = args
+
+ try:
+ if self.get_phase(application) == "deploy":
+ self.set_phase(application, "configure")
+
+ debug("Start CreateContainer for {}".format(application))
+ self.state[application]['container'] = \
+ await self.CreateContainer(*args)
+ debug("Done CreateContainer for {}".format(application))
+
+ if self.state[application]['container']:
+ debug("Configure {} for container".format(application))
+ if await self.configure_ssh_proxy(application):
+ await asyncio.sleep(0.1)
+ return True
+ else:
+ debug("Failed to configure container for {}".format(application))
+ else:
+ debug("skipping CreateContainer for {}: {}".format(
+ application,
+ self.get_phase(application),
+ ))
+
+ except Exception as ex:
+ debug("configure_proxy_charm exception: {}".format(ex))
+ finally:
+ await asyncio.sleep(0.1)
+
+ return False
+
+ @classmethod
+ async def execute_charm_tests(self, *args):
+ (model, application, _, _) = args
+
+ debug("Executing charm test(s) for {}".format(application))
+
+ if self.state[application]['done']:
+ debug("Trying to execute tests against finished charm...aborting")
+ return False
+
+ try:
+ phase = self.get_phase(application)
+ # We enter the test phase after deploy (for native charms) or after
+ # configure (for proxy charms).
+ if phase in ["deploy", "configure"]:
+ self.set_phase(application, "test")
+ if self.are_tests_finished():
+ raise Exception("Trying to execute init-config on finished test")
+
+ if await self.execute_initial_config_primitives(application):
+ # check for metrics
+ await self.check_metrics(application)
+
+ debug("Done testing {}".format(application))
+ self.state[application]['done'] = True
+
+ except Exception as ex:
+ debug("Exception in execute_charm_tests: {}".format(ex))
+ finally:
+ await asyncio.sleep(0.1)
+
+ return True
+
@classmethod
async def CreateContainer(self, *args):
"""Create a LXD container for use with a proxy charm.abs
1. Get the public key from the charm via `get-ssh-public-key` action
2. Create container with said key injected for the ubuntu user
+
+ Returns a Container object
"""
# Create and configure a LXD container for use with a proxy charm.
(model, application, _, _) = args
- # self.state[application_name]
- print("trying to create container")
- if self.state[application]['container'] is None:
- logging.debug(
- "Creating container for application {}".format(application)
- )
- # HACK: Set this so the n2vc_callback knows
- # there's a container being created
- self.state[application]['container'] = True
+ debug("[CreateContainer] {}".format(args))
+ container = None
+ try:
# Execute 'get-ssh-public-key' primitive and get returned value
uuid = await self.n2vc.ExecutePrimitive(
model,
"get-ssh-public-key",
None,
)
+
result = await self.n2vc.GetPrimitiveOutput(model, uuid)
pubkey = result['pubkey']
- self.state[application]['container'] = create_lxd_container(
+ container = create_lxd_container(
public_key=pubkey,
name=os.path.basename(__file__)
)
- return self.state[application]['container']
+ return container
+ except Exception as ex:
+ debug("Error creating container: {}".format(ex))
+ pass
+
+ return None
@classmethod
- async def stop():
+ async def stop(self):
"""Stop the test.
- Remove charms
- Stop and delete containers
- Logout of N2VC
+
+ TODO: Clean up duplicate code between teardown_class() and stop()
"""
- logging.warning("Stop the test.")
- assert True
- for application in self.charms:
+ debug("stop() called")
+
+ if self.n2vc and self._running and not self._stopping:
+ self._running = False
+ self._stopping = True
+
+ for application in self.charms:
+ try:
+ await self.n2vc.RemoveCharms(self.model, application)
+ if self.state[application]['container']:
+ debug("Deleting LXD container...")
+ destroy_lxd_container(
+ self.state[application]['container']
+ )
+ self.state[application]['container'] = None
+ debug("Deleting LXD container...done.")
+ else:
+ debug("No container found for {}".format(application))
+ except Exception as e:
+ debug("Error while deleting container: {}".format(e))
+
+ # Logout of N2VC
try:
- logging.warn("Removing charm")
- await self.n2vc.RemoveCharms(model, application)
-
- logging.warn(
- "Destroying container for application {}".format(application)
- )
- if self.state[application]['container']:
- destroy_lxd_container(self.state[application]['container'])
- except Exception as e:
- logging.warn("Error while deleting container: {}".format(e))
-
- # Clean up any artifacts created during the test
- logging.debug("Artifacts: {}".format(self.artifacts))
- for charm in self.artifacts:
- artifact = self.artifacts[charm]
- if os.path.exists(artifact['tmpdir']):
- logging.debug("Removing directory '{}'".format(artifact))
- shutil.rmtree(artifact['tmpdir'])
-
- # Logout of N2VC
- await self.n2vc.logout()
- self.n2vc = None
-
- self._running = False
+ debug("Logging out of N2VC...")
+ await self.n2vc.logout()
+ self.n2vc = None
+ debug("Logging out of N2VC...Done.")
+ except Exception as ex:
+ debug(ex)
+
+ # Let the test know we're finished.
+ debug("Marking test as finished.")
+ # self._running = False
+ else:
+ debug("Skipping stop()")
@classmethod
def get_container_ip(self, container):
return ipaddr
@classmethod
- def n2vc_callback(self, *args, **kwargs):
- """Monitor and react to changes in the charm state.
-
- This is where we will monitor the state of the charm:
- - is it active?
- - is it in error?
- - is it waiting on input to continue?
-
- When the state changes, we respond appropriately:
- - configuring ssh credentials for a proxy charm
- - running a service primitive
-
- Lastly, when the test has finished we begin the teardown, removing the
- charm and associated LXD container, and notify pytest that this test
- is over.
-
- Args are expected to contain four values, received from N2VC:
- - str, the name of the model
- - str, the name of the application
- - str, the workload status as reported by Juju
- - str, the workload message as reported by Juju
- """
- (model, application, status, message) = args
- # logging.warn("Callback for {}/{} - {} ({})".format(
- # model,
- # application,
- # status,
- # message
- # ))
+ async def configure_ssh_proxy(self, application, task=None):
+ """Configure the proxy charm to use the lxd container.
- if application not in self.state:
- # Initialize the state of the application
- self.state[application] = {
- 'status': None,
- 'container': None,
- }
-
- # Make sure we're only receiving valid status. This will catch charms
- # that aren't setting their workload state and appear as "unknown"
- # assert status not in ["active", "blocked", "waiting", "maintenance"]
-
- task = None
- if kwargs and 'task' in kwargs:
- task = kwargs['task']
- # logging.debug("Got task: {}".format(task))
-
- # if application in self.charms:
- self.state[application]['status'] = status
-
- # Closures and inner functions, oh my.
- def is_active():
- """Is the charm in an active state?"""
- if status in ["active"]:
- return True
- return False
-
- def is_blocked():
- """Is the charm waiting for us?"""
- if status in ["blocked"]:
- return True
- return False
-
- def configure_ssh_proxy(task):
- """Configure the proxy charm to use the lxd container."""
- logging.debug("configure_ssh_proxy({})".format(task))
-
- mgmtaddr = self.get_container_ip(
- self.state[application]['container'],
- )
-
- logging.debug(
- "Setting config ssh-hostname={}".format(mgmtaddr)
- )
+ Configure the charm to use a LXD container as its VNF.
+ """
+ debug("Configuring ssh proxy for {}".format(application))
- # task = asyncio.ensure_future(
- # stop_test,
- # )
- # return
+ mgmtaddr = self.get_container_ip(
+ self.state[application]['container'],
+ )
- task = asyncio.ensure_future(
- self.n2vc.ExecutePrimitive(
- model,
- application,
- "config",
- None,
- params={
- 'ssh-hostname': mgmtaddr,
- 'ssh-username': 'ubuntu',
- }
- )
+ debug(
+ "Setting ssh-hostname for {} to {}".format(
+ application,
+ mgmtaddr,
)
+ )
- # Execute the VNFD's 'initial-config-primitive'
- task.add_done_callback(functools.partial(
- execute_initial_config_primitives,
- ))
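+ # Point the proxy charm at the container by setting its ssh-hostname
+ # and ssh-username via the "config" primitive.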
+ await self.n2vc.ExecutePrimitive(
+ self.model,
+ application,
+ "config",
+ None,
+ params={
+ 'ssh-hostname': mgmtaddr,
+ 'ssh-username': 'ubuntu',
+ }
+ )
- def execute_initial_config_primitives(task=None):
- logging.debug("execute_initial_config_primitives({})".format(task))
+ return True
+ @classmethod
+ async def execute_initial_config_primitives(self, application, task=None):
+ debug("Executing initial_config_primitives for {}".format(application))
+ try:
init_config = self.charms[application]
"""
The initial-config-primitive is run during deploy but may fail
on some steps because proxy charm access isn't configured.
- At this stage, we'll re-run those actions.
+ Re-run those actions so we can inspect the status.
"""
-
- task = asyncio.ensure_future(
- self.n2vc.ExecuteInitialPrimitives(
- model,
- application,
- init_config,
- )
+ uuids = await self.n2vc.ExecuteInitialPrimitives(
+ self.model,
+ application,
+ init_config,
)
"""
status of each. The test continues if all Actions succeed, and
fails if any of them fail.
"""
- task.add_done_callback(functools.partial(wait_for_uuids))
+ await self.wait_for_uuids(application, uuids)
+ debug("Primitives for {} finished.".format(application))
- def check_metrics():
- task = asyncio.ensure_future(
- self.n2vc.GetMetrics(
- model,
- application,
- )
- )
+ return True
+ except Exception as ex:
+ debug("execute_initial_config_primitives exception: {}".format(ex))
- task.add_done_callback(
- functools.partial(
- verify_metrics,
- )
+ return False
+
+ @classmethod
+ async def check_metrics(self, application, task=None):
+ """Check and run metrics, if present.
+
+ Checks to see if metrics are specified by the charm. If so, collects
+ the metrics.
+
+ If no metrics, then mark the test as finished.
+ """
+ if has_metrics(self.charms[application]['name']):
+ debug("Collecting metrics for {}".format(application))
+
+ metrics = await self.n2vc.GetMetrics(
+ self.model,
+ application,
)
- def verify_metrics(task):
- logging.debug("Verifying metrics!")
- # Check if task returned metrics
- results = task.result()
+ return await self.verify_metrics(application, metrics)
- metrics = parse_metrics(application, results)
- logging.debug(metrics)
+ @classmethod
+ async def verify_metrics(self, application, metrics):
+ """Verify the charm's metrics.
- if len(metrics):
- logging.warn("[metrics] removing charms")
- task = asyncio.ensure_future(
- self.n2vc.RemoveCharms(model, application)
- )
+ Verify that the charm has sent metrics successfully.
- task.add_done_callback(functools.partial(self.stop))
+ Returns True once metrics have been received; otherwise retries collection.
+ """
+ debug("Verifying metrics for {}: {}".format(application, metrics))
- else:
- # TODO: Ran into a case where it took 9 attempts before metrics
- # were available; the controller is slow sometimes.
- time.sleep(60)
- check_metrics()
-
- def wait_for_uuids(task):
- logging.debug("wait_for_uuids({})".format(task))
- uuids = task.result()
-
- waitfor = len(uuids)
- finished = 0
-
- def get_primitive_result(uuid, task):
- logging.debug("Got result from action")
- # completed, failed, or running
- result = task.result()
-
- if status in result and result['status'] \
- in ["completed", "failed"]:
-
- # It's over
- logging.debug("Action {} is {}".format(
- uuid,
- task.result['status'])
- )
- pass
- else:
- logging.debug("action is still running")
-
- def get_primitive_status(uuid, task):
- result = task.result()
-
- if result == "completed":
- # Make sure all primitives are finished
- global finished
- finished += 1
-
- if waitfor == finished:
- if self.all_charms_active():
- logging.debug("Action complete; removing charm")
- task = asyncio.ensure_future(
- self.stop,
- )
- # task = asyncio.ensure_future(
- # self.n2vc.RemoveCharms(model, application)
- # )
- # task.add_done_callback(functools.partial(stop_test))
- else:
- logging.warn("Not all charms in an active state.")
- elif result == "failed":
- logging.debug("Action failed; removing charm")
- task = asyncio.ensure_future(
- self.stop,
- )
- # task = asyncio.ensure_future(
- # self.n2vc.RemoveCharms(model, application)
- # )
- # task.add_done_callback(functools.partial(stop_test))
-
- # assert False
- # self._running = False
- # return
- else:
- # logging.debug("action is still running: {}".format(result))
- # logging.debug(result)
- # pass
- # The primitive is running; try again.
- task = asyncio.ensure_future(
- self.n2vc.GetPrimitiveStatus(model, uuid)
- )
- task.add_done_callback(functools.partial(
- get_primitive_result,
- uuid,
- ))
-
- for actionid in uuids:
- task = asyncio.ensure_future(
- self.n2vc.GetPrimitiveStatus(model, actionid)
- )
- task.add_done_callback(functools.partial(
- get_primitive_result,
- actionid,
- ))
+ if len(metrics):
+ return True
- # def stop_test(task):
- # """Stop the test.
- #
- # When the test has either succeeded or reached a failing state,
- # begin the process of removing the test fixtures.
- # """
- # for application in self.charms:
- # asyncio.ensure_future(
- # self.n2vc.RemoveCharms(model, application)
- # )
- #
- # self._running = False
-
- if is_blocked():
- # Container only applies to proxy charms.
- if self.isproxy(application):
-
- if self.state[application]['container'] is None:
- logging.warn("Creating new container")
- # Create the new LXD container
-
- task = asyncio.ensure_future(self.CreateContainer(*args))
-
- # Configure the proxy charm to use the container when ready
- task.add_done_callback(functools.partial(
- configure_ssh_proxy,
- ))
-
- # task.add_done_callback(functools.partial(
- # stop_test,
- # ))
- # create_lxd_container()
- # self.container = True
- # else:
- # logging.warn("{} already has container".format(application))
- #
- # task = asyncio.ensure_future(
- # self.n2vc.RemoveCharms(model, application)
- # )
- # task.add_done_callback(functools.partial(stop_test))
+ else:
+ # TODO: Ran into a case where it took 9 attempts before metrics
+ # were available; the controller is slow sometimes.
+ await asyncio.sleep(30)
+ return await self.check_metrics(application)
- else:
- # A charm may validly be in a blocked state if it's waiting for
- # relations or some other kind of manual intervention
- # logging.debug("This is not a proxy charm.")
- # TODO: needs testing
- task = asyncio.ensure_future(
- execute_initial_config_primitives()
- )
+ @classmethod
+ async def wait_for_uuids(self, application, uuids):
+ """Wait for primitives to execute.
- # task.add_done_callback(functools.partial(stop_test))
+ uuids is a list of uuids representing primitives that are queued
+ to run.
+ """
+ debug("Waiting for uuids for {}: {}".format(application, uuids))
+ waitfor = len(uuids)
+ finished = 0
- elif is_active():
- # Does the charm have metrics defined?
- if has_metrics(self.charms[application]['name']):
- # logging.debug("metrics.yaml defined in the layer!")
+ while waitfor > finished:
+ for uid in uuids:
+ await asyncio.sleep(10)
- # Force a run of the metric collector, so we don't have
- # to wait for it's normal 5 minute interval run.
- # NOTE: this shouldn't be done outside of CI
- collect_metrics(application)
+ if uid not in self.state[application]['actions']:
+ self.state[application]['actions'][uid] = "pending"
- # get the current metrics
- check_metrics()
- else:
- # When the charm reaches an active state and hasn't been
- # handled (metrics collection, etc)., the test has succeded.
- # logging.debug("Charm is active! Removing charm...")
- if self.all_charms_active():
- logging.warn("All charms active!")
- task = asyncio.ensure_future(
- self.stop(),
+ status = self.state[application]['actions'][uid]
+
+ # Have we already marked this as done?
+ if status in ["pending", "running"]:
+
+ debug("Getting status of {} ({})...".format(uid, status))
+ status = await self.n2vc.GetPrimitiveStatus(
+ self.model,
+ uid,
)
+ debug("...state of {} is {}".format(uid, status))
+ self.state[application]['actions'][uid] = status
- # task = asyncio.ensure_future(
- # self.n2vc.RemoveCharms(model, application)
- # )
- # task.add_done_callback(functools.partial(stop_test))
- else:
- logging.warning("Waiting for all charms to be active.")
- # task = asyncio.ensure_future(
- # self.n2vc.RemoveCharms(model, application)
- # )
- # task.add_done_callback(functools.partial(stop_test))
+ if status in ['completed', 'failed']:
+ finished += 1
+
+ debug("{}/{} actions complete".format(finished, waitfor))
+
+ # Wait for the primitive to finish and try again
+ if waitfor > finished:
+ debug("Waiting 10s for action to finish...")
+ await asyncio.sleep(10)
+
+ @classmethod
+ def n2vc_callback(self, *args, **kwargs):
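+ """Monitor and react to changes in charm state.
+
+ N2VC invokes this with four positional args: the model name, the
+ application name, the workload status, and the workload message.
+ A blocked proxy charm gets its container and ssh configuration; an
+ active charm has its tests executed.
+ """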
+ (model, application, status, message) = args
+ # debug("callback: {}".format(args))
+
+ if application not in self.state:
+ # Initialize the state of the application
+ self.state[application] = {
+ 'status': None, # Juju status
+ 'container': None, # lxd container, for proxy charms
+ 'actions': {}, # Actions we've executed
+ 'done': False, # Are we done testing this charm?
+ 'phase': "deploy", # What phase is this application in?
+ }
+
+ self.state[application]['status'] = status
+
+ if status in ['waiting', 'maintenance', 'unknown']:
+ # Nothing to do for these
+ return
+
+ debug("callback: {}".format(args))
+
+ if self.state[application]['done']:
+ debug("{} is done".format(application))
+ return
+
+ if status in ["blocked"] and self.isproxy(application):
+ if self.state[application]['phase'] == "deploy":
+ debug("Configuring proxy charm for {}".format(application))
+ asyncio.ensure_future(self.configure_proxy_charm(*args))
+
+ elif status in ["active"]:
+ """When a charm is active, we can assume that it has been properly
+ configured (not blocked), regardless of if it's a proxy or not.
+
+ All primitives should be complete by init_config_primitive
+ """
+ asyncio.ensure_future(self.execute_charm_tests(*args))