X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Ftests%2Funit%2Ftest_libjuju.py;h=b7c7901525e39b260a4d4f4cbaf5bb6a7bb3ed0d;hp=7bea6b3db7e7a4f049a45c030219ca0bf11163fa;hb=refs%2Fchanges%2F47%2F10047%2F13;hpb=1c83f2e4d061ad37ba898e114cb42e70fdee5145 diff --git a/n2vc/tests/unit/test_libjuju.py b/n2vc/tests/unit/test_libjuju.py index 7bea6b3..b7c7901 100644 --- a/n2vc/tests/unit/test_libjuju.py +++ b/n2vc/tests/unit/test_libjuju.py @@ -32,6 +32,7 @@ from n2vc.exceptions import ( JujuInvalidK8sConfiguration, JujuLeaderUnitNotFound, ) +from n2vc.k8s_juju_conn import generate_rbac_id class LibjujuTestCase(asynctest.TestCase): @@ -1431,45 +1432,56 @@ class ConsumeTest(LibjujuTestCase): class AddK8sTest(LibjujuTestCase): def setUp(self): super(AddK8sTest, self).setUp() - self.configuration = kubernetes.client.configuration.Configuration() + name = "cloud" + rbac_id = generate_rbac_id() + token = "token" + client_cert_data = "cert" + configuration = kubernetes.client.configuration.Configuration() + storage_class = "storage_class" + credential_name = name + + self._add_k8s_args = { + "name": name, + "rbac_id": rbac_id, + "token": token, + "client_cert_data": client_cert_data, + "configuration": configuration, + "storage_class": storage_class, + "credential_name": credential_name, + } def test_add_k8s(self, mock_add_cloud, mock_get_k8s_cloud_credential): - self.loop.run_until_complete( - self.libjuju.add_k8s("cloud", self.configuration, "storage_class") - ) + self.loop.run_until_complete(self.libjuju.add_k8s(**self._add_k8s_args)) mock_add_cloud.assert_called_once() mock_get_k8s_cloud_credential.assert_called_once() def test_add_k8s_exception(self, mock_add_cloud, mock_get_k8s_cloud_credential): mock_add_cloud.side_effect = Exception() with self.assertRaises(Exception): - self.loop.run_until_complete( - self.libjuju.add_k8s("cloud", self.configuration, "storage_class") - ) + self.loop.run_until_complete(self.libjuju.add_k8s(**self._add_k8s_args)) mock_add_cloud.assert_called_once() mock_get_k8s_cloud_credential.assert_called_once() def test_add_k8s_missing_name(self, mock_add_cloud, mock_get_k8s_cloud_credential): + self._add_k8s_args["name"] = "" with self.assertRaises(Exception): - self.loop.run_until_complete( - self.libjuju.add_k8s("", self.configuration, "storage_class") - ) + self.loop.run_until_complete(self.libjuju.add_k8s(**self._add_k8s_args)) mock_add_cloud.assert_not_called() def test_add_k8s_missing_storage_name( self, mock_add_cloud, mock_get_k8s_cloud_credential ): + self._add_k8s_args["storage_class"] = "" with self.assertRaises(Exception): - self.loop.run_until_complete( - self.libjuju.add_k8s("cloud", self.configuration, "") - ) + self.loop.run_until_complete(self.libjuju.add_k8s(**self._add_k8s_args)) mock_add_cloud.assert_not_called() def test_add_k8s_missing_configuration_keys( self, mock_add_cloud, mock_get_k8s_cloud_credential ): + self._add_k8s_args["configuration"] = None with self.assertRaises(Exception): - self.loop.run_until_complete(self.libjuju.add_k8s("cloud", None, "")) + self.loop.run_until_complete(self.libjuju.add_k8s(**self._add_k8s_args)) mock_add_cloud.assert_not_called() @@ -1596,6 +1608,8 @@ class RemoveCloudTest(LibjujuTestCase): class GetK8sCloudCredentials(LibjujuTestCase): def setUp(self): super(GetK8sCloudCredentials, self).setUp() + self.cert_data = "cert" + self.token = "token" @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration") def test_not_supported(self, mock_exception, mock_configuration): @@ -1605,8 +1619,14 @@ class GetK8sCloudCredentials(LibjujuTestCase): mock_configuration.cert_file = None mock_configuration.key_file = None exception_raised = False + self.token = None + self.cert_data = None try: - _ = self.libjuju.get_k8s_cloud_credential(mock_configuration) + _ = self.libjuju.get_k8s_cloud_credential( + mock_configuration, + self.cert_data, + self.token, + ) except JujuInvalidK8sConfiguration as e: exception_raised = True self.assertEqual( @@ -1621,7 +1641,13 @@ class GetK8sCloudCredentials(LibjujuTestCase): mock_configuration.ssl_ca_cert = None mock_configuration.cert_file = None mock_configuration.key_file = None - credential = self.libjuju.get_k8s_cloud_credential(mock_configuration) + self.token = None + self.cert_data = None + credential = self.libjuju.get_k8s_cloud_credential( + mock_configuration, + self.cert_data, + self.token, + ) self.assertEqual( credential, juju.client._definitions.CloudCredential( @@ -1629,14 +1655,44 @@ class GetK8sCloudCredentials(LibjujuTestCase): ), ) + def test_user_pass_with_cert(self, mock_configuration): + mock_configuration.username = "admin" + mock_configuration.password = "admin" + mock_configuration.ssl_ca_cert = None + mock_configuration.cert_file = None + mock_configuration.key_file = None + self.token = None + credential = self.libjuju.get_k8s_cloud_credential( + mock_configuration, + self.cert_data, + self.token, + ) + self.assertEqual( + credential, + juju.client._definitions.CloudCredential( + attrs={ + "ClientCertificateData": self.cert_data, + "username": "admin", + "password": "admin", + }, + auth_type="userpasswithcert", + ), + ) + def test_user_no_pass(self, mock_configuration): mock_configuration.username = "admin" mock_configuration.password = "" mock_configuration.ssl_ca_cert = None mock_configuration.cert_file = None mock_configuration.key_file = None + self.token = None + self.cert_data = None with mock.patch.object(self.libjuju.log, "debug") as mock_debug: - credential = self.libjuju.get_k8s_cloud_credential(mock_configuration) + credential = self.libjuju.get_k8s_cloud_credential( + mock_configuration, + self.cert_data, + self.token, + ) self.assertEqual( credential, juju.client._definitions.CloudCredential( @@ -1647,28 +1703,6 @@ class GetK8sCloudCredentials(LibjujuTestCase): "credential for user admin has empty password" ) - def test_user_pass_with_cert(self, mock_configuration): - mock_configuration.username = "admin" - mock_configuration.password = "admin" - ssl_ca_cert = tempfile.NamedTemporaryFile() - with open(ssl_ca_cert.name, "w") as ssl_ca_cert_file: - ssl_ca_cert_file.write("cacert") - mock_configuration.ssl_ca_cert = ssl_ca_cert.name - mock_configuration.cert_file = None - mock_configuration.key_file = None - credential = self.libjuju.get_k8s_cloud_credential(mock_configuration) - self.assertEqual( - credential, - juju.client._definitions.CloudCredential( - attrs={ - "username": "admin", - "password": "admin", - "ClientCertificateData": "cacert", - }, - auth_type="userpasswithcert", - ), - ) - def test_cert(self, mock_configuration): mock_configuration.username = "" mock_configuration.password = "" @@ -1679,72 +1713,67 @@ class GetK8sCloudCredentials(LibjujuTestCase): mock_configuration.ssl_ca_cert = ssl_ca_cert.name mock_configuration.cert_file = None mock_configuration.key_file = None - credential = self.libjuju.get_k8s_cloud_credential(mock_configuration) - self.assertEqual( - credential, - juju.client._definitions.CloudCredential( - attrs={"ClientCertificateData": "cacert", "Token": "Token"}, - auth_type="certificate", - ), + credential = self.libjuju.get_k8s_cloud_credential( + mock_configuration, + self.cert_data, + self.token, ) - - def test_oauth2(self, mock_configuration): - mock_configuration.username = "" - mock_configuration.password = "" - mock_configuration.api_key = {"authorization": "Bearer Token"} - key = tempfile.NamedTemporaryFile() - with open(key.name, "w") as key_file: - key_file.write("key") - mock_configuration.ssl_ca_cert = None - mock_configuration.cert_file = None - mock_configuration.key_file = key.name - credential = self.libjuju.get_k8s_cloud_credential(mock_configuration) self.assertEqual( credential, juju.client._definitions.CloudCredential( - attrs={"ClientKeyData": "key", "Token": "Token"}, - auth_type="oauth2", + attrs={"ClientCertificateData": self.cert_data, "Token": self.token}, + auth_type="certificate", ), ) - @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration") - def test_oauth2_missing_token(self, mock_exception, mock_configuration): - mock_configuration.username = "" - mock_configuration.password = "" - key = tempfile.NamedTemporaryFile() - with open(key.name, "w") as key_file: - key_file.write("key") - mock_configuration.ssl_ca_cert = None - mock_configuration.cert_file = None - mock_configuration.key_file = key.name - exception_raised = False - try: - _ = self.libjuju.get_k8s_cloud_credential(mock_configuration) - except JujuInvalidK8sConfiguration as e: - exception_raised = True - self.assertEqual( - e.message, - "missing token for auth type oauth2", - ) - self.assertTrue(exception_raised) - - def test_unknown_api_key(self, mock_configuration): - mock_configuration.username = "" - mock_configuration.password = "" - mock_configuration.api_key = {"authorization": "Bearer Token Wrong"} - mock_configuration.ssl_ca_cert = None - mock_configuration.cert_file = None - mock_configuration.key_file = None - exception_raised = False - try: - _ = self.libjuju.get_k8s_cloud_credential(mock_configuration) - except JujuInvalidK8sConfiguration as e: - exception_raised = True - self.assertEqual( - e.message, - "unknown format of api_key", - ) - self.assertTrue(exception_raised) + # TODO: Fix this test when oauth authentication is supported + # def test_oauth2(self, mock_configuration): + # mock_configuration.username = "" + # mock_configuration.password = "" + # mock_configuration.api_key = {"authorization": "Bearer Token"} + # key = tempfile.NamedTemporaryFile() + # with open(key.name, "w") as key_file: + # key_file.write("key") + # mock_configuration.ssl_ca_cert = None + # mock_configuration.cert_file = None + # mock_configuration.key_file = key.name + # credential = self.libjuju.get_k8s_cloud_credential( + # mock_configuration, + # self.cert_data, + # self.token, + # ) + # self.assertEqual( + # credential, + # juju.client._definitions.CloudCredential( + # attrs={"ClientKeyData": "key", "Token": "Token"}, + # auth_type="oauth2", + # ), + # ) + + # @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration") + # def test_oauth2_missing_token(self, mock_exception, mock_configuration): + # mock_configuration.username = "" + # mock_configuration.password = "" + # key = tempfile.NamedTemporaryFile() + # with open(key.name, "w") as key_file: + # key_file.write("key") + # mock_configuration.ssl_ca_cert = None + # mock_configuration.cert_file = None + # mock_configuration.key_file = key.name + # exception_raised = False + # try: + # _ = self.libjuju.get_k8s_cloud_credential( + # mock_configuration, + # self.cert_data, + # self.token, + # ) + # except JujuInvalidK8sConfiguration as e: + # exception_raised = True + # self.assertEqual( + # e.message, + # "missing token for auth type oauth2", + # ) + # self.assertTrue(exception_raised) def test_exception_cannot_set_token_and_userpass(self, mock_configuration): mock_configuration.username = "admin" @@ -1755,7 +1784,11 @@ class GetK8sCloudCredentials(LibjujuTestCase): mock_configuration.key_file = None exception_raised = False try: - _ = self.libjuju.get_k8s_cloud_credential(mock_configuration) + _ = self.libjuju.get_k8s_cloud_credential( + mock_configuration, + self.cert_data, + self.token, + ) except JujuInvalidK8sConfiguration as e: exception_raised = True self.assertEqual(