Fix flake8 minor issues
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
index 7e58deb..e3d8a67 100644 (file)
@@ -17,19 +17,45 @@ import os
 import uuid
 import yaml
 import tempfile
+import binascii
+import base64
 
 from n2vc.exceptions import K8sException, N2VCBadArgumentsException
 from n2vc.k8s_conn import K8sConnector
-from n2vc.kubectl import Kubectl
+from n2vc.kubectl import Kubectl, CORE_CLIENT, RBAC_CLIENT
 from .exceptions import MethodNotImplemented
 from n2vc.utils import base64_to_cacert
 from n2vc.libjuju import Libjuju
 
+from kubernetes.client.models import (
+    V1ClusterRole,
+    V1ObjectMeta,
+    V1PolicyRule,
+    V1ServiceAccount,
+    V1ClusterRoleBinding,
+    V1RoleRef,
+    V1Subject,
+)
+
+from typing import Dict
+
+SERVICE_ACCOUNT_TOKEN_KEY = "token"
+SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
+RBAC_LABEL_KEY_NAME = "rbac-id"
+
+ADMIN_NAMESPACE = "kube-system"
+RBAC_STACK_PREFIX = "juju-credential"
 
 # from juju.bundle import BundleHandler
 # import re
 # import ssl
 # from .vnf import N2VC
+
+
def generate_rbac_id():
    """Return a random 8-character lowercase-hex id for RBAC resources."""
    # bytes.hex() is equivalent to binascii.hexlify(...).decode()
    return os.urandom(4).hex()
+
+
 class K8sJujuConnector(K8sConnector):
     def __init__(
         self,
@@ -212,46 +238,107 @@ class K8sJujuConnector(K8sConnector):
 
         # Store the cluster configuration so it
         # can be used for subsequent calls
-
         kubecfg = tempfile.NamedTemporaryFile()
         with open(kubecfg.name, "w") as kubecfg_file:
             kubecfg_file.write(k8s_creds)
         kubectl = Kubectl(config_file=kubecfg.name)
-        configuration = kubectl.get_configuration()
-        default_storage_class = kubectl.get_default_storage_class()
-        await self.libjuju.add_k8s(
-            name=cluster_uuid,
-            configuration=configuration,
-            storage_class=default_storage_class,
-            credential_name=self._get_credential_name(cluster_uuid),
-        )
-        # self.log.debug("Setting config")
-        # await self.set_config(cluster_uuid, config)
 
-        # Test connection
-        # controller = await self.get_controller(cluster_uuid)
-        # await controller.disconnect()
+        # CREATING RESOURCES IN K8S
+        rbac_id = generate_rbac_id()
+        metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
+        labels = {RBAC_STACK_PREFIX: rbac_id}
 
-        # TODO: Remove these commented lines
-        # raise Exception("EOL")
-        # self.juju_public_key = None
-        # Login to the k8s cluster
-        # if not self.authenticated:
-        #     await self.login(cluster_uuid)
+        # Create cleanup dictionary to clean up created resources
+        # if it fails in the middle of the process
+        cleanup_data = []
+        try:
+            self._create_cluster_role(
+                kubectl,
+                name=metadata_name,
+                labels=labels,
+            )
+            cleanup_data.append(
+                {
+                    "delete": self._delete_cluster_role,
+                    "args": (kubectl, metadata_name),
+                }
+            )
+
+            self._create_service_account(
+                kubectl,
+                name=metadata_name,
+                labels=labels,
+            )
+            cleanup_data.append(
+                {
+                    "delete": self._delete_service_account,
+                    "args": (kubectl, metadata_name),
+                }
+            )
 
-        # We're creating a new cluster
-        # print("Getting model {}".format(self.get_namespace(cluster_uuid),
-        #    cluster_uuid=cluster_uuid))
-        # model = await self.get_model(
-        #    self.get_namespace(cluster_uuid),
-        #    cluster_uuid=cluster_uuid
-        # )
+            self._create_cluster_role_binding(
+                kubectl,
+                name=metadata_name,
+                labels=labels,
+            )
+            cleanup_data.append(
+                {
+                    "delete": self._delete_service_account,
+                    "args": (kubectl, metadata_name),
+                }
+            )
+            token, client_cert_data = await self._get_secret_data(
+                kubectl,
+                metadata_name,
+            )
+
+            default_storage_class = kubectl.get_default_storage_class()
+            await self.libjuju.add_k8s(
+                name=cluster_uuid,
+                rbac_id=rbac_id,
+                token=token,
+                client_cert_data=client_cert_data,
+                configuration=kubectl.configuration,
+                storage_class=default_storage_class,
+                credential_name=self._get_credential_name(cluster_uuid),
+            )
+            # self.log.debug("Setting config")
+            # await self.set_config(cluster_uuid, config)
+
+            # Test connection
+            # controller = await self.get_controller(cluster_uuid)
+            # await controller.disconnect()
+
+            # TODO: Remove these commented lines
+            # raise Exception("EOL")
+            # self.juju_public_key = None
+            # Login to the k8s cluster
+            # if not self.authenticated:
+            #     await self.login(cluster_uuid)
+
+            # We're creating a new cluster
+            # print("Getting model {}".format(self.get_namespace(cluster_uuid),
+            #    cluster_uuid=cluster_uuid))
+            # model = await self.get_model(
+            #    self.get_namespace(cluster_uuid),
+            #    cluster_uuid=cluster_uuid
+            # )
 
-        # Disconnect from the model
-        # if model and model.is_connected():
-        #    await model.disconnect()
+            # Disconnect from the model
+            # if model and model.is_connected():
+            #    await model.disconnect()
 
-        return cluster_uuid, True
+            return cluster_uuid, True
+        except Exception as e:
+            self.log.error("Error initializing k8scluster: {}".format(e))
+            if len(cleanup_data) > 0:
+                self.log.debug("Cleaning up created resources in k8s cluster...")
+                for item in cleanup_data:
+                    delete_function = item["delete"]
+                    delete_args = item["args"]
+                    delete_function(*delete_args)
+                self.log.debug("Cleanup finished")
+            raise e
 
     """Repo Management"""
 
@@ -292,7 +379,6 @@ class K8sJujuConnector(K8sConnector):
         """
 
         try:
-
             # Remove k8scluster from database
             # self.log.debug("[reset] Removing k8scluster from juju database")
             # juju_db = self.db.get_one("admin", {"_id": "juju"})
@@ -310,12 +396,41 @@ class K8sJujuConnector(K8sConnector):
             # Destroy the controller (via CLI)
             # self.log.debug("[reset] Destroying controller")
             # await self.destroy_controller(cluster_uuid)
-
             self.log.debug("[reset] Removing k8s cloud")
             # k8s_cloud = "k8s-{}".format(cluster_uuid)
             # await self.remove_cloud(k8s_cloud)
+
+            cloud_creds = await self.libjuju.get_cloud_credentials(
+                cluster_uuid,
+                self._get_credential_name(cluster_uuid),
+            )
+
             await self.libjuju.remove_cloud(cluster_uuid)
 
+            kubecfg = self.get_credentials(cluster_uuid=cluster_uuid)
+
+            kubecfg_file = tempfile.NamedTemporaryFile()
+            with open(kubecfg_file.name, "w") as f:
+                f.write(kubecfg)
+            kubectl = Kubectl(config_file=kubecfg_file.name)
+
+            delete_functions = [
+                self._delete_cluster_role_binding,
+                self._delete_service_account,
+                self._delete_cluster_role,
+            ]
+
+            credential_attrs = cloud_creds[0].result["attrs"]
+            if RBAC_LABEL_KEY_NAME in credential_attrs:
+                rbac_id = credential_attrs[RBAC_LABEL_KEY_NAME]
+                metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
+                delete_args = (kubectl, metadata_name)
+                for delete_func in delete_functions:
+                    try:
+                        delete_func(*delete_args)
+                    except Exception as e:
+                        self.log.warning("Cannot remove resource in K8s {}".format(e))
+
         except Exception as e:
             self.log.debug("Caught exception during reset: {}".format(e))
             raise e
@@ -341,6 +456,7 @@ class K8sJujuConnector(K8sConnector):
         self,
         cluster_uuid: str,
         kdu_model: str,
+        kdu_instance: str,
         atomic: bool = True,
         timeout: float = 1800,
         params: dict = None,
@@ -352,6 +468,7 @@ class K8sJujuConnector(K8sConnector):
 
         :param cluster_uuid str: The UUID of the cluster to install to
         :param kdu_model str: The name or path of a bundle to install
+        :param kdu_instance: Kdu instance name
         :param atomic bool: If set, waits until the model is active and resets
                             the cluster on failure.
         :param timeout int: The time, in seconds, to wait for the install
@@ -385,11 +502,6 @@ class K8sJujuConnector(K8sConnector):
             os.chdir(new_workdir)
             bundle = "local:{}".format(kdu_model)
 
-        if kdu_name:
-            kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
-        else:
-            kdu_instance = db_dict["filter"]["_id"]
-
         self.log.debug("Checking for model named {}".format(kdu_instance))
 
         # Create the new model
@@ -456,8 +568,7 @@ class K8sJujuConnector(K8sConnector):
         #     await model.disconnect()
         # await controller.disconnect()
         os.chdir(previous_workdir)
-
-        return kdu_instance
+        return True
 
     async def instances_list(self, cluster_uuid: str) -> list:
         """
@@ -1243,3 +1354,132 @@ class K8sJujuConnector(K8sConnector):
     #         q_filter={"_id": "juju"},
     #         update_dict={"k8sclusters": k8sclusters},
     #     )
+
+    # Private methods to create/delete needed resources in the
+    # Kubernetes cluster to create the K8s cloud in Juju
+
+    def _create_cluster_role(
+        self,
+        kubectl: Kubectl,
+        name: str,
+        labels: Dict[str, str],
+    ):
+        cluster_roles = kubectl.clients[RBAC_CLIENT].list_cluster_role(
+            field_selector="metadata.name={}".format(name)
+        )
+
+        if len(cluster_roles.items) > 0:
+            raise Exception(
+                "Cluster role with metadata.name={} already exists".format(name)
+            )
+
+        metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
+        # Cluster role
+        cluster_role = V1ClusterRole(
+            metadata=metadata,
+            rules=[
+                V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
+                V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
+            ],
+        )
+
+        kubectl.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
+
+    def _delete_cluster_role(self, kubectl: Kubectl, name: str):
+        kubectl.clients[RBAC_CLIENT].delete_cluster_role(name)
+
+    def _create_service_account(
+        self,
+        kubectl: Kubectl,
+        name: str,
+        labels: Dict[str, str],
+    ):
+        service_accounts = kubectl.clients[CORE_CLIENT].list_namespaced_service_account(
+            ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
+        )
+        if len(service_accounts.items) > 0:
+            raise Exception(
+                "Service account with metadata.name={} already exists".format(name)
+            )
+
+        metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
+        service_account = V1ServiceAccount(metadata=metadata)
+
+        kubectl.clients[CORE_CLIENT].create_namespaced_service_account(
+            ADMIN_NAMESPACE, service_account
+        )
+
+    def _delete_service_account(self, kubectl: Kubectl, name: str):
+        kubectl.clients[CORE_CLIENT].delete_namespaced_service_account(
+            name, ADMIN_NAMESPACE
+        )
+
+    def _create_cluster_role_binding(
+        self,
+        kubectl: Kubectl,
+        name: str,
+        labels: Dict[str, str],
+    ):
+        role_bindings = kubectl.clients[RBAC_CLIENT].list_cluster_role_binding(
+            field_selector="metadata.name={}".format(name)
+        )
+        if len(role_bindings.items) > 0:
+            raise Exception("Generated rbac id already exists")
+
+        role_binding = V1ClusterRoleBinding(
+            metadata=V1ObjectMeta(name=name, labels=labels),
+            role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
+            subjects=[
+                V1Subject(kind="ServiceAccount", name=name, namespace=ADMIN_NAMESPACE)
+            ],
+        )
+        kubectl.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
+
+    def _delete_cluster_role_binding(self, kubectl: Kubectl, name: str):
+        kubectl.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
+
+    async def _get_secret_data(self, kubectl: Kubectl, name: str) -> (str, str):
+        v1_core = kubectl.clients[CORE_CLIENT]
+
+        retries_limit = 10
+        secret_name = None
+        while True:
+            retries_limit -= 1
+            service_accounts = v1_core.list_namespaced_service_account(
+                ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
+            )
+            if len(service_accounts.items) == 0:
+                raise Exception(
+                    "Service account not found with metadata.name={}".format(name)
+                )
+            service_account = service_accounts.items[0]
+            if service_account.secrets and len(service_account.secrets) > 0:
+                secret_name = service_account.secrets[0].name
+            if secret_name is not None or not retries_limit:
+                break
+        if not secret_name:
+            raise Exception(
+                "Failed getting the secret from service account {}".format(name)
+            )
+        secret = v1_core.list_namespaced_secret(
+            ADMIN_NAMESPACE,
+            field_selector="metadata.name={}".format(secret_name),
+        ).items[0]
+
+        token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
+        client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
+
+        return (
+            base64.b64decode(token).decode("utf-8"),
+            base64.b64decode(client_certificate_data).decode("utf-8"),
+        )
+
+    @staticmethod
+    def generate_kdu_instance_name(**kwargs):
+        db_dict = kwargs.get("db_dict")
+        kdu_name = kwargs.get("kdu_name", None)
+        if kdu_name:
+            kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
+        else:
+            kdu_instance = db_dict["filter"]["_id"]
+        return kdu_instance