# See the License for the specific language governing permissions and
# limitations under the License.
+import asynctest
+import yaml
+import os
from unittest import TestCase, mock
-from n2vc.kubectl import Kubectl, CORE_CLIENT
+from n2vc.kubectl import Kubectl, CORE_CLIENT, CUSTOM_OBJECT_CLIENT
from n2vc.utils import Dict
from kubernetes.client.rest import ApiException
from kubernetes.client import (
)
mock_create_service_account.assert_called()
mock_create_secret.assert_not_called()
+
+
+@mock.patch("kubernetes.client.CustomObjectsApi.create_namespaced_custom_object")
+class CreateCertificateClass(asynctest.TestCase):
+ @mock.patch("kubernetes.config.load_kube_config")
+ def setUp(self, mock_load_kube_config):
+ super(CreateCertificateClass, self).setUp()
+ self.namespace = "osm"
+ self.name = "test-cert"
+ self.dns_prefix = "*"
+ self.secret_name = "test-cert-secret"
+ self.usages = ["server auth"]
+ self.issuer_name = "ca-issuer"
+ self.kubectl = Kubectl()
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_certificate_is_created(
+ self,
+ mock_create_certificate,
+ ):
+ with open(
+ os.path.join(
+ os.path.dirname(__file__), "testdata", "test_certificate.yaml"
+ ),
+ "r",
+ ) as test_certificate:
+ certificate_body = yaml.safe_load(test_certificate.read())
+ print(certificate_body)
+ await self.kubectl.create_certificate(
+ namespace=self.namespace,
+ name=self.name,
+ dns_prefix=self.dns_prefix,
+ secret_name=self.secret_name,
+ usages=self.usages,
+ issuer_name=self.issuer_name,
+ )
+ mock_create_certificate.assert_called_once_with(
+ group="cert-manager.io",
+ plural="certificates",
+ version="v1",
+ body=certificate_body,
+ namespace=self.namespace,
+ )
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_no_exception_if_alreadyexists(
+ self,
+ mock_create_certificate,
+ ):
+ api_exception = ApiException()
+ api_exception.body = '{"reason": "AlreadyExists"}'
+ self.kubectl.clients[
+ CUSTOM_OBJECT_CLIENT
+ ].create_namespaced_custom_object.side_effect = api_exception
+ raised = False
+ try:
+ await self.kubectl.create_certificate(
+ namespace=self.namespace,
+ name=self.name,
+ dns_prefix=self.dns_prefix,
+ secret_name=self.secret_name,
+ usages=self.usages,
+ issuer_name=self.issuer_name,
+ )
+ except Exception:
+ raised = True
+ self.assertFalse(raised, "An exception was raised")
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_other_exceptions(
+ self,
+ mock_create_certificate,
+ ):
+ self.kubectl.clients[
+ CUSTOM_OBJECT_CLIENT
+ ].create_namespaced_custom_object.side_effect = Exception()
+ with self.assertRaises(Exception):
+ await self.kubectl.create_certificate(
+ namespace=self.namespace,
+ name=self.name,
+ dns_prefix=self.dns_prefix,
+ secret_name=self.secret_name,
+ usages=self.usages,
+ issuer_name=self.issuer_name,
+ )
+
+
+@mock.patch("kubernetes.client.CustomObjectsApi.delete_namespaced_custom_object")
+class DeleteCertificateClass(asynctest.TestCase):
+ @mock.patch("kubernetes.config.load_kube_config")
+ def setUp(self, mock_load_kube_config):
+ super(DeleteCertificateClass, self).setUp()
+ self.namespace = "osm"
+ self.object_name = "test-cert"
+ self.kubectl = Kubectl()
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_no_exception_if_notfound(
+ self,
+ mock_create_certificate,
+ ):
+ api_exception = ApiException()
+ api_exception.body = '{"reason": "NotFound"}'
+ self.kubectl.clients[
+ CUSTOM_OBJECT_CLIENT
+ ].delete_namespaced_custom_object.side_effect = api_exception
+ raised = False
+ try:
+ await self.kubectl.delete_certificate(
+ namespace=self.namespace,
+ object_name=self.object_name,
+ )
+ except Exception:
+ raised = True
+ self.assertFalse(raised, "An exception was raised")
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_other_exceptions(
+ self,
+ mock_create_certificate,
+ ):
+ self.kubectl.clients[
+ CUSTOM_OBJECT_CLIENT
+ ].delete_namespaced_custom_object.side_effect = Exception()
+ with self.assertRaises(Exception):
+ await self.kubectl.delete_certificate(
+ namespace=self.namespace,
+ object_name=self.object_name,
+ )