Merge branch 'feature5837'
author garciadeblas <gerardo.garciadeblas@telefonica.com>
Fri, 22 Nov 2019 11:07:49 +0000 (12:07 +0100)
committer garciadeblas <gerardo.garciadeblas@telefonica.com>
Fri, 22 Nov 2019 11:07:49 +0000 (12:07 +0100)
Signed-off-by: garciadeblas <gerardo.garciadeblas@telefonica.com>
Change-Id: I31abc0d7de97c1dd3fb45fd7798fbab5d1bad5b4

n2vc/k8s_conn.py [new file with mode: 0644]
n2vc/k8s_helm_conn.py [new file with mode: 0644]
n2vc/k8s_juju_conn.py [new file with mode: 0644]
n2vc/loggable.py [new file with mode: 0644]
tests/bundles/k8s-zookeeper-downgrade.yaml [new file with mode: 0644]
tests/bundles/k8s-zookeeper-upgrade.yaml [new file with mode: 0644]
tests/bundles/k8s-zookeeper.yaml [new file with mode: 0644]
tests/test_k8s_juju_conn.py [new file with mode: 0644]

diff --git a/n2vc/k8s_conn.py b/n2vc/k8s_conn.py
new file mode 100644 (file)
index 0000000..d951c2c
--- /dev/null
@@ -0,0 +1,350 @@
+##
+# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
+# This file is part of OSM
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: nfvlabs@tid.es
+##
+
+import asyncio
+from n2vc.loggable import Loggable
+import abc
+import time
+
+
+class K8sConnector(abc.ABC, Loggable):
+
+    """
+    ##################################################################################################
+    ########################################## P U B L I C ###########################################
+    ##################################################################################################
+    """
+
+    def __init__(
+            self,
+            db: object,
+            log: object = None,
+            on_update_db=None
+    ):
+        """
+
+        :param db: database object to write current operation status
+        :param log: logger for tracing
+        :param on_update_db: callback called when k8s connector updates database
+        """
+
+        # parent class
+        Loggable.__init__(self, log=log, log_to_console=True, prefix='\nK8S')
+
+        self.info('Initializing generic K8S connector')
+
+        # the database and update callback
+        self.db = db
+        self.on_update_db = on_update_db
+
+        self.info('K8S generic connector initialized')
+
+    @abc.abstractmethod
+    async def init_env(
+            self,
+            k8s_creds: str,
+            namespace: str = 'kube-system',
+            reuse_cluster_uuid=None
+    ) -> (str, bool):
+        """
+        It prepares a given K8s cluster environment to run Charts or Juju Bundles on both sides:
+            client (OSM)
+            server (Tiller/Charm)
+
+        :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config'
+        :param namespace: optional namespace for helm tiller. By default, 'kube-system' will be used
+        :param reuse_cluster_uuid: existing cluster uuid for reuse
+        :return: uuid of the K8s cluster and True if connector has installed some software in the cluster
+        (on error, an exception will be raised)
+        """
+
+    @abc.abstractmethod
+    async def repo_add(
+            self,
+            cluster_uuid: str,
+            name: str,
+            url: str,
+            repo_type: str = 'chart'
+    ):
+        """
+        Add a new repository to the OSM database
+
+        :param cluster_uuid: the cluster
+        :param name: name for the repo in OSM
+        :param url: URL of the repo
+        :param repo_type: either "chart" or "bundle"
+        :return: True if successful
+        """
+
+    @abc.abstractmethod
+    async def repo_list(
+            self,
+            cluster_uuid: str
+    ):
+        """
+        Get the list of registered repositories
+
+        :param cluster_uuid: the cluster
+        :return: list of registered repositories: [ (name, url) .... ]
+        """
+
+    @abc.abstractmethod
+    async def repo_remove(
+            self,
+            cluster_uuid: str,
+            name: str
+    ):
+        """
+        Remove a repository from OSM
+
+        :param name: repo name in OSM
+        :param cluster_uuid: the cluster
+        :return: True if successful
+        """
+
+    @abc.abstractmethod
+    async def reset(
+            self,
+            cluster_uuid: str,
+            force: bool = False,
+            uninstall_sw: bool = False
+    ) -> bool:
+        """
+        Uninstalls Tiller/Charm from a known K8s cluster and removes it from the list of known K8s clusters.
+        Intended to be used e.g. when the NS instance is deleted.
+
+        :param cluster_uuid: UUID of a K8s cluster known by OSM.
+        :param force: force deletion, even in case there are deployed releases
+        :param uninstall_sw: flag to indicate that software uninstallation from the cluster is needed
+        :return: True if successful
+        """
+
+    @abc.abstractmethod
+    async def install(
+            self,
+            cluster_uuid: str,
+            kdu_model: str,
+            atomic: bool = True,
+            timeout: float = 300,
+            params: dict = None,
+            db_dict: dict = None
+    ):
+        """
+        Deploys a new KDU instance. It would implicitly rely on the `install` call to deploy the Chart/Bundle
+        properly parametrized (in practice, this call would happen before any _initial-config-primitive_
+        of the VNF is called).
+
+        :param cluster_uuid: UUID of a K8s cluster known by OSM
+        :param kdu_model: chart/bundle:version reference (string), which can be either of these options:
+            - a name of chart/bundle available via the repos known by OSM
+            - a path to a packaged chart/bundle
+            - a path to an unpacked chart/bundle directory or a URL
+        :param atomic: If set, the installation process purges the chart/bundle on failure and waits until
+            all the K8s objects are active
+        :param timeout: Time in seconds to wait for the install of the chart/bundle (defaults to
+            Helm default timeout: 300s)
+        :param params: dictionary of key-value pairs for instantiation parameters (overriding default values)
+        :param dict db_dict: where to write into database when the status changes.
+                        It contains a dict with {collection: <str>, filter: {},  path: <str>},
+                            e.g. {collection: "nsrs", filter: {_id: <nsd-id>}, path: "_admin.deployed.K8S.3"}
+        :return: the generated kdu_instance name (on error, an exception will be raised)
+        """
+
+    @abc.abstractmethod
+    async def upgrade(
+            self,
+            cluster_uuid: str,
+            kdu_instance: str,
+            kdu_model: str = None,
+            atomic: bool = True,
+            timeout: float = 300,
+            params: dict = None,
+            db_dict: dict = None
+    ):
+        """
+        Upgrades an existing KDU instance. It would implicitly use the `upgrade` call over an existing Chart/Bundle.
+        It can be used either to upgrade the chart or to reconfigure it. This would be exposed as a Day-2 primitive.
+
+        :param cluster_uuid: UUID of a K8s cluster known by OSM
+        :param kdu_instance: unique name for the KDU instance to be updated
+        :param kdu_model: new chart/bundle:version reference
+        :param atomic: roll back in case of failure and wait until all pods and services are available
+        :param timeout: Time in seconds to wait for the upgrade of the chart/bundle (defaults to
+            Helm default timeout: 300s)
+        :param params: new dictionary of key-value pairs for instantiation parameters
+        :param dict db_dict: where to write into database when the status changes.
+                        It contains a dict with {collection: <str>, filter: {},  path: <str>},
+                            e.g. {collection: "nsrs", filter: {_id: <nsd-id>}, path: "_admin.deployed.K8S.3"}
+        :return: reference to the new revision number of the KDU instance
+        """
+
+    @abc.abstractmethod
+    async def rollback(
+            self,
+            cluster_uuid: str,
+            kdu_instance: str,
+            revision=0,
+            db_dict: dict = None
+    ):
+        """
+        Rolls back a previous update of a KDU instance. It would implicitly use the `rollback` call.
+        It can be used either to roll back from a Chart/Bundle version update or from a reconfiguration.
+        This would be exposed as a Day-2 primitive.
+
+        :param cluster_uuid: UUID of a K8s cluster known by OSM
+        :param kdu_instance: unique name for the KDU instance
+        :param revision: revision to revert to. If omitted, only the last update will be reverted
+        :param dict db_dict: where to write into database when the status changes.
+                        It contains a dict with {collection: <str>, filter: {},  path: <str>},
+                            e.g. {collection: "nsrs", filter: {_id: <nsd-id>}, path: "_admin.deployed.K8S.3"}
+        :return: If successful, reference to the current active revision of the KDU instance after the rollback
+        """
+
+    @abc.abstractmethod
+    async def uninstall(
+            self,
+            cluster_uuid: str,
+            kdu_instance: str
+    ):
+        """
+        Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen
+        after all _terminate-config-primitive_ of the VNF are invoked).
+
+        :param cluster_uuid: UUID of a K8s cluster known by OSM
+        :param kdu_instance: unique name for the KDU instance to be deleted
+        :return: True if successful
+        """
+
+    @abc.abstractmethod
+    async def inspect_kdu(
+            self,
+            kdu_model: str
+    ) -> str:
+        """
+        This call will retrieve from the Chart/Bundle:
+
+            - The list of configurable values and their defaults (e.g. in Charts, it would retrieve
+                the contents of `values.yaml`).
+            - If available, any help file (e.g. `readme.md`) embedded in the Chart/Bundle.
+
+        :param kdu_model: chart/bundle reference
+        :return: If successful, it will return a dictionary containing the list of available parameters
+            and their default values
+        """
+
+    @abc.abstractmethod
+    async def help_kdu(
+            self,
+            kdu_model: str
+    ) -> str:
+        """
+
+        :param kdu_model: chart/bundle reference
+        :return: If successful, it will return the contents of the 'readme.md'
+        """
+
+    @abc.abstractmethod
+    async def status_kdu(
+            self,
+            cluster_uuid: str,
+            kdu_instance: str
+    ) -> str:
+        """
+        This call would retrieve the current state of a given KDU instance. It would allow to retrieve
+        the _composition_ (i.e. K8s objects) and the _specific values_ of the configuration parameters applied
+        to a given instance. This call would be based on the `status` call.
+
+        :param cluster_uuid: UUID of a K8s cluster known by OSM
+        :param kdu_instance: unique name for the KDU instance
+        :return: If successful, it will return the following vector of arguments:
+        - K8s `namespace` in the cluster where the KDU lives
+        - `state` of the KDU instance. It can be:
+              - UNKNOWN
+              - DEPLOYED
+              - DELETED
+              - SUPERSEDED
+              - FAILED or
+              - DELETING
+        - List of `resources` (objects) that this release consists of, sorted by kind, and the status of those resources
+        - Last `deployment_time`.
+
+        """
+
+    """
+    ##################################################################################################
+    ########################################## P R I V A T E #########################################
+    ##################################################################################################
+    """
+
+    async def write_app_status_to_db(
+            self,
+            db_dict: dict,
+            status: str,
+            detailed_status: str,
+            operation: str
+    ) -> bool:
+
+        if not self.db:
+            self.warning('No db => No database write')
+            return False
+
+        if not db_dict:
+            self.warning('No db_dict => No database write')
+            return False
+
+        self.debug('status={}'.format(status))
+
+        try:
+
+            the_table = db_dict['collection']
+            the_filter = db_dict['filter']
+            the_path = db_dict['path']
+            if the_path[-1] != '.':
+                the_path = the_path + '.'
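+            # e.g. a path of '_admin.deployed.K8S.0' yields keys such as
+            # '_admin.deployed.K8S.0.status' and '_admin.deployed.K8S.0.status-time'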
+            update_dict = {
+                the_path + 'operation': operation,
+                the_path + 'status': status,
+                the_path + 'detailed-status': detailed_status,
+                the_path + 'status-time': str(time.time()),
+            }
+
+            self.db.set_one(
+                table=the_table,
+                q_filter=the_filter,
+                update_dict=update_dict,
+                fail_on_empty=True
+            )
+
+            # database callback
+            if self.on_update_db:
+                if asyncio.iscoroutinefunction(self.on_update_db):
+                    await self.on_update_db(the_table, the_filter, the_path, update_dict)
+                else:
+                    self.on_update_db(the_table, the_filter, the_path, update_dict)
+
+            return True
+
+        except Exception as e:
+            self.info('Exception writing status to database: {}'.format(e))
+            return False
diff --git a/n2vc/k8s_helm_conn.py b/n2vc/k8s_helm_conn.py
new file mode 100644 (file)
index 0000000..cd15d73
--- /dev/null
@@ -0,0 +1,1105 @@
+##
+# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
+# This file is part of OSM
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: nfvlabs@tid.es
+##
+
+import paramiko
+import subprocess
+import os
+import shutil
+import asyncio
+import time
+import yaml
+from uuid import uuid4
+import random
+from n2vc.k8s_conn import K8sConnector
+
+
+class K8sHelmConnector(K8sConnector):
+
+    """
+    ##################################################################################################
+    ########################################## P U B L I C ###########################################
+    ##################################################################################################
+    """
+
+    def __init__(
+            self,
+            fs: object,
+            db: object,
+            kubectl_command: str = '/usr/bin/kubectl',
+            helm_command: str = '/usr/bin/helm',
+            log: object = None,
+            on_update_db=None
+    ):
+        """
+
+        :param fs: file system for kubernetes and helm configuration
+        :param db: database object to write current operation status
+        :param kubectl_command: path to kubectl executable
+        :param helm_command: path to helm executable
+        :param log: logger
+        :param on_update_db: callback called when k8s connector updates database
+        """
+
+        # parent class
+        K8sConnector.__init__(
+            self,
+            db=db,
+            log=log,
+            on_update_db=on_update_db
+        )
+
+        self.info('Initializing K8S Helm connector')
+
+        # random numbers for release name generation
+        random.seed(time.time())
+
+        # the file system
+        self.fs = fs
+
+        # exception if kubectl is not installed
+        self.kubectl_command = kubectl_command
+        self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
+
+        # exception if helm is not installed
+        self._helm_command = helm_command
+        self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
+
+        self.info('K8S Helm connector initialized')
+
+    async def init_env(
+            self,
+            k8s_creds: str,
+            namespace: str = 'kube-system',
+            reuse_cluster_uuid=None
+    ) -> (str, bool):
+
+        cluster_uuid = reuse_cluster_uuid
+        if not cluster_uuid:
+            cluster_uuid = str(uuid4())
+
+        self.debug('Initializing K8S environment. namespace: {}'.format(namespace))
+
+        # create config filename
+        kube_dir, helm_dir, config_filename, cluster_dir = \
+            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+        with open(config_filename, 'w') as f:
+            f.write(k8s_creds)
+
+        # check if tiller pod is up in cluster
+        command = '{} --kubeconfig={} --namespace={} get deployments'\
+            .format(self.kubectl_command, config_filename, namespace)
+        output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+
+        output_table = K8sHelmConnector._output_to_table(output=output)
+
+        # find 'tiller' pod in all pods
+        already_initialized = False
+        try:
+            for row in output_table:
+                if row[0].startswith('tiller-deploy'):
+                    already_initialized = True
+                    break
+        except Exception as e:
+            pass
+
+        # helm init
+        n2vc_installed_sw = False
+        if not already_initialized:
+            self.info('Initializing helm in client and server: {}'.format(cluster_uuid))
+            command = '{} --kubeconfig={} --tiller-namespace={} --home={} init'\
+                .format(self._helm_command, config_filename, namespace, helm_dir)
+            output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+            n2vc_installed_sw = True
+        else:
+            # check client helm installation
+            check_file = helm_dir + '/repository/repositories.yaml'
+            if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
+                self.info('Initializing helm in client: {}'.format(cluster_uuid))
+                command = '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\
+                    .format(self._helm_command, config_filename, namespace, helm_dir)
+                output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+            else:
+                self.info('Helm client already initialized')
+
+        self.info('Cluster initialized {}'.format(cluster_uuid))
+
+        return cluster_uuid, n2vc_installed_sw
+
+    async def repo_add(
+            self,
+            cluster_uuid: str,
+            name: str,
+            url: str,
+            repo_type: str = 'chart'
+    ):
+
+        self.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url))
+
+        # config filename
+        kube_dir, helm_dir, config_filename, cluster_dir = \
+            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+
+        # helm repo update
+        command = '{} --kubeconfig={} --home={} repo update'.format(self._helm_command, config_filename, helm_dir)
+        self.debug('updating repo: {}'.format(command))
+        await self._local_async_exec(command=command, raise_exception_on_error=False)
+
+        # helm repo add name url
+        command = '{} --kubeconfig={} --home={} repo add {} {}'\
+            .format(self._helm_command, config_filename, helm_dir, name, url)
+        self.debug('adding repo: {}'.format(command))
+        await self._local_async_exec(command=command, raise_exception_on_error=True)
+
+    async def repo_list(
+            self,
+            cluster_uuid: str
+    ) -> list:
+        """
+        Get the list of registered repositories
+
+        :return: list of registered repositories: [ (name, url) .... ]
+        """
+
+        self.debug('list repositories for cluster {}'.format(cluster_uuid))
+
+        # config filename
+        kube_dir, helm_dir, config_filename, cluster_dir = \
+            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+
+        command = '{} --kubeconfig={} --home={} repo list --output yaml'\
+            .format(self._helm_command, config_filename, helm_dir)
+
+        output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+        if output and len(output) > 0:
+            return yaml.load(output, Loader=yaml.SafeLoader)
+        else:
+            return []
+
+    async def repo_remove(
+            self,
+            cluster_uuid: str,
+            name: str
+    ):
+        """
+        Remove a repository from OSM
+
+        :param cluster_uuid: the cluster
+        :param name: repo name in OSM
+        :return: True if successful
+        """
+
+        self.debug('removing repository {} from cluster {}'.format(name, cluster_uuid))
+
+        # config filename
+        kube_dir, helm_dir, config_filename, cluster_dir = \
+            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+
+        command = '{} --kubeconfig={} --home={} repo remove {}'\
+            .format(self._helm_command, config_filename, helm_dir, name)
+
+        await self._local_async_exec(command=command, raise_exception_on_error=True)
+
+    async def reset(
+            self,
+            cluster_uuid: str,
+            force: bool = False,
+            uninstall_sw: bool = False
+    ) -> bool:
+
+        self.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid))
+
+        # get kube and helm directories
+        kube_dir, helm_dir, config_filename, cluster_dir = \
+            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=False)
+
+        # uninstall releases if needed
+        releases = await self.instances_list(cluster_uuid=cluster_uuid)
+        if len(releases) > 0:
+            if force:
+                for r in releases:
+                    try:
+                        kdu_instance = r.get('Name')
+                        chart = r.get('Chart')
+                        self.debug('Uninstalling {} -> {}'.format(chart, kdu_instance))
+                        await self.uninstall(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
+                    except Exception as e:
+                        self.error('Error uninstalling release {}: {}'.format(kdu_instance, e))
+            else:
+                msg = 'Cluster has releases and force is not enabled. Cannot reset K8s environment. Cluster uuid: {}'\
+                    .format(cluster_uuid)
+                self.error(msg)
+                raise Exception(msg)
+
+        if uninstall_sw:
+
+            self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
+
+            # find namespace for tiller pod
+            command = '{} --kubeconfig={} get deployments --all-namespaces'\
+                .format(self.kubectl_command, config_filename)
+            output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
+            output_table = K8sHelmConnector._output_to_table(output=output)
+            namespace = None
+            for r in output_table:
+                try:
+                    if 'tiller-deploy' in r[1]:
+                        namespace = r[0]
+                        break
+                except Exception as e:
+                    pass
+            else:
+                msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid)
+                self.error(msg)
+                # raise Exception(msg)
+
+            self.debug('namespace for tiller: {}'.format(namespace))
+
+            force_str = '--force'
+
+            if namespace:
+                # delete tiller deployment
+                self.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid, namespace))
+                command = '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\
+                    .format(self.kubectl_command, namespace, config_filename, force_str)
+                await self._local_async_exec(command=command, raise_exception_on_error=False)
+
+                # uninstall tiller from cluster
+                self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
+                command = '{} --kubeconfig={} --home={} reset'\
+                    .format(self._helm_command, config_filename, helm_dir)
+                self.debug('resetting: {}'.format(command))
+                output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+            else:
+                self.debug('namespace not found')
+
+        # delete cluster directory
+        self.debug('Removing directory {}'.format(cluster_dir))
+        shutil.rmtree(cluster_dir, ignore_errors=True)
+
+        return True
+
+    async def install(
+            self,
+            cluster_uuid: str,
+            kdu_model: str,
+            atomic: bool = True,
+            timeout: float = 300,
+            params: dict = None,
+            db_dict: dict = None
+    ):
+
+        self.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid))
+
+        start = time.time()
+        end = start + timeout
+
+        # config filename
+        kube_dir, helm_dir, config_filename, cluster_dir = \
+            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+
+        # params to str
+        # params_str = K8sHelmConnector._params_to_set_option(params)
+        params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)
+
+        timeout_str = ''
+        if timeout:
+            timeout_str = '--timeout {}'.format(timeout)
+
+        # atomic
+        atomic_str = ''
+        if atomic:
+            atomic_str = '--atomic'
+
+        # version
+        version_str = ''
+        if ':' in kdu_model:
+            parts = kdu_model.split(sep=':')
+            if len(parts) == 2:
+                version_str = '--version {}'.format(parts[1])
+                kdu_model = parts[0]
+
+        # generate a name for the release; then check that it does not already exist
+        kdu_instance = None
+        while kdu_instance is None:
+            kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
+            try:
+                result = await self._status_kdu(
+                    cluster_uuid=cluster_uuid,
+                    kdu_instance=kdu_instance,
+                    show_error_log=False
+                )
+                if result is not None:
+                    # instance already exists: generate a new one
+                    kdu_instance = None
+            except Exception:
+                kdu_instance = None
+
+        # helm repo install
+        command = '{} install {} --output yaml --kubeconfig={} --home={} {} {} --name={} {} {}'\
+            .format(self._helm_command, atomic_str, config_filename, helm_dir,
+                    params_str, timeout_str, kdu_instance, kdu_model, version_str)
+        self.debug('installing: {}'.format(command))
+
+        if atomic:
+            # exec helm in a task
+            exec_task = asyncio.ensure_future(
+                coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
+            )
+            # write status in another task
+            status_task = asyncio.ensure_future(
+                coro_or_future=self._store_status(
+                    cluster_uuid=cluster_uuid,
+                    kdu_instance=kdu_instance,
+                    db_dict=db_dict,
+                    operation='install',
+                    run_once=False
+                )
+            )
+
+            # wait for execution task
+            await asyncio.wait([exec_task])
+
+            # cancel status task
+            status_task.cancel()
+
+            output, rc = exec_task.result()
+
+        else:
+
+            output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
+
+        # remove temporal values yaml file
+        if file_to_delete:
+            os.remove(file_to_delete)
+
+        # write final status
+        await self._store_status(
+            cluster_uuid=cluster_uuid,
+            kdu_instance=kdu_instance,
+            db_dict=db_dict,
+            operation='install',
+            run_once=True,
+            check_every=0
+        )
+
+        if rc != 0:
+            msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
+            self.error(msg)
+            raise Exception(msg)
+
+        self.debug('Returning kdu_instance {}'.format(kdu_instance))
+        return kdu_instance
+
+    async def instances_list(
+            self,
+            cluster_uuid: str
+    ) -> list:
+        """
+        returns a list of deployed releases in a cluster
+
+        :param cluster_uuid: the cluster
+        :return: list of deployed releases (as returned by `helm list`)
+        """
+
+        self.debug('list releases for cluster {}'.format(cluster_uuid))
+
+        # config filename
+        kube_dir, helm_dir, config_filename, cluster_dir = \
+            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+
+        command = '{} --kubeconfig={} --home={} list --output yaml'\
+            .format(self._helm_command, config_filename, helm_dir)
+
+        output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+
+        if output and len(output) > 0:
+            return yaml.load(output, Loader=yaml.SafeLoader).get('Releases')
+        else:
+            return []
+
+    async def upgrade(
+            self,
+            cluster_uuid: str,
+            kdu_instance: str,
+            kdu_model: str = None,
+            atomic: bool = True,
+            timeout: float = 300,
+            params: dict = None,
+            db_dict: dict = None
+    ):
+
+        self.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid))
+
+        start = time.time()
+        end = start + timeout
+
+        # config filename
+        kube_dir, helm_dir, config_filename, cluster_dir = \
+            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+
+        # params to str
+        # params_str = K8sHelmConnector._params_to_set_option(params)
+        params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)
+
+        timeout_str = ''
+        if timeout:
+            timeout_str = '--timeout {}'.format(timeout)
+
+        # atomic
+        atomic_str = ''
+        if atomic:
+            atomic_str = '--atomic'
+
+        # version
+        version_str = ''
+        if kdu_model and ':' in kdu_model:
+            parts = kdu_model.split(sep=':')
+            if len(parts) == 2:
+                version_str = '--version {}'.format(parts[1])
+                kdu_model = parts[0]
+
+        # helm repo upgrade
+        command = '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\
+            .format(self._helm_command, atomic_str, config_filename, helm_dir,
+                    params_str, timeout_str, kdu_instance, kdu_model, version_str)
+        self.debug('upgrading: {}'.format(command))
+
+        if atomic:
+
+            # exec helm in a task
+            exec_task = asyncio.ensure_future(
+                coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
+            )
+            # write status in another task
+            status_task = asyncio.ensure_future(
+                coro_or_future=self._store_status(
+                    cluster_uuid=cluster_uuid,
+                    kdu_instance=kdu_instance,
+                    db_dict=db_dict,
+                    operation='upgrade',
+                    run_once=False
+                )
+            )
+
+            # wait for execution task
+            await asyncio.wait([exec_task])
+
+            # cancel status task
+            status_task.cancel()
+            output, rc = exec_task.result()
+
+        else:
+
+            output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
+
+        # remove temporal values yaml file
+        if file_to_delete:
+            os.remove(file_to_delete)
+
+        # write final status
+        await self._store_status(
+            cluster_uuid=cluster_uuid,
+            kdu_instance=kdu_instance,
+            db_dict=db_dict,
+            operation='upgrade',
+            run_once=True,
+            check_every=0
+        )
+
+        if rc != 0:
+            msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
+            self.error(msg)
+            raise Exception(msg)
+
+        # return new revision number
+        instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
+        if instance:
+            revision = int(instance.get('Revision'))
+            self.debug('New revision: {}'.format(revision))
+            return revision
+        else:
+            return 0
+
+    async def rollback(
+            self,
+            cluster_uuid: str,
+            kdu_instance: str,
+            revision=0,
+            db_dict: dict = None
+    ):
+
+        self.debug('rollback kdu_instance {} to revision {} from cluster {}'
+                   .format(kdu_instance, revision, cluster_uuid))
+
+        # config filename
+        kube_dir, helm_dir, config_filename, cluster_dir = \
+            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+
+        command = '{} rollback --kubeconfig={} --home={} {} {} --wait'\
+            .format(self._helm_command, config_filename, helm_dir, kdu_instance, revision)
+
+        # exec helm in a task
+        exec_task = asyncio.ensure_future(
+            coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
+        )
+        # write status in another task
+        status_task = asyncio.ensure_future(
+            coro_or_future=self._store_status(
+                cluster_uuid=cluster_uuid,
+                kdu_instance=kdu_instance,
+                db_dict=db_dict,
+                operation='rollback',
+                run_once=False
+            )
+        )
+
+        # wait for execution task
+        await asyncio.wait([exec_task])
+
+        # cancel status task
+        status_task.cancel()
+
+        output, rc = exec_task.result()
+
+        # write final status
+        await self._store_status(
+            cluster_uuid=cluster_uuid,
+            kdu_instance=kdu_instance,
+            db_dict=db_dict,
+            operation='rollback',
+            run_once=True,
+            check_every=0
+        )
+
+        if rc != 0:
+            msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
+            self.error(msg)
+            raise Exception(msg)
+
+        # return new revision number
+        instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
+        if instance:
+            revision = int(instance.get('Revision'))
+            self.debug('New revision: {}'.format(revision))
+            return revision
+        else:
+            return 0
+
+    async def uninstall(
+            self,
+            cluster_uuid: str,
+            kdu_instance: str
+    ):
+        """
+        Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen
+        after all _terminate-config-primitive_ of the VNF are invoked).
+
+        :param cluster_uuid: UUID of a K8s cluster known by OSM
+        :param kdu_instance: unique name for the KDU instance to be deleted
+        :return: True if successful
+        """
+
+        self.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid))
+
+        # config filename
+        kube_dir, helm_dir, config_filename, cluster_dir = \
+            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+
+        command = '{} --kubeconfig={} --home={} delete --purge {}'\
+            .format(self._helm_command, config_filename, helm_dir, kdu_instance)
+
+        output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+
+        return self._output_to_table(output)
+
+    async def inspect_kdu(
+            self,
+            kdu_model: str
+    ) -> str:
+
+        self.debug('inspect kdu_model {}'.format(kdu_model))
+
+        command = '{} inspect values {}'\
+            .format(self._helm_command, kdu_model)
+
+        output, rc = await self._local_async_exec(command=command)
+
+        return output
+
+    async def help_kdu(
+            self,
+            kdu_model: str
+    ):
+
+        self.debug('help kdu_model {}'.format(kdu_model))
+
+        command = '{} inspect readme {}'\
+            .format(self._helm_command, kdu_model)
+
+        output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+
+        return output
+
+    async def status_kdu(
+            self,
+            cluster_uuid: str,
+            kdu_instance: str
+    ):
+
+        return await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, show_error_log=True)
+
+
+    """
+    ##################################################################################################
+    ########################################## P R I V A T E #########################################
+    ##################################################################################################
+    """
+
+    async def _status_kdu(
+            self,
+            cluster_uuid: str,
+            kdu_instance: str,
+            show_error_log: bool = False
+    ):
+
+        self.debug('status of kdu_instance {}'.format(kdu_instance))
+
+        # config filename
+        kube_dir, helm_dir, config_filename, cluster_dir = \
+            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+
+        command = '{} --kubeconfig={} --home={} status {} --output yaml'\
+            .format(self._helm_command, config_filename, helm_dir, kdu_instance)
+
+        output, rc = await self._local_async_exec(
+            command=command,
+            raise_exception_on_error=True,
+            show_error_log=show_error_log
+        )
+
+        if rc != 0:
+            return None
+
+        data = yaml.load(output, Loader=yaml.SafeLoader)
+
+        # remove field 'notes'
+        try:
+            del data.get('info').get('status')['notes']
+        except KeyError:
+            pass
+
+        # parse field 'resources'
+        try:
+            resources = str(data.get('info').get('status').get('resources'))
+            resource_table = self._output_to_table(resources)
+            data.get('info').get('status')['resources'] = resource_table
+        except Exception as e:
+            pass
+
+        return data
+
+    async def get_instance_info(
+            self,
+            cluster_uuid: str,
+            kdu_instance: str
+    ):
+        instances = await self.instances_list(cluster_uuid=cluster_uuid)
+        for instance in instances:
+            if instance.get('Name') == kdu_instance:
+                return instance
+        self.debug('Instance {} not found'.format(kdu_instance))
+        return None
+
+    @staticmethod
+    def _generate_release_name(
+            chart_name: str
+    ):
+        name = ''
+        for c in chart_name:
+            if c.isalpha() or c.isnumeric():
+                name += c
+            else:
+                name += '-'
+        if len(name) > 35:
+            name = name[0:35]
+
+        # if does not start with alpha character, prefix 'a'
+        if not name[0].isalpha():
+            name = 'a' + name
+
+        name += '-'
+
+        def get_random_number():
+            r = random.randrange(start=1, stop=99999999)
+            s = str(r)
+            s = s.rjust(10, '0')  # zero-pad to a fixed width (rjust takes positional args)
+            return s
+
+        name = name + get_random_number()
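+        # e.g. 'stable/mongodb' -> 'stable-mongodb-0012345678' (digits vary per call)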
+        return name.lower()
+
+    async def _store_status(
+            self,
+            cluster_uuid: str,
+            operation: str,
+            kdu_instance: str,
+            check_every: float = 10,
+            db_dict: dict = None,
+            run_once: bool = False
+    ):
+        while True:
+            try:
+                await asyncio.sleep(check_every)
+                detailed_status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
+                status = detailed_status.get('info').get('Description')
+                print('=' * 60)
+                self.debug('STATUS:\n{}'.format(status))
+                self.debug('DETAILED STATUS:\n{}'.format(detailed_status))
+                print('=' * 60)
+                # write status to db
+                result = await self.write_app_status_to_db(
+                    db_dict=db_dict,
+                    status=str(status),
+                    detailed_status=str(detailed_status),
+                    operation=operation)
+                if not result:
+                    self.info('Error writing in database. Task exiting...')
+                    return
+            except asyncio.CancelledError:
+                self.debug('Task cancelled')
+                return
+            except Exception as e:
+                pass
+            finally:
+                if run_once:
+                    return
+
+    async def _is_install_completed(
+            self,
+            cluster_uuid: str,
+            kdu_instance: str
+    ) -> bool:
+
+        status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
+
+        # extract info.status.resources-> str
+        # format:
+        #       ==> v1/Deployment
+        #       NAME                    READY   UP-TO-DATE   AVAILABLE   AGE
+        #       halting-horse-mongodb   0/1     1            0           0s
+        #       halting-petit-mongodb   1/1     1            0           0s
+        # blank line
+        resources = K8sHelmConnector._get_deep(status, ('info', 'status', 'resources'))
+
+        # convert to table
+        resources = K8sHelmConnector._output_to_table(resources)
+
+        num_lines = len(resources)
+        ready = True  # assume ready until a resource reports otherwise
+        index = 0
+        while index < num_lines:
+            try:
+                line1 = resources[index]
+                index += 1
+                # find '==>' in column 0
+                if line1[0] == '==>':
+                    line2 = resources[index]
+                    index += 1
+                    # find READY in column 1
+                    if line2[1] == 'READY':
+                        # read next lines
+                        line3 = resources[index]
+                        index += 1
+                        while len(line3) > 1 and index < num_lines:
+                            ready_value = line3[1]
+                            parts = ready_value.split(sep='/')
+                            current = int(parts[0])
+                            total = int(parts[1])
+                            if current < total:
+                                self.debug('NOT READY:\n    {}'.format(line3))
+                                ready = False
+                            line3 = resources[index]
+                            index += 1
+
+            except Exception as e:
+                pass
+
+        return ready
+
+    @staticmethod
+    def _get_deep(dictionary: dict, members: tuple):
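+        # Safe nested lookup, e.g. _get_deep(status, ('info', 'status', 'resources'))
+        # returns the nested value, or None if any key along the path is missing or empty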
+        target = dictionary
+        value = None
+        try:
+            for m in members:
+                value = target.get(m)
+                if not value:
+                    return None
+                else:
+                    target = value
+        except Exception as e:
+            pass
+        return value
+
+    # find key:value in several lines
+    @staticmethod
+    def _find_in_lines(p_lines: list, p_key: str) -> str:
+        for line in p_lines:
+            try:
+                if line.startswith(p_key + ':'):
+                    parts = line.split(':')
+                    the_value = parts[1].strip()
+                    return the_value
+            except Exception as e:
+                # ignore it
+                pass
+        return None
+
+    # params for use in -f file
+    # returns values file option and filename (in order to delete it at the end)
+    def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str):
+        params_str = ''
+
+        if params and len(params) > 0:
+            kube_dir, helm_dir, config_filename, cluster_dir = \
+                self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+
+            def get_random_number():
+                r = random.randrange(start=1, stop=99999999)
+                s = str(r)
+                while len(s) < 10:
+                    s = '0' + s
+                return s
+
+            params2 = dict()
+            for key in params:
+                value = params.get(key)
+                if '!!yaml' in str(value):
+                    value = yaml.load(value[7:], Loader=yaml.SafeLoader)
+                params2[key] = value
+
+            values_file = get_random_number() + '.yaml'
+            with open(values_file, 'w') as stream:
+                yaml.dump(params2, stream, indent=4, default_flow_style=False)
+
+            return '-f {}'.format(values_file), values_file
+
+        return '', None
+
+    # params for use in --set option
+    @staticmethod
+    def _params_to_set_option(params: dict) -> str:
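+        # e.g. {'replicaCount': 2, 'image.tag': 'v1'} -> '--set replicaCount=2,image.tag=v1'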
+        params_str = ''
+        if params and len(params) > 0:
+            start = True
+            for key in params:
+                value = params.get(key, None)
+                if value is not None:
+                    if start:
+                        params_str += '--set '
+                        start = False
+                    else:
+                        params_str += ','
+                    params_str += '{}={}'.format(key, value)
+        return params_str
+
+    @staticmethod
+    def _output_to_lines(output: str) -> list:
+        output_lines = list()
+        lines = output.splitlines(keepends=False)
+        for line in lines:
+            line = line.strip()
+            if len(line) > 0:
+                output_lines.append(line)
+        return output_lines
+
+    @staticmethod
+    def _output_to_table(output: str) -> list:
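+        # Splits raw CLI output into rows of cells, e.g.
+        #     'NAME  READY\ntiller-deploy  1/1' -> [['NAME', 'READY'], ['tiller-deploy', '1/1']]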
+        output_table = list()
+        lines = output.splitlines(keepends=False)
+        for line in lines:
+            line = line.replace('\t', ' ')
+            line_list = list()
+            output_table.append(line_list)
+            cells = line.split(sep=' ')
+            for cell in cells:
+                cell = cell.strip()
+                if len(cell) > 0:
+                    line_list.append(cell)
+        return output_table
+
+    def _get_paths(self, cluster_name: str, create_if_not_exist: bool = False) -> (str, str, str, str):
+        """
+        Returns kube and helm directories
+
+        :param cluster_name:
+        :param create_if_not_exist:
+        :return: kube, helm directories, config filename and cluster dir.
+                Raises an exception if a directory does not exist and cannot be created
+        """
+
+        base = self.fs.path
+        if base.endswith("/") or base.endswith("\\"):
+            base = base[:-1]
+
+        # base dir for cluster
+        cluster_dir = base + '/' + cluster_name
+        if create_if_not_exist and not os.path.exists(cluster_dir):
+            self.debug('Creating dir {}'.format(cluster_dir))
+            os.makedirs(cluster_dir)
+        if not os.path.exists(cluster_dir):
+            msg = 'Base cluster dir {} does not exist'.format(cluster_dir)
+            self.error(msg)
+            raise Exception(msg)
+
+        # kube dir
+        kube_dir = cluster_dir + '/' + '.kube'
+        if create_if_not_exist and not os.path.exists(kube_dir):
+            self.debug('Creating dir {}'.format(kube_dir))
+            os.makedirs(kube_dir)
+        if not os.path.exists(kube_dir):
+            msg = 'Kube config dir {} does not exist'.format(kube_dir)
+            self.error(msg)
+            raise Exception(msg)
+
+        # helm home dir
+        helm_dir = cluster_dir + '/' + '.helm'
+        if create_if_not_exist and not os.path.exists(helm_dir):
+            self.debug('Creating dir {}'.format(helm_dir))
+            os.makedirs(helm_dir)
+        if not os.path.exists(helm_dir):
+            msg = 'Helm config dir {} does not exist'.format(helm_dir)
+            self.error(msg)
+            raise Exception(msg)
+
+        config_filename = kube_dir + '/config'
+        return kube_dir, helm_dir, config_filename, cluster_dir
+
+    @staticmethod
+    def _remove_multiple_spaces(text):
+        text = text.strip()
+        while '  ' in text:
+            text = text.replace('  ', ' ')
+        return text
+
+    def _local_exec(
+            self,
+            command: str
+    ) -> (str, int):
+        command = K8sHelmConnector._remove_multiple_spaces(command)
+        self.debug('Executing sync local command: {}'.format(command))
+        # raise exception if fails
+        output = ''
+        try:
+            output = subprocess.check_output(command, shell=True, universal_newlines=True)
+            return_code = 0
+            self.debug(output)
+        except Exception as e:
+            return_code = 1
+
+        return output, return_code
+
+    async def _local_async_exec(
+            self,
+            command: str,
+            raise_exception_on_error: bool = False,
+            show_error_log: bool = True
+    ) -> (str, int):
+
+        command = K8sHelmConnector._remove_multiple_spaces(command)
+        self.debug('Executing async local command: {}'.format(command))
+
+        # split command
+        command = command.split(sep=' ')
+
+        try:
+            process = await asyncio.create_subprocess_exec(
+                *command,
+                stdout=asyncio.subprocess.PIPE,
+                stderr=asyncio.subprocess.PIPE
+            )
+
+            # wait for command terminate
+            stdout, stderr = await process.communicate()
+
+            return_code = process.returncode
+
+            output = ''
+            if stdout:
+                output = stdout.decode('utf-8').strip()
+            if stderr:
+                output = stderr.decode('utf-8').strip()
+
+            if return_code != 0 and show_error_log:
+                self.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output))
+            else:
+                self.debug('Return code: {}'.format(return_code))
+
+            if raise_exception_on_error and return_code != 0:
+                raise Exception(output)
+
+            return output, return_code
+
+        except Exception as e:
+            msg = 'Exception executing command: {} -> {}'.format(command, e)
+            if show_error_log:
+                self.error(msg)
+            return '', -1
+
+    def _remote_exec(
+            self,
+            hostname: str,
+            username: str,
+            password: str,
+            command: str,
+            timeout: int = 10
+    ) -> (str, int):
+
+        command = K8sHelmConnector._remove_multiple_spaces(command)
+        self.debug('Executing sync remote ssh command: {}'.format(command))
+
+        ssh = paramiko.SSHClient()
+        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+        ssh.connect(hostname=hostname, username=username, password=password)
+        ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command=command, timeout=timeout)
+        output = ssh_stdout.read().decode('utf-8')
+        error = ssh_stderr.read().decode('utf-8')
+        if error:
+            self.error('ERROR: {}'.format(error))
+            return_code = 1
+        else:
+            return_code = 0
+        output = output.replace('\\n', '\n')
+        self.debug('OUTPUT: {}'.format(output))
+
+        return output, return_code
+
+    def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
+        self.debug('Checking if file {} exists...'.format(filename))
+        if os.path.exists(filename):
+            return True
+        else:
+            msg = 'File {} does not exist'.format(filename)
+            if exception_if_not_exists:
+                self.error(msg)
+                raise Exception(msg)
+            return False
+
diff --git a/n2vc/k8s_juju_conn.py b/n2vc/k8s_juju_conn.py
new file mode 100644 (file)
index 0000000..4f62898
--- /dev/null
@@ -0,0 +1,952 @@
+# Copyright 2019 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+#     Unless required by applicable law or agreed to in writing, software
+#     distributed under the License is distributed on an "AS IS" BASIS,
+#     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#     See the License for the specific language governing permissions and
+#     limitations under the License.
+
+import concurrent
+from .exceptions import NotImplemented
+
+import juju
+# from juju.bundle import BundleHandler
+from juju.controller import Controller
+from juju.model import Model
+from juju.errors import JujuAPIError, JujuError
+
+import logging
+
+from n2vc.k8s_conn import K8sConnector
+
+import os
+# import re
+# import ssl
+import subprocess
+# from .vnf import N2VC
+
+import uuid
+import yaml
+
+
+class K8sJujuConnector(K8sConnector):
+    def __init__(
+            self,
+            fs: object,
+            db: object = None,
+            kubectl_command: str = '/usr/bin/kubectl',
+            log: object = None,
+            on_update_db=None
+    ):
+        """
+
+        :param fs: file system for kubernetes and helm configuration
+        :param db: database object to write current operation status
+        :param kubectl_command: path to kubectl executable
+        :param log: logger
+        :param on_update_db: callback called when k8s connector updates database
+        """
+
+        # parent class (K8sConnector.__init__ takes db, log and on_update_db;
+        # fs and kubectl_command are kept locally)
+        K8sConnector.__init__(
+            self,
+            db=db,
+            log=log,
+            on_update_db=on_update_db
+        )
+
+        self.info('Initializing K8S Juju connector')
+
+        self.fs = fs
+        self.kubectl_command = kubectl_command
+        self.authenticated = False
+        self.models = {}
+        self.log = logging.getLogger(__name__)
+        self.info('K8S Juju connector initialized')
+
+    """Initialization"""
+    async def init_env(
+        self,
+        k8s_creds: dict,
+        namespace: str = 'kube-system',
+        reuse_cluster_uuid: str = None,
+    ) -> str:
+        """Initialize a Kubernetes environment
+
+        :param k8s_creds dict: A dictionary containing the Kubernetes cluster
+        configuration
+        :param namespace str: The Kubernetes namespace to initialize
+
+        :return: UUID of the k8s context or raises an exception
+        """
+
+        """Bootstrapping
+
+        Bootstrapping cannot be done, by design, through the API. We need to
+        use the CLI tools.
+        """
+        # TODO: The path may change
+        jujudir = "/snap/bin"
+
+        self.k8scli = "{}/juju".format(jujudir)
+
+        """
+        WIP: Workflow
+
+        1. Has the environment already been bootstrapped?
+        - Check the database to see if we have a record for this env
+
+        2. If this is a new env, create it
+        - Add the k8s cloud to Juju
+        - Bootstrap
+        - Record it in the database
+
+        3. Connect to the Juju controller for this cloud
+
+        """
+        # cluster_uuid = reuse_cluster_uuid
+        # if not cluster_uuid:
+        #     cluster_uuid = str(uuid4())
+
+        ##################################################
+        # TODO: Pull info from db based on the namespace #
+        ##################################################
+
+        if not reuse_cluster_uuid:
+            # This is a new cluster, so bootstrap it
+
+            cluster_uuid = str(uuid.uuid4())
+
+            # Does the kubeconfig refer to microk8s? Record it so the
+            # stored cluster config can flag microk8s clusters.
+            microk8s = self.is_microk8s_by_credentials(k8s_creds)
+
+            # Name the new k8s cloud and add it to Juju
+            k8s_cloud = "{}-k8s".format(namespace)
+
+            await self.add_k8s(k8s_cloud, k8s_creds)
+
+            # Bootstrap the Juju controller; bootstrap() is a coroutine and
+            # must be awaited
+            await self.bootstrap(k8s_cloud, cluster_uuid)
+
+            # Get the controller information
+
+            # Parse ~/.local/share/juju/controllers.yaml
+            # controllers.testing.api-endpoints|ca-cert|uuid
+            with open(os.path.expanduser(
+                "~/.local/share/juju/controllers.yaml"
+            )) as f:
+                controllers = yaml.load(f, Loader=yaml.Loader)
+                controller = controllers['controllers'][cluster_uuid]
+                endpoints = controller['api-endpoints']
+                self.juju_endpoint = endpoints[0]
+                self.juju_ca_cert = controller['ca-cert']
+
+            # Parse ~/.local/share/juju/accounts
+            # controllers.testing.user|password
+            with open(os.path.expanduser(
+                "~/.local/share/juju/accounts.yaml"
+            )) as f:
+                controllers = yaml.load(f, Loader=yaml.Loader)
+                controller = controllers['controllers'][cluster_uuid]
+
+                self.juju_user = controller['user']
+                self.juju_secret = controller['password']
+
+            print("user: {}".format(self.juju_user))
+            print("secret: {}".format(self.juju_secret))
+            print("endpoint: {}".format(self.juju_endpoint))
+            print("ca-cert: {}".format(self.juju_ca_cert))
+
+            # raise Exception("EOL")
+
+            self.juju_public_key = None
+
+            config = {
+                'endpoint': self.juju_endpoint,
+                'username': self.juju_user,
+                'secret': self.juju_secret,
+                'cacert': self.juju_ca_cert,
+                'namespace': namespace,
+                'microk8s': microk8s,
+            }
+
+            # Store the cluster configuration so it
+            # can be used for subsequent calls
+            await self.set_config(cluster_uuid, config)
+
+        else:
+            # This is an existing cluster, so get its config
+            cluster_uuid = reuse_cluster_uuid
+
+            config = self.get_config(cluster_uuid)
+
+            self.juju_endpoint = config['endpoint']
+            self.juju_user = config['username']
+            self.juju_secret = config['secret']
+            self.juju_ca_cert = config['cacert']
+            self.juju_public_key = None
+
+        # Login to the k8s cluster
+        if not self.authenticated:
+            await self.login()
+
+        # Verify the environment by fetching the model for this cluster
+        print("Getting model {}".format(self.get_namespace(cluster_uuid)))
+        model = await self.get_model(self.get_namespace(cluster_uuid))
+
+        # Disconnect from the model
+        if model and model.is_connected():
+            await model.disconnect()
+
+        return cluster_uuid
+
+    """Repo Management"""
+    async def repo_add(
+        self,
+        name: str,
+        url: str,
+        type: str = "charm",
+    ):
+        raise NotImplemented()
+
+    async def repo_list(self):
+        raise NotImplemented()
+
+    async def repo_remove(
+        self,
+        name: str,
+    ):
+        raise NotImplemented()
+
+    """Reset"""
+    async def reset(
+        self,
+        cluster_uuid: str,
+    ) -> bool:
+        """Reset a cluster
+
+        Resets the Kubernetes cluster by removing the model that represents it.
+
+        :param cluster_uuid str: The UUID of the cluster to reset
+        :return: Returns True if successful, False otherwise.
+        """
+
+        try:
+            if not self.authenticated:
+                await self.login()
+
+            if self.controller.is_connected():
+                # Destroy the model
+                namespace = self.get_namespace(cluster_uuid)
+                if await self.has_model(namespace):
+                    print("[reset] Destroying model")
+                    await self.controller.destroy_model(
+                        namespace,
+                        destroy_storage=True
+                    )
+
+                # Disconnect from the controller
+                print("[reset] Disconnecting controller")
+                await self.controller.disconnect()
+
+                # Destroy the controller (via CLI)
+                print("[reset] Destroying controller")
+                await self.destroy_controller(cluster_uuid)
+
+                """Remove the k8s cloud
+
+                Only remove the k8s cloud if it's not a microk8s cloud,
+                since microk8s is a built-in cloud type.
+                """
+                # microk8s = self.is_microk8s_by_cluster_uuid(cluster_uuid)
+                # if not microk8s:
+                print("[reset] Removing k8s cloud")
+                namespace = self.get_namespace(cluster_uuid)
+                k8s_cloud = "{}-k8s".format(namespace)
+                await self.remove_cloud(k8s_cloud)
+
+            return True
+
+        except Exception as ex:
+            print("Caught exception during reset: {}".format(ex))
+            return False
+
+    """Deployment"""
+    async def install(
+        self,
+        cluster_uuid: str,
+        kdu_model: str,
+        atomic: bool = True,
+        timeout: int = None,
+        params: dict = None,
+    ) -> bool:
+        """Install a bundle
+
+        :param cluster_uuid str: The UUID of the cluster to install to
+        :param kdu_model str: The name or path of a bundle to install
+        :param atomic bool: If set, waits until the model is active and resets
+                            the cluster on failure.
+        :param timeout int: The time, in seconds, to wait for the install
+                            to finish
+        :param params dict: Key-value pairs of instantiation parameters
+
+        :return: True if successful; False if an atomic install timed out
+                 and the cluster was reset
+        """
+
+        if not self.authenticated:
+            print("[install] Logging in to the controller")
+            await self.login()
+
+        ##
+        # Get or create the model, based on the namespace the cluster was
+        # instantiated with.
+        namespace = self.get_namespace(cluster_uuid)
+        model = await self.get_model(namespace)
+        if not model:
+            # Create the new model
+            model = await self.add_model(namespace)
+
+        if model:
+            # TODO: Instantiation parameters
+
+            print("[install] deploying {}".format(kdu_model))
+            await model.deploy(kdu_model)
+
+            # Get the application
+            if atomic:
+                # applications = model.applications
+                print("[install] Applications: {}".format(model.applications))
+                for name in model.applications:
+                    print("[install] Waiting for {} to settle".format(name))
+                    application = model.applications[name]
+                    try:
+                        # It's not enough to wait for all units to be active;
+                        # the application status needs to be active as well.
+                        print("Waiting for all units to be active...")
+                        await model.block_until(
+                            lambda: all(
+                                unit.agent_status == 'idle'
+                                and application.status in ['active', 'unknown']
+                                and unit.workload_status in [
+                                    'active', 'unknown'
+                                ] for unit in application.units
+                            ),
+                            timeout=timeout
+                        )
+                        print("All units active.")
+
+                    except concurrent.futures.TimeoutError:
+                        print("[install] Timeout exceeded; resetting cluster")
+                        await self.reset(cluster_uuid)
+                        return False
+
+            # Disconnect from the model
+            if model.is_connected():
+                print("[install] Disconnecting model")
+                await model.disconnect()
+
+            return True
+        raise Exception("Unable to install")
+
+    async def instances_list(
+            self,
+            cluster_uuid: str
+    ) -> list:
+        """
+        returns a list of deployed releases in a cluster
+
+        :param cluster_uuid: the cluster
+        :return:
+        """
+        return []
+
+    async def upgrade(
+        self,
+        cluster_uuid: str,
+        kdu_instance: str,
+        kdu_model: str = None,
+        params: dict = None,
+    ) -> bool:
+        """Upgrade a model
+
+        :param cluster_uuid str: The UUID of the cluster to upgrade
+        :param kdu_instance str: The unique name of the KDU instance
+        :param kdu_model str: The name or path of the bundle to upgrade to
+        :param params dict: Key-value pairs of instantiation parameters
+
+        :return: True if the upgrade succeeded, or raises an exception.
+        """
+
+        # TODO: Loop through the bundle and upgrade each charm individually
+
+        """
+        The API doesn't have a concept of bundle upgrades, because there are
+        many possible changes: charm revision, disk, number of units, etc.
+
+        As such, we are only supporting a limited subset of upgrades. We'll
+        upgrade the charm revision but leave storage and scale untouched.
+
+        Scale changes should happen through OSM constructs, and changes to
+        storage would require a redeployment of the service, at least in this
+        initial release.
+        """
+        namespace = self.get_namespace(cluster_uuid)
+        model = await self.get_model(namespace)
+
+        with open(kdu_model, 'r') as f:
+            bundle = yaml.load(f, Loader=yaml.FullLoader)
+
+            """
+            {
+                'description': 'Test bundle',
+                'bundle': 'kubernetes',
+                'applications': {
+                    'mariadb-k8s': {
+                        'charm': 'cs:~charmed-osm/mariadb-k8s-20',
+                        'scale': 1,
+                        'options': {
+                            'password': 'manopw',
+                            'root_password': 'osm4u',
+                            'user': 'mano'
+                        },
+                        'series': 'kubernetes'
+                    }
+                }
+            }
+            """
+            # TODO: This should be returned in an agreed-upon format
+            for name in bundle['applications']:
+                print(model.applications)
+                application = model.applications[name]
+                print(application)
+
+                path = bundle['applications'][name]['charm']
+
+                try:
+                    await application.upgrade_charm(switch=path)
+                except JujuError as ex:
+                    if 'already running charm' in str(ex):
+                        # We're already running this version; nothing to do
+                        pass
+                    else:
+                        raise
+
+        await model.disconnect()
+
+        return True
+
+    """Rollback"""
+    async def rollback(
+        self,
+        cluster_uuid: str,
+        kdu_instance: str,
+        revision: int = 0,
+    ) -> str:
+        """Rollback a model
+
+        :param cluster_uuid str: The UUID of the cluster to rollback
+        :param kdu_instance str: The unique name of the KDU instance
+        :param revision int: The revision to revert to. If omitted, rolls back
+                             the previous upgrade.
+
+        :return: If successful, returns the revision of active KDU instance,
+                 or raises an exception
+        """
+        raise NotImplemented()
+
+    """Deletion"""
+    async def uninstall(
+        self,
+        cluster_uuid: str,
+        kdu_instance: str,
+    ) -> bool:
+        """Uninstall a KDU instance
+
+        :param cluster_uuid str: The UUID of the cluster to uninstall
+        :param kdu_instance str: The unique name of the KDU instance
+
+        :return: Returns True if successful, or raises an exception
+        """
+        removed = False
+
+        # Remove an application from the model
+        model = await self.get_model(self.get_namespace(cluster_uuid))
+
+        if model:
+            # Get the application
+            if kdu_instance not in model.applications:
+                # TODO: Raise a named exception
+                raise Exception("Application not found.")
+
+            application = model.applications[kdu_instance]
+
+            # Destroy the application
+            await application.destroy()
+
+            # TODO: Verify removal
+
+            removed = True
+        return removed
+
+    """Introspection"""
+    async def inspect_kdu(
+        self,
+        kdu_model: str,
+    ) -> dict:
+        """Inspect a KDU
+
+        Inspects a bundle and returns a dictionary of config parameters and
+        their default values.
+
+        :param kdu_model str: The name or path of the bundle to inspect.
+
+        :return: If successful, returns a dictionary of available parameters
+                 and their default values.
+        """
+
+        kdu = {}
+        with open(kdu_model, 'r') as f:
+            bundle = yaml.load(f, Loader=yaml.FullLoader)
+
+            """
+            {
+                'description': 'Test bundle',
+                'bundle': 'kubernetes',
+                'applications': {
+                    'mariadb-k8s': {
+                        'charm': 'cs:~charmed-osm/mariadb-k8s-20',
+                        'scale': 1,
+                        'options': {
+                            'password': 'manopw',
+                            'root_password': 'osm4u',
+                            'user': 'mano'
+                        },
+                        'series': 'kubernetes'
+                    }
+                }
+            }
+            """
+            # TODO: This should be returned in an agreed-upon format
+            kdu = bundle['applications']
+
+        return kdu
+
+    async def help_kdu(
+        self,
+        kdu_model: str,
+    ) -> str:
+        """View the README
+
+        If available, returns the README of the bundle.
+
+        :param kdu_model str: The name or path of a bundle
+
+        :return: If found, returns the contents of the README.
+        """
+        readme = None
+
+        files = ['README', 'README.txt', 'README.md']
+        path = os.path.dirname(kdu_model)
+        for filename in os.listdir(path):
+            if filename in files:
+                # open relative to the bundle's directory, not the cwd
+                with open(os.path.join(path, filename), 'r') as f:
+                    readme = f.read()
+                    break
+
+        return readme
+
+    async def status_kdu(
+        self,
+        cluster_uuid: str,
+        kdu_instance: str,
+    ) -> dict:
+        """Get the status of the KDU
+
+        Get the current status of the KDU instance.
+
+        :param cluster_uuid str: The UUID of the cluster
+        :param kdu_instance str: The unique id of the KDU instance
+
+        :return: Returns a dictionary containing namespace, state, resources,
+                 and deployment_time.
+        """
+        status = {}
+
+        model = await self.get_model(self.get_namespace(cluster_uuid))
+
+        # model = await self.get_model_by_uuid(cluster_uuid)
+        if model:
+            model_status = await model.get_status()
+
+            # Build a plain dict instead of mutating the status object
+            for name in model_status.applications:
+                application = model_status.applications[name]
+                status[name] = {
+                    'status': application['status']['status']
+                }
+
+            if model.is_connected():
+                await model.disconnect()
+
+        return status
+
+    # Private methods
+    async def add_k8s(
+        self,
+        cloud_name: str,
+        credentials: dict,
+    ) -> bool:
+        """Add a k8s cloud to Juju
+
+        Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
+        Juju Controller.
+
+        :param cloud_name str: The name of the cloud to add.
+        :param credentials dict: A dictionary representing the output of
+            `kubectl config view --raw`.
+
+        :returns: True if successful, otherwise raises an exception.
+        """
+        cmd = [self.k8scli, "add-k8s", "--local", cloud_name]
+
+        p = subprocess.run(
+            cmd,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            input=yaml.dump(credentials, Dumper=yaml.Dumper),
+            encoding='ascii'
+        )
+        retcode = p.returncode
+
+        if retcode > 0:
+            raise Exception(p.stderr)
+        return True
+
+    async def add_model(
+        self,
+        model_name: str
+    ) -> juju.model.Model:
+        """Adds a model to the controller
+
+        Adds a new model to the Juju controller
+
+        :param model_name str: The name of the model to add.
+        :returns: The juju.model.Model object of the new model upon success or
+                  raises an exception.
+        """
+        if not self.authenticated:
+            await self.login()
+
+        model = await self.controller.add_model(
+            model_name,
+            config={'authorized-keys': self.juju_public_key}
+        )
+        return model
+
+    async def bootstrap(
+        self,
+        cloud_name: str,
+        cluster_uuid: str
+    ) -> bool:
+        """Bootstrap a Kubernetes controller
+
+        Bootstrap a Juju controller inside the Kubernetes cluster
+
+        :param cloud_name str: The name of the cloud.
+        :param cluster_uuid str: The UUID of the cluster to bootstrap.
+        :returns: True upon success or raises an exception.
+        """
+        cmd = [self.k8scli, "bootstrap", cloud_name, cluster_uuid]
+        print("Bootstrapping controller {} in cloud {}".format(
+            cluster_uuid, cloud_name
+        ))
+
+        p = subprocess.run(
+            cmd,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            encoding='ascii'
+        )
+        retcode = p.returncode
+
+        if retcode > 0:
+            # A controller that already exists is not a fatal error
+            if 'already exists' not in p.stderr:
+                raise Exception(p.stderr)
+
+        return True
+
+    async def destroy_controller(
+        self,
+        cluster_uuid: str
+    ) -> bool:
+        """Destroy a Kubernetes controller
+
+        Destroy an existing Kubernetes controller.
+
+        :param cluster_uuid str: The UUID of the cluster to destroy.
+        :returns: True upon success or raises an exception.
+        """
+        cmd = [
+            self.k8scli,
+            "destroy-controller",
+            "--destroy-all-models",
+            "--destroy-storage",
+            "-y",
+            cluster_uuid
+        ]
+
+        p = subprocess.run(
+            cmd,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            encoding='ascii'
+        )
+        retcode = p.returncode
+
+        if retcode > 0:
+            if 'already exists' not in p.stderr:
+                raise Exception(p.stderr)
+
+        return True
+
+    def get_config(
+        self,
+        cluster_uuid: str,
+    ) -> dict:
+        """Get the cluster configuration
+
+        Gets the configuration of the cluster
+
+        :param cluster_uuid str: The UUID of the cluster.
+        :return: A dict upon success, or raises an exception.
+        """
+        cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)
+        if os.path.exists(cluster_config):
+            with open(cluster_config, 'r') as f:
+                config = yaml.load(f.read(), Loader=yaml.FullLoader)
+                return config
+        else:
+            raise Exception(
+                "Unable to locate configuration for cluster {}".format(
+                    cluster_uuid
+                )
+            )
+
+    async def get_model(
+        self,
+        model_name: str,
+    ) -> juju.model.Model:
+        """Get a model from the Juju Controller.
+
+        Note: the caller must call disconnect() on the returned Model before
+        it goes out of scope.
+
+        :param model_name str: The name of the model to get
+        :return: The juju.model.Model object if found, or None.
+        """
+        if not self.authenticated:
+            await self.login()
+
+        model = None
+        models = await self.controller.list_models()
+
+        if model_name in models:
+            model = await self.controller.get_model(
+                model_name
+            )
+        return model
+
+    def get_namespace(
+        self,
+        cluster_uuid: str,
+    ) -> str:
+        """Get the namespace UUID
+        Gets the namespace's unique name
+
+        :param cluster_uuid str: The UUID of the cluster
+        :returns: The namespace UUID, or raises an exception
+        """
+        config = self.get_config(cluster_uuid)
+
+        # Make sure the name is in the config
+        if 'namespace' not in config:
+            raise Exception("Namespace not found.")
+
+        # TODO: We want to make sure this is unique to the cluster, in case
+        # the cluster is being reused.
+        # Consider pre/appending the cluster id to the namespace string
+        return config['namespace']
+
+    async def has_model(
+        self,
+        model_name: str
+    ) -> bool:
+        """Check if a model exists in the controller
+
+        Checks to see if a model exists in the connected Juju controller.
+
+        :param model_name str: The name of the model
+        :return: A boolean indicating if the model exists
+        """
+        models = await self.controller.list_models()
+
+        if model_name in models:
+            return True
+        return False
+
+    def is_microk8s_by_cluster_uuid(
+        self,
+        cluster_uuid: str,
+    ) -> bool:
+        """Check if a cluster is micro8s
+
+        Checks if a cluster is running microk8s
+
+        :param cluster_uuid str: The UUID of the cluster
+        :returns: A boolean if the cluster is running microk8s
+        """
+        config = self.get_config(cluster_uuid)
+        return config['microk8s']
+
+    def is_microk8s_by_credentials(
+        self,
+        credentials: dict,
+    ) -> bool:
+        """Check if a cluster is micro8s
+
+        Checks if a cluster is running microk8s
+
+        :param credentials dict: A dictionary containing the k8s credentials
+        :returns: A boolean if the cluster is running microk8s
+        """
+        for context in credentials['contexts']:
+            if 'microk8s' in context['name']:
+                return True
+
+        return False
+
+    async def login(self):
+        """Login to the Juju controller."""
+
+        if self.authenticated:
+            return
+
+        self.connecting = True
+
+        self.controller = Controller()
+
+        if self.juju_secret:
+            self.log.debug(
+                "Connecting to controller... ws://{} as {}".format(
+                    self.juju_endpoint,
+                    self.juju_user,
+                )
+            )
+            try:
+                await self.controller.connect(
+                    endpoint=self.juju_endpoint,
+                    username=self.juju_user,
+                    password=self.juju_secret,
+                    cacert=self.juju_ca_cert,
+                )
+                self.authenticated = True
+                self.log.debug("JujuApi: Logged into controller")
+            except Exception as ex:
+                self.log.debug("Caught exception: {}".format(ex))
+        else:
+            self.log.fatal("VCA credentials not configured.")
+            self.authenticated = False
+
+    async def logout(self):
+        """Logout of the Juju controller."""
+        print("[logout]")
+        if not self.authenticated:
+            return False
+
+        for model in self.models:
+            print("Logging out of model {}".format(model))
+            await self.models[model].disconnect()
+
+        if self.controller:
+            self.log.debug("Disconnecting controller {}".format(
+                self.controller
+            ))
+            await self.controller.disconnect()
+            self.controller = None
+
+        self.authenticated = False
+
+    async def remove_cloud(
+        self,
+        cloud_name: str,
+    ) -> bool:
+        """Remove a k8s cloud from Juju
+
+        Removes a Kubernetes cloud from Juju.
+
+        :param cloud_name str: The name of the cloud to add.
+
+        :returns: True if successful, otherwise raises an exception.
+        """
+
+        # Remove the bootstrapped controller
+        cmd = [self.k8scli, "remove-k8s", "--client", cloud_name]
+        p = subprocess.run(
+            cmd,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            encoding='ascii'
+        )
+        retcode = p.returncode
+
+        if retcode > 0:
+            raise Exception(p.stderr)
+
+        # Remove the cloud from the local config
+        cmd = [self.k8scli, "remove-cloud", "--client", cloud_name]
+        p = subprocess.run(
+            cmd,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            encoding='ascii'
+        )
+        retcode = p.returncode
+
+        if retcode > 0:
+            raise Exception(p.stderr)
+
+        return True
+
+    async def set_config(
+        self,
+        cluster_uuid: str,
+        config: dict,
+    ) -> bool:
+        """Save the cluster configuration
+
+        Saves the cluster information to the file store
+
+        :param cluster_uuid str: The UUID of the cluster
+        :param config dict: A dictionary containing the cluster configuration
+        :returns: Boolean upon success or raises an exception.
+        """
+
+        cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)
+        if not os.path.exists(cluster_config):
+            print("Writing config to {}".format(cluster_config))
+            with open(cluster_config, 'w') as f:
+                f.write(yaml.dump(config, Dumper=yaml.Dumper))
+
+        return True
diff --git a/n2vc/loggable.py b/n2vc/loggable.py
new file mode 100644 (file)
index 0000000..40efa24
--- /dev/null
@@ -0,0 +1,167 @@
+##
+# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
+# This file is part of OSM
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: nfvlabs@tid.es
+##
+
+
+import logging
+import asyncio
+import time
+import inspect
+import threading    # only for logging purposes (not for using threads)
+
+
+class Loggable:
+
+    def __init__(
+            self,
+            log,
+            log_to_console: bool = False,
+            prefix: str = ''
+    ):
+
+        self._last_log_time = None   # used for time increment in logging
+        self._log_to_console = log_to_console
+        self._prefix = prefix
+        if log is not None:
+            self.log = log
+        else:
+            self.log = logging.getLogger(__name__)
+
+    def debug(self, msg: str):
+        self._log_msg(log_level='DEBUG', msg=msg)
+
+    def info(self, msg: str):
+        self._log_msg(log_level='INFO', msg=msg)
+
+    def warning(self, msg: str):
+        self._log_msg(log_level='WARNING', msg=msg)
+
+    def error(self, msg: str):
+        self._log_msg(log_level='ERROR', msg=msg)
+
+    def critical(self, msg: str):
+        self._log_msg(log_level='CRITICAL', msg=msg)
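+
+    # Usage sketch (hypothetical subclass, not part of this module):
+    #
+    #     class Worker(Loggable):
+    #         def __init__(self):
+    #             Loggable.__init__(self, log=None, prefix='\nWRK')
+    #
+    #         def run(self):
+    #             self.info('starting')   # logged with time increment info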
+
+    ##################################################################################################
+
+    def _log_msg(self, log_level: str, msg: str):
+        """Generic log method"""
+        msg = self._format_log(
+            log_level=log_level,
+            msg=msg,
+            obj=self,
+            level=3,
+            include_path=False,
+            include_thread=False,
+            include_coroutine=True
+        )
+        if self._log_to_console:
+            print(msg)
+        else:
+            if self.log is not None:
+                if log_level == 'DEBUG':
+                    self.log.debug(msg)
+                elif log_level == 'INFO':
+                    self.log.info(msg)
+                elif log_level == 'WARNING':
+                    self.log.warning(msg)
+                elif log_level == 'ERROR':
+                    self.log.error(msg)
+                elif log_level == 'CRITICAL':
+                    self.log.critical(msg)
+
+    def _format_log(
+            self,
+            log_level: str,
+            msg: str = '',
+            obj: object = None,
+            level: int = None,
+            include_path: bool = False,
+            include_thread: bool = False,
+            include_coroutine: bool = True
+    ) -> str:
+
+        # time increment from last log
+        now = time.perf_counter()
+        if self._last_log_time is None:
+            time_str = ' (+0.000)'
+        else:
+            diff = round(now - self._last_log_time, 3)
+            time_str = ' (+{})'.format(diff)
+        self._last_log_time = now
+
+        if level is None:
+            level = 1
+
+        # stack info
+        fi = inspect.stack()[level]
+        filename = fi.filename
+        func = fi.function
+        lineno = fi.lineno
+        # filename without path
+        if not include_path:
+            i = filename.rfind('/')
+            if i > 0:
+                filename = filename[i+1:]
+
+        # the logger already prints the datetime, so only show the time
+        # increment since the previous log line
+        dt = time_str
+
+        # current thread
+        if include_thread:
+            thread_name = 'th:{}'.format(threading.current_thread().name)
+        else:
+            thread_name = ''
+
+        # current coroutine
+
+        coroutine_id = ''
+        if include_coroutine:
+            try:
+                if asyncio.Task.current_task() is not None:
+                    def coro_name(c):
+                        # inspect is already imported at module level
+                        try:
+                            for m in inspect.getmembers(c):
+                                if m[0] == '__name__':
+                                    return m[1]
+                        except Exception:
+                            pass
+                    coro = asyncio.Task.current_task()._coro
+                    coroutine_id = 'coro-{} {}()'.format(
+                        hex(id(coro))[2:], coro_name(coro))
+            except Exception:
+                coroutine_id = ''
+
+        # classname
+        if obj is not None:
+            obj_type = obj.__class__.__name__  # type: str
+            log_msg = \
+                '{} {} {} {} {}::{}.{}():{}\n{}'\
+                .format(self._prefix, dt, thread_name, coroutine_id, filename, obj_type, func, lineno, str(msg))
+        else:
+            log_msg = \
+                '{} {} {} {} {}::{}():{}\n{}'\
+                .format(self._prefix, dt, thread_name, coroutine_id, filename, func, lineno, str(msg))
+
+        return log_msg
diff --git a/tests/bundles/k8s-zookeeper-downgrade.yaml b/tests/bundles/k8s-zookeeper-downgrade.yaml
new file mode 100644 (file)
index 0000000..8a99e2c
--- /dev/null
@@ -0,0 +1,21 @@
+# Copyright 2019 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+#     Unless required by applicable law or agreed to in writing, software
+#     distributed under the License is distributed on an "AS IS" BASIS,
+#     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#     See the License for the specific language governing permissions and
+#     limitations under the License.
+
+description: Test bundle
+bundle: kubernetes
+applications:
+  zookeeper-k8s:
+    charm: 'cs:~charmed-osm/zookeeper-k8s-29'
+    scale: 1
+    series: kubernetes
diff --git a/tests/bundles/k8s-zookeeper-upgrade.yaml b/tests/bundles/k8s-zookeeper-upgrade.yaml
new file mode 100644 (file)
index 0000000..8308f2f
--- /dev/null
@@ -0,0 +1,21 @@
+# Copyright 2019 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+#     Unless required by applicable law or agreed to in writing, software
+#     distributed under the License is distributed on an "AS IS" BASIS,
+#     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#     See the License for the specific language governing permissions and
+#     limitations under the License.
+
+description: Test bundle
+bundle: kubernetes
+applications:
+  zookeeper-k8s:
+    charm: 'cs:~charmed-osm/zookeeper-k8s-31'
+    scale: 1
+    series: kubernetes
diff --git a/tests/bundles/k8s-zookeeper.yaml b/tests/bundles/k8s-zookeeper.yaml
new file mode 100644 (file)
index 0000000..689220e
--- /dev/null
@@ -0,0 +1,21 @@
+# Copyright 2019 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+#     Unless required by applicable law or agreed to in writing, software
+#     distributed under the License is distributed on an "AS IS" BASIS,
+#     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#     See the License for the specific language governing permissions and
+#     limitations under the License.
+
+description: Test bundle
+bundle: kubernetes
+applications:
+  zookeeper-k8s:
+    charm: 'cs:~charmed-osm/zookeeper-k8s-30'
+    scale: 1
+    series: kubernetes
diff --git a/tests/test_k8s_juju_conn.py b/tests/test_k8s_juju_conn.py
new file mode 100644 (file)
index 0000000..6e1e98e
--- /dev/null
@@ -0,0 +1,129 @@
+#  Copyright 2019 Canonical Ltd.
+
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+
+#      http://www.apache.org/licenses/LICENSE-2.0
+
+#      Unless required by applicable law or agreed to in writing, software
+#      distributed under the License is distributed on an "AS IS" BASIS,
+#      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#      See the License for the specific language governing permissions and
+#      limitations under the License.
+
+import argparse
+import asyncio
+import logging
+import n2vc.k8s_juju_conn
+from base import get_juju_public_key
+import os
+from osm_common.fslocal import FsLocal
+import subprocess
+import yaml
+
+
+def get_args():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--cluster_uuid", help='The UUID of an existing cluster to use', default=None)
+    parser.add_argument("--reset", action="store_true")
+    return parser.parse_args()
+
+async def main():
+
+    args = get_args()
+
+    reuse_cluster_uuid = args.cluster_uuid
+
+    log = logging.getLogger()
+    log.level = logging.DEBUG
+
+    # Extract parameters from the environment in order to run our tests
+    vca_host = os.getenv('VCA_HOST', '127.0.0.1')
+    vca_port = os.getenv('VCA_PORT', 17070)
+    vca_user = os.getenv('VCA_USER', 'admin')
+    vca_charms = os.getenv('VCA_CHARMS', None)
+    vca_secret = os.getenv('VCA_SECRET', None)
+    vca_ca_cert = os.getenv('VCA_CACERT', None)
+
+    # Get the Juju Public key
+    juju_public_key = get_juju_public_key()
+    if juju_public_key:
+        with open(juju_public_key, 'r') as f:
+            juju_public_key = f.read()
+    else:
+        raise Exception("No Juju Public Key found")
+
+    storage = {
+        'driver': 'local',
+        'path': '/srv/app/storage'
+    }
+    fs = FsLocal()
+    fs.fs_connect(storage)
+
+    client = n2vc.k8s_juju_conn.K8sJujuConnector(
+        kubectl_command='/bin/true',
+        fs=fs,
+        log=log,
+    )
+
+    # kubectl config view --raw
+    # microk8s.config
+
+    # if microk8s then
+    kubecfg = subprocess.getoutput('microk8s.config')
+    # else
+    # kubecfg.subprocess.getoutput('kubectl config view --raw')
+
+    k8screds = yaml.load(kubecfg, Loader=yaml.FullLoader)
+    namespace = 'testing'
+    kdu_model = "./tests/bundles/k8s-zookeeper.yaml"
+
+    """init_env"""
+    cluster_uuid = await client.init_env(k8screds, namespace, reuse_cluster_uuid=reuse_cluster_uuid)
+    print(cluster_uuid)
+
+    if not reuse_cluster_uuid:
+        # This is a new cluster, so install to it
+
+        """install"""
+        # async def install(self, cluster_uuid, kdu_model, atomic=True, timeout=None, params=None):
+        # TODO: Re-add storage declaration to bundle. The API doesn't support the storage option yet. David is investigating.
+
+        # Deploy the bundle
+        kdu_instance = await client.install(cluster_uuid, kdu_model, atomic=True, timeout=600)
+
+        if kdu_instance:
+            # Inspect
+            print("Getting status")
+            status = await client.status_kdu(cluster_uuid, kdu_instance)
+            print(status)
+
+    # Inspect the bundle
+    config = await client.inspect_kdu(kdu_model)
+    print(config)
+
+    readme = await client.help_kdu(kdu_model)
+    # print(readme)
+
+    """upgrade
+    Upgrade to a newer version of the bundle
+    """
+    kdu_model_upgrade = "./tests/bundles/k8s-zookeeper-upgrade.yaml"
+    upgraded = await client.upgrade(cluster_uuid, namespace, kdu_model=kdu_model_upgrade)
+
+    kdu_model_upgrade = "./tests/bundles/k8s-zookeeper-downgrade.yaml"
+    upgraded = await client.upgrade(cluster_uuid, namespace, kdu_model=kdu_model_upgrade)
+
+    """uninstall"""
+
+    """reset"""
+    if args.reset:
+        await client.reset(cluster_uuid)
+
+    await client.logout()
+
+    print("Done")
+
+if __name__ == "__main__":
+    asyncio.run(main())