-
- # DECRYPT METHODS
- async def decrypt_fields(
- self,
- item: dict,
- fields: typing.List[str],
- schema_version: str = None,
- salt: str = None,
- ):
- """
- Decrypt fields
-
- Decrypt fields from a dictionary. Follows the same logic as in osm_common.
-
- :param: item: Dictionary with the keys to be decrypted
- :param: fields: List of keys to decrypt
- :param: schema version: Schema version. (i.e. 1.11)
- :param: salt: Salt for the decryption
- """
- flags = re.I
-
- async def process(_item):
- if isinstance(_item, list):
- for elem in _item:
- await process(elem)
- elif isinstance(_item, dict):
- for key, val in _item.items():
- if isinstance(val, str):
- if any(re.search(f, key, flags) for f in fields):
- _item[key] = await self.decrypt(val, schema_version, salt)
- else:
- await process(val)
-
- await process(item)
-
- async def decrypt(self, value, schema_version=None, salt=None):
- """
- Decrypt an encrypted value
- :param value: value to be decrypted. It is a base64 string
- :param schema_version: used for known encryption method used. If None or '1.0' no encryption has been done.
- If '1.1' symmetric AES encryption has been done
- :param salt: optional salt to be used
- :return: Plain content of value
- """
- await self.get_secret_key()
- if not self.secret_key or not schema_version or schema_version == "1.0":
- return value
- else:
- secret_key = self._join_secret_key(salt)
- encrypted_msg = b64decode(value)
- cipher = AES.new(secret_key)
- decrypted_msg = cipher.decrypt(encrypted_msg)
- try:
- unpadded_private_msg = decrypted_msg.decode().rstrip("\0")
- except UnicodeDecodeError:
- raise DbException(
- "Cannot decrypt information. Are you using same COMMONKEY in all OSM components?",
- http_code=500,
- )
- return unpadded_private_msg
-
- def _join_secret_key(self, update_key: typing.Any) -> bytes:
- """
- Join key with secret key
-
- :param: update_key: str or bytes with the to update
-
- :return: Joined key
- """
- return self._join_keys(update_key, self.secret_key)
-
- def _join_keys(self, key: typing.Any, secret_key: bytes) -> bytes:
- """
- Join key with secret_key
-
- :param: key: str or bytesof the key to update
- :param: secret_key: bytes of the secret key
-
- :return: Joined key
- """
- if isinstance(key, str):
- update_key_bytes = key.encode()
- else:
- update_key_bytes = key
- new_secret_key = bytearray(secret_key) if secret_key else bytearray(32)
- for i, b in enumerate(update_key_bytes):
- new_secret_key[i % 32] ^= b
- return bytes(new_secret_key)
-
- @property
- def secret_key(self):
- return self._secret_key
-
- async def get_secret_key(self):
- """
- Get secret key using the database key and the serial key in the DB
- The key is populated in the property self.secret_key
- """
- if self.secret_key:
- return
- secret_key = None
- if self.database_key:
- secret_key = self._join_keys(self.database_key, None)
- version_data = await self._admin_collection.find_one({"_id": "version"})
- if version_data and version_data.get("serial"):
- secret_key = self._join_keys(b64decode(version_data["serial"]), secret_key)
- self._secret_key = secret_key
-
- @property
- def database_key(self):
- return self._config["database_commonkey"]