from .exceptions import MethodNotImplemented
from n2vc.utils import base64_to_cacert
from n2vc.libjuju import Libjuju
+from n2vc.utils import obj_to_dict, obj_to_yaml
from kubernetes.client.models import (
V1ClusterRole,
bundle, model_name=kdu_instance, wait=atomic, timeout=timeout
)
os.chdir(previous_workdir)
+ if self.on_update_db:
+ await self.on_update_db(cluster_uuid, kdu_instance, filter=db_dict["filter"])
return True
async def instances_list(self, cluster_uuid: str) -> list:
raise K8sException(
"status is not completed: {} output: {}".format(status, output)
)
+ if self.on_update_db:
+ await self.on_update_db(cluster_uuid, kdu_instance, filter=db_dict["filter"])
return output
self,
cluster_uuid: str,
kdu_instance: str,
+ complete_status: bool = False,
+ yaml_format: bool = False
) -> dict:
"""Get the status of the KDU
:param cluster_uuid str: The UUID of the cluster
:param kdu_instance str: The unique id of the KDU instance
+ :param complete_status: To get the complete_status of the KDU
+ :param yaml_format: To get the status in proper format for NSR record
:return: Returns a dictionary containing namespace, state, resources,
- and deployment_time.
+ and deployment_time and returns complete_status if complete_status is True
"""
status = {}
+
model_status = await self.libjuju.get_model_status(kdu_instance)
- for name in model_status.applications:
- application = model_status.applications[name]
- status[name] = {"status": application["status"]["status"]}
+
+ if not complete_status:
+ for name in model_status.applications:
+ application = model_status.applications[name]
+ status[name] = {"status": application["status"]["status"]}
+ else:
+ if yaml_format:
+ return obj_to_yaml(model_status)
+ else:
+ return obj_to_dict(model_status)
return status
+ async def update_vca_status(self, vcastatus: dict, kdu_instance: str):
+ """
+ Add all configs, actions, executed actions of all applications in a model to vcastatus dict
+
+ :param vcastatus dict: dict containing vcastatus
+ :param kdu_instance str: The unique id of the KDU instance
+
+ :return: None
+ """
+ try:
+ for model_name in vcastatus:
+ # Adding executed actions
+ vcastatus[model_name]["executedActions"] = \
+ await self.libjuju.get_executed_actions(kdu_instance)
+
+ for application in vcastatus[model_name]["applications"]:
+ # Adding application actions
+ vcastatus[model_name]["applications"][application]["actions"] = \
+ await self.libjuju.get_actions(application, kdu_instance)
+ # Adding application configs
+ vcastatus[model_name]["applications"][application]["configs"] = \
+ await self.libjuju.get_application_configs(kdu_instance, application)
+
+ except Exception as e:
+ self.log.debug("Error in updating vca status: {}".format(str(e)))
+
async def get_services(
self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
import base64
import re
import binascii
+import yaml
from enum import Enum
from juju.machine import Machine
from juju.application import Application
)
}
)
+
+
+def obj_to_yaml(obj: object) -> str:
+ """
+ Converts object to yaml format
+ :return: yaml data
+ """
+ # dump to yaml
+ dump_text = yaml.dump(obj, default_flow_style=False, indent=2)
+ # split lines
+ lines = dump_text.splitlines()
+ # remove !!python/object tags
+ yaml_text = ""
+ for line in lines:
+ index = line.find("!!python/object")
+ if index >= 0:
+ line = line[:index]
+ yaml_text += line + "\n"
+ return yaml_text
+
+
+def obj_to_dict(obj: object) -> dict:
+ """
+ Converts object to dictionary format
+ :return: dict data
+ """
+ # convert obj to yaml
+ yaml_text = obj_to_yaml(obj)
+ # parse to dict
+ return yaml.load(yaml_text, Loader=yaml.Loader)