Add new kubectl.py functions, modify some libjuju.py functions, add unit tests
[osm/N2VC.git] / n2vc / tests / unit / test_libjuju.py
index 57a6ccc..8c16c9d 100644 (file)
 
 import asyncio
 import asynctest
+import tempfile
+from unittest import mock
 import juju
+import kubernetes
 from juju.errors import JujuAPIError
 import logging
 from .utils import FakeN2VC, FakeMachine, FakeApplication
@@ -26,6 +29,7 @@ from n2vc.exceptions import (
     JujuApplicationNotFound,
     JujuActionNotFound,
     JujuApplicationExists,
+    JujuInvalidK8sConfiguration,
 )
 
 
@@ -1112,47 +1116,50 @@ class ConsumeTest(LibjujuTestCase):
         mock_disconnect_controller.assert_called_once()
 
 
+@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_k8s_cloud_credential")
 @asynctest.mock.patch("n2vc.libjuju.Libjuju.add_cloud")
 class AddK8sTest(LibjujuTestCase):
     def setUp(self):
         super(AddK8sTest, self).setUp()
-        self.auth_data = {
-            "server": "https://192.168.0.21:16443",
-            "token": "1234",
-            "cacert": "cacert",
-        }
+        self.configuration = kubernetes.client.configuration.Configuration()
 
-    def test_add_k8s(self, mock_add_cloud):
+    def test_add_k8s(self, mock_add_cloud, mock_get_k8s_cloud_credential):
         self.loop.run_until_complete(
-            self.libjuju.add_k8s("cloud", self.auth_data, "storage_class")
+            self.libjuju.add_k8s("cloud", self.configuration, "storage_class")
         )
         mock_add_cloud.assert_called_once()
+        mock_get_k8s_cloud_credential.assert_called_once()
 
-    def test_add_k8s_exception(self, mock_add_cloud):
+    def test_add_k8s_exception(self, mock_add_cloud, mock_get_k8s_cloud_credential):
         mock_add_cloud.side_effect = Exception()
         with self.assertRaises(Exception):
             self.loop.run_until_complete(
-                self.libjuju.add_k8s("cloud", self.auth_data, "storage_class")
+                self.libjuju.add_k8s("cloud", self.configuration, "storage_class")
             )
         mock_add_cloud.assert_called_once()
+        mock_get_k8s_cloud_credential.assert_called_once()
 
-    def test_add_k8s_missing_name(self, mock_add_cloud):
+    def test_add_k8s_missing_name(self, mock_add_cloud, mock_get_k8s_cloud_credential):
         with self.assertRaises(Exception):
             self.loop.run_until_complete(
-                self.libjuju.add_k8s("", self.auth_data, "storage_class")
+                self.libjuju.add_k8s("", self.configuration, "storage_class")
             )
         mock_add_cloud.assert_not_called()
 
-    def test_add_k8s_missing_storage_name(self, mock_add_cloud):
+    def test_add_k8s_missing_storage_name(
+        self, mock_add_cloud, mock_get_k8s_cloud_credential
+    ):
         with self.assertRaises(Exception):
             self.loop.run_until_complete(
-                self.libjuju.add_k8s("cloud", self.auth_data, "")
+                self.libjuju.add_k8s("cloud", self.configuration, "")
             )
         mock_add_cloud.assert_not_called()
 
-    def test_add_k8s_missing_auth_data_keys(self, mock_add_cloud):
+    def test_add_k8s_missing_configuration_keys(
+        self, mock_add_cloud, mock_get_k8s_cloud_credential
+    ):
         with self.assertRaises(Exception):
-            self.loop.run_until_complete(self.libjuju.add_k8s("cloud", {}, ""))
+            self.loop.run_until_complete(self.libjuju.add_k8s("cloud", None, ""))
         mock_add_cloud.assert_not_called()
 
 
@@ -1267,3 +1274,171 @@ class RemoveCloudTest(LibjujuTestCase):
             self.loop.run_until_complete(self.libjuju.remove_cloud("cloud"))
         mock_remove_cloud.assert_called_once_with("cloud")
         mock_disconnect_controller.assert_called_once()
+
+
+@asynctest.mock.patch("kubernetes.client.configuration.Configuration")
+class GetK8sCloudCredentials(LibjujuTestCase):
+    def setUp(self):
+        super(GetK8sCloudCredentials, self).setUp()
+
+    @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration")
+    def test_not_supported(self, mock_exception, mock_configuration):
+        mock_configuration.username = ""
+        mock_configuration.password = ""
+        mock_configuration.ssl_ca_cert = None
+        mock_configuration.cert_file = None
+        mock_configuration.key_file = None
+        exception_raised = False
+        try:
+            _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+        except JujuInvalidK8sConfiguration as e:
+            exception_raised = True
+            self.assertEqual(
+                e.message, "authentication method not supported",
+            )
+        self.assertTrue(exception_raised)
+
+    def test_user_pass(self, mock_configuration):
+        mock_configuration.username = "admin"
+        mock_configuration.password = "admin"
+        mock_configuration.ssl_ca_cert = None
+        mock_configuration.cert_file = None
+        mock_configuration.key_file = None
+        credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+        self.assertEqual(
+            credential,
+            juju.client._definitions.CloudCredential(
+                attrs={"username": "admin", "password": "admin"}, auth_type="userpass"
+            ),
+        )
+
+    def test_user_no_pass(self, mock_configuration):
+        mock_configuration.username = "admin"
+        mock_configuration.password = ""
+        mock_configuration.ssl_ca_cert = None
+        mock_configuration.cert_file = None
+        mock_configuration.key_file = None
+        with mock.patch.object(self.libjuju.log, "debug") as mock_debug:
+            credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+            self.assertEqual(
+                credential,
+                juju.client._definitions.CloudCredential(
+                    attrs={"username": "admin", "password": ""}, auth_type="userpass"
+                ),
+            )
+            mock_debug.assert_called_once_with(
+                "credential for user admin has empty password"
+            )
+
+    def test_user_pass_with_cert(self, mock_configuration):
+        mock_configuration.username = "admin"
+        mock_configuration.password = "admin"
+        ssl_ca_cert = tempfile.NamedTemporaryFile()
+        with open(ssl_ca_cert.name, "w") as ssl_ca_cert_file:
+            ssl_ca_cert_file.write("cacert")
+        mock_configuration.ssl_ca_cert = ssl_ca_cert.name
+        mock_configuration.cert_file = None
+        mock_configuration.key_file = None
+        credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+        self.assertEqual(
+            credential,
+            juju.client._definitions.CloudCredential(
+                attrs={
+                    "username": "admin",
+                    "password": "admin",
+                    "ClientCertificateData": "cacert",
+                },
+                auth_type="userpasswithcert",
+            ),
+        )
+
+    def test_cert(self, mock_configuration):
+        mock_configuration.username = ""
+        mock_configuration.password = ""
+        mock_configuration.api_key = {"authorization": "Bearer Token"}
+        ssl_ca_cert = tempfile.NamedTemporaryFile()
+        with open(ssl_ca_cert.name, "w") as ssl_ca_cert_file:
+            ssl_ca_cert_file.write("cacert")
+        mock_configuration.ssl_ca_cert = ssl_ca_cert.name
+        mock_configuration.cert_file = None
+        mock_configuration.key_file = None
+        credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+        self.assertEqual(
+            credential,
+            juju.client._definitions.CloudCredential(
+                attrs={"ClientCertificateData": "cacert", "Token": "Token"},
+                auth_type="certificate",
+            ),
+        )
+
+    def test_oauth2(self, mock_configuration):
+        mock_configuration.username = ""
+        mock_configuration.password = ""
+        mock_configuration.api_key = {"authorization": "Bearer Token"}
+        key = tempfile.NamedTemporaryFile()
+        with open(key.name, "w") as key_file:
+            key_file.write("key")
+        mock_configuration.ssl_ca_cert = None
+        mock_configuration.cert_file = None
+        mock_configuration.key_file = key.name
+        credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+        self.assertEqual(
+            credential,
+            juju.client._definitions.CloudCredential(
+                attrs={"ClientKeyData": "key", "Token": "Token"}, auth_type="oauth2",
+            ),
+        )
+
+    @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration")
+    def test_oauth2_missing_token(self, mock_exception, mock_configuration):
+        mock_configuration.username = ""
+        mock_configuration.password = ""
+        key = tempfile.NamedTemporaryFile()
+        with open(key.name, "w") as key_file:
+            key_file.write("key")
+        mock_configuration.ssl_ca_cert = None
+        mock_configuration.cert_file = None
+        mock_configuration.key_file = key.name
+        exception_raised = False
+        try:
+            _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+        except JujuInvalidK8sConfiguration as e:
+            exception_raised = True
+            self.assertEqual(
+                e.message, "missing token for auth type oauth2",
+            )
+        self.assertTrue(exception_raised)
+
+    def test_unknown_api_key(self, mock_configuration):
+        mock_configuration.username = ""
+        mock_configuration.password = ""
+        mock_configuration.api_key = {"authorization": "Bearer Token Wrong"}
+        mock_configuration.ssl_ca_cert = None
+        mock_configuration.cert_file = None
+        mock_configuration.key_file = None
+        exception_raised = False
+        try:
+            _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+        except JujuInvalidK8sConfiguration as e:
+            exception_raised = True
+            self.assertEqual(
+                e.message, "unknown format of api_key",
+            )
+        self.assertTrue(exception_raised)
+
+    def test_exception_cannot_set_token_and_userpass(self, mock_configuration):
+        mock_configuration.username = "admin"
+        mock_configuration.password = "pass"
+        mock_configuration.api_key = {"authorization": "No_bearer_token"}
+        mock_configuration.ssl_ca_cert = None
+        mock_configuration.cert_file = None
+        mock_configuration.key_file = None
+        exception_raised = False
+        try:
+            _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+        except JujuInvalidK8sConfiguration as e:
+            exception_raised = True
+            self.assertEqual(
+                e.message, "Cannot set both token and user/pass",
+            )
+        self.assertTrue(exception_raised)