Add ModelConfig
[osm/N2VC.git] / n2vc / libjuju.py
index d761adc..98e31d8 100644 (file)
 
 import asyncio
 import logging
-from juju.controller import Controller
-from juju.client import client
+
 import time
 
 from juju.errors import JujuAPIError
 from juju.model import Model
 from juju.machine import Machine
 from juju.application import Application
+from juju.unit import Unit
 from juju.client._definitions import (
     FullStatus,
     QueryApplicationOffersResults,
     Cloud,
     CloudCredential,
 )
+from juju.controller import Controller
+from juju.client import client
+from juju import tag
+
+from n2vc.config import ModelConfig
 from n2vc.juju_watcher import JujuModelWatcher
 from n2vc.provisioner import AsyncSSHProvisioner
 from n2vc.n2vc_conn import N2VCConnector
@@ -36,12 +41,15 @@ from n2vc.exceptions import (
     JujuApplicationNotFound,
     JujuLeaderUnitNotFound,
     JujuActionNotFound,
-    JujuModelAlreadyExists,
     JujuControllerFailedConnecting,
     JujuApplicationExists,
+    JujuInvalidK8sConfiguration,
 )
 from n2vc.utils import DB_DATA
 from osm_common.dbbase import DbException
+from kubernetes.client.configuration import Configuration
+
+RBAC_LABEL_KEY_NAME = "rbac-id"
 
 
 class Libjuju:
@@ -56,8 +64,7 @@ class Libjuju:
         log: logging.Logger = None,
         db: dict = None,
         n2vc: N2VCConnector = None,
-        apt_mirror: str = None,
-        enable_os_upgrade: bool = True,
+        model_config: ModelConfig = {},
     ):
         """
         Constructor
@@ -78,9 +85,12 @@ class Libjuju:
         self.log = log or logging.getLogger("Libjuju")
         self.db = db
         db_endpoints = self._get_api_endpoints_db()
-        self.endpoints = db_endpoints or [endpoint]
-        if db_endpoints is None:
+        self.endpoints = None
+        if (db_endpoints and endpoint not in db_endpoints) or not db_endpoints:
+            self.endpoints = [endpoint]
             self._update_api_endpoints_db(self.endpoints)
+        else:
+            self.endpoints = db_endpoints
         self.api_proxy = api_proxy
         self.username = username
         self.password = password
@@ -89,21 +99,19 @@ class Libjuju:
         self.n2vc = n2vc
 
         # Generate config for models
-        self.model_config = {}
-        if apt_mirror:
-            self.model_config["apt-mirror"] = apt_mirror
-        self.model_config["enable-os-refresh-update"] = enable_os_upgrade
-        self.model_config["enable-os-upgrade"] = enable_os_upgrade
+        self.model_config = model_config
 
         self.loop.set_exception_handler(self.handle_exception)
         self.creating_model = asyncio.Lock(loop=self.loop)
 
-        self.models = set()
         self.log.debug("Libjuju initialized!")
 
-        self.health_check_task = self.loop.create_task(self.health_check())
+        self.health_check_task = self._create_health_check_task()
+
+    def _create_health_check_task(self):
+        return self.loop.create_task(self.health_check())
 
-    async def get_controller(self, timeout: float = 5.0) -> Controller:
+    async def get_controller(self, timeout: float = 15.0) -> Controller:
         """
         Get controller
 
@@ -156,44 +164,38 @@ class Libjuju:
 
         :param: controller: Controller that will be disconnected
         """
-        await controller.disconnect()
+        if controller:
+            await controller.disconnect()
 
-    async def add_model(self, model_name: str, cloud_name: str):
+    async def add_model(self, model_name: str, cloud_name: str, credential_name=None):
         """
         Create model
 
         :param: model_name: Model name
         :param: cloud_name: Cloud name
+        :param: credential_name: Credential name to use for adding the model
+                                 If not specified, same name as the cloud will be used.
         """
 
         # Get controller
         controller = await self.get_controller()
         model = None
         try:
-            # Raise exception if model already exists
-            if await self.model_exists(model_name, controller=controller):
-                raise JujuModelAlreadyExists(
-                    "Model {} already exists.".format(model_name)
-                )
-
             # Block until other workers have finished model creation
             while self.creating_model.locked():
                 await asyncio.sleep(0.1)
 
-            # If the model exists, return it from the controller
-            if model_name in self.models:
-                return
-
             # Create the model
             async with self.creating_model:
+                if await self.model_exists(model_name, controller=controller):
+                    return
                 self.log.debug("Creating model {}".format(model_name))
                 model = await controller.add_model(
                     model_name,
                     config=self.model_config,
                     cloud_name=cloud_name,
-                    credential_name=cloud_name,
+                    credential_name=credential_name or cloud_name,
                 )
-                self.models.add(model_name)
         finally:
             if model:
                 await self.disconnect_model(model)
@@ -438,6 +440,7 @@ class Libjuju:
                     nonce=params.nonce,
                     machine_id=machine_id,
                     proxy=self.api_proxy,
+                    series=params.series,
                 )
             )
 
@@ -480,6 +483,28 @@ class Libjuju:
 
         return machine_id
 
+    async def deploy(
+        self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
+    ):
+        """
+        Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
+
+        :param: uri:            Path or Charm Store uri in which the charm or bundle can be found
+        :param: model_name:     Model name
+        :param: wait:           Indicates whether to wait or not until all applications are active
+        :param: timeout:        Time in seconds to wait until all applications are active
+        """
+        controller = await self.get_controller()
+        model = await self.get_model(controller, model_name)
+        try:
+            await model.deploy(uri)
+            if wait:
+                await JujuModelWatcher.wait_for_model(model, timeout=timeout)
+                self.log.debug("All units active in model {}".format(model_name))
+        finally:
+            await self.disconnect_model(model)
+            await self.disconnect_controller(controller)
+
     async def deploy_charm(
         self,
         application_name: str,
@@ -620,16 +645,27 @@ class Libjuju:
         try:
             # Get application
             application = self._get_application(
-                model, application_name=application_name,
+                model,
+                application_name=application_name,
             )
             if application is None:
                 raise JujuApplicationNotFound("Cannot execute action")
 
-            # Get unit
+            # Get leader unit
+            # Racing condition:
+            #   Ocassionally, self._get_leader_unit() will return None
+            #   because the leader elected hook has not been triggered yet.
+            #   Therefore, we are doing some retries. If it happens again,
+            #   re-open bug 1236
+            attempts = 3
+            time_between_retries = 10
             unit = None
-            for u in application.units:
-                if await u.is_leader_from_status():
-                    unit = u
+            for _ in range(attempts):
+                unit = await self._get_leader_unit(application)
+                if unit is None:
+                    await asyncio.sleep(time_between_retries)
+                else:
+                    break
             if unit is None:
                 raise JujuLeaderUnitNotFound(
                     "Cannot execute action: leader unit not found"
@@ -700,7 +736,8 @@ class Libjuju:
         try:
             # Get application
             application = self._get_application(
-                model, application_name=application_name,
+                model,
+                application_name=application_name,
             )
 
             # Return list of actions
@@ -732,7 +769,10 @@ class Libjuju:
         return metrics
 
     async def add_relation(
-        self, model_name: str, endpoint_1: str, endpoint_2: str,
+        self,
+        model_name: str,
+        endpoint_1: str,
+        endpoint_2: str,
     ):
         """Add relation
 
@@ -767,7 +807,9 @@ class Libjuju:
             await self.disconnect_controller(controller)
 
     async def consume(
-        self, offer_url: str, model_name: str,
+        self,
+        offer_url: str,
+        model_name: str,
     ):
         """
         Adds a remote offer to the model. Relations can be created later using "juju relate".
@@ -797,57 +839,45 @@ class Libjuju:
         """
 
         controller = await self.get_controller()
-        model = await self.get_model(controller, model_name)
+        model = None
         try:
+            if not await self.model_exists(model_name, controller=controller):
+                return
+
+            model = await self.get_model(controller, model_name)
             self.log.debug("Destroying model {}".format(model_name))
             uuid = model.info.uuid
 
-            # Destroy machines
-            machines = await model.get_machines()
-            for machine_id in machines:
-                try:
-                    await self.destroy_machine(
-                        model, machine_id=machine_id, total_timeout=total_timeout,
-                    )
-                except asyncio.CancelledError:
-                    raise
-                except Exception:
-                    pass
+            # Destroy machines that are manually provisioned
+            # and still are in pending state
+            await self._destroy_pending_machines(model, only_manual=True)
 
             # Disconnect model
             await self.disconnect_model(model)
 
-            # Destroy model
-            if model_name in self.models:
-                self.models.remove(model_name)
-
-            await controller.destroy_model(uuid)
+            await controller.destroy_model(uuid, force=True, max_wait=0)
 
             # Wait until model is destroyed
             self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
-            last_exception = ""
 
             if total_timeout is None:
                 total_timeout = 3600
             end = time.time() + total_timeout
             while time.time() < end:
-                try:
-                    models = await controller.list_models()
-                    if model_name not in models:
-                        self.log.debug(
-                            "The model {} ({}) was destroyed".format(model_name, uuid)
-                        )
-                        return
-                except asyncio.CancelledError:
-                    raise
-                except Exception as e:
-                    last_exception = e
+                models = await controller.list_models()
+                if model_name not in models:
+                    self.log.debug(
+                        "The model {} ({}) was destroyed".format(model_name, uuid)
+                    )
+                    return
                 await asyncio.sleep(5)
             raise Exception(
-                "Timeout waiting for model {} to be destroyed {}".format(
-                    model_name, last_exception
-                )
+                "Timeout waiting for model {} to be destroyed".format(model_name)
             )
+        except Exception as e:
+            if model:
+                await self.disconnect_model(model)
+            raise e
         finally:
             await self.disconnect_controller(controller)
 
@@ -869,32 +899,22 @@ class Libjuju:
         else:
             self.log.warning("Application not found: {}".format(application_name))
 
-    async def destroy_machine(
-        self, model: Model, machine_id: str, total_timeout: float = 3600
-    ):
+    async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
         """
-        Destroy machine
+        Destroy pending machines in a given model
 
-        :param: model:          Model object
-        :param: machine_id:     Machine id
-        :param: total_timeout:  Timeout in seconds
+        :param: only_manual:    Bool that indicates only manually provisioned
+                                machines should be destroyed (if True), or that
+                                all pending machines should be destroyed
         """
-        machines = await model.get_machines()
-        if machine_id in machines:
-            machine = machines[machine_id]
-            await machine.destroy(force=True)
-            # max timeout
-            end = time.time() + total_timeout
-
-            # wait for machine removal
-            machines = await model.get_machines()
-            while machine_id in machines and time.time() < end:
-                self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
-                await asyncio.sleep(0.5)
-                machines = await model.get_machines()
-            self.log.debug("Machine destroyed: {}".format(machine_id))
-        else:
-            self.log.debug("Machine not found: {}".format(machine_id))
+        status = await model.get_status()
+        for machine_id in status.machines:
+            machine_status = status.machines[machine_id]
+            if machine_status.agent_status.status == "pending":
+                if only_manual and not machine_status.instance_id.startswith("manual:"):
+                    break
+                machine = model.machines[machine_id]
+                await machine.destroy(force=True)
 
     async def configure_application(
         self, model_name: str, application_name: str, config: dict = None
@@ -908,15 +928,18 @@ class Libjuju:
         self.log.debug("Configuring application {}".format(application_name))
 
         if config:
+            controller = await self.get_controller()
+            model = None
             try:
-                controller = await self.get_controller()
                 model = await self.get_model(controller, model_name)
                 application = self._get_application(
-                    model, application_name=application_name,
+                    model,
+                    application_name=application_name,
                 )
                 await application.set_config(config)
             finally:
-                await self.disconnect_model(model)
+                if model:
+                    await self.disconnect_model(model)
                 await self.disconnect_controller(controller)
 
     def _get_api_endpoints_db(self) -> [str]:
@@ -952,7 +975,8 @@ class Libjuju:
         if not juju_info:
             try:
                 self.db.create(
-                    DB_DATA.api_endpoints.table, DB_DATA.api_endpoints.filter,
+                    DB_DATA.api_endpoints.table,
+                    DB_DATA.api_endpoints.filter,
                 )
             except DbException as e:
                 # Racing condition: check if another N2VC worker has created it
@@ -979,6 +1003,7 @@ class Libjuju:
 
         :param: interval: Time in seconds between checks
         """
+        controller = None
         while True:
             try:
                 controller = await self.get_controller()
@@ -1020,75 +1045,126 @@ class Libjuju:
         finally:
             await self.disconnect_controller(controller)
 
-    async def add_k8s(self, name: str, auth_data: dict, storage_class: str):
+    async def add_k8s(
+        self,
+        name: str,
+        rbac_id: str,
+        token: str,
+        client_cert_data: str,
+        configuration: Configuration,
+        storage_class: str,
+        credential_name: str = None,
+    ):
         """
         Add a Kubernetes cloud to the controller
 
         Similar to the `juju add-k8s` command in the CLI
 
-        :param: name:           Name for the K8s cloud
-        :param: auth_data:      Dictionary with needed credentials. Format:
-        {
-            "server": "192.168.0.21:16443",
-            "cacert": "-----BEGIN CERTIFI...",
-            "token": "clhkRExRem5Xd1dCdnFEVXdvRGt...",
-
-        }
-        :param: storage_class:  Storage Class to use in the cloud
+        :param: name:               Name for the K8s cloud
+        :param: configuration:      Kubernetes configuration object
+        :param: storage_class:      Storage Class to use in the cloud
+        :param: credential_name:    Storage Class to use in the cloud
         """
 
-        required_auth_data_keys = ["server", "cacert", "token"]
-        missing_keys = []
-        for k in required_auth_data_keys:
-            if k not in auth_data:
-                missing_keys.append(k)
-        if missing_keys:
-            raise Exception(
-                "missing keys in auth_data: {}".format(",".join(missing_keys))
-            )
         if not storage_class:
             raise Exception("storage_class must be a non-empty string")
         if not name:
             raise Exception("name must be a non-empty string")
-
-        endpoint = auth_data["server"]
-        cacert = auth_data["cacert"]
-        token = auth_data["token"]
-        region_name = "{}-region".format(name)
-
+        if not configuration:
+            raise Exception("configuration must be provided")
+
+        endpoint = configuration.host
+        credential = self.get_k8s_cloud_credential(
+            configuration,
+            client_cert_data,
+            token,
+        )
+        credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id
         cloud = client.Cloud(
-            auth_types=["certificate"],
-            ca_certificates=[cacert],
+            type_="kubernetes",
+            auth_types=[credential.auth_type],
             endpoint=endpoint,
+            ca_certificates=[client_cert_data],
             config={
                 "operator-storage": storage_class,
                 "workload-storage": storage_class,
             },
-            regions=[client.CloudRegion(endpoint=endpoint, name=region_name)],
-            type_="kubernetes",
         )
 
-        cred = client.CloudCredential(
-            auth_type="certificate",
-            attrs={"ClientCertificateData": cacert, "Token": token},
+        return await self.add_cloud(
+            name, cloud, credential, credential_name=credential_name
         )
-        return await self.add_cloud(name, cloud, cred)
+
+    def get_k8s_cloud_credential(
+        self,
+        configuration: Configuration,
+        client_cert_data: str,
+        token: str = None,
+    ) -> client.CloudCredential:
+        attrs = {}
+        # TODO: Test with AKS
+        key = None  # open(configuration.key_file, "r").read()
+        username = configuration.username
+        password = configuration.password
+
+        if client_cert_data:
+            attrs["ClientCertificateData"] = client_cert_data
+        if key:
+            attrs["ClientKeyData"] = key
+        if token:
+            if username or password:
+                raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
+            attrs["Token"] = token
+
+        auth_type = None
+        if key:
+            auth_type = "oauth2"
+            if client_cert_data:
+                auth_type = "oauth2withcert"
+            if not token:
+                raise JujuInvalidK8sConfiguration(
+                    "missing token for auth type {}".format(auth_type)
+                )
+        elif username:
+            if not password:
+                self.log.debug(
+                    "credential for user {} has empty password".format(username)
+                )
+            attrs["username"] = username
+            attrs["password"] = password
+            if client_cert_data:
+                auth_type = "userpasswithcert"
+            else:
+                auth_type = "userpass"
+        elif client_cert_data and token:
+            auth_type = "certificate"
+        else:
+            raise JujuInvalidK8sConfiguration("authentication method not supported")
+        return client.CloudCredential(auth_type=auth_type, attrs=attrs)
 
     async def add_cloud(
-        self, name: str, cloud: Cloud, credential: CloudCredential = None
+        self,
+        name: str,
+        cloud: Cloud,
+        credential: CloudCredential = None,
+        credential_name: str = None,
     ) -> Cloud:
         """
         Add cloud to the controller
 
-        :param: name:   Name of the cloud to be added
-        :param: cloud:  Cloud object
-        :param: credential:   CloudCredentials object for the cloud
+        :param: name:               Name of the cloud to be added
+        :param: cloud:              Cloud object
+        :param: credential:         CloudCredentials object for the cloud
+        :param: credential_name:    Credential name.
+                                    If not defined, cloud of the name will be used.
         """
         controller = await self.get_controller()
         try:
             _ = await controller.add_cloud(name, cloud)
             if credential:
-                await controller.add_credential(name, credential=credential, cloud=name)
+                await controller.add_credential(
+                    credential_name or name, credential=credential, cloud=name
+                )
             # Need to return the object returned by the controller.add_cloud() function
             # I'm returning the original value now until this bug is fixed:
             #   https://github.com/juju/python-libjuju/issues/443
@@ -1107,3 +1183,21 @@ class Libjuju:
             await controller.remove_cloud(name)
         finally:
             await self.disconnect_controller(controller)
+
+    async def _get_leader_unit(self, application: Application) -> Unit:
+        unit = None
+        for u in application.units:
+            if await u.is_leader_from_status():
+                unit = u
+                break
+        return unit
+
+    async def get_cloud_credentials(self, cloud_name: str, credential_name: str):
+        controller = await self.get_controller()
+        try:
+            facade = client.CloudFacade.from_connection(controller.connection())
+            cloud_cred_tag = tag.credential(cloud_name, self.username, credential_name)
+            params = [client.Entity(cloud_cred_tag)]
+            return (await facade.Credential(params)).results
+        finally:
+            await self.disconnect_controller(controller)