1 |
|
# Copyright 2018 Whitestack, LLC |
2 |
|
# Copyright 2018 Telefonica S.A. |
3 |
|
# |
4 |
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may |
5 |
|
# not use this file except in compliance with the License. You may obtain |
6 |
|
# a copy of the License at |
7 |
|
# |
8 |
|
# http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
# |
10 |
|
# Unless required by applicable law or agreed to in writing, software |
11 |
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
12 |
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
13 |
|
# License for the specific language governing permissions and limitations |
14 |
|
# under the License. |
15 |
|
# |
16 |
|
# For those usages not covered by the Apache License, Version 2.0 please |
17 |
|
# contact: esousa@whitestack.com or alfonso.tiernosepulveda@telefonica.com |
18 |
|
## |
19 |
1 |
import asyncio |
20 |
1 |
import copy |
21 |
1 |
from copy import deepcopy |
22 |
1 |
import http |
23 |
1 |
from http import HTTPStatus |
24 |
1 |
import logging |
25 |
1 |
from os import urandom |
26 |
1 |
import unittest |
27 |
1 |
from unittest.mock import MagicMock, Mock, patch |
28 |
|
|
29 |
1 |
from Crypto.Cipher import AES |
30 |
1 |
from osm_common.dbbase import DbBase, DbException, deep_update, Encryption |
31 |
1 |
import pytest |
32 |
|
|
33 |
|
|
34 |
|
# Shared fixtures for TestBaseEncryption and TestAsyncEncryption.
# NOTE: the misspelled names below (encyrpt_*, data_to_b4_encode) are kept
# unchanged because the tests refer to them by exactly these names.

# Inputs handed to encrypt/decrypt.
salt = "1afd5d1a-4a7e-4d9c-8c65-251290183106"
value = "private key txt"
schema_version = "1.1"

# `value` followed by a single NUL byte, as bytes (both spellings are equal).
padded_value = b"private key txt\0"
padded_encoded_value = b"private key txt\x00"

# Attributes assigned onto the DbBase under test in setUp.
encoding_type = "ascii"
encyrpt_mode = AES.MODE_ECB
secret_key = b"\xeev\xc2\xb8\xb2#;Ek\xd0\xb5['\x04\xed\x1f\xb9?\xc5Ig\x80\xd5\x8d\x8aT\xd7\xf8Q\xe2u!"

# Canned return values for the mocked cipher / base64 helpers.
# "ZW5jcnlwdGVkIGRhdGE=" is the base64 text of b"encrypted data".
encyrpted_value = "ZW5jcnlwdGVkIGRhdGE="
encyrpted_bytes = b"ZW5jcnlwdGVkIGRhdGE="
data_to_b4_encode = b"encrypted data"
b64_decoded = b"decrypted data"

# Key/serial material; presumably consumed by tests further down the file.
joined_key = b"\x9d\x17\xaf\xc8\xdeF\x1b.\x0e\xa9\xb5['\x04\xed\x1f\xb9?\xc5Ig\x80\xd5\x8d\x8aT\xd7\xf8Q\xe2u!"
serial_bytes = b"\xf8\x96Z\x1c:}\xb5\xdf\x94\x8d\x0f\x807\xe6)\x8f\xf5!\xee}\xc2\xfa\xb3\t\xb9\xe4\r7\x19\x08\xa5b"
base64_decoded_serial = b"g\xbe\xdb"
decrypted_val1 = "BiV9YZEuSRAudqvz7Gs+bg=="
decrypted_val2 = "q4LwnFdoryzbZJM5mCAnpA=="

# Sample document with both sensitive and plain fields.
item = {
    "secret": "mysecret",
    "cacert": "mycacert",
    "path": "/var",
    "ip": "192.168.12.23",
}
58 |
|
|
59 |
|
|
60 |
1 |
def exception_message(message):
    """Return ``message`` prefixed with ``"database exception "``.

    Used to build the text the tests expect ``str(DbException(...))`` to
    start with.
    """
    prefix = "database exception"
    return " ".join((prefix, message))
62 |
|
|
63 |
|
|
64 |
1 |
@pytest.fixture
def db_base():
    """Pytest fixture returning a DbBase built with default arguments."""
    instance = DbBase()
    return instance
67 |
|
|
68 |
|
|
69 |
1 |
def test_constructor():
    """DbBase() can be built without arguments and yields a DbBase instance."""
    instance = DbBase()
    assert instance is not None
    assert isinstance(instance, DbBase)
73 |
|
|
74 |
|
|
75 |
1 |
def test_db_connect(db_base):
    """db_connect on the base class must raise a 'not implemented' DbException."""
    expected_prefix = exception_message("Method 'db_connect' not implemented")
    with pytest.raises(DbException) as excinfo:
        db_base.db_connect(None)
    raised_text = str(excinfo.value)
    assert raised_text.startswith(expected_prefix)
81 |
|
|
82 |
|
|
83 |
1 |
def test_db_disconnect(db_base):
    """db_disconnect on the base class must complete without raising."""
    # No assertion needed: the test passes as long as the call returns.
    db_base.db_disconnect()
85 |
|
|
86 |
|
|
87 |
1 |
def test_get_list(db_base):
    """get_list on the base class raises DbException with NOT_FOUND."""
    expected_prefix = exception_message("Method 'get_list' not implemented")
    with pytest.raises(DbException) as excinfo:
        db_base.get_list(None, None)
    error = excinfo.value
    assert str(error).startswith(expected_prefix)
    assert error.http_code == HTTPStatus.NOT_FOUND
94 |
|
|
95 |
|
|
96 |
1 |
def test_get_one(db_base):
    """get_one on the base class raises DbException with NOT_FOUND."""
    expected_prefix = exception_message("Method 'get_one' not implemented")
    with pytest.raises(DbException) as excinfo:
        db_base.get_one(None, None, None, None)
    error = excinfo.value
    assert str(error).startswith(expected_prefix)
    assert error.http_code == HTTPStatus.NOT_FOUND
103 |
|
|
104 |
|
|
105 |
1 |
def test_create(db_base):
    """create on the base class raises DbException with NOT_FOUND."""
    expected_prefix = exception_message("Method 'create' not implemented")
    with pytest.raises(DbException) as excinfo:
        db_base.create(None, None)
    error = excinfo.value
    assert str(error).startswith(expected_prefix)
    assert error.http_code == HTTPStatus.NOT_FOUND
112 |
|
|
113 |
|
|
114 |
1 |
def test_create_list(db_base):
    """create_list on the base class raises DbException with NOT_FOUND."""
    expected_prefix = exception_message("Method 'create_list' not implemented")
    with pytest.raises(DbException) as excinfo:
        db_base.create_list(None, None)
    error = excinfo.value
    assert str(error).startswith(expected_prefix)
    assert error.http_code == HTTPStatus.NOT_FOUND
121 |
|
|
122 |
|
|
123 |
1 |
def test_del_list(db_base):
    """del_list on the base class raises DbException with NOT_FOUND."""
    expected_prefix = exception_message("Method 'del_list' not implemented")
    with pytest.raises(DbException) as excinfo:
        db_base.del_list(None, None)
    error = excinfo.value
    assert str(error).startswith(expected_prefix)
    assert error.http_code == HTTPStatus.NOT_FOUND
130 |
|
|
131 |
|
|
132 |
1 |
def test_del_one(db_base):
    """del_one on the base class raises DbException with NOT_FOUND."""
    expected_prefix = exception_message("Method 'del_one' not implemented")
    with pytest.raises(DbException) as excinfo:
        db_base.del_one(None, None, None)
    error = excinfo.value
    assert str(error).startswith(expected_prefix)
    assert error.http_code == HTTPStatus.NOT_FOUND
139 |
|
|
140 |
|
|
141 |
1 |
class TestEncryption(unittest.TestCase):
    """Encrypt/decrypt round trips through DbBase objects keyed in different ways."""

    def setUp(self):
        """Create three DbBase instances, each with a different secret-key setup."""
        master_key = "Setting a long master key with numbers 123 and capitals AGHBNHD and symbols %&8)!'"
        db_base1 = DbBase()
        db_base2 = DbBase()
        db_base3 = DbBase()
        # set self.secret_key obtained when connect
        # db_base1: master key (replace=True) followed by 32 random bytes.
        db_base1.set_secret_key(master_key, replace=True)
        db_base1.set_secret_key(urandom(32))
        # db_base2: None (replace=True) followed by 30 random bytes.
        db_base2.set_secret_key(None, replace=True)
        db_base2.set_secret_key(urandom(30))
        # db_base3: master key only.
        db_base3.set_secret_key(master_key)
        self.db_bases = [db_base1, db_base2, db_base3]

    def test_encrypt_decrypt(self):
        """Schema 1.0 leaves text untouched; schema 1.1 round-trips through a str."""
        cases = (
            ("plain text 1 ! ", None),
            ("plain text 2 with salt ! ", "1afd5d1a-4a7e-4d9c-8c65-251290183106"),
        )
        for db_base in self.db_bases:
            for plain, salt_used in cases:
                # no encryption
                encrypted = db_base.encrypt(plain, schema_version="1.0", salt=salt_used)
                self.assertEqual(
                    encrypted, plain, "value '{}' has been encrypted".format(plain)
                )
                decrypted = db_base.decrypt(
                    encrypted, schema_version="1.0", salt=salt_used
                )
                self.assertEqual(
                    decrypted, plain, "value '{}' has been decrypted".format(plain)
                )

                # encrypt/decrypt
                encrypted = db_base.encrypt(plain, schema_version="1.1", salt=salt_used)
                self.assertNotEqual(
                    encrypted, plain, "value '{}' has not been encrypted".format(plain)
                )
                self.assertIsInstance(encrypted, str, "Encrypted is not ascii text")
                decrypted = db_base.decrypt(
                    encrypted, schema_version="1.1", salt=salt_used
                )
                self.assertEqual(
                    decrypted, plain, "value is not equal after encryption/decryption"
                )

    def test_encrypt_decrypt_salt(self):
        """Every key/salt combination yields a distinct ciphertext; a wrong key fails."""
        value = "value to be encrypted!"
        encrypted = []
        for db_base in self.db_bases:
            for salt_used in (None, "salt 1", "1afd5d1a-4a7e-4d9c-8c65-251290183106"):
                # encrypt/decrypt
                ciphertext = db_base.encrypt(value, schema_version="1.1", salt=salt_used)
                encrypted.append(ciphertext)
                self.assertNotEqual(
                    ciphertext,
                    value,
                    "value '{}' has not been encrypted".format(value),
                )
                self.assertIsInstance(ciphertext, str, "Encrypted is not ascii text")
                decrypted = db_base.decrypt(
                    ciphertext, schema_version="1.1", salt=salt_used
                )
                self.assertEqual(
                    decrypted, value, "value is not equal after encryption/decryption"
                )

        # Every pair of ciphertexts (across all keys and salts) must differ.
        for index, first in enumerate(encrypted):
            for second in encrypted[index + 1:]:
                self.assertNotEqual(
                    first,
                    second,
                    "encryption with different salt must contain different result",
                )

        # decrypt with a different master key
        try:
            decrypted = self.db_bases[-1].decrypt(
                encrypted[0], schema_version="1.1", salt=None
            )
        except DbException as e:
            self.assertEqual(
                e.http_code,
                HTTPStatus.INTERNAL_SERVER_ERROR,
                "Decryption with different KEY does not provide expected http_code",
            )
        else:
            # Compare against the plaintext: the wrong key must not recover it.
            # (Comparing against the ciphertext, as before, proved almost nothing.)
            self.assertNotEqual(
                value,
                decrypted,
                "Decryption with different KEY must generate different result",
            )
227 |
|
|
228 |
|
|
229 |
1 |
class AsyncMock(MagicMock):
    """MagicMock whose call is a coroutine and which records deep-copied arguments."""

    async def __call__(self, *args, **kwargs):
        """Deep-copy positional and keyword arguments, then delegate to MagicMock.

        Copying first means the recorded call is a snapshot taken at call
        time rather than a reference to objects the caller may later mutate.
        """
        snapshot_args = deepcopy(args)
        snapshot_kwargs = deepcopy(kwargs)
        return super().__call__(*snapshot_args, **snapshot_kwargs)
234 |
|
|
235 |
|
|
236 |
1 |
class CopyingMock(MagicMock):
    """MagicMock that records deep copies of the arguments it is called with."""

    def __call__(self, *args, **kwargs):
        """Deep-copy positional and keyword arguments, then delegate to MagicMock.

        Copying first means the recorded call is a snapshot taken at call
        time rather than a reference to objects the caller may later mutate.
        """
        snapshot_args = deepcopy(args)
        snapshot_kwargs = deepcopy(kwargs)
        return super().__call__(*snapshot_args, **snapshot_kwargs)
241 |
|
|
242 |
|
|
243 |
1 |
def check_if_assert_not_called(mocks: list):
    """Call ``assert_not_called()`` on every mock in ``mocks``, in order.

    Args:
        mocks: objects exposing ``assert_not_called()``.

    Any exception raised by ``assert_not_called()`` propagates to the caller,
    so the check stops at the first failing mock.
    """
    for candidate in mocks:
        candidate.assert_not_called()
246 |
|
|
247 |
|
|
248 |
1 |
class TestBaseEncryption(unittest.TestCase): |
249 |
1 |
@patch("logging.getLogger", autospec=True)
def setUp(self, mock_logger):
    """Build the DbBase under test and the mocks shared by the test methods.

    ``logging.getLogger`` is patched while setUp runs; the object it returns
    is flagged as disabled before DbBase is constructed.
    """
    logging.getLogger().disabled = True
    base = DbBase()
    # Overwrite the instance attributes with the module-level fixtures.
    base.encoding_type = encoding_type
    base.encrypt_mode = encyrpt_mode
    base.secret_key = secret_key
    self.db_base = base
    self.mock_cipher = CopyingMock()
    self.mock_padded_msg = CopyingMock()
259 |
|
|
260 |
1 |
def test_pad_data_len_not_multiplication_of_16(self):
    """Text whose length is not a multiple of 16 is NUL-padded up to 48 characters."""
    text = "hello word hello hello word hello word"
    target_len = 48
    missing = target_len - len(text)
    result = self.db_base.pad_data(text)
    self.assertEqual(len(result), target_len)
    # The padding must appear as one contiguous run of NUL characters.
    self.assertIn("\0" * missing, result)
267 |
|
|
268 |
1 |
def test_pad_data_len_multiplication_of_16(self):
    """Text that already fills a 16-character block is returned without padding."""
    text = "hello word!!!!!!"
    result = self.db_base.pad_data(text)
    self.assertEqual(result, text)
    self.assertNotIn("\0", result)
273 |
|
|
274 |
1 |
def test_pad_data_empty_string(self):
    """Padding an empty string yields an empty result with no NUL characters."""
    result = self.db_base.pad_data("")
    self.assertEqual(len(result), 0)
    self.assertNotIn("\0", result)
280 |
|
|
281 |
1 |
def test_pad_data_not_string(self):
    """pad_data(None) raises with the 'Incorrect data type' message."""
    expected_text = (
        "database exception Incorrect data type: type(None), string is expected."
    )
    with self.assertRaises(Exception) as err:
        self.db_base.pad_data(None)
    self.assertEqual(str(err.exception), expected_text)
289 |
|
|
290 |
1 |
def test_unpad_data_null_char_at_right(self):
    """Trailing NUL characters are removed; the digit '0' in the text is kept."""
    padded_text = "hell0word\0\0"
    result = self.db_base.unpad_data(padded_text)
    # Two trailing NULs are expected to be dropped.
    self.assertEqual(len(result), len(padded_text) - 2)
    self.assertNotIn("\0", result)
    self.assertIn("0", result)
297 |
|
|
298 |
1 |
def test_unpad_data_null_char_is_not_rightest(self):
    """A NUL that is not the last character is left in place."""
    text = "hell0word\r\t\0\n"
    result = self.db_base.unpad_data(text)
    self.assertEqual(len(result), len(text))
    self.assertIn("\0", result)
304 |
|
|
305 |
1 |
def test_unpad_data_with_spaces_at_right(self):
    """A NUL followed by a trailing space is not stripped; the length is unchanged."""
    text = " hell0word\0 "
    result = self.db_base.unpad_data(text)
    self.assertEqual(len(result), len(text))
    self.assertIn("\0", result)
311 |
|
|
312 |
1 |
def test_unpad_data_empty_string(self):
    """Unpadding an empty string returns an empty string."""
    result = self.db_base.unpad_data("")
    self.assertEqual(result, "")
    self.assertNotIn("\0", result)
317 |
|
|
318 |
1 |
def test_unpad_data_not_string(self):
    """unpad_data(None) raises with the 'Incorrect data type' message."""
    expected_text = (
        "database exception Incorrect data type: type(None), string is expected."
    )
    with self.assertRaises(Exception) as err:
        self.db_base.unpad_data(None)
    self.assertEqual(str(err.exception), expected_text)
326 |
|
|
327 |
1 |
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_schema_version_1_0_none_secret_key_none_salt(
    self, mock_pad_data, mock_join_secret_key
):
    """Schema 1.0 with neither secret key nor salt: the value is returned as-is."""
    self.db_base.secret_key = None
    outcome = self.db_base._encrypt_value(value, "1.0", None)
    self.assertEqual(outcome, value)
    # Neither padding nor key derivation may be attempted.
    check_if_assert_not_called([mock_pad_data, mock_join_secret_key])
339 |
|
|
340 |
1 |
@patch("osm_common.dbbase.b64encode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_schema_version_1_1_with_secret_key_exists_with_salt(
    self,
    mock_pad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_encode,
):
    """Schema 1.1 with a secret key and a salt: the full encrypt pipeline runs."""
    # Wire the mocked collaborators: pad_data, cipher and b64encode.
    padded_msg = self.mock_padded_msg
    padded_msg.return_value = padded_value
    padded_msg.encode.return_value = padded_encoded_value
    mock_pad_data.return_value = padded_msg
    cipher = self.mock_cipher
    cipher.encrypt.return_value = data_to_b4_encode
    mock_aes.new.return_value = cipher
    mock_b64_encode.return_value = encyrpted_bytes

    outcome = self.db_base._encrypt_value(value, schema_version, salt)

    self.assertIsInstance(outcome, str)
    self.assertEqual(outcome, encyrpted_value)
    mock_join_secret_key.assert_called_once_with(salt)
    # Second positional argument of the first AES.new call is the cipher mode.
    mode_used = mock_aes.new.call_args_list[0][0][1]
    self.assertEqual(mode_used, AES.MODE_ECB)
    mock_pad_data.assert_called_once_with(value)
    mock_b64_encode.assert_called_once_with(data_to_b4_encode)
    cipher.encrypt.assert_called_once_with(padded_encoded_value)
    padded_msg.encode.assert_called_with(encoding_type)
371 |
|
|
372 |
1 |
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_schema_version_1_0_secret_key_not_exists(
    self, mock_pad_data, mock_join_secret_key
):
    """Schema 1.0 without a secret key but with a salt: the value is returned as-is."""
    self.db_base.secret_key = None
    outcome = self.db_base._encrypt_value(value, "1.0", salt)
    self.assertEqual(outcome, value)
    # Neither padding nor key derivation may be attempted.
    check_if_assert_not_called([mock_pad_data, mock_join_secret_key])
383 |
|
|
384 |
1 |
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_schema_version_1_1_secret_key_not_exists(
    self, mock_pad_data, mock_join_secret_key
):
    """Schema 1.1 without a secret key but with a salt: the value is returned as-is."""
    self.db_base.secret_key = None
    outcome = self.db_base._encrypt_value(value, schema_version, salt)
    self.assertEqual(outcome, value)
    # Neither padding nor key derivation may be attempted.
    check_if_assert_not_called([mock_pad_data, mock_join_secret_key])
394 |
|
|
395 |
1 |
@patch("osm_common.dbbase.b64encode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_schema_version_1_1_secret_key_exists_without_salt(
    self,
    mock_pad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_encode,
):
    """Schema 1.1 with a secret key and salt=None: the full encrypt pipeline runs."""
    no_salt = None
    # Wire the mocked collaborators: pad_data, cipher and b64encode.
    padded_msg = self.mock_padded_msg
    padded_msg.return_value = padded_value
    padded_msg.encode.return_value = padded_encoded_value
    mock_pad_data.return_value = padded_msg
    cipher = self.mock_cipher
    cipher.encrypt.return_value = data_to_b4_encode
    mock_aes.new.return_value = cipher
    mock_b64_encode.return_value = encyrpted_bytes

    outcome = self.db_base._encrypt_value(value, schema_version, no_salt)

    self.assertEqual(outcome, encyrpted_value)
    mock_join_secret_key.assert_called_once_with(no_salt)
    # Second positional argument of the first AES.new call is the cipher mode.
    mode_used = mock_aes.new.call_args_list[0][0][1]
    self.assertEqual(mode_used, AES.MODE_ECB)
    mock_pad_data.assert_called_once_with(value)
    mock_b64_encode.assert_called_once_with(data_to_b4_encode)
    cipher.encrypt.assert_called_once_with(padded_encoded_value)
    padded_msg.encode.assert_called_with(encoding_type)
427 |
|
|
428 |
1 |
@patch("osm_common.dbbase.b64encode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_invalid_encrpt_mode(
    self,
    mock_pad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_encode,
):
    """An error raised by AES.new for a bad mode propagates out of _encrypt_value."""
    bad_mode = "AES.MODE_XXX"
    failure_text = "Invalid ciphering mode."
    self.db_base.encrypt_mode = bad_mode
    mock_aes.new.side_effect = Exception(failure_text)

    with self.assertRaises(Exception) as err:
        self.db_base._encrypt_value(value, schema_version, salt)

    self.assertEqual(str(err.exception), failure_text)
    mock_join_secret_key.assert_called_once_with(salt)
    # The configured (invalid) mode must have been handed to AES.new.
    mode_used = mock_aes.new.call_args_list[0][0][1]
    self.assertEqual(mode_used, bad_mode)
    # Nothing past cipher creation may have run.
    check_if_assert_not_called([mock_pad_data, mock_b64_encode])
451 |
|
|
452 |
1 |
@patch("osm_common.dbbase.b64encode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_schema_version_1_1_secret_key_exists_value_none(
    self,
    mock_pad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_encode,
):
    """Schema 1.1 with a secret key and value=None: the pad_data error propagates."""
    none_value = None
    expected_text = (
        "database exception Incorrect data type: type(None), string is expected."
    )
    mock_aes.new.return_value = self.mock_cipher
    mock_pad_data.side_effect = DbException(
        "Incorrect data type: type(None), string is expected."
    )

    with self.assertRaises(Exception) as err:
        self.db_base._encrypt_value(none_value, schema_version, salt)
    self.assertEqual(str(err.exception), expected_text)

    mock_join_secret_key.assert_called_once_with(salt)
    # Second positional argument of the first AES.new call is the cipher mode.
    mode_used = mock_aes.new.call_args_list[0][0][1]
    self.assertEqual(mode_used, AES.MODE_ECB)
    mock_pad_data.assert_called_once_with(none_value)
    # Nothing after the failed padding may have run. The original list named
    # mock_b64_encode twice; each mock is now checked once.
    check_if_assert_not_called([mock_b64_encode, self.mock_cipher.encrypt])
484 |
|
|
485 |
1 |
@patch("osm_common.dbbase.b64encode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_join_secret_key_raises(
    self,
    mock_pad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_encode,
):
    """A DbException from _join_secret_key stops _encrypt_value before any cipher work."""
    bytes_salt = b"3434o34-3wewrwr-222424-2242dwew"
    mock_aes.new.return_value = self.mock_cipher
    mock_join_secret_key.side_effect = DbException("Unexpected type")

    with self.assertRaises(Exception) as err:
        self.db_base._encrypt_value(value, schema_version, bytes_salt)

    self.assertEqual(str(err.exception), "database exception Unexpected type")
    untouched = [
        mock_pad_data,
        mock_aes.new,
        mock_b64_encode,
        self.mock_cipher.encrypt,
    ]
    check_if_assert_not_called(untouched)
    mock_join_secret_key.assert_called_once_with(bytes_salt)
511 |
|
|
512 |
1 |
@patch("osm_common.dbbase.b64encode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_schema_version_1_1_secret_key_exists_b64_encode_raises(
    self,
    mock_pad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_encode,
):
    """Schema 1.1 with a secret key: a TypeError from b64encode propagates."""
    failure_text = "A bytes-like object is required, not 'str'"
    # The cipher returns a str (not bytes), which is what b64encode rejects.
    cipher_output = "encrypted data"
    padded_msg = self.mock_padded_msg
    padded_msg.return_value = padded_value
    padded_msg.encode.return_value = padded_encoded_value
    mock_pad_data.return_value = padded_msg
    cipher = self.mock_cipher
    cipher.encrypt.return_value = cipher_output
    mock_aes.new.return_value = cipher
    mock_b64_encode.side_effect = TypeError(failure_text)

    with self.assertRaises(Exception) as error:
        self.db_base._encrypt_value(value, schema_version, salt)

    self.assertEqual(str(error.exception), failure_text)
    mock_join_secret_key.assert_called_once_with(salt)
    # Second positional argument of the first AES.new call is the cipher mode.
    mode_used = mock_aes.new.call_args_list[0][0][1]
    self.assertEqual(mode_used, AES.MODE_ECB)
    mock_pad_data.assert_called_once_with(value)
    cipher.encrypt.assert_called_once_with(padded_encoded_value)
    padded_msg.encode.assert_called_with(encoding_type)
    mock_b64_encode.assert_called_once_with(cipher_output)
547 |
|
|
548 |
1 |
@patch("osm_common.dbbase.b64encode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "pad_data")
def test__encrypt_value_cipher_encrypt_raises(
    self,
    mock_pad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_encode,
):
    """An exception from the cipher's encrypt() propagates; b64encode is never reached."""
    failure_text = "Invalid data type."
    padded_msg = self.mock_padded_msg
    padded_msg.return_value = padded_value
    padded_msg.encode.return_value = padded_encoded_value
    mock_pad_data.return_value = padded_msg
    cipher = self.mock_cipher
    cipher.encrypt.side_effect = Exception(failure_text)
    mock_aes.new.return_value = cipher

    with self.assertRaises(Exception) as error:
        self.db_base._encrypt_value(value, schema_version, salt)

    self.assertEqual(str(error.exception), failure_text)
    mock_join_secret_key.assert_called_once_with(salt)
    # Second positional argument of the first AES.new call is the cipher mode.
    mode_used = mock_aes.new.call_args_list[0][0][1]
    self.assertEqual(mode_used, AES.MODE_ECB)
    mock_pad_data.assert_called_once_with(value)
    cipher.encrypt.assert_called_once_with(padded_encoded_value)
    padded_msg.encode.assert_called_with(encoding_type)
    mock_b64_encode.assert_not_called()
578 |
|
|
579 |
1 |
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_encrypt_value")
def test_encrypt_without_schema_version_without_salt(
    self, mock_encrypt_value, mock_get_secret_key
):
    """encrypt(value) forwards None for both schema_version and salt."""
    mock_encrypt_value.return_value = encyrpted_value

    outcome = self.db_base.encrypt(value)

    self.assertEqual(outcome, encyrpted_value)
    mock_get_secret_key.assert_called_once()
    mock_encrypt_value.assert_called_once_with(value, None, None)
590 |
|
|
591 |
1 |
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_encrypt_value")
def test_encrypt_with_schema_version_with_salt(
    self, mock_encrypt_value, mock_get_secret_key
):
    """Both schema_version and salt are given and forwarded to _encrypt_value."""
    # The previous docstring said "salt is None", contradicting the call below.
    mock_encrypt_value.return_value = encyrpted_value

    result = self.db_base.encrypt(value, schema_version, salt)

    mock_encrypt_value.assert_called_once_with(value, schema_version, salt)
    mock_get_secret_key.assert_called_once()
    self.assertEqual(result, encyrpted_value)
602 |
|
|
603 |
1 |
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_encrypt_value")
def test_encrypt_get_secret_key_raises(
    self, mock_encrypt_value, mock_get_secret_key
):
    """A DbException from get_secret_key propagates and _encrypt_value is skipped."""
    mock_get_secret_key.side_effect = DbException("KeyError")

    with self.assertRaises(Exception) as error:
        self.db_base.encrypt(value)

    self.assertEqual(str(error.exception), "database exception KeyError")
    mock_get_secret_key.assert_called_once()
    mock_encrypt_value.assert_not_called()
615 |
|
|
616 |
1 |
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_encrypt_value")
def test_encrypt_encrypt_raises(self, mock_encrypt_value, mock_get_secret_key):
    """A DbException from _encrypt_value propagates out of encrypt."""
    expected_text = (
        "database exception Incorrect data type: type(None), string is expected."
    )
    mock_encrypt_value.side_effect = DbException(
        "Incorrect data type: type(None), string is expected."
    )

    with self.assertRaises(Exception) as error:
        self.db_base.encrypt(value, schema_version, salt)

    self.assertEqual(str(error.exception), expected_text)
    mock_get_secret_key.assert_called_once()
    mock_encrypt_value.assert_called_once_with(value, schema_version, salt)
631 |
|
|
632 |
1 |
@patch("osm_common.dbbase.b64decode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "unpad_data")
def test__decrypt_value_schema_version_1_1_secret_key_exists_without_salt(
    self,
    mock_unpad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_decode,
):
    """Schema 1.1 with a secret key and salt=None: the full decrypt pipeline runs."""
    no_salt = None
    # Wire the mocked collaborators: b64decode, cipher and unpad_data.
    mock_b64_decode.return_value = b64_decoded
    cipher = self.mock_cipher
    cipher.decrypt.return_value = padded_encoded_value
    mock_aes.new.return_value = cipher
    mock_unpad_data.return_value = value

    outcome = self.db_base._decrypt_value(encyrpted_value, schema_version, no_salt)

    self.assertEqual(outcome, value)
    mock_join_secret_key.assert_called_once_with(no_salt)
    # Second positional argument of the first AES.new call is the cipher mode.
    mode_used = mock_aes.new.call_args_list[0][0][1]
    self.assertEqual(mode_used, AES.MODE_ECB)
    # unpad_data receives the decrypted bytes decoded to text.
    mock_unpad_data.assert_called_once_with("private key txt\0")
    mock_b64_decode.assert_called_once_with(encyrpted_value)
    cipher.decrypt.assert_called_once_with(b64_decoded)
661 |
|
|
662 |
1 |
@patch("osm_common.dbbase.b64decode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "unpad_data")
def test__decrypt_value_schema_version_1_1_secret_key_exists_with_salt(
    self,
    mock_unpad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_decode,
):
    """schema_version 1.1, secret_key exists, salt exists."""
    # The previous docstring said "salt is None", but this test passes the
    # module-level salt.
    mock_aes.new.return_value = self.mock_cipher
    self.mock_cipher.decrypt.return_value = padded_encoded_value
    mock_b64_decode.return_value = b64_decoded
    mock_unpad_data.return_value = value

    result = self.db_base._decrypt_value(encyrpted_value, schema_version, salt)
    self.assertEqual(result, value)

    mock_join_secret_key.assert_called_once_with(salt)
    # Second positional argument of the first AES.new call is the cipher mode.
    _call_mock_aes_new = mock_aes.new.call_args_list[0].args
    self.assertEqual(_call_mock_aes_new[1], AES.MODE_ECB)
    # unpad_data receives the decrypted bytes decoded to text.
    mock_unpad_data.assert_called_once_with("private key txt\0")
    mock_b64_decode.assert_called_once_with(encyrpted_value)
    self.mock_cipher.decrypt.assert_called_once_with(b64_decoded)
690 |
|
|
691 |
1 |
@patch("osm_common.dbbase.b64decode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "unpad_data")
def test__decrypt_value_schema_version_1_1_without_secret_key(
    self,
    mock_unpad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_decode,
):
    """Schema 1.1 without a secret key: the input is returned untouched."""
    self.db_base.secret_key = None

    outcome = self.db_base._decrypt_value(encyrpted_value, schema_version, salt)

    self.assertEqual(outcome, encyrpted_value)
    # No part of the decrypt pipeline may have been used.
    untouched = [
        mock_join_secret_key,
        mock_aes.new,
        mock_unpad_data,
        mock_b64_decode,
        self.mock_cipher.decrypt,
    ]
    check_if_assert_not_called(untouched)
717 |
|
|
718 |
1 |
@patch("osm_common.dbbase.b64decode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "unpad_data")
def test__decrypt_value_schema_version_1_0_with_secret_key(
    self,
    mock_unpad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_decode,
):
    """Schema 1.0 with a secret key and a salt: the input is returned untouched."""
    outcome = self.db_base._decrypt_value(encyrpted_value, "1.0", salt)

    self.assertEqual(outcome, encyrpted_value)
    # No part of the decrypt pipeline may have been used.
    untouched = [
        mock_join_secret_key,
        mock_aes.new,
        mock_unpad_data,
        mock_b64_decode,
        self.mock_cipher.decrypt,
    ]
    check_if_assert_not_called(untouched)
743 |
|
|
744 |
1 |
@patch("osm_common.dbbase.b64decode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "unpad_data")
def test__decrypt_value_join_secret_key_raises(
    self,
    mock_unpad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_decode,
):
    """A TypeError raised by _join_secret_key reaches the caller unchanged."""
    bad_salt = object()
    failure_text = "'type' object is not iterable"
    mock_join_secret_key.side_effect = TypeError(failure_text)

    with self.assertRaises(Exception) as ctx:
        self.db_base._decrypt_value(encyrpted_value, schema_version, bad_salt)

    self.assertEqual(str(ctx.exception), failure_text)
    mock_join_secret_key.assert_called_once_with(bad_salt)
    # Everything after the key join must have been skipped.
    check_if_assert_not_called(
        [mock_aes.new, mock_unpad_data, mock_b64_decode, self.mock_cipher.decrypt]
    )
767 |
|
|
768 |
1 |
@patch("osm_common.dbbase.b64decode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "unpad_data")
def test__decrypt_value_b64decode_raises(
    self,
    mock_unpad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_decode,
):
    """A TypeError raised by b64decode reaches the caller; no cipher is built."""
    failure_text = "A str-like object is required, not 'bytes'"
    mock_b64_decode.side_effect = TypeError(failure_text)

    with self.assertRaises(Exception) as ctx:
        self.db_base._decrypt_value(encyrpted_value, schema_version, salt)

    self.assertEqual(str(ctx.exception), failure_text)
    mock_b64_decode.assert_called_once_with(encyrpted_value)
    mock_join_secret_key.assert_called_once_with(salt)
    check_if_assert_not_called(
        [mock_aes.new, self.mock_cipher.decrypt, mock_unpad_data]
    )
794 |
|
|
795 |
1 |
@patch("osm_common.dbbase.b64decode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "unpad_data")
def test__decrypt_value_invalid_encrypt_mode(
    self,
    mock_unpad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_decode,
):
    """AES.new rejects the configured mode and the error reaches the caller."""
    bogus_mode = "AES.MODE_XXX"
    mock_aes.new.side_effect = Exception("Invalid ciphering mode.")
    self.db_base.encrypt_mode = bogus_mode
    mock_b64_decode.return_value = b64_decoded

    with self.assertRaises(Exception) as ctx:
        self.db_base._decrypt_value(encyrpted_value, schema_version, salt)

    self.assertEqual(str(ctx.exception), "Invalid ciphering mode.")
    mock_join_secret_key.assert_called_once_with(salt)
    # The second positional argument handed to AES.new is the cipher mode.
    first_aes_call = mock_aes.new.call_args_list[0]
    self.assertEqual(first_aes_call.args[1], bogus_mode)
    mock_b64_decode.assert_called_once_with(encyrpted_value)
    check_if_assert_not_called([mock_unpad_data, self.mock_cipher.decrypt])
820 |
|
|
821 |
1 |
@patch("osm_common.dbbase.b64decode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "unpad_data")
def test__decrypt_value_cipher_decrypt_raises(
    self,
    mock_unpad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_decode,
):
    """An exception from cipher.decrypt reaches the caller; nothing is unpadded."""
    mock_b64_decode.return_value = b64_decoded
    mock_aes.new.return_value = self.mock_cipher
    self.mock_cipher.decrypt.side_effect = Exception("Invalid data type.")

    with self.assertRaises(Exception) as ctx:
        self.db_base._decrypt_value(encyrpted_value, schema_version, salt)

    self.assertEqual(str(ctx.exception), "Invalid data type.")
    mock_join_secret_key.assert_called_once_with(salt)
    # The second positional argument handed to AES.new is the cipher mode.
    first_aes_call = mock_aes.new.call_args_list[0]
    self.assertEqual(first_aes_call.args[1], AES.MODE_ECB)
    mock_b64_decode.assert_called_once_with(encyrpted_value)
    self.mock_cipher.decrypt.assert_called_once_with(b64_decoded)
    mock_unpad_data.assert_not_called()
848 |
|
|
849 |
1 |
@patch("osm_common.dbbase.b64decode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "unpad_data")
def test__decrypt_value_decode_raises(
    self,
    mock_unpad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_decode,
):
    """Undecodable decrypted bytes are reported as a DbException."""
    mock_aes.new.return_value = self.mock_cipher
    # Byte sequence the cipher "returns"; the test expects decoding it to fail.
    self.mock_cipher.decrypt.return_value = b"\xd0\x000091"
    mock_b64_decode.return_value = b64_decoded
    mock_unpad_data.return_value = value

    with self.assertRaises(Exception) as ctx:
        self.db_base._decrypt_value(encyrpted_value, schema_version, salt)

    raised = ctx.exception
    self.assertEqual(
        str(raised),
        "database exception Cannot decrypt information. Are you using same COMMONKEY in all OSM components?",
    )
    self.assertEqual(type(raised), DbException)
    mock_join_secret_key.assert_called_once_with(salt)
    first_aes_call = mock_aes.new.call_args_list[0]
    self.assertEqual(first_aes_call.args[1], AES.MODE_ECB)
    mock_b64_decode.assert_called_once_with(encyrpted_value)
    self.mock_cipher.decrypt.assert_called_once_with(b64_decoded)
    mock_unpad_data.assert_not_called()
880 |
|
|
881 |
1 |
@patch("osm_common.dbbase.b64decode")
@patch("osm_common.dbbase.AES")
@patch.object(DbBase, "_join_secret_key")
@patch.object(DbBase, "unpad_data")
def test__decrypt_value_unpad_data_raises(
    self,
    mock_unpad_data,
    mock_join_secret_key,
    mock_aes,
    mock_b64_decode,
):
    """A DbException raised by unpad_data reaches the caller."""
    # Stand-in for the decrypted payload whose decode() yields None.
    decrypted_stub = MagicMock()
    decrypted_stub.decode.return_value = None
    mock_aes.new.return_value = self.mock_cipher
    self.mock_cipher.decrypt.return_value = decrypted_stub
    mock_unpad_data.side_effect = DbException(
        "Incorrect data type: type(None), string is expected."
    )
    mock_b64_decode.return_value = b64_decoded

    with self.assertRaises(Exception) as ctx:
        self.db_base._decrypt_value(encyrpted_value, schema_version, salt)

    raised = ctx.exception
    self.assertEqual(
        str(raised),
        "database exception Incorrect data type: type(None), string is expected.",
    )
    self.assertEqual(type(raised), DbException)
    mock_join_secret_key.assert_called_once_with(salt)
    first_aes_call = mock_aes.new.call_args_list[0]
    self.assertEqual(first_aes_call.args[1], AES.MODE_ECB)
    mock_b64_decode.assert_called_once_with(encyrpted_value)
    self.mock_cipher.decrypt.assert_called_once_with(b64_decoded)
    decrypted_stub.decode.assert_called_once_with(self.db_base.encoding_type)
    mock_unpad_data.assert_called_once_with(None)
918 |
|
|
919 |
1 |
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_decrypt_value")
def test_decrypt_without_schema_version_without_salt(
    self, mock_decrypt_value, mock_get_secret_key
):
    """decrypt() called with the value only passes None for schema_version and salt."""
    mock_decrypt_value.return_value = encyrpted_value

    outcome = self.db_base.decrypt(value)

    self.assertEqual(outcome, encyrpted_value)
    mock_get_secret_key.assert_called_once()
    mock_decrypt_value.assert_called_once_with(value, None, None)
930 |
|
|
931 |
1 |
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_decrypt_value")
def test_decrypt_with_schema_version_with_salt(
    self, mock_decrypt_value, mock_get_secret_key
):
    """decrypt() forwards the given schema_version and salt to _decrypt_value."""
    mock_decrypt_value.return_value = encyrpted_value

    outcome = self.db_base.decrypt(value, schema_version, salt)

    self.assertEqual(outcome, encyrpted_value)
    mock_get_secret_key.assert_called_once()
    mock_decrypt_value.assert_called_once_with(value, schema_version, salt)
942 |
|
|
943 |
1 |
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_decrypt_value")
def test_decrypt_get_secret_key_raises(
    self, mock_decrypt_value, mock_get_secret_key
):
    """A DbException from get_secret_key stops decrypt() before _decrypt_value."""
    mock_get_secret_key.side_effect = DbException("KeyError")

    with self.assertRaises(Exception) as ctx:
        self.db_base.decrypt(value)

    self.assertEqual(str(ctx.exception), "database exception KeyError")
    mock_get_secret_key.assert_called_once()
    mock_decrypt_value.assert_not_called()
955 |
|
|
956 |
1 |
@patch.object(DbBase, "get_secret_key")
@patch.object(DbBase, "_decrypt_value")
def test_decrypt_decrypt_value_raises(
    self, mock_decrypt_value, mock_get_secret_key
):
    """A DbException raised by _decrypt_value reaches the caller of decrypt()."""
    mock_decrypt_value.side_effect = DbException(
        "Incorrect data type: type(None), string is expected."
    )

    with self.assertRaises(Exception) as ctx:
        self.db_base.decrypt(value, schema_version, salt)

    self.assertEqual(
        str(ctx.exception),
        "database exception Incorrect data type: type(None), string is expected.",
    )
    mock_get_secret_key.assert_called_once()
    mock_decrypt_value.assert_called_once_with(value, schema_version, salt)
973 |
|
|
974 |
1 |
def test_encrypt_decrypt_with_schema_version_1_1_with_salt(self):
    """Round trip: encrypt then decrypt with schema 1.1 and a salt."""
    ciphertext = self.db_base.encrypt(value, schema_version, salt)
    plaintext = self.db_base.decrypt(ciphertext, schema_version, salt)
    self.assertEqual(value, plaintext)
979 |
|
|
980 |
1 |
def test_encrypt_decrypt_with_schema_version_1_0_with_salt(self):
    """Round trip: encrypt then decrypt with schema 1.0 and a salt."""
    legacy_version = "1.0"
    ciphertext = self.db_base.encrypt(value, legacy_version, salt)
    plaintext = self.db_base.decrypt(ciphertext, legacy_version, salt)
    self.assertEqual(value, plaintext)
986 |
|
|
987 |
1 |
def test_encrypt_decrypt_with_schema_version_1_1_without_salt(self):
    """Round trip: encrypt then decrypt with schema 1.1 and salt set to None."""
    ciphertext = self.db_base.encrypt(value, schema_version, None)
    plaintext = self.db_base.decrypt(ciphertext, schema_version, None)
    self.assertEqual(value, plaintext)
993 |
|
|
994 |
|
|
995 |
1 |
class TestAsyncEncryption(unittest.TestCase): |
996 |
1 |
@patch("logging.getLogger", autospec=True)
def setUp(self, mock_logger):
    """Create an Encryption instance wired to a mocked admin collection."""
    # logging.getLogger is patched while setUp runs; disable what it returns.
    logging.getLogger().disabled = True

    encryption = Encryption(uri="uri", config={})
    encryption.encoding_type = encoding_type
    encryption.encrypt_mode = encyrpt_mode
    encryption._secret_key = secret_key
    self.encryption = encryption

    # Admin collection double whose find_one can be awaited.
    admin = Mock()
    admin.find_one = AsyncMock()
    self.admin_collection = admin
    self.encryption._client = {"osm": {"admin": admin}}
1011 |
|
|
1012 |
1 |
@patch.object(Encryption, "decrypt", new_callable=AsyncMock)
def test_decrypt_fields_with_item_with_fields(self, mock_decrypt):
    """Both listed fields are replaced in place by their decrypted values."""
    mock_decrypt.side_effect = [decrypted_val1, decrypted_val2]
    working_copy = copy.deepcopy(item)
    wanted_fields = ["secret", "cacert"]

    asyncio.run(
        self.encryption.decrypt_fields(
            working_copy, wanted_fields, schema_version, salt
        )
    )

    self.assertEqual(
        working_copy,
        {
            "secret": decrypted_val1,
            "cacert": decrypted_val2,
            "path": "/var",
            "ip": "192.168.12.23",
        },
    )
    # One decrypt call per field, in the order the fields were listed.
    first_call, second_call = mock_decrypt.call_args_list[:2]
    self.assertEqual(first_call.args, ("mysecret", "1.1", salt))
    self.assertEqual(second_call.args, ("mycacert", "1.1", salt))
1032 |
|
|
1033 |
1 |
@patch.object(Encryption, "decrypt", new_callable=AsyncMock)
def test_decrypt_fields_empty_item_with_fields(self, mock_decrypt):
    """An empty item stays empty and decrypt is never invoked."""
    empty_item = {}

    asyncio.run(
        self.encryption.decrypt_fields(
            empty_item, ["secret", "cacert"], schema_version, salt
        )
    )

    self.assertEqual(empty_item, {})
    mock_decrypt.assert_not_called()
1043 |
|
|
1044 |
1 |
@patch.object(Encryption, "decrypt", new_callable=AsyncMock)
def test_decrypt_fields_with_item_without_fields(self, mock_decrypt):
    """With an empty field list the item is left as it was."""
    working_copy = copy.deepcopy(item)

    asyncio.run(
        self.encryption.decrypt_fields(working_copy, [], schema_version, salt)
    )

    self.assertEqual(working_copy, item)
    mock_decrypt.assert_not_called()
1054 |
|
|
1055 |
1 |
@patch.object(Encryption, "decrypt", new_callable=AsyncMock)
def test_decrypt_fields_with_item_with_single_field(self, mock_decrypt):
    """Only the single listed field is replaced; the others keep their values."""
    mock_decrypt.return_value = decrypted_val1
    working_copy = copy.deepcopy(item)

    asyncio.run(
        self.encryption.decrypt_fields(
            working_copy, ["secret"], schema_version, salt
        )
    )

    self.assertEqual(
        working_copy,
        {
            "secret": decrypted_val1,
            "cacert": "mycacert",
            "path": "/var",
            "ip": "192.168.12.23",
        },
    )
    first_call = mock_decrypt.call_args_list[0]
    self.assertEqual(first_call.args, ("mysecret", "1.1", salt))
1073 |
|
|
1074 |
1 |
@patch.object(Encryption, "decrypt", new_callable=AsyncMock)
def test_decrypt_fields_with_item_with_field_none_salt_1_0_schema_version(
    self, mock_decrypt
):
    """Single field, schema 1.0, salt None: decrypt echoes the value back."""
    # decrypt returns the same text the field already holds, so the item
    # is expected to compare equal to the untouched reference afterwards.
    mock_decrypt.return_value = "mysecret"
    working_copy = copy.deepcopy(item)

    asyncio.run(
        self.encryption.decrypt_fields(working_copy, ["secret"], "1.0", None)
    )

    self.assertEqual(working_copy, item)
    first_call = mock_decrypt.call_args_list[0]
    self.assertEqual(first_call.args, ("mysecret", "1.0", None))
1090 |
|
|
1091 |
1 |
@patch.object(Encryption, "decrypt", new_callable=AsyncMock)
def test_decrypt_fields_decrypt_raises(self, mock_decrypt):
    """A DbException from decrypt reaches the caller and the item is unchanged."""
    mock_decrypt.side_effect = DbException(
        "Incorrect data type: type(None), string is expected."
    )
    working_copy = copy.deepcopy(item)

    with self.assertRaises(Exception) as ctx:
        asyncio.run(
            self.encryption.decrypt_fields(
                working_copy, ["secret"], schema_version, salt
            )
        )

    self.assertEqual(
        str(ctx.exception),
        "database exception Incorrect data type: type(None), string is expected.",
    )
    self.assertEqual(working_copy, item)
    first_call = mock_decrypt.call_args_list[0]
    self.assertEqual(first_call.args, ("mysecret", "1.1", salt))
1110 |
|
|
1111 |
1 |
@patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
@patch.object(Encryption, "_encrypt_value")
def test_encrypt(self, mock_encrypt_value, mock_get_secret_key):
    """Encrypted successfully.

    encrypt() is expected to fetch the secret key once and return whatever
    _encrypt_value produces for the same (value, schema_version, salt).
    """
    mock_encrypt_value.return_value = encyrpted_value
    result = asyncio.run(self.encryption.encrypt(value, schema_version, salt))
    self.assertEqual(result, encyrpted_value)
    mock_get_secret_key.assert_called_once()
    mock_encrypt_value.assert_called_once_with(value, schema_version, salt)
1120 |
|
|
1121 |
1 |
@patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
@patch.object(Encryption, "_encrypt_value")
def test_encrypt_get_secret_key_raises(
    self, mock_encrypt_value, mock_get_secret_key
):
    """A DbException from get_secret_key stops encrypt() before _encrypt_value."""
    mock_get_secret_key.side_effect = DbException("Unexpected type.")

    with self.assertRaises(Exception) as ctx:
        asyncio.run(self.encryption.encrypt(value, schema_version, salt))

    self.assertEqual(str(ctx.exception), "database exception Unexpected type.")
    mock_encrypt_value.assert_not_called()
    mock_get_secret_key.assert_called_once()
1133 |
|
|
1134 |
1 |
@patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
@patch.object(Encryption, "_encrypt_value")
def test_encrypt_get_encrypt_raises(self, mock_encrypt_value, mock_get_secret_key):
    """A TypeError raised by _encrypt_value reaches the caller of encrypt()."""
    failure_text = "A bytes-like object is required, not 'str'"
    mock_encrypt_value.side_effect = TypeError(failure_text)

    with self.assertRaises(Exception) as ctx:
        asyncio.run(self.encryption.encrypt(value, schema_version, salt))

    self.assertEqual(str(ctx.exception), failure_text)
    mock_get_secret_key.assert_called_once()
    mock_encrypt_value.assert_called_once_with(value, schema_version, salt)
1148 |
|
|
1149 |
1 |
@patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
@patch.object(Encryption, "_decrypt_value")
def test_decrypt(self, mock_decrypt_value, mock_get_secret_key):
    """decrypt() returns what _decrypt_value yields after fetching the key once."""
    mock_decrypt_value.return_value = value

    outcome = asyncio.run(
        self.encryption.decrypt(encyrpted_value, schema_version, salt)
    )

    self.assertEqual(outcome, value)
    mock_get_secret_key.assert_called_once()
    mock_decrypt_value.assert_called_once_with(
        encyrpted_value, schema_version, salt
    )
1162 |
|
|
1163 |
1 |
@patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
@patch.object(Encryption, "_decrypt_value")
def test_decrypt_get_secret_key_raises(
    self, mock_decrypt_value, mock_get_secret_key
):
    """A DbException from get_secret_key stops decrypt() before _decrypt_value."""
    mock_get_secret_key.side_effect = DbException("Unexpected type.")

    with self.assertRaises(Exception) as ctx:
        asyncio.run(self.encryption.decrypt(encyrpted_value, schema_version, salt))

    self.assertEqual(str(ctx.exception), "database exception Unexpected type.")
    mock_decrypt_value.assert_not_called()
    mock_get_secret_key.assert_called_once()
1175 |
|
|
1176 |
1 |
@patch.object(Encryption, "get_secret_key", new_callable=AsyncMock)
@patch.object(Encryption, "_decrypt_value")
def test_decrypt_decrypt_value_raises(
    self, mock_decrypt_value, mock_get_secret_key
):
    """Method _decrypt_value raises error.

    The secret key is still fetched once; the TypeError raised by
    _decrypt_value is expected to reach the caller of decrypt() with its
    message intact.
    """
    mock_decrypt_value.side_effect = TypeError(
        "A bytes-like object is required, not 'str'"
    )
    with self.assertRaises(Exception) as error:
        asyncio.run(self.encryption.decrypt(encyrpted_value, schema_version, salt))
    self.assertEqual(
        str(error.exception), "A bytes-like object is required, not 'str'"
    )
    mock_get_secret_key.assert_called_once()
    mock_decrypt_value.assert_called_once_with(
        encyrpted_value, schema_version, salt
    )
1194 |
|
|
1195 |
1 |
def test_join_keys_string_key(self):
    """key is string: _join_keys is expected to return joined_key as bytes."""
    string_key = "sample key"
    result = self.encryption._join_keys(string_key, secret_key)
    self.assertEqual(result, joined_key)
    # assertIsInstance reports the actual type on failure, unlike
    # assertTrue(isinstance(...)).
    self.assertIsInstance(result, bytes)
1201 |
|
|
1202 |
1 |
def test_join_keys_bytes_key(self):
    """key is bytes: _join_keys is expected to return joined_key as bytes."""
    bytes_key = b"sample key"
    result = self.encryption._join_keys(bytes_key, secret_key)
    self.assertEqual(result, joined_key)
    # assertIsInstance reports the actual type on failure, unlike
    # assertTrue(isinstance(...)).
    self.assertIsInstance(result, bytes)
    # Length check on the unicode_escape-decoded text of the joined key.
    self.assertEqual(len(result.decode("unicode_escape")), 32)
1209 |
|
|
1210 |
1 |
def test_join_keys_int_key(self):
    """An int key makes _join_keys fail with a 'not iterable' error."""
    with self.assertRaises(Exception) as ctx:
        self.encryption._join_keys(923, None)

    self.assertEqual(str(ctx.exception), "'int' object is not iterable")
1216 |
|
|
1217 |
1 |
def test_join_keys_none_secret_key(self):
    """key is as bytes and secret key is None.

    The expected result is the key followed by NUL bytes.
    """
    bytes_key = b"sample key"
    result = self.encryption._join_keys(bytes_key, None)
    self.assertEqual(
        result,
        b"sample key\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
    )
    # assertIsInstance reports the actual type on failure, unlike
    # assertTrue(isinstance(...)).
    self.assertIsInstance(result, bytes)
    self.assertEqual(len(result.decode("unicode_escape")), 32)
1227 |
|
|
1228 |
1 |
def test_join_keys_none_key_none_secret_key(self):
    """Both key and secret key are None: _join_keys fails as 'not iterable'."""
    with self.assertRaises(Exception) as ctx:
        self.encryption._join_keys(None, None)

    self.assertEqual(str(ctx.exception), "'NoneType' object is not iterable")
1233 |
|
|
1234 |
1 |
def test_join_keys_none_key(self):
    """A None key with a real secret key: _join_keys fails as 'not iterable'."""
    with self.assertRaises(Exception) as ctx:
        self.encryption._join_keys(None, secret_key)

    self.assertEqual(str(ctx.exception), "'NoneType' object is not iterable")
1239 |
|
|
1240 |
1 |
@patch.object(Encryption, "_join_keys")
def test_join_secret_key_string_sample_key(self, mock_join_keys):
    """update key is a string and secret key exists.

    _join_secret_key is expected to delegate to _join_keys with the update
    key and the stored secret key, and to return its result.
    """
    update_key = "sample key"
    mock_join_keys.return_value = joined_key
    result = self.encryption._join_secret_key(update_key)
    self.assertEqual(result, joined_key)
    # assertIsInstance reports the actual type on failure, unlike
    # assertTrue(isinstance(...)).
    self.assertIsInstance(result, bytes)
    mock_join_keys.assert_called_once_with(update_key, secret_key)
1249 |
|
|
1250 |
1 |
@patch.object(Encryption, "_join_keys")
def test_join_secret_key_byte_sample_key(self, mock_join_keys):
    """update key is bytes and secret key exists.

    _join_secret_key is expected to delegate to _join_keys with the update
    key and the stored secret key, and to return its result.
    """
    update_key = b"sample key"
    mock_join_keys.return_value = joined_key
    result = self.encryption._join_secret_key(update_key)
    self.assertEqual(result, joined_key)
    # assertIsInstance reports the actual type on failure, unlike
    # assertTrue(isinstance(...)).
    self.assertIsInstance(result, bytes)
    mock_join_keys.assert_called_once_with(update_key, secret_key)
1259 |
|
|
1260 |
1 |
@patch.object(Encryption, "_join_keys")
def test_join_secret_key_join_keys_raises(self, mock_join_keys):
    """A TypeError raised by _join_keys reaches the caller of _join_secret_key."""
    int_key = 3434
    failure_text = "'int' object is not iterable"
    mock_join_keys.side_effect = TypeError(failure_text)

    with self.assertRaises(Exception) as ctx:
        self.encryption._join_secret_key(int_key)

    self.assertEqual(str(ctx.exception), failure_text)
    mock_join_keys.assert_called_once_with(int_key, secret_key)
1269 |
|
|
1270 |
1 |
@patch.object(Encryption, "_join_keys")
def test_get_secret_key_exists(self, mock_join_keys):
    """With a secret key already set, get_secret_key keeps it and joins nothing."""
    self.encryption._secret_key = secret_key

    asyncio.run(self.encryption.get_secret_key())

    mock_join_keys.assert_not_called()
    self.assertEqual(self.encryption.secret_key, secret_key)
1277 |
|
|
1278 |
1 |
@patch.object(Encryption, "_join_keys")
@patch("osm_common.dbbase.b64decode")
def test_get_secret_key_not_exist_database_key_exist(
    self, mock_b64decode, mock_join_keys
):
    """No secret key yet, commonkey configured, find_one yields None."""
    encryption = self.encryption
    encryption._secret_key = None
    encryption._admin_collection.find_one.return_value = None
    encryption._config = {"database_commonkey": "osm_new_key"}
    mock_join_keys.return_value = joined_key

    asyncio.run(encryption.get_secret_key())

    self.assertEqual(encryption.secret_key, joined_key)
    # A single join is expected and nothing is base64-decoded.
    self.assertEqual(mock_join_keys.call_count, 1)
    mock_b64decode.assert_not_called()
1292 |
|
|
1293 |
1 |
@patch.object(Encryption, "_join_keys")
@patch("osm_common.dbbase.b64decode")
def test_get_secret_key_not_exist_with_database_key_version_data_exist_without_serial(
    self, mock_b64decode, mock_join_keys
):
    """secret_key does not exist, database key exists, version data exists
    in admin collection but carries no serial."""
    self.encryption._secret_key = None
    # The version document has no "serial" entry.
    self.encryption._admin_collection.find_one.return_value = {"version": "1.0"}
    self.encryption._config = {"database_commonkey": "osm_new_key"}
    mock_join_keys.return_value = joined_key
    asyncio.run(self.encryption.get_secret_key())
    self.assertEqual(self.encryption.secret_key, joined_key)
    # Only the commonkey is joined; nothing is base64-decoded.
    self.assertEqual(mock_join_keys.call_count, 1)
    mock_b64decode.assert_not_called()
    self.encryption._admin_collection.find_one.assert_called_once_with(
        {"_id": "version"}
    )
    _call_mock_join_keys = mock_join_keys.call_args_list
    self.assertEqual(_call_mock_join_keys[0].args, ("osm_new_key", None))
1312 |
|
|
1313 |
1 |
@patch.object(Encryption, "_join_keys")
@patch("osm_common.dbbase.b64decode")
def test_get_secret_key_not_exist_with_database_key_version_data_exist_with_serial(
    self, mock_b64decode, mock_join_keys
):
    """No secret key yet, commonkey configured, and the admin collection
    holds a version document that includes a serial."""
    encryption = self.encryption
    find_one = encryption._admin_collection.find_one
    encryption._secret_key = None
    find_one.return_value = {
        "version": "1.0",
        "serial": serial_bytes,
    }
    encryption._config = {"database_commonkey": "osm_new_key"}
    # First join result feeds the second join together with the decoded serial.
    mock_join_keys.side_effect = [secret_key, joined_key]
    mock_b64decode.return_value = base64_decoded_serial

    asyncio.run(encryption.get_secret_key())

    self.assertEqual(encryption.secret_key, joined_key)
    self.assertEqual(mock_join_keys.call_count, 2)
    mock_b64decode.assert_called_once_with(serial_bytes)
    find_one.assert_called_once_with({"_id": "version"})
    first_join, second_join = mock_join_keys.call_args_list[:2]
    self.assertEqual(first_join.args, ("osm_new_key", None))
    self.assertEqual(second_join.args, (base64_decoded_serial, secret_key))
1340 |
|
|
1341 |
1 |
@patch.object(Encryption, "_join_keys")
@patch("osm_common.dbbase.b64decode")
def test_get_secret_key_join_keys_raises(self, mock_b64decode, mock_join_keys):
    """A DbException from _join_keys aborts get_secret_key before the DB lookup."""
    encryption = self.encryption
    encryption._secret_key = None
    encryption._admin_collection.find_one.return_value = {
        "version": "1.0",
        "serial": serial_bytes,
    }
    encryption._config = {"database_commonkey": "osm_new_key"}
    mock_join_keys.side_effect = DbException("Invalid data type.")

    with self.assertRaises(Exception) as ctx:
        asyncio.run(encryption.get_secret_key())

    self.assertEqual(str(ctx.exception), "database exception Invalid data type.")
    self.assertEqual(mock_join_keys.call_count, 1)
    check_if_assert_not_called(
        [mock_b64decode, encryption._admin_collection.find_one]
    )
    first_join = mock_join_keys.call_args_list[0]
    self.assertEqual(first_join.args, ("osm_new_key", None))
1361 |
|
|
1362 |
1 |
@patch.object(Encryption, "_join_keys")
@patch("osm_common.dbbase.b64decode")
def test_get_secret_key_b64decode_raises(self, mock_b64decode, mock_join_keys):
    """A TypeError from b64decode reaches the caller and no secret key is stored."""
    encryption = self.encryption
    find_one = encryption._admin_collection.find_one
    failure_text = "A bytes-like object is required, not 'str'"
    encryption._secret_key = None
    find_one.return_value = {
        "version": "1.0",
        "serial": base64_decoded_serial,
    }
    encryption._config = {"database_commonkey": "osm_new_key"}
    mock_join_keys.return_value = secret_key
    mock_b64decode.side_effect = TypeError(failure_text)

    with self.assertRaises(Exception) as ctx:
        asyncio.run(encryption.get_secret_key())

    self.assertEqual(str(ctx.exception), failure_text)
    self.assertEqual(encryption.secret_key, None)
    self.assertEqual(mock_join_keys.call_count, 1)
    mock_b64decode.assert_called_once_with(base64_decoded_serial)
    find_one.assert_called_once_with({"_id": "version"})
    first_join = mock_join_keys.call_args_list[0]
    self.assertEqual(first_join.args, ("osm_new_key", None))
1389 |
|
|
1390 |
1 |
@patch.object(Encryption, "_join_keys")
@patch("osm_common.dbbase.b64decode")
def test_get_secret_key_admin_collection_find_one_raises(
    self, mock_b64decode, mock_join_keys
):
    """A DbException from find_one reaches the caller and no secret key is stored."""
    encryption = self.encryption
    find_one = encryption._admin_collection.find_one
    encryption._secret_key = None
    find_one.side_effect = DbException("Connection failed.")
    encryption._config = {"database_commonkey": "osm_new_key"}
    mock_join_keys.return_value = secret_key

    with self.assertRaises(Exception) as ctx:
        asyncio.run(encryption.get_secret_key())

    self.assertEqual(str(ctx.exception), "database exception Connection failed.")
    self.assertEqual(encryption.secret_key, None)
    self.assertEqual(mock_join_keys.call_count, 1)
    mock_b64decode.assert_not_called()
    find_one.assert_called_once_with({"_id": "version"})
    first_join = mock_join_keys.call_args_list[0]
    self.assertEqual(first_join.args, ("osm_new_key", None))
1413 |
|
|
1414 |
1 |
def test_encrypt_decrypt_with_schema_version_1_1_with_salt(self):
    """Round-trip a value through encrypt and decrypt with a salt.

    Uses the module-level ``schema_version`` (1.1 per the test name).
    """
    cipher = self.encryption
    encoded = asyncio.run(cipher.encrypt(value, schema_version, salt))
    decoded = asyncio.run(cipher.decrypt(encoded, schema_version, salt))
    self.assertEqual(value, decoded)
1423 |
|
|
1424 |
1 |
def test_encrypt_decrypt_with_schema_version_1_0_with_salt(self):
    """Round-trip a value through encrypt and decrypt with schema version 1.0 and a salt."""
    legacy_version = "1.0"
    cipher = self.encryption
    encoded = asyncio.run(cipher.encrypt(value, legacy_version, salt))
    decoded = asyncio.run(cipher.decrypt(encoded, legacy_version, salt))
    self.assertEqual(value, decoded)
1434 |
|
|
1435 |
1 |
def test_encrypt_decrypt_with_schema_version_1_1_without_salt(self):
    """Encrypting with the module-level schema version and no salt must fail."""
    missing_salt = None
    expected_message = "'NoneType' object is not iterable"
    with self.assertRaises(Exception) as raised:
        asyncio.run(self.encryption.encrypt(value, schema_version, missing_salt))
    self.assertEqual(str(raised.exception), expected_message)
1441 |
|
|
1442 |
|
|
1443 |
1 |
class TestDeepUpdate(unittest.TestCase):
    """Table-driven tests for ``deep_update``.

    Covers plain dictionary patches, ``$``-prefixed array patches and
    malformed patches that must be rejected with ``DbException``.
    """

    def test_update_dict(self):
        """Dictionary patches update the original dictionary in place."""
        # Each case is (original, patch, expected result).
        cases = (
            ({"a": "b"}, {"a": "c"}, {"a": "c"}),
            ({"a": "b"}, {"b": "c"}, {"a": "b", "b": "c"}),
            ({"a": "b"}, {"a": None}, {}),
            ({"a": "b", "b": "c"}, {"a": None}, {"b": "c"}),
            ({"a": ["b"]}, {"a": "c"}, {"a": "c"}),
            ({"a": "c"}, {"a": ["b"]}, {"a": ["b"]}),
            ({"a": {"b": "c"}}, {"a": {"b": "d", "c": None}}, {"a": {"b": "d"}}),
            ({"a": [{"b": "c"}]}, {"a": [1]}, {"a": [1]}),
            ({1: ["a", "b"]}, {1: ["c", "d"]}, {1: ["c", "d"]}),
            ({1: {"a": "b"}}, {1: ["c"]}, {1: ["c"]}),
            ({1: {"a": "foo"}}, {1: None}, {}),
            ({1: {"a": "foo"}}, {1: "bar"}, {1: "bar"}),
            ({"e": None}, {"a": 1}, {"e": None, "a": 1}),
            ({1: [1, 2]}, {1: {"a": "b", "c": None}}, {1: {"a": "b"}}),
            ({}, {"a": {"bb": {"ccc": None}}}, {"a": {"bb": {}}}),
        )
        for original, changes, expected in cases:
            # Snapshot the input for the failure report: ``original`` is
            # modified in place and subTest parameters are only rendered
            # once a failure is reported.
            with self.subTest(original=deepcopy(original), patch=changes):
                deep_update(original, changes)
                self.assertEqual(original, expected)

        # Check that patch content is copied, so that the updated dictionary
        # does not reference the patch.
        test_original = {1: {"a": "b"}}
        test_patch = {1: {"c": {"d": "e"}}}
        test_result = {1: {"a": "b", "c": {"d": "e"}}}
        deep_update(test_original, test_patch)
        self.assertEqual(test_original, test_result)
        test_patch[1]["c"]["f"] = "edition of patch, must not modify original"
        self.assertEqual(test_original, test_result)

    def test_update_array(self):
        """Array patches (``$`` keys) delete, insert and edit list items."""
        # Each case is (original, patch, expected result).
        cases = (
            # delete all instances of "a"/"d"
            ({"A": ["a", "b", "a"]}, {"A": {"$a": None}}, {"A": ["b"]}),
            ({"A": ["a", "b", "a"]}, {"A": {"$d": None}}, {"A": ["a", "b", "a"]}),
            # delete and insert at 0
            (
                {"A": ["a", "b", "c"]},
                {"A": {"$b": None, "$+[0]": "b"}},
                {"A": ["b", "a", "c"]},
            ),
            # delete and edit
            (
                {"A": ["a", "b", "a"]},
                {"A": {"$a": None, "$[1]": {"c": "d"}}},
                {"A": [{"c": "d"}]},
            ),
            # insert if not exist
            ({"A": ["a", "b", "c"]}, {"A": {"$+b": "b"}}, {"A": ["a", "b", "c"]}),
            ({"A": ["a", "b", "c"]}, {"A": {"$+d": "f"}}, {"A": ["a", "b", "c", "f"]}),
            # edit by filter
            (
                {"A": ["a", "b", "a"]},
                {"A": {"$b": {"c": "d"}}},
                {"A": ["a", {"c": "d"}, "a"]},
            ),
            (
                {"A": ["a", "b", "a"]},
                {"A": {"$b": None, "$+[0]": "b", "$+": "c"}},
                {"A": ["b", "a", "a", "c"]},
            ),
            ({"A": ["a", "b", "a"]}, {"A": {"$c": None}}, {"A": ["a", "b", "a"]}),
            # index deletion out of range
            ({"A": ["a", "b", "a"]}, {"A": {"$[5]": None}}, {"A": ["a", "b", "a"]}),
            # nested array->dict
            (
                {"A": ["a", "b", {"id": "1", "c": {"d": 2}}]},
                {"A": {"$id: '1'": {"h": None, "c": {"d": "e", "f": "g"}}}},
                {"A": ["a", "b", {"id": "1", "c": {"d": "e", "f": "g"}}]},
            ),
            (
                {"A": [{"id": 1, "c": {"d": 2}}, {"id": 1, "c": {"f": []}}]},
                {"A": {"$id: 1": {"h": None, "c": {"d": "e", "f": "g"}}}},
                {
                    "A": [
                        {"id": 1, "c": {"d": "e", "f": "g"}},
                        {"id": 1, "c": {"d": "e", "f": "g"}},
                    ]
                },
            ),
            # nested array->array
            (
                {"A": ["a", "b", ["a", "b"]]},
                {"A": {"$b": None, "$[2]": {"$b": {}, "$+": "c"}}},
                {"A": ["a", ["a", {}, "c"]]},
            ),
            # types str and int different, so not found
            (
                {"A": ["a", {"id": "1", "c": "d"}]},
                {"A": {"$id: 1": {"c": "e"}}},
                {"A": ["a", {"id": "1", "c": "d"}]},
            ),
        )
        for original, changes, expected in cases:
            # The snapshot keeps the pre-update input visible in failure
            # reports; this replaces the former debug print of every case.
            with self.subTest(original=deepcopy(original), patch=changes):
                deep_update(original, changes)
                self.assertEqual(original, expected)

    def test_update_badformat(self):
        """Malformed or conflicting patches make deep_update raise DbException."""
        # Each case is (original, incorrect patch).
        # TODO: also check the text that must be present in the error.
        cases = (
            # conflict, index 0 is edited twice
            ({"A": ["a", "b", "a"]}, {"A": {"$a": None, "$[0]": {"c": "d"}}}),
            # conflict, two keys resolve to the same index
            ({"A": ["a", "b", "a"]}, {"A": {"$[1]": "c", "$[-2]": "d"}}),
            ({"A": ["a", "b", "a"]}, {"A": {"$[1]": "c", "$[+1]": "d"}}),
            # bad format keys with and without $
            ({"A": ["a", "b", "a"]}, {"A": {"$b": {"c": "d"}, "c": 3}}),
            # bad format empty $ and yaml incorrect
            ({"A": ["a", "b", "a"]}, {"A": {"$": 3}}),
            ({"A": ["a", "b", "a"]}, {"A": {"$a: b: c": 3}}),
            ({"A": ["a", "b", "a"]}, {"A": {"$a: b, c: d": 3}}),
            # insertion of None
            ({"A": ["a", "b", "a"]}, {"A": {"$+": None}}),
            # Not found, insertion of None
            ({"A": ["a", "b", "a"]}, {"A": {"$+c": None}}),
            # index edition out of range
            ({"A": ["a", "b", "a"]}, {"A": {"$[5]": 6}}),
            # conflict, two filters edit the same element
            (
                {"A": ["a", {"id": "1", "c": "d"}]},
                {"A": {"$id: '1'": {"c": "e"}, "$c: d": {"c": "f"}}},
            ),
        )
        for original, changes in cases:
            # A single call per case: re-running deep_update on an input the
            # first call may already have modified proves nothing.
            with self.subTest(original=deepcopy(original), patch=changes):
                with self.assertRaises(DbException):
                    deep_update(original, changes)
1577 |
|
|
1578 |
|
|
1579 |
1 |
if __name__ == "__main__":
    # Run as a script: hand control to the unittest command-line runner.
    unittest.main()