+
+
+@asynctest.mock.patch("kubernetes.client.configuration.Configuration")
+class GetK8sCloudCredentials(LibjujuTestCase):
+ def setUp(self):
+ super(GetK8sCloudCredentials, self).setUp()
+ self.cert_data = "cert"
+ self.token = "token"
+
+ @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration")
+ def test_not_supported(self, mock_exception, mock_configuration):
+ mock_configuration.username = ""
+ mock_configuration.password = ""
+ mock_configuration.ssl_ca_cert = None
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = None
+ exception_raised = False
+ self.token = None
+ self.cert_data = None
+ try:
+ _ = self.libjuju.get_k8s_cloud_credential(
+ mock_configuration,
+ self.cert_data,
+ self.token,
+ )
+ except JujuInvalidK8sConfiguration as e:
+ exception_raised = True
+ self.assertEqual(
+ e.message,
+ "authentication method not supported",
+ )
+ self.assertTrue(exception_raised)
+
+ def test_user_pass(self, mock_configuration):
+ mock_configuration.username = "admin"
+ mock_configuration.password = "admin"
+ mock_configuration.ssl_ca_cert = None
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = None
+ self.token = None
+ self.cert_data = None
+ credential = self.libjuju.get_k8s_cloud_credential(
+ mock_configuration,
+ self.cert_data,
+ self.token,
+ )
+ self.assertEqual(
+ credential,
+ juju.client._definitions.CloudCredential(
+ attrs={"username": "admin", "password": "admin"}, auth_type="userpass"
+ ),
+ )
+
+ def test_user_pass_with_cert(self, mock_configuration):
+ mock_configuration.username = "admin"
+ mock_configuration.password = "admin"
+ mock_configuration.ssl_ca_cert = None
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = None
+ self.token = None
+ credential = self.libjuju.get_k8s_cloud_credential(
+ mock_configuration,
+ self.cert_data,
+ self.token,
+ )
+ self.assertEqual(
+ credential,
+ juju.client._definitions.CloudCredential(
+ attrs={
+ "ClientCertificateData": self.cert_data,
+ "username": "admin",
+ "password": "admin",
+ },
+ auth_type="userpasswithcert",
+ ),
+ )
+
+ def test_user_no_pass(self, mock_configuration):
+ mock_configuration.username = "admin"
+ mock_configuration.password = ""
+ mock_configuration.ssl_ca_cert = None
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = None
+ self.token = None
+ self.cert_data = None
+ with mock.patch.object(self.libjuju.log, "debug") as mock_debug:
+ credential = self.libjuju.get_k8s_cloud_credential(
+ mock_configuration,
+ self.cert_data,
+ self.token,
+ )
+ self.assertEqual(
+ credential,
+ juju.client._definitions.CloudCredential(
+ attrs={"username": "admin", "password": ""}, auth_type="userpass"
+ ),
+ )
+ mock_debug.assert_called_once_with(
+ "credential for user admin has empty password"
+ )
+
+ def test_cert(self, mock_configuration):
+ mock_configuration.username = ""
+ mock_configuration.password = ""
+ mock_configuration.api_key = {"authorization": "Bearer Token"}
+ ssl_ca_cert = tempfile.NamedTemporaryFile()
+ with open(ssl_ca_cert.name, "w") as ssl_ca_cert_file:
+ ssl_ca_cert_file.write("cacert")
+ mock_configuration.ssl_ca_cert = ssl_ca_cert.name
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = None
+ credential = self.libjuju.get_k8s_cloud_credential(
+ mock_configuration,
+ self.cert_data,
+ self.token,
+ )
+ self.assertEqual(
+ credential,
+ juju.client._definitions.CloudCredential(
+ attrs={"ClientCertificateData": self.cert_data, "Token": self.token},
+ auth_type="certificate",
+ ),
+ )
+
+ # TODO: Fix this test when oauth authentication is supported
+ # def test_oauth2(self, mock_configuration):
+ # mock_configuration.username = ""
+ # mock_configuration.password = ""
+ # mock_configuration.api_key = {"authorization": "Bearer Token"}
+ # key = tempfile.NamedTemporaryFile()
+ # with open(key.name, "w") as key_file:
+ # key_file.write("key")
+ # mock_configuration.ssl_ca_cert = None
+ # mock_configuration.cert_file = None
+ # mock_configuration.key_file = key.name
+ # credential = self.libjuju.get_k8s_cloud_credential(
+ # mock_configuration,
+ # self.cert_data,
+ # self.token,
+ # )
+ # self.assertEqual(
+ # credential,
+ # juju.client._definitions.CloudCredential(
+ # attrs={"ClientKeyData": "key", "Token": "Token"},
+ # auth_type="oauth2",
+ # ),
+ # )
+
+ # @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration")
+ # def test_oauth2_missing_token(self, mock_exception, mock_configuration):
+ # mock_configuration.username = ""
+ # mock_configuration.password = ""
+ # key = tempfile.NamedTemporaryFile()
+ # with open(key.name, "w") as key_file:
+ # key_file.write("key")
+ # mock_configuration.ssl_ca_cert = None
+ # mock_configuration.cert_file = None
+ # mock_configuration.key_file = key.name
+ # exception_raised = False
+ # try:
+ # _ = self.libjuju.get_k8s_cloud_credential(
+ # mock_configuration,
+ # self.cert_data,
+ # self.token,
+ # )
+ # except JujuInvalidK8sConfiguration as e:
+ # exception_raised = True
+ # self.assertEqual(
+ # e.message,
+ # "missing token for auth type oauth2",
+ # )
+ # self.assertTrue(exception_raised)
+
+ def test_exception_cannot_set_token_and_userpass(self, mock_configuration):
+ mock_configuration.username = "admin"
+ mock_configuration.password = "pass"
+ mock_configuration.api_key = {"authorization": "No_bearer_token"}
+ mock_configuration.ssl_ca_cert = None
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = None
+ exception_raised = False
+ try:
+ _ = self.libjuju.get_k8s_cloud_credential(
+ mock_configuration,
+ self.cert_data,
+ self.token,
+ )
+ except JujuInvalidK8sConfiguration as e:
+ exception_raised = True
+ self.assertEqual(
+ e.message,
+ "Cannot set both token and user/pass",
+ )
+ self.assertTrue(exception_raised)