Fix bug 1298
[osm/N2VC.git] / n2vc / tests / unit / test_libjuju.py
index 7bea6b3..b7c7901 100644 (file)
@@ -32,6 +32,7 @@ from n2vc.exceptions import (
     JujuInvalidK8sConfiguration,
     JujuLeaderUnitNotFound,
 )
+from n2vc.k8s_juju_conn import generate_rbac_id
 
 
 class LibjujuTestCase(asynctest.TestCase):
@@ -1431,45 +1432,56 @@ class ConsumeTest(LibjujuTestCase):
 class AddK8sTest(LibjujuTestCase):
     def setUp(self):
         super(AddK8sTest, self).setUp()
-        self.configuration = kubernetes.client.configuration.Configuration()
+        name = "cloud"
+        rbac_id = generate_rbac_id()
+        token = "token"
+        client_cert_data = "cert"
+        configuration = kubernetes.client.configuration.Configuration()
+        storage_class = "storage_class"
+        credential_name = name
+
+        self._add_k8s_args = {
+            "name": name,
+            "rbac_id": rbac_id,
+            "token": token,
+            "client_cert_data": client_cert_data,
+            "configuration": configuration,
+            "storage_class": storage_class,
+            "credential_name": credential_name,
+        }
 
     def test_add_k8s(self, mock_add_cloud, mock_get_k8s_cloud_credential):
-        self.loop.run_until_complete(
-            self.libjuju.add_k8s("cloud", self.configuration, "storage_class")
-        )
+        self.loop.run_until_complete(self.libjuju.add_k8s(**self._add_k8s_args))
         mock_add_cloud.assert_called_once()
         mock_get_k8s_cloud_credential.assert_called_once()
 
     def test_add_k8s_exception(self, mock_add_cloud, mock_get_k8s_cloud_credential):
         mock_add_cloud.side_effect = Exception()
         with self.assertRaises(Exception):
-            self.loop.run_until_complete(
-                self.libjuju.add_k8s("cloud", self.configuration, "storage_class")
-            )
+            self.loop.run_until_complete(self.libjuju.add_k8s(**self._add_k8s_args))
         mock_add_cloud.assert_called_once()
         mock_get_k8s_cloud_credential.assert_called_once()
 
     def test_add_k8s_missing_name(self, mock_add_cloud, mock_get_k8s_cloud_credential):
+        self._add_k8s_args["name"] = ""
         with self.assertRaises(Exception):
-            self.loop.run_until_complete(
-                self.libjuju.add_k8s("", self.configuration, "storage_class")
-            )
+            self.loop.run_until_complete(self.libjuju.add_k8s(**self._add_k8s_args))
         mock_add_cloud.assert_not_called()
 
     def test_add_k8s_missing_storage_name(
         self, mock_add_cloud, mock_get_k8s_cloud_credential
     ):
+        self._add_k8s_args["storage_class"] = ""
         with self.assertRaises(Exception):
-            self.loop.run_until_complete(
-                self.libjuju.add_k8s("cloud", self.configuration, "")
-            )
+            self.loop.run_until_complete(self.libjuju.add_k8s(**self._add_k8s_args))
         mock_add_cloud.assert_not_called()
 
     def test_add_k8s_missing_configuration_keys(
         self, mock_add_cloud, mock_get_k8s_cloud_credential
     ):
+        self._add_k8s_args["configuration"] = None
         with self.assertRaises(Exception):
-            self.loop.run_until_complete(self.libjuju.add_k8s("cloud", None, ""))
+            self.loop.run_until_complete(self.libjuju.add_k8s(**self._add_k8s_args))
         mock_add_cloud.assert_not_called()
 
 
@@ -1596,6 +1608,8 @@ class RemoveCloudTest(LibjujuTestCase):
 class GetK8sCloudCredentials(LibjujuTestCase):
     def setUp(self):
         super(GetK8sCloudCredentials, self).setUp()
+        self.cert_data = "cert"
+        self.token = "token"
 
     @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration")
     def test_not_supported(self, mock_exception, mock_configuration):
@@ -1605,8 +1619,14 @@ class GetK8sCloudCredentials(LibjujuTestCase):
         mock_configuration.cert_file = None
         mock_configuration.key_file = None
         exception_raised = False
+        self.token = None
+        self.cert_data = None
         try:
-            _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+            _ = self.libjuju.get_k8s_cloud_credential(
+                mock_configuration,
+                self.cert_data,
+                self.token,
+            )
         except JujuInvalidK8sConfiguration as e:
             exception_raised = True
             self.assertEqual(
@@ -1621,7 +1641,13 @@ class GetK8sCloudCredentials(LibjujuTestCase):
         mock_configuration.ssl_ca_cert = None
         mock_configuration.cert_file = None
         mock_configuration.key_file = None
-        credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+        self.token = None
+        self.cert_data = None
+        credential = self.libjuju.get_k8s_cloud_credential(
+            mock_configuration,
+            self.cert_data,
+            self.token,
+        )
         self.assertEqual(
             credential,
             juju.client._definitions.CloudCredential(
@@ -1629,14 +1655,44 @@ class GetK8sCloudCredentials(LibjujuTestCase):
             ),
         )
 
+    def test_user_pass_with_cert(self, mock_configuration):
+        mock_configuration.username = "admin"
+        mock_configuration.password = "admin"
+        mock_configuration.ssl_ca_cert = None
+        mock_configuration.cert_file = None
+        mock_configuration.key_file = None
+        self.token = None
+        credential = self.libjuju.get_k8s_cloud_credential(
+            mock_configuration,
+            self.cert_data,
+            self.token,
+        )
+        self.assertEqual(
+            credential,
+            juju.client._definitions.CloudCredential(
+                attrs={
+                    "ClientCertificateData": self.cert_data,
+                    "username": "admin",
+                    "password": "admin",
+                },
+                auth_type="userpasswithcert",
+            ),
+        )
+
     def test_user_no_pass(self, mock_configuration):
         mock_configuration.username = "admin"
         mock_configuration.password = ""
         mock_configuration.ssl_ca_cert = None
         mock_configuration.cert_file = None
         mock_configuration.key_file = None
+        self.token = None
+        self.cert_data = None
         with mock.patch.object(self.libjuju.log, "debug") as mock_debug:
-            credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+            credential = self.libjuju.get_k8s_cloud_credential(
+                mock_configuration,
+                self.cert_data,
+                self.token,
+            )
             self.assertEqual(
                 credential,
                 juju.client._definitions.CloudCredential(
@@ -1647,28 +1703,6 @@ class GetK8sCloudCredentials(LibjujuTestCase):
                 "credential for user admin has empty password"
             )
 
-    def test_user_pass_with_cert(self, mock_configuration):
-        mock_configuration.username = "admin"
-        mock_configuration.password = "admin"
-        ssl_ca_cert = tempfile.NamedTemporaryFile()
-        with open(ssl_ca_cert.name, "w") as ssl_ca_cert_file:
-            ssl_ca_cert_file.write("cacert")
-        mock_configuration.ssl_ca_cert = ssl_ca_cert.name
-        mock_configuration.cert_file = None
-        mock_configuration.key_file = None
-        credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
-        self.assertEqual(
-            credential,
-            juju.client._definitions.CloudCredential(
-                attrs={
-                    "username": "admin",
-                    "password": "admin",
-                    "ClientCertificateData": "cacert",
-                },
-                auth_type="userpasswithcert",
-            ),
-        )
-
     def test_cert(self, mock_configuration):
         mock_configuration.username = ""
         mock_configuration.password = ""
@@ -1679,72 +1713,67 @@ class GetK8sCloudCredentials(LibjujuTestCase):
         mock_configuration.ssl_ca_cert = ssl_ca_cert.name
         mock_configuration.cert_file = None
         mock_configuration.key_file = None
-        credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
-        self.assertEqual(
-            credential,
-            juju.client._definitions.CloudCredential(
-                attrs={"ClientCertificateData": "cacert", "Token": "Token"},
-                auth_type="certificate",
-            ),
+        credential = self.libjuju.get_k8s_cloud_credential(
+            mock_configuration,
+            self.cert_data,
+            self.token,
         )
-
-    def test_oauth2(self, mock_configuration):
-        mock_configuration.username = ""
-        mock_configuration.password = ""
-        mock_configuration.api_key = {"authorization": "Bearer Token"}
-        key = tempfile.NamedTemporaryFile()
-        with open(key.name, "w") as key_file:
-            key_file.write("key")
-        mock_configuration.ssl_ca_cert = None
-        mock_configuration.cert_file = None
-        mock_configuration.key_file = key.name
-        credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
         self.assertEqual(
             credential,
             juju.client._definitions.CloudCredential(
-                attrs={"ClientKeyData": "key", "Token": "Token"},
-                auth_type="oauth2",
+                attrs={"ClientCertificateData": self.cert_data, "Token": self.token},
+                auth_type="certificate",
             ),
         )
 
-    @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration")
-    def test_oauth2_missing_token(self, mock_exception, mock_configuration):
-        mock_configuration.username = ""
-        mock_configuration.password = ""
-        key = tempfile.NamedTemporaryFile()
-        with open(key.name, "w") as key_file:
-            key_file.write("key")
-        mock_configuration.ssl_ca_cert = None
-        mock_configuration.cert_file = None
-        mock_configuration.key_file = key.name
-        exception_raised = False
-        try:
-            _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
-        except JujuInvalidK8sConfiguration as e:
-            exception_raised = True
-            self.assertEqual(
-                e.message,
-                "missing token for auth type oauth2",
-            )
-        self.assertTrue(exception_raised)
-
-    def test_unknown_api_key(self, mock_configuration):
-        mock_configuration.username = ""
-        mock_configuration.password = ""
-        mock_configuration.api_key = {"authorization": "Bearer Token Wrong"}
-        mock_configuration.ssl_ca_cert = None
-        mock_configuration.cert_file = None
-        mock_configuration.key_file = None
-        exception_raised = False
-        try:
-            _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
-        except JujuInvalidK8sConfiguration as e:
-            exception_raised = True
-            self.assertEqual(
-                e.message,
-                "unknown format of api_key",
-            )
-        self.assertTrue(exception_raised)
+    # TODO: Fix this test when oauth authentication is supported
+    # def test_oauth2(self, mock_configuration):
+    #     mock_configuration.username = ""
+    #     mock_configuration.password = ""
+    #     mock_configuration.api_key = {"authorization": "Bearer Token"}
+    #     key = tempfile.NamedTemporaryFile()
+    #     with open(key.name, "w") as key_file:
+    #         key_file.write("key")
+    #     mock_configuration.ssl_ca_cert = None
+    #     mock_configuration.cert_file = None
+    #     mock_configuration.key_file = key.name
+    #     credential = self.libjuju.get_k8s_cloud_credential(
+    #         mock_configuration,
+    #         self.cert_data,
+    #         self.token,
+    #     )
+    #     self.assertEqual(
+    #         credential,
+    #         juju.client._definitions.CloudCredential(
+    #             attrs={"ClientKeyData": "key", "Token": "Token"},
+    #             auth_type="oauth2",
+    #         ),
+    #     )
+
+    # @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration")
+    # def test_oauth2_missing_token(self, mock_exception, mock_configuration):
+    #     mock_configuration.username = ""
+    #     mock_configuration.password = ""
+    #     key = tempfile.NamedTemporaryFile()
+    #     with open(key.name, "w") as key_file:
+    #         key_file.write("key")
+    #     mock_configuration.ssl_ca_cert = None
+    #     mock_configuration.cert_file = None
+    #     mock_configuration.key_file = key.name
+    #     exception_raised = False
+    #     try:
+    #         _ = self.libjuju.get_k8s_cloud_credential(
+    #             mock_configuration,
+    #             self.cert_data,
+    #             self.token,
+    #         )
+    #     except JujuInvalidK8sConfiguration as e:
+    #         exception_raised = True
+    #         self.assertEqual(
+    #             e.message,
+    #             "missing token for auth type oauth2",
+    #         )
+    #     self.assertTrue(exception_raised)
 
     def test_exception_cannot_set_token_and_userpass(self, mock_configuration):
         mock_configuration.username = "admin"
@@ -1755,7 +1784,11 @@ class GetK8sCloudCredentials(LibjujuTestCase):
         mock_configuration.key_file = None
         exception_raised = False
         try:
-            _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+            _ = self.libjuju.get_k8s_cloud_credential(
+                mock_configuration,
+                self.cert_data,
+                self.token,
+            )
         except JujuInvalidK8sConfiguration as e:
             exception_raised = True
             self.assertEqual(