# contact with: nfvlabs@tid.es
##
import asyncio
+from typing import Union
import os
import yaml
)
command = (
- "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
- " {} init"
+ "{} init --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
+ " {}"
).format(
self._helm_command,
paths["kube_config"],
):
self.log.info("Initializing helm in client: {}".format(cluster_id))
command = (
- "{} --kubeconfig={} --tiller-namespace={} "
- "--home={} init --client-only {} "
+ "{} init --kubeconfig={} --tiller-namespace={} "
+ "--home={} --client-only {} "
).format(
self._helm_command,
paths["kube_config"],
cluster_id: str,
kdu_instance: str,
namespace: str = None,
+ yaml_format: bool = False,
show_error_log: bool = False,
- return_text: bool = False,
- ):
+ ) -> Union[str, dict]:
self.log.debug(
"status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
env=env,
)
- if return_text:
+ if yaml_format:
return str(output)
if rc != 0:
except KeyError:
pass
+ # parse the manifest to a list of dictionaries
+ if "manifest" in data:
+ manifest_str = data.get("manifest")
+ manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader)
+
+ data["manifest"] = []
+ for doc in manifest_docs:
+ data["manifest"].append(doc)
+
# parse field 'resources'
try:
resources = str(data.get("info").get("status").get("resources"))
)
status = await self._status_kdu(
- cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
+ cluster_id=cluster_id, kdu_instance=kdu_instance, yaml_format=False
)
# extract info.status.resources-> str