1 # Copyright 2018 Whitestack, LLC
2 # Copyright 2018 Telefonica S.A.
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
16 # For those usages not covered by the Apache License, Version 2.0 please
17 # contact: esousa@whitestack.com or alfonso.tiernosepulveda@telefonica.com
21 from copy
import deepcopy
23 from http
import HTTPStatus
25 from os
import urandom
27 from unittest
.mock
import MagicMock
, Mock
, patch
29 from Crypto
.Cipher
import AES
30 from osm_common
.dbbase
import DbBase
, DbException
, deep_update
, Encryption
# Variables used in TestBaseEncryption and TestAsyncEncryption
# NOTE: several names below are misspelt ("encyrpt", "encyrpted", "b4"); they are
# referenced under exactly these spellings by the tests in this module.
salt = "1afd5d1a-4a7e-4d9c-8c65-251290183106"
value = "private key txt"  # 15-character plaintext
# The plaintext followed by a single NUL byte: 16 bytes in total.
padded_value = b"private key txt\0"
# Same 16 bytes as padded_value, with the NUL spelt "\x00".
padded_encoded_value = b"private key txt\x00"
encoding_type = "ascii"
encyrpt_mode = AES.MODE_ECB
# 32-byte key.
secret_key = b"\xeev\xc2\xb8\xb2#;Ek\xd0\xb5['\x04\xed\x1f\xb9?\xc5Ig\x80\xd5\x8d\x8aT\xd7\xf8Q\xe2u!"
# Base64 text of data_to_b4_encode, once as str and once as bytes.
encyrpted_value = "ZW5jcnlwdGVkIGRhdGE="
encyrpted_bytes = b"ZW5jcnlwdGVkIGRhdGE="
data_to_b4_encode = b"encrypted data"
# Stand-in returned by the mocked b64decode in the decrypt tests.
b64_decoded = b"decrypted data"
schema_version = "1.1"
# 32 bytes; differs from secret_key only in its first 10 bytes.
# NOTE(review): presumably secret_key combined with salt - confirm against DbBase.
joined_key = b"\x9d\x17\xaf\xc8\xdeF\x1b.\x0e\xa9\xb5['\x04\xed\x1f\xb9?\xc5Ig\x80\xd5\x8d\x8aT\xd7\xf8Q\xe2u!"
serial_bytes = b"\xf8\x96Z\x1c:}\xb5\xdf\x94\x8d\x0f\x807\xe6)\x8f\xf5!\xee}\xc2\xfa\xb3\t\xb9\xe4\r7\x19\x08\xa5b"
base64_decoded_serial = b"g\xbe\xdb"
decrypted_val1 = "BiV9YZEuSRAudqvz7Gs+bg=="
decrypted_val2 = "q4LwnFdoryzbZJM5mCAnpA=="
56 "ip": "192.168.12.23",
def exception_message(message):
    """Return ``message`` prefixed with the text ``"database exception "``.

    The tests in this module compare the result against ``str()`` of a
    raised ``DbException``.
    """
    prefix = "database exception "
    return prefix + message
69 def test_constructor():
71 assert db_base
is not None
72 assert isinstance(db_base
, DbBase
)
def test_db_connect(db_base):
    """Expect ``db_connect(None)`` on the ``db_base`` fixture to raise DbException.

    The exception text must start with the "not implemented" message for
    ``db_connect``.
    """
    expected_prefix = exception_message("Method 'db_connect' not implemented")
    with pytest.raises(DbException) as excinfo:
        db_base.db_connect(None)
    raised_text = str(excinfo.value)
    assert raised_text.startswith(expected_prefix)
def test_db_disconnect(db_base):
    """Call ``db_disconnect`` with no arguments; the test passes if nothing is raised."""
    db_base.db_disconnect()
def test_get_list(db_base):
    """Expect ``get_list(None, None)`` to raise DbException.

    The exception text must start with the "not implemented" message for
    ``get_list`` and its ``http_code`` must be ``NOT_FOUND``.
    """
    expected_prefix = exception_message("Method 'get_list' not implemented")
    with pytest.raises(DbException) as excinfo:
        db_base.get_list(None, None)
    raised = excinfo.value
    assert str(raised).startswith(expected_prefix)
    assert raised.http_code == http.HTTPStatus.NOT_FOUND
def test_get_one(db_base):
    """Expect ``get_one(None, None, None, None)`` to raise DbException.

    The exception text must start with the "not implemented" message for
    ``get_one`` and its ``http_code`` must be ``NOT_FOUND``.
    """
    expected_prefix = exception_message("Method 'get_one' not implemented")
    with pytest.raises(DbException) as excinfo:
        db_base.get_one(None, None, None, None)
    raised = excinfo.value
    assert str(raised).startswith(expected_prefix)
    assert raised.http_code == http.HTTPStatus.NOT_FOUND
def test_create(db_base):
    """Expect ``create(None, None)`` to raise DbException.

    The exception text must start with the "not implemented" message for
    ``create`` and its ``http_code`` must be ``NOT_FOUND``.
    """
    expected_prefix = exception_message("Method 'create' not implemented")
    with pytest.raises(DbException) as excinfo:
        db_base.create(None, None)
    raised = excinfo.value
    assert str(raised).startswith(expected_prefix)
    assert raised.http_code == http.HTTPStatus.NOT_FOUND
def test_create_list(db_base):
    """Expect ``create_list(None, None)`` to raise DbException.

    The exception text must start with the "not implemented" message for
    ``create_list`` and its ``http_code`` must be ``NOT_FOUND``.
    """
    expected_prefix = exception_message("Method 'create_list' not implemented")
    with pytest.raises(DbException) as excinfo:
        db_base.create_list(None, None)
    raised = excinfo.value
    assert str(raised).startswith(expected_prefix)
    assert raised.http_code == http.HTTPStatus.NOT_FOUND
def test_del_list(db_base):
    """Expect ``del_list(None, None)`` to raise DbException.

    The exception text must start with the "not implemented" message for
    ``del_list`` and its ``http_code`` must be ``NOT_FOUND``.
    """
    expected_prefix = exception_message("Method 'del_list' not implemented")
    with pytest.raises(DbException) as excinfo:
        db_base.del_list(None, None)
    raised = excinfo.value
    assert str(raised).startswith(expected_prefix)
    assert raised.http_code == http.HTTPStatus.NOT_FOUND
def test_del_one(db_base):
    """Expect ``del_one(None, None, None)`` to raise DbException.

    The exception text must start with the "not implemented" message for
    ``del_one`` and its ``http_code`` must be ``NOT_FOUND``.
    """
    expected_prefix = exception_message("Method 'del_one' not implemented")
    with pytest.raises(DbException) as excinfo:
        db_base.del_one(None, None, None)
    raised = excinfo.value
    assert str(raised).startswith(expected_prefix)
    assert raised.http_code == http.HTTPStatus.NOT_FOUND
141 class TestEncryption(unittest
.TestCase
):
143 master_key
= "Setting a long master key with numbers 123 and capitals AGHBNHD and symbols %&8)!'"
147 # set self.secret_key obtained when connect
148 db_base1
.set_secret_key(master_key
, replace
=True)
149 db_base1
.set_secret_key(urandom(32))
150 db_base2
.set_secret_key(None, replace
=True)
151 db_base2
.set_secret_key(urandom(30))
152 db_base3
.set_secret_key(master_key
)
153 self
.db_bases
= [db_base1
, db_base2
, db_base3
]
155 def test_encrypt_decrypt(self
):
157 ("plain text 1 ! ", None),
158 ("plain text 2 with salt ! ", "1afd5d1a-4a7e-4d9c-8c65-251290183106"),
160 for db_base
in self
.db_bases
:
161 for value
, salt
in TEST
:
163 encrypted
= db_base
.encrypt(value
, schema_version
="1.0", salt
=salt
)
165 encrypted
, value
, "value '{}' has been encrypted".format(value
)
167 decrypted
= db_base
.decrypt(encrypted
, schema_version
="1.0", salt
=salt
)
169 decrypted
, value
, "value '{}' has been decrypted".format(value
)
173 encrypted
= db_base
.encrypt(value
, schema_version
="1.1", salt
=salt
)
175 encrypted
, value
, "value '{}' has not been encrypted".format(value
)
177 self
.assertIsInstance(encrypted
, str, "Encrypted is not ascii text")
178 decrypted
= db_base
.decrypt(encrypted
, schema_version
="1.1", salt
=salt
)
180 decrypted
, value
, "value is not equal after encryption/decryption"
183 def test_encrypt_decrypt_salt(self
):
184 value
= "value to be encrypted!"
186 for db_base
in self
.db_bases
:
187 for salt
in (None, "salt 1", "1afd5d1a-4a7e-4d9c-8c65-251290183106"):
190 db_base
.encrypt(value
, schema_version
="1.1", salt
=salt
)
195 "value '{}' has not been encrypted".format(value
),
197 self
.assertIsInstance(encrypted
[-1], str, "Encrypted is not ascii text")
198 decrypted
= db_base
.decrypt(
199 encrypted
[-1], schema_version
="1.1", salt
=salt
202 decrypted
, value
, "value is not equal after encryption/decryption"
204 for i
in range(0, len(encrypted
)):
205 for j
in range(i
+ 1, len(encrypted
)):
209 "encryption with different salt must contain different result",
211 # decrypt with a different master key
213 decrypted
= self
.db_bases
[-1].decrypt(
214 encrypted
[0], schema_version
="1.1", salt
=None
219 "Decryption with different KEY must generate different result",
221 except DbException
as e
:
224 HTTPStatus
.INTERNAL_SERVER_ERROR
,
225 "Decryption with different KEY does not provide expected http_code",
class AsyncMock(MagicMock):
    """MagicMock whose call is a coroutine and which records deep copies of its arguments.

    Because the arguments are deep-copied before being handed to
    ``MagicMock.__call__``, the recorded call is unaffected if the caller
    mutates those objects afterwards.
    """

    async def __call__(self, *args, **kwargs):
        # Snapshot positional and keyword arguments independently.
        frozen_args = deepcopy(args)
        frozen_kwargs = deepcopy(kwargs)
        return super().__call__(*frozen_args, **frozen_kwargs)
class CopyingMock(MagicMock):
    """MagicMock that records deep copies of the arguments it is called with.

    Later mutation of an argument by the caller therefore does not change
    what ``assert_called_with`` and friends compare against.
    """

    def __call__(self, *args, **kwargs):
        # Snapshot positional and keyword arguments independently.
        frozen_args = deepcopy(args)
        frozen_kwargs = deepcopy(kwargs)
        return super().__call__(*frozen_args, **frozen_kwargs)
def check_if_assert_not_called(mocks: list):
    """Call ``assert_not_called()`` on every item of ``mocks``, in order.

    Whatever the first failing ``assert_not_called()`` raises propagates
    and stops the iteration.
    """
    for candidate in mocks:
        candidate.assert_not_called()
248 class TestBaseEncryption(unittest
.TestCase
):
@patch("logging.getLogger", autospec=True)
def setUp(self, mock_logger):
    """Create the DbBase under test and the CopyingMock helpers used by the tests.

    ``logging.getLogger`` is patched for the duration of this method. The
    DbBase instance gets its encoding type, cipher mode and secret key from
    the module-level fixture constants.
    """
    # Obtain the object the patched getLogger returns and flag it disabled.
    mock_logger = logging.getLogger()
    mock_logger.disabled = True

    base = DbBase()
    base.encoding_type = encoding_type
    base.encrypt_mode = encyrpt_mode
    base.secret_key = secret_key
    self.db_base = base

    # Stand-ins for the AES cipher object and for the padded message.
    self.mock_cipher = CopyingMock()
    self.mock_padded_msg = CopyingMock()
260 def test_pad_data_len_not_multiplication_of_16(self
):
261 data
= "hello word hello hello word hello word"
264 padded
= self
.db_base
.pad_data(data
)
265 self
.assertEqual(len(padded
), expected_len
)
266 self
.assertTrue("\0" * (expected_len
- data_len
) in padded
)
def test_pad_data_len_multiplication_of_16(self):
    """A 16-character string comes back from ``pad_data`` unchanged, with no NUL added."""
    data = "hello word!!!!!!"
    padded = self.db_base.pad_data(data)
    self.assertEqual(padded, data)
    # assertNotIn reports both operands on failure, unlike assertFalse(x in y).
    self.assertNotIn("\0", padded)
274 def test_pad_data_empty_string(self
):
277 padded
= self
.db_base
.pad_data(data
)
278 self
.assertEqual(len(padded
), expected_len
)
279 self
.assertFalse("\0" in padded
)
281 def test_pad_data_not_string(self
):
283 with self
.assertRaises(Exception) as err
:
284 self
.db_base
.pad_data(data
)
287 "database exception Incorrect data type: type(None), string is expected.",
def test_unpad_data_null_char_at_right(self):
    """Two trailing NUL characters are removed by ``unpad_data``; the digit "0" stays."""
    null_padded_data = "hell0word\0\0"
    expected_length = len(null_padded_data) - 2
    unpadded = self.db_base.unpad_data(null_padded_data)
    self.assertEqual(len(unpadded), expected_length)
    # assertIn/assertNotIn report both operands on failure,
    # unlike assertTrue/assertFalse(x in y).
    self.assertNotIn("\0", unpadded)
    self.assertIn("0", unpadded)
def test_unpad_data_null_char_is_not_rightest(self):
    """A NUL that is not the last character is kept: length and content are unchanged."""
    null_padded_data = "hell0word\r\t\0\n"
    expected_length = len(null_padded_data)
    unpadded = self.db_base.unpad_data(null_padded_data)
    self.assertEqual(len(unpadded), expected_length)
    # assertIn reports both operands on failure, unlike assertTrue(x in y).
    self.assertIn("\0", unpadded)
def test_unpad_data_with_spaces_at_right(self):
    """A NUL followed by a trailing space is kept: length and content are unchanged."""
    null_padded_data = " hell0word\0 "
    expected_length = len(null_padded_data)
    unpadded = self.db_base.unpad_data(null_padded_data)
    self.assertEqual(len(unpadded), expected_length)
    # assertIn reports both operands on failure, unlike assertTrue(x in y).
    self.assertIn("\0", unpadded)
312 def test_unpad_data_empty_string(self
):
314 unpadded
= self
.db_base
.unpad_data(data
)
315 self
.assertEqual(unpadded
, "")
316 self
.assertFalse("\0" in unpadded
)
318 def test_unpad_data_not_string(self
):
320 with self
.assertRaises(Exception) as err
:
321 self
.db_base
.unpad_data(data
)
324 "database exception Incorrect data type: type(None), string is expected.",
327 @patch.object(DbBase
, "_join_secret_key")
328 @patch.object(DbBase
, "pad_data")
329 def test__encrypt_value_schema_version_1_0_none_secret_key_none_salt(
330 self
, mock_pad_data
, mock_join_secret_key
332 """schema_version 1.0, secret_key is None and salt is None."""
333 schema_version
= "1.0"
335 self
.db_base
.secret_key
= None
336 result
= self
.db_base
._encrypt
_value
(value
, schema_version
, salt
)
337 self
.assertEqual(result
, value
)
338 check_if_assert_not_called([mock_pad_data
, mock_join_secret_key
])
340 @patch("osm_common.dbbase.b64encode")
341 @patch("osm_common.dbbase.AES")
342 @patch.object(DbBase
, "_join_secret_key")
343 @patch.object(DbBase
, "pad_data")
344 def test__encrypt_value_schema_version_1_1_with_secret_key_exists_with_salt(
347 mock_join_secret_key
,
351 """schema_version 1.1, secret_key exists, salt exists."""
352 mock_aes
.new
.return_value
= self
.mock_cipher
353 self
.mock_cipher
.encrypt
.return_value
= data_to_b4_encode
354 self
.mock_padded_msg
.return_value
= padded_value
355 mock_pad_data
.return_value
= self
.mock_padded_msg
356 self
.mock_padded_msg
.encode
.return_value
= padded_encoded_value
358 mock_b64_encode
.return_value
= encyrpted_bytes
360 result
= self
.db_base
._encrypt
_value
(value
, schema_version
, salt
)
362 self
.assertTrue(isinstance(result
, str))
363 self
.assertEqual(result
, encyrpted_value
)
364 mock_join_secret_key
.assert_called_once_with(salt
)
365 _call_mock_aes_new
= mock_aes
.new
.call_args_list
[0].args
366 self
.assertEqual(_call_mock_aes_new
[1], AES
.MODE_ECB
)
367 mock_pad_data
.assert_called_once_with(value
)
368 mock_b64_encode
.assert_called_once_with(data_to_b4_encode
)
369 self
.mock_cipher
.encrypt
.assert_called_once_with(padded_encoded_value
)
370 self
.mock_padded_msg
.encode
.assert_called_with(encoding_type
)
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_schema_version_1_0_secret_key_not_exists(
    self, mock_pad_data, mock_join_secret_key
):
    """schema_version 1.0, secret_key is None, salt exists.

    The value must come back unchanged and neither ``pad_data`` nor
    ``_join_secret_key`` may be called.
    """
    self.db_base.secret_key = None
    schema_version = "1.0"
    outcome = self.db_base._encrypt_value(value, schema_version, salt)
    self.assertEqual(outcome, value)
    untouched = [mock_pad_data, mock_join_secret_key]
    check_if_assert_not_called(untouched)
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_schema_version_1_1_secret_key_not_exists(
    self, mock_pad_data, mock_join_secret_key
):
    """schema_version 1.1, secret_key is None, salt exists.

    The value must come back unchanged and neither ``pad_data`` nor
    ``_join_secret_key`` may be called.
    """
    self.db_base.secret_key = None
    outcome = self.db_base._encrypt_value(value, schema_version, salt)
    self.assertEqual(outcome, value)
    untouched = [mock_pad_data, mock_join_secret_key]
    check_if_assert_not_called(untouched)
395 @patch("osm_common.dbbase.b64encode")
396 @patch("osm_common.dbbase.AES")
397 @patch.object(DbBase
, "_join_secret_key")
398 @patch.object(DbBase
, "pad_data")
399 def test__encrypt_value_schema_version_1_1_secret_key_exists_without_salt(
402 mock_join_secret_key
,
406 """schema_version 1.1, secret_key exists, salt is None."""
408 mock_aes
.new
.return_value
= self
.mock_cipher
409 self
.mock_cipher
.encrypt
.return_value
= data_to_b4_encode
411 self
.mock_padded_msg
.return_value
= padded_value
412 mock_pad_data
.return_value
= self
.mock_padded_msg
413 self
.mock_padded_msg
.encode
.return_value
= padded_encoded_value
415 mock_b64_encode
.return_value
= encyrpted_bytes
417 result
= self
.db_base
._encrypt
_value
(value
, schema_version
, salt
)
419 self
.assertEqual(result
, encyrpted_value
)
420 mock_join_secret_key
.assert_called_once_with(salt
)
421 _call_mock_aes_new
= mock_aes
.new
.call_args_list
[0].args
422 self
.assertEqual(_call_mock_aes_new
[1], AES
.MODE_ECB
)
423 mock_pad_data
.assert_called_once_with(value
)
424 mock_b64_encode
.assert_called_once_with(data_to_b4_encode
)
425 self
.mock_cipher
.encrypt
.assert_called_once_with(padded_encoded_value
)
426 self
.mock_padded_msg
.encode
.assert_called_with(encoding_type
)
428 @patch("osm_common.dbbase.b64encode")
429 @patch("osm_common.dbbase.AES")
430 @patch.object(DbBase
, "_join_secret_key")
431 @patch.object(DbBase
, "pad_data")
432 def test__encrypt_value_invalid_encrpt_mode(
435 mock_join_secret_key
,
439 """encrypt_mode is invalid."""
440 mock_aes
.new
.side_effect
= Exception("Invalid ciphering mode.")
441 self
.db_base
.encrypt_mode
= "AES.MODE_XXX"
443 with self
.assertRaises(Exception) as err
:
444 self
.db_base
._encrypt
_value
(value
, schema_version
, salt
)
446 self
.assertEqual(str(err
.exception
), "Invalid ciphering mode.")
447 mock_join_secret_key
.assert_called_once_with(salt
)
448 _call_mock_aes_new
= mock_aes
.new
.call_args_list
[0].args
449 self
.assertEqual(_call_mock_aes_new
[1], "AES.MODE_XXX")
450 check_if_assert_not_called([mock_pad_data
, mock_b64_encode
])
452 @patch("osm_common.dbbase.b64encode")
453 @patch("osm_common.dbbase.AES")
454 @patch.object(DbBase
, "_join_secret_key")
455 @patch.object(DbBase
, "pad_data")
456 def test__encrypt_value_schema_version_1_1_secret_key_exists_value_none(
459 mock_join_secret_key
,
463 """schema_version 1.1, secret_key exists, value is None."""
465 mock_aes
.new
.return_value
= self
.mock_cipher
466 mock_pad_data
.side_effect
= DbException(
467 "Incorrect data type: type(None), string is expected."
470 with self
.assertRaises(Exception) as err
:
471 self
.db_base
._encrypt
_value
(value
, schema_version
, salt
)
474 "database exception Incorrect data type: type(None), string is expected.",
477 mock_join_secret_key
.assert_called_once_with(salt
)
478 _call_mock_aes_new
= mock_aes
.new
.call_args_list
[0].args
479 self
.assertEqual(_call_mock_aes_new
[1], AES
.MODE_ECB
)
480 mock_pad_data
.assert_called_once_with(value
)
481 check_if_assert_not_called(
482 [mock_b64_encode
, self
.mock_cipher
.encrypt
, mock_b64_encode
]
485 @patch("osm_common.dbbase.b64encode")
486 @patch("osm_common.dbbase.AES")
487 @patch.object(DbBase
, "_join_secret_key")
488 @patch.object(DbBase
, "pad_data")
489 def test__encrypt_value_join_secret_key_raises(
492 mock_join_secret_key
,
496 """Method join_secret_key raises DbException."""
497 salt
= b
"3434o34-3wewrwr-222424-2242dwew"
499 mock_join_secret_key
.side_effect
= DbException("Unexpected type")
501 mock_aes
.new
.return_value
= self
.mock_cipher
503 with self
.assertRaises(Exception) as err
:
504 self
.db_base
._encrypt
_value
(value
, schema_version
, salt
)
506 self
.assertEqual(str(err
.exception
), "database exception Unexpected type")
507 check_if_assert_not_called(
508 [mock_pad_data
, mock_aes
.new
, mock_b64_encode
, self
.mock_cipher
.encrypt
]
510 mock_join_secret_key
.assert_called_once_with(salt
)
512 @patch("osm_common.dbbase.b64encode")
513 @patch("osm_common.dbbase.AES")
514 @patch.object(DbBase
, "_join_secret_key")
515 @patch.object(DbBase
, "pad_data")
516 def test__encrypt_value_schema_version_1_1_secret_key_exists_b64_encode_raises(
519 mock_join_secret_key
,
523 """schema_version 1.1, secret_key exists, b64encode raises TypeError."""
524 mock_aes
.new
.return_value
= self
.mock_cipher
525 self
.mock_cipher
.encrypt
.return_value
= "encrypted data"
527 self
.mock_padded_msg
.return_value
= padded_value
528 mock_pad_data
.return_value
= self
.mock_padded_msg
529 self
.mock_padded_msg
.encode
.return_value
= padded_encoded_value
531 mock_b64_encode
.side_effect
= TypeError(
532 "A bytes-like object is required, not 'str'"
535 with self
.assertRaises(Exception) as error
:
536 self
.db_base
._encrypt
_value
(value
, schema_version
, salt
)
538 str(error
.exception
), "A bytes-like object is required, not 'str'"
540 mock_join_secret_key
.assert_called_once_with(salt
)
541 _call_mock_aes_new
= mock_aes
.new
.call_args_list
[0].args
542 self
.assertEqual(_call_mock_aes_new
[1], AES
.MODE_ECB
)
543 mock_pad_data
.assert_called_once_with(value
)
544 self
.mock_cipher
.encrypt
.assert_called_once_with(padded_encoded_value
)
545 self
.mock_padded_msg
.encode
.assert_called_with(encoding_type
)
546 mock_b64_encode
.assert_called_once_with("encrypted data")
548 @patch("osm_common.dbbase.b64encode")
549 @patch("osm_common.dbbase.AES")
550 @patch.object(DbBase
, "_join_secret_key")
551 @patch.object(DbBase
, "pad_data")
552 def test__encrypt_value_cipher_encrypt_raises(
555 mock_join_secret_key
,
559 """AES encrypt method raises Exception."""
560 mock_aes
.new
.return_value
= self
.mock_cipher
561 self
.mock_cipher
.encrypt
.side_effect
= Exception("Invalid data type.")
563 self
.mock_padded_msg
.return_value
= padded_value
564 mock_pad_data
.return_value
= self
.mock_padded_msg
565 self
.mock_padded_msg
.encode
.return_value
= padded_encoded_value
567 with self
.assertRaises(Exception) as error
:
568 self
.db_base
._encrypt
_value
(value
, schema_version
, salt
)
570 self
.assertEqual(str(error
.exception
), "Invalid data type.")
571 mock_join_secret_key
.assert_called_once_with(salt
)
572 _call_mock_aes_new
= mock_aes
.new
.call_args_list
[0].args
573 self
.assertEqual(_call_mock_aes_new
[1], AES
.MODE_ECB
)
574 mock_pad_data
.assert_called_once_with(value
)
575 self
.mock_cipher
.encrypt
.assert_called_once_with(padded_encoded_value
)
576 self
.mock_padded_msg
.encode
.assert_called_with(encoding_type
)
577 mock_b64_encode
.assert_not_called()
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_encrypt_value")
def test_encrypt_without_schema_version_without_salt(
    self, mock_encrypt_value, mock_get_secret_key
):
    """``encrypt(value)`` with no schema_version and no salt.

    ``_encrypt_value`` must receive ``(value, None, None)``,
    ``get_secret_key`` must be called once, and the mocked result is
    returned as-is.
    """
    mock_encrypt_value.return_value = encyrpted_value
    outcome = self.db_base.encrypt(value)
    mock_encrypt_value.assert_called_once_with(value, None, None)
    mock_get_secret_key.assert_called_once()
    self.assertEqual(outcome, encyrpted_value)
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_encrypt_value")
def test_encrypt_with_schema_version_with_salt(
    self, mock_encrypt_value, mock_get_secret_key
):
    """``encrypt`` called with both schema_version and salt supplied.

    ``_encrypt_value`` must receive the same three arguments,
    ``get_secret_key`` must be called once, and the mocked result is
    returned as-is.
    """
    mock_encrypt_value.return_value = encyrpted_value
    outcome = self.db_base.encrypt(value, schema_version, salt)
    mock_encrypt_value.assert_called_once_with(value, schema_version, salt)
    mock_get_secret_key.assert_called_once()
    self.assertEqual(outcome, encyrpted_value)
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_encrypt_value")
def test_encrypt_get_secret_key_raises(
    self, mock_encrypt_value, mock_get_secret_key
):
    """``get_secret_key`` raises DbException.

    The exception must propagate out of ``encrypt`` and ``_encrypt_value``
    must never be reached.
    """
    mock_get_secret_key.side_effect = DbException("KeyError")
    with self.assertRaises(Exception) as raised:
        self.db_base.encrypt(value)
    message = str(raised.exception)
    self.assertEqual(message, "database exception KeyError")
    mock_encrypt_value.assert_not_called()
    mock_get_secret_key.assert_called_once()
616 @patch.object(DbBase
, "get_secret_key")
617 @patch.object(DbBase
, "_encrypt_value")
618 def test_encrypt_encrypt_raises(self
, mock_encrypt_value
, mock_get_secret_key
):
619 """_encrypt method raises DbException."""
620 mock_encrypt_value
.side_effect
= DbException(
621 "Incorrect data type: type(None), string is expected."
623 with self
.assertRaises(Exception) as error
:
624 self
.db_base
.encrypt(value
, schema_version
, salt
)
626 str(error
.exception
),
627 "database exception Incorrect data type: type(None), string is expected.",
629 mock_encrypt_value
.assert_called_once_with(value
, schema_version
, salt
)
630 mock_get_secret_key
.assert_called_once()
632 @patch("osm_common.dbbase.b64decode")
633 @patch("osm_common.dbbase.AES")
634 @patch.object(DbBase
, "_join_secret_key")
635 @patch.object(DbBase
, "unpad_data")
636 def test__decrypt_value_schema_version_1_1_secret_key_exists_without_salt(
639 mock_join_secret_key
,
643 """schema_version 1.1, secret_key exists, salt is None."""
645 mock_aes
.new
.return_value
= self
.mock_cipher
646 self
.mock_cipher
.decrypt
.return_value
= padded_encoded_value
648 mock_b64_decode
.return_value
= b64_decoded
650 mock_unpad_data
.return_value
= value
652 result
= self
.db_base
._decrypt
_value
(encyrpted_value
, schema_version
, salt
)
653 self
.assertEqual(result
, value
)
655 mock_join_secret_key
.assert_called_once_with(salt
)
656 _call_mock_aes_new
= mock_aes
.new
.call_args_list
[0].args
657 self
.assertEqual(_call_mock_aes_new
[1], AES
.MODE_ECB
)
658 mock_unpad_data
.assert_called_once_with("private key txt\0")
659 mock_b64_decode
.assert_called_once_with(encyrpted_value
)
660 self
.mock_cipher
.decrypt
.assert_called_once_with(b64_decoded
)
662 @patch("osm_common.dbbase.b64decode")
663 @patch("osm_common.dbbase.AES")
664 @patch.object(DbBase
, "_join_secret_key")
665 @patch.object(DbBase
, "unpad_data")
666 def test__decrypt_value_schema_version_1_1_secret_key_exists_with_salt(
669 mock_join_secret_key
,
673 """schema_version 1.1, secret_key exists, salt is None."""
674 mock_aes
.new
.return_value
= self
.mock_cipher
675 self
.mock_cipher
.decrypt
.return_value
= padded_encoded_value
677 mock_b64_decode
.return_value
= b64_decoded
679 mock_unpad_data
.return_value
= value
681 result
= self
.db_base
._decrypt
_value
(encyrpted_value
, schema_version
, salt
)
682 self
.assertEqual(result
, value
)
684 mock_join_secret_key
.assert_called_once_with(salt
)
685 _call_mock_aes_new
= mock_aes
.new
.call_args_list
[0].args
686 self
.assertEqual(_call_mock_aes_new
[1], AES
.MODE_ECB
)
687 mock_unpad_data
.assert_called_once_with("private key txt\0")
688 mock_b64_decode
.assert_called_once_with(encyrpted_value
)
689 self
.mock_cipher
.decrypt
.assert_called_once_with(b64_decoded
)
691 @patch("osm_common.dbbase.b64decode")
692 @patch("osm_common.dbbase.AES")
693 @patch.object(DbBase
, "_join_secret_key")
694 @patch.object(DbBase
, "unpad_data")
695 def test__decrypt_value_schema_version_1_1_without_secret_key(
698 mock_join_secret_key
,
702 """schema_version 1.1, secret_key is None, salt exists."""
703 self
.db_base
.secret_key
= None
705 result
= self
.db_base
._decrypt
_value
(encyrpted_value
, schema_version
, salt
)
707 self
.assertEqual(result
, encyrpted_value
)
708 check_if_assert_not_called(
710 mock_join_secret_key
,
714 self
.mock_cipher
.decrypt
,
718 @patch("osm_common.dbbase.b64decode")
719 @patch("osm_common.dbbase.AES")
720 @patch.object(DbBase
, "_join_secret_key")
721 @patch.object(DbBase
, "unpad_data")
722 def test__decrypt_value_schema_version_1_0_with_secret_key(
725 mock_join_secret_key
,
729 """schema_version 1.0, secret_key exists, salt exists."""
730 schema_version
= "1.0"
731 result
= self
.db_base
._decrypt
_value
(encyrpted_value
, schema_version
, salt
)
733 self
.assertEqual(result
, encyrpted_value
)
734 check_if_assert_not_called(
736 mock_join_secret_key
,
740 self
.mock_cipher
.decrypt
,
744 @patch("osm_common.dbbase.b64decode")
745 @patch("osm_common.dbbase.AES")
746 @patch.object(DbBase
, "_join_secret_key")
747 @patch.object(DbBase
, "unpad_data")
748 def test__decrypt_value_join_secret_key_raises(
751 mock_join_secret_key
,
755 """_join_secret_key raises TypeError."""
757 mock_join_secret_key
.side_effect
= TypeError("'type' object is not iterable")
759 with self
.assertRaises(Exception) as error
:
760 self
.db_base
._decrypt
_value
(encyrpted_value
, schema_version
, salt
)
761 self
.assertEqual(str(error
.exception
), "'type' object is not iterable")
763 mock_join_secret_key
.assert_called_once_with(salt
)
764 check_if_assert_not_called(
765 [mock_aes
.new
, mock_unpad_data
, mock_b64_decode
, self
.mock_cipher
.decrypt
]
768 @patch("osm_common.dbbase.b64decode")
769 @patch("osm_common.dbbase.AES")
770 @patch.object(DbBase
, "_join_secret_key")
771 @patch.object(DbBase
, "unpad_data")
772 def test__decrypt_value_b64decode_raises(
775 mock_join_secret_key
,
779 """b64decode raises TypeError."""
780 mock_b64_decode
.side_effect
= TypeError(
781 "A str-like object is required, not 'bytes'"
783 with self
.assertRaises(Exception) as error
:
784 self
.db_base
._decrypt
_value
(encyrpted_value
, schema_version
, salt
)
786 str(error
.exception
), "A str-like object is required, not 'bytes'"
789 mock_b64_decode
.assert_called_once_with(encyrpted_value
)
790 mock_join_secret_key
.assert_called_once_with(salt
)
791 check_if_assert_not_called(
792 [mock_aes
.new
, self
.mock_cipher
.decrypt
, mock_unpad_data
]
795 @patch("osm_common.dbbase.b64decode")
796 @patch("osm_common.dbbase.AES")
797 @patch.object(DbBase
, "_join_secret_key")
798 @patch.object(DbBase
, "unpad_data")
799 def test__decrypt_value_invalid_encrypt_mode(
802 mock_join_secret_key
,
806 """Invalid AES encrypt mode."""
807 mock_aes
.new
.side_effect
= Exception("Invalid ciphering mode.")
808 self
.db_base
.encrypt_mode
= "AES.MODE_XXX"
810 mock_b64_decode
.return_value
= b64_decoded
811 with self
.assertRaises(Exception) as error
:
812 self
.db_base
._decrypt
_value
(encyrpted_value
, schema_version
, salt
)
814 self
.assertEqual(str(error
.exception
), "Invalid ciphering mode.")
815 mock_join_secret_key
.assert_called_once_with(salt
)
816 _call_mock_aes_new
= mock_aes
.new
.call_args_list
[0].args
817 self
.assertEqual(_call_mock_aes_new
[1], "AES.MODE_XXX")
818 mock_b64_decode
.assert_called_once_with(encyrpted_value
)
819 check_if_assert_not_called([mock_unpad_data
, self
.mock_cipher
.decrypt
])
821 @patch("osm_common.dbbase.b64decode")
822 @patch("osm_common.dbbase.AES")
823 @patch.object(DbBase
, "_join_secret_key")
824 @patch.object(DbBase
, "unpad_data")
825 def test__decrypt_value_cipher_decrypt_raises(
828 mock_join_secret_key
,
832 """AES decrypt raises Exception."""
833 mock_b64_decode
.return_value
= b64_decoded
835 mock_aes
.new
.return_value
= self
.mock_cipher
836 self
.mock_cipher
.decrypt
.side_effect
= Exception("Invalid data type.")
838 with self
.assertRaises(Exception) as error
:
839 self
.db_base
._decrypt
_value
(encyrpted_value
, schema_version
, salt
)
840 self
.assertEqual(str(error
.exception
), "Invalid data type.")
842 mock_join_secret_key
.assert_called_once_with(salt
)
843 _call_mock_aes_new
= mock_aes
.new
.call_args_list
[0].args
844 self
.assertEqual(_call_mock_aes_new
[1], AES
.MODE_ECB
)
845 mock_b64_decode
.assert_called_once_with(encyrpted_value
)
846 self
.mock_cipher
.decrypt
.assert_called_once_with(b64_decoded
)
847 mock_unpad_data
.assert_not_called()
849 @patch("osm_common.dbbase.b64decode")
850 @patch("osm_common.dbbase.AES")
851 @patch.object(DbBase
, "_join_secret_key")
852 @patch.object(DbBase
, "unpad_data")
853 def test__decrypt_value_decode_raises(
856 mock_join_secret_key
,
860 """Decode raises UnicodeDecodeError."""
861 mock_aes
.new
.return_value
= self
.mock_cipher
862 self
.mock_cipher
.decrypt
.return_value
= b
"\xd0\x000091"
864 mock_b64_decode
.return_value
= b64_decoded
866 mock_unpad_data
.return_value
= value
867 with self
.assertRaises(Exception) as error
:
868 self
.db_base
._decrypt
_value
(encyrpted_value
, schema_version
, salt
)
870 str(error
.exception
),
871 "database exception Cannot decrypt information. Are you using same COMMONKEY in all OSM components?",
873 self
.assertEqual(type(error
.exception
), DbException
)
874 mock_join_secret_key
.assert_called_once_with(salt
)
875 _call_mock_aes_new
= mock_aes
.new
.call_args_list
[0].args
876 self
.assertEqual(_call_mock_aes_new
[1], AES
.MODE_ECB
)
877 mock_b64_decode
.assert_called_once_with(encyrpted_value
)
878 self
.mock_cipher
.decrypt
.assert_called_once_with(b64_decoded
)
879 mock_unpad_data
.assert_not_called()
881 @patch("osm_common.dbbase.b64decode")
882 @patch("osm_common.dbbase.AES")
883 @patch.object(DbBase
, "_join_secret_key")
884 @patch.object(DbBase
, "unpad_data")
885 def test__decrypt_value_unpad_data_raises(
888 mock_join_secret_key
,
892 """Method unpad_data raises error."""
893 mock_decrypted_message
= MagicMock()
894 mock_decrypted_message
.decode
.return_value
= None
895 mock_aes
.new
.return_value
= self
.mock_cipher
896 self
.mock_cipher
.decrypt
.return_value
= mock_decrypted_message
897 mock_unpad_data
.side_effect
= DbException(
898 "Incorrect data type: type(None), string is expected."
900 mock_b64_decode
.return_value
= b64_decoded
902 with self
.assertRaises(Exception) as error
:
903 self
.db_base
._decrypt
_value
(encyrpted_value
, schema_version
, salt
)
905 str(error
.exception
),
906 "database exception Incorrect data type: type(None), string is expected.",
908 self
.assertEqual(type(error
.exception
), DbException
)
909 mock_join_secret_key
.assert_called_once_with(salt
)
910 _call_mock_aes_new
= mock_aes
.new
.call_args_list
[0].args
911 self
.assertEqual(_call_mock_aes_new
[1], AES
.MODE_ECB
)
912 mock_b64_decode
.assert_called_once_with(encyrpted_value
)
913 self
.mock_cipher
.decrypt
.assert_called_once_with(b64_decoded
)
914 mock_decrypted_message
.decode
.assert_called_once_with(
915 self
.db_base
.encoding_type
917 mock_unpad_data
.assert_called_once_with(None)
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_decrypt_value")
def test_decrypt_without_schema_version_without_salt(
    self, mock_decrypt_value, mock_get_secret_key
):
    """``decrypt(value)`` with no schema_version and no salt.

    ``_decrypt_value`` must receive ``(value, None, None)``,
    ``get_secret_key`` must be called once, and the mocked result is
    returned as-is.
    """
    mock_decrypt_value.return_value = encyrpted_value
    outcome = self.db_base.decrypt(value)
    mock_decrypt_value.assert_called_once_with(value, None, None)
    mock_get_secret_key.assert_called_once()
    self.assertEqual(outcome, encyrpted_value)
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_decrypt_value")
def test_decrypt_with_schema_version_with_salt(
    self, mock_decrypt_value, mock_get_secret_key
):
    """``decrypt`` called with both schema_version and salt supplied.

    ``_decrypt_value`` must receive the same three arguments,
    ``get_secret_key`` must be called once, and the mocked result is
    returned as-is.
    """
    mock_decrypt_value.return_value = encyrpted_value
    outcome = self.db_base.decrypt(value, schema_version, salt)
    mock_decrypt_value.assert_called_once_with(value, schema_version, salt)
    mock_get_secret_key.assert_called_once()
    self.assertEqual(outcome, encyrpted_value)
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_decrypt_value")
def test_decrypt_get_secret_key_raises(
    self, mock_decrypt_value, mock_get_secret_key
):
    """``get_secret_key`` raises DbException.

    The exception must propagate out of ``decrypt`` and ``_decrypt_value``
    must never be reached.
    """
    mock_get_secret_key.side_effect = DbException("KeyError")
    with self.assertRaises(Exception) as raised:
        self.db_base.decrypt(value)
    message = str(raised.exception)
    self.assertEqual(message, "database exception KeyError")
    mock_decrypt_value.assert_not_called()
    mock_get_secret_key.assert_called_once()
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_decrypt_value")
def test_decrypt_decrypt_value_raises(
    self, mock_decrypt_value, mock_get_secret_key
):
    """decrypt propagates the DbException raised by _decrypt_value."""
    mock_decrypt_value.side_effect = DbException(
        "Incorrect data type: type(None), string is expected."
    )
    # Expect the injected exception type, not just any Exception, so an
    # unrelated failure cannot make this test pass.
    with self.assertRaises(DbException) as error:
        self.db_base.decrypt(value, schema_version, salt)
    self.assertEqual(
        str(error.exception),
        "database exception Incorrect data type: type(None), string is expected.",
    )
    mock_decrypt_value.assert_called_once_with(value, schema_version, salt)
    mock_get_secret_key.assert_called_once()
def test_encrypt_decrypt_with_schema_version_1_1_with_salt(self):
    """Round-trip a value through encrypt and decrypt (schema 1.1, with salt)."""
    encrypted = self.db_base.encrypt(value, schema_version, salt)
    # Decrypting with the same schema version and salt must give the input back.
    self.assertEqual(
        value, self.db_base.decrypt(encrypted, schema_version, salt)
    )
def test_encrypt_decrypt_with_schema_version_1_0_with_salt(self):
    """Round-trip a value through encrypt and decrypt (schema 1.0, with salt)."""
    # Local override: this test does not use the module-level schema version.
    version_1_0 = "1.0"
    encrypted = self.db_base.encrypt(value, version_1_0, salt)
    self.assertEqual(
        value, self.db_base.decrypt(encrypted, version_1_0, salt)
    )
def test_encrypt_decrypt_with_schema_version_1_1_without_salt(self):
    """Round-trip a value through encrypt and decrypt (schema 1.1, no salt)."""
    # Local override of the module-level salt: this scenario runs without one.
    no_salt = None
    encrypted = self.db_base.encrypt(value, schema_version, no_salt)
    self.assertEqual(
        value, self.db_base.decrypt(encrypted, schema_version, no_salt)
    )
class TestAsyncEncryption(unittest.TestCase):
    """Unit tests for the asynchronous ``Encryption`` class.

    Collaborators (``decrypt``, ``get_secret_key``, ``_join_keys``,
    ``b64decode`` and the admin collection's ``find_one``) are mocked in most
    tests; the ``test_encrypt_decrypt_*`` tests at the end run unmocked.
    Coroutines are driven with ``asyncio.run``.
    """

    @patch("logging.getLogger", autospec=True)
    def setUp(self, mock_logger):
        """Create an Encryption object with a disabled logger and a mocked admin collection."""
        mock_logger = logging.getLogger()
        mock_logger.disabled = True
        self.encryption = Encryption(uri="uri", config={})
        self.encryption.encoding_type = encoding_type
        self.encryption.encrypt_mode = encyrpt_mode
        self.encryption._secret_key = secret_key
        self.admin_collection = Mock()
        self.admin_collection.find_one = AsyncMock()
        # Replace the database client with a plain mapping exposing the mock.
        self.encryption._client = {
            "osm": {
                "admin": self.admin_collection,
            }
        }

    @patch.object(Encryption, "decrypt", new_callable=AsyncMock)
    def test_decrypt_fields_with_item_with_fields(self, mock_decrypt):
        """Every listed field of the item is replaced by its decrypted value."""
        mock_decrypt.side_effect = [decrypted_val1, decrypted_val2]
        input_item = copy.deepcopy(item)
        # Only the decrypted fields may differ from the fixture.
        expected_item = {
            **item,
            "secret": decrypted_val1,
            "cacert": decrypted_val2,
        }
        fields = ["secret", "cacert"]

        asyncio.run(
            self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
        )

        self.assertEqual(input_item, expected_item)
        _call_mock_decrypt = mock_decrypt.call_args_list
        self.assertEqual(_call_mock_decrypt[0].args, ("mysecret", "1.1", salt))
        self.assertEqual(_call_mock_decrypt[1].args, ("mycacert", "1.1", salt))

    @patch.object(Encryption, "decrypt", new_callable=AsyncMock)
    def test_decrypt_fields_empty_item_with_fields(self, mock_decrypt):
        """An empty item stays empty and nothing is decrypted."""
        input_item = {}
        fields = ["secret", "cacert"]

        asyncio.run(
            self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
        )

        self.assertEqual(input_item, {})
        mock_decrypt.assert_not_called()

    @patch.object(Encryption, "decrypt", new_callable=AsyncMock)
    def test_decrypt_fields_with_item_without_fields(self, mock_decrypt):
        """With an empty field list the item is left untouched."""
        input_item = copy.deepcopy(item)
        fields = []

        asyncio.run(
            self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
        )

        self.assertEqual(input_item, item)
        mock_decrypt.assert_not_called()

    @patch.object(Encryption, "decrypt", new_callable=AsyncMock)
    def test_decrypt_fields_with_item_with_single_field(self, mock_decrypt):
        """Only the single listed field is decrypted; the others are kept."""
        mock_decrypt.return_value = decrypted_val1
        fields = ["secret"]
        input_item = copy.deepcopy(item)
        expected_item = {**item, "secret": decrypted_val1}

        asyncio.run(
            self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
        )

        self.assertEqual(input_item, expected_item)
        _call_mock_decrypt = mock_decrypt.call_args_list
        self.assertEqual(_call_mock_decrypt[0].args, ("mysecret", "1.1", salt))

    @patch.object(Encryption, "decrypt", new_callable=AsyncMock)
    def test_decrypt_fields_with_item_with_field_none_salt_1_0_schema_version(
        self, mock_decrypt
    ):
        """Single field, salt is None and schema version is 1.0."""
        schema_version = "1.0"
        salt = None
        # decrypt hands back the stored value, so the item must not change.
        mock_decrypt.return_value = "mysecret"
        input_item = copy.deepcopy(item)
        fields = ["secret"]

        asyncio.run(
            self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
        )

        self.assertEqual(input_item, item)
        _call_mock_decrypt = mock_decrypt.call_args_list
        self.assertEqual(_call_mock_decrypt[0].args, ("mysecret", "1.0", None))

    @patch.object(Encryption, "decrypt", new_callable=AsyncMock)
    def test_decrypt_fields_decrypt_raises(self, mock_decrypt):
        """A DbException from decrypt propagates and the item is left unmodified."""
        mock_decrypt.side_effect = DbException(
            "Incorrect data type: type(None), string is expected."
        )
        fields = ["secret"]
        input_item = copy.deepcopy(item)

        with self.assertRaises(DbException) as error:
            asyncio.run(
                self.encryption.decrypt_fields(
                    input_item, fields, schema_version, salt
                )
            )

        self.assertEqual(
            str(error.exception),
            "database exception Incorrect data type: type(None), string is expected.",
        )
        self.assertEqual(input_item, item)
        _call_mock_decrypt = mock_decrypt.call_args_list
        self.assertEqual(_call_mock_decrypt[0].args, ("mysecret", "1.1", salt))

    @patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
    @patch.object(Encryption, "_encrypt_value")
    def test_encrypt(self, mock_encrypt_value, mock_get_secret_key):
        """Encrypted successfully."""
        mock_encrypt_value.return_value = encyrpted_value
        result = asyncio.run(self.encryption.encrypt(value, schema_version, salt))
        self.assertEqual(result, encyrpted_value)
        mock_get_secret_key.assert_called_once()
        mock_encrypt_value.assert_called_once_with(value, schema_version, salt)

    @patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
    @patch.object(Encryption, "_encrypt_value")
    def test_encrypt_get_secret_key_raises(
        self, mock_encrypt_value, mock_get_secret_key
    ):
        """encrypt propagates the DbException raised by get_secret_key."""
        mock_get_secret_key.side_effect = DbException("Unexpected type.")
        with self.assertRaises(DbException) as error:
            asyncio.run(self.encryption.encrypt(value, schema_version, salt))
        self.assertEqual(str(error.exception), "database exception Unexpected type.")
        mock_get_secret_key.assert_called_once()
        # The key lookup fails first, so nothing may be encrypted.
        mock_encrypt_value.assert_not_called()

    @patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
    @patch.object(Encryption, "_encrypt_value")
    def test_encrypt_get_encrypt_raises(self, mock_encrypt_value, mock_get_secret_key):
        """encrypt propagates the TypeError raised by _encrypt_value."""
        mock_encrypt_value.side_effect = TypeError(
            "A bytes-like object is required, not 'str'"
        )
        with self.assertRaises(TypeError) as error:
            asyncio.run(self.encryption.encrypt(value, schema_version, salt))
        self.assertEqual(
            str(error.exception), "A bytes-like object is required, not 'str'"
        )
        mock_get_secret_key.assert_called_once()
        mock_encrypt_value.assert_called_once_with(value, schema_version, salt)

    @patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
    @patch.object(Encryption, "_decrypt_value")
    def test_decrypt(self, mock_decrypt_value, mock_get_secret_key):
        """Decrypted successfully."""
        mock_decrypt_value.return_value = value
        result = asyncio.run(
            self.encryption.decrypt(encyrpted_value, schema_version, salt)
        )
        self.assertEqual(result, value)
        mock_get_secret_key.assert_called_once()
        mock_decrypt_value.assert_called_once_with(
            encyrpted_value, schema_version, salt
        )

    @patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
    @patch.object(Encryption, "_decrypt_value")
    def test_decrypt_get_secret_key_raises(
        self, mock_decrypt_value, mock_get_secret_key
    ):
        """decrypt propagates the DbException raised by get_secret_key."""
        mock_get_secret_key.side_effect = DbException("Unexpected type.")
        with self.assertRaises(DbException) as error:
            asyncio.run(self.encryption.decrypt(encyrpted_value, schema_version, salt))
        self.assertEqual(str(error.exception), "database exception Unexpected type.")
        mock_get_secret_key.assert_called_once()
        # The key lookup fails first, so nothing may be decrypted.
        mock_decrypt_value.assert_not_called()

    @patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
    @patch.object(Encryption, "_decrypt_value")
    def test_decrypt_decrypt_value_raises(
        self, mock_decrypt_value, mock_get_secret_key
    ):
        """decrypt propagates the TypeError raised by _decrypt_value."""
        mock_decrypt_value.side_effect = TypeError(
            "A bytes-like object is required, not 'str'"
        )
        with self.assertRaises(TypeError) as error:
            asyncio.run(self.encryption.decrypt(encyrpted_value, schema_version, salt))
        self.assertEqual(
            str(error.exception), "A bytes-like object is required, not 'str'"
        )
        mock_get_secret_key.assert_called_once()
        mock_decrypt_value.assert_called_once_with(
            encyrpted_value, schema_version, salt
        )

    def test_join_keys_string_key(self):
        """key is string."""
        string_key = "sample key"
        result = self.encryption._join_keys(string_key, secret_key)
        self.assertEqual(result, joined_key)
        self.assertIsInstance(result, bytes)

    def test_join_keys_bytes_key(self):
        """key is bytes."""
        bytes_key = b"sample key"
        result = self.encryption._join_keys(bytes_key, secret_key)
        self.assertEqual(result, joined_key)
        self.assertIsInstance(result, bytes)
        self.assertEqual(len(result.decode("unicode_escape")), 32)

    def test_join_keys_int_key(self):
        """key is an int, which cannot be iterated."""
        int_key = 923
        with self.assertRaises(TypeError) as error:
            self.encryption._join_keys(int_key, None)
        self.assertEqual(str(error.exception), "'int' object is not iterable")

    def test_join_keys_none_secret_key(self):
        """key is as bytes and secret key is None."""
        bytes_key = b"sample key"
        result = self.encryption._join_keys(bytes_key, None)
        # Expected result is the key right-padded with zero bytes to 32 bytes.
        self.assertEqual(result, bytes_key.ljust(32, b"\x00"))
        self.assertIsInstance(result, bytes)
        self.assertEqual(len(result.decode("unicode_escape")), 32)

    def test_join_keys_none_key_none_secret_key(self):
        """key is None and secret key is None."""
        with self.assertRaises(TypeError) as error:
            self.encryption._join_keys(None, None)
        self.assertEqual(str(error.exception), "'NoneType' object is not iterable")

    def test_join_keys_none_key(self):
        """key is None and secret key exists."""
        with self.assertRaises(TypeError) as error:
            self.encryption._join_keys(None, secret_key)
        self.assertEqual(str(error.exception), "'NoneType' object is not iterable")

    @patch.object(Encryption, "_join_keys")
    def test_join_secret_key_string_sample_key(self, mock_join_keys):
        """update key is a string; it is joined with the stored secret key."""
        update_key = "sample key"
        mock_join_keys.return_value = joined_key
        result = self.encryption._join_secret_key(update_key)
        self.assertEqual(result, joined_key)
        self.assertIsInstance(result, bytes)
        mock_join_keys.assert_called_once_with(update_key, secret_key)

    @patch.object(Encryption, "_join_keys")
    def test_join_secret_key_byte_sample_key(self, mock_join_keys):
        """update key is bytes; it is joined with the stored secret key."""
        update_key = b"sample key"
        mock_join_keys.return_value = joined_key
        result = self.encryption._join_secret_key(update_key)
        self.assertEqual(result, joined_key)
        self.assertIsInstance(result, bytes)
        mock_join_keys.assert_called_once_with(update_key, secret_key)

    @patch.object(Encryption, "_join_keys")
    def test_join_secret_key_join_keys_raises(self, mock_join_keys):
        """_join_secret_key propagates the TypeError raised by _join_keys."""
        update_key = 3
        mock_join_keys.side_effect = TypeError("'int' object is not iterable")
        with self.assertRaises(TypeError) as error:
            self.encryption._join_secret_key(update_key)
        self.assertEqual(str(error.exception), "'int' object is not iterable")
        mock_join_keys.assert_called_once_with(update_key, secret_key)

    @patch.object(Encryption, "_join_keys")
    def test_get_secret_key_exists(self, mock_join_keys):
        """secret_key exists, so it is kept and no key is joined."""
        self.encryption._secret_key = secret_key
        asyncio.run(self.encryption.get_secret_key())
        self.assertEqual(self.encryption.secret_key, secret_key)
        mock_join_keys.assert_not_called()

    @patch.object(Encryption, "_join_keys")
    @patch("osm_common.dbbase.b64decode")
    def test_get_secret_key_not_exist_database_key_exist(
        self, mock_b64decode, mock_join_keys
    ):
        """secret_key does not exist, database key exists, no version data."""
        self.encryption._secret_key = None
        self.encryption._admin_collection.find_one.return_value = None
        self.encryption._config = {"database_commonkey": "osm_new_key"}
        mock_join_keys.return_value = joined_key
        asyncio.run(self.encryption.get_secret_key())
        self.assertEqual(self.encryption.secret_key, joined_key)
        self.assertEqual(mock_join_keys.call_count, 1)
        mock_b64decode.assert_not_called()

    @patch.object(Encryption, "_join_keys")
    @patch("osm_common.dbbase.b64decode")
    def test_get_secret_key_not_exist_with_database_key_version_data_exist_without_serial(
        self, mock_b64decode, mock_join_keys
    ):
        """secret_key does not exist, database key exists, version data has no serial."""
        self.encryption._secret_key = None
        self.encryption._admin_collection.find_one.return_value = {"version": "1.0"}
        self.encryption._config = {"database_commonkey": "osm_new_key"}
        mock_join_keys.return_value = joined_key
        asyncio.run(self.encryption.get_secret_key())
        self.assertEqual(self.encryption.secret_key, joined_key)
        self.assertEqual(mock_join_keys.call_count, 1)
        mock_b64decode.assert_not_called()
        self.encryption._admin_collection.find_one.assert_called_once_with(
            {"_id": "version"}
        )
        _call_mock_join_keys = mock_join_keys.call_args_list
        self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))

    @patch.object(Encryption, "_join_keys")
    @patch("osm_common.dbbase.b64decode")
    def test_get_secret_key_not_exist_with_database_key_version_data_exist_with_serial(
        self, mock_b64decode, mock_join_keys
    ):
        """secret_key does not exist, database key exists, version and serial exist
        in admin collection."""
        self.encryption._secret_key = None
        self.encryption._admin_collection.find_one.return_value = {
            "version": "1.0",
            "serial": serial_bytes,
        }
        self.encryption._config = {"database_commonkey": "osm_new_key"}
        # First join handles the database key, second one the decoded serial.
        mock_join_keys.side_effect = [secret_key, joined_key]
        mock_b64decode.return_value = base64_decoded_serial
        asyncio.run(self.encryption.get_secret_key())
        self.assertEqual(self.encryption.secret_key, joined_key)
        self.assertEqual(mock_join_keys.call_count, 2)
        mock_b64decode.assert_called_once_with(serial_bytes)
        self.encryption._admin_collection.find_one.assert_called_once_with(
            {"_id": "version"}
        )
        _call_mock_join_keys = mock_join_keys.call_args_list
        self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))
        self.assertEqual(
            _call_mock_join_keys[1].args, (base64_decoded_serial, secret_key)
        )

    @patch.object(Encryption, "_join_keys")
    @patch("osm_common.dbbase.b64decode")
    def test_get_secret_key_join_keys_raises(self, mock_b64decode, mock_join_keys):
        """Method _join_keys raises."""
        self.encryption._secret_key = None
        self.encryption._admin_collection.find_one.return_value = {
            "version": "1.0",
            "serial": serial_bytes,
        }
        self.encryption._config = {"database_commonkey": "osm_new_key"}
        mock_join_keys.side_effect = DbException("Invalid data type.")
        with self.assertRaises(DbException) as error:
            asyncio.run(self.encryption.get_secret_key())
        self.assertEqual(str(error.exception), "database exception Invalid data type.")
        self.assertEqual(mock_join_keys.call_count, 1)
        # The first join fails, so neither the database nor b64decode is reached.
        check_if_assert_not_called(
            [mock_b64decode, self.encryption._admin_collection.find_one]
        )
        _call_mock_join_keys = mock_join_keys.call_args_list
        self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))

    @patch.object(Encryption, "_join_keys")
    @patch("osm_common.dbbase.b64decode")
    def test_get_secret_key_b64decode_raises(self, mock_b64decode, mock_join_keys):
        """Method b64decode raises."""
        self.encryption._secret_key = None
        self.encryption._admin_collection.find_one.return_value = {
            "version": "1.0",
            "serial": base64_decoded_serial,
        }
        self.encryption._config = {"database_commonkey": "osm_new_key"}
        mock_join_keys.return_value = secret_key
        mock_b64decode.side_effect = TypeError(
            "A bytes-like object is required, not 'str'"
        )
        with self.assertRaises(TypeError) as error:
            asyncio.run(self.encryption.get_secret_key())
        self.assertEqual(
            str(error.exception), "A bytes-like object is required, not 'str'"
        )
        # The failure must not leave a partially computed key behind.
        self.assertIsNone(self.encryption.secret_key)
        self.assertEqual(mock_join_keys.call_count, 1)
        mock_b64decode.assert_called_once_with(base64_decoded_serial)
        self.encryption._admin_collection.find_one.assert_called_once_with(
            {"_id": "version"}
        )
        _call_mock_join_keys = mock_join_keys.call_args_list
        self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))

    @patch.object(Encryption, "_join_keys")
    @patch("osm_common.dbbase.b64decode")
    def test_get_secret_key_admin_collection_find_one_raises(
        self, mock_b64decode, mock_join_keys
    ):
        """admin_collection find_one raises."""
        self.encryption._secret_key = None
        self.encryption._admin_collection.find_one.side_effect = DbException(
            "Connection failed."
        )
        self.encryption._config = {"database_commonkey": "osm_new_key"}
        mock_join_keys.return_value = secret_key
        with self.assertRaises(DbException) as error:
            asyncio.run(self.encryption.get_secret_key())
        self.assertEqual(str(error.exception), "database exception Connection failed.")
        # The failure must not leave a partially computed key behind.
        self.assertIsNone(self.encryption.secret_key)
        self.assertEqual(mock_join_keys.call_count, 1)
        mock_b64decode.assert_not_called()
        self.encryption._admin_collection.find_one.assert_called_once_with(
            {"_id": "version"}
        )
        _call_mock_join_keys = mock_join_keys.call_args_list
        self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))

    def test_encrypt_decrypt_with_schema_version_1_1_with_salt(self):
        """Encrypt and decrypt with schema version 1.1, salt exists."""
        encrypted_msg = asyncio.run(
            self.encryption.encrypt(value, schema_version, salt)
        )
        decrypted_msg = asyncio.run(
            self.encryption.decrypt(encrypted_msg, schema_version, salt)
        )
        self.assertEqual(value, decrypted_msg)

    def test_encrypt_decrypt_with_schema_version_1_0_with_salt(self):
        """Encrypt and decrypt with schema version 1.0, salt exists."""
        schema_version = "1.0"
        encrypted_msg = asyncio.run(
            self.encryption.encrypt(value, schema_version, salt)
        )
        decrypted_msg = asyncio.run(
            self.encryption.decrypt(encrypted_msg, schema_version, salt)
        )
        self.assertEqual(value, decrypted_msg)

    def test_encrypt_decrypt_with_schema_version_1_1_without_salt(self):
        """Encrypting with schema version 1.1 and no salt raises TypeError."""
        salt = None
        with self.assertRaises(TypeError) as error:
            asyncio.run(self.encryption.encrypt(value, schema_version, salt))
        self.assertEqual(str(error.exception), "'NoneType' object is not iterable")
class TestDeepUpdate(unittest.TestCase):
    """Table-driven tests for ``deep_update``.

    Each table row holds an original document, a patch and (for the valid
    cases) the expected document after ``deep_update`` modified the original
    in place. Rows run inside ``subTest`` so one failing row is reported
    with its index and does not hide the remaining rows.
    """

    def test_update_dict(self):
        """Patches made of plain dictionary keys: add, replace and delete (None)."""
        # Original, patch, expected result
        cases = (
            ({"a": "b"}, {"a": "c"}, {"a": "c"}),
            ({"a": "b"}, {"b": "c"}, {"a": "b", "b": "c"}),
            ({"a": "b"}, {"a": None}, {}),
            ({"a": "b", "b": "c"}, {"a": None}, {"b": "c"}),
            ({"a": ["b"]}, {"a": "c"}, {"a": "c"}),
            ({"a": "c"}, {"a": ["b"]}, {"a": ["b"]}),
            ({"a": {"b": "c"}}, {"a": {"b": "d", "c": None}}, {"a": {"b": "d"}}),
            ({"a": [{"b": "c"}]}, {"a": [1]}, {"a": [1]}),
            ({1: ["a", "b"]}, {1: ["c", "d"]}, {1: ["c", "d"]}),
            ({1: {"a": "b"}}, {1: ["c"]}, {1: ["c"]}),
            ({1: {"a": "foo"}}, {1: None}, {}),
            ({1: {"a": "foo"}}, {1: "bar"}, {1: "bar"}),
            ({"e": None}, {"a": 1}, {"e": None, "a": 1}),
            ({1: [1, 2]}, {1: {"a": "b", "c": None}}, {1: {"a": "b"}}),
            ({}, {"a": {"bb": {"ccc": None}}}, {"a": {"bb": {}}}),
        )
        for index, (original, patch_content, expected) in enumerate(cases):
            with self.subTest(case=index, patch=patch_content):
                deep_update(original, patch_content)
                self.assertEqual(original, expected)

        # Test that a deep copy is done, so that the original dictionary does
        # not reference the patch.
        test_original = {1: {"a": "b"}}
        test_patch = {1: {"c": {"d": "e"}}}
        test_result = {1: {"a": "b", "c": {"d": "e"}}}
        deep_update(test_original, test_patch)
        self.assertEqual(test_original, test_result)
        test_patch[1]["c"]["f"] = "edition of patch, must not modify original"
        self.assertEqual(test_original, test_result)

    def test_update_array(self):
        """Patches that address list elements through ``$``-prefixed keys."""
        # Each row contains the original, the patch and the expected result
        cases = (
            # delete all instances of "a"/"d"
            ({"A": ["a", "b", "a"]}, {"A": {"$a": None}}, {"A": ["b"]}),
            ({"A": ["a", "b", "a"]}, {"A": {"$d": None}}, {"A": ["a", "b", "a"]}),
            # delete and insert at 0
            (
                {"A": ["a", "b", "c"]},
                {"A": {"$b": None, "$+[0]": "b"}},
                {"A": ["b", "a", "c"]},
            ),
            # delete and edit
            (
                {"A": ["a", "b", "a"]},
                {"A": {"$a": None, "$[1]": {"c": "d"}}},
                {"A": [{"c": "d"}]},
            ),
            # insert if not exist
            ({"A": ["a", "b", "c"]}, {"A": {"$+b": "b"}}, {"A": ["a", "b", "c"]}),
            ({"A": ["a", "b", "c"]}, {"A": {"$+d": "f"}}, {"A": ["a", "b", "c", "f"]}),
            # edit
            (
                {"A": ["a", "b", "a"]},
                {"A": {"$b": {"c": "d"}}},
                {"A": ["a", {"c": "d"}, "a"]},
            ),
            # delete, insert at 0 and append
            (
                {"A": ["a", "b", "a"]},
                {"A": {"$b": None, "$+[0]": "b", "$+": "c"}},
                {"A": ["b", "a", "a", "c"]},
            ),
            # deletion of an element that is not present
            ({"A": ["a", "b", "a"]}, {"A": {"$c": None}}, {"A": ["a", "b", "a"]}),
            # index deletion out of range
            ({"A": ["a", "b", "a"]}, {"A": {"$[5]": None}}, {"A": ["a", "b", "a"]}),
            # nested array->dict
            (
                {"A": ["a", "b", {"id": "1", "c": {"d": 2}}]},
                {"A": {"$id: '1'": {"h": None, "c": {"d": "e", "f": "g"}}}},
                {"A": ["a", "b", {"id": "1", "c": {"d": "e", "f": "g"}}]},
            ),
            (
                {"A": [{"id": 1, "c": {"d": 2}}, {"id": 1, "c": {"f": []}}]},
                {"A": {"$id: 1": {"h": None, "c": {"d": "e", "f": "g"}}}},
                {
                    "A": [
                        {"id": 1, "c": {"d": "e", "f": "g"}},
                        {"id": 1, "c": {"d": "e", "f": "g"}},
                    ]
                },
            ),
            # nested array->array
            (
                {"A": ["a", "b", ["a", "b"]]},
                {"A": {"$b": None, "$[2]": {"$b": {}, "$+": "c"}}},
                {"A": ["a", ["a", {}, "c"]]},
            ),
            # types str and int different, so not found
            (
                {"A": ["a", {"id": "1", "c": "d"}]},
                {"A": {"$id: 1": {"c": "e"}}},
                {"A": ["a", {"id": "1", "c": "d"}]},
            ),
        )
        for index, (original, patch_content, expected) in enumerate(cases):
            with self.subTest(case=index, patch=patch_content):
                deep_update(original, patch_content)
                self.assertEqual(original, expected)

    def test_update_badformat(self):
        """Malformed or conflicting patches must raise DbException."""
        # Each row contains the original and an incorrect patch
        # TODO: also check the text that must be present in the exception
        cases = (
            # conflict, index 0 is edited twice
            ({"A": ["a", "b", "a"]}, {"A": {"$a": None, "$[0]": {"c": "d"}}}),
            # conflict, two insertions at same index
            ({"A": ["a", "b", "a"]}, {"A": {"$[1]": "c", "$[-2]": "d"}}),
            ({"A": ["a", "b", "a"]}, {"A": {"$[1]": "c", "$[+1]": "d"}}),
            # bad format keys with and without $
            ({"A": ["a", "b", "a"]}, {"A": {"$b": {"c": "d"}, "c": 3}}),
            # bad format empty $ and yaml incorrect
            ({"A": ["a", "b", "a"]}, {"A": {"$": 3}}),
            ({"A": ["a", "b", "a"]}, {"A": {"$a: b: c": 3}}),
            ({"A": ["a", "b", "a"]}, {"A": {"$a: b, c: d": 3}}),
            # insertion of None
            ({"A": ["a", "b", "a"]}, {"A": {"$+": None}}),
            # Not found, insertion of None
            ({"A": ["a", "b", "a"]}, {"A": {"$+c": None}}),
            # index edition out of range
            ({"A": ["a", "b", "a"]}, {"A": {"$[5]": 6}}),
            # conflict, two editions of the same element
            (
                {"A": ["a", {"id": "1", "c": "d"}]},
                {"A": {"$id: '1'": {"c": "e"}, "$c: d": {"c": "f"}}},
            ),
        )
        for index, (original, bad_patch) in enumerate(cases):
            with self.subTest(case=index, patch=bad_patch):
                # Apply each patch exactly once; a second attempt would run
                # against an original the first attempt may have touched.
                with self.assertRaises(DbException):
                    deep_update(original, bad_patch)
1579 if __name__
== "__main__":