# contact with: nfvlabs@tid.es
##
-import asyncio
-from n2vc.loggable import Loggable
import abc
+import asyncio
import time
+from n2vc.loggable import Loggable
-class K8sConnector(abc.ABC, Loggable):
+class K8sConnector(abc.ABC, Loggable):
"""
- ##################################################################################################
- ########################################## P U B L I C ###########################################
- ##################################################################################################
+ ####################################################################################
+ ################################### P U B L I C ####################################
+ ####################################################################################
"""
- def __init__(
- self,
- db: object,
- log: object = None,
- on_update_db=None
- ):
+ @staticmethod
+ def generate_kdu_instance_name(**kwargs):
+ raise NotImplementedError("Method not implemented")
+
+ def __init__(self, db: object, log: object = None, on_update_db=None):
"""
:param db: database object to write current operation status
"""
# parent class
- Loggable.__init__(self, log=log, log_to_console=True, prefix='\nK8S')
+ Loggable.__init__(self, log=log, log_to_console=True, prefix="\nK8S")
- self.info('Initializing generic K8S connector')
+ # self.log.info('Initializing generic K8S connector')
# the database and update callback
self.db = db
self.on_update_db = on_update_db
- self.info('K8S generic connector initialized')
+ # self.log.info('K8S generic connector initialized')
@abc.abstractmethod
async def init_env(
- self,
- k8s_creds: str,
- namespace: str = 'kube-system',
- reuse_cluster_uuid = None
+ self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
) -> (str, bool):
"""
- It prepares a given K8s cluster environment to run Charts or juju Bundles on both sides:
+ It prepares a given K8s cluster environment to run Charts or juju Bundles on
+ both sides:
client (OSM)
server (Tiller/Charm)
- :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config'
- :param namespace: optional namespace for helm tiller. By default, 'kube-system' will be used
+ :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
+ '.kube/config'
+ :param namespace: optional namespace to be used for the K8s engine (helm
+ tiller, juju). By default, 'kube-system' will be used
:param reuse_cluster_uuid: existing cluster uuid for reuse
- :return: uuid of the K8s cluster and True if connector has installed some software in the cluster
- (on error, an exception will be raised)
+ :return: uuid of the K8s cluster and True if connector has installed some
+ software in the cluster (on error, an exception will be raised)
"""
@abc.abstractmethod
async def repo_add(
- self,
- cluster_uuid: str,
- name: str,
- url: str,
- repo_type: str = 'chart'
+ self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
):
"""
Add a new repository to OSM database
"""
@abc.abstractmethod
- async def repo_list(
- self,
- cluster_uuid: str
- ):
+ async def repo_list(self, cluster_uuid: str):
"""
Get the list of registered repositories
"""
@abc.abstractmethod
- async def repo_remove(
- self,
- cluster_uuid: str,
- name: str
- ):
+ async def repo_remove(self, cluster_uuid: str, name: str):
"""
Remove a repository from OSM
:return: True if successful
"""
+ @abc.abstractmethod
+ async def synchronize_repos(self, cluster_uuid: str, name: str):
+ """
+ Synchronizes the list of repositories created in the cluster with
+ the repositories added by the NBI
+
+ :param cluster_uuid: the cluster
+ :return: List of repositories deleted from the cluster and dictionary with
+ repos added
+ """
+
@abc.abstractmethod
async def reset(
- self,
- cluster_uuid: str,
- force: bool = False,
- uninstall_sw: bool = False
+ self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
) -> bool:
"""
- Uninstalls Tiller/Charm from a known K8s cluster and removes it from the list of known K8s clusters.
- Intended to be used e.g. when the NS instance is deleted.
+ Uninstalls Tiller/Charm from a known K8s cluster and removes it from the list
+ of known K8s clusters. Intended to be used e.g. when the NS instance is deleted.
:param cluster_uuid: UUID of a K8s cluster known by OSM.
:param force: force deletion, even in case there are deployed releases
- :param uninstall_sw: flag to indicate that sw uninstallation from software is needed
+ :param uninstall_sw: flag to indicate that sw uninstallation from software is
+ needed
:return: str: kdu_instance generated by helm
"""
@abc.abstractmethod
async def install(
- self,
- cluster_uuid: str,
- kdu_model: str,
- atomic: bool = True,
- timeout: float = 300,
- params: dict = None,
- db_dict: dict = None
+ self,
+ cluster_uuid: str,
+ kdu_model: str,
+ kdu_instance: str,
+ atomic: bool = True,
+ timeout: float = 300,
+ params: dict = None,
+ db_dict: dict = None,
+ kdu_name: str = None,
+ namespace: str = None,
):
"""
- Deploys of a new KDU instance. It would implicitly rely on the `install` call to deploy the Chart/Bundle
- properly parametrized (in practice, this call would happen before any _initial-config-primitive_
- of the VNF is called).
+ Deploys of a new KDU instance. It would implicitly rely on the `install` call
+ to deploy the Chart/Bundle properly parametrized (in practice, this call would
+ happen before any _initial-config-primitive_of the VNF is called).
:param cluster_uuid: UUID of a K8s cluster known by OSM
- :param kdu_model: chart/bundle:version reference (string), which can be either of these options:
+ :param kdu_model: chart/bundle:version reference (string), which can be either
+ of these options:
- a name of chart/bundle available via the repos known by OSM
- a path to a packaged chart/bundle
- a path to an unpacked chart/bundle directory or a URL
- :param atomic: If set, installation process purges chart/bundle on fail, also will wait until
- all the K8s objects are active
- :param timeout: Time in seconds to wait for the install of the chart/bundle (defaults to
- Helm default timeout: 300s)
- :param params: dictionary of key-value pairs for instantiation parameters (overriding default values)
+ :param kdu_instance: Kdu instance name
+ :param atomic: If set, installation process purges chart/bundle on fail, also
+ will wait until all the K8s objects are active
+ :param timeout: Time in seconds to wait for the install of the chart/bundle
+ (defaults to Helm default timeout: 300s)
+ :param params: dictionary of key-value pairs for instantiation parameters
+ (overriding default values)
:param dict db_dict: where to write into database when the status changes.
- It contains a dict with {collection: <str>, filter: {}, path: <str>},
- e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
+ It contains a dict with {collection: <str>, filter: {},
+ path: <str>},
+ e.g. {collection: "nsrs", filter:
+ {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
+ :param kdu_name: Name of the KDU instance to be installed
+ :param namespace: K8s namespace to use for the KDU instance
:return: True if successful
"""
@abc.abstractmethod
async def upgrade(
- self,
- cluster_uuid: str,
- kdu_instance: str,
- kdu_model: str = None,
- atomic: bool = True,
- timeout: float = 300,
- params: dict = None,
- db_dict: dict = None
+ self,
+ cluster_uuid: str,
+ kdu_instance: str,
+ kdu_model: str = None,
+ atomic: bool = True,
+ timeout: float = 300,
+ params: dict = None,
+ db_dict: dict = None,
):
"""
- Upgrades an existing KDU instance. It would implicitly use the `upgrade` call over an existing Chart/Bundle.
- It can be used both to upgrade the chart or to reconfigure it. This would be exposed as Day-2 primitive.
+ Upgrades an existing KDU instance. It would implicitly use the `upgrade` call
+ over an existing Chart/Bundle. It can be used both to upgrade the chart or to
+ reconfigure it. This would be exposed as Day-2 primitive.
:param cluster_uuid: UUID of a K8s cluster known by OSM
:param kdu_instance: unique name for the KDU instance to be updated
:param kdu_model: new chart/bundle:version reference
- :param atomic: rollback in case of fail and wait for pods and services are available
- :param timeout: Time in seconds to wait for the install of the chart/bundle (defaults to
- Helm default timeout: 300s)
+ :param atomic: rollback in case of fail and wait for pods and services are
+ available
+ :param timeout: Time in seconds to wait for the install of the chart/bundle
+ (defaults to Helm default timeout: 300s)
:param params: new dictionary of key-value pairs for instantiation parameters
:param dict db_dict: where to write into database when the status changes.
- It contains a dict with {collection: <str>, filter: {}, path: <str>},
- e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
+ It contains a dict with {collection: <str>, filter: {},
+ path: <str>},
+ e.g. {collection: "nsrs", filter:
+ {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
:return: reference to the new revision number of the KDU instance
"""
+ @abc.abstractmethod
+ async def scale(
+ self,
+ kdu_instance: str,
+ scale: int,
+ resource_name: str,
+ total_timeout: float = 1800,
+ **kwargs,
+ ) -> bool:
+ """
+ Scales an application in KDU instance.
+
+ :param: kdu_instance str: KDU instance name
+ :param: scale int: Scale to which to set this application
+ :param: resource_name str: Resource name (Application name)
+ :param: timeout float: The time, in seconds, to wait for the install
+ to finish
+ :param kwargs: Additional parameters
+
+ :return: If successful, returns True
+ """
+
+ @abc.abstractmethod
+ async def get_scale_count(
+ self,
+ resource_name: str,
+ kdu_instance: str,
+ **kwargs,
+ ) -> int:
+ """
+ Get an application scale count.
+
+ :param: resource_name str: Resource name (Application name)
+ :param: kdu_instance str: KDU instance name
+ :param kwargs: Additional parameters
+
+ :return: Return application instance count
+ """
+
@abc.abstractmethod
async def rollback(
- self,
- cluster_uuid: str,
- kdu_instance: str,
- revision=0,
- db_dict: dict = None
+ self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
):
"""
- Rolls back a previous update of a KDU instance. It would implicitly use the `rollback` call.
- It can be used both to rollback from a Chart/Bundle version update or from a reconfiguration.
- This would be exposed as Day-2 primitive.
+ Rolls back a previous update of a KDU instance. It would implicitly use the
+ `rollback` call. It can be used both to rollback from a Chart/Bundle version
+ update or from a reconfiguration. This would be exposed as Day-2 primitive.
:param cluster_uuid: UUID of a K8s cluster known by OSM
:param kdu_instance: unique name for the KDU instance
- :param revision: revision to which revert changes. If omitted, it will revert the last update only
+ :param revision: revision to which revert changes. If omitted, it will revert
+ the last update only
:param dict db_dict: where to write into database when the status changes.
- It contains a dict with {collection: <str>, filter: {}, path: <str>},
- e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
- :return:If successful, reference to the current active revision of the KDU instance after the rollback
+ It contains a dict with {collection: <str>, filter: {},
+ path: <str>},
+ e.g. {collection: "nsrs", filter:
+ {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
+ :return:If successful, reference to the current active revision of the KDU
+ instance after the rollback
"""
@abc.abstractmethod
- async def uninstall(
- self,
- cluster_uuid: str,
- kdu_instance: str
- ):
+ async def uninstall(self, cluster_uuid: str, kdu_instance: str):
"""
- Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen
- after all _terminate-config-primitive_ of the VNF are invoked).
+ Removes an existing KDU instance. It would implicitly use the `delete` call
+ (this call would happen after all _terminate-config-primitive_ of the VNF are
+ invoked).
:param cluster_uuid: UUID of a K8s cluster known by OSM
:param kdu_instance: unique name for the KDU instance to be deleted
"""
@abc.abstractmethod
- async def inspect_kdu(
- self,
- kdu_model: str
+ async def exec_primitive(
+ self,
+ cluster_uuid: str = None,
+ kdu_instance: str = None,
+ primitive_name: str = None,
+ timeout: float = 300,
+ params: dict = None,
+ db_dict: dict = None,
) -> str:
+ """Exec primitive (Juju action)
+
+ :param cluster_uuid str: The UUID of the cluster
+ :param kdu_instance str: The unique name of the KDU instance
+ :param primitive_name: Name of action that will be executed
+ :param timeout: Timeout for action execution
+ :param params: Dictionary of all the parameters needed for the action
+ :db_dict: Dictionary for any additional data
+
+ :return: Returns the output of the action
"""
- These calls will retrieve from the Charm/Bundle:
- - The list of configurable values and their defaults (e.g. in Charts, it would retrieve
- the contents of `values.yaml`).
- - If available, any embedded help file (e.g. `readme.md`) embedded in the Chart/Bundle.
+ @abc.abstractmethod
+ async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
+ """
+ These calls will retrieve from the Chart/Bundle:
+
+ - The list of configurable values and their defaults (e.g. in Charts,
+ it would retrieve the contents of `values.yaml`).
+ - If available, any embedded help file (e.g. `readme.md`) embedded in the
+ Chart/Bundle.
- :param cluster_uuid: the cluster to get the information
:param kdu_model: chart/bundle reference
- :return: If successful, it will return a dictionary containing the list of available parameters
- and their default values
+ :param repo_url: optional, reposotory URL (None if tar.gz, URl in other cases,
+ even stable URL)
+ :return:
+
+ If successful, it will return the available parameters and their default values
+ as provided by the backend.
"""
@abc.abstractmethod
- async def help_kdu(
- self,
- kdu_model: str
- ) -> str:
+ async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
"""
- :param cluster_uuid: the cluster to get the information
:param kdu_model: chart/bundle reference
+ :param repo_url: optional, reposotory URL (None if tar.gz, URl in other cases,
+ even stable URL)
:return: If successful, it will return the contents of the 'readme.md'
"""
@abc.abstractmethod
- async def status_kdu(
- self,
- cluster_uuid: str,
- kdu_instance: str
- ) -> str:
+ async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:
"""
- This call would retrieve tha current state of a given KDU instance. It would be would allow to retrieve
- the _composition_ (i.e. K8s objects) and _specific values_ of the configuration parameters applied
- to a given instance. This call would be based on the `status` call.
+ This call would retrieve tha current state of a given KDU instance. It would be
+ would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
+ values_ of the configuration parameters applied to a given instance. This call
+ would be based on the `status` call.
:param cluster_uuid: UUID of a K8s cluster known by OSM
:param kdu_instance: unique name for the KDU instance
- SUPERSEDED
- FAILED or
- DELETING
- - List of `resources` (objects) that this release consists of, sorted by kind, and the status of those resources
+ - List of `resources` (objects) that this release consists of, sorted by kind,
+ and the status of those resources
- Last `deployment_time`.
"""
+ @abc.abstractmethod
+ async def get_services(
+ self, cluster_uuid: str, kdu_instance: str, namespace: str
+ ) -> list:
+ """
+ Returns a list of services defined for the specified kdu instance.
+
+ :param cluster_uuid: UUID of a K8s cluster known by OSM
+ :param kdu_instance: unique name for the KDU instance
+ :param namespace: K8s namespace used by the KDU instance
+ :return: If successful, it will return a list of services, Each service
+ can have the following data:
+ - `name` of the service
+ - `type` type of service in the k8 cluster
+ - `ports` List of ports offered by the service, for each port includes at least
+ name, port, protocol
+ - `cluster_ip` Internal ip to be used inside k8s cluster
+ - `external_ip` List of external ips (in case they are available)
+ """
+
+ @abc.abstractmethod
+ async def get_service(
+ self, cluster_uuid: str, service_name: str, namespace: str = None
+ ) -> object:
+ """
+ Obtains the data of the specified service in the k8cluster.
+
+ :param cluster_uuid: UUID of a K8s cluster known by OSM
+ :param service_name: name of the K8s service in the specified namespace
+ :param namespace: K8s namespace used by the KDU instance
+ :return: If successful, it will return a list of services, Each service can have
+ the following data:
+ - `name` of the service
+ - `type` type of service in the k8 cluster
+ - `ports` List of ports offered by the service, for each port includes at least
+ name, port, protocol
+ - `cluster_ip` Internal ip to be used inside k8s cluster
+ - `external_ip` List of external ips (in case they are available)
+ """
+
"""
- ##################################################################################################
- ########################################## P R I V A T E #########################################
- ##################################################################################################
+ ####################################################################################
+ ################################### P R I V A T E ##################################
+ ####################################################################################
"""
async def write_app_status_to_db(
- self,
- db_dict: dict,
- status: str,
- detailed_status: str,
- operation: str
+ self, db_dict: dict, status: str, detailed_status: str, operation: str
) -> bool:
if not self.db:
- self.warning('No db => No database write')
+ self.warning("No db => No database write")
return False
if not db_dict:
- self.warning('No db_dict => No database write')
+ self.warning("No db_dict => No database write")
return False
- self.debug('status={}'.format(status))
+ self.log.debug("status={}".format(status))
try:
- the_table = db_dict['collection']
- the_filter = db_dict['filter']
- the_path = db_dict['path']
- if not the_path[-1] == '.':
- the_path = the_path + '.'
+ the_table = db_dict["collection"]
+ the_filter = db_dict["filter"]
+ the_path = db_dict["path"]
+ if not the_path[-1] == ".":
+ the_path = the_path + "."
update_dict = {
- the_path + 'operation': operation,
- the_path + 'status': status,
- the_path + 'detailed-status': detailed_status,
- the_path + 'status-time': str(time.time()),
+ the_path + "operation": operation,
+ the_path + "status": status,
+ the_path + "detailed-status": detailed_status,
+ the_path + "status-time": str(time.time()),
}
self.db.set_one(
table=the_table,
q_filter=the_filter,
update_dict=update_dict,
- fail_on_empty=True
+ fail_on_empty=True,
)
# database callback
if self.on_update_db:
if asyncio.iscoroutinefunction(self.on_update_db):
- await self.on_update_db(the_table, the_filter, the_path, update_dict)
+ await self.on_update_db(
+ the_table, the_filter, the_path, update_dict
+ )
else:
self.on_update_db(the_table, the_filter, the_path, update_dict)
return True
except Exception as e:
- self.info('Exception writing status to database: {}'.format(e))
+ self.log.info("Exception writing status to database: {}".format(e))
return False