Pin black version in tox.ini to 23.12.1
[osm/N2VC.git] / n2vc / tests / unit / test_kubectl.py
index e67168e..a6d02ff 100644 (file)
 #     See the License for the specific language governing permissions and
 #     limitations under the License.
 
+import asynctest
+import yaml
+import os
 from unittest import TestCase, mock
-from n2vc.kubectl import Kubectl, CORE_CLIENT
+from n2vc.kubectl import Kubectl, CORE_CLIENT, CUSTOM_OBJECT_CLIENT
 from n2vc.utils import Dict
 from kubernetes.client.rest import ApiException
 from kubernetes.client import (
@@ -21,6 +24,12 @@ from kubernetes.client import (
     V1Secret,
     V1ServiceAccount,
     V1SecretReference,
+    V1Role,
+    V1RoleBinding,
+    V1RoleRef,
+    V1Subject,
+    V1PolicyRule,
+    V1Namespace,
 )
 
 
@@ -90,6 +99,24 @@ class FakeK8sSecretList:
         return self._items
 
 
+class FakeK8sRoleList:
+    def __init__(self, items=[]):
+        self._items = items
+
+    @property
+    def items(self):
+        return self._items
+
+
+class FakeK8sRoleBindingList:
+    def __init__(self, items=[]):
+        self._items = items
+
+    @property
+    def items(self):
+        return self._items
+
+
 class FakeK8sVersionApiCode:
     def __init__(self, major: str, minor: str):
         self._major = major
@@ -423,3 +450,405 @@ class CreateServiceAccountClass(KubectlTestCase):
         )
         mock_create_service_account.assert_called()
         mock_create_secret.assert_not_called()
+
+
+@mock.patch("kubernetes.client.CustomObjectsApi.create_namespaced_custom_object")
+class CreateCertificateClass(asynctest.TestCase):
+    @mock.patch("kubernetes.config.load_kube_config")
+    def setUp(self, mock_load_kube_config):
+        super(CreateCertificateClass, self).setUp()
+        self.namespace = "osm"
+        self.name = "test-cert"
+        self.dns_prefix = "*"
+        self.secret_name = "test-cert-secret"
+        self.usages = ["server auth"]
+        self.issuer_name = "ca-issuer"
+        self.kubectl = Kubectl()
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_certificate_is_created(
+        self,
+        mock_create_certificate,
+    ):
+        with open(
+            os.path.join(
+                os.path.dirname(__file__), "testdata", "test_certificate.yaml"
+            ),
+            "r",
+        ) as test_certificate:
+            certificate_body = yaml.safe_load(test_certificate.read())
+            print(certificate_body)
+        await self.kubectl.create_certificate(
+            namespace=self.namespace,
+            name=self.name,
+            dns_prefix=self.dns_prefix,
+            secret_name=self.secret_name,
+            usages=self.usages,
+            issuer_name=self.issuer_name,
+        )
+        mock_create_certificate.assert_called_once_with(
+            group="cert-manager.io",
+            plural="certificates",
+            version="v1",
+            body=certificate_body,
+            namespace=self.namespace,
+        )
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_no_exception_if_alreadyexists(
+        self,
+        mock_create_certificate,
+    ):
+        api_exception = ApiException()
+        api_exception.body = '{"reason": "AlreadyExists"}'
+        self.kubectl.clients[
+            CUSTOM_OBJECT_CLIENT
+        ].create_namespaced_custom_object.side_effect = api_exception
+        raised = False
+        try:
+            await self.kubectl.create_certificate(
+                namespace=self.namespace,
+                name=self.name,
+                dns_prefix=self.dns_prefix,
+                secret_name=self.secret_name,
+                usages=self.usages,
+                issuer_name=self.issuer_name,
+            )
+        except Exception:
+            raised = True
+        self.assertFalse(raised, "An exception was raised")
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_other_exceptions(
+        self,
+        mock_create_certificate,
+    ):
+        self.kubectl.clients[
+            CUSTOM_OBJECT_CLIENT
+        ].create_namespaced_custom_object.side_effect = Exception()
+        with self.assertRaises(Exception):
+            await self.kubectl.create_certificate(
+                namespace=self.namespace,
+                name=self.name,
+                dns_prefix=self.dns_prefix,
+                secret_name=self.secret_name,
+                usages=self.usages,
+                issuer_name=self.issuer_name,
+            )
+
+
+@mock.patch("kubernetes.client.CustomObjectsApi.delete_namespaced_custom_object")
+class DeleteCertificateClass(asynctest.TestCase):
+    @mock.patch("kubernetes.config.load_kube_config")
+    def setUp(self, mock_load_kube_config):
+        super(DeleteCertificateClass, self).setUp()
+        self.namespace = "osm"
+        self.object_name = "test-cert"
+        self.kubectl = Kubectl()
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_no_exception_if_notfound(
+        self,
+        mock_create_certificate,
+    ):
+        api_exception = ApiException()
+        api_exception.body = '{"reason": "NotFound"}'
+        self.kubectl.clients[
+            CUSTOM_OBJECT_CLIENT
+        ].delete_namespaced_custom_object.side_effect = api_exception
+        raised = False
+        try:
+            await self.kubectl.delete_certificate(
+                namespace=self.namespace,
+                object_name=self.object_name,
+            )
+        except Exception:
+            raised = True
+        self.assertFalse(raised, "An exception was raised")
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_other_exceptions(
+        self,
+        mock_create_certificate,
+    ):
+        self.kubectl.clients[
+            CUSTOM_OBJECT_CLIENT
+        ].delete_namespaced_custom_object.side_effect = Exception()
+        with self.assertRaises(Exception):
+            await self.kubectl.delete_certificate(
+                namespace=self.namespace,
+                object_name=self.object_name,
+            )
+
+
+@mock.patch("kubernetes.client.RbacAuthorizationV1Api.create_namespaced_role")
+@mock.patch("kubernetes.client.RbacAuthorizationV1Api.list_namespaced_role")
+class CreateRoleClass(asynctest.TestCase):
+    @mock.patch("kubernetes.config.load_kube_config")
+    def setUp(self, mock_load_kube_config):
+        super(CreateRoleClass, self).setUp()
+        self.name = "role"
+        self.namespace = "osm"
+        self.resources = ["*"]
+        self.api_groups = ["*"]
+        self.verbs = ["*"]
+        self.labels = {}
+        self.kubectl = Kubectl()
+
+    @asynctest.fail_on(active_handles=True)
+    async def assert_create_role(self, mock_create_role):
+        metadata = V1ObjectMeta(
+            name=self.name, labels=self.labels, namespace=self.namespace
+        )
+        role = V1Role(
+            metadata=metadata,
+            rules=[
+                V1PolicyRule(
+                    api_groups=self.api_groups,
+                    resources=self.resources,
+                    verbs=self.verbs,
+                ),
+            ],
+        )
+        await self.kubectl.create_role(
+            namespace=self.namespace,
+            api_groups=self.api_groups,
+            name=self.name,
+            resources=self.resources,
+            verbs=self.verbs,
+            labels=self.labels,
+        )
+        mock_create_role.assert_called_once_with(self.namespace, role)
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_raise_exception_if_role_already_exists(
+        self,
+        mock_list_role,
+        mock_create_role,
+    ):
+        mock_list_role.return_value = FakeK8sRoleList(items=[1])
+        with self.assertRaises(Exception) as context:
+            await self.kubectl.create_role(
+                self.name,
+                self.labels,
+                self.api_groups,
+                self.resources,
+                self.verbs,
+                self.namespace,
+            )
+        self.assertTrue(
+            "Role with metadata.name={} already exists".format(self.name)
+            in str(context.exception)
+        )
+        mock_create_role.assert_not_called()
+
+
+@mock.patch("kubernetes.client.RbacAuthorizationV1Api.create_namespaced_role_binding")
+@mock.patch("kubernetes.client.RbacAuthorizationV1Api.list_namespaced_role_binding")
+class CreateRoleBindingClass(asynctest.TestCase):
+    @mock.patch("kubernetes.config.load_kube_config")
+    def setUp(self, mock_load_kube_config):
+        super(CreateRoleBindingClass, self).setUp()
+        self.name = "rolebinding"
+        self.namespace = "osm"
+        self.role_name = "role"
+        self.sa_name = "Default"
+        self.labels = {}
+        self.kubectl = Kubectl()
+
+    @asynctest.fail_on(active_handles=True)
+    async def assert_create_role_binding(self, mock_create_role_binding):
+        role_binding = V1RoleBinding(
+            metadata=V1ObjectMeta(name=self.name, labels=self.labels),
+            role_ref=V1RoleRef(kind="Role", name=self.role_name, api_group=""),
+            subjects=[
+                V1Subject(
+                    kind="ServiceAccount",
+                    name=self.sa_name,
+                    namespace=self.namespace,
+                )
+            ],
+        )
+        await self.kubectl.create_role_binding(
+            namespace=self.namespace,
+            role_name=self.role_name,
+            name=self.name,
+            sa_name=self.sa_name,
+            labels=self.labels,
+        )
+        mock_create_role_binding.assert_called_once_with(self.namespace, role_binding)
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_raise_exception_if_role_binding_already_exists(
+        self,
+        mock_list_role_binding,
+        mock_create_role_binding,
+    ):
+        mock_list_role_binding.return_value = FakeK8sRoleBindingList(items=[1])
+        with self.assertRaises(Exception) as context:
+            await self.kubectl.create_role_binding(
+                self.name,
+                self.role_name,
+                self.sa_name,
+                self.labels,
+                self.namespace,
+            )
+        self.assertTrue(
+            "Role Binding with metadata.name={} already exists".format(self.name)
+            in str(context.exception)
+        )
+        mock_create_role_binding.assert_not_called()
+
+
+@mock.patch("kubernetes.client.CoreV1Api.create_namespaced_secret")
+class CreateSecretClass(asynctest.TestCase):
+    @mock.patch("kubernetes.config.load_kube_config")
+    def setUp(self, mock_load_kube_config):
+        super(CreateSecretClass, self).setUp()
+        self.name = "secret"
+        self.namespace = "osm"
+        self.data = {"test": "1234"}
+        self.secret_type = "Opaque"
+        self.kubectl = Kubectl()
+
+    @asynctest.fail_on(active_handles=True)
+    async def assert_create_secret(self, mock_create_secret):
+        secret_metadata = V1ObjectMeta(name=self.name, namespace=self.namespace)
+        secret = V1Secret(
+            metadata=secret_metadata,
+            data=self.data,
+            type=self.secret_type,
+        )
+        await self.kubectl.create_secret(
+            namespace=self.namespace,
+            data=self.data,
+            name=self.name,
+            secret_type=self.secret_type,
+        )
+        mock_create_secret.assert_called_once_with(self.namespace, secret)
+
+
+@mock.patch("kubernetes.client.CoreV1Api.create_namespace")
+class CreateNamespaceClass(asynctest.TestCase):
+    @mock.patch("kubernetes.config.load_kube_config")
+    def setUp(self, mock_load_kube_config):
+        super(CreateNamespaceClass, self).setUp()
+        self.namespace = "osm"
+        self.labels = {"key": "value"}
+        self.kubectl = Kubectl()
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_namespace_is_created(
+        self,
+        mock_create_namespace,
+    ):
+        metadata = V1ObjectMeta(name=self.namespace, labels=self.labels)
+        namespace = V1Namespace(
+            metadata=metadata,
+        )
+        await self.kubectl.create_namespace(
+            name=self.namespace,
+            labels=self.labels,
+        )
+        mock_create_namespace.assert_called_once_with(namespace)
+
+    async def test_namespace_is_created_default_labels(
+        self,
+        mock_create_namespace,
+    ):
+        metadata = V1ObjectMeta(name=self.namespace, labels=None)
+        namespace = V1Namespace(
+            metadata=metadata,
+        )
+        await self.kubectl.create_namespace(
+            name=self.namespace,
+        )
+        mock_create_namespace.assert_called_once_with(namespace)
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_no_exception_if_alreadyexists(
+        self,
+        mock_create_namespace,
+    ):
+        api_exception = ApiException()
+        api_exception.body = '{"reason": "AlreadyExists"}'
+        self.kubectl.clients[CORE_CLIENT].create_namespace.side_effect = api_exception
+        raised = False
+        try:
+            await self.kubectl.create_namespace(
+                name=self.namespace,
+            )
+        except Exception:
+            raised = True
+        self.assertFalse(raised, "An exception was raised")
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_other_exceptions(
+        self,
+        mock_create_namespace,
+    ):
+        self.kubectl.clients[CORE_CLIENT].create_namespace.side_effect = Exception()
+        with self.assertRaises(Exception):
+            await self.kubectl.create_namespace(
+                name=self.namespace,
+            )
+
+
+@mock.patch("kubernetes.client.CoreV1Api.delete_namespace")
+class DeleteNamespaceClass(asynctest.TestCase):
+    @mock.patch("kubernetes.config.load_kube_config")
+    def setUp(self, mock_load_kube_config):
+        super(DeleteNamespaceClass, self).setUp()
+        self.namespace = "osm"
+        self.kubectl = Kubectl()
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_no_exception_if_notfound(
+        self,
+        mock_delete_namespace,
+    ):
+        api_exception = ApiException()
+        api_exception.body = '{"reason": "NotFound"}'
+        self.kubectl.clients[CORE_CLIENT].delete_namespace.side_effect = api_exception
+        raised = False
+        try:
+            await self.kubectl.delete_namespace(
+                name=self.namespace,
+            )
+        except Exception:
+            raised = True
+        self.assertFalse(raised, "An exception was raised")
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_other_exceptions(
+        self,
+        mock_delete_namespace,
+    ):
+        self.kubectl.clients[CORE_CLIENT].delete_namespace.side_effect = Exception()
+        with self.assertRaises(Exception):
+            await self.kubectl.delete_namespace(
+                name=self.namespace,
+            )
+
+
+@mock.patch("kubernetes.client.CoreV1Api.read_namespaced_secret")
+class GetSecretContentClass(asynctest.TestCase):
+    @mock.patch("kubernetes.config.load_kube_config")
+    def setUp(self, mock_load_kube_config):
+        super(GetSecretContentClass, self).setUp()
+        self.name = "my_secret"
+        self.namespace = "osm"
+        self.data = {"my_key": "my_value"}
+        self.type = "Opaque"
+        self.kubectl = Kubectl()
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_return_type_is_dict(
+        self,
+        mock_read_namespaced_secret,
+    ):
+        metadata = V1ObjectMeta(name=self.name, namespace=self.namespace)
+        secret = V1Secret(metadata=metadata, data=self.data, type=self.type)
+        mock_read_namespaced_secret.return_value = secret
+        content = await self.kubectl.get_secret_content(self.name, self.namespace)
+        assert type(content) is dict