From: David Garcia Date: Tue, 7 Jul 2020 08:34:33 +0000 (+0200) Subject: "Remove unused lines of code" X-Git-Tag: release-v9.0-start~22 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=refs%2Fchanges%2F42%2F9342%2F3;p=osm%2FN2VC.git "Remove unused lines of code" Change-Id: I02be0efe4918083d95a4017c898bfabaf269e779 Signed-off-by: David Garcia --- diff --git a/n2vc/juju_observer.py b/n2vc/juju_observer.py deleted file mode 100644 index 29ae932..0000000 --- a/n2vc/juju_observer.py +++ /dev/null @@ -1,318 +0,0 @@ -## -# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U. -# This file is part of OSM -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# For those usages not covered by the Apache License, Version 2.0 please -# contact with: nfvlabs@tid.es -## - -import asyncio -import time - -from juju.action import Action -from juju.application import Application -from juju.machine import Machine -from juju.model import ModelObserver, Model - -from n2vc.exceptions import N2VCTimeoutException -from n2vc.n2vc_conn import N2VCConnector, juju_status_2_osm_status - - -class _Entity: - def __init__(self, entity_id: str, entity_type: str, obj: object, db_dict: dict): - self.entity_id = entity_id - self.entity_type = entity_type - self.obj = obj - self.event = asyncio.Event() - self.db_dict = db_dict - - -class JujuModelObserver(ModelObserver): - def __init__(self, n2vc: N2VCConnector, model: Model): - self.n2vc = n2vc - self.model = model - model.add_observer(self) - self.machines = dict() - self.applications = dict() - self.actions = dict() - - def register_machine(self, machine: Machine, db_dict: dict): - try: - entity_id = machine.entity_id - except Exception: - # no entity_id aatribute, try machine attribute - entity_id = machine.machine - # self.n2vc.debug( - # msg='Registering machine for change notifications: {}'.format(entity_id)) - entity = _Entity( - entity_id=entity_id, entity_type="machine", obj=machine, db_dict=db_dict - ) - self.machines[entity_id] = entity - - def unregister_machine(self, machine_id: str): - if machine_id in self.machines: - del self.machines[machine_id] - - def is_machine_registered(self, machine_id: str): - return machine_id in self.machines - - def register_application(self, application: Application, db_dict: dict): - entity_id = application.entity_id - # self.n2vc.debug( - # msg='Registering application for change notifications: {}'.format(entity_id)) - entity = _Entity( - entity_id=entity_id, - entity_type="application", - obj=application, - db_dict=db_dict, - ) - self.applications[entity_id] = entity - - def unregister_application(self, application_id: str): - if application_id in self.applications: - del self.applications[application_id] - - def is_application_registered(self, application_id: str): - return application_id in self.applications - - def register_action(self, action: Action, db_dict: dict): - entity_id = action.entity_id - # self.n2vc.debug( - # msg='Registering action for changes notifications: {}'.format(entity_id)) - entity = _Entity( - entity_id=entity_id, entity_type="action", obj=action, db_dict=db_dict - ) - self.actions[entity_id] = entity - - def unregister_action(self, action_id: str): - if action_id in self.actions: - del self.actions[action_id] - - def is_action_registered(self, action_id: str): - return action_id in self.actions - - async def wait_for_machine( - self, - machine_id: str, - progress_timeout: float = None, - total_timeout: float = None, - ) -> int: - - if not self.is_machine_registered(machine_id): - return - - self.n2vc.debug("Waiting for machine completed: {}".format(machine_id)) - - # wait for a final state - entity = self.machines[machine_id] - return await self._wait_for_entity( - entity=entity, - field_to_check="agent_status", - final_states_list=["started"], - progress_timeout=progress_timeout, - total_timeout=total_timeout, - ) - - async def wait_for_application( - self, - application_id: str, - progress_timeout: float = None, - total_timeout: float = None, - ) -> int: - - if not self.is_application_registered(application_id): - return - - self.n2vc.debug("Waiting for application completed: {}".format(application_id)) - - # application statuses: unknown, active, waiting - # wait for a final state - entity = self.applications[application_id] - return await self._wait_for_entity( - entity=entity, - field_to_check="status", - final_states_list=["active", "blocked"], - progress_timeout=progress_timeout, - total_timeout=total_timeout, - ) - - async def wait_for_action( - self, - action_id: str, - progress_timeout: float = None, - total_timeout: float = None, - ) -> int: - - if not self.is_action_registered(action_id): - return - - self.n2vc.debug("Waiting for action completed: {}".format(action_id)) - - # action statuses: pending, running, completed, failed, cancelled - # wait for a final state - entity = self.actions[action_id] - return await self._wait_for_entity( - entity=entity, - field_to_check="status", - final_states_list=["completed", "failed", "cancelled"], - progress_timeout=progress_timeout, - total_timeout=total_timeout, - ) - - async def _wait_for_entity( - self, - entity: _Entity, - field_to_check: str, - final_states_list: list, - progress_timeout: float = None, - total_timeout: float = None, - ) -> int: - - # default values for no timeout - if total_timeout is None: - total_timeout = 3600 - if progress_timeout is None: - progress_timeout = 3600 - - # max end time - now = time.time() - total_end = now + total_timeout - - if now >= total_end: - raise N2VCTimeoutException( - message="Total timeout {} seconds, {}: {}".format( - total_timeout, entity.entity_type, entity.entity_id - ), - timeout="total", - ) - - # update next progress timeout - progress_end = now + progress_timeout # type: float - - # which is closest? progress or end timeout? - closest_end = min(total_end, progress_end) - - next_timeout = closest_end - now - - retries = 0 - - while entity.obj.__getattribute__(field_to_check) not in final_states_list: - retries += 1 - if await _wait_for_event_or_timeout(entity.event, next_timeout): - entity.event.clear() - else: - message = "Progress timeout {} seconds, {}: {}".format( - progress_timeout, entity.entity_type, entity.entity_id - ) - self.n2vc.debug(message) - raise N2VCTimeoutException(message=message, timeout="progress") - # self.n2vc.debug('End of wait. Final state: {}, retries: {}' - # .format(entity.obj.__getattribute__(field_to_check), retries)) - return retries - - async def on_change(self, delta, old, new, model): - - if new is None: - return - - # log - # self.n2vc.debug('on_change(): type: {}, entity: {}, id: {}' - # .format(delta.type, delta.entity, new.entity_id)) - - if delta.entity == "machine": - - # check registered machine - if new.entity_id not in self.machines: - return - - # write change in database - await self.n2vc.write_app_status_to_db( - db_dict=self.machines[new.entity_id].db_dict, - status=juju_status_2_osm_status(delta.entity, new.agent_status), - detailed_status=new.status_message, - vca_status=new.status, - entity_type="machine", - ) - - # set event for this machine - self.machines[new.entity_id].event.set() - - elif delta.entity == "application": - - # check registered application - if new.entity_id not in self.applications: - return - - # write change in database - await self.n2vc.write_app_status_to_db( - db_dict=self.applications[new.entity_id].db_dict, - status=juju_status_2_osm_status(delta.entity, new.status), - detailed_status=new.status_message, - vca_status=new.status, - entity_type="application", - ) - - # set event for this application - self.applications[new.entity_id].event.set() - - elif delta.entity == "unit": - - # get the application for this unit - application_id = delta.data["application"] - - # check registered application - if application_id not in self.applications: - return - - # write change in database - if not new.dead: - await self.n2vc.write_app_status_to_db( - db_dict=self.applications[application_id].db_dict, - status=juju_status_2_osm_status(delta.entity, new.workload_status), - detailed_status=new.workload_status_message, - vca_status=new.workload_status, - entity_type="unit", - ) - - # set event for this application - self.applications[application_id].event.set() - - elif delta.entity == "action": - - # check registered action - if new.entity_id not in self.actions: - return - - # write change in database - await self.n2vc.write_app_status_to_db( - db_dict=self.actions[new.entity_id].db_dict, - status=juju_status_2_osm_status(delta.entity, new.status), - detailed_status=new.status, - vca_status=new.status, - entity_type="action", - ) - - # set event for this application - self.actions[new.entity_id].event.set() - - -async def _wait_for_event_or_timeout(event: asyncio.Event, timeout: float = None): - try: - await asyncio.wait_for(fut=event.wait(), timeout=timeout) - except asyncio.TimeoutError: - pass - return event.is_set() diff --git a/n2vc/n2vc_conn.py b/n2vc/n2vc_conn.py index 5e11b5f..b9c3002 100644 --- a/n2vc/n2vc_conn.py +++ b/n2vc/n2vc_conn.py @@ -499,37 +499,6 @@ class N2VCConnector(abc.ABC, Loggable): return JujuStatusToOSM[entity_type][status] -# DEPRECATED -def juju_status_2_osm_status(statustype: str, status: str) -> N2VCDeploymentStatus: - if statustype == "application" or statustype == "unit": - if status in ["waiting", "maintenance"]: - return N2VCDeploymentStatus.RUNNING - if status in ["error"]: - return N2VCDeploymentStatus.FAILED - elif status in ["active"]: - return N2VCDeploymentStatus.COMPLETED - elif status in ["blocked"]: - return N2VCDeploymentStatus.RUNNING - else: - return N2VCDeploymentStatus.UNKNOWN - elif statustype == "action": - if status in ["running"]: - return N2VCDeploymentStatus.RUNNING - elif status in ["completed"]: - return N2VCDeploymentStatus.COMPLETED - else: - return N2VCDeploymentStatus.UNKNOWN - elif statustype == "machine": - if status in ["pending"]: - return N2VCDeploymentStatus.PENDING - elif status in ["started"]: - return N2VCDeploymentStatus.COMPLETED - else: - return N2VCDeploymentStatus.UNKNOWN - - return N2VCDeploymentStatus.FAILED - - def obj_to_yaml(obj: object) -> str: # dump to yaml dump_text = yaml.dump(obj, default_flow_style=False, indent=2) diff --git a/n2vc/n2vc_juju_conn.py b/n2vc/n2vc_juju_conn.py index 31bdd6e..fff78c9 100644 --- a/n2vc/n2vc_juju_conn.py +++ b/n2vc/n2vc_juju_conn.py @@ -26,29 +26,19 @@ import binascii import logging import os import re -import time - -from juju.action import Action -from juju.application import Application -from juju.client import client -from juju.controller import Controller -from juju.errors import JujuAPIError -from juju.machine import Machine -from juju.model import Model + from n2vc.exceptions import ( N2VCBadArgumentsException, N2VCException, N2VCConnectionException, N2VCExecutionException, N2VCInvalidCertificate, - N2VCNotFound, + # N2VCNotFound, MethodNotImplemented, JujuK8sProxycharmNotSupported, ) -from n2vc.juju_observer import JujuModelObserver from n2vc.n2vc_conn import N2VCConnector from n2vc.n2vc_conn import obj_to_dict, obj_to_yaml -from n2vc.provisioner import AsyncSSHProvisioner from n2vc.libjuju import Libjuju @@ -1034,714 +1024,6 @@ class N2VCJujuConnector(N2VCConnector): return N2VCJujuConnector._format_app_name(application_name) - async def _juju_create_machine( - self, - model_name: str, - application_name: str, - machine_id: str = None, - db_dict: dict = None, - progress_timeout: float = None, - total_timeout: float = None, - ) -> Machine: - - self.log.debug( - "creating machine in model: {}, existing machine id: {}".format( - model_name, machine_id - ) - ) - - # get juju model and observer (create model if needed) - model = await self._juju_get_model(model_name=model_name) - observer = self.juju_observers[model_name] - - # find machine id in model - machine = None - if machine_id is not None: - self.log.debug("Finding existing machine id {} in model".format(machine_id)) - # get juju existing machines in the model - existing_machines = await model.get_machines() - if machine_id in existing_machines: - self.log.debug( - "Machine id {} found in model (reusing it)".format(machine_id) - ) - machine = model.machines[machine_id] - - if machine is None: - self.log.debug("Creating a new machine in juju...") - # machine does not exist, create it and wait for it - machine = await model.add_machine( - spec=None, constraints=None, disks=None, series="xenial" - ) - - # register machine with observer - observer.register_machine(machine=machine, db_dict=db_dict) - - # id for the execution environment - ee_id = N2VCJujuConnector._build_ee_id( - model_name=model_name, - application_name=application_name, - machine_id=str(machine.entity_id), - ) - - # write ee_id in database - self._write_ee_id_db(db_dict=db_dict, ee_id=ee_id) - - # wait for machine creation - await observer.wait_for_machine( - machine_id=str(machine.entity_id), - progress_timeout=progress_timeout, - total_timeout=total_timeout, - ) - - else: - - self.log.debug("Reusing old machine pending") - - # register machine with observer - observer.register_machine(machine=machine, db_dict=db_dict) - - # machine does exist, but it is in creation process (pending), wait for - # create finalisation - await observer.wait_for_machine( - machine_id=machine.entity_id, - progress_timeout=progress_timeout, - total_timeout=total_timeout, - ) - - self.log.debug("Machine ready at " + str(machine.dns_name)) - return machine - - async def _juju_provision_machine( - self, - model_name: str, - hostname: str, - username: str, - private_key_path: str, - db_dict: dict = None, - progress_timeout: float = None, - total_timeout: float = None, - ) -> str: - - if not self.api_proxy: - msg = "Cannot provision machine: api_proxy is not defined" - self.log.error(msg=msg) - raise N2VCException(message=msg) - - self.log.debug( - "provisioning machine. model: {}, hostname: {}, username: {}".format( - model_name, hostname, username - ) - ) - - if not self._authenticated: - await self._juju_login() - - # get juju model and observer - model = await self._juju_get_model(model_name=model_name) - observer = self.juju_observers[model_name] - - # TODO check if machine is already provisioned - machine_list = await model.get_machines() - - provisioner = AsyncSSHProvisioner( - host=hostname, - user=username, - private_key_path=private_key_path, - log=self.log, - ) - - params = None - try: - params = await provisioner.provision_machine() - except Exception as ex: - msg = "Exception provisioning machine: {}".format(ex) - self.log.error(msg) - raise N2VCException(message=msg) - - params.jobs = ["JobHostUnits"] - - connection = model.connection() - - # Submit the request. - self.log.debug("Adding machine to model") - client_facade = client.ClientFacade.from_connection(connection) - results = await client_facade.AddMachines(params=[params]) - error = results.machines[0].error - if error: - msg = "Error adding machine: {}".format(error.message) - self.log.error(msg=msg) - raise ValueError(msg) - - machine_id = results.machines[0].machine - - # Need to run this after AddMachines has been called, - # as we need the machine_id - self.log.debug("Installing Juju agent into machine {}".format(machine_id)) - asyncio.ensure_future( - provisioner.install_agent( - connection=connection, - nonce=params.nonce, - machine_id=machine_id, - proxy=self.api_proxy, - ) - ) - - # wait for machine in model (now, machine is not yet in model, so we must - # wait for it) - machine = None - for _ in range(10): - machine_list = await model.get_machines() - if machine_id in machine_list: - self.log.debug("Machine {} found in model!".format(machine_id)) - machine = model.machines.get(machine_id) - break - await asyncio.sleep(2) - - if machine is None: - msg = "Machine {} not found in model".format(machine_id) - self.log.error(msg=msg) - raise Exception(msg) - - # register machine with observer - observer.register_machine(machine=machine, db_dict=db_dict) - - # wait for machine creation - self.log.debug("waiting for provision finishes... {}".format(machine_id)) - await observer.wait_for_machine( - machine_id=machine_id, - progress_timeout=progress_timeout, - total_timeout=total_timeout, - ) - - self.log.debug("Machine provisioned {}".format(machine_id)) - - return machine_id - - async def _juju_deploy_charm( - self, - model_name: str, - application_name: str, - charm_path: str, - machine_id: str, - db_dict: dict, - progress_timeout: float = None, - total_timeout: float = None, - config: dict = None, - ) -> (Application, int): - - # get juju model and observer - model = await self._juju_get_model(model_name=model_name) - observer = self.juju_observers[model_name] - - # check if application already exists - application = None - if application_name in model.applications: - application = model.applications[application_name] - - if application is None: - - # application does not exist, create it and wait for it - self.log.debug( - "deploying application {} to machine {}, model {}".format( - application_name, machine_id, model_name - ) - ) - self.log.debug("charm: {}".format(charm_path)) - machine = model.machines[machine_id] - # series = None - application = await model.deploy( - entity_url=charm_path, - application_name=application_name, - channel="stable", - num_units=1, - series=machine.series, - to=machine_id, - config=config, - ) - - # register application with observer - observer.register_application(application=application, db_dict=db_dict) - - self.log.debug( - "waiting for application deployed... {}".format(application.entity_id) - ) - retries = await observer.wait_for_application( - application_id=application.entity_id, - progress_timeout=progress_timeout, - total_timeout=total_timeout, - ) - self.log.debug("application deployed") - - else: - - # register application with observer - observer.register_application(application=application, db_dict=db_dict) - - # application already exists, but not finalised - self.log.debug("application already exists, waiting for deployed...") - retries = await observer.wait_for_application( - application_id=application.entity_id, - progress_timeout=progress_timeout, - total_timeout=total_timeout, - ) - self.log.debug("application deployed") - - return application, retries - - async def _juju_execute_action( - self, - model_name: str, - application_name: str, - action_name: str, - db_dict: dict, - progress_timeout: float = None, - total_timeout: float = None, - **kwargs - ) -> Action: - - # get juju model and observer - model = await self._juju_get_model(model_name=model_name) - observer = self.juju_observers[model_name] - - application = await self._juju_get_application( - model_name=model_name, application_name=application_name - ) - - unit = None - for u in application.units: - if await u.is_leader_from_status(): - unit = u - if unit is not None: - actions = await application.get_actions() - if action_name in actions: - self.log.debug( - 'executing action "{}" using params: {}'.format(action_name, kwargs) - ) - action = await unit.run_action(action_name, **kwargs) - - # register action with observer - observer.register_action(action=action, db_dict=db_dict) - - await observer.wait_for_action( - action_id=action.entity_id, - progress_timeout=progress_timeout, - total_timeout=total_timeout, - ) - self.log.debug("action completed with status: {}".format(action.status)) - output = await model.get_action_output(action_uuid=action.entity_id) - status = await model.get_action_status(uuid_or_prefix=action.entity_id) - if action.entity_id in status: - status = status[action.entity_id] - else: - status = "failed" - return output, status - - raise N2VCExecutionException( - message="Cannot execute action on charm", primitive_name=action_name - ) - - async def _juju_configure_application( - self, - model_name: str, - application_name: str, - config: dict, - db_dict: dict, - progress_timeout: float = None, - total_timeout: float = None, - ): - - # get the application - application = await self._juju_get_application( - model_name=model_name, application_name=application_name - ) - - self.log.debug( - "configuring the application {} -> {}".format(application_name, config) - ) - res = await application.set_config(config) - self.log.debug( - "application {} configured. res={}".format(application_name, res) - ) - - # Verify the config is set - new_conf = await application.get_config() - for key in config: - value = new_conf[key]["value"] - self.log.debug(" {} = {}".format(key, value)) - if config[key] != value: - raise N2VCException( - message="key {} is not configured correctly {} != {}".format( - key, config[key], new_conf[key] - ) - ) - - # check if 'verify-ssh-credentials' action exists - # unit = application.units[0] - actions = await application.get_actions() - if "verify-ssh-credentials" not in actions: - msg = ( - "Action verify-ssh-credentials does not exist in application {}" - ).format(application_name) - self.log.debug(msg=msg) - return False - - # execute verify-credentials - num_retries = 20 - retry_timeout = 15.0 - for _ in range(num_retries): - try: - self.log.debug("Executing action verify-ssh-credentials...") - output, ok = await self._juju_execute_action( - model_name=model_name, - application_name=application_name, - action_name="verify-ssh-credentials", - db_dict=db_dict, - progress_timeout=progress_timeout, - total_timeout=total_timeout, - ) - self.log.debug("Result: {}, output: {}".format(ok, output)) - return True - except asyncio.CancelledError: - raise - except Exception as e: - self.log.debug( - "Error executing verify-ssh-credentials: {}. Retrying...".format(e) - ) - await asyncio.sleep(retry_timeout) - else: - self.log.error( - "Error executing verify-ssh-credentials after {} retries. ".format( - num_retries - ) - ) - return False - - async def _juju_get_application(self, model_name: str, application_name: str): - """Get the deployed application.""" - - model = await self._juju_get_model(model_name=model_name) - - application_name = N2VCJujuConnector._format_app_name(application_name) - - if model.applications and application_name in model.applications: - return model.applications[application_name] - else: - raise N2VCException( - message="Cannot get application {} from model {}".format( - application_name, model_name - ) - ) - - async def _juju_get_model(self, model_name: str) -> Model: - """ Get a model object from juju controller - If the model does not exits, it creates it. - - :param str model_name: name of the model - :returns Model: model obtained from juju controller or Exception - """ - - # format model name - model_name = N2VCJujuConnector._format_model_name(model_name) - - if model_name in self.juju_models: - return self.juju_models[model_name] - - if self._creating_model: - self.log.debug("Another coroutine is creating a model. Wait...") - while self._creating_model: - # another coroutine is creating a model, wait - await asyncio.sleep(0.1) - # retry (perhaps another coroutine has created the model meanwhile) - if model_name in self.juju_models: - return self.juju_models[model_name] - - try: - self._creating_model = True - - # get juju model names from juju - model_list = await self.controller.list_models() - if model_name not in model_list: - self.log.info( - "Model {} does not exist. Creating new model...".format(model_name) - ) - config_dict = {"authorized-keys": self.public_key} - if self.apt_mirror: - config_dict["apt-mirror"] = self.apt_mirror - if not self.enable_os_upgrade: - config_dict["enable-os-refresh-update"] = False - config_dict["enable-os-upgrade"] = False - if self.cloud in self.BUILT_IN_CLOUDS: - model = await self.controller.add_model( - model_name=model_name, - config=config_dict, - cloud_name=self.cloud, - ) - else: - model = await self.controller.add_model( - model_name=model_name, - config=config_dict, - cloud_name=self.cloud, - credential_name=self.cloud, - ) - self.log.info("New model created, name={}".format(model_name)) - else: - self.log.debug( - "Model already exists in juju. Getting model {}".format(model_name) - ) - model = await self.controller.get_model(model_name) - self.log.debug("Existing model in juju, name={}".format(model_name)) - - self.juju_models[model_name] = model - self.juju_observers[model_name] = JujuModelObserver(n2vc=self, model=model) - return model - - except Exception as e: - msg = "Cannot get model {}. Exception: {}".format(model_name, e) - self.log.error(msg) - raise N2VCException(msg) - finally: - self._creating_model = False - - async def _juju_add_relation( - self, - model_name: str, - application_name_1: str, - application_name_2: str, - relation_1: str, - relation_2: str, - ): - - # get juju model and observer - model = await self._juju_get_model(model_name=model_name) - - r1 = "{}:{}".format(application_name_1, relation_1) - r2 = "{}:{}".format(application_name_2, relation_2) - - self.log.debug("adding relation: {} -> {}".format(r1, r2)) - try: - await model.add_relation(relation1=r1, relation2=r2) - except JujuAPIError as e: - # If one of the applications in the relationship doesn't exist, or the - # relation has already been added, - # let the operation fail silently. - if "not found" in e.message: - return - if "already exists" in e.message: - return - # another execption, raise it - raise e - - async def _juju_destroy_application(self, model_name: str, application_name: str): - - self.log.debug( - "Destroying application {} in model {}".format(application_name, model_name) - ) - - # get juju model and observer - model = await self._juju_get_model(model_name=model_name) - observer = self.juju_observers[model_name] - - application = model.applications.get(application_name) - if application: - observer.unregister_application(application_name) - await application.destroy() - else: - self.log.debug("Application not found: {}".format(application_name)) - - async def _juju_destroy_machine( - self, model_name: str, machine_id: str, total_timeout: float = None - ): - - self.log.debug( - "Destroying machine {} in model {}".format(machine_id, model_name) - ) - - if total_timeout is None: - total_timeout = 3600 - - # get juju model and observer - model = await self._juju_get_model(model_name=model_name) - observer = self.juju_observers[model_name] - - machines = await model.get_machines() - if machine_id in machines: - machine = model.machines[machine_id] - observer.unregister_machine(machine_id) - # TODO: change this by machine.is_manual when this is upstreamed: - # https://github.com/juju/python-libjuju/pull/396 - if "instance-id" in machine.safe_data and machine.safe_data[ - "instance-id" - ].startswith("manual:"): - self.log.debug("machine.destroy(force=True) started.") - await machine.destroy(force=True) - self.log.debug("machine.destroy(force=True) passed.") - # max timeout - end = time.time() + total_timeout - # wait for machine removal - machines = await model.get_machines() - while machine_id in machines and time.time() < end: - self.log.debug( - "Waiting for machine {} is destroyed".format(machine_id) - ) - await asyncio.sleep(0.5) - machines = await model.get_machines() - self.log.debug("Machine destroyed: {}".format(machine_id)) - else: - self.log.debug("Machine not found: {}".format(machine_id)) - - async def _juju_destroy_model(self, model_name: str, total_timeout: float = None): - - self.log.debug("Destroying model {}".format(model_name)) - - if total_timeout is None: - total_timeout = 3600 - end = time.time() + total_timeout - - model = await self._juju_get_model(model_name=model_name) - - if not model: - raise N2VCNotFound(message="Model {} does not exist".format(model_name)) - - uuid = model.info.uuid - - # destroy applications - for application_name in model.applications: - try: - await self._juju_destroy_application( - model_name=model_name, application_name=application_name - ) - except Exception as e: - self.log.error( - "Error destroying application {} in model {}: {}".format( - application_name, model_name, e - ) - ) - - # destroy machines - machines = await model.get_machines() - for machine_id in machines: - try: - await self._juju_destroy_machine( - model_name=model_name, machine_id=machine_id - ) - except asyncio.CancelledError: - raise - except Exception: - # ignore exceptions destroying machine - pass - - await self._juju_disconnect_model(model_name=model_name) - - self.log.debug("destroying model {}...".format(model_name)) - await self.controller.destroy_model(uuid) - # self.log.debug('model destroy requested {}'.format(model_name)) - - # wait for model is completely destroyed - self.log.debug("Waiting for model {} to be destroyed...".format(model_name)) - last_exception = "" - while time.time() < end: - try: - # await self.controller.get_model(uuid) - models = await self.controller.list_models() - if model_name not in models: - self.log.debug( - "The model {} ({}) was destroyed".format(model_name, uuid) - ) - return - except asyncio.CancelledError: - raise - except Exception as e: - last_exception = e - await asyncio.sleep(5) - raise N2VCException( - "Timeout waiting for model {} to be destroyed {}".format( - model_name, last_exception - ) - ) - - async def _juju_login(self): - """Connect to juju controller - - """ - - # if already authenticated, exit function - if self._authenticated: - return - - # if connecting, wait for finish - # another task could be trying to connect in parallel - while self._connecting: - await asyncio.sleep(0.1) - - # double check after other task has finished - if self._authenticated: - return - - try: - self._connecting = True - self.log.info( - "connecting to juju controller: {} {}:{}{}".format( - self.url, - self.username, - self.secret[:8] + "...", - " with ca_cert" if self.ca_cert else "", - ) - ) - - # Create controller object - self.controller = Controller(loop=self.loop) - # Connect to controller - await self.controller.connect( - endpoint=self.url, - username=self.username, - password=self.secret, - cacert=self.ca_cert, - ) - self._authenticated = True - self.log.info("juju controller connected") - except Exception as e: - message = "Exception connecting to juju: {}".format(e) - self.log.error(message) - raise N2VCConnectionException(message=message, url=self.url) - finally: - self._connecting = False - - async def _juju_logout(self): - """Logout of the Juju controller.""" - if not self._authenticated: - return False - - # disconnect all models - for model_name in self.juju_models: - try: - await self._juju_disconnect_model(model_name) - except Exception as e: - self.log.error( - "Error disconnecting model {} : {}".format(model_name, e) - ) - # continue with next model... - - self.log.info("Disconnecting controller") - try: - await self.controller.disconnect() - except Exception as e: - raise N2VCConnectionException( - message="Error disconnecting controller: {}".format(e), url=self.url - ) - - self.controller = None - self._authenticated = False - self.log.info("disconnected") - - async def _juju_disconnect_model(self, model_name: str): - self.log.debug("Disconnecting model {}".format(model_name)) - if model_name in self.juju_models: - await self.juju_models[model_name].disconnect() - self.juju_models[model_name] = None - self.juju_observers[model_name] = None - else: - self.warning("Cannot disconnect model: {}".format(model_name)) - def _create_juju_public_key(self): """Recreate the Juju public key on lcm container, if needed Certain libjuju commands expect to be run from the same machine as Juju diff --git a/n2vc/provisioner.py b/n2vc/provisioner.py index fea7a12..c4d8b5b 100644 --- a/n2vc/provisioner.py +++ b/n2vc/provisioner.py @@ -14,15 +14,11 @@ import logging import os import re -import shlex from subprocess import CalledProcessError import tempfile -import time import uuid from juju.client import client -import n2vc.exceptions -import paramiko import asyncio arches = [ @@ -344,366 +340,3 @@ class AsyncSSHProvisioner: return await self._ssh( "{} /bin/bash {}".format("sudo" if root else "", tmpFile) ) - - -class SSHProvisioner: - """Provision a manually created machine via SSH.""" - - def __init__(self, user, host, private_key_path, log=None): - - self.host = host - self.user = user - self.private_key_path = private_key_path - - if log: - self.log = log - else: - self.log = logging.getLogger(__name__) - - def _get_ssh_client(self, host=None, user=None, private_key_path=None): - """Return a connected Paramiko ssh object. - - :param str host: The host to connect to. - :param str user: The user to connect as. - :param str key: The private key to authenticate with. - - :return: object: A paramiko.SSHClient - :raises: :class:`paramiko.ssh_exception.SSHException` if the - connection failed - """ - - if not host: - host = self.host - - if not user: - user = self.user - - if not private_key_path: - private_key_path = self.private_key_path - - ssh = paramiko.SSHClient() - ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - - pkey = None - - # Read the private key into a paramiko.RSAKey - if os.path.exists(private_key_path): - with open(private_key_path, "r") as f: - pkey = paramiko.RSAKey.from_private_key(f) - - ####################################################################### - # There is a bug in some versions of OpenSSH 4.3 (CentOS/RHEL5) where # - # the server may not send the SSH_MSG_USERAUTH_BANNER message except # - # when responding to an auth_none request. For example, paramiko will # - # attempt to use password authentication when a password is set, but # - # the server could deny that, instead requesting keyboard-interactive.# - # The hack to workaround this is to attempt a reconnect, which will # - # receive the right banner, and authentication can proceed. See the # - # following for more info: # - # https://github.com/paramiko/paramiko/issues/432 # - # https://github.com/paramiko/paramiko/pull/438 # - ####################################################################### - - retry = 10 - attempts = 0 - delay = 15 - while attempts <= retry: - try: - attempts += 1 - - # Attempt to establish a SSH connection - ssh.connect( - host, - port=22, - username=user, - pkey=pkey, - # allow_agent=False, - # look_for_keys=False, - ) - break - except paramiko.ssh_exception.SSHException as e: - if "Error reading SSH protocol banner" == str(e): - # Once more, with feeling - ssh.connect(host, port=22, username=user, pkey=pkey) - else: - # Reraise the original exception - self.log.debug("Unhandled exception caught: {}".format(e)) - raise e - except Exception as e: - if "Unable to connect to port" in str(e): - self.log.debug( - "Waiting for VM to boot, sleeping {} seconds".format(delay) - ) - if attempts > retry: - raise e - else: - time.sleep(delay) - # Slowly back off the retry - delay += 15 - else: - self.log.debug(e) - raise e - return ssh - - def _run_command(self, ssh, cmd, pty=True): - """Run a command remotely via SSH. - - :param object ssh: The SSHClient - :param str cmd: The command to execute - :param list cmd: The `shlex.split` command to execute - :param bool pty: Whether to allocate a pty - - :return: tuple: The stdout and stderr of the command execution - :raises: :class:`CalledProcessError` if the command fails - """ - - if isinstance(cmd, str): - cmd = shlex.split(cmd) - - if type(cmd) is not list: - cmd = [cmd] - - cmds = " ".join(cmd) - _, stdout, stderr = ssh.exec_command(cmds, get_pty=pty) - retcode = stdout.channel.recv_exit_status() - - if retcode > 0: - output = stderr.read().strip() - raise CalledProcessError(returncode=retcode, cmd=cmd, output=output) - return ( - stdout.read().decode("utf-8").strip(), - stderr.read().decode("utf-8").strip(), - ) - - def _init_ubuntu_user(self): - """Initialize the ubuntu user. - - :return: bool: If the initialization was successful - :raises: :class:`paramiko.ssh_exception.AuthenticationException` - if the authentication fails - """ - ssh = None - try: - # Run w/o allocating a pty, so we fail if sudo prompts for a passwd - ssh = self._get_ssh_client() - self._run_command(ssh, "sudo -n true", pty=False) - except paramiko.ssh_exception.AuthenticationException: - raise n2vc.exceptions.AuthenticationFailed(self.user) - except paramiko.ssh_exception.NoValidConnectionsError: - raise n2vc.exceptions.NoRouteToHost(self.host) - finally: - if ssh: - ssh.close() - - # Infer the public key - public_key_path = "{}.pub".format(self.private_key_path) - - if not os.path.exists(public_key_path): - raise FileNotFoundError( - "Public key '{}' doesn't exist.".format(public_key_path) - ) - - with open(public_key_path, "r") as f: - public_key = f.readline() - - script = INITIALIZE_UBUNTU_SCRIPT.format(public_key) - - try: - ssh = self._get_ssh_client() - - self._run_command( - ssh, ["sudo", "/bin/bash -c " + shlex.quote(script)], pty=True - ) - except paramiko.ssh_exception.AuthenticationException as e: - raise e - finally: - ssh.close() - - return True - - def _detect_hardware_and_os(self, ssh): - """Detect the target hardware capabilities and OS series. - - :param object ssh: The SSHClient - :return: str: A raw string containing OS and hardware information. - """ - - info = { - "series": "", - "arch": "", - "cpu-cores": "", - "mem": "", - } - - stdout, _ = self._run_command( - ssh, ["sudo", "/bin/bash -c " + shlex.quote(DETECTION_SCRIPT)], pty=True, - ) - - lines = stdout.split("\n") - - # Remove extraneous line if DNS resolution of hostname famils - # i.e. sudo: unable to resolve host test-1-mgmtvm-1: Connection timed out - if "unable to resolve host" in lines[0]: - lines = lines[1:] - - info["series"] = lines[0].strip() - info["arch"] = normalize_arch(lines[1].strip()) - - memKb = re.split(r"\s+", lines[2])[1] - - # Convert megabytes -> kilobytes - info["mem"] = round(int(memKb) / 1024) - - # Detect available CPUs - recorded = {} - for line in lines[3:]: - physical_id = "" - - if line.find("physical id") == 0: - physical_id = line.split(":")[1].strip() - elif line.find("cpu cores") == 0: - cores = line.split(":")[1].strip() - - if physical_id not in recorded.keys(): - info["cpu-cores"] += cores - recorded[physical_id] = True - - return info - - def provision_machine(self): - """Perform the initial provisioning of the target machine. - - :return: bool: The client.AddMachineParams - :raises: :class:`paramiko.ssh_exception.AuthenticationException` - if the upload fails - """ - params = client.AddMachineParams() - - if self._init_ubuntu_user(): - try: - ssh = self._get_ssh_client() - - hw = self._detect_hardware_and_os(ssh) - params.series = hw["series"] - params.instance_id = "manual:{}".format(self.host) - params.nonce = "manual:{}:{}".format( - self.host, str(uuid.uuid4()), - ) # a nop for Juju w/manual machines - params.hardware_characteristics = { - "arch": hw["arch"], - "mem": int(hw["mem"]), - "cpu-cores": int(hw["cpu-cores"]), - } - params.addresses = [ - {"value": self.host, "type": "ipv4", "scope": "public"} - ] - - except paramiko.ssh_exception.AuthenticationException as e: - raise e - finally: - ssh.close() - - return params - - async def install_agent(self, connection, nonce, machine_id, api): - """ - :param object connection: Connection to Juju API - :param str nonce: The nonce machine specification - :param str machine_id: The id assigned to the machine - - :return: bool: If the initialization was successful - """ - # The path where the Juju agent should be installed. - data_dir = "/var/lib/juju" - - # Disabling this prevents `apt-get update` from running initially, so - # charms will fail to deploy - disable_package_commands = False - - client_facade = client.ClientFacade.from_connection(connection) - results = await client_facade.ProvisioningScript( - data_dir=data_dir, - disable_package_commands=disable_package_commands, - machine_id=machine_id, - nonce=nonce, - ) - - """Get the IP of the controller - - Parse the provisioning script, looking for the first apiaddress. - - Example: - apiaddresses: - - 10.195.8.2:17070 - - 127.0.0.1:17070 - - '[::1]:17070' - """ - m = re.search(r"apiaddresses:\n- (\d+\.\d+\.\d+\.\d+):17070", results.script) - apiaddress = m.group(1) - - """Add IP Table rule - - In order to route the traffic to the private ip of the Juju controller - we use a DNAT rule to tell the machine that the destination for the - private address is the public address of the machine where the Juju - controller is running in LXD. That machine will have a complimentary - iptables rule, routing traffic to the appropriate LXD container. - """ - - script = IPTABLES_SCRIPT.format(apiaddress, api) - - # Run this in a retry loop, because dpkg may be running and cause the - # script to fail. - retry = 10 - attempts = 0 - delay = 15 - - while attempts <= retry: - try: - attempts += 1 - - self._run_configure_script(script) - break - except Exception as e: - self.log.debug("Waiting for dpkg, sleeping {} seconds".format(delay)) - if attempts > retry: - raise e - else: - time.sleep(delay) - # Slowly back off the retry - delay += 15 - - # self.log.debug("Running configure script") - self._run_configure_script(results.script) - # self.log.debug("Configure script finished") - - def _run_configure_script(self, script: str): - """Run the script to install the Juju agent on the target machine. - - :param str script: The script returned by the ProvisioningScript API - :raises: :class:`paramiko.ssh_exception.AuthenticationException` - if the upload fails - """ - _, tmpFile = tempfile.mkstemp() - with open(tmpFile, "w") as f: - f.write(script) - try: - # get ssh client - ssh = self._get_ssh_client(user="ubuntu",) - - # copy the local copy of the script to the remote machine - sftp = paramiko.SFTPClient.from_transport(ssh.get_transport()) - sftp.put( - tmpFile, tmpFile, - ) - - # run the provisioning script - self._run_command( - ssh, "sudo /bin/bash {}".format(tmpFile), - ) - - except paramiko.ssh_exception.AuthenticationException as e: - raise e - finally: - os.remove(tmpFile) - ssh.close() diff --git a/n2vc/tests/unit/test_juju_observer.py b/n2vc/tests/unit/test_juju_observer.py deleted file mode 100644 index f40824e..0000000 --- a/n2vc/tests/unit/test_juju_observer.py +++ /dev/null @@ -1,159 +0,0 @@ -# Copyright 2020 Canonical Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import asyncio -from unittest import mock -from unittest.mock import Mock - -import asynctest - -from n2vc.exceptions import N2VCTimeoutException -from n2vc.juju_observer import JujuModelObserver, _Entity - - -class FakeObject: - def __init__(self): - self.complete = True - - -class JujuModelObserverTest(asynctest.TestCase): - def setUp(self): - self.n2vc = Mock() - self.model = Mock() - self.juju_observer = JujuModelObserver(n2vc=self.n2vc, model=self.model) - self.loop = asyncio.new_event_loop() - - def test_wait_no_retries(self): - obj = FakeObject() - entity = _Entity(entity_id="eid-1", entity_type="fake", obj=obj, db_dict={}) - result = self.loop.run_until_complete( - self.juju_observer._wait_for_entity( - entity=entity, - field_to_check="complete", - final_states_list=[True], - progress_timeout=None, - total_timeout=None, - ) - ) - self.assertEqual(result, 0) - - @mock.patch("n2vc.juju_observer.asyncio.wait_for") - def test_wait_default_values(self, wait_for): - wait_for.return_value = asyncio.Future() - wait_for.return_value.set_result(None) - obj = FakeObject() - obj.complete = False - entity = _Entity(entity_id="eid-1", entity_type="fake", obj=obj, db_dict={}) - with self.assertRaises(N2VCTimeoutException): - self.loop.run_until_complete( - self.juju_observer._wait_for_entity( - entity=entity, - field_to_check="complete", - final_states_list=[True], - progress_timeout=None, - total_timeout=None, - ) - ) - wait_for.assert_called_once_with(fut=mock.ANY, timeout=3600.0) - - @mock.patch("n2vc.juju_observer.asyncio.wait_for") - def test_wait_default_progress(self, wait_for): - wait_for.return_value = asyncio.Future() - wait_for.return_value.set_result(None) - obj = FakeObject() - obj.complete = False - entity = _Entity(entity_id="eid-1", entity_type="fake", obj=obj, db_dict={}) - with self.assertRaises(N2VCTimeoutException): - self.loop.run_until_complete( - self.juju_observer._wait_for_entity( - entity=entity, - field_to_check="complete", - final_states_list=[True], - progress_timeout=4000, - total_timeout=None, - ) - ) - wait_for.assert_called_once_with(fut=mock.ANY, timeout=3600.0) - - @mock.patch("n2vc.juju_observer.asyncio.wait_for") - def test_wait_default_total(self, wait_for): - wait_for.return_value = asyncio.Future() - wait_for.return_value.set_result(None) - obj = FakeObject() - obj.complete = False - entity = _Entity(entity_id="eid-1", entity_type="fake", obj=obj, db_dict={}) - with self.assertRaises(N2VCTimeoutException): - self.loop.run_until_complete( - self.juju_observer._wait_for_entity( - entity=entity, - field_to_check="complete", - final_states_list=[True], - progress_timeout=None, - total_timeout=4000.0, - ) - ) - wait_for.assert_called_once_with(fut=mock.ANY, timeout=3600.0) - - @mock.patch("n2vc.juju_observer.asyncio.wait_for") - def test_wait_total_less_than_progress_timeout(self, wait_for): - wait_for.return_value = asyncio.Future() - wait_for.return_value.set_result(None) - obj = FakeObject() - obj.complete = False - entity = _Entity(entity_id="eid-1", entity_type="fake", obj=obj, db_dict={}) - with self.assertRaises(N2VCTimeoutException): - self.loop.run_until_complete( - self.juju_observer._wait_for_entity( - entity=entity, - field_to_check="complete", - final_states_list=[True], - progress_timeout=4500.0, - total_timeout=3000.0, - ) - ) - wait_for.assert_called_once_with(fut=mock.ANY, timeout=3000.0) - - @mock.patch("n2vc.juju_observer.asyncio.wait_for") - def test_wait_progress_less_than_total_timeout(self, wait_for): - wait_for.return_value = asyncio.Future() - wait_for.return_value.set_result(None) - obj = FakeObject() - obj.complete = False - entity = _Entity(entity_id="eid-1", entity_type="fake", obj=obj, db_dict={}) - with self.assertRaises(N2VCTimeoutException): - self.loop.run_until_complete( - self.juju_observer._wait_for_entity( - entity=entity, - field_to_check="complete", - final_states_list=[True], - progress_timeout=1500.0, - total_timeout=3000.0, - ) - ) - wait_for.assert_called_once_with(fut=mock.ANY, timeout=1500.0) - - def test_wait_negative_timeout(self): - obj = FakeObject() - entity = _Entity(entity_id="eid-1", entity_type="fake", obj=obj, db_dict={}) - with self.assertRaises(N2VCTimeoutException): - self.loop.run_until_complete( - self.juju_observer._wait_for_entity( - entity=entity, - field_to_check="complete", - final_states_list=[True], - progress_timeout=None, - total_timeout=-1000, - ) - ) diff --git a/n2vc/tests/unit/test_provisioner.py b/n2vc/tests/unit/test_provisioner.py index 880c5cb..a4572b3 100644 --- a/n2vc/tests/unit/test_provisioner.py +++ b/n2vc/tests/unit/test_provisioner.py @@ -12,147 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from unittest import TestCase, mock - -from mock import mock_open -from n2vc.provisioner import SSHProvisioner -from paramiko.ssh_exception import SSHException +from unittest import TestCase class ProvisionerTest(TestCase): def setUp(self): - self.provisioner = SSHProvisioner(None, None, None) - - @mock.patch("n2vc.provisioner.os.path.exists") - @mock.patch("n2vc.provisioner.paramiko.RSAKey") - @mock.patch("n2vc.provisioner.paramiko.SSHClient") - @mock.patch("builtins.open", new_callable=mock_open, read_data="data") - def test__get_ssh_client(self, _mock_open, mock_sshclient, _mock_rsakey, _mock_os): - mock_instance = mock_sshclient.return_value - sshclient = self.provisioner._get_ssh_client() - self.assertEqual(mock_instance, sshclient) - self.assertEqual( - 1, - mock_instance.set_missing_host_key_policy.call_count, - "Missing host key call count", - ) - self.assertEqual(1, mock_instance.connect.call_count, "Connect call count") - - @mock.patch("n2vc.provisioner.os.path.exists") - @mock.patch("n2vc.provisioner.paramiko.RSAKey") - @mock.patch("n2vc.provisioner.paramiko.SSHClient") - @mock.patch("builtins.open", new_callable=mock_open, read_data="data") - def test__get_ssh_client_no_connection( - self, _mock_open, mock_sshclient, _mock_rsakey, _mock_os - ): - - mock_instance = mock_sshclient.return_value - mock_instance.method_inside_someobject.side_effect = ["something"] - mock_instance.connect.side_effect = SSHException() - - self.assertRaises(SSHException, self.provisioner._get_ssh_client) - self.assertEqual( - 1, - mock_instance.set_missing_host_key_policy.call_count, - "Missing host key call count", - ) - self.assertEqual(1, mock_instance.connect.call_count, "Connect call count") - - @mock.patch("n2vc.provisioner.os.path.exists") - @mock.patch("n2vc.provisioner.paramiko.RSAKey") - @mock.patch("n2vc.provisioner.paramiko.SSHClient") - @mock.patch("builtins.open", new_callable=mock_open, read_data="data") - def test__get_ssh_client_bad_banner( - self, _mock_open, mock_sshclient, _mock_rsakey, _mock_os - ): - - mock_instance = mock_sshclient.return_value - mock_instance.method_inside_someobject.side_effect = ["something"] - mock_instance.connect.side_effect = [ - SSHException("Error reading SSH protocol banner"), - None, - None, - ] - - sshclient = self.provisioner._get_ssh_client() - self.assertEqual(mock_instance, sshclient) - self.assertEqual( - 1, - mock_instance.set_missing_host_key_policy.call_count, - "Missing host key call count", - ) - self.assertEqual( - 3, mock_instance.connect.call_count, "Should attempt 3 connections" - ) - - @mock.patch("time.sleep", autospec=True) - @mock.patch("n2vc.provisioner.os.path.exists") - @mock.patch("n2vc.provisioner.paramiko.RSAKey") - @mock.patch("n2vc.provisioner.paramiko.SSHClient") - @mock.patch("builtins.open", new_callable=mock_open, read_data="data") - def test__get_ssh_client_unable_to_connect( - self, _mock_open, mock_sshclient, _mock_rsakey, _mock_os, _mock_sleep - ): - - mock_instance = mock_sshclient.return_value - mock_instance.connect.side_effect = Exception("Unable to connect to port") - - self.assertRaises(Exception, self.provisioner._get_ssh_client) - self.assertEqual( - 1, - mock_instance.set_missing_host_key_policy.call_count, - "Missing host key call count", - ) - self.assertEqual( - 11, mock_instance.connect.call_count, "Should attempt 11 connections" - ) - - @mock.patch("time.sleep", autospec=True) - @mock.patch("n2vc.provisioner.os.path.exists") - @mock.patch("n2vc.provisioner.paramiko.RSAKey") - @mock.patch("n2vc.provisioner.paramiko.SSHClient") - @mock.patch("builtins.open", new_callable=mock_open, read_data="data") - def test__get_ssh_client_unable_to_connect_once( - self, _mock_open, mock_sshclient, _mock_rsakey, _mock_os, _mock_sleep - ): - - mock_instance = mock_sshclient.return_value - mock_instance.connect.side_effect = [ - Exception("Unable to connect to port"), - None, - ] - - sshclient = self.provisioner._get_ssh_client() - self.assertEqual(mock_instance, sshclient) - self.assertEqual( - 1, - mock_instance.set_missing_host_key_policy.call_count, - "Missing host key call count", - ) - self.assertEqual( - 2, mock_instance.connect.call_count, "Should attempt 2 connections" - ) - - @mock.patch("n2vc.provisioner.os.path.exists") - @mock.patch("n2vc.provisioner.paramiko.RSAKey") - @mock.patch("n2vc.provisioner.paramiko.SSHClient") - @mock.patch("builtins.open", new_callable=mock_open, read_data="data") - def test__get_ssh_client_other_exception( - self, _mock_open, mock_sshclient, _mock_rsakey, _mock_os - ): - - mock_instance = mock_sshclient.return_value - mock_instance.connect.side_effect = Exception() - - self.assertRaises(Exception, self.provisioner._get_ssh_client) - self.assertEqual( - 1, - mock_instance.set_missing_host_key_policy.call_count, - "Missing host key call count", - ) - self.assertEqual( - 1, mock_instance.connect.call_count, "Should only attempt 1 connection" - ) - - -# + pass diff --git a/n2vc/vnf.py b/n2vc/vnf.py deleted file mode 100644 index 4e46746..0000000 --- a/n2vc/vnf.py +++ /dev/null @@ -1,1619 +0,0 @@ -# Copyright 2019 Canonical Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import asyncio -import base64 -import binascii -import logging -import os.path -import re -import shlex -import ssl -import subprocess - -from juju.client import client -from juju.controller import Controller -from juju.errors import JujuAPIError, JujuError -from juju.model import ModelObserver - -import n2vc.exceptions -from n2vc.provisioner import SSHProvisioner - - -# import time -# FIXME: this should load the juju inside or modules without having to -# explicitly install it. Check why it's not working. -# Load our subtree of the juju library -# path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) -# path = os.path.join(path, "modules/libjuju/") -# if path not in sys.path: -# sys.path.insert(1, path) -# We might need this to connect to the websocket securely, but test and verify. -try: - ssl._create_default_https_context = ssl._create_unverified_context -except AttributeError: - # Legacy Python doesn't verify by default (see pep-0476) - # https://www.python.org/dev/peps/pep-0476/ - pass - - -# Custom exceptions -# Deprecated. Please use n2vc.exceptions namespace. -class JujuCharmNotFound(Exception): - """The Charm can't be found or is not readable.""" - - -class JujuApplicationExists(Exception): - """The Application already exists.""" - - -class N2VCPrimitiveExecutionFailed(Exception): - """Something failed while attempting to execute a primitive.""" - - -class NetworkServiceDoesNotExist(Exception): - """The Network Service being acted against does not exist.""" - - -class PrimitiveDoesNotExist(Exception): - """The Primitive being executed does not exist.""" - - -# Quiet the debug logging -logging.getLogger("websockets.protocol").setLevel(logging.INFO) -logging.getLogger("juju.client.connection").setLevel(logging.WARN) -logging.getLogger("juju.model").setLevel(logging.WARN) -logging.getLogger("juju.machine").setLevel(logging.WARN) - - -class VCAMonitor(ModelObserver): - """Monitor state changes within the Juju Model.""" - - log = None - - def __init__(self, ns_name): - self.log = logging.getLogger(__name__) - - self.ns_name = ns_name - self.applications = {} - - def AddApplication(self, application_name, callback, *callback_args): - if application_name not in self.applications: - self.applications[application_name] = { - "callback": callback, - "callback_args": callback_args, - } - - def RemoveApplication(self, application_name): - if application_name in self.applications: - del self.applications[application_name] - - async def on_change(self, delta, old, new, model): - """React to changes in the Juju model.""" - - if delta.entity == "unit": - # Ignore change events from other applications - if delta.data["application"] not in self.applications.keys(): - return - - try: - - application_name = delta.data["application"] - - callback = self.applications[application_name]["callback"] - callback_args = self.applications[application_name]["callback_args"] - - if old and new: - # Fire off a callback with the application state - if callback: - callback( - self.ns_name, - delta.data["application"], - new.workload_status, - new.workload_status_message, - *callback_args, - ) - - if old and not new: - # This is a charm being removed - if callback: - callback( - self.ns_name, - delta.data["application"], - "removed", - "", - *callback_args, - ) - except Exception as e: - self.log.debug("[1] notify_callback exception: {}".format(e)) - - elif delta.entity == "action": - # TODO: Decide how we want to notify the user of actions - - # uuid = delta.data['id'] # The Action's unique id - # msg = delta.data['message'] # The output of the action - # - # if delta.data['status'] == "pending": - # # The action is queued - # pass - # elif delta.data['status'] == "completed"" - # # The action was successful - # pass - # elif delta.data['status'] == "failed": - # # The action failed. - # pass - - pass - - -######## -# TODO -# -# Create unique models per network service -# Document all public functions - - -class N2VC: - def __init__( - self, - log=None, - server="127.0.0.1", - port=17070, - user="admin", - secret=None, - artifacts=None, - loop=None, - juju_public_key=None, - ca_cert=None, - api_proxy=None, - ): - """Initialize N2VC - - Initializes the N2VC object, allowing the caller to interoperate with the VCA. - - - :param log obj: The logging object to log to - :param server str: The IP Address or Hostname of the Juju controller - :param port int: The port of the Juju Controller - :param user str: The Juju username to authenticate with - :param secret str: The Juju password to authenticate with - :param artifacts str: The directory where charms required by a vnfd are - stored. - :param loop obj: The loop to use. - :param juju_public_key str: The contents of the Juju public SSH key - :param ca_cert str: The CA certificate to use to authenticate - :param api_proxy str: The IP of the host machine - - :Example: - client = n2vc.vnf.N2VC( - log=log, - server='10.1.1.28', - port=17070, - user='admin', - secret='admin', - artifacts='/app/storage/myvnf/charms', - loop=loop, - juju_public_key='', - ca_cert='', - api_proxy='192.168.1.155' - ) - """ - - # Initialize instance-level variables - self.api = None - self.log = None - self.controller = None - self.connecting = False - self.authenticated = False - self.api_proxy = api_proxy - - if log: - self.log = log - else: - self.log = logging.getLogger(__name__) - - # For debugging - self.refcount = { - "controller": 0, - "model": 0, - } - - self.models = {} - - # Model Observers - self.monitors = {} - - # VCA config - self.hostname = "" - self.port = 17070 - self.username = "" - self.secret = "" - - self.juju_public_key = juju_public_key - if juju_public_key: - self._create_juju_public_key(juju_public_key) - else: - self.juju_public_key = "" - - # TODO: Verify ca_cert is valid before using. VCA will crash - # if the ca_cert isn't formatted correctly. - def base64_to_cacert(b64string): - """Convert the base64-encoded string containing the VCA CACERT. - - The input string.... - - """ - try: - cacert = base64.b64decode(b64string).decode("utf-8") - - cacert = re.sub(r"\\n", r"\n", cacert,) - except binascii.Error as e: - self.log.debug("Caught binascii.Error: {}".format(e)) - raise n2vc.exceptions.N2VCInvalidCertificate("Invalid CA Certificate") - - return cacert - - self.ca_cert = None - if ca_cert: - self.ca_cert = base64_to_cacert(ca_cert) - - # Quiet websocket traffic - logging.getLogger("websockets.protocol").setLevel(logging.INFO) - logging.getLogger("juju.client.connection").setLevel(logging.WARN) - logging.getLogger("model").setLevel(logging.WARN) - # logging.getLogger('websockets.protocol').setLevel(logging.DEBUG) - - self.log.debug("JujuApi: instantiated") - - self.server = server - self.port = port - - self.secret = secret - if user.startswith("user-"): - self.user = user - else: - self.user = "user-{}".format(user) - - self.endpoint = "%s:%d" % (server, int(port)) - - self.artifacts = artifacts - - self.loop = loop or asyncio.get_event_loop() - - def __del__(self): - """Close any open connections.""" - yield self.logout() - - def _create_juju_public_key(self, public_key): - """Recreate the Juju public key on disk. - - Certain libjuju commands expect to be run from the same machine as Juju - is bootstrapped to. This method will write the public key to disk in - that location: ~/.local/share/juju/ssh/juju_id_rsa.pub - """ - # Make sure that we have a public key before writing to disk - if public_key is None or len(public_key) == 0: - if "OSM_VCA_PUBKEY" in os.environ: - public_key = os.getenv("OSM_VCA_PUBKEY", "") - if len(public_key == 0): - return - else: - return - - path = "{}/.local/share/juju/ssh".format(os.path.expanduser("~"),) - if not os.path.exists(path): - os.makedirs(path) - - with open("{}/juju_id_rsa.pub".format(path), "w") as f: - f.write(public_key) - - def notify_callback( - self, - model_name, - application_name, - status, - message, - callback=None, - *callback_args - ): - try: - if callback: - callback( - model_name, application_name, status, message, *callback_args, - ) - except Exception as e: - self.log.error("[0] notify_callback exception {}".format(e)) - raise e - return True - - # Public methods - async def Relate(self, model_name, vnfd): - """Create a relation between the charm-enabled VDUs in a VNF. - - The Relation mapping has two parts: the id of the vdu owning the endpoint, and - the name of the endpoint. - - vdu: - ... - vca-relationships: - relation: - - provides: dataVM:db - requires: mgmtVM:app - - This tells N2VC that the charm referred to by the dataVM vdu offers a relation - named 'db', and the mgmtVM vdu - has an 'app' endpoint that should be connected to a database. - - :param str ns_name: The name of the network service. - :param dict vnfd: The parsed yaml VNF descriptor. - """ - - # Currently, the call to Relate() is made automatically after the - # deployment of each charm; if the relation depends on a charm that - # hasn't been deployed yet, the call will fail silently. This will - # prevent an API breakage, with the intent of making this an explicitly - # required call in a more object-oriented refactor of the N2VC API. - - configs = [] - vnf_config = vnfd.get("vnf-configuration") - if vnf_config: - juju = vnf_config["juju"] - if juju: - configs.append(vnf_config) - - for vdu in vnfd["vdu"]: - vdu_config = vdu.get("vdu-configuration") - if vdu_config: - juju = vdu_config["juju"] - if juju: - configs.append(vdu_config) - - def _get_application_name(name): - """Get the application name that's mapped to a vnf/vdu.""" - vnf_member_index = 0 - vnf_name = vnfd["name"] - - for vdu in vnfd.get("vdu"): - # Compare the named portion of the relation to the vdu's id - if vdu["id"] == name: - application_name = self.FormatApplicationName( - model_name, vnf_name, str(vnf_member_index), - ) - return application_name - else: - vnf_member_index += 1 - - return None - - # Loop through relations - for cfg in configs: - if "juju" in cfg: - juju = cfg["juju"] - if ( - "vca-relationships" in juju - and "relation" in juju["vca-relationships"] - ): - for rel in juju["vca-relationships"]["relation"]: - try: - - # get the application name for the provides - (name, endpoint) = rel["provides"].split(":") - application_name = _get_application_name(name) - - provides = "{}:{}".format(application_name, endpoint) - - # get the application name for thr requires - (name, endpoint) = rel["requires"].split(":") - application_name = _get_application_name(name) - - requires = "{}:{}".format(application_name, endpoint) - self.log.debug( - "Relation: {} <-> {}".format(provides, requires) - ) - await self.add_relation( - model_name, provides, requires, - ) - except Exception as e: - self.log.debug("Exception: {}".format(e)) - - return - - async def DeployCharms( - self, - model_name, - application_name, - vnfd, - charm_path, - params={}, - machine_spec={}, - callback=None, - *callback_args - ): - """Deploy one or more charms associated with a VNF. - - Deploy the charm(s) referenced in a VNF Descriptor. - - :param str model_name: The name or unique id of the network service. - :param str application_name: The name of the application - :param dict vnfd: The name of the application - :param str charm_path: The path to the Juju charm - :param dict params: A dictionary of runtime parameters - Examples:: - { - 'rw_mgmt_ip': '1.2.3.4', - # Pass the initial-config-primitives section of the vnf or vdu - 'initial-config-primitives': {...} - 'user_values': dictionary with the day-1 parameters provided at - instantiation time. It will replace values - inside < >. rw_mgmt_ip will be included here also - } - :param dict machine_spec: A dictionary describing the machine to - install to - Examples:: - { - 'hostname': '1.2.3.4', - 'username': 'ubuntu', - } - :param obj callback: A callback function to receive status changes. - :param tuple callback_args: A list of arguments to be passed to the - callback - """ - - ######################################################## - # Verify the path to the charm exists and is readable. # - ######################################################## - if not os.path.exists(charm_path): - self.log.debug("Charm path doesn't exist: {}".format(charm_path)) - self.notify_callback( - model_name, - application_name, - "error", - "failed", - callback, - *callback_args, - ) - raise JujuCharmNotFound("No artifacts configured.") - - ################################ - # Login to the Juju controller # - ################################ - if not self.authenticated: - self.log.debug("Authenticating with Juju") - await self.login() - - ########################################## - # Get the model for this network service # - ########################################## - model = await self.get_model(model_name) - - ######################################## - # Verify the application doesn't exist # - ######################################## - app = await self.get_application(model, application_name) - if app: - raise JujuApplicationExists( - ( - 'Can\'t deploy application "{}" to model ' - ' "{}" because it already exists.' - ).format(application_name, model_name) - ) - - ################################################################ - # Register this application with the model-level event monitor # - ################################################################ - if callback: - self.log.debug( - "JujuApi: Registering callback for {}".format(application_name,) - ) - await self.Subscribe(model_name, application_name, callback, *callback_args) - - ####################################### - # Get the initial charm configuration # - ####################################### - - rw_mgmt_ip = None - if "rw_mgmt_ip" in params: - rw_mgmt_ip = params["rw_mgmt_ip"] - - if "initial-config-primitive" not in params: - params["initial-config-primitive"] = {} - - initial_config = self._get_config_from_dict( - params["initial-config-primitive"], {"": rw_mgmt_ip} - ) - - ######################################################## - # Check for specific machine placement (native charms) # - ######################################################## - to = "" - series = "xenial" - - if machine_spec.keys(): - if all(k in machine_spec for k in ["hostname", "username"]): - - # Allow series to be derived from the native charm - series = None - - self.log.debug( - "Provisioning manual machine {}@{}".format( - machine_spec["username"], machine_spec["hostname"], - ) - ) - - """Native Charm support - - Taking a bare VM (assumed to be an Ubuntu cloud image), - the provisioning process will: - - Create an ubuntu user w/sudo access - - Detect hardware - - Detect architecture - - Download and install Juju agent from controller - - Enable Juju agent - - Add an iptables rule to route traffic to the API proxy - """ - - to = await self.provision_machine( - model_name=model_name, - username=machine_spec["username"], - hostname=machine_spec["hostname"], - private_key_path=self.GetPrivateKeyPath(), - ) - self.log.debug("Provisioned machine id {}".format(to)) - - # TODO: If to is none, raise an exception - - # The native charm won't have the sshproxy layer, typically, but LCM - # uses the config primitive - # to interpret what the values are. That's a gap to fill. - - """ - The ssh-* config parameters are unique to the sshproxy layer, - which most native charms will not be aware of. - - Setting invalid config parameters will cause the deployment to - fail. - - For the moment, we will strip the ssh-* parameters from native - charms, until the feature gap is addressed in the information - model. - """ - - # Native charms don't include the ssh-* config values, so strip them - # from the initial_config, otherwise the deploy will raise an error. - # self.log.debug("Removing ssh-* from initial-config") - for k in ["ssh-hostname", "ssh-username", "ssh-password"]: - if k in initial_config: - self.log.debug("Removing parameter {}".format(k)) - del initial_config[k] - - self.log.debug( - "JujuApi: Deploying charm ({}/{}) from {} to {}".format( - model_name, application_name, charm_path, to, - ) - ) - - ######################################################## - # Deploy the charm and apply the initial configuration # - ######################################################## - app = await model.deploy( - # We expect charm_path to be either the path to the charm on disk - # or in the format of cs:series/name - charm_path, - # This is the formatted, unique name for this charm - application_name=application_name, - # Proxy charms should use the current LTS. This will need to be - # changed for native charms. - series=series, - # Apply the initial 'config' primitive during deployment - config=initial_config, - # Where to deploy the charm to. - to=to, - ) - - ############################# - # Map the vdu id<->app name # - ############################# - try: - await self.Relate(model_name, vnfd) - except KeyError as ex: - # We don't currently support relations between NS and VNF/VDU charms - self.log.warn("[N2VC] Relations not supported: {}".format(ex)) - except Exception: - # This may happen if not all of the charms needed by the relation - # are ready. We can safely ignore this, because Relate will be - # retried when the endpoint of the relation is deployed. - self.log.warn("[N2VC] Relations not ready") - - # ####################################### - # # Execute initial config primitive(s) # - # ####################################### - uuids = await self.ExecuteInitialPrimitives( - model_name, application_name, params, - ) - return uuids - - # primitives = {} - # - # # Build a sequential list of the primitives to execute - # for primitive in params['initial-config-primitive']: - # try: - # if primitive['name'] == 'config': - # # This is applied when the Application is deployed - # pass - # else: - # seq = primitive['seq'] - # - # params = {} - # if 'parameter' in primitive: - # params = primitive['parameter'] - # - # primitives[seq] = { - # 'name': primitive['name'], - # 'parameters': self._map_primitive_parameters( - # params, - # {'': rw_mgmt_ip} - # ), - # } - # - # for primitive in sorted(primitives): - # await self.ExecutePrimitive( - # model_name, - # application_name, - # primitives[primitive]['name'], - # callback, - # callback_args, - # **primitives[primitive]['parameters'], - # ) - # except N2VCPrimitiveExecutionFailed as e: - # self.log.debug( - # "[N2VC] Exception executing primitive: {}".format(e) - # ) - # raise - - async def GetPrimitiveStatus(self, model_name, uuid): - """Get the status of an executed Primitive. - - The status of an executed Primitive will be one of three values: - - completed - - failed - - running - """ - status = None - try: - if not self.authenticated: - await self.login() - - model = await self.get_model(model_name) - - results = await model.get_action_status(uuid) - - if uuid in results: - status = results[uuid] - - except Exception as e: - self.log.debug( - "Caught exception while getting primitive status: {}".format(e) - ) - raise N2VCPrimitiveExecutionFailed(e) - - return status - - async def GetPrimitiveOutput(self, model_name, uuid): - """Get the output of an executed Primitive. - - Note: this only returns output for a successfully executed primitive. - """ - results = None - try: - if not self.authenticated: - await self.login() - - model = await self.get_model(model_name) - results = await model.get_action_output(uuid, 60) - except Exception as e: - self.log.debug( - "Caught exception while getting primitive status: {}".format(e) - ) - raise N2VCPrimitiveExecutionFailed(e) - - return results - - # async def ProvisionMachine(self, model_name, hostname, username): - # """Provision machine for usage with Juju. - # - # Provisions a previously instantiated machine for use with Juju. - # """ - # try: - # if not self.authenticated: - # await self.login() - # - # # FIXME: This is hard-coded until model-per-ns is added - # model_name = 'default' - # - # model = await self.get_model(model_name) - # model.add_machine(spec={}) - # - # machine = await model.add_machine(spec='ssh:{}@{}:{}'.format( - # "ubuntu", - # host['address'], - # private_key_path, - # )) - # return machine.id - # - # except Exception as e: - # self.log.debug( - # "Caught exception while getting primitive status: {}".format(e) - # ) - # raise N2VCPrimitiveExecutionFailed(e) - - def GetPrivateKeyPath(self): - homedir = os.environ["HOME"] - sshdir = "{}/.ssh".format(homedir) - private_key_path = "{}/id_n2vc_rsa".format(sshdir) - return private_key_path - - async def GetPublicKey(self): - """Get the N2VC SSH public key.abs - - Returns the SSH public key, to be injected into virtual machines to - be managed by the VCA. - - The first time this is run, a ssh keypair will be created. The public - key is injected into a VM so that we can provision the machine with - Juju, after which Juju will communicate with the VM directly via the - juju agent. - """ - # public_key = "" - - # Find the path to where we expect our key to live. - homedir = os.environ["HOME"] - sshdir = "{}/.ssh".format(homedir) - if not os.path.exists(sshdir): - os.mkdir(sshdir) - - private_key_path = "{}/id_n2vc_rsa".format(sshdir) - public_key_path = "{}.pub".format(private_key_path) - - # If we don't have a key generated, generate it. - if not os.path.exists(private_key_path): - cmd = "ssh-keygen -t {} -b {} -N '' -f {}".format( - "rsa", "4096", private_key_path - ) - subprocess.check_output(shlex.split(cmd)) - - # Read the public key - with open(public_key_path, "r") as f: - public_key = f.readline() - - return public_key - - async def ExecuteInitialPrimitives( - self, model_name, application_name, params, callback=None, *callback_args - ): - """Execute multiple primitives. - - Execute multiple primitives as declared in initial-config-primitive. - This is useful in cases where the primitives initially failed -- for - example, if the charm is a proxy but the proxy hasn't been configured - yet. - """ - uuids = [] - primitives = {} - - # Build a sequential list of the primitives to execute - for primitive in params["initial-config-primitive"]: - try: - if primitive["name"] == "config": - pass - else: - seq = primitive["seq"] - - params_ = {} - if "parameter" in primitive: - params_ = primitive["parameter"] - - user_values = params.get("user_values", {}) - if "rw_mgmt_ip" not in user_values: - user_values["rw_mgmt_ip"] = None - # just for backward compatibility, because it will be provided - # always by modern version of LCM - - primitives[seq] = { - "name": primitive["name"], - "parameters": self._map_primitive_parameters( - params_, user_values - ), - } - - for primitive in sorted(primitives): - try: - # self.log.debug("Queuing action {}".format( - # primitives[primitive]['name'])) - uuids.append( - await self.ExecutePrimitive( - model_name, - application_name, - primitives[primitive]["name"], - callback, - callback_args, - **primitives[primitive]["parameters"], - ) - ) - except PrimitiveDoesNotExist as e: - self.log.debug( - "Ignoring exception PrimitiveDoesNotExist: {}".format(e) - ) - pass - except Exception as e: - self.log.debug( - ( - "XXXXXXXXXXXXXXXXXXXXXXXXX Unexpected exception: {}" - ).format(e) - ) - raise e - - except N2VCPrimitiveExecutionFailed as e: - self.log.debug("[N2VC] Exception executing primitive: {}".format(e)) - raise - return uuids - - async def ExecutePrimitive( - self, - model_name, - application_name, - primitive, - callback, - *callback_args, - **params - ): - """Execute a primitive of a charm for Day 1 or Day 2 configuration. - - Execute a primitive defined in the VNF descriptor. - - :param str model_name: The name or unique id of the network service. - :param str application_name: The name of the application - :param str primitive: The name of the primitive to execute. - :param obj callback: A callback function to receive status changes. - :param tuple callback_args: A list of arguments to be passed to the - callback function. - :param dict params: A dictionary of key=value pairs representing the - primitive's parameters - Examples:: - { - 'rw_mgmt_ip': '1.2.3.4', - # Pass the initial-config-primitives section of the vnf or vdu - 'initial-config-primitives': {...} - } - """ - self.log.debug("Executing primitive={} params={}".format(primitive, params)) - uuid = None - try: - if not self.authenticated: - await self.login() - - model = await self.get_model(model_name) - - if primitive == "config": - # config is special, and expecting params to be a dictionary - await self.set_config( - model, application_name, params["params"], - ) - else: - app = await self.get_application(model, application_name) - if app: - # Does this primitive exist? - actions = await app.get_actions() - - if primitive not in actions.keys(): - raise PrimitiveDoesNotExist( - "Primitive {} does not exist".format(primitive) - ) - - # Run against the first (and probably only) unit in the app - unit = app.units[0] - if unit: - action = await unit.run_action(primitive, **params) - uuid = action.id - except PrimitiveDoesNotExist as e: - # Catch and raise this exception if it's thrown from the inner block - raise e - except Exception as e: - # An unexpected exception was caught - self.log.debug("Caught exception while executing primitive: {}".format(e)) - raise N2VCPrimitiveExecutionFailed(e) - return uuid - - async def RemoveCharms( - self, model_name, application_name, callback=None, *callback_args - ): - """Remove a charm from the VCA. - - Remove a charm referenced in a VNF Descriptor. - - :param str model_name: The name of the network service. - :param str application_name: The name of the application - :param obj callback: A callback function to receive status changes. - :param tuple callback_args: A list of arguments to be passed to the - callback function. - """ - try: - if not self.authenticated: - await self.login() - - model = await self.get_model(model_name) - app = await self.get_application(model, application_name) - if app: - # Remove this application from event monitoring - await self.Unsubscribe(model_name, application_name) - - # self.notify_callback(model_name, application_name, "removing", - # callback, *callback_args) - self.log.debug("Removing the application {}".format(application_name)) - await app.remove() - - # await self.disconnect_model(self.monitors[model_name]) - - self.notify_callback( - model_name, - application_name, - "removed", - "Removing charm {}".format(application_name), - callback, - *callback_args, - ) - - except Exception as e: - print("Caught exception: {}".format(e)) - self.log.debug(e) - raise e - - async def CreateNetworkService(self, ns_uuid): - """Create a new Juju model for the Network Service. - - Creates a new Model in the Juju Controller. - - :param str ns_uuid: A unique id representing an instaance of a - Network Service. - - :returns: True if the model was created. Raises JujuError on failure. - """ - if not self.authenticated: - await self.login() - - models = await self.controller.list_models() - if ns_uuid not in models: - # Get the new model - await self.get_model(ns_uuid) - - return True - - async def DestroyNetworkService(self, ns_uuid): - """Destroy a Network Service. - - Destroy the Network Service and any deployed charms. - - :param ns_uuid The unique id of the Network Service - - :returns: True if the model was created. Raises JujuError on failure. - """ - - # Do not delete the default model. The default model was used by all - # Network Services, prior to the implementation of a model per NS. - if ns_uuid.lower() == "default": - return False - - if not self.authenticated: - await self.login() - - models = await self.controller.list_models() - if ns_uuid in models: - model = await self.controller.get_model(ns_uuid) - - for application in model.applications: - app = model.applications[application] - - await self.RemoveCharms(ns_uuid, application) - - self.log.debug("Unsubscribing Watcher for {}".format(application)) - await self.Unsubscribe(ns_uuid, application) - - self.log.debug("Waiting for application to terminate") - timeout = 30 - try: - await model.block_until( - lambda: all( - unit.workload_status in ["terminated"] for unit in app.units - ), - timeout=timeout, - ) - except Exception: - self.log.debug( - "Timed out waiting for {} to terminate.".format(application) - ) - - for machine in model.machines: - try: - self.log.debug("Destroying machine {}".format(machine)) - await model.machines[machine].destroy(force=True) - except JujuAPIError as e: - if "does not exist" in str(e): - # Our cached model may be stale, because the machine - # has already been removed. It's safe to continue. - continue - else: - self.log.debug("Caught exception: {}".format(e)) - raise e - - # Disconnect from the Model - if ns_uuid in self.models: - self.log.debug("Disconnecting model {}".format(ns_uuid)) - # await self.disconnect_model(self.models[ns_uuid]) - await self.disconnect_model(ns_uuid) - - try: - self.log.debug("Destroying model {}".format(ns_uuid)) - await self.controller.destroy_models(ns_uuid) - except JujuError: - raise NetworkServiceDoesNotExist( - "The Network Service '{}' does not exist".format(ns_uuid) - ) - - return True - - async def GetMetrics(self, model_name, application_name): - """Get the metrics collected by the VCA. - - :param model_name The name or unique id of the network service - :param application_name The name of the application - """ - metrics = {} - model = await self.get_model(model_name) - app = await self.get_application(model, application_name) - if app: - metrics = await app.get_metrics() - - return metrics - - async def HasApplication(self, model_name, application_name): - model = await self.get_model(model_name) - app = await self.get_application(model, application_name) - if app: - return True - return False - - async def Subscribe(self, ns_name, application_name, callback, *callback_args): - """Subscribe to callbacks for an application. - - :param ns_name str: The name of the Network Service - :param application_name str: The name of the application - :param callback obj: The callback method - :param callback_args list: The list of arguments to append to calls to - the callback method - """ - self.monitors[ns_name].AddApplication( - application_name, callback, *callback_args - ) - - async def Unsubscribe(self, ns_name, application_name): - """Unsubscribe to callbacks for an application. - - Unsubscribes the caller from notifications from a deployed application. - - :param ns_name str: The name of the Network Service - :param application_name str: The name of the application - """ - self.monitors[ns_name].RemoveApplication(application_name,) - - # Non-public methods - async def add_relation(self, model_name, relation1, relation2): - """ - Add a relation between two application endpoints. - - :param str model_name: The name or unique id of the network service - :param str relation1: '[:]' - :param str relation2: '[:]' - """ - - if not self.authenticated: - await self.login() - - m = await self.get_model(model_name) - try: - await m.add_relation(relation1, relation2) - except JujuAPIError as e: - # If one of the applications in the relationship doesn't exist, - # or the relation has already been added, let the operation fail - # silently. - if "not found" in e.message: - return - if "already exists" in e.message: - return - - raise e - - # async def apply_config(self, config, application): - # """Apply a configuration to the application.""" - # print("JujuApi: Applying configuration to {}.".format( - # application - # )) - # return await self.set_config(application=application, config=config) - - def _get_config_from_dict(self, config_primitive, values): - """Transform the yang config primitive to dict. - - Expected result: - - config = { - 'config': - } - """ - config = {} - for primitive in config_primitive: - if primitive["name"] == "config": - # config = self._map_primitive_parameters() - for parameter in primitive["parameter"]: - param = str(parameter["name"]) - if parameter["value"] == "": - config[param] = str(values[parameter["value"]]) - else: - config[param] = str(parameter["value"]) - - return config - - def _map_primitive_parameters(self, parameters, user_values): - params = {} - for parameter in parameters: - param = str(parameter["name"]) - value = parameter.get("value") - - # map parameters inside a < >; e.g. . with the provided user - # _values. - # Must exist at user_values except if there is a default value - if isinstance(value, str) and value.startswith("<") and value.endswith(">"): - if parameter["value"][1:-1] in user_values: - value = user_values[parameter["value"][1:-1]] - elif "default-value" in parameter: - value = parameter["default-value"] - else: - raise KeyError( - "parameter {}='{}' not supplied ".format(param, value) - ) - - # If there's no value, use the default-value (if set) - if value is None and "default-value" in parameter: - value = parameter["default-value"] - - # Typecast parameter value, if present - paramtype = "string" - try: - if "data-type" in parameter: - paramtype = str(parameter["data-type"]).lower() - - if paramtype == "integer": - value = int(value) - elif paramtype == "boolean": - value = bool(value) - else: - value = str(value) - else: - # If there's no data-type, assume the value is a string - value = str(value) - except ValueError: - raise ValueError( - "parameter {}='{}' cannot be converted to type {}".format( - param, value, paramtype - ) - ) - - params[param] = value - return params - - def _get_config_from_yang(self, config_primitive, values): - """Transform the yang config primitive to dict.""" - config = {} - for primitive in config_primitive.values(): - if primitive["name"] == "config": - for parameter in primitive["parameter"].values(): - param = str(parameter["name"]) - if parameter["value"] == "": - config[param] = str(values[parameter["value"]]) - else: - config[param] = str(parameter["value"]) - - return config - - def FormatApplicationName(self, *args): - """ - Generate a Juju-compatible Application name - - :param args tuple: Positional arguments to be used to construct the - application name. - - Limitations:: - - Only accepts characters a-z and non-consequitive dashes (-) - - Application name should not exceed 50 characters - - Examples:: - - FormatApplicationName("ping_pong_ns", "ping_vnf", "a") - """ - appname = "" - for c in "-".join(list(args)): - if c.isdigit(): - c = chr(97 + int(c)) - elif not c.isalpha(): - c = "-" - appname += c - return re.sub("-+", "-", appname.lower()) - - # def format_application_name(self, nsd_name, vnfr_name, member_vnf_index=0): - # """Format the name of the application - # - # Limitations: - # - Only accepts characters a-z and non-consequitive dashes (-) - # - Application name should not exceed 50 characters - # """ - # name = "{}-{}-{}".format(nsd_name, vnfr_name, member_vnf_index) - # new_name = '' - # for c in name: - # if c.isdigit(): - # c = chr(97 + int(c)) - # elif not c.isalpha(): - # c = "-" - # new_name += c - # return re.sub('\-+', '-', new_name.lower()) - - def format_model_name(self, name): - """Format the name of model. - - Model names may only contain lowercase letters, digits and hyphens - """ - - return name.replace("_", "-").lower() - - async def get_application(self, model, application): - """Get the deployed application.""" - if not self.authenticated: - await self.login() - - app = None - if application and model: - if model.applications: - if application in model.applications: - app = model.applications[application] - - return app - - async def get_model(self, model_name): - """Get a model from the Juju Controller. - - Note: Model objects returned must call disconnected() before it goes - out of scope.""" - if not self.authenticated: - await self.login() - - if model_name not in self.models: - # Get the models in the controller - models = await self.controller.list_models() - - if model_name not in models: - try: - self.models[model_name] = await self.controller.add_model( - model_name, config={"authorized-keys": self.juju_public_key} - ) - except JujuError as e: - if "already exists" not in e.message: - raise e - else: - self.models[model_name] = await self.controller.get_model(model_name) - - self.refcount["model"] += 1 - - # Create an observer for this model - await self.create_model_monitor(model_name) - - return self.models[model_name] - - async def create_model_monitor(self, model_name): - """Create a monitor for the model, if none exists.""" - if not self.authenticated: - await self.login() - - if model_name not in self.monitors: - self.monitors[model_name] = VCAMonitor(model_name) - self.models[model_name].add_observer(self.monitors[model_name]) - - return True - - async def login(self): - """Login to the Juju controller.""" - - if self.authenticated: - return - - self.connecting = True - - self.log.debug("JujuApi: Logging into controller") - - self.controller = Controller(loop=self.loop) - - if self.secret: - self.log.debug( - "Connecting to controller... ws://{} as {}/{}".format( - self.endpoint, self.user, self.secret, - ) - ) - try: - await self.controller.connect( - endpoint=self.endpoint, - username=self.user, - password=self.secret, - cacert=self.ca_cert, - ) - self.refcount["controller"] += 1 - self.authenticated = True - self.log.debug("JujuApi: Logged into controller") - except Exception as ex: - self.log.debug("Caught exception: {}".format(ex)) - else: - # current_controller no longer exists - # self.log.debug("Connecting to current controller...") - # await self.controller.connect_current() - # await self.controller.connect( - # endpoint=self.endpoint, - # username=self.user, - # cacert=cacert, - # ) - self.log.fatal("VCA credentials not configured.") - self.authenticated = False - - async def logout(self): - """Logout of the Juju controller.""" - if not self.authenticated: - return False - - try: - for model in self.models: - await self.disconnect_model(model) - - if self.controller: - self.log.debug("Disconnecting controller {}".format(self.controller)) - await self.controller.disconnect() - self.refcount["controller"] -= 1 - self.controller = None - - self.authenticated = False - - self.log.debug(self.refcount) - - except Exception as e: - self.log.fatal("Fatal error logging out of Juju Controller: {}".format(e)) - raise e - return True - - async def disconnect_model(self, model): - self.log.debug("Disconnecting model {}".format(model)) - if model in self.models: - try: - await self.models[model].disconnect() - self.refcount["model"] -= 1 - self.models[model] = None - except Exception as e: - self.log.debug("Caught exception: {}".format(e)) - - async def provision_machine( - self, model_name: str, hostname: str, username: str, private_key_path: str - ) -> int: - """Provision a machine. - - This executes the SSH provisioner, which will log in to a machine via - SSH and prepare it for use with the Juju model - - :param model_name str: The name of the model - :param hostname str: The IP or hostname of the target VM - :param user str: The username to login to - :param private_key_path str: The path to the private key that's been injected - to the VM via cloud-init - :return machine_id int: Returns the id of the machine or None if provisioning - fails - """ - if not self.authenticated: - await self.login() - - machine_id = None - - if self.api_proxy: - self.log.debug( - "Instantiating SSH Provisioner for {}@{} ({})".format( - username, hostname, private_key_path - ) - ) - provisioner = SSHProvisioner( - host=hostname, - user=username, - private_key_path=private_key_path, - log=self.log, - ) - - params = None - try: - params = provisioner.provision_machine() - except Exception as ex: - self.log.debug("caught exception from provision_machine: {}".format(ex)) - return None - - if params: - params.jobs = ["JobHostUnits"] - - model = await self.get_model(model_name) - - connection = model.connection() - - # Submit the request. - self.log.debug("Adding machine to model") - client_facade = client.ClientFacade.from_connection(connection) - results = await client_facade.AddMachines(params=[params]) - error = results.machines[0].error - if error: - raise ValueError("Error adding machine: %s" % error.message) - - machine_id = results.machines[0].machine - - # Need to run this after AddMachines has been called, - # as we need the machine_id - self.log.debug("Installing Juju agent") - await provisioner.install_agent( - connection, params.nonce, machine_id, self.api_proxy, - ) - else: - self.log.debug("Missing API Proxy") - return machine_id - - # async def remove_application(self, name): - # """Remove the application.""" - # if not self.authenticated: - # await self.login() - # - # app = await self.get_application(name) - # if app: - # self.log.debug("JujuApi: Destroying application {}".format( - # name, - # )) - # - # await app.destroy() - - async def remove_relation(self, a, b): - """ - Remove a relation between two application endpoints - - :param a An application endpoint - :param b An application endpoint - """ - if not self.authenticated: - await self.login() - - # m = await self.get_model() - # try: - # m.remove_relation(a, b) - # finally: - # await m.disconnect() - - async def resolve_error(self, model_name, application=None): - """Resolve units in error state.""" - if not self.authenticated: - await self.login() - - model = await self.get_model(model_name) - - app = await self.get_application(model, application) - if app: - self.log.debug( - "JujuApi: Resolving errors for application {}".format(application,) - ) - - for _ in app.units: - app.resolved(retry=True) - - async def run_action(self, model_name, application, action_name, **params): - """Execute an action and return an Action object.""" - if not self.authenticated: - await self.login() - result = {"status": "", "action": {"tag": None, "results": None}} - - model = await self.get_model(model_name) - - app = await self.get_application(model, application) - if app: - # We currently only have one unit per application - # so use the first unit available. - unit = app.units[0] - - self.log.debug( - "JujuApi: Running Action {} against Application {}".format( - action_name, application, - ) - ) - - action = await unit.run_action(action_name, **params) - - # Wait for the action to complete - await action.wait() - - result["status"] = action.status - result["action"]["tag"] = action.data["id"] - result["action"]["results"] = action.results - - return result - - async def set_config(self, model_name, application, config): - """Apply a configuration to the application.""" - if not self.authenticated: - await self.login() - - app = await self.get_application(model_name, application) - if app: - self.log.debug( - "JujuApi: Setting config for Application {}".format(application,) - ) - await app.set_config(config) - - # Verify the config is set - newconf = await app.get_config() - for key in config: - if config[key] != newconf[key]["value"]: - self.log.debug( - ( - "JujuApi: Config not set! Key {} Value {} doesn't match {}" - ).format(key, config[key], newconf[key]) - ) - - # async def set_parameter(self, parameter, value, application=None): - # """Set a config parameter for a service.""" - # if not self.authenticated: - # await self.login() - # - # self.log.debug("JujuApi: Setting {}={} for Application {}".format( - # parameter, - # value, - # application, - # )) - # return await self.apply_config( - # {parameter: value}, - # application=application, - # ) - - async def wait_for_application(self, model_name, application_name, timeout=300): - """Wait for an application to become active.""" - if not self.authenticated: - await self.login() - - model = await self.get_model(model_name) - - app = await self.get_application(model, application_name) - self.log.debug("Application: {}".format(app)) - if app: - self.log.debug( - "JujuApi: Waiting {} seconds for Application {}".format( - timeout, application_name, - ) - ) - - await model.block_until( - lambda: all( - unit.agent_status == "idle" - and unit.workload_status in ["active", "unknown"] - for unit in app.units - ), - timeout=timeout, - ) diff --git a/requirements.txt b/requirements.txt index 0393d2d..4f467fb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,6 +14,5 @@ git+https://osm.etsi.org/gerrit/osm/common.git#egg=osm-common juju==2.8.2 -paramiko pyasn1>=0.4.4 kubernetes==10.0.1 diff --git a/setup.py b/setup.py index 0c874f1..7a6b451 100644 --- a/setup.py +++ b/setup.py @@ -24,7 +24,6 @@ setup( exclude=["*.tests", "*.tests.*", "tests.*", "tests"]), install_requires=[ 'juju==2.8.2', - 'paramiko', 'pyasn1>=0.4.4', 'kubernetes==10.0.1' ],