X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Ftests%2Funit%2Ftest_libjuju.py;fp=n2vc%2Ftests%2Funit%2Ftest_libjuju.py;h=8c16c9d8b8b63a161c39e8f288dfcf982bf19f3e;hp=57a6ccc69bc6fd21d00a6317b6349f490a76f6b1;hb=475a7221e3598ad1c75ce802c5ad74ef7ecf72f1;hpb=85755d17a807df83d0e472e6e73500a4d743296b diff --git a/n2vc/tests/unit/test_libjuju.py b/n2vc/tests/unit/test_libjuju.py index 57a6ccc..8c16c9d 100644 --- a/n2vc/tests/unit/test_libjuju.py +++ b/n2vc/tests/unit/test_libjuju.py @@ -14,7 +14,10 @@ import asyncio import asynctest +import tempfile +from unittest import mock import juju +import kubernetes from juju.errors import JujuAPIError import logging from .utils import FakeN2VC, FakeMachine, FakeApplication @@ -26,6 +29,7 @@ from n2vc.exceptions import ( JujuApplicationNotFound, JujuActionNotFound, JujuApplicationExists, + JujuInvalidK8sConfiguration, ) @@ -1112,47 +1116,50 @@ class ConsumeTest(LibjujuTestCase): mock_disconnect_controller.assert_called_once() +@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_k8s_cloud_credential") @asynctest.mock.patch("n2vc.libjuju.Libjuju.add_cloud") class AddK8sTest(LibjujuTestCase): def setUp(self): super(AddK8sTest, self).setUp() - self.auth_data = { - "server": "https://192.168.0.21:16443", - "token": "1234", - "cacert": "cacert", - } + self.configuration = kubernetes.client.configuration.Configuration() - def test_add_k8s(self, mock_add_cloud): + def test_add_k8s(self, mock_add_cloud, mock_get_k8s_cloud_credential): self.loop.run_until_complete( - self.libjuju.add_k8s("cloud", self.auth_data, "storage_class") + self.libjuju.add_k8s("cloud", self.configuration, "storage_class") ) mock_add_cloud.assert_called_once() + mock_get_k8s_cloud_credential.assert_called_once() - def test_add_k8s_exception(self, mock_add_cloud): + def test_add_k8s_exception(self, mock_add_cloud, mock_get_k8s_cloud_credential): mock_add_cloud.side_effect = Exception() with self.assertRaises(Exception): self.loop.run_until_complete( - self.libjuju.add_k8s("cloud", self.auth_data, "storage_class") + self.libjuju.add_k8s("cloud", self.configuration, "storage_class") ) mock_add_cloud.assert_called_once() + mock_get_k8s_cloud_credential.assert_called_once() - def test_add_k8s_missing_name(self, mock_add_cloud): + def test_add_k8s_missing_name(self, mock_add_cloud, mock_get_k8s_cloud_credential): with self.assertRaises(Exception): self.loop.run_until_complete( - self.libjuju.add_k8s("", self.auth_data, "storage_class") + self.libjuju.add_k8s("", self.configuration, "storage_class") ) mock_add_cloud.assert_not_called() - def test_add_k8s_missing_storage_name(self, mock_add_cloud): + def test_add_k8s_missing_storage_name( + self, mock_add_cloud, mock_get_k8s_cloud_credential + ): with self.assertRaises(Exception): self.loop.run_until_complete( - self.libjuju.add_k8s("cloud", self.auth_data, "") + self.libjuju.add_k8s("cloud", self.configuration, "") ) mock_add_cloud.assert_not_called() - def test_add_k8s_missing_auth_data_keys(self, mock_add_cloud): + def test_add_k8s_missing_configuration_keys( + self, mock_add_cloud, mock_get_k8s_cloud_credential + ): with self.assertRaises(Exception): - self.loop.run_until_complete(self.libjuju.add_k8s("cloud", {}, "")) + self.loop.run_until_complete(self.libjuju.add_k8s("cloud", None, "")) mock_add_cloud.assert_not_called() @@ -1267,3 +1274,171 @@ class RemoveCloudTest(LibjujuTestCase): self.loop.run_until_complete(self.libjuju.remove_cloud("cloud")) mock_remove_cloud.assert_called_once_with("cloud") mock_disconnect_controller.assert_called_once() + + +@asynctest.mock.patch("kubernetes.client.configuration.Configuration") +class GetK8sCloudCredentials(LibjujuTestCase): + def setUp(self): + super(GetK8sCloudCredentials, self).setUp() + + @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration") + def test_not_supported(self, mock_exception, mock_configuration): + mock_configuration.username = "" + mock_configuration.password = "" + mock_configuration.ssl_ca_cert = None + mock_configuration.cert_file = None + mock_configuration.key_file = None + exception_raised = False + try: + _ = self.libjuju.get_k8s_cloud_credential(mock_configuration) + except JujuInvalidK8sConfiguration as e: + exception_raised = True + self.assertEqual( + e.message, "authentication method not supported", + ) + self.assertTrue(exception_raised) + + def test_user_pass(self, mock_configuration): + mock_configuration.username = "admin" + mock_configuration.password = "admin" + mock_configuration.ssl_ca_cert = None + mock_configuration.cert_file = None + mock_configuration.key_file = None + credential = self.libjuju.get_k8s_cloud_credential(mock_configuration) + self.assertEqual( + credential, + juju.client._definitions.CloudCredential( + attrs={"username": "admin", "password": "admin"}, auth_type="userpass" + ), + ) + + def test_user_no_pass(self, mock_configuration): + mock_configuration.username = "admin" + mock_configuration.password = "" + mock_configuration.ssl_ca_cert = None + mock_configuration.cert_file = None + mock_configuration.key_file = None + with mock.patch.object(self.libjuju.log, "debug") as mock_debug: + credential = self.libjuju.get_k8s_cloud_credential(mock_configuration) + self.assertEqual( + credential, + juju.client._definitions.CloudCredential( + attrs={"username": "admin", "password": ""}, auth_type="userpass" + ), + ) + mock_debug.assert_called_once_with( + "credential for user admin has empty password" + ) + + def test_user_pass_with_cert(self, mock_configuration): + mock_configuration.username = "admin" + mock_configuration.password = "admin" + ssl_ca_cert = tempfile.NamedTemporaryFile() + with open(ssl_ca_cert.name, "w") as ssl_ca_cert_file: + ssl_ca_cert_file.write("cacert") + mock_configuration.ssl_ca_cert = ssl_ca_cert.name + mock_configuration.cert_file = None + mock_configuration.key_file = None + credential = self.libjuju.get_k8s_cloud_credential(mock_configuration) + self.assertEqual( + credential, + juju.client._definitions.CloudCredential( + attrs={ + "username": "admin", + "password": "admin", + "ClientCertificateData": "cacert", + }, + auth_type="userpasswithcert", + ), + ) + + def test_cert(self, mock_configuration): + mock_configuration.username = "" + mock_configuration.password = "" + mock_configuration.api_key = {"authorization": "Bearer Token"} + ssl_ca_cert = tempfile.NamedTemporaryFile() + with open(ssl_ca_cert.name, "w") as ssl_ca_cert_file: + ssl_ca_cert_file.write("cacert") + mock_configuration.ssl_ca_cert = ssl_ca_cert.name + mock_configuration.cert_file = None + mock_configuration.key_file = None + credential = self.libjuju.get_k8s_cloud_credential(mock_configuration) + self.assertEqual( + credential, + juju.client._definitions.CloudCredential( + attrs={"ClientCertificateData": "cacert", "Token": "Token"}, + auth_type="certificate", + ), + ) + + def test_oauth2(self, mock_configuration): + mock_configuration.username = "" + mock_configuration.password = "" + mock_configuration.api_key = {"authorization": "Bearer Token"} + key = tempfile.NamedTemporaryFile() + with open(key.name, "w") as key_file: + key_file.write("key") + mock_configuration.ssl_ca_cert = None + mock_configuration.cert_file = None + mock_configuration.key_file = key.name + credential = self.libjuju.get_k8s_cloud_credential(mock_configuration) + self.assertEqual( + credential, + juju.client._definitions.CloudCredential( + attrs={"ClientKeyData": "key", "Token": "Token"}, auth_type="oauth2", + ), + ) + + @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration") + def test_oauth2_missing_token(self, mock_exception, mock_configuration): + mock_configuration.username = "" + mock_configuration.password = "" + key = tempfile.NamedTemporaryFile() + with open(key.name, "w") as key_file: + key_file.write("key") + mock_configuration.ssl_ca_cert = None + mock_configuration.cert_file = None + mock_configuration.key_file = key.name + exception_raised = False + try: + _ = self.libjuju.get_k8s_cloud_credential(mock_configuration) + except JujuInvalidK8sConfiguration as e: + exception_raised = True + self.assertEqual( + e.message, "missing token for auth type oauth2", + ) + self.assertTrue(exception_raised) + + def test_unknown_api_key(self, mock_configuration): + mock_configuration.username = "" + mock_configuration.password = "" + mock_configuration.api_key = {"authorization": "Bearer Token Wrong"} + mock_configuration.ssl_ca_cert = None + mock_configuration.cert_file = None + mock_configuration.key_file = None + exception_raised = False + try: + _ = self.libjuju.get_k8s_cloud_credential(mock_configuration) + except JujuInvalidK8sConfiguration as e: + exception_raised = True + self.assertEqual( + e.message, "unknown format of api_key", + ) + self.assertTrue(exception_raised) + + def test_exception_cannot_set_token_and_userpass(self, mock_configuration): + mock_configuration.username = "admin" + mock_configuration.password = "pass" + mock_configuration.api_key = {"authorization": "No_bearer_token"} + mock_configuration.ssl_ca_cert = None + mock_configuration.cert_file = None + mock_configuration.key_file = None + exception_raised = False + try: + _ = self.libjuju.get_k8s_cloud_credential(mock_configuration) + except JujuInvalidK8sConfiguration as e: + exception_raised = True + self.assertEqual( + e.message, "Cannot set both token and user/pass", + ) + self.assertTrue(exception_raised)