Code Coverage

Cobertura Coverage Report > osm_common.tests >

test_dbbase.py

Trend

File Coverage summary

| Name | Classes | Lines | Conditionals |
|---|---|---|---|
| test_dbbase.py | 100% (1/1) | 96% (858/890) | 100% (0/0) |

Coverage Breakdown by Class

| Name | Lines | Conditionals |
|---|---|---|
| test_dbbase.py | 96% (858/890) | N/A |

Source

osm_common/tests/test_dbbase.py
1 # Copyright 2018 Whitestack, LLC
2 # Copyright 2018 Telefonica S.A.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
14 # under the License.
15 #
16 # For those usages not covered by the Apache License, Version 2.0 please
17 # contact: esousa@whitestack.com or alfonso.tiernosepulveda@telefonica.com
18 ##
19 1 import asyncio
20 1 import copy
21 1 from copy import deepcopy
22 1 import http
23 1 from http import HTTPStatus
24 1 import logging
25 1 from os import urandom
26 1 import unittest
27 1 from unittest.mock import MagicMock, Mock, patch
28
29 1 from Crypto.Cipher import AES
30 1 from osm_common.dbbase import DbBase, DbException, deep_update, Encryption
31 1 import pytest
32
33
# Variables used in TestBaseEncryption and TestAsyncEncryption
# NOTE: the "encyrpt"/"encyrpted" and "b4" spellings below are typos for
# "encrypt"/"encrypted" and "b64"; they are kept because the tests reference
# these names exactly as written.
salt = "1afd5d1a-4a7e-4d9c-8c65-251290183106"
value = "private key txt"
# The next two literals are the same 16 bytes ("\0" and "\x00" are both NUL).
padded_value = b"private key txt\0"
padded_encoded_value = b"private key txt\x00"
encoding_type = "ascii"
encyrpt_mode = AES.MODE_ECB
secret_key = b"\xeev\xc2\xb8\xb2#;Ek\xd0\xb5['\x04\xed\x1f\xb9?\xc5Ig\x80\xd5\x8d\x8aT\xd7\xf8Q\xe2u!"
# str and bytes forms of the base64 encoding of b"encrypted data".
encyrpted_value = "ZW5jcnlwdGVkIGRhdGE="
encyrpted_bytes = b"ZW5jcnlwdGVkIGRhdGE="
data_to_b4_encode = b"encrypted data"
b64_decoded = b"decrypted data"
schema_version = "1.1"
# The remaining values are not referenced in the tests visible above this
# point of the file; presumably used by later tests -- verify before changing.
joined_key = b"\x9d\x17\xaf\xc8\xdeF\x1b.\x0e\xa9\xb5['\x04\xed\x1f\xb9?\xc5Ig\x80\xd5\x8d\x8aT\xd7\xf8Q\xe2u!"
serial_bytes = b"\xf8\x96Z\x1c:}\xb5\xdf\x94\x8d\x0f\x807\xe6)\x8f\xf5!\xee}\xc2\xfa\xb3\t\xb9\xe4\r7\x19\x08\xa5b"
base64_decoded_serial = b"g\xbe\xdb"
decrypted_val1 = "BiV9YZEuSRAudqvz7Gs+bg=="
decrypted_val2 = "q4LwnFdoryzbZJM5mCAnpA=="
item = {
    "secret": "mysecret",
    "cacert": "mycacert",
    "path": "/var",
    "ip": "192.168.12.23",
}
58
59
def exception_message(message):
    """Return ``message`` prefixed with "database exception ".

    The tests below compare this text against ``str()`` of a raised
    ``DbException``.
    """
    prefix = "database exception "
    return prefix + message
62
63
@pytest.fixture
def db_base():
    """Pytest fixture that hands each requesting test a new ``DbBase`` instance."""
    instance = DbBase()
    return instance
67
68
def test_constructor():
    """``DbBase()`` can be built without arguments and yields a ``DbBase``."""
    created = DbBase()
    assert created is not None
    assert isinstance(created, DbBase)
73
74
def test_db_connect(db_base):
    """``db_connect`` on the base class raises DbException ("not implemented")."""
    expected_prefix = exception_message("Method 'db_connect' not implemented")
    with pytest.raises(DbException) as caught:
        db_base.db_connect(None)
    raised_text = str(caught.value)
    assert raised_text.startswith(expected_prefix)
81
82
def test_db_disconnect(db_base):
    """Smoke test: passes as long as ``db_disconnect()`` does not raise."""
    db_base.db_disconnect()
85
86
def test_get_list(db_base):
    """``get_list`` on the base class raises DbException with NOT_FOUND."""
    expected_prefix = exception_message("Method 'get_list' not implemented")
    with pytest.raises(DbException) as caught:
        db_base.get_list(None, None)
    error = caught.value
    assert str(error).startswith(expected_prefix)
    assert error.http_code == http.HTTPStatus.NOT_FOUND
94
95
def test_get_one(db_base):
    """``get_one`` on the base class raises DbException with NOT_FOUND."""
    expected_prefix = exception_message("Method 'get_one' not implemented")
    with pytest.raises(DbException) as caught:
        db_base.get_one(None, None, None, None)
    error = caught.value
    assert str(error).startswith(expected_prefix)
    assert error.http_code == http.HTTPStatus.NOT_FOUND
103
104
def test_create(db_base):
    """``create`` on the base class raises DbException with NOT_FOUND."""
    expected_prefix = exception_message("Method 'create' not implemented")
    with pytest.raises(DbException) as caught:
        db_base.create(None, None)
    error = caught.value
    assert str(error).startswith(expected_prefix)
    assert error.http_code == http.HTTPStatus.NOT_FOUND
112
113
def test_create_list(db_base):
    """``create_list`` on the base class raises DbException with NOT_FOUND."""
    expected_prefix = exception_message("Method 'create_list' not implemented")
    with pytest.raises(DbException) as caught:
        db_base.create_list(None, None)
    error = caught.value
    assert str(error).startswith(expected_prefix)
    assert error.http_code == http.HTTPStatus.NOT_FOUND
121
122
def test_del_list(db_base):
    """``del_list`` on the base class raises DbException with NOT_FOUND."""
    expected_prefix = exception_message("Method 'del_list' not implemented")
    with pytest.raises(DbException) as caught:
        db_base.del_list(None, None)
    error = caught.value
    assert str(error).startswith(expected_prefix)
    assert error.http_code == http.HTTPStatus.NOT_FOUND
130
131
def test_del_one(db_base):
    """``del_one`` on the base class raises DbException with NOT_FOUND."""
    expected_prefix = exception_message("Method 'del_one' not implemented")
    with pytest.raises(DbException) as caught:
        db_base.del_one(None, None, None)
    error = caught.value
    assert str(error).startswith(expected_prefix)
    assert error.http_code == http.HTTPStatus.NOT_FOUND
139
140
class TestEncryption(unittest.TestCase):
    """Round-trip tests for ``DbBase.encrypt`` / ``DbBase.decrypt`` using real keys."""

    def setUp(self):
        """Create three ``DbBase`` instances, each configured with different keys."""
        master_key = "Setting a long master key with numbers 123 and capitals AGHBNHD and symbols %&8)!'"
        db_base1 = DbBase()
        db_base2 = DbBase()
        db_base3 = DbBase()
        # set self.secret_key obtained when connect
        db_base1.set_secret_key(master_key, replace=True)
        db_base1.set_secret_key(urandom(32))
        db_base2.set_secret_key(None, replace=True)
        db_base2.set_secret_key(urandom(30))
        db_base3.set_secret_key(master_key)
        self.db_bases = [db_base1, db_base2, db_base3]

    def test_encrypt_decrypt(self):
        """Schema 1.0 leaves values untouched; schema 1.1 encrypts and round-trips."""
        TEST = (
            ("plain text 1 ! ", None),
            ("plain text 2 with salt ! ", "1afd5d1a-4a7e-4d9c-8c65-251290183106"),
        )
        for db_base in self.db_bases:
            for value, salt in TEST:
                # no encryption
                encrypted = db_base.encrypt(value, schema_version="1.0", salt=salt)
                self.assertEqual(
                    encrypted, value, "value '{}' has been encrypted".format(value)
                )
                decrypted = db_base.decrypt(encrypted, schema_version="1.0", salt=salt)
                self.assertEqual(
                    decrypted, value, "value '{}' has been decrypted".format(value)
                )

                # encrypt/decrypt
                encrypted = db_base.encrypt(value, schema_version="1.1", salt=salt)
                self.assertNotEqual(
                    encrypted, value, "value '{}' has not been encrypted".format(value)
                )
                self.assertIsInstance(encrypted, str, "Encrypted is not ascii text")
                decrypted = db_base.decrypt(encrypted, schema_version="1.1", salt=salt)
                self.assertEqual(
                    decrypted, value, "value is not equal after encryption/decryption"
                )

    def test_encrypt_decrypt_salt(self):
        """Every key/salt pair yields a distinct ciphertext; a wrong key cannot decrypt."""
        value = "value to be encrypted!"
        encrypted = []
        for db_base in self.db_bases:
            for salt in (None, "salt 1", "1afd5d1a-4a7e-4d9c-8c65-251290183106"):
                # encrypt/decrypt
                encrypted.append(
                    db_base.encrypt(value, schema_version="1.1", salt=salt)
                )
                self.assertNotEqual(
                    encrypted[-1],
                    value,
                    "value '{}' has not been encrypted".format(value),
                )
                self.assertIsInstance(encrypted[-1], str, "Encrypted is not ascii text")
                decrypted = db_base.decrypt(
                    encrypted[-1], schema_version="1.1", salt=salt
                )
                self.assertEqual(
                    decrypted, value, "value is not equal after encryption/decryption"
                )
        # Compare every unordered pair of ciphertexts exactly once.
        for index, first in enumerate(encrypted):
            for second in encrypted[index + 1 :]:
                self.assertNotEqual(
                    first,
                    second,
                    "encryption with different salt must contain different result",
                )
        # decrypt with a different master key
        try:
            decrypted = self.db_bases[-1].decrypt(
                encrypted[0], schema_version="1.1", salt=None
            )
        except DbException as e:
            self.assertEqual(
                e.http_code,
                HTTPStatus.INTERNAL_SERVER_ERROR,
                "Decryption with different KEY does not provide expected http_code",
            )
        else:
            # Compare against the plaintext, not the ciphertext: the check must
            # fail if a wrong key ever recovers the original value.
            self.assertNotEqual(
                value,
                decrypted,
                "Decryption with different KEY must generate different result",
            )
227
228
class AsyncMock(MagicMock):
    """MagicMock whose call is a coroutine and which records deep copies of its arguments."""

    async def __call__(self, *args, **kwargs):
        """Copy the arguments, then delegate to ``MagicMock.__call__`` and return its result."""
        copied_args = deepcopy(args)
        copied_kwargs = deepcopy(kwargs)
        return super().__call__(*copied_args, **copied_kwargs)
234
235
class CopyingMock(MagicMock):
    """MagicMock that records deep copies of the arguments it is called with."""

    def __call__(self, *args, **kwargs):
        """Copy the arguments, then delegate to ``MagicMock.__call__`` and return its result."""
        copied_args = deepcopy(args)
        copied_kwargs = deepcopy(kwargs)
        return super().__call__(*copied_args, **copied_kwargs)
241
242
def check_if_assert_not_called(mocks: list):
    """Call ``assert_not_called()`` on every mock in ``mocks``, in order."""
    for candidate in mocks:
        candidate.assert_not_called()
246
247
248 1 class TestBaseEncryption(unittest.TestCase):
249 1     @patch("logging.getLogger", autospec=True)
250 1     def setUp(self, mock_logger):
251 1         mock_logger = logging.getLogger()
252 1         mock_logger.disabled = True
253 1         self.db_base = DbBase()
254 1         self.mock_cipher = CopyingMock()
255 1         self.db_base.encoding_type = encoding_type
256 1         self.db_base.encrypt_mode = encyrpt_mode
257 1         self.db_base.secret_key = secret_key
258 1         self.mock_padded_msg = CopyingMock()
259
260 1     def test_pad_data_len_not_multiplication_of_16(self):
261 1         data = "hello word hello hello word hello word"
262 1         data_len = len(data)
263 1         expected_len = 48
264 1         padded = self.db_base.pad_data(data)
265 1         self.assertEqual(len(padded), expected_len)
266 1         self.assertTrue("\0" * (expected_len - data_len) in padded)
267
268 1     def test_pad_data_len_multiplication_of_16(self):
269 1         data = "hello word!!!!!!"
270 1         padded = self.db_base.pad_data(data)
271 1         self.assertEqual(padded, data)
272 1         self.assertFalse("\0" in padded)
273
274 1     def test_pad_data_empty_string(self):
275 1         data = ""
276 1         expected_len = 0
277 1         padded = self.db_base.pad_data(data)
278 1         self.assertEqual(len(padded), expected_len)
279 1         self.assertFalse("\0" in padded)
280
281 1     def test_pad_data_not_string(self):
282 1         data = None
283 1         with self.assertRaises(Exception) as err:
284 1             self.db_base.pad_data(data)
285 1         self.assertEqual(
286             str(err.exception),
287             "database exception Incorrect data type: type(None), string is expected.",
288         )
289
290 1     def test_unpad_data_null_char_at_right(self):
291 1         null_padded_data = "hell0word\0\0"
292 1         expected_length = len(null_padded_data) - 2
293 1         unpadded = self.db_base.unpad_data(null_padded_data)
294 1         self.assertEqual(len(unpadded), expected_length)
295 1         self.assertFalse("\0" in unpadded)
296 1         self.assertTrue("0" in unpadded)
297
298 1     def test_unpad_data_null_char_is_not_rightest(self):
299 1         null_padded_data = "hell0word\r\t\0\n"
300 1         expected_length = len(null_padded_data)
301 1         unpadded = self.db_base.unpad_data(null_padded_data)
302 1         self.assertEqual(len(unpadded), expected_length)
303 1         self.assertTrue("\0" in unpadded)
304
305 1     def test_unpad_data_with_spaces_at_right(self):
306 1         null_padded_data = "  hell0word\0  "
307 1         expected_length = len(null_padded_data)
308 1         unpadded = self.db_base.unpad_data(null_padded_data)
309 1         self.assertEqual(len(unpadded), expected_length)
310 1         self.assertTrue("\0" in unpadded)
311
312 1     def test_unpad_data_empty_string(self):
313 1         data = ""
314 1         unpadded = self.db_base.unpad_data(data)
315 1         self.assertEqual(unpadded, "")
316 1         self.assertFalse("\0" in unpadded)
317
318 1     def test_unpad_data_not_string(self):
319 1         data = None
320 1         with self.assertRaises(Exception) as err:
321 1             self.db_base.unpad_data(data)
322 1         self.assertEqual(
323             str(err.exception),
324             "database exception Incorrect data type: type(None), string is expected.",
325         )
326
327 1     @patch.object(DbBase, "_join_secret_key")
328 1     @patch.object(DbBase, "pad_data")
329 1     def test__encrypt_value_schema_version_1_0_none_secret_key_none_salt(
330         self, mock_pad_data, mock_join_secret_key
331     ):
332         """schema_version 1.0, secret_key is None and salt is None."""
333 1         schema_version = "1.0"
334 1         salt = None
335 1         self.db_base.secret_key = None
336 1         result = self.db_base._encrypt_value(value, schema_version, salt)
337 1         self.assertEqual(result, value)
338 1         check_if_assert_not_called([mock_pad_data, mock_join_secret_key])
339
340 1     @patch("osm_common.dbbase.b64encode")
341 1     @patch("osm_common.dbbase.AES")
342 1     @patch.object(DbBase, "_join_secret_key")
343 1     @patch.object(DbBase, "pad_data")
344 1     def test__encrypt_value_schema_version_1_1_with_secret_key_exists_with_salt(
345         self,
346         mock_pad_data,
347         mock_join_secret_key,
348         mock_aes,
349         mock_b64_encode,
350     ):
351         """schema_version 1.1, secret_key exists, salt exists."""
352 1         mock_aes.new.return_value = self.mock_cipher
353 1         self.mock_cipher.encrypt.return_value = data_to_b4_encode
354 1         self.mock_padded_msg.return_value = padded_value
355 1         mock_pad_data.return_value = self.mock_padded_msg
356 1         self.mock_padded_msg.encode.return_value = padded_encoded_value
357
358 1         mock_b64_encode.return_value = encyrpted_bytes
359
360 1         result = self.db_base._encrypt_value(value, schema_version, salt)
361
362 1         self.assertTrue(isinstance(result, str))
363 1         self.assertEqual(result, encyrpted_value)
364 1         mock_join_secret_key.assert_called_once_with(salt)
365 1         _call_mock_aes_new = mock_aes.new.call_args_list[0].args
366 1         self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
367 1         mock_pad_data.assert_called_once_with(value)
368 1         mock_b64_encode.assert_called_once_with(data_to_b4_encode)
369 1         self.mock_cipher.encrypt.assert_called_once_with(padded_encoded_value)
370 1         self.mock_padded_msg.encode.assert_called_with(encoding_type)
371
372 1     @patch.object(DbBase, "_join_secret_key")
373 1     @patch.object(DbBase, "pad_data")
374 1     def test__encrypt_value_schema_version_1_0_secret_key_not_exists(
375         self, mock_pad_data, mock_join_secret_key
376     ):
377         """schema_version 1.0, secret_key is None, salt exists."""
378 1         schema_version = "1.0"
379 1         self.db_base.secret_key = None
380 1         result = self.db_base._encrypt_value(value, schema_version, salt)
381 1         self.assertEqual(result, value)
382 1         check_if_assert_not_called([mock_pad_data, mock_join_secret_key])
383
384 1     @patch.object(DbBase, "_join_secret_key")
385 1     @patch.object(DbBase, "pad_data")
386 1     def test__encrypt_value_schema_version_1_1_secret_key_not_exists(
387         self, mock_pad_data, mock_join_secret_key
388     ):
389         """schema_version 1.1, secret_key is None, salt exists."""
390 1         self.db_base.secret_key = None
391 1         result = self.db_base._encrypt_value(value, schema_version, salt)
392 1         self.assertEqual(result, value)
393 1         check_if_assert_not_called([mock_pad_data, mock_join_secret_key])
394
395 1     @patch("osm_common.dbbase.b64encode")
396 1     @patch("osm_common.dbbase.AES")
397 1     @patch.object(DbBase, "_join_secret_key")
398 1     @patch.object(DbBase, "pad_data")
399 1     def test__encrypt_value_schema_version_1_1_secret_key_exists_without_salt(
400         self,
401         mock_pad_data,
402         mock_join_secret_key,
403         mock_aes,
404         mock_b64_encode,
405     ):
406         """schema_version 1.1, secret_key exists, salt is None."""
407 1         salt = None
408 1         mock_aes.new.return_value = self.mock_cipher
409 1         self.mock_cipher.encrypt.return_value = data_to_b4_encode
410
411 1         self.mock_padded_msg.return_value = padded_value
412 1         mock_pad_data.return_value = self.mock_padded_msg
413 1         self.mock_padded_msg.encode.return_value = padded_encoded_value
414
415 1         mock_b64_encode.return_value = encyrpted_bytes
416
417 1         result = self.db_base._encrypt_value(value, schema_version, salt)
418
419 1         self.assertEqual(result, encyrpted_value)
420 1         mock_join_secret_key.assert_called_once_with(salt)
421 1         _call_mock_aes_new = mock_aes.new.call_args_list[0].args
422 1         self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
423 1         mock_pad_data.assert_called_once_with(value)
424 1         mock_b64_encode.assert_called_once_with(data_to_b4_encode)
425 1         self.mock_cipher.encrypt.assert_called_once_with(padded_encoded_value)
426 1         self.mock_padded_msg.encode.assert_called_with(encoding_type)
427
428 1     @patch("osm_common.dbbase.b64encode")
429 1     @patch("osm_common.dbbase.AES")
430 1     @patch.object(DbBase, "_join_secret_key")
431 1     @patch.object(DbBase, "pad_data")
432 1     def test__encrypt_value_invalid_encrpt_mode(
433         self,
434         mock_pad_data,
435         mock_join_secret_key,
436         mock_aes,
437         mock_b64_encode,
438     ):
439         """encrypt_mode is invalid."""
440 1         mock_aes.new.side_effect = Exception("Invalid ciphering mode.")
441 1         self.db_base.encrypt_mode = "AES.MODE_XXX"
442
443 1         with self.assertRaises(Exception) as err:
444 1             self.db_base._encrypt_value(value, schema_version, salt)
445
446 1         self.assertEqual(str(err.exception), "Invalid ciphering mode.")
447 1         mock_join_secret_key.assert_called_once_with(salt)
448 1         _call_mock_aes_new = mock_aes.new.call_args_list[0].args
449 1         self.assertEqual(_call_mock_aes_new[1], "AES.MODE_XXX")
450 1         check_if_assert_not_called([mock_pad_data, mock_b64_encode])
451
452 1     @patch("osm_common.dbbase.b64encode")
453 1     @patch("osm_common.dbbase.AES")
454 1     @patch.object(DbBase, "_join_secret_key")
455 1     @patch.object(DbBase, "pad_data")
456 1     def test__encrypt_value_schema_version_1_1_secret_key_exists_value_none(
457         self,
458         mock_pad_data,
459         mock_join_secret_key,
460         mock_aes,
461         mock_b64_encode,
462     ):
463         """schema_version 1.1, secret_key exists, value is None."""
464 1         value = None
465 1         mock_aes.new.return_value = self.mock_cipher
466 1         mock_pad_data.side_effect = DbException(
467             "Incorrect data type: type(None), string is expected."
468         )
469
470 1         with self.assertRaises(Exception) as err:
471 1             self.db_base._encrypt_value(value, schema_version, salt)
472 1         self.assertEqual(
473             str(err.exception),
474             "database exception Incorrect data type: type(None), string is expected.",
475         )
476
477 1         mock_join_secret_key.assert_called_once_with(salt)
478 1         _call_mock_aes_new = mock_aes.new.call_args_list[0].args
479 1         self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
480 1         mock_pad_data.assert_called_once_with(value)
481 1         check_if_assert_not_called(
482             [mock_b64_encode, self.mock_cipher.encrypt, mock_b64_encode]
483         )
484
485 1     @patch("osm_common.dbbase.b64encode")
486 1     @patch("osm_common.dbbase.AES")
487 1     @patch.object(DbBase, "_join_secret_key")
488 1     @patch.object(DbBase, "pad_data")
489 1     def test__encrypt_value_join_secret_key_raises(
490         self,
491         mock_pad_data,
492         mock_join_secret_key,
493         mock_aes,
494         mock_b64_encode,
495     ):
496         """Method join_secret_key raises DbException."""
497 1         salt = b"3434o34-3wewrwr-222424-2242dwew"
498
499 1         mock_join_secret_key.side_effect = DbException("Unexpected type")
500
501 1         mock_aes.new.return_value = self.mock_cipher
502
503 1         with self.assertRaises(Exception) as err:
504 1             self.db_base._encrypt_value(value, schema_version, salt)
505
506 1         self.assertEqual(str(err.exception), "database exception Unexpected type")
507 1         check_if_assert_not_called(
508             [mock_pad_data, mock_aes.new, mock_b64_encode, self.mock_cipher.encrypt]
509         )
510 1         mock_join_secret_key.assert_called_once_with(salt)
511
512 1     @patch("osm_common.dbbase.b64encode")
513 1     @patch("osm_common.dbbase.AES")
514 1     @patch.object(DbBase, "_join_secret_key")
515 1     @patch.object(DbBase, "pad_data")
516 1     def test__encrypt_value_schema_version_1_1_secret_key_exists_b64_encode_raises(
517         self,
518         mock_pad_data,
519         mock_join_secret_key,
520         mock_aes,
521         mock_b64_encode,
522     ):
523         """schema_version 1.1, secret_key exists, b64encode raises TypeError."""
524 1         mock_aes.new.return_value = self.mock_cipher
525 1         self.mock_cipher.encrypt.return_value = "encrypted data"
526
527 1         self.mock_padded_msg.return_value = padded_value
528 1         mock_pad_data.return_value = self.mock_padded_msg
529 1         self.mock_padded_msg.encode.return_value = padded_encoded_value
530
531 1         mock_b64_encode.side_effect = TypeError(
532             "A bytes-like object is required, not 'str'"
533         )
534
535 1         with self.assertRaises(Exception) as error:
536 1             self.db_base._encrypt_value(value, schema_version, salt)
537 1         self.assertEqual(
538             str(error.exception), "A bytes-like object is required, not 'str'"
539         )
540 1         mock_join_secret_key.assert_called_once_with(salt)
541 1         _call_mock_aes_new = mock_aes.new.call_args_list[0].args
542 1         self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
543 1         mock_pad_data.assert_called_once_with(value)
544 1         self.mock_cipher.encrypt.assert_called_once_with(padded_encoded_value)
545 1         self.mock_padded_msg.encode.assert_called_with(encoding_type)
546 1         mock_b64_encode.assert_called_once_with("encrypted data")
547
548 1     @patch("osm_common.dbbase.b64encode")
549 1     @patch("osm_common.dbbase.AES")
550 1     @patch.object(DbBase, "_join_secret_key")
551 1     @patch.object(DbBase, "pad_data")
552 1     def test__encrypt_value_cipher_encrypt_raises(
553         self,
554         mock_pad_data,
555         mock_join_secret_key,
556         mock_aes,
557         mock_b64_encode,
558     ):
559         """AES encrypt method raises Exception."""
560 1         mock_aes.new.return_value = self.mock_cipher
561 1         self.mock_cipher.encrypt.side_effect = Exception("Invalid data type.")
562
563 1         self.mock_padded_msg.return_value = padded_value
564 1         mock_pad_data.return_value = self.mock_padded_msg
565 1         self.mock_padded_msg.encode.return_value = padded_encoded_value
566
567 1         with self.assertRaises(Exception) as error:
568 1             self.db_base._encrypt_value(value, schema_version, salt)
569
570 1         self.assertEqual(str(error.exception), "Invalid data type.")
571 1         mock_join_secret_key.assert_called_once_with(salt)
572 1         _call_mock_aes_new = mock_aes.new.call_args_list[0].args
573 1         self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
574 1         mock_pad_data.assert_called_once_with(value)
575 1         self.mock_cipher.encrypt.assert_called_once_with(padded_encoded_value)
576 1         self.mock_padded_msg.encode.assert_called_with(encoding_type)
577 1         mock_b64_encode.assert_not_called()
578
579 1     @patch.object(DbBase, "get_secret_key")
580 1     @patch.object(DbBase, "_encrypt_value")
581 1     def test_encrypt_without_schema_version_without_salt(
582         self, mock_encrypt_value, mock_get_secret_key
583     ):
584         """schema and salt is None."""
585 1         mock_encrypt_value.return_value = encyrpted_value
586 1         result = self.db_base.encrypt(value)
587 1         mock_encrypt_value.assert_called_once_with(value, None, None)
588 1         mock_get_secret_key.assert_called_once()
589 1         self.assertEqual(result, encyrpted_value)
590
591 1     @patch.object(DbBase, "get_secret_key")
592 1     @patch.object(DbBase, "_encrypt_value")
593 1     def test_encrypt_with_schema_version_with_salt(
594         self, mock_encrypt_value, mock_get_secret_key
595     ):
596         """schema version exists, salt is None."""
597 1         mock_encrypt_value.return_value = encyrpted_value
598 1         result = self.db_base.encrypt(value, schema_version, salt)
599 1         mock_encrypt_value.assert_called_once_with(value, schema_version, salt)
600 1         mock_get_secret_key.assert_called_once()
601 1         self.assertEqual(result, encyrpted_value)
602
603 1     @patch.object(DbBase, "get_secret_key")
604 1     @patch.object(DbBase, "_encrypt_value")
605 1     def test_encrypt_get_secret_key_raises(
606         self, mock_encrypt_value, mock_get_secret_key
607     ):
608         """get_secret_key method raises DbException."""
609 1         mock_get_secret_key.side_effect = DbException("KeyError")
610 1         with self.assertRaises(Exception) as error:
611 1             self.db_base.encrypt(value)
612 1         self.assertEqual(str(error.exception), "database exception KeyError")
613 1         mock_encrypt_value.assert_not_called()
614 1         mock_get_secret_key.assert_called_once()
615
616 1     @patch.object(DbBase, "get_secret_key")
617 1     @patch.object(DbBase, "_encrypt_value")
618 1     def test_encrypt_encrypt_raises(self, mock_encrypt_value, mock_get_secret_key):
619         """_encrypt method raises DbException."""
620 1         mock_encrypt_value.side_effect = DbException(
621             "Incorrect data type: type(None), string is expected."
622         )
623 1         with self.assertRaises(Exception) as error:
624 1             self.db_base.encrypt(value, schema_version, salt)
625 1         self.assertEqual(
626             str(error.exception),
627             "database exception Incorrect data type: type(None), string is expected.",
628         )
629 1         mock_encrypt_value.assert_called_once_with(value, schema_version, salt)
630 1         mock_get_secret_key.assert_called_once()
631
632 1     @patch("osm_common.dbbase.b64decode")
633 1     @patch("osm_common.dbbase.AES")
634 1     @patch.object(DbBase, "_join_secret_key")
635 1     @patch.object(DbBase, "unpad_data")
636 1     def test__decrypt_value_schema_version_1_1_secret_key_exists_without_salt(
637         self,
638         mock_unpad_data,
639         mock_join_secret_key,
640         mock_aes,
641         mock_b64_decode,
642     ):
643         """schema_version 1.1, secret_key exists, salt is None."""
644 1         salt = None
645 1         mock_aes.new.return_value = self.mock_cipher
646 1         self.mock_cipher.decrypt.return_value = padded_encoded_value
647
648 1         mock_b64_decode.return_value = b64_decoded
649
650 1         mock_unpad_data.return_value = value
651
652 1         result = self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
653 1         self.assertEqual(result, value)
654
655 1         mock_join_secret_key.assert_called_once_with(salt)
656 1         _call_mock_aes_new = mock_aes.new.call_args_list[0].args
657 1         self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
658 1         mock_unpad_data.assert_called_once_with("private key txt\0")
659 1         mock_b64_decode.assert_called_once_with(encyrpted_value)
660 1         self.mock_cipher.decrypt.assert_called_once_with(b64_decoded)
661
662 1     @patch("osm_common.dbbase.b64decode")
663 1     @patch("osm_common.dbbase.AES")
664 1     @patch.object(DbBase, "_join_secret_key")
665 1     @patch.object(DbBase, "unpad_data")
666 1     def test__decrypt_value_schema_version_1_1_secret_key_exists_with_salt(
667         self,
668         mock_unpad_data,
669         mock_join_secret_key,
670         mock_aes,
671         mock_b64_decode,
672     ):
673         """schema_version 1.1, secret_key exists, salt is None."""
674 1         mock_aes.new.return_value = self.mock_cipher
675 1         self.mock_cipher.decrypt.return_value = padded_encoded_value
676
677 1         mock_b64_decode.return_value = b64_decoded
678
679 1         mock_unpad_data.return_value = value
680
681 1         result = self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
682 1         self.assertEqual(result, value)
683
684 1         mock_join_secret_key.assert_called_once_with(salt)
685 1         _call_mock_aes_new = mock_aes.new.call_args_list[0].args
686 1         self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
687 1         mock_unpad_data.assert_called_once_with("private key txt\0")
688 1         mock_b64_decode.assert_called_once_with(encyrpted_value)
689 1         self.mock_cipher.decrypt.assert_called_once_with(b64_decoded)
690
691 1     @patch("osm_common.dbbase.b64decode")
692 1     @patch("osm_common.dbbase.AES")
693 1     @patch.object(DbBase, "_join_secret_key")
694 1     @patch.object(DbBase, "unpad_data")
695 1     def test__decrypt_value_schema_version_1_1_without_secret_key(
696         self,
697         mock_unpad_data,
698         mock_join_secret_key,
699         mock_aes,
700         mock_b64_decode,
701     ):
702         """schema_version 1.1, secret_key is None, salt exists."""
703 1         self.db_base.secret_key = None
704
705 1         result = self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
706
707 1         self.assertEqual(result, encyrpted_value)
708 1         check_if_assert_not_called(
709             [
710                 mock_join_secret_key,
711                 mock_aes.new,
712                 mock_unpad_data,
713                 mock_b64_decode,
714                 self.mock_cipher.decrypt,
715             ]
716         )
717
718 1     @patch("osm_common.dbbase.b64decode")
719 1     @patch("osm_common.dbbase.AES")
720 1     @patch.object(DbBase, "_join_secret_key")
721 1     @patch.object(DbBase, "unpad_data")
722 1     def test__decrypt_value_schema_version_1_0_with_secret_key(
723         self,
724         mock_unpad_data,
725         mock_join_secret_key,
726         mock_aes,
727         mock_b64_decode,
728     ):
729         """schema_version 1.0, secret_key exists, salt exists."""
730 1         schema_version = "1.0"
731 1         result = self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
732
733 1         self.assertEqual(result, encyrpted_value)
734 1         check_if_assert_not_called(
735             [
736                 mock_join_secret_key,
737                 mock_aes.new,
738                 mock_unpad_data,
739                 mock_b64_decode,
740                 self.mock_cipher.decrypt,
741             ]
742         )
743
744 1     @patch("osm_common.dbbase.b64decode")
745 1     @patch("osm_common.dbbase.AES")
746 1     @patch.object(DbBase, "_join_secret_key")
747 1     @patch.object(DbBase, "unpad_data")
748 1     def test__decrypt_value_join_secret_key_raises(
749         self,
750         mock_unpad_data,
751         mock_join_secret_key,
752         mock_aes,
753         mock_b64_decode,
754     ):
755         """_join_secret_key raises TypeError."""
756 1         salt = object()
757 1         mock_join_secret_key.side_effect = TypeError("'type' object is not iterable")
758
759 1         with self.assertRaises(Exception) as error:
760 1             self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
761 1         self.assertEqual(str(error.exception), "'type' object is not iterable")
762
763 1         mock_join_secret_key.assert_called_once_with(salt)
764 1         check_if_assert_not_called(
765             [mock_aes.new, mock_unpad_data, mock_b64_decode, self.mock_cipher.decrypt]
766         )
767
768 1     @patch("osm_common.dbbase.b64decode")
769 1     @patch("osm_common.dbbase.AES")
770 1     @patch.object(DbBase, "_join_secret_key")
771 1     @patch.object(DbBase, "unpad_data")
772 1     def test__decrypt_value_b64decode_raises(
773         self,
774         mock_unpad_data,
775         mock_join_secret_key,
776         mock_aes,
777         mock_b64_decode,
778     ):
779         """b64decode raises TypeError."""
780 1         mock_b64_decode.side_effect = TypeError(
781             "A str-like object is required, not 'bytes'"
782         )
783 1         with self.assertRaises(Exception) as error:
784 1             self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
785 1         self.assertEqual(
786             str(error.exception), "A str-like object is required, not 'bytes'"
787         )
788
789 1         mock_b64_decode.assert_called_once_with(encyrpted_value)
790 1         mock_join_secret_key.assert_called_once_with(salt)
791 1         check_if_assert_not_called(
792             [mock_aes.new, self.mock_cipher.decrypt, mock_unpad_data]
793         )
794
795 1     @patch("osm_common.dbbase.b64decode")
796 1     @patch("osm_common.dbbase.AES")
797 1     @patch.object(DbBase, "_join_secret_key")
798 1     @patch.object(DbBase, "unpad_data")
799 1     def test__decrypt_value_invalid_encrypt_mode(
800         self,
801         mock_unpad_data,
802         mock_join_secret_key,
803         mock_aes,
804         mock_b64_decode,
805     ):
806         """Invalid AES encrypt mode."""
807 1         mock_aes.new.side_effect = Exception("Invalid ciphering mode.")
808 1         self.db_base.encrypt_mode = "AES.MODE_XXX"
809
810 1         mock_b64_decode.return_value = b64_decoded
811 1         with self.assertRaises(Exception) as error:
812 1             self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
813
814 1         self.assertEqual(str(error.exception), "Invalid ciphering mode.")
815 1         mock_join_secret_key.assert_called_once_with(salt)
816 1         _call_mock_aes_new = mock_aes.new.call_args_list[0].args
817 1         self.assertEqual(_call_mock_aes_new[1], "AES.MODE_XXX")
818 1         mock_b64_decode.assert_called_once_with(encyrpted_value)
819 1         check_if_assert_not_called([mock_unpad_data, self.mock_cipher.decrypt])
820
821 1     @patch("osm_common.dbbase.b64decode")
822 1     @patch("osm_common.dbbase.AES")
823 1     @patch.object(DbBase, "_join_secret_key")
824 1     @patch.object(DbBase, "unpad_data")
825 1     def test__decrypt_value_cipher_decrypt_raises(
826         self,
827         mock_unpad_data,
828         mock_join_secret_key,
829         mock_aes,
830         mock_b64_decode,
831     ):
832         """AES decrypt raises Exception."""
833 1         mock_b64_decode.return_value = b64_decoded
834
835 1         mock_aes.new.return_value = self.mock_cipher
836 1         self.mock_cipher.decrypt.side_effect = Exception("Invalid data type.")
837
838 1         with self.assertRaises(Exception) as error:
839 1             self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
840 1         self.assertEqual(str(error.exception), "Invalid data type.")
841
842 1         mock_join_secret_key.assert_called_once_with(salt)
843 1         _call_mock_aes_new = mock_aes.new.call_args_list[0].args
844 1         self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
845 1         mock_b64_decode.assert_called_once_with(encyrpted_value)
846 1         self.mock_cipher.decrypt.assert_called_once_with(b64_decoded)
847 1         mock_unpad_data.assert_not_called()
848
849 1     @patch("osm_common.dbbase.b64decode")
850 1     @patch("osm_common.dbbase.AES")
851 1     @patch.object(DbBase, "_join_secret_key")
852 1     @patch.object(DbBase, "unpad_data")
853 1     def test__decrypt_value_decode_raises(
854         self,
855         mock_unpad_data,
856         mock_join_secret_key,
857         mock_aes,
858         mock_b64_decode,
859     ):
860         """Decode raises UnicodeDecodeError."""
861 1         mock_aes.new.return_value = self.mock_cipher
862 1         self.mock_cipher.decrypt.return_value = b"\xd0\x000091"
863
864 1         mock_b64_decode.return_value = b64_decoded
865
866 1         mock_unpad_data.return_value = value
867 1         with self.assertRaises(Exception) as error:
868 1             self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
869 1         self.assertEqual(
870             str(error.exception),
871             "database exception Cannot decrypt information. Are you using same COMMONKEY in all OSM components?",
872         )
873 1         self.assertEqual(type(error.exception), DbException)
874 1         mock_join_secret_key.assert_called_once_with(salt)
875 1         _call_mock_aes_new = mock_aes.new.call_args_list[0].args
876 1         self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
877 1         mock_b64_decode.assert_called_once_with(encyrpted_value)
878 1         self.mock_cipher.decrypt.assert_called_once_with(b64_decoded)
879 1         mock_unpad_data.assert_not_called()
880
881 1     @patch("osm_common.dbbase.b64decode")
882 1     @patch("osm_common.dbbase.AES")
883 1     @patch.object(DbBase, "_join_secret_key")
884 1     @patch.object(DbBase, "unpad_data")
885 1     def test__decrypt_value_unpad_data_raises(
886         self,
887         mock_unpad_data,
888         mock_join_secret_key,
889         mock_aes,
890         mock_b64_decode,
891     ):
892         """Method unpad_data raises error."""
893 1         mock_decrypted_message = MagicMock()
894 1         mock_decrypted_message.decode.return_value = None
895 1         mock_aes.new.return_value = self.mock_cipher
896 1         self.mock_cipher.decrypt.return_value = mock_decrypted_message
897 1         mock_unpad_data.side_effect = DbException(
898             "Incorrect data type: type(None), string is expected."
899         )
900 1         mock_b64_decode.return_value = b64_decoded
901
902 1         with self.assertRaises(Exception) as error:
903 1             self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
904 1         self.assertEqual(
905             str(error.exception),
906             "database exception Incorrect data type: type(None), string is expected.",
907         )
908 1         self.assertEqual(type(error.exception), DbException)
909 1         mock_join_secret_key.assert_called_once_with(salt)
910 1         _call_mock_aes_new = mock_aes.new.call_args_list[0].args
911 1         self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
912 1         mock_b64_decode.assert_called_once_with(encyrpted_value)
913 1         self.mock_cipher.decrypt.assert_called_once_with(b64_decoded)
914 1         mock_decrypted_message.decode.assert_called_once_with(
915             self.db_base.encoding_type
916         )
917 1         mock_unpad_data.assert_called_once_with(None)
918
919 1     @patch.object(DbBase, "get_secret_key")
920 1     @patch.object(DbBase, "_decrypt_value")
921 1     def test_decrypt_without_schema_version_without_salt(
922         self, mock_decrypt_value, mock_get_secret_key
923     ):
924         """schema_version is None, salt is None."""
925 1         mock_decrypt_value.return_value = encyrpted_value
926 1         result = self.db_base.decrypt(value)
927 1         mock_decrypt_value.assert_called_once_with(value, None, None)
928 1         mock_get_secret_key.assert_called_once()
929 1         self.assertEqual(result, encyrpted_value)
930
931 1     @patch.object(DbBase, "get_secret_key")
932 1     @patch.object(DbBase, "_decrypt_value")
933 1     def test_decrypt_with_schema_version_with_salt(
934         self, mock_decrypt_value, mock_get_secret_key
935     ):
936         """schema_version and salt exist."""
937 1         mock_decrypt_value.return_value = encyrpted_value
938 1         result = self.db_base.decrypt(value, schema_version, salt)
939 1         mock_decrypt_value.assert_called_once_with(value, schema_version, salt)
940 1         mock_get_secret_key.assert_called_once()
941 1         self.assertEqual(result, encyrpted_value)
942
943 1     @patch.object(DbBase, "get_secret_key")
944 1     @patch.object(DbBase, "_decrypt_value")
945 1     def test_decrypt_get_secret_key_raises(
946         self, mock_decrypt_value, mock_get_secret_key
947     ):
948         """Method get_secret_key raises KeyError."""
949 1         mock_get_secret_key.side_effect = DbException("KeyError")
950 1         with self.assertRaises(Exception) as error:
951 1             self.db_base.decrypt(value)
952 1         self.assertEqual(str(error.exception), "database exception KeyError")
953 1         mock_decrypt_value.assert_not_called()
954 1         mock_get_secret_key.assert_called_once()
955
956 1     @patch.object(DbBase, "get_secret_key")
957 1     @patch.object(DbBase, "_decrypt_value")
958 1     def test_decrypt_decrypt_value_raises(
959         self, mock_decrypt_value, mock_get_secret_key
960     ):
961         """Method _decrypt raises error."""
962 1         mock_decrypt_value.side_effect = DbException(
963             "Incorrect data type: type(None), string is expected."
964         )
965 1         with self.assertRaises(Exception) as error:
966 1             self.db_base.decrypt(value, schema_version, salt)
967 1         self.assertEqual(
968             str(error.exception),
969             "database exception Incorrect data type: type(None), string is expected.",
970         )
971 1         mock_decrypt_value.assert_called_once_with(value, schema_version, salt)
972 1         mock_get_secret_key.assert_called_once()
973
974 1     def test_encrypt_decrypt_with_schema_version_1_1_with_salt(self):
975         """Encrypt and decrypt with schema version 1.1, salt exists."""
976 1         encrypted_msg = self.db_base.encrypt(value, schema_version, salt)
977 1         decrypted_msg = self.db_base.decrypt(encrypted_msg, schema_version, salt)
978 1         self.assertEqual(value, decrypted_msg)
979
980 1     def test_encrypt_decrypt_with_schema_version_1_0_with_salt(self):
981         """Encrypt and decrypt with schema version 1.0, salt exists."""
982 1         schema_version = "1.0"
983 1         encrypted_msg = self.db_base.encrypt(value, schema_version, salt)
984 1         decrypted_msg = self.db_base.decrypt(encrypted_msg, schema_version, salt)
985 1         self.assertEqual(value, decrypted_msg)
986
987 1     def test_encrypt_decrypt_with_schema_version_1_1_without_salt(self):
988         """Encrypt and decrypt with schema version 1.1 and without salt."""
989 1         salt = None
990 1         encrypted_msg = self.db_base.encrypt(value, schema_version, salt)
991 1         decrypted_msg = self.db_base.decrypt(encrypted_msg, schema_version, salt)
992 1         self.assertEqual(value, decrypted_msg)
993
994
995 1 class TestAsyncEncryption(unittest.TestCase):
    @patch("logging.getLogger", autospec=True)
    def setUp(self, mock_logger):
        """Create the Encryption instance and mocked admin collection used by each test."""
        # logging.getLogger is patched for the duration of setUp; the injected
        # argument is rebound to whatever the patched call returns, and that
        # object is flagged as disabled.
        mock_logger = logging.getLogger()
        mock_logger.disabled = True
        self.encryption = Encryption(uri="uri", config={})
        # Override encoding, cipher mode and secret key with the module-level
        # test constants.
        self.encryption.encoding_type = encoding_type
        self.encryption.encrypt_mode = encyrpt_mode
        self.encryption._secret_key = secret_key
        # find_one is awaitable so the tests can stub its return value or side effect.
        self.admin_collection = Mock()
        self.admin_collection.find_one = AsyncMock()
        # Stand-in client: a nested dict exposing the mock at ["osm"]["admin"].
        self.encryption._client = {
            "osm": {
                "admin": self.admin_collection,
            }
        }
1011
1012 1     @patch.object(Encryption, "decrypt", new_callable=AsyncMock)
1013 1     def test_decrypt_fields_with_item_with_fields(self, mock_decrypt):
1014         """item and fields exist."""
1015 1         mock_decrypt.side_effect = [decrypted_val1, decrypted_val2]
1016 1         input_item = copy.deepcopy(item)
1017 1         expected_item = {
1018             "secret": decrypted_val1,
1019             "cacert": decrypted_val2,
1020             "path": "/var",
1021             "ip": "192.168.12.23",
1022         }
1023 1         fields = ["secret", "cacert"]
1024
1025 1         asyncio.run(
1026             self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
1027         )
1028 1         self.assertEqual(input_item, expected_item)
1029 1         _call_mock_decrypt = mock_decrypt.call_args_list
1030 1         self.assertEqual(_call_mock_decrypt[0].args, ("mysecret", "1.1", salt))
1031 1         self.assertEqual(_call_mock_decrypt[1].args, ("mycacert", "1.1", salt))
1032
1033 1     @patch.object(Encryption, "decrypt", new_callable=AsyncMock)
1034 1     def test_decrypt_fields_empty_item_with_fields(self, mock_decrypt):
1035         """item is empty and fields exists."""
1036 1         input_item = {}
1037 1         fields = ["secret", "cacert"]
1038 1         asyncio.run(
1039             self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
1040         )
1041 1         self.assertEqual(input_item, {})
1042 1         mock_decrypt.assert_not_called()
1043
1044 1     @patch.object(Encryption, "decrypt", new_callable=AsyncMock)
1045 1     def test_decrypt_fields_with_item_without_fields(self, mock_decrypt):
1046         """item exists and fields is empty."""
1047 1         input_item = copy.deepcopy(item)
1048 1         fields = []
1049 1         asyncio.run(
1050             self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
1051         )
1052 1         self.assertEqual(input_item, item)
1053 1         mock_decrypt.assert_not_called()
1054
1055 1     @patch.object(Encryption, "decrypt", new_callable=AsyncMock)
1056 1     def test_decrypt_fields_with_item_with_single_field(self, mock_decrypt):
1057         """item exists and field has single value."""
1058 1         mock_decrypt.return_value = decrypted_val1
1059 1         fields = ["secret"]
1060 1         input_item = copy.deepcopy(item)
1061 1         expected_item = {
1062             "secret": decrypted_val1,
1063             "cacert": "mycacert",
1064             "path": "/var",
1065             "ip": "192.168.12.23",
1066         }
1067 1         asyncio.run(
1068             self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
1069         )
1070 1         self.assertEqual(input_item, expected_item)
1071 1         _call_mock_decrypt = mock_decrypt.call_args_list
1072 1         self.assertEqual(_call_mock_decrypt[0].args, ("mysecret", "1.1", salt))
1073
1074 1     @patch.object(Encryption, "decrypt", new_callable=AsyncMock)
1075 1     def test_decrypt_fields_with_item_with_field_none_salt_1_0_schema_version(
1076         self, mock_decrypt
1077     ):
1078         """item exists and field has single value, salt is None, schema version is 1.0."""
1079 1         schema_version = "1.0"
1080 1         salt = None
1081 1         mock_decrypt.return_value = "mysecret"
1082 1         input_item = copy.deepcopy(item)
1083 1         fields = ["secret"]
1084 1         asyncio.run(
1085             self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
1086         )
1087 1         self.assertEqual(input_item, item)
1088 1         _call_mock_decrypt = mock_decrypt.call_args_list
1089 1         self.assertEqual(_call_mock_decrypt[0].args, ("mysecret", "1.0", None))
1090
1091 1     @patch.object(Encryption, "decrypt", new_callable=AsyncMock)
1092 1     def test_decrypt_fields_decrypt_raises(self, mock_decrypt):
1093         """Method decrypt raises error."""
1094 1         mock_decrypt.side_effect = DbException(
1095             "Incorrect data type: type(None), string is expected."
1096         )
1097 1         fields = ["secret"]
1098 1         input_item = copy.deepcopy(item)
1099 1         with self.assertRaises(Exception) as error:
1100 1             asyncio.run(
1101                 self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
1102             )
1103 1         self.assertEqual(
1104             str(error.exception),
1105             "database exception Incorrect data type: type(None), string is expected.",
1106         )
1107 1         self.assertEqual(input_item, item)
1108 1         _call_mock_decrypt = mock_decrypt.call_args_list
1109 1         self.assertEqual(_call_mock_decrypt[0].args, ("mysecret", "1.1", salt))
1110
1111 1     @patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
1112 1     @patch.object(Encryption, "_encrypt_value")
1113 1     def test_encrypt(self, mock_encrypt_value, mock_get_secret_key):
1114         """Method decrypt raises error."""
1115 1         mock_encrypt_value.return_value = encyrpted_value
1116 1         result = asyncio.run(self.encryption.encrypt(value, schema_version, salt))
1117 1         self.assertEqual(result, encyrpted_value)
1118 1         mock_get_secret_key.assert_called_once()
1119 1         mock_encrypt_value.assert_called_once_with(value, schema_version, salt)
1120
1121 1     @patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
1122 1     @patch.object(Encryption, "_encrypt_value")
1123 1     def test_encrypt_get_secret_key_raises(
1124         self, mock_encrypt_value, mock_get_secret_key
1125     ):
1126         """Method get_secret_key raises error."""
1127 1         mock_get_secret_key.side_effect = DbException("Unexpected type.")
1128 1         with self.assertRaises(Exception) as error:
1129 1             asyncio.run(self.encryption.encrypt(value, schema_version, salt))
1130 1         self.assertEqual(str(error.exception), "database exception Unexpected type.")
1131 1         mock_get_secret_key.assert_called_once()
1132 1         mock_encrypt_value.assert_not_called()
1133
1134 1     @patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
1135 1     @patch.object(Encryption, "_encrypt_value")
1136 1     def test_encrypt_get_encrypt_raises(self, mock_encrypt_value, mock_get_secret_key):
1137         """Method _encrypt raises error."""
1138 1         mock_encrypt_value.side_effect = TypeError(
1139             "A bytes-like object is required, not 'str'"
1140         )
1141 1         with self.assertRaises(Exception) as error:
1142 1             asyncio.run(self.encryption.encrypt(value, schema_version, salt))
1143 1         self.assertEqual(
1144             str(error.exception), "A bytes-like object is required, not 'str'"
1145         )
1146 1         mock_get_secret_key.assert_called_once()
1147 1         mock_encrypt_value.assert_called_once_with(value, schema_version, salt)
1148
1149 1     @patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
1150 1     @patch.object(Encryption, "_decrypt_value")
1151 1     def test_decrypt(self, mock_decrypt_value, mock_get_secret_key):
1152         """Decrypted successfully."""
1153 1         mock_decrypt_value.return_value = value
1154 1         result = asyncio.run(
1155             self.encryption.decrypt(encyrpted_value, schema_version, salt)
1156         )
1157 1         self.assertEqual(result, value)
1158 1         mock_get_secret_key.assert_called_once()
1159 1         mock_decrypt_value.assert_called_once_with(
1160             encyrpted_value, schema_version, salt
1161         )
1162
1163 1     @patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
1164 1     @patch.object(Encryption, "_decrypt_value")
1165 1     def test_decrypt_get_secret_key_raises(
1166         self, mock_decrypt_value, mock_get_secret_key
1167     ):
1168         """Method get_secret_key raises error."""
1169 1         mock_get_secret_key.side_effect = DbException("Unexpected type.")
1170 1         with self.assertRaises(Exception) as error:
1171 1             asyncio.run(self.encryption.decrypt(encyrpted_value, schema_version, salt))
1172 1         self.assertEqual(str(error.exception), "database exception Unexpected type.")
1173 1         mock_get_secret_key.assert_called_once()
1174 1         mock_decrypt_value.assert_not_called()
1175
1176 1     @patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
1177 1     @patch.object(Encryption, "_decrypt_value")
1178 1     def test_decrypt_decrypt_value_raises(
1179         self, mock_decrypt_value, mock_get_secret_key
1180     ):
1181         """Method get_secret_key raises error."""
1182 1         mock_decrypt_value.side_effect = TypeError(
1183             "A bytes-like object is required, not 'str'"
1184         )
1185 1         with self.assertRaises(Exception) as error:
1186 1             asyncio.run(self.encryption.decrypt(encyrpted_value, schema_version, salt))
1187 1         self.assertEqual(
1188             str(error.exception), "A bytes-like object is required, not 'str'"
1189         )
1190 1         mock_get_secret_key.assert_called_once()
1191 1         mock_decrypt_value.assert_called_once_with(
1192             encyrpted_value, schema_version, salt
1193         )
1194
1195 1     def test_join_keys_string_key(self):
1196         """key is string."""
1197 1         string_key = "sample key"
1198 1         result = self.encryption._join_keys(string_key, secret_key)
1199 1         self.assertEqual(result, joined_key)
1200 1         self.assertTrue(isinstance(result, bytes))
1201
1202 1     def test_join_keys_bytes_key(self):
1203         """key is bytes."""
1204 1         bytes_key = b"sample key"
1205 1         result = self.encryption._join_keys(bytes_key, secret_key)
1206 1         self.assertEqual(result, joined_key)
1207 1         self.assertTrue(isinstance(result, bytes))
1208 1         self.assertEqual(len(result.decode("unicode_escape")), 32)
1209
1210 1     def test_join_keys_int_key(self):
1211         """key is int."""
1212 1         int_key = 923
1213 1         with self.assertRaises(Exception) as error:
1214 1             self.encryption._join_keys(int_key, None)
1215 1         self.assertEqual(str(error.exception), "'int' object is not iterable")
1216
1217 1     def test_join_keys_none_secret_key(self):
1218         """key is as bytes and secret key is None."""
1219 1         bytes_key = b"sample key"
1220 1         result = self.encryption._join_keys(bytes_key, None)
1221 1         self.assertEqual(
1222             result,
1223             b"sample key\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
1224         )
1225 1         self.assertTrue(isinstance(result, bytes))
1226 1         self.assertEqual(len(result.decode("unicode_escape")), 32)
1227
1228 1     def test_join_keys_none_key_none_secret_key(self):
1229         """key is None and secret key is None."""
1230 1         with self.assertRaises(Exception) as error:
1231 1             self.encryption._join_keys(None, None)
1232 1         self.assertEqual(str(error.exception), "'NoneType' object is not iterable")
1233
1234 1     def test_join_keys_none_key(self):
1235         """key is None and secret key exists."""
1236 1         with self.assertRaises(Exception) as error:
1237 1             self.encryption._join_keys(None, secret_key)
1238 1         self.assertEqual(str(error.exception), "'NoneType' object is not iterable")
1239
1240 1     @patch.object(Encryption, "_join_keys")
1241 1     def test_join_secret_key_string_sample_key(self, mock_join_keys):
1242         """key is None and secret key exists as string."""
1243 1         update_key = "sample key"
1244 1         mock_join_keys.return_value = joined_key
1245 1         result = self.encryption._join_secret_key(update_key)
1246 1         self.assertEqual(result, joined_key)
1247 1         self.assertTrue(isinstance(result, bytes))
1248 1         mock_join_keys.assert_called_once_with(update_key, secret_key)
1249
1250 1     @patch.object(Encryption, "_join_keys")
1251 1     def test_join_secret_key_byte_sample_key(self, mock_join_keys):
1252         """key is None and secret key exists as bytes."""
1253 1         update_key = b"sample key"
1254 1         mock_join_keys.return_value = joined_key
1255 1         result = self.encryption._join_secret_key(update_key)
1256 1         self.assertEqual(result, joined_key)
1257 1         self.assertTrue(isinstance(result, bytes))
1258 1         mock_join_keys.assert_called_once_with(update_key, secret_key)
1259
1260 1     @patch.object(Encryption, "_join_keys")
1261 1     def test_join_secret_key_join_keys_raises(self, mock_join_keys):
1262         """Method _join_secret_key raises."""
1263 1         update_key = 3434
1264 1         mock_join_keys.side_effect = TypeError("'int' object is not iterable")
1265 1         with self.assertRaises(Exception) as error:
1266 1             self.encryption._join_secret_key(update_key)
1267 1         self.assertEqual(str(error.exception), "'int' object is not iterable")
1268 1         mock_join_keys.assert_called_once_with(update_key, secret_key)
1269
1270 1     @patch.object(Encryption, "_join_keys")
1271 1     def test_get_secret_key_exists(self, mock_join_keys):
1272         """secret_key exists."""
1273 1         self.encryption._secret_key = secret_key
1274 1         asyncio.run(self.encryption.get_secret_key())
1275 1         self.assertEqual(self.encryption.secret_key, secret_key)
1276 1         mock_join_keys.assert_not_called()
1277
1278 1     @patch.object(Encryption, "_join_keys")
1279 1     @patch("osm_common.dbbase.b64decode")
1280 1     def test_get_secret_key_not_exist_database_key_exist(
1281         self, mock_b64decode, mock_join_keys
1282     ):
1283         """secret_key does not exist, database key exists."""
1284 1         self.encryption._secret_key = None
1285 1         self.encryption._admin_collection.find_one.return_value = None
1286 1         self.encryption._config = {"database_commonkey": "osm_new_key"}
1287 1         mock_join_keys.return_value = joined_key
1288 1         asyncio.run(self.encryption.get_secret_key())
1289 1         self.assertEqual(self.encryption.secret_key, joined_key)
1290 1         self.assertEqual(mock_join_keys.call_count, 1)
1291 1         mock_b64decode.assert_not_called()
1292
1293 1     @patch.object(Encryption, "_join_keys")
1294 1     @patch("osm_common.dbbase.b64decode")
1295 1     def test_get_secret_key_not_exist_with_database_key_version_data_exist_without_serial(
1296         self, mock_b64decode, mock_join_keys
1297     ):
1298         """secret_key does not exist, database key exists."""
1299 1         self.encryption._secret_key = None
1300 1         self.encryption._admin_collection.find_one.return_value = {"version": "1.0"}
1301 1         self.encryption._config = {"database_commonkey": "osm_new_key"}
1302 1         mock_join_keys.return_value = joined_key
1303 1         asyncio.run(self.encryption.get_secret_key())
1304 1         self.assertEqual(self.encryption.secret_key, joined_key)
1305 1         self.assertEqual(mock_join_keys.call_count, 1)
1306 1         mock_b64decode.assert_not_called()
1307 1         self.encryption._admin_collection.find_one.assert_called_once_with(
1308             {"_id": "version"}
1309         )
1310 1         _call_mock_join_keys = mock_join_keys.call_args_list
1311 1         self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))
1312
1313 1     @patch.object(Encryption, "_join_keys")
1314 1     @patch("osm_common.dbbase.b64decode")
1315 1     def test_get_secret_key_not_exist_with_database_key_version_data_exist_with_serial(
1316         self, mock_b64decode, mock_join_keys
1317     ):
1318         """secret_key does not exist, database key exists, version and serial exist
1319         in admin collection."""
1320 1         self.encryption._secret_key = None
1321 1         self.encryption._admin_collection.find_one.return_value = {
1322             "version": "1.0",
1323             "serial": serial_bytes,
1324         }
1325 1         self.encryption._config = {"database_commonkey": "osm_new_key"}
1326 1         mock_join_keys.side_effect = [secret_key, joined_key]
1327 1         mock_b64decode.return_value = base64_decoded_serial
1328 1         asyncio.run(self.encryption.get_secret_key())
1329 1         self.assertEqual(self.encryption.secret_key, joined_key)
1330 1         self.assertEqual(mock_join_keys.call_count, 2)
1331 1         mock_b64decode.assert_called_once_with(serial_bytes)
1332 1         self.encryption._admin_collection.find_one.assert_called_once_with(
1333             {"_id": "version"}
1334         )
1335 1         _call_mock_join_keys = mock_join_keys.call_args_list
1336 1         self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))
1337 1         self.assertEqual(
1338             _call_mock_join_keys[1].args, (base64_decoded_serial, secret_key)
1339         )
1340
1341 1     @patch.object(Encryption, "_join_keys")
1342 1     @patch("osm_common.dbbase.b64decode")
1343 1     def test_get_secret_key_join_keys_raises(self, mock_b64decode, mock_join_keys):
1344         """Method _join_keys raises."""
1345 1         self.encryption._secret_key = None
1346 1         self.encryption._admin_collection.find_one.return_value = {
1347             "version": "1.0",
1348             "serial": serial_bytes,
1349         }
1350 1         self.encryption._config = {"database_commonkey": "osm_new_key"}
1351 1         mock_join_keys.side_effect = DbException("Invalid data type.")
1352 1         with self.assertRaises(Exception) as error:
1353 1             asyncio.run(self.encryption.get_secret_key())
1354 1         self.assertEqual(str(error.exception), "database exception Invalid data type.")
1355 1         self.assertEqual(mock_join_keys.call_count, 1)
1356 1         check_if_assert_not_called(
1357             [mock_b64decode, self.encryption._admin_collection.find_one]
1358         )
1359 1         _call_mock_join_keys = mock_join_keys.call_args_list
1360 1         self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))
1361
1362 1     @patch.object(Encryption, "_join_keys")
1363 1     @patch("osm_common.dbbase.b64decode")
1364 1     def test_get_secret_key_b64decode_raises(self, mock_b64decode, mock_join_keys):
1365         """Method b64decode raises."""
1366 1         self.encryption._secret_key = None
1367 1         self.encryption._admin_collection.find_one.return_value = {
1368             "version": "1.0",
1369             "serial": base64_decoded_serial,
1370         }
1371 1         self.encryption._config = {"database_commonkey": "osm_new_key"}
1372 1         mock_join_keys.return_value = secret_key
1373 1         mock_b64decode.side_effect = TypeError(
1374             "A bytes-like object is required, not 'str'"
1375         )
1376 1         with self.assertRaises(Exception) as error:
1377 1             asyncio.run(self.encryption.get_secret_key())
1378 1         self.assertEqual(
1379             str(error.exception), "A bytes-like object is required, not 'str'"
1380         )
1381 1         self.assertEqual(self.encryption.secret_key, None)
1382 1         self.assertEqual(mock_join_keys.call_count, 1)
1383 1         mock_b64decode.assert_called_once_with(base64_decoded_serial)
1384 1         self.encryption._admin_collection.find_one.assert_called_once_with(
1385             {"_id": "version"}
1386         )
1387 1         _call_mock_join_keys = mock_join_keys.call_args_list
1388 1         self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))
1389
1390 1     @patch.object(Encryption, "_join_keys")
1391 1     @patch("osm_common.dbbase.b64decode")
1392 1     def test_get_secret_key_admin_collection_find_one_raises(
1393         self, mock_b64decode, mock_join_keys
1394     ):
1395         """admin_collection find_one raises."""
1396 1         self.encryption._secret_key = None
1397 1         self.encryption._admin_collection.find_one.side_effect = DbException(
1398             "Connection failed."
1399         )
1400 1         self.encryption._config = {"database_commonkey": "osm_new_key"}
1401 1         mock_join_keys.return_value = secret_key
1402 1         with self.assertRaises(Exception) as error:
1403 1             asyncio.run(self.encryption.get_secret_key())
1404 1         self.assertEqual(str(error.exception), "database exception Connection failed.")
1405 1         self.assertEqual(self.encryption.secret_key, None)
1406 1         self.assertEqual(mock_join_keys.call_count, 1)
1407 1         mock_b64decode.assert_not_called()
1408 1         self.encryption._admin_collection.find_one.assert_called_once_with(
1409             {"_id": "version"}
1410         )
1411 1         _call_mock_join_keys = mock_join_keys.call_args_list
1412 1         self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))
1413
1414 1     def test_encrypt_decrypt_with_schema_version_1_1_with_salt(self):
1415         """Encrypt and decrypt with schema version 1.1, salt exists."""
1416 1         encrypted_msg = asyncio.run(
1417             self.encryption.encrypt(value, schema_version, salt)
1418         )
1419 1         decrypted_msg = asyncio.run(
1420             self.encryption.decrypt(encrypted_msg, schema_version, salt)
1421         )
1422 1         self.assertEqual(value, decrypted_msg)
1423
1424 1     def test_encrypt_decrypt_with_schema_version_1_0_with_salt(self):
1425         """Encrypt and decrypt with schema version 1.0, salt exists."""
1426 1         schema_version = "1.0"
1427 1         encrypted_msg = asyncio.run(
1428             self.encryption.encrypt(value, schema_version, salt)
1429         )
1430 1         decrypted_msg = asyncio.run(
1431             self.encryption.decrypt(encrypted_msg, schema_version, salt)
1432         )
1433 1         self.assertEqual(value, decrypted_msg)
1434
1435 1     def test_encrypt_decrypt_with_schema_version_1_1_without_salt(self):
1436         """Encrypt and decrypt with schema version 1.1, without salt."""
1437 1         salt = None
1438 1         with self.assertRaises(Exception) as error:
1439 1             asyncio.run(self.encryption.encrypt(value, schema_version, salt))
1440 1         self.assertEqual(str(error.exception), "'NoneType' object is not iterable")
1441
1442
class TestDeepUpdate(unittest.TestCase):
    """Table-driven checks for deep_update: dict patches, array patches, bad patches."""

    def test_update_dict(self):
        """Dictionary patches are applied in place and copied from the patch."""
        # Each case is (original, patch, expected result)
        cases = (
            ({"a": "b"}, {"a": "c"}, {"a": "c"}),
            ({"a": "b"}, {"b": "c"}, {"a": "b", "b": "c"}),
            ({"a": "b"}, {"a": None}, {}),
            ({"a": "b", "b": "c"}, {"a": None}, {"b": "c"}),
            ({"a": ["b"]}, {"a": "c"}, {"a": "c"}),
            ({"a": "c"}, {"a": ["b"]}, {"a": ["b"]}),
            ({"a": {"b": "c"}}, {"a": {"b": "d", "c": None}}, {"a": {"b": "d"}}),
            ({"a": [{"b": "c"}]}, {"a": [1]}, {"a": [1]}),
            ({1: ["a", "b"]}, {1: ["c", "d"]}, {1: ["c", "d"]}),
            ({1: {"a": "b"}}, {1: ["c"]}, {1: ["c"]}),
            ({1: {"a": "foo"}}, {1: None}, {}),
            ({1: {"a": "foo"}}, {1: "bar"}, {1: "bar"}),
            ({"e": None}, {"a": 1}, {"e": None, "a": 1}),
            ({1: [1, 2]}, {1: {"a": "b", "c": None}}, {1: {"a": "b"}}),
            ({}, {"a": {"bb": {"ccc": None}}}, {"a": {"bb": {}}}),
        )
        for index, (original, changes, expected) in enumerate(cases):
            # subTest reports every failing case instead of stopping at the first.
            with self.subTest(case=index, patch=changes):
                deep_update(original, changes)
                self.assertEqual(original, expected)
        # Check that a deep copy is done, so that the original dictionary does not
        # reference the patch.
        test_original = {1: {"a": "b"}}
        test_patch = {1: {"c": {"d": "e"}}}
        test_result = {1: {"a": "b", "c": {"d": "e"}}}
        deep_update(test_original, test_patch)
        self.assertEqual(test_original, test_result)
        test_patch[1]["c"]["f"] = "edition of patch, must not modify original"
        self.assertEqual(test_original, test_result)

    def test_update_array(self):
        """Array patches using the "$" key syntax give the expected lists."""
        # Each case is (original, patch, expected result)
        cases = (
            # delete all instances of "a"/"d"
            ({"A": ["a", "b", "a"]}, {"A": {"$a": None}}, {"A": ["b"]}),
            ({"A": ["a", "b", "a"]}, {"A": {"$d": None}}, {"A": ["a", "b", "a"]}),
            # delete and insert at 0
            (
                {"A": ["a", "b", "c"]},
                {"A": {"$b": None, "$+[0]": "b"}},
                {"A": ["b", "a", "c"]},
            ),
            # delete and edit
            (
                {"A": ["a", "b", "a"]},
                {"A": {"$a": None, "$[1]": {"c": "d"}}},
                {"A": [{"c": "d"}]},
            ),
            # insert if not exist
            ({"A": ["a", "b", "c"]}, {"A": {"$+b": "b"}}, {"A": ["a", "b", "c"]}),
            ({"A": ["a", "b", "c"]}, {"A": {"$+d": "f"}}, {"A": ["a", "b", "c", "f"]}),
            # edit by filter
            (
                {"A": ["a", "b", "a"]},
                {"A": {"$b": {"c": "d"}}},
                {"A": ["a", {"c": "d"}, "a"]},
            ),
            (
                {"A": ["a", "b", "a"]},
                {"A": {"$b": None, "$+[0]": "b", "$+": "c"}},
                {"A": ["b", "a", "a", "c"]},
            ),
            ({"A": ["a", "b", "a"]}, {"A": {"$c": None}}, {"A": ["a", "b", "a"]}),
            # index deletion out of range
            ({"A": ["a", "b", "a"]}, {"A": {"$[5]": None}}, {"A": ["a", "b", "a"]}),
            # nested array->dict
            (
                {"A": ["a", "b", {"id": "1", "c": {"d": 2}}]},
                {"A": {"$id: '1'": {"h": None, "c": {"d": "e", "f": "g"}}}},
                {"A": ["a", "b", {"id": "1", "c": {"d": "e", "f": "g"}}]},
            ),
            (
                {"A": [{"id": 1, "c": {"d": 2}}, {"id": 1, "c": {"f": []}}]},
                {"A": {"$id: 1": {"h": None, "c": {"d": "e", "f": "g"}}}},
                {
                    "A": [
                        {"id": 1, "c": {"d": "e", "f": "g"}},
                        {"id": 1, "c": {"d": "e", "f": "g"}},
                    ]
                },
            ),
            # nested array->array
            (
                {"A": ["a", "b", ["a", "b"]]},
                {"A": {"$b": None, "$[2]": {"$b": {}, "$+": "c"}}},
                {"A": ["a", ["a", {}, "c"]]},
            ),
            # types str and int different, so not found
            (
                {"A": ["a", {"id": "1", "c": "d"}]},
                {"A": {"$id: 1": {"c": "e"}}},
                {"A": ["a", {"id": "1", "c": "d"}]},
            ),
        )
        for index, (original, changes, expected) in enumerate(cases):
            # subTest identifies the failing case, replacing the old debug print.
            with self.subTest(case=index, patch=changes):
                deep_update(original, changes)
                self.assertEqual(original, expected)

    def test_update_badformat(self):
        """Malformed or conflicting array patches raise DbException."""
        # Each case is (original, incorrect patch)
        # TODO: also check the text that must be present in the error message
        cases = (
            # conflict, index 0 is edited twice
            ({"A": ["a", "b", "a"]}, {"A": {"$a": None, "$[0]": {"c": "d"}}}),
            # conflict, two insertions at same index
            ({"A": ["a", "b", "a"]}, {"A": {"$[1]": "c", "$[-2]": "d"}}),
            ({"A": ["a", "b", "a"]}, {"A": {"$[1]": "c", "$[+1]": "d"}}),
            # bad format keys with and without $
            ({"A": ["a", "b", "a"]}, {"A": {"$b": {"c": "d"}, "c": 3}}),
            # bad format empty $ and yaml incorrect
            ({"A": ["a", "b", "a"]}, {"A": {"$": 3}}),
            ({"A": ["a", "b", "a"]}, {"A": {"$a: b: c": 3}}),
            ({"A": ["a", "b", "a"]}, {"A": {"$a: b, c: d": 3}}),
            # insertion of None
            ({"A": ["a", "b", "a"]}, {"A": {"$+": None}}),
            # Not found, insertion of None
            ({"A": ["a", "b", "a"]}, {"A": {"$+c": None}}),
            # index edition out of range
            ({"A": ["a", "b", "a"]}, {"A": {"$[5]": 6}}),
            # conflict, two editions of the same element
            (
                {"A": ["a", {"id": "1", "c": "d"}]},
                {"A": {"$id: '1'": {"c": "e"}, "$c: d": {"c": "f"}}},
            ),
        )
        for index, (original, changes) in enumerate(cases):
            with self.subTest(case=index, patch=changes):
                # deep_update is called once per case; the former second call only
                # printed the error and asserted nothing.
                with self.assertRaises(DbException):
                    deep_update(original, changes)
1577
1578
# Run the unittest test runner when this module is executed as a script.
if __name__ == "__main__":
    unittest.main()