+ async def _validate_instantiation_params(
+ self, uri: str, model, instantiation_params: dict
+ ) -> None:
+ """Checks if all the applications in instantiation_params
+ exist ins the original bundle.
+
+ Raises:
+ JujuApplicationNotFound if there is an invalid app in
+ the instantiation params.
+ """
+ overlay_apps = self._get_apps_in_instantiation_params(instantiation_params)
+ if not overlay_apps:
+ return
+ original_apps = await self._get_apps_in_original_bundle(uri, model)
+ if not all(app in original_apps for app in overlay_apps):
+ raise JujuApplicationNotFound(
+ "Cannot find application {} in original bundle {}".format(
+ overlay_apps, original_apps
+ )
+ )
+
+ async def _get_apps_in_original_bundle(self, uri: str, model) -> set:
+ """Bundle is downloaded in BundleHandler.fetch_plan.
+ That method takes care of opening and exception handling.
+
+ Resolve method gets all the information regarding the channel,
+ track, revision, type, source.
+
+ Returns:
+ Set with the names of the applications in original bundle.
+ """
+ url = URL.parse(uri)
+ architecture = DEFAULT_ARCHITECTURE # only AMD64 is allowed
+ res = await model.deploy_types[str(url.schema)].resolve(
+ url, architecture, entity_url=uri
+ )
+ handler = BundleHandler(model, trusted=True, forced=False)
+ await handler.fetch_plan(url, res.origin)
+ return handler.applications
+
+ def _get_apps_in_instantiation_params(self, instantiation_params: dict) -> list:
+ """Extract applications key in instantiation params.
+
+ Returns:
+ List with the names of the applications in instantiation params.
+
+ Raises:
+ JujuError if applications key is not found.
+ """
+ if not instantiation_params:
+ return []
+ try:
+ return [key for key in instantiation_params.get("applications")]
+ except Exception as e:
+ raise JujuError("Invalid overlay format. {}".format(str(e)))
+
+ def _get_overlays(self, model_name: str, instantiation_params: dict) -> list:
+ """Creates a temporary overlay file which includes the instantiation params.
+ Only one overlay file is created.
+
+ Returns:
+ List with one overlay filename. Empty list if there are no instantiation params.
+ """
+ if not instantiation_params:
+ return []
+ file_name = model_name + "-overlay.yaml"
+ self._write_overlay_file(file_name, instantiation_params)
+ return [file_name]
+
+ def _write_overlay_file(self, file_name: str, instantiation_params: dict) -> None:
+ with open(file_name, "w") as file:
+ yaml.dump(instantiation_params, file)
+
+ def _remove_overlay_file(self, overlay: list) -> None:
+ """Overlay contains either one or zero file names."""
+ if not overlay:
+ return
+ try:
+ filename = overlay[0]
+ os.remove(filename)
+ except OSError as e:
+ self.log.warning(
+ "Overlay file {} could not be removed: {}".format(filename, e)
+ )
+
+ async def add_unit(
+ self,
+ application_name: str,
+ model_name: str,
+ machine_id: str,
+ db_dict: dict = None,
+ progress_timeout: float = None,
+ total_timeout: float = None,
+ ):
+ """Add unit
+
+ :param: application_name: Application name
+ :param: model_name: Model name
+ :param: machine_id Machine id
+ :param: db_dict: Dictionary with data of the DB to write the updates
+ :param: progress_timeout: Maximum time between two updates in the model
+ :param: total_timeout: Timeout for the entity to be active
+
+ :return: None
+ """
+
+ model = None
+ controller = await self.get_controller()
+ try:
+ model = await self.get_model(controller, model_name)
+ application = self._get_application(model, application_name)
+
+ if application is not None:
+ # Checks if the given machine id in the model,
+ # otherwise function raises an error
+ _machine, _series = self._get_machine_info(model, machine_id)
+
+ self.log.debug(
+ "Adding unit (machine {}) to application {} in model ~{}".format(
+ machine_id, application_name, model_name
+ )
+ )
+
+ await application.add_unit(to=machine_id)
+
+ await JujuModelWatcher.wait_for(
+ model=model,
+ entity=application,
+ progress_timeout=progress_timeout,
+ total_timeout=total_timeout,
+ db_dict=db_dict,
+ n2vc=self.n2vc,
+ vca_id=self.vca_connection._vca_id,
+ )
+ self.log.debug(
+ "Unit is added to application {} in model {}".format(
+ application_name, model_name
+ )
+ )
+ else:
+ raise JujuApplicationNotFound(
+ "Application {} not exists".format(application_name)
+ )
+ finally:
+ if model:
+ await self.disconnect_model(model)
+ await self.disconnect_controller(controller)
+
+ async def destroy_unit(
+ self,
+ application_name: str,
+ model_name: str,
+ machine_id: str,
+ total_timeout: float = None,
+ ):
+ """Destroy unit
+
+ :param: application_name: Application name
+ :param: model_name: Model name
+ :param: machine_id Machine id
+ :param: total_timeout: Timeout for the entity to be active
+
+ :return: None
+ """
+
+ model = None
+ controller = await self.get_controller()
+ try:
+ model = await self.get_model(controller, model_name)
+ application = self._get_application(model, application_name)
+
+ if application is None:
+ raise JujuApplicationNotFound(
+ "Application not found: {} (model={})".format(
+ application_name, model_name
+ )
+ )
+
+ unit = self._get_unit(application, machine_id)
+ if not unit:
+ raise JujuError(
+ "A unit with machine id {} not in available units".format(
+ machine_id
+ )
+ )
+
+ unit_name = unit.name
+
+ self.log.debug(
+ "Destroying unit {} from application {} in model {}".format(
+ unit_name, application_name, model_name
+ )
+ )
+ await application.destroy_unit(unit_name)
+
+ self.log.debug(
+ "Waiting for unit {} to be destroyed in application {} (model={})...".format(
+ unit_name, application_name, model_name
+ )
+ )
+
+ # TODO: Add functionality in the Juju watcher to replace this kind of blocks
+ if total_timeout is None:
+ total_timeout = 3600
+ end = time.time() + total_timeout
+ while time.time() < end:
+ if not self._get_unit(application, machine_id):
+ self.log.debug(
+ "The unit {} was destroyed in application {} (model={}) ".format(
+ unit_name, application_name, model_name
+ )
+ )
+ return
+ await asyncio.sleep(5)
+ self.log.debug(
+ "Unit {} is destroyed from application {} in model {}".format(
+ unit_name, application_name, model_name
+ )
+ )
+ finally:
+ if model:
+ await self.disconnect_model(model)
+ await self.disconnect_controller(controller)
+