blob: e582c7e8f08699c93ba5a1ffe36e2657171329ff [file] [log] [blame]
Eduardo Sousaa0117812019-02-05 15:57:09 +00001# Copyright 2018 Whitestack, LLC
2# Copyright 2018 Telefonica S.A.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15#
16# For those usages not covered by the Apache License, Version 2.0 please
17# contact: esousa@whitestack.com or alfonso.tiernosepulveda@telefonica.com
18##
Gulsum Atici76394ef2023-01-09 23:19:18 +030019import asyncio
20import copy
21from copy import deepcopy
Eduardo Sousa4d611d32018-05-09 19:20:37 +010022import http
tiernobd5a4022019-01-30 09:48:38 +000023from http import HTTPStatus
Gulsum Atici76394ef2023-01-09 23:19:18 +030024import logging
aticig3dd0db62022-03-04 19:35:45 +030025from os import urandom
26import unittest
Gulsum Atici76394ef2023-01-09 23:19:18 +030027from unittest.mock import MagicMock, Mock, patch
aticig3dd0db62022-03-04 19:35:45 +030028
Gulsum Atici76394ef2023-01-09 23:19:18 +030029from Crypto.Cipher import AES
30from osm_common.dbbase import DbBase, DbException, deep_update, Encryption
aticig3dd0db62022-03-04 19:35:45 +030031import pytest
Eduardo Sousa4d611d32018-05-09 19:20:37 +010032
tiernob20a9022018-05-22 12:07:05 +020033
Gulsum Atici76394ef2023-01-09 23:19:18 +030034# Variables used in TestBaseEncryption and TestAsyncEncryption
35salt = "1afd5d1a-4a7e-4d9c-8c65-251290183106"
36value = "private key txt"
37padded_value = b"private key txt\0"
38padded_encoded_value = b"private key txt\x00"
39encoding_type = "ascii"
40encyrpt_mode = AES.MODE_ECB
41secret_key = b"\xeev\xc2\xb8\xb2#;Ek\xd0\xb5['\x04\xed\x1f\xb9?\xc5Ig\x80\xd5\x8d\x8aT\xd7\xf8Q\xe2u!"
42encyrpted_value = "ZW5jcnlwdGVkIGRhdGE="
43encyrpted_bytes = b"ZW5jcnlwdGVkIGRhdGE="
44data_to_b4_encode = b"encrypted data"
45b64_decoded = b"decrypted data"
46schema_version = "1.1"
47joined_key = b"\x9d\x17\xaf\xc8\xdeF\x1b.\x0e\xa9\xb5['\x04\xed\x1f\xb9?\xc5Ig\x80\xd5\x8d\x8aT\xd7\xf8Q\xe2u!"
48serial_bytes = b"\xf8\x96Z\x1c:}\xb5\xdf\x94\x8d\x0f\x807\xe6)\x8f\xf5!\xee}\xc2\xfa\xb3\t\xb9\xe4\r7\x19\x08\xa5b"
49base64_decoded_serial = b"g\xbe\xdb"
50decrypted_val1 = "BiV9YZEuSRAudqvz7Gs+bg=="
51decrypted_val2 = "q4LwnFdoryzbZJM5mCAnpA=="
52item = {
53 "secret": "mysecret",
54 "cacert": "mycacert",
55 "path": "/var",
56 "ip": "192.168.12.23",
57}
58
59
Eduardo Sousa4d611d32018-05-09 19:20:37 +010060def exception_message(message):
61 return "database exception " + message
62
tiernob20a9022018-05-22 12:07:05 +020063
Eduardo Sousa4d611d32018-05-09 19:20:37 +010064@pytest.fixture
65def db_base():
66 return DbBase()
67
tiernob20a9022018-05-22 12:07:05 +020068
Eduardo Sousa4d611d32018-05-09 19:20:37 +010069def test_constructor():
70 db_base = DbBase()
tiernob20a9022018-05-22 12:07:05 +020071 assert db_base is not None
Eduardo Sousa4d611d32018-05-09 19:20:37 +010072 assert isinstance(db_base, DbBase)
73
tiernob20a9022018-05-22 12:07:05 +020074
Eduardo Sousa4d611d32018-05-09 19:20:37 +010075def test_db_connect(db_base):
tierno136f2952018-10-19 13:01:03 +020076 with pytest.raises(DbException) as excinfo:
77 db_base.db_connect(None)
garciadeblas2644b762021-03-24 09:21:01 +010078 assert str(excinfo.value).startswith(
79 exception_message("Method 'db_connect' not implemented")
80 )
Eduardo Sousa4d611d32018-05-09 19:20:37 +010081
tiernob20a9022018-05-22 12:07:05 +020082
Eduardo Sousa4d611d32018-05-09 19:20:37 +010083def test_db_disconnect(db_base):
84 db_base.db_disconnect()
85
tiernob20a9022018-05-22 12:07:05 +020086
Eduardo Sousa4d611d32018-05-09 19:20:37 +010087def test_get_list(db_base):
88 with pytest.raises(DbException) as excinfo:
89 db_base.get_list(None, None)
garciadeblas2644b762021-03-24 09:21:01 +010090 assert str(excinfo.value).startswith(
91 exception_message("Method 'get_list' not implemented")
92 )
Eduardo Sousa4d611d32018-05-09 19:20:37 +010093 assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
94
tiernob20a9022018-05-22 12:07:05 +020095
Eduardo Sousa4d611d32018-05-09 19:20:37 +010096def test_get_one(db_base):
97 with pytest.raises(DbException) as excinfo:
98 db_base.get_one(None, None, None, None)
garciadeblas2644b762021-03-24 09:21:01 +010099 assert str(excinfo.value).startswith(
100 exception_message("Method 'get_one' not implemented")
101 )
Eduardo Sousa4d611d32018-05-09 19:20:37 +0100102 assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
103
tiernob20a9022018-05-22 12:07:05 +0200104
Eduardo Sousa4d611d32018-05-09 19:20:37 +0100105def test_create(db_base):
106 with pytest.raises(DbException) as excinfo:
107 db_base.create(None, None)
garciadeblas2644b762021-03-24 09:21:01 +0100108 assert str(excinfo.value).startswith(
109 exception_message("Method 'create' not implemented")
110 )
Eduardo Sousa4d611d32018-05-09 19:20:37 +0100111 assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
112
tiernob20a9022018-05-22 12:07:05 +0200113
tierno2c9794c2020-04-29 10:24:28 +0000114def test_create_list(db_base):
115 with pytest.raises(DbException) as excinfo:
116 db_base.create_list(None, None)
garciadeblas2644b762021-03-24 09:21:01 +0100117 assert str(excinfo.value).startswith(
118 exception_message("Method 'create_list' not implemented")
119 )
tierno2c9794c2020-04-29 10:24:28 +0000120 assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
121
122
Eduardo Sousa4d611d32018-05-09 19:20:37 +0100123def test_del_list(db_base):
124 with pytest.raises(DbException) as excinfo:
125 db_base.del_list(None, None)
garciadeblas2644b762021-03-24 09:21:01 +0100126 assert str(excinfo.value).startswith(
127 exception_message("Method 'del_list' not implemented")
128 )
Eduardo Sousa4d611d32018-05-09 19:20:37 +0100129 assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
130
tiernob20a9022018-05-22 12:07:05 +0200131
Eduardo Sousa4d611d32018-05-09 19:20:37 +0100132def test_del_one(db_base):
133 with pytest.raises(DbException) as excinfo:
134 db_base.del_one(None, None, None)
garciadeblas2644b762021-03-24 09:21:01 +0100135 assert str(excinfo.value).startswith(
136 exception_message("Method 'del_one' not implemented")
137 )
Eduardo Sousa4d611d32018-05-09 19:20:37 +0100138 assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
tiernob3e750b2018-09-05 11:25:23 +0200139
140
tierno136f2952018-10-19 13:01:03 +0200141class TestEncryption(unittest.TestCase):
142 def setUp(self):
tiernoeef7cb72018-11-12 11:51:49 +0100143 master_key = "Setting a long master key with numbers 123 and capitals AGHBNHD and symbols %&8)!'"
tiernocfc52722018-10-23 11:41:49 +0200144 db_base1 = DbBase()
tierno136f2952018-10-19 13:01:03 +0200145 db_base2 = DbBase()
tiernoeef7cb72018-11-12 11:51:49 +0100146 db_base3 = DbBase()
tierno136f2952018-10-19 13:01:03 +0200147 # set self.secret_key obtained when connect
tiernoeef7cb72018-11-12 11:51:49 +0100148 db_base1.set_secret_key(master_key, replace=True)
149 db_base1.set_secret_key(urandom(32))
150 db_base2.set_secret_key(None, replace=True)
151 db_base2.set_secret_key(urandom(30))
152 db_base3.set_secret_key(master_key)
153 self.db_bases = [db_base1, db_base2, db_base3]
tierno136f2952018-10-19 13:01:03 +0200154
155 def test_encrypt_decrypt(self):
156 TEST = (
157 ("plain text 1 ! ", None),
158 ("plain text 2 with salt ! ", "1afd5d1a-4a7e-4d9c-8c65-251290183106"),
tierno136f2952018-10-19 13:01:03 +0200159 )
tiernoeef7cb72018-11-12 11:51:49 +0100160 for db_base in self.db_bases:
tierno136f2952018-10-19 13:01:03 +0200161 for value, salt in TEST:
162 # no encryption
garciadeblas2644b762021-03-24 09:21:01 +0100163 encrypted = db_base.encrypt(value, schema_version="1.0", salt=salt)
164 self.assertEqual(
165 encrypted, value, "value '{}' has been encrypted".format(value)
166 )
167 decrypted = db_base.decrypt(encrypted, schema_version="1.0", salt=salt)
168 self.assertEqual(
169 decrypted, value, "value '{}' has been decrypted".format(value)
170 )
tierno136f2952018-10-19 13:01:03 +0200171
172 # encrypt/decrypt
garciadeblas2644b762021-03-24 09:21:01 +0100173 encrypted = db_base.encrypt(value, schema_version="1.1", salt=salt)
174 self.assertNotEqual(
175 encrypted, value, "value '{}' has not been encrypted".format(value)
176 )
tierno136f2952018-10-19 13:01:03 +0200177 self.assertIsInstance(encrypted, str, "Encrypted is not ascii text")
garciadeblas2644b762021-03-24 09:21:01 +0100178 decrypted = db_base.decrypt(encrypted, schema_version="1.1", salt=salt)
179 self.assertEqual(
180 decrypted, value, "value is not equal after encryption/decryption"
181 )
tierno136f2952018-10-19 13:01:03 +0200182
183 def test_encrypt_decrypt_salt(self):
184 value = "value to be encrypted!"
185 encrypted = []
tiernoeef7cb72018-11-12 11:51:49 +0100186 for db_base in self.db_bases:
tierno136f2952018-10-19 13:01:03 +0200187 for salt in (None, "salt 1", "1afd5d1a-4a7e-4d9c-8c65-251290183106"):
188 # encrypt/decrypt
garciadeblas2644b762021-03-24 09:21:01 +0100189 encrypted.append(
190 db_base.encrypt(value, schema_version="1.1", salt=salt)
191 )
192 self.assertNotEqual(
193 encrypted[-1],
194 value,
195 "value '{}' has not been encrypted".format(value),
196 )
tierno136f2952018-10-19 13:01:03 +0200197 self.assertIsInstance(encrypted[-1], str, "Encrypted is not ascii text")
garciadeblas2644b762021-03-24 09:21:01 +0100198 decrypted = db_base.decrypt(
199 encrypted[-1], schema_version="1.1", salt=salt
200 )
201 self.assertEqual(
202 decrypted, value, "value is not equal after encryption/decryption"
203 )
tierno136f2952018-10-19 13:01:03 +0200204 for i in range(0, len(encrypted)):
garciadeblas2644b762021-03-24 09:21:01 +0100205 for j in range(i + 1, len(encrypted)):
206 self.assertNotEqual(
207 encrypted[i],
208 encrypted[j],
209 "encryption with different salt must contain different result",
210 )
tiernobd5a4022019-01-30 09:48:38 +0000211 # decrypt with a different master key
212 try:
garciadeblas2644b762021-03-24 09:21:01 +0100213 decrypted = self.db_bases[-1].decrypt(
214 encrypted[0], schema_version="1.1", salt=None
215 )
216 self.assertNotEqual(
217 encrypted[0],
218 decrypted,
219 "Decryption with different KEY must generate different result",
220 )
tiernobd5a4022019-01-30 09:48:38 +0000221 except DbException as e:
garciadeblas2644b762021-03-24 09:21:01 +0100222 self.assertEqual(
223 e.http_code,
224 HTTPStatus.INTERNAL_SERVER_ERROR,
225 "Decryption with different KEY does not provide expected http_code",
226 )
tierno136f2952018-10-19 13:01:03 +0200227
228
Gulsum Atici76394ef2023-01-09 23:19:18 +0300229class AsyncMock(MagicMock):
230 async def __call__(self, *args, **kwargs):
231 args = deepcopy(args)
232 kwargs = deepcopy(kwargs)
233 return super(AsyncMock, self).__call__(*args, **kwargs)
234
235
236class CopyingMock(MagicMock):
237 def __call__(self, *args, **kwargs):
238 args = deepcopy(args)
239 kwargs = deepcopy(kwargs)
240 return super(CopyingMock, self).__call__(*args, **kwargs)
241
242
243def check_if_assert_not_called(mocks: list):
244 for mocking in mocks:
245 mocking.assert_not_called()
246
247
248class TestBaseEncryption(unittest.TestCase):
249 @patch("logging.getLogger", autospec=True)
250 def setUp(self, mock_logger):
251 mock_logger = logging.getLogger()
252 mock_logger.disabled = True
253 self.db_base = DbBase()
254 self.mock_cipher = CopyingMock()
255 self.db_base.encoding_type = encoding_type
256 self.db_base.encrypt_mode = encyrpt_mode
257 self.db_base.secret_key = secret_key
258 self.mock_padded_msg = CopyingMock()
259
260 def test_pad_data_len_not_multiplication_of_16(self):
261 data = "hello word hello hello word hello word"
262 data_len = len(data)
263 expected_len = 48
264 padded = self.db_base.pad_data(data)
265 self.assertEqual(len(padded), expected_len)
266 self.assertTrue("\0" * (expected_len - data_len) in padded)
267
268 def test_pad_data_len_multiplication_of_16(self):
269 data = "hello word!!!!!!"
270 padded = self.db_base.pad_data(data)
271 self.assertEqual(padded, data)
272 self.assertFalse("\0" in padded)
273
274 def test_pad_data_empty_string(self):
275 data = ""
276 expected_len = 0
277 padded = self.db_base.pad_data(data)
278 self.assertEqual(len(padded), expected_len)
279 self.assertFalse("\0" in padded)
280
281 def test_pad_data_not_string(self):
282 data = None
283 with self.assertRaises(Exception) as err:
284 self.db_base.pad_data(data)
285 self.assertEqual(
286 str(err.exception),
287 "database exception Incorrect data type: type(None), string is expected.",
288 )
289
290 def test_unpad_data_null_char_at_right(self):
291 null_padded_data = "hell0word\0\0"
292 expected_length = len(null_padded_data) - 2
293 unpadded = self.db_base.unpad_data(null_padded_data)
294 self.assertEqual(len(unpadded), expected_length)
295 self.assertFalse("\0" in unpadded)
296 self.assertTrue("0" in unpadded)
297
298 def test_unpad_data_null_char_is_not_rightest(self):
299 null_padded_data = "hell0word\r\t\0\n"
300 expected_length = len(null_padded_data)
301 unpadded = self.db_base.unpad_data(null_padded_data)
302 self.assertEqual(len(unpadded), expected_length)
303 self.assertTrue("\0" in unpadded)
304
305 def test_unpad_data_with_spaces_at_right(self):
306 null_padded_data = " hell0word\0 "
307 expected_length = len(null_padded_data)
308 unpadded = self.db_base.unpad_data(null_padded_data)
309 self.assertEqual(len(unpadded), expected_length)
310 self.assertTrue("\0" in unpadded)
311
312 def test_unpad_data_empty_string(self):
313 data = ""
314 unpadded = self.db_base.unpad_data(data)
315 self.assertEqual(unpadded, "")
316 self.assertFalse("\0" in unpadded)
317
318 def test_unpad_data_not_string(self):
319 data = None
320 with self.assertRaises(Exception) as err:
321 self.db_base.unpad_data(data)
322 self.assertEqual(
323 str(err.exception),
324 "database exception Incorrect data type: type(None), string is expected.",
325 )
326
327 @patch.object(DbBase, "_join_secret_key")
328 @patch.object(DbBase, "pad_data")
329 def test__encrypt_value_schema_version_1_0_none_secret_key_none_salt(
330 self, mock_pad_data, mock_join_secret_key
331 ):
332 """schema_version 1.0, secret_key is None and salt is None."""
333 schema_version = "1.0"
334 salt = None
335 self.db_base.secret_key = None
336 result = self.db_base._encrypt_value(value, schema_version, salt)
337 self.assertEqual(result, value)
338 check_if_assert_not_called([mock_pad_data, mock_join_secret_key])
339
340 @patch("osm_common.dbbase.b64encode")
341 @patch("osm_common.dbbase.AES")
342 @patch.object(DbBase, "_join_secret_key")
343 @patch.object(DbBase, "pad_data")
344 def test__encrypt_value_schema_version_1_1_with_secret_key_exists_with_salt(
345 self,
346 mock_pad_data,
347 mock_join_secret_key,
348 mock_aes,
349 mock_b64_encode,
350 ):
351 """schema_version 1.1, secret_key exists, salt exists."""
352 mock_aes.new.return_value = self.mock_cipher
353 self.mock_cipher.encrypt.return_value = data_to_b4_encode
354 self.mock_padded_msg.return_value = padded_value
355 mock_pad_data.return_value = self.mock_padded_msg
356 self.mock_padded_msg.encode.return_value = padded_encoded_value
357
358 mock_b64_encode.return_value = encyrpted_bytes
359
360 result = self.db_base._encrypt_value(value, schema_version, salt)
361
362 self.assertTrue(isinstance(result, str))
363 self.assertEqual(result, encyrpted_value)
364 mock_join_secret_key.assert_called_once_with(salt)
365 _call_mock_aes_new = mock_aes.new.call_args_list[0].args
366 self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
367 mock_pad_data.assert_called_once_with(value)
368 mock_b64_encode.assert_called_once_with(data_to_b4_encode)
369 self.mock_cipher.encrypt.assert_called_once_with(padded_encoded_value)
370 self.mock_padded_msg.encode.assert_called_with(encoding_type)
371
372 @patch.object(DbBase, "_join_secret_key")
373 @patch.object(DbBase, "pad_data")
374 def test__encrypt_value_schema_version_1_0_secret_key_not_exists(
375 self, mock_pad_data, mock_join_secret_key
376 ):
377 """schema_version 1.0, secret_key is None, salt exists."""
378 schema_version = "1.0"
379 self.db_base.secret_key = None
380 result = self.db_base._encrypt_value(value, schema_version, salt)
381 self.assertEqual(result, value)
382 check_if_assert_not_called([mock_pad_data, mock_join_secret_key])
383
384 @patch.object(DbBase, "_join_secret_key")
385 @patch.object(DbBase, "pad_data")
386 def test__encrypt_value_schema_version_1_1_secret_key_not_exists(
387 self, mock_pad_data, mock_join_secret_key
388 ):
389 """schema_version 1.1, secret_key is None, salt exists."""
390 self.db_base.secret_key = None
391 result = self.db_base._encrypt_value(value, schema_version, salt)
392 self.assertEqual(result, value)
393 check_if_assert_not_called([mock_pad_data, mock_join_secret_key])
394
395 @patch("osm_common.dbbase.b64encode")
396 @patch("osm_common.dbbase.AES")
397 @patch.object(DbBase, "_join_secret_key")
398 @patch.object(DbBase, "pad_data")
399 def test__encrypt_value_schema_version_1_1_secret_key_exists_without_salt(
400 self,
401 mock_pad_data,
402 mock_join_secret_key,
403 mock_aes,
404 mock_b64_encode,
405 ):
406 """schema_version 1.1, secret_key exists, salt is None."""
407 salt = None
408 mock_aes.new.return_value = self.mock_cipher
409 self.mock_cipher.encrypt.return_value = data_to_b4_encode
410
411 self.mock_padded_msg.return_value = padded_value
412 mock_pad_data.return_value = self.mock_padded_msg
413 self.mock_padded_msg.encode.return_value = padded_encoded_value
414
415 mock_b64_encode.return_value = encyrpted_bytes
416
417 result = self.db_base._encrypt_value(value, schema_version, salt)
418
419 self.assertEqual(result, encyrpted_value)
420 mock_join_secret_key.assert_called_once_with(salt)
421 _call_mock_aes_new = mock_aes.new.call_args_list[0].args
422 self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
423 mock_pad_data.assert_called_once_with(value)
424 mock_b64_encode.assert_called_once_with(data_to_b4_encode)
425 self.mock_cipher.encrypt.assert_called_once_with(padded_encoded_value)
426 self.mock_padded_msg.encode.assert_called_with(encoding_type)
427
428 @patch("osm_common.dbbase.b64encode")
429 @patch("osm_common.dbbase.AES")
430 @patch.object(DbBase, "_join_secret_key")
431 @patch.object(DbBase, "pad_data")
432 def test__encrypt_value_invalid_encrpt_mode(
433 self,
434 mock_pad_data,
435 mock_join_secret_key,
436 mock_aes,
437 mock_b64_encode,
438 ):
439 """encrypt_mode is invalid."""
440 mock_aes.new.side_effect = Exception("Invalid ciphering mode.")
441 self.db_base.encrypt_mode = "AES.MODE_XXX"
442
443 with self.assertRaises(Exception) as err:
444 self.db_base._encrypt_value(value, schema_version, salt)
445
446 self.assertEqual(str(err.exception), "Invalid ciphering mode.")
447 mock_join_secret_key.assert_called_once_with(salt)
448 _call_mock_aes_new = mock_aes.new.call_args_list[0].args
449 self.assertEqual(_call_mock_aes_new[1], "AES.MODE_XXX")
450 check_if_assert_not_called([mock_pad_data, mock_b64_encode])
451
452 @patch("osm_common.dbbase.b64encode")
453 @patch("osm_common.dbbase.AES")
454 @patch.object(DbBase, "_join_secret_key")
455 @patch.object(DbBase, "pad_data")
456 def test__encrypt_value_schema_version_1_1_secret_key_exists_value_none(
457 self,
458 mock_pad_data,
459 mock_join_secret_key,
460 mock_aes,
461 mock_b64_encode,
462 ):
463 """schema_version 1.1, secret_key exists, value is None."""
464 value = None
465 mock_aes.new.return_value = self.mock_cipher
466 mock_pad_data.side_effect = DbException(
467 "Incorrect data type: type(None), string is expected."
468 )
469
470 with self.assertRaises(Exception) as err:
471 self.db_base._encrypt_value(value, schema_version, salt)
472 self.assertEqual(
473 str(err.exception),
474 "database exception Incorrect data type: type(None), string is expected.",
475 )
476
477 mock_join_secret_key.assert_called_once_with(salt)
478 _call_mock_aes_new = mock_aes.new.call_args_list[0].args
479 self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
480 mock_pad_data.assert_called_once_with(value)
481 check_if_assert_not_called(
482 [mock_b64_encode, self.mock_cipher.encrypt, mock_b64_encode]
483 )
484
485 @patch("osm_common.dbbase.b64encode")
486 @patch("osm_common.dbbase.AES")
487 @patch.object(DbBase, "_join_secret_key")
488 @patch.object(DbBase, "pad_data")
489 def test__encrypt_value_join_secret_key_raises(
490 self,
491 mock_pad_data,
492 mock_join_secret_key,
493 mock_aes,
494 mock_b64_encode,
495 ):
496 """Method join_secret_key raises DbException."""
497 salt = b"3434o34-3wewrwr-222424-2242dwew"
498
499 mock_join_secret_key.side_effect = DbException("Unexpected type")
500
501 mock_aes.new.return_value = self.mock_cipher
502
503 with self.assertRaises(Exception) as err:
504 self.db_base._encrypt_value(value, schema_version, salt)
505
506 self.assertEqual(str(err.exception), "database exception Unexpected type")
507 check_if_assert_not_called(
508 [mock_pad_data, mock_aes.new, mock_b64_encode, self.mock_cipher.encrypt]
509 )
510 mock_join_secret_key.assert_called_once_with(salt)
511
512 @patch("osm_common.dbbase.b64encode")
513 @patch("osm_common.dbbase.AES")
514 @patch.object(DbBase, "_join_secret_key")
515 @patch.object(DbBase, "pad_data")
516 def test__encrypt_value_schema_version_1_1_secret_key_exists_b64_encode_raises(
517 self,
518 mock_pad_data,
519 mock_join_secret_key,
520 mock_aes,
521 mock_b64_encode,
522 ):
523 """schema_version 1.1, secret_key exists, b64encode raises TypeError."""
524 mock_aes.new.return_value = self.mock_cipher
525 self.mock_cipher.encrypt.return_value = "encrypted data"
526
527 self.mock_padded_msg.return_value = padded_value
528 mock_pad_data.return_value = self.mock_padded_msg
529 self.mock_padded_msg.encode.return_value = padded_encoded_value
530
531 mock_b64_encode.side_effect = TypeError(
532 "A bytes-like object is required, not 'str'"
533 )
534
535 with self.assertRaises(Exception) as error:
536 self.db_base._encrypt_value(value, schema_version, salt)
537 self.assertEqual(
538 str(error.exception), "A bytes-like object is required, not 'str'"
539 )
540 mock_join_secret_key.assert_called_once_with(salt)
541 _call_mock_aes_new = mock_aes.new.call_args_list[0].args
542 self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
543 mock_pad_data.assert_called_once_with(value)
544 self.mock_cipher.encrypt.assert_called_once_with(padded_encoded_value)
545 self.mock_padded_msg.encode.assert_called_with(encoding_type)
546 mock_b64_encode.assert_called_once_with("encrypted data")
547
548 @patch("osm_common.dbbase.b64encode")
549 @patch("osm_common.dbbase.AES")
550 @patch.object(DbBase, "_join_secret_key")
551 @patch.object(DbBase, "pad_data")
552 def test__encrypt_value_cipher_encrypt_raises(
553 self,
554 mock_pad_data,
555 mock_join_secret_key,
556 mock_aes,
557 mock_b64_encode,
558 ):
559 """AES encrypt method raises Exception."""
560 mock_aes.new.return_value = self.mock_cipher
561 self.mock_cipher.encrypt.side_effect = Exception("Invalid data type.")
562
563 self.mock_padded_msg.return_value = padded_value
564 mock_pad_data.return_value = self.mock_padded_msg
565 self.mock_padded_msg.encode.return_value = padded_encoded_value
566
567 with self.assertRaises(Exception) as error:
568 self.db_base._encrypt_value(value, schema_version, salt)
569
570 self.assertEqual(str(error.exception), "Invalid data type.")
571 mock_join_secret_key.assert_called_once_with(salt)
572 _call_mock_aes_new = mock_aes.new.call_args_list[0].args
573 self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
574 mock_pad_data.assert_called_once_with(value)
575 self.mock_cipher.encrypt.assert_called_once_with(padded_encoded_value)
576 self.mock_padded_msg.encode.assert_called_with(encoding_type)
577 mock_b64_encode.assert_not_called()
578
579 @patch.object(DbBase, "get_secret_key")
580 @patch.object(DbBase, "_encrypt_value")
581 def test_encrypt_without_schema_version_without_salt(
582 self, mock_encrypt_value, mock_get_secret_key
583 ):
584 """schema and salt is None."""
585 mock_encrypt_value.return_value = encyrpted_value
586 result = self.db_base.encrypt(value)
587 mock_encrypt_value.assert_called_once_with(value, None, None)
588 mock_get_secret_key.assert_called_once()
589 self.assertEqual(result, encyrpted_value)
590
591 @patch.object(DbBase, "get_secret_key")
592 @patch.object(DbBase, "_encrypt_value")
593 def test_encrypt_with_schema_version_with_salt(
594 self, mock_encrypt_value, mock_get_secret_key
595 ):
596 """schema version exists, salt is None."""
597 mock_encrypt_value.return_value = encyrpted_value
598 result = self.db_base.encrypt(value, schema_version, salt)
599 mock_encrypt_value.assert_called_once_with(value, schema_version, salt)
600 mock_get_secret_key.assert_called_once()
601 self.assertEqual(result, encyrpted_value)
602
603 @patch.object(DbBase, "get_secret_key")
604 @patch.object(DbBase, "_encrypt_value")
605 def test_encrypt_get_secret_key_raises(
606 self, mock_encrypt_value, mock_get_secret_key
607 ):
608 """get_secret_key method raises DbException."""
609 mock_get_secret_key.side_effect = DbException("KeyError")
610 with self.assertRaises(Exception) as error:
611 self.db_base.encrypt(value)
612 self.assertEqual(str(error.exception), "database exception KeyError")
613 mock_encrypt_value.assert_not_called()
614 mock_get_secret_key.assert_called_once()
615
616 @patch.object(DbBase, "get_secret_key")
617 @patch.object(DbBase, "_encrypt_value")
618 def test_encrypt_encrypt_raises(self, mock_encrypt_value, mock_get_secret_key):
619 """_encrypt method raises DbException."""
620 mock_encrypt_value.side_effect = DbException(
621 "Incorrect data type: type(None), string is expected."
622 )
623 with self.assertRaises(Exception) as error:
624 self.db_base.encrypt(value, schema_version, salt)
625 self.assertEqual(
626 str(error.exception),
627 "database exception Incorrect data type: type(None), string is expected.",
628 )
629 mock_encrypt_value.assert_called_once_with(value, schema_version, salt)
630 mock_get_secret_key.assert_called_once()
631
632 @patch("osm_common.dbbase.b64decode")
633 @patch("osm_common.dbbase.AES")
634 @patch.object(DbBase, "_join_secret_key")
635 @patch.object(DbBase, "unpad_data")
636 def test__decrypt_value_schema_version_1_1_secret_key_exists_without_salt(
637 self,
638 mock_unpad_data,
639 mock_join_secret_key,
640 mock_aes,
641 mock_b64_decode,
642 ):
643 """schema_version 1.1, secret_key exists, salt is None."""
644 salt = None
645 mock_aes.new.return_value = self.mock_cipher
646 self.mock_cipher.decrypt.return_value = padded_encoded_value
647
648 mock_b64_decode.return_value = b64_decoded
649
650 mock_unpad_data.return_value = value
651
652 result = self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
653 self.assertEqual(result, value)
654
655 mock_join_secret_key.assert_called_once_with(salt)
656 _call_mock_aes_new = mock_aes.new.call_args_list[0].args
657 self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
658 mock_unpad_data.assert_called_once_with("private key txt\0")
659 mock_b64_decode.assert_called_once_with(encyrpted_value)
660 self.mock_cipher.decrypt.assert_called_once_with(b64_decoded)
661
662 @patch("osm_common.dbbase.b64decode")
663 @patch("osm_common.dbbase.AES")
664 @patch.object(DbBase, "_join_secret_key")
665 @patch.object(DbBase, "unpad_data")
666 def test__decrypt_value_schema_version_1_1_secret_key_exists_with_salt(
667 self,
668 mock_unpad_data,
669 mock_join_secret_key,
670 mock_aes,
671 mock_b64_decode,
672 ):
673 """schema_version 1.1, secret_key exists, salt is None."""
674 mock_aes.new.return_value = self.mock_cipher
675 self.mock_cipher.decrypt.return_value = padded_encoded_value
676
677 mock_b64_decode.return_value = b64_decoded
678
679 mock_unpad_data.return_value = value
680
681 result = self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
682 self.assertEqual(result, value)
683
684 mock_join_secret_key.assert_called_once_with(salt)
685 _call_mock_aes_new = mock_aes.new.call_args_list[0].args
686 self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
687 mock_unpad_data.assert_called_once_with("private key txt\0")
688 mock_b64_decode.assert_called_once_with(encyrpted_value)
689 self.mock_cipher.decrypt.assert_called_once_with(b64_decoded)
690
691 @patch("osm_common.dbbase.b64decode")
692 @patch("osm_common.dbbase.AES")
693 @patch.object(DbBase, "_join_secret_key")
694 @patch.object(DbBase, "unpad_data")
695 def test__decrypt_value_schema_version_1_1_without_secret_key(
696 self,
697 mock_unpad_data,
698 mock_join_secret_key,
699 mock_aes,
700 mock_b64_decode,
701 ):
702 """schema_version 1.1, secret_key is None, salt exists."""
703 self.db_base.secret_key = None
704
705 result = self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
706
707 self.assertEqual(result, encyrpted_value)
708 check_if_assert_not_called(
709 [
710 mock_join_secret_key,
711 mock_aes.new,
712 mock_unpad_data,
713 mock_b64_decode,
714 self.mock_cipher.decrypt,
715 ]
716 )
717
718 @patch("osm_common.dbbase.b64decode")
719 @patch("osm_common.dbbase.AES")
720 @patch.object(DbBase, "_join_secret_key")
721 @patch.object(DbBase, "unpad_data")
722 def test__decrypt_value_schema_version_1_0_with_secret_key(
723 self,
724 mock_unpad_data,
725 mock_join_secret_key,
726 mock_aes,
727 mock_b64_decode,
728 ):
729 """schema_version 1.0, secret_key exists, salt exists."""
730 schema_version = "1.0"
731 result = self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
732
733 self.assertEqual(result, encyrpted_value)
734 check_if_assert_not_called(
735 [
736 mock_join_secret_key,
737 mock_aes.new,
738 mock_unpad_data,
739 mock_b64_decode,
740 self.mock_cipher.decrypt,
741 ]
742 )
743
744 @patch("osm_common.dbbase.b64decode")
745 @patch("osm_common.dbbase.AES")
746 @patch.object(DbBase, "_join_secret_key")
747 @patch.object(DbBase, "unpad_data")
748 def test__decrypt_value_join_secret_key_raises(
749 self,
750 mock_unpad_data,
751 mock_join_secret_key,
752 mock_aes,
753 mock_b64_decode,
754 ):
755 """_join_secret_key raises TypeError."""
756 salt = object()
757 mock_join_secret_key.side_effect = TypeError("'type' object is not iterable")
758
759 with self.assertRaises(Exception) as error:
760 self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
761 self.assertEqual(str(error.exception), "'type' object is not iterable")
762
763 mock_join_secret_key.assert_called_once_with(salt)
764 check_if_assert_not_called(
765 [mock_aes.new, mock_unpad_data, mock_b64_decode, self.mock_cipher.decrypt]
766 )
767
768 @patch("osm_common.dbbase.b64decode")
769 @patch("osm_common.dbbase.AES")
770 @patch.object(DbBase, "_join_secret_key")
771 @patch.object(DbBase, "unpad_data")
772 def test__decrypt_value_b64decode_raises(
773 self,
774 mock_unpad_data,
775 mock_join_secret_key,
776 mock_aes,
777 mock_b64_decode,
778 ):
779 """b64decode raises TypeError."""
780 mock_b64_decode.side_effect = TypeError(
781 "A str-like object is required, not 'bytes'"
782 )
783 with self.assertRaises(Exception) as error:
784 self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
785 self.assertEqual(
786 str(error.exception), "A str-like object is required, not 'bytes'"
787 )
788
789 mock_b64_decode.assert_called_once_with(encyrpted_value)
790 mock_join_secret_key.assert_called_once_with(salt)
791 check_if_assert_not_called(
792 [mock_aes.new, self.mock_cipher.decrypt, mock_unpad_data]
793 )
794
795 @patch("osm_common.dbbase.b64decode")
796 @patch("osm_common.dbbase.AES")
797 @patch.object(DbBase, "_join_secret_key")
798 @patch.object(DbBase, "unpad_data")
799 def test__decrypt_value_invalid_encrypt_mode(
800 self,
801 mock_unpad_data,
802 mock_join_secret_key,
803 mock_aes,
804 mock_b64_decode,
805 ):
806 """Invalid AES encrypt mode."""
807 mock_aes.new.side_effect = Exception("Invalid ciphering mode.")
808 self.db_base.encrypt_mode = "AES.MODE_XXX"
809
810 mock_b64_decode.return_value = b64_decoded
811 with self.assertRaises(Exception) as error:
812 self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
813
814 self.assertEqual(str(error.exception), "Invalid ciphering mode.")
815 mock_join_secret_key.assert_called_once_with(salt)
816 _call_mock_aes_new = mock_aes.new.call_args_list[0].args
817 self.assertEqual(_call_mock_aes_new[1], "AES.MODE_XXX")
818 mock_b64_decode.assert_called_once_with(encyrpted_value)
819 check_if_assert_not_called([mock_unpad_data, self.mock_cipher.decrypt])
820
821 @patch("osm_common.dbbase.b64decode")
822 @patch("osm_common.dbbase.AES")
823 @patch.object(DbBase, "_join_secret_key")
824 @patch.object(DbBase, "unpad_data")
825 def test__decrypt_value_cipher_decrypt_raises(
826 self,
827 mock_unpad_data,
828 mock_join_secret_key,
829 mock_aes,
830 mock_b64_decode,
831 ):
832 """AES decrypt raises Exception."""
833 mock_b64_decode.return_value = b64_decoded
834
835 mock_aes.new.return_value = self.mock_cipher
836 self.mock_cipher.decrypt.side_effect = Exception("Invalid data type.")
837
838 with self.assertRaises(Exception) as error:
839 self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
840 self.assertEqual(str(error.exception), "Invalid data type.")
841
842 mock_join_secret_key.assert_called_once_with(salt)
843 _call_mock_aes_new = mock_aes.new.call_args_list[0].args
844 self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
845 mock_b64_decode.assert_called_once_with(encyrpted_value)
846 self.mock_cipher.decrypt.assert_called_once_with(b64_decoded)
847 mock_unpad_data.assert_not_called()
848
849 @patch("osm_common.dbbase.b64decode")
850 @patch("osm_common.dbbase.AES")
851 @patch.object(DbBase, "_join_secret_key")
852 @patch.object(DbBase, "unpad_data")
853 def test__decrypt_value_decode_raises(
854 self,
855 mock_unpad_data,
856 mock_join_secret_key,
857 mock_aes,
858 mock_b64_decode,
859 ):
860 """Decode raises UnicodeDecodeError."""
861 mock_aes.new.return_value = self.mock_cipher
862 self.mock_cipher.decrypt.return_value = b"\xd0\x000091"
863
864 mock_b64_decode.return_value = b64_decoded
865
866 mock_unpad_data.return_value = value
867 with self.assertRaises(Exception) as error:
868 self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
869 self.assertEqual(
870 str(error.exception),
871 "database exception Cannot decrypt information. Are you using same COMMONKEY in all OSM components?",
872 )
873 self.assertEqual(type(error.exception), DbException)
874 mock_join_secret_key.assert_called_once_with(salt)
875 _call_mock_aes_new = mock_aes.new.call_args_list[0].args
876 self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
877 mock_b64_decode.assert_called_once_with(encyrpted_value)
878 self.mock_cipher.decrypt.assert_called_once_with(b64_decoded)
879 mock_unpad_data.assert_not_called()
880
881 @patch("osm_common.dbbase.b64decode")
882 @patch("osm_common.dbbase.AES")
883 @patch.object(DbBase, "_join_secret_key")
884 @patch.object(DbBase, "unpad_data")
885 def test__decrypt_value_unpad_data_raises(
886 self,
887 mock_unpad_data,
888 mock_join_secret_key,
889 mock_aes,
890 mock_b64_decode,
891 ):
892 """Method unpad_data raises error."""
893 mock_decrypted_message = MagicMock()
894 mock_decrypted_message.decode.return_value = None
895 mock_aes.new.return_value = self.mock_cipher
896 self.mock_cipher.decrypt.return_value = mock_decrypted_message
897 mock_unpad_data.side_effect = DbException(
898 "Incorrect data type: type(None), string is expected."
899 )
900 mock_b64_decode.return_value = b64_decoded
901
902 with self.assertRaises(Exception) as error:
903 self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
904 self.assertEqual(
905 str(error.exception),
906 "database exception Incorrect data type: type(None), string is expected.",
907 )
908 self.assertEqual(type(error.exception), DbException)
909 mock_join_secret_key.assert_called_once_with(salt)
910 _call_mock_aes_new = mock_aes.new.call_args_list[0].args
911 self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
912 mock_b64_decode.assert_called_once_with(encyrpted_value)
913 self.mock_cipher.decrypt.assert_called_once_with(b64_decoded)
914 mock_decrypted_message.decode.assert_called_once_with(
915 self.db_base.encoding_type
916 )
917 mock_unpad_data.assert_called_once_with(None)
918
919 @patch.object(DbBase, "get_secret_key")
920 @patch.object(DbBase, "_decrypt_value")
921 def test_decrypt_without_schema_version_without_salt(
922 self, mock_decrypt_value, mock_get_secret_key
923 ):
924 """schema_version is None, salt is None."""
925 mock_decrypt_value.return_value = encyrpted_value
926 result = self.db_base.decrypt(value)
927 mock_decrypt_value.assert_called_once_with(value, None, None)
928 mock_get_secret_key.assert_called_once()
929 self.assertEqual(result, encyrpted_value)
930
931 @patch.object(DbBase, "get_secret_key")
932 @patch.object(DbBase, "_decrypt_value")
933 def test_decrypt_with_schema_version_with_salt(
934 self, mock_decrypt_value, mock_get_secret_key
935 ):
936 """schema_version and salt exist."""
937 mock_decrypt_value.return_value = encyrpted_value
938 result = self.db_base.decrypt(value, schema_version, salt)
939 mock_decrypt_value.assert_called_once_with(value, schema_version, salt)
940 mock_get_secret_key.assert_called_once()
941 self.assertEqual(result, encyrpted_value)
942
943 @patch.object(DbBase, "get_secret_key")
944 @patch.object(DbBase, "_decrypt_value")
945 def test_decrypt_get_secret_key_raises(
946 self, mock_decrypt_value, mock_get_secret_key
947 ):
948 """Method get_secret_key raises KeyError."""
949 mock_get_secret_key.side_effect = DbException("KeyError")
950 with self.assertRaises(Exception) as error:
951 self.db_base.decrypt(value)
952 self.assertEqual(str(error.exception), "database exception KeyError")
953 mock_decrypt_value.assert_not_called()
954 mock_get_secret_key.assert_called_once()
955
956 @patch.object(DbBase, "get_secret_key")
957 @patch.object(DbBase, "_decrypt_value")
958 def test_decrypt_decrypt_value_raises(
959 self, mock_decrypt_value, mock_get_secret_key
960 ):
961 """Method _decrypt raises error."""
962 mock_decrypt_value.side_effect = DbException(
963 "Incorrect data type: type(None), string is expected."
964 )
965 with self.assertRaises(Exception) as error:
966 self.db_base.decrypt(value, schema_version, salt)
967 self.assertEqual(
968 str(error.exception),
969 "database exception Incorrect data type: type(None), string is expected.",
970 )
971 mock_decrypt_value.assert_called_once_with(value, schema_version, salt)
972 mock_get_secret_key.assert_called_once()
973
974 def test_encrypt_decrypt_with_schema_version_1_1_with_salt(self):
975 """Encrypt and decrypt with schema version 1.1, salt exists."""
976 encrypted_msg = self.db_base.encrypt(value, schema_version, salt)
977 decrypted_msg = self.db_base.decrypt(encrypted_msg, schema_version, salt)
978 self.assertEqual(value, decrypted_msg)
979
980 def test_encrypt_decrypt_with_schema_version_1_0_with_salt(self):
981 """Encrypt and decrypt with schema version 1.0, salt exists."""
982 schema_version = "1.0"
983 encrypted_msg = self.db_base.encrypt(value, schema_version, salt)
984 decrypted_msg = self.db_base.decrypt(encrypted_msg, schema_version, salt)
985 self.assertEqual(value, decrypted_msg)
986
987 def test_encrypt_decrypt_with_schema_version_1_1_without_salt(self):
988 """Encrypt and decrypt with schema version 1.1 and without salt."""
989 salt = None
990 encrypted_msg = self.db_base.encrypt(value, schema_version, salt)
991 decrypted_msg = self.db_base.decrypt(encrypted_msg, schema_version, salt)
992 self.assertEqual(value, decrypted_msg)
993
994
995class TestAsyncEncryption(unittest.TestCase):
996 @patch("logging.getLogger", autospec=True)
997 def setUp(self, mock_logger):
998 mock_logger = logging.getLogger()
999 mock_logger.disabled = True
Gulsum Atici76394ef2023-01-09 23:19:18 +03001000 self.encryption = Encryption(uri="uri", config={})
1001 self.encryption.encoding_type = encoding_type
1002 self.encryption.encrypt_mode = encyrpt_mode
1003 self.encryption._secret_key = secret_key
1004 self.admin_collection = Mock()
1005 self.admin_collection.find_one = AsyncMock()
1006 self.encryption._client = {
1007 "osm": {
1008 "admin": self.admin_collection,
1009 }
1010 }
1011
1012 @patch.object(Encryption, "decrypt", new_callable=AsyncMock)
1013 def test_decrypt_fields_with_item_with_fields(self, mock_decrypt):
1014 """item and fields exist."""
1015 mock_decrypt.side_effect = [decrypted_val1, decrypted_val2]
1016 input_item = copy.deepcopy(item)
1017 expected_item = {
1018 "secret": decrypted_val1,
1019 "cacert": decrypted_val2,
1020 "path": "/var",
1021 "ip": "192.168.12.23",
1022 }
1023 fields = ["secret", "cacert"]
Gulsum Aticia06b8542023-05-09 13:42:13 +03001024
1025 asyncio.run(
Gulsum Atici76394ef2023-01-09 23:19:18 +03001026 self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
1027 )
1028 self.assertEqual(input_item, expected_item)
1029 _call_mock_decrypt = mock_decrypt.call_args_list
1030 self.assertEqual(_call_mock_decrypt[0].args, ("mysecret", "1.1", salt))
1031 self.assertEqual(_call_mock_decrypt[1].args, ("mycacert", "1.1", salt))
1032
1033 @patch.object(Encryption, "decrypt", new_callable=AsyncMock)
1034 def test_decrypt_fields_empty_item_with_fields(self, mock_decrypt):
1035 """item is empty and fields exists."""
1036 input_item = {}
1037 fields = ["secret", "cacert"]
Gulsum Aticia06b8542023-05-09 13:42:13 +03001038 asyncio.run(
Gulsum Atici76394ef2023-01-09 23:19:18 +03001039 self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
1040 )
1041 self.assertEqual(input_item, {})
1042 mock_decrypt.assert_not_called()
1043
1044 @patch.object(Encryption, "decrypt", new_callable=AsyncMock)
1045 def test_decrypt_fields_with_item_without_fields(self, mock_decrypt):
1046 """item exists and fields is empty."""
1047 input_item = copy.deepcopy(item)
1048 fields = []
Gulsum Aticia06b8542023-05-09 13:42:13 +03001049 asyncio.run(
Gulsum Atici76394ef2023-01-09 23:19:18 +03001050 self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
1051 )
1052 self.assertEqual(input_item, item)
1053 mock_decrypt.assert_not_called()
1054
1055 @patch.object(Encryption, "decrypt", new_callable=AsyncMock)
1056 def test_decrypt_fields_with_item_with_single_field(self, mock_decrypt):
1057 """item exists and field has single value."""
1058 mock_decrypt.return_value = decrypted_val1
1059 fields = ["secret"]
1060 input_item = copy.deepcopy(item)
1061 expected_item = {
1062 "secret": decrypted_val1,
1063 "cacert": "mycacert",
1064 "path": "/var",
1065 "ip": "192.168.12.23",
1066 }
Gulsum Aticia06b8542023-05-09 13:42:13 +03001067 asyncio.run(
Gulsum Atici76394ef2023-01-09 23:19:18 +03001068 self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
1069 )
1070 self.assertEqual(input_item, expected_item)
1071 _call_mock_decrypt = mock_decrypt.call_args_list
1072 self.assertEqual(_call_mock_decrypt[0].args, ("mysecret", "1.1", salt))
1073
1074 @patch.object(Encryption, "decrypt", new_callable=AsyncMock)
1075 def test_decrypt_fields_with_item_with_field_none_salt_1_0_schema_version(
1076 self, mock_decrypt
1077 ):
1078 """item exists and field has single value, salt is None, schema version is 1.0."""
1079 schema_version = "1.0"
1080 salt = None
1081 mock_decrypt.return_value = "mysecret"
1082 input_item = copy.deepcopy(item)
1083 fields = ["secret"]
Gulsum Aticia06b8542023-05-09 13:42:13 +03001084 asyncio.run(
Gulsum Atici76394ef2023-01-09 23:19:18 +03001085 self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
1086 )
1087 self.assertEqual(input_item, item)
1088 _call_mock_decrypt = mock_decrypt.call_args_list
1089 self.assertEqual(_call_mock_decrypt[0].args, ("mysecret", "1.0", None))
1090
1091 @patch.object(Encryption, "decrypt", new_callable=AsyncMock)
1092 def test_decrypt_fields_decrypt_raises(self, mock_decrypt):
1093 """Method decrypt raises error."""
1094 mock_decrypt.side_effect = DbException(
1095 "Incorrect data type: type(None), string is expected."
1096 )
1097 fields = ["secret"]
1098 input_item = copy.deepcopy(item)
1099 with self.assertRaises(Exception) as error:
Gulsum Aticia06b8542023-05-09 13:42:13 +03001100 asyncio.run(
Gulsum Atici76394ef2023-01-09 23:19:18 +03001101 self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
1102 )
1103 self.assertEqual(
1104 str(error.exception),
1105 "database exception Incorrect data type: type(None), string is expected.",
1106 )
1107 self.assertEqual(input_item, item)
1108 _call_mock_decrypt = mock_decrypt.call_args_list
1109 self.assertEqual(_call_mock_decrypt[0].args, ("mysecret", "1.1", salt))
1110
1111 @patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
1112 @patch.object(Encryption, "_encrypt_value")
1113 def test_encrypt(self, mock_encrypt_value, mock_get_secret_key):
1114 """Method decrypt raises error."""
1115 mock_encrypt_value.return_value = encyrpted_value
Gulsum Aticia06b8542023-05-09 13:42:13 +03001116 result = asyncio.run(self.encryption.encrypt(value, schema_version, salt))
Gulsum Atici76394ef2023-01-09 23:19:18 +03001117 self.assertEqual(result, encyrpted_value)
1118 mock_get_secret_key.assert_called_once()
1119 mock_encrypt_value.assert_called_once_with(value, schema_version, salt)
1120
1121 @patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
1122 @patch.object(Encryption, "_encrypt_value")
1123 def test_encrypt_get_secret_key_raises(
1124 self, mock_encrypt_value, mock_get_secret_key
1125 ):
1126 """Method get_secret_key raises error."""
1127 mock_get_secret_key.side_effect = DbException("Unexpected type.")
1128 with self.assertRaises(Exception) as error:
Gulsum Aticia06b8542023-05-09 13:42:13 +03001129 asyncio.run(self.encryption.encrypt(value, schema_version, salt))
Gulsum Atici76394ef2023-01-09 23:19:18 +03001130 self.assertEqual(str(error.exception), "database exception Unexpected type.")
1131 mock_get_secret_key.assert_called_once()
1132 mock_encrypt_value.assert_not_called()
1133
1134 @patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
1135 @patch.object(Encryption, "_encrypt_value")
1136 def test_encrypt_get_encrypt_raises(self, mock_encrypt_value, mock_get_secret_key):
1137 """Method _encrypt raises error."""
1138 mock_encrypt_value.side_effect = TypeError(
1139 "A bytes-like object is required, not 'str'"
1140 )
1141 with self.assertRaises(Exception) as error:
Gulsum Aticia06b8542023-05-09 13:42:13 +03001142 asyncio.run(self.encryption.encrypt(value, schema_version, salt))
Gulsum Atici76394ef2023-01-09 23:19:18 +03001143 self.assertEqual(
1144 str(error.exception), "A bytes-like object is required, not 'str'"
1145 )
1146 mock_get_secret_key.assert_called_once()
1147 mock_encrypt_value.assert_called_once_with(value, schema_version, salt)
1148
1149 @patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
1150 @patch.object(Encryption, "_decrypt_value")
1151 def test_decrypt(self, mock_decrypt_value, mock_get_secret_key):
1152 """Decrypted successfully."""
1153 mock_decrypt_value.return_value = value
Gulsum Aticia06b8542023-05-09 13:42:13 +03001154 result = asyncio.run(
Gulsum Atici76394ef2023-01-09 23:19:18 +03001155 self.encryption.decrypt(encyrpted_value, schema_version, salt)
1156 )
1157 self.assertEqual(result, value)
1158 mock_get_secret_key.assert_called_once()
1159 mock_decrypt_value.assert_called_once_with(
1160 encyrpted_value, schema_version, salt
1161 )
1162
1163 @patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
1164 @patch.object(Encryption, "_decrypt_value")
1165 def test_decrypt_get_secret_key_raises(
1166 self, mock_decrypt_value, mock_get_secret_key
1167 ):
1168 """Method get_secret_key raises error."""
1169 mock_get_secret_key.side_effect = DbException("Unexpected type.")
1170 with self.assertRaises(Exception) as error:
Gulsum Aticia06b8542023-05-09 13:42:13 +03001171 asyncio.run(self.encryption.decrypt(encyrpted_value, schema_version, salt))
Gulsum Atici76394ef2023-01-09 23:19:18 +03001172 self.assertEqual(str(error.exception), "database exception Unexpected type.")
1173 mock_get_secret_key.assert_called_once()
1174 mock_decrypt_value.assert_not_called()
1175
1176 @patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
1177 @patch.object(Encryption, "_decrypt_value")
1178 def test_decrypt_decrypt_value_raises(
1179 self, mock_decrypt_value, mock_get_secret_key
1180 ):
1181 """Method get_secret_key raises error."""
1182 mock_decrypt_value.side_effect = TypeError(
1183 "A bytes-like object is required, not 'str'"
1184 )
1185 with self.assertRaises(Exception) as error:
Gulsum Aticia06b8542023-05-09 13:42:13 +03001186 asyncio.run(self.encryption.decrypt(encyrpted_value, schema_version, salt))
Gulsum Atici76394ef2023-01-09 23:19:18 +03001187 self.assertEqual(
1188 str(error.exception), "A bytes-like object is required, not 'str'"
1189 )
1190 mock_get_secret_key.assert_called_once()
1191 mock_decrypt_value.assert_called_once_with(
1192 encyrpted_value, schema_version, salt
1193 )
1194
1195 def test_join_keys_string_key(self):
1196 """key is string."""
1197 string_key = "sample key"
1198 result = self.encryption._join_keys(string_key, secret_key)
1199 self.assertEqual(result, joined_key)
1200 self.assertTrue(isinstance(result, bytes))
1201
1202 def test_join_keys_bytes_key(self):
1203 """key is bytes."""
1204 bytes_key = b"sample key"
1205 result = self.encryption._join_keys(bytes_key, secret_key)
1206 self.assertEqual(result, joined_key)
1207 self.assertTrue(isinstance(result, bytes))
1208 self.assertEqual(len(result.decode("unicode_escape")), 32)
1209
1210 def test_join_keys_int_key(self):
1211 """key is int."""
1212 int_key = 923
1213 with self.assertRaises(Exception) as error:
1214 self.encryption._join_keys(int_key, None)
1215 self.assertEqual(str(error.exception), "'int' object is not iterable")
1216
1217 def test_join_keys_none_secret_key(self):
1218 """key is as bytes and secret key is None."""
1219 bytes_key = b"sample key"
1220 result = self.encryption._join_keys(bytes_key, None)
1221 self.assertEqual(
1222 result,
1223 b"sample key\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
1224 )
1225 self.assertTrue(isinstance(result, bytes))
1226 self.assertEqual(len(result.decode("unicode_escape")), 32)
1227
1228 def test_join_keys_none_key_none_secret_key(self):
1229 """key is None and secret key is None."""
1230 with self.assertRaises(Exception) as error:
1231 self.encryption._join_keys(None, None)
1232 self.assertEqual(str(error.exception), "'NoneType' object is not iterable")
1233
1234 def test_join_keys_none_key(self):
1235 """key is None and secret key exists."""
1236 with self.assertRaises(Exception) as error:
1237 self.encryption._join_keys(None, secret_key)
1238 self.assertEqual(str(error.exception), "'NoneType' object is not iterable")
1239
1240 @patch.object(Encryption, "_join_keys")
1241 def test_join_secret_key_string_sample_key(self, mock_join_keys):
1242 """key is None and secret key exists as string."""
1243 update_key = "sample key"
1244 mock_join_keys.return_value = joined_key
1245 result = self.encryption._join_secret_key(update_key)
1246 self.assertEqual(result, joined_key)
1247 self.assertTrue(isinstance(result, bytes))
1248 mock_join_keys.assert_called_once_with(update_key, secret_key)
1249
1250 @patch.object(Encryption, "_join_keys")
1251 def test_join_secret_key_byte_sample_key(self, mock_join_keys):
1252 """key is None and secret key exists as bytes."""
1253 update_key = b"sample key"
1254 mock_join_keys.return_value = joined_key
1255 result = self.encryption._join_secret_key(update_key)
1256 self.assertEqual(result, joined_key)
1257 self.assertTrue(isinstance(result, bytes))
1258 mock_join_keys.assert_called_once_with(update_key, secret_key)
1259
1260 @patch.object(Encryption, "_join_keys")
1261 def test_join_secret_key_join_keys_raises(self, mock_join_keys):
1262 """Method _join_secret_key raises."""
1263 update_key = 3434
1264 mock_join_keys.side_effect = TypeError("'int' object is not iterable")
1265 with self.assertRaises(Exception) as error:
1266 self.encryption._join_secret_key(update_key)
1267 self.assertEqual(str(error.exception), "'int' object is not iterable")
1268 mock_join_keys.assert_called_once_with(update_key, secret_key)
1269
1270 @patch.object(Encryption, "_join_keys")
1271 def test_get_secret_key_exists(self, mock_join_keys):
1272 """secret_key exists."""
1273 self.encryption._secret_key = secret_key
Gulsum Aticia06b8542023-05-09 13:42:13 +03001274 asyncio.run(self.encryption.get_secret_key())
Gulsum Atici76394ef2023-01-09 23:19:18 +03001275 self.assertEqual(self.encryption.secret_key, secret_key)
1276 mock_join_keys.assert_not_called()
1277
1278 @patch.object(Encryption, "_join_keys")
1279 @patch("osm_common.dbbase.b64decode")
1280 def test_get_secret_key_not_exist_database_key_exist(
1281 self, mock_b64decode, mock_join_keys
1282 ):
1283 """secret_key does not exist, database key exists."""
1284 self.encryption._secret_key = None
1285 self.encryption._admin_collection.find_one.return_value = None
1286 self.encryption._config = {"database_commonkey": "osm_new_key"}
1287 mock_join_keys.return_value = joined_key
Gulsum Aticia06b8542023-05-09 13:42:13 +03001288 asyncio.run(self.encryption.get_secret_key())
Gulsum Atici76394ef2023-01-09 23:19:18 +03001289 self.assertEqual(self.encryption.secret_key, joined_key)
1290 self.assertEqual(mock_join_keys.call_count, 1)
1291 mock_b64decode.assert_not_called()
1292
1293 @patch.object(Encryption, "_join_keys")
1294 @patch("osm_common.dbbase.b64decode")
1295 def test_get_secret_key_not_exist_with_database_key_version_data_exist_without_serial(
1296 self, mock_b64decode, mock_join_keys
1297 ):
1298 """secret_key does not exist, database key exists."""
1299 self.encryption._secret_key = None
1300 self.encryption._admin_collection.find_one.return_value = {"version": "1.0"}
1301 self.encryption._config = {"database_commonkey": "osm_new_key"}
1302 mock_join_keys.return_value = joined_key
Gulsum Aticia06b8542023-05-09 13:42:13 +03001303 asyncio.run(self.encryption.get_secret_key())
Gulsum Atici76394ef2023-01-09 23:19:18 +03001304 self.assertEqual(self.encryption.secret_key, joined_key)
1305 self.assertEqual(mock_join_keys.call_count, 1)
1306 mock_b64decode.assert_not_called()
1307 self.encryption._admin_collection.find_one.assert_called_once_with(
1308 {"_id": "version"}
1309 )
1310 _call_mock_join_keys = mock_join_keys.call_args_list
1311 self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))
1312
1313 @patch.object(Encryption, "_join_keys")
1314 @patch("osm_common.dbbase.b64decode")
1315 def test_get_secret_key_not_exist_with_database_key_version_data_exist_with_serial(
1316 self, mock_b64decode, mock_join_keys
1317 ):
1318 """secret_key does not exist, database key exists, version and serial exist
1319 in admin collection."""
1320 self.encryption._secret_key = None
1321 self.encryption._admin_collection.find_one.return_value = {
1322 "version": "1.0",
1323 "serial": serial_bytes,
1324 }
1325 self.encryption._config = {"database_commonkey": "osm_new_key"}
1326 mock_join_keys.side_effect = [secret_key, joined_key]
1327 mock_b64decode.return_value = base64_decoded_serial
Gulsum Aticia06b8542023-05-09 13:42:13 +03001328 asyncio.run(self.encryption.get_secret_key())
Gulsum Atici76394ef2023-01-09 23:19:18 +03001329 self.assertEqual(self.encryption.secret_key, joined_key)
1330 self.assertEqual(mock_join_keys.call_count, 2)
1331 mock_b64decode.assert_called_once_with(serial_bytes)
1332 self.encryption._admin_collection.find_one.assert_called_once_with(
1333 {"_id": "version"}
1334 )
1335 _call_mock_join_keys = mock_join_keys.call_args_list
1336 self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))
1337 self.assertEqual(
1338 _call_mock_join_keys[1].args, (base64_decoded_serial, secret_key)
1339 )
1340
1341 @patch.object(Encryption, "_join_keys")
1342 @patch("osm_common.dbbase.b64decode")
1343 def test_get_secret_key_join_keys_raises(self, mock_b64decode, mock_join_keys):
1344 """Method _join_keys raises."""
1345 self.encryption._secret_key = None
1346 self.encryption._admin_collection.find_one.return_value = {
1347 "version": "1.0",
1348 "serial": serial_bytes,
1349 }
1350 self.encryption._config = {"database_commonkey": "osm_new_key"}
1351 mock_join_keys.side_effect = DbException("Invalid data type.")
1352 with self.assertRaises(Exception) as error:
Gulsum Aticia06b8542023-05-09 13:42:13 +03001353 asyncio.run(self.encryption.get_secret_key())
Gulsum Atici76394ef2023-01-09 23:19:18 +03001354 self.assertEqual(str(error.exception), "database exception Invalid data type.")
1355 self.assertEqual(mock_join_keys.call_count, 1)
1356 check_if_assert_not_called(
1357 [mock_b64decode, self.encryption._admin_collection.find_one]
1358 )
1359 _call_mock_join_keys = mock_join_keys.call_args_list
1360 self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))
1361
1362 @patch.object(Encryption, "_join_keys")
1363 @patch("osm_common.dbbase.b64decode")
1364 def test_get_secret_key_b64decode_raises(self, mock_b64decode, mock_join_keys):
1365 """Method b64decode raises."""
1366 self.encryption._secret_key = None
1367 self.encryption._admin_collection.find_one.return_value = {
1368 "version": "1.0",
1369 "serial": base64_decoded_serial,
1370 }
1371 self.encryption._config = {"database_commonkey": "osm_new_key"}
1372 mock_join_keys.return_value = secret_key
1373 mock_b64decode.side_effect = TypeError(
1374 "A bytes-like object is required, not 'str'"
1375 )
1376 with self.assertRaises(Exception) as error:
Gulsum Aticia06b8542023-05-09 13:42:13 +03001377 asyncio.run(self.encryption.get_secret_key())
Gulsum Atici76394ef2023-01-09 23:19:18 +03001378 self.assertEqual(
1379 str(error.exception), "A bytes-like object is required, not 'str'"
1380 )
1381 self.assertEqual(self.encryption.secret_key, None)
1382 self.assertEqual(mock_join_keys.call_count, 1)
1383 mock_b64decode.assert_called_once_with(base64_decoded_serial)
1384 self.encryption._admin_collection.find_one.assert_called_once_with(
1385 {"_id": "version"}
1386 )
1387 _call_mock_join_keys = mock_join_keys.call_args_list
1388 self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))
1389
1390 @patch.object(Encryption, "_join_keys")
1391 @patch("osm_common.dbbase.b64decode")
1392 def test_get_secret_key_admin_collection_find_one_raises(
1393 self, mock_b64decode, mock_join_keys
1394 ):
1395 """admin_collection find_one raises."""
1396 self.encryption._secret_key = None
1397 self.encryption._admin_collection.find_one.side_effect = DbException(
1398 "Connection failed."
1399 )
1400 self.encryption._config = {"database_commonkey": "osm_new_key"}
1401 mock_join_keys.return_value = secret_key
1402 with self.assertRaises(Exception) as error:
Gulsum Aticia06b8542023-05-09 13:42:13 +03001403 asyncio.run(self.encryption.get_secret_key())
Gulsum Atici76394ef2023-01-09 23:19:18 +03001404 self.assertEqual(str(error.exception), "database exception Connection failed.")
1405 self.assertEqual(self.encryption.secret_key, None)
1406 self.assertEqual(mock_join_keys.call_count, 1)
1407 mock_b64decode.assert_not_called()
1408 self.encryption._admin_collection.find_one.assert_called_once_with(
1409 {"_id": "version"}
1410 )
1411 _call_mock_join_keys = mock_join_keys.call_args_list
1412 self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))
1413
1414 def test_encrypt_decrypt_with_schema_version_1_1_with_salt(self):
1415 """Encrypt and decrypt with schema version 1.1, salt exists."""
Gulsum Aticia06b8542023-05-09 13:42:13 +03001416 encrypted_msg = asyncio.run(
Gulsum Atici76394ef2023-01-09 23:19:18 +03001417 self.encryption.encrypt(value, schema_version, salt)
1418 )
Gulsum Aticia06b8542023-05-09 13:42:13 +03001419 decrypted_msg = asyncio.run(
Gulsum Atici76394ef2023-01-09 23:19:18 +03001420 self.encryption.decrypt(encrypted_msg, schema_version, salt)
1421 )
1422 self.assertEqual(value, decrypted_msg)
1423
1424 def test_encrypt_decrypt_with_schema_version_1_0_with_salt(self):
1425 """Encrypt and decrypt with schema version 1.0, salt exists."""
1426 schema_version = "1.0"
Gulsum Aticia06b8542023-05-09 13:42:13 +03001427 encrypted_msg = asyncio.run(
Gulsum Atici76394ef2023-01-09 23:19:18 +03001428 self.encryption.encrypt(value, schema_version, salt)
1429 )
Gulsum Aticia06b8542023-05-09 13:42:13 +03001430 decrypted_msg = asyncio.run(
Gulsum Atici76394ef2023-01-09 23:19:18 +03001431 self.encryption.decrypt(encrypted_msg, schema_version, salt)
1432 )
1433 self.assertEqual(value, decrypted_msg)
1434
1435 def test_encrypt_decrypt_with_schema_version_1_1_without_salt(self):
1436 """Encrypt and decrypt with schema version 1.1, without salt."""
1437 salt = None
1438 with self.assertRaises(Exception) as error:
Gulsum Aticia06b8542023-05-09 13:42:13 +03001439 asyncio.run(self.encryption.encrypt(value, schema_version, salt))
Gulsum Atici76394ef2023-01-09 23:19:18 +03001440 self.assertEqual(str(error.exception), "'NoneType' object is not iterable")
1441
1442
tiernob3e750b2018-09-05 11:25:23 +02001443class TestDeepUpdate(unittest.TestCase):
1444 def test_update_dict(self):
1445 # Original, patch, expected result
1446 TEST = (
1447 ({"a": "b"}, {"a": "c"}, {"a": "c"}),
1448 ({"a": "b"}, {"b": "c"}, {"a": "b", "b": "c"}),
1449 ({"a": "b"}, {"a": None}, {}),
1450 ({"a": "b", "b": "c"}, {"a": None}, {"b": "c"}),
1451 ({"a": ["b"]}, {"a": "c"}, {"a": "c"}),
1452 ({"a": "c"}, {"a": ["b"]}, {"a": ["b"]}),
1453 ({"a": {"b": "c"}}, {"a": {"b": "d", "c": None}}, {"a": {"b": "d"}}),
1454 ({"a": [{"b": "c"}]}, {"a": [1]}, {"a": [1]}),
1455 ({1: ["a", "b"]}, {1: ["c", "d"]}, {1: ["c", "d"]}),
1456 ({1: {"a": "b"}}, {1: ["c"]}, {1: ["c"]}),
1457 ({1: {"a": "foo"}}, {1: None}, {}),
1458 ({1: {"a": "foo"}}, {1: "bar"}, {1: "bar"}),
1459 ({"e": None}, {"a": 1}, {"e": None, "a": 1}),
1460 ({1: [1, 2]}, {1: {"a": "b", "c": None}}, {1: {"a": "b"}}),
1461 ({}, {"a": {"bb": {"ccc": None}}}, {"a": {"bb": {}}}),
1462 )
1463 for t in TEST:
1464 deep_update(t[0], t[1])
1465 self.assertEqual(t[0], t[2])
1466 # test deepcopy is done. So that original dictionary does not reference the pach
1467 test_original = {1: {"a": "b"}}
1468 test_patch = {1: {"c": {"d": "e"}}}
1469 test_result = {1: {"a": "b", "c": {"d": "e"}}}
1470 deep_update(test_original, test_patch)
1471 self.assertEqual(test_original, test_result)
1472 test_patch[1]["c"]["f"] = "edition of patch, must not modify original"
1473 self.assertEqual(test_original, test_result)
1474
1475 def test_update_array(self):
1476 # This TEST contains a list with the the Original, patch, and expected result
1477 TEST = (
1478 # delete all instances of "a"/"d"
1479 ({"A": ["a", "b", "a"]}, {"A": {"$a": None}}, {"A": ["b"]}),
1480 ({"A": ["a", "b", "a"]}, {"A": {"$d": None}}, {"A": ["a", "b", "a"]}),
1481 # delete and insert at 0
garciadeblas2644b762021-03-24 09:21:01 +01001482 (
1483 {"A": ["a", "b", "c"]},
1484 {"A": {"$b": None, "$+[0]": "b"}},
1485 {"A": ["b", "a", "c"]},
1486 ),
tiernob3e750b2018-09-05 11:25:23 +02001487 # delete and edit
garciadeblas2644b762021-03-24 09:21:01 +01001488 (
1489 {"A": ["a", "b", "a"]},
1490 {"A": {"$a": None, "$[1]": {"c": "d"}}},
1491 {"A": [{"c": "d"}]},
1492 ),
tiernob3e750b2018-09-05 11:25:23 +02001493 # insert if not exist
1494 ({"A": ["a", "b", "c"]}, {"A": {"$+b": "b"}}, {"A": ["a", "b", "c"]}),
1495 ({"A": ["a", "b", "c"]}, {"A": {"$+d": "f"}}, {"A": ["a", "b", "c", "f"]}),
1496 # edit by filter
garciadeblas2644b762021-03-24 09:21:01 +01001497 (
1498 {"A": ["a", "b", "a"]},
1499 {"A": {"$b": {"c": "d"}}},
1500 {"A": ["a", {"c": "d"}, "a"]},
1501 ),
1502 (
1503 {"A": ["a", "b", "a"]},
1504 {"A": {"$b": None, "$+[0]": "b", "$+": "c"}},
1505 {"A": ["b", "a", "a", "c"]},
1506 ),
tiernob3e750b2018-09-05 11:25:23 +02001507 ({"A": ["a", "b", "a"]}, {"A": {"$c": None}}, {"A": ["a", "b", "a"]}),
1508 # index deletion out of range
1509 ({"A": ["a", "b", "a"]}, {"A": {"$[5]": None}}, {"A": ["a", "b", "a"]}),
1510 # nested array->dict
garciadeblas2644b762021-03-24 09:21:01 +01001511 (
1512 {"A": ["a", "b", {"id": "1", "c": {"d": 2}}]},
1513 {"A": {"$id: '1'": {"h": None, "c": {"d": "e", "f": "g"}}}},
1514 {"A": ["a", "b", {"id": "1", "c": {"d": "e", "f": "g"}}]},
1515 ),
1516 (
1517 {"A": [{"id": 1, "c": {"d": 2}}, {"id": 1, "c": {"f": []}}]},
1518 {"A": {"$id: 1": {"h": None, "c": {"d": "e", "f": "g"}}}},
1519 {
1520 "A": [
1521 {"id": 1, "c": {"d": "e", "f": "g"}},
1522 {"id": 1, "c": {"d": "e", "f": "g"}},
1523 ]
1524 },
1525 ),
tiernob3e750b2018-09-05 11:25:23 +02001526 # nested array->array
garciadeblas2644b762021-03-24 09:21:01 +01001527 (
1528 {"A": ["a", "b", ["a", "b"]]},
1529 {"A": {"$b": None, "$[2]": {"$b": {}, "$+": "c"}}},
1530 {"A": ["a", ["a", {}, "c"]]},
1531 ),
tiernob3e750b2018-09-05 11:25:23 +02001532 # types str and int different, so not found
garciadeblas2644b762021-03-24 09:21:01 +01001533 (
1534 {"A": ["a", {"id": "1", "c": "d"}]},
1535 {"A": {"$id: 1": {"c": "e"}}},
1536 {"A": ["a", {"id": "1", "c": "d"}]},
1537 ),
tiernob3e750b2018-09-05 11:25:23 +02001538 )
1539 for t in TEST:
1540 print(t)
1541 deep_update(t[0], t[1])
1542 self.assertEqual(t[0], t[2])
1543
1544 def test_update_badformat(self):
1545 # This TEST contains original, incorrect patch and #TODO text that must be present
1546 TEST = (
1547 # conflict, index 0 is edited twice
1548 ({"A": ["a", "b", "a"]}, {"A": {"$a": None, "$[0]": {"c": "d"}}}),
1549 # conflict, two insertions at same index
1550 ({"A": ["a", "b", "a"]}, {"A": {"$[1]": "c", "$[-2]": "d"}}),
1551 ({"A": ["a", "b", "a"]}, {"A": {"$[1]": "c", "$[+1]": "d"}}),
1552 # bad format keys with and without $
1553 ({"A": ["a", "b", "a"]}, {"A": {"$b": {"c": "d"}, "c": 3}}),
1554 # bad format empty $ and yaml incorrect
1555 ({"A": ["a", "b", "a"]}, {"A": {"$": 3}}),
1556 ({"A": ["a", "b", "a"]}, {"A": {"$a: b: c": 3}}),
1557 ({"A": ["a", "b", "a"]}, {"A": {"$a: b, c: d": 3}}),
1558 # insertion of None
1559 ({"A": ["a", "b", "a"]}, {"A": {"$+": None}}),
1560 # Not found, insertion of None
1561 ({"A": ["a", "b", "a"]}, {"A": {"$+c": None}}),
1562 # index edition out of range
1563 ({"A": ["a", "b", "a"]}, {"A": {"$[5]": 6}}),
1564 # conflict, two editions on index 2
garciadeblas2644b762021-03-24 09:21:01 +01001565 (
1566 {"A": ["a", {"id": "1", "c": "d"}]},
1567 {"A": {"$id: '1'": {"c": "e"}, "$c: d": {"c": "f"}}},
1568 ),
tiernob3e750b2018-09-05 11:25:23 +02001569 )
1570 for t in TEST:
1571 print(t)
1572 self.assertRaises(DbException, deep_update, t[0], t[1])
1573 try:
1574 deep_update(t[0], t[1])
1575 except DbException as e:
1576 print(e)
tierno136f2952018-10-19 13:01:03 +02001577
1578
garciadeblas2644b762021-03-24 09:21:01 +01001579if __name__ == "__main__":
tierno136f2952018-10-19 13:01:03 +02001580 unittest.main()