X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fk8s_conn.py;h=5bdc8acb2c6e804205d50510d612dafae9116be3;hp=11fb43e338a7243bdbc79647408fe9473485f969;hb=2c791b34626ff76ab1886a110599998f9de0df80;hpb=43c2e79e783aa65dba1234ae4c299944c5f1702e diff --git a/n2vc/k8s_conn.py b/n2vc/k8s_conn.py index 11fb43e..5bdc8ac 100644 --- a/n2vc/k8s_conn.py +++ b/n2vc/k8s_conn.py @@ -20,26 +20,21 @@ # contact with: nfvlabs@tid.es ## -import asyncio -from n2vc.loggable import Loggable import abc +import asyncio import time +from n2vc.loggable import Loggable -class K8sConnector(abc.ABC, Loggable): +class K8sConnector(abc.ABC, Loggable): """ - ################################################################################################## - ########################################## P U B L I C ########################################### - ################################################################################################## + #################################################################################### + ################################### P U B L I C #################################### + #################################################################################### """ - def __init__( - self, - db: object, - log: object = None, - on_update_db=None - ): + def __init__(self, db: object, log: object = None, on_update_db=None): """ :param db: database object to write current operation status @@ -48,42 +43,38 @@ class K8sConnector(abc.ABC, Loggable): """ # parent class - Loggable.__init__(self, log=log, log_to_console=True, prefix='\nK8S') + Loggable.__init__(self, log=log, log_to_console=True, prefix="\nK8S") - self.info('Initializing generic K8S connector') + # self.log.info('Initializing generic K8S connector') # the database and update callback self.db = db self.on_update_db = on_update_db - self.info('K8S generic connector initialized') + # self.log.info('K8S generic connector initialized') @abc.abstractmethod async def init_env( - self, - k8s_creds: str, - namespace: str = 'kube-system', - reuse_cluster_uuid=None + self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None ) -> (str, bool): """ - It prepares a given K8s cluster environment to run Charts or juju Bundles on both sides: + It prepares a given K8s cluster environment to run Charts or juju Bundles on + both sides: client (OSM) server (Tiller/Charm) - :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config' - :param namespace: optional namespace for helm tiller. By default, 'kube-system' will be used + :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid + '.kube/config' + :param namespace: optional namespace to be used for the K8s engine (helm + tiller, juju). By default, 'kube-system' will be used :param reuse_cluster_uuid: existing cluster uuid for reuse - :return: uuid of the K8s cluster and True if connector has installed some software in the cluster - (on error, an exception will be raised) + :return: uuid of the K8s cluster and True if connector has installed some + software in the cluster (on error, an exception will be raised) """ @abc.abstractmethod async def repo_add( - self, - cluster_uuid: str, - name: str, - url: str, - repo_type: str = 'chart' + self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart" ): """ Add a new repository to OSM database @@ -96,10 +87,7 @@ class K8sConnector(abc.ABC, Loggable): """ @abc.abstractmethod - async def repo_list( - self, - cluster_uuid: str - ): + async def repo_list(self, cluster_uuid: str): """ Get the list of registered repositories @@ -108,11 +96,7 @@ class K8sConnector(abc.ABC, Loggable): """ @abc.abstractmethod - async def repo_remove( - self, - cluster_uuid: str, - name: str - ): + async def repo_remove(self, cluster_uuid: str, name: str): """ Remove a repository from OSM @@ -121,113 +105,131 @@ class K8sConnector(abc.ABC, Loggable): :return: True if successful """ + @abc.abstractmethod + async def synchronize_repos(self, cluster_uuid: str, name: str): + """ + Synchronizes the list of repositories created in the cluster with + the repositories added by the NBI + + :param cluster_uuid: the cluster + :return: List of repositories deleted from the cluster and dictionary with + repos added + """ + @abc.abstractmethod async def reset( - self, - cluster_uuid: str, - force: bool = False, - uninstall_sw: bool = False + self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False ) -> bool: """ - Uninstalls Tiller/Charm from a known K8s cluster and removes it from the list of known K8s clusters. - Intended to be used e.g. when the NS instance is deleted. + Uninstalls Tiller/Charm from a known K8s cluster and removes it from the list + of known K8s clusters. Intended to be used e.g. when the NS instance is deleted. :param cluster_uuid: UUID of a K8s cluster known by OSM. :param force: force deletion, even in case there are deployed releases - :param uninstall_sw: flag to indicate that sw uninstallation from software is needed + :param uninstall_sw: flag to indicate that sw uninstallation from software is + needed :return: str: kdu_instance generated by helm """ @abc.abstractmethod async def install( - self, - cluster_uuid: str, - kdu_model: str, - atomic: bool = True, - timeout: float = 300, - params: dict = None, - db_dict: dict = None + self, + cluster_uuid: str, + kdu_model: str, + atomic: bool = True, + timeout: float = 300, + params: dict = None, + db_dict: dict = None, + kdu_name: str = None, + namespace: str = None, ): """ - Deploys of a new KDU instance. It would implicitly rely on the `install` call to deploy the Chart/Bundle - properly parametrized (in practice, this call would happen before any _initial-config-primitive_ - of the VNF is called). + Deploys of a new KDU instance. It would implicitly rely on the `install` call + to deploy the Chart/Bundle properly parametrized (in practice, this call would + happen before any _initial-config-primitive_of the VNF is called). :param cluster_uuid: UUID of a K8s cluster known by OSM - :param kdu_model: chart/bundle:version reference (string), which can be either of these options: + :param kdu_model: chart/bundle:version reference (string), which can be either + of these options: - a name of chart/bundle available via the repos known by OSM - a path to a packaged chart/bundle - a path to an unpacked chart/bundle directory or a URL - :param atomic: If set, installation process purges chart/bundle on fail, also will wait until - all the K8s objects are active - :param timeout: Time in seconds to wait for the install of the chart/bundle (defaults to - Helm default timeout: 300s) - :param params: dictionary of key-value pairs for instantiation parameters (overriding default values) + :param atomic: If set, installation process purges chart/bundle on fail, also + will wait until all the K8s objects are active + :param timeout: Time in seconds to wait for the install of the chart/bundle + (defaults to Helm default timeout: 300s) + :param params: dictionary of key-value pairs for instantiation parameters + (overriding default values) :param dict db_dict: where to write into database when the status changes. - It contains a dict with {collection: , filter: {}, path: }, - e.g. {collection: "nsrs", filter: {_id: , path: "_admin.deployed.K8S.3"} + It contains a dict with {collection: , filter: {}, + path: }, + e.g. {collection: "nsrs", filter: + {_id: , path: "_admin.deployed.K8S.3"} + :param kdu_name: Name of the KDU instance to be installed + :param namespace: K8s namespace to use for the KDU instance :return: True if successful """ @abc.abstractmethod async def upgrade( - self, - cluster_uuid: str, - kdu_instance: str, - kdu_model: str = None, - atomic: bool = True, - timeout: float = 300, - params: dict = None, - db_dict: dict = None + self, + cluster_uuid: str, + kdu_instance: str, + kdu_model: str = None, + atomic: bool = True, + timeout: float = 300, + params: dict = None, + db_dict: dict = None, ): """ - Upgrades an existing KDU instance. It would implicitly use the `upgrade` call over an existing Chart/Bundle. - It can be used both to upgrade the chart or to reconfigure it. This would be exposed as Day-2 primitive. + Upgrades an existing KDU instance. It would implicitly use the `upgrade` call + over an existing Chart/Bundle. It can be used both to upgrade the chart or to + reconfigure it. This would be exposed as Day-2 primitive. :param cluster_uuid: UUID of a K8s cluster known by OSM :param kdu_instance: unique name for the KDU instance to be updated :param kdu_model: new chart/bundle:version reference - :param atomic: rollback in case of fail and wait for pods and services are available - :param timeout: Time in seconds to wait for the install of the chart/bundle (defaults to - Helm default timeout: 300s) + :param atomic: rollback in case of fail and wait for pods and services are + available + :param timeout: Time in seconds to wait for the install of the chart/bundle + (defaults to Helm default timeout: 300s) :param params: new dictionary of key-value pairs for instantiation parameters :param dict db_dict: where to write into database when the status changes. - It contains a dict with {collection: , filter: {}, path: }, - e.g. {collection: "nsrs", filter: {_id: , path: "_admin.deployed.K8S.3"} + It contains a dict with {collection: , filter: {}, + path: }, + e.g. {collection: "nsrs", filter: + {_id: , path: "_admin.deployed.K8S.3"} :return: reference to the new revision number of the KDU instance """ @abc.abstractmethod async def rollback( - self, - cluster_uuid: str, - kdu_instance: str, - revision=0, - db_dict: dict = None + self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None ): """ - Rolls back a previous update of a KDU instance. It would implicitly use the `rollback` call. - It can be used both to rollback from a Chart/Bundle version update or from a reconfiguration. - This would be exposed as Day-2 primitive. + Rolls back a previous update of a KDU instance. It would implicitly use the + `rollback` call. It can be used both to rollback from a Chart/Bundle version + update or from a reconfiguration. This would be exposed as Day-2 primitive. :param cluster_uuid: UUID of a K8s cluster known by OSM :param kdu_instance: unique name for the KDU instance - :param revision: revision to which revert changes. If omitted, it will revert the last update only + :param revision: revision to which revert changes. If omitted, it will revert + the last update only :param dict db_dict: where to write into database when the status changes. - It contains a dict with {collection: , filter: {}, path: }, - e.g. {collection: "nsrs", filter: {_id: , path: "_admin.deployed.K8S.3"} - :return:If successful, reference to the current active revision of the KDU instance after the rollback + It contains a dict with {collection: , filter: {}, + path: }, + e.g. {collection: "nsrs", filter: + {_id: , path: "_admin.deployed.K8S.3"} + :return:If successful, reference to the current active revision of the KDU + instance after the rollback """ @abc.abstractmethod - async def uninstall( - self, - cluster_uuid: str, - kdu_instance: str - ): + async def uninstall(self, cluster_uuid: str, kdu_instance: str): """ - Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen - after all _terminate-config-primitive_ of the VNF are invoked). + Removes an existing KDU instance. It would implicitly use the `delete` call + (this call would happen after all _terminate-config-primitive_ of the VNF are + invoked). :param cluster_uuid: UUID of a K8s cluster known by OSM :param kdu_instance: unique name for the KDU instance to be deleted @@ -235,48 +237,63 @@ class K8sConnector(abc.ABC, Loggable): """ @abc.abstractmethod - async def inspect_kdu( - self, - kdu_model: str, - repo_url: str = None + async def exec_primitive( + self, + cluster_uuid: str = None, + kdu_instance: str = None, + primitive_name: str = None, + timeout: float = 300, + params: dict = None, + db_dict: dict = None, ) -> str: + """Exec primitive (Juju action) + + :param cluster_uuid str: The UUID of the cluster + :param kdu_instance str: The unique name of the KDU instance + :param primitive_name: Name of action that will be executed + :param timeout: Timeout for action execution + :param params: Dictionary of all the parameters needed for the action + :db_dict: Dictionary for any additional data + + :return: Returns the output of the action + """ + + @abc.abstractmethod + async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str: """ These calls will retrieve from the Chart/Bundle: - - The list of configurable values and their defaults (e.g. in Charts, it would retrieve - the contents of `values.yaml`). - - If available, any embedded help file (e.g. `readme.md`) embedded in the Chart/Bundle. + - The list of configurable values and their defaults (e.g. in Charts, + it would retrieve the contents of `values.yaml`). + - If available, any embedded help file (e.g. `readme.md`) embedded in the + Chart/Bundle. :param kdu_model: chart/bundle reference - :param repo_url: optional, reposotory URL (None if tar.gz, URl in other cases, even stable URL) + :param repo_url: optional, reposotory URL (None if tar.gz, URl in other cases, + even stable URL) :return: - If successful, it will return the available parameters and their default values as provided by the backend. + If successful, it will return the available parameters and their default values + as provided by the backend. """ @abc.abstractmethod - async def help_kdu( - self, - kdu_model: str, - repo_url: str = None - ) -> str: + async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str: """ :param kdu_model: chart/bundle reference - :param repo_url: optional, reposotory URL (None if tar.gz, URl in other cases, even stable URL) + :param repo_url: optional, reposotory URL (None if tar.gz, URl in other cases, + even stable URL) :return: If successful, it will return the contents of the 'readme.md' """ @abc.abstractmethod - async def status_kdu( - self, - cluster_uuid: str, - kdu_instance: str - ) -> str: + async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str: """ - This call would retrieve tha current state of a given KDU instance. It would be would allow to retrieve - the _composition_ (i.e. K8s objects) and _specific values_ of the configuration parameters applied - to a given instance. This call would be based on the `status` call. + This call would retrieve tha current state of a given KDU instance. It would be + would allow to retrieve the _composition_ (i.e. K8s objects) and _specific + values_ of the configuration parameters applied to a given instance. This call + would be based on the `status` call. :param cluster_uuid: UUID of a K8s cluster known by OSM :param kdu_instance: unique name for the KDU instance @@ -289,65 +306,106 @@ class K8sConnector(abc.ABC, Loggable): - SUPERSEDED - FAILED or - DELETING - - List of `resources` (objects) that this release consists of, sorted by kind, and the status of those resources + - List of `resources` (objects) that this release consists of, sorted by kind, + and the status of those resources - Last `deployment_time`. """ + @abc.abstractmethod + async def get_services(self, + cluster_uuid: str, + kdu_instance: str, + namespace: str) -> list: + """ + Returns a list of services defined for the specified kdu instance. + + :param cluster_uuid: UUID of a K8s cluster known by OSM + :param kdu_instance: unique name for the KDU instance + :param namespace: K8s namespace used by the KDU instance + :return: If successful, it will return a list of services, Each service + can have the following data: + - `name` of the service + - `type` type of service in the k8 cluster + - `ports` List of ports offered by the service, for each port includes at least + name, port, protocol + - `cluster_ip` Internal ip to be used inside k8s cluster + - `external_ip` List of external ips (in case they are available) + """ + + @abc.abstractmethod + async def get_service(self, + cluster_uuid: str, + service_name: str, + namespace: str = None) -> object: + """ + Obtains the data of the specified service in the k8cluster. + + :param cluster_uuid: UUID of a K8s cluster known by OSM + :param service_name: name of the K8s service in the specified namespace + :param namespace: K8s namespace used by the KDU instance + :return: If successful, it will return a list of services, Each service can have + the following data: + - `name` of the service + - `type` type of service in the k8 cluster + - `ports` List of ports offered by the service, for each port includes at least + name, port, protocol + - `cluster_ip` Internal ip to be used inside k8s cluster + - `external_ip` List of external ips (in case they are available) + """ + """ - ################################################################################################## - ########################################## P R I V A T E ######################################### - ################################################################################################## + #################################################################################### + ################################### P R I V A T E ################################## + #################################################################################### """ async def write_app_status_to_db( - self, - db_dict: dict, - status: str, - detailed_status: str, - operation: str + self, db_dict: dict, status: str, detailed_status: str, operation: str ) -> bool: if not self.db: - self.warning('No db => No database write') + self.warning("No db => No database write") return False if not db_dict: - self.warning('No db_dict => No database write') + self.warning("No db_dict => No database write") return False - self.debug('status={}'.format(status)) + self.log.debug("status={}".format(status)) try: - the_table = db_dict['collection'] - the_filter = db_dict['filter'] - the_path = db_dict['path'] - if not the_path[-1] == '.': - the_path = the_path + '.' + the_table = db_dict["collection"] + the_filter = db_dict["filter"] + the_path = db_dict["path"] + if not the_path[-1] == ".": + the_path = the_path + "." update_dict = { - the_path + 'operation': operation, - the_path + 'status': status, - the_path + 'detailed-status': detailed_status, - the_path + 'status-time': str(time.time()), + the_path + "operation": operation, + the_path + "status": status, + the_path + "detailed-status": detailed_status, + the_path + "status-time": str(time.time()), } self.db.set_one( table=the_table, q_filter=the_filter, update_dict=update_dict, - fail_on_empty=True + fail_on_empty=True, ) # database callback if self.on_update_db: if asyncio.iscoroutinefunction(self.on_update_db): - await self.on_update_db(the_table, the_filter, the_path, update_dict) + await self.on_update_db( + the_table, the_filter, the_path, update_dict + ) else: self.on_update_db(the_table, the_filter, the_path, update_dict) return True except Exception as e: - self.info('Exception writing status to database: {}'.format(e)) + self.log.info("Exception writing status to database: {}".format(e)) return False