+ new_secret_key = bytearray(self.secret_key) if self.secret_key else bytearray(32)
+ for i, b in enumerate(update_key_bytes):
+ new_secret_key[i % 32] ^= b
+ return bytes(new_secret_key)
+
+ def set_secret_key(self, new_secret_key, replace=False):
+ """
+ Updates internal secret_key used for encryption, with a byte xor
+ :param new_secret_key: string or byte array. It is recommended a 32 byte length
+ :param replace: if True, old value of internal secret_key is ignored and replaced. If false, a byte xor is used
+ :return: None
+ """
+ if replace:
+ self.secret_key = None
+ self.secret_key = self._join_secret_key(new_secret_key)
+
+ def get_secret_key(self):
+ """
+ Get the database secret key in case it is not done when "connect" is called. It can happens when database is
+ empty after an initial install. It should skip if secret is already obtained.
+ """
+ pass
+
+ def encrypt(self, value, schema_version=None, salt=None):
+ """
+ Encrypt a value
+ :param value: value to be encrypted. It is string/unicode
+ :param schema_version: used for version control. If None or '1.0' no encryption is done.
+ If '1.1' symmetric AES encryption is done
+ :param salt: optional salt to be used. Must be str
+ :return: Encrypted content of value
+ """
+ self.get_secret_key()
+ if not self.secret_key or not schema_version or schema_version == '1.0':
+ return value
+ else:
+ secret_key = self._join_secret_key(salt)
+ cipher = AES.new(secret_key)
+ padded_private_msg = value + ('\0' * ((16-len(value)) % 16))
+ encrypted_msg = cipher.encrypt(padded_private_msg)
+ encoded_encrypted_msg = b64encode(encrypted_msg)
+ return encoded_encrypted_msg.decode("ascii")
+
+ def decrypt(self, value, schema_version=None, salt=None):
+ """
+ Decrypt an encrypted value
+ :param value: value to be decrypted. It is a base64 string
+ :param schema_version: used for known encryption method used. If None or '1.0' no encryption has been done.
+ If '1.1' symmetric AES encryption has been done
+ :param salt: optional salt to be used
+ :return: Plain content of value
+ """
+ self.get_secret_key()
+ if not self.secret_key or not schema_version or schema_version == '1.0':
+ return value
+ else:
+ secret_key = self._join_secret_key(salt)
+ encrypted_msg = b64decode(value)
+ cipher = AES.new(secret_key)
+ decrypted_msg = cipher.decrypt(encrypted_msg)
+ try:
+ unpadded_private_msg = decrypted_msg.decode().rstrip('\0')
+ except UnicodeDecodeError:
+ raise DbException("Cannot decrypt information. Are you using same COMMONKEY in all OSM components?",
+ http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+ return unpadded_private_msg
+
+ def encrypt_decrypt_fields(self, item, action, fields=None, flags=None, schema_version=None, salt=None):
+ if not fields:
+ return
+ self.get_secret_key()
+ actions = ['encrypt', 'decrypt']
+ if action.lower() not in actions:
+ raise DbException("Unknown action ({}): Must be one of {}".format(action, actions),
+ http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+ method = self.encrypt if action.lower() == 'encrypt' else self.decrypt
+ if flags is None:
+ flags = re.I
+
+ def process(_item):
+ if isinstance(_item, list):
+ for elem in _item:
+ process(elem)
+ elif isinstance(_item, dict):
+ for key, val in _item.items():
+ if isinstance(val, str):
+ if any(re.search(f, key, flags) for f in fields):
+ _item[key] = method(val, schema_version, salt)
+ else:
+ process(val)
+ process(item)
+
+
+def deep_update_rfc7396(dict_to_change, dict_reference, key_list=None):