Revert "Remove unused lines of code"
[osm/N2VC.git] / n2vc / n2vc_juju_conn.py
index fff78c9..31bdd6e 100644 (file)
@@ -26,19 +26,29 @@ import binascii
 import logging
 import os
 import re
-
+import time
+
+from juju.action import Action
+from juju.application import Application
+from juju.client import client
+from juju.controller import Controller
+from juju.errors import JujuAPIError
+from juju.machine import Machine
+from juju.model import Model
 from n2vc.exceptions import (
     N2VCBadArgumentsException,
     N2VCException,
     N2VCConnectionException,
     N2VCExecutionException,
     N2VCInvalidCertificate,
-    N2VCNotFound,
+    N2VCNotFound,
     MethodNotImplemented,
     JujuK8sProxycharmNotSupported,
 )
+from n2vc.juju_observer import JujuModelObserver
 from n2vc.n2vc_conn import N2VCConnector
 from n2vc.n2vc_conn import obj_to_dict, obj_to_yaml
+from n2vc.provisioner import AsyncSSHProvisioner
 from n2vc.libjuju import Libjuju
 
 
@@ -1024,6 +1034,714 @@ class N2VCJujuConnector(N2VCConnector):
 
         return N2VCJujuConnector._format_app_name(application_name)
 
+    async def _juju_create_machine(
+        self,
+        model_name: str,
+        application_name: str,
+        machine_id: str = None,
+        db_dict: dict = None,
+        progress_timeout: float = None,
+        total_timeout: float = None,
+    ) -> Machine:
+        """Reuse or create a machine in the model and wait for it.
+
+        :param str model_name: name of the juju model
+        :param str application_name: application name used to build the ee_id
+        :param str machine_id: id of an existing machine to reuse (optional)
+        :param dict db_dict: passed to the observer and to the ee_id writer
+        :param float progress_timeout: forwarded to observer.wait_for_machine
+        :param float total_timeout: forwarded to observer.wait_for_machine
+        :returns Machine: the reused or newly added machine
+        """
+
+        self.log.debug(
+            "creating machine in model: {}, existing machine id: {}".format(
+                model_name, machine_id
+            )
+        )
+
+        # model lookup also creates the model (and its observer) when needed
+        juju_model = await self._juju_get_model(model_name=model_name)
+        model_observer = self.juju_observers[model_name]
+
+        # look the requested machine up among those already in the model
+        reused = None
+        if machine_id is not None:
+            self.log.debug("Finding existing machine id {} in model".format(machine_id))
+            known_ids = await juju_model.get_machines()
+            if machine_id in known_ids:
+                self.log.debug(
+                    "Machine id {} found in model (reusing it)".format(machine_id)
+                )
+                reused = juju_model.machines[machine_id]
+
+        if reused is not None:
+            # the machine exists but may still be pending: only wait for it
+            self.log.debug("Reusing old machine pending")
+            machine = reused
+            model_observer.register_machine(machine=machine, db_dict=db_dict)
+            awaited_id = machine.entity_id
+        else:
+            self.log.debug("Creating a new machine in juju...")
+            machine = await juju_model.add_machine(
+                spec=None, constraints=None, disks=None, series="xenial"
+            )
+            model_observer.register_machine(machine=machine, db_dict=db_dict)
+
+            # store the execution environment id before waiting for the machine
+            awaited_id = str(machine.entity_id)
+            self._write_ee_id_db(
+                db_dict=db_dict,
+                ee_id=N2VCJujuConnector._build_ee_id(
+                    model_name=model_name,
+                    application_name=application_name,
+                    machine_id=awaited_id,
+                ),
+            )
+
+        await model_observer.wait_for_machine(
+            machine_id=awaited_id,
+            progress_timeout=progress_timeout,
+            total_timeout=total_timeout,
+        )
+
+        self.log.debug("Machine ready at " + str(machine.dns_name))
+        return machine
+
+    async def _juju_provision_machine(
+        self,
+        model_name: str,
+        hostname: str,
+        username: str,
+        private_key_path: str,
+        db_dict: dict = None,
+        progress_timeout: float = None,
+        total_timeout: float = None,
+    ) -> str:
+        """Provision a host over SSH and add it to the model as a machine.
+
+        :param str model_name: name of the juju model
+        :param str hostname: host passed to the SSH provisioner
+        :param str username: user passed to the SSH provisioner
+        :param str private_key_path: key path passed to the SSH provisioner
+        :param dict db_dict: passed to the observer when registering the machine
+        :param float progress_timeout: forwarded to observer.wait_for_machine
+        :param float total_timeout: forwarded to observer.wait_for_machine
+        :returns str: id of the machine added to the model
+        :raises N2VCException: api_proxy is not defined or provisioning failed
+        :raises ValueError: juju reported an error adding the machine
+        :raises Exception: the machine did not show up in the model
+        """
+
+        if not self.api_proxy:
+            msg = "Cannot provision machine: api_proxy is not defined"
+            self.log.error(msg=msg)
+            raise N2VCException(message=msg)
+
+        self.log.debug(
+            "provisioning machine. model: {}, hostname: {}, username: {}".format(
+                model_name, hostname, username
+            )
+        )
+
+        if not self._authenticated:
+            await self._juju_login()
+
+        # get juju model and observer
+        model = await self._juju_get_model(model_name=model_name)
+        observer = self.juju_observers[model_name]
+
+        # TODO check if machine is already provisioned
+        machine_list = await model.get_machines()
+
+        provisioner = AsyncSSHProvisioner(
+            host=hostname,
+            user=username,
+            private_key_path=private_key_path,
+            log=self.log,
+        )
+
+        try:
+            params = await provisioner.provision_machine()
+        except asyncio.CancelledError:
+            # never turn a task cancellation into an N2VC error
+            raise
+        except Exception as ex:
+            msg = "Exception provisioning machine: {}".format(ex)
+            self.log.error(msg)
+            # keep the original exception as the cause for troubleshooting
+            raise N2VCException(message=msg) from ex
+
+        params.jobs = ["JobHostUnits"]
+
+        connection = model.connection()
+
+        # Submit the request.
+        self.log.debug("Adding machine to model")
+        client_facade = client.ClientFacade.from_connection(connection)
+        results = await client_facade.AddMachines(params=[params])
+        error = results.machines[0].error
+        if error:
+            msg = "Error adding machine: {}".format(error.message)
+            self.log.error(msg=msg)
+            raise ValueError(msg)
+
+        machine_id = results.machines[0].machine
+
+        # Need to run this after AddMachines has been called,
+        # as we need the machine_id
+        # NOTE(review): the task is not awaited here, so a failure inside
+        # install_agent is not reported by this method.
+        self.log.debug("Installing Juju agent into machine {}".format(machine_id))
+        asyncio.ensure_future(
+            provisioner.install_agent(
+                connection=connection,
+                nonce=params.nonce,
+                machine_id=machine_id,
+                proxy=self.api_proxy,
+            )
+        )
+
+        # wait for machine in model (now, machine is not yet in model, so we must
+        # wait for it)
+        poll_attempts = 10
+        poll_interval = 2
+        machine = None
+        for _ in range(poll_attempts):
+            machine_list = await model.get_machines()
+            if machine_id in machine_list:
+                self.log.debug("Machine {} found in model!".format(machine_id))
+                machine = model.machines.get(machine_id)
+                break
+            await asyncio.sleep(poll_interval)
+
+        if machine is None:
+            msg = "Machine {} not found in model".format(machine_id)
+            self.log.error(msg=msg)
+            raise Exception(msg)
+
+        # register machine with observer
+        observer.register_machine(machine=machine, db_dict=db_dict)
+
+        # wait for machine creation
+        self.log.debug("waiting for provision finishes... {}".format(machine_id))
+        await observer.wait_for_machine(
+            machine_id=machine_id,
+            progress_timeout=progress_timeout,
+            total_timeout=total_timeout,
+        )
+
+        self.log.debug("Machine provisioned {}".format(machine_id))
+
+        return machine_id
+
+    async def _juju_deploy_charm(
+        self,
+        model_name: str,
+        application_name: str,
+        charm_path: str,
+        machine_id: str,
+        db_dict: dict,
+        progress_timeout: float = None,
+        total_timeout: float = None,
+        config: dict = None,
+    ) -> (Application, int):
+        """Deploy a charm to a machine (unless already present) and wait.
+
+        :param str model_name: name of the juju model
+        :param str application_name: name of the application to deploy
+        :param str charm_path: charm location passed to model.deploy
+        :param str machine_id: id of the target machine in the model
+        :param dict db_dict: passed to the observer when registering
+        :param float progress_timeout: forwarded to wait_for_application
+        :param float total_timeout: forwarded to wait_for_application
+        :param dict config: initial application configuration
+        :returns: the application and the value returned by
+            observer.wait_for_application
+        """
+
+        juju_model = await self._juju_get_model(model_name=model_name)
+        model_observer = self.juju_observers[model_name]
+
+        # an application with this name may already be in the model
+        application = None
+        if application_name in juju_model.applications:
+            application = juju_model.applications[application_name]
+
+        if application is not None:
+            # present but possibly not finalised yet: only wait for it
+            waiting_msg = "application already exists, waiting for deployed..."
+        else:
+            self.log.debug(
+                "deploying application {} to machine {}, model {}".format(
+                    application_name, machine_id, model_name
+                )
+            )
+            self.log.debug("charm: {}".format(charm_path))
+            target_machine = juju_model.machines[machine_id]
+            application = await juju_model.deploy(
+                entity_url=charm_path,
+                application_name=application_name,
+                channel="stable",
+                num_units=1,
+                series=target_machine.series,
+                to=machine_id,
+                config=config,
+            )
+            waiting_msg = "waiting for application deployed... {}".format(
+                application.entity_id
+            )
+
+        # both paths register with the observer and wait in the same way
+        model_observer.register_application(application=application, db_dict=db_dict)
+        self.log.debug(waiting_msg)
+        retries = await model_observer.wait_for_application(
+            application_id=application.entity_id,
+            progress_timeout=progress_timeout,
+            total_timeout=total_timeout,
+        )
+        self.log.debug("application deployed")
+
+        return application, retries
+
+    async def _juju_execute_action(
+        self,
+        model_name: str,
+        application_name: str,
+        action_name: str,
+        db_dict: dict,
+        progress_timeout: float = None,
+        total_timeout: float = None,
+        **kwargs
+    ) -> tuple:
+        """Run an action on the leader unit of an application.
+
+        :param str model_name: name of the juju model
+        :param str application_name: application owning the action
+        :param str action_name: name of the action to run
+        :param dict db_dict: passed to the observer when registering the action
+        :param float progress_timeout: forwarded to observer.wait_for_action
+        :param float total_timeout: forwarded to observer.wait_for_action
+        :param kwargs: parameters passed to the action
+        :returns tuple: (output, status); status is "failed" when juju does
+            not report a status for the action id
+        :raises N2VCExecutionException: no leader unit was found or the
+            application does not expose the action
+        """
+
+        # get juju model and observer
+        model = await self._juju_get_model(model_name=model_name)
+        observer = self.juju_observers[model_name]
+
+        application = await self._juju_get_application(
+            model_name=model_name, application_name=application_name
+        )
+
+        # stop at the first leader instead of querying the remaining units
+        unit = None
+        for candidate in application.units:
+            if await candidate.is_leader_from_status():
+                unit = candidate
+                break
+
+        # actions are only listed when there is a unit to run them on
+        actions = await application.get_actions() if unit is not None else ()
+        if action_name not in actions:
+            raise N2VCExecutionException(
+                message="Cannot execute action on charm", primitive_name=action_name
+            )
+
+        self.log.debug(
+            'executing action "{}" using params: {}'.format(action_name, kwargs)
+        )
+        action = await unit.run_action(action_name, **kwargs)
+
+        # register action with observer
+        observer.register_action(action=action, db_dict=db_dict)
+
+        await observer.wait_for_action(
+            action_id=action.entity_id,
+            progress_timeout=progress_timeout,
+            total_timeout=total_timeout,
+        )
+        self.log.debug("action completed with status: {}".format(action.status))
+
+        output = await model.get_action_output(action_uuid=action.entity_id)
+        statuses = await model.get_action_status(uuid_or_prefix=action.entity_id)
+        if action.entity_id in statuses:
+            status = statuses[action.entity_id]
+        else:
+            status = "failed"
+        return output, status
+
+    async def _juju_configure_application(
+        self,
+        model_name: str,
+        application_name: str,
+        config: dict,
+        db_dict: dict,
+        progress_timeout: float = None,
+        total_timeout: float = None,
+    ):
+        """Set the application config, check it and verify SSH credentials.
+
+        :param str model_name: name of the juju model
+        :param str application_name: application to configure
+        :param dict config: configuration keys and values to apply
+        :param dict db_dict: forwarded to _juju_execute_action
+        :param float progress_timeout: forwarded to _juju_execute_action
+        :param float total_timeout: forwarded to _juju_execute_action
+        :returns bool: True when verify-ssh-credentials ran without raising;
+            False when the action does not exist or every retry failed
+        :raises N2VCException: a key read back differs from the requested value
+        """
+
+        app = await self._juju_get_application(
+            model_name=model_name, application_name=application_name
+        )
+
+        self.log.debug(
+            "configuring the application {} -> {}".format(application_name, config)
+        )
+        set_result = await app.set_config(config)
+        self.log.debug(
+            "application {} configured. res={}".format(application_name, set_result)
+        )
+
+        # read the configuration back and compare it with what was requested
+        applied = await app.get_config()
+        for key, wanted in config.items():
+            current = applied[key]["value"]
+            self.log.debug("    {} = {}".format(key, current))
+            if wanted != current:
+                raise N2VCException(
+                    message="key {} is not configured correctly {} != {}".format(
+                        key, wanted, applied[key]
+                    )
+                )
+
+        # nothing left to verify when the charm lacks the action
+        available_actions = await app.get_actions()
+        if "verify-ssh-credentials" not in available_actions:
+            msg = (
+                "Action verify-ssh-credentials does not exist in application {}"
+            ).format(application_name)
+            self.log.debug(msg=msg)
+            return False
+
+        # run the action, pausing between failed attempts
+        num_retries = 20
+        retry_timeout = 15.0
+        attempt = 0
+        while attempt < num_retries:
+            attempt += 1
+            try:
+                self.log.debug("Executing action verify-ssh-credentials...")
+                output, ok = await self._juju_execute_action(
+                    model_name=model_name,
+                    application_name=application_name,
+                    action_name="verify-ssh-credentials",
+                    db_dict=db_dict,
+                    progress_timeout=progress_timeout,
+                    total_timeout=total_timeout,
+                )
+                self.log.debug("Result: {}, output: {}".format(ok, output))
+                return True
+            except asyncio.CancelledError:
+                raise
+            except Exception as e:
+                self.log.debug(
+                    "Error executing verify-ssh-credentials: {}. Retrying...".format(e)
+                )
+                await asyncio.sleep(retry_timeout)
+
+        # every attempt raised
+        self.log.error(
+            "Error executing verify-ssh-credentials after {} retries. ".format(
+                num_retries
+            )
+        )
+        return False
+
+    async def _juju_get_application(self, model_name: str, application_name: str):
+        """Return the deployed application with the given name.
+
+        :param str model_name: name of the juju model
+        :param str application_name: name, normalised with _format_app_name
+        :raises N2VCException: the model has no application with that name
+        """
+
+        model = await self._juju_get_model(model_name=model_name)
+        application_name = N2VCJujuConnector._format_app_name(application_name)
+
+        # fail first, so the success path is the last statement
+        if not model.applications or application_name not in model.applications:
+            raise N2VCException(
+                message="Cannot get application {} from model {}".format(
+                    application_name, model_name
+                )
+            )
+        return model.applications[application_name]
+
+    async def _juju_get_model(self, model_name: str) -> Model:
+        """ Get a model object from juju controller
+        If the model does not exist, it creates it.
+
+        The model and a JujuModelObserver for it are stored in
+        self.juju_models and self.juju_observers under the formatted name.
+
+        :param str model_name: name of the model
+        :returns Model: model obtained from juju controller
+        :raises N2VCException: the model could not be obtained or created
+        """
+
+        # format model name
+        model_name = N2VCJujuConnector._format_model_name(model_name)
+
+        # _juju_disconnect_model leaves a None entry behind; such an entry
+        # must be treated as "not cached" instead of being returned
+        cached = self.juju_models.get(model_name)
+        if cached is not None:
+            return cached
+
+        if self._creating_model:
+            self.log.debug("Another coroutine is creating a model. Wait...")
+        while self._creating_model:
+            # another coroutine is creating a model, wait
+            await asyncio.sleep(0.1)
+            # retry (perhaps another coroutine has created the model meanwhile)
+            cached = self.juju_models.get(model_name)
+            if cached is not None:
+                return cached
+
+        try:
+            self._creating_model = True
+
+            # get juju model names from juju
+            model_list = await self.controller.list_models()
+            if model_name not in model_list:
+                self.log.info(
+                    "Model {} does not exist. Creating new model...".format(model_name)
+                )
+                config_dict = {"authorized-keys": self.public_key}
+                if self.apt_mirror:
+                    config_dict["apt-mirror"] = self.apt_mirror
+                if not self.enable_os_upgrade:
+                    config_dict["enable-os-refresh-update"] = False
+                    config_dict["enable-os-upgrade"] = False
+
+                # a credential name is only passed for non built-in clouds
+                add_model_args = {
+                    "model_name": model_name,
+                    "config": config_dict,
+                    "cloud_name": self.cloud,
+                }
+                if self.cloud not in self.BUILT_IN_CLOUDS:
+                    add_model_args["credential_name"] = self.cloud
+                model = await self.controller.add_model(**add_model_args)
+                self.log.info("New model created, name={}".format(model_name))
+            else:
+                self.log.debug(
+                    "Model already exists in juju. Getting model {}".format(model_name)
+                )
+                model = await self.controller.get_model(model_name)
+                self.log.debug("Existing model in juju, name={}".format(model_name))
+
+            self.juju_models[model_name] = model
+            self.juju_observers[model_name] = JujuModelObserver(n2vc=self, model=model)
+            return model
+
+        except asyncio.CancelledError:
+            # do not turn a task cancellation into an N2VC error
+            raise
+        except Exception as e:
+            msg = "Cannot get model {}. Exception: {}".format(model_name, e)
+            self.log.error(msg)
+            raise N2VCException(msg) from e
+        finally:
+            self._creating_model = False
+
+    async def _juju_add_relation(
+        self,
+        model_name: str,
+        application_name_1: str,
+        application_name_2: str,
+        relation_1: str,
+        relation_2: str,
+    ):
+        """Add a relation between two application endpoints of a model.
+
+        :param str model_name: name of the juju model
+        :param str application_name_1: first application
+        :param str application_name_2: second application
+        :param str relation_1: relation name of the first application
+        :param str relation_2: relation name of the second application
+        :raises JujuAPIError: any API error other than "not found" or
+            "already exists"
+        """
+
+        model = await self._juju_get_model(model_name=model_name)
+
+        endpoint_a = "{}:{}".format(application_name_1, relation_1)
+        endpoint_b = "{}:{}".format(application_name_2, relation_2)
+
+        self.log.debug("adding relation: {} -> {}".format(endpoint_a, endpoint_b))
+        try:
+            await model.add_relation(relation1=endpoint_a, relation2=endpoint_b)
+        except JujuAPIError as e:
+            # A missing application or an already existing relation is not
+            # treated as a failure: the operation ends silently.
+            tolerated = ("not found", "already exists")
+            if any(fragment in e.message for fragment in tolerated):
+                return
+            # any other exception is propagated
+            raise e
+
+    async def _juju_destroy_application(self, model_name: str, application_name: str):
+        """Unregister an application from the observer and destroy it.
+
+        Only a debug message is written when the application is not found.
+
+        :param str model_name: name of the juju model
+        :param str application_name: name of the application to destroy
+        """
+
+        self.log.debug(
+            "Destroying application {} in model {}".format(application_name, model_name)
+        )
+
+        juju_model = await self._juju_get_model(model_name=model_name)
+        model_observer = self.juju_observers[model_name]
+
+        target = juju_model.applications.get(application_name)
+        if not target:
+            self.log.debug("Application not found: {}".format(application_name))
+            return
+
+        model_observer.unregister_application(application_name)
+        await target.destroy()
+
+    async def _juju_destroy_machine(
+        self, model_name: str, machine_id: str, total_timeout: float = None
+    ):
+        """Unregister a machine and, if it is a manual one, destroy it.
+
+        :param str model_name: name of the juju model
+        :param str machine_id: id of the machine in the model
+        :param float total_timeout: seconds to wait for the machine to leave
+            the model (3600 when None)
+        """
+
+        self.log.debug(
+            "Destroying machine {} in model {}".format(machine_id, model_name)
+        )
+
+        if total_timeout is None:
+            total_timeout = 3600
+
+        # get juju model and observer
+        model = await self._juju_get_model(model_name=model_name)
+        observer = self.juju_observers[model_name]
+
+        machines = await model.get_machines()
+        if machine_id not in machines:
+            self.log.debug("Machine not found: {}".format(machine_id))
+            return
+
+        machine = model.machines[machine_id]
+        observer.unregister_machine(machine_id)
+
+        # TODO: change this by machine.is_manual when this is upstreamed:
+        # https://github.com/juju/python-libjuju/pull/396
+        is_manual = "instance-id" in machine.safe_data and machine.safe_data[
+            "instance-id"
+        ].startswith("manual:")
+        if not is_manual:
+            # NOTE(review): non-manual machines are only unregistered here,
+            # they are not destroyed; confirm this is intended.
+            return
+
+        self.log.debug("machine.destroy(force=True) started.")
+        await machine.destroy(force=True)
+        self.log.debug("machine.destroy(force=True) passed.")
+
+        # wait for machine removal, up to the maximum timeout
+        end = time.time() + total_timeout
+        machines = await model.get_machines()
+        while machine_id in machines and time.time() < end:
+            self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
+            await asyncio.sleep(0.5)
+            machines = await model.get_machines()
+
+        # the loop also ends on timeout: report what really happened
+        if machine_id in machines:
+            self.log.warning(
+                "Timeout waiting for machine {} to be destroyed".format(machine_id)
+            )
+        else:
+            self.log.debug("Machine destroyed: {}".format(machine_id))
+
+    async def _juju_destroy_model(self, model_name: str, total_timeout: float = None):
+        """Destroy a model together with its applications and machines.
+
+        Errors destroying individual applications or machines are logged
+        and do not stop the model from being destroyed.
+
+        :param str model_name: name of the juju model
+        :param float total_timeout: seconds allowed for the whole operation
+            (3600 when None)
+        :raises N2VCNotFound: no model object could be obtained
+        :raises N2VCException: the model is still listed after the timeout
+        """
+
+        self.log.debug("Destroying model {}".format(model_name))
+
+        if total_timeout is None:
+            total_timeout = 3600
+        end = time.time() + total_timeout
+
+        model = await self._juju_get_model(model_name=model_name)
+
+        if not model:
+            raise N2VCNotFound(message="Model {} does not exist".format(model_name))
+
+        uuid = model.info.uuid
+
+        # destroy applications (iterate a snapshot: destroying them may
+        # change the model's application collection while we loop)
+        for application_name in list(model.applications):
+            try:
+                await self._juju_destroy_application(
+                    model_name=model_name, application_name=application_name
+                )
+            except asyncio.CancelledError:
+                # a cancellation must stop the operation, as in the loops below
+                raise
+            except Exception as e:
+                self.log.error(
+                    "Error destroying application {} in model {}: {}".format(
+                        application_name, model_name, e
+                    )
+                )
+
+        # destroy machines
+        machines = await model.get_machines()
+        for machine_id in machines:
+            try:
+                await self._juju_destroy_machine(
+                    model_name=model_name, machine_id=machine_id
+                )
+            except asyncio.CancelledError:
+                raise
+            except Exception as e:
+                # best effort: keep going, but leave a trace of the failure
+                self.log.debug(
+                    "Ignoring error destroying machine {} in model {}: {}".format(
+                        machine_id, model_name, e
+                    )
+                )
+
+        await self._juju_disconnect_model(model_name=model_name)
+
+        self.log.debug("destroying model {}...".format(model_name))
+        await self.controller.destroy_model(uuid)
+
+        # wait until the model is no longer listed by the controller
+        self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
+        last_exception = ""
+        while time.time() < end:
+            try:
+                models = await self.controller.list_models()
+                if model_name not in models:
+                    self.log.debug(
+                        "The model {} ({}) was destroyed".format(model_name, uuid)
+                    )
+                    return
+            except asyncio.CancelledError:
+                raise
+            except Exception as e:
+                # remembered only to enrich the timeout message
+                last_exception = e
+            await asyncio.sleep(5)
+        raise N2VCException(
+            "Timeout waiting for model {} to be destroyed {}".format(
+                model_name, last_exception
+            )
+        )
+
+    async def _juju_login(self):
+        """Connect to juju controller
+
+        Does nothing when already authenticated. If another task is
+        connecting, waits for it to finish before checking again.
+
+        :raises N2VCConnectionException: the connection could not be made
+        """
+
+        # if already authenticated, exit function
+        if self._authenticated:
+            return
+
+        # if connecting, wait for finish
+        # another task could be trying to connect in parallel
+        while self._connecting:
+            await asyncio.sleep(0.1)
+
+        # double check after other task has finished
+        if self._authenticated:
+            return
+
+        try:
+            self._connecting = True
+            # the secret is masked: no part of the password goes to the log
+            self.log.info(
+                "connecting to juju controller: {} {}:{}{}".format(
+                    self.url,
+                    self.username,
+                    "********",
+                    " with ca_cert" if self.ca_cert else "",
+                )
+            )
+
+            # Create controller object
+            self.controller = Controller(loop=self.loop)
+            # Connect to controller
+            await self.controller.connect(
+                endpoint=self.url,
+                username=self.username,
+                password=self.secret,
+                cacert=self.ca_cert,
+            )
+            self._authenticated = True
+            self.log.info("juju controller connected")
+        except asyncio.CancelledError:
+            # do not report a task cancellation as a connection error
+            raise
+        except Exception as e:
+            message = "Exception connecting to juju: {}".format(e)
+            self.log.error(message)
+            raise N2VCConnectionException(message=message, url=self.url) from e
+        finally:
+            self._connecting = False
+
+    async def _juju_logout(self):
+        """Logout of the Juju controller.
+
+        Disconnects every model in self.juju_models, then the controller.
+
+        :returns: False when not authenticated, otherwise None
+        :raises N2VCConnectionException: the controller disconnect failed
+        """
+        if not self._authenticated:
+            return False
+
+        # disconnect all models; iterate over a snapshot of the names because
+        # the dictionary may be modified by other tasks while we await
+        for model_name in list(self.juju_models):
+            try:
+                await self._juju_disconnect_model(model_name)
+            except Exception as e:
+                self.log.error(
+                    "Error disconnecting model {} : {}".format(model_name, e)
+                )
+                # continue with next model...
+
+        self.log.info("Disconnecting controller")
+        try:
+            await self.controller.disconnect()
+        except Exception as e:
+            raise N2VCConnectionException(
+                message="Error disconnecting controller: {}".format(e), url=self.url
+            )
+
+        self.controller = None
+        self._authenticated = False
+        self.log.info("disconnected")
+
+    async def _juju_disconnect_model(self, model_name: str):
+        """Disconnect a model and clear its cached model and observer.
+
+        The entries in self.juju_models and self.juju_observers are kept
+        and set to None. A warning is logged for an unknown model name.
+
+        :param str model_name: key of the model in self.juju_models
+        """
+        self.log.debug("Disconnecting model {}".format(model_name))
+        if model_name not in self.juju_models:
+            # the logger lives in self.log; the connector has no warning()
+            self.log.warning("Cannot disconnect model: {}".format(model_name))
+            return
+
+        model = self.juju_models[model_name]
+        # the entry is None when the model was already disconnected
+        if model is not None:
+            await model.disconnect()
+        self.juju_models[model_name] = None
+        self.juju_observers[model_name] = None
+
     def _create_juju_public_key(self):
         """Recreate the Juju public key on lcm container, if needed
         Certain libjuju commands expect to be run from the same machine as Juju