cluster_name=cluster_id, create_if_not_exist=True
)
- command1 = "env KUBECONFIG={} {} get manifest {} --namespace={}".format(
+ command = "env KUBECONFIG={} {} get manifest {} --namespace={}".format(
kubeconfig, self._helm_command, quote(kdu_instance), quote(namespace)
)
- command2 = "{} get --namespace={} -f -".format(
- self.kubectl_command, quote(namespace)
- )
- output, _rc = await self._local_async_exec_pipe(
- command1, command2, env=env, raise_exception_on_error=True
+ output, _rc = await self._local_async_exec(
+ command, env=env, raise_exception_on_error=True
)
services = self._parse_services(output)
import shutil
import stat
import os
-import yaml
from uuid import uuid4
from urllib.parse import urlparse
+import yaml
from osm_lcm.n2vc.config import EnvironConfig
from osm_lcm.n2vc.exceptions import K8sException
line_list.append(cell)
return output_table
- @staticmethod
- def _parse_services(output: str) -> list:
- lines = output.splitlines(keepends=False)
- services = []
- for line in lines:
- line = line.replace("\t", " ")
- cells = line.split(sep=" ")
- if len(cells) > 0 and cells[0].startswith("service/"):
- elems = cells[0].split(sep="/")
- if len(elems) > 1:
- services.append(elems[1])
- return services
+ def _parse_services(self, yaml_input: str) -> list:
+ """
+ Parses the output of a command to extract service names.
+ """
+
+ def get_manifest_services(manifest: dict) -> list:
+ """
+ Extracts service names from a manifest dictionary.
+ """
+ manifest_services = []
+ if "kind" in manifest and manifest["kind"] == "Service":
+ if "metadata" in manifest and "name" in manifest["metadata"]:
+ manifest_services.append(manifest["metadata"]["name"])
+ return manifest_services
+
+ service_list = []
+ self.log.debug("Parsing YAML manifests to obtain list of services...")
+ manifest_generator = yaml.safe_load_all(yaml_input)
+ i = 1
+ while True:
+ try:
+ manifest = next(manifest_generator)
+ except StopIteration:
+ break
+ except yaml.YAMLError as e:
+ self.log.error("Skipping manifest %d due to YAML error: %s", i, e)
+ i += 1
+ continue
+ if not manifest:
+ continue
+ manifest_services = get_manifest_services(manifest)
+ if manifest_services:
+ service_list.extend(manifest_services)
+ return service_list
@staticmethod
def _get_deep(dictionary: dict, members: tuple):
else:
return "", -1
- async def _local_async_exec_pipe(
- self,
- command1: str,
- command2: str,
- raise_exception_on_error: bool = True,
- show_error_log: bool = True,
- encode_utf8: bool = False,
- env: dict = None,
- ):
- command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
- command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
- command = "{} | {}".format(command1, command2)
- self.log.debug(
- "Executing async local command: {}, env: {}".format(command, env)
- )
-
- # split command
- command1 = shlex.split(command1)
- command2 = shlex.split(command2)
-
- environ = os.environ.copy()
- if env:
- environ.update(env)
-
- process_1 = None
- try:
- async with self.cmd_lock:
- read, write = os.pipe()
- process_1 = await asyncio.create_subprocess_exec(
- *command1, stdout=write, env=environ
- )
- os.close(write)
- process_2 = await asyncio.create_subprocess_exec(
- *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
- )
- os.close(read)
- stdout, stderr = await process_2.communicate()
-
- return_code = process_2.returncode
-
- output = ""
- if stdout:
- output = stdout.decode("utf-8").strip()
- # output = stdout.decode()
- if stderr:
- output = stderr.decode("utf-8").strip()
- # output = stderr.decode()
-
- if return_code != 0 and show_error_log:
- self.log.debug(
- "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
- )
- else:
- self.log.debug("Return code: {}".format(return_code))
-
- if raise_exception_on_error and return_code != 0:
- raise K8sException(output)
-
- if encode_utf8:
- output = output.encode("utf-8").strip()
- output = str(output).replace("\\n", "\n")
-
- return output, return_code
- except asyncio.CancelledError:
- # first, kill the processes if they are still running
- for process in (process_1, process_2):
- if process.returncode is None:
- process.kill()
- raise
- except K8sException:
- raise
- except Exception as e:
- msg = "Exception executing command: {} -> {}".format(command, e)
- self.log.error(msg)
- if raise_exception_on_error:
- raise K8sException(e) from e
- else:
- return "", -1
-
async def _get_service(self, cluster_id, service_name, namespace):
"""
Obtains the data of the specified service in the k8cluster.
async def test_get_services(self):
kdu_instance = "test_services_1"
service = {"name": "testservice", "type": "LoadBalancer"}
- self.helm_conn._local_async_exec_pipe = asynctest.CoroutineMock(
- return_value=("", 0)
- )
+ self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=(0, ""))
self.helm_conn._parse_services = Mock(return_value=["testservice"])
self.helm_conn._get_service = asynctest.CoroutineMock(return_value=service)
from_path=self.cluster_id
)
self.helm_conn._parse_services.assert_called_once()
- command1 = (
+ command = (
"env KUBECONFIG=./tmp/helm3_cluster_id/.kube/config /usr/bin/helm3 get manifest {} --namespace=testk8s"
).format(kdu_instance)
- command2 = "/usr/bin/kubectl get --namespace={} -f -".format(self.namespace)
- self.helm_conn._local_async_exec_pipe.assert_called_once_with(
- command1, command2, env=self.env, raise_exception_on_error=True
+ self.helm_conn._local_async_exec.assert_called_once_with(
+ command, env=self.env, raise_exception_on_error=True
)
self.assertEqual(
services, [service], "Invalid service returned from get_service"