Get the kubeconfig credentials from MongoDB
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
index 4f62898..808201d 100644 (file)
 #     See the License for the specific language governing permissions and
 #     limitations under the License.
 
 #     See the License for the specific language governing permissions and
 #     limitations under the License.
 
+import asyncio
 import concurrent
 import concurrent
-from .exceptions import NotImplemented
+import os
+import uuid
+import yaml
 
 import juju
 
 import juju
-# from juju.bundle import BundleHandler
 from juju.controller import Controller
 from juju.model import Model
 from juju.controller import Controller
 from juju.model import Model
-from juju.errors import JujuAPIError, JujuError
-
-import logging
-
+from n2vc.exceptions import K8sException
 from n2vc.k8s_conn import K8sConnector
 from n2vc.k8s_conn import K8sConnector
+from n2vc.kubectl import Kubectl
+from .exceptions import MethodNotImplemented
 
 
-import os
+
+# from juju.bundle import BundleHandler
 # import re
 # import ssl
 # import re
 # import ssl
-import subprocess
 # from .vnf import N2VC
 # from .vnf import N2VC
-
-import uuid
-import yaml
-
-
 class K8sJujuConnector(K8sConnector):
     def __init__(
 class K8sJujuConnector(K8sConnector):
     def __init__(
-            self,
-            fs,
-            kubectl_command='/usr/bin/kubectl',
-            log=None
+        self,
+        fs: object,
+        db: object,
+        kubectl_command: str = "/usr/bin/kubectl",
+        juju_command: str = "/usr/bin/juju",
+        log: object = None,
+        on_update_db=None,
     ):
         """
 
     ):
         """
 
@@ -52,33 +51,39 @@ class K8sJujuConnector(K8sConnector):
 
         # parent class
         K8sConnector.__init__(
 
         # parent class
         K8sConnector.__init__(
-            self,
-            kubectl_command=kubectl_command,
-            fs=fs,
-            log=log,
+            self, db, log=log, on_update_db=on_update_db,
         )
 
         )
 
-        self.info('Initializing K8S Juju connector')
+        self.fs = fs
+        self.log.debug("Initializing K8S Juju connector")
 
         self.authenticated = False
         self.models = {}
 
         self.authenticated = False
         self.models = {}
-        self.log = logging.getLogger(__name__)
-        self.info('K8S Juju connector initialized')
+
+        self.juju_command = juju_command
+        self.juju_secret = ""
+
+        self.log.debug("K8S Juju connector initialized")
 
     """Initialization"""
 
     """Initialization"""
+
     async def init_env(
         self,
     async def init_env(
         self,
-        k8s_creds: dict,
-        namespace: str = 'kube-system',
+        k8s_creds: str,
+        namespace: str = "kube-system",
         reuse_cluster_uuid: str = None,
         reuse_cluster_uuid: str = None,
-    ) -> str:
-        """Initialize a Kubernetes environment
-
-        :param k8s_creds dict: A dictionary containing the Kubernetes cluster
-        configuration
-        :param namespace str: The Kubernetes namespace to initialize
-
-        :return: UUID of the k8s context or raises an exception
+    ) -> (str, bool):
+        """
+        It prepares a given K8s cluster environment to run Juju bundles.
+
+        :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
+            '.kube/config'
+        :param namespace: optional namespace to be used for juju. By default,
+            'kube-system' will be used
+        :param reuse_cluster_uuid: existing cluster uuid for reuse
+        :return: uuid of the K8s cluster and True if connector has installed some
+            software in the cluster
+            (on error, an exception will be raised)
         """
 
         """Bootstrapping
         """
 
         """Bootstrapping
@@ -86,10 +91,6 @@ class K8sJujuConnector(K8sConnector):
         Bootstrapping cannot be done, by design, through the API. We need to
         use the CLI tools.
         """
         Bootstrapping cannot be done, by design, through the API. We need to
         use the CLI tools.
         """
-        # TODO: The path may change
-        jujudir = "/snap/bin"
-
-        self.k8scli = "{}/juju".format(jujudir)
 
         """
         WIP: Workflow
 
         """
         WIP: Workflow
@@ -113,126 +114,120 @@ class K8sJujuConnector(K8sConnector):
         # TODO: Pull info from db based on the namespace #
         ##################################################
 
         # TODO: Pull info from db based on the namespace #
         ##################################################
 
-        if not reuse_cluster_uuid:
-            # This is a new cluster, so bootstrap it
-
-            cluster_uuid = str(uuid.uuid4())
-
-            # Add k8s cloud to Juju (unless it's microk8s)
-
-            # Does the kubeconfig contain microk8s?
-            microk8s = self.is_microk8s_by_credentials(k8s_creds)
-
-            if not microk8s:
-                # Name the new k8s cloud
-                k8s_cloud = "{}-k8s".format(namespace)
-
-                await self.add_k8s(k8s_cloud, k8s_creds)
-
-                # Bootstrap Juju controller
-                self.bootstrap(k8s_cloud, cluster_uuid)
-            else:
-                # k8s_cloud = 'microk8s-test'
-                k8s_cloud = "{}-k8s".format(namespace)
-
-                await self.add_k8s(k8s_cloud, k8s_creds)
-
-                await self.bootstrap(k8s_cloud, cluster_uuid)
-
-            # Get the controller information
-
-            # Parse ~/.local/share/juju/controllers.yaml
-            # controllers.testing.api-endpoints|ca-cert|uuid
-            with open(os.path.expanduser(
-                "~/.local/share/juju/controllers.yaml"
-            )) as f:
-                controllers = yaml.load(f, Loader=yaml.Loader)
-                controller = controllers['controllers'][cluster_uuid]
-                endpoints = controller['api-endpoints']
-                self.juju_endpoint = endpoints[0]
-                self.juju_ca_cert = controller['ca-cert']
-
-            # Parse ~/.local/share/juju/accounts
-            # controllers.testing.user|password
-            with open(os.path.expanduser(
-                "~/.local/share/juju/accounts.yaml"
-            )) as f:
-                controllers = yaml.load(f, Loader=yaml.Loader)
-                controller = controllers['controllers'][cluster_uuid]
-
-                self.juju_user = controller['user']
-                self.juju_secret = controller['password']
-
-            print("user: {}".format(self.juju_user))
-            print("secret: {}".format(self.juju_secret))
-            print("endpoint: {}".format(self.juju_endpoint))
-            print("ca-cert: {}".format(self.juju_ca_cert))
-
-            # raise Exception("EOL")
-
-            self.juju_public_key = None
-
-            config = {
-                'endpoint': self.juju_endpoint,
-                'username': self.juju_user,
-                'secret': self.juju_secret,
-                'cacert': self.juju_ca_cert,
-                'namespace': namespace,
-                'microk8s': microk8s,
-            }
-
-            # Store the cluster configuration so it
-            # can be used for subsequent calls
-            await self.set_config(cluster_uuid, config)
-
-        else:
-            # This is an existing cluster, so get its config
-            cluster_uuid = reuse_cluster_uuid
-
-            config = self.get_config(cluster_uuid)
-
-            self.juju_endpoint = config['endpoint']
-            self.juju_user = config['username']
-            self.juju_secret = config['secret']
-            self.juju_ca_cert = config['cacert']
-            self.juju_public_key = None
+        ###################################################
+        # TODO: Make it idempotent, calling add-k8s and   #
+        # bootstrap whenever reuse_cluster_uuid is passed #
+        # as parameter                                    #
+        # `init_env` is called to initialize the K8s      #
+        # cluster for juju. If this initialization fails, #
+        # it can be called again by LCM with the param    #
+        # reuse_cluster_uuid, e.g. to try to fix it.       #
+        ###################################################
+
+        # This is a new cluster, so bootstrap it
+
+        cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
+
+        # Is a local k8s cluster?
+        localk8s = self.is_local_k8s(k8s_creds)
+
+        # If the k8s is external, the juju controller needs a loadbalancer
+        loadbalancer = False if localk8s else True
+
+        # Name the new k8s cloud
+        k8s_cloud = "k8s-{}".format(cluster_uuid)
+
+        self.log.debug("Adding k8s cloud {}".format(k8s_cloud))
+        await self.add_k8s(k8s_cloud, k8s_creds)
+
+        # Bootstrap Juju controller
+        self.log.debug("Bootstrapping...")
+        await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
+        self.log.debug("Bootstrap done.")
+
+        # Get the controller information
+
+        # Parse ~/.local/share/juju/controllers.yaml
+        # controllers.testing.api-endpoints|ca-cert|uuid
+        self.log.debug("Getting controller endpoints")
+        with open(os.path.expanduser("~/.local/share/juju/controllers.yaml")) as f:
+            controllers = yaml.load(f, Loader=yaml.Loader)
+            controller = controllers["controllers"][cluster_uuid]
+            endpoints = controller["api-endpoints"]
+            self.juju_endpoint = endpoints[0]
+            self.juju_ca_cert = controller["ca-cert"]
+
+        # Parse ~/.local/share/juju/accounts
+        # controllers.testing.user|password
+        self.log.debug("Getting accounts")
+        with open(os.path.expanduser("~/.local/share/juju/accounts.yaml")) as f:
+            controllers = yaml.load(f, Loader=yaml.Loader)
+            controller = controllers["controllers"][cluster_uuid]
+
+            self.juju_user = controller["user"]
+            self.juju_secret = controller["password"]
+
+        # raise Exception("EOL")
+
+        self.juju_public_key = None
+
+        config = {
+            "endpoint": self.juju_endpoint,
+            "username": self.juju_user,
+            "secret": self.juju_secret,
+            "cacert": self.juju_ca_cert,
+            "namespace": namespace,
+            "loadbalancer": loadbalancer,
+        }
+
+        # Store the cluster configuration so it
+        # can be used for subsequent calls
+        self.log.debug("Setting config")
+        await self.set_config(cluster_uuid, config)
 
         # Login to the k8s cluster
         if not self.authenticated:
 
         # Login to the k8s cluster
         if not self.authenticated:
-            await self.login()
+            await self.login(cluster_uuid)
 
         # We're creating a new cluster
 
         # We're creating a new cluster
-        print("Getting model {}".format(self.get_namespace(cluster_uuid)))
-        model = await self.get_model(self.get_namespace(cluster_uuid))
+        # print("Getting model {}".format(self.get_namespace(cluster_uuid),
+        #    cluster_uuid=cluster_uuid))
+        # model = await self.get_model(
+        #    self.get_namespace(cluster_uuid),
+        #    cluster_uuid=cluster_uuid
+        # )
 
         # Disconnect from the model
 
         # Disconnect from the model
-        if model and model.is_connected():
-            await model.disconnect()
+        if model and model.is_connected():
+        #    await model.disconnect()
 
 
-        return cluster_uuid
+        return cluster_uuid, True
 
     """Repo Management"""
 
     """Repo Management"""
+
     async def repo_add(
     async def repo_add(
-        self,
-        name: str,
-        url: str,
-        type: str = "charm",
+        self, name: str, url: str, _type: str = "charm",
     ):
     ):
-        raise NotImplemented()
+        raise MethodNotImplemented()
 
     async def repo_list(self):
 
     async def repo_list(self):
-        raise NotImplemented()
+        raise MethodNotImplemented()
 
     async def repo_remove(
 
     async def repo_remove(
-        self,
-        name: str,
+        self, name: str,
     ):
     ):
-        raise NotImplemented()
+        raise MethodNotImplemented()
+
+    async def synchronize_repos(self, cluster_uuid: str, name: str):
+        """
+        Returns None as currently add_repo is not implemented
+        """
+        return None
 
     """Reset"""
 
     """Reset"""
+
     async def reset(
     async def reset(
-        self,
-        cluster_uuid: str,
+        self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
     ) -> bool:
         """Reset a cluster
 
     ) -> bool:
         """Reset a cluster
 
@@ -244,50 +239,45 @@ class K8sJujuConnector(K8sConnector):
 
         try:
             if not self.authenticated:
 
         try:
             if not self.authenticated:
-                await self.login()
+                await self.login(cluster_uuid)
 
             if self.controller.is_connected():
                 # Destroy the model
                 namespace = self.get_namespace(cluster_uuid)
                 if await self.has_model(namespace):
 
             if self.controller.is_connected():
                 # Destroy the model
                 namespace = self.get_namespace(cluster_uuid)
                 if await self.has_model(namespace):
-                    print("[reset] Destroying model")
-                    await self.controller.destroy_model(
-                        namespace,
-                        destroy_storage=True
-                    )
+                    self.log.debug("[reset] Destroying model")
+                    await self.controller.destroy_model(namespace, destroy_storage=True)
 
                 # Disconnect from the controller
 
                 # Disconnect from the controller
-                print("[reset] Disconnecting controller")
-                await self.controller.disconnect()
+                self.log.debug("[reset] Disconnecting controller")
+                await self.logout()
 
                 # Destroy the controller (via CLI)
 
                 # Destroy the controller (via CLI)
-                print("[reset] Destroying controller")
+                self.log.debug("[reset] Destroying controller")
                 await self.destroy_controller(cluster_uuid)
 
                 await self.destroy_controller(cluster_uuid)
 
-                """Remove the k8s cloud
-
-                Only remove the k8s cloud if it's not a microk8s cloud,
-                since microk8s is a built-in cloud type.
-                """
-                # microk8s = self.is_microk8s_by_cluster_uuid(cluster_uuid)
-                # if not microk8s:
-                print("[reset] Removing k8s cloud")
-                namespace = self.get_namespace(cluster_uuid)
-                k8s_cloud = "{}-k8s".format(namespace)
+                self.log.debug("[reset] Removing k8s cloud")
+                k8s_cloud = "k8s-{}".format(cluster_uuid)
                 await self.remove_cloud(k8s_cloud)
 
         except Exception as ex:
                 await self.remove_cloud(k8s_cloud)
 
         except Exception as ex:
-            print("Caught exception during reset: {}".format(ex))
+            self.log.debug("Caught exception during reset: {}".format(ex))
+
+        return True
 
     """Deployment"""
 
     """Deployment"""
+
     async def install(
         self,
         cluster_uuid: str,
         kdu_model: str,
         atomic: bool = True,
     async def install(
         self,
         cluster_uuid: str,
         kdu_model: str,
         atomic: bool = True,
-        timeout: int = None,
+        timeout: float = 300,
         params: dict = None,
         params: dict = None,
-    ) -> str:
+        db_dict: dict = None,
+        kdu_name: str = None,
+        namespace: str = None,
+    ) -> bool:
         """Install a bundle
 
         :param cluster_uuid str: The UUID of the cluster to install to
         """Install a bundle
 
         :param cluster_uuid str: The UUID of the cluster to install to
@@ -297,69 +287,106 @@ class K8sJujuConnector(K8sConnector):
         :param timeout int: The time, in seconds, to wait for the install
                             to finish
         :param params dict: Key-value pairs of instantiation parameters
         :param timeout int: The time, in seconds, to wait for the install
                             to finish
         :param params dict: Key-value pairs of instantiation parameters
+        :param kdu_name: Name of the KDU instance to be installed
+        :param namespace: K8s namespace to use for the KDU instance
 
         :return: If successful, returns ?
         """
 
         if not self.authenticated:
 
         :return: If successful, returns ?
         """
 
         if not self.authenticated:
-            print("[install] Logging in to the controller")
-            await self.login()
+            self.log.debug("[install] Logging in to the controller")
+            await self.login(cluster_uuid)
 
         ##
 
         ##
-        # Get or create the model, based on the namespace the cluster was
-        # instantiated with.
-        namespace = self.get_namespace(cluster_uuid)
-        model = await self.get_model(namespace)
-        if not model:
-            # Create the new model
-            model = await self.add_model(namespace)
+        # Get or create the model, based on the NS
+        # uuid.
+        if kdu_name:
+            kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
+        else:
+            kdu_instance = db_dict["filter"]["_id"]
+
+        self.log.debug("Checking for model named {}".format(kdu_instance))
+
+        # Create the new model
+        self.log.debug("Adding model: {}".format(kdu_instance))
+        model = await self.add_model(kdu_instance, cluster_uuid=cluster_uuid)
 
         if model:
             # TODO: Instantiation parameters
 
 
         if model:
             # TODO: Instantiation parameters
 
-            print("[install] deploying {}".format(kdu_model))
-            await model.deploy(kdu_model)
+            """
+            "Juju bundle that models the KDU, in any of the following ways:
+                - <juju-repo>/<juju-bundle>
+                - <juju-bundle folder under k8s_models folder in the package>
+                - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
+                    in the package>
+                - <URL_where_to_fetch_juju_bundle>
+            """
+            try:
+                previous_workdir = os.getcwd()
+            except FileNotFoundError:
+                previous_workdir = "/app/storage"
+
+            bundle = kdu_model
+            if kdu_model.startswith("cs:"):
+                bundle = kdu_model
+            elif kdu_model.startswith("http"):
+                # Download the file
+                pass
+            else:
+                new_workdir = kdu_model.strip(kdu_model.split("/")[-1])
+
+                os.chdir(new_workdir)
+
+                bundle = "local:{}".format(kdu_model)
+
+            if not bundle:
+                # Raise named exception that the bundle could not be found
+                raise Exception()
+
+            self.log.debug("[install] deploying {}".format(bundle))
+            await model.deploy(bundle)
 
             # Get the application
             if atomic:
                 # applications = model.applications
 
             # Get the application
             if atomic:
                 # applications = model.applications
-                print("[install] Applications: {}".format(model.applications))
+                self.log.debug("[install] Applications: {}".format(model.applications))
                 for name in model.applications:
                 for name in model.applications:
-                    print("[install] Waiting for {} to settle".format(name))
+                    self.log.debug("[install] Waiting for {} to settle".format(name))
                     application = model.applications[name]
                     try:
                         # It's not enough to wait for all units to be active;
                         # the application status needs to be active as well.
                     application = model.applications[name]
                     try:
                         # It's not enough to wait for all units to be active;
                         # the application status needs to be active as well.
-                        print("Waiting for all units to be active...")
+                        self.log.debug("Waiting for all units to be active...")
                         await model.block_until(
                             lambda: all(
                         await model.block_until(
                             lambda: all(
-                                unit.agent_status == 'idle'
-                                and application.status in ['active', 'unknown']
-                                and unit.workload_status in [
-                                    'active', 'unknown'
-                                ] for unit in application.units
+                                unit.agent_status == "idle"
+                                and application.status in ["active", "unknown"]
+                                and unit.workload_status in ["active", "unknown"]
+                                for unit in application.units
                             ),
                             ),
-                            timeout=timeout
+                            timeout=timeout,
                         )
                         )
-                        print("All units active.")
+                        self.log.debug("All units active.")
 
 
+                    # TODO use asyncio.TimeoutError
                     except concurrent.futures._base.TimeoutError:
                     except concurrent.futures._base.TimeoutError:
-                        print("[install] Timeout exceeded; resetting cluster")
+                        os.chdir(previous_workdir)
+                        self.log.debug("[install] Timeout exceeded; resetting cluster")
                         await self.reset(cluster_uuid)
                         return False
 
             # Wait for the application to be active
             if model.is_connected():
                         await self.reset(cluster_uuid)
                         return False
 
             # Wait for the application to be active
             if model.is_connected():
-                print("[install] Disconnecting model")
+                self.log.debug("[install] Disconnecting model")
                 await model.disconnect()
 
                 await model.disconnect()
 
-            return True
+            os.chdir(previous_workdir)
+
+            return kdu_instance
         raise Exception("Unable to install")
 
         raise Exception("Unable to install")
 
-    async def instances_list(
-            self,
-            cluster_uuid: str
-    ) -> list:
+    async def instances_list(self, cluster_uuid: str) -> list:
         """
         returns a list of deployed releases in a cluster
 
         """
         returns a list of deployed releases in a cluster
 
@@ -400,10 +427,10 @@ class K8sJujuConnector(K8sConnector):
         initial release.
         """
         namespace = self.get_namespace(cluster_uuid)
         initial release.
         """
         namespace = self.get_namespace(cluster_uuid)
-        model = await self.get_model(namespace)
+        model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
 
 
-        with open(kdu_model, 'r') as f:
-            bundle = yaml.load(f, Loader=yaml.FullLoader)
+        with open(kdu_model, "r") as f:
+            bundle = yaml.safe_load(f)
 
             """
             {
 
             """
             {
@@ -424,31 +451,29 @@ class K8sJujuConnector(K8sConnector):
             }
             """
             # TODO: This should be returned in an agreed-upon format
             }
             """
             # TODO: This should be returned in an agreed-upon format
-            for name in bundle['applications']:
-                print(model.applications)
+            for name in bundle["applications"]:
+                self.log.debug(model.applications)
                 application = model.applications[name]
                 application = model.applications[name]
-                print(application)
+                self.log.debug(application)
 
 
-                path = bundle['applications'][name]['charm']
+                path = bundle["applications"][name]["charm"]
 
                 try:
                     await application.upgrade_charm(switch=path)
                 except juju.errors.JujuError as ex:
 
                 try:
                     await application.upgrade_charm(switch=path)
                 except juju.errors.JujuError as ex:
-                    if 'already running charm' in str(ex):
+                    if "already running charm" in str(ex):
                         # We're already running this version
                         pass
 
         await model.disconnect()
 
         return True
                         # We're already running this version
                         pass
 
         await model.disconnect()
 
         return True
-        raise NotImplemented()
+        raise MethodNotImplemented()
 
     """Rollback"""
 
     """Rollback"""
+
     async def rollback(
     async def rollback(
-        self,
-        cluster_uuid: str,
-        kdu_instance: str,
-        revision: int = 0,
+        self, cluster_uuid: str, kdu_instance: str, revision: int = 0,
     ) -> str:
         """Rollback a model
 
     ) -> str:
         """Rollback a model
 
@@ -460,47 +485,109 @@ class K8sJujuConnector(K8sConnector):
         :return: If successful, returns the revision of active KDU instance,
                  or raises an exception
         """
         :return: If successful, returns the revision of active KDU instance,
                  or raises an exception
         """
-        raise NotImplemented()
+        raise MethodNotImplemented()
 
     """Deletion"""
 
     """Deletion"""
-    async def uninstall(
-        self,
-        cluster_uuid: str,
-        kdu_instance: str,
-    ) -> bool:
+
+    async def uninstall(self, cluster_uuid: str, kdu_instance: str) -> bool:
         """Uninstall a KDU instance
 
         """Uninstall a KDU instance
 
-        :param cluster_uuid str: The UUID of the cluster to uninstall
+        :param cluster_uuid str: The UUID of the cluster
         :param kdu_instance str: The unique name of the KDU instance
 
         :return: Returns True if successful, or raises an exception
         """
         :param kdu_instance str: The unique name of the KDU instance
 
         :return: Returns True if successful, or raises an exception
         """
-        removed = False
+        if not self.authenticated:
+            self.log.debug("[uninstall] Connecting to controller")
+            await self.login(cluster_uuid)
 
 
-        # Remove an application from the model
-        model = await self.get_model(self.get_namespace(cluster_uuid))
+        self.log.debug("[uninstall] Destroying model")
 
 
-        if model:
-            # Get the application
-            if kdu_instance not in model.applications:
-                # TODO: Raise a named exception
-                raise Exception("Application not found.")
+        await self.controller.destroy_models(kdu_instance)
+
+        self.log.debug("[uninstall] Model destroyed and disconnecting")
+        await self.logout()
 
 
-            application = model.applications[kdu_instance]
+        return True
 
 
-            # Destroy the application
-            await application.destroy()
+    async def exec_primitive(
+        self,
+        cluster_uuid: str = None,
+        kdu_instance: str = None,
+        primitive_name: str = None,
+        timeout: float = 300,
+        params: dict = None,
+        db_dict: dict = None,
+    ) -> str:
+        """Exec primitive (Juju action)
+
+        :param cluster_uuid str: The UUID of the cluster
+        :param kdu_instance str: The unique name of the KDU instance
+        :param primitive_name: Name of action that will be executed
+        :param timeout: Timeout for action execution
+        :param params: Dictionary of all the parameters needed for the action
+        :db_dict: Dictionary for any additional data
 
 
-            # TODO: Verify removal
+        :return: Returns the output of the action
+        """
+        if not self.authenticated:
+            self.log.debug("[exec_primitive] Connecting to controller")
+            await self.login(cluster_uuid)
+
+        if not params or "application-name" not in params:
+            raise K8sException(
+                "Missing application-name argument, \
+                                argument needed for K8s actions"
+            )
+        try:
+            self.log.debug(
+                "[exec_primitive] Getting model "
+                "kdu_instance: {}".format(kdu_instance)
+            )
+
+            model = await self.get_model(kdu_instance, cluster_uuid)
+
+            application_name = params["application-name"]
+            application = model.applications[application_name]
+
+            actions = await application.get_actions()
+            if primitive_name not in actions:
+                raise K8sException("Primitive {} not found".format(primitive_name))
+
+            unit = None
+            for u in application.units:
+                if await u.is_leader_from_status():
+                    unit = u
+                    break
 
 
-            removed = True
-        return removed
+            if unit is None:
+                raise K8sException("No leader unit found to execute action")
+
+            self.log.debug("[exec_primitive] Running action: {}".format(primitive_name))
+            action = await unit.run_action(primitive_name, **params)
+
+            output = await model.get_action_output(action_uuid=action.entity_id)
+            status = await model.get_action_status(uuid_or_prefix=action.entity_id)
+
+            status = (
+                status[action.entity_id] if action.entity_id in status else "failed"
+            )
+
+            if status != "completed":
+                raise K8sException(
+                    "status is not completed: {} output: {}".format(status, output)
+                )
+
+            return output
+
+        except Exception as e:
+            error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
+            self.log.error(error_msg)
+            raise K8sException(message=error_msg)
 
     """Introspection"""
 
     """Introspection"""
-    async def inspect_kdu(
-        self,
-        kdu_model: str,
-    ) -> dict:
+
+    async def inspect_kdu(self, kdu_model: str,) -> dict:
         """Inspect a KDU
 
         Inspects a bundle and returns a dictionary of config parameters and
         """Inspect a KDU
 
         Inspects a bundle and returns a dictionary of config parameters and
@@ -513,8 +600,8 @@ class K8sJujuConnector(K8sConnector):
         """
 
         kdu = {}
         """
 
         kdu = {}
-        with open(kdu_model, 'r') as f:
-            bundle = yaml.load(f, Loader=yaml.FullLoader)
+        with open(kdu_model, "r") as f:
+            bundle = yaml.safe_load(f)
 
             """
             {
 
             """
             {
@@ -535,14 +622,11 @@ class K8sJujuConnector(K8sConnector):
             }
             """
             # TODO: This should be returned in an agreed-upon format
             }
             """
             # TODO: This should be returned in an agreed-upon format
-            kdu = bundle['applications']
+            kdu = bundle["applications"]
 
         return kdu
 
 
         return kdu
 
-    async def help_kdu(
-        self,
-        kdu_model: str,
-    ) -> str:
+    async def help_kdu(self, kdu_model: str,) -> str:
         """View the README
 
         If available, returns the README of the bundle.
         """View the README
 
         If available, returns the README of the bundle.
@@ -553,21 +637,17 @@ class K8sJujuConnector(K8sConnector):
         """
         readme = None
 
         """
         readme = None
 
-        files = ['README', 'README.txt', 'README.md']
+        files = ["README", "README.txt", "README.md"]
         path = os.path.dirname(kdu_model)
         for file in os.listdir(path):
             if file in files:
         path = os.path.dirname(kdu_model)
         for file in os.listdir(path):
             if file in files:
-                with open(file, 'r') as f:
+                with open(file, "r") as f:
                     readme = f.read()
                     break
 
         return readme
 
                     readme = f.read()
                     break
 
         return readme
 
-    async def status_kdu(
-        self,
-        cluster_uuid: str,
-        kdu_instance: str,
-    ) -> dict:
+    async def status_kdu(self, cluster_uuid: str, kdu_instance: str,) -> dict:
         """Get the status of the KDU
 
         Get the current status of the KDU instance.
         """Get the status of the KDU
 
         Get the current status of the KDU instance.
@@ -580,7 +660,9 @@ class K8sJujuConnector(K8sConnector):
         """
         status = {}
 
         """
         status = {}
 
-        model = await self.get_model(self.get_namespace(cluster_uuid))
+        model = await self.get_model(
+            self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid
+        )
 
         # model = await self.get_model_by_uuid(cluster_uuid)
         if model:
 
         # model = await self.get_model_by_uuid(cluster_uuid)
         if model:
@@ -589,21 +671,58 @@ class K8sJujuConnector(K8sConnector):
 
             for name in model_status.applications:
                 application = model_status.applications[name]
 
             for name in model_status.applications:
                 application = model_status.applications[name]
-                status[name] = {
-                    'status': application['status']['status']
-                }
+                status[name] = {"status": application["status"]["status"]}
 
             if model.is_connected():
                 await model.disconnect()
 
         return status
 
 
             if model.is_connected():
                 await model.disconnect()
 
         return status
 
+    async def get_services(
+        self, cluster_uuid: str, kdu_instance: str, namespace: str
+    ) -> list:
+        """Return a list of services of a kdu_instance"""
+
+        credentials = self.get_credentials(cluster_uuid=cluster_uuid)
+
+        config_path = "/tmp/{}".format(cluster_uuid)
+        config_file = "{}/config".format(config_path)
+
+        if not os.path.exists(config_path):
+            os.makedirs(config_path)
+        with open(config_file, "w") as f:
+            f.write(credentials)
+
+        kubectl = Kubectl(config_file=config_file)
+        return kubectl.get_services(
+            field_selector="metadata.namespace={}".format(kdu_instance)
+        )
+
+    async def get_service(
+        self, cluster_uuid: str, service_name: str, namespace: str
+    ) -> object:
+        """Return data for a specific service inside a namespace"""
+
+        credentials = self.get_credentials(cluster_uuid=cluster_uuid)
+
+        config_path = "/tmp/{}".format(cluster_uuid)
+        config_file = "{}/config".format(config_path)
+
+        if not os.path.exists(config_path):
+            os.makedirs(config_path)
+        with open(config_file, "w") as f:
+            f.write(credentials)
+
+        kubectl = Kubectl(config_file=config_file)
+
+        return kubectl.get_services(
+            field_selector="metadata.name={},metadata.namespace={}".format(
+                service_name, namespace
+            )
+        )[0]
+
     # Private methods
     # Private methods
-    async def add_k8s(
-        self,
-        cloud_name: str,
-        credentials: dict,
-    ) -> bool:
+    async def add_k8s(self, cloud_name: str, credentials: str,) -> bool:
         """Add a k8s cloud to Juju
 
         Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
         """Add a k8s cloud to Juju
 
         Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
@@ -615,25 +734,34 @@ class K8sJujuConnector(K8sConnector):
 
         :returns: True if successful, otherwise raises an exception.
         """
 
         :returns: True if successful, otherwise raises an exception.
         """
-        cmd = [self.k8scli, "add-k8s", "--local", cloud_name]
 
 
-        p = subprocess.run(
-            cmd,
-            stdout=subprocess.PIPE,
-            stderr=subprocess.PIPE,
-            input=yaml.dump(credentials, Dumper=yaml.Dumper),
-            encoding='ascii'
+        cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
+        self.log.debug(cmd)
+
+        process = await asyncio.create_subprocess_exec(
+            *cmd,
+            stdout=asyncio.subprocess.PIPE,
+            stderr=asyncio.subprocess.PIPE,
+            stdin=asyncio.subprocess.PIPE,
         )
         )
-        retcode = p.returncode
 
 
-        if retcode > 0:
-            raise Exception(p.stderr)
+        # Feed the process the credentials
+        process.stdin.write(credentials.encode("utf-8"))
+        await process.stdin.drain()
+        process.stdin.close()
+
+        _stdout, stderr = await process.communicate()
+
+        return_code = process.returncode
+
+        self.log.debug("add-k8s return code: {}".format(return_code))
+
+        if return_code > 0:
+            raise Exception(stderr)
+
         return True
 
         return True
 
-    async def add_model(
-        self,
-        model_name: str
-    ) -> juju.model.Model:
+    async def add_model(self, model_name: str, cluster_uuid: str,) -> Model:
         """Adds a model to the controller
 
         Adds a new model to the Juju controller
         """Adds a model to the controller
 
         Adds a new model to the Juju controller
@@ -643,18 +771,28 @@ class K8sJujuConnector(K8sConnector):
                   raises an exception.
         """
         if not self.authenticated:
                   raises an exception.
         """
         if not self.authenticated:
-            await self.login()
+            await self.login(cluster_uuid)
 
 
-        model = await self.controller.add_model(
-            model_name,
-            config={'authorized-keys': self.juju_public_key}
+        self.log.debug(
+            "Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid)
         )
         )
+        model = None
+        try:
+            if self.juju_public_key is not None:
+                model = await self.controller.add_model(
+                    model_name, config={"authorized-keys": self.juju_public_key}
+                )
+            else:
+                model = await self.controller.add_model(model_name)
+        except Exception as ex:
+            self.log.debug(ex)
+            self.log.debug("Caught exception: {}".format(ex))
+            pass
+
         return model
 
     async def bootstrap(
         return model
 
     async def bootstrap(
-        self,
-        cloud_name: str,
-        cluster_uuid: str
+        self, cloud_name: str, cluster_uuid: str, loadbalancer: bool
     ) -> bool:
         """Bootstrap a Kubernetes controller
 
     ) -> bool:
         """Bootstrap a Kubernetes controller
 
@@ -662,32 +800,46 @@ class K8sJujuConnector(K8sConnector):
 
         :param cloud_name str: The name of the cloud.
         :param cluster_uuid str: The UUID of the cluster to bootstrap.
 
         :param cloud_name str: The name of the cloud.
         :param cluster_uuid str: The UUID of the cluster to bootstrap.
+        :param loadbalancer bool: If the controller should use loadbalancer or not.
         :returns: True upon success or raises an exception.
         """
         :returns: True upon success or raises an exception.
         """
-        cmd = [self.k8scli, "bootstrap", cloud_name, cluster_uuid]
-        print("Bootstrapping controller {} in cloud {}".format(
-            cluster_uuid, cloud_name
-        ))
 
 
-        p = subprocess.run(
-            cmd,
-            stdout=subprocess.PIPE,
-            stderr=subprocess.PIPE,
-            encoding='ascii'
+        if not loadbalancer:
+            cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
+        else:
+            """
+            For public clusters, specify that the controller service is using a
+            LoadBalancer.
+            """
+            cmd = [
+                self.juju_command,
+                "bootstrap",
+                cloud_name,
+                cluster_uuid,
+                "--config",
+                "controller-service-type=loadbalancer",
+            ]
+
+        self.log.debug(
+            "Bootstrapping controller {} in cloud {}".format(cluster_uuid, cloud_name)
+        )
+
+        process = await asyncio.create_subprocess_exec(
+            *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
         )
         )
-        retcode = p.returncode
 
 
-        if retcode > 0:
+        _stdout, stderr = await process.communicate()
+
+        return_code = process.returncode
+
+        if return_code > 0:
             #
             #
-            if 'already exists' not in p.stderr:
-                raise Exception(p.stderr)
+            if b"already exists" not in stderr:
+                raise Exception(stderr)
 
         return True
 
 
         return True
 
-    async def destroy_controller(
-        self,
-        cluster_uuid: str
-    ) -> bool:
+    async def destroy_controller(self, cluster_uuid: str) -> bool:
         """Destroy a Kubernetes controller
 
         Destroy an existing Kubernetes controller.
         """Destroy a Kubernetes controller
 
         Destroy an existing Kubernetes controller.
@@ -696,31 +848,46 @@ class K8sJujuConnector(K8sConnector):
         :returns: True upon success or raises an exception.
         """
         cmd = [
         :returns: True upon success or raises an exception.
         """
         cmd = [
-            self.k8scli,
+            self.juju_command,
             "destroy-controller",
             "--destroy-all-models",
             "--destroy-storage",
             "-y",
             "destroy-controller",
             "--destroy-all-models",
             "--destroy-storage",
             "-y",
-            cluster_uuid
+            cluster_uuid,
         ]
 
         ]
 
-        p = subprocess.run(
-            cmd,
-            stdout=subprocess.PIPE,
-            stderr=subprocess.PIPE,
-            encoding='ascii'
+        process = await asyncio.create_subprocess_exec(
+            *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
         )
         )
-        retcode = p.returncode
 
 
-        if retcode > 0:
+        _stdout, stderr = await process.communicate()
+
+        return_code = process.returncode
+
+        if return_code > 0:
             #
             #
-            if 'already exists' not in p.stderr:
-                raise Exception(p.stderr)
+            if "already exists" not in stderr:
+                raise Exception(stderr)
 
 
-    def get_config(
-        self,
-        cluster_uuid: str,
-    ) -> dict:
+    def get_credentials(self, cluster_uuid: str) -> str:
+        """
+        Get Cluster Kubeconfig
+        """
+        k8scluster = self.db.get_one(
+            "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
+        )
+
+        self.db.encrypt_decrypt_fields(
+            k8scluster.get("credentials"),
+            "decrypt",
+            ["password", "secret"],
+            schema_version=k8scluster["schema_version"],
+            salt=k8scluster["_id"],
+        )
+
+        return yaml.safe_dump(k8scluster.get("credentials"))
+
+    def get_config(self, cluster_uuid: str,) -> dict:
         """Get the cluster configuration
 
         Gets the configuration of the cluster
         """Get the cluster configuration
 
         Gets the configuration of the cluster
@@ -730,20 +897,15 @@ class K8sJujuConnector(K8sConnector):
         """
         cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)
         if os.path.exists(cluster_config):
         """
         cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)
         if os.path.exists(cluster_config):
-            with open(cluster_config, 'r') as f:
-                config = yaml.load(f.read(), Loader=yaml.FullLoader)
+            with open(cluster_config, "r") as f:
+                config = yaml.safe_load(f.read())
                 return config
         else:
             raise Exception(
                 return config
         else:
             raise Exception(
-                "Unable to locate configuration for cluster {}".format(
-                    cluster_uuid
-                )
+                "Unable to locate configuration for cluster {}".format(cluster_uuid)
             )
 
             )
 
-    async def get_model(
-        self,
-        model_name: str,
-    ) -> juju.model.Model:
+    async def get_model(self, model_name: str, cluster_uuid: str,) -> Model:
         """Get a model from the Juju Controller.
 
         Note: Model objects returned must call disconnected() before it goes
         """Get a model from the Juju Controller.
 
         Note: Model objects returned must call disconnected() before it goes
@@ -753,21 +915,16 @@ class K8sJujuConnector(K8sConnector):
         :return The juju.model.Model object if found, or None.
         """
         if not self.authenticated:
         :return The juju.model.Model object if found, or None.
         """
         if not self.authenticated:
-            await self.login()
+            await self.login(cluster_uuid)
 
         model = None
         models = await self.controller.list_models()
 
         model = None
         models = await self.controller.list_models()
-
         if model_name in models:
         if model_name in models:
-            model = await self.controller.get_model(
-                model_name
-            )
+            self.log.debug("Found model: {}".format(model_name))
+            model = await self.controller.get_model(model_name)
         return model
 
         return model
 
-    def get_namespace(
-        self,
-        cluster_uuid: str,
-    ) -> str:
+    def get_namespace(self, cluster_uuid: str,) -> str:
         """Get the namespace UUID
         Gets the namespace's unique name
 
         """Get the namespace UUID
         Gets the namespace's unique name
 
@@ -777,18 +934,15 @@ class K8sJujuConnector(K8sConnector):
         config = self.get_config(cluster_uuid)
 
         # Make sure the name is in the config
         config = self.get_config(cluster_uuid)
 
         # Make sure the name is in the config
-        if 'namespace' not in config:
+        if "namespace" not in config:
             raise Exception("Namespace not found.")
 
         # TODO: We want to make sure this is unique to the cluster, in case
         # the cluster is being reused.
         # Consider pre/appending the cluster id to the namespace string
             raise Exception("Namespace not found.")
 
         # TODO: We want to make sure this is unique to the cluster, in case
         # the cluster is being reused.
         # Consider pre/appending the cluster id to the namespace string
-        return config['namespace']
+        return config["namespace"]
 
 
-    async def has_model(
-        self,
-        model_name: str
-    ) -> bool:
+    async def has_model(self, model_name: str) -> bool:
         """Check if a model exists in the controller
 
         Checks to see if a model exists in the connected Juju controller.
         """Check if a model exists in the controller
 
         Checks to see if a model exists in the connected Juju controller.
@@ -802,38 +956,26 @@ class K8sJujuConnector(K8sConnector):
             return True
         return False
 
             return True
         return False
 
-    def is_microk8s_by_cluster_uuid(
-        self,
-        cluster_uuid: str,
-    ) -> bool:
-        """Check if a cluster is micro8s
+    def is_local_k8s(self, credentials: str,) -> bool:
+        """Check if a cluster is local
 
 
-        Checks if a cluster is running microk8s
+        Checks if a cluster is running in the local host
 
 
-        :param cluster_uuid str: The UUID of the cluster
-        :returns: A boolean if the cluster is running microk8s
+        :param credentials dict: A dictionary containing the k8s credentials
+        :returns: A boolean if the cluster is running locally
         """
         """
-        config = self.get_config(cluster_uuid)
-        return config['microk8s']
 
 
-    def is_microk8s_by_credentials(
-        self,
-        credentials: dict,
-    ) -> bool:
-        """Check if a cluster is micro8s
+        creds = yaml.safe_load(credentials)
 
 
-        Checks if a cluster is running microk8s
-
-        :param credentials dict: A dictionary containing the k8s credentials
-        :returns: A boolean if the cluster is running microk8s
-        """
-        for context in credentials['contexts']:
-            if 'microk8s' in context['name']:
-                return True
+        if creds and os.getenv("OSMLCM_VCA_APIPROXY"):
+            for cluster in creds["clusters"]:
+                if "server" in cluster["cluster"]:
+                    if os.getenv("OSMLCM_VCA_APIPROXY") in cluster["cluster"]["server"]:
+                        return True
 
         return False
 
 
         return False
 
-    async def login(self):
+    async def login(self, cluster_uuid):
         """Login to the Juju controller."""
 
         if self.authenticated:
         """Login to the Juju controller."""
 
         if self.authenticated:
@@ -841,14 +983,21 @@ class K8sJujuConnector(K8sConnector):
 
         self.connecting = True
 
 
         self.connecting = True
 
+        # Test: Make sure we have the credentials loaded
+        config = self.get_config(cluster_uuid)
+
+        self.juju_endpoint = config["endpoint"]
+        self.juju_user = config["username"]
+        self.juju_secret = config["secret"]
+        self.juju_ca_cert = config["cacert"]
+        self.juju_public_key = None
+
         self.controller = Controller()
 
         if self.juju_secret:
             self.log.debug(
                 "Connecting to controller... ws://{} as {}/{}".format(
         self.controller = Controller()
 
         if self.juju_secret:
             self.log.debug(
                 "Connecting to controller... ws://{} as {}/{}".format(
-                    self.juju_endpoint,
-                    self.juju_user,
-                    self.juju_secret,
+                    self.juju_endpoint, self.juju_user, self.juju_secret,
                 )
             )
             try:
                 )
             )
             try:
@@ -861,7 +1010,7 @@ class K8sJujuConnector(K8sConnector):
                 self.authenticated = True
                 self.log.debug("JujuApi: Logged into controller")
             except Exception as ex:
                 self.authenticated = True
                 self.log.debug("JujuApi: Logged into controller")
             except Exception as ex:
-                print(ex)
+                self.log.debug(ex)
                 self.log.debug("Caught exception: {}".format(ex))
                 pass
         else:
                 self.log.debug("Caught exception: {}".format(ex))
                 pass
         else:
@@ -870,27 +1019,22 @@ class K8sJujuConnector(K8sConnector):
 
     async def logout(self):
         """Logout of the Juju controller."""
 
     async def logout(self):
         """Logout of the Juju controller."""
-        print("[logout]")
+        self.log.debug("[logout]")
         if not self.authenticated:
             return False
 
         for model in self.models:
         if not self.authenticated:
             return False
 
         for model in self.models:
-            print("Logging out of model {}".format(model))
+            self.log.debug("Logging out of model {}".format(model))
             await self.models[model].disconnect()
 
         if self.controller:
             await self.models[model].disconnect()
 
         if self.controller:
-            self.log.debug("Disconnecting controller {}".format(
-                self.controller
-            ))
+            self.log.debug("Disconnecting controller {}".format(self.controller))
             await self.controller.disconnect()
             self.controller = None
 
         self.authenticated = False
 
             await self.controller.disconnect()
             self.controller = None
 
         self.authenticated = False
 
-    async def remove_cloud(
-        self,
-        cloud_name: str,
-    ) -> bool:
+    async def remove_cloud(self, cloud_name: str,) -> bool:
         """Remove a k8s cloud from Juju
 
         Removes a Kubernetes cloud from Juju.
         """Remove a k8s cloud from Juju
 
         Removes a Kubernetes cloud from Juju.
@@ -901,39 +1045,34 @@ class K8sJujuConnector(K8sConnector):
         """
 
         # Remove the bootstrapped controller
         """
 
         # Remove the bootstrapped controller
-        cmd = [self.k8scli, "remove-k8s", "--client", cloud_name]
-        p = subprocess.run(
-            cmd,
-            stdout=subprocess.PIPE,
-            stderr=subprocess.PIPE,
-            encoding='ascii'
+        cmd = [self.juju_command, "remove-k8s", "--client", cloud_name]
+        process = await asyncio.create_subprocess_exec(
+            *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
         )
         )
-        retcode = p.returncode
 
 
-        if retcode > 0:
-            raise Exception(p.stderr)
+        _stdout, stderr = await process.communicate()
+
+        return_code = process.returncode
+
+        if return_code > 0:
+            raise Exception(stderr)
 
         # Remove the cloud from the local config
 
         # Remove the cloud from the local config
-        cmd = [self.k8scli, "remove-cloud", "--client", cloud_name]
-        p = subprocess.run(
-            cmd,
-            stdout=subprocess.PIPE,
-            stderr=subprocess.PIPE,
-            encoding='ascii'
+        cmd = [self.juju_command, "remove-cloud", "--client", cloud_name]
+        process = await asyncio.create_subprocess_exec(
+            *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
         )
         )
-        retcode = p.returncode
 
 
-        if retcode > 0:
-            raise Exception(p.stderr)
+        _stdout, stderr = await process.communicate()
+
+        return_code = process.returncode
 
 
+        if return_code > 0:
+            raise Exception(stderr)
 
         return True
 
 
         return True
 
-    async def set_config(
-        self,
-        cluster_uuid: str,
-        config: dict,
-    ) -> bool:
+    async def set_config(self, cluster_uuid: str, config: dict,) -> bool:
         """Save the cluster configuration
 
         Saves the cluster information to the file store
         """Save the cluster configuration
 
         Saves the cluster information to the file store
@@ -945,8 +1084,8 @@ class K8sJujuConnector(K8sConnector):
 
         cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)
         if not os.path.exists(cluster_config):
 
         cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)
         if not os.path.exists(cluster_config):
-            print("Writing config to {}".format(cluster_config))
-            with open(cluster_config, 'w') as f:
+            self.log.debug("Writing config to {}".format(cluster_config))
+            with open(cluster_config, "w") as f:
                 f.write(yaml.dump(config, Dumper=yaml.Dumper))
 
         return True
                 f.write(yaml.dump(config, Dumper=yaml.Dumper))
 
         return True