--- /dev/null
+##
+# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
+# This file is part of OSM
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: nfvlabs@tid.es
+##
+
+import asyncio
+import time
+
+from juju.action import Action
+from juju.application import Application
+from juju.machine import Machine
+from juju.model import ModelObserver, Model
+
+from n2vc.exceptions import N2VCTimeoutException
+from n2vc.n2vc_conn import N2VCConnector, juju_status_2_osm_status
+
+
+class _Entity:
+ def __init__(self, entity_id: str, entity_type: str, obj: object, db_dict: dict):
+ self.entity_id = entity_id
+ self.entity_type = entity_type
+ self.obj = obj
+ self.event = asyncio.Event()
+ self.db_dict = db_dict
+
+
+class JujuModelObserver(ModelObserver):
+ def __init__(self, n2vc: N2VCConnector, model: Model):
+ self.n2vc = n2vc
+ self.model = model
+ model.add_observer(self)
+ self.machines = dict()
+ self.applications = dict()
+ self.actions = dict()
+
+ def register_machine(self, machine: Machine, db_dict: dict):
+ try:
+ entity_id = machine.entity_id
+ except Exception:
+ # no entity_id aatribute, try machine attribute
+ entity_id = machine.machine
+ # self.n2vc.debug(
+ # msg='Registering machine for change notifications: {}'.format(entity_id))
+ entity = _Entity(
+ entity_id=entity_id, entity_type="machine", obj=machine, db_dict=db_dict
+ )
+ self.machines[entity_id] = entity
+
+ def unregister_machine(self, machine_id: str):
+ if machine_id in self.machines:
+ del self.machines[machine_id]
+
+ def is_machine_registered(self, machine_id: str):
+ return machine_id in self.machines
+
+ def register_application(self, application: Application, db_dict: dict):
+ entity_id = application.entity_id
+ # self.n2vc.debug(
+ # msg='Registering application for change notifications: {}'.format(entity_id))
+ entity = _Entity(
+ entity_id=entity_id,
+ entity_type="application",
+ obj=application,
+ db_dict=db_dict,
+ )
+ self.applications[entity_id] = entity
+
+ def unregister_application(self, application_id: str):
+ if application_id in self.applications:
+ del self.applications[application_id]
+
+ def is_application_registered(self, application_id: str):
+ return application_id in self.applications
+
+ def register_action(self, action: Action, db_dict: dict):
+ entity_id = action.entity_id
+ # self.n2vc.debug(
+ # msg='Registering action for changes notifications: {}'.format(entity_id))
+ entity = _Entity(
+ entity_id=entity_id, entity_type="action", obj=action, db_dict=db_dict
+ )
+ self.actions[entity_id] = entity
+
+ def unregister_action(self, action_id: str):
+ if action_id in self.actions:
+ del self.actions[action_id]
+
+ def is_action_registered(self, action_id: str):
+ return action_id in self.actions
+
+ async def wait_for_machine(
+ self,
+ machine_id: str,
+ progress_timeout: float = None,
+ total_timeout: float = None,
+ ) -> int:
+
+ if not self.is_machine_registered(machine_id):
+ return
+
+ self.n2vc.debug("Waiting for machine completed: {}".format(machine_id))
+
+ # wait for a final state
+ entity = self.machines[machine_id]
+ return await self._wait_for_entity(
+ entity=entity,
+ field_to_check="agent_status",
+ final_states_list=["started"],
+ progress_timeout=progress_timeout,
+ total_timeout=total_timeout,
+ )
+
+ async def wait_for_application(
+ self,
+ application_id: str,
+ progress_timeout: float = None,
+ total_timeout: float = None,
+ ) -> int:
+
+ if not self.is_application_registered(application_id):
+ return
+
+ self.n2vc.debug("Waiting for application completed: {}".format(application_id))
+
+ # application statuses: unknown, active, waiting
+ # wait for a final state
+ entity = self.applications[application_id]
+ return await self._wait_for_entity(
+ entity=entity,
+ field_to_check="status",
+ final_states_list=["active", "blocked"],
+ progress_timeout=progress_timeout,
+ total_timeout=total_timeout,
+ )
+
+ async def wait_for_action(
+ self,
+ action_id: str,
+ progress_timeout: float = None,
+ total_timeout: float = None,
+ ) -> int:
+
+ if not self.is_action_registered(action_id):
+ return
+
+ self.n2vc.debug("Waiting for action completed: {}".format(action_id))
+
+ # action statuses: pending, running, completed, failed, cancelled
+ # wait for a final state
+ entity = self.actions[action_id]
+ return await self._wait_for_entity(
+ entity=entity,
+ field_to_check="status",
+ final_states_list=["completed", "failed", "cancelled"],
+ progress_timeout=progress_timeout,
+ total_timeout=total_timeout,
+ )
+
+ async def _wait_for_entity(
+ self,
+ entity: _Entity,
+ field_to_check: str,
+ final_states_list: list,
+ progress_timeout: float = None,
+ total_timeout: float = None,
+ ) -> int:
+
+ # default values for no timeout
+ if total_timeout is None:
+ total_timeout = 3600
+ if progress_timeout is None:
+ progress_timeout = 3600
+
+ # max end time
+ now = time.time()
+ total_end = now + total_timeout
+
+ if now >= total_end:
+ raise N2VCTimeoutException(
+ message="Total timeout {} seconds, {}: {}".format(
+ total_timeout, entity.entity_type, entity.entity_id
+ ),
+ timeout="total",
+ )
+
+ # update next progress timeout
+ progress_end = now + progress_timeout # type: float
+
+ # which is closest? progress or end timeout?
+ closest_end = min(total_end, progress_end)
+
+ next_timeout = closest_end - now
+
+ retries = 0
+
+ while entity.obj.__getattribute__(field_to_check) not in final_states_list:
+ retries += 1
+ if await _wait_for_event_or_timeout(entity.event, next_timeout):
+ entity.event.clear()
+ else:
+ message = "Progress timeout {} seconds, {}: {}".format(
+ progress_timeout, entity.entity_type, entity.entity_id
+ )
+ self.n2vc.debug(message)
+ raise N2VCTimeoutException(message=message, timeout="progress")
+ # self.n2vc.debug('End of wait. Final state: {}, retries: {}'
+ # .format(entity.obj.__getattribute__(field_to_check), retries))
+ return retries
+
+ async def on_change(self, delta, old, new, model):
+
+ if new is None:
+ return
+
+ # log
+ # self.n2vc.debug('on_change(): type: {}, entity: {}, id: {}'
+ # .format(delta.type, delta.entity, new.entity_id))
+
+ if delta.entity == "machine":
+
+ # check registered machine
+ if new.entity_id not in self.machines:
+ return
+
+ # write change in database
+ await self.n2vc.write_app_status_to_db(
+ db_dict=self.machines[new.entity_id].db_dict,
+ status=juju_status_2_osm_status(delta.entity, new.agent_status),
+ detailed_status=new.status_message,
+ vca_status=new.status,
+ entity_type="machine",
+ )
+
+ # set event for this machine
+ self.machines[new.entity_id].event.set()
+
+ elif delta.entity == "application":
+
+ # check registered application
+ if new.entity_id not in self.applications:
+ return
+
+ # write change in database
+ await self.n2vc.write_app_status_to_db(
+ db_dict=self.applications[new.entity_id].db_dict,
+ status=juju_status_2_osm_status(delta.entity, new.status),
+ detailed_status=new.status_message,
+ vca_status=new.status,
+ entity_type="application",
+ )
+
+ # set event for this application
+ self.applications[new.entity_id].event.set()
+
+ elif delta.entity == "unit":
+
+ # get the application for this unit
+ application_id = delta.data["application"]
+
+ # check registered application
+ if application_id not in self.applications:
+ return
+
+ # write change in database
+ if not new.dead:
+ await self.n2vc.write_app_status_to_db(
+ db_dict=self.applications[application_id].db_dict,
+ status=juju_status_2_osm_status(delta.entity, new.workload_status),
+ detailed_status=new.workload_status_message,
+ vca_status=new.workload_status,
+ entity_type="unit",
+ )
+
+ # set event for this application
+ self.applications[application_id].event.set()
+
+ elif delta.entity == "action":
+
+ # check registered action
+ if new.entity_id not in self.actions:
+ return
+
+ # write change in database
+ await self.n2vc.write_app_status_to_db(
+ db_dict=self.actions[new.entity_id].db_dict,
+ status=juju_status_2_osm_status(delta.entity, new.status),
+ detailed_status=new.status,
+ vca_status=new.status,
+ entity_type="action",
+ )
+
+ # set event for this application
+ self.actions[new.entity_id].event.set()
+
+
+async def _wait_for_event_or_timeout(event: asyncio.Event, timeout: float = None):
+ try:
+ await asyncio.wait_for(fut=event.wait(), timeout=timeout)
+ except asyncio.TimeoutError:
+ pass
+ return event.is_set()
return JujuStatusToOSM[entity_type][status]
+# DEPRECATED
+def juju_status_2_osm_status(statustype: str, status: str) -> N2VCDeploymentStatus:
+ if statustype == "application" or statustype == "unit":
+ if status in ["waiting", "maintenance"]:
+ return N2VCDeploymentStatus.RUNNING
+ if status in ["error"]:
+ return N2VCDeploymentStatus.FAILED
+ elif status in ["active"]:
+ return N2VCDeploymentStatus.COMPLETED
+ elif status in ["blocked"]:
+ return N2VCDeploymentStatus.RUNNING
+ else:
+ return N2VCDeploymentStatus.UNKNOWN
+ elif statustype == "action":
+ if status in ["running"]:
+ return N2VCDeploymentStatus.RUNNING
+ elif status in ["completed"]:
+ return N2VCDeploymentStatus.COMPLETED
+ else:
+ return N2VCDeploymentStatus.UNKNOWN
+ elif statustype == "machine":
+ if status in ["pending"]:
+ return N2VCDeploymentStatus.PENDING
+ elif status in ["started"]:
+ return N2VCDeploymentStatus.COMPLETED
+ else:
+ return N2VCDeploymentStatus.UNKNOWN
+
+ return N2VCDeploymentStatus.FAILED
+
+
def obj_to_yaml(obj: object) -> str:
# dump to yaml
dump_text = yaml.dump(obj, default_flow_style=False, indent=2)
import logging
import os
import re
-
+import time
+
+from juju.action import Action
+from juju.application import Application
+from juju.client import client
+from juju.controller import Controller
+from juju.errors import JujuAPIError
+from juju.machine import Machine
+from juju.model import Model
from n2vc.exceptions import (
N2VCBadArgumentsException,
N2VCException,
N2VCConnectionException,
N2VCExecutionException,
N2VCInvalidCertificate,
- # N2VCNotFound,
+ N2VCNotFound,
MethodNotImplemented,
JujuK8sProxycharmNotSupported,
)
+from n2vc.juju_observer import JujuModelObserver
from n2vc.n2vc_conn import N2VCConnector
from n2vc.n2vc_conn import obj_to_dict, obj_to_yaml
+from n2vc.provisioner import AsyncSSHProvisioner
from n2vc.libjuju import Libjuju
return N2VCJujuConnector._format_app_name(application_name)
+ async def _juju_create_machine(
+ self,
+ model_name: str,
+ application_name: str,
+ machine_id: str = None,
+ db_dict: dict = None,
+ progress_timeout: float = None,
+ total_timeout: float = None,
+ ) -> Machine:
+
+ self.log.debug(
+ "creating machine in model: {}, existing machine id: {}".format(
+ model_name, machine_id
+ )
+ )
+
+ # get juju model and observer (create model if needed)
+ model = await self._juju_get_model(model_name=model_name)
+ observer = self.juju_observers[model_name]
+
+ # find machine id in model
+ machine = None
+ if machine_id is not None:
+ self.log.debug("Finding existing machine id {} in model".format(machine_id))
+ # get juju existing machines in the model
+ existing_machines = await model.get_machines()
+ if machine_id in existing_machines:
+ self.log.debug(
+ "Machine id {} found in model (reusing it)".format(machine_id)
+ )
+ machine = model.machines[machine_id]
+
+ if machine is None:
+ self.log.debug("Creating a new machine in juju...")
+ # machine does not exist, create it and wait for it
+ machine = await model.add_machine(
+ spec=None, constraints=None, disks=None, series="xenial"
+ )
+
+ # register machine with observer
+ observer.register_machine(machine=machine, db_dict=db_dict)
+
+ # id for the execution environment
+ ee_id = N2VCJujuConnector._build_ee_id(
+ model_name=model_name,
+ application_name=application_name,
+ machine_id=str(machine.entity_id),
+ )
+
+ # write ee_id in database
+ self._write_ee_id_db(db_dict=db_dict, ee_id=ee_id)
+
+ # wait for machine creation
+ await observer.wait_for_machine(
+ machine_id=str(machine.entity_id),
+ progress_timeout=progress_timeout,
+ total_timeout=total_timeout,
+ )
+
+ else:
+
+ self.log.debug("Reusing old machine pending")
+
+ # register machine with observer
+ observer.register_machine(machine=machine, db_dict=db_dict)
+
+ # machine does exist, but it is in creation process (pending), wait for
+ # create finalisation
+ await observer.wait_for_machine(
+ machine_id=machine.entity_id,
+ progress_timeout=progress_timeout,
+ total_timeout=total_timeout,
+ )
+
+ self.log.debug("Machine ready at " + str(machine.dns_name))
+ return machine
+
+ async def _juju_provision_machine(
+ self,
+ model_name: str,
+ hostname: str,
+ username: str,
+ private_key_path: str,
+ db_dict: dict = None,
+ progress_timeout: float = None,
+ total_timeout: float = None,
+ ) -> str:
+
+ if not self.api_proxy:
+ msg = "Cannot provision machine: api_proxy is not defined"
+ self.log.error(msg=msg)
+ raise N2VCException(message=msg)
+
+ self.log.debug(
+ "provisioning machine. model: {}, hostname: {}, username: {}".format(
+ model_name, hostname, username
+ )
+ )
+
+ if not self._authenticated:
+ await self._juju_login()
+
+ # get juju model and observer
+ model = await self._juju_get_model(model_name=model_name)
+ observer = self.juju_observers[model_name]
+
+ # TODO check if machine is already provisioned
+ machine_list = await model.get_machines()
+
+ provisioner = AsyncSSHProvisioner(
+ host=hostname,
+ user=username,
+ private_key_path=private_key_path,
+ log=self.log,
+ )
+
+ params = None
+ try:
+ params = await provisioner.provision_machine()
+ except Exception as ex:
+ msg = "Exception provisioning machine: {}".format(ex)
+ self.log.error(msg)
+ raise N2VCException(message=msg)
+
+ params.jobs = ["JobHostUnits"]
+
+ connection = model.connection()
+
+ # Submit the request.
+ self.log.debug("Adding machine to model")
+ client_facade = client.ClientFacade.from_connection(connection)
+ results = await client_facade.AddMachines(params=[params])
+ error = results.machines[0].error
+ if error:
+ msg = "Error adding machine: {}".format(error.message)
+ self.log.error(msg=msg)
+ raise ValueError(msg)
+
+ machine_id = results.machines[0].machine
+
+ # Need to run this after AddMachines has been called,
+ # as we need the machine_id
+ self.log.debug("Installing Juju agent into machine {}".format(machine_id))
+ asyncio.ensure_future(
+ provisioner.install_agent(
+ connection=connection,
+ nonce=params.nonce,
+ machine_id=machine_id,
+ proxy=self.api_proxy,
+ )
+ )
+
+ # wait for machine in model (now, machine is not yet in model, so we must
+ # wait for it)
+ machine = None
+ for _ in range(10):
+ machine_list = await model.get_machines()
+ if machine_id in machine_list:
+ self.log.debug("Machine {} found in model!".format(machine_id))
+ machine = model.machines.get(machine_id)
+ break
+ await asyncio.sleep(2)
+
+ if machine is None:
+ msg = "Machine {} not found in model".format(machine_id)
+ self.log.error(msg=msg)
+ raise Exception(msg)
+
+ # register machine with observer
+ observer.register_machine(machine=machine, db_dict=db_dict)
+
+ # wait for machine creation
+ self.log.debug("waiting for provision finishes... {}".format(machine_id))
+ await observer.wait_for_machine(
+ machine_id=machine_id,
+ progress_timeout=progress_timeout,
+ total_timeout=total_timeout,
+ )
+
+ self.log.debug("Machine provisioned {}".format(machine_id))
+
+ return machine_id
+
+ async def _juju_deploy_charm(
+ self,
+ model_name: str,
+ application_name: str,
+ charm_path: str,
+ machine_id: str,
+ db_dict: dict,
+ progress_timeout: float = None,
+ total_timeout: float = None,
+ config: dict = None,
+ ) -> (Application, int):
+
+ # get juju model and observer
+ model = await self._juju_get_model(model_name=model_name)
+ observer = self.juju_observers[model_name]
+
+ # check if application already exists
+ application = None
+ if application_name in model.applications:
+ application = model.applications[application_name]
+
+ if application is None:
+
+ # application does not exist, create it and wait for it
+ self.log.debug(
+ "deploying application {} to machine {}, model {}".format(
+ application_name, machine_id, model_name
+ )
+ )
+ self.log.debug("charm: {}".format(charm_path))
+ machine = model.machines[machine_id]
+ # series = None
+ application = await model.deploy(
+ entity_url=charm_path,
+ application_name=application_name,
+ channel="stable",
+ num_units=1,
+ series=machine.series,
+ to=machine_id,
+ config=config,
+ )
+
+ # register application with observer
+ observer.register_application(application=application, db_dict=db_dict)
+
+ self.log.debug(
+ "waiting for application deployed... {}".format(application.entity_id)
+ )
+ retries = await observer.wait_for_application(
+ application_id=application.entity_id,
+ progress_timeout=progress_timeout,
+ total_timeout=total_timeout,
+ )
+ self.log.debug("application deployed")
+
+ else:
+
+ # register application with observer
+ observer.register_application(application=application, db_dict=db_dict)
+
+ # application already exists, but not finalised
+ self.log.debug("application already exists, waiting for deployed...")
+ retries = await observer.wait_for_application(
+ application_id=application.entity_id,
+ progress_timeout=progress_timeout,
+ total_timeout=total_timeout,
+ )
+ self.log.debug("application deployed")
+
+ return application, retries
+
+ async def _juju_execute_action(
+ self,
+ model_name: str,
+ application_name: str,
+ action_name: str,
+ db_dict: dict,
+ progress_timeout: float = None,
+ total_timeout: float = None,
+ **kwargs
+ ) -> Action:
+
+ # get juju model and observer
+ model = await self._juju_get_model(model_name=model_name)
+ observer = self.juju_observers[model_name]
+
+ application = await self._juju_get_application(
+ model_name=model_name, application_name=application_name
+ )
+
+ unit = None
+ for u in application.units:
+ if await u.is_leader_from_status():
+ unit = u
+ if unit is not None:
+ actions = await application.get_actions()
+ if action_name in actions:
+ self.log.debug(
+ 'executing action "{}" using params: {}'.format(action_name, kwargs)
+ )
+ action = await unit.run_action(action_name, **kwargs)
+
+ # register action with observer
+ observer.register_action(action=action, db_dict=db_dict)
+
+ await observer.wait_for_action(
+ action_id=action.entity_id,
+ progress_timeout=progress_timeout,
+ total_timeout=total_timeout,
+ )
+ self.log.debug("action completed with status: {}".format(action.status))
+ output = await model.get_action_output(action_uuid=action.entity_id)
+ status = await model.get_action_status(uuid_or_prefix=action.entity_id)
+ if action.entity_id in status:
+ status = status[action.entity_id]
+ else:
+ status = "failed"
+ return output, status
+
+ raise N2VCExecutionException(
+ message="Cannot execute action on charm", primitive_name=action_name
+ )
+
+ async def _juju_configure_application(
+ self,
+ model_name: str,
+ application_name: str,
+ config: dict,
+ db_dict: dict,
+ progress_timeout: float = None,
+ total_timeout: float = None,
+ ):
+
+ # get the application
+ application = await self._juju_get_application(
+ model_name=model_name, application_name=application_name
+ )
+
+ self.log.debug(
+ "configuring the application {} -> {}".format(application_name, config)
+ )
+ res = await application.set_config(config)
+ self.log.debug(
+ "application {} configured. res={}".format(application_name, res)
+ )
+
+ # Verify the config is set
+ new_conf = await application.get_config()
+ for key in config:
+ value = new_conf[key]["value"]
+ self.log.debug(" {} = {}".format(key, value))
+ if config[key] != value:
+ raise N2VCException(
+ message="key {} is not configured correctly {} != {}".format(
+ key, config[key], new_conf[key]
+ )
+ )
+
+ # check if 'verify-ssh-credentials' action exists
+ # unit = application.units[0]
+ actions = await application.get_actions()
+ if "verify-ssh-credentials" not in actions:
+ msg = (
+ "Action verify-ssh-credentials does not exist in application {}"
+ ).format(application_name)
+ self.log.debug(msg=msg)
+ return False
+
+ # execute verify-credentials
+ num_retries = 20
+ retry_timeout = 15.0
+ for _ in range(num_retries):
+ try:
+ self.log.debug("Executing action verify-ssh-credentials...")
+ output, ok = await self._juju_execute_action(
+ model_name=model_name,
+ application_name=application_name,
+ action_name="verify-ssh-credentials",
+ db_dict=db_dict,
+ progress_timeout=progress_timeout,
+ total_timeout=total_timeout,
+ )
+ self.log.debug("Result: {}, output: {}".format(ok, output))
+ return True
+ except asyncio.CancelledError:
+ raise
+ except Exception as e:
+ self.log.debug(
+ "Error executing verify-ssh-credentials: {}. Retrying...".format(e)
+ )
+ await asyncio.sleep(retry_timeout)
+ else:
+ self.log.error(
+ "Error executing verify-ssh-credentials after {} retries. ".format(
+ num_retries
+ )
+ )
+ return False
+
+ async def _juju_get_application(self, model_name: str, application_name: str):
+ """Get the deployed application."""
+
+ model = await self._juju_get_model(model_name=model_name)
+
+ application_name = N2VCJujuConnector._format_app_name(application_name)
+
+ if model.applications and application_name in model.applications:
+ return model.applications[application_name]
+ else:
+ raise N2VCException(
+ message="Cannot get application {} from model {}".format(
+ application_name, model_name
+ )
+ )
+
+ async def _juju_get_model(self, model_name: str) -> Model:
+ """ Get a model object from juju controller
+ If the model does not exits, it creates it.
+
+ :param str model_name: name of the model
+ :returns Model: model obtained from juju controller or Exception
+ """
+
+ # format model name
+ model_name = N2VCJujuConnector._format_model_name(model_name)
+
+ if model_name in self.juju_models:
+ return self.juju_models[model_name]
+
+ if self._creating_model:
+ self.log.debug("Another coroutine is creating a model. Wait...")
+ while self._creating_model:
+ # another coroutine is creating a model, wait
+ await asyncio.sleep(0.1)
+ # retry (perhaps another coroutine has created the model meanwhile)
+ if model_name in self.juju_models:
+ return self.juju_models[model_name]
+
+ try:
+ self._creating_model = True
+
+ # get juju model names from juju
+ model_list = await self.controller.list_models()
+ if model_name not in model_list:
+ self.log.info(
+ "Model {} does not exist. Creating new model...".format(model_name)
+ )
+ config_dict = {"authorized-keys": self.public_key}
+ if self.apt_mirror:
+ config_dict["apt-mirror"] = self.apt_mirror
+ if not self.enable_os_upgrade:
+ config_dict["enable-os-refresh-update"] = False
+ config_dict["enable-os-upgrade"] = False
+ if self.cloud in self.BUILT_IN_CLOUDS:
+ model = await self.controller.add_model(
+ model_name=model_name,
+ config=config_dict,
+ cloud_name=self.cloud,
+ )
+ else:
+ model = await self.controller.add_model(
+ model_name=model_name,
+ config=config_dict,
+ cloud_name=self.cloud,
+ credential_name=self.cloud,
+ )
+ self.log.info("New model created, name={}".format(model_name))
+ else:
+ self.log.debug(
+ "Model already exists in juju. Getting model {}".format(model_name)
+ )
+ model = await self.controller.get_model(model_name)
+ self.log.debug("Existing model in juju, name={}".format(model_name))
+
+ self.juju_models[model_name] = model
+ self.juju_observers[model_name] = JujuModelObserver(n2vc=self, model=model)
+ return model
+
+ except Exception as e:
+ msg = "Cannot get model {}. Exception: {}".format(model_name, e)
+ self.log.error(msg)
+ raise N2VCException(msg)
+ finally:
+ self._creating_model = False
+
+ async def _juju_add_relation(
+ self,
+ model_name: str,
+ application_name_1: str,
+ application_name_2: str,
+ relation_1: str,
+ relation_2: str,
+ ):
+
+ # get juju model and observer
+ model = await self._juju_get_model(model_name=model_name)
+
+ r1 = "{}:{}".format(application_name_1, relation_1)
+ r2 = "{}:{}".format(application_name_2, relation_2)
+
+ self.log.debug("adding relation: {} -> {}".format(r1, r2))
+ try:
+ await model.add_relation(relation1=r1, relation2=r2)
+ except JujuAPIError as e:
+ # If one of the applications in the relationship doesn't exist, or the
+ # relation has already been added,
+ # let the operation fail silently.
+ if "not found" in e.message:
+ return
+ if "already exists" in e.message:
+ return
+ # another execption, raise it
+ raise e
+
+ async def _juju_destroy_application(self, model_name: str, application_name: str):
+
+ self.log.debug(
+ "Destroying application {} in model {}".format(application_name, model_name)
+ )
+
+ # get juju model and observer
+ model = await self._juju_get_model(model_name=model_name)
+ observer = self.juju_observers[model_name]
+
+ application = model.applications.get(application_name)
+ if application:
+ observer.unregister_application(application_name)
+ await application.destroy()
+ else:
+ self.log.debug("Application not found: {}".format(application_name))
+
+ async def _juju_destroy_machine(
+ self, model_name: str, machine_id: str, total_timeout: float = None
+ ):
+
+ self.log.debug(
+ "Destroying machine {} in model {}".format(machine_id, model_name)
+ )
+
+ if total_timeout is None:
+ total_timeout = 3600
+
+ # get juju model and observer
+ model = await self._juju_get_model(model_name=model_name)
+ observer = self.juju_observers[model_name]
+
+ machines = await model.get_machines()
+ if machine_id in machines:
+ machine = model.machines[machine_id]
+ observer.unregister_machine(machine_id)
+ # TODO: change this by machine.is_manual when this is upstreamed:
+ # https://github.com/juju/python-libjuju/pull/396
+ if "instance-id" in machine.safe_data and machine.safe_data[
+ "instance-id"
+ ].startswith("manual:"):
+ self.log.debug("machine.destroy(force=True) started.")
+ await machine.destroy(force=True)
+ self.log.debug("machine.destroy(force=True) passed.")
+ # max timeout
+ end = time.time() + total_timeout
+ # wait for machine removal
+ machines = await model.get_machines()
+ while machine_id in machines and time.time() < end:
+ self.log.debug(
+ "Waiting for machine {} is destroyed".format(machine_id)
+ )
+ await asyncio.sleep(0.5)
+ machines = await model.get_machines()
+ self.log.debug("Machine destroyed: {}".format(machine_id))
+ else:
+ self.log.debug("Machine not found: {}".format(machine_id))
+
+ async def _juju_destroy_model(self, model_name: str, total_timeout: float = None):
+
+ self.log.debug("Destroying model {}".format(model_name))
+
+ if total_timeout is None:
+ total_timeout = 3600
+ end = time.time() + total_timeout
+
+ model = await self._juju_get_model(model_name=model_name)
+
+ if not model:
+ raise N2VCNotFound(message="Model {} does not exist".format(model_name))
+
+ uuid = model.info.uuid
+
+ # destroy applications
+ for application_name in model.applications:
+ try:
+ await self._juju_destroy_application(
+ model_name=model_name, application_name=application_name
+ )
+ except Exception as e:
+ self.log.error(
+ "Error destroying application {} in model {}: {}".format(
+ application_name, model_name, e
+ )
+ )
+
+ # destroy machines
+ machines = await model.get_machines()
+ for machine_id in machines:
+ try:
+ await self._juju_destroy_machine(
+ model_name=model_name, machine_id=machine_id
+ )
+ except asyncio.CancelledError:
+ raise
+ except Exception:
+ # ignore exceptions destroying machine
+ pass
+
+ await self._juju_disconnect_model(model_name=model_name)
+
+ self.log.debug("destroying model {}...".format(model_name))
+ await self.controller.destroy_model(uuid)
+ # self.log.debug('model destroy requested {}'.format(model_name))
+
+ # wait for model is completely destroyed
+ self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
+ last_exception = ""
+ while time.time() < end:
+ try:
+ # await self.controller.get_model(uuid)
+ models = await self.controller.list_models()
+ if model_name not in models:
+ self.log.debug(
+ "The model {} ({}) was destroyed".format(model_name, uuid)
+ )
+ return
+ except asyncio.CancelledError:
+ raise
+ except Exception as e:
+ last_exception = e
+ await asyncio.sleep(5)
+ raise N2VCException(
+ "Timeout waiting for model {} to be destroyed {}".format(
+ model_name, last_exception
+ )
+ )
+
+ async def _juju_login(self):
+ """Connect to juju controller
+
+ """
+
+ # if already authenticated, exit function
+ if self._authenticated:
+ return
+
+ # if connecting, wait for finish
+ # another task could be trying to connect in parallel
+ while self._connecting:
+ await asyncio.sleep(0.1)
+
+ # double check after other task has finished
+ if self._authenticated:
+ return
+
+ try:
+ self._connecting = True
+ self.log.info(
+ "connecting to juju controller: {} {}:{}{}".format(
+ self.url,
+ self.username,
+ self.secret[:8] + "...",
+ " with ca_cert" if self.ca_cert else "",
+ )
+ )
+
+ # Create controller object
+ self.controller = Controller(loop=self.loop)
+ # Connect to controller
+ await self.controller.connect(
+ endpoint=self.url,
+ username=self.username,
+ password=self.secret,
+ cacert=self.ca_cert,
+ )
+ self._authenticated = True
+ self.log.info("juju controller connected")
+ except Exception as e:
+ message = "Exception connecting to juju: {}".format(e)
+ self.log.error(message)
+ raise N2VCConnectionException(message=message, url=self.url)
+ finally:
+ self._connecting = False
+
+ async def _juju_logout(self):
+ """Logout of the Juju controller."""
+ if not self._authenticated:
+ return False
+
+ # disconnect all models
+ for model_name in self.juju_models:
+ try:
+ await self._juju_disconnect_model(model_name)
+ except Exception as e:
+ self.log.error(
+ "Error disconnecting model {} : {}".format(model_name, e)
+ )
+ # continue with next model...
+
+ self.log.info("Disconnecting controller")
+ try:
+ await self.controller.disconnect()
+ except Exception as e:
+ raise N2VCConnectionException(
+ message="Error disconnecting controller: {}".format(e), url=self.url
+ )
+
+ self.controller = None
+ self._authenticated = False
+ self.log.info("disconnected")
+
+ async def _juju_disconnect_model(self, model_name: str):
+ self.log.debug("Disconnecting model {}".format(model_name))
+ if model_name in self.juju_models:
+ await self.juju_models[model_name].disconnect()
+ self.juju_models[model_name] = None
+ self.juju_observers[model_name] = None
+ else:
+ self.warning("Cannot disconnect model: {}".format(model_name))
+
def _create_juju_public_key(self):
"""Recreate the Juju public key on lcm container, if needed
Certain libjuju commands expect to be run from the same machine as Juju
import logging
import os
import re
+import shlex
from subprocess import CalledProcessError
import tempfile
+import time
import uuid
from juju.client import client
+import n2vc.exceptions
+import paramiko
import asyncio
arches = [
return await self._ssh(
"{} /bin/bash {}".format("sudo" if root else "", tmpFile)
)
+
+
+class SSHProvisioner:
+ """Provision a manually created machine via SSH."""
+
+ def __init__(self, user, host, private_key_path, log=None):
+
+ self.host = host
+ self.user = user
+ self.private_key_path = private_key_path
+
+ if log:
+ self.log = log
+ else:
+ self.log = logging.getLogger(__name__)
+
+ def _get_ssh_client(self, host=None, user=None, private_key_path=None):
+ """Return a connected Paramiko ssh object.
+
+ :param str host: The host to connect to.
+ :param str user: The user to connect as.
+ :param str key: The private key to authenticate with.
+
+ :return: object: A paramiko.SSHClient
+ :raises: :class:`paramiko.ssh_exception.SSHException` if the
+ connection failed
+ """
+
+ if not host:
+ host = self.host
+
+ if not user:
+ user = self.user
+
+ if not private_key_path:
+ private_key_path = self.private_key_path
+
+ ssh = paramiko.SSHClient()
+ ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+
+ pkey = None
+
+ # Read the private key into a paramiko.RSAKey
+ if os.path.exists(private_key_path):
+ with open(private_key_path, "r") as f:
+ pkey = paramiko.RSAKey.from_private_key(f)
+
+ #######################################################################
+ # There is a bug in some versions of OpenSSH 4.3 (CentOS/RHEL5) where #
+ # the server may not send the SSH_MSG_USERAUTH_BANNER message except #
+ # when responding to an auth_none request. For example, paramiko will #
+ # attempt to use password authentication when a password is set, but #
+ # the server could deny that, instead requesting keyboard-interactive.#
+ # The hack to workaround this is to attempt a reconnect, which will #
+ # receive the right banner, and authentication can proceed. See the #
+ # following for more info: #
+ # https://github.com/paramiko/paramiko/issues/432 #
+ # https://github.com/paramiko/paramiko/pull/438 #
+ #######################################################################
+
+ retry = 10
+ attempts = 0
+ delay = 15
+ while attempts <= retry:
+ try:
+ attempts += 1
+
+ # Attempt to establish a SSH connection
+ ssh.connect(
+ host,
+ port=22,
+ username=user,
+ pkey=pkey,
+ # allow_agent=False,
+ # look_for_keys=False,
+ )
+ break
+ except paramiko.ssh_exception.SSHException as e:
+ if "Error reading SSH protocol banner" == str(e):
+ # Once more, with feeling
+ ssh.connect(host, port=22, username=user, pkey=pkey)
+ else:
+ # Reraise the original exception
+ self.log.debug("Unhandled exception caught: {}".format(e))
+ raise e
+ except Exception as e:
+ if "Unable to connect to port" in str(e):
+ self.log.debug(
+ "Waiting for VM to boot, sleeping {} seconds".format(delay)
+ )
+ if attempts > retry:
+ raise e
+ else:
+ time.sleep(delay)
+ # Slowly back off the retry
+ delay += 15
+ else:
+ self.log.debug(e)
+ raise e
+ return ssh
+
+ def _run_command(self, ssh, cmd, pty=True):
+ """Run a command remotely via SSH.
+
+ :param object ssh: The SSHClient
+ :param str cmd: The command to execute
+ :param list cmd: The `shlex.split` command to execute
+ :param bool pty: Whether to allocate a pty
+
+ :return: tuple: The stdout and stderr of the command execution
+ :raises: :class:`CalledProcessError` if the command fails
+ """
+
+ if isinstance(cmd, str):
+ cmd = shlex.split(cmd)
+
+ if type(cmd) is not list:
+ cmd = [cmd]
+
+ cmds = " ".join(cmd)
+ _, stdout, stderr = ssh.exec_command(cmds, get_pty=pty)
+ retcode = stdout.channel.recv_exit_status()
+
+ if retcode > 0:
+ output = stderr.read().strip()
+ raise CalledProcessError(returncode=retcode, cmd=cmd, output=output)
+ return (
+ stdout.read().decode("utf-8").strip(),
+ stderr.read().decode("utf-8").strip(),
+ )
+
+ def _init_ubuntu_user(self):
+ """Initialize the ubuntu user.
+
+ :return: bool: If the initialization was successful
+ :raises: :class:`paramiko.ssh_exception.AuthenticationException`
+ if the authentication fails
+ """
+ ssh = None
+ try:
+ # Run w/o allocating a pty, so we fail if sudo prompts for a passwd
+ ssh = self._get_ssh_client()
+ self._run_command(ssh, "sudo -n true", pty=False)
+ except paramiko.ssh_exception.AuthenticationException:
+ raise n2vc.exceptions.AuthenticationFailed(self.user)
+ except paramiko.ssh_exception.NoValidConnectionsError:
+ raise n2vc.exceptions.NoRouteToHost(self.host)
+ finally:
+ if ssh:
+ ssh.close()
+
+ # Infer the public key
+ public_key_path = "{}.pub".format(self.private_key_path)
+
+ if not os.path.exists(public_key_path):
+ raise FileNotFoundError(
+ "Public key '{}' doesn't exist.".format(public_key_path)
+ )
+
+ with open(public_key_path, "r") as f:
+ public_key = f.readline()
+
+ script = INITIALIZE_UBUNTU_SCRIPT.format(public_key)
+
+ try:
+ ssh = self._get_ssh_client()
+
+ self._run_command(
+ ssh, ["sudo", "/bin/bash -c " + shlex.quote(script)], pty=True
+ )
+ except paramiko.ssh_exception.AuthenticationException as e:
+ raise e
+ finally:
+ ssh.close()
+
+ return True
+
+ def _detect_hardware_and_os(self, ssh):
+ """Detect the target hardware capabilities and OS series.
+
+ :param object ssh: The SSHClient
+ :return: str: A raw string containing OS and hardware information.
+ """
+
+ info = {
+ "series": "",
+ "arch": "",
+ "cpu-cores": "",
+ "mem": "",
+ }
+
+ stdout, _ = self._run_command(
+ ssh, ["sudo", "/bin/bash -c " + shlex.quote(DETECTION_SCRIPT)], pty=True,
+ )
+
+ lines = stdout.split("\n")
+
+ # Remove extraneous line if DNS resolution of hostname famils
+ # i.e. sudo: unable to resolve host test-1-mgmtvm-1: Connection timed out
+ if "unable to resolve host" in lines[0]:
+ lines = lines[1:]
+
+ info["series"] = lines[0].strip()
+ info["arch"] = normalize_arch(lines[1].strip())
+
+ memKb = re.split(r"\s+", lines[2])[1]
+
+ # Convert megabytes -> kilobytes
+ info["mem"] = round(int(memKb) / 1024)
+
+ # Detect available CPUs
+ recorded = {}
+ for line in lines[3:]:
+ physical_id = ""
+
+ if line.find("physical id") == 0:
+ physical_id = line.split(":")[1].strip()
+ elif line.find("cpu cores") == 0:
+ cores = line.split(":")[1].strip()
+
+ if physical_id not in recorded.keys():
+ info["cpu-cores"] += cores
+ recorded[physical_id] = True
+
+ return info
+
+ def provision_machine(self):
+ """Perform the initial provisioning of the target machine.
+
+ :return: bool: The client.AddMachineParams
+ :raises: :class:`paramiko.ssh_exception.AuthenticationException`
+ if the upload fails
+ """
+ params = client.AddMachineParams()
+
+ if self._init_ubuntu_user():
+ try:
+ ssh = self._get_ssh_client()
+
+ hw = self._detect_hardware_and_os(ssh)
+ params.series = hw["series"]
+ params.instance_id = "manual:{}".format(self.host)
+ params.nonce = "manual:{}:{}".format(
+ self.host, str(uuid.uuid4()),
+ ) # a nop for Juju w/manual machines
+ params.hardware_characteristics = {
+ "arch": hw["arch"],
+ "mem": int(hw["mem"]),
+ "cpu-cores": int(hw["cpu-cores"]),
+ }
+ params.addresses = [
+ {"value": self.host, "type": "ipv4", "scope": "public"}
+ ]
+
+ except paramiko.ssh_exception.AuthenticationException as e:
+ raise e
+ finally:
+ ssh.close()
+
+ return params
+
+ async def install_agent(self, connection, nonce, machine_id, api):
+ """
+ :param object connection: Connection to Juju API
+ :param str nonce: The nonce machine specification
+ :param str machine_id: The id assigned to the machine
+
+ :return: bool: If the initialization was successful
+ """
+ # The path where the Juju agent should be installed.
+ data_dir = "/var/lib/juju"
+
+ # Disabling this prevents `apt-get update` from running initially, so
+ # charms will fail to deploy
+ disable_package_commands = False
+
+ client_facade = client.ClientFacade.from_connection(connection)
+ results = await client_facade.ProvisioningScript(
+ data_dir=data_dir,
+ disable_package_commands=disable_package_commands,
+ machine_id=machine_id,
+ nonce=nonce,
+ )
+
+ """Get the IP of the controller
+
+ Parse the provisioning script, looking for the first apiaddress.
+
+ Example:
+ apiaddresses:
+ - 10.195.8.2:17070
+ - 127.0.0.1:17070
+ - '[::1]:17070'
+ """
+ m = re.search(r"apiaddresses:\n- (\d+\.\d+\.\d+\.\d+):17070", results.script)
+ apiaddress = m.group(1)
+
+ """Add IP Table rule
+
+ In order to route the traffic to the private ip of the Juju controller
+ we use a DNAT rule to tell the machine that the destination for the
+ private address is the public address of the machine where the Juju
+ controller is running in LXD. That machine will have a complimentary
+ iptables rule, routing traffic to the appropriate LXD container.
+ """
+
+ script = IPTABLES_SCRIPT.format(apiaddress, api)
+
+ # Run this in a retry loop, because dpkg may be running and cause the
+ # script to fail.
+ retry = 10
+ attempts = 0
+ delay = 15
+
+ while attempts <= retry:
+ try:
+ attempts += 1
+
+ self._run_configure_script(script)
+ break
+ except Exception as e:
+ self.log.debug("Waiting for dpkg, sleeping {} seconds".format(delay))
+ if attempts > retry:
+ raise e
+ else:
+ time.sleep(delay)
+ # Slowly back off the retry
+ delay += 15
+
+ # self.log.debug("Running configure script")
+ self._run_configure_script(results.script)
+ # self.log.debug("Configure script finished")
+
+ def _run_configure_script(self, script: str):
+ """Run the script to install the Juju agent on the target machine.
+
+ :param str script: The script returned by the ProvisioningScript API
+ :raises: :class:`paramiko.ssh_exception.AuthenticationException`
+ if the upload fails
+ """
+ _, tmpFile = tempfile.mkstemp()
+ with open(tmpFile, "w") as f:
+ f.write(script)
+ try:
+ # get ssh client
+ ssh = self._get_ssh_client(user="ubuntu",)
+
+ # copy the local copy of the script to the remote machine
+ sftp = paramiko.SFTPClient.from_transport(ssh.get_transport())
+ sftp.put(
+ tmpFile, tmpFile,
+ )
+
+ # run the provisioning script
+ self._run_command(
+ ssh, "sudo /bin/bash {}".format(tmpFile),
+ )
+
+ except paramiko.ssh_exception.AuthenticationException as e:
+ raise e
+ finally:
+ os.remove(tmpFile)
+ ssh.close()
--- /dev/null
+# Copyright 2020 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import asyncio
+from unittest import mock
+from unittest.mock import Mock
+
+import asynctest
+
+from n2vc.exceptions import N2VCTimeoutException
+from n2vc.juju_observer import JujuModelObserver, _Entity
+
+
+class FakeObject:
+ def __init__(self):
+ self.complete = True
+
+
+class JujuModelObserverTest(asynctest.TestCase):
+ def setUp(self):
+ self.n2vc = Mock()
+ self.model = Mock()
+ self.juju_observer = JujuModelObserver(n2vc=self.n2vc, model=self.model)
+ self.loop = asyncio.new_event_loop()
+
+ def test_wait_no_retries(self):
+ obj = FakeObject()
+ entity = _Entity(entity_id="eid-1", entity_type="fake", obj=obj, db_dict={})
+ result = self.loop.run_until_complete(
+ self.juju_observer._wait_for_entity(
+ entity=entity,
+ field_to_check="complete",
+ final_states_list=[True],
+ progress_timeout=None,
+ total_timeout=None,
+ )
+ )
+ self.assertEqual(result, 0)
+
+ @mock.patch("n2vc.juju_observer.asyncio.wait_for")
+ def test_wait_default_values(self, wait_for):
+ wait_for.return_value = asyncio.Future()
+ wait_for.return_value.set_result(None)
+ obj = FakeObject()
+ obj.complete = False
+ entity = _Entity(entity_id="eid-1", entity_type="fake", obj=obj, db_dict={})
+ with self.assertRaises(N2VCTimeoutException):
+ self.loop.run_until_complete(
+ self.juju_observer._wait_for_entity(
+ entity=entity,
+ field_to_check="complete",
+ final_states_list=[True],
+ progress_timeout=None,
+ total_timeout=None,
+ )
+ )
+ wait_for.assert_called_once_with(fut=mock.ANY, timeout=3600.0)
+
+ @mock.patch("n2vc.juju_observer.asyncio.wait_for")
+ def test_wait_default_progress(self, wait_for):
+ wait_for.return_value = asyncio.Future()
+ wait_for.return_value.set_result(None)
+ obj = FakeObject()
+ obj.complete = False
+ entity = _Entity(entity_id="eid-1", entity_type="fake", obj=obj, db_dict={})
+ with self.assertRaises(N2VCTimeoutException):
+ self.loop.run_until_complete(
+ self.juju_observer._wait_for_entity(
+ entity=entity,
+ field_to_check="complete",
+ final_states_list=[True],
+ progress_timeout=4000,
+ total_timeout=None,
+ )
+ )
+ wait_for.assert_called_once_with(fut=mock.ANY, timeout=3600.0)
+
+ @mock.patch("n2vc.juju_observer.asyncio.wait_for")
+ def test_wait_default_total(self, wait_for):
+ wait_for.return_value = asyncio.Future()
+ wait_for.return_value.set_result(None)
+ obj = FakeObject()
+ obj.complete = False
+ entity = _Entity(entity_id="eid-1", entity_type="fake", obj=obj, db_dict={})
+ with self.assertRaises(N2VCTimeoutException):
+ self.loop.run_until_complete(
+ self.juju_observer._wait_for_entity(
+ entity=entity,
+ field_to_check="complete",
+ final_states_list=[True],
+ progress_timeout=None,
+ total_timeout=4000.0,
+ )
+ )
+ wait_for.assert_called_once_with(fut=mock.ANY, timeout=3600.0)
+
+ @mock.patch("n2vc.juju_observer.asyncio.wait_for")
+ def test_wait_total_less_than_progress_timeout(self, wait_for):
+ wait_for.return_value = asyncio.Future()
+ wait_for.return_value.set_result(None)
+ obj = FakeObject()
+ obj.complete = False
+ entity = _Entity(entity_id="eid-1", entity_type="fake", obj=obj, db_dict={})
+ with self.assertRaises(N2VCTimeoutException):
+ self.loop.run_until_complete(
+ self.juju_observer._wait_for_entity(
+ entity=entity,
+ field_to_check="complete",
+ final_states_list=[True],
+ progress_timeout=4500.0,
+ total_timeout=3000.0,
+ )
+ )
+ wait_for.assert_called_once_with(fut=mock.ANY, timeout=3000.0)
+
+ @mock.patch("n2vc.juju_observer.asyncio.wait_for")
+ def test_wait_progress_less_than_total_timeout(self, wait_for):
+ wait_for.return_value = asyncio.Future()
+ wait_for.return_value.set_result(None)
+ obj = FakeObject()
+ obj.complete = False
+ entity = _Entity(entity_id="eid-1", entity_type="fake", obj=obj, db_dict={})
+ with self.assertRaises(N2VCTimeoutException):
+ self.loop.run_until_complete(
+ self.juju_observer._wait_for_entity(
+ entity=entity,
+ field_to_check="complete",
+ final_states_list=[True],
+ progress_timeout=1500.0,
+ total_timeout=3000.0,
+ )
+ )
+ wait_for.assert_called_once_with(fut=mock.ANY, timeout=1500.0)
+
+ def test_wait_negative_timeout(self):
+ obj = FakeObject()
+ entity = _Entity(entity_id="eid-1", entity_type="fake", obj=obj, db_dict={})
+ with self.assertRaises(N2VCTimeoutException):
+ self.loop.run_until_complete(
+ self.juju_observer._wait_for_entity(
+ entity=entity,
+ field_to_check="complete",
+ final_states_list=[True],
+ progress_timeout=None,
+ total_timeout=-1000,
+ )
+ )
# See the License for the specific language governing permissions and
# limitations under the License.
-from unittest import TestCase
+from unittest import TestCase, mock
+
+from mock import mock_open
+from n2vc.provisioner import SSHProvisioner
+from paramiko.ssh_exception import SSHException
class ProvisionerTest(TestCase):
def setUp(self):
- pass
+ self.provisioner = SSHProvisioner(None, None, None)
+
+ @mock.patch("n2vc.provisioner.os.path.exists")
+ @mock.patch("n2vc.provisioner.paramiko.RSAKey")
+ @mock.patch("n2vc.provisioner.paramiko.SSHClient")
+ @mock.patch("builtins.open", new_callable=mock_open, read_data="data")
+ def test__get_ssh_client(self, _mock_open, mock_sshclient, _mock_rsakey, _mock_os):
+ mock_instance = mock_sshclient.return_value
+ sshclient = self.provisioner._get_ssh_client()
+ self.assertEqual(mock_instance, sshclient)
+ self.assertEqual(
+ 1,
+ mock_instance.set_missing_host_key_policy.call_count,
+ "Missing host key call count",
+ )
+ self.assertEqual(1, mock_instance.connect.call_count, "Connect call count")
+
+ @mock.patch("n2vc.provisioner.os.path.exists")
+ @mock.patch("n2vc.provisioner.paramiko.RSAKey")
+ @mock.patch("n2vc.provisioner.paramiko.SSHClient")
+ @mock.patch("builtins.open", new_callable=mock_open, read_data="data")
+ def test__get_ssh_client_no_connection(
+ self, _mock_open, mock_sshclient, _mock_rsakey, _mock_os
+ ):
+
+ mock_instance = mock_sshclient.return_value
+ mock_instance.method_inside_someobject.side_effect = ["something"]
+ mock_instance.connect.side_effect = SSHException()
+
+ self.assertRaises(SSHException, self.provisioner._get_ssh_client)
+ self.assertEqual(
+ 1,
+ mock_instance.set_missing_host_key_policy.call_count,
+ "Missing host key call count",
+ )
+ self.assertEqual(1, mock_instance.connect.call_count, "Connect call count")
+
+ @mock.patch("n2vc.provisioner.os.path.exists")
+ @mock.patch("n2vc.provisioner.paramiko.RSAKey")
+ @mock.patch("n2vc.provisioner.paramiko.SSHClient")
+ @mock.patch("builtins.open", new_callable=mock_open, read_data="data")
+ def test__get_ssh_client_bad_banner(
+ self, _mock_open, mock_sshclient, _mock_rsakey, _mock_os
+ ):
+
+ mock_instance = mock_sshclient.return_value
+ mock_instance.method_inside_someobject.side_effect = ["something"]
+ mock_instance.connect.side_effect = [
+ SSHException("Error reading SSH protocol banner"),
+ None,
+ None,
+ ]
+
+ sshclient = self.provisioner._get_ssh_client()
+ self.assertEqual(mock_instance, sshclient)
+ self.assertEqual(
+ 1,
+ mock_instance.set_missing_host_key_policy.call_count,
+ "Missing host key call count",
+ )
+ self.assertEqual(
+ 3, mock_instance.connect.call_count, "Should attempt 3 connections"
+ )
+
+ @mock.patch("time.sleep", autospec=True)
+ @mock.patch("n2vc.provisioner.os.path.exists")
+ @mock.patch("n2vc.provisioner.paramiko.RSAKey")
+ @mock.patch("n2vc.provisioner.paramiko.SSHClient")
+ @mock.patch("builtins.open", new_callable=mock_open, read_data="data")
+ def test__get_ssh_client_unable_to_connect(
+ self, _mock_open, mock_sshclient, _mock_rsakey, _mock_os, _mock_sleep
+ ):
+
+ mock_instance = mock_sshclient.return_value
+ mock_instance.connect.side_effect = Exception("Unable to connect to port")
+
+ self.assertRaises(Exception, self.provisioner._get_ssh_client)
+ self.assertEqual(
+ 1,
+ mock_instance.set_missing_host_key_policy.call_count,
+ "Missing host key call count",
+ )
+ self.assertEqual(
+ 11, mock_instance.connect.call_count, "Should attempt 11 connections"
+ )
+
+ @mock.patch("time.sleep", autospec=True)
+ @mock.patch("n2vc.provisioner.os.path.exists")
+ @mock.patch("n2vc.provisioner.paramiko.RSAKey")
+ @mock.patch("n2vc.provisioner.paramiko.SSHClient")
+ @mock.patch("builtins.open", new_callable=mock_open, read_data="data")
+ def test__get_ssh_client_unable_to_connect_once(
+ self, _mock_open, mock_sshclient, _mock_rsakey, _mock_os, _mock_sleep
+ ):
+
+ mock_instance = mock_sshclient.return_value
+ mock_instance.connect.side_effect = [
+ Exception("Unable to connect to port"),
+ None,
+ ]
+
+ sshclient = self.provisioner._get_ssh_client()
+ self.assertEqual(mock_instance, sshclient)
+ self.assertEqual(
+ 1,
+ mock_instance.set_missing_host_key_policy.call_count,
+ "Missing host key call count",
+ )
+ self.assertEqual(
+ 2, mock_instance.connect.call_count, "Should attempt 2 connections"
+ )
+
+ @mock.patch("n2vc.provisioner.os.path.exists")
+ @mock.patch("n2vc.provisioner.paramiko.RSAKey")
+ @mock.patch("n2vc.provisioner.paramiko.SSHClient")
+ @mock.patch("builtins.open", new_callable=mock_open, read_data="data")
+ def test__get_ssh_client_other_exception(
+ self, _mock_open, mock_sshclient, _mock_rsakey, _mock_os
+ ):
+
+ mock_instance = mock_sshclient.return_value
+ mock_instance.connect.side_effect = Exception()
+
+ self.assertRaises(Exception, self.provisioner._get_ssh_client)
+ self.assertEqual(
+ 1,
+ mock_instance.set_missing_host_key_policy.call_count,
+ "Missing host key call count",
+ )
+ self.assertEqual(
+ 1, mock_instance.connect.call_count, "Should only attempt 1 connection"
+ )
+
+
+#
--- /dev/null
+# Copyright 2019 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+import base64
+import binascii
+import logging
+import os.path
+import re
+import shlex
+import ssl
+import subprocess
+
+from juju.client import client
+from juju.controller import Controller
+from juju.errors import JujuAPIError, JujuError
+from juju.model import ModelObserver
+
+import n2vc.exceptions
+from n2vc.provisioner import SSHProvisioner
+
+
+# import time
+# FIXME: this should load the juju inside or modules without having to
+# explicitly install it. Check why it's not working.
+# Load our subtree of the juju library
+# path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
+# path = os.path.join(path, "modules/libjuju/")
+# if path not in sys.path:
+# sys.path.insert(1, path)
+# We might need this to connect to the websocket securely, but test and verify.
+try:
+ ssl._create_default_https_context = ssl._create_unverified_context
+except AttributeError:
+ # Legacy Python doesn't verify by default (see pep-0476)
+ # https://www.python.org/dev/peps/pep-0476/
+ pass
+
+
+# Custom exceptions
+# Deprecated. Please use n2vc.exceptions namespace.
+class JujuCharmNotFound(Exception):
+ """The Charm can't be found or is not readable."""
+
+
+class JujuApplicationExists(Exception):
+ """The Application already exists."""
+
+
+class N2VCPrimitiveExecutionFailed(Exception):
+ """Something failed while attempting to execute a primitive."""
+
+
+class NetworkServiceDoesNotExist(Exception):
+ """The Network Service being acted against does not exist."""
+
+
+class PrimitiveDoesNotExist(Exception):
+ """The Primitive being executed does not exist."""
+
+
+# Quiet the debug logging
+logging.getLogger("websockets.protocol").setLevel(logging.INFO)
+logging.getLogger("juju.client.connection").setLevel(logging.WARN)
+logging.getLogger("juju.model").setLevel(logging.WARN)
+logging.getLogger("juju.machine").setLevel(logging.WARN)
+
+
+class VCAMonitor(ModelObserver):
+ """Monitor state changes within the Juju Model."""
+
+ log = None
+
+ def __init__(self, ns_name):
+ self.log = logging.getLogger(__name__)
+
+ self.ns_name = ns_name
+ self.applications = {}
+
+ def AddApplication(self, application_name, callback, *callback_args):
+ if application_name not in self.applications:
+ self.applications[application_name] = {
+ "callback": callback,
+ "callback_args": callback_args,
+ }
+
+ def RemoveApplication(self, application_name):
+ if application_name in self.applications:
+ del self.applications[application_name]
+
+ async def on_change(self, delta, old, new, model):
+ """React to changes in the Juju model."""
+
+ if delta.entity == "unit":
+ # Ignore change events from other applications
+ if delta.data["application"] not in self.applications.keys():
+ return
+
+ try:
+
+ application_name = delta.data["application"]
+
+ callback = self.applications[application_name]["callback"]
+ callback_args = self.applications[application_name]["callback_args"]
+
+ if old and new:
+ # Fire off a callback with the application state
+ if callback:
+ callback(
+ self.ns_name,
+ delta.data["application"],
+ new.workload_status,
+ new.workload_status_message,
+ *callback_args,
+ )
+
+ if old and not new:
+ # This is a charm being removed
+ if callback:
+ callback(
+ self.ns_name,
+ delta.data["application"],
+ "removed",
+ "",
+ *callback_args,
+ )
+ except Exception as e:
+ self.log.debug("[1] notify_callback exception: {}".format(e))
+
+ elif delta.entity == "action":
+ # TODO: Decide how we want to notify the user of actions
+
+ # uuid = delta.data['id'] # The Action's unique id
+ # msg = delta.data['message'] # The output of the action
+ #
+ # if delta.data['status'] == "pending":
+ # # The action is queued
+ # pass
+ # elif delta.data['status'] == "completed""
+ # # The action was successful
+ # pass
+ # elif delta.data['status'] == "failed":
+ # # The action failed.
+ # pass
+
+ pass
+
+
+########
+# TODO
+#
+# Create unique models per network service
+# Document all public functions
+
+
+class N2VC:
+ def __init__(
+ self,
+ log=None,
+ server="127.0.0.1",
+ port=17070,
+ user="admin",
+ secret=None,
+ artifacts=None,
+ loop=None,
+ juju_public_key=None,
+ ca_cert=None,
+ api_proxy=None,
+ ):
+ """Initialize N2VC
+
+ Initializes the N2VC object, allowing the caller to interoperate with the VCA.
+
+
+ :param log obj: The logging object to log to
+ :param server str: The IP Address or Hostname of the Juju controller
+ :param port int: The port of the Juju Controller
+ :param user str: The Juju username to authenticate with
+ :param secret str: The Juju password to authenticate with
+ :param artifacts str: The directory where charms required by a vnfd are
+ stored.
+ :param loop obj: The loop to use.
+ :param juju_public_key str: The contents of the Juju public SSH key
+ :param ca_cert str: The CA certificate to use to authenticate
+ :param api_proxy str: The IP of the host machine
+
+ :Example:
+ client = n2vc.vnf.N2VC(
+ log=log,
+ server='10.1.1.28',
+ port=17070,
+ user='admin',
+ secret='admin',
+ artifacts='/app/storage/myvnf/charms',
+ loop=loop,
+ juju_public_key='<contents of the juju public key>',
+ ca_cert='<contents of CA certificate>',
+ api_proxy='192.168.1.155'
+ )
+ """
+
+ # Initialize instance-level variables
+ self.api = None
+ self.log = None
+ self.controller = None
+ self.connecting = False
+ self.authenticated = False
+ self.api_proxy = api_proxy
+
+ if log:
+ self.log = log
+ else:
+ self.log = logging.getLogger(__name__)
+
+ # For debugging
+ self.refcount = {
+ "controller": 0,
+ "model": 0,
+ }
+
+ self.models = {}
+
+ # Model Observers
+ self.monitors = {}
+
+ # VCA config
+ self.hostname = ""
+ self.port = 17070
+ self.username = ""
+ self.secret = ""
+
+ self.juju_public_key = juju_public_key
+ if juju_public_key:
+ self._create_juju_public_key(juju_public_key)
+ else:
+ self.juju_public_key = ""
+
+ # TODO: Verify ca_cert is valid before using. VCA will crash
+ # if the ca_cert isn't formatted correctly.
+ def base64_to_cacert(b64string):
+ """Convert the base64-encoded string containing the VCA CACERT.
+
+ The input string....
+
+ """
+ try:
+ cacert = base64.b64decode(b64string).decode("utf-8")
+
+ cacert = re.sub(r"\\n", r"\n", cacert,)
+ except binascii.Error as e:
+ self.log.debug("Caught binascii.Error: {}".format(e))
+ raise n2vc.exceptions.N2VCInvalidCertificate("Invalid CA Certificate")
+
+ return cacert
+
+ self.ca_cert = None
+ if ca_cert:
+ self.ca_cert = base64_to_cacert(ca_cert)
+
+ # Quiet websocket traffic
+ logging.getLogger("websockets.protocol").setLevel(logging.INFO)
+ logging.getLogger("juju.client.connection").setLevel(logging.WARN)
+ logging.getLogger("model").setLevel(logging.WARN)
+ # logging.getLogger('websockets.protocol').setLevel(logging.DEBUG)
+
+ self.log.debug("JujuApi: instantiated")
+
+ self.server = server
+ self.port = port
+
+ self.secret = secret
+ if user.startswith("user-"):
+ self.user = user
+ else:
+ self.user = "user-{}".format(user)
+
+ self.endpoint = "%s:%d" % (server, int(port))
+
+ self.artifacts = artifacts
+
+ self.loop = loop or asyncio.get_event_loop()
+
+ def __del__(self):
+ """Close any open connections."""
+ yield self.logout()
+
+ def _create_juju_public_key(self, public_key):
+ """Recreate the Juju public key on disk.
+
+ Certain libjuju commands expect to be run from the same machine as Juju
+ is bootstrapped to. This method will write the public key to disk in
+ that location: ~/.local/share/juju/ssh/juju_id_rsa.pub
+ """
+ # Make sure that we have a public key before writing to disk
+ if public_key is None or len(public_key) == 0:
+ if "OSM_VCA_PUBKEY" in os.environ:
+ public_key = os.getenv("OSM_VCA_PUBKEY", "")
+ if len(public_key == 0):
+ return
+ else:
+ return
+
+ path = "{}/.local/share/juju/ssh".format(os.path.expanduser("~"),)
+ if not os.path.exists(path):
+ os.makedirs(path)
+
+ with open("{}/juju_id_rsa.pub".format(path), "w") as f:
+ f.write(public_key)
+
+ def notify_callback(
+ self,
+ model_name,
+ application_name,
+ status,
+ message,
+ callback=None,
+ *callback_args
+ ):
+ try:
+ if callback:
+ callback(
+ model_name, application_name, status, message, *callback_args,
+ )
+ except Exception as e:
+ self.log.error("[0] notify_callback exception {}".format(e))
+ raise e
+ return True
+
+ # Public methods
+ async def Relate(self, model_name, vnfd):
+ """Create a relation between the charm-enabled VDUs in a VNF.
+
+ The Relation mapping has two parts: the id of the vdu owning the endpoint, and
+ the name of the endpoint.
+
+ vdu:
+ ...
+ vca-relationships:
+ relation:
+ - provides: dataVM:db
+ requires: mgmtVM:app
+
+ This tells N2VC that the charm referred to by the dataVM vdu offers a relation
+ named 'db', and the mgmtVM vdu
+ has an 'app' endpoint that should be connected to a database.
+
+ :param str ns_name: The name of the network service.
+ :param dict vnfd: The parsed yaml VNF descriptor.
+ """
+
+ # Currently, the call to Relate() is made automatically after the
+ # deployment of each charm; if the relation depends on a charm that
+ # hasn't been deployed yet, the call will fail silently. This will
+ # prevent an API breakage, with the intent of making this an explicitly
+ # required call in a more object-oriented refactor of the N2VC API.
+
+ configs = []
+ vnf_config = vnfd.get("vnf-configuration")
+ if vnf_config:
+ juju = vnf_config["juju"]
+ if juju:
+ configs.append(vnf_config)
+
+ for vdu in vnfd["vdu"]:
+ vdu_config = vdu.get("vdu-configuration")
+ if vdu_config:
+ juju = vdu_config["juju"]
+ if juju:
+ configs.append(vdu_config)
+
+ def _get_application_name(name):
+ """Get the application name that's mapped to a vnf/vdu."""
+ vnf_member_index = 0
+ vnf_name = vnfd["name"]
+
+ for vdu in vnfd.get("vdu"):
+ # Compare the named portion of the relation to the vdu's id
+ if vdu["id"] == name:
+ application_name = self.FormatApplicationName(
+ model_name, vnf_name, str(vnf_member_index),
+ )
+ return application_name
+ else:
+ vnf_member_index += 1
+
+ return None
+
+ # Loop through relations
+ for cfg in configs:
+ if "juju" in cfg:
+ juju = cfg["juju"]
+ if (
+ "vca-relationships" in juju
+ and "relation" in juju["vca-relationships"]
+ ):
+ for rel in juju["vca-relationships"]["relation"]:
+ try:
+
+ # get the application name for the provides
+ (name, endpoint) = rel["provides"].split(":")
+ application_name = _get_application_name(name)
+
+ provides = "{}:{}".format(application_name, endpoint)
+
+ # get the application name for thr requires
+ (name, endpoint) = rel["requires"].split(":")
+ application_name = _get_application_name(name)
+
+ requires = "{}:{}".format(application_name, endpoint)
+ self.log.debug(
+ "Relation: {} <-> {}".format(provides, requires)
+ )
+ await self.add_relation(
+ model_name, provides, requires,
+ )
+ except Exception as e:
+ self.log.debug("Exception: {}".format(e))
+
+ return
+
+ async def DeployCharms(
+ self,
+ model_name,
+ application_name,
+ vnfd,
+ charm_path,
+ params={},
+ machine_spec={},
+ callback=None,
+ *callback_args
+ ):
+ """Deploy one or more charms associated with a VNF.
+
+ Deploy the charm(s) referenced in a VNF Descriptor.
+
+ :param str model_name: The name or unique id of the network service.
+ :param str application_name: The name of the application
+ :param dict vnfd: The name of the application
+ :param str charm_path: The path to the Juju charm
+ :param dict params: A dictionary of runtime parameters
+ Examples::
+ {
+ 'rw_mgmt_ip': '1.2.3.4',
+ # Pass the initial-config-primitives section of the vnf or vdu
+ 'initial-config-primitives': {...}
+ 'user_values': dictionary with the day-1 parameters provided at
+ instantiation time. It will replace values
+ inside < >. rw_mgmt_ip will be included here also
+ }
+ :param dict machine_spec: A dictionary describing the machine to
+ install to
+ Examples::
+ {
+ 'hostname': '1.2.3.4',
+ 'username': 'ubuntu',
+ }
+ :param obj callback: A callback function to receive status changes.
+ :param tuple callback_args: A list of arguments to be passed to the
+ callback
+ """
+
+ ########################################################
+ # Verify the path to the charm exists and is readable. #
+ ########################################################
+ if not os.path.exists(charm_path):
+ self.log.debug("Charm path doesn't exist: {}".format(charm_path))
+ self.notify_callback(
+ model_name,
+ application_name,
+ "error",
+ "failed",
+ callback,
+ *callback_args,
+ )
+ raise JujuCharmNotFound("No artifacts configured.")
+
+ ################################
+ # Login to the Juju controller #
+ ################################
+ if not self.authenticated:
+ self.log.debug("Authenticating with Juju")
+ await self.login()
+
+ ##########################################
+ # Get the model for this network service #
+ ##########################################
+ model = await self.get_model(model_name)
+
+ ########################################
+ # Verify the application doesn't exist #
+ ########################################
+ app = await self.get_application(model, application_name)
+ if app:
+ raise JujuApplicationExists(
+ (
+ 'Can\'t deploy application "{}" to model '
+ ' "{}" because it already exists.'
+ ).format(application_name, model_name)
+ )
+
+ ################################################################
+ # Register this application with the model-level event monitor #
+ ################################################################
+ if callback:
+ self.log.debug(
+ "JujuApi: Registering callback for {}".format(application_name,)
+ )
+ await self.Subscribe(model_name, application_name, callback, *callback_args)
+
+ #######################################
+ # Get the initial charm configuration #
+ #######################################
+
+ rw_mgmt_ip = None
+ if "rw_mgmt_ip" in params:
+ rw_mgmt_ip = params["rw_mgmt_ip"]
+
+ if "initial-config-primitive" not in params:
+ params["initial-config-primitive"] = {}
+
+ initial_config = self._get_config_from_dict(
+ params["initial-config-primitive"], {"<rw_mgmt_ip>": rw_mgmt_ip}
+ )
+
+ ########################################################
+ # Check for specific machine placement (native charms) #
+ ########################################################
+ to = ""
+ series = "xenial"
+
+ if machine_spec.keys():
+ if all(k in machine_spec for k in ["hostname", "username"]):
+
+ # Allow series to be derived from the native charm
+ series = None
+
+ self.log.debug(
+ "Provisioning manual machine {}@{}".format(
+ machine_spec["username"], machine_spec["hostname"],
+ )
+ )
+
+ """Native Charm support
+
+ Taking a bare VM (assumed to be an Ubuntu cloud image),
+ the provisioning process will:
+ - Create an ubuntu user w/sudo access
+ - Detect hardware
+ - Detect architecture
+ - Download and install Juju agent from controller
+ - Enable Juju agent
+ - Add an iptables rule to route traffic to the API proxy
+ """
+
+ to = await self.provision_machine(
+ model_name=model_name,
+ username=machine_spec["username"],
+ hostname=machine_spec["hostname"],
+ private_key_path=self.GetPrivateKeyPath(),
+ )
+ self.log.debug("Provisioned machine id {}".format(to))
+
+ # TODO: If to is none, raise an exception
+
+ # The native charm won't have the sshproxy layer, typically, but LCM
+ # uses the config primitive
+ # to interpret what the values are. That's a gap to fill.
+
+ """
+ The ssh-* config parameters are unique to the sshproxy layer,
+ which most native charms will not be aware of.
+
+ Setting invalid config parameters will cause the deployment to
+ fail.
+
+ For the moment, we will strip the ssh-* parameters from native
+ charms, until the feature gap is addressed in the information
+ model.
+ """
+
+ # Native charms don't include the ssh-* config values, so strip them
+ # from the initial_config, otherwise the deploy will raise an error.
+ # self.log.debug("Removing ssh-* from initial-config")
+ for k in ["ssh-hostname", "ssh-username", "ssh-password"]:
+ if k in initial_config:
+ self.log.debug("Removing parameter {}".format(k))
+ del initial_config[k]
+
+ self.log.debug(
+ "JujuApi: Deploying charm ({}/{}) from {} to {}".format(
+ model_name, application_name, charm_path, to,
+ )
+ )
+
+ ########################################################
+ # Deploy the charm and apply the initial configuration #
+ ########################################################
+ app = await model.deploy(
+ # We expect charm_path to be either the path to the charm on disk
+ # or in the format of cs:series/name
+ charm_path,
+ # This is the formatted, unique name for this charm
+ application_name=application_name,
+ # Proxy charms should use the current LTS. This will need to be
+ # changed for native charms.
+ series=series,
+ # Apply the initial 'config' primitive during deployment
+ config=initial_config,
+ # Where to deploy the charm to.
+ to=to,
+ )
+
+ #############################
+ # Map the vdu id<->app name #
+ #############################
+ try:
+ await self.Relate(model_name, vnfd)
+ except KeyError as ex:
+ # We don't currently support relations between NS and VNF/VDU charms
+ self.log.warn("[N2VC] Relations not supported: {}".format(ex))
+ except Exception:
+ # This may happen if not all of the charms needed by the relation
+ # are ready. We can safely ignore this, because Relate will be
+ # retried when the endpoint of the relation is deployed.
+ self.log.warn("[N2VC] Relations not ready")
+
+ # #######################################
+ # # Execute initial config primitive(s) #
+ # #######################################
+ uuids = await self.ExecuteInitialPrimitives(
+ model_name, application_name, params,
+ )
+ return uuids
+
+ # primitives = {}
+ #
+ # # Build a sequential list of the primitives to execute
+ # for primitive in params['initial-config-primitive']:
+ # try:
+ # if primitive['name'] == 'config':
+ # # This is applied when the Application is deployed
+ # pass
+ # else:
+ # seq = primitive['seq']
+ #
+ # params = {}
+ # if 'parameter' in primitive:
+ # params = primitive['parameter']
+ #
+ # primitives[seq] = {
+ # 'name': primitive['name'],
+ # 'parameters': self._map_primitive_parameters(
+ # params,
+ # {'<rw_mgmt_ip>': rw_mgmt_ip}
+ # ),
+ # }
+ #
+ # for primitive in sorted(primitives):
+ # await self.ExecutePrimitive(
+ # model_name,
+ # application_name,
+ # primitives[primitive]['name'],
+ # callback,
+ # callback_args,
+ # **primitives[primitive]['parameters'],
+ # )
+ # except N2VCPrimitiveExecutionFailed as e:
+ # self.log.debug(
+ # "[N2VC] Exception executing primitive: {}".format(e)
+ # )
+ # raise
+
+ async def GetPrimitiveStatus(self, model_name, uuid):
+ """Get the status of an executed Primitive.
+
+ The status of an executed Primitive will be one of three values:
+ - completed
+ - failed
+ - running
+ """
+ status = None
+ try:
+ if not self.authenticated:
+ await self.login()
+
+ model = await self.get_model(model_name)
+
+ results = await model.get_action_status(uuid)
+
+ if uuid in results:
+ status = results[uuid]
+
+ except Exception as e:
+ self.log.debug(
+ "Caught exception while getting primitive status: {}".format(e)
+ )
+ raise N2VCPrimitiveExecutionFailed(e)
+
+ return status
+
+ async def GetPrimitiveOutput(self, model_name, uuid):
+ """Get the output of an executed Primitive.
+
+ Note: this only returns output for a successfully executed primitive.
+ """
+ results = None
+ try:
+ if not self.authenticated:
+ await self.login()
+
+ model = await self.get_model(model_name)
+ results = await model.get_action_output(uuid, 60)
+ except Exception as e:
+ self.log.debug(
+ "Caught exception while getting primitive status: {}".format(e)
+ )
+ raise N2VCPrimitiveExecutionFailed(e)
+
+ return results
+
+ # async def ProvisionMachine(self, model_name, hostname, username):
+ # """Provision machine for usage with Juju.
+ #
+ # Provisions a previously instantiated machine for use with Juju.
+ # """
+ # try:
+ # if not self.authenticated:
+ # await self.login()
+ #
+ # # FIXME: This is hard-coded until model-per-ns is added
+ # model_name = 'default'
+ #
+ # model = await self.get_model(model_name)
+ # model.add_machine(spec={})
+ #
+ # machine = await model.add_machine(spec='ssh:{}@{}:{}'.format(
+ # "ubuntu",
+ # host['address'],
+ # private_key_path,
+ # ))
+ # return machine.id
+ #
+ # except Exception as e:
+ # self.log.debug(
+ # "Caught exception while getting primitive status: {}".format(e)
+ # )
+ # raise N2VCPrimitiveExecutionFailed(e)
+
+ def GetPrivateKeyPath(self):
+ homedir = os.environ["HOME"]
+ sshdir = "{}/.ssh".format(homedir)
+ private_key_path = "{}/id_n2vc_rsa".format(sshdir)
+ return private_key_path
+
+ async def GetPublicKey(self):
+ """Get the N2VC SSH public key.abs
+
+ Returns the SSH public key, to be injected into virtual machines to
+ be managed by the VCA.
+
+ The first time this is run, a ssh keypair will be created. The public
+ key is injected into a VM so that we can provision the machine with
+ Juju, after which Juju will communicate with the VM directly via the
+ juju agent.
+ """
+ # public_key = ""
+
+ # Find the path to where we expect our key to live.
+ homedir = os.environ["HOME"]
+ sshdir = "{}/.ssh".format(homedir)
+ if not os.path.exists(sshdir):
+ os.mkdir(sshdir)
+
+ private_key_path = "{}/id_n2vc_rsa".format(sshdir)
+ public_key_path = "{}.pub".format(private_key_path)
+
+ # If we don't have a key generated, generate it.
+ if not os.path.exists(private_key_path):
+ cmd = "ssh-keygen -t {} -b {} -N '' -f {}".format(
+ "rsa", "4096", private_key_path
+ )
+ subprocess.check_output(shlex.split(cmd))
+
+ # Read the public key
+ with open(public_key_path, "r") as f:
+ public_key = f.readline()
+
+ return public_key
+
+ async def ExecuteInitialPrimitives(
+ self, model_name, application_name, params, callback=None, *callback_args
+ ):
+ """Execute multiple primitives.
+
+ Execute multiple primitives as declared in initial-config-primitive.
+ This is useful in cases where the primitives initially failed -- for
+ example, if the charm is a proxy but the proxy hasn't been configured
+ yet.
+ """
+ uuids = []
+ primitives = {}
+
+ # Build a sequential list of the primitives to execute
+ for primitive in params["initial-config-primitive"]:
+ try:
+ if primitive["name"] == "config":
+ pass
+ else:
+ seq = primitive["seq"]
+
+ params_ = {}
+ if "parameter" in primitive:
+ params_ = primitive["parameter"]
+
+ user_values = params.get("user_values", {})
+ if "rw_mgmt_ip" not in user_values:
+ user_values["rw_mgmt_ip"] = None
+ # just for backward compatibility, because it will be provided
+ # always by modern version of LCM
+
+ primitives[seq] = {
+ "name": primitive["name"],
+ "parameters": self._map_primitive_parameters(
+ params_, user_values
+ ),
+ }
+
+ for primitive in sorted(primitives):
+ try:
+ # self.log.debug("Queuing action {}".format(
+ # primitives[primitive]['name']))
+ uuids.append(
+ await self.ExecutePrimitive(
+ model_name,
+ application_name,
+ primitives[primitive]["name"],
+ callback,
+ callback_args,
+ **primitives[primitive]["parameters"],
+ )
+ )
+ except PrimitiveDoesNotExist as e:
+ self.log.debug(
+ "Ignoring exception PrimitiveDoesNotExist: {}".format(e)
+ )
+ pass
+ except Exception as e:
+ self.log.debug(
+ (
+ "XXXXXXXXXXXXXXXXXXXXXXXXX Unexpected exception: {}"
+ ).format(e)
+ )
+ raise e
+
+ except N2VCPrimitiveExecutionFailed as e:
+ self.log.debug("[N2VC] Exception executing primitive: {}".format(e))
+ raise
+ return uuids
+
+ async def ExecutePrimitive(
+ self,
+ model_name,
+ application_name,
+ primitive,
+ callback,
+ *callback_args,
+ **params
+ ):
+ """Execute a primitive of a charm for Day 1 or Day 2 configuration.
+
+ Execute a primitive defined in the VNF descriptor.
+
+ :param str model_name: The name or unique id of the network service.
+ :param str application_name: The name of the application
+ :param str primitive: The name of the primitive to execute.
+ :param obj callback: A callback function to receive status changes.
+ :param tuple callback_args: A list of arguments to be passed to the
+ callback function.
+ :param dict params: A dictionary of key=value pairs representing the
+ primitive's parameters
+ Examples::
+ {
+ 'rw_mgmt_ip': '1.2.3.4',
+ # Pass the initial-config-primitives section of the vnf or vdu
+ 'initial-config-primitives': {...}
+ }
+ """
+ self.log.debug("Executing primitive={} params={}".format(primitive, params))
+ uuid = None
+ try:
+ if not self.authenticated:
+ await self.login()
+
+ model = await self.get_model(model_name)
+
+ if primitive == "config":
+ # config is special, and expecting params to be a dictionary
+ await self.set_config(
+ model, application_name, params["params"],
+ )
+ else:
+ app = await self.get_application(model, application_name)
+ if app:
+ # Does this primitive exist?
+ actions = await app.get_actions()
+
+ if primitive not in actions.keys():
+ raise PrimitiveDoesNotExist(
+ "Primitive {} does not exist".format(primitive)
+ )
+
+ # Run against the first (and probably only) unit in the app
+ unit = app.units[0]
+ if unit:
+ action = await unit.run_action(primitive, **params)
+ uuid = action.id
+ except PrimitiveDoesNotExist as e:
+ # Catch and raise this exception if it's thrown from the inner block
+ raise e
+ except Exception as e:
+ # An unexpected exception was caught
+ self.log.debug("Caught exception while executing primitive: {}".format(e))
+ raise N2VCPrimitiveExecutionFailed(e)
+ return uuid
+
+ async def RemoveCharms(
+ self, model_name, application_name, callback=None, *callback_args
+ ):
+ """Remove a charm from the VCA.
+
+ Remove a charm referenced in a VNF Descriptor.
+
+ :param str model_name: The name of the network service.
+ :param str application_name: The name of the application
+ :param obj callback: A callback function to receive status changes.
+ :param tuple callback_args: A list of arguments to be passed to the
+ callback function.
+ """
+ try:
+ if not self.authenticated:
+ await self.login()
+
+ model = await self.get_model(model_name)
+ app = await self.get_application(model, application_name)
+ if app:
+ # Remove this application from event monitoring
+ await self.Unsubscribe(model_name, application_name)
+
+ # self.notify_callback(model_name, application_name, "removing",
+ # callback, *callback_args)
+ self.log.debug("Removing the application {}".format(application_name))
+ await app.remove()
+
+ # await self.disconnect_model(self.monitors[model_name])
+
+ self.notify_callback(
+ model_name,
+ application_name,
+ "removed",
+ "Removing charm {}".format(application_name),
+ callback,
+ *callback_args,
+ )
+
+ except Exception as e:
+ print("Caught exception: {}".format(e))
+ self.log.debug(e)
+ raise e
+
+ async def CreateNetworkService(self, ns_uuid):
+ """Create a new Juju model for the Network Service.
+
+ Creates a new Model in the Juju Controller.
+
+ :param str ns_uuid: A unique id representing an instaance of a
+ Network Service.
+
+ :returns: True if the model was created. Raises JujuError on failure.
+ """
+ if not self.authenticated:
+ await self.login()
+
+ models = await self.controller.list_models()
+ if ns_uuid not in models:
+ # Get the new model
+ await self.get_model(ns_uuid)
+
+ return True
+
+ async def DestroyNetworkService(self, ns_uuid):
+ """Destroy a Network Service.
+
+ Destroy the Network Service and any deployed charms.
+
+ :param ns_uuid The unique id of the Network Service
+
+ :returns: True if the model was created. Raises JujuError on failure.
+ """
+
+ # Do not delete the default model. The default model was used by all
+ # Network Services, prior to the implementation of a model per NS.
+ if ns_uuid.lower() == "default":
+ return False
+
+ if not self.authenticated:
+ await self.login()
+
+ models = await self.controller.list_models()
+ if ns_uuid in models:
+ model = await self.controller.get_model(ns_uuid)
+
+ for application in model.applications:
+ app = model.applications[application]
+
+ await self.RemoveCharms(ns_uuid, application)
+
+ self.log.debug("Unsubscribing Watcher for {}".format(application))
+ await self.Unsubscribe(ns_uuid, application)
+
+ self.log.debug("Waiting for application to terminate")
+ timeout = 30
+ try:
+ await model.block_until(
+ lambda: all(
+ unit.workload_status in ["terminated"] for unit in app.units
+ ),
+ timeout=timeout,
+ )
+ except Exception:
+ self.log.debug(
+ "Timed out waiting for {} to terminate.".format(application)
+ )
+
+ for machine in model.machines:
+ try:
+ self.log.debug("Destroying machine {}".format(machine))
+ await model.machines[machine].destroy(force=True)
+ except JujuAPIError as e:
+ if "does not exist" in str(e):
+ # Our cached model may be stale, because the machine
+ # has already been removed. It's safe to continue.
+ continue
+ else:
+ self.log.debug("Caught exception: {}".format(e))
+ raise e
+
+ # Disconnect from the Model
+ if ns_uuid in self.models:
+ self.log.debug("Disconnecting model {}".format(ns_uuid))
+ # await self.disconnect_model(self.models[ns_uuid])
+ await self.disconnect_model(ns_uuid)
+
+ try:
+ self.log.debug("Destroying model {}".format(ns_uuid))
+ await self.controller.destroy_models(ns_uuid)
+ except JujuError:
+ raise NetworkServiceDoesNotExist(
+ "The Network Service '{}' does not exist".format(ns_uuid)
+ )
+
+ return True
+
+ async def GetMetrics(self, model_name, application_name):
+ """Get the metrics collected by the VCA.
+
+ :param model_name The name or unique id of the network service
+ :param application_name The name of the application
+ """
+ metrics = {}
+ model = await self.get_model(model_name)
+ app = await self.get_application(model, application_name)
+ if app:
+ metrics = await app.get_metrics()
+
+ return metrics
+
+ async def HasApplication(self, model_name, application_name):
+ model = await self.get_model(model_name)
+ app = await self.get_application(model, application_name)
+ if app:
+ return True
+ return False
+
+ async def Subscribe(self, ns_name, application_name, callback, *callback_args):
+ """Subscribe to callbacks for an application.
+
+ :param ns_name str: The name of the Network Service
+ :param application_name str: The name of the application
+ :param callback obj: The callback method
+ :param callback_args list: The list of arguments to append to calls to
+ the callback method
+ """
+ self.monitors[ns_name].AddApplication(
+ application_name, callback, *callback_args
+ )
+
+ async def Unsubscribe(self, ns_name, application_name):
+ """Unsubscribe to callbacks for an application.
+
+ Unsubscribes the caller from notifications from a deployed application.
+
+ :param ns_name str: The name of the Network Service
+ :param application_name str: The name of the application
+ """
+ self.monitors[ns_name].RemoveApplication(application_name,)
+
+ # Non-public methods
+ async def add_relation(self, model_name, relation1, relation2):
+ """
+ Add a relation between two application endpoints.
+
+ :param str model_name: The name or unique id of the network service
+ :param str relation1: '<application>[:<relation_name>]'
+ :param str relation2: '<application>[:<relation_name>]'
+ """
+
+ if not self.authenticated:
+ await self.login()
+
+ m = await self.get_model(model_name)
+ try:
+ await m.add_relation(relation1, relation2)
+ except JujuAPIError as e:
+ # If one of the applications in the relationship doesn't exist,
+ # or the relation has already been added, let the operation fail
+ # silently.
+ if "not found" in e.message:
+ return
+ if "already exists" in e.message:
+ return
+
+ raise e
+
+ # async def apply_config(self, config, application):
+ # """Apply a configuration to the application."""
+ # print("JujuApi: Applying configuration to {}.".format(
+ # application
+ # ))
+ # return await self.set_config(application=application, config=config)
+
+ def _get_config_from_dict(self, config_primitive, values):
+ """Transform the yang config primitive to dict.
+
+ Expected result:
+
+ config = {
+ 'config':
+ }
+ """
+ config = {}
+ for primitive in config_primitive:
+ if primitive["name"] == "config":
+ # config = self._map_primitive_parameters()
+ for parameter in primitive["parameter"]:
+ param = str(parameter["name"])
+ if parameter["value"] == "<rw_mgmt_ip>":
+ config[param] = str(values[parameter["value"]])
+ else:
+ config[param] = str(parameter["value"])
+
+ return config
+
+ def _map_primitive_parameters(self, parameters, user_values):
+ params = {}
+ for parameter in parameters:
+ param = str(parameter["name"])
+ value = parameter.get("value")
+
+ # map parameters inside a < >; e.g. <rw_mgmt_ip>. with the provided user
+ # _values.
+ # Must exist at user_values except if there is a default value
+ if isinstance(value, str) and value.startswith("<") and value.endswith(">"):
+ if parameter["value"][1:-1] in user_values:
+ value = user_values[parameter["value"][1:-1]]
+ elif "default-value" in parameter:
+ value = parameter["default-value"]
+ else:
+ raise KeyError(
+ "parameter {}='{}' not supplied ".format(param, value)
+ )
+
+ # If there's no value, use the default-value (if set)
+ if value is None and "default-value" in parameter:
+ value = parameter["default-value"]
+
+ # Typecast parameter value, if present
+ paramtype = "string"
+ try:
+ if "data-type" in parameter:
+ paramtype = str(parameter["data-type"]).lower()
+
+ if paramtype == "integer":
+ value = int(value)
+ elif paramtype == "boolean":
+ value = bool(value)
+ else:
+ value = str(value)
+ else:
+ # If there's no data-type, assume the value is a string
+ value = str(value)
+ except ValueError:
+ raise ValueError(
+ "parameter {}='{}' cannot be converted to type {}".format(
+ param, value, paramtype
+ )
+ )
+
+ params[param] = value
+ return params
+
+ def _get_config_from_yang(self, config_primitive, values):
+ """Transform the yang config primitive to dict."""
+ config = {}
+ for primitive in config_primitive.values():
+ if primitive["name"] == "config":
+ for parameter in primitive["parameter"].values():
+ param = str(parameter["name"])
+ if parameter["value"] == "<rw_mgmt_ip>":
+ config[param] = str(values[parameter["value"]])
+ else:
+ config[param] = str(parameter["value"])
+
+ return config
+
+ def FormatApplicationName(self, *args):
+ """
+ Generate a Juju-compatible Application name
+
+ :param args tuple: Positional arguments to be used to construct the
+ application name.
+
+ Limitations::
+ - Only accepts characters a-z and non-consequitive dashes (-)
+ - Application name should not exceed 50 characters
+
+ Examples::
+
+ FormatApplicationName("ping_pong_ns", "ping_vnf", "a")
+ """
+ appname = ""
+ for c in "-".join(list(args)):
+ if c.isdigit():
+ c = chr(97 + int(c))
+ elif not c.isalpha():
+ c = "-"
+ appname += c
+ return re.sub("-+", "-", appname.lower())
+
+ # def format_application_name(self, nsd_name, vnfr_name, member_vnf_index=0):
+ # """Format the name of the application
+ #
+ # Limitations:
+ # - Only accepts characters a-z and non-consequitive dashes (-)
+ # - Application name should not exceed 50 characters
+ # """
+ # name = "{}-{}-{}".format(nsd_name, vnfr_name, member_vnf_index)
+ # new_name = ''
+ # for c in name:
+ # if c.isdigit():
+ # c = chr(97 + int(c))
+ # elif not c.isalpha():
+ # c = "-"
+ # new_name += c
+ # return re.sub('\-+', '-', new_name.lower())
+
+ def format_model_name(self, name):
+ """Format the name of model.
+
+ Model names may only contain lowercase letters, digits and hyphens
+ """
+
+ return name.replace("_", "-").lower()
+
+ async def get_application(self, model, application):
+ """Get the deployed application."""
+ if not self.authenticated:
+ await self.login()
+
+ app = None
+ if application and model:
+ if model.applications:
+ if application in model.applications:
+ app = model.applications[application]
+
+ return app
+
+ async def get_model(self, model_name):
+ """Get a model from the Juju Controller.
+
+ Note: Model objects returned must call disconnected() before it goes
+ out of scope."""
+ if not self.authenticated:
+ await self.login()
+
+ if model_name not in self.models:
+ # Get the models in the controller
+ models = await self.controller.list_models()
+
+ if model_name not in models:
+ try:
+ self.models[model_name] = await self.controller.add_model(
+ model_name, config={"authorized-keys": self.juju_public_key}
+ )
+ except JujuError as e:
+ if "already exists" not in e.message:
+ raise e
+ else:
+ self.models[model_name] = await self.controller.get_model(model_name)
+
+ self.refcount["model"] += 1
+
+ # Create an observer for this model
+ await self.create_model_monitor(model_name)
+
+ return self.models[model_name]
+
+ async def create_model_monitor(self, model_name):
+ """Create a monitor for the model, if none exists."""
+ if not self.authenticated:
+ await self.login()
+
+ if model_name not in self.monitors:
+ self.monitors[model_name] = VCAMonitor(model_name)
+ self.models[model_name].add_observer(self.monitors[model_name])
+
+ return True
+
+ async def login(self):
+ """Login to the Juju controller."""
+
+ if self.authenticated:
+ return
+
+ self.connecting = True
+
+ self.log.debug("JujuApi: Logging into controller")
+
+ self.controller = Controller(loop=self.loop)
+
+ if self.secret:
+ self.log.debug(
+ "Connecting to controller... ws://{} as {}/{}".format(
+ self.endpoint, self.user, self.secret,
+ )
+ )
+ try:
+ await self.controller.connect(
+ endpoint=self.endpoint,
+ username=self.user,
+ password=self.secret,
+ cacert=self.ca_cert,
+ )
+ self.refcount["controller"] += 1
+ self.authenticated = True
+ self.log.debug("JujuApi: Logged into controller")
+ except Exception as ex:
+ self.log.debug("Caught exception: {}".format(ex))
+ else:
+ # current_controller no longer exists
+ # self.log.debug("Connecting to current controller...")
+ # await self.controller.connect_current()
+ # await self.controller.connect(
+ # endpoint=self.endpoint,
+ # username=self.user,
+ # cacert=cacert,
+ # )
+ self.log.fatal("VCA credentials not configured.")
+ self.authenticated = False
+
+ async def logout(self):
+ """Logout of the Juju controller."""
+ if not self.authenticated:
+ return False
+
+ try:
+ for model in self.models:
+ await self.disconnect_model(model)
+
+ if self.controller:
+ self.log.debug("Disconnecting controller {}".format(self.controller))
+ await self.controller.disconnect()
+ self.refcount["controller"] -= 1
+ self.controller = None
+
+ self.authenticated = False
+
+ self.log.debug(self.refcount)
+
+ except Exception as e:
+ self.log.fatal("Fatal error logging out of Juju Controller: {}".format(e))
+ raise e
+ return True
+
+ async def disconnect_model(self, model):
+ self.log.debug("Disconnecting model {}".format(model))
+ if model in self.models:
+ try:
+ await self.models[model].disconnect()
+ self.refcount["model"] -= 1
+ self.models[model] = None
+ except Exception as e:
+ self.log.debug("Caught exception: {}".format(e))
+
+ async def provision_machine(
+ self, model_name: str, hostname: str, username: str, private_key_path: str
+ ) -> int:
+ """Provision a machine.
+
+ This executes the SSH provisioner, which will log in to a machine via
+ SSH and prepare it for use with the Juju model
+
+ :param model_name str: The name of the model
+ :param hostname str: The IP or hostname of the target VM
+ :param user str: The username to login to
+ :param private_key_path str: The path to the private key that's been injected
+ to the VM via cloud-init
+ :return machine_id int: Returns the id of the machine or None if provisioning
+ fails
+ """
+ if not self.authenticated:
+ await self.login()
+
+ machine_id = None
+
+ if self.api_proxy:
+ self.log.debug(
+ "Instantiating SSH Provisioner for {}@{} ({})".format(
+ username, hostname, private_key_path
+ )
+ )
+ provisioner = SSHProvisioner(
+ host=hostname,
+ user=username,
+ private_key_path=private_key_path,
+ log=self.log,
+ )
+
+ params = None
+ try:
+ params = provisioner.provision_machine()
+ except Exception as ex:
+ self.log.debug("caught exception from provision_machine: {}".format(ex))
+ return None
+
+ if params:
+ params.jobs = ["JobHostUnits"]
+
+ model = await self.get_model(model_name)
+
+ connection = model.connection()
+
+ # Submit the request.
+ self.log.debug("Adding machine to model")
+ client_facade = client.ClientFacade.from_connection(connection)
+ results = await client_facade.AddMachines(params=[params])
+ error = results.machines[0].error
+ if error:
+ raise ValueError("Error adding machine: %s" % error.message)
+
+ machine_id = results.machines[0].machine
+
+ # Need to run this after AddMachines has been called,
+ # as we need the machine_id
+ self.log.debug("Installing Juju agent")
+ await provisioner.install_agent(
+ connection, params.nonce, machine_id, self.api_proxy,
+ )
+ else:
+ self.log.debug("Missing API Proxy")
+ return machine_id
+
+ # async def remove_application(self, name):
+ # """Remove the application."""
+ # if not self.authenticated:
+ # await self.login()
+ #
+ # app = await self.get_application(name)
+ # if app:
+ # self.log.debug("JujuApi: Destroying application {}".format(
+ # name,
+ # ))
+ #
+ # await app.destroy()
+
+ async def remove_relation(self, a, b):
+ """
+ Remove a relation between two application endpoints
+
+ :param a An application endpoint
+ :param b An application endpoint
+ """
+ if not self.authenticated:
+ await self.login()
+
+ # m = await self.get_model()
+ # try:
+ # m.remove_relation(a, b)
+ # finally:
+ # await m.disconnect()
+
+ async def resolve_error(self, model_name, application=None):
+ """Resolve units in error state."""
+ if not self.authenticated:
+ await self.login()
+
+ model = await self.get_model(model_name)
+
+ app = await self.get_application(model, application)
+ if app:
+ self.log.debug(
+ "JujuApi: Resolving errors for application {}".format(application,)
+ )
+
+ for _ in app.units:
+ app.resolved(retry=True)
+
+ async def run_action(self, model_name, application, action_name, **params):
+ """Execute an action and return an Action object."""
+ if not self.authenticated:
+ await self.login()
+ result = {"status": "", "action": {"tag": None, "results": None}}
+
+ model = await self.get_model(model_name)
+
+ app = await self.get_application(model, application)
+ if app:
+ # We currently only have one unit per application
+ # so use the first unit available.
+ unit = app.units[0]
+
+ self.log.debug(
+ "JujuApi: Running Action {} against Application {}".format(
+ action_name, application,
+ )
+ )
+
+ action = await unit.run_action(action_name, **params)
+
+ # Wait for the action to complete
+ await action.wait()
+
+ result["status"] = action.status
+ result["action"]["tag"] = action.data["id"]
+ result["action"]["results"] = action.results
+
+ return result
+
+ async def set_config(self, model_name, application, config):
+ """Apply a configuration to the application."""
+ if not self.authenticated:
+ await self.login()
+
+ app = await self.get_application(model_name, application)
+ if app:
+ self.log.debug(
+ "JujuApi: Setting config for Application {}".format(application,)
+ )
+ await app.set_config(config)
+
+ # Verify the config is set
+ newconf = await app.get_config()
+ for key in config:
+ if config[key] != newconf[key]["value"]:
+ self.log.debug(
+ (
+ "JujuApi: Config not set! Key {} Value {} doesn't match {}"
+ ).format(key, config[key], newconf[key])
+ )
+
+ # async def set_parameter(self, parameter, value, application=None):
+ # """Set a config parameter for a service."""
+ # if not self.authenticated:
+ # await self.login()
+ #
+ # self.log.debug("JujuApi: Setting {}={} for Application {}".format(
+ # parameter,
+ # value,
+ # application,
+ # ))
+ # return await self.apply_config(
+ # {parameter: value},
+ # application=application,
+ # )
+
+ async def wait_for_application(self, model_name, application_name, timeout=300):
+ """Wait for an application to become active."""
+ if not self.authenticated:
+ await self.login()
+
+ model = await self.get_model(model_name)
+
+ app = await self.get_application(model, application_name)
+ self.log.debug("Application: {}".format(app))
+ if app:
+ self.log.debug(
+ "JujuApi: Waiting {} seconds for Application {}".format(
+ timeout, application_name,
+ )
+ )
+
+ await model.block_until(
+ lambda: all(
+ unit.agent_status == "idle"
+ and unit.workload_status in ["active", "unknown"]
+ for unit in app.units
+ ),
+ timeout=timeout,
+ )
git+https://osm.etsi.org/gerrit/osm/common.git#egg=osm-common
juju==2.8.2
+paramiko
pyasn1>=0.4.4
kubernetes==10.0.1
exclude=["*.tests", "*.tests.*", "tests.*", "tests"]),
install_requires=[
'juju==2.8.2',
+ 'paramiko',
'pyasn1>=0.4.4',
'kubernetes==10.0.1'
],