Feature 8720: Add scale support
[osm/N2VC.git] / n2vc / libjuju.py
index 9945c91..b0e1358 100644 (file)
@@ -15,7 +15,6 @@
 import asyncio
 import logging
 from juju.controller import Controller
-from juju.client.connector import NoConnectionException
 from juju.client import client
 import time
 
@@ -34,6 +33,8 @@ from n2vc.exceptions import (
     JujuControllerFailedConnecting,
     JujuApplicationExists,
 )
+from n2vc.utils import DB_DATA
+from osm_common.dbbase import DbException
 
 
 class Libjuju:
@@ -67,14 +68,14 @@ class Libjuju:
         :param: enable_os_upgrade:      Enable OS Upgrade
         """
 
-        self.endpoints = [endpoint]  # TODO: Store and get endpoints from DB
+        self.log = log or logging.getLogger("Libjuju")
+        self.db = db
+        self.endpoints = self._get_api_endpoints_db() or [endpoint]
         self.api_proxy = api_proxy
         self.username = username
         self.password = password
         self.cacert = cacert
         self.loop = loop or asyncio.get_event_loop()
-        self.log = log or logging.getLogger("Libjuju")
-        self.db = db
         self.n2vc = n2vc
 
         # Generate config for models
@@ -84,46 +85,52 @@ class Libjuju:
         self.model_config["enable-os-refresh-update"] = enable_os_upgrade
         self.model_config["enable-os-upgrade"] = enable_os_upgrade
 
-        self.reconnecting = asyncio.Lock(loop=self.loop)
+        self.loop.set_exception_handler(self.handle_exception)
         self.creating_model = asyncio.Lock(loop=self.loop)
 
         self.models = set()
-        self.controller = Controller(loop=self.loop)
-
-        self.loop.run_until_complete(self.connect())
-
-    async def connect(self):
-        """Connect to the controller"""
-
-        self.log.debug("Connecting from controller")
-        await self.controller.connect(
-            endpoint=self.endpoints,
-            username=self.username,
-            password=self.password,
-            cacert=self.cacert,
-        )
-        e = self.controller.connection().endpoint
-        self.log.info("Connected to controller: {}".format(e))
-
-    async def disconnect(self):
-        """Disconnect from controller"""
-
-        self.log.debug("Disconnecting from controller")
-        await self.controller.disconnect()
-        self.log.info("Disconnected from controller")
+        self.log.debug("Libjuju initialized!")
 
-    def controller_connected(self) -> bool:
-        """Check if the controller connection is open
+        self.health_check_task = self.loop.create_task(self.health_check())
 
-        :return: bool: True if connected, False if not connected
+    async def get_controller(self, timeout: float = 5.0) -> Controller:
         """
+        Get controller
 
-        is_connected = False
+        :param: timeout: Time in seconds to wait for controller to connect
+        """
+        controller = None
         try:
-            is_connected = self.controller.connection().is_open
-        except NoConnectionException:
-            self.log.warning("VCA not connected")
-        return is_connected
+            controller = Controller(loop=self.loop)
+            await asyncio.wait_for(
+                controller.connect(
+                    endpoint=self.endpoints,
+                    username=self.username,
+                    password=self.password,
+                    cacert=self.cacert,
+                ),
+                timeout=timeout,
+            )
+            endpoints = await controller.api_endpoints
+            if self.endpoints != endpoints:
+                self.endpoints = endpoints
+                self._update_api_endpoints_db(self.endpoints)
+            return controller
+        except asyncio.CancelledError as e:
+            raise e
+        except Exception as e:
+            self.log.error(
+                "Failed connecting to controller: {}...".format(self.endpoints)
+            )
+            if controller:
+                await self.disconnect_controller(controller)
+            raise JujuControllerFailedConnecting(e)
+
+    async def disconnect(self):
+        """
+        Stop the background health check
+
+        Cancels the task created in __init__ for health_check() and logs it.
+        No controller or model connection is closed by this method itself.
+        """
+        task = self.health_check_task
+        task.cancel()
+        self.log.debug("Libjuju disconnected!")
 
     async def disconnect_model(self, model: Model):
         """
@@ -131,49 +138,15 @@ class Libjuju:
 
         :param: model: Model that will be disconnected
         """
-        try:
-            await model.disconnect()
-        except Exception:
-            pass
+        await model.disconnect()
 
-    async def _reconnect(
-        self,
-        retry: bool = False,
-        timeout: int = 5,
-        time_between_retries: int = 3,
-        maximum_retries: int = 0,
-    ):
+    async def disconnect_controller(self, controller: Controller):
         """
-        Reconnect to the controller
-
-        :param: retry:                  Set it to True to retry if the connection fails
-        :param: time_between_retries:   Time in seconds between retries
-        :param: maximum_retries         Maximum retries. If not set, it will retry forever
+        Disconnect controller
 
-        :raises: Exception if cannot connect to the controller
+        :param: controller: Controller that will be disconnected
         """
-
-        if self.reconnecting.locked():
-            # Return if another function is trying to reconnect
-            return
-        async with self.reconnecting:
-            attempt = 0
-            while True:
-                try:
-                    await asyncio.wait_for(self.connect(), timeout=timeout)
-                    break
-                except asyncio.TimeoutError:
-                    self.log.error("Error reconnecting to controller: Timeout")
-                except Exception as e:
-                    self.log.error("Error reconnecting to controller: {}".format(e))
-
-                attempt += 1
-                maximum_retries_reached = attempt == maximum_retries
-
-                if not retry or maximum_retries_reached:
-                    raise JujuControllerFailedConnecting("Controller is not connected")
-                else:
-                    await asyncio.sleep(time_between_retries)
+        await controller.disconnect()
 
     async def add_model(self, model_name: str, cloud_name: str):
         """
@@ -183,62 +156,76 @@ class Libjuju:
         :param: cloud_name: Cloud name
         """
 
-        # Reconnect to the controller if not connected
-        if not self.controller_connected():
-            await self._reconnect()
-
-        # Raise exception if model already exists
-        if await self.model_exists(model_name):
-            raise JujuModelAlreadyExists("Model {} already exists.".format(model_name))
+        # Get controller
+        controller = await self.get_controller()
+        model = None
+        try:
+            # Raise exception if model already exists
+            if await self.model_exists(model_name, controller=controller):
+                raise JujuModelAlreadyExists(
+                    "Model {} already exists.".format(model_name)
+                )
 
-        # Block until other workers have finished model creation
-        while self.creating_model.locked():
-            await asyncio.sleep(0.1)
+            # Block until other workers have finished model creation
+            while self.creating_model.locked():
+                await asyncio.sleep(0.1)
 
-        # If the model exists, return it from the controller
-        if model_name in self.models:
-            return await self.get_model(model_name)
+            # If the model exists, return it from the controller
+            if model_name in self.models:
+                return
 
-        # Create the model
-        self.log.debug("Creating model {}".format(model_name))
-        async with self.creating_model:
-            model = await self.controller.add_model(
-                model_name,
-                config=self.model_config,
-                cloud_name=cloud_name,
-                credential_name=cloud_name,
-            )
-            await self.disconnect_model(model)
-            self.models.add(model_name)
+            # Create the model
+            async with self.creating_model:
+                self.log.debug("Creating model {}".format(model_name))
+                model = await controller.add_model(
+                    model_name,
+                    config=self.model_config,
+                    cloud_name=cloud_name,
+                    credential_name=cloud_name,
+                )
+                self.models.add(model_name)
+        finally:
+            if model:
+                await self.disconnect_model(model)
+            await self.disconnect_controller(controller)
 
-    async def get_model(self, model_name: str) -> Model:
+    async def get_model(
+        self, controller: Controller, model_name: str, id=None
+    ) -> Model:
         """
         Get model from controller
 
+        :param: controller: Controller
         :param: model_name: Model name
 
         :return: Model: The created Juju model object
         """
+        return await controller.get_model(model_name)
 
-        # Check if controller is connected
-        if not self.controller_connected():
-            await self._reconnect()
-        return await self.controller.get_model(model_name)
-
-    async def model_exists(self, model_name: str) -> bool:
+    async def model_exists(
+        self, model_name: str, controller: Controller = None
+    ) -> bool:
         """
         Check if model exists
 
+        :param: controller: Controller
         :param: model_name: Model name
 
         :return bool
         """
+        need_to_disconnect = False
 
-        # Check if controller is connected
-        if not self.controller_connected():
-            await self._reconnect()
+        # Get controller if not passed
+        if not controller:
+            controller = await self.get_controller()
+            need_to_disconnect = True
 
-        return model_name in await self.controller.list_models()
+        # Check if model exists
+        try:
+            return model_name in await controller.list_models()
+        finally:
+            if need_to_disconnect:
+                await self.disconnect_controller(controller)
 
     async def get_model_status(self, model_name: str) -> FullStatus:
         """
@@ -248,10 +235,13 @@ class Libjuju:
 
         :return: Full status object
         """
-        model = await self.get_model(model_name)
-        status = await model.get_status()
-        await self.disconnect_model(model)
-        return status
+        controller = await self.get_controller()
+        model = await self.get_model(controller, model_name)
+        try:
+            return await model.get_status()
+        finally:
+            await self.disconnect_model(model)
+            await self.disconnect_controller(controller)
 
     async def create_machine(
         self,
@@ -261,6 +251,7 @@ class Libjuju:
         progress_timeout: float = None,
         total_timeout: float = None,
         series: str = "xenial",
+        wait: bool = True,
     ) -> (Machine, bool):
         """
         Create machine
@@ -270,6 +261,8 @@ class Libjuju:
         :param: db_dict:            Dictionary with data of the DB to write the updates
         :param: progress_timeout:   Maximum time between two updates in the model
         :param: total_timeout:      Timeout for the entity to be active
+        :param: series:             Series of the machine (xenial, bionic, focal, ...)
+        :param: wait:               Wait until machine is ready
 
         :return: (juju.machine.Machine, bool):  Machine object and a boolean saying
                                                 if the machine is new or it already existed
@@ -281,8 +274,11 @@ class Libjuju:
             "Creating machine (id={}) in model: {}".format(machine_id, model_name)
         )
 
+        # Get controller
+        controller = await self.get_controller()
+
         # Get model
-        model = await self.get_model(model_name)
+        model = await self.get_model(controller, model_name)
         try:
             if machine_id is not None:
                 self.log.debug(
@@ -313,20 +309,29 @@ class Libjuju:
                 new = True
 
                 # Wait until the machine is ready
-                await JujuModelWatcher.wait_for(
-                    model=model,
-                    entity=machine,
-                    progress_timeout=progress_timeout,
-                    total_timeout=total_timeout,
-                    db_dict=db_dict,
-                    n2vc=self.n2vc,
+                self.log.debug(
+                    "Wait until machine {} is ready in model {}".format(
+                        machine.entity_id, model_name
+                    )
                 )
-        except Exception as e:
-            raise e
+                if wait:
+                    await JujuModelWatcher.wait_for(
+                        model=model,
+                        entity=machine,
+                        progress_timeout=progress_timeout,
+                        total_timeout=total_timeout,
+                        db_dict=db_dict,
+                        n2vc=self.n2vc,
+                    )
         finally:
             await self.disconnect_model(model)
+            await self.disconnect_controller(controller)
 
-        self.log.debug("Machine ready at {}".format(machine.dns_name))
+        self.log.debug(
+            "Machine {} ready at {} in model {}".format(
+                machine.entity_id, machine.dns_name, model_name
+            )
+        )
         return machine, new
 
     async def provision_machine(
@@ -358,8 +363,11 @@ class Libjuju:
             )
         )
 
+        # Get controller
+        controller = await self.get_controller()
+
         # Get model
-        model = await self.get_model(model_name)
+        model = await self.get_model(controller, model_name)
 
         try:
             # Get provisioner
@@ -413,6 +421,11 @@ class Libjuju:
                 self.log.error(msg=msg)
                 raise JujuMachineNotFound(msg)
 
+            self.log.debug(
+                "Wait until machine {} is ready in model {}".format(
+                    machine.entity_id, model_name
+                )
+            )
             await JujuModelWatcher.wait_for(
                 model=model,
                 entity=machine,
@@ -425,8 +438,11 @@ class Libjuju:
             raise e
         finally:
             await self.disconnect_model(model)
+            await self.disconnect_controller(controller)
 
-        self.log.debug("Machine provisioned {}".format(machine_id))
+        self.log.debug(
+            "Machine provisioned {} in model {}".format(machine_id, model_name)
+        )
 
         return machine_id
 
@@ -441,6 +457,7 @@ class Libjuju:
         total_timeout: float = None,
         config: dict = None,
         series: str = None,
+        num_units: int = 1,
     ):
         """Deploy charm
 
@@ -453,22 +470,27 @@ class Libjuju:
         :param: total_timeout:      Timeout for the entity to be active
         :param: config:             Config for the charm
         :param: series:             Series of the charm
+        :param: num_units:          Number of units
 
         :return: (juju.application.Application): Juju application
         """
+        self.log.debug(
+            "Deploying charm {} to machine {} in model ~{}".format(
+                application_name, machine_id, model_name
+            )
+        )
+        self.log.debug("charm: {}".format(path))
+
+        # Get controller
+        controller = await self.get_controller()
 
         # Get model
-        model = await self.get_model(model_name)
+        model = await self.get_model(controller, model_name)
 
         try:
             application = None
             if application_name not in model.applications:
-                self.log.debug(
-                    "Deploying charm {} to machine {} in model ~{}".format(
-                        application_name, machine_id, model_name
-                    )
-                )
-                self.log.debug("charm: {}".format(path))
+
                 if machine_id is not None:
                     if machine_id not in model.machines:
                         msg = "Machine {} not found in model".format(machine_id)
@@ -487,6 +509,16 @@ class Libjuju:
                     config=config,
                 )
 
+                self.log.debug(
+                    "Wait until application {} is ready in model {}".format(
+                        application_name, model_name
+                    )
+                )
+                if num_units > 1:
+                    for _ in range(num_units - 1):
+                        m, _ = await self.create_machine(model_name, wait=False)
+                        await application.add_unit(to=m.entity_id)
+
                 await JujuModelWatcher.wait_for(
                     model=model,
                     entity=application,
@@ -495,21 +527,22 @@ class Libjuju:
                     db_dict=db_dict,
                     n2vc=self.n2vc,
                 )
+                self.log.debug(
+                    "Application {} is ready in model {}".format(
+                        application_name, model_name
+                    )
+                )
             else:
-                raise JujuApplicationExists("Application {} exists".format(application_name))
-
-        except Exception as e:
-            raise e
+                raise JujuApplicationExists(
+                    "Application {} exists".format(application_name)
+                )
         finally:
             await self.disconnect_model(model)
-
-        self.log.debug("application deployed")
+            await self.disconnect_controller(controller)
 
         return application
 
-    async def _get_application(
-        self, model: Model, application_name: str
-    ) -> Application:
+    def _get_application(self, model: Model, application_name: str) -> Application:
         """Get application
 
         :param: model:              Model object
@@ -542,12 +575,18 @@ class Libjuju:
 
         :return: (str, str): (output and status)
         """
-        # Get model and observer
-        model = await self.get_model(model_name)
+        self.log.debug(
+            "Executing action {} using params {}".format(action_name, kwargs)
+        )
+        # Get controller
+        controller = await self.get_controller()
+
+        # Get model
+        model = await self.get_model(controller, model_name)
 
         try:
             # Get application
-            application = await self._get_application(
+            application = self._get_application(
                 model, application_name=application_name,
             )
             if application is None:
@@ -568,12 +607,13 @@ class Libjuju:
                     "Action {} not in available actions".format(action_name)
                 )
 
-            self.log.debug(
-                "Executing action {} using params {}".format(action_name, kwargs)
-            )
             action = await unit.run_action(action_name, **kwargs)
 
-            # Register action with observer and wait for it to finish
+            self.log.debug(
+                "Wait until action {} is completed in application {} (model={})".format(
+                    action_name, application_name, model_name
+                )
+            )
             await JujuModelWatcher.wait_for(
                 model=model,
                 entity=action,
@@ -582,17 +622,23 @@ class Libjuju:
                 db_dict=db_dict,
                 n2vc=self.n2vc,
             )
+
             output = await model.get_action_output(action_uuid=action.entity_id)
             status = await model.get_action_status(uuid_or_prefix=action.entity_id)
             status = (
                 status[action.entity_id] if action.entity_id in status else "failed"
             )
 
-            self.log.debug("action completed with status: {}".format(action.status))
+            self.log.debug(
+                "Action {} completed with status {} in application {} (model={})".format(
+                    action_name, action.status, application_name, model_name
+                )
+            )
         except Exception as e:
             raise e
         finally:
             await self.disconnect_model(model)
+            await self.disconnect_controller(controller)
 
         return output, status
 
@@ -608,22 +654,29 @@ class Libjuju:
                 ...
             }
         """
+        self.log.debug(
+            "Getting list of actions for application {}".format(application_name)
+        )
 
-        # Get model
-        model = await self.get_model(model_name)
+        # Get controller
+        controller = await self.get_controller()
 
-        # Get application
-        application = await self._get_application(
-            model, application_name=application_name,
-        )
+        # Get model
+        model = await self.get_model(controller, model_name)
 
-        # Get list of actions
-        actions = await application.get_actions()
+        try:
+            # Get application
+            application = self._get_application(
+                model, application_name=application_name,
+            )
 
-        # Disconnect from model
-        await self.disconnect_model(model)
+            # Return list of actions
+            return await application.get_actions()
 
-        return actions
+        finally:
+            # Disconnect from model and controller
+            await self.disconnect_model(model)
+            await self.disconnect_controller(controller)
 
     async def add_relation(
         self,
@@ -642,15 +695,19 @@ class Libjuju:
         :param: relation_2:             Second relation name
         """
 
+        self.log.debug("Adding relation: {} -> {}".format(relation_1, relation_2))
+
+        # Get controller
+        controller = await self.get_controller()
+
         # Get model
-        model = await self.get_model(model_name)
+        model = await self.get_model(controller, model_name)
 
         # Build relation strings
         r1 = "{}:{}".format(application_name_1, relation_1)
         r2 = "{}:{}".format(application_name_2, relation_2)
 
         # Add relation
-        self.log.debug("Adding relation: {} -> {}".format(r1, r2))
         try:
             await model.add_relation(relation1=r1, relation2=r2)
         except JujuAPIError as e:
@@ -664,76 +721,83 @@ class Libjuju:
             raise e
         finally:
             await self.disconnect_model(model)
+            await self.disconnect_controller(controller)
 
-    async def destroy_model(
-        self, model_name: str, total_timeout: float,
-    ):
+    async def destroy_model(self, model_name: str, total_timeout: float):
         """
         Destroy model
 
         :param: model_name:     Model name
         :param: total_timeout:  Timeout
         """
-        model = await self.get_model(model_name)
-        uuid = model.info.uuid
 
-        # Destroy applications
-        for application_name in model.applications:
-            try:
-                await self.destroy_application(
-                    model, application_name=application_name,
-                )
-            except Exception as e:
-                self.log.error(
-                    "Error destroying application {} in model {}: {}".format(
-                        application_name, model_name, e
+        controller = await self.get_controller()
+        model = await self.get_model(controller, model_name)
+        try:
+            self.log.debug("Destroying model {}".format(model_name))
+            uuid = model.info.uuid
+
+            # Destroy applications
+            for application_name in model.applications:
+                try:
+                    await self.destroy_application(
+                        model, application_name=application_name,
+                    )
+                except Exception as e:
+                    self.log.error(
+                        "Error destroying application {} in model {}: {}".format(
+                            application_name, model_name, e
+                        )
                     )
-                )
 
-        # Destroy machines
-        machines = await model.get_machines()
-        for machine_id in machines:
-            try:
-                await self.destroy_machine(
-                    model, machine_id=machine_id, total_timeout=total_timeout,
-                )
-            except asyncio.CancelledError:
-                raise
-            except Exception:
-                pass
-
-        # Disconnect model
-        await self.disconnect_model(model)
-
-        # Destroy model
-        self.models.remove(model_name)
-        await self.controller.destroy_model(uuid)
-
-        # Wait until model is destroyed
-        self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
-        last_exception = ""
-
-        if total_timeout is None:
-            total_timeout = 3600
-        end = time.time() + total_timeout
-        while time.time() < end:
-            try:
-                models = await self.controller.list_models()
-                if model_name not in models:
-                    self.log.debug(
-                        "The model {} ({}) was destroyed".format(model_name, uuid)
+            # Destroy machines
+            machines = await model.get_machines()
+            for machine_id in machines:
+                try:
+                    await self.destroy_machine(
+                        model, machine_id=machine_id, total_timeout=total_timeout,
                     )
-                    return
-            except asyncio.CancelledError:
-                raise
-            except Exception as e:
-                last_exception = e
-            await asyncio.sleep(5)
-        raise Exception(
-            "Timeout waiting for model {} to be destroyed {}".format(
-                model_name, last_exception
+                except asyncio.CancelledError:
+                    raise
+                except Exception:
+                    pass
+
+            # Disconnect model
+            await self.disconnect_model(model)
+
+            # Destroy model
+            if model_name in self.models:
+                self.models.remove(model_name)
+
+            await controller.destroy_model(uuid)
+
+            # Wait until model is destroyed
+            self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
+            last_exception = ""
+
+            if total_timeout is None:
+                total_timeout = 3600
+            end = time.time() + total_timeout
+            while time.time() < end:
+                try:
+                    models = await controller.list_models()
+                    if model_name not in models:
+                        self.log.debug(
+                            "The model {} ({}) was destroyed".format(model_name, uuid)
+                        )
+                        return
+                except asyncio.CancelledError:
+                    raise
+                except Exception as e:
+                    last_exception = e
+                await asyncio.sleep(5)
+            raise Exception(
+                "Timeout waiting for model {} to be destroyed {}".format(
+                    model_name, last_exception
+                )
             )
-        )
+        finally:
+            await self.disconnect_controller(controller)
 
     async def destroy_application(self, model: Model, application_name: str):
         """
@@ -797,10 +861,86 @@ class Libjuju:
         :param: application_name:   Application name
         :param: config:             Config to apply to the charm
         """
+        self.log.debug("Configuring application {}".format(application_name))
+
         if config:
-            model = await self.get_model(model_name)
-            application = await self._get_application(
-                model, application_name=application_name,
-            )
-            await application.set_config(config)
-            await self.disconnect_model(model)
+            try:
+                controller = await self.get_controller()
+                model = await self.get_model(controller, model_name)
+                application = self._get_application(
+                    model, application_name=application_name,
+                )
+                await application.set_config(config)
+            finally:
+                await self.disconnect_model(model)
+                await self.disconnect_controller(controller)
+
+    def _get_api_endpoints_db(self) -> [str]:
+        """
+        Read the stored API endpoints from the database
+
+        :return: Value stored under the endpoints key, or None when no record
+                 is found or the record does not contain that key
+        """
+        self.log.debug("Getting endpoints from database")
+
+        spec = DB_DATA.api_endpoints
+        record = self.db.get_one(
+            spec.table, q_filter=spec.filter, fail_on_empty=False,
+        )
+        # Nothing stored yet (or record without endpoints): caller falls back
+        if not record or spec.key not in record:
+            return None
+        return record[spec.key]
+
+    def _update_api_endpoints_db(self, endpoints: [str]):
+        """
+        Store the API endpoints in the database
+
+        The record is created first when it cannot be found; afterwards the
+        endpoints are written under the endpoints key of that record.
+
+        :param: endpoints: List of endpoints to store
+
+        :raises: DbException if creating the record fails and the record still
+                 cannot be found afterwards
+        """
+        self.log.debug("Saving endpoints {} in database".format(endpoints))
+
+        spec = DB_DATA.api_endpoints
+
+        def record_exists() -> bool:
+            # Same lookup is needed before creating and after a failed create
+            found = self.db.get_one(
+                spec.table, q_filter=spec.filter, fail_on_empty=False,
+            )
+            return bool(found)
+
+        if not record_exists():
+            try:
+                self.db.create(spec.table, spec.filter)
+            except DbException as e:
+                # Racing condition: another N2VC worker may have created the
+                # record in the meantime; only fail if it is still missing
+                if not record_exists():
+                    raise e
+
+        self.db.set_one(spec.table, spec.filter, {spec.key: endpoints})
+
+    def handle_exception(self, loop, context):
+        """
+        Event loop exception handler that ignores whatever it receives
+
+        Installed in __init__ through loop.set_exception_handler, so anything
+        reaching the loop's handler (e.g. exceptions left unhandled by libjuju)
+        is dropped without being logged.
+
+        :param: loop:       Event loop passed to the handler (unused)
+        :param: context:    Context passed to the handler (unused)
+        """
+        return None
+
+    async def health_check(self, interval: float = 300.0):
+        """
+        Health check to make sure the controller connection is OK
+
+        Runs until the task is cancelled. Every round opens a controller
+        connection, closes it again and then sleeps. A failed connection
+        attempt is logged and the loop carries on with the next round.
+
+        :param: interval: Time in seconds between checks
+        """
+        while True:
+            # Reset on every round: if get_controller() fails, the name must not
+            # be unbound (first round) nor point at the previous round's
+            # already-disconnected controller when the finally clause runs
+            controller = None
+            try:
+                controller = await self.get_controller()
+            except asyncio.CancelledError:
+                # Never swallow cancellation, whatever the Python version, so
+                # that disconnect() can actually stop this task
+                raise
+            except Exception as e:
+                self.log.error("Health check to VCA failed: {}".format(e))
+            finally:
+                if controller:
+                    await self.disconnect_controller(controller)
+            await asyncio.sleep(interval)