import binascii
import base64
-from n2vc.config import ModelConfig
-from n2vc.exceptions import K8sException, N2VCBadArgumentsException
+from n2vc.config import EnvironConfig
+from n2vc.exceptions import K8sException
from n2vc.k8s_conn import K8sConnector
from n2vc.kubectl import Kubectl, CORE_CLIENT, RBAC_CLIENT
from .exceptions import MethodNotImplemented
-from n2vc.utils import base64_to_cacert
from n2vc.libjuju import Libjuju
from n2vc.utils import obj_to_dict, obj_to_yaml
-
+from n2vc.store import MotorStore
+from n2vc.vca.cloud import Cloud
+from n2vc.vca.connection import get_connection
from kubernetes.client.models import (
V1ClusterRole,
V1ObjectMeta,
class K8sJujuConnector(K8sConnector):
+ libjuju = None
+
def __init__(
self,
fs: object,
log: object = None,
loop: object = None,
on_update_db=None,
- vca_config: dict = None,
):
"""
:param fs: file system for kubernetes and helm configuration
self.loop = loop or asyncio.get_event_loop()
self.log.debug("Initializing K8S Juju connector")
- required_vca_config = [
- "host",
- "user",
- "secret",
- "ca_cert",
- ]
- if not vca_config or not all(k in vca_config for k in required_vca_config):
- raise N2VCBadArgumentsException(
- message="Missing arguments in vca_config: {}".format(vca_config),
- bad_args=required_vca_config,
- )
- port = vca_config["port"] if "port" in vca_config else 17070
- url = "{}:{}".format(vca_config["host"], port)
- model_config = ModelConfig(vca_config)
- username = vca_config["user"]
- secret = vca_config["secret"]
- ca_cert = base64_to_cacert(vca_config["ca_cert"])
-
- self.libjuju = Libjuju(
- endpoint=url,
- api_proxy=None, # Not needed for k8s charms
- model_config=model_config,
- username=username,
- password=secret,
- cacert=ca_cert,
- loop=self.loop,
- log=self.log,
- db=self.db,
- )
+ db_uri = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"]).get("database_uri")
+ self._store = MotorStore(db_uri)
+ self.loading_libjuju = asyncio.Lock(loop=self.loop)
+
self.log.debug("K8S Juju connector initialized")
# TODO: Remove these commented lines:
# self.authenticated = False
k8s_creds: str,
namespace: str = "kube-system",
reuse_cluster_uuid: str = None,
+ **kwargs,
) -> (str, bool):
"""
It prepares a given K8s cluster environment to run Juju bundles.
:param namespace: optional namespace to be used for juju. By default,
'kube-system' will be used
:param reuse_cluster_uuid: existing cluster uuid for reuse
+ :param: kwargs: Additional parameters
+ vca_id (str): VCA ID
+
:return: uuid of the K8s cluster and True if connector has installed some
software in the cluster
(on error, an exception will be raised)
"""
+ libjuju = await self._get_libjuju(kwargs.get("vca_id"))
cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
)
default_storage_class = kubectl.get_default_storage_class()
- await self.libjuju.add_k8s(
+ await libjuju.add_k8s(
name=cluster_uuid,
rbac_id=rbac_id,
token=token,
"""Reset"""
async def reset(
- self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
+ self,
+ cluster_uuid: str,
+ force: bool = False,
+ uninstall_sw: bool = False,
+ **kwargs,
) -> bool:
"""Reset a cluster
Resets the Kubernetes cluster by removing the model that represents it.
:param cluster_uuid str: The UUID of the cluster to reset
+ :param force: Force reset
+ :param uninstall_sw: Boolean to uninstall sw
+ :param: kwargs: Additional parameters
+ vca_id (str): VCA ID
+
:return: Returns True if successful or raises an exception.
"""
try:
self.log.debug("[reset] Removing k8s cloud")
+ libjuju = await self._get_libjuju(kwargs.get("vca_id"))
- cloud_creds = await self.libjuju.get_cloud_credentials(
- cluster_uuid,
- self._get_credential_name(cluster_uuid),
- )
+ cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid))
+
+ cloud_creds = await libjuju.get_cloud_credentials(cloud)
- await self.libjuju.remove_cloud(cluster_uuid)
+ await libjuju.remove_cloud(cluster_uuid)
kubecfg = self.get_credentials(cluster_uuid=cluster_uuid)
db_dict: dict = None,
kdu_name: str = None,
namespace: str = None,
+ **kwargs,
) -> bool:
"""Install a bundle
:param params dict: Key-value pairs of instantiation parameters
:param kdu_name: Name of the KDU instance to be installed
:param namespace: K8s namespace to use for the KDU instance
+ :param kwargs: Additional parameters
+ vca_id (str): VCA ID
:return: If successful, returns ?
"""
+ libjuju = await self._get_libjuju(kwargs.get("vca_id"))
bundle = kdu_model
if not db_dict:
# Create the new model
self.log.debug("Adding model: {}".format(kdu_instance))
- await self.libjuju.add_model(
- model_name=kdu_instance,
- cloud_name=cluster_uuid,
- credential_name=self._get_credential_name(cluster_uuid),
- )
+ cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid))
+ await libjuju.add_model(kdu_instance, cloud)
# if model:
# TODO: Instantiation parameters
previous_workdir = "/app/storage"
self.log.debug("[install] deploying {}".format(bundle))
- await self.libjuju.deploy(
+ await libjuju.deploy(
bundle, model_name=kdu_instance, wait=atomic, timeout=timeout
)
os.chdir(previous_workdir)
if self.on_update_db:
- await self.on_update_db(cluster_uuid, kdu_instance, filter=db_dict["filter"])
+ await self.on_update_db(
+ cluster_uuid,
+ kdu_instance,
+ filter=db_dict["filter"],
+ vca_id=kwargs.get("vca_id"),
+ )
+ return True
+
+ async def scale(
+ self,
+ kdu_instance: str,
+ scale: int,
+ resource_name: str,
+ total_timeout: float = 1800,
+ **kwargs,
+ ) -> bool:
+ """Scale an application in a model
+
+ :param: kdu_instance str: KDU instance name
+ :param: scale int: Scale to which to set this application
+ :param: resource_name str: Resource name (Application name)
+ :param: timeout float: The time, in seconds, to wait for the install
+ to finish
+ :param kwargs: Additional parameters
+ vca_id (str): VCA ID
+
+ :return: If successful, returns True
+ """
+
+ try:
+ libjuju = await self._get_libjuju(kwargs.get("vca_id"))
+ await libjuju.scale_application(
+ model_name=kdu_instance,
+ application_name=resource_name,
+ scale=scale,
+ total_timeout=total_timeout,
+ )
+ except Exception as e:
+ error_msg = "Error scaling application {} in kdu instance {}: {}".format(
+ resource_name, kdu_instance, e
+ )
+ self.log.error(error_msg)
+ raise K8sException(message=error_msg)
return True
+ async def get_scale_count(
+ self,
+ resource_name: str,
+ kdu_instance: str,
+ **kwargs,
+ ) -> int:
+ """Get an application scale count
+
+ :param: resource_name str: Resource name (Application name)
+ :param: kdu_instance str: KDU instance name
+ :param kwargs: Additional parameters
+ vca_id (str): VCA ID
+ :return: Return application instance count
+ """
+ try:
+ libjuju = await self._get_libjuju(kwargs.get("vca_id"))
+ status = await libjuju.get_model_status(kdu_instance)
+ return len(status.applications[resource_name].units)
+ except Exception as e:
+ error_msg = "Error getting scale count from application {} in kdu instance {}: {}".format(
+ resource_name, kdu_instance, e
+ )
+ self.log.error(error_msg)
+ raise K8sException(message=error_msg)
+
async def instances_list(self, cluster_uuid: str) -> list:
"""
returns a list of deployed releases in a cluster
"""Deletion"""
- async def uninstall(self, cluster_uuid: str, kdu_instance: str) -> bool:
+ async def uninstall(
+ self,
+ cluster_uuid: str,
+ kdu_instance: str,
+ **kwargs,
+ ) -> bool:
"""Uninstall a KDU instance
:param cluster_uuid str: The UUID of the cluster
:param kdu_instance str: The unique name of the KDU instance
+ :param kwargs: Additional parameters
+ vca_id (str): VCA ID
:return: Returns True if successful, or raises an exception
"""
self.log.debug("[uninstall] Destroying model")
+ libjuju = await self._get_libjuju(kwargs.get("vca_id"))
- await self.libjuju.destroy_model(kdu_instance, total_timeout=3600)
+ await libjuju.destroy_model(kdu_instance, total_timeout=3600)
# self.log.debug("[uninstall] Model destroyed and disconnecting")
# await controller.disconnect()
timeout: float = 300,
params: dict = None,
db_dict: dict = None,
+ **kwargs,
) -> str:
"""Exec primitive (Juju action)
:param primitive_name: Name of action that will be executed
:param timeout: Timeout for action execution
:param params: Dictionary of all the parameters needed for the action
- :db_dict: Dictionary for any additional data
+ :param db_dict: Dictionary for any additional data
+ :param kwargs: Additional parameters
+ vca_id (str): VCA ID
:return: Returns the output of the action
"""
+ libjuju = await self._get_libjuju(kwargs.get("vca_id"))
if not params or "application-name" not in params:
raise K8sException(
"kdu_instance: {}".format(kdu_instance)
)
application_name = params["application-name"]
- actions = await self.libjuju.get_actions(application_name, kdu_instance)
+ actions = await libjuju.get_actions(application_name, kdu_instance)
if primitive_name not in actions:
raise K8sException("Primitive {} not found".format(primitive_name))
- output, status = await self.libjuju.execute_action(
+ output, status = await libjuju.execute_action(
application_name, kdu_instance, primitive_name, **params
)
"status is not completed: {} output: {}".format(status, output)
)
if self.on_update_db:
- await self.on_update_db(cluster_uuid, kdu_instance, filter=db_dict["filter"])
+ await self.on_update_db(
+ cluster_uuid, kdu_instance, filter=db_dict["filter"]
+ )
return output
cluster_uuid: str,
kdu_instance: str,
complete_status: bool = False,
- yaml_format: bool = False
+ yaml_format: bool = False,
+ **kwargs,
) -> dict:
"""Get the status of the KDU
:param kdu_instance str: The unique id of the KDU instance
:param complete_status: To get the complete_status of the KDU
:param yaml_format: To get the status in proper format for NSR record
+ :param: kwargs: Additional parameters
+ vca_id (str): VCA ID
:return: Returns a dictionary containing namespace, state, resources,
and deployment_time and returns complete_status if complete_status is True
"""
+ libjuju = await self._get_libjuju(kwargs.get("vca_id"))
status = {}
- model_status = await self.libjuju.get_model_status(kdu_instance)
+ model_status = await libjuju.get_model_status(kdu_instance)
if not complete_status:
for name in model_status.applications:
return status
- async def update_vca_status(self, vcastatus: dict, kdu_instance: str):
+ async def update_vca_status(self, vcastatus: dict, kdu_instance: str, **kwargs):
"""
Add all configs, actions, executed actions of all applications in a model to vcastatus dict
:param vcastatus dict: dict containing vcastatus
:param kdu_instance str: The unique id of the KDU instance
+ :param: kwargs: Additional parameters
+ vca_id (str): VCA ID
:return: None
"""
+ libjuju = await self._get_libjuju(kwargs.get("vca_id"))
try:
for model_name in vcastatus:
# Adding executed actions
- vcastatus[model_name]["executedActions"] = \
- await self.libjuju.get_executed_actions(kdu_instance)
+ vcastatus[model_name][
+ "executedActions"
+ ] = await libjuju.get_executed_actions(kdu_instance)
for application in vcastatus[model_name]["applications"]:
# Adding application actions
- vcastatus[model_name]["applications"][application]["actions"] = \
- await self.libjuju.get_actions(application, kdu_instance)
+ vcastatus[model_name]["applications"][application][
+ "actions"
+ ] = await libjuju.get_actions(application, kdu_instance)
# Adding application configs
- vcastatus[model_name]["applications"][application]["configs"] = \
- await self.libjuju.get_application_configs(kdu_instance, application)
+ vcastatus[model_name]["applications"][application][
+ "configs"
+ ] = await libjuju.get_application_configs(kdu_instance, application)
except Exception as e:
self.log.debug("Error in updating vca status: {}".format(str(e)))
else:
kdu_instance = db_dict["filter"]["_id"]
return kdu_instance
+
+ async def _get_libjuju(self, vca_id: str = None) -> Libjuju:
+ """
+ Get libjuju object
+
+ :param: vca_id: VCA ID
+ If None, get a libjuju object with a Connection to the default VCA
+ Else, geta libjuju object with a Connection to the specified VCA
+ """
+ if not vca_id:
+ while self.loading_libjuju.locked():
+ await asyncio.sleep(0.1)
+ if not self.libjuju:
+ async with self.loading_libjuju:
+ vca_connection = await get_connection(self._store)
+ self.libjuju = Libjuju(vca_connection, loop=self.loop, log=self.log)
+ return self.libjuju
+ else:
+ vca_connection = await get_connection(self._store, vca_id)
+ return Libjuju(
+ vca_connection,
+ loop=self.loop,
+ log=self.log,
+ n2vc=self,
+ )