From 475a7221e3598ad1c75ce802c5ad74ef7ecf72f1 Mon Sep 17 00:00:00 2001 From: David Garcia Date: Mon, 21 Sep 2020 16:19:15 +0200 Subject: [PATCH] Add new kubectl.py functions, modify some libjuju.py functions, add unit tests - Kubectl.py: two new functions added (get_configuration and get_default_storage_class) - get_configuration(): Returns a kubernetes Configuration object. It can be used to properly parse the kubeconfig. - get_default_storage_class(): Searches for the default storage class of a k8s cluster. - Libjuju.py: modified add_k8s function and get_k8s_cloud_credential function was added. - add_k8s(): Improves the way of generation Cloud and CloudCredential objects for the K8s Cloud - get_k8s_cloud_credential(): It parses the kubeconfig to properly determine the authentication method type that should be used for that k8s cluster. - Unit tests: Added unit tests for all the new functions added. - Exceptions: Make all Juju Exceptions to inherit from N2VC Exception. Now Juju exceptions have the message attribute, that is useful for unit testing, to not only check that an exception raised, but to check the message too. - Move get_k8s_cloud_credential() function to n2vc/utils in order to share that code between different connectors. Change-Id: Ife9027d80663fe95f1f3ad883cb9a3376b047d0b Signed-off-by: David Garcia --- n2vc/exceptions.py | 124 +++++++++---------- n2vc/kubectl.py | 34 ++++++ n2vc/libjuju.py | 106 ++++++++++++----- n2vc/n2vc_juju_conn.py | 20 +--- n2vc/tests/unit/test_kubectl.py | 144 ++++++++++++++++++++++ n2vc/tests/unit/test_libjuju.py | 205 +++++++++++++++++++++++++++++--- n2vc/utils.py | 20 ++++ 7 files changed, 527 insertions(+), 126 deletions(-) diff --git a/n2vc/exceptions.py b/n2vc/exceptions.py index 256860e..721b1f4 100644 --- a/n2vc/exceptions.py +++ b/n2vc/exceptions.py @@ -13,66 +13,6 @@ # limitations under the License. -class JujuCharmNotFound(Exception): - """The Charm can't be found or is not readable.""" - - -class JujuControllerFailedConnecting(Exception): - """Failed connecting to juju controller.""" - - -class JujuModelAlreadyExists(Exception): - """The model already exists.""" - - -class JujuApplicationExists(Exception): - """The Application already exists.""" - - -class JujuApplicationNotFound(Exception): - """The Application cannot be found.""" - - -class JujuLeaderUnitNotFound(Exception): - """The Application cannot be found.""" - - -class JujuActionNotFound(Exception): - """The Action cannot be found.""" - - -class JujuMachineNotFound(Exception): - """The machine cannot be found.""" - - -class JujuK8sProxycharmNotSupported(Exception): - """K8s Proxy Charms not supported in this installation.""" - - -class N2VCPrimitiveExecutionFailed(Exception): - """Something failed while attempting to execute a primitive.""" - - -class NetworkServiceDoesNotExist(Exception): - """The Network Service being acted against does not exist.""" - - -class PrimitiveDoesNotExist(Exception): - """The Primitive being executed does not exist.""" - - -class NoRouteToHost(Exception): - """There was no route to the specified host.""" - - -class AuthenticationFailed(Exception): - """The authentication for the specified user failed.""" - - -class MethodNotImplemented(Exception): - """The method is not implemented.""" - - class N2VCException(Exception): """ N2VC exception base class @@ -189,3 +129,67 @@ class K8sException(Exception): class EntityInvalidException(Exception): """Entity is not valid, the type does not match any EntityType.""" + + +class JujuInvalidK8sConfiguration(N2VCException): + """Invalid K8s configuration.""" + + +class JujuCharmNotFound(N2VCException): + """The Charm can't be found or is not readable.""" + + +class JujuControllerFailedConnecting(N2VCException): + """Failed connecting to juju controller.""" + + +class JujuModelAlreadyExists(N2VCException): + """The model already exists.""" + + +class JujuApplicationExists(N2VCException): + """The Application already exists.""" + + +class JujuApplicationNotFound(N2VCException): + """The Application cannot be found.""" + + +class JujuLeaderUnitNotFound(N2VCException): + """The Application cannot be found.""" + + +class JujuActionNotFound(N2VCException): + """The Action cannot be found.""" + + +class JujuMachineNotFound(N2VCException): + """The machine cannot be found.""" + + +class JujuK8sProxycharmNotSupported(N2VCException): + """K8s Proxy Charms not supported in this installation.""" + + +class N2VCPrimitiveExecutionFailed(N2VCException): + """Something failed while attempting to execute a primitive.""" + + +class NetworkServiceDoesNotExist(N2VCException): + """The Network Service being acted against does not exist.""" + + +class PrimitiveDoesNotExist(N2VCException): + """The Primitive being executed does not exist.""" + + +class NoRouteToHost(N2VCException): + """There was no route to the specified host.""" + + +class AuthenticationFailed(N2VCException): + """The authentication for the specified user failed.""" + + +class MethodNotImplemented(N2VCException): + """The method is not implemented.""" diff --git a/n2vc/kubectl.py b/n2vc/kubectl.py index 61b2dc3..31b6f55 100644 --- a/n2vc/kubectl.py +++ b/n2vc/kubectl.py @@ -22,6 +22,9 @@ class Kubectl: config.load_kube_config(config_file=config_file) self.logger = logging.getLogger("Kubectl") + def get_configuration(self): + return config.kube_config.Configuration() + def get_services(self, field_selector=None, label_selector=None): kwargs = {} if field_selector: @@ -58,3 +61,34 @@ class Kubectl: except ApiException as e: self.logger.error("Error calling get services: {}".format(e)) raise e + + def get_default_storage_class(self) -> str: + """ + Default storage class + + :return: Returns the default storage class name, if exists. + If not, it returns the first storage class. + If there are not storage classes, returns None + """ + + storagev1 = client.StorageV1Api() + storage_classes = storagev1.list_storage_class() + selected_sc = None + default_sc_annotations = { + "storageclass.kubernetes.io/is-default-class": "true", + # Older clusters still use the beta annotation. + "storageclass.beta.kubernetes.io/is-default-class": "true", + } + for sc in storage_classes.items: + if not selected_sc: + # Select the first storage class in case there is no a default-class + selected_sc = sc.metadata.name + annotations = sc.metadata.annotations + if any( + k in annotations and annotations[k] == v + for k, v in default_sc_annotations.items() + ): + # Default storage + selected_sc = sc.metadata.name + break + return selected_sc diff --git a/n2vc/libjuju.py b/n2vc/libjuju.py index d761adc..d2c725f 100644 --- a/n2vc/libjuju.py +++ b/n2vc/libjuju.py @@ -39,9 +39,11 @@ from n2vc.exceptions import ( JujuModelAlreadyExists, JujuControllerFailedConnecting, JujuApplicationExists, + JujuInvalidK8sConfiguration, ) from n2vc.utils import DB_DATA from osm_common.dbbase import DbException +from kubernetes.client.configuration import Configuration class Libjuju: @@ -1020,59 +1022,99 @@ class Libjuju: finally: await self.disconnect_controller(controller) - async def add_k8s(self, name: str, auth_data: dict, storage_class: str): + async def add_k8s( + self, name: str, configuration: Configuration, storage_class: str + ): """ Add a Kubernetes cloud to the controller Similar to the `juju add-k8s` command in the CLI :param: name: Name for the K8s cloud - :param: auth_data: Dictionary with needed credentials. Format: - { - "server": "192.168.0.21:16443", - "cacert": "-----BEGIN CERTIFI...", - "token": "clhkRExRem5Xd1dCdnFEVXdvRGt...", - - } + :param: configuration: Kubernetes configuration object :param: storage_class: Storage Class to use in the cloud """ - required_auth_data_keys = ["server", "cacert", "token"] - missing_keys = [] - for k in required_auth_data_keys: - if k not in auth_data: - missing_keys.append(k) - if missing_keys: - raise Exception( - "missing keys in auth_data: {}".format(",".join(missing_keys)) - ) if not storage_class: raise Exception("storage_class must be a non-empty string") if not name: raise Exception("name must be a non-empty string") - - endpoint = auth_data["server"] - cacert = auth_data["cacert"] - token = auth_data["token"] - region_name = "{}-region".format(name) - + if not configuration: + raise Exception("configuration must be provided") + + endpoint = configuration.host + credential = self.get_k8s_cloud_credential(configuration) + ca_certificates = ( + [credential.attrs["ClientCertificateData"]] + if "ClientCertificateData" in credential.attrs + else [] + ) cloud = client.Cloud( - auth_types=["certificate"], - ca_certificates=[cacert], + type_="kubernetes", + auth_types=[credential.auth_type], endpoint=endpoint, + ca_certificates=ca_certificates, config={ "operator-storage": storage_class, "workload-storage": storage_class, }, - regions=[client.CloudRegion(endpoint=endpoint, name=region_name)], - type_="kubernetes", ) - cred = client.CloudCredential( - auth_type="certificate", - attrs={"ClientCertificateData": cacert, "Token": token}, - ) - return await self.add_cloud(name, cloud, cred) + return await self.add_cloud(name, cloud, credential) + + def get_k8s_cloud_credential( + self, configuration: Configuration, + ) -> client.CloudCredential: + attrs = {} + ca_cert = configuration.ssl_ca_cert or configuration.cert_file + key = configuration.key_file + api_key = configuration.api_key + token = None + username = configuration.username + password = configuration.password + + if "authorization" in api_key: + authorization = api_key["authorization"] + if "Bearer " in authorization: + bearer_list = authorization.split(" ") + if len(bearer_list) == 2: + [_, token] = bearer_list + else: + raise JujuInvalidK8sConfiguration("unknown format of api_key") + else: + token = authorization + if ca_cert: + attrs["ClientCertificateData"] = open(ca_cert, "r").read() + if key: + attrs["ClientKeyData"] = open(key, "r").read() + if token: + if username or password: + raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass") + attrs["Token"] = token + + auth_type = None + if key: + auth_type = "oauth2" + if not token: + raise JujuInvalidK8sConfiguration( + "missing token for auth type {}".format(auth_type) + ) + elif username: + if not password: + self.log.debug( + "credential for user {} has empty password".format(username) + ) + attrs["username"] = username + attrs["password"] = password + if ca_cert: + auth_type = "userpasswithcert" + else: + auth_type = "userpass" + elif ca_cert and token: + auth_type = "certificate" + else: + raise JujuInvalidK8sConfiguration("authentication method not supported") + return client.CloudCredential(auth_type=auth_type, attrs=attrs,) async def add_cloud( self, name: str, cloud: Cloud, credential: CloudCredential = None diff --git a/n2vc/n2vc_juju_conn.py b/n2vc/n2vc_juju_conn.py index 0522028..50c0c99 100644 --- a/n2vc/n2vc_juju_conn.py +++ b/n2vc/n2vc_juju_conn.py @@ -21,18 +21,14 @@ ## import asyncio -import base64 -import binascii import logging import os -import re from n2vc.exceptions import ( N2VCBadArgumentsException, N2VCException, N2VCConnectionException, N2VCExecutionException, - N2VCInvalidCertificate, # N2VCNotFound, MethodNotImplemented, JujuK8sProxycharmNotSupported, @@ -40,6 +36,7 @@ from n2vc.exceptions import ( from n2vc.n2vc_conn import N2VCConnector from n2vc.n2vc_conn import obj_to_dict, obj_to_yaml from n2vc.libjuju import Libjuju +from n2vc.utils import base64_to_cacert class N2VCJujuConnector(N2VCConnector): @@ -136,21 +133,6 @@ class N2VCJujuConnector(N2VCConnector): # TODO: Verify ca_cert is valid before using. VCA will crash # if the ca_cert isn't formatted correctly. - def base64_to_cacert(b64string): - """Convert the base64-encoded string containing the VCA CACERT. - - The input string.... - - """ - try: - cacert = base64.b64decode(b64string).decode("utf-8") - - cacert = re.sub(r"\\n", r"\n", cacert,) - except binascii.Error as e: - self.log.debug("Caught binascii.Error: {}".format(e)) - raise N2VCInvalidCertificate(message="Invalid CA Certificate") - - return cacert self.ca_cert = vca_config.get("ca_cert") if self.ca_cert: diff --git a/n2vc/tests/unit/test_kubectl.py b/n2vc/tests/unit/test_kubectl.py index dfb954a..8fb0310 100644 --- a/n2vc/tests/unit/test_kubectl.py +++ b/n2vc/tests/unit/test_kubectl.py @@ -17,6 +17,55 @@ from n2vc.kubectl import Kubectl from n2vc.utils import Dict from kubernetes.client.rest import ApiException + +class FakeK8sResourceMetadata: + def __init__( + self, + name: str = None, + namespace: str = None, + annotations: dict = {}, + labels: dict = {}, + ): + self._annotations = annotations + self._name = name or "name" + self._namespace = namespace or "namespace" + self._labels = labels or {"juju-app": "squid"} + + @property + def name(self): + return self._name + + @property + def namespace(self): + return self._namespace + + @property + def labels(self): + return self._labels + + @property + def annotations(self): + return self._annotations + + +class FakeK8sStorageClass: + def __init__(self, metadata=None): + self._metadata = metadata or FakeK8sResourceMetadata() + + @property + def metadata(self): + return self._metadata + + +class FakeK8sStorageClassesList: + def __init__(self, items=[]): + self._items = items + + @property + def items(self): + return self._items + + fake_list_services = Dict( { "items": [ @@ -64,6 +113,11 @@ fake_list_services = Dict( ) +class KubectlTestCase(TestCase): + def setUp(self,): + pass + + class FakeCoreV1Api: def list_service_for_all_namespaces(self, **kwargs): return fake_list_services @@ -91,3 +145,93 @@ class ProvisionerTest(TestCase): list_services.side_effect = ApiException() with self.assertRaises(ApiException): self.kubectl.get_services() + + +@mock.patch("kubernetes.config.kube_config.Configuration") +@mock.patch("n2vc.kubectl.config.load_kube_config") +class GetConfiguration(KubectlTestCase): + def setUp(self): + super(GetConfiguration, self).setUp() + + def test_get_configuration(self, mock_load_kube_config, mock_configuration): + kubectl = Kubectl() + kubectl.get_configuration() + mock_configuration.assert_called_once() + + +@mock.patch("kubernetes.client.StorageV1Api.list_storage_class") +@mock.patch("kubernetes.config.load_kube_config") +class GetDefaultStorageClass(KubectlTestCase): + def setUp(self): + super(GetDefaultStorageClass, self).setUp() + + # Default Storage Class + self.default_sc_name = "default-sc" + default_sc_metadata = FakeK8sResourceMetadata( + name=self.default_sc_name, + annotations={"storageclass.kubernetes.io/is-default-class": "true"}, + ) + self.default_sc = FakeK8sStorageClass(metadata=default_sc_metadata) + + # Default Storage Class with old annotation + self.default_sc_old_name = "default-sc-old" + default_sc_old_metadata = FakeK8sResourceMetadata( + name=self.default_sc_old_name, + annotations={"storageclass.beta.kubernetes.io/is-default-class": "true"}, + ) + self.default_sc_old = FakeK8sStorageClass(metadata=default_sc_old_metadata) + + # Storage class - not default + self.sc_name = "default-sc-old" + self.sc = FakeK8sStorageClass( + metadata=FakeK8sResourceMetadata(name=self.sc_name) + ) + + def test_get_default_storage_class_exists_default( + self, mock_load_kube_config, mock_list_storage_class + ): + kubectl = Kubectl() + items = [self.default_sc] + mock_list_storage_class.return_value = FakeK8sStorageClassesList(items=items) + sc_name = kubectl.get_default_storage_class() + self.assertEqual(sc_name, self.default_sc_name) + mock_list_storage_class.assert_called_once() + + def test_get_default_storage_class_exists_default_old( + self, mock_load_kube_config, mock_list_storage_class + ): + kubectl = Kubectl() + items = [self.default_sc_old] + mock_list_storage_class.return_value = FakeK8sStorageClassesList(items=items) + sc_name = kubectl.get_default_storage_class() + self.assertEqual(sc_name, self.default_sc_old_name) + mock_list_storage_class.assert_called_once() + + def test_get_default_storage_class_none( + self, mock_load_kube_config, mock_list_storage_class + ): + kubectl = Kubectl() + mock_list_storage_class.return_value = FakeK8sStorageClassesList(items=[]) + sc_name = kubectl.get_default_storage_class() + self.assertEqual(sc_name, None) + mock_list_storage_class.assert_called_once() + + def test_get_default_storage_class_exists_not_default( + self, mock_load_kube_config, mock_list_storage_class + ): + kubectl = Kubectl() + items = [self.sc] + mock_list_storage_class.return_value = FakeK8sStorageClassesList(items=items) + sc_name = kubectl.get_default_storage_class() + self.assertEqual(sc_name, self.sc_name) + mock_list_storage_class.assert_called_once() + + def test_get_default_storage_class_choose( + self, mock_load_kube_config, mock_list_storage_class + ): + kubectl = Kubectl() + items = [self.sc, self.default_sc] + mock_list_storage_class.return_value = FakeK8sStorageClassesList(items=items) + sc_name = kubectl.get_default_storage_class() + self.assertEqual(sc_name, self.default_sc_name) + mock_list_storage_class.assert_called_once() diff --git a/n2vc/tests/unit/test_libjuju.py b/n2vc/tests/unit/test_libjuju.py index 57a6ccc..8c16c9d 100644 --- a/n2vc/tests/unit/test_libjuju.py +++ b/n2vc/tests/unit/test_libjuju.py @@ -14,7 +14,10 @@ import asyncio import asynctest +import tempfile +from unittest import mock import juju +import kubernetes from juju.errors import JujuAPIError import logging from .utils import FakeN2VC, FakeMachine, FakeApplication @@ -26,6 +29,7 @@ from n2vc.exceptions import ( JujuApplicationNotFound, JujuActionNotFound, JujuApplicationExists, + JujuInvalidK8sConfiguration, ) @@ -1112,47 +1116,50 @@ class ConsumeTest(LibjujuTestCase): mock_disconnect_controller.assert_called_once() +@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_k8s_cloud_credential") @asynctest.mock.patch("n2vc.libjuju.Libjuju.add_cloud") class AddK8sTest(LibjujuTestCase): def setUp(self): super(AddK8sTest, self).setUp() - self.auth_data = { - "server": "https://192.168.0.21:16443", - "token": "1234", - "cacert": "cacert", - } + self.configuration = kubernetes.client.configuration.Configuration() - def test_add_k8s(self, mock_add_cloud): + def test_add_k8s(self, mock_add_cloud, mock_get_k8s_cloud_credential): self.loop.run_until_complete( - self.libjuju.add_k8s("cloud", self.auth_data, "storage_class") + self.libjuju.add_k8s("cloud", self.configuration, "storage_class") ) mock_add_cloud.assert_called_once() + mock_get_k8s_cloud_credential.assert_called_once() - def test_add_k8s_exception(self, mock_add_cloud): + def test_add_k8s_exception(self, mock_add_cloud, mock_get_k8s_cloud_credential): mock_add_cloud.side_effect = Exception() with self.assertRaises(Exception): self.loop.run_until_complete( - self.libjuju.add_k8s("cloud", self.auth_data, "storage_class") + self.libjuju.add_k8s("cloud", self.configuration, "storage_class") ) mock_add_cloud.assert_called_once() + mock_get_k8s_cloud_credential.assert_called_once() - def test_add_k8s_missing_name(self, mock_add_cloud): + def test_add_k8s_missing_name(self, mock_add_cloud, mock_get_k8s_cloud_credential): with self.assertRaises(Exception): self.loop.run_until_complete( - self.libjuju.add_k8s("", self.auth_data, "storage_class") + self.libjuju.add_k8s("", self.configuration, "storage_class") ) mock_add_cloud.assert_not_called() - def test_add_k8s_missing_storage_name(self, mock_add_cloud): + def test_add_k8s_missing_storage_name( + self, mock_add_cloud, mock_get_k8s_cloud_credential + ): with self.assertRaises(Exception): self.loop.run_until_complete( - self.libjuju.add_k8s("cloud", self.auth_data, "") + self.libjuju.add_k8s("cloud", self.configuration, "") ) mock_add_cloud.assert_not_called() - def test_add_k8s_missing_auth_data_keys(self, mock_add_cloud): + def test_add_k8s_missing_configuration_keys( + self, mock_add_cloud, mock_get_k8s_cloud_credential + ): with self.assertRaises(Exception): - self.loop.run_until_complete(self.libjuju.add_k8s("cloud", {}, "")) + self.loop.run_until_complete(self.libjuju.add_k8s("cloud", None, "")) mock_add_cloud.assert_not_called() @@ -1267,3 +1274,171 @@ class RemoveCloudTest(LibjujuTestCase): self.loop.run_until_complete(self.libjuju.remove_cloud("cloud")) mock_remove_cloud.assert_called_once_with("cloud") mock_disconnect_controller.assert_called_once() + + +@asynctest.mock.patch("kubernetes.client.configuration.Configuration") +class GetK8sCloudCredentials(LibjujuTestCase): + def setUp(self): + super(GetK8sCloudCredentials, self).setUp() + + @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration") + def test_not_supported(self, mock_exception, mock_configuration): + mock_configuration.username = "" + mock_configuration.password = "" + mock_configuration.ssl_ca_cert = None + mock_configuration.cert_file = None + mock_configuration.key_file = None + exception_raised = False + try: + _ = self.libjuju.get_k8s_cloud_credential(mock_configuration) + except JujuInvalidK8sConfiguration as e: + exception_raised = True + self.assertEqual( + e.message, "authentication method not supported", + ) + self.assertTrue(exception_raised) + + def test_user_pass(self, mock_configuration): + mock_configuration.username = "admin" + mock_configuration.password = "admin" + mock_configuration.ssl_ca_cert = None + mock_configuration.cert_file = None + mock_configuration.key_file = None + credential = self.libjuju.get_k8s_cloud_credential(mock_configuration) + self.assertEqual( + credential, + juju.client._definitions.CloudCredential( + attrs={"username": "admin", "password": "admin"}, auth_type="userpass" + ), + ) + + def test_user_no_pass(self, mock_configuration): + mock_configuration.username = "admin" + mock_configuration.password = "" + mock_configuration.ssl_ca_cert = None + mock_configuration.cert_file = None + mock_configuration.key_file = None + with mock.patch.object(self.libjuju.log, "debug") as mock_debug: + credential = self.libjuju.get_k8s_cloud_credential(mock_configuration) + self.assertEqual( + credential, + juju.client._definitions.CloudCredential( + attrs={"username": "admin", "password": ""}, auth_type="userpass" + ), + ) + mock_debug.assert_called_once_with( + "credential for user admin has empty password" + ) + + def test_user_pass_with_cert(self, mock_configuration): + mock_configuration.username = "admin" + mock_configuration.password = "admin" + ssl_ca_cert = tempfile.NamedTemporaryFile() + with open(ssl_ca_cert.name, "w") as ssl_ca_cert_file: + ssl_ca_cert_file.write("cacert") + mock_configuration.ssl_ca_cert = ssl_ca_cert.name + mock_configuration.cert_file = None + mock_configuration.key_file = None + credential = self.libjuju.get_k8s_cloud_credential(mock_configuration) + self.assertEqual( + credential, + juju.client._definitions.CloudCredential( + attrs={ + "username": "admin", + "password": "admin", + "ClientCertificateData": "cacert", + }, + auth_type="userpasswithcert", + ), + ) + + def test_cert(self, mock_configuration): + mock_configuration.username = "" + mock_configuration.password = "" + mock_configuration.api_key = {"authorization": "Bearer Token"} + ssl_ca_cert = tempfile.NamedTemporaryFile() + with open(ssl_ca_cert.name, "w") as ssl_ca_cert_file: + ssl_ca_cert_file.write("cacert") + mock_configuration.ssl_ca_cert = ssl_ca_cert.name + mock_configuration.cert_file = None + mock_configuration.key_file = None + credential = self.libjuju.get_k8s_cloud_credential(mock_configuration) + self.assertEqual( + credential, + juju.client._definitions.CloudCredential( + attrs={"ClientCertificateData": "cacert", "Token": "Token"}, + auth_type="certificate", + ), + ) + + def test_oauth2(self, mock_configuration): + mock_configuration.username = "" + mock_configuration.password = "" + mock_configuration.api_key = {"authorization": "Bearer Token"} + key = tempfile.NamedTemporaryFile() + with open(key.name, "w") as key_file: + key_file.write("key") + mock_configuration.ssl_ca_cert = None + mock_configuration.cert_file = None + mock_configuration.key_file = key.name + credential = self.libjuju.get_k8s_cloud_credential(mock_configuration) + self.assertEqual( + credential, + juju.client._definitions.CloudCredential( + attrs={"ClientKeyData": "key", "Token": "Token"}, auth_type="oauth2", + ), + ) + + @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration") + def test_oauth2_missing_token(self, mock_exception, mock_configuration): + mock_configuration.username = "" + mock_configuration.password = "" + key = tempfile.NamedTemporaryFile() + with open(key.name, "w") as key_file: + key_file.write("key") + mock_configuration.ssl_ca_cert = None + mock_configuration.cert_file = None + mock_configuration.key_file = key.name + exception_raised = False + try: + _ = self.libjuju.get_k8s_cloud_credential(mock_configuration) + except JujuInvalidK8sConfiguration as e: + exception_raised = True + self.assertEqual( + e.message, "missing token for auth type oauth2", + ) + self.assertTrue(exception_raised) + + def test_unknown_api_key(self, mock_configuration): + mock_configuration.username = "" + mock_configuration.password = "" + mock_configuration.api_key = {"authorization": "Bearer Token Wrong"} + mock_configuration.ssl_ca_cert = None + mock_configuration.cert_file = None + mock_configuration.key_file = None + exception_raised = False + try: + _ = self.libjuju.get_k8s_cloud_credential(mock_configuration) + except JujuInvalidK8sConfiguration as e: + exception_raised = True + self.assertEqual( + e.message, "unknown format of api_key", + ) + self.assertTrue(exception_raised) + + def test_exception_cannot_set_token_and_userpass(self, mock_configuration): + mock_configuration.username = "admin" + mock_configuration.password = "pass" + mock_configuration.api_key = {"authorization": "No_bearer_token"} + mock_configuration.ssl_ca_cert = None + mock_configuration.cert_file = None + mock_configuration.key_file = None + exception_raised = False + try: + _ = self.libjuju.get_k8s_cloud_credential(mock_configuration) + except JujuInvalidK8sConfiguration as e: + exception_raised = True + self.assertEqual( + e.message, "Cannot set both token and user/pass", + ) + self.assertTrue(exception_raised) diff --git a/n2vc/utils.py b/n2vc/utils.py index e8cf64d..16a4733 100644 --- a/n2vc/utils.py +++ b/n2vc/utils.py @@ -12,11 +12,31 @@ # See the License for the specific language governing permissions and # limitations under the License. +import base64 +import re +import binascii from enum import Enum from juju.machine import Machine from juju.application import Application from juju.action import Action from juju.unit import Unit +from n2vc.exceptions import N2VCInvalidCertificate + + +def base64_to_cacert(b64string): + """Convert the base64-encoded string containing the VCA CACERT. + + The input string.... + + """ + try: + cacert = base64.b64decode(b64string).decode("utf-8") + + cacert = re.sub(r"\\n", r"\n", cacert,) + except binascii.Error as e: + raise N2VCInvalidCertificate(message="Invalid CA Certificate: {}".format(e)) + + return cacert class N2VCDeploymentStatus(Enum): -- 2.17.1