X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fn2vc_juju_conn.py;h=31bdd6e6b293c70cc19626eedec237e6b4c9fd0f;hp=fff78c94414d5866f31945e1076c552c4eb60b07;hb=8bfcc14713a71f43f155e3cddec168380134d344;hpb=e8102d9e28e5c502fc66ca842d14e1ad29efbfda;ds=sidebyside diff --git a/n2vc/n2vc_juju_conn.py b/n2vc/n2vc_juju_conn.py index fff78c9..31bdd6e 100644 --- a/n2vc/n2vc_juju_conn.py +++ b/n2vc/n2vc_juju_conn.py @@ -26,19 +26,29 @@ import binascii import logging import os import re - +import time + +from juju.action import Action +from juju.application import Application +from juju.client import client +from juju.controller import Controller +from juju.errors import JujuAPIError +from juju.machine import Machine +from juju.model import Model from n2vc.exceptions import ( N2VCBadArgumentsException, N2VCException, N2VCConnectionException, N2VCExecutionException, N2VCInvalidCertificate, - # N2VCNotFound, + N2VCNotFound, MethodNotImplemented, JujuK8sProxycharmNotSupported, ) +from n2vc.juju_observer import JujuModelObserver from n2vc.n2vc_conn import N2VCConnector from n2vc.n2vc_conn import obj_to_dict, obj_to_yaml +from n2vc.provisioner import AsyncSSHProvisioner from n2vc.libjuju import Libjuju @@ -1024,6 +1034,714 @@ class N2VCJujuConnector(N2VCConnector): return N2VCJujuConnector._format_app_name(application_name) + async def _juju_create_machine( + self, + model_name: str, + application_name: str, + machine_id: str = None, + db_dict: dict = None, + progress_timeout: float = None, + total_timeout: float = None, + ) -> Machine: + + self.log.debug( + "creating machine in model: {}, existing machine id: {}".format( + model_name, machine_id + ) + ) + + # get juju model and observer (create model if needed) + model = await self._juju_get_model(model_name=model_name) + observer = self.juju_observers[model_name] + + # find machine id in model + machine = None + if machine_id is not None: + self.log.debug("Finding existing machine id {} in model".format(machine_id)) + # get juju existing machines in the model + existing_machines = await model.get_machines() + if machine_id in existing_machines: + self.log.debug( + "Machine id {} found in model (reusing it)".format(machine_id) + ) + machine = model.machines[machine_id] + + if machine is None: + self.log.debug("Creating a new machine in juju...") + # machine does not exist, create it and wait for it + machine = await model.add_machine( + spec=None, constraints=None, disks=None, series="xenial" + ) + + # register machine with observer + observer.register_machine(machine=machine, db_dict=db_dict) + + # id for the execution environment + ee_id = N2VCJujuConnector._build_ee_id( + model_name=model_name, + application_name=application_name, + machine_id=str(machine.entity_id), + ) + + # write ee_id in database + self._write_ee_id_db(db_dict=db_dict, ee_id=ee_id) + + # wait for machine creation + await observer.wait_for_machine( + machine_id=str(machine.entity_id), + progress_timeout=progress_timeout, + total_timeout=total_timeout, + ) + + else: + + self.log.debug("Reusing old machine pending") + + # register machine with observer + observer.register_machine(machine=machine, db_dict=db_dict) + + # machine does exist, but it is in creation process (pending), wait for + # create finalisation + await observer.wait_for_machine( + machine_id=machine.entity_id, + progress_timeout=progress_timeout, + total_timeout=total_timeout, + ) + + self.log.debug("Machine ready at " + str(machine.dns_name)) + return machine + + async def _juju_provision_machine( + self, + model_name: str, + hostname: str, + username: str, + private_key_path: str, + db_dict: dict = None, + progress_timeout: float = None, + total_timeout: float = None, + ) -> str: + + if not self.api_proxy: + msg = "Cannot provision machine: api_proxy is not defined" + self.log.error(msg=msg) + raise N2VCException(message=msg) + + self.log.debug( + "provisioning machine. model: {}, hostname: {}, username: {}".format( + model_name, hostname, username + ) + ) + + if not self._authenticated: + await self._juju_login() + + # get juju model and observer + model = await self._juju_get_model(model_name=model_name) + observer = self.juju_observers[model_name] + + # TODO check if machine is already provisioned + machine_list = await model.get_machines() + + provisioner = AsyncSSHProvisioner( + host=hostname, + user=username, + private_key_path=private_key_path, + log=self.log, + ) + + params = None + try: + params = await provisioner.provision_machine() + except Exception as ex: + msg = "Exception provisioning machine: {}".format(ex) + self.log.error(msg) + raise N2VCException(message=msg) + + params.jobs = ["JobHostUnits"] + + connection = model.connection() + + # Submit the request. + self.log.debug("Adding machine to model") + client_facade = client.ClientFacade.from_connection(connection) + results = await client_facade.AddMachines(params=[params]) + error = results.machines[0].error + if error: + msg = "Error adding machine: {}".format(error.message) + self.log.error(msg=msg) + raise ValueError(msg) + + machine_id = results.machines[0].machine + + # Need to run this after AddMachines has been called, + # as we need the machine_id + self.log.debug("Installing Juju agent into machine {}".format(machine_id)) + asyncio.ensure_future( + provisioner.install_agent( + connection=connection, + nonce=params.nonce, + machine_id=machine_id, + proxy=self.api_proxy, + ) + ) + + # wait for machine in model (now, machine is not yet in model, so we must + # wait for it) + machine = None + for _ in range(10): + machine_list = await model.get_machines() + if machine_id in machine_list: + self.log.debug("Machine {} found in model!".format(machine_id)) + machine = model.machines.get(machine_id) + break + await asyncio.sleep(2) + + if machine is None: + msg = "Machine {} not found in model".format(machine_id) + self.log.error(msg=msg) + raise Exception(msg) + + # register machine with observer + observer.register_machine(machine=machine, db_dict=db_dict) + + # wait for machine creation + self.log.debug("waiting for provision finishes... {}".format(machine_id)) + await observer.wait_for_machine( + machine_id=machine_id, + progress_timeout=progress_timeout, + total_timeout=total_timeout, + ) + + self.log.debug("Machine provisioned {}".format(machine_id)) + + return machine_id + + async def _juju_deploy_charm( + self, + model_name: str, + application_name: str, + charm_path: str, + machine_id: str, + db_dict: dict, + progress_timeout: float = None, + total_timeout: float = None, + config: dict = None, + ) -> (Application, int): + + # get juju model and observer + model = await self._juju_get_model(model_name=model_name) + observer = self.juju_observers[model_name] + + # check if application already exists + application = None + if application_name in model.applications: + application = model.applications[application_name] + + if application is None: + + # application does not exist, create it and wait for it + self.log.debug( + "deploying application {} to machine {}, model {}".format( + application_name, machine_id, model_name + ) + ) + self.log.debug("charm: {}".format(charm_path)) + machine = model.machines[machine_id] + # series = None + application = await model.deploy( + entity_url=charm_path, + application_name=application_name, + channel="stable", + num_units=1, + series=machine.series, + to=machine_id, + config=config, + ) + + # register application with observer + observer.register_application(application=application, db_dict=db_dict) + + self.log.debug( + "waiting for application deployed... {}".format(application.entity_id) + ) + retries = await observer.wait_for_application( + application_id=application.entity_id, + progress_timeout=progress_timeout, + total_timeout=total_timeout, + ) + self.log.debug("application deployed") + + else: + + # register application with observer + observer.register_application(application=application, db_dict=db_dict) + + # application already exists, but not finalised + self.log.debug("application already exists, waiting for deployed...") + retries = await observer.wait_for_application( + application_id=application.entity_id, + progress_timeout=progress_timeout, + total_timeout=total_timeout, + ) + self.log.debug("application deployed") + + return application, retries + + async def _juju_execute_action( + self, + model_name: str, + application_name: str, + action_name: str, + db_dict: dict, + progress_timeout: float = None, + total_timeout: float = None, + **kwargs + ) -> Action: + + # get juju model and observer + model = await self._juju_get_model(model_name=model_name) + observer = self.juju_observers[model_name] + + application = await self._juju_get_application( + model_name=model_name, application_name=application_name + ) + + unit = None + for u in application.units: + if await u.is_leader_from_status(): + unit = u + if unit is not None: + actions = await application.get_actions() + if action_name in actions: + self.log.debug( + 'executing action "{}" using params: {}'.format(action_name, kwargs) + ) + action = await unit.run_action(action_name, **kwargs) + + # register action with observer + observer.register_action(action=action, db_dict=db_dict) + + await observer.wait_for_action( + action_id=action.entity_id, + progress_timeout=progress_timeout, + total_timeout=total_timeout, + ) + self.log.debug("action completed with status: {}".format(action.status)) + output = await model.get_action_output(action_uuid=action.entity_id) + status = await model.get_action_status(uuid_or_prefix=action.entity_id) + if action.entity_id in status: + status = status[action.entity_id] + else: + status = "failed" + return output, status + + raise N2VCExecutionException( + message="Cannot execute action on charm", primitive_name=action_name + ) + + async def _juju_configure_application( + self, + model_name: str, + application_name: str, + config: dict, + db_dict: dict, + progress_timeout: float = None, + total_timeout: float = None, + ): + + # get the application + application = await self._juju_get_application( + model_name=model_name, application_name=application_name + ) + + self.log.debug( + "configuring the application {} -> {}".format(application_name, config) + ) + res = await application.set_config(config) + self.log.debug( + "application {} configured. res={}".format(application_name, res) + ) + + # Verify the config is set + new_conf = await application.get_config() + for key in config: + value = new_conf[key]["value"] + self.log.debug(" {} = {}".format(key, value)) + if config[key] != value: + raise N2VCException( + message="key {} is not configured correctly {} != {}".format( + key, config[key], new_conf[key] + ) + ) + + # check if 'verify-ssh-credentials' action exists + # unit = application.units[0] + actions = await application.get_actions() + if "verify-ssh-credentials" not in actions: + msg = ( + "Action verify-ssh-credentials does not exist in application {}" + ).format(application_name) + self.log.debug(msg=msg) + return False + + # execute verify-credentials + num_retries = 20 + retry_timeout = 15.0 + for _ in range(num_retries): + try: + self.log.debug("Executing action verify-ssh-credentials...") + output, ok = await self._juju_execute_action( + model_name=model_name, + application_name=application_name, + action_name="verify-ssh-credentials", + db_dict=db_dict, + progress_timeout=progress_timeout, + total_timeout=total_timeout, + ) + self.log.debug("Result: {}, output: {}".format(ok, output)) + return True + except asyncio.CancelledError: + raise + except Exception as e: + self.log.debug( + "Error executing verify-ssh-credentials: {}. Retrying...".format(e) + ) + await asyncio.sleep(retry_timeout) + else: + self.log.error( + "Error executing verify-ssh-credentials after {} retries. ".format( + num_retries + ) + ) + return False + + async def _juju_get_application(self, model_name: str, application_name: str): + """Get the deployed application.""" + + model = await self._juju_get_model(model_name=model_name) + + application_name = N2VCJujuConnector._format_app_name(application_name) + + if model.applications and application_name in model.applications: + return model.applications[application_name] + else: + raise N2VCException( + message="Cannot get application {} from model {}".format( + application_name, model_name + ) + ) + + async def _juju_get_model(self, model_name: str) -> Model: + """ Get a model object from juju controller + If the model does not exits, it creates it. + + :param str model_name: name of the model + :returns Model: model obtained from juju controller or Exception + """ + + # format model name + model_name = N2VCJujuConnector._format_model_name(model_name) + + if model_name in self.juju_models: + return self.juju_models[model_name] + + if self._creating_model: + self.log.debug("Another coroutine is creating a model. Wait...") + while self._creating_model: + # another coroutine is creating a model, wait + await asyncio.sleep(0.1) + # retry (perhaps another coroutine has created the model meanwhile) + if model_name in self.juju_models: + return self.juju_models[model_name] + + try: + self._creating_model = True + + # get juju model names from juju + model_list = await self.controller.list_models() + if model_name not in model_list: + self.log.info( + "Model {} does not exist. Creating new model...".format(model_name) + ) + config_dict = {"authorized-keys": self.public_key} + if self.apt_mirror: + config_dict["apt-mirror"] = self.apt_mirror + if not self.enable_os_upgrade: + config_dict["enable-os-refresh-update"] = False + config_dict["enable-os-upgrade"] = False + if self.cloud in self.BUILT_IN_CLOUDS: + model = await self.controller.add_model( + model_name=model_name, + config=config_dict, + cloud_name=self.cloud, + ) + else: + model = await self.controller.add_model( + model_name=model_name, + config=config_dict, + cloud_name=self.cloud, + credential_name=self.cloud, + ) + self.log.info("New model created, name={}".format(model_name)) + else: + self.log.debug( + "Model already exists in juju. Getting model {}".format(model_name) + ) + model = await self.controller.get_model(model_name) + self.log.debug("Existing model in juju, name={}".format(model_name)) + + self.juju_models[model_name] = model + self.juju_observers[model_name] = JujuModelObserver(n2vc=self, model=model) + return model + + except Exception as e: + msg = "Cannot get model {}. Exception: {}".format(model_name, e) + self.log.error(msg) + raise N2VCException(msg) + finally: + self._creating_model = False + + async def _juju_add_relation( + self, + model_name: str, + application_name_1: str, + application_name_2: str, + relation_1: str, + relation_2: str, + ): + + # get juju model and observer + model = await self._juju_get_model(model_name=model_name) + + r1 = "{}:{}".format(application_name_1, relation_1) + r2 = "{}:{}".format(application_name_2, relation_2) + + self.log.debug("adding relation: {} -> {}".format(r1, r2)) + try: + await model.add_relation(relation1=r1, relation2=r2) + except JujuAPIError as e: + # If one of the applications in the relationship doesn't exist, or the + # relation has already been added, + # let the operation fail silently. + if "not found" in e.message: + return + if "already exists" in e.message: + return + # another execption, raise it + raise e + + async def _juju_destroy_application(self, model_name: str, application_name: str): + + self.log.debug( + "Destroying application {} in model {}".format(application_name, model_name) + ) + + # get juju model and observer + model = await self._juju_get_model(model_name=model_name) + observer = self.juju_observers[model_name] + + application = model.applications.get(application_name) + if application: + observer.unregister_application(application_name) + await application.destroy() + else: + self.log.debug("Application not found: {}".format(application_name)) + + async def _juju_destroy_machine( + self, model_name: str, machine_id: str, total_timeout: float = None + ): + + self.log.debug( + "Destroying machine {} in model {}".format(machine_id, model_name) + ) + + if total_timeout is None: + total_timeout = 3600 + + # get juju model and observer + model = await self._juju_get_model(model_name=model_name) + observer = self.juju_observers[model_name] + + machines = await model.get_machines() + if machine_id in machines: + machine = model.machines[machine_id] + observer.unregister_machine(machine_id) + # TODO: change this by machine.is_manual when this is upstreamed: + # https://github.com/juju/python-libjuju/pull/396 + if "instance-id" in machine.safe_data and machine.safe_data[ + "instance-id" + ].startswith("manual:"): + self.log.debug("machine.destroy(force=True) started.") + await machine.destroy(force=True) + self.log.debug("machine.destroy(force=True) passed.") + # max timeout + end = time.time() + total_timeout + # wait for machine removal + machines = await model.get_machines() + while machine_id in machines and time.time() < end: + self.log.debug( + "Waiting for machine {} is destroyed".format(machine_id) + ) + await asyncio.sleep(0.5) + machines = await model.get_machines() + self.log.debug("Machine destroyed: {}".format(machine_id)) + else: + self.log.debug("Machine not found: {}".format(machine_id)) + + async def _juju_destroy_model(self, model_name: str, total_timeout: float = None): + + self.log.debug("Destroying model {}".format(model_name)) + + if total_timeout is None: + total_timeout = 3600 + end = time.time() + total_timeout + + model = await self._juju_get_model(model_name=model_name) + + if not model: + raise N2VCNotFound(message="Model {} does not exist".format(model_name)) + + uuid = model.info.uuid + + # destroy applications + for application_name in model.applications: + try: + await self._juju_destroy_application( + model_name=model_name, application_name=application_name + ) + except Exception as e: + self.log.error( + "Error destroying application {} in model {}: {}".format( + application_name, model_name, e + ) + ) + + # destroy machines + machines = await model.get_machines() + for machine_id in machines: + try: + await self._juju_destroy_machine( + model_name=model_name, machine_id=machine_id + ) + except asyncio.CancelledError: + raise + except Exception: + # ignore exceptions destroying machine + pass + + await self._juju_disconnect_model(model_name=model_name) + + self.log.debug("destroying model {}...".format(model_name)) + await self.controller.destroy_model(uuid) + # self.log.debug('model destroy requested {}'.format(model_name)) + + # wait for model is completely destroyed + self.log.debug("Waiting for model {} to be destroyed...".format(model_name)) + last_exception = "" + while time.time() < end: + try: + # await self.controller.get_model(uuid) + models = await self.controller.list_models() + if model_name not in models: + self.log.debug( + "The model {} ({}) was destroyed".format(model_name, uuid) + ) + return + except asyncio.CancelledError: + raise + except Exception as e: + last_exception = e + await asyncio.sleep(5) + raise N2VCException( + "Timeout waiting for model {} to be destroyed {}".format( + model_name, last_exception + ) + ) + + async def _juju_login(self): + """Connect to juju controller + + """ + + # if already authenticated, exit function + if self._authenticated: + return + + # if connecting, wait for finish + # another task could be trying to connect in parallel + while self._connecting: + await asyncio.sleep(0.1) + + # double check after other task has finished + if self._authenticated: + return + + try: + self._connecting = True + self.log.info( + "connecting to juju controller: {} {}:{}{}".format( + self.url, + self.username, + self.secret[:8] + "...", + " with ca_cert" if self.ca_cert else "", + ) + ) + + # Create controller object + self.controller = Controller(loop=self.loop) + # Connect to controller + await self.controller.connect( + endpoint=self.url, + username=self.username, + password=self.secret, + cacert=self.ca_cert, + ) + self._authenticated = True + self.log.info("juju controller connected") + except Exception as e: + message = "Exception connecting to juju: {}".format(e) + self.log.error(message) + raise N2VCConnectionException(message=message, url=self.url) + finally: + self._connecting = False + + async def _juju_logout(self): + """Logout of the Juju controller.""" + if not self._authenticated: + return False + + # disconnect all models + for model_name in self.juju_models: + try: + await self._juju_disconnect_model(model_name) + except Exception as e: + self.log.error( + "Error disconnecting model {} : {}".format(model_name, e) + ) + # continue with next model... + + self.log.info("Disconnecting controller") + try: + await self.controller.disconnect() + except Exception as e: + raise N2VCConnectionException( + message="Error disconnecting controller: {}".format(e), url=self.url + ) + + self.controller = None + self._authenticated = False + self.log.info("disconnected") + + async def _juju_disconnect_model(self, model_name: str): + self.log.debug("Disconnecting model {}".format(model_name)) + if model_name in self.juju_models: + await self.juju_models[model_name].disconnect() + self.juju_models[model_name] = None + self.juju_observers[model_name] = None + else: + self.warning("Cannot disconnect model: {}".format(model_name)) + def _create_juju_public_key(self): """Recreate the Juju public key on lcm container, if needed Certain libjuju commands expect to be run from the same machine as Juju