+
+
+@asynctest.mock.patch("kubernetes.client.configuration.Configuration")
+class GetK8sCloudCredentials(LibjujuTestCase):
+ def setUp(self):
+ super(GetK8sCloudCredentials, self).setUp()
+
+ @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration")
+ def test_not_supported(self, mock_exception, mock_configuration):
+ mock_configuration.username = ""
+ mock_configuration.password = ""
+ mock_configuration.ssl_ca_cert = None
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = None
+ exception_raised = False
+ try:
+ _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ except JujuInvalidK8sConfiguration as e:
+ exception_raised = True
+ self.assertEqual(
+ e.message, "authentication method not supported",
+ )
+ self.assertTrue(exception_raised)
+
+ def test_user_pass(self, mock_configuration):
+ mock_configuration.username = "admin"
+ mock_configuration.password = "admin"
+ mock_configuration.ssl_ca_cert = None
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = None
+ credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ self.assertEqual(
+ credential,
+ juju.client._definitions.CloudCredential(
+ attrs={"username": "admin", "password": "admin"}, auth_type="userpass"
+ ),
+ )
+
+ def test_user_no_pass(self, mock_configuration):
+ mock_configuration.username = "admin"
+ mock_configuration.password = ""
+ mock_configuration.ssl_ca_cert = None
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = None
+ with mock.patch.object(self.libjuju.log, "debug") as mock_debug:
+ credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ self.assertEqual(
+ credential,
+ juju.client._definitions.CloudCredential(
+ attrs={"username": "admin", "password": ""}, auth_type="userpass"
+ ),
+ )
+ mock_debug.assert_called_once_with(
+ "credential for user admin has empty password"
+ )
+
+ def test_user_pass_with_cert(self, mock_configuration):
+ mock_configuration.username = "admin"
+ mock_configuration.password = "admin"
+ ssl_ca_cert = tempfile.NamedTemporaryFile()
+ with open(ssl_ca_cert.name, "w") as ssl_ca_cert_file:
+ ssl_ca_cert_file.write("cacert")
+ mock_configuration.ssl_ca_cert = ssl_ca_cert.name
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = None
+ credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ self.assertEqual(
+ credential,
+ juju.client._definitions.CloudCredential(
+ attrs={
+ "username": "admin",
+ "password": "admin",
+ "ClientCertificateData": "cacert",
+ },
+ auth_type="userpasswithcert",
+ ),
+ )
+
+ def test_cert(self, mock_configuration):
+ mock_configuration.username = ""
+ mock_configuration.password = ""
+ mock_configuration.api_key = {"authorization": "Bearer Token"}
+ ssl_ca_cert = tempfile.NamedTemporaryFile()
+ with open(ssl_ca_cert.name, "w") as ssl_ca_cert_file:
+ ssl_ca_cert_file.write("cacert")
+ mock_configuration.ssl_ca_cert = ssl_ca_cert.name
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = None
+ credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ self.assertEqual(
+ credential,
+ juju.client._definitions.CloudCredential(
+ attrs={"ClientCertificateData": "cacert", "Token": "Token"},
+ auth_type="certificate",
+ ),
+ )
+
+ def test_oauth2(self, mock_configuration):
+ mock_configuration.username = ""
+ mock_configuration.password = ""
+ mock_configuration.api_key = {"authorization": "Bearer Token"}
+ key = tempfile.NamedTemporaryFile()
+ with open(key.name, "w") as key_file:
+ key_file.write("key")
+ mock_configuration.ssl_ca_cert = None
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = key.name
+ credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ self.assertEqual(
+ credential,
+ juju.client._definitions.CloudCredential(
+ attrs={"ClientKeyData": "key", "Token": "Token"}, auth_type="oauth2",
+ ),
+ )
+
+ @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration")
+ def test_oauth2_missing_token(self, mock_exception, mock_configuration):
+ mock_configuration.username = ""
+ mock_configuration.password = ""
+ key = tempfile.NamedTemporaryFile()
+ with open(key.name, "w") as key_file:
+ key_file.write("key")
+ mock_configuration.ssl_ca_cert = None
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = key.name
+ exception_raised = False
+ try:
+ _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ except JujuInvalidK8sConfiguration as e:
+ exception_raised = True
+ self.assertEqual(
+ e.message, "missing token for auth type oauth2",
+ )
+ self.assertTrue(exception_raised)
+
+ def test_unknown_api_key(self, mock_configuration):
+ mock_configuration.username = ""
+ mock_configuration.password = ""
+ mock_configuration.api_key = {"authorization": "Bearer Token Wrong"}
+ mock_configuration.ssl_ca_cert = None
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = None
+ exception_raised = False
+ try:
+ _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ except JujuInvalidK8sConfiguration as e:
+ exception_raised = True
+ self.assertEqual(
+ e.message, "unknown format of api_key",
+ )
+ self.assertTrue(exception_raised)
+
+ def test_exception_cannot_set_token_and_userpass(self, mock_configuration):
+ mock_configuration.username = "admin"
+ mock_configuration.password = "pass"
+ mock_configuration.api_key = {"authorization": "No_bearer_token"}
+ mock_configuration.ssl_ca_cert = None
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = None
+ exception_raised = False
+ try:
+ _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ except JujuInvalidK8sConfiguration as e:
+ exception_raised = True
+ self.assertEqual(
+ e.message, "Cannot set both token and user/pass",
+ )
+ self.assertTrue(exception_raised)