Code Coverage

Cobertura Coverage Report > osm_common.tests >

test_dbmemory.py

Trend

File Coverage summary

NameClassesLinesConditionals
test_dbmemory.py
100%
1/1
45%
156/346
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
test_dbmemory.py
45%
156/346
N/A

Source

osm_common/tests/test_dbmemory.py
1 # Copyright 2018 Whitestack, LLC
2 # Copyright 2018 Telefonica S.A.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
14 # under the License.
15 #
16 # For those usages not covered by the Apache License, Version 2.0 please
17 # contact: esousa@whitestack.com or alfonso.tiernosepulveda@telefonica.com
18 ##
19 1 from copy import deepcopy
20 1 import http
21 1 import logging
22 1 import unittest
23 1 from unittest.mock import MagicMock, Mock
24
25 1 from osm_common.dbbase import DbException
26 1 from osm_common.dbmemory import DbMemory
27 1 import pytest
28
29 1 __author__ = "Eduardo Sousa <eduardosousa@av.it.pt>"
30
31
32 1 @pytest.fixture(scope="function", params=[True, False])
33 1 def db_memory(request):
34 0     db = DbMemory(lock=request.param)
35 0     return db
36
37
38 1 @pytest.fixture(scope="function", params=[True, False])
39 1 def db_memory_with_data(request):
40 0     db = DbMemory(lock=request.param)
41
42 0     db.create("test", {"_id": 1, "data": 1})
43 0     db.create("test", {"_id": 2, "data": 2})
44 0     db.create("test", {"_id": 3, "data": 3})
45
46 0     return db
47
48
49 1 @pytest.fixture(scope="function")
50 1 def db_memory_with_many_data(request):
51 0     db = DbMemory(lock=False)
52
53 0     db.create_list(
54         "test",
55         [
56             {
57                 "_id": 1,
58                 "data": {"data2": {"data3": 1}},
59                 "list": [{"a": 1}],
60                 "text": "sometext",
61             },
62             {
63                 "_id": 2,
64                 "data": {"data2": {"data3": 2}},
65                 "list": [{"a": 2}],
66                 "list2": [1, 2, 3],
67             },
68             {"_id": 3, "data": {"data2": {"data3": 3}}, "list": [{"a": 3}]},
69             {"_id": 4, "data": {"data2": {"data3": 4}}, "list": [{"a": 4}, {"a": 0}]},
70             {"_id": 5, "data": {"data2": {"data3": 5}}, "list": [{"a": 5}]},
71             {"_id": 6, "data": {"data2": {"data3": 6}}, "list": [{"0": {"a": 1}}]},
72             {"_id": 7, "data": {"data2": {"data3": 7}}, "0": {"a": 0}},
73             {
74                 "_id": 8,
75                 "list": [
76                     {"a": 3, "b": 0, "c": [{"a": 3, "b": 1}, {"a": 0, "b": "v"}]},
77                     {"a": 0, "b": 1},
78                 ],
79             },
80         ],
81     )
82 0     return db
83
84
85 1 def empty_exception_message():
86 0     return "database exception "
87
88
89 1 def get_one_exception_message(db_filter):
90 0     return "database exception Not found entry with filter='{}'".format(db_filter)
91
92
93 1 def get_one_multiple_exception_message(db_filter):
94 0     return "database exception Found more than one entry with filter='{}'".format(
95         db_filter
96     )
97
98
99 1 def del_one_exception_message(db_filter):
100 0     return "database exception Not found entry with filter='{}'".format(db_filter)
101
102
103 1 def replace_exception_message(value):
104 0     return "database exception Not found entry with _id='{}'".format(value)
105
106
107 1 def test_constructor():
108 1     db = DbMemory()
109 1     assert db.logger == logging.getLogger("db")
110 1     assert db.db == {}
111
112
113 1 def test_constructor_with_logger():
114 1     logger_name = "db_local"
115 1     db = DbMemory(logger_name=logger_name)
116 1     assert db.logger == logging.getLogger(logger_name)
117 1     assert db.db == {}
118
119
120 1 def test_db_connect():
121 1     logger_name = "db_local"
122 1     config = {"logger_name": logger_name}
123 1     db = DbMemory()
124 1     db.db_connect(config)
125 1     assert db.logger == logging.getLogger(logger_name)
126 1     assert db.db == {}
127
128
129 1 def test_db_disconnect(db_memory):
130 0     db_memory.db_disconnect()
131
132
133 1 @pytest.mark.parametrize(
134     "table, db_filter",
135     [
136         ("test", {}),
137         ("test", {"_id": 1}),
138         ("test", {"data": 1}),
139         ("test", {"_id": 1, "data": 1}),
140     ],
141 )
142 1 def test_get_list_with_empty_db(db_memory, table, db_filter):
143 0     result = db_memory.get_list(table, db_filter)
144 0     assert len(result) == 0
145
146
147 1 @pytest.mark.parametrize(
148     "table, db_filter, expected_data",
149     [
150         (
151             "test",
152             {},
153             [{"_id": 1, "data": 1}, {"_id": 2, "data": 2}, {"_id": 3, "data": 3}],
154         ),
155         ("test", {"_id": 1}, [{"_id": 1, "data": 1}]),
156         ("test", {"data": 1}, [{"_id": 1, "data": 1}]),
157         ("test", {"_id": 1, "data": 1}, [{"_id": 1, "data": 1}]),
158         ("test", {"_id": 2}, [{"_id": 2, "data": 2}]),
159         ("test", {"data": 2}, [{"_id": 2, "data": 2}]),
160         ("test", {"_id": 2, "data": 2}, [{"_id": 2, "data": 2}]),
161         ("test", {"_id": 4}, []),
162         ("test", {"data": 4}, []),
163         ("test", {"_id": 4, "data": 4}, []),
164         ("test_table", {}, []),
165         ("test_table", {"_id": 1}, []),
166         ("test_table", {"data": 1}, []),
167         ("test_table", {"_id": 1, "data": 1}, []),
168     ],
169 )
170 1 def test_get_list_with_non_empty_db(
171     db_memory_with_data, table, db_filter, expected_data
172 ):
173 0     result = db_memory_with_data.get_list(table, db_filter)
174 0     assert len(result) == len(expected_data)
175 0     for data in expected_data:
176 0         assert data in result
177
178
179 1 def test_get_list_exception(db_memory_with_data):
180 0     table = "test"
181 0     db_filter = {}
182 0     db_memory_with_data._find = MagicMock(side_effect=Exception())
183 0     with pytest.raises(DbException) as excinfo:
184 0         db_memory_with_data.get_list(table, db_filter)
185 0     assert str(excinfo.value) == empty_exception_message()
186 0     assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
187
188
189 1 @pytest.mark.parametrize(
190     "table, db_filter, expected_data",
191     [
192         ("test", {"_id": 1}, {"_id": 1, "data": 1}),
193         ("test", {"_id": 2}, {"_id": 2, "data": 2}),
194         ("test", {"_id": 3}, {"_id": 3, "data": 3}),
195         ("test", {"data": 1}, {"_id": 1, "data": 1}),
196         ("test", {"data": 2}, {"_id": 2, "data": 2}),
197         ("test", {"data": 3}, {"_id": 3, "data": 3}),
198         ("test", {"_id": 1, "data": 1}, {"_id": 1, "data": 1}),
199         ("test", {"_id": 2, "data": 2}, {"_id": 2, "data": 2}),
200         ("test", {"_id": 3, "data": 3}, {"_id": 3, "data": 3}),
201     ],
202 )
203 1 def test_get_one(db_memory_with_data, table, db_filter, expected_data):
204 0     result = db_memory_with_data.get_one(table, db_filter)
205 0     assert result == expected_data
206 0     assert len(db_memory_with_data.db) == 1
207 0     assert table in db_memory_with_data.db
208 0     assert len(db_memory_with_data.db[table]) == 3
209 0     assert result in db_memory_with_data.db[table]
210
211
212 1 @pytest.mark.parametrize(
213     "db_filter, expected_ids",
214     [
215         ({}, [1, 2, 3, 4, 5, 6, 7, 8]),
216         ({"_id": 1}, [1]),
217         ({"data.data2.data3": 2}, [2]),
218         ({"data.data2.data3.eq": 2}, [2]),
219         ({"data.data2.data3": [2]}, [2]),
220         ({"data.data2.data3.cont": [2]}, [2]),
221         ({"data.data2.data3.neq": 2}, [1, 3, 4, 5, 6, 7, 8]),
222         ({"data.data2.data3.neq": [2]}, [1, 3, 4, 5, 6, 7, 8]),
223         ({"data.data2.data3.ncont": [2]}, [1, 3, 4, 5, 6, 7, 8]),
224         ({"data.data2.data3": [2, 3]}, [2, 3]),
225         ({"data.data2.data3.gt": 4}, [5, 6, 7]),
226         ({"data.data2.data3.gte": 4}, [4, 5, 6, 7]),
227         ({"data.data2.data3.lt": 4}, [1, 2, 3]),
228         ({"data.data2.data3.lte": 4}, [1, 2, 3, 4]),
229         ({"data.data2.data3.lte": 4.5}, [1, 2, 3, 4]),
230         ({"data.data2.data3.gt": "text"}, []),
231         ({"nonexist.nonexist": "4"}, []),
232         ({"nonexist.nonexist": None}, [1, 2, 3, 4, 5, 6, 7, 8]),
233         ({"nonexist.nonexist.neq": "4"}, [1, 2, 3, 4, 5, 6, 7, 8]),
234         ({"nonexist.nonexist.neq": None}, []),
235         ({"text.eq": "sometext"}, [1]),
236         ({"text.neq": "sometext"}, [2, 3, 4, 5, 6, 7, 8]),
237         ({"text.eq": "somet"}, []),
238         ({"text.gte": "a"}, [1]),
239         ({"text.gte": "somet"}, [1]),
240         ({"text.gte": "sometext"}, [1]),
241         ({"text.lt": "somet"}, []),
242         ({"data.data2.data3": 2, "data.data2.data4": None}, [2]),
243         ({"data.data2.data3": 2, "data.data2.data4": 5}, []),
244         ({"data.data2.data3": 4}, [4]),
245         ({"data.data2.data3": [3, 4, "e"]}, [3, 4]),
246         ({"data.data2.data3": None}, [8]),
247         ({"data.data2": "4"}, []),
248         ({"list.0.a": 1}, [1, 6]),
249         ({"list2": 1}, [2]),
250         ({"list2": [1, 5]}, [2]),
251         ({"list2": [1, 2]}, [2]),
252         ({"list2": [5, 7]}, []),
253         ({"list.ANYINDEX.a": 1}, [1]),
254         ({"list.a": 3, "list.b": 1}, [8]),
255         ({"list.ANYINDEX.a": 3, "list.ANYINDEX.b": 1}, []),
256         ({"list.ANYINDEX.a": 3, "list.ANYINDEX.c.a": 3}, [8]),
257         ({"list.ANYINDEX.a": 3, "list.ANYINDEX.b": 0}, [8]),
258         (
259             {
260                 "list.ANYINDEX.a": 3,
261                 "list.ANYINDEX.c.ANYINDEX.a": 0,
262                 "list.ANYINDEX.c.ANYINDEX.b": "v",
263             },
264             [8],
265         ),
266         (
267             {
268                 "list.ANYINDEX.a": 3,
269                 "list.ANYINDEX.c.ANYINDEX.a": 0,
270                 "list.ANYINDEX.c.ANYINDEX.b": 1,
271             },
272             [],
273         ),
274         ({"list.c.b": 1}, [8]),
275         ({"list.c.b": None}, [1, 2, 3, 4, 5, 6, 7]),
276         # ({"data.data2.data3": 4}, []),
277         # ({"data.data2.data3": 4}, []),
278     ],
279 )
280 1 def test_get_list(db_memory_with_many_data, db_filter, expected_ids):
281 0     result = db_memory_with_many_data.get_list("test", db_filter)
282 0     assert isinstance(result, list)
283 0     result_ids = [item["_id"] for item in result]
284 0     assert len(result) == len(
285         expected_ids
286     ), "for db_filter={} result={} expected_ids={}".format(
287         db_filter, result, result_ids
288     )
289 0     assert result_ids == expected_ids
290 0     for i in range(len(result)):
291 0         assert result[i] in db_memory_with_many_data.db["test"]
292
293 0     assert len(db_memory_with_many_data.db) == 1
294 0     assert "test" in db_memory_with_many_data.db
295 0     assert len(db_memory_with_many_data.db["test"]) == 8
296 0     result = db_memory_with_many_data.count("test", db_filter)
297 0     assert result == len(expected_ids)
298
299
300 1 @pytest.mark.parametrize(
301     "table, db_filter, expected_data", [("test", {}, {"_id": 1, "data": 1})]
302 )
303 1 def test_get_one_with_multiple_results(
304     db_memory_with_data, table, db_filter, expected_data
305 ):
306 0     result = db_memory_with_data.get_one(table, db_filter, fail_on_more=False)
307 0     assert result == expected_data
308 0     assert len(db_memory_with_data.db) == 1
309 0     assert table in db_memory_with_data.db
310 0     assert len(db_memory_with_data.db[table]) == 3
311 0     assert result in db_memory_with_data.db[table]
312
313
314 1 def test_get_one_with_multiple_results_exception(db_memory_with_data):
315 0     table = "test"
316 0     db_filter = {}
317 0     with pytest.raises(DbException) as excinfo:
318 0         db_memory_with_data.get_one(table, db_filter)
319 0     assert str(excinfo.value) == (
320         empty_exception_message() + get_one_multiple_exception_message(db_filter)
321     )
322     # assert excinfo.value.http_code == http.HTTPStatus.CONFLICT
323
324
325 1 @pytest.mark.parametrize(
326     "table, db_filter",
327     [
328         ("test", {"_id": 4}),
329         ("test", {"data": 4}),
330         ("test", {"_id": 4, "data": 4}),
331         ("test_table", {"_id": 4}),
332         ("test_table", {"data": 4}),
333         ("test_table", {"_id": 4, "data": 4}),
334     ],
335 )
336 1 def test_get_one_with_non_empty_db_exception(db_memory_with_data, table, db_filter):
337 0     with pytest.raises(DbException) as excinfo:
338 0         db_memory_with_data.get_one(table, db_filter)
339 0     assert str(excinfo.value) == (
340         empty_exception_message() + get_one_exception_message(db_filter)
341     )
342 0     assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
343
344
345 1 @pytest.mark.parametrize(
346     "table, db_filter",
347     [
348         ("test", {"_id": 4}),
349         ("test", {"data": 4}),
350         ("test", {"_id": 4, "data": 4}),
351         ("test_table", {"_id": 4}),
352         ("test_table", {"data": 4}),
353         ("test_table", {"_id": 4, "data": 4}),
354     ],
355 )
356 1 def test_get_one_with_non_empty_db_none(db_memory_with_data, table, db_filter):
357 0     result = db_memory_with_data.get_one(table, db_filter, fail_on_empty=False)
358 0     assert result is None
359
360
361 1 @pytest.mark.parametrize(
362     "table, db_filter",
363     [
364         ("test", {"_id": 4}),
365         ("test", {"data": 4}),
366         ("test", {"_id": 4, "data": 4}),
367         ("test_table", {"_id": 4}),
368         ("test_table", {"data": 4}),
369         ("test_table", {"_id": 4, "data": 4}),
370     ],
371 )
372 1 def test_get_one_with_empty_db_exception(db_memory, table, db_filter):
373 0     with pytest.raises(DbException) as excinfo:
374 0         db_memory.get_one(table, db_filter)
375 0     assert str(excinfo.value) == (
376         empty_exception_message() + get_one_exception_message(db_filter)
377     )
378 0     assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
379
380
381 1 @pytest.mark.parametrize(
382     "table, db_filter",
383     [
384         ("test", {"_id": 4}),
385         ("test", {"data": 4}),
386         ("test", {"_id": 4, "data": 4}),
387         ("test_table", {"_id": 4}),
388         ("test_table", {"data": 4}),
389         ("test_table", {"_id": 4, "data": 4}),
390     ],
391 )
392 1 def test_get_one_with_empty_db_none(db_memory, table, db_filter):
393 0     result = db_memory.get_one(table, db_filter, fail_on_empty=False)
394 0     assert result is None
395
396
397 1 def test_get_one_generic_exception(db_memory_with_data):
398 0     table = "test"
399 0     db_filter = {}
400 0     db_memory_with_data._find = MagicMock(side_effect=Exception())
401 0     with pytest.raises(DbException) as excinfo:
402 0         db_memory_with_data.get_one(table, db_filter)
403 0     assert str(excinfo.value) == empty_exception_message()
404 0     assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
405
406
407 1 @pytest.mark.parametrize(
408     "table, db_filter, expected_data",
409     [
410         ("test", {}, []),
411         ("test", {"_id": 1}, [{"_id": 2, "data": 2}, {"_id": 3, "data": 3}]),
412         ("test", {"_id": 2}, [{"_id": 1, "data": 1}, {"_id": 3, "data": 3}]),
413         ("test", {"_id": 1, "data": 1}, [{"_id": 2, "data": 2}, {"_id": 3, "data": 3}]),
414         ("test", {"_id": 2, "data": 2}, [{"_id": 1, "data": 1}, {"_id": 3, "data": 3}]),
415     ],
416 )
417 1 def test_del_list_with_non_empty_db(
418     db_memory_with_data, table, db_filter, expected_data
419 ):
420 0     result = db_memory_with_data.del_list(table, db_filter)
421 0     assert result["deleted"] == (3 - len(expected_data))
422 0     assert len(db_memory_with_data.db) == 1
423 0     assert table in db_memory_with_data.db
424 0     assert len(db_memory_with_data.db[table]) == len(expected_data)
425 0     for data in expected_data:
426 0         assert data in db_memory_with_data.db[table]
427
428
429 1 @pytest.mark.parametrize(
430     "table, db_filter",
431     [
432         ("test", {}),
433         ("test", {"_id": 1}),
434         ("test", {"_id": 2}),
435         ("test", {"data": 1}),
436         ("test", {"data": 2}),
437         ("test", {"_id": 1, "data": 1}),
438         ("test", {"_id": 2, "data": 2}),
439     ],
440 )
441 1 def test_del_list_with_empty_db(db_memory, table, db_filter):
442 0     result = db_memory.del_list(table, db_filter)
443 0     assert result["deleted"] == 0
444
445
446 1 def test_del_list_generic_exception(db_memory_with_data):
447 0     table = "test"
448 0     db_filter = {}
449 0     db_memory_with_data._find = MagicMock(side_effect=Exception())
450 0     with pytest.raises(DbException) as excinfo:
451 0         db_memory_with_data.del_list(table, db_filter)
452 0     assert str(excinfo.value) == empty_exception_message()
453 0     assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
454
455
456 1 @pytest.mark.parametrize(
457     "table, db_filter, data",
458     [
459         ("test", {}, {"_id": 1, "data": 1}),
460         ("test", {"_id": 1}, {"_id": 1, "data": 1}),
461         ("test", {"data": 1}, {"_id": 1, "data": 1}),
462         ("test", {"_id": 1, "data": 1}, {"_id": 1, "data": 1}),
463         ("test", {"_id": 2}, {"_id": 2, "data": 2}),
464         ("test", {"data": 2}, {"_id": 2, "data": 2}),
465         ("test", {"_id": 2, "data": 2}, {"_id": 2, "data": 2}),
466     ],
467 )
468 1 def test_del_one(db_memory_with_data, table, db_filter, data):
469 0     result = db_memory_with_data.del_one(table, db_filter)
470 0     assert result == {"deleted": 1}
471 0     assert len(db_memory_with_data.db) == 1
472 0     assert table in db_memory_with_data.db
473 0     assert len(db_memory_with_data.db[table]) == 2
474 0     assert data not in db_memory_with_data.db[table]
475
476
477 1 @pytest.mark.parametrize(
478     "table, db_filter",
479     [
480         ("test", {}),
481         ("test", {"_id": 1}),
482         ("test", {"_id": 2}),
483         ("test", {"data": 1}),
484         ("test", {"data": 2}),
485         ("test", {"_id": 1, "data": 1}),
486         ("test", {"_id": 2, "data": 2}),
487         ("test_table", {}),
488         ("test_table", {"_id": 1}),
489         ("test_table", {"_id": 2}),
490         ("test_table", {"data": 1}),
491         ("test_table", {"data": 2}),
492         ("test_table", {"_id": 1, "data": 1}),
493         ("test_table", {"_id": 2, "data": 2}),
494     ],
495 )
496 1 def test_del_one_with_empty_db_exception(db_memory, table, db_filter):
497 0     with pytest.raises(DbException) as excinfo:
498 0         db_memory.del_one(table, db_filter)
499 0     assert str(excinfo.value) == (
500         empty_exception_message() + del_one_exception_message(db_filter)
501     )
502 0     assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
503
504
505 1 @pytest.mark.parametrize(
506     "table, db_filter",
507     [
508         ("test", {}),
509         ("test", {"_id": 1}),
510         ("test", {"_id": 2}),
511         ("test", {"data": 1}),
512         ("test", {"data": 2}),
513         ("test", {"_id": 1, "data": 1}),
514         ("test", {"_id": 2, "data": 2}),
515         ("test_table", {}),
516         ("test_table", {"_id": 1}),
517         ("test_table", {"_id": 2}),
518         ("test_table", {"data": 1}),
519         ("test_table", {"data": 2}),
520         ("test_table", {"_id": 1, "data": 1}),
521         ("test_table", {"_id": 2, "data": 2}),
522     ],
523 )
524 1 def test_del_one_with_empty_db_none(db_memory, table, db_filter):
525 0     result = db_memory.del_one(table, db_filter, fail_on_empty=False)
526 0     assert result is None
527
528
529 1 @pytest.mark.parametrize(
530     "table, db_filter",
531     [
532         ("test", {"_id": 4}),
533         ("test", {"_id": 5}),
534         ("test", {"data": 4}),
535         ("test", {"data": 5}),
536         ("test", {"_id": 1, "data": 2}),
537         ("test", {"_id": 2, "data": 3}),
538         ("test_table", {}),
539         ("test_table", {"_id": 1}),
540         ("test_table", {"_id": 2}),
541         ("test_table", {"data": 1}),
542         ("test_table", {"data": 2}),
543         ("test_table", {"_id": 1, "data": 1}),
544         ("test_table", {"_id": 2, "data": 2}),
545     ],
546 )
547 1 def test_del_one_with_non_empty_db_exception(db_memory_with_data, table, db_filter):
548 0     with pytest.raises(DbException) as excinfo:
549 0         db_memory_with_data.del_one(table, db_filter)
550 0     assert str(excinfo.value) == (
551         empty_exception_message() + del_one_exception_message(db_filter)
552     )
553 0     assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
554
555
556 1 @pytest.mark.parametrize(
557     "table, db_filter",
558     [
559         ("test", {"_id": 4}),
560         ("test", {"_id": 5}),
561         ("test", {"data": 4}),
562         ("test", {"data": 5}),
563         ("test", {"_id": 1, "data": 2}),
564         ("test", {"_id": 2, "data": 3}),
565         ("test_table", {}),
566         ("test_table", {"_id": 1}),
567         ("test_table", {"_id": 2}),
568         ("test_table", {"data": 1}),
569         ("test_table", {"data": 2}),
570         ("test_table", {"_id": 1, "data": 1}),
571         ("test_table", {"_id": 2, "data": 2}),
572     ],
573 )
574 1 def test_del_one_with_non_empty_db_none(db_memory_with_data, table, db_filter):
575 0     result = db_memory_with_data.del_one(table, db_filter, fail_on_empty=False)
576 0     assert result is None
577
578
579 1 @pytest.mark.parametrize("fail_on_empty", [(True), (False)])
580 1 def test_del_one_generic_exception(db_memory_with_data, fail_on_empty):
581 0     table = "test"
582 0     db_filter = {}
583 0     db_memory_with_data._find = MagicMock(side_effect=Exception())
584 0     with pytest.raises(DbException) as excinfo:
585 0         db_memory_with_data.del_one(table, db_filter, fail_on_empty=fail_on_empty)
586 0     assert str(excinfo.value) == empty_exception_message()
587 0     assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
588
589
590 1 @pytest.mark.parametrize(
591     "table, _id, indata",
592     [
593         ("test", 1, {"_id": 1, "data": 42}),
594         ("test", 1, {"_id": 1, "data": 42, "kk": 34}),
595         ("test", 1, {"_id": 1}),
596         ("test", 2, {"_id": 2, "data": 42}),
597         ("test", 2, {"_id": 2, "data": 42, "kk": 34}),
598         ("test", 2, {"_id": 2}),
599         ("test", 3, {"_id": 3, "data": 42}),
600         ("test", 3, {"_id": 3, "data": 42, "kk": 34}),
601         ("test", 3, {"_id": 3}),
602     ],
603 )
604 1 def test_replace(db_memory_with_data, table, _id, indata):
605 0     result = db_memory_with_data.replace(table, _id, indata)
606 0     assert result == {"updated": 1}
607 0     assert len(db_memory_with_data.db) == 1
608 0     assert table in db_memory_with_data.db
609 0     assert len(db_memory_with_data.db[table]) == 3
610 0     assert indata in db_memory_with_data.db[table]
611
612
613 1 @pytest.mark.parametrize(
614     "table, _id, indata",
615     [
616         ("test", 1, {"_id": 1, "data": 42}),
617         ("test", 2, {"_id": 2}),
618         ("test", 3, {"_id": 3}),
619     ],
620 )
621 1 def test_replace_without_data_exception(db_memory, table, _id, indata):
622 0     with pytest.raises(DbException) as excinfo:
623 0         db_memory.replace(table, _id, indata, fail_on_empty=True)
624 0     assert str(excinfo.value) == (replace_exception_message(_id))
625 0     assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
626
627
628 1 @pytest.mark.parametrize(
629     "table, _id, indata",
630     [
631         ("test", 1, {"_id": 1, "data": 42}),
632         ("test", 2, {"_id": 2}),
633         ("test", 3, {"_id": 3}),
634     ],
635 )
636 1 def test_replace_without_data_none(db_memory, table, _id, indata):
637 0     result = db_memory.replace(table, _id, indata, fail_on_empty=False)
638 0     assert result is None
639
640
641 1 @pytest.mark.parametrize(
642     "table, _id, indata",
643     [
644         ("test", 11, {"_id": 11, "data": 42}),
645         ("test", 12, {"_id": 12}),
646         ("test", 33, {"_id": 33}),
647     ],
648 )
649 1 def test_replace_with_data_exception(db_memory_with_data, table, _id, indata):
650 0     with pytest.raises(DbException) as excinfo:
651 0         db_memory_with_data.replace(table, _id, indata, fail_on_empty=True)
652 0     assert str(excinfo.value) == (replace_exception_message(_id))
653 0     assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
654
655
656 1 @pytest.mark.parametrize(
657     "table, _id, indata",
658     [
659         ("test", 11, {"_id": 11, "data": 42}),
660         ("test", 12, {"_id": 12}),
661         ("test", 33, {"_id": 33}),
662     ],
663 )
664 1 def test_replace_with_data_none(db_memory_with_data, table, _id, indata):
665 0     result = db_memory_with_data.replace(table, _id, indata, fail_on_empty=False)
666 0     assert result is None
667
668
669 1 @pytest.mark.parametrize("fail_on_empty", [True, False])
670 1 def test_replace_generic_exception(db_memory_with_data, fail_on_empty):
671 0     table = "test"
672 0     _id = {}
673 0     indata = {"_id": 1, "data": 1}
674 0     db_memory_with_data._find = MagicMock(side_effect=Exception())
675 0     with pytest.raises(DbException) as excinfo:
676 0         db_memory_with_data.replace(table, _id, indata, fail_on_empty=fail_on_empty)
677 0     assert str(excinfo.value) == empty_exception_message()
678 0     assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
679
680
681 1 @pytest.mark.parametrize(
682     "table, id, data",
683     [
684         ("test", "1", {"data": 1}),
685         ("test", "1", {"data": 2}),
686         ("test", "2", {"data": 1}),
687         ("test", "2", {"data": 2}),
688         ("test_table", "1", {"data": 1}),
689         ("test_table", "1", {"data": 2}),
690         ("test_table", "2", {"data": 1}),
691         ("test_table", "2", {"data": 2}),
692         ("test", "1", {"data_1": 1, "data_2": 2}),
693         ("test", "1", {"data_1": 2, "data_2": 1}),
694         ("test", "2", {"data_1": 1, "data_2": 2}),
695         ("test", "2", {"data_1": 2, "data_2": 1}),
696         ("test_table", "1", {"data_1": 1, "data_2": 2}),
697         ("test_table", "1", {"data_1": 2, "data_2": 1}),
698         ("test_table", "2", {"data_1": 1, "data_2": 2}),
699         ("test_table", "2", {"data_1": 2, "data_2": 1}),
700     ],
701 )
702 1 def test_create_with_empty_db_with_id(db_memory, table, id, data):
703 0     data_to_insert = data
704 0     data_to_insert["_id"] = id
705 0     returned_id = db_memory.create(table, data_to_insert)
706 0     assert returned_id == id
707 0     assert len(db_memory.db) == 1
708 0     assert table in db_memory.db
709 0     assert len(db_memory.db[table]) == 1
710 0     assert data_to_insert in db_memory.db[table]
711
712
713 1 @pytest.mark.parametrize(
714     "table, id, data",
715     [
716         ("test", "4", {"data": 1}),
717         ("test", "5", {"data": 2}),
718         ("test", "4", {"data": 1}),
719         ("test", "5", {"data": 2}),
720         ("test_table", "4", {"data": 1}),
721         ("test_table", "5", {"data": 2}),
722         ("test_table", "4", {"data": 1}),
723         ("test_table", "5", {"data": 2}),
724         ("test", "4", {"data_1": 1, "data_2": 2}),
725         ("test", "5", {"data_1": 2, "data_2": 1}),
726         ("test", "4", {"data_1": 1, "data_2": 2}),
727         ("test", "5", {"data_1": 2, "data_2": 1}),
728         ("test_table", "4", {"data_1": 1, "data_2": 2}),
729         ("test_table", "5", {"data_1": 2, "data_2": 1}),
730         ("test_table", "4", {"data_1": 1, "data_2": 2}),
731         ("test_table", "5", {"data_1": 2, "data_2": 1}),
732     ],
733 )
734 1 def test_create_with_non_empty_db_with_id(db_memory_with_data, table, id, data):
735 0     data_to_insert = data
736 0     data_to_insert["_id"] = id
737 0     returned_id = db_memory_with_data.create(table, data_to_insert)
738 0     assert returned_id == id
739 0     assert len(db_memory_with_data.db) == (1 if table == "test" else 2)
740 0     assert table in db_memory_with_data.db
741 0     assert len(db_memory_with_data.db[table]) == (4 if table == "test" else 1)
742 0     assert data_to_insert in db_memory_with_data.db[table]
743
744
745 1 @pytest.mark.parametrize(
746     "table, data",
747     [
748         ("test", {"data": 1}),
749         ("test", {"data": 2}),
750         ("test", {"data": 1}),
751         ("test", {"data": 2}),
752         ("test_table", {"data": 1}),
753         ("test_table", {"data": 2}),
754         ("test_table", {"data": 1}),
755         ("test_table", {"data": 2}),
756         ("test", {"data_1": 1, "data_2": 2}),
757         ("test", {"data_1": 2, "data_2": 1}),
758         ("test", {"data_1": 1, "data_2": 2}),
759         ("test", {"data_1": 2, "data_2": 1}),
760         ("test_table", {"data_1": 1, "data_2": 2}),
761         ("test_table", {"data_1": 2, "data_2": 1}),
762         ("test_table", {"data_1": 1, "data_2": 2}),
763         ("test_table", {"data_1": 2, "data_2": 1}),
764     ],
765 )
766 1 def test_create_with_empty_db_without_id(db_memory, table, data):
767 0     returned_id = db_memory.create(table, data)
768 0     assert len(db_memory.db) == 1
769 0     assert table in db_memory.db
770 0     assert len(db_memory.db[table]) == 1
771 0     data_inserted = data
772 0     data_inserted["_id"] = returned_id
773 0     assert data_inserted in db_memory.db[table]
774
775
776 1 @pytest.mark.parametrize(
777     "table, data",
778     [
779         ("test", {"data": 1}),
780         ("test", {"data": 2}),
781         ("test", {"data": 1}),
782         ("test", {"data": 2}),
783         ("test_table", {"data": 1}),
784         ("test_table", {"data": 2}),
785         ("test_table", {"data": 1}),
786         ("test_table", {"data": 2}),
787         ("test", {"data_1": 1, "data_2": 2}),
788         ("test", {"data_1": 2, "data_2": 1}),
789         ("test", {"data_1": 1, "data_2": 2}),
790         ("test", {"data_1": 2, "data_2": 1}),
791         ("test_table", {"data_1": 1, "data_2": 2}),
792         ("test_table", {"data_1": 2, "data_2": 1}),
793         ("test_table", {"data_1": 1, "data_2": 2}),
794         ("test_table", {"data_1": 2, "data_2": 1}),
795     ],
796 )
797 1 def test_create_with_non_empty_db_without_id(db_memory_with_data, table, data):
798 0     returned_id = db_memory_with_data.create(table, data)
799 0     assert len(db_memory_with_data.db) == (1 if table == "test" else 2)
800 0     assert table in db_memory_with_data.db
801 0     assert len(db_memory_with_data.db[table]) == (4 if table == "test" else 1)
802 0     data_inserted = data
803 0     data_inserted["_id"] = returned_id
804 0     assert data_inserted in db_memory_with_data.db[table]
805
806
807 1 def test_create_with_exception(db_memory):
808 0     table = "test"
809 0     data = {"_id": 1, "data": 1}
810 0     db_memory.db = MagicMock()
811 0     db_memory.db.__contains__.side_effect = Exception()
812 0     with pytest.raises(DbException) as excinfo:
813 0         db_memory.create(table, data)
814 0     assert str(excinfo.value) == empty_exception_message()
815 0     assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
816
817
818 1 @pytest.mark.parametrize(
819     "db_content, update_dict, expected, message",
820     [
821         (
822             {"a": {"none": None}},
823             {"a.b.num": "v"},
824             {"a": {"none": None, "b": {"num": "v"}}},
825             "create dict",
826         ),
827         (
828             {"a": {"none": None}},
829             {"a.none.num": "v"},
830             {"a": {"none": {"num": "v"}}},
831             "create dict over none",
832         ),
833         (
834             {"a": {"b": {"num": 4}}},
835             {"a.b.num": "v"},
836             {"a": {"b": {"num": "v"}}},
837             "replace_number",
838         ),
839         (
840             {"a": {"b": {"num": 4}}},
841             {"a.b.num.c.d": "v"},
842             None,
843             "create dict over number should fail",
844         ),
845         (
846             {"a": {"b": {"num": 4}}},
847             {"a.b": "v"},
848             {"a": {"b": "v"}},
849             "replace dict with a string",
850         ),
851         (
852             {"a": {"b": {"num": 4}}},
853             {"a.b": None},
854             {"a": {"b": None}},
855             "replace dict with None",
856         ),
857         (
858             {"a": [{"b": {"num": 4}}]},
859             {"a.b.num": "v"},
860             None,
861             "create dict over list should fail",
862         ),
863         (
864             {"a": [{"b": {"num": 4}}]},
865             {"a.0.b.num": "v"},
866             {"a": [{"b": {"num": "v"}}]},
867             "set list",
868         ),
869         (
870             {"a": [{"b": {"num": 4}}]},
871             {"a.3.b.num": "v"},
872             {"a": [{"b": {"num": 4}}, None, None, {"b": {"num": "v"}}]},
873             "expand list",
874         ),
875         ({"a": [[4]]}, {"a.0.0": "v"}, {"a": [["v"]]}, "set nested list"),
876         ({"a": [[4]]}, {"a.0.2": "v"}, {"a": [[4, None, "v"]]}, "expand nested list"),
877         (
878             {"a": [[4]]},
879             {"a.2.2": "v"},
880             {"a": [[4], None, {"2": "v"}]},
881             "expand list and add number key",
882         ),
883     ],
884 )
885 1 def test_set_one(db_memory, db_content, update_dict, expected, message):
886 0     db_memory._find = Mock(return_value=((0, db_content),))
887 0     if expected is None:
888 0         with pytest.raises(DbException) as excinfo:
889 0             db_memory.set_one("table", {}, update_dict)
890 0         assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND, message
891     else:
892 0         db_memory.set_one("table", {}, update_dict)
893 0         assert db_content == expected, message
894
895
896 1 class TestDbMemory(unittest.TestCase):
897     # TODO to delete. This is cover with pytest test_set_one.
898 1     def test_set_one(self):
899 1         test_set = (
900             # (database content, set-content, expected database content (None=fails), message)
901             (
902                 {"a": {"none": None}},
903                 {"a.b.num": "v"},
904                 {"a": {"none": None, "b": {"num": "v"}}},
905                 "create dict",
906             ),
907             (
908                 {"a": {"none": None}},
909                 {"a.none.num": "v"},
910                 {"a": {"none": {"num": "v"}}},
911                 "create dict over none",
912             ),
913             (
914                 {"a": {"b": {"num": 4}}},
915                 {"a.b.num": "v"},
916                 {"a": {"b": {"num": "v"}}},
917                 "replace_number",
918             ),
919             (
920                 {"a": {"b": {"num": 4}}},
921                 {"a.b.num.c.d": "v"},
922                 None,
923                 "create dict over number should fail",
924             ),
925             (
926                 {"a": {"b": {"num": 4}}},
927                 {"a.b": "v"},
928                 {"a": {"b": "v"}},
929                 "replace dict with a string",
930             ),
931             (
932                 {"a": {"b": {"num": 4}}},
933                 {"a.b": None},
934                 {"a": {"b": None}},
935                 "replace dict with None",
936             ),
937             (
938                 {"a": [{"b": {"num": 4}}]},
939                 {"a.b.num": "v"},
940                 None,
941                 "create dict over list should fail",
942             ),
943             (
944                 {"a": [{"b": {"num": 4}}]},
945                 {"a.0.b.num": "v"},
946                 {"a": [{"b": {"num": "v"}}]},
947                 "set list",
948             ),
949             (
950                 {"a": [{"b": {"num": 4}}]},
951                 {"a.3.b.num": "v"},
952                 {"a": [{"b": {"num": 4}}, None, None, {"b": {"num": "v"}}]},
953                 "expand list",
954             ),
955             ({"a": [[4]]}, {"a.0.0": "v"}, {"a": [["v"]]}, "set nested list"),
956             (
957                 {"a": [[4]]},
958                 {"a.0.2": "v"},
959                 {"a": [[4, None, "v"]]},
960                 "expand nested list",
961             ),
962             (
963                 {"a": [[4]]},
964                 {"a.2.2": "v"},
965                 {"a": [[4], None, {"2": "v"}]},
966                 "expand list and add number key",
967             ),
968             ({"a": None}, {"b.c": "v"}, {"a": None, "b": {"c": "v"}}, "expand at root"),
969         )
970 1         db_men = DbMemory()
971 1         db_men._find = Mock()
972 1         for db_content, update_dict, expected, message in test_set:
973 1             db_men._find.return_value = ((0, db_content),)
974 1             if expected is None:
975 1                 self.assertRaises(DbException, db_men.set_one, "table", {}, update_dict)
976             else:
977 1                 db_men.set_one("table", {}, update_dict)
978 1                 self.assertEqual(db_content, expected, message)
979
980 1     def test_set_one_pull(self):
981 1         example = {"a": [1, "1", 1], "d": {}, "n": None}
982 1         test_set = (
983             # (database content, set-content, expected database content (None=fails), message)
984             (example, {"a": "1"}, {"a": [1, 1], "d": {}, "n": None}, "pull one item"),
985             (example, {"a": 1}, {"a": ["1"], "d": {}, "n": None}, "pull two items"),
986             (example, {"a": "v"}, example, "pull non existing item"),
987             (example, {"a.6": 1}, example, "pull non existing arrray"),
988             (example, {"d.b.c": 1}, example, "pull non existing arrray2"),
989             (example, {"b": 1}, example, "pull non existing arrray3"),
990             (example, {"d": 1}, None, "pull over dict"),
991             (example, {"n": 1}, None, "pull over None"),
992         )
993 1         db_men = DbMemory()
994 1         db_men._find = Mock()
995 1         for db_content, pull_dict, expected, message in test_set:
996 1             db_content = deepcopy(db_content)
997 1             db_men._find.return_value = ((0, db_content),)
998 1             if expected is None:
999 1                 self.assertRaises(
1000                     DbException,
1001                     db_men.set_one,
1002                     "table",
1003                     {},
1004                     None,
1005                     fail_on_empty=False,
1006                     pull=pull_dict,
1007                 )
1008             else:
1009 1                 db_men.set_one("table", {}, None, pull=pull_dict)
1010 1                 self.assertEqual(db_content, expected, message)
1011
1012 1     def test_set_one_push(self):
1013 1         example = {"a": [1, "1", 1], "d": {}, "n": None}
1014 1         test_set = (
1015             # (database content, set-content, expected database content (None=fails), message)
1016             (
1017                 example,
1018                 {"d.b.c": 1},
1019                 {"a": [1, "1", 1], "d": {"b": {"c": [1]}}, "n": None},
1020                 "push non existing arrray2",
1021             ),
1022             (
1023                 example,
1024                 {"b": 1},
1025                 {"a": [1, "1", 1], "d": {}, "b": [1], "n": None},
1026                 "push non existing arrray3",
1027             ),
1028             (
1029                 example,
1030                 {"a.6": 1},
1031                 {"a": [1, "1", 1, None, None, None, [1]], "d": {}, "n": None},
1032                 "push non existing arrray",
1033             ),
1034             (
1035                 example,
1036                 {"a": 2},
1037                 {"a": [1, "1", 1, 2], "d": {}, "n": None},
1038                 "push one item",
1039             ),
1040             (
1041                 example,
1042                 {"a": {1: 1}},
1043                 {"a": [1, "1", 1, {1: 1}], "d": {}, "n": None},
1044                 "push a dict",
1045             ),
1046             (example, {"d": 1}, None, "push over dict"),
1047             (example, {"n": 1}, None, "push over None"),
1048         )
1049 1         db_men = DbMemory()
1050 1         db_men._find = Mock()
1051 1         for db_content, push_dict, expected, message in test_set:
1052 1             db_content = deepcopy(db_content)
1053 1             db_men._find.return_value = ((0, db_content),)
1054 1             if expected is None:
1055 1                 self.assertRaises(
1056                     DbException,
1057                     db_men.set_one,
1058                     "table",
1059                     {},
1060                     None,
1061                     fail_on_empty=False,
1062                     push=push_dict,
1063                 )
1064             else:
1065 1                 db_men.set_one("table", {}, None, push=push_dict)
1066 1                 self.assertEqual(db_content, expected, message)
1067
1068 1     def test_set_one_push_list(self):
1069 1         example = {"a": [1, "1", 1], "d": {}, "n": None}
1070 1         test_set = (
1071             # (database content, set-content, expected database content (None=fails), message)
1072             (
1073                 example,
1074                 {"d.b.c": [1]},
1075                 {"a": [1, "1", 1], "d": {"b": {"c": [1]}}, "n": None},
1076                 "push non existing arrray2",
1077             ),
1078             (
1079                 example,
1080                 {"b": [1]},
1081                 {"a": [1, "1", 1], "d": {}, "b": [1], "n": None},
1082                 "push non existing arrray3",
1083             ),
1084             (
1085                 example,
1086                 {"a.6": [1]},
1087                 {"a": [1, "1", 1, None, None, None, [1]], "d": {}, "n": None},
1088                 "push non existing arrray",
1089             ),
1090             (
1091                 example,
1092                 {"a": [2, 3]},
1093                 {"a": [1, "1", 1, 2, 3], "d": {}, "n": None},
1094                 "push two item",
1095             ),
1096             (
1097                 example,
1098                 {"a": [{1: 1}]},
1099                 {"a": [1, "1", 1, {1: 1}], "d": {}, "n": None},
1100                 "push a dict",
1101             ),
1102             (example, {"d": [1]}, None, "push over dict"),
1103             (example, {"n": [1]}, None, "push over None"),
1104             (example, {"a": 1}, None, "invalid push list non an array"),
1105         )
1106 1         db_men = DbMemory()
1107 1         db_men._find = Mock()
1108 1         for db_content, push_list, expected, message in test_set:
1109 1             db_content = deepcopy(db_content)
1110 1             db_men._find.return_value = ((0, db_content),)
1111 1             if expected is None:
1112 1                 self.assertRaises(
1113                     DbException,
1114                     db_men.set_one,
1115                     "table",
1116                     {},
1117                     None,
1118                     fail_on_empty=False,
1119                     push_list=push_list,
1120                 )
1121             else:
1122 1                 db_men.set_one("table", {}, None, push_list=push_list)
1123 1                 self.assertEqual(db_content, expected, message)
1124
1125 1     def test_unset_one(self):
1126 1         example = {"a": [1, "1", 1], "d": {}, "n": None}
1127 1         test_set = (
1128             # (database content, set-content, expected database content (None=fails), message)
1129             (example, {"d.b.c": 1}, example, "unset non existing"),
1130             (example, {"b": 1}, example, "unset non existing"),
1131             (example, {"a.6": 1}, example, "unset non existing arrray"),
1132             (example, {"a": 2}, {"d": {}, "n": None}, "unset array"),
1133             (example, {"d": 1}, {"a": [1, "1", 1], "n": None}, "unset dict"),
1134             (example, {"n": 1}, {"a": [1, "1", 1], "d": {}}, "unset None"),
1135         )
1136 1         db_men = DbMemory()
1137 1         db_men._find = Mock()
1138 1         for db_content, unset_dict, expected, message in test_set:
1139 1             db_content = deepcopy(db_content)
1140 1             db_men._find.return_value = ((0, db_content),)
1141 1             if expected is None:
1142 0                 self.assertRaises(
1143                     DbException,
1144                     db_men.set_one,
1145                     "table",
1146                     {},
1147                     None,
1148                     fail_on_empty=False,
1149                     unset=unset_dict,
1150                 )
1151             else:
1152 1                 db_men.set_one("table", {}, None, unset=unset_dict)
1153 1                 self.assertEqual(db_content, expected, message)