+
+
+@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_k8s_cloud_credential")
+@asynctest.mock.patch("n2vc.libjuju.Libjuju.add_cloud")
+class AddK8sTest(LibjujuTestCase):
+ def setUp(self):
+ super(AddK8sTest, self).setUp()
+ self.configuration = kubernetes.client.configuration.Configuration()
+
+ def test_add_k8s(self, mock_add_cloud, mock_get_k8s_cloud_credential):
+ self.loop.run_until_complete(
+ self.libjuju.add_k8s("cloud", self.configuration, "storage_class")
+ )
+ mock_add_cloud.assert_called_once()
+ mock_get_k8s_cloud_credential.assert_called_once()
+
+ def test_add_k8s_exception(self, mock_add_cloud, mock_get_k8s_cloud_credential):
+ mock_add_cloud.side_effect = Exception()
+ with self.assertRaises(Exception):
+ self.loop.run_until_complete(
+ self.libjuju.add_k8s("cloud", self.configuration, "storage_class")
+ )
+ mock_add_cloud.assert_called_once()
+ mock_get_k8s_cloud_credential.assert_called_once()
+
+ def test_add_k8s_missing_name(self, mock_add_cloud, mock_get_k8s_cloud_credential):
+ with self.assertRaises(Exception):
+ self.loop.run_until_complete(
+ self.libjuju.add_k8s("", self.configuration, "storage_class")
+ )
+ mock_add_cloud.assert_not_called()
+
+ def test_add_k8s_missing_storage_name(
+ self, mock_add_cloud, mock_get_k8s_cloud_credential
+ ):
+ with self.assertRaises(Exception):
+ self.loop.run_until_complete(
+ self.libjuju.add_k8s("cloud", self.configuration, "")
+ )
+ mock_add_cloud.assert_not_called()
+
+ def test_add_k8s_missing_configuration_keys(
+ self, mock_add_cloud, mock_get_k8s_cloud_credential
+ ):
+ with self.assertRaises(Exception):
+ self.loop.run_until_complete(self.libjuju.add_k8s("cloud", None, ""))
+ mock_add_cloud.assert_not_called()
+
+
+@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_controller")
+@asynctest.mock.patch("n2vc.libjuju.Libjuju.disconnect_controller")
+@asynctest.mock.patch("juju.controller.Controller.add_cloud")
+@asynctest.mock.patch("juju.controller.Controller.add_credential")
+class AddCloudTest(LibjujuTestCase):
+ def setUp(self):
+ super(AddCloudTest, self).setUp()
+ self.cloud = juju.client.client.Cloud()
+ self.credential = juju.client.client.CloudCredential()
+
+ def test_add_cloud_with_credential(
+ self,
+ mock_add_credential,
+ mock_add_cloud,
+ mock_disconnect_controller,
+ mock_get_controller,
+ ):
+ mock_get_controller.return_value = juju.controller.Controller()
+
+ cloud = self.loop.run_until_complete(
+ self.libjuju.add_cloud("cloud", self.cloud, credential=self.credential)
+ )
+ self.assertEqual(cloud, self.cloud)
+ mock_add_cloud.assert_called_once_with("cloud", self.cloud)
+ mock_add_credential.assert_called_once_with(
+ "cloud", credential=self.credential, cloud="cloud"
+ )
+ mock_disconnect_controller.assert_called_once()
+
+ def test_add_cloud_no_credential(
+ self,
+ mock_add_credential,
+ mock_add_cloud,
+ mock_disconnect_controller,
+ mock_get_controller,
+ ):
+ mock_get_controller.return_value = juju.controller.Controller()
+
+ cloud = self.loop.run_until_complete(
+ self.libjuju.add_cloud("cloud", self.cloud)
+ )
+ self.assertEqual(cloud, self.cloud)
+ mock_add_cloud.assert_called_once_with("cloud", self.cloud)
+ mock_add_credential.assert_not_called()
+ mock_disconnect_controller.assert_called_once()
+
+ def test_add_cloud_exception(
+ self,
+ mock_add_credential,
+ mock_add_cloud,
+ mock_disconnect_controller,
+ mock_get_controller,
+ ):
+ mock_get_controller.return_value = juju.controller.Controller()
+ mock_add_cloud.side_effect = Exception()
+ with self.assertRaises(Exception):
+ self.loop.run_until_complete(
+ self.libjuju.add_cloud("cloud", self.cloud, credential=self.credential)
+ )
+
+ mock_add_cloud.assert_called_once_with("cloud", self.cloud)
+ mock_add_credential.assert_not_called()
+ mock_disconnect_controller.assert_called_once()
+
+ def test_add_credential_exception(
+ self,
+ mock_add_credential,
+ mock_add_cloud,
+ mock_disconnect_controller,
+ mock_get_controller,
+ ):
+ mock_get_controller.return_value = juju.controller.Controller()
+ mock_add_credential.side_effect = Exception()
+ with self.assertRaises(Exception):
+ self.loop.run_until_complete(
+ self.libjuju.add_cloud("cloud", self.cloud, credential=self.credential)
+ )
+
+ mock_add_cloud.assert_called_once_with("cloud", self.cloud)
+ mock_add_credential.assert_called_once_with(
+ "cloud", credential=self.credential, cloud="cloud"
+ )
+ mock_disconnect_controller.assert_called_once()
+
+
+@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_controller")
+@asynctest.mock.patch("n2vc.libjuju.Libjuju.disconnect_controller")
+@asynctest.mock.patch("juju.controller.Controller.remove_cloud")
+class RemoveCloudTest(LibjujuTestCase):
+ def setUp(self):
+ super(RemoveCloudTest, self).setUp()
+
+ def test_remove_cloud(
+ self, mock_remove_cloud, mock_disconnect_controller, mock_get_controller,
+ ):
+ mock_get_controller.return_value = juju.controller.Controller()
+
+ self.loop.run_until_complete(self.libjuju.remove_cloud("cloud"))
+ mock_remove_cloud.assert_called_once_with("cloud")
+ mock_disconnect_controller.assert_called_once()
+
+ def test_remove_cloud_exception(
+ self, mock_remove_cloud, mock_disconnect_controller, mock_get_controller,
+ ):
+ mock_get_controller.return_value = juju.controller.Controller()
+ mock_remove_cloud.side_effect = Exception()
+
+ with self.assertRaises(Exception):
+ self.loop.run_until_complete(self.libjuju.remove_cloud("cloud"))
+ mock_remove_cloud.assert_called_once_with("cloud")
+ mock_disconnect_controller.assert_called_once()
+
+
+@asynctest.mock.patch("kubernetes.client.configuration.Configuration")
+class GetK8sCloudCredentials(LibjujuTestCase):
+ def setUp(self):
+ super(GetK8sCloudCredentials, self).setUp()
+
+ @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration")
+ def test_not_supported(self, mock_exception, mock_configuration):
+ mock_configuration.username = ""
+ mock_configuration.password = ""
+ mock_configuration.ssl_ca_cert = None
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = None
+ exception_raised = False
+ try:
+ _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ except JujuInvalidK8sConfiguration as e:
+ exception_raised = True
+ self.assertEqual(
+ e.message, "authentication method not supported",
+ )
+ self.assertTrue(exception_raised)
+
+ def test_user_pass(self, mock_configuration):
+ mock_configuration.username = "admin"
+ mock_configuration.password = "admin"
+ mock_configuration.ssl_ca_cert = None
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = None
+ credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ self.assertEqual(
+ credential,
+ juju.client._definitions.CloudCredential(
+ attrs={"username": "admin", "password": "admin"}, auth_type="userpass"
+ ),
+ )
+
+ def test_user_no_pass(self, mock_configuration):
+ mock_configuration.username = "admin"
+ mock_configuration.password = ""
+ mock_configuration.ssl_ca_cert = None
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = None
+ with mock.patch.object(self.libjuju.log, "debug") as mock_debug:
+ credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ self.assertEqual(
+ credential,
+ juju.client._definitions.CloudCredential(
+ attrs={"username": "admin", "password": ""}, auth_type="userpass"
+ ),
+ )
+ mock_debug.assert_called_once_with(
+ "credential for user admin has empty password"
+ )
+
+ def test_user_pass_with_cert(self, mock_configuration):
+ mock_configuration.username = "admin"
+ mock_configuration.password = "admin"
+ ssl_ca_cert = tempfile.NamedTemporaryFile()
+ with open(ssl_ca_cert.name, "w") as ssl_ca_cert_file:
+ ssl_ca_cert_file.write("cacert")
+ mock_configuration.ssl_ca_cert = ssl_ca_cert.name
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = None
+ credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ self.assertEqual(
+ credential,
+ juju.client._definitions.CloudCredential(
+ attrs={
+ "username": "admin",
+ "password": "admin",
+ "ClientCertificateData": "cacert",
+ },
+ auth_type="userpasswithcert",
+ ),
+ )
+
+ def test_cert(self, mock_configuration):
+ mock_configuration.username = ""
+ mock_configuration.password = ""
+ mock_configuration.api_key = {"authorization": "Bearer Token"}
+ ssl_ca_cert = tempfile.NamedTemporaryFile()
+ with open(ssl_ca_cert.name, "w") as ssl_ca_cert_file:
+ ssl_ca_cert_file.write("cacert")
+ mock_configuration.ssl_ca_cert = ssl_ca_cert.name
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = None
+ credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ self.assertEqual(
+ credential,
+ juju.client._definitions.CloudCredential(
+ attrs={"ClientCertificateData": "cacert", "Token": "Token"},
+ auth_type="certificate",
+ ),
+ )
+
+ def test_oauth2(self, mock_configuration):
+ mock_configuration.username = ""
+ mock_configuration.password = ""
+ mock_configuration.api_key = {"authorization": "Bearer Token"}
+ key = tempfile.NamedTemporaryFile()
+ with open(key.name, "w") as key_file:
+ key_file.write("key")
+ mock_configuration.ssl_ca_cert = None
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = key.name
+ credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ self.assertEqual(
+ credential,
+ juju.client._definitions.CloudCredential(
+ attrs={"ClientKeyData": "key", "Token": "Token"}, auth_type="oauth2",
+ ),
+ )
+
+ @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration")
+ def test_oauth2_missing_token(self, mock_exception, mock_configuration):
+ mock_configuration.username = ""
+ mock_configuration.password = ""
+ key = tempfile.NamedTemporaryFile()
+ with open(key.name, "w") as key_file:
+ key_file.write("key")
+ mock_configuration.ssl_ca_cert = None
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = key.name
+ exception_raised = False
+ try:
+ _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ except JujuInvalidK8sConfiguration as e:
+ exception_raised = True
+ self.assertEqual(
+ e.message, "missing token for auth type oauth2",
+ )
+ self.assertTrue(exception_raised)
+
+ def test_unknown_api_key(self, mock_configuration):
+ mock_configuration.username = ""
+ mock_configuration.password = ""
+ mock_configuration.api_key = {"authorization": "Bearer Token Wrong"}
+ mock_configuration.ssl_ca_cert = None
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = None
+ exception_raised = False
+ try:
+ _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ except JujuInvalidK8sConfiguration as e:
+ exception_raised = True
+ self.assertEqual(
+ e.message, "unknown format of api_key",
+ )
+ self.assertTrue(exception_raised)
+
+ def test_exception_cannot_set_token_and_userpass(self, mock_configuration):
+ mock_configuration.username = "admin"
+ mock_configuration.password = "pass"
+ mock_configuration.api_key = {"authorization": "No_bearer_token"}
+ mock_configuration.ssl_ca_cert = None
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = None
+ exception_raised = False
+ try:
+ _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ except JujuInvalidK8sConfiguration as e:
+ exception_raised = True
+ self.assertEqual(
+ e.message, "Cannot set both token and user/pass",
+ )
+ self.assertTrue(exception_raised)