K8s generic connector 99/8099/5
authorquilesj <e.nvi001.tid@telefonica.com>
Mon, 28 Oct 2019 17:08:00 +0000 (18:08 +0100)
committerquilesj <e.nvi001.tid@telefonica.com>
Tue, 12 Nov 2019 09:37:54 +0000 (10:37 +0100)
Change-Id: I7c7879d556783785f5510dcf0e63d8f6dda43d2c
Signed-off-by: quilesj <e.nvi001.tid@telefonica.com>
n2vc/k8s_conn.py [new file with mode: 0644]
n2vc/loggable.py [new file with mode: 0644]

diff --git a/n2vc/k8s_conn.py b/n2vc/k8s_conn.py
new file mode 100644 (file)
index 0000000..d951c2c
--- /dev/null
@@ -0,0 +1,350 @@
+##
+# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
+# This file is part of OSM
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: nfvlabs@tid.es
+##
+
+import asyncio
+from n2vc.loggable import Loggable
+import abc
+import time
+
+
+class K8sConnector(abc.ABC, Loggable):
+
+    """
+    ##################################################################################################
+    ########################################## P U B L I C ###########################################
+    ##################################################################################################
+    """
+
+    def __init__(
+            self,
+            db: object,
+            log: object = None,
+            on_update_db=None
+    ):
+        """
+
+        :param db: database object to write current operation status
+        :param log: logger for tracing
+        :param on_update_db: callback called when k8s connector updates database
+        """
+
+        # parent class
+        Loggable.__init__(self, log=log, log_to_console=True, prefix='\nK8S')
+
+        self.info('Initializing generic K8S connector')
+
+        # the database and update callback
+        self.db = db
+        self.on_update_db = on_update_db
+
+        self.info('K8S generic connector initialized')
+
+    @abc.abstractmethod
+    async def init_env(
+            self,
+            k8s_creds: str,
+            namespace: str = 'kube-system',
+            reuse_cluster_uuid = None
+    ) -> (str, bool):
+        """
+        It prepares a given K8s cluster environment to run Charts or juju Bundles on both sides:
+            client (OSM)
+            server (Tiller/Charm)
+
+        :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config'
+        :param namespace: optional namespace for helm tiller. By default, 'kube-system' will be used
+        :param reuse_cluster_uuid: existing cluster uuid for reuse
+        :return: uuid of the K8s cluster and True if connector has installed some software in the cluster
+        (on error, an exception will be raised)
+        """
+
+    @abc.abstractmethod
+    async def repo_add(
+            self,
+            cluster_uuid: str,
+            name: str,
+            url: str,
+            repo_type: str = 'chart'
+    ):
+        """
+        Add a new repository to OSM database
+
+        :param cluster_uuid: the cluster
+        :param name: name for the repo in OSM
+        :param url: URL of the repo
+        :param repo_type: either "chart" or "bundle"
+        :return: True if successful
+        """
+
+    @abc.abstractmethod
+    async def repo_list(
+            self,
+            cluster_uuid: str
+    ):
+        """
+        Get the list of registered repositories
+
+        :param cluster_uuid: the cluster
+        :return: list of registered repositories: [ (name, url) .... ]
+        """
+
+    @abc.abstractmethod
+    async def repo_remove(
+            self,
+            cluster_uuid: str,
+            name: str
+    ):
+        """
+        Remove a repository from OSM
+
+        :param name: repo name in OSM
+        :param cluster_uuid: the cluster
+        :return: True if successful
+        """
+
+    @abc.abstractmethod
+    async def reset(
+            self,
+            cluster_uuid: str,
+            force: bool = False,
+            uninstall_sw: bool = False
+    ) -> bool:
+        """
+        Uninstalls Tiller/Charm from a known K8s cluster and removes it from the list of known K8s clusters.
+        Intended to be used e.g. when the NS instance is deleted.
+
+        :param cluster_uuid: UUID of a K8s cluster known by OSM.
+        :param force: force deletion, even in case there are deployed releases
+        :param uninstall_sw: flag to indicate that sw uninstallation from software is needed
+        :return: str: kdu_instance generated by helm
+        """
+
+    @abc.abstractmethod
+    async def install(
+            self,
+            cluster_uuid: str,
+            kdu_model: str,
+            atomic: bool = True,
+            timeout: float = 300,
+            params: dict = None,
+            db_dict: dict = None
+    ):
+        """
+        Deploys of a new KDU instance. It would implicitly rely on the `install` call to deploy the Chart/Bundle
+        properly parametrized (in practice, this call would happen before any _initial-config-primitive_
+        of the VNF is called).
+
+        :param cluster_uuid: UUID of a K8s cluster known by OSM
+        :param kdu_model: chart/bundle:version reference (string), which can be either of these options:
+            - a name of chart/bundle available via the repos known by OSM
+            - a path to a packaged chart/bundle
+            - a path to an unpacked chart/bundle directory or a URL
+        :param atomic: If set, installation process purges chart/bundle on fail, also will wait until
+            all the K8s objects are active
+        :param timeout: Time in seconds to wait for the install of the chart/bundle (defaults to
+            Helm default timeout: 300s)
+        :param params: dictionary of key-value pairs for instantiation parameters (overriding default values)
+        :param dict db_dict: where to write into database when the status changes.
+                        It contains a dict with {collection: <str>, filter: {},  path: <str>},
+                            e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
+        :return: True if successful
+        """
+
+    @abc.abstractmethod
+    async def upgrade(
+            self,
+            cluster_uuid: str,
+            kdu_instance: str,
+            kdu_model: str = None,
+            atomic: bool = True,
+            timeout: float = 300,
+            params: dict = None,
+            db_dict: dict = None
+    ):
+        """
+        Upgrades an existing KDU instance. It would implicitly use the `upgrade` call over an existing Chart/Bundle.
+        It can be used both to upgrade the chart or to reconfigure it. This would be exposed as Day-2 primitive.
+
+        :param cluster_uuid: UUID of a K8s cluster known by OSM
+        :param kdu_instance: unique name for the KDU instance to be updated
+        :param kdu_model: new chart/bundle:version reference
+        :param atomic: rollback in case of fail and wait for pods and services are available
+        :param timeout: Time in seconds to wait for the install of the chart/bundle (defaults to
+            Helm default timeout: 300s)
+        :param params: new dictionary of key-value pairs for instantiation parameters
+        :param dict db_dict: where to write into database when the status changes.
+                        It contains a dict with {collection: <str>, filter: {},  path: <str>},
+                            e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
+        :return: reference to the new revision number of the KDU instance
+        """
+
+    @abc.abstractmethod
+    async def rollback(
+            self,
+            cluster_uuid: str,
+            kdu_instance: str,
+            revision=0,
+            db_dict: dict = None
+    ):
+        """
+        Rolls back a previous update of a KDU instance. It would implicitly use the `rollback` call.
+        It can be used both to rollback from a Chart/Bundle version update or from a reconfiguration.
+        This would be exposed as Day-2 primitive.
+
+        :param cluster_uuid: UUID of a K8s cluster known by OSM
+        :param kdu_instance: unique name for the KDU instance
+        :param revision: revision to which revert changes. If omitted, it will revert the last update only
+        :param dict db_dict: where to write into database when the status changes.
+                        It contains a dict with {collection: <str>, filter: {},  path: <str>},
+                            e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
+        :return:If successful, reference to the current active revision of the KDU instance after the rollback
+        """
+
+    @abc.abstractmethod
+    async def uninstall(
+            self,
+            cluster_uuid: str,
+            kdu_instance: str
+    ):
+        """
+        Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen
+        after all _terminate-config-primitive_ of the VNF are invoked).
+
+        :param cluster_uuid: UUID of a K8s cluster known by OSM
+        :param kdu_instance: unique name for the KDU instance to be deleted
+        :return: True if successful
+        """
+
+    @abc.abstractmethod
+    async def inspect_kdu(
+            self,
+            kdu_model: str
+    ) -> str:
+        """
+        These calls will retrieve from the Charm/Bundle:
+
+            - The list of configurable values and their defaults (e.g. in Charts, it would retrieve
+                the contents of `values.yaml`).
+            - If available, any embedded help file (e.g. `readme.md`) embedded in the Chart/Bundle.
+
+        :param cluster_uuid: the cluster to get the information
+        :param kdu_model: chart/bundle reference
+        :return: If successful, it will return a dictionary containing the list of available parameters
+            and their default values
+        """
+
+    @abc.abstractmethod
+    async def help_kdu(
+            self,
+            kdu_model: str
+    ) -> str:
+        """
+
+        :param cluster_uuid: the cluster to get the information
+        :param kdu_model: chart/bundle reference
+        :return: If successful, it will return the contents of the 'readme.md'
+        """
+
+    @abc.abstractmethod
+    async def status_kdu(
+            self,
+            cluster_uuid: str,
+            kdu_instance: str
+    ) -> str:
+        """
+        This call would retrieve tha current state of a given KDU instance. It would be would allow to retrieve
+        the _composition_ (i.e. K8s objects) and _specific values_ of the configuration parameters applied
+        to a given instance. This call would be based on the `status` call.
+
+        :param cluster_uuid: UUID of a K8s cluster known by OSM
+        :param kdu_instance: unique name for the KDU instance
+        :return: If successful, it will return the following vector of arguments:
+        - K8s `namespace` in the cluster where the KDU lives
+        - `state` of the KDU instance. It can be:
+              - UNKNOWN
+              - DEPLOYED
+              - DELETED
+              - SUPERSEDED
+              - FAILED or
+              - DELETING
+        - List of `resources` (objects) that this release consists of, sorted by kind, and the status of those resources
+        - Last `deployment_time`.
+
+        """
+
+    """
+    ##################################################################################################
+    ########################################## P R I V A T E #########################################
+    ##################################################################################################
+    """
+
+    async def write_app_status_to_db(
+            self,
+            db_dict: dict,
+            status: str,
+            detailed_status: str,
+            operation: str
+    ) -> bool:
+
+        if not self.db:
+            self.warning('No db => No database write')
+            return False
+
+        if not db_dict:
+            self.warning('No db_dict => No database write')
+            return False
+
+        self.debug('status={}'.format(status))
+
+        try:
+
+            the_table = db_dict['collection']
+            the_filter = db_dict['filter']
+            the_path = db_dict['path']
+            if not the_path[-1] == '.':
+                the_path = the_path + '.'
+            update_dict = {
+                the_path + 'operation': operation,
+                the_path + 'status': status,
+                the_path + 'detailed-status': detailed_status,
+                the_path + 'status-time': str(time.time()),
+            }
+
+            self.db.set_one(
+                table=the_table,
+                q_filter=the_filter,
+                update_dict=update_dict,
+                fail_on_empty=True
+            )
+
+            # database callback
+            if self.on_update_db:
+                if asyncio.iscoroutinefunction(self.on_update_db):
+                    await self.on_update_db(the_table, the_filter, the_path, update_dict)
+                else:
+                    self.on_update_db(the_table, the_filter, the_path, update_dict)
+
+            return True
+
+        except Exception as e:
+            self.info('Exception writing status to database: {}'.format(e))
+            return False
diff --git a/n2vc/loggable.py b/n2vc/loggable.py
new file mode 100644 (file)
index 0000000..40efa24
--- /dev/null
@@ -0,0 +1,167 @@
+##
+# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
+# This file is part of OSM
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: nfvlabs@tid.es
+##
+
+
+import logging
+import asyncio
+import time
+import inspect
+import datetime
+import threading    # only for logging purposes (not for using threads)
+
+
+class Loggable:
+
+    def __init__(
+            self,
+            log,
+            log_to_console: bool = False,
+            prefix: str = ''
+    ):
+
+        self._last_log_time = None   # used for time increment in logging
+        self._log_to_console = log_to_console
+        self._prefix = prefix
+        if log is not None:
+            self.log = log
+        else:
+            self.log = logging.getLogger(__name__)
+
+    def debug(self, msg: str):
+        self._log_msg(log_level='DEBUG', msg=msg)
+
+    def info(self, msg: str):
+        self._log_msg(log_level='INFO', msg=msg)
+
+    def warning(self, msg: str):
+        self._log_msg(log_level='WARNING', msg=msg)
+
+    def error(self, msg: str):
+        self._log_msg(log_level='ERROR', msg=msg)
+
+    def critical(self, msg: str):
+        self._log_msg(log_level='CRITICAL', msg=msg)
+
+    ##################################################################################################
+
+    def _log_msg(self, log_level: str, msg: str):
+        """Generic log method"""
+        msg = self._format_log(
+            log_level=log_level,
+            msg=msg,
+            obj=self,
+            level=3,
+            include_path=False,
+            include_thread=False,
+            include_coroutine=True
+        )
+        if self._log_to_console:
+            print(msg)
+        else:
+            if self.log is not None:
+                if log_level == 'DEBUG':
+                    self.log.debug(msg)
+                elif log_level == 'INFO':
+                    self.log.info(msg)
+                elif log_level == 'WARNING':
+                    self.log.warning(msg)
+                elif log_level == 'ERROR':
+                    self.log.error(msg)
+                elif log_level == 'CRITICAL':
+                    self.log.critical(msg)
+
+    def _format_log(
+            self,
+            log_level: str,
+            msg: str = '',
+            obj: object = None,
+            level: int = None,
+            include_path: bool = False,
+            include_thread: bool = False,
+            include_coroutine: bool = True
+    ) -> str:
+
+        # time increment from last log
+        now = time.perf_counter()
+        if self._last_log_time is None:
+            time_str = ' (+0.000)'
+        else:
+            diff = round(now - self._last_log_time, 3)
+            time_str = ' (+{})'.format(diff)
+        self._last_log_time = now
+
+        if level is None:
+            level = 1
+
+        # stack info
+        fi = inspect.stack()[level]
+        filename = fi.filename
+        func = fi.function
+        lineno = fi.lineno
+        # filename without path
+        if not include_path:
+            i = filename.rfind('/')
+            if i > 0:
+                filename = filename[i+1:]
+
+        # datetime
+        dt = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
+        dt = dt + time_str
+        dt = time_str       # logger already shows datetime
+
+        # current thread
+        if include_thread:
+            thread_name = 'th:{}'.format(threading.current_thread().getName())
+        else:
+            thread_name = ''
+
+        # current coroutine
+
+        coroutine_id = ''
+        if include_coroutine:
+            try:
+                if asyncio.Task.current_task() is not None:
+                    def print_cor_name(c):
+                        import inspect
+                        try:
+                            for m in inspect.getmembers(c):
+                                if m[0] == '__name__':
+                                    return m[1]
+                        except Exception:
+                            pass
+                    coro = asyncio.Task.current_task()._coro
+                    coroutine_id = 'coro-{} {}()'.format(hex(id(coro))[2:], print_cor_name(coro))
+            except Exception:
+                coroutine_id = ''
+
+        # classname
+        if obj is not None:
+            obj_type = obj.__class__.__name__  # type: str
+            log_msg = \
+                '{} {} {} {} {}::{}.{}():{}\n{}'\
+                .format(self._prefix, dt, thread_name, coroutine_id, filename, obj_type, func, lineno, str(msg))
+        else:
+            log_msg = \
+                '{} {} {} {} {}::{}():{}\n{}'\
+                .format(self._prefix, dt, thread_name, coroutine_id, filename, func, lineno, str(msg))
+
+        return log_msg