X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;ds=sidebyside;f=n2vc%2Ftests%2Funit%2Ftest_kubectl.py;h=a6d02ffbdc208c90eb64c07a57e05e27625a55d5;hb=21852a087bec9102e8ba762d7298e46bb8452e0e;hp=e67168e7157ec8b0f8275be41e8eec4a9acb29dd;hpb=6343d434fa3cec28d8b9b470054d3a13ada8865a;p=osm%2FN2VC.git diff --git a/n2vc/tests/unit/test_kubectl.py b/n2vc/tests/unit/test_kubectl.py index e67168e..a6d02ff 100644 --- a/n2vc/tests/unit/test_kubectl.py +++ b/n2vc/tests/unit/test_kubectl.py @@ -12,8 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asynctest +import yaml +import os from unittest import TestCase, mock -from n2vc.kubectl import Kubectl, CORE_CLIENT +from n2vc.kubectl import Kubectl, CORE_CLIENT, CUSTOM_OBJECT_CLIENT from n2vc.utils import Dict from kubernetes.client.rest import ApiException from kubernetes.client import ( @@ -21,6 +24,12 @@ from kubernetes.client import ( V1Secret, V1ServiceAccount, V1SecretReference, + V1Role, + V1RoleBinding, + V1RoleRef, + V1Subject, + V1PolicyRule, + V1Namespace, ) @@ -90,6 +99,24 @@ class FakeK8sSecretList: return self._items +class FakeK8sRoleList: + def __init__(self, items=[]): + self._items = items + + @property + def items(self): + return self._items + + +class FakeK8sRoleBindingList: + def __init__(self, items=[]): + self._items = items + + @property + def items(self): + return self._items + + class FakeK8sVersionApiCode: def __init__(self, major: str, minor: str): self._major = major @@ -423,3 +450,405 @@ class CreateServiceAccountClass(KubectlTestCase): ) mock_create_service_account.assert_called() mock_create_secret.assert_not_called() + + +@mock.patch("kubernetes.client.CustomObjectsApi.create_namespaced_custom_object") +class CreateCertificateClass(asynctest.TestCase): + @mock.patch("kubernetes.config.load_kube_config") + def setUp(self, mock_load_kube_config): + super(CreateCertificateClass, self).setUp() + self.namespace = "osm" + self.name = "test-cert" + self.dns_prefix = "*" + self.secret_name = "test-cert-secret" + self.usages = ["server auth"] + self.issuer_name = "ca-issuer" + self.kubectl = Kubectl() + + @asynctest.fail_on(active_handles=True) + async def test_certificate_is_created( + self, + mock_create_certificate, + ): + with open( + os.path.join( + os.path.dirname(__file__), "testdata", "test_certificate.yaml" + ), + "r", + ) as test_certificate: + certificate_body = yaml.safe_load(test_certificate.read()) + print(certificate_body) + await self.kubectl.create_certificate( + namespace=self.namespace, + name=self.name, + dns_prefix=self.dns_prefix, + secret_name=self.secret_name, + usages=self.usages, + issuer_name=self.issuer_name, + ) + mock_create_certificate.assert_called_once_with( + group="cert-manager.io", + plural="certificates", + version="v1", + body=certificate_body, + namespace=self.namespace, + ) + + @asynctest.fail_on(active_handles=True) + async def test_no_exception_if_alreadyexists( + self, + mock_create_certificate, + ): + api_exception = ApiException() + api_exception.body = '{"reason": "AlreadyExists"}' + self.kubectl.clients[ + CUSTOM_OBJECT_CLIENT + ].create_namespaced_custom_object.side_effect = api_exception + raised = False + try: + await self.kubectl.create_certificate( + namespace=self.namespace, + name=self.name, + dns_prefix=self.dns_prefix, + secret_name=self.secret_name, + usages=self.usages, + issuer_name=self.issuer_name, + ) + except Exception: + raised = True + self.assertFalse(raised, "An exception was raised") + + @asynctest.fail_on(active_handles=True) + async def test_other_exceptions( + self, + mock_create_certificate, + ): + self.kubectl.clients[ + CUSTOM_OBJECT_CLIENT + ].create_namespaced_custom_object.side_effect = Exception() + with self.assertRaises(Exception): + await self.kubectl.create_certificate( + namespace=self.namespace, + name=self.name, + dns_prefix=self.dns_prefix, + secret_name=self.secret_name, + usages=self.usages, + issuer_name=self.issuer_name, + ) + + +@mock.patch("kubernetes.client.CustomObjectsApi.delete_namespaced_custom_object") +class DeleteCertificateClass(asynctest.TestCase): + @mock.patch("kubernetes.config.load_kube_config") + def setUp(self, mock_load_kube_config): + super(DeleteCertificateClass, self).setUp() + self.namespace = "osm" + self.object_name = "test-cert" + self.kubectl = Kubectl() + + @asynctest.fail_on(active_handles=True) + async def test_no_exception_if_notfound( + self, + mock_create_certificate, + ): + api_exception = ApiException() + api_exception.body = '{"reason": "NotFound"}' + self.kubectl.clients[ + CUSTOM_OBJECT_CLIENT + ].delete_namespaced_custom_object.side_effect = api_exception + raised = False + try: + await self.kubectl.delete_certificate( + namespace=self.namespace, + object_name=self.object_name, + ) + except Exception: + raised = True + self.assertFalse(raised, "An exception was raised") + + @asynctest.fail_on(active_handles=True) + async def test_other_exceptions( + self, + mock_create_certificate, + ): + self.kubectl.clients[ + CUSTOM_OBJECT_CLIENT + ].delete_namespaced_custom_object.side_effect = Exception() + with self.assertRaises(Exception): + await self.kubectl.delete_certificate( + namespace=self.namespace, + object_name=self.object_name, + ) + + +@mock.patch("kubernetes.client.RbacAuthorizationV1Api.create_namespaced_role") +@mock.patch("kubernetes.client.RbacAuthorizationV1Api.list_namespaced_role") +class CreateRoleClass(asynctest.TestCase): + @mock.patch("kubernetes.config.load_kube_config") + def setUp(self, mock_load_kube_config): + super(CreateRoleClass, self).setUp() + self.name = "role" + self.namespace = "osm" + self.resources = ["*"] + self.api_groups = ["*"] + self.verbs = ["*"] + self.labels = {} + self.kubectl = Kubectl() + + @asynctest.fail_on(active_handles=True) + async def assert_create_role(self, mock_create_role): + metadata = V1ObjectMeta( + name=self.name, labels=self.labels, namespace=self.namespace + ) + role = V1Role( + metadata=metadata, + rules=[ + V1PolicyRule( + api_groups=self.api_groups, + resources=self.resources, + verbs=self.verbs, + ), + ], + ) + await self.kubectl.create_role( + namespace=self.namespace, + api_groups=self.api_groups, + name=self.name, + resources=self.resources, + verbs=self.verbs, + labels=self.labels, + ) + mock_create_role.assert_called_once_with(self.namespace, role) + + @asynctest.fail_on(active_handles=True) + async def test_raise_exception_if_role_already_exists( + self, + mock_list_role, + mock_create_role, + ): + mock_list_role.return_value = FakeK8sRoleList(items=[1]) + with self.assertRaises(Exception) as context: + await self.kubectl.create_role( + self.name, + self.labels, + self.api_groups, + self.resources, + self.verbs, + self.namespace, + ) + self.assertTrue( + "Role with metadata.name={} already exists".format(self.name) + in str(context.exception) + ) + mock_create_role.assert_not_called() + + +@mock.patch("kubernetes.client.RbacAuthorizationV1Api.create_namespaced_role_binding") +@mock.patch("kubernetes.client.RbacAuthorizationV1Api.list_namespaced_role_binding") +class CreateRoleBindingClass(asynctest.TestCase): + @mock.patch("kubernetes.config.load_kube_config") + def setUp(self, mock_load_kube_config): + super(CreateRoleBindingClass, self).setUp() + self.name = "rolebinding" + self.namespace = "osm" + self.role_name = "role" + self.sa_name = "Default" + self.labels = {} + self.kubectl = Kubectl() + + @asynctest.fail_on(active_handles=True) + async def assert_create_role_binding(self, mock_create_role_binding): + role_binding = V1RoleBinding( + metadata=V1ObjectMeta(name=self.name, labels=self.labels), + role_ref=V1RoleRef(kind="Role", name=self.role_name, api_group=""), + subjects=[ + V1Subject( + kind="ServiceAccount", + name=self.sa_name, + namespace=self.namespace, + ) + ], + ) + await self.kubectl.create_role_binding( + namespace=self.namespace, + role_name=self.role_name, + name=self.name, + sa_name=self.sa_name, + labels=self.labels, + ) + mock_create_role_binding.assert_called_once_with(self.namespace, role_binding) + + @asynctest.fail_on(active_handles=True) + async def test_raise_exception_if_role_binding_already_exists( + self, + mock_list_role_binding, + mock_create_role_binding, + ): + mock_list_role_binding.return_value = FakeK8sRoleBindingList(items=[1]) + with self.assertRaises(Exception) as context: + await self.kubectl.create_role_binding( + self.name, + self.role_name, + self.sa_name, + self.labels, + self.namespace, + ) + self.assertTrue( + "Role Binding with metadata.name={} already exists".format(self.name) + in str(context.exception) + ) + mock_create_role_binding.assert_not_called() + + +@mock.patch("kubernetes.client.CoreV1Api.create_namespaced_secret") +class CreateSecretClass(asynctest.TestCase): + @mock.patch("kubernetes.config.load_kube_config") + def setUp(self, mock_load_kube_config): + super(CreateSecretClass, self).setUp() + self.name = "secret" + self.namespace = "osm" + self.data = {"test": "1234"} + self.secret_type = "Opaque" + self.kubectl = Kubectl() + + @asynctest.fail_on(active_handles=True) + async def assert_create_secret(self, mock_create_secret): + secret_metadata = V1ObjectMeta(name=self.name, namespace=self.namespace) + secret = V1Secret( + metadata=secret_metadata, + data=self.data, + type=self.secret_type, + ) + await self.kubectl.create_secret( + namespace=self.namespace, + data=self.data, + name=self.name, + secret_type=self.secret_type, + ) + mock_create_secret.assert_called_once_with(self.namespace, secret) + + +@mock.patch("kubernetes.client.CoreV1Api.create_namespace") +class CreateNamespaceClass(asynctest.TestCase): + @mock.patch("kubernetes.config.load_kube_config") + def setUp(self, mock_load_kube_config): + super(CreateNamespaceClass, self).setUp() + self.namespace = "osm" + self.labels = {"key": "value"} + self.kubectl = Kubectl() + + @asynctest.fail_on(active_handles=True) + async def test_namespace_is_created( + self, + mock_create_namespace, + ): + metadata = V1ObjectMeta(name=self.namespace, labels=self.labels) + namespace = V1Namespace( + metadata=metadata, + ) + await self.kubectl.create_namespace( + name=self.namespace, + labels=self.labels, + ) + mock_create_namespace.assert_called_once_with(namespace) + + async def test_namespace_is_created_default_labels( + self, + mock_create_namespace, + ): + metadata = V1ObjectMeta(name=self.namespace, labels=None) + namespace = V1Namespace( + metadata=metadata, + ) + await self.kubectl.create_namespace( + name=self.namespace, + ) + mock_create_namespace.assert_called_once_with(namespace) + + @asynctest.fail_on(active_handles=True) + async def test_no_exception_if_alreadyexists( + self, + mock_create_namespace, + ): + api_exception = ApiException() + api_exception.body = '{"reason": "AlreadyExists"}' + self.kubectl.clients[CORE_CLIENT].create_namespace.side_effect = api_exception + raised = False + try: + await self.kubectl.create_namespace( + name=self.namespace, + ) + except Exception: + raised = True + self.assertFalse(raised, "An exception was raised") + + @asynctest.fail_on(active_handles=True) + async def test_other_exceptions( + self, + mock_create_namespace, + ): + self.kubectl.clients[CORE_CLIENT].create_namespace.side_effect = Exception() + with self.assertRaises(Exception): + await self.kubectl.create_namespace( + name=self.namespace, + ) + + +@mock.patch("kubernetes.client.CoreV1Api.delete_namespace") +class DeleteNamespaceClass(asynctest.TestCase): + @mock.patch("kubernetes.config.load_kube_config") + def setUp(self, mock_load_kube_config): + super(DeleteNamespaceClass, self).setUp() + self.namespace = "osm" + self.kubectl = Kubectl() + + @asynctest.fail_on(active_handles=True) + async def test_no_exception_if_notfound( + self, + mock_delete_namespace, + ): + api_exception = ApiException() + api_exception.body = '{"reason": "NotFound"}' + self.kubectl.clients[CORE_CLIENT].delete_namespace.side_effect = api_exception + raised = False + try: + await self.kubectl.delete_namespace( + name=self.namespace, + ) + except Exception: + raised = True + self.assertFalse(raised, "An exception was raised") + + @asynctest.fail_on(active_handles=True) + async def test_other_exceptions( + self, + mock_delete_namespace, + ): + self.kubectl.clients[CORE_CLIENT].delete_namespace.side_effect = Exception() + with self.assertRaises(Exception): + await self.kubectl.delete_namespace( + name=self.namespace, + ) + + +@mock.patch("kubernetes.client.CoreV1Api.read_namespaced_secret") +class GetSecretContentClass(asynctest.TestCase): + @mock.patch("kubernetes.config.load_kube_config") + def setUp(self, mock_load_kube_config): + super(GetSecretContentClass, self).setUp() + self.name = "my_secret" + self.namespace = "osm" + self.data = {"my_key": "my_value"} + self.type = "Opaque" + self.kubectl = Kubectl() + + @asynctest.fail_on(active_handles=True) + async def test_return_type_is_dict( + self, + mock_read_namespaced_secret, + ): + metadata = V1ObjectMeta(name=self.name, namespace=self.namespace) + secret = V1Secret(metadata=metadata, data=self.data, type=self.type) + mock_read_namespaced_secret.return_value = secret + content = await self.kubectl.get_secret_content(self.name, self.namespace) + assert type(content) is dict