:param salt: optional salt to be used
:return: Plain content of value
"""
- if not await self.secret_key or not schema_version or schema_version == "1.0":
+ await self.get_secret_key()
+ if not self.secret_key or not schema_version or schema_version == "1.0":
return value
else:
secret_key = self._join_secret_key(salt)
)
return unpadded_private_msg
- def _join_secret_key(self, update_key: typing.Any):
+ def _join_secret_key(self, update_key: typing.Any) -> bytes:
"""
- Join secret key
+ Join key with secret key
:param: update_key: str or bytes with the to update
+
+ :return: Joined key
+ """
+ return self._join_keys(update_key, self.secret_key)
+
+ def _join_keys(self, key: typing.Any, secret_key: bytes) -> bytes:
"""
- if isinstance(update_key, str):
- update_key_bytes = update_key.encode()
+ Join key with secret_key
+
+ :param: key: str or bytesof the key to update
+ :param: secret_key: bytes of the secret key
+
+ :return: Joined key
+ """
+ if isinstance(key, str):
+ update_key_bytes = key.encode()
else:
- update_key_bytes = update_key
- new_secret_key = (
- bytearray(self._secret_key) if self._secret_key else bytearray(32)
- )
+ update_key_bytes = key
+ new_secret_key = bytearray(secret_key) if secret_key else bytearray(32)
for i, b in enumerate(update_key_bytes):
new_secret_key[i % 32] ^= b
return bytes(new_secret_key)
@property
- async def secret_key(self):
- if self._secret_key:
- return self._secret_key
- else:
- if self.database_key:
- self._secret_key = self._join_secret_key(self.database_key)
- version_data = await self._admin_collection.find_one({"_id": "version"})
- if version_data and version_data.get("serial"):
- self._secret_key = self._join_secret_key(
- b64decode(version_data["serial"])
- )
- return self._secret_key
+ def secret_key(self):
+ return self._secret_key
+
+ async def get_secret_key(self):
+ """
+ Get secret key using the database key and the serial key in the DB
+ The key is populated in the property self.secret_key
+ """
+ if self.secret_key:
+ return
+ secret_key = None
+ if self.database_key:
+ secret_key = self._join_keys(self.database_key, None)
+ version_data = await self._admin_collection.find_one({"_id": "version"})
+ if version_data and version_data.get("serial"):
+ secret_key = self._join_keys(b64decode(version_data["serial"]), secret_key)
+ self._secret_key = secret_key
@property
def database_key(self):