-import base64
-import logging
-from typing import Dict
-import typing
-import uuid
-import json
-from distutils.version import LooseVersion
-from kubernetes import client, config
-from kubernetes.client.api import VersionApi
-from kubernetes.client.models import (
- V1ClusterRole,
- V1Role,
- V1ObjectMeta,
- V1PolicyRule,
- V1ServiceAccount,
- V1ClusterRoleBinding,
- V1RoleBinding,
- V1RoleRef,
- RbacV1Subject,
- V1Secret,
- V1SecretReference,
- V1Namespace,
-from kubernetes.client.rest import ApiException
-from n2vc.libjuju import retry_callback
-from retrying_async import retry
-# clients
-CORE_CLIENT = "core_v1"
-RBAC_CLIENT = "rbac_v1"
-STORAGE_CLIENT = "storage_v1"
-CUSTOM_OBJECT_CLIENT = "custom_object"
-class Kubectl:
- def __init__(self, config_file=None):
- config.load_kube_config(config_file=config_file)
- self._clients = {
- CORE_CLIENT: client.CoreV1Api(),
- RBAC_CLIENT: client.RbacAuthorizationV1Api(),
- STORAGE_CLIENT: client.StorageV1Api(),
- CUSTOM_OBJECT_CLIENT: client.CustomObjectsApi(),
- }
- self._configuration = config.kube_config.Configuration.get_default_copy()
- self.logger = logging.getLogger("lcm.odu")
- @property
- def configuration(self):
- return self._configuration
- @property
- def clients(self):
- return self._clients
- def get_services(
- self,
- field_selector: str = None,
- label_selector: str = None,
- ) -> typing.List[typing.Dict]:
- """
- Get Service list from a namespace
- :param: field_selector: Kubernetes field selector for the namespace
- :param: label_selector: Kubernetes label selector for the namespace
- :return: List of the services matching the selectors specified
- """
- kwargs = {}
- if field_selector:
- kwargs["field_selector"] = field_selector
- if label_selector:
- kwargs["label_selector"] = label_selector
- try:
- result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs)
- return [
- {
- "name": i.metadata.name,
- "cluster_ip": i.spec.cluster_ip,
- "type": i.spec.type,
- "ports": (
- [
- {
- "name": p.name,
- "node_port": p.node_port,
- "port": p.port,
- "protocol": p.protocol,
- "target_port": p.target_port,
- }
- for p in i.spec.ports
- ]
- if i.spec.ports
- else []
- ),
- "external_ip": [i.ip for i in i.status.load_balancer.ingress]
- if i.status.load_balancer.ingress
- else None,
- }
- for i in result.items
- ]
- except ApiException as e:
- self.logger.error("Error calling get services: {}".format(e))
- raise e
- def get_default_storage_class(self) -> str:
- """
- Default storage class
- :return: Returns the default storage class name, if exists.
- If not, it returns the first storage class.
- If there are not storage classes, returns None
- """
- storage_classes = self.clients[STORAGE_CLIENT].list_storage_class()
- selected_sc = None
- default_sc_annotations = {
- "storageclass.kubernetes.io/is-default-class": "true",
- # Older clusters still use the beta annotation.
- "storageclass.beta.kubernetes.io/is-default-class": "true",
- }
- for sc in storage_classes.items:
- if not selected_sc:
- # Select the first storage class in case there is no a default-class
- selected_sc = sc.metadata.name
- annotations = sc.metadata.annotations or {}
- if any(
- k in annotations and annotations[k] == v
- for k, v in default_sc_annotations.items()
- ):
- # Default storage
- selected_sc = sc.metadata.name
- break
- return selected_sc
- def create_cluster_role(
- self,
- name: str,
- labels: Dict[str, str],
- namespace: str = "kube-system",
- ):
- """
- Create a cluster role
- :param: name: Name of the cluster role
- :param: labels: Labels for cluster role metadata
- :param: namespace: Kubernetes namespace for cluster role metadata
- Default: kube-system
- """
- cluster_roles = self.clients[RBAC_CLIENT].list_cluster_role(
- field_selector="metadata.name={}".format(name)
- )
- if len(cluster_roles.items) > 0:
- raise Exception("Role with metadata.name={} already exists".format(name))
- metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
- # Cluster role
- cluster_role = V1ClusterRole(
- metadata=metadata,
- rules=[
- V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
- V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
- ],
- )
- self.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
- async def create_role(
- self,
- name: str,
- labels: Dict[str, str],
- api_groups: list,
- resources: list,
- verbs: list,
- namespace: str,
- ):
- """
- Create a role with one PolicyRule
- :param: name: Name of the namespaced Role
- :param: labels: Labels for namespaced Role metadata
- :param: api_groups: List with api-groups allowed in the policy rule
- :param: resources: List with resources allowed in the policy rule
- :param: verbs: List with verbs allowed in the policy rule
- :param: namespace: Kubernetes namespace for Role metadata
- :return: None
- """
- roles = self.clients[RBAC_CLIENT].list_namespaced_role(
- namespace, field_selector="metadata.name={}".format(name)
- )
- if len(roles.items) > 0:
- raise Exception("Role with metadata.name={} already exists".format(name))
- metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
- role = V1Role(
- metadata=metadata,
- rules=[
- V1PolicyRule(api_groups=api_groups, resources=resources, verbs=verbs),
- ],
- )
- self.clients[RBAC_CLIENT].create_namespaced_role(namespace, role)
- def delete_cluster_role(self, name: str):
- """
- Delete a cluster role
- :param: name: Name of the cluster role
- """
- self.clients[RBAC_CLIENT].delete_cluster_role(name)
- def _get_kubectl_version(self):
- version = VersionApi().get_code()
- return "{}.{}".format(version.major, version.minor)
- def _need_to_create_new_secret(self):
- min_k8s_version = "1.24"
- current_k8s_version = self._get_kubectl_version()
- return LooseVersion(min_k8s_version) <= LooseVersion(current_k8s_version)
- def _get_secret_name(self, service_account_name: str):
- random_alphanum = str(uuid.uuid4())[:5]
- return "{}-token-{}".format(service_account_name, random_alphanum)
- def _create_service_account_secret(
- self,
- service_account_name: str,
- namespace: str,
- secret_name: str,
- ):
- """
- Create a secret for the service account. K8s version >= 1.24
- :param: service_account_name: Name of the service account
- :param: namespace: Kubernetes namespace for service account metadata
- :param: secret_name: Name of the secret
- """
- v1_core = self.clients[CORE_CLIENT]
- secrets = v1_core.list_namespaced_secret(
- namespace, field_selector="metadata.name={}".format(secret_name)
- ).items
- if len(secrets) > 0:
- raise Exception(
- "Secret with metadata.name={} already exists".format(secret_name)
- )
- annotations = {"kubernetes.io/service-account.name": service_account_name}
- metadata = V1ObjectMeta(
- name=secret_name, namespace=namespace, annotations=annotations
- )
- type = "kubernetes.io/service-account-token"
- secret = V1Secret(metadata=metadata, type=type)
- v1_core.create_namespaced_secret(namespace, secret)
- def _get_secret_reference_list(self, namespace: str, secret_name: str):
- """
- Return a secret reference list with one secret.
- K8s version >= 1.24
- :param: namespace: Kubernetes namespace for service account metadata
- :param: secret_name: Name of the secret
- :rtype: list[V1SecretReference]
- """
- return [V1SecretReference(name=secret_name, namespace=namespace)]
- def create_service_account(
- self,
- name: str,
- labels: Dict[str, str],
- namespace: str = "kube-system",
- ):
- """
- Create a service account
- :param: name: Name of the service account
- :param: labels: Labels for service account metadata
- :param: namespace: Kubernetes namespace for service account metadata
- Default: kube-system
- """
- v1_core = self.clients[CORE_CLIENT]
- service_accounts = v1_core.list_namespaced_service_account(
- namespace, field_selector="metadata.name={}".format(name)
- )
- if len(service_accounts.items) > 0:
- raise Exception(
- "Service account with metadata.name={} already exists".format(name)
- )
- metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
- if self._need_to_create_new_secret():
- secret_name = self._get_secret_name(name)
- secrets = self._get_secret_reference_list(namespace, secret_name)
- service_account = V1ServiceAccount(metadata=metadata, secrets=secrets)
- v1_core.create_namespaced_service_account(namespace, service_account)
- self._create_service_account_secret(name, namespace, secret_name)
- else:
- service_account = V1ServiceAccount(metadata=metadata)
- v1_core.create_namespaced_service_account(namespace, service_account)
- def delete_secret(self, name: str, namespace: str = "kube-system"):
- """
- Delete a secret
- :param: name: Name of the secret
- :param: namespace: Kubernetes namespace
- Default: kube-system
- """
- self.clients[CORE_CLIENT].delete_namespaced_secret(name, namespace)
- def delete_service_account(self, name: str, namespace: str = "kube-system"):
- """
- Delete a service account
- :param: name: Name of the service account
- :param: namespace: Kubernetes namespace for service account metadata
- Default: kube-system
- """
- self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace)
- def create_cluster_role_binding(
- self, name: str, labels: Dict[str, str], namespace: str = "kube-system"
- ):
- """
- Create a cluster role binding
- :param: name: Name of the cluster role
- :param: labels: Labels for cluster role binding metadata
- :param: namespace: Kubernetes namespace for cluster role binding metadata
- Default: kube-system
- """
- role_bindings = self.clients[RBAC_CLIENT].list_cluster_role_binding(
- field_selector="metadata.name={}".format(name)
- )
- if len(role_bindings.items) > 0:
- raise Exception("Generated rbac id already exists")
- role_binding = V1ClusterRoleBinding(
- metadata=V1ObjectMeta(name=name, labels=labels),
- role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
- subjects=[
- RbacV1Subject(kind="ServiceAccount", name=name, namespace=namespace)
- ],
- )
- self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
- async def create_role_binding(
- self,
- name: str,
- role_name: str,
- sa_name: str,
- labels: Dict[str, str],
- namespace: str,
- ):
- """
- Create a cluster role binding
- :param: name: Name of the namespaced Role Binding
- :param: role_name: Name of the namespaced Role to be bound
- :param: sa_name: Name of the Service Account to be bound
- :param: labels: Labels for Role Binding metadata
- :param: namespace: Kubernetes namespace for Role Binding metadata
- :return: None
- """
- role_bindings = self.clients[RBAC_CLIENT].list_namespaced_role_binding(
- namespace, field_selector="metadata.name={}".format(name)
- )
- if len(role_bindings.items) > 0:
- raise Exception(
- "Role Binding with metadata.name={} already exists".format(name)
- )
- role_binding = V1RoleBinding(
- metadata=V1ObjectMeta(name=name, labels=labels),
- role_ref=V1RoleRef(kind="Role", name=role_name, api_group=""),
- subjects=[
- RbacV1Subject(kind="ServiceAccount", name=sa_name, namespace=namespace)
- ],
- )
- self.clients[RBAC_CLIENT].create_namespaced_role_binding(
- namespace, role_binding
- )
- def delete_cluster_role_binding(self, name: str):
- """
- Delete a cluster role binding
- :param: name: Name of the cluster role binding
- """
- self.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
- @retry(
- attempts=10,
- delay=1,
- fallback=Exception("Failed getting the secret from service account"),
- callback=retry_callback,
- )
- async def get_secret_data(
- self, name: str, namespace: str = "kube-system"
- ) -> (str, str):
- """
- Get secret data
- :param: name: Name of the secret data
- :param: namespace: Name of the namespace where the secret is stored
- :return: Tuple with the token and client certificate
- """
- v1_core = self.clients[CORE_CLIENT]
- secret_name = None
- service_accounts = v1_core.list_namespaced_service_account(
- namespace, field_selector="metadata.name={}".format(name)
- )
- if len(service_accounts.items) == 0:
- raise Exception(
- "Service account not found with metadata.name={}".format(name)
- )
- service_account = service_accounts.items[0]
- if service_account.secrets and len(service_account.secrets) > 0:
- secret_name = service_account.secrets[0].name
- if not secret_name:
- raise Exception(
- "Failed getting the secret from service account {}".format(name)
- )
- # TODO: refactor to use get_secret_content
- secret = v1_core.list_namespaced_secret(
- namespace, field_selector="metadata.name={}".format(secret_name)
- ).items[0]
- token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
- client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
- return (
- base64.b64decode(token).decode("utf-8"),
- base64.b64decode(client_certificate_data).decode("utf-8"),
- )
- @retry(
- attempts=10,
- delay=1,
- fallback=Exception("Failed getting data from the secret"),
- )
- async def get_secret_content(
- self,
- name: str,
- namespace: str,
- ) -> dict:
- """
- Get secret data
- :param: name: Name of the secret
- :param: namespace: Name of the namespace where the secret is stored
- :return: Dictionary with secret's data
- """
- v1_core = self.clients[CORE_CLIENT]
- secret = v1_core.read_namespaced_secret(name, namespace)
- return secret.data
- @retry(
- attempts=10,
- delay=1,
- fallback=Exception("Failed creating the secret"),
- )
- async def create_secret(
- self, name: str, data: dict, namespace: str, secret_type: str
- ):
- """
- Create secret with data
- :param: name: Name of the secret
- :param: data: Dict with data content. Values must be already base64 encoded
- :param: namespace: Name of the namespace where the secret will be stored
- :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls
- :return: None
- """
- self.logger.info("Enter create_secret function")
- v1_core = self.clients[CORE_CLIENT]
- self.logger.info(f"v1_core: {v1_core}")
- metadata = V1ObjectMeta(name=name, namespace=namespace)
- self.logger.info(f"metadata: {metadata}")
- secret = V1Secret(metadata=metadata, data=data, type=secret_type)
- self.logger.info(f"secret: {secret}")
- v1_core.create_namespaced_secret(namespace, secret)
- self.logger.info("Namespaced secret was created")
- async def create_certificate(
- self,
- namespace: str,
- name: str,
- dns_prefix: str,
- secret_name: str,
- usages: list,
- issuer_name: str,
- ):
- """
- Creates cert-manager certificate object
- :param: namespace: Name of the namespace where the certificate and secret is stored
- :param: name: Name of the certificate object
- :param: dns_prefix: Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes
- :param: secret_name: Name of the secret created by cert-manager
- :param: usages: List of X.509 key usages
- :param: issuer_name: Name of the cert-manager's Issuer or ClusterIssuer object
- """
- certificate_body = {
- "apiVersion": "cert-manager.io/v1",
- "kind": "Certificate",
- "metadata": {"name": name, "namespace": namespace},
- "spec": {
- "secretName": secret_name,
- "privateKey": {
- "rotationPolicy": "Always",
- "algorithm": "ECDSA",
- "size": 256,
- },
- "duration": "8760h", # 1 Year
- "renewBefore": "2208h", # 9 months
- "subject": {"organizations": ["osm"]},
- "commonName": "osm",
- "isCA": False,
- "usages": usages,
- "dnsNames": [
- "{}.{}".format(dns_prefix, namespace),
- "{}.{}.svc".format(dns_prefix, namespace),
- "{}.{}.svc.cluster".format(dns_prefix, namespace),
- "{}.{}.svc.cluster.local".format(dns_prefix, namespace),
- ],
- "issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"},
- },
- }
- client = self.clients[CUSTOM_OBJECT_CLIENT]
- try:
- client.create_namespaced_custom_object(
- group="cert-manager.io",
- plural="certificates",
- version="v1",
- body=certificate_body,
- namespace=namespace,
- )
- except ApiException as e:
- info = json.loads(e.body)
- if info.get("reason").lower() == "alreadyexists":
- self.logger.warning("Certificate already exists: {}".format(e))
- else:
- raise e
- async def delete_certificate(self, namespace, object_name):
- client = self.clients[CUSTOM_OBJECT_CLIENT]
- try:
- client.delete_namespaced_custom_object(
- group="cert-manager.io",
- plural="certificates",
- version="v1",
- name=object_name,
- namespace=namespace,
- )
- except ApiException as e:
- info = json.loads(e.body)
- if info.get("reason").lower() == "notfound":
- self.logger.warning("Certificate already deleted: {}".format(e))
- else:
- raise e
- @retry(
- attempts=10,
- delay=1,
- fallback=Exception("Failed creating the namespace"),
- )
- async def create_namespace(self, name: str, labels: dict = None):
- """
- Create a namespace
- :param: name: Name of the namespace to be created
- :param: labels: Dictionary with labels for the new namespace
- """
- v1_core = self.clients[CORE_CLIENT]
- metadata = V1ObjectMeta(name=name, labels=labels)
- namespace = V1Namespace(
- metadata=metadata,
- )
- try:
- v1_core.create_namespace(namespace)
- self.logger.debug("Namespace created: {}".format(name))
- except ApiException as e:
- info = json.loads(e.body)
- if info.get("reason").lower() == "alreadyexists":
- self.logger.warning("Namespace already exists: {}".format(e))
- else:
- raise e
- @retry(
- attempts=10,
- delay=1,
- fallback=Exception("Failed deleting the namespace"),
- )
- async def delete_namespace(self, name: str):
- """
- Delete a namespace
- :param: name: Name of the namespace to be deleted
- """
- try:
- self.clients[CORE_CLIENT].delete_namespace(name)
- except ApiException as e:
- if e.reason == "Not Found":
- self.logger.warning("Namespace already deleted: {}".format(e))
- def get_secrets(
- self,
- namespace: str,
- field_selector: str = None,
- ) -> typing.List[typing.Dict]:
- """
- Get Secret list from a namespace
- :param: namespace: Kubernetes namespace
- :param: field_selector: Kubernetes field selector
- :return: List of the secrets matching the selectors specified
- """
- try:
- v1_core = self.clients[CORE_CLIENT]
- secrets = v1_core.list_namespaced_secret(
- namespace=namespace,
- field_selector=field_selector,
- ).items
- return secrets
- except ApiException as e:
- self.logger.error("Error calling get secrets: {}".format(e))
- raise e
- def create_generic_object(
- self,
- api_group: str,
- api_plural: str,
- api_version: str,
- namespace: str,
- manifest_dict: dict,
- ):
- """
- Creates generic object
- :param: api_group: API Group
- :param: api_plural: API Plural
- :param: api_version: API Version
- :param: namespace: Namespace
- :param: manifest_dict: Dictionary with the content of the Kubernetes manifest
- """
- client = self.clients[CUSTOM_OBJECT_CLIENT]
- try:
- client.create_namespaced_custom_object(
- group=api_group,
- plural=api_plural,
- version=api_version,
- body=manifest_dict,
- namespace=namespace,
- )
- except ApiException as e:
- info = json.loads(e.body)
- if info.get("reason").lower() == "alreadyexists":
- self.logger.warning("Object already exists: {}".format(e))
- else:
- raise e
- def delete_generic_object(
- self,
- api_group: str,
- api_plural: str,
- api_version: str,
- namespace: str,
- name: str,
- ):
- """
- Deletes generic object
- :param: api_group: API Group
- :param: api_plural: API Plural
- :param: api_version: API Version
- :param: namespace: Namespace
- :param: name: Name of the object
- """
- client = self.clients[CUSTOM_OBJECT_CLIENT]
- try:
- client.delete_namespaced_custom_object(
- group=api_group,
- plural=api_plural,
- version=api_version,
- name=name,
- namespace=namespace,
- )
- except ApiException as e:
- info = json.loads(e.body)
- if info.get("reason").lower() == "notfound":
- self.logger.warning("Object already deleted: {}".format(e))
- else:
- raise e
- async def get_generic_object(
- self,
- api_group: str,
- api_plural: str,
- api_version: str,
- namespace: str,
- name: str,
- ):
- """
- Gets generic object
- :param: api_group: API Group
- :param: api_plural: API Plural
- :param: api_version: API Version
- :param: namespace: Namespace
- :param: name: Name of the object
- """
- client = self.clients[CUSTOM_OBJECT_CLIENT]
- try:
- object_dict = client.list_namespaced_custom_object(
- group=api_group,
- plural=api_plural,
- version=api_version,
- namespace=namespace,
- field_selector=f"metadata.name={name}",
- )
- if len(object_dict.get("items")) == 0:
- return None
- return object_dict.get("items")[0]
- except ApiException as e:
- info = json.loads(e.body)
- if info.get("reason").lower() == "notfound":
- self.logger.warning("Cannot get custom object: {}".format(e))
- else:
- raise e
- async def list_generic_object(
- self,
- api_group: str,
- api_plural: str,
- api_version: str,
- namespace: str,
- ):
- """
- Lists all generic objects of the requested API group
- :param: api_group: API Group
- :param: api_plural: API Plural
- :param: api_version: API Version
- :param: namespace: Namespace
- """
- client = self.clients[CUSTOM_OBJECT_CLIENT]
- try:
- object_dict = client.list_namespaced_custom_object(
- group=api_group,
- plural=api_plural,
- version=api_version,
- namespace=namespace,
- )
- self.logger.debug(f"Object-list: {object_dict.get('items')}")
- return object_dict.get("items")
- except ApiException as e:
- info = json.loads(e.body)
- if info.get("reason").lower() == "notfound":
- self.logger.warning(
- "Cannot retrieve list of custom objects: {}".format(e)
- )
- else:
- raise e
- @retry(
- attempts=10,
- delay=1,
- fallback=Exception("Failed creating the secret"),
- )
- async def create_secret_string(
- self, name: str, string_data: str, namespace: str, secret_type: str
- ):
- """
- Create secret with data
- :param: name: Name of the secret
- :param: string_data: String with data content
- :param: namespace: Name of the namespace where the secret will be stored
- :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls
- :return: None
- """
- v1_core = self.clients[CORE_CLIENT]
- metadata = V1ObjectMeta(name=name, namespace=namespace)
- secret = V1Secret(metadata=metadata, string_data=string_data, type=secret_type)
- v1_core.create_namespaced_secret(namespace, secret)