X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Flibjuju.py;h=028cea820e6d5ef5414c7362253381a48f6302c0;hp=7a730335553392c5041706ce28e6303155bff94f;hb=7e887b22fdc176021b215c3b83a052276fdbeefc;hpb=b0a8f409e149715bf37d30c414474888c8a499f3 diff --git a/n2vc/libjuju.py b/n2vc/libjuju.py index 7a73033..028cea8 100644 --- a/n2vc/libjuju.py +++ b/n2vc/libjuju.py @@ -14,10 +14,11 @@ import asyncio import logging +import typing import time -from juju.errors import JujuAPIError +import juju.errors from juju.model import Model from juju.machine import Machine from juju.application import Application @@ -32,6 +33,7 @@ from juju.controller import Controller from juju.client import client from juju import tag +from n2vc.definitions import Offer, RelationEndpoint from n2vc.juju_watcher import JujuModelWatcher from n2vc.provisioner import AsyncSSHProvisioner from n2vc.n2vc_conn import N2VCConnector @@ -43,10 +45,13 @@ from n2vc.exceptions import ( JujuControllerFailedConnecting, JujuApplicationExists, JujuInvalidK8sConfiguration, + JujuError, ) -from n2vc.utils import DB_DATA -from osm_common.dbbase import DbException +from n2vc.vca.cloud import Cloud as VcaCloud +from n2vc.vca.connection import Connection from kubernetes.client.configuration import Configuration +from retrying_async import retry + RBAC_LABEL_KEY_NAME = "rbac-id" @@ -54,68 +59,35 @@ RBAC_LABEL_KEY_NAME = "rbac-id" class Libjuju: def __init__( self, - endpoint: str, - api_proxy: str, - username: str, - password: str, - cacert: str, + vca_connection: Connection, loop: asyncio.AbstractEventLoop = None, log: logging.Logger = None, - db: dict = None, n2vc: N2VCConnector = None, - apt_mirror: str = None, - enable_os_upgrade: bool = True, ): """ Constructor - :param: endpoint: Endpoint of the juju controller (host:port) - :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs - :param: username: Juju username - :param: password: Juju password - :param: cacert: Juju CA Certificate + :param: vca_connection: n2vc.vca.connection object :param: loop: Asyncio loop :param: log: Logger - :param: db: DB object :param: n2vc: N2VC object - :param: apt_mirror: APT Mirror - :param: enable_os_upgrade: Enable OS Upgrade """ self.log = log or logging.getLogger("Libjuju") - self.db = db - db_endpoints = self._get_api_endpoints_db() - self.endpoints = None - if (db_endpoints and endpoint not in db_endpoints) or not db_endpoints: - self.endpoints = [endpoint] - self._update_api_endpoints_db(self.endpoints) - else: - self.endpoints = db_endpoints - self.api_proxy = api_proxy - self.username = username - self.password = password - self.cacert = cacert - self.loop = loop or asyncio.get_event_loop() self.n2vc = n2vc + self.vca_connection = vca_connection - # Generate config for models - self.model_config = {} - if apt_mirror: - self.model_config["apt-mirror"] = apt_mirror - self.model_config["enable-os-refresh-update"] = enable_os_upgrade - self.model_config["enable-os-upgrade"] = enable_os_upgrade - + self.loop = loop or asyncio.get_event_loop() self.loop.set_exception_handler(self.handle_exception) self.creating_model = asyncio.Lock(loop=self.loop) - self.log.debug("Libjuju initialized!") - - self.health_check_task = self._create_health_check_task() + if self.vca_connection.is_default: + self.health_check_task = self._create_health_check_task() def _create_health_check_task(self): return self.loop.create_task(self.health_check()) - async def get_controller(self, timeout: float = 15.0) -> Controller: + async def get_controller(self, timeout: float = 60.0) -> Controller: """ Get controller @@ -123,26 +95,30 @@ class Libjuju: """ controller = None try: - controller = Controller(loop=self.loop) + controller = Controller() await asyncio.wait_for( controller.connect( - endpoint=self.endpoints, - username=self.username, - password=self.password, - cacert=self.cacert, + endpoint=self.vca_connection.data.endpoints, + username=self.vca_connection.data.user, + password=self.vca_connection.data.secret, + cacert=self.vca_connection.data.cacert, ), timeout=timeout, ) - endpoints = await controller.api_endpoints - if self.endpoints != endpoints: - self.endpoints = endpoints - self._update_api_endpoints_db(self.endpoints) + if self.vca_connection.is_default: + endpoints = await controller.api_endpoints + if not all( + endpoint in self.vca_connection.endpoints for endpoint in endpoints + ): + await self.vca_connection.update_endpoints(endpoints) return controller except asyncio.CancelledError as e: raise e except Exception as e: self.log.error( - "Failed connecting to controller: {}...".format(self.endpoints) + "Failed connecting to controller: {}... {}".format( + self.vca_connection.data.endpoints, e + ) ) if controller: await self.disconnect_controller(controller) @@ -171,14 +147,13 @@ class Libjuju: if controller: await controller.disconnect() - async def add_model(self, model_name: str, cloud_name: str, credential_name=None): + @retry(attempts=3, delay=5, timeout=None) + async def add_model(self, model_name: str, cloud: VcaCloud): """ Create model :param: model_name: Model name - :param: cloud_name: Cloud name - :param: credential_name: Credential name to use for adding the model - If not specified, same name as the cloud will be used. + :param: cloud: Cloud object """ # Get controller @@ -196,18 +171,99 @@ class Libjuju: self.log.debug("Creating model {}".format(model_name)) model = await controller.add_model( model_name, - config=self.model_config, - cloud_name=cloud_name, - credential_name=credential_name or cloud_name, + config=self.vca_connection.data.model_config, + cloud_name=cloud.name, + credential_name=cloud.credential_name, ) + except juju.errors.JujuAPIError as e: + if "already exists" in e.message: + pass + else: + raise e finally: if model: await self.disconnect_model(model) await self.disconnect_controller(controller) - async def get_model( - self, controller: Controller, model_name: str, id=None - ) -> Model: + async def get_executed_actions(self, model_name: str) -> list: + """ + Get executed/history of actions for a model. + + :param: model_name: Model name, str. + :return: List of executed actions for a model. + """ + model = None + executed_actions = [] + controller = await self.get_controller() + try: + model = await self.get_model(controller, model_name) + # Get all unique action names + actions = {} + for application in model.applications: + application_actions = await self.get_actions(application, model_name) + actions.update(application_actions) + # Get status of all actions + for application_action in actions: + app_action_status_list = await model.get_action_status( + name=application_action + ) + for action_id, action_status in app_action_status_list.items(): + executed_action = { + "id": action_id, + "action": application_action, + "status": action_status, + } + # Get action output by id + action_status = await model.get_action_output(executed_action["id"]) + for k, v in action_status.items(): + executed_action[k] = v + executed_actions.append(executed_action) + except Exception as e: + raise JujuError( + "Error in getting executed actions for model: {}. Error: {}".format( + model_name, str(e) + ) + ) + finally: + if model: + await self.disconnect_model(model) + await self.disconnect_controller(controller) + return executed_actions + + async def get_application_configs( + self, model_name: str, application_name: str + ) -> dict: + """ + Get available configs for an application. + + :param: model_name: Model name, str. + :param: application_name: Application name, str. + + :return: A dict which has key - action name, value - action description + """ + model = None + application_configs = {} + controller = await self.get_controller() + try: + model = await self.get_model(controller, model_name) + application = self._get_application( + model, application_name=application_name + ) + application_configs = await application.get_config() + except Exception as e: + raise JujuError( + "Error in getting configs for application: {} in model: {}. Error: {}".format( + application_name, model_name, str(e) + ) + ) + finally: + if model: + await self.disconnect_model(model) + await self.disconnect_controller(controller) + return application_configs + + @retry(attempts=3, delay=5) + async def get_model(self, controller: Controller, model_name: str) -> Model: """ Get model from controller @@ -290,7 +346,7 @@ class Libjuju: db_dict: dict = None, progress_timeout: float = None, total_timeout: float = None, - series: str = "xenial", + series: str = "bionic", wait: bool = True, ) -> (Machine, bool): """ @@ -362,6 +418,7 @@ class Libjuju: total_timeout=total_timeout, db_dict=db_dict, n2vc=self.n2vc, + vca_id=self.vca_connection._vca_id, ) finally: await self.disconnect_model(model) @@ -443,7 +500,7 @@ class Libjuju: connection=connection, nonce=params.nonce, machine_id=machine_id, - proxy=self.api_proxy, + proxy=self.vca_connection.data.api_proxy, series=params.series, ) ) @@ -474,6 +531,7 @@ class Libjuju: total_timeout=total_timeout, db_dict=db_dict, n2vc=self.n2vc, + vca_id=self.vca_connection._vca_id, ) except Exception as e: raise e @@ -501,7 +559,7 @@ class Libjuju: controller = await self.get_controller() model = await self.get_model(controller, model_name) try: - await model.deploy(uri) + await model.deploy(uri, trust=True) if wait: await JujuModelWatcher.wait_for_model(model, timeout=timeout) self.log.debug("All units active in model {}".format(model_name)) @@ -509,6 +567,146 @@ class Libjuju: await self.disconnect_model(model) await self.disconnect_controller(controller) + async def add_unit( + self, + application_name: str, + model_name: str, + machine_id: str, + db_dict: dict = None, + progress_timeout: float = None, + total_timeout: float = None, + ): + """Add unit + + :param: application_name: Application name + :param: model_name: Model name + :param: machine_id Machine id + :param: db_dict: Dictionary with data of the DB to write the updates + :param: progress_timeout: Maximum time between two updates in the model + :param: total_timeout: Timeout for the entity to be active + + :return: None + """ + + model = None + controller = await self.get_controller() + try: + model = await self.get_model(controller, model_name) + application = self._get_application(model, application_name) + + if application is not None: + + # Checks if the given machine id in the model, + # otherwise function raises an error + _machine, _series = self._get_machine_info(model, machine_id) + + self.log.debug( + "Adding unit (machine {}) to application {} in model ~{}".format( + machine_id, application_name, model_name + ) + ) + + await application.add_unit(to=machine_id) + + await JujuModelWatcher.wait_for( + model=model, + entity=application, + progress_timeout=progress_timeout, + total_timeout=total_timeout, + db_dict=db_dict, + n2vc=self.n2vc, + vca_id=self.vca_connection._vca_id, + ) + self.log.debug( + "Unit is added to application {} in model {}".format( + application_name, model_name + ) + ) + else: + raise JujuApplicationNotFound( + "Application {} not exists".format(application_name) + ) + finally: + if model: + await self.disconnect_model(model) + await self.disconnect_controller(controller) + + async def destroy_unit( + self, + application_name: str, + model_name: str, + machine_id: str, + total_timeout: float = None, + ): + """Destroy unit + + :param: application_name: Application name + :param: model_name: Model name + :param: machine_id Machine id + :param: total_timeout: Timeout for the entity to be active + + :return: None + """ + + model = None + controller = await self.get_controller() + try: + model = await self.get_model(controller, model_name) + application = self._get_application(model, application_name) + + if application is None: + raise JujuApplicationNotFound( + "Application not found: {} (model={})".format( + application_name, model_name + ) + ) + + unit = self._get_unit(application, machine_id) + if not unit: + raise JujuError( + "A unit with machine id {} not in available units".format( + machine_id + ) + ) + + unit_name = unit.name + + self.log.debug( + "Destroying unit {} from application {} in model {}".format( + unit_name, application_name, model_name + ) + ) + await application.destroy_unit(unit_name) + + self.log.debug( + "Waiting for unit {} to be destroyed in application {} (model={})...".format( + unit_name, application_name, model_name + ) + ) + + # TODO: Add functionality in the Juju watcher to replace this kind of blocks + if total_timeout is None: + total_timeout = 3600 + end = time.time() + total_timeout + while time.time() < end: + if not self._get_unit(application, machine_id): + self.log.debug( + "The unit {} was destroyed in application {} (model={}) ".format( + unit_name, application_name, model_name + ) + ) + return + await asyncio.sleep(5) + self.log.debug( + "Unit {} is destroyed from application {} in model {}".format( + unit_name, application_name, model_name + ) + ) + finally: + if model: + await self.disconnect_model(model) + await self.disconnect_controller(controller) + async def deploy_charm( self, application_name: str, @@ -551,16 +749,10 @@ class Libjuju: model = await self.get_model(controller, model_name) try: - application = None if application_name not in model.applications: if machine_id is not None: - if machine_id not in model.machines: - msg = "Machine {} not found in model".format(machine_id) - self.log.error(msg=msg) - raise JujuMachineNotFound(msg) - machine = model.machines[machine_id] - series = machine.series + machine, series = self._get_machine_info(model, machine_id) application = await model.deploy( entity_url=path, @@ -589,6 +781,7 @@ class Libjuju: total_timeout=total_timeout, db_dict=db_dict, n2vc=self.n2vc, + vca_id=self.vca_connection._vca_id, ) self.log.debug( "Application {} is ready in model {}".format( @@ -599,12 +792,96 @@ class Libjuju: raise JujuApplicationExists( "Application {} exists".format(application_name) ) + except juju.errors.JujuError as e: + if "already exists" in e.message: + raise JujuApplicationExists( + "Application {} exists".format(application_name) + ) + else: + raise e finally: await self.disconnect_model(model) await self.disconnect_controller(controller) return application + async def scale_application( + self, + model_name: str, + application_name: str, + scale: int = 1, + total_timeout: float = None, + ): + """ + Scale application (K8s) + + :param: model_name: Model name + :param: application_name: Application name + :param: scale: Scale to which to set this application + :param: total_timeout: Timeout for the entity to be active + """ + + model = None + controller = await self.get_controller() + try: + model = await self.get_model(controller, model_name) + + self.log.debug( + "Scaling application {} in model {}".format( + application_name, model_name + ) + ) + application = self._get_application(model, application_name) + if application is None: + raise JujuApplicationNotFound("Cannot scale application") + await application.scale(scale=scale) + # Wait until application is scaled in model + self.log.debug( + "Waiting for application {} to be scaled in model {}...".format( + application_name, model_name + ) + ) + if total_timeout is None: + total_timeout = 1800 + end = time.time() + total_timeout + while time.time() < end: + application_scale = self._get_application_count(model, application_name) + # Before calling wait_for_model function, + # wait until application unit count and scale count are equal. + # Because there is a delay before scaling triggers in Juju model. + if application_scale == scale: + await JujuModelWatcher.wait_for_model( + model=model, timeout=total_timeout + ) + self.log.debug( + "Application {} is scaled in model {}".format( + application_name, model_name + ) + ) + return + await asyncio.sleep(5) + raise Exception( + "Timeout waiting for application {} in model {} to be scaled".format( + application_name, model_name + ) + ) + finally: + if model: + await self.disconnect_model(model) + await self.disconnect_controller(controller) + + def _get_application_count(self, model: Model, application_name: str) -> int: + """Get number of units of the application + + :param: model: Model object + :param: application_name: Application name + + :return: int (or None if application doesn't exist) + """ + application = self._get_application(model, application_name) + if application is not None: + return len(application.units) + def _get_application(self, model: Model, application_name: str) -> Application: """Get application @@ -616,15 +893,50 @@ class Libjuju: if model.applications and application_name in model.applications: return model.applications[application_name] + def _get_unit(self, application: Application, machine_id: str) -> Unit: + """Get unit + + :param: application: Application object + :param: machine_id: Machine id + + :return: Unit + """ + unit = None + for u in application.units: + if u.machine_id == machine_id: + unit = u + break + return unit + + def _get_machine_info( + self, + model, + machine_id: str, + ) -> (str, str): + """Get machine info + + :param: model: Model object + :param: machine_id: Machine id + + :return: (str, str): (machine, series) + """ + if machine_id not in model.machines: + msg = "Machine {} not found in model".format(machine_id) + self.log.error(msg=msg) + raise JujuMachineNotFound(msg) + machine = model.machines[machine_id] + return machine, machine.series + async def execute_action( self, application_name: str, model_name: str, action_name: str, db_dict: dict = None, + machine_id: str = None, progress_timeout: float = None, total_timeout: float = None, - **kwargs + **kwargs, ): """Execute action @@ -632,6 +944,7 @@ class Libjuju: :param: model_name: Model name :param: action_name: Name of the action :param: db_dict: Dictionary with data of the DB to write the updates + :param: machine_id Machine id :param: progress_timeout: Maximum time between two updates in the model :param: total_timeout: Timeout for the entity to be active @@ -654,25 +967,30 @@ class Libjuju: ) if application is None: raise JujuApplicationNotFound("Cannot execute action") - - # Get leader unit # Racing condition: # Ocassionally, self._get_leader_unit() will return None # because the leader elected hook has not been triggered yet. # Therefore, we are doing some retries. If it happens again, # re-open bug 1236 - attempts = 3 - time_between_retries = 10 - unit = None - for _ in range(attempts): + if machine_id is None: unit = await self._get_leader_unit(application) - if unit is None: - await asyncio.sleep(time_between_retries) - else: - break - if unit is None: - raise JujuLeaderUnitNotFound( - "Cannot execute action: leader unit not found" + self.log.debug( + "Action {} is being executed on the leader unit {}".format( + action_name, unit.name + ) + ) + else: + unit = self._get_unit(application, machine_id) + if not unit: + raise JujuError( + "A unit with machine id {} not in available units".format( + machine_id + ) + ) + self.log.debug( + "Action {} is being executed on {} unit".format( + action_name, unit.name + ) ) actions = await application.get_actions() @@ -696,6 +1014,7 @@ class Libjuju: total_timeout=total_timeout, db_dict=db_dict, n2vc=self.n2vc, + vca_id=self.vca_connection._vca_id, ) output = await model.get_action_output(action_uuid=action.entity_id) @@ -797,7 +1116,7 @@ class Libjuju: # Add relation try: await model.add_relation(endpoint_1, endpoint_2) - except JujuAPIError as e: + except juju.errors.JujuAPIError as e: if "not found" in e.message: self.log.warning("Relation not found: {}".format(e.message)) return @@ -810,31 +1129,74 @@ class Libjuju: await self.disconnect_model(model) await self.disconnect_controller(controller) + async def offer(self, endpoint: RelationEndpoint) -> Offer: + """ + Create an offer from a RelationEndpoint + + :param: endpoint: Relation endpoint + + :return: Offer object + """ + model_name = endpoint.model_name + offer_name = f"{endpoint.application_name}-{endpoint.endpoint_name}" + controller = await self.get_controller() + model = None + try: + model = await self.get_model(controller, model_name) + await model.create_offer(endpoint.endpoint, offer_name=offer_name) + offer_list = await self._list_offers(model_name, offer_name=offer_name) + if offer_list: + return Offer(offer_list[0].offer_url) + else: + raise Exception("offer was not created") + except juju.errors.JujuError as e: + if "application offer already exists" not in e.message: + raise e + finally: + if model: + self.disconnect_model(model) + self.disconnect_controller(controller) + async def consume( self, - offer_url: str, model_name: str, - ): + offer: Offer, + provider_libjuju: "Libjuju", + ) -> str: """ - Adds a remote offer to the model. Relations can be created later using "juju relate". + Consumes a remote offer in the model. Relations can be created later using "juju relate". - :param: offer_url: Offer Url - :param: model_name: Model name + :param: model_name: Model name + :param: offer: Offer object to consume + :param: provider_libjuju: Libjuju object of the provider endpoint :raises ParseError if there's a problem parsing the offer_url :raises JujuError if remote offer includes and endpoint :raises JujuAPIError if the operation is not successful + + :returns: Saas name. It is the application name in the model that reference the remote application. """ + saas_name = f'{offer.name}-{offer.model_name.replace("-", "")}' + if offer.vca_id: + saas_name = f"{saas_name}-{offer.vca_id}" controller = await self.get_controller() - model = await controller.get_model(model_name) - + model = None + provider_controller = None try: - await model.consume(offer_url) + model = await controller.get_model(model_name) + provider_controller = await provider_libjuju.get_controller() + await model.consume( + offer.url, application_alias=saas_name, controller=provider_controller + ) + return saas_name finally: - await self.disconnect_model(model) + if model: + await self.disconnect_model(model) + if provider_controller: + await provider_libjuju.disconnect_controller(provider_controller) await self.disconnect_controller(controller) - async def destroy_model(self, model_name: str, total_timeout: float): + async def destroy_model(self, model_name: str, total_timeout: float = 1800): """ Destroy model @@ -848,60 +1210,108 @@ class Libjuju: if not await self.model_exists(model_name, controller=controller): return - model = await self.get_model(controller, model_name) self.log.debug("Destroying model {}".format(model_name)) - uuid = model.info.uuid + model = await self.get_model(controller, model_name) # Destroy machines that are manually provisioned # and still are in pending state await self._destroy_pending_machines(model, only_manual=True) - - # Disconnect model await self.disconnect_model(model) - await controller.destroy_model(uuid, force=True, max_wait=0) + await self._destroy_model( + model_name, + controller, + timeout=total_timeout, + ) + finally: + if model: + await self.disconnect_model(model) + await self.disconnect_controller(controller) - # Wait until model is destroyed - self.log.debug("Waiting for model {} to be destroyed...".format(model_name)) + async def _destroy_model( + self, model_name: str, controller: Controller, timeout: float = 1800 + ): + """ + Destroy model from controller - if total_timeout is None: - total_timeout = 3600 - end = time.time() + total_timeout - while time.time() < end: - models = await controller.list_models() - if model_name not in models: - self.log.debug( - "The model {} ({}) was destroyed".format(model_name, uuid) - ) - return + :param: model: Model name to be removed + :param: controller: Controller object + :param: timeout: Timeout in seconds + """ + + async def _destroy_model_loop(model_name: str, controller: Controller): + while await self.model_exists(model_name, controller=controller): + await controller.destroy_model( + model_name, destroy_storage=True, force=True, max_wait=0 + ) await asyncio.sleep(5) + + try: + await asyncio.wait_for( + _destroy_model_loop(model_name, controller), timeout=timeout + ) + except asyncio.TimeoutError: raise Exception( "Timeout waiting for model {} to be destroyed".format(model_name) ) - except Exception as e: - if model: - await self.disconnect_model(model) + except juju.errors.JujuError as e: + if any("has been removed" in error for error in e.errors): + return raise e - finally: - await self.disconnect_controller(controller) - async def destroy_application(self, model: Model, application_name: str): + async def destroy_application( + self, model_name: str, application_name: str, total_timeout: float + ): """ Destroy application - :param: model: Model object + :param: model_name: Model name :param: application_name: Application name + :param: total_timeout: Timeout """ - self.log.debug( - "Destroying application {} in model {}".format( - application_name, model.info.name + + controller = await self.get_controller() + model = None + + try: + model = await self.get_model(controller, model_name) + self.log.debug( + "Destroying application {} in model {}".format( + application_name, model_name + ) ) - ) - application = model.applications.get(application_name) - if application: - await application.destroy() - else: - self.log.warning("Application not found: {}".format(application_name)) + application = self._get_application(model, application_name) + if application: + await application.destroy() + else: + self.log.warning("Application not found: {}".format(application_name)) + + self.log.debug( + "Waiting for application {} to be destroyed in model {}...".format( + application_name, model_name + ) + ) + if total_timeout is None: + total_timeout = 3600 + end = time.time() + total_timeout + while time.time() < end: + if not self._get_application(model, application_name): + self.log.debug( + "The application {} was destroyed in model {} ".format( + application_name, model_name + ) + ) + return + await asyncio.sleep(5) + raise Exception( + "Timeout waiting for application {} to be destroyed in model {}".format( + application_name, model_name + ) + ) + finally: + if model is not None: + await self.disconnect_model(model) + await self.disconnect_controller(controller) async def _destroy_pending_machines(self, model: Model, only_manual: bool = False): """ @@ -946,57 +1356,6 @@ class Libjuju: await self.disconnect_model(model) await self.disconnect_controller(controller) - def _get_api_endpoints_db(self) -> [str]: - """ - Get API Endpoints from DB - - :return: List of API endpoints - """ - self.log.debug("Getting endpoints from database") - - juju_info = self.db.get_one( - DB_DATA.api_endpoints.table, - q_filter=DB_DATA.api_endpoints.filter, - fail_on_empty=False, - ) - if juju_info and DB_DATA.api_endpoints.key in juju_info: - return juju_info[DB_DATA.api_endpoints.key] - - def _update_api_endpoints_db(self, endpoints: [str]): - """ - Update API endpoints in Database - - :param: List of endpoints - """ - self.log.debug("Saving endpoints {} in database".format(endpoints)) - - juju_info = self.db.get_one( - DB_DATA.api_endpoints.table, - q_filter=DB_DATA.api_endpoints.filter, - fail_on_empty=False, - ) - # If it doesn't, then create it - if not juju_info: - try: - self.db.create( - DB_DATA.api_endpoints.table, - DB_DATA.api_endpoints.filter, - ) - except DbException as e: - # Racing condition: check if another N2VC worker has created it - juju_info = self.db.get_one( - DB_DATA.api_endpoints.table, - q_filter=DB_DATA.api_endpoints.filter, - fail_on_empty=False, - ) - if not juju_info: - raise e - self.db.set_one( - DB_DATA.api_endpoints.table, - DB_DATA.api_endpoints.filter, - {DB_DATA.api_endpoints.key: endpoints}, - ) - def handle_exception(self, loop, context): # All unhandled exceptions by libjuju are handled here. pass @@ -1035,17 +1394,29 @@ class Libjuju: finally: await self.disconnect_controller(controller) - async def list_offers(self, model_name: str) -> QueryApplicationOffersResults: - """List models with certain names + async def _list_offers( + self, model_name: str, offer_name: str = None + ) -> QueryApplicationOffersResults: + """ + List offers within a model :param: model_name: Model name + :param: offer_name: Offer name to filter. - :return: Returns list of offers + :return: Returns application offers results in the model """ controller = await self.get_controller() try: - return await controller.list_offers(model_name) + offers = (await controller.list_offers(model_name)).results + if offer_name: + matching_offer = [] + for offer in offers: + if offer.offer_name == offer_name: + matching_offer.append(offer) + break + offers = matching_offer + return offers finally: await self.disconnect_controller(controller) @@ -1185,23 +1556,65 @@ class Libjuju: controller = await self.get_controller() try: await controller.remove_cloud(name) + except juju.errors.JujuError as e: + if len(e.errors) == 1 and f'cloud "{name}" not found' == e.errors[0]: + self.log.warning(f"Cloud {name} not found, so it could not be deleted.") + else: + raise e finally: await self.disconnect_controller(controller) + @retry(attempts=20, delay=5, fallback=JujuLeaderUnitNotFound()) async def _get_leader_unit(self, application: Application) -> Unit: unit = None for u in application.units: if await u.is_leader_from_status(): unit = u break + if not unit: + raise Exception() return unit - async def get_cloud_credentials(self, cloud_name: str, credential_name: str): + async def get_cloud_credentials(self, cloud: Cloud) -> typing.List: + """ + Get cloud credentials + + :param: cloud: Cloud object. The returned credentials will be from this cloud. + + :return: List of credentials object associated to the specified cloud + + """ controller = await self.get_controller() try: facade = client.CloudFacade.from_connection(controller.connection()) - cloud_cred_tag = tag.credential(cloud_name, self.username, credential_name) + cloud_cred_tag = tag.credential( + cloud.name, self.vca_connection.data.user, cloud.credential_name + ) params = [client.Entity(cloud_cred_tag)] return (await facade.Credential(params)).results finally: await self.disconnect_controller(controller) + + async def check_application_exists(self, model_name, application_name) -> bool: + """Check application exists + + :param: model_name: Model Name + :param: application_name: Application Name + + :return: Boolean + """ + + model = None + controller = await self.get_controller() + try: + model = await self.get_model(controller, model_name) + self.log.debug( + "Checking if application {} exists in model {}".format( + application_name, model_name + ) + ) + return self._get_application(model, application_name) is not None + finally: + if model: + await self.disconnect_model(model) + await self.disconnect_controller(controller)