# See the License for the specific language governing permissions and
# limitations under the License.
+import base64
+import logging
+from typing import Dict
+import typing
+import uuid
+
+from distutils.version import LooseVersion
+
from kubernetes import client, config
+from kubernetes.client.api import VersionApi
+from kubernetes.client.models import (
+ V1ClusterRole,
+ V1ObjectMeta,
+ V1PolicyRule,
+ V1ServiceAccount,
+ V1ClusterRoleBinding,
+ V1RoleRef,
+ V1Subject,
+ V1Secret,
+ V1SecretReference,
+)
from kubernetes.client.rest import ApiException
-import logging
+from retrying_async import retry
+
+
+SERVICE_ACCOUNT_TOKEN_KEY = "token"
+SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
+# clients
+CORE_CLIENT = "core_v1"
+RBAC_CLIENT = "rbac_v1"
+STORAGE_CLIENT = "storage_v1"
class Kubectl:
def __init__(self, config_file=None):
config.load_kube_config(config_file=config_file)
+ self._clients = {
+ CORE_CLIENT: client.CoreV1Api(),
+ RBAC_CLIENT: client.RbacAuthorizationV1Api(),
+ STORAGE_CLIENT: client.StorageV1Api(),
+ }
+ self._configuration = config.kube_config.Configuration.get_default_copy()
self.logger = logging.getLogger("Kubectl")
- def get_services(self, field_selector=None, label_selector=None):
+ @property
+ def configuration(self):
+ return self._configuration
+
+ @property
+ def clients(self):
+ return self._clients
+
+ def get_services(
+ self,
+ field_selector: str = None,
+ label_selector: str = None,
+ ) -> typing.List[typing.Dict]:
+ """
+ Get Service list from a namespace
+
+ :param: field_selector: Kubernetes field selector for the namespace
+ :param: label_selector: Kubernetes label selector for the namespace
+
+ :return: List of the services matching the selectors specified
+ """
kwargs = {}
if field_selector:
kwargs["field_selector"] = field_selector
if label_selector:
kwargs["label_selector"] = label_selector
-
try:
- v1 = client.CoreV1Api()
- result = v1.list_service_for_all_namespaces(**kwargs)
+ result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs)
return [
{
"name": i.metadata.name,
except ApiException as e:
self.logger.error("Error calling get services: {}".format(e))
raise e
+
+ def get_default_storage_class(self) -> str:
+ """
+ Default storage class
+
+ :return: Returns the default storage class name, if exists.
+ If not, it returns the first storage class.
+ If there are not storage classes, returns None
+ """
+ storage_classes = self.clients[STORAGE_CLIENT].list_storage_class()
+ selected_sc = None
+ default_sc_annotations = {
+ "storageclass.kubernetes.io/is-default-class": "true",
+ # Older clusters still use the beta annotation.
+ "storageclass.beta.kubernetes.io/is-default-class": "true",
+ }
+ for sc in storage_classes.items:
+ if not selected_sc:
+ # Select the first storage class in case there is no a default-class
+ selected_sc = sc.metadata.name
+ annotations = sc.metadata.annotations or {}
+ if any(
+ k in annotations and annotations[k] == v
+ for k, v in default_sc_annotations.items()
+ ):
+ # Default storage
+ selected_sc = sc.metadata.name
+ break
+ return selected_sc
+
+ def create_cluster_role(
+ self,
+ name: str,
+ labels: Dict[str, str],
+ namespace: str = "kube-system",
+ ):
+ """
+ Create a cluster role
+
+ :param: name: Name of the cluster role
+ :param: labels: Labels for cluster role metadata
+ :param: namespace: Kubernetes namespace for cluster role metadata
+ Default: kube-system
+ """
+ cluster_roles = self.clients[RBAC_CLIENT].list_cluster_role(
+ field_selector="metadata.name={}".format(name)
+ )
+
+ if len(cluster_roles.items) > 0:
+ raise Exception(
+ "Cluster role with metadata.name={} already exists".format(name)
+ )
+
+ metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
+ # Cluster role
+ cluster_role = V1ClusterRole(
+ metadata=metadata,
+ rules=[
+ V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
+ V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
+ ],
+ )
+
+ self.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
+
+ def delete_cluster_role(self, name: str):
+ """
+ Delete a cluster role
+
+ :param: name: Name of the cluster role
+ """
+ self.clients[RBAC_CLIENT].delete_cluster_role(name)
+
+ def _get_kubectl_version(self):
+ version = VersionApi().get_code()
+ return "{}.{}".format(version.major, version.minor)
+
+ def _need_to_create_new_secret(self):
+ min_k8s_version = "1.24"
+ current_k8s_version = self._get_kubectl_version()
+ return LooseVersion(min_k8s_version) <= LooseVersion(current_k8s_version)
+
+ def _get_secret_name(self, service_account_name: str):
+ random_alphanum = str(uuid.uuid4())[:5]
+ return "{}-token-{}".format(service_account_name, random_alphanum)
+
+ def _create_service_account_secret(
+ self, service_account_name: str, namespace: str, secret_name: str
+ ):
+ """
+ Create a secret for the service account. K8s version >= 1.24
+
+ :param: service_account_name: Name of the service account
+ :param: namespace: Kubernetes namespace for service account metadata
+ :param: secret_name: Name of the secret
+ """
+ v1_core = self.clients[CORE_CLIENT]
+ secrets = v1_core.list_namespaced_secret(
+ namespace, field_selector="metadata.name={}".format(secret_name)
+ ).items
+
+ if len(secrets) > 0:
+ raise Exception(
+ "Secret with metadata.name={} already exists".format(secret_name)
+ )
+
+ annotations = {"kubernetes.io/service-account.name": service_account_name}
+ metadata = V1ObjectMeta(
+ name=secret_name, namespace=namespace, annotations=annotations
+ )
+ type = "kubernetes.io/service-account-token"
+ secret = V1Secret(metadata=metadata, type=type)
+ v1_core.create_namespaced_secret(namespace, secret)
+
+ def _get_secret_reference_list(self, namespace: str, secret_name: str):
+ """
+ Return a secret reference list with one secret.
+ K8s version >= 1.24
+
+ :param: namespace: Kubernetes namespace for service account metadata
+ :param: secret_name: Name of the secret
+ :rtype: list[V1SecretReference]
+ """
+ return [V1SecretReference(name=secret_name, namespace=namespace)]
+
+ def create_service_account(
+ self,
+ name: str,
+ labels: Dict[str, str],
+ namespace: str = "kube-system",
+ ):
+ """
+ Create a service account
+
+ :param: name: Name of the service account
+ :param: labels: Labels for service account metadata
+ :param: namespace: Kubernetes namespace for service account metadata
+ Default: kube-system
+ """
+ v1_core = self.clients[CORE_CLIENT]
+ service_accounts = v1_core.list_namespaced_service_account(
+ namespace, field_selector="metadata.name={}".format(name)
+ )
+ if len(service_accounts.items) > 0:
+ raise Exception(
+ "Service account with metadata.name={} already exists".format(name)
+ )
+
+ metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
+
+ if self._need_to_create_new_secret():
+ secret_name = self._get_secret_name(name)
+ secrets = self._get_secret_reference_list(namespace, secret_name)
+ service_account = V1ServiceAccount(metadata=metadata, secrets=secrets)
+ v1_core.create_namespaced_service_account(namespace, service_account)
+ self._create_service_account_secret(name, namespace, secret_name)
+ else:
+ service_account = V1ServiceAccount(metadata=metadata)
+ v1_core.create_namespaced_service_account(namespace, service_account)
+
+ def delete_service_account(self, name: str, namespace: str = "kube-system"):
+ """
+ Delete a service account
+
+ :param: name: Name of the service account
+ :param: namespace: Kubernetes namespace for service account metadata
+ Default: kube-system
+ """
+ self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace)
+
+ def create_cluster_role_binding(
+ self, name: str, labels: Dict[str, str], namespace: str = "kube-system"
+ ):
+ """
+ Create a cluster role binding
+
+ :param: name: Name of the cluster role
+ :param: labels: Labels for cluster role binding metadata
+ :param: namespace: Kubernetes namespace for cluster role binding metadata
+ Default: kube-system
+ """
+ role_bindings = self.clients[RBAC_CLIENT].list_cluster_role_binding(
+ field_selector="metadata.name={}".format(name)
+ )
+ if len(role_bindings.items) > 0:
+ raise Exception("Generated rbac id already exists")
+
+ role_binding = V1ClusterRoleBinding(
+ metadata=V1ObjectMeta(name=name, labels=labels),
+ role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
+ subjects=[V1Subject(kind="ServiceAccount", name=name, namespace=namespace)],
+ )
+ self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
+
+ def delete_cluster_role_binding(self, name: str):
+ """
+ Delete a cluster role binding
+
+ :param: name: Name of the cluster role binding
+ """
+ self.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
+
+ @retry(
+ attempts=10,
+ delay=1,
+ fallback=Exception("Failed getting the secret from service account"),
+ )
+ async def get_secret_data(
+ self, name: str, namespace: str = "kube-system"
+ ) -> (str, str):
+ """
+ Get secret data
+
+ :param: name: Name of the secret data
+ :param: namespace: Name of the namespace where the secret is stored
+
+ :return: Tuple with the token and client certificate
+ """
+ v1_core = self.clients[CORE_CLIENT]
+
+ secret_name = None
+
+ service_accounts = v1_core.list_namespaced_service_account(
+ namespace, field_selector="metadata.name={}".format(name)
+ )
+ if len(service_accounts.items) == 0:
+ raise Exception(
+ "Service account not found with metadata.name={}".format(name)
+ )
+ service_account = service_accounts.items[0]
+ if service_account.secrets and len(service_account.secrets) > 0:
+ secret_name = service_account.secrets[0].name
+ if not secret_name:
+ raise Exception(
+ "Failed getting the secret from service account {}".format(name)
+ )
+ secret = v1_core.list_namespaced_secret(
+ namespace, field_selector="metadata.name={}".format(secret_name)
+ ).items[0]
+
+ token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
+ client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
+
+ return (
+ base64.b64decode(token).decode("utf-8"),
+ base64.b64decode(client_certificate_data).decode("utf-8"),
+ )