:return: True if successful
"""
+ @abc.abstractmethod
+ async def exec_primitive(
+ self,
+ cluster_uuid: str = None,
+ kdu_instance: str = None,
+ primitive_name: str = None,
+ timeout: float = 300,
+ params: dict = None,
+ db_dict: dict = None,
+ ) -> str:
+ """Exec primitive (Juju action)
+
+ :param cluster_uuid str: The UUID of the cluster
+ :param kdu_instance str: The unique name of the KDU instance
+ :param primitive_name: Name of action that will be executed
+ :param timeout: Timeout for action execution
+ :param params: Dictionary of all the parameters needed for the action
+ :db_dict: Dictionary for any additional data
+
+ :return: Returns the output of the action
+ """
+
@abc.abstractmethod
async def inspect_kdu(
self,
return self._output_to_table(output)
+ async def exec_primitive(
+ self,
+ cluster_uuid: str = None,
+ kdu_instance: str = None,
+ primitive_name: str = None,
+ timeout: float = 300,
+ params: dict = None,
+ db_dict: dict = None,
+ ) -> str:
+ """Exec primitive (Juju action)
+
+ :param cluster_uuid str: The UUID of the cluster
+ :param kdu_instance str: The unique name of the KDU instance
+ :param primitive_name: Name of action that will be executed
+ :param timeout: Timeout for action execution
+ :param params: Dictionary of all the parameters needed for the action
+ :db_dict: Dictionary for any additional data
+
+ :return: Returns the output of the action
+ """
+ raise K8sException("KDUs deployed with Helm don't support actions "
+ "different from rollback, upgrade and status")
+
async def inspect_kdu(
self,
kdu_model: str,
from juju.controller import Controller
from juju.model import Model
from juju.errors import JujuAPIError, JujuError
+from n2vc.exceptions import K8sException
from n2vc.k8s_conn import K8sConnector
return True
+ async def exec_primitive(
+ self,
+ cluster_uuid: str = None,
+ kdu_instance: str = None,
+ primitive_name: str = None,
+ timeout: float = 300,
+ params: dict = None,
+ db_dict: dict = None,
+ ) -> str:
+ """Exec primitive (Juju action)
+
+ :param cluster_uuid str: The UUID of the cluster
+ :param kdu_instance str: The unique name of the KDU instance
+ :param primitive_name: Name of action that will be executed
+ :param timeout: Timeout for action execution
+ :param params: Dictionary of all the parameters needed for the action
+ :db_dict: Dictionary for any additional data
+
+ :return: Returns the output of the action
+ """
+ if not self.authenticated:
+ self.log.debug("[exec_primitive] Connecting to controller")
+ await self.login(cluster_uuid)
+
+ if not params or "application-name" not in params:
+ raise K8sException("Missing application-name argument, \
+ argument needed for K8s actions")
+ try:
+ self.log.debug("[exec_primitive] Getting model "
+ "kdu_instance: {}".format(kdu_instance))
+
+ model = await self.get_model(kdu_instance, cluster_uuid)
+
+ application_name = params["application-name"]
+ application = model.applications[application_name]
+
+ actions = await application.get_actions()
+ if primitive_name not in actions:
+ raise K8sException("Primitive {} not found".format(primitive_name))
+
+ unit = None
+ for u in application.units:
+ if await u.is_leader_from_status():
+ unit = u
+ break
+
+ if unit is None:
+ raise K8sException("No leader unit found to execute action")
+
+ self.log.debug("[exec_primitive] Running action: {}".format(primitive_name))
+ action = await unit.run_action(primitive_name, **params)
+
+ output = await model.get_action_output(action_uuid=action.entity_id)
+ status = await model.get_action_status(uuid_or_prefix=action.entity_id)
+
+ status = (
+ status[action.entity_id] if action.entity_id in status else "failed"
+ )
+
+ if status != "completed":
+ raise K8sException("status is not completed: {} output: {}".format(status, output))
+
+ return output
+
+ except Exception as e:
+ error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
+ self.log.error(error_msg)
+ raise K8sException(message=error_msg)
+
"""Introspection"""
async def inspect_kdu(
self,
model = None
models = await self.controller.list_models()
- self.log.debug(models)
if model_name in models:
self.log.debug("Found model: {}".format(model_name))
model = await self.controller.get_model(