import yaml
import tempfile
import binascii
-import base64
from n2vc.config import EnvironConfig
from n2vc.exceptions import K8sException
from n2vc.k8s_conn import K8sConnector
-from n2vc.kubectl import Kubectl, CORE_CLIENT, RBAC_CLIENT
+from n2vc.kubectl import Kubectl
from .exceptions import MethodNotImplemented
from n2vc.libjuju import Libjuju
from n2vc.utils import obj_to_dict, obj_to_yaml
from n2vc.store import MotorStore
from n2vc.vca.cloud import Cloud
from n2vc.vca.connection import get_connection
-from kubernetes.client.models import (
- V1ClusterRole,
- V1ObjectMeta,
- V1PolicyRule,
- V1ServiceAccount,
- V1ClusterRoleBinding,
- V1RoleRef,
- V1Subject,
-)
-
-from typing import Dict
-
-SERVICE_ACCOUNT_TOKEN_KEY = "token"
-SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
-RBAC_LABEL_KEY_NAME = "rbac-id"
-ADMIN_NAMESPACE = "kube-system"
+
+RBAC_LABEL_KEY_NAME = "rbac-id"
RBAC_STACK_PREFIX = "juju-credential"
libjuju = await self._get_libjuju(kwargs.get("vca_id"))
cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
-
- kubecfg = tempfile.NamedTemporaryFile()
- with open(kubecfg.name, "w") as kubecfg_file:
- kubecfg_file.write(k8s_creds)
- kubectl = Kubectl(config_file=kubecfg.name)
+ kubectl = self._get_kubectl(k8s_creds)
# CREATING RESOURCES IN K8S
rbac_id = generate_rbac_id()
# if it fails in the middle of the process
cleanup_data = []
try:
- self._create_cluster_role(
- kubectl,
+ kubectl.create_cluster_role(
name=metadata_name,
labels=labels,
)
cleanup_data.append(
{
- "delete": self._delete_cluster_role,
- "args": (kubectl, metadata_name),
+ "delete": kubectl.delete_cluster_role,
+ "args": (metadata_name),
}
)
- self._create_service_account(
- kubectl,
+ kubectl.create_service_account(
name=metadata_name,
labels=labels,
)
cleanup_data.append(
{
- "delete": self._delete_service_account,
- "args": (kubectl, metadata_name),
+ "delete": kubectl.delete_service_account,
+ "args": (metadata_name),
}
)
- self._create_cluster_role_binding(
- kubectl,
+ kubectl.create_cluster_role_binding(
name=metadata_name,
labels=labels,
)
cleanup_data.append(
{
- "delete": self._delete_service_account,
- "args": (kubectl, metadata_name),
+ "delete": kubectl.delete_service_account,
+ "args": (metadata_name),
}
)
- token, client_cert_data = await self._get_secret_data(
- kubectl,
+ token, client_cert_data = await kubectl.get_secret_data(
metadata_name,
)
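Review bot: The three cleanup entries now register `"args": (metadata_name)`, which is a plain string, not a one-element tuple; if the loop that consumes `cleanup_data` splats `args` the way the previous `(kubectl, metadata_name)` shape implies, each delete is called with the name's characters as separate arguments and fails, leaving the just-created cluster-admin ClusterRole, ServiceAccount and ClusterRoleBinding behind. Use `"args": (metadata_name,),` in all three entries. The entry added after `create_cluster_role_binding` also registers `kubectl.delete_service_account` instead of `kubectl.delete_cluster_role_binding`, so a failure in `get_secret_data` deletes the subject and role twice/once but leaves a dangling binding in the cluster that would re-grant admin to any later ServiceAccount created under that name; it should read `"delete": kubectl.delete_cluster_role_binding,`. The enclosing method, including the cleanup loop, starts and ends outside this hunk.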
await libjuju.remove_cloud(cluster_uuid)
- kubecfg = self.get_credentials(cluster_uuid=cluster_uuid)
+ credentials = self.get_credentials(cluster_uuid=cluster_uuid)
- kubecfg_file = tempfile.NamedTemporaryFile()
- with open(kubecfg_file.name, "w") as f:
- f.write(kubecfg)
- kubectl = Kubectl(config_file=kubecfg_file.name)
+ kubectl = self._get_kubectl(credentials)
delete_functions = [
- self._delete_cluster_role_binding,
- self._delete_service_account,
- self._delete_cluster_role,
+ kubectl.delete_cluster_role_binding,
+ kubectl.delete_service_account,
+ kubectl.delete_cluster_role,
]
credential_attrs = cloud_creds[0].result["attrs"]
if RBAC_LABEL_KEY_NAME in credential_attrs:
rbac_id = credential_attrs[RBAC_LABEL_KEY_NAME]
metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
- delete_args = (kubectl, metadata_name)
for delete_func in delete_functions:
try:
- delete_func(*delete_args)
+ delete_func(metadata_name)
except Exception as e:
self.log.warning("Cannot remove resource in K8s {}".format(e))
"""Return a list of services of a kdu_instance"""
credentials = self.get_credentials(cluster_uuid=cluster_uuid)
-
- kubecfg = tempfile.NamedTemporaryFile()
- with open(kubecfg.name, "w") as kubecfg_file:
- kubecfg_file.write(credentials)
- kubectl = Kubectl(config_file=kubecfg.name)
-
+ kubectl = self._get_kubectl(credentials)
return kubectl.get_services(
field_selector="metadata.namespace={}".format(kdu_instance)
)
"""Return data for a specific service inside a namespace"""
credentials = self.get_credentials(cluster_uuid=cluster_uuid)
-
- kubecfg = tempfile.NamedTemporaryFile()
- with open(kubecfg.name, "w") as kubecfg_file:
- kubecfg_file.write(credentials)
- kubectl = Kubectl(config_file=kubecfg.name)
-
+ kubectl = self._get_kubectl(credentials)
return kubectl.get_services(
field_selector="metadata.name={},metadata.namespace={}".format(
service_name, namespace
"""
pass
- def _create_cluster_role(
- self,
- kubectl: Kubectl,
- name: str,
- labels: Dict[str, str],
- ):
- cluster_roles = kubectl.clients[RBAC_CLIENT].list_cluster_role(
- field_selector="metadata.name={}".format(name)
- )
-
- if len(cluster_roles.items) > 0:
- raise Exception(
- "Cluster role with metadata.name={} already exists".format(name)
- )
-
- metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
- # Cluster role
- cluster_role = V1ClusterRole(
- metadata=metadata,
- rules=[
- V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
- V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
- ],
- )
-
- kubectl.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
-
- def _delete_cluster_role(self, kubectl: Kubectl, name: str):
- kubectl.clients[RBAC_CLIENT].delete_cluster_role(name)
-
- def _create_service_account(
- self,
- kubectl: Kubectl,
- name: str,
- labels: Dict[str, str],
- ):
- service_accounts = kubectl.clients[CORE_CLIENT].list_namespaced_service_account(
- ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
- )
- if len(service_accounts.items) > 0:
- raise Exception(
- "Service account with metadata.name={} already exists".format(name)
- )
-
- metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
- service_account = V1ServiceAccount(metadata=metadata)
-
- kubectl.clients[CORE_CLIENT].create_namespaced_service_account(
- ADMIN_NAMESPACE, service_account
- )
-
- def _delete_service_account(self, kubectl: Kubectl, name: str):
- kubectl.clients[CORE_CLIENT].delete_namespaced_service_account(
- name, ADMIN_NAMESPACE
- )
-
- def _create_cluster_role_binding(
- self,
- kubectl: Kubectl,
- name: str,
- labels: Dict[str, str],
- ):
- role_bindings = kubectl.clients[RBAC_CLIENT].list_cluster_role_binding(
- field_selector="metadata.name={}".format(name)
- )
- if len(role_bindings.items) > 0:
- raise Exception("Generated rbac id already exists")
-
- role_binding = V1ClusterRoleBinding(
- metadata=V1ObjectMeta(name=name, labels=labels),
- role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
- subjects=[
- V1Subject(kind="ServiceAccount", name=name, namespace=ADMIN_NAMESPACE)
- ],
- )
- kubectl.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
-
- def _delete_cluster_role_binding(self, kubectl: Kubectl, name: str):
- kubectl.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
-
- async def _get_secret_data(self, kubectl: Kubectl, name: str) -> (str, str):
- v1_core = kubectl.clients[CORE_CLIENT]
-
- retries_limit = 10
- secret_name = None
- while True:
- retries_limit -= 1
- service_accounts = v1_core.list_namespaced_service_account(
- ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
- )
- if len(service_accounts.items) == 0:
- raise Exception(
- "Service account not found with metadata.name={}".format(name)
- )
- service_account = service_accounts.items[0]
- if service_account.secrets and len(service_account.secrets) > 0:
- secret_name = service_account.secrets[0].name
- if secret_name is not None or not retries_limit:
- break
- if not secret_name:
- raise Exception(
- "Failed getting the secret from service account {}".format(name)
- )
- secret = v1_core.list_namespaced_secret(
- ADMIN_NAMESPACE,
- field_selector="metadata.name={}".format(secret_name),
- ).items[0]
-
- token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
- client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
-
- return (
- base64.b64decode(token).decode("utf-8"),
- base64.b64decode(client_certificate_data).decode("utf-8"),
- )
-
@staticmethod
def generate_kdu_instance_name(**kwargs):
db_dict = kwargs.get("db_dict")
log=self.log,
n2vc=self,
)
+
+ def _get_kubectl(self, credentials: str) -> Kubectl:
+ """
+ Get Kubectl object
+
+ :param: kubeconfig_credentials: Kubeconfig credentials
+ """
+ kubecfg = tempfile.NamedTemporaryFile()
+ with open(kubecfg.name, "w") as kubecfg_file:
+ kubecfg_file.write(credentials)
+ return Kubectl(config_file=kubecfg.name)
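Review bot: `_get_kubectl` returns the `Kubectl` while dropping its only reference to `kubecfg`, so the `NamedTemporaryFile` (delete=True by default) is closed and unlinked as soon as the function returns, whereas the inline pattern it replaces (`kubecfg = tempfile.NamedTemporaryFile()` held in the caller) kept the kubeconfig on disk for the whole method. Unless `Kubectl.__init__` loads the config eagerly, which is not visible here, later calls such as `create_cluster_role` or `get_services` will point at a path that no longer exists. Keep the file object alive for as long as the client (return it alongside, store it on the instance, or use `delete=False` and unlink it in a `finally` so the cluster credentials are still removed deterministically), and write through the handle (`tempfile.NamedTemporaryFile(mode="w")` plus `kubecfg.write(credentials); kubecfg.flush()`) rather than reopening the path with `open(kubecfg.name, "w")`. The docstring names the parameter `kubeconfig_credentials` while the signature says `credentials`; `:param credentials: kubeconfig (YAML string) for the target cluster` matches the code.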