import logging
from typing import Dict
import typing
+import uuid
+from distutils.version import LooseVersion
from kubernetes import client, config
+from kubernetes.client.api import VersionApi
from kubernetes.client.models import (
V1ClusterRole,
V1ObjectMeta,
V1ClusterRoleBinding,
V1RoleRef,
V1Subject,
+ V1Secret,
+ V1SecretReference,
)
from kubernetes.client.rest import ApiException
from retrying_async import retry
"""
self.clients[RBAC_CLIENT].delete_cluster_role(name)
+ def _get_kubectl_version(self):
+ version = VersionApi().get_code()
+ return "{}.{}".format(version.major, version.minor)
+
+ def _need_to_create_new_secret(self):
+ min_k8s_version = "1.24"
+ current_k8s_version = self._get_kubectl_version()
+ return LooseVersion(min_k8s_version) <= LooseVersion(current_k8s_version)
+
+ def _get_secret_name(self, service_account_name: str):
+ random_alphanum = str(uuid.uuid4())[:5]
+ return "{}-token-{}".format(service_account_name, random_alphanum)
+
+ def _create_service_account_secret(
+ self, service_account_name: str, namespace: str, secret_name: str
+ ):
+ """
+ Create a secret for the service account. K8s version >= 1.24
+
+ :param: service_account_name: Name of the service account
+ :param: namespace: Kubernetes namespace for service account metadata
+ :param: secret_name: Name of the secret
+ """
+ v1_core = self.clients[CORE_CLIENT]
+ secrets = v1_core.list_namespaced_secret(
+ namespace, field_selector="metadata.name={}".format(secret_name)
+ ).items
+
+ if len(secrets) > 0:
+ raise Exception(
+ "Secret with metadata.name={} already exists".format(secret_name)
+ )
+
+ annotations = {"kubernetes.io/service-account.name": service_account_name}
+ metadata = V1ObjectMeta(
+ name=secret_name, namespace=namespace, annotations=annotations
+ )
+ type = "kubernetes.io/service-account-token"
+ secret = V1Secret(metadata=metadata, type=type)
+ v1_core.create_namespaced_secret(namespace, secret)
+
+ def _get_secret_reference_list(self, namespace: str, secret_name: str):
+ """
+ Return a secret reference list with one secret.
+ K8s version >= 1.24
+
+ :param: namespace: Kubernetes namespace for service account metadata
+ :param: secret_name: Name of the secret
+ :rtype: list[V1SecretReference]
+ """
+ return [V1SecretReference(name=secret_name, namespace=namespace)]
+
def create_service_account(
self,
name: str,
:param: namespace: Kubernetes namespace for service account metadata
Default: kube-system
"""
- service_accounts = self.clients[CORE_CLIENT].list_namespaced_service_account(
+ v1_core = self.clients[CORE_CLIENT]
+ service_accounts = v1_core.list_namespaced_service_account(
namespace, field_selector="metadata.name={}".format(name)
)
if len(service_accounts.items) > 0:
)
metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
- service_account = V1ServiceAccount(metadata=metadata)
- self.clients[CORE_CLIENT].create_namespaced_service_account(
- namespace, service_account
- )
+ if self._need_to_create_new_secret():
+ secret_name = self._get_secret_name(name)
+ secrets = self._get_secret_reference_list(namespace, secret_name)
+ service_account = V1ServiceAccount(metadata=metadata, secrets=secrets)
+ v1_core.create_namespaced_service_account(namespace, service_account)
+ self._create_service_account_secret(name, namespace, secret_name)
+ else:
+ service_account = V1ServiceAccount(metadata=metadata)
+ v1_core.create_namespaced_service_account(namespace, service_account)
def delete_service_account(self, name: str, namespace: str = "kube-system"):
"""
from n2vc.kubectl import Kubectl, CORE_CLIENT
from n2vc.utils import Dict
from kubernetes.client.rest import ApiException
+from kubernetes.client import (
+ V1ObjectMeta,
+ V1Secret,
+ V1ServiceAccount,
+ V1SecretReference,
+)
class FakeK8sResourceMetadata:
return self._items
+class FakeK8sServiceAccountsList:
+ def __init__(self, items=[]):
+ self._items = items
+
+ @property
+ def items(self):
+ return self._items
+
+
+class FakeK8sSecretList:
+ def __init__(self, items=[]):
+ self._items = items
+
+ @property
+ def items(self):
+ return self._items
+
+
+class FakeK8sVersionApiCode:
+ def __init__(self, major: str, minor: str):
+ self._major = major
+ self._minor = minor
+
+ @property
+ def major(self):
+ return self._major
+
+ @property
+ def minor(self):
+ return self._minor
+
+
fake_list_services = Dict(
{
"items": [
sc_name = kubectl.get_default_storage_class()
self.assertEqual(sc_name, self.default_sc_name)
mock_list_storage_class.assert_called_once()
+
+
+@mock.patch("kubernetes.client.VersionApi.get_code")
+@mock.patch("kubernetes.client.CoreV1Api.list_namespaced_secret")
+@mock.patch("kubernetes.client.CoreV1Api.create_namespaced_secret")
+@mock.patch("kubernetes.client.CoreV1Api.create_namespaced_service_account")
+@mock.patch("kubernetes.client.CoreV1Api.list_namespaced_service_account")
+class CreateServiceAccountClass(KubectlTestCase):
+ @mock.patch("kubernetes.config.load_kube_config")
+ def setUp(self, mock_load_kube_config):
+ super(CreateServiceAccountClass, self).setUp()
+ self.service_account_name = "Service_account"
+ self.labels = {"Key1": "Value1", "Key2": "Value2"}
+ self.namespace = "kubernetes"
+ self.token_id = "abc12345"
+ self.kubectl = Kubectl()
+
+ def assert_create_secret(self, mock_create_secret, secret_name):
+ annotations = {"kubernetes.io/service-account.name": self.service_account_name}
+ secret_metadata = V1ObjectMeta(
+ name=secret_name, namespace=self.namespace, annotations=annotations
+ )
+ secret_type = "kubernetes.io/service-account-token"
+ secret = V1Secret(metadata=secret_metadata, type=secret_type)
+ mock_create_secret.assert_called_once_with(self.namespace, secret)
+
+ def assert_create_service_account_v_1_24(
+ self, mock_create_service_account, secret_name
+ ):
+ sevice_account_metadata = V1ObjectMeta(
+ name=self.service_account_name, labels=self.labels, namespace=self.namespace
+ )
+ secrets = [V1SecretReference(name=secret_name, namespace=self.namespace)]
+ service_account = V1ServiceAccount(
+ metadata=sevice_account_metadata, secrets=secrets
+ )
+ mock_create_service_account.assert_called_once_with(
+ self.namespace, service_account
+ )
+
+ def assert_create_service_account_v_1_23(self, mock_create_service_account):
+ metadata = V1ObjectMeta(
+ name=self.service_account_name, labels=self.labels, namespace=self.namespace
+ )
+ service_account = V1ServiceAccount(metadata=metadata)
+ mock_create_service_account.assert_called_once_with(
+ self.namespace, service_account
+ )
+
+ @mock.patch("n2vc.kubectl.uuid.uuid4")
+ def test_secret_is_created_when_k8s_1_24(
+ self,
+ mock_uuid4,
+ mock_list_service_account,
+ mock_create_service_account,
+ mock_create_secret,
+ mock_list_secret,
+ mock_version,
+ ):
+ mock_list_service_account.return_value = FakeK8sServiceAccountsList(items=[])
+ mock_list_secret.return_value = FakeK8sSecretList(items=[])
+ mock_version.return_value = FakeK8sVersionApiCode("1", "24")
+ mock_uuid4.return_value = self.token_id
+ self.kubectl.create_service_account(
+ self.service_account_name, self.labels, self.namespace
+ )
+ secret_name = "{}-token-{}".format(self.service_account_name, self.token_id[:5])
+ self.assert_create_service_account_v_1_24(
+ mock_create_service_account, secret_name
+ )
+ self.assert_create_secret(mock_create_secret, secret_name)
+
+ def test_secret_is_not_created_when_k8s_1_23(
+ self,
+ mock_list_service_account,
+ mock_create_service_account,
+ mock_create_secret,
+ mock_list_secret,
+ mock_version,
+ ):
+ mock_list_service_account.return_value = FakeK8sServiceAccountsList(items=[])
+ mock_version.return_value = FakeK8sVersionApiCode("1", "23+")
+ self.kubectl.create_service_account(
+ self.service_account_name, self.labels, self.namespace
+ )
+ self.assert_create_service_account_v_1_23(mock_create_service_account)
+ mock_create_secret.assert_not_called()
+ mock_list_secret.assert_not_called()
+
+ def test_raise_exception_if_service_account_already_exists(
+ self,
+ mock_list_service_account,
+ mock_create_service_account,
+ mock_create_secret,
+ mock_list_secret,
+ mock_version,
+ ):
+ mock_list_service_account.return_value = FakeK8sServiceAccountsList(items=[1])
+ with self.assertRaises(Exception) as context:
+ self.kubectl.create_service_account(
+ self.service_account_name, self.labels, self.namespace
+ )
+ self.assertTrue(
+ "Service account with metadata.name={} already exists".format(
+ self.service_account_name
+ )
+ in str(context.exception)
+ )
+ mock_create_service_account.assert_not_called()
+ mock_create_secret.assert_not_called()
+
+ @mock.patch("n2vc.kubectl.uuid.uuid4")
+ def test_raise_exception_if_secret_already_exists(
+ self,
+ mock_uuid4,
+ mock_list_service_account,
+ mock_create_service_account,
+ mock_create_secret,
+ mock_list_secret,
+ mock_version,
+ ):
+ mock_list_service_account.return_value = FakeK8sServiceAccountsList(items=[])
+ mock_list_secret.return_value = FakeK8sSecretList(items=[1])
+ mock_version.return_value = FakeK8sVersionApiCode("1", "24+")
+ mock_uuid4.return_value = self.token_id
+ with self.assertRaises(Exception) as context:
+ self.kubectl.create_service_account(
+ self.service_account_name, self.labels, self.namespace
+ )
+ self.assertTrue(
+ "Secret with metadata.name={}-token-{} already exists".format(
+ self.service_account_name, self.token_id[:5]
+ )
+ in str(context.exception)
+ )
+ mock_create_service_account.assert_called()
+ mock_create_secret.assert_not_called()