| # Copyright 2023 Canonical Ltd. |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| # implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import logging |
| from dataclasses import dataclass |
| from typing import List |
| |
| from juju.controller import Controller |
| from juju.model import Model |
| |
| from n2vc.exceptions import ( |
| JujuControllerFailedConnecting, |
| JujuModelAlreadyExists, |
| ) |
| |
| |
| @dataclass |
| class ConnectionInfo: |
| """Information to connect to juju controller""" |
| |
| endpoint: str |
| user: str |
| password: str |
| cacert: str |
| cloud_name: str |
| cloud_credentials: str |
| |
| def __repr__(self): |
| return f"{self.__class__.__name__}(endpoint: {self.endpoint}, user: {self.user}, password: ******, caert: ******)" |
| |
| def __str__(self): |
| return f"{self.__class__.__name__}(endpoint: {self.endpoint}, user: {self.user}, password: ******, caert: ******)" |
| |
| |
| class Libjuju: |
| def __init__(self, connection_info: ConnectionInfo) -> None: |
| self.logger = logging.getLogger("temporal_libjuju") |
| self.connection_info = connection_info |
| |
| async def get_controller(self) -> Controller: |
| controller = Controller() |
| try: |
| await controller.connect( |
| endpoint=self.connection_info.endpoint, |
| username=self.connection_info.user, |
| password=self.connection_info.password, |
| cacert=self.connection_info.cacert, |
| ) |
| return controller |
| except Exception as e: |
| self.logger.error( |
| "Error connecting to controller={}: {}".format( |
| self.connection_info.endpoint, e |
| ) |
| ) |
| await self.disconnect_controller(controller) |
| raise JujuControllerFailedConnecting(str(e)) |
| |
| async def disconnect_controller(self, controller: Controller) -> None: |
| if controller: |
| await controller.disconnect() |
| |
| async def disconnect_model(self, model: Model): |
| if model: |
| await model.disconnect() |
| |
| async def add_model(self, model_name: str): |
| """Exception is raised if model_name already exists""" |
| model = None |
| controller = None |
| try: |
| controller = await self.get_controller() |
| if await self.model_exists(model_name, controller=controller): |
| raise JujuModelAlreadyExists( |
| "Cannot create model {}".format(model_name) |
| ) |
| self.logger.debug("Creating model {}".format(model_name)) |
| model = await controller.add_model( |
| model_name, |
| cloud_name=self.connection_info.cloud_name, |
| credential_name=self.connection_info.cloud_credentials, |
| ) |
| finally: |
| await self.disconnect_model(model) |
| await self.disconnect_controller(controller) |
| |
| async def model_exists( |
| self, model_name: str, controller: Controller = None |
| ) -> bool: |
| """Returns True if model exists. False otherwhise.""" |
| need_to_disconnect = False |
| try: |
| if not controller: |
| controller = await self.get_controller() |
| need_to_disconnect = True |
| |
| return model_name in await controller.list_models() |
| finally: |
| if need_to_disconnect: |
| await self.disconnect_controller(controller) |
| |
| async def get_model(self, controller: Controller, model_name: str) -> Model: |
| return await controller.get_model(model_name) |
| |
| async def list_models(self) -> List[str]: |
| """List models in controller.""" |
| try: |
| controller = await self.get_controller() |
| return await controller.list_models() |
| finally: |
| await self.disconnect_controller(controller) |
| |
| async def destroy_model(self, model_name: str, force=False) -> None: |
| controller = None |
| model = None |
| |
| try: |
| controller = await self.get_controller() |
| if not await self.model_exists(model_name, controller=controller): |
| self.logger.warn(f"Model {model_name} doesn't exist") |
| return |
| |
| self.logger.debug(f"Getting model {model_name} to destroy") |
| model = await self.get_model(controller, model_name) |
| await self.disconnect_model(model) |
| |
| await controller.destroy_model( |
| model_name, destroy_storage=True, force=force, max_wait=60 |
| ) |
| |
| except Exception as e: |
| self.logger.warn(f"Failed deleting model {model_name}: {e}") |
| raise e |
| finally: |
| await self.disconnect_model(model) |
| await self.disconnect_controller(controller) |