From: David Garcia Date: Wed, 13 May 2020 10:18:38 +0000 (+0200) Subject: Initial refactor of N2VC X-Git-Tag: release-v8.0-start~2 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=refs%2Fchanges%2F09%2F8909%2F46;p=osm%2FN2VC.git Initial refactor of N2VC - Add libjuju.py to encapsulate all code related with the communication using libjuju - Replace juju_observer with juju_watcher - Add utils with some utilities like EntityType, FinalStatus, and JujuStatusToOSM Change-Id: I5d46ad65d6ed37b49bf2504209bfdd46d1b5a7e4 Signed-off-by: David Garcia --- diff --git a/n2vc/exceptions.py b/n2vc/exceptions.py index 09f3573..061cd7a 100644 --- a/n2vc/exceptions.py +++ b/n2vc/exceptions.py @@ -17,10 +17,26 @@ class JujuCharmNotFound(Exception): """The Charm can't be found or is not readable.""" +class JujuControllerFailedConnecting(Exception): + """Failed connecting to juju controller.""" + + +class JujuModelAlreadyExists(Exception): + """The model already exists.""" + + class JujuApplicationExists(Exception): """The Application already exists.""" +class JujuApplicationNotFound(Exception): + """The Application cannot be found.""" + + +class JujuMachineNotFound(Exception): + """The machine cannot be found.""" + + class N2VCPrimitiveExecutionFailed(Exception): """Something failed while attempting to execute a primitive.""" @@ -157,3 +173,7 @@ class K8sException(Exception): def __repr__(self): return self._message + + +class EntityInvalidException(Exception): + """Entity is not valid, the type does not match any EntityType.""" diff --git a/n2vc/juju_watcher.py b/n2vc/juju_watcher.py new file mode 100644 index 0000000..815abf9 --- /dev/null +++ b/n2vc/juju_watcher.py @@ -0,0 +1,209 @@ +# Copyright 2020 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import time +from juju.client import client +from n2vc.utils import FinalStatus, EntityType +from n2vc.exceptions import EntityInvalidException +from n2vc.n2vc_conn import N2VCConnector +from juju.model import ModelEntity, Model +from juju.client.overrides import Delta + +import logging + +logger = logging.getLogger("__main__") + + +class JujuModelWatcher: + @staticmethod + async def wait_for( + model, + entity: ModelEntity, + progress_timeout: float = 3600, + total_timeout: float = 3600, + db_dict: dict = None, + n2vc: N2VCConnector = None, + ): + """ + Wait for entity to reach its final state. + + :param: model: Model to observe + :param: entity: Entity object + :param: progress_timeout: Maximum time between two updates in the model + :param: total_timeout: Timeout for the entity to be active + :param: db_dict: Dictionary with data of the DB to write the updates + :param: n2vc: N2VC Connector objector + + :raises: asyncio.TimeoutError when timeout reaches + """ + + if progress_timeout is None: + progress_timeout = 3600.0 + if total_timeout is None: + total_timeout = 3600.0 + + entity_type = EntityType.get_entity(type(entity)) + if entity_type not in FinalStatus: + raise EntityInvalidException("Entity type not found") + + # Get final states + final_states = FinalStatus[entity_type].status + field_to_check = FinalStatus[entity_type].field + + # Coroutine to wait until the entity reaches the final state + wait_for_entity = asyncio.ensure_future( + asyncio.wait_for( + model.block_until( + lambda: entity.__getattribute__(field_to_check) in final_states + ), + timeout=total_timeout, + ) + ) + + # Coroutine to watch the model for changes (and write them to DB) + watcher = asyncio.ensure_future( + JujuModelWatcher.model_watcher( + model, + entity_id=entity.entity_id, + entity_type=entity_type, + timeout=progress_timeout, + db_dict=db_dict, + n2vc=n2vc, + ) + ) + + tasks = [wait_for_entity, watcher] + try: + # Execute tasks, and stop when the first is finished + # The watcher task won't never finish (unless it timeouts) + await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) + except Exception as e: + raise e + finally: + # Cancel tasks + for task in tasks: + task.cancel() + + @staticmethod + async def model_watcher( + model: Model, + entity_id: str, + entity_type: EntityType, + timeout: float, + db_dict: dict = None, + n2vc: N2VCConnector = None, + ): + """ + Observes the changes related to an specific entity in a model + + :param: model: Model to observe + :param: entity_id: ID of the entity to be observed + :param: entity_type: EntityType (p.e. .APPLICATION, .MACHINE, and .ACTION) + :param: timeout: Maximum time between two updates in the model + :param: db_dict: Dictionary with data of the DB to write the updates + :param: n2vc: N2VC Connector objector + + :raises: asyncio.TimeoutError when timeout reaches + """ + + allwatcher = client.AllWatcherFacade.from_connection(model.connection()) + + # Genenerate array with entity types to listen + entity_types = ( + [entity_type, EntityType.UNIT] + if entity_type == EntityType.APPLICATION # TODO: Add .ACTION too + else [entity_type] + ) + + # Get time when it should timeout + timeout_end = time.time() + timeout + + while True: + change = await allwatcher.Next() + for delta in change.deltas: + write = False + delta_entity = None + + # Get delta EntityType + delta_entity = EntityType.get_entity_from_delta(delta.entity) + + if delta_entity in entity_types: + # Get entity id + if entity_type == EntityType.APPLICATION: + id = ( + delta.data["application"] + if delta_entity == EntityType.UNIT + else delta.data["name"] + ) + else: + id = delta.data["id"] + + # Write if the entity id match + write = True if id == entity_id else False + + # Update timeout + timeout_end = time.time() + timeout + (status, status_message, vca_status) = JujuModelWatcher.get_status( + delta, entity_type=delta_entity + ) + + if write and n2vc is not None and db_dict: + # Write status to DB + status = n2vc.osm_status(delta_entity, status) + await n2vc.write_app_status_to_db( + db_dict=db_dict, + status=status, + detailed_status=status_message, + vca_status=vca_status, + entity_type=delta_entity.value.__name__.lower(), + ) + # Check if timeout + if time.time() > timeout_end: + raise asyncio.TimeoutError() + + @staticmethod + def get_status(delta: Delta, entity_type: EntityType) -> (str, str, str): + """ + Get status from delta + + :param: delta: Delta generated by the allwatcher + :param: entity_type: EntityType (p.e. .APPLICATION, .MACHINE, and .ACTION) + + :return (status, message, vca_status) + """ + if entity_type == EntityType.MACHINE: + return ( + delta.data["agent-status"]["current"], + delta.data["instance-status"]["message"], + delta.data["instance-status"]["current"], + ) + elif entity_type == EntityType.ACTION: + return ( + delta.data["status"], + delta.data["status"], + delta.data["status"], + ) + elif entity_type == EntityType.APPLICATION: + return ( + delta.data["status"]["current"], + delta.data["status"]["message"], + delta.data["status"]["current"], + ) + elif entity_type == EntityType.UNIT: + return ( + delta.data["workload-status"]["current"], + delta.data["workload-status"]["message"], + delta.data["workload-status"]["current"], + ) diff --git a/n2vc/libjuju.py b/n2vc/libjuju.py new file mode 100644 index 0000000..9945c91 --- /dev/null +++ b/n2vc/libjuju.py @@ -0,0 +1,806 @@ +# Copyright 2020 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import logging +from juju.controller import Controller +from juju.client.connector import NoConnectionException +from juju.client import client +import time + +from juju.errors import JujuAPIError +from juju.model import Model +from juju.machine import Machine +from juju.application import Application +from juju.client._definitions import FullStatus +from n2vc.juju_watcher import JujuModelWatcher +from n2vc.provisioner import AsyncSSHProvisioner +from n2vc.n2vc_conn import N2VCConnector +from n2vc.exceptions import ( + JujuMachineNotFound, + JujuApplicationNotFound, + JujuModelAlreadyExists, + JujuControllerFailedConnecting, + JujuApplicationExists, +) + + +class Libjuju: + def __init__( + self, + endpoint: str, + api_proxy: str, + username: str, + password: str, + cacert: str, + loop: asyncio.AbstractEventLoop = None, + log: logging.Logger = None, + db: dict = None, + n2vc: N2VCConnector = None, + apt_mirror: str = None, + enable_os_upgrade: bool = True, + ): + """ + Constructor + + :param: endpoint: Endpoint of the juju controller (host:port) + :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs + :param: username: Juju username + :param: password: Juju password + :param: cacert: Juju CA Certificate + :param: loop: Asyncio loop + :param: log: Logger + :param: db: DB object + :param: n2vc: N2VC object + :param: apt_mirror: APT Mirror + :param: enable_os_upgrade: Enable OS Upgrade + """ + + self.endpoints = [endpoint] # TODO: Store and get endpoints from DB + self.api_proxy = api_proxy + self.username = username + self.password = password + self.cacert = cacert + self.loop = loop or asyncio.get_event_loop() + self.log = log or logging.getLogger("Libjuju") + self.db = db + self.n2vc = n2vc + + # Generate config for models + self.model_config = {} + if apt_mirror: + self.model_config["apt-mirror"] = apt_mirror + self.model_config["enable-os-refresh-update"] = enable_os_upgrade + self.model_config["enable-os-upgrade"] = enable_os_upgrade + + self.reconnecting = asyncio.Lock(loop=self.loop) + self.creating_model = asyncio.Lock(loop=self.loop) + + self.models = set() + self.controller = Controller(loop=self.loop) + + self.loop.run_until_complete(self.connect()) + + async def connect(self): + """Connect to the controller""" + + self.log.debug("Connecting from controller") + await self.controller.connect( + endpoint=self.endpoints, + username=self.username, + password=self.password, + cacert=self.cacert, + ) + e = self.controller.connection().endpoint + self.log.info("Connected to controller: {}".format(e)) + + async def disconnect(self): + """Disconnect from controller""" + + self.log.debug("Disconnecting from controller") + await self.controller.disconnect() + self.log.info("Disconnected from controller") + + def controller_connected(self) -> bool: + """Check if the controller connection is open + + :return: bool: True if connected, False if not connected + """ + + is_connected = False + try: + is_connected = self.controller.connection().is_open + except NoConnectionException: + self.log.warning("VCA not connected") + return is_connected + + async def disconnect_model(self, model: Model): + """ + Disconnect model + + :param: model: Model that will be disconnected + """ + try: + await model.disconnect() + except Exception: + pass + + async def _reconnect( + self, + retry: bool = False, + timeout: int = 5, + time_between_retries: int = 3, + maximum_retries: int = 0, + ): + """ + Reconnect to the controller + + :param: retry: Set it to True to retry if the connection fails + :param: time_between_retries: Time in seconds between retries + :param: maximum_retries Maximum retries. If not set, it will retry forever + + :raises: Exception if cannot connect to the controller + """ + + if self.reconnecting.locked(): + # Return if another function is trying to reconnect + return + async with self.reconnecting: + attempt = 0 + while True: + try: + await asyncio.wait_for(self.connect(), timeout=timeout) + break + except asyncio.TimeoutError: + self.log.error("Error reconnecting to controller: Timeout") + except Exception as e: + self.log.error("Error reconnecting to controller: {}".format(e)) + + attempt += 1 + maximum_retries_reached = attempt == maximum_retries + + if not retry or maximum_retries_reached: + raise JujuControllerFailedConnecting("Controller is not connected") + else: + await asyncio.sleep(time_between_retries) + + async def add_model(self, model_name: str, cloud_name: str): + """ + Create model + + :param: model_name: Model name + :param: cloud_name: Cloud name + """ + + # Reconnect to the controller if not connected + if not self.controller_connected(): + await self._reconnect() + + # Raise exception if model already exists + if await self.model_exists(model_name): + raise JujuModelAlreadyExists("Model {} already exists.".format(model_name)) + + # Block until other workers have finished model creation + while self.creating_model.locked(): + await asyncio.sleep(0.1) + + # If the model exists, return it from the controller + if model_name in self.models: + return await self.get_model(model_name) + + # Create the model + self.log.debug("Creating model {}".format(model_name)) + async with self.creating_model: + model = await self.controller.add_model( + model_name, + config=self.model_config, + cloud_name=cloud_name, + credential_name=cloud_name, + ) + await self.disconnect_model(model) + self.models.add(model_name) + + async def get_model(self, model_name: str) -> Model: + """ + Get model from controller + + :param: model_name: Model name + + :return: Model: The created Juju model object + """ + + # Check if controller is connected + if not self.controller_connected(): + await self._reconnect() + return await self.controller.get_model(model_name) + + async def model_exists(self, model_name: str) -> bool: + """ + Check if model exists + + :param: model_name: Model name + + :return bool + """ + + # Check if controller is connected + if not self.controller_connected(): + await self._reconnect() + + return model_name in await self.controller.list_models() + + async def get_model_status(self, model_name: str) -> FullStatus: + """ + Get model status + + :param: model_name: Model name + + :return: Full status object + """ + model = await self.get_model(model_name) + status = await model.get_status() + await self.disconnect_model(model) + return status + + async def create_machine( + self, + model_name: str, + machine_id: str = None, + db_dict: dict = None, + progress_timeout: float = None, + total_timeout: float = None, + series: str = "xenial", + ) -> (Machine, bool): + """ + Create machine + + :param: model_name: Model name + :param: machine_id: Machine id + :param: db_dict: Dictionary with data of the DB to write the updates + :param: progress_timeout: Maximum time between two updates in the model + :param: total_timeout: Timeout for the entity to be active + + :return: (juju.machine.Machine, bool): Machine object and a boolean saying + if the machine is new or it already existed + """ + new = False + machine = None + + self.log.debug( + "Creating machine (id={}) in model: {}".format(machine_id, model_name) + ) + + # Get model + model = await self.get_model(model_name) + try: + if machine_id is not None: + self.log.debug( + "Searching machine (id={}) in model {}".format( + machine_id, model_name + ) + ) + + # Get machines from model and get the machine with machine_id if exists + machines = await model.get_machines() + if machine_id in machines: + self.log.debug( + "Machine (id={}) found in model {}".format( + machine_id, model_name + ) + ) + machine = model.machines[machine_id] + else: + raise JujuMachineNotFound("Machine {} not found".format(machine_id)) + + if machine is None: + self.log.debug("Creating a new machine in model {}".format(model_name)) + + # Create machine + machine = await model.add_machine( + spec=None, constraints=None, disks=None, series=series + ) + new = True + + # Wait until the machine is ready + await JujuModelWatcher.wait_for( + model=model, + entity=machine, + progress_timeout=progress_timeout, + total_timeout=total_timeout, + db_dict=db_dict, + n2vc=self.n2vc, + ) + except Exception as e: + raise e + finally: + await self.disconnect_model(model) + + self.log.debug("Machine ready at {}".format(machine.dns_name)) + return machine, new + + async def provision_machine( + self, + model_name: str, + hostname: str, + username: str, + private_key_path: str, + db_dict: dict = None, + progress_timeout: float = None, + total_timeout: float = None, + ) -> str: + """ + Manually provisioning of a machine + + :param: model_name: Model name + :param: hostname: IP to access the machine + :param: username: Username to login to the machine + :param: private_key_path: Local path for the private key + :param: db_dict: Dictionary with data of the DB to write the updates + :param: progress_timeout: Maximum time between two updates in the model + :param: total_timeout: Timeout for the entity to be active + + :return: (Entity): Machine id + """ + self.log.debug( + "Provisioning machine. model: {}, hostname: {}, username: {}".format( + model_name, hostname, username + ) + ) + + # Get model + model = await self.get_model(model_name) + + try: + # Get provisioner + provisioner = AsyncSSHProvisioner( + host=hostname, + user=username, + private_key_path=private_key_path, + log=self.log, + ) + + # Provision machine + params = await provisioner.provision_machine() + + params.jobs = ["JobHostUnits"] + + self.log.debug("Adding machine to model") + connection = model.connection() + client_facade = client.ClientFacade.from_connection(connection) + + results = await client_facade.AddMachines(params=[params]) + error = results.machines[0].error + + if error: + msg = "Error adding machine: {}".format(error.message) + self.log.error(msg=msg) + raise ValueError(msg) + + machine_id = results.machines[0].machine + + self.log.debug("Installing Juju agent into machine {}".format(machine_id)) + asyncio.ensure_future( + provisioner.install_agent( + connection=connection, + nonce=params.nonce, + machine_id=machine_id, + api=self.api_proxy, + ) + ) + + machine = None + for _ in range(10): + machine_list = await model.get_machines() + if machine_id in machine_list: + self.log.debug("Machine {} found in model!".format(machine_id)) + machine = model.machines.get(machine_id) + break + await asyncio.sleep(2) + + if machine is None: + msg = "Machine {} not found in model".format(machine_id) + self.log.error(msg=msg) + raise JujuMachineNotFound(msg) + + await JujuModelWatcher.wait_for( + model=model, + entity=machine, + progress_timeout=progress_timeout, + total_timeout=total_timeout, + db_dict=db_dict, + n2vc=self.n2vc, + ) + except Exception as e: + raise e + finally: + await self.disconnect_model(model) + + self.log.debug("Machine provisioned {}".format(machine_id)) + + return machine_id + + async def deploy_charm( + self, + application_name: str, + path: str, + model_name: str, + machine_id: str, + db_dict: dict = None, + progress_timeout: float = None, + total_timeout: float = None, + config: dict = None, + series: str = None, + ): + """Deploy charm + + :param: application_name: Application name + :param: path: Local path to the charm + :param: model_name: Model name + :param: machine_id ID of the machine + :param: db_dict: Dictionary with data of the DB to write the updates + :param: progress_timeout: Maximum time between two updates in the model + :param: total_timeout: Timeout for the entity to be active + :param: config: Config for the charm + :param: series: Series of the charm + + :return: (juju.application.Application): Juju application + """ + + # Get model + model = await self.get_model(model_name) + + try: + application = None + if application_name not in model.applications: + self.log.debug( + "Deploying charm {} to machine {} in model ~{}".format( + application_name, machine_id, model_name + ) + ) + self.log.debug("charm: {}".format(path)) + if machine_id is not None: + if machine_id not in model.machines: + msg = "Machine {} not found in model".format(machine_id) + self.log.error(msg=msg) + raise JujuMachineNotFound(msg) + machine = model.machines[machine_id] + series = machine.series + + application = await model.deploy( + entity_url=path, + application_name=application_name, + channel="stable", + num_units=1, + series=series, + to=machine_id, + config=config, + ) + + await JujuModelWatcher.wait_for( + model=model, + entity=application, + progress_timeout=progress_timeout, + total_timeout=total_timeout, + db_dict=db_dict, + n2vc=self.n2vc, + ) + else: + raise JujuApplicationExists("Application {} exists".format(application_name)) + + except Exception as e: + raise e + finally: + await self.disconnect_model(model) + + self.log.debug("application deployed") + + return application + + async def _get_application( + self, model: Model, application_name: str + ) -> Application: + """Get application + + :param: model: Model object + :param: application_name: Application name + + :return: juju.application.Application (or None if it doesn't exist) + """ + if model.applications and application_name in model.applications: + return model.applications[application_name] + + async def execute_action( + self, + application_name: str, + model_name: str, + action_name: str, + db_dict: dict = None, + progress_timeout: float = None, + total_timeout: float = None, + **kwargs + ): + """Execute action + + :param: application_name: Application name + :param: model_name: Model name + :param: cloud_name: Cloud name + :param: action_name: Name of the action + :param: db_dict: Dictionary with data of the DB to write the updates + :param: progress_timeout: Maximum time between two updates in the model + :param: total_timeout: Timeout for the entity to be active + + :return: (str, str): (output and status) + """ + # Get model and observer + model = await self.get_model(model_name) + + try: + # Get application + application = await self._get_application( + model, application_name=application_name, + ) + if application is None: + raise JujuApplicationNotFound("Cannot execute action") + + # Get unit + unit = None + for u in application.units: + if await u.is_leader_from_status(): + unit = u + if unit is None: + raise Exception("Cannot execute action: leader unit not found") + + actions = await application.get_actions() + + if action_name not in actions: + raise Exception( + "Action {} not in available actions".format(action_name) + ) + + self.log.debug( + "Executing action {} using params {}".format(action_name, kwargs) + ) + action = await unit.run_action(action_name, **kwargs) + + # Register action with observer and wait for it to finish + await JujuModelWatcher.wait_for( + model=model, + entity=action, + progress_timeout=progress_timeout, + total_timeout=total_timeout, + db_dict=db_dict, + n2vc=self.n2vc, + ) + output = await model.get_action_output(action_uuid=action.entity_id) + status = await model.get_action_status(uuid_or_prefix=action.entity_id) + status = ( + status[action.entity_id] if action.entity_id in status else "failed" + ) + + self.log.debug("action completed with status: {}".format(action.status)) + except Exception as e: + raise e + finally: + await self.disconnect_model(model) + + return output, status + + async def get_actions(self, application_name: str, model_name: str) -> dict: + """Get list of actions + + :param: application_name: Application name + :param: model_name: Model name + + :return: Dict with this format + { + "action_name": "Description of the action", + ... + } + """ + + # Get model + model = await self.get_model(model_name) + + # Get application + application = await self._get_application( + model, application_name=application_name, + ) + + # Get list of actions + actions = await application.get_actions() + + # Disconnect from model + await self.disconnect_model(model) + + return actions + + async def add_relation( + self, + model_name: str, + application_name_1: str, + application_name_2: str, + relation_1: str, + relation_2: str, + ): + """Add relation + + :param: model_name: Model name + :param: application_name_1 First application name + :param: application_name_2: Second application name + :param: relation_1: First relation name + :param: relation_2: Second relation name + """ + + # Get model + model = await self.get_model(model_name) + + # Build relation strings + r1 = "{}:{}".format(application_name_1, relation_1) + r2 = "{}:{}".format(application_name_2, relation_2) + + # Add relation + self.log.debug("Adding relation: {} -> {}".format(r1, r2)) + try: + await model.add_relation(relation1=r1, relation2=r2) + except JujuAPIError as e: + if "not found" in e.message: + self.log.warning("Relation not found: {}".format(e.message)) + return + if "already exists" in e.message: + self.log.warning("Relation already exists: {}".format(e.message)) + return + # another exception, raise it + raise e + finally: + await self.disconnect_model(model) + + async def destroy_model( + self, model_name: str, total_timeout: float, + ): + """ + Destroy model + + :param: model_name: Model name + :param: total_timeout: Timeout + """ + model = await self.get_model(model_name) + uuid = model.info.uuid + + # Destroy applications + for application_name in model.applications: + try: + await self.destroy_application( + model, application_name=application_name, + ) + except Exception as e: + self.log.error( + "Error destroying application {} in model {}: {}".format( + application_name, model_name, e + ) + ) + + # Destroy machines + machines = await model.get_machines() + for machine_id in machines: + try: + await self.destroy_machine( + model, machine_id=machine_id, total_timeout=total_timeout, + ) + except asyncio.CancelledError: + raise + except Exception: + pass + + # Disconnect model + await self.disconnect_model(model) + + # Destroy model + self.models.remove(model_name) + await self.controller.destroy_model(uuid) + + # Wait until model is destroyed + self.log.debug("Waiting for model {} to be destroyed...".format(model_name)) + last_exception = "" + + if total_timeout is None: + total_timeout = 3600 + end = time.time() + total_timeout + while time.time() < end: + try: + models = await self.controller.list_models() + if model_name not in models: + self.log.debug( + "The model {} ({}) was destroyed".format(model_name, uuid) + ) + return + except asyncio.CancelledError: + raise + except Exception as e: + last_exception = e + await asyncio.sleep(5) + raise Exception( + "Timeout waiting for model {} to be destroyed {}".format( + model_name, last_exception + ) + ) + + async def destroy_application(self, model: Model, application_name: str): + """ + Destroy application + + :param: model: Model object + :param: application_name: Application name + """ + self.log.debug( + "Destroying application {} in model {}".format( + application_name, model.info.name + ) + ) + application = model.applications.get(application_name) + if application: + await application.destroy() + else: + self.log.warning("Application not found: {}".format(application_name)) + + async def destroy_machine( + self, model: Model, machine_id: str, total_timeout: float = 3600 + ): + """ + Destroy machine + + :param: model: Model object + :param: machine_id: Machine id + :param: total_timeout: Timeout in seconds + """ + machines = await model.get_machines() + if machine_id in machines: + machine = model.machines[machine_id] + # TODO: change this by machine.is_manual when this is upstreamed: + # https://github.com/juju/python-libjuju/pull/396 + if "instance-id" in machine.safe_data and machine.safe_data[ + "instance-id" + ].startswith("manual:"): + await machine.destroy(force=True) + + # max timeout + end = time.time() + total_timeout + + # wait for machine removal + machines = await model.get_machines() + while machine_id in machines and time.time() < end: + self.log.debug( + "Waiting for machine {} is destroyed".format(machine_id) + ) + await asyncio.sleep(0.5) + machines = await model.get_machines() + self.log.debug("Machine destroyed: {}".format(machine_id)) + else: + self.log.debug("Machine not found: {}".format(machine_id)) + + async def configure_application( + self, model_name: str, application_name: str, config: dict = None + ): + """Configure application + + :param: model_name: Model name + :param: application_name: Application name + :param: config: Config to apply to the charm + """ + if config: + model = await self.get_model(model_name) + application = await self._get_application( + model, application_name=application_name, + ) + await application.set_config(config) + await self.disconnect_model(model) diff --git a/n2vc/n2vc_conn.py b/n2vc/n2vc_conn.py index c0bb558..3fc7c57 100644 --- a/n2vc/n2vc_conn.py +++ b/n2vc/n2vc_conn.py @@ -23,7 +23,6 @@ import abc import asyncio -from enum import Enum from http import HTTPStatus import os import shlex @@ -35,14 +34,7 @@ from osm_common.dbmongo import DbException import yaml from n2vc.loggable import Loggable - - -class N2VCDeploymentStatus(Enum): - PENDING = "pending" - RUNNING = "running" - COMPLETED = "completed" - FAILED = "failed" - UNKNOWN = "unknown" +from n2vc.utils import EntityType, JujuStatusToOSM, N2VCDeploymentStatus class N2VCConnector(abc.ABC, Loggable): @@ -468,7 +460,14 @@ class N2VCConnector(abc.ABC, Loggable): else: self.log.info("Exception writing status to database: {}".format(e)) + def osm_status(self, entity_type: EntityType, status: str) -> N2VCDeploymentStatus: + if status not in JujuStatusToOSM[entity_type]: + self.log.warning("Status {} not found in JujuStatusToOSM.") + return N2VCDeploymentStatus.UNKNOWN + return JujuStatusToOSM[entity_type][status] + +# DEPRECATED def juju_status_2_osm_status(statustype: str, status: str) -> N2VCDeploymentStatus: if statustype == "application" or statustype == "unit": if status in ["waiting", "maintenance"]: diff --git a/n2vc/n2vc_juju_conn.py b/n2vc/n2vc_juju_conn.py index 0ac8b08..4545af2 100644 --- a/n2vc/n2vc_juju_conn.py +++ b/n2vc/n2vc_juju_conn.py @@ -48,14 +48,15 @@ from n2vc.juju_observer import JujuModelObserver from n2vc.n2vc_conn import N2VCConnector from n2vc.n2vc_conn import obj_to_dict, obj_to_yaml from n2vc.provisioner import AsyncSSHProvisioner +from n2vc.libjuju import Libjuju class N2VCJujuConnector(N2VCConnectorlocalhost", "microk8s"] @@ -198,6 +199,19 @@ class N2VCJujuConnector(N2VCConnector): False # it will be True when juju connection be stablished ) self._creating_model = False # True during model creation + self.libjuju = Libjuju( + endpoint=self.url, + api_proxy=self.api_proxy, + enable_os_upgrade=self.enable_os_upgrade, + apt_mirror=self.apt_mirror, + username=self.username, + password=self.secret, + cacert=self.ca_cert, + loop=self.loop, + log=self.log, + db=self.db, + n2vc=self, + ) # create juju pub key file in lcm container at # ./local/share/juju/ssh/juju_id_rsa.pub @@ -209,9 +223,6 @@ class N2VCJujuConnector(N2VCConnector): # self.log.info('Getting NS status. namespace: {}'.format(namespace)) - if not self._authenticated: - await self._juju_login() - _nsi_id, ns_id, _vnf_id, _vdu_id, _vdu_count = self._get_namespace_components( namespace=namespace ) @@ -222,10 +233,7 @@ class N2VCJujuConnector(N2VCConnector): self.log.error(msg) raise N2VCBadArgumentsException(msg, ["namespace"]) - # get juju model (create model if needed) - model = await self._juju_get_model(model_name=model_name) - - status = await model.get_status() + status = await self.libjuju.get_model_status(model_name) if yaml_format: return obj_to_yaml(status) @@ -247,9 +255,6 @@ class N2VCJujuConnector(N2VCConnector): ) ) - if not self._authenticated: - await self._juju_login() - machine_id = None if reuse_ee_id: model_name, application_name, machine_id = self._get_ee_id_components( @@ -276,30 +281,36 @@ class N2VCJujuConnector(N2VCConnector): # create or reuse a new juju machine try: - machine = await self._juju_create_machine( + if not await self.libjuju.model_exists(model_name): + await self.libjuju.add_model(model_name, cloud_name=self.cloud) + machine, new = await self.libjuju.create_machine( model_name=model_name, - application_name=application_name, machine_id=machine_id, db_dict=db_dict, progress_timeout=progress_timeout, total_timeout=total_timeout, ) + # id for the execution environment + ee_id = N2VCJujuConnector._build_ee_id( + model_name=model_name, + application_name=application_name, + machine_id=str(machine.entity_id), + ) + self.log.debug("ee_id: {}".format(ee_id)) + + if new: + # write ee_id in database + self._write_ee_id_db(db_dict=db_dict, ee_id=ee_id) + except Exception as e: message = "Error creating machine on juju: {}".format(e) self.log.error(message) raise N2VCException(message=message) - # id for the execution environment - ee_id = N2VCJujuConnector._build_ee_id( - model_name=model_name, - application_name=application_name, - machine_id=str(machine.entity_id), - ) - self.log.debug("ee_id: {}".format(ee_id)) - # new machine credentials - credentials = dict() - credentials["hostname"] = machine.dns_name + credentials = { + "hostname": machine.dns_name, + } self.log.info( "Execution environment created. ee_id: {}, credentials: {}".format( @@ -318,9 +329,6 @@ class N2VCJujuConnector(N2VCConnector): total_timeout: float = None, ) -> str: - if not self._authenticated: - await self._juju_login() - self.log.info( "Registering execution environment. namespace={}, credentials={}".format( namespace, credentials @@ -360,7 +368,13 @@ class N2VCJujuConnector(N2VCConnector): # register machine on juju try: - machine_id = await self._juju_provision_machine( + if not self.api_proxy: + msg = "Cannot provision machine: api_proxy is not defined" + self.log.error(msg=msg) + raise N2VCException(message=msg) + if not await self.libjuju.model_exists(model_name): + await self.libjuju.add_model(model_name, cloud_name=self.cloud) + machine_id = await self.libjuju.provision_machine( model_name=model_name, hostname=hostname, username=username, @@ -405,9 +419,6 @@ class N2VCJujuConnector(N2VCConnector): ).format(ee_id, artifact_path, db_dict) ) - if not self._authenticated: - await self._juju_login() - # check arguments if ee_id is None or len(ee_id) == 0: raise N2VCBadArgumentsException( @@ -456,10 +467,10 @@ class N2VCJujuConnector(N2VCConnector): full_path = self.fs.path + "/" + artifact_path try: - await self._juju_deploy_charm( + await self.libjuju.deploy_charm( model_name=model_name, application_name=application_name, - charm_path=full_path, + path=full_path, machine_id=machine_id, db_dict=db_dict, progress_timeout=progress_timeout, @@ -487,9 +498,6 @@ class N2VCJujuConnector(N2VCConnector): ).format(ee_id, db_dict) ) - if not self._authenticated: - await self._juju_login() - # check arguments if ee_id is None or len(ee_id) == 0: raise N2VCBadArgumentsException( @@ -525,9 +533,11 @@ class N2VCJujuConnector(N2VCConnector): output = None + application_name = N2VCJujuConnector._format_app_name(application_name) + # execute action: generate-ssh-key try: - output, _status = await self._juju_execute_action( + output, _status = await self.libjuju.execute_action( model_name=model_name, application_name=application_name, action_name="generate-ssh-key", @@ -544,7 +554,7 @@ class N2VCJujuConnector(N2VCConnector): # execute action: get-ssh-public-key try: - output, _status = await self._juju_execute_action( + output, _status = await self.libjuju.execute_action( model_name=model_name, application_name=application_name, action_name="get-ssh-public-key", @@ -555,7 +565,7 @@ class N2VCJujuConnector(N2VCConnector): except Exception as e: msg = "Cannot execute action get-ssh-public-key: {}\n".format(e) self.log.info(msg) - raise N2VCException(msg) + raise N2VCExecutionException(e, primitive_name="get-ssh-public-key") # return public key if exists return output["pubkey"] if "pubkey" in output else output @@ -588,9 +598,6 @@ class N2VCJujuConnector(N2VCConnector): self.log.error(message) raise N2VCBadArgumentsException(message=message, bad_args=["endpoint_2"]) - if not self._authenticated: - await self._juju_login() - # get the model, the applications and the machines from the ee_id's model_1, app_1, _machine_1 = self._get_ee_id_components(ee_id_1) model_2, app_2, _machine_2 = self._get_ee_id_components(ee_id_2) @@ -605,7 +612,7 @@ class N2VCJujuConnector(N2VCConnector): # add juju relations between two applications try: - await self._juju_add_relation( + await self.libjuju.add_relation( model_name=model_1, application_name_1=app_1, application_name_2=app_2, @@ -620,16 +627,11 @@ class N2VCJujuConnector(N2VCConnector): raise N2VCException(message=message) async def remove_relation(self): - if not self._authenticated: - await self._juju_login() # TODO self.log.info("Method not implemented yet") raise MethodNotImplemented() async def deregister_execution_environments(self): - if not self._authenticated: - await self._juju_login() - # TODO self.log.info("Method not implemented yet") raise MethodNotImplemented() @@ -638,9 +640,6 @@ class N2VCJujuConnector(N2VCConnector): ): self.log.info("Deleting namespace={}".format(namespace)) - if not self._authenticated: - await self._juju_login() - # check arguments if namespace is None: raise N2VCBadArgumentsException( @@ -652,7 +651,9 @@ class N2VCJujuConnector(N2VCConnector): ) if ns_id is not None: try: - await self._juju_destroy_model( + if not await self.libjuju.model_exists(ns_id): + raise N2VCNotFound(message="Model {} does not exist".format(ns_id)) + await self.libjuju.destroy_model( model_name=ns_id, total_timeout=total_timeout ) except N2VCNotFound: @@ -673,9 +674,6 @@ class N2VCJujuConnector(N2VCConnector): ): self.log.info("Deleting execution environment ee_id={}".format(ee_id)) - if not self._authenticated: - await self._juju_login() - # check arguments if ee_id is None: raise N2VCBadArgumentsException( @@ -688,8 +686,8 @@ class N2VCJujuConnector(N2VCConnector): # destroy the application try: - await self._juju_destroy_application( - model_name=model_name, application_name=application_name + await self.libjuju.destroy_model( + model_name=model_name, total_timeout=total_timeout ) except Exception as e: raise N2VCException( @@ -728,9 +726,6 @@ class N2VCJujuConnector(N2VCConnector): ) ) - if not self._authenticated: - await self._juju_login() - # check arguments if ee_id is None or len(ee_id) == 0: raise N2VCBadArgumentsException( @@ -760,14 +755,56 @@ class N2VCJujuConnector(N2VCConnector): if primitive_name == "config": # Special case: config primitive try: - await self._juju_configure_application( + await self.libjuju.configure_application( model_name=model_name, application_name=application_name, config=params_dict, - db_dict=db_dict, - progress_timeout=progress_timeout, - total_timeout=total_timeout, ) + actions = await self.libjuju.get_actions( + application_name=application_name, model_name=model_name, + ) + self.log.debug( + "Application {} has these actions: {}".format( + application_name, actions + ) + ) + if "verify-ssh-credentials" in actions: + # execute verify-credentials + num_retries = 20 + retry_timeout = 15.0 + for _ in range(num_retries): + try: + self.log.debug("Executing action verify-ssh-credentials...") + output, ok = await self.libjuju.execute_action( + model_name=model_name, + application_name=application_name, + action_name="verify-ssh-credentials", + db_dict=db_dict, + progress_timeout=progress_timeout, + total_timeout=total_timeout, + ) + self.log.debug("Result: {}, output: {}".format(ok, output)) + break + except asyncio.CancelledError: + raise + except Exception as e: + self.log.debug( + "Error executing verify-ssh-credentials: {}. Retrying...".format( + e + ) + ) + await asyncio.sleep(retry_timeout) + else: + self.log.error( + "Error executing verify-ssh-credentials after {} retries. ".format( + num_retries + ) + ) + else: + msg = "Action verify-ssh-credentials does not exist in application {}".format( + application_name + ) + self.log.debug(msg=msg) except Exception as e: self.log.error("Error configuring juju application: {}".format(e)) raise N2VCExecutionException( @@ -779,7 +816,7 @@ class N2VCJujuConnector(N2VCConnector): return "CONFIG OK" else: try: - output, status = await self._juju_execute_action( + output, status = await self.libjuju.execute_action( model_name=model_name, application_name=application_name, action_name=primitive_name, @@ -805,12 +842,17 @@ class N2VCJujuConnector(N2VCConnector): async def disconnect(self): self.log.info("closing juju N2VC...") - await self._juju_logout() + try: + await self.libjuju.disconnect() + except Exception as e: + raise N2VCConnectionException( + message="Error disconnecting controller: {}".format(e), url=self.urldef _write_ee_id_db(self, db_dict: dict, ee_id: str): diff --git a/n2vc/utils.py b/n2vc/utils.py new file mode 100644 index 0000000..990575d --- /dev/null +++ b/n2vc/utils.py @@ -0,0 +1,105 @@ +# Copyright 2020 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from enum import Enum +from juju.machine import Machine +from juju.application import Application +from juju.action import Action +from juju.unit import Unit + + +class N2VCDeploymentStatus(Enum): + PENDING = "pending" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + UNKNOWN = "unknown" + + +class Dict(dict): + """ + Dict class that allows to access the keys like attributes + """ + + def __getattribute__(self, name): + if name in self: + return self[name] + + +class EntityType(Enum): + MACHINE = Machine + APPLICATION = Application + ACTION = Action + UNIT = Unit + + @classmethod + def has_value(cls, value): + return value in cls._value2member_map_ # pylint: disable=E1101 + + @classmethod + def get_entity(cls, value): + return ( + cls._value2member_map_[value] # pylint: disable=E1101 + if value in cls._value2member_map_ # pylint: disable=E1101 + else None # pylint: disable=E1101 + ) + + @classmethod + def get_entity_from_delta(cls, delta_entity: str): + """ + Get Value from delta entity + + :param: delta_entity: Possible values are "machine", "application", "unit", "action" + """ + for v in cls._value2member_map_: # pylint: disable=E1101 + if v.__name__.lower() == delta_entity: + return cls.get_entity(v) + + +FinalStatus = Dict( + { + EntityType.MACHINE: Dict({"field": "agent_status", "status": ["started"]}), + EntityType.APPLICATION: Dict( + {"field": "status", "status": ["active", "blocked"]} + ), + EntityType.ACTION: Dict( + {"field": "status", "status": ["completed", "failed", "cancelled"]} + ), + } +) + +JujuStatusToOSM = { + EntityType.MACHINE: { + "pending": N2VCDeploymentStatus.PENDING, + "started": N2VCDeploymentStatus.COMPLETED, + }, + EntityType.APPLICATION: { + "waiting": N2VCDeploymentStatus.RUNNING, + "maintenance": N2VCDeploymentStatus.RUNNING, + "blocked": N2VCDeploymentStatus.RUNNING, + "error": N2VCDeploymentStatus.FAILED, + "active": N2VCDeploymentStatus.COMPLETED, + }, + EntityType.ACTION: { + "running": N2VCDeploymentStatus.RUNNING, + "completed": N2VCDeploymentStatus.COMPLETED, + }, + EntityType.UNIT: { + "waiting": N2VCDeploymentStatus.RUNNING, + "maintenance": N2VCDeploymentStatus.RUNNING, + "blocked": N2VCDeploymentStatus.RUNNING, + "error": N2VCDeploymentStatus.FAILED, + "active": N2VCDeploymentStatus.COMPLETED, + }, +} diff --git a/test-requirements.txt b/test-requirements.txt index 45ed6dc..fe2f4df 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -17,3 +17,4 @@ mock requests-mock coverage==4.5.3 asynctest +juju==2.8.1 \ No newline at end of file diff --git a/tox.ini b/tox.ini index 490d03e..bc316e1 100644 --- a/tox.ini +++ b/tox.ini @@ -65,7 +65,7 @@ commands = [flake8] # W503 is invalid PEP-8 -max-line-length = 88 +max-line-length = 100 show-source = True ignore = W503,E203 exclude=.venv,.git,.tox,dist,doc,*lib/python*,*egg,build,devops_stages/*,.rst