blob: e582c7e8f08699c93ba5a1ffe36e2657171329ff [file] [log] [blame]
# Copyright 2018 Whitestack, LLC
# Copyright 2018 Telefonica S.A.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact: esousa@whitestack.com or alfonso.tiernosepulveda@telefonica.com
##
import asyncio
import copy
from copy import deepcopy
import http
from http import HTTPStatus
import logging
from os import urandom
import unittest
from unittest.mock import MagicMock, Mock, patch
from Crypto.Cipher import AES
from osm_common.dbbase import DbBase, DbException, deep_update, Encryption
import pytest
# Variables used in TestBaseEncryption and TestAsyncEncryption
salt = "1afd5d1a-4a7e-4d9c-8c65-251290183106"
value = "private key txt"
padded_value = b"private key txt\0"
padded_encoded_value = b"private key txt\x00"
encoding_type = "ascii"
encyrpt_mode = AES.MODE_ECB
secret_key = b"\xeev\xc2\xb8\xb2#;Ek\xd0\xb5['\x04\xed\x1f\xb9?\xc5Ig\x80\xd5\x8d\x8aT\xd7\xf8Q\xe2u!"
encyrpted_value = "ZW5jcnlwdGVkIGRhdGE="
encyrpted_bytes = b"ZW5jcnlwdGVkIGRhdGE="
data_to_b4_encode = b"encrypted data"
b64_decoded = b"decrypted data"
schema_version = "1.1"
joined_key = b"\x9d\x17\xaf\xc8\xdeF\x1b.\x0e\xa9\xb5['\x04\xed\x1f\xb9?\xc5Ig\x80\xd5\x8d\x8aT\xd7\xf8Q\xe2u!"
serial_bytes = b"\xf8\x96Z\x1c:}\xb5\xdf\x94\x8d\x0f\x807\xe6)\x8f\xf5!\xee}\xc2\xfa\xb3\t\xb9\xe4\r7\x19\x08\xa5b"
base64_decoded_serial = b"g\xbe\xdb"
decrypted_val1 = "BiV9YZEuSRAudqvz7Gs+bg=="
decrypted_val2 = "q4LwnFdoryzbZJM5mCAnpA=="
item = {
"secret": "mysecret",
"cacert": "mycacert",
"path": "/var",
"ip": "192.168.12.23",
}
def exception_message(message):
return "database exception " + message
@pytest.fixture
def db_base():
return DbBase()
def test_constructor():
db_base = DbBase()
assert db_base is not None
assert isinstance(db_base, DbBase)
def test_db_connect(db_base):
with pytest.raises(DbException) as excinfo:
db_base.db_connect(None)
assert str(excinfo.value).startswith(
exception_message("Method 'db_connect' not implemented")
)
def test_db_disconnect(db_base):
db_base.db_disconnect()
def test_get_list(db_base):
with pytest.raises(DbException) as excinfo:
db_base.get_list(None, None)
assert str(excinfo.value).startswith(
exception_message("Method 'get_list' not implemented")
)
assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
def test_get_one(db_base):
with pytest.raises(DbException) as excinfo:
db_base.get_one(None, None, None, None)
assert str(excinfo.value).startswith(
exception_message("Method 'get_one' not implemented")
)
assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
def test_create(db_base):
with pytest.raises(DbException) as excinfo:
db_base.create(None, None)
assert str(excinfo.value).startswith(
exception_message("Method 'create' not implemented")
)
assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
def test_create_list(db_base):
with pytest.raises(DbException) as excinfo:
db_base.create_list(None, None)
assert str(excinfo.value).startswith(
exception_message("Method 'create_list' not implemented")
)
assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
def test_del_list(db_base):
with pytest.raises(DbException) as excinfo:
db_base.del_list(None, None)
assert str(excinfo.value).startswith(
exception_message("Method 'del_list' not implemented")
)
assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
def test_del_one(db_base):
with pytest.raises(DbException) as excinfo:
db_base.del_one(None, None, None)
assert str(excinfo.value).startswith(
exception_message("Method 'del_one' not implemented")
)
assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
class TestEncryption(unittest.TestCase):
def setUp(self):
master_key = "Setting a long master key with numbers 123 and capitals AGHBNHD and symbols %&8)!'"
db_base1 = DbBase()
db_base2 = DbBase()
db_base3 = DbBase()
# set self.secret_key obtained when connect
db_base1.set_secret_key(master_key, replace=True)
db_base1.set_secret_key(urandom(32))
db_base2.set_secret_key(None, replace=True)
db_base2.set_secret_key(urandom(30))
db_base3.set_secret_key(master_key)
self.db_bases = [db_base1, db_base2, db_base3]
def test_encrypt_decrypt(self):
TEST = (
("plain text 1 ! ", None),
("plain text 2 with salt ! ", "1afd5d1a-4a7e-4d9c-8c65-251290183106"),
)
for db_base in self.db_bases:
for value, salt in TEST:
# no encryption
encrypted = db_base.encrypt(value, schema_version="1.0", salt=salt)
self.assertEqual(
encrypted, value, "value '{}' has been encrypted".format(value)
)
decrypted = db_base.decrypt(encrypted, schema_version="1.0", salt=salt)
self.assertEqual(
decrypted, value, "value '{}' has been decrypted".format(value)
)
# encrypt/decrypt
encrypted = db_base.encrypt(value, schema_version="1.1", salt=salt)
self.assertNotEqual(
encrypted, value, "value '{}' has not been encrypted".format(value)
)
self.assertIsInstance(encrypted, str, "Encrypted is not ascii text")
decrypted = db_base.decrypt(encrypted, schema_version="1.1", salt=salt)
self.assertEqual(
decrypted, value, "value is not equal after encryption/decryption"
)
def test_encrypt_decrypt_salt(self):
value = "value to be encrypted!"
encrypted = []
for db_base in self.db_bases:
for salt in (None, "salt 1", "1afd5d1a-4a7e-4d9c-8c65-251290183106"):
# encrypt/decrypt
encrypted.append(
db_base.encrypt(value, schema_version="1.1", salt=salt)
)
self.assertNotEqual(
encrypted[-1],
value,
"value '{}' has not been encrypted".format(value),
)
self.assertIsInstance(encrypted[-1], str, "Encrypted is not ascii text")
decrypted = db_base.decrypt(
encrypted[-1], schema_version="1.1", salt=salt
)
self.assertEqual(
decrypted, value, "value is not equal after encryption/decryption"
)
for i in range(0, len(encrypted)):
for j in range(i + 1, len(encrypted)):
self.assertNotEqual(
encrypted[i],
encrypted[j],
"encryption with different salt must contain different result",
)
# decrypt with a different master key
try:
decrypted = self.db_bases[-1].decrypt(
encrypted[0], schema_version="1.1", salt=None
)
self.assertNotEqual(
encrypted[0],
decrypted,
"Decryption with different KEY must generate different result",
)
except DbException as e:
self.assertEqual(
e.http_code,
HTTPStatus.INTERNAL_SERVER_ERROR,
"Decryption with different KEY does not provide expected http_code",
)
class AsyncMock(MagicMock):
async def __call__(self, *args, **kwargs):
args = deepcopy(args)
kwargs = deepcopy(kwargs)
return super(AsyncMock, self).__call__(*args, **kwargs)
class CopyingMock(MagicMock):
def __call__(self, *args, **kwargs):
args = deepcopy(args)
kwargs = deepcopy(kwargs)
return super(CopyingMock, self).__call__(*args, **kwargs)
def check_if_assert_not_called(mocks: list):
for mocking in mocks:
mocking.assert_not_called()
class TestBaseEncryption(unittest.TestCase):
@patch("logging.getLogger", autospec=True)
def setUp(self, mock_logger):
mock_logger = logging.getLogger()
mock_logger.disabled = True
self.db_base = DbBase()
self.mock_cipher = CopyingMock()
self.db_base.encoding_type = encoding_type
self.db_base.encrypt_mode = encyrpt_mode
self.db_base.secret_key = secret_key
self.mock_padded_msg = CopyingMock()
def test_pad_data_len_not_multiplication_of_16(self):
data = "hello word hello hello word hello word"
data_len = len(data)
expected_len = 48
padded = self.db_base.pad_data(data)
self.assertEqual(len(padded), expected_len)
self.assertTrue("\0" * (expected_len - data_len) in padded)
def test_pad_data_len_multiplication_of_16(self):
data = "hello word!!!!!!"
padded = self.db_base.pad_data(data)
self.assertEqual(padded, data)
self.assertFalse("\0" in padded)
def test_pad_data_empty_string(self):
data = ""
expected_len = 0
padded = self.db_base.pad_data(data)
self.assertEqual(len(padded), expected_len)
self.assertFalse("\0" in padded)
def test_pad_data_not_string(self):
data = None
with self.assertRaises(Exception) as err:
self.db_base.pad_data(data)
self.assertEqual(
str(err.exception),
"database exception Incorrect data type: type(None), string is expected.",
)
def test_unpad_data_null_char_at_right(self):
null_padded_data = "hell0word\0\0"
expected_length = len(null_padded_data) - 2
unpadded = self.db_base.unpad_data(null_padded_data)
self.assertEqual(len(unpadded), expected_length)
self.assertFalse("\0" in unpadded)
self.assertTrue("0" in unpadded)
def test_unpad_data_null_char_is_not_rightest(self):
null_padded_data = "hell0word\r\t\0\n"
expected_length = len(null_padded_data)
unpadded = self.db_base.unpad_data(null_padded_data)
self.assertEqual(len(unpadded), expected_length)
self.assertTrue("\0" in unpadded)
def test_unpad_data_with_spaces_at_right(self):
null_padded_data = " hell0word\0 "
expected_length = len(null_padded_data)
unpadded = self.db_base.unpad_data(null_padded_data)
self.assertEqual(len(unpadded), expected_length)
self.assertTrue("\0" in unpadded)
def test_unpad_data_empty_string(self):
data = ""
unpadded = self.db_base.unpad_data(data)
self.assertEqual(unpadded, "")
self.assertFalse("\0" in unpadded)
def test_unpad_data_not_string(self):
data = None
with self.assertRaises(Exception) as err:
self.db_base.unpad_data(data)
self.assertEqual(
str(err.exception),
"database exception Incorrect data type: type(None), string is expected.",
)
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_schema_version_1_0_none_secret_key_none_salt(
self, mock_pad_data, mock_join_secret_key
):
"""schema_version 1.0, secret_key is None and salt is None."""
schema_version = "1.0"
salt = None
self.db_base.secret_key = None
result = self.db_base._encrypt_value(value, schema_version, salt)
self.assertEqual(result, value)
check_if_assert_not_called([mock_pad_data, mock_join_secret_key])
@patch("osm_common.dbbase.b64encode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_schema_version_1_1_with_secret_key_exists_with_salt(
self,
mock_pad_data,
mock_join_secret_key,
mock_aes,
mock_b64_encode,
):
"""schema_version 1.1, secret_key exists, salt exists."""
mock_aes.new.return_value = self.mock_cipher
self.mock_cipher.encrypt.return_value = data_to_b4_encode
self.mock_padded_msg.return_value = padded_value
mock_pad_data.return_value = self.mock_padded_msg
self.mock_padded_msg.encode.return_value = padded_encoded_value
mock_b64_encode.return_value = encyrpted_bytes
result = self.db_base._encrypt_value(value, schema_version, salt)
self.assertTrue(isinstance(result, str))
self.assertEqual(result, encyrpted_value)
mock_join_secret_key.assert_called_once_with(salt)
_call_mock_aes_new = mock_aes.new.call_args_list[0].args
self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
mock_pad_data.assert_called_once_with(value)
mock_b64_encode.assert_called_once_with(data_to_b4_encode)
self.mock_cipher.encrypt.assert_called_once_with(padded_encoded_value)
self.mock_padded_msg.encode.assert_called_with(encoding_type)
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_schema_version_1_0_secret_key_not_exists(
self, mock_pad_data, mock_join_secret_key
):
"""schema_version 1.0, secret_key is None, salt exists."""
schema_version = "1.0"
self.db_base.secret_key = None
result = self.db_base._encrypt_value(value, schema_version, salt)
self.assertEqual(result, value)
check_if_assert_not_called([mock_pad_data, mock_join_secret_key])
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_schema_version_1_1_secret_key_not_exists(
self, mock_pad_data, mock_join_secret_key
):
"""schema_version 1.1, secret_key is None, salt exists."""
self.db_base.secret_key = None
result = self.db_base._encrypt_value(value, schema_version, salt)
self.assertEqual(result, value)
check_if_assert_not_called([mock_pad_data, mock_join_secret_key])
@patch("osm_common.dbbase.b64encode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_schema_version_1_1_secret_key_exists_without_salt(
self,
mock_pad_data,
mock_join_secret_key,
mock_aes,
mock_b64_encode,
):
"""schema_version 1.1, secret_key exists, salt is None."""
salt = None
mock_aes.new.return_value = self.mock_cipher
self.mock_cipher.encrypt.return_value = data_to_b4_encode
self.mock_padded_msg.return_value = padded_value
mock_pad_data.return_value = self.mock_padded_msg
self.mock_padded_msg.encode.return_value = padded_encoded_value
mock_b64_encode.return_value = encyrpted_bytes
result = self.db_base._encrypt_value(value, schema_version, salt)
self.assertEqual(result, encyrpted_value)
mock_join_secret_key.assert_called_once_with(salt)
_call_mock_aes_new = mock_aes.new.call_args_list[0].args
self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
mock_pad_data.assert_called_once_with(value)
mock_b64_encode.assert_called_once_with(data_to_b4_encode)
self.mock_cipher.encrypt.assert_called_once_with(padded_encoded_value)
self.mock_padded_msg.encode.assert_called_with(encoding_type)
@patch("osm_common.dbbase.b64encode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_invalid_encrpt_mode(
self,
mock_pad_data,
mock_join_secret_key,
mock_aes,
mock_b64_encode,
):
"""encrypt_mode is invalid."""
mock_aes.new.side_effect = Exception("Invalid ciphering mode.")
self.db_base.encrypt_mode = "AES.MODE_XXX"
with self.assertRaises(Exception) as err:
self.db_base._encrypt_value(value, schema_version, salt)
self.assertEqual(str(err.exception), "Invalid ciphering mode.")
mock_join_secret_key.assert_called_once_with(salt)
_call_mock_aes_new = mock_aes.new.call_args_list[0].args
self.assertEqual(_call_mock_aes_new[1], "AES.MODE_XXX")
check_if_assert_not_called([mock_pad_data, mock_b64_encode])
@patch("osm_common.dbbase.b64encode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_schema_version_1_1_secret_key_exists_value_none(
self,
mock_pad_data,
mock_join_secret_key,
mock_aes,
mock_b64_encode,
):
"""schema_version 1.1, secret_key exists, value is None."""
value = None
mock_aes.new.return_value = self.mock_cipher
mock_pad_data.side_effect = DbException(
"Incorrect data type: type(None), string is expected."
)
with self.assertRaises(Exception) as err:
self.db_base._encrypt_value(value, schema_version, salt)
self.assertEqual(
str(err.exception),
"database exception Incorrect data type: type(None), string is expected.",
)
mock_join_secret_key.assert_called_once_with(salt)
_call_mock_aes_new = mock_aes.new.call_args_list[0].args
self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
mock_pad_data.assert_called_once_with(value)
check_if_assert_not_called(
[mock_b64_encode, self.mock_cipher.encrypt, mock_b64_encode]
)
@patch("osm_common.dbbase.b64encode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_join_secret_key_raises(
self,
mock_pad_data,
mock_join_secret_key,
mock_aes,
mock_b64_encode,
):
"""Method join_secret_key raises DbException."""
salt = b"3434o34-3wewrwr-222424-2242dwew"
mock_join_secret_key.side_effect = DbException("Unexpected type")
mock_aes.new.return_value = self.mock_cipher
with self.assertRaises(Exception) as err:
self.db_base._encrypt_value(value, schema_version, salt)
self.assertEqual(str(err.exception), "database exception Unexpected type")
check_if_assert_not_called(
[mock_pad_data, mock_aes.new, mock_b64_encode, self.mock_cipher.encrypt]
)
mock_join_secret_key.assert_called_once_with(salt)
@patch("osm_common.dbbase.b64encode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_schema_version_1_1_secret_key_exists_b64_encode_raises(
self,
mock_pad_data,
mock_join_secret_key,
mock_aes,
mock_b64_encode,
):
"""schema_version 1.1, secret_key exists, b64encode raises TypeError."""
mock_aes.new.return_value = self.mock_cipher
self.mock_cipher.encrypt.return_value = "encrypted data"
self.mock_padded_msg.return_value = padded_value
mock_pad_data.return_value = self.mock_padded_msg
self.mock_padded_msg.encode.return_value = padded_encoded_value
mock_b64_encode.side_effect = TypeError(
"A bytes-like object is required, not 'str'"
)
with self.assertRaises(Exception) as error:
self.db_base._encrypt_value(value, schema_version, salt)
self.assertEqual(
str(error.exception), "A bytes-like object is required, not 'str'"
)
mock_join_secret_key.assert_called_once_with(salt)
_call_mock_aes_new = mock_aes.new.call_args_list[0].args
self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
mock_pad_data.assert_called_once_with(value)
self.mock_cipher.encrypt.assert_called_once_with(padded_encoded_value)
self.mock_padded_msg.encode.assert_called_with(encoding_type)
mock_b64_encode.assert_called_once_with("encrypted data")
@patch("osm_common.dbbase.b64encode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_cipher_encrypt_raises(
self,
mock_pad_data,
mock_join_secret_key,
mock_aes,
mock_b64_encode,
):
"""AES encrypt method raises Exception."""
mock_aes.new.return_value = self.mock_cipher
self.mock_cipher.encrypt.side_effect = Exception("Invalid data type.")
self.mock_padded_msg.return_value = padded_value
mock_pad_data.return_value = self.mock_padded_msg
self.mock_padded_msg.encode.return_value = padded_encoded_value
with self.assertRaises(Exception) as error:
self.db_base._encrypt_value(value, schema_version, salt)
self.assertEqual(str(error.exception), "Invalid data type.")
mock_join_secret_key.assert_called_once_with(salt)
_call_mock_aes_new = mock_aes.new.call_args_list[0].args
self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
mock_pad_data.assert_called_once_with(value)
self.mock_cipher.encrypt.assert_called_once_with(padded_encoded_value)
self.mock_padded_msg.encode.assert_called_with(encoding_type)
mock_b64_encode.assert_not_called()
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_encrypt_value")
def test_encrypt_without_schema_version_without_salt(
self, mock_encrypt_value, mock_get_secret_key
):
"""schema and salt is None."""
mock_encrypt_value.return_value = encyrpted_value
result = self.db_base.encrypt(value)
mock_encrypt_value.assert_called_once_with(value, None, None)
mock_get_secret_key.assert_called_once()
self.assertEqual(result, encyrpted_value)
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_encrypt_value")
def test_encrypt_with_schema_version_with_salt(
self, mock_encrypt_value, mock_get_secret_key
):
"""schema version exists, salt is None."""
mock_encrypt_value.return_value = encyrpted_value
result = self.db_base.encrypt(value, schema_version, salt)
mock_encrypt_value.assert_called_once_with(value, schema_version, salt)
mock_get_secret_key.assert_called_once()
self.assertEqual(result, encyrpted_value)
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_encrypt_value")
def test_encrypt_get_secret_key_raises(
self, mock_encrypt_value, mock_get_secret_key
):
"""get_secret_key method raises DbException."""
mock_get_secret_key.side_effect = DbException("KeyError")
with self.assertRaises(Exception) as error:
self.db_base.encrypt(value)
self.assertEqual(str(error.exception), "database exception KeyError")
mock_encrypt_value.assert_not_called()
mock_get_secret_key.assert_called_once()
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_encrypt_value")
def test_encrypt_encrypt_raises(self, mock_encrypt_value, mock_get_secret_key):
"""_encrypt method raises DbException."""
mock_encrypt_value.side_effect = DbException(
"Incorrect data type: type(None), string is expected."
)
with self.assertRaises(Exception) as error:
self.db_base.encrypt(value, schema_version, salt)
self.assertEqual(
str(error.exception),
"database exception Incorrect data type: type(None), string is expected.",
)
mock_encrypt_value.assert_called_once_with(value, schema_version, salt)
mock_get_secret_key.assert_called_once()
@patch("osm_common.dbbase.b64decode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "unpad_data")
def test__decrypt_value_schema_version_1_1_secret_key_exists_without_salt(
self,
mock_unpad_data,
mock_join_secret_key,
mock_aes,
mock_b64_decode,
):
"""schema_version 1.1, secret_key exists, salt is None."""
salt = None
mock_aes.new.return_value = self.mock_cipher
self.mock_cipher.decrypt.return_value = padded_encoded_value
mock_b64_decode.return_value = b64_decoded
mock_unpad_data.return_value = value
result = self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
self.assertEqual(result, value)
mock_join_secret_key.assert_called_once_with(salt)
_call_mock_aes_new = mock_aes.new.call_args_list[0].args
self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
mock_unpad_data.assert_called_once_with("private key txt\0")
mock_b64_decode.assert_called_once_with(encyrpted_value)
self.mock_cipher.decrypt.assert_called_once_with(b64_decoded)
@patch("osm_common.dbbase.b64decode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "unpad_data")
def test__decrypt_value_schema_version_1_1_secret_key_exists_with_salt(
self,
mock_unpad_data,
mock_join_secret_key,
mock_aes,
mock_b64_decode,
):
"""schema_version 1.1, secret_key exists, salt is None."""
mock_aes.new.return_value = self.mock_cipher
self.mock_cipher.decrypt.return_value = padded_encoded_value
mock_b64_decode.return_value = b64_decoded
mock_unpad_data.return_value = value
result = self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
self.assertEqual(result, value)
mock_join_secret_key.assert_called_once_with(salt)
_call_mock_aes_new = mock_aes.new.call_args_list[0].args
self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
mock_unpad_data.assert_called_once_with("private key txt\0")
mock_b64_decode.assert_called_once_with(encyrpted_value)
self.mock_cipher.decrypt.assert_called_once_with(b64_decoded)
@patch("osm_common.dbbase.b64decode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "unpad_data")
def test__decrypt_value_schema_version_1_1_without_secret_key(
self,
mock_unpad_data,
mock_join_secret_key,
mock_aes,
mock_b64_decode,
):
"""schema_version 1.1, secret_key is None, salt exists."""
self.db_base.secret_key = None
result = self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
self.assertEqual(result, encyrpted_value)
check_if_assert_not_called(
[
mock_join_secret_key,
mock_aes.new,
mock_unpad_data,
mock_b64_decode,
self.mock_cipher.decrypt,
]
)
@patch("osm_common.dbbase.b64decode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "unpad_data")
def test__decrypt_value_schema_version_1_0_with_secret_key(
self,
mock_unpad_data,
mock_join_secret_key,
mock_aes,
mock_b64_decode,
):
"""schema_version 1.0, secret_key exists, salt exists."""
schema_version = "1.0"
result = self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
self.assertEqual(result, encyrpted_value)
check_if_assert_not_called(
[
mock_join_secret_key,
mock_aes.new,
mock_unpad_data,
mock_b64_decode,
self.mock_cipher.decrypt,
]
)
@patch("osm_common.dbbase.b64decode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "unpad_data")
def test__decrypt_value_join_secret_key_raises(
self,
mock_unpad_data,
mock_join_secret_key,
mock_aes,
mock_b64_decode,
):
"""_join_secret_key raises TypeError."""
salt = object()
mock_join_secret_key.side_effect = TypeError("'type' object is not iterable")
with self.assertRaises(Exception) as error:
self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
self.assertEqual(str(error.exception), "'type' object is not iterable")
mock_join_secret_key.assert_called_once_with(salt)
check_if_assert_not_called(
[mock_aes.new, mock_unpad_data, mock_b64_decode, self.mock_cipher.decrypt]
)
@patch("osm_common.dbbase.b64decode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "unpad_data")
def test__decrypt_value_b64decode_raises(
self,
mock_unpad_data,
mock_join_secret_key,
mock_aes,
mock_b64_decode,
):
"""b64decode raises TypeError."""
mock_b64_decode.side_effect = TypeError(
"A str-like object is required, not 'bytes'"
)
with self.assertRaises(Exception) as error:
self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
self.assertEqual(
str(error.exception), "A str-like object is required, not 'bytes'"
)
mock_b64_decode.assert_called_once_with(encyrpted_value)
mock_join_secret_key.assert_called_once_with(salt)
check_if_assert_not_called(
[mock_aes.new, self.mock_cipher.decrypt, mock_unpad_data]
)
@patch("osm_common.dbbase.b64decode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "unpad_data")
def test__decrypt_value_invalid_encrypt_mode(
self,
mock_unpad_data,
mock_join_secret_key,
mock_aes,
mock_b64_decode,
):
"""Invalid AES encrypt mode."""
mock_aes.new.side_effect = Exception("Invalid ciphering mode.")
self.db_base.encrypt_mode = "AES.MODE_XXX"
mock_b64_decode.return_value = b64_decoded
with self.assertRaises(Exception) as error:
self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
self.assertEqual(str(error.exception), "Invalid ciphering mode.")
mock_join_secret_key.assert_called_once_with(salt)
_call_mock_aes_new = mock_aes.new.call_args_list[0].args
self.assertEqual(_call_mock_aes_new[1], "AES.MODE_XXX")
mock_b64_decode.assert_called_once_with(encyrpted_value)
check_if_assert_not_called([mock_unpad_data, self.mock_cipher.decrypt])
@patch("osm_common.dbbase.b64decode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "unpad_data")
def test__decrypt_value_cipher_decrypt_raises(
self,
mock_unpad_data,
mock_join_secret_key,
mock_aes,
mock_b64_decode,
):
"""AES decrypt raises Exception."""
mock_b64_decode.return_value = b64_decoded
mock_aes.new.return_value = self.mock_cipher
self.mock_cipher.decrypt.side_effect = Exception("Invalid data type.")
with self.assertRaises(Exception) as error:
self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
self.assertEqual(str(error.exception), "Invalid data type.")
mock_join_secret_key.assert_called_once_with(salt)
_call_mock_aes_new = mock_aes.new.call_args_list[0].args
self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
mock_b64_decode.assert_called_once_with(encyrpted_value)
self.mock_cipher.decrypt.assert_called_once_with(b64_decoded)
mock_unpad_data.assert_not_called()
@patch("osm_common.dbbase.b64decode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "unpad_data")
def test__decrypt_value_decode_raises(
self,
mock_unpad_data,
mock_join_secret_key,
mock_aes,
mock_b64_decode,
):
"""Decode raises UnicodeDecodeError."""
mock_aes.new.return_value = self.mock_cipher
self.mock_cipher.decrypt.return_value = b"\xd0\x000091"
mock_b64_decode.return_value = b64_decoded
mock_unpad_data.return_value = value
with self.assertRaises(Exception) as error:
self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
self.assertEqual(
str(error.exception),
"database exception Cannot decrypt information. Are you using same COMMONKEY in all OSM components?",
)
self.assertEqual(type(error.exception), DbException)
mock_join_secret_key.assert_called_once_with(salt)
_call_mock_aes_new = mock_aes.new.call_args_list[0].args
self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
mock_b64_decode.assert_called_once_with(encyrpted_value)
self.mock_cipher.decrypt.assert_called_once_with(b64_decoded)
mock_unpad_data.assert_not_called()
@patch("osm_common.dbbase.b64decode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "unpad_data")
def test__decrypt_value_unpad_data_raises(
self,
mock_unpad_data,
mock_join_secret_key,
mock_aes,
mock_b64_decode,
):
"""Method unpad_data raises error."""
mock_decrypted_message = MagicMock()
mock_decrypted_message.decode.return_value = None
mock_aes.new.return_value = self.mock_cipher
self.mock_cipher.decrypt.return_value = mock_decrypted_message
mock_unpad_data.side_effect = DbException(
"Incorrect data type: type(None), string is expected."
)
mock_b64_decode.return_value = b64_decoded
with self.assertRaises(Exception) as error:
self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
self.assertEqual(
str(error.exception),
"database exception Incorrect data type: type(None), string is expected.",
)
self.assertEqual(type(error.exception), DbException)
mock_join_secret_key.assert_called_once_with(salt)
_call_mock_aes_new = mock_aes.new.call_args_list[0].args
self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
mock_b64_decode.assert_called_once_with(encyrpted_value)
self.mock_cipher.decrypt.assert_called_once_with(b64_decoded)
mock_decrypted_message.decode.assert_called_once_with(
self.db_base.encoding_type
)
mock_unpad_data.assert_called_once_with(None)
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_decrypt_value")
def test_decrypt_without_schema_version_without_salt(
self, mock_decrypt_value, mock_get_secret_key
):
"""schema_version is None, salt is None."""
mock_decrypt_value.return_value = encyrpted_value
result = self.db_base.decrypt(value)
mock_decrypt_value.assert_called_once_with(value, None, None)
mock_get_secret_key.assert_called_once()
self.assertEqual(result, encyrpted_value)
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_decrypt_value")
def test_decrypt_with_schema_version_with_salt(
self, mock_decrypt_value, mock_get_secret_key
):
"""schema_version and salt exist."""
mock_decrypt_value.return_value = encyrpted_value
result = self.db_base.decrypt(value, schema_version, salt)
mock_decrypt_value.assert_called_once_with(value, schema_version, salt)
mock_get_secret_key.assert_called_once()
self.assertEqual(result, encyrpted_value)
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_decrypt_value")
def test_decrypt_get_secret_key_raises(
self, mock_decrypt_value, mock_get_secret_key
):
"""Method get_secret_key raises KeyError."""
mock_get_secret_key.side_effect = DbException("KeyError")
with self.assertRaises(Exception) as error:
self.db_base.decrypt(value)
self.assertEqual(str(error.exception), "database exception KeyError")
mock_decrypt_value.assert_not_called()
mock_get_secret_key.assert_called_once()
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_decrypt_value")
def test_decrypt_decrypt_value_raises(
self, mock_decrypt_value, mock_get_secret_key
):
"""Method _decrypt raises error."""
mock_decrypt_value.side_effect = DbException(
"Incorrect data type: type(None), string is expected."
)
with self.assertRaises(Exception) as error:
self.db_base.decrypt(value, schema_version, salt)
self.assertEqual(
str(error.exception),
"database exception Incorrect data type: type(None), string is expected.",
)
mock_decrypt_value.assert_called_once_with(value, schema_version, salt)
mock_get_secret_key.assert_called_once()
def test_encrypt_decrypt_with_schema_version_1_1_with_salt(self):
"""Encrypt and decrypt with schema version 1.1, salt exists."""
encrypted_msg = self.db_base.encrypt(value, schema_version, salt)
decrypted_msg = self.db_base.decrypt(encrypted_msg, schema_version, salt)
self.assertEqual(value, decrypted_msg)
def test_encrypt_decrypt_with_schema_version_1_0_with_salt(self):
"""Encrypt and decrypt with schema version 1.0, salt exists."""
schema_version = "1.0"
encrypted_msg = self.db_base.encrypt(value, schema_version, salt)
decrypted_msg = self.db_base.decrypt(encrypted_msg, schema_version, salt)
self.assertEqual(value, decrypted_msg)
def test_encrypt_decrypt_with_schema_version_1_1_without_salt(self):
"""Encrypt and decrypt with schema version 1.1 and without salt."""
salt = None
encrypted_msg = self.db_base.encrypt(value, schema_version, salt)
decrypted_msg = self.db_base.decrypt(encrypted_msg, schema_version, salt)
self.assertEqual(value, decrypted_msg)
class TestAsyncEncryption(unittest.TestCase):
@patch("logging.getLogger", autospec=True)
def setUp(self, mock_logger):
mock_logger = logging.getLogger()
mock_logger.disabled = True
self.encryption = Encryption(uri="uri", config={})
self.encryption.encoding_type = encoding_type
self.encryption.encrypt_mode = encyrpt_mode
self.encryption._secret_key = secret_key
self.admin_collection = Mock()
self.admin_collection.find_one = AsyncMock()
self.encryption._client = {
"osm": {
"admin": self.admin_collection,
}
}
@patch.object(Encryption, "decrypt", new_callable=AsyncMock)
def test_decrypt_fields_with_item_with_fields(self, mock_decrypt):
"""item and fields exist."""
mock_decrypt.side_effect = [decrypted_val1, decrypted_val2]
input_item = copy.deepcopy(item)
expected_item = {
"secret": decrypted_val1,
"cacert": decrypted_val2,
"path": "/var",
"ip": "192.168.12.23",
}
fields = ["secret", "cacert"]
asyncio.run(
self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
)
self.assertEqual(input_item, expected_item)
_call_mock_decrypt = mock_decrypt.call_args_list
self.assertEqual(_call_mock_decrypt[0].args, ("mysecret", "1.1", salt))
self.assertEqual(_call_mock_decrypt[1].args, ("mycacert", "1.1", salt))
@patch.object(Encryption, "decrypt", new_callable=AsyncMock)
def test_decrypt_fields_empty_item_with_fields(self, mock_decrypt):
"""item is empty and fields exists."""
input_item = {}
fields = ["secret", "cacert"]
asyncio.run(
self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
)
self.assertEqual(input_item, {})
mock_decrypt.assert_not_called()
@patch.object(Encryption, "decrypt", new_callable=AsyncMock)
def test_decrypt_fields_with_item_without_fields(self, mock_decrypt):
"""item exists and fields is empty."""
input_item = copy.deepcopy(item)
fields = []
asyncio.run(
self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
)
self.assertEqual(input_item, item)
mock_decrypt.assert_not_called()
@patch.object(Encryption, "decrypt", new_callable=AsyncMock)
def test_decrypt_fields_with_item_with_single_field(self, mock_decrypt):
"""item exists and field has single value."""
mock_decrypt.return_value = decrypted_val1
fields = ["secret"]
input_item = copy.deepcopy(item)
expected_item = {
"secret": decrypted_val1,
"cacert": "mycacert",
"path": "/var",
"ip": "192.168.12.23",
}
asyncio.run(
self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
)
self.assertEqual(input_item, expected_item)
_call_mock_decrypt = mock_decrypt.call_args_list
self.assertEqual(_call_mock_decrypt[0].args, ("mysecret", "1.1", salt))
@patch.object(Encryption, "decrypt", new_callable=AsyncMock)
def test_decrypt_fields_with_item_with_field_none_salt_1_0_schema_version(
self, mock_decrypt
):
"""item exists and field has single value, salt is None, schema version is 1.0."""
schema_version = "1.0"
salt = None
mock_decrypt.return_value = "mysecret"
input_item = copy.deepcopy(item)
fields = ["secret"]
asyncio.run(
self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
)
self.assertEqual(input_item, item)
_call_mock_decrypt = mock_decrypt.call_args_list
self.assertEqual(_call_mock_decrypt[0].args, ("mysecret", "1.0", None))
@patch.object(Encryption, "decrypt", new_callable=AsyncMock)
def test_decrypt_fields_decrypt_raises(self, mock_decrypt):
"""Method decrypt raises error."""
mock_decrypt.side_effect = DbException(
"Incorrect data type: type(None), string is expected."
)
fields = ["secret"]
input_item = copy.deepcopy(item)
with self.assertRaises(Exception) as error:
asyncio.run(
self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
)
self.assertEqual(
str(error.exception),
"database exception Incorrect data type: type(None), string is expected.",
)
self.assertEqual(input_item, item)
_call_mock_decrypt = mock_decrypt.call_args_list
self.assertEqual(_call_mock_decrypt[0].args, ("mysecret", "1.1", salt))
@patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
@patch.object(Encryption, "_encrypt_value")
def test_encrypt(self, mock_encrypt_value, mock_get_secret_key):
"""Method decrypt raises error."""
mock_encrypt_value.return_value = encyrpted_value
result = asyncio.run(self.encryption.encrypt(value, schema_version, salt))
self.assertEqual(result, encyrpted_value)
mock_get_secret_key.assert_called_once()
mock_encrypt_value.assert_called_once_with(value, schema_version, salt)
@patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
@patch.object(Encryption, "_encrypt_value")
def test_encrypt_get_secret_key_raises(
self, mock_encrypt_value, mock_get_secret_key
):
"""Method get_secret_key raises error."""
mock_get_secret_key.side_effect = DbException("Unexpected type.")
with self.assertRaises(Exception) as error:
asyncio.run(self.encryption.encrypt(value, schema_version, salt))
self.assertEqual(str(error.exception), "database exception Unexpected type.")
mock_get_secret_key.assert_called_once()
mock_encrypt_value.assert_not_called()
@patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
@patch.object(Encryption, "_encrypt_value")
def test_encrypt_get_encrypt_raises(self, mock_encrypt_value, mock_get_secret_key):
"""Method _encrypt raises error."""
mock_encrypt_value.side_effect = TypeError(
"A bytes-like object is required, not 'str'"
)
with self.assertRaises(Exception) as error:
asyncio.run(self.encryption.encrypt(value, schema_version, salt))
self.assertEqual(
str(error.exception), "A bytes-like object is required, not 'str'"
)
mock_get_secret_key.assert_called_once()
mock_encrypt_value.assert_called_once_with(value, schema_version, salt)
@patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
@patch.object(Encryption, "_decrypt_value")
def test_decrypt(self, mock_decrypt_value, mock_get_secret_key):
"""Decrypted successfully."""
mock_decrypt_value.return_value = value
result = asyncio.run(
self.encryption.decrypt(encyrpted_value, schema_version, salt)
)
self.assertEqual(result, value)
mock_get_secret_key.assert_called_once()
mock_decrypt_value.assert_called_once_with(
encyrpted_value, schema_version, salt
)
@patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
@patch.object(Encryption, "_decrypt_value")
def test_decrypt_get_secret_key_raises(
self, mock_decrypt_value, mock_get_secret_key
):
"""Method get_secret_key raises error."""
mock_get_secret_key.side_effect = DbException("Unexpected type.")
with self.assertRaises(Exception) as error:
asyncio.run(self.encryption.decrypt(encyrpted_value, schema_version, salt))
self.assertEqual(str(error.exception), "database exception Unexpected type.")
mock_get_secret_key.assert_called_once()
mock_decrypt_value.assert_not_called()
@patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
@patch.object(Encryption, "_decrypt_value")
def test_decrypt_decrypt_value_raises(
self, mock_decrypt_value, mock_get_secret_key
):
"""Method get_secret_key raises error."""
mock_decrypt_value.side_effect = TypeError(
"A bytes-like object is required, not 'str'"
)
with self.assertRaises(Exception) as error:
asyncio.run(self.encryption.decrypt(encyrpted_value, schema_version, salt))
self.assertEqual(
str(error.exception), "A bytes-like object is required, not 'str'"
)
mock_get_secret_key.assert_called_once()
mock_decrypt_value.assert_called_once_with(
encyrpted_value, schema_version, salt
)
def test_join_keys_string_key(self):
"""key is string."""
string_key = "sample key"
result = self.encryption._join_keys(string_key, secret_key)
self.assertEqual(result, joined_key)
self.assertTrue(isinstance(result, bytes))
def test_join_keys_bytes_key(self):
"""key is bytes."""
bytes_key = b"sample key"
result = self.encryption._join_keys(bytes_key, secret_key)
self.assertEqual(result, joined_key)
self.assertTrue(isinstance(result, bytes))
self.assertEqual(len(result.decode("unicode_escape")), 32)
def test_join_keys_int_key(self):
"""key is int."""
int_key = 923
with self.assertRaises(Exception) as error:
self.encryption._join_keys(int_key, None)
self.assertEqual(str(error.exception), "'int' object is not iterable")
def test_join_keys_none_secret_key(self):
"""key is as bytes and secret key is None."""
bytes_key = b"sample key"
result = self.encryption._join_keys(bytes_key, None)
self.assertEqual(
result,
b"sample key\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
)
self.assertTrue(isinstance(result, bytes))
self.assertEqual(len(result.decode("unicode_escape")), 32)
def test_join_keys_none_key_none_secret_key(self):
"""key is None and secret key is None."""
with self.assertRaises(Exception) as error:
self.encryption._join_keys(None, None)
self.assertEqual(str(error.exception), "'NoneType' object is not iterable")
def test_join_keys_none_key(self):
"""key is None and secret key exists."""
with self.assertRaises(Exception) as error:
self.encryption._join_keys(None, secret_key)
self.assertEqual(str(error.exception), "'NoneType' object is not iterable")
@patch.object(Encryption, "_join_keys")
def test_join_secret_key_string_sample_key(self, mock_join_keys):
"""key is None and secret key exists as string."""
update_key = "sample key"
mock_join_keys.return_value = joined_key
result = self.encryption._join_secret_key(update_key)
self.assertEqual(result, joined_key)
self.assertTrue(isinstance(result, bytes))
mock_join_keys.assert_called_once_with(update_key, secret_key)
@patch.object(Encryption, "_join_keys")
def test_join_secret_key_byte_sample_key(self, mock_join_keys):
"""key is None and secret key exists as bytes."""
update_key = b"sample key"
mock_join_keys.return_value = joined_key
result = self.encryption._join_secret_key(update_key)
self.assertEqual(result, joined_key)
self.assertTrue(isinstance(result, bytes))
mock_join_keys.assert_called_once_with(update_key, secret_key)
@patch.object(Encryption, "_join_keys")
def test_join_secret_key_join_keys_raises(self, mock_join_keys):
"""Method _join_secret_key raises."""
update_key = 3434
mock_join_keys.side_effect = TypeError("'int' object is not iterable")
with self.assertRaises(Exception) as error:
self.encryption._join_secret_key(update_key)
self.assertEqual(str(error.exception), "'int' object is not iterable")
mock_join_keys.assert_called_once_with(update_key, secret_key)
@patch.object(Encryption, "_join_keys")
def test_get_secret_key_exists(self, mock_join_keys):
"""secret_key exists."""
self.encryption._secret_key = secret_key
asyncio.run(self.encryption.get_secret_key())
self.assertEqual(self.encryption.secret_key, secret_key)
mock_join_keys.assert_not_called()
@patch.object(Encryption, "_join_keys")
@patch("osm_common.dbbase.b64decode")
def test_get_secret_key_not_exist_database_key_exist(
self, mock_b64decode, mock_join_keys
):
"""secret_key does not exist, database key exists."""
self.encryption._secret_key = None
self.encryption._admin_collection.find_one.return_value = None
self.encryption._config = {"database_commonkey": "osm_new_key"}
mock_join_keys.return_value = joined_key
asyncio.run(self.encryption.get_secret_key())
self.assertEqual(self.encryption.secret_key, joined_key)
self.assertEqual(mock_join_keys.call_count, 1)
mock_b64decode.assert_not_called()
@patch.object(Encryption, "_join_keys")
@patch("osm_common.dbbase.b64decode")
def test_get_secret_key_not_exist_with_database_key_version_data_exist_without_serial(
self, mock_b64decode, mock_join_keys
):
"""secret_key does not exist, database key exists."""
self.encryption._secret_key = None
self.encryption._admin_collection.find_one.return_value = {"version": "1.0"}
self.encryption._config = {"database_commonkey": "osm_new_key"}
mock_join_keys.return_value = joined_key
asyncio.run(self.encryption.get_secret_key())
self.assertEqual(self.encryption.secret_key, joined_key)
self.assertEqual(mock_join_keys.call_count, 1)
mock_b64decode.assert_not_called()
self.encryption._admin_collection.find_one.assert_called_once_with(
{"_id": "version"}
)
_call_mock_join_keys = mock_join_keys.call_args_list
self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))
@patch.object(Encryption, "_join_keys")
@patch("osm_common.dbbase.b64decode")
def test_get_secret_key_not_exist_with_database_key_version_data_exist_with_serial(
self, mock_b64decode, mock_join_keys
):
"""secret_key does not exist, database key exists, version and serial exist
in admin collection."""
self.encryption._secret_key = None
self.encryption._admin_collection.find_one.return_value = {
"version": "1.0",
"serial": serial_bytes,
}
self.encryption._config = {"database_commonkey": "osm_new_key"}
mock_join_keys.side_effect = [secret_key, joined_key]
mock_b64decode.return_value = base64_decoded_serial
asyncio.run(self.encryption.get_secret_key())
self.assertEqual(self.encryption.secret_key, joined_key)
self.assertEqual(mock_join_keys.call_count, 2)
mock_b64decode.assert_called_once_with(serial_bytes)
self.encryption._admin_collection.find_one.assert_called_once_with(
{"_id": "version"}
)
_call_mock_join_keys = mock_join_keys.call_args_list
self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))
self.assertEqual(
_call_mock_join_keys[1].args, (base64_decoded_serial, secret_key)
)
@patch.object(Encryption, "_join_keys")
@patch("osm_common.dbbase.b64decode")
def test_get_secret_key_join_keys_raises(self, mock_b64decode, mock_join_keys):
"""Method _join_keys raises."""
self.encryption._secret_key = None
self.encryption._admin_collection.find_one.return_value = {
"version": "1.0",
"serial": serial_bytes,
}
self.encryption._config = {"database_commonkey": "osm_new_key"}
mock_join_keys.side_effect = DbException("Invalid data type.")
with self.assertRaises(Exception) as error:
asyncio.run(self.encryption.get_secret_key())
self.assertEqual(str(error.exception), "database exception Invalid data type.")
self.assertEqual(mock_join_keys.call_count, 1)
check_if_assert_not_called(
[mock_b64decode, self.encryption._admin_collection.find_one]
)
_call_mock_join_keys = mock_join_keys.call_args_list
self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))
@patch.object(Encryption, "_join_keys")
@patch("osm_common.dbbase.b64decode")
def test_get_secret_key_b64decode_raises(self, mock_b64decode, mock_join_keys):
"""Method b64decode raises."""
self.encryption._secret_key = None
self.encryption._admin_collection.find_one.return_value = {
"version": "1.0",
"serial": base64_decoded_serial,
}
self.encryption._config = {"database_commonkey": "osm_new_key"}
mock_join_keys.return_value = secret_key
mock_b64decode.side_effect = TypeError(
"A bytes-like object is required, not 'str'"
)
with self.assertRaises(Exception) as error:
asyncio.run(self.encryption.get_secret_key())
self.assertEqual(
str(error.exception), "A bytes-like object is required, not 'str'"
)
self.assertEqual(self.encryption.secret_key, None)
self.assertEqual(mock_join_keys.call_count, 1)
mock_b64decode.assert_called_once_with(base64_decoded_serial)
self.encryption._admin_collection.find_one.assert_called_once_with(
{"_id": "version"}
)
_call_mock_join_keys = mock_join_keys.call_args_list
self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))
@patch.object(Encryption, "_join_keys")
@patch("osm_common.dbbase.b64decode")
def test_get_secret_key_admin_collection_find_one_raises(
self, mock_b64decode, mock_join_keys
):
"""admin_collection find_one raises."""
self.encryption._secret_key = None
self.encryption._admin_collection.find_one.side_effect = DbException(
"Connection failed."
)
self.encryption._config = {"database_commonkey": "osm_new_key"}
mock_join_keys.return_value = secret_key
with self.assertRaises(Exception) as error:
asyncio.run(self.encryption.get_secret_key())
self.assertEqual(str(error.exception), "database exception Connection failed.")
self.assertEqual(self.encryption.secret_key, None)
self.assertEqual(mock_join_keys.call_count, 1)
mock_b64decode.assert_not_called()
self.encryption._admin_collection.find_one.assert_called_once_with(
{"_id": "version"}
)
_call_mock_join_keys = mock_join_keys.call_args_list
self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))
def test_encrypt_decrypt_with_schema_version_1_1_with_salt(self):
"""Encrypt and decrypt with schema version 1.1, salt exists."""
encrypted_msg = asyncio.run(
self.encryption.encrypt(value, schema_version, salt)
)
decrypted_msg = asyncio.run(
self.encryption.decrypt(encrypted_msg, schema_version, salt)
)
self.assertEqual(value, decrypted_msg)
def test_encrypt_decrypt_with_schema_version_1_0_with_salt(self):
"""Encrypt and decrypt with schema version 1.0, salt exists."""
schema_version = "1.0"
encrypted_msg = asyncio.run(
self.encryption.encrypt(value, schema_version, salt)
)
decrypted_msg = asyncio.run(
self.encryption.decrypt(encrypted_msg, schema_version, salt)
)
self.assertEqual(value, decrypted_msg)
def test_encrypt_decrypt_with_schema_version_1_1_without_salt(self):
"""Encrypt and decrypt with schema version 1.1, without salt."""
salt = None
with self.assertRaises(Exception) as error:
asyncio.run(self.encryption.encrypt(value, schema_version, salt))
self.assertEqual(str(error.exception), "'NoneType' object is not iterable")
class TestDeepUpdate(unittest.TestCase):
def test_update_dict(self):
# Original, patch, expected result
TEST = (
({"a": "b"}, {"a": "c"}, {"a": "c"}),
({"a": "b"}, {"b": "c"}, {"a": "b", "b": "c"}),
({"a": "b"}, {"a": None}, {}),
({"a": "b", "b": "c"}, {"a": None}, {"b": "c"}),
({"a": ["b"]}, {"a": "c"}, {"a": "c"}),
({"a": "c"}, {"a": ["b"]}, {"a": ["b"]}),
({"a": {"b": "c"}}, {"a": {"b": "d", "c": None}}, {"a": {"b": "d"}}),
({"a": [{"b": "c"}]}, {"a": [1]}, {"a": [1]}),
({1: ["a", "b"]}, {1: ["c", "d"]}, {1: ["c", "d"]}),
({1: {"a": "b"}}, {1: ["c"]}, {1: ["c"]}),
({1: {"a": "foo"}}, {1: None}, {}),
({1: {"a": "foo"}}, {1: "bar"}, {1: "bar"}),
({"e": None}, {"a": 1}, {"e": None, "a": 1}),
({1: [1, 2]}, {1: {"a": "b", "c": None}}, {1: {"a": "b"}}),
({}, {"a": {"bb": {"ccc": None}}}, {"a": {"bb": {}}}),
)
for t in TEST:
deep_update(t[0], t[1])
self.assertEqual(t[0], t[2])
# test deepcopy is done. So that original dictionary does not reference the pach
test_original = {1: {"a": "b"}}
test_patch = {1: {"c": {"d": "e"}}}
test_result = {1: {"a": "b", "c": {"d": "e"}}}
deep_update(test_original, test_patch)
self.assertEqual(test_original, test_result)
test_patch[1]["c"]["f"] = "edition of patch, must not modify original"
self.assertEqual(test_original, test_result)
def test_update_array(self):
# This TEST contains a list with the the Original, patch, and expected result
TEST = (
# delete all instances of "a"/"d"
({"A": ["a", "b", "a"]}, {"A": {"$a": None}}, {"A": ["b"]}),
({"A": ["a", "b", "a"]}, {"A": {"$d": None}}, {"A": ["a", "b", "a"]}),
# delete and insert at 0
(
{"A": ["a", "b", "c"]},
{"A": {"$b": None, "$+[0]": "b"}},
{"A": ["b", "a", "c"]},
),
# delete and edit
(
{"A": ["a", "b", "a"]},
{"A": {"$a": None, "$[1]": {"c": "d"}}},
{"A": [{"c": "d"}]},
),
# insert if not exist
({"A": ["a", "b", "c"]}, {"A": {"$+b": "b"}}, {"A": ["a", "b", "c"]}),
({"A": ["a", "b", "c"]}, {"A": {"$+d": "f"}}, {"A": ["a", "b", "c", "f"]}),
# edit by filter
(
{"A": ["a", "b", "a"]},
{"A": {"$b": {"c": "d"}}},
{"A": ["a", {"c": "d"}, "a"]},
),
(
{"A": ["a", "b", "a"]},
{"A": {"$b": None, "$+[0]": "b", "$+": "c"}},
{"A": ["b", "a", "a", "c"]},
),
({"A": ["a", "b", "a"]}, {"A": {"$c": None}}, {"A": ["a", "b", "a"]}),
# index deletion out of range
({"A": ["a", "b", "a"]}, {"A": {"$[5]": None}}, {"A": ["a", "b", "a"]}),
# nested array->dict
(
{"A": ["a", "b", {"id": "1", "c": {"d": 2}}]},
{"A": {"$id: '1'": {"h": None, "c": {"d": "e", "f": "g"}}}},
{"A": ["a", "b", {"id": "1", "c": {"d": "e", "f": "g"}}]},
),
(
{"A": [{"id": 1, "c": {"d": 2}}, {"id": 1, "c": {"f": []}}]},
{"A": {"$id: 1": {"h": None, "c": {"d": "e", "f": "g"}}}},
{
"A": [
{"id": 1, "c": {"d": "e", "f": "g"}},
{"id": 1, "c": {"d": "e", "f": "g"}},
]
},
),
# nested array->array
(
{"A": ["a", "b", ["a", "b"]]},
{"A": {"$b": None, "$[2]": {"$b": {}, "$+": "c"}}},
{"A": ["a", ["a", {}, "c"]]},
),
# types str and int different, so not found
(
{"A": ["a", {"id": "1", "c": "d"}]},
{"A": {"$id: 1": {"c": "e"}}},
{"A": ["a", {"id": "1", "c": "d"}]},
),
)
for t in TEST:
print(t)
deep_update(t[0], t[1])
self.assertEqual(t[0], t[2])
def test_update_badformat(self):
# This TEST contains original, incorrect patch and #TODO text that must be present
TEST = (
# conflict, index 0 is edited twice
({"A": ["a", "b", "a"]}, {"A": {"$a": None, "$[0]": {"c": "d"}}}),
# conflict, two insertions at same index
({"A": ["a", "b", "a"]}, {"A": {"$[1]": "c", "$[-2]": "d"}}),
({"A": ["a", "b", "a"]}, {"A": {"$[1]": "c", "$[+1]": "d"}}),
# bad format keys with and without $
({"A": ["a", "b", "a"]}, {"A": {"$b": {"c": "d"}, "c": 3}}),
# bad format empty $ and yaml incorrect
({"A": ["a", "b", "a"]}, {"A": {"$": 3}}),
({"A": ["a", "b", "a"]}, {"A": {"$a: b: c": 3}}),
({"A": ["a", "b", "a"]}, {"A": {"$a: b, c: d": 3}}),
# insertion of None
({"A": ["a", "b", "a"]}, {"A": {"$+": None}}),
# Not found, insertion of None
({"A": ["a", "b", "a"]}, {"A": {"$+c": None}}),
# index edition out of range
({"A": ["a", "b", "a"]}, {"A": {"$[5]": 6}}),
# conflict, two editions on index 2
(
{"A": ["a", {"id": "1", "c": "d"}]},
{"A": {"$id: '1'": {"c": "e"}, "$c: d": {"c": "f"}}},
),
)
for t in TEST:
print(t)
self.assertRaises(DbException, deep_update, t[0], t[1])
try:
deep_update(t[0], t[1])
except DbException as e:
print(e)
if __name__ == "__main__":
unittest.main()