from n2vc.config import EnvironConfig
from n2vc.exceptions import K8sException
from n2vc.k8s_conn import K8sConnector
+from n2vc.kubectl import Kubectl
class K8sHelmBaseConnector(K8sConnector):
break # it is not necessary to continue the loop if the repo link was found...
return repo_url
+
+ async def create_certificate(
+ self, cluster_uuid, namespace, dns_prefix, name, secret_name, usage
+ ):
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+ kubectl = Kubectl(config_file=paths["kube_config"])
+ await kubectl.create_certificate(
+ namespace=namespace,
+ name=name,
+ dns_prefix=dns_prefix,
+ secret_name=secret_name,
+ usages=[usage],
+ issuer_name="ca-issuer",
+ )
+
+ async def delete_certificate(self, cluster_uuid, namespace, certificate_name):
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+ kubectl = Kubectl(config_file=paths["kube_config"])
+ await kubectl.delete_certificate(namespace, certificate_name)
from typing import Dict
import typing
import uuid
+import json
from distutils.version import LooseVersion
CORE_CLIENT = "core_v1"
RBAC_CLIENT = "rbac_v1"
STORAGE_CLIENT = "storage_v1"
+CUSTOM_OBJECT_CLIENT = "custom_object"
class Kubectl:
CORE_CLIENT: client.CoreV1Api(),
RBAC_CLIENT: client.RbacAuthorizationV1Api(),
STORAGE_CLIENT: client.StorageV1Api(),
+ CUSTOM_OBJECT_CLIENT: client.CustomObjectsApi(),
}
self._configuration = config.kube_config.Configuration.get_default_copy()
self.logger = logging.getLogger("Kubectl")
base64.b64decode(token).decode("utf-8"),
base64.b64decode(client_certificate_data).decode("utf-8"),
)
+
+ async def create_certificate(
+ self,
+ namespace: str,
+ name: str,
+ dns_prefix: str,
+ secret_name: str,
+ usages: list,
+ issuer_name: str,
+ ):
+ """
+ Creates cert-manager certificate object
+
+ :param: namespace: Name of the namespace where the certificate and secret is stored
+ :param: name: Name of the certificate object
+ :param: dns_prefix: Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes
+ :param: secret_name: Name of the secret created by cert-manager
+ :param: usages: List of X.509 key usages
+ :param: issuer_name: Name of the cert-manager's Issuer or ClusterIssuer object
+
+ """
+ certificate_body = {
+ "apiVersion": "cert-manager.io/v1",
+ "kind": "Certificate",
+ "metadata": {"name": name, "namespace": namespace},
+ "spec": {
+ "secretName": secret_name,
+ "privateKey": {
+ "rotationPolicy": "Always",
+ "algorithm": "ECDSA",
+ "size": 256,
+ },
+ "duration": "8760h", # 1 Year
+ "renewBefore": "2208h", # 9 months
+ "subject": {"organizations": ["osm"]},
+ "commonName": "osm",
+ "isCA": False,
+ "usages": usages,
+ "dnsNames": [
+ "{}.{}".format(dns_prefix, namespace),
+ "{}.{}.svc".format(dns_prefix, namespace),
+ "{}.{}.svc.cluster".format(dns_prefix, namespace),
+ "{}.{}.svc.cluster.local".format(dns_prefix, namespace),
+ ],
+ "issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"},
+ },
+ }
+ client = self.clients[CUSTOM_OBJECT_CLIENT]
+ try:
+ client.create_namespaced_custom_object(
+ group="cert-manager.io",
+ plural="certificates",
+ version="v1",
+ body=certificate_body,
+ namespace=namespace,
+ )
+ except ApiException as e:
+ info = json.loads(e.body)
+ if info.get("reason").lower() == "alreadyexists":
+ self.logger.warning("Certificate already exists: {}".format(e))
+ else:
+ raise e
+
+ async def delete_certificate(self, namespace, object_name):
+ client = self.clients[CUSTOM_OBJECT_CLIENT]
+ try:
+ client.delete_namespaced_custom_object(
+ group="cert-manager.io",
+ plural="certificates",
+ version="v1",
+ name=object_name,
+ namespace=namespace,
+ )
+ except ApiException as e:
+ info = json.loads(e.body)
+ if info.get("reason").lower() == "notfound":
+ self.logger.warning("Certificate already deleted: {}".format(e))
+ else:
+ raise e
# See the License for the specific language governing permissions and
# limitations under the License.
+import asynctest
+import yaml
+import os
from unittest import TestCase, mock
-from n2vc.kubectl import Kubectl, CORE_CLIENT
+from n2vc.kubectl import Kubectl, CORE_CLIENT, CUSTOM_OBJECT_CLIENT
from n2vc.utils import Dict
from kubernetes.client.rest import ApiException
from kubernetes.client import (
)
mock_create_service_account.assert_called()
mock_create_secret.assert_not_called()
+
+
+@mock.patch("kubernetes.client.CustomObjectsApi.create_namespaced_custom_object")
+class CreateCertificateClass(asynctest.TestCase):
+ @mock.patch("kubernetes.config.load_kube_config")
+ def setUp(self, mock_load_kube_config):
+ super(CreateCertificateClass, self).setUp()
+ self.namespace = "osm"
+ self.name = "test-cert"
+ self.dns_prefix = "*"
+ self.secret_name = "test-cert-secret"
+ self.usages = ["server auth"]
+ self.issuer_name = "ca-issuer"
+ self.kubectl = Kubectl()
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_certificate_is_created(
+ self,
+ mock_create_certificate,
+ ):
+ with open(
+ os.path.join(
+ os.path.dirname(__file__), "testdata", "test_certificate.yaml"
+ ),
+ "r",
+ ) as test_certificate:
+ certificate_body = yaml.safe_load(test_certificate.read())
+ print(certificate_body)
+ await self.kubectl.create_certificate(
+ namespace=self.namespace,
+ name=self.name,
+ dns_prefix=self.dns_prefix,
+ secret_name=self.secret_name,
+ usages=self.usages,
+ issuer_name=self.issuer_name,
+ )
+ mock_create_certificate.assert_called_once_with(
+ group="cert-manager.io",
+ plural="certificates",
+ version="v1",
+ body=certificate_body,
+ namespace=self.namespace,
+ )
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_no_exception_if_alreadyexists(
+ self,
+ mock_create_certificate,
+ ):
+ api_exception = ApiException()
+ api_exception.body = '{"reason": "AlreadyExists"}'
+ self.kubectl.clients[
+ CUSTOM_OBJECT_CLIENT
+ ].create_namespaced_custom_object.side_effect = api_exception
+ raised = False
+ try:
+ await self.kubectl.create_certificate(
+ namespace=self.namespace,
+ name=self.name,
+ dns_prefix=self.dns_prefix,
+ secret_name=self.secret_name,
+ usages=self.usages,
+ issuer_name=self.issuer_name,
+ )
+ except Exception:
+ raised = True
+ self.assertFalse(raised, "An exception was raised")
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_other_exceptions(
+ self,
+ mock_create_certificate,
+ ):
+ self.kubectl.clients[
+ CUSTOM_OBJECT_CLIENT
+ ].create_namespaced_custom_object.side_effect = Exception()
+ with self.assertRaises(Exception):
+ await self.kubectl.create_certificate(
+ namespace=self.namespace,
+ name=self.name,
+ dns_prefix=self.dns_prefix,
+ secret_name=self.secret_name,
+ usages=self.usages,
+ issuer_name=self.issuer_name,
+ )
+
+
+@mock.patch("kubernetes.client.CustomObjectsApi.delete_namespaced_custom_object")
+class DeleteCertificateClass(asynctest.TestCase):
+ @mock.patch("kubernetes.config.load_kube_config")
+ def setUp(self, mock_load_kube_config):
+ super(DeleteCertificateClass, self).setUp()
+ self.namespace = "osm"
+ self.object_name = "test-cert"
+ self.kubectl = Kubectl()
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_no_exception_if_notfound(
+ self,
+ mock_create_certificate,
+ ):
+ api_exception = ApiException()
+ api_exception.body = '{"reason": "NotFound"}'
+ self.kubectl.clients[
+ CUSTOM_OBJECT_CLIENT
+ ].delete_namespaced_custom_object.side_effect = api_exception
+ raised = False
+ try:
+ await self.kubectl.delete_certificate(
+ namespace=self.namespace,
+ object_name=self.object_name,
+ )
+ except Exception:
+ raised = True
+ self.assertFalse(raised, "An exception was raised")
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_other_exceptions(
+ self,
+ mock_create_certificate,
+ ):
+ self.kubectl.clients[
+ CUSTOM_OBJECT_CLIENT
+ ].delete_namespaced_custom_object.side_effect = Exception()
+ with self.assertRaises(Exception):
+ await self.kubectl.delete_certificate(
+ namespace=self.namespace,
+ object_name=self.object_name,
+ )
--- /dev/null
+# Copyright 2022 Whitestack, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+apiVersion: cert-manager.io/v1
+kind: Certificate
+metadata:
+ name: test-cert
+ namespace: osm
+spec:
+ secretName: test-cert-secret
+ privateKey:
+ rotationPolicy: Always
+ algorithm: ECDSA
+ size: 256
+ duration: 8760h
+ renewBefore: 2208h
+ subject:
+ organizations:
+ - osm
+ commonName: osm
+ isCA: false
+ usages:
+ - server auth
+ dnsNames:
+ - "*.osm"
+ - "*.osm.svc"
+ - "*.osm.svc.cluster"
+ - "*.osm.svc.cluster.local"
+ issuerRef:
+ name: ca-issuer
+ kind: ClusterIssuer