import logging
import os
import re
-
+import time
+
+from juju.action import Action
+from juju.application import Application
+from juju.client import client
+from juju.controller import Controller
+from juju.errors import JujuAPIError
+from juju.machine import Machine
+from juju.model import Model
from n2vc.exceptions import (
N2VCBadArgumentsException,
N2VCException,
N2VCConnectionException,
N2VCExecutionException,
N2VCInvalidCertificate,
- # N2VCNotFound,
+ N2VCNotFound,
MethodNotImplemented,
JujuK8sProxycharmNotSupported,
)
+from n2vc.juju_observer import JujuModelObserver
from n2vc.n2vc_conn import N2VCConnector
from n2vc.n2vc_conn import obj_to_dict, obj_to_yaml
+from n2vc.provisioner import AsyncSSHProvisioner
from n2vc.libjuju import Libjuju
return N2VCJujuConnector._format_app_name(application_name)
async def _juju_create_machine(
    self,
    model_name: str,
    application_name: str,
    machine_id: str = None,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
) -> Machine:
    """Return a ready machine in the model, reusing ``machine_id`` when found.

    When ``machine_id`` is given and present in the model, that machine is
    registered with the model observer and awaited. Otherwise a new machine
    is added to the model, its execution-environment id is written to the
    database, and the machine is awaited.

    :param str model_name: name of the juju model
    :param str application_name: application name used to build the ee_id
    :param str machine_id: id of an existing machine to reuse (optional)
    :param dict db_dict: database reference passed to the observer and to
        ``_write_ee_id_db``
    :param float progress_timeout: forwarded to ``observer.wait_for_machine``
    :param float total_timeout: forwarded to ``observer.wait_for_machine``
    :returns Machine: the machine object obtained from the model
    """

    self.log.debug(
        "creating machine in model: {}, existing machine id: {}".format(
            model_name, machine_id
        )
    )

    # get juju model and observer (create model if needed)
    model = await self._juju_get_model(model_name=model_name)
    observer = self.juju_observers[model_name]

    # look for the requested machine among the ones already in the model
    machine = None
    if machine_id is not None:
        self.log.debug("Finding existing machine id {} in model".format(machine_id))
        known_ids = await model.get_machines()
        if machine_id in known_ids:
            self.log.debug(
                "Machine id {} found in model (reusing it)".format(machine_id)
            )
            machine = model.machines[machine_id]

    if machine is not None:
        # the machine exists but may still be pending: just wait for it
        self.log.debug("Reusing old machine pending")
        observer.register_machine(machine=machine, db_dict=db_dict)
        await observer.wait_for_machine(
            machine_id=machine.entity_id,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
        )
    else:
        # no machine to reuse: ask juju for a new one
        self.log.debug("Creating a new machine in juju...")
        machine = await model.add_machine(
            spec=None, constraints=None, disks=None, series="xenial"
        )
        observer.register_machine(machine=machine, db_dict=db_dict)

        # publish the execution environment id before waiting for the machine
        new_machine_id = str(machine.entity_id)
        ee_id = N2VCJujuConnector._build_ee_id(
            model_name=model_name,
            application_name=application_name,
            machine_id=new_machine_id,
        )
        self._write_ee_id_db(db_dict=db_dict, ee_id=ee_id)

        await observer.wait_for_machine(
            machine_id=new_machine_id,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
        )

    self.log.debug("Machine ready at " + str(machine.dns_name))
    return machine

+
async def _juju_provision_machine(
    self,
    model_name: str,
    hostname: str,
    username: str,
    private_key_path: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
) -> str:
    """Register an existing host as a machine of the model and wait for it.

    The host is prepared over SSH with ``AsyncSSHProvisioner``, added to the
    model through the client facade, and the juju agent installation is
    launched in the background. The method then polls the model until the
    machine shows up and waits for it through the model observer.

    :param str model_name: name of the juju model
    :param str hostname: host to provision
    :param str username: SSH user on the host
    :param str private_key_path: path of the SSH private key
    :param dict db_dict: database reference passed to the observer
    :param float progress_timeout: forwarded to ``observer.wait_for_machine``
    :param float total_timeout: forwarded to ``observer.wait_for_machine``
    :returns str: id of the machine as reported by ``AddMachines``
    :raises N2VCException: if ``api_proxy`` is not set or provisioning fails
    :raises ValueError: if ``AddMachines`` reports an error
    :raises Exception: if the machine does not appear in the model
    """

    if not self.api_proxy:
        msg = "Cannot provision machine: api_proxy is not defined"
        self.log.error(msg=msg)
        raise N2VCException(message=msg)

    self.log.debug(
        "provisioning machine. model: {}, hostname: {}, username: {}".format(
            model_name, hostname, username
        )
    )

    if not self._authenticated:
        await self._juju_login()

    # get juju model and observer
    model = await self._juju_get_model(model_name=model_name)
    observer = self.juju_observers[model_name]

    # TODO check if machine is already provisioned
    machine_list = await model.get_machines()

    provisioner = AsyncSSHProvisioner(
        host=hostname,
        user=username,
        private_key_path=private_key_path,
        log=self.log,
    )

    try:
        params = await provisioner.provision_machine()
    except asyncio.CancelledError:
        # never convert a task cancellation into an N2VCException
        raise
    except Exception as ex:
        msg = "Exception provisioning machine: {}".format(ex)
        self.log.error(msg)
        raise N2VCException(message=msg) from ex

    params.jobs = ["JobHostUnits"]

    connection = model.connection()

    # Submit the request.
    self.log.debug("Adding machine to model")
    client_facade = client.ClientFacade.from_connection(connection)
    results = await client_facade.AddMachines(params=[params])
    error = results.machines[0].error
    if error:
        msg = "Error adding machine: {}".format(error.message)
        self.log.error(msg=msg)
        raise ValueError(msg)

    machine_id = results.machines[0].machine

    # Need to run this after AddMachines has been called,
    # as we need the machine_id
    self.log.debug("Installing Juju agent into machine {}".format(machine_id))
    # Keep a reference for the rest of this call: the event loop only holds
    # weak references to tasks, so an unreferenced one may be collected
    # before it finishes. The task is deliberately not awaited.
    install_task = asyncio.ensure_future(  # noqa: F841
        provisioner.install_agent(
            connection=connection,
            nonce=params.nonce,
            machine_id=machine_id,
            proxy=self.api_proxy,
        )
    )

    # wait for machine in model (now, machine is not yet in model, so we must
    # wait for it)
    machine = None
    for _ in range(10):
        machine_list = await model.get_machines()
        if machine_id in machine_list:
            self.log.debug("Machine {} found in model!".format(machine_id))
            machine = model.machines.get(machine_id)
            break
        await asyncio.sleep(2)

    if machine is None:
        msg = "Machine {} not found in model".format(machine_id)
        self.log.error(msg=msg)
        raise Exception(msg)

    # register machine with observer
    observer.register_machine(machine=machine, db_dict=db_dict)

    # wait for machine creation
    self.log.debug("waiting for provision finishes... {}".format(machine_id))
    await observer.wait_for_machine(
        machine_id=machine_id,
        progress_timeout=progress_timeout,
        total_timeout=total_timeout,
    )

    self.log.debug("Machine provisioned {}".format(machine_id))

    return machine_id

+
async def _juju_deploy_charm(
    self,
    model_name: str,
    application_name: str,
    charm_path: str,
    machine_id: str,
    db_dict: dict,
    progress_timeout: float = None,
    total_timeout: float = None,
    config: dict = None,
) -> (Application, int):
    """Deploy a charm as an application, or wait for an existing deployment.

    If ``application_name`` is not yet in the model, the charm is deployed
    to ``machine_id`` using that machine's series. In both cases the
    application is registered with the model observer and awaited.

    :param str model_name: name of the juju model
    :param str application_name: name of the application in the model
    :param str charm_path: passed to ``model.deploy`` as ``entity_url``
    :param str machine_id: id of the machine that will host the unit
    :param dict db_dict: database reference passed to the observer
    :param float progress_timeout: forwarded to ``wait_for_application``
    :param float total_timeout: forwarded to ``wait_for_application``
    :param dict config: application configuration passed to ``model.deploy``
    :returns: tuple of the application and the value returned by
        ``observer.wait_for_application``
    """

    # get juju model and observer
    model = await self._juju_get_model(model_name=model_name)
    observer = self.juju_observers[model_name]

    # check if application already exists
    application = model.applications.get(application_name)
    already_deployed = application is not None

    if not already_deployed:
        # application does not exist, create it
        self.log.debug(
            "deploying application {} to machine {}, model {}".format(
                application_name, machine_id, model_name
            )
        )
        self.log.debug("charm: {}".format(charm_path))
        target_machine = model.machines[machine_id]
        application = await model.deploy(
            entity_url=charm_path,
            application_name=application_name,
            channel="stable",
            num_units=1,
            series=target_machine.series,
            to=machine_id,
            config=config,
        )

    # from here on, both paths register the application and wait for it
    observer.register_application(application=application, db_dict=db_dict)

    if already_deployed:
        # application already exists, but not finalised
        self.log.debug("application already exists, waiting for deployed...")
    else:
        self.log.debug(
            "waiting for application deployed... {}".format(application.entity_id)
        )

    retries = await observer.wait_for_application(
        application_id=application.entity_id,
        progress_timeout=progress_timeout,
        total_timeout=total_timeout,
    )
    self.log.debug("application deployed")

    return application, retries

+
async def _juju_execute_action(
    self,
    model_name: str,
    application_name: str,
    action_name: str,
    db_dict: dict,
    progress_timeout: float = None,
    total_timeout: float = None,
    **kwargs
) -> tuple:
    """Run an action on the leader unit of an application and wait for it.

    :param str model_name: name of the juju model
    :param str application_name: name of the application
    :param str action_name: name of the action to run
    :param dict db_dict: database reference passed to the observer
    :param float progress_timeout: forwarded to ``observer.wait_for_action``
    :param float total_timeout: forwarded to ``observer.wait_for_action``
    :param kwargs: parameters forwarded to ``unit.run_action``
    :returns tuple: ``(output, status)`` where ``output`` is the result of
        ``model.get_action_output`` and ``status`` is the entry for this
        action in ``model.get_action_status``, or ``"failed"`` when there is
        no such entry
    :raises N2VCExecutionException: if no unit reports itself as leader or
        the application does not expose ``action_name``
    """

    # get juju model and observer
    model = await self._juju_get_model(model_name=model_name)
    observer = self.juju_observers[model_name]

    application = await self._juju_get_application(
        model_name=model_name, application_name=application_name
    )

    # stop at the first leader: probing the remaining units is wasted work
    leader = None
    for unit in application.units:
        if await unit.is_leader_from_status():
            leader = unit
            break

    # the action list is only fetched when a leader exists (short-circuit)
    if leader is None or action_name not in await application.get_actions():
        raise N2VCExecutionException(
            message="Cannot execute action on charm", primitive_name=action_name
        )

    self.log.debug(
        'executing action "{}" using params: {}'.format(action_name, kwargs)
    )
    action = await leader.run_action(action_name, **kwargs)

    # register action with observer
    observer.register_action(action=action, db_dict=db_dict)

    await observer.wait_for_action(
        action_id=action.entity_id,
        progress_timeout=progress_timeout,
        total_timeout=total_timeout,
    )
    self.log.debug("action completed with status: {}".format(action.status))

    output = await model.get_action_output(action_uuid=action.entity_id)
    statuses = await model.get_action_status(uuid_or_prefix=action.entity_id)
    # an action missing from the status report is treated as failed
    status = statuses[action.entity_id] if action.entity_id in statuses else "failed"
    return output, status

+
async def _juju_configure_application(
    self,
    model_name: str,
    application_name: str,
    config: dict,
    db_dict: dict,
    progress_timeout: float = None,
    total_timeout: float = None,
    num_retries: int = 20,
    retry_timeout: float = 15.0,
):
    """Apply a configuration to an application and verify SSH credentials.

    The configuration is set, read back and compared key by key. If the
    application exposes a ``verify-ssh-credentials`` action, it is executed
    until it finishes with a status other than ``"failed"`` or the retries
    are exhausted.

    :param str model_name: name of the juju model
    :param str application_name: name of the application
    :param dict config: configuration keys and values to set
    :param dict db_dict: database reference forwarded to the action execution
    :param float progress_timeout: forwarded to ``_juju_execute_action``
    :param float total_timeout: forwarded to ``_juju_execute_action``
    :param int num_retries: maximum number of verification attempts
    :param float retry_timeout: seconds to sleep after a failed attempt
    :returns bool: True if the verification action succeeded; False if the
        action does not exist or never succeeded
    :raises N2VCException: if a key read back differs from the one set
    """

    # get the application
    application = await self._juju_get_application(
        model_name=model_name, application_name=application_name
    )

    self.log.debug(
        "configuring the application {} -> {}".format(application_name, config)
    )
    res = await application.set_config(config)
    self.log.debug(
        "application {} configured. res={}".format(application_name, res)
    )

    # Verify the config is set
    new_conf = await application.get_config()
    for key in config:
        value = new_conf[key]["value"]
        self.log.debug("    {} = {}".format(key, value))
        if config[key] != value:
            raise N2VCException(
                message="key {} is not configured correctly {} != {}".format(
                    key, config[key], new_conf[key]
                )
            )

    # check if 'verify-ssh-credentials' action exists
    actions = await application.get_actions()
    if "verify-ssh-credentials" not in actions:
        msg = (
            "Action verify-ssh-credentials does not exist in application {}"
        ).format(application_name)
        self.log.debug(msg=msg)
        return False

    # execute verify-credentials
    for _ in range(num_retries):
        try:
            self.log.debug("Executing action verify-ssh-credentials...")
            output, status = await self._juju_execute_action(
                model_name=model_name,
                application_name=application_name,
                action_name="verify-ssh-credentials",
                db_dict=db_dict,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout,
            )
        except asyncio.CancelledError:
            raise
        except Exception as e:
            self.log.debug(
                "Error executing verify-ssh-credentials: {}. Retrying...".format(e)
            )
        else:
            self.log.debug("Result: {}, output: {}".format(status, output))
            # a "failed" status means the credentials were NOT verified, so it
            # must not be reported as success
            if status != "failed":
                return True
            self.log.debug("Action verify-ssh-credentials failed. Retrying...")
        await asyncio.sleep(retry_timeout)

    self.log.error(
        "Error executing verify-ssh-credentials after {} retries. ".format(
            num_retries
        )
    )
    return False

+
async def _juju_get_application(self, model_name: str, application_name: str):
    """Return the application deployed in the model under the formatted name.

    :param str model_name: name of the juju model
    :param str application_name: application name; it is normalised with
        ``N2VCJujuConnector._format_app_name`` before the lookup
    :returns: the application object held by the model
    :raises N2VCException: if the model has no application with that name
    """

    model = await self._juju_get_model(model_name=model_name)

    application_name = N2VCJujuConnector._format_app_name(application_name)

    deployed = model.applications
    if not deployed or application_name not in deployed:
        raise N2VCException(
            message="Cannot get application {} from model {}".format(
                application_name, model_name
            )
        )
    return deployed[application_name]

+
async def _juju_get_model(self, model_name: str) -> Model:
    """Get a model object from the juju controller.

    If the model does not exist, it is created.

    :param str model_name: name of the model; it is normalised with
        ``N2VCJujuConnector._format_model_name`` and the normalised name is
        the key used in ``self.juju_models`` and ``self.juju_observers``
    :returns Model: model obtained from the juju controller
    :raises N2VCException: if the model cannot be listed, created or fetched
    """

    # format model name
    model_name = N2VCJujuConnector._format_model_name(model_name)

    # NOTE(review): an entry set to None by _juju_disconnect_model is
    # returned as-is here -- confirm callers expect that.
    if model_name in self.juju_models:
        return self.juju_models[model_name]

    if self._creating_model:
        self.log.debug("Another coroutine is creating a model. Wait...")
        while self._creating_model:
            # another coroutine is creating a model, wait
            await asyncio.sleep(0.1)
        # retry (perhaps another coroutine has created the model meanwhile)
        if model_name in self.juju_models:
            return self.juju_models[model_name]

    try:
        self._creating_model = True

        # get juju model names from juju
        model_list = await self.controller.list_models()
        if model_name not in model_list:
            self.log.info(
                "Model {} does not exist. Creating new model...".format(model_name)
            )
            config_dict = {"authorized-keys": self.public_key}
            if self.apt_mirror:
                config_dict["apt-mirror"] = self.apt_mirror
            if not self.enable_os_upgrade:
                config_dict["enable-os-refresh-update"] = False
                config_dict["enable-os-upgrade"] = False
            if self.cloud in self.BUILT_IN_CLOUDS:
                model = await self.controller.add_model(
                    model_name=model_name,
                    config=config_dict,
                    cloud_name=self.cloud,
                )
            else:
                model = await self.controller.add_model(
                    model_name=model_name,
                    config=config_dict,
                    cloud_name=self.cloud,
                    credential_name=self.cloud,
                )
            self.log.info("New model created, name={}".format(model_name))
        else:
            self.log.debug(
                "Model already exists in juju. Getting model {}".format(model_name)
            )
            model = await self.controller.get_model(model_name)
            self.log.debug("Existing model in juju, name={}".format(model_name))

        self.juju_models[model_name] = model
        self.juju_observers[model_name] = JujuModelObserver(n2vc=self, model=model)
        return model

    except asyncio.CancelledError:
        # a cancellation must reach the caller untouched, as in the other
        # handlers of this file
        raise
    except Exception as e:
        msg = "Cannot get model {}. Exception: {}".format(model_name, e)
        self.log.error(msg)
        raise N2VCException(msg) from e
    finally:
        # always release the flag so waiting coroutines can proceed
        self._creating_model = False

async def _juju_add_relation(
    self,
    model_name: str,
    application_name_1: str,
    application_name_2: str,
    relation_1: str,
    relation_2: str,
):
    """Add a relation between two application endpoints of a model.

    A ``JujuAPIError`` whose message contains "not found" or
    "already exists" is ignored; any other one is re-raised.

    :param str model_name: name of the juju model
    :param str application_name_1: first application
    :param str application_name_2: second application
    :param str relation_1: relation name on the first application
    :param str relation_2: relation name on the second application
    """

    # get juju model
    model = await self._juju_get_model(model_name=model_name)

    endpoint_1 = "{}:{}".format(application_name_1, relation_1)
    endpoint_2 = "{}:{}".format(application_name_2, relation_2)

    self.log.debug("adding relation: {} -> {}".format(endpoint_1, endpoint_2))
    try:
        await model.add_relation(relation1=endpoint_1, relation2=endpoint_2)
    except JujuAPIError as e:
        # If one of the applications in the relationship doesn't exist, or
        # the relation has already been added, let the operation fail
        # silently.
        tolerated = ("not found", "already exists")
        if not any(fragment in e.message for fragment in tolerated):
            # another exception, raise it
            raise

+
async def _juju_destroy_application(self, model_name: str, application_name: str):
    """Unregister an application from the observer and destroy it.

    When the model has no application with that name, only a debug message
    is logged.

    :param str model_name: name of the juju model
    :param str application_name: name of the application to destroy
    """

    self.log.debug(
        "Destroying application {} in model {}".format(application_name, model_name)
    )

    # get juju model and observer
    model = await self._juju_get_model(model_name=model_name)
    observer = self.juju_observers[model_name]

    target = model.applications.get(application_name)
    if not target:
        self.log.debug("Application not found: {}".format(application_name))
        return

    observer.unregister_application(application_name)
    await target.destroy()

+
async def _juju_destroy_machine(
    self, model_name: str, machine_id: str, total_timeout: float = None
):
    """Unregister a machine and, if it was added manually, destroy it.

    Only machines whose ``instance-id`` starts with ``manual:`` are
    destroyed here; for those the model is polled until the machine
    disappears or ``total_timeout`` elapses. Other machines are only
    unregistered from the observer.

    :param str model_name: name of the juju model
    :param str machine_id: id of the machine to destroy
    :param float total_timeout: maximum seconds to wait for the removal;
        3600 when None
    """

    self.log.debug(
        "Destroying machine {} in model {}".format(machine_id, model_name)
    )

    if total_timeout is None:
        total_timeout = 3600

    # get juju model and observer
    model = await self._juju_get_model(model_name=model_name)
    observer = self.juju_observers[model_name]

    machines = await model.get_machines()
    if machine_id not in machines:
        self.log.debug("Machine not found: {}".format(machine_id))
        return

    machine = model.machines[machine_id]
    observer.unregister_machine(machine_id)

    # TODO: change this by machine.is_manual when this is upstreamed:
    # https://github.com/juju/python-libjuju/pull/396
    instance_id = machine.safe_data.get("instance-id") or ""
    if not instance_id.startswith("manual:"):
        return

    self.log.debug("machine.destroy(force=True) started.")
    await machine.destroy(force=True)
    self.log.debug("machine.destroy(force=True) passed.")

    # max timeout
    end = time.time() + total_timeout
    # wait for machine removal
    machines = await model.get_machines()
    while machine_id in machines and time.time() < end:
        self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
        await asyncio.sleep(0.5)
        machines = await model.get_machines()

    # the loop also ends on timeout: only claim success if the machine is gone
    if machine_id in machines:
        self.log.warning(
            "Timeout waiting for machine {} to be destroyed".format(machine_id)
        )
    else:
        self.log.debug("Machine destroyed: {}".format(machine_id))

+
async def _juju_destroy_model(self, model_name: str, total_timeout: float = None):
    """Destroy a model together with its applications and machines.

    Applications and machines are destroyed on a best-effort basis (errors
    are logged and skipped), the model is disconnected, its destruction is
    requested and the controller is polled until the model name is no longer
    listed.

    :param str model_name: name of the juju model
    :param float total_timeout: maximum seconds to wait for the model to
        disappear; 3600 when None
    :raises N2VCNotFound: if no model object is obtained for ``model_name``
    :raises N2VCException: if the model is still listed after the timeout
    """

    self.log.debug("Destroying model {}".format(model_name))

    if total_timeout is None:
        total_timeout = 3600
    end = time.time() + total_timeout

    model = await self._juju_get_model(model_name=model_name)

    if not model:
        raise N2VCNotFound(message="Model {} does not exist".format(model_name))

    uuid = model.info.uuid

    # destroy applications (iterate a snapshot: entries go away as we destroy)
    for application_name in list(model.applications):
        try:
            await self._juju_destroy_application(
                model_name=model_name, application_name=application_name
            )
        except asyncio.CancelledError:
            raise
        except Exception as e:
            self.log.error(
                "Error destroying application {} in model {}: {}".format(
                    application_name, model_name, e
                )
            )

    # destroy machines
    machines = await model.get_machines()
    for machine_id in machines:
        try:
            await self._juju_destroy_machine(
                model_name=model_name, machine_id=machine_id
            )
        except asyncio.CancelledError:
            raise
        except Exception as e:
            # still best-effort, but leave a trace instead of hiding the error
            self.log.error(
                "Error destroying machine {} in model {}: {}".format(
                    machine_id, model_name, e
                )
            )

    await self._juju_disconnect_model(model_name=model_name)

    self.log.debug("destroying model {}...".format(model_name))
    await self.controller.destroy_model(uuid)

    # wait for model is completely destroyed
    self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
    last_exception = ""
    while time.time() < end:
        try:
            models = await self.controller.list_models()
            if model_name not in models:
                self.log.debug(
                    "The model {} ({}) was destroyed".format(model_name, uuid)
                )
                return
        except asyncio.CancelledError:
            raise
        except Exception as e:
            # remember the error for the timeout message and keep polling
            last_exception = e
        await asyncio.sleep(5)
    raise N2VCException(
        "Timeout waiting for model {} to be destroyed {}".format(
            model_name, last_exception
        )
    )

+
async def _juju_login(self):
    """Connect to the juju controller.

    Returns immediately when already authenticated. If another task is
    connecting, waits for it to finish and re-checks before connecting.

    :raises N2VCConnectionException: if the connection attempt fails
    """

    # if already authenticated, exit function
    if self._authenticated:
        return

    # if connecting, wait for finish
    # another task could be trying to connect in parallel
    while self._connecting:
        await asyncio.sleep(0.1)

    # double check after other task has finished
    if self._authenticated:
        return

    try:
        self._connecting = True
        # never write the secret (not even a prefix of it) to the log
        self.log.info(
            "connecting to juju controller: {} {}{}".format(
                self.url,
                self.username,
                " with ca_cert" if self.ca_cert else "",
            )
        )

        # Create controller object
        self.controller = Controller(loop=self.loop)
        # Connect to controller
        await self.controller.connect(
            endpoint=self.url,
            username=self.username,
            password=self.secret,
            cacert=self.ca_cert,
        )
        self._authenticated = True
        self.log.info("juju controller connected")
    except asyncio.CancelledError:
        # a cancellation must reach the caller untouched
        raise
    except Exception as e:
        message = "Exception connecting to juju: {}".format(e)
        self.log.error(message)
        raise N2VCConnectionException(message=message, url=self.url) from e
    finally:
        # always release the flag so waiting tasks can proceed
        self._connecting = False

+
async def _juju_logout(self):
    """Disconnect every known model and then the juju controller.

    :returns: False when there is no authenticated session; otherwise None
    :raises N2VCConnectionException: if disconnecting the controller fails
    """
    if not self._authenticated:
        return False

    # disconnect all models; a failure in one must not stop the others
    for name in self.juju_models:
        try:
            await self._juju_disconnect_model(name)
        except Exception as e:
            failure = "Error disconnecting model {} : {}".format(name, e)
            self.log.error(failure)

    self.log.info("Disconnecting controller")
    try:
        await self.controller.disconnect()
    except Exception as e:
        reason = "Error disconnecting controller: {}".format(e)
        raise N2VCConnectionException(message=reason, url=self.url)
    else:
        # only forget the session once the controller is really disconnected
        self.controller = None
        self._authenticated = False
        self.log.info("disconnected")

+
async def _juju_disconnect_model(self, model_name: str):
    """Disconnect a model and clear its cached model and observer.

    The entries in ``self.juju_models`` and ``self.juju_observers`` are set
    to None rather than removed, so the set of keys does not change while
    ``_juju_logout`` iterates over ``self.juju_models``.

    :param str model_name: key of the model in ``self.juju_models``
    """
    self.log.debug("Disconnecting model {}".format(model_name))

    if model_name not in self.juju_models:
        self.log.warning("Cannot disconnect model: {}".format(model_name))
        return

    model = self.juju_models[model_name]
    # the entry is None when the model was already disconnected
    if model is not None:
        await model.disconnect()
    self.juju_models[model_name] = None
    self.juju_observers[model_name] = None

+
def _create_juju_public_key(self):
"""Recreate the Juju public key on lcm container, if needed
Certain libjuju commands expect to be run from the same machine as Juju