X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=n2vc%2Fkubectl.py;h=c16c95a6bd1e4316c4c09736a8bfbaef520b9272;hb=1c1a25631024278b7caeb6a1dde34d5de326df6c;hp=bc8c3927d9537fbffb4e8bca3170def835c5683b;hpb=f6e9b00b6f7cd35e45ace4c84b53fe8d12b2438c;p=osm%2FN2VC.git diff --git a/n2vc/kubectl.py b/n2vc/kubectl.py index bc8c392..c16c95a 100644 --- a/n2vc/kubectl.py +++ b/n2vc/kubectl.py @@ -12,26 +12,55 @@ # See the License for the specific language governing permissions and # limitations under the License. +import base64 import logging +from typing import Dict +import typing +import uuid +import json + +from distutils.version import LooseVersion from kubernetes import client, config +from kubernetes.client.api import VersionApi +from kubernetes.client.models import ( + V1ClusterRole, + V1Role, + V1ObjectMeta, + V1PolicyRule, + V1ServiceAccount, + V1ClusterRoleBinding, + V1RoleBinding, + V1RoleRef, + V1Subject, + V1Secret, + V1SecretReference, + V1Namespace, +) from kubernetes.client.rest import ApiException +from n2vc.libjuju import retry_callback +from retrying_async import retry +SERVICE_ACCOUNT_TOKEN_KEY = "token" +SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt" +# clients CORE_CLIENT = "core_v1" -STORAGE_CLIENT = "storage_v1" RBAC_CLIENT = "rbac_v1" +STORAGE_CLIENT = "storage_v1" +CUSTOM_OBJECT_CLIENT = "custom_object" class Kubectl: def __init__(self, config_file=None): config.load_kube_config(config_file=config_file) self._clients = { - "core_v1": client.CoreV1Api(), - "storage_v1": client.StorageV1Api(), - "rbac_v1": client.RbacAuthorizationV1Api(), + CORE_CLIENT: client.CoreV1Api(), + RBAC_CLIENT: client.RbacAuthorizationV1Api(), + STORAGE_CLIENT: client.StorageV1Api(), + CUSTOM_OBJECT_CLIENT: client.CustomObjectsApi(), } - self._configuration = config.kube_config.Configuration() + self._configuration = config.kube_config.Configuration.get_default_copy() self.logger = logging.getLogger("Kubectl") @property @@ -42,7 +71,19 @@ class Kubectl: def clients(self): return self._clients - def get_services(self, field_selector=None, label_selector=None): + def get_services( + self, + field_selector: str = None, + label_selector: str = None, + ) -> typing.List[typing.Dict]: + """ + Get Service list from a namespace + + :param: field_selector: Kubernetes field selector for the namespace + :param: label_selector: Kubernetes label selector for the namespace + + :return: List of the services matching the selectors specified + """ kwargs = {} if field_selector: kwargs["field_selector"] = field_selector @@ -96,7 +137,7 @@ class Kubectl: if not selected_sc: # Select the first storage class in case there is no a default-class selected_sc = sc.metadata.name - annotations = sc.metadata.annotations + annotations = sc.metadata.annotations or {} if any( k in annotations and annotations[k] == v for k, v in default_sc_annotations.items() @@ -105,3 +146,472 @@ class Kubectl: selected_sc = sc.metadata.name break return selected_sc + + def create_cluster_role( + self, + name: str, + labels: Dict[str, str], + namespace: str = "kube-system", + ): + """ + Create a cluster role + + :param: name: Name of the cluster role + :param: labels: Labels for cluster role metadata + :param: namespace: Kubernetes namespace for cluster role metadata + Default: kube-system + """ + cluster_roles = self.clients[RBAC_CLIENT].list_cluster_role( + field_selector="metadata.name={}".format(name) + ) + + if len(cluster_roles.items) > 0: + raise Exception("Role with metadata.name={} already exists".format(name)) + + metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace) + # Cluster role + cluster_role = V1ClusterRole( + metadata=metadata, + rules=[ + V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]), + V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]), + ], + ) + + self.clients[RBAC_CLIENT].create_cluster_role(cluster_role) + + async def create_role( + self, + name: str, + labels: Dict[str, str], + api_groups: list, + resources: list, + verbs: list, + namespace: str, + ): + """ + Create a role with one PolicyRule + + :param: name: Name of the namespaced Role + :param: labels: Labels for namespaced Role metadata + :param: api_groups: List with api-groups allowed in the policy rule + :param: resources: List with resources allowed in the policy rule + :param: verbs: List with verbs allowed in the policy rule + :param: namespace: Kubernetes namespace for Role metadata + + :return: None + """ + + roles = self.clients[RBAC_CLIENT].list_namespaced_role( + namespace, field_selector="metadata.name={}".format(name) + ) + + if len(roles.items) > 0: + raise Exception("Role with metadata.name={} already exists".format(name)) + + metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace) + + role = V1Role( + metadata=metadata, + rules=[ + V1PolicyRule(api_groups=api_groups, resources=resources, verbs=verbs), + ], + ) + + self.clients[RBAC_CLIENT].create_namespaced_role(namespace, role) + + def delete_cluster_role(self, name: str): + """ + Delete a cluster role + + :param: name: Name of the cluster role + """ + self.clients[RBAC_CLIENT].delete_cluster_role(name) + + def _get_kubectl_version(self): + version = VersionApi().get_code() + return "{}.{}".format(version.major, version.minor) + + def _need_to_create_new_secret(self): + min_k8s_version = "1.24" + current_k8s_version = self._get_kubectl_version() + return LooseVersion(min_k8s_version) <= LooseVersion(current_k8s_version) + + def _get_secret_name(self, service_account_name: str): + random_alphanum = str(uuid.uuid4())[:5] + return "{}-token-{}".format(service_account_name, random_alphanum) + + def _create_service_account_secret( + self, service_account_name: str, namespace: str, secret_name: str + ): + """ + Create a secret for the service account. K8s version >= 1.24 + + :param: service_account_name: Name of the service account + :param: namespace: Kubernetes namespace for service account metadata + :param: secret_name: Name of the secret + """ + v1_core = self.clients[CORE_CLIENT] + secrets = v1_core.list_namespaced_secret( + namespace, field_selector="metadata.name={}".format(secret_name) + ).items + + if len(secrets) > 0: + raise Exception( + "Secret with metadata.name={} already exists".format(secret_name) + ) + + annotations = {"kubernetes.io/service-account.name": service_account_name} + metadata = V1ObjectMeta( + name=secret_name, namespace=namespace, annotations=annotations + ) + type = "kubernetes.io/service-account-token" + secret = V1Secret(metadata=metadata, type=type) + v1_core.create_namespaced_secret(namespace, secret) + + def _get_secret_reference_list(self, namespace: str, secret_name: str): + """ + Return a secret reference list with one secret. + K8s version >= 1.24 + + :param: namespace: Kubernetes namespace for service account metadata + :param: secret_name: Name of the secret + :rtype: list[V1SecretReference] + """ + return [V1SecretReference(name=secret_name, namespace=namespace)] + + def create_service_account( + self, + name: str, + labels: Dict[str, str], + namespace: str = "kube-system", + ): + """ + Create a service account + + :param: name: Name of the service account + :param: labels: Labels for service account metadata + :param: namespace: Kubernetes namespace for service account metadata + Default: kube-system + """ + v1_core = self.clients[CORE_CLIENT] + service_accounts = v1_core.list_namespaced_service_account( + namespace, field_selector="metadata.name={}".format(name) + ) + if len(service_accounts.items) > 0: + raise Exception( + "Service account with metadata.name={} already exists".format(name) + ) + + metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace) + + if self._need_to_create_new_secret(): + secret_name = self._get_secret_name(name) + secrets = self._get_secret_reference_list(namespace, secret_name) + service_account = V1ServiceAccount(metadata=metadata, secrets=secrets) + v1_core.create_namespaced_service_account(namespace, service_account) + self._create_service_account_secret(name, namespace, secret_name) + else: + service_account = V1ServiceAccount(metadata=metadata) + v1_core.create_namespaced_service_account(namespace, service_account) + + def delete_service_account(self, name: str, namespace: str = "kube-system"): + """ + Delete a service account + + :param: name: Name of the service account + :param: namespace: Kubernetes namespace for service account metadata + Default: kube-system + """ + self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace) + + def create_cluster_role_binding( + self, name: str, labels: Dict[str, str], namespace: str = "kube-system" + ): + """ + Create a cluster role binding + + :param: name: Name of the cluster role + :param: labels: Labels for cluster role binding metadata + :param: namespace: Kubernetes namespace for cluster role binding metadata + Default: kube-system + """ + role_bindings = self.clients[RBAC_CLIENT].list_cluster_role_binding( + field_selector="metadata.name={}".format(name) + ) + if len(role_bindings.items) > 0: + raise Exception("Generated rbac id already exists") + + role_binding = V1ClusterRoleBinding( + metadata=V1ObjectMeta(name=name, labels=labels), + role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""), + subjects=[V1Subject(kind="ServiceAccount", name=name, namespace=namespace)], + ) + self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding) + + async def create_role_binding( + self, + name: str, + role_name: str, + sa_name: str, + labels: Dict[str, str], + namespace: str, + ): + """ + Create a cluster role binding + + :param: name: Name of the namespaced Role Binding + :param: role_name: Name of the namespaced Role to be bound + :param: sa_name: Name of the Service Account to be bound + :param: labels: Labels for Role Binding metadata + :param: namespace: Kubernetes namespace for Role Binding metadata + + :return: None + """ + role_bindings = self.clients[RBAC_CLIENT].list_namespaced_role_binding( + namespace, field_selector="metadata.name={}".format(name) + ) + if len(role_bindings.items) > 0: + raise Exception( + "Role Binding with metadata.name={} already exists".format(name) + ) + + role_binding = V1RoleBinding( + metadata=V1ObjectMeta(name=name, labels=labels), + role_ref=V1RoleRef(kind="Role", name=role_name, api_group=""), + subjects=[ + V1Subject(kind="ServiceAccount", name=sa_name, namespace=namespace) + ], + ) + self.clients[RBAC_CLIENT].create_namespaced_role_binding( + namespace, role_binding + ) + + def delete_cluster_role_binding(self, name: str): + """ + Delete a cluster role binding + + :param: name: Name of the cluster role binding + """ + self.clients[RBAC_CLIENT].delete_cluster_role_binding(name) + + @retry( + attempts=10, + delay=1, + fallback=Exception("Failed getting the secret from service account"), + callback=retry_callback, + ) + async def get_secret_data( + self, name: str, namespace: str = "kube-system" + ) -> (str, str): + """ + Get secret data + + :param: name: Name of the secret data + :param: namespace: Name of the namespace where the secret is stored + + :return: Tuple with the token and client certificate + """ + v1_core = self.clients[CORE_CLIENT] + + secret_name = None + + service_accounts = v1_core.list_namespaced_service_account( + namespace, field_selector="metadata.name={}".format(name) + ) + if len(service_accounts.items) == 0: + raise Exception( + "Service account not found with metadata.name={}".format(name) + ) + service_account = service_accounts.items[0] + if service_account.secrets and len(service_account.secrets) > 0: + secret_name = service_account.secrets[0].name + if not secret_name: + raise Exception( + "Failed getting the secret from service account {}".format(name) + ) + # TODO: refactor to use get_secret_content + secret = v1_core.list_namespaced_secret( + namespace, field_selector="metadata.name={}".format(secret_name) + ).items[0] + + token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY] + client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY] + + return ( + base64.b64decode(token).decode("utf-8"), + base64.b64decode(client_certificate_data).decode("utf-8"), + ) + + @retry( + attempts=10, + delay=1, + fallback=Exception("Failed getting data from the secret"), + ) + async def get_secret_content( + self, + name: str, + namespace: str, + ) -> dict: + """ + Get secret data + + :param: name: Name of the secret + :param: namespace: Name of the namespace where the secret is stored + + :return: Dictionary with secret's data + """ + v1_core = self.clients[CORE_CLIENT] + + secret = v1_core.read_namespaced_secret(name, namespace) + + return secret.data + + @retry( + attempts=10, + delay=1, + fallback=Exception("Failed creating the secret"), + ) + async def create_secret( + self, name: str, data: dict, namespace: str, secret_type: str + ): + """ + Get secret data + + :param: name: Name of the secret + :param: data: Dict with data content. Values must be already base64 encoded + :param: namespace: Name of the namespace where the secret will be stored + :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls + + :return: None + """ + v1_core = self.clients[CORE_CLIENT] + metadata = V1ObjectMeta(name=name, namespace=namespace) + secret = V1Secret(metadata=metadata, data=data, type=secret_type) + v1_core.create_namespaced_secret(namespace, secret) + + async def create_certificate( + self, + namespace: str, + name: str, + dns_prefix: str, + secret_name: str, + usages: list, + issuer_name: str, + ): + """ + Creates cert-manager certificate object + + :param: namespace: Name of the namespace where the certificate and secret is stored + :param: name: Name of the certificate object + :param: dns_prefix: Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes + :param: secret_name: Name of the secret created by cert-manager + :param: usages: List of X.509 key usages + :param: issuer_name: Name of the cert-manager's Issuer or ClusterIssuer object + + """ + certificate_body = { + "apiVersion": "cert-manager.io/v1", + "kind": "Certificate", + "metadata": {"name": name, "namespace": namespace}, + "spec": { + "secretName": secret_name, + "privateKey": { + "rotationPolicy": "Always", + "algorithm": "ECDSA", + "size": 256, + }, + "duration": "8760h", # 1 Year + "renewBefore": "2208h", # 9 months + "subject": {"organizations": ["osm"]}, + "commonName": "osm", + "isCA": False, + "usages": usages, + "dnsNames": [ + "{}.{}".format(dns_prefix, namespace), + "{}.{}.svc".format(dns_prefix, namespace), + "{}.{}.svc.cluster".format(dns_prefix, namespace), + "{}.{}.svc.cluster.local".format(dns_prefix, namespace), + ], + "issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"}, + }, + } + client = self.clients[CUSTOM_OBJECT_CLIENT] + try: + client.create_namespaced_custom_object( + group="cert-manager.io", + plural="certificates", + version="v1", + body=certificate_body, + namespace=namespace, + ) + except ApiException as e: + info = json.loads(e.body) + if info.get("reason").lower() == "alreadyexists": + self.logger.warning("Certificate already exists: {}".format(e)) + else: + raise e + + async def delete_certificate(self, namespace, object_name): + client = self.clients[CUSTOM_OBJECT_CLIENT] + try: + client.delete_namespaced_custom_object( + group="cert-manager.io", + plural="certificates", + version="v1", + name=object_name, + namespace=namespace, + ) + except ApiException as e: + info = json.loads(e.body) + if info.get("reason").lower() == "notfound": + self.logger.warning("Certificate already deleted: {}".format(e)) + else: + raise e + + @retry( + attempts=10, + delay=1, + fallback=Exception("Failed creating the namespace"), + ) + async def create_namespace(self, name: str, labels: dict = None): + """ + Create a namespace + + :param: name: Name of the namespace to be created + :param: labels: Dictionary with labels for the new namespace + + """ + v1_core = self.clients[CORE_CLIENT] + metadata = V1ObjectMeta(name=name, labels=labels) + namespace = V1Namespace( + metadata=metadata, + ) + + try: + v1_core.create_namespace(namespace) + self.logger.debug("Namespace created: {}".format(name)) + except ApiException as e: + info = json.loads(e.body) + if info.get("reason").lower() == "alreadyexists": + self.logger.warning("Namespace already exists: {}".format(e)) + else: + raise e + + @retry( + attempts=10, + delay=1, + fallback=Exception("Failed deleting the namespace"), + ) + async def delete_namespace(self, name: str): + """ + Delete a namespace + + :param: name: Name of the namespace to be deleted + + """ + try: + self.clients[CORE_CLIENT].delete_namespace(name) + except ApiException as e: + if e.reason == "Not Found": + self.logger.warning("Namespace already deleted: {}".format(e))