X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=n2vc%2Ftests%2Funit%2Ftest_libjuju.py;h=123da4a03692a5ffe0efab1e28e817c201efce3e;hb=59f520da90fb12b9d9871889dfbc5d57aa14c591;hp=57a6ccc69bc6fd21d00a6317b6349f490a76f6b1;hpb=85755d17a807df83d0e472e6e73500a4d743296b;p=osm%2FN2VC.git diff --git a/n2vc/tests/unit/test_libjuju.py b/n2vc/tests/unit/test_libjuju.py index 57a6ccc..123da4a 100644 --- a/n2vc/tests/unit/test_libjuju.py +++ b/n2vc/tests/unit/test_libjuju.py @@ -14,7 +14,10 @@ import asyncio import asynctest +import tempfile +from unittest import mock import juju +import kubernetes from juju.errors import JujuAPIError import logging from .utils import FakeN2VC, FakeMachine, FakeApplication @@ -26,6 +29,8 @@ from n2vc.exceptions import ( JujuApplicationNotFound, JujuActionNotFound, JujuApplicationExists, + JujuInvalidK8sConfiguration, + JujuLeaderUnitNotFound, ) @@ -575,7 +580,35 @@ class ExecuteActionTest(LibjujuTestCase): mock_disconnect_controller.assert_called() mock_disconnect_model.assert_called() - # TODO no leader unit found exception + @asynctest.mock.patch("asyncio.sleep") + @asynctest.mock.patch("n2vc.tests.unit.utils.FakeUnit.is_leader_from_status") + def test_no_leader( + self, + mock_is_leader_from_status, + mock_sleep, + mock_get_action_status, + mock_get_action_output, + mock_wait_for, + mock__get_application, + mock_disconnect_controller, + mock_disconnect_model, + mock_get_model, + mock_get_controller, + ): + mock_get_model.return_value = juju.model.Model() + mock__get_application.return_value = FakeApplication() + mock_is_leader_from_status.return_value = False + output = None + status = None + with self.assertRaises(JujuLeaderUnitNotFound): + output, status = self.loop.run_until_complete( + self.libjuju.execute_action("app", "model", "action",) + ) + self.assertIsNone(output) + self.assertIsNone(status) + + mock_disconnect_controller.assert_called() + mock_disconnect_model.assert_called() def test_succesful_exec( self, @@ -1112,47 +1145,50 @@ class ConsumeTest(LibjujuTestCase): mock_disconnect_controller.assert_called_once() +@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_k8s_cloud_credential") @asynctest.mock.patch("n2vc.libjuju.Libjuju.add_cloud") class AddK8sTest(LibjujuTestCase): def setUp(self): super(AddK8sTest, self).setUp() - self.auth_data = { - "server": "https://192.168.0.21:16443", - "token": "1234", - "cacert": "cacert", - } + self.configuration = kubernetes.client.configuration.Configuration() - def test_add_k8s(self, mock_add_cloud): + def test_add_k8s(self, mock_add_cloud, mock_get_k8s_cloud_credential): self.loop.run_until_complete( - self.libjuju.add_k8s("cloud", self.auth_data, "storage_class") + self.libjuju.add_k8s("cloud", self.configuration, "storage_class") ) mock_add_cloud.assert_called_once() + mock_get_k8s_cloud_credential.assert_called_once() - def test_add_k8s_exception(self, mock_add_cloud): + def test_add_k8s_exception(self, mock_add_cloud, mock_get_k8s_cloud_credential): mock_add_cloud.side_effect = Exception() with self.assertRaises(Exception): self.loop.run_until_complete( - self.libjuju.add_k8s("cloud", self.auth_data, "storage_class") + self.libjuju.add_k8s("cloud", self.configuration, "storage_class") ) mock_add_cloud.assert_called_once() + mock_get_k8s_cloud_credential.assert_called_once() - def test_add_k8s_missing_name(self, mock_add_cloud): + def test_add_k8s_missing_name(self, mock_add_cloud, mock_get_k8s_cloud_credential): with self.assertRaises(Exception): self.loop.run_until_complete( - self.libjuju.add_k8s("", self.auth_data, "storage_class") + self.libjuju.add_k8s("", self.configuration, "storage_class") ) mock_add_cloud.assert_not_called() - def test_add_k8s_missing_storage_name(self, mock_add_cloud): + def test_add_k8s_missing_storage_name( + self, mock_add_cloud, mock_get_k8s_cloud_credential + ): with self.assertRaises(Exception): self.loop.run_until_complete( - self.libjuju.add_k8s("cloud", self.auth_data, "") + self.libjuju.add_k8s("cloud", self.configuration, "") ) mock_add_cloud.assert_not_called() - def test_add_k8s_missing_auth_data_keys(self, mock_add_cloud): + def test_add_k8s_missing_configuration_keys( + self, mock_add_cloud, mock_get_k8s_cloud_credential + ): with self.assertRaises(Exception): - self.loop.run_until_complete(self.libjuju.add_k8s("cloud", {}, "")) + self.loop.run_until_complete(self.libjuju.add_k8s("cloud", None, "")) mock_add_cloud.assert_not_called() @@ -1267,3 +1303,171 @@ class RemoveCloudTest(LibjujuTestCase): self.loop.run_until_complete(self.libjuju.remove_cloud("cloud")) mock_remove_cloud.assert_called_once_with("cloud") mock_disconnect_controller.assert_called_once() + + +@asynctest.mock.patch("kubernetes.client.configuration.Configuration") +class GetK8sCloudCredentials(LibjujuTestCase): + def setUp(self): + super(GetK8sCloudCredentials, self).setUp() + + @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration") + def test_not_supported(self, mock_exception, mock_configuration): + mock_configuration.username = "" + mock_configuration.password = "" + mock_configuration.ssl_ca_cert = None + mock_configuration.cert_file = None + mock_configuration.key_file = None + exception_raised = False + try: + _ = self.libjuju.get_k8s_cloud_credential(mock_configuration) + except JujuInvalidK8sConfiguration as e: + exception_raised = True + self.assertEqual( + e.message, "authentication method not supported", + ) + self.assertTrue(exception_raised) + + def test_user_pass(self, mock_configuration): + mock_configuration.username = "admin" + mock_configuration.password = "admin" + mock_configuration.ssl_ca_cert = None + mock_configuration.cert_file = None + mock_configuration.key_file = None + credential = self.libjuju.get_k8s_cloud_credential(mock_configuration) + self.assertEqual( + credential, + juju.client._definitions.CloudCredential( + attrs={"username": "admin", "password": "admin"}, auth_type="userpass" + ), + ) + + def test_user_no_pass(self, mock_configuration): + mock_configuration.username = "admin" + mock_configuration.password = "" + mock_configuration.ssl_ca_cert = None + mock_configuration.cert_file = None + mock_configuration.key_file = None + with mock.patch.object(self.libjuju.log, "debug") as mock_debug: + credential = self.libjuju.get_k8s_cloud_credential(mock_configuration) + self.assertEqual( + credential, + juju.client._definitions.CloudCredential( + attrs={"username": "admin", "password": ""}, auth_type="userpass" + ), + ) + mock_debug.assert_called_once_with( + "credential for user admin has empty password" + ) + + def test_user_pass_with_cert(self, mock_configuration): + mock_configuration.username = "admin" + mock_configuration.password = "admin" + ssl_ca_cert = tempfile.NamedTemporaryFile() + with open(ssl_ca_cert.name, "w") as ssl_ca_cert_file: + ssl_ca_cert_file.write("cacert") + mock_configuration.ssl_ca_cert = ssl_ca_cert.name + mock_configuration.cert_file = None + mock_configuration.key_file = None + credential = self.libjuju.get_k8s_cloud_credential(mock_configuration) + self.assertEqual( + credential, + juju.client._definitions.CloudCredential( + attrs={ + "username": "admin", + "password": "admin", + "ClientCertificateData": "cacert", + }, + auth_type="userpasswithcert", + ), + ) + + def test_cert(self, mock_configuration): + mock_configuration.username = "" + mock_configuration.password = "" + mock_configuration.api_key = {"authorization": "Bearer Token"} + ssl_ca_cert = tempfile.NamedTemporaryFile() + with open(ssl_ca_cert.name, "w") as ssl_ca_cert_file: + ssl_ca_cert_file.write("cacert") + mock_configuration.ssl_ca_cert = ssl_ca_cert.name + mock_configuration.cert_file = None + mock_configuration.key_file = None + credential = self.libjuju.get_k8s_cloud_credential(mock_configuration) + self.assertEqual( + credential, + juju.client._definitions.CloudCredential( + attrs={"ClientCertificateData": "cacert", "Token": "Token"}, + auth_type="certificate", + ), + ) + + def test_oauth2(self, mock_configuration): + mock_configuration.username = "" + mock_configuration.password = "" + mock_configuration.api_key = {"authorization": "Bearer Token"} + key = tempfile.NamedTemporaryFile() + with open(key.name, "w") as key_file: + key_file.write("key") + mock_configuration.ssl_ca_cert = None + mock_configuration.cert_file = None + mock_configuration.key_file = key.name + credential = self.libjuju.get_k8s_cloud_credential(mock_configuration) + self.assertEqual( + credential, + juju.client._definitions.CloudCredential( + attrs={"ClientKeyData": "key", "Token": "Token"}, auth_type="oauth2", + ), + ) + + @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration") + def test_oauth2_missing_token(self, mock_exception, mock_configuration): + mock_configuration.username = "" + mock_configuration.password = "" + key = tempfile.NamedTemporaryFile() + with open(key.name, "w") as key_file: + key_file.write("key") + mock_configuration.ssl_ca_cert = None + mock_configuration.cert_file = None + mock_configuration.key_file = key.name + exception_raised = False + try: + _ = self.libjuju.get_k8s_cloud_credential(mock_configuration) + except JujuInvalidK8sConfiguration as e: + exception_raised = True + self.assertEqual( + e.message, "missing token for auth type oauth2", + ) + self.assertTrue(exception_raised) + + def test_unknown_api_key(self, mock_configuration): + mock_configuration.username = "" + mock_configuration.password = "" + mock_configuration.api_key = {"authorization": "Bearer Token Wrong"} + mock_configuration.ssl_ca_cert = None + mock_configuration.cert_file = None + mock_configuration.key_file = None + exception_raised = False + try: + _ = self.libjuju.get_k8s_cloud_credential(mock_configuration) + except JujuInvalidK8sConfiguration as e: + exception_raised = True + self.assertEqual( + e.message, "unknown format of api_key", + ) + self.assertTrue(exception_raised) + + def test_exception_cannot_set_token_and_userpass(self, mock_configuration): + mock_configuration.username = "admin" + mock_configuration.password = "pass" + mock_configuration.api_key = {"authorization": "No_bearer_token"} + mock_configuration.ssl_ca_cert = None + mock_configuration.cert_file = None + mock_configuration.key_file = None + exception_raised = False + try: + _ = self.libjuju.get_k8s_cloud_credential(mock_configuration) + except JujuInvalidK8sConfiguration as e: + exception_raised = True + self.assertEqual( + e.message, "Cannot set both token and user/pass", + ) + self.assertTrue(exception_raised)