Fix bug 1298 47/10047/13
authorDavid Garcia <david.garcia@canonical.com>
Fri, 27 Nov 2020 14:32:02 +0000 (15:32 +0100)
committerDavid Garcia <david.garcia@canonical.com>
Sat, 28 Nov 2020 10:12:41 +0000 (11:12 +0100)
This bug was caused because a bad parsing of the kubeconfig.
The token should be the secret from k8s for the created service
account.

When adding a k8s cluster, a clusterrole, clusterrolebinding, and
serviceaccounts are created.

Tests are needed for oauth2 and userpass kubeconfigs.

Change-Id: I6a4a2834bd6477f255e8ca48e7f53cd3a0d3fddf
Signed-off-by: David Garcia <david.garcia@canonical.com>
n2vc/k8s_helm_base_conn.py
n2vc/k8s_juju_conn.py
n2vc/kubectl.py
n2vc/libjuju.py
n2vc/tests/unit/test_k8s_juju_conn.py
n2vc/tests/unit/test_kubectl.py
n2vc/tests/unit/test_libjuju.py

index 842bbe3..3e054ed 100644 (file)
@@ -840,7 +840,7 @@ class K8sHelmBaseConnector(K8sConnector):
 
                         # add repo
                         self.log.debug("add repo {}".format(db_repo["name"]))
 
                         # add repo
                         self.log.debug("add repo {}".format(db_repo["name"]))
-                        await  self.repo_add(cluster_uuid, db_repo["name"], db_repo["url"])
+                        await self.repo_add(cluster_uuid, db_repo["name"], db_repo["url"])
                         added_repo_dict[repo_id] = db_repo["name"]
                 except Exception as e:
                     raise K8sException(
                         added_repo_dict[repo_id] = db_repo["name"]
                 except Exception as e:
                     raise K8sException(
index 7e58deb..3d58385 100644 (file)
@@ -17,19 +17,45 @@ import os
 import uuid
 import yaml
 import tempfile
 import uuid
 import yaml
 import tempfile
+import binascii
+import base64
 
 from n2vc.exceptions import K8sException, N2VCBadArgumentsException
 from n2vc.k8s_conn import K8sConnector
 
 from n2vc.exceptions import K8sException, N2VCBadArgumentsException
 from n2vc.k8s_conn import K8sConnector
-from n2vc.kubectl import Kubectl
+from n2vc.kubectl import Kubectl, CORE_CLIENT, RBAC_CLIENT
 from .exceptions import MethodNotImplemented
 from n2vc.utils import base64_to_cacert
 from n2vc.libjuju import Libjuju
 
 from .exceptions import MethodNotImplemented
 from n2vc.utils import base64_to_cacert
 from n2vc.libjuju import Libjuju
 
+from kubernetes.client.models import (
+    V1ClusterRole,
+    V1ObjectMeta,
+    V1PolicyRule,
+    V1ServiceAccount,
+    V1ClusterRoleBinding,
+    V1RoleRef,
+    V1Subject,
+)
+
+from typing import Dict
+
+SERVICE_ACCOUNT_TOKEN_KEY = "token"
+SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
+RBAC_LABEL_KEY_NAME = "rbac-id"
+
+ADMIN_NAMESPACE = "kube-system"
+RBAC_STACK_PREFIX = "juju-credential"
 
 # from juju.bundle import BundleHandler
 # import re
 # import ssl
 # from .vnf import N2VC
 
 # from juju.bundle import BundleHandler
 # import re
 # import ssl
 # from .vnf import N2VC
+
+
+def generate_rbac_id():
+    return binascii.hexlify(os.urandom(4)).decode()
+
+
 class K8sJujuConnector(K8sConnector):
     def __init__(
         self,
 class K8sJujuConnector(K8sConnector):
     def __init__(
         self,
@@ -212,46 +238,107 @@ class K8sJujuConnector(K8sConnector):
 
         # Store the cluster configuration so it
         # can be used for subsequent calls
 
         # Store the cluster configuration so it
         # can be used for subsequent calls
-
         kubecfg = tempfile.NamedTemporaryFile()
         with open(kubecfg.name, "w") as kubecfg_file:
             kubecfg_file.write(k8s_creds)
         kubectl = Kubectl(config_file=kubecfg.name)
         kubecfg = tempfile.NamedTemporaryFile()
         with open(kubecfg.name, "w") as kubecfg_file:
             kubecfg_file.write(k8s_creds)
         kubectl = Kubectl(config_file=kubecfg.name)
-        configuration = kubectl.get_configuration()
-        default_storage_class = kubectl.get_default_storage_class()
-        await self.libjuju.add_k8s(
-            name=cluster_uuid,
-            configuration=configuration,
-            storage_class=default_storage_class,
-            credential_name=self._get_credential_name(cluster_uuid),
-        )
-        # self.log.debug("Setting config")
-        # await self.set_config(cluster_uuid, config)
 
 
-        # Test connection
-        # controller = await self.get_controller(cluster_uuid)
-        # await controller.disconnect()
+        # CREATING RESOURCES IN K8S
+        rbac_id = generate_rbac_id()
+        metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
+        labels = {RBAC_STACK_PREFIX: rbac_id}
 
 
-        # TODO: Remove these commented lines
-        # raise Exception("EOL")
-        # self.juju_public_key = None
-        # Login to the k8s cluster
-        # if not self.authenticated:
-        #     await self.login(cluster_uuid)
+        # Create cleanup dictionary to clean up created resources
+        # if it fails in the middle of the process
+        cleanup_data = []
+        try:
+            self._create_cluster_role(
+                kubectl,
+                name=metadata_name,
+                labels=labels,
+            )
+            cleanup_data.append(
+                {
+                    "delete": self._delete_cluster_role,
+                    "args": (kubectl, metadata_name),
+                }
+            )
 
 
-        # We're creating a new cluster
-        # print("Getting model {}".format(self.get_namespace(cluster_uuid),
-        #    cluster_uuid=cluster_uuid))
-        # model = await self.get_model(
-        #    self.get_namespace(cluster_uuid),
-        #    cluster_uuid=cluster_uuid
-        # )
+            self._create_service_account(
+                kubectl,
+                name=metadata_name,
+                labels=labels,
+            )
+            cleanup_data.append(
+                {
+                    "delete": self._delete_service_account,
+                    "args": (kubectl, metadata_name),
+                }
+            )
+
+            self._create_cluster_role_binding(
+                kubectl,
+                name=metadata_name,
+                labels=labels,
+            )
+            cleanup_data.append(
+                {
+                    "delete": self._delete_service_account,
+                    "args": (kubectl, metadata_name),
+                }
+            )
+            token, client_cert_data = await self._get_secret_data(
+                kubectl,
+                metadata_name,
+            )
 
 
-        # Disconnect from the model
-        # if model and model.is_connected():
-        #    await model.disconnect()
+            default_storage_class = kubectl.get_default_storage_class()
+            await self.libjuju.add_k8s(
+                name=cluster_uuid,
+                rbac_id=rbac_id,
+                token=token,
+                client_cert_data=client_cert_data,
+                configuration=kubectl.configuration,
+                storage_class=default_storage_class,
+                credential_name=self._get_credential_name(cluster_uuid),
+            )
+            # self.log.debug("Setting config")
+            # await self.set_config(cluster_uuid, config)
+
+            # Test connection
+            # controller = await self.get_controller(cluster_uuid)
+            # await controller.disconnect()
+
+            # TODO: Remove these commented lines
+            # raise Exception("EOL")
+            # self.juju_public_key = None
+            # Login to the k8s cluster
+            # if not self.authenticated:
+            #     await self.login(cluster_uuid)
+
+            # We're creating a new cluster
+            # print("Getting model {}".format(self.get_namespace(cluster_uuid),
+            #    cluster_uuid=cluster_uuid))
+            # model = await self.get_model(
+            #    self.get_namespace(cluster_uuid),
+            #    cluster_uuid=cluster_uuid
+            # )
 
 
-        return cluster_uuid, True
+            # Disconnect from the model
+            # if model and model.is_connected():
+            #    await model.disconnect()
+
+            return cluster_uuid, True
+        except Exception as e:
+            self.log.error("Error initializing k8scluster: {}".format(e))
+            if len(cleanup_data) > 0:
+                self.log.debug("Cleaning up created resources in k8s cluster...")
+                for item in cleanup_data:
+                    delete_function = item["delete"]
+                    delete_args = item["args"]
+                    delete_function(*delete_args)
+                self.log.debug("Cleanup finished")
+            raise e
 
     """Repo Management"""
 
 
     """Repo Management"""
 
@@ -292,7 +379,6 @@ class K8sJujuConnector(K8sConnector):
         """
 
         try:
         """
 
         try:
-
             # Remove k8scluster from database
             # self.log.debug("[reset] Removing k8scluster from juju database")
             # juju_db = self.db.get_one("admin", {"_id": "juju"})
             # Remove k8scluster from database
             # self.log.debug("[reset] Removing k8scluster from juju database")
             # juju_db = self.db.get_one("admin", {"_id": "juju"})
@@ -310,12 +396,41 @@ class K8sJujuConnector(K8sConnector):
             # Destroy the controller (via CLI)
             # self.log.debug("[reset] Destroying controller")
             # await self.destroy_controller(cluster_uuid)
             # Destroy the controller (via CLI)
             # self.log.debug("[reset] Destroying controller")
             # await self.destroy_controller(cluster_uuid)
-
             self.log.debug("[reset] Removing k8s cloud")
             # k8s_cloud = "k8s-{}".format(cluster_uuid)
             # await self.remove_cloud(k8s_cloud)
             self.log.debug("[reset] Removing k8s cloud")
             # k8s_cloud = "k8s-{}".format(cluster_uuid)
             # await self.remove_cloud(k8s_cloud)
+
+            cloud_creds = await self.libjuju.get_cloud_credentials(
+                cluster_uuid,
+                self._get_credential_name(cluster_uuid),
+            )
+
             await self.libjuju.remove_cloud(cluster_uuid)
 
             await self.libjuju.remove_cloud(cluster_uuid)
 
+            kubecfg = self.get_credentials(cluster_uuid=cluster_uuid)
+
+            kubecfg_file = tempfile.NamedTemporaryFile()
+            with open(kubecfg_file.name, "w") as f:
+                f.write(kubecfg)
+            kubectl = Kubectl(config_file=kubecfg_file.name)
+
+            delete_functions = [
+                self._delete_cluster_role_binding,
+                self._delete_service_account,
+                self._delete_cluster_role,
+            ]
+
+            credential_attrs = cloud_creds[0].result["attrs"]
+            if RBAC_LABEL_KEY_NAME in credential_attrs:
+                rbac_id = credential_attrs[RBAC_LABEL_KEY_NAME]
+                metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
+                delete_args = (kubectl, metadata_name)
+                for delete_func in delete_functions:
+                    try:
+                        delete_func(*delete_args)
+                    except Exception as e:
+                        self.log.warning("Cannot remove resource in K8s {}".format(e))
+
         except Exception as e:
             self.log.debug("Caught exception during reset: {}".format(e))
             raise e
         except Exception as e:
             self.log.debug("Caught exception during reset: {}".format(e))
             raise e
@@ -1243,3 +1358,122 @@ class K8sJujuConnector(K8sConnector):
     #         q_filter={"_id": "juju"},
     #         update_dict={"k8sclusters": k8sclusters},
     #     )
     #         q_filter={"_id": "juju"},
     #         update_dict={"k8sclusters": k8sclusters},
     #     )
+
+    # Private methods to create/delete needed resources in the
+    # Kubernetes cluster to create the K8s cloud in Juju
+
+    def _create_cluster_role(
+        self,
+        kubectl: Kubectl,
+        name: str,
+        labels: Dict[str, str],
+    ):
+        cluster_roles = kubectl.clients[RBAC_CLIENT].list_cluster_role(
+            field_selector="metadata.name={}".format(name)
+        )
+
+        if len(cluster_roles.items) > 0:
+            raise Exception(
+                "Cluster role with metadata.name={} already exists".format(name)
+            )
+
+        metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
+        # Cluster role
+        cluster_role = V1ClusterRole(
+            metadata=metadata,
+            rules=[
+                V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
+                V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
+            ],
+        )
+
+        kubectl.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
+
+    def _delete_cluster_role(self, kubectl: Kubectl, name: str):
+        kubectl.clients[RBAC_CLIENT].delete_cluster_role(name)
+
+    def _create_service_account(
+        self,
+        kubectl: Kubectl,
+        name: str,
+        labels: Dict[str, str],
+    ):
+        service_accounts = kubectl.clients[CORE_CLIENT].list_namespaced_service_account(
+            ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
+        )
+        if len(service_accounts.items) > 0:
+            raise Exception(
+                "Service account with metadata.name={} already exists".format(name)
+            )
+
+        metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
+        service_account = V1ServiceAccount(metadata=metadata)
+
+        kubectl.clients[CORE_CLIENT].create_namespaced_service_account(
+            ADMIN_NAMESPACE, service_account
+        )
+
+    def _delete_service_account(self, kubectl: Kubectl, name: str):
+        kubectl.clients[CORE_CLIENT].delete_namespaced_service_account(
+            name, ADMIN_NAMESPACE
+        )
+
+    def _create_cluster_role_binding(
+        self,
+        kubectl: Kubectl,
+        name: str,
+        labels: Dict[str, str],
+    ):
+        role_bindings = kubectl.clients[RBAC_CLIENT].list_cluster_role_binding(
+            field_selector="metadata.name={}".format(name)
+        )
+        if len(role_bindings.items) > 0:
+            raise Exception("Generated rbac id already exists")
+
+        role_binding = V1ClusterRoleBinding(
+            metadata=V1ObjectMeta(name=name, labels=labels),
+            role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
+            subjects=[
+                V1Subject(kind="ServiceAccount", name=name, namespace=ADMIN_NAMESPACE)
+            ],
+        )
+        kubectl.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
+
+    def _delete_cluster_role_binding(self, kubectl: Kubectl, name: str):
+        kubectl.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
+
+    async def _get_secret_data(self, kubectl: Kubectl, name: str) -> (str, str):
+        v1_core = kubectl.clients[CORE_CLIENT]
+
+        retries_limit = 10
+        secret_name = None
+        while True:
+            retries_limit -= 1
+            service_accounts = v1_core.list_namespaced_service_account(
+                ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
+            )
+            if len(service_accounts.items) == 0:
+                raise Exception(
+                    "Service account not found with metadata.name={}".format(name)
+                )
+            service_account = service_accounts.items[0]
+            if service_account.secrets and len(service_account.secrets) > 0:
+                secret_name = service_account.secrets[0].name
+            if secret_name is not None or not retries_limit:
+                break
+        if not secret_name:
+            raise Exception(
+                "Failed getting the secret from service account {}".format(name)
+            )
+        secret = v1_core.list_namespaced_secret(
+            ADMIN_NAMESPACE,
+            field_selector="metadata.name={}".format(secret_name),
+        ).items[0]
+
+        token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
+        client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
+
+        return (
+            base64.b64decode(token).decode("utf-8"),
+            base64.b64decode(client_certificate_data).decode("utf-8"),
+        )
index 31b6f55..bc8c392 100644 (file)
 #     See the License for the specific language governing permissions and
 #     limitations under the License.
 
 #     See the License for the specific language governing permissions and
 #     limitations under the License.
 
+import logging
+
 from kubernetes import client, config
 from kubernetes.client.rest import ApiException
 from kubernetes import client, config
 from kubernetes.client.rest import ApiException
-import logging
+
+
+CORE_CLIENT = "core_v1"
+STORAGE_CLIENT = "storage_v1"
+RBAC_CLIENT = "rbac_v1"
 
 
 class Kubectl:
     def __init__(self, config_file=None):
         config.load_kube_config(config_file=config_file)
 
 
 class Kubectl:
     def __init__(self, config_file=None):
         config.load_kube_config(config_file=config_file)
+        self._clients = {
+            "core_v1": client.CoreV1Api(),
+            "storage_v1": client.StorageV1Api(),
+            "rbac_v1": client.RbacAuthorizationV1Api(),
+        }
+        self._configuration = config.kube_config.Configuration()
         self.logger = logging.getLogger("Kubectl")
 
         self.logger = logging.getLogger("Kubectl")
 
-    def get_configuration(self):
-        return config.kube_config.Configuration()
+    @property
+    def configuration(self):
+        return self._configuration
+
+    @property
+    def clients(self):
+        return self._clients
 
     def get_services(self, field_selector=None, label_selector=None):
         kwargs = {}
 
     def get_services(self, field_selector=None, label_selector=None):
         kwargs = {}
@@ -31,10 +48,8 @@ class Kubectl:
             kwargs["field_selector"] = field_selector
         if label_selector:
             kwargs["label_selector"] = label_selector
             kwargs["field_selector"] = field_selector
         if label_selector:
             kwargs["label_selector"] = label_selector
-
         try:
         try:
-            v1 = client.CoreV1Api()
-            result = v1.list_service_for_all_namespaces(**kwargs)
+            result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs)
             return [
                 {
                     "name": i.metadata.name,
             return [
                 {
                     "name": i.metadata.name,
@@ -70,9 +85,7 @@ class Kubectl:
                     If not, it returns the first storage class.
                     If there are not storage classes, returns None
         """
                     If not, it returns the first storage class.
                     If there are not storage classes, returns None
         """
-
-        storagev1 = client.StorageV1Api()
-        storage_classes = storagev1.list_storage_class()
+        storage_classes = self.clients[STORAGE_CLIENT].list_storage_class()
         selected_sc = None
         default_sc_annotations = {
             "storageclass.kubernetes.io/is-default-class": "true",
         selected_sc = None
         default_sc_annotations = {
             "storageclass.kubernetes.io/is-default-class": "true",
index 5a3ddbb..a79d00d 100644 (file)
@@ -14,8 +14,7 @@
 
 import asyncio
 import logging
 
 import asyncio
 import logging
-from juju.controller import Controller
-from juju.client import client
+
 import time
 
 from juju.errors import JujuAPIError
 import time
 
 from juju.errors import JujuAPIError
@@ -29,6 +28,10 @@ from juju.client._definitions import (
     Cloud,
     CloudCredential,
 )
     Cloud,
     CloudCredential,
 )
+from juju.controller import Controller
+from juju.client import client
+from juju import tag
+
 from n2vc.juju_watcher import JujuModelWatcher
 from n2vc.provisioner import AsyncSSHProvisioner
 from n2vc.n2vc_conn import N2VCConnector
 from n2vc.juju_watcher import JujuModelWatcher
 from n2vc.provisioner import AsyncSSHProvisioner
 from n2vc.n2vc_conn import N2VCConnector
@@ -46,6 +49,8 @@ from n2vc.utils import DB_DATA
 from osm_common.dbbase import DbException
 from kubernetes.client.configuration import Configuration
 
 from osm_common.dbbase import DbException
 from kubernetes.client.configuration import Configuration
 
+RBAC_LABEL_KEY_NAME = "rbac-id"
+
 
 class Libjuju:
     def __init__(
 
 class Libjuju:
     def __init__(
@@ -654,7 +659,8 @@ class Libjuju:
         try:
             # Get application
             application = self._get_application(
         try:
             # Get application
             application = self._get_application(
-                model, application_name=application_name,
+                model,
+                application_name=application_name,
             )
             if application is None:
                 raise JujuApplicationNotFound("Cannot execute action")
             )
             if application is None:
                 raise JujuApplicationNotFound("Cannot execute action")
@@ -744,7 +750,8 @@ class Libjuju:
         try:
             # Get application
             application = self._get_application(
         try:
             # Get application
             application = self._get_application(
-                model, application_name=application_name,
+                model,
+                application_name=application_name,
             )
 
             # Return list of actions
             )
 
             # Return list of actions
@@ -776,7 +783,10 @@ class Libjuju:
         return metrics
 
     async def add_relation(
         return metrics
 
     async def add_relation(
-        self, model_name: str, endpoint_1: str, endpoint_2: str,
+        self,
+        model_name: str,
+        endpoint_1: str,
+        endpoint_2: str,
     ):
         """Add relation
 
     ):
         """Add relation
 
@@ -811,7 +821,9 @@ class Libjuju:
             await self.disconnect_controller(controller)
 
     async def consume(
             await self.disconnect_controller(controller)
 
     async def consume(
-        self, offer_url: str, model_name: str,
+        self,
+        offer_url: str,
+        model_name: str,
     ):
         """
         Adds a remote offer to the model. Relations can be created later using "juju relate".
     ):
         """
         Adds a remote offer to the model. Relations can be created later using "juju relate".
@@ -958,7 +970,8 @@ class Libjuju:
             try:
                 model = await self.get_model(controller, model_name)
                 application = self._get_application(
             try:
                 model = await self.get_model(controller, model_name)
                 application = self._get_application(
-                    model, application_name=application_name,
+                    model,
+                    application_name=application_name,
                 )
                 await application.set_config(config)
             finally:
                 )
                 await application.set_config(config)
             finally:
@@ -999,7 +1012,8 @@ class Libjuju:
         if not juju_info:
             try:
                 self.db.create(
         if not juju_info:
             try:
                 self.db.create(
-                    DB_DATA.api_endpoints.table, DB_DATA.api_endpoints.filter,
+                    DB_DATA.api_endpoints.table,
+                    DB_DATA.api_endpoints.filter,
                 )
             except DbException as e:
                 # Racing condition: check if another N2VC worker has created it
                 )
             except DbException as e:
                 # Racing condition: check if another N2VC worker has created it
@@ -1071,6 +1085,9 @@ class Libjuju:
     async def add_k8s(
         self,
         name: str,
     async def add_k8s(
         self,
         name: str,
+        rbac_id: str,
+        token: str,
+        client_cert_data: str,
         configuration: Configuration,
         storage_class: str,
         credential_name: str = None,
         configuration: Configuration,
         storage_class: str,
         credential_name: str = None,
@@ -1094,17 +1111,17 @@ class Libjuju:
             raise Exception("configuration must be provided")
 
         endpoint = configuration.host
             raise Exception("configuration must be provided")
 
         endpoint = configuration.host
-        credential = self.get_k8s_cloud_credential(configuration)
-        ca_certificates = (
-            [credential.attrs["ClientCertificateData"]]
-            if "ClientCertificateData" in credential.attrs
-            else []
+        credential = self.get_k8s_cloud_credential(
+            configuration,
+            client_cert_data,
+            token,
         )
         )
+        credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id
         cloud = client.Cloud(
             type_="kubernetes",
             auth_types=[credential.auth_type],
             endpoint=endpoint,
         cloud = client.Cloud(
             type_="kubernetes",
             auth_types=[credential.auth_type],
             endpoint=endpoint,
-            ca_certificates=ca_certificates,
+            ca_certificates=[client_cert_data],
             config={
                 "operator-storage": storage_class,
                 "workload-storage": storage_class,
             config={
                 "operator-storage": storage_class,
                 "workload-storage": storage_class,
@@ -1116,30 +1133,21 @@ class Libjuju:
         )
 
     def get_k8s_cloud_credential(
         )
 
     def get_k8s_cloud_credential(
-        self, configuration: Configuration,
+        self,
+        configuration: Configuration,
+        client_cert_data: str,
+        token: str = None,
     ) -> client.CloudCredential:
         attrs = {}
     ) -> client.CloudCredential:
         attrs = {}
-        ca_cert = configuration.ssl_ca_cert or configuration.cert_file
-        key = configuration.key_file
-        api_key = configuration.api_key
-        token = None
+        # TODO: Test with AKS
+        key = None  # open(configuration.key_file, "r").read()
         username = configuration.username
         password = configuration.password
 
         username = configuration.username
         password = configuration.password
 
-        if "authorization" in api_key:
-            authorization = api_key["authorization"]
-            if "Bearer " in authorization:
-                bearer_list = authorization.split(" ")
-                if len(bearer_list) == 2:
-                    [_, token] = bearer_list
-                else:
-                    raise JujuInvalidK8sConfiguration("unknown format of api_key")
-            else:
-                token = authorization
-        if ca_cert:
-            attrs["ClientCertificateData"] = open(ca_cert, "r").read()
+        if client_cert_data:
+            attrs["ClientCertificateData"] = client_cert_data
         if key:
         if key:
-            attrs["ClientKeyData"] = open(key, "r").read()
+            attrs["ClientKeyData"] = key
         if token:
             if username or password:
                 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
         if token:
             if username or password:
                 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
@@ -1148,6 +1156,8 @@ class Libjuju:
         auth_type = None
         if key:
             auth_type = "oauth2"
         auth_type = None
         if key:
             auth_type = "oauth2"
+            if client_cert_data:
+                auth_type = "oauth2withcert"
             if not token:
                 raise JujuInvalidK8sConfiguration(
                     "missing token for auth type {}".format(auth_type)
             if not token:
                 raise JujuInvalidK8sConfiguration(
                     "missing token for auth type {}".format(auth_type)
@@ -1159,11 +1169,11 @@ class Libjuju:
                 )
             attrs["username"] = username
             attrs["password"] = password
                 )
             attrs["username"] = username
             attrs["password"] = password
-            if ca_cert:
+            if client_cert_data:
                 auth_type = "userpasswithcert"
             else:
                 auth_type = "userpass"
                 auth_type = "userpasswithcert"
             else:
                 auth_type = "userpass"
-        elif ca_cert and token:
+        elif client_cert_data and token:
             auth_type = "certificate"
         else:
             raise JujuInvalidK8sConfiguration("authentication method not supported")
             auth_type = "certificate"
         else:
             raise JujuInvalidK8sConfiguration("authentication method not supported")
@@ -1218,3 +1228,13 @@ class Libjuju:
                 unit = u
                 break
         return unit
                 unit = u
                 break
         return unit
+
+    async def get_cloud_credentials(self, cloud_name: str, credential_name: str):
+        controller = await self.get_controller()
+        try:
+            facade = client.CloudFacade.from_connection(controller.connection())
+            cloud_cred_tag = tag.credential(cloud_name, self.username, credential_name)
+            params = [client.Entity(cloud_cred_tag)]
+            return (await facade.Credential(params)).results
+        finally:
+            await self.disconnect_controller(controller)
index 50e827e..388c08f 100644 (file)
 import asyncio
 import logging
 import asynctest
 import asyncio
 import logging
 import asynctest
-from n2vc.k8s_juju_conn import K8sJujuConnector
+from unittest.mock import Mock
+from n2vc.k8s_juju_conn import K8sJujuConnector, RBAC_LABEL_KEY_NAME
 from osm_common import fslocal
 from osm_common import fslocal
-from .utils import kubeconfig, FakeModel, FakeFileWrapper
+from .utils import kubeconfig, FakeModel, FakeFileWrapper, AsyncMock
 from n2vc.exceptions import (
     MethodNotImplemented,
     K8sException,
     N2VCBadArgumentsException,
 )
 from n2vc.exceptions import (
     MethodNotImplemented,
     K8sException,
     N2VCBadArgumentsException,
 )
-from unittest.mock import Mock
-from .utils import AsyncMock
 
 
 class K8sJujuConnTestCase(asynctest.TestCase):
 
 
 class K8sJujuConnTestCase(asynctest.TestCase):
@@ -193,12 +192,17 @@ class K8sJujuConnectorInitFailureTestCase(asynctest.TestCase):
             )
 
 
             )
 
 
+@asynctest.mock.patch("n2vc.kubectl.Kubectl.get_default_storage_class")
 class InitEnvTest(K8sJujuConnTestCase):
     def setUp(self):
         super(InitEnvTest, self).setUp()
         self.k8s_juju_conn.libjuju.add_k8s = AsyncMock()
 class InitEnvTest(K8sJujuConnTestCase):
     def setUp(self):
         super(InitEnvTest, self).setUp()
         self.k8s_juju_conn.libjuju.add_k8s = AsyncMock()
+        self.k8s_juju_conn._create_cluster_role = Mock()
+        self.k8s_juju_conn._create_service_account = Mock()
+        self.k8s_juju_conn._create_cluster_role_binding = Mock()
+        self.k8s_juju_conn._get_secret_data = AsyncMock()
+        self.k8s_juju_conn._get_secret_data.return_value = ("token", "cacert")
 
 
-    @asynctest.mock.patch("n2vc.kubectl.Kubectl.get_default_storage_class")
     def test_with_cluster_uuid(
         self,
         mock_get_default_storage_class,
     def test_with_cluster_uuid(
         self,
         mock_get_default_storage_class,
@@ -215,8 +219,10 @@ class InitEnvTest(K8sJujuConnTestCase):
         mock_get_default_storage_class.assert_called_once()
         self.k8s_juju_conn.libjuju.add_k8s.assert_called_once()
 
         mock_get_default_storage_class.assert_called_once()
         self.k8s_juju_conn.libjuju.add_k8s.assert_called_once()
 
-    @asynctest.mock.patch("n2vc.kubectl.Kubectl.get_default_storage_class")
-    def test_with_no_cluster_uuid(self, mock_get_default_storage_class):
+    def test_with_no_cluster_uuid(
+        self,
+        mock_get_default_storage_class,
+    ):
         uuid, created = self.loop.run_until_complete(
             self.k8s_juju_conn.init_env(k8s_creds=kubeconfig)
         )
         uuid, created = self.loop.run_until_complete(
             self.k8s_juju_conn.init_env(k8s_creds=kubeconfig)
         )
@@ -226,8 +232,10 @@ class InitEnvTest(K8sJujuConnTestCase):
         mock_get_default_storage_class.assert_called_once()
         self.k8s_juju_conn.libjuju.add_k8s.assert_called_once()
 
         mock_get_default_storage_class.assert_called_once()
         self.k8s_juju_conn.libjuju.add_k8s.assert_called_once()
 
-    @asynctest.mock.patch("n2vc.kubectl.Kubectl.get_default_storage_class")
-    def test_init_env_exception(self, mock_get_default_storage_class):
+    def test_init_env_exception(
+        self,
+        mock_get_default_storage_class,
+    ):
         self.k8s_juju_conn.libjuju.add_k8s.side_effect = Exception()
         created = None
         uuid = None
         self.k8s_juju_conn.libjuju.add_k8s.side_effect = Exception()
         created = None
         uuid = None
@@ -283,6 +291,15 @@ class ResetTest(K8sJujuConnTestCase):
     def setUp(self):
         super(ResetTest, self).setUp()
         self.k8s_juju_conn.libjuju.remove_cloud = AsyncMock()
     def setUp(self):
         super(ResetTest, self).setUp()
         self.k8s_juju_conn.libjuju.remove_cloud = AsyncMock()
+        self.k8s_juju_conn.libjuju.get_cloud_credentials = AsyncMock()
+        cloud_creds = Mock()
+        cloud_creds.result = {"attrs": {RBAC_LABEL_KEY_NAME: "asd"}}
+        self.k8s_juju_conn.libjuju.get_cloud_credentials.return_value = [cloud_creds]
+        self.k8s_juju_conn._delete_cluster_role_binding = Mock()
+        self.k8s_juju_conn._delete_service_account = Mock()
+        self.k8s_juju_conn._delete_cluster_role = Mock()
+        self.k8s_juju_conn.get_credentials = Mock()
+        self.k8s_juju_conn.get_credentials.return_value = kubeconfig
 
     def test_success(self):
         removed = self.loop.run_until_complete(self.k8s_juju_conn.reset("uuid"))
 
     def test_success(self):
         removed = self.loop.run_until_complete(self.k8s_juju_conn.reset("uuid"))
index 8fb0310..28d66c5 100644 (file)
@@ -13,7 +13,7 @@
 #     limitations under the License.
 
 from unittest import TestCase, mock
 #     limitations under the License.
 
 from unittest import TestCase, mock
-from n2vc.kubectl import Kubectl
+from n2vc.kubectl import Kubectl, CORE_CLIENT
 from n2vc.utils import Dict
 from kubernetes.client.rest import ApiException
 
 from n2vc.utils import Dict
 from kubernetes.client.rest import ApiException
 
@@ -114,7 +114,9 @@ fake_list_services = Dict(
 
 
 class KubectlTestCase(TestCase):
 
 
 class KubectlTestCase(TestCase):
-    def setUp(self,):
+    def setUp(
+        self,
+    ):
         pass
 
 
         pass
 
 
@@ -123,7 +125,7 @@ class FakeCoreV1Api:
         return fake_list_services
 
 
         return fake_list_services
 
 
-class ProvisionerTest(TestCase):
+class GetServices(TestCase):
     @mock.patch("n2vc.kubectl.config.load_kube_config")
     @mock.patch("n2vc.kubectl.client.CoreV1Api")
     def setUp(self, mock_core, mock_config):
     @mock.patch("n2vc.kubectl.config.load_kube_config")
     @mock.patch("n2vc.kubectl.client.CoreV1Api")
     def setUp(self, mock_core, mock_config):
@@ -140,9 +142,10 @@ class ProvisionerTest(TestCase):
         keys = ["name", "cluster_ip", "type", "ports", "external_ip"]
         self.assertTrue(k in service for service in services for k in keys)
 
         keys = ["name", "cluster_ip", "type", "ports", "external_ip"]
         self.assertTrue(k in service for service in services for k in keys)
 
-    @mock.patch("n2vc.kubectl.client.CoreV1Api.list_service_for_all_namespaces")
-    def test_get_service_exception(self, list_services):
-        list_services.side_effect = ApiException()
+    def test_get_service_exception(self):
+        self.kubectl.clients[
+            CORE_CLIENT
+        ].list_service_for_all_namespaces.side_effect = ApiException()
         with self.assertRaises(ApiException):
             self.kubectl.get_services()
 
         with self.assertRaises(ApiException):
             self.kubectl.get_services()
 
@@ -155,7 +158,7 @@ class GetConfiguration(KubectlTestCase):
 
     def test_get_configuration(self, mock_load_kube_config, mock_configuration):
         kubectl = Kubectl()
 
     def test_get_configuration(self, mock_load_kube_config, mock_configuration):
         kubectl = Kubectl()
-        kubectl.get_configuration()
+        kubectl.configuration
         mock_configuration.assert_called_once()
 
 
         mock_configuration.assert_called_once()
 
 
index 7bea6b3..b7c7901 100644 (file)
@@ -32,6 +32,7 @@ from n2vc.exceptions import (
     JujuInvalidK8sConfiguration,
     JujuLeaderUnitNotFound,
 )
     JujuInvalidK8sConfiguration,
     JujuLeaderUnitNotFound,
 )
+from n2vc.k8s_juju_conn import generate_rbac_id
 
 
 class LibjujuTestCase(asynctest.TestCase):
 
 
 class LibjujuTestCase(asynctest.TestCase):
@@ -1431,45 +1432,56 @@ class ConsumeTest(LibjujuTestCase):
 class AddK8sTest(LibjujuTestCase):
     def setUp(self):
         super(AddK8sTest, self).setUp()
 class AddK8sTest(LibjujuTestCase):
     def setUp(self):
         super(AddK8sTest, self).setUp()
-        self.configuration = kubernetes.client.configuration.Configuration()
+        name = "cloud"
+        rbac_id = generate_rbac_id()
+        token = "token"
+        client_cert_data = "cert"
+        configuration = kubernetes.client.configuration.Configuration()
+        storage_class = "storage_class"
+        credential_name = name
+
+        self._add_k8s_args = {
+            "name": name,
+            "rbac_id": rbac_id,
+            "token": token,
+            "client_cert_data": client_cert_data,
+            "configuration": configuration,
+            "storage_class": storage_class,
+            "credential_name": credential_name,
+        }
 
     def test_add_k8s(self, mock_add_cloud, mock_get_k8s_cloud_credential):
 
     def test_add_k8s(self, mock_add_cloud, mock_get_k8s_cloud_credential):
-        self.loop.run_until_complete(
-            self.libjuju.add_k8s("cloud", self.configuration, "storage_class")
-        )
+        self.loop.run_until_complete(self.libjuju.add_k8s(**self._add_k8s_args))
         mock_add_cloud.assert_called_once()
         mock_get_k8s_cloud_credential.assert_called_once()
 
     def test_add_k8s_exception(self, mock_add_cloud, mock_get_k8s_cloud_credential):
         mock_add_cloud.side_effect = Exception()
         with self.assertRaises(Exception):
         mock_add_cloud.assert_called_once()
         mock_get_k8s_cloud_credential.assert_called_once()
 
     def test_add_k8s_exception(self, mock_add_cloud, mock_get_k8s_cloud_credential):
         mock_add_cloud.side_effect = Exception()
         with self.assertRaises(Exception):
-            self.loop.run_until_complete(
-                self.libjuju.add_k8s("cloud", self.configuration, "storage_class")
-            )
+            self.loop.run_until_complete(self.libjuju.add_k8s(**self._add_k8s_args))
         mock_add_cloud.assert_called_once()
         mock_get_k8s_cloud_credential.assert_called_once()
 
     def test_add_k8s_missing_name(self, mock_add_cloud, mock_get_k8s_cloud_credential):
         mock_add_cloud.assert_called_once()
         mock_get_k8s_cloud_credential.assert_called_once()
 
     def test_add_k8s_missing_name(self, mock_add_cloud, mock_get_k8s_cloud_credential):
+        self._add_k8s_args["name"] = ""
         with self.assertRaises(Exception):
         with self.assertRaises(Exception):
-            self.loop.run_until_complete(
-                self.libjuju.add_k8s("", self.configuration, "storage_class")
-            )
+            self.loop.run_until_complete(self.libjuju.add_k8s(**self._add_k8s_args))
         mock_add_cloud.assert_not_called()
 
     def test_add_k8s_missing_storage_name(
         self, mock_add_cloud, mock_get_k8s_cloud_credential
     ):
         mock_add_cloud.assert_not_called()
 
     def test_add_k8s_missing_storage_name(
         self, mock_add_cloud, mock_get_k8s_cloud_credential
     ):
+        self._add_k8s_args["storage_class"] = ""
         with self.assertRaises(Exception):
         with self.assertRaises(Exception):
-            self.loop.run_until_complete(
-                self.libjuju.add_k8s("cloud", self.configuration, "")
-            )
+            self.loop.run_until_complete(self.libjuju.add_k8s(**self._add_k8s_args))
         mock_add_cloud.assert_not_called()
 
     def test_add_k8s_missing_configuration_keys(
         self, mock_add_cloud, mock_get_k8s_cloud_credential
     ):
         mock_add_cloud.assert_not_called()
 
     def test_add_k8s_missing_configuration_keys(
         self, mock_add_cloud, mock_get_k8s_cloud_credential
     ):
+        self._add_k8s_args["configuration"] = None
         with self.assertRaises(Exception):
         with self.assertRaises(Exception):
-            self.loop.run_until_complete(self.libjuju.add_k8s("cloud", None, ""))
+            self.loop.run_until_complete(self.libjuju.add_k8s(**self._add_k8s_args))
         mock_add_cloud.assert_not_called()
 
 
         mock_add_cloud.assert_not_called()
 
 
@@ -1596,6 +1608,8 @@ class RemoveCloudTest(LibjujuTestCase):
 class GetK8sCloudCredentials(LibjujuTestCase):
     def setUp(self):
         super(GetK8sCloudCredentials, self).setUp()
 class GetK8sCloudCredentials(LibjujuTestCase):
     def setUp(self):
         super(GetK8sCloudCredentials, self).setUp()
+        self.cert_data = "cert"
+        self.token = "token"
 
     @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration")
     def test_not_supported(self, mock_exception, mock_configuration):
 
     @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration")
     def test_not_supported(self, mock_exception, mock_configuration):
@@ -1605,8 +1619,14 @@ class GetK8sCloudCredentials(LibjujuTestCase):
         mock_configuration.cert_file = None
         mock_configuration.key_file = None
         exception_raised = False
         mock_configuration.cert_file = None
         mock_configuration.key_file = None
         exception_raised = False
+        self.token = None
+        self.cert_data = None
         try:
         try:
-            _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+            _ = self.libjuju.get_k8s_cloud_credential(
+                mock_configuration,
+                self.cert_data,
+                self.token,
+            )
         except JujuInvalidK8sConfiguration as e:
             exception_raised = True
             self.assertEqual(
         except JujuInvalidK8sConfiguration as e:
             exception_raised = True
             self.assertEqual(
@@ -1621,7 +1641,13 @@ class GetK8sCloudCredentials(LibjujuTestCase):
         mock_configuration.ssl_ca_cert = None
         mock_configuration.cert_file = None
         mock_configuration.key_file = None
         mock_configuration.ssl_ca_cert = None
         mock_configuration.cert_file = None
         mock_configuration.key_file = None
-        credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+        self.token = None
+        self.cert_data = None
+        credential = self.libjuju.get_k8s_cloud_credential(
+            mock_configuration,
+            self.cert_data,
+            self.token,
+        )
         self.assertEqual(
             credential,
             juju.client._definitions.CloudCredential(
         self.assertEqual(
             credential,
             juju.client._definitions.CloudCredential(
@@ -1629,14 +1655,44 @@ class GetK8sCloudCredentials(LibjujuTestCase):
             ),
         )
 
             ),
         )
 
+    def test_user_pass_with_cert(self, mock_configuration):
+        mock_configuration.username = "admin"
+        mock_configuration.password = "admin"
+        mock_configuration.ssl_ca_cert = None
+        mock_configuration.cert_file = None
+        mock_configuration.key_file = None
+        self.token = None
+        credential = self.libjuju.get_k8s_cloud_credential(
+            mock_configuration,
+            self.cert_data,
+            self.token,
+        )
+        self.assertEqual(
+            credential,
+            juju.client._definitions.CloudCredential(
+                attrs={
+                    "ClientCertificateData": self.cert_data,
+                    "username": "admin",
+                    "password": "admin",
+                },
+                auth_type="userpasswithcert",
+            ),
+        )
+
     def test_user_no_pass(self, mock_configuration):
         mock_configuration.username = "admin"
         mock_configuration.password = ""
         mock_configuration.ssl_ca_cert = None
         mock_configuration.cert_file = None
         mock_configuration.key_file = None
     def test_user_no_pass(self, mock_configuration):
         mock_configuration.username = "admin"
         mock_configuration.password = ""
         mock_configuration.ssl_ca_cert = None
         mock_configuration.cert_file = None
         mock_configuration.key_file = None
+        self.token = None
+        self.cert_data = None
         with mock.patch.object(self.libjuju.log, "debug") as mock_debug:
         with mock.patch.object(self.libjuju.log, "debug") as mock_debug:
-            credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+            credential = self.libjuju.get_k8s_cloud_credential(
+                mock_configuration,
+                self.cert_data,
+                self.token,
+            )
             self.assertEqual(
                 credential,
                 juju.client._definitions.CloudCredential(
             self.assertEqual(
                 credential,
                 juju.client._definitions.CloudCredential(
@@ -1647,28 +1703,6 @@ class GetK8sCloudCredentials(LibjujuTestCase):
                 "credential for user admin has empty password"
             )
 
                 "credential for user admin has empty password"
             )
 
-    def test_user_pass_with_cert(self, mock_configuration):
-        mock_configuration.username = "admin"
-        mock_configuration.password = "admin"
-        ssl_ca_cert = tempfile.NamedTemporaryFile()
-        with open(ssl_ca_cert.name, "w") as ssl_ca_cert_file:
-            ssl_ca_cert_file.write("cacert")
-        mock_configuration.ssl_ca_cert = ssl_ca_cert.name
-        mock_configuration.cert_file = None
-        mock_configuration.key_file = None
-        credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
-        self.assertEqual(
-            credential,
-            juju.client._definitions.CloudCredential(
-                attrs={
-                    "username": "admin",
-                    "password": "admin",
-                    "ClientCertificateData": "cacert",
-                },
-                auth_type="userpasswithcert",
-            ),
-        )
-
     def test_cert(self, mock_configuration):
         mock_configuration.username = ""
         mock_configuration.password = ""
     def test_cert(self, mock_configuration):
         mock_configuration.username = ""
         mock_configuration.password = ""
@@ -1679,72 +1713,67 @@ class GetK8sCloudCredentials(LibjujuTestCase):
         mock_configuration.ssl_ca_cert = ssl_ca_cert.name
         mock_configuration.cert_file = None
         mock_configuration.key_file = None
         mock_configuration.ssl_ca_cert = ssl_ca_cert.name
         mock_configuration.cert_file = None
         mock_configuration.key_file = None
-        credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
-        self.assertEqual(
-            credential,
-            juju.client._definitions.CloudCredential(
-                attrs={"ClientCertificateData": "cacert", "Token": "Token"},
-                auth_type="certificate",
-            ),
+        credential = self.libjuju.get_k8s_cloud_credential(
+            mock_configuration,
+            self.cert_data,
+            self.token,
         )
         )
-
-    def test_oauth2(self, mock_configuration):
-        mock_configuration.username = ""
-        mock_configuration.password = ""
-        mock_configuration.api_key = {"authorization": "Bearer Token"}
-        key = tempfile.NamedTemporaryFile()
-        with open(key.name, "w") as key_file:
-            key_file.write("key")
-        mock_configuration.ssl_ca_cert = None
-        mock_configuration.cert_file = None
-        mock_configuration.key_file = key.name
-        credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
         self.assertEqual(
             credential,
             juju.client._definitions.CloudCredential(
         self.assertEqual(
             credential,
             juju.client._definitions.CloudCredential(
-                attrs={"ClientKeyData": "key", "Token": "Token"},
-                auth_type="oauth2",
+                attrs={"ClientCertificateData": self.cert_data, "Token": self.token},
+                auth_type="certificate",
             ),
         )
 
             ),
         )
 
-    @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration")
-    def test_oauth2_missing_token(self, mock_exception, mock_configuration):
-        mock_configuration.username = ""
-        mock_configuration.password = ""
-        key = tempfile.NamedTemporaryFile()
-        with open(key.name, "w") as key_file:
-            key_file.write("key")
-        mock_configuration.ssl_ca_cert = None
-        mock_configuration.cert_file = None
-        mock_configuration.key_file = key.name
-        exception_raised = False
-        try:
-            _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
-        except JujuInvalidK8sConfiguration as e:
-            exception_raised = True
-            self.assertEqual(
-                e.message,
-                "missing token for auth type oauth2",
-            )
-        self.assertTrue(exception_raised)
-
-    def test_unknown_api_key(self, mock_configuration):
-        mock_configuration.username = ""
-        mock_configuration.password = ""
-        mock_configuration.api_key = {"authorization": "Bearer Token Wrong"}
-        mock_configuration.ssl_ca_cert = None
-        mock_configuration.cert_file = None
-        mock_configuration.key_file = None
-        exception_raised = False
-        try:
-            _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
-        except JujuInvalidK8sConfiguration as e:
-            exception_raised = True
-            self.assertEqual(
-                e.message,
-                "unknown format of api_key",
-            )
-        self.assertTrue(exception_raised)
+    # TODO: Fix this test when oauth authentication is supported
+    # def test_oauth2(self, mock_configuration):
+    #     mock_configuration.username = ""
+    #     mock_configuration.password = ""
+    #     mock_configuration.api_key = {"authorization": "Bearer Token"}
+    #     key = tempfile.NamedTemporaryFile()
+    #     with open(key.name, "w") as key_file:
+    #         key_file.write("key")
+    #     mock_configuration.ssl_ca_cert = None
+    #     mock_configuration.cert_file = None
+    #     mock_configuration.key_file = key.name
+    #     credential = self.libjuju.get_k8s_cloud_credential(
+    #         mock_configuration,
+    #         self.cert_data,
+    #         self.token,
+    #     )
+    #     self.assertEqual(
+    #         credential,
+    #         juju.client._definitions.CloudCredential(
+    #             attrs={"ClientKeyData": "key", "Token": "Token"},
+    #             auth_type="oauth2",
+    #         ),
+    #     )
+
+    # @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration")
+    # def test_oauth2_missing_token(self, mock_exception, mock_configuration):
+    #     mock_configuration.username = ""
+    #     mock_configuration.password = ""
+    #     key = tempfile.NamedTemporaryFile()
+    #     with open(key.name, "w") as key_file:
+    #         key_file.write("key")
+    #     mock_configuration.ssl_ca_cert = None
+    #     mock_configuration.cert_file = None
+    #     mock_configuration.key_file = key.name
+    #     exception_raised = False
+    #     try:
+    #         _ = self.libjuju.get_k8s_cloud_credential(
+    #             mock_configuration,
+    #             self.cert_data,
+    #             self.token,
+    #         )
+    #     except JujuInvalidK8sConfiguration as e:
+    #         exception_raised = True
+    #         self.assertEqual(
+    #             e.message,
+    #             "missing token for auth type oauth2",
+    #         )
+    #     self.assertTrue(exception_raised)
 
     def test_exception_cannot_set_token_and_userpass(self, mock_configuration):
         mock_configuration.username = "admin"
 
     def test_exception_cannot_set_token_and_userpass(self, mock_configuration):
         mock_configuration.username = "admin"
@@ -1755,7 +1784,11 @@ class GetK8sCloudCredentials(LibjujuTestCase):
         mock_configuration.key_file = None
         exception_raised = False
         try:
         mock_configuration.key_file = None
         exception_raised = False
         try:
-            _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+            _ = self.libjuju.get_k8s_cloud_credential(
+                mock_configuration,
+                self.cert_data,
+                self.token,
+            )
         except JujuInvalidK8sConfiguration as e:
             exception_raised = True
             self.assertEqual(
         except JujuInvalidK8sConfiguration as e:
             exception_raised = True
             self.assertEqual(