from juju.model import Model
from juju.machine import Machine
from juju.application import Application
-from juju.client._definitions import FullStatus, QueryApplicationOffersResults
+from juju.client._definitions import (
+ FullStatus,
+ QueryApplicationOffersResults,
+ Cloud,
+ CloudCredential,
+)
from n2vc.juju_watcher import JujuModelWatcher
from n2vc.provisioner import AsyncSSHProvisioner
from n2vc.n2vc_conn import N2VCConnector
"""
if not model_names:
raise Exception(
- "model_names must be a non-empty array. Given value: {}".format(model_names)
+ "model_names must be a non-empty array. Given value: {}".format(
+ model_names
+ )
)
non_existing_models = []
models = await self.list_models()
if await u.is_leader_from_status():
unit = u
if unit is None:
- raise JujuLeaderUnitNotFound("Cannot execute action: leader unit not found")
+ raise JujuLeaderUnitNotFound(
+ "Cannot execute action: leader unit not found"
+ )
actions = await application.get_actions()
await self.disconnect_model(model)
await self.disconnect_controller(controller)
+ async def get_metrics(self, model_name: str, application_name: str) -> dict:
+ """Get the metrics collected by the VCA.
+
+ :param model_name The name or unique id of the network service
+ :param application_name The name of the application
+ """
+ if not model_name or not application_name:
+ raise Exception("model_name and application_name must be non-empty strings")
+ metrics = {}
+ controller = await self.get_controller()
+ model = await self.get_model(controller, model_name)
+ try:
+ application = self._get_application(model, application_name)
+ if application is not None:
+ metrics = await application.get_metrics()
+ finally:
+ self.disconnect_model(model)
+ self.disconnect_controller(controller)
+ return metrics
+
async def add_relation(
- self,
- model_name: str,
- endpoint_1: str,
- endpoint_2: str,
+ self, model_name: str, endpoint_1: str, endpoint_2: str,
):
"""Add relation
# wait for machine removal
machines = await model.get_machines()
while machine_id in machines and time.time() < end:
- self.log.debug(
- "Waiting for machine {} is destroyed".format(machine_id)
- )
+ self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
await asyncio.sleep(0.5)
machines = await model.get_machines()
self.log.debug("Machine destroyed: {}".format(machine_id))
return await controller.list_offers(model_name)
finally:
await self.disconnect_controller(controller)
+
+ async def add_k8s(self, name: str, auth_data: dict, storage_class: str):
+ """
+ Add a Kubernetes cloud to the controller
+
+ Similar to the `juju add-k8s` command in the CLI
+
+ :param: name: Name for the K8s cloud
+ :param: auth_data: Dictionary with needed credentials. Format:
+ {
+ "server": "192.168.0.21:16443",
+ "cacert": "-----BEGIN CERTIFI...",
+ "token": "clhkRExRem5Xd1dCdnFEVXdvRGt...",
+
+ }
+ :param: storage_class: Storage Class to use in the cloud
+ """
+
+ required_auth_data_keys = ["server", "cacert", "token"]
+ missing_keys = []
+ for k in required_auth_data_keys:
+ if k not in auth_data:
+ missing_keys.append(k)
+ if missing_keys:
+ raise Exception(
+ "missing keys in auth_data: {}".format(",".join(missing_keys))
+ )
+ if not storage_class:
+ raise Exception("storage_class must be a non-empty string")
+ if not name:
+ raise Exception("name must be a non-empty string")
+
+ endpoint = auth_data["server"]
+ cacert = auth_data["cacert"]
+ token = auth_data["token"]
+ region_name = "{}-region".format(name)
+
+ cloud = client.Cloud(
+ auth_types=["certificate"],
+ ca_certificates=[cacert],
+ endpoint=endpoint,
+ config={
+ "operator-storage": storage_class,
+ "workload-storage": storage_class,
+ },
+ regions=[client.CloudRegion(endpoint=endpoint, name=region_name)],
+ type_="kubernetes",
+ )
+
+ cred = client.CloudCredential(
+ auth_type="certificate",
+ attrs={"ClientCertificateData": cacert, "Token": token},
+ )
+ return await self.add_cloud(name, cloud, cred)
+
+ async def add_cloud(
+ self, name: str, cloud: Cloud, credential: CloudCredential = None
+ ) -> Cloud:
+ """
+ Add cloud to the controller
+
+ :param: name: Name of the cloud to be added
+ :param: cloud: Cloud object
+ :param: credential: CloudCredentials object for the cloud
+ """
+ controller = await self.get_controller()
+ try:
+ _ = await controller.add_cloud(name, cloud)
+ if credential:
+ await controller.add_credential(name, credential=credential, cloud=name)
+ # Need to return the object returned by the controller.add_cloud() function
+ # I'm returning the original value now until this bug is fixed:
+ # https://github.com/juju/python-libjuju/issues/443
+ return cloud
+ finally:
+ await self.disconnect_controller(controller)
+
+ async def remove_cloud(self, name: str):
+ """
+ Remove cloud
+
+ :param: name: Name of the cloud to be removed
+ """
+ controller = await self.get_controller()
+ try:
+ await controller.remove_cloud(name)
+ finally:
+ await self.disconnect_controller(controller)