From d367232b4cb0620cb82a1c23de28db6aaaf6b57e Mon Sep 17 00:00:00 2001 From: garciadeblas Date: Thu, 17 Jul 2025 17:00:12 +0200 Subject: [PATCH] Fix retrieval of services from K8s manifests of a helm-based KDU Change-Id: I31ca19f3484df4a74d5c06541f7409c57a7588d4 Signed-off-by: garciadeblas --- osm_lcm/n2vc/k8s_helm3_conn.py | 9 +- osm_lcm/n2vc/k8s_helm_base_conn.py | 127 +++++------------- .../n2vc/tests/unit/test_k8s_helm3_conn.py | 11 +- 3 files changed, 42 insertions(+), 105 deletions(-) diff --git a/osm_lcm/n2vc/k8s_helm3_conn.py b/osm_lcm/n2vc/k8s_helm3_conn.py index 98f4bebb..29e43490 100644 --- a/osm_lcm/n2vc/k8s_helm3_conn.py +++ b/osm_lcm/n2vc/k8s_helm3_conn.py @@ -338,14 +338,11 @@ class K8sHelm3Connector(K8sHelmBaseConnector): cluster_name=cluster_id, create_if_not_exist=True ) - command1 = "env KUBECONFIG={} {} get manifest {} --namespace={}".format( + command = "env KUBECONFIG={} {} get manifest {} --namespace={}".format( kubeconfig, self._helm_command, quote(kdu_instance), quote(namespace) ) - command2 = "{} get --namespace={} -f -".format( - self.kubectl_command, quote(namespace) - ) - output, _rc = await self._local_async_exec_pipe( - command1, command2, env=env, raise_exception_on_error=True + output, _rc = await self._local_async_exec( + command, env=env, raise_exception_on_error=True ) services = self._parse_services(output) diff --git a/osm_lcm/n2vc/k8s_helm_base_conn.py b/osm_lcm/n2vc/k8s_helm_base_conn.py index d0690464..b99ce738 100644 --- a/osm_lcm/n2vc/k8s_helm_base_conn.py +++ b/osm_lcm/n2vc/k8s_helm_base_conn.py @@ -29,9 +29,9 @@ import shlex import shutil import stat import os -import yaml from uuid import uuid4 from urllib.parse import urlparse +import yaml from osm_lcm.n2vc.config import EnvironConfig from osm_lcm.n2vc.exceptions import K8sException @@ -1546,18 +1546,40 @@ class K8sHelmBaseConnector(K8sConnector): line_list.append(cell) return output_table - @staticmethod - def _parse_services(output: str) -> list: - lines = output.splitlines(keepends=False) - services = [] - for line in lines: - line = line.replace("\t", " ") - cells = line.split(sep=" ") - if len(cells) > 0 and cells[0].startswith("service/"): - elems = cells[0].split(sep="/") - if len(elems) > 1: - services.append(elems[1]) - return services + def _parse_services(self, yaml_input: str) -> list: + """ + Parses the output of a command to extract service names. + """ + + def get_manifest_services(manifest: dict) -> list: + """ + Extracts service names from a manifest dictionary. + """ + manifest_services = [] + if "kind" in manifest and manifest["kind"] == "Service": + if "metadata" in manifest and "name" in manifest["metadata"]: + manifest_services.append(manifest["metadata"]["name"]) + return manifest_services + + service_list = [] + self.log.debug("Parsing YAML manifests to obtain list of services...") + manifest_generator = yaml.safe_load_all(yaml_input) + i = 1 + while True: + try: + manifest = next(manifest_generator) + except StopIteration: + break + except yaml.YAMLError as e: + self.log.error("Skipping manifest %d due to YAML error: %s", i, e) + i += 1 + continue + if not manifest: + continue + manifest_services = get_manifest_services(manifest) + if manifest_services: + service_list.extend(manifest_services) + return service_list @staticmethod def _get_deep(dictionary: dict, members: tuple): @@ -1674,85 +1696,6 @@ class K8sHelmBaseConnector(K8sConnector): else: return "", -1 - async def _local_async_exec_pipe( - self, - command1: str, - command2: str, - raise_exception_on_error: bool = True, - show_error_log: bool = True, - encode_utf8: bool = False, - env: dict = None, - ): - command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1) - command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2) - command = "{} | {}".format(command1, command2) - self.log.debug( - "Executing async local command: {}, env: {}".format(command, env) - ) - - # split command - command1 = shlex.split(command1) - command2 = shlex.split(command2) - - environ = os.environ.copy() - if env: - environ.update(env) - - process_1 = None - try: - async with self.cmd_lock: - read, write = os.pipe() - process_1 = await asyncio.create_subprocess_exec( - *command1, stdout=write, env=environ - ) - os.close(write) - process_2 = await asyncio.create_subprocess_exec( - *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ - ) - os.close(read) - stdout, stderr = await process_2.communicate() - - return_code = process_2.returncode - - output = "" - if stdout: - output = stdout.decode("utf-8").strip() - # output = stdout.decode() - if stderr: - output = stderr.decode("utf-8").strip() - # output = stderr.decode() - - if return_code != 0 and show_error_log: - self.log.debug( - "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output) - ) - else: - self.log.debug("Return code: {}".format(return_code)) - - if raise_exception_on_error and return_code != 0: - raise K8sException(output) - - if encode_utf8: - output = output.encode("utf-8").strip() - output = str(output).replace("\\n", "\n") - - return output, return_code - except asyncio.CancelledError: - # first, kill the processes if they are still running - for process in (process_1, process_2): - if process.returncode is None: - process.kill() - raise - except K8sException: - raise - except Exception as e: - msg = "Exception executing command: {} -> {}".format(command, e) - self.log.error(msg) - if raise_exception_on_error: - raise K8sException(e) from e - else: - return "", -1 - async def _get_service(self, cluster_id, service_name, namespace): """ Obtains the data of the specified service in the k8cluster. diff --git a/osm_lcm/n2vc/tests/unit/test_k8s_helm3_conn.py b/osm_lcm/n2vc/tests/unit/test_k8s_helm3_conn.py index 3ff9197d..358bb434 100644 --- a/osm_lcm/n2vc/tests/unit/test_k8s_helm3_conn.py +++ b/osm_lcm/n2vc/tests/unit/test_k8s_helm3_conn.py @@ -542,9 +542,7 @@ class TestK8sHelm3Conn(asynctest.TestCase): async def test_get_services(self): kdu_instance = "test_services_1" service = {"name": "testservice", "type": "LoadBalancer"} - self.helm_conn._local_async_exec_pipe = asynctest.CoroutineMock( - return_value=("", 0) - ) + self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=(0, "")) self.helm_conn._parse_services = Mock(return_value=["testservice"]) self.helm_conn._get_service = asynctest.CoroutineMock(return_value=service) @@ -556,12 +554,11 @@ class TestK8sHelm3Conn(asynctest.TestCase): from_path=self.cluster_id ) self.helm_conn._parse_services.assert_called_once() - command1 = ( + command = ( "env KUBECONFIG=./tmp/helm3_cluster_id/.kube/config /usr/bin/helm3 get manifest {} --namespace=testk8s" ).format(kdu_instance) - command2 = "/usr/bin/kubectl get --namespace={} -f -".format(self.namespace) - self.helm_conn._local_async_exec_pipe.assert_called_once_with( - command1, command2, env=self.env, raise_exception_on_error=True + self.helm_conn._local_async_exec.assert_called_once_with( + command, env=self.env, raise_exception_on_error=True ) self.assertEqual( services, [service], "Invalid service returned from get_service" -- 2.25.1