# blob: 9945c91ee3bf55382660b02fba9495c309720ab2 [file] [log] [blame]
# Copyright 2020 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import logging
from juju.controller import Controller
from juju.client.connector import NoConnectionException
from juju.client import client
import time
from juju.errors import JujuAPIError
from juju.model import Model
from juju.machine import Machine
from juju.application import Application
from juju.client._definitions import FullStatus
from n2vc.juju_watcher import JujuModelWatcher
from n2vc.provisioner import AsyncSSHProvisioner
from n2vc.n2vc_conn import N2VCConnector
from n2vc.exceptions import (
JujuMachineNotFound,
JujuApplicationNotFound,
JujuModelAlreadyExists,
JujuControllerFailedConnecting,
JujuApplicationExists,
)
class Libjuju:
    """
    Asynchronous helper around python-libjuju for a single Juju controller.

    The controller connection is opened in the constructor and re-established
    on demand. Most helpers open a model connection for the duration of the
    call and close it again before returning.
    """

    def __init__(
        self,
        endpoint: str,
        api_proxy: str,
        username: str,
        password: str,
        cacert: str,
        loop: asyncio.AbstractEventLoop = None,
        log: logging.Logger = None,
        db: dict = None,
        n2vc: N2VCConnector = None,
        apt_mirror: str = None,
        enable_os_upgrade: bool = True,
    ):
        """
        Constructor

        The constructor blocks until the controller connection is established,
        because it runs connect() to completion on the given loop.

        :param: endpoint:           Endpoint of the juju controller (host:port)
        :param: api_proxy:          Endpoint of the juju controller - Reachable
                                    from the VNFs
        :param: username:           Juju username
        :param: password:           Juju password
        :param: cacert:             Juju CA Certificate
        :param: loop:               Asyncio loop (defaults to
                                    asyncio.get_event_loop())
        :param: log:                Logger (defaults to the "Libjuju" logger)
        :param: db:                 DB object
        :param: n2vc:               N2VC object
        :param: apt_mirror:         APT Mirror
        :param: enable_os_upgrade:  Enable OS Upgrade
        """
        self.endpoints = [endpoint]  # TODO: Store and get endpoints from DB
        self.api_proxy = api_proxy
        self.username = username
        self.password = password
        self.cacert = cacert
        self.loop = loop or asyncio.get_event_loop()
        self.log = log or logging.getLogger("Libjuju")
        self.db = db
        self.n2vc = n2vc

        # Generate config for models
        self.model_config = {}
        if apt_mirror:
            self.model_config["apt-mirror"] = apt_mirror
        self.model_config["enable-os-refresh-update"] = enable_os_upgrade
        self.model_config["enable-os-upgrade"] = enable_os_upgrade

        self.reconnecting = self._create_lock()
        self.creating_model = self._create_lock()

        # Names of the models created through this instance (in-memory only)
        self.models = set()
        self.controller = Controller(loop=self.loop)

        self.loop.run_until_complete(self.connect())

    def _create_lock(self) -> asyncio.Lock:
        """
        Create an asyncio lock, bound to self.loop where that is supported

        :return: asyncio.Lock
        """
        try:
            return asyncio.Lock(loop=self.loop)
        except TypeError:
            # Python >= 3.10 removed the ``loop`` argument; the lock binds to
            # the running loop on first use instead.
            return asyncio.Lock()

    async def connect(self):
        """Connect to the controller"""
        self.log.debug("Connecting to controller")
        await self.controller.connect(
            endpoint=self.endpoints,
            username=self.username,
            password=self.password,
            cacert=self.cacert,
        )
        e = self.controller.connection().endpoint
        self.log.info("Connected to controller: {}".format(e))

    async def disconnect(self):
        """Disconnect from controller"""
        self.log.debug("Disconnecting from controller")
        await self.controller.disconnect()
        self.log.info("Disconnected from controller")

    def controller_connected(self) -> bool:
        """Check if the controller connection is open

        :return: bool: True if connected, False if not connected
        """
        try:
            return self.controller.connection().is_open
        except NoConnectionException:
            self.log.warning("VCA not connected")
            return False

    async def disconnect_model(self, model: Model):
        """
        Disconnect model

        Best effort: errors are logged and not propagated, so this is safe to
        call from ``finally`` blocks without masking the original exception.

        :param: model: Model that will be disconnected
        """
        try:
            await model.disconnect()
        except asyncio.CancelledError:
            # Never swallow cancellation (it is an Exception on Python < 3.8)
            raise
        except Exception as e:
            self.log.debug("Error disconnecting model: {}".format(e))

    async def _reconnect(
        self,
        retry: bool = False,
        timeout: int = 5,
        time_between_retries: int = 3,
        maximum_retries: int = 0,
    ):
        """
        Reconnect to the controller

        If another coroutine is already reconnecting, this returns immediately
        without waiting for that attempt to finish.

        :param: retry:                  Set it to True to retry if the connection
                                        fails
        :param: timeout:                Time in seconds allowed for each
                                        connection attempt
        :param: time_between_retries:   Time in seconds between retries
        :param: maximum_retries         Maximum retries.
                                        If not set, it will retry forever

        :raises: JujuControllerFailedConnecting if cannot connect to the
                 controller
        """
        if self.reconnecting.locked():
            # Return if another function is trying to reconnect
            return

        async with self.reconnecting:
            attempt = 0
            while True:
                try:
                    await asyncio.wait_for(self.connect(), timeout=timeout)
                    return
                except asyncio.CancelledError:
                    # Never swallow cancellation (an Exception on Python < 3.8)
                    raise
                except asyncio.TimeoutError as e:
                    last_error = e
                    self.log.error("Error reconnecting to controller: Timeout")
                except Exception as e:
                    last_error = e
                    self.log.error("Error reconnecting to controller: {}".format(e))

                attempt += 1
                # With the default maximum_retries=0 this never matches, which
                # gives the documented "retry forever" behaviour.
                maximum_retries_reached = attempt == maximum_retries

                if not retry or maximum_retries_reached:
                    raise JujuControllerFailedConnecting(
                        "Controller is not connected"
                    ) from last_error
                await asyncio.sleep(time_between_retries)

    async def add_model(self, model_name: str, cloud_name: str):
        """
        Create model

        :param: model_name: Model name
        :param: cloud_name: Cloud name (also used as the credential name)

        :return: None when the model is created by this call. If another worker
                 of this instance created it while this call was waiting, the
                 connected Model is returned and the caller must disconnect it.

        :raises: JujuModelAlreadyExists if the controller already has the model
        """
        # Reconnect to the controller if not connected
        if not self.controller_connected():
            await self._reconnect()

        # Raise exception if model already exists
        if await self.model_exists(model_name):
            raise JujuModelAlreadyExists(
                "Model {} already exists.".format(model_name)
            )

        # Block until other workers have finished model creation
        while self.creating_model.locked():
            await asyncio.sleep(0.1)

        # If the model exists, return it from the controller
        if model_name in self.models:
            return await self.get_model(model_name)

        # Create the model
        self.log.debug("Creating model {}".format(model_name))
        async with self.creating_model:
            model = await self.controller.add_model(
                model_name,
                config=self.model_config,
                cloud_name=cloud_name,
                credential_name=cloud_name,
            )
            await self.disconnect_model(model)
            self.models.add(model_name)

    async def get_model(self, model_name: str) -> Model:
        """
        Get model from controller

        The returned model is connected; the caller must disconnect it
        (see disconnect_model).

        :param: model_name: Model name

        :return: Model: The Juju model object
        """
        # Check if controller is connected
        if not self.controller_connected():
            await self._reconnect()
        return await self.controller.get_model(model_name)

    async def model_exists(self, model_name: str) -> bool:
        """
        Check if model exists

        :param: model_name: Model name

        :return bool
        """
        # Check if controller is connected
        if not self.controller_connected():
            await self._reconnect()

        return model_name in await self.controller.list_models()

    async def get_model_status(self, model_name: str) -> FullStatus:
        """
        Get model status

        :param: model_name: Model name

        :return: Full status object
        """
        model = await self.get_model(model_name)
        try:
            return await model.get_status()
        finally:
            # Release the model connection even if the status request fails
            await self.disconnect_model(model)

    async def create_machine(
        self,
        model_name: str,
        machine_id: str = None,
        db_dict: dict = None,
        progress_timeout: float = None,
        total_timeout: float = None,
        series: str = "xenial",
    ) -> (Machine, bool):
        """
        Create machine

        When machine_id is given, the existing machine is looked up and returned;
        otherwise a new machine is added and waited for.

        :param: model_name:         Model name
        :param: machine_id:         Machine id
        :param: db_dict:            Dictionary with data of the DB to write the
                                    updates
        :param: progress_timeout:   Maximum time between two updates in the model
        :param: total_timeout:      Timeout for the entity to be active
        :param: series:             Series used when a new machine is created

        :return: (juju.machine.Machine, bool):  Machine object and a boolean
                                                saying if the machine is new or
                                                it already existed

        :raises: JujuMachineNotFound if machine_id is given but not in the model
        """
        new = False
        machine = None

        self.log.debug(
            "Creating machine (id={}) in model: {}".format(machine_id, model_name)
        )

        # Get model
        model = await self.get_model(model_name)
        try:
            if machine_id is not None:
                self.log.debug(
                    "Searching machine (id={}) in model {}".format(
                        machine_id, model_name
                    )
                )

                # Get machines from model and get the machine with machine_id
                # if exists
                machines = await model.get_machines()
                if machine_id in machines:
                    self.log.debug(
                        "Machine (id={}) found in model {}".format(
                            machine_id, model_name
                        )
                    )
                    machine = model.machines[machine_id]
                else:
                    raise JujuMachineNotFound(
                        "Machine {} not found".format(machine_id)
                    )

            if machine is None:
                self.log.debug(
                    "Creating a new machine in model {}".format(model_name)
                )

                # Create machine
                machine = await model.add_machine(
                    spec=None, constraints=None, disks=None, series=series
                )
                new = True

                # Wait until the machine is ready
                await JujuModelWatcher.wait_for(
                    model=model,
                    entity=machine,
                    progress_timeout=progress_timeout,
                    total_timeout=total_timeout,
                    db_dict=db_dict,
                    n2vc=self.n2vc,
                )
        finally:
            await self.disconnect_model(model)

        self.log.debug("Machine ready at {}".format(machine.dns_name))
        return machine, new

    async def provision_machine(
        self,
        model_name: str,
        hostname: str,
        username: str,
        private_key_path: str,
        db_dict: dict = None,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> str:
        """
        Manually provisioning of a machine

        :param: model_name:         Model name
        :param: hostname:           IP to access the machine
        :param: username:           Username to login to the machine
        :param: private_key_path:   Local path for the private key
        :param: db_dict:            Dictionary with data of the DB to write the
                                    updates
        :param: progress_timeout:   Maximum time between two updates in the model
        :param: total_timeout:      Timeout for the entity to be active

        :return: (str): Machine id

        :raises: ValueError if the controller rejects the machine
        :raises: JujuMachineNotFound if the machine does not show up in the model
        """
        self.log.debug(
            "Provisioning machine. model: {}, hostname: {}, username: {}".format(
                model_name, hostname, username
            )
        )

        # Get model
        model = await self.get_model(model_name)

        try:
            # Get provisioner
            provisioner = AsyncSSHProvisioner(
                host=hostname,
                user=username,
                private_key_path=private_key_path,
                log=self.log,
            )

            # Provision machine
            params = await provisioner.provision_machine()

            params.jobs = ["JobHostUnits"]

            self.log.debug("Adding machine to model")
            connection = model.connection()
            client_facade = client.ClientFacade.from_connection(connection)

            results = await client_facade.AddMachines(params=[params])
            error = results.machines[0].error

            if error:
                msg = "Error adding machine: {}".format(error.message)
                self.log.error(msg=msg)
                raise ValueError(msg)

            machine_id = results.machines[0].machine

            self.log.debug("Installing Juju agent into machine {}".format(machine_id))
            # The agent installation runs in the background while the model is
            # polled below; the callback makes its failure visible in the log.
            install_task = asyncio.ensure_future(
                provisioner.install_agent(
                    connection=connection,
                    nonce=params.nonce,
                    machine_id=machine_id,
                    api=self.api_proxy,
                )
            )
            install_task.add_done_callback(self._log_install_agent_result)

            # Poll the model (up to 10 times, 2 seconds apart) for the machine
            machine = None
            for _ in range(10):
                machine_list = await model.get_machines()
                if machine_id in machine_list:
                    self.log.debug("Machine {} found in model!".format(machine_id))
                    machine = model.machines.get(machine_id)
                    break
                await asyncio.sleep(2)

            if machine is None:
                msg = "Machine {} not found in model".format(machine_id)
                self.log.error(msg=msg)
                raise JujuMachineNotFound(msg)

            await JujuModelWatcher.wait_for(
                model=model,
                entity=machine,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout,
                db_dict=db_dict,
                n2vc=self.n2vc,
            )
        finally:
            await self.disconnect_model(model)

        self.log.debug("Machine provisioned {}".format(machine_id))

        return machine_id

    def _log_install_agent_result(self, task: asyncio.Future):
        """
        Log the outcome of the background Juju agent installation task

        :param: task: Finished task wrapping provisioner.install_agent()
        """
        if task.cancelled():
            self.log.warning("Juju agent installation was cancelled")
            return
        error = task.exception()
        if error is not None:
            self.log.error("Error installing Juju agent: {}".format(error))

    async def deploy_charm(
        self,
        application_name: str,
        path: str,
        model_name: str,
        machine_id: str,
        db_dict: dict = None,
        progress_timeout: float = None,
        total_timeout: float = None,
        config: dict = None,
        series: str = None,
    ):
        """Deploy charm

        :param: application_name:   Application name
        :param: path:               Local path to the charm
        :param: model_name:         Model name
        :param: machine_id          ID of the machine
        :param: db_dict:            Dictionary with data of the DB to write the
                                    updates
        :param: progress_timeout:   Maximum time between two updates in the model
        :param: total_timeout:      Timeout for the entity to be active
        :param: config:             Config for the charm
        :param: series:             Series of the charm (overridden by the
                                    machine series when machine_id is given)

        :return: (juju.application.Application): Juju application

        :raises: JujuApplicationExists if the application is already deployed
        :raises: JujuMachineNotFound if machine_id is not in the model
        """
        # Get model
        model = await self.get_model(model_name)

        try:
            if application_name in model.applications:
                raise JujuApplicationExists(
                    "Application {} exists".format(application_name)
                )

            self.log.debug(
                "Deploying charm {} to machine {} in model ~{}".format(
                    application_name, machine_id, model_name
                )
            )
            self.log.debug("charm: {}".format(path))

            if machine_id is not None:
                if machine_id not in model.machines:
                    msg = "Machine {} not found in model".format(machine_id)
                    self.log.error(msg=msg)
                    raise JujuMachineNotFound(msg)
                # Deploy with the series of the target machine
                machine = model.machines[machine_id]
                series = machine.series

            application = await model.deploy(
                entity_url=path,
                application_name=application_name,
                channel="stable",
                num_units=1,
                series=series,
                to=machine_id,
                config=config,
            )

            await JujuModelWatcher.wait_for(
                model=model,
                entity=application,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout,
                db_dict=db_dict,
                n2vc=self.n2vc,
            )
        finally:
            await self.disconnect_model(model)

        self.log.debug("application deployed")

        return application

    async def _get_application(
        self, model: Model, application_name: str
    ) -> Application:
        """Get application

        :param: model:              Model object
        :param: application_name:   Application name

        :return: juju.application.Application (or None if it doesn't exist)
        """
        if model.applications and application_name in model.applications:
            return model.applications[application_name]
        return None

    async def execute_action(
        self,
        application_name: str,
        model_name: str,
        action_name: str,
        db_dict: dict = None,
        progress_timeout: float = None,
        total_timeout: float = None,
        **kwargs
    ):
        """Execute action

        The action is run on the leader unit of the application.

        :param: application_name:   Application name
        :param: model_name:         Model name
        :param: action_name:        Name of the action
        :param: db_dict:            Dictionary with data of the DB to write the
                                    updates
        :param: progress_timeout:   Maximum time between two updates in the model
        :param: total_timeout:      Timeout for the entity to be active
        :param: kwargs:             Parameters passed to the action

        :return: (str, str): (output and status)

        :raises: JujuApplicationNotFound if the application is not in the model
        :raises: Exception if there is no leader unit or the action is unknown
        """
        # Get model and observer
        model = await self.get_model(model_name)

        try:
            # Get application
            application = await self._get_application(
                model, application_name=application_name,
            )
            if application is None:
                raise JujuApplicationNotFound("Cannot execute action")

            # Get unit
            unit = None
            for u in application.units:
                if await u.is_leader_from_status():
                    unit = u
                    # Leader found; no need to query the remaining units
                    break
            if unit is None:
                raise Exception("Cannot execute action: leader unit not found")

            actions = await application.get_actions()

            if action_name not in actions:
                raise Exception(
                    "Action {} not in available actions".format(action_name)
                )

            self.log.debug(
                "Executing action {} using params {}".format(action_name, kwargs)
            )
            action = await unit.run_action(action_name, **kwargs)

            # Register action with observer and wait for it to finish
            await JujuModelWatcher.wait_for(
                model=model,
                entity=action,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout,
                db_dict=db_dict,
                n2vc=self.n2vc,
            )
            output = await model.get_action_output(action_uuid=action.entity_id)
            status = await model.get_action_status(uuid_or_prefix=action.entity_id)
            # Report "failed" when the model has no status entry for the action
            status = (
                status[action.entity_id] if action.entity_id in status else "failed"
            )

            self.log.debug("action completed with status: {}".format(action.status))
        finally:
            await self.disconnect_model(model)

        return output, status

    async def get_actions(self, application_name: str, model_name: str) -> dict:
        """Get list of actions

        :param: application_name: Application name
        :param: model_name: Model name

        :return: Dict with this format
            {
                "action_name": "Description of the action",
                ...
            }

        :raises: JujuApplicationNotFound if the application is not in the model
        """
        # Get model
        model = await self.get_model(model_name)

        try:
            # Get application
            application = await self._get_application(
                model, application_name=application_name,
            )
            if application is None:
                raise JujuApplicationNotFound("Cannot get actions")

            # Get list of actions
            return await application.get_actions()
        finally:
            # Disconnect from model, also when the lookup fails
            await self.disconnect_model(model)

    async def add_relation(
        self,
        model_name: str,
        application_name_1: str,
        application_name_2: str,
        relation_1: str,
        relation_2: str,
    ):
        """Add relation

        "not found" and "already exists" API errors are logged as warnings and
        not raised.

        :param: model_name:             Model name
        :param: application_name_1      First application name
        :param: application_name_2:     Second application name
        :param: relation_1:             First relation name
        :param: relation_2:             Second relation name

        :raises: JujuAPIError for any other API error
        """
        # Get model
        model = await self.get_model(model_name)

        # Build relation strings
        r1 = "{}:{}".format(application_name_1, relation_1)
        r2 = "{}:{}".format(application_name_2, relation_2)

        # Add relation
        self.log.debug("Adding relation: {} -> {}".format(r1, r2))
        try:
            await model.add_relation(relation1=r1, relation2=r2)
        except JujuAPIError as e:
            if "not found" in e.message:
                self.log.warning("Relation not found: {}".format(e.message))
                return
            if "already exists" in e.message:
                self.log.warning("Relation already exists: {}".format(e.message))
                return
            # another exception, raise it
            raise
        finally:
            await self.disconnect_model(model)

    async def destroy_model(
        self, model_name: str, total_timeout: float,
    ):
        """
        Destroy model

        Applications and machines are destroyed first (errors are logged and
        ignored), then the model itself, and finally the controller is polled
        until the model disappears.

        :param: model_name:     Model name
        :param: total_timeout:  Timeout in seconds (None means 3600)

        :raises: Exception if the model is still listed after total_timeout
        """
        # Resolve the default before it is handed to destroy_machine()
        if total_timeout is None:
            total_timeout = 3600

        model = await self.get_model(model_name)
        try:
            uuid = model.info.uuid

            # Destroy applications
            for application_name in list(model.applications):
                try:
                    await self.destroy_application(
                        model, application_name=application_name,
                    )
                except asyncio.CancelledError:
                    raise
                except Exception as e:
                    self.log.error(
                        "Error destroying application {} in model {}: {}".format(
                            application_name, model_name, e
                        )
                    )

            # Destroy machines
            machines = await model.get_machines()
            for machine_id in machines:
                try:
                    await self.destroy_machine(
                        model, machine_id=machine_id, total_timeout=total_timeout,
                    )
                except asyncio.CancelledError:
                    raise
                except Exception as e:
                    self.log.error(
                        "Error destroying machine {} in model {}: {}".format(
                            machine_id, model_name, e
                        )
                    )
        finally:
            # Disconnect model
            await self.disconnect_model(model)

        # Destroy model
        # discard(): the model may not have been created through this instance
        self.models.discard(model_name)
        await self.controller.destroy_model(uuid)

        # Wait until model is destroyed
        self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
        last_exception = ""

        end = time.monotonic() + total_timeout
        while time.monotonic() < end:
            try:
                models = await self.controller.list_models()
                if model_name not in models:
                    self.log.debug(
                        "The model {} ({}) was destroyed".format(model_name, uuid)
                    )
                    return
            except asyncio.CancelledError:
                raise
            except Exception as e:
                last_exception = e
            await asyncio.sleep(5)
        raise Exception(
            "Timeout waiting for model {} to be destroyed {}".format(
                model_name, last_exception
            )
        )

    async def destroy_application(self, model: Model, application_name: str):
        """
        Destroy application

        :param: model:              Model object
        :param: application_name:   Application name
        """
        self.log.debug(
            "Destroying application {} in model {}".format(
                application_name, model.info.name
            )
        )
        application = model.applications.get(application_name)
        if application:
            await application.destroy()
        else:
            self.log.warning("Application not found: {}".format(application_name))

    async def destroy_machine(
        self, model: Model, machine_id: str, total_timeout: float = 3600
    ):
        """
        Destroy machine

        Only manually provisioned machines (instance-id starting with "manual:")
        are destroyed; other machines are left untouched.

        :param: model:          Model object
        :param: machine_id:     Machine id
        :param: total_timeout:  Timeout in seconds (None means 3600)
        """
        if total_timeout is None:
            total_timeout = 3600

        machines = await model.get_machines()
        if machine_id in machines:
            machine = model.machines[machine_id]
            # TODO: change this by machine.is_manual when this is upstreamed:
            # https://github.com/juju/python-libjuju/pull/396
            if "instance-id" in machine.safe_data and machine.safe_data[
                "instance-id"
            ].startswith("manual:"):
                await machine.destroy(force=True)

                # max timeout
                end = time.monotonic() + total_timeout

                # wait for machine removal
                machines = await model.get_machines()
                while machine_id in machines and time.monotonic() < end:
                    self.log.debug(
                        "Waiting for machine {} is destroyed".format(machine_id)
                    )
                    await asyncio.sleep(0.5)
                    machines = await model.get_machines()
                self.log.debug("Machine destroyed: {}".format(machine_id))
        else:
            self.log.debug("Machine not found: {}".format(machine_id))

    async def configure_application(
        self, model_name: str, application_name: str, config: dict = None
    ):
        """Configure application

        Does nothing when config is empty or None.

        :param: model_name:         Model name
        :param: application_name:   Application name
        :param: config:             Config to apply to the charm

        :raises: JujuApplicationNotFound if the application is not in the model
        """
        if not config:
            return

        model = await self.get_model(model_name)
        try:
            application = await self._get_application(
                model, application_name=application_name,
            )
            if application is None:
                raise JujuApplicationNotFound("Cannot configure application")
            await application.set_config(config)
        finally:
            # Release the model connection even if the update fails
            await self.disconnect_model(model)