Fix bug 1542 to allow juju to add Azure AKS
[osm/N2VC.git] / n2vc / libjuju.py
index d8b0858..ce1c9df 100644 (file)
 
 import asyncio
 import logging
-from juju.controller import Controller
-from juju.client import client
+import typing
+
 import time
 
 from juju.errors import JujuAPIError
 from juju.model import Model
 from juju.machine import Machine
 from juju.application import Application
-from juju.client._definitions import FullStatus, QueryApplicationOffersResults
+from juju.unit import Unit
+from juju.client._definitions import (
+    FullStatus,
+    QueryApplicationOffersResults,
+    Cloud,
+    CloudCredential,
+)
+from juju.controller import Controller
+from juju.client import client
+from juju import tag
+
 from n2vc.juju_watcher import JujuModelWatcher
 from n2vc.provisioner import AsyncSSHProvisioner
 from n2vc.n2vc_conn import N2VCConnector
@@ -31,74 +41,52 @@ from n2vc.exceptions import (
     JujuApplicationNotFound,
     JujuLeaderUnitNotFound,
     JujuActionNotFound,
-    JujuModelAlreadyExists,
     JujuControllerFailedConnecting,
     JujuApplicationExists,
+    JujuInvalidK8sConfiguration,
+    JujuError,
 )
-from n2vc.utils import DB_DATA
-from osm_common.dbbase import DbException
+from n2vc.vca.cloud import Cloud as VcaCloud
+from n2vc.vca.connection import Connection
+from kubernetes.client.configuration import Configuration
+from retrying_async import retry
+
+
+RBAC_LABEL_KEY_NAME = "rbac-id"
 
 
 class Libjuju:
     def __init__(
         self,
-        endpoint: str,
-        api_proxy: str,
-        username: str,
-        password: str,
-        cacert: str,
+        vca_connection: Connection,
         loop: asyncio.AbstractEventLoop = None,
         log: logging.Logger = None,
-        db: dict = None,
         n2vc: N2VCConnector = None,
-        apt_mirror: str = None,
-        enable_os_upgrade: bool = True,
     ):
         """
         Constructor
 
-        :param: endpoint:               Endpoint of the juju controller (host:port)
-        :param: api_proxy:              Endpoint of the juju controller - Reachable from the VNFs
-        :param: username:               Juju username
-        :param: password:               Juju password
-        :param: cacert:                 Juju CA Certificate
+        :param: vca_connection:         n2vc.vca.connection object
         :param: loop:                   Asyncio loop
         :param: log:                    Logger
-        :param: db:                     DB object
         :param: n2vc:                   N2VC object
-        :param: apt_mirror:             APT Mirror
-        :param: enable_os_upgrade:      Enable OS Upgrade
         """
 
         self.log = log or logging.getLogger("Libjuju")
-        self.db = db
-        db_endpoints = self._get_api_endpoints_db()
-        self.endpoints = db_endpoints or [endpoint]
-        if db_endpoints is None:
-            self._update_api_endpoints_db(self.endpoints)
-        self.api_proxy = api_proxy
-        self.username = username
-        self.password = password
-        self.cacert = cacert
-        self.loop = loop or asyncio.get_event_loop()
         self.n2vc = n2vc
+        self.vca_connection = vca_connection
 
-        # Generate config for models
-        self.model_config = {}
-        if apt_mirror:
-            self.model_config["apt-mirror"] = apt_mirror
-        self.model_config["enable-os-refresh-update"] = enable_os_upgrade
-        self.model_config["enable-os-upgrade"] = enable_os_upgrade
-
+        self.loop = loop or asyncio.get_event_loop()
         self.loop.set_exception_handler(self.handle_exception)
         self.creating_model = asyncio.Lock(loop=self.loop)
 
-        self.models = set()
-        self.log.debug("Libjuju initialized!")
+        if self.vca_connection.is_default:
+            self.health_check_task = self._create_health_check_task()
 
-        self.health_check_task = self.loop.create_task(self.health_check())
+    def _create_health_check_task(self):
+        return self.loop.create_task(self.health_check())
 
-    async def get_controller(self, timeout: float = 5.0) -> Controller:
+    async def get_controller(self, timeout: float = 60.0) -> Controller:
         """
         Get controller
 
@@ -109,23 +97,27 @@ class Libjuju:
             controller = Controller(loop=self.loop)
             await asyncio.wait_for(
                 controller.connect(
-                    endpoint=self.endpoints,
-                    username=self.username,
-                    password=self.password,
-                    cacert=self.cacert,
+                    endpoint=self.vca_connection.data.endpoints,
+                    username=self.vca_connection.data.user,
+                    password=self.vca_connection.data.secret,
+                    cacert=self.vca_connection.data.cacert,
                 ),
                 timeout=timeout,
             )
-            endpoints = await controller.api_endpoints
-            if self.endpoints != endpoints:
-                self.endpoints = endpoints
-                self._update_api_endpoints_db(self.endpoints)
+            if self.vca_connection.is_default:
+                endpoints = await controller.api_endpoints
+                if not all(
+                    endpoint in self.vca_connection.endpoints for endpoint in endpoints
+                ):
+                    await self.vca_connection.update_endpoints(endpoints)
             return controller
         except asyncio.CancelledError as e:
             raise e
         except Exception as e:
             self.log.error(
-                "Failed connecting to controller: {}...".format(self.endpoints)
+                "Failed connecting to controller: {}... {}".format(
+                    self.vca_connection.data.endpoints, e
+                )
             )
             if controller:
                 await self.disconnect_controller(controller)
@@ -151,52 +143,126 @@ class Libjuju:
 
         :param: controller: Controller that will be disconnected
         """
-        await controller.disconnect()
+        if controller:
+            await controller.disconnect()
 
-    async def add_model(self, model_name: str, cloud_name: str):
+    @retry(attempts=3, delay=5, timeout=None)
+    async def add_model(self, model_name: str, cloud: VcaCloud):
         """
         Create model
 
         :param: model_name: Model name
-        :param: cloud_name: Cloud name
+        :param: cloud: Cloud object
         """
 
         # Get controller
         controller = await self.get_controller()
         model = None
         try:
-            # Raise exception if model already exists
-            if await self.model_exists(model_name, controller=controller):
-                raise JujuModelAlreadyExists(
-                    "Model {} already exists.".format(model_name)
-                )
-
             # Block until other workers have finished model creation
             while self.creating_model.locked():
                 await asyncio.sleep(0.1)
 
-            # If the model exists, return it from the controller
-            if model_name in self.models:
-                return
-
             # Create the model
             async with self.creating_model:
+                if await self.model_exists(model_name, controller=controller):
+                    return
                 self.log.debug("Creating model {}".format(model_name))
                 model = await controller.add_model(
                     model_name,
-                    config=self.model_config,
-                    cloud_name=cloud_name,
-                    credential_name=cloud_name,
+                    config=self.vca_connection.data.model_config,
+                    cloud_name=cloud.name,
+                    credential_name=cloud.credential_name,
                 )
-                self.models.add(model_name)
+        except JujuAPIError as e:
+            if "already exists" in e.message:
+                pass
+            else:
+                raise e
         finally:
             if model:
                 await self.disconnect_model(model)
             await self.disconnect_controller(controller)
 
-    async def get_model(
-        self, controller: Controller, model_name: str, id=None
-    ) -> Model:
+    async def get_executed_actions(self, model_name: str) -> list:
+        """
+        Get executed/history of actions for a model.
+
+        :param: model_name: Model name, str.
+        :return: List of executed actions for a model.
+        """
+        model = None
+        executed_actions = []
+        controller = await self.get_controller()
+        try:
+            model = await self.get_model(controller, model_name)
+            # Get all unique action names
+            actions = {}
+            for application in model.applications:
+                application_actions = await self.get_actions(application, model_name)
+                actions.update(application_actions)
+            # Get status of all actions
+            for application_action in actions:
+                app_action_status_list = await model.get_action_status(
+                    name=application_action
+                )
+                for action_id, action_status in app_action_status_list.items():
+                    executed_action = {
+                        "id": action_id,
+                        "action": application_action,
+                        "status": action_status,
+                    }
+                    # Get action output by id
+                    action_status = await model.get_action_output(executed_action["id"])
+                    for k, v in action_status.items():
+                        executed_action[k] = v
+                    executed_actions.append(executed_action)
+        except Exception as e:
+            raise JujuError(
+                "Error in getting executed actions for model: {}. Error: {}".format(
+                    model_name, str(e)
+                )
+            )
+        finally:
+            if model:
+                await self.disconnect_model(model)
+            await self.disconnect_controller(controller)
+        return executed_actions
+
+    async def get_application_configs(
+        self, model_name: str, application_name: str
+    ) -> dict:
+        """
+        Get available configs for an application.
+
+        :param: model_name: Model name, str.
+        :param: application_name: Application name, str.
+
+        :return: A dict which has key - action name, value - action description
+        """
+        model = None
+        application_configs = {}
+        controller = await self.get_controller()
+        try:
+            model = await self.get_model(controller, model_name)
+            application = self._get_application(
+                model, application_name=application_name
+            )
+            application_configs = await application.get_config()
+        except Exception as e:
+            raise JujuError(
+                "Error in getting configs for application: {} in model: {}. Error: {}".format(
+                    application_name, model_name, str(e)
+                )
+            )
+        finally:
+            if model:
+                await self.disconnect_model(model)
+            await self.disconnect_controller(controller)
+        return application_configs
+
+    @retry(attempts=3, delay=5)
+    async def get_model(self, controller: Controller, model_name: str) -> Model:
         """
         Get model from controller
 
@@ -242,7 +308,9 @@ class Libjuju:
         """
         if not model_names:
             raise Exception(
-                "model_names must be a non-empty array. Given value: {}".format(model_names)
+                "model_names must be a non-empty array. Given value: {}".format(
+                    model_names
+                )
             )
         non_existing_models = []
         models = await self.list_models()
@@ -349,6 +417,7 @@ class Libjuju:
                         total_timeout=total_timeout,
                         db_dict=db_dict,
                         n2vc=self.n2vc,
+                        vca_id=self.vca_connection._vca_id,
                     )
         finally:
             await self.disconnect_model(model)
@@ -430,7 +499,8 @@ class Libjuju:
                     connection=connection,
                     nonce=params.nonce,
                     machine_id=machine_id,
-                    proxy=self.api_proxy,
+                    proxy=self.vca_connection.data.api_proxy,
+                    series=params.series,
                 )
             )
 
@@ -460,6 +530,7 @@ class Libjuju:
                 total_timeout=total_timeout,
                 db_dict=db_dict,
                 n2vc=self.n2vc,
+                vca_id=self.vca_connection._vca_id,
             )
         except Exception as e:
             raise e
@@ -473,6 +544,28 @@ class Libjuju:
 
         return machine_id
 
+    async def deploy(
+        self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
+    ):
+        """
+        Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
+
+        :param: uri:            Path or Charm Store uri in which the charm or bundle can be found
+        :param: model_name:     Model name
+        :param: wait:           Indicates whether to wait or not until all applications are active
+        :param: timeout:        Time in seconds to wait until all applications are active
+        """
+        controller = await self.get_controller()
+        model = await self.get_model(controller, model_name)
+        try:
+            await model.deploy(uri)
+            if wait:
+                await JujuModelWatcher.wait_for_model(model, timeout=timeout)
+                self.log.debug("All units active in model {}".format(model_name))
+        finally:
+            await self.disconnect_model(model)
+            await self.disconnect_controller(controller)
+
     async def deploy_charm(
         self,
         application_name: str,
@@ -553,6 +646,7 @@ class Libjuju:
                     total_timeout=total_timeout,
                     db_dict=db_dict,
                     n2vc=self.n2vc,
+                    vca_id=self.vca_connection._vca_id,
                 )
                 self.log.debug(
                     "Application {} is ready in model {}".format(
@@ -569,6 +663,83 @@ class Libjuju:
 
         return application
 
+    async def scale_application(
+        self,
+        model_name: str,
+        application_name: str,
+        scale: int = 1,
+        total_timeout: float = None,
+    ):
+        """
+        Scale application (K8s)
+
+        :param: model_name:         Model name
+        :param: application_name:   Application name
+        :param: scale:              Scale to which to set this application
+        :param: total_timeout:      Timeout for the entity to be active
+        """
+
+        model = None
+        controller = await self.get_controller()
+        try:
+            model = await self.get_model(controller, model_name)
+
+            self.log.debug(
+                "Scaling application {} in model {}".format(
+                    application_name, model_name
+                )
+            )
+            application = self._get_application(model, application_name)
+            if application is None:
+                raise JujuApplicationNotFound("Cannot scale application")
+            await application.scale(scale=scale)
+            # Wait until application is scaled in model
+            self.log.debug(
+                "Waiting for application {} to be scaled in model {}...".format(
+                    application_name, model_name
+                )
+            )
+            if total_timeout is None:
+                total_timeout = 1800
+            end = time.time() + total_timeout
+            while time.time() < end:
+                application_scale = self._get_application_count(model, application_name)
+                # Before calling wait_for_model function,
+                # wait until application unit count and scale count are equal.
+                # Because there is a delay before scaling triggers in Juju model.
+                if application_scale == scale:
+                    await JujuModelWatcher.wait_for_model(
+                        model=model, timeout=total_timeout
+                    )
+                    self.log.debug(
+                        "Application {} is scaled in model {}".format(
+                            application_name, model_name
+                        )
+                    )
+                    return
+                await asyncio.sleep(5)
+            raise Exception(
+                "Timeout waiting for application {} in model {} to be scaled".format(
+                    application_name, model_name
+                )
+            )
+        finally:
+            if model:
+                await self.disconnect_model(model)
+            await self.disconnect_controller(controller)
+
+    def _get_application_count(self, model: Model, application_name: str) -> int:
+        """Get number of units of the application
+
+        :param: model:              Model object
+        :param: application_name:   Application name
+
+        :return: int (or None if application doesn't exist)
+        """
+        application = self._get_application(model, application_name)
+        if application is not None:
+            return len(application.units)
+
     def _get_application(self, model: Model, application_name: str) -> Application:
         """Get application
 
@@ -613,18 +784,19 @@ class Libjuju:
         try:
             # Get application
             application = self._get_application(
-                model, application_name=application_name,
+                model,
+                application_name=application_name,
             )
             if application is None:
                 raise JujuApplicationNotFound("Cannot execute action")
 
-            # Get unit
-            unit = None
-            for u in application.units:
-                if await u.is_leader_from_status():
-                    unit = u
-            if unit is None:
-                raise JujuLeaderUnitNotFound("Cannot execute action: leader unit not found")
+            # Get leader unit
+            # Racing condition:
+            #   Ocassionally, self._get_leader_unit() will return None
+            #   because the leader elected hook has not been triggered yet.
+            #   Therefore, we are doing some retries. If it happens again,
+            #   re-open bug 1236
+            unit = await self._get_leader_unit(application)
 
             actions = await application.get_actions()
 
@@ -647,6 +819,7 @@ class Libjuju:
                 total_timeout=total_timeout,
                 db_dict=db_dict,
                 n2vc=self.n2vc,
+                vca_id=self.vca_connection._vca_id,
             )
 
             output = await model.get_action_output(action_uuid=action.entity_id)
@@ -691,7 +864,8 @@ class Libjuju:
         try:
             # Get application
             application = self._get_application(
-                model, application_name=application_name,
+                model,
+                application_name=application_name,
             )
 
             # Return list of actions
@@ -702,24 +876,41 @@ class Libjuju:
             await self.disconnect_model(model)
             await self.disconnect_controller(controller)
 
+    async def get_metrics(self, model_name: str, application_name: str) -> dict:
+        """Get the metrics collected by the VCA.
+
+        :param model_name The name or unique id of the network service
+        :param application_name The name of the application
+        """
+        if not model_name or not application_name:
+            raise Exception("model_name and application_name must be non-empty strings")
+        metrics = {}
+        controller = await self.get_controller()
+        model = await self.get_model(controller, model_name)
+        try:
+            application = self._get_application(model, application_name)
+            if application is not None:
+                metrics = await application.get_metrics()
+        finally:
+            self.disconnect_model(model)
+            self.disconnect_controller(controller)
+        return metrics
+
     async def add_relation(
         self,
         model_name: str,
-        application_name_1: str,
-        application_name_2: str,
-        relation_1: str,
-        relation_2: str,
+        endpoint_1: str,
+        endpoint_2: str,
     ):
         """Add relation
 
-        :param: model_name:             Model name
-        :param: application_name_1      First application name
-        :param: application_name_2:     Second application name
-        :param: relation_1:             First relation name
-        :param: relation_2:             Second relation name
+        :param: model_name:     Model name
+        :param: endpoint_1      First endpoint name
+                                ("app:endpoint" format or directly the saas name)
+        :param: endpoint_2:     Second endpoint name (^ same format)
         """
 
-        self.log.debug("Adding relation: {} -> {}".format(relation_1, relation_2))
+        self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))
 
         # Get controller
         controller = await self.get_controller()
@@ -727,13 +918,9 @@ class Libjuju:
         # Get model
         model = await self.get_model(controller, model_name)
 
-        # Build relation strings
-        r1 = "{}:{}".format(application_name_1, relation_1)
-        r2 = "{}:{}".format(application_name_2, relation_2)
-
         # Add relation
         try:
-            await model.add_relation(relation1=r1, relation2=r2)
+            await model.add_relation(endpoint_1, endpoint_2)
         except JujuAPIError as e:
             if "not found" in e.message:
                 self.log.warning("Relation not found: {}".format(e.message))
@@ -747,6 +934,30 @@ class Libjuju:
             await self.disconnect_model(model)
             await self.disconnect_controller(controller)
 
+    async def consume(
+        self,
+        offer_url: str,
+        model_name: str,
+    ):
+        """
+        Adds a remote offer to the model. Relations can be created later using "juju relate".
+
+        :param: offer_url:      Offer Url
+        :param: model_name:     Model name
+
+        :raises ParseError if there's a problem parsing the offer_url
+        :raises JujuError if remote offer includes and endpoint
+        :raises JujuAPIError if the operation is not successful
+        """
+        controller = await self.get_controller()
+        model = await controller.get_model(model_name)
+
+        try:
+            await model.consume(offer_url)
+        finally:
+            await self.disconnect_model(model)
+            await self.disconnect_controller(controller)
+
     async def destroy_model(self, model_name: str, total_timeout: float):
         """
         Destroy model
@@ -756,106 +967,118 @@ class Libjuju:
         """
 
         controller = await self.get_controller()
-        model = await self.get_model(controller, model_name)
+        model = None
         try:
+            if not await self.model_exists(model_name, controller=controller):
+                return
+
+            model = await self.get_model(controller, model_name)
             self.log.debug("Destroying model {}".format(model_name))
             uuid = model.info.uuid
 
-            # Destroy machines
-            machines = await model.get_machines()
-            for machine_id in machines:
-                try:
-                    await self.destroy_machine(
-                        model, machine_id=machine_id, total_timeout=total_timeout,
-                    )
-                except asyncio.CancelledError:
-                    raise
-                except Exception:
-                    pass
+            # Destroy machines that are manually provisioned
+            # and still are in pending state
+            await self._destroy_pending_machines(model, only_manual=True)
 
             # Disconnect model
             await self.disconnect_model(model)
 
-            # Destroy model
-            if model_name in self.models:
-                self.models.remove(model_name)
-
-            await controller.destroy_model(uuid)
+            await controller.destroy_model(uuid, force=True, max_wait=0)
 
             # Wait until model is destroyed
             self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
-            last_exception = ""
 
             if total_timeout is None:
                 total_timeout = 3600
             end = time.time() + total_timeout
             while time.time() < end:
-                try:
-                    models = await controller.list_models()
-                    if model_name not in models:
-                        self.log.debug(
-                            "The model {} ({}) was destroyed".format(model_name, uuid)
-                        )
-                        return
-                except asyncio.CancelledError:
-                    raise
-                except Exception as e:
-                    last_exception = e
+                models = await controller.list_models()
+                if model_name not in models:
+                    self.log.debug(
+                        "The model {} ({}) was destroyed".format(model_name, uuid)
+                    )
+                    return
                 await asyncio.sleep(5)
             raise Exception(
-                "Timeout waiting for model {} to be destroyed {}".format(
-                    model_name, last_exception
-                )
+                "Timeout waiting for model {} to be destroyed".format(model_name)
             )
+        except Exception as e:
+            if model:
+                await self.disconnect_model(model)
+            raise e
         finally:
             await self.disconnect_controller(controller)
 
-    async def destroy_application(self, model: Model, application_name: str):
+    async def destroy_application(
+        self, model_name: str, application_name: str, total_timeout: float
+    ):
         """
         Destroy application
 
-        :param: model:              Model object
+        :param: model_name:         Model name
         :param: application_name:   Application name
+        :param: total_timeout:      Timeout
         """
-        self.log.debug(
-            "Destroying application {} in model {}".format(
-                application_name, model.info.name
+
+        controller = await self.get_controller()
+        model = None
+
+        try:
+            model = await self.get_model(controller, model_name)
+            self.log.debug(
+                "Destroying application {} in model {}".format(
+                    application_name, model_name
+                )
             )
-        )
-        application = model.applications.get(application_name)
-        if application:
-            await application.destroy()
-        else:
-            self.log.warning("Application not found: {}".format(application_name))
+            application = self._get_application(model, application_name)
+            if application:
+                await application.destroy()
+            else:
+                self.log.warning("Application not found: {}".format(application_name))
 
-    async def destroy_machine(
-        self, model: Model, machine_id: str, total_timeout: float = 3600
-    ):
-        """
-        Destroy machine
+            self.log.debug(
+                "Waiting for application {} to be destroyed in model {}...".format(
+                    application_name, model_name
+                )
+            )
+            if total_timeout is None:
+                total_timeout = 3600
+            end = time.time() + total_timeout
+            while time.time() < end:
+                if not self._get_application(model, application_name):
+                    self.log.debug(
+                        "The application {} was destroyed in model {} ".format(
+                            application_name, model_name
+                        )
+                    )
+                    return
+                await asyncio.sleep(5)
+            raise Exception(
+                "Timeout waiting for application {} to be destroyed in model {}".format(
+                    application_name, model_name
+                )
+            )
+        finally:
+            if model is not None:
+                await self.disconnect_model(model)
+            await self.disconnect_controller(controller)
 
-        :param: model:          Model object
-        :param: machine_id:     Machine id
-        :param: total_timeout:  Timeout in seconds
+    async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
         """
-        machines = await model.get_machines()
-        if machine_id in machines:
-            machine = machines[machine_id]
-            await machine.destroy(force=True)
-            # max timeout
-            end = time.time() + total_timeout
+        Destroy pending machines in a given model
 
-            # wait for machine removal
-            machines = await model.get_machines()
-            while machine_id in machines and time.time() < end:
-                self.log.debug(
-                    "Waiting for machine {} is destroyed".format(machine_id)
-                )
-                await asyncio.sleep(0.5)
-                machines = await model.get_machines()
-            self.log.debug("Machine destroyed: {}".format(machine_id))
-        else:
-            self.log.debug("Machine not found: {}".format(machine_id))
+        :param: only_manual:    Bool that indicates only manually provisioned
+                                machines should be destroyed (if True), or that
+                                all pending machines should be destroyed
+        """
+        status = await model.get_status()
+        for machine_id in status.machines:
+            machine_status = status.machines[machine_id]
+            if machine_status.agent_status.status == "pending":
+                if only_manual and not machine_status.instance_id.startswith("manual:"):
+                    break
+                machine = model.machines[machine_id]
+                await machine.destroy(force=True)
 
     async def configure_application(
         self, model_name: str, application_name: str, config: dict = None
@@ -869,67 +1092,20 @@ class Libjuju:
         self.log.debug("Configuring application {}".format(application_name))
 
         if config:
+            controller = await self.get_controller()
+            model = None
             try:
-                controller = await self.get_controller()
                 model = await self.get_model(controller, model_name)
                 application = self._get_application(
-                    model, application_name=application_name,
+                    model,
+                    application_name=application_name,
                 )
                 await application.set_config(config)
             finally:
-                await self.disconnect_model(model)
+                if model:
+                    await self.disconnect_model(model)
                 await self.disconnect_controller(controller)
 
-    def _get_api_endpoints_db(self) -> [str]:
-        """
-        Get API Endpoints from DB
-
-        :return: List of API endpoints
-        """
-        self.log.debug("Getting endpoints from database")
-
-        juju_info = self.db.get_one(
-            DB_DATA.api_endpoints.table,
-            q_filter=DB_DATA.api_endpoints.filter,
-            fail_on_empty=False,
-        )
-        if juju_info and DB_DATA.api_endpoints.key in juju_info:
-            return juju_info[DB_DATA.api_endpoints.key]
-
-    def _update_api_endpoints_db(self, endpoints: [str]):
-        """
-        Update API endpoints in Database
-
-        :param: List of endpoints
-        """
-        self.log.debug("Saving endpoints {} in database".format(endpoints))
-
-        juju_info = self.db.get_one(
-            DB_DATA.api_endpoints.table,
-            q_filter=DB_DATA.api_endpoints.filter,
-            fail_on_empty=False,
-        )
-        # If it doesn't, then create it
-        if not juju_info:
-            try:
-                self.db.create(
-                    DB_DATA.api_endpoints.table, DB_DATA.api_endpoints.filter,
-                )
-            except DbException as e:
-                # Racing condition: check if another N2VC worker has created it
-                juju_info = self.db.get_one(
-                    DB_DATA.api_endpoints.table,
-                    q_filter=DB_DATA.api_endpoints.filter,
-                    fail_on_empty=False,
-                )
-                if not juju_info:
-                    raise e
-        self.db.set_one(
-            DB_DATA.api_endpoints.table,
-            DB_DATA.api_endpoints.filter,
-            {DB_DATA.api_endpoints.key: endpoints},
-        )
-
     def handle_exception(self, loop, context):
         # All unhandled exceptions by libjuju are handled here.
         pass
@@ -940,6 +1116,7 @@ class Libjuju:
 
         :param: interval: Time in seconds between checks
         """
+        controller = None
         while True:
             try:
                 controller = await self.get_controller()
@@ -980,3 +1157,173 @@ class Libjuju:
             return await controller.list_offers(model_name)
         finally:
             await self.disconnect_controller(controller)
+
+    async def add_k8s(
+        self,
+        name: str,
+        rbac_id: str,
+        token: str,
+        client_cert_data: str,
+        configuration: Configuration,
+        storage_class: str,
+        credential_name: str = None,
+    ):
+        """
+        Add a Kubernetes cloud to the controller
+
+        Similar to the `juju add-k8s` command in the CLI
+
+        :param: name:               Name for the K8s cloud
+        :param: configuration:      Kubernetes configuration object
+        :param: storage_class:      Storage Class to use in the cloud
+        :param: credential_name:    Storage Class to use in the cloud
+        """
+
+        if not storage_class:
+            raise Exception("storage_class must be a non-empty string")
+        if not name:
+            raise Exception("name must be a non-empty string")
+        if not configuration:
+            raise Exception("configuration must be provided")
+
+        endpoint = configuration.host
+        credential = self.get_k8s_cloud_credential(
+            configuration,
+            client_cert_data,
+            token,
+        )
+        credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id
+        cloud = client.Cloud(
+            type_="kubernetes",
+            auth_types=[credential.auth_type],
+            endpoint=endpoint,
+            ca_certificates=[client_cert_data],
+            config={
+                "operator-storage": storage_class,
+                "workload-storage": storage_class,
+            },
+        )
+
+        return await self.add_cloud(
+            name, cloud, credential, credential_name=credential_name
+        )
+
+    def get_k8s_cloud_credential(
+        self,
+        configuration: Configuration,
+        client_cert_data: str,
+        token: str = None,
+    ) -> client.CloudCredential:
+        attrs = {}
+        # TODO: Test with AKS
+        key = None  # open(configuration.key_file, "r").read()
+        username = configuration.username
+        password = configuration.password
+
+        if client_cert_data:
+            attrs["ClientCertificateData"] = client_cert_data
+        if key:
+            attrs["ClientKeyData"] = key
+        if token:
+            if username or password:
+                raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
+            attrs["Token"] = token
+
+        auth_type = None
+        if key:
+            auth_type = "oauth2"
+            if client_cert_data:
+                auth_type = "oauth2withcert"
+            if not token:
+                raise JujuInvalidK8sConfiguration(
+                    "missing token for auth type {}".format(auth_type)
+                )
+        elif username:
+            if not password:
+                self.log.debug(
+                    "credential for user {} has empty password".format(username)
+                )
+            attrs["username"] = username
+            attrs["password"] = password
+            if client_cert_data:
+                auth_type = "userpasswithcert"
+            else:
+                auth_type = "userpass"
+        elif client_cert_data and token:
+            auth_type = "certificate"
+        else:
+            raise JujuInvalidK8sConfiguration("authentication method not supported")
+        return client.CloudCredential(auth_type=auth_type, attrs=attrs)
+
+    async def add_cloud(
+        self,
+        name: str,
+        cloud: Cloud,
+        credential: CloudCredential = None,
+        credential_name: str = None,
+    ) -> Cloud:
+        """
+        Add cloud to the controller
+
+        :param: name:               Name of the cloud to be added
+        :param: cloud:              Cloud object
+        :param: credential:         CloudCredentials object for the cloud
+        :param: credential_name:    Credential name.
+                                    If not defined, cloud of the name will be used.
+        """
+        controller = await self.get_controller()
+        try:
+            _ = await controller.add_cloud(name, cloud)
+            if credential:
+                await controller.add_credential(
+                    credential_name or name, credential=credential, cloud=name
+                )
+            # Need to return the object returned by the controller.add_cloud() function
+            # I'm returning the original value now until this bug is fixed:
+            #   https://github.com/juju/python-libjuju/issues/443
+            return cloud
+        finally:
+            await self.disconnect_controller(controller)
+
+    async def remove_cloud(self, name: str):
+        """
+        Remove cloud
+
+        :param: name:   Name of the cloud to be removed
+        """
+        controller = await self.get_controller()
+        try:
+            await controller.remove_cloud(name)
+        finally:
+            await self.disconnect_controller(controller)
+
+    @retry(attempts=20, delay=5, fallback=JujuLeaderUnitNotFound())
+    async def _get_leader_unit(self, application: Application) -> Unit:
+        unit = None
+        for u in application.units:
+            if await u.is_leader_from_status():
+                unit = u
+                break
+        if not unit:
+            raise Exception()
+        return unit
+
+    async def get_cloud_credentials(self, cloud: Cloud) -> typing.List:
+        """
+        Get cloud credentials
+
+        :param: cloud: Cloud object. The returned credentials will be from this cloud.
+
+        :return: List of credentials object associated to the specified cloud
+
+        """
+        controller = await self.get_controller()
+        try:
+            facade = client.CloudFacade.from_connection(controller.connection())
+            cloud_cred_tag = tag.credential(
+                cloud.name, self.vca_connection.data.user, cloud.credential_name
+            )
+            params = [client.Entity(cloud_cred_tag)]
+            return (await facade.Credential(params)).results
+        finally:
+            await self.disconnect_controller(controller)