X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fvnf.py;h=a1fcfe3b60279fc32b7ab5aadb87d118e8b18f7e;hp=7c39fa11f37eb8063f7bd1efac5ef5b9a15f9f11;hb=refs%2Fheads%2Fnetslice;hpb=b09436613925b2eb334c10f219b743868e4b3fe5;ds=sidebyside diff --git a/n2vc/vnf.py b/n2vc/vnf.py index 7c39fa1..a1fcfe3 100644 --- a/n2vc/vnf.py +++ b/n2vc/vnf.py @@ -1,11 +1,13 @@ - +import asyncio import logging import os import os.path import re +import shlex import ssl +import subprocess import sys -import time +# import time # FIXME: this should load the juju inside or modules without having to # explicitly install it. Check why it's not working. @@ -16,7 +18,7 @@ if path not in sys.path: sys.path.insert(1, path) from juju.controller import Controller -from juju.model import Model, ModelObserver +from juju.model import ModelObserver # We might need this to connect to the websocket securely, but test and verify. @@ -83,23 +85,18 @@ class VCAMonitor(ModelObserver): application_name = delta.data['application'] callback = self.applications[application_name]['callback'] - callback_args = self.applications[application_name]['callback_args'] + callback_args = \ + self.applications[application_name]['callback_args'] if old and new: - old_status = old.workload_status - new_status = new.workload_status - - if old_status == new_status: - """The workload status may fluctuate around certain events, - so wait until the status has stabilized before triggering - the callback.""" - if callback: - callback( - self.ns_name, - delta.data['application'], - new_status, - new.workload_status_message, - *callback_args) + # Fire off a callback with the application state + if callback: + callback( + self.ns_name, + delta.data['application'], + new.workload_status, + new.workload_status_message, + *callback_args) if old and not new: # This is a charm being removed @@ -111,7 +108,8 @@ class VCAMonitor(ModelObserver): "", *callback_args) except Exception as e: - self.log.debug("[1] notify_callback exception {}".format(e)) + self.log.debug("[1] notify_callback exception: {}".format(e)) + elif delta.entity == "action": # TODO: Decide how we want to notify the user of actions @@ -138,33 +136,14 @@ class VCAMonitor(ModelObserver): class N2VC: - - # Juju API - api = None - log = None - controller = None - connecting = False - authenticated = False - - models = {} - default_model = None - - # Model Observers - monitors = {} - - # VCA config - hostname = "" - port = 17070 - username = "" - secret = "" - def __init__(self, log=None, server='127.0.0.1', port=17070, user='admin', secret=None, - artifacts=None + artifacts=None, + loop=None, ): """Initialize N2VC @@ -181,9 +160,33 @@ class N2VC: 'port': 17070, 'artifacts': '/path/to/charms' }) - """ + # Initialize instance-level variables + self.api = None + self.log = None + self.controller = None + self.connecting = False + self.authenticated = False + + # For debugging + self.refcount = { + 'controller': 0, + 'model': 0, + } + + self.models = {} + self.default_model = None + + # Model Observers + self.monitors = {} + + # VCA config + self.hostname = "" + self.port = 17070 + self.username = "" + self.secret = "" + if log: self.log = log else: @@ -210,14 +213,22 @@ class N2VC: self.artifacts = artifacts + self.loop = loop or asyncio.get_event_loop() + def __del__(self): """Close any open connections.""" yield self.logout() - def notify_callback(self, model_name, application_name, status, message, callback=None, *callback_args): + def notify_callback(self, model_name, application_name, status, message, + callback=None, *callback_args): try: if callback: - callback(model_name, application_name, status, message, *callback_args) + callback( + model_name, + application_name, + status, message, + *callback_args, + ) except Exception as e: self.log.error("[0] notify_callback exception {}".format(e)) raise e @@ -243,7 +254,9 @@ class N2VC: return self.default_model - async def DeployCharms(self, model_name, application_name, vnfd, charm_path, params={}, machine_spec={}, callback=None, *callback_args): + async def DeployCharms(self, model_name, application_name, vnfd, + charm_path, params={}, machine_spec={}, + callback=None, *callback_args): """Deploy one or more charms associated with a VNF. Deploy the charm(s) referenced in a VNF Descriptor. @@ -259,14 +272,16 @@ class N2VC: # Pass the initial-config-primitives section of the vnf or vdu 'initial-config-primitives': {...} } - :param dict machine_spec: A dictionary describing the machine to install to + :param dict machine_spec: A dictionary describing the machine to + install to Examples:: { 'hostname': '1.2.3.4', 'username': 'ubuntu', } :param obj callback: A callback function to receive status changes. - :param tuple callback_args: A list of arguments to be passed to the callback + :param tuple callback_args: A list of arguments to be passed to the + callback """ ######################################################## @@ -274,7 +289,13 @@ class N2VC: ######################################################## if not os.path.exists(charm_path): self.log.debug("Charm path doesn't exist: {}".format(charm_path)) - self.notify_callback(model_name, application_name, "failed", callback, *callback_args) + self.notify_callback( + model_name, + application_name, + "failed", + callback, + *callback_args, + ) raise JujuCharmNotFound("No artifacts configured.") ################################ @@ -314,15 +335,14 @@ class N2VC: ######################################################## to = "" if machine_spec.keys(): - # TODO: This needs to be tested. - # if all(k in machine_spec for k in ['hostname', 'username']): - # # Enlist the existing machine in Juju - # machine = await self.model.add_machine(spec='ssh:%@%'.format( - # specs['host'], - # specs['user'], - # )) - # to = machine.id - pass + if all(k in machine_spec for k in ['host', 'user']): + # Enlist an existing machine as a Juju unit + machine = await model.add_machine(spec='ssh:{}@{}:{}'.format( + machine_spec['user'], + machine_spec['host'], + self.GetPrivateKeyPath(), + )) + to = machine.id ####################################### # Get the initial charm configuration # @@ -332,6 +352,9 @@ class N2VC: if 'rw_mgmt_ip' in params: rw_mgmt_ip = params['rw_mgmt_ip'] + if 'initial-config-primitive' not in params: + params['initial-config-primitive'] = {} + initial_config = self._get_config_from_dict( params['initial-config-primitive'], {'': rw_mgmt_ip} @@ -357,20 +380,198 @@ class N2VC: series='xenial', # Apply the initial 'config' primitive during deployment config=initial_config, - # TBD: Where to deploy the charm to. - to=None, + # Where to deploy the charm to. + to=to, ) # ####################################### # # Execute initial config primitive(s) # # ####################################### + await self.ExecuteInitialPrimitives( + model_name, + application_name, + params, + ) + + # primitives = {} + # + # # Build a sequential list of the primitives to execute + # for primitive in params['initial-config-primitive']: + # try: + # if primitive['name'] == 'config': + # # This is applied when the Application is deployed + # pass + # else: + # seq = primitive['seq'] + # + # params = {} + # if 'parameter' in primitive: + # params = primitive['parameter'] + # + # primitives[seq] = { + # 'name': primitive['name'], + # 'parameters': self._map_primitive_parameters( + # params, + # {'': rw_mgmt_ip} + # ), + # } + # + # for primitive in sorted(primitives): + # await self.ExecutePrimitive( + # model_name, + # application_name, + # primitives[primitive]['name'], + # callback, + # callback_args, + # **primitives[primitive]['parameters'], + # ) + # except N2VCPrimitiveExecutionFailed as e: + # self.log.debug( + # "[N2VC] Exception executing primitive: {}".format(e) + # ) + # raise + + async def GetPrimitiveStatus(self, model_name, uuid): + """Get the status of an executed Primitive. + + The status of an executed Primitive will be one of three values: + - completed + - failed + - running + """ + status = None + try: + if not self.authenticated: + await self.login() + + # FIXME: This is hard-coded until model-per-ns is added + model_name = 'default' + + model = await self.get_model(model_name) + + results = await model.get_action_status(uuid) + + if uuid in results: + status = results[uuid] + + except Exception as e: + self.log.debug( + "Caught exception while getting primitive status: {}".format(e) + ) + raise N2VCPrimitiveExecutionFailed(e) + + return status + + async def GetPrimitiveOutput(self, model_name, uuid): + """Get the output of an executed Primitive. + + Note: this only returns output for a successfully executed primitive. + """ + results = None + try: + if not self.authenticated: + await self.login() + + # FIXME: This is hard-coded until model-per-ns is added + model_name = 'default' + + model = await self.get_model(model_name) + results = await model.get_action_output(uuid, 60) + except Exception as e: + self.log.debug( + "Caught exception while getting primitive status: {}".format(e) + ) + raise N2VCPrimitiveExecutionFailed(e) + + return results + + # async def ProvisionMachine(self, model_name, hostname, username): + # """Provision machine for usage with Juju. + # + # Provisions a previously instantiated machine for use with Juju. + # """ + # try: + # if not self.authenticated: + # await self.login() + # + # # FIXME: This is hard-coded until model-per-ns is added + # model_name = 'default' + # + # model = await self.get_model(model_name) + # model.add_machine(spec={}) + # + # machine = await model.add_machine(spec='ssh:{}@{}:{}'.format( + # "ubuntu", + # host['address'], + # private_key_path, + # )) + # return machine.id + # + # except Exception as e: + # self.log.debug( + # "Caught exception while getting primitive status: {}".format(e) + # ) + # raise N2VCPrimitiveExecutionFailed(e) + + def GetPrivateKeyPath(self): + homedir = os.environ['HOME'] + sshdir = "{}/.ssh".format(homedir) + private_key_path = "{}/id_n2vc_rsa".format(sshdir) + return private_key_path + + async def GetPublicKey(self): + """Get the N2VC SSH public key.abs + + Returns the SSH public key, to be injected into virtual machines to + be managed by the VCA. + + The first time this is run, a ssh keypair will be created. The public + key is injected into a VM so that we can provision the machine with + Juju, after which Juju will communicate with the VM directly via the + juju agent. + """ + public_key = "" + + # Find the path to where we expect our key to live. + homedir = os.environ['HOME'] + sshdir = "{}/.ssh".format(homedir) + if not os.path.exists(sshdir): + os.mkdir(sshdir) + + private_key_path = "{}/id_n2vc_rsa".format(sshdir) + public_key_path = "{}.pub".format(private_key_path) + + # If we don't have a key generated, generate it. + if not os.path.exists(private_key_path): + cmd = "ssh-keygen -t {} -b {} -N '' -f {}".format( + "rsa", + "4096", + private_key_path + ) + subprocess.check_output(shlex.split(cmd)) + + # Read the public key + with open(public_key_path, "r") as f: + public_key = f.readline() + + return public_key + + async def ExecuteInitialPrimitives(self, model_name, application_name, + params, callback=None, *callback_args): + """Execute multiple primitives. + + Execute multiple primitives as declared in initial-config-primitive. + This is useful in cases where the primitives initially failed -- for + example, if the charm is a proxy but the proxy hasn't been configured + yet. + """ + uuids = [] primitives = {} # Build a sequential list of the primitives to execute for primitive in params['initial-config-primitive']: try: if primitive['name'] == 'config': - # This is applied when the Application is deployed pass else: seq = primitive['seq'] @@ -383,49 +584,30 @@ class N2VC: 'name': primitive['name'], 'parameters': self._map_primitive_parameters( params, - {'': rw_mgmt_ip} + {'': None} ), } for primitive in sorted(primitives): - await self.ExecutePrimitive( - model_name, - application_name, - primitives[primitive]['name'], - callback, - callback_args, - **primitives[primitive]['parameters'], + uuids.append( + await self.ExecutePrimitive( + model_name, + application_name, + primitives[primitive]['name'], + callback, + callback_args, + **primitives[primitive]['parameters'], + ) ) except N2VCPrimitiveExecutionFailed as e: self.log.debug( "[N2VC] Exception executing primitive: {}".format(e) ) raise + return uuids - async def GetPrimitiveStatus(self, model_name, uuid): - results = None - try: - if not self.authenticated: - await self.login() - - # FIXME: This is hard-coded until model-per-ns is added - model_name = 'default' - - model = await self.controller.get_model(model_name) - - results = await model.get_action_output(uuid) - - await model.disconnect() - except Exception as e: - self.log.debug( - "Caught exception while getting primitive status: {}".format(e) - ) - raise N2VCPrimitiveExecutionFailed(e) - - return results - - - async def ExecutePrimitive(self, model_name, application_name, primitive, callback, *callback_args, **params): + async def ExecutePrimitive(self, model_name, application_name, primitive, + callback, *callback_args, **params): """Execute a primitive of a charm for Day 1 or Day 2 configuration. Execute a primitive defined in the VNF descriptor. @@ -434,8 +616,10 @@ class N2VC: :param str application_name: The name of the application :param str primitive: The name of the primitive to execute. :param obj callback: A callback function to receive status changes. - :param tuple callback_args: A list of arguments to be passed to the callback function. - :param dict params: A dictionary of key=value pairs representing the primitive's parameters + :param tuple callback_args: A list of arguments to be passed to the + callback function. + :param dict params: A dictionary of key=value pairs representing the + primitive's parameters Examples:: { 'rw_mgmt_ip': '1.2.3.4', @@ -443,6 +627,7 @@ class N2VC: 'initial-config-primitives': {...} } """ + self.log.debug("Executing {}".format(primitive)) uuid = None try: if not self.authenticated: @@ -451,7 +636,7 @@ class N2VC: # FIXME: This is hard-coded until model-per-ns is added model_name = 'default' - model = await self.controller.get_model(model_name) + model = await self.get_model(model_name) if primitive == 'config': # config is special, and expecting params to be a dictionary @@ -466,12 +651,8 @@ class N2VC: # Run against the first (and probably only) unit in the app unit = app.units[0] if unit: - self.log.debug( - "Executing primitive {}".format(primitive) - ) action = await unit.run_action(primitive, **params) uuid = action.id - await model.disconnect() except Exception as e: self.log.debug( "Caught exception while executing primitive: {}".format(e) @@ -479,7 +660,8 @@ class N2VC: raise N2VCPrimitiveExecutionFailed(e) return uuid - async def RemoveCharms(self, model_name, application_name, callback=None, *callback_args): + async def RemoveCharms(self, model_name, application_name, callback=None, + *callback_args): """Remove a charm from the VCA. Remove a charm referenced in a VNF Descriptor. @@ -487,7 +669,8 @@ class N2VC: :param str model_name: The name of the network service. :param str application_name: The name of the application :param obj callback: A callback function to receive status changes. - :param tuple callback_args: A list of arguments to be passed to the callback function. + :param tuple callback_args: A list of arguments to be passed to the + callback function. """ try: if not self.authenticated: @@ -500,11 +683,19 @@ class N2VC: self.monitors[model_name].RemoveApplication(application_name) # self.notify_callback(model_name, application_name, "removing", callback, *callback_args) - self.log.debug("Removing the application {}".format(application_name)) + self.log.debug( + "Removing the application {}".format(application_name) + ) await app.remove() # Notify the callback that this charm has been removed. - self.notify_callback(model_name, application_name, "removed", callback, *callback_args) + self.notify_callback( + model_name, + application_name, + "removed", + callback, + *callback_args, + ) except Exception as e: print("Caught exception: {}".format(e)) @@ -528,6 +719,13 @@ class N2VC: return metrics + async def HasApplication(self, model_name, application_name): + model = await self.get_model(model_name) + app = await self.get_application(model, application_name) + if app: + return True + return False + # Non-public methods async def add_relation(self, a, b, via=None): """ @@ -580,10 +778,23 @@ class N2VC: params = {} for parameter in parameters: param = str(parameter['name']) + + # Typecast parameter value, if present + if 'data-type' in parameter: + paramtype = str(parameter['data-type']).lower() + value = None + + if paramtype == "integer": + value = int(parameter['value']) + elif paramtype == "boolean": + value = bool(parameter['value']) + else: + value = str(parameter['value']) + if parameter['value'] == "": params[param] = str(values[parameter['value']]) else: - params[param] = str(parameter['value']) + params[param] = value return params def _get_config_from_yang(self, config_primitive, values): @@ -600,6 +811,7 @@ class N2VC: return config + @staticmethod def FormatApplicationName(self, *args): """ Generate a Juju-compatible Application name @@ -625,7 +837,6 @@ class N2VC: appname += c return re.sub('\-+', '-', appname.lower()) - # def format_application_name(self, nsd_name, vnfr_name, member_vnf_index=0): # """Format the name of the application # @@ -673,8 +884,10 @@ class N2VC: await self.login() if model_name not in self.models: - print("connecting to model {}".format(model_name)) - self.models[model_name] = await self.controller.get_model(model_name) + self.models[model_name] = await self.controller.get_model( + model_name, + ) + self.refcount['model'] += 1 # Create an observer for this model self.monitors[model_name] = VCAMonitor(model_name) @@ -693,16 +906,24 @@ class N2VC: self.log.debug("JujuApi: Logging into controller") cacert = None - self.controller = Controller() + self.controller = Controller(loop=self.loop) if self.secret: - self.log.debug("Connecting to controller... ws://{}:{} as {}/{}".format(self.endpoint, self.port, self.user, self.secret)) + self.log.debug( + "Connecting to controller... ws://{}:{} as {}/{}".format( + self.endpoint, + self.port, + self.user, + self.secret, + ) + ) await self.controller.connect( endpoint=self.endpoint, username=self.user, password=self.secret, cacert=cacert, ) + self.refcount['controller'] += 1 else: # current_controller no longer exists # self.log.debug("Connecting to current controller...") @@ -717,8 +938,6 @@ class N2VC: self.authenticated = True self.log.debug("JujuApi: Logged into controller") - # self.default_model = await self.controller.get_model("default") - async def logout(self): """Logout of the Juju controller.""" if not self.authenticated: @@ -726,24 +945,36 @@ class N2VC: try: if self.default_model: - self.log.debug("Disconnecting model {}".format(self.default_model)) + self.log.debug("Disconnecting model {}".format( + self.default_model + )) await self.default_model.disconnect() + self.refcount['model'] -= 1 self.default_model = None for model in self.models: await self.models[model].disconnect() + self.refcount['model'] -= 1 + self.models[model] = None if self.controller: - self.log.debug("Disconnecting controller {}".format(self.controller)) + self.log.debug("Disconnecting controller {}".format( + self.controller + )) await self.controller.disconnect() - # self.controller = None + self.refcount['controller'] -= 1 + self.controller = None self.authenticated = False + + self.log.debug(self.refcount) + except Exception as e: - self.log.fail("Fatal error logging out of Juju Controller: {}".format(e)) + self.log.fatal( + "Fatal error logging out of Juju Controller: {}".format(e) + ) raise e - # async def remove_application(self, name): # """Remove the application.""" # if not self.authenticated: @@ -780,9 +1011,11 @@ class N2VC: app = await self.get_application(self.default_model, application) if app: - self.log.debug("JujuApi: Resolving errors for application {}".format( - application, - )) + self.log.debug( + "JujuApi: Resolving errors for application {}".format( + application, + ) + ) for unit in app.units: app.resolved(retry=True) @@ -804,10 +1037,12 @@ class N2VC: # so use the first unit available. unit = app.units[0] - self.log.debug("JujuApi: Running Action {} against Application {}".format( - action_name, - application, - )) + self.log.debug( + "JujuApi: Running Action {} against Application {}".format( + action_name, + application, + ) + ) action = await unit.run_action(action_name, **params) @@ -853,25 +1088,32 @@ class N2VC: # application=application, # ) - async def wait_for_application(self, name, timeout=300): + async def wait_for_application(self, model_name, application_name, + timeout=300): """Wait for an application to become active.""" if not self.authenticated: await self.login() - app = await self.get_application(self.default_model, name) + # TODO: In a point release, we will use a model per deployed network + # service. In the meantime, we will always use the 'default' model. + model_name = 'default' + model = await self.get_model(model_name) + + app = await self.get_application(model, application_name) + self.log.debug("Application: {}".format(app)) + # app = await self.get_application(model_name, application_name) if app: self.log.debug( "JujuApi: Waiting {} seconds for Application {}".format( timeout, - name, + application_name, ) ) - await self.default_model.block_until( + await model.block_until( lambda: all( - unit.agent_status == 'idle' - and unit.workload_status - in ['active', 'unknown'] for unit in app.units + unit.agent_status == 'idle' and unit.workload_status in + ['active', 'unknown'] for unit in app.units ), timeout=timeout )