Feature 10947: Add methods to create certificates
[osm/N2VC.git] / n2vc / kubectl.py
index 8b8008e..d6ca09a 100644 (file)
@@ -17,6 +17,7 @@ import logging
 from typing import Dict
 import typing
 import uuid
+import json
 
 from distutils.version import LooseVersion
 
@@ -43,6 +44,7 @@ SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
 CORE_CLIENT = "core_v1"
 RBAC_CLIENT = "rbac_v1"
 STORAGE_CLIENT = "storage_v1"
+CUSTOM_OBJECT_CLIENT = "custom_object"
 
 
 class Kubectl:
@@ -52,6 +54,7 @@ class Kubectl:
             CORE_CLIENT: client.CoreV1Api(),
             RBAC_CLIENT: client.RbacAuthorizationV1Api(),
             STORAGE_CLIENT: client.StorageV1Api(),
+            CUSTOM_OBJECT_CLIENT: client.CustomObjectsApi(),
         }
         self._configuration = config.kube_config.Configuration.get_default_copy()
         self.logger = logging.getLogger("Kubectl")
@@ -357,3 +360,82 @@ class Kubectl:
             base64.b64decode(token).decode("utf-8"),
             base64.b64decode(client_certificate_data).decode("utf-8"),
         )
+
+    async def create_certificate(
+        self,
+        namespace: str,
+        name: str,
+        dns_prefix: str,
+        secret_name: str,
+        usages: list,
+        issuer_name: str,
+    ):
+        """
+        Creates cert-manager certificate object
+
+        :param: namespace:       Name of the namespace where the certificate and secret is stored
+        :param: name:            Name of the certificate object
+        :param: dns_prefix:      Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes
+        :param: secret_name:     Name of the secret created by cert-manager
+        :param: usages:          List of X.509 key usages
+        :param: issuer_name:     Name of the cert-manager's Issuer or ClusterIssuer object
+
+        """
+        certificate_body = {
+            "apiVersion": "cert-manager.io/v1",
+            "kind": "Certificate",
+            "metadata": {"name": name, "namespace": namespace},
+            "spec": {
+                "secretName": secret_name,
+                "privateKey": {
+                    "rotationPolicy": "Always",
+                    "algorithm": "ECDSA",
+                    "size": 256,
+                },
+                "duration": "8760h",  # 1 Year
+                "renewBefore": "2208h",  # 9 months
+                "subject": {"organizations": ["osm"]},
+                "commonName": "osm",
+                "isCA": False,
+                "usages": usages,
+                "dnsNames": [
+                    "{}.{}".format(dns_prefix, namespace),
+                    "{}.{}.svc".format(dns_prefix, namespace),
+                    "{}.{}.svc.cluster".format(dns_prefix, namespace),
+                    "{}.{}.svc.cluster.local".format(dns_prefix, namespace),
+                ],
+                "issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"},
+            },
+        }
+        client = self.clients[CUSTOM_OBJECT_CLIENT]
+        try:
+            client.create_namespaced_custom_object(
+                group="cert-manager.io",
+                plural="certificates",
+                version="v1",
+                body=certificate_body,
+                namespace=namespace,
+            )
+        except ApiException as e:
+            info = json.loads(e.body)
+            if info.get("reason").lower() == "alreadyexists":
+                self.logger.warning("Certificate already exists: {}".format(e))
+            else:
+                raise e
+
+    async def delete_certificate(self, namespace, object_name):
+        client = self.clients[CUSTOM_OBJECT_CLIENT]
+        try:
+            client.delete_namespaced_custom_object(
+                group="cert-manager.io",
+                plural="certificates",
+                version="v1",
+                name=object_name,
+                namespace=namespace,
+            )
+        except ApiException as e:
+            info = json.loads(e.body)
+            if info.get("reason").lower() == "notfound":
+                self.logger.warning("Certificate already deleted: {}".format(e))
+            else:
+                raise e