From 8bfcc14713a71f43f155e3cddec168380134d344 Mon Sep 17 00:00:00 2001 From: David Garcia Date: Mon, 14 Sep 2020 15:10:04 +0200 Subject: [PATCH] Revert ""Remove unused lines of code"" This reverts commit e8102d9e28e5c502fc66ca842d14e1ad29efbfda. Change-Id: Ic22f292f601f45451d3fdd56ab98bfa4ea9161eb Signed-off-by: David Garcia --- n2vc/juju_observer.py | 318 +++++ n2vc/n2vc_conn.py | 31 + n2vc/n2vc_juju_conn.py | 722 ++++++++++- n2vc/provisioner.py | 367 ++++++ n2vc/tests/unit/test_juju_observer.py | 159 +++ n2vc/tests/unit/test_provisioner.py | 142 ++- n2vc/vnf.py | 1619 +++++++++++++++++++++++++ requirements.txt | 1 + setup.py | 1 + 9 files changed, 3356 insertions(+), 4 deletions(-) create mode 100644 n2vc/juju_observer.py create mode 100644 n2vc/tests/unit/test_juju_observer.py create mode 100644 n2vc/vnf.py diff --git a/n2vc/juju_observer.py b/n2vc/juju_observer.py new file mode 100644 index 0000000..29ae932 --- /dev/null +++ b/n2vc/juju_observer.py @@ -0,0 +1,318 @@ +## +# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U. +# This file is part of OSM +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# For those usages not covered by the Apache License, Version 2.0 please +# contact with: nfvlabs@tid.es +## + +import asyncio +import time + +from juju.action import Action +from juju.application import Application +from juju.machine import Machine +from juju.model import ModelObserver, Model + +from n2vc.exceptions import N2VCTimeoutException +from n2vc.n2vc_conn import N2VCConnector, juju_status_2_osm_status + + +class _Entity: + def __init__(self, entity_id: str, entity_type: str, obj: object, db_dict: dict): + self.entity_id = entity_id + self.entity_type = entity_type + self.obj = obj + self.event = asyncio.Event() + self.db_dict = db_dict + + +class JujuModelObserver(ModelObserver): + def __init__(self, n2vc: N2VCConnector, model: Model): + self.n2vc = n2vc + self.model = model + model.add_observer(self) + self.machines = dict() + self.applications = dict() + self.actions = dict() + + def register_machine(self, machine: Machine, db_dict: dict): + try: + entity_id = machine.entity_id + except Exception: + # no entity_id aatribute, try machine attribute + entity_id = machine.machine + # self.n2vc.debug( + # msg='Registering machine for change notifications: {}'.format(entity_id)) + entity = _Entity( + entity_id=entity_id, entity_type="machine", obj=machine, db_dict=db_dict + ) + self.machines[entity_id] = entity + + def unregister_machine(self, machine_id: str): + if machine_id in self.machines: + del self.machines[machine_id] + + def is_machine_registered(self, machine_id: str): + return machine_id in self.machines + + def register_application(self, application: Application, db_dict: dict): + entity_id = application.entity_id + # self.n2vc.debug( + # msg='Registering application for change notifications: {}'.format(entity_id)) + entity = _Entity( + entity_id=entity_id, + entity_type="application", + obj=application, + db_dict=db_dict, + ) + self.applications[entity_id] = entity + + def unregister_application(self, application_id: str): + if application_id in self.applications: + del self.applications[application_id] + + def is_application_registered(self, application_id: str): + return application_id in self.applications + + def register_action(self, action: Action, db_dict: dict): + entity_id = action.entity_id + # self.n2vc.debug( + # msg='Registering action for changes notifications: {}'.format(entity_id)) + entity = _Entity( + entity_id=entity_id, entity_type="action", obj=action, db_dict=db_dict + ) + self.actions[entity_id] = entity + + def unregister_action(self, action_id: str): + if action_id in self.actions: + del self.actions[action_id] + + def is_action_registered(self, action_id: str): + return action_id in self.actions + + async def wait_for_machine( + self, + machine_id: str, + progress_timeout: float = None, + total_timeout: float = None, + ) -> int: + + if not self.is_machine_registered(machine_id): + return + + self.n2vc.debug("Waiting for machine completed: {}".format(machine_id)) + + # wait for a final state + entity = self.machines[machine_id] + return await self._wait_for_entity( + entity=entity, + field_to_check="agent_status", + final_states_list=["started"], + progress_timeout=progress_timeout, + total_timeout=total_timeout, + ) + + async def wait_for_application( + self, + application_id: str, + progress_timeout: float = None, + total_timeout: float = None, + ) -> int: + + if not self.is_application_registered(application_id): + return + + self.n2vc.debug("Waiting for application completed: {}".format(application_id)) + + # application statuses: unknown, active, waiting + # wait for a final state + entity = self.applications[application_id] + return await self._wait_for_entity( + entity=entity, + field_to_check="status", + final_states_list=["active", "blocked"], + progress_timeout=progress_timeout, + total_timeout=total_timeout, + ) + + async def wait_for_action( + self, + action_id: str, + progress_timeout: float = None, + total_timeout: float = None, + ) -> int: + + if not self.is_action_registered(action_id): + return + + self.n2vc.debug("Waiting for action completed: {}".format(action_id)) + + # action statuses: pending, running, completed, failed, cancelled + # wait for a final state + entity = self.actions[action_id] + return await self._wait_for_entity( + entity=entity, + field_to_check="status", + final_states_list=["completed", "failed", "cancelled"], + progress_timeout=progress_timeout, + total_timeout=total_timeout, + ) + + async def _wait_for_entity( + self, + entity: _Entity, + field_to_check: str, + final_states_list: list, + progress_timeout: float = None, + total_timeout: float = None, + ) -> int: + + # default values for no timeout + if total_timeout is None: + total_timeout = 3600 + if progress_timeout is None: + progress_timeout = 3600 + + # max end time + now = time.time() + total_end = now + total_timeout + + if now >= total_end: + raise N2VCTimeoutException( + message="Total timeout {} seconds, {}: {}".format( + total_timeout, entity.entity_type, entity.entity_id + ), + timeout="total", + ) + + # update next progress timeout + progress_end = now + progress_timeout # type: float + + # which is closest? progress or end timeout? + closest_end = min(total_end, progress_end) + + next_timeout = closest_end - now + + retries = 0 + + while entity.obj.__getattribute__(field_to_check) not in final_states_list: + retries += 1 + if await _wait_for_event_or_timeout(entity.event, next_timeout): + entity.event.clear() + else: + message = "Progress timeout {} seconds, {}: {}".format( + progress_timeout, entity.entity_type, entity.entity_id + ) + self.n2vc.debug(message) + raise N2VCTimeoutException(message=message, timeout="progress") + # self.n2vc.debug('End of wait. Final state: {}, retries: {}' + # .format(entity.obj.__getattribute__(field_to_check), retries)) + return retries + + async def on_change(self, delta, old, new, model): + + if new is None: + return + + # log + # self.n2vc.debug('on_change(): type: {}, entity: {}, id: {}' + # .format(delta.type, delta.entity, new.entity_id)) + + if delta.entity == "machine": + + # check registered machine + if new.entity_id not in self.machines: + return + + # write change in database + await self.n2vc.write_app_status_to_db( + db_dict=self.machines[new.entity_id].db_dict, + status=juju_status_2_osm_status(delta.entity, new.agent_status), + detailed_status=new.status_message, + vca_status=new.status, + entity_type="machine", + ) + + # set event for this machine + self.machines[new.entity_id].event.set() + + elif delta.entity == "application": + + # check registered application + if new.entity_id not in self.applications: + return + + # write change in database + await self.n2vc.write_app_status_to_db( + db_dict=self.applications[new.entity_id].db_dict, + status=juju_status_2_osm_status(delta.entity, new.status), + detailed_status=new.status_message, + vca_status=new.status, + entity_type="application", + ) + + # set event for this application + self.applications[new.entity_id].event.set() + + elif delta.entity == "unit": + + # get the application for this unit + application_id = delta.data["application"] + + # check registered application + if application_id not in self.applications: + return + + # write change in database + if not new.dead: + await self.n2vc.write_app_status_to_db( + db_dict=self.applications[application_id].db_dict, + status=juju_status_2_osm_status(delta.entity, new.workload_status), + detailed_status=new.workload_status_message, + vca_status=new.workload_status, + entity_type="unit", + ) + + # set event for this application + self.applications[application_id].event.set() + + elif delta.entity == "action": + + # check registered action + if new.entity_id not in self.actions: + return + + # write change in database + await self.n2vc.write_app_status_to_db( + db_dict=self.actions[new.entity_id].db_dict, + status=juju_status_2_osm_status(delta.entity, new.status), + detailed_status=new.status, + vca_status=new.status, + entity_type="action", + ) + + # set event for this application + self.actions[new.entity_id].event.set() + + +async def _wait_for_event_or_timeout(event: asyncio.Event, timeout: float = None): + try: + await asyncio.wait_for(fut=event.wait(), timeout=timeout) + except asyncio.TimeoutError: + pass + return event.is_set() diff --git a/n2vc/n2vc_conn.py b/n2vc/n2vc_conn.py index b9c3002..5e11b5f 100644 --- a/n2vc/n2vc_conn.py +++ b/n2vc/n2vc_conn.py @@ -499,6 +499,37 @@ class N2VCConnector(abc.ABC, Loggable): return JujuStatusToOSM[entity_type][status] +# DEPRECATED +def juju_status_2_osm_status(statustype: str, status: str) -> N2VCDeploymentStatus: + if statustype == "application" or statustype == "unit": + if status in ["waiting", "maintenance"]: + return N2VCDeploymentStatus.RUNNING + if status in ["error"]: + return N2VCDeploymentStatus.FAILED + elif status in ["active"]: + return N2VCDeploymentStatus.COMPLETED + elif status in ["blocked"]: + return N2VCDeploymentStatus.RUNNING + else: + return N2VCDeploymentStatus.UNKNOWN + elif statustype == "action": + if status in ["running"]: + return N2VCDeploymentStatus.RUNNING + elif status in ["completed"]: + return N2VCDeploymentStatus.COMPLETED + else: + return N2VCDeploymentStatus.UNKNOWN + elif statustype == "machine": + if status in ["pending"]: + return N2VCDeploymentStatus.PENDING + elif status in ["started"]: + return N2VCDeploymentStatus.COMPLETED + else: + return N2VCDeploymentStatus.UNKNOWN + + return N2VCDeploymentStatus.FAILED + + def obj_to_yaml(obj: object) -> str: # dump to yaml dump_text = yaml.dump(obj, default_flow_style=False, indent=2) diff --git a/n2vc/n2vc_juju_conn.py b/n2vc/n2vc_juju_conn.py index fff78c9..31bdd6e 100644 --- a/n2vc/n2vc_juju_conn.py +++ b/n2vc/n2vc_juju_conn.py @@ -26,19 +26,29 @@ import binascii import logging import os import re - +import time + +from juju.action import Action +from juju.application import Application +from juju.client import client +from juju.controller import Controller +from juju.errors import JujuAPIError +from juju.machine import Machine +from juju.model import Model from n2vc.exceptions import ( N2VCBadArgumentsException, N2VCException, N2VCConnectionException, N2VCExecutionException, N2VCInvalidCertificate, - # N2VCNotFound, + N2VCNotFound, MethodNotImplemented, JujuK8sProxycharmNotSupported, ) +from n2vc.juju_observer import JujuModelObserver from n2vc.n2vc_conn import N2VCConnector from n2vc.n2vc_conn import obj_to_dict, obj_to_yaml +from n2vc.provisioner import AsyncSSHProvisioner from n2vc.libjuju import Libjuju @@ -1024,6 +1034,714 @@ class N2VCJujuConnector(N2VCConnector): return N2VCJujuConnector._format_app_name(application_name) + async def _juju_create_machine( + self, + model_name: str, + application_name: str, + machine_id: str = None, + db_dict: dict = None, + progress_timeout: float = None, + total_timeout: float = None, + ) -> Machine: + + self.log.debug( + "creating machine in model: {}, existing machine id: {}".format( + model_name, machine_id + ) + ) + + # get juju model and observer (create model if needed) + model = await self._juju_get_model(model_name=model_name) + observer = self.juju_observers[model_name] + + # find machine id in model + machine = None + if machine_id is not None: + self.log.debug("Finding existing machine id {} in model".format(machine_id)) + # get juju existing machines in the model + existing_machines = await model.get_machines() + if machine_id in existing_machines: + self.log.debug( + "Machine id {} found in model (reusing it)".format(machine_id) + ) + machine = model.machines[machine_id] + + if machine is None: + self.log.debug("Creating a new machine in juju...") + # machine does not exist, create it and wait for it + machine = await model.add_machine( + spec=None, constraints=None, disks=None, series="xenial" + ) + + # register machine with observer + observer.register_machine(machine=machine, db_dict=db_dict) + + # id for the execution environment + ee_id = N2VCJujuConnector._build_ee_id( + model_name=model_name, + application_name=application_name, + machine_id=str(machine.entity_id), + ) + + # write ee_id in database + self._write_ee_id_db(db_dict=db_dict, ee_id=ee_id) + + # wait for machine creation + await observer.wait_for_machine( + machine_id=str(machine.entity_id), + progress_timeout=progress_timeout, + total_timeout=total_timeout, + ) + + else: + + self.log.debug("Reusing old machine pending") + + # register machine with observer + observer.register_machine(machine=machine, db_dict=db_dict) + + # machine does exist, but it is in creation process (pending), wait for + # create finalisation + await observer.wait_for_machine( + machine_id=machine.entity_id, + progress_timeout=progress_timeout, + total_timeout=total_timeout, + ) + + self.log.debug("Machine ready at " + str(machine.dns_name)) + return machine + + async def _juju_provision_machine( + self, + model_name: str, + hostname: str, + username: str, + private_key_path: str, + db_dict: dict = None, + progress_timeout: float = None, + total_timeout: float = None, + ) -> str: + + if not self.api_proxy: + msg = "Cannot provision machine: api_proxy is not defined" + self.log.error(msg=msg) + raise N2VCException(message=msg) + + self.log.debug( + "provisioning machine. model: {}, hostname: {}, username: {}".format( + model_name, hostname, username + ) + ) + + if not self._authenticated: + await self._juju_login() + + # get juju model and observer + model = await self._juju_get_model(model_name=model_name) + observer = self.juju_observers[model_name] + + # TODO check if machine is already provisioned + machine_list = await model.get_machines() + + provisioner = AsyncSSHProvisioner( + host=hostname, + user=username, + private_key_path=private_key_path, + log=self.log, + ) + + params = None + try: + params = await provisioner.provision_machine() + except Exception as ex: + msg = "Exception provisioning machine: {}".format(ex) + self.log.error(msg) + raise N2VCException(message=msg) + + params.jobs = ["JobHostUnits"] + + connection = model.connection() + + # Submit the request. + self.log.debug("Adding machine to model") + client_facade = client.ClientFacade.from_connection(connection) + results = await client_facade.AddMachines(params=[params]) + error = results.machines[0].error + if error: + msg = "Error adding machine: {}".format(error.message) + self.log.error(msg=msg) + raise ValueError(msg) + + machine_id = results.machines[0].machine + + # Need to run this after AddMachines has been called, + # as we need the machine_id + self.log.debug("Installing Juju agent into machine {}".format(machine_id)) + asyncio.ensure_future( + provisioner.install_agent( + connection=connection, + nonce=params.nonce, + machine_id=machine_id, + proxy=self.api_proxy, + ) + ) + + # wait for machine in model (now, machine is not yet in model, so we must + # wait for it) + machine = None + for _ in range(10): + machine_list = await model.get_machines() + if machine_id in machine_list: + self.log.debug("Machine {} found in model!".format(machine_id)) + machine = model.machines.get(machine_id) + break + await asyncio.sleep(2) + + if machine is None: + msg = "Machine {} not found in model".format(machine_id) + self.log.error(msg=msg) + raise Exception(msg) + + # register machine with observer + observer.register_machine(machine=machine, db_dict=db_dict) + + # wait for machine creation + self.log.debug("waiting for provision finishes... {}".format(machine_id)) + await observer.wait_for_machine( + machine_id=machine_id, + progress_timeout=progress_timeout, + total_timeout=total_timeout, + ) + + self.log.debug("Machine provisioned {}".format(machine_id)) + + return machine_id + + async def _juju_deploy_charm( + self, + model_name: str, + application_name: str, + charm_path: str, + machine_id: str, + db_dict: dict, + progress_timeout: float = None, + total_timeout: float = None, + config: dict = None, + ) -> (Application, int): + + # get juju model and observer + model = await self._juju_get_model(model_name=model_name) + observer = self.juju_observers[model_name] + + # check if application already exists + application = None + if application_name in model.applications: + application = model.applications[application_name] + + if application is None: + + # application does not exist, create it and wait for it + self.log.debug( + "deploying application {} to machine {}, model {}".format( + application_name, machine_id, model_name + ) + ) + self.log.debug("charm: {}".format(charm_path)) + machine = model.machines[machine_id] + # series = None + application = await model.deploy( + entity_url=charm_path, + application_name=application_name, + channel="stable", + num_units=1, + series=machine.series, + to=machine_id, + config=config, + ) + + # register application with observer + observer.register_application(application=application, db_dict=db_dict) + + self.log.debug( + "waiting for application deployed... {}".format(application.entity_id) + ) + retries = await observer.wait_for_application( + application_id=application.entity_id, + progress_timeout=progress_timeout, + total_timeout=total_timeout, + ) + self.log.debug("application deployed") + + else: + + # register application with observer + observer.register_application(application=application, db_dict=db_dict) + + # application already exists, but not finalised + self.log.debug("application already exists, waiting for deployed...") + retries = await observer.wait_for_application( + application_id=application.entity_id, + progress_timeout=progress_timeout, + total_timeout=total_timeout, + ) + self.log.debug("application deployed") + + return application, retries + + async def _juju_execute_action( + self, + model_name: str, + application_name: str, + action_name: str, + db_dict: dict, + progress_timeout: float = None, + total_timeout: float = None, + **kwargs + ) -> Action: + + # get juju model and observer + model = await self._juju_get_model(model_name=model_name) + observer = self.juju_observers[model_name] + + application = await self._juju_get_application( + model_name=model_name, application_name=application_name + ) + + unit = None + for u in application.units: + if await u.is_leader_from_status(): + unit = u + if unit is not None: + actions = await application.get_actions() + if action_name in actions: + self.log.debug( + 'executing action "{}" using params: {}'.format(action_name, kwargs) + ) + action = await unit.run_action(action_name, **kwargs) + + # register action with observer + observer.register_action(action=action, db_dict=db_dict) + + await observer.wait_for_action( + action_id=action.entity_id, + progress_timeout=progress_timeout, + total_timeout=total_timeout, + ) + self.log.debug("action completed with status: {}".format(action.status)) + output = await model.get_action_output(action_uuid=action.entity_id) + status = await model.get_action_status(uuid_or_prefix=action.entity_id) + if action.entity_id in status: + status = status[action.entity_id] + else: + status = "failed" + return output, status + + raise N2VCExecutionException( + message="Cannot execute action on charm", primitive_name=action_name + ) + + async def _juju_configure_application( + self, + model_name: str, + application_name: str, + config: dict, + db_dict: dict, + progress_timeout: float = None, + total_timeout: float = None, + ): + + # get the application + application = await self._juju_get_application( + model_name=model_name, application_name=application_name + ) + + self.log.debug( + "configuring the application {} -> {}".format(application_name, config) + ) + res = await application.set_config(config) + self.log.debug( + "application {} configured. res={}".format(application_name, res) + ) + + # Verify the config is set + new_conf = await application.get_config() + for key in config: + value = new_conf[key]["value"] + self.log.debug(" {} = {}".format(key, value)) + if config[key] != value: + raise N2VCException( + message="key {} is not configured correctly {} != {}".format( + key, config[key], new_conf[key] + ) + ) + + # check if 'verify-ssh-credentials' action exists + # unit = application.units[0] + actions = await application.get_actions() + if "verify-ssh-credentials" not in actions: + msg = ( + "Action verify-ssh-credentials does not exist in application {}" + ).format(application_name) + self.log.debug(msg=msg) + return False + + # execute verify-credentials + num_retries = 20 + retry_timeout = 15.0 + for _ in range(num_retries): + try: + self.log.debug("Executing action verify-ssh-credentials...") + output, ok = await self._juju_execute_action( + model_name=model_name, + application_name=application_name, + action_name="verify-ssh-credentials", + db_dict=db_dict, + progress_timeout=progress_timeout, + total_timeout=total_timeout, + ) + self.log.debug("Result: {}, output: {}".format(ok, output)) + return True + except asyncio.CancelledError: + raise + except Exception as e: + self.log.debug( + "Error executing verify-ssh-credentials: {}. Retrying...".format(e) + ) + await asyncio.sleep(retry_timeout) + else: + self.log.error( + "Error executing verify-ssh-credentials after {} retries. ".format( + num_retries + ) + ) + return False + + async def _juju_get_application(self, model_name: str, application_name: str): + """Get the deployed application.""" + + model = await self._juju_get_model(model_name=model_name) + + application_name = N2VCJujuConnector._format_app_name(application_name) + + if model.applications and application_name in model.applications: + return model.applications[application_name] + else: + raise N2VCException( + message="Cannot get application {} from model {}".format( + application_name, model_name + ) + ) + + async def _juju_get_model(self, model_name: str) -> Model: + """ Get a model object from juju controller + If the model does not exits, it creates it. + + :param str model_name: name of the model + :returns Model: model obtained from juju controller or Exception + """ + + # format model name + model_name = N2VCJujuConnector._format_model_name(model_name) + + if model_name in self.juju_models: + return self.juju_models[model_name] + + if self._creating_model: + self.log.debug("Another coroutine is creating a model. Wait...") + while self._creating_model: + # another coroutine is creating a model, wait + await asyncio.sleep(0.1) + # retry (perhaps another coroutine has created the model meanwhile) + if model_name in self.juju_models: + return self.juju_models[model_name] + + try: + self._creating_model = True + + # get juju model names from juju + model_list = await self.controller.list_models() + if model_name not in model_list: + self.log.info( + "Model {} does not exist. Creating new model...".format(model_name) + ) + config_dict = {"authorized-keys": self.public_key} + if self.apt_mirror: + config_dict["apt-mirror"] = self.apt_mirror + if not self.enable_os_upgrade: + config_dict["enable-os-refresh-update"] = False + config_dict["enable-os-upgrade"] = False + if self.cloud in self.BUILT_IN_CLOUDS: + model = await self.controller.add_model( + model_name=model_name, + config=config_dict, + cloud_name=self.cloud, + ) + else: + model = await self.controller.add_model( + model_name=model_name, + config=config_dict, + cloud_name=self.cloud, + credential_name=self.cloud, + ) + self.log.info("New model created, name={}".format(model_name)) + else: + self.log.debug( + "Model already exists in juju. Getting model {}".format(model_name) + ) + model = await self.controller.get_model(model_name) + self.log.debug("Existing model in juju, name={}".format(model_name)) + + self.juju_models[model_name] = model + self.juju_observers[model_name] = JujuModelObserver(n2vc=self, model=model) + return model + + except Exception as e: + msg = "Cannot get model {}. Exception: {}".format(model_name, e) + self.log.error(msg) + raise N2VCException(msg) + finally: + self._creating_model = False + + async def _juju_add_relation( + self, + model_name: str, + application_name_1: str, + application_name_2: str, + relation_1: str, + relation_2: str, + ): + + # get juju model and observer + model = await self._juju_get_model(model_name=model_name) + + r1 = "{}:{}".format(application_name_1, relation_1) + r2 = "{}:{}".format(application_name_2, relation_2) + + self.log.debug("adding relation: {} -> {}".format(r1, r2)) + try: + await model.add_relation(relation1=r1, relation2=r2) + except JujuAPIError as e: + # If one of the applications in the relationship doesn't exist, or the + # relation has already been added, + # let the operation fail silently. + if "not found" in e.message: + return + if "already exists" in e.message: + return + # another execption, raise it + raise e + + async def _juju_destroy_application(self, model_name: str, application_name: str): + + self.log.debug( + "Destroying application {} in model {}".format(application_name, model_name) + ) + + # get juju model and observer + model = await self._juju_get_model(model_name=model_name) + observer = self.juju_observers[model_name] + + application = model.applications.get(application_name) + if application: + observer.unregister_application(application_name) + await application.destroy() + else: + self.log.debug("Application not found: {}".format(application_name)) + + async def _juju_destroy_machine( + self, model_name: str, machine_id: str, total_timeout: float = None + ): + + self.log.debug( + "Destroying machine {} in model {}".format(machine_id, model_name) + ) + + if total_timeout is None: + total_timeout = 3600 + + # get juju model and observer + model = await self._juju_get_model(model_name=model_name) + observer = self.juju_observers[model_name] + + machines = await model.get_machines() + if machine_id in machines: + machine = model.machines[machine_id] + observer.unregister_machine(machine_id) + # TODO: change this by machine.is_manual when this is upstreamed: + # https://github.com/juju/python-libjuju/pull/396 + if "instance-id" in machine.safe_data and machine.safe_data[ + "instance-id" + ].startswith("manual:"): + self.log.debug("machine.destroy(force=True) started.") + await machine.destroy(force=True) + self.log.debug("machine.destroy(force=True) passed.") + # max timeout + end = time.time() + total_timeout + # wait for machine removal + machines = await model.get_machines() + while machine_id in machines and time.time() < end: + self.log.debug( + "Waiting for machine {} is destroyed".format(machine_id) + ) + await asyncio.sleep(0.5) + machines = await model.get_machines() + self.log.debug("Machine destroyed: {}".format(machine_id)) + else: + self.log.debug("Machine not found: {}".format(machine_id)) + + async def _juju_destroy_model(self, model_name: str, total_timeout: float = None): + + self.log.debug("Destroying model {}".format(model_name)) + + if total_timeout is None: + total_timeout = 3600 + end = time.time() + total_timeout + + model = await self._juju_get_model(model_name=model_name) + + if not model: + raise N2VCNotFound(message="Model {} does not exist".format(model_name)) + + uuid = model.info.uuid + + # destroy applications + for application_name in model.applications: + try: + await self._juju_destroy_application( + model_name=model_name, application_name=application_name + ) + except Exception as e: + self.log.error( + "Error destroying application {} in model {}: {}".format( + application_name, model_name, e + ) + ) + + # destroy machines + machines = await model.get_machines() + for machine_id in machines: + try: + await self._juju_destroy_machine( + model_name=model_name, machine_id=machine_id + ) + except asyncio.CancelledError: + raise + except Exception: + # ignore exceptions destroying machine + pass + + await self._juju_disconnect_model(model_name=model_name) + + self.log.debug("destroying model {}...".format(model_name)) + await self.controller.destroy_model(uuid) + # self.log.debug('model destroy requested {}'.format(model_name)) + + # wait for model is completely destroyed + self.log.debug("Waiting for model {} to be destroyed...".format(model_name)) + last_exception = "" + while time.time() < end: + try: + # await self.controller.get_model(uuid) + models = await self.controller.list_models() + if model_name not in models: + self.log.debug( + "The model {} ({}) was destroyed".format(model_name, uuid) + ) + return + except asyncio.CancelledError: + raise + except Exception as e: + last_exception = e + await asyncio.sleep(5) + raise N2VCException( + "Timeout waiting for model {} to be destroyed {}".format( + model_name, last_exception + ) + ) + + async def _juju_login(self): + """Connect to juju controller + + """ + + # if already authenticated, exit function + if self._authenticated: + return + + # if connecting, wait for finish + # another task could be trying to connect in parallel + while self._connecting: + await asyncio.sleep(0.1) + + # double check after other task has finished + if self._authenticated: + return + + try: + self._connecting = True + self.log.info( + "connecting to juju controller: {} {}:{}{}".format( + self.url, + self.username, + self.secret[:8] + "...", + " with ca_cert" if self.ca_cert else "", + ) + ) + + # Create controller object + self.controller = Controller(loop=self.loop) + # Connect to controller + await self.controller.connect( + endpoint=self.url, + username=self.username, + password=self.secret, + cacert=self.ca_cert, + ) + self._authenticated = True + self.log.info("juju controller connected") + except Exception as e: + message = "Exception connecting to juju: {}".format(e) + self.log.error(message) + raise N2VCConnectionException(message=message, url=self.url) + finally: + self._connecting = False + + async def _juju_logout(self): + """Logout of the Juju controller.""" + if not self._authenticated: + return False + + # disconnect all models + for model_name in self.juju_models: + try: + await self._juju_disconnect_model(model_name) + except Exception as e: + self.log.error( + "Error disconnecting model {} : {}".format(model_name, e) + ) + # continue with next model... + + self.log.info("Disconnecting controller") + try: + await self.controller.disconnect() + except Exception as e: + raise N2VCConnectionException( + message="Error disconnecting controller: {}".format(e), url=self.url + ) + + self.controller = None + self._authenticated = False + self.log.info("disconnected") + + async def _juju_disconnect_model(self, model_name: str): + self.log.debug("Disconnecting model {}".format(model_name)) + if model_name in self.juju_models: + await self.juju_models[model_name].disconnect() + self.juju_models[model_name] = None + self.juju_observers[model_name] = None + else: + self.warning("Cannot disconnect model: {}".format(model_name)) + def _create_juju_public_key(self): """Recreate the Juju public key on lcm container, if needed Certain libjuju commands expect to be run from the same machine as Juju diff --git a/n2vc/provisioner.py b/n2vc/provisioner.py index c4d8b5b..fea7a12 100644 --- a/n2vc/provisioner.py +++ b/n2vc/provisioner.py @@ -14,11 +14,15 @@ import logging import os import re +import shlex from subprocess import CalledProcessError import tempfile +import time import uuid from juju.client import client +import n2vc.exceptions +import paramiko import asyncio arches = [ @@ -340,3 +344,366 @@ class AsyncSSHProvisioner: return await self._ssh( "{} /bin/bash {}".format("sudo" if root else "", tmpFile) ) + + +class SSHProvisioner: + """Provision a manually created machine via SSH.""" + + def __init__(self, user, host, private_key_path, log=None): + + self.host = host + self.user = user + self.private_key_path = private_key_path + + if log: + self.log = log + else: + self.log = logging.getLogger(__name__) + + def _get_ssh_client(self, host=None, user=None, private_key_path=None): + """Return a connected Paramiko ssh object. + + :param str host: The host to connect to. + :param str user: The user to connect as. + :param str key: The private key to authenticate with. + + :return: object: A paramiko.SSHClient + :raises: :class:`paramiko.ssh_exception.SSHException` if the + connection failed + """ + + if not host: + host = self.host + + if not user: + user = self.user + + if not private_key_path: + private_key_path = self.private_key_path + + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + + pkey = None + + # Read the private key into a paramiko.RSAKey + if os.path.exists(private_key_path): + with open(private_key_path, "r") as f: + pkey = paramiko.RSAKey.from_private_key(f) + + ####################################################################### + # There is a bug in some versions of OpenSSH 4.3 (CentOS/RHEL5) where # + # the server may not send the SSH_MSG_USERAUTH_BANNER message except # + # when responding to an auth_none request. For example, paramiko will # + # attempt to use password authentication when a password is set, but # + # the server could deny that, instead requesting keyboard-interactive.# + # The hack to workaround this is to attempt a reconnect, which will # + # receive the right banner, and authentication can proceed. See the # + # following for more info: # + # https://github.com/paramiko/paramiko/issues/432 # + # https://github.com/paramiko/paramiko/pull/438 # + ####################################################################### + + retry = 10 + attempts = 0 + delay = 15 + while attempts <= retry: + try: + attempts += 1 + + # Attempt to establish a SSH connection + ssh.connect( + host, + port=22, + username=user, + pkey=pkey, + # allow_agent=False, + # look_for_keys=False, + ) + break + except paramiko.ssh_exception.SSHException as e: + if "Error reading SSH protocol banner" == str(e): + # Once more, with feeling + ssh.connect(host, port=22, username=user, pkey=pkey) + else: + # Reraise the original exception + self.log.debug("Unhandled exception caught: {}".format(e)) + raise e + except Exception as e: + if "Unable to connect to port" in str(e): + self.log.debug( + "Waiting for VM to boot, sleeping {} seconds".format(delay) + ) + if attempts > retry: + raise e + else: + time.sleep(delay) + # Slowly back off the retry + delay += 15 + else: + self.log.debug(e) + raise e + return ssh + + def _run_command(self, ssh, cmd, pty=True): + """Run a command remotely via SSH. + + :param object ssh: The SSHClient + :param str cmd: The command to execute + :param list cmd: The `shlex.split` command to execute + :param bool pty: Whether to allocate a pty + + :return: tuple: The stdout and stderr of the command execution + :raises: :class:`CalledProcessError` if the command fails + """ + + if isinstance(cmd, str): + cmd = shlex.split(cmd) + + if type(cmd) is not list: + cmd = [cmd] + + cmds = " ".join(cmd) + _, stdout, stderr = ssh.exec_command(cmds, get_pty=pty) + retcode = stdout.channel.recv_exit_status() + + if retcode > 0: + output = stderr.read().strip() + raise CalledProcessError(returncode=retcode, cmd=cmd, output=output) + return ( + stdout.read().decode("utf-8").strip(), + stderr.read().decode("utf-8").strip(), + ) + + def _init_ubuntu_user(self): + """Initialize the ubuntu user. + + :return: bool: If the initialization was successful + :raises: :class:`paramiko.ssh_exception.AuthenticationException` + if the authentication fails + """ + ssh = None + try: + # Run w/o allocating a pty, so we fail if sudo prompts for a passwd + ssh = self._get_ssh_client() + self._run_command(ssh, "sudo -n true", pty=False) + except paramiko.ssh_exception.AuthenticationException: + raise n2vc.exceptions.AuthenticationFailed(self.user) + except paramiko.ssh_exception.NoValidConnectionsError: + raise n2vc.exceptions.NoRouteToHost(self.host) + finally: + if ssh: + ssh.close() + + # Infer the public key + public_key_path = "{}.pub".format(self.private_key_path) + + if not os.path.exists(public_key_path): + raise FileNotFoundError( + "Public key '{}' doesn't exist.".format(public_key_path) + ) + + with open(public_key_path, "r") as f: + public_key = f.readline() + + script = INITIALIZE_UBUNTU_SCRIPT.format(public_key) + + try: + ssh = self._get_ssh_client() + + self._run_command( + ssh, ["sudo", "/bin/bash -c " + shlex.quote(script)], pty=True + ) + except paramiko.ssh_exception.AuthenticationException as e: + raise e + finally: + ssh.close() + + return True + + def _detect_hardware_and_os(self, ssh): + """Detect the target hardware capabilities and OS series. + + :param object ssh: The SSHClient + :return: str: A raw string containing OS and hardware information. + """ + + info = { + "series": "", + "arch": "", + "cpu-cores": "", + "mem": "", + } + + stdout, _ = self._run_command( + ssh, ["sudo", "/bin/bash -c " + shlex.quote(DETECTION_SCRIPT)], pty=True, + ) + + lines = stdout.split("\n") + + # Remove extraneous line if DNS resolution of hostname famils + # i.e. sudo: unable to resolve host test-1-mgmtvm-1: Connection timed out + if "unable to resolve host" in lines[0]: + lines = lines[1:] + + info["series"] = lines[0].strip() + info["arch"] = normalize_arch(lines[1].strip()) + + memKb = re.split(r"\s+", lines[2])[1] + + # Convert megabytes -> kilobytes + info["mem"] = round(int(memKb) / 1024) + + # Detect available CPUs + recorded = {} + for line in lines[3:]: + physical_id = "" + + if line.find("physical id") == 0: + physical_id = line.split(":")[1].strip() + elif line.find("cpu cores") == 0: + cores = line.split(":")[1].strip() + + if physical_id not in recorded.keys(): + info["cpu-cores"] += cores + recorded[physical_id] = True + + return info + + def provision_machine(self): + """Perform the initial provisioning of the target machine. + + :return: bool: The client.AddMachineParams + :raises: :class:`paramiko.ssh_exception.AuthenticationException` + if the upload fails + """ + params = client.AddMachineParams() + + if self._init_ubuntu_user(): + try: + ssh = self._get_ssh_client() + + hw = self._detect_hardware_and_os(ssh) + params.series = hw["series"] + params.instance_id = "manual:{}".format(self.host) + params.nonce = "manual:{}:{}".format( + self.host, str(uuid.uuid4()), + ) # a nop for Juju w/manual machines + params.hardware_characteristics = { + "arch": hw["arch"], + "mem": int(hw["mem"]), + "cpu-cores": int(hw["cpu-cores"]), + } + params.addresses = [ + {"value": self.host, "type": "ipv4", "scope": "public"} + ] + + except paramiko.ssh_exception.AuthenticationException as e: + raise e + finally: + ssh.close() + + return params + + async def install_agent(self, connection, nonce, machine_id, api): + """ + :param object connection: Connection to Juju API + :param str nonce: The nonce machine specification + :param str machine_id: The id assigned to the machine + + :return: bool: If the initialization was successful + """ + # The path where the Juju agent should be installed. + data_dir = "/var/lib/juju" + + # Disabling this prevents `apt-get update` from running initially, so + # charms will fail to deploy + disable_package_commands = False + + client_facade = client.ClientFacade.from_connection(connection) + results = await client_facade.ProvisioningScript( + data_dir=data_dir, + disable_package_commands=disable_package_commands, + machine_id=machine_id, + nonce=nonce, + ) + + """Get the IP of the controller + + Parse the provisioning script, looking for the first apiaddress. + + Example: + apiaddresses: + - 10.195.8.2:17070 + - 127.0.0.1:17070 + - '[::1]:17070' + """ + m = re.search(r"apiaddresses:\n- (\d+\.\d+\.\d+\.\d+):17070", results.script) + apiaddress = m.group(1) + + """Add IP Table rule + + In order to route the traffic to the private ip of the Juju controller + we use a DNAT rule to tell the machine that the destination for the + private address is the public address of the machine where the Juju + controller is running in LXD. That machine will have a complimentary + iptables rule, routing traffic to the appropriate LXD container. + """ + + script = IPTABLES_SCRIPT.format(apiaddress, api) + + # Run this in a retry loop, because dpkg may be running and cause the + # script to fail. + retry = 10 + attempts = 0 + delay = 15 + + while attempts <= retry: + try: + attempts += 1 + + self._run_configure_script(script) + break + except Exception as e: + self.log.debug("Waiting for dpkg, sleeping {} seconds".format(delay)) + if attempts > retry: + raise e + else: + time.sleep(delay) + # Slowly back off the retry + delay += 15 + + # self.log.debug("Running configure script") + self._run_configure_script(results.script) + # self.log.debug("Configure script finished") + + def _run_configure_script(self, script: str): + """Run the script to install the Juju agent on the target machine. + + :param str script: The script returned by the ProvisioningScript API + :raises: :class:`paramiko.ssh_exception.AuthenticationException` + if the upload fails + """ + _, tmpFile = tempfile.mkstemp() + with open(tmpFile, "w") as f: + f.write(script) + try: + # get ssh client + ssh = self._get_ssh_client(user="ubuntu",) + + # copy the local copy of the script to the remote machine + sftp = paramiko.SFTPClient.from_transport(ssh.get_transport()) + sftp.put( + tmpFile, tmpFile, + ) + + # run the provisioning script + self._run_command( + ssh, "sudo /bin/bash {}".format(tmpFile), + ) + + except paramiko.ssh_exception.AuthenticationException as e: + raise e + finally: + os.remove(tmpFile) + ssh.close() diff --git a/n2vc/tests/unit/test_juju_observer.py b/n2vc/tests/unit/test_juju_observer.py new file mode 100644 index 0000000..f40824e --- /dev/null +++ b/n2vc/tests/unit/test_juju_observer.py @@ -0,0 +1,159 @@ +# Copyright 2020 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import asyncio +from unittest import mock +from unittest.mock import Mock + +import asynctest + +from n2vc.exceptions import N2VCTimeoutException +from n2vc.juju_observer import JujuModelObserver, _Entity + + +class FakeObject: + def __init__(self): + self.complete = True + + +class JujuModelObserverTest(asynctest.TestCase): + def setUp(self): + self.n2vc = Mock() + self.model = Mock() + self.juju_observer = JujuModelObserver(n2vc=self.n2vc, model=self.model) + self.loop = asyncio.new_event_loop() + + def test_wait_no_retries(self): + obj = FakeObject() + entity = _Entity(entity_id="eid-1", entity_type="fake", obj=obj, db_dict={}) + result = self.loop.run_until_complete( + self.juju_observer._wait_for_entity( + entity=entity, + field_to_check="complete", + final_states_list=[True], + progress_timeout=None, + total_timeout=None, + ) + ) + self.assertEqual(result, 0) + + @mock.patch("n2vc.juju_observer.asyncio.wait_for") + def test_wait_default_values(self, wait_for): + wait_for.return_value = asyncio.Future() + wait_for.return_value.set_result(None) + obj = FakeObject() + obj.complete = False + entity = _Entity(entity_id="eid-1", entity_type="fake", obj=obj, db_dict={}) + with self.assertRaises(N2VCTimeoutException): + self.loop.run_until_complete( + self.juju_observer._wait_for_entity( + entity=entity, + field_to_check="complete", + final_states_list=[True], + progress_timeout=None, + total_timeout=None, + ) + ) + wait_for.assert_called_once_with(fut=mock.ANY, timeout=3600.0) + + @mock.patch("n2vc.juju_observer.asyncio.wait_for") + def test_wait_default_progress(self, wait_for): + wait_for.return_value = asyncio.Future() + wait_for.return_value.set_result(None) + obj = FakeObject() + obj.complete = False + entity = _Entity(entity_id="eid-1", entity_type="fake", obj=obj, db_dict={}) + with self.assertRaises(N2VCTimeoutException): + self.loop.run_until_complete( + self.juju_observer._wait_for_entity( + entity=entity, + field_to_check="complete", + final_states_list=[True], + progress_timeout=4000, + total_timeout=None, + ) + ) + wait_for.assert_called_once_with(fut=mock.ANY, timeout=3600.0) + + @mock.patch("n2vc.juju_observer.asyncio.wait_for") + def test_wait_default_total(self, wait_for): + wait_for.return_value = asyncio.Future() + wait_for.return_value.set_result(None) + obj = FakeObject() + obj.complete = False + entity = _Entity(entity_id="eid-1", entity_type="fake", obj=obj, db_dict={}) + with self.assertRaises(N2VCTimeoutException): + self.loop.run_until_complete( + self.juju_observer._wait_for_entity( + entity=entity, + field_to_check="complete", + final_states_list=[True], + progress_timeout=None, + total_timeout=4000.0, + ) + ) + wait_for.assert_called_once_with(fut=mock.ANY, timeout=3600.0) + + @mock.patch("n2vc.juju_observer.asyncio.wait_for") + def test_wait_total_less_than_progress_timeout(self, wait_for): + wait_for.return_value = asyncio.Future() + wait_for.return_value.set_result(None) + obj = FakeObject() + obj.complete = False + entity = _Entity(entity_id="eid-1", entity_type="fake", obj=obj, db_dict={}) + with self.assertRaises(N2VCTimeoutException): + self.loop.run_until_complete( + self.juju_observer._wait_for_entity( + entity=entity, + field_to_check="complete", + final_states_list=[True], + progress_timeout=4500.0, + total_timeout=3000.0, + ) + ) + wait_for.assert_called_once_with(fut=mock.ANY, timeout=3000.0) + + @mock.patch("n2vc.juju_observer.asyncio.wait_for") + def test_wait_progress_less_than_total_timeout(self, wait_for): + wait_for.return_value = asyncio.Future() + wait_for.return_value.set_result(None) + obj = FakeObject() + obj.complete = False + entity = _Entity(entity_id="eid-1", entity_type="fake", obj=obj, db_dict={}) + with self.assertRaises(N2VCTimeoutException): + self.loop.run_until_complete( + self.juju_observer._wait_for_entity( + entity=entity, + field_to_check="complete", + final_states_list=[True], + progress_timeout=1500.0, + total_timeout=3000.0, + ) + ) + wait_for.assert_called_once_with(fut=mock.ANY, timeout=1500.0) + + def test_wait_negative_timeout(self): + obj = FakeObject() + entity = _Entity(entity_id="eid-1", entity_type="fake", obj=obj, db_dict={}) + with self.assertRaises(N2VCTimeoutException): + self.loop.run_until_complete( + self.juju_observer._wait_for_entity( + entity=entity, + field_to_check="complete", + final_states_list=[True], + progress_timeout=None, + total_timeout=-1000, + ) + ) diff --git a/n2vc/tests/unit/test_provisioner.py b/n2vc/tests/unit/test_provisioner.py index a4572b3..880c5cb 100644 --- a/n2vc/tests/unit/test_provisioner.py +++ b/n2vc/tests/unit/test_provisioner.py @@ -12,9 +12,147 @@ # See the License for the specific language governing permissions and # limitations under the License. -from unittest import TestCase +from unittest import TestCase, mock + +from mock import mock_open +from n2vc.provisioner import SSHProvisioner +from paramiko.ssh_exception import SSHException class ProvisionerTest(TestCase): def setUp(self): - pass + self.provisioner = SSHProvisioner(None, None, None) + + @mock.patch("n2vc.provisioner.os.path.exists") + @mock.patch("n2vc.provisioner.paramiko.RSAKey") + @mock.patch("n2vc.provisioner.paramiko.SSHClient") + @mock.patch("builtins.open", new_callable=mock_open, read_data="data") + def test__get_ssh_client(self, _mock_open, mock_sshclient, _mock_rsakey, _mock_os): + mock_instance = mock_sshclient.return_value + sshclient = self.provisioner._get_ssh_client() + self.assertEqual(mock_instance, sshclient) + self.assertEqual( + 1, + mock_instance.set_missing_host_key_policy.call_count, + "Missing host key call count", + ) + self.assertEqual(1, mock_instance.connect.call_count, "Connect call count") + + @mock.patch("n2vc.provisioner.os.path.exists") + @mock.patch("n2vc.provisioner.paramiko.RSAKey") + @mock.patch("n2vc.provisioner.paramiko.SSHClient") + @mock.patch("builtins.open", new_callable=mock_open, read_data="data") + def test__get_ssh_client_no_connection( + self, _mock_open, mock_sshclient, _mock_rsakey, _mock_os + ): + + mock_instance = mock_sshclient.return_value + mock_instance.method_inside_someobject.side_effect = ["something"] + mock_instance.connect.side_effect = SSHException() + + self.assertRaises(SSHException, self.provisioner._get_ssh_client) + self.assertEqual( + 1, + mock_instance.set_missing_host_key_policy.call_count, + "Missing host key call count", + ) + self.assertEqual(1, mock_instance.connect.call_count, "Connect call count") + + @mock.patch("n2vc.provisioner.os.path.exists") + @mock.patch("n2vc.provisioner.paramiko.RSAKey") + @mock.patch("n2vc.provisioner.paramiko.SSHClient") + @mock.patch("builtins.open", new_callable=mock_open, read_data="data") + def test__get_ssh_client_bad_banner( + self, _mock_open, mock_sshclient, _mock_rsakey, _mock_os + ): + + mock_instance = mock_sshclient.return_value + mock_instance.method_inside_someobject.side_effect = ["something"] + mock_instance.connect.side_effect = [ + SSHException("Error reading SSH protocol banner"), + None, + None, + ] + + sshclient = self.provisioner._get_ssh_client() + self.assertEqual(mock_instance, sshclient) + self.assertEqual( + 1, + mock_instance.set_missing_host_key_policy.call_count, + "Missing host key call count", + ) + self.assertEqual( + 3, mock_instance.connect.call_count, "Should attempt 3 connections" + ) + + @mock.patch("time.sleep", autospec=True) + @mock.patch("n2vc.provisioner.os.path.exists") + @mock.patch("n2vc.provisioner.paramiko.RSAKey") + @mock.patch("n2vc.provisioner.paramiko.SSHClient") + @mock.patch("builtins.open", new_callable=mock_open, read_data="data") + def test__get_ssh_client_unable_to_connect( + self, _mock_open, mock_sshclient, _mock_rsakey, _mock_os, _mock_sleep + ): + + mock_instance = mock_sshclient.return_value + mock_instance.connect.side_effect = Exception("Unable to connect to port") + + self.assertRaises(Exception, self.provisioner._get_ssh_client) + self.assertEqual( + 1, + mock_instance.set_missing_host_key_policy.call_count, + "Missing host key call count", + ) + self.assertEqual( + 11, mock_instance.connect.call_count, "Should attempt 11 connections" + ) + + @mock.patch("time.sleep", autospec=True) + @mock.patch("n2vc.provisioner.os.path.exists") + @mock.patch("n2vc.provisioner.paramiko.RSAKey") + @mock.patch("n2vc.provisioner.paramiko.SSHClient") + @mock.patch("builtins.open", new_callable=mock_open, read_data="data") + def test__get_ssh_client_unable_to_connect_once( + self, _mock_open, mock_sshclient, _mock_rsakey, _mock_os, _mock_sleep + ): + + mock_instance = mock_sshclient.return_value + mock_instance.connect.side_effect = [ + Exception("Unable to connect to port"), + None, + ] + + sshclient = self.provisioner._get_ssh_client() + self.assertEqual(mock_instance, sshclient) + self.assertEqual( + 1, + mock_instance.set_missing_host_key_policy.call_count, + "Missing host key call count", + ) + self.assertEqual( + 2, mock_instance.connect.call_count, "Should attempt 2 connections" + ) + + @mock.patch("n2vc.provisioner.os.path.exists") + @mock.patch("n2vc.provisioner.paramiko.RSAKey") + @mock.patch("n2vc.provisioner.paramiko.SSHClient") + @mock.patch("builtins.open", new_callable=mock_open, read_data="data") + def test__get_ssh_client_other_exception( + self, _mock_open, mock_sshclient, _mock_rsakey, _mock_os + ): + + mock_instance = mock_sshclient.return_value + mock_instance.connect.side_effect = Exception() + + self.assertRaises(Exception, self.provisioner._get_ssh_client) + self.assertEqual( + 1, + mock_instance.set_missing_host_key_policy.call_count, + "Missing host key call count", + ) + self.assertEqual( + 1, mock_instance.connect.call_count, "Should only attempt 1 connection" + ) + + +# diff --git a/n2vc/vnf.py b/n2vc/vnf.py new file mode 100644 index 0000000..4e46746 --- /dev/null +++ b/n2vc/vnf.py @@ -0,0 +1,1619 @@ +# Copyright 2019 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import base64 +import binascii +import logging +import os.path +import re +import shlex +import ssl +import subprocess + +from juju.client import client +from juju.controller import Controller +from juju.errors import JujuAPIError, JujuError +from juju.model import ModelObserver + +import n2vc.exceptions +from n2vc.provisioner import SSHProvisioner + + +# import time +# FIXME: this should load the juju inside or modules without having to +# explicitly install it. Check why it's not working. +# Load our subtree of the juju library +# path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) +# path = os.path.join(path, "modules/libjuju/") +# if path not in sys.path: +# sys.path.insert(1, path) +# We might need this to connect to the websocket securely, but test and verify. +try: + ssl._create_default_https_context = ssl._create_unverified_context +except AttributeError: + # Legacy Python doesn't verify by default (see pep-0476) + # https://www.python.org/dev/peps/pep-0476/ + pass + + +# Custom exceptions +# Deprecated. Please use n2vc.exceptions namespace. +class JujuCharmNotFound(Exception): + """The Charm can't be found or is not readable.""" + + +class JujuApplicationExists(Exception): + """The Application already exists.""" + + +class N2VCPrimitiveExecutionFailed(Exception): + """Something failed while attempting to execute a primitive.""" + + +class NetworkServiceDoesNotExist(Exception): + """The Network Service being acted against does not exist.""" + + +class PrimitiveDoesNotExist(Exception): + """The Primitive being executed does not exist.""" + + +# Quiet the debug logging +logging.getLogger("websockets.protocol").setLevel(logging.INFO) +logging.getLogger("juju.client.connection").setLevel(logging.WARN) +logging.getLogger("juju.model").setLevel(logging.WARN) +logging.getLogger("juju.machine").setLevel(logging.WARN) + + +class VCAMonitor(ModelObserver): + """Monitor state changes within the Juju Model.""" + + log = None + + def __init__(self, ns_name): + self.log = logging.getLogger(__name__) + + self.ns_name = ns_name + self.applications = {} + + def AddApplication(self, application_name, callback, *callback_args): + if application_name not in self.applications: + self.applications[application_name] = { + "callback": callback, + "callback_args": callback_args, + } + + def RemoveApplication(self, application_name): + if application_name in self.applications: + del self.applications[application_name] + + async def on_change(self, delta, old, new, model): + """React to changes in the Juju model.""" + + if delta.entity == "unit": + # Ignore change events from other applications + if delta.data["application"] not in self.applications.keys(): + return + + try: + + application_name = delta.data["application"] + + callback = self.applications[application_name]["callback"] + callback_args = self.applications[application_name]["callback_args"] + + if old and new: + # Fire off a callback with the application state + if callback: + callback( + self.ns_name, + delta.data["application"], + new.workload_status, + new.workload_status_message, + *callback_args, + ) + + if old and not new: + # This is a charm being removed + if callback: + callback( + self.ns_name, + delta.data["application"], + "removed", + "", + *callback_args, + ) + except Exception as e: + self.log.debug("[1] notify_callback exception: {}".format(e)) + + elif delta.entity == "action": + # TODO: Decide how we want to notify the user of actions + + # uuid = delta.data['id'] # The Action's unique id + # msg = delta.data['message'] # The output of the action + # + # if delta.data['status'] == "pending": + # # The action is queued + # pass + # elif delta.data['status'] == "completed"" + # # The action was successful + # pass + # elif delta.data['status'] == "failed": + # # The action failed. + # pass + + pass + + +######## +# TODO +# +# Create unique models per network service +# Document all public functions + + +class N2VC: + def __init__( + self, + log=None, + server="127.0.0.1", + port=17070, + user="admin", + secret=None, + artifacts=None, + loop=None, + juju_public_key=None, + ca_cert=None, + api_proxy=None, + ): + """Initialize N2VC + + Initializes the N2VC object, allowing the caller to interoperate with the VCA. + + + :param log obj: The logging object to log to + :param server str: The IP Address or Hostname of the Juju controller + :param port int: The port of the Juju Controller + :param user str: The Juju username to authenticate with + :param secret str: The Juju password to authenticate with + :param artifacts str: The directory where charms required by a vnfd are + stored. + :param loop obj: The loop to use. + :param juju_public_key str: The contents of the Juju public SSH key + :param ca_cert str: The CA certificate to use to authenticate + :param api_proxy str: The IP of the host machine + + :Example: + client = n2vc.vnf.N2VC( + log=log, + server='10.1.1.28', + port=17070, + user='admin', + secret='admin', + artifacts='/app/storage/myvnf/charms', + loop=loop, + juju_public_key='', + ca_cert='', + api_proxy='192.168.1.155' + ) + """ + + # Initialize instance-level variables + self.api = None + self.log = None + self.controller = None + self.connecting = False + self.authenticated = False + self.api_proxy = api_proxy + + if log: + self.log = log + else: + self.log = logging.getLogger(__name__) + + # For debugging + self.refcount = { + "controller": 0, + "model": 0, + } + + self.models = {} + + # Model Observers + self.monitors = {} + + # VCA config + self.hostname = "" + self.port = 17070 + self.username = "" + self.secret = "" + + self.juju_public_key = juju_public_key + if juju_public_key: + self._create_juju_public_key(juju_public_key) + else: + self.juju_public_key = "" + + # TODO: Verify ca_cert is valid before using. VCA will crash + # if the ca_cert isn't formatted correctly. + def base64_to_cacert(b64string): + """Convert the base64-encoded string containing the VCA CACERT. + + The input string.... + + """ + try: + cacert = base64.b64decode(b64string).decode("utf-8") + + cacert = re.sub(r"\\n", r"\n", cacert,) + except binascii.Error as e: + self.log.debug("Caught binascii.Error: {}".format(e)) + raise n2vc.exceptions.N2VCInvalidCertificate("Invalid CA Certificate") + + return cacert + + self.ca_cert = None + if ca_cert: + self.ca_cert = base64_to_cacert(ca_cert) + + # Quiet websocket traffic + logging.getLogger("websockets.protocol").setLevel(logging.INFO) + logging.getLogger("juju.client.connection").setLevel(logging.WARN) + logging.getLogger("model").setLevel(logging.WARN) + # logging.getLogger('websockets.protocol').setLevel(logging.DEBUG) + + self.log.debug("JujuApi: instantiated") + + self.server = server + self.port = port + + self.secret = secret + if user.startswith("user-"): + self.user = user + else: + self.user = "user-{}".format(user) + + self.endpoint = "%s:%d" % (server, int(port)) + + self.artifacts = artifacts + + self.loop = loop or asyncio.get_event_loop() + + def __del__(self): + """Close any open connections.""" + yield self.logout() + + def _create_juju_public_key(self, public_key): + """Recreate the Juju public key on disk. + + Certain libjuju commands expect to be run from the same machine as Juju + is bootstrapped to. This method will write the public key to disk in + that location: ~/.local/share/juju/ssh/juju_id_rsa.pub + """ + # Make sure that we have a public key before writing to disk + if public_key is None or len(public_key) == 0: + if "OSM_VCA_PUBKEY" in os.environ: + public_key = os.getenv("OSM_VCA_PUBKEY", "") + if len(public_key == 0): + return + else: + return + + path = "{}/.local/share/juju/ssh".format(os.path.expanduser("~"),) + if not os.path.exists(path): + os.makedirs(path) + + with open("{}/juju_id_rsa.pub".format(path), "w") as f: + f.write(public_key) + + def notify_callback( + self, + model_name, + application_name, + status, + message, + callback=None, + *callback_args + ): + try: + if callback: + callback( + model_name, application_name, status, message, *callback_args, + ) + except Exception as e: + self.log.error("[0] notify_callback exception {}".format(e)) + raise e + return True + + # Public methods + async def Relate(self, model_name, vnfd): + """Create a relation between the charm-enabled VDUs in a VNF. + + The Relation mapping has two parts: the id of the vdu owning the endpoint, and + the name of the endpoint. + + vdu: + ... + vca-relationships: + relation: + - provides: dataVM:db + requires: mgmtVM:app + + This tells N2VC that the charm referred to by the dataVM vdu offers a relation + named 'db', and the mgmtVM vdu + has an 'app' endpoint that should be connected to a database. + + :param str ns_name: The name of the network service. + :param dict vnfd: The parsed yaml VNF descriptor. + """ + + # Currently, the call to Relate() is made automatically after the + # deployment of each charm; if the relation depends on a charm that + # hasn't been deployed yet, the call will fail silently. This will + # prevent an API breakage, with the intent of making this an explicitly + # required call in a more object-oriented refactor of the N2VC API. + + configs = [] + vnf_config = vnfd.get("vnf-configuration") + if vnf_config: + juju = vnf_config["juju"] + if juju: + configs.append(vnf_config) + + for vdu in vnfd["vdu"]: + vdu_config = vdu.get("vdu-configuration") + if vdu_config: + juju = vdu_config["juju"] + if juju: + configs.append(vdu_config) + + def _get_application_name(name): + """Get the application name that's mapped to a vnf/vdu.""" + vnf_member_index = 0 + vnf_name = vnfd["name"] + + for vdu in vnfd.get("vdu"): + # Compare the named portion of the relation to the vdu's id + if vdu["id"] == name: + application_name = self.FormatApplicationName( + model_name, vnf_name, str(vnf_member_index), + ) + return application_name + else: + vnf_member_index += 1 + + return None + + # Loop through relations + for cfg in configs: + if "juju" in cfg: + juju = cfg["juju"] + if ( + "vca-relationships" in juju + and "relation" in juju["vca-relationships"] + ): + for rel in juju["vca-relationships"]["relation"]: + try: + + # get the application name for the provides + (name, endpoint) = rel["provides"].split(":") + application_name = _get_application_name(name) + + provides = "{}:{}".format(application_name, endpoint) + + # get the application name for thr requires + (name, endpoint) = rel["requires"].split(":") + application_name = _get_application_name(name) + + requires = "{}:{}".format(application_name, endpoint) + self.log.debug( + "Relation: {} <-> {}".format(provides, requires) + ) + await self.add_relation( + model_name, provides, requires, + ) + except Exception as e: + self.log.debug("Exception: {}".format(e)) + + return + + async def DeployCharms( + self, + model_name, + application_name, + vnfd, + charm_path, + params={}, + machine_spec={}, + callback=None, + *callback_args + ): + """Deploy one or more charms associated with a VNF. + + Deploy the charm(s) referenced in a VNF Descriptor. + + :param str model_name: The name or unique id of the network service. + :param str application_name: The name of the application + :param dict vnfd: The name of the application + :param str charm_path: The path to the Juju charm + :param dict params: A dictionary of runtime parameters + Examples:: + { + 'rw_mgmt_ip': '1.2.3.4', + # Pass the initial-config-primitives section of the vnf or vdu + 'initial-config-primitives': {...} + 'user_values': dictionary with the day-1 parameters provided at + instantiation time. It will replace values + inside < >. rw_mgmt_ip will be included here also + } + :param dict machine_spec: A dictionary describing the machine to + install to + Examples:: + { + 'hostname': '1.2.3.4', + 'username': 'ubuntu', + } + :param obj callback: A callback function to receive status changes. + :param tuple callback_args: A list of arguments to be passed to the + callback + """ + + ######################################################## + # Verify the path to the charm exists and is readable. # + ######################################################## + if not os.path.exists(charm_path): + self.log.debug("Charm path doesn't exist: {}".format(charm_path)) + self.notify_callback( + model_name, + application_name, + "error", + "failed", + callback, + *callback_args, + ) + raise JujuCharmNotFound("No artifacts configured.") + + ################################ + # Login to the Juju controller # + ################################ + if not self.authenticated: + self.log.debug("Authenticating with Juju") + await self.login() + + ########################################## + # Get the model for this network service # + ########################################## + model = await self.get_model(model_name) + + ######################################## + # Verify the application doesn't exist # + ######################################## + app = await self.get_application(model, application_name) + if app: + raise JujuApplicationExists( + ( + 'Can\'t deploy application "{}" to model ' + ' "{}" because it already exists.' + ).format(application_name, model_name) + ) + + ################################################################ + # Register this application with the model-level event monitor # + ################################################################ + if callback: + self.log.debug( + "JujuApi: Registering callback for {}".format(application_name,) + ) + await self.Subscribe(model_name, application_name, callback, *callback_args) + + ####################################### + # Get the initial charm configuration # + ####################################### + + rw_mgmt_ip = None + if "rw_mgmt_ip" in params: + rw_mgmt_ip = params["rw_mgmt_ip"] + + if "initial-config-primitive" not in params: + params["initial-config-primitive"] = {} + + initial_config = self._get_config_from_dict( + params["initial-config-primitive"], {"": rw_mgmt_ip} + ) + + ######################################################## + # Check for specific machine placement (native charms) # + ######################################################## + to = "" + series = "xenial" + + if machine_spec.keys(): + if all(k in machine_spec for k in ["hostname", "username"]): + + # Allow series to be derived from the native charm + series = None + + self.log.debug( + "Provisioning manual machine {}@{}".format( + machine_spec["username"], machine_spec["hostname"], + ) + ) + + """Native Charm support + + Taking a bare VM (assumed to be an Ubuntu cloud image), + the provisioning process will: + - Create an ubuntu user w/sudo access + - Detect hardware + - Detect architecture + - Download and install Juju agent from controller + - Enable Juju agent + - Add an iptables rule to route traffic to the API proxy + """ + + to = await self.provision_machine( + model_name=model_name, + username=machine_spec["username"], + hostname=machine_spec["hostname"], + private_key_path=self.GetPrivateKeyPath(), + ) + self.log.debug("Provisioned machine id {}".format(to)) + + # TODO: If to is none, raise an exception + + # The native charm won't have the sshproxy layer, typically, but LCM + # uses the config primitive + # to interpret what the values are. That's a gap to fill. + + """ + The ssh-* config parameters are unique to the sshproxy layer, + which most native charms will not be aware of. + + Setting invalid config parameters will cause the deployment to + fail. + + For the moment, we will strip the ssh-* parameters from native + charms, until the feature gap is addressed in the information + model. + """ + + # Native charms don't include the ssh-* config values, so strip them + # from the initial_config, otherwise the deploy will raise an error. + # self.log.debug("Removing ssh-* from initial-config") + for k in ["ssh-hostname", "ssh-username", "ssh-password"]: + if k in initial_config: + self.log.debug("Removing parameter {}".format(k)) + del initial_config[k] + + self.log.debug( + "JujuApi: Deploying charm ({}/{}) from {} to {}".format( + model_name, application_name, charm_path, to, + ) + ) + + ######################################################## + # Deploy the charm and apply the initial configuration # + ######################################################## + app = await model.deploy( + # We expect charm_path to be either the path to the charm on disk + # or in the format of cs:series/name + charm_path, + # This is the formatted, unique name for this charm + application_name=application_name, + # Proxy charms should use the current LTS. This will need to be + # changed for native charms. + series=series, + # Apply the initial 'config' primitive during deployment + config=initial_config, + # Where to deploy the charm to. + to=to, + ) + + ############################# + # Map the vdu id<->app name # + ############################# + try: + await self.Relate(model_name, vnfd) + except KeyError as ex: + # We don't currently support relations between NS and VNF/VDU charms + self.log.warn("[N2VC] Relations not supported: {}".format(ex)) + except Exception: + # This may happen if not all of the charms needed by the relation + # are ready. We can safely ignore this, because Relate will be + # retried when the endpoint of the relation is deployed. + self.log.warn("[N2VC] Relations not ready") + + # ####################################### + # # Execute initial config primitive(s) # + # ####################################### + uuids = await self.ExecuteInitialPrimitives( + model_name, application_name, params, + ) + return uuids + + # primitives = {} + # + # # Build a sequential list of the primitives to execute + # for primitive in params['initial-config-primitive']: + # try: + # if primitive['name'] == 'config': + # # This is applied when the Application is deployed + # pass + # else: + # seq = primitive['seq'] + # + # params = {} + # if 'parameter' in primitive: + # params = primitive['parameter'] + # + # primitives[seq] = { + # 'name': primitive['name'], + # 'parameters': self._map_primitive_parameters( + # params, + # {'': rw_mgmt_ip} + # ), + # } + # + # for primitive in sorted(primitives): + # await self.ExecutePrimitive( + # model_name, + # application_name, + # primitives[primitive]['name'], + # callback, + # callback_args, + # **primitives[primitive]['parameters'], + # ) + # except N2VCPrimitiveExecutionFailed as e: + # self.log.debug( + # "[N2VC] Exception executing primitive: {}".format(e) + # ) + # raise + + async def GetPrimitiveStatus(self, model_name, uuid): + """Get the status of an executed Primitive. + + The status of an executed Primitive will be one of three values: + - completed + - failed + - running + """ + status = None + try: + if not self.authenticated: + await self.login() + + model = await self.get_model(model_name) + + results = await model.get_action_status(uuid) + + if uuid in results: + status = results[uuid] + + except Exception as e: + self.log.debug( + "Caught exception while getting primitive status: {}".format(e) + ) + raise N2VCPrimitiveExecutionFailed(e) + + return status + + async def GetPrimitiveOutput(self, model_name, uuid): + """Get the output of an executed Primitive. + + Note: this only returns output for a successfully executed primitive. + """ + results = None + try: + if not self.authenticated: + await self.login() + + model = await self.get_model(model_name) + results = await model.get_action_output(uuid, 60) + except Exception as e: + self.log.debug( + "Caught exception while getting primitive status: {}".format(e) + ) + raise N2VCPrimitiveExecutionFailed(e) + + return results + + # async def ProvisionMachine(self, model_name, hostname, username): + # """Provision machine for usage with Juju. + # + # Provisions a previously instantiated machine for use with Juju. + # """ + # try: + # if not self.authenticated: + # await self.login() + # + # # FIXME: This is hard-coded until model-per-ns is added + # model_name = 'default' + # + # model = await self.get_model(model_name) + # model.add_machine(spec={}) + # + # machine = await model.add_machine(spec='ssh:{}@{}:{}'.format( + # "ubuntu", + # host['address'], + # private_key_path, + # )) + # return machine.id + # + # except Exception as e: + # self.log.debug( + # "Caught exception while getting primitive status: {}".format(e) + # ) + # raise N2VCPrimitiveExecutionFailed(e) + + def GetPrivateKeyPath(self): + homedir = os.environ["HOME"] + sshdir = "{}/.ssh".format(homedir) + private_key_path = "{}/id_n2vc_rsa".format(sshdir) + return private_key_path + + async def GetPublicKey(self): + """Get the N2VC SSH public key.abs + + Returns the SSH public key, to be injected into virtual machines to + be managed by the VCA. + + The first time this is run, a ssh keypair will be created. The public + key is injected into a VM so that we can provision the machine with + Juju, after which Juju will communicate with the VM directly via the + juju agent. + """ + # public_key = "" + + # Find the path to where we expect our key to live. + homedir = os.environ["HOME"] + sshdir = "{}/.ssh".format(homedir) + if not os.path.exists(sshdir): + os.mkdir(sshdir) + + private_key_path = "{}/id_n2vc_rsa".format(sshdir) + public_key_path = "{}.pub".format(private_key_path) + + # If we don't have a key generated, generate it. + if not os.path.exists(private_key_path): + cmd = "ssh-keygen -t {} -b {} -N '' -f {}".format( + "rsa", "4096", private_key_path + ) + subprocess.check_output(shlex.split(cmd)) + + # Read the public key + with open(public_key_path, "r") as f: + public_key = f.readline() + + return public_key + + async def ExecuteInitialPrimitives( + self, model_name, application_name, params, callback=None, *callback_args + ): + """Execute multiple primitives. + + Execute multiple primitives as declared in initial-config-primitive. + This is useful in cases where the primitives initially failed -- for + example, if the charm is a proxy but the proxy hasn't been configured + yet. + """ + uuids = [] + primitives = {} + + # Build a sequential list of the primitives to execute + for primitive in params["initial-config-primitive"]: + try: + if primitive["name"] == "config": + pass + else: + seq = primitive["seq"] + + params_ = {} + if "parameter" in primitive: + params_ = primitive["parameter"] + + user_values = params.get("user_values", {}) + if "rw_mgmt_ip" not in user_values: + user_values["rw_mgmt_ip"] = None + # just for backward compatibility, because it will be provided + # always by modern version of LCM + + primitives[seq] = { + "name": primitive["name"], + "parameters": self._map_primitive_parameters( + params_, user_values + ), + } + + for primitive in sorted(primitives): + try: + # self.log.debug("Queuing action {}".format( + # primitives[primitive]['name'])) + uuids.append( + await self.ExecutePrimitive( + model_name, + application_name, + primitives[primitive]["name"], + callback, + callback_args, + **primitives[primitive]["parameters"], + ) + ) + except PrimitiveDoesNotExist as e: + self.log.debug( + "Ignoring exception PrimitiveDoesNotExist: {}".format(e) + ) + pass + except Exception as e: + self.log.debug( + ( + "XXXXXXXXXXXXXXXXXXXXXXXXX Unexpected exception: {}" + ).format(e) + ) + raise e + + except N2VCPrimitiveExecutionFailed as e: + self.log.debug("[N2VC] Exception executing primitive: {}".format(e)) + raise + return uuids + + async def ExecutePrimitive( + self, + model_name, + application_name, + primitive, + callback, + *callback_args, + **params + ): + """Execute a primitive of a charm for Day 1 or Day 2 configuration. + + Execute a primitive defined in the VNF descriptor. + + :param str model_name: The name or unique id of the network service. + :param str application_name: The name of the application + :param str primitive: The name of the primitive to execute. + :param obj callback: A callback function to receive status changes. + :param tuple callback_args: A list of arguments to be passed to the + callback function. + :param dict params: A dictionary of key=value pairs representing the + primitive's parameters + Examples:: + { + 'rw_mgmt_ip': '1.2.3.4', + # Pass the initial-config-primitives section of the vnf or vdu + 'initial-config-primitives': {...} + } + """ + self.log.debug("Executing primitive={} params={}".format(primitive, params)) + uuid = None + try: + if not self.authenticated: + await self.login() + + model = await self.get_model(model_name) + + if primitive == "config": + # config is special, and expecting params to be a dictionary + await self.set_config( + model, application_name, params["params"], + ) + else: + app = await self.get_application(model, application_name) + if app: + # Does this primitive exist? + actions = await app.get_actions() + + if primitive not in actions.keys(): + raise PrimitiveDoesNotExist( + "Primitive {} does not exist".format(primitive) + ) + + # Run against the first (and probably only) unit in the app + unit = app.units[0] + if unit: + action = await unit.run_action(primitive, **params) + uuid = action.id + except PrimitiveDoesNotExist as e: + # Catch and raise this exception if it's thrown from the inner block + raise e + except Exception as e: + # An unexpected exception was caught + self.log.debug("Caught exception while executing primitive: {}".format(e)) + raise N2VCPrimitiveExecutionFailed(e) + return uuid + + async def RemoveCharms( + self, model_name, application_name, callback=None, *callback_args + ): + """Remove a charm from the VCA. + + Remove a charm referenced in a VNF Descriptor. + + :param str model_name: The name of the network service. + :param str application_name: The name of the application + :param obj callback: A callback function to receive status changes. + :param tuple callback_args: A list of arguments to be passed to the + callback function. + """ + try: + if not self.authenticated: + await self.login() + + model = await self.get_model(model_name) + app = await self.get_application(model, application_name) + if app: + # Remove this application from event monitoring + await self.Unsubscribe(model_name, application_name) + + # self.notify_callback(model_name, application_name, "removing", + # callback, *callback_args) + self.log.debug("Removing the application {}".format(application_name)) + await app.remove() + + # await self.disconnect_model(self.monitors[model_name]) + + self.notify_callback( + model_name, + application_name, + "removed", + "Removing charm {}".format(application_name), + callback, + *callback_args, + ) + + except Exception as e: + print("Caught exception: {}".format(e)) + self.log.debug(e) + raise e + + async def CreateNetworkService(self, ns_uuid): + """Create a new Juju model for the Network Service. + + Creates a new Model in the Juju Controller. + + :param str ns_uuid: A unique id representing an instaance of a + Network Service. + + :returns: True if the model was created. Raises JujuError on failure. + """ + if not self.authenticated: + await self.login() + + models = await self.controller.list_models() + if ns_uuid not in models: + # Get the new model + await self.get_model(ns_uuid) + + return True + + async def DestroyNetworkService(self, ns_uuid): + """Destroy a Network Service. + + Destroy the Network Service and any deployed charms. + + :param ns_uuid The unique id of the Network Service + + :returns: True if the model was created. Raises JujuError on failure. + """ + + # Do not delete the default model. The default model was used by all + # Network Services, prior to the implementation of a model per NS. + if ns_uuid.lower() == "default": + return False + + if not self.authenticated: + await self.login() + + models = await self.controller.list_models() + if ns_uuid in models: + model = await self.controller.get_model(ns_uuid) + + for application in model.applications: + app = model.applications[application] + + await self.RemoveCharms(ns_uuid, application) + + self.log.debug("Unsubscribing Watcher for {}".format(application)) + await self.Unsubscribe(ns_uuid, application) + + self.log.debug("Waiting for application to terminate") + timeout = 30 + try: + await model.block_until( + lambda: all( + unit.workload_status in ["terminated"] for unit in app.units + ), + timeout=timeout, + ) + except Exception: + self.log.debug( + "Timed out waiting for {} to terminate.".format(application) + ) + + for machine in model.machines: + try: + self.log.debug("Destroying machine {}".format(machine)) + await model.machines[machine].destroy(force=True) + except JujuAPIError as e: + if "does not exist" in str(e): + # Our cached model may be stale, because the machine + # has already been removed. It's safe to continue. + continue + else: + self.log.debug("Caught exception: {}".format(e)) + raise e + + # Disconnect from the Model + if ns_uuid in self.models: + self.log.debug("Disconnecting model {}".format(ns_uuid)) + # await self.disconnect_model(self.models[ns_uuid]) + await self.disconnect_model(ns_uuid) + + try: + self.log.debug("Destroying model {}".format(ns_uuid)) + await self.controller.destroy_models(ns_uuid) + except JujuError: + raise NetworkServiceDoesNotExist( + "The Network Service '{}' does not exist".format(ns_uuid) + ) + + return True + + async def GetMetrics(self, model_name, application_name): + """Get the metrics collected by the VCA. + + :param model_name The name or unique id of the network service + :param application_name The name of the application + """ + metrics = {} + model = await self.get_model(model_name) + app = await self.get_application(model, application_name) + if app: + metrics = await app.get_metrics() + + return metrics + + async def HasApplication(self, model_name, application_name): + model = await self.get_model(model_name) + app = await self.get_application(model, application_name) + if app: + return True + return False + + async def Subscribe(self, ns_name, application_name, callback, *callback_args): + """Subscribe to callbacks for an application. + + :param ns_name str: The name of the Network Service + :param application_name str: The name of the application + :param callback obj: The callback method + :param callback_args list: The list of arguments to append to calls to + the callback method + """ + self.monitors[ns_name].AddApplication( + application_name, callback, *callback_args + ) + + async def Unsubscribe(self, ns_name, application_name): + """Unsubscribe to callbacks for an application. + + Unsubscribes the caller from notifications from a deployed application. + + :param ns_name str: The name of the Network Service + :param application_name str: The name of the application + """ + self.monitors[ns_name].RemoveApplication(application_name,) + + # Non-public methods + async def add_relation(self, model_name, relation1, relation2): + """ + Add a relation between two application endpoints. + + :param str model_name: The name or unique id of the network service + :param str relation1: '[:]' + :param str relation2: '[:]' + """ + + if not self.authenticated: + await self.login() + + m = await self.get_model(model_name) + try: + await m.add_relation(relation1, relation2) + except JujuAPIError as e: + # If one of the applications in the relationship doesn't exist, + # or the relation has already been added, let the operation fail + # silently. + if "not found" in e.message: + return + if "already exists" in e.message: + return + + raise e + + # async def apply_config(self, config, application): + # """Apply a configuration to the application.""" + # print("JujuApi: Applying configuration to {}.".format( + # application + # )) + # return await self.set_config(application=application, config=config) + + def _get_config_from_dict(self, config_primitive, values): + """Transform the yang config primitive to dict. + + Expected result: + + config = { + 'config': + } + """ + config = {} + for primitive in config_primitive: + if primitive["name"] == "config": + # config = self._map_primitive_parameters() + for parameter in primitive["parameter"]: + param = str(parameter["name"]) + if parameter["value"] == "": + config[param] = str(values[parameter["value"]]) + else: + config[param] = str(parameter["value"]) + + return config + + def _map_primitive_parameters(self, parameters, user_values): + params = {} + for parameter in parameters: + param = str(parameter["name"]) + value = parameter.get("value") + + # map parameters inside a < >; e.g. . with the provided user + # _values. + # Must exist at user_values except if there is a default value + if isinstance(value, str) and value.startswith("<") and value.endswith(">"): + if parameter["value"][1:-1] in user_values: + value = user_values[parameter["value"][1:-1]] + elif "default-value" in parameter: + value = parameter["default-value"] + else: + raise KeyError( + "parameter {}='{}' not supplied ".format(param, value) + ) + + # If there's no value, use the default-value (if set) + if value is None and "default-value" in parameter: + value = parameter["default-value"] + + # Typecast parameter value, if present + paramtype = "string" + try: + if "data-type" in parameter: + paramtype = str(parameter["data-type"]).lower() + + if paramtype == "integer": + value = int(value) + elif paramtype == "boolean": + value = bool(value) + else: + value = str(value) + else: + # If there's no data-type, assume the value is a string + value = str(value) + except ValueError: + raise ValueError( + "parameter {}='{}' cannot be converted to type {}".format( + param, value, paramtype + ) + ) + + params[param] = value + return params + + def _get_config_from_yang(self, config_primitive, values): + """Transform the yang config primitive to dict.""" + config = {} + for primitive in config_primitive.values(): + if primitive["name"] == "config": + for parameter in primitive["parameter"].values(): + param = str(parameter["name"]) + if parameter["value"] == "": + config[param] = str(values[parameter["value"]]) + else: + config[param] = str(parameter["value"]) + + return config + + def FormatApplicationName(self, *args): + """ + Generate a Juju-compatible Application name + + :param args tuple: Positional arguments to be used to construct the + application name. + + Limitations:: + - Only accepts characters a-z and non-consequitive dashes (-) + - Application name should not exceed 50 characters + + Examples:: + + FormatApplicationName("ping_pong_ns", "ping_vnf", "a") + """ + appname = "" + for c in "-".join(list(args)): + if c.isdigit(): + c = chr(97 + int(c)) + elif not c.isalpha(): + c = "-" + appname += c + return re.sub("-+", "-", appname.lower()) + + # def format_application_name(self, nsd_name, vnfr_name, member_vnf_index=0): + # """Format the name of the application + # + # Limitations: + # - Only accepts characters a-z and non-consequitive dashes (-) + # - Application name should not exceed 50 characters + # """ + # name = "{}-{}-{}".format(nsd_name, vnfr_name, member_vnf_index) + # new_name = '' + # for c in name: + # if c.isdigit(): + # c = chr(97 + int(c)) + # elif not c.isalpha(): + # c = "-" + # new_name += c + # return re.sub('\-+', '-', new_name.lower()) + + def format_model_name(self, name): + """Format the name of model. + + Model names may only contain lowercase letters, digits and hyphens + """ + + return name.replace("_", "-").lower() + + async def get_application(self, model, application): + """Get the deployed application.""" + if not self.authenticated: + await self.login() + + app = None + if application and model: + if model.applications: + if application in model.applications: + app = model.applications[application] + + return app + + async def get_model(self, model_name): + """Get a model from the Juju Controller. + + Note: Model objects returned must call disconnected() before it goes + out of scope.""" + if not self.authenticated: + await self.login() + + if model_name not in self.models: + # Get the models in the controller + models = await self.controller.list_models() + + if model_name not in models: + try: + self.models[model_name] = await self.controller.add_model( + model_name, config={"authorized-keys": self.juju_public_key} + ) + except JujuError as e: + if "already exists" not in e.message: + raise e + else: + self.models[model_name] = await self.controller.get_model(model_name) + + self.refcount["model"] += 1 + + # Create an observer for this model + await self.create_model_monitor(model_name) + + return self.models[model_name] + + async def create_model_monitor(self, model_name): + """Create a monitor for the model, if none exists.""" + if not self.authenticated: + await self.login() + + if model_name not in self.monitors: + self.monitors[model_name] = VCAMonitor(model_name) + self.models[model_name].add_observer(self.monitors[model_name]) + + return True + + async def login(self): + """Login to the Juju controller.""" + + if self.authenticated: + return + + self.connecting = True + + self.log.debug("JujuApi: Logging into controller") + + self.controller = Controller(loop=self.loop) + + if self.secret: + self.log.debug( + "Connecting to controller... ws://{} as {}/{}".format( + self.endpoint, self.user, self.secret, + ) + ) + try: + await self.controller.connect( + endpoint=self.endpoint, + username=self.user, + password=self.secret, + cacert=self.ca_cert, + ) + self.refcount["controller"] += 1 + self.authenticated = True + self.log.debug("JujuApi: Logged into controller") + except Exception as ex: + self.log.debug("Caught exception: {}".format(ex)) + else: + # current_controller no longer exists + # self.log.debug("Connecting to current controller...") + # await self.controller.connect_current() + # await self.controller.connect( + # endpoint=self.endpoint, + # username=self.user, + # cacert=cacert, + # ) + self.log.fatal("VCA credentials not configured.") + self.authenticated = False + + async def logout(self): + """Logout of the Juju controller.""" + if not self.authenticated: + return False + + try: + for model in self.models: + await self.disconnect_model(model) + + if self.controller: + self.log.debug("Disconnecting controller {}".format(self.controller)) + await self.controller.disconnect() + self.refcount["controller"] -= 1 + self.controller = None + + self.authenticated = False + + self.log.debug(self.refcount) + + except Exception as e: + self.log.fatal("Fatal error logging out of Juju Controller: {}".format(e)) + raise e + return True + + async def disconnect_model(self, model): + self.log.debug("Disconnecting model {}".format(model)) + if model in self.models: + try: + await self.models[model].disconnect() + self.refcount["model"] -= 1 + self.models[model] = None + except Exception as e: + self.log.debug("Caught exception: {}".format(e)) + + async def provision_machine( + self, model_name: str, hostname: str, username: str, private_key_path: str + ) -> int: + """Provision a machine. + + This executes the SSH provisioner, which will log in to a machine via + SSH and prepare it for use with the Juju model + + :param model_name str: The name of the model + :param hostname str: The IP or hostname of the target VM + :param user str: The username to login to + :param private_key_path str: The path to the private key that's been injected + to the VM via cloud-init + :return machine_id int: Returns the id of the machine or None if provisioning + fails + """ + if not self.authenticated: + await self.login() + + machine_id = None + + if self.api_proxy: + self.log.debug( + "Instantiating SSH Provisioner for {}@{} ({})".format( + username, hostname, private_key_path + ) + ) + provisioner = SSHProvisioner( + host=hostname, + user=username, + private_key_path=private_key_path, + log=self.log, + ) + + params = None + try: + params = provisioner.provision_machine() + except Exception as ex: + self.log.debug("caught exception from provision_machine: {}".format(ex)) + return None + + if params: + params.jobs = ["JobHostUnits"] + + model = await self.get_model(model_name) + + connection = model.connection() + + # Submit the request. + self.log.debug("Adding machine to model") + client_facade = client.ClientFacade.from_connection(connection) + results = await client_facade.AddMachines(params=[params]) + error = results.machines[0].error + if error: + raise ValueError("Error adding machine: %s" % error.message) + + machine_id = results.machines[0].machine + + # Need to run this after AddMachines has been called, + # as we need the machine_id + self.log.debug("Installing Juju agent") + await provisioner.install_agent( + connection, params.nonce, machine_id, self.api_proxy, + ) + else: + self.log.debug("Missing API Proxy") + return machine_id + + # async def remove_application(self, name): + # """Remove the application.""" + # if not self.authenticated: + # await self.login() + # + # app = await self.get_application(name) + # if app: + # self.log.debug("JujuApi: Destroying application {}".format( + # name, + # )) + # + # await app.destroy() + + async def remove_relation(self, a, b): + """ + Remove a relation between two application endpoints + + :param a An application endpoint + :param b An application endpoint + """ + if not self.authenticated: + await self.login() + + # m = await self.get_model() + # try: + # m.remove_relation(a, b) + # finally: + # await m.disconnect() + + async def resolve_error(self, model_name, application=None): + """Resolve units in error state.""" + if not self.authenticated: + await self.login() + + model = await self.get_model(model_name) + + app = await self.get_application(model, application) + if app: + self.log.debug( + "JujuApi: Resolving errors for application {}".format(application,) + ) + + for _ in app.units: + app.resolved(retry=True) + + async def run_action(self, model_name, application, action_name, **params): + """Execute an action and return an Action object.""" + if not self.authenticated: + await self.login() + result = {"status": "", "action": {"tag": None, "results": None}} + + model = await self.get_model(model_name) + + app = await self.get_application(model, application) + if app: + # We currently only have one unit per application + # so use the first unit available. + unit = app.units[0] + + self.log.debug( + "JujuApi: Running Action {} against Application {}".format( + action_name, application, + ) + ) + + action = await unit.run_action(action_name, **params) + + # Wait for the action to complete + await action.wait() + + result["status"] = action.status + result["action"]["tag"] = action.data["id"] + result["action"]["results"] = action.results + + return result + + async def set_config(self, model_name, application, config): + """Apply a configuration to the application.""" + if not self.authenticated: + await self.login() + + app = await self.get_application(model_name, application) + if app: + self.log.debug( + "JujuApi: Setting config for Application {}".format(application,) + ) + await app.set_config(config) + + # Verify the config is set + newconf = await app.get_config() + for key in config: + if config[key] != newconf[key]["value"]: + self.log.debug( + ( + "JujuApi: Config not set! Key {} Value {} doesn't match {}" + ).format(key, config[key], newconf[key]) + ) + + # async def set_parameter(self, parameter, value, application=None): + # """Set a config parameter for a service.""" + # if not self.authenticated: + # await self.login() + # + # self.log.debug("JujuApi: Setting {}={} for Application {}".format( + # parameter, + # value, + # application, + # )) + # return await self.apply_config( + # {parameter: value}, + # application=application, + # ) + + async def wait_for_application(self, model_name, application_name, timeout=300): + """Wait for an application to become active.""" + if not self.authenticated: + await self.login() + + model = await self.get_model(model_name) + + app = await self.get_application(model, application_name) + self.log.debug("Application: {}".format(app)) + if app: + self.log.debug( + "JujuApi: Waiting {} seconds for Application {}".format( + timeout, application_name, + ) + ) + + await model.block_until( + lambda: all( + unit.agent_status == "idle" + and unit.workload_status in ["active", "unknown"] + for unit in app.units + ), + timeout=timeout, + ) diff --git a/requirements.txt b/requirements.txt index 4f467fb..0393d2d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,5 +14,6 @@ git+https://osm.etsi.org/gerrit/osm/common.git#egg=osm-common juju==2.8.2 +paramiko pyasn1>=0.4.4 kubernetes==10.0.1 diff --git a/setup.py b/setup.py index 7a6b451..0c874f1 100644 --- a/setup.py +++ b/setup.py @@ -24,6 +24,7 @@ setup( exclude=["*.tests", "*.tests.*", "tests.*", "tests"]), install_requires=[ 'juju==2.8.2', + 'paramiko', 'pyasn1>=0.4.4', 'kubernetes==10.0.1' ], -- 2.25.1