allow database update with pushlist
[osm/common.git] / osm_common / tests / test_dbmemory.py
1 # Copyright 2018 Whitestack, LLC
2 # Copyright 2018 Telefonica S.A.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
14 # under the License.
15 #
16 # For those usages not covered by the Apache License, Version 2.0 please
17 # contact: esousa@whitestack.com or alfonso.tiernosepulveda@telefonica.com
18 ##
19
20 import http
21 import logging
22 import pytest
23 import unittest
24 from unittest.mock import Mock
25
26 from unittest.mock import MagicMock
27 from osm_common.dbbase import DbException
28 from osm_common.dbmemory import DbMemory
29 from copy import deepcopy
30
31 __author__ = 'Eduardo Sousa <eduardosousa@av.it.pt>'
32
33
@pytest.fixture(scope="function", params=[True, False])
def db_memory(request):
    """Provide an empty DbMemory, built once with ``lock=True`` and once with ``lock=False``."""
    return DbMemory(lock=request.param)
38
39
@pytest.fixture(scope="function", params=[True, False])
def db_memory_with_data(request):
    """Provide a DbMemory whose "test" table holds three entries (_id/data 1, 2 and 3).

    The fixture is parametrized so every dependent test runs with both
    ``lock=True`` and ``lock=False``.
    """
    populated = DbMemory(lock=request.param)
    for value in (1, 2, 3):
        populated.create("test", {"_id": value, "data": value})
    return populated
49
50
@pytest.fixture(scope="function")
def db_memory_with_many_data(request):
    """Provide a lock-less DbMemory whose "test" table holds eight nested entries.

    The entries mix nested dicts, lists of dicts, plain lists and numeric-looking
    keys so that the filter tests can exercise dotted paths and list indexes.
    """
    records = [
        {"_id": 1, "data": {"data2": {"data3": 1}}, "list": [{"a": 1}], "text": "sometext"},
        {"_id": 2, "data": {"data2": {"data3": 2}}, "list": [{"a": 2}], "list2": [1, 2, 3]},
        {"_id": 3, "data": {"data2": {"data3": 3}}, "list": [{"a": 3}]},
        {"_id": 4, "data": {"data2": {"data3": 4}}, "list": [{"a": 4}, {"a": 0}]},
        {"_id": 5, "data": {"data2": {"data3": 5}}, "list": [{"a": 5}]},
        {"_id": 6, "data": {"data2": {"data3": 6}}, "list": [{"0": {"a": 1}}]},
        {"_id": 7, "data": {"data2": {"data3": 7}}, "0": {"a": 0}},
        {"_id": 8, "list": [{"a": 3, "b": 0, "c": [{"a": 3, "b": 1}, {"a": 0, "b": "v"}]}, {"a": 0, "b": 1}]},
    ]
    store = DbMemory(lock=False)
    store.create_list("test", records)
    return store
66
67
def empty_exception_message():
    """Return the bare prefix expected in the text of a DbException with no detail."""
    prefix = 'database exception '
    return prefix
70
71
def get_one_exception_message(db_filter):
    """Return the expected "not found" message for a lookup with ``db_filter``."""
    template = "database exception Not found entry with filter='{}'"
    return template.format(db_filter)
74
75
def get_one_multiple_exception_message(db_filter):
    """Return the expected message when ``db_filter`` matches more than one entry."""
    template = "database exception Found more than one entry with filter='{}'"
    return template.format(db_filter)
78
79
def del_one_exception_message(db_filter):
    """Return the expected "not found" message for a deletion with ``db_filter``."""
    template = "database exception Not found entry with filter='{}'"
    return template.format(db_filter)
82
83
def replace_exception_message(value):
    """Return the expected "not found" message for a replace addressed to _id ``value``."""
    template = "database exception Not found entry with _id='{}'"
    return template.format(value)
86
87
def test_constructor():
    """A default DbMemory uses the 'db' logger and starts with an empty store."""
    instance = DbMemory()
    expected_logger = logging.getLogger('db')
    assert instance.logger == expected_logger
    assert instance.db == {}
92
93
def test_constructor_with_logger():
    """Passing ``logger_name`` to the constructor selects that logger."""
    chosen_name = 'db_local'
    instance = DbMemory(logger_name=chosen_name)
    expected_logger = logging.getLogger(chosen_name)
    assert instance.logger == expected_logger
    assert instance.db == {}
99
100
def test_db_connect():
    """db_connect picks up 'logger_name' from the config and leaves the store empty."""
    chosen_name = 'db_local'
    instance = DbMemory()
    instance.db_connect({'logger_name': chosen_name})
    expected_logger = logging.getLogger(chosen_name)
    assert instance.logger == expected_logger
    assert instance.db == {}
108
109
def test_db_disconnect(db_memory):
    """db_disconnect can be called on a fresh database without raising."""
    database = db_memory
    database.db_disconnect()
112
113
@pytest.mark.parametrize(
    "table, db_filter",
    [
        ("test", {}),
        ("test", {"_id": 1}),
        ("test", {"data": 1}),
        ("test", {"_id": 1, "data": 1}),
    ],
)
def test_get_list_with_empty_db(db_memory, table, db_filter):
    """get_list on an empty database yields no entries for any filter."""
    found = db_memory.get_list(table, db_filter)
    assert len(found) == 0
122
123
@pytest.mark.parametrize(
    "table, db_filter, expected_data",
    [
        ("test", {}, [{"_id": 1, "data": 1}, {"_id": 2, "data": 2}, {"_id": 3, "data": 3}]),
        ("test", {"_id": 1}, [{"_id": 1, "data": 1}]),
        ("test", {"data": 1}, [{"_id": 1, "data": 1}]),
        ("test", {"_id": 1, "data": 1}, [{"_id": 1, "data": 1}]),
        ("test", {"_id": 2}, [{"_id": 2, "data": 2}]),
        ("test", {"data": 2}, [{"_id": 2, "data": 2}]),
        ("test", {"_id": 2, "data": 2}, [{"_id": 2, "data": 2}]),
        ("test", {"_id": 4}, []),
        ("test", {"data": 4}, []),
        ("test", {"_id": 4, "data": 4}, []),
        ("test_table", {}, []),
        ("test_table", {"_id": 1}, []),
        ("test_table", {"data": 1}, []),
        ("test_table", {"_id": 1, "data": 1}, []),
    ],
)
def test_get_list_with_non_empty_db(db_memory_with_data, table, db_filter, expected_data):
    """get_list returns exactly the entries matching the filter (order not checked)."""
    found = db_memory_with_data.get_list(table, db_filter)
    assert len(found) == len(expected_data)
    for wanted in expected_data:
        assert wanted in found
144
145
def test_get_list_exception(db_memory_with_data):
    """An unexpected error inside _find surfaces from get_list as a NOT_FOUND DbException."""
    db_memory_with_data._find = MagicMock(side_effect=Exception())
    with pytest.raises(DbException) as excinfo:
        db_memory_with_data.get_list('test', {})
    raised = excinfo.value
    assert str(raised) == empty_exception_message()
    assert raised.http_code == http.HTTPStatus.NOT_FOUND
154
155
@pytest.mark.parametrize(
    "table, db_filter, expected_data",
    [
        ("test", {"_id": 1}, {"_id": 1, "data": 1}),
        ("test", {"_id": 2}, {"_id": 2, "data": 2}),
        ("test", {"_id": 3}, {"_id": 3, "data": 3}),
        ("test", {"data": 1}, {"_id": 1, "data": 1}),
        ("test", {"data": 2}, {"_id": 2, "data": 2}),
        ("test", {"data": 3}, {"_id": 3, "data": 3}),
        ("test", {"_id": 1, "data": 1}, {"_id": 1, "data": 1}),
        ("test", {"_id": 2, "data": 2}, {"_id": 2, "data": 2}),
        ("test", {"_id": 3, "data": 3}, {"_id": 3, "data": 3}),
    ],
)
def test_get_one(db_memory_with_data, table, db_filter, expected_data):
    """get_one returns the single matching entry and leaves the store untouched."""
    found = db_memory_with_data.get_one(table, db_filter)
    assert found == expected_data

    store = db_memory_with_data.db
    assert len(store) == 1
    assert table in store
    assert len(store[table]) == 3
    assert found in store[table]
173
174
@pytest.mark.parametrize("db_filter, expected_ids", [
    ({}, [1, 2, 3, 4, 5, 6, 7, 8]),
    ({"_id": 1}, [1]),
    ({"data.data2.data3": 2}, [2]),
    ({"data.data2.data3.eq": 2}, [2]),
    ({"data.data2.data3": [2]}, [2]),
    ({"data.data2.data3.cont": [2]}, [2]),
    ({"data.data2.data3.neq": 2}, [1, 3, 4, 5, 6, 7, 8]),
    ({"data.data2.data3.neq": [2]}, [1, 3, 4, 5, 6, 7, 8]),
    ({"data.data2.data3.ncont": [2]}, [1, 3, 4, 5, 6, 7, 8]),
    ({"data.data2.data3": [2, 3]}, [2, 3]),
    ({"data.data2.data3.gt": 4}, [5, 6, 7]),
    ({"data.data2.data3.gte": 4}, [4, 5, 6, 7]),
    ({"data.data2.data3.lt": 4}, [1, 2, 3]),
    ({"data.data2.data3.lte": 4}, [1, 2, 3, 4]),
    ({"data.data2.data3.lte": 4.5}, [1, 2, 3, 4]),
    ({"data.data2.data3.gt": "text"}, []),
    ({"nonexist.nonexist": "4"}, []),
    ({"nonexist.nonexist": None}, [1, 2, 3, 4, 5, 6, 7, 8]),
    ({"nonexist.nonexist.neq": "4"}, [1, 2, 3, 4, 5, 6, 7, 8]),
    ({"nonexist.nonexist.neq": None}, []),
    ({"text.eq": "sometext"}, [1]),
    ({"text.neq": "sometext"}, [2, 3, 4, 5, 6, 7, 8]),
    ({"text.eq": "somet"}, []),
    ({"text.gte": "a"}, [1]),
    ({"text.gte": "somet"}, [1]),
    ({"text.gte": "sometext"}, [1]),
    ({"text.lt": "somet"}, []),
    ({"data.data2.data3": 2, "data.data2.data4": None}, [2]),
    ({"data.data2.data3": 2, "data.data2.data4": 5}, []),
    ({"data.data2.data3": 4}, [4]),
    ({"data.data2.data3": [3, 4, "e"]}, [3, 4]),
    ({"data.data2.data3": None}, [8]),
    ({"data.data2": "4"}, []),
    ({"list.0.a": 1}, [1, 6]),
    ({"list2": 1}, [2]),
    ({"list2": [1, 5]}, [2]),
    ({"list2": [1, 2]}, [2]),
    ({"list2": [5, 7]}, []),
    ({"list.ANYINDEX.a": 1}, [1]),
    ({"list.a": 3, "list.b": 1}, [8]),
    ({"list.ANYINDEX.a": 3, "list.ANYINDEX.b": 1}, []),
    ({"list.ANYINDEX.a": 3, "list.ANYINDEX.c.a": 3}, [8]),
    ({"list.ANYINDEX.a": 3, "list.ANYINDEX.b": 0}, [8]),
    ({"list.ANYINDEX.a": 3, "list.ANYINDEX.c.ANYINDEX.a": 0, "list.ANYINDEX.c.ANYINDEX.b": "v"}, [8]),
    ({"list.ANYINDEX.a": 3, "list.ANYINDEX.c.ANYINDEX.a": 0, "list.ANYINDEX.c.ANYINDEX.b": 1}, []),
    ({"list.c.b": 1}, [8]),
    ({"list.c.b": None}, [1, 2, 3, 4, 5, 6, 7]),
])
def test_get_list(db_memory_with_many_data, db_filter, expected_ids):
    """get_list and count honour dotted-path filters, operators and list indexes.

    For each filter the test checks the ids returned (in order), that every
    returned entry is present in the "test" table, that the table still holds
    its eight entries, and that count() agrees with the number of expected ids.
    """
    result = db_memory_with_many_data.get_list("test", db_filter)
    assert isinstance(result, list)
    result_ids = [item["_id"] for item in result]
    # Report the *expected* ids under the "expected_ids" label; the original
    # message formatted result_ids there, which made failures misleading.
    assert len(result) == len(expected_ids), "for db_filter={} result={} expected_ids={}".format(
        db_filter, result, expected_ids)
    assert result_ids == expected_ids
    for item in result:
        assert item in db_memory_with_many_data.db["test"]

    assert len(db_memory_with_many_data.db) == 1
    assert "test" in db_memory_with_many_data.db
    assert len(db_memory_with_many_data.db["test"]) == 8
    counted = db_memory_with_many_data.count("test", db_filter)
    assert counted == len(expected_ids)
241
242
@pytest.mark.parametrize(
    "table, db_filter, expected_data",
    [
        ("test", {}, {"_id": 1, "data": 1}),
    ],
)
def test_get_one_with_multiple_results(db_memory_with_data, table, db_filter, expected_data):
    """With fail_on_more=False, get_one returns the expected entry even if several match."""
    found = db_memory_with_data.get_one(table, db_filter, fail_on_more=False)
    assert found == expected_data

    store = db_memory_with_data.db
    assert len(store) == 1
    assert table in store
    assert len(store[table]) == 3
    assert found in store[table]
252
253
def test_get_one_with_multiple_results_exception(db_memory_with_data):
    """By default get_one raises DbException when the filter matches several entries."""
    match_all = {}
    with pytest.raises(DbException) as excinfo:
        db_memory_with_data.get_one("test", match_all)
    expected_text = empty_exception_message() + get_one_multiple_exception_message(match_all)
    assert str(excinfo.value) == expected_text
    # The status-code check is intentionally left disabled, as in the original:
    # assert excinfo.value.http_code == http.HTTPStatus.CONFLICT
261
262
@pytest.mark.parametrize(
    "table, db_filter",
    [
        ("test", {"_id": 4}),
        ("test", {"data": 4}),
        ("test", {"_id": 4, "data": 4}),
        ("test_table", {"_id": 4}),
        ("test_table", {"data": 4}),
        ("test_table", {"_id": 4, "data": 4}),
    ],
)
def test_get_one_with_non_empty_db_exception(db_memory_with_data, table, db_filter):
    """get_one raises a NOT_FOUND DbException when nothing in a populated db matches."""
    with pytest.raises(DbException) as excinfo:
        db_memory_with_data.get_one(table, db_filter)
    raised = excinfo.value
    assert str(raised) == (empty_exception_message() + get_one_exception_message(db_filter))
    assert raised.http_code == http.HTTPStatus.NOT_FOUND
275
276
@pytest.mark.parametrize(
    "table, db_filter",
    [
        ("test", {"_id": 4}),
        ("test", {"data": 4}),
        ("test", {"_id": 4, "data": 4}),
        ("test_table", {"_id": 4}),
        ("test_table", {"data": 4}),
        ("test_table", {"_id": 4, "data": 4}),
    ],
)
def test_get_one_with_non_empty_db_none(db_memory_with_data, table, db_filter):
    """With fail_on_empty=False, get_one returns None when a populated db has no match."""
    found = db_memory_with_data.get_one(table, db_filter, fail_on_empty=False)
    assert found is None
287
288
@pytest.mark.parametrize(
    "table, db_filter",
    [
        ("test", {"_id": 4}),
        ("test", {"data": 4}),
        ("test", {"_id": 4, "data": 4}),
        ("test_table", {"_id": 4}),
        ("test_table", {"data": 4}),
        ("test_table", {"_id": 4, "data": 4}),
    ],
)
def test_get_one_with_empty_db_exception(db_memory, table, db_filter):
    """get_one on an empty database raises a NOT_FOUND DbException."""
    with pytest.raises(DbException) as excinfo:
        db_memory.get_one(table, db_filter)
    raised = excinfo.value
    assert str(raised) == (empty_exception_message() + get_one_exception_message(db_filter))
    assert raised.http_code == http.HTTPStatus.NOT_FOUND
301
302
@pytest.mark.parametrize(
    "table, db_filter",
    [
        ("test", {"_id": 4}),
        ("test", {"data": 4}),
        ("test", {"_id": 4, "data": 4}),
        ("test_table", {"_id": 4}),
        ("test_table", {"data": 4}),
        ("test_table", {"_id": 4, "data": 4}),
    ],
)
def test_get_one_with_empty_db_none(db_memory, table, db_filter):
    """With fail_on_empty=False, get_one on an empty database returns None."""
    found = db_memory.get_one(table, db_filter, fail_on_empty=False)
    assert found is None
313
314
def test_get_one_generic_exception(db_memory_with_data):
    """An unexpected error inside _find surfaces from get_one as a NOT_FOUND DbException."""
    db_memory_with_data._find = MagicMock(side_effect=Exception())
    with pytest.raises(DbException) as excinfo:
        db_memory_with_data.get_one('test', {})
    raised = excinfo.value
    assert str(raised) == empty_exception_message()
    assert raised.http_code == http.HTTPStatus.NOT_FOUND
323
324
@pytest.mark.parametrize(
    "table, db_filter, expected_data",
    [
        ("test", {}, []),
        ("test", {"_id": 1}, [{"_id": 2, "data": 2}, {"_id": 3, "data": 3}]),
        ("test", {"_id": 2}, [{"_id": 1, "data": 1}, {"_id": 3, "data": 3}]),
        ("test", {"_id": 1, "data": 1}, [{"_id": 2, "data": 2}, {"_id": 3, "data": 3}]),
        ("test", {"_id": 2, "data": 2}, [{"_id": 1, "data": 1}, {"_id": 3, "data": 3}]),
    ],
)
def test_del_list_with_non_empty_db(db_memory_with_data, table, db_filter, expected_data):
    """del_list removes the matching entries and reports how many were deleted.

    ``expected_data`` lists the entries that must remain out of the initial three.
    """
    outcome = db_memory_with_data.del_list(table, db_filter)
    remaining_count = len(expected_data)
    assert outcome["deleted"] == (3 - remaining_count)

    store = db_memory_with_data.db
    assert len(store) == 1
    assert table in store
    assert len(store[table]) == remaining_count
    for survivor in expected_data:
        assert survivor in store[table]
339
340
@pytest.mark.parametrize(
    "table, db_filter",
    [
        ("test", {}),
        ("test", {"_id": 1}),
        ("test", {"_id": 2}),
        ("test", {"data": 1}),
        ("test", {"data": 2}),
        ("test", {"_id": 1, "data": 1}),
        ("test", {"_id": 2, "data": 2}),
    ],
)
def test_del_list_with_empty_db(db_memory, table, db_filter):
    """del_list on an empty database reports zero deletions."""
    outcome = db_memory.del_list(table, db_filter)
    assert outcome['deleted'] == 0
352
353
def test_del_list_generic_exception(db_memory_with_data):
    """An unexpected error inside _find surfaces from del_list as a NOT_FOUND DbException."""
    db_memory_with_data._find = MagicMock(side_effect=Exception())
    with pytest.raises(DbException) as excinfo:
        db_memory_with_data.del_list('test', {})
    raised = excinfo.value
    assert str(raised) == empty_exception_message()
    assert raised.http_code == http.HTTPStatus.NOT_FOUND
362
363
@pytest.mark.parametrize(
    "table, db_filter, data",
    [
        ("test", {}, {"_id": 1, "data": 1}),
        ("test", {"_id": 1}, {"_id": 1, "data": 1}),
        ("test", {"data": 1}, {"_id": 1, "data": 1}),
        ("test", {"_id": 1, "data": 1}, {"_id": 1, "data": 1}),
        ("test", {"_id": 2}, {"_id": 2, "data": 2}),
        ("test", {"data": 2}, {"_id": 2, "data": 2}),
        ("test", {"_id": 2, "data": 2}, {"_id": 2, "data": 2}),
    ],
)
def test_del_one(db_memory_with_data, table, db_filter, data):
    """del_one removes exactly one entry (``data``) and reports ``{"deleted": 1}``."""
    outcome = db_memory_with_data.del_one(table, db_filter)
    assert outcome == {"deleted": 1}

    store = db_memory_with_data.db
    assert len(store) == 1
    assert table in store
    assert len(store[table]) == 2
    assert data not in store[table]
379
380
@pytest.mark.parametrize(
    "table, db_filter",
    [
        ("test", {}),
        ("test", {"_id": 1}),
        ("test", {"_id": 2}),
        ("test", {"data": 1}),
        ("test", {"data": 2}),
        ("test", {"_id": 1, "data": 1}),
        ("test", {"_id": 2, "data": 2}),
        ("test_table", {}),
        ("test_table", {"_id": 1}),
        ("test_table", {"_id": 2}),
        ("test_table", {"data": 1}),
        ("test_table", {"data": 2}),
        ("test_table", {"_id": 1, "data": 1}),
        ("test_table", {"_id": 2, "data": 2}),
    ],
)
def test_del_one_with_empty_db_exception(db_memory, table, db_filter):
    """del_one on an empty database raises a NOT_FOUND DbException."""
    with pytest.raises(DbException) as excinfo:
        db_memory.del_one(table, db_filter)
    raised = excinfo.value
    assert str(raised) == (empty_exception_message() + del_one_exception_message(db_filter))
    assert raised.http_code == http.HTTPStatus.NOT_FOUND
401
402
@pytest.mark.parametrize(
    "table, db_filter",
    [
        ("test", {}),
        ("test", {"_id": 1}),
        ("test", {"_id": 2}),
        ("test", {"data": 1}),
        ("test", {"data": 2}),
        ("test", {"_id": 1, "data": 1}),
        ("test", {"_id": 2, "data": 2}),
        ("test_table", {}),
        ("test_table", {"_id": 1}),
        ("test_table", {"_id": 2}),
        ("test_table", {"data": 1}),
        ("test_table", {"data": 2}),
        ("test_table", {"_id": 1, "data": 1}),
        ("test_table", {"_id": 2, "data": 2}),
    ],
)
def test_del_one_with_empty_db_none(db_memory, table, db_filter):
    """With fail_on_empty=False, del_one on an empty database returns None."""
    outcome = db_memory.del_one(table, db_filter, fail_on_empty=False)
    assert outcome is None
421
422
@pytest.mark.parametrize(
    "table, db_filter",
    [
        ("test", {"_id": 4}),
        ("test", {"_id": 5}),
        ("test", {"data": 4}),
        ("test", {"data": 5}),
        ("test", {"_id": 1, "data": 2}),
        ("test", {"_id": 2, "data": 3}),
        ("test_table", {}),
        ("test_table", {"_id": 1}),
        ("test_table", {"_id": 2}),
        ("test_table", {"data": 1}),
        ("test_table", {"data": 2}),
        ("test_table", {"_id": 1, "data": 1}),
        ("test_table", {"_id": 2, "data": 2}),
    ],
)
def test_del_one_with_non_empty_db_exception(db_memory_with_data, table, db_filter):
    """del_one raises a NOT_FOUND DbException when nothing in a populated db matches."""
    with pytest.raises(DbException) as excinfo:
        db_memory_with_data.del_one(table, db_filter)
    raised = excinfo.value
    assert str(raised) == (empty_exception_message() + del_one_exception_message(db_filter))
    assert raised.http_code == http.HTTPStatus.NOT_FOUND
442
443
@pytest.mark.parametrize(
    "table, db_filter",
    [
        ("test", {"_id": 4}),
        ("test", {"_id": 5}),
        ("test", {"data": 4}),
        ("test", {"data": 5}),
        ("test", {"_id": 1, "data": 2}),
        ("test", {"_id": 2, "data": 3}),
        ("test_table", {}),
        ("test_table", {"_id": 1}),
        ("test_table", {"_id": 2}),
        ("test_table", {"data": 1}),
        ("test_table", {"data": 2}),
        ("test_table", {"_id": 1, "data": 1}),
        ("test_table", {"_id": 2, "data": 2}),
    ],
)
def test_del_one_with_non_empty_db_none(db_memory_with_data, table, db_filter):
    """With fail_on_empty=False, del_one returns None when a populated db has no match."""
    outcome = db_memory_with_data.del_one(table, db_filter, fail_on_empty=False)
    assert outcome is None
461
462
@pytest.mark.parametrize("fail_on_empty", [True, False])
def test_del_one_generic_exception(db_memory_with_data, fail_on_empty):
    """An unexpected error inside _find surfaces from del_one as a NOT_FOUND DbException.

    Checked for both values of ``fail_on_empty``.
    """
    db_memory_with_data._find = MagicMock(side_effect=Exception())
    with pytest.raises(DbException) as excinfo:
        db_memory_with_data.del_one('test', {}, fail_on_empty=fail_on_empty)
    raised = excinfo.value
    assert str(raised) == empty_exception_message()
    assert raised.http_code == http.HTTPStatus.NOT_FOUND
474
475
@pytest.mark.parametrize(
    "table, _id, indata",
    [
        ("test", 1, {"_id": 1, "data": 42}),
        ("test", 1, {"_id": 1, "data": 42, "kk": 34}),
        ("test", 1, {"_id": 1}),
        ("test", 2, {"_id": 2, "data": 42}),
        ("test", 2, {"_id": 2, "data": 42, "kk": 34}),
        ("test", 2, {"_id": 2}),
        ("test", 3, {"_id": 3, "data": 42}),
        ("test", 3, {"_id": 3, "data": 42, "kk": 34}),
        ("test", 3, {"_id": 3}),
    ],
)
def test_replace(db_memory_with_data, table, _id, indata):
    """replace swaps the entry with the given _id for ``indata`` and reports one update."""
    outcome = db_memory_with_data.replace(table, _id, indata)
    assert outcome == {"updated": 1}

    store = db_memory_with_data.db
    assert len(store) == 1
    assert table in store
    assert len(store[table]) == 3
    assert indata in store[table]
493
494
@pytest.mark.parametrize(
    "table, _id, indata",
    [
        ("test", 1, {"_id": 1, "data": 42}),
        ("test", 2, {"_id": 2}),
        ("test", 3, {"_id": 3}),
    ],
)
def test_replace_without_data_exception(db_memory, table, _id, indata):
    """replace with fail_on_empty=True on an empty database raises NOT_FOUND."""
    with pytest.raises(DbException) as excinfo:
        db_memory.replace(table, _id, indata, fail_on_empty=True)
    raised = excinfo.value
    assert str(raised) == (replace_exception_message(_id))
    assert raised.http_code == http.HTTPStatus.NOT_FOUND
504
505
@pytest.mark.parametrize(
    "table, _id, indata",
    [
        ("test", 1, {"_id": 1, "data": 42}),
        ("test", 2, {"_id": 2}),
        ("test", 3, {"_id": 3}),
    ],
)
def test_replace_without_data_none(db_memory, table, _id, indata):
    """replace with fail_on_empty=False on an empty database returns None."""
    outcome = db_memory.replace(table, _id, indata, fail_on_empty=False)
    assert outcome is None
513
514
@pytest.mark.parametrize(
    "table, _id, indata",
    [
        ("test", 11, {"_id": 11, "data": 42}),
        ("test", 12, {"_id": 12}),
        ("test", 33, {"_id": 33}),
    ],
)
def test_replace_with_data_exception(db_memory_with_data, table, _id, indata):
    """replace with fail_on_empty=True raises NOT_FOUND for an _id absent from a populated db."""
    with pytest.raises(DbException) as excinfo:
        db_memory_with_data.replace(table, _id, indata, fail_on_empty=True)
    raised = excinfo.value
    assert str(raised) == (replace_exception_message(_id))
    assert raised.http_code == http.HTTPStatus.NOT_FOUND
524
525
@pytest.mark.parametrize(
    "table, _id, indata",
    [
        ("test", 11, {"_id": 11, "data": 42}),
        ("test", 12, {"_id": 12}),
        ("test", 33, {"_id": 33}),
    ],
)
def test_replace_with_data_none(db_memory_with_data, table, _id, indata):
    """replace with fail_on_empty=False returns None for an _id absent from a populated db."""
    outcome = db_memory_with_data.replace(table, _id, indata, fail_on_empty=False)
    assert outcome is None
533
534
@pytest.mark.parametrize("fail_on_empty", [True, False])
def test_replace_generic_exception(db_memory_with_data, fail_on_empty):
    """An unexpected error inside _find surfaces from replace as a NOT_FOUND DbException.

    Checked for both values of ``fail_on_empty``.
    """
    target_id = {}
    replacement = {'_id': 1, 'data': 1}
    db_memory_with_data._find = MagicMock(side_effect=Exception())
    with pytest.raises(DbException) as excinfo:
        db_memory_with_data.replace('test', target_id, replacement, fail_on_empty=fail_on_empty)
    raised = excinfo.value
    assert str(raised) == empty_exception_message()
    assert raised.http_code == http.HTTPStatus.NOT_FOUND
547
548
@pytest.mark.parametrize("table, id, data", [
    ("test", "1", {"data": 1}),
    ("test", "1", {"data": 2}),
    ("test", "2", {"data": 1}),
    ("test", "2", {"data": 2}),
    ("test_table", "1", {"data": 1}),
    ("test_table", "1", {"data": 2}),
    ("test_table", "2", {"data": 1}),
    ("test_table", "2", {"data": 2}),
    ("test", "1", {"data_1": 1, "data_2": 2}),
    ("test", "1", {"data_1": 2, "data_2": 1}),
    ("test", "2", {"data_1": 1, "data_2": 2}),
    ("test", "2", {"data_1": 2, "data_2": 1}),
    ("test_table", "1", {"data_1": 1, "data_2": 2}),
    ("test_table", "1", {"data_1": 2, "data_2": 1}),
    ("test_table", "2", {"data_1": 1, "data_2": 2}),
    ("test_table", "2", {"data_1": 2, "data_2": 1})])
def test_create_with_empty_db_with_id(db_memory, table, id, data):
    """create on an empty database stores an entry carrying an explicit _id and returns that _id."""
    # Build a fresh dict instead of writing "_id" into ``data``: the parametrize
    # argument is the same object for both variants of the db_memory fixture.
    data_to_insert = dict(data, _id=id)
    returned_id = db_memory.create(table, data_to_insert)
    assert returned_id == id
    assert len(db_memory.db) == 1
    assert table in db_memory.db
    assert len(db_memory.db[table]) == 1
    assert data_to_insert in db_memory.db[table]
575
576
@pytest.mark.parametrize("table, id, data", [
    ("test", "4", {"data": 1}),
    ("test", "5", {"data": 2}),
    ("test", "4", {"data": 1}),
    ("test", "5", {"data": 2}),
    ("test_table", "4", {"data": 1}),
    ("test_table", "5", {"data": 2}),
    ("test_table", "4", {"data": 1}),
    ("test_table", "5", {"data": 2}),
    ("test", "4", {"data_1": 1, "data_2": 2}),
    ("test", "5", {"data_1": 2, "data_2": 1}),
    ("test", "4", {"data_1": 1, "data_2": 2}),
    ("test", "5", {"data_1": 2, "data_2": 1}),
    ("test_table", "4", {"data_1": 1, "data_2": 2}),
    ("test_table", "5", {"data_1": 2, "data_2": 1}),
    ("test_table", "4", {"data_1": 1, "data_2": 2}),
    ("test_table", "5", {"data_1": 2, "data_2": 1})])
def test_create_with_non_empty_db_with_id(db_memory_with_data, table, id, data):
    """create on a populated database appends an entry with an explicit _id and returns that _id.

    The "test" table already holds three entries, so it ends with four; any other
    table is created alongside it and ends with one.
    """
    # Build a fresh dict instead of writing "_id" into ``data``: the parametrize
    # argument is the same object for both variants of the fixture.
    data_to_insert = dict(data, _id=id)
    returned_id = db_memory_with_data.create(table, data_to_insert)
    assert returned_id == id
    assert len(db_memory_with_data.db) == (1 if table == 'test' else 2)
    assert table in db_memory_with_data.db
    assert len(db_memory_with_data.db[table]) == (4 if table == 'test' else 1)
    assert data_to_insert in db_memory_with_data.db[table]
603
604
@pytest.mark.parametrize("table, data", [
    ("test", {"data": 1}),
    ("test", {"data": 2}),
    ("test", {"data": 1}),
    ("test", {"data": 2}),
    ("test_table", {"data": 1}),
    ("test_table", {"data": 2}),
    ("test_table", {"data": 1}),
    ("test_table", {"data": 2}),
    ("test", {"data_1": 1, "data_2": 2}),
    ("test", {"data_1": 2, "data_2": 1}),
    ("test", {"data_1": 1, "data_2": 2}),
    ("test", {"data_1": 2, "data_2": 1}),
    ("test_table", {"data_1": 1, "data_2": 2}),
    ("test_table", {"data_1": 2, "data_2": 1}),
    ("test_table", {"data_1": 1, "data_2": 2}),
    ("test_table", {"data_1": 2, "data_2": 1})])
def test_create_with_empty_db_without_id(db_memory, table, data):
    """create on an empty database stores an entry given without _id under the returned id."""
    # Hand create() a private copy: the parametrize dict is reused for both
    # variants of the db_memory fixture, and once "_id" is written into it the
    # second run would no longer exercise the "without id" path.
    payload = deepcopy(data)
    returned_id = db_memory.create(table, payload)
    assert len(db_memory.db) == 1
    assert table in db_memory.db
    assert len(db_memory.db[table]) == 1
    data_inserted = dict(data, _id=returned_id)
    assert data_inserted in db_memory.db[table]
630
631
@pytest.mark.parametrize("table, data", [
    ("test", {"data": 1}),
    ("test", {"data": 2}),
    ("test", {"data": 1}),
    ("test", {"data": 2}),
    ("test_table", {"data": 1}),
    ("test_table", {"data": 2}),
    ("test_table", {"data": 1}),
    ("test_table", {"data": 2}),
    ("test", {"data_1": 1, "data_2": 2}),
    ("test", {"data_1": 2, "data_2": 1}),
    ("test", {"data_1": 1, "data_2": 2}),
    ("test", {"data_1": 2, "data_2": 1}),
    ("test_table", {"data_1": 1, "data_2": 2}),
    ("test_table", {"data_1": 2, "data_2": 1}),
    ("test_table", {"data_1": 1, "data_2": 2}),
    ("test_table", {"data_1": 2, "data_2": 1})])
def test_create_with_non_empty_db_without_id(db_memory_with_data, table, data):
    """create on a populated database stores an entry given without _id under the returned id.

    The "test" table already holds three entries, so it ends with four; any other
    table is created alongside it and ends with one.
    """
    # Hand create() a private copy: the parametrize dict is reused for both
    # variants of the fixture, and once "_id" is written into it the second run
    # would no longer exercise the "without id" path.
    payload = deepcopy(data)
    returned_id = db_memory_with_data.create(table, payload)
    assert len(db_memory_with_data.db) == (1 if table == 'test' else 2)
    assert table in db_memory_with_data.db
    assert len(db_memory_with_data.db[table]) == (4 if table == 'test' else 1)
    data_inserted = dict(data, _id=returned_id)
    assert data_inserted in db_memory_with_data.db[table]
657
658
def test_create_with_exception(db_memory):
    """An unexpected error while checking the store surfaces from create as a NOT_FOUND DbException."""
    # Replace the backing store with a mock whose membership test blows up.
    broken_store = MagicMock()
    db_memory.db = broken_store
    db_memory.db.__contains__.side_effect = Exception()
    with pytest.raises(DbException) as excinfo:
        db_memory.create("test", {"_id": 1, "data": 1})
    raised = excinfo.value
    assert str(raised) == empty_exception_message()
    assert raised.http_code == http.HTTPStatus.NOT_FOUND
668
669
@pytest.mark.parametrize("db_content, update_dict, expected, message", [
    ({"a": {"none": None}}, {"a.b.num": "v"}, {"a": {"none": None, "b": {"num": "v"}}}, "create dict"),
    ({"a": {"none": None}}, {"a.none.num": "v"}, {"a": {"none": {"num": "v"}}}, "create dict over none"),
    ({"a": {"b": {"num": 4}}}, {"a.b.num": "v"}, {"a": {"b": {"num": "v"}}}, "replace_number"),
    ({"a": {"b": {"num": 4}}}, {"a.b.num.c.d": "v"}, None, "create dict over number should fail"),
    ({"a": {"b": {"num": 4}}}, {"a.b": "v"}, {"a": {"b": "v"}}, "replace dict with a string"),
    ({"a": {"b": {"num": 4}}}, {"a.b": None}, {"a": {"b": None}}, "replace dict with None"),
    ({"a": [{"b": {"num": 4}}]}, {"a.b.num": "v"}, None, "create dict over list should fail"),
    ({"a": [{"b": {"num": 4}}]}, {"a.0.b.num": "v"}, {"a": [{"b": {"num": "v"}}]}, "set list"),
    ({"a": [{"b": {"num": 4}}]}, {"a.3.b.num": "v"},
     {"a": [{"b": {"num": 4}}, None, None, {"b": {"num": "v"}}]}, "expand list"),
    ({"a": [[4]]}, {"a.0.0": "v"}, {"a": [["v"]]}, "set nested list"),
    ({"a": [[4]]}, {"a.0.2": "v"}, {"a": [[4, None, "v"]]}, "expand nested list"),
    ({"a": [[4]]}, {"a.2.2": "v"}, {"a": [[4], None, {"2": "v"}]}, "expand list and add number key"),
    ({"a": None}, {"b.c": "v"}, {"a": None, "b": {"c": "v"}}, "expand at root"),
])
def test_set_one(db_memory, db_content, update_dict, expected, message):
    """set_one applies dotted-path updates to the entry returned by _find.

    ``expected`` is the entry content after the update; ``None`` means set_one
    must raise a DbException with http_code NOT_FOUND.
    """
    # set_one edits the entry in place and the db_memory fixture is parametrized,
    # so each row's dict is seen twice; work on a copy to keep the runs independent.
    db_content = deepcopy(db_content)
    db_memory._find = Mock(return_value=((0, db_content), ))
    if expected is None:
        with pytest.raises(DbException) as excinfo:
            db_memory.set_one("table", {}, update_dict)
        assert (excinfo.value.http_code == http.HTTPStatus.NOT_FOUND), message
    else:
        db_memory.set_one("table", {}, update_dict)
        assert (db_content == expected), message
693
694
class TestDbMemory(unittest.TestCase):
    """Table-driven checks of DbMemory.set_one and its pull/push/push_list/unset keywords.

    Every table row is ``(database content, change dict, expected content, message)``.
    An expected content of ``None`` means set_one must raise DbException.
    """

    def _check_set_one(self, test_set, keyword=None):
        """Run every row of ``test_set`` against a DbMemory whose _find is mocked.

        :param test_set: iterable of (db_content, change, expected, message) rows
        :param keyword: None to pass ``change`` as the update dict, otherwise the name
            of the set_one keyword argument ("pull", "push", "push_list", "unset")
            that receives ``change`` while the update dict is None
        """
        db_men = DbMemory()
        db_men._find = Mock()
        for db_content, change, expected, message in test_set:
            # set_one edits the entry in place; copy so rows sharing a template stay independent
            db_content = deepcopy(db_content)
            db_men._find.return_value = ((0, db_content), )
            if keyword is None:
                args, kwargs = ("table", {}, change), {}
            else:
                args, kwargs = ("table", {}, None), {keyword: change}
            if expected is None:
                if keyword is not None:
                    # the keyword-based failure rows have always been called with fail_on_empty=False
                    kwargs["fail_on_empty"] = False
                # msg= makes a missing exception identify the offending row
                with self.assertRaises(DbException, msg=message):
                    db_men.set_one(*args, **kwargs)
            else:
                db_men.set_one(*args, **kwargs)
                self.assertEqual(db_content, expected, message)

    # TODO: delete once the module-level pytest test_set_one covers every row listed here.
    def test_set_one(self):
        """Plain updates: create, replace and expand dicts and lists through dotted paths."""
        test_set = (
            # (database content, set-content, expected database content (None=fails), message)
            ({"a": {"none": None}}, {"a.b.num": "v"}, {"a": {"none": None, "b": {"num": "v"}}}, "create dict"),
            ({"a": {"none": None}}, {"a.none.num": "v"}, {"a": {"none": {"num": "v"}}}, "create dict over none"),
            ({"a": {"b": {"num": 4}}}, {"a.b.num": "v"}, {"a": {"b": {"num": "v"}}}, "replace_number"),
            ({"a": {"b": {"num": 4}}}, {"a.b.num.c.d": "v"}, None, "create dict over number should fail"),
            ({"a": {"b": {"num": 4}}}, {"a.b": "v"}, {"a": {"b": "v"}}, "replace dict with a string"),
            ({"a": {"b": {"num": 4}}}, {"a.b": None}, {"a": {"b": None}}, "replace dict with None"),

            ({"a": [{"b": {"num": 4}}]}, {"a.b.num": "v"}, None, "create dict over list should fail"),
            ({"a": [{"b": {"num": 4}}]}, {"a.0.b.num": "v"}, {"a": [{"b": {"num": "v"}}]}, "set list"),
            ({"a": [{"b": {"num": 4}}]}, {"a.3.b.num": "v"},
             {"a": [{"b": {"num": 4}}, None, None, {"b": {"num": "v"}}]}, "expand list"),
            ({"a": [[4]]}, {"a.0.0": "v"}, {"a": [["v"]]}, "set nested list"),
            ({"a": [[4]]}, {"a.0.2": "v"}, {"a": [[4, None, "v"]]}, "expand nested list"),
            ({"a": [[4]]}, {"a.2.2": "v"}, {"a": [[4], None, {"2": "v"}]}, "expand list and add number key"),
            ({"a": None}, {"b.c": "v"}, {"a": None, "b": {"c": "v"}}, "expand at root"),
        )
        self._check_set_one(test_set)

    def test_set_one_pull(self):
        """``pull`` removes matching items from a list and fails on non-list targets."""
        example = {"a": [1, "1", 1], "d": {}, "n": None}
        test_set = (
            # (database content, set-content, expected database content (None=fails), message)
            (example, {"a": "1"}, {"a": [1, 1], "d": {}, "n": None}, "pull one item"),
            (example, {"a": 1}, {"a": ["1"], "d": {}, "n": None}, "pull two items"),
            (example, {"a": "v"}, example, "pull non existing item"),
            (example, {"a.6": 1}, example, "pull non existing arrray"),
            (example, {"d.b.c": 1}, example, "pull non existing arrray2"),
            (example, {"b": 1}, example, "pull non existing arrray3"),
            (example, {"d": 1}, None, "pull over dict"),
            (example, {"n": 1}, None, "pull over None"),
        )
        self._check_set_one(test_set, "pull")

    def test_set_one_push(self):
        """``push`` appends one item to a list (creating it if absent) and fails on non-list targets."""
        example = {"a": [1, "1", 1], "d": {}, "n": None}
        test_set = (
            # (database content, set-content, expected database content (None=fails), message)
            (example, {"d.b.c": 1}, {"a": [1, "1", 1], "d": {"b": {"c": [1]}}, "n": None}, "push non existing arrray2"),
            (example, {"b": 1}, {"a": [1, "1", 1], "d": {}, "b": [1], "n": None}, "push non existing arrray3"),
            (example, {"a.6": 1}, {"a": [1, "1", 1, None, None, None, [1]], "d": {}, "n": None},
             "push non existing arrray"),
            (example, {"a": 2}, {"a": [1, "1", 1, 2], "d": {}, "n": None}, "push one item"),
            (example, {"a": {1: 1}}, {"a": [1, "1", 1, {1: 1}], "d": {}, "n": None}, "push a dict"),
            (example, {"d": 1}, None, "push over dict"),
            (example, {"n": 1}, None, "push over None"),
        )
        self._check_set_one(test_set, "push")

    def test_set_one_push_list(self):
        """``push_list`` appends every item of a list and rejects a value that is not a list."""
        example = {"a": [1, "1", 1], "d": {}, "n": None}
        test_set = (
            # (database content, set-content, expected database content (None=fails), message)
            (example, {"d.b.c": [1]}, {"a": [1, "1", 1], "d": {"b": {"c": [1]}}, "n": None},
             "push non existing arrray2"),
            (example, {"b": [1]}, {"a": [1, "1", 1], "d": {}, "b": [1], "n": None}, "push non existing arrray3"),
            (example, {"a.6": [1]}, {"a": [1, "1", 1, None, None, None, [1]], "d": {}, "n": None},
             "push non existing arrray"),
            (example, {"a": [2, 3]}, {"a": [1, "1", 1, 2, 3], "d": {}, "n": None}, "push two item"),
            (example, {"a": [{1: 1}]}, {"a": [1, "1", 1, {1: 1}], "d": {}, "n": None}, "push a dict"),
            (example, {"d": [1]}, None, "push over dict"),
            (example, {"n": [1]}, None, "push over None"),
            (example, {"a": 1}, None, "invalid push list non an array"),
        )
        self._check_set_one(test_set, "push_list")

    def test_unset_one(self):
        """``unset`` removes existing keys and leaves the entry untouched for missing ones."""
        example = {"a": [1, "1", 1], "d": {}, "n": None}
        test_set = (
            # (database content, set-content, expected database content (None=fails), message)
            (example, {"d.b.c": 1}, example, "unset non existing"),
            (example, {"b": 1}, example, "unset non existing"),
            (example, {"a.6": 1}, example, "unset non existing arrray"),
            (example, {"a": 2}, {"d": {}, "n": None}, "unset array"),
            (example, {"d": 1}, {"a": [1, "1", 1], "n": None}, "unset dict"),
            (example, {"n": 1}, {"a": [1, "1", 1], "d": {}}, "unset None"),
        )
        self._check_set_one(test_set, "unset")