Pin black version in tox.ini to 23.12.1
[osm/N2VC.git] / n2vc / store.py
1 # Copyright 2021 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import abc
16 import typing
17
18 from motor.motor_asyncio import AsyncIOMotorClient
19 from n2vc.config import EnvironConfig
20 from n2vc.vca.connection_data import ConnectionData
21 from osm_common.dbmongo import DbMongo, DbException
22 from osm_common.dbbase import Encryption
23
24
# Name of the database selected on the Motor client by MotorStore.
DB_NAME = "osm"
26
27
class Store(abc.ABC):
    """
    Abstract interface of the stores that hold VCA information.

    Implementations provide the VCA connection data, read and write the VCA
    API endpoints, and resolve the VCA associated to a VIM account.
    """

    @abc.abstractmethod
    async def get_vca_connection_data(self, vca_id: str) -> ConnectionData:
        """
        Get VCA connection data

        :param: vca_id: VCA ID

        :returns: ConnectionData with the information of the database
        """

    # The signature mirrors the concrete stores in this module: the first
    # parameter is called "endpoints" (as the docstring always stated) and
    # vca_id is optional, a falsy value meaning "the default VCA".
    @abc.abstractmethod
    async def update_vca_endpoints(
        self, endpoints: typing.List[str], vca_id: str = None
    ):
        """
        Update VCA endpoints

        :param: endpoints: List of endpoints to write in the database
        :param: vca_id: VCA ID
        """

    @abc.abstractmethod
    async def get_vca_endpoints(self, vca_id: str = None) -> typing.List[str]:
        """
        Get list of VCA endpoints

        :param: vca_id: VCA ID

        :returns: List of endpoints
        """

    @abc.abstractmethod
    async def get_vca_id(self, vim_id: str = None) -> str:
        """
        Get VCA id for a VIM account

        :param: vim_id: Vim account ID

        :returns: VCA ID associated to the VIM account
        """
65
66
class DbMongoStore(Store):
    """Store implementation that works on top of an osm_common DbMongo object."""

    def __init__(self, db: DbMongo):
        """
        Constructor

        :param: db: osm_common.dbmongo.DbMongo object
        """
        self.db = db

    async def get_vca_connection_data(self, vca_id: str) -> ConnectionData:
        """
        Get VCA connection data

        Reads the VCA document from the "vca" collection and decrypts its
        "secret" and "cacert" fields before building the ConnectionData.

        :param: vca_id: VCA ID

        :returns: ConnectionData with the information of the database
        """
        data = self.db.get_one("vca", q_filter={"_id": vca_id})
        # The document id is used as salt for the decryption.
        self.db.encrypt_decrypt_fields(
            data,
            "decrypt",
            ["secret", "cacert"],
            schema_version=data["schema_version"],
            salt=data["_id"],
        )
        return ConnectionData(**data)

    async def update_vca_endpoints(
        self, endpoints: typing.List[str], vca_id: str = None
    ):
        """
        Update VCA endpoints

        :param: endpoints: List of endpoints to write in the database
        :param: vca_id: VCA ID. If empty, the default VCA ("juju") is updated.

        :raises: DbException if the default VCA document cannot be created
                 and no other worker has created it either
        """
        if vca_id:
            data = self.db.get_one("vca", q_filter={"_id": vca_id})
            data["endpoints"] = endpoints
            self._update("vca", vca_id, data)
            return

        # The default VCA. Its endpoints live in the "juju" document, under
        # the "api_endpoints" key.
        juju_info = self._get_juju_info()
        # If the "juju" document doesn't exist yet, then create it
        if not juju_info:
            try:
                self.db.create(
                    "vca",
                    {"_id": "juju"},
                )
            except DbException:
                # Racing condition: check if another N2VC worker has created it
                juju_info = self._get_juju_info()
                if not juju_info:
                    raise
        self.db.set_one(
            "vca",
            {"_id": "juju"},
            {"api_endpoints": endpoints},
        )

    async def get_vca_endpoints(self, vca_id: str = None) -> typing.List[str]:
        """
        Get list of VCA endpoints

        :param: vca_id: VCA ID. If empty, the default VCA ("juju") is read.

        :returns: List of endpoints (empty if the default VCA has none stored)
        """
        if vca_id:
            # get_vca_connection_data is a coroutine function: it has to be
            # awaited before reading attributes of its result.
            connection_data = await self.get_vca_connection_data(vca_id)
            return connection_data.endpoints

        juju_info = self._get_juju_info()
        if juju_info and "api_endpoints" in juju_info:
            return juju_info["api_endpoints"]
        return []

    async def get_vca_id(self, vim_id: str = None) -> str:
        """
        Get VCA ID from the database for a given VIM account ID

        :param: vim_id: VIM account ID

        :returns: VCA ID of the VIM account. None if vim_id is empty, if the
                  VIM account is not found, or if it has no "vca" key.
        """
        if not vim_id:
            return None
        vim_account = self.db.get_one(
            "vim_accounts",
            q_filter={"_id": vim_id},
            fail_on_empty=False,
        )
        # With fail_on_empty=False the lookup may come back empty, so guard
        # against it before reading the "vca" key.
        if not vim_account:
            return None
        return vim_account.get("vca")

    def _update(self, collection: str, id: str, data: dict):
        """
        Update object in database

        :param: collection: Collection name
        :param: id: ID of the object
        :param: data: Object data
        """
        self.db.replace(
            collection,
            id,
            data,
        )

    def _get_juju_info(self):
        """
        Get Juju information (the default VCA) from the "vca" collection

        :returns: The document with "_id" equal to "juju", or an empty result
                  when it does not exist (fail_on_empty is disabled)
        """
        return self.db.get_one(
            "vca",
            q_filter={"_id": "juju"},
            fail_on_empty=False,
        )
182
183
class MotorStore(Store):
    """Store implementation that talks to the database through Motor."""

    def __init__(self, uri: str):
        """
        Constructor

        :param: uri: Connection string to connect to the database.
        """
        self._client = AsyncIOMotorClient(uri)
        self._secret_key = None
        self._config = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"])
        # Used by get_vca_connection_data to decrypt the stored credentials.
        self.encryption = Encryption(
            uri=uri,
            config=self._config,
            encoding_type="utf-8",
            logger_name="db",
        )

    @property
    def _database(self):
        """Database named DB_NAME of the Motor client"""
        return self._client[DB_NAME]

    @property
    def _vca_collection(self):
        """The "vca" collection"""
        return self._database["vca"]

    @property
    def _admin_collection(self):
        """The "admin" collection"""
        return self._database["admin"]

    @property
    def _vim_accounts_collection(self):
        """The "vim_accounts" collection"""
        return self._database["vim_accounts"]

    async def get_vca_connection_data(self, vca_id: str) -> ConnectionData:
        """
        Get VCA connection data

        Reads the VCA document from the "vca" collection and decrypts its
        "secret" and "cacert" fields before building the ConnectionData.

        :param: vca_id: VCA ID

        :returns: ConnectionData with the information of the database

        :raises: Exception if there is no VCA with that ID
        """
        data = await self._vca_collection.find_one({"_id": vca_id})
        if not data:
            raise Exception("vca with id {} not found".format(vca_id))
        # The document id is used as salt for the decryption.
        await self.encryption.decrypt_fields(
            data,
            ["secret", "cacert"],
            schema_version=data["schema_version"],
            salt=data["_id"],
        )
        return ConnectionData(**data)

    async def update_vca_endpoints(
        self, endpoints: typing.List[str], vca_id: str = None
    ):
        """
        Update VCA endpoints

        :param: endpoints: List of endpoints to write in the database
        :param: vca_id: VCA ID. If empty, the default VCA ("juju") is updated.

        :raises: Exception if vca_id is given and there is no VCA with that
                 ID, or if the default VCA document cannot be created and no
                 other worker has created it either
        """
        if vca_id:
            data = await self._vca_collection.find_one({"_id": vca_id})
            # find_one returns None for an unknown id: fail with the same
            # message as get_vca_connection_data instead of a TypeError.
            if not data:
                raise Exception("vca with id {} not found".format(vca_id))
            data["endpoints"] = endpoints
            await self._vca_collection.replace_one({"_id": vca_id}, data)
            return

        # The default VCA. Its endpoints live in the "juju" document of the
        # admin collection, under the "api_endpoints" key.
        juju_info = await self._get_juju_info()
        # If the "juju" document doesn't exist yet, then create it
        if not juju_info:
            try:
                await self._admin_collection.insert_one({"_id": "juju"})
            except Exception:
                # Racing condition: check if another N2VC worker has created it
                juju_info = await self._get_juju_info()
                if not juju_info:
                    raise

        # Write only "api_endpoints" with $set, so that any other field kept
        # in the "juju" document is not wiped by a whole-document replacement.
        await self._admin_collection.update_one(
            {"_id": "juju"}, {"$set": {"api_endpoints": endpoints}}
        )

    async def get_vca_endpoints(self, vca_id: str = None) -> typing.List[str]:
        """
        Get list of VCA endpoints

        :param: vca_id: VCA ID. If empty, the default VCA ("juju") is read.

        :returns: List of endpoints (empty if the default VCA has none stored)
        """
        if vca_id:
            connection_data = await self.get_vca_connection_data(vca_id)
            return connection_data.endpoints

        juju_info = await self._get_juju_info()
        if juju_info and "api_endpoints" in juju_info:
            return juju_info["api_endpoints"]
        return []

    async def get_vca_id(self, vim_id: str = None) -> str:
        """
        Get VCA ID from the database for a given VIM account ID

        :param: vim_id: VIM account ID

        :returns: VCA ID of the VIM account. None if vim_id is empty, if the
                  VIM account is not found, or if it has no "vca" key.
        """
        if not vim_id:
            return None
        vim_account = await self._vim_accounts_collection.find_one({"_id": vim_id})
        if vim_account and "vca" in vim_account:
            return vim_account["vca"]
        return None

    async def _get_juju_info(self):
        """Get Juju information (the default VCA) from the admin collection"""
        return await self._admin_collection.find_one({"_id": "juju"})