1 # Copyright 2018 Whitestack, LLC
2 # Copyright 2018 Telefonica S.A.
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
16 # For those usages not covered by the Apache License, Version 2.0 please
17 # contact: esousa@whitestack.com or alfonso.tiernosepulveda@telefonica.com
23 from osm_common
.dbbase
import DbBase
, DbException
, deep_update
24 from os
import urandom
27 def exception_message(message
):
28 return "database exception " + message
36 def test_constructor():
38 assert db_base
is not None
39 assert isinstance(db_base
, DbBase
)
42 def test_db_connect(db_base
):
43 with pytest
.raises(DbException
) as excinfo
:
44 db_base
.db_connect(None)
45 assert str(excinfo
.value
).startswith(exception_message("Method 'db_connect' not implemented"))
48 def test_db_disconnect(db_base
):
49 db_base
.db_disconnect()
52 def test_get_list(db_base
):
53 with pytest
.raises(DbException
) as excinfo
:
54 db_base
.get_list(None, None)
55 assert str(excinfo
.value
).startswith(exception_message("Method 'get_list' not implemented"))
56 assert excinfo
.value
.http_code
== http
.HTTPStatus
.NOT_FOUND
59 def test_get_one(db_base
):
60 with pytest
.raises(DbException
) as excinfo
:
61 db_base
.get_one(None, None, None, None)
62 assert str(excinfo
.value
).startswith(exception_message("Method 'get_one' not implemented"))
63 assert excinfo
.value
.http_code
== http
.HTTPStatus
.NOT_FOUND
66 def test_create(db_base
):
67 with pytest
.raises(DbException
) as excinfo
:
68 db_base
.create(None, None)
69 assert str(excinfo
.value
).startswith(exception_message("Method 'create' not implemented"))
70 assert excinfo
.value
.http_code
== http
.HTTPStatus
.NOT_FOUND
73 def test_del_list(db_base
):
74 with pytest
.raises(DbException
) as excinfo
:
75 db_base
.del_list(None, None)
76 assert str(excinfo
.value
).startswith(exception_message("Method 'del_list' not implemented"))
77 assert excinfo
.value
.http_code
== http
.HTTPStatus
.NOT_FOUND
80 def test_del_one(db_base
):
81 with pytest
.raises(DbException
) as excinfo
:
82 db_base
.del_one(None, None, None)
83 assert str(excinfo
.value
).startswith(exception_message("Method 'del_one' not implemented"))
84 assert excinfo
.value
.http_code
== http
.HTTPStatus
.NOT_FOUND
87 class TestEncryption(unittest
.TestCase
):
89 master_key
= "Setting a long master key with numbers 123 and capitals AGHBNHD and symbols %&8)!'"
93 # set self.secret_key obtained when connect
94 db_base1
.set_secret_key(master_key
, replace
=True)
95 db_base1
.set_secret_key(urandom(32))
96 db_base2
.set_secret_key(None, replace
=True)
97 db_base2
.set_secret_key(urandom(30))
98 db_base3
.set_secret_key(master_key
)
99 self
.db_bases
= [db_base1
, db_base2
, db_base3
]
101 def test_encrypt_decrypt(self
):
103 ("plain text 1 ! ", None),
104 ("plain text 2 with salt ! ", "1afd5d1a-4a7e-4d9c-8c65-251290183106"),
105 ("plain text 3 with usalt ! ", u
"1afd5d1a-4a7e-4d9c-8c65-251290183106"),
106 (u
"plain unicode 4 ! ", None),
107 (u
"plain unicode 5 with salt ! ", "1a000d1a-4a7e-4d9c-8c65-251290183106"),
108 (u
"plain unicode 6 with usalt ! ", u
"1abcdd1a-4a7e-4d9c-8c65-251290183106"),
110 for db_base
in self
.db_bases
:
111 for value
, salt
in TEST
:
113 encrypted
= db_base
.encrypt(value
, schema_version
='1.0', salt
=salt
)
114 self
.assertEqual(encrypted
, value
, "value '{}' has been encrypted".format(value
))
115 decrypted
= db_base
.decrypt(encrypted
, schema_version
='1.0', salt
=salt
)
116 self
.assertEqual(decrypted
, value
, "value '{}' has been decrypted".format(value
))
119 encrypted
= db_base
.encrypt(value
, schema_version
='1.1', salt
=salt
)
120 self
.assertNotEqual(encrypted
, value
, "value '{}' has not been encrypted".format(value
))
121 self
.assertIsInstance(encrypted
, str, "Encrypted is not ascii text")
122 decrypted
= db_base
.decrypt(encrypted
, schema_version
='1.1', salt
=salt
)
123 self
.assertEqual(decrypted
, value
, "value is not equal after encryption/decryption")
125 def test_encrypt_decrypt_salt(self
):
126 value
= "value to be encrypted!"
128 for db_base
in self
.db_bases
:
129 for salt
in (None, "salt 1", "1afd5d1a-4a7e-4d9c-8c65-251290183106"):
131 encrypted
.append(db_base
.encrypt(value
, schema_version
='1.1', salt
=salt
))
132 self
.assertNotEqual(encrypted
[-1], value
, "value '{}' has not been encrypted".format(value
))
133 self
.assertIsInstance(encrypted
[-1], str, "Encrypted is not ascii text")
134 decrypted
= db_base
.decrypt(encrypted
[-1], schema_version
='1.1', salt
=salt
)
135 self
.assertEqual(decrypted
, value
, "value is not equal after encryption/decryption")
136 for i
in range(0, len(encrypted
)):
137 for j
in range(i
+1, len(encrypted
)):
138 self
.assertNotEqual(encrypted
[i
], encrypted
[j
],
139 "encryption with different salt must contain different result")
142 class TestDeepUpdate(unittest
.TestCase
):
143 def test_update_dict(self
):
144 # Original, patch, expected result
146 ({"a": "b"}, {"a": "c"}, {"a": "c"}),
147 ({"a": "b"}, {"b": "c"}, {"a": "b", "b": "c"}),
148 ({"a": "b"}, {"a": None}, {}),
149 ({"a": "b", "b": "c"}, {"a": None}, {"b": "c"}),
150 ({"a": ["b"]}, {"a": "c"}, {"a": "c"}),
151 ({"a": "c"}, {"a": ["b"]}, {"a": ["b"]}),
152 ({"a": {"b": "c"}}, {"a": {"b": "d", "c": None}}, {"a": {"b": "d"}}),
153 ({"a": [{"b": "c"}]}, {"a": [1]}, {"a": [1]}),
154 ({1: ["a", "b"]}, {1: ["c", "d"]}, {1: ["c", "d"]}),
155 ({1: {"a": "b"}}, {1: ["c"]}, {1: ["c"]}),
156 ({1: {"a": "foo"}}, {1: None}, {}),
157 ({1: {"a": "foo"}}, {1: "bar"}, {1: "bar"}),
158 ({"e": None}, {"a": 1}, {"e": None, "a": 1}),
159 ({1: [1, 2]}, {1: {"a": "b", "c": None}}, {1: {"a": "b"}}),
160 ({}, {"a": {"bb": {"ccc": None}}}, {"a": {"bb": {}}}),
163 deep_update(t
[0], t
[1])
164 self
.assertEqual(t
[0], t
[2])
165 # test deepcopy is done. So that original dictionary does not reference the pach
166 test_original
= {1: {"a": "b"}}
167 test_patch
= {1: {"c": {"d": "e"}}}
168 test_result
= {1: {"a": "b", "c": {"d": "e"}}}
169 deep_update(test_original
, test_patch
)
170 self
.assertEqual(test_original
, test_result
)
171 test_patch
[1]["c"]["f"] = "edition of patch, must not modify original"
172 self
.assertEqual(test_original
, test_result
)
174 def test_update_array(self
):
175 # This TEST contains a list with the the Original, patch, and expected result
177 # delete all instances of "a"/"d"
178 ({"A": ["a", "b", "a"]}, {"A": {"$a": None}}, {"A": ["b"]}),
179 ({"A": ["a", "b", "a"]}, {"A": {"$d": None}}, {"A": ["a", "b", "a"]}),
180 # delete and insert at 0
181 ({"A": ["a", "b", "c"]}, {"A": {"$b": None, "$+[0]": "b"}}, {"A": ["b", "a", "c"]}),
183 ({"A": ["a", "b", "a"]}, {"A": {"$a": None, "$[1]": {"c": "d"}}}, {"A": [{"c": "d"}]}),
184 # insert if not exist
185 ({"A": ["a", "b", "c"]}, {"A": {"$+b": "b"}}, {"A": ["a", "b", "c"]}),
186 ({"A": ["a", "b", "c"]}, {"A": {"$+d": "f"}}, {"A": ["a", "b", "c", "f"]}),
188 ({"A": ["a", "b", "a"]}, {"A": {"$b": {"c": "d"}}}, {"A": ["a", {"c": "d"}, "a"]}),
189 ({"A": ["a", "b", "a"]}, {"A": {"$b": None, "$+[0]": "b", "$+": "c"}}, {"A": ["b", "a", "a", "c"]}),
190 ({"A": ["a", "b", "a"]}, {"A": {"$c": None}}, {"A": ["a", "b", "a"]}),
191 # index deletion out of range
192 ({"A": ["a", "b", "a"]}, {"A": {"$[5]": None}}, {"A": ["a", "b", "a"]}),
194 ({"A": ["a", "b", {"id": "1", "c": {"d": 2}}]}, {"A": {"$id: '1'": {"h": None, "c": {"d": "e", "f": "g"}}}},
195 {"A": ["a", "b", {"id": "1", "c": {"d": "e", "f": "g"}}]}),
196 ({"A": [{"id": 1, "c": {"d": 2}}, {"id": 1, "c": {"f": []}}]},
197 {"A": {"$id: 1": {"h": None, "c": {"d": "e", "f": "g"}}}},
198 {"A": [{"id": 1, "c": {"d": "e", "f": "g"}}, {"id": 1, "c": {"d": "e", "f": "g"}}]}),
199 # nested array->array
200 ({"A": ["a", "b", ["a", "b"]]}, {"A": {"$b": None, "$[2]": {"$b": {}, "$+": "c"}}},
201 {"A": ["a", ["a", {}, "c"]]}),
202 # types str and int different, so not found
203 ({"A": ["a", {"id": "1", "c": "d"}]}, {"A": {"$id: 1": {"c": "e"}}}, {"A": ["a", {"id": "1", "c": "d"}]}),
208 deep_update(t
[0], t
[1])
209 self
.assertEqual(t
[0], t
[2])
211 def test_update_badformat(self
):
212 # This TEST contains original, incorrect patch and #TODO text that must be present
214 # conflict, index 0 is edited twice
215 ({"A": ["a", "b", "a"]}, {"A": {"$a": None, "$[0]": {"c": "d"}}}),
216 # conflict, two insertions at same index
217 ({"A": ["a", "b", "a"]}, {"A": {"$[1]": "c", "$[-2]": "d"}}),
218 ({"A": ["a", "b", "a"]}, {"A": {"$[1]": "c", "$[+1]": "d"}}),
219 # bad format keys with and without $
220 ({"A": ["a", "b", "a"]}, {"A": {"$b": {"c": "d"}, "c": 3}}),
221 # bad format empty $ and yaml incorrect
222 ({"A": ["a", "b", "a"]}, {"A": {"$": 3}}),
223 ({"A": ["a", "b", "a"]}, {"A": {"$a: b: c": 3}}),
224 ({"A": ["a", "b", "a"]}, {"A": {"$a: b, c: d": 3}}),
226 ({"A": ["a", "b", "a"]}, {"A": {"$+": None}}),
227 # Not found, insertion of None
228 ({"A": ["a", "b", "a"]}, {"A": {"$+c": None}}),
229 # index edition out of range
230 ({"A": ["a", "b", "a"]}, {"A": {"$[5]": 6}}),
231 # conflict, two editions on index 2
232 ({"A": ["a", {"id": "1", "c": "d"}]}, {"A": {"$id: '1'": {"c": "e"}, "$c: d": {"c": "f"}}}),
236 self
.assertRaises(DbException
, deep_update
, t
[0], t
[1])
238 deep_update(t
[0], t
[1])
239 except DbException
as e
:
243 if __name__
== '__main__':