# limitations under the License.
import asyncio
+import base64
+import binascii
import logging
-import os
import os.path
import re
import shlex
import ssl
import subprocess
-import sys
-# import time
-from n2vc.provisioner import SSHProvisioner
-
-# FIXME: this should load the juju inside or modules without having to
-# explicitly install it. Check why it's not working.
-# Load our subtree of the juju library
-path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
-path = os.path.join(path, "modules/libjuju/")
-if path not in sys.path:
- sys.path.insert(1, path)
from juju.client import client
from juju.controller import Controller
-from juju.model import ModelObserver
from juju.errors import JujuAPIError, JujuError
+from juju.model import ModelObserver
+import n2vc.exceptions
+from n2vc.provisioner import SSHProvisioner
+
+# import time
+# FIXME: this should load the juju inside or modules without having to
+# explicitly install it. Check why it's not working.
+# Load our subtree of the juju library
+# path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
+# path = os.path.join(path, "modules/libjuju/")
+# if path not in sys.path:
+# sys.path.insert(1, path)
# We might need this to connect to the websocket securely, but test and verify.
try:
ssl._create_default_https_context = ssl._create_unverified_context
# Quiet the debug logging
-logging.getLogger('websockets.protocol').setLevel(logging.INFO)
-logging.getLogger('juju.client.connection').setLevel(logging.WARN)
-logging.getLogger('juju.model').setLevel(logging.WARN)
-logging.getLogger('juju.machine').setLevel(logging.WARN)
+logging.getLogger("websockets.protocol").setLevel(logging.INFO)
+logging.getLogger("juju.client.connection").setLevel(logging.WARN)
+logging.getLogger("juju.model").setLevel(logging.WARN)
+logging.getLogger("juju.machine").setLevel(logging.WARN)
class VCAMonitor(ModelObserver):
"""Monitor state changes within the Juju Model."""
+
log = None
def __init__(self, ns_name):
def AddApplication(self, application_name, callback, *callback_args):
if application_name not in self.applications:
self.applications[application_name] = {
- 'callback': callback,
- 'callback_args': callback_args
+ "callback": callback,
+ "callback_args": callback_args,
}
def RemoveApplication(self, application_name):
if delta.entity == "unit":
# Ignore change events from other applications
- if delta.data['application'] not in self.applications.keys():
+ if delta.data["application"] not in self.applications.keys():
return
try:
- application_name = delta.data['application']
+ application_name = delta.data["application"]
- callback = self.applications[application_name]['callback']
- callback_args = \
- self.applications[application_name]['callback_args']
+ callback = self.applications[application_name]["callback"]
+ callback_args = self.applications[application_name]["callback_args"]
if old and new:
# Fire off a callback with the application state
if callback:
callback(
self.ns_name,
- delta.data['application'],
+ delta.data["application"],
new.workload_status,
new.workload_status_message,
- *callback_args)
+ *callback_args,
+ )
if old and not new:
# This is a charm being removed
if callback:
callback(
self.ns_name,
- delta.data['application'],
+ delta.data["application"],
"removed",
"",
- *callback_args)
+ *callback_args,
+ )
except Exception as e:
self.log.debug("[1] notify_callback exception: {}".format(e))
pass
+
########
# TODO
#
class N2VC:
- def __init__(self,
- log=None,
- server='127.0.0.1',
- port=17070,
- user='admin',
- secret=None,
- artifacts=None,
- loop=None,
- juju_public_key=None,
- ca_cert=None,
- api_proxy=None
- ):
+ def __init__(
+ self,
+ log=None,
+ server="127.0.0.1",
+ port=17070,
+ user="admin",
+ secret=None,
+ artifacts=None,
+ loop=None,
+ juju_public_key=None,
+ ca_cert=None,
+ api_proxy=None,
+ ):
"""Initialize N2VC
Initializes the N2VC object, allowing the caller to interoperate with the VCA.
self.authenticated = False
self.api_proxy = api_proxy
+ if log:
+ self.log = log
+ else:
+ self.log = logging.getLogger(__name__)
+
# For debugging
self.refcount = {
- 'controller': 0,
- 'model': 0,
+ "controller": 0,
+ "model": 0,
}
self.models = {}
self.port = 17070
self.username = ""
self.secret = ""
-
+
self.juju_public_key = juju_public_key
if juju_public_key:
self._create_juju_public_key(juju_public_key)
+ else:
+ self.juju_public_key = ""
# TODO: Verify ca_cert is valid before using. VCA will crash
# if the ca_cert isn't formatted correctly.
- # self.ca_cert = ca_cert
- self.ca_cert = None
+ def base64_to_cacert(b64string):
+ """Convert the base64-encoded string containing the VCA CACERT.
- if log:
- self.log = log
- else:
- self.log = logging.getLogger(__name__)
+ The input string....
+
+ """
+ try:
+ cacert = base64.b64decode(b64string).decode("utf-8")
+
+ cacert = re.sub(r"\\n", r"\n", cacert,)
+ except binascii.Error as e:
+ self.log.debug("Caught binascii.Error: {}".format(e))
+ raise n2vc.exceptions.N2VCInvalidCertificate("Invalid CA Certificate")
+
+ return cacert
+
+ self.ca_cert = None
+ if ca_cert:
+ self.ca_cert = base64_to_cacert(ca_cert)
# Quiet websocket traffic
- logging.getLogger('websockets.protocol').setLevel(logging.INFO)
- logging.getLogger('juju.client.connection').setLevel(logging.WARN)
- logging.getLogger('model').setLevel(logging.WARN)
+ logging.getLogger("websockets.protocol").setLevel(logging.INFO)
+ logging.getLogger("juju.client.connection").setLevel(logging.WARN)
+ logging.getLogger("model").setLevel(logging.WARN)
# logging.getLogger('websockets.protocol').setLevel(logging.DEBUG)
- self.log.debug('JujuApi: instantiated')
+ self.log.debug("JujuApi: instantiated")
self.server = server
self.port = port
self.secret = secret
- if user.startswith('user-'):
+ if user.startswith("user-"):
self.user = user
else:
- self.user = 'user-{}'.format(user)
+ self.user = "user-{}".format(user)
- self.endpoint = '%s:%d' % (server, int(port))
+ self.endpoint = "%s:%d" % (server, int(port))
self.artifacts = artifacts
"""
# Make sure that we have a public key before writing to disk
if public_key is None or len(public_key) == 0:
- if 'OSM_VCA_PUBKEY' in os.environ:
- public_key = os.getenv('OSM_VCA_PUBKEY', '')
+ if "OSM_VCA_PUBKEY" in os.environ:
+ public_key = os.getenv("OSM_VCA_PUBKEY", "")
if len(public_key == 0):
return
else:
return
- path = "{}/.local/share/juju/ssh".format(
- os.path.expanduser('~'),
- )
+ path = "{}/.local/share/juju/ssh".format(os.path.expanduser("~"),)
if not os.path.exists(path):
os.makedirs(path)
- with open('{}/juju_id_rsa.pub'.format(path), 'w') as f:
+ with open("{}/juju_id_rsa.pub".format(path), "w") as f:
f.write(public_key)
- def notify_callback(self, model_name, application_name, status, message,
- callback=None, *callback_args):
+ def notify_callback(
+ self,
+ model_name,
+ application_name,
+ status,
+ message,
+ callback=None,
+ *callback_args
+ ):
try:
if callback:
callback(
- model_name,
- application_name,
- status, message,
- *callback_args,
+ model_name, application_name, status, message, *callback_args,
)
except Exception as e:
self.log.error("[0] notify_callback exception {}".format(e))
async def Relate(self, model_name, vnfd):
"""Create a relation between the charm-enabled VDUs in a VNF.
- The Relation mapping has two parts: the id of the vdu owning the endpoint, and the name of the endpoint.
+ The Relation mapping has two parts: the id of the vdu owning the endpoint, and
+ the name of the endpoint.
vdu:
...
- provides: dataVM:db
requires: mgmtVM:app
- This tells N2VC that the charm referred to by the dataVM vdu offers a relation named 'db', and the mgmtVM vdu has an 'app' endpoint that should be connected to a database.
+ This tells N2VC that the charm referred to by the dataVM vdu offers a relation
+ named 'db', and the mgmtVM vdu
+ has an 'app' endpoint that should be connected to a database.
:param str ns_name: The name of the network service.
:param dict vnfd: The parsed yaml VNF descriptor.
configs = []
vnf_config = vnfd.get("vnf-configuration")
if vnf_config:
- juju = vnf_config['juju']
+ juju = vnf_config["juju"]
if juju:
configs.append(vnf_config)
- for vdu in vnfd['vdu']:
- vdu_config = vdu.get('vdu-configuration')
+ for vdu in vnfd["vdu"]:
+ vdu_config = vdu.get("vdu-configuration")
if vdu_config:
- juju = vdu_config['juju']
+ juju = vdu_config["juju"]
if juju:
configs.append(vdu_config)
def _get_application_name(name):
"""Get the application name that's mapped to a vnf/vdu."""
vnf_member_index = 0
- vnf_name = vnfd['name']
+ vnf_name = vnfd["name"]
- for vdu in vnfd.get('vdu'):
+ for vdu in vnfd.get("vdu"):
# Compare the named portion of the relation to the vdu's id
- if vdu['id'] == name:
+ if vdu["id"] == name:
application_name = self.FormatApplicationName(
- model_name,
- vnf_name,
- str(vnf_member_index),
+ model_name, vnf_name, str(vnf_member_index),
)
return application_name
else:
# Loop through relations
for cfg in configs:
- if 'juju' in cfg:
- juju = cfg['juju']
- if 'vca-relationships' in juju and 'relation' in juju['vca-relationships']:
- for rel in juju['vca-relationships']['relation']:
+ if "juju" in cfg:
+ juju = cfg["juju"]
+ if (
+ "vca-relationships" in juju
+ and "relation" in juju["vca-relationships"]
+ ):
+ for rel in juju["vca-relationships"]["relation"]:
try:
# get the application name for the provides
- (name, endpoint) = rel['provides'].split(':')
+ (name, endpoint) = rel["provides"].split(":")
application_name = _get_application_name(name)
- provides = "{}:{}".format(
- application_name,
- endpoint
- )
+ provides = "{}:{}".format(application_name, endpoint)
# get the application name for thr requires
- (name, endpoint) = rel['requires'].split(':')
+ (name, endpoint) = rel["requires"].split(":")
application_name = _get_application_name(name)
- requires = "{}:{}".format(
- application_name,
- endpoint
+ requires = "{}:{}".format(application_name, endpoint)
+ self.log.debug(
+ "Relation: {} <-> {}".format(provides, requires)
)
- self.log.debug("Relation: {} <-> {}".format(
- provides,
- requires
- ))
await self.add_relation(
- model_name,
- provides,
- requires,
+ model_name, provides, requires,
)
except Exception as e:
self.log.debug("Exception: {}".format(e))
return
- async def DeployCharms(self, model_name, application_name, vnfd,
- charm_path, params={}, machine_spec={},
- callback=None, *callback_args):
+ async def DeployCharms(
+ self,
+ model_name,
+ application_name,
+ vnfd,
+ charm_path,
+ params={},
+ machine_spec={},
+ callback=None,
+ *callback_args
+ ):
"""Deploy one or more charms associated with a VNF.
Deploy the charm(s) referenced in a VNF Descriptor.
'rw_mgmt_ip': '1.2.3.4',
# Pass the initial-config-primitives section of the vnf or vdu
'initial-config-primitives': {...}
- 'user_values': dictionary with the day-1 parameters provided at instantiation time. It will replace values
+ 'user_values': dictionary with the day-1 parameters provided at
+ instantiation time. It will replace values
inside < >. rw_mgmt_ip will be included here also
}
:param dict machine_spec: A dictionary describing the machine to
self.notify_callback(
model_name,
application_name,
+ "error",
"failed",
callback,
*callback_args,
########################################
app = await self.get_application(model, application_name)
if app:
- raise JujuApplicationExists("Can't deploy application \"{}\" to model \"{}\" because it already exists.".format(application_name, model_name))
+ raise JujuApplicationExists(
+ (
+ 'Can\'t deploy application "{}" to model '
+ ' "{}" because it already exists.'
+ ).format(application_name, model_name)
+ )
################################################################
# Register this application with the model-level event monitor #
################################################################
if callback:
- self.log.debug("JujuApi: Registering callback for {}".format(
- application_name,
- ))
+ self.log.debug(
+ "JujuApi: Registering callback for {}".format(application_name,)
+ )
await self.Subscribe(model_name, application_name, callback, *callback_args)
#######################################
#######################################
rw_mgmt_ip = None
- if 'rw_mgmt_ip' in params:
- rw_mgmt_ip = params['rw_mgmt_ip']
+ if "rw_mgmt_ip" in params:
+ rw_mgmt_ip = params["rw_mgmt_ip"]
- if 'initial-config-primitive' not in params:
- params['initial-config-primitive'] = {}
+ if "initial-config-primitive" not in params:
+ params["initial-config-primitive"] = {}
initial_config = self._get_config_from_dict(
- params['initial-config-primitive'],
- {'<rw_mgmt_ip>': rw_mgmt_ip}
+ params["initial-config-primitive"], {"<rw_mgmt_ip>": rw_mgmt_ip}
)
########################################################
series = "xenial"
if machine_spec.keys():
- if all(k in machine_spec for k in ['hostname', 'username']):
+ if all(k in machine_spec for k in ["hostname", "username"]):
# Allow series to be derived from the native charm
series = None
- self.log.debug("Provisioning manual machine {}@{}".format(
- machine_spec['username'],
- machine_spec['hostname'],
- ))
+ self.log.debug(
+ "Provisioning manual machine {}@{}".format(
+ machine_spec["username"], machine_spec["hostname"],
+ )
+ )
"""Native Charm support
to = await self.provision_machine(
model_name=model_name,
- username=machine_spec['username'],
- hostname=machine_spec['hostname'],
+ username=machine_spec["username"],
+ hostname=machine_spec["hostname"],
private_key_path=self.GetPrivateKeyPath(),
)
self.log.debug("Provisioned machine id {}".format(to))
# TODO: If to is none, raise an exception
- # The native charm won't have the sshproxy layer, typically, but LCM uses the config primitive
+ # The native charm won't have the sshproxy layer, typically, but LCM
+ # uses the config primitive
# to interpret what the values are. That's a gap to fill.
"""
# Native charms don't include the ssh-* config values, so strip them
# from the initial_config, otherwise the deploy will raise an error.
# self.log.debug("Removing ssh-* from initial-config")
- for k in ['ssh-hostname', 'ssh-username', 'ssh-password']:
+ for k in ["ssh-hostname", "ssh-username", "ssh-password"]:
if k in initial_config:
self.log.debug("Removing parameter {}".format(k))
del initial_config[k]
- self.log.debug("JujuApi: Deploying charm ({}/{}) from {} to {}".format(
- model_name,
- application_name,
- charm_path,
- to,
- ))
+ self.log.debug(
+ "JujuApi: Deploying charm ({}/{}) from {} to {}".format(
+ model_name, application_name, charm_path, to,
+ )
+ )
########################################################
# Deploy the charm and apply the initial configuration #
except KeyError as ex:
# We don't currently support relations between NS and VNF/VDU charms
self.log.warn("[N2VC] Relations not supported: {}".format(ex))
- except Exception as ex:
+ except Exception:
# This may happen if not all of the charms needed by the relation
# are ready. We can safely ignore this, because Relate will be
# retried when the endpoint of the relation is deployed.
# # Execute initial config primitive(s) #
# #######################################
uuids = await self.ExecuteInitialPrimitives(
- model_name,
- application_name,
- params,
+ model_name, application_name, params,
)
return uuids
# raise N2VCPrimitiveExecutionFailed(e)
def GetPrivateKeyPath(self):
- homedir = os.environ['HOME']
+ homedir = os.environ["HOME"]
sshdir = "{}/.ssh".format(homedir)
private_key_path = "{}/id_n2vc_rsa".format(sshdir)
return private_key_path
Juju, after which Juju will communicate with the VM directly via the
juju agent.
"""
- public_key = ""
+ # public_key = ""
# Find the path to where we expect our key to live.
- homedir = os.environ['HOME']
+ homedir = os.environ["HOME"]
sshdir = "{}/.ssh".format(homedir)
if not os.path.exists(sshdir):
os.mkdir(sshdir)
# If we don't have a key generated, generate it.
if not os.path.exists(private_key_path):
cmd = "ssh-keygen -t {} -b {} -N '' -f {}".format(
- "rsa",
- "4096",
- private_key_path
+ "rsa", "4096", private_key_path
)
subprocess.check_output(shlex.split(cmd))
return public_key
- async def ExecuteInitialPrimitives(self, model_name, application_name,
- params, callback=None, *callback_args):
+ async def ExecuteInitialPrimitives(
+ self, model_name, application_name, params, callback=None, *callback_args
+ ):
"""Execute multiple primitives.
Execute multiple primitives as declared in initial-config-primitive.
primitives = {}
# Build a sequential list of the primitives to execute
- for primitive in params['initial-config-primitive']:
+ for primitive in params["initial-config-primitive"]:
try:
- if primitive['name'] == 'config':
+ if primitive["name"] == "config":
pass
else:
- seq = primitive['seq']
+ seq = primitive["seq"]
params_ = {}
- if 'parameter' in primitive:
- params_ = primitive['parameter']
+ if "parameter" in primitive:
+ params_ = primitive["parameter"]
user_values = params.get("user_values", {})
- if 'rw_mgmt_ip' not in user_values:
- user_values['rw_mgmt_ip'] = None
- # just for backward compatibility, because it will be provided always by modern version of LCM
+ if "rw_mgmt_ip" not in user_values:
+ user_values["rw_mgmt_ip"] = None
+ # just for backward compatibility, because it will be provided
+ # always by modern version of LCM
primitives[seq] = {
- 'name': primitive['name'],
- 'parameters': self._map_primitive_parameters(
- params_,
- user_values
+ "name": primitive["name"],
+ "parameters": self._map_primitive_parameters(
+ params_, user_values
),
}
for primitive in sorted(primitives):
try:
- # self.log.debug("Queuing action {}".format(primitives[primitive]['name']))
+ # self.log.debug("Queuing action {}".format(
+ # primitives[primitive]['name']))
uuids.append(
await self.ExecutePrimitive(
model_name,
application_name,
- primitives[primitive]['name'],
+ primitives[primitive]["name"],
callback,
callback_args,
- **primitives[primitive]['parameters'],
+ **primitives[primitive]["parameters"],
)
)
except PrimitiveDoesNotExist as e:
- self.log.debug("Ignoring exception PrimitiveDoesNotExist: {}".format(e))
+ self.log.debug(
+ "Ignoring exception PrimitiveDoesNotExist: {}".format(e)
+ )
pass
except Exception as e:
- self.log.debug("XXXXXXXXXXXXXXXXXXXXXXXXX Unexpected exception: {}".format(e))
+ self.log.debug(
+ (
+ "XXXXXXXXXXXXXXXXXXXXXXXXX Unexpected exception: {}"
+ ).format(e)
+ )
raise e
except N2VCPrimitiveExecutionFailed as e:
- self.log.debug(
- "[N2VC] Exception executing primitive: {}".format(e)
- )
+ self.log.debug("[N2VC] Exception executing primitive: {}".format(e))
raise
return uuids
- async def ExecutePrimitive(self, model_name, application_name, primitive,
- callback, *callback_args, **params):
+ async def ExecutePrimitive(
+ self,
+ model_name,
+ application_name,
+ primitive,
+ callback,
+ *callback_args,
+ **params
+ ):
"""Execute a primitive of a charm for Day 1 or Day 2 configuration.
Execute a primitive defined in the VNF descriptor.
model = await self.get_model(model_name)
- if primitive == 'config':
+ if primitive == "config":
# config is special, and expecting params to be a dictionary
await self.set_config(
- model,
- application_name,
- params['params'],
+ model, application_name, params["params"],
)
else:
app = await self.get_application(model, application_name)
actions = await app.get_actions()
if primitive not in actions.keys():
- raise PrimitiveDoesNotExist("Primitive {} does not exist".format(primitive))
+ raise PrimitiveDoesNotExist(
+ "Primitive {} does not exist".format(primitive)
+ )
# Run against the first (and probably only) unit in the app
unit = app.units[0]
raise e
except Exception as e:
# An unexpected exception was caught
- self.log.debug(
- "Caught exception while executing primitive: {}".format(e)
- )
+ self.log.debug("Caught exception while executing primitive: {}".format(e))
raise N2VCPrimitiveExecutionFailed(e)
return uuid
- async def RemoveCharms(self, model_name, application_name, callback=None,
- *callback_args):
+ async def RemoveCharms(
+ self, model_name, application_name, callback=None, *callback_args
+ ):
"""Remove a charm from the VCA.
Remove a charm referenced in a VNF Descriptor.
# Remove this application from event monitoring
await self.Unsubscribe(model_name, application_name)
- # self.notify_callback(model_name, application_name, "removing", callback, *callback_args)
- self.log.debug(
- "Removing the application {}".format(application_name)
- )
+ # self.notify_callback(model_name, application_name, "removing",
+ # callback, *callback_args)
+ self.log.debug("Removing the application {}".format(application_name))
await app.remove()
# await self.disconnect_model(self.monitors[model_name])
models = await self.controller.list_models()
if ns_uuid not in models:
- try:
- self.models[ns_uuid] = await self.controller.add_model(
- ns_uuid
- )
- except JujuError as e:
- if "already exists" not in e.message:
- raise e
-
- # Create an observer for this model
- await self.create_model_monitor(ns_uuid)
+ # Get the new model
+ await self.get_model(ns_uuid)
return True
try:
await model.block_until(
lambda: all(
- unit.workload_status in ['terminated'] for unit in app.units
+ unit.workload_status in ["terminated"] for unit in app.units
),
- timeout=timeout
+ timeout=timeout,
+ )
+ except Exception:
+ self.log.debug(
+ "Timed out waiting for {} to terminate.".format(application)
)
- except Exception as e:
- self.log.debug("Timed out waiting for {} to terminate.".format(application))
for machine in model.machines:
try:
self.log.debug("Destroying machine {}".format(machine))
await model.machines[machine].destroy(force=True)
except JujuAPIError as e:
- if 'does not exist' in str(e):
+ if "does not exist" in str(e):
# Our cached model may be stale, because the machine
# has already been removed. It's safe to continue.
continue
the callback method
"""
self.monitors[ns_name].AddApplication(
- application_name,
- callback,
- *callback_args
+ application_name, callback, *callback_args
)
async def Unsubscribe(self, ns_name, application_name):
:param ns_name str: The name of the Network Service
:param application_name str: The name of the application
"""
- self.monitors[ns_name].RemoveApplication(
- application_name,
- )
+ self.monitors[ns_name].RemoveApplication(application_name,)
# Non-public methods
async def add_relation(self, model_name, relation1, relation2):
# If one of the applications in the relationship doesn't exist,
# or the relation has already been added, let the operation fail
# silently.
- if 'not found' in e.message:
+ if "not found" in e.message:
return
- if 'already exists' in e.message:
+ if "already exists" in e.message:
return
raise e
"""
config = {}
for primitive in config_primitive:
- if primitive['name'] == 'config':
+ if primitive["name"] == "config":
# config = self._map_primitive_parameters()
- for parameter in primitive['parameter']:
- param = str(parameter['name'])
- if parameter['value'] == "<rw_mgmt_ip>":
- config[param] = str(values[parameter['value']])
+ for parameter in primitive["parameter"]:
+ param = str(parameter["name"])
+ if parameter["value"] == "<rw_mgmt_ip>":
+ config[param] = str(values[parameter["value"]])
else:
- config[param] = str(parameter['value'])
+ config[param] = str(parameter["value"])
return config
def _map_primitive_parameters(self, parameters, user_values):
params = {}
for parameter in parameters:
- param = str(parameter['name'])
- value = parameter.get('value')
+ param = str(parameter["name"])
+ value = parameter.get("value")
- # map parameters inside a < >; e.g. <rw_mgmt_ip>. with the provided user_values.
+ # map parameters inside a < >; e.g. <rw_mgmt_ip>. with the provided user
+ # _values.
# Must exist at user_values except if there is a default value
if isinstance(value, str) and value.startswith("<") and value.endswith(">"):
- if parameter['value'][1:-1] in user_values:
- value = user_values[parameter['value'][1:-1]]
- elif 'default-value' in parameter:
- value = parameter['default-value']
+ if parameter["value"][1:-1] in user_values:
+ value = user_values[parameter["value"][1:-1]]
+ elif "default-value" in parameter:
+ value = parameter["default-value"]
else:
- raise KeyError("parameter {}='{}' not supplied ".format(param, value))
+ raise KeyError(
+ "parameter {}='{}' not supplied ".format(param, value)
+ )
# If there's no value, use the default-value (if set)
- if value is None and 'default-value' in parameter:
- value = parameter['default-value']
+ if value is None and "default-value" in parameter:
+ value = parameter["default-value"]
# Typecast parameter value, if present
paramtype = "string"
try:
- if 'data-type' in parameter:
- paramtype = str(parameter['data-type']).lower()
+ if "data-type" in parameter:
+ paramtype = str(parameter["data-type"]).lower()
if paramtype == "integer":
value = int(value)
# If there's no data-type, assume the value is a string
value = str(value)
except ValueError:
- raise ValueError("parameter {}='{}' cannot be converted to type {}".format(param, value, paramtype))
+ raise ValueError(
+ "parameter {}='{}' cannot be converted to type {}".format(
+ param, value, paramtype
+ )
+ )
params[param] = value
return params
"""Transform the yang config primitive to dict."""
config = {}
for primitive in config_primitive.values():
- if primitive['name'] == 'config':
- for parameter in primitive['parameter'].values():
- param = str(parameter['name'])
- if parameter['value'] == "<rw_mgmt_ip>":
- config[param] = str(values[parameter['value']])
+ if primitive["name"] == "config":
+ for parameter in primitive["parameter"].values():
+ param = str(parameter["name"])
+ if parameter["value"] == "<rw_mgmt_ip>":
+ config[param] = str(values[parameter["value"]])
else:
- config[param] = str(parameter['value'])
+ config[param] = str(parameter["value"])
return config
elif not c.isalpha():
c = "-"
appname += c
- return re.sub('-+', '-', appname.lower())
+ return re.sub("-+", "-", appname.lower())
# def format_application_name(self, nsd_name, vnfr_name, member_vnf_index=0):
# """Format the name of the application
Model names may only contain lowercase letters, digits and hyphens
"""
- return name.replace('_', '-').lower()
+ return name.replace("_", "-").lower()
async def get_application(self, model, application):
"""Get the deployed application."""
if model_name not in models:
try:
self.models[model_name] = await self.controller.add_model(
- model_name
+ model_name, config={"authorized-keys": self.juju_public_key}
)
except JujuError as e:
if "already exists" not in e.message:
raise e
else:
- self.models[model_name] = await self.controller.get_model(
- model_name
- )
+ self.models[model_name] = await self.controller.get_model(model_name)
- self.refcount['model'] += 1
+ self.refcount["model"] += 1
# Create an observer for this model
await self.create_model_monitor(model_name)
if self.secret:
self.log.debug(
- "Connecting to controller... ws://{}:{} as {}/{}".format(
- self.endpoint,
- self.port,
- self.user,
- self.secret,
+ "Connecting to controller... ws://{} as {}/{}".format(
+ self.endpoint, self.user, self.secret,
)
)
- await self.controller.connect(
- endpoint=self.endpoint,
- username=self.user,
- password=self.secret,
- cacert=self.ca_cert,
- )
- self.refcount['controller'] += 1
+ try:
+ await self.controller.connect(
+ endpoint=self.endpoint,
+ username=self.user,
+ password=self.secret,
+ cacert=self.ca_cert,
+ )
+ self.refcount["controller"] += 1
+ self.authenticated = True
+ self.log.debug("JujuApi: Logged into controller")
+ except Exception as ex:
+ self.log.debug("Caught exception: {}".format(ex))
else:
# current_controller no longer exists
# self.log.debug("Connecting to current controller...")
# cacert=cacert,
# )
self.log.fatal("VCA credentials not configured.")
-
- self.authenticated = True
- self.log.debug("JujuApi: Logged into controller")
+ self.authenticated = False
async def logout(self):
"""Logout of the Juju controller."""
await self.disconnect_model(model)
if self.controller:
- self.log.debug("Disconnecting controller {}".format(
- self.controller
- ))
+ self.log.debug("Disconnecting controller {}".format(self.controller))
await self.controller.disconnect()
- self.refcount['controller'] -= 1
+ self.refcount["controller"] -= 1
self.controller = None
self.authenticated = False
self.log.debug(self.refcount)
except Exception as e:
- self.log.fatal(
- "Fatal error logging out of Juju Controller: {}".format(e)
- )
+ self.log.fatal("Fatal error logging out of Juju Controller: {}".format(e))
raise e
return True
if model in self.models:
try:
await self.models[model].disconnect()
- self.refcount['model'] -= 1
+ self.refcount["model"] -= 1
self.models[model] = None
except Exception as e:
self.log.debug("Caught exception: {}".format(e))
- async def provision_machine(self, model_name: str,
- hostname: str, username: str,
- private_key_path: str) -> int:
+ async def provision_machine(
+ self, model_name: str, hostname: str, username: str, private_key_path: str
+ ) -> int:
"""Provision a machine.
This executes the SSH provisioner, which will log in to a machine via
:param model_name str: The name of the model
:param hostname str: The IP or hostname of the target VM
:param user str: The username to login to
- :param private_key_path str: The path to the private key that's been injected to the VM via cloud-init
- :return machine_id int: Returns the id of the machine or None if provisioning fails
+ :param private_key_path str: The path to the private key that's been injected
+ to the VM via cloud-init
+ :return machine_id int: Returns the id of the machine or None if provisioning
+ fails
"""
if not self.authenticated:
await self.login()
machine_id = None
if self.api_proxy:
- self.log.debug("Instantiating SSH Provisioner for {}@{} ({})".format(
- username,
- hostname,
- private_key_path
- ))
+ self.log.debug(
+ "Instantiating SSH Provisioner for {}@{} ({})".format(
+ username, hostname, private_key_path
+ )
+ )
provisioner = SSHProvisioner(
host=hostname,
user=username,
return None
if params:
- params.jobs = ['JobHostUnits']
+ params.jobs = ["JobHostUnits"]
model = await self.get_model(model_name)
# as we need the machine_id
self.log.debug("Installing Juju agent")
await provisioner.install_agent(
- connection,
- params.nonce,
- machine_id,
- self.api_proxy,
+ connection, params.nonce, machine_id, self.api_proxy,
)
else:
self.log.debug("Missing API Proxy")
if not self.authenticated:
await self.login()
- m = await self.get_model()
- try:
- m.remove_relation(a, b)
- finally:
- await m.disconnect()
+ # m = await self.get_model()
+ # try:
+ # m.remove_relation(a, b)
+ # finally:
+ # await m.disconnect()
async def resolve_error(self, model_name, application=None):
"""Resolve units in error state."""
app = await self.get_application(model, application)
if app:
self.log.debug(
- "JujuApi: Resolving errors for application {}".format(
- application,
- )
+ "JujuApi: Resolving errors for application {}".format(application,)
)
- for unit in app.units:
+ for _ in app.units:
app.resolved(retry=True)
async def run_action(self, model_name, application, action_name, **params):
"""Execute an action and return an Action object."""
if not self.authenticated:
await self.login()
- result = {
- 'status': '',
- 'action': {
- 'tag': None,
- 'results': None,
- }
- }
+ result = {"status": "", "action": {"tag": None, "results": None}}
model = await self.get_model(model_name)
self.log.debug(
"JujuApi: Running Action {} against Application {}".format(
- action_name,
- application,
+ action_name, application,
)
)
# Wait for the action to complete
await action.wait()
- result['status'] = action.status
- result['action']['tag'] = action.data['id']
- result['action']['results'] = action.results
+ result["status"] = action.status
+ result["action"]["tag"] = action.data["id"]
+ result["action"]["results"] = action.results
return result
app = await self.get_application(model_name, application)
if app:
- self.log.debug("JujuApi: Setting config for Application {}".format(
- application,
- ))
+ self.log.debug(
+ "JujuApi: Setting config for Application {}".format(application,)
+ )
await app.set_config(config)
# Verify the config is set
newconf = await app.get_config()
for key in config:
- if config[key] != newconf[key]['value']:
- self.log.debug("JujuApi: Config not set! Key {} Value {} doesn't match {}".format(key, config[key], newconf[key]))
+ if config[key] != newconf[key]["value"]:
+ self.log.debug(
+ (
+ "JujuApi: Config not set! Key {} Value {} doesn't match {}"
+ ).format(key, config[key], newconf[key])
+ )
# async def set_parameter(self, parameter, value, application=None):
# """Set a config parameter for a service."""
# return await self.apply_config(
# {parameter: value},
# application=application,
- # )
+ # )
- async def wait_for_application(self, model_name, application_name,
- timeout=300):
+ async def wait_for_application(self, model_name, application_name, timeout=300):
"""Wait for an application to become active."""
if not self.authenticated:
await self.login()
if app:
self.log.debug(
"JujuApi: Waiting {} seconds for Application {}".format(
- timeout,
- application_name,
+ timeout, application_name,
)
)
await model.block_until(
lambda: all(
- unit.agent_status == 'idle' and unit.workload_status in
- ['active', 'unknown'] for unit in app.units
+ unit.agent_status == "idle"
+ and unit.workload_status in ["active", "unknown"]
+ for unit in app.units
),
- timeout=timeout
+ timeout=timeout,
)