import binascii
import base64
-from n2vc.exceptions import K8sException, N2VCBadArgumentsException
+from n2vc.config import EnvironConfig
+from n2vc.exceptions import K8sException
from n2vc.k8s_conn import K8sConnector
from n2vc.kubectl import Kubectl, CORE_CLIENT, RBAC_CLIENT
from .exceptions import MethodNotImplemented
-from n2vc.utils import base64_to_cacert
from n2vc.libjuju import Libjuju
-
+from n2vc.utils import obj_to_dict, obj_to_yaml
+from n2vc.store import MotorStore
+from n2vc.vca.cloud import Cloud
+from n2vc.vca.connection import get_connection
from kubernetes.client.models import (
V1ClusterRole,
V1ObjectMeta,
class K8sJujuConnector(K8sConnector):
+ libjuju = None
+
def __init__(
self,
fs: object,
log: object = None,
loop: object = None,
on_update_db=None,
- vca_config: dict = None,
):
"""
:param fs: file system for kubernetes and helm configuration
self.loop = loop or asyncio.get_event_loop()
self.log.debug("Initializing K8S Juju connector")
- required_vca_config = [
- "host",
- "user",
- "secret",
- "ca_cert",
- ]
- if not vca_config or not all(k in vca_config for k in required_vca_config):
- raise N2VCBadArgumentsException(
- message="Missing arguments in vca_config: {}".format(vca_config),
- bad_args=required_vca_config,
- )
- port = vca_config["port"] if "port" in vca_config else 17070
- url = "{}:{}".format(vca_config["host"], port)
- enable_os_upgrade = vca_config.get("enable_os_upgrade", True)
- apt_mirror = vca_config.get("apt_mirror", None)
- username = vca_config["user"]
- secret = vca_config["secret"]
- ca_cert = base64_to_cacert(vca_config["ca_cert"])
-
- self.libjuju = Libjuju(
- endpoint=url,
- api_proxy=None, # Not needed for k8s charms
- enable_os_upgrade=enable_os_upgrade,
- apt_mirror=apt_mirror,
- username=username,
- password=secret,
- cacert=ca_cert,
- loop=self.loop,
- log=self.log,
- db=self.db,
- )
+ db_uri = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"]).get("database_uri")
+ self._store = MotorStore(db_uri)
+ self.loading_libjuju = asyncio.Lock(loop=self.loop)
+
self.log.debug("K8S Juju connector initialized")
# TODO: Remove these commented lines:
# self.authenticated = False
k8s_creds: str,
namespace: str = "kube-system",
reuse_cluster_uuid: str = None,
+ **kwargs,
) -> (str, bool):
"""
It prepares a given K8s cluster environment to run Juju bundles.
:param namespace: optional namespace to be used for juju. By default,
'kube-system' will be used
:param reuse_cluster_uuid: existing cluster uuid for reuse
+ :param: kwargs: Additional parameters
+ vca_id (str): VCA ID
+
:return: uuid of the K8s cluster and True if connector has installed some
software in the cluster
(on error, an exception will be raised)
"""
+ libjuju = await self._get_libjuju(kwargs.get("vca_id"))
cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
)
default_storage_class = kubectl.get_default_storage_class()
- await self.libjuju.add_k8s(
+ await libjuju.add_k8s(
name=cluster_uuid,
rbac_id=rbac_id,
token=token,
"""Reset"""
async def reset(
- self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
+ self,
+ cluster_uuid: str,
+ force: bool = False,
+ uninstall_sw: bool = False,
+ **kwargs,
) -> bool:
"""Reset a cluster
Resets the Kubernetes cluster by removing the model that represents it.
:param cluster_uuid str: The UUID of the cluster to reset
+ :param force: Force reset
+ :param uninstall_sw: Boolean to uninstall sw
+ :param: kwargs: Additional parameters
+ vca_id (str): VCA ID
+
:return: Returns True if successful or raises an exception.
"""
try:
self.log.debug("[reset] Removing k8s cloud")
+ libjuju = await self._get_libjuju(kwargs.get("vca_id"))
- cloud_creds = await self.libjuju.get_cloud_credentials(
- cluster_uuid,
- self._get_credential_name(cluster_uuid),
- )
+ cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid))
+
+ cloud_creds = await libjuju.get_cloud_credentials(cloud)
- await self.libjuju.remove_cloud(cluster_uuid)
+ await libjuju.remove_cloud(cluster_uuid)
kubecfg = self.get_credentials(cluster_uuid=cluster_uuid)
self,
cluster_uuid: str,
kdu_model: str,
+ kdu_instance: str,
atomic: bool = True,
timeout: float = 1800,
params: dict = None,
db_dict: dict = None,
kdu_name: str = None,
namespace: str = None,
+ **kwargs,
) -> bool:
"""Install a bundle
:param cluster_uuid str: The UUID of the cluster to install to
:param kdu_model str: The name or path of a bundle to install
+ :param kdu_instance: Kdu instance name
:param atomic bool: If set, waits until the model is active and resets
the cluster on failure.
:param timeout int: The time, in seconds, to wait for the install
:param params dict: Key-value pairs of instantiation parameters
:param kdu_name: Name of the KDU instance to be installed
:param namespace: K8s namespace to use for the KDU instance
+ :param kwargs: Additional parameters
+ vca_id (str): VCA ID
:return: If successful, returns ?
"""
+ libjuju = await self._get_libjuju(kwargs.get("vca_id"))
bundle = kdu_model
if not db_dict:
os.chdir(new_workdir)
bundle = "local:{}".format(kdu_model)
- if kdu_name:
- kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
- else:
- kdu_instance = db_dict["filter"]["_id"]
-
self.log.debug("Checking for model named {}".format(kdu_instance))
# Create the new model
self.log.debug("Adding model: {}".format(kdu_instance))
- await self.libjuju.add_model(
- model_name=kdu_instance,
- cloud_name=cluster_uuid,
- credential_name=self._get_credential_name(cluster_uuid),
- )
+ cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid))
+ await libjuju.add_model(kdu_instance, cloud)
# if model:
# TODO: Instantiation parameters
previous_workdir = "/app/storage"
self.log.debug("[install] deploying {}".format(bundle))
- await self.libjuju.deploy(
+ await libjuju.deploy(
bundle, model_name=kdu_instance, wait=atomic, timeout=timeout
)
os.chdir(previous_workdir)
- return kdu_instance
+ if self.on_update_db:
+ await self.on_update_db(
+ cluster_uuid,
+ kdu_instance,
+ filter=db_dict["filter"],
+ vca_id=kwargs.get("vca_id"),
+ )
+ return True
+
+ async def scale(
+ self,
+ kdu_instance: str,
+ scale: int,
+ resource_name: str,
+ total_timeout: float = 1800,
+ **kwargs,
+ ) -> bool:
+ """Scale an application in a model
+
+ :param: kdu_instance str: KDU instance name
+ :param: scale int: Scale to which to set this application
+ :param: resource_name str: Resource name (Application name)
+ :param: timeout float: The time, in seconds, to wait for the install
+ to finish
+ :param kwargs: Additional parameters
+ vca_id (str): VCA ID
+
+ :return: If successful, returns True
+ """
+
+ try:
+ libjuju = await self._get_libjuju(kwargs.get("vca_id"))
+ await libjuju.scale_application(
+ model_name=kdu_instance,
+ application_name=resource_name,
+ scale=scale,
+ total_timeout=total_timeout,
+ )
+ except Exception as e:
+ error_msg = "Error scaling application {} in kdu instance {}: {}".format(
+ resource_name, kdu_instance, e
+ )
+ self.log.error(error_msg)
+ raise K8sException(message=error_msg)
+ return True
+
+ async def get_scale_count(
+ self,
+ resource_name: str,
+ kdu_instance: str,
+ **kwargs,
+ ) -> int:
+ """Get an application scale count
+
+ :param: resource_name str: Resource name (Application name)
+ :param: kdu_instance str: KDU instance name
+ :param kwargs: Additional parameters
+ vca_id (str): VCA ID
+ :return: Return application instance count
+ """
+ try:
+ libjuju = await self._get_libjuju(kwargs.get("vca_id"))
+ status = await libjuju.get_model_status(kdu_instance)
+ return len(status.applications[resource_name].units)
+ except Exception as e:
+ error_msg = "Error getting scale count from application {} in kdu instance {}: {}".format(
+ resource_name, kdu_instance, e
+ )
+ self.log.error(error_msg)
+ raise K8sException(message=error_msg)
async def instances_list(self, cluster_uuid: str) -> list:
"""
"""Deletion"""
- async def uninstall(self, cluster_uuid: str, kdu_instance: str) -> bool:
+ async def uninstall(
+ self,
+ cluster_uuid: str,
+ kdu_instance: str,
+ **kwargs,
+ ) -> bool:
"""Uninstall a KDU instance
:param cluster_uuid str: The UUID of the cluster
:param kdu_instance str: The unique name of the KDU instance
+ :param kwargs: Additional parameters
+ vca_id (str): VCA ID
:return: Returns True if successful, or raises an exception
"""
self.log.debug("[uninstall] Destroying model")
+ libjuju = await self._get_libjuju(kwargs.get("vca_id"))
- await self.libjuju.destroy_model(kdu_instance, total_timeout=3600)
+ await libjuju.destroy_model(kdu_instance, total_timeout=3600)
# self.log.debug("[uninstall] Model destroyed and disconnecting")
# await controller.disconnect()
timeout: float = 300,
params: dict = None,
db_dict: dict = None,
+ **kwargs,
) -> str:
"""Exec primitive (Juju action)
:param primitive_name: Name of action that will be executed
:param timeout: Timeout for action execution
:param params: Dictionary of all the parameters needed for the action
- :db_dict: Dictionary for any additional data
+ :param db_dict: Dictionary for any additional data
+ :param kwargs: Additional parameters
+ vca_id (str): VCA ID
:return: Returns the output of the action
"""
+ libjuju = await self._get_libjuju(kwargs.get("vca_id"))
if not params or "application-name" not in params:
raise K8sException(
"kdu_instance: {}".format(kdu_instance)
)
application_name = params["application-name"]
- actions = await self.libjuju.get_actions(application_name, kdu_instance)
+ actions = await libjuju.get_actions(application_name, kdu_instance)
if primitive_name not in actions:
raise K8sException("Primitive {} not found".format(primitive_name))
- output, status = await self.libjuju.execute_action(
+ output, status = await libjuju.execute_action(
application_name, kdu_instance, primitive_name, **params
)
raise K8sException(
"status is not completed: {} output: {}".format(status, output)
)
+ if self.on_update_db:
+ await self.on_update_db(
+ cluster_uuid, kdu_instance, filter=db_dict["filter"]
+ )
return output
self,
cluster_uuid: str,
kdu_instance: str,
+ complete_status: bool = False,
+ yaml_format: bool = False,
+ **kwargs,
) -> dict:
"""Get the status of the KDU
:param cluster_uuid str: The UUID of the cluster
:param kdu_instance str: The unique id of the KDU instance
+ :param complete_status: To get the complete_status of the KDU
+ :param yaml_format: To get the status in proper format for NSR record
+ :param: kwargs: Additional parameters
+ vca_id (str): VCA ID
:return: Returns a dictionary containing namespace, state, resources,
- and deployment_time.
+ and deployment_time and returns complete_status if complete_status is True
"""
+ libjuju = await self._get_libjuju(kwargs.get("vca_id"))
status = {}
- model_status = await self.libjuju.get_model_status(kdu_instance)
- for name in model_status.applications:
- application = model_status.applications[name]
- status[name] = {"status": application["status"]["status"]}
+
+ model_status = await libjuju.get_model_status(kdu_instance)
+
+ if not complete_status:
+ for name in model_status.applications:
+ application = model_status.applications[name]
+ status[name] = {"status": application["status"]["status"]}
+ else:
+ if yaml_format:
+ return obj_to_yaml(model_status)
+ else:
+ return obj_to_dict(model_status)
return status
+ async def update_vca_status(self, vcastatus: dict, kdu_instance: str, **kwargs):
+ """
+ Add all configs, actions, executed actions of all applications in a model to vcastatus dict
+
+ :param vcastatus dict: dict containing vcastatus
+ :param kdu_instance str: The unique id of the KDU instance
+ :param: kwargs: Additional parameters
+ vca_id (str): VCA ID
+
+ :return: None
+ """
+ libjuju = await self._get_libjuju(kwargs.get("vca_id"))
+ try:
+ for model_name in vcastatus:
+ # Adding executed actions
+ vcastatus[model_name][
+ "executedActions"
+ ] = await libjuju.get_executed_actions(kdu_instance)
+
+ for application in vcastatus[model_name]["applications"]:
+ # Adding application actions
+ vcastatus[model_name]["applications"][application][
+ "actions"
+ ] = await libjuju.get_actions(application, kdu_instance)
+ # Adding application configs
+ vcastatus[model_name]["applications"][application][
+ "configs"
+ ] = await libjuju.get_application_configs(kdu_instance, application)
+
+ except Exception as e:
+ self.log.debug("Error in updating vca status: {}".format(str(e)))
+
async def get_services(
self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
base64.b64decode(token).decode("utf-8"),
base64.b64decode(client_certificate_data).decode("utf-8"),
)
+
+ @staticmethod
+ def generate_kdu_instance_name(**kwargs):
+ db_dict = kwargs.get("db_dict")
+ kdu_name = kwargs.get("kdu_name", None)
+ if kdu_name:
+ kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
+ else:
+ kdu_instance = db_dict["filter"]["_id"]
+ return kdu_instance
+
+ async def _get_libjuju(self, vca_id: str = None) -> Libjuju:
+ """
+ Get libjuju object
+
+ :param: vca_id: VCA ID
+ If None, get a libjuju object with a Connection to the default VCA
+ Else, geta libjuju object with a Connection to the specified VCA
+ """
+ if not vca_id:
+ while self.loading_libjuju.locked():
+ await asyncio.sleep(0.1)
+ if not self.libjuju:
+ async with self.loading_libjuju:
+ vca_connection = await get_connection(self._store)
+ self.libjuju = Libjuju(vca_connection, loop=self.loop, log=self.log)
+ return self.libjuju
+ else:
+ vca_connection = await get_connection(self._store, vca_id)
+ return Libjuju(
+ vca_connection,
+ loop=self.loop,
+ log=self.log,
+ n2vc=self,
+ )