+ try:
+ controller = await self.get_controller()
+ model = await self.get_model(controller, model_name)
+ application = self._get_application(
+ model, application_name=application_name,
+ )
+ await application.set_config(config)
+ finally:
+ await self.disconnect_model(model)
+ await self.disconnect_controller(controller)
+
+ def _get_api_endpoints_db(self) -> [str]:
+ """
+ Get API Endpoints from DB
+
+ :return: List of API endpoints
+ """
+ self.log.debug("Getting endpoints from database")
+
+ juju_info = self.db.get_one(
+ DB_DATA.api_endpoints.table,
+ q_filter=DB_DATA.api_endpoints.filter,
+ fail_on_empty=False,
+ )
+ if juju_info and DB_DATA.api_endpoints.key in juju_info:
+ return juju_info[DB_DATA.api_endpoints.key]
+
+ def _update_api_endpoints_db(self, endpoints: [str]):
+ """
+ Update API endpoints in Database
+
+ :param: List of endpoints
+ """
+ self.log.debug("Saving endpoints {} in database".format(endpoints))
+
+ juju_info = self.db.get_one(
+ DB_DATA.api_endpoints.table,
+ q_filter=DB_DATA.api_endpoints.filter,
+ fail_on_empty=False,
+ )
+ # If it doesn't, then create it
+ if not juju_info:
+ try:
+ self.db.create(
+ DB_DATA.api_endpoints.table, DB_DATA.api_endpoints.filter,
+ )
+ except DbException as e:
+ # Racing condition: check if another N2VC worker has created it
+ juju_info = self.db.get_one(
+ DB_DATA.api_endpoints.table,
+ q_filter=DB_DATA.api_endpoints.filter,
+ fail_on_empty=False,
+ )
+ if not juju_info:
+ raise e
+ self.db.set_one(
+ DB_DATA.api_endpoints.table,
+ DB_DATA.api_endpoints.filter,
+ {DB_DATA.api_endpoints.key: endpoints},
+ )
+
+ def handle_exception(self, loop, context):
+ # All unhandled exceptions by libjuju are handled here.
+ pass
+
+ async def health_check(self, interval: float = 300.0):
+ """
+ Health check to make sure controller and controller_model connections are OK
+
+ :param: interval: Time in seconds between checks
+ """
+ while True:
+ try:
+ controller = await self.get_controller()
+ # self.log.debug("VCA is alive")
+ except Exception as e:
+ self.log.error("Health check to VCA failed: {}".format(e))
+ finally:
+ await self.disconnect_controller(controller)
+ await asyncio.sleep(interval)
+
+ async def list_models(self, contains: str = None) -> [str]:
+ """List models with certain names
+
+ :param: contains: String that is contained in model name
+
+ :retur: [models] Returns list of model names
+ """
+
+ controller = await self.get_controller()
+ try:
+ models = await controller.list_models()
+ if contains:
+ models = [model for model in models if contains in model]
+ return models
+ finally:
+ await self.disconnect_controller(controller)
+
+ async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
+ """List models with certain names
+
+ :param: model_name: Model name
+
+ :return: Returns list of offers
+ """
+
+ controller = await self.get_controller()
+ try:
+ return await controller.list_offers(model_name)
+ finally:
+ await self.disconnect_controller(controller)
+
+ async def add_k8s(
+ self,
+ name: str,
+ configuration: Configuration,
+ storage_class: str,
+ credential_name: str = None,
+ ):
+ """
+ Add a Kubernetes cloud to the controller
+
+ Similar to the `juju add-k8s` command in the CLI
+
+ :param: name: Name for the K8s cloud
+ :param: configuration: Kubernetes configuration object
+ :param: storage_class: Storage Class to use in the cloud
+ :param: credential_name: Storage Class to use in the cloud
+ """
+
+ if not storage_class:
+ raise Exception("storage_class must be a non-empty string")
+ if not name:
+ raise Exception("name must be a non-empty string")
+ if not configuration:
+ raise Exception("configuration must be provided")
+
+ endpoint = configuration.host
+ credential = self.get_k8s_cloud_credential(configuration)
+ ca_certificates = (
+ [credential.attrs["ClientCertificateData"]]
+ if "ClientCertificateData" in credential.attrs
+ else []
+ )
+ cloud = client.Cloud(
+ type_="kubernetes",
+ auth_types=[credential.auth_type],
+ endpoint=endpoint,
+ ca_certificates=ca_certificates,
+ config={
+ "operator-storage": storage_class,
+ "workload-storage": storage_class,
+ },
+ )
+
+ return await self.add_cloud(
+ name, cloud, credential, credential_name=credential_name
+ )
+
+ def get_k8s_cloud_credential(
+ self, configuration: Configuration,
+ ) -> client.CloudCredential:
+ attrs = {}
+ ca_cert = configuration.ssl_ca_cert or configuration.cert_file
+ key = configuration.key_file
+ api_key = configuration.api_key
+ token = None
+ username = configuration.username
+ password = configuration.password
+
+ if "authorization" in api_key:
+ authorization = api_key["authorization"]
+ if "Bearer " in authorization:
+ bearer_list = authorization.split(" ")
+ if len(bearer_list) == 2:
+ [_, token] = bearer_list
+ else:
+ raise JujuInvalidK8sConfiguration("unknown format of api_key")
+ else:
+ token = authorization
+ if ca_cert:
+ attrs["ClientCertificateData"] = open(ca_cert, "r").read()
+ if key:
+ attrs["ClientKeyData"] = open(key, "r").read()
+ if token:
+ if username or password:
+ raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
+ attrs["Token"] = token
+
+ auth_type = None
+ if key:
+ auth_type = "oauth2"
+ if not token:
+ raise JujuInvalidK8sConfiguration(
+ "missing token for auth type {}".format(auth_type)
+ )
+ elif username:
+ if not password:
+ self.log.debug(
+ "credential for user {} has empty password".format(username)
+ )
+ attrs["username"] = username
+ attrs["password"] = password
+ if ca_cert:
+ auth_type = "userpasswithcert"
+ else:
+ auth_type = "userpass"
+ elif ca_cert and token:
+ auth_type = "certificate"
+ else:
+ raise JujuInvalidK8sConfiguration("authentication method not supported")
+ return client.CloudCredential(auth_type=auth_type, attrs=attrs,)
+
+ async def add_cloud(
+ self,
+ name: str,
+ cloud: Cloud,
+ credential: CloudCredential = None,
+ credential_name: str = None,
+ ) -> Cloud:
+ """
+ Add cloud to the controller
+
+ :param: name: Name of the cloud to be added
+ :param: cloud: Cloud object
+ :param: credential: CloudCredentials object for the cloud
+ :param: credential_name: Credential name.
+ If not defined, cloud of the name will be used.
+ """
+ controller = await self.get_controller()
+ try:
+ _ = await controller.add_cloud(name, cloud)
+ if credential:
+ await controller.add_credential(
+ credential_name or name, credential=credential, cloud=name
+ )
+ # Need to return the object returned by the controller.add_cloud() function
+ # I'm returning the original value now until this bug is fixed:
+ # https://github.com/juju/python-libjuju/issues/443
+ return cloud
+ finally:
+ await self.disconnect_controller(controller)
+
+ async def remove_cloud(self, name: str):
+ """
+ Remove cloud
+
+ :param: name: Name of the cloud to be removed
+ """
+ controller = await self.get_controller()
+ try:
+ await controller.remove_cloud(name)
+ finally:
+ await self.disconnect_controller(controller)
+
+ async def _get_leader_unit(self, application: Application) -> Unit:
+ unit = None
+ for u in application.units:
+ if await u.is_leader_from_status():
+ unit = u
+ break
+ return unit