import binascii
import base64
+from n2vc.config import ModelConfig
from n2vc.exceptions import K8sException, N2VCBadArgumentsException
from n2vc.k8s_conn import K8sConnector
from n2vc.kubectl import Kubectl, CORE_CLIENT, RBAC_CLIENT
from .exceptions import MethodNotImplemented
from n2vc.utils import base64_to_cacert
from n2vc.libjuju import Libjuju
+from n2vc.utils import obj_to_dict, obj_to_yaml
from kubernetes.client.models import (
V1ClusterRole,
)
port = vca_config["port"] if "port" in vca_config else 17070
url = "{}:{}".format(vca_config["host"], port)
- enable_os_upgrade = vca_config.get("enable_os_upgrade", True)
- apt_mirror = vca_config.get("apt_mirror", None)
+ model_config = ModelConfig(vca_config)
username = vca_config["user"]
secret = vca_config["secret"]
ca_cert = base64_to_cacert(vca_config["ca_cert"])
self.libjuju = Libjuju(
endpoint=url,
api_proxy=None, # Not needed for k8s charms
- enable_os_upgrade=enable_os_upgrade,
- apt_mirror=apt_mirror,
+ model_config=model_config,
username=username,
password=secret,
cacert=ca_cert,
bundle, model_name=kdu_instance, wait=atomic, timeout=timeout
)
os.chdir(previous_workdir)
+ if self.on_update_db:
+ await self.on_update_db(cluster_uuid, kdu_instance, filter=db_dict["filter"])
return True
async def instances_list(self, cluster_uuid: str) -> list:
raise K8sException(
"status is not completed: {} output: {}".format(status, output)
)
+ if self.on_update_db:
+ await self.on_update_db(cluster_uuid, kdu_instance, filter=db_dict["filter"])
return output
self,
cluster_uuid: str,
kdu_instance: str,
+ complete_status: bool = False,
+ yaml_format: bool = False
) -> dict:
"""Get the status of the KDU
:param cluster_uuid str: The UUID of the cluster
:param kdu_instance str: The unique id of the KDU instance
+ :param complete_status: To get the complete_status of the KDU
+ :param yaml_format: To get the status in proper format for NSR record
:return: Returns a dictionary containing namespace, state, resources,
- and deployment_time.
+ and deployment_time and returns complete_status if complete_status is True
"""
status = {}
+
model_status = await self.libjuju.get_model_status(kdu_instance)
- for name in model_status.applications:
- application = model_status.applications[name]
- status[name] = {"status": application["status"]["status"]}
+
+ if not complete_status:
+ for name in model_status.applications:
+ application = model_status.applications[name]
+ status[name] = {"status": application["status"]["status"]}
+ else:
+ if yaml_format:
+ return obj_to_yaml(model_status)
+ else:
+ return obj_to_dict(model_status)
return status
+ async def update_vca_status(self, vcastatus: dict, kdu_instance: str):
+ """
+ Add all configs, actions, executed actions of all applications in a model to vcastatus dict
+
+ :param vcastatus dict: dict containing vcastatus
+ :param kdu_instance str: The unique id of the KDU instance
+
+ :return: None
+ """
+ try:
+ for model_name in vcastatus:
+ # Adding executed actions
+ vcastatus[model_name]["executedActions"] = \
+ await self.libjuju.get_executed_actions(kdu_instance)
+
+ for application in vcastatus[model_name]["applications"]:
+ # Adding application actions
+ vcastatus[model_name]["applications"][application]["actions"] = \
+ await self.libjuju.get_actions(application, kdu_instance)
+ # Adding application configs
+ vcastatus[model_name]["applications"][application]["configs"] = \
+ await self.libjuju.get_application_configs(kdu_instance, application)
+
+ except Exception as e:
+ self.log.debug("Error in updating vca status: {}".format(str(e)))
+
async def get_services(
self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list: