+++ /dev/null
-##
-# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
-# This file is part of OSM
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact with: nfvlabs@tid.es
-##
-
-import asyncio
-import time
-
-from juju.action import Action
-from juju.application import Application
-from juju.machine import Machine
-from juju.model import ModelObserver, Model
-
-from n2vc.exceptions import N2VCTimeoutException
-from n2vc.n2vc_conn import N2VCConnector, juju_status_2_osm_status
-
-
-class _Entity:
- def __init__(self, entity_id: str, entity_type: str, obj: object, db_dict: dict):
- self.entity_id = entity_id
- self.entity_type = entity_type
- self.obj = obj
- self.event = asyncio.Event()
- self.db_dict = db_dict
-
-
-class JujuModelObserver(ModelObserver):
- def __init__(self, n2vc: N2VCConnector, model: Model):
- self.n2vc = n2vc
- self.model = model
- model.add_observer(self)
- self.machines = dict()
- self.applications = dict()
- self.actions = dict()
-
- def register_machine(self, machine: Machine, db_dict: dict):
- try:
- entity_id = machine.entity_id
- except Exception:
- # no entity_id aatribute, try machine attribute
- entity_id = machine.machine
- # self.n2vc.debug(
- # msg='Registering machine for change notifications: {}'.format(entity_id))
- entity = _Entity(
- entity_id=entity_id, entity_type="machine", obj=machine, db_dict=db_dict
- )
- self.machines[entity_id] = entity
-
- def unregister_machine(self, machine_id: str):
- if machine_id in self.machines:
- del self.machines[machine_id]
-
- def is_machine_registered(self, machine_id: str):
- return machine_id in self.machines
-
- def register_application(self, application: Application, db_dict: dict):
- entity_id = application.entity_id
- # self.n2vc.debug(
- # msg='Registering application for change notifications: {}'.format(entity_id))
- entity = _Entity(
- entity_id=entity_id,
- entity_type="application",
- obj=application,
- db_dict=db_dict,
- )
- self.applications[entity_id] = entity
-
- def unregister_application(self, application_id: str):
- if application_id in self.applications:
- del self.applications[application_id]
-
- def is_application_registered(self, application_id: str):
- return application_id in self.applications
-
- def register_action(self, action: Action, db_dict: dict):
- entity_id = action.entity_id
- # self.n2vc.debug(
- # msg='Registering action for changes notifications: {}'.format(entity_id))
- entity = _Entity(
- entity_id=entity_id, entity_type="action", obj=action, db_dict=db_dict
- )
- self.actions[entity_id] = entity
-
- def unregister_action(self, action_id: str):
- if action_id in self.actions:
- del self.actions[action_id]
-
- def is_action_registered(self, action_id: str):
- return action_id in self.actions
-
- async def wait_for_machine(
- self,
- machine_id: str,
- progress_timeout: float = None,
- total_timeout: float = None,
- ) -> int:
-
- if not self.is_machine_registered(machine_id):
- return
-
- self.n2vc.debug("Waiting for machine completed: {}".format(machine_id))
-
- # wait for a final state
- entity = self.machines[machine_id]
- return await self._wait_for_entity(
- entity=entity,
- field_to_check="agent_status",
- final_states_list=["started"],
- progress_timeout=progress_timeout,
- total_timeout=total_timeout,
- )
-
- async def wait_for_application(
- self,
- application_id: str,
- progress_timeout: float = None,
- total_timeout: float = None,
- ) -> int:
-
- if not self.is_application_registered(application_id):
- return
-
- self.n2vc.debug("Waiting for application completed: {}".format(application_id))
-
- # application statuses: unknown, active, waiting
- # wait for a final state
- entity = self.applications[application_id]
- return await self._wait_for_entity(
- entity=entity,
- field_to_check="status",
- final_states_list=["active", "blocked"],
- progress_timeout=progress_timeout,
- total_timeout=total_timeout,
- )
-
- async def wait_for_action(
- self,
- action_id: str,
- progress_timeout: float = None,
- total_timeout: float = None,
- ) -> int:
-
- if not self.is_action_registered(action_id):
- return
-
- self.n2vc.debug("Waiting for action completed: {}".format(action_id))
-
- # action statuses: pending, running, completed, failed, cancelled
- # wait for a final state
- entity = self.actions[action_id]
- return await self._wait_for_entity(
- entity=entity,
- field_to_check="status",
- final_states_list=["completed", "failed", "cancelled"],
- progress_timeout=progress_timeout,
- total_timeout=total_timeout,
- )
-
- async def _wait_for_entity(
- self,
- entity: _Entity,
- field_to_check: str,
- final_states_list: list,
- progress_timeout: float = None,
- total_timeout: float = None,
- ) -> int:
-
- # default values for no timeout
- if total_timeout is None:
- total_timeout = 3600
- if progress_timeout is None:
- progress_timeout = 3600
-
- # max end time
- now = time.time()
- total_end = now + total_timeout
-
- if now >= total_end:
- raise N2VCTimeoutException(
- message="Total timeout {} seconds, {}: {}".format(
- total_timeout, entity.entity_type, entity.entity_id
- ),
- timeout="total",
- )
-
- # update next progress timeout
- progress_end = now + progress_timeout # type: float
-
- # which is closest? progress or end timeout?
- closest_end = min(total_end, progress_end)
-
- next_timeout = closest_end - now
-
- retries = 0
-
- while entity.obj.__getattribute__(field_to_check) not in final_states_list:
- retries += 1
- if await _wait_for_event_or_timeout(entity.event, next_timeout):
- entity.event.clear()
- else:
- message = "Progress timeout {} seconds, {}: {}".format(
- progress_timeout, entity.entity_type, entity.entity_id
- )
- self.n2vc.debug(message)
- raise N2VCTimeoutException(message=message, timeout="progress")
- # self.n2vc.debug('End of wait. Final state: {}, retries: {}'
- # .format(entity.obj.__getattribute__(field_to_check), retries))
- return retries
-
- async def on_change(self, delta, old, new, model):
-
- if new is None:
- return
-
- # log
- # self.n2vc.debug('on_change(): type: {}, entity: {}, id: {}'
- # .format(delta.type, delta.entity, new.entity_id))
-
- if delta.entity == "machine":
-
- # check registered machine
- if new.entity_id not in self.machines:
- return
-
- # write change in database
- await self.n2vc.write_app_status_to_db(
- db_dict=self.machines[new.entity_id].db_dict,
- status=juju_status_2_osm_status(delta.entity, new.agent_status),
- detailed_status=new.status_message,
- vca_status=new.status,
- entity_type="machine",
- )
-
- # set event for this machine
- self.machines[new.entity_id].event.set()
-
- elif delta.entity == "application":
-
- # check registered application
- if new.entity_id not in self.applications:
- return
-
- # write change in database
- await self.n2vc.write_app_status_to_db(
- db_dict=self.applications[new.entity_id].db_dict,
- status=juju_status_2_osm_status(delta.entity, new.status),
- detailed_status=new.status_message,
- vca_status=new.status,
- entity_type="application",
- )
-
- # set event for this application
- self.applications[new.entity_id].event.set()
-
- elif delta.entity == "unit":
-
- # get the application for this unit
- application_id = delta.data["application"]
-
- # check registered application
- if application_id not in self.applications:
- return
-
- # write change in database
- if not new.dead:
- await self.n2vc.write_app_status_to_db(
- db_dict=self.applications[application_id].db_dict,
- status=juju_status_2_osm_status(delta.entity, new.workload_status),
- detailed_status=new.workload_status_message,
- vca_status=new.workload_status,
- entity_type="unit",
- )
-
- # set event for this application
- self.applications[application_id].event.set()
-
- elif delta.entity == "action":
-
- # check registered action
- if new.entity_id not in self.actions:
- return
-
- # write change in database
- await self.n2vc.write_app_status_to_db(
- db_dict=self.actions[new.entity_id].db_dict,
- status=juju_status_2_osm_status(delta.entity, new.status),
- detailed_status=new.status,
- vca_status=new.status,
- entity_type="action",
- )
-
- # set event for this application
- self.actions[new.entity_id].event.set()
-
-
-async def _wait_for_event_or_timeout(event: asyncio.Event, timeout: float = None):
- try:
- await asyncio.wait_for(fut=event.wait(), timeout=timeout)
- except asyncio.TimeoutError:
- pass
- return event.is_set()
return JujuStatusToOSM[entity_type][status]
-# DEPRECATED
-def juju_status_2_osm_status(statustype: str, status: str) -> N2VCDeploymentStatus:
- if statustype == "application" or statustype == "unit":
- if status in ["waiting", "maintenance"]:
- return N2VCDeploymentStatus.RUNNING
- if status in ["error"]:
- return N2VCDeploymentStatus.FAILED
- elif status in ["active"]:
- return N2VCDeploymentStatus.COMPLETED
- elif status in ["blocked"]:
- return N2VCDeploymentStatus.RUNNING
- else:
- return N2VCDeploymentStatus.UNKNOWN
- elif statustype == "action":
- if status in ["running"]:
- return N2VCDeploymentStatus.RUNNING
- elif status in ["completed"]:
- return N2VCDeploymentStatus.COMPLETED
- else:
- return N2VCDeploymentStatus.UNKNOWN
- elif statustype == "machine":
- if status in ["pending"]:
- return N2VCDeploymentStatus.PENDING
- elif status in ["started"]:
- return N2VCDeploymentStatus.COMPLETED
- else:
- return N2VCDeploymentStatus.UNKNOWN
-
- return N2VCDeploymentStatus.FAILED
-
-
def obj_to_yaml(obj: object) -> str:
# dump to yaml
dump_text = yaml.dump(obj, default_flow_style=False, indent=2)
import logging
import os
import re
-import time
-
-from juju.action import Action
-from juju.application import Application
-from juju.client import client
-from juju.controller import Controller
-from juju.errors import JujuAPIError
-from juju.machine import Machine
-from juju.model import Model
+
from n2vc.exceptions import (
N2VCBadArgumentsException,
N2VCException,
N2VCConnectionException,
N2VCExecutionException,
N2VCInvalidCertificate,
- N2VCNotFound,
+ # N2VCNotFound,
MethodNotImplemented,
JujuK8sProxycharmNotSupported,
)
-from n2vc.juju_observer import JujuModelObserver
from n2vc.n2vc_conn import N2VCConnector
from n2vc.n2vc_conn import obj_to_dict, obj_to_yaml
-from n2vc.provisioner import AsyncSSHProvisioner
from n2vc.libjuju import Libjuju
return N2VCJujuConnector._format_app_name(application_name)
- async def _juju_create_machine(
- self,
- model_name: str,
- application_name: str,
- machine_id: str = None,
- db_dict: dict = None,
- progress_timeout: float = None,
- total_timeout: float = None,
- ) -> Machine:
-
- self.log.debug(
- "creating machine in model: {}, existing machine id: {}".format(
- model_name, machine_id
- )
- )
-
- # get juju model and observer (create model if needed)
- model = await self._juju_get_model(model_name=model_name)
- observer = self.juju_observers[model_name]
-
- # find machine id in model
- machine = None
- if machine_id is not None:
- self.log.debug("Finding existing machine id {} in model".format(machine_id))
- # get juju existing machines in the model
- existing_machines = await model.get_machines()
- if machine_id in existing_machines:
- self.log.debug(
- "Machine id {} found in model (reusing it)".format(machine_id)
- )
- machine = model.machines[machine_id]
-
- if machine is None:
- self.log.debug("Creating a new machine in juju...")
- # machine does not exist, create it and wait for it
- machine = await model.add_machine(
- spec=None, constraints=None, disks=None, series="xenial"
- )
-
- # register machine with observer
- observer.register_machine(machine=machine, db_dict=db_dict)
-
- # id for the execution environment
- ee_id = N2VCJujuConnector._build_ee_id(
- model_name=model_name,
- application_name=application_name,
- machine_id=str(machine.entity_id),
- )
-
- # write ee_id in database
- self._write_ee_id_db(db_dict=db_dict, ee_id=ee_id)
-
- # wait for machine creation
- await observer.wait_for_machine(
- machine_id=str(machine.entity_id),
- progress_timeout=progress_timeout,
- total_timeout=total_timeout,
- )
-
- else:
-
- self.log.debug("Reusing old machine pending")
-
- # register machine with observer
- observer.register_machine(machine=machine, db_dict=db_dict)
-
- # machine does exist, but it is in creation process (pending), wait for
- # create finalisation
- await observer.wait_for_machine(
- machine_id=machine.entity_id,
- progress_timeout=progress_timeout,
- total_timeout=total_timeout,
- )
-
- self.log.debug("Machine ready at " + str(machine.dns_name))
- return machine
-
- async def _juju_provision_machine(
- self,
- model_name: str,
- hostname: str,
- username: str,
- private_key_path: str,
- db_dict: dict = None,
- progress_timeout: float = None,
- total_timeout: float = None,
- ) -> str:
-
- if not self.api_proxy:
- msg = "Cannot provision machine: api_proxy is not defined"
- self.log.error(msg=msg)
- raise N2VCException(message=msg)
-
- self.log.debug(
- "provisioning machine. model: {}, hostname: {}, username: {}".format(
- model_name, hostname, username
- )
- )
-
- if not self._authenticated:
- await self._juju_login()
-
- # get juju model and observer
- model = await self._juju_get_model(model_name=model_name)
- observer = self.juju_observers[model_name]
-
- # TODO check if machine is already provisioned
- machine_list = await model.get_machines()
-
- provisioner = AsyncSSHProvisioner(
- host=hostname,
- user=username,
- private_key_path=private_key_path,
- log=self.log,
- )
-
- params = None
- try:
- params = await provisioner.provision_machine()
- except Exception as ex:
- msg = "Exception provisioning machine: {}".format(ex)
- self.log.error(msg)
- raise N2VCException(message=msg)
-
- params.jobs = ["JobHostUnits"]
-
- connection = model.connection()
-
- # Submit the request.
- self.log.debug("Adding machine to model")
- client_facade = client.ClientFacade.from_connection(connection)
- results = await client_facade.AddMachines(params=[params])
- error = results.machines[0].error
- if error:
- msg = "Error adding machine: {}".format(error.message)
- self.log.error(msg=msg)
- raise ValueError(msg)
-
- machine_id = results.machines[0].machine
-
- # Need to run this after AddMachines has been called,
- # as we need the machine_id
- self.log.debug("Installing Juju agent into machine {}".format(machine_id))
- asyncio.ensure_future(
- provisioner.install_agent(
- connection=connection,
- nonce=params.nonce,
- machine_id=machine_id,
- proxy=self.api_proxy,
- )
- )
-
- # wait for machine in model (now, machine is not yet in model, so we must
- # wait for it)
- machine = None
- for _ in range(10):
- machine_list = await model.get_machines()
- if machine_id in machine_list:
- self.log.debug("Machine {} found in model!".format(machine_id))
- machine = model.machines.get(machine_id)
- break
- await asyncio.sleep(2)
-
- if machine is None:
- msg = "Machine {} not found in model".format(machine_id)
- self.log.error(msg=msg)
- raise Exception(msg)
-
- # register machine with observer
- observer.register_machine(machine=machine, db_dict=db_dict)
-
- # wait for machine creation
- self.log.debug("waiting for provision finishes... {}".format(machine_id))
- await observer.wait_for_machine(
- machine_id=machine_id,
- progress_timeout=progress_timeout,
- total_timeout=total_timeout,
- )
-
- self.log.debug("Machine provisioned {}".format(machine_id))
-
- return machine_id
-
- async def _juju_deploy_charm(
- self,
- model_name: str,
- application_name: str,
- charm_path: str,
- machine_id: str,
- db_dict: dict,
- progress_timeout: float = None,
- total_timeout: float = None,
- config: dict = None,
- ) -> (Application, int):
-
- # get juju model and observer
- model = await self._juju_get_model(model_name=model_name)
- observer = self.juju_observers[model_name]
-
- # check if application already exists
- application = None
- if application_name in model.applications:
- application = model.applications[application_name]
-
- if application is None:
-
- # application does not exist, create it and wait for it
- self.log.debug(
- "deploying application {} to machine {}, model {}".format(
- application_name, machine_id, model_name
- )
- )
- self.log.debug("charm: {}".format(charm_path))
- machine = model.machines[machine_id]
- # series = None
- application = await model.deploy(
- entity_url=charm_path,
- application_name=application_name,
- channel="stable",
- num_units=1,
- series=machine.series,
- to=machine_id,
- config=config,
- )
-
- # register application with observer
- observer.register_application(application=application, db_dict=db_dict)
-
- self.log.debug(
- "waiting for application deployed... {}".format(application.entity_id)
- )
- retries = await observer.wait_for_application(
- application_id=application.entity_id,
- progress_timeout=progress_timeout,
- total_timeout=total_timeout,
- )
- self.log.debug("application deployed")
-
- else:
-
- # register application with observer
- observer.register_application(application=application, db_dict=db_dict)
-
- # application already exists, but not finalised
- self.log.debug("application already exists, waiting for deployed...")
- retries = await observer.wait_for_application(
- application_id=application.entity_id,
- progress_timeout=progress_timeout,
- total_timeout=total_timeout,
- )
- self.log.debug("application deployed")
-
- return application, retries
-
- async def _juju_execute_action(
- self,
- model_name: str,
- application_name: str,
- action_name: str,
- db_dict: dict,
- progress_timeout: float = None,
- total_timeout: float = None,
- **kwargs
- ) -> Action:
-
- # get juju model and observer
- model = await self._juju_get_model(model_name=model_name)
- observer = self.juju_observers[model_name]
-
- application = await self._juju_get_application(
- model_name=model_name, application_name=application_name
- )
-
- unit = None
- for u in application.units:
- if await u.is_leader_from_status():
- unit = u
- if unit is not None:
- actions = await application.get_actions()
- if action_name in actions:
- self.log.debug(
- 'executing action "{}" using params: {}'.format(action_name, kwargs)
- )
- action = await unit.run_action(action_name, **kwargs)
-
- # register action with observer
- observer.register_action(action=action, db_dict=db_dict)
-
- await observer.wait_for_action(
- action_id=action.entity_id,
- progress_timeout=progress_timeout,
- total_timeout=total_timeout,
- )
- self.log.debug("action completed with status: {}".format(action.status))
- output = await model.get_action_output(action_uuid=action.entity_id)
- status = await model.get_action_status(uuid_or_prefix=action.entity_id)
- if action.entity_id in status:
- status = status[action.entity_id]
- else:
- status = "failed"
- return output, status
-
- raise N2VCExecutionException(
- message="Cannot execute action on charm", primitive_name=action_name
- )
-
- async def _juju_configure_application(
- self,
- model_name: str,
- application_name: str,
- config: dict,
- db_dict: dict,
- progress_timeout: float = None,
- total_timeout: float = None,
- ):
-
- # get the application
- application = await self._juju_get_application(
- model_name=model_name, application_name=application_name
- )
-
- self.log.debug(
- "configuring the application {} -> {}".format(application_name, config)
- )
- res = await application.set_config(config)
- self.log.debug(
- "application {} configured. res={}".format(application_name, res)
- )
-
- # Verify the config is set
- new_conf = await application.get_config()
- for key in config:
- value = new_conf[key]["value"]
- self.log.debug(" {} = {}".format(key, value))
- if config[key] != value:
- raise N2VCException(
- message="key {} is not configured correctly {} != {}".format(
- key, config[key], new_conf[key]
- )
- )
-
- # check if 'verify-ssh-credentials' action exists
- # unit = application.units[0]
- actions = await application.get_actions()
- if "verify-ssh-credentials" not in actions:
- msg = (
- "Action verify-ssh-credentials does not exist in application {}"
- ).format(application_name)
- self.log.debug(msg=msg)
- return False
-
- # execute verify-credentials
- num_retries = 20
- retry_timeout = 15.0
- for _ in range(num_retries):
- try:
- self.log.debug("Executing action verify-ssh-credentials...")
- output, ok = await self._juju_execute_action(
- model_name=model_name,
- application_name=application_name,
- action_name="verify-ssh-credentials",
- db_dict=db_dict,
- progress_timeout=progress_timeout,
- total_timeout=total_timeout,
- )
- self.log.debug("Result: {}, output: {}".format(ok, output))
- return True
- except asyncio.CancelledError:
- raise
- except Exception as e:
- self.log.debug(
- "Error executing verify-ssh-credentials: {}. Retrying...".format(e)
- )
- await asyncio.sleep(retry_timeout)
- else:
- self.log.error(
- "Error executing verify-ssh-credentials after {} retries. ".format(
- num_retries
- )
- )
- return False
-
- async def _juju_get_application(self, model_name: str, application_name: str):
- """Get the deployed application."""
-
- model = await self._juju_get_model(model_name=model_name)
-
- application_name = N2VCJujuConnector._format_app_name(application_name)
-
- if model.applications and application_name in model.applications:
- return model.applications[application_name]
- else:
- raise N2VCException(
- message="Cannot get application {} from model {}".format(
- application_name, model_name
- )
- )
-
- async def _juju_get_model(self, model_name: str) -> Model:
- """ Get a model object from juju controller
- If the model does not exits, it creates it.
-
- :param str model_name: name of the model
- :returns Model: model obtained from juju controller or Exception
- """
-
- # format model name
- model_name = N2VCJujuConnector._format_model_name(model_name)
-
- if model_name in self.juju_models:
- return self.juju_models[model_name]
-
- if self._creating_model:
- self.log.debug("Another coroutine is creating a model. Wait...")
- while self._creating_model:
- # another coroutine is creating a model, wait
- await asyncio.sleep(0.1)
- # retry (perhaps another coroutine has created the model meanwhile)
- if model_name in self.juju_models:
- return self.juju_models[model_name]
-
- try:
- self._creating_model = True
-
- # get juju model names from juju
- model_list = await self.controller.list_models()
- if model_name not in model_list:
- self.log.info(
- "Model {} does not exist. Creating new model...".format(model_name)
- )
- config_dict = {"authorized-keys": self.public_key}
- if self.apt_mirror:
- config_dict["apt-mirror"] = self.apt_mirror
- if not self.enable_os_upgrade:
- config_dict["enable-os-refresh-update"] = False
- config_dict["enable-os-upgrade"] = False
- if self.cloud in self.BUILT_IN_CLOUDS:
- model = await self.controller.add_model(
- model_name=model_name,
- config=config_dict,
- cloud_name=self.cloud,
- )
- else:
- model = await self.controller.add_model(
- model_name=model_name,
- config=config_dict,
- cloud_name=self.cloud,
- credential_name=self.cloud,
- )
- self.log.info("New model created, name={}".format(model_name))
- else:
- self.log.debug(
- "Model already exists in juju. Getting model {}".format(model_name)
- )
- model = await self.controller.get_model(model_name)
- self.log.debug("Existing model in juju, name={}".format(model_name))
-
- self.juju_models[model_name] = model
- self.juju_observers[model_name] = JujuModelObserver(n2vc=self, model=model)
- return model
-
- except Exception as e:
- msg = "Cannot get model {}. Exception: {}".format(model_name, e)
- self.log.error(msg)
- raise N2VCException(msg)
- finally:
- self._creating_model = False
-
- async def _juju_add_relation(
- self,
- model_name: str,
- application_name_1: str,
- application_name_2: str,
- relation_1: str,
- relation_2: str,
- ):
-
- # get juju model and observer
- model = await self._juju_get_model(model_name=model_name)
-
- r1 = "{}:{}".format(application_name_1, relation_1)
- r2 = "{}:{}".format(application_name_2, relation_2)
-
- self.log.debug("adding relation: {} -> {}".format(r1, r2))
- try:
- await model.add_relation(relation1=r1, relation2=r2)
- except JujuAPIError as e:
- # If one of the applications in the relationship doesn't exist, or the
- # relation has already been added,
- # let the operation fail silently.
- if "not found" in e.message:
- return
- if "already exists" in e.message:
- return
- # another execption, raise it
- raise e
-
- async def _juju_destroy_application(self, model_name: str, application_name: str):
-
- self.log.debug(
- "Destroying application {} in model {}".format(application_name, model_name)
- )
-
- # get juju model and observer
- model = await self._juju_get_model(model_name=model_name)
- observer = self.juju_observers[model_name]
-
- application = model.applications.get(application_name)
- if application:
- observer.unregister_application(application_name)
- await application.destroy()
- else:
- self.log.debug("Application not found: {}".format(application_name))
-
- async def _juju_destroy_machine(
- self, model_name: str, machine_id: str, total_timeout: float = None
- ):
-
- self.log.debug(
- "Destroying machine {} in model {}".format(machine_id, model_name)
- )
-
- if total_timeout is None:
- total_timeout = 3600
-
- # get juju model and observer
- model = await self._juju_get_model(model_name=model_name)
- observer = self.juju_observers[model_name]
-
- machines = await model.get_machines()
- if machine_id in machines:
- machine = model.machines[machine_id]
- observer.unregister_machine(machine_id)
- # TODO: change this by machine.is_manual when this is upstreamed:
- # https://github.com/juju/python-libjuju/pull/396
- if "instance-id" in machine.safe_data and machine.safe_data[
- "instance-id"
- ].startswith("manual:"):
- self.log.debug("machine.destroy(force=True) started.")
- await machine.destroy(force=True)
- self.log.debug("machine.destroy(force=True) passed.")
- # max timeout
- end = time.time() + total_timeout
- # wait for machine removal
- machines = await model.get_machines()
- while machine_id in machines and time.time() < end:
- self.log.debug(
- "Waiting for machine {} is destroyed".format(machine_id)
- )
- await asyncio.sleep(0.5)
- machines = await model.get_machines()
- self.log.debug("Machine destroyed: {}".format(machine_id))
- else:
- self.log.debug("Machine not found: {}".format(machine_id))
-
- async def _juju_destroy_model(self, model_name: str, total_timeout: float = None):
-
- self.log.debug("Destroying model {}".format(model_name))
-
- if total_timeout is None:
- total_timeout = 3600
- end = time.time() + total_timeout
-
- model = await self._juju_get_model(model_name=model_name)
-
- if not model:
- raise N2VCNotFound(message="Model {} does not exist".format(model_name))
-
- uuid = model.info.uuid
-
- # destroy applications
- for application_name in model.applications:
- try:
- await self._juju_destroy_application(
- model_name=model_name, application_name=application_name
- )
- except Exception as e:
- self.log.error(
- "Error destroying application {} in model {}: {}".format(
- application_name, model_name, e
- )
- )
-
- # destroy machines
- machines = await model.get_machines()
- for machine_id in machines:
- try:
- await self._juju_destroy_machine(
- model_name=model_name, machine_id=machine_id
- )
- except asyncio.CancelledError:
- raise
- except Exception:
- # ignore exceptions destroying machine
- pass
-
- await self._juju_disconnect_model(model_name=model_name)
-
- self.log.debug("destroying model {}...".format(model_name))
- await self.controller.destroy_model(uuid)
- # self.log.debug('model destroy requested {}'.format(model_name))
-
- # wait for model is completely destroyed
- self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
- last_exception = ""
- while time.time() < end:
- try:
- # await self.controller.get_model(uuid)
- models = await self.controller.list_models()
- if model_name not in models:
- self.log.debug(
- "The model {} ({}) was destroyed".format(model_name, uuid)
- )
- return
- except asyncio.CancelledError:
- raise
- except Exception as e:
- last_exception = e
- await asyncio.sleep(5)
- raise N2VCException(
- "Timeout waiting for model {} to be destroyed {}".format(
- model_name, last_exception
- )
- )
-
- async def _juju_login(self):
- """Connect to juju controller
-
- """
-
- # if already authenticated, exit function
- if self._authenticated:
- return
-
- # if connecting, wait for finish
- # another task could be trying to connect in parallel
- while self._connecting:
- await asyncio.sleep(0.1)
-
- # double check after other task has finished
- if self._authenticated:
- return
-
- try:
- self._connecting = True
- self.log.info(
- "connecting to juju controller: {} {}:{}{}".format(
- self.url,
- self.username,
- self.secret[:8] + "...",
- " with ca_cert" if self.ca_cert else "",
- )
- )
-
- # Create controller object
- self.controller = Controller(loop=self.loop)
- # Connect to controller
- await self.controller.connect(
- endpoint=self.url,
- username=self.username,
- password=self.secret,
- cacert=self.ca_cert,
- )
- self._authenticated = True
- self.log.info("juju controller connected")
- except Exception as e:
- message = "Exception connecting to juju: {}".format(e)
- self.log.error(message)
- raise N2VCConnectionException(message=message, url=self.url)
- finally:
- self._connecting = False
-
- async def _juju_logout(self):
- """Logout of the Juju controller."""
- if not self._authenticated:
- return False
-
- # disconnect all models
- for model_name in self.juju_models:
- try:
- await self._juju_disconnect_model(model_name)
- except Exception as e:
- self.log.error(
- "Error disconnecting model {} : {}".format(model_name, e)
- )
- # continue with next model...
-
- self.log.info("Disconnecting controller")
- try:
- await self.controller.disconnect()
- except Exception as e:
- raise N2VCConnectionException(
- message="Error disconnecting controller: {}".format(e), url=self.url
- )
-
- self.controller = None
- self._authenticated = False
- self.log.info("disconnected")
-
- async def _juju_disconnect_model(self, model_name: str):
- self.log.debug("Disconnecting model {}".format(model_name))
- if model_name in self.juju_models:
- await self.juju_models[model_name].disconnect()
- self.juju_models[model_name] = None
- self.juju_observers[model_name] = None
- else:
- self.warning("Cannot disconnect model: {}".format(model_name))
-
def _create_juju_public_key(self):
"""Recreate the Juju public key on lcm container, if needed
Certain libjuju commands expect to be run from the same machine as Juju
import logging
import os
import re
-import shlex
from subprocess import CalledProcessError
import tempfile
-import time
import uuid
from juju.client import client
-import n2vc.exceptions
-import paramiko
import asyncio
arches = [
return await self._ssh(
"{} /bin/bash {}".format("sudo" if root else "", tmpFile)
)
-
-
-class SSHProvisioner:
- """Provision a manually created machine via SSH."""
-
- def __init__(self, user, host, private_key_path, log=None):
-
- self.host = host
- self.user = user
- self.private_key_path = private_key_path
-
- if log:
- self.log = log
- else:
- self.log = logging.getLogger(__name__)
-
- def _get_ssh_client(self, host=None, user=None, private_key_path=None):
- """Return a connected Paramiko ssh object.
-
- :param str host: The host to connect to.
- :param str user: The user to connect as.
- :param str key: The private key to authenticate with.
-
- :return: object: A paramiko.SSHClient
- :raises: :class:`paramiko.ssh_exception.SSHException` if the
- connection failed
- """
-
- if not host:
- host = self.host
-
- if not user:
- user = self.user
-
- if not private_key_path:
- private_key_path = self.private_key_path
-
- ssh = paramiko.SSHClient()
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
-
- pkey = None
-
- # Read the private key into a paramiko.RSAKey
- if os.path.exists(private_key_path):
- with open(private_key_path, "r") as f:
- pkey = paramiko.RSAKey.from_private_key(f)
-
- #######################################################################
- # There is a bug in some versions of OpenSSH 4.3 (CentOS/RHEL5) where #
- # the server may not send the SSH_MSG_USERAUTH_BANNER message except #
- # when responding to an auth_none request. For example, paramiko will #
- # attempt to use password authentication when a password is set, but #
- # the server could deny that, instead requesting keyboard-interactive.#
- # The hack to workaround this is to attempt a reconnect, which will #
- # receive the right banner, and authentication can proceed. See the #
- # following for more info: #
- # https://github.com/paramiko/paramiko/issues/432 #
- # https://github.com/paramiko/paramiko/pull/438 #
- #######################################################################
-
- retry = 10
- attempts = 0
- delay = 15
- while attempts <= retry:
- try:
- attempts += 1
-
- # Attempt to establish a SSH connection
- ssh.connect(
- host,
- port=22,
- username=user,
- pkey=pkey,
- # allow_agent=False,
- # look_for_keys=False,
- )
- break
- except paramiko.ssh_exception.SSHException as e:
- if "Error reading SSH protocol banner" == str(e):
- # Once more, with feeling
- ssh.connect(host, port=22, username=user, pkey=pkey)
- else:
- # Reraise the original exception
- self.log.debug("Unhandled exception caught: {}".format(e))
- raise e
- except Exception as e:
- if "Unable to connect to port" in str(e):
- self.log.debug(
- "Waiting for VM to boot, sleeping {} seconds".format(delay)
- )
- if attempts > retry:
- raise e
- else:
- time.sleep(delay)
- # Slowly back off the retry
- delay += 15
- else:
- self.log.debug(e)
- raise e
- return ssh
-
- def _run_command(self, ssh, cmd, pty=True):
- """Run a command remotely via SSH.
-
- :param object ssh: The SSHClient
- :param str cmd: The command to execute
- :param list cmd: The `shlex.split` command to execute
- :param bool pty: Whether to allocate a pty
-
- :return: tuple: The stdout and stderr of the command execution
- :raises: :class:`CalledProcessError` if the command fails
- """
-
- if isinstance(cmd, str):
- cmd = shlex.split(cmd)
-
- if type(cmd) is not list:
- cmd = [cmd]
-
- cmds = " ".join(cmd)
- _, stdout, stderr = ssh.exec_command(cmds, get_pty=pty)
- retcode = stdout.channel.recv_exit_status()
-
- if retcode > 0:
- output = stderr.read().strip()
- raise CalledProcessError(returncode=retcode, cmd=cmd, output=output)
- return (
- stdout.read().decode("utf-8").strip(),
- stderr.read().decode("utf-8").strip(),
- )
-
- def _init_ubuntu_user(self):
- """Initialize the ubuntu user.
-
- :return: bool: If the initialization was successful
- :raises: :class:`paramiko.ssh_exception.AuthenticationException`
- if the authentication fails
- """
- ssh = None
- try:
- # Run w/o allocating a pty, so we fail if sudo prompts for a passwd
- ssh = self._get_ssh_client()
- self._run_command(ssh, "sudo -n true", pty=False)
- except paramiko.ssh_exception.AuthenticationException:
- raise n2vc.exceptions.AuthenticationFailed(self.user)
- except paramiko.ssh_exception.NoValidConnectionsError:
- raise n2vc.exceptions.NoRouteToHost(self.host)
- finally:
- if ssh:
- ssh.close()
-
- # Infer the public key
- public_key_path = "{}.pub".format(self.private_key_path)
-
- if not os.path.exists(public_key_path):
- raise FileNotFoundError(
- "Public key '{}' doesn't exist.".format(public_key_path)
- )
-
- with open(public_key_path, "r") as f:
- public_key = f.readline()
-
- script = INITIALIZE_UBUNTU_SCRIPT.format(public_key)
-
- try:
- ssh = self._get_ssh_client()
-
- self._run_command(
- ssh, ["sudo", "/bin/bash -c " + shlex.quote(script)], pty=True
- )
- except paramiko.ssh_exception.AuthenticationException as e:
- raise e
- finally:
- ssh.close()
-
- return True
-
- def _detect_hardware_and_os(self, ssh):
- """Detect the target hardware capabilities and OS series.
-
- :param object ssh: The SSHClient
- :return: str: A raw string containing OS and hardware information.
- """
-
- info = {
- "series": "",
- "arch": "",
- "cpu-cores": "",
- "mem": "",
- }
-
- stdout, _ = self._run_command(
- ssh, ["sudo", "/bin/bash -c " + shlex.quote(DETECTION_SCRIPT)], pty=True,
- )
-
- lines = stdout.split("\n")
-
- # Remove extraneous line if DNS resolution of hostname famils
- # i.e. sudo: unable to resolve host test-1-mgmtvm-1: Connection timed out
- if "unable to resolve host" in lines[0]:
- lines = lines[1:]
-
- info["series"] = lines[0].strip()
- info["arch"] = normalize_arch(lines[1].strip())
-
- memKb = re.split(r"\s+", lines[2])[1]
-
- # Convert megabytes -> kilobytes
- info["mem"] = round(int(memKb) / 1024)
-
- # Detect available CPUs
- recorded = {}
- for line in lines[3:]:
- physical_id = ""
-
- if line.find("physical id") == 0:
- physical_id = line.split(":")[1].strip()
- elif line.find("cpu cores") == 0:
- cores = line.split(":")[1].strip()
-
- if physical_id not in recorded.keys():
- info["cpu-cores"] += cores
- recorded[physical_id] = True
-
- return info
-
- def provision_machine(self):
- """Perform the initial provisioning of the target machine.
-
- :return: bool: The client.AddMachineParams
- :raises: :class:`paramiko.ssh_exception.AuthenticationException`
- if the upload fails
- """
- params = client.AddMachineParams()
-
- if self._init_ubuntu_user():
- try:
- ssh = self._get_ssh_client()
-
- hw = self._detect_hardware_and_os(ssh)
- params.series = hw["series"]
- params.instance_id = "manual:{}".format(self.host)
- params.nonce = "manual:{}:{}".format(
- self.host, str(uuid.uuid4()),
- ) # a nop for Juju w/manual machines
- params.hardware_characteristics = {
- "arch": hw["arch"],
- "mem": int(hw["mem"]),
- "cpu-cores": int(hw["cpu-cores"]),
- }
- params.addresses = [
- {"value": self.host, "type": "ipv4", "scope": "public"}
- ]
-
- except paramiko.ssh_exception.AuthenticationException as e:
- raise e
- finally:
- ssh.close()
-
- return params
-
- async def install_agent(self, connection, nonce, machine_id, api):
- """
- :param object connection: Connection to Juju API
- :param str nonce: The nonce machine specification
- :param str machine_id: The id assigned to the machine
-
- :return: bool: If the initialization was successful
- """
- # The path where the Juju agent should be installed.
- data_dir = "/var/lib/juju"
-
- # Disabling this prevents `apt-get update` from running initially, so
- # charms will fail to deploy
- disable_package_commands = False
-
- client_facade = client.ClientFacade.from_connection(connection)
- results = await client_facade.ProvisioningScript(
- data_dir=data_dir,
- disable_package_commands=disable_package_commands,
- machine_id=machine_id,
- nonce=nonce,
- )
-
- """Get the IP of the controller
-
- Parse the provisioning script, looking for the first apiaddress.
-
- Example:
- apiaddresses:
- - 10.195.8.2:17070
- - 127.0.0.1:17070
- - '[::1]:17070'
- """
- m = re.search(r"apiaddresses:\n- (\d+\.\d+\.\d+\.\d+):17070", results.script)
- apiaddress = m.group(1)
-
- """Add IP Table rule
-
- In order to route the traffic to the private ip of the Juju controller
- we use a DNAT rule to tell the machine that the destination for the
- private address is the public address of the machine where the Juju
- controller is running in LXD. That machine will have a complimentary
- iptables rule, routing traffic to the appropriate LXD container.
- """
-
- script = IPTABLES_SCRIPT.format(apiaddress, api)
-
- # Run this in a retry loop, because dpkg may be running and cause the
- # script to fail.
- retry = 10
- attempts = 0
- delay = 15
-
- while attempts <= retry:
- try:
- attempts += 1
-
- self._run_configure_script(script)
- break
- except Exception as e:
- self.log.debug("Waiting for dpkg, sleeping {} seconds".format(delay))
- if attempts > retry:
- raise e
- else:
- time.sleep(delay)
- # Slowly back off the retry
- delay += 15
-
- # self.log.debug("Running configure script")
- self._run_configure_script(results.script)
- # self.log.debug("Configure script finished")
-
- def _run_configure_script(self, script: str):
- """Run the script to install the Juju agent on the target machine.
-
- :param str script: The script returned by the ProvisioningScript API
- :raises: :class:`paramiko.ssh_exception.AuthenticationException`
- if the upload fails
- """
- _, tmpFile = tempfile.mkstemp()
- with open(tmpFile, "w") as f:
- f.write(script)
- try:
- # get ssh client
- ssh = self._get_ssh_client(user="ubuntu",)
-
- # copy the local copy of the script to the remote machine
- sftp = paramiko.SFTPClient.from_transport(ssh.get_transport())
- sftp.put(
- tmpFile, tmpFile,
- )
-
- # run the provisioning script
- self._run_command(
- ssh, "sudo /bin/bash {}".format(tmpFile),
- )
-
- except paramiko.ssh_exception.AuthenticationException as e:
- raise e
- finally:
- os.remove(tmpFile)
- ssh.close()
+++ /dev/null
-# Copyright 2020 Canonical Ltd.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import asyncio
-from unittest import mock
-from unittest.mock import Mock
-
-import asynctest
-
-from n2vc.exceptions import N2VCTimeoutException
-from n2vc.juju_observer import JujuModelObserver, _Entity
-
-
-class FakeObject:
- def __init__(self):
- self.complete = True
-
-
-class JujuModelObserverTest(asynctest.TestCase):
- def setUp(self):
- self.n2vc = Mock()
- self.model = Mock()
- self.juju_observer = JujuModelObserver(n2vc=self.n2vc, model=self.model)
- self.loop = asyncio.new_event_loop()
-
- def test_wait_no_retries(self):
- obj = FakeObject()
- entity = _Entity(entity_id="eid-1", entity_type="fake", obj=obj, db_dict={})
- result = self.loop.run_until_complete(
- self.juju_observer._wait_for_entity(
- entity=entity,
- field_to_check="complete",
- final_states_list=[True],
- progress_timeout=None,
- total_timeout=None,
- )
- )
- self.assertEqual(result, 0)
-
- @mock.patch("n2vc.juju_observer.asyncio.wait_for")
- def test_wait_default_values(self, wait_for):
- wait_for.return_value = asyncio.Future()
- wait_for.return_value.set_result(None)
- obj = FakeObject()
- obj.complete = False
- entity = _Entity(entity_id="eid-1", entity_type="fake", obj=obj, db_dict={})
- with self.assertRaises(N2VCTimeoutException):
- self.loop.run_until_complete(
- self.juju_observer._wait_for_entity(
- entity=entity,
- field_to_check="complete",
- final_states_list=[True],
- progress_timeout=None,
- total_timeout=None,
- )
- )
- wait_for.assert_called_once_with(fut=mock.ANY, timeout=3600.0)
-
- @mock.patch("n2vc.juju_observer.asyncio.wait_for")
- def test_wait_default_progress(self, wait_for):
- wait_for.return_value = asyncio.Future()
- wait_for.return_value.set_result(None)
- obj = FakeObject()
- obj.complete = False
- entity = _Entity(entity_id="eid-1", entity_type="fake", obj=obj, db_dict={})
- with self.assertRaises(N2VCTimeoutException):
- self.loop.run_until_complete(
- self.juju_observer._wait_for_entity(
- entity=entity,
- field_to_check="complete",
- final_states_list=[True],
- progress_timeout=4000,
- total_timeout=None,
- )
- )
- wait_for.assert_called_once_with(fut=mock.ANY, timeout=3600.0)
-
- @mock.patch("n2vc.juju_observer.asyncio.wait_for")
- def test_wait_default_total(self, wait_for):
- wait_for.return_value = asyncio.Future()
- wait_for.return_value.set_result(None)
- obj = FakeObject()
- obj.complete = False
- entity = _Entity(entity_id="eid-1", entity_type="fake", obj=obj, db_dict={})
- with self.assertRaises(N2VCTimeoutException):
- self.loop.run_until_complete(
- self.juju_observer._wait_for_entity(
- entity=entity,
- field_to_check="complete",
- final_states_list=[True],
- progress_timeout=None,
- total_timeout=4000.0,
- )
- )
- wait_for.assert_called_once_with(fut=mock.ANY, timeout=3600.0)
-
- @mock.patch("n2vc.juju_observer.asyncio.wait_for")
- def test_wait_total_less_than_progress_timeout(self, wait_for):
- wait_for.return_value = asyncio.Future()
- wait_for.return_value.set_result(None)
- obj = FakeObject()
- obj.complete = False
- entity = _Entity(entity_id="eid-1", entity_type="fake", obj=obj, db_dict={})
- with self.assertRaises(N2VCTimeoutException):
- self.loop.run_until_complete(
- self.juju_observer._wait_for_entity(
- entity=entity,
- field_to_check="complete",
- final_states_list=[True],
- progress_timeout=4500.0,
- total_timeout=3000.0,
- )
- )
- wait_for.assert_called_once_with(fut=mock.ANY, timeout=3000.0)
-
- @mock.patch("n2vc.juju_observer.asyncio.wait_for")
- def test_wait_progress_less_than_total_timeout(self, wait_for):
- wait_for.return_value = asyncio.Future()
- wait_for.return_value.set_result(None)
- obj = FakeObject()
- obj.complete = False
- entity = _Entity(entity_id="eid-1", entity_type="fake", obj=obj, db_dict={})
- with self.assertRaises(N2VCTimeoutException):
- self.loop.run_until_complete(
- self.juju_observer._wait_for_entity(
- entity=entity,
- field_to_check="complete",
- final_states_list=[True],
- progress_timeout=1500.0,
- total_timeout=3000.0,
- )
- )
- wait_for.assert_called_once_with(fut=mock.ANY, timeout=1500.0)
-
- def test_wait_negative_timeout(self):
- obj = FakeObject()
- entity = _Entity(entity_id="eid-1", entity_type="fake", obj=obj, db_dict={})
- with self.assertRaises(N2VCTimeoutException):
- self.loop.run_until_complete(
- self.juju_observer._wait_for_entity(
- entity=entity,
- field_to_check="complete",
- final_states_list=[True],
- progress_timeout=None,
- total_timeout=-1000,
- )
- )
# See the License for the specific language governing permissions and
# limitations under the License.
-from unittest import TestCase, mock
-
-from mock import mock_open
-from n2vc.provisioner import SSHProvisioner
-from paramiko.ssh_exception import SSHException
+from unittest import TestCase
class ProvisionerTest(TestCase):
def setUp(self):
- self.provisioner = SSHProvisioner(None, None, None)
-
- @mock.patch("n2vc.provisioner.os.path.exists")
- @mock.patch("n2vc.provisioner.paramiko.RSAKey")
- @mock.patch("n2vc.provisioner.paramiko.SSHClient")
- @mock.patch("builtins.open", new_callable=mock_open, read_data="data")
- def test__get_ssh_client(self, _mock_open, mock_sshclient, _mock_rsakey, _mock_os):
- mock_instance = mock_sshclient.return_value
- sshclient = self.provisioner._get_ssh_client()
- self.assertEqual(mock_instance, sshclient)
- self.assertEqual(
- 1,
- mock_instance.set_missing_host_key_policy.call_count,
- "Missing host key call count",
- )
- self.assertEqual(1, mock_instance.connect.call_count, "Connect call count")
-
- @mock.patch("n2vc.provisioner.os.path.exists")
- @mock.patch("n2vc.provisioner.paramiko.RSAKey")
- @mock.patch("n2vc.provisioner.paramiko.SSHClient")
- @mock.patch("builtins.open", new_callable=mock_open, read_data="data")
- def test__get_ssh_client_no_connection(
- self, _mock_open, mock_sshclient, _mock_rsakey, _mock_os
- ):
-
- mock_instance = mock_sshclient.return_value
- mock_instance.method_inside_someobject.side_effect = ["something"]
- mock_instance.connect.side_effect = SSHException()
-
- self.assertRaises(SSHException, self.provisioner._get_ssh_client)
- self.assertEqual(
- 1,
- mock_instance.set_missing_host_key_policy.call_count,
- "Missing host key call count",
- )
- self.assertEqual(1, mock_instance.connect.call_count, "Connect call count")
-
- @mock.patch("n2vc.provisioner.os.path.exists")
- @mock.patch("n2vc.provisioner.paramiko.RSAKey")
- @mock.patch("n2vc.provisioner.paramiko.SSHClient")
- @mock.patch("builtins.open", new_callable=mock_open, read_data="data")
- def test__get_ssh_client_bad_banner(
- self, _mock_open, mock_sshclient, _mock_rsakey, _mock_os
- ):
-
- mock_instance = mock_sshclient.return_value
- mock_instance.method_inside_someobject.side_effect = ["something"]
- mock_instance.connect.side_effect = [
- SSHException("Error reading SSH protocol banner"),
- None,
- None,
- ]
-
- sshclient = self.provisioner._get_ssh_client()
- self.assertEqual(mock_instance, sshclient)
- self.assertEqual(
- 1,
- mock_instance.set_missing_host_key_policy.call_count,
- "Missing host key call count",
- )
- self.assertEqual(
- 3, mock_instance.connect.call_count, "Should attempt 3 connections"
- )
-
- @mock.patch("time.sleep", autospec=True)
- @mock.patch("n2vc.provisioner.os.path.exists")
- @mock.patch("n2vc.provisioner.paramiko.RSAKey")
- @mock.patch("n2vc.provisioner.paramiko.SSHClient")
- @mock.patch("builtins.open", new_callable=mock_open, read_data="data")
- def test__get_ssh_client_unable_to_connect(
- self, _mock_open, mock_sshclient, _mock_rsakey, _mock_os, _mock_sleep
- ):
-
- mock_instance = mock_sshclient.return_value
- mock_instance.connect.side_effect = Exception("Unable to connect to port")
-
- self.assertRaises(Exception, self.provisioner._get_ssh_client)
- self.assertEqual(
- 1,
- mock_instance.set_missing_host_key_policy.call_count,
- "Missing host key call count",
- )
- self.assertEqual(
- 11, mock_instance.connect.call_count, "Should attempt 11 connections"
- )
-
- @mock.patch("time.sleep", autospec=True)
- @mock.patch("n2vc.provisioner.os.path.exists")
- @mock.patch("n2vc.provisioner.paramiko.RSAKey")
- @mock.patch("n2vc.provisioner.paramiko.SSHClient")
- @mock.patch("builtins.open", new_callable=mock_open, read_data="data")
- def test__get_ssh_client_unable_to_connect_once(
- self, _mock_open, mock_sshclient, _mock_rsakey, _mock_os, _mock_sleep
- ):
-
- mock_instance = mock_sshclient.return_value
- mock_instance.connect.side_effect = [
- Exception("Unable to connect to port"),
- None,
- ]
-
- sshclient = self.provisioner._get_ssh_client()
- self.assertEqual(mock_instance, sshclient)
- self.assertEqual(
- 1,
- mock_instance.set_missing_host_key_policy.call_count,
- "Missing host key call count",
- )
- self.assertEqual(
- 2, mock_instance.connect.call_count, "Should attempt 2 connections"
- )
-
- @mock.patch("n2vc.provisioner.os.path.exists")
- @mock.patch("n2vc.provisioner.paramiko.RSAKey")
- @mock.patch("n2vc.provisioner.paramiko.SSHClient")
- @mock.patch("builtins.open", new_callable=mock_open, read_data="data")
- def test__get_ssh_client_other_exception(
- self, _mock_open, mock_sshclient, _mock_rsakey, _mock_os
- ):
-
- mock_instance = mock_sshclient.return_value
- mock_instance.connect.side_effect = Exception()
-
- self.assertRaises(Exception, self.provisioner._get_ssh_client)
- self.assertEqual(
- 1,
- mock_instance.set_missing_host_key_policy.call_count,
- "Missing host key call count",
- )
- self.assertEqual(
- 1, mock_instance.connect.call_count, "Should only attempt 1 connection"
- )
-
-
-#
+ pass
+++ /dev/null
-# Copyright 2019 Canonical Ltd.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import asyncio
-import base64
-import binascii
-import logging
-import os.path
-import re
-import shlex
-import ssl
-import subprocess
-
-from juju.client import client
-from juju.controller import Controller
-from juju.errors import JujuAPIError, JujuError
-from juju.model import ModelObserver
-
-import n2vc.exceptions
-from n2vc.provisioner import SSHProvisioner
-
-
-# import time
-# FIXME: this should load the juju inside or modules without having to
-# explicitly install it. Check why it's not working.
-# Load our subtree of the juju library
-# path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
-# path = os.path.join(path, "modules/libjuju/")
-# if path not in sys.path:
-# sys.path.insert(1, path)
-# We might need this to connect to the websocket securely, but test and verify.
-try:
- ssl._create_default_https_context = ssl._create_unverified_context
-except AttributeError:
- # Legacy Python doesn't verify by default (see pep-0476)
- # https://www.python.org/dev/peps/pep-0476/
- pass
-
-
-# Custom exceptions
-# Deprecated. Please use n2vc.exceptions namespace.
-class JujuCharmNotFound(Exception):
- """The Charm can't be found or is not readable."""
-
-
-class JujuApplicationExists(Exception):
- """The Application already exists."""
-
-
-class N2VCPrimitiveExecutionFailed(Exception):
- """Something failed while attempting to execute a primitive."""
-
-
-class NetworkServiceDoesNotExist(Exception):
- """The Network Service being acted against does not exist."""
-
-
-class PrimitiveDoesNotExist(Exception):
- """The Primitive being executed does not exist."""
-
-
-# Quiet the debug logging
-logging.getLogger("websockets.protocol").setLevel(logging.INFO)
-logging.getLogger("juju.client.connection").setLevel(logging.WARN)
-logging.getLogger("juju.model").setLevel(logging.WARN)
-logging.getLogger("juju.machine").setLevel(logging.WARN)
-
-
-class VCAMonitor(ModelObserver):
- """Monitor state changes within the Juju Model."""
-
- log = None
-
- def __init__(self, ns_name):
- self.log = logging.getLogger(__name__)
-
- self.ns_name = ns_name
- self.applications = {}
-
- def AddApplication(self, application_name, callback, *callback_args):
- if application_name not in self.applications:
- self.applications[application_name] = {
- "callback": callback,
- "callback_args": callback_args,
- }
-
- def RemoveApplication(self, application_name):
- if application_name in self.applications:
- del self.applications[application_name]
-
- async def on_change(self, delta, old, new, model):
- """React to changes in the Juju model."""
-
- if delta.entity == "unit":
- # Ignore change events from other applications
- if delta.data["application"] not in self.applications.keys():
- return
-
- try:
-
- application_name = delta.data["application"]
-
- callback = self.applications[application_name]["callback"]
- callback_args = self.applications[application_name]["callback_args"]
-
- if old and new:
- # Fire off a callback with the application state
- if callback:
- callback(
- self.ns_name,
- delta.data["application"],
- new.workload_status,
- new.workload_status_message,
- *callback_args,
- )
-
- if old and not new:
- # This is a charm being removed
- if callback:
- callback(
- self.ns_name,
- delta.data["application"],
- "removed",
- "",
- *callback_args,
- )
- except Exception as e:
- self.log.debug("[1] notify_callback exception: {}".format(e))
-
- elif delta.entity == "action":
- # TODO: Decide how we want to notify the user of actions
-
- # uuid = delta.data['id'] # The Action's unique id
- # msg = delta.data['message'] # The output of the action
- #
- # if delta.data['status'] == "pending":
- # # The action is queued
- # pass
- # elif delta.data['status'] == "completed""
- # # The action was successful
- # pass
- # elif delta.data['status'] == "failed":
- # # The action failed.
- # pass
-
- pass
-
-
-########
-# TODO
-#
-# Create unique models per network service
-# Document all public functions
-
-
-class N2VC:
- def __init__(
- self,
- log=None,
- server="127.0.0.1",
- port=17070,
- user="admin",
- secret=None,
- artifacts=None,
- loop=None,
- juju_public_key=None,
- ca_cert=None,
- api_proxy=None,
- ):
- """Initialize N2VC
-
- Initializes the N2VC object, allowing the caller to interoperate with the VCA.
-
-
- :param log obj: The logging object to log to
- :param server str: The IP Address or Hostname of the Juju controller
- :param port int: The port of the Juju Controller
- :param user str: The Juju username to authenticate with
- :param secret str: The Juju password to authenticate with
- :param artifacts str: The directory where charms required by a vnfd are
- stored.
- :param loop obj: The loop to use.
- :param juju_public_key str: The contents of the Juju public SSH key
- :param ca_cert str: The CA certificate to use to authenticate
- :param api_proxy str: The IP of the host machine
-
- :Example:
- client = n2vc.vnf.N2VC(
- log=log,
- server='10.1.1.28',
- port=17070,
- user='admin',
- secret='admin',
- artifacts='/app/storage/myvnf/charms',
- loop=loop,
- juju_public_key='<contents of the juju public key>',
- ca_cert='<contents of CA certificate>',
- api_proxy='192.168.1.155'
- )
- """
-
- # Initialize instance-level variables
- self.api = None
- self.log = None
- self.controller = None
- self.connecting = False
- self.authenticated = False
- self.api_proxy = api_proxy
-
- if log:
- self.log = log
- else:
- self.log = logging.getLogger(__name__)
-
- # For debugging
- self.refcount = {
- "controller": 0,
- "model": 0,
- }
-
- self.models = {}
-
- # Model Observers
- self.monitors = {}
-
- # VCA config
- self.hostname = ""
- self.port = 17070
- self.username = ""
- self.secret = ""
-
- self.juju_public_key = juju_public_key
- if juju_public_key:
- self._create_juju_public_key(juju_public_key)
- else:
- self.juju_public_key = ""
-
- # TODO: Verify ca_cert is valid before using. VCA will crash
- # if the ca_cert isn't formatted correctly.
- def base64_to_cacert(b64string):
- """Convert the base64-encoded string containing the VCA CACERT.
-
- The input string....
-
- """
- try:
- cacert = base64.b64decode(b64string).decode("utf-8")
-
- cacert = re.sub(r"\\n", r"\n", cacert,)
- except binascii.Error as e:
- self.log.debug("Caught binascii.Error: {}".format(e))
- raise n2vc.exceptions.N2VCInvalidCertificate("Invalid CA Certificate")
-
- return cacert
-
- self.ca_cert = None
- if ca_cert:
- self.ca_cert = base64_to_cacert(ca_cert)
-
- # Quiet websocket traffic
- logging.getLogger("websockets.protocol").setLevel(logging.INFO)
- logging.getLogger("juju.client.connection").setLevel(logging.WARN)
- logging.getLogger("model").setLevel(logging.WARN)
- # logging.getLogger('websockets.protocol').setLevel(logging.DEBUG)
-
- self.log.debug("JujuApi: instantiated")
-
- self.server = server
- self.port = port
-
- self.secret = secret
- if user.startswith("user-"):
- self.user = user
- else:
- self.user = "user-{}".format(user)
-
- self.endpoint = "%s:%d" % (server, int(port))
-
- self.artifacts = artifacts
-
- self.loop = loop or asyncio.get_event_loop()
-
- def __del__(self):
- """Close any open connections."""
- yield self.logout()
-
- def _create_juju_public_key(self, public_key):
- """Recreate the Juju public key on disk.
-
- Certain libjuju commands expect to be run from the same machine as Juju
- is bootstrapped to. This method will write the public key to disk in
- that location: ~/.local/share/juju/ssh/juju_id_rsa.pub
- """
- # Make sure that we have a public key before writing to disk
- if public_key is None or len(public_key) == 0:
- if "OSM_VCA_PUBKEY" in os.environ:
- public_key = os.getenv("OSM_VCA_PUBKEY", "")
- if len(public_key == 0):
- return
- else:
- return
-
- path = "{}/.local/share/juju/ssh".format(os.path.expanduser("~"),)
- if not os.path.exists(path):
- os.makedirs(path)
-
- with open("{}/juju_id_rsa.pub".format(path), "w") as f:
- f.write(public_key)
-
- def notify_callback(
- self,
- model_name,
- application_name,
- status,
- message,
- callback=None,
- *callback_args
- ):
- try:
- if callback:
- callback(
- model_name, application_name, status, message, *callback_args,
- )
- except Exception as e:
- self.log.error("[0] notify_callback exception {}".format(e))
- raise e
- return True
-
- # Public methods
- async def Relate(self, model_name, vnfd):
- """Create a relation between the charm-enabled VDUs in a VNF.
-
- The Relation mapping has two parts: the id of the vdu owning the endpoint, and
- the name of the endpoint.
-
- vdu:
- ...
- vca-relationships:
- relation:
- - provides: dataVM:db
- requires: mgmtVM:app
-
- This tells N2VC that the charm referred to by the dataVM vdu offers a relation
- named 'db', and the mgmtVM vdu
- has an 'app' endpoint that should be connected to a database.
-
- :param str ns_name: The name of the network service.
- :param dict vnfd: The parsed yaml VNF descriptor.
- """
-
- # Currently, the call to Relate() is made automatically after the
- # deployment of each charm; if the relation depends on a charm that
- # hasn't been deployed yet, the call will fail silently. This will
- # prevent an API breakage, with the intent of making this an explicitly
- # required call in a more object-oriented refactor of the N2VC API.
-
- configs = []
- vnf_config = vnfd.get("vnf-configuration")
- if vnf_config:
- juju = vnf_config["juju"]
- if juju:
- configs.append(vnf_config)
-
- for vdu in vnfd["vdu"]:
- vdu_config = vdu.get("vdu-configuration")
- if vdu_config:
- juju = vdu_config["juju"]
- if juju:
- configs.append(vdu_config)
-
- def _get_application_name(name):
- """Get the application name that's mapped to a vnf/vdu."""
- vnf_member_index = 0
- vnf_name = vnfd["name"]
-
- for vdu in vnfd.get("vdu"):
- # Compare the named portion of the relation to the vdu's id
- if vdu["id"] == name:
- application_name = self.FormatApplicationName(
- model_name, vnf_name, str(vnf_member_index),
- )
- return application_name
- else:
- vnf_member_index += 1
-
- return None
-
- # Loop through relations
- for cfg in configs:
- if "juju" in cfg:
- juju = cfg["juju"]
- if (
- "vca-relationships" in juju
- and "relation" in juju["vca-relationships"]
- ):
- for rel in juju["vca-relationships"]["relation"]:
- try:
-
- # get the application name for the provides
- (name, endpoint) = rel["provides"].split(":")
- application_name = _get_application_name(name)
-
- provides = "{}:{}".format(application_name, endpoint)
-
- # get the application name for thr requires
- (name, endpoint) = rel["requires"].split(":")
- application_name = _get_application_name(name)
-
- requires = "{}:{}".format(application_name, endpoint)
- self.log.debug(
- "Relation: {} <-> {}".format(provides, requires)
- )
- await self.add_relation(
- model_name, provides, requires,
- )
- except Exception as e:
- self.log.debug("Exception: {}".format(e))
-
- return
-
- async def DeployCharms(
- self,
- model_name,
- application_name,
- vnfd,
- charm_path,
- params={},
- machine_spec={},
- callback=None,
- *callback_args
- ):
- """Deploy one or more charms associated with a VNF.
-
- Deploy the charm(s) referenced in a VNF Descriptor.
-
- :param str model_name: The name or unique id of the network service.
- :param str application_name: The name of the application
- :param dict vnfd: The name of the application
- :param str charm_path: The path to the Juju charm
- :param dict params: A dictionary of runtime parameters
- Examples::
- {
- 'rw_mgmt_ip': '1.2.3.4',
- # Pass the initial-config-primitives section of the vnf or vdu
- 'initial-config-primitives': {...}
- 'user_values': dictionary with the day-1 parameters provided at
- instantiation time. It will replace values
- inside < >. rw_mgmt_ip will be included here also
- }
- :param dict machine_spec: A dictionary describing the machine to
- install to
- Examples::
- {
- 'hostname': '1.2.3.4',
- 'username': 'ubuntu',
- }
- :param obj callback: A callback function to receive status changes.
- :param tuple callback_args: A list of arguments to be passed to the
- callback
- """
-
- ########################################################
- # Verify the path to the charm exists and is readable. #
- ########################################################
- if not os.path.exists(charm_path):
- self.log.debug("Charm path doesn't exist: {}".format(charm_path))
- self.notify_callback(
- model_name,
- application_name,
- "error",
- "failed",
- callback,
- *callback_args,
- )
- raise JujuCharmNotFound("No artifacts configured.")
-
- ################################
- # Login to the Juju controller #
- ################################
- if not self.authenticated:
- self.log.debug("Authenticating with Juju")
- await self.login()
-
- ##########################################
- # Get the model for this network service #
- ##########################################
- model = await self.get_model(model_name)
-
- ########################################
- # Verify the application doesn't exist #
- ########################################
- app = await self.get_application(model, application_name)
- if app:
- raise JujuApplicationExists(
- (
- 'Can\'t deploy application "{}" to model '
- ' "{}" because it already exists.'
- ).format(application_name, model_name)
- )
-
- ################################################################
- # Register this application with the model-level event monitor #
- ################################################################
- if callback:
- self.log.debug(
- "JujuApi: Registering callback for {}".format(application_name,)
- )
- await self.Subscribe(model_name, application_name, callback, *callback_args)
-
- #######################################
- # Get the initial charm configuration #
- #######################################
-
- rw_mgmt_ip = None
- if "rw_mgmt_ip" in params:
- rw_mgmt_ip = params["rw_mgmt_ip"]
-
- if "initial-config-primitive" not in params:
- params["initial-config-primitive"] = {}
-
- initial_config = self._get_config_from_dict(
- params["initial-config-primitive"], {"<rw_mgmt_ip>": rw_mgmt_ip}
- )
-
- ########################################################
- # Check for specific machine placement (native charms) #
- ########################################################
- to = ""
- series = "xenial"
-
- if machine_spec.keys():
- if all(k in machine_spec for k in ["hostname", "username"]):
-
- # Allow series to be derived from the native charm
- series = None
-
- self.log.debug(
- "Provisioning manual machine {}@{}".format(
- machine_spec["username"], machine_spec["hostname"],
- )
- )
-
- """Native Charm support
-
- Taking a bare VM (assumed to be an Ubuntu cloud image),
- the provisioning process will:
- - Create an ubuntu user w/sudo access
- - Detect hardware
- - Detect architecture
- - Download and install Juju agent from controller
- - Enable Juju agent
- - Add an iptables rule to route traffic to the API proxy
- """
-
- to = await self.provision_machine(
- model_name=model_name,
- username=machine_spec["username"],
- hostname=machine_spec["hostname"],
- private_key_path=self.GetPrivateKeyPath(),
- )
- self.log.debug("Provisioned machine id {}".format(to))
-
- # TODO: If to is none, raise an exception
-
- # The native charm won't have the sshproxy layer, typically, but LCM
- # uses the config primitive
- # to interpret what the values are. That's a gap to fill.
-
- """
- The ssh-* config parameters are unique to the sshproxy layer,
- which most native charms will not be aware of.
-
- Setting invalid config parameters will cause the deployment to
- fail.
-
- For the moment, we will strip the ssh-* parameters from native
- charms, until the feature gap is addressed in the information
- model.
- """
-
- # Native charms don't include the ssh-* config values, so strip them
- # from the initial_config, otherwise the deploy will raise an error.
- # self.log.debug("Removing ssh-* from initial-config")
- for k in ["ssh-hostname", "ssh-username", "ssh-password"]:
- if k in initial_config:
- self.log.debug("Removing parameter {}".format(k))
- del initial_config[k]
-
- self.log.debug(
- "JujuApi: Deploying charm ({}/{}) from {} to {}".format(
- model_name, application_name, charm_path, to,
- )
- )
-
- ########################################################
- # Deploy the charm and apply the initial configuration #
- ########################################################
- app = await model.deploy(
- # We expect charm_path to be either the path to the charm on disk
- # or in the format of cs:series/name
- charm_path,
- # This is the formatted, unique name for this charm
- application_name=application_name,
- # Proxy charms should use the current LTS. This will need to be
- # changed for native charms.
- series=series,
- # Apply the initial 'config' primitive during deployment
- config=initial_config,
- # Where to deploy the charm to.
- to=to,
- )
-
- #############################
- # Map the vdu id<->app name #
- #############################
- try:
- await self.Relate(model_name, vnfd)
- except KeyError as ex:
- # We don't currently support relations between NS and VNF/VDU charms
- self.log.warn("[N2VC] Relations not supported: {}".format(ex))
- except Exception:
- # This may happen if not all of the charms needed by the relation
- # are ready. We can safely ignore this, because Relate will be
- # retried when the endpoint of the relation is deployed.
- self.log.warn("[N2VC] Relations not ready")
-
- # #######################################
- # # Execute initial config primitive(s) #
- # #######################################
- uuids = await self.ExecuteInitialPrimitives(
- model_name, application_name, params,
- )
- return uuids
-
- # primitives = {}
- #
- # # Build a sequential list of the primitives to execute
- # for primitive in params['initial-config-primitive']:
- # try:
- # if primitive['name'] == 'config':
- # # This is applied when the Application is deployed
- # pass
- # else:
- # seq = primitive['seq']
- #
- # params = {}
- # if 'parameter' in primitive:
- # params = primitive['parameter']
- #
- # primitives[seq] = {
- # 'name': primitive['name'],
- # 'parameters': self._map_primitive_parameters(
- # params,
- # {'<rw_mgmt_ip>': rw_mgmt_ip}
- # ),
- # }
- #
- # for primitive in sorted(primitives):
- # await self.ExecutePrimitive(
- # model_name,
- # application_name,
- # primitives[primitive]['name'],
- # callback,
- # callback_args,
- # **primitives[primitive]['parameters'],
- # )
- # except N2VCPrimitiveExecutionFailed as e:
- # self.log.debug(
- # "[N2VC] Exception executing primitive: {}".format(e)
- # )
- # raise
-
- async def GetPrimitiveStatus(self, model_name, uuid):
- """Get the status of an executed Primitive.
-
- The status of an executed Primitive will be one of three values:
- - completed
- - failed
- - running
- """
- status = None
- try:
- if not self.authenticated:
- await self.login()
-
- model = await self.get_model(model_name)
-
- results = await model.get_action_status(uuid)
-
- if uuid in results:
- status = results[uuid]
-
- except Exception as e:
- self.log.debug(
- "Caught exception while getting primitive status: {}".format(e)
- )
- raise N2VCPrimitiveExecutionFailed(e)
-
- return status
-
- async def GetPrimitiveOutput(self, model_name, uuid):
- """Get the output of an executed Primitive.
-
- Note: this only returns output for a successfully executed primitive.
- """
- results = None
- try:
- if not self.authenticated:
- await self.login()
-
- model = await self.get_model(model_name)
- results = await model.get_action_output(uuid, 60)
- except Exception as e:
- self.log.debug(
- "Caught exception while getting primitive status: {}".format(e)
- )
- raise N2VCPrimitiveExecutionFailed(e)
-
- return results
-
- # async def ProvisionMachine(self, model_name, hostname, username):
- # """Provision machine for usage with Juju.
- #
- # Provisions a previously instantiated machine for use with Juju.
- # """
- # try:
- # if not self.authenticated:
- # await self.login()
- #
- # # FIXME: This is hard-coded until model-per-ns is added
- # model_name = 'default'
- #
- # model = await self.get_model(model_name)
- # model.add_machine(spec={})
- #
- # machine = await model.add_machine(spec='ssh:{}@{}:{}'.format(
- # "ubuntu",
- # host['address'],
- # private_key_path,
- # ))
- # return machine.id
- #
- # except Exception as e:
- # self.log.debug(
- # "Caught exception while getting primitive status: {}".format(e)
- # )
- # raise N2VCPrimitiveExecutionFailed(e)
-
- def GetPrivateKeyPath(self):
- homedir = os.environ["HOME"]
- sshdir = "{}/.ssh".format(homedir)
- private_key_path = "{}/id_n2vc_rsa".format(sshdir)
- return private_key_path
-
- async def GetPublicKey(self):
- """Get the N2VC SSH public key.abs
-
- Returns the SSH public key, to be injected into virtual machines to
- be managed by the VCA.
-
- The first time this is run, a ssh keypair will be created. The public
- key is injected into a VM so that we can provision the machine with
- Juju, after which Juju will communicate with the VM directly via the
- juju agent.
- """
- # public_key = ""
-
- # Find the path to where we expect our key to live.
- homedir = os.environ["HOME"]
- sshdir = "{}/.ssh".format(homedir)
- if not os.path.exists(sshdir):
- os.mkdir(sshdir)
-
- private_key_path = "{}/id_n2vc_rsa".format(sshdir)
- public_key_path = "{}.pub".format(private_key_path)
-
- # If we don't have a key generated, generate it.
- if not os.path.exists(private_key_path):
- cmd = "ssh-keygen -t {} -b {} -N '' -f {}".format(
- "rsa", "4096", private_key_path
- )
- subprocess.check_output(shlex.split(cmd))
-
- # Read the public key
- with open(public_key_path, "r") as f:
- public_key = f.readline()
-
- return public_key
-
- async def ExecuteInitialPrimitives(
- self, model_name, application_name, params, callback=None, *callback_args
- ):
- """Execute multiple primitives.
-
- Execute multiple primitives as declared in initial-config-primitive.
- This is useful in cases where the primitives initially failed -- for
- example, if the charm is a proxy but the proxy hasn't been configured
- yet.
- """
- uuids = []
- primitives = {}
-
- # Build a sequential list of the primitives to execute
- for primitive in params["initial-config-primitive"]:
- try:
- if primitive["name"] == "config":
- pass
- else:
- seq = primitive["seq"]
-
- params_ = {}
- if "parameter" in primitive:
- params_ = primitive["parameter"]
-
- user_values = params.get("user_values", {})
- if "rw_mgmt_ip" not in user_values:
- user_values["rw_mgmt_ip"] = None
- # just for backward compatibility, because it will be provided
- # always by modern version of LCM
-
- primitives[seq] = {
- "name": primitive["name"],
- "parameters": self._map_primitive_parameters(
- params_, user_values
- ),
- }
-
- for primitive in sorted(primitives):
- try:
- # self.log.debug("Queuing action {}".format(
- # primitives[primitive]['name']))
- uuids.append(
- await self.ExecutePrimitive(
- model_name,
- application_name,
- primitives[primitive]["name"],
- callback,
- callback_args,
- **primitives[primitive]["parameters"],
- )
- )
- except PrimitiveDoesNotExist as e:
- self.log.debug(
- "Ignoring exception PrimitiveDoesNotExist: {}".format(e)
- )
- pass
- except Exception as e:
- self.log.debug(
- (
- "XXXXXXXXXXXXXXXXXXXXXXXXX Unexpected exception: {}"
- ).format(e)
- )
- raise e
-
- except N2VCPrimitiveExecutionFailed as e:
- self.log.debug("[N2VC] Exception executing primitive: {}".format(e))
- raise
- return uuids
-
- async def ExecutePrimitive(
- self,
- model_name,
- application_name,
- primitive,
- callback,
- *callback_args,
- **params
- ):
- """Execute a primitive of a charm for Day 1 or Day 2 configuration.
-
- Execute a primitive defined in the VNF descriptor.
-
- :param str model_name: The name or unique id of the network service.
- :param str application_name: The name of the application
- :param str primitive: The name of the primitive to execute.
- :param obj callback: A callback function to receive status changes.
- :param tuple callback_args: A list of arguments to be passed to the
- callback function.
- :param dict params: A dictionary of key=value pairs representing the
- primitive's parameters
- Examples::
- {
- 'rw_mgmt_ip': '1.2.3.4',
- # Pass the initial-config-primitives section of the vnf or vdu
- 'initial-config-primitives': {...}
- }
- """
- self.log.debug("Executing primitive={} params={}".format(primitive, params))
- uuid = None
- try:
- if not self.authenticated:
- await self.login()
-
- model = await self.get_model(model_name)
-
- if primitive == "config":
- # config is special, and expecting params to be a dictionary
- await self.set_config(
- model, application_name, params["params"],
- )
- else:
- app = await self.get_application(model, application_name)
- if app:
- # Does this primitive exist?
- actions = await app.get_actions()
-
- if primitive not in actions.keys():
- raise PrimitiveDoesNotExist(
- "Primitive {} does not exist".format(primitive)
- )
-
- # Run against the first (and probably only) unit in the app
- unit = app.units[0]
- if unit:
- action = await unit.run_action(primitive, **params)
- uuid = action.id
- except PrimitiveDoesNotExist as e:
- # Catch and raise this exception if it's thrown from the inner block
- raise e
- except Exception as e:
- # An unexpected exception was caught
- self.log.debug("Caught exception while executing primitive: {}".format(e))
- raise N2VCPrimitiveExecutionFailed(e)
- return uuid
-
- async def RemoveCharms(
- self, model_name, application_name, callback=None, *callback_args
- ):
- """Remove a charm from the VCA.
-
- Remove a charm referenced in a VNF Descriptor.
-
- :param str model_name: The name of the network service.
- :param str application_name: The name of the application
- :param obj callback: A callback function to receive status changes.
- :param tuple callback_args: A list of arguments to be passed to the
- callback function.
- """
- try:
- if not self.authenticated:
- await self.login()
-
- model = await self.get_model(model_name)
- app = await self.get_application(model, application_name)
- if app:
- # Remove this application from event monitoring
- await self.Unsubscribe(model_name, application_name)
-
- # self.notify_callback(model_name, application_name, "removing",
- # callback, *callback_args)
- self.log.debug("Removing the application {}".format(application_name))
- await app.remove()
-
- # await self.disconnect_model(self.monitors[model_name])
-
- self.notify_callback(
- model_name,
- application_name,
- "removed",
- "Removing charm {}".format(application_name),
- callback,
- *callback_args,
- )
-
- except Exception as e:
- print("Caught exception: {}".format(e))
- self.log.debug(e)
- raise e
-
- async def CreateNetworkService(self, ns_uuid):
- """Create a new Juju model for the Network Service.
-
- Creates a new Model in the Juju Controller.
-
- :param str ns_uuid: A unique id representing an instaance of a
- Network Service.
-
- :returns: True if the model was created. Raises JujuError on failure.
- """
- if not self.authenticated:
- await self.login()
-
- models = await self.controller.list_models()
- if ns_uuid not in models:
- # Get the new model
- await self.get_model(ns_uuid)
-
- return True
-
- async def DestroyNetworkService(self, ns_uuid):
- """Destroy a Network Service.
-
- Destroy the Network Service and any deployed charms.
-
- :param ns_uuid The unique id of the Network Service
-
- :returns: True if the model was created. Raises JujuError on failure.
- """
-
- # Do not delete the default model. The default model was used by all
- # Network Services, prior to the implementation of a model per NS.
- if ns_uuid.lower() == "default":
- return False
-
- if not self.authenticated:
- await self.login()
-
- models = await self.controller.list_models()
- if ns_uuid in models:
- model = await self.controller.get_model(ns_uuid)
-
- for application in model.applications:
- app = model.applications[application]
-
- await self.RemoveCharms(ns_uuid, application)
-
- self.log.debug("Unsubscribing Watcher for {}".format(application))
- await self.Unsubscribe(ns_uuid, application)
-
- self.log.debug("Waiting for application to terminate")
- timeout = 30
- try:
- await model.block_until(
- lambda: all(
- unit.workload_status in ["terminated"] for unit in app.units
- ),
- timeout=timeout,
- )
- except Exception:
- self.log.debug(
- "Timed out waiting for {} to terminate.".format(application)
- )
-
- for machine in model.machines:
- try:
- self.log.debug("Destroying machine {}".format(machine))
- await model.machines[machine].destroy(force=True)
- except JujuAPIError as e:
- if "does not exist" in str(e):
- # Our cached model may be stale, because the machine
- # has already been removed. It's safe to continue.
- continue
- else:
- self.log.debug("Caught exception: {}".format(e))
- raise e
-
- # Disconnect from the Model
- if ns_uuid in self.models:
- self.log.debug("Disconnecting model {}".format(ns_uuid))
- # await self.disconnect_model(self.models[ns_uuid])
- await self.disconnect_model(ns_uuid)
-
- try:
- self.log.debug("Destroying model {}".format(ns_uuid))
- await self.controller.destroy_models(ns_uuid)
- except JujuError:
- raise NetworkServiceDoesNotExist(
- "The Network Service '{}' does not exist".format(ns_uuid)
- )
-
- return True
-
- async def GetMetrics(self, model_name, application_name):
- """Get the metrics collected by the VCA.
-
- :param model_name The name or unique id of the network service
- :param application_name The name of the application
- """
- metrics = {}
- model = await self.get_model(model_name)
- app = await self.get_application(model, application_name)
- if app:
- metrics = await app.get_metrics()
-
- return metrics
-
- async def HasApplication(self, model_name, application_name):
- model = await self.get_model(model_name)
- app = await self.get_application(model, application_name)
- if app:
- return True
- return False
-
- async def Subscribe(self, ns_name, application_name, callback, *callback_args):
- """Subscribe to callbacks for an application.
-
- :param ns_name str: The name of the Network Service
- :param application_name str: The name of the application
- :param callback obj: The callback method
- :param callback_args list: The list of arguments to append to calls to
- the callback method
- """
- self.monitors[ns_name].AddApplication(
- application_name, callback, *callback_args
- )
-
- async def Unsubscribe(self, ns_name, application_name):
- """Unsubscribe to callbacks for an application.
-
- Unsubscribes the caller from notifications from a deployed application.
-
- :param ns_name str: The name of the Network Service
- :param application_name str: The name of the application
- """
- self.monitors[ns_name].RemoveApplication(application_name,)
-
- # Non-public methods
- async def add_relation(self, model_name, relation1, relation2):
- """
- Add a relation between two application endpoints.
-
- :param str model_name: The name or unique id of the network service
- :param str relation1: '<application>[:<relation_name>]'
- :param str relation2: '<application>[:<relation_name>]'
- """
-
- if not self.authenticated:
- await self.login()
-
- m = await self.get_model(model_name)
- try:
- await m.add_relation(relation1, relation2)
- except JujuAPIError as e:
- # If one of the applications in the relationship doesn't exist,
- # or the relation has already been added, let the operation fail
- # silently.
- if "not found" in e.message:
- return
- if "already exists" in e.message:
- return
-
- raise e
-
- # async def apply_config(self, config, application):
- # """Apply a configuration to the application."""
- # print("JujuApi: Applying configuration to {}.".format(
- # application
- # ))
- # return await self.set_config(application=application, config=config)
-
- def _get_config_from_dict(self, config_primitive, values):
- """Transform the yang config primitive to dict.
-
- Expected result:
-
- config = {
- 'config':
- }
- """
- config = {}
- for primitive in config_primitive:
- if primitive["name"] == "config":
- # config = self._map_primitive_parameters()
- for parameter in primitive["parameter"]:
- param = str(parameter["name"])
- if parameter["value"] == "<rw_mgmt_ip>":
- config[param] = str(values[parameter["value"]])
- else:
- config[param] = str(parameter["value"])
-
- return config
-
- def _map_primitive_parameters(self, parameters, user_values):
- params = {}
- for parameter in parameters:
- param = str(parameter["name"])
- value = parameter.get("value")
-
- # map parameters inside a < >; e.g. <rw_mgmt_ip>. with the provided user
- # _values.
- # Must exist at user_values except if there is a default value
- if isinstance(value, str) and value.startswith("<") and value.endswith(">"):
- if parameter["value"][1:-1] in user_values:
- value = user_values[parameter["value"][1:-1]]
- elif "default-value" in parameter:
- value = parameter["default-value"]
- else:
- raise KeyError(
- "parameter {}='{}' not supplied ".format(param, value)
- )
-
- # If there's no value, use the default-value (if set)
- if value is None and "default-value" in parameter:
- value = parameter["default-value"]
-
- # Typecast parameter value, if present
- paramtype = "string"
- try:
- if "data-type" in parameter:
- paramtype = str(parameter["data-type"]).lower()
-
- if paramtype == "integer":
- value = int(value)
- elif paramtype == "boolean":
- value = bool(value)
- else:
- value = str(value)
- else:
- # If there's no data-type, assume the value is a string
- value = str(value)
- except ValueError:
- raise ValueError(
- "parameter {}='{}' cannot be converted to type {}".format(
- param, value, paramtype
- )
- )
-
- params[param] = value
- return params
-
- def _get_config_from_yang(self, config_primitive, values):
- """Transform the yang config primitive to dict."""
- config = {}
- for primitive in config_primitive.values():
- if primitive["name"] == "config":
- for parameter in primitive["parameter"].values():
- param = str(parameter["name"])
- if parameter["value"] == "<rw_mgmt_ip>":
- config[param] = str(values[parameter["value"]])
- else:
- config[param] = str(parameter["value"])
-
- return config
-
- def FormatApplicationName(self, *args):
- """
- Generate a Juju-compatible Application name
-
- :param args tuple: Positional arguments to be used to construct the
- application name.
-
- Limitations::
- - Only accepts characters a-z and non-consequitive dashes (-)
- - Application name should not exceed 50 characters
-
- Examples::
-
- FormatApplicationName("ping_pong_ns", "ping_vnf", "a")
- """
- appname = ""
- for c in "-".join(list(args)):
- if c.isdigit():
- c = chr(97 + int(c))
- elif not c.isalpha():
- c = "-"
- appname += c
- return re.sub("-+", "-", appname.lower())
-
- # def format_application_name(self, nsd_name, vnfr_name, member_vnf_index=0):
- # """Format the name of the application
- #
- # Limitations:
- # - Only accepts characters a-z and non-consequitive dashes (-)
- # - Application name should not exceed 50 characters
- # """
- # name = "{}-{}-{}".format(nsd_name, vnfr_name, member_vnf_index)
- # new_name = ''
- # for c in name:
- # if c.isdigit():
- # c = chr(97 + int(c))
- # elif not c.isalpha():
- # c = "-"
- # new_name += c
- # return re.sub('\-+', '-', new_name.lower())
-
- def format_model_name(self, name):
- """Format the name of model.
-
- Model names may only contain lowercase letters, digits and hyphens
- """
-
- return name.replace("_", "-").lower()
-
- async def get_application(self, model, application):
- """Get the deployed application."""
- if not self.authenticated:
- await self.login()
-
- app = None
- if application and model:
- if model.applications:
- if application in model.applications:
- app = model.applications[application]
-
- return app
-
- async def get_model(self, model_name):
- """Get a model from the Juju Controller.
-
- Note: Model objects returned must call disconnected() before it goes
- out of scope."""
- if not self.authenticated:
- await self.login()
-
- if model_name not in self.models:
- # Get the models in the controller
- models = await self.controller.list_models()
-
- if model_name not in models:
- try:
- self.models[model_name] = await self.controller.add_model(
- model_name, config={"authorized-keys": self.juju_public_key}
- )
- except JujuError as e:
- if "already exists" not in e.message:
- raise e
- else:
- self.models[model_name] = await self.controller.get_model(model_name)
-
- self.refcount["model"] += 1
-
- # Create an observer for this model
- await self.create_model_monitor(model_name)
-
- return self.models[model_name]
-
- async def create_model_monitor(self, model_name):
- """Create a monitor for the model, if none exists."""
- if not self.authenticated:
- await self.login()
-
- if model_name not in self.monitors:
- self.monitors[model_name] = VCAMonitor(model_name)
- self.models[model_name].add_observer(self.monitors[model_name])
-
- return True
-
- async def login(self):
- """Login to the Juju controller."""
-
- if self.authenticated:
- return
-
- self.connecting = True
-
- self.log.debug("JujuApi: Logging into controller")
-
- self.controller = Controller(loop=self.loop)
-
- if self.secret:
- self.log.debug(
- "Connecting to controller... ws://{} as {}/{}".format(
- self.endpoint, self.user, self.secret,
- )
- )
- try:
- await self.controller.connect(
- endpoint=self.endpoint,
- username=self.user,
- password=self.secret,
- cacert=self.ca_cert,
- )
- self.refcount["controller"] += 1
- self.authenticated = True
- self.log.debug("JujuApi: Logged into controller")
- except Exception as ex:
- self.log.debug("Caught exception: {}".format(ex))
- else:
- # current_controller no longer exists
- # self.log.debug("Connecting to current controller...")
- # await self.controller.connect_current()
- # await self.controller.connect(
- # endpoint=self.endpoint,
- # username=self.user,
- # cacert=cacert,
- # )
- self.log.fatal("VCA credentials not configured.")
- self.authenticated = False
-
- async def logout(self):
- """Logout of the Juju controller."""
- if not self.authenticated:
- return False
-
- try:
- for model in self.models:
- await self.disconnect_model(model)
-
- if self.controller:
- self.log.debug("Disconnecting controller {}".format(self.controller))
- await self.controller.disconnect()
- self.refcount["controller"] -= 1
- self.controller = None
-
- self.authenticated = False
-
- self.log.debug(self.refcount)
-
- except Exception as e:
- self.log.fatal("Fatal error logging out of Juju Controller: {}".format(e))
- raise e
- return True
-
- async def disconnect_model(self, model):
- self.log.debug("Disconnecting model {}".format(model))
- if model in self.models:
- try:
- await self.models[model].disconnect()
- self.refcount["model"] -= 1
- self.models[model] = None
- except Exception as e:
- self.log.debug("Caught exception: {}".format(e))
-
- async def provision_machine(
- self, model_name: str, hostname: str, username: str, private_key_path: str
- ) -> int:
- """Provision a machine.
-
- This executes the SSH provisioner, which will log in to a machine via
- SSH and prepare it for use with the Juju model
-
- :param model_name str: The name of the model
- :param hostname str: The IP or hostname of the target VM
- :param user str: The username to login to
- :param private_key_path str: The path to the private key that's been injected
- to the VM via cloud-init
- :return machine_id int: Returns the id of the machine or None if provisioning
- fails
- """
- if not self.authenticated:
- await self.login()
-
- machine_id = None
-
- if self.api_proxy:
- self.log.debug(
- "Instantiating SSH Provisioner for {}@{} ({})".format(
- username, hostname, private_key_path
- )
- )
- provisioner = SSHProvisioner(
- host=hostname,
- user=username,
- private_key_path=private_key_path,
- log=self.log,
- )
-
- params = None
- try:
- params = provisioner.provision_machine()
- except Exception as ex:
- self.log.debug("caught exception from provision_machine: {}".format(ex))
- return None
-
- if params:
- params.jobs = ["JobHostUnits"]
-
- model = await self.get_model(model_name)
-
- connection = model.connection()
-
- # Submit the request.
- self.log.debug("Adding machine to model")
- client_facade = client.ClientFacade.from_connection(connection)
- results = await client_facade.AddMachines(params=[params])
- error = results.machines[0].error
- if error:
- raise ValueError("Error adding machine: %s" % error.message)
-
- machine_id = results.machines[0].machine
-
- # Need to run this after AddMachines has been called,
- # as we need the machine_id
- self.log.debug("Installing Juju agent")
- await provisioner.install_agent(
- connection, params.nonce, machine_id, self.api_proxy,
- )
- else:
- self.log.debug("Missing API Proxy")
- return machine_id
-
- # async def remove_application(self, name):
- # """Remove the application."""
- # if not self.authenticated:
- # await self.login()
- #
- # app = await self.get_application(name)
- # if app:
- # self.log.debug("JujuApi: Destroying application {}".format(
- # name,
- # ))
- #
- # await app.destroy()
-
- async def remove_relation(self, a, b):
- """
- Remove a relation between two application endpoints
-
- :param a An application endpoint
- :param b An application endpoint
- """
- if not self.authenticated:
- await self.login()
-
- # m = await self.get_model()
- # try:
- # m.remove_relation(a, b)
- # finally:
- # await m.disconnect()
-
- async def resolve_error(self, model_name, application=None):
- """Resolve units in error state."""
- if not self.authenticated:
- await self.login()
-
- model = await self.get_model(model_name)
-
- app = await self.get_application(model, application)
- if app:
- self.log.debug(
- "JujuApi: Resolving errors for application {}".format(application,)
- )
-
- for _ in app.units:
- app.resolved(retry=True)
-
- async def run_action(self, model_name, application, action_name, **params):
- """Execute an action and return an Action object."""
- if not self.authenticated:
- await self.login()
- result = {"status": "", "action": {"tag": None, "results": None}}
-
- model = await self.get_model(model_name)
-
- app = await self.get_application(model, application)
- if app:
- # We currently only have one unit per application
- # so use the first unit available.
- unit = app.units[0]
-
- self.log.debug(
- "JujuApi: Running Action {} against Application {}".format(
- action_name, application,
- )
- )
-
- action = await unit.run_action(action_name, **params)
-
- # Wait for the action to complete
- await action.wait()
-
- result["status"] = action.status
- result["action"]["tag"] = action.data["id"]
- result["action"]["results"] = action.results
-
- return result
-
- async def set_config(self, model_name, application, config):
- """Apply a configuration to the application."""
- if not self.authenticated:
- await self.login()
-
- app = await self.get_application(model_name, application)
- if app:
- self.log.debug(
- "JujuApi: Setting config for Application {}".format(application,)
- )
- await app.set_config(config)
-
- # Verify the config is set
- newconf = await app.get_config()
- for key in config:
- if config[key] != newconf[key]["value"]:
- self.log.debug(
- (
- "JujuApi: Config not set! Key {} Value {} doesn't match {}"
- ).format(key, config[key], newconf[key])
- )
-
- # async def set_parameter(self, parameter, value, application=None):
- # """Set a config parameter for a service."""
- # if not self.authenticated:
- # await self.login()
- #
- # self.log.debug("JujuApi: Setting {}={} for Application {}".format(
- # parameter,
- # value,
- # application,
- # ))
- # return await self.apply_config(
- # {parameter: value},
- # application=application,
- # )
-
- async def wait_for_application(self, model_name, application_name, timeout=300):
- """Wait for an application to become active."""
- if not self.authenticated:
- await self.login()
-
- model = await self.get_model(model_name)
-
- app = await self.get_application(model, application_name)
- self.log.debug("Application: {}".format(app))
- if app:
- self.log.debug(
- "JujuApi: Waiting {} seconds for Application {}".format(
- timeout, application_name,
- )
- )
-
- await model.block_until(
- lambda: all(
- unit.agent_status == "idle"
- and unit.workload_status in ["active", "unknown"]
- for unit in app.units
- ),
- timeout=timeout,
- )
git+https://osm.etsi.org/gerrit/osm/common.git#egg=osm-common
juju==2.8.2
-paramiko
pyasn1>=0.4.4
kubernetes==10.0.1
exclude=["*.tests", "*.tests.*", "tests.*", "tests"]),
install_requires=[
'juju==2.8.2',
- 'paramiko',
'pyasn1>=0.4.4',
'kubernetes==10.0.1'
],