from typing import Dict
import typing
import uuid
+import json
from distutils.version import LooseVersion
from kubernetes.client.api import VersionApi
from kubernetes.client.models import (
V1ClusterRole,
+ V1Role,
V1ObjectMeta,
V1PolicyRule,
V1ServiceAccount,
V1ClusterRoleBinding,
+ V1RoleBinding,
V1RoleRef,
V1Subject,
V1Secret,
V1SecretReference,
+ V1Namespace,
)
from kubernetes.client.rest import ApiException
+from n2vc.libjuju import retry_callback
from retrying_async import retry
CORE_CLIENT = "core_v1"
RBAC_CLIENT = "rbac_v1"
STORAGE_CLIENT = "storage_v1"
+CUSTOM_OBJECT_CLIENT = "custom_object"
class Kubectl:
CORE_CLIENT: client.CoreV1Api(),
RBAC_CLIENT: client.RbacAuthorizationV1Api(),
STORAGE_CLIENT: client.StorageV1Api(),
+ CUSTOM_OBJECT_CLIENT: client.CustomObjectsApi(),
}
self._configuration = config.kube_config.Configuration.get_default_copy()
self.logger = logging.getLogger("Kubectl")
)
if len(cluster_roles.items) > 0:
- raise Exception(
- "Cluster role with metadata.name={} already exists".format(name)
- )
+ raise Exception("Role with metadata.name={} already exists".format(name))
metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
# Cluster role
self.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
+ async def create_role(
+ self,
+ name: str,
+ labels: Dict[str, str],
+ api_groups: list,
+ resources: list,
+ verbs: list,
+ namespace: str,
+ ):
+ """
+ Create a role with one PolicyRule
+
+ :param: name: Name of the namespaced Role
+ :param: labels: Labels for namespaced Role metadata
+ :param: api_groups: List with api-groups allowed in the policy rule
+ :param: resources: List with resources allowed in the policy rule
+ :param: verbs: List with verbs allowed in the policy rule
+ :param: namespace: Kubernetes namespace for Role metadata
+
+ :return: None
+ """
+
+ roles = self.clients[RBAC_CLIENT].list_namespaced_role(
+ namespace, field_selector="metadata.name={}".format(name)
+ )
+
+ if len(roles.items) > 0:
+ raise Exception("Role with metadata.name={} already exists".format(name))
+
+ metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
+
+ role = V1Role(
+ metadata=metadata,
+ rules=[
+ V1PolicyRule(api_groups=api_groups, resources=resources, verbs=verbs),
+ ],
+ )
+
+ self.clients[RBAC_CLIENT].create_namespaced_role(namespace, role)
+
def delete_cluster_role(self, name: str):
"""
Delete a cluster role
)
self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
+ async def create_role_binding(
+ self,
+ name: str,
+ role_name: str,
+ sa_name: str,
+ labels: Dict[str, str],
+ namespace: str,
+ ):
+ """
+ Create a cluster role binding
+
+ :param: name: Name of the namespaced Role Binding
+ :param: role_name: Name of the namespaced Role to be bound
+ :param: sa_name: Name of the Service Account to be bound
+ :param: labels: Labels for Role Binding metadata
+ :param: namespace: Kubernetes namespace for Role Binding metadata
+
+ :return: None
+ """
+ role_bindings = self.clients[RBAC_CLIENT].list_namespaced_role_binding(
+ namespace, field_selector="metadata.name={}".format(name)
+ )
+ if len(role_bindings.items) > 0:
+ raise Exception(
+ "Role Binding with metadata.name={} already exists".format(name)
+ )
+
+ role_binding = V1RoleBinding(
+ metadata=V1ObjectMeta(name=name, labels=labels),
+ role_ref=V1RoleRef(kind="Role", name=role_name, api_group=""),
+ subjects=[
+ V1Subject(kind="ServiceAccount", name=sa_name, namespace=namespace)
+ ],
+ )
+ self.clients[RBAC_CLIENT].create_namespaced_role_binding(
+ namespace, role_binding
+ )
+
def delete_cluster_role_binding(self, name: str):
"""
Delete a cluster role binding
attempts=10,
delay=1,
fallback=Exception("Failed getting the secret from service account"),
+ callback=retry_callback,
)
async def get_secret_data(
self, name: str, namespace: str = "kube-system"
raise Exception(
"Failed getting the secret from service account {}".format(name)
)
+ # TODO: refactor to use get_secret_content
secret = v1_core.list_namespaced_secret(
namespace, field_selector="metadata.name={}".format(secret_name)
).items[0]
base64.b64decode(token).decode("utf-8"),
base64.b64decode(client_certificate_data).decode("utf-8"),
)
+
+ @retry(
+ attempts=10,
+ delay=1,
+ fallback=Exception("Failed getting data from the secret"),
+ )
+ async def get_secret_content(
+ self,
+ name: str,
+ namespace: str,
+ ) -> dict:
+ """
+ Get secret data
+
+ :param: name: Name of the secret
+ :param: namespace: Name of the namespace where the secret is stored
+
+ :return: Dictionary with secret's data
+ """
+ v1_core = self.clients[CORE_CLIENT]
+
+ secret = v1_core.read_namespaced_secret(name, namespace)
+
+ return secret.data
+
+ @retry(
+ attempts=10,
+ delay=1,
+ fallback=Exception("Failed creating the secret"),
+ )
+ async def create_secret(
+ self, name: str, data: dict, namespace: str, secret_type: str
+ ):
+ """
+ Get secret data
+
+ :param: name: Name of the secret
+ :param: data: Dict with data content. Values must be already base64 encoded
+ :param: namespace: Name of the namespace where the secret will be stored
+ :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls
+
+ :return: None
+ """
+ v1_core = self.clients[CORE_CLIENT]
+ metadata = V1ObjectMeta(name=name, namespace=namespace)
+ secret = V1Secret(metadata=metadata, data=data, type=secret_type)
+ v1_core.create_namespaced_secret(namespace, secret)
+
+ async def create_certificate(
+ self,
+ namespace: str,
+ name: str,
+ dns_prefix: str,
+ secret_name: str,
+ usages: list,
+ issuer_name: str,
+ ):
+ """
+ Creates cert-manager certificate object
+
+ :param: namespace: Name of the namespace where the certificate and secret is stored
+ :param: name: Name of the certificate object
+ :param: dns_prefix: Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes
+ :param: secret_name: Name of the secret created by cert-manager
+ :param: usages: List of X.509 key usages
+ :param: issuer_name: Name of the cert-manager's Issuer or ClusterIssuer object
+
+ """
+ certificate_body = {
+ "apiVersion": "cert-manager.io/v1",
+ "kind": "Certificate",
+ "metadata": {"name": name, "namespace": namespace},
+ "spec": {
+ "secretName": secret_name,
+ "privateKey": {
+ "rotationPolicy": "Always",
+ "algorithm": "ECDSA",
+ "size": 256,
+ },
+ "duration": "8760h", # 1 Year
+ "renewBefore": "2208h", # 9 months
+ "subject": {"organizations": ["osm"]},
+ "commonName": "osm",
+ "isCA": False,
+ "usages": usages,
+ "dnsNames": [
+ "{}.{}".format(dns_prefix, namespace),
+ "{}.{}.svc".format(dns_prefix, namespace),
+ "{}.{}.svc.cluster".format(dns_prefix, namespace),
+ "{}.{}.svc.cluster.local".format(dns_prefix, namespace),
+ ],
+ "issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"},
+ },
+ }
+ client = self.clients[CUSTOM_OBJECT_CLIENT]
+ try:
+ client.create_namespaced_custom_object(
+ group="cert-manager.io",
+ plural="certificates",
+ version="v1",
+ body=certificate_body,
+ namespace=namespace,
+ )
+ except ApiException as e:
+ info = json.loads(e.body)
+ if info.get("reason").lower() == "alreadyexists":
+ self.logger.warning("Certificate already exists: {}".format(e))
+ else:
+ raise e
+
+ async def delete_certificate(self, namespace, object_name):
+ client = self.clients[CUSTOM_OBJECT_CLIENT]
+ try:
+ client.delete_namespaced_custom_object(
+ group="cert-manager.io",
+ plural="certificates",
+ version="v1",
+ name=object_name,
+ namespace=namespace,
+ )
+ except ApiException as e:
+ info = json.loads(e.body)
+ if info.get("reason").lower() == "notfound":
+ self.logger.warning("Certificate already deleted: {}".format(e))
+ else:
+ raise e
+
+ @retry(
+ attempts=10,
+ delay=1,
+ fallback=Exception("Failed creating the namespace"),
+ )
+ async def create_namespace(self, name: str, labels: dict = None):
+ """
+ Create a namespace
+
+ :param: name: Name of the namespace to be created
+ :param: labels: Dictionary with labels for the new namespace
+
+ """
+ v1_core = self.clients[CORE_CLIENT]
+ metadata = V1ObjectMeta(name=name, labels=labels)
+ namespace = V1Namespace(
+ metadata=metadata,
+ )
+
+ try:
+ v1_core.create_namespace(namespace)
+ self.logger.debug("Namespace created: {}".format(name))
+ except ApiException as e:
+ info = json.loads(e.body)
+ if info.get("reason").lower() == "alreadyexists":
+ self.logger.warning("Namespace already exists: {}".format(e))
+ else:
+ raise e
+
+ @retry(
+ attempts=10,
+ delay=1,
+ fallback=Exception("Failed deleting the namespace"),
+ )
+ async def delete_namespace(self, name: str):
+ """
+ Delete a namespace
+
+ :param: name: Name of the namespace to be deleted
+
+ """
+ try:
+ self.clients[CORE_CLIENT].delete_namespace(name)
+ except ApiException as e:
+ if e.reason == "Not Found":
+ self.logger.warning("Namespace already deleted: {}".format(e))