Features 11020,11022-11026: Advanced cluster management 64/14464/14
authorgarciadeblas <gerardo.garciadeblas@telefonica.com>
Thu, 4 Jul 2024 09:00:13 +0000 (11:00 +0200)
committergarciadeblas <gerardo.garciadeblas@telefonica.com>
Tue, 13 Aug 2024 11:40:59 +0000 (13:40 +0200)
Change-Id: I15c910f146f31fb10b56dddae4027825441afdfc
Signed-off-by: garciadeblas <gerardo.garciadeblas@telefonica.com>
n2vc/kubectl.py

index 187ded6..27389cf 100644 (file)
@@ -1,4 +1,6 @@
+#######################################################################################
 # Copyright 2020 Canonical Ltd.
+# Copyright ETSI Contributors and Others.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -11,6 +13,7 @@
 #     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 #     See the License for the specific language governing permissions and
 #     limitations under the License.
+#######################################################################################
 
 import base64
 import logging
@@ -18,6 +21,8 @@ from typing import Dict
 import typing
 import uuid
 import json
+import tarfile
+import io
 
 from distutils.version import LooseVersion
 
@@ -36,12 +41,21 @@ from kubernetes.client.models import (
     V1Secret,
     V1SecretReference,
     V1Namespace,
+    V1PersistentVolumeClaim,
+    V1PersistentVolumeClaimSpec,
+    V1PersistentVolumeClaimVolumeSource,
+    V1ResourceRequirements,
+    V1Pod,
+    V1PodSpec,
+    V1Volume,
+    V1VolumeMount,
+    V1Container,
 )
 from kubernetes.client.rest import ApiException
+from kubernetes.stream import stream
 from n2vc.libjuju import retry_callback
 from retrying_async import retry
 
-
 SERVICE_ACCOUNT_TOKEN_KEY = "token"
 SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
 # clients
@@ -61,7 +75,7 @@ class Kubectl:
             CUSTOM_OBJECT_CLIENT: client.CustomObjectsApi(),
         }
         self._configuration = config.kube_config.Configuration.get_default_copy()
-        self.logger = logging.getLogger("Kubectl")
+        self.logger = logging.getLogger("lcm.kubectl")
 
     @property
     def configuration(self):
@@ -96,18 +110,20 @@ class Kubectl:
                     "name": i.metadata.name,
                     "cluster_ip": i.spec.cluster_ip,
                     "type": i.spec.type,
-                    "ports": [
-                        {
-                            "name": p.name,
-                            "node_port": p.node_port,
-                            "port": p.port,
-                            "protocol": p.protocol,
-                            "target_port": p.target_port,
-                        }
-                        for p in i.spec.ports
-                    ]
-                    if i.spec.ports
-                    else [],
+                    "ports": (
+                        [
+                            {
+                                "name": p.name,
+                                "node_port": p.node_port,
+                                "port": p.port,
+                                "protocol": p.protocol,
+                                "target_port": p.target_port,
+                            }
+                            for p in i.spec.ports
+                        ]
+                        if i.spec.ports
+                        else []
+                    ),
                     "external_ip": [i.ip for i in i.status.load_balancer.ingress]
                     if i.status.load_balancer.ingress
                     else None,
@@ -242,7 +258,10 @@ class Kubectl:
         return "{}-token-{}".format(service_account_name, random_alphanum)
 
     def _create_service_account_secret(
-        self, service_account_name: str, namespace: str, secret_name: str
+        self,
+        service_account_name: str,
+        namespace: str,
+        secret_name: str,
     ):
         """
         Create a secret for the service account. K8s version >= 1.24
@@ -315,6 +334,16 @@ class Kubectl:
             service_account = V1ServiceAccount(metadata=metadata)
             v1_core.create_namespaced_service_account(namespace, service_account)
 
+    def delete_secret(self, name: str, namespace: str = "kube-system"):
+        """
+        Delete a secret
+
+        :param: name:       Name of the secret
+        :param: namespace:  Kubernetes namespace
+                            Default: kube-system
+        """
+        self.clients[CORE_CLIENT].delete_namespaced_secret(name, namespace)
+
     def delete_service_account(self, name: str, namespace: str = "kube-system"):
         """
         Delete a service account
@@ -478,7 +507,7 @@ class Kubectl:
         self, name: str, data: dict, namespace: str, secret_type: str
     ):
         """
-        Get secret data
+        Create secret with data
 
         :param: name:        Name of the secret
         :param: data:        Dict with data content. Values must be already base64 encoded
@@ -487,10 +516,15 @@ class Kubectl:
 
         :return: None
         """
+        self.logger.info("Enter create_secret function")
         v1_core = self.clients[CORE_CLIENT]
+        self.logger.info(f"v1_core: {v1_core}")
         metadata = V1ObjectMeta(name=name, namespace=namespace)
+        self.logger.info(f"metadata: {metadata}")
         secret = V1Secret(metadata=metadata, data=data, type=secret_type)
+        self.logger.info(f"secret: {secret}")
         v1_core.create_namespaced_secret(namespace, secret)
+        self.logger.info("Namespaced secret was created")
 
     async def create_certificate(
         self,
@@ -617,3 +651,355 @@ class Kubectl:
         except ApiException as e:
             if e.reason == "Not Found":
                 self.logger.warning("Namespace already deleted: {}".format(e))
+
+    def get_secrets(
+        self,
+        namespace: str,
+        field_selector: str = None,
+    ) -> typing.List[typing.Dict]:
+        """
+        Get Secret list from a namespace
+
+        :param: namespace:  Kubernetes namespace
+        :param: field_selector:     Kubernetes field selector
+
+        :return: List of the secrets matching the selectors specified
+        """
+        try:
+            v1_core = self.clients[CORE_CLIENT]
+            secrets = v1_core.list_namespaced_secret(
+                namespace=namespace,
+                field_selector=field_selector,
+            ).items
+            return secrets
+        except ApiException as e:
+            self.logger.error("Error calling get secrets: {}".format(e))
+            raise e
+
+    async def create_generic_object(
+        self,
+        api_group: str,
+        api_plural: str,
+        api_version: str,
+        namespace: str,
+        manifest_dict: dict,
+    ):
+        """
+        Creates generic object
+
+        :param: api_group:       API Group
+        :param: api_plural:      API Plural
+        :param: api_version:     API Version
+        :param: namespace:       Namespace
+        :param: manifest_dict:   Dictionary with the content of the Kubernetes manifest
+
+        """
+        client = self.clients[CUSTOM_OBJECT_CLIENT]
+        try:
+            client.create_namespaced_custom_object(
+                group=api_group,
+                plural=api_plural,
+                version=api_version,
+                body=manifest_dict,
+                namespace=namespace,
+            )
+        except ApiException as e:
+            info = json.loads(e.body)
+            if info.get("reason").lower() == "alreadyexists":
+                self.logger.warning("Object already exists: {}".format(e))
+            else:
+                raise e
+
+    async def delete_generic_object(
+        self,
+        api_group: str,
+        api_plural: str,
+        api_version: str,
+        namespace: str,
+        name: str,
+    ):
+        """
+        Deletes generic object
+
+        :param: api_group:       API Group
+        :param: api_plural:      API Plural
+        :param: api_version:     API Version
+        :param: namespace:       Namespace
+        :param: name:            Name of the object
+
+        """
+        client = self.clients[CUSTOM_OBJECT_CLIENT]
+        try:
+            client.delete_namespaced_custom_object(
+                group=api_group,
+                plural=api_plural,
+                version=api_version,
+                name=name,
+                namespace=namespace,
+            )
+        except ApiException as e:
+            info = json.loads(e.body)
+            if info.get("reason").lower() == "notfound":
+                self.logger.warning("Object already deleted: {}".format(e))
+            else:
+                raise e
+
+    async def get_generic_object(
+        self,
+        api_group: str,
+        api_plural: str,
+        api_version: str,
+        namespace: str,
+        name: str,
+    ):
+        """
+        Gets generic object
+
+        :param: api_group:       API Group
+        :param: api_plural:      API Plural
+        :param: api_version:     API Version
+        :param: namespace:       Namespace
+        :param: name:            Name of the object
+
+        """
+        client = self.clients[CUSTOM_OBJECT_CLIENT]
+        try:
+            object_dict = client.list_namespaced_custom_object(
+                group=api_group,
+                plural=api_plural,
+                version=api_version,
+                namespace=namespace,
+                field_selector=f"metadata.name={name}",
+            )
+            if len(object_dict.get("items")) == 0:
+                return None
+            return object_dict.get("items")[0]
+        except ApiException as e:
+            info = json.loads(e.body)
+            if info.get("reason").lower() == "notfound":
+                self.logger.warning("Cannot get custom object: {}".format(e))
+            else:
+                raise e
+
+    async def list_generic_object(
+        self,
+        api_group: str,
+        api_plural: str,
+        api_version: str,
+        namespace: str,
+    ):
+        """
+        Lists all generic objects of the requested API group
+
+        :param: api_group:       API Group
+        :param: api_plural:      API Plural
+        :param: api_version:     API Version
+        :param: namespace:       Namespace
+
+        """
+        client = self.clients[CUSTOM_OBJECT_CLIENT]
+        try:
+            object_dict = client.list_namespaced_custom_object(
+                group=api_group,
+                plural=api_plural,
+                version=api_version,
+                namespace=namespace,
+            )
+            self.logger.debug(f"Object-list: {object_dict.get('items')}")
+            return object_dict.get("items")
+        except ApiException as e:
+            info = json.loads(e.body)
+            if info.get("reason").lower() == "notfound":
+                self.logger.warning(
+                    "Cannot retrieve list of custom objects: {}".format(e)
+                )
+            else:
+                raise e
+
+    @retry(
+        attempts=10,
+        delay=1,
+        fallback=Exception("Failed creating the secret"),
+    )
+    async def create_secret_string(
+        self, name: str, string_data: str, namespace: str, secret_type: str
+    ):
+        """
+        Create secret with data
+
+        :param: name:        Name of the secret
+        :param: string_data: String with data content
+        :param: namespace:   Name of the namespace where the secret will be stored
+        :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls
+
+        :return: None
+        """
+        v1_core = self.clients[CORE_CLIENT]
+        metadata = V1ObjectMeta(name=name, namespace=namespace)
+        secret = V1Secret(metadata=metadata, string_data=string_data, type=secret_type)
+        v1_core.create_namespaced_secret(namespace, secret)
+
+    @retry(
+        attempts=10,
+        delay=1,
+        fallback=Exception("Failed creating the pvc"),
+    )
+    async def create_pvc(self, name: str, namespace: str):
+        """
+        Create a namespace
+
+        :param: name:       Name of the pvc to be created
+        :param: namespace:  Name of the namespace where the pvc will be stored
+
+        """
+        try:
+            pvc = V1PersistentVolumeClaim(
+                api_version="v1",
+                kind="PersistentVolumeClaim",
+                metadata=V1ObjectMeta(name=name),
+                spec=V1PersistentVolumeClaimSpec(
+                    access_modes=["ReadWriteOnce"],
+                    resources=V1ResourceRequirements(requests={"storage": "100Mi"}),
+                ),
+            )
+            self.clients[CORE_CLIENT].create_namespaced_persistent_volume_claim(
+                namespace=namespace, body=pvc
+            )
+        except ApiException as e:
+            info = json.loads(e.body)
+            if info.get("reason").lower() == "alreadyexists":
+                self.logger.warning("PVC already exists: {}".format(e))
+            else:
+                raise e
+
+    @retry(
+        attempts=10,
+        delay=1,
+        fallback=Exception("Failed deleting the pvc"),
+    )
+    async def delete_pvc(self, name: str, namespace: str):
+        """
+        Create a namespace
+
+        :param: name:       Name of the pvc to be deleted
+        :param: namespace:  Namespace
+
+        """
+        self.clients[CORE_CLIENT].delete_namespaced_persistent_volume_claim(
+            name, namespace
+        )
+
+    def copy_file_to_pod(
+        self, namespace, pod_name, container_name, src_file, dest_path
+    ):
+        # Create an in-memory tar file containing the source file
+        tar_buffer = io.BytesIO()
+        with tarfile.open(fileobj=tar_buffer, mode="w") as tar:
+            tar.add(src_file, arcname=dest_path.split("/")[-1])
+
+        tar_buffer.seek(0)
+
+        # Define the command to extract the tar file in the pod
+        exec_command = ["tar", "xvf", "-", "-C", dest_path.rsplit("/", 1)[0]]
+
+        # Execute the command
+        resp = stream(
+            self.clients[CORE_CLIENT].connect_get_namespaced_pod_exec,
+            pod_name,
+            namespace,
+            command=exec_command,
+            container=container_name,
+            stdin=True,
+            stderr=True,
+            stdout=True,
+            tty=False,
+            _preload_content=False,
+        )
+
+        # Write the tar data to the pod
+        resp.write_stdin(tar_buffer.read())
+        resp.close()
+
+    @retry(
+        attempts=10,
+        delay=1,
+        fallback=Exception("Failed creating the pvc"),
+    )
+    async def create_pvc_with_content(
+        self, name: str, namespace: str, src_folder: str, filename: str
+    ):
+        """
+        Create a PVC with content
+
+        :param: name:       Name of the pvc to be created
+        :param: namespace:  Name of the namespace where the pvc will be stored
+        :param: src_folder: Folder where the file to be copied is located
+        :param: filename:   Name of the file to be copied
+        """
+        pod_name = f"copy-pod-{name}"
+        await self.create_pvc(name=name, namespace=namespace)
+        await self.create_copy_pod(name=pod_name, namespace=namespace, pvc_name=name)
+        self.copy_file_to_pod(
+            namespace=namespace,
+            pod_name=pod_name,
+            container_name="copy-container",
+            src_file=f"{src_folder}/{filename}",
+            dest_path=f"/mnt/data/{filename}",
+        )
+
+    @retry(
+        attempts=10,
+        delay=1,
+        fallback=Exception("Failed creating the pvc"),
+    )
+    async def create_copy_pod(self, name: str, namespace: str, pvc_name: str):
+        """
+        Create a pod to copy content into a PVC
+
+        :param: name:       Name of the pod to be created
+        :param: namespace:  Name of the namespace where the pod will be stored
+        :param: pvc_name:   Name of the PVC that the pod will mount as a volume
+
+        """
+        pod = V1Pod(
+            api_version="v1",
+            kind="Pod",
+            metadata=client.V1ObjectMeta(name=name),
+            spec=V1PodSpec(
+                containers=[
+                    V1Container(
+                        name="copy-container",
+                        image="busybox",  # Imagen ligera para copiar archivos
+                        command=["sleep", "3600"],  # Mantén el contenedor en ejecución
+                        volume_mounts=[
+                            V1VolumeMount(mount_path="/mnt/data", name="my-storage")
+                        ],
+                    )
+                ],
+                volumes=[
+                    V1Volume(
+                        name="my-storage",
+                        persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
+                            claim_name=pvc_name
+                        ),
+                    )
+                ],
+            ),
+        )
+        # Create the pod
+        self.clients[CORE_CLIENT].create_namespaced_pod(namespace=namespace, body=pod)
+
+    @retry(
+        attempts=10,
+        delay=1,
+        fallback=Exception("Failed deleting the pod"),
+    )
+    async def delete_pod(self, name: str, namespace: str):
+        """
+        Create a namespace
+
+        :param: name:       Name of the pod to be deleted
+        :param: namespace:  Namespace
+
+        """
+        self.clients[CORE_CLIENT].delete_namespaced_pod(name, namespace)