- Add libjuju.py to encapsulate all code related with the communication using libjuju
- Replace juju_observer with juju_watcher
- Add utils with some utilities like EntityType, FinalStatus, and JujuStatusToOSM
Change-Id: I5d46ad65d6ed37b49bf2504209bfdd46d1b5a7e4
Signed-off-by: David Garcia <david.garcia@canonical.com>
"""The Charm can't be found or is not readable."""
+class JujuControllerFailedConnecting(Exception):
+ """Failed connecting to juju controller."""
+
+
+class JujuModelAlreadyExists(Exception):
+ """The model already exists."""
+
+
class JujuApplicationExists(Exception):
"""The Application already exists."""
+class JujuApplicationNotFound(Exception):
+ """The Application cannot be found."""
+
+
+class JujuMachineNotFound(Exception):
+ """The machine cannot be found."""
+
+
class N2VCPrimitiveExecutionFailed(Exception):
"""Something failed while attempting to execute a primitive."""
def __repr__(self):
return self._message
+
+
+class EntityInvalidException(Exception):
+ """Entity is not valid, the type does not match any EntityType."""
--- /dev/null
+# Copyright 2020 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+import time
+from juju.client import client
+from n2vc.utils import FinalStatus, EntityType
+from n2vc.exceptions import EntityInvalidException
+from n2vc.n2vc_conn import N2VCConnector
+from juju.model import ModelEntity, Model
+from juju.client.overrides import Delta
+
+import logging
+
+logger = logging.getLogger("__main__")
+
+
+class JujuModelWatcher:
+ @staticmethod
+ async def wait_for(
+ model,
+ entity: ModelEntity,
+ progress_timeout: float = 3600,
+ total_timeout: float = 3600,
+ db_dict: dict = None,
+ n2vc: N2VCConnector = None,
+ ):
+ """
+ Wait for entity to reach its final state.
+
+ :param: model: Model to observe
+ :param: entity: Entity object
+ :param: progress_timeout: Maximum time between two updates in the model
+ :param: total_timeout: Timeout for the entity to be active
+ :param: db_dict: Dictionary with data of the DB to write the updates
+ :param: n2vc: N2VC Connector objector
+
+ :raises: asyncio.TimeoutError when timeout reaches
+ """
+
+ if progress_timeout is None:
+ progress_timeout = 3600.0
+ if total_timeout is None:
+ total_timeout = 3600.0
+
+ entity_type = EntityType.get_entity(type(entity))
+ if entity_type not in FinalStatus:
+ raise EntityInvalidException("Entity type not found")
+
+ # Get final states
+ final_states = FinalStatus[entity_type].status
+ field_to_check = FinalStatus[entity_type].field
+
+ # Coroutine to wait until the entity reaches the final state
+ wait_for_entity = asyncio.ensure_future(
+ asyncio.wait_for(
+ model.block_until(
+ lambda: entity.__getattribute__(field_to_check) in final_states
+ ),
+ timeout=total_timeout,
+ )
+ )
+
+ # Coroutine to watch the model for changes (and write them to DB)
+ watcher = asyncio.ensure_future(
+ JujuModelWatcher.model_watcher(
+ model,
+ entity_id=entity.entity_id,
+ entity_type=entity_type,
+ timeout=progress_timeout,
+ db_dict=db_dict,
+ n2vc=n2vc,
+ )
+ )
+
+ tasks = [wait_for_entity, watcher]
+ try:
+ # Execute tasks, and stop when the first is finished
+ # The watcher task won't never finish (unless it timeouts)
+ await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
+ except Exception as e:
+ raise e
+ finally:
+ # Cancel tasks
+ for task in tasks:
+ task.cancel()
+
+ @staticmethod
+ async def model_watcher(
+ model: Model,
+ entity_id: str,
+ entity_type: EntityType,
+ timeout: float,
+ db_dict: dict = None,
+ n2vc: N2VCConnector = None,
+ ):
+ """
+ Observes the changes related to an specific entity in a model
+
+ :param: model: Model to observe
+ :param: entity_id: ID of the entity to be observed
+ :param: entity_type: EntityType (p.e. .APPLICATION, .MACHINE, and .ACTION)
+ :param: timeout: Maximum time between two updates in the model
+ :param: db_dict: Dictionary with data of the DB to write the updates
+ :param: n2vc: N2VC Connector objector
+
+ :raises: asyncio.TimeoutError when timeout reaches
+ """
+
+ allwatcher = client.AllWatcherFacade.from_connection(model.connection())
+
+ # Genenerate array with entity types to listen
+ entity_types = (
+ [entity_type, EntityType.UNIT]
+ if entity_type == EntityType.APPLICATION # TODO: Add .ACTION too
+ else [entity_type]
+ )
+
+ # Get time when it should timeout
+ timeout_end = time.time() + timeout
+
+ while True:
+ change = await allwatcher.Next()
+ for delta in change.deltas:
+ write = False
+ delta_entity = None
+
+ # Get delta EntityType
+ delta_entity = EntityType.get_entity_from_delta(delta.entity)
+
+ if delta_entity in entity_types:
+ # Get entity id
+ if entity_type == EntityType.APPLICATION:
+ id = (
+ delta.data["application"]
+ if delta_entity == EntityType.UNIT
+ else delta.data["name"]
+ )
+ else:
+ id = delta.data["id"]
+
+ # Write if the entity id match
+ write = True if id == entity_id else False
+
+ # Update timeout
+ timeout_end = time.time() + timeout
+ (status, status_message, vca_status) = JujuModelWatcher.get_status(
+ delta, entity_type=delta_entity
+ )
+
+ if write and n2vc is not None and db_dict:
+ # Write status to DB
+ status = n2vc.osm_status(delta_entity, status)
+ await n2vc.write_app_status_to_db(
+ db_dict=db_dict,
+ status=status,
+ detailed_status=status_message,
+ vca_status=vca_status,
+ entity_type=delta_entity.value.__name__.lower(),
+ )
+ # Check if timeout
+ if time.time() > timeout_end:
+ raise asyncio.TimeoutError()
+
+ @staticmethod
+ def get_status(delta: Delta, entity_type: EntityType) -> (str, str, str):
+ """
+ Get status from delta
+
+ :param: delta: Delta generated by the allwatcher
+ :param: entity_type: EntityType (p.e. .APPLICATION, .MACHINE, and .ACTION)
+
+ :return (status, message, vca_status)
+ """
+ if entity_type == EntityType.MACHINE:
+ return (
+ delta.data["agent-status"]["current"],
+ delta.data["instance-status"]["message"],
+ delta.data["instance-status"]["current"],
+ )
+ elif entity_type == EntityType.ACTION:
+ return (
+ delta.data["status"],
+ delta.data["status"],
+ delta.data["status"],
+ )
+ elif entity_type == EntityType.APPLICATION:
+ return (
+ delta.data["status"]["current"],
+ delta.data["status"]["message"],
+ delta.data["status"]["current"],
+ )
+ elif entity_type == EntityType.UNIT:
+ return (
+ delta.data["workload-status"]["current"],
+ delta.data["workload-status"]["message"],
+ delta.data["workload-status"]["current"],
+ )
--- /dev/null
+# Copyright 2020 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+import logging
+from juju.controller import Controller
+from juju.client.connector import NoConnectionException
+from juju.client import client
+import time
+
+from juju.errors import JujuAPIError
+from juju.model import Model
+from juju.machine import Machine
+from juju.application import Application
+from juju.client._definitions import FullStatus
+from n2vc.juju_watcher import JujuModelWatcher
+from n2vc.provisioner import AsyncSSHProvisioner
+from n2vc.n2vc_conn import N2VCConnector
+from n2vc.exceptions import (
+ JujuMachineNotFound,
+ JujuApplicationNotFound,
+ JujuModelAlreadyExists,
+ JujuControllerFailedConnecting,
+ JujuApplicationExists,
+)
+
+
+class Libjuju:
+ def __init__(
+ self,
+ endpoint: str,
+ api_proxy: str,
+ username: str,
+ password: str,
+ cacert: str,
+ loop: asyncio.AbstractEventLoop = None,
+ log: logging.Logger = None,
+ db: dict = None,
+ n2vc: N2VCConnector = None,
+ apt_mirror: str = None,
+ enable_os_upgrade: bool = True,
+ ):
+ """
+ Constructor
+
+ :param: endpoint: Endpoint of the juju controller (host:port)
+ :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
+ :param: username: Juju username
+ :param: password: Juju password
+ :param: cacert: Juju CA Certificate
+ :param: loop: Asyncio loop
+ :param: log: Logger
+ :param: db: DB object
+ :param: n2vc: N2VC object
+ :param: apt_mirror: APT Mirror
+ :param: enable_os_upgrade: Enable OS Upgrade
+ """
+
+ self.endpoints = [endpoint] # TODO: Store and get endpoints from DB
+ self.api_proxy = api_proxy
+ self.username = username
+ self.password = password
+ self.cacert = cacert
+ self.loop = loop or asyncio.get_event_loop()
+ self.log = log or logging.getLogger("Libjuju")
+ self.db = db
+ self.n2vc = n2vc
+
+ # Generate config for models
+ self.model_config = {}
+ if apt_mirror:
+ self.model_config["apt-mirror"] = apt_mirror
+ self.model_config["enable-os-refresh-update"] = enable_os_upgrade
+ self.model_config["enable-os-upgrade"] = enable_os_upgrade
+
+ self.reconnecting = asyncio.Lock(loop=self.loop)
+ self.creating_model = asyncio.Lock(loop=self.loop)
+
+ self.models = set()
+ self.controller = Controller(loop=self.loop)
+
+ self.loop.run_until_complete(self.connect())
+
+ async def connect(self):
+ """Connect to the controller"""
+
+ self.log.debug("Connecting from controller")
+ await self.controller.connect(
+ endpoint=self.endpoints,
+ username=self.username,
+ password=self.password,
+ cacert=self.cacert,
+ )
+ e = self.controller.connection().endpoint
+ self.log.info("Connected to controller: {}".format(e))
+
+ async def disconnect(self):
+ """Disconnect from controller"""
+
+ self.log.debug("Disconnecting from controller")
+ await self.controller.disconnect()
+ self.log.info("Disconnected from controller")
+
+ def controller_connected(self) -> bool:
+ """Check if the controller connection is open
+
+ :return: bool: True if connected, False if not connected
+ """
+
+ is_connected = False
+ try:
+ is_connected = self.controller.connection().is_open
+ except NoConnectionException:
+ self.log.warning("VCA not connected")
+ return is_connected
+
+ async def disconnect_model(self, model: Model):
+ """
+ Disconnect model
+
+ :param: model: Model that will be disconnected
+ """
+ try:
+ await model.disconnect()
+ except Exception:
+ pass
+
+ async def _reconnect(
+ self,
+ retry: bool = False,
+ timeout: int = 5,
+ time_between_retries: int = 3,
+ maximum_retries: int = 0,
+ ):
+ """
+ Reconnect to the controller
+
+ :param: retry: Set it to True to retry if the connection fails
+ :param: time_between_retries: Time in seconds between retries
+ :param: maximum_retries Maximum retries. If not set, it will retry forever
+
+ :raises: Exception if cannot connect to the controller
+ """
+
+ if self.reconnecting.locked():
+ # Return if another function is trying to reconnect
+ return
+ async with self.reconnecting:
+ attempt = 0
+ while True:
+ try:
+ await asyncio.wait_for(self.connect(), timeout=timeout)
+ break
+ except asyncio.TimeoutError:
+ self.log.error("Error reconnecting to controller: Timeout")
+ except Exception as e:
+ self.log.error("Error reconnecting to controller: {}".format(e))
+
+ attempt += 1
+ maximum_retries_reached = attempt == maximum_retries
+
+ if not retry or maximum_retries_reached:
+ raise JujuControllerFailedConnecting("Controller is not connected")
+ else:
+ await asyncio.sleep(time_between_retries)
+
+ async def add_model(self, model_name: str, cloud_name: str):
+ """
+ Create model
+
+ :param: model_name: Model name
+ :param: cloud_name: Cloud name
+ """
+
+ # Reconnect to the controller if not connected
+ if not self.controller_connected():
+ await self._reconnect()
+
+ # Raise exception if model already exists
+ if await self.model_exists(model_name):
+ raise JujuModelAlreadyExists("Model {} already exists.".format(model_name))
+
+ # Block until other workers have finished model creation
+ while self.creating_model.locked():
+ await asyncio.sleep(0.1)
+
+ # If the model exists, return it from the controller
+ if model_name in self.models:
+ return await self.get_model(model_name)
+
+ # Create the model
+ self.log.debug("Creating model {}".format(model_name))
+ async with self.creating_model:
+ model = await self.controller.add_model(
+ model_name,
+ config=self.model_config,
+ cloud_name=cloud_name,
+ credential_name=cloud_name,
+ )
+ await self.disconnect_model(model)
+ self.models.add(model_name)
+
+ async def get_model(self, model_name: str) -> Model:
+ """
+ Get model from controller
+
+ :param: model_name: Model name
+
+ :return: Model: The created Juju model object
+ """
+
+ # Check if controller is connected
+ if not self.controller_connected():
+ await self._reconnect()
+ return await self.controller.get_model(model_name)
+
+ async def model_exists(self, model_name: str) -> bool:
+ """
+ Check if model exists
+
+ :param: model_name: Model name
+
+ :return bool
+ """
+
+ # Check if controller is connected
+ if not self.controller_connected():
+ await self._reconnect()
+
+ return model_name in await self.controller.list_models()
+
+ async def get_model_status(self, model_name: str) -> FullStatus:
+ """
+ Get model status
+
+ :param: model_name: Model name
+
+ :return: Full status object
+ """
+ model = await self.get_model(model_name)
+ status = await model.get_status()
+ await self.disconnect_model(model)
+ return status
+
+ async def create_machine(
+ self,
+ model_name: str,
+ machine_id: str = None,
+ db_dict: dict = None,
+ progress_timeout: float = None,
+ total_timeout: float = None,
+ series: str = "xenial",
+ ) -> (Machine, bool):
+ """
+ Create machine
+
+ :param: model_name: Model name
+ :param: machine_id: Machine id
+ :param: db_dict: Dictionary with data of the DB to write the updates
+ :param: progress_timeout: Maximum time between two updates in the model
+ :param: total_timeout: Timeout for the entity to be active
+
+ :return: (juju.machine.Machine, bool): Machine object and a boolean saying
+ if the machine is new or it already existed
+ """
+ new = False
+ machine = None
+
+ self.log.debug(
+ "Creating machine (id={}) in model: {}".format(machine_id, model_name)
+ )
+
+ # Get model
+ model = await self.get_model(model_name)
+ try:
+ if machine_id is not None:
+ self.log.debug(
+ "Searching machine (id={}) in model {}".format(
+ machine_id, model_name
+ )
+ )
+
+ # Get machines from model and get the machine with machine_id if exists
+ machines = await model.get_machines()
+ if machine_id in machines:
+ self.log.debug(
+ "Machine (id={}) found in model {}".format(
+ machine_id, model_name
+ )
+ )
+ machine = model.machines[machine_id]
+ else:
+ raise JujuMachineNotFound("Machine {} not found".format(machine_id))
+
+ if machine is None:
+ self.log.debug("Creating a new machine in model {}".format(model_name))
+
+ # Create machine
+ machine = await model.add_machine(
+ spec=None, constraints=None, disks=None, series=series
+ )
+ new = True
+
+ # Wait until the machine is ready
+ await JujuModelWatcher.wait_for(
+ model=model,
+ entity=machine,
+ progress_timeout=progress_timeout,
+ total_timeout=total_timeout,
+ db_dict=db_dict,
+ n2vc=self.n2vc,
+ )
+ except Exception as e:
+ raise e
+ finally:
+ await self.disconnect_model(model)
+
+ self.log.debug("Machine ready at {}".format(machine.dns_name))
+ return machine, new
+
+ async def provision_machine(
+ self,
+ model_name: str,
+ hostname: str,
+ username: str,
+ private_key_path: str,
+ db_dict: dict = None,
+ progress_timeout: float = None,
+ total_timeout: float = None,
+ ) -> str:
+ """
+ Manually provisioning of a machine
+
+ :param: model_name: Model name
+ :param: hostname: IP to access the machine
+ :param: username: Username to login to the machine
+ :param: private_key_path: Local path for the private key
+ :param: db_dict: Dictionary with data of the DB to write the updates
+ :param: progress_timeout: Maximum time between two updates in the model
+ :param: total_timeout: Timeout for the entity to be active
+
+ :return: (Entity): Machine id
+ """
+ self.log.debug(
+ "Provisioning machine. model: {}, hostname: {}, username: {}".format(
+ model_name, hostname, username
+ )
+ )
+
+ # Get model
+ model = await self.get_model(model_name)
+
+ try:
+ # Get provisioner
+ provisioner = AsyncSSHProvisioner(
+ host=hostname,
+ user=username,
+ private_key_path=private_key_path,
+ log=self.log,
+ )
+
+ # Provision machine
+ params = await provisioner.provision_machine()
+
+ params.jobs = ["JobHostUnits"]
+
+ self.log.debug("Adding machine to model")
+ connection = model.connection()
+ client_facade = client.ClientFacade.from_connection(connection)
+
+ results = await client_facade.AddMachines(params=[params])
+ error = results.machines[0].error
+
+ if error:
+ msg = "Error adding machine: {}".format(error.message)
+ self.log.error(msg=msg)
+ raise ValueError(msg)
+
+ machine_id = results.machines[0].machine
+
+ self.log.debug("Installing Juju agent into machine {}".format(machine_id))
+ asyncio.ensure_future(
+ provisioner.install_agent(
+ connection=connection,
+ nonce=params.nonce,
+ machine_id=machine_id,
+ api=self.api_proxy,
+ )
+ )
+
+ machine = None
+ for _ in range(10):
+ machine_list = await model.get_machines()
+ if machine_id in machine_list:
+ self.log.debug("Machine {} found in model!".format(machine_id))
+ machine = model.machines.get(machine_id)
+ break
+ await asyncio.sleep(2)
+
+ if machine is None:
+ msg = "Machine {} not found in model".format(machine_id)
+ self.log.error(msg=msg)
+ raise JujuMachineNotFound(msg)
+
+ await JujuModelWatcher.wait_for(
+ model=model,
+ entity=machine,
+ progress_timeout=progress_timeout,
+ total_timeout=total_timeout,
+ db_dict=db_dict,
+ n2vc=self.n2vc,
+ )
+ except Exception as e:
+ raise e
+ finally:
+ await self.disconnect_model(model)
+
+ self.log.debug("Machine provisioned {}".format(machine_id))
+
+ return machine_id
+
+ async def deploy_charm(
+ self,
+ application_name: str,
+ path: str,
+ model_name: str,
+ machine_id: str,
+ db_dict: dict = None,
+ progress_timeout: float = None,
+ total_timeout: float = None,
+ config: dict = None,
+ series: str = None,
+ ):
+ """Deploy charm
+
+ :param: application_name: Application name
+ :param: path: Local path to the charm
+ :param: model_name: Model name
+ :param: machine_id ID of the machine
+ :param: db_dict: Dictionary with data of the DB to write the updates
+ :param: progress_timeout: Maximum time between two updates in the model
+ :param: total_timeout: Timeout for the entity to be active
+ :param: config: Config for the charm
+ :param: series: Series of the charm
+
+ :return: (juju.application.Application): Juju application
+ """
+
+ # Get model
+ model = await self.get_model(model_name)
+
+ try:
+ application = None
+ if application_name not in model.applications:
+ self.log.debug(
+ "Deploying charm {} to machine {} in model ~{}".format(
+ application_name, machine_id, model_name
+ )
+ )
+ self.log.debug("charm: {}".format(path))
+ if machine_id is not None:
+ if machine_id not in model.machines:
+ msg = "Machine {} not found in model".format(machine_id)
+ self.log.error(msg=msg)
+ raise JujuMachineNotFound(msg)
+ machine = model.machines[machine_id]
+ series = machine.series
+
+ application = await model.deploy(
+ entity_url=path,
+ application_name=application_name,
+ channel="stable",
+ num_units=1,
+ series=series,
+ to=machine_id,
+ config=config,
+ )
+
+ await JujuModelWatcher.wait_for(
+ model=model,
+ entity=application,
+ progress_timeout=progress_timeout,
+ total_timeout=total_timeout,
+ db_dict=db_dict,
+ n2vc=self.n2vc,
+ )
+ else:
+ raise JujuApplicationExists("Application {} exists".format(application_name))
+
+ except Exception as e:
+ raise e
+ finally:
+ await self.disconnect_model(model)
+
+ self.log.debug("application deployed")
+
+ return application
+
+ async def _get_application(
+ self, model: Model, application_name: str
+ ) -> Application:
+ """Get application
+
+ :param: model: Model object
+ :param: application_name: Application name
+
+ :return: juju.application.Application (or None if it doesn't exist)
+ """
+ if model.applications and application_name in model.applications:
+ return model.applications[application_name]
+
+ async def execute_action(
+ self,
+ application_name: str,
+ model_name: str,
+ action_name: str,
+ db_dict: dict = None,
+ progress_timeout: float = None,
+ total_timeout: float = None,
+ **kwargs
+ ):
+ """Execute action
+
+ :param: application_name: Application name
+ :param: model_name: Model name
+ :param: cloud_name: Cloud name
+ :param: action_name: Name of the action
+ :param: db_dict: Dictionary with data of the DB to write the updates
+ :param: progress_timeout: Maximum time between two updates in the model
+ :param: total_timeout: Timeout for the entity to be active
+
+ :return: (str, str): (output and status)
+ """
+ # Get model and observer
+ model = await self.get_model(model_name)
+
+ try:
+ # Get application
+ application = await self._get_application(
+ model, application_name=application_name,
+ )
+ if application is None:
+ raise JujuApplicationNotFound("Cannot execute action")
+
+ # Get unit
+ unit = None
+ for u in application.units:
+ if await u.is_leader_from_status():
+ unit = u
+ if unit is None:
+ raise Exception("Cannot execute action: leader unit not found")
+
+ actions = await application.get_actions()
+
+ if action_name not in actions:
+ raise Exception(
+ "Action {} not in available actions".format(action_name)
+ )
+
+ self.log.debug(
+ "Executing action {} using params {}".format(action_name, kwargs)
+ )
+ action = await unit.run_action(action_name, **kwargs)
+
+ # Register action with observer and wait for it to finish
+ await JujuModelWatcher.wait_for(
+ model=model,
+ entity=action,
+ progress_timeout=progress_timeout,
+ total_timeout=total_timeout,
+ db_dict=db_dict,
+ n2vc=self.n2vc,
+ )
+ output = await model.get_action_output(action_uuid=action.entity_id)
+ status = await model.get_action_status(uuid_or_prefix=action.entity_id)
+ status = (
+ status[action.entity_id] if action.entity_id in status else "failed"
+ )
+
+ self.log.debug("action completed with status: {}".format(action.status))
+ except Exception as e:
+ raise e
+ finally:
+ await self.disconnect_model(model)
+
+ return output, status
+
+ async def get_actions(self, application_name: str, model_name: str) -> dict:
+ """Get list of actions
+
+ :param: application_name: Application name
+ :param: model_name: Model name
+
+ :return: Dict with this format
+ {
+ "action_name": "Description of the action",
+ ...
+ }
+ """
+
+ # Get model
+ model = await self.get_model(model_name)
+
+ # Get application
+ application = await self._get_application(
+ model, application_name=application_name,
+ )
+
+ # Get list of actions
+ actions = await application.get_actions()
+
+ # Disconnect from model
+ await self.disconnect_model(model)
+
+ return actions
+
+ async def add_relation(
+ self,
+ model_name: str,
+ application_name_1: str,
+ application_name_2: str,
+ relation_1: str,
+ relation_2: str,
+ ):
+ """Add relation
+
+ :param: model_name: Model name
+ :param: application_name_1 First application name
+ :param: application_name_2: Second application name
+ :param: relation_1: First relation name
+ :param: relation_2: Second relation name
+ """
+
+ # Get model
+ model = await self.get_model(model_name)
+
+ # Build relation strings
+ r1 = "{}:{}".format(application_name_1, relation_1)
+ r2 = "{}:{}".format(application_name_2, relation_2)
+
+ # Add relation
+ self.log.debug("Adding relation: {} -> {}".format(r1, r2))
+ try:
+ await model.add_relation(relation1=r1, relation2=r2)
+ except JujuAPIError as e:
+ if "not found" in e.message:
+ self.log.warning("Relation not found: {}".format(e.message))
+ return
+ if "already exists" in e.message:
+ self.log.warning("Relation already exists: {}".format(e.message))
+ return
+ # another exception, raise it
+ raise e
+ finally:
+ await self.disconnect_model(model)
+
+ async def destroy_model(
+ self, model_name: str, total_timeout: float,
+ ):
+ """
+ Destroy model
+
+ :param: model_name: Model name
+ :param: total_timeout: Timeout
+ """
+ model = await self.get_model(model_name)
+ uuid = model.info.uuid
+
+ # Destroy applications
+ for application_name in model.applications:
+ try:
+ await self.destroy_application(
+ model, application_name=application_name,
+ )
+ except Exception as e:
+ self.log.error(
+ "Error destroying application {} in model {}: {}".format(
+ application_name, model_name, e
+ )
+ )
+
+ # Destroy machines
+ machines = await model.get_machines()
+ for machine_id in machines:
+ try:
+ await self.destroy_machine(
+ model, machine_id=machine_id, total_timeout=total_timeout,
+ )
+ except asyncio.CancelledError:
+ raise
+ except Exception:
+ pass
+
+ # Disconnect model
+ await self.disconnect_model(model)
+
+ # Destroy model
+ self.models.remove(model_name)
+ await self.controller.destroy_model(uuid)
+
+ # Wait until model is destroyed
+ self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
+ last_exception = ""
+
+ if total_timeout is None:
+ total_timeout = 3600
+ end = time.time() + total_timeout
+ while time.time() < end:
+ try:
+ models = await self.controller.list_models()
+ if model_name not in models:
+ self.log.debug(
+ "The model {} ({}) was destroyed".format(model_name, uuid)
+ )
+ return
+ except asyncio.CancelledError:
+ raise
+ except Exception as e:
+ last_exception = e
+ await asyncio.sleep(5)
+ raise Exception(
+ "Timeout waiting for model {} to be destroyed {}".format(
+ model_name, last_exception
+ )
+ )
+
+ async def destroy_application(self, model: Model, application_name: str):
+ """
+ Destroy application
+
+ :param: model: Model object
+ :param: application_name: Application name
+ """
+ self.log.debug(
+ "Destroying application {} in model {}".format(
+ application_name, model.info.name
+ )
+ )
+ application = model.applications.get(application_name)
+ if application:
+ await application.destroy()
+ else:
+ self.log.warning("Application not found: {}".format(application_name))
+
+ async def destroy_machine(
+ self, model: Model, machine_id: str, total_timeout: float = 3600
+ ):
+ """
+ Destroy machine
+
+ :param: model: Model object
+ :param: machine_id: Machine id
+ :param: total_timeout: Timeout in seconds
+ """
+ machines = await model.get_machines()
+ if machine_id in machines:
+ machine = model.machines[machine_id]
+ # TODO: change this by machine.is_manual when this is upstreamed:
+ # https://github.com/juju/python-libjuju/pull/396
+ if "instance-id" in machine.safe_data and machine.safe_data[
+ "instance-id"
+ ].startswith("manual:"):
+ await machine.destroy(force=True)
+
+ # max timeout
+ end = time.time() + total_timeout
+
+ # wait for machine removal
+ machines = await model.get_machines()
+ while machine_id in machines and time.time() < end:
+ self.log.debug(
+ "Waiting for machine {} is destroyed".format(machine_id)
+ )
+ await asyncio.sleep(0.5)
+ machines = await model.get_machines()
+ self.log.debug("Machine destroyed: {}".format(machine_id))
+ else:
+ self.log.debug("Machine not found: {}".format(machine_id))
+
+ async def configure_application(
+ self, model_name: str, application_name: str, config: dict = None
+ ):
+ """Configure application
+
+ :param: model_name: Model name
+ :param: application_name: Application name
+ :param: config: Config to apply to the charm
+ """
+ if config:
+ model = await self.get_model(model_name)
+ application = await self._get_application(
+ model, application_name=application_name,
+ )
+ await application.set_config(config)
+ await self.disconnect_model(model)
import abc
import asyncio
-from enum import Enum
from http import HTTPStatus
import os
import shlex
import yaml
from n2vc.loggable import Loggable
-
-
-class N2VCDeploymentStatus(Enum):
- PENDING = "pending"
- RUNNING = "running"
- COMPLETED = "completed"
- FAILED = "failed"
- UNKNOWN = "unknown"
+from n2vc.utils import EntityType, JujuStatusToOSM, N2VCDeploymentStatus
class N2VCConnector(abc.ABC, Loggable):
else:
self.log.info("Exception writing status to database: {}".format(e))
+ def osm_status(self, entity_type: EntityType, status: str) -> N2VCDeploymentStatus:
+ if status not in JujuStatusToOSM[entity_type]:
+ self.log.warning("Status {} not found in JujuStatusToOSM.")
+ return N2VCDeploymentStatus.UNKNOWN
+ return JujuStatusToOSM[entity_type][status]
+
+# DEPRECATED
def juju_status_2_osm_status(statustype: str, status: str) -> N2VCDeploymentStatus:
if statustype == "application" or statustype == "unit":
if status in ["waiting", "maintenance"]:
from n2vc.n2vc_conn import N2VCConnector
from n2vc.n2vc_conn import obj_to_dict, obj_to_yaml
from n2vc.provisioner import AsyncSSHProvisioner
+from n2vc.libjuju import Libjuju
class N2VCJujuConnector(N2VCConnector):
"""
- ####################################################################################
- ################################### P U B L I C ####################################
- ####################################################################################
+####################################################################################
+################################### P U B L I C ####################################
+####################################################################################
"""
BUILT_IN_CLOUDS = ["localhost", "microk8s"]
False # it will be True when juju connection be stablished
)
self._creating_model = False # True during model creation
+ self.libjuju = Libjuju(
+ endpoint=self.url,
+ api_proxy=self.api_proxy,
+ enable_os_upgrade=self.enable_os_upgrade,
+ apt_mirror=self.apt_mirror,
+ username=self.username,
+ password=self.secret,
+ cacert=self.ca_cert,
+ loop=self.loop,
+ log=self.log,
+ db=self.db,
+ n2vc=self,
+ )
# create juju pub key file in lcm container at
# ./local/share/juju/ssh/juju_id_rsa.pub
# self.log.info('Getting NS status. namespace: {}'.format(namespace))
- if not self._authenticated:
- await self._juju_login()
-
_nsi_id, ns_id, _vnf_id, _vdu_id, _vdu_count = self._get_namespace_components(
namespace=namespace
)
self.log.error(msg)
raise N2VCBadArgumentsException(msg, ["namespace"])
- # get juju model (create model if needed)
- model = await self._juju_get_model(model_name=model_name)
-
- status = await model.get_status()
+ status = await self.libjuju.get_model_status(model_name)
if yaml_format:
return obj_to_yaml(status)
)
)
- if not self._authenticated:
- await self._juju_login()
-
machine_id = None
if reuse_ee_id:
model_name, application_name, machine_id = self._get_ee_id_components(
# create or reuse a new juju machine
try:
- machine = await self._juju_create_machine(
+ if not await self.libjuju.model_exists(model_name):
+ await self.libjuju.add_model(model_name, cloud_name=self.cloud)
+ machine, new = await self.libjuju.create_machine(
model_name=model_name,
- application_name=application_name,
machine_id=machine_id,
db_dict=db_dict,
progress_timeout=progress_timeout,
total_timeout=total_timeout,
)
+ # id for the execution environment
+ ee_id = N2VCJujuConnector._build_ee_id(
+ model_name=model_name,
+ application_name=application_name,
+ machine_id=str(machine.entity_id),
+ )
+ self.log.debug("ee_id: {}".format(ee_id))
+
+ if new:
+ # write ee_id in database
+ self._write_ee_id_db(db_dict=db_dict, ee_id=ee_id)
+
except Exception as e:
message = "Error creating machine on juju: {}".format(e)
self.log.error(message)
raise N2VCException(message=message)
- # id for the execution environment
- ee_id = N2VCJujuConnector._build_ee_id(
- model_name=model_name,
- application_name=application_name,
- machine_id=str(machine.entity_id),
- )
- self.log.debug("ee_id: {}".format(ee_id))
-
# new machine credentials
- credentials = dict()
- credentials["hostname"] = machine.dns_name
+ credentials = {
+ "hostname": machine.dns_name,
+ }
self.log.info(
"Execution environment created. ee_id: {}, credentials: {}".format(
total_timeout: float = None,
) -> str:
- if not self._authenticated:
- await self._juju_login()
-
self.log.info(
"Registering execution environment. namespace={}, credentials={}".format(
namespace, credentials
# register machine on juju
try:
- machine_id = await self._juju_provision_machine(
+ if not self.api_proxy:
+ msg = "Cannot provision machine: api_proxy is not defined"
+ self.log.error(msg=msg)
+ raise N2VCException(message=msg)
+ if not await self.libjuju.model_exists(model_name):
+ await self.libjuju.add_model(model_name, cloud_name=self.cloud)
+ machine_id = await self.libjuju.provision_machine(
model_name=model_name,
hostname=hostname,
username=username,
).format(ee_id, artifact_path, db_dict)
)
- if not self._authenticated:
- await self._juju_login()
-
# check arguments
if ee_id is None or len(ee_id) == 0:
raise N2VCBadArgumentsException(
full_path = self.fs.path + "/" + artifact_path
try:
- await self._juju_deploy_charm(
+ await self.libjuju.deploy_charm(
model_name=model_name,
application_name=application_name,
- charm_path=full_path,
+ path=full_path,
machine_id=machine_id,
db_dict=db_dict,
progress_timeout=progress_timeout,
).format(ee_id, db_dict)
)
- if not self._authenticated:
- await self._juju_login()
-
# check arguments
if ee_id is None or len(ee_id) == 0:
raise N2VCBadArgumentsException(
output = None
+ application_name = N2VCJujuConnector._format_app_name(application_name)
+
# execute action: generate-ssh-key
try:
- output, _status = await self._juju_execute_action(
+ output, _status = await self.libjuju.execute_action(
model_name=model_name,
application_name=application_name,
action_name="generate-ssh-key",
# execute action: get-ssh-public-key
try:
- output, _status = await self._juju_execute_action(
+ output, _status = await self.libjuju.execute_action(
model_name=model_name,
application_name=application_name,
action_name="get-ssh-public-key",
except Exception as e:
msg = "Cannot execute action get-ssh-public-key: {}\n".format(e)
self.log.info(msg)
- raise N2VCException(msg)
+ raise N2VCExecutionException(e, primitive_name="get-ssh-public-key")
# return public key if exists
return output["pubkey"] if "pubkey" in output else output
self.log.error(message)
raise N2VCBadArgumentsException(message=message, bad_args=["endpoint_2"])
- if not self._authenticated:
- await self._juju_login()
-
# get the model, the applications and the machines from the ee_id's
model_1, app_1, _machine_1 = self._get_ee_id_components(ee_id_1)
model_2, app_2, _machine_2 = self._get_ee_id_components(ee_id_2)
# add juju relations between two applications
try:
- await self._juju_add_relation(
+ await self.libjuju.add_relation(
model_name=model_1,
application_name_1=app_1,
application_name_2=app_2,
raise N2VCException(message=message)
async def remove_relation(self):
- if not self._authenticated:
- await self._juju_login()
# TODO
self.log.info("Method not implemented yet")
raise MethodNotImplemented()
async def deregister_execution_environments(self):
- if not self._authenticated:
- await self._juju_login()
- # TODO
self.log.info("Method not implemented yet")
raise MethodNotImplemented()
):
self.log.info("Deleting namespace={}".format(namespace))
- if not self._authenticated:
- await self._juju_login()
-
# check arguments
if namespace is None:
raise N2VCBadArgumentsException(
)
if ns_id is not None:
try:
- await self._juju_destroy_model(
+ if not await self.libjuju.model_exists(ns_id):
+ raise N2VCNotFound(message="Model {} does not exist".format(ns_id))
+ await self.libjuju.destroy_model(
model_name=ns_id, total_timeout=total_timeout
)
except N2VCNotFound:
):
self.log.info("Deleting execution environment ee_id={}".format(ee_id))
- if not self._authenticated:
- await self._juju_login()
-
# check arguments
if ee_id is None:
raise N2VCBadArgumentsException(
# destroy the application
try:
- await self._juju_destroy_application(
- model_name=model_name, application_name=application_name
+ await self.libjuju.destroy_model(
+ model_name=model_name, total_timeout=total_timeout
)
except Exception as e:
raise N2VCException(
)
)
- if not self._authenticated:
- await self._juju_login()
-
# check arguments
if ee_id is None or len(ee_id) == 0:
raise N2VCBadArgumentsException(
if primitive_name == "config":
# Special case: config primitive
try:
- await self._juju_configure_application(
+ await self.libjuju.configure_application(
model_name=model_name,
application_name=application_name,
config=params_dict,
- db_dict=db_dict,
- progress_timeout=progress_timeout,
- total_timeout=total_timeout,
)
+ actions = await self.libjuju.get_actions(
+ application_name=application_name, model_name=model_name,
+ )
+ self.log.debug(
+ "Application {} has these actions: {}".format(
+ application_name, actions
+ )
+ )
+ if "verify-ssh-credentials" in actions:
+ # execute verify-credentials
+ num_retries = 20
+ retry_timeout = 15.0
+ for _ in range(num_retries):
+ try:
+ self.log.debug("Executing action verify-ssh-credentials...")
+ output, ok = await self.libjuju.execute_action(
+ model_name=model_name,
+ application_name=application_name,
+ action_name="verify-ssh-credentials",
+ db_dict=db_dict,
+ progress_timeout=progress_timeout,
+ total_timeout=total_timeout,
+ )
+ self.log.debug("Result: {}, output: {}".format(ok, output))
+ break
+ except asyncio.CancelledError:
+ raise
+ except Exception as e:
+ self.log.debug(
+ "Error executing verify-ssh-credentials: {}. Retrying...".format(
+ e
+ )
+ )
+ await asyncio.sleep(retry_timeout)
+ else:
+ self.log.error(
+ "Error executing verify-ssh-credentials after {} retries. ".format(
+ num_retries
+ )
+ )
+ else:
+ msg = "Action verify-ssh-credentials does not exist in application {}".format(
+ application_name
+ )
+ self.log.debug(msg=msg)
except Exception as e:
self.log.error("Error configuring juju application: {}".format(e))
raise N2VCExecutionException(
return "CONFIG OK"
else:
try:
- output, status = await self._juju_execute_action(
+ output, status = await self.libjuju.execute_action(
model_name=model_name,
application_name=application_name,
action_name=primitive_name,
async def disconnect(self):
self.log.info("closing juju N2VC...")
- await self._juju_logout()
+ try:
+ await self.libjuju.disconnect()
+ except Exception as e:
+ raise N2VCConnectionException(
+ message="Error disconnecting controller: {}".format(e), url=self.url
+ )
"""
- ####################################################################################
- ################################### P R I V A T E ##################################
- ####################################################################################
+####################################################################################
+################################### P R I V A T E ##################################
+####################################################################################
"""
def _write_ee_id_db(self, db_dict: dict, ee_id: str):
--- /dev/null
+# Copyright 2020 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from enum import Enum
+from juju.machine import Machine
+from juju.application import Application
+from juju.action import Action
+from juju.unit import Unit
+
+
+class N2VCDeploymentStatus(Enum):
+ PENDING = "pending"
+ RUNNING = "running"
+ COMPLETED = "completed"
+ FAILED = "failed"
+ UNKNOWN = "unknown"
+
+
+class Dict(dict):
+ """
+ Dict class that allows to access the keys like attributes
+ """
+
+ def __getattribute__(self, name):
+ if name in self:
+ return self[name]
+
+
+class EntityType(Enum):
+ MACHINE = Machine
+ APPLICATION = Application
+ ACTION = Action
+ UNIT = Unit
+
+ @classmethod
+ def has_value(cls, value):
+ return value in cls._value2member_map_ # pylint: disable=E1101
+
+ @classmethod
+ def get_entity(cls, value):
+ return (
+ cls._value2member_map_[value] # pylint: disable=E1101
+ if value in cls._value2member_map_ # pylint: disable=E1101
+ else None # pylint: disable=E1101
+ )
+
+ @classmethod
+ def get_entity_from_delta(cls, delta_entity: str):
+ """
+ Get Value from delta entity
+
+ :param: delta_entity: Possible values are "machine", "application", "unit", "action"
+ """
+ for v in cls._value2member_map_: # pylint: disable=E1101
+ if v.__name__.lower() == delta_entity:
+ return cls.get_entity(v)
+
+
+FinalStatus = Dict(
+ {
+ EntityType.MACHINE: Dict({"field": "agent_status", "status": ["started"]}),
+ EntityType.APPLICATION: Dict(
+ {"field": "status", "status": ["active", "blocked"]}
+ ),
+ EntityType.ACTION: Dict(
+ {"field": "status", "status": ["completed", "failed", "cancelled"]}
+ ),
+ }
+)
+
+JujuStatusToOSM = {
+ EntityType.MACHINE: {
+ "pending": N2VCDeploymentStatus.PENDING,
+ "started": N2VCDeploymentStatus.COMPLETED,
+ },
+ EntityType.APPLICATION: {
+ "waiting": N2VCDeploymentStatus.RUNNING,
+ "maintenance": N2VCDeploymentStatus.RUNNING,
+ "blocked": N2VCDeploymentStatus.RUNNING,
+ "error": N2VCDeploymentStatus.FAILED,
+ "active": N2VCDeploymentStatus.COMPLETED,
+ },
+ EntityType.ACTION: {
+ "running": N2VCDeploymentStatus.RUNNING,
+ "completed": N2VCDeploymentStatus.COMPLETED,
+ },
+ EntityType.UNIT: {
+ "waiting": N2VCDeploymentStatus.RUNNING,
+ "maintenance": N2VCDeploymentStatus.RUNNING,
+ "blocked": N2VCDeploymentStatus.RUNNING,
+ "error": N2VCDeploymentStatus.FAILED,
+ "active": N2VCDeploymentStatus.COMPLETED,
+ },
+}
requests-mock
coverage==4.5.3
asynctest
+juju==2.8.1
\ No newline at end of file
[flake8]
# W503 is invalid PEP-8
-max-line-length = 88
+max-line-length = 100
show-source = True
ignore = W503,E203
exclude=.venv,.git,.tox,dist,doc,*lib/python*,*egg,build,devops_stages/*,.rst