117350e90a0570fba3e35329f8a077212ad8fea8
[osm/common.git] / osm_common / tests / test_dbbase.py
1 # Copyright 2018 Whitestack, LLC
2 # Copyright 2018 Telefonica S.A.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
14 # under the License.
15 #
16 # For those usages not covered by the Apache License, Version 2.0 please
17 # contact: esousa@whitestack.com or alfonso.tiernosepulveda@telefonica.com
18 ##
19
20 import http
21 import pytest
22 import unittest
23 from osm_common.dbbase import DbBase, DbException, deep_update
24 from os import urandom
25 from http import HTTPStatus
26
27
def exception_message(message):
    """Return the text the tests expect at the start of ``str(DbException)``.

    :param message: the specific error text (a ``str``).
    :return: ``message`` prefixed with ``"database exception "``.
    """
    return "database exception " + message
30
31
@pytest.fixture
def db_base():
    """Provide a newly constructed ``DbBase`` to each test that requests it."""
    return DbBase()
35
36
def test_constructor():
    """``DbBase`` can be instantiated without arguments."""
    # Named ``instance`` so it does not shadow the module-level ``db_base`` fixture.
    instance = DbBase()
    assert instance is not None
    assert isinstance(instance, DbBase)
41
42
def test_db_connect(db_base):
    """``db_connect`` on the base class raises a 'not implemented' DbException."""
    with pytest.raises(DbException) as excinfo:
        db_base.db_connect(None)
    # Checked after the ``with`` block: ``excinfo.value`` is only populated
    # once the context manager has caught the exception.
    assert str(excinfo.value).startswith(
        exception_message("Method 'db_connect' not implemented")
    )
49
50
def test_db_disconnect(db_base):
    """``db_disconnect`` on the base class can be called without raising."""
    db_base.db_disconnect()
53
54
def test_get_list(db_base):
    """``get_list`` on the base class raises a 'not implemented' DbException (NOT_FOUND)."""
    with pytest.raises(DbException) as excinfo:
        db_base.get_list(None, None)
    # Assertions run after the ``with`` block, once the exception is captured.
    assert str(excinfo.value).startswith(
        exception_message("Method 'get_list' not implemented")
    )
    assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
62
63
def test_get_one(db_base):
    """``get_one`` on the base class raises a 'not implemented' DbException (NOT_FOUND)."""
    with pytest.raises(DbException) as excinfo:
        db_base.get_one(None, None, None, None)
    # Assertions run after the ``with`` block, once the exception is captured.
    assert str(excinfo.value).startswith(
        exception_message("Method 'get_one' not implemented")
    )
    assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
71
72
def test_create(db_base):
    """``create`` on the base class raises a 'not implemented' DbException (NOT_FOUND)."""
    with pytest.raises(DbException) as excinfo:
        db_base.create(None, None)
    # Assertions run after the ``with`` block, once the exception is captured.
    assert str(excinfo.value).startswith(
        exception_message("Method 'create' not implemented")
    )
    assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
80
81
def test_create_list(db_base):
    """``create_list`` on the base class raises a 'not implemented' DbException (NOT_FOUND)."""
    with pytest.raises(DbException) as excinfo:
        db_base.create_list(None, None)
    # Assertions run after the ``with`` block, once the exception is captured.
    assert str(excinfo.value).startswith(
        exception_message("Method 'create_list' not implemented")
    )
    assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
89
90
def test_del_list(db_base):
    """``del_list`` on the base class raises a 'not implemented' DbException (NOT_FOUND)."""
    with pytest.raises(DbException) as excinfo:
        db_base.del_list(None, None)
    # Assertions run after the ``with`` block, once the exception is captured.
    assert str(excinfo.value).startswith(
        exception_message("Method 'del_list' not implemented")
    )
    assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
98
99
def test_del_one(db_base):
    """``del_one`` on the base class raises a 'not implemented' DbException (NOT_FOUND)."""
    with pytest.raises(DbException) as excinfo:
        db_base.del_one(None, None, None)
    # Assertions run after the ``with`` block, once the exception is captured.
    assert str(excinfo.value).startswith(
        exception_message("Method 'del_one' not implemented")
    )
    assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
107
108
class TestEncryption(unittest.TestCase):
    """Round-trip tests for ``DbBase.encrypt`` / ``DbBase.decrypt``."""

    def setUp(self):
        """Create three ``DbBase`` instances, each with a different secret-key setup."""
        master_key = "Setting a long master key with numbers 123 and capitals AGHBNHD and symbols %&8)!'"
        db_base1 = DbBase()
        db_base2 = DbBase()
        db_base3 = DbBase()
        # set self.secret_key obtained when connect
        # 1: master key plus an additional random 32-byte key
        db_base1.set_secret_key(master_key, replace=True)
        db_base1.set_secret_key(urandom(32))
        # 2: no master key (None), then a random 30-byte key
        db_base2.set_secret_key(None, replace=True)
        db_base2.set_secret_key(urandom(30))
        # 3: master key only
        db_base3.set_secret_key(master_key)
        self.db_bases = [db_base1, db_base2, db_base3]

    def test_encrypt_decrypt(self):
        """Schema 1.0 leaves values untouched; schema 1.1 encrypts and round-trips."""
        # (plain value, salt) pairs
        cases = (
            ("plain text 1 ! ", None),
            ("plain text 2 with salt ! ", "1afd5d1a-4a7e-4d9c-8c65-251290183106"),
            ("plain text 3 with usalt ! ", u"1afd5d1a-4a7e-4d9c-8c65-251290183106"),
            (u"plain unicode 4 ! ", None),
            (u"plain unicode 5 with salt ! ", "1a000d1a-4a7e-4d9c-8c65-251290183106"),
            (u"plain unicode 6 with usalt ! ", u"1abcdd1a-4a7e-4d9c-8c65-251290183106"),
        )
        for index, db_base in enumerate(self.db_bases):
            for value, salt in cases:
                # subTest reports every failing combination instead of
                # stopping at the first one.
                with self.subTest(db_base=index, value=value, salt=salt):
                    # no encryption
                    encrypted = db_base.encrypt(value, schema_version="1.0", salt=salt)
                    self.assertEqual(
                        encrypted, value, "value '{}' has been encrypted".format(value)
                    )
                    decrypted = db_base.decrypt(
                        encrypted, schema_version="1.0", salt=salt
                    )
                    self.assertEqual(
                        decrypted, value, "value '{}' has been decrypted".format(value)
                    )

                    # encrypt/decrypt
                    encrypted = db_base.encrypt(value, schema_version="1.1", salt=salt)
                    self.assertNotEqual(
                        encrypted,
                        value,
                        "value '{}' has not been encrypted".format(value),
                    )
                    self.assertIsInstance(encrypted, str, "Encrypted is not ascii text")
                    decrypted = db_base.decrypt(
                        encrypted, schema_version="1.1", salt=salt
                    )
                    self.assertEqual(
                        decrypted, value, "value is not equal after encryption/decryption"
                    )

    def test_encrypt_decrypt_salt(self):
        """Different keys/salts give different ciphertexts; a wrong key never yields the plaintext."""
        value = "value to be encrypted!"
        encrypted = []
        for index, db_base in enumerate(self.db_bases):
            for salt in (None, "salt 1", "1afd5d1a-4a7e-4d9c-8c65-251290183106"):
                # encrypt/decrypt
                ciphertext = db_base.encrypt(value, schema_version="1.1", salt=salt)
                encrypted.append(ciphertext)
                with self.subTest(db_base=index, salt=salt):
                    self.assertNotEqual(
                        ciphertext,
                        value,
                        "value '{}' has not been encrypted".format(value),
                    )
                    self.assertIsInstance(
                        ciphertext, str, "Encrypted is not ascii text"
                    )
                    decrypted = db_base.decrypt(
                        ciphertext, schema_version="1.1", salt=salt
                    )
                    self.assertEqual(
                        decrypted, value, "value is not equal after encryption/decryption"
                    )

        # every (key, salt) combination must have produced a distinct ciphertext
        for i, first in enumerate(encrypted):
            for second in encrypted[i + 1:]:
                self.assertNotEqual(
                    first,
                    second,
                    "encryption with different salt must contain different result",
                )

        # decrypt with a different master key: either decryption fails with
        # INTERNAL_SERVER_ERROR, or it returns something other than the plaintext
        try:
            decrypted = self.db_bases[-1].decrypt(
                encrypted[0], schema_version="1.1", salt=None
            )
        except DbException as e:
            self.assertEqual(
                e.http_code,
                HTTPStatus.INTERNAL_SERVER_ERROR,
                "Decryption with different KEY does not provide expected http_code",
            )
        else:
            # Compare against the plaintext: comparing against the ciphertext
            # would pass even if the wrong key recovered the original value.
            self.assertNotEqual(
                value,
                decrypted,
                "Decryption with different KEY must generate different result",
            )
199
200
class TestDeepUpdate(unittest.TestCase):
    """Tests for ``deep_update``: dict merging, ``$``-prefixed array edits and bad patches."""

    def test_update_dict(self):
        """Plain dict patches: replace, add, delete with ``None``, and nested merge."""
        # Original, patch, expected result
        cases = (
            ({"a": "b"}, {"a": "c"}, {"a": "c"}),
            ({"a": "b"}, {"b": "c"}, {"a": "b", "b": "c"}),
            ({"a": "b"}, {"a": None}, {}),
            ({"a": "b", "b": "c"}, {"a": None}, {"b": "c"}),
            ({"a": ["b"]}, {"a": "c"}, {"a": "c"}),
            ({"a": "c"}, {"a": ["b"]}, {"a": ["b"]}),
            ({"a": {"b": "c"}}, {"a": {"b": "d", "c": None}}, {"a": {"b": "d"}}),
            ({"a": [{"b": "c"}]}, {"a": [1]}, {"a": [1]}),
            ({1: ["a", "b"]}, {1: ["c", "d"]}, {1: ["c", "d"]}),
            ({1: {"a": "b"}}, {1: ["c"]}, {1: ["c"]}),
            ({1: {"a": "foo"}}, {1: None}, {}),
            ({1: {"a": "foo"}}, {1: "bar"}, {1: "bar"}),
            ({"e": None}, {"a": 1}, {"e": None, "a": 1}),
            ({1: [1, 2]}, {1: {"a": "b", "c": None}}, {1: {"a": "b"}}),
            ({}, {"a": {"bb": {"ccc": None}}}, {"a": {"bb": {}}}),
        )
        for original, patch, expected in cases:
            # subTest reports each failing case on its own; ``original`` is
            # left out of the label because deep_update modifies it in place.
            with self.subTest(patch=patch, expected=expected):
                deep_update(original, patch)
                self.assertEqual(original, expected)
        # test deepcopy is done, so that the original dictionary does not reference the patch
        test_original = {1: {"a": "b"}}
        test_patch = {1: {"c": {"d": "e"}}}
        test_result = {1: {"a": "b", "c": {"d": "e"}}}
        deep_update(test_original, test_patch)
        self.assertEqual(test_original, test_result)
        test_patch[1]["c"]["f"] = "edition of patch, must not modify original"
        self.assertEqual(test_original, test_result)

    def test_update_array(self):
        """Array patches expressed with ``$`` keys: delete, insert, edit, nested edits."""
        # Each case contains the original, the patch, and the expected result
        cases = (
            # delete all instances of "a"/"d"
            ({"A": ["a", "b", "a"]}, {"A": {"$a": None}}, {"A": ["b"]}),
            ({"A": ["a", "b", "a"]}, {"A": {"$d": None}}, {"A": ["a", "b", "a"]}),
            # delete and insert at 0
            (
                {"A": ["a", "b", "c"]},
                {"A": {"$b": None, "$+[0]": "b"}},
                {"A": ["b", "a", "c"]},
            ),
            # delete and edit
            (
                {"A": ["a", "b", "a"]},
                {"A": {"$a": None, "$[1]": {"c": "d"}}},
                {"A": [{"c": "d"}]},
            ),
            # insert if not exist
            ({"A": ["a", "b", "c"]}, {"A": {"$+b": "b"}}, {"A": ["a", "b", "c"]}),
            ({"A": ["a", "b", "c"]}, {"A": {"$+d": "f"}}, {"A": ["a", "b", "c", "f"]}),
            # edit by filter
            (
                {"A": ["a", "b", "a"]},
                {"A": {"$b": {"c": "d"}}},
                {"A": ["a", {"c": "d"}, "a"]},
            ),
            (
                {"A": ["a", "b", "a"]},
                {"A": {"$b": None, "$+[0]": "b", "$+": "c"}},
                {"A": ["b", "a", "a", "c"]},
            ),
            ({"A": ["a", "b", "a"]}, {"A": {"$c": None}}, {"A": ["a", "b", "a"]}),
            # index deletion out of range
            ({"A": ["a", "b", "a"]}, {"A": {"$[5]": None}}, {"A": ["a", "b", "a"]}),
            # nested array->dict
            (
                {"A": ["a", "b", {"id": "1", "c": {"d": 2}}]},
                {"A": {"$id: '1'": {"h": None, "c": {"d": "e", "f": "g"}}}},
                {"A": ["a", "b", {"id": "1", "c": {"d": "e", "f": "g"}}]},
            ),
            (
                {"A": [{"id": 1, "c": {"d": 2}}, {"id": 1, "c": {"f": []}}]},
                {"A": {"$id: 1": {"h": None, "c": {"d": "e", "f": "g"}}}},
                {
                    "A": [
                        {"id": 1, "c": {"d": "e", "f": "g"}},
                        {"id": 1, "c": {"d": "e", "f": "g"}},
                    ]
                },
            ),
            # nested array->array
            (
                {"A": ["a", "b", ["a", "b"]]},
                {"A": {"$b": None, "$[2]": {"$b": {}, "$+": "c"}}},
                {"A": ["a", ["a", {}, "c"]]},
            ),
            # types str and int different, so not found
            (
                {"A": ["a", {"id": "1", "c": "d"}]},
                {"A": {"$id: 1": {"c": "e"}}},
                {"A": ["a", {"id": "1", "c": "d"}]},
            ),
        )
        for original, patch, expected in cases:
            # subTest labels the failing case, replacing the former debug print
            with self.subTest(patch=patch, expected=expected):
                deep_update(original, patch)
                self.assertEqual(original, expected)

    def test_update_badformat(self):
        """Conflicting or malformed array patches must raise ``DbException``."""
        # Each case contains the original and an incorrect patch
        # TODO: also check the text that must be present in the exception
        cases = (
            # conflict, index 0 is edited twice
            ({"A": ["a", "b", "a"]}, {"A": {"$a": None, "$[0]": {"c": "d"}}}),
            # conflict, two insertions at same index
            ({"A": ["a", "b", "a"]}, {"A": {"$[1]": "c", "$[-2]": "d"}}),
            ({"A": ["a", "b", "a"]}, {"A": {"$[1]": "c", "$[+1]": "d"}}),
            # bad format keys with and without $
            ({"A": ["a", "b", "a"]}, {"A": {"$b": {"c": "d"}, "c": 3}}),
            # bad format empty $ and yaml incorrect
            ({"A": ["a", "b", "a"]}, {"A": {"$": 3}}),
            ({"A": ["a", "b", "a"]}, {"A": {"$a: b: c": 3}}),
            ({"A": ["a", "b", "a"]}, {"A": {"$a: b, c: d": 3}}),
            # insertion of None
            ({"A": ["a", "b", "a"]}, {"A": {"$+": None}}),
            # Not found, insertion of None
            ({"A": ["a", "b", "a"]}, {"A": {"$+c": None}}),
            # index edition out of range
            ({"A": ["a", "b", "a"]}, {"A": {"$[5]": 6}}),
            # conflict, two editions on index 2
            (
                {"A": ["a", {"id": "1", "c": "d"}]},
                {"A": {"$id: '1'": {"c": "e"}, "$c: d": {"c": "f"}}},
            ),
        )
        for original, patch in cases:
            with self.subTest(patch=patch):
                # A single call is enough: the context manager both runs
                # deep_update and fails the case if no DbException is raised.
                with self.assertRaises(DbException):
                    deep_update(original, patch)
335
336
if __name__ == "__main__":
    # Running the file directly hands control to unittest's own runner.
    unittest.main()