Fix retrieval of services from K8s manifests of a helm-based KDU 87/15287/1 v14.0
authorgarciadeblas <gerardo.garciadeblas@telefonica.com>
Thu, 17 Jul 2025 16:26:11 +0000 (18:26 +0200)
committergarciadeblas <gerardo.garciadeblas@telefonica.com>
Thu, 17 Jul 2025 16:26:11 +0000 (18:26 +0200)
Change-Id: I001e6e64f4c5c0b1533828b27986908c5dba455b
Signed-off-by: garciadeblas <gerardo.garciadeblas@telefonica.com>
n2vc/k8s_helm3_conn.py
n2vc/k8s_helm_base_conn.py
n2vc/k8s_helm_conn.py
n2vc/tests/unit/test_k8s_helm3_conn.py
n2vc/tests/unit/test_k8s_helm_conn.py

index 1ddcf41..68cf067 100644 (file)
@@ -297,14 +297,11 @@ class K8sHelm3Connector(K8sHelmBaseConnector):
             cluster_name=cluster_id, create_if_not_exist=True
         )
 
-        command1 = "env KUBECONFIG={} {} get manifest {} --namespace={}".format(
+        command = "env KUBECONFIG={} {} get manifest {} --namespace={}".format(
             kubeconfig, self._helm_command, quote(kdu_instance), quote(namespace)
         )
-        command2 = "{} get --namespace={} -f -".format(
-            self.kubectl_command, quote(namespace)
-        )
-        output, _rc = await self._local_async_exec_pipe(
-            command1, command2, env=env, raise_exception_on_error=True
+        output, _rc = await self._local_async_exec(
+            command, env=env, raise_exception_on_error=True
         )
         services = self._parse_services(output)
 
index 14ad945..506b8de 100644 (file)
@@ -29,8 +29,8 @@ import shlex
 import shutil
 import stat
 import os
-import yaml
 from uuid import uuid4
+import yaml
 
 from n2vc.config import EnvironConfig
 from n2vc.exceptions import K8sException
@@ -1489,18 +1489,40 @@ class K8sHelmBaseConnector(K8sConnector):
                     line_list.append(cell)
         return output_table
 
-    @staticmethod
-    def _parse_services(output: str) -> list:
-        lines = output.splitlines(keepends=False)
-        services = []
-        for line in lines:
-            line = line.replace("\t", " ")
-            cells = line.split(sep=" ")
-            if len(cells) > 0 and cells[0].startswith("service/"):
-                elems = cells[0].split(sep="/")
-                if len(elems) > 1:
-                    services.append(elems[1])
-        return services
+    def _parse_services(self, yaml_input: str) -> list:
+        """
+        Parses the output of a set of YAML K8s manifests to extract service names.
+        """
+
+        def get_manifest_services(manifest: dict) -> list:
+            """
+            Extracts service names from a manifest dictionary.
+            """
+            manifest_services = []
+            if "kind" in manifest and manifest["kind"] == "Service":
+                if "metadata" in manifest and "name" in manifest["metadata"]:
+                    manifest_services.append(manifest["metadata"]["name"])
+            return manifest_services
+
+        service_list = []
+        self.log.debug("Parsing YAML manifests to obtain list of services...")
+        manifest_generator = yaml.safe_load_all(yaml_input)
+        i = 1
+        while True:
+            try:
+                manifest = next(manifest_generator)
+            except StopIteration:
+                break
+            except yaml.YAMLError as e:
+                self.log.error("Skipping manifest %d due to YAML error: %s", i, e)
+                i += 1
+                continue
+            if not manifest:
+                continue
+            manifest_services = get_manifest_services(manifest)
+            if manifest_services:
+                service_list.extend(manifest_services)
+        return service_list
 
     @staticmethod
     def _get_deep(dictionary: dict, members: tuple):
@@ -1617,84 +1639,6 @@ class K8sHelmBaseConnector(K8sConnector):
             else:
                 return "", -1
 
-    async def _local_async_exec_pipe(
-        self,
-        command1: str,
-        command2: str,
-        raise_exception_on_error: bool = True,
-        show_error_log: bool = True,
-        encode_utf8: bool = False,
-        env: dict = None,
-    ):
-        command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
-        command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
-        command = "{} | {}".format(command1, command2)
-        self.log.debug(
-            "Executing async local command: {}, env: {}".format(command, env)
-        )
-
-        # split command
-        command1 = shlex.split(command1)
-        command2 = shlex.split(command2)
-
-        environ = os.environ.copy()
-        if env:
-            environ.update(env)
-
-        try:
-            async with self.cmd_lock:
-                read, write = os.pipe()
-                process_1 = await asyncio.create_subprocess_exec(
-                    *command1, stdout=write, env=environ
-                )
-                os.close(write)
-                process_2 = await asyncio.create_subprocess_exec(
-                    *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
-                )
-                os.close(read)
-                stdout, stderr = await process_2.communicate()
-
-                return_code = process_2.returncode
-
-            output = ""
-            if stdout:
-                output = stdout.decode("utf-8").strip()
-                # output = stdout.decode()
-            if stderr:
-                output = stderr.decode("utf-8").strip()
-                # output = stderr.decode()
-
-            if return_code != 0 and show_error_log:
-                self.log.debug(
-                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
-                )
-            else:
-                self.log.debug("Return code: {}".format(return_code))
-
-            if raise_exception_on_error and return_code != 0:
-                raise K8sException(output)
-
-            if encode_utf8:
-                output = output.encode("utf-8").strip()
-                output = str(output).replace("\\n", "\n")
-
-            return output, return_code
-        except asyncio.CancelledError:
-            # first, kill the processes if they are still running
-            for process in (process_1, process_2):
-                if process.returncode is None:
-                    process.kill()
-            raise
-        except K8sException:
-            raise
-        except Exception as e:
-            msg = "Exception executing command: {} -> {}".format(command, e)
-            self.log.error(msg)
-            if raise_exception_on_error:
-                raise K8sException(e) from e
-            else:
-                return "", -1
-
     async def _get_service(self, cluster_id, service_name, namespace):
         """
         Obtains the data of the specified service in the k8cluster.
index 5293c65..4e5f49e 100644 (file)
@@ -237,14 +237,11 @@ class K8sHelmConnector(K8sHelmBaseConnector):
             cluster_name=cluster_id, create_if_not_exist=True
         )
 
-        command1 = "env KUBECONFIG={} {} get manifest {} ".format(
+        command = "env KUBECONFIG={} {} get manifest {} ".format(
             kubeconfig, self._helm_command, quote(kdu_instance)
         )
-        command2 = "{} get --namespace={} -f -".format(
-            self.kubectl_command, quote(namespace)
-        )
-        output, _rc = await self._local_async_exec_pipe(
-            command1, command2, env=env, raise_exception_on_error=True
+        output, _rc = await self._local_async_exec(
+            command, env=env, raise_exception_on_error=True
         )
         services = self._parse_services(output)
 
index a2e75e1..6ebf46e 100644 (file)
@@ -538,9 +538,7 @@ class TestK8sHelm3Conn(asynctest.TestCase):
     async def test_get_services(self):
         kdu_instance = "test_services_1"
         service = {"name": "testservice", "type": "LoadBalancer"}
-        self.helm_conn._local_async_exec_pipe = asynctest.CoroutineMock(
-            return_value=("", 0)
-        )
+        self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
         self.helm_conn._parse_services = Mock(return_value=["testservice"])
         self.helm_conn._get_service = asynctest.CoroutineMock(return_value=service)
 
@@ -552,12 +550,11 @@ class TestK8sHelm3Conn(asynctest.TestCase):
             from_path=self.cluster_id
         )
         self.helm_conn._parse_services.assert_called_once()
-        command1 = (
+        command = (
             "env KUBECONFIG=./tmp/helm3_cluster_id/.kube/config /usr/bin/helm3 get manifest {} --namespace=testk8s"
         ).format(kdu_instance)
-        command2 = "/usr/bin/kubectl get --namespace={} -f -".format(self.namespace)
-        self.helm_conn._local_async_exec_pipe.assert_called_once_with(
-            command1, command2, env=self.env, raise_exception_on_error=True
+        self.helm_conn._local_async_exec.assert_called_once_with(
+            command, env=self.env, raise_exception_on_error=True
         )
         self.assertEqual(
             services, [service], "Invalid service returned from get_service"
index 161471a..22462c6 100644 (file)
@@ -472,9 +472,7 @@ class TestK8sHelmConn(asynctest.TestCase):
     async def test_get_services(self):
         kdu_instance = "test_services_1"
         service = {"name": "testservice", "type": "LoadBalancer"}
-        self.helm_conn._local_async_exec_pipe = asynctest.CoroutineMock(
-            return_value=("", 0)
-        )
+        self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
         self.helm_conn._parse_services = Mock(return_value=["testservice"])
         self.helm_conn._get_service = asynctest.CoroutineMock(return_value=service)
 
@@ -486,12 +484,11 @@ class TestK8sHelmConn(asynctest.TestCase):
             from_path=self.cluster_id
         )
         self.helm_conn._parse_services.assert_called_once()
-        command1 = "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm get manifest {} ".format(
+        command = "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm get manifest {} ".format(
             kdu_instance
         )
-        command2 = "/usr/bin/kubectl get --namespace={} -f -".format(self.namespace)
-        self.helm_conn._local_async_exec_pipe.assert_called_once_with(
-            command1, command2, env=self.env, raise_exception_on_error=True
+        self.helm_conn._local_async_exec.assert_called_once_with(
+            command, env=self.env, raise_exception_on_error=True
         )
         self.assertEqual(
             services, [service], "Invalid service returned from get_service"