- def _get_api_endpoints_db(self) -> [str]:
- """
- Get API Endpoints from DB
-
- :return: List of API endpoints
- """
- self.log.debug("Getting endpoints from database")
-
- juju_info = self.db.get_one(
- DB_DATA.api_endpoints.table,
- q_filter=DB_DATA.api_endpoints.filter,
- fail_on_empty=False,
- )
- if juju_info and DB_DATA.api_endpoints.key in juju_info:
- return juju_info[DB_DATA.api_endpoints.key]
-
- def _update_api_endpoints_db(self, endpoints: [str]):
- """
- Update API endpoints in Database
-
- :param: List of endpoints
- """
- self.log.debug("Saving endpoints {} in database".format(endpoints))
-
- juju_info = self.db.get_one(
- DB_DATA.api_endpoints.table,
- q_filter=DB_DATA.api_endpoints.filter,
- fail_on_empty=False,
- )
- # If it doesn't, then create it
- if not juju_info:
- try:
- self.db.create(
- DB_DATA.api_endpoints.table,
- DB_DATA.api_endpoints.filter,
- )
- except DbException as e:
- # Racing condition: check if another N2VC worker has created it
- juju_info = self.db.get_one(
- DB_DATA.api_endpoints.table,
- q_filter=DB_DATA.api_endpoints.filter,
- fail_on_empty=False,
- )
- if not juju_info:
- raise e
- self.db.set_one(
- DB_DATA.api_endpoints.table,
- DB_DATA.api_endpoints.filter,
- {DB_DATA.api_endpoints.key: endpoints},
- )
-