# blob: 8fd8a7dc26a96c23ae9b3ca5ccc70d38f690a262 [file] [log] [blame]
##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##
import abc
import asyncio
import inspect
import time
from typing import Tuple, Union

from n2vc.loggable import Loggable
class K8sConnector(abc.ABC, Loggable):
    """
    Abstract interface that the OSM K8s connectors implement.

    The abstract coroutines declared here cover cluster preparation, repository
    management and the KDU life cycle (install, upgrade, scale, rollback,
    uninstall, inspection). The base class itself only keeps the database handle
    and the optional update callback, and provides ``write_app_status_to_db``.
    """

    ################################################################################
    ################################# P U B L I C ##################################
    ################################################################################

    @staticmethod
    def generate_kdu_instance_name(**kwargs):
        """
        Hook for producing a KDU instance name (as the method name suggests).

        The base class offers no implementation.

        :param kwargs: accepted but not used here
        :raises NotImplementedError: always
        """
        raise NotImplementedError("Method not implemented")

    def __init__(self, db: object, log: object = None, on_update_db=None):
        """
        :param db: database object to write current operation status
        :param log: logger for tracing
        :param on_update_db: callback called when k8s connector updates database
        """

        # parent class
        Loggable.__init__(self, log=log, log_to_console=True, prefix="\nK8S")

        # the database and update callback
        self.db = db
        self.on_update_db = on_update_db

    @abc.abstractmethod
    async def init_env(
        self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
    ) -> Tuple[str, bool]:
        """
        It prepares a given K8s cluster environment to run Charts or juju Bundles on
        both sides:
            client (OSM)
            server (Tiller/Charm)

        :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
            '.kube/config'
        :param namespace: optional namespace to be used for the K8s engine (helm
            tiller, juju). By default, 'kube-system' will be used
        :param reuse_cluster_uuid: existing cluster uuid for reuse
        :return: uuid of the K8s cluster and True if connector has installed some
            software in the cluster (on error, an exception will be raised)
        """

    @abc.abstractmethod
    async def repo_add(
        self,
        cluster_uuid: str,
        name: str,
        url: str,
        repo_type: str = "chart",
        cert: str = None,
        user: str = None,
        password: str = None,
    ):
        """
        Add a new repository to OSM database

        :param cluster_uuid: the cluster
        :param name: name for the repo in OSM
        :param url: URL of the repo
        :param repo_type: either "chart" or "bundle"
        :param cert: TODO(review): not described by the interface; presumably a
            certificate used to access the repo -- confirm against implementations
        :param user: TODO(review): not described by the interface; presumably the
            user name for an authenticated repo -- confirm against implementations
        :param password: TODO(review): not described by the interface; presumably
            the password for ``user`` -- confirm against implementations
        :return: True if successful
        """

    @abc.abstractmethod
    async def repo_list(self, cluster_uuid: str):
        """
        Get the list of registered repositories

        :param cluster_uuid: the cluster
        :return: list of registered repositories: [ (name, url) .... ]
        """

    @abc.abstractmethod
    async def repo_remove(self, cluster_uuid: str, name: str):
        """
        Remove a repository from OSM

        :param name: repo name in OSM
        :param cluster_uuid: the cluster
        :return: True if successful
        """

    @abc.abstractmethod
    async def synchronize_repos(self, cluster_uuid: str, name: str):
        """
        Synchronizes the list of repositories created in the cluster with
        the repositories added by the NBI

        :param cluster_uuid: the cluster
        :param name: TODO(review): not described by the interface; confirm its
            meaning against the implementations
        :return: List of repositories deleted from the cluster and dictionary with
            repos added
        """

    @abc.abstractmethod
    async def reset(
        self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
    ) -> bool:
        """
        Uninstalls Tiller/Charm from a known K8s cluster and removes it from the list
        of known K8s clusters. Intended to be used e.g. when the NS instance is deleted.

        :param cluster_uuid: UUID of a K8s cluster known by OSM.
        :param force: force deletion, even in case there are deployed releases
        :param uninstall_sw: flag to indicate that the software installed in the
            cluster (Tiller/Charm) must be uninstalled
        :return: bool, expected to be True if successful (TODO confirm against
            implementations)
        """

    @abc.abstractmethod
    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        kdu_instance: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
    ):
        """
        Deploys a new KDU instance. It would implicitly rely on the `install` call
        to deploy the Chart/Bundle properly parametrized (in practice, this call would
        happen before any _initial-config-primitive_ of the VNF is called).

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_model: chart/bundle:version reference (string), which can be either
            of these options:
            - a name of chart/bundle available via the repos known by OSM
            - a path to a packaged chart/bundle
            - a path to an unpacked chart/bundle directory or a URL
        :param kdu_instance: Kdu instance name
        :param atomic: If set, installation process purges chart/bundle on fail, also
            will wait until all the K8s objects are active
        :param timeout: Time in seconds to wait for the install of the chart/bundle
            (defaults to Helm default timeout: 300s)
        :param params: dictionary of key-value pairs for instantiation parameters
            (overriding default values)
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with {collection: <str>, filter: {},
            path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
        :param kdu_name: Name of the KDU instance to be installed
        :param namespace: K8s namespace to use for the KDU instance
        :return: True if successful
        """

    @abc.abstractmethod
    async def upgrade(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        kdu_model: str = None,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        reset_values: bool = False,
        reuse_values: bool = True,
        reset_then_reuse_values: bool = False,
        force: bool = False,
    ):
        """
        Upgrades an existing KDU instance. It would implicitly use the `upgrade` call
        over an existing Chart/Bundle. It can be used both to upgrade the chart or to
        reconfigure it. This would be exposed as Day-2 primitive.

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_instance: unique name for the KDU instance to be updated
        :param kdu_model: new chart/bundle:version reference
        :param atomic: rollback in case of fail and wait until pods and services are
            available
        :param timeout: Time in seconds to wait for the install of the chart/bundle
            (defaults to Helm default timeout: 300s)
        :param params: new dictionary of key-value pairs for instantiation parameters
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with {collection: <str>, filter: {},
            path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
        :param reset_values: force resetting values
        :param reuse_values: force reusing values (default)
        :param reset_then_reuse_values: forces resetting values, then applies the
            last release's values
        :param force: force recreation of resources if necessary
        :return: reference to the new revision number of the KDU instance
        """

    @abc.abstractmethod
    async def scale(
        self,
        kdu_instance: str,
        scale: int,
        resource_name: str,
        total_timeout: float = 1800,
        cluster_uuid: str = None,
        kdu_model: str = None,
        atomic: bool = True,
        db_dict: dict = None,
        **kwargs,
    ) -> bool:
        """Scale a resource in a KDU instance.

        Args:
            kdu_instance: KDU instance name
            scale: Scale to which to set the resource
            resource_name: Resource name
            total_timeout: The time, in seconds, to wait for the install
                to finish
            cluster_uuid: The UUID of the cluster
            kdu_model: The chart/bundle reference
            atomic: if set, upgrade process rolls back changes made in case of
                failed upgrade. The --wait flag will be set automatically if
                --atomic is used
            db_dict: Dictionary for any additional data
            kwargs: Additional parameters
                vca_id (str): VCA ID

        Returns:
            True if successful, False otherwise
        """

    @abc.abstractmethod
    async def get_scale_count(
        self,
        resource_name: str,
        kdu_instance: str,
        cluster_uuid: str,
        kdu_model: str,
        timeout: float = 300,
        **kwargs,
    ) -> int:
        """Get a resource scale count in a KDU instance.

        Args:
            resource_name: Resource name
            kdu_instance: KDU instance name
            cluster_uuid: The UUID of the cluster
            kdu_model: chart/bundle reference
            timeout: The time, in seconds, to wait
            kwargs: Additional parameters

        Returns:
            Resource instance count
        """

    @abc.abstractmethod
    async def rollback(
        self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
    ):
        """
        Rolls back a previous update of a KDU instance. It would implicitly use the
        `rollback` call. It can be used both to rollback from a Chart/Bundle version
        update or from a reconfiguration. This would be exposed as Day-2 primitive.

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_instance: unique name for the KDU instance
        :param revision: revision to which revert changes. If omitted, it will revert
            the last update only
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with {collection: <str>, filter: {},
            path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
        :return: If successful, reference to the current active revision of the KDU
            instance after the rollback
        """

    @abc.abstractmethod
    async def uninstall(self, cluster_uuid: str, kdu_instance: str):
        """
        Removes an existing KDU instance. It would implicitly use the `delete` call
        (this call would happen after all _terminate-config-primitive_ of the VNF are
        invoked).

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_instance: unique name for the KDU instance to be deleted
        :return: True if successful
        """

    @abc.abstractmethod
    async def exec_primitive(
        self,
        cluster_uuid: str = None,
        kdu_instance: str = None,
        primitive_name: str = None,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
    ) -> str:
        """Exec primitive (Juju action)

        :param cluster_uuid str: The UUID of the cluster
        :param kdu_instance str: The unique name of the KDU instance
        :param primitive_name: Name of action that will be executed
        :param timeout: Timeout for action execution
        :param params: Dictionary of all the parameters needed for the action
        :param db_dict: Dictionary for any additional data

        :return: Returns the output of the action
        """

    @abc.abstractmethod
    async def upgrade_charm(
        self,
        ee_id: str = None,
        path: str = None,
        charm_id: str = None,
        charm_type: str = None,
        timeout: float = None,
    ) -> str:
        """This method upgrades charms in VNFs

        Args:
            ee_id: Execution environment id
            path: Local path to the charm
            charm_id: charm-id
            charm_type: Charm type can be lxc-proxy-charm, native-charm or
                k8s-proxy-charm
            timeout: (Float) Timeout for the ns update operation

        Returns:
            The output of the update operation if status equals to "completed"
        """

    @abc.abstractmethod
    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
        """
        These calls will retrieve from the Chart/Bundle:

            - The list of configurable values and their defaults (e.g. in Charts,
                it would retrieve the contents of `values.yaml`).
            - If available, any embedded help file (e.g. `readme.md`) embedded in the
                Chart/Bundle.

        :param kdu_model: chart/bundle reference
        :param repo_url: optional, repository URL (None if tar.gz, URL in other cases,
            even stable URL)
        :return:
            If successful, it will return the available parameters and their default
            values as provided by the backend.
        """

    @abc.abstractmethod
    async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
        """
        :param kdu_model: chart/bundle reference
        :param repo_url: optional, repository URL (None if tar.gz, URL in other cases,
            even stable URL)
        :return: If successful, it will return the contents of the 'readme.md'
        """

    @abc.abstractmethod
    async def status_kdu(
        self, cluster_uuid: str, kdu_instance: str, yaml_format: str
    ) -> Union[str, dict]:
        """
        This call would retrieve the current state of a given KDU instance. It
        would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
        values_ of the configuration parameters applied to a given instance. This call
        would be based on the `status` call.

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_instance: unique name for the KDU instance
        :param yaml_format: if the return shall be returned as a YAML string or as a
            dictionary
        :return: If successful, it will return the following vector of arguments:
        - K8s `namespace` in the cluster where the KDU lives
        - `state` of the KDU instance. It can be:
              - UNKNOWN
              - DEPLOYED
              - DELETED
              - SUPERSEDED
              - FAILED or
              - DELETING
        - List of `resources` (objects) that this release consists of, sorted by kind,
          and the status of those resources
        - Last `deployment_time`.
        """

    @abc.abstractmethod
    async def get_services(
        self, cluster_uuid: str, kdu_instance: str, namespace: str
    ) -> list:
        """
        Returns a list of services defined for the specified kdu instance.

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_instance: unique name for the KDU instance
        :param namespace: K8s namespace used by the KDU instance
        :return: If successful, it will return a list of services. Each service
        can have the following data:
        - `name` of the service
        - `type` type of service in the k8 cluster
        - `ports` List of ports offered by the service, for each port includes at least
        name, port, protocol
        - `cluster_ip` Internal ip to be used inside k8s cluster
        - `external_ip` List of external ips (in case they are available)
        """

    @abc.abstractmethod
    async def get_service(
        self, cluster_uuid: str, service_name: str, namespace: str = None
    ) -> object:
        """
        Obtains the data of the specified service in the k8s cluster.

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param service_name: name of the K8s service in the specified namespace
        :param namespace: K8s namespace used by the KDU instance
        :return: If successful, it will return the data of the service, which can
        include:
        - `name` of the service
        - `type` type of service in the k8 cluster
        - `ports` List of ports offered by the service, for each port includes at least
        name, port, protocol
        - `cluster_ip` Internal ip to be used inside k8s cluster
        - `external_ip` List of external ips (in case they are available)
        """

    ################################################################################
    ################################ P R I V A T E #################################
    ################################################################################

    async def write_app_status_to_db(
        self, db_dict: dict, status: str, detailed_status: str, operation: str
    ) -> bool:
        """
        This method will write the status of the application to the database.

        The fields written under ``path`` are "operation", "status",
        "detailed-status" and "status-time" (the current ``time.time()`` as a
        string). After the write, the ``on_update_db`` callback, if any, is invoked
        with (collection, filter, path, update_dict); when the callback returns an
        awaitable, it is awaited.

        :param db_dict: A dictionary with the database necessary information. It
            shall contain the values for the keys:
            - "collection": The Mongo DB collection to write to
            - "filter": The query filter to use in the update process
            - "path": The dot separated keys which targets the object to be updated
        :param status: Status of the application
        :param detailed_status: Detailed status of the application
        :param operation: Operation that is being performed on the application
        :return: True if successful. False when there is no database, when
            ``db_dict`` is empty, or when any exception is raised while writing or
            while running the callback (the exception is logged, not propagated).
        """

        if not self.db:
            self.warning("No db => No database write")
            return False

        if not db_dict:
            self.warning("No db_dict => No database write")
            return False

        self.log.debug("status={}".format(status))

        try:
            the_table = db_dict["collection"]
            the_filter = db_dict["filter"]
            the_path = db_dict["path"]
            # The field names are appended to the path, so it must end with a dot.
            # An empty path raises IndexError here and is reported by the handler
            # below as a failed write.
            if not the_path[-1] == ".":
                the_path = the_path + "."
            update_dict = {
                the_path + "operation": operation,
                the_path + "status": status,
                the_path + "detailed-status": detailed_status,
                the_path + "status-time": str(time.time()),
            }

            self.db.set_one(
                table=the_table,
                q_filter=the_filter,
                update_dict=update_dict,
                fail_on_empty=True,
            )

            # database callback
            if self.on_update_db:
                # Call first and inspect the result instead of inspecting the
                # callable: this also awaits async callables that are not plain
                # coroutine functions (e.g. objects with an async __call__), which
                # would otherwise be invoked but never awaited.
                outcome = self.on_update_db(
                    the_table, the_filter, the_path, update_dict
                )
                if inspect.isawaitable(outcome):
                    await outcome

            return True

        except Exception as e:
            # Best effort: a failed status write must not break the operation in
            # progress, so it is logged and signalled through the return value.
            self.log.info("Exception writing status to database: {}".format(e))
            return False