import uuid
import yaml
import tempfile
+import binascii
+import base64
from n2vc.exceptions import K8sException, N2VCBadArgumentsException
from n2vc.k8s_conn import K8sConnector
-from n2vc.kubectl import Kubectl
+from n2vc.kubectl import Kubectl, CORE_CLIENT, RBAC_CLIENT
from .exceptions import MethodNotImplemented
from n2vc.utils import base64_to_cacert
from n2vc.libjuju import Libjuju
+from kubernetes.client.models import (
+ V1ClusterRole,
+ V1ObjectMeta,
+ V1PolicyRule,
+ V1ServiceAccount,
+ V1ClusterRoleBinding,
+ V1RoleRef,
+ V1Subject,
+)
+
+from typing import Dict
+
+SERVICE_ACCOUNT_TOKEN_KEY = "token"
+SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
+RBAC_LABEL_KEY_NAME = "rbac-id"
+
+ADMIN_NAMESPACE = "kube-system"
+RBAC_STACK_PREFIX = "juju-credential"
# from juju.bundle import BundleHandler
# import re
# import ssl
# from .vnf import N2VC
+
+
+def generate_rbac_id():
+ return binascii.hexlify(os.urandom(4)).decode()
+
+
class K8sJujuConnector(K8sConnector):
def __init__(
self,
# Store the cluster configuration so it
# can be used for subsequent calls
-
kubecfg = tempfile.NamedTemporaryFile()
with open(kubecfg.name, "w") as kubecfg_file:
kubecfg_file.write(k8s_creds)
kubectl = Kubectl(config_file=kubecfg.name)
- configuration = kubectl.get_configuration()
- default_storage_class = kubectl.get_default_storage_class()
- await self.libjuju.add_k8s(
- name=cluster_uuid,
- configuration=configuration,
- storage_class=default_storage_class,
- credential_name=self._get_credential_name(cluster_uuid),
- )
- # self.log.debug("Setting config")
- # await self.set_config(cluster_uuid, config)
- # Test connection
- # controller = await self.get_controller(cluster_uuid)
- # await controller.disconnect()
+ # CREATING RESOURCES IN K8S
+ rbac_id = generate_rbac_id()
+ metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
+ labels = {RBAC_STACK_PREFIX: rbac_id}
- # TODO: Remove these commented lines
- # raise Exception("EOL")
- # self.juju_public_key = None
- # Login to the k8s cluster
- # if not self.authenticated:
- # await self.login(cluster_uuid)
+ # Create cleanup dictionary to clean up created resources
+ # if it fails in the middle of the process
+ cleanup_data = []
+ try:
+ self._create_cluster_role(
+ kubectl,
+ name=metadata_name,
+ labels=labels,
+ )
+ cleanup_data.append(
+ {
+ "delete": self._delete_cluster_role,
+ "args": (kubectl, metadata_name),
+ }
+ )
- # We're creating a new cluster
- # print("Getting model {}".format(self.get_namespace(cluster_uuid),
- # cluster_uuid=cluster_uuid))
- # model = await self.get_model(
- # self.get_namespace(cluster_uuid),
- # cluster_uuid=cluster_uuid
- # )
+ self._create_service_account(
+ kubectl,
+ name=metadata_name,
+ labels=labels,
+ )
+ cleanup_data.append(
+ {
+ "delete": self._delete_service_account,
+ "args": (kubectl, metadata_name),
+ }
+ )
+
+ self._create_cluster_role_binding(
+ kubectl,
+ name=metadata_name,
+ labels=labels,
+ )
+ cleanup_data.append(
+ {
+ "delete": self._delete_service_account,
+ "args": (kubectl, metadata_name),
+ }
+ )
+ token, client_cert_data = await self._get_secret_data(
+ kubectl,
+ metadata_name,
+ )
- # Disconnect from the model
- # if model and model.is_connected():
- # await model.disconnect()
+ default_storage_class = kubectl.get_default_storage_class()
+ await self.libjuju.add_k8s(
+ name=cluster_uuid,
+ rbac_id=rbac_id,
+ token=token,
+ client_cert_data=client_cert_data,
+ configuration=kubectl.configuration,
+ storage_class=default_storage_class,
+ credential_name=self._get_credential_name(cluster_uuid),
+ )
+ # self.log.debug("Setting config")
+ # await self.set_config(cluster_uuid, config)
+
+ # Test connection
+ # controller = await self.get_controller(cluster_uuid)
+ # await controller.disconnect()
+
+ # TODO: Remove these commented lines
+ # raise Exception("EOL")
+ # self.juju_public_key = None
+ # Login to the k8s cluster
+ # if not self.authenticated:
+ # await self.login(cluster_uuid)
+
+ # We're creating a new cluster
+ # print("Getting model {}".format(self.get_namespace(cluster_uuid),
+ # cluster_uuid=cluster_uuid))
+ # model = await self.get_model(
+ # self.get_namespace(cluster_uuid),
+ # cluster_uuid=cluster_uuid
+ # )
- return cluster_uuid, True
+ # Disconnect from the model
+ # if model and model.is_connected():
+ # await model.disconnect()
+
+ return cluster_uuid, True
+ except Exception as e:
+ self.log.error("Error initializing k8scluster: {}".format(e))
+ if len(cleanup_data) > 0:
+ self.log.debug("Cleaning up created resources in k8s cluster...")
+ for item in cleanup_data:
+ delete_function = item["delete"]
+ delete_args = item["args"]
+ delete_function(*delete_args)
+ self.log.debug("Cleanup finished")
+ raise e
"""Repo Management"""
"""
try:
-
# Remove k8scluster from database
# self.log.debug("[reset] Removing k8scluster from juju database")
# juju_db = self.db.get_one("admin", {"_id": "juju"})
# Destroy the controller (via CLI)
# self.log.debug("[reset] Destroying controller")
# await self.destroy_controller(cluster_uuid)
-
self.log.debug("[reset] Removing k8s cloud")
# k8s_cloud = "k8s-{}".format(cluster_uuid)
# await self.remove_cloud(k8s_cloud)
+
+ cloud_creds = await self.libjuju.get_cloud_credentials(
+ cluster_uuid,
+ self._get_credential_name(cluster_uuid),
+ )
+
await self.libjuju.remove_cloud(cluster_uuid)
+ kubecfg = self.get_credentials(cluster_uuid=cluster_uuid)
+
+ kubecfg_file = tempfile.NamedTemporaryFile()
+ with open(kubecfg_file.name, "w") as f:
+ f.write(kubecfg)
+ kubectl = Kubectl(config_file=kubecfg_file.name)
+
+ delete_functions = [
+ self._delete_cluster_role_binding,
+ self._delete_service_account,
+ self._delete_cluster_role,
+ ]
+
+ credential_attrs = cloud_creds[0].result["attrs"]
+ if RBAC_LABEL_KEY_NAME in credential_attrs:
+ rbac_id = credential_attrs[RBAC_LABEL_KEY_NAME]
+ metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
+ delete_args = (kubectl, metadata_name)
+ for delete_func in delete_functions:
+ try:
+ delete_func(*delete_args)
+ except Exception as e:
+ self.log.warning("Cannot remove resource in K8s {}".format(e))
+
except Exception as e:
self.log.debug("Caught exception during reset: {}".format(e))
raise e
# q_filter={"_id": "juju"},
# update_dict={"k8sclusters": k8sclusters},
# )
+
+ # Private methods to create/delete needed resources in the
+ # Kubernetes cluster to create the K8s cloud in Juju
+
+ def _create_cluster_role(
+ self,
+ kubectl: Kubectl,
+ name: str,
+ labels: Dict[str, str],
+ ):
+ cluster_roles = kubectl.clients[RBAC_CLIENT].list_cluster_role(
+ field_selector="metadata.name={}".format(name)
+ )
+
+ if len(cluster_roles.items) > 0:
+ raise Exception(
+ "Cluster role with metadata.name={} already exists".format(name)
+ )
+
+ metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
+ # Cluster role
+ cluster_role = V1ClusterRole(
+ metadata=metadata,
+ rules=[
+ V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
+ V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
+ ],
+ )
+
+ kubectl.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
+
+ def _delete_cluster_role(self, kubectl: Kubectl, name: str):
+ kubectl.clients[RBAC_CLIENT].delete_cluster_role(name)
+
+ def _create_service_account(
+ self,
+ kubectl: Kubectl,
+ name: str,
+ labels: Dict[str, str],
+ ):
+ service_accounts = kubectl.clients[CORE_CLIENT].list_namespaced_service_account(
+ ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
+ )
+ if len(service_accounts.items) > 0:
+ raise Exception(
+ "Service account with metadata.name={} already exists".format(name)
+ )
+
+ metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
+ service_account = V1ServiceAccount(metadata=metadata)
+
+ kubectl.clients[CORE_CLIENT].create_namespaced_service_account(
+ ADMIN_NAMESPACE, service_account
+ )
+
+ def _delete_service_account(self, kubectl: Kubectl, name: str):
+ kubectl.clients[CORE_CLIENT].delete_namespaced_service_account(
+ name, ADMIN_NAMESPACE
+ )
+
+ def _create_cluster_role_binding(
+ self,
+ kubectl: Kubectl,
+ name: str,
+ labels: Dict[str, str],
+ ):
+ role_bindings = kubectl.clients[RBAC_CLIENT].list_cluster_role_binding(
+ field_selector="metadata.name={}".format(name)
+ )
+ if len(role_bindings.items) > 0:
+ raise Exception("Generated rbac id already exists")
+
+ role_binding = V1ClusterRoleBinding(
+ metadata=V1ObjectMeta(name=name, labels=labels),
+ role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
+ subjects=[
+ V1Subject(kind="ServiceAccount", name=name, namespace=ADMIN_NAMESPACE)
+ ],
+ )
+ kubectl.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
+
+ def _delete_cluster_role_binding(self, kubectl: Kubectl, name: str):
+ kubectl.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
+
+ async def _get_secret_data(self, kubectl: Kubectl, name: str) -> (str, str):
+ v1_core = kubectl.clients[CORE_CLIENT]
+
+ retries_limit = 10
+ secret_name = None
+ while True:
+ retries_limit -= 1
+ service_accounts = v1_core.list_namespaced_service_account(
+ ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
+ )
+ if len(service_accounts.items) == 0:
+ raise Exception(
+ "Service account not found with metadata.name={}".format(name)
+ )
+ service_account = service_accounts.items[0]
+ if service_account.secrets and len(service_account.secrets) > 0:
+ secret_name = service_account.secrets[0].name
+ if secret_name is not None or not retries_limit:
+ break
+ if not secret_name:
+ raise Exception(
+ "Failed getting the secret from service account {}".format(name)
+ )
+ secret = v1_core.list_namespaced_secret(
+ ADMIN_NAMESPACE,
+ field_selector="metadata.name={}".format(secret_name),
+ ).items[0]
+
+ token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
+ client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
+
+ return (
+ base64.b64decode(token).decode("utf-8"),
+ base64.b64decode(client_certificate_data).decode("utf-8"),
+ )