X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fk8s_juju_conn.py;h=e3d8a67f4504fd6dc08dc454a30ee078637e8219;hp=7e58deb33e5af150b3525bdb2b8593e6ee6d916c;hb=refs%2Fchanges%2F42%2F10442%2F1;hpb=667696ef11356f3267df58f2a81c6ecebb0e94b9 diff --git a/n2vc/k8s_juju_conn.py b/n2vc/k8s_juju_conn.py index 7e58deb..e3d8a67 100644 --- a/n2vc/k8s_juju_conn.py +++ b/n2vc/k8s_juju_conn.py @@ -17,19 +17,45 @@ import os import uuid import yaml import tempfile +import binascii +import base64 from n2vc.exceptions import K8sException, N2VCBadArgumentsException from n2vc.k8s_conn import K8sConnector -from n2vc.kubectl import Kubectl +from n2vc.kubectl import Kubectl, CORE_CLIENT, RBAC_CLIENT from .exceptions import MethodNotImplemented from n2vc.utils import base64_to_cacert from n2vc.libjuju import Libjuju +from kubernetes.client.models import ( + V1ClusterRole, + V1ObjectMeta, + V1PolicyRule, + V1ServiceAccount, + V1ClusterRoleBinding, + V1RoleRef, + V1Subject, +) + +from typing import Dict + +SERVICE_ACCOUNT_TOKEN_KEY = "token" +SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt" +RBAC_LABEL_KEY_NAME = "rbac-id" + +ADMIN_NAMESPACE = "kube-system" +RBAC_STACK_PREFIX = "juju-credential" # from juju.bundle import BundleHandler # import re # import ssl # from .vnf import N2VC + + +def generate_rbac_id(): + return binascii.hexlify(os.urandom(4)).decode() + + class K8sJujuConnector(K8sConnector): def __init__( self, @@ -212,46 +238,107 @@ class K8sJujuConnector(K8sConnector): # Store the cluster configuration so it # can be used for subsequent calls - kubecfg = tempfile.NamedTemporaryFile() with open(kubecfg.name, "w") as kubecfg_file: kubecfg_file.write(k8s_creds) kubectl = Kubectl(config_file=kubecfg.name) - configuration = kubectl.get_configuration() - default_storage_class = kubectl.get_default_storage_class() - await self.libjuju.add_k8s( - name=cluster_uuid, - configuration=configuration, - storage_class=default_storage_class, - credential_name=self._get_credential_name(cluster_uuid), - ) - # self.log.debug("Setting config") - # await self.set_config(cluster_uuid, config) - # Test connection - # controller = await self.get_controller(cluster_uuid) - # await controller.disconnect() + # CREATING RESOURCES IN K8S + rbac_id = generate_rbac_id() + metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id) + labels = {RBAC_STACK_PREFIX: rbac_id} - # TODO: Remove these commented lines - # raise Exception("EOL") - # self.juju_public_key = None - # Login to the k8s cluster - # if not self.authenticated: - # await self.login(cluster_uuid) + # Create cleanup dictionary to clean up created resources + # if it fails in the middle of the process + cleanup_data = [] + try: + self._create_cluster_role( + kubectl, + name=metadata_name, + labels=labels, + ) + cleanup_data.append( + { + "delete": self._delete_cluster_role, + "args": (kubectl, metadata_name), + } + ) + + self._create_service_account( + kubectl, + name=metadata_name, + labels=labels, + ) + cleanup_data.append( + { + "delete": self._delete_service_account, + "args": (kubectl, metadata_name), + } + ) - # We're creating a new cluster - # print("Getting model {}".format(self.get_namespace(cluster_uuid), - # cluster_uuid=cluster_uuid)) - # model = await self.get_model( - # self.get_namespace(cluster_uuid), - # cluster_uuid=cluster_uuid - # ) + self._create_cluster_role_binding( + kubectl, + name=metadata_name, + labels=labels, + ) + cleanup_data.append( + { + "delete": self._delete_service_account, + "args": (kubectl, metadata_name), + } + ) + token, client_cert_data = await self._get_secret_data( + kubectl, + metadata_name, + ) + + default_storage_class = kubectl.get_default_storage_class() + await self.libjuju.add_k8s( + name=cluster_uuid, + rbac_id=rbac_id, + token=token, + client_cert_data=client_cert_data, + configuration=kubectl.configuration, + storage_class=default_storage_class, + credential_name=self._get_credential_name(cluster_uuid), + ) + # self.log.debug("Setting config") + # await self.set_config(cluster_uuid, config) + + # Test connection + # controller = await self.get_controller(cluster_uuid) + # await controller.disconnect() + + # TODO: Remove these commented lines + # raise Exception("EOL") + # self.juju_public_key = None + # Login to the k8s cluster + # if not self.authenticated: + # await self.login(cluster_uuid) + + # We're creating a new cluster + # print("Getting model {}".format(self.get_namespace(cluster_uuid), + # cluster_uuid=cluster_uuid)) + # model = await self.get_model( + # self.get_namespace(cluster_uuid), + # cluster_uuid=cluster_uuid + # ) - # Disconnect from the model - # if model and model.is_connected(): - # await model.disconnect() + # Disconnect from the model + # if model and model.is_connected(): + # await model.disconnect() - return cluster_uuid, True + return cluster_uuid, True + except Exception as e: + self.log.error("Error initializing k8scluster: {}".format(e)) + if len(cleanup_data) > 0: + self.log.debug("Cleaning up created resources in k8s cluster...") + for item in cleanup_data: + delete_function = item["delete"] + delete_args = item["args"] + delete_function(*delete_args) + self.log.debug("Cleanup finished") + raise e """Repo Management""" @@ -292,7 +379,6 @@ class K8sJujuConnector(K8sConnector): """ try: - # Remove k8scluster from database # self.log.debug("[reset] Removing k8scluster from juju database") # juju_db = self.db.get_one("admin", {"_id": "juju"}) @@ -310,12 +396,41 @@ class K8sJujuConnector(K8sConnector): # Destroy the controller (via CLI) # self.log.debug("[reset] Destroying controller") # await self.destroy_controller(cluster_uuid) - self.log.debug("[reset] Removing k8s cloud") # k8s_cloud = "k8s-{}".format(cluster_uuid) # await self.remove_cloud(k8s_cloud) + + cloud_creds = await self.libjuju.get_cloud_credentials( + cluster_uuid, + self._get_credential_name(cluster_uuid), + ) + await self.libjuju.remove_cloud(cluster_uuid) + kubecfg = self.get_credentials(cluster_uuid=cluster_uuid) + + kubecfg_file = tempfile.NamedTemporaryFile() + with open(kubecfg_file.name, "w") as f: + f.write(kubecfg) + kubectl = Kubectl(config_file=kubecfg_file.name) + + delete_functions = [ + self._delete_cluster_role_binding, + self._delete_service_account, + self._delete_cluster_role, + ] + + credential_attrs = cloud_creds[0].result["attrs"] + if RBAC_LABEL_KEY_NAME in credential_attrs: + rbac_id = credential_attrs[RBAC_LABEL_KEY_NAME] + metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id) + delete_args = (kubectl, metadata_name) + for delete_func in delete_functions: + try: + delete_func(*delete_args) + except Exception as e: + self.log.warning("Cannot remove resource in K8s {}".format(e)) + except Exception as e: self.log.debug("Caught exception during reset: {}".format(e)) raise e @@ -341,6 +456,7 @@ class K8sJujuConnector(K8sConnector): self, cluster_uuid: str, kdu_model: str, + kdu_instance: str, atomic: bool = True, timeout: float = 1800, params: dict = None, @@ -352,6 +468,7 @@ class K8sJujuConnector(K8sConnector): :param cluster_uuid str: The UUID of the cluster to install to :param kdu_model str: The name or path of a bundle to install + :param kdu_instance: Kdu instance name :param atomic bool: If set, waits until the model is active and resets the cluster on failure. :param timeout int: The time, in seconds, to wait for the install @@ -385,11 +502,6 @@ class K8sJujuConnector(K8sConnector): os.chdir(new_workdir) bundle = "local:{}".format(kdu_model) - if kdu_name: - kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"]) - else: - kdu_instance = db_dict["filter"]["_id"] - self.log.debug("Checking for model named {}".format(kdu_instance)) # Create the new model @@ -456,8 +568,7 @@ class K8sJujuConnector(K8sConnector): # await model.disconnect() # await controller.disconnect() os.chdir(previous_workdir) - - return kdu_instance + return True async def instances_list(self, cluster_uuid: str) -> list: """ @@ -1243,3 +1354,132 @@ class K8sJujuConnector(K8sConnector): # q_filter={"_id": "juju"}, # update_dict={"k8sclusters": k8sclusters}, # ) + + # Private methods to create/delete needed resources in the + # Kubernetes cluster to create the K8s cloud in Juju + + def _create_cluster_role( + self, + kubectl: Kubectl, + name: str, + labels: Dict[str, str], + ): + cluster_roles = kubectl.clients[RBAC_CLIENT].list_cluster_role( + field_selector="metadata.name={}".format(name) + ) + + if len(cluster_roles.items) > 0: + raise Exception( + "Cluster role with metadata.name={} already exists".format(name) + ) + + metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE) + # Cluster role + cluster_role = V1ClusterRole( + metadata=metadata, + rules=[ + V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]), + V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]), + ], + ) + + kubectl.clients[RBAC_CLIENT].create_cluster_role(cluster_role) + + def _delete_cluster_role(self, kubectl: Kubectl, name: str): + kubectl.clients[RBAC_CLIENT].delete_cluster_role(name) + + def _create_service_account( + self, + kubectl: Kubectl, + name: str, + labels: Dict[str, str], + ): + service_accounts = kubectl.clients[CORE_CLIENT].list_namespaced_service_account( + ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name) + ) + if len(service_accounts.items) > 0: + raise Exception( + "Service account with metadata.name={} already exists".format(name) + ) + + metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE) + service_account = V1ServiceAccount(metadata=metadata) + + kubectl.clients[CORE_CLIENT].create_namespaced_service_account( + ADMIN_NAMESPACE, service_account + ) + + def _delete_service_account(self, kubectl: Kubectl, name: str): + kubectl.clients[CORE_CLIENT].delete_namespaced_service_account( + name, ADMIN_NAMESPACE + ) + + def _create_cluster_role_binding( + self, + kubectl: Kubectl, + name: str, + labels: Dict[str, str], + ): + role_bindings = kubectl.clients[RBAC_CLIENT].list_cluster_role_binding( + field_selector="metadata.name={}".format(name) + ) + if len(role_bindings.items) > 0: + raise Exception("Generated rbac id already exists") + + role_binding = V1ClusterRoleBinding( + metadata=V1ObjectMeta(name=name, labels=labels), + role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""), + subjects=[ + V1Subject(kind="ServiceAccount", name=name, namespace=ADMIN_NAMESPACE) + ], + ) + kubectl.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding) + + def _delete_cluster_role_binding(self, kubectl: Kubectl, name: str): + kubectl.clients[RBAC_CLIENT].delete_cluster_role_binding(name) + + async def _get_secret_data(self, kubectl: Kubectl, name: str) -> (str, str): + v1_core = kubectl.clients[CORE_CLIENT] + + retries_limit = 10 + secret_name = None + while True: + retries_limit -= 1 + service_accounts = v1_core.list_namespaced_service_account( + ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name) + ) + if len(service_accounts.items) == 0: + raise Exception( + "Service account not found with metadata.name={}".format(name) + ) + service_account = service_accounts.items[0] + if service_account.secrets and len(service_account.secrets) > 0: + secret_name = service_account.secrets[0].name + if secret_name is not None or not retries_limit: + break + if not secret_name: + raise Exception( + "Failed getting the secret from service account {}".format(name) + ) + secret = v1_core.list_namespaced_secret( + ADMIN_NAMESPACE, + field_selector="metadata.name={}".format(secret_name), + ).items[0] + + token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY] + client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY] + + return ( + base64.b64decode(token).decode("utf-8"), + base64.b64decode(client_certificate_data).decode("utf-8"), + ) + + @staticmethod + def generate_kdu_instance_name(**kwargs): + db_dict = kwargs.get("db_dict") + kdu_name = kwargs.get("kdu_name", None) + if kdu_name: + kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"]) + else: + kdu_instance = db_dict["filter"]["_id"] + return kdu_instance