This bug was caused because a bad parsing of the kubeconfig.
The token should be the secret from k8s for the created service
account.
When adding a k8s cluster, a clusterrole, clusterrolebinding, and
serviceaccounts are created.
Tests are needed for oauth2 and userpass kubeconfigs.
Change-Id: I6a4a2834bd6477f255e8ca48e7f53cd3a0d3fddf
Signed-off-by: David Garcia <david.garcia@canonical.com>
# add repo
self.log.debug("add repo {}".format(db_repo["name"]))
- await self.repo_add(cluster_uuid, db_repo["name"], db_repo["url"])
+ await self.repo_add(cluster_uuid, db_repo["name"], db_repo["url"])
added_repo_dict[repo_id] = db_repo["name"]
except Exception as e:
raise K8sException(
import uuid
import yaml
import tempfile
+import binascii
+import base64
from n2vc.exceptions import K8sException, N2VCBadArgumentsException
from n2vc.k8s_conn import K8sConnector
-from n2vc.kubectl import Kubectl
+from n2vc.kubectl import Kubectl, CORE_CLIENT, RBAC_CLIENT
from .exceptions import MethodNotImplemented
from n2vc.utils import base64_to_cacert
from n2vc.libjuju import Libjuju
+from kubernetes.client.models import (
+ V1ClusterRole,
+ V1ObjectMeta,
+ V1PolicyRule,
+ V1ServiceAccount,
+ V1ClusterRoleBinding,
+ V1RoleRef,
+ V1Subject,
+)
+
+from typing import Dict
+
+SERVICE_ACCOUNT_TOKEN_KEY = "token"
+SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
+RBAC_LABEL_KEY_NAME = "rbac-id"
+
+ADMIN_NAMESPACE = "kube-system"
+RBAC_STACK_PREFIX = "juju-credential"
# from juju.bundle import BundleHandler
# import re
# import ssl
# from .vnf import N2VC
+
+
+def generate_rbac_id():
+ return binascii.hexlify(os.urandom(4)).decode()
+
+
class K8sJujuConnector(K8sConnector):
def __init__(
self,
# Store the cluster configuration so it
# can be used for subsequent calls
-
kubecfg = tempfile.NamedTemporaryFile()
with open(kubecfg.name, "w") as kubecfg_file:
kubecfg_file.write(k8s_creds)
kubectl = Kubectl(config_file=kubecfg.name)
- configuration = kubectl.get_configuration()
- default_storage_class = kubectl.get_default_storage_class()
- await self.libjuju.add_k8s(
- name=cluster_uuid,
- configuration=configuration,
- storage_class=default_storage_class,
- credential_name=self._get_credential_name(cluster_uuid),
- )
- # self.log.debug("Setting config")
- # await self.set_config(cluster_uuid, config)
- # Test connection
- # controller = await self.get_controller(cluster_uuid)
- # await controller.disconnect()
+ # CREATING RESOURCES IN K8S
+ rbac_id = generate_rbac_id()
+ metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
+ labels = {RBAC_STACK_PREFIX: rbac_id}
- # TODO: Remove these commented lines
- # raise Exception("EOL")
- # self.juju_public_key = None
- # Login to the k8s cluster
- # if not self.authenticated:
- # await self.login(cluster_uuid)
+ # Create cleanup dictionary to clean up created resources
+ # if it fails in the middle of the process
+ cleanup_data = []
+ try:
+ self._create_cluster_role(
+ kubectl,
+ name=metadata_name,
+ labels=labels,
+ )
+ cleanup_data.append(
+ {
+ "delete": self._delete_cluster_role,
+ "args": (kubectl, metadata_name),
+ }
+ )
- # We're creating a new cluster
- # print("Getting model {}".format(self.get_namespace(cluster_uuid),
- # cluster_uuid=cluster_uuid))
- # model = await self.get_model(
- # self.get_namespace(cluster_uuid),
- # cluster_uuid=cluster_uuid
- # )
+ self._create_service_account(
+ kubectl,
+ name=metadata_name,
+ labels=labels,
+ )
+ cleanup_data.append(
+ {
+ "delete": self._delete_service_account,
+ "args": (kubectl, metadata_name),
+ }
+ )
+
+ self._create_cluster_role_binding(
+ kubectl,
+ name=metadata_name,
+ labels=labels,
+ )
+ cleanup_data.append(
+ {
+ "delete": self._delete_service_account,
+ "args": (kubectl, metadata_name),
+ }
+ )
+ token, client_cert_data = await self._get_secret_data(
+ kubectl,
+ metadata_name,
+ )
- # Disconnect from the model
- # if model and model.is_connected():
- # await model.disconnect()
+ default_storage_class = kubectl.get_default_storage_class()
+ await self.libjuju.add_k8s(
+ name=cluster_uuid,
+ rbac_id=rbac_id,
+ token=token,
+ client_cert_data=client_cert_data,
+ configuration=kubectl.configuration,
+ storage_class=default_storage_class,
+ credential_name=self._get_credential_name(cluster_uuid),
+ )
+ # self.log.debug("Setting config")
+ # await self.set_config(cluster_uuid, config)
+
+ # Test connection
+ # controller = await self.get_controller(cluster_uuid)
+ # await controller.disconnect()
+
+ # TODO: Remove these commented lines
+ # raise Exception("EOL")
+ # self.juju_public_key = None
+ # Login to the k8s cluster
+ # if not self.authenticated:
+ # await self.login(cluster_uuid)
+
+ # We're creating a new cluster
+ # print("Getting model {}".format(self.get_namespace(cluster_uuid),
+ # cluster_uuid=cluster_uuid))
+ # model = await self.get_model(
+ # self.get_namespace(cluster_uuid),
+ # cluster_uuid=cluster_uuid
+ # )
- return cluster_uuid, True
+ # Disconnect from the model
+ # if model and model.is_connected():
+ # await model.disconnect()
+
+ return cluster_uuid, True
+ except Exception as e:
+ self.log.error("Error initializing k8scluster: {}".format(e))
+ if len(cleanup_data) > 0:
+ self.log.debug("Cleaning up created resources in k8s cluster...")
+ for item in cleanup_data:
+ delete_function = item["delete"]
+ delete_args = item["args"]
+ delete_function(*delete_args)
+ self.log.debug("Cleanup finished")
+ raise e
"""Repo Management"""
"""
try:
-
# Remove k8scluster from database
# self.log.debug("[reset] Removing k8scluster from juju database")
# juju_db = self.db.get_one("admin", {"_id": "juju"})
# Destroy the controller (via CLI)
# self.log.debug("[reset] Destroying controller")
# await self.destroy_controller(cluster_uuid)
-
self.log.debug("[reset] Removing k8s cloud")
# k8s_cloud = "k8s-{}".format(cluster_uuid)
# await self.remove_cloud(k8s_cloud)
+
+ cloud_creds = await self.libjuju.get_cloud_credentials(
+ cluster_uuid,
+ self._get_credential_name(cluster_uuid),
+ )
+
await self.libjuju.remove_cloud(cluster_uuid)
+ kubecfg = self.get_credentials(cluster_uuid=cluster_uuid)
+
+ kubecfg_file = tempfile.NamedTemporaryFile()
+ with open(kubecfg_file.name, "w") as f:
+ f.write(kubecfg)
+ kubectl = Kubectl(config_file=kubecfg_file.name)
+
+ delete_functions = [
+ self._delete_cluster_role_binding,
+ self._delete_service_account,
+ self._delete_cluster_role,
+ ]
+
+ credential_attrs = cloud_creds[0].result["attrs"]
+ if RBAC_LABEL_KEY_NAME in credential_attrs:
+ rbac_id = credential_attrs[RBAC_LABEL_KEY_NAME]
+ metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
+ delete_args = (kubectl, metadata_name)
+ for delete_func in delete_functions:
+ try:
+ delete_func(*delete_args)
+ except Exception as e:
+ self.log.warning("Cannot remove resource in K8s {}".format(e))
+
except Exception as e:
self.log.debug("Caught exception during reset: {}".format(e))
raise e
# q_filter={"_id": "juju"},
# update_dict={"k8sclusters": k8sclusters},
# )
+
+ # Private methods to create/delete needed resources in the
+ # Kubernetes cluster to create the K8s cloud in Juju
+
+ def _create_cluster_role(
+ self,
+ kubectl: Kubectl,
+ name: str,
+ labels: Dict[str, str],
+ ):
+ cluster_roles = kubectl.clients[RBAC_CLIENT].list_cluster_role(
+ field_selector="metadata.name={}".format(name)
+ )
+
+ if len(cluster_roles.items) > 0:
+ raise Exception(
+ "Cluster role with metadata.name={} already exists".format(name)
+ )
+
+ metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
+ # Cluster role
+ cluster_role = V1ClusterRole(
+ metadata=metadata,
+ rules=[
+ V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
+ V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
+ ],
+ )
+
+ kubectl.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
+
+ def _delete_cluster_role(self, kubectl: Kubectl, name: str):
+ kubectl.clients[RBAC_CLIENT].delete_cluster_role(name)
+
+ def _create_service_account(
+ self,
+ kubectl: Kubectl,
+ name: str,
+ labels: Dict[str, str],
+ ):
+ service_accounts = kubectl.clients[CORE_CLIENT].list_namespaced_service_account(
+ ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
+ )
+ if len(service_accounts.items) > 0:
+ raise Exception(
+ "Service account with metadata.name={} already exists".format(name)
+ )
+
+ metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
+ service_account = V1ServiceAccount(metadata=metadata)
+
+ kubectl.clients[CORE_CLIENT].create_namespaced_service_account(
+ ADMIN_NAMESPACE, service_account
+ )
+
+ def _delete_service_account(self, kubectl: Kubectl, name: str):
+ kubectl.clients[CORE_CLIENT].delete_namespaced_service_account(
+ name, ADMIN_NAMESPACE
+ )
+
+ def _create_cluster_role_binding(
+ self,
+ kubectl: Kubectl,
+ name: str,
+ labels: Dict[str, str],
+ ):
+ role_bindings = kubectl.clients[RBAC_CLIENT].list_cluster_role_binding(
+ field_selector="metadata.name={}".format(name)
+ )
+ if len(role_bindings.items) > 0:
+ raise Exception("Generated rbac id already exists")
+
+ role_binding = V1ClusterRoleBinding(
+ metadata=V1ObjectMeta(name=name, labels=labels),
+ role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
+ subjects=[
+ V1Subject(kind="ServiceAccount", name=name, namespace=ADMIN_NAMESPACE)
+ ],
+ )
+ kubectl.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
+
+ def _delete_cluster_role_binding(self, kubectl: Kubectl, name: str):
+ kubectl.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
+
+ async def _get_secret_data(self, kubectl: Kubectl, name: str) -> (str, str):
+ v1_core = kubectl.clients[CORE_CLIENT]
+
+ retries_limit = 10
+ secret_name = None
+ while True:
+ retries_limit -= 1
+ service_accounts = v1_core.list_namespaced_service_account(
+ ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
+ )
+ if len(service_accounts.items) == 0:
+ raise Exception(
+ "Service account not found with metadata.name={}".format(name)
+ )
+ service_account = service_accounts.items[0]
+ if service_account.secrets and len(service_account.secrets) > 0:
+ secret_name = service_account.secrets[0].name
+ if secret_name is not None or not retries_limit:
+ break
+ if not secret_name:
+ raise Exception(
+ "Failed getting the secret from service account {}".format(name)
+ )
+ secret = v1_core.list_namespaced_secret(
+ ADMIN_NAMESPACE,
+ field_selector="metadata.name={}".format(secret_name),
+ ).items[0]
+
+ token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
+ client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
+
+ return (
+ base64.b64decode(token).decode("utf-8"),
+ base64.b64decode(client_certificate_data).decode("utf-8"),
+ )
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+
from kubernetes import client, config
from kubernetes.client.rest import ApiException
-import logging
+
+
+CORE_CLIENT = "core_v1"
+STORAGE_CLIENT = "storage_v1"
+RBAC_CLIENT = "rbac_v1"
class Kubectl:
def __init__(self, config_file=None):
config.load_kube_config(config_file=config_file)
+ self._clients = {
+ "core_v1": client.CoreV1Api(),
+ "storage_v1": client.StorageV1Api(),
+ "rbac_v1": client.RbacAuthorizationV1Api(),
+ }
+ self._configuration = config.kube_config.Configuration()
self.logger = logging.getLogger("Kubectl")
- def get_configuration(self):
- return config.kube_config.Configuration()
+ @property
+ def configuration(self):
+ return self._configuration
+
+ @property
+ def clients(self):
+ return self._clients
def get_services(self, field_selector=None, label_selector=None):
kwargs = {}
kwargs["field_selector"] = field_selector
if label_selector:
kwargs["label_selector"] = label_selector
-
try:
- v1 = client.CoreV1Api()
- result = v1.list_service_for_all_namespaces(**kwargs)
+ result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs)
return [
{
"name": i.metadata.name,
If not, it returns the first storage class.
If there are not storage classes, returns None
"""
-
- storagev1 = client.StorageV1Api()
- storage_classes = storagev1.list_storage_class()
+ storage_classes = self.clients[STORAGE_CLIENT].list_storage_class()
selected_sc = None
default_sc_annotations = {
"storageclass.kubernetes.io/is-default-class": "true",
import asyncio
import logging
-from juju.controller import Controller
-from juju.client import client
+
import time
from juju.errors import JujuAPIError
Cloud,
CloudCredential,
)
+from juju.controller import Controller
+from juju.client import client
+from juju import tag
+
from n2vc.juju_watcher import JujuModelWatcher
from n2vc.provisioner import AsyncSSHProvisioner
from n2vc.n2vc_conn import N2VCConnector
from osm_common.dbbase import DbException
from kubernetes.client.configuration import Configuration
+RBAC_LABEL_KEY_NAME = "rbac-id"
+
class Libjuju:
def __init__(
try:
# Get application
application = self._get_application(
- model, application_name=application_name,
+ model,
+ application_name=application_name,
)
if application is None:
raise JujuApplicationNotFound("Cannot execute action")
try:
# Get application
application = self._get_application(
- model, application_name=application_name,
+ model,
+ application_name=application_name,
)
# Return list of actions
return metrics
async def add_relation(
- self, model_name: str, endpoint_1: str, endpoint_2: str,
+ self,
+ model_name: str,
+ endpoint_1: str,
+ endpoint_2: str,
):
"""Add relation
await self.disconnect_controller(controller)
async def consume(
- self, offer_url: str, model_name: str,
+ self,
+ offer_url: str,
+ model_name: str,
):
"""
Adds a remote offer to the model. Relations can be created later using "juju relate".
try:
model = await self.get_model(controller, model_name)
application = self._get_application(
- model, application_name=application_name,
+ model,
+ application_name=application_name,
)
await application.set_config(config)
finally:
if not juju_info:
try:
self.db.create(
- DB_DATA.api_endpoints.table, DB_DATA.api_endpoints.filter,
+ DB_DATA.api_endpoints.table,
+ DB_DATA.api_endpoints.filter,
)
except DbException as e:
# Racing condition: check if another N2VC worker has created it
async def add_k8s(
self,
name: str,
+ rbac_id: str,
+ token: str,
+ client_cert_data: str,
configuration: Configuration,
storage_class: str,
credential_name: str = None,
raise Exception("configuration must be provided")
endpoint = configuration.host
- credential = self.get_k8s_cloud_credential(configuration)
- ca_certificates = (
- [credential.attrs["ClientCertificateData"]]
- if "ClientCertificateData" in credential.attrs
- else []
+ credential = self.get_k8s_cloud_credential(
+ configuration,
+ client_cert_data,
+ token,
)
+ credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id
cloud = client.Cloud(
type_="kubernetes",
auth_types=[credential.auth_type],
endpoint=endpoint,
- ca_certificates=ca_certificates,
+ ca_certificates=[client_cert_data],
config={
"operator-storage": storage_class,
"workload-storage": storage_class,
)
def get_k8s_cloud_credential(
- self, configuration: Configuration,
+ self,
+ configuration: Configuration,
+ client_cert_data: str,
+ token: str = None,
) -> client.CloudCredential:
attrs = {}
- ca_cert = configuration.ssl_ca_cert or configuration.cert_file
- key = configuration.key_file
- api_key = configuration.api_key
- token = None
+ # TODO: Test with AKS
+ key = None # open(configuration.key_file, "r").read()
username = configuration.username
password = configuration.password
- if "authorization" in api_key:
- authorization = api_key["authorization"]
- if "Bearer " in authorization:
- bearer_list = authorization.split(" ")
- if len(bearer_list) == 2:
- [_, token] = bearer_list
- else:
- raise JujuInvalidK8sConfiguration("unknown format of api_key")
- else:
- token = authorization
- if ca_cert:
- attrs["ClientCertificateData"] = open(ca_cert, "r").read()
+ if client_cert_data:
+ attrs["ClientCertificateData"] = client_cert_data
if key:
- attrs["ClientKeyData"] = open(key, "r").read()
+ attrs["ClientKeyData"] = key
if token:
if username or password:
raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
auth_type = None
if key:
auth_type = "oauth2"
+ if client_cert_data:
+ auth_type = "oauth2withcert"
if not token:
raise JujuInvalidK8sConfiguration(
"missing token for auth type {}".format(auth_type)
)
attrs["username"] = username
attrs["password"] = password
- if ca_cert:
+ if client_cert_data:
auth_type = "userpasswithcert"
else:
auth_type = "userpass"
- elif ca_cert and token:
+ elif client_cert_data and token:
auth_type = "certificate"
else:
raise JujuInvalidK8sConfiguration("authentication method not supported")
unit = u
break
return unit
+
+ async def get_cloud_credentials(self, cloud_name: str, credential_name: str):
+ controller = await self.get_controller()
+ try:
+ facade = client.CloudFacade.from_connection(controller.connection())
+ cloud_cred_tag = tag.credential(cloud_name, self.username, credential_name)
+ params = [client.Entity(cloud_cred_tag)]
+ return (await facade.Credential(params)).results
+ finally:
+ await self.disconnect_controller(controller)
import asyncio
import logging
import asynctest
-from n2vc.k8s_juju_conn import K8sJujuConnector
+from unittest.mock import Mock
+from n2vc.k8s_juju_conn import K8sJujuConnector, RBAC_LABEL_KEY_NAME
from osm_common import fslocal
-from .utils import kubeconfig, FakeModel, FakeFileWrapper
+from .utils import kubeconfig, FakeModel, FakeFileWrapper, AsyncMock
from n2vc.exceptions import (
MethodNotImplemented,
K8sException,
N2VCBadArgumentsException,
)
-from unittest.mock import Mock
-from .utils import AsyncMock
class K8sJujuConnTestCase(asynctest.TestCase):
)
+@asynctest.mock.patch("n2vc.kubectl.Kubectl.get_default_storage_class")
class InitEnvTest(K8sJujuConnTestCase):
def setUp(self):
super(InitEnvTest, self).setUp()
self.k8s_juju_conn.libjuju.add_k8s = AsyncMock()
+ self.k8s_juju_conn._create_cluster_role = Mock()
+ self.k8s_juju_conn._create_service_account = Mock()
+ self.k8s_juju_conn._create_cluster_role_binding = Mock()
+ self.k8s_juju_conn._get_secret_data = AsyncMock()
+ self.k8s_juju_conn._get_secret_data.return_value = ("token", "cacert")
- @asynctest.mock.patch("n2vc.kubectl.Kubectl.get_default_storage_class")
def test_with_cluster_uuid(
self,
mock_get_default_storage_class,
mock_get_default_storage_class.assert_called_once()
self.k8s_juju_conn.libjuju.add_k8s.assert_called_once()
- @asynctest.mock.patch("n2vc.kubectl.Kubectl.get_default_storage_class")
- def test_with_no_cluster_uuid(self, mock_get_default_storage_class):
+ def test_with_no_cluster_uuid(
+ self,
+ mock_get_default_storage_class,
+ ):
uuid, created = self.loop.run_until_complete(
self.k8s_juju_conn.init_env(k8s_creds=kubeconfig)
)
mock_get_default_storage_class.assert_called_once()
self.k8s_juju_conn.libjuju.add_k8s.assert_called_once()
- @asynctest.mock.patch("n2vc.kubectl.Kubectl.get_default_storage_class")
- def test_init_env_exception(self, mock_get_default_storage_class):
+ def test_init_env_exception(
+ self,
+ mock_get_default_storage_class,
+ ):
self.k8s_juju_conn.libjuju.add_k8s.side_effect = Exception()
created = None
uuid = None
def setUp(self):
super(ResetTest, self).setUp()
self.k8s_juju_conn.libjuju.remove_cloud = AsyncMock()
+ self.k8s_juju_conn.libjuju.get_cloud_credentials = AsyncMock()
+ cloud_creds = Mock()
+ cloud_creds.result = {"attrs": {RBAC_LABEL_KEY_NAME: "asd"}}
+ self.k8s_juju_conn.libjuju.get_cloud_credentials.return_value = [cloud_creds]
+ self.k8s_juju_conn._delete_cluster_role_binding = Mock()
+ self.k8s_juju_conn._delete_service_account = Mock()
+ self.k8s_juju_conn._delete_cluster_role = Mock()
+ self.k8s_juju_conn.get_credentials = Mock()
+ self.k8s_juju_conn.get_credentials.return_value = kubeconfig
def test_success(self):
removed = self.loop.run_until_complete(self.k8s_juju_conn.reset("uuid"))
# limitations under the License.
from unittest import TestCase, mock
-from n2vc.kubectl import Kubectl
+from n2vc.kubectl import Kubectl, CORE_CLIENT
from n2vc.utils import Dict
from kubernetes.client.rest import ApiException
class KubectlTestCase(TestCase):
- def setUp(self,):
+ def setUp(
+ self,
+ ):
pass
return fake_list_services
-class ProvisionerTest(TestCase):
+class GetServices(TestCase):
@mock.patch("n2vc.kubectl.config.load_kube_config")
@mock.patch("n2vc.kubectl.client.CoreV1Api")
def setUp(self, mock_core, mock_config):
keys = ["name", "cluster_ip", "type", "ports", "external_ip"]
self.assertTrue(k in service for service in services for k in keys)
- @mock.patch("n2vc.kubectl.client.CoreV1Api.list_service_for_all_namespaces")
- def test_get_service_exception(self, list_services):
- list_services.side_effect = ApiException()
+ def test_get_service_exception(self):
+ self.kubectl.clients[
+ CORE_CLIENT
+ ].list_service_for_all_namespaces.side_effect = ApiException()
with self.assertRaises(ApiException):
self.kubectl.get_services()
def test_get_configuration(self, mock_load_kube_config, mock_configuration):
kubectl = Kubectl()
- kubectl.get_configuration()
+ kubectl.configuration
mock_configuration.assert_called_once()
JujuInvalidK8sConfiguration,
JujuLeaderUnitNotFound,
)
+from n2vc.k8s_juju_conn import generate_rbac_id
class LibjujuTestCase(asynctest.TestCase):
class AddK8sTest(LibjujuTestCase):
def setUp(self):
super(AddK8sTest, self).setUp()
- self.configuration = kubernetes.client.configuration.Configuration()
+ name = "cloud"
+ rbac_id = generate_rbac_id()
+ token = "token"
+ client_cert_data = "cert"
+ configuration = kubernetes.client.configuration.Configuration()
+ storage_class = "storage_class"
+ credential_name = name
+
+ self._add_k8s_args = {
+ "name": name,
+ "rbac_id": rbac_id,
+ "token": token,
+ "client_cert_data": client_cert_data,
+ "configuration": configuration,
+ "storage_class": storage_class,
+ "credential_name": credential_name,
+ }
def test_add_k8s(self, mock_add_cloud, mock_get_k8s_cloud_credential):
- self.loop.run_until_complete(
- self.libjuju.add_k8s("cloud", self.configuration, "storage_class")
- )
+ self.loop.run_until_complete(self.libjuju.add_k8s(**self._add_k8s_args))
mock_add_cloud.assert_called_once()
mock_get_k8s_cloud_credential.assert_called_once()
def test_add_k8s_exception(self, mock_add_cloud, mock_get_k8s_cloud_credential):
mock_add_cloud.side_effect = Exception()
with self.assertRaises(Exception):
- self.loop.run_until_complete(
- self.libjuju.add_k8s("cloud", self.configuration, "storage_class")
- )
+ self.loop.run_until_complete(self.libjuju.add_k8s(**self._add_k8s_args))
mock_add_cloud.assert_called_once()
mock_get_k8s_cloud_credential.assert_called_once()
def test_add_k8s_missing_name(self, mock_add_cloud, mock_get_k8s_cloud_credential):
+ self._add_k8s_args["name"] = ""
with self.assertRaises(Exception):
- self.loop.run_until_complete(
- self.libjuju.add_k8s("", self.configuration, "storage_class")
- )
+ self.loop.run_until_complete(self.libjuju.add_k8s(**self._add_k8s_args))
mock_add_cloud.assert_not_called()
def test_add_k8s_missing_storage_name(
self, mock_add_cloud, mock_get_k8s_cloud_credential
):
+ self._add_k8s_args["storage_class"] = ""
with self.assertRaises(Exception):
- self.loop.run_until_complete(
- self.libjuju.add_k8s("cloud", self.configuration, "")
- )
+ self.loop.run_until_complete(self.libjuju.add_k8s(**self._add_k8s_args))
mock_add_cloud.assert_not_called()
def test_add_k8s_missing_configuration_keys(
self, mock_add_cloud, mock_get_k8s_cloud_credential
):
+ self._add_k8s_args["configuration"] = None
with self.assertRaises(Exception):
- self.loop.run_until_complete(self.libjuju.add_k8s("cloud", None, ""))
+ self.loop.run_until_complete(self.libjuju.add_k8s(**self._add_k8s_args))
mock_add_cloud.assert_not_called()
class GetK8sCloudCredentials(LibjujuTestCase):
def setUp(self):
super(GetK8sCloudCredentials, self).setUp()
+ self.cert_data = "cert"
+ self.token = "token"
@asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration")
def test_not_supported(self, mock_exception, mock_configuration):
mock_configuration.cert_file = None
mock_configuration.key_file = None
exception_raised = False
+ self.token = None
+ self.cert_data = None
try:
- _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ _ = self.libjuju.get_k8s_cloud_credential(
+ mock_configuration,
+ self.cert_data,
+ self.token,
+ )
except JujuInvalidK8sConfiguration as e:
exception_raised = True
self.assertEqual(
mock_configuration.ssl_ca_cert = None
mock_configuration.cert_file = None
mock_configuration.key_file = None
- credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ self.token = None
+ self.cert_data = None
+ credential = self.libjuju.get_k8s_cloud_credential(
+ mock_configuration,
+ self.cert_data,
+ self.token,
+ )
self.assertEqual(
credential,
juju.client._definitions.CloudCredential(
),
)
+ def test_user_pass_with_cert(self, mock_configuration):
+ mock_configuration.username = "admin"
+ mock_configuration.password = "admin"
+ mock_configuration.ssl_ca_cert = None
+ mock_configuration.cert_file = None
+ mock_configuration.key_file = None
+ self.token = None
+ credential = self.libjuju.get_k8s_cloud_credential(
+ mock_configuration,
+ self.cert_data,
+ self.token,
+ )
+ self.assertEqual(
+ credential,
+ juju.client._definitions.CloudCredential(
+ attrs={
+ "ClientCertificateData": self.cert_data,
+ "username": "admin",
+ "password": "admin",
+ },
+ auth_type="userpasswithcert",
+ ),
+ )
+
def test_user_no_pass(self, mock_configuration):
mock_configuration.username = "admin"
mock_configuration.password = ""
mock_configuration.ssl_ca_cert = None
mock_configuration.cert_file = None
mock_configuration.key_file = None
+ self.token = None
+ self.cert_data = None
with mock.patch.object(self.libjuju.log, "debug") as mock_debug:
- credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ credential = self.libjuju.get_k8s_cloud_credential(
+ mock_configuration,
+ self.cert_data,
+ self.token,
+ )
self.assertEqual(
credential,
juju.client._definitions.CloudCredential(
"credential for user admin has empty password"
)
- def test_user_pass_with_cert(self, mock_configuration):
- mock_configuration.username = "admin"
- mock_configuration.password = "admin"
- ssl_ca_cert = tempfile.NamedTemporaryFile()
- with open(ssl_ca_cert.name, "w") as ssl_ca_cert_file:
- ssl_ca_cert_file.write("cacert")
- mock_configuration.ssl_ca_cert = ssl_ca_cert.name
- mock_configuration.cert_file = None
- mock_configuration.key_file = None
- credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
- self.assertEqual(
- credential,
- juju.client._definitions.CloudCredential(
- attrs={
- "username": "admin",
- "password": "admin",
- "ClientCertificateData": "cacert",
- },
- auth_type="userpasswithcert",
- ),
- )
-
def test_cert(self, mock_configuration):
mock_configuration.username = ""
mock_configuration.password = ""
mock_configuration.ssl_ca_cert = ssl_ca_cert.name
mock_configuration.cert_file = None
mock_configuration.key_file = None
- credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
- self.assertEqual(
- credential,
- juju.client._definitions.CloudCredential(
- attrs={"ClientCertificateData": "cacert", "Token": "Token"},
- auth_type="certificate",
- ),
+ credential = self.libjuju.get_k8s_cloud_credential(
+ mock_configuration,
+ self.cert_data,
+ self.token,
)
-
- def test_oauth2(self, mock_configuration):
- mock_configuration.username = ""
- mock_configuration.password = ""
- mock_configuration.api_key = {"authorization": "Bearer Token"}
- key = tempfile.NamedTemporaryFile()
- with open(key.name, "w") as key_file:
- key_file.write("key")
- mock_configuration.ssl_ca_cert = None
- mock_configuration.cert_file = None
- mock_configuration.key_file = key.name
- credential = self.libjuju.get_k8s_cloud_credential(mock_configuration)
self.assertEqual(
credential,
juju.client._definitions.CloudCredential(
- attrs={"ClientKeyData": "key", "Token": "Token"},
- auth_type="oauth2",
+ attrs={"ClientCertificateData": self.cert_data, "Token": self.token},
+ auth_type="certificate",
),
)
- @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration")
- def test_oauth2_missing_token(self, mock_exception, mock_configuration):
- mock_configuration.username = ""
- mock_configuration.password = ""
- key = tempfile.NamedTemporaryFile()
- with open(key.name, "w") as key_file:
- key_file.write("key")
- mock_configuration.ssl_ca_cert = None
- mock_configuration.cert_file = None
- mock_configuration.key_file = key.name
- exception_raised = False
- try:
- _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
- except JujuInvalidK8sConfiguration as e:
- exception_raised = True
- self.assertEqual(
- e.message,
- "missing token for auth type oauth2",
- )
- self.assertTrue(exception_raised)
-
- def test_unknown_api_key(self, mock_configuration):
- mock_configuration.username = ""
- mock_configuration.password = ""
- mock_configuration.api_key = {"authorization": "Bearer Token Wrong"}
- mock_configuration.ssl_ca_cert = None
- mock_configuration.cert_file = None
- mock_configuration.key_file = None
- exception_raised = False
- try:
- _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
- except JujuInvalidK8sConfiguration as e:
- exception_raised = True
- self.assertEqual(
- e.message,
- "unknown format of api_key",
- )
- self.assertTrue(exception_raised)
+ # TODO: Fix this test when oauth authentication is supported
+ # def test_oauth2(self, mock_configuration):
+ # mock_configuration.username = ""
+ # mock_configuration.password = ""
+ # mock_configuration.api_key = {"authorization": "Bearer Token"}
+ # key = tempfile.NamedTemporaryFile()
+ # with open(key.name, "w") as key_file:
+ # key_file.write("key")
+ # mock_configuration.ssl_ca_cert = None
+ # mock_configuration.cert_file = None
+ # mock_configuration.key_file = key.name
+ # credential = self.libjuju.get_k8s_cloud_credential(
+ # mock_configuration,
+ # self.cert_data,
+ # self.token,
+ # )
+ # self.assertEqual(
+ # credential,
+ # juju.client._definitions.CloudCredential(
+ # attrs={"ClientKeyData": "key", "Token": "Token"},
+ # auth_type="oauth2",
+ # ),
+ # )
+
+ # @asynctest.mock.patch("n2vc.exceptions.JujuInvalidK8sConfiguration")
+ # def test_oauth2_missing_token(self, mock_exception, mock_configuration):
+ # mock_configuration.username = ""
+ # mock_configuration.password = ""
+ # key = tempfile.NamedTemporaryFile()
+ # with open(key.name, "w") as key_file:
+ # key_file.write("key")
+ # mock_configuration.ssl_ca_cert = None
+ # mock_configuration.cert_file = None
+ # mock_configuration.key_file = key.name
+ # exception_raised = False
+ # try:
+ # _ = self.libjuju.get_k8s_cloud_credential(
+ # mock_configuration,
+ # self.cert_data,
+ # self.token,
+ # )
+ # except JujuInvalidK8sConfiguration as e:
+ # exception_raised = True
+ # self.assertEqual(
+ # e.message,
+ # "missing token for auth type oauth2",
+ # )
+ # self.assertTrue(exception_raised)
def test_exception_cannot_set_token_and_userpass(self, mock_configuration):
mock_configuration.username = "admin"
mock_configuration.key_file = None
exception_raised = False
try:
- _ = self.libjuju.get_k8s_cloud_credential(mock_configuration)
+ _ = self.libjuju.get_k8s_cloud_credential(
+ mock_configuration,
+ self.cert_data,
+ self.token,
+ )
except JujuInvalidK8sConfiguration as e:
exception_raised = True
self.assertEqual(