--- /dev/null
+##
+# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
+# This file is part of OSM
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: nfvlabs@tid.es
+##
+
+import asyncio
+from n2vc.loggable import Loggable
+import abc
+import time
+
+
+class K8sConnector(abc.ABC, Loggable):
+
+ """
+ ##################################################################################################
+ ########################################## P U B L I C ###########################################
+ ##################################################################################################
+ """
+
+ def __init__(
+ self,
+ db: object,
+ log: object = None,
+ on_update_db=None
+ ):
+ """
+
+ :param db: database object to write current operation status
+ :param log: logger for tracing
+ :param on_update_db: callback called when k8s connector updates database
+ """
+
+ # parent class
+ Loggable.__init__(self, log=log, log_to_console=True, prefix='\nK8S')
+
+ self.info('Initializing generic K8S connector')
+
+ # the database and update callback
+ self.db = db
+ self.on_update_db = on_update_db
+
+ self.info('K8S generic connector initialized')
+
+ @abc.abstractmethod
+ async def init_env(
+ self,
+ k8s_creds: str,
+ namespace: str = 'kube-system',
+ reuse_cluster_uuid = None
+ ) -> (str, bool):
+ """
+ It prepares a given K8s cluster environment to run Charts or juju Bundles on both sides:
+ client (OSM)
+ server (Tiller/Charm)
+
+ :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config'
+ :param namespace: optional namespace for helm tiller. By default, 'kube-system' will be used
+ :param reuse_cluster_uuid: existing cluster uuid for reuse
+ :return: uuid of the K8s cluster and True if connector has installed some software in the cluster
+ (on error, an exception will be raised)
+ """
+
+ @abc.abstractmethod
+ async def repo_add(
+ self,
+ cluster_uuid: str,
+ name: str,
+ url: str,
+ repo_type: str = 'chart'
+ ):
+ """
+ Add a new repository to OSM database
+
+ :param cluster_uuid: the cluster
+ :param name: name for the repo in OSM
+ :param url: URL of the repo
+ :param repo_type: either "chart" or "bundle"
+ :return: True if successful
+ """
+
+ @abc.abstractmethod
+ async def repo_list(
+ self,
+ cluster_uuid: str
+ ):
+ """
+ Get the list of registered repositories
+
+ :param cluster_uuid: the cluster
+ :return: list of registered repositories: [ (name, url) .... ]
+ """
+
+ @abc.abstractmethod
+ async def repo_remove(
+ self,
+ cluster_uuid: str,
+ name: str
+ ):
+ """
+ Remove a repository from OSM
+
+ :param name: repo name in OSM
+ :param cluster_uuid: the cluster
+ :return: True if successful
+ """
+
+ @abc.abstractmethod
+ async def reset(
+ self,
+ cluster_uuid: str,
+ force: bool = False,
+ uninstall_sw: bool = False
+ ) -> bool:
+ """
+ Uninstalls Tiller/Charm from a known K8s cluster and removes it from the list of known K8s clusters.
+ Intended to be used e.g. when the NS instance is deleted.
+
+ :param cluster_uuid: UUID of a K8s cluster known by OSM.
+ :param force: force deletion, even in case there are deployed releases
+ :param uninstall_sw: flag to indicate that sw uninstallation from software is needed
+ :return: str: kdu_instance generated by helm
+ """
+
+ @abc.abstractmethod
+ async def install(
+ self,
+ cluster_uuid: str,
+ kdu_model: str,
+ atomic: bool = True,
+ timeout: float = 300,
+ params: dict = None,
+ db_dict: dict = None
+ ):
+ """
+ Deploys of a new KDU instance. It would implicitly rely on the `install` call to deploy the Chart/Bundle
+ properly parametrized (in practice, this call would happen before any _initial-config-primitive_
+ of the VNF is called).
+
+ :param cluster_uuid: UUID of a K8s cluster known by OSM
+ :param kdu_model: chart/bundle:version reference (string), which can be either of these options:
+ - a name of chart/bundle available via the repos known by OSM
+ - a path to a packaged chart/bundle
+ - a path to an unpacked chart/bundle directory or a URL
+ :param atomic: If set, installation process purges chart/bundle on fail, also will wait until
+ all the K8s objects are active
+ :param timeout: Time in seconds to wait for the install of the chart/bundle (defaults to
+ Helm default timeout: 300s)
+ :param params: dictionary of key-value pairs for instantiation parameters (overriding default values)
+ :param dict db_dict: where to write into database when the status changes.
+ It contains a dict with {collection: <str>, filter: {}, path: <str>},
+ e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
+ :return: True if successful
+ """
+
+ @abc.abstractmethod
+ async def upgrade(
+ self,
+ cluster_uuid: str,
+ kdu_instance: str,
+ kdu_model: str = None,
+ atomic: bool = True,
+ timeout: float = 300,
+ params: dict = None,
+ db_dict: dict = None
+ ):
+ """
+ Upgrades an existing KDU instance. It would implicitly use the `upgrade` call over an existing Chart/Bundle.
+ It can be used both to upgrade the chart or to reconfigure it. This would be exposed as Day-2 primitive.
+
+ :param cluster_uuid: UUID of a K8s cluster known by OSM
+ :param kdu_instance: unique name for the KDU instance to be updated
+ :param kdu_model: new chart/bundle:version reference
+ :param atomic: rollback in case of fail and wait for pods and services are available
+ :param timeout: Time in seconds to wait for the install of the chart/bundle (defaults to
+ Helm default timeout: 300s)
+ :param params: new dictionary of key-value pairs for instantiation parameters
+ :param dict db_dict: where to write into database when the status changes.
+ It contains a dict with {collection: <str>, filter: {}, path: <str>},
+ e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
+ :return: reference to the new revision number of the KDU instance
+ """
+
+ @abc.abstractmethod
+ async def rollback(
+ self,
+ cluster_uuid: str,
+ kdu_instance: str,
+ revision=0,
+ db_dict: dict = None
+ ):
+ """
+ Rolls back a previous update of a KDU instance. It would implicitly use the `rollback` call.
+ It can be used both to rollback from a Chart/Bundle version update or from a reconfiguration.
+ This would be exposed as Day-2 primitive.
+
+ :param cluster_uuid: UUID of a K8s cluster known by OSM
+ :param kdu_instance: unique name for the KDU instance
+ :param revision: revision to which revert changes. If omitted, it will revert the last update only
+ :param dict db_dict: where to write into database when the status changes.
+ It contains a dict with {collection: <str>, filter: {}, path: <str>},
+ e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
+ :return:If successful, reference to the current active revision of the KDU instance after the rollback
+ """
+
+ @abc.abstractmethod
+ async def uninstall(
+ self,
+ cluster_uuid: str,
+ kdu_instance: str
+ ):
+ """
+ Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen
+ after all _terminate-config-primitive_ of the VNF are invoked).
+
+ :param cluster_uuid: UUID of a K8s cluster known by OSM
+ :param kdu_instance: unique name for the KDU instance to be deleted
+ :return: True if successful
+ """
+
+ @abc.abstractmethod
+ async def inspect_kdu(
+ self,
+ kdu_model: str
+ ) -> str:
+ """
+ These calls will retrieve from the Charm/Bundle:
+
+ - The list of configurable values and their defaults (e.g. in Charts, it would retrieve
+ the contents of `values.yaml`).
+ - If available, any embedded help file (e.g. `readme.md`) embedded in the Chart/Bundle.
+
+ :param cluster_uuid: the cluster to get the information
+ :param kdu_model: chart/bundle reference
+ :return: If successful, it will return a dictionary containing the list of available parameters
+ and their default values
+ """
+
+ @abc.abstractmethod
+ async def help_kdu(
+ self,
+ kdu_model: str
+ ) -> str:
+ """
+
+ :param cluster_uuid: the cluster to get the information
+ :param kdu_model: chart/bundle reference
+ :return: If successful, it will return the contents of the 'readme.md'
+ """
+
+ @abc.abstractmethod
+ async def status_kdu(
+ self,
+ cluster_uuid: str,
+ kdu_instance: str
+ ) -> str:
+ """
+ This call would retrieve tha current state of a given KDU instance. It would be would allow to retrieve
+ the _composition_ (i.e. K8s objects) and _specific values_ of the configuration parameters applied
+ to a given instance. This call would be based on the `status` call.
+
+ :param cluster_uuid: UUID of a K8s cluster known by OSM
+ :param kdu_instance: unique name for the KDU instance
+ :return: If successful, it will return the following vector of arguments:
+ - K8s `namespace` in the cluster where the KDU lives
+ - `state` of the KDU instance. It can be:
+ - UNKNOWN
+ - DEPLOYED
+ - DELETED
+ - SUPERSEDED
+ - FAILED or
+ - DELETING
+ - List of `resources` (objects) that this release consists of, sorted by kind, and the status of those resources
+ - Last `deployment_time`.
+
+ """
+
+ """
+ ##################################################################################################
+ ########################################## P R I V A T E #########################################
+ ##################################################################################################
+ """
+
+ async def write_app_status_to_db(
+ self,
+ db_dict: dict,
+ status: str,
+ detailed_status: str,
+ operation: str
+ ) -> bool:
+
+ if not self.db:
+ self.warning('No db => No database write')
+ return False
+
+ if not db_dict:
+ self.warning('No db_dict => No database write')
+ return False
+
+ self.debug('status={}'.format(status))
+
+ try:
+
+ the_table = db_dict['collection']
+ the_filter = db_dict['filter']
+ the_path = db_dict['path']
+ if not the_path[-1] == '.':
+ the_path = the_path + '.'
+ update_dict = {
+ the_path + 'operation': operation,
+ the_path + 'status': status,
+ the_path + 'detailed-status': detailed_status,
+ the_path + 'status-time': str(time.time()),
+ }
+
+ self.db.set_one(
+ table=the_table,
+ q_filter=the_filter,
+ update_dict=update_dict,
+ fail_on_empty=True
+ )
+
+ # database callback
+ if self.on_update_db:
+ if asyncio.iscoroutinefunction(self.on_update_db):
+ await self.on_update_db(the_table, the_filter, the_path, update_dict)
+ else:
+ self.on_update_db(the_table, the_filter, the_path, update_dict)
+
+ return True
+
+ except Exception as e:
+ self.info('Exception writing status to database: {}'.format(e))
+ return False
--- /dev/null
+##
+# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
+# This file is part of OSM
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: nfvlabs@tid.es
+##
+
+
+import logging
+import asyncio
+import time
+import inspect
+import datetime
+import threading # only for logging purposes (not for using threads)
+
+
+class Loggable:
+
+ def __init__(
+ self,
+ log,
+ log_to_console: bool = False,
+ prefix: str = ''
+ ):
+
+ self._last_log_time = None # used for time increment in logging
+ self._log_to_console = log_to_console
+ self._prefix = prefix
+ if log is not None:
+ self.log = log
+ else:
+ self.log = logging.getLogger(__name__)
+
+ def debug(self, msg: str):
+ self._log_msg(log_level='DEBUG', msg=msg)
+
+ def info(self, msg: str):
+ self._log_msg(log_level='INFO', msg=msg)
+
+ def warning(self, msg: str):
+ self._log_msg(log_level='WARNING', msg=msg)
+
+ def error(self, msg: str):
+ self._log_msg(log_level='ERROR', msg=msg)
+
+ def critical(self, msg: str):
+ self._log_msg(log_level='CRITICAL', msg=msg)
+
+ ##################################################################################################
+
+ def _log_msg(self, log_level: str, msg: str):
+ """Generic log method"""
+ msg = self._format_log(
+ log_level=log_level,
+ msg=msg,
+ obj=self,
+ level=3,
+ include_path=False,
+ include_thread=False,
+ include_coroutine=True
+ )
+ if self._log_to_console:
+ print(msg)
+ else:
+ if self.log is not None:
+ if log_level == 'DEBUG':
+ self.log.debug(msg)
+ elif log_level == 'INFO':
+ self.log.info(msg)
+ elif log_level == 'WARNING':
+ self.log.warning(msg)
+ elif log_level == 'ERROR':
+ self.log.error(msg)
+ elif log_level == 'CRITICAL':
+ self.log.critical(msg)
+
+ def _format_log(
+ self,
+ log_level: str,
+ msg: str = '',
+ obj: object = None,
+ level: int = None,
+ include_path: bool = False,
+ include_thread: bool = False,
+ include_coroutine: bool = True
+ ) -> str:
+
+ # time increment from last log
+ now = time.perf_counter()
+ if self._last_log_time is None:
+ time_str = ' (+0.000)'
+ else:
+ diff = round(now - self._last_log_time, 3)
+ time_str = ' (+{})'.format(diff)
+ self._last_log_time = now
+
+ if level is None:
+ level = 1
+
+ # stack info
+ fi = inspect.stack()[level]
+ filename = fi.filename
+ func = fi.function
+ lineno = fi.lineno
+ # filename without path
+ if not include_path:
+ i = filename.rfind('/')
+ if i > 0:
+ filename = filename[i+1:]
+
+ # datetime
+ dt = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
+ dt = dt + time_str
+ dt = time_str # logger already shows datetime
+
+ # current thread
+ if include_thread:
+ thread_name = 'th:{}'.format(threading.current_thread().getName())
+ else:
+ thread_name = ''
+
+ # current coroutine
+
+ coroutine_id = ''
+ if include_coroutine:
+ try:
+ if asyncio.Task.current_task() is not None:
+ def print_cor_name(c):
+ import inspect
+ try:
+ for m in inspect.getmembers(c):
+ if m[0] == '__name__':
+ return m[1]
+ except Exception:
+ pass
+ coro = asyncio.Task.current_task()._coro
+ coroutine_id = 'coro-{} {}()'.format(hex(id(coro))[2:], print_cor_name(coro))
+ except Exception:
+ coroutine_id = ''
+
+ # classname
+ if obj is not None:
+ obj_type = obj.__class__.__name__ # type: str
+ log_msg = \
+ '{} {} {} {} {}::{}.{}():{}\n{}'\
+ .format(self._prefix, dt, thread_name, coroutine_id, filename, obj_type, func, lineno, str(msg))
+ else:
+ log_msg = \
+ '{} {} {} {} {}::{}():{}\n{}'\
+ .format(self._prefix, dt, thread_name, coroutine_id, filename, func, lineno, str(msg))
+
+ return log_msg