blob: ca1336d20d6264aafd5197b49b3c3d1d4a338144 [file] [log] [blame]
Eduardo Sousaa0117812019-02-05 15:57:09 +00001# Copyright 2018 Whitestack, LLC
2# Copyright 2018 Telefonica S.A.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15#
16# For those usages not covered by the Apache License, Version 2.0 please
17# contact: esousa@whitestack.com or alfonso.tiernosepulveda@telefonica.com
18##
19
Eduardo Sousa4d611d32018-05-09 19:20:37 +010020import http
21import pytest
tiernob3e750b2018-09-05 11:25:23 +020022import unittest
23from osm_common.dbbase import DbBase, DbException, deep_update
tierno136f2952018-10-19 13:01:03 +020024from os import urandom
tiernobd5a4022019-01-30 09:48:38 +000025from http import HTTPStatus
Eduardo Sousa4d611d32018-05-09 19:20:37 +010026
tiernob20a9022018-05-22 12:07:05 +020027
Eduardo Sousa4d611d32018-05-09 19:20:37 +010028def exception_message(message):
29 return "database exception " + message
30
tiernob20a9022018-05-22 12:07:05 +020031
Eduardo Sousa4d611d32018-05-09 19:20:37 +010032@pytest.fixture
33def db_base():
34 return DbBase()
35
tiernob20a9022018-05-22 12:07:05 +020036
Eduardo Sousa4d611d32018-05-09 19:20:37 +010037def test_constructor():
38 db_base = DbBase()
tiernob20a9022018-05-22 12:07:05 +020039 assert db_base is not None
Eduardo Sousa4d611d32018-05-09 19:20:37 +010040 assert isinstance(db_base, DbBase)
41
tiernob20a9022018-05-22 12:07:05 +020042
Eduardo Sousa4d611d32018-05-09 19:20:37 +010043def test_db_connect(db_base):
tierno136f2952018-10-19 13:01:03 +020044 with pytest.raises(DbException) as excinfo:
45 db_base.db_connect(None)
46 assert str(excinfo.value).startswith(exception_message("Method 'db_connect' not implemented"))
Eduardo Sousa4d611d32018-05-09 19:20:37 +010047
tiernob20a9022018-05-22 12:07:05 +020048
Eduardo Sousa4d611d32018-05-09 19:20:37 +010049def test_db_disconnect(db_base):
50 db_base.db_disconnect()
51
tiernob20a9022018-05-22 12:07:05 +020052
Eduardo Sousa4d611d32018-05-09 19:20:37 +010053def test_get_list(db_base):
54 with pytest.raises(DbException) as excinfo:
55 db_base.get_list(None, None)
56 assert str(excinfo.value).startswith(exception_message("Method 'get_list' not implemented"))
57 assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
58
tiernob20a9022018-05-22 12:07:05 +020059
Eduardo Sousa4d611d32018-05-09 19:20:37 +010060def test_get_one(db_base):
61 with pytest.raises(DbException) as excinfo:
62 db_base.get_one(None, None, None, None)
63 assert str(excinfo.value).startswith(exception_message("Method 'get_one' not implemented"))
64 assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
65
tiernob20a9022018-05-22 12:07:05 +020066
Eduardo Sousa4d611d32018-05-09 19:20:37 +010067def test_create(db_base):
68 with pytest.raises(DbException) as excinfo:
69 db_base.create(None, None)
70 assert str(excinfo.value).startswith(exception_message("Method 'create' not implemented"))
71 assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
72
tiernob20a9022018-05-22 12:07:05 +020073
Eduardo Sousa4d611d32018-05-09 19:20:37 +010074def test_del_list(db_base):
75 with pytest.raises(DbException) as excinfo:
76 db_base.del_list(None, None)
77 assert str(excinfo.value).startswith(exception_message("Method 'del_list' not implemented"))
78 assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
79
tiernob20a9022018-05-22 12:07:05 +020080
Eduardo Sousa4d611d32018-05-09 19:20:37 +010081def test_del_one(db_base):
82 with pytest.raises(DbException) as excinfo:
83 db_base.del_one(None, None, None)
84 assert str(excinfo.value).startswith(exception_message("Method 'del_one' not implemented"))
85 assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
tiernob3e750b2018-09-05 11:25:23 +020086
87
tierno136f2952018-10-19 13:01:03 +020088class TestEncryption(unittest.TestCase):
89 def setUp(self):
tiernoeef7cb72018-11-12 11:51:49 +010090 master_key = "Setting a long master key with numbers 123 and capitals AGHBNHD and symbols %&8)!'"
tiernocfc52722018-10-23 11:41:49 +020091 db_base1 = DbBase()
tierno136f2952018-10-19 13:01:03 +020092 db_base2 = DbBase()
tiernoeef7cb72018-11-12 11:51:49 +010093 db_base3 = DbBase()
tierno136f2952018-10-19 13:01:03 +020094 # set self.secret_key obtained when connect
tiernoeef7cb72018-11-12 11:51:49 +010095 db_base1.set_secret_key(master_key, replace=True)
96 db_base1.set_secret_key(urandom(32))
97 db_base2.set_secret_key(None, replace=True)
98 db_base2.set_secret_key(urandom(30))
99 db_base3.set_secret_key(master_key)
100 self.db_bases = [db_base1, db_base2, db_base3]
tierno136f2952018-10-19 13:01:03 +0200101
102 def test_encrypt_decrypt(self):
103 TEST = (
104 ("plain text 1 ! ", None),
105 ("plain text 2 with salt ! ", "1afd5d1a-4a7e-4d9c-8c65-251290183106"),
106 ("plain text 3 with usalt ! ", u"1afd5d1a-4a7e-4d9c-8c65-251290183106"),
107 (u"plain unicode 4 ! ", None),
108 (u"plain unicode 5 with salt ! ", "1a000d1a-4a7e-4d9c-8c65-251290183106"),
109 (u"plain unicode 6 with usalt ! ", u"1abcdd1a-4a7e-4d9c-8c65-251290183106"),
110 )
tiernoeef7cb72018-11-12 11:51:49 +0100111 for db_base in self.db_bases:
tierno136f2952018-10-19 13:01:03 +0200112 for value, salt in TEST:
113 # no encryption
114 encrypted = db_base.encrypt(value, schema_version='1.0', salt=salt)
115 self.assertEqual(encrypted, value, "value '{}' has been encrypted".format(value))
116 decrypted = db_base.decrypt(encrypted, schema_version='1.0', salt=salt)
117 self.assertEqual(decrypted, value, "value '{}' has been decrypted".format(value))
118
119 # encrypt/decrypt
120 encrypted = db_base.encrypt(value, schema_version='1.1', salt=salt)
121 self.assertNotEqual(encrypted, value, "value '{}' has not been encrypted".format(value))
122 self.assertIsInstance(encrypted, str, "Encrypted is not ascii text")
123 decrypted = db_base.decrypt(encrypted, schema_version='1.1', salt=salt)
124 self.assertEqual(decrypted, value, "value is not equal after encryption/decryption")
125
126 def test_encrypt_decrypt_salt(self):
127 value = "value to be encrypted!"
128 encrypted = []
tiernoeef7cb72018-11-12 11:51:49 +0100129 for db_base in self.db_bases:
tierno136f2952018-10-19 13:01:03 +0200130 for salt in (None, "salt 1", "1afd5d1a-4a7e-4d9c-8c65-251290183106"):
131 # encrypt/decrypt
132 encrypted.append(db_base.encrypt(value, schema_version='1.1', salt=salt))
133 self.assertNotEqual(encrypted[-1], value, "value '{}' has not been encrypted".format(value))
134 self.assertIsInstance(encrypted[-1], str, "Encrypted is not ascii text")
135 decrypted = db_base.decrypt(encrypted[-1], schema_version='1.1', salt=salt)
136 self.assertEqual(decrypted, value, "value is not equal after encryption/decryption")
137 for i in range(0, len(encrypted)):
138 for j in range(i+1, len(encrypted)):
139 self.assertNotEqual(encrypted[i], encrypted[j],
tiernoeef7cb72018-11-12 11:51:49 +0100140 "encryption with different salt must contain different result")
tiernobd5a4022019-01-30 09:48:38 +0000141 # decrypt with a different master key
142 try:
143 decrypted = self.db_bases[-1].decrypt(encrypted[0], schema_version='1.1', salt=None)
144 self.assertNotEqual(encrypted[0], decrypted, "Decryption with different KEY must generate different result")
145 except DbException as e:
146 self.assertEqual(e.http_code, HTTPStatus.INTERNAL_SERVER_ERROR,
147 "Decryption with different KEY does not provide expected http_code")
tierno136f2952018-10-19 13:01:03 +0200148
149
tiernob3e750b2018-09-05 11:25:23 +0200150class TestDeepUpdate(unittest.TestCase):
151 def test_update_dict(self):
152 # Original, patch, expected result
153 TEST = (
154 ({"a": "b"}, {"a": "c"}, {"a": "c"}),
155 ({"a": "b"}, {"b": "c"}, {"a": "b", "b": "c"}),
156 ({"a": "b"}, {"a": None}, {}),
157 ({"a": "b", "b": "c"}, {"a": None}, {"b": "c"}),
158 ({"a": ["b"]}, {"a": "c"}, {"a": "c"}),
159 ({"a": "c"}, {"a": ["b"]}, {"a": ["b"]}),
160 ({"a": {"b": "c"}}, {"a": {"b": "d", "c": None}}, {"a": {"b": "d"}}),
161 ({"a": [{"b": "c"}]}, {"a": [1]}, {"a": [1]}),
162 ({1: ["a", "b"]}, {1: ["c", "d"]}, {1: ["c", "d"]}),
163 ({1: {"a": "b"}}, {1: ["c"]}, {1: ["c"]}),
164 ({1: {"a": "foo"}}, {1: None}, {}),
165 ({1: {"a": "foo"}}, {1: "bar"}, {1: "bar"}),
166 ({"e": None}, {"a": 1}, {"e": None, "a": 1}),
167 ({1: [1, 2]}, {1: {"a": "b", "c": None}}, {1: {"a": "b"}}),
168 ({}, {"a": {"bb": {"ccc": None}}}, {"a": {"bb": {}}}),
169 )
170 for t in TEST:
171 deep_update(t[0], t[1])
172 self.assertEqual(t[0], t[2])
173 # test deepcopy is done. So that original dictionary does not reference the pach
174 test_original = {1: {"a": "b"}}
175 test_patch = {1: {"c": {"d": "e"}}}
176 test_result = {1: {"a": "b", "c": {"d": "e"}}}
177 deep_update(test_original, test_patch)
178 self.assertEqual(test_original, test_result)
179 test_patch[1]["c"]["f"] = "edition of patch, must not modify original"
180 self.assertEqual(test_original, test_result)
181
182 def test_update_array(self):
183 # This TEST contains a list with the the Original, patch, and expected result
184 TEST = (
185 # delete all instances of "a"/"d"
186 ({"A": ["a", "b", "a"]}, {"A": {"$a": None}}, {"A": ["b"]}),
187 ({"A": ["a", "b", "a"]}, {"A": {"$d": None}}, {"A": ["a", "b", "a"]}),
188 # delete and insert at 0
189 ({"A": ["a", "b", "c"]}, {"A": {"$b": None, "$+[0]": "b"}}, {"A": ["b", "a", "c"]}),
190 # delete and edit
191 ({"A": ["a", "b", "a"]}, {"A": {"$a": None, "$[1]": {"c": "d"}}}, {"A": [{"c": "d"}]}),
192 # insert if not exist
193 ({"A": ["a", "b", "c"]}, {"A": {"$+b": "b"}}, {"A": ["a", "b", "c"]}),
194 ({"A": ["a", "b", "c"]}, {"A": {"$+d": "f"}}, {"A": ["a", "b", "c", "f"]}),
195 # edit by filter
196 ({"A": ["a", "b", "a"]}, {"A": {"$b": {"c": "d"}}}, {"A": ["a", {"c": "d"}, "a"]}),
197 ({"A": ["a", "b", "a"]}, {"A": {"$b": None, "$+[0]": "b", "$+": "c"}}, {"A": ["b", "a", "a", "c"]}),
198 ({"A": ["a", "b", "a"]}, {"A": {"$c": None}}, {"A": ["a", "b", "a"]}),
199 # index deletion out of range
200 ({"A": ["a", "b", "a"]}, {"A": {"$[5]": None}}, {"A": ["a", "b", "a"]}),
201 # nested array->dict
202 ({"A": ["a", "b", {"id": "1", "c": {"d": 2}}]}, {"A": {"$id: '1'": {"h": None, "c": {"d": "e", "f": "g"}}}},
203 {"A": ["a", "b", {"id": "1", "c": {"d": "e", "f": "g"}}]}),
204 ({"A": [{"id": 1, "c": {"d": 2}}, {"id": 1, "c": {"f": []}}]},
205 {"A": {"$id: 1": {"h": None, "c": {"d": "e", "f": "g"}}}},
206 {"A": [{"id": 1, "c": {"d": "e", "f": "g"}}, {"id": 1, "c": {"d": "e", "f": "g"}}]}),
207 # nested array->array
208 ({"A": ["a", "b", ["a", "b"]]}, {"A": {"$b": None, "$[2]": {"$b": {}, "$+": "c"}}},
209 {"A": ["a", ["a", {}, "c"]]}),
210 # types str and int different, so not found
211 ({"A": ["a", {"id": "1", "c": "d"}]}, {"A": {"$id: 1": {"c": "e"}}}, {"A": ["a", {"id": "1", "c": "d"}]}),
212
213 )
214 for t in TEST:
215 print(t)
216 deep_update(t[0], t[1])
217 self.assertEqual(t[0], t[2])
218
219 def test_update_badformat(self):
220 # This TEST contains original, incorrect patch and #TODO text that must be present
221 TEST = (
222 # conflict, index 0 is edited twice
223 ({"A": ["a", "b", "a"]}, {"A": {"$a": None, "$[0]": {"c": "d"}}}),
224 # conflict, two insertions at same index
225 ({"A": ["a", "b", "a"]}, {"A": {"$[1]": "c", "$[-2]": "d"}}),
226 ({"A": ["a", "b", "a"]}, {"A": {"$[1]": "c", "$[+1]": "d"}}),
227 # bad format keys with and without $
228 ({"A": ["a", "b", "a"]}, {"A": {"$b": {"c": "d"}, "c": 3}}),
229 # bad format empty $ and yaml incorrect
230 ({"A": ["a", "b", "a"]}, {"A": {"$": 3}}),
231 ({"A": ["a", "b", "a"]}, {"A": {"$a: b: c": 3}}),
232 ({"A": ["a", "b", "a"]}, {"A": {"$a: b, c: d": 3}}),
233 # insertion of None
234 ({"A": ["a", "b", "a"]}, {"A": {"$+": None}}),
235 # Not found, insertion of None
236 ({"A": ["a", "b", "a"]}, {"A": {"$+c": None}}),
237 # index edition out of range
238 ({"A": ["a", "b", "a"]}, {"A": {"$[5]": 6}}),
239 # conflict, two editions on index 2
240 ({"A": ["a", {"id": "1", "c": "d"}]}, {"A": {"$id: '1'": {"c": "e"}, "$c: d": {"c": "f"}}}),
241 )
242 for t in TEST:
243 print(t)
244 self.assertRaises(DbException, deep_update, t[0], t[1])
245 try:
246 deep_update(t[0], t[1])
247 except DbException as e:
248 print(e)
tierno136f2952018-10-19 13:01:03 +0200249
250
251if __name__ == '__main__':
252 unittest.main()