Fix 1539: Add --skip-repo option for Helm
[osm/N2VC.git] / n2vc / libjuju.py
index eb0fa72..0e221e2 100644 (file)
@@ -14,6 +14,7 @@
 
 import asyncio
 import logging
+import typing
 
 import time
 
@@ -32,7 +33,6 @@ from juju.controller import Controller
 from juju.client import client
 from juju import tag
 
-from n2vc.config import ModelConfig
 from n2vc.juju_watcher import JujuModelWatcher
 from n2vc.provisioner import AsyncSSHProvisioner
 from n2vc.n2vc_conn import N2VCConnector
@@ -44,11 +44,13 @@ from n2vc.exceptions import (
     JujuControllerFailedConnecting,
     JujuApplicationExists,
     JujuInvalidK8sConfiguration,
-    JujuError
+    JujuError,
 )
-from n2vc.utils import DB_DATA
-from osm_common.dbbase import DbException
+from n2vc.vca.cloud import Cloud as VcaCloud
+from n2vc.vca.connection import Connection
 from kubernetes.client.configuration import Configuration
+from retrying_async import retry
+
 
 RBAC_LABEL_KEY_NAME = "rbac-id"
 
@@ -56,63 +58,35 @@ RBAC_LABEL_KEY_NAME = "rbac-id"
 class Libjuju:
     def __init__(
         self,
-        endpoint: str,
-        api_proxy: str,
-        username: str,
-        password: str,
-        cacert: str,
+        vca_connection: Connection,
         loop: asyncio.AbstractEventLoop = None,
         log: logging.Logger = None,
-        db: dict = None,
         n2vc: N2VCConnector = None,
-        model_config: ModelConfig = {},
     ):
         """
         Constructor
 
-        :param: endpoint:               Endpoint of the juju controller (host:port)
-        :param: api_proxy:              Endpoint of the juju controller - Reachable from the VNFs
-        :param: username:               Juju username
-        :param: password:               Juju password
-        :param: cacert:                 Juju CA Certificate
+        :param: vca_connection:         n2vc.vca.connection object
         :param: loop:                   Asyncio loop
         :param: log:                    Logger
-        :param: db:                     DB object
         :param: n2vc:                   N2VC object
-        :param: apt_mirror:             APT Mirror
-        :param: enable_os_upgrade:      Enable OS Upgrade
         """
 
         self.log = log or logging.getLogger("Libjuju")
-        self.db = db
-        db_endpoints = self._get_api_endpoints_db()
-        self.endpoints = None
-        if (db_endpoints and endpoint not in db_endpoints) or not db_endpoints:
-            self.endpoints = [endpoint]
-            self._update_api_endpoints_db(self.endpoints)
-        else:
-            self.endpoints = db_endpoints
-        self.api_proxy = api_proxy
-        self.username = username
-        self.password = password
-        self.cacert = cacert
-        self.loop = loop or asyncio.get_event_loop()
         self.n2vc = n2vc
+        self.vca_connection = vca_connection
 
-        # Generate config for models
-        self.model_config = model_config
-
+        self.loop = loop or asyncio.get_event_loop()
         self.loop.set_exception_handler(self.handle_exception)
         self.creating_model = asyncio.Lock(loop=self.loop)
 
-        self.log.debug("Libjuju initialized!")
-
-        self.health_check_task = self._create_health_check_task()
+        if self.vca_connection.is_default:
+            self.health_check_task = self._create_health_check_task()
 
     def _create_health_check_task(self):
         return self.loop.create_task(self.health_check())
 
-    async def get_controller(self, timeout: float = 15.0) -> Controller:
+    async def get_controller(self, timeout: float = 60.0) -> Controller:
         """
         Get controller
 
@@ -123,23 +97,27 @@ class Libjuju:
             controller = Controller(loop=self.loop)
             await asyncio.wait_for(
                 controller.connect(
-                    endpoint=self.endpoints,
-                    username=self.username,
-                    password=self.password,
-                    cacert=self.cacert,
+                    endpoint=self.vca_connection.data.endpoints,
+                    username=self.vca_connection.data.user,
+                    password=self.vca_connection.data.secret,
+                    cacert=self.vca_connection.data.cacert,
                 ),
                 timeout=timeout,
             )
-            endpoints = await controller.api_endpoints
-            if self.endpoints != endpoints:
-                self.endpoints = endpoints
-                self._update_api_endpoints_db(self.endpoints)
+            if self.vca_connection.is_default:
+                endpoints = await controller.api_endpoints
+                if not all(
+                    endpoint in self.vca_connection.endpoints for endpoint in endpoints
+                ):
+                    await self.vca_connection.update_endpoints(endpoints)
             return controller
         except asyncio.CancelledError as e:
             raise e
         except Exception as e:
             self.log.error(
-                "Failed connecting to controller: {}...".format(self.endpoints)
+                "Failed connecting to controller: {}... {}".format(
+                    self.vca_connection.data.endpoints, e
+                )
             )
             if controller:
                 await self.disconnect_controller(controller)
@@ -168,14 +146,13 @@ class Libjuju:
         if controller:
             await controller.disconnect()
 
-    async def add_model(self, model_name: str, cloud_name: str, credential_name=None):
+    @retry(attempts=3, delay=5, timeout=None)
+    async def add_model(self, model_name: str, cloud: VcaCloud):
         """
         Create model
 
         :param: model_name: Model name
-        :param: cloud_name: Cloud name
-        :param: credential_name: Credential name to use for adding the model
-                                 If not specified, same name as the cloud will be used.
+        :param: cloud: Cloud object
         """
 
         # Get controller
@@ -193,10 +170,15 @@ class Libjuju:
                 self.log.debug("Creating model {}".format(model_name))
                 model = await controller.add_model(
                     model_name,
-                    config=self.model_config,
-                    cloud_name=cloud_name,
-                    credential_name=credential_name or cloud_name,
+                    config=self.vca_connection.data.model_config,
+                    cloud_name=cloud.name,
+                    credential_name=cloud.credential_name,
                 )
+        except JujuAPIError as e:
+            if "already exists" in e.message:
+                pass
+            else:
+                raise e
         finally:
             if model:
                 await self.disconnect_model(model)
@@ -221,25 +203,35 @@ class Libjuju:
                 actions.update(application_actions)
             # Get status of all actions
             for application_action in actions:
-                app_action_status_list = await model.get_action_status(name=application_action)
+                app_action_status_list = await model.get_action_status(
+                    name=application_action
+                )
                 for action_id, action_status in app_action_status_list.items():
-                    executed_action = {"id": action_id, "action": application_action,
-                                       "status": action_status}
+                    executed_action = {
+                        "id": action_id,
+                        "action": application_action,
+                        "status": action_status,
+                    }
                     # Get action output by id
                     action_status = await model.get_action_output(executed_action["id"])
                     for k, v in action_status.items():
                         executed_action[k] = v
                     executed_actions.append(executed_action)
         except Exception as e:
-            raise JujuError("Error in getting executed actions for model: {}. Error: {}"
-                            .format(model_name, str(e)))
+            raise JujuError(
+                "Error in getting executed actions for model: {}. Error: {}".format(
+                    model_name, str(e)
+                )
+            )
         finally:
             if model:
                 await self.disconnect_model(model)
             await self.disconnect_controller(controller)
         return executed_actions
 
-    async def get_application_configs(self, model_name: str, application_name: str) -> dict:
+    async def get_application_configs(
+        self, model_name: str, application_name: str
+    ) -> dict:
         """
         Get available configs for an application.
 
@@ -253,20 +245,24 @@ class Libjuju:
         controller = await self.get_controller()
         try:
             model = await self.get_model(controller, model_name)
-            application = self._get_application(model, application_name=application_name)
+            application = self._get_application(
+                model, application_name=application_name
+            )
             application_configs = await application.get_config()
         except Exception as e:
-            raise JujuError("Error in getting configs for application: {} in model: {}. Error: {}"
-                            .format(application_name, model_name, str(e)))
+            raise JujuError(
+                "Error in getting configs for application: {} in model: {}. Error: {}".format(
+                    application_name, model_name, str(e)
+                )
+            )
         finally:
             if model:
                 await self.disconnect_model(model)
             await self.disconnect_controller(controller)
         return application_configs
 
-    async def get_model(
-        self, controller: Controller, model_name: str, id=None
-    ) -> Model:
+    @retry(attempts=3, delay=5)
+    async def get_model(self, controller: Controller, model_name: str) -> Model:
         """
         Get model from controller
 
@@ -349,7 +345,7 @@ class Libjuju:
         db_dict: dict = None,
         progress_timeout: float = None,
         total_timeout: float = None,
-        series: str = "xenial",
+        series: str = "bionic",
         wait: bool = True,
     ) -> (Machine, bool):
         """
@@ -421,6 +417,7 @@ class Libjuju:
                         total_timeout=total_timeout,
                         db_dict=db_dict,
                         n2vc=self.n2vc,
+                        vca_id=self.vca_connection._vca_id,
                     )
         finally:
             await self.disconnect_model(model)
@@ -502,7 +499,7 @@ class Libjuju:
                     connection=connection,
                     nonce=params.nonce,
                     machine_id=machine_id,
-                    proxy=self.api_proxy,
+                    proxy=self.vca_connection.data.api_proxy,
                     series=params.series,
                 )
             )
@@ -533,6 +530,7 @@ class Libjuju:
                 total_timeout=total_timeout,
                 db_dict=db_dict,
                 n2vc=self.n2vc,
+                vca_id=self.vca_connection._vca_id,
             )
         except Exception as e:
             raise e
@@ -648,6 +646,7 @@ class Libjuju:
                     total_timeout=total_timeout,
                     db_dict=db_dict,
                     n2vc=self.n2vc,
+                    vca_id=self.vca_connection._vca_id,
                 )
                 self.log.debug(
                     "Application {} is ready in model {}".format(
@@ -664,6 +663,83 @@ class Libjuju:
 
         return application
 
+    async def scale_application(
+        self,
+        model_name: str,
+        application_name: str,
+        scale: int = 1,
+        total_timeout: float = None,
+    ):
+        """
+        Scale application (K8s)
+
+        :param: model_name:         Model name
+        :param: application_name:   Application name
+        :param: scale:              Scale to which to set this application
+        :param: total_timeout:      Timeout for the entity to be active
+        """
+
+        model = None
+        controller = await self.get_controller()
+        try:
+            model = await self.get_model(controller, model_name)
+
+            self.log.debug(
+                "Scaling application {} in model {}".format(
+                    application_name, model_name
+                )
+            )
+            application = self._get_application(model, application_name)
+            if application is None:
+                raise JujuApplicationNotFound("Cannot scale application")
+            await application.scale(scale=scale)
+            # Wait until application is scaled in model
+            self.log.debug(
+                "Waiting for application {} to be scaled in model {}...".format(
+                    application_name, model_name
+                )
+            )
+            if total_timeout is None:
+                total_timeout = 1800
+            end = time.time() + total_timeout
+            while time.time() < end:
+                application_scale = self._get_application_count(model, application_name)
+                # Before calling wait_for_model function,
+                # wait until application unit count and scale count are equal.
+                # Because there is a delay before scaling triggers in Juju model.
+                if application_scale == scale:
+                    await JujuModelWatcher.wait_for_model(
+                        model=model, timeout=total_timeout
+                    )
+                    self.log.debug(
+                        "Application {} is scaled in model {}".format(
+                            application_name, model_name
+                        )
+                    )
+                    return
+                await asyncio.sleep(5)
+            raise Exception(
+                "Timeout waiting for application {} in model {} to be scaled".format(
+                    application_name, model_name
+                )
+            )
+        finally:
+            if model:
+                await self.disconnect_model(model)
+            await self.disconnect_controller(controller)
+
+    def _get_application_count(self, model: Model, application_name: str) -> int:
+        """Get number of units of the application
+
+        :param: model:              Model object
+        :param: application_name:   Application name
+
+        :return: int (or None if application doesn't exist)
+        """
+        application = self._get_application(model, application_name)
+        if application is not None:
+            return len(application.units)
+
     def _get_application(self, model: Model, application_name: str) -> Application:
         """Get application
 
@@ -720,19 +796,7 @@ class Libjuju:
             #   because the leader elected hook has not been triggered yet.
             #   Therefore, we are doing some retries. If it happens again,
             #   re-open bug 1236
-            attempts = 3
-            time_between_retries = 10
-            unit = None
-            for _ in range(attempts):
-                unit = await self._get_leader_unit(application)
-                if unit is None:
-                    await asyncio.sleep(time_between_retries)
-                else:
-                    break
-            if unit is None:
-                raise JujuLeaderUnitNotFound(
-                    "Cannot execute action: leader unit not found"
-                )
+            unit = await self._get_leader_unit(application)
 
             actions = await application.get_actions()
 
@@ -755,6 +819,7 @@ class Libjuju:
                 total_timeout=total_timeout,
                 db_dict=db_dict,
                 n2vc=self.n2vc,
+                vca_id=self.vca_connection._vca_id,
             )
 
             output = await model.get_action_output(action_uuid=action.entity_id)
@@ -1041,57 +1106,6 @@ class Libjuju:
                     await self.disconnect_model(model)
                 await self.disconnect_controller(controller)
 
-    def _get_api_endpoints_db(self) -> [str]:
-        """
-        Get API Endpoints from DB
-
-        :return: List of API endpoints
-        """
-        self.log.debug("Getting endpoints from database")
-
-        juju_info = self.db.get_one(
-            DB_DATA.api_endpoints.table,
-            q_filter=DB_DATA.api_endpoints.filter,
-            fail_on_empty=False,
-        )
-        if juju_info and DB_DATA.api_endpoints.key in juju_info:
-            return juju_info[DB_DATA.api_endpoints.key]
-
-    def _update_api_endpoints_db(self, endpoints: [str]):
-        """
-        Update API endpoints in Database
-
-        :param: List of endpoints
-        """
-        self.log.debug("Saving endpoints {} in database".format(endpoints))
-
-        juju_info = self.db.get_one(
-            DB_DATA.api_endpoints.table,
-            q_filter=DB_DATA.api_endpoints.filter,
-            fail_on_empty=False,
-        )
-        # If it doesn't, then create it
-        if not juju_info:
-            try:
-                self.db.create(
-                    DB_DATA.api_endpoints.table,
-                    DB_DATA.api_endpoints.filter,
-                )
-            except DbException as e:
-                # Racing condition: check if another N2VC worker has created it
-                juju_info = self.db.get_one(
-                    DB_DATA.api_endpoints.table,
-                    q_filter=DB_DATA.api_endpoints.filter,
-                    fail_on_empty=False,
-                )
-                if not juju_info:
-                    raise e
-        self.db.set_one(
-            DB_DATA.api_endpoints.table,
-            DB_DATA.api_endpoints.filter,
-            {DB_DATA.api_endpoints.key: endpoints},
-        )
-
     def handle_exception(self, loop, context):
         # All unhandled exceptions by libjuju are handled here.
         pass
@@ -1283,19 +1297,32 @@ class Libjuju:
         finally:
             await self.disconnect_controller(controller)
 
+    @retry(attempts=20, delay=5, fallback=JujuLeaderUnitNotFound())
     async def _get_leader_unit(self, application: Application) -> Unit:
         unit = None
         for u in application.units:
             if await u.is_leader_from_status():
                 unit = u
                 break
+        if not unit:
+            raise Exception()
         return unit
 
-    async def get_cloud_credentials(self, cloud_name: str, credential_name: str):
+    async def get_cloud_credentials(self, cloud: Cloud) -> typing.List:
+        """
+        Get cloud credentials
+
+        :param: cloud: Cloud object. The returned credentials will be from this cloud.
+
+        :return: List of credentials object associated to the specified cloud
+
+        """
         controller = await self.get_controller()
         try:
             facade = client.CloudFacade.from_connection(controller.connection())
-            cloud_cred_tag = tag.credential(cloud_name, self.username, credential_name)
+            cloud_cred_tag = tag.credential(
+                cloud.name, self.vca_connection.data.user, cloud.credential_name
+            )
             params = [client.Entity(cloud_cred_tag)]
             return (await facade.Credential(params)).results
         finally: