X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fkubectl.py;fp=n2vc%2Fkubectl.py;h=d6ca09a6e64006d7cc4b89d5bd242e26cdd7fee4;hp=8b8008efa1bf18ef8ea723113efcf8ce9fe19593;hb=fb03e9084403d6fc2adf427a371ff9827f3c1238;hpb=085fa8d4658a9b621354d5a08853086e2696abdc diff --git a/n2vc/kubectl.py b/n2vc/kubectl.py index 8b8008e..d6ca09a 100644 --- a/n2vc/kubectl.py +++ b/n2vc/kubectl.py @@ -17,6 +17,7 @@ import logging from typing import Dict import typing import uuid +import json from distutils.version import LooseVersion @@ -43,6 +44,7 @@ SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt" CORE_CLIENT = "core_v1" RBAC_CLIENT = "rbac_v1" STORAGE_CLIENT = "storage_v1" +CUSTOM_OBJECT_CLIENT = "custom_object" class Kubectl: @@ -52,6 +54,7 @@ class Kubectl: CORE_CLIENT: client.CoreV1Api(), RBAC_CLIENT: client.RbacAuthorizationV1Api(), STORAGE_CLIENT: client.StorageV1Api(), + CUSTOM_OBJECT_CLIENT: client.CustomObjectsApi(), } self._configuration = config.kube_config.Configuration.get_default_copy() self.logger = logging.getLogger("Kubectl") @@ -357,3 +360,82 @@ class Kubectl: base64.b64decode(token).decode("utf-8"), base64.b64decode(client_certificate_data).decode("utf-8"), ) + + async def create_certificate( + self, + namespace: str, + name: str, + dns_prefix: str, + secret_name: str, + usages: list, + issuer_name: str, + ): + """ + Creates cert-manager certificate object + + :param: namespace: Name of the namespace where the certificate and secret is stored + :param: name: Name of the certificate object + :param: dns_prefix: Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes + :param: secret_name: Name of the secret created by cert-manager + :param: usages: List of X.509 key usages + :param: issuer_name: Name of the cert-manager's Issuer or ClusterIssuer object + + """ + certificate_body = { + "apiVersion": "cert-manager.io/v1", + "kind": "Certificate", + "metadata": {"name": name, "namespace": namespace}, + "spec": { + "secretName": secret_name, + "privateKey": { + "rotationPolicy": "Always", + "algorithm": "ECDSA", + "size": 256, + }, + "duration": "8760h", # 1 Year + "renewBefore": "2208h", # 9 months + "subject": {"organizations": ["osm"]}, + "commonName": "osm", + "isCA": False, + "usages": usages, + "dnsNames": [ + "{}.{}".format(dns_prefix, namespace), + "{}.{}.svc".format(dns_prefix, namespace), + "{}.{}.svc.cluster".format(dns_prefix, namespace), + "{}.{}.svc.cluster.local".format(dns_prefix, namespace), + ], + "issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"}, + }, + } + client = self.clients[CUSTOM_OBJECT_CLIENT] + try: + client.create_namespaced_custom_object( + group="cert-manager.io", + plural="certificates", + version="v1", + body=certificate_body, + namespace=namespace, + ) + except ApiException as e: + info = json.loads(e.body) + if info.get("reason").lower() == "alreadyexists": + self.logger.warning("Certificate already exists: {}".format(e)) + else: + raise e + + async def delete_certificate(self, namespace, object_name): + client = self.clients[CUSTOM_OBJECT_CLIENT] + try: + client.delete_namespaced_custom_object( + group="cert-manager.io", + plural="certificates", + version="v1", + name=object_name, + namespace=namespace, + ) + except ApiException as e: + info = json.loads(e.body) + if info.get("reason").lower() == "notfound": + self.logger.warning("Certificate already deleted: {}".format(e)) + else: + raise e