--- /dev/null
+##
+# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
+# This file is part of OSM
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: nfvlabs@tid.es
+##
+
+import asyncio
+from n2vc.loggable import Loggable
+import abc
+import time
+
+
+class K8sConnector(abc.ABC, Loggable):
+
+ """
+ ##################################################################################################
+ ########################################## P U B L I C ###########################################
+ ##################################################################################################
+ """
+
+ def __init__(
+ self,
+ db: object,
+ log: object = None,
+ on_update_db=None
+ ):
+ """
+
+ :param db: database object to write current operation status
+ :param log: logger for tracing
+ :param on_update_db: callback called when k8s connector updates database
+ """
+
+ # parent class
+ Loggable.__init__(self, log=log, log_to_console=True, prefix='\nK8S')
+
+ self.info('Initializing generic K8S connector')
+
+ # the database and update callback
+ self.db = db
+ self.on_update_db = on_update_db
+
+ self.info('K8S generic connector initialized')
+
+ @abc.abstractmethod
+ async def init_env(
+ self,
+ k8s_creds: str,
+ namespace: str = 'kube-system',
+ reuse_cluster_uuid = None
+ ) -> (str, bool):
+ """
+ It prepares a given K8s cluster environment to run Charts or juju Bundles on both sides:
+ client (OSM)
+ server (Tiller/Charm)
+
+ :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config'
+ :param namespace: optional namespace for helm tiller. By default, 'kube-system' will be used
+ :param reuse_cluster_uuid: existing cluster uuid for reuse
+ :return: uuid of the K8s cluster and True if connector has installed some software in the cluster
+ (on error, an exception will be raised)
+ """
+
+ @abc.abstractmethod
+ async def repo_add(
+ self,
+ cluster_uuid: str,
+ name: str,
+ url: str,
+ repo_type: str = 'chart'
+ ):
+ """
+ Add a new repository to OSM database
+
+ :param cluster_uuid: the cluster
+ :param name: name for the repo in OSM
+ :param url: URL of the repo
+ :param repo_type: either "chart" or "bundle"
+ :return: True if successful
+ """
+
+ @abc.abstractmethod
+ async def repo_list(
+ self,
+ cluster_uuid: str
+ ):
+ """
+ Get the list of registered repositories
+
+ :param cluster_uuid: the cluster
+ :return: list of registered repositories: [ (name, url) .... ]
+ """
+
+ @abc.abstractmethod
+ async def repo_remove(
+ self,
+ cluster_uuid: str,
+ name: str
+ ):
+ """
+ Remove a repository from OSM
+
+ :param name: repo name in OSM
+ :param cluster_uuid: the cluster
+ :return: True if successful
+ """
+
+ @abc.abstractmethod
+ async def reset(
+ self,
+ cluster_uuid: str,
+ force: bool = False,
+ uninstall_sw: bool = False
+ ) -> bool:
+ """
+ Uninstalls Tiller/Charm from a known K8s cluster and removes it from the list of known K8s clusters.
+ Intended to be used e.g. when the NS instance is deleted.
+
+ :param cluster_uuid: UUID of a K8s cluster known by OSM.
+ :param force: force deletion, even in case there are deployed releases
+ :param uninstall_sw: flag to indicate that sw uninstallation from software is needed
+ :return: str: kdu_instance generated by helm
+ """
+
+ @abc.abstractmethod
+ async def install(
+ self,
+ cluster_uuid: str,
+ kdu_model: str,
+ atomic: bool = True,
+ timeout: float = 300,
+ params: dict = None,
+ db_dict: dict = None
+ ):
+ """
+ Deploys of a new KDU instance. It would implicitly rely on the `install` call to deploy the Chart/Bundle
+ properly parametrized (in practice, this call would happen before any _initial-config-primitive_
+ of the VNF is called).
+
+ :param cluster_uuid: UUID of a K8s cluster known by OSM
+ :param kdu_model: chart/bundle:version reference (string), which can be either of these options:
+ - a name of chart/bundle available via the repos known by OSM
+ - a path to a packaged chart/bundle
+ - a path to an unpacked chart/bundle directory or a URL
+ :param atomic: If set, installation process purges chart/bundle on fail, also will wait until
+ all the K8s objects are active
+ :param timeout: Time in seconds to wait for the install of the chart/bundle (defaults to
+ Helm default timeout: 300s)
+ :param params: dictionary of key-value pairs for instantiation parameters (overriding default values)
+ :param dict db_dict: where to write into database when the status changes.
+ It contains a dict with {collection: <str>, filter: {}, path: <str>},
+ e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
+ :return: True if successful
+ """
+
+ @abc.abstractmethod
+ async def upgrade(
+ self,
+ cluster_uuid: str,
+ kdu_instance: str,
+ kdu_model: str = None,
+ atomic: bool = True,
+ timeout: float = 300,
+ params: dict = None,
+ db_dict: dict = None
+ ):
+ """
+ Upgrades an existing KDU instance. It would implicitly use the `upgrade` call over an existing Chart/Bundle.
+ It can be used both to upgrade the chart or to reconfigure it. This would be exposed as Day-2 primitive.
+
+ :param cluster_uuid: UUID of a K8s cluster known by OSM
+ :param kdu_instance: unique name for the KDU instance to be updated
+ :param kdu_model: new chart/bundle:version reference
+ :param atomic: rollback in case of fail and wait for pods and services are available
+ :param timeout: Time in seconds to wait for the install of the chart/bundle (defaults to
+ Helm default timeout: 300s)
+ :param params: new dictionary of key-value pairs for instantiation parameters
+ :param dict db_dict: where to write into database when the status changes.
+ It contains a dict with {collection: <str>, filter: {}, path: <str>},
+ e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
+ :return: reference to the new revision number of the KDU instance
+ """
+
+ @abc.abstractmethod
+ async def rollback(
+ self,
+ cluster_uuid: str,
+ kdu_instance: str,
+ revision=0,
+ db_dict: dict = None
+ ):
+ """
+ Rolls back a previous update of a KDU instance. It would implicitly use the `rollback` call.
+ It can be used both to rollback from a Chart/Bundle version update or from a reconfiguration.
+ This would be exposed as Day-2 primitive.
+
+ :param cluster_uuid: UUID of a K8s cluster known by OSM
+ :param kdu_instance: unique name for the KDU instance
+ :param revision: revision to which revert changes. If omitted, it will revert the last update only
+ :param dict db_dict: where to write into database when the status changes.
+ It contains a dict with {collection: <str>, filter: {}, path: <str>},
+ e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
+ :return:If successful, reference to the current active revision of the KDU instance after the rollback
+ """
+
+ @abc.abstractmethod
+ async def uninstall(
+ self,
+ cluster_uuid: str,
+ kdu_instance: str
+ ):
+ """
+ Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen
+ after all _terminate-config-primitive_ of the VNF are invoked).
+
+ :param cluster_uuid: UUID of a K8s cluster known by OSM
+ :param kdu_instance: unique name for the KDU instance to be deleted
+ :return: True if successful
+ """
+
+ @abc.abstractmethod
+ async def inspect_kdu(
+ self,
+ kdu_model: str
+ ) -> str:
+ """
+ These calls will retrieve from the Charm/Bundle:
+
+ - The list of configurable values and their defaults (e.g. in Charts, it would retrieve
+ the contents of `values.yaml`).
+ - If available, any embedded help file (e.g. `readme.md`) embedded in the Chart/Bundle.
+
+ :param cluster_uuid: the cluster to get the information
+ :param kdu_model: chart/bundle reference
+ :return: If successful, it will return a dictionary containing the list of available parameters
+ and their default values
+ """
+
+ @abc.abstractmethod
+ async def help_kdu(
+ self,
+ kdu_model: str
+ ) -> str:
+ """
+
+ :param cluster_uuid: the cluster to get the information
+ :param kdu_model: chart/bundle reference
+ :return: If successful, it will return the contents of the 'readme.md'
+ """
+
+ @abc.abstractmethod
+ async def status_kdu(
+ self,
+ cluster_uuid: str,
+ kdu_instance: str
+ ) -> str:
+ """
+ This call would retrieve tha current state of a given KDU instance. It would be would allow to retrieve
+ the _composition_ (i.e. K8s objects) and _specific values_ of the configuration parameters applied
+ to a given instance. This call would be based on the `status` call.
+
+ :param cluster_uuid: UUID of a K8s cluster known by OSM
+ :param kdu_instance: unique name for the KDU instance
+ :return: If successful, it will return the following vector of arguments:
+ - K8s `namespace` in the cluster where the KDU lives
+ - `state` of the KDU instance. It can be:
+ - UNKNOWN
+ - DEPLOYED
+ - DELETED
+ - SUPERSEDED
+ - FAILED or
+ - DELETING
+ - List of `resources` (objects) that this release consists of, sorted by kind, and the status of those resources
+ - Last `deployment_time`.
+
+ """
+
+ """
+ ##################################################################################################
+ ########################################## P R I V A T E #########################################
+ ##################################################################################################
+ """
+
+ async def write_app_status_to_db(
+ self,
+ db_dict: dict,
+ status: str,
+ detailed_status: str,
+ operation: str
+ ) -> bool:
+
+ if not self.db:
+ self.warning('No db => No database write')
+ return False
+
+ if not db_dict:
+ self.warning('No db_dict => No database write')
+ return False
+
+ self.debug('status={}'.format(status))
+
+ try:
+
+ the_table = db_dict['collection']
+ the_filter = db_dict['filter']
+ the_path = db_dict['path']
+ if not the_path[-1] == '.':
+ the_path = the_path + '.'
+ update_dict = {
+ the_path + 'operation': operation,
+ the_path + 'status': status,
+ the_path + 'detailed-status': detailed_status,
+ the_path + 'status-time': str(time.time()),
+ }
+
+ self.db.set_one(
+ table=the_table,
+ q_filter=the_filter,
+ update_dict=update_dict,
+ fail_on_empty=True
+ )
+
+ # database callback
+ if self.on_update_db:
+ if asyncio.iscoroutinefunction(self.on_update_db):
+ await self.on_update_db(the_table, the_filter, the_path, update_dict)
+ else:
+ self.on_update_db(the_table, the_filter, the_path, update_dict)
+
+ return True
+
+ except Exception as e:
+ self.info('Exception writing status to database: {}'.format(e))
+ return False
--- /dev/null
+##
+# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
+# This file is part of OSM
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: nfvlabs@tid.es
+##
+
+import paramiko
+import subprocess
+import os
+import shutil
+import asyncio
+import time
+import yaml
+from uuid import uuid4
+import random
+from n2vc.k8s_conn import K8sConnector
+
+
+class K8sHelmConnector(K8sConnector):
+
+ """
+ ##################################################################################################
+ ########################################## P U B L I C ###########################################
+ ##################################################################################################
+ """
+
+ def __init__(
+ self,
+ fs: object,
+ db: object,
+ kubectl_command: str = '/usr/bin/kubectl',
+ helm_command: str = '/usr/bin/helm',
+ log: object = None,
+ on_update_db=None
+ ):
+ """
+
+ :param fs: file system for kubernetes and helm configuration
+ :param db: database object to write current operation status
+ :param kubectl_command: path to kubectl executable
+ :param helm_command: path to helm executable
+ :param log: logger
+ :param on_update_db: callback called when k8s connector updates database
+ """
+
+ # parent class
+ K8sConnector.__init__(
+ self,
+ db=db,
+ log=log,
+ on_update_db=on_update_db
+ )
+
+ self.info('Initializing K8S Helm connector')
+
+ # random numbers for release name generation
+ random.seed(time.time())
+
+ # the file system
+ self.fs = fs
+
+ # exception if kubectl is not installed
+ self.kubectl_command = kubectl_command
+ self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
+
+ # exception if helm is not installed
+ self._helm_command = helm_command
+ self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
+
+ self.info('K8S Helm connector initialized')
+
+ async def init_env(
+ self,
+ k8s_creds: str,
+ namespace: str = 'kube-system',
+ reuse_cluster_uuid=None
+ ) -> (str, bool):
+
+ cluster_uuid = reuse_cluster_uuid
+ if not cluster_uuid:
+ cluster_uuid = str(uuid4())
+
+ self.debug('Initializing K8S environment. namespace: {}'.format(namespace))
+
+ # create config filename
+ kube_dir, helm_dir, config_filename, cluster_dir = \
+ self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+ f = open(config_filename, "w")
+ f.write(k8s_creds)
+ f.close()
+
+ # check if tiller pod is up in cluster
+ command = '{} --kubeconfig={} --namespace={} get deployments'\
+ .format(self.kubectl_command, config_filename, namespace)
+ output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+
+ output_table = K8sHelmConnector._output_to_table(output=output)
+
+ # find 'tiller' pod in all pods
+ already_initialized = False
+ try:
+ for row in output_table:
+ if row[0].startswith('tiller-deploy'):
+ already_initialized = True
+ break
+ except Exception as e:
+ pass
+
+ # helm init
+ n2vc_installed_sw = False
+ if not already_initialized:
+ self.info('Initializing helm in client and server: {}'.format(cluster_uuid))
+ command = '{} --kubeconfig={} --tiller-namespace={} --home={} init'\
+ .format(self._helm_command, config_filename, namespace, helm_dir)
+ output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+ n2vc_installed_sw = True
+ else:
+ # check client helm installation
+ check_file = helm_dir + '/repository/repositories.yaml'
+ if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
+ self.info('Initializing helm in client: {}'.format(cluster_uuid))
+ command = '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\
+ .format(self._helm_command, config_filename, namespace, helm_dir)
+ output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+ else:
+ self.info('Helm client already initialized')
+
+ self.info('Cluster initialized {}'.format(cluster_uuid))
+
+ return cluster_uuid, n2vc_installed_sw
+
+ async def repo_add(
+ self,
+ cluster_uuid: str,
+ name: str,
+ url: str,
+ repo_type: str = 'chart'
+ ):
+
+ self.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url))
+
+ # config filename
+ kube_dir, helm_dir, config_filename, cluster_dir = \
+ self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+
+ # helm repo update
+ command = '{} --kubeconfig={} --home={} repo update'.format(self._helm_command, config_filename, helm_dir)
+ self.debug('updating repo: {}'.format(command))
+ await self._local_async_exec(command=command, raise_exception_on_error=False)
+
+ # helm repo add name url
+ command = '{} --kubeconfig={} --home={} repo add {} {}'\
+ .format(self._helm_command, config_filename, helm_dir, name, url)
+ self.debug('adding repo: {}'.format(command))
+ await self._local_async_exec(command=command, raise_exception_on_error=True)
+
+ async def repo_list(
+ self,
+ cluster_uuid: str
+ ) -> list:
+ """
+ Get the list of registered repositories
+
+ :return: list of registered repositories: [ (name, url) .... ]
+ """
+
+ self.debug('list repositories for cluster {}'.format(cluster_uuid))
+
+ # config filename
+ kube_dir, helm_dir, config_filename, cluster_dir = \
+ self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+
+ command = '{} --kubeconfig={} --home={} repo list --output yaml'.format(self._helm_command, config_filename, helm_dir)
+
+ output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+ if output and len(output) > 0:
+ return yaml.load(output, Loader=yaml.SafeLoader)
+ else:
+ return []
+
+ async def repo_remove(
+ self,
+ cluster_uuid: str,
+ name: str
+ ):
+ """
+ Remove a repository from OSM
+
+ :param cluster_uuid: the cluster
+ :param name: repo name in OSM
+ :return: True if successful
+ """
+
+ self.debug('list repositories for cluster {}'.format(cluster_uuid))
+
+ # config filename
+ kube_dir, helm_dir, config_filename, cluster_dir = \
+ self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+
+ command = '{} --kubeconfig={} --home={} repo remove {}'\
+ .format(self._helm_command, config_filename, helm_dir, name)
+
+ await self._local_async_exec(command=command, raise_exception_on_error=True)
+
+ async def reset(
+ self,
+ cluster_uuid: str,
+ force: bool = False,
+ uninstall_sw: bool = False
+ ) -> bool:
+
+ self.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid))
+
+ # get kube and helm directories
+ kube_dir, helm_dir, config_filename, cluster_dir = \
+ self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=False)
+
+ # uninstall releases if needed
+ releases = await self.instances_list(cluster_uuid=cluster_uuid)
+ if len(releases) > 0:
+ if force:
+ for r in releases:
+ try:
+ kdu_instance = r.get('Name')
+ chart = r.get('Chart')
+ self.debug('Uninstalling {} -> {}'.format(chart, kdu_instance))
+ await self.uninstall(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
+ except Exception as e:
+ self.error('Error uninstalling release {}: {}'.format(kdu_instance, e))
+ else:
+ msg = 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\
+ .format(cluster_uuid)
+ self.error(msg)
+ raise Exception(msg)
+
+ if uninstall_sw:
+
+ self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
+
+ # find namespace for tiller pod
+ command = '{} --kubeconfig={} get deployments --all-namespaces'\
+ .format(self.kubectl_command, config_filename)
+ output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
+ output_table = K8sHelmConnector._output_to_table(output=output)
+ namespace = None
+ for r in output_table:
+ try:
+ if 'tiller-deploy' in r[1]:
+ namespace = r[0]
+ break
+ except Exception as e:
+ pass
+ else:
+ msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid)
+ self.error(msg)
+ # raise Exception(msg)
+
+ self.debug('namespace for tiller: {}'.format(namespace))
+
+ force_str = '--force'
+
+ if namespace:
+ # delete tiller deployment
+ self.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid, namespace))
+ command = '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\
+ .format(self.kubectl_command, namespace, config_filename, force_str)
+ await self._local_async_exec(command=command, raise_exception_on_error=False)
+
+ # uninstall tiller from cluster
+ self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
+ command = '{} --kubeconfig={} --home={} reset'\
+ .format(self._helm_command, config_filename, helm_dir)
+ self.debug('resetting: {}'.format(command))
+ output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+ else:
+ self.debug('namespace not found')
+
+ # delete cluster directory
+ dir = self.fs.path + '/' + cluster_uuid
+ self.debug('Removing directory {}'.format(dir))
+ shutil.rmtree(dir, ignore_errors=True)
+
+ return True
+
+ async def install(
+ self,
+ cluster_uuid: str,
+ kdu_model: str,
+ atomic: bool = True,
+ timeout: float = 300,
+ params: dict = None,
+ db_dict: dict = None
+ ):
+
+ self.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid))
+
+ start = time.time()
+ end = start + timeout
+
+ # config filename
+ kube_dir, helm_dir, config_filename, cluster_dir = \
+ self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+
+ # params to str
+ # params_str = K8sHelmConnector._params_to_set_option(params)
+ params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)
+
+ timeout_str = ''
+ if timeout:
+ timeout_str = '--timeout {}'.format(timeout)
+
+ # atomic
+ atomic_str = ''
+ if atomic:
+ atomic_str = '--atomic'
+
+ # version
+ version_str = ''
+ if ':' in kdu_model:
+ parts = kdu_model.split(sep=':')
+ if len(parts) == 2:
+ version_str = '--version {}'.format(parts[1])
+ kdu_model = parts[0]
+
+ # generate a name for the releas. Then, check if already exists
+ kdu_instance = None
+ while kdu_instance is None:
+ kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
+ try:
+ result = await self._status_kdu(
+ cluster_uuid=cluster_uuid,
+ kdu_instance=kdu_instance,
+ show_error_log=False
+ )
+ if result is not None:
+ # instance already exists: generate a new one
+ kdu_instance = None
+ except:
+ kdu_instance = None
+
+ # helm repo install
+ command = '{} install {} --output yaml --kubeconfig={} --home={} {} {} --name={} {} {}'\
+ .format(self._helm_command, atomic_str, config_filename, helm_dir,
+ params_str, timeout_str, kdu_instance, kdu_model, version_str)
+ self.debug('installing: {}'.format(command))
+
+ if atomic:
+ # exec helm in a task
+ exec_task = asyncio.ensure_future(
+ coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
+ )
+ # write status in another task
+ status_task = asyncio.ensure_future(
+ coro_or_future=self._store_status(
+ cluster_uuid=cluster_uuid,
+ kdu_instance=kdu_instance,
+ db_dict=db_dict,
+ operation='install',
+ run_once=False
+ )
+ )
+
+ # wait for execution task
+ await asyncio.wait([exec_task])
+
+ # cancel status task
+ status_task.cancel()
+
+ output, rc = exec_task.result()
+
+ else:
+
+ output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
+
+ # remove temporal values yaml file
+ if file_to_delete:
+ os.remove(file_to_delete)
+
+ # write final status
+ await self._store_status(
+ cluster_uuid=cluster_uuid,
+ kdu_instance=kdu_instance,
+ db_dict=db_dict,
+ operation='install',
+ run_once=True,
+ check_every=0
+ )
+
+ if rc != 0:
+ msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
+ self.error(msg)
+ raise Exception(msg)
+
+ self.debug('Returning kdu_instance {}'.format(kdu_instance))
+ return kdu_instance
+
+ async def instances_list(
+ self,
+ cluster_uuid: str
+ ) -> list:
+ """
+ returns a list of deployed releases in a cluster
+
+ :param cluster_uuid: the cluster
+ :return:
+ """
+
+ self.debug('list releases for cluster {}'.format(cluster_uuid))
+
+ # config filename
+ kube_dir, helm_dir, config_filename, cluster_dir = \
+ self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+
+ command = '{} --kubeconfig={} --home={} list --output yaml'\
+ .format(self._helm_command, config_filename, helm_dir)
+
+ output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+
+ if output and len(output) > 0:
+ return yaml.load(output, Loader=yaml.SafeLoader).get('Releases')
+ else:
+ return []
+
+ async def upgrade(
+ self,
+ cluster_uuid: str,
+ kdu_instance: str,
+ kdu_model: str = None,
+ atomic: bool = True,
+ timeout: float = 300,
+ params: dict = None,
+ db_dict: dict = None
+ ):
+
+ self.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid))
+
+ start = time.time()
+ end = start + timeout
+
+ # config filename
+ kube_dir, helm_dir, config_filename, cluster_dir = \
+ self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+
+ # params to str
+ # params_str = K8sHelmConnector._params_to_set_option(params)
+ params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)
+
+ timeout_str = ''
+ if timeout:
+ timeout_str = '--timeout {}'.format(timeout)
+
+ # atomic
+ atomic_str = ''
+ if atomic:
+ atomic_str = '--atomic'
+
+ # version
+ version_str = ''
+ if kdu_model and ':' in kdu_model:
+ parts = kdu_model.split(sep=':')
+ if len(parts) == 2:
+ version_str = '--version {}'.format(parts[1])
+ kdu_model = parts[0]
+
+ # helm repo upgrade
+ command = '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\
+ .format(self._helm_command, atomic_str, config_filename, helm_dir,
+ params_str, timeout_str, kdu_instance, kdu_model, version_str)
+ self.debug('upgrading: {}'.format(command))
+
+ if atomic:
+
+ # exec helm in a task
+ exec_task = asyncio.ensure_future(
+ coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
+ )
+ # write status in another task
+ status_task = asyncio.ensure_future(
+ coro_or_future=self._store_status(
+ cluster_uuid=cluster_uuid,
+ kdu_instance=kdu_instance,
+ db_dict=db_dict,
+ operation='upgrade',
+ run_once=False
+ )
+ )
+
+ # wait for execution task
+ await asyncio.wait([ exec_task ])
+
+ # cancel status task
+ status_task.cancel()
+ output, rc = exec_task.result()
+
+ else:
+
+ output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
+
+ # remove temporal values yaml file
+ if file_to_delete:
+ os.remove(file_to_delete)
+
+ # write final status
+ await self._store_status(
+ cluster_uuid=cluster_uuid,
+ kdu_instance=kdu_instance,
+ db_dict=db_dict,
+ operation='upgrade',
+ run_once=True,
+ check_every=0
+ )
+
+ if rc != 0:
+ msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
+ self.error(msg)
+ raise Exception(msg)
+
+ # return new revision number
+ instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
+ if instance:
+ revision = int(instance.get('Revision'))
+ self.debug('New revision: {}'.format(revision))
+ return revision
+ else:
+ return 0
+
+ async def rollback(
+ self,
+ cluster_uuid: str,
+ kdu_instance: str,
+ revision=0,
+ db_dict: dict = None
+ ):
+
+ self.debug('rollback kdu_instance {} to revision {} from cluster {}'
+ .format(kdu_instance, revision, cluster_uuid))
+
+ # config filename
+ kube_dir, helm_dir, config_filename, cluster_dir = \
+ self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+
+ command = '{} rollback --kubeconfig={} --home={} {} {} --wait'\
+ .format(self._helm_command, config_filename, helm_dir, kdu_instance, revision)
+
+ # exec helm in a task
+ exec_task = asyncio.ensure_future(
+ coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
+ )
+ # write status in another task
+ status_task = asyncio.ensure_future(
+ coro_or_future=self._store_status(
+ cluster_uuid=cluster_uuid,
+ kdu_instance=kdu_instance,
+ db_dict=db_dict,
+ operation='rollback',
+ run_once=False
+ )
+ )
+
+ # wait for execution task
+ await asyncio.wait([exec_task])
+
+ # cancel status task
+ status_task.cancel()
+
+ output, rc = exec_task.result()
+
+ # write final status
+ await self._store_status(
+ cluster_uuid=cluster_uuid,
+ kdu_instance=kdu_instance,
+ db_dict=db_dict,
+ operation='rollback',
+ run_once=True,
+ check_every=0
+ )
+
+ if rc != 0:
+ msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
+ self.error(msg)
+ raise Exception(msg)
+
+ # return new revision number
+ instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
+ if instance:
+ revision = int(instance.get('Revision'))
+ self.debug('New revision: {}'.format(revision))
+ return revision
+ else:
+ return 0
+
+ async def uninstall(
+ self,
+ cluster_uuid: str,
+ kdu_instance: str
+ ):
+ """
+ Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen
+ after all _terminate-config-primitive_ of the VNF are invoked).
+
+ :param cluster_uuid: UUID of a K8s cluster known by OSM
+ :param kdu_instance: unique name for the KDU instance to be deleted
+ :return: True if successful
+ """
+
+ self.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid))
+
+ # config filename
+ kube_dir, helm_dir, config_filename, cluster_dir = \
+ self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+
+ command = '{} --kubeconfig={} --home={} delete --purge {}'\
+ .format(self._helm_command, config_filename, helm_dir, kdu_instance)
+
+ output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+
+ return self._output_to_table(output)
+
+ async def inspect_kdu(
+ self,
+ kdu_model: str
+ ) -> str:
+
+ self.debug('inspect kdu_model {}'.format(kdu_model))
+
+ command = '{} inspect values {}'\
+ .format(self._helm_command, kdu_model)
+
+ output, rc = await self._local_async_exec(command=command)
+
+ return output
+
+ async def help_kdu(
+ self,
+ kdu_model: str
+ ):
+
+ self.debug('help kdu_model {}'.format(kdu_model))
+
+ command = '{} inspect readme {}'\
+ .format(self._helm_command, kdu_model)
+
+ output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+
+ return output
+
+ async def status_kdu(
+ self,
+ cluster_uuid: str,
+ kdu_instance: str
+ ):
+
+ return await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, show_error_log=True)
+
+
+ """
+ ##################################################################################################
+ ########################################## P R I V A T E #########################################
+ ##################################################################################################
+ """
+
+ async def _status_kdu(
+ self,
+ cluster_uuid: str,
+ kdu_instance: str,
+ show_error_log: bool = False
+ ):
+
+ self.debug('status of kdu_instance {}'.format(kdu_instance))
+
+ # config filename
+ kube_dir, helm_dir, config_filename, cluster_dir = \
+ self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+
+ command = '{} --kubeconfig={} --home={} status {} --output yaml'\
+ .format(self._helm_command, config_filename, helm_dir, kdu_instance)
+
+ output, rc = await self._local_async_exec(
+ command=command,
+ raise_exception_on_error=True,
+ show_error_log=show_error_log
+ )
+
+ if rc != 0:
+ return None
+
+ data = yaml.load(output, Loader=yaml.SafeLoader)
+
+ # remove field 'notes'
+ try:
+ del data.get('info').get('status')['notes']
+ except KeyError:
+ pass
+
+ # parse field 'resources'
+ try:
+ resources = str(data.get('info').get('status').get('resources'))
+ resource_table = self._output_to_table(resources)
+ data.get('info').get('status')['resources'] = resource_table
+ except Exception as e:
+ pass
+
+ return data
+
+ async def get_instance_info(
+ self,
+ cluster_uuid: str,
+ kdu_instance: str
+ ):
+ instances = await self.instances_list(cluster_uuid=cluster_uuid)
+ for instance in instances:
+ if instance.get('Name') == kdu_instance:
+ return instance
+ self.debug('Instance {} not found'.format(kdu_instance))
+ return None
+
+ @staticmethod
+ def _generate_release_name(
+ chart_name: str
+ ):
+ name = ''
+ for c in chart_name:
+ if c.isalpha() or c.isnumeric():
+ name += c
+ else:
+ name += '-'
+ if len(name) > 35:
+ name = name[0:35]
+
+ # if does not start with alpha character, prefix 'a'
+ if not name[0].isalpha():
+ name = 'a' + name
+
+ name += '-'
+
+ def get_random_number():
+ r = random.randrange(start=1, stop=99999999)
+ s = str(r)
+ s = s.rjust(width=10, fillchar=' ')
+ return s
+
+ name = name + get_random_number()
+ return name.lower()
+
+ async def _store_status(
+ self,
+ cluster_uuid: str,
+ operation: str,
+ kdu_instance: str,
+ check_every: float = 10,
+ db_dict: dict = None,
+ run_once: bool = False
+ ):
+ while True:
+ try:
+ await asyncio.sleep(check_every)
+ detailed_status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
+ status = detailed_status.get('info').get('Description')
+ print('=' * 60)
+ self.debug('STATUS:\n{}'.format(status))
+ self.debug('DETAILED STATUS:\n{}'.format(detailed_status))
+ print('=' * 60)
+ # write status to db
+ result = await self.write_app_status_to_db(
+ db_dict=db_dict,
+ status=str(status),
+ detailed_status=str(detailed_status),
+ operation=operation)
+ if not result:
+ self.info('Error writing in database. Task exiting...')
+ return
+ except asyncio.CancelledError:
+ self.debug('Task cancelled')
+ return
+ except Exception as e:
+ pass
+ finally:
+ if run_once:
+ return
+
+ async def _is_install_completed(
+ self,
+ cluster_uuid: str,
+ kdu_instance: str
+ ) -> bool:
+
+ status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
+
+ # extract info.status.resources-> str
+ # format:
+ # ==> v1/Deployment
+ # NAME READY UP-TO-DATE AVAILABLE AGE
+ # halting-horse-mongodb 0/1 1 0 0s
+ # halting-petit-mongodb 1/1 1 0 0s
+ # blank line
+ resources = K8sHelmConnector._get_deep(status, ('info', 'status', 'resources'))
+
+ # convert to table
+ resources = K8sHelmConnector._output_to_table(resources)
+
+ num_lines = len(resources)
+ index = 0
+ while index < num_lines:
+ try:
+ line1 = resources[index]
+ index += 1
+ # find '==>' in column 0
+ if line1[0] == '==>':
+ line2 = resources[index]
+ index += 1
+ # find READY in column 1
+ if line2[1] == 'READY':
+ # read next lines
+ line3 = resources[index]
+ index += 1
+ while len(line3) > 1 and index < num_lines:
+ ready_value = line3[1]
+ parts = ready_value.split(sep='/')
+ current = int(parts[0])
+ total = int(parts[1])
+ if current < total:
+ self.debug('NOT READY:\n {}'.format(line3))
+ ready = False
+ line3 = resources[index]
+ index += 1
+
+ except Exception as e:
+ pass
+
+ return ready
+
+ @staticmethod
+ def _get_deep(dictionary: dict, members: tuple):
+ target = dictionary
+ value = None
+ try:
+ for m in members:
+ value = target.get(m)
+ if not value:
+ return None
+ else:
+ target = value
+ except Exception as e:
+ pass
+ return value
+
+ # find key:value in several lines
+ @staticmethod
+ def _find_in_lines(p_lines: list, p_key: str) -> str:
+ for line in p_lines:
+ try:
+ if line.startswith(p_key + ':'):
+ parts = line.split(':')
+ the_value = parts[1].strip()
+ return the_value
+ except Exception as e:
+ # ignore it
+ pass
+ return None
+
+ # params for use in -f file
+ # returns values file option and filename (in order to delete it at the end)
+ def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str):
+ params_str = ''
+
+ if params and len(params) > 0:
+ kube_dir, helm_dir, config_filename, cluster_dir = \
+ self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+
+ def get_random_number():
+ r = random.randrange(start=1, stop=99999999)
+ s = str(r)
+ while len(s) < 10:
+ s = '0' + s
+ return s
+
+ params2 = dict()
+ for key in params:
+ value = params.get(key)
+ if '!!yaml' in str(value):
+ value = yaml.load(value[7:])
+ params2[key] = value
+
+ values_file = get_random_number() + '.yaml'
+ with open(values_file, 'w') as stream:
+ yaml.dump(params2, stream, indent=4, default_flow_style=False)
+
+ return '-f {}'.format(values_file), values_file
+
+ return '', None
+
+ # params for use in --set option
+ @staticmethod
+ def _params_to_set_option(params: dict) -> str:
+ params_str = ''
+ if params and len(params) > 0:
+ start = True
+ for key in params:
+ value = params.get(key, None)
+ if value is not None:
+ if start:
+ params_str += '--set '
+ start = False
+ else:
+ params_str += ','
+ params_str += '{}={}'.format(key, value)
+ return params_str
+
+ @staticmethod
+ def _output_to_lines(output: str) -> list:
+ output_lines = list()
+ lines = output.splitlines(keepends=False)
+ for line in lines:
+ line = line.strip()
+ if len(line) > 0:
+ output_lines.append(line)
+ return output_lines
+
+ @staticmethod
+ def _output_to_table(output: str) -> list:
+ output_table = list()
+ lines = output.splitlines(keepends=False)
+ for line in lines:
+ line = line.replace('\t', ' ')
+ line_list = list()
+ output_table.append(line_list)
+ cells = line.split(sep=' ')
+ for cell in cells:
+ cell = cell.strip()
+ if len(cell) > 0:
+ line_list.append(cell)
+ return output_table
+
+ def _get_paths(self, cluster_name: str, create_if_not_exist: bool = False) -> (str, str, str, str):
+ """
+ Returns kube and helm directories
+
+ :param cluster_name:
+ :param create_if_not_exist:
+ :return: kube, helm directories, config filename and cluster dir.
+ Raises exception if not exist and cannot create
+ """
+
+ base = self.fs.path
+ if base.endswith("/") or base.endswith("\\"):
+ base = base[:-1]
+
+ # base dir for cluster
+ cluster_dir = base + '/' + cluster_name
+ if create_if_not_exist and not os.path.exists(cluster_dir):
+ self.debug('Creating dir {}'.format(cluster_dir))
+ os.makedirs(cluster_dir)
+ if not os.path.exists(cluster_dir):
+ msg = 'Base cluster dir {} does not exist'.format(cluster_dir)
+ self.error(msg)
+ raise Exception(msg)
+
+ # kube dir
+ kube_dir = cluster_dir + '/' + '.kube'
+ if create_if_not_exist and not os.path.exists(kube_dir):
+ self.debug('Creating dir {}'.format(kube_dir))
+ os.makedirs(kube_dir)
+ if not os.path.exists(kube_dir):
+ msg = 'Kube config dir {} does not exist'.format(kube_dir)
+ self.error(msg)
+ raise Exception(msg)
+
+ # helm home dir
+ helm_dir = cluster_dir + '/' + '.helm'
+ if create_if_not_exist and not os.path.exists(helm_dir):
+ self.debug('Creating dir {}'.format(helm_dir))
+ os.makedirs(helm_dir)
+ if not os.path.exists(helm_dir):
+ msg = 'Helm config dir {} does not exist'.format(helm_dir)
+ self.error(msg)
+ raise Exception(msg)
+
+ config_filename = kube_dir + '/config'
+ return kube_dir, helm_dir, config_filename, cluster_dir
+
+ @staticmethod
+ def _remove_multiple_spaces(str):
+ str = str.strip()
+ while ' ' in str:
+ str = str.replace(' ', ' ')
+ return str
+
+ def _local_exec(
+ self,
+ command: str
+ ) -> (str, int):
+ command = K8sHelmConnector._remove_multiple_spaces(command)
+ self.debug('Executing sync local command: {}'.format(command))
+ # raise exception if fails
+ output = ''
+ try:
+ output = subprocess.check_output(command, shell=True, universal_newlines=True)
+ return_code = 0
+ self.debug(output)
+ except Exception as e:
+ return_code = 1
+
+ return output, return_code
+
+ async def _local_async_exec(
+ self,
+ command: str,
+ raise_exception_on_error: bool = False,
+ show_error_log: bool = True
+ ) -> (str, int):
+
+ command = K8sHelmConnector._remove_multiple_spaces(command)
+ self.debug('Executing async local command: {}'.format(command))
+
+ # split command
+ command = command.split(sep=' ')
+
+ try:
+ process = await asyncio.create_subprocess_exec(
+ *command,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE
+ )
+
+ # wait for command terminate
+ stdout, stderr = await process.communicate()
+
+ return_code = process.returncode
+
+ output = ''
+ if stdout:
+ output = stdout.decode('utf-8').strip()
+ if stderr:
+ output = stderr.decode('utf-8').strip()
+
+ if return_code != 0 and show_error_log:
+ self.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output))
+ else:
+ self.debug('Return code: {}'.format(return_code))
+
+ if raise_exception_on_error and return_code != 0:
+ raise Exception(output)
+
+ return output, return_code
+
+ except Exception as e:
+ msg = 'Exception executing command: {} -> {}'.format(command, e)
+ if show_error_log:
+ self.error(msg)
+ return '', -1
+
+ def _remote_exec(
+ self,
+ hostname: str,
+ username: str,
+ password: str,
+ command: str,
+ timeout: int = 10
+ ) -> (str, int):
+
+ command = K8sHelmConnector._remove_multiple_spaces(command)
+ self.debug('Executing sync remote ssh command: {}'.format(command))
+
+ ssh = paramiko.SSHClient()
+ ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ ssh.connect(hostname=hostname, username=username, password=password)
+ ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command=command, timeout=timeout)
+ output = ssh_stdout.read().decode('utf-8')
+ error = ssh_stderr.read().decode('utf-8')
+ if error:
+ self.error('ERROR: {}'.format(error))
+ return_code = 1
+ else:
+ return_code = 0
+ output = output.replace('\\n', '\n')
+ self.debug('OUTPUT: {}'.format(output))
+
+ return output, return_code
+
+ def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
+ self.debug('Checking if file {} exists...'.format(filename))
+ if os.path.exists(filename):
+ return True
+ else:
+ msg = 'File {} does not exist'.format(filename)
+ if exception_if_not_exists:
+ self.error(msg)
+ raise Exception(msg)
+
--- /dev/null
+# Copyright 2019 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import concurrent
+from .exceptions import NotImplemented
+
+import juju
+# from juju.bundle import BundleHandler
+from juju.controller import Controller
+from juju.model import Model
+from juju.errors import JujuAPIError, JujuError
+
+import logging
+
+from n2vc.k8s_conn import K8sConnector
+
+import os
+# import re
+# import ssl
+import subprocess
+# from .vnf import N2VC
+
+import uuid
+import yaml
+
+
+class K8sJujuConnector(K8sConnector):
+ def __init__(
+ self,
+ fs,
+ kubectl_command='/usr/bin/kubectl',
+ log=None
+ ):
+ """
+
+ :param kubectl_command: path to kubectl executable
+ :param helm_command: path to helm executable
+ :param fs: file system for kubernetes and helm configuration
+ :param log: logger
+ """
+
+ # parent class
+ K8sConnector.__init__(
+ self,
+ kubectl_command=kubectl_command,
+ fs=fs,
+ log=log,
+ )
+
+ self.info('Initializing K8S Juju connector')
+
+ self.authenticated = False
+ self.models = {}
+ self.log = logging.getLogger(__name__)
+ self.info('K8S Juju connector initialized')
+
+ """Initialization"""
+ async def init_env(
+ self,
+ k8s_creds: dict,
+ namespace: str = 'kube-system',
+ reuse_cluster_uuid: str = None,
+ ) -> str:
+ """Initialize a Kubernetes environment
+
+ :param k8s_creds dict: A dictionary containing the Kubernetes cluster
+ configuration
+ :param namespace str: The Kubernetes namespace to initialize
+
+ :return: UUID of the k8s context or raises an exception
+ """
+
+ """Bootstrapping
+
+ Bootstrapping cannot be done, by design, through the API. We need to
+ use the CLI tools.
+ """
+ # TODO: The path may change
+ jujudir = "/snap/bin"
+
+ self.k8scli = "{}/juju".format(jujudir)
+
+ """
+ WIP: Workflow
+
+ 1. Has the environment already been bootstrapped?
+ - Check the database to see if we have a record for this env
+
+ 2. If this is a new env, create it
+ - Add the k8s cloud to Juju
+ - Bootstrap
+ - Record it in the database
+
+ 3. Connect to the Juju controller for this cloud
+
+ """
+ # cluster_uuid = reuse_cluster_uuid
+ # if not cluster_uuid:
+ # cluster_uuid = str(uuid4())
+
+ ##################################################
+ # TODO: Pull info from db based on the namespace #
+ ##################################################
+
+ if not reuse_cluster_uuid:
+ # This is a new cluster, so bootstrap it
+
+ cluster_uuid = str(uuid.uuid4())
+
+ # Add k8s cloud to Juju (unless it's microk8s)
+
+ # Does the kubeconfig contain microk8s?
+ microk8s = self.is_microk8s_by_credentials(k8s_creds)
+
+ if not microk8s:
+ # Name the new k8s cloud
+ k8s_cloud = "{}-k8s".format(namespace)
+
+ await self.add_k8s(k8s_cloud, k8s_creds)
+
+ # Bootstrap Juju controller
+ self.bootstrap(k8s_cloud, cluster_uuid)
+ else:
+ # k8s_cloud = 'microk8s-test'
+ k8s_cloud = "{}-k8s".format(namespace)
+
+ await self.add_k8s(k8s_cloud, k8s_creds)
+
+ await self.bootstrap(k8s_cloud, cluster_uuid)
+
+ # Get the controller information
+
+ # Parse ~/.local/share/juju/controllers.yaml
+ # controllers.testing.api-endpoints|ca-cert|uuid
+ with open(os.path.expanduser(
+ "~/.local/share/juju/controllers.yaml"
+ )) as f:
+ controllers = yaml.load(f, Loader=yaml.Loader)
+ controller = controllers['controllers'][cluster_uuid]
+ endpoints = controller['api-endpoints']
+ self.juju_endpoint = endpoints[0]
+ self.juju_ca_cert = controller['ca-cert']
+
+ # Parse ~/.local/share/juju/accounts
+ # controllers.testing.user|password
+ with open(os.path.expanduser(
+ "~/.local/share/juju/accounts.yaml"
+ )) as f:
+ controllers = yaml.load(f, Loader=yaml.Loader)
+ controller = controllers['controllers'][cluster_uuid]
+
+ self.juju_user = controller['user']
+ self.juju_secret = controller['password']
+
+ print("user: {}".format(self.juju_user))
+ print("secret: {}".format(self.juju_secret))
+ print("endpoint: {}".format(self.juju_endpoint))
+ print("ca-cert: {}".format(self.juju_ca_cert))
+
+ # raise Exception("EOL")
+
+ self.juju_public_key = None
+
+ config = {
+ 'endpoint': self.juju_endpoint,
+ 'username': self.juju_user,
+ 'secret': self.juju_secret,
+ 'cacert': self.juju_ca_cert,
+ 'namespace': namespace,
+ 'microk8s': microk8s,
+ }
+
+ # Store the cluster configuration so it
+ # can be used for subsequent calls
+ await self.set_config(cluster_uuid, config)
+
+ else:
+ # This is an existing cluster, so get its config
+ cluster_uuid = reuse_cluster_uuid
+
+ config = self.get_config(cluster_uuid)
+
+ self.juju_endpoint = config['endpoint']
+ self.juju_user = config['username']
+ self.juju_secret = config['secret']
+ self.juju_ca_cert = config['cacert']
+ self.juju_public_key = None
+
+ # Login to the k8s cluster
+ if not self.authenticated:
+ await self.login()
+
+ # We're creating a new cluster
+ print("Getting model {}".format(self.get_namespace(cluster_uuid)))
+ model = await self.get_model(self.get_namespace(cluster_uuid))
+
+ # Disconnect from the model
+ if model and model.is_connected():
+ await model.disconnect()
+
+ return cluster_uuid
+
+ """Repo Management"""
+ async def repo_add(
+ self,
+ name: str,
+ url: str,
+ type: str = "charm",
+ ):
+ raise NotImplemented()
+
+ async def repo_list(self):
+ raise NotImplemented()
+
+ async def repo_remove(
+ self,
+ name: str,
+ ):
+ raise NotImplemented()
+
+ """Reset"""
+ async def reset(
+ self,
+ cluster_uuid: str,
+ ) -> bool:
+ """Reset a cluster
+
+ Resets the Kubernetes cluster by removing the model that represents it.
+
+ :param cluster_uuid str: The UUID of the cluster to reset
+ :return: Returns True if successful or raises an exception.
+ """
+
+ try:
+ if not self.authenticated:
+ await self.login()
+
+ if self.controller.is_connected():
+ # Destroy the model
+ namespace = self.get_namespace(cluster_uuid)
+ if await self.has_model(namespace):
+ print("[reset] Destroying model")
+ await self.controller.destroy_model(
+ namespace,
+ destroy_storage=True
+ )
+
+ # Disconnect from the controller
+ print("[reset] Disconnecting controller")
+ await self.controller.disconnect()
+
+ # Destroy the controller (via CLI)
+ print("[reset] Destroying controller")
+ await self.destroy_controller(cluster_uuid)
+
+ """Remove the k8s cloud
+
+ Only remove the k8s cloud if it's not a microk8s cloud,
+ since microk8s is a built-in cloud type.
+ """
+ # microk8s = self.is_microk8s_by_cluster_uuid(cluster_uuid)
+ # if not microk8s:
+ print("[reset] Removing k8s cloud")
+ namespace = self.get_namespace(cluster_uuid)
+ k8s_cloud = "{}-k8s".format(namespace)
+ await self.remove_cloud(k8s_cloud)
+
+ except Exception as ex:
+ print("Caught exception during reset: {}".format(ex))
+
+ """Deployment"""
+ async def install(
+ self,
+ cluster_uuid: str,
+ kdu_model: str,
+ atomic: bool = True,
+ timeout: int = None,
+ params: dict = None,
+ ) -> str:
+ """Install a bundle
+
+ :param cluster_uuid str: The UUID of the cluster to install to
+ :param kdu_model str: The name or path of a bundle to install
+ :param atomic bool: If set, waits until the model is active and resets
+ the cluster on failure.
+ :param timeout int: The time, in seconds, to wait for the install
+ to finish
+ :param params dict: Key-value pairs of instantiation parameters
+
+ :return: If successful, returns ?
+ """
+
+ if not self.authenticated:
+ print("[install] Logging in to the controller")
+ await self.login()
+
+ ##
+ # Get or create the model, based on the namespace the cluster was
+ # instantiated with.
+ namespace = self.get_namespace(cluster_uuid)
+ model = await self.get_model(namespace)
+ if not model:
+ # Create the new model
+ model = await self.add_model(namespace)
+
+ if model:
+ # TODO: Instantiation parameters
+
+ print("[install] deploying {}".format(kdu_model))
+ await model.deploy(kdu_model)
+
+ # Get the application
+ if atomic:
+ # applications = model.applications
+ print("[install] Applications: {}".format(model.applications))
+ for name in model.applications:
+ print("[install] Waiting for {} to settle".format(name))
+ application = model.applications[name]
+ try:
+ # It's not enough to wait for all units to be active;
+ # the application status needs to be active as well.
+ print("Waiting for all units to be active...")
+ await model.block_until(
+ lambda: all(
+ unit.agent_status == 'idle'
+ and application.status in ['active', 'unknown']
+ and unit.workload_status in [
+ 'active', 'unknown'
+ ] for unit in application.units
+ ),
+ timeout=timeout
+ )
+ print("All units active.")
+
+ except concurrent.futures._base.TimeoutError:
+ print("[install] Timeout exceeded; resetting cluster")
+ await self.reset(cluster_uuid)
+ return False
+
+ # Wait for the application to be active
+ if model.is_connected():
+ print("[install] Disconnecting model")
+ await model.disconnect()
+
+ return True
+ raise Exception("Unable to install")
+
+ async def instances_list(
+ self,
+ cluster_uuid: str
+ ) -> list:
+ """
+ returns a list of deployed releases in a cluster
+
+ :param cluster_uuid: the cluster
+ :return:
+ """
+ return []
+
+ async def upgrade(
+ self,
+ cluster_uuid: str,
+ kdu_instance: str,
+ kdu_model: str = None,
+ params: dict = None,
+ ) -> str:
+ """Upgrade a model
+
+ :param cluster_uuid str: The UUID of the cluster to upgrade
+ :param kdu_instance str: The unique name of the KDU instance
+ :param kdu_model str: The name or path of the bundle to upgrade to
+ :param params dict: Key-value pairs of instantiation parameters
+
+ :return: If successful, reference to the new revision number of the
+ KDU instance.
+ """
+
+ # TODO: Loop through the bundle and upgrade each charm individually
+
+ """
+ The API doesn't have a concept of bundle upgrades, because there are
+ many possible changes: charm revision, disk, number of units, etc.
+
+ As such, we are only supporting a limited subset of upgrades. We'll
+ upgrade the charm revision but leave storage and scale untouched.
+
+ Scale changes should happen through OSM constructs, and changes to
+ storage would require a redeployment of the service, at least in this
+ initial release.
+ """
+ namespace = self.get_namespace(cluster_uuid)
+ model = await self.get_model(namespace)
+
+ with open(kdu_model, 'r') as f:
+ bundle = yaml.load(f, Loader=yaml.FullLoader)
+
+ """
+ {
+ 'description': 'Test bundle',
+ 'bundle': 'kubernetes',
+ 'applications': {
+ 'mariadb-k8s': {
+ 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
+ 'scale': 1,
+ 'options': {
+ 'password': 'manopw',
+ 'root_password': 'osm4u',
+ 'user': 'mano'
+ },
+ 'series': 'kubernetes'
+ }
+ }
+ }
+ """
+ # TODO: This should be returned in an agreed-upon format
+ for name in bundle['applications']:
+ print(model.applications)
+ application = model.applications[name]
+ print(application)
+
+ path = bundle['applications'][name]['charm']
+
+ try:
+ await application.upgrade_charm(switch=path)
+ except juju.errors.JujuError as ex:
+ if 'already running charm' in str(ex):
+ # We're already running this version
+ pass
+
+ await model.disconnect()
+
+ return True
+ raise NotImplemented()
+
+ """Rollback"""
+ async def rollback(
+ self,
+ cluster_uuid: str,
+ kdu_instance: str,
+ revision: int = 0,
+ ) -> str:
+ """Rollback a model
+
+ :param cluster_uuid str: The UUID of the cluster to rollback
+ :param kdu_instance str: The unique name of the KDU instance
+ :param revision int: The revision to revert to. If omitted, rolls back
+ the previous upgrade.
+
+ :return: If successful, returns the revision of active KDU instance,
+ or raises an exception
+ """
+ raise NotImplemented()
+
+ """Deletion"""
+ async def uninstall(
+ self,
+ cluster_uuid: str,
+ kdu_instance: str,
+ ) -> bool:
+ """Uninstall a KDU instance
+
+ :param cluster_uuid str: The UUID of the cluster to uninstall
+ :param kdu_instance str: The unique name of the KDU instance
+
+ :return: Returns True if successful, or raises an exception
+ """
+ removed = False
+
+ # Remove an application from the model
+ model = await self.get_model(self.get_namespace(cluster_uuid))
+
+ if model:
+ # Get the application
+ if kdu_instance not in model.applications:
+ # TODO: Raise a named exception
+ raise Exception("Application not found.")
+
+ application = model.applications[kdu_instance]
+
+ # Destroy the application
+ await application.destroy()
+
+ # TODO: Verify removal
+
+ removed = True
+ return removed
+
+ """Introspection"""
+ async def inspect_kdu(
+ self,
+ kdu_model: str,
+ ) -> dict:
+ """Inspect a KDU
+
+ Inspects a bundle and returns a dictionary of config parameters and
+ their default values.
+
+ :param kdu_model str: The name or path of the bundle to inspect.
+
+ :return: If successful, returns a dictionary of available parameters
+ and their default values.
+ """
+
+ kdu = {}
+ with open(kdu_model, 'r') as f:
+ bundle = yaml.load(f, Loader=yaml.FullLoader)
+
+ """
+ {
+ 'description': 'Test bundle',
+ 'bundle': 'kubernetes',
+ 'applications': {
+ 'mariadb-k8s': {
+ 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
+ 'scale': 1,
+ 'options': {
+ 'password': 'manopw',
+ 'root_password': 'osm4u',
+ 'user': 'mano'
+ },
+ 'series': 'kubernetes'
+ }
+ }
+ }
+ """
+ # TODO: This should be returned in an agreed-upon format
+ kdu = bundle['applications']
+
+ return kdu
+
+ async def help_kdu(
+ self,
+ kdu_model: str,
+ ) -> str:
+ """View the README
+
+ If available, returns the README of the bundle.
+
+ :param kdu_model str: The name or path of a bundle
+
+ :return: If found, returns the contents of the README.
+ """
+ readme = None
+
+ files = ['README', 'README.txt', 'README.md']
+ path = os.path.dirname(kdu_model)
+ for file in os.listdir(path):
+ if file in files:
+ with open(file, 'r') as f:
+ readme = f.read()
+ break
+
+ return readme
+
+ async def status_kdu(
+ self,
+ cluster_uuid: str,
+ kdu_instance: str,
+ ) -> dict:
+ """Get the status of the KDU
+
+ Get the current status of the KDU instance.
+
+ :param cluster_uuid str: The UUID of the cluster
+ :param kdu_instance str: The unique id of the KDU instance
+
+ :return: Returns a dictionary containing namespace, state, resources,
+ and deployment_time.
+ """
+ status = {}
+
+ model = await self.get_model(self.get_namespace(cluster_uuid))
+
+ # model = await self.get_model_by_uuid(cluster_uuid)
+ if model:
+ model_status = await model.get_status()
+ status = model_status.applications
+
+ for name in model_status.applications:
+ application = model_status.applications[name]
+ status[name] = {
+ 'status': application['status']['status']
+ }
+
+ if model.is_connected():
+ await model.disconnect()
+
+ return status
+
+ # Private methods
+ async def add_k8s(
+ self,
+ cloud_name: str,
+ credentials: dict,
+ ) -> bool:
+ """Add a k8s cloud to Juju
+
+ Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
+ Juju Controller.
+
+ :param cloud_name str: The name of the cloud to add.
+ :param credentials dict: A dictionary representing the output of
+ `kubectl config view --raw`.
+
+ :returns: True if successful, otherwise raises an exception.
+ """
+ cmd = [self.k8scli, "add-k8s", "--local", cloud_name]
+
+ p = subprocess.run(
+ cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ input=yaml.dump(credentials, Dumper=yaml.Dumper),
+ encoding='ascii'
+ )
+ retcode = p.returncode
+
+ if retcode > 0:
+ raise Exception(p.stderr)
+ return True
+
+ async def add_model(
+ self,
+ model_name: str
+ ) -> juju.model.Model:
+ """Adds a model to the controller
+
+ Adds a new model to the Juju controller
+
+ :param model_name str: The name of the model to add.
+ :returns: The juju.model.Model object of the new model upon success or
+ raises an exception.
+ """
+ if not self.authenticated:
+ await self.login()
+
+ model = await self.controller.add_model(
+ model_name,
+ config={'authorized-keys': self.juju_public_key}
+ )
+ return model
+
+ async def bootstrap(
+ self,
+ cloud_name: str,
+ cluster_uuid: str
+ ) -> bool:
+ """Bootstrap a Kubernetes controller
+
+ Bootstrap a Juju controller inside the Kubernetes cluster
+
+ :param cloud_name str: The name of the cloud.
+ :param cluster_uuid str: The UUID of the cluster to bootstrap.
+ :returns: True upon success or raises an exception.
+ """
+ cmd = [self.k8scli, "bootstrap", cloud_name, cluster_uuid]
+ print("Bootstrapping controller {} in cloud {}".format(
+ cluster_uuid, cloud_name
+ ))
+
+ p = subprocess.run(
+ cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ encoding='ascii'
+ )
+ retcode = p.returncode
+
+ if retcode > 0:
+ #
+ if 'already exists' not in p.stderr:
+ raise Exception(p.stderr)
+
+ return True
+
+ async def destroy_controller(
+ self,
+ cluster_uuid: str
+ ) -> bool:
+ """Destroy a Kubernetes controller
+
+ Destroy an existing Kubernetes controller.
+
+ :param cluster_uuid str: The UUID of the cluster to bootstrap.
+ :returns: True upon success or raises an exception.
+ """
+ cmd = [
+ self.k8scli,
+ "destroy-controller",
+ "--destroy-all-models",
+ "--destroy-storage",
+ "-y",
+ cluster_uuid
+ ]
+
+ p = subprocess.run(
+ cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ encoding='ascii'
+ )
+ retcode = p.returncode
+
+ if retcode > 0:
+ #
+ if 'already exists' not in p.stderr:
+ raise Exception(p.stderr)
+
+ def get_config(
+ self,
+ cluster_uuid: str,
+ ) -> dict:
+ """Get the cluster configuration
+
+ Gets the configuration of the cluster
+
+ :param cluster_uuid str: The UUID of the cluster.
+ :return: A dict upon success, or raises an exception.
+ """
+ cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)
+ if os.path.exists(cluster_config):
+ with open(cluster_config, 'r') as f:
+ config = yaml.load(f.read(), Loader=yaml.FullLoader)
+ return config
+ else:
+ raise Exception(
+ "Unable to locate configuration for cluster {}".format(
+ cluster_uuid
+ )
+ )
+
+ async def get_model(
+ self,
+ model_name: str,
+ ) -> juju.model.Model:
+ """Get a model from the Juju Controller.
+
+ Note: Model objects returned must call disconnected() before it goes
+ out of scope.
+
+ :param model_name str: The name of the model to get
+ :return The juju.model.Model object if found, or None.
+ """
+ if not self.authenticated:
+ await self.login()
+
+ model = None
+ models = await self.controller.list_models()
+
+ if model_name in models:
+ model = await self.controller.get_model(
+ model_name
+ )
+ return model
+
+ def get_namespace(
+ self,
+ cluster_uuid: str,
+ ) -> str:
+ """Get the namespace UUID
+ Gets the namespace's unique name
+
+ :param cluster_uuid str: The UUID of the cluster
+ :returns: The namespace UUID, or raises an exception
+ """
+ config = self.get_config(cluster_uuid)
+
+ # Make sure the name is in the config
+ if 'namespace' not in config:
+ raise Exception("Namespace not found.")
+
+ # TODO: We want to make sure this is unique to the cluster, in case
+ # the cluster is being reused.
+ # Consider pre/appending the cluster id to the namespace string
+ return config['namespace']
+
+ async def has_model(
+ self,
+ model_name: str
+ ) -> bool:
+ """Check if a model exists in the controller
+
+ Checks to see if a model exists in the connected Juju controller.
+
+ :param model_name str: The name of the model
+ :return: A boolean indicating if the model exists
+ """
+ models = await self.controller.list_models()
+
+ if model_name in models:
+ return True
+ return False
+
+ def is_microk8s_by_cluster_uuid(
+ self,
+ cluster_uuid: str,
+ ) -> bool:
+ """Check if a cluster is micro8s
+
+ Checks if a cluster is running microk8s
+
+ :param cluster_uuid str: The UUID of the cluster
+ :returns: A boolean if the cluster is running microk8s
+ """
+ config = self.get_config(cluster_uuid)
+ return config['microk8s']
+
+ def is_microk8s_by_credentials(
+ self,
+ credentials: dict,
+ ) -> bool:
+ """Check if a cluster is micro8s
+
+ Checks if a cluster is running microk8s
+
+ :param credentials dict: A dictionary containing the k8s credentials
+ :returns: A boolean if the cluster is running microk8s
+ """
+ for context in credentials['contexts']:
+ if 'microk8s' in context['name']:
+ return True
+
+ return False
+
+ async def login(self):
+ """Login to the Juju controller."""
+
+ if self.authenticated:
+ return
+
+ self.connecting = True
+
+ self.controller = Controller()
+
+ if self.juju_secret:
+ self.log.debug(
+ "Connecting to controller... ws://{} as {}/{}".format(
+ self.juju_endpoint,
+ self.juju_user,
+ self.juju_secret,
+ )
+ )
+ try:
+ await self.controller.connect(
+ endpoint=self.juju_endpoint,
+ username=self.juju_user,
+ password=self.juju_secret,
+ cacert=self.juju_ca_cert,
+ )
+ self.authenticated = True
+ self.log.debug("JujuApi: Logged into controller")
+ except Exception as ex:
+ print(ex)
+ self.log.debug("Caught exception: {}".format(ex))
+ pass
+ else:
+ self.log.fatal("VCA credentials not configured.")
+ self.authenticated = False
+
+ async def logout(self):
+ """Logout of the Juju controller."""
+ print("[logout]")
+ if not self.authenticated:
+ return False
+
+ for model in self.models:
+ print("Logging out of model {}".format(model))
+ await self.models[model].disconnect()
+
+ if self.controller:
+ self.log.debug("Disconnecting controller {}".format(
+ self.controller
+ ))
+ await self.controller.disconnect()
+ self.controller = None
+
+ self.authenticated = False
+
+ async def remove_cloud(
+ self,
+ cloud_name: str,
+ ) -> bool:
+ """Remove a k8s cloud from Juju
+
+ Removes a Kubernetes cloud from Juju.
+
+ :param cloud_name str: The name of the cloud to add.
+
+ :returns: True if successful, otherwise raises an exception.
+ """
+
+ # Remove the bootstrapped controller
+ cmd = [self.k8scli, "remove-k8s", "--client", cloud_name]
+ p = subprocess.run(
+ cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ encoding='ascii'
+ )
+ retcode = p.returncode
+
+ if retcode > 0:
+ raise Exception(p.stderr)
+
+ # Remove the cloud from the local config
+ cmd = [self.k8scli, "remove-cloud", "--client", cloud_name]
+ p = subprocess.run(
+ cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ encoding='ascii'
+ )
+ retcode = p.returncode
+
+ if retcode > 0:
+ raise Exception(p.stderr)
+
+
+ return True
+
+ async def set_config(
+ self,
+ cluster_uuid: str,
+ config: dict,
+ ) -> bool:
+ """Save the cluster configuration
+
+ Saves the cluster information to the file store
+
+ :param cluster_uuid str: The UUID of the cluster
+ :param config dict: A dictionary containing the cluster configuration
+ :returns: Boolean upon success or raises an exception.
+ """
+
+ cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)
+ if not os.path.exists(cluster_config):
+ print("Writing config to {}".format(cluster_config))
+ with open(cluster_config, 'w') as f:
+ f.write(yaml.dump(config, Dumper=yaml.Dumper))
+
+ return True
--- /dev/null
+##
+# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
+# This file is part of OSM
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: nfvlabs@tid.es
+##
+
+
+import logging
+import asyncio
+import time
+import inspect
+import datetime
+import threading # only for logging purposes (not for using threads)
+
+
+class Loggable:
+
+ def __init__(
+ self,
+ log,
+ log_to_console: bool = False,
+ prefix: str = ''
+ ):
+
+ self._last_log_time = None # used for time increment in logging
+ self._log_to_console = log_to_console
+ self._prefix = prefix
+ if log is not None:
+ self.log = log
+ else:
+ self.log = logging.getLogger(__name__)
+
+ def debug(self, msg: str):
+ self._log_msg(log_level='DEBUG', msg=msg)
+
+ def info(self, msg: str):
+ self._log_msg(log_level='INFO', msg=msg)
+
+ def warning(self, msg: str):
+ self._log_msg(log_level='WARNING', msg=msg)
+
+ def error(self, msg: str):
+ self._log_msg(log_level='ERROR', msg=msg)
+
+ def critical(self, msg: str):
+ self._log_msg(log_level='CRITICAL', msg=msg)
+
+ ##################################################################################################
+
+ def _log_msg(self, log_level: str, msg: str):
+ """Generic log method"""
+ msg = self._format_log(
+ log_level=log_level,
+ msg=msg,
+ obj=self,
+ level=3,
+ include_path=False,
+ include_thread=False,
+ include_coroutine=True
+ )
+ if self._log_to_console:
+ print(msg)
+ else:
+ if self.log is not None:
+ if log_level == 'DEBUG':
+ self.log.debug(msg)
+ elif log_level == 'INFO':
+ self.log.info(msg)
+ elif log_level == 'WARNING':
+ self.log.warning(msg)
+ elif log_level == 'ERROR':
+ self.log.error(msg)
+ elif log_level == 'CRITICAL':
+ self.log.critical(msg)
+
+ def _format_log(
+ self,
+ log_level: str,
+ msg: str = '',
+ obj: object = None,
+ level: int = None,
+ include_path: bool = False,
+ include_thread: bool = False,
+ include_coroutine: bool = True
+ ) -> str:
+
+ # time increment from last log
+ now = time.perf_counter()
+ if self._last_log_time is None:
+ time_str = ' (+0.000)'
+ else:
+ diff = round(now - self._last_log_time, 3)
+ time_str = ' (+{})'.format(diff)
+ self._last_log_time = now
+
+ if level is None:
+ level = 1
+
+ # stack info
+ fi = inspect.stack()[level]
+ filename = fi.filename
+ func = fi.function
+ lineno = fi.lineno
+ # filename without path
+ if not include_path:
+ i = filename.rfind('/')
+ if i > 0:
+ filename = filename[i+1:]
+
+ # datetime
+ dt = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
+ dt = dt + time_str
+ dt = time_str # logger already shows datetime
+
+ # current thread
+ if include_thread:
+ thread_name = 'th:{}'.format(threading.current_thread().getName())
+ else:
+ thread_name = ''
+
+ # current coroutine
+
+ coroutine_id = ''
+ if include_coroutine:
+ try:
+ if asyncio.Task.current_task() is not None:
+ def print_cor_name(c):
+ import inspect
+ try:
+ for m in inspect.getmembers(c):
+ if m[0] == '__name__':
+ return m[1]
+ except Exception:
+ pass
+ coro = asyncio.Task.current_task()._coro
+ coroutine_id = 'coro-{} {}()'.format(hex(id(coro))[2:], print_cor_name(coro))
+ except Exception:
+ coroutine_id = ''
+
+ # classname
+ if obj is not None:
+ obj_type = obj.__class__.__name__ # type: str
+ log_msg = \
+ '{} {} {} {} {}::{}.{}():{}\n{}'\
+ .format(self._prefix, dt, thread_name, coroutine_id, filename, obj_type, func, lineno, str(msg))
+ else:
+ log_msg = \
+ '{} {} {} {} {}::{}():{}\n{}'\
+ .format(self._prefix, dt, thread_name, coroutine_id, filename, func, lineno, str(msg))
+
+ return log_msg
--- /dev/null
+# Copyright 2019 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+description: Test bundle
+bundle: kubernetes
+applications:
+ zookeeper-k8s:
+ charm: 'cs:~charmed-osm/zookeeper-k8s-29'
+ scale: 1
+ series: kubernetes
--- /dev/null
+# Copyright 2019 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+description: Test bundle
+bundle: kubernetes
+applications:
+ zookeeper-k8s:
+ charm: 'cs:~charmed-osm/zookeeper-k8s-31'
+ scale: 1
+ series: kubernetes
--- /dev/null
+# Copyright 2019 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+description: Test bundle
+bundle: kubernetes
+applications:
+ zookeeper-k8s:
+ charm: 'cs:~charmed-osm/zookeeper-k8s-30'
+ scale: 1
+ series: kubernetes
--- /dev/null
+# Copyright 2019 Canonical Ltd.
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import asyncio
+import logging
+import n2vc.k8s_juju_conn
+from base import get_juju_public_key
+import os
+from osm_common.fslocal import FsLocal
+import subprocess
+import yaml
+
+
+def get_args():
+ parser = argparse.ArgumentParser()
+ parser.add_argument("--cluster_uuid", help='The UUID of an existing cluster to use', default=None)
+ parser.add_argument("--reset", action="store_true")
+ return parser.parse_args()
+
+async def main():
+
+ args = get_args()
+
+ reuse_cluster_uuid = args.cluster_uuid
+
+ log = logging.getLogger()
+ log.level = logging.DEBUG
+
+ # Extract parameters from the environment in order to run our tests
+ vca_host = os.getenv('VCA_HOST', '127.0.0.1')
+ vca_port = os.getenv('VCA_PORT', 17070)
+ vca_user = os.getenv('VCA_USER', 'admin')
+ vca_charms = os.getenv('VCA_CHARMS', None)
+ vca_secret = os.getenv('VCA_SECRET', None)
+ vca_ca_cert = os.getenv('VCA_CACERT', None)
+
+ # Get the Juju Public key
+ juju_public_key = get_juju_public_key()
+ if juju_public_key:
+ with open(juju_public_key, 'r') as f:
+ juju_public_key = f.read()
+ else:
+ raise Exception("No Juju Public Key found")
+
+ storage = {
+ 'driver': 'local',
+ 'path': '/srv/app/storage'
+ }
+ fs = FsLocal()
+ fs.fs_connect(storage)
+
+ client = n2vc.k8s_juju_conn.K8sJujuConnector(
+ kubectl_command = '/bin/true',
+ fs = fs,
+ )
+
+ # kubectl config view --raw
+ # microk8s.config
+
+ # if microk8s then
+ kubecfg = subprocess.getoutput('microk8s.config')
+ # else
+ # kubecfg.subprocess.getoutput('kubectl config view --raw')
+
+ k8screds = yaml.load(kubecfg, Loader=yaml.FullLoader)
+ namespace = 'testing'
+ kdu_model = "./tests/bundles/k8s-zookeeper.yaml"
+
+ """init_env"""
+ cluster_uuid = await client.init_env(k8screds, namespace, reuse_cluster_uuid=reuse_cluster_uuid)
+ print(cluster_uuid)
+
+ if not reuse_cluster_uuid:
+ # This is a new cluster, so install to it
+
+ """install"""
+ # async def install(self, cluster_uuid, kdu_model, atomic=True, timeout=None, params=None):
+ # TODO: Re-add storage declaration to bundle. The API doesn't support the storage option yet. David is investigating.
+
+ # Deploy the bundle
+ kdu_instance = await client.install(cluster_uuid, kdu_model, atomic=True, timeout=600)
+
+ if kdu_instance:
+ # Inspect
+ print("Getting status")
+ status = await client.status_kdu(cluster_uuid, kdu_instance)
+ print(status)
+
+ # Inspect the bundle
+ config = await client.inspect_kdu(kdu_model)
+ print(config)
+
+ readme = await client.help_kdu(kdu_model)
+ # print(readme)
+
+
+ """upgrade
+ Upgrade to a newer version of the bundle
+ """
+ kdu_model_upgrade = "./tests/bundles/k8s-zookeeper-upgrade.yaml"
+ upgraded = await client.upgrade(cluster_uuid, namespace, kdu_model=kdu_model_upgrade)
+
+ kdu_model_upgrade = "./tests/bundles/k8s-zookeeper-downgrade.yaml"
+ upgraded = await client.upgrade(cluster_uuid, namespace, kdu_model=kdu_model_upgrade)
+
+ """uninstall"""
+
+ """reset"""
+ if args.reset:
+ await client.reset(cluster_uuid)
+
+ await client.logout()
+
+ print("Done")
+
+if __name__ == "__main__":
+ asyncio.run(main())