JujuInvalidK8sConfiguration,
JujuLeaderUnitNotFound,
)
+from n2vc.k8s_juju_conn import generate_rbac_id
class LibjujuTestCase(asynctest.TestCase):
class AddK8sTest(LibjujuTestCase):
def setUp(self):
super(AddK8sTest, self).setUp()
- self.configuration = kubernetes.client.configuration.Configuration()
+ name = "cloud"
+ rbac_id = generate_rbac_id()
+ token = "token"
+ client_cert_data = "cert"
+ configuration = kubernetes.client.configuration.Configuration()
+ storage_class = "storage_class"
+ credential_name = name
+
+ self._add_k8s_args = {
+ "name": name,
+ "rbac_id": rbac_id,
+ "token": token,
+ "client_cert_data": client_cert_data,
+ "configuration": configuration,
+ "storage_class": storage_class,
+ "credential_name": credential_name,
+ }
def test_add_k8s(self, mock_add_cloud, mock_get_k8s_cloud_credential):
- self.loop.run_until_complete(
- self.libjuju.add_k8s("cloud", self.configuration, "storage_class")
- )
+ self.loop.run_until_complete(self.libjuju.add_k8s(**self._add_k8s_args))
mock_add_cloud.assert_called_once()
mock_get_k8s_cloud_credential.assert_called_once()
def test_add_k8s_exception(self, mock_add_cloud, mock_get_k8s_cloud_credential):
mock_add_cloud.side_effect = Exception()
with self.assertRaises(Exception):
- self.loop.run_until_complete(
- self.libjuju.add_k8s("cloud", self.configuration, "storage_class")
- )
+ self.loop.run_until_complete(self.libjuju.add_k8s(**self._add_k8s_args))
mock_add_cloud.assert_called_once()
mock_get_k8s_cloud_credential.assert_called_once()
def test_add_k8s_missing_name(self, mock_add_cloud, mock_get_k8s_cloud_credential):
+ self._add_k8s_args["name"] = ""
with self.assertRaises(Exception):
- self.loop.run_until_complete(
- self.libjuju.add_k8s("", self.configuration, "storage_class")
- )
+ self.loop.run_until_complete(self.libjuju.add_k8s(**self._add_k8s_args))
mock_add_cloud.assert_not_called()
def test_add_k8s_missing_storage_name(
self, mock_add_cloud, mock_get_k8s_cloud_credential
):
+ self._add_k8s_args["storage_class"] = ""
with self.assertRaises(Exception):
- self.loop.run_until_complete(
- self.libjuju.add_k8s("cloud", self.configuration, "")
- )
+ self.loop.run_until_complete(self.libjuju.add_k8s(**self._add_k8s_args))
mock_add_cloud.assert_not_called()
def test_add_k8s_missing_configuration_keys(
self, mock_add_cloud, mock_get_k8s_cloud_credential
):
+ self._add_k8s_args["configuration"] = None
with self.assertRaises(Exception):
- self.loop.run_until_complete(self.libjuju.add_k8s("cloud", None, ""))
+ self.loop.run_until_complete(self.libjuju.add_k8s(**self._add_k8s_args))
mock_add_cloud.assert_not_called()
class GetK8sCloudCredentials(LibjujuTestCase):
def setUp(self):
super(GetK8sCloudCredentials, self).setUp()
+ self.cert_data = "cert"
+ self.token = "token"
@asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration")
def test_not_supported(self, mock_exception, mock_configuration):
mock_configuration.cert_file = None
mock_configuration.key_file = None
exception_raised = False
+ self.token = None
+ self.cert_data = None
try:
- _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ _ = self.libjuju.get_k8s_cloud_credential(
+ mock_configuration,
+ self.cert_data,
+ self.token,
+ )
except JujuInvalidK8sConfiguration as e:
exception_raised = True
self.assertEqual(
mock_configuration.ssl_ca_cert = None
mock_configuration.cert_file = None
mock_configuration.key_file = None
- credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ self.token = None
+ self.cert_data = None
+ credential = self.libjuju.get_k8s_cloud_credential(
+ mock_configuration,
+ self.cert_data,
+ self.token,
+ )
self.assertEqual(
credential,
juju.client._definitions.CloudCredential(
),
)
+ def test_user_pass_with_cert(self, mock_configuration):
+ mock_configuration.username = "admin"
+ mock_configuration.password = "admin"
+ mock_configuration.ssl_ca_cert = None
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = None
+ self.token = None
+ credential = self.libjuju.get_k8s_cloud_credential(
+ mock_configuration,
+ self.cert_data,
+ self.token,
+ )
+ self.assertEqual(
+ credential,
+ juju.client._definitions.CloudCredential(
+ attrs={
+ "ClientCertificateData": self.cert_data,
+ "username": "admin",
+ "password": "admin",
+ },
+ auth_type="userpasswithcert",
+ ),
+ )
+
def test_user_no_pass(self, mock_configuration):
mock_configuration.username = "admin"
mock_configuration.password = ""
mock_configuration.ssl_ca_cert = None
mock_configuration.cert_file = None
mock_configuration.key_file = None
+ self.token = None
+ self.cert_data = None
with mock.patch.object(self.libjuju.log, "debug") as mock_debug:
- credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ credential = self.libjuju.get_k8s_cloud_credential(
+ mock_configuration,
+ self.cert_data,
+ self.token,
+ )
self.assertEqual(
credential,
juju.client._definitions.CloudCredential(
"credential for user admin has empty password"
)
- def test_user_pass_with_cert(self, mock_configuration):
- mock_configuration.username = "admin"
- mock_configuration.password = "admin"
- ssl_ca_cert = tempfile.NamedTemporaryFile()
- with open(ssl_ca_cert.name, "w") as ssl_ca_cert_file:
- ssl_ca_cert_file.write("cacert")
- mock_configuration.ssl_ca_cert = ssl_ca_cert.name
- mock_configuration.cert_file = None
- mock_configuration.key_file = None
- credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
- self.assertEqual(
- credential,
- juju.client._definitions.CloudCredential(
- attrs={
- "username": "admin",
- "password": "admin",
- "ClientCertificateData": "cacert",
- },
- auth_type="userpasswithcert",
- ),
- )
-
def test_cert(self, mock_configuration):
mock_configuration.username = ""
mock_configuration.password = ""
mock_configuration.ssl_ca_cert = ssl_ca_cert.name
mock_configuration.cert_file = None
mock_configuration.key_file = None
- credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
- self.assertEqual(
- credential,
- juju.client._definitions.CloudCredential(
- attrs={"ClientCertificateData": "cacert", "Token": "Token"},
- auth_type="certificate",
- ),
+ credential = self.libjuju.get_k8s_cloud_credential(
+ mock_configuration,
+ self.cert_data,
+ self.token,
)
-
- def test_oauth2(self, mock_configuration):
- mock_configuration.username = ""
- mock_configuration.password = ""
- mock_configuration.api_key = {"authorization": "Bearer Token"}
- key = tempfile.NamedTemporaryFile()
- with open(key.name, "w") as key_file:
- key_file.write("key")
- mock_configuration.ssl_ca_cert = None
- mock_configuration.cert_file = None
- mock_configuration.key_file = key.name
- credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
self.assertEqual(
credential,
juju.client._definitions.CloudCredential(
- attrs={"ClientKeyData": "key", "Token": "Token"},
- auth_type="oauth2",
+ attrs={"ClientCertificateData": self.cert_data, "Token": self.token},
+ auth_type="certificate",
),
)
- @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration")
- def test_oauth2_missing_token(self, mock_exception, mock_configuration):
- mock_configuration.username = ""
- mock_configuration.password = ""
- key = tempfile.NamedTemporaryFile()
- with open(key.name, "w") as key_file:
- key_file.write("key")
- mock_configuration.ssl_ca_cert = None
- mock_configuration.cert_file = None
- mock_configuration.key_file = key.name
- exception_raised = False
- try:
- _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
- except JujuInvalidK8sConfiguration as e:
- exception_raised = True
- self.assertEqual(
- e.message,
- "missing token for auth type oauth2",
- )
- self.assertTrue(exception_raised)
-
- def test_unknown_api_key(self, mock_configuration):
- mock_configuration.username = ""
- mock_configuration.password = ""
- mock_configuration.api_key = {"authorization": "Bearer Token Wrong"}
- mock_configuration.ssl_ca_cert = None
- mock_configuration.cert_file = None
- mock_configuration.key_file = None
- exception_raised = False
- try:
- _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
- except JujuInvalidK8sConfiguration as e:
- exception_raised = True
- self.assertEqual(
- e.message,
- "unknown format of api_key",
- )
- self.assertTrue(exception_raised)
+ # TODO: Fix this test when oauth authentication is supported
+ # def test_oauth2(self, mock_configuration):
+ # mock_configuration.username = ""
+ # mock_configuration.password = ""
+ # mock_configuration.api_key = {"authorization": "Bearer Token"}
+ # key = tempfile.NamedTemporaryFile()
+ # with open(key.name, "w") as key_file:
+ # key_file.write("key")
+ # mock_configuration.ssl_ca_cert = None
+ # mock_configuration.cert_file = None
+ # mock_configuration.key_file = key.name
+ # credential = self.libjuju.get_k8s_cloud_credential(
+ # mock_configuration,
+ # self.cert_data,
+ # self.token,
+ # )
+ # self.assertEqual(
+ # credential,
+ # juju.client._definitions.CloudCredential(
+ # attrs={"ClientKeyData": "key", "Token": "Token"},
+ # auth_type="oauth2",
+ # ),
+ # )
+
+ # @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration")
+ # def test_oauth2_missing_token(self, mock_exception, mock_configuration):
+ # mock_configuration.username = ""
+ # mock_configuration.password = ""
+ # key = tempfile.NamedTemporaryFile()
+ # with open(key.name, "w") as key_file:
+ # key_file.write("key")
+ # mock_configuration.ssl_ca_cert = None
+ # mock_configuration.cert_file = None
+ # mock_configuration.key_file = key.name
+ # exception_raised = False
+ # try:
+ # _ = self.libjuju.get_k8s_cloud_credential(
+ # mock_configuration,
+ # self.cert_data,
+ # self.token,
+ # )
+ # except JujuInvalidK8sConfiguration as e:
+ # exception_raised = True
+ # self.assertEqual(
+ # e.message,
+ # "missing token for auth type oauth2",
+ # )
+ # self.assertTrue(exception_raised)
def test_exception_cannot_set_token_and_userpass(self, mock_configuration):
mock_configuration.username = "admin"
mock_configuration.key_file = None
exception_raised = False
try:
- _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ _ = self.libjuju.get_k8s_cloud_credential(
+ mock_configuration,
+ self.cert_data,
+ self.token,
+ )
except JujuInvalidK8sConfiguration as e:
exception_raised = True
self.assertEqual(