X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Ftests%2Funit%2Ftest_kubectl.py;h=eb9b01d45bd3ff88a56562b83fb7c454ea360cd0;hp=28d66c530df30f2020332cd93e508c601b38019c;hb=HEAD;hpb=2a3ffde1771ec4431eef96f4908b3572a883ef01 diff --git a/n2vc/tests/unit/test_kubectl.py b/n2vc/tests/unit/test_kubectl.py index 28d66c5..a6d02ff 100644 --- a/n2vc/tests/unit/test_kubectl.py +++ b/n2vc/tests/unit/test_kubectl.py @@ -12,10 +12,25 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asynctest +import yaml +import os from unittest import TestCase, mock -from n2vc.kubectl import Kubectl, CORE_CLIENT +from n2vc.kubectl import Kubectl, CORE_CLIENT, CUSTOM_OBJECT_CLIENT from n2vc.utils import Dict from kubernetes.client.rest import ApiException +from kubernetes.client import ( + V1ObjectMeta, + V1Secret, + V1ServiceAccount, + V1SecretReference, + V1Role, + V1RoleBinding, + V1RoleRef, + V1Subject, + V1PolicyRule, + V1Namespace, +) class FakeK8sResourceMetadata: @@ -66,6 +81,56 @@ class FakeK8sStorageClassesList: return self._items +class FakeK8sServiceAccountsList: + def __init__(self, items=[]): + self._items = items + + @property + def items(self): + return self._items + + +class FakeK8sSecretList: + def __init__(self, items=[]): + self._items = items + + @property + def items(self): + return self._items + + +class FakeK8sRoleList: + def __init__(self, items=[]): + self._items = items + + @property + def items(self): + return self._items + + +class FakeK8sRoleBindingList: + def __init__(self, items=[]): + self._items = items + + @property + def items(self): + return self._items + + +class FakeK8sVersionApiCode: + def __init__(self, major: str, minor: str): + self._major = major + self._minor = minor + + @property + def major(self): + return self._major + + @property + def minor(self): + return self._minor + + fake_list_services = Dict( { "items": [ @@ -150,16 +215,26 @@ class GetServices(TestCase): self.kubectl.get_services() -@mock.patch("kubernetes.config.kube_config.Configuration") +@mock.patch("n2vc.kubectl.client") +@mock.patch("n2vc.kubectl.config.kube_config.Configuration.get_default_copy") @mock.patch("n2vc.kubectl.config.load_kube_config") class GetConfiguration(KubectlTestCase): def setUp(self): super(GetConfiguration, self).setUp() - def test_get_configuration(self, mock_load_kube_config, mock_configuration): + def test_get_configuration( + self, + mock_load_kube_config, + mock_configuration, + mock_client, + ): kubectl = Kubectl() kubectl.configuration mock_configuration.assert_called_once() + mock_load_kube_config.assert_called_once() + mock_client.CoreV1Api.assert_called_once() + mock_client.RbacAuthorizationV1Api.assert_called_once() + mock_client.StorageV1Api.assert_called_once() @mock.patch("kubernetes.client.StorageV1Api.list_storage_class") @@ -238,3 +313,542 @@ class GetDefaultStorageClass(KubectlTestCase): sc_name = kubectl.get_default_storage_class() self.assertEqual(sc_name, self.default_sc_name) mock_list_storage_class.assert_called_once() + + +@mock.patch("kubernetes.client.VersionApi.get_code") +@mock.patch("kubernetes.client.CoreV1Api.list_namespaced_secret") +@mock.patch("kubernetes.client.CoreV1Api.create_namespaced_secret") +@mock.patch("kubernetes.client.CoreV1Api.create_namespaced_service_account") +@mock.patch("kubernetes.client.CoreV1Api.list_namespaced_service_account") +class CreateServiceAccountClass(KubectlTestCase): + @mock.patch("kubernetes.config.load_kube_config") + def setUp(self, mock_load_kube_config): + super(CreateServiceAccountClass, self).setUp() + self.service_account_name = "Service_account" + self.labels = {"Key1": "Value1", "Key2": "Value2"} + self.namespace = "kubernetes" + self.token_id = "abc12345" + self.kubectl = Kubectl() + + def assert_create_secret(self, mock_create_secret, secret_name): + annotations = {"kubernetes.io/service-account.name": self.service_account_name} + secret_metadata = V1ObjectMeta( + name=secret_name, namespace=self.namespace, annotations=annotations + ) + secret_type = "kubernetes.io/service-account-token" + secret = V1Secret(metadata=secret_metadata, type=secret_type) + mock_create_secret.assert_called_once_with(self.namespace, secret) + + def assert_create_service_account_v_1_24( + self, mock_create_service_account, secret_name + ): + sevice_account_metadata = V1ObjectMeta( + name=self.service_account_name, labels=self.labels, namespace=self.namespace + ) + secrets = [V1SecretReference(name=secret_name, namespace=self.namespace)] + service_account = V1ServiceAccount( + metadata=sevice_account_metadata, secrets=secrets + ) + mock_create_service_account.assert_called_once_with( + self.namespace, service_account + ) + + def assert_create_service_account_v_1_23(self, mock_create_service_account): + metadata = V1ObjectMeta( + name=self.service_account_name, labels=self.labels, namespace=self.namespace + ) + service_account = V1ServiceAccount(metadata=metadata) + mock_create_service_account.assert_called_once_with( + self.namespace, service_account + ) + + @mock.patch("n2vc.kubectl.uuid.uuid4") + def test_secret_is_created_when_k8s_1_24( + self, + mock_uuid4, + mock_list_service_account, + mock_create_service_account, + mock_create_secret, + mock_list_secret, + mock_version, + ): + mock_list_service_account.return_value = FakeK8sServiceAccountsList(items=[]) + mock_list_secret.return_value = FakeK8sSecretList(items=[]) + mock_version.return_value = FakeK8sVersionApiCode("1", "24") + mock_uuid4.return_value = self.token_id + self.kubectl.create_service_account( + self.service_account_name, self.labels, self.namespace + ) + secret_name = "{}-token-{}".format(self.service_account_name, self.token_id[:5]) + self.assert_create_service_account_v_1_24( + mock_create_service_account, secret_name + ) + self.assert_create_secret(mock_create_secret, secret_name) + + def test_secret_is_not_created_when_k8s_1_23( + self, + mock_list_service_account, + mock_create_service_account, + mock_create_secret, + mock_list_secret, + mock_version, + ): + mock_list_service_account.return_value = FakeK8sServiceAccountsList(items=[]) + mock_version.return_value = FakeK8sVersionApiCode("1", "23+") + self.kubectl.create_service_account( + self.service_account_name, self.labels, self.namespace + ) + self.assert_create_service_account_v_1_23(mock_create_service_account) + mock_create_secret.assert_not_called() + mock_list_secret.assert_not_called() + + def test_raise_exception_if_service_account_already_exists( + self, + mock_list_service_account, + mock_create_service_account, + mock_create_secret, + mock_list_secret, + mock_version, + ): + mock_list_service_account.return_value = FakeK8sServiceAccountsList(items=[1]) + with self.assertRaises(Exception) as context: + self.kubectl.create_service_account( + self.service_account_name, self.labels, self.namespace + ) + self.assertTrue( + "Service account with metadata.name={} already exists".format( + self.service_account_name + ) + in str(context.exception) + ) + mock_create_service_account.assert_not_called() + mock_create_secret.assert_not_called() + + @mock.patch("n2vc.kubectl.uuid.uuid4") + def test_raise_exception_if_secret_already_exists( + self, + mock_uuid4, + mock_list_service_account, + mock_create_service_account, + mock_create_secret, + mock_list_secret, + mock_version, + ): + mock_list_service_account.return_value = FakeK8sServiceAccountsList(items=[]) + mock_list_secret.return_value = FakeK8sSecretList(items=[1]) + mock_version.return_value = FakeK8sVersionApiCode("1", "24+") + mock_uuid4.return_value = self.token_id + with self.assertRaises(Exception) as context: + self.kubectl.create_service_account( + self.service_account_name, self.labels, self.namespace + ) + self.assertTrue( + "Secret with metadata.name={}-token-{} already exists".format( + self.service_account_name, self.token_id[:5] + ) + in str(context.exception) + ) + mock_create_service_account.assert_called() + mock_create_secret.assert_not_called() + + +@mock.patch("kubernetes.client.CustomObjectsApi.create_namespaced_custom_object") +class CreateCertificateClass(asynctest.TestCase): + @mock.patch("kubernetes.config.load_kube_config") + def setUp(self, mock_load_kube_config): + super(CreateCertificateClass, self).setUp() + self.namespace = "osm" + self.name = "test-cert" + self.dns_prefix = "*" + self.secret_name = "test-cert-secret" + self.usages = ["server auth"] + self.issuer_name = "ca-issuer" + self.kubectl = Kubectl() + + @asynctest.fail_on(active_handles=True) + async def test_certificate_is_created( + self, + mock_create_certificate, + ): + with open( + os.path.join( + os.path.dirname(__file__), "testdata", "test_certificate.yaml" + ), + "r", + ) as test_certificate: + certificate_body = yaml.safe_load(test_certificate.read()) + print(certificate_body) + await self.kubectl.create_certificate( + namespace=self.namespace, + name=self.name, + dns_prefix=self.dns_prefix, + secret_name=self.secret_name, + usages=self.usages, + issuer_name=self.issuer_name, + ) + mock_create_certificate.assert_called_once_with( + group="cert-manager.io", + plural="certificates", + version="v1", + body=certificate_body, + namespace=self.namespace, + ) + + @asynctest.fail_on(active_handles=True) + async def test_no_exception_if_alreadyexists( + self, + mock_create_certificate, + ): + api_exception = ApiException() + api_exception.body = '{"reason": "AlreadyExists"}' + self.kubectl.clients[ + CUSTOM_OBJECT_CLIENT + ].create_namespaced_custom_object.side_effect = api_exception + raised = False + try: + await self.kubectl.create_certificate( + namespace=self.namespace, + name=self.name, + dns_prefix=self.dns_prefix, + secret_name=self.secret_name, + usages=self.usages, + issuer_name=self.issuer_name, + ) + except Exception: + raised = True + self.assertFalse(raised, "An exception was raised") + + @asynctest.fail_on(active_handles=True) + async def test_other_exceptions( + self, + mock_create_certificate, + ): + self.kubectl.clients[ + CUSTOM_OBJECT_CLIENT + ].create_namespaced_custom_object.side_effect = Exception() + with self.assertRaises(Exception): + await self.kubectl.create_certificate( + namespace=self.namespace, + name=self.name, + dns_prefix=self.dns_prefix, + secret_name=self.secret_name, + usages=self.usages, + issuer_name=self.issuer_name, + ) + + +@mock.patch("kubernetes.client.CustomObjectsApi.delete_namespaced_custom_object") +class DeleteCertificateClass(asynctest.TestCase): + @mock.patch("kubernetes.config.load_kube_config") + def setUp(self, mock_load_kube_config): + super(DeleteCertificateClass, self).setUp() + self.namespace = "osm" + self.object_name = "test-cert" + self.kubectl = Kubectl() + + @asynctest.fail_on(active_handles=True) + async def test_no_exception_if_notfound( + self, + mock_create_certificate, + ): + api_exception = ApiException() + api_exception.body = '{"reason": "NotFound"}' + self.kubectl.clients[ + CUSTOM_OBJECT_CLIENT + ].delete_namespaced_custom_object.side_effect = api_exception + raised = False + try: + await self.kubectl.delete_certificate( + namespace=self.namespace, + object_name=self.object_name, + ) + except Exception: + raised = True + self.assertFalse(raised, "An exception was raised") + + @asynctest.fail_on(active_handles=True) + async def test_other_exceptions( + self, + mock_create_certificate, + ): + self.kubectl.clients[ + CUSTOM_OBJECT_CLIENT + ].delete_namespaced_custom_object.side_effect = Exception() + with self.assertRaises(Exception): + await self.kubectl.delete_certificate( + namespace=self.namespace, + object_name=self.object_name, + ) + + +@mock.patch("kubernetes.client.RbacAuthorizationV1Api.create_namespaced_role") +@mock.patch("kubernetes.client.RbacAuthorizationV1Api.list_namespaced_role") +class CreateRoleClass(asynctest.TestCase): + @mock.patch("kubernetes.config.load_kube_config") + def setUp(self, mock_load_kube_config): + super(CreateRoleClass, self).setUp() + self.name = "role" + self.namespace = "osm" + self.resources = ["*"] + self.api_groups = ["*"] + self.verbs = ["*"] + self.labels = {} + self.kubectl = Kubectl() + + @asynctest.fail_on(active_handles=True) + async def assert_create_role(self, mock_create_role): + metadata = V1ObjectMeta( + name=self.name, labels=self.labels, namespace=self.namespace + ) + role = V1Role( + metadata=metadata, + rules=[ + V1PolicyRule( + api_groups=self.api_groups, + resources=self.resources, + verbs=self.verbs, + ), + ], + ) + await self.kubectl.create_role( + namespace=self.namespace, + api_groups=self.api_groups, + name=self.name, + resources=self.resources, + verbs=self.verbs, + labels=self.labels, + ) + mock_create_role.assert_called_once_with(self.namespace, role) + + @asynctest.fail_on(active_handles=True) + async def test_raise_exception_if_role_already_exists( + self, + mock_list_role, + mock_create_role, + ): + mock_list_role.return_value = FakeK8sRoleList(items=[1]) + with self.assertRaises(Exception) as context: + await self.kubectl.create_role( + self.name, + self.labels, + self.api_groups, + self.resources, + self.verbs, + self.namespace, + ) + self.assertTrue( + "Role with metadata.name={} already exists".format(self.name) + in str(context.exception) + ) + mock_create_role.assert_not_called() + + +@mock.patch("kubernetes.client.RbacAuthorizationV1Api.create_namespaced_role_binding") +@mock.patch("kubernetes.client.RbacAuthorizationV1Api.list_namespaced_role_binding") +class CreateRoleBindingClass(asynctest.TestCase): + @mock.patch("kubernetes.config.load_kube_config") + def setUp(self, mock_load_kube_config): + super(CreateRoleBindingClass, self).setUp() + self.name = "rolebinding" + self.namespace = "osm" + self.role_name = "role" + self.sa_name = "Default" + self.labels = {} + self.kubectl = Kubectl() + + @asynctest.fail_on(active_handles=True) + async def assert_create_role_binding(self, mock_create_role_binding): + role_binding = V1RoleBinding( + metadata=V1ObjectMeta(name=self.name, labels=self.labels), + role_ref=V1RoleRef(kind="Role", name=self.role_name, api_group=""), + subjects=[ + V1Subject( + kind="ServiceAccount", + name=self.sa_name, + namespace=self.namespace, + ) + ], + ) + await self.kubectl.create_role_binding( + namespace=self.namespace, + role_name=self.role_name, + name=self.name, + sa_name=self.sa_name, + labels=self.labels, + ) + mock_create_role_binding.assert_called_once_with(self.namespace, role_binding) + + @asynctest.fail_on(active_handles=True) + async def test_raise_exception_if_role_binding_already_exists( + self, + mock_list_role_binding, + mock_create_role_binding, + ): + mock_list_role_binding.return_value = FakeK8sRoleBindingList(items=[1]) + with self.assertRaises(Exception) as context: + await self.kubectl.create_role_binding( + self.name, + self.role_name, + self.sa_name, + self.labels, + self.namespace, + ) + self.assertTrue( + "Role Binding with metadata.name={} already exists".format(self.name) + in str(context.exception) + ) + mock_create_role_binding.assert_not_called() + + +@mock.patch("kubernetes.client.CoreV1Api.create_namespaced_secret") +class CreateSecretClass(asynctest.TestCase): + @mock.patch("kubernetes.config.load_kube_config") + def setUp(self, mock_load_kube_config): + super(CreateSecretClass, self).setUp() + self.name = "secret" + self.namespace = "osm" + self.data = {"test": "1234"} + self.secret_type = "Opaque" + self.kubectl = Kubectl() + + @asynctest.fail_on(active_handles=True) + async def assert_create_secret(self, mock_create_secret): + secret_metadata = V1ObjectMeta(name=self.name, namespace=self.namespace) + secret = V1Secret( + metadata=secret_metadata, + data=self.data, + type=self.secret_type, + ) + await self.kubectl.create_secret( + namespace=self.namespace, + data=self.data, + name=self.name, + secret_type=self.secret_type, + ) + mock_create_secret.assert_called_once_with(self.namespace, secret) + + +@mock.patch("kubernetes.client.CoreV1Api.create_namespace") +class CreateNamespaceClass(asynctest.TestCase): + @mock.patch("kubernetes.config.load_kube_config") + def setUp(self, mock_load_kube_config): + super(CreateNamespaceClass, self).setUp() + self.namespace = "osm" + self.labels = {"key": "value"} + self.kubectl = Kubectl() + + @asynctest.fail_on(active_handles=True) + async def test_namespace_is_created( + self, + mock_create_namespace, + ): + metadata = V1ObjectMeta(name=self.namespace, labels=self.labels) + namespace = V1Namespace( + metadata=metadata, + ) + await self.kubectl.create_namespace( + name=self.namespace, + labels=self.labels, + ) + mock_create_namespace.assert_called_once_with(namespace) + + async def test_namespace_is_created_default_labels( + self, + mock_create_namespace, + ): + metadata = V1ObjectMeta(name=self.namespace, labels=None) + namespace = V1Namespace( + metadata=metadata, + ) + await self.kubectl.create_namespace( + name=self.namespace, + ) + mock_create_namespace.assert_called_once_with(namespace) + + @asynctest.fail_on(active_handles=True) + async def test_no_exception_if_alreadyexists( + self, + mock_create_namespace, + ): + api_exception = ApiException() + api_exception.body = '{"reason": "AlreadyExists"}' + self.kubectl.clients[CORE_CLIENT].create_namespace.side_effect = api_exception + raised = False + try: + await self.kubectl.create_namespace( + name=self.namespace, + ) + except Exception: + raised = True + self.assertFalse(raised, "An exception was raised") + + @asynctest.fail_on(active_handles=True) + async def test_other_exceptions( + self, + mock_create_namespace, + ): + self.kubectl.clients[CORE_CLIENT].create_namespace.side_effect = Exception() + with self.assertRaises(Exception): + await self.kubectl.create_namespace( + name=self.namespace, + ) + + +@mock.patch("kubernetes.client.CoreV1Api.delete_namespace") +class DeleteNamespaceClass(asynctest.TestCase): + @mock.patch("kubernetes.config.load_kube_config") + def setUp(self, mock_load_kube_config): + super(DeleteNamespaceClass, self).setUp() + self.namespace = "osm" + self.kubectl = Kubectl() + + @asynctest.fail_on(active_handles=True) + async def test_no_exception_if_notfound( + self, + mock_delete_namespace, + ): + api_exception = ApiException() + api_exception.body = '{"reason": "NotFound"}' + self.kubectl.clients[CORE_CLIENT].delete_namespace.side_effect = api_exception + raised = False + try: + await self.kubectl.delete_namespace( + name=self.namespace, + ) + except Exception: + raised = True + self.assertFalse(raised, "An exception was raised") + + @asynctest.fail_on(active_handles=True) + async def test_other_exceptions( + self, + mock_delete_namespace, + ): + self.kubectl.clients[CORE_CLIENT].delete_namespace.side_effect = Exception() + with self.assertRaises(Exception): + await self.kubectl.delete_namespace( + name=self.namespace, + ) + + +@mock.patch("kubernetes.client.CoreV1Api.read_namespaced_secret") +class GetSecretContentClass(asynctest.TestCase): + @mock.patch("kubernetes.config.load_kube_config") + def setUp(self, mock_load_kube_config): + super(GetSecretContentClass, self).setUp() + self.name = "my_secret" + self.namespace = "osm" + self.data = {"my_key": "my_value"} + self.type = "Opaque" + self.kubectl = Kubectl() + + @asynctest.fail_on(active_handles=True) + async def test_return_type_is_dict( + self, + mock_read_namespaced_secret, + ): + metadata = V1ObjectMeta(name=self.name, namespace=self.namespace) + secret = V1Secret(metadata=metadata, data=self.data, type=self.type) + mock_read_namespaced_secret.return_value = secret + content = await self.kubectl.get_secret_content(self.name, self.namespace) + assert type(content) is dict