X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fk8s_juju_conn.py;h=e3ec17e295ee4f704edcec9dd6e14f15629e7295;hp=85e24f2c32c806753256d0b554ea181f186b85a8;hb=refs%2Fchanges%2F16%2F10616%2F9;hpb=c4da25cc55411e6cea1e83d29c206bce421f0728 diff --git a/n2vc/k8s_juju_conn.py b/n2vc/k8s_juju_conn.py index 85e24f2..e3ec17e 100644 --- a/n2vc/k8s_juju_conn.py +++ b/n2vc/k8s_juju_conn.py @@ -20,13 +20,16 @@ import tempfile import binascii import base64 -from n2vc.exceptions import K8sException, N2VCBadArgumentsException +from n2vc.config import EnvironConfig +from n2vc.exceptions import K8sException from n2vc.k8s_conn import K8sConnector from n2vc.kubectl import Kubectl, CORE_CLIENT, RBAC_CLIENT from .exceptions import MethodNotImplemented -from n2vc.utils import base64_to_cacert from n2vc.libjuju import Libjuju - +from n2vc.utils import obj_to_dict, obj_to_yaml +from n2vc.store import MotorStore +from n2vc.vca.cloud import Cloud +from n2vc.vca.connection import get_connection from kubernetes.client.models import ( V1ClusterRole, V1ObjectMeta, @@ -52,6 +55,8 @@ def generate_rbac_id(): class K8sJujuConnector(K8sConnector): + libjuju = None + def __init__( self, fs: object, @@ -61,7 +66,6 @@ class K8sJujuConnector(K8sConnector): log: object = None, loop: object = None, on_update_db=None, - vca_config: dict = None, ): """ :param fs: file system for kubernetes and helm configuration @@ -84,37 +88,10 @@ class K8sJujuConnector(K8sConnector): self.loop = loop or asyncio.get_event_loop() self.log.debug("Initializing K8S Juju connector") - required_vca_config = [ - "host", - "user", - "secret", - "ca_cert", - ] - if not vca_config or not all(k in vca_config for k in required_vca_config): - raise N2VCBadArgumentsException( - message="Missing arguments in vca_config: {}".format(vca_config), - bad_args=required_vca_config, - ) - port = vca_config["port"] if "port" in vca_config else 17070 - url = "{}:{}".format(vca_config["host"], port) - enable_os_upgrade = vca_config.get("enable_os_upgrade", True) - apt_mirror = vca_config.get("apt_mirror", None) - username = vca_config["user"] - secret = vca_config["secret"] - ca_cert = base64_to_cacert(vca_config["ca_cert"]) - - self.libjuju = Libjuju( - endpoint=url, - api_proxy=None, # Not needed for k8s charms - enable_os_upgrade=enable_os_upgrade, - apt_mirror=apt_mirror, - username=username, - password=secret, - cacert=ca_cert, - loop=self.loop, - log=self.log, - db=self.db, - ) + db_uri = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"]).get("database_uri") + self._store = MotorStore(db_uri) + self.loading_libjuju = asyncio.Lock(loop=self.loop) + self.log.debug("K8S Juju connector initialized") # TODO: Remove these commented lines: # self.authenticated = False @@ -128,6 +105,7 @@ class K8sJujuConnector(K8sConnector): k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid: str = None, + **kwargs, ) -> (str, bool): """ It prepares a given K8s cluster environment to run Juju bundles. @@ -137,10 +115,14 @@ class K8sJujuConnector(K8sConnector): :param namespace: optional namespace to be used for juju. By default, 'kube-system' will be used :param reuse_cluster_uuid: existing cluster uuid for reuse + :param: kwargs: Additional parameters + vca_id (str): VCA ID + :return: uuid of the K8s cluster and True if connector has installed some software in the cluster (on error, an exception will be raised) """ + libjuju = await self._get_libjuju(kwargs.get("vca_id")) cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4()) @@ -199,7 +181,7 @@ class K8sJujuConnector(K8sConnector): ) default_storage_class = kubectl.get_default_storage_class() - await self.libjuju.add_k8s( + await libjuju.add_k8s( name=cluster_uuid, rbac_id=rbac_id, token=token, @@ -248,25 +230,34 @@ class K8sJujuConnector(K8sConnector): """Reset""" async def reset( - self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False + self, + cluster_uuid: str, + force: bool = False, + uninstall_sw: bool = False, + **kwargs, ) -> bool: """Reset a cluster Resets the Kubernetes cluster by removing the model that represents it. :param cluster_uuid str: The UUID of the cluster to reset + :param force: Force reset + :param uninstall_sw: Boolean to uninstall sw + :param: kwargs: Additional parameters + vca_id (str): VCA ID + :return: Returns True if successful or raises an exception. """ try: self.log.debug("[reset] Removing k8s cloud") + libjuju = await self._get_libjuju(kwargs.get("vca_id")) - cloud_creds = await self.libjuju.get_cloud_credentials( - cluster_uuid, - self._get_credential_name(cluster_uuid), - ) + cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid)) + + cloud_creds = await libjuju.get_cloud_credentials(cloud) - await self.libjuju.remove_cloud(cluster_uuid) + await libjuju.remove_cloud(cluster_uuid) kubecfg = self.get_credentials(cluster_uuid=cluster_uuid) @@ -310,6 +301,7 @@ class K8sJujuConnector(K8sConnector): db_dict: dict = None, kdu_name: str = None, namespace: str = None, + **kwargs, ) -> bool: """Install a bundle @@ -323,9 +315,12 @@ class K8sJujuConnector(K8sConnector): :param params dict: Key-value pairs of instantiation parameters :param kdu_name: Name of the KDU instance to be installed :param namespace: K8s namespace to use for the KDU instance + :param kwargs: Additional parameters + vca_id (str): VCA ID :return: If successful, returns ? """ + libjuju = await self._get_libjuju(kwargs.get("vca_id")) bundle = kdu_model if not db_dict: @@ -347,11 +342,8 @@ class K8sJujuConnector(K8sConnector): # Create the new model self.log.debug("Adding model: {}".format(kdu_instance)) - await self.libjuju.add_model( - model_name=kdu_instance, - cloud_name=cluster_uuid, - credential_name=self._get_credential_name(cluster_uuid), - ) + cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid)) + await libjuju.add_model(kdu_instance, cloud) # if model: # TODO: Instantiation parameters @@ -370,10 +362,17 @@ class K8sJujuConnector(K8sConnector): previous_workdir = "/app/storage" self.log.debug("[install] deploying {}".format(bundle)) - await self.libjuju.deploy( + await libjuju.deploy( bundle, model_name=kdu_instance, wait=atomic, timeout=timeout ) os.chdir(previous_workdir) + if self.on_update_db: + await self.on_update_db( + cluster_uuid, + kdu_instance, + filter=db_dict["filter"], + vca_id=kwargs.get("vca_id") + ) return True async def instances_list(self, cluster_uuid: str) -> list: @@ -440,18 +439,26 @@ class K8sJujuConnector(K8sConnector): """Deletion""" - async def uninstall(self, cluster_uuid: str, kdu_instance: str) -> bool: + async def uninstall( + self, + cluster_uuid: str, + kdu_instance: str, + **kwargs, + ) -> bool: """Uninstall a KDU instance :param cluster_uuid str: The UUID of the cluster :param kdu_instance str: The unique name of the KDU instance + :param kwargs: Additional parameters + vca_id (str): VCA ID :return: Returns True if successful, or raises an exception """ self.log.debug("[uninstall] Destroying model") + libjuju = await self._get_libjuju(kwargs.get("vca_id")) - await self.libjuju.destroy_model(kdu_instance, total_timeout=3600) + await libjuju.destroy_model(kdu_instance, total_timeout=3600) # self.log.debug("[uninstall] Model destroyed and disconnecting") # await controller.disconnect() @@ -470,6 +477,7 @@ class K8sJujuConnector(K8sConnector): timeout: float = 300, params: dict = None, db_dict: dict = None, + **kwargs, ) -> str: """Exec primitive (Juju action) @@ -478,10 +486,13 @@ class K8sJujuConnector(K8sConnector): :param primitive_name: Name of action that will be executed :param timeout: Timeout for action execution :param params: Dictionary of all the parameters needed for the action - :db_dict: Dictionary for any additional data + :param db_dict: Dictionary for any additional data + :param kwargs: Additional parameters + vca_id (str): VCA ID :return: Returns the output of the action """ + libjuju = await self._get_libjuju(kwargs.get("vca_id")) if not params or "application-name" not in params: raise K8sException( @@ -494,10 +505,10 @@ class K8sJujuConnector(K8sConnector): "kdu_instance: {}".format(kdu_instance) ) application_name = params["application-name"] - actions = await self.libjuju.get_actions(application_name, kdu_instance) + actions = await libjuju.get_actions(application_name, kdu_instance) if primitive_name not in actions: raise K8sException("Primitive {} not found".format(primitive_name)) - output, status = await self.libjuju.execute_action( + output, status = await libjuju.execute_action( application_name, kdu_instance, primitive_name, **params ) @@ -505,6 +516,8 @@ class K8sJujuConnector(K8sConnector): raise K8sException( "status is not completed: {} output: {}".format(status, output) ) + if self.on_update_db: + await self.on_update_db(cluster_uuid, kdu_instance, filter=db_dict["filter"]) return output @@ -588,6 +601,9 @@ class K8sJujuConnector(K8sConnector): self, cluster_uuid: str, kdu_instance: str, + complete_status: bool = False, + yaml_format: bool = False, + **kwargs, ) -> dict: """Get the status of the KDU @@ -595,18 +611,60 @@ class K8sJujuConnector(K8sConnector): :param cluster_uuid str: The UUID of the cluster :param kdu_instance str: The unique id of the KDU instance + :param complete_status: To get the complete_status of the KDU + :param yaml_format: To get the status in proper format for NSR record + :param: kwargs: Additional parameters + vca_id (str): VCA ID :return: Returns a dictionary containing namespace, state, resources, - and deployment_time. + and deployment_time and returns complete_status if complete_status is True """ + libjuju = await self._get_libjuju(kwargs.get("vca_id")) status = {} - model_status = await self.libjuju.get_model_status(kdu_instance) - for name in model_status.applications: - application = model_status.applications[name] - status[name] = {"status": application["status"]["status"]} + + model_status = await libjuju.get_model_status(kdu_instance) + + if not complete_status: + for name in model_status.applications: + application = model_status.applications[name] + status[name] = {"status": application["status"]["status"]} + else: + if yaml_format: + return obj_to_yaml(model_status) + else: + return obj_to_dict(model_status) return status + async def update_vca_status(self, vcastatus: dict, kdu_instance: str, **kwargs): + """ + Add all configs, actions, executed actions of all applications in a model to vcastatus dict + + :param vcastatus dict: dict containing vcastatus + :param kdu_instance str: The unique id of the KDU instance + :param: kwargs: Additional parameters + vca_id (str): VCA ID + + :return: None + """ + libjuju = await self._get_libjuju(kwargs.get("vca_id")) + try: + for model_name in vcastatus: + # Adding executed actions + vcastatus[model_name]["executedActions"] = \ + await libjuju.get_executed_actions(kdu_instance) + + for application in vcastatus[model_name]["applications"]: + # Adding application actions + vcastatus[model_name]["applications"][application]["actions"] = \ + await libjuju.get_actions(application, kdu_instance) + # Adding application configs + vcastatus[model_name]["applications"][application]["configs"] = \ + await libjuju.get_application_configs(kdu_instance, application) + + except Exception as e: + self.log.debug("Error in updating vca status: {}".format(str(e))) + async def get_services( self, cluster_uuid: str, kdu_instance: str, namespace: str ) -> list: @@ -811,3 +869,28 @@ class K8sJujuConnector(K8sConnector): else: kdu_instance = db_dict["filter"]["_id"] return kdu_instance + + async def _get_libjuju(self, vca_id: str = None) -> Libjuju: + """ + Get libjuju object + + :param: vca_id: VCA ID + If None, get a libjuju object with a Connection to the default VCA + Else, geta libjuju object with a Connection to the specified VCA + """ + if not vca_id: + while self.loading_libjuju.locked(): + await asyncio.sleep(0.1) + if not self.libjuju: + async with self.loading_libjuju: + vca_connection = await get_connection(self._store) + self.libjuju = Libjuju(vca_connection, loop=self.loop, log=self.log) + return self.libjuju + else: + vca_connection = await get_connection(self._store, vca_id) + return Libjuju( + vca_connection, + loop=self.loop, + log=self.log, + n2vc=self, + )