From: garciadeblas Date: Thu, 17 Jul 2025 16:26:11 +0000 (+0200) Subject: Fix retrieval of services from K8s manifests of a helm-based KDU X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=ae1d9491eddec4a75ebd71388164970e01d8e395;p=osm%2FN2VC.git Fix retrieval of services from K8s manifests of a helm-based KDU Change-Id: I001e6e64f4c5c0b1533828b27986908c5dba455b Signed-off-by: garciadeblas --- diff --git a/n2vc/k8s_helm3_conn.py b/n2vc/k8s_helm3_conn.py index 1ddcf41..68cf067 100644 --- a/n2vc/k8s_helm3_conn.py +++ b/n2vc/k8s_helm3_conn.py @@ -297,14 +297,11 @@ class K8sHelm3Connector(K8sHelmBaseConnector): cluster_name=cluster_id, create_if_not_exist=True ) - command1 = "env KUBECONFIG={} {} get manifest {} --namespace={}".format( + command = "env KUBECONFIG={} {} get manifest {} --namespace={}".format( kubeconfig, self._helm_command, quote(kdu_instance), quote(namespace) ) - command2 = "{} get --namespace={} -f -".format( - self.kubectl_command, quote(namespace) - ) - output, _rc = await self._local_async_exec_pipe( - command1, command2, env=env, raise_exception_on_error=True + output, _rc = await self._local_async_exec( + command, env=env, raise_exception_on_error=True ) services = self._parse_services(output) diff --git a/n2vc/k8s_helm_base_conn.py b/n2vc/k8s_helm_base_conn.py index 14ad945..506b8de 100644 --- a/n2vc/k8s_helm_base_conn.py +++ b/n2vc/k8s_helm_base_conn.py @@ -29,8 +29,8 @@ import shlex import shutil import stat import os -import yaml from uuid import uuid4 +import yaml from n2vc.config import EnvironConfig from n2vc.exceptions import K8sException @@ -1489,18 +1489,40 @@ class K8sHelmBaseConnector(K8sConnector): line_list.append(cell) return output_table - @staticmethod - def _parse_services(output: str) -> list: - lines = output.splitlines(keepends=False) - services = [] - for line in lines: - line = line.replace("\t", " ") - cells = line.split(sep=" ") - if len(cells) > 0 and cells[0].startswith("service/"): - elems = cells[0].split(sep="/") - if len(elems) > 1: - services.append(elems[1]) - return services + def _parse_services(self, yaml_input: str) -> list: + """ + Parses the output of a set of YAML K8s manifests to extract service names. + """ + + def get_manifest_services(manifest: dict) -> list: + """ + Extracts service names from a manifest dictionary. + """ + manifest_services = [] + if "kind" in manifest and manifest["kind"] == "Service": + if "metadata" in manifest and "name" in manifest["metadata"]: + manifest_services.append(manifest["metadata"]["name"]) + return manifest_services + + service_list = [] + self.log.debug("Parsing YAML manifests to obtain list of services...") + manifest_generator = yaml.safe_load_all(yaml_input) + i = 1 + while True: + try: + manifest = next(manifest_generator) + except StopIteration: + break + except yaml.YAMLError as e: + self.log.error("Skipping manifest %d due to YAML error: %s", i, e) + i += 1 + continue + if not manifest: + continue + manifest_services = get_manifest_services(manifest) + if manifest_services: + service_list.extend(manifest_services) + return service_list @staticmethod def _get_deep(dictionary: dict, members: tuple): @@ -1617,84 +1639,6 @@ class K8sHelmBaseConnector(K8sConnector): else: return "", -1 - async def _local_async_exec_pipe( - self, - command1: str, - command2: str, - raise_exception_on_error: bool = True, - show_error_log: bool = True, - encode_utf8: bool = False, - env: dict = None, - ): - command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1) - command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2) - command = "{} | {}".format(command1, command2) - self.log.debug( - "Executing async local command: {}, env: {}".format(command, env) - ) - - # split command - command1 = shlex.split(command1) - command2 = shlex.split(command2) - - environ = os.environ.copy() - if env: - environ.update(env) - - try: - async with self.cmd_lock: - read, write = os.pipe() - process_1 = await asyncio.create_subprocess_exec( - *command1, stdout=write, env=environ - ) - os.close(write) - process_2 = await asyncio.create_subprocess_exec( - *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ - ) - os.close(read) - stdout, stderr = await process_2.communicate() - - return_code = process_2.returncode - - output = "" - if stdout: - output = stdout.decode("utf-8").strip() - # output = stdout.decode() - if stderr: - output = stderr.decode("utf-8").strip() - # output = stderr.decode() - - if return_code != 0 and show_error_log: - self.log.debug( - "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output) - ) - else: - self.log.debug("Return code: {}".format(return_code)) - - if raise_exception_on_error and return_code != 0: - raise K8sException(output) - - if encode_utf8: - output = output.encode("utf-8").strip() - output = str(output).replace("\\n", "\n") - - return output, return_code - except asyncio.CancelledError: - # first, kill the processes if they are still running - for process in (process_1, process_2): - if process.returncode is None: - process.kill() - raise - except K8sException: - raise - except Exception as e: - msg = "Exception executing command: {} -> {}".format(command, e) - self.log.error(msg) - if raise_exception_on_error: - raise K8sException(e) from e - else: - return "", -1 - async def _get_service(self, cluster_id, service_name, namespace): """ Obtains the data of the specified service in the k8cluster. diff --git a/n2vc/k8s_helm_conn.py b/n2vc/k8s_helm_conn.py index 5293c65..4e5f49e 100644 --- a/n2vc/k8s_helm_conn.py +++ b/n2vc/k8s_helm_conn.py @@ -237,14 +237,11 @@ class K8sHelmConnector(K8sHelmBaseConnector): cluster_name=cluster_id, create_if_not_exist=True ) - command1 = "env KUBECONFIG={} {} get manifest {} ".format( + command = "env KUBECONFIG={} {} get manifest {} ".format( kubeconfig, self._helm_command, quote(kdu_instance) ) - command2 = "{} get --namespace={} -f -".format( - self.kubectl_command, quote(namespace) - ) - output, _rc = await self._local_async_exec_pipe( - command1, command2, env=env, raise_exception_on_error=True + output, _rc = await self._local_async_exec( + command, env=env, raise_exception_on_error=True ) services = self._parse_services(output) diff --git a/n2vc/tests/unit/test_k8s_helm3_conn.py b/n2vc/tests/unit/test_k8s_helm3_conn.py index a2e75e1..6ebf46e 100644 --- a/n2vc/tests/unit/test_k8s_helm3_conn.py +++ b/n2vc/tests/unit/test_k8s_helm3_conn.py @@ -538,9 +538,7 @@ class TestK8sHelm3Conn(asynctest.TestCase): async def test_get_services(self): kdu_instance = "test_services_1" service = {"name": "testservice", "type": "LoadBalancer"} - self.helm_conn._local_async_exec_pipe = asynctest.CoroutineMock( - return_value=("", 0) - ) + self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0)) self.helm_conn._parse_services = Mock(return_value=["testservice"]) self.helm_conn._get_service = asynctest.CoroutineMock(return_value=service) @@ -552,12 +550,11 @@ class TestK8sHelm3Conn(asynctest.TestCase): from_path=self.cluster_id ) self.helm_conn._parse_services.assert_called_once() - command1 = ( + command = ( "env KUBECONFIG=./tmp/helm3_cluster_id/.kube/config /usr/bin/helm3 get manifest {} --namespace=testk8s" ).format(kdu_instance) - command2 = "/usr/bin/kubectl get --namespace={} -f -".format(self.namespace) - self.helm_conn._local_async_exec_pipe.assert_called_once_with( - command1, command2, env=self.env, raise_exception_on_error=True + self.helm_conn._local_async_exec.assert_called_once_with( + command, env=self.env, raise_exception_on_error=True ) self.assertEqual( services, [service], "Invalid service returned from get_service" diff --git a/n2vc/tests/unit/test_k8s_helm_conn.py b/n2vc/tests/unit/test_k8s_helm_conn.py index 161471a..22462c6 100644 --- a/n2vc/tests/unit/test_k8s_helm_conn.py +++ b/n2vc/tests/unit/test_k8s_helm_conn.py @@ -472,9 +472,7 @@ class TestK8sHelmConn(asynctest.TestCase): async def test_get_services(self): kdu_instance = "test_services_1" service = {"name": "testservice", "type": "LoadBalancer"} - self.helm_conn._local_async_exec_pipe = asynctest.CoroutineMock( - return_value=("", 0) - ) + self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0)) self.helm_conn._parse_services = Mock(return_value=["testservice"]) self.helm_conn._get_service = asynctest.CoroutineMock(return_value=service) @@ -486,12 +484,11 @@ class TestK8sHelmConn(asynctest.TestCase): from_path=self.cluster_id ) self.helm_conn._parse_services.assert_called_once() - command1 = "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm get manifest {} ".format( + command = "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm get manifest {} ".format( kdu_instance ) - command2 = "/usr/bin/kubectl get --namespace={} -f -".format(self.namespace) - self.helm_conn._local_async_exec_pipe.assert_called_once_with( - command1, command2, env=self.env, raise_exception_on_error=True + self.helm_conn._local_async_exec.assert_called_once_with( + command, env=self.env, raise_exception_on_error=True ) self.assertEqual( services, [service], "Invalid service returned from get_service"