Fix bug 1467
[osm/N2VC.git] / n2vc / libjuju.py
index d761adc..7a73033 100644 (file)
 
 import asyncio
 import logging
 
 import asyncio
 import logging
-from juju.controller import Controller
-from juju.client import client
+
 import time
 
 from juju.errors import JujuAPIError
 from juju.model import Model
 from juju.machine import Machine
 from juju.application import Application
 import time
 
 from juju.errors import JujuAPIError
 from juju.model import Model
 from juju.machine import Machine
 from juju.application import Application
+from juju.unit import Unit
 from juju.client._definitions import (
     FullStatus,
     QueryApplicationOffersResults,
     Cloud,
     CloudCredential,
 )
 from juju.client._definitions import (
     FullStatus,
     QueryApplicationOffersResults,
     Cloud,
     CloudCredential,
 )
+from juju.controller import Controller
+from juju.client import client
+from juju import tag
+
 from n2vc.juju_watcher import JujuModelWatcher
 from n2vc.provisioner import AsyncSSHProvisioner
 from n2vc.n2vc_conn import N2VCConnector
 from n2vc.juju_watcher import JujuModelWatcher
 from n2vc.provisioner import AsyncSSHProvisioner
 from n2vc.n2vc_conn import N2VCConnector
@@ -36,12 +40,15 @@ from n2vc.exceptions import (
     JujuApplicationNotFound,
     JujuLeaderUnitNotFound,
     JujuActionNotFound,
     JujuApplicationNotFound,
     JujuLeaderUnitNotFound,
     JujuActionNotFound,
-    JujuModelAlreadyExists,
     JujuControllerFailedConnecting,
     JujuApplicationExists,
     JujuControllerFailedConnecting,
     JujuApplicationExists,
+    JujuInvalidK8sConfiguration,
 )
 from n2vc.utils import DB_DATA
 from osm_common.dbbase import DbException
 )
 from n2vc.utils import DB_DATA
 from osm_common.dbbase import DbException
+from kubernetes.client.configuration import Configuration
+
+RBAC_LABEL_KEY_NAME = "rbac-id"
 
 
 class Libjuju:
 
 
 class Libjuju:
@@ -78,9 +85,12 @@ class Libjuju:
         self.log = log or logging.getLogger("Libjuju")
         self.db = db
         db_endpoints = self._get_api_endpoints_db()
         self.log = log or logging.getLogger("Libjuju")
         self.db = db
         db_endpoints = self._get_api_endpoints_db()
-        self.endpoints = db_endpoints or [endpoint]
-        if db_endpoints is None:
+        self.endpoints = None
+        if (db_endpoints and endpoint not in db_endpoints) or not db_endpoints:
+            self.endpoints = [endpoint]
             self._update_api_endpoints_db(self.endpoints)
             self._update_api_endpoints_db(self.endpoints)
+        else:
+            self.endpoints = db_endpoints
         self.api_proxy = api_proxy
         self.username = username
         self.password = password
         self.api_proxy = api_proxy
         self.username = username
         self.password = password
@@ -98,12 +108,14 @@ class Libjuju:
         self.loop.set_exception_handler(self.handle_exception)
         self.creating_model = asyncio.Lock(loop=self.loop)
 
         self.loop.set_exception_handler(self.handle_exception)
         self.creating_model = asyncio.Lock(loop=self.loop)
 
-        self.models = set()
         self.log.debug("Libjuju initialized!")
 
         self.log.debug("Libjuju initialized!")
 
-        self.health_check_task = self.loop.create_task(self.health_check())
+        self.health_check_task = self._create_health_check_task()
+
+    def _create_health_check_task(self):
+        return self.loop.create_task(self.health_check())
 
 
-    async def get_controller(self, timeout: float = 5.0) -> Controller:
+    async def get_controller(self, timeout: float = 15.0) -> Controller:
         """
         Get controller
 
         """
         Get controller
 
@@ -156,44 +168,38 @@ class Libjuju:
 
         :param: controller: Controller that will be disconnected
         """
 
         :param: controller: Controller that will be disconnected
         """
-        await controller.disconnect()
+        if controller:
+            await controller.disconnect()
 
 
-    async def add_model(self, model_name: str, cloud_name: str):
+    async def add_model(self, model_name: str, cloud_name: str, credential_name=None):
         """
         Create model
 
         :param: model_name: Model name
         :param: cloud_name: Cloud name
         """
         Create model
 
         :param: model_name: Model name
         :param: cloud_name: Cloud name
+        :param: credential_name: Credential name to use for adding the model
+                                 If not specified, same name as the cloud will be used.
         """
 
         # Get controller
         controller = await self.get_controller()
         model = None
         try:
         """
 
         # Get controller
         controller = await self.get_controller()
         model = None
         try:
-            # Raise exception if model already exists
-            if await self.model_exists(model_name, controller=controller):
-                raise JujuModelAlreadyExists(
-                    "Model {} already exists.".format(model_name)
-                )
-
             # Block until other workers have finished model creation
             while self.creating_model.locked():
                 await asyncio.sleep(0.1)
 
             # Block until other workers have finished model creation
             while self.creating_model.locked():
                 await asyncio.sleep(0.1)
 
-            # If the model exists, return it from the controller
-            if model_name in self.models:
-                return
-
             # Create the model
             async with self.creating_model:
             # Create the model
             async with self.creating_model:
+                if await self.model_exists(model_name, controller=controller):
+                    return
                 self.log.debug("Creating model {}".format(model_name))
                 model = await controller.add_model(
                     model_name,
                     config=self.model_config,
                     cloud_name=cloud_name,
                 self.log.debug("Creating model {}".format(model_name))
                 model = await controller.add_model(
                     model_name,
                     config=self.model_config,
                     cloud_name=cloud_name,
-                    credential_name=cloud_name,
+                    credential_name=credential_name or cloud_name,
                 )
                 )
-                self.models.add(model_name)
         finally:
             if model:
                 await self.disconnect_model(model)
         finally:
             if model:
                 await self.disconnect_model(model)
@@ -438,6 +444,7 @@ class Libjuju:
                     nonce=params.nonce,
                     machine_id=machine_id,
                     proxy=self.api_proxy,
                     nonce=params.nonce,
                     machine_id=machine_id,
                     proxy=self.api_proxy,
+                    series=params.series,
                 )
             )
 
                 )
             )
 
@@ -480,6 +487,28 @@ class Libjuju:
 
         return machine_id
 
 
         return machine_id
 
+    async def deploy(
+        self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
+    ):
+        """
+        Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
+
+        :param: uri:            Path or Charm Store uri in which the charm or bundle can be found
+        :param: model_name:     Model name
+        :param: wait:           Indicates whether to wait or not until all applications are active
+        :param: timeout:        Time in seconds to wait until all applications are active
+        """
+        controller = await self.get_controller()
+        model = await self.get_model(controller, model_name)
+        try:
+            await model.deploy(uri)
+            if wait:
+                await JujuModelWatcher.wait_for_model(model, timeout=timeout)
+                self.log.debug("All units active in model {}".format(model_name))
+        finally:
+            await self.disconnect_model(model)
+            await self.disconnect_controller(controller)
+
     async def deploy_charm(
         self,
         application_name: str,
     async def deploy_charm(
         self,
         application_name: str,
@@ -620,16 +649,27 @@ class Libjuju:
         try:
             # Get application
             application = self._get_application(
         try:
             # Get application
             application = self._get_application(
-                model, application_name=application_name,
+                model,
+                application_name=application_name,
             )
             if application is None:
                 raise JujuApplicationNotFound("Cannot execute action")
 
             )
             if application is None:
                 raise JujuApplicationNotFound("Cannot execute action")
 
-            # Get unit
+            # Get leader unit
+            # Racing condition:
+            #   Ocassionally, self._get_leader_unit() will return None
+            #   because the leader elected hook has not been triggered yet.
+            #   Therefore, we are doing some retries. If it happens again,
+            #   re-open bug 1236
+            attempts = 3
+            time_between_retries = 10
             unit = None
             unit = None
-            for u in application.units:
-                if await u.is_leader_from_status():
-                    unit = u
+            for _ in range(attempts):
+                unit = await self._get_leader_unit(application)
+                if unit is None:
+                    await asyncio.sleep(time_between_retries)
+                else:
+                    break
             if unit is None:
                 raise JujuLeaderUnitNotFound(
                     "Cannot execute action: leader unit not found"
             if unit is None:
                 raise JujuLeaderUnitNotFound(
                     "Cannot execute action: leader unit not found"
@@ -700,7 +740,8 @@ class Libjuju:
         try:
             # Get application
             application = self._get_application(
         try:
             # Get application
             application = self._get_application(
-                model, application_name=application_name,
+                model,
+                application_name=application_name,
             )
 
             # Return list of actions
             )
 
             # Return list of actions
@@ -732,7 +773,10 @@ class Libjuju:
         return metrics
 
     async def add_relation(
         return metrics
 
     async def add_relation(
-        self, model_name: str, endpoint_1: str, endpoint_2: str,
+        self,
+        model_name: str,
+        endpoint_1: str,
+        endpoint_2: str,
     ):
         """Add relation
 
     ):
         """Add relation
 
@@ -767,7 +811,9 @@ class Libjuju:
             await self.disconnect_controller(controller)
 
     async def consume(
             await self.disconnect_controller(controller)
 
     async def consume(
-        self, offer_url: str, model_name: str,
+        self,
+        offer_url: str,
+        model_name: str,
     ):
         """
         Adds a remote offer to the model. Relations can be created later using "juju relate".
     ):
         """
         Adds a remote offer to the model. Relations can be created later using "juju relate".
@@ -797,57 +843,45 @@ class Libjuju:
         """
 
         controller = await self.get_controller()
         """
 
         controller = await self.get_controller()
-        model = await self.get_model(controller, model_name)
+        model = None
         try:
         try:
+            if not await self.model_exists(model_name, controller=controller):
+                return
+
+            model = await self.get_model(controller, model_name)
             self.log.debug("Destroying model {}".format(model_name))
             uuid = model.info.uuid
 
             self.log.debug("Destroying model {}".format(model_name))
             uuid = model.info.uuid
 
-            # Destroy machines
-            machines = await model.get_machines()
-            for machine_id in machines:
-                try:
-                    await self.destroy_machine(
-                        model, machine_id=machine_id, total_timeout=total_timeout,
-                    )
-                except asyncio.CancelledError:
-                    raise
-                except Exception:
-                    pass
+            # Destroy machines that are manually provisioned
+            # and still are in pending state
+            await self._destroy_pending_machines(model, only_manual=True)
 
             # Disconnect model
             await self.disconnect_model(model)
 
 
             # Disconnect model
             await self.disconnect_model(model)
 
-            # Destroy model
-            if model_name in self.models:
-                self.models.remove(model_name)
-
-            await controller.destroy_model(uuid)
+            await controller.destroy_model(uuid, force=True, max_wait=0)
 
             # Wait until model is destroyed
             self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
 
             # Wait until model is destroyed
             self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
-            last_exception = ""
 
             if total_timeout is None:
                 total_timeout = 3600
             end = time.time() + total_timeout
             while time.time() < end:
 
             if total_timeout is None:
                 total_timeout = 3600
             end = time.time() + total_timeout
             while time.time() < end:
-                try:
-                    models = await controller.list_models()
-                    if model_name not in models:
-                        self.log.debug(
-                            "The model {} ({}) was destroyed".format(model_name, uuid)
-                        )
-                        return
-                except asyncio.CancelledError:
-                    raise
-                except Exception as e:
-                    last_exception = e
+                models = await controller.list_models()
+                if model_name not in models:
+                    self.log.debug(
+                        "The model {} ({}) was destroyed".format(model_name, uuid)
+                    )
+                    return
                 await asyncio.sleep(5)
             raise Exception(
                 await asyncio.sleep(5)
             raise Exception(
-                "Timeout waiting for model {} to be destroyed {}".format(
-                    model_name, last_exception
-                )
+                "Timeout waiting for model {} to be destroyed".format(model_name)
             )
             )
+        except Exception as e:
+            if model:
+                await self.disconnect_model(model)
+            raise e
         finally:
             await self.disconnect_controller(controller)
 
         finally:
             await self.disconnect_controller(controller)
 
@@ -869,32 +903,22 @@ class Libjuju:
         else:
             self.log.warning("Application not found: {}".format(application_name))
 
         else:
             self.log.warning("Application not found: {}".format(application_name))
 
-    async def destroy_machine(
-        self, model: Model, machine_id: str, total_timeout: float = 3600
-    ):
+    async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
         """
         """
-        Destroy machine
+        Destroy pending machines in a given model
 
 
-        :param: model:          Model object
-        :param: machine_id:     Machine id
-        :param: total_timeout:  Timeout in seconds
+        :param: only_manual:    Bool that indicates only manually provisioned
+                                machines should be destroyed (if True), or that
+                                all pending machines should be destroyed
         """
         """
-        machines = await model.get_machines()
-        if machine_id in machines:
-            machine = machines[machine_id]
-            await machine.destroy(force=True)
-            # max timeout
-            end = time.time() + total_timeout
-
-            # wait for machine removal
-            machines = await model.get_machines()
-            while machine_id in machines and time.time() < end:
-                self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
-                await asyncio.sleep(0.5)
-                machines = await model.get_machines()
-            self.log.debug("Machine destroyed: {}".format(machine_id))
-        else:
-            self.log.debug("Machine not found: {}".format(machine_id))
+        status = await model.get_status()
+        for machine_id in status.machines:
+            machine_status = status.machines[machine_id]
+            if machine_status.agent_status.status == "pending":
+                if only_manual and not machine_status.instance_id.startswith("manual:"):
+                    break
+                machine = model.machines[machine_id]
+                await machine.destroy(force=True)
 
     async def configure_application(
         self, model_name: str, application_name: str, config: dict = None
 
     async def configure_application(
         self, model_name: str, application_name: str, config: dict = None
@@ -908,15 +932,18 @@ class Libjuju:
         self.log.debug("Configuring application {}".format(application_name))
 
         if config:
         self.log.debug("Configuring application {}".format(application_name))
 
         if config:
+            controller = await self.get_controller()
+            model = None
             try:
             try:
-                controller = await self.get_controller()
                 model = await self.get_model(controller, model_name)
                 application = self._get_application(
                 model = await self.get_model(controller, model_name)
                 application = self._get_application(
-                    model, application_name=application_name,
+                    model,
+                    application_name=application_name,
                 )
                 await application.set_config(config)
             finally:
                 )
                 await application.set_config(config)
             finally:
-                await self.disconnect_model(model)
+                if model:
+                    await self.disconnect_model(model)
                 await self.disconnect_controller(controller)
 
     def _get_api_endpoints_db(self) -> [str]:
                 await self.disconnect_controller(controller)
 
     def _get_api_endpoints_db(self) -> [str]:
@@ -952,7 +979,8 @@ class Libjuju:
         if not juju_info:
             try:
                 self.db.create(
         if not juju_info:
             try:
                 self.db.create(
-                    DB_DATA.api_endpoints.table, DB_DATA.api_endpoints.filter,
+                    DB_DATA.api_endpoints.table,
+                    DB_DATA.api_endpoints.filter,
                 )
             except DbException as e:
                 # Racing condition: check if another N2VC worker has created it
                 )
             except DbException as e:
                 # Racing condition: check if another N2VC worker has created it
@@ -979,6 +1007,7 @@ class Libjuju:
 
         :param: interval: Time in seconds between checks
         """
 
         :param: interval: Time in seconds between checks
         """
+        controller = None
         while True:
             try:
                 controller = await self.get_controller()
         while True:
             try:
                 controller = await self.get_controller()
@@ -1020,75 +1049,126 @@ class Libjuju:
         finally:
             await self.disconnect_controller(controller)
 
         finally:
             await self.disconnect_controller(controller)
 
-    async def add_k8s(self, name: str, auth_data: dict, storage_class: str):
+    async def add_k8s(
+        self,
+        name: str,
+        rbac_id: str,
+        token: str,
+        client_cert_data: str,
+        configuration: Configuration,
+        storage_class: str,
+        credential_name: str = None,
+    ):
         """
         Add a Kubernetes cloud to the controller
 
         Similar to the `juju add-k8s` command in the CLI
 
         """
         Add a Kubernetes cloud to the controller
 
         Similar to the `juju add-k8s` command in the CLI
 
-        :param: name:           Name for the K8s cloud
-        :param: auth_data:      Dictionary with needed credentials. Format:
-        {
-            "server": "192.168.0.21:16443",
-            "cacert": "-----BEGIN CERTIFI...",
-            "token": "clhkRExRem5Xd1dCdnFEVXdvRGt...",
-
-        }
-        :param: storage_class:  Storage Class to use in the cloud
+        :param: name:               Name for the K8s cloud
+        :param: configuration:      Kubernetes configuration object
+        :param: storage_class:      Storage Class to use in the cloud
+        :param: credential_name:    Storage Class to use in the cloud
         """
 
         """
 
-        required_auth_data_keys = ["server", "cacert", "token"]
-        missing_keys = []
-        for k in required_auth_data_keys:
-            if k not in auth_data:
-                missing_keys.append(k)
-        if missing_keys:
-            raise Exception(
-                "missing keys in auth_data: {}".format(",".join(missing_keys))
-            )
         if not storage_class:
             raise Exception("storage_class must be a non-empty string")
         if not name:
             raise Exception("name must be a non-empty string")
         if not storage_class:
             raise Exception("storage_class must be a non-empty string")
         if not name:
             raise Exception("name must be a non-empty string")
-
-        endpoint = auth_data["server"]
-        cacert = auth_data["cacert"]
-        token = auth_data["token"]
-        region_name = "{}-region".format(name)
-
+        if not configuration:
+            raise Exception("configuration must be provided")
+
+        endpoint = configuration.host
+        credential = self.get_k8s_cloud_credential(
+            configuration,
+            client_cert_data,
+            token,
+        )
+        credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id
         cloud = client.Cloud(
         cloud = client.Cloud(
-            auth_types=["certificate"],
-            ca_certificates=[cacert],
+            type_="kubernetes",
+            auth_types=[credential.auth_type],
             endpoint=endpoint,
             endpoint=endpoint,
+            ca_certificates=[client_cert_data],
             config={
                 "operator-storage": storage_class,
                 "workload-storage": storage_class,
             },
             config={
                 "operator-storage": storage_class,
                 "workload-storage": storage_class,
             },
-            regions=[client.CloudRegion(endpoint=endpoint, name=region_name)],
-            type_="kubernetes",
         )
 
         )
 
-        cred = client.CloudCredential(
-            auth_type="certificate",
-            attrs={"ClientCertificateData": cacert, "Token": token},
+        return await self.add_cloud(
+            name, cloud, credential, credential_name=credential_name
         )
         )
-        return await self.add_cloud(name, cloud, cred)
+
+    def get_k8s_cloud_credential(
+        self,
+        configuration: Configuration,
+        client_cert_data: str,
+        token: str = None,
+    ) -> client.CloudCredential:
+        attrs = {}
+        # TODO: Test with AKS
+        key = None  # open(configuration.key_file, "r").read()
+        username = configuration.username
+        password = configuration.password
+
+        if client_cert_data:
+            attrs["ClientCertificateData"] = client_cert_data
+        if key:
+            attrs["ClientKeyData"] = key
+        if token:
+            if username or password:
+                raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
+            attrs["Token"] = token
+
+        auth_type = None
+        if key:
+            auth_type = "oauth2"
+            if client_cert_data:
+                auth_type = "oauth2withcert"
+            if not token:
+                raise JujuInvalidK8sConfiguration(
+                    "missing token for auth type {}".format(auth_type)
+                )
+        elif username:
+            if not password:
+                self.log.debug(
+                    "credential for user {} has empty password".format(username)
+                )
+            attrs["username"] = username
+            attrs["password"] = password
+            if client_cert_data:
+                auth_type = "userpasswithcert"
+            else:
+                auth_type = "userpass"
+        elif client_cert_data and token:
+            auth_type = "certificate"
+        else:
+            raise JujuInvalidK8sConfiguration("authentication method not supported")
+        return client.CloudCredential(auth_type=auth_type, attrs=attrs)
 
     async def add_cloud(
 
     async def add_cloud(
-        self, name: str, cloud: Cloud, credential: CloudCredential = None
+        self,
+        name: str,
+        cloud: Cloud,
+        credential: CloudCredential = None,
+        credential_name: str = None,
     ) -> Cloud:
         """
         Add cloud to the controller
 
     ) -> Cloud:
         """
         Add cloud to the controller
 
-        :param: name:   Name of the cloud to be added
-        :param: cloud:  Cloud object
-        :param: credential:   CloudCredentials object for the cloud
+        :param: name:               Name of the cloud to be added
+        :param: cloud:              Cloud object
+        :param: credential:         CloudCredentials object for the cloud
+        :param: credential_name:    Credential name.
+                                    If not defined, cloud of the name will be used.
         """
         controller = await self.get_controller()
         try:
             _ = await controller.add_cloud(name, cloud)
             if credential:
         """
         controller = await self.get_controller()
         try:
             _ = await controller.add_cloud(name, cloud)
             if credential:
-                await controller.add_credential(name, credential=credential, cloud=name)
+                await controller.add_credential(
+                    credential_name or name, credential=credential, cloud=name
+                )
             # Need to return the object returned by the controller.add_cloud() function
             # I'm returning the original value now until this bug is fixed:
             #   https://github.com/juju/python-libjuju/issues/443
             # Need to return the object returned by the controller.add_cloud() function
             # I'm returning the original value now until this bug is fixed:
             #   https://github.com/juju/python-libjuju/issues/443
@@ -1107,3 +1187,21 @@ class Libjuju:
             await controller.remove_cloud(name)
         finally:
             await self.disconnect_controller(controller)
             await controller.remove_cloud(name)
         finally:
             await self.disconnect_controller(controller)
+
+    async def _get_leader_unit(self, application: Application) -> Unit:
+        unit = None
+        for u in application.units:
+            if await u.is_leader_from_status():
+                unit = u
+                break
+        return unit
+
+    async def get_cloud_credentials(self, cloud_name: str, credential_name: str):
+        controller = await self.get_controller()
+        try:
+            facade = client.CloudFacade.from_connection(controller.connection())
+            cloud_cred_tag = tag.credential(cloud_name, self.username, credential_name)
+            params = [client.Entity(cloud_cred_tag)]
+            return (await facade.Credential(params)).results
+        finally:
+            await self.disconnect_controller(controller)