import asyncio
import asynctest
+import tempfile
+from unittest import mock
import juju
+import kubernetes
from juju.errors import JujuAPIError
import logging
from .utils import FakeN2VC, FakeMachine, FakeApplication
JujuApplicationNotFound,
JujuActionNotFound,
JujuApplicationExists,
+ JujuInvalidK8sConfiguration,
)
mock_disconnect_controller.assert_called_once()
+@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_k8s_cloud_credential")
@asynctest.mock.patch("n2vc.libjuju.Libjuju.add_cloud")
class AddK8sTest(LibjujuTestCase):
def setUp(self):
super(AddK8sTest, self).setUp()
- self.auth_data = {
- "server": "https://192.168.0.21:16443",
- "token": "1234",
- "cacert": "cacert",
- }
+ self.configuration = kubernetes.client.configuration.Configuration()
- def test_add_k8s(self, mock_add_cloud):
+ def test_add_k8s(self, mock_add_cloud, mock_get_k8s_cloud_credential):
self.loop.run_until_complete(
- self.libjuju.add_k8s("cloud", self.auth_data, "storage_class")
+ self.libjuju.add_k8s("cloud", self.configuration, "storage_class")
)
mock_add_cloud.assert_called_once()
+ mock_get_k8s_cloud_credential.assert_called_once()
- def test_add_k8s_exception(self, mock_add_cloud):
+ def test_add_k8s_exception(self, mock_add_cloud, mock_get_k8s_cloud_credential):
mock_add_cloud.side_effect = Exception()
with self.assertRaises(Exception):
self.loop.run_until_complete(
- self.libjuju.add_k8s("cloud", self.auth_data, "storage_class")
+ self.libjuju.add_k8s("cloud", self.configuration, "storage_class")
)
mock_add_cloud.assert_called_once()
+ mock_get_k8s_cloud_credential.assert_called_once()
- def test_add_k8s_missing_name(self, mock_add_cloud):
+ def test_add_k8s_missing_name(self, mock_add_cloud, mock_get_k8s_cloud_credential):
with self.assertRaises(Exception):
self.loop.run_until_complete(
- self.libjuju.add_k8s("", self.auth_data, "storage_class")
+ self.libjuju.add_k8s("", self.configuration, "storage_class")
)
mock_add_cloud.assert_not_called()
- def test_add_k8s_missing_storage_name(self, mock_add_cloud):
+ def test_add_k8s_missing_storage_name(
+ self, mock_add_cloud, mock_get_k8s_cloud_credential
+ ):
with self.assertRaises(Exception):
self.loop.run_until_complete(
- self.libjuju.add_k8s("cloud", self.auth_data, "")
+ self.libjuju.add_k8s("cloud", self.configuration, "")
)
mock_add_cloud.assert_not_called()
- def test_add_k8s_missing_auth_data_keys(self, mock_add_cloud):
+ def test_add_k8s_missing_configuration_keys(
+ self, mock_add_cloud, mock_get_k8s_cloud_credential
+ ):
with self.assertRaises(Exception):
- self.loop.run_until_complete(self.libjuju.add_k8s("cloud", {}, ""))
+ self.loop.run_until_complete(self.libjuju.add_k8s("cloud", None, ""))
mock_add_cloud.assert_not_called()
self.loop.run_until_complete(self.libjuju.remove_cloud("cloud"))
mock_remove_cloud.assert_called_once_with("cloud")
mock_disconnect_controller.assert_called_once()
+
+
+@asynctest.mock.patch("kubernetes.client.configuration.Configuration")
+class GetK8sCloudCredentials(LibjujuTestCase):
+ def setUp(self):
+ super(GetK8sCloudCredentials, self).setUp()
+
+ @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration")
+ def test_not_supported(self, mock_exception, mock_configuration):
+ mock_configuration.username = ""
+ mock_configuration.password = ""
+ mock_configuration.ssl_ca_cert = None
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = None
+ exception_raised = False
+ try:
+ _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ except JujuInvalidK8sConfiguration as e:
+ exception_raised = True
+ self.assertEqual(
+ e.message, "authentication method not supported",
+ )
+ self.assertTrue(exception_raised)
+
+ def test_user_pass(self, mock_configuration):
+ mock_configuration.username = "admin"
+ mock_configuration.password = "admin"
+ mock_configuration.ssl_ca_cert = None
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = None
+ credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ self.assertEqual(
+ credential,
+ juju.client._definitions.CloudCredential(
+ attrs={"username": "admin", "password": "admin"}, auth_type="userpass"
+ ),
+ )
+
+ def test_user_no_pass(self, mock_configuration):
+ mock_configuration.username = "admin"
+ mock_configuration.password = ""
+ mock_configuration.ssl_ca_cert = None
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = None
+ with mock.patch.object(self.libjuju.log, "debug") as mock_debug:
+ credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ self.assertEqual(
+ credential,
+ juju.client._definitions.CloudCredential(
+ attrs={"username": "admin", "password": ""}, auth_type="userpass"
+ ),
+ )
+ mock_debug.assert_called_once_with(
+ "credential for user admin has empty password"
+ )
+
+ def test_user_pass_with_cert(self, mock_configuration):
+ mock_configuration.username = "admin"
+ mock_configuration.password = "admin"
+ ssl_ca_cert = tempfile.NamedTemporaryFile()
+ with open(ssl_ca_cert.name, "w") as ssl_ca_cert_file:
+ ssl_ca_cert_file.write("cacert")
+ mock_configuration.ssl_ca_cert = ssl_ca_cert.name
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = None
+ credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ self.assertEqual(
+ credential,
+ juju.client._definitions.CloudCredential(
+ attrs={
+ "username": "admin",
+ "password": "admin",
+ "ClientCertificateData": "cacert",
+ },
+ auth_type="userpasswithcert",
+ ),
+ )
+
+ def test_cert(self, mock_configuration):
+ mock_configuration.username = ""
+ mock_configuration.password = ""
+ mock_configuration.api_key = {"authorization": "Bearer Token"}
+ ssl_ca_cert = tempfile.NamedTemporaryFile()
+ with open(ssl_ca_cert.name, "w") as ssl_ca_cert_file:
+ ssl_ca_cert_file.write("cacert")
+ mock_configuration.ssl_ca_cert = ssl_ca_cert.name
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = None
+ credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ self.assertEqual(
+ credential,
+ juju.client._definitions.CloudCredential(
+ attrs={"ClientCertificateData": "cacert", "Token": "Token"},
+ auth_type="certificate",
+ ),
+ )
+
+ def test_oauth2(self, mock_configuration):
+ mock_configuration.username = ""
+ mock_configuration.password = ""
+ mock_configuration.api_key = {"authorization": "Bearer Token"}
+ key = tempfile.NamedTemporaryFile()
+ with open(key.name, "w") as key_file:
+ key_file.write("key")
+ mock_configuration.ssl_ca_cert = None
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = key.name
+ credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ self.assertEqual(
+ credential,
+ juju.client._definitions.CloudCredential(
+ attrs={"ClientKeyData": "key", "Token": "Token"}, auth_type="oauth2",
+ ),
+ )
+
+ @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration")
+ def test_oauth2_missing_token(self, mock_exception, mock_configuration):
+ mock_configuration.username = ""
+ mock_configuration.password = ""
+ key = tempfile.NamedTemporaryFile()
+ with open(key.name, "w") as key_file:
+ key_file.write("key")
+ mock_configuration.ssl_ca_cert = None
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = key.name
+ exception_raised = False
+ try:
+ _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ except JujuInvalidK8sConfiguration as e:
+ exception_raised = True
+ self.assertEqual(
+ e.message, "missing token for auth type oauth2",
+ )
+ self.assertTrue(exception_raised)
+
+ def test_unknown_api_key(self, mock_configuration):
+ mock_configuration.username = ""
+ mock_configuration.password = ""
+ mock_configuration.api_key = {"authorization": "Bearer Token Wrong"}
+ mock_configuration.ssl_ca_cert = None
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = None
+ exception_raised = False
+ try:
+ _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ except JujuInvalidK8sConfiguration as e:
+ exception_raised = True
+ self.assertEqual(
+ e.message, "unknown format of api_key",
+ )
+ self.assertTrue(exception_raised)
+
+ def test_exception_cannot_set_token_and_userpass(self, mock_configuration):
+ mock_configuration.username = "admin"
+ mock_configuration.password = "pass"
+ mock_configuration.api_key = {"authorization": "No_bearer_token"}
+ mock_configuration.ssl_ca_cert = None
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = None
+ exception_raised = False
+ try:
+ _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ except JujuInvalidK8sConfiguration as e:
+ exception_raised = True
+ self.assertEqual(
+ e.message, "Cannot set both token and user/pass",
+ )
+ self.assertTrue(exception_raised)