X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=n2vc%2Ftests%2Funit%2Ftest_kubectl.py;h=6ba074f2cf427104cf08bf3915f688514835d6c2;hb=refs%2Fchanges%2F75%2F13275%2F4;hp=d970bf0267fb80c30e1aa8cb8b623b40ae9887ce;hpb=fb03e9084403d6fc2adf427a371ff9827f3c1238;p=osm%2FN2VC.git diff --git a/n2vc/tests/unit/test_kubectl.py b/n2vc/tests/unit/test_kubectl.py index d970bf0..6ba074f 100644 --- a/n2vc/tests/unit/test_kubectl.py +++ b/n2vc/tests/unit/test_kubectl.py @@ -24,6 +24,12 @@ from kubernetes.client import ( V1Secret, V1ServiceAccount, V1SecretReference, + V1Role, + V1RoleBinding, + V1RoleRef, + V1Subject, + V1PolicyRule, + V1Namespace, ) @@ -93,6 +99,24 @@ class FakeK8sSecretList: return self._items +class FakeK8sRoleList: + def __init__(self, items=[]): + self._items = items + + @property + def items(self): + return self._items + + +class FakeK8sRoleBindingList: + def __init__(self, items=[]): + self._items = items + + @property + def items(self): + return self._items + + class FakeK8sVersionApiCode: def __init__(self, major: str, minor: str): self._major = major @@ -555,3 +579,261 @@ class DeleteCertificateClass(asynctest.TestCase): namespace=self.namespace, object_name=self.object_name, ) + + +@mock.patch("kubernetes.client.RbacAuthorizationV1Api.create_namespaced_role") +@mock.patch("kubernetes.client.RbacAuthorizationV1Api.list_namespaced_role") +class CreateRoleClass(asynctest.TestCase): + @mock.patch("kubernetes.config.load_kube_config") + def setUp(self, mock_load_kube_config): + super(CreateRoleClass, self).setUp() + self.name = "role" + self.namespace = "osm" + self.resources = ["*"] + self.api_groups = ["*"] + self.verbs = ["*"] + self.labels = {} + self.kubectl = Kubectl() + + @asynctest.fail_on(active_handles=True) + async def assert_create_role(self, mock_create_role): + metadata = V1ObjectMeta( + name=self.name, labels=self.labels, namespace=self.namespace + ) + role = V1Role( + metadata=metadata, + rules=[ + V1PolicyRule( + api_groups=self.api_groups, + resources=self.resources, + verbs=self.verbs, + ), + ], + ) + await self.kubectl.create_role( + namespace=self.namespace, + api_groups=self.api_groups, + name=self.name, + resources=self.resources, + verbs=self.verbs, + labels=self.labels, + ) + mock_create_role.assert_called_once_with(self.namespace, role) + + @asynctest.fail_on(active_handles=True) + async def test_raise_exception_if_role_already_exists( + self, + mock_list_role, + mock_create_role, + ): + mock_list_role.return_value = FakeK8sRoleList(items=[1]) + with self.assertRaises(Exception) as context: + await self.kubectl.create_role( + self.name, + self.labels, + self.api_groups, + self.resources, + self.verbs, + self.namespace, + ) + self.assertTrue( + "Role with metadata.name={} already exists".format(self.name) + in str(context.exception) + ) + mock_create_role.assert_not_called() + + +@mock.patch("kubernetes.client.RbacAuthorizationV1Api.create_namespaced_role_binding") +@mock.patch("kubernetes.client.RbacAuthorizationV1Api.list_namespaced_role_binding") +class CreateRoleBindingClass(asynctest.TestCase): + @mock.patch("kubernetes.config.load_kube_config") + def setUp(self, mock_load_kube_config): + super(CreateRoleBindingClass, self).setUp() + self.name = "rolebinding" + self.namespace = "osm" + self.role_name = "role" + self.sa_name = "Default" + self.labels = {} + self.kubectl = Kubectl() + + @asynctest.fail_on(active_handles=True) + async def assert_create_role_binding(self, mock_create_role_binding): + role_binding = V1RoleBinding( + metadata=V1ObjectMeta(name=self.name, labels=self.labels), + role_ref=V1RoleRef(kind="Role", name=self.role_name, api_group=""), + subjects=[ + V1Subject( + kind="ServiceAccount", + name=self.sa_name, + namespace=self.namespace, + ) + ], + ) + await self.kubectl.create_role_binding( + namespace=self.namespace, + role_name=self.role_name, + name=self.name, + sa_name=self.sa_name, + labels=self.labels, + ) + mock_create_role_binding.assert_called_once_with(self.namespace, role_binding) + + @asynctest.fail_on(active_handles=True) + async def test_raise_exception_if_role_binding_already_exists( + self, + mock_list_role_binding, + mock_create_role_binding, + ): + mock_list_role_binding.return_value = FakeK8sRoleBindingList(items=[1]) + with self.assertRaises(Exception) as context: + await self.kubectl.create_role_binding( + self.name, + self.role_name, + self.sa_name, + self.labels, + self.namespace, + ) + self.assertTrue( + "Role Binding with metadata.name={} already exists".format(self.name) + in str(context.exception) + ) + mock_create_role_binding.assert_not_called() + + +@mock.patch("kubernetes.client.CoreV1Api.create_namespaced_secret") +class CreateSecretClass(asynctest.TestCase): + @mock.patch("kubernetes.config.load_kube_config") + def setUp(self, mock_load_kube_config): + super(CreateSecretClass, self).setUp() + self.name = "secret" + self.namespace = "osm" + self.data = {"test": "1234"} + self.secret_type = "Opaque" + self.kubectl = Kubectl() + + @asynctest.fail_on(active_handles=True) + async def assert_create_secret(self, mock_create_secret): + secret_metadata = V1ObjectMeta(name=self.name, namespace=self.namespace) + secret = V1Secret( + metadata=secret_metadata, + data=self.data, + type=self.secret_type, + ) + await self.kubectl.create_secret( + namespace=self.namespace, + data=self.data, + name=self.name, + secret_type=self.secret_type, + ) + mock_create_secret.assert_called_once_with(self.namespace, secret) + + +@mock.patch("kubernetes.client.CoreV1Api.create_namespace") +class CreateNamespaceClass(asynctest.TestCase): + @mock.patch("kubernetes.config.load_kube_config") + def setUp(self, mock_load_kube_config): + super(CreateNamespaceClass, self).setUp() + self.namespace = "osm" + self.kubectl = Kubectl() + + @asynctest.fail_on(active_handles=True) + async def test_namespace_is_created( + self, + mock_create_namespace, + ): + metadata = V1ObjectMeta(name=self.namespace) + namespace = V1Namespace( + metadata=metadata, + ) + await self.kubectl.create_namespace( + name=self.namespace, + ) + mock_create_namespace.assert_called_once_with(namespace) + + @asynctest.fail_on(active_handles=True) + async def test_no_exception_if_alreadyexists( + self, + mock_create_namespace, + ): + api_exception = ApiException() + api_exception.body = '{"reason": "AlreadyExists"}' + self.kubectl.clients[CORE_CLIENT].create_namespace.side_effect = api_exception + raised = False + try: + await self.kubectl.create_namespace( + name=self.namespace, + ) + except Exception: + raised = True + self.assertFalse(raised, "An exception was raised") + + @asynctest.fail_on(active_handles=True) + async def test_other_exceptions( + self, + mock_create_namespace, + ): + self.kubectl.clients[CORE_CLIENT].create_namespace.side_effect = Exception() + with self.assertRaises(Exception): + await self.kubectl.create_namespace( + name=self.namespace, + ) + + +@mock.patch("kubernetes.client.CoreV1Api.delete_namespace") +class DeleteNamespaceClass(asynctest.TestCase): + @mock.patch("kubernetes.config.load_kube_config") + def setUp(self, mock_load_kube_config): + super(DeleteNamespaceClass, self).setUp() + self.namespace = "osm" + self.kubectl = Kubectl() + + @asynctest.fail_on(active_handles=True) + async def test_no_exception_if_notfound( + self, + mock_delete_namespace, + ): + api_exception = ApiException() + api_exception.body = '{"reason": "NotFound"}' + self.kubectl.clients[CORE_CLIENT].delete_namespace.side_effect = api_exception + raised = False + try: + await self.kubectl.delete_namespace( + name=self.namespace, + ) + except Exception: + raised = True + self.assertFalse(raised, "An exception was raised") + + @asynctest.fail_on(active_handles=True) + async def test_other_exceptions( + self, + mock_delete_namespace, + ): + self.kubectl.clients[CORE_CLIENT].delete_namespace.side_effect = Exception() + with self.assertRaises(Exception): + await self.kubectl.delete_namespace( + name=self.namespace, + ) + + +@mock.patch("kubernetes.client.CoreV1Api.read_namespaced_secret") +class GetSecretContentClass(asynctest.TestCase): + @mock.patch("kubernetes.config.load_kube_config") + def setUp(self, mock_load_kube_config): + super(GetSecretContentClass, self).setUp() + self.name = "my_secret" + self.namespace = "osm" + self.data = {"my_key": "my_value"} + self.type = "Opaque" + self.kubectl = Kubectl() + + @asynctest.fail_on(active_handles=True) + async def test_return_type_is_dict( + self, + mock_read_namespaced_secret, + ): + metadata = V1ObjectMeta(name=self.name, namespace=self.namespace) + secret = V1Secret(metadata=metadata, data=self.data, type=self.type) + mock_read_namespaced_secret.return_value = secret + content = await self.kubectl.get_secret_content(self.name, self.namespace) + assert type(content) is dict