Fix retrieval of services from K8s manifests of a helm-based KDU 86/15286/1
authorgarciadeblas <gerardo.garciadeblas@telefonica.com>
Thu, 17 Jul 2025 15:00:12 +0000 (17:00 +0200)
committergarciadeblas <gerardo.garciadeblas@telefonica.com>
Thu, 17 Jul 2025 16:10:45 +0000 (18:10 +0200)
Change-Id: I31ca19f3484df4a74d5c06541f7409c57a7588d4
Signed-off-by: garciadeblas <gerardo.garciadeblas@telefonica.com>
osm_lcm/n2vc/k8s_helm3_conn.py
osm_lcm/n2vc/k8s_helm_base_conn.py
osm_lcm/n2vc/tests/unit/test_k8s_helm3_conn.py

index 98f4beb..29e4349 100644 (file)
@@ -338,14 +338,11 @@ class K8sHelm3Connector(K8sHelmBaseConnector):
             cluster_name=cluster_id, create_if_not_exist=True
         )
 
-        command1 = "env KUBECONFIG={} {} get manifest {} --namespace={}".format(
+        command = "env KUBECONFIG={} {} get manifest {} --namespace={}".format(
             kubeconfig, self._helm_command, quote(kdu_instance), quote(namespace)
         )
-        command2 = "{} get --namespace={} -f -".format(
-            self.kubectl_command, quote(namespace)
-        )
-        output, _rc = await self._local_async_exec_pipe(
-            command1, command2, env=env, raise_exception_on_error=True
+        output, _rc = await self._local_async_exec(
+            command, env=env, raise_exception_on_error=True
         )
         services = self._parse_services(output)
 
index d069046..b99ce73 100644 (file)
@@ -29,9 +29,9 @@ import shlex
 import shutil
 import stat
 import os
-import yaml
 from uuid import uuid4
 from urllib.parse import urlparse
+import yaml
 
 from osm_lcm.n2vc.config import EnvironConfig
 from osm_lcm.n2vc.exceptions import K8sException
@@ -1546,18 +1546,40 @@ class K8sHelmBaseConnector(K8sConnector):
                     line_list.append(cell)
         return output_table
 
-    @staticmethod
-    def _parse_services(output: str) -> list:
-        lines = output.splitlines(keepends=False)
-        services = []
-        for line in lines:
-            line = line.replace("\t", " ")
-            cells = line.split(sep=" ")
-            if len(cells) > 0 and cells[0].startswith("service/"):
-                elems = cells[0].split(sep="/")
-                if len(elems) > 1:
-                    services.append(elems[1])
-        return services
+    def _parse_services(self, yaml_input: str) -> list:
+        """
+        Parses the output of a command to extract service names.
+        """
+
+        def get_manifest_services(manifest: dict) -> list:
+            """
+            Extracts service names from a manifest dictionary.
+            """
+            manifest_services = []
+            if "kind" in manifest and manifest["kind"] == "Service":
+                if "metadata" in manifest and "name" in manifest["metadata"]:
+                    manifest_services.append(manifest["metadata"]["name"])
+            return manifest_services
+
+        service_list = []
+        self.log.debug("Parsing YAML manifests to obtain list of services...")
+        manifest_generator = yaml.safe_load_all(yaml_input)
+        i = 1
+        while True:
+            try:
+                manifest = next(manifest_generator)
+            except StopIteration:
+                break
+            except yaml.YAMLError as e:
+                self.log.error("Skipping manifest %d due to YAML error: %s", i, e)
+                i += 1
+                continue
+            if not manifest:
+                continue
+            manifest_services = get_manifest_services(manifest)
+            if manifest_services:
+                service_list.extend(manifest_services)
+        return service_list
 
     @staticmethod
     def _get_deep(dictionary: dict, members: tuple):
@@ -1674,85 +1696,6 @@ class K8sHelmBaseConnector(K8sConnector):
             else:
                 return "", -1
 
-    async def _local_async_exec_pipe(
-        self,
-        command1: str,
-        command2: str,
-        raise_exception_on_error: bool = True,
-        show_error_log: bool = True,
-        encode_utf8: bool = False,
-        env: dict = None,
-    ):
-        command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
-        command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
-        command = "{} | {}".format(command1, command2)
-        self.log.debug(
-            "Executing async local command: {}, env: {}".format(command, env)
-        )
-
-        # split command
-        command1 = shlex.split(command1)
-        command2 = shlex.split(command2)
-
-        environ = os.environ.copy()
-        if env:
-            environ.update(env)
-
-        process_1 = None
-        try:
-            async with self.cmd_lock:
-                read, write = os.pipe()
-                process_1 = await asyncio.create_subprocess_exec(
-                    *command1, stdout=write, env=environ
-                )
-                os.close(write)
-                process_2 = await asyncio.create_subprocess_exec(
-                    *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
-                )
-                os.close(read)
-                stdout, stderr = await process_2.communicate()
-
-                return_code = process_2.returncode
-
-            output = ""
-            if stdout:
-                output = stdout.decode("utf-8").strip()
-                # output = stdout.decode()
-            if stderr:
-                output = stderr.decode("utf-8").strip()
-                # output = stderr.decode()
-
-            if return_code != 0 and show_error_log:
-                self.log.debug(
-                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
-                )
-            else:
-                self.log.debug("Return code: {}".format(return_code))
-
-            if raise_exception_on_error and return_code != 0:
-                raise K8sException(output)
-
-            if encode_utf8:
-                output = output.encode("utf-8").strip()
-                output = str(output).replace("\\n", "\n")
-
-            return output, return_code
-        except asyncio.CancelledError:
-            # first, kill the processes if they are still running
-            for process in (process_1, process_2):
-                if process.returncode is None:
-                    process.kill()
-            raise
-        except K8sException:
-            raise
-        except Exception as e:
-            msg = "Exception executing command: {} -> {}".format(command, e)
-            self.log.error(msg)
-            if raise_exception_on_error:
-                raise K8sException(e) from e
-            else:
-                return "", -1
-
     async def _get_service(self, cluster_id, service_name, namespace):
         """
         Obtains the data of the specified service in the k8cluster.
index 3ff9197..358bb43 100644 (file)
@@ -542,9 +542,7 @@ class TestK8sHelm3Conn(asynctest.TestCase):
     async def test_get_services(self):
         kdu_instance = "test_services_1"
         service = {"name": "testservice", "type": "LoadBalancer"}
-        self.helm_conn._local_async_exec_pipe = asynctest.CoroutineMock(
-            return_value=("", 0)
-        )
+        self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=(0, ""))
         self.helm_conn._parse_services = Mock(return_value=["testservice"])
         self.helm_conn._get_service = asynctest.CoroutineMock(return_value=service)
 
@@ -556,12 +554,11 @@ class TestK8sHelm3Conn(asynctest.TestCase):
             from_path=self.cluster_id
         )
         self.helm_conn._parse_services.assert_called_once()
-        command1 = (
+        command = (
             "env KUBECONFIG=./tmp/helm3_cluster_id/.kube/config /usr/bin/helm3 get manifest {} --namespace=testk8s"
         ).format(kdu_instance)
-        command2 = "/usr/bin/kubectl get --namespace={} -f -".format(self.namespace)
-        self.helm_conn._local_async_exec_pipe.assert_called_once_with(
-            command1, command2, env=self.env, raise_exception_on_error=True
+        self.helm_conn._local_async_exec.assert_called_once_with(
+            command, env=self.env, raise_exception_on_error=True
         )
         self.assertEqual(
             services, [service], "Invalid service returned from get_service"