| # Copyright 2021 Canonical Ltd. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import typing |
| |
| from n2vc.config import EnvironConfig, ModelConfig |
| from n2vc.store import Store |
| from n2vc.vca.cloud import Cloud |
| from n2vc.vca.connection_data import ConnectionData |
| |
| |
| class Connection: |
| def __init__(self, store: Store, vca_id: str = None): |
| """ |
| Contructor |
| |
| :param: store: Store object. Used to communicate wuth the DB |
| :param: vca_id: Id of the VCA. If none specified, the default VCA will be used. |
| """ |
| self._data = None |
| self.default = vca_id is None |
| self._vca_id = vca_id |
| self._store = store |
| |
| async def load(self): |
| """Load VCA connection data""" |
| await self._load_vca_connection_data() |
| |
| @property |
| def is_default(self): |
| return self._vca_id is None |
| |
| @property |
| def data(self) -> ConnectionData: |
| return self._data |
| |
| async def _load_vca_connection_data(self) -> typing.NoReturn: |
| """ |
| Load VCA connection data |
| |
| If self._vca_id is None, it will get the VCA data from the Environment variables, |
| and the default VCA will be used. If it is not None, then it means that it will |
| load the credentials from the database (A non-default VCA will be used). |
| """ |
| if self._vca_id: |
| self._data = await self._store.get_vca_connection_data(self._vca_id) |
| else: |
| envs = EnvironConfig() |
| # Get endpoints from the DB and ENV. Check if update in the database is needed or not. |
| db_endpoints = await self._store.get_vca_endpoints() |
| env_endpoints = ( |
| envs["endpoints"].split(",") |
| if "endpoints" in envs |
| else ["{}:{}".format(envs["host"], envs.get("port", 17070))] |
| ) |
| |
| db_update_needed = not all(e in db_endpoints for e in env_endpoints) |
| |
| endpoints = env_endpoints if db_update_needed else db_endpoints |
| config = { |
| "endpoints": endpoints, |
| "user": envs["user"], |
| "secret": envs["secret"], |
| "cacert": envs["cacert"], |
| "pubkey": envs["pubkey"], |
| "lxd-cloud": envs["cloud"], |
| "lxd-credentials": envs.get("credentials", envs["cloud"]), |
| "k8s-cloud": envs["k8s_cloud"], |
| "k8s-credentials": envs.get("k8s_credentials", envs["k8s_cloud"]), |
| "model-config": ModelConfig(envs), |
| "api-proxy": envs.get("api_proxy", None), |
| } |
| self._data = ConnectionData(**config) |
| if db_update_needed: |
| await self.update_endpoints(endpoints) |
| |
| @property |
| def endpoints(self): |
| return self._data.endpoints |
| |
| async def update_endpoints(self, endpoints: typing.List[str]): |
| await self._store.update_vca_endpoints(endpoints, self._vca_id) |
| self._data.endpoints = endpoints |
| |
| @property |
| def lxd_cloud(self) -> Cloud: |
| return Cloud(self.data.lxd_cloud, self.data.lxd_credentials) |
| |
| @property |
| def k8s_cloud(self) -> Cloud: |
| return Cloud(self.data.k8s_cloud, self.data.k8s_credentials) |
| |
| |
| async def get_connection(store: Store, vca_id: str = None) -> Connection: |
| """ |
| Get Connection |
| |
| Method to get a Connection object with the VCA information loaded |
| """ |
| connection = Connection(store, vca_id=vca_id) |
| await connection.load() |
| return connection |