Feature 10950: Replace pycrypto with pycryptodome
[osm/N2VC.git] / n2vc / store.py
1 # Copyright 2021 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import abc
16 import asyncio
17 import typing
18
19 from motor.motor_asyncio import AsyncIOMotorClient
20 from n2vc.config import EnvironConfig
21 from n2vc.vca.connection_data import ConnectionData
22 from osm_common.dbmongo import DbMongo, DbException
23 from osm_common.dbbase import Encryption
24
25
# Name of the MongoDB database that MotorStore opens on its Motor client.
26 DB_NAME = "osm"
27
28
class Store(abc.ABC):
    """
    Abstract interface for reading and updating VCA information.

    Concrete stores must implement every coroutine declared here.
    """

    @abc.abstractmethod
    async def get_vca_connection_data(self, vca_id: str) -> ConnectionData:
        """
        Get VCA connection data

        :param: vca_id: VCA ID

        :returns: ConnectionData with the information of the database
        """

    @abc.abstractmethod
    async def update_vca_endpoints(
        self, endpoints: typing.List[str], vca_id: str = None
    ):
        """
        Update VCA endpoints

        The signature mirrors the concrete stores in this module, which name
        the first parameter ``endpoints`` and default ``vca_id`` to None.

        :param: endpoints: List of endpoints to write in the database
        :param: vca_id: VCA ID
        """

    @abc.abstractmethod
    async def get_vca_endpoints(self, vca_id: str = None) -> typing.List[str]:
        """
        Get list of VCA endpoints

        :param: vca_id: VCA ID

        :returns: List of endpoints
        """

    @abc.abstractmethod
    async def get_vca_id(self, vim_id: str = None) -> str:
        """
        Get VCA id for a VIM account

        :param: vim_id: Vim account ID

        :returns: VCA ID associated with the VIM account
        """
66
67
class DbMongoStore(Store):
    """Store implementation that delegates to an osm_common DbMongo object."""

    def __init__(self, db: DbMongo):
        """
        Constructor

        :param: db: osm_common.dbmongo.DbMongo object
        """
        self.db = db

    async def get_vca_connection_data(self, vca_id: str) -> ConnectionData:
        """
        Get VCA connection data

        Reads the document from the "vca" collection, decrypts its "secret"
        and "cacert" fields in place and builds a ConnectionData from it.

        :param: vca_id: VCA ID

        :returns: ConnectionData with the information of the database
        """
        data = self.db.get_one("vca", q_filter={"_id": vca_id})
        self.db.encrypt_decrypt_fields(
            data,
            "decrypt",
            ["secret", "cacert"],
            schema_version=data["schema_version"],
            salt=data["_id"],
        )
        return ConnectionData(**data)

    async def update_vca_endpoints(
        self, endpoints: typing.List[str], vca_id: str = None
    ):
        """
        Update VCA endpoints

        :param: endpoints: List of endpoints to write in the database
        :param: vca_id: VCA ID. When empty, the default VCA ("juju") is updated

        :raises: DbException if the default VCA document can neither be
                 created nor found
        """
        if vca_id:
            data = self.db.get_one("vca", q_filter={"_id": vca_id})
            data["endpoints"] = endpoints
            self._update("vca", vca_id, data)
            return

        # The default VCA. Data for the endpoints is in a different place
        juju_info = self._get_juju_info()
        # If it doesn't exist, then create it
        if not juju_info:
            try:
                self.db.create(
                    "vca",
                    {"_id": "juju"},
                )
            except DbException:
                # Race condition: check if another N2VC worker has created it
                juju_info = self._get_juju_info()
                if not juju_info:
                    raise
        self.db.set_one(
            "vca",
            {"_id": "juju"},
            {"api_endpoints": endpoints},
        )

    async def get_vca_endpoints(self, vca_id: str = None) -> typing.List[str]:
        """
        Get list of VCA endpoints

        :param: vca_id: VCA ID. When empty, the default VCA ("juju") is read

        :returns: List of endpoints
        """
        if vca_id:
            # get_vca_connection_data is a coroutine function: it must be
            # awaited before the resulting ConnectionData can be read.
            connection_data = await self.get_vca_connection_data(vca_id)
            return connection_data.endpoints

        endpoints = []
        juju_info = self._get_juju_info()
        if juju_info and "api_endpoints" in juju_info:
            endpoints = juju_info["api_endpoints"]
        return endpoints

    async def get_vca_id(self, vim_id: str = None) -> str:
        """
        Get VCA ID from the database for a given VIM account ID

        :param: vim_id: VIM account ID

        :returns: Value of the "vca" key of the VIM account, or None when no
                  vim_id is given, the account is not found, or it has no
                  "vca" key
        """
        if not vim_id:
            return None
        vim_account = self.db.get_one(
            "vim_accounts",
            q_filter={"_id": vim_id},
            fail_on_empty=False,
        )
        # With fail_on_empty=False the lookup may yield nothing; guard it
        # instead of calling .get() on a missing document.
        if not vim_account:
            return None
        return vim_account.get("vca")

    def _update(self, collection: str, id: str, data: dict):
        """
        Update object in database

        :param: collection: Collection name
        :param: id: ID of the object
        :param: data: Object data
        """
        self.db.replace(
            collection,
            id,
            data,
        )

    def _get_juju_info(self):
        """
        Get Juju information (the default VCA) from the "vca" collection

        :returns: The document whose _id is "juju", or a falsy value if the
                  lookup finds nothing (fail_on_empty=False)
        """
        # NOTE(review): MotorStore keeps the "juju" document in the "admin"
        # collection while this class uses "vca" - confirm which is intended.
        return self.db.get_one(
            "vca",
            q_filter={"_id": "juju"},
            fail_on_empty=False,
        )
183
184
class MotorStore(Store):
    """Store implementation that talks to MongoDB through a Motor client."""

    def __init__(self, uri: str, loop=None):
        """
        Constructor

        :param: uri: Connection string to connect to the database.
        :param: loop: Asyncio Loop
        """
        self._client = AsyncIOMotorClient(uri)
        self.loop = loop or asyncio.get_event_loop()
        self._secret_key = None
        self._config = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"])
        self.encryption = Encryption(
            uri=uri,
            config=self._config,
            encoding_type="utf-8",
            loop=self.loop,
            logger_name="db",
        )

    @property
    def _database(self):
        """Database named DB_NAME on the Motor client"""
        return self._client[DB_NAME]

    @property
    def _vca_collection(self):
        """The "vca" collection"""
        return self._database["vca"]

    @property
    def _admin_collection(self):
        """The "admin" collection"""
        return self._database["admin"]

    @property
    def _vim_accounts_collection(self):
        """The "vim_accounts" collection"""
        return self._database["vim_accounts"]

    async def get_vca_connection_data(self, vca_id: str) -> ConnectionData:
        """
        Get VCA connection data

        Reads the document from the "vca" collection, decrypts its "secret"
        and "cacert" fields and builds a ConnectionData from it.

        :param: vca_id: VCA ID

        :returns: ConnectionData with the information of the database

        :raises: Exception if no VCA with that ID exists
        """
        data = await self._vca_collection.find_one({"_id": vca_id})
        if not data:
            raise Exception("vca with id {} not found".format(vca_id))
        await self.encryption.decrypt_fields(
            data,
            ["secret", "cacert"],
            schema_version=data["schema_version"],
            salt=data["_id"],
        )
        return ConnectionData(**data)

    async def update_vca_endpoints(
        self, endpoints: typing.List[str], vca_id: str = None
    ):
        """
        Update VCA endpoints

        :param: endpoints: List of endpoints to write in the database
        :param: vca_id: VCA ID. When empty, the default VCA ("juju") is updated

        :raises: Exception if vca_id is given but no such VCA exists, or if
                 the default VCA document can neither be created nor found
        """
        if vca_id:
            data = await self._vca_collection.find_one({"_id": vca_id})
            if not data:
                # Same error as get_vca_connection_data, instead of failing
                # with a TypeError when assigning into None.
                raise Exception("vca with id {} not found".format(vca_id))
            data["endpoints"] = endpoints
            await self._vca_collection.replace_one({"_id": vca_id}, data)
            return

        # The default VCA. Data for the endpoints is in a different place
        juju_info = await self._get_juju_info()
        # If it doesn't exist, then create it
        if not juju_info:
            try:
                await self._admin_collection.insert_one({"_id": "juju"})
            except Exception:
                # Race condition: check if another N2VC worker has created it
                juju_info = await self._get_juju_info()
                if not juju_info:
                    raise

        # Use $set so that only "api_endpoints" is written; a full document
        # replacement would drop any other field stored in the juju document.
        await self._admin_collection.update_one(
            {"_id": "juju"}, {"$set": {"api_endpoints": endpoints}}
        )

    async def get_vca_endpoints(self, vca_id: str = None) -> typing.List[str]:
        """
        Get list of VCA endpoints

        :param: vca_id: VCA ID. When empty, the default VCA ("juju") is read

        :returns: List of endpoints
        """
        if vca_id:
            connection_data = await self.get_vca_connection_data(vca_id)
            return connection_data.endpoints

        endpoints = []
        juju_info = await self._get_juju_info()
        if juju_info and "api_endpoints" in juju_info:
            endpoints = juju_info["api_endpoints"]
        return endpoints

    async def get_vca_id(self, vim_id: str = None) -> str:
        """
        Get VCA ID from the database for a given VIM account ID

        :param: vim_id: VIM account ID

        :returns: Value of the "vca" key of the VIM account, or None when no
                  vim_id is given, the account is not found, or it has no
                  "vca" key
        """
        if not vim_id:
            return None
        vim_account = await self._vim_accounts_collection.find_one({"_id": vim_id})
        if vim_account and "vca" in vim_account:
            return vim_account["vca"]
        return None

    async def _get_juju_info(self):
        """Get Juju information (the default VCA) from the admin collection"""
        return await self._admin_collection.find_one({"_id": "juju"})