Feature 10974: Add juju instantiation params.
[osm/N2VC.git] / n2vc / libjuju.py
index 1785caa..55ca859 100644 (file)
 
 import asyncio
 import logging
 
 import asyncio
 import logging
+import os
 import typing
 import typing
+import yaml
 
 import time
 
 import juju.errors
 
 import time
 
 import juju.errors
+from juju.bundle import BundleHandler
 from juju.model import Model
 from juju.machine import Machine
 from juju.application import Application
 from juju.unit import Unit
 from juju.model import Model
 from juju.machine import Machine
 from juju.application import Application
 from juju.unit import Unit
+from juju.url import URL
+from juju.version import DEFAULT_ARCHITECTURE
 from juju.client._definitions import (
     FullStatus,
     QueryApplicationOffersResults,
 from juju.client._definitions import (
     FullStatus,
     QueryApplicationOffersResults,
@@ -549,27 +554,122 @@ class Libjuju:
         return machine_id
 
     async def deploy(
         return machine_id
 
     async def deploy(
-        self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
+        self,
+        uri: str,
+        model_name: str,
+        wait: bool = True,
+        timeout: float = 3600,
+        instantiation_params: dict = None,
     ):
         """
         Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
 
     ):
         """
         Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
 
-        :param: uri:            Path or Charm Store uri in which the charm or bundle can be found
-        :param: model_name:     Model name
-        :param: wait:           Indicates whether to wait or not until all applications are active
-        :param: timeout:        Time in seconds to wait until all applications are active
+        :param uri:            Path or Charm Store uri in which the charm or bundle can be found
+        :param model_name:     Model name
+        :param wait:           Indicates whether to wait or not until all applications are active
+        :param timeout:        Time in seconds to wait until all applications are active
+        :param instantiation_params: To be applied as overlay bundle over primary bundle.
         """
         controller = await self.get_controller()
         model = await self.get_model(controller, model_name)
         """
         controller = await self.get_controller()
         model = await self.get_model(controller, model_name)
+        overlays = []
         try:
         try:
-            await model.deploy(uri, trust=True)
+            await self._validate_instantiation_params(uri, model, instantiation_params)
+            overlays = self._get_overlays(model_name, instantiation_params)
+            await model.deploy(uri, trust=True, overlays=overlays)
             if wait:
                 await JujuModelWatcher.wait_for_model(model, timeout=timeout)
                 self.log.debug("All units active in model {}".format(model_name))
         finally:
             if wait:
                 await JujuModelWatcher.wait_for_model(model, timeout=timeout)
                 self.log.debug("All units active in model {}".format(model_name))
         finally:
+            self._remove_overlay_file(overlays)
             await self.disconnect_model(model)
             await self.disconnect_controller(controller)
 
             await self.disconnect_model(model)
             await self.disconnect_controller(controller)
 
+    async def _validate_instantiation_params(
+        self, uri: str, model, instantiation_params: dict
+    ) -> None:
+        """Checks if all the applications in instantiation_params
+        exist ins the original bundle.
+
+        Raises:
+            JujuApplicationNotFound if there is an invalid app in
+            the instantiation params.
+        """
+        overlay_apps = self._get_apps_in_instantiation_params(instantiation_params)
+        if not overlay_apps:
+            return
+        original_apps = await self._get_apps_in_original_bundle(uri, model)
+        if not all(app in original_apps for app in overlay_apps):
+            raise JujuApplicationNotFound(
+                "Cannot find application {} in original bundle {}".format(
+                    overlay_apps, original_apps
+                )
+            )
+
+    async def _get_apps_in_original_bundle(self, uri: str, model) -> set:
+        """Bundle is downloaded in BundleHandler.fetch_plan.
+        That method takes care of opening and exception handling.
+
+        Resolve method gets all the information regarding the channel,
+        track, revision, type, source.
+
+        Returns:
+            Set with the names of the applications in original bundle.
+        """
+        url = URL.parse(uri)
+        architecture = DEFAULT_ARCHITECTURE  # only AMD64 is allowed
+        res = await model.deploy_types[str(url.schema)].resolve(
+            url, architecture, entity_url=uri
+        )
+        handler = BundleHandler(model, trusted=True, forced=False)
+        await handler.fetch_plan(url, res.origin)
+        return handler.applications
+
+    def _get_apps_in_instantiation_params(self, instantiation_params: dict) -> list:
+        """Extract applications key in instantiation params.
+
+        Returns:
+            List with the names of the applications in instantiation params.
+
+        Raises:
+            JujuError if applications key is not found.
+        """
+        if not instantiation_params:
+            return []
+        try:
+            return [key for key in instantiation_params.get("applications")]
+        except Exception as e:
+            raise JujuError("Invalid overlay format. {}".format(str(e)))
+
+    def _get_overlays(self, model_name: str, instantiation_params: dict) -> list:
+        """Creates a temporary overlay file which includes the instantiation params.
+        Only one overlay file is created.
+
+        Returns:
+            List with one overlay filename. Empty list if there are no instantiation params.
+        """
+        if not instantiation_params:
+            return []
+        file_name = model_name + "-overlay.yaml"
+        self._write_overlay_file(file_name, instantiation_params)
+        return [file_name]
+
+    def _write_overlay_file(self, file_name: str, instantiation_params: dict) -> None:
+        with open(file_name, "w") as file:
+            yaml.dump(instantiation_params, file)
+
+    def _remove_overlay_file(self, overlay: list) -> None:
+        """Overlay contains either one or zero file names."""
+        if not overlay:
+            return
+        try:
+            filename = overlay[0]
+            os.remove(filename)
+        except OSError as e:
+            self.log.warning(
+                "Overlay file {} could not be removed: {}".format(filename, e)
+            )
+
     async def add_unit(
         self,
         application_name: str,
     async def add_unit(
         self,
         application_name: str,