X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fk8s_juju_conn.py;h=24b314289756c95cdc3ff84ce7d521cc3c23511e;hp=0e9d5475c69149626ef98f47a348ada713373330;hb=d80f038732548191c8214f8387a95bd7919473f1;hpb=a71d4a04c1e8ad3ffe1a129024e6dbc14d6d3bd5 diff --git a/n2vc/k8s_juju_conn.py b/n2vc/k8s_juju_conn.py index 0e9d547..24b3142 100644 --- a/n2vc/k8s_juju_conn.py +++ b/n2vc/k8s_juju_conn.py @@ -20,14 +20,16 @@ import tempfile import binascii import base64 -from n2vc.config import ModelConfig -from n2vc.exceptions import K8sException, N2VCBadArgumentsException +from n2vc.config import EnvironConfig +from n2vc.exceptions import K8sException from n2vc.k8s_conn import K8sConnector from n2vc.kubectl import Kubectl, CORE_CLIENT, RBAC_CLIENT from .exceptions import MethodNotImplemented -from n2vc.utils import base64_to_cacert from n2vc.libjuju import Libjuju - +from n2vc.utils import obj_to_dict, obj_to_yaml +from n2vc.store import MotorStore +from n2vc.vca.cloud import Cloud +from n2vc.vca.connection import get_connection from kubernetes.client.models import ( V1ClusterRole, V1ObjectMeta, @@ -53,6 +55,8 @@ def generate_rbac_id(): class K8sJujuConnector(K8sConnector): + libjuju = None + def __init__( self, fs: object, @@ -62,7 +66,6 @@ class K8sJujuConnector(K8sConnector): log: object = None, loop: object = None, on_update_db=None, - vca_config: dict = None, ): """ :param fs: file system for kubernetes and helm configuration @@ -85,35 +88,10 @@ class K8sJujuConnector(K8sConnector): self.loop = loop or asyncio.get_event_loop() self.log.debug("Initializing K8S Juju connector") - required_vca_config = [ - "host", - "user", - "secret", - "ca_cert", - ] - if not vca_config or not all(k in vca_config for k in required_vca_config): - raise N2VCBadArgumentsException( - message="Missing arguments in vca_config: {}".format(vca_config), - bad_args=required_vca_config, - ) - port = vca_config["port"] if "port" in vca_config else 17070 - url = "{}:{}".format(vca_config["host"], port) - model_config = ModelConfig(vca_config) - username = vca_config["user"] - secret = vca_config["secret"] - ca_cert = base64_to_cacert(vca_config["ca_cert"]) - - self.libjuju = Libjuju( - endpoint=url, - api_proxy=None, # Not needed for k8s charms - model_config=model_config, - username=username, - password=secret, - cacert=ca_cert, - loop=self.loop, - log=self.log, - db=self.db, - ) + db_uri = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"]).get("database_uri") + self._store = MotorStore(db_uri) + self.loading_libjuju = asyncio.Lock(loop=self.loop) + self.log.debug("K8S Juju connector initialized") # TODO: Remove these commented lines: # self.authenticated = False @@ -127,6 +105,7 @@ class K8sJujuConnector(K8sConnector): k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid: str = None, + **kwargs, ) -> (str, bool): """ It prepares a given K8s cluster environment to run Juju bundles. @@ -136,10 +115,14 @@ class K8sJujuConnector(K8sConnector): :param namespace: optional namespace to be used for juju. By default, 'kube-system' will be used :param reuse_cluster_uuid: existing cluster uuid for reuse + :param: kwargs: Additional parameters + vca_id (str): VCA ID + :return: uuid of the K8s cluster and True if connector has installed some software in the cluster (on error, an exception will be raised) """ + libjuju = await self._get_libjuju(kwargs.get("vca_id")) cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4()) @@ -198,7 +181,7 @@ class K8sJujuConnector(K8sConnector): ) default_storage_class = kubectl.get_default_storage_class() - await self.libjuju.add_k8s( + await libjuju.add_k8s( name=cluster_uuid, rbac_id=rbac_id, token=token, @@ -247,25 +230,34 @@ class K8sJujuConnector(K8sConnector): """Reset""" async def reset( - self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False + self, + cluster_uuid: str, + force: bool = False, + uninstall_sw: bool = False, + **kwargs, ) -> bool: """Reset a cluster Resets the Kubernetes cluster by removing the model that represents it. :param cluster_uuid str: The UUID of the cluster to reset + :param force: Force reset + :param uninstall_sw: Boolean to uninstall sw + :param: kwargs: Additional parameters + vca_id (str): VCA ID + :return: Returns True if successful or raises an exception. """ try: self.log.debug("[reset] Removing k8s cloud") + libjuju = await self._get_libjuju(kwargs.get("vca_id")) - cloud_creds = await self.libjuju.get_cloud_credentials( - cluster_uuid, - self._get_credential_name(cluster_uuid), - ) + cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid)) + + cloud_creds = await libjuju.get_cloud_credentials(cloud) - await self.libjuju.remove_cloud(cluster_uuid) + await libjuju.remove_cloud(cluster_uuid) kubecfg = self.get_credentials(cluster_uuid=cluster_uuid) @@ -309,6 +301,7 @@ class K8sJujuConnector(K8sConnector): db_dict: dict = None, kdu_name: str = None, namespace: str = None, + **kwargs, ) -> bool: """Install a bundle @@ -322,9 +315,12 @@ class K8sJujuConnector(K8sConnector): :param params dict: Key-value pairs of instantiation parameters :param kdu_name: Name of the KDU instance to be installed :param namespace: K8s namespace to use for the KDU instance + :param kwargs: Additional parameters + vca_id (str): VCA ID :return: If successful, returns ? """ + libjuju = await self._get_libjuju(kwargs.get("vca_id")) bundle = kdu_model if not db_dict: @@ -346,11 +342,8 @@ class K8sJujuConnector(K8sConnector): # Create the new model self.log.debug("Adding model: {}".format(kdu_instance)) - await self.libjuju.add_model( - model_name=kdu_instance, - cloud_name=cluster_uuid, - credential_name=self._get_credential_name(cluster_uuid), - ) + cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid)) + await libjuju.add_model(kdu_instance, cloud) # if model: # TODO: Instantiation parameters @@ -369,12 +362,81 @@ class K8sJujuConnector(K8sConnector): previous_workdir = "/app/storage" self.log.debug("[install] deploying {}".format(bundle)) - await self.libjuju.deploy( + await libjuju.deploy( bundle, model_name=kdu_instance, wait=atomic, timeout=timeout ) os.chdir(previous_workdir) + if self.on_update_db: + await self.on_update_db( + cluster_uuid, + kdu_instance, + filter=db_dict["filter"], + vca_id=kwargs.get("vca_id"), + ) return True + async def scale( + self, + kdu_instance: str, + scale: int, + resource_name: str, + total_timeout: float = 1800, + **kwargs, + ) -> bool: + """Scale an application in a model + + :param: kdu_instance str: KDU instance name + :param: scale int: Scale to which to set this application + :param: resource_name str: Resource name (Application name) + :param: timeout float: The time, in seconds, to wait for the install + to finish + :param kwargs: Additional parameters + vca_id (str): VCA ID + + :return: If successful, returns True + """ + + try: + libjuju = await self._get_libjuju(kwargs.get("vca_id")) + await libjuju.scale_application( + model_name=kdu_instance, + application_name=resource_name, + scale=scale, + total_timeout=total_timeout, + ) + except Exception as e: + error_msg = "Error scaling application {} in kdu instance {}: {}".format( + resource_name, kdu_instance, e + ) + self.log.error(error_msg) + raise K8sException(message=error_msg) + return True + + async def get_scale_count( + self, + resource_name: str, + kdu_instance: str, + **kwargs, + ) -> int: + """Get an application scale count + + :param: resource_name str: Resource name (Application name) + :param: kdu_instance str: KDU instance name + :param kwargs: Additional parameters + vca_id (str): VCA ID + :return: Return application instance count + """ + try: + libjuju = await self._get_libjuju(kwargs.get("vca_id")) + status = await libjuju.get_model_status(kdu_instance) + return len(status.applications[resource_name].units) + except Exception as e: + error_msg = "Error getting scale count from application {} in kdu instance {}: {}".format( + resource_name, kdu_instance, e + ) + self.log.error(error_msg) + raise K8sException(message=error_msg) + async def instances_list(self, cluster_uuid: str) -> list: """ returns a list of deployed releases in a cluster @@ -439,18 +501,26 @@ class K8sJujuConnector(K8sConnector): """Deletion""" - async def uninstall(self, cluster_uuid: str, kdu_instance: str) -> bool: + async def uninstall( + self, + cluster_uuid: str, + kdu_instance: str, + **kwargs, + ) -> bool: """Uninstall a KDU instance :param cluster_uuid str: The UUID of the cluster :param kdu_instance str: The unique name of the KDU instance + :param kwargs: Additional parameters + vca_id (str): VCA ID :return: Returns True if successful, or raises an exception """ self.log.debug("[uninstall] Destroying model") + libjuju = await self._get_libjuju(kwargs.get("vca_id")) - await self.libjuju.destroy_model(kdu_instance, total_timeout=3600) + await libjuju.destroy_model(kdu_instance, total_timeout=3600) # self.log.debug("[uninstall] Model destroyed and disconnecting") # await controller.disconnect() @@ -469,6 +539,7 @@ class K8sJujuConnector(K8sConnector): timeout: float = 300, params: dict = None, db_dict: dict = None, + **kwargs, ) -> str: """Exec primitive (Juju action) @@ -477,10 +548,13 @@ class K8sJujuConnector(K8sConnector): :param primitive_name: Name of action that will be executed :param timeout: Timeout for action execution :param params: Dictionary of all the parameters needed for the action - :db_dict: Dictionary for any additional data + :param db_dict: Dictionary for any additional data + :param kwargs: Additional parameters + vca_id (str): VCA ID :return: Returns the output of the action """ + libjuju = await self._get_libjuju(kwargs.get("vca_id")) if not params or "application-name" not in params: raise K8sException( @@ -493,10 +567,10 @@ class K8sJujuConnector(K8sConnector): "kdu_instance: {}".format(kdu_instance) ) application_name = params["application-name"] - actions = await self.libjuju.get_actions(application_name, kdu_instance) + actions = await libjuju.get_actions(application_name, kdu_instance) if primitive_name not in actions: raise K8sException("Primitive {} not found".format(primitive_name)) - output, status = await self.libjuju.execute_action( + output, status = await libjuju.execute_action( application_name, kdu_instance, primitive_name, **params ) @@ -504,6 +578,10 @@ class K8sJujuConnector(K8sConnector): raise K8sException( "status is not completed: {} output: {}".format(status, output) ) + if self.on_update_db: + await self.on_update_db( + cluster_uuid, kdu_instance, filter=db_dict["filter"] + ) return output @@ -587,6 +665,9 @@ class K8sJujuConnector(K8sConnector): self, cluster_uuid: str, kdu_instance: str, + complete_status: bool = False, + yaml_format: bool = False, + **kwargs, ) -> dict: """Get the status of the KDU @@ -594,18 +675,63 @@ class K8sJujuConnector(K8sConnector): :param cluster_uuid str: The UUID of the cluster :param kdu_instance str: The unique id of the KDU instance + :param complete_status: To get the complete_status of the KDU + :param yaml_format: To get the status in proper format for NSR record + :param: kwargs: Additional parameters + vca_id (str): VCA ID :return: Returns a dictionary containing namespace, state, resources, - and deployment_time. + and deployment_time and returns complete_status if complete_status is True """ + libjuju = await self._get_libjuju(kwargs.get("vca_id")) status = {} - model_status = await self.libjuju.get_model_status(kdu_instance) - for name in model_status.applications: - application = model_status.applications[name] - status[name] = {"status": application["status"]["status"]} + + model_status = await libjuju.get_model_status(kdu_instance) + + if not complete_status: + for name in model_status.applications: + application = model_status.applications[name] + status[name] = {"status": application["status"]["status"]} + else: + if yaml_format: + return obj_to_yaml(model_status) + else: + return obj_to_dict(model_status) return status + async def update_vca_status(self, vcastatus: dict, kdu_instance: str, **kwargs): + """ + Add all configs, actions, executed actions of all applications in a model to vcastatus dict + + :param vcastatus dict: dict containing vcastatus + :param kdu_instance str: The unique id of the KDU instance + :param: kwargs: Additional parameters + vca_id (str): VCA ID + + :return: None + """ + libjuju = await self._get_libjuju(kwargs.get("vca_id")) + try: + for model_name in vcastatus: + # Adding executed actions + vcastatus[model_name][ + "executedActions" + ] = await libjuju.get_executed_actions(kdu_instance) + + for application in vcastatus[model_name]["applications"]: + # Adding application actions + vcastatus[model_name]["applications"][application][ + "actions" + ] = await libjuju.get_actions(application, kdu_instance) + # Adding application configs + vcastatus[model_name]["applications"][application][ + "configs" + ] = await libjuju.get_application_configs(kdu_instance, application) + + except Exception as e: + self.log.debug("Error in updating vca status: {}".format(str(e))) + async def get_services( self, cluster_uuid: str, kdu_instance: str, namespace: str ) -> list: @@ -810,3 +936,28 @@ class K8sJujuConnector(K8sConnector): else: kdu_instance = db_dict["filter"]["_id"] return kdu_instance + + async def _get_libjuju(self, vca_id: str = None) -> Libjuju: + """ + Get libjuju object + + :param: vca_id: VCA ID + If None, get a libjuju object with a Connection to the default VCA + Else, geta libjuju object with a Connection to the specified VCA + """ + if not vca_id: + while self.loading_libjuju.locked(): + await asyncio.sleep(0.1) + if not self.libjuju: + async with self.loading_libjuju: + vca_connection = await get_connection(self._store) + self.libjuju = Libjuju(vca_connection, loop=self.loop, log=self.log) + return self.libjuju + else: + vca_connection = await get_connection(self._store, vca_id) + return Libjuju( + vca_connection, + loop=self.loop, + log=self.log, + n2vc=self, + )