From c057eb3d407518373fb40c47a2fecb0452165f87 Mon Sep 17 00:00:00 2001 From: garciadeblas Date: Thu, 4 Jul 2024 11:00:13 +0200 Subject: [PATCH] Features 11020,11022-11026: Advanced cluster management Change-Id: I15c910f146f31fb10b56dddae4027825441afdfc Signed-off-by: garciadeblas --- n2vc/kubectl.py | 418 ++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 402 insertions(+), 16 deletions(-) diff --git a/n2vc/kubectl.py b/n2vc/kubectl.py index 187ded6..27389cf 100644 --- a/n2vc/kubectl.py +++ b/n2vc/kubectl.py @@ -1,4 +1,6 @@ +####################################################################################### # Copyright 2020 Canonical Ltd. +# Copyright ETSI Contributors and Others. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,6 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +####################################################################################### import base64 import logging @@ -18,6 +21,8 @@ from typing import Dict import typing import uuid import json +import tarfile +import io from distutils.version import LooseVersion @@ -36,12 +41,21 @@ from kubernetes.client.models import ( V1Secret, V1SecretReference, V1Namespace, + V1PersistentVolumeClaim, + V1PersistentVolumeClaimSpec, + V1PersistentVolumeClaimVolumeSource, + V1ResourceRequirements, + V1Pod, + V1PodSpec, + V1Volume, + V1VolumeMount, + V1Container, ) from kubernetes.client.rest import ApiException +from kubernetes.stream import stream from n2vc.libjuju import retry_callback from retrying_async import retry - SERVICE_ACCOUNT_TOKEN_KEY = "token" SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt" # clients @@ -61,7 +75,7 @@ class Kubectl: CUSTOM_OBJECT_CLIENT: client.CustomObjectsApi(), } self._configuration = config.kube_config.Configuration.get_default_copy() - self.logger = logging.getLogger("Kubectl") + self.logger = logging.getLogger("lcm.kubectl") @property def configuration(self): @@ -96,18 +110,20 @@ class Kubectl: "name": i.metadata.name, "cluster_ip": i.spec.cluster_ip, "type": i.spec.type, - "ports": [ - { - "name": p.name, - "node_port": p.node_port, - "port": p.port, - "protocol": p.protocol, - "target_port": p.target_port, - } - for p in i.spec.ports - ] - if i.spec.ports - else [], + "ports": ( + [ + { + "name": p.name, + "node_port": p.node_port, + "port": p.port, + "protocol": p.protocol, + "target_port": p.target_port, + } + for p in i.spec.ports + ] + if i.spec.ports + else [] + ), "external_ip": [i.ip for i in i.status.load_balancer.ingress] if i.status.load_balancer.ingress else None, @@ -242,7 +258,10 @@ class Kubectl: return "{}-token-{}".format(service_account_name, random_alphanum) def _create_service_account_secret( - self, service_account_name: str, namespace: str, secret_name: str + self, + service_account_name: str, + namespace: str, + secret_name: str, ): """ Create a secret for the service account. K8s version >= 1.24 @@ -315,6 +334,16 @@ class Kubectl: service_account = V1ServiceAccount(metadata=metadata) v1_core.create_namespaced_service_account(namespace, service_account) + def delete_secret(self, name: str, namespace: str = "kube-system"): + """ + Delete a secret + + :param: name: Name of the secret + :param: namespace: Kubernetes namespace + Default: kube-system + """ + self.clients[CORE_CLIENT].delete_namespaced_secret(name, namespace) + def delete_service_account(self, name: str, namespace: str = "kube-system"): """ Delete a service account @@ -478,7 +507,7 @@ class Kubectl: self, name: str, data: dict, namespace: str, secret_type: str ): """ - Get secret data + Create secret with data :param: name: Name of the secret :param: data: Dict with data content. Values must be already base64 encoded @@ -487,10 +516,15 @@ class Kubectl: :return: None """ + self.logger.info("Enter create_secret function") v1_core = self.clients[CORE_CLIENT] + self.logger.info(f"v1_core: {v1_core}") metadata = V1ObjectMeta(name=name, namespace=namespace) + self.logger.info(f"metadata: {metadata}") secret = V1Secret(metadata=metadata, data=data, type=secret_type) + self.logger.info(f"secret: {secret}") v1_core.create_namespaced_secret(namespace, secret) + self.logger.info("Namespaced secret was created") async def create_certificate( self, @@ -617,3 +651,355 @@ class Kubectl: except ApiException as e: if e.reason == "Not Found": self.logger.warning("Namespace already deleted: {}".format(e)) + + def get_secrets( + self, + namespace: str, + field_selector: str = None, + ) -> typing.List[typing.Dict]: + """ + Get Secret list from a namespace + + :param: namespace: Kubernetes namespace + :param: field_selector: Kubernetes field selector + + :return: List of the secrets matching the selectors specified + """ + try: + v1_core = self.clients[CORE_CLIENT] + secrets = v1_core.list_namespaced_secret( + namespace=namespace, + field_selector=field_selector, + ).items + return secrets + except ApiException as e: + self.logger.error("Error calling get secrets: {}".format(e)) + raise e + + async def create_generic_object( + self, + api_group: str, + api_plural: str, + api_version: str, + namespace: str, + manifest_dict: dict, + ): + """ + Creates generic object + + :param: api_group: API Group + :param: api_plural: API Plural + :param: api_version: API Version + :param: namespace: Namespace + :param: manifest_dict: Dictionary with the content of the Kubernetes manifest + + """ + client = self.clients[CUSTOM_OBJECT_CLIENT] + try: + client.create_namespaced_custom_object( + group=api_group, + plural=api_plural, + version=api_version, + body=manifest_dict, + namespace=namespace, + ) + except ApiException as e: + info = json.loads(e.body) + if info.get("reason").lower() == "alreadyexists": + self.logger.warning("Object already exists: {}".format(e)) + else: + raise e + + async def delete_generic_object( + self, + api_group: str, + api_plural: str, + api_version: str, + namespace: str, + name: str, + ): + """ + Deletes generic object + + :param: api_group: API Group + :param: api_plural: API Plural + :param: api_version: API Version + :param: namespace: Namespace + :param: name: Name of the object + + """ + client = self.clients[CUSTOM_OBJECT_CLIENT] + try: + client.delete_namespaced_custom_object( + group=api_group, + plural=api_plural, + version=api_version, + name=name, + namespace=namespace, + ) + except ApiException as e: + info = json.loads(e.body) + if info.get("reason").lower() == "notfound": + self.logger.warning("Object already deleted: {}".format(e)) + else: + raise e + + async def get_generic_object( + self, + api_group: str, + api_plural: str, + api_version: str, + namespace: str, + name: str, + ): + """ + Gets generic object + + :param: api_group: API Group + :param: api_plural: API Plural + :param: api_version: API Version + :param: namespace: Namespace + :param: name: Name of the object + + """ + client = self.clients[CUSTOM_OBJECT_CLIENT] + try: + object_dict = client.list_namespaced_custom_object( + group=api_group, + plural=api_plural, + version=api_version, + namespace=namespace, + field_selector=f"metadata.name={name}", + ) + if len(object_dict.get("items")) == 0: + return None + return object_dict.get("items")[0] + except ApiException as e: + info = json.loads(e.body) + if info.get("reason").lower() == "notfound": + self.logger.warning("Cannot get custom object: {}".format(e)) + else: + raise e + + async def list_generic_object( + self, + api_group: str, + api_plural: str, + api_version: str, + namespace: str, + ): + """ + Lists all generic objects of the requested API group + + :param: api_group: API Group + :param: api_plural: API Plural + :param: api_version: API Version + :param: namespace: Namespace + + """ + client = self.clients[CUSTOM_OBJECT_CLIENT] + try: + object_dict = client.list_namespaced_custom_object( + group=api_group, + plural=api_plural, + version=api_version, + namespace=namespace, + ) + self.logger.debug(f"Object-list: {object_dict.get('items')}") + return object_dict.get("items") + except ApiException as e: + info = json.loads(e.body) + if info.get("reason").lower() == "notfound": + self.logger.warning( + "Cannot retrieve list of custom objects: {}".format(e) + ) + else: + raise e + + @retry( + attempts=10, + delay=1, + fallback=Exception("Failed creating the secret"), + ) + async def create_secret_string( + self, name: str, string_data: str, namespace: str, secret_type: str + ): + """ + Create secret with data + + :param: name: Name of the secret + :param: string_data: String with data content + :param: namespace: Name of the namespace where the secret will be stored + :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls + + :return: None + """ + v1_core = self.clients[CORE_CLIENT] + metadata = V1ObjectMeta(name=name, namespace=namespace) + secret = V1Secret(metadata=metadata, string_data=string_data, type=secret_type) + v1_core.create_namespaced_secret(namespace, secret) + + @retry( + attempts=10, + delay=1, + fallback=Exception("Failed creating the pvc"), + ) + async def create_pvc(self, name: str, namespace: str): + """ + Create a namespace + + :param: name: Name of the pvc to be created + :param: namespace: Name of the namespace where the pvc will be stored + + """ + try: + pvc = V1PersistentVolumeClaim( + api_version="v1", + kind="PersistentVolumeClaim", + metadata=V1ObjectMeta(name=name), + spec=V1PersistentVolumeClaimSpec( + access_modes=["ReadWriteOnce"], + resources=V1ResourceRequirements(requests={"storage": "100Mi"}), + ), + ) + self.clients[CORE_CLIENT].create_namespaced_persistent_volume_claim( + namespace=namespace, body=pvc + ) + except ApiException as e: + info = json.loads(e.body) + if info.get("reason").lower() == "alreadyexists": + self.logger.warning("PVC already exists: {}".format(e)) + else: + raise e + + @retry( + attempts=10, + delay=1, + fallback=Exception("Failed deleting the pvc"), + ) + async def delete_pvc(self, name: str, namespace: str): + """ + Create a namespace + + :param: name: Name of the pvc to be deleted + :param: namespace: Namespace + + """ + self.clients[CORE_CLIENT].delete_namespaced_persistent_volume_claim( + name, namespace + ) + + def copy_file_to_pod( + self, namespace, pod_name, container_name, src_file, dest_path + ): + # Create an in-memory tar file containing the source file + tar_buffer = io.BytesIO() + with tarfile.open(fileobj=tar_buffer, mode="w") as tar: + tar.add(src_file, arcname=dest_path.split("/")[-1]) + + tar_buffer.seek(0) + + # Define the command to extract the tar file in the pod + exec_command = ["tar", "xvf", "-", "-C", dest_path.rsplit("/", 1)[0]] + + # Execute the command + resp = stream( + self.clients[CORE_CLIENT].connect_get_namespaced_pod_exec, + pod_name, + namespace, + command=exec_command, + container=container_name, + stdin=True, + stderr=True, + stdout=True, + tty=False, + _preload_content=False, + ) + + # Write the tar data to the pod + resp.write_stdin(tar_buffer.read()) + resp.close() + + @retry( + attempts=10, + delay=1, + fallback=Exception("Failed creating the pvc"), + ) + async def create_pvc_with_content( + self, name: str, namespace: str, src_folder: str, filename: str + ): + """ + Create a PVC with content + + :param: name: Name of the pvc to be created + :param: namespace: Name of the namespace where the pvc will be stored + :param: src_folder: Folder where the file to be copied is located + :param: filename: Name of the file to be copied + """ + pod_name = f"copy-pod-{name}" + await self.create_pvc(name=name, namespace=namespace) + await self.create_copy_pod(name=pod_name, namespace=namespace, pvc_name=name) + self.copy_file_to_pod( + namespace=namespace, + pod_name=pod_name, + container_name="copy-container", + src_file=f"{src_folder}/{filename}", + dest_path=f"/mnt/data/{filename}", + ) + + @retry( + attempts=10, + delay=1, + fallback=Exception("Failed creating the pvc"), + ) + async def create_copy_pod(self, name: str, namespace: str, pvc_name: str): + """ + Create a pod to copy content into a PVC + + :param: name: Name of the pod to be created + :param: namespace: Name of the namespace where the pod will be stored + :param: pvc_name: Name of the PVC that the pod will mount as a volume + + """ + pod = V1Pod( + api_version="v1", + kind="Pod", + metadata=client.V1ObjectMeta(name=name), + spec=V1PodSpec( + containers=[ + V1Container( + name="copy-container", + image="busybox", # Imagen ligera para copiar archivos + command=["sleep", "3600"], # Mantén el contenedor en ejecución + volume_mounts=[ + V1VolumeMount(mount_path="/mnt/data", name="my-storage") + ], + ) + ], + volumes=[ + V1Volume( + name="my-storage", + persistent_volume_claim=V1PersistentVolumeClaimVolumeSource( + claim_name=pvc_name + ), + ) + ], + ), + ) + # Create the pod + self.clients[CORE_CLIENT].create_namespaced_pod(namespace=namespace, body=pod) + + @retry( + attempts=10, + delay=1, + fallback=Exception("Failed deleting the pod"), + ) + async def delete_pod(self, name: str, namespace: str): + """ + Create a namespace + + :param: name: Name of the pod to be deleted + :param: namespace: Namespace + + """ + self.clients[CORE_CLIENT].delete_namespaced_pod(name, namespace) -- 2.25.1