Ubuntu 22.04 and Python 3.10 preparation
[osm/common.git] / osm_common / tests / test_dbbase.py
index c2af52f..e582c7e 100644 (file)
@@ -1,7 +1,60 @@
+# Copyright 2018 Whitestack, LLC
+# Copyright 2018 Telefonica S.A.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: esousa@whitestack.com or alfonso.tiernosepulveda@telefonica.com
+##
+import asyncio
+import copy
+from copy import deepcopy
 import http
+from http import HTTPStatus
+import logging
+from os import urandom
+import unittest
+from unittest.mock import MagicMock, Mock, patch
+
+from Crypto.Cipher import AES
+from osm_common.dbbase import DbBase, DbException, deep_update, Encryption
 import pytest
 
-from osm_common.dbbase import DbBase, DbException
+
+# Variables used in TestBaseEncryption and TestAsyncEncryption
+salt = "1afd5d1a-4a7e-4d9c-8c65-251290183106"
+value = "private key txt"
+padded_value = b"private key txt\0"
+padded_encoded_value = b"private key txt\x00"
+encoding_type = "ascii"
+encyrpt_mode = AES.MODE_ECB
+secret_key = b"\xeev\xc2\xb8\xb2#;Ek\xd0\xb5['\x04\xed\x1f\xb9?\xc5Ig\x80\xd5\x8d\x8aT\xd7\xf8Q\xe2u!"
+encyrpted_value = "ZW5jcnlwdGVkIGRhdGE="
+encyrpted_bytes = b"ZW5jcnlwdGVkIGRhdGE="
+data_to_b4_encode = b"encrypted data"
+b64_decoded = b"decrypted data"
+schema_version = "1.1"
+joined_key = b"\x9d\x17\xaf\xc8\xdeF\x1b.\x0e\xa9\xb5['\x04\xed\x1f\xb9?\xc5Ig\x80\xd5\x8d\x8aT\xd7\xf8Q\xe2u!"
+serial_bytes = b"\xf8\x96Z\x1c:}\xb5\xdf\x94\x8d\x0f\x807\xe6)\x8f\xf5!\xee}\xc2\xfa\xb3\t\xb9\xe4\r7\x19\x08\xa5b"
+base64_decoded_serial = b"g\xbe\xdb"
+decrypted_val1 = "BiV9YZEuSRAudqvz7Gs+bg=="
+decrypted_val2 = "q4LwnFdoryzbZJM5mCAnpA=="
+item = {
+    "secret": "mysecret",
+    "cacert": "mycacert",
+    "path": "/var",
+    "ip": "192.168.12.23",
+}
 
 
 def exception_message(message):
@@ -20,7 +73,11 @@ def test_constructor():
 
 
 def test_db_connect(db_base):
-    db_base.db_connect(None)
+    with pytest.raises(DbException) as excinfo:
+        db_base.db_connect(None)
+    assert str(excinfo.value).startswith(
+        exception_message("Method 'db_connect' not implemented")
+    )
 
 
 def test_db_disconnect(db_base):
@@ -30,33 +87,1494 @@ def test_db_disconnect(db_base):
 def test_get_list(db_base):
     with pytest.raises(DbException) as excinfo:
         db_base.get_list(None, None)
-    assert str(excinfo.value).startswith(exception_message("Method 'get_list' not implemented"))
+    assert str(excinfo.value).startswith(
+        exception_message("Method 'get_list' not implemented")
+    )
     assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
 
 
 def test_get_one(db_base):
     with pytest.raises(DbException) as excinfo:
         db_base.get_one(None, None, None, None)
-    assert str(excinfo.value).startswith(exception_message("Method 'get_one' not implemented"))
+    assert str(excinfo.value).startswith(
+        exception_message("Method 'get_one' not implemented")
+    )
     assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
 
 
 def test_create(db_base):
     with pytest.raises(DbException) as excinfo:
         db_base.create(None, None)
-    assert str(excinfo.value).startswith(exception_message("Method 'create' not implemented"))
+    assert str(excinfo.value).startswith(
+        exception_message("Method 'create' not implemented")
+    )
+    assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
+
+
+def test_create_list(db_base):
+    with pytest.raises(DbException) as excinfo:
+        db_base.create_list(None, None)
+    assert str(excinfo.value).startswith(
+        exception_message("Method 'create_list' not implemented")
+    )
     assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
 
 
 def test_del_list(db_base):
     with pytest.raises(DbException) as excinfo:
         db_base.del_list(None, None)
-    assert str(excinfo.value).startswith(exception_message("Method 'del_list' not implemented"))
+    assert str(excinfo.value).startswith(
+        exception_message("Method 'del_list' not implemented")
+    )
     assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
 
 
 def test_del_one(db_base):
     with pytest.raises(DbException) as excinfo:
         db_base.del_one(None, None, None)
-    assert str(excinfo.value).startswith(exception_message("Method 'del_one' not implemented"))
+    assert str(excinfo.value).startswith(
+        exception_message("Method 'del_one' not implemented")
+    )
     assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
+
+
+class TestEncryption(unittest.TestCase):
+    def setUp(self):
+        master_key = "Setting a long master key with numbers 123 and capitals AGHBNHD and symbols %&8)!'"
+        db_base1 = DbBase()
+        db_base2 = DbBase()
+        db_base3 = DbBase()
+        # set self.secret_key obtained when connect
+        db_base1.set_secret_key(master_key, replace=True)
+        db_base1.set_secret_key(urandom(32))
+        db_base2.set_secret_key(None, replace=True)
+        db_base2.set_secret_key(urandom(30))
+        db_base3.set_secret_key(master_key)
+        self.db_bases = [db_base1, db_base2, db_base3]
+
+    def test_encrypt_decrypt(self):
+        TEST = (
+            ("plain text 1 ! ", None),
+            ("plain text 2 with salt ! ", "1afd5d1a-4a7e-4d9c-8c65-251290183106"),
+        )
+        for db_base in self.db_bases:
+            for value, salt in TEST:
+                # no encryption
+                encrypted = db_base.encrypt(value, schema_version="1.0", salt=salt)
+                self.assertEqual(
+                    encrypted, value, "value '{}' has been encrypted".format(value)
+                )
+                decrypted = db_base.decrypt(encrypted, schema_version="1.0", salt=salt)
+                self.assertEqual(
+                    decrypted, value, "value '{}' has been decrypted".format(value)
+                )
+
+                # encrypt/decrypt
+                encrypted = db_base.encrypt(value, schema_version="1.1", salt=salt)
+                self.assertNotEqual(
+                    encrypted, value, "value '{}' has not been encrypted".format(value)
+                )
+                self.assertIsInstance(encrypted, str, "Encrypted is not ascii text")
+                decrypted = db_base.decrypt(encrypted, schema_version="1.1", salt=salt)
+                self.assertEqual(
+                    decrypted, value, "value is not equal after encryption/decryption"
+                )
+
+    def test_encrypt_decrypt_salt(self):
+        value = "value to be encrypted!"
+        encrypted = []
+        for db_base in self.db_bases:
+            for salt in (None, "salt 1", "1afd5d1a-4a7e-4d9c-8c65-251290183106"):
+                # encrypt/decrypt
+                encrypted.append(
+                    db_base.encrypt(value, schema_version="1.1", salt=salt)
+                )
+                self.assertNotEqual(
+                    encrypted[-1],
+                    value,
+                    "value '{}' has not been encrypted".format(value),
+                )
+                self.assertIsInstance(encrypted[-1], str, "Encrypted is not ascii text")
+                decrypted = db_base.decrypt(
+                    encrypted[-1], schema_version="1.1", salt=salt
+                )
+                self.assertEqual(
+                    decrypted, value, "value is not equal after encryption/decryption"
+                )
+        for i in range(0, len(encrypted)):
+            for j in range(i + 1, len(encrypted)):
+                self.assertNotEqual(
+                    encrypted[i],
+                    encrypted[j],
+                    "encryption with different salt must contain different result",
+                )
+        # decrypt with a different master key
+        try:
+            decrypted = self.db_bases[-1].decrypt(
+                encrypted[0], schema_version="1.1", salt=None
+            )
+            self.assertNotEqual(
+                encrypted[0],
+                decrypted,
+                "Decryption with different KEY must generate different result",
+            )
+        except DbException as e:
+            self.assertEqual(
+                e.http_code,
+                HTTPStatus.INTERNAL_SERVER_ERROR,
+                "Decryption with different KEY does not provide expected http_code",
+            )
+
+
+class AsyncMock(MagicMock):
+    async def __call__(self, *args, **kwargs):
+        args = deepcopy(args)
+        kwargs = deepcopy(kwargs)
+        return super(AsyncMock, self).__call__(*args, **kwargs)
+
+
+class CopyingMock(MagicMock):
+    def __call__(self, *args, **kwargs):
+        args = deepcopy(args)
+        kwargs = deepcopy(kwargs)
+        return super(CopyingMock, self).__call__(*args, **kwargs)
+
+
+def check_if_assert_not_called(mocks: list):
+    for mocking in mocks:
+        mocking.assert_not_called()
+
+
+class TestBaseEncryption(unittest.TestCase):
+    @patch("logging.getLogger", autospec=True)
+    def setUp(self, mock_logger):
+        mock_logger = logging.getLogger()
+        mock_logger.disabled = True
+        self.db_base = DbBase()
+        self.mock_cipher = CopyingMock()
+        self.db_base.encoding_type = encoding_type
+        self.db_base.encrypt_mode = encyrpt_mode
+        self.db_base.secret_key = secret_key
+        self.mock_padded_msg = CopyingMock()
+
+    def test_pad_data_len_not_multiplication_of_16(self):
+        data = "hello word hello hello word hello word"
+        data_len = len(data)
+        expected_len = 48
+        padded = self.db_base.pad_data(data)
+        self.assertEqual(len(padded), expected_len)
+        self.assertTrue("\0" * (expected_len - data_len) in padded)
+
+    def test_pad_data_len_multiplication_of_16(self):
+        data = "hello word!!!!!!"
+        padded = self.db_base.pad_data(data)
+        self.assertEqual(padded, data)
+        self.assertFalse("\0" in padded)
+
+    def test_pad_data_empty_string(self):
+        data = ""
+        expected_len = 0
+        padded = self.db_base.pad_data(data)
+        self.assertEqual(len(padded), expected_len)
+        self.assertFalse("\0" in padded)
+
+    def test_pad_data_not_string(self):
+        data = None
+        with self.assertRaises(Exception) as err:
+            self.db_base.pad_data(data)
+        self.assertEqual(
+            str(err.exception),
+            "database exception Incorrect data type: type(None), string is expected.",
+        )
+
+    def test_unpad_data_null_char_at_right(self):
+        null_padded_data = "hell0word\0\0"
+        expected_length = len(null_padded_data) - 2
+        unpadded = self.db_base.unpad_data(null_padded_data)
+        self.assertEqual(len(unpadded), expected_length)
+        self.assertFalse("\0" in unpadded)
+        self.assertTrue("0" in unpadded)
+
+    def test_unpad_data_null_char_is_not_rightest(self):
+        null_padded_data = "hell0word\r\t\0\n"
+        expected_length = len(null_padded_data)
+        unpadded = self.db_base.unpad_data(null_padded_data)
+        self.assertEqual(len(unpadded), expected_length)
+        self.assertTrue("\0" in unpadded)
+
+    def test_unpad_data_with_spaces_at_right(self):
+        null_padded_data = "  hell0word\0  "
+        expected_length = len(null_padded_data)
+        unpadded = self.db_base.unpad_data(null_padded_data)
+        self.assertEqual(len(unpadded), expected_length)
+        self.assertTrue("\0" in unpadded)
+
+    def test_unpad_data_empty_string(self):
+        data = ""
+        unpadded = self.db_base.unpad_data(data)
+        self.assertEqual(unpadded, "")
+        self.assertFalse("\0" in unpadded)
+
+    def test_unpad_data_not_string(self):
+        data = None
+        with self.assertRaises(Exception) as err:
+            self.db_base.unpad_data(data)
+        self.assertEqual(
+            str(err.exception),
+            "database exception Incorrect data type: type(None), string is expected.",
+        )
+
+    @patch.object(DbBase, "_join_secret_key")
+    @patch.object(DbBase, "pad_data")
+    def test__encrypt_value_schema_version_1_0_none_secret_key_none_salt(
+        self, mock_pad_data, mock_join_secret_key
+    ):
+        """schema_version 1.0, secret_key is None and salt is None."""
+        schema_version = "1.0"
+        salt = None
+        self.db_base.secret_key = None
+        result = self.db_base._encrypt_value(value, schema_version, salt)
+        self.assertEqual(result, value)
+        check_if_assert_not_called([mock_pad_data, mock_join_secret_key])
+
+    @patch("osm_common.dbbase.b64encode")
+    @patch("osm_common.dbbase.AES")
+    @patch.object(DbBase, "_join_secret_key")
+    @patch.object(DbBase, "pad_data")
+    def test__encrypt_value_schema_version_1_1_with_secret_key_exists_with_salt(
+        self,
+        mock_pad_data,
+        mock_join_secret_key,
+        mock_aes,
+        mock_b64_encode,
+    ):
+        """schema_version 1.1, secret_key exists, salt exists."""
+        mock_aes.new.return_value = self.mock_cipher
+        self.mock_cipher.encrypt.return_value = data_to_b4_encode
+        self.mock_padded_msg.return_value = padded_value
+        mock_pad_data.return_value = self.mock_padded_msg
+        self.mock_padded_msg.encode.return_value = padded_encoded_value
+
+        mock_b64_encode.return_value = encyrpted_bytes
+
+        result = self.db_base._encrypt_value(value, schema_version, salt)
+
+        self.assertTrue(isinstance(result, str))
+        self.assertEqual(result, encyrpted_value)
+        mock_join_secret_key.assert_called_once_with(salt)
+        _call_mock_aes_new = mock_aes.new.call_args_list[0].args
+        self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
+        mock_pad_data.assert_called_once_with(value)
+        mock_b64_encode.assert_called_once_with(data_to_b4_encode)
+        self.mock_cipher.encrypt.assert_called_once_with(padded_encoded_value)
+        self.mock_padded_msg.encode.assert_called_with(encoding_type)
+
+    @patch.object(DbBase, "_join_secret_key")
+    @patch.object(DbBase, "pad_data")
+    def test__encrypt_value_schema_version_1_0_secret_key_not_exists(
+        self, mock_pad_data, mock_join_secret_key
+    ):
+        """schema_version 1.0, secret_key is None, salt exists."""
+        schema_version = "1.0"
+        self.db_base.secret_key = None
+        result = self.db_base._encrypt_value(value, schema_version, salt)
+        self.assertEqual(result, value)
+        check_if_assert_not_called([mock_pad_data, mock_join_secret_key])
+
+    @patch.object(DbBase, "_join_secret_key")
+    @patch.object(DbBase, "pad_data")
+    def test__encrypt_value_schema_version_1_1_secret_key_not_exists(
+        self, mock_pad_data, mock_join_secret_key
+    ):
+        """schema_version 1.1, secret_key is None, salt exists."""
+        self.db_base.secret_key = None
+        result = self.db_base._encrypt_value(value, schema_version, salt)
+        self.assertEqual(result, value)
+        check_if_assert_not_called([mock_pad_data, mock_join_secret_key])
+
+    @patch("osm_common.dbbase.b64encode")
+    @patch("osm_common.dbbase.AES")
+    @patch.object(DbBase, "_join_secret_key")
+    @patch.object(DbBase, "pad_data")
+    def test__encrypt_value_schema_version_1_1_secret_key_exists_without_salt(
+        self,
+        mock_pad_data,
+        mock_join_secret_key,
+        mock_aes,
+        mock_b64_encode,
+    ):
+        """schema_version 1.1, secret_key exists, salt is None."""
+        salt = None
+        mock_aes.new.return_value = self.mock_cipher
+        self.mock_cipher.encrypt.return_value = data_to_b4_encode
+
+        self.mock_padded_msg.return_value = padded_value
+        mock_pad_data.return_value = self.mock_padded_msg
+        self.mock_padded_msg.encode.return_value = padded_encoded_value
+
+        mock_b64_encode.return_value = encyrpted_bytes
+
+        result = self.db_base._encrypt_value(value, schema_version, salt)
+
+        self.assertEqual(result, encyrpted_value)
+        mock_join_secret_key.assert_called_once_with(salt)
+        _call_mock_aes_new = mock_aes.new.call_args_list[0].args
+        self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
+        mock_pad_data.assert_called_once_with(value)
+        mock_b64_encode.assert_called_once_with(data_to_b4_encode)
+        self.mock_cipher.encrypt.assert_called_once_with(padded_encoded_value)
+        self.mock_padded_msg.encode.assert_called_with(encoding_type)
+
+    @patch("osm_common.dbbase.b64encode")
+    @patch("osm_common.dbbase.AES")
+    @patch.object(DbBase, "_join_secret_key")
+    @patch.object(DbBase, "pad_data")
+    def test__encrypt_value_invalid_encrpt_mode(
+        self,
+        mock_pad_data,
+        mock_join_secret_key,
+        mock_aes,
+        mock_b64_encode,
+    ):
+        """encrypt_mode is invalid."""
+        mock_aes.new.side_effect = Exception("Invalid ciphering mode.")
+        self.db_base.encrypt_mode = "AES.MODE_XXX"
+
+        with self.assertRaises(Exception) as err:
+            self.db_base._encrypt_value(value, schema_version, salt)
+
+        self.assertEqual(str(err.exception), "Invalid ciphering mode.")
+        mock_join_secret_key.assert_called_once_with(salt)
+        _call_mock_aes_new = mock_aes.new.call_args_list[0].args
+        self.assertEqual(_call_mock_aes_new[1], "AES.MODE_XXX")
+        check_if_assert_not_called([mock_pad_data, mock_b64_encode])
+
+    @patch("osm_common.dbbase.b64encode")
+    @patch("osm_common.dbbase.AES")
+    @patch.object(DbBase, "_join_secret_key")
+    @patch.object(DbBase, "pad_data")
+    def test__encrypt_value_schema_version_1_1_secret_key_exists_value_none(
+        self,
+        mock_pad_data,
+        mock_join_secret_key,
+        mock_aes,
+        mock_b64_encode,
+    ):
+        """schema_version 1.1, secret_key exists, value is None."""
+        value = None
+        mock_aes.new.return_value = self.mock_cipher
+        mock_pad_data.side_effect = DbException(
+            "Incorrect data type: type(None), string is expected."
+        )
+
+        with self.assertRaises(Exception) as err:
+            self.db_base._encrypt_value(value, schema_version, salt)
+        self.assertEqual(
+            str(err.exception),
+            "database exception Incorrect data type: type(None), string is expected.",
+        )
+
+        mock_join_secret_key.assert_called_once_with(salt)
+        _call_mock_aes_new = mock_aes.new.call_args_list[0].args
+        self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
+        mock_pad_data.assert_called_once_with(value)
+        check_if_assert_not_called(
+            [mock_b64_encode, self.mock_cipher.encrypt, mock_b64_encode]
+        )
+
+    @patch("osm_common.dbbase.b64encode")
+    @patch("osm_common.dbbase.AES")
+    @patch.object(DbBase, "_join_secret_key")
+    @patch.object(DbBase, "pad_data")
+    def test__encrypt_value_join_secret_key_raises(
+        self,
+        mock_pad_data,
+        mock_join_secret_key,
+        mock_aes,
+        mock_b64_encode,
+    ):
+        """Method join_secret_key raises DbException."""
+        salt = b"3434o34-3wewrwr-222424-2242dwew"
+
+        mock_join_secret_key.side_effect = DbException("Unexpected type")
+
+        mock_aes.new.return_value = self.mock_cipher
+
+        with self.assertRaises(Exception) as err:
+            self.db_base._encrypt_value(value, schema_version, salt)
+
+        self.assertEqual(str(err.exception), "database exception Unexpected type")
+        check_if_assert_not_called(
+            [mock_pad_data, mock_aes.new, mock_b64_encode, self.mock_cipher.encrypt]
+        )
+        mock_join_secret_key.assert_called_once_with(salt)
+
+    @patch("osm_common.dbbase.b64encode")
+    @patch("osm_common.dbbase.AES")
+    @patch.object(DbBase, "_join_secret_key")
+    @patch.object(DbBase, "pad_data")
+    def test__encrypt_value_schema_version_1_1_secret_key_exists_b64_encode_raises(
+        self,
+        mock_pad_data,
+        mock_join_secret_key,
+        mock_aes,
+        mock_b64_encode,
+    ):
+        """schema_version 1.1, secret_key exists, b64encode raises TypeError."""
+        mock_aes.new.return_value = self.mock_cipher
+        self.mock_cipher.encrypt.return_value = "encrypted data"
+
+        self.mock_padded_msg.return_value = padded_value
+        mock_pad_data.return_value = self.mock_padded_msg
+        self.mock_padded_msg.encode.return_value = padded_encoded_value
+
+        mock_b64_encode.side_effect = TypeError(
+            "A bytes-like object is required, not 'str'"
+        )
+
+        with self.assertRaises(Exception) as error:
+            self.db_base._encrypt_value(value, schema_version, salt)
+        self.assertEqual(
+            str(error.exception), "A bytes-like object is required, not 'str'"
+        )
+        mock_join_secret_key.assert_called_once_with(salt)
+        _call_mock_aes_new = mock_aes.new.call_args_list[0].args
+        self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
+        mock_pad_data.assert_called_once_with(value)
+        self.mock_cipher.encrypt.assert_called_once_with(padded_encoded_value)
+        self.mock_padded_msg.encode.assert_called_with(encoding_type)
+        mock_b64_encode.assert_called_once_with("encrypted data")
+
+    @patch("osm_common.dbbase.b64encode")
+    @patch("osm_common.dbbase.AES")
+    @patch.object(DbBase, "_join_secret_key")
+    @patch.object(DbBase, "pad_data")
+    def test__encrypt_value_cipher_encrypt_raises(
+        self,
+        mock_pad_data,
+        mock_join_secret_key,
+        mock_aes,
+        mock_b64_encode,
+    ):
+        """AES encrypt method raises Exception."""
+        mock_aes.new.return_value = self.mock_cipher
+        self.mock_cipher.encrypt.side_effect = Exception("Invalid data type.")
+
+        self.mock_padded_msg.return_value = padded_value
+        mock_pad_data.return_value = self.mock_padded_msg
+        self.mock_padded_msg.encode.return_value = padded_encoded_value
+
+        with self.assertRaises(Exception) as error:
+            self.db_base._encrypt_value(value, schema_version, salt)
+
+        self.assertEqual(str(error.exception), "Invalid data type.")
+        mock_join_secret_key.assert_called_once_with(salt)
+        _call_mock_aes_new = mock_aes.new.call_args_list[0].args
+        self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
+        mock_pad_data.assert_called_once_with(value)
+        self.mock_cipher.encrypt.assert_called_once_with(padded_encoded_value)
+        self.mock_padded_msg.encode.assert_called_with(encoding_type)
+        mock_b64_encode.assert_not_called()
+
+    @patch.object(DbBase, "get_secret_key")
+    @patch.object(DbBase, "_encrypt_value")
+    def test_encrypt_without_schema_version_without_salt(
+        self, mock_encrypt_value, mock_get_secret_key
+    ):
+        """schema and salt is None."""
+        mock_encrypt_value.return_value = encyrpted_value
+        result = self.db_base.encrypt(value)
+        mock_encrypt_value.assert_called_once_with(value, None, None)
+        mock_get_secret_key.assert_called_once()
+        self.assertEqual(result, encyrpted_value)
+
+    @patch.object(DbBase, "get_secret_key")
+    @patch.object(DbBase, "_encrypt_value")
+    def test_encrypt_with_schema_version_with_salt(
+        self, mock_encrypt_value, mock_get_secret_key
+    ):
+        """schema version exists, salt is None."""
+        mock_encrypt_value.return_value = encyrpted_value
+        result = self.db_base.encrypt(value, schema_version, salt)
+        mock_encrypt_value.assert_called_once_with(value, schema_version, salt)
+        mock_get_secret_key.assert_called_once()
+        self.assertEqual(result, encyrpted_value)
+
+    @patch.object(DbBase, "get_secret_key")
+    @patch.object(DbBase, "_encrypt_value")
+    def test_encrypt_get_secret_key_raises(
+        self, mock_encrypt_value, mock_get_secret_key
+    ):
+        """get_secret_key method raises DbException."""
+        mock_get_secret_key.side_effect = DbException("KeyError")
+        with self.assertRaises(Exception) as error:
+            self.db_base.encrypt(value)
+        self.assertEqual(str(error.exception), "database exception KeyError")
+        mock_encrypt_value.assert_not_called()
+        mock_get_secret_key.assert_called_once()
+
+    @patch.object(DbBase, "get_secret_key")
+    @patch.object(DbBase, "_encrypt_value")
+    def test_encrypt_encrypt_raises(self, mock_encrypt_value, mock_get_secret_key):
+        """_encrypt method raises DbException."""
+        mock_encrypt_value.side_effect = DbException(
+            "Incorrect data type: type(None), string is expected."
+        )
+        with self.assertRaises(Exception) as error:
+            self.db_base.encrypt(value, schema_version, salt)
+        self.assertEqual(
+            str(error.exception),
+            "database exception Incorrect data type: type(None), string is expected.",
+        )
+        mock_encrypt_value.assert_called_once_with(value, schema_version, salt)
+        mock_get_secret_key.assert_called_once()
+
+    @patch("osm_common.dbbase.b64decode")
+    @patch("osm_common.dbbase.AES")
+    @patch.object(DbBase, "_join_secret_key")
+    @patch.object(DbBase, "unpad_data")
+    def test__decrypt_value_schema_version_1_1_secret_key_exists_without_salt(
+        self,
+        mock_unpad_data,
+        mock_join_secret_key,
+        mock_aes,
+        mock_b64_decode,
+    ):
+        """schema_version 1.1, secret_key exists, salt is None."""
+        salt = None
+        mock_aes.new.return_value = self.mock_cipher
+        self.mock_cipher.decrypt.return_value = padded_encoded_value
+
+        mock_b64_decode.return_value = b64_decoded
+
+        mock_unpad_data.return_value = value
+
+        result = self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
+        self.assertEqual(result, value)
+
+        mock_join_secret_key.assert_called_once_with(salt)
+        _call_mock_aes_new = mock_aes.new.call_args_list[0].args
+        self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
+        mock_unpad_data.assert_called_once_with("private key txt\0")
+        mock_b64_decode.assert_called_once_with(encyrpted_value)
+        self.mock_cipher.decrypt.assert_called_once_with(b64_decoded)
+
+    @patch("osm_common.dbbase.b64decode")
+    @patch("osm_common.dbbase.AES")
+    @patch.object(DbBase, "_join_secret_key")
+    @patch.object(DbBase, "unpad_data")
+    def test__decrypt_value_schema_version_1_1_secret_key_exists_with_salt(
+        self,
+        mock_unpad_data,
+        mock_join_secret_key,
+        mock_aes,
+        mock_b64_decode,
+    ):
+        """schema_version 1.1, secret_key exists, salt is None."""
+        mock_aes.new.return_value = self.mock_cipher
+        self.mock_cipher.decrypt.return_value = padded_encoded_value
+
+        mock_b64_decode.return_value = b64_decoded
+
+        mock_unpad_data.return_value = value
+
+        result = self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
+        self.assertEqual(result, value)
+
+        mock_join_secret_key.assert_called_once_with(salt)
+        _call_mock_aes_new = mock_aes.new.call_args_list[0].args
+        self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
+        mock_unpad_data.assert_called_once_with("private key txt\0")
+        mock_b64_decode.assert_called_once_with(encyrpted_value)
+        self.mock_cipher.decrypt.assert_called_once_with(b64_decoded)
+
+    @patch("osm_common.dbbase.b64decode")
+    @patch("osm_common.dbbase.AES")
+    @patch.object(DbBase, "_join_secret_key")
+    @patch.object(DbBase, "unpad_data")
+    def test__decrypt_value_schema_version_1_1_without_secret_key(
+        self,
+        mock_unpad_data,
+        mock_join_secret_key,
+        mock_aes,
+        mock_b64_decode,
+    ):
+        """schema_version 1.1, secret_key is None, salt exists."""
+        self.db_base.secret_key = None
+
+        result = self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
+
+        self.assertEqual(result, encyrpted_value)
+        check_if_assert_not_called(
+            [
+                mock_join_secret_key,
+                mock_aes.new,
+                mock_unpad_data,
+                mock_b64_decode,
+                self.mock_cipher.decrypt,
+            ]
+        )
+
+    @patch("osm_common.dbbase.b64decode")
+    @patch("osm_common.dbbase.AES")
+    @patch.object(DbBase, "_join_secret_key")
+    @patch.object(DbBase, "unpad_data")
+    def test__decrypt_value_schema_version_1_0_with_secret_key(
+        self,
+        mock_unpad_data,
+        mock_join_secret_key,
+        mock_aes,
+        mock_b64_decode,
+    ):
+        """schema_version 1.0, secret_key exists, salt exists."""
+        schema_version = "1.0"
+        result = self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
+
+        self.assertEqual(result, encyrpted_value)
+        check_if_assert_not_called(
+            [
+                mock_join_secret_key,
+                mock_aes.new,
+                mock_unpad_data,
+                mock_b64_decode,
+                self.mock_cipher.decrypt,
+            ]
+        )
+
+    @patch("osm_common.dbbase.b64decode")
+    @patch("osm_common.dbbase.AES")
+    @patch.object(DbBase, "_join_secret_key")
+    @patch.object(DbBase, "unpad_data")
+    def test__decrypt_value_join_secret_key_raises(
+        self,
+        mock_unpad_data,
+        mock_join_secret_key,
+        mock_aes,
+        mock_b64_decode,
+    ):
+        """_join_secret_key raises TypeError."""
+        salt = object()
+        mock_join_secret_key.side_effect = TypeError("'type' object is not iterable")
+
+        with self.assertRaises(Exception) as error:
+            self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
+        self.assertEqual(str(error.exception), "'type' object is not iterable")
+
+        mock_join_secret_key.assert_called_once_with(salt)
+        check_if_assert_not_called(
+            [mock_aes.new, mock_unpad_data, mock_b64_decode, self.mock_cipher.decrypt]
+        )
+
+    @patch("osm_common.dbbase.b64decode")
+    @patch("osm_common.dbbase.AES")
+    @patch.object(DbBase, "_join_secret_key")
+    @patch.object(DbBase, "unpad_data")
+    def test__decrypt_value_b64decode_raises(
+        self,
+        mock_unpad_data,
+        mock_join_secret_key,
+        mock_aes,
+        mock_b64_decode,
+    ):
+        """b64decode raises TypeError."""
+        mock_b64_decode.side_effect = TypeError(
+            "A str-like object is required, not 'bytes'"
+        )
+        with self.assertRaises(Exception) as error:
+            self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
+        self.assertEqual(
+            str(error.exception), "A str-like object is required, not 'bytes'"
+        )
+
+        mock_b64_decode.assert_called_once_with(encyrpted_value)
+        mock_join_secret_key.assert_called_once_with(salt)
+        check_if_assert_not_called(
+            [mock_aes.new, self.mock_cipher.decrypt, mock_unpad_data]
+        )
+
+    @patch("osm_common.dbbase.b64decode")
+    @patch("osm_common.dbbase.AES")
+    @patch.object(DbBase, "_join_secret_key")
+    @patch.object(DbBase, "unpad_data")
+    def test__decrypt_value_invalid_encrypt_mode(
+        self,
+        mock_unpad_data,
+        mock_join_secret_key,
+        mock_aes,
+        mock_b64_decode,
+    ):
+        """Invalid AES encrypt mode."""
+        mock_aes.new.side_effect = Exception("Invalid ciphering mode.")
+        self.db_base.encrypt_mode = "AES.MODE_XXX"
+
+        mock_b64_decode.return_value = b64_decoded
+        with self.assertRaises(Exception) as error:
+            self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
+
+        self.assertEqual(str(error.exception), "Invalid ciphering mode.")
+        mock_join_secret_key.assert_called_once_with(salt)
+        _call_mock_aes_new = mock_aes.new.call_args_list[0].args
+        self.assertEqual(_call_mock_aes_new[1], "AES.MODE_XXX")
+        mock_b64_decode.assert_called_once_with(encyrpted_value)
+        check_if_assert_not_called([mock_unpad_data, self.mock_cipher.decrypt])
+
+    @patch("osm_common.dbbase.b64decode")
+    @patch("osm_common.dbbase.AES")
+    @patch.object(DbBase, "_join_secret_key")
+    @patch.object(DbBase, "unpad_data")
+    def test__decrypt_value_cipher_decrypt_raises(
+        self,
+        mock_unpad_data,
+        mock_join_secret_key,
+        mock_aes,
+        mock_b64_decode,
+    ):
+        """AES decrypt raises Exception."""
+        mock_b64_decode.return_value = b64_decoded
+
+        mock_aes.new.return_value = self.mock_cipher
+        self.mock_cipher.decrypt.side_effect = Exception("Invalid data type.")
+
+        with self.assertRaises(Exception) as error:
+            self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
+        self.assertEqual(str(error.exception), "Invalid data type.")
+
+        mock_join_secret_key.assert_called_once_with(salt)
+        _call_mock_aes_new = mock_aes.new.call_args_list[0].args
+        self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
+        mock_b64_decode.assert_called_once_with(encyrpted_value)
+        self.mock_cipher.decrypt.assert_called_once_with(b64_decoded)
+        mock_unpad_data.assert_not_called()
+
+    @patch("osm_common.dbbase.b64decode")
+    @patch("osm_common.dbbase.AES")
+    @patch.object(DbBase, "_join_secret_key")
+    @patch.object(DbBase, "unpad_data")
+    def test__decrypt_value_decode_raises(
+        self,
+        mock_unpad_data,
+        mock_join_secret_key,
+        mock_aes,
+        mock_b64_decode,
+    ):
+        """Decode raises UnicodeDecodeError."""
+        mock_aes.new.return_value = self.mock_cipher
+        self.mock_cipher.decrypt.return_value = b"\xd0\x000091"
+
+        mock_b64_decode.return_value = b64_decoded
+
+        mock_unpad_data.return_value = value
+        with self.assertRaises(Exception) as error:
+            self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
+        self.assertEqual(
+            str(error.exception),
+            "database exception Cannot decrypt information. Are you using same COMMONKEY in all OSM components?",
+        )
+        self.assertEqual(type(error.exception), DbException)
+        mock_join_secret_key.assert_called_once_with(salt)
+        _call_mock_aes_new = mock_aes.new.call_args_list[0].args
+        self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
+        mock_b64_decode.assert_called_once_with(encyrpted_value)
+        self.mock_cipher.decrypt.assert_called_once_with(b64_decoded)
+        mock_unpad_data.assert_not_called()
+
+    @patch("osm_common.dbbase.b64decode")
+    @patch("osm_common.dbbase.AES")
+    @patch.object(DbBase, "_join_secret_key")
+    @patch.object(DbBase, "unpad_data")
+    def test__decrypt_value_unpad_data_raises(
+        self,
+        mock_unpad_data,
+        mock_join_secret_key,
+        mock_aes,
+        mock_b64_decode,
+    ):
+        """Method unpad_data raises error."""
+        mock_decrypted_message = MagicMock()
+        mock_decrypted_message.decode.return_value = None
+        mock_aes.new.return_value = self.mock_cipher
+        self.mock_cipher.decrypt.return_value = mock_decrypted_message
+        mock_unpad_data.side_effect = DbException(
+            "Incorrect data type: type(None), string is expected."
+        )
+        mock_b64_decode.return_value = b64_decoded
+
+        with self.assertRaises(Exception) as error:
+            self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
+        self.assertEqual(
+            str(error.exception),
+            "database exception Incorrect data type: type(None), string is expected.",
+        )
+        self.assertEqual(type(error.exception), DbException)
+        mock_join_secret_key.assert_called_once_with(salt)
+        _call_mock_aes_new = mock_aes.new.call_args_list[0].args
+        self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
+        mock_b64_decode.assert_called_once_with(encyrpted_value)
+        self.mock_cipher.decrypt.assert_called_once_with(b64_decoded)
+        mock_decrypted_message.decode.assert_called_once_with(
+            self.db_base.encoding_type
+        )
+        mock_unpad_data.assert_called_once_with(None)
+
+    @patch.object(DbBase, "get_secret_key")
+    @patch.object(DbBase, "_decrypt_value")
+    def test_decrypt_without_schema_version_without_salt(
+        self, mock_decrypt_value, mock_get_secret_key
+    ):
+        """schema_version is None, salt is None."""
+        mock_decrypt_value.return_value = encyrpted_value
+        result = self.db_base.decrypt(value)
+        mock_decrypt_value.assert_called_once_with(value, None, None)
+        mock_get_secret_key.assert_called_once()
+        self.assertEqual(result, encyrpted_value)
+
+    @patch.object(DbBase, "get_secret_key")
+    @patch.object(DbBase, "_decrypt_value")
+    def test_decrypt_with_schema_version_with_salt(
+        self, mock_decrypt_value, mock_get_secret_key
+    ):
+        """schema_version and salt exist."""
+        mock_decrypt_value.return_value = encyrpted_value
+        result = self.db_base.decrypt(value, schema_version, salt)
+        mock_decrypt_value.assert_called_once_with(value, schema_version, salt)
+        mock_get_secret_key.assert_called_once()
+        self.assertEqual(result, encyrpted_value)
+
+    @patch.object(DbBase, "get_secret_key")
+    @patch.object(DbBase, "_decrypt_value")
+    def test_decrypt_get_secret_key_raises(
+        self, mock_decrypt_value, mock_get_secret_key
+    ):
+        """Method get_secret_key raises KeyError."""
+        mock_get_secret_key.side_effect = DbException("KeyError")
+        with self.assertRaises(Exception) as error:
+            self.db_base.decrypt(value)
+        self.assertEqual(str(error.exception), "database exception KeyError")
+        mock_decrypt_value.assert_not_called()
+        mock_get_secret_key.assert_called_once()
+
+    @patch.object(DbBase, "get_secret_key")
+    @patch.object(DbBase, "_decrypt_value")
+    def test_decrypt_decrypt_value_raises(
+        self, mock_decrypt_value, mock_get_secret_key
+    ):
+        """Method _decrypt raises error."""
+        mock_decrypt_value.side_effect = DbException(
+            "Incorrect data type: type(None), string is expected."
+        )
+        with self.assertRaises(Exception) as error:
+            self.db_base.decrypt(value, schema_version, salt)
+        self.assertEqual(
+            str(error.exception),
+            "database exception Incorrect data type: type(None), string is expected.",
+        )
+        mock_decrypt_value.assert_called_once_with(value, schema_version, salt)
+        mock_get_secret_key.assert_called_once()
+
+    def test_encrypt_decrypt_with_schema_version_1_1_with_salt(self):
+        """Encrypt and decrypt with schema version 1.1, salt exists."""
+        encrypted_msg = self.db_base.encrypt(value, schema_version, salt)
+        decrypted_msg = self.db_base.decrypt(encrypted_msg, schema_version, salt)
+        self.assertEqual(value, decrypted_msg)
+
+    def test_encrypt_decrypt_with_schema_version_1_0_with_salt(self):
+        """Encrypt and decrypt with schema version 1.0, salt exists."""
+        schema_version = "1.0"
+        encrypted_msg = self.db_base.encrypt(value, schema_version, salt)
+        decrypted_msg = self.db_base.decrypt(encrypted_msg, schema_version, salt)
+        self.assertEqual(value, decrypted_msg)
+
+    def test_encrypt_decrypt_with_schema_version_1_1_without_salt(self):
+        """Encrypt and decrypt with schema version 1.1 and without salt."""
+        salt = None
+        encrypted_msg = self.db_base.encrypt(value, schema_version, salt)
+        decrypted_msg = self.db_base.decrypt(encrypted_msg, schema_version, salt)
+        self.assertEqual(value, decrypted_msg)
+
+
+class TestAsyncEncryption(unittest.TestCase):
+    @patch("logging.getLogger", autospec=True)
+    def setUp(self, mock_logger):
+        mock_logger = logging.getLogger()
+        mock_logger.disabled = True
+        self.encryption = Encryption(uri="uri", config={})
+        self.encryption.encoding_type = encoding_type
+        self.encryption.encrypt_mode = encyrpt_mode
+        self.encryption._secret_key = secret_key
+        self.admin_collection = Mock()
+        self.admin_collection.find_one = AsyncMock()
+        self.encryption._client = {
+            "osm": {
+                "admin": self.admin_collection,
+            }
+        }
+
+    @patch.object(Encryption, "decrypt", new_callable=AsyncMock)
+    def test_decrypt_fields_with_item_with_fields(self, mock_decrypt):
+        """item and fields exist."""
+        mock_decrypt.side_effect = [decrypted_val1, decrypted_val2]
+        input_item = copy.deepcopy(item)
+        expected_item = {
+            "secret": decrypted_val1,
+            "cacert": decrypted_val2,
+            "path": "/var",
+            "ip": "192.168.12.23",
+        }
+        fields = ["secret", "cacert"]
+
+        asyncio.run(
+            self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
+        )
+        self.assertEqual(input_item, expected_item)
+        _call_mock_decrypt = mock_decrypt.call_args_list
+        self.assertEqual(_call_mock_decrypt[0].args, ("mysecret", "1.1", salt))
+        self.assertEqual(_call_mock_decrypt[1].args, ("mycacert", "1.1", salt))
+
+    @patch.object(Encryption, "decrypt", new_callable=AsyncMock)
+    def test_decrypt_fields_empty_item_with_fields(self, mock_decrypt):
+        """item is empty and fields exists."""
+        input_item = {}
+        fields = ["secret", "cacert"]
+        asyncio.run(
+            self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
+        )
+        self.assertEqual(input_item, {})
+        mock_decrypt.assert_not_called()
+
+    @patch.object(Encryption, "decrypt", new_callable=AsyncMock)
+    def test_decrypt_fields_with_item_without_fields(self, mock_decrypt):
+        """item exists and fields is empty."""
+        input_item = copy.deepcopy(item)
+        fields = []
+        asyncio.run(
+            self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
+        )
+        self.assertEqual(input_item, item)
+        mock_decrypt.assert_not_called()
+
+    @patch.object(Encryption, "decrypt", new_callable=AsyncMock)
+    def test_decrypt_fields_with_item_with_single_field(self, mock_decrypt):
+        """item exists and field has single value."""
+        mock_decrypt.return_value = decrypted_val1
+        fields = ["secret"]
+        input_item = copy.deepcopy(item)
+        expected_item = {
+            "secret": decrypted_val1,
+            "cacert": "mycacert",
+            "path": "/var",
+            "ip": "192.168.12.23",
+        }
+        asyncio.run(
+            self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
+        )
+        self.assertEqual(input_item, expected_item)
+        _call_mock_decrypt = mock_decrypt.call_args_list
+        self.assertEqual(_call_mock_decrypt[0].args, ("mysecret", "1.1", salt))
+
+    @patch.object(Encryption, "decrypt", new_callable=AsyncMock)
+    def test_decrypt_fields_with_item_with_field_none_salt_1_0_schema_version(
+        self, mock_decrypt
+    ):
+        """item exists and field has single value, salt is None, schema version is 1.0."""
+        schema_version = "1.0"
+        salt = None
+        mock_decrypt.return_value = "mysecret"
+        input_item = copy.deepcopy(item)
+        fields = ["secret"]
+        asyncio.run(
+            self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
+        )
+        self.assertEqual(input_item, item)
+        _call_mock_decrypt = mock_decrypt.call_args_list
+        self.assertEqual(_call_mock_decrypt[0].args, ("mysecret", "1.0", None))
+
+    @patch.object(Encryption, "decrypt", new_callable=AsyncMock)
+    def test_decrypt_fields_decrypt_raises(self, mock_decrypt):
+        """Method decrypt raises error."""
+        mock_decrypt.side_effect = DbException(
+            "Incorrect data type: type(None), string is expected."
+        )
+        fields = ["secret"]
+        input_item = copy.deepcopy(item)
+        with self.assertRaises(Exception) as error:
+            asyncio.run(
+                self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
+            )
+        self.assertEqual(
+            str(error.exception),
+            "database exception Incorrect data type: type(None), string is expected.",
+        )
+        self.assertEqual(input_item, item)
+        _call_mock_decrypt = mock_decrypt.call_args_list
+        self.assertEqual(_call_mock_decrypt[0].args, ("mysecret", "1.1", salt))
+
+    @patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
+    @patch.object(Encryption, "_encrypt_value")
+    def test_encrypt(self, mock_encrypt_value, mock_get_secret_key):
+        """Method decrypt raises error."""
+        mock_encrypt_value.return_value = encyrpted_value
+        result = asyncio.run(self.encryption.encrypt(value, schema_version, salt))
+        self.assertEqual(result, encyrpted_value)
+        mock_get_secret_key.assert_called_once()
+        mock_encrypt_value.assert_called_once_with(value, schema_version, salt)
+
+    @patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
+    @patch.object(Encryption, "_encrypt_value")
+    def test_encrypt_get_secret_key_raises(
+        self, mock_encrypt_value, mock_get_secret_key
+    ):
+        """Method get_secret_key raises error."""
+        mock_get_secret_key.side_effect = DbException("Unexpected type.")
+        with self.assertRaises(Exception) as error:
+            asyncio.run(self.encryption.encrypt(value, schema_version, salt))
+        self.assertEqual(str(error.exception), "database exception Unexpected type.")
+        mock_get_secret_key.assert_called_once()
+        mock_encrypt_value.assert_not_called()
+
+    @patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
+    @patch.object(Encryption, "_encrypt_value")
+    def test_encrypt_get_encrypt_raises(self, mock_encrypt_value, mock_get_secret_key):
+        """Method _encrypt raises error."""
+        mock_encrypt_value.side_effect = TypeError(
+            "A bytes-like object is required, not 'str'"
+        )
+        with self.assertRaises(Exception) as error:
+            asyncio.run(self.encryption.encrypt(value, schema_version, salt))
+        self.assertEqual(
+            str(error.exception), "A bytes-like object is required, not 'str'"
+        )
+        mock_get_secret_key.assert_called_once()
+        mock_encrypt_value.assert_called_once_with(value, schema_version, salt)
+
+    @patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
+    @patch.object(Encryption, "_decrypt_value")
+    def test_decrypt(self, mock_decrypt_value, mock_get_secret_key):
+        """Decrypted successfully."""
+        mock_decrypt_value.return_value = value
+        result = asyncio.run(
+            self.encryption.decrypt(encyrpted_value, schema_version, salt)
+        )
+        self.assertEqual(result, value)
+        mock_get_secret_key.assert_called_once()
+        mock_decrypt_value.assert_called_once_with(
+            encyrpted_value, schema_version, salt
+        )
+
+    @patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
+    @patch.object(Encryption, "_decrypt_value")
+    def test_decrypt_get_secret_key_raises(
+        self, mock_decrypt_value, mock_get_secret_key
+    ):
+        """Method get_secret_key raises error."""
+        mock_get_secret_key.side_effect = DbException("Unexpected type.")
+        with self.assertRaises(Exception) as error:
+            asyncio.run(self.encryption.decrypt(encyrpted_value, schema_version, salt))
+        self.assertEqual(str(error.exception), "database exception Unexpected type.")
+        mock_get_secret_key.assert_called_once()
+        mock_decrypt_value.assert_not_called()
+
+    @patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
+    @patch.object(Encryption, "_decrypt_value")
+    def test_decrypt_decrypt_value_raises(
+        self, mock_decrypt_value, mock_get_secret_key
+    ):
+        """Method get_secret_key raises error."""
+        mock_decrypt_value.side_effect = TypeError(
+            "A bytes-like object is required, not 'str'"
+        )
+        with self.assertRaises(Exception) as error:
+            asyncio.run(self.encryption.decrypt(encyrpted_value, schema_version, salt))
+        self.assertEqual(
+            str(error.exception), "A bytes-like object is required, not 'str'"
+        )
+        mock_get_secret_key.assert_called_once()
+        mock_decrypt_value.assert_called_once_with(
+            encyrpted_value, schema_version, salt
+        )
+
+    def test_join_keys_string_key(self):
+        """key is string."""
+        string_key = "sample key"
+        result = self.encryption._join_keys(string_key, secret_key)
+        self.assertEqual(result, joined_key)
+        self.assertTrue(isinstance(result, bytes))
+
+    def test_join_keys_bytes_key(self):
+        """key is bytes."""
+        bytes_key = b"sample key"
+        result = self.encryption._join_keys(bytes_key, secret_key)
+        self.assertEqual(result, joined_key)
+        self.assertTrue(isinstance(result, bytes))
+        self.assertEqual(len(result.decode("unicode_escape")), 32)
+
+    def test_join_keys_int_key(self):
+        """key is int."""
+        int_key = 923
+        with self.assertRaises(Exception) as error:
+            self.encryption._join_keys(int_key, None)
+        self.assertEqual(str(error.exception), "'int' object is not iterable")
+
+    def test_join_keys_none_secret_key(self):
+        """key is as bytes and secret key is None."""
+        bytes_key = b"sample key"
+        result = self.encryption._join_keys(bytes_key, None)
+        self.assertEqual(
+            result,
+            b"sample key\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
+        )
+        self.assertTrue(isinstance(result, bytes))
+        self.assertEqual(len(result.decode("unicode_escape")), 32)
+
+    def test_join_keys_none_key_none_secret_key(self):
+        """key is None and secret key is None."""
+        with self.assertRaises(Exception) as error:
+            self.encryption._join_keys(None, None)
+        self.assertEqual(str(error.exception), "'NoneType' object is not iterable")
+
+    def test_join_keys_none_key(self):
+        """key is None and secret key exists."""
+        with self.assertRaises(Exception) as error:
+            self.encryption._join_keys(None, secret_key)
+        self.assertEqual(str(error.exception), "'NoneType' object is not iterable")
+
+    @patch.object(Encryption, "_join_keys")
+    def test_join_secret_key_string_sample_key(self, mock_join_keys):
+        """key is None and secret key exists as string."""
+        update_key = "sample key"
+        mock_join_keys.return_value = joined_key
+        result = self.encryption._join_secret_key(update_key)
+        self.assertEqual(result, joined_key)
+        self.assertTrue(isinstance(result, bytes))
+        mock_join_keys.assert_called_once_with(update_key, secret_key)
+
+    @patch.object(Encryption, "_join_keys")
+    def test_join_secret_key_byte_sample_key(self, mock_join_keys):
+        """key is None and secret key exists as bytes."""
+        update_key = b"sample key"
+        mock_join_keys.return_value = joined_key
+        result = self.encryption._join_secret_key(update_key)
+        self.assertEqual(result, joined_key)
+        self.assertTrue(isinstance(result, bytes))
+        mock_join_keys.assert_called_once_with(update_key, secret_key)
+
+    @patch.object(Encryption, "_join_keys")
+    def test_join_secret_key_join_keys_raises(self, mock_join_keys):
+        """Method _join_secret_key raises."""
+        update_key = 3434
+        mock_join_keys.side_effect = TypeError("'int' object is not iterable")
+        with self.assertRaises(Exception) as error:
+            self.encryption._join_secret_key(update_key)
+        self.assertEqual(str(error.exception), "'int' object is not iterable")
+        mock_join_keys.assert_called_once_with(update_key, secret_key)
+
+    @patch.object(Encryption, "_join_keys")
+    def test_get_secret_key_exists(self, mock_join_keys):
+        """secret_key exists."""
+        self.encryption._secret_key = secret_key
+        asyncio.run(self.encryption.get_secret_key())
+        self.assertEqual(self.encryption.secret_key, secret_key)
+        mock_join_keys.assert_not_called()
+
+    @patch.object(Encryption, "_join_keys")
+    @patch("osm_common.dbbase.b64decode")
+    def test_get_secret_key_not_exist_database_key_exist(
+        self, mock_b64decode, mock_join_keys
+    ):
+        """secret_key does not exist, database key exists."""
+        self.encryption._secret_key = None
+        self.encryption._admin_collection.find_one.return_value = None
+        self.encryption._config = {"database_commonkey": "osm_new_key"}
+        mock_join_keys.return_value = joined_key
+        asyncio.run(self.encryption.get_secret_key())
+        self.assertEqual(self.encryption.secret_key, joined_key)
+        self.assertEqual(mock_join_keys.call_count, 1)
+        mock_b64decode.assert_not_called()
+
+    @patch.object(Encryption, "_join_keys")
+    @patch("osm_common.dbbase.b64decode")
+    def test_get_secret_key_not_exist_with_database_key_version_data_exist_without_serial(
+        self, mock_b64decode, mock_join_keys
+    ):
+        """secret_key does not exist, database key exists."""
+        self.encryption._secret_key = None
+        self.encryption._admin_collection.find_one.return_value = {"version": "1.0"}
+        self.encryption._config = {"database_commonkey": "osm_new_key"}
+        mock_join_keys.return_value = joined_key
+        asyncio.run(self.encryption.get_secret_key())
+        self.assertEqual(self.encryption.secret_key, joined_key)
+        self.assertEqual(mock_join_keys.call_count, 1)
+        mock_b64decode.assert_not_called()
+        self.encryption._admin_collection.find_one.assert_called_once_with(
+            {"_id": "version"}
+        )
+        _call_mock_join_keys = mock_join_keys.call_args_list
+        self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))
+
+    @patch.object(Encryption, "_join_keys")
+    @patch("osm_common.dbbase.b64decode")
+    def test_get_secret_key_not_exist_with_database_key_version_data_exist_with_serial(
+        self, mock_b64decode, mock_join_keys
+    ):
+        """secret_key does not exist, database key exists, version and serial exist
+        in admin collection."""
+        self.encryption._secret_key = None
+        self.encryption._admin_collection.find_one.return_value = {
+            "version": "1.0",
+            "serial": serial_bytes,
+        }
+        self.encryption._config = {"database_commonkey": "osm_new_key"}
+        mock_join_keys.side_effect = [secret_key, joined_key]
+        mock_b64decode.return_value = base64_decoded_serial
+        asyncio.run(self.encryption.get_secret_key())
+        self.assertEqual(self.encryption.secret_key, joined_key)
+        self.assertEqual(mock_join_keys.call_count, 2)
+        mock_b64decode.assert_called_once_with(serial_bytes)
+        self.encryption._admin_collection.find_one.assert_called_once_with(
+            {"_id": "version"}
+        )
+        _call_mock_join_keys = mock_join_keys.call_args_list
+        self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))
+        self.assertEqual(
+            _call_mock_join_keys[1].args, (base64_decoded_serial, secret_key)
+        )
+
+    @patch.object(Encryption, "_join_keys")
+    @patch("osm_common.dbbase.b64decode")
+    def test_get_secret_key_join_keys_raises(self, mock_b64decode, mock_join_keys):
+        """Method _join_keys raises."""
+        self.encryption._secret_key = None
+        self.encryption._admin_collection.find_one.return_value = {
+            "version": "1.0",
+            "serial": serial_bytes,
+        }
+        self.encryption._config = {"database_commonkey": "osm_new_key"}
+        mock_join_keys.side_effect = DbException("Invalid data type.")
+        with self.assertRaises(Exception) as error:
+            asyncio.run(self.encryption.get_secret_key())
+        self.assertEqual(str(error.exception), "database exception Invalid data type.")
+        self.assertEqual(mock_join_keys.call_count, 1)
+        check_if_assert_not_called(
+            [mock_b64decode, self.encryption._admin_collection.find_one]
+        )
+        _call_mock_join_keys = mock_join_keys.call_args_list
+        self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))
+
+    @patch.object(Encryption, "_join_keys")
+    @patch("osm_common.dbbase.b64decode")
+    def test_get_secret_key_b64decode_raises(self, mock_b64decode, mock_join_keys):
+        """Method b64decode raises."""
+        self.encryption._secret_key = None
+        self.encryption._admin_collection.find_one.return_value = {
+            "version": "1.0",
+            "serial": base64_decoded_serial,
+        }
+        self.encryption._config = {"database_commonkey": "osm_new_key"}
+        mock_join_keys.return_value = secret_key
+        mock_b64decode.side_effect = TypeError(
+            "A bytes-like object is required, not 'str'"
+        )
+        with self.assertRaises(Exception) as error:
+            asyncio.run(self.encryption.get_secret_key())
+        self.assertEqual(
+            str(error.exception), "A bytes-like object is required, not 'str'"
+        )
+        self.assertEqual(self.encryption.secret_key, None)
+        self.assertEqual(mock_join_keys.call_count, 1)
+        mock_b64decode.assert_called_once_with(base64_decoded_serial)
+        self.encryption._admin_collection.find_one.assert_called_once_with(
+            {"_id": "version"}
+        )
+        _call_mock_join_keys = mock_join_keys.call_args_list
+        self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))
+
+    @patch.object(Encryption, "_join_keys")
+    @patch("osm_common.dbbase.b64decode")
+    def test_get_secret_key_admin_collection_find_one_raises(
+        self, mock_b64decode, mock_join_keys
+    ):
+        """admin_collection find_one raises."""
+        self.encryption._secret_key = None
+        self.encryption._admin_collection.find_one.side_effect = DbException(
+            "Connection failed."
+        )
+        self.encryption._config = {"database_commonkey": "osm_new_key"}
+        mock_join_keys.return_value = secret_key
+        with self.assertRaises(Exception) as error:
+            asyncio.run(self.encryption.get_secret_key())
+        self.assertEqual(str(error.exception), "database exception Connection failed.")
+        self.assertEqual(self.encryption.secret_key, None)
+        self.assertEqual(mock_join_keys.call_count, 1)
+        mock_b64decode.assert_not_called()
+        self.encryption._admin_collection.find_one.assert_called_once_with(
+            {"_id": "version"}
+        )
+        _call_mock_join_keys = mock_join_keys.call_args_list
+        self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))
+
+    def test_encrypt_decrypt_with_schema_version_1_1_with_salt(self):
+        """Encrypt and decrypt with schema version 1.1, salt exists."""
+        encrypted_msg = asyncio.run(
+            self.encryption.encrypt(value, schema_version, salt)
+        )
+        decrypted_msg = asyncio.run(
+            self.encryption.decrypt(encrypted_msg, schema_version, salt)
+        )
+        self.assertEqual(value, decrypted_msg)
+
+    def test_encrypt_decrypt_with_schema_version_1_0_with_salt(self):
+        """Encrypt and decrypt with schema version 1.0, salt exists."""
+        schema_version = "1.0"
+        encrypted_msg = asyncio.run(
+            self.encryption.encrypt(value, schema_version, salt)
+        )
+        decrypted_msg = asyncio.run(
+            self.encryption.decrypt(encrypted_msg, schema_version, salt)
+        )
+        self.assertEqual(value, decrypted_msg)
+
+    def test_encrypt_decrypt_with_schema_version_1_1_without_salt(self):
+        """Encrypt and decrypt with schema version 1.1, without salt."""
+        salt = None
+        with self.assertRaises(Exception) as error:
+            asyncio.run(self.encryption.encrypt(value, schema_version, salt))
+        self.assertEqual(str(error.exception), "'NoneType' object is not iterable")
+
+
+class TestDeepUpdate(unittest.TestCase):
+    def test_update_dict(self):
+        # Original, patch, expected result
+        TEST = (
+            ({"a": "b"}, {"a": "c"}, {"a": "c"}),
+            ({"a": "b"}, {"b": "c"}, {"a": "b", "b": "c"}),
+            ({"a": "b"}, {"a": None}, {}),
+            ({"a": "b", "b": "c"}, {"a": None}, {"b": "c"}),
+            ({"a": ["b"]}, {"a": "c"}, {"a": "c"}),
+            ({"a": "c"}, {"a": ["b"]}, {"a": ["b"]}),
+            ({"a": {"b": "c"}}, {"a": {"b": "d", "c": None}}, {"a": {"b": "d"}}),
+            ({"a": [{"b": "c"}]}, {"a": [1]}, {"a": [1]}),
+            ({1: ["a", "b"]}, {1: ["c", "d"]}, {1: ["c", "d"]}),
+            ({1: {"a": "b"}}, {1: ["c"]}, {1: ["c"]}),
+            ({1: {"a": "foo"}}, {1: None}, {}),
+            ({1: {"a": "foo"}}, {1: "bar"}, {1: "bar"}),
+            ({"e": None}, {"a": 1}, {"e": None, "a": 1}),
+            ({1: [1, 2]}, {1: {"a": "b", "c": None}}, {1: {"a": "b"}}),
+            ({}, {"a": {"bb": {"ccc": None}}}, {"a": {"bb": {}}}),
+        )
+        for t in TEST:
+            deep_update(t[0], t[1])
+            self.assertEqual(t[0], t[2])
+        # test deepcopy is done. So that original dictionary does not reference the pach
+        test_original = {1: {"a": "b"}}
+        test_patch = {1: {"c": {"d": "e"}}}
+        test_result = {1: {"a": "b", "c": {"d": "e"}}}
+        deep_update(test_original, test_patch)
+        self.assertEqual(test_original, test_result)
+        test_patch[1]["c"]["f"] = "edition of patch, must not modify original"
+        self.assertEqual(test_original, test_result)
+
+    def test_update_array(self):
+        # This TEST contains a list with the the Original, patch, and expected result
+        TEST = (
+            # delete all instances of "a"/"d"
+            ({"A": ["a", "b", "a"]}, {"A": {"$a": None}}, {"A": ["b"]}),
+            ({"A": ["a", "b", "a"]}, {"A": {"$d": None}}, {"A": ["a", "b", "a"]}),
+            # delete and insert at 0
+            (
+                {"A": ["a", "b", "c"]},
+                {"A": {"$b": None, "$+[0]": "b"}},
+                {"A": ["b", "a", "c"]},
+            ),
+            # delete and edit
+            (
+                {"A": ["a", "b", "a"]},
+                {"A": {"$a": None, "$[1]": {"c": "d"}}},
+                {"A": [{"c": "d"}]},
+            ),
+            # insert if not exist
+            ({"A": ["a", "b", "c"]}, {"A": {"$+b": "b"}}, {"A": ["a", "b", "c"]}),
+            ({"A": ["a", "b", "c"]}, {"A": {"$+d": "f"}}, {"A": ["a", "b", "c", "f"]}),
+            # edit by filter
+            (
+                {"A": ["a", "b", "a"]},
+                {"A": {"$b": {"c": "d"}}},
+                {"A": ["a", {"c": "d"}, "a"]},
+            ),
+            (
+                {"A": ["a", "b", "a"]},
+                {"A": {"$b": None, "$+[0]": "b", "$+": "c"}},
+                {"A": ["b", "a", "a", "c"]},
+            ),
+            ({"A": ["a", "b", "a"]}, {"A": {"$c": None}}, {"A": ["a", "b", "a"]}),
+            # index deletion out of range
+            ({"A": ["a", "b", "a"]}, {"A": {"$[5]": None}}, {"A": ["a", "b", "a"]}),
+            # nested array->dict
+            (
+                {"A": ["a", "b", {"id": "1", "c": {"d": 2}}]},
+                {"A": {"$id: '1'": {"h": None, "c": {"d": "e", "f": "g"}}}},
+                {"A": ["a", "b", {"id": "1", "c": {"d": "e", "f": "g"}}]},
+            ),
+            (
+                {"A": [{"id": 1, "c": {"d": 2}}, {"id": 1, "c": {"f": []}}]},
+                {"A": {"$id: 1": {"h": None, "c": {"d": "e", "f": "g"}}}},
+                {
+                    "A": [
+                        {"id": 1, "c": {"d": "e", "f": "g"}},
+                        {"id": 1, "c": {"d": "e", "f": "g"}},
+                    ]
+                },
+            ),
+            # nested array->array
+            (
+                {"A": ["a", "b", ["a", "b"]]},
+                {"A": {"$b": None, "$[2]": {"$b": {}, "$+": "c"}}},
+                {"A": ["a", ["a", {}, "c"]]},
+            ),
+            # types str and int different, so not found
+            (
+                {"A": ["a", {"id": "1", "c": "d"}]},
+                {"A": {"$id: 1": {"c": "e"}}},
+                {"A": ["a", {"id": "1", "c": "d"}]},
+            ),
+        )
+        for t in TEST:
+            print(t)
+            deep_update(t[0], t[1])
+            self.assertEqual(t[0], t[2])
+
+    def test_update_badformat(self):
+        # This TEST contains original, incorrect patch and #TODO text that must be present
+        TEST = (
+            # conflict, index 0 is edited twice
+            ({"A": ["a", "b", "a"]}, {"A": {"$a": None, "$[0]": {"c": "d"}}}),
+            # conflict, two insertions at same index
+            ({"A": ["a", "b", "a"]}, {"A": {"$[1]": "c", "$[-2]": "d"}}),
+            ({"A": ["a", "b", "a"]}, {"A": {"$[1]": "c", "$[+1]": "d"}}),
+            # bad format keys with and without $
+            ({"A": ["a", "b", "a"]}, {"A": {"$b": {"c": "d"}, "c": 3}}),
+            # bad format empty $ and yaml incorrect
+            ({"A": ["a", "b", "a"]}, {"A": {"$": 3}}),
+            ({"A": ["a", "b", "a"]}, {"A": {"$a: b: c": 3}}),
+            ({"A": ["a", "b", "a"]}, {"A": {"$a: b, c: d": 3}}),
+            # insertion of None
+            ({"A": ["a", "b", "a"]}, {"A": {"$+": None}}),
+            # Not found, insertion of None
+            ({"A": ["a", "b", "a"]}, {"A": {"$+c": None}}),
+            # index edition out of range
+            ({"A": ["a", "b", "a"]}, {"A": {"$[5]": 6}}),
+            # conflict, two editions on index 2
+            (
+                {"A": ["a", {"id": "1", "c": "d"}]},
+                {"A": {"$id: '1'": {"c": "e"}, "$c: d": {"c": "f"}}},
+            ),
+        )
+        for t in TEST:
+            print(t)
+            self.assertRaises(DbException, deep_update, t[0], t[1])
+            try:
+                deep_update(t[0], t[1])
+            except DbException as e:
+                print(e)
+
+
+if __name__ == "__main__":
+    unittest.main()