Feature 10950 Replace pycrypto with pycryptodome
[osm/common.git] / osm_common / tests / test_dbbase.py
1 # Copyright 2018 Whitestack, LLC
2 # Copyright 2018 Telefonica S.A.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
14 # under the License.
15 #
16 # For those usages not covered by the Apache License, Version 2.0 please
17 # contact: esousa@whitestack.com or alfonso.tiernosepulveda@telefonica.com
18 ##
19 import asyncio
20 import copy
21 from copy import deepcopy
22 import http
23 from http import HTTPStatus
24 import logging
25 from os import urandom
26 import unittest
27 from unittest.mock import MagicMock, Mock, patch
28
29 from Crypto.Cipher import AES
30 from osm_common.dbbase import DbBase, DbException, deep_update, Encryption
31 import pytest
32
33
# Shared fixtures for TestBaseEncryption and TestAsyncEncryption.
# Several identifiers keep their historical spelling ("encyrpt", "b4") because
# the test cases refer to them by exactly these names.

# Plain inputs handed to the encrypt/decrypt helpers under test.
salt = "1afd5d1a-4a7e-4d9c-8c65-251290183106"
value = "private key txt"
schema_version = "1.1"
encoding_type = "ascii"
encyrpt_mode = AES.MODE_ECB

# ``value`` is 15 characters long; a single NUL brings it to 16.
padded_value = b"private key txt\0"
padded_encoded_value = b"private key txt\x00"

# Key material and serial fixtures.
secret_key = b"\xeev\xc2\xb8\xb2#;Ek\xd0\xb5['\x04\xed\x1f\xb9?\xc5Ig\x80\xd5\x8d\x8aT\xd7\xf8Q\xe2u!"
joined_key = b"\x9d\x17\xaf\xc8\xdeF\x1b.\x0e\xa9\xb5['\x04\xed\x1f\xb9?\xc5Ig\x80\xd5\x8d\x8aT\xd7\xf8Q\xe2u!"
serial_bytes = b"\xf8\x96Z\x1c:}\xb5\xdf\x94\x8d\x0f\x807\xe6)\x8f\xf5!\xee}\xc2\xfa\xb3\t\xb9\xe4\r7\x19\x08\xa5b"
base64_decoded_serial = b"g\xbe\xdb"

# ``encyrpted_value`` / ``encyrpted_bytes`` spell the base64 text of
# ``data_to_b4_encode``; ``b64_decoded`` is the stand-in returned by the
# patched ``b64decode``.
data_to_b4_encode = b"encrypted data"
encyrpted_value = "ZW5jcnlwdGVkIGRhdGE="
encyrpted_bytes = b"ZW5jcnlwdGVkIGRhdGE="
b64_decoded = b"decrypted data"

decrypted_val1 = "BiV9YZEuSRAudqvz7Gs+bg=="
decrypted_val2 = "q4LwnFdoryzbZJM5mCAnpA=="

# Sample document with a mix of secret and non-secret fields.
item = dict(
    secret="mysecret",
    cacert="mycacert",
    path="/var",
    ip="192.168.12.23",
)
58
59
def exception_message(message):
    """Return *message* prefixed with the text ``"database exception "``.

    :param message: text to append after the prefix (must be a ``str``).
    :return: the prefixed string.
    """
    prefix = "database exception "
    return prefix + message
62
63
@pytest.fixture
def db_base():
    """Pytest fixture handing a newly constructed ``DbBase`` to the requesting test."""
    instance = DbBase()
    return instance
67
68
def test_constructor():
    """Constructing ``DbBase`` without arguments yields a ``DbBase`` instance."""
    created = DbBase()
    assert created is not None
    assert isinstance(created, DbBase)
73
74
def test_db_connect(db_base):
    """``db_connect`` on the base class raises ``DbException`` saying it is not implemented."""
    expected_prefix = exception_message("Method 'db_connect' not implemented")
    with pytest.raises(DbException) as raised:
        db_base.db_connect(None)
    # Checked after the ``with`` block so the assertion actually runs.
    assert str(raised.value).startswith(expected_prefix)
81
82
def test_db_disconnect(db_base):
    """``db_disconnect`` on the base class can be called without raising."""
    # No assertion: the test passes as long as the call completes.
    db_base.db_disconnect()
85
86
def test_get_list(db_base):
    """``get_list`` raises a NOT_FOUND ``DbException`` saying it is not implemented."""
    expected_prefix = exception_message("Method 'get_list' not implemented")
    with pytest.raises(DbException) as raised:
        db_base.get_list(None, None)
    failure = raised.value
    assert str(failure).startswith(expected_prefix)
    assert failure.http_code == HTTPStatus.NOT_FOUND
94
95
def test_get_one(db_base):
    """``get_one`` raises a NOT_FOUND ``DbException`` saying it is not implemented."""
    expected_prefix = exception_message("Method 'get_one' not implemented")
    with pytest.raises(DbException) as raised:
        db_base.get_one(None, None, None, None)
    failure = raised.value
    assert str(failure).startswith(expected_prefix)
    assert failure.http_code == HTTPStatus.NOT_FOUND
103
104
def test_create(db_base):
    """``create`` raises a NOT_FOUND ``DbException`` saying it is not implemented."""
    expected_prefix = exception_message("Method 'create' not implemented")
    with pytest.raises(DbException) as raised:
        db_base.create(None, None)
    failure = raised.value
    assert str(failure).startswith(expected_prefix)
    assert failure.http_code == HTTPStatus.NOT_FOUND
112
113
def test_create_list(db_base):
    """``create_list`` raises a NOT_FOUND ``DbException`` saying it is not implemented."""
    expected_prefix = exception_message("Method 'create_list' not implemented")
    with pytest.raises(DbException) as raised:
        db_base.create_list(None, None)
    failure = raised.value
    assert str(failure).startswith(expected_prefix)
    assert failure.http_code == HTTPStatus.NOT_FOUND
121
122
def test_del_list(db_base):
    """``del_list`` raises a NOT_FOUND ``DbException`` saying it is not implemented."""
    expected_prefix = exception_message("Method 'del_list' not implemented")
    with pytest.raises(DbException) as raised:
        db_base.del_list(None, None)
    failure = raised.value
    assert str(failure).startswith(expected_prefix)
    assert failure.http_code == HTTPStatus.NOT_FOUND
130
131
def test_del_one(db_base):
    """``del_one`` raises a NOT_FOUND ``DbException`` saying it is not implemented."""
    expected_prefix = exception_message("Method 'del_one' not implemented")
    with pytest.raises(DbException) as raised:
        db_base.del_one(None, None, None)
    failure = raised.value
    assert str(failure).startswith(expected_prefix)
    assert failure.http_code == HTTPStatus.NOT_FOUND
139
140
class TestEncryption(unittest.TestCase):
    """Round-trip tests for ``DbBase.encrypt`` / ``DbBase.decrypt`` without mocking."""

    def setUp(self):
        """Create three ``DbBase`` instances, each configured with different key material.

        * first: the master key set with ``replace=True``, then 32 random bytes added
        * second: ``None`` set with ``replace=True``, then 30 random bytes added
        * third: the master key only
        """
        master_key = "Setting a long master key with numbers 123 and capitals AGHBNHD and symbols %&8)!'"
        db_base1 = DbBase()
        db_base2 = DbBase()
        db_base3 = DbBase()
        # set self.secret_key obtained when connect
        db_base1.set_secret_key(master_key, replace=True)
        db_base1.set_secret_key(urandom(32))
        db_base2.set_secret_key(None, replace=True)
        db_base2.set_secret_key(urandom(30))
        db_base3.set_secret_key(master_key)
        self.db_bases = [db_base1, db_base2, db_base3]

    def test_encrypt_decrypt(self):
        """Schema 1.0 leaves values untouched; schema 1.1 encrypts and round-trips them."""
        cases = (
            ("plain text 1 ! ", None),
            ("plain text 2 with salt ! ", "1afd5d1a-4a7e-4d9c-8c65-251290183106"),
        )
        for db_base in self.db_bases:
            for value, salt in cases:
                # no encryption
                encrypted = db_base.encrypt(value, schema_version="1.0", salt=salt)
                self.assertEqual(
                    encrypted, value, "value '{}' has been encrypted".format(value)
                )
                decrypted = db_base.decrypt(encrypted, schema_version="1.0", salt=salt)
                self.assertEqual(
                    decrypted, value, "value '{}' has been decrypted".format(value)
                )

                # encrypt/decrypt
                encrypted = db_base.encrypt(value, schema_version="1.1", salt=salt)
                self.assertNotEqual(
                    encrypted, value, "value '{}' has not been encrypted".format(value)
                )
                self.assertIsInstance(encrypted, str, "Encrypted is not ascii text")
                decrypted = db_base.decrypt(encrypted, schema_version="1.1", salt=salt)
                self.assertEqual(
                    decrypted, value, "value is not equal after encryption/decryption"
                )

    def test_encrypt_decrypt_salt(self):
        """Each key/salt combination yields its own ciphertext; a foreign key cannot recover the value."""
        value = "value to be encrypted!"
        encrypted = []
        for db_base in self.db_bases:
            for salt in (None, "salt 1", "1afd5d1a-4a7e-4d9c-8c65-251290183106"):
                # encrypt/decrypt
                ciphertext = db_base.encrypt(value, schema_version="1.1", salt=salt)
                encrypted.append(ciphertext)
                self.assertNotEqual(
                    ciphertext,
                    value,
                    "value '{}' has not been encrypted".format(value),
                )
                self.assertIsInstance(ciphertext, str, "Encrypted is not ascii text")
                decrypted = db_base.decrypt(
                    ciphertext, schema_version="1.1", salt=salt
                )
                self.assertEqual(
                    decrypted, value, "value is not equal after encryption/decryption"
                )

        # All ciphertexts must be pairwise different; a set collapses duplicates,
        # so equal sizes mean no two entries coincide.
        self.assertEqual(
            len(set(encrypted)),
            len(encrypted),
            "encryption with different salt must contain different result",
        )

        # decrypt with a different master key
        try:
            decrypted = self.db_bases[-1].decrypt(
                encrypted[0], schema_version="1.1", salt=None
            )
        except DbException as e:
            self.assertEqual(
                e.http_code,
                HTTPStatus.INTERNAL_SERVER_ERROR,
                "Decryption with different KEY does not provide expected http_code",
            )
        else:
            # Compare against the plaintext: comparing the ciphertext with the
            # decrypted text would pass even if the wrong key recovered the value.
            self.assertNotEqual(
                value,
                decrypted,
                "Decryption with different KEY must generate different result",
            )
227
228
class AsyncMock(MagicMock):
    """``MagicMock`` whose call is a coroutine and which records deep copies of its arguments."""

    async def __call__(self, *args, **kwargs):
        # Snapshot the arguments so that later mutation by the caller cannot
        # change what the mock recorded for this call.
        frozen_args = deepcopy(args)
        frozen_kwargs = deepcopy(kwargs)
        return super().__call__(*frozen_args, **frozen_kwargs)
234
235
class CopyingMock(MagicMock):
    """``MagicMock`` that records deep copies of the arguments it is called with."""

    def __call__(self, *args, **kwargs):
        # Snapshot the arguments so that later mutation by the caller cannot
        # change what the mock recorded for this call.
        frozen_args = deepcopy(args)
        frozen_kwargs = deepcopy(kwargs)
        return super().__call__(*frozen_args, **frozen_kwargs)
241
242
def check_if_assert_not_called(mocks: list):
    """Call ``assert_not_called()`` on every entry of *mocks*, in order.

    :param mocks: objects exposing ``assert_not_called`` (e.g. ``Mock`` instances).
    """
    for candidate in mocks:
        candidate.assert_not_called()
246
247
248 class TestBaseEncryption(unittest.TestCase):
@patch("logging.getLogger", autospec=True)
def setUp(self, mock_logger):
    """Prepare a ``DbBase`` with fixed cipher settings plus two reusable mocks.

    ``logging.getLogger`` is patched while this method runs, so the logger
    obtained here is a mock; it is flagged as disabled.
    """
    patched_logger = logging.getLogger()
    patched_logger.disabled = True

    self.db_base = DbBase()
    self.db_base.encoding_type = encoding_type
    self.db_base.encrypt_mode = encyrpt_mode
    self.db_base.secret_key = secret_key

    # Stand-ins for the AES cipher object and for the padded message.
    self.mock_cipher = CopyingMock()
    self.mock_padded_msg = CopyingMock()
259
def test_pad_data_len_not_multiplication_of_16(self):
    """A 38-character string is padded to 48 characters and contains the NUL filler."""
    data = "hello word hello hello word hello word"
    expected_len = 48
    filler = "\0" * (expected_len - len(data))
    padded = self.db_base.pad_data(data)
    self.assertEqual(len(padded), expected_len)
    self.assertIn(filler, padded)
267
def test_pad_data_len_multiplication_of_16(self):
    """A string that is already 16 characters long comes back unchanged."""
    data = "hello word!!!!!!"
    padded = self.db_base.pad_data(data)
    self.assertEqual(padded, data)
    self.assertNotIn("\0", padded)
273
def test_pad_data_empty_string(self):
    """Padding the empty string yields an empty result with no NUL characters."""
    padded = self.db_base.pad_data("")
    self.assertEqual(len(padded), 0)
    self.assertNotIn("\0", padded)
280
def test_pad_data_not_string(self):
    """Passing ``None`` to ``pad_data`` raises with the 'Incorrect data type' message."""
    expected_text = (
        "database exception Incorrect data type: type(None), string is expected."
    )
    with self.assertRaises(Exception) as err:
        self.db_base.pad_data(None)
    # Checked after the ``with`` block so the assertion actually runs.
    self.assertEqual(str(err.exception), expected_text)
289
def test_unpad_data_null_char_at_right(self):
    """Two trailing NUL characters are removed; the digit ``0`` in the text is kept."""
    null_padded_data = "hell0word\0\0"
    expected_length = len(null_padded_data) - 2
    unpadded = self.db_base.unpad_data(null_padded_data)
    self.assertEqual(len(unpadded), expected_length)
    self.assertNotIn("\0", unpadded)
    self.assertIn("0", unpadded)
297
def test_unpad_data_null_char_is_not_rightest(self):
    """A NUL that is not the last character is left in place."""
    null_padded_data = "hell0word\r\t\0\n"
    unpadded = self.db_base.unpad_data(null_padded_data)
    self.assertEqual(len(unpadded), len(null_padded_data))
    self.assertIn("\0", unpadded)
304
def test_unpad_data_with_spaces_at_right(self):
    """A NUL followed by a trailing space is left in place; the length is unchanged."""
    null_padded_data = " hell0word\0 "
    unpadded = self.db_base.unpad_data(null_padded_data)
    self.assertEqual(len(unpadded), len(null_padded_data))
    self.assertIn("\0", unpadded)
311
def test_unpad_data_empty_string(self):
    """Unpadding the empty string returns the empty string."""
    unpadded = self.db_base.unpad_data("")
    self.assertEqual(unpadded, "")
    self.assertNotIn("\0", unpadded)
317
def test_unpad_data_not_string(self):
    """Passing ``None`` to ``unpad_data`` raises with the 'Incorrect data type' message."""
    expected_text = (
        "database exception Incorrect data type: type(None), string is expected."
    )
    with self.assertRaises(Exception) as err:
        self.db_base.unpad_data(None)
    # Checked after the ``with`` block so the assertion actually runs.
    self.assertEqual(str(err.exception), expected_text)
326
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_schema_version_1_0_none_secret_key_none_salt(
    self, mock_pad_data, mock_join_secret_key
):
    """Schema 1.0, no secret key, no salt: the value is returned as given.

    Neither ``pad_data`` nor ``_join_secret_key`` may be invoked.
    """
    legacy_schema = "1.0"
    no_salt = None
    self.db_base.secret_key = None

    outcome = self.db_base._encrypt_value(value, legacy_schema, no_salt)

    self.assertEqual(outcome, value)
    check_if_assert_not_called([mock_pad_data, mock_join_secret_key])
339
@patch("osm_common.dbbase.b64encode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_schema_version_1_1_with_secret_key_exists_with_salt(
    self,
    mock_pad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_encode,
):
    """Schema 1.1 with secret key and salt: the value goes through pad, AES and base64."""
    # Wire the collaborators: pad_data -> padded message -> cipher -> b64encode.
    padded_msg = self.mock_padded_msg
    padded_msg.return_value = padded_value
    padded_msg.encode.return_value = padded_encoded_value
    mock_pad_data.return_value = padded_msg
    self.mock_cipher.encrypt.return_value = data_to_b4_encode
    mock_aes.new.return_value = self.mock_cipher
    mock_b64_encode.return_value = encyrpted_bytes

    result = self.db_base._encrypt_value(value, schema_version, salt)

    self.assertIsInstance(result, str)
    self.assertEqual(result, encyrpted_value)
    mock_join_secret_key.assert_called_once_with(salt)
    # Second positional argument of AES.new is the ciphering mode.
    aes_new_args = mock_aes.new.call_args_list[0].args
    self.assertEqual(aes_new_args[1], AES.MODE_ECB)
    mock_pad_data.assert_called_once_with(value)
    mock_b64_encode.assert_called_once_with(data_to_b4_encode)
    self.mock_cipher.encrypt.assert_called_once_with(padded_encoded_value)
    padded_msg.encode.assert_called_with(encoding_type)
371
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_schema_version_1_0_secret_key_not_exists(
    self, mock_pad_data, mock_join_secret_key
):
    """Schema 1.0, no secret key, salt given: the value is returned as given."""
    legacy_schema = "1.0"
    self.db_base.secret_key = None

    outcome = self.db_base._encrypt_value(value, legacy_schema, salt)

    self.assertEqual(outcome, value)
    check_if_assert_not_called([mock_pad_data, mock_join_secret_key])
383
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_schema_version_1_1_secret_key_not_exists(
    self, mock_pad_data, mock_join_secret_key
):
    """Schema 1.1, no secret key, salt given: the value is returned as given."""
    self.db_base.secret_key = None

    outcome = self.db_base._encrypt_value(value, schema_version, salt)

    self.assertEqual(outcome, value)
    check_if_assert_not_called([mock_pad_data, mock_join_secret_key])
394
@patch("osm_common.dbbase.b64encode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_schema_version_1_1_secret_key_exists_without_salt(
    self,
    mock_pad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_encode,
):
    """Schema 1.1 with secret key but ``salt=None``: the value is still encrypted."""
    no_salt = None
    # Wire the collaborators: pad_data -> padded message -> cipher -> b64encode.
    padded_msg = self.mock_padded_msg
    padded_msg.return_value = padded_value
    padded_msg.encode.return_value = padded_encoded_value
    mock_pad_data.return_value = padded_msg
    self.mock_cipher.encrypt.return_value = data_to_b4_encode
    mock_aes.new.return_value = self.mock_cipher
    mock_b64_encode.return_value = encyrpted_bytes

    result = self.db_base._encrypt_value(value, schema_version, no_salt)

    self.assertEqual(result, encyrpted_value)
    mock_join_secret_key.assert_called_once_with(no_salt)
    # Second positional argument of AES.new is the ciphering mode.
    aes_new_args = mock_aes.new.call_args_list[0].args
    self.assertEqual(aes_new_args[1], AES.MODE_ECB)
    mock_pad_data.assert_called_once_with(value)
    mock_b64_encode.assert_called_once_with(data_to_b4_encode)
    self.mock_cipher.encrypt.assert_called_once_with(padded_encoded_value)
    padded_msg.encode.assert_called_with(encoding_type)
427
@patch("osm_common.dbbase.b64encode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_invalid_encrpt_mode(
    self,
    mock_pad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_encode,
):
    """An error raised by ``AES.new`` for a bogus mode propagates out of ``_encrypt_value``."""
    bogus_mode = "AES.MODE_XXX"
    failure_text = "Invalid ciphering mode."
    mock_aes.new.side_effect = Exception(failure_text)
    self.db_base.encrypt_mode = bogus_mode

    with self.assertRaises(Exception) as err:
        self.db_base._encrypt_value(value, schema_version, salt)

    self.assertEqual(str(err.exception), failure_text)
    mock_join_secret_key.assert_called_once_with(salt)
    # The bogus mode must have been handed to AES.new as second positional argument.
    aes_new_args = mock_aes.new.call_args_list[0].args
    self.assertEqual(aes_new_args[1], bogus_mode)
    check_if_assert_not_called([mock_pad_data, mock_b64_encode])
451
@patch("osm_common.dbbase.b64encode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_schema_version_1_1_secret_key_exists_value_none(
    self,
    mock_pad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_encode,
):
    """Schema 1.1 with secret key and ``value=None``: the ``pad_data`` error propagates.

    After the failure neither the cipher's ``encrypt`` nor ``b64encode`` may
    have been called.
    """
    value = None
    mock_aes.new.return_value = self.mock_cipher
    mock_pad_data.side_effect = DbException(
        "Incorrect data type: type(None), string is expected."
    )

    with self.assertRaises(Exception) as err:
        self.db_base._encrypt_value(value, schema_version, salt)
    # Checked after the ``with`` block so the assertion actually runs.
    self.assertEqual(
        str(err.exception),
        "database exception Incorrect data type: type(None), string is expected.",
    )

    mock_join_secret_key.assert_called_once_with(salt)
    # Second positional argument of AES.new is the ciphering mode.
    aes_new_args = mock_aes.new.call_args_list[0].args
    self.assertEqual(aes_new_args[1], AES.MODE_ECB)
    mock_pad_data.assert_called_once_with(value)
    # Each mock is listed once (the original repeated mock_b64_encode).
    check_if_assert_not_called([mock_b64_encode, self.mock_cipher.encrypt])
484
@patch("osm_common.dbbase.b64encode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_join_secret_key_raises(
    self,
    mock_pad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_encode,
):
    """A ``DbException`` from ``_join_secret_key`` propagates and stops all later steps."""
    bytes_salt = b"3434o34-3wewrwr-222424-2242dwew"
    mock_join_secret_key.side_effect = DbException("Unexpected type")
    mock_aes.new.return_value = self.mock_cipher

    with self.assertRaises(Exception) as err:
        self.db_base._encrypt_value(value, schema_version, bytes_salt)

    self.assertEqual(str(err.exception), "database exception Unexpected type")
    untouched = [
        mock_pad_data,
        mock_aes.new,
        mock_b64_encode,
        self.mock_cipher.encrypt,
    ]
    check_if_assert_not_called(untouched)
    mock_join_secret_key.assert_called_once_with(bytes_salt)
511
@patch("osm_common.dbbase.b64encode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_schema_version_1_1_secret_key_exists_b64_encode_raises(
    self,
    mock_pad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_encode,
):
    """Schema 1.1 with secret key: a ``TypeError`` from ``b64encode`` propagates."""
    cipher_output = "encrypted data"  # deliberately a str, not bytes
    failure_text = "A bytes-like object is required, not 'str'"

    padded_msg = self.mock_padded_msg
    padded_msg.return_value = padded_value
    padded_msg.encode.return_value = padded_encoded_value
    mock_pad_data.return_value = padded_msg
    self.mock_cipher.encrypt.return_value = cipher_output
    mock_aes.new.return_value = self.mock_cipher
    mock_b64_encode.side_effect = TypeError(failure_text)

    with self.assertRaises(Exception) as error:
        self.db_base._encrypt_value(value, schema_version, salt)
    # Checked after the ``with`` block so the assertion actually runs.
    self.assertEqual(str(error.exception), failure_text)

    mock_join_secret_key.assert_called_once_with(salt)
    # Second positional argument of AES.new is the ciphering mode.
    aes_new_args = mock_aes.new.call_args_list[0].args
    self.assertEqual(aes_new_args[1], AES.MODE_ECB)
    mock_pad_data.assert_called_once_with(value)
    self.mock_cipher.encrypt.assert_called_once_with(padded_encoded_value)
    padded_msg.encode.assert_called_with(encoding_type)
    mock_b64_encode.assert_called_once_with(cipher_output)
547
@patch("osm_common.dbbase.b64encode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_cipher_encrypt_raises(
    self,
    mock_pad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_encode,
):
    """An exception from the cipher's ``encrypt`` propagates; ``b64encode`` is never reached."""
    failure_text = "Invalid data type."

    padded_msg = self.mock_padded_msg
    padded_msg.return_value = padded_value
    padded_msg.encode.return_value = padded_encoded_value
    mock_pad_data.return_value = padded_msg
    self.mock_cipher.encrypt.side_effect = Exception(failure_text)
    mock_aes.new.return_value = self.mock_cipher

    with self.assertRaises(Exception) as error:
        self.db_base._encrypt_value(value, schema_version, salt)

    self.assertEqual(str(error.exception), failure_text)
    mock_join_secret_key.assert_called_once_with(salt)
    # Second positional argument of AES.new is the ciphering mode.
    aes_new_args = mock_aes.new.call_args_list[0].args
    self.assertEqual(aes_new_args[1], AES.MODE_ECB)
    mock_pad_data.assert_called_once_with(value)
    self.mock_cipher.encrypt.assert_called_once_with(padded_encoded_value)
    padded_msg.encode.assert_called_with(encoding_type)
    mock_b64_encode.assert_not_called()
578
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_encrypt_value")
def test_encrypt_without_schema_version_without_salt(
    self, mock_encrypt_value, mock_get_secret_key
):
    """``encrypt(value)`` forwards ``None`` for both schema version and salt."""
    mock_encrypt_value.return_value = encyrpted_value

    outcome = self.db_base.encrypt(value)

    mock_encrypt_value.assert_called_once_with(value, None, None)
    mock_get_secret_key.assert_called_once()
    self.assertEqual(outcome, encyrpted_value)
590
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_encrypt_value")
def test_encrypt_with_schema_version_with_salt(
    self, mock_encrypt_value, mock_get_secret_key
):
    """Schema version and salt are both given and forwarded to ``_encrypt_value``.

    The previous summary line claimed "salt is None", which contradicted the
    arguments actually passed below.
    """
    mock_encrypt_value.return_value = encyrpted_value

    result = self.db_base.encrypt(value, schema_version, salt)

    mock_encrypt_value.assert_called_once_with(value, schema_version, salt)
    mock_get_secret_key.assert_called_once()
    self.assertEqual(result, encyrpted_value)
602
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_encrypt_value")
def test_encrypt_get_secret_key_raises(
    self, mock_encrypt_value, mock_get_secret_key
):
    """A ``DbException`` from ``get_secret_key`` propagates; ``_encrypt_value`` is not reached."""
    mock_get_secret_key.side_effect = DbException("KeyError")

    with self.assertRaises(Exception) as error:
        self.db_base.encrypt(value)
    # Checked after the ``with`` block so the assertion actually runs.
    self.assertEqual(str(error.exception), "database exception KeyError")

    mock_encrypt_value.assert_not_called()
    mock_get_secret_key.assert_called_once()
615
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_encrypt_value")
def test_encrypt_encrypt_raises(self, mock_encrypt_value, mock_get_secret_key):
    """A ``DbException`` from ``_encrypt_value`` propagates out of ``encrypt``."""
    detail = "Incorrect data type: type(None), string is expected."
    mock_encrypt_value.side_effect = DbException(detail)

    with self.assertRaises(Exception) as error:
        self.db_base.encrypt(value, schema_version, salt)
    # Checked after the ``with`` block so the assertion actually runs.
    self.assertEqual(
        str(error.exception),
        "database exception Incorrect data type: type(None), string is expected.",
    )

    mock_encrypt_value.assert_called_once_with(value, schema_version, salt)
    mock_get_secret_key.assert_called_once()
631
@patch("osm_common.dbbase.b64decode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "unpad_data")
def test__decrypt_value_schema_version_1_1_secret_key_exists_without_salt(
    self,
    mock_unpad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_decode,
):
    """Schema 1.1 with secret key and ``salt=None``: base64, AES and unpad are applied."""
    no_salt = None
    # Wire the collaborators: b64decode -> cipher.decrypt -> unpad_data.
    mock_b64_decode.return_value = b64_decoded
    self.mock_cipher.decrypt.return_value = padded_encoded_value
    mock_aes.new.return_value = self.mock_cipher
    mock_unpad_data.return_value = value

    result = self.db_base._decrypt_value(encyrpted_value, schema_version, no_salt)

    self.assertEqual(result, value)
    mock_join_secret_key.assert_called_once_with(no_salt)
    # Second positional argument of AES.new is the ciphering mode.
    aes_new_args = mock_aes.new.call_args_list[0].args
    self.assertEqual(aes_new_args[1], AES.MODE_ECB)
    mock_unpad_data.assert_called_once_with("private key txt\0")
    mock_b64_decode.assert_called_once_with(encyrpted_value)
    self.mock_cipher.decrypt.assert_called_once_with(b64_decoded)
661
@patch("osm_common.dbbase.b64decode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "unpad_data")
def test__decrypt_value_schema_version_1_1_secret_key_exists_with_salt(
    self,
    mock_unpad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_decode,
):
    """schema_version 1.1, secret_key exists, salt exists.

    The previous summary line claimed "salt is None"; this test passes the
    module-level ``salt`` and checks it reaches ``_join_secret_key``.
    """
    # Wire the collaborators: b64decode -> cipher.decrypt -> unpad_data.
    mock_aes.new.return_value = self.mock_cipher
    self.mock_cipher.decrypt.return_value = padded_encoded_value
    mock_b64_decode.return_value = b64_decoded
    mock_unpad_data.return_value = value

    result = self.db_base._decrypt_value(encyrpted_value, schema_version, salt)

    self.assertEqual(result, value)
    mock_join_secret_key.assert_called_once_with(salt)
    # Second positional argument of AES.new is the ciphering mode.
    aes_new_args = mock_aes.new.call_args_list[0].args
    self.assertEqual(aes_new_args[1], AES.MODE_ECB)
    mock_unpad_data.assert_called_once_with("private key txt\0")
    mock_b64_decode.assert_called_once_with(encyrpted_value)
    self.mock_cipher.decrypt.assert_called_once_with(b64_decoded)
690
@patch("osm_common.dbbase.b64decode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "unpad_data")
def test__decrypt_value_schema_version_1_1_without_secret_key(
    self,
    mock_unpad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_decode,
):
    """Schema 1.1 without a secret key: the input is returned untouched."""
    self.db_base.secret_key = None

    outcome = self.db_base._decrypt_value(encyrpted_value, schema_version, salt)

    self.assertEqual(outcome, encyrpted_value)
    # None of the decryption steps may have run.
    untouched = [
        mock_join_secret_key,
        mock_aes.new,
        mock_unpad_data,
        mock_b64_decode,
        self.mock_cipher.decrypt,
    ]
    check_if_assert_not_called(untouched)
717
@patch("osm_common.dbbase.b64decode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "unpad_data")
def test__decrypt_value_schema_version_1_0_with_secret_key(
    self,
    mock_unpad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_decode,
):
    """Schema 1.0 with secret key and salt: the input is returned untouched."""
    legacy_schema = "1.0"

    outcome = self.db_base._decrypt_value(encyrpted_value, legacy_schema, salt)

    self.assertEqual(outcome, encyrpted_value)
    # None of the decryption steps may have run.
    untouched = [
        mock_join_secret_key,
        mock_aes.new,
        mock_unpad_data,
        mock_b64_decode,
        self.mock_cipher.decrypt,
    ]
    check_if_assert_not_called(untouched)
743
@patch("osm_common.dbbase.b64decode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "unpad_data")
def test__decrypt_value_join_secret_key_raises(
    self,
    mock_unpad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_decode,
):
    """A ``TypeError`` from ``_join_secret_key`` propagates and stops all later steps."""
    odd_salt = object()
    failure_text = "'type' object is not iterable"
    mock_join_secret_key.side_effect = TypeError(failure_text)

    with self.assertRaises(Exception) as error:
        self.db_base._decrypt_value(encyrpted_value, schema_version, odd_salt)
    # Checked after the ``with`` block so the assertion actually runs.
    self.assertEqual(str(error.exception), failure_text)

    mock_join_secret_key.assert_called_once_with(odd_salt)
    untouched = [
        mock_aes.new,
        mock_unpad_data,
        mock_b64_decode,
        self.mock_cipher.decrypt,
    ]
    check_if_assert_not_called(untouched)
767
@patch("osm_common.dbbase.b64decode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "unpad_data")
def test__decrypt_value_b64decode_raises(
    self,
    mock_unpad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_decode,
):
    """A ``TypeError`` from ``b64decode`` propagates before any cipher is created."""
    failure_text = "A str-like object is required, not 'bytes'"
    mock_b64_decode.side_effect = TypeError(failure_text)

    with self.assertRaises(Exception) as error:
        self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
    # Checked after the ``with`` block so the assertion actually runs.
    self.assertEqual(str(error.exception), failure_text)

    mock_b64_decode.assert_called_once_with(encyrpted_value)
    mock_join_secret_key.assert_called_once_with(salt)
    untouched = [mock_aes.new, self.mock_cipher.decrypt, mock_unpad_data]
    check_if_assert_not_called(untouched)
794
@patch("osm_common.dbbase.b64decode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "unpad_data")
def test__decrypt_value_invalid_encrypt_mode(
    self,
    mock_unpad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_decode,
):
    """An error raised by ``AES.new`` for a bogus mode propagates out of ``_decrypt_value``."""
    bogus_mode = "AES.MODE_XXX"
    failure_text = "Invalid ciphering mode."
    mock_aes.new.side_effect = Exception(failure_text)
    self.db_base.encrypt_mode = bogus_mode
    mock_b64_decode.return_value = b64_decoded

    with self.assertRaises(Exception) as error:
        self.db_base._decrypt_value(encyrpted_value, schema_version, salt)

    self.assertEqual(str(error.exception), failure_text)
    mock_join_secret_key.assert_called_once_with(salt)
    # The bogus mode must have been handed to AES.new as second positional argument.
    aes_new_args = mock_aes.new.call_args_list[0].args
    self.assertEqual(aes_new_args[1], bogus_mode)
    mock_b64_decode.assert_called_once_with(encyrpted_value)
    check_if_assert_not_called([mock_unpad_data, self.mock_cipher.decrypt])
820
@patch("osm_common.dbbase.b64decode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "unpad_data")
def test__decrypt_value_cipher_decrypt_raises(
    self,
    mock_unpad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_decode,
):
    """An exception from the cipher's ``decrypt`` propagates; ``unpad_data`` is never reached."""
    failure_text = "Invalid data type."
    mock_b64_decode.return_value = b64_decoded
    self.mock_cipher.decrypt.side_effect = Exception(failure_text)
    mock_aes.new.return_value = self.mock_cipher

    with self.assertRaises(Exception) as error:
        self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
    # Checked after the ``with`` block so the assertion actually runs.
    self.assertEqual(str(error.exception), failure_text)

    mock_join_secret_key.assert_called_once_with(salt)
    # Second positional argument of AES.new is the ciphering mode.
    aes_new_args = mock_aes.new.call_args_list[0].args
    self.assertEqual(aes_new_args[1], AES.MODE_ECB)
    mock_b64_decode.assert_called_once_with(encyrpted_value)
    self.mock_cipher.decrypt.assert_called_once_with(b64_decoded)
    mock_unpad_data.assert_not_called()
848
@patch("osm_common.dbbase.b64decode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "unpad_data")
def test__decrypt_value_decode_raises(
    self,
    mock_unpad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_decode,
):
    """Undecodable cipher output is reported as a ``DbException`` about the COMMONKEY."""
    # 0xd0 is outside the ASCII range, so decoding with encoding_type "ascii" fails.
    self.mock_cipher.decrypt.return_value = b"\xd0\x000091"
    mock_aes.new.return_value = self.mock_cipher
    mock_b64_decode.return_value = b64_decoded
    mock_unpad_data.return_value = value

    with self.assertRaises(Exception) as error:
        self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
    # Checked after the ``with`` block so the assertions actually run.
    raised = error.exception
    self.assertEqual(
        str(raised),
        "database exception Cannot decrypt information. Are you using same COMMONKEY in all OSM components?",
    )
    self.assertEqual(type(raised), DbException)

    mock_join_secret_key.assert_called_once_with(salt)
    # Second positional argument of AES.new is the ciphering mode.
    aes_new_args = mock_aes.new.call_args_list[0].args
    self.assertEqual(aes_new_args[1], AES.MODE_ECB)
    mock_b64_decode.assert_called_once_with(encyrpted_value)
    self.mock_cipher.decrypt.assert_called_once_with(b64_decoded)
    mock_unpad_data.assert_not_called()
880
@patch("osm_common.dbbase.b64decode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "unpad_data")
def test__decrypt_value_unpad_data_raises(
    self,
    mock_unpad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_decode,
):
    """A ``DbException`` from ``unpad_data`` propagates out of ``_decrypt_value``."""
    # The decrypted message decodes to None, which is what unpad_data receives.
    decrypted_stub = MagicMock()
    decrypted_stub.decode.return_value = None
    self.mock_cipher.decrypt.return_value = decrypted_stub
    mock_aes.new.return_value = self.mock_cipher
    mock_b64_decode.return_value = b64_decoded
    mock_unpad_data.side_effect = DbException(
        "Incorrect data type: type(None), string is expected."
    )

    with self.assertRaises(Exception) as error:
        self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
    # Checked after the ``with`` block so the assertions actually run.
    raised = error.exception
    self.assertEqual(
        str(raised),
        "database exception Incorrect data type: type(None), string is expected.",
    )
    self.assertEqual(type(raised), DbException)

    mock_join_secret_key.assert_called_once_with(salt)
    # Second positional argument of AES.new is the ciphering mode.
    aes_new_args = mock_aes.new.call_args_list[0].args
    self.assertEqual(aes_new_args[1], AES.MODE_ECB)
    mock_b64_decode.assert_called_once_with(encyrpted_value)
    self.mock_cipher.decrypt.assert_called_once_with(b64_decoded)
    decrypted_stub.decode.assert_called_once_with(self.db_base.encoding_type)
    mock_unpad_data.assert_called_once_with(None)
918
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_decrypt_value")
def test_decrypt_without_schema_version_without_salt(
    self, mock_decrypt_value, mock_get_secret_key
):
    """decrypt() given only a value forwards None for schema_version and salt."""
    mock_decrypt_value.return_value = encyrpted_value

    outcome = self.db_base.decrypt(value)

    # The mocked _decrypt_value result is returned as-is.
    self.assertEqual(outcome, encyrpted_value)
    mock_get_secret_key.assert_called_once()
    mock_decrypt_value.assert_called_once_with(value, None, None)
930
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_decrypt_value")
def test_decrypt_with_schema_version_with_salt(
    self, mock_decrypt_value, mock_get_secret_key
):
    """decrypt() passes the given schema_version and salt through to _decrypt_value."""
    mock_decrypt_value.return_value = encyrpted_value

    outcome = self.db_base.decrypt(value, schema_version, salt)

    # The mocked _decrypt_value result is returned as-is.
    self.assertEqual(outcome, encyrpted_value)
    mock_get_secret_key.assert_called_once()
    mock_decrypt_value.assert_called_once_with(value, schema_version, salt)
942
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_decrypt_value")
def test_decrypt_get_secret_key_raises(
    self, mock_decrypt_value, mock_get_secret_key
):
    """get_secret_key raises DbException, so _decrypt_value is never reached."""
    mock_get_secret_key.side_effect = DbException("KeyError")

    with self.assertRaises(Exception) as error:
        self.db_base.decrypt(value)

    failure_text = str(error.exception)
    self.assertEqual(failure_text, "database exception KeyError")
    mock_get_secret_key.assert_called_once()
    mock_decrypt_value.assert_not_called()
955
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_decrypt_value")
def test_decrypt_decrypt_value_raises(
    self, mock_decrypt_value, mock_get_secret_key
):
    """_decrypt_value raises DbException and decrypt() lets it reach the caller."""
    mock_decrypt_value.side_effect = DbException(
        "Incorrect data type: type(None), string is expected."
    )

    with self.assertRaises(Exception) as error:
        self.db_base.decrypt(value, schema_version, salt)

    failure_text = str(error.exception)
    self.assertEqual(
        failure_text,
        "database exception Incorrect data type: type(None), string is expected.",
    )
    mock_get_secret_key.assert_called_once()
    mock_decrypt_value.assert_called_once_with(value, schema_version, salt)
973
def test_encrypt_decrypt_with_schema_version_1_1_with_salt(self):
    """Round trip with schema version 1.1 and a salt returns the original value."""
    ciphertext = self.db_base.encrypt(value, schema_version, salt)
    recovered = self.db_base.decrypt(ciphertext, schema_version, salt)
    self.assertEqual(value, recovered)
979
def test_encrypt_decrypt_with_schema_version_1_0_with_salt(self):
    """Round trip with schema version 1.0 and a salt returns the original value."""
    # The literal "1.0" is used here instead of the module-level schema_version.
    ciphertext = self.db_base.encrypt(value, "1.0", salt)
    recovered = self.db_base.decrypt(ciphertext, "1.0", salt)
    self.assertEqual(value, recovered)
986
def test_encrypt_decrypt_with_schema_version_1_1_without_salt(self):
    """Round trip with schema version 1.1 and no salt returns the original value."""
    # None is passed as the salt instead of the module-level salt.
    ciphertext = self.db_base.encrypt(value, schema_version, None)
    recovered = self.db_base.decrypt(ciphertext, schema_version, None)
    self.assertEqual(value, recovered)
993
994
class TestAsyncEncryption(unittest.TestCase):
    """Tests for the coroutine-based Encryption class of osm_common.dbbase.

    Coroutines are driven with ``self.loop.run_until_complete``; the admin
    collection is replaced by a mock so no database is contacted.
    """

    @patch("logging.getLogger", autospec=True)
    def setUp(self, mock_logger):
        """Create an Encryption object with a fixed key and a mocked admin collection."""
        # getLogger is patched while setUp runs; disable whatever it hands out.
        logging.getLogger().disabled = True
        # NOTE(review): asyncio.get_event_loop() without a running loop is
        # deprecated in recent Python releases - confirm supported versions.
        self.loop = asyncio.get_event_loop()
        self.encryption = Encryption(uri="uri", config={})
        self.encryption.encoding_type = encoding_type
        self.encryption.encrypt_mode = encyrpt_mode
        self.encryption._secret_key = secret_key
        self.admin_collection = Mock()
        self.admin_collection.find_one = AsyncMock()
        self.encryption._client = {
            "osm": {
                "admin": self.admin_collection,
            }
        }

    @patch.object(Encryption, "decrypt", new_callable=AsyncMock)
    def test_decrypt_fields_with_item_with_fields(self, mock_decrypt):
        """item and fields exist."""
        mock_decrypt.side_effect = [decrypted_val1, decrypted_val2]
        input_item = copy.deepcopy(item)
        expected_item = {
            "secret": decrypted_val1,
            "cacert": decrypted_val2,
            "path": "/var",
            "ip": "192.168.12.23",
        }
        fields = ["secret", "cacert"]
        self.loop.run_until_complete(
            self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
        )
        self.assertEqual(input_item, expected_item)
        _call_mock_decrypt = mock_decrypt.call_args_list
        self.assertEqual(_call_mock_decrypt[0].args, ("mysecret", "1.1", salt))
        self.assertEqual(_call_mock_decrypt[1].args, ("mycacert", "1.1", salt))

    @patch.object(Encryption, "decrypt", new_callable=AsyncMock)
    def test_decrypt_fields_empty_item_with_fields(self, mock_decrypt):
        """item is empty and fields exist."""
        input_item = {}
        fields = ["secret", "cacert"]
        self.loop.run_until_complete(
            self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
        )
        self.assertEqual(input_item, {})
        mock_decrypt.assert_not_called()

    @patch.object(Encryption, "decrypt", new_callable=AsyncMock)
    def test_decrypt_fields_with_item_without_fields(self, mock_decrypt):
        """item exists and fields is empty."""
        input_item = copy.deepcopy(item)
        fields = []
        self.loop.run_until_complete(
            self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
        )
        self.assertEqual(input_item, item)
        mock_decrypt.assert_not_called()

    @patch.object(Encryption, "decrypt", new_callable=AsyncMock)
    def test_decrypt_fields_with_item_with_single_field(self, mock_decrypt):
        """item exists and fields holds a single field name."""
        mock_decrypt.return_value = decrypted_val1
        fields = ["secret"]
        input_item = copy.deepcopy(item)
        expected_item = {
            "secret": decrypted_val1,
            "cacert": "mycacert",
            "path": "/var",
            "ip": "192.168.12.23",
        }
        self.loop.run_until_complete(
            self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
        )
        self.assertEqual(input_item, expected_item)
        _call_mock_decrypt = mock_decrypt.call_args_list
        self.assertEqual(_call_mock_decrypt[0].args, ("mysecret", "1.1", salt))

    @patch.object(Encryption, "decrypt", new_callable=AsyncMock)
    def test_decrypt_fields_with_item_with_field_none_salt_1_0_schema_version(
        self, mock_decrypt
    ):
        """item exists, single field, salt is None and schema version is 1.0."""
        schema_version = "1.0"
        salt = None
        mock_decrypt.return_value = "mysecret"
        input_item = copy.deepcopy(item)
        fields = ["secret"]
        self.loop.run_until_complete(
            self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
        )
        self.assertEqual(input_item, item)
        _call_mock_decrypt = mock_decrypt.call_args_list
        self.assertEqual(_call_mock_decrypt[0].args, ("mysecret", "1.0", None))

    @patch.object(Encryption, "decrypt", new_callable=AsyncMock)
    def test_decrypt_fields_decrypt_raises(self, mock_decrypt):
        """Method decrypt raises error; the item is left untouched."""
        mock_decrypt.side_effect = DbException(
            "Incorrect data type: type(None), string is expected."
        )
        fields = ["secret"]
        input_item = copy.deepcopy(item)
        with self.assertRaises(Exception) as error:
            self.loop.run_until_complete(
                self.encryption.decrypt_fields(input_item, fields, schema_version, salt)
            )
        self.assertEqual(
            str(error.exception),
            "database exception Incorrect data type: type(None), string is expected.",
        )
        self.assertEqual(input_item, item)
        _call_mock_decrypt = mock_decrypt.call_args_list
        self.assertEqual(_call_mock_decrypt[0].args, ("mysecret", "1.1", salt))

    @patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
    @patch.object(Encryption, "_encrypt_value")
    def test_encrypt(self, mock_encrypt_value, mock_get_secret_key):
        """Encrypted successfully."""
        mock_encrypt_value.return_value = encyrpted_value
        result = self.loop.run_until_complete(
            self.encryption.encrypt(value, schema_version, salt)
        )
        self.assertEqual(result, encyrpted_value)
        mock_get_secret_key.assert_called_once()
        mock_encrypt_value.assert_called_once_with(value, schema_version, salt)

    @patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
    @patch.object(Encryption, "_encrypt_value")
    def test_encrypt_get_secret_key_raises(
        self, mock_encrypt_value, mock_get_secret_key
    ):
        """Method get_secret_key raises error."""
        mock_get_secret_key.side_effect = DbException("Unexpected type.")
        with self.assertRaises(Exception) as error:
            self.loop.run_until_complete(
                self.encryption.encrypt(value, schema_version, salt)
            )
        self.assertEqual(str(error.exception), "database exception Unexpected type.")
        mock_get_secret_key.assert_called_once()
        mock_encrypt_value.assert_not_called()

    @patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
    @patch.object(Encryption, "_encrypt_value")
    def test_encrypt_get_encrypt_raises(self, mock_encrypt_value, mock_get_secret_key):
        """Method _encrypt_value raises error."""
        mock_encrypt_value.side_effect = TypeError(
            "A bytes-like object is required, not 'str'"
        )
        with self.assertRaises(Exception) as error:
            self.loop.run_until_complete(
                self.encryption.encrypt(value, schema_version, salt)
            )
        self.assertEqual(
            str(error.exception), "A bytes-like object is required, not 'str'"
        )
        mock_get_secret_key.assert_called_once()
        mock_encrypt_value.assert_called_once_with(value, schema_version, salt)

    @patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
    @patch.object(Encryption, "_decrypt_value")
    def test_decrypt(self, mock_decrypt_value, mock_get_secret_key):
        """Decrypted successfully."""
        mock_decrypt_value.return_value = value
        result = self.loop.run_until_complete(
            self.encryption.decrypt(encyrpted_value, schema_version, salt)
        )
        self.assertEqual(result, value)
        mock_get_secret_key.assert_called_once()
        mock_decrypt_value.assert_called_once_with(
            encyrpted_value, schema_version, salt
        )

    @patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
    @patch.object(Encryption, "_decrypt_value")
    def test_decrypt_get_secret_key_raises(
        self, mock_decrypt_value, mock_get_secret_key
    ):
        """Method get_secret_key raises error."""
        mock_get_secret_key.side_effect = DbException("Unexpected type.")
        with self.assertRaises(Exception) as error:
            self.loop.run_until_complete(
                self.encryption.decrypt(encyrpted_value, schema_version, salt)
            )
        self.assertEqual(str(error.exception), "database exception Unexpected type.")
        mock_get_secret_key.assert_called_once()
        mock_decrypt_value.assert_not_called()

    @patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
    @patch.object(Encryption, "_decrypt_value")
    def test_decrypt_decrypt_value_raises(
        self, mock_decrypt_value, mock_get_secret_key
    ):
        """Method _decrypt_value raises error."""
        mock_decrypt_value.side_effect = TypeError(
            "A bytes-like object is required, not 'str'"
        )
        with self.assertRaises(Exception) as error:
            self.loop.run_until_complete(
                self.encryption.decrypt(encyrpted_value, schema_version, salt)
            )
        self.assertEqual(
            str(error.exception), "A bytes-like object is required, not 'str'"
        )
        mock_get_secret_key.assert_called_once()
        mock_decrypt_value.assert_called_once_with(
            encyrpted_value, schema_version, salt
        )

    def test_join_keys_string_key(self):
        """key is string."""
        string_key = "sample key"
        result = self.encryption._join_keys(string_key, secret_key)
        self.assertEqual(result, joined_key)
        self.assertIsInstance(result, bytes)

    def test_join_keys_bytes_key(self):
        """key is bytes."""
        bytes_key = b"sample key"
        result = self.encryption._join_keys(bytes_key, secret_key)
        self.assertEqual(result, joined_key)
        self.assertIsInstance(result, bytes)
        self.assertEqual(len(result.decode("unicode_escape")), 32)

    def test_join_keys_int_key(self):
        """key is int."""
        int_key = 923
        with self.assertRaises(Exception) as error:
            self.encryption._join_keys(int_key, None)
        self.assertEqual(str(error.exception), "'int' object is not iterable")

    def test_join_keys_none_secret_key(self):
        """key is bytes and secret key is None."""
        bytes_key = b"sample key"
        result = self.encryption._join_keys(bytes_key, None)
        # 10 key bytes followed by 22 zero bytes, 32 bytes in total.
        self.assertEqual(result, b"sample key" + b"\x00" * 22)
        self.assertIsInstance(result, bytes)
        self.assertEqual(len(result.decode("unicode_escape")), 32)

    def test_join_keys_none_key_none_secret_key(self):
        """key is None and secret key is None."""
        with self.assertRaises(Exception) as error:
            self.encryption._join_keys(None, None)
        self.assertEqual(str(error.exception), "'NoneType' object is not iterable")

    def test_join_keys_none_key(self):
        """key is None and secret key exists."""
        with self.assertRaises(Exception) as error:
            self.encryption._join_keys(None, secret_key)
        self.assertEqual(str(error.exception), "'NoneType' object is not iterable")

    @patch.object(Encryption, "_join_keys")
    def test_join_secret_key_string_sample_key(self, mock_join_keys):
        """update key is a string and secret key exists."""
        update_key = "sample key"
        mock_join_keys.return_value = joined_key
        result = self.encryption._join_secret_key(update_key)
        self.assertEqual(result, joined_key)
        self.assertIsInstance(result, bytes)
        mock_join_keys.assert_called_once_with(update_key, secret_key)

    @patch.object(Encryption, "_join_keys")
    def test_join_secret_key_byte_sample_key(self, mock_join_keys):
        """update key is bytes and secret key exists."""
        update_key = b"sample key"
        mock_join_keys.return_value = joined_key
        result = self.encryption._join_secret_key(update_key)
        self.assertEqual(result, joined_key)
        self.assertIsInstance(result, bytes)
        mock_join_keys.assert_called_once_with(update_key, secret_key)

    @patch.object(Encryption, "_join_keys")
    def test_join_secret_key_join_keys_raises(self, mock_join_keys):
        """Method _join_keys raises."""
        update_key = 3434
        mock_join_keys.side_effect = TypeError("'int' object is not iterable")
        with self.assertRaises(Exception) as error:
            self.encryption._join_secret_key(update_key)
        self.assertEqual(str(error.exception), "'int' object is not iterable")
        mock_join_keys.assert_called_once_with(update_key, secret_key)

    @patch.object(Encryption, "_join_keys")
    def test_get_secret_key_exists(self, mock_join_keys):
        """secret_key exists."""
        self.encryption._secret_key = secret_key
        self.loop.run_until_complete(self.encryption.get_secret_key())
        self.assertEqual(self.encryption.secret_key, secret_key)
        mock_join_keys.assert_not_called()

    @patch.object(Encryption, "_join_keys")
    @patch("osm_common.dbbase.b64decode")
    def test_get_secret_key_not_exist_database_key_exist(
        self, mock_b64decode, mock_join_keys
    ):
        """secret_key does not exist, database key exists, no version data found."""
        self.encryption._secret_key = None
        self.encryption._admin_collection.find_one.return_value = None
        self.encryption._config = {"database_commonkey": "osm_new_key"}
        mock_join_keys.return_value = joined_key
        self.loop.run_until_complete(self.encryption.get_secret_key())
        self.assertEqual(self.encryption.secret_key, joined_key)
        self.assertEqual(mock_join_keys.call_count, 1)
        mock_b64decode.assert_not_called()

    @patch.object(Encryption, "_join_keys")
    @patch("osm_common.dbbase.b64decode")
    def test_get_secret_key_not_exist_with_database_key_version_data_exist_without_serial(
        self, mock_b64decode, mock_join_keys
    ):
        """secret_key does not exist, database key exists, version data has no serial."""
        self.encryption._secret_key = None
        self.encryption._admin_collection.find_one.return_value = {"version": "1.0"}
        self.encryption._config = {"database_commonkey": "osm_new_key"}
        mock_join_keys.return_value = joined_key
        self.loop.run_until_complete(self.encryption.get_secret_key())
        self.assertEqual(self.encryption.secret_key, joined_key)
        self.assertEqual(mock_join_keys.call_count, 1)
        mock_b64decode.assert_not_called()
        self.encryption._admin_collection.find_one.assert_called_once_with(
            {"_id": "version"}
        )
        _call_mock_join_keys = mock_join_keys.call_args_list
        self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))

    @patch.object(Encryption, "_join_keys")
    @patch("osm_common.dbbase.b64decode")
    def test_get_secret_key_not_exist_with_database_key_version_data_exist_with_serial(
        self, mock_b64decode, mock_join_keys
    ):
        """secret_key does not exist, database key exists, version and serial exist
        in admin collection."""
        self.encryption._secret_key = None
        self.encryption._admin_collection.find_one.return_value = {
            "version": "1.0",
            "serial": serial_bytes,
        }
        self.encryption._config = {"database_commonkey": "osm_new_key"}
        mock_join_keys.side_effect = [secret_key, joined_key]
        mock_b64decode.return_value = base64_decoded_serial
        self.loop.run_until_complete(self.encryption.get_secret_key())
        self.assertEqual(self.encryption.secret_key, joined_key)
        self.assertEqual(mock_join_keys.call_count, 2)
        mock_b64decode.assert_called_once_with(serial_bytes)
        self.encryption._admin_collection.find_one.assert_called_once_with(
            {"_id": "version"}
        )
        _call_mock_join_keys = mock_join_keys.call_args_list
        self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))
        self.assertEqual(
            _call_mock_join_keys[1].args, (base64_decoded_serial, secret_key)
        )

    @patch.object(Encryption, "_join_keys")
    @patch("osm_common.dbbase.b64decode")
    def test_get_secret_key_join_keys_raises(self, mock_b64decode, mock_join_keys):
        """Method _join_keys raises."""
        self.encryption._secret_key = None
        self.encryption._admin_collection.find_one.return_value = {
            "version": "1.0",
            "serial": serial_bytes,
        }
        self.encryption._config = {"database_commonkey": "osm_new_key"}
        mock_join_keys.side_effect = DbException("Invalid data type.")
        with self.assertRaises(Exception) as error:
            self.loop.run_until_complete(self.encryption.get_secret_key())
        self.assertEqual(str(error.exception), "database exception Invalid data type.")
        self.assertEqual(mock_join_keys.call_count, 1)
        check_if_assert_not_called(
            [mock_b64decode, self.encryption._admin_collection.find_one]
        )
        _call_mock_join_keys = mock_join_keys.call_args_list
        self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))

    @patch.object(Encryption, "_join_keys")
    @patch("osm_common.dbbase.b64decode")
    def test_get_secret_key_b64decode_raises(self, mock_b64decode, mock_join_keys):
        """Method b64decode raises."""
        self.encryption._secret_key = None
        self.encryption._admin_collection.find_one.return_value = {
            "version": "1.0",
            "serial": base64_decoded_serial,
        }
        self.encryption._config = {"database_commonkey": "osm_new_key"}
        mock_join_keys.return_value = secret_key
        mock_b64decode.side_effect = TypeError(
            "A bytes-like object is required, not 'str'"
        )
        with self.assertRaises(Exception) as error:
            self.loop.run_until_complete(self.encryption.get_secret_key())
        self.assertEqual(
            str(error.exception), "A bytes-like object is required, not 'str'"
        )
        self.assertIsNone(self.encryption.secret_key)
        self.assertEqual(mock_join_keys.call_count, 1)
        mock_b64decode.assert_called_once_with(base64_decoded_serial)
        self.encryption._admin_collection.find_one.assert_called_once_with(
            {"_id": "version"}
        )
        _call_mock_join_keys = mock_join_keys.call_args_list
        self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))

    @patch.object(Encryption, "_join_keys")
    @patch("osm_common.dbbase.b64decode")
    def test_get_secret_key_admin_collection_find_one_raises(
        self, mock_b64decode, mock_join_keys
    ):
        """admin_collection find_one raises."""
        self.encryption._secret_key = None
        self.encryption._admin_collection.find_one.side_effect = DbException(
            "Connection failed."
        )
        self.encryption._config = {"database_commonkey": "osm_new_key"}
        mock_join_keys.return_value = secret_key
        with self.assertRaises(Exception) as error:
            self.loop.run_until_complete(self.encryption.get_secret_key())
        self.assertEqual(str(error.exception), "database exception Connection failed.")
        self.assertIsNone(self.encryption.secret_key)
        self.assertEqual(mock_join_keys.call_count, 1)
        mock_b64decode.assert_not_called()
        self.encryption._admin_collection.find_one.assert_called_once_with(
            {"_id": "version"}
        )
        _call_mock_join_keys = mock_join_keys.call_args_list
        self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))

    def test_encrypt_decrypt_with_schema_version_1_1_with_salt(self):
        """Encrypt and decrypt with schema version 1.1, salt exists."""
        encrypted_msg = self.loop.run_until_complete(
            self.encryption.encrypt(value, schema_version, salt)
        )
        decrypted_msg = self.loop.run_until_complete(
            self.encryption.decrypt(encrypted_msg, schema_version, salt)
        )
        self.assertEqual(value, decrypted_msg)

    def test_encrypt_decrypt_with_schema_version_1_0_with_salt(self):
        """Encrypt and decrypt with schema version 1.0, salt exists."""
        schema_version = "1.0"
        encrypted_msg = self.loop.run_until_complete(
            self.encryption.encrypt(value, schema_version, salt)
        )
        decrypted_msg = self.loop.run_until_complete(
            self.encryption.decrypt(encrypted_msg, schema_version, salt)
        )
        self.assertEqual(value, decrypted_msg)

    def test_encrypt_decrypt_with_schema_version_1_1_without_salt(self):
        """Encrypt with schema version 1.1 and no salt raises an error."""
        salt = None
        with self.assertRaises(Exception) as error:
            self.loop.run_until_complete(
                self.encryption.encrypt(value, schema_version, salt)
            )
        self.assertEqual(str(error.exception), "'NoneType' object is not iterable")
1453
1454
class TestDeepUpdate(unittest.TestCase):
    """Table-driven tests for deep_update.

    Each table row is exercised inside ``subTest`` (where the runner supports
    it) and carries a failure message naming the patch, so a failing row can
    be identified without printing to stdout.
    """

    def test_update_dict(self):
        """Dictionary patches are merged in place and yield the expected result."""
        # Each case: (original, patch, expected result)
        cases = (
            ({"a": "b"}, {"a": "c"}, {"a": "c"}),
            ({"a": "b"}, {"b": "c"}, {"a": "b", "b": "c"}),
            ({"a": "b"}, {"a": None}, {}),
            ({"a": "b", "b": "c"}, {"a": None}, {"b": "c"}),
            ({"a": ["b"]}, {"a": "c"}, {"a": "c"}),
            ({"a": "c"}, {"a": ["b"]}, {"a": ["b"]}),
            ({"a": {"b": "c"}}, {"a": {"b": "d", "c": None}}, {"a": {"b": "d"}}),
            ({"a": [{"b": "c"}]}, {"a": [1]}, {"a": [1]}),
            ({1: ["a", "b"]}, {1: ["c", "d"]}, {1: ["c", "d"]}),
            ({1: {"a": "b"}}, {1: ["c"]}, {1: ["c"]}),
            ({1: {"a": "foo"}}, {1: None}, {}),
            ({1: {"a": "foo"}}, {1: "bar"}, {1: "bar"}),
            ({"e": None}, {"a": 1}, {"e": None, "a": 1}),
            ({1: [1, 2]}, {1: {"a": "b", "c": None}}, {1: {"a": "b"}}),
            ({}, {"a": {"bb": {"ccc": None}}}, {"a": {"bb": {}}}),
        )
        for index, (original, changes, expected) in enumerate(cases):
            with self.subTest(index=index, patch=changes):
                deep_update(original, changes)
                self.assertEqual(original, expected, msg=f"patch={changes!r}")

        # Check that a deep copy is made, so that the original dictionary
        # does not reference the patch.
        test_original = {1: {"a": "b"}}
        test_patch = {1: {"c": {"d": "e"}}}
        test_result = {1: {"a": "b", "c": {"d": "e"}}}
        deep_update(test_original, test_patch)
        self.assertEqual(test_original, test_result)
        test_patch[1]["c"]["f"] = "edition of patch, must not modify original"
        self.assertEqual(test_original, test_result)

    def test_update_array(self):
        """Array patches using "$" keys are applied in place as expected."""
        # Each case: (original, patch, expected result)
        cases = (
            # delete all instances of "a"/"d"
            ({"A": ["a", "b", "a"]}, {"A": {"$a": None}}, {"A": ["b"]}),
            ({"A": ["a", "b", "a"]}, {"A": {"$d": None}}, {"A": ["a", "b", "a"]}),
            # delete and insert at 0
            (
                {"A": ["a", "b", "c"]},
                {"A": {"$b": None, "$+[0]": "b"}},
                {"A": ["b", "a", "c"]},
            ),
            # delete and edit
            (
                {"A": ["a", "b", "a"]},
                {"A": {"$a": None, "$[1]": {"c": "d"}}},
                {"A": [{"c": "d"}]},
            ),
            # insert if not exist
            ({"A": ["a", "b", "c"]}, {"A": {"$+b": "b"}}, {"A": ["a", "b", "c"]}),
            ({"A": ["a", "b", "c"]}, {"A": {"$+d": "f"}}, {"A": ["a", "b", "c", "f"]}),
            # edit by filter
            (
                {"A": ["a", "b", "a"]},
                {"A": {"$b": {"c": "d"}}},
                {"A": ["a", {"c": "d"}, "a"]},
            ),
            (
                {"A": ["a", "b", "a"]},
                {"A": {"$b": None, "$+[0]": "b", "$+": "c"}},
                {"A": ["b", "a", "a", "c"]},
            ),
            ({"A": ["a", "b", "a"]}, {"A": {"$c": None}}, {"A": ["a", "b", "a"]}),
            # index deletion out of range
            ({"A": ["a", "b", "a"]}, {"A": {"$[5]": None}}, {"A": ["a", "b", "a"]}),
            # nested array->dict
            (
                {"A": ["a", "b", {"id": "1", "c": {"d": 2}}]},
                {"A": {"$id: '1'": {"h": None, "c": {"d": "e", "f": "g"}}}},
                {"A": ["a", "b", {"id": "1", "c": {"d": "e", "f": "g"}}]},
            ),
            (
                {"A": [{"id": 1, "c": {"d": 2}}, {"id": 1, "c": {"f": []}}]},
                {"A": {"$id: 1": {"h": None, "c": {"d": "e", "f": "g"}}}},
                {
                    "A": [
                        {"id": 1, "c": {"d": "e", "f": "g"}},
                        {"id": 1, "c": {"d": "e", "f": "g"}},
                    ]
                },
            ),
            # nested array->array
            (
                {"A": ["a", "b", ["a", "b"]]},
                {"A": {"$b": None, "$[2]": {"$b": {}, "$+": "c"}}},
                {"A": ["a", ["a", {}, "c"]]},
            ),
            # types str and int different, so not found
            (
                {"A": ["a", {"id": "1", "c": "d"}]},
                {"A": {"$id: 1": {"c": "e"}}},
                {"A": ["a", {"id": "1", "c": "d"}]},
            ),
        )
        for index, (original, changes, expected) in enumerate(cases):
            with self.subTest(index=index, patch=changes):
                deep_update(original, changes)
                self.assertEqual(original, expected, msg=f"patch={changes!r}")

    def test_update_badformat(self):
        """Malformed or conflicting patches make deep_update raise DbException."""
        # Each case: (original, incorrect patch)
        # TODO: also check the text that must be present in the error
        cases = (
            # conflict, index 0 is edited twice
            ({"A": ["a", "b", "a"]}, {"A": {"$a": None, "$[0]": {"c": "d"}}}),
            # conflict, two insertions at same index
            ({"A": ["a", "b", "a"]}, {"A": {"$[1]": "c", "$[-2]": "d"}}),
            ({"A": ["a", "b", "a"]}, {"A": {"$[1]": "c", "$[+1]": "d"}}),
            # bad format keys with and without $
            ({"A": ["a", "b", "a"]}, {"A": {"$b": {"c": "d"}, "c": 3}}),
            # bad format empty $ and yaml incorrect
            ({"A": ["a", "b", "a"]}, {"A": {"$": 3}}),
            ({"A": ["a", "b", "a"]}, {"A": {"$a: b: c": 3}}),
            ({"A": ["a", "b", "a"]}, {"A": {"$a: b, c: d": 3}}),
            # insertion of None
            ({"A": ["a", "b", "a"]}, {"A": {"$+": None}}),
            # Not found, insertion of None
            ({"A": ["a", "b", "a"]}, {"A": {"$+c": None}}),
            # index edition out of range
            ({"A": ["a", "b", "a"]}, {"A": {"$[5]": 6}}),
            # conflict, two editions on index 2
            (
                {"A": ["a", {"id": "1", "c": "d"}]},
                {"A": {"$id: '1'": {"c": "e"}, "$c: d": {"c": "f"}}},
            ),
        )
        for index, (original, bad_patch) in enumerate(cases):
            with self.subTest(index=index, patch=bad_patch):
                with self.assertRaises(DbException, msg=f"patch={bad_patch!r}"):
                    deep_update(original, bad_patch)
1589
1590
if __name__ == "__main__":
    # Run the test cases of this module when it is executed as a script.
    unittest.main()