import logging
from typing import Dict
import typing
+import uuid
+import json
+from distutils.version import LooseVersion
from kubernetes import client, config
+from kubernetes.client.api import VersionApi
from kubernetes.client.models import (
V1ClusterRole,
V1ObjectMeta,
V1ClusterRoleBinding,
V1RoleRef,
V1Subject,
+ V1Secret,
+ V1SecretReference,
)
from kubernetes.client.rest import ApiException
from retrying_async import retry
CORE_CLIENT = "core_v1"
RBAC_CLIENT = "rbac_v1"
STORAGE_CLIENT = "storage_v1"
+CUSTOM_OBJECT_CLIENT = "custom_object"
class Kubectl:
CORE_CLIENT: client.CoreV1Api(),
RBAC_CLIENT: client.RbacAuthorizationV1Api(),
STORAGE_CLIENT: client.StorageV1Api(),
+ CUSTOM_OBJECT_CLIENT: client.CustomObjectsApi(),
}
self._configuration = config.kube_config.Configuration.get_default_copy()
self.logger = logging.getLogger("Kubectl")
"""
self.clients[RBAC_CLIENT].delete_cluster_role(name)
+ def _get_kubectl_version(self):
+ version = VersionApi().get_code()
+ return "{}.{}".format(version.major, version.minor)
+
+ def _need_to_create_new_secret(self):
+ min_k8s_version = "1.24"
+ current_k8s_version = self._get_kubectl_version()
+ return LooseVersion(min_k8s_version) <= LooseVersion(current_k8s_version)
+
+ def _get_secret_name(self, service_account_name: str):
+ random_alphanum = str(uuid.uuid4())[:5]
+ return "{}-token-{}".format(service_account_name, random_alphanum)
+
+ def _create_service_account_secret(
+ self, service_account_name: str, namespace: str, secret_name: str
+ ):
+ """
+ Create a secret for the service account. K8s version >= 1.24
+
+ :param: service_account_name: Name of the service account
+ :param: namespace: Kubernetes namespace for service account metadata
+ :param: secret_name: Name of the secret
+ """
+ v1_core = self.clients[CORE_CLIENT]
+ secrets = v1_core.list_namespaced_secret(
+ namespace, field_selector="metadata.name={}".format(secret_name)
+ ).items
+
+ if len(secrets) > 0:
+ raise Exception(
+ "Secret with metadata.name={} already exists".format(secret_name)
+ )
+
+ annotations = {"kubernetes.io/service-account.name": service_account_name}
+ metadata = V1ObjectMeta(
+ name=secret_name, namespace=namespace, annotations=annotations
+ )
+ type = "kubernetes.io/service-account-token"
+ secret = V1Secret(metadata=metadata, type=type)
+ v1_core.create_namespaced_secret(namespace, secret)
+
+ def _get_secret_reference_list(self, namespace: str, secret_name: str):
+ """
+ Return a secret reference list with one secret.
+ K8s version >= 1.24
+
+ :param: namespace: Kubernetes namespace for service account metadata
+ :param: secret_name: Name of the secret
+ :rtype: list[V1SecretReference]
+ """
+ return [V1SecretReference(name=secret_name, namespace=namespace)]
+
def create_service_account(
self,
name: str,
:param: namespace: Kubernetes namespace for service account metadata
Default: kube-system
"""
- service_accounts = self.clients[CORE_CLIENT].list_namespaced_service_account(
+ v1_core = self.clients[CORE_CLIENT]
+ service_accounts = v1_core.list_namespaced_service_account(
namespace, field_selector="metadata.name={}".format(name)
)
if len(service_accounts.items) > 0:
)
metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
- service_account = V1ServiceAccount(metadata=metadata)
- self.clients[CORE_CLIENT].create_namespaced_service_account(
- namespace, service_account
- )
+ if self._need_to_create_new_secret():
+ secret_name = self._get_secret_name(name)
+ secrets = self._get_secret_reference_list(namespace, secret_name)
+ service_account = V1ServiceAccount(metadata=metadata, secrets=secrets)
+ v1_core.create_namespaced_service_account(namespace, service_account)
+ self._create_service_account_secret(name, namespace, secret_name)
+ else:
+ service_account = V1ServiceAccount(metadata=metadata)
+ v1_core.create_namespaced_service_account(namespace, service_account)
def delete_service_account(self, name: str, namespace: str = "kube-system"):
"""
base64.b64decode(token).decode("utf-8"),
base64.b64decode(client_certificate_data).decode("utf-8"),
)
+
+ async def create_certificate(
+ self,
+ namespace: str,
+ name: str,
+ dns_prefix: str,
+ secret_name: str,
+ usages: list,
+ issuer_name: str,
+ ):
+ """
+ Creates cert-manager certificate object
+
+ :param: namespace: Name of the namespace where the certificate and secret is stored
+ :param: name: Name of the certificate object
+ :param: dns_prefix: Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes
+ :param: secret_name: Name of the secret created by cert-manager
+ :param: usages: List of X.509 key usages
+ :param: issuer_name: Name of the cert-manager's Issuer or ClusterIssuer object
+
+ """
+ certificate_body = {
+ "apiVersion": "cert-manager.io/v1",
+ "kind": "Certificate",
+ "metadata": {"name": name, "namespace": namespace},
+ "spec": {
+ "secretName": secret_name,
+ "privateKey": {
+ "rotationPolicy": "Always",
+ "algorithm": "ECDSA",
+ "size": 256,
+ },
+ "duration": "8760h", # 1 Year
+ "renewBefore": "2208h", # 9 months
+ "subject": {"organizations": ["osm"]},
+ "commonName": "osm",
+ "isCA": False,
+ "usages": usages,
+ "dnsNames": [
+ "{}.{}".format(dns_prefix, namespace),
+ "{}.{}.svc".format(dns_prefix, namespace),
+ "{}.{}.svc.cluster".format(dns_prefix, namespace),
+ "{}.{}.svc.cluster.local".format(dns_prefix, namespace),
+ ],
+ "issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"},
+ },
+ }
+ client = self.clients[CUSTOM_OBJECT_CLIENT]
+ try:
+ client.create_namespaced_custom_object(
+ group="cert-manager.io",
+ plural="certificates",
+ version="v1",
+ body=certificate_body,
+ namespace=namespace,
+ )
+ except ApiException as e:
+ info = json.loads(e.body)
+ if info.get("reason").lower() == "alreadyexists":
+ self.logger.warning("Certificate already exists: {}".format(e))
+ else:
+ raise e
+
+ async def delete_certificate(self, namespace, object_name):
+ client = self.clients[CUSTOM_OBJECT_CLIENT]
+ try:
+ client.delete_namespaced_custom_object(
+ group="cert-manager.io",
+ plural="certificates",
+ version="v1",
+ name=object_name,
+ namespace=namespace,
+ )
+ except ApiException as e:
+ info = json.loads(e.body)
+ if info.get("reason").lower() == "notfound":
+ self.logger.warning("Certificate already deleted: {}".format(e))
+ else:
+ raise e