1 # Copyright 2021 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from base64
import b64decode
21 from Crypto
.Cipher
import AES
22 from motor
.motor_asyncio
import AsyncIOMotorClient
23 from n2vc
.config
import EnvironConfig
24 from n2vc
.vca
.connection_data
import ConnectionData
25 from osm_common
.dbmongo
import DbMongo
, DbException
32 async def get_vca_connection_data(self
, vca_id
: str) -> ConnectionData
:
34 Get VCA connection data
36 :param: vca_id: VCA ID
38 :returns: ConnectionData with the information of the database
42 async def update_vca_endpoints(self
, hosts
: typing
.List
[str], vca_id
: str):
46 :param: endpoints: List of endpoints to write in the database
47 :param: vca_id: VCA ID
51 async def get_vca_endpoints(self
, vca_id
: str = None) -> typing
.List
[str]:
53 Get list if VCA endpoints
55 :param: vca_id: VCA ID
57 :returns: List of endpoints
61 async def get_vca_id(self
, vim_id
: str = None) -> str:
63 Get VCA id for a VIM account
65 :param: vim_id: Vim account ID
69 class DbMongoStore(Store
):
70 def __init__(self
, db
: DbMongo
):
74 :param: db: osm_common.dbmongo.DbMongo object
78 async def get_vca_connection_data(self
, vca_id
: str) -> ConnectionData
:
80 Get VCA connection data
82 :param: vca_id: VCA ID
84 :returns: ConnectionData with the information of the database
86 data
= self
.db
.get_one("vca", q_filter
={"_id": vca_id
})
87 self
.db
.encrypt_decrypt_fields(
91 schema_version
=data
["schema_version"],
94 return ConnectionData(**data
)
96 async def update_vca_endpoints(
97 self
, endpoints
: typing
.List
[str], vca_id
: str = None
102 :param: endpoints: List of endpoints to write in the database
103 :param: vca_id: VCA ID
106 data
= self
.db
.get_one("vca", q_filter
={"_id": vca_id
})
107 data
["endpoints"] = endpoints
108 self
._update
("vca", vca_id
, data
)
110 # The default VCA. Data for the endpoints is in a different place
111 juju_info
= self
._get
_juju
_info
()
112 # If it doesn't, then create it
119 except DbException
as e
:
120 # Racing condition: check if another N2VC worker has created it
121 juju_info
= self
._get
_juju
_info
()
127 {"api_endpoints": endpoints
},
130 async def get_vca_endpoints(self
, vca_id
: str = None) -> typing
.List
[str]:
132 Get list if VCA endpoints
134 :param: vca_id: VCA ID
136 :returns: List of endpoints
140 endpoints
= self
.get_vca_connection_data(vca_id
).endpoints
142 juju_info
= self
._get
_juju
_info
()
143 if juju_info
and "api_endpoints" in juju_info
:
144 endpoints
= juju_info
["api_endpoints"]
147 async def get_vca_id(self
, vim_id
: str = None) -> str:
149 Get VCA ID from the database for a given VIM account ID
151 :param: vim_id: VIM account ID
156 q_filter
={"_id": vim_id
},
163 def _update(self
, collection
: str, id: str, data
: dict):
165 Update object in database
167 :param: collection: Collection name
168 :param: id: ID of the object
169 :param: data: Object data
177 def _get_juju_info(self
):
178 """Get Juju information (the default VCA) from the admin collection"""
179 return self
.db
.get_one(
181 q_filter
={"_id": "juju"},
186 class MotorStore(Store
):
187 def __init__(self
, uri
: str, loop
=None):
191 :param: uri: Connection string to connect to the database.
192 :param: loop: Asyncio Loop
194 self
._client
= AsyncIOMotorClient(uri
)
195 self
.loop
= loop
or asyncio
.get_event_loop()
196 self
._secret
_key
= None
197 self
._config
= EnvironConfig(prefixes
=["OSMLCM_", "OSMMON_"])
201 return self
._client
[DB_NAME
]
204 def _vca_collection(self
):
205 return self
._database
["vca"]
208 def _admin_collection(self
):
209 return self
._database
["admin"]
212 def _vim_accounts_collection(self
):
213 return self
._database
["vim_accounts"]
215 async def get_vca_connection_data(self
, vca_id
: str) -> ConnectionData
:
217 Get VCA connection data
219 :param: vca_id: VCA ID
221 :returns: ConnectionData with the information of the database
223 data
= await self
._vca
_collection
.find_one({"_id": vca_id
})
225 raise Exception("vca with id {} not found".format(vca_id
))
226 await self
.decrypt_fields(
228 ["secret", "cacert"],
229 schema_version
=data
["schema_version"],
232 return ConnectionData(**data
)
234 async def update_vca_endpoints(
235 self
, endpoints
: typing
.List
[str], vca_id
: str = None
240 :param: endpoints: List of endpoints to write in the database
241 :param: vca_id: VCA ID
244 data
= await self
._vca
_collection
.find_one({"_id": vca_id
})
245 data
["endpoints"] = endpoints
246 await self
._vca
_collection
.replace_one({"_id": vca_id
}, data
)
248 # The default VCA. Data for the endpoints is in a different place
249 juju_info
= await self
._get
_juju
_info
()
250 # If it doesn't, then create it
253 await self
._admin
_collection
.insert_one({"_id": "juju"})
254 except Exception as e
:
255 # Racing condition: check if another N2VC worker has created it
256 juju_info
= await self
._get
_juju
_info
()
260 await self
._admin
_collection
.replace_one(
261 {"_id": "juju"}, {"api_endpoints": endpoints
}
264 async def get_vca_endpoints(self
, vca_id
: str = None) -> typing
.List
[str]:
266 Get list if VCA endpoints
268 :param: vca_id: VCA ID
270 :returns: List of endpoints
274 endpoints
= (await self
.get_vca_connection_data(vca_id
)).endpoints
276 juju_info
= await self
._get
_juju
_info
()
277 if juju_info
and "api_endpoints" in juju_info
:
278 endpoints
= juju_info
["api_endpoints"]
281 async def get_vca_id(self
, vim_id
: str = None) -> str:
283 Get VCA ID from the database for a given VIM account ID
285 :param: vim_id: VIM account ID
289 vim_account
= await self
._vim
_accounts
_collection
.find_one({"_id": vim_id
})
290 if vim_account
and "vca" in vim_account
:
291 vca_id
= vim_account
["vca"]
294 async def _get_juju_info(self
):
295 """Get Juju information (the default VCA) from the admin collection"""
296 return await self
._admin
_collection
.find_one({"_id": "juju"})
299 async def decrypt_fields(
302 fields
: typing
.List
[str],
303 schema_version
: str = None,
309 Decrypt fields from a dictionary. Follows the same logic as in osm_common.
311 :param: item: Dictionary with the keys to be decrypted
312 :param: fields: List of keys to decrypt
313 :param: schema version: Schema version. (i.e. 1.11)
314 :param: salt: Salt for the decryption
318 async def process(_item
):
319 if isinstance(_item
, list):
322 elif isinstance(_item
, dict):
323 for key
, val
in _item
.items():
324 if isinstance(val
, str):
325 if any(re
.search(f
, key
, flags
) for f
in fields
):
326 _item
[key
] = await self
.decrypt(val
, schema_version
, salt
)
332 async def decrypt(self
, value
, schema_version
=None, salt
=None):
334 Decrypt an encrypted value
335 :param value: value to be decrypted. It is a base64 string
336 :param schema_version: used for known encryption method used. If None or '1.0' no encryption has been done.
337 If '1.1' symmetric AES encryption has been done
338 :param salt: optional salt to be used
339 :return: Plain content of value
341 await self
.get_secret_key()
342 if not self
.secret_key
or not schema_version
or schema_version
== "1.0":
345 secret_key
= self
._join
_secret
_key
(salt
)
346 encrypted_msg
= b64decode(value
)
347 cipher
= AES
.new(secret_key
)
348 decrypted_msg
= cipher
.decrypt(encrypted_msg
)
350 unpadded_private_msg
= decrypted_msg
.decode().rstrip("\0")
351 except UnicodeDecodeError:
353 "Cannot decrypt information. Are you using same COMMONKEY in all OSM components?",
356 return unpadded_private_msg
358 def _join_secret_key(self
, update_key
: typing
.Any
) -> bytes
:
360 Join key with secret key
362 :param: update_key: str or bytes with the to update
366 return self
._join
_keys
(update_key
, self
.secret_key
)
368 def _join_keys(self
, key
: typing
.Any
, secret_key
: bytes
) -> bytes
:
370 Join key with secret_key
372 :param: key: str or bytesof the key to update
373 :param: secret_key: bytes of the secret key
377 if isinstance(key
, str):
378 update_key_bytes
= key
.encode()
380 update_key_bytes
= key
381 new_secret_key
= bytearray(secret_key
) if secret_key
else bytearray(32)
382 for i
, b
in enumerate(update_key_bytes
):
383 new_secret_key
[i
% 32] ^
= b
384 return bytes(new_secret_key
)
387 def secret_key(self
):
388 return self
._secret
_key
390 async def get_secret_key(self
):
392 Get secret key using the database key and the serial key in the DB
393 The key is populated in the property self.secret_key
398 if self
.database_key
:
399 secret_key
= self
._join
_keys
(self
.database_key
, None)
400 version_data
= await self
._admin
_collection
.find_one({"_id": "version"})
401 if version_data
and version_data
.get("serial"):
402 secret_key
= self
._join
_keys
(b64decode(version_data
["serial"]), secret_key
)
403 self
._secret
_key
= secret_key
406 def database_key(self
):
407 return self
._config
["database_commonkey"]