Pin black version in tox.ini to 23.12.1
[osm/N2VC.git] / n2vc / libjuju.py
index b897bc6..f36ff39 100644 (file)
 
 import asyncio
 import logging
 
 import asyncio
 import logging
+import os
 import typing
 import typing
+import yaml
 
 import time
 
 import juju.errors
 
 import time
 
 import juju.errors
+from juju.bundle import BundleHandler
 from juju.model import Model
 from juju.machine import Machine
 from juju.application import Application
 from juju.unit import Unit
 from juju.model import Model
 from juju.machine import Machine
 from juju.application import Application
 from juju.unit import Unit
+from juju.url import URL
+from juju.version import DEFAULT_ARCHITECTURE
 from juju.client._definitions import (
     FullStatus,
     QueryApplicationOffersResults,
 from juju.client._definitions import (
     FullStatus,
     QueryApplicationOffersResults,
@@ -33,6 +38,7 @@ from juju.controller import Controller
 from juju.client import client
 from juju import tag
 
 from juju.client import client
 from juju import tag
 
+from n2vc.definitions import Offer, RelationEndpoint
 from n2vc.juju_watcher import JujuModelWatcher
 from n2vc.provisioner import AsyncSSHProvisioner
 from n2vc.n2vc_conn import N2VCConnector
 from n2vc.juju_watcher import JujuModelWatcher
 from n2vc.provisioner import AsyncSSHProvisioner
 from n2vc.n2vc_conn import N2VCConnector
@@ -55,11 +61,18 @@ from retrying_async import retry
 RBAC_LABEL_KEY_NAME = "rbac-id"
 
 
 RBAC_LABEL_KEY_NAME = "rbac-id"
 
 
+@asyncio.coroutine
+def retry_callback(attempt, exc, args, kwargs, delay=0.5, *, loop):
+    # Specifically overridden from upstream implementation so it can
+    # continue to work with Python 3.10
+    yield from asyncio.sleep(attempt * delay)
+    return retry
+
+
 class Libjuju:
     def __init__(
         self,
         vca_connection: Connection,
 class Libjuju:
     def __init__(
         self,
         vca_connection: Connection,
-        loop: asyncio.AbstractEventLoop = None,
         log: logging.Logger = None,
         n2vc: N2VCConnector = None,
     ):
         log: logging.Logger = None,
         n2vc: N2VCConnector = None,
     ):
@@ -67,7 +80,6 @@ class Libjuju:
         Constructor
 
         :param: vca_connection:         n2vc.vca.connection object
         Constructor
 
         :param: vca_connection:         n2vc.vca.connection object
-        :param: loop:                   Asyncio loop
         :param: log:                    Logger
         :param: n2vc:                   N2VC object
         """
         :param: log:                    Logger
         :param: n2vc:                   N2VC object
         """
@@ -76,15 +88,13 @@ class Libjuju:
         self.n2vc = n2vc
         self.vca_connection = vca_connection
 
         self.n2vc = n2vc
         self.vca_connection = vca_connection
 
-        self.loop = loop or asyncio.get_event_loop()
-        self.loop.set_exception_handler(self.handle_exception)
-        self.creating_model = asyncio.Lock(loop=self.loop)
+        self.creating_model = asyncio.Lock()
 
         if self.vca_connection.is_default:
             self.health_check_task = self._create_health_check_task()
 
     def _create_health_check_task(self):
 
         if self.vca_connection.is_default:
             self.health_check_task = self._create_health_check_task()
 
     def _create_health_check_task(self):
-        return self.loop.create_task(self.health_check())
+        return asyncio.get_event_loop().create_task(self.health_check())
 
     async def get_controller(self, timeout: float = 60.0) -> Controller:
         """
 
     async def get_controller(self, timeout: float = 60.0) -> Controller:
         """
@@ -94,7 +104,7 @@ class Libjuju:
         """
         controller = None
         try:
         """
         controller = None
         try:
-            controller = Controller(loop=self.loop)
+            controller = Controller()
             await asyncio.wait_for(
                 controller.connect(
                     endpoint=self.vca_connection.data.endpoints,
             await asyncio.wait_for(
                 controller.connect(
                     endpoint=self.vca_connection.data.endpoints,
@@ -121,7 +131,10 @@ class Libjuju:
             )
             if controller:
                 await self.disconnect_controller(controller)
             )
             if controller:
                 await self.disconnect_controller(controller)
-            raise JujuControllerFailedConnecting(e)
+
+            raise JujuControllerFailedConnecting(
+                f"Error connecting to Juju controller: {e}"
+            )
 
     async def disconnect(self):
         """Disconnect"""
 
     async def disconnect(self):
         """Disconnect"""
@@ -146,7 +159,7 @@ class Libjuju:
         if controller:
             await controller.disconnect()
 
         if controller:
             await controller.disconnect()
 
-    @retry(attempts=3, delay=5, timeout=None)
+    @retry(attempts=3, delay=5, timeout=None, callback=retry_callback)
     async def add_model(self, model_name: str, cloud: VcaCloud):
         """
         Create model
     async def add_model(self, model_name: str, cloud: VcaCloud):
         """
         Create model
@@ -261,7 +274,7 @@ class Libjuju:
             await self.disconnect_controller(controller)
         return application_configs
 
             await self.disconnect_controller(controller)
         return application_configs
 
-    @retry(attempts=3, delay=5)
+    @retry(attempts=3, delay=5, callback=retry_callback)
     async def get_model(self, controller: Controller, model_name: str) -> Model:
         """
         Get model from controller
     async def get_model(self, controller: Controller, model_name: str) -> Model:
         """
         Get model from controller
@@ -545,27 +558,122 @@ class Libjuju:
         return machine_id
 
     async def deploy(
         return machine_id
 
     async def deploy(
-        self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
+        self,
+        uri: str,
+        model_name: str,
+        wait: bool = True,
+        timeout: float = 3600,
+        instantiation_params: dict = None,
     ):
         """
         Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
 
     ):
         """
         Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
 
-        :param: uri:            Path or Charm Store uri in which the charm or bundle can be found
-        :param: model_name:     Model name
-        :param: wait:           Indicates whether to wait or not until all applications are active
-        :param: timeout:        Time in seconds to wait until all applications are active
+        :param uri:            Path or Charm Store uri in which the charm or bundle can be found
+        :param model_name:     Model name
+        :param wait:           Indicates whether to wait or not until all applications are active
+        :param timeout:        Time in seconds to wait until all applications are active
+        :param instantiation_params: To be applied as overlay bundle over primary bundle.
         """
         controller = await self.get_controller()
         model = await self.get_model(controller, model_name)
         """
         controller = await self.get_controller()
         model = await self.get_model(controller, model_name)
+        overlays = []
         try:
         try:
-            await model.deploy(uri, trust=True)
+            await self._validate_instantiation_params(uri, model, instantiation_params)
+            overlays = self._get_overlays(model_name, instantiation_params)
+            await model.deploy(uri, trust=True, overlays=overlays)
             if wait:
                 await JujuModelWatcher.wait_for_model(model, timeout=timeout)
                 self.log.debug("All units active in model {}".format(model_name))
         finally:
             if wait:
                 await JujuModelWatcher.wait_for_model(model, timeout=timeout)
                 self.log.debug("All units active in model {}".format(model_name))
         finally:
+            self._remove_overlay_file(overlays)
             await self.disconnect_model(model)
             await self.disconnect_controller(controller)
 
             await self.disconnect_model(model)
             await self.disconnect_controller(controller)
 
+    async def _validate_instantiation_params(
+        self, uri: str, model, instantiation_params: dict
+    ) -> None:
+        """Checks if all the applications in instantiation_params
+        exist ins the original bundle.
+
+        Raises:
+            JujuApplicationNotFound if there is an invalid app in
+            the instantiation params.
+        """
+        overlay_apps = self._get_apps_in_instantiation_params(instantiation_params)
+        if not overlay_apps:
+            return
+        original_apps = await self._get_apps_in_original_bundle(uri, model)
+        if not all(app in original_apps for app in overlay_apps):
+            raise JujuApplicationNotFound(
+                "Cannot find application {} in original bundle {}".format(
+                    overlay_apps, original_apps
+                )
+            )
+
+    async def _get_apps_in_original_bundle(self, uri: str, model) -> set:
+        """Bundle is downloaded in BundleHandler.fetch_plan.
+        That method takes care of opening and exception handling.
+
+        Resolve method gets all the information regarding the channel,
+        track, revision, type, source.
+
+        Returns:
+            Set with the names of the applications in original bundle.
+        """
+        url = URL.parse(uri)
+        architecture = DEFAULT_ARCHITECTURE  # only AMD64 is allowed
+        res = await model.deploy_types[str(url.schema)].resolve(
+            url, architecture, entity_url=uri
+        )
+        handler = BundleHandler(model, trusted=True, forced=False)
+        await handler.fetch_plan(url, res.origin)
+        return handler.applications
+
+    def _get_apps_in_instantiation_params(self, instantiation_params: dict) -> list:
+        """Extract applications key in instantiation params.
+
+        Returns:
+            List with the names of the applications in instantiation params.
+
+        Raises:
+            JujuError if applications key is not found.
+        """
+        if not instantiation_params:
+            return []
+        try:
+            return [key for key in instantiation_params.get("applications")]
+        except Exception as e:
+            raise JujuError("Invalid overlay format. {}".format(str(e)))
+
+    def _get_overlays(self, model_name: str, instantiation_params: dict) -> list:
+        """Creates a temporary overlay file which includes the instantiation params.
+        Only one overlay file is created.
+
+        Returns:
+            List with one overlay filename. Empty list if there are no instantiation params.
+        """
+        if not instantiation_params:
+            return []
+        file_name = model_name + "-overlay.yaml"
+        self._write_overlay_file(file_name, instantiation_params)
+        return [file_name]
+
+    def _write_overlay_file(self, file_name: str, instantiation_params: dict) -> None:
+        with open(file_name, "w") as file:
+            yaml.dump(instantiation_params, file)
+
+    def _remove_overlay_file(self, overlay: list) -> None:
+        """Overlay contains either one or zero file names."""
+        if not overlay:
+            return
+        try:
+            filename = overlay[0]
+            os.remove(filename)
+        except OSError as e:
+            self.log.warning(
+                "Overlay file {} could not be removed: {}".format(filename, e)
+            )
+
     async def add_unit(
         self,
         application_name: str,
     async def add_unit(
         self,
         application_name: str,
@@ -594,7 +702,6 @@ class Libjuju:
             application = self._get_application(model, application_name)
 
             if application is not None:
             application = self._get_application(model, application_name)
 
             if application is not None:
-
                 # Checks if the given machine id in the model,
                 # otherwise function raises an error
                 _machine, _series = self._get_machine_info(model, machine_id)
                 # Checks if the given machine id in the model,
                 # otherwise function raises an error
                 _machine, _series = self._get_machine_info(model, machine_id)
@@ -749,7 +856,6 @@ class Libjuju:
 
         try:
             if application_name not in model.applications:
 
         try:
             if application_name not in model.applications:
-
                 if machine_id is not None:
                     machine, series = self._get_machine_info(model, machine_id)
 
                 if machine_id is not None:
                     machine, series = self._get_machine_info(model, machine_id)
 
@@ -791,12 +897,164 @@ class Libjuju:
                 raise JujuApplicationExists(
                     "Application {} exists".format(application_name)
                 )
                 raise JujuApplicationExists(
                     "Application {} exists".format(application_name)
                 )
+        except juju.errors.JujuError as e:
+            if "already exists" in e.message:
+                raise JujuApplicationExists(
+                    "Application {} exists".format(application_name)
+                )
+            else:
+                raise e
+        finally:
+            await self.disconnect_model(model)
+            await self.disconnect_controller(controller)
+
+        return application
+
+    async def upgrade_charm(
+        self,
+        application_name: str,
+        path: str,
+        model_name: str,
+        total_timeout: float = None,
+        **kwargs,
+    ):
+        """Upgrade Charm
+
+        :param: application_name:   Application name
+        :param: model_name:         Model name
+        :param: path:               Local path to the charm
+        :param: total_timeout:      Timeout for the entity to be active
+
+        :return: (str, str): (output and status)
+        """
+
+        self.log.debug(
+            "Upgrading charm {} in model {} from path {}".format(
+                application_name, model_name, path
+            )
+        )
+
+        await self.resolve_application(
+            model_name=model_name, application_name=application_name
+        )
+
+        # Get controller
+        controller = await self.get_controller()
+
+        # Get model
+        model = await self.get_model(controller, model_name)
+
+        try:
+            # Get application
+            application = self._get_application(
+                model,
+                application_name=application_name,
+            )
+            if application is None:
+                raise JujuApplicationNotFound(
+                    "Cannot find application {} to upgrade".format(application_name)
+                )
+
+            await application.refresh(path=path)
+
+            self.log.debug(
+                "Wait until charm upgrade is completed for application {} (model={})".format(
+                    application_name, model_name
+                )
+            )
+
+            await JujuModelWatcher.ensure_units_idle(
+                model=model, application=application
+            )
+
+            if application.status == "error":
+                error_message = "Unknown"
+                for unit in application.units:
+                    if (
+                        unit.workload_status == "error"
+                        and unit.workload_status_message != ""
+                    ):
+                        error_message = unit.workload_status_message
+
+                message = "Application {} failed update in {}: {}".format(
+                    application_name, model_name, error_message
+                )
+                self.log.error(message)
+                raise JujuError(message=message)
+
+            self.log.debug(
+                "Application {} is ready in model {}".format(
+                    application_name, model_name
+                )
+            )
+
         finally:
             await self.disconnect_model(model)
             await self.disconnect_controller(controller)
 
         return application
 
         finally:
             await self.disconnect_model(model)
             await self.disconnect_controller(controller)
 
         return application
 
+    async def resolve_application(self, model_name: str, application_name: str):
+        controller = await self.get_controller()
+        model = await self.get_model(controller, model_name)
+
+        try:
+            application = self._get_application(
+                model,
+                application_name=application_name,
+            )
+            if application is None:
+                raise JujuApplicationNotFound(
+                    "Cannot find application {} to resolve".format(application_name)
+                )
+
+            while application.status == "error":
+                for unit in application.units:
+                    if unit.workload_status == "error":
+                        self.log.debug(
+                            "Model {}, Application {}, Unit {} in error state, resolving".format(
+                                model_name, application_name, unit.entity_id
+                            )
+                        )
+                        try:
+                            await unit.resolved(retry=False)
+                        except Exception:
+                            pass
+
+                await asyncio.sleep(1)
+
+        finally:
+            await self.disconnect_model(model)
+            await self.disconnect_controller(controller)
+
+    async def resolve(self, model_name: str):
+        controller = await self.get_controller()
+        model = await self.get_model(controller, model_name)
+        all_units_active = False
+        try:
+            while not all_units_active:
+                all_units_active = True
+                for application_name, application in model.applications.items():
+                    if application.status == "error":
+                        for unit in application.units:
+                            if unit.workload_status == "error":
+                                self.log.debug(
+                                    "Model {}, Application {}, Unit {} in error state, resolving".format(
+                                        model_name, application_name, unit.entity_id
+                                    )
+                                )
+                                try:
+                                    await unit.resolved(retry=False)
+                                    all_units_active = False
+                                except Exception:
+                                    pass
+
+                if not all_units_active:
+                    await asyncio.sleep(5)
+        finally:
+            await self.disconnect_model(model)
+            await self.disconnect_controller(controller)
+
     async def scale_application(
         self,
         model_name: str,
     async def scale_application(
         self,
         model_name: str,
@@ -1109,10 +1367,10 @@ class Libjuju:
         try:
             await model.add_relation(endpoint_1, endpoint_2)
         except juju.errors.JujuAPIError as e:
         try:
             await model.add_relation(endpoint_1, endpoint_2)
         except juju.errors.JujuAPIError as e:
-            if "not found" in e.message:
+            if self._relation_is_not_found(e):
                 self.log.warning("Relation not found: {}".format(e.message))
                 return
                 self.log.warning("Relation not found: {}".format(e.message))
                 return
-            if "already exists" in e.message:
+            if self._relation_already_exist(e):
                 self.log.warning("Relation already exists: {}".format(e.message))
                 return
             # another exception, raise it
                 self.log.warning("Relation already exists: {}".format(e.message))
                 return
             # another exception, raise it
@@ -1121,28 +1379,83 @@ class Libjuju:
             await self.disconnect_model(model)
             await self.disconnect_controller(controller)
 
             await self.disconnect_model(model)
             await self.disconnect_controller(controller)
 
+    def _relation_is_not_found(self, juju_error):
+        text = "not found"
+        return (text in juju_error.message) or (
+            juju_error.error_code and text in juju_error.error_code
+        )
+
+    def _relation_already_exist(self, juju_error):
+        text = "already exists"
+        return (text in juju_error.message) or (
+            juju_error.error_code and text in juju_error.error_code
+        )
+
+    async def offer(self, endpoint: RelationEndpoint) -> Offer:
+        """
+        Create an offer from a RelationEndpoint
+
+        :param: endpoint: Relation endpoint
+
+        :return: Offer object
+        """
+        model_name = endpoint.model_name
+        offer_name = f"{endpoint.application_name}-{endpoint.endpoint_name}"
+        controller = await self.get_controller()
+        model = None
+        try:
+            model = await self.get_model(controller, model_name)
+            await model.create_offer(endpoint.endpoint, offer_name=offer_name)
+            offer_list = await self._list_offers(model_name, offer_name=offer_name)
+            if offer_list:
+                return Offer(offer_list[0].offer_url)
+            else:
+                raise Exception("offer was not created")
+        except juju.errors.JujuError as e:
+            if "application offer already exists" not in e.message:
+                raise e
+        finally:
+            if model:
+                self.disconnect_model(model)
+            self.disconnect_controller(controller)
+
     async def consume(
         self,
     async def consume(
         self,
-        offer_url: str,
         model_name: str,
         model_name: str,
-    ):
+        offer: Offer,
+        provider_libjuju: "Libjuju",
+    ) -> str:
         """
         """
-        Adds a remote offer to the model. Relations can be created later using "juju relate".
+        Consumes a remote offer in the model. Relations can be created later using "juju relate".
 
 
-        :param: offer_url:      Offer Url
-        :param: model_name:     Model name
+        :param: model_name:             Model name
+        :param: offer:                  Offer object to consume
+        :param: provider_libjuju:       Libjuju object of the provider endpoint
 
         :raises ParseError if there's a problem parsing the offer_url
         :raises JujuError if remote offer includes and endpoint
         :raises JujuAPIError if the operation is not successful
 
         :raises ParseError if there's a problem parsing the offer_url
         :raises JujuError if remote offer includes and endpoint
         :raises JujuAPIError if the operation is not successful
+
+        :returns: Saas name. It is the application name in the model that reference the remote application.
         """
         """
+        saas_name = f'{offer.name}-{offer.model_name.replace("-", "")}'
+        if offer.vca_id:
+            saas_name = f"{saas_name}-{offer.vca_id}"
         controller = await self.get_controller()
         controller = await self.get_controller()
-        model = await controller.get_model(model_name)
-
+        model = None
+        provider_controller = None
         try:
         try:
-            await model.consume(offer_url)
+            model = await controller.get_model(model_name)
+            provider_controller = await provider_libjuju.get_controller()
+            await model.consume(
+                offer.url, application_alias=saas_name, controller=provider_controller
+            )
+            return saas_name
         finally:
         finally:
-            await self.disconnect_model(model)
+            if model:
+                await self.disconnect_model(model)
+            if provider_controller:
+                await provider_libjuju.disconnect_controller(provider_controller)
             await self.disconnect_controller(controller)
 
     async def destroy_model(self, model_name: str, total_timeout: float = 1800):
             await self.disconnect_controller(controller)
 
     async def destroy_model(self, model_name: str, total_timeout: float = 1800):
@@ -1157,28 +1470,38 @@ class Libjuju:
         model = None
         try:
             if not await self.model_exists(model_name, controller=controller):
         model = None
         try:
             if not await self.model_exists(model_name, controller=controller):
+                self.log.warn(f"Model {model_name} doesn't exist")
                 return
 
                 return
 
-            self.log.debug("Destroying model {}".format(model_name))
-
+            self.log.debug(f"Getting model {model_name} to be destroyed")
             model = await self.get_model(controller, model_name)
             model = await self.get_model(controller, model_name)
+            self.log.debug(f"Destroying manual machines in model {model_name}")
             # Destroy machines that are manually provisioned
             # and still are in pending state
             await self._destroy_pending_machines(model, only_manual=True)
             await self.disconnect_model(model)
 
             # Destroy machines that are manually provisioned
             # and still are in pending state
             await self._destroy_pending_machines(model, only_manual=True)
             await self.disconnect_model(model)
 
-            await self._destroy_model(
-                model_name,
-                controller,
+            await asyncio.wait_for(
+                self._destroy_model(model_name, controller),
                 timeout=total_timeout,
             )
                 timeout=total_timeout,
             )
+        except Exception as e:
+            if not await self.model_exists(model_name, controller=controller):
+                self.log.warn(
+                    f"Failed deleting model {model_name}: model doesn't exist"
+                )
+                return
+            self.log.warn(f"Failed deleting model {model_name}: {e}")
+            raise e
         finally:
             if model:
                 await self.disconnect_model(model)
             await self.disconnect_controller(controller)
 
     async def _destroy_model(
         finally:
             if model:
                 await self.disconnect_model(model)
             await self.disconnect_controller(controller)
 
     async def _destroy_model(
-        self, model_name: str, controller: Controller, timeout: float = 1800
+        self,
+        model_name: str,
+        controller: Controller,
     ):
         """
         Destroy model from controller
     ):
         """
         Destroy model from controller
@@ -1187,22 +1510,42 @@ class Libjuju:
         :param: controller: Controller object
         :param: timeout: Timeout in seconds
         """
         :param: controller: Controller object
         :param: timeout: Timeout in seconds
         """
+        self.log.debug(f"Destroying model {model_name}")
 
 
-        async def _destroy_model_loop(model_name: str, controller: Controller):
-            while await self.model_exists(model_name, controller=controller):
+        async def _destroy_model_gracefully(model_name: str, controller: Controller):
+            self.log.info(f"Gracefully deleting model {model_name}")
+            resolved = False
+            while model_name in await controller.list_models():
+                if not resolved:
+                    await self.resolve(model_name)
+                    resolved = True
+                await controller.destroy_model(model_name, destroy_storage=True)
+
+                await asyncio.sleep(5)
+            self.log.info(f"Model {model_name} deleted gracefully")
+
+        async def _destroy_model_forcefully(model_name: str, controller: Controller):
+            self.log.info(f"Forcefully deleting model {model_name}")
+            while model_name in await controller.list_models():
                 await controller.destroy_model(
                 await controller.destroy_model(
-                    model_name, destroy_storage=True, force=True, max_wait=0
+                    model_name, destroy_storage=True, force=True, max_wait=60
                 )
                 await asyncio.sleep(5)
                 )
                 await asyncio.sleep(5)
+            self.log.info(f"Model {model_name} deleted forcefully")
 
         try:
 
         try:
-            await asyncio.wait_for(
-                _destroy_model_loop(model_name, controller), timeout=timeout
-            )
-        except asyncio.TimeoutError:
-            raise Exception(
-                "Timeout waiting for model {} to be destroyed".format(model_name)
-            )
+            try:
+                await asyncio.wait_for(
+                    _destroy_model_gracefully(model_name, controller), timeout=120
+                )
+            except asyncio.TimeoutError:
+                await _destroy_model_forcefully(model_name, controller)
+        except juju.errors.JujuError as e:
+            if any("has been removed" in error for error in e.errors):
+                return
+            if any("model not found" in error for error in e.errors):
+                return
+            raise e
 
     async def destroy_application(
         self, model_name: str, application_name: str, total_timeout: float
 
     async def destroy_application(
         self, model_name: str, application_name: str, total_timeout: float
@@ -1301,10 +1644,6 @@ class Libjuju:
                     await self.disconnect_model(model)
                 await self.disconnect_controller(controller)
 
                     await self.disconnect_model(model)
                 await self.disconnect_controller(controller)
 
-    def handle_exception(self, loop, context):
-        # All unhandled exceptions by libjuju are handled here.
-        pass
-
     async def health_check(self, interval: float = 300.0):
         """
         Health check to make sure controller and controller_model connections are OK
     async def health_check(self, interval: float = 300.0):
         """
         Health check to make sure controller and controller_model connections are OK
@@ -1339,17 +1678,29 @@ class Libjuju:
         finally:
             await self.disconnect_controller(controller)
 
         finally:
             await self.disconnect_controller(controller)
 
-    async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
-        """List models with certain names
+    async def _list_offers(
+        self, model_name: str, offer_name: str = None
+    ) -> QueryApplicationOffersResults:
+        """
+        List offers within a model
 
         :param: model_name: Model name
 
         :param: model_name: Model name
+        :param: offer_name: Offer name to filter.
 
 
-        :return:            Returns list of offers
+        :return: Returns application offers results in the model
         """
 
         controller = await self.get_controller()
         try:
         """
 
         controller = await self.get_controller()
         try:
-            return await controller.list_offers(model_name)
+            offers = (await controller.list_offers(model_name)).results
+            if offer_name:
+                matching_offer = []
+                for offer in offers:
+                    if offer.offer_name == offer_name:
+                        matching_offer.append(offer)
+                        break
+                offers = matching_offer
+            return offers
         finally:
             await self.disconnect_controller(controller)
 
         finally:
             await self.disconnect_controller(controller)
 
@@ -1497,7 +1848,9 @@ class Libjuju:
         finally:
             await self.disconnect_controller(controller)
 
         finally:
             await self.disconnect_controller(controller)
 
-    @retry(attempts=20, delay=5, fallback=JujuLeaderUnitNotFound())
+    @retry(
+        attempts=20, delay=5, fallback=JujuLeaderUnitNotFound(), callback=retry_callback
+    )
     async def _get_leader_unit(self, application: Application) -> Unit:
         unit = None
         for u in application.units:
     async def _get_leader_unit(self, application: Application) -> Unit:
         unit = None
         for u in application.units: