here = os.path.dirname(os.path.realpath(__file__))
-def is_bootstrapped():
- result = subprocess.run(['juju', 'switch'], stdout=subprocess.PIPE)
- return (
- result.returncode == 0 and len(result.stdout.decode().strip()) > 0)
-
-
-bootstrapped = pytest.mark.skipif(
- not is_bootstrapped(),
- reason='bootstrapped Juju environment required')
-
-
class CleanController():
"""
Context manager that automatically connects and disconnects from
vca_charms = os.getenv('VCA_CHARMS', None)
vca_secret = os.getenv('VCA_SECRET', None)
+ # Get the Juju Public key
+ juju_public_key = get_juju_public_key()
+ if juju_public_key:
+ debug("Reading Juju public key @ {}".format(juju_public_key))
+ with open(juju_public_key, 'r') as f:
+ juju_public_key = f.read()
+ debug("Found public key: {}".format(juju_public_key))
+ else:
+ raise Exception("No Juju Public Key found")
+
+ # Get the ca-cert
+ # os.path.expanduser("~/.config/lxc")
+ # with open("{}/agent.conf".format(AGENT_PATH), "r") as f:
+ # try:
+ # y = yaml.safe_load(f)
+ # self.cacert = y['cacert']
+ # except yaml.YAMLError as exc:
+ # log("Unable to find Juju ca-cert.")
+ # raise exc
+
client = n2vc.vnf.N2VC(
log=log,
server=vca_host,
user=vca_user,
secret=vca_secret,
artifacts=vca_charms,
- loop=loop
+ loop=loop,
+ juju_public_key=juju_public_key,
)
return client
name = name.replace("_", "-").replace(".", "")
client = get_lxd_client()
+ if not client:
+ raise Exception("Unable to connect to LXD")
+
test_machine = "test-{}-{}".format(
uuid.uuid4().hex[-4:],
name,
"""Find the Juju ssh keys."""
paths = []
- paths.append(os.path.expanduser("~/.local/share/juju/ssh/"))
+ paths.append(os.path.expanduser("~/.local/share/juju/ssh"))
for path in paths:
if os.path.exists(path):
return keys[0]
-def get_lxd_client(host="127.0.0.1", port="8443", verify=False):
+def get_juju_public_key():
+ """Find the Juju public key."""
+ paths = []
+
+ if 'VCA_PATH' in os.environ:
+ paths.append("{}/ssh".format(os.environ["VCA_PATH"]))
+
+ paths.append(os.path.expanduser("~/.local/share/juju/ssh"))
+ paths.append("/root/.local/share/juju/ssh")
+
+ for path in paths:
+ if os.path.exists(path):
+ public = os.path.expanduser("{}/juju_id_rsa.pub".format(path))
+ if os.path.exists(public):
+ return public
+ return None
+
+
+def get_lxd_client(host=None, port="8443", verify=False):
""" Get the LXD client."""
+
+ if host is None:
+ if 'LXD_HOST' in os.environ:
+ host = os.environ['LXD_HOST']
+ else:
+ host = '127.0.0.1'
+
+ passwd = None
+ if 'LXD_SECRET' in os.environ:
+ passwd = os.environ['LXD_SECRET']
+
+ # debug("Connecting to LXD remote {} w/authentication ({})".format(
+ # host,
+ # passwd
+ # ))
client = None
(crt, key) = find_lxd_config()
verify=verify,
)
+ # If the LXD server has a pasword set, authenticate with it.
+ if not client.trusted and passwd:
+ try:
+ client.authenticate(passwd)
+ if not client.trusted:
+ raise Exception("Unable to authenticate with LXD remote")
+ except pylxd.exceptions.LXDAPIException as ex:
+ if 'Certificate already in trust store' in ex:
+ pass
+
return client
"""TODO:
1. Validator Validation
- Automatically validate the descriptors we're using here, unless the test author explicitly wants to skip them. Useful to make sure tests aren't being run against invalid descriptors, validating functionality that may fail against a properly written descriptor.
+ Automatically validate the descriptors we're using here, unless the test
+ author explicitly wants to skip them. Useful to make sure tests aren't
+ being run against invalid descriptors, validating functionality that may
+ fail against a properly written descriptor.
- We need to have a flag (instance variable) that controls this behavior. It may be necessary to skip validation and run against a descriptor implementing features that have not yet been released in the Information Model.
+ We need to have a flag (instance variable) that controls this behavior. It
+ may be necessary to skip validation and run against a descriptor
+ implementing features that have not yet been released in the Information
+ Model.
"""
"""
# Build the charm(s) needed for this test
for charm in self.get_charm_names():
+ # debug("Building charm {}".format(charm))
self.get_charm(charm)
# A bit of a hack, in order to allow the N2VC callback to run parallel
Returns: The path to the built charm or None if `charm build` failed.
"""
-
# Make sure the charm snap is installed
+ charm_cmd = None
try:
subprocess.check_call(['which', 'charm'])
+ charm_cmd = "charm build"
except subprocess.CalledProcessError:
+ # charm_cmd = "charm-build"
+ # debug("Using legacy charm-build")
raise Exception("charm snap not installed.")
if charm not in self.artifacts:
# Currently, the snap-installed command only has write access
# to the $HOME (changing in an upcoming release) so writing to
# /tmp isn't possible at the moment.
- builds = get_charm_path()
+ builds = get_charm_path()
if not os.path.exists("{}/builds/{}".format(builds, charm)):
- cmd = "charm build --no-local-layers {}/{} -o {}/".format(
+ cmd = "{} --no-local-layers {}/{} -o {}/".format(
+ charm_cmd,
get_layer_path(),
charm,
builds,
)
- subprocess.check_call(shlex.split(cmd))
+ # debug(cmd)
+
+ env = os.environ.copy()
+ env["CHARM_BUILD_DIR"] = builds
+
+ subprocess.check_call(shlex.split(cmd), env=env)
except subprocess.CalledProcessError as e:
# charm build will return error code 100 if the charm fails
)
machine_spec = {
- 'host': hostname,
- 'user': 'ubuntu',
+ 'hostname': hostname,
+ 'username': 'ubuntu',
}
await self.n2vc.DeployCharms(
return True
except Exception as ex:
debug("execute_initial_config_primitives exception: {}".format(ex))
-
+ raise ex
+
return False
@classmethod