Fix cross-model relation condition
[osm/N2VC.git] / n2vc / tests / unit / test_kubectl.py
index eb9b01d..d970bf0 100644 (file)
 #     See the License for the specific language governing permissions and
 #     limitations under the License.
 
+import asynctest
+import yaml
+import os
 from unittest import TestCase, mock
-from n2vc.kubectl import Kubectl, CORE_CLIENT
+from n2vc.kubectl import Kubectl, CORE_CLIENT, CUSTOM_OBJECT_CLIENT
 from n2vc.utils import Dict
 from kubernetes.client.rest import ApiException
+from kubernetes.client import (
+    V1ObjectMeta,
+    V1Secret,
+    V1ServiceAccount,
+    V1SecretReference,
+)
 
 
 class FakeK8sResourceMetadata:
@@ -66,6 +75,38 @@ class FakeK8sStorageClassesList:
         return self._items
 
 
+class FakeK8sServiceAccountsList:
+    def __init__(self, items=[]):
+        self._items = items
+
+    @property
+    def items(self):
+        return self._items
+
+
+class FakeK8sSecretList:
+    def __init__(self, items=[]):
+        self._items = items
+
+    @property
+    def items(self):
+        return self._items
+
+
+class FakeK8sVersionApiCode:
+    def __init__(self, major: str, minor: str):
+        self._major = major
+        self._minor = minor
+
+    @property
+    def major(self):
+        return self._major
+
+    @property
+    def minor(self):
+        return self._minor
+
+
 fake_list_services = Dict(
     {
         "items": [
@@ -248,3 +289,269 @@ class GetDefaultStorageClass(KubectlTestCase):
         sc_name = kubectl.get_default_storage_class()
         self.assertEqual(sc_name, self.default_sc_name)
         mock_list_storage_class.assert_called_once()
+
+
+@mock.patch("kubernetes.client.VersionApi.get_code")
+@mock.patch("kubernetes.client.CoreV1Api.list_namespaced_secret")
+@mock.patch("kubernetes.client.CoreV1Api.create_namespaced_secret")
+@mock.patch("kubernetes.client.CoreV1Api.create_namespaced_service_account")
+@mock.patch("kubernetes.client.CoreV1Api.list_namespaced_service_account")
+class CreateServiceAccountClass(KubectlTestCase):
+    @mock.patch("kubernetes.config.load_kube_config")
+    def setUp(self, mock_load_kube_config):
+        super(CreateServiceAccountClass, self).setUp()
+        self.service_account_name = "Service_account"
+        self.labels = {"Key1": "Value1", "Key2": "Value2"}
+        self.namespace = "kubernetes"
+        self.token_id = "abc12345"
+        self.kubectl = Kubectl()
+
+    def assert_create_secret(self, mock_create_secret, secret_name):
+        annotations = {"kubernetes.io/service-account.name": self.service_account_name}
+        secret_metadata = V1ObjectMeta(
+            name=secret_name, namespace=self.namespace, annotations=annotations
+        )
+        secret_type = "kubernetes.io/service-account-token"
+        secret = V1Secret(metadata=secret_metadata, type=secret_type)
+        mock_create_secret.assert_called_once_with(self.namespace, secret)
+
+    def assert_create_service_account_v_1_24(
+        self, mock_create_service_account, secret_name
+    ):
+        sevice_account_metadata = V1ObjectMeta(
+            name=self.service_account_name, labels=self.labels, namespace=self.namespace
+        )
+        secrets = [V1SecretReference(name=secret_name, namespace=self.namespace)]
+        service_account = V1ServiceAccount(
+            metadata=sevice_account_metadata, secrets=secrets
+        )
+        mock_create_service_account.assert_called_once_with(
+            self.namespace, service_account
+        )
+
+    def assert_create_service_account_v_1_23(self, mock_create_service_account):
+        metadata = V1ObjectMeta(
+            name=self.service_account_name, labels=self.labels, namespace=self.namespace
+        )
+        service_account = V1ServiceAccount(metadata=metadata)
+        mock_create_service_account.assert_called_once_with(
+            self.namespace, service_account
+        )
+
+    @mock.patch("n2vc.kubectl.uuid.uuid4")
+    def test_secret_is_created_when_k8s_1_24(
+        self,
+        mock_uuid4,
+        mock_list_service_account,
+        mock_create_service_account,
+        mock_create_secret,
+        mock_list_secret,
+        mock_version,
+    ):
+        mock_list_service_account.return_value = FakeK8sServiceAccountsList(items=[])
+        mock_list_secret.return_value = FakeK8sSecretList(items=[])
+        mock_version.return_value = FakeK8sVersionApiCode("1", "24")
+        mock_uuid4.return_value = self.token_id
+        self.kubectl.create_service_account(
+            self.service_account_name, self.labels, self.namespace
+        )
+        secret_name = "{}-token-{}".format(self.service_account_name, self.token_id[:5])
+        self.assert_create_service_account_v_1_24(
+            mock_create_service_account, secret_name
+        )
+        self.assert_create_secret(mock_create_secret, secret_name)
+
+    def test_secret_is_not_created_when_k8s_1_23(
+        self,
+        mock_list_service_account,
+        mock_create_service_account,
+        mock_create_secret,
+        mock_list_secret,
+        mock_version,
+    ):
+        mock_list_service_account.return_value = FakeK8sServiceAccountsList(items=[])
+        mock_version.return_value = FakeK8sVersionApiCode("1", "23+")
+        self.kubectl.create_service_account(
+            self.service_account_name, self.labels, self.namespace
+        )
+        self.assert_create_service_account_v_1_23(mock_create_service_account)
+        mock_create_secret.assert_not_called()
+        mock_list_secret.assert_not_called()
+
+    def test_raise_exception_if_service_account_already_exists(
+        self,
+        mock_list_service_account,
+        mock_create_service_account,
+        mock_create_secret,
+        mock_list_secret,
+        mock_version,
+    ):
+        mock_list_service_account.return_value = FakeK8sServiceAccountsList(items=[1])
+        with self.assertRaises(Exception) as context:
+            self.kubectl.create_service_account(
+                self.service_account_name, self.labels, self.namespace
+            )
+        self.assertTrue(
+            "Service account with metadata.name={} already exists".format(
+                self.service_account_name
+            )
+            in str(context.exception)
+        )
+        mock_create_service_account.assert_not_called()
+        mock_create_secret.assert_not_called()
+
+    @mock.patch("n2vc.kubectl.uuid.uuid4")
+    def test_raise_exception_if_secret_already_exists(
+        self,
+        mock_uuid4,
+        mock_list_service_account,
+        mock_create_service_account,
+        mock_create_secret,
+        mock_list_secret,
+        mock_version,
+    ):
+        mock_list_service_account.return_value = FakeK8sServiceAccountsList(items=[])
+        mock_list_secret.return_value = FakeK8sSecretList(items=[1])
+        mock_version.return_value = FakeK8sVersionApiCode("1", "24+")
+        mock_uuid4.return_value = self.token_id
+        with self.assertRaises(Exception) as context:
+            self.kubectl.create_service_account(
+                self.service_account_name, self.labels, self.namespace
+            )
+        self.assertTrue(
+            "Secret with metadata.name={}-token-{} already exists".format(
+                self.service_account_name, self.token_id[:5]
+            )
+            in str(context.exception)
+        )
+        mock_create_service_account.assert_called()
+        mock_create_secret.assert_not_called()
+
+
+@mock.patch("kubernetes.client.CustomObjectsApi.create_namespaced_custom_object")
+class CreateCertificateClass(asynctest.TestCase):
+    @mock.patch("kubernetes.config.load_kube_config")
+    def setUp(self, mock_load_kube_config):
+        super(CreateCertificateClass, self).setUp()
+        self.namespace = "osm"
+        self.name = "test-cert"
+        self.dns_prefix = "*"
+        self.secret_name = "test-cert-secret"
+        self.usages = ["server auth"]
+        self.issuer_name = "ca-issuer"
+        self.kubectl = Kubectl()
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_certificate_is_created(
+        self,
+        mock_create_certificate,
+    ):
+        with open(
+            os.path.join(
+                os.path.dirname(__file__), "testdata", "test_certificate.yaml"
+            ),
+            "r",
+        ) as test_certificate:
+            certificate_body = yaml.safe_load(test_certificate.read())
+            print(certificate_body)
+        await self.kubectl.create_certificate(
+            namespace=self.namespace,
+            name=self.name,
+            dns_prefix=self.dns_prefix,
+            secret_name=self.secret_name,
+            usages=self.usages,
+            issuer_name=self.issuer_name,
+        )
+        mock_create_certificate.assert_called_once_with(
+            group="cert-manager.io",
+            plural="certificates",
+            version="v1",
+            body=certificate_body,
+            namespace=self.namespace,
+        )
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_no_exception_if_alreadyexists(
+        self,
+        mock_create_certificate,
+    ):
+        api_exception = ApiException()
+        api_exception.body = '{"reason": "AlreadyExists"}'
+        self.kubectl.clients[
+            CUSTOM_OBJECT_CLIENT
+        ].create_namespaced_custom_object.side_effect = api_exception
+        raised = False
+        try:
+            await self.kubectl.create_certificate(
+                namespace=self.namespace,
+                name=self.name,
+                dns_prefix=self.dns_prefix,
+                secret_name=self.secret_name,
+                usages=self.usages,
+                issuer_name=self.issuer_name,
+            )
+        except Exception:
+            raised = True
+        self.assertFalse(raised, "An exception was raised")
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_other_exceptions(
+        self,
+        mock_create_certificate,
+    ):
+        self.kubectl.clients[
+            CUSTOM_OBJECT_CLIENT
+        ].create_namespaced_custom_object.side_effect = Exception()
+        with self.assertRaises(Exception):
+            await self.kubectl.create_certificate(
+                namespace=self.namespace,
+                name=self.name,
+                dns_prefix=self.dns_prefix,
+                secret_name=self.secret_name,
+                usages=self.usages,
+                issuer_name=self.issuer_name,
+            )
+
+
+@mock.patch("kubernetes.client.CustomObjectsApi.delete_namespaced_custom_object")
+class DeleteCertificateClass(asynctest.TestCase):
+    @mock.patch("kubernetes.config.load_kube_config")
+    def setUp(self, mock_load_kube_config):
+        super(DeleteCertificateClass, self).setUp()
+        self.namespace = "osm"
+        self.object_name = "test-cert"
+        self.kubectl = Kubectl()
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_no_exception_if_notfound(
+        self,
+        mock_create_certificate,
+    ):
+        api_exception = ApiException()
+        api_exception.body = '{"reason": "NotFound"}'
+        self.kubectl.clients[
+            CUSTOM_OBJECT_CLIENT
+        ].delete_namespaced_custom_object.side_effect = api_exception
+        raised = False
+        try:
+            await self.kubectl.delete_certificate(
+                namespace=self.namespace,
+                object_name=self.object_name,
+            )
+        except Exception:
+            raised = True
+        self.assertFalse(raised, "An exception was raised")
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_other_exceptions(
+        self,
+        mock_create_certificate,
+    ):
+        self.kubectl.clients[
+            CUSTOM_OBJECT_CLIENT
+        ].delete_namespaced_custom_object.side_effect = Exception()
+        with self.assertRaises(Exception):
+            await self.kubectl.delete_certificate(
+                namespace=self.namespace,
+                object_name=self.object_name,
+            )