X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fkubectl.py;h=d6ca09a6e64006d7cc4b89d5bd242e26cdd7fee4;hp=8b8008efa1bf18ef8ea723113efcf8ce9fe19593;hb=HEAD;hpb=085fa8d4658a9b621354d5a08853086e2696abdc diff --git a/n2vc/kubectl.py b/n2vc/kubectl.py index 8b8008e..c16c95a 100644 --- a/n2vc/kubectl.py +++ b/n2vc/kubectl.py @@ -17,6 +17,7 @@ import logging from typing import Dict import typing import uuid +import json from distutils.version import LooseVersion @@ -24,16 +25,20 @@ from kubernetes import client, config from kubernetes.client.api import VersionApi from kubernetes.client.models import ( V1ClusterRole, + V1Role, V1ObjectMeta, V1PolicyRule, V1ServiceAccount, V1ClusterRoleBinding, + V1RoleBinding, V1RoleRef, V1Subject, V1Secret, V1SecretReference, + V1Namespace, ) from kubernetes.client.rest import ApiException +from n2vc.libjuju import retry_callback from retrying_async import retry @@ -43,6 +48,7 @@ SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt" CORE_CLIENT = "core_v1" RBAC_CLIENT = "rbac_v1" STORAGE_CLIENT = "storage_v1" +CUSTOM_OBJECT_CLIENT = "custom_object" class Kubectl: @@ -52,6 +58,7 @@ class Kubectl: CORE_CLIENT: client.CoreV1Api(), RBAC_CLIENT: client.RbacAuthorizationV1Api(), STORAGE_CLIENT: client.StorageV1Api(), + CUSTOM_OBJECT_CLIENT: client.CustomObjectsApi(), } self._configuration = config.kube_config.Configuration.get_default_copy() self.logger = logging.getLogger("Kubectl") @@ -159,9 +166,7 @@ class Kubectl: ) if len(cluster_roles.items) > 0: - raise Exception( - "Cluster role with metadata.name={} already exists".format(name) - ) + raise Exception("Role with metadata.name={} already exists".format(name)) metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace) # Cluster role @@ -175,6 +180,46 @@ class Kubectl: self.clients[RBAC_CLIENT].create_cluster_role(cluster_role) + async def create_role( + self, + name: str, + labels: Dict[str, str], + api_groups: list, + resources: list, + verbs: list, + namespace: str, + ): + """ + Create a role with one PolicyRule + + :param: name: Name of the namespaced Role + :param: labels: Labels for namespaced Role metadata + :param: api_groups: List with api-groups allowed in the policy rule + :param: resources: List with resources allowed in the policy rule + :param: verbs: List with verbs allowed in the policy rule + :param: namespace: Kubernetes namespace for Role metadata + + :return: None + """ + + roles = self.clients[RBAC_CLIENT].list_namespaced_role( + namespace, field_selector="metadata.name={}".format(name) + ) + + if len(roles.items) > 0: + raise Exception("Role with metadata.name={} already exists".format(name)) + + metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace) + + role = V1Role( + metadata=metadata, + rules=[ + V1PolicyRule(api_groups=api_groups, resources=resources, verbs=verbs), + ], + ) + + self.clients[RBAC_CLIENT].create_namespaced_role(namespace, role) + def delete_cluster_role(self, name: str): """ Delete a cluster role @@ -304,6 +349,44 @@ class Kubectl: ) self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding) + async def create_role_binding( + self, + name: str, + role_name: str, + sa_name: str, + labels: Dict[str, str], + namespace: str, + ): + """ + Create a cluster role binding + + :param: name: Name of the namespaced Role Binding + :param: role_name: Name of the namespaced Role to be bound + :param: sa_name: Name of the Service Account to be bound + :param: labels: Labels for Role Binding metadata + :param: namespace: Kubernetes namespace for Role Binding metadata + + :return: None + """ + role_bindings = self.clients[RBAC_CLIENT].list_namespaced_role_binding( + namespace, field_selector="metadata.name={}".format(name) + ) + if len(role_bindings.items) > 0: + raise Exception( + "Role Binding with metadata.name={} already exists".format(name) + ) + + role_binding = V1RoleBinding( + metadata=V1ObjectMeta(name=name, labels=labels), + role_ref=V1RoleRef(kind="Role", name=role_name, api_group=""), + subjects=[ + V1Subject(kind="ServiceAccount", name=sa_name, namespace=namespace) + ], + ) + self.clients[RBAC_CLIENT].create_namespaced_role_binding( + namespace, role_binding + ) + def delete_cluster_role_binding(self, name: str): """ Delete a cluster role binding @@ -316,6 +399,7 @@ class Kubectl: attempts=10, delay=1, fallback=Exception("Failed getting the secret from service account"), + callback=retry_callback, ) async def get_secret_data( self, name: str, namespace: str = "kube-system" @@ -346,6 +430,7 @@ class Kubectl: raise Exception( "Failed getting the secret from service account {}".format(name) ) + # TODO: refactor to use get_secret_content secret = v1_core.list_namespaced_secret( namespace, field_selector="metadata.name={}".format(secret_name) ).items[0] @@ -357,3 +442,176 @@ class Kubectl: base64.b64decode(token).decode("utf-8"), base64.b64decode(client_certificate_data).decode("utf-8"), ) + + @retry( + attempts=10, + delay=1, + fallback=Exception("Failed getting data from the secret"), + ) + async def get_secret_content( + self, + name: str, + namespace: str, + ) -> dict: + """ + Get secret data + + :param: name: Name of the secret + :param: namespace: Name of the namespace where the secret is stored + + :return: Dictionary with secret's data + """ + v1_core = self.clients[CORE_CLIENT] + + secret = v1_core.read_namespaced_secret(name, namespace) + + return secret.data + + @retry( + attempts=10, + delay=1, + fallback=Exception("Failed creating the secret"), + ) + async def create_secret( + self, name: str, data: dict, namespace: str, secret_type: str + ): + """ + Get secret data + + :param: name: Name of the secret + :param: data: Dict with data content. Values must be already base64 encoded + :param: namespace: Name of the namespace where the secret will be stored + :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls + + :return: None + """ + v1_core = self.clients[CORE_CLIENT] + metadata = V1ObjectMeta(name=name, namespace=namespace) + secret = V1Secret(metadata=metadata, data=data, type=secret_type) + v1_core.create_namespaced_secret(namespace, secret) + + async def create_certificate( + self, + namespace: str, + name: str, + dns_prefix: str, + secret_name: str, + usages: list, + issuer_name: str, + ): + """ + Creates cert-manager certificate object + + :param: namespace: Name of the namespace where the certificate and secret is stored + :param: name: Name of the certificate object + :param: dns_prefix: Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes + :param: secret_name: Name of the secret created by cert-manager + :param: usages: List of X.509 key usages + :param: issuer_name: Name of the cert-manager's Issuer or ClusterIssuer object + + """ + certificate_body = { + "apiVersion": "cert-manager.io/v1", + "kind": "Certificate", + "metadata": {"name": name, "namespace": namespace}, + "spec": { + "secretName": secret_name, + "privateKey": { + "rotationPolicy": "Always", + "algorithm": "ECDSA", + "size": 256, + }, + "duration": "8760h", # 1 Year + "renewBefore": "2208h", # 9 months + "subject": {"organizations": ["osm"]}, + "commonName": "osm", + "isCA": False, + "usages": usages, + "dnsNames": [ + "{}.{}".format(dns_prefix, namespace), + "{}.{}.svc".format(dns_prefix, namespace), + "{}.{}.svc.cluster".format(dns_prefix, namespace), + "{}.{}.svc.cluster.local".format(dns_prefix, namespace), + ], + "issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"}, + }, + } + client = self.clients[CUSTOM_OBJECT_CLIENT] + try: + client.create_namespaced_custom_object( + group="cert-manager.io", + plural="certificates", + version="v1", + body=certificate_body, + namespace=namespace, + ) + except ApiException as e: + info = json.loads(e.body) + if info.get("reason").lower() == "alreadyexists": + self.logger.warning("Certificate already exists: {}".format(e)) + else: + raise e + + async def delete_certificate(self, namespace, object_name): + client = self.clients[CUSTOM_OBJECT_CLIENT] + try: + client.delete_namespaced_custom_object( + group="cert-manager.io", + plural="certificates", + version="v1", + name=object_name, + namespace=namespace, + ) + except ApiException as e: + info = json.loads(e.body) + if info.get("reason").lower() == "notfound": + self.logger.warning("Certificate already deleted: {}".format(e)) + else: + raise e + + @retry( + attempts=10, + delay=1, + fallback=Exception("Failed creating the namespace"), + ) + async def create_namespace(self, name: str, labels: dict = None): + """ + Create a namespace + + :param: name: Name of the namespace to be created + :param: labels: Dictionary with labels for the new namespace + + """ + v1_core = self.clients[CORE_CLIENT] + metadata = V1ObjectMeta(name=name, labels=labels) + namespace = V1Namespace( + metadata=metadata, + ) + + try: + v1_core.create_namespace(namespace) + self.logger.debug("Namespace created: {}".format(name)) + except ApiException as e: + info = json.loads(e.body) + if info.get("reason").lower() == "alreadyexists": + self.logger.warning("Namespace already exists: {}".format(e)) + else: + raise e + + @retry( + attempts=10, + delay=1, + fallback=Exception("Failed deleting the namespace"), + ) + async def delete_namespace(self, name: str): + """ + Delete a namespace + + :param: name: Name of the namespace to be deleted + + """ + try: + self.clients[CORE_CLIENT].delete_namespace(name) + except ApiException as e: + if e.reason == "Not Found": + self.logger.warning("Namespace already deleted: {}".format(e))