From: garciadeblas Date: Thu, 22 Aug 2024 08:05:47 +0000 (+0200) Subject: Use kubectl module from n2vc and remove it from odu_libs X-Git-Tag: v16.0.0 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=66536b78aadcb39f8b9cd33f505967fe26e230d4;p=osm%2FLCM.git Use kubectl module from n2vc and remove it from odu_libs Change-Id: I08a715564f32623dda1608937c7d35bb212a4961 Signed-off-by: garciadeblas --- diff --git a/osm_lcm/odu_libs/kubectl.py b/osm_lcm/odu_libs/kubectl.py deleted file mode 100644 index a7f0cee..0000000 --- a/osm_lcm/odu_libs/kubectl.py +++ /dev/null @@ -1,829 +0,0 @@ -####################################################################################### -# Copyright 2020 Canonical Ltd. -# Copyright ETSI Contributors and Others. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -####################################################################################### - -import base64 -import logging -from typing import Dict -import typing -import uuid -import json - -from distutils.version import LooseVersion - -from kubernetes import client, config -from kubernetes.client.api import VersionApi -from kubernetes.client.models import ( - V1ClusterRole, - V1Role, - V1ObjectMeta, - V1PolicyRule, - V1ServiceAccount, - V1ClusterRoleBinding, - V1RoleBinding, - V1RoleRef, - RbacV1Subject, - V1Secret, - V1SecretReference, - V1Namespace, -) -from kubernetes.client.rest import ApiException -from n2vc.libjuju import retry_callback -from retrying_async import retry - - -SERVICE_ACCOUNT_TOKEN_KEY = "token" -SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt" -# clients -CORE_CLIENT = "core_v1" -RBAC_CLIENT = "rbac_v1" -STORAGE_CLIENT = "storage_v1" -CUSTOM_OBJECT_CLIENT = "custom_object" - - -class Kubectl: - def __init__(self, config_file=None): - config.load_kube_config(config_file=config_file) - self._clients = { - CORE_CLIENT: client.CoreV1Api(), - RBAC_CLIENT: client.RbacAuthorizationV1Api(), - STORAGE_CLIENT: client.StorageV1Api(), - CUSTOM_OBJECT_CLIENT: client.CustomObjectsApi(), - } - self._configuration = config.kube_config.Configuration.get_default_copy() - self.logger = logging.getLogger("lcm.odu") - - @property - def configuration(self): - return self._configuration - - @property - def clients(self): - return self._clients - - def get_services( - self, - field_selector: str = None, - label_selector: str = None, - ) -> typing.List[typing.Dict]: - """ - Get Service list from a namespace - - :param: field_selector: Kubernetes field selector for the namespace - :param: label_selector: Kubernetes label selector for the namespace - - :return: List of the services matching the selectors specified - """ - kwargs = {} - if field_selector: - kwargs["field_selector"] = field_selector - if label_selector: - kwargs["label_selector"] = label_selector - try: - result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs) - return [ - { - "name": i.metadata.name, - "cluster_ip": i.spec.cluster_ip, - "type": i.spec.type, - "ports": ( - [ - { - "name": p.name, - "node_port": p.node_port, - "port": p.port, - "protocol": p.protocol, - "target_port": p.target_port, - } - for p in i.spec.ports - ] - if i.spec.ports - else [] - ), - "external_ip": [i.ip for i in i.status.load_balancer.ingress] - if i.status.load_balancer.ingress - else None, - } - for i in result.items - ] - except ApiException as e: - self.logger.error("Error calling get services: {}".format(e)) - raise e - - def get_default_storage_class(self) -> str: - """ - Default storage class - - :return: Returns the default storage class name, if exists. - If not, it returns the first storage class. - If there are not storage classes, returns None - """ - storage_classes = self.clients[STORAGE_CLIENT].list_storage_class() - selected_sc = None - default_sc_annotations = { - "storageclass.kubernetes.io/is-default-class": "true", - # Older clusters still use the beta annotation. - "storageclass.beta.kubernetes.io/is-default-class": "true", - } - for sc in storage_classes.items: - if not selected_sc: - # Select the first storage class in case there is no a default-class - selected_sc = sc.metadata.name - annotations = sc.metadata.annotations or {} - if any( - k in annotations and annotations[k] == v - for k, v in default_sc_annotations.items() - ): - # Default storage - selected_sc = sc.metadata.name - break - return selected_sc - - def create_cluster_role( - self, - name: str, - labels: Dict[str, str], - namespace: str = "kube-system", - ): - """ - Create a cluster role - - :param: name: Name of the cluster role - :param: labels: Labels for cluster role metadata - :param: namespace: Kubernetes namespace for cluster role metadata - Default: kube-system - """ - cluster_roles = self.clients[RBAC_CLIENT].list_cluster_role( - field_selector="metadata.name={}".format(name) - ) - - if len(cluster_roles.items) > 0: - raise Exception("Role with metadata.name={} already exists".format(name)) - - metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace) - # Cluster role - cluster_role = V1ClusterRole( - metadata=metadata, - rules=[ - V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]), - V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]), - ], - ) - - self.clients[RBAC_CLIENT].create_cluster_role(cluster_role) - - async def create_role( - self, - name: str, - labels: Dict[str, str], - api_groups: list, - resources: list, - verbs: list, - namespace: str, - ): - """ - Create a role with one PolicyRule - - :param: name: Name of the namespaced Role - :param: labels: Labels for namespaced Role metadata - :param: api_groups: List with api-groups allowed in the policy rule - :param: resources: List with resources allowed in the policy rule - :param: verbs: List with verbs allowed in the policy rule - :param: namespace: Kubernetes namespace for Role metadata - - :return: None - """ - - roles = self.clients[RBAC_CLIENT].list_namespaced_role( - namespace, field_selector="metadata.name={}".format(name) - ) - - if len(roles.items) > 0: - raise Exception("Role with metadata.name={} already exists".format(name)) - - metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace) - - role = V1Role( - metadata=metadata, - rules=[ - V1PolicyRule(api_groups=api_groups, resources=resources, verbs=verbs), - ], - ) - - self.clients[RBAC_CLIENT].create_namespaced_role(namespace, role) - - def delete_cluster_role(self, name: str): - """ - Delete a cluster role - - :param: name: Name of the cluster role - """ - self.clients[RBAC_CLIENT].delete_cluster_role(name) - - def _get_kubectl_version(self): - version = VersionApi().get_code() - return "{}.{}".format(version.major, version.minor) - - def _need_to_create_new_secret(self): - min_k8s_version = "1.24" - current_k8s_version = self._get_kubectl_version() - return LooseVersion(min_k8s_version) <= LooseVersion(current_k8s_version) - - def _get_secret_name(self, service_account_name: str): - random_alphanum = str(uuid.uuid4())[:5] - return "{}-token-{}".format(service_account_name, random_alphanum) - - def _create_service_account_secret( - self, - service_account_name: str, - namespace: str, - secret_name: str, - ): - """ - Create a secret for the service account. K8s version >= 1.24 - - :param: service_account_name: Name of the service account - :param: namespace: Kubernetes namespace for service account metadata - :param: secret_name: Name of the secret - """ - v1_core = self.clients[CORE_CLIENT] - secrets = v1_core.list_namespaced_secret( - namespace, field_selector="metadata.name={}".format(secret_name) - ).items - - if len(secrets) > 0: - raise Exception( - "Secret with metadata.name={} already exists".format(secret_name) - ) - - annotations = {"kubernetes.io/service-account.name": service_account_name} - metadata = V1ObjectMeta( - name=secret_name, namespace=namespace, annotations=annotations - ) - type = "kubernetes.io/service-account-token" - secret = V1Secret(metadata=metadata, type=type) - v1_core.create_namespaced_secret(namespace, secret) - - def _get_secret_reference_list(self, namespace: str, secret_name: str): - """ - Return a secret reference list with one secret. - K8s version >= 1.24 - - :param: namespace: Kubernetes namespace for service account metadata - :param: secret_name: Name of the secret - :rtype: list[V1SecretReference] - """ - return [V1SecretReference(name=secret_name, namespace=namespace)] - - def create_service_account( - self, - name: str, - labels: Dict[str, str], - namespace: str = "kube-system", - ): - """ - Create a service account - - :param: name: Name of the service account - :param: labels: Labels for service account metadata - :param: namespace: Kubernetes namespace for service account metadata - Default: kube-system - """ - v1_core = self.clients[CORE_CLIENT] - service_accounts = v1_core.list_namespaced_service_account( - namespace, field_selector="metadata.name={}".format(name) - ) - if len(service_accounts.items) > 0: - raise Exception( - "Service account with metadata.name={} already exists".format(name) - ) - - metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace) - - if self._need_to_create_new_secret(): - secret_name = self._get_secret_name(name) - secrets = self._get_secret_reference_list(namespace, secret_name) - service_account = V1ServiceAccount(metadata=metadata, secrets=secrets) - v1_core.create_namespaced_service_account(namespace, service_account) - self._create_service_account_secret(name, namespace, secret_name) - else: - service_account = V1ServiceAccount(metadata=metadata) - v1_core.create_namespaced_service_account(namespace, service_account) - - def delete_secret(self, name: str, namespace: str = "kube-system"): - """ - Delete a secret - - :param: name: Name of the secret - :param: namespace: Kubernetes namespace - Default: kube-system - """ - self.clients[CORE_CLIENT].delete_namespaced_secret(name, namespace) - - def delete_service_account(self, name: str, namespace: str = "kube-system"): - """ - Delete a service account - - :param: name: Name of the service account - :param: namespace: Kubernetes namespace for service account metadata - Default: kube-system - """ - self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace) - - def create_cluster_role_binding( - self, name: str, labels: Dict[str, str], namespace: str = "kube-system" - ): - """ - Create a cluster role binding - - :param: name: Name of the cluster role - :param: labels: Labels for cluster role binding metadata - :param: namespace: Kubernetes namespace for cluster role binding metadata - Default: kube-system - """ - role_bindings = self.clients[RBAC_CLIENT].list_cluster_role_binding( - field_selector="metadata.name={}".format(name) - ) - if len(role_bindings.items) > 0: - raise Exception("Generated rbac id already exists") - - role_binding = V1ClusterRoleBinding( - metadata=V1ObjectMeta(name=name, labels=labels), - role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""), - subjects=[ - RbacV1Subject(kind="ServiceAccount", name=name, namespace=namespace) - ], - ) - self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding) - - async def create_role_binding( - self, - name: str, - role_name: str, - sa_name: str, - labels: Dict[str, str], - namespace: str, - ): - """ - Create a cluster role binding - - :param: name: Name of the namespaced Role Binding - :param: role_name: Name of the namespaced Role to be bound - :param: sa_name: Name of the Service Account to be bound - :param: labels: Labels for Role Binding metadata - :param: namespace: Kubernetes namespace for Role Binding metadata - - :return: None - """ - role_bindings = self.clients[RBAC_CLIENT].list_namespaced_role_binding( - namespace, field_selector="metadata.name={}".format(name) - ) - if len(role_bindings.items) > 0: - raise Exception( - "Role Binding with metadata.name={} already exists".format(name) - ) - - role_binding = V1RoleBinding( - metadata=V1ObjectMeta(name=name, labels=labels), - role_ref=V1RoleRef(kind="Role", name=role_name, api_group=""), - subjects=[ - RbacV1Subject(kind="ServiceAccount", name=sa_name, namespace=namespace) - ], - ) - self.clients[RBAC_CLIENT].create_namespaced_role_binding( - namespace, role_binding - ) - - def delete_cluster_role_binding(self, name: str): - """ - Delete a cluster role binding - - :param: name: Name of the cluster role binding - """ - self.clients[RBAC_CLIENT].delete_cluster_role_binding(name) - - @retry( - attempts=10, - delay=1, - fallback=Exception("Failed getting the secret from service account"), - callback=retry_callback, - ) - async def get_secret_data( - self, name: str, namespace: str = "kube-system" - ) -> (str, str): - """ - Get secret data - - :param: name: Name of the secret data - :param: namespace: Name of the namespace where the secret is stored - - :return: Tuple with the token and client certificate - """ - v1_core = self.clients[CORE_CLIENT] - - secret_name = None - - service_accounts = v1_core.list_namespaced_service_account( - namespace, field_selector="metadata.name={}".format(name) - ) - if len(service_accounts.items) == 0: - raise Exception( - "Service account not found with metadata.name={}".format(name) - ) - service_account = service_accounts.items[0] - if service_account.secrets and len(service_account.secrets) > 0: - secret_name = service_account.secrets[0].name - if not secret_name: - raise Exception( - "Failed getting the secret from service account {}".format(name) - ) - # TODO: refactor to use get_secret_content - secret = v1_core.list_namespaced_secret( - namespace, field_selector="metadata.name={}".format(secret_name) - ).items[0] - - token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY] - client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY] - - return ( - base64.b64decode(token).decode("utf-8"), - base64.b64decode(client_certificate_data).decode("utf-8"), - ) - - @retry( - attempts=10, - delay=1, - fallback=Exception("Failed getting data from the secret"), - ) - async def get_secret_content( - self, - name: str, - namespace: str, - ) -> dict: - """ - Get secret data - - :param: name: Name of the secret - :param: namespace: Name of the namespace where the secret is stored - - :return: Dictionary with secret's data - """ - v1_core = self.clients[CORE_CLIENT] - - secret = v1_core.read_namespaced_secret(name, namespace) - - return secret.data - - @retry( - attempts=10, - delay=1, - fallback=Exception("Failed creating the secret"), - ) - async def create_secret( - self, name: str, data: dict, namespace: str, secret_type: str - ): - """ - Create secret with data - - :param: name: Name of the secret - :param: data: Dict with data content. Values must be already base64 encoded - :param: namespace: Name of the namespace where the secret will be stored - :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls - - :return: None - """ - self.logger.info("Enter create_secret function") - v1_core = self.clients[CORE_CLIENT] - self.logger.info(f"v1_core: {v1_core}") - metadata = V1ObjectMeta(name=name, namespace=namespace) - self.logger.info(f"metadata: {metadata}") - secret = V1Secret(metadata=metadata, data=data, type=secret_type) - self.logger.info(f"secret: {secret}") - v1_core.create_namespaced_secret(namespace, secret) - self.logger.info("Namespaced secret was created") - - async def create_certificate( - self, - namespace: str, - name: str, - dns_prefix: str, - secret_name: str, - usages: list, - issuer_name: str, - ): - """ - Creates cert-manager certificate object - - :param: namespace: Name of the namespace where the certificate and secret is stored - :param: name: Name of the certificate object - :param: dns_prefix: Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes - :param: secret_name: Name of the secret created by cert-manager - :param: usages: List of X.509 key usages - :param: issuer_name: Name of the cert-manager's Issuer or ClusterIssuer object - - """ - certificate_body = { - "apiVersion": "cert-manager.io/v1", - "kind": "Certificate", - "metadata": {"name": name, "namespace": namespace}, - "spec": { - "secretName": secret_name, - "privateKey": { - "rotationPolicy": "Always", - "algorithm": "ECDSA", - "size": 256, - }, - "duration": "8760h", # 1 Year - "renewBefore": "2208h", # 9 months - "subject": {"organizations": ["osm"]}, - "commonName": "osm", - "isCA": False, - "usages": usages, - "dnsNames": [ - "{}.{}".format(dns_prefix, namespace), - "{}.{}.svc".format(dns_prefix, namespace), - "{}.{}.svc.cluster".format(dns_prefix, namespace), - "{}.{}.svc.cluster.local".format(dns_prefix, namespace), - ], - "issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"}, - }, - } - client = self.clients[CUSTOM_OBJECT_CLIENT] - try: - client.create_namespaced_custom_object( - group="cert-manager.io", - plural="certificates", - version="v1", - body=certificate_body, - namespace=namespace, - ) - except ApiException as e: - info = json.loads(e.body) - if info.get("reason").lower() == "alreadyexists": - self.logger.warning("Certificate already exists: {}".format(e)) - else: - raise e - - async def delete_certificate(self, namespace, object_name): - client = self.clients[CUSTOM_OBJECT_CLIENT] - try: - client.delete_namespaced_custom_object( - group="cert-manager.io", - plural="certificates", - version="v1", - name=object_name, - namespace=namespace, - ) - except ApiException as e: - info = json.loads(e.body) - if info.get("reason").lower() == "notfound": - self.logger.warning("Certificate already deleted: {}".format(e)) - else: - raise e - - @retry( - attempts=10, - delay=1, - fallback=Exception("Failed creating the namespace"), - ) - async def create_namespace(self, name: str, labels: dict = None): - """ - Create a namespace - - :param: name: Name of the namespace to be created - :param: labels: Dictionary with labels for the new namespace - - """ - v1_core = self.clients[CORE_CLIENT] - metadata = V1ObjectMeta(name=name, labels=labels) - namespace = V1Namespace( - metadata=metadata, - ) - - try: - v1_core.create_namespace(namespace) - self.logger.debug("Namespace created: {}".format(name)) - except ApiException as e: - info = json.loads(e.body) - if info.get("reason").lower() == "alreadyexists": - self.logger.warning("Namespace already exists: {}".format(e)) - else: - raise e - - @retry( - attempts=10, - delay=1, - fallback=Exception("Failed deleting the namespace"), - ) - async def delete_namespace(self, name: str): - """ - Delete a namespace - - :param: name: Name of the namespace to be deleted - - """ - try: - self.clients[CORE_CLIENT].delete_namespace(name) - except ApiException as e: - if e.reason == "Not Found": - self.logger.warning("Namespace already deleted: {}".format(e)) - - def get_secrets( - self, - namespace: str, - field_selector: str = None, - ) -> typing.List[typing.Dict]: - """ - Get Secret list from a namespace - - :param: namespace: Kubernetes namespace - :param: field_selector: Kubernetes field selector - - :return: List of the secrets matching the selectors specified - """ - try: - v1_core = self.clients[CORE_CLIENT] - secrets = v1_core.list_namespaced_secret( - namespace=namespace, - field_selector=field_selector, - ).items - return secrets - except ApiException as e: - self.logger.error("Error calling get secrets: {}".format(e)) - raise e - - def create_generic_object( - self, - api_group: str, - api_plural: str, - api_version: str, - namespace: str, - manifest_dict: dict, - ): - """ - Creates generic object - - :param: api_group: API Group - :param: api_plural: API Plural - :param: api_version: API Version - :param: namespace: Namespace - :param: manifest_dict: Dictionary with the content of the Kubernetes manifest - - """ - client = self.clients[CUSTOM_OBJECT_CLIENT] - try: - client.create_namespaced_custom_object( - group=api_group, - plural=api_plural, - version=api_version, - body=manifest_dict, - namespace=namespace, - ) - except ApiException as e: - info = json.loads(e.body) - if info.get("reason").lower() == "alreadyexists": - self.logger.warning("Object already exists: {}".format(e)) - else: - raise e - - def delete_generic_object( - self, - api_group: str, - api_plural: str, - api_version: str, - namespace: str, - name: str, - ): - """ - Deletes generic object - - :param: api_group: API Group - :param: api_plural: API Plural - :param: api_version: API Version - :param: namespace: Namespace - :param: name: Name of the object - - """ - client = self.clients[CUSTOM_OBJECT_CLIENT] - try: - client.delete_namespaced_custom_object( - group=api_group, - plural=api_plural, - version=api_version, - name=name, - namespace=namespace, - ) - except ApiException as e: - info = json.loads(e.body) - if info.get("reason").lower() == "notfound": - self.logger.warning("Object already deleted: {}".format(e)) - else: - raise e - - async def get_generic_object( - self, - api_group: str, - api_plural: str, - api_version: str, - namespace: str, - name: str, - ): - """ - Gets generic object - - :param: api_group: API Group - :param: api_plural: API Plural - :param: api_version: API Version - :param: namespace: Namespace - :param: name: Name of the object - - """ - client = self.clients[CUSTOM_OBJECT_CLIENT] - try: - object_dict = client.list_namespaced_custom_object( - group=api_group, - plural=api_plural, - version=api_version, - namespace=namespace, - field_selector=f"metadata.name={name}", - ) - if len(object_dict.get("items")) == 0: - return None - return object_dict.get("items")[0] - except ApiException as e: - info = json.loads(e.body) - if info.get("reason").lower() == "notfound": - self.logger.warning("Cannot get custom object: {}".format(e)) - else: - raise e - - async def list_generic_object( - self, - api_group: str, - api_plural: str, - api_version: str, - namespace: str, - ): - """ - Lists all generic objects of the requested API group - - :param: api_group: API Group - :param: api_plural: API Plural - :param: api_version: API Version - :param: namespace: Namespace - - """ - client = self.clients[CUSTOM_OBJECT_CLIENT] - try: - object_dict = client.list_namespaced_custom_object( - group=api_group, - plural=api_plural, - version=api_version, - namespace=namespace, - ) - self.logger.debug(f"Object-list: {object_dict.get('items')}") - return object_dict.get("items") - except ApiException as e: - info = json.loads(e.body) - if info.get("reason").lower() == "notfound": - self.logger.warning( - "Cannot retrieve list of custom objects: {}".format(e) - ) - else: - raise e - - @retry( - attempts=10, - delay=1, - fallback=Exception("Failed creating the secret"), - ) - async def create_secret_string( - self, name: str, string_data: str, namespace: str, secret_type: str - ): - """ - Create secret with data - - :param: name: Name of the secret - :param: string_data: String with data content - :param: namespace: Name of the namespace where the secret will be stored - :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls - - :return: None - """ - v1_core = self.clients[CORE_CLIENT] - metadata = V1ObjectMeta(name=name, namespace=namespace) - secret = V1Secret(metadata=metadata, string_data=string_data, type=secret_type) - v1_core.create_namespaced_secret(namespace, secret) diff --git a/osm_lcm/odu_workflows.py b/osm_lcm/odu_workflows.py index e46cdf7..bb3fce2 100644 --- a/osm_lcm/odu_workflows.py +++ b/osm_lcm/odu_workflows.py @@ -17,8 +17,7 @@ import logging from osm_lcm.lcm_utils import LcmBase -# from n2vc import kubectl -from osm_lcm.odu_libs import kubectl +from n2vc import kubectl class OduWorkflow(LcmBase):