Initial refactor of N2VC

- Add libjuju.py to encapsulate all code related with the communication using libjuju
- Replace juju_observer with juju_watcher
- Add utils with some utilities like EntityType, FinalStatus, and JujuStatusToOSM

Change-Id: I5d46ad65d6ed37b49bf2504209bfdd46d1b5a7e4
Signed-off-by: David Garcia <david.garcia@canonical.com>
diff --git a/n2vc/libjuju.py b/n2vc/libjuju.py
new file mode 100644
index 0000000..9945c91
--- /dev/null
+++ b/n2vc/libjuju.py
@@ -0,0 +1,806 @@
+# Copyright 2020 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+#     Unless required by applicable law or agreed to in writing, software
+#     distributed under the License is distributed on an "AS IS" BASIS,
+#     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#     See the License for the specific language governing permissions and
+#     limitations under the License.
+
+import asyncio
+import logging
+from juju.controller import Controller
+from juju.client.connector import NoConnectionException
+from juju.client import client
+import time
+
+from juju.errors import JujuAPIError
+from juju.model import Model
+from juju.machine import Machine
+from juju.application import Application
+from juju.client._definitions import FullStatus
+from n2vc.juju_watcher import JujuModelWatcher
+from n2vc.provisioner import AsyncSSHProvisioner
+from n2vc.n2vc_conn import N2VCConnector
+from n2vc.exceptions import (
+    JujuMachineNotFound,
+    JujuApplicationNotFound,
+    JujuModelAlreadyExists,
+    JujuControllerFailedConnecting,
+    JujuApplicationExists,
+)
+
+
+class Libjuju:
+    def __init__(
+        self,
+        endpoint: str,
+        api_proxy: str,
+        username: str,
+        password: str,
+        cacert: str,
+        loop: asyncio.AbstractEventLoop = None,
+        log: logging.Logger = None,
+        db: dict = None,
+        n2vc: N2VCConnector = None,
+        apt_mirror: str = None,
+        enable_os_upgrade: bool = True,
+    ):
+        """
+        Constructor
+
+        :param: endpoint:               Endpoint of the juju controller (host:port)
+        :param: api_proxy:              Endpoint of the juju controller - Reachable from the VNFs
+        :param: username:               Juju username
+        :param: password:               Juju password
+        :param: cacert:                 Juju CA Certificate
+        :param: loop:                   Asyncio loop
+        :param: log:                    Logger
+        :param: db:                     DB object
+        :param: n2vc:                   N2VC object
+        :param: apt_mirror:             APT Mirror
+        :param: enable_os_upgrade:      Enable OS Upgrade
+        """
+
+        self.endpoints = [endpoint]  # TODO: Store and get endpoints from DB
+        self.api_proxy = api_proxy
+        self.username = username
+        self.password = password
+        self.cacert = cacert
+        self.loop = loop or asyncio.get_event_loop()
+        self.log = log or logging.getLogger("Libjuju")
+        self.db = db
+        self.n2vc = n2vc
+
+        # Generate config for models
+        self.model_config = {}
+        if apt_mirror:
+            self.model_config["apt-mirror"] = apt_mirror
+        self.model_config["enable-os-refresh-update"] = enable_os_upgrade
+        self.model_config["enable-os-upgrade"] = enable_os_upgrade
+
+        self.reconnecting = asyncio.Lock(loop=self.loop)
+        self.creating_model = asyncio.Lock(loop=self.loop)
+
+        self.models = set()
+        self.controller = Controller(loop=self.loop)
+
+        self.loop.run_until_complete(self.connect())
+
+    async def connect(self):
+        """Connect to the controller"""
+
+        self.log.debug("Connecting from controller")
+        await self.controller.connect(
+            endpoint=self.endpoints,
+            username=self.username,
+            password=self.password,
+            cacert=self.cacert,
+        )
+        e = self.controller.connection().endpoint
+        self.log.info("Connected to controller: {}".format(e))
+
+    async def disconnect(self):
+        """Disconnect from controller"""
+
+        self.log.debug("Disconnecting from controller")
+        await self.controller.disconnect()
+        self.log.info("Disconnected from controller")
+
+    def controller_connected(self) -> bool:
+        """Check if the controller connection is open
+
+        :return: bool: True if connected, False if not connected
+        """
+
+        is_connected = False
+        try:
+            is_connected = self.controller.connection().is_open
+        except NoConnectionException:
+            self.log.warning("VCA not connected")
+        return is_connected
+
+    async def disconnect_model(self, model: Model):
+        """
+        Disconnect model
+
+        :param: model: Model that will be disconnected
+        """
+        try:
+            await model.disconnect()
+        except Exception:
+            pass
+
+    async def _reconnect(
+        self,
+        retry: bool = False,
+        timeout: int = 5,
+        time_between_retries: int = 3,
+        maximum_retries: int = 0,
+    ):
+        """
+        Reconnect to the controller
+
+        :param: retry:                  Set it to True to retry if the connection fails
+        :param: time_between_retries:   Time in seconds between retries
+        :param: maximum_retries         Maximum retries. If not set, it will retry forever
+
+        :raises: Exception if cannot connect to the controller
+        """
+
+        if self.reconnecting.locked():
+            # Return if another function is trying to reconnect
+            return
+        async with self.reconnecting:
+            attempt = 0
+            while True:
+                try:
+                    await asyncio.wait_for(self.connect(), timeout=timeout)
+                    break
+                except asyncio.TimeoutError:
+                    self.log.error("Error reconnecting to controller: Timeout")
+                except Exception as e:
+                    self.log.error("Error reconnecting to controller: {}".format(e))
+
+                attempt += 1
+                maximum_retries_reached = attempt == maximum_retries
+
+                if not retry or maximum_retries_reached:
+                    raise JujuControllerFailedConnecting("Controller is not connected")
+                else:
+                    await asyncio.sleep(time_between_retries)
+
+    async def add_model(self, model_name: str, cloud_name: str):
+        """
+        Create model
+
+        :param: model_name: Model name
+        :param: cloud_name: Cloud name
+        """
+
+        # Reconnect to the controller if not connected
+        if not self.controller_connected():
+            await self._reconnect()
+
+        # Raise exception if model already exists
+        if await self.model_exists(model_name):
+            raise JujuModelAlreadyExists("Model {} already exists.".format(model_name))
+
+        # Block until other workers have finished model creation
+        while self.creating_model.locked():
+            await asyncio.sleep(0.1)
+
+        # If the model exists, return it from the controller
+        if model_name in self.models:
+            return await self.get_model(model_name)
+
+        # Create the model
+        self.log.debug("Creating model {}".format(model_name))
+        async with self.creating_model:
+            model = await self.controller.add_model(
+                model_name,
+                config=self.model_config,
+                cloud_name=cloud_name,
+                credential_name=cloud_name,
+            )
+            await self.disconnect_model(model)
+            self.models.add(model_name)
+
+    async def get_model(self, model_name: str) -> Model:
+        """
+        Get model from controller
+
+        :param: model_name: Model name
+
+        :return: Model: The created Juju model object
+        """
+
+        # Check if controller is connected
+        if not self.controller_connected():
+            await self._reconnect()
+        return await self.controller.get_model(model_name)
+
+    async def model_exists(self, model_name: str) -> bool:
+        """
+        Check if model exists
+
+        :param: model_name: Model name
+
+        :return bool
+        """
+
+        # Check if controller is connected
+        if not self.controller_connected():
+            await self._reconnect()
+
+        return model_name in await self.controller.list_models()
+
+    async def get_model_status(self, model_name: str) -> FullStatus:
+        """
+        Get model status
+
+        :param: model_name: Model name
+
+        :return: Full status object
+        """
+        model = await self.get_model(model_name)
+        status = await model.get_status()
+        await self.disconnect_model(model)
+        return status
+
+    async def create_machine(
+        self,
+        model_name: str,
+        machine_id: str = None,
+        db_dict: dict = None,
+        progress_timeout: float = None,
+        total_timeout: float = None,
+        series: str = "xenial",
+    ) -> (Machine, bool):
+        """
+        Create machine
+
+        :param: model_name:         Model name
+        :param: machine_id:         Machine id
+        :param: db_dict:            Dictionary with data of the DB to write the updates
+        :param: progress_timeout:   Maximum time between two updates in the model
+        :param: total_timeout:      Timeout for the entity to be active
+
+        :return: (juju.machine.Machine, bool):  Machine object and a boolean saying
+                                                if the machine is new or it already existed
+        """
+        new = False
+        machine = None
+
+        self.log.debug(
+            "Creating machine (id={}) in model: {}".format(machine_id, model_name)
+        )
+
+        # Get model
+        model = await self.get_model(model_name)
+        try:
+            if machine_id is not None:
+                self.log.debug(
+                    "Searching machine (id={}) in model {}".format(
+                        machine_id, model_name
+                    )
+                )
+
+                # Get machines from model and get the machine with machine_id if exists
+                machines = await model.get_machines()
+                if machine_id in machines:
+                    self.log.debug(
+                        "Machine (id={}) found in model {}".format(
+                            machine_id, model_name
+                        )
+                    )
+                    machine = model.machines[machine_id]
+                else:
+                    raise JujuMachineNotFound("Machine {} not found".format(machine_id))
+
+            if machine is None:
+                self.log.debug("Creating a new machine in model {}".format(model_name))
+
+                # Create machine
+                machine = await model.add_machine(
+                    spec=None, constraints=None, disks=None, series=series
+                )
+                new = True
+
+                # Wait until the machine is ready
+                await JujuModelWatcher.wait_for(
+                    model=model,
+                    entity=machine,
+                    progress_timeout=progress_timeout,
+                    total_timeout=total_timeout,
+                    db_dict=db_dict,
+                    n2vc=self.n2vc,
+                )
+        except Exception as e:
+            raise e
+        finally:
+            await self.disconnect_model(model)
+
+        self.log.debug("Machine ready at {}".format(machine.dns_name))
+        return machine, new
+
+    async def provision_machine(
+        self,
+        model_name: str,
+        hostname: str,
+        username: str,
+        private_key_path: str,
+        db_dict: dict = None,
+        progress_timeout: float = None,
+        total_timeout: float = None,
+    ) -> str:
+        """
+        Manually provisioning of a machine
+
+        :param: model_name:         Model name
+        :param: hostname:           IP to access the machine
+        :param: username:           Username to login to the machine
+        :param: private_key_path:   Local path for the private key
+        :param: db_dict:            Dictionary with data of the DB to write the updates
+        :param: progress_timeout:   Maximum time between two updates in the model
+        :param: total_timeout:      Timeout for the entity to be active
+
+        :return: (Entity): Machine id
+        """
+        self.log.debug(
+            "Provisioning machine. model: {}, hostname: {}, username: {}".format(
+                model_name, hostname, username
+            )
+        )
+
+        # Get model
+        model = await self.get_model(model_name)
+
+        try:
+            # Get provisioner
+            provisioner = AsyncSSHProvisioner(
+                host=hostname,
+                user=username,
+                private_key_path=private_key_path,
+                log=self.log,
+            )
+
+            # Provision machine
+            params = await provisioner.provision_machine()
+
+            params.jobs = ["JobHostUnits"]
+
+            self.log.debug("Adding machine to model")
+            connection = model.connection()
+            client_facade = client.ClientFacade.from_connection(connection)
+
+            results = await client_facade.AddMachines(params=[params])
+            error = results.machines[0].error
+
+            if error:
+                msg = "Error adding machine: {}".format(error.message)
+                self.log.error(msg=msg)
+                raise ValueError(msg)
+
+            machine_id = results.machines[0].machine
+
+            self.log.debug("Installing Juju agent into machine {}".format(machine_id))
+            asyncio.ensure_future(
+                provisioner.install_agent(
+                    connection=connection,
+                    nonce=params.nonce,
+                    machine_id=machine_id,
+                    api=self.api_proxy,
+                )
+            )
+
+            machine = None
+            for _ in range(10):
+                machine_list = await model.get_machines()
+                if machine_id in machine_list:
+                    self.log.debug("Machine {} found in model!".format(machine_id))
+                    machine = model.machines.get(machine_id)
+                    break
+                await asyncio.sleep(2)
+
+            if machine is None:
+                msg = "Machine {} not found in model".format(machine_id)
+                self.log.error(msg=msg)
+                raise JujuMachineNotFound(msg)
+
+            await JujuModelWatcher.wait_for(
+                model=model,
+                entity=machine,
+                progress_timeout=progress_timeout,
+                total_timeout=total_timeout,
+                db_dict=db_dict,
+                n2vc=self.n2vc,
+            )
+        except Exception as e:
+            raise e
+        finally:
+            await self.disconnect_model(model)
+
+        self.log.debug("Machine provisioned {}".format(machine_id))
+
+        return machine_id
+
+    async def deploy_charm(
+        self,
+        application_name: str,
+        path: str,
+        model_name: str,
+        machine_id: str,
+        db_dict: dict = None,
+        progress_timeout: float = None,
+        total_timeout: float = None,
+        config: dict = None,
+        series: str = None,
+    ):
+        """Deploy charm
+
+        :param: application_name:   Application name
+        :param: path:               Local path to the charm
+        :param: model_name:         Model name
+        :param: machine_id          ID of the machine
+        :param: db_dict:            Dictionary with data of the DB to write the updates
+        :param: progress_timeout:   Maximum time between two updates in the model
+        :param: total_timeout:      Timeout for the entity to be active
+        :param: config:             Config for the charm
+        :param: series:             Series of the charm
+
+        :return: (juju.application.Application): Juju application
+        """
+
+        # Get model
+        model = await self.get_model(model_name)
+
+        try:
+            application = None
+            if application_name not in model.applications:
+                self.log.debug(
+                    "Deploying charm {} to machine {} in model ~{}".format(
+                        application_name, machine_id, model_name
+                    )
+                )
+                self.log.debug("charm: {}".format(path))
+                if machine_id is not None:
+                    if machine_id not in model.machines:
+                        msg = "Machine {} not found in model".format(machine_id)
+                        self.log.error(msg=msg)
+                        raise JujuMachineNotFound(msg)
+                    machine = model.machines[machine_id]
+                    series = machine.series
+
+                application = await model.deploy(
+                    entity_url=path,
+                    application_name=application_name,
+                    channel="stable",
+                    num_units=1,
+                    series=series,
+                    to=machine_id,
+                    config=config,
+                )
+
+                await JujuModelWatcher.wait_for(
+                    model=model,
+                    entity=application,
+                    progress_timeout=progress_timeout,
+                    total_timeout=total_timeout,
+                    db_dict=db_dict,
+                    n2vc=self.n2vc,
+                )
+            else:
+                raise JujuApplicationExists("Application {} exists".format(application_name))
+
+        except Exception as e:
+            raise e
+        finally:
+            await self.disconnect_model(model)
+
+        self.log.debug("application deployed")
+
+        return application
+
+    async def _get_application(
+        self, model: Model, application_name: str
+    ) -> Application:
+        """Get application
+
+        :param: model:              Model object
+        :param: application_name:   Application name
+
+        :return: juju.application.Application (or None if it doesn't exist)
+        """
+        if model.applications and application_name in model.applications:
+            return model.applications[application_name]
+
+    async def execute_action(
+        self,
+        application_name: str,
+        model_name: str,
+        action_name: str,
+        db_dict: dict = None,
+        progress_timeout: float = None,
+        total_timeout: float = None,
+        **kwargs
+    ):
+        """Execute action
+
+        :param: application_name:   Application name
+        :param: model_name:         Model name
+        :param: cloud_name:         Cloud name
+        :param: action_name:        Name of the action
+        :param: db_dict:            Dictionary with data of the DB to write the updates
+        :param: progress_timeout:   Maximum time between two updates in the model
+        :param: total_timeout:      Timeout for the entity to be active
+
+        :return: (str, str): (output and status)
+        """
+        # Get model and observer
+        model = await self.get_model(model_name)
+
+        try:
+            # Get application
+            application = await self._get_application(
+                model, application_name=application_name,
+            )
+            if application is None:
+                raise JujuApplicationNotFound("Cannot execute action")
+
+            # Get unit
+            unit = None
+            for u in application.units:
+                if await u.is_leader_from_status():
+                    unit = u
+            if unit is None:
+                raise Exception("Cannot execute action: leader unit not found")
+
+            actions = await application.get_actions()
+
+            if action_name not in actions:
+                raise Exception(
+                    "Action {} not in available actions".format(action_name)
+                )
+
+            self.log.debug(
+                "Executing action {} using params {}".format(action_name, kwargs)
+            )
+            action = await unit.run_action(action_name, **kwargs)
+
+            # Register action with observer and wait for it to finish
+            await JujuModelWatcher.wait_for(
+                model=model,
+                entity=action,
+                progress_timeout=progress_timeout,
+                total_timeout=total_timeout,
+                db_dict=db_dict,
+                n2vc=self.n2vc,
+            )
+            output = await model.get_action_output(action_uuid=action.entity_id)
+            status = await model.get_action_status(uuid_or_prefix=action.entity_id)
+            status = (
+                status[action.entity_id] if action.entity_id in status else "failed"
+            )
+
+            self.log.debug("action completed with status: {}".format(action.status))
+        except Exception as e:
+            raise e
+        finally:
+            await self.disconnect_model(model)
+
+        return output, status
+
+    async def get_actions(self, application_name: str, model_name: str) -> dict:
+        """Get list of actions
+
+        :param: application_name: Application name
+        :param: model_name: Model name
+
+        :return: Dict with this format
+            {
+                "action_name": "Description of the action",
+                ...
+            }
+        """
+
+        # Get model
+        model = await self.get_model(model_name)
+
+        # Get application
+        application = await self._get_application(
+            model, application_name=application_name,
+        )
+
+        # Get list of actions
+        actions = await application.get_actions()
+
+        # Disconnect from model
+        await self.disconnect_model(model)
+
+        return actions
+
+    async def add_relation(
+        self,
+        model_name: str,
+        application_name_1: str,
+        application_name_2: str,
+        relation_1: str,
+        relation_2: str,
+    ):
+        """Add relation
+
+        :param: model_name:             Model name
+        :param: application_name_1      First application name
+        :param: application_name_2:     Second application name
+        :param: relation_1:             First relation name
+        :param: relation_2:             Second relation name
+        """
+
+        # Get model
+        model = await self.get_model(model_name)
+
+        # Build relation strings
+        r1 = "{}:{}".format(application_name_1, relation_1)
+        r2 = "{}:{}".format(application_name_2, relation_2)
+
+        # Add relation
+        self.log.debug("Adding relation: {} -> {}".format(r1, r2))
+        try:
+            await model.add_relation(relation1=r1, relation2=r2)
+        except JujuAPIError as e:
+            if "not found" in e.message:
+                self.log.warning("Relation not found: {}".format(e.message))
+                return
+            if "already exists" in e.message:
+                self.log.warning("Relation already exists: {}".format(e.message))
+                return
+            # another exception, raise it
+            raise e
+        finally:
+            await self.disconnect_model(model)
+
+    async def destroy_model(
+        self, model_name: str, total_timeout: float,
+    ):
+        """
+        Destroy model
+
+        :param: model_name:     Model name
+        :param: total_timeout:  Timeout
+        """
+        model = await self.get_model(model_name)
+        uuid = model.info.uuid
+
+        # Destroy applications
+        for application_name in model.applications:
+            try:
+                await self.destroy_application(
+                    model, application_name=application_name,
+                )
+            except Exception as e:
+                self.log.error(
+                    "Error destroying application {} in model {}: {}".format(
+                        application_name, model_name, e
+                    )
+                )
+
+        # Destroy machines
+        machines = await model.get_machines()
+        for machine_id in machines:
+            try:
+                await self.destroy_machine(
+                    model, machine_id=machine_id, total_timeout=total_timeout,
+                )
+            except asyncio.CancelledError:
+                raise
+            except Exception:
+                pass
+
+        # Disconnect model
+        await self.disconnect_model(model)
+
+        # Destroy model
+        self.models.remove(model_name)
+        await self.controller.destroy_model(uuid)
+
+        # Wait until model is destroyed
+        self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
+        last_exception = ""
+
+        if total_timeout is None:
+            total_timeout = 3600
+        end = time.time() + total_timeout
+        while time.time() < end:
+            try:
+                models = await self.controller.list_models()
+                if model_name not in models:
+                    self.log.debug(
+                        "The model {} ({}) was destroyed".format(model_name, uuid)
+                    )
+                    return
+            except asyncio.CancelledError:
+                raise
+            except Exception as e:
+                last_exception = e
+            await asyncio.sleep(5)
+        raise Exception(
+            "Timeout waiting for model {} to be destroyed {}".format(
+                model_name, last_exception
+            )
+        )
+
+    async def destroy_application(self, model: Model, application_name: str):
+        """
+        Destroy application
+
+        :param: model:              Model object
+        :param: application_name:   Application name
+        """
+        self.log.debug(
+            "Destroying application {} in model {}".format(
+                application_name, model.info.name
+            )
+        )
+        application = model.applications.get(application_name)
+        if application:
+            await application.destroy()
+        else:
+            self.log.warning("Application not found: {}".format(application_name))
+
+    async def destroy_machine(
+        self, model: Model, machine_id: str, total_timeout: float = 3600
+    ):
+        """
+        Destroy machine
+
+        :param: model:          Model object
+        :param: machine_id:     Machine id
+        :param: total_timeout:  Timeout in seconds
+        """
+        machines = await model.get_machines()
+        if machine_id in machines:
+            machine = model.machines[machine_id]
+            # TODO: change this by machine.is_manual when this is upstreamed:
+            # https://github.com/juju/python-libjuju/pull/396
+            if "instance-id" in machine.safe_data and machine.safe_data[
+                "instance-id"
+            ].startswith("manual:"):
+                await machine.destroy(force=True)
+
+                # max timeout
+                end = time.time() + total_timeout
+
+                # wait for machine removal
+                machines = await model.get_machines()
+                while machine_id in machines and time.time() < end:
+                    self.log.debug(
+                        "Waiting for machine {} is destroyed".format(machine_id)
+                    )
+                    await asyncio.sleep(0.5)
+                    machines = await model.get_machines()
+                self.log.debug("Machine destroyed: {}".format(machine_id))
+        else:
+            self.log.debug("Machine not found: {}".format(machine_id))
+
+    async def configure_application(
+        self, model_name: str, application_name: str, config: dict = None
+    ):
+        """Configure application
+
+        :param: model_name:         Model name
+        :param: application_name:   Application name
+        :param: config:             Config to apply to the charm
+        """
+        if config:
+            model = await self.get_model(model_name)
+            application = await self._get_application(
+                model, application_name=application_name,
+            )
+            await application.set_config(config)
+            await self.disconnect_model(model)