K8s Juju connector 55/8155/2
authorAdam Israel <adam.israel@canonical.com>
Thu, 7 Nov 2019 14:46:59 +0000 (09:46 -0500)
committerAdam Israel <adam.israel@canonical.com>
Tue, 12 Nov 2019 04:45:09 +0000 (20:45 -0800)
Juju connector for Kubernetes

Change-Id: I2a020aa55840dd7e76391d7ad751be7c56db5eeb
Signed-off-by: Adam Israel <adam.israel@canonical.com>
Jenkinsfile
n2vc/exceptions.py
n2vc/k8s_juju_conn.py [new file with mode: 0644]
tests/bundles/k8s-zookeeper-downgrade.yaml [new file with mode: 0644]
tests/bundles/k8s-zookeeper-upgrade.yaml [new file with mode: 0644]
tests/bundles/k8s-zookeeper.yaml [new file with mode: 0644]
tests/test_k8s_juju_conn.py [new file with mode: 0644]

index ed9e879..e384cbd 100644 (file)
@@ -1,3 +1,18 @@
+/*
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+  implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+
 properties([
     parameters([
         string(defaultValue: env.BRANCH_NAME, description: '', name: 'GERRIT_BRANCH'),
index f5c9fb0..4e44f95 100644 (file)
@@ -38,3 +38,11 @@ class NoRouteToHost(Exception):
 
 class AuthenticationFailed(Exception):
     """The authentication for the specified user failed."""
+
+
+class InvalidCACertificate(Exception):
+    """The CA Certificate is not valid."""
+
+
+class NotImplemented(Exception):
+    """The method is not implemented."""
diff --git a/n2vc/k8s_juju_conn.py b/n2vc/k8s_juju_conn.py
new file mode 100644 (file)
index 0000000..4f62898
--- /dev/null
@@ -0,0 +1,952 @@
+# Copyright 2019 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+#     Unless required by applicable law or agreed to in writing, software
+#     distributed under the License is distributed on an "AS IS" BASIS,
+#     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#     See the License for the specific language governing permissions and
+#     limitations under the License.
+
+import concurrent
+from .exceptions import NotImplemented
+
+import juju
+# from juju.bundle import BundleHandler
+from juju.controller import Controller
+from juju.model import Model
+from juju.errors import JujuAPIError, JujuError
+
+import logging
+
+from n2vc.k8s_conn import K8sConnector
+
+import os
+# import re
+# import ssl
+import subprocess
+# from .vnf import N2VC
+
+import uuid
+import yaml
+
+
+class K8sJujuConnector(K8sConnector):
+    def __init__(
+            self,
+            fs,
+            kubectl_command='/usr/bin/kubectl',
+            log=None
+    ):
+        """
+
+        :param kubectl_command: path to kubectl executable
+        :param helm_command: path to helm executable
+        :param fs: file system for kubernetes and helm configuration
+        :param log: logger
+        """
+
+        # parent class
+        K8sConnector.__init__(
+            self,
+            kubectl_command=kubectl_command,
+            fs=fs,
+            log=log,
+        )
+
+        self.info('Initializing K8S Juju connector')
+
+        self.authenticated = False
+        self.models = {}
+        self.log = logging.getLogger(__name__)
+        self.info('K8S Juju connector initialized')
+
+    """Initialization"""
+    async def init_env(
+        self,
+        k8s_creds: dict,
+        namespace: str = 'kube-system',
+        reuse_cluster_uuid: str = None,
+    ) -> str:
+        """Initialize a Kubernetes environment
+
+        :param k8s_creds dict: A dictionary containing the Kubernetes cluster
+        configuration
+        :param namespace str: The Kubernetes namespace to initialize
+
+        :return: UUID of the k8s context or raises an exception
+        """
+
+        """Bootstrapping
+
+        Bootstrapping cannot be done, by design, through the API. We need to
+        use the CLI tools.
+        """
+        # TODO: The path may change
+        jujudir = "/snap/bin"
+
+        self.k8scli = "{}/juju".format(jujudir)
+
+        """
+        WIP: Workflow
+
+        1. Has the environment already been bootstrapped?
+        - Check the database to see if we have a record for this env
+
+        2. If this is a new env, create it
+        - Add the k8s cloud to Juju
+        - Bootstrap
+        - Record it in the database
+
+        3. Connect to the Juju controller for this cloud
+
+        """
+        # cluster_uuid = reuse_cluster_uuid
+        # if not cluster_uuid:
+        #     cluster_uuid = str(uuid4())
+
+        ##################################################
+        # TODO: Pull info from db based on the namespace #
+        ##################################################
+
+        if not reuse_cluster_uuid:
+            # This is a new cluster, so bootstrap it
+
+            cluster_uuid = str(uuid.uuid4())
+
+            # Add k8s cloud to Juju (unless it's microk8s)
+
+            # Does the kubeconfig contain microk8s?
+            microk8s = self.is_microk8s_by_credentials(k8s_creds)
+
+            if not microk8s:
+                # Name the new k8s cloud
+                k8s_cloud = "{}-k8s".format(namespace)
+
+                await self.add_k8s(k8s_cloud, k8s_creds)
+
+                # Bootstrap Juju controller
+                self.bootstrap(k8s_cloud, cluster_uuid)
+            else:
+                # k8s_cloud = 'microk8s-test'
+                k8s_cloud = "{}-k8s".format(namespace)
+
+                await self.add_k8s(k8s_cloud, k8s_creds)
+
+                await self.bootstrap(k8s_cloud, cluster_uuid)
+
+            # Get the controller information
+
+            # Parse ~/.local/share/juju/controllers.yaml
+            # controllers.testing.api-endpoints|ca-cert|uuid
+            with open(os.path.expanduser(
+                "~/.local/share/juju/controllers.yaml"
+            )) as f:
+                controllers = yaml.load(f, Loader=yaml.Loader)
+                controller = controllers['controllers'][cluster_uuid]
+                endpoints = controller['api-endpoints']
+                self.juju_endpoint = endpoints[0]
+                self.juju_ca_cert = controller['ca-cert']
+
+            # Parse ~/.local/share/juju/accounts
+            # controllers.testing.user|password
+            with open(os.path.expanduser(
+                "~/.local/share/juju/accounts.yaml"
+            )) as f:
+                controllers = yaml.load(f, Loader=yaml.Loader)
+                controller = controllers['controllers'][cluster_uuid]
+
+                self.juju_user = controller['user']
+                self.juju_secret = controller['password']
+
+            print("user: {}".format(self.juju_user))
+            print("secret: {}".format(self.juju_secret))
+            print("endpoint: {}".format(self.juju_endpoint))
+            print("ca-cert: {}".format(self.juju_ca_cert))
+
+            # raise Exception("EOL")
+
+            self.juju_public_key = None
+
+            config = {
+                'endpoint': self.juju_endpoint,
+                'username': self.juju_user,
+                'secret': self.juju_secret,
+                'cacert': self.juju_ca_cert,
+                'namespace': namespace,
+                'microk8s': microk8s,
+            }
+
+            # Store the cluster configuration so it
+            # can be used for subsequent calls
+            await self.set_config(cluster_uuid, config)
+
+        else:
+            # This is an existing cluster, so get its config
+            cluster_uuid = reuse_cluster_uuid
+
+            config = self.get_config(cluster_uuid)
+
+            self.juju_endpoint = config['endpoint']
+            self.juju_user = config['username']
+            self.juju_secret = config['secret']
+            self.juju_ca_cert = config['cacert']
+            self.juju_public_key = None
+
+        # Login to the k8s cluster
+        if not self.authenticated:
+            await self.login()
+
+        # We're creating a new cluster
+        print("Getting model {}".format(self.get_namespace(cluster_uuid)))
+        model = await self.get_model(self.get_namespace(cluster_uuid))
+
+        # Disconnect from the model
+        if model and model.is_connected():
+            await model.disconnect()
+
+        return cluster_uuid
+
+    """Repo Management"""
+    async def repo_add(
+        self,
+        name: str,
+        url: str,
+        type: str = "charm",
+    ):
+        raise NotImplemented()
+
+    async def repo_list(self):
+        raise NotImplemented()
+
+    async def repo_remove(
+        self,
+        name: str,
+    ):
+        raise NotImplemented()
+
+    """Reset"""
+    async def reset(
+        self,
+        cluster_uuid: str,
+    ) -> bool:
+        """Reset a cluster
+
+        Resets the Kubernetes cluster by removing the model that represents it.
+
+        :param cluster_uuid str: The UUID of the cluster to reset
+        :return: Returns True if successful or raises an exception.
+        """
+
+        try:
+            if not self.authenticated:
+                await self.login()
+
+            if self.controller.is_connected():
+                # Destroy the model
+                namespace = self.get_namespace(cluster_uuid)
+                if await self.has_model(namespace):
+                    print("[reset] Destroying model")
+                    await self.controller.destroy_model(
+                        namespace,
+                        destroy_storage=True
+                    )
+
+                # Disconnect from the controller
+                print("[reset] Disconnecting controller")
+                await self.controller.disconnect()
+
+                # Destroy the controller (via CLI)
+                print("[reset] Destroying controller")
+                await self.destroy_controller(cluster_uuid)
+
+                """Remove the k8s cloud
+
+                Only remove the k8s cloud if it's not a microk8s cloud,
+                since microk8s is a built-in cloud type.
+                """
+                # microk8s = self.is_microk8s_by_cluster_uuid(cluster_uuid)
+                # if not microk8s:
+                print("[reset] Removing k8s cloud")
+                namespace = self.get_namespace(cluster_uuid)
+                k8s_cloud = "{}-k8s".format(namespace)
+                await self.remove_cloud(k8s_cloud)
+
+        except Exception as ex:
+            print("Caught exception during reset: {}".format(ex))
+
+    """Deployment"""
+    async def install(
+        self,
+        cluster_uuid: str,
+        kdu_model: str,
+        atomic: bool = True,
+        timeout: int = None,
+        params: dict = None,
+    ) -> str:
+        """Install a bundle
+
+        :param cluster_uuid str: The UUID of the cluster to install to
+        :param kdu_model str: The name or path of a bundle to install
+        :param atomic bool: If set, waits until the model is active and resets
+                            the cluster on failure.
+        :param timeout int: The time, in seconds, to wait for the install
+                            to finish
+        :param params dict: Key-value pairs of instantiation parameters
+
+        :return: If successful, returns ?
+        """
+
+        if not self.authenticated:
+            print("[install] Logging in to the controller")
+            await self.login()
+
+        ##
+        # Get or create the model, based on the namespace the cluster was
+        # instantiated with.
+        namespace = self.get_namespace(cluster_uuid)
+        model = await self.get_model(namespace)
+        if not model:
+            # Create the new model
+            model = await self.add_model(namespace)
+
+        if model:
+            # TODO: Instantiation parameters
+
+            print("[install] deploying {}".format(kdu_model))
+            await model.deploy(kdu_model)
+
+            # Get the application
+            if atomic:
+                # applications = model.applications
+                print("[install] Applications: {}".format(model.applications))
+                for name in model.applications:
+                    print("[install] Waiting for {} to settle".format(name))
+                    application = model.applications[name]
+                    try:
+                        # It's not enough to wait for all units to be active;
+                        # the application status needs to be active as well.
+                        print("Waiting for all units to be active...")
+                        await model.block_until(
+                            lambda: all(
+                                unit.agent_status == 'idle'
+                                and application.status in ['active', 'unknown']
+                                and unit.workload_status in [
+                                    'active', 'unknown'
+                                ] for unit in application.units
+                            ),
+                            timeout=timeout
+                        )
+                        print("All units active.")
+
+                    except concurrent.futures._base.TimeoutError:
+                        print("[install] Timeout exceeded; resetting cluster")
+                        await self.reset(cluster_uuid)
+                        return False
+
+            # Wait for the application to be active
+            if model.is_connected():
+                print("[install] Disconnecting model")
+                await model.disconnect()
+
+            return True
+        raise Exception("Unable to install")
+
+    async def instances_list(
+            self,
+            cluster_uuid: str
+    ) -> list:
+        """
+        returns a list of deployed releases in a cluster
+
+        :param cluster_uuid: the cluster
+        :return:
+        """
+        return []
+
+    async def upgrade(
+        self,
+        cluster_uuid: str,
+        kdu_instance: str,
+        kdu_model: str = None,
+        params: dict = None,
+    ) -> str:
+        """Upgrade a model
+
+        :param cluster_uuid str: The UUID of the cluster to upgrade
+        :param kdu_instance str: The unique name of the KDU instance
+        :param kdu_model str: The name or path of the bundle to upgrade to
+        :param params dict: Key-value pairs of instantiation parameters
+
+        :return: If successful, reference to the new revision number of the
+                 KDU instance.
+        """
+
+        # TODO: Loop through the bundle and upgrade each charm individually
+
+        """
+        The API doesn't have a concept of bundle upgrades, because there are
+        many possible changes: charm revision, disk, number of units, etc.
+
+        As such, we are only supporting a limited subset of upgrades. We'll
+        upgrade the charm revision but leave storage and scale untouched.
+
+        Scale changes should happen through OSM constructs, and changes to
+        storage would require a redeployment of the service, at least in this
+        initial release.
+        """
+        namespace = self.get_namespace(cluster_uuid)
+        model = await self.get_model(namespace)
+
+        with open(kdu_model, 'r') as f:
+            bundle = yaml.load(f, Loader=yaml.FullLoader)
+
+            """
+            {
+                'description': 'Test bundle',
+                'bundle': 'kubernetes',
+                'applications': {
+                    'mariadb-k8s': {
+                        'charm': 'cs:~charmed-osm/mariadb-k8s-20',
+                        'scale': 1,
+                        'options': {
+                            'password': 'manopw',
+                            'root_password': 'osm4u',
+                            'user': 'mano'
+                        },
+                        'series': 'kubernetes'
+                    }
+                }
+            }
+            """
+            # TODO: This should be returned in an agreed-upon format
+            for name in bundle['applications']:
+                print(model.applications)
+                application = model.applications[name]
+                print(application)
+
+                path = bundle['applications'][name]['charm']
+
+                try:
+                    await application.upgrade_charm(switch=path)
+                except juju.errors.JujuError as ex:
+                    if 'already running charm' in str(ex):
+                        # We're already running this version
+                        pass
+
+        await model.disconnect()
+
+        return True
+        raise NotImplemented()
+
+    """Rollback"""
+    async def rollback(
+        self,
+        cluster_uuid: str,
+        kdu_instance: str,
+        revision: int = 0,
+    ) -> str:
+        """Rollback a model
+
+        :param cluster_uuid str: The UUID of the cluster to rollback
+        :param kdu_instance str: The unique name of the KDU instance
+        :param revision int: The revision to revert to. If omitted, rolls back
+                             the previous upgrade.
+
+        :return: If successful, returns the revision of active KDU instance,
+                 or raises an exception
+        """
+        raise NotImplemented()
+
+    """Deletion"""
+    async def uninstall(
+        self,
+        cluster_uuid: str,
+        kdu_instance: str,
+    ) -> bool:
+        """Uninstall a KDU instance
+
+        :param cluster_uuid str: The UUID of the cluster to uninstall
+        :param kdu_instance str: The unique name of the KDU instance
+
+        :return: Returns True if successful, or raises an exception
+        """
+        removed = False
+
+        # Remove an application from the model
+        model = await self.get_model(self.get_namespace(cluster_uuid))
+
+        if model:
+            # Get the application
+            if kdu_instance not in model.applications:
+                # TODO: Raise a named exception
+                raise Exception("Application not found.")
+
+            application = model.applications[kdu_instance]
+
+            # Destroy the application
+            await application.destroy()
+
+            # TODO: Verify removal
+
+            removed = True
+        return removed
+
+    """Introspection"""
+    async def inspect_kdu(
+        self,
+        kdu_model: str,
+    ) -> dict:
+        """Inspect a KDU
+
+        Inspects a bundle and returns a dictionary of config parameters and
+        their default values.
+
+        :param kdu_model str: The name or path of the bundle to inspect.
+
+        :return: If successful, returns a dictionary of available parameters
+                 and their default values.
+        """
+
+        kdu = {}
+        with open(kdu_model, 'r') as f:
+            bundle = yaml.load(f, Loader=yaml.FullLoader)
+
+            """
+            {
+                'description': 'Test bundle',
+                'bundle': 'kubernetes',
+                'applications': {
+                    'mariadb-k8s': {
+                        'charm': 'cs:~charmed-osm/mariadb-k8s-20',
+                        'scale': 1,
+                        'options': {
+                            'password': 'manopw',
+                            'root_password': 'osm4u',
+                            'user': 'mano'
+                        },
+                        'series': 'kubernetes'
+                    }
+                }
+            }
+            """
+            # TODO: This should be returned in an agreed-upon format
+            kdu = bundle['applications']
+
+        return kdu
+
+    async def help_kdu(
+        self,
+        kdu_model: str,
+    ) -> str:
+        """View the README
+
+        If available, returns the README of the bundle.
+
+        :param kdu_model str: The name or path of a bundle
+
+        :return: If found, returns the contents of the README.
+        """
+        readme = None
+
+        files = ['README', 'README.txt', 'README.md']
+        path = os.path.dirname(kdu_model)
+        for file in os.listdir(path):
+            if file in files:
+                with open(file, 'r') as f:
+                    readme = f.read()
+                    break
+
+        return readme
+
+    async def status_kdu(
+        self,
+        cluster_uuid: str,
+        kdu_instance: str,
+    ) -> dict:
+        """Get the status of the KDU
+
+        Get the current status of the KDU instance.
+
+        :param cluster_uuid str: The UUID of the cluster
+        :param kdu_instance str: The unique id of the KDU instance
+
+        :return: Returns a dictionary containing namespace, state, resources,
+                 and deployment_time.
+        """
+        status = {}
+
+        model = await self.get_model(self.get_namespace(cluster_uuid))
+
+        # model = await self.get_model_by_uuid(cluster_uuid)
+        if model:
+            model_status = await model.get_status()
+            status = model_status.applications
+
+            for name in model_status.applications:
+                application = model_status.applications[name]
+                status[name] = {
+                    'status': application['status']['status']
+                }
+
+            if model.is_connected():
+                await model.disconnect()
+
+        return status
+
+    # Private methods
+    async def add_k8s(
+        self,
+        cloud_name: str,
+        credentials: dict,
+    ) -> bool:
+        """Add a k8s cloud to Juju
+
+        Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
+        Juju Controller.
+
+        :param cloud_name str: The name of the cloud to add.
+        :param credentials dict: A dictionary representing the output of
+            `kubectl config view --raw`.
+
+        :returns: True if successful, otherwise raises an exception.
+        """
+        cmd = [self.k8scli, "add-k8s", "--local", cloud_name]
+
+        p = subprocess.run(
+            cmd,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            input=yaml.dump(credentials, Dumper=yaml.Dumper),
+            encoding='ascii'
+        )
+        retcode = p.returncode
+
+        if retcode > 0:
+            raise Exception(p.stderr)
+        return True
+
+    async def add_model(
+        self,
+        model_name: str
+    ) -> juju.model.Model:
+        """Adds a model to the controller
+
+        Adds a new model to the Juju controller
+
+        :param model_name str: The name of the model to add.
+        :returns: The juju.model.Model object of the new model upon success or
+                  raises an exception.
+        """
+        if not self.authenticated:
+            await self.login()
+
+        model = await self.controller.add_model(
+            model_name,
+            config={'authorized-keys': self.juju_public_key}
+        )
+        return model
+
+    async def bootstrap(
+        self,
+        cloud_name: str,
+        cluster_uuid: str
+    ) -> bool:
+        """Bootstrap a Kubernetes controller
+
+        Bootstrap a Juju controller inside the Kubernetes cluster
+
+        :param cloud_name str: The name of the cloud.
+        :param cluster_uuid str: The UUID of the cluster to bootstrap.
+        :returns: True upon success or raises an exception.
+        """
+        cmd = [self.k8scli, "bootstrap", cloud_name, cluster_uuid]
+        print("Bootstrapping controller {} in cloud {}".format(
+            cluster_uuid, cloud_name
+        ))
+
+        p = subprocess.run(
+            cmd,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            encoding='ascii'
+        )
+        retcode = p.returncode
+
+        if retcode > 0:
+            #
+            if 'already exists' not in p.stderr:
+                raise Exception(p.stderr)
+
+        return True
+
+    async def destroy_controller(
+        self,
+        cluster_uuid: str
+    ) -> bool:
+        """Destroy a Kubernetes controller
+
+        Destroy an existing Kubernetes controller.
+
+        :param cluster_uuid str: The UUID of the cluster to bootstrap.
+        :returns: True upon success or raises an exception.
+        """
+        cmd = [
+            self.k8scli,
+            "destroy-controller",
+            "--destroy-all-models",
+            "--destroy-storage",
+            "-y",
+            cluster_uuid
+        ]
+
+        p = subprocess.run(
+            cmd,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            encoding='ascii'
+        )
+        retcode = p.returncode
+
+        if retcode > 0:
+            #
+            if 'already exists' not in p.stderr:
+                raise Exception(p.stderr)
+
+    def get_config(
+        self,
+        cluster_uuid: str,
+    ) -> dict:
+        """Get the cluster configuration
+
+        Gets the configuration of the cluster
+
+        :param cluster_uuid str: The UUID of the cluster.
+        :return: A dict upon success, or raises an exception.
+        """
+        cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)
+        if os.path.exists(cluster_config):
+            with open(cluster_config, 'r') as f:
+                config = yaml.load(f.read(), Loader=yaml.FullLoader)
+                return config
+        else:
+            raise Exception(
+                "Unable to locate configuration for cluster {}".format(
+                    cluster_uuid
+                )
+            )
+
+    async def get_model(
+        self,
+        model_name: str,
+    ) -> juju.model.Model:
+        """Get a model from the Juju Controller.
+
+        Note: Model objects returned must call disconnected() before it goes
+        out of scope.
+
+        :param model_name str: The name of the model to get
+        :return The juju.model.Model object if found, or None.
+        """
+        if not self.authenticated:
+            await self.login()
+
+        model = None
+        models = await self.controller.list_models()
+
+        if model_name in models:
+            model = await self.controller.get_model(
+                model_name
+            )
+        return model
+
+    def get_namespace(
+        self,
+        cluster_uuid: str,
+    ) -> str:
+        """Get the namespace UUID
+        Gets the namespace's unique name
+
+        :param cluster_uuid str: The UUID of the cluster
+        :returns: The namespace UUID, or raises an exception
+        """
+        config = self.get_config(cluster_uuid)
+
+        # Make sure the name is in the config
+        if 'namespace' not in config:
+            raise Exception("Namespace not found.")
+
+        # TODO: We want to make sure this is unique to the cluster, in case
+        # the cluster is being reused.
+        # Consider pre/appending the cluster id to the namespace string
+        return config['namespace']
+
+    async def has_model(
+        self,
+        model_name: str
+    ) -> bool:
+        """Check if a model exists in the controller
+
+        Checks to see if a model exists in the connected Juju controller.
+
+        :param model_name str: The name of the model
+        :return: A boolean indicating if the model exists
+        """
+        models = await self.controller.list_models()
+
+        if model_name in models:
+            return True
+        return False
+
+    def is_microk8s_by_cluster_uuid(
+        self,
+        cluster_uuid: str,
+    ) -> bool:
+        """Check if a cluster is micro8s
+
+        Checks if a cluster is running microk8s
+
+        :param cluster_uuid str: The UUID of the cluster
+        :returns: A boolean if the cluster is running microk8s
+        """
+        config = self.get_config(cluster_uuid)
+        return config['microk8s']
+
+    def is_microk8s_by_credentials(
+        self,
+        credentials: dict,
+    ) -> bool:
+        """Check if a cluster is micro8s
+
+        Checks if a cluster is running microk8s
+
+        :param credentials dict: A dictionary containing the k8s credentials
+        :returns: A boolean if the cluster is running microk8s
+        """
+        for context in credentials['contexts']:
+            if 'microk8s' in context['name']:
+                return True
+
+        return False
+
+    async def login(self):
+        """Login to the Juju controller."""
+
+        if self.authenticated:
+            return
+
+        self.connecting = True
+
+        self.controller = Controller()
+
+        if self.juju_secret:
+            self.log.debug(
+                "Connecting to controller... ws://{} as {}/{}".format(
+                    self.juju_endpoint,
+                    self.juju_user,
+                    self.juju_secret,
+                )
+            )
+            try:
+                await self.controller.connect(
+                    endpoint=self.juju_endpoint,
+                    username=self.juju_user,
+                    password=self.juju_secret,
+                    cacert=self.juju_ca_cert,
+                )
+                self.authenticated = True
+                self.log.debug("JujuApi: Logged into controller")
+            except Exception as ex:
+                print(ex)
+                self.log.debug("Caught exception: {}".format(ex))
+                pass
+        else:
+            self.log.fatal("VCA credentials not configured.")
+            self.authenticated = False
+
+    async def logout(self):
+        """Logout of the Juju controller."""
+        print("[logout]")
+        if not self.authenticated:
+            return False
+
+        for model in self.models:
+            print("Logging out of model {}".format(model))
+            await self.models[model].disconnect()
+
+        if self.controller:
+            self.log.debug("Disconnecting controller {}".format(
+                self.controller
+            ))
+            await self.controller.disconnect()
+            self.controller = None
+
+        self.authenticated = False
+
+    async def remove_cloud(
+        self,
+        cloud_name: str,
+    ) -> bool:
+        """Remove a k8s cloud from Juju
+
+        Removes a Kubernetes cloud from Juju.
+
+        :param cloud_name str: The name of the cloud to add.
+
+        :returns: True if successful, otherwise raises an exception.
+        """
+
+        # Remove the bootstrapped controller
+        cmd = [self.k8scli, "remove-k8s", "--client", cloud_name]
+        p = subprocess.run(
+            cmd,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            encoding='ascii'
+        )
+        retcode = p.returncode
+
+        if retcode > 0:
+            raise Exception(p.stderr)
+
+        # Remove the cloud from the local config
+        cmd = [self.k8scli, "remove-cloud", "--client", cloud_name]
+        p = subprocess.run(
+            cmd,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            encoding='ascii'
+        )
+        retcode = p.returncode
+
+        if retcode > 0:
+            raise Exception(p.stderr)
+
+
+        return True
+
+    async def set_config(
+        self,
+        cluster_uuid: str,
+        config: dict,
+    ) -> bool:
+        """Save the cluster configuration
+
+        Saves the cluster information to the file store
+
+        :param cluster_uuid str: The UUID of the cluster
+        :param config dict: A dictionary containing the cluster configuration
+        :returns: Boolean upon success or raises an exception.
+        """
+
+        cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)
+        if not os.path.exists(cluster_config):
+            print("Writing config to {}".format(cluster_config))
+            with open(cluster_config, 'w') as f:
+                f.write(yaml.dump(config, Dumper=yaml.Dumper))
+
+        return True
diff --git a/tests/bundles/k8s-zookeeper-downgrade.yaml b/tests/bundles/k8s-zookeeper-downgrade.yaml
new file mode 100644 (file)
index 0000000..8a99e2c
--- /dev/null
@@ -0,0 +1,21 @@
+# Copyright 2019 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+#     Unless required by applicable law or agreed to in writing, software
+#     distributed under the License is distributed on an "AS IS" BASIS,
+#     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#     See the License for the specific language governing permissions and
+#     limitations under the License.
+
+description: Test bundle
+bundle: kubernetes
+applications:
+  zookeeper-k8s:
+    charm: 'cs:~charmed-osm/zookeeper-k8s-29'
+    scale: 1
+    series: kubernetes
diff --git a/tests/bundles/k8s-zookeeper-upgrade.yaml b/tests/bundles/k8s-zookeeper-upgrade.yaml
new file mode 100644 (file)
index 0000000..8308f2f
--- /dev/null
@@ -0,0 +1,21 @@
+# Copyright 2019 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+#     Unless required by applicable law or agreed to in writing, software
+#     distributed under the License is distributed on an "AS IS" BASIS,
+#     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#     See the License for the specific language governing permissions and
+#     limitations under the License.
+
+description: Test bundle
+bundle: kubernetes
+applications:
+  zookeeper-k8s:
+    charm: 'cs:~charmed-osm/zookeeper-k8s-31'
+    scale: 1
+    series: kubernetes
diff --git a/tests/bundles/k8s-zookeeper.yaml b/tests/bundles/k8s-zookeeper.yaml
new file mode 100644 (file)
index 0000000..689220e
--- /dev/null
@@ -0,0 +1,21 @@
+# Copyright 2019 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+#     Unless required by applicable law or agreed to in writing, software
+#     distributed under the License is distributed on an "AS IS" BASIS,
+#     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#     See the License for the specific language governing permissions and
+#     limitations under the License.
+
+description: Test bundle
+bundle: kubernetes
+applications:
+  zookeeper-k8s:
+    charm: 'cs:~charmed-osm/zookeeper-k8s-30'
+    scale: 1
+    series: kubernetes
diff --git a/tests/test_k8s_juju_conn.py b/tests/test_k8s_juju_conn.py
new file mode 100644 (file)
index 0000000..6e1e98e
--- /dev/null
@@ -0,0 +1,129 @@
+#  Copyright 2019 Canonical Ltd.
+
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+
+#      http://www.apache.org/licenses/LICENSE-2.0
+
+#      Unless required by applicable law or agreed to in writing, software
+#      distributed under the License is distributed on an "AS IS" BASIS,
+#      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#      See the License for the specific language governing permissions and
+#      limitations under the License.
+
+import argparse
+import asyncio
+import logging
+import n2vc.k8s_juju_conn
+from base import get_juju_public_key
+import os
+from osm_common.fslocal import FsLocal
+import subprocess
+import yaml
+
+
+def get_args():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--cluster_uuid", help='The UUID of an existing cluster to use', default=None)
+    parser.add_argument("--reset", action="store_true")
+    return parser.parse_args()
+
+async def main():
+
+    args = get_args()
+
+    reuse_cluster_uuid = args.cluster_uuid
+
+    log = logging.getLogger()
+    log.level = logging.DEBUG
+
+    # Extract parameters from the environment in order to run our tests
+    vca_host = os.getenv('VCA_HOST', '127.0.0.1')
+    vca_port = os.getenv('VCA_PORT', 17070)
+    vca_user = os.getenv('VCA_USER', 'admin')
+    vca_charms = os.getenv('VCA_CHARMS', None)
+    vca_secret = os.getenv('VCA_SECRET', None)
+    vca_ca_cert = os.getenv('VCA_CACERT', None)
+
+    # Get the Juju Public key
+    juju_public_key = get_juju_public_key()
+    if juju_public_key:
+        with open(juju_public_key, 'r') as f:
+            juju_public_key = f.read()
+    else:
+        raise Exception("No Juju Public Key found")
+
+    storage = {
+        'driver': 'local',
+        'path': '/srv/app/storage'
+    }
+    fs = FsLocal()
+    fs.fs_connect(storage)
+
+    client = n2vc.k8s_juju_conn.K8sJujuConnector(
+        kubectl_command = '/bin/true',
+        fs = fs,
+    )
+
+    # kubectl config view --raw
+    # microk8s.config
+
+    # if microk8s then
+    kubecfg = subprocess.getoutput('microk8s.config')
+    # else
+    # kubecfg.subprocess.getoutput('kubectl config view --raw')
+    
+    k8screds = yaml.load(kubecfg, Loader=yaml.FullLoader)
+    namespace = 'testing'
+    kdu_model = "./tests/bundles/k8s-zookeeper.yaml"
+
+    """init_env"""
+    cluster_uuid = await client.init_env(k8screds, namespace, reuse_cluster_uuid=reuse_cluster_uuid)
+    print(cluster_uuid)
+
+    if not reuse_cluster_uuid:
+        # This is a new cluster, so install to it
+
+        """install"""
+        # async def install(self, cluster_uuid, kdu_model, atomic=True, timeout=None, params=None):
+        # TODO: Re-add storage declaration to bundle. The API doesn't support the storage option yet. David is investigating.
+
+        # Deploy the bundle
+        kdu_instance = await client.install(cluster_uuid, kdu_model, atomic=True, timeout=600)
+
+        if kdu_instance:
+            # Inspect
+            print("Getting status")
+            status = await client.status_kdu(cluster_uuid, kdu_instance)
+            print(status)
+
+    # Inspect the bundle
+    config = await client.inspect_kdu(kdu_model)
+    print(config)
+
+    readme = await client.help_kdu(kdu_model)
+    # print(readme)
+
+
+    """upgrade
+    Upgrade to a newer version of the bundle
+    """
+    kdu_model_upgrade = "./tests/bundles/k8s-zookeeper-upgrade.yaml"
+    upgraded = await client.upgrade(cluster_uuid, namespace, kdu_model=kdu_model_upgrade)
+
+    kdu_model_upgrade = "./tests/bundles/k8s-zookeeper-downgrade.yaml"
+    upgraded = await client.upgrade(cluster_uuid, namespace, kdu_model=kdu_model_upgrade)
+
+    """uninstall"""
+
+    """reset"""
+    if args.reset:
+        await client.reset(cluster_uuid)
+
+    await client.logout()
+
+    print("Done")
+
+if __name__ == "__main__":
+    asyncio.run(main())