from juju.model import Model
from juju.machine import Machine
from juju.application import Application
-from juju.client._definitions import FullStatus
+from juju.client._definitions import (
+ FullStatus,
+ QueryApplicationOffersResults,
+ Cloud,
+ CloudCredential,
+)
from n2vc.juju_watcher import JujuModelWatcher
from n2vc.provisioner import AsyncSSHProvisioner
from n2vc.n2vc_conn import N2VCConnector
from n2vc.exceptions import (
JujuMachineNotFound,
JujuApplicationNotFound,
+ JujuLeaderUnitNotFound,
+ JujuActionNotFound,
JujuModelAlreadyExists,
JujuControllerFailedConnecting,
JujuApplicationExists,
self.log = log or logging.getLogger("Libjuju")
self.db = db
- self.endpoints = self._get_api_endpoints_db() or [endpoint]
+ db_endpoints = self._get_api_endpoints_db()
+ self.endpoints = db_endpoints or [endpoint]
+ if db_endpoints is None:
+ self._update_api_endpoints_db(self.endpoints)
self.api_proxy = api_proxy
self.username = username
self.password = password
if need_to_disconnect:
await self.disconnect_controller(controller)
+ async def models_exist(self, model_names: [str]) -> (bool, list):
+ """
+ Check if models exists
+
+ :param: model_names: List of strings with model names
+
+ :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
+ """
+ if not model_names:
+ raise Exception(
+ "model_names must be a non-empty array. Given value: {}".format(
+ model_names
+ )
+ )
+ non_existing_models = []
+ models = await self.list_models()
+ existing_models = list(set(models).intersection(model_names))
+ non_existing_models = list(set(model_names) - set(existing_models))
+
+ return (
+ len(non_existing_models) == 0,
+ non_existing_models,
+ )
+
async def get_model_status(self, model_name: str) -> FullStatus:
"""
Get model status
machine_id, model_name
)
)
- machine = model.machines[machine_id]
+ machine = machines[machine_id]
else:
raise JujuMachineNotFound("Machine {} not found".format(machine_id))
connection=connection,
nonce=params.nonce,
machine_id=machine_id,
- api=self.api_proxy,
+ proxy=self.api_proxy,
)
)
:param: application_name: Application name
:param: model_name: Model name
- :param: cloud_name: Cloud name
:param: action_name: Name of the action
:param: db_dict: Dictionary with data of the DB to write the updates
:param: progress_timeout: Maximum time between two updates in the model
if await u.is_leader_from_status():
unit = u
if unit is None:
- raise Exception("Cannot execute action: leader unit not found")
+ raise JujuLeaderUnitNotFound(
+ "Cannot execute action: leader unit not found"
+ )
actions = await application.get_actions()
if action_name not in actions:
- raise Exception(
+ raise JujuActionNotFound(
"Action {} not in available actions".format(action_name)
)
action_name, action.status, application_name, model_name
)
)
- except Exception as e:
- raise e
finally:
await self.disconnect_model(model)
await self.disconnect_controller(controller)
await self.disconnect_controller(controller)
async def add_relation(
- self,
- model_name: str,
- application_name_1: str,
- application_name_2: str,
- relation_1: str,
- relation_2: str,
+ self, model_name: str, endpoint_1: str, endpoint_2: str,
):
"""Add relation
- :param: model_name: Model name
- :param: application_name_1 First application name
- :param: application_name_2: Second application name
- :param: relation_1: First relation name
- :param: relation_2: Second relation name
+ :param: model_name: Model name
+ :param: endpoint_1 First endpoint name
+ ("app:endpoint" format or directly the saas name)
+ :param: endpoint_2: Second endpoint name (^ same format)
"""
- self.log.debug("Adding relation: {} -> {}".format(relation_1, relation_2))
+ self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))
# Get controller
controller = await self.get_controller()
# Get model
model = await self.get_model(controller, model_name)
- # Build relation strings
- r1 = "{}:{}".format(application_name_1, relation_1)
- r2 = "{}:{}".format(application_name_2, relation_2)
-
# Add relation
try:
- await model.add_relation(relation1=r1, relation2=r2)
+ await model.add_relation(endpoint_1, endpoint_2)
except JujuAPIError as e:
if "not found" in e.message:
self.log.warning("Relation not found: {}".format(e.message))
await self.disconnect_model(model)
await self.disconnect_controller(controller)
+ async def consume(
+ self, offer_url: str, model_name: str,
+ ):
+ """
+ Adds a remote offer to the model. Relations can be created later using "juju relate".
+
+ :param: offer_url: Offer Url
+ :param: model_name: Model name
+
+ :raises ParseError if there's a problem parsing the offer_url
+ :raises JujuError if remote offer includes and endpoint
+ :raises JujuAPIError if the operation is not successful
+ """
+ controller = await self.get_controller()
+ model = await controller.get_model(model_name)
+
+ try:
+ await model.consume(offer_url)
+ finally:
+ await self.disconnect_model(model)
+ await self.disconnect_controller(controller)
+
async def destroy_model(self, model_name: str, total_timeout: float):
"""
Destroy model
self.log.debug("Destroying model {}".format(model_name))
uuid = model.info.uuid
- # Destroy applications
- for application_name in model.applications:
- try:
- await self.destroy_application(
- model, application_name=application_name,
- )
- except Exception as e:
- self.log.error(
- "Error destroying application {} in model {}: {}".format(
- application_name, model_name, e
- )
- )
-
# Destroy machines
machines = await model.get_machines()
for machine_id in machines:
"""
machines = await model.get_machines()
if machine_id in machines:
- machine = model.machines[machine_id]
- # TODO: change this by machine.is_manual when this is upstreamed:
- # https://github.com/juju/python-libjuju/pull/396
- if "instance-id" in machine.safe_data and machine.safe_data[
- "instance-id"
- ].startswith("manual:"):
- await machine.destroy(force=True)
-
- # max timeout
- end = time.time() + total_timeout
-
- # wait for machine removal
+ machine = machines[machine_id]
+ await machine.destroy(force=True)
+ # max timeout
+ end = time.time() + total_timeout
+
+ # wait for machine removal
+ machines = await model.get_machines()
+ while machine_id in machines and time.time() < end:
+ self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
+ await asyncio.sleep(0.5)
machines = await model.get_machines()
- while machine_id in machines and time.time() < end:
- self.log.debug(
- "Waiting for machine {} is destroyed".format(machine_id)
- )
- await asyncio.sleep(0.5)
- machines = await model.get_machines()
- self.log.debug("Machine destroyed: {}".format(machine_id))
+ self.log.debug("Machine destroyed: {}".format(machine_id))
else:
self.log.debug("Machine not found: {}".format(machine_id))
finally:
await self.disconnect_controller(controller)
await asyncio.sleep(interval)
+
+ async def list_models(self, contains: str = None) -> [str]:
+ """List models with certain names
+
+ :param: contains: String that is contained in model name
+
+ :retur: [models] Returns list of model names
+ """
+
+ controller = await self.get_controller()
+ try:
+ models = await controller.list_models()
+ if contains:
+ models = [model for model in models if contains in model]
+ return models
+ finally:
+ await self.disconnect_controller(controller)
+
+ async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
+ """List models with certain names
+
+ :param: model_name: Model name
+
+ :return: Returns list of offers
+ """
+
+ controller = await self.get_controller()
+ try:
+ return await controller.list_offers(model_name)
+ finally:
+ await self.disconnect_controller(controller)
+
+ async def add_k8s(self, name: str, auth_data: dict, storage_class: str):
+ """
+ Add a Kubernetes cloud to the controller
+
+ Similar to the `juju add-k8s` command in the CLI
+
+ :param: name: Name for the K8s cloud
+ :param: auth_data: Dictionary with needed credentials. Format:
+ {
+ "server": "192.168.0.21:16443",
+ "cacert": "-----BEGIN CERTIFI...",
+ "token": "clhkRExRem5Xd1dCdnFEVXdvRGt...",
+
+ }
+ :param: storage_class: Storage Class to use in the cloud
+ """
+
+ required_auth_data_keys = ["server", "cacert", "token"]
+ missing_keys = []
+ for k in required_auth_data_keys:
+ if k not in auth_data:
+ missing_keys.append(k)
+ if missing_keys:
+ raise Exception(
+ "missing keys in auth_data: {}".format(",".join(missing_keys))
+ )
+ if not storage_class:
+ raise Exception("storage_class must be a non-empty string")
+ if not name:
+ raise Exception("name must be a non-empty string")
+
+ endpoint = auth_data["server"]
+ cacert = auth_data["cacert"]
+ token = auth_data["token"]
+ region_name = "{}-region".format(name)
+
+ cloud = client.Cloud(
+ auth_types=["certificate"],
+ ca_certificates=[cacert],
+ endpoint=endpoint,
+ config={
+ "operator-storage": storage_class,
+ "workload-storage": storage_class,
+ },
+ regions=[client.CloudRegion(endpoint=endpoint, name=region_name)],
+ type_="kubernetes",
+ )
+
+ cred = client.CloudCredential(
+ auth_type="certificate",
+ attrs={"ClientCertificateData": cacert, "Token": token},
+ )
+ return await self.add_cloud(name, cloud, cred)
+
+ async def add_cloud(
+ self, name: str, cloud: Cloud, credential: CloudCredential = None
+ ) -> Cloud:
+ """
+ Add cloud to the controller
+
+ :param: name: Name of the cloud to be added
+ :param: cloud: Cloud object
+ :param: credential: CloudCredentials object for the cloud
+ """
+ controller = await self.get_controller()
+ try:
+ _ = await controller.add_cloud(name, cloud)
+ if credential:
+ await controller.add_credential(name, credential=credential, cloud=name)
+ # Need to return the object returned by the controller.add_cloud() function
+ # I'm returning the original value now until this bug is fixed:
+ # https://github.com/juju/python-libjuju/issues/443
+ return cloud
+ finally:
+ await self.disconnect_controller(controller)
+
+ async def remove_cloud(self, name: str):
+ """
+ Remove cloud
+
+ :param: name: Name of the cloud to be removed
+ """
+ controller = await self.get_controller()
+ try:
+ await controller.remove_cloud(name)
+ finally:
+ await self.disconnect_controller(controller)