From: garciadeblas Date: Fri, 22 Nov 2019 11:07:49 +0000 (+0100) Subject: Merge branch 'feature5837' X-Git-Tag: v7.0.0rc1~17 X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=commitdiff_plain;h=8ca38ed92090ce6cbd1f8e1cdb802b259d31e9d5;hp=d030ff576501a5d13dd1406b8fa3fc0ead769197 Merge branch 'feature5837' Signed-off-by: garciadeblas Change-Id: I31abc0d7de97c1dd3fb45fd7798fbab5d1bad5b4 --- diff --git a/n2vc/k8s_conn.py b/n2vc/k8s_conn.py new file mode 100644 index 0000000..d951c2c --- /dev/null +++ b/n2vc/k8s_conn.py @@ -0,0 +1,350 @@ +## +# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U. +# This file is part of OSM +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# For those usages not covered by the Apache License, Version 2.0 please +# contact with: nfvlabs@tid.es +## + +import asyncio +from n2vc.loggable import Loggable +import abc +import time + + +class K8sConnector(abc.ABC, Loggable): + + """ + ################################################################################################## + ########################################## P U B L I C ########################################### + ################################################################################################## + """ + + def __init__( + self, + db: object, + log: object = None, + on_update_db=None + ): + """ + + :param db: database object to write current operation status + :param log: logger for tracing + :param on_update_db: callback called when k8s connector updates database + """ + + # parent class + Loggable.__init__(self, log=log, log_to_console=True, prefix='\nK8S') + + self.info('Initializing generic K8S connector') + + # the database and update callback + self.db = db + self.on_update_db = on_update_db + + self.info('K8S generic connector initialized') + + @abc.abstractmethod + async def init_env( + self, + k8s_creds: str, + namespace: str = 'kube-system', + reuse_cluster_uuid = None + ) -> (str, bool): + """ + It prepares a given K8s cluster environment to run Charts or juju Bundles on both sides: + client (OSM) + server (Tiller/Charm) + + :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config' + :param namespace: optional namespace for helm tiller. By default, 'kube-system' will be used + :param reuse_cluster_uuid: existing cluster uuid for reuse + :return: uuid of the K8s cluster and True if connector has installed some software in the cluster + (on error, an exception will be raised) + """ + + @abc.abstractmethod + async def repo_add( + self, + cluster_uuid: str, + name: str, + url: str, + repo_type: str = 'chart' + ): + """ + Add a new repository to OSM database + + :param cluster_uuid: the cluster + :param name: name for the repo in OSM + :param url: URL of the repo + :param repo_type: either "chart" or "bundle" + :return: True if successful + """ + + @abc.abstractmethod + async def repo_list( + self, + cluster_uuid: str + ): + """ + Get the list of registered repositories + + :param cluster_uuid: the cluster + :return: list of registered repositories: [ (name, url) .... ] + """ + + @abc.abstractmethod + async def repo_remove( + self, + cluster_uuid: str, + name: str + ): + """ + Remove a repository from OSM + + :param name: repo name in OSM + :param cluster_uuid: the cluster + :return: True if successful + """ + + @abc.abstractmethod + async def reset( + self, + cluster_uuid: str, + force: bool = False, + uninstall_sw: bool = False + ) -> bool: + """ + Uninstalls Tiller/Charm from a known K8s cluster and removes it from the list of known K8s clusters. + Intended to be used e.g. when the NS instance is deleted. + + :param cluster_uuid: UUID of a K8s cluster known by OSM. + :param force: force deletion, even in case there are deployed releases + :param uninstall_sw: flag to indicate that sw uninstallation from software is needed + :return: str: kdu_instance generated by helm + """ + + @abc.abstractmethod + async def install( + self, + cluster_uuid: str, + kdu_model: str, + atomic: bool = True, + timeout: float = 300, + params: dict = None, + db_dict: dict = None + ): + """ + Deploys of a new KDU instance. It would implicitly rely on the `install` call to deploy the Chart/Bundle + properly parametrized (in practice, this call would happen before any _initial-config-primitive_ + of the VNF is called). + + :param cluster_uuid: UUID of a K8s cluster known by OSM + :param kdu_model: chart/bundle:version reference (string), which can be either of these options: + - a name of chart/bundle available via the repos known by OSM + - a path to a packaged chart/bundle + - a path to an unpacked chart/bundle directory or a URL + :param atomic: If set, installation process purges chart/bundle on fail, also will wait until + all the K8s objects are active + :param timeout: Time in seconds to wait for the install of the chart/bundle (defaults to + Helm default timeout: 300s) + :param params: dictionary of key-value pairs for instantiation parameters (overriding default values) + :param dict db_dict: where to write into database when the status changes. + It contains a dict with {collection: , filter: {}, path: }, + e.g. {collection: "nsrs", filter: {_id: , path: "_admin.deployed.K8S.3"} + :return: True if successful + """ + + @abc.abstractmethod + async def upgrade( + self, + cluster_uuid: str, + kdu_instance: str, + kdu_model: str = None, + atomic: bool = True, + timeout: float = 300, + params: dict = None, + db_dict: dict = None + ): + """ + Upgrades an existing KDU instance. It would implicitly use the `upgrade` call over an existing Chart/Bundle. + It can be used both to upgrade the chart or to reconfigure it. This would be exposed as Day-2 primitive. + + :param cluster_uuid: UUID of a K8s cluster known by OSM + :param kdu_instance: unique name for the KDU instance to be updated + :param kdu_model: new chart/bundle:version reference + :param atomic: rollback in case of fail and wait for pods and services are available + :param timeout: Time in seconds to wait for the install of the chart/bundle (defaults to + Helm default timeout: 300s) + :param params: new dictionary of key-value pairs for instantiation parameters + :param dict db_dict: where to write into database when the status changes. + It contains a dict with {collection: , filter: {}, path: }, + e.g. {collection: "nsrs", filter: {_id: , path: "_admin.deployed.K8S.3"} + :return: reference to the new revision number of the KDU instance + """ + + @abc.abstractmethod + async def rollback( + self, + cluster_uuid: str, + kdu_instance: str, + revision=0, + db_dict: dict = None + ): + """ + Rolls back a previous update of a KDU instance. It would implicitly use the `rollback` call. + It can be used both to rollback from a Chart/Bundle version update or from a reconfiguration. + This would be exposed as Day-2 primitive. + + :param cluster_uuid: UUID of a K8s cluster known by OSM + :param kdu_instance: unique name for the KDU instance + :param revision: revision to which revert changes. If omitted, it will revert the last update only + :param dict db_dict: where to write into database when the status changes. + It contains a dict with {collection: , filter: {}, path: }, + e.g. {collection: "nsrs", filter: {_id: , path: "_admin.deployed.K8S.3"} + :return:If successful, reference to the current active revision of the KDU instance after the rollback + """ + + @abc.abstractmethod + async def uninstall( + self, + cluster_uuid: str, + kdu_instance: str + ): + """ + Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen + after all _terminate-config-primitive_ of the VNF are invoked). + + :param cluster_uuid: UUID of a K8s cluster known by OSM + :param kdu_instance: unique name for the KDU instance to be deleted + :return: True if successful + """ + + @abc.abstractmethod + async def inspect_kdu( + self, + kdu_model: str + ) -> str: + """ + These calls will retrieve from the Charm/Bundle: + + - The list of configurable values and their defaults (e.g. in Charts, it would retrieve + the contents of `values.yaml`). + - If available, any embedded help file (e.g. `readme.md`) embedded in the Chart/Bundle. + + :param cluster_uuid: the cluster to get the information + :param kdu_model: chart/bundle reference + :return: If successful, it will return a dictionary containing the list of available parameters + and their default values + """ + + @abc.abstractmethod + async def help_kdu( + self, + kdu_model: str + ) -> str: + """ + + :param cluster_uuid: the cluster to get the information + :param kdu_model: chart/bundle reference + :return: If successful, it will return the contents of the 'readme.md' + """ + + @abc.abstractmethod + async def status_kdu( + self, + cluster_uuid: str, + kdu_instance: str + ) -> str: + """ + This call would retrieve tha current state of a given KDU instance. It would be would allow to retrieve + the _composition_ (i.e. K8s objects) and _specific values_ of the configuration parameters applied + to a given instance. This call would be based on the `status` call. + + :param cluster_uuid: UUID of a K8s cluster known by OSM + :param kdu_instance: unique name for the KDU instance + :return: If successful, it will return the following vector of arguments: + - K8s `namespace` in the cluster where the KDU lives + - `state` of the KDU instance. It can be: + - UNKNOWN + - DEPLOYED + - DELETED + - SUPERSEDED + - FAILED or + - DELETING + - List of `resources` (objects) that this release consists of, sorted by kind, and the status of those resources + - Last `deployment_time`. + + """ + + """ + ################################################################################################## + ########################################## P R I V A T E ######################################### + ################################################################################################## + """ + + async def write_app_status_to_db( + self, + db_dict: dict, + status: str, + detailed_status: str, + operation: str + ) -> bool: + + if not self.db: + self.warning('No db => No database write') + return False + + if not db_dict: + self.warning('No db_dict => No database write') + return False + + self.debug('status={}'.format(status)) + + try: + + the_table = db_dict['collection'] + the_filter = db_dict['filter'] + the_path = db_dict['path'] + if not the_path[-1] == '.': + the_path = the_path + '.' + update_dict = { + the_path + 'operation': operation, + the_path + 'status': status, + the_path + 'detailed-status': detailed_status, + the_path + 'status-time': str(time.time()), + } + + self.db.set_one( + table=the_table, + q_filter=the_filter, + update_dict=update_dict, + fail_on_empty=True + ) + + # database callback + if self.on_update_db: + if asyncio.iscoroutinefunction(self.on_update_db): + await self.on_update_db(the_table, the_filter, the_path, update_dict) + else: + self.on_update_db(the_table, the_filter, the_path, update_dict) + + return True + + except Exception as e: + self.info('Exception writing status to database: {}'.format(e)) + return False diff --git a/n2vc/k8s_helm_conn.py b/n2vc/k8s_helm_conn.py new file mode 100644 index 0000000..cd15d73 --- /dev/null +++ b/n2vc/k8s_helm_conn.py @@ -0,0 +1,1105 @@ +## +# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U. +# This file is part of OSM +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# For those usages not covered by the Apache License, Version 2.0 please +# contact with: nfvlabs@tid.es +## + +import paramiko +import subprocess +import os +import shutil +import asyncio +import time +import yaml +from uuid import uuid4 +import random +from n2vc.k8s_conn import K8sConnector + + +class K8sHelmConnector(K8sConnector): + + """ + ################################################################################################## + ########################################## P U B L I C ########################################### + ################################################################################################## + """ + + def __init__( + self, + fs: object, + db: object, + kubectl_command: str = '/usr/bin/kubectl', + helm_command: str = '/usr/bin/helm', + log: object = None, + on_update_db=None + ): + """ + + :param fs: file system for kubernetes and helm configuration + :param db: database object to write current operation status + :param kubectl_command: path to kubectl executable + :param helm_command: path to helm executable + :param log: logger + :param on_update_db: callback called when k8s connector updates database + """ + + # parent class + K8sConnector.__init__( + self, + db=db, + log=log, + on_update_db=on_update_db + ) + + self.info('Initializing K8S Helm connector') + + # random numbers for release name generation + random.seed(time.time()) + + # the file system + self.fs = fs + + # exception if kubectl is not installed + self.kubectl_command = kubectl_command + self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True) + + # exception if helm is not installed + self._helm_command = helm_command + self._check_file_exists(filename=helm_command, exception_if_not_exists=True) + + self.info('K8S Helm connector initialized') + + async def init_env( + self, + k8s_creds: str, + namespace: str = 'kube-system', + reuse_cluster_uuid=None + ) -> (str, bool): + + cluster_uuid = reuse_cluster_uuid + if not cluster_uuid: + cluster_uuid = str(uuid4()) + + self.debug('Initializing K8S environment. namespace: {}'.format(namespace)) + + # create config filename + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + f = open(config_filename, "w") + f.write(k8s_creds) + f.close() + + # check if tiller pod is up in cluster + command = '{} --kubeconfig={} --namespace={} get deployments'\ + .format(self.kubectl_command, config_filename, namespace) + output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True) + + output_table = K8sHelmConnector._output_to_table(output=output) + + # find 'tiller' pod in all pods + already_initialized = False + try: + for row in output_table: + if row[0].startswith('tiller-deploy'): + already_initialized = True + break + except Exception as e: + pass + + # helm init + n2vc_installed_sw = False + if not already_initialized: + self.info('Initializing helm in client and server: {}'.format(cluster_uuid)) + command = '{} --kubeconfig={} --tiller-namespace={} --home={} init'\ + .format(self._helm_command, config_filename, namespace, helm_dir) + output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True) + n2vc_installed_sw = True + else: + # check client helm installation + check_file = helm_dir + '/repository/repositories.yaml' + if not self._check_file_exists(filename=check_file, exception_if_not_exists=False): + self.info('Initializing helm in client: {}'.format(cluster_uuid)) + command = '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\ + .format(self._helm_command, config_filename, namespace, helm_dir) + output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True) + else: + self.info('Helm client already initialized') + + self.info('Cluster initialized {}'.format(cluster_uuid)) + + return cluster_uuid, n2vc_installed_sw + + async def repo_add( + self, + cluster_uuid: str, + name: str, + url: str, + repo_type: str = 'chart' + ): + + self.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url)) + + # config filename + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + + # helm repo update + command = '{} --kubeconfig={} --home={} repo update'.format(self._helm_command, config_filename, helm_dir) + self.debug('updating repo: {}'.format(command)) + await self._local_async_exec(command=command, raise_exception_on_error=False) + + # helm repo add name url + command = '{} --kubeconfig={} --home={} repo add {} {}'\ + .format(self._helm_command, config_filename, helm_dir, name, url) + self.debug('adding repo: {}'.format(command)) + await self._local_async_exec(command=command, raise_exception_on_error=True) + + async def repo_list( + self, + cluster_uuid: str + ) -> list: + """ + Get the list of registered repositories + + :return: list of registered repositories: [ (name, url) .... ] + """ + + self.debug('list repositories for cluster {}'.format(cluster_uuid)) + + # config filename + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + + command = '{} --kubeconfig={} --home={} repo list --output yaml'.format(self._helm_command, config_filename, helm_dir) + + output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True) + if output and len(output) > 0: + return yaml.load(output, Loader=yaml.SafeLoader) + else: + return [] + + async def repo_remove( + self, + cluster_uuid: str, + name: str + ): + """ + Remove a repository from OSM + + :param cluster_uuid: the cluster + :param name: repo name in OSM + :return: True if successful + """ + + self.debug('list repositories for cluster {}'.format(cluster_uuid)) + + # config filename + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + + command = '{} --kubeconfig={} --home={} repo remove {}'\ + .format(self._helm_command, config_filename, helm_dir, name) + + await self._local_async_exec(command=command, raise_exception_on_error=True) + + async def reset( + self, + cluster_uuid: str, + force: bool = False, + uninstall_sw: bool = False + ) -> bool: + + self.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid)) + + # get kube and helm directories + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=False) + + # uninstall releases if needed + releases = await self.instances_list(cluster_uuid=cluster_uuid) + if len(releases) > 0: + if force: + for r in releases: + try: + kdu_instance = r.get('Name') + chart = r.get('Chart') + self.debug('Uninstalling {} -> {}'.format(chart, kdu_instance)) + await self.uninstall(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance) + except Exception as e: + self.error('Error uninstalling release {}: {}'.format(kdu_instance, e)) + else: + msg = 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\ + .format(cluster_uuid) + self.error(msg) + raise Exception(msg) + + if uninstall_sw: + + self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid)) + + # find namespace for tiller pod + command = '{} --kubeconfig={} get deployments --all-namespaces'\ + .format(self.kubectl_command, config_filename) + output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False) + output_table = K8sHelmConnector._output_to_table(output=output) + namespace = None + for r in output_table: + try: + if 'tiller-deploy' in r[1]: + namespace = r[0] + break + except Exception as e: + pass + else: + msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid) + self.error(msg) + # raise Exception(msg) + + self.debug('namespace for tiller: {}'.format(namespace)) + + force_str = '--force' + + if namespace: + # delete tiller deployment + self.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid, namespace)) + command = '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\ + .format(self.kubectl_command, namespace, config_filename, force_str) + await self._local_async_exec(command=command, raise_exception_on_error=False) + + # uninstall tiller from cluster + self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid)) + command = '{} --kubeconfig={} --home={} reset'\ + .format(self._helm_command, config_filename, helm_dir) + self.debug('resetting: {}'.format(command)) + output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True) + else: + self.debug('namespace not found') + + # delete cluster directory + dir = self.fs.path + '/' + cluster_uuid + self.debug('Removing directory {}'.format(dir)) + shutil.rmtree(dir, ignore_errors=True) + + return True + + async def install( + self, + cluster_uuid: str, + kdu_model: str, + atomic: bool = True, + timeout: float = 300, + params: dict = None, + db_dict: dict = None + ): + + self.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid)) + + start = time.time() + end = start + timeout + + # config filename + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + + # params to str + # params_str = K8sHelmConnector._params_to_set_option(params) + params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params) + + timeout_str = '' + if timeout: + timeout_str = '--timeout {}'.format(timeout) + + # atomic + atomic_str = '' + if atomic: + atomic_str = '--atomic' + + # version + version_str = '' + if ':' in kdu_model: + parts = kdu_model.split(sep=':') + if len(parts) == 2: + version_str = '--version {}'.format(parts[1]) + kdu_model = parts[0] + + # generate a name for the releas. Then, check if already exists + kdu_instance = None + while kdu_instance is None: + kdu_instance = K8sHelmConnector._generate_release_name(kdu_model) + try: + result = await self._status_kdu( + cluster_uuid=cluster_uuid, + kdu_instance=kdu_instance, + show_error_log=False + ) + if result is not None: + # instance already exists: generate a new one + kdu_instance = None + except: + kdu_instance = None + + # helm repo install + command = '{} install {} --output yaml --kubeconfig={} --home={} {} {} --name={} {} {}'\ + .format(self._helm_command, atomic_str, config_filename, helm_dir, + params_str, timeout_str, kdu_instance, kdu_model, version_str) + self.debug('installing: {}'.format(command)) + + if atomic: + # exec helm in a task + exec_task = asyncio.ensure_future( + coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False) + ) + # write status in another task + status_task = asyncio.ensure_future( + coro_or_future=self._store_status( + cluster_uuid=cluster_uuid, + kdu_instance=kdu_instance, + db_dict=db_dict, + operation='install', + run_once=False + ) + ) + + # wait for execution task + await asyncio.wait([exec_task]) + + # cancel status task + status_task.cancel() + + output, rc = exec_task.result() + + else: + + output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False) + + # remove temporal values yaml file + if file_to_delete: + os.remove(file_to_delete) + + # write final status + await self._store_status( + cluster_uuid=cluster_uuid, + kdu_instance=kdu_instance, + db_dict=db_dict, + operation='install', + run_once=True, + check_every=0 + ) + + if rc != 0: + msg = 'Error executing command: {}\nOutput: {}'.format(command, output) + self.error(msg) + raise Exception(msg) + + self.debug('Returning kdu_instance {}'.format(kdu_instance)) + return kdu_instance + + async def instances_list( + self, + cluster_uuid: str + ) -> list: + """ + returns a list of deployed releases in a cluster + + :param cluster_uuid: the cluster + :return: + """ + + self.debug('list releases for cluster {}'.format(cluster_uuid)) + + # config filename + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + + command = '{} --kubeconfig={} --home={} list --output yaml'\ + .format(self._helm_command, config_filename, helm_dir) + + output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True) + + if output and len(output) > 0: + return yaml.load(output, Loader=yaml.SafeLoader).get('Releases') + else: + return [] + + async def upgrade( + self, + cluster_uuid: str, + kdu_instance: str, + kdu_model: str = None, + atomic: bool = True, + timeout: float = 300, + params: dict = None, + db_dict: dict = None + ): + + self.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid)) + + start = time.time() + end = start + timeout + + # config filename + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + + # params to str + # params_str = K8sHelmConnector._params_to_set_option(params) + params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params) + + timeout_str = '' + if timeout: + timeout_str = '--timeout {}'.format(timeout) + + # atomic + atomic_str = '' + if atomic: + atomic_str = '--atomic' + + # version + version_str = '' + if kdu_model and ':' in kdu_model: + parts = kdu_model.split(sep=':') + if len(parts) == 2: + version_str = '--version {}'.format(parts[1]) + kdu_model = parts[0] + + # helm repo upgrade + command = '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\ + .format(self._helm_command, atomic_str, config_filename, helm_dir, + params_str, timeout_str, kdu_instance, kdu_model, version_str) + self.debug('upgrading: {}'.format(command)) + + if atomic: + + # exec helm in a task + exec_task = asyncio.ensure_future( + coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False) + ) + # write status in another task + status_task = asyncio.ensure_future( + coro_or_future=self._store_status( + cluster_uuid=cluster_uuid, + kdu_instance=kdu_instance, + db_dict=db_dict, + operation='upgrade', + run_once=False + ) + ) + + # wait for execution task + await asyncio.wait([ exec_task ]) + + # cancel status task + status_task.cancel() + output, rc = exec_task.result() + + else: + + output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False) + + # remove temporal values yaml file + if file_to_delete: + os.remove(file_to_delete) + + # write final status + await self._store_status( + cluster_uuid=cluster_uuid, + kdu_instance=kdu_instance, + db_dict=db_dict, + operation='upgrade', + run_once=True, + check_every=0 + ) + + if rc != 0: + msg = 'Error executing command: {}\nOutput: {}'.format(command, output) + self.error(msg) + raise Exception(msg) + + # return new revision number + instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance) + if instance: + revision = int(instance.get('Revision')) + self.debug('New revision: {}'.format(revision)) + return revision + else: + return 0 + + async def rollback( + self, + cluster_uuid: str, + kdu_instance: str, + revision=0, + db_dict: dict = None + ): + + self.debug('rollback kdu_instance {} to revision {} from cluster {}' + .format(kdu_instance, revision, cluster_uuid)) + + # config filename + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + + command = '{} rollback --kubeconfig={} --home={} {} {} --wait'\ + .format(self._helm_command, config_filename, helm_dir, kdu_instance, revision) + + # exec helm in a task + exec_task = asyncio.ensure_future( + coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False) + ) + # write status in another task + status_task = asyncio.ensure_future( + coro_or_future=self._store_status( + cluster_uuid=cluster_uuid, + kdu_instance=kdu_instance, + db_dict=db_dict, + operation='rollback', + run_once=False + ) + ) + + # wait for execution task + await asyncio.wait([exec_task]) + + # cancel status task + status_task.cancel() + + output, rc = exec_task.result() + + # write final status + await self._store_status( + cluster_uuid=cluster_uuid, + kdu_instance=kdu_instance, + db_dict=db_dict, + operation='rollback', + run_once=True, + check_every=0 + ) + + if rc != 0: + msg = 'Error executing command: {}\nOutput: {}'.format(command, output) + self.error(msg) + raise Exception(msg) + + # return new revision number + instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance) + if instance: + revision = int(instance.get('Revision')) + self.debug('New revision: {}'.format(revision)) + return revision + else: + return 0 + + async def uninstall( + self, + cluster_uuid: str, + kdu_instance: str + ): + """ + Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen + after all _terminate-config-primitive_ of the VNF are invoked). + + :param cluster_uuid: UUID of a K8s cluster known by OSM + :param kdu_instance: unique name for the KDU instance to be deleted + :return: True if successful + """ + + self.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid)) + + # config filename + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + + command = '{} --kubeconfig={} --home={} delete --purge {}'\ + .format(self._helm_command, config_filename, helm_dir, kdu_instance) + + output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True) + + return self._output_to_table(output) + + async def inspect_kdu( + self, + kdu_model: str + ) -> str: + + self.debug('inspect kdu_model {}'.format(kdu_model)) + + command = '{} inspect values {}'\ + .format(self._helm_command, kdu_model) + + output, rc = await self._local_async_exec(command=command) + + return output + + async def help_kdu( + self, + kdu_model: str + ): + + self.debug('help kdu_model {}'.format(kdu_model)) + + command = '{} inspect readme {}'\ + .format(self._helm_command, kdu_model) + + output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True) + + return output + + async def status_kdu( + self, + cluster_uuid: str, + kdu_instance: str + ): + + return await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, show_error_log=True) + + + """ + ################################################################################################## + ########################################## P R I V A T E ######################################### + ################################################################################################## + """ + + async def _status_kdu( + self, + cluster_uuid: str, + kdu_instance: str, + show_error_log: bool = False + ): + + self.debug('status of kdu_instance {}'.format(kdu_instance)) + + # config filename + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + + command = '{} --kubeconfig={} --home={} status {} --output yaml'\ + .format(self._helm_command, config_filename, helm_dir, kdu_instance) + + output, rc = await self._local_async_exec( + command=command, + raise_exception_on_error=True, + show_error_log=show_error_log + ) + + if rc != 0: + return None + + data = yaml.load(output, Loader=yaml.SafeLoader) + + # remove field 'notes' + try: + del data.get('info').get('status')['notes'] + except KeyError: + pass + + # parse field 'resources' + try: + resources = str(data.get('info').get('status').get('resources')) + resource_table = self._output_to_table(resources) + data.get('info').get('status')['resources'] = resource_table + except Exception as e: + pass + + return data + + async def get_instance_info( + self, + cluster_uuid: str, + kdu_instance: str + ): + instances = await self.instances_list(cluster_uuid=cluster_uuid) + for instance in instances: + if instance.get('Name') == kdu_instance: + return instance + self.debug('Instance {} not found'.format(kdu_instance)) + return None + + @staticmethod + def _generate_release_name( + chart_name: str + ): + name = '' + for c in chart_name: + if c.isalpha() or c.isnumeric(): + name += c + else: + name += '-' + if len(name) > 35: + name = name[0:35] + + # if does not start with alpha character, prefix 'a' + if not name[0].isalpha(): + name = 'a' + name + + name += '-' + + def get_random_number(): + r = random.randrange(start=1, stop=99999999) + s = str(r) + s = s.rjust(width=10, fillchar=' ') + return s + + name = name + get_random_number() + return name.lower() + + async def _store_status( + self, + cluster_uuid: str, + operation: str, + kdu_instance: str, + check_every: float = 10, + db_dict: dict = None, + run_once: bool = False + ): + while True: + try: + await asyncio.sleep(check_every) + detailed_status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance) + status = detailed_status.get('info').get('Description') + print('=' * 60) + self.debug('STATUS:\n{}'.format(status)) + self.debug('DETAILED STATUS:\n{}'.format(detailed_status)) + print('=' * 60) + # write status to db + result = await self.write_app_status_to_db( + db_dict=db_dict, + status=str(status), + detailed_status=str(detailed_status), + operation=operation) + if not result: + self.info('Error writing in database. Task exiting...') + return + except asyncio.CancelledError: + self.debug('Task cancelled') + return + except Exception as e: + pass + finally: + if run_once: + return + + async def _is_install_completed( + self, + cluster_uuid: str, + kdu_instance: str + ) -> bool: + + status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance) + + # extract info.status.resources-> str + # format: + # ==> v1/Deployment + # NAME READY UP-TO-DATE AVAILABLE AGE + # halting-horse-mongodb 0/1 1 0 0s + # halting-petit-mongodb 1/1 1 0 0s + # blank line + resources = K8sHelmConnector._get_deep(status, ('info', 'status', 'resources')) + + # convert to table + resources = K8sHelmConnector._output_to_table(resources) + + num_lines = len(resources) + index = 0 + while index < num_lines: + try: + line1 = resources[index] + index += 1 + # find '==>' in column 0 + if line1[0] == '==>': + line2 = resources[index] + index += 1 + # find READY in column 1 + if line2[1] == 'READY': + # read next lines + line3 = resources[index] + index += 1 + while len(line3) > 1 and index < num_lines: + ready_value = line3[1] + parts = ready_value.split(sep='/') + current = int(parts[0]) + total = int(parts[1]) + if current < total: + self.debug('NOT READY:\n {}'.format(line3)) + ready = False + line3 = resources[index] + index += 1 + + except Exception as e: + pass + + return ready + + @staticmethod + def _get_deep(dictionary: dict, members: tuple): + target = dictionary + value = None + try: + for m in members: + value = target.get(m) + if not value: + return None + else: + target = value + except Exception as e: + pass + return value + + # find key:value in several lines + @staticmethod + def _find_in_lines(p_lines: list, p_key: str) -> str: + for line in p_lines: + try: + if line.startswith(p_key + ':'): + parts = line.split(':') + the_value = parts[1].strip() + return the_value + except Exception as e: + # ignore it + pass + return None + + # params for use in -f file + # returns values file option and filename (in order to delete it at the end) + def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str): + params_str = '' + + if params and len(params) > 0: + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + + def get_random_number(): + r = random.randrange(start=1, stop=99999999) + s = str(r) + while len(s) < 10: + s = '0' + s + return s + + params2 = dict() + for key in params: + value = params.get(key) + if '!!yaml' in str(value): + value = yaml.load(value[7:]) + params2[key] = value + + values_file = get_random_number() + '.yaml' + with open(values_file, 'w') as stream: + yaml.dump(params2, stream, indent=4, default_flow_style=False) + + return '-f {}'.format(values_file), values_file + + return '', None + + # params for use in --set option + @staticmethod + def _params_to_set_option(params: dict) -> str: + params_str = '' + if params and len(params) > 0: + start = True + for key in params: + value = params.get(key, None) + if value is not None: + if start: + params_str += '--set ' + start = False + else: + params_str += ',' + params_str += '{}={}'.format(key, value) + return params_str + + @staticmethod + def _output_to_lines(output: str) -> list: + output_lines = list() + lines = output.splitlines(keepends=False) + for line in lines: + line = line.strip() + if len(line) > 0: + output_lines.append(line) + return output_lines + + @staticmethod + def _output_to_table(output: str) -> list: + output_table = list() + lines = output.splitlines(keepends=False) + for line in lines: + line = line.replace('\t', ' ') + line_list = list() + output_table.append(line_list) + cells = line.split(sep=' ') + for cell in cells: + cell = cell.strip() + if len(cell) > 0: + line_list.append(cell) + return output_table + + def _get_paths(self, cluster_name: str, create_if_not_exist: bool = False) -> (str, str, str, str): + """ + Returns kube and helm directories + + :param cluster_name: + :param create_if_not_exist: + :return: kube, helm directories, config filename and cluster dir. + Raises exception if not exist and cannot create + """ + + base = self.fs.path + if base.endswith("/") or base.endswith("\\"): + base = base[:-1] + + # base dir for cluster + cluster_dir = base + '/' + cluster_name + if create_if_not_exist and not os.path.exists(cluster_dir): + self.debug('Creating dir {}'.format(cluster_dir)) + os.makedirs(cluster_dir) + if not os.path.exists(cluster_dir): + msg = 'Base cluster dir {} does not exist'.format(cluster_dir) + self.error(msg) + raise Exception(msg) + + # kube dir + kube_dir = cluster_dir + '/' + '.kube' + if create_if_not_exist and not os.path.exists(kube_dir): + self.debug('Creating dir {}'.format(kube_dir)) + os.makedirs(kube_dir) + if not os.path.exists(kube_dir): + msg = 'Kube config dir {} does not exist'.format(kube_dir) + self.error(msg) + raise Exception(msg) + + # helm home dir + helm_dir = cluster_dir + '/' + '.helm' + if create_if_not_exist and not os.path.exists(helm_dir): + self.debug('Creating dir {}'.format(helm_dir)) + os.makedirs(helm_dir) + if not os.path.exists(helm_dir): + msg = 'Helm config dir {} does not exist'.format(helm_dir) + self.error(msg) + raise Exception(msg) + + config_filename = kube_dir + '/config' + return kube_dir, helm_dir, config_filename, cluster_dir + + @staticmethod + def _remove_multiple_spaces(str): + str = str.strip() + while ' ' in str: + str = str.replace(' ', ' ') + return str + + def _local_exec( + self, + command: str + ) -> (str, int): + command = K8sHelmConnector._remove_multiple_spaces(command) + self.debug('Executing sync local command: {}'.format(command)) + # raise exception if fails + output = '' + try: + output = subprocess.check_output(command, shell=True, universal_newlines=True) + return_code = 0 + self.debug(output) + except Exception as e: + return_code = 1 + + return output, return_code + + async def _local_async_exec( + self, + command: str, + raise_exception_on_error: bool = False, + show_error_log: bool = True + ) -> (str, int): + + command = K8sHelmConnector._remove_multiple_spaces(command) + self.debug('Executing async local command: {}'.format(command)) + + # split command + command = command.split(sep=' ') + + try: + process = await asyncio.create_subprocess_exec( + *command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + + # wait for command terminate + stdout, stderr = await process.communicate() + + return_code = process.returncode + + output = '' + if stdout: + output = stdout.decode('utf-8').strip() + if stderr: + output = stderr.decode('utf-8').strip() + + if return_code != 0 and show_error_log: + self.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output)) + else: + self.debug('Return code: {}'.format(return_code)) + + if raise_exception_on_error and return_code != 0: + raise Exception(output) + + return output, return_code + + except Exception as e: + msg = 'Exception executing command: {} -> {}'.format(command, e) + if show_error_log: + self.error(msg) + return '', -1 + + def _remote_exec( + self, + hostname: str, + username: str, + password: str, + command: str, + timeout: int = 10 + ) -> (str, int): + + command = K8sHelmConnector._remove_multiple_spaces(command) + self.debug('Executing sync remote ssh command: {}'.format(command)) + + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh.connect(hostname=hostname, username=username, password=password) + ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command=command, timeout=timeout) + output = ssh_stdout.read().decode('utf-8') + error = ssh_stderr.read().decode('utf-8') + if error: + self.error('ERROR: {}'.format(error)) + return_code = 1 + else: + return_code = 0 + output = output.replace('\\n', '\n') + self.debug('OUTPUT: {}'.format(output)) + + return output, return_code + + def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False): + self.debug('Checking if file {} exists...'.format(filename)) + if os.path.exists(filename): + return True + else: + msg = 'File {} does not exist'.format(filename) + if exception_if_not_exists: + self.error(msg) + raise Exception(msg) + diff --git a/n2vc/k8s_juju_conn.py b/n2vc/k8s_juju_conn.py new file mode 100644 index 0000000..4f62898 --- /dev/null +++ b/n2vc/k8s_juju_conn.py @@ -0,0 +1,952 @@ +# Copyright 2019 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import concurrent +from .exceptions import NotImplemented + +import juju +# from juju.bundle import BundleHandler +from juju.controller import Controller +from juju.model import Model +from juju.errors import JujuAPIError, JujuError + +import logging + +from n2vc.k8s_conn import K8sConnector + +import os +# import re +# import ssl +import subprocess +# from .vnf import N2VC + +import uuid +import yaml + + +class K8sJujuConnector(K8sConnector): + def __init__( + self, + fs, + kubectl_command='/usr/bin/kubectl', + log=None + ): + """ + + :param kubectl_command: path to kubectl executable + :param helm_command: path to helm executable + :param fs: file system for kubernetes and helm configuration + :param log: logger + """ + + # parent class + K8sConnector.__init__( + self, + kubectl_command=kubectl_command, + fs=fs, + log=log, + ) + + self.info('Initializing K8S Juju connector') + + self.authenticated = False + self.models = {} + self.log = logging.getLogger(__name__) + self.info('K8S Juju connector initialized') + + """Initialization""" + async def init_env( + self, + k8s_creds: dict, + namespace: str = 'kube-system', + reuse_cluster_uuid: str = None, + ) -> str: + """Initialize a Kubernetes environment + + :param k8s_creds dict: A dictionary containing the Kubernetes cluster + configuration + :param namespace str: The Kubernetes namespace to initialize + + :return: UUID of the k8s context or raises an exception + """ + + """Bootstrapping + + Bootstrapping cannot be done, by design, through the API. We need to + use the CLI tools. + """ + # TODO: The path may change + jujudir = "/snap/bin" + + self.k8scli = "{}/juju".format(jujudir) + + """ + WIP: Workflow + + 1. Has the environment already been bootstrapped? + - Check the database to see if we have a record for this env + + 2. If this is a new env, create it + - Add the k8s cloud to Juju + - Bootstrap + - Record it in the database + + 3. Connect to the Juju controller for this cloud + + """ + # cluster_uuid = reuse_cluster_uuid + # if not cluster_uuid: + # cluster_uuid = str(uuid4()) + + ################################################## + # TODO: Pull info from db based on the namespace # + ################################################## + + if not reuse_cluster_uuid: + # This is a new cluster, so bootstrap it + + cluster_uuid = str(uuid.uuid4()) + + # Add k8s cloud to Juju (unless it's microk8s) + + # Does the kubeconfig contain microk8s? + microk8s = self.is_microk8s_by_credentials(k8s_creds) + + if not microk8s: + # Name the new k8s cloud + k8s_cloud = "{}-k8s".format(namespace) + + await self.add_k8s(k8s_cloud, k8s_creds) + + # Bootstrap Juju controller + self.bootstrap(k8s_cloud, cluster_uuid) + else: + # k8s_cloud = 'microk8s-test' + k8s_cloud = "{}-k8s".format(namespace) + + await self.add_k8s(k8s_cloud, k8s_creds) + + await self.bootstrap(k8s_cloud, cluster_uuid) + + # Get the controller information + + # Parse ~/.local/share/juju/controllers.yaml + # controllers.testing.api-endpoints|ca-cert|uuid + with open(os.path.expanduser( + "~/.local/share/juju/controllers.yaml" + )) as f: + controllers = yaml.load(f, Loader=yaml.Loader) + controller = controllers['controllers'][cluster_uuid] + endpoints = controller['api-endpoints'] + self.juju_endpoint = endpoints[0] + self.juju_ca_cert = controller['ca-cert'] + + # Parse ~/.local/share/juju/accounts + # controllers.testing.user|password + with open(os.path.expanduser( + "~/.local/share/juju/accounts.yaml" + )) as f: + controllers = yaml.load(f, Loader=yaml.Loader) + controller = controllers['controllers'][cluster_uuid] + + self.juju_user = controller['user'] + self.juju_secret = controller['password'] + + print("user: {}".format(self.juju_user)) + print("secret: {}".format(self.juju_secret)) + print("endpoint: {}".format(self.juju_endpoint)) + print("ca-cert: {}".format(self.juju_ca_cert)) + + # raise Exception("EOL") + + self.juju_public_key = None + + config = { + 'endpoint': self.juju_endpoint, + 'username': self.juju_user, + 'secret': self.juju_secret, + 'cacert': self.juju_ca_cert, + 'namespace': namespace, + 'microk8s': microk8s, + } + + # Store the cluster configuration so it + # can be used for subsequent calls + await self.set_config(cluster_uuid, config) + + else: + # This is an existing cluster, so get its config + cluster_uuid = reuse_cluster_uuid + + config = self.get_config(cluster_uuid) + + self.juju_endpoint = config['endpoint'] + self.juju_user = config['username'] + self.juju_secret = config['secret'] + self.juju_ca_cert = config['cacert'] + self.juju_public_key = None + + # Login to the k8s cluster + if not self.authenticated: + await self.login() + + # We're creating a new cluster + print("Getting model {}".format(self.get_namespace(cluster_uuid))) + model = await self.get_model(self.get_namespace(cluster_uuid)) + + # Disconnect from the model + if model and model.is_connected(): + await model.disconnect() + + return cluster_uuid + + """Repo Management""" + async def repo_add( + self, + name: str, + url: str, + type: str = "charm", + ): + raise NotImplemented() + + async def repo_list(self): + raise NotImplemented() + + async def repo_remove( + self, + name: str, + ): + raise NotImplemented() + + """Reset""" + async def reset( + self, + cluster_uuid: str, + ) -> bool: + """Reset a cluster + + Resets the Kubernetes cluster by removing the model that represents it. + + :param cluster_uuid str: The UUID of the cluster to reset + :return: Returns True if successful or raises an exception. + """ + + try: + if not self.authenticated: + await self.login() + + if self.controller.is_connected(): + # Destroy the model + namespace = self.get_namespace(cluster_uuid) + if await self.has_model(namespace): + print("[reset] Destroying model") + await self.controller.destroy_model( + namespace, + destroy_storage=True + ) + + # Disconnect from the controller + print("[reset] Disconnecting controller") + await self.controller.disconnect() + + # Destroy the controller (via CLI) + print("[reset] Destroying controller") + await self.destroy_controller(cluster_uuid) + + """Remove the k8s cloud + + Only remove the k8s cloud if it's not a microk8s cloud, + since microk8s is a built-in cloud type. + """ + # microk8s = self.is_microk8s_by_cluster_uuid(cluster_uuid) + # if not microk8s: + print("[reset] Removing k8s cloud") + namespace = self.get_namespace(cluster_uuid) + k8s_cloud = "{}-k8s".format(namespace) + await self.remove_cloud(k8s_cloud) + + except Exception as ex: + print("Caught exception during reset: {}".format(ex)) + + """Deployment""" + async def install( + self, + cluster_uuid: str, + kdu_model: str, + atomic: bool = True, + timeout: int = None, + params: dict = None, + ) -> str: + """Install a bundle + + :param cluster_uuid str: The UUID of the cluster to install to + :param kdu_model str: The name or path of a bundle to install + :param atomic bool: If set, waits until the model is active and resets + the cluster on failure. + :param timeout int: The time, in seconds, to wait for the install + to finish + :param params dict: Key-value pairs of instantiation parameters + + :return: If successful, returns ? + """ + + if not self.authenticated: + print("[install] Logging in to the controller") + await self.login() + + ## + # Get or create the model, based on the namespace the cluster was + # instantiated with. + namespace = self.get_namespace(cluster_uuid) + model = await self.get_model(namespace) + if not model: + # Create the new model + model = await self.add_model(namespace) + + if model: + # TODO: Instantiation parameters + + print("[install] deploying {}".format(kdu_model)) + await model.deploy(kdu_model) + + # Get the application + if atomic: + # applications = model.applications + print("[install] Applications: {}".format(model.applications)) + for name in model.applications: + print("[install] Waiting for {} to settle".format(name)) + application = model.applications[name] + try: + # It's not enough to wait for all units to be active; + # the application status needs to be active as well. + print("Waiting for all units to be active...") + await model.block_until( + lambda: all( + unit.agent_status == 'idle' + and application.status in ['active', 'unknown'] + and unit.workload_status in [ + 'active', 'unknown' + ] for unit in application.units + ), + timeout=timeout + ) + print("All units active.") + + except concurrent.futures._base.TimeoutError: + print("[install] Timeout exceeded; resetting cluster") + await self.reset(cluster_uuid) + return False + + # Wait for the application to be active + if model.is_connected(): + print("[install] Disconnecting model") + await model.disconnect() + + return True + raise Exception("Unable to install") + + async def instances_list( + self, + cluster_uuid: str + ) -> list: + """ + returns a list of deployed releases in a cluster + + :param cluster_uuid: the cluster + :return: + """ + return [] + + async def upgrade( + self, + cluster_uuid: str, + kdu_instance: str, + kdu_model: str = None, + params: dict = None, + ) -> str: + """Upgrade a model + + :param cluster_uuid str: The UUID of the cluster to upgrade + :param kdu_instance str: The unique name of the KDU instance + :param kdu_model str: The name or path of the bundle to upgrade to + :param params dict: Key-value pairs of instantiation parameters + + :return: If successful, reference to the new revision number of the + KDU instance. + """ + + # TODO: Loop through the bundle and upgrade each charm individually + + """ + The API doesn't have a concept of bundle upgrades, because there are + many possible changes: charm revision, disk, number of units, etc. + + As such, we are only supporting a limited subset of upgrades. We'll + upgrade the charm revision but leave storage and scale untouched. + + Scale changes should happen through OSM constructs, and changes to + storage would require a redeployment of the service, at least in this + initial release. + """ + namespace = self.get_namespace(cluster_uuid) + model = await self.get_model(namespace) + + with open(kdu_model, 'r') as f: + bundle = yaml.load(f, Loader=yaml.FullLoader) + + """ + { + 'description': 'Test bundle', + 'bundle': 'kubernetes', + 'applications': { + 'mariadb-k8s': { + 'charm': 'cs:~charmed-osm/mariadb-k8s-20', + 'scale': 1, + 'options': { + 'password': 'manopw', + 'root_password': 'osm4u', + 'user': 'mano' + }, + 'series': 'kubernetes' + } + } + } + """ + # TODO: This should be returned in an agreed-upon format + for name in bundle['applications']: + print(model.applications) + application = model.applications[name] + print(application) + + path = bundle['applications'][name]['charm'] + + try: + await application.upgrade_charm(switch=path) + except juju.errors.JujuError as ex: + if 'already running charm' in str(ex): + # We're already running this version + pass + + await model.disconnect() + + return True + raise NotImplemented() + + """Rollback""" + async def rollback( + self, + cluster_uuid: str, + kdu_instance: str, + revision: int = 0, + ) -> str: + """Rollback a model + + :param cluster_uuid str: The UUID of the cluster to rollback + :param kdu_instance str: The unique name of the KDU instance + :param revision int: The revision to revert to. If omitted, rolls back + the previous upgrade. + + :return: If successful, returns the revision of active KDU instance, + or raises an exception + """ + raise NotImplemented() + + """Deletion""" + async def uninstall( + self, + cluster_uuid: str, + kdu_instance: str, + ) -> bool: + """Uninstall a KDU instance + + :param cluster_uuid str: The UUID of the cluster to uninstall + :param kdu_instance str: The unique name of the KDU instance + + :return: Returns True if successful, or raises an exception + """ + removed = False + + # Remove an application from the model + model = await self.get_model(self.get_namespace(cluster_uuid)) + + if model: + # Get the application + if kdu_instance not in model.applications: + # TODO: Raise a named exception + raise Exception("Application not found.") + + application = model.applications[kdu_instance] + + # Destroy the application + await application.destroy() + + # TODO: Verify removal + + removed = True + return removed + + """Introspection""" + async def inspect_kdu( + self, + kdu_model: str, + ) -> dict: + """Inspect a KDU + + Inspects a bundle and returns a dictionary of config parameters and + their default values. + + :param kdu_model str: The name or path of the bundle to inspect. + + :return: If successful, returns a dictionary of available parameters + and their default values. + """ + + kdu = {} + with open(kdu_model, 'r') as f: + bundle = yaml.load(f, Loader=yaml.FullLoader) + + """ + { + 'description': 'Test bundle', + 'bundle': 'kubernetes', + 'applications': { + 'mariadb-k8s': { + 'charm': 'cs:~charmed-osm/mariadb-k8s-20', + 'scale': 1, + 'options': { + 'password': 'manopw', + 'root_password': 'osm4u', + 'user': 'mano' + }, + 'series': 'kubernetes' + } + } + } + """ + # TODO: This should be returned in an agreed-upon format + kdu = bundle['applications'] + + return kdu + + async def help_kdu( + self, + kdu_model: str, + ) -> str: + """View the README + + If available, returns the README of the bundle. + + :param kdu_model str: The name or path of a bundle + + :return: If found, returns the contents of the README. + """ + readme = None + + files = ['README', 'README.txt', 'README.md'] + path = os.path.dirname(kdu_model) + for file in os.listdir(path): + if file in files: + with open(file, 'r') as f: + readme = f.read() + break + + return readme + + async def status_kdu( + self, + cluster_uuid: str, + kdu_instance: str, + ) -> dict: + """Get the status of the KDU + + Get the current status of the KDU instance. + + :param cluster_uuid str: The UUID of the cluster + :param kdu_instance str: The unique id of the KDU instance + + :return: Returns a dictionary containing namespace, state, resources, + and deployment_time. + """ + status = {} + + model = await self.get_model(self.get_namespace(cluster_uuid)) + + # model = await self.get_model_by_uuid(cluster_uuid) + if model: + model_status = await model.get_status() + status = model_status.applications + + for name in model_status.applications: + application = model_status.applications[name] + status[name] = { + 'status': application['status']['status'] + } + + if model.is_connected(): + await model.disconnect() + + return status + + # Private methods + async def add_k8s( + self, + cloud_name: str, + credentials: dict, + ) -> bool: + """Add a k8s cloud to Juju + + Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a + Juju Controller. + + :param cloud_name str: The name of the cloud to add. + :param credentials dict: A dictionary representing the output of + `kubectl config view --raw`. + + :returns: True if successful, otherwise raises an exception. + """ + cmd = [self.k8scli, "add-k8s", "--local", cloud_name] + + p = subprocess.run( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + input=yaml.dump(credentials, Dumper=yaml.Dumper), + encoding='ascii' + ) + retcode = p.returncode + + if retcode > 0: + raise Exception(p.stderr) + return True + + async def add_model( + self, + model_name: str + ) -> juju.model.Model: + """Adds a model to the controller + + Adds a new model to the Juju controller + + :param model_name str: The name of the model to add. + :returns: The juju.model.Model object of the new model upon success or + raises an exception. + """ + if not self.authenticated: + await self.login() + + model = await self.controller.add_model( + model_name, + config={'authorized-keys': self.juju_public_key} + ) + return model + + async def bootstrap( + self, + cloud_name: str, + cluster_uuid: str + ) -> bool: + """Bootstrap a Kubernetes controller + + Bootstrap a Juju controller inside the Kubernetes cluster + + :param cloud_name str: The name of the cloud. + :param cluster_uuid str: The UUID of the cluster to bootstrap. + :returns: True upon success or raises an exception. + """ + cmd = [self.k8scli, "bootstrap", cloud_name, cluster_uuid] + print("Bootstrapping controller {} in cloud {}".format( + cluster_uuid, cloud_name + )) + + p = subprocess.run( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding='ascii' + ) + retcode = p.returncode + + if retcode > 0: + # + if 'already exists' not in p.stderr: + raise Exception(p.stderr) + + return True + + async def destroy_controller( + self, + cluster_uuid: str + ) -> bool: + """Destroy a Kubernetes controller + + Destroy an existing Kubernetes controller. + + :param cluster_uuid str: The UUID of the cluster to bootstrap. + :returns: True upon success or raises an exception. + """ + cmd = [ + self.k8scli, + "destroy-controller", + "--destroy-all-models", + "--destroy-storage", + "-y", + cluster_uuid + ] + + p = subprocess.run( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding='ascii' + ) + retcode = p.returncode + + if retcode > 0: + # + if 'already exists' not in p.stderr: + raise Exception(p.stderr) + + def get_config( + self, + cluster_uuid: str, + ) -> dict: + """Get the cluster configuration + + Gets the configuration of the cluster + + :param cluster_uuid str: The UUID of the cluster. + :return: A dict upon success, or raises an exception. + """ + cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid) + if os.path.exists(cluster_config): + with open(cluster_config, 'r') as f: + config = yaml.load(f.read(), Loader=yaml.FullLoader) + return config + else: + raise Exception( + "Unable to locate configuration for cluster {}".format( + cluster_uuid + ) + ) + + async def get_model( + self, + model_name: str, + ) -> juju.model.Model: + """Get a model from the Juju Controller. + + Note: Model objects returned must call disconnected() before it goes + out of scope. + + :param model_name str: The name of the model to get + :return The juju.model.Model object if found, or None. + """ + if not self.authenticated: + await self.login() + + model = None + models = await self.controller.list_models() + + if model_name in models: + model = await self.controller.get_model( + model_name + ) + return model + + def get_namespace( + self, + cluster_uuid: str, + ) -> str: + """Get the namespace UUID + Gets the namespace's unique name + + :param cluster_uuid str: The UUID of the cluster + :returns: The namespace UUID, or raises an exception + """ + config = self.get_config(cluster_uuid) + + # Make sure the name is in the config + if 'namespace' not in config: + raise Exception("Namespace not found.") + + # TODO: We want to make sure this is unique to the cluster, in case + # the cluster is being reused. + # Consider pre/appending the cluster id to the namespace string + return config['namespace'] + + async def has_model( + self, + model_name: str + ) -> bool: + """Check if a model exists in the controller + + Checks to see if a model exists in the connected Juju controller. + + :param model_name str: The name of the model + :return: A boolean indicating if the model exists + """ + models = await self.controller.list_models() + + if model_name in models: + return True + return False + + def is_microk8s_by_cluster_uuid( + self, + cluster_uuid: str, + ) -> bool: + """Check if a cluster is micro8s + + Checks if a cluster is running microk8s + + :param cluster_uuid str: The UUID of the cluster + :returns: A boolean if the cluster is running microk8s + """ + config = self.get_config(cluster_uuid) + return config['microk8s'] + + def is_microk8s_by_credentials( + self, + credentials: dict, + ) -> bool: + """Check if a cluster is micro8s + + Checks if a cluster is running microk8s + + :param credentials dict: A dictionary containing the k8s credentials + :returns: A boolean if the cluster is running microk8s + """ + for context in credentials['contexts']: + if 'microk8s' in context['name']: + return True + + return False + + async def login(self): + """Login to the Juju controller.""" + + if self.authenticated: + return + + self.connecting = True + + self.controller = Controller() + + if self.juju_secret: + self.log.debug( + "Connecting to controller... ws://{} as {}/{}".format( + self.juju_endpoint, + self.juju_user, + self.juju_secret, + ) + ) + try: + await self.controller.connect( + endpoint=self.juju_endpoint, + username=self.juju_user, + password=self.juju_secret, + cacert=self.juju_ca_cert, + ) + self.authenticated = True + self.log.debug("JujuApi: Logged into controller") + except Exception as ex: + print(ex) + self.log.debug("Caught exception: {}".format(ex)) + pass + else: + self.log.fatal("VCA credentials not configured.") + self.authenticated = False + + async def logout(self): + """Logout of the Juju controller.""" + print("[logout]") + if not self.authenticated: + return False + + for model in self.models: + print("Logging out of model {}".format(model)) + await self.models[model].disconnect() + + if self.controller: + self.log.debug("Disconnecting controller {}".format( + self.controller + )) + await self.controller.disconnect() + self.controller = None + + self.authenticated = False + + async def remove_cloud( + self, + cloud_name: str, + ) -> bool: + """Remove a k8s cloud from Juju + + Removes a Kubernetes cloud from Juju. + + :param cloud_name str: The name of the cloud to add. + + :returns: True if successful, otherwise raises an exception. + """ + + # Remove the bootstrapped controller + cmd = [self.k8scli, "remove-k8s", "--client", cloud_name] + p = subprocess.run( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding='ascii' + ) + retcode = p.returncode + + if retcode > 0: + raise Exception(p.stderr) + + # Remove the cloud from the local config + cmd = [self.k8scli, "remove-cloud", "--client", cloud_name] + p = subprocess.run( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding='ascii' + ) + retcode = p.returncode + + if retcode > 0: + raise Exception(p.stderr) + + + return True + + async def set_config( + self, + cluster_uuid: str, + config: dict, + ) -> bool: + """Save the cluster configuration + + Saves the cluster information to the file store + + :param cluster_uuid str: The UUID of the cluster + :param config dict: A dictionary containing the cluster configuration + :returns: Boolean upon success or raises an exception. + """ + + cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid) + if not os.path.exists(cluster_config): + print("Writing config to {}".format(cluster_config)) + with open(cluster_config, 'w') as f: + f.write(yaml.dump(config, Dumper=yaml.Dumper)) + + return True diff --git a/n2vc/loggable.py b/n2vc/loggable.py new file mode 100644 index 0000000..40efa24 --- /dev/null +++ b/n2vc/loggable.py @@ -0,0 +1,167 @@ +## +# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U. +# This file is part of OSM +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# For those usages not covered by the Apache License, Version 2.0 please +# contact with: nfvlabs@tid.es +## + + +import logging +import asyncio +import time +import inspect +import datetime +import threading # only for logging purposes (not for using threads) + + +class Loggable: + + def __init__( + self, + log, + log_to_console: bool = False, + prefix: str = '' + ): + + self._last_log_time = None # used for time increment in logging + self._log_to_console = log_to_console + self._prefix = prefix + if log is not None: + self.log = log + else: + self.log = logging.getLogger(__name__) + + def debug(self, msg: str): + self._log_msg(log_level='DEBUG', msg=msg) + + def info(self, msg: str): + self._log_msg(log_level='INFO', msg=msg) + + def warning(self, msg: str): + self._log_msg(log_level='WARNING', msg=msg) + + def error(self, msg: str): + self._log_msg(log_level='ERROR', msg=msg) + + def critical(self, msg: str): + self._log_msg(log_level='CRITICAL', msg=msg) + + ################################################################################################## + + def _log_msg(self, log_level: str, msg: str): + """Generic log method""" + msg = self._format_log( + log_level=log_level, + msg=msg, + obj=self, + level=3, + include_path=False, + include_thread=False, + include_coroutine=True + ) + if self._log_to_console: + print(msg) + else: + if self.log is not None: + if log_level == 'DEBUG': + self.log.debug(msg) + elif log_level == 'INFO': + self.log.info(msg) + elif log_level == 'WARNING': + self.log.warning(msg) + elif log_level == 'ERROR': + self.log.error(msg) + elif log_level == 'CRITICAL': + self.log.critical(msg) + + def _format_log( + self, + log_level: str, + msg: str = '', + obj: object = None, + level: int = None, + include_path: bool = False, + include_thread: bool = False, + include_coroutine: bool = True + ) -> str: + + # time increment from last log + now = time.perf_counter() + if self._last_log_time is None: + time_str = ' (+0.000)' + else: + diff = round(now - self._last_log_time, 3) + time_str = ' (+{})'.format(diff) + self._last_log_time = now + + if level is None: + level = 1 + + # stack info + fi = inspect.stack()[level] + filename = fi.filename + func = fi.function + lineno = fi.lineno + # filename without path + if not include_path: + i = filename.rfind('/') + if i > 0: + filename = filename[i+1:] + + # datetime + dt = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f') + dt = dt + time_str + dt = time_str # logger already shows datetime + + # current thread + if include_thread: + thread_name = 'th:{}'.format(threading.current_thread().getName()) + else: + thread_name = '' + + # current coroutine + + coroutine_id = '' + if include_coroutine: + try: + if asyncio.Task.current_task() is not None: + def print_cor_name(c): + import inspect + try: + for m in inspect.getmembers(c): + if m[0] == '__name__': + return m[1] + except Exception: + pass + coro = asyncio.Task.current_task()._coro + coroutine_id = 'coro-{} {}()'.format(hex(id(coro))[2:], print_cor_name(coro)) + except Exception: + coroutine_id = '' + + # classname + if obj is not None: + obj_type = obj.__class__.__name__ # type: str + log_msg = \ + '{} {} {} {} {}::{}.{}():{}\n{}'\ + .format(self._prefix, dt, thread_name, coroutine_id, filename, obj_type, func, lineno, str(msg)) + else: + log_msg = \ + '{} {} {} {} {}::{}():{}\n{}'\ + .format(self._prefix, dt, thread_name, coroutine_id, filename, func, lineno, str(msg)) + + return log_msg diff --git a/tests/bundles/k8s-zookeeper-downgrade.yaml b/tests/bundles/k8s-zookeeper-downgrade.yaml new file mode 100644 index 0000000..8a99e2c --- /dev/null +++ b/tests/bundles/k8s-zookeeper-downgrade.yaml @@ -0,0 +1,21 @@ +# Copyright 2019 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +description: Test bundle +bundle: kubernetes +applications: + zookeeper-k8s: + charm: 'cs:~charmed-osm/zookeeper-k8s-29' + scale: 1 + series: kubernetes diff --git a/tests/bundles/k8s-zookeeper-upgrade.yaml b/tests/bundles/k8s-zookeeper-upgrade.yaml new file mode 100644 index 0000000..8308f2f --- /dev/null +++ b/tests/bundles/k8s-zookeeper-upgrade.yaml @@ -0,0 +1,21 @@ +# Copyright 2019 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +description: Test bundle +bundle: kubernetes +applications: + zookeeper-k8s: + charm: 'cs:~charmed-osm/zookeeper-k8s-31' + scale: 1 + series: kubernetes diff --git a/tests/bundles/k8s-zookeeper.yaml b/tests/bundles/k8s-zookeeper.yaml new file mode 100644 index 0000000..689220e --- /dev/null +++ b/tests/bundles/k8s-zookeeper.yaml @@ -0,0 +1,21 @@ +# Copyright 2019 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +description: Test bundle +bundle: kubernetes +applications: + zookeeper-k8s: + charm: 'cs:~charmed-osm/zookeeper-k8s-30' + scale: 1 + series: kubernetes diff --git a/tests/test_k8s_juju_conn.py b/tests/test_k8s_juju_conn.py new file mode 100644 index 0000000..6e1e98e --- /dev/null +++ b/tests/test_k8s_juju_conn.py @@ -0,0 +1,129 @@ +# Copyright 2019 Canonical Ltd. + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import asyncio +import logging +import n2vc.k8s_juju_conn +from base import get_juju_public_key +import os +from osm_common.fslocal import FsLocal +import subprocess +import yaml + + +def get_args(): + parser = argparse.ArgumentParser() + parser.add_argument("--cluster_uuid", help='The UUID of an existing cluster to use', default=None) + parser.add_argument("--reset", action="store_true") + return parser.parse_args() + +async def main(): + + args = get_args() + + reuse_cluster_uuid = args.cluster_uuid + + log = logging.getLogger() + log.level = logging.DEBUG + + # Extract parameters from the environment in order to run our tests + vca_host = os.getenv('VCA_HOST', '127.0.0.1') + vca_port = os.getenv('VCA_PORT', 17070) + vca_user = os.getenv('VCA_USER', 'admin') + vca_charms = os.getenv('VCA_CHARMS', None) + vca_secret = os.getenv('VCA_SECRET', None) + vca_ca_cert = os.getenv('VCA_CACERT', None) + + # Get the Juju Public key + juju_public_key = get_juju_public_key() + if juju_public_key: + with open(juju_public_key, 'r') as f: + juju_public_key = f.read() + else: + raise Exception("No Juju Public Key found") + + storage = { + 'driver': 'local', + 'path': '/srv/app/storage' + } + fs = FsLocal() + fs.fs_connect(storage) + + client = n2vc.k8s_juju_conn.K8sJujuConnector( + kubectl_command = '/bin/true', + fs = fs, + ) + + # kubectl config view --raw + # microk8s.config + + # if microk8s then + kubecfg = subprocess.getoutput('microk8s.config') + # else + # kubecfg.subprocess.getoutput('kubectl config view --raw') + + k8screds = yaml.load(kubecfg, Loader=yaml.FullLoader) + namespace = 'testing' + kdu_model = "./tests/bundles/k8s-zookeeper.yaml" + + """init_env""" + cluster_uuid = await client.init_env(k8screds, namespace, reuse_cluster_uuid=reuse_cluster_uuid) + print(cluster_uuid) + + if not reuse_cluster_uuid: + # This is a new cluster, so install to it + + """install""" + # async def install(self, cluster_uuid, kdu_model, atomic=True, timeout=None, params=None): + # TODO: Re-add storage declaration to bundle. The API doesn't support the storage option yet. David is investigating. + + # Deploy the bundle + kdu_instance = await client.install(cluster_uuid, kdu_model, atomic=True, timeout=600) + + if kdu_instance: + # Inspect + print("Getting status") + status = await client.status_kdu(cluster_uuid, kdu_instance) + print(status) + + # Inspect the bundle + config = await client.inspect_kdu(kdu_model) + print(config) + + readme = await client.help_kdu(kdu_model) + # print(readme) + + + """upgrade + Upgrade to a newer version of the bundle + """ + kdu_model_upgrade = "./tests/bundles/k8s-zookeeper-upgrade.yaml" + upgraded = await client.upgrade(cluster_uuid, namespace, kdu_model=kdu_model_upgrade) + + kdu_model_upgrade = "./tests/bundles/k8s-zookeeper-downgrade.yaml" + upgraded = await client.upgrade(cluster_uuid, namespace, kdu_model=kdu_model_upgrade) + + """uninstall""" + + """reset""" + if args.reset: + await client.reset(cluster_uuid) + + await client.logout() + + print("Done") + +if __name__ == "__main__": + asyncio.run(main())