+ pass
+
+ @staticmethod
+ def pad_data(value: str) -> str:
+ if not isinstance(value, str):
+ raise DbException(
+ f"Incorrect data type: type({value}), string is expected."
+ )
+ return value + ("\0" * ((16 - len(value)) % 16))
+
+ @staticmethod
+ def unpad_data(value: str) -> str:
+ if not isinstance(value, str):
+ raise DbException(
+ f"Incorrect data type: type({value}), string is expected."
+ )
+ return value.rstrip("\0")
+
+ def _encrypt_value(self, value: str, schema_version: str, salt: str):
+ """Encrypt a value.
+
+ Args:
+ value (str): value to be encrypted. It is string/unicode
+ schema_version (str): used for version control. If None or '1.0' no encryption is done.
+ If '1.1' symmetric AES encryption is done
+ salt (str): optional salt to be used. Must be str
+
+ Returns:
+ Encrypted content of value (str)
+
+ """
+ if not self.secret_key or not schema_version or schema_version == "1.0":
+ return value
+
+ else:
+ # Secret key as bytes
+ secret_key = self._join_secret_key(salt)
+ cipher = AES.new(secret_key, self.encrypt_mode)
+ # Padded data as string
+ padded_private_msg = self.pad_data(value)
+ # Padded data as bytes
+ padded_private_msg_bytes = padded_private_msg.encode(self.encoding_type)
+ # Encrypt padded data
+ encrypted_msg = cipher.encrypt(padded_private_msg_bytes)
+ # Base64 encoded encrypted data
+ encoded_encrypted_msg = b64encode(encrypted_msg)
+ # Converting to string
+ return encoded_encrypted_msg.decode(self.encoding_type)
+
+ def encrypt(self, value: str, schema_version: str = None, salt: str = None) -> str:
+ """Encrypt a value.
+
+ Args:
+ value (str): value to be encrypted. It is string/unicode
+ schema_version (str): used for version control. If None or '1.0' no encryption is done.
+ If '1.1' symmetric AES encryption is done
+ salt (str): optional salt to be used. Must be str
+
+ Returns:
+ Encrypted content of value (str)
+
+ """
+ self.get_secret_key()
+ return self._encrypt_value(value, schema_version, salt)
+
+ def _decrypt_value(self, value: str, schema_version: str, salt: str) -> str:
+ """Decrypt an encrypted value.
+ Args:
+
+ value (str): value to be decrypted. It is a base64 string
+ schema_version (str): used for known encryption method used.
+ If None or '1.0' no encryption has been done.
+ If '1.1' symmetric AES encryption has been done
+ salt (str): optional salt to be used
+
+ Returns:
+ Plain content of value (str)
+
+ """
+ if not self.secret_key or not schema_version or schema_version == "1.0":
+ return value
+
+ else:
+ secret_key = self._join_secret_key(salt)
+ # Decoding encrypted data, output bytes
+ encrypted_msg = b64decode(value)
+ cipher = AES.new(secret_key, self.encrypt_mode)
+ # Decrypted data, output bytes
+ decrypted_msg = cipher.decrypt(encrypted_msg)
+ try:
+ # Converting to string
+ private_msg = decrypted_msg.decode(self.encoding_type)
+ except UnicodeDecodeError:
+ raise DbException(
+ "Cannot decrypt information. Are you using same COMMONKEY in all OSM components?",
+ http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
+ )
+ # Unpadded data as string
+ return self.unpad_data(private_msg)
+
+ def decrypt(self, value: str, schema_version: str = None, salt: str = None) -> str:
+ """Decrypt an encrypted value.
+ Args:
+
+ value (str): value to be decrypted. It is a base64 string
+ schema_version (str): used for known encryption method used.
+ If None or '1.0' no encryption has been done.
+ If '1.1' symmetric AES encryption has been done
+ salt (str): optional salt to be used
+
+ Returns:
+ Plain content of value (str)
+
+ """
+ self.get_secret_key()
+ return self._decrypt_value(value, schema_version, salt)
+
+ def encrypt_decrypt_fields(
+ self, item, action, fields=None, flags=None, schema_version=None, salt=None
+ ):
+ if not fields:
+ return
+ self.get_secret_key()
+ actions = ["encrypt", "decrypt"]
+ if action.lower() not in actions:
+ raise DbException(
+ "Unknown action ({}): Must be one of {}".format(action, actions),
+ http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
+ )
+ method = self.encrypt if action.lower() == "encrypt" else self.decrypt
+ if flags is None:
+ flags = re.I
+
+ def process(_item):
+ if isinstance(_item, list):
+ for elem in _item:
+ process(elem)
+ elif isinstance(_item, dict):
+ for key, val in _item.items():
+ if isinstance(val, str):
+ if any(re.search(f, key, flags) for f in fields):
+ _item[key] = method(val, schema_version, salt)
+ else:
+ process(val)
+
+ process(item)