From a049b7ce5d1606440447a88a98dd70548a1a0c74 Mon Sep 17 00:00:00 2001 From: quilesj Date: Mon, 28 Oct 2019 18:08:00 +0100 Subject: [PATCH] K8s generic connector Change-Id: I7c7879d556783785f5510dcf0e63d8f6dda43d2c Signed-off-by: quilesj --- n2vc/k8s_conn.py | 350 +++++++++++++++++++++++++++++++++++++++++++++++ n2vc/loggable.py | 167 ++++++++++++++++++++++ 2 files changed, 517 insertions(+) create mode 100644 n2vc/k8s_conn.py create mode 100644 n2vc/loggable.py diff --git a/n2vc/k8s_conn.py b/n2vc/k8s_conn.py new file mode 100644 index 0000000..d951c2c --- /dev/null +++ b/n2vc/k8s_conn.py @@ -0,0 +1,350 @@ +## +# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U. +# This file is part of OSM +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# For those usages not covered by the Apache License, Version 2.0 please +# contact with: nfvlabs@tid.es +## + +import asyncio +from n2vc.loggable import Loggable +import abc +import time + + +class K8sConnector(abc.ABC, Loggable): + + """ + ################################################################################################## + ########################################## P U B L I C ########################################### + ################################################################################################## + """ + + def __init__( + self, + db: object, + log: object = None, + on_update_db=None + ): + """ + + :param db: database object to write current operation status + :param log: logger for tracing + :param on_update_db: callback called when k8s connector updates database + """ + + # parent class + Loggable.__init__(self, log=log, log_to_console=True, prefix='\nK8S') + + self.info('Initializing generic K8S connector') + + # the database and update callback + self.db = db + self.on_update_db = on_update_db + + self.info('K8S generic connector initialized') + + @abc.abstractmethod + async def init_env( + self, + k8s_creds: str, + namespace: str = 'kube-system', + reuse_cluster_uuid = None + ) -> (str, bool): + """ + It prepares a given K8s cluster environment to run Charts or juju Bundles on both sides: + client (OSM) + server (Tiller/Charm) + + :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config' + :param namespace: optional namespace for helm tiller. By default, 'kube-system' will be used + :param reuse_cluster_uuid: existing cluster uuid for reuse + :return: uuid of the K8s cluster and True if connector has installed some software in the cluster + (on error, an exception will be raised) + """ + + @abc.abstractmethod + async def repo_add( + self, + cluster_uuid: str, + name: str, + url: str, + repo_type: str = 'chart' + ): + """ + Add a new repository to OSM database + + :param cluster_uuid: the cluster + :param name: name for the repo in OSM + :param url: URL of the repo + :param repo_type: either "chart" or "bundle" + :return: True if successful + """ + + @abc.abstractmethod + async def repo_list( + self, + cluster_uuid: str + ): + """ + Get the list of registered repositories + + :param cluster_uuid: the cluster + :return: list of registered repositories: [ (name, url) .... ] + """ + + @abc.abstractmethod + async def repo_remove( + self, + cluster_uuid: str, + name: str + ): + """ + Remove a repository from OSM + + :param name: repo name in OSM + :param cluster_uuid: the cluster + :return: True if successful + """ + + @abc.abstractmethod + async def reset( + self, + cluster_uuid: str, + force: bool = False, + uninstall_sw: bool = False + ) -> bool: + """ + Uninstalls Tiller/Charm from a known K8s cluster and removes it from the list of known K8s clusters. + Intended to be used e.g. when the NS instance is deleted. + + :param cluster_uuid: UUID of a K8s cluster known by OSM. + :param force: force deletion, even in case there are deployed releases + :param uninstall_sw: flag to indicate that sw uninstallation from software is needed + :return: str: kdu_instance generated by helm + """ + + @abc.abstractmethod + async def install( + self, + cluster_uuid: str, + kdu_model: str, + atomic: bool = True, + timeout: float = 300, + params: dict = None, + db_dict: dict = None + ): + """ + Deploys of a new KDU instance. It would implicitly rely on the `install` call to deploy the Chart/Bundle + properly parametrized (in practice, this call would happen before any _initial-config-primitive_ + of the VNF is called). + + :param cluster_uuid: UUID of a K8s cluster known by OSM + :param kdu_model: chart/bundle:version reference (string), which can be either of these options: + - a name of chart/bundle available via the repos known by OSM + - a path to a packaged chart/bundle + - a path to an unpacked chart/bundle directory or a URL + :param atomic: If set, installation process purges chart/bundle on fail, also will wait until + all the K8s objects are active + :param timeout: Time in seconds to wait for the install of the chart/bundle (defaults to + Helm default timeout: 300s) + :param params: dictionary of key-value pairs for instantiation parameters (overriding default values) + :param dict db_dict: where to write into database when the status changes. + It contains a dict with {collection: , filter: {}, path: }, + e.g. {collection: "nsrs", filter: {_id: , path: "_admin.deployed.K8S.3"} + :return: True if successful + """ + + @abc.abstractmethod + async def upgrade( + self, + cluster_uuid: str, + kdu_instance: str, + kdu_model: str = None, + atomic: bool = True, + timeout: float = 300, + params: dict = None, + db_dict: dict = None + ): + """ + Upgrades an existing KDU instance. It would implicitly use the `upgrade` call over an existing Chart/Bundle. + It can be used both to upgrade the chart or to reconfigure it. This would be exposed as Day-2 primitive. + + :param cluster_uuid: UUID of a K8s cluster known by OSM + :param kdu_instance: unique name for the KDU instance to be updated + :param kdu_model: new chart/bundle:version reference + :param atomic: rollback in case of fail and wait for pods and services are available + :param timeout: Time in seconds to wait for the install of the chart/bundle (defaults to + Helm default timeout: 300s) + :param params: new dictionary of key-value pairs for instantiation parameters + :param dict db_dict: where to write into database when the status changes. + It contains a dict with {collection: , filter: {}, path: }, + e.g. {collection: "nsrs", filter: {_id: , path: "_admin.deployed.K8S.3"} + :return: reference to the new revision number of the KDU instance + """ + + @abc.abstractmethod + async def rollback( + self, + cluster_uuid: str, + kdu_instance: str, + revision=0, + db_dict: dict = None + ): + """ + Rolls back a previous update of a KDU instance. It would implicitly use the `rollback` call. + It can be used both to rollback from a Chart/Bundle version update or from a reconfiguration. + This would be exposed as Day-2 primitive. + + :param cluster_uuid: UUID of a K8s cluster known by OSM + :param kdu_instance: unique name for the KDU instance + :param revision: revision to which revert changes. If omitted, it will revert the last update only + :param dict db_dict: where to write into database when the status changes. + It contains a dict with {collection: , filter: {}, path: }, + e.g. {collection: "nsrs", filter: {_id: , path: "_admin.deployed.K8S.3"} + :return:If successful, reference to the current active revision of the KDU instance after the rollback + """ + + @abc.abstractmethod + async def uninstall( + self, + cluster_uuid: str, + kdu_instance: str + ): + """ + Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen + after all _terminate-config-primitive_ of the VNF are invoked). + + :param cluster_uuid: UUID of a K8s cluster known by OSM + :param kdu_instance: unique name for the KDU instance to be deleted + :return: True if successful + """ + + @abc.abstractmethod + async def inspect_kdu( + self, + kdu_model: str + ) -> str: + """ + These calls will retrieve from the Charm/Bundle: + + - The list of configurable values and their defaults (e.g. in Charts, it would retrieve + the contents of `values.yaml`). + - If available, any embedded help file (e.g. `readme.md`) embedded in the Chart/Bundle. + + :param cluster_uuid: the cluster to get the information + :param kdu_model: chart/bundle reference + :return: If successful, it will return a dictionary containing the list of available parameters + and their default values + """ + + @abc.abstractmethod + async def help_kdu( + self, + kdu_model: str + ) -> str: + """ + + :param cluster_uuid: the cluster to get the information + :param kdu_model: chart/bundle reference + :return: If successful, it will return the contents of the 'readme.md' + """ + + @abc.abstractmethod + async def status_kdu( + self, + cluster_uuid: str, + kdu_instance: str + ) -> str: + """ + This call would retrieve tha current state of a given KDU instance. It would be would allow to retrieve + the _composition_ (i.e. K8s objects) and _specific values_ of the configuration parameters applied + to a given instance. This call would be based on the `status` call. + + :param cluster_uuid: UUID of a K8s cluster known by OSM + :param kdu_instance: unique name for the KDU instance + :return: If successful, it will return the following vector of arguments: + - K8s `namespace` in the cluster where the KDU lives + - `state` of the KDU instance. It can be: + - UNKNOWN + - DEPLOYED + - DELETED + - SUPERSEDED + - FAILED or + - DELETING + - List of `resources` (objects) that this release consists of, sorted by kind, and the status of those resources + - Last `deployment_time`. + + """ + + """ + ################################################################################################## + ########################################## P R I V A T E ######################################### + ################################################################################################## + """ + + async def write_app_status_to_db( + self, + db_dict: dict, + status: str, + detailed_status: str, + operation: str + ) -> bool: + + if not self.db: + self.warning('No db => No database write') + return False + + if not db_dict: + self.warning('No db_dict => No database write') + return False + + self.debug('status={}'.format(status)) + + try: + + the_table = db_dict['collection'] + the_filter = db_dict['filter'] + the_path = db_dict['path'] + if not the_path[-1] == '.': + the_path = the_path + '.' + update_dict = { + the_path + 'operation': operation, + the_path + 'status': status, + the_path + 'detailed-status': detailed_status, + the_path + 'status-time': str(time.time()), + } + + self.db.set_one( + table=the_table, + q_filter=the_filter, + update_dict=update_dict, + fail_on_empty=True + ) + + # database callback + if self.on_update_db: + if asyncio.iscoroutinefunction(self.on_update_db): + await self.on_update_db(the_table, the_filter, the_path, update_dict) + else: + self.on_update_db(the_table, the_filter, the_path, update_dict) + + return True + + except Exception as e: + self.info('Exception writing status to database: {}'.format(e)) + return False diff --git a/n2vc/loggable.py b/n2vc/loggable.py new file mode 100644 index 0000000..40efa24 --- /dev/null +++ b/n2vc/loggable.py @@ -0,0 +1,167 @@ +## +# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U. +# This file is part of OSM +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# For those usages not covered by the Apache License, Version 2.0 please +# contact with: nfvlabs@tid.es +## + + +import logging +import asyncio +import time +import inspect +import datetime +import threading # only for logging purposes (not for using threads) + + +class Loggable: + + def __init__( + self, + log, + log_to_console: bool = False, + prefix: str = '' + ): + + self._last_log_time = None # used for time increment in logging + self._log_to_console = log_to_console + self._prefix = prefix + if log is not None: + self.log = log + else: + self.log = logging.getLogger(__name__) + + def debug(self, msg: str): + self._log_msg(log_level='DEBUG', msg=msg) + + def info(self, msg: str): + self._log_msg(log_level='INFO', msg=msg) + + def warning(self, msg: str): + self._log_msg(log_level='WARNING', msg=msg) + + def error(self, msg: str): + self._log_msg(log_level='ERROR', msg=msg) + + def critical(self, msg: str): + self._log_msg(log_level='CRITICAL', msg=msg) + + ################################################################################################## + + def _log_msg(self, log_level: str, msg: str): + """Generic log method""" + msg = self._format_log( + log_level=log_level, + msg=msg, + obj=self, + level=3, + include_path=False, + include_thread=False, + include_coroutine=True + ) + if self._log_to_console: + print(msg) + else: + if self.log is not None: + if log_level == 'DEBUG': + self.log.debug(msg) + elif log_level == 'INFO': + self.log.info(msg) + elif log_level == 'WARNING': + self.log.warning(msg) + elif log_level == 'ERROR': + self.log.error(msg) + elif log_level == 'CRITICAL': + self.log.critical(msg) + + def _format_log( + self, + log_level: str, + msg: str = '', + obj: object = None, + level: int = None, + include_path: bool = False, + include_thread: bool = False, + include_coroutine: bool = True + ) -> str: + + # time increment from last log + now = time.perf_counter() + if self._last_log_time is None: + time_str = ' (+0.000)' + else: + diff = round(now - self._last_log_time, 3) + time_str = ' (+{})'.format(diff) + self._last_log_time = now + + if level is None: + level = 1 + + # stack info + fi = inspect.stack()[level] + filename = fi.filename + func = fi.function + lineno = fi.lineno + # filename without path + if not include_path: + i = filename.rfind('/') + if i > 0: + filename = filename[i+1:] + + # datetime + dt = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f') + dt = dt + time_str + dt = time_str # logger already shows datetime + + # current thread + if include_thread: + thread_name = 'th:{}'.format(threading.current_thread().getName()) + else: + thread_name = '' + + # current coroutine + + coroutine_id = '' + if include_coroutine: + try: + if asyncio.Task.current_task() is not None: + def print_cor_name(c): + import inspect + try: + for m in inspect.getmembers(c): + if m[0] == '__name__': + return m[1] + except Exception: + pass + coro = asyncio.Task.current_task()._coro + coroutine_id = 'coro-{} {}()'.format(hex(id(coro))[2:], print_cor_name(coro)) + except Exception: + coroutine_id = '' + + # classname + if obj is not None: + obj_type = obj.__class__.__name__ # type: str + log_msg = \ + '{} {} {} {} {}::{}.{}():{}\n{}'\ + .format(self._prefix, dt, thread_name, coroutine_id, filename, obj_type, func, lineno, str(msg)) + else: + log_msg = \ + '{} {} {} {} {}::{}():{}\n{}'\ + .format(self._prefix, dt, thread_name, coroutine_id, filename, func, lineno, str(msg)) + + return log_msg -- 2.17.1