3c4552783fa5b67b2d2f739b28ce26496052861e
[osm/common.git] / osm_common / tests / test_dbmemory.py
1 # Copyright 2018 Whitestack, LLC
2 # Copyright 2018 Telefonica S.A.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
14 # under the License.
15 #
16 # For those usages not covered by the Apache License, Version 2.0 please
17 # contact: esousa@whitestack.com or alfonso.tiernosepulveda@telefonica.com
18 ##
19
20 import http
21 import logging
22 import pytest
23 import unittest
24 from unittest.mock import Mock
25
26 from unittest.mock import MagicMock
27 from osm_common.dbbase import DbException
28 from osm_common.dbmemory import DbMemory
29 from copy import deepcopy
30
31 __author__ = "Eduardo Sousa <eduardosousa@av.it.pt>"
32
33
34 @pytest.fixture(scope="function", params=[True, False])
35 def db_memory(request):
36 db = DbMemory(lock=request.param)
37 return db
38
39
40 @pytest.fixture(scope="function", params=[True, False])
41 def db_memory_with_data(request):
42 db = DbMemory(lock=request.param)
43
44 db.create("test", {"_id": 1, "data": 1})
45 db.create("test", {"_id": 2, "data": 2})
46 db.create("test", {"_id": 3, "data": 3})
47
48 return db
49
50
51 @pytest.fixture(scope="function")
52 def db_memory_with_many_data(request):
53 db = DbMemory(lock=False)
54
55 db.create_list(
56 "test",
57 [
58 {
59 "_id": 1,
60 "data": {"data2": {"data3": 1}},
61 "list": [{"a": 1}],
62 "text": "sometext",
63 },
64 {
65 "_id": 2,
66 "data": {"data2": {"data3": 2}},
67 "list": [{"a": 2}],
68 "list2": [1, 2, 3],
69 },
70 {"_id": 3, "data": {"data2": {"data3": 3}}, "list": [{"a": 3}]},
71 {"_id": 4, "data": {"data2": {"data3": 4}}, "list": [{"a": 4}, {"a": 0}]},
72 {"_id": 5, "data": {"data2": {"data3": 5}}, "list": [{"a": 5}]},
73 {"_id": 6, "data": {"data2": {"data3": 6}}, "list": [{"0": {"a": 1}}]},
74 {"_id": 7, "data": {"data2": {"data3": 7}}, "0": {"a": 0}},
75 {
76 "_id": 8,
77 "list": [
78 {"a": 3, "b": 0, "c": [{"a": 3, "b": 1}, {"a": 0, "b": "v"}]},
79 {"a": 0, "b": 1},
80 ],
81 },
82 ],
83 )
84 return db
85
86
87 def empty_exception_message():
88 return "database exception "
89
90
91 def get_one_exception_message(db_filter):
92 return "database exception Not found entry with filter='{}'".format(db_filter)
93
94
95 def get_one_multiple_exception_message(db_filter):
96 return "database exception Found more than one entry with filter='{}'".format(
97 db_filter
98 )
99
100
101 def del_one_exception_message(db_filter):
102 return "database exception Not found entry with filter='{}'".format(db_filter)
103
104
105 def replace_exception_message(value):
106 return "database exception Not found entry with _id='{}'".format(value)
107
108
109 def test_constructor():
110 db = DbMemory()
111 assert db.logger == logging.getLogger("db")
112 assert db.db == {}
113
114
115 def test_constructor_with_logger():
116 logger_name = "db_local"
117 db = DbMemory(logger_name=logger_name)
118 assert db.logger == logging.getLogger(logger_name)
119 assert db.db == {}
120
121
122 def test_db_connect():
123 logger_name = "db_local"
124 config = {"logger_name": logger_name}
125 db = DbMemory()
126 db.db_connect(config)
127 assert db.logger == logging.getLogger(logger_name)
128 assert db.db == {}
129
130
131 def test_db_disconnect(db_memory):
132 db_memory.db_disconnect()
133
134
135 @pytest.mark.parametrize(
136 "table, db_filter",
137 [
138 ("test", {}),
139 ("test", {"_id": 1}),
140 ("test", {"data": 1}),
141 ("test", {"_id": 1, "data": 1}),
142 ],
143 )
144 def test_get_list_with_empty_db(db_memory, table, db_filter):
145 result = db_memory.get_list(table, db_filter)
146 assert len(result) == 0
147
148
149 @pytest.mark.parametrize(
150 "table, db_filter, expected_data",
151 [
152 (
153 "test",
154 {},
155 [{"_id": 1, "data": 1}, {"_id": 2, "data": 2}, {"_id": 3, "data": 3}],
156 ),
157 ("test", {"_id": 1}, [{"_id": 1, "data": 1}]),
158 ("test", {"data": 1}, [{"_id": 1, "data": 1}]),
159 ("test", {"_id": 1, "data": 1}, [{"_id": 1, "data": 1}]),
160 ("test", {"_id": 2}, [{"_id": 2, "data": 2}]),
161 ("test", {"data": 2}, [{"_id": 2, "data": 2}]),
162 ("test", {"_id": 2, "data": 2}, [{"_id": 2, "data": 2}]),
163 ("test", {"_id": 4}, []),
164 ("test", {"data": 4}, []),
165 ("test", {"_id": 4, "data": 4}, []),
166 ("test_table", {}, []),
167 ("test_table", {"_id": 1}, []),
168 ("test_table", {"data": 1}, []),
169 ("test_table", {"_id": 1, "data": 1}, []),
170 ],
171 )
172 def test_get_list_with_non_empty_db(
173 db_memory_with_data, table, db_filter, expected_data
174 ):
175 result = db_memory_with_data.get_list(table, db_filter)
176 assert len(result) == len(expected_data)
177 for data in expected_data:
178 assert data in result
179
180
181 def test_get_list_exception(db_memory_with_data):
182 table = "test"
183 db_filter = {}
184 db_memory_with_data._find = MagicMock(side_effect=Exception())
185 with pytest.raises(DbException) as excinfo:
186 db_memory_with_data.get_list(table, db_filter)
187 assert str(excinfo.value) == empty_exception_message()
188 assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
189
190
191 @pytest.mark.parametrize(
192 "table, db_filter, expected_data",
193 [
194 ("test", {"_id": 1}, {"_id": 1, "data": 1}),
195 ("test", {"_id": 2}, {"_id": 2, "data": 2}),
196 ("test", {"_id": 3}, {"_id": 3, "data": 3}),
197 ("test", {"data": 1}, {"_id": 1, "data": 1}),
198 ("test", {"data": 2}, {"_id": 2, "data": 2}),
199 ("test", {"data": 3}, {"_id": 3, "data": 3}),
200 ("test", {"_id": 1, "data": 1}, {"_id": 1, "data": 1}),
201 ("test", {"_id": 2, "data": 2}, {"_id": 2, "data": 2}),
202 ("test", {"_id": 3, "data": 3}, {"_id": 3, "data": 3}),
203 ],
204 )
205 def test_get_one(db_memory_with_data, table, db_filter, expected_data):
206 result = db_memory_with_data.get_one(table, db_filter)
207 assert result == expected_data
208 assert len(db_memory_with_data.db) == 1
209 assert table in db_memory_with_data.db
210 assert len(db_memory_with_data.db[table]) == 3
211 assert result in db_memory_with_data.db[table]
212
213
214 @pytest.mark.parametrize(
215 "db_filter, expected_ids",
216 [
217 ({}, [1, 2, 3, 4, 5, 6, 7, 8]),
218 ({"_id": 1}, [1]),
219 ({"data.data2.data3": 2}, [2]),
220 ({"data.data2.data3.eq": 2}, [2]),
221 ({"data.data2.data3": [2]}, [2]),
222 ({"data.data2.data3.cont": [2]}, [2]),
223 ({"data.data2.data3.neq": 2}, [1, 3, 4, 5, 6, 7, 8]),
224 ({"data.data2.data3.neq": [2]}, [1, 3, 4, 5, 6, 7, 8]),
225 ({"data.data2.data3.ncont": [2]}, [1, 3, 4, 5, 6, 7, 8]),
226 ({"data.data2.data3": [2, 3]}, [2, 3]),
227 ({"data.data2.data3.gt": 4}, [5, 6, 7]),
228 ({"data.data2.data3.gte": 4}, [4, 5, 6, 7]),
229 ({"data.data2.data3.lt": 4}, [1, 2, 3]),
230 ({"data.data2.data3.lte": 4}, [1, 2, 3, 4]),
231 ({"data.data2.data3.lte": 4.5}, [1, 2, 3, 4]),
232 ({"data.data2.data3.gt": "text"}, []),
233 ({"nonexist.nonexist": "4"}, []),
234 ({"nonexist.nonexist": None}, [1, 2, 3, 4, 5, 6, 7, 8]),
235 ({"nonexist.nonexist.neq": "4"}, [1, 2, 3, 4, 5, 6, 7, 8]),
236 ({"nonexist.nonexist.neq": None}, []),
237 ({"text.eq": "sometext"}, [1]),
238 ({"text.neq": "sometext"}, [2, 3, 4, 5, 6, 7, 8]),
239 ({"text.eq": "somet"}, []),
240 ({"text.gte": "a"}, [1]),
241 ({"text.gte": "somet"}, [1]),
242 ({"text.gte": "sometext"}, [1]),
243 ({"text.lt": "somet"}, []),
244 ({"data.data2.data3": 2, "data.data2.data4": None}, [2]),
245 ({"data.data2.data3": 2, "data.data2.data4": 5}, []),
246 ({"data.data2.data3": 4}, [4]),
247 ({"data.data2.data3": [3, 4, "e"]}, [3, 4]),
248 ({"data.data2.data3": None}, [8]),
249 ({"data.data2": "4"}, []),
250 ({"list.0.a": 1}, [1, 6]),
251 ({"list2": 1}, [2]),
252 ({"list2": [1, 5]}, [2]),
253 ({"list2": [1, 2]}, [2]),
254 ({"list2": [5, 7]}, []),
255 ({"list.ANYINDEX.a": 1}, [1]),
256 ({"list.a": 3, "list.b": 1}, [8]),
257 ({"list.ANYINDEX.a": 3, "list.ANYINDEX.b": 1}, []),
258 ({"list.ANYINDEX.a": 3, "list.ANYINDEX.c.a": 3}, [8]),
259 ({"list.ANYINDEX.a": 3, "list.ANYINDEX.b": 0}, [8]),
260 (
261 {
262 "list.ANYINDEX.a": 3,
263 "list.ANYINDEX.c.ANYINDEX.a": 0,
264 "list.ANYINDEX.c.ANYINDEX.b": "v",
265 },
266 [8],
267 ),
268 (
269 {
270 "list.ANYINDEX.a": 3,
271 "list.ANYINDEX.c.ANYINDEX.a": 0,
272 "list.ANYINDEX.c.ANYINDEX.b": 1,
273 },
274 [],
275 ),
276 ({"list.c.b": 1}, [8]),
277 ({"list.c.b": None}, [1, 2, 3, 4, 5, 6, 7]),
278 # ({"data.data2.data3": 4}, []),
279 # ({"data.data2.data3": 4}, []),
280 ],
281 )
282 def test_get_list(db_memory_with_many_data, db_filter, expected_ids):
283 result = db_memory_with_many_data.get_list("test", db_filter)
284 assert isinstance(result, list)
285 result_ids = [item["_id"] for item in result]
286 assert len(result) == len(
287 expected_ids
288 ), "for db_filter={} result={} expected_ids={}".format(
289 db_filter, result, result_ids
290 )
291 assert result_ids == expected_ids
292 for i in range(len(result)):
293 assert result[i] in db_memory_with_many_data.db["test"]
294
295 assert len(db_memory_with_many_data.db) == 1
296 assert "test" in db_memory_with_many_data.db
297 assert len(db_memory_with_many_data.db["test"]) == 8
298 result = db_memory_with_many_data.count("test", db_filter)
299 assert result == len(expected_ids)
300
301
302 @pytest.mark.parametrize(
303 "table, db_filter, expected_data", [("test", {}, {"_id": 1, "data": 1})]
304 )
305 def test_get_one_with_multiple_results(
306 db_memory_with_data, table, db_filter, expected_data
307 ):
308 result = db_memory_with_data.get_one(table, db_filter, fail_on_more=False)
309 assert result == expected_data
310 assert len(db_memory_with_data.db) == 1
311 assert table in db_memory_with_data.db
312 assert len(db_memory_with_data.db[table]) == 3
313 assert result in db_memory_with_data.db[table]
314
315
316 def test_get_one_with_multiple_results_exception(db_memory_with_data):
317 table = "test"
318 db_filter = {}
319 with pytest.raises(DbException) as excinfo:
320 db_memory_with_data.get_one(table, db_filter)
321 assert str(excinfo.value) == (
322 empty_exception_message() + get_one_multiple_exception_message(db_filter)
323 )
324 # assert excinfo.value.http_code == http.HTTPStatus.CONFLICT
325
326
327 @pytest.mark.parametrize(
328 "table, db_filter",
329 [
330 ("test", {"_id": 4}),
331 ("test", {"data": 4}),
332 ("test", {"_id": 4, "data": 4}),
333 ("test_table", {"_id": 4}),
334 ("test_table", {"data": 4}),
335 ("test_table", {"_id": 4, "data": 4}),
336 ],
337 )
338 def test_get_one_with_non_empty_db_exception(db_memory_with_data, table, db_filter):
339 with pytest.raises(DbException) as excinfo:
340 db_memory_with_data.get_one(table, db_filter)
341 assert str(excinfo.value) == (
342 empty_exception_message() + get_one_exception_message(db_filter)
343 )
344 assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
345
346
347 @pytest.mark.parametrize(
348 "table, db_filter",
349 [
350 ("test", {"_id": 4}),
351 ("test", {"data": 4}),
352 ("test", {"_id": 4, "data": 4}),
353 ("test_table", {"_id": 4}),
354 ("test_table", {"data": 4}),
355 ("test_table", {"_id": 4, "data": 4}),
356 ],
357 )
358 def test_get_one_with_non_empty_db_none(db_memory_with_data, table, db_filter):
359 result = db_memory_with_data.get_one(table, db_filter, fail_on_empty=False)
360 assert result is None
361
362
363 @pytest.mark.parametrize(
364 "table, db_filter",
365 [
366 ("test", {"_id": 4}),
367 ("test", {"data": 4}),
368 ("test", {"_id": 4, "data": 4}),
369 ("test_table", {"_id": 4}),
370 ("test_table", {"data": 4}),
371 ("test_table", {"_id": 4, "data": 4}),
372 ],
373 )
374 def test_get_one_with_empty_db_exception(db_memory, table, db_filter):
375 with pytest.raises(DbException) as excinfo:
376 db_memory.get_one(table, db_filter)
377 assert str(excinfo.value) == (
378 empty_exception_message() + get_one_exception_message(db_filter)
379 )
380 assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
381
382
383 @pytest.mark.parametrize(
384 "table, db_filter",
385 [
386 ("test", {"_id": 4}),
387 ("test", {"data": 4}),
388 ("test", {"_id": 4, "data": 4}),
389 ("test_table", {"_id": 4}),
390 ("test_table", {"data": 4}),
391 ("test_table", {"_id": 4, "data": 4}),
392 ],
393 )
394 def test_get_one_with_empty_db_none(db_memory, table, db_filter):
395 result = db_memory.get_one(table, db_filter, fail_on_empty=False)
396 assert result is None
397
398
399 def test_get_one_generic_exception(db_memory_with_data):
400 table = "test"
401 db_filter = {}
402 db_memory_with_data._find = MagicMock(side_effect=Exception())
403 with pytest.raises(DbException) as excinfo:
404 db_memory_with_data.get_one(table, db_filter)
405 assert str(excinfo.value) == empty_exception_message()
406 assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
407
408
409 @pytest.mark.parametrize(
410 "table, db_filter, expected_data",
411 [
412 ("test", {}, []),
413 ("test", {"_id": 1}, [{"_id": 2, "data": 2}, {"_id": 3, "data": 3}]),
414 ("test", {"_id": 2}, [{"_id": 1, "data": 1}, {"_id": 3, "data": 3}]),
415 ("test", {"_id": 1, "data": 1}, [{"_id": 2, "data": 2}, {"_id": 3, "data": 3}]),
416 ("test", {"_id": 2, "data": 2}, [{"_id": 1, "data": 1}, {"_id": 3, "data": 3}]),
417 ],
418 )
419 def test_del_list_with_non_empty_db(
420 db_memory_with_data, table, db_filter, expected_data
421 ):
422 result = db_memory_with_data.del_list(table, db_filter)
423 assert result["deleted"] == (3 - len(expected_data))
424 assert len(db_memory_with_data.db) == 1
425 assert table in db_memory_with_data.db
426 assert len(db_memory_with_data.db[table]) == len(expected_data)
427 for data in expected_data:
428 assert data in db_memory_with_data.db[table]
429
430
431 @pytest.mark.parametrize(
432 "table, db_filter",
433 [
434 ("test", {}),
435 ("test", {"_id": 1}),
436 ("test", {"_id": 2}),
437 ("test", {"data": 1}),
438 ("test", {"data": 2}),
439 ("test", {"_id": 1, "data": 1}),
440 ("test", {"_id": 2, "data": 2}),
441 ],
442 )
443 def test_del_list_with_empty_db(db_memory, table, db_filter):
444 result = db_memory.del_list(table, db_filter)
445 assert result["deleted"] == 0
446
447
448 def test_del_list_generic_exception(db_memory_with_data):
449 table = "test"
450 db_filter = {}
451 db_memory_with_data._find = MagicMock(side_effect=Exception())
452 with pytest.raises(DbException) as excinfo:
453 db_memory_with_data.del_list(table, db_filter)
454 assert str(excinfo.value) == empty_exception_message()
455 assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
456
457
458 @pytest.mark.parametrize(
459 "table, db_filter, data",
460 [
461 ("test", {}, {"_id": 1, "data": 1}),
462 ("test", {"_id": 1}, {"_id": 1, "data": 1}),
463 ("test", {"data": 1}, {"_id": 1, "data": 1}),
464 ("test", {"_id": 1, "data": 1}, {"_id": 1, "data": 1}),
465 ("test", {"_id": 2}, {"_id": 2, "data": 2}),
466 ("test", {"data": 2}, {"_id": 2, "data": 2}),
467 ("test", {"_id": 2, "data": 2}, {"_id": 2, "data": 2}),
468 ],
469 )
470 def test_del_one(db_memory_with_data, table, db_filter, data):
471 result = db_memory_with_data.del_one(table, db_filter)
472 assert result == {"deleted": 1}
473 assert len(db_memory_with_data.db) == 1
474 assert table in db_memory_with_data.db
475 assert len(db_memory_with_data.db[table]) == 2
476 assert data not in db_memory_with_data.db[table]
477
478
479 @pytest.mark.parametrize(
480 "table, db_filter",
481 [
482 ("test", {}),
483 ("test", {"_id": 1}),
484 ("test", {"_id": 2}),
485 ("test", {"data": 1}),
486 ("test", {"data": 2}),
487 ("test", {"_id": 1, "data": 1}),
488 ("test", {"_id": 2, "data": 2}),
489 ("test_table", {}),
490 ("test_table", {"_id": 1}),
491 ("test_table", {"_id": 2}),
492 ("test_table", {"data": 1}),
493 ("test_table", {"data": 2}),
494 ("test_table", {"_id": 1, "data": 1}),
495 ("test_table", {"_id": 2, "data": 2}),
496 ],
497 )
498 def test_del_one_with_empty_db_exception(db_memory, table, db_filter):
499 with pytest.raises(DbException) as excinfo:
500 db_memory.del_one(table, db_filter)
501 assert str(excinfo.value) == (
502 empty_exception_message() + del_one_exception_message(db_filter)
503 )
504 assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
505
506
507 @pytest.mark.parametrize(
508 "table, db_filter",
509 [
510 ("test", {}),
511 ("test", {"_id": 1}),
512 ("test", {"_id": 2}),
513 ("test", {"data": 1}),
514 ("test", {"data": 2}),
515 ("test", {"_id": 1, "data": 1}),
516 ("test", {"_id": 2, "data": 2}),
517 ("test_table", {}),
518 ("test_table", {"_id": 1}),
519 ("test_table", {"_id": 2}),
520 ("test_table", {"data": 1}),
521 ("test_table", {"data": 2}),
522 ("test_table", {"_id": 1, "data": 1}),
523 ("test_table", {"_id": 2, "data": 2}),
524 ],
525 )
526 def test_del_one_with_empty_db_none(db_memory, table, db_filter):
527 result = db_memory.del_one(table, db_filter, fail_on_empty=False)
528 assert result is None
529
530
531 @pytest.mark.parametrize(
532 "table, db_filter",
533 [
534 ("test", {"_id": 4}),
535 ("test", {"_id": 5}),
536 ("test", {"data": 4}),
537 ("test", {"data": 5}),
538 ("test", {"_id": 1, "data": 2}),
539 ("test", {"_id": 2, "data": 3}),
540 ("test_table", {}),
541 ("test_table", {"_id": 1}),
542 ("test_table", {"_id": 2}),
543 ("test_table", {"data": 1}),
544 ("test_table", {"data": 2}),
545 ("test_table", {"_id": 1, "data": 1}),
546 ("test_table", {"_id": 2, "data": 2}),
547 ],
548 )
549 def test_del_one_with_non_empty_db_exception(db_memory_with_data, table, db_filter):
550 with pytest.raises(DbException) as excinfo:
551 db_memory_with_data.del_one(table, db_filter)
552 assert str(excinfo.value) == (
553 empty_exception_message() + del_one_exception_message(db_filter)
554 )
555 assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
556
557
558 @pytest.mark.parametrize(
559 "table, db_filter",
560 [
561 ("test", {"_id": 4}),
562 ("test", {"_id": 5}),
563 ("test", {"data": 4}),
564 ("test", {"data": 5}),
565 ("test", {"_id": 1, "data": 2}),
566 ("test", {"_id": 2, "data": 3}),
567 ("test_table", {}),
568 ("test_table", {"_id": 1}),
569 ("test_table", {"_id": 2}),
570 ("test_table", {"data": 1}),
571 ("test_table", {"data": 2}),
572 ("test_table", {"_id": 1, "data": 1}),
573 ("test_table", {"_id": 2, "data": 2}),
574 ],
575 )
576 def test_del_one_with_non_empty_db_none(db_memory_with_data, table, db_filter):
577 result = db_memory_with_data.del_one(table, db_filter, fail_on_empty=False)
578 assert result is None
579
580
581 @pytest.mark.parametrize("fail_on_empty", [(True), (False)])
582 def test_del_one_generic_exception(db_memory_with_data, fail_on_empty):
583 table = "test"
584 db_filter = {}
585 db_memory_with_data._find = MagicMock(side_effect=Exception())
586 with pytest.raises(DbException) as excinfo:
587 db_memory_with_data.del_one(table, db_filter, fail_on_empty=fail_on_empty)
588 assert str(excinfo.value) == empty_exception_message()
589 assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
590
591
592 @pytest.mark.parametrize(
593 "table, _id, indata",
594 [
595 ("test", 1, {"_id": 1, "data": 42}),
596 ("test", 1, {"_id": 1, "data": 42, "kk": 34}),
597 ("test", 1, {"_id": 1}),
598 ("test", 2, {"_id": 2, "data": 42}),
599 ("test", 2, {"_id": 2, "data": 42, "kk": 34}),
600 ("test", 2, {"_id": 2}),
601 ("test", 3, {"_id": 3, "data": 42}),
602 ("test", 3, {"_id": 3, "data": 42, "kk": 34}),
603 ("test", 3, {"_id": 3}),
604 ],
605 )
606 def test_replace(db_memory_with_data, table, _id, indata):
607 result = db_memory_with_data.replace(table, _id, indata)
608 assert result == {"updated": 1}
609 assert len(db_memory_with_data.db) == 1
610 assert table in db_memory_with_data.db
611 assert len(db_memory_with_data.db[table]) == 3
612 assert indata in db_memory_with_data.db[table]
613
614
615 @pytest.mark.parametrize(
616 "table, _id, indata",
617 [
618 ("test", 1, {"_id": 1, "data": 42}),
619 ("test", 2, {"_id": 2}),
620 ("test", 3, {"_id": 3}),
621 ],
622 )
623 def test_replace_without_data_exception(db_memory, table, _id, indata):
624 with pytest.raises(DbException) as excinfo:
625 db_memory.replace(table, _id, indata, fail_on_empty=True)
626 assert str(excinfo.value) == (replace_exception_message(_id))
627 assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
628
629
630 @pytest.mark.parametrize(
631 "table, _id, indata",
632 [
633 ("test", 1, {"_id": 1, "data": 42}),
634 ("test", 2, {"_id": 2}),
635 ("test", 3, {"_id": 3}),
636 ],
637 )
638 def test_replace_without_data_none(db_memory, table, _id, indata):
639 result = db_memory.replace(table, _id, indata, fail_on_empty=False)
640 assert result is None
641
642
643 @pytest.mark.parametrize(
644 "table, _id, indata",
645 [
646 ("test", 11, {"_id": 11, "data": 42}),
647 ("test", 12, {"_id": 12}),
648 ("test", 33, {"_id": 33}),
649 ],
650 )
651 def test_replace_with_data_exception(db_memory_with_data, table, _id, indata):
652 with pytest.raises(DbException) as excinfo:
653 db_memory_with_data.replace(table, _id, indata, fail_on_empty=True)
654 assert str(excinfo.value) == (replace_exception_message(_id))
655 assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
656
657
658 @pytest.mark.parametrize(
659 "table, _id, indata",
660 [
661 ("test", 11, {"_id": 11, "data": 42}),
662 ("test", 12, {"_id": 12}),
663 ("test", 33, {"_id": 33}),
664 ],
665 )
666 def test_replace_with_data_none(db_memory_with_data, table, _id, indata):
667 result = db_memory_with_data.replace(table, _id, indata, fail_on_empty=False)
668 assert result is None
669
670
671 @pytest.mark.parametrize("fail_on_empty", [True, False])
672 def test_replace_generic_exception(db_memory_with_data, fail_on_empty):
673 table = "test"
674 _id = {}
675 indata = {"_id": 1, "data": 1}
676 db_memory_with_data._find = MagicMock(side_effect=Exception())
677 with pytest.raises(DbException) as excinfo:
678 db_memory_with_data.replace(table, _id, indata, fail_on_empty=fail_on_empty)
679 assert str(excinfo.value) == empty_exception_message()
680 assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
681
682
683 @pytest.mark.parametrize(
684 "table, id, data",
685 [
686 ("test", "1", {"data": 1}),
687 ("test", "1", {"data": 2}),
688 ("test", "2", {"data": 1}),
689 ("test", "2", {"data": 2}),
690 ("test_table", "1", {"data": 1}),
691 ("test_table", "1", {"data": 2}),
692 ("test_table", "2", {"data": 1}),
693 ("test_table", "2", {"data": 2}),
694 ("test", "1", {"data_1": 1, "data_2": 2}),
695 ("test", "1", {"data_1": 2, "data_2": 1}),
696 ("test", "2", {"data_1": 1, "data_2": 2}),
697 ("test", "2", {"data_1": 2, "data_2": 1}),
698 ("test_table", "1", {"data_1": 1, "data_2": 2}),
699 ("test_table", "1", {"data_1": 2, "data_2": 1}),
700 ("test_table", "2", {"data_1": 1, "data_2": 2}),
701 ("test_table", "2", {"data_1": 2, "data_2": 1}),
702 ],
703 )
704 def test_create_with_empty_db_with_id(db_memory, table, id, data):
705 data_to_insert = data
706 data_to_insert["_id"] = id
707 returned_id = db_memory.create(table, data_to_insert)
708 assert returned_id == id
709 assert len(db_memory.db) == 1
710 assert table in db_memory.db
711 assert len(db_memory.db[table]) == 1
712 assert data_to_insert in db_memory.db[table]
713
714
715 @pytest.mark.parametrize(
716 "table, id, data",
717 [
718 ("test", "4", {"data": 1}),
719 ("test", "5", {"data": 2}),
720 ("test", "4", {"data": 1}),
721 ("test", "5", {"data": 2}),
722 ("test_table", "4", {"data": 1}),
723 ("test_table", "5", {"data": 2}),
724 ("test_table", "4", {"data": 1}),
725 ("test_table", "5", {"data": 2}),
726 ("test", "4", {"data_1": 1, "data_2": 2}),
727 ("test", "5", {"data_1": 2, "data_2": 1}),
728 ("test", "4", {"data_1": 1, "data_2": 2}),
729 ("test", "5", {"data_1": 2, "data_2": 1}),
730 ("test_table", "4", {"data_1": 1, "data_2": 2}),
731 ("test_table", "5", {"data_1": 2, "data_2": 1}),
732 ("test_table", "4", {"data_1": 1, "data_2": 2}),
733 ("test_table", "5", {"data_1": 2, "data_2": 1}),
734 ],
735 )
736 def test_create_with_non_empty_db_with_id(db_memory_with_data, table, id, data):
737 data_to_insert = data
738 data_to_insert["_id"] = id
739 returned_id = db_memory_with_data.create(table, data_to_insert)
740 assert returned_id == id
741 assert len(db_memory_with_data.db) == (1 if table == "test" else 2)
742 assert table in db_memory_with_data.db
743 assert len(db_memory_with_data.db[table]) == (4 if table == "test" else 1)
744 assert data_to_insert in db_memory_with_data.db[table]
745
746
747 @pytest.mark.parametrize(
748 "table, data",
749 [
750 ("test", {"data": 1}),
751 ("test", {"data": 2}),
752 ("test", {"data": 1}),
753 ("test", {"data": 2}),
754 ("test_table", {"data": 1}),
755 ("test_table", {"data": 2}),
756 ("test_table", {"data": 1}),
757 ("test_table", {"data": 2}),
758 ("test", {"data_1": 1, "data_2": 2}),
759 ("test", {"data_1": 2, "data_2": 1}),
760 ("test", {"data_1": 1, "data_2": 2}),
761 ("test", {"data_1": 2, "data_2": 1}),
762 ("test_table", {"data_1": 1, "data_2": 2}),
763 ("test_table", {"data_1": 2, "data_2": 1}),
764 ("test_table", {"data_1": 1, "data_2": 2}),
765 ("test_table", {"data_1": 2, "data_2": 1}),
766 ],
767 )
768 def test_create_with_empty_db_without_id(db_memory, table, data):
769 returned_id = db_memory.create(table, data)
770 assert len(db_memory.db) == 1
771 assert table in db_memory.db
772 assert len(db_memory.db[table]) == 1
773 data_inserted = data
774 data_inserted["_id"] = returned_id
775 assert data_inserted in db_memory.db[table]
776
777
778 @pytest.mark.parametrize(
779 "table, data",
780 [
781 ("test", {"data": 1}),
782 ("test", {"data": 2}),
783 ("test", {"data": 1}),
784 ("test", {"data": 2}),
785 ("test_table", {"data": 1}),
786 ("test_table", {"data": 2}),
787 ("test_table", {"data": 1}),
788 ("test_table", {"data": 2}),
789 ("test", {"data_1": 1, "data_2": 2}),
790 ("test", {"data_1": 2, "data_2": 1}),
791 ("test", {"data_1": 1, "data_2": 2}),
792 ("test", {"data_1": 2, "data_2": 1}),
793 ("test_table", {"data_1": 1, "data_2": 2}),
794 ("test_table", {"data_1": 2, "data_2": 1}),
795 ("test_table", {"data_1": 1, "data_2": 2}),
796 ("test_table", {"data_1": 2, "data_2": 1}),
797 ],
798 )
799 def test_create_with_non_empty_db_without_id(db_memory_with_data, table, data):
800 returned_id = db_memory_with_data.create(table, data)
801 assert len(db_memory_with_data.db) == (1 if table == "test" else 2)
802 assert table in db_memory_with_data.db
803 assert len(db_memory_with_data.db[table]) == (4 if table == "test" else 1)
804 data_inserted = data
805 data_inserted["_id"] = returned_id
806 assert data_inserted in db_memory_with_data.db[table]
807
808
809 def test_create_with_exception(db_memory):
810 table = "test"
811 data = {"_id": 1, "data": 1}
812 db_memory.db = MagicMock()
813 db_memory.db.__contains__.side_effect = Exception()
814 with pytest.raises(DbException) as excinfo:
815 db_memory.create(table, data)
816 assert str(excinfo.value) == empty_exception_message()
817 assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
818
819
820 @pytest.mark.parametrize(
821 "db_content, update_dict, expected, message",
822 [
823 (
824 {"a": {"none": None}},
825 {"a.b.num": "v"},
826 {"a": {"none": None, "b": {"num": "v"}}},
827 "create dict",
828 ),
829 (
830 {"a": {"none": None}},
831 {"a.none.num": "v"},
832 {"a": {"none": {"num": "v"}}},
833 "create dict over none",
834 ),
835 (
836 {"a": {"b": {"num": 4}}},
837 {"a.b.num": "v"},
838 {"a": {"b": {"num": "v"}}},
839 "replace_number",
840 ),
841 (
842 {"a": {"b": {"num": 4}}},
843 {"a.b.num.c.d": "v"},
844 None,
845 "create dict over number should fail",
846 ),
847 (
848 {"a": {"b": {"num": 4}}},
849 {"a.b": "v"},
850 {"a": {"b": "v"}},
851 "replace dict with a string",
852 ),
853 (
854 {"a": {"b": {"num": 4}}},
855 {"a.b": None},
856 {"a": {"b": None}},
857 "replace dict with None",
858 ),
859 (
860 {"a": [{"b": {"num": 4}}]},
861 {"a.b.num": "v"},
862 None,
863 "create dict over list should fail",
864 ),
865 (
866 {"a": [{"b": {"num": 4}}]},
867 {"a.0.b.num": "v"},
868 {"a": [{"b": {"num": "v"}}]},
869 "set list",
870 ),
871 (
872 {"a": [{"b": {"num": 4}}]},
873 {"a.3.b.num": "v"},
874 {"a": [{"b": {"num": 4}}, None, None, {"b": {"num": "v"}}]},
875 "expand list",
876 ),
877 ({"a": [[4]]}, {"a.0.0": "v"}, {"a": [["v"]]}, "set nested list"),
878 ({"a": [[4]]}, {"a.0.2": "v"}, {"a": [[4, None, "v"]]}, "expand nested list"),
879 (
880 {"a": [[4]]},
881 {"a.2.2": "v"},
882 {"a": [[4], None, {"2": "v"}]},
883 "expand list and add number key",
884 ),
885 ],
886 )
887 def test_set_one(db_memory, db_content, update_dict, expected, message):
888 db_memory._find = Mock(return_value=((0, db_content),))
889 if expected is None:
890 with pytest.raises(DbException) as excinfo:
891 db_memory.set_one("table", {}, update_dict)
892 assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND, message
893 else:
894 db_memory.set_one("table", {}, update_dict)
895 assert db_content == expected, message
896
897
898 class TestDbMemory(unittest.TestCase):
899 # TODO to delete. This is cover with pytest test_set_one.
900 def test_set_one(self):
901 test_set = (
902 # (database content, set-content, expected database content (None=fails), message)
903 (
904 {"a": {"none": None}},
905 {"a.b.num": "v"},
906 {"a": {"none": None, "b": {"num": "v"}}},
907 "create dict",
908 ),
909 (
910 {"a": {"none": None}},
911 {"a.none.num": "v"},
912 {"a": {"none": {"num": "v"}}},
913 "create dict over none",
914 ),
915 (
916 {"a": {"b": {"num": 4}}},
917 {"a.b.num": "v"},
918 {"a": {"b": {"num": "v"}}},
919 "replace_number",
920 ),
921 (
922 {"a": {"b": {"num": 4}}},
923 {"a.b.num.c.d": "v"},
924 None,
925 "create dict over number should fail",
926 ),
927 (
928 {"a": {"b": {"num": 4}}},
929 {"a.b": "v"},
930 {"a": {"b": "v"}},
931 "replace dict with a string",
932 ),
933 (
934 {"a": {"b": {"num": 4}}},
935 {"a.b": None},
936 {"a": {"b": None}},
937 "replace dict with None",
938 ),
939 (
940 {"a": [{"b": {"num": 4}}]},
941 {"a.b.num": "v"},
942 None,
943 "create dict over list should fail",
944 ),
945 (
946 {"a": [{"b": {"num": 4}}]},
947 {"a.0.b.num": "v"},
948 {"a": [{"b": {"num": "v"}}]},
949 "set list",
950 ),
951 (
952 {"a": [{"b": {"num": 4}}]},
953 {"a.3.b.num": "v"},
954 {"a": [{"b": {"num": 4}}, None, None, {"b": {"num": "v"}}]},
955 "expand list",
956 ),
957 ({"a": [[4]]}, {"a.0.0": "v"}, {"a": [["v"]]}, "set nested list"),
958 (
959 {"a": [[4]]},
960 {"a.0.2": "v"},
961 {"a": [[4, None, "v"]]},
962 "expand nested list",
963 ),
964 (
965 {"a": [[4]]},
966 {"a.2.2": "v"},
967 {"a": [[4], None, {"2": "v"}]},
968 "expand list and add number key",
969 ),
970 ({"a": None}, {"b.c": "v"}, {"a": None, "b": {"c": "v"}}, "expand at root"),
971 )
972 db_men = DbMemory()
973 db_men._find = Mock()
974 for db_content, update_dict, expected, message in test_set:
975 db_men._find.return_value = ((0, db_content),)
976 if expected is None:
977 self.assertRaises(DbException, db_men.set_one, "table", {}, update_dict)
978 else:
979 db_men.set_one("table", {}, update_dict)
980 self.assertEqual(db_content, expected, message)
981
982 def test_set_one_pull(self):
983 example = {"a": [1, "1", 1], "d": {}, "n": None}
984 test_set = (
985 # (database content, set-content, expected database content (None=fails), message)
986 (example, {"a": "1"}, {"a": [1, 1], "d": {}, "n": None}, "pull one item"),
987 (example, {"a": 1}, {"a": ["1"], "d": {}, "n": None}, "pull two items"),
988 (example, {"a": "v"}, example, "pull non existing item"),
989 (example, {"a.6": 1}, example, "pull non existing arrray"),
990 (example, {"d.b.c": 1}, example, "pull non existing arrray2"),
991 (example, {"b": 1}, example, "pull non existing arrray3"),
992 (example, {"d": 1}, None, "pull over dict"),
993 (example, {"n": 1}, None, "pull over None"),
994 )
995 db_men = DbMemory()
996 db_men._find = Mock()
997 for db_content, pull_dict, expected, message in test_set:
998 db_content = deepcopy(db_content)
999 db_men._find.return_value = ((0, db_content),)
1000 if expected is None:
1001 self.assertRaises(
1002 DbException,
1003 db_men.set_one,
1004 "table",
1005 {},
1006 None,
1007 fail_on_empty=False,
1008 pull=pull_dict,
1009 )
1010 else:
1011 db_men.set_one("table", {}, None, pull=pull_dict)
1012 self.assertEqual(db_content, expected, message)
1013
1014 def test_set_one_push(self):
1015 example = {"a": [1, "1", 1], "d": {}, "n": None}
1016 test_set = (
1017 # (database content, set-content, expected database content (None=fails), message)
1018 (
1019 example,
1020 {"d.b.c": 1},
1021 {"a": [1, "1", 1], "d": {"b": {"c": [1]}}, "n": None},
1022 "push non existing arrray2",
1023 ),
1024 (
1025 example,
1026 {"b": 1},
1027 {"a": [1, "1", 1], "d": {}, "b": [1], "n": None},
1028 "push non existing arrray3",
1029 ),
1030 (
1031 example,
1032 {"a.6": 1},
1033 {"a": [1, "1", 1, None, None, None, [1]], "d": {}, "n": None},
1034 "push non existing arrray",
1035 ),
1036 (
1037 example,
1038 {"a": 2},
1039 {"a": [1, "1", 1, 2], "d": {}, "n": None},
1040 "push one item",
1041 ),
1042 (
1043 example,
1044 {"a": {1: 1}},
1045 {"a": [1, "1", 1, {1: 1}], "d": {}, "n": None},
1046 "push a dict",
1047 ),
1048 (example, {"d": 1}, None, "push over dict"),
1049 (example, {"n": 1}, None, "push over None"),
1050 )
1051 db_men = DbMemory()
1052 db_men._find = Mock()
1053 for db_content, push_dict, expected, message in test_set:
1054 db_content = deepcopy(db_content)
1055 db_men._find.return_value = ((0, db_content),)
1056 if expected is None:
1057 self.assertRaises(
1058 DbException,
1059 db_men.set_one,
1060 "table",
1061 {},
1062 None,
1063 fail_on_empty=False,
1064 push=push_dict,
1065 )
1066 else:
1067 db_men.set_one("table", {}, None, push=push_dict)
1068 self.assertEqual(db_content, expected, message)
1069
1070 def test_set_one_push_list(self):
1071 example = {"a": [1, "1", 1], "d": {}, "n": None}
1072 test_set = (
1073 # (database content, set-content, expected database content (None=fails), message)
1074 (
1075 example,
1076 {"d.b.c": [1]},
1077 {"a": [1, "1", 1], "d": {"b": {"c": [1]}}, "n": None},
1078 "push non existing arrray2",
1079 ),
1080 (
1081 example,
1082 {"b": [1]},
1083 {"a": [1, "1", 1], "d": {}, "b": [1], "n": None},
1084 "push non existing arrray3",
1085 ),
1086 (
1087 example,
1088 {"a.6": [1]},
1089 {"a": [1, "1", 1, None, None, None, [1]], "d": {}, "n": None},
1090 "push non existing arrray",
1091 ),
1092 (
1093 example,
1094 {"a": [2, 3]},
1095 {"a": [1, "1", 1, 2, 3], "d": {}, "n": None},
1096 "push two item",
1097 ),
1098 (
1099 example,
1100 {"a": [{1: 1}]},
1101 {"a": [1, "1", 1, {1: 1}], "d": {}, "n": None},
1102 "push a dict",
1103 ),
1104 (example, {"d": [1]}, None, "push over dict"),
1105 (example, {"n": [1]}, None, "push over None"),
1106 (example, {"a": 1}, None, "invalid push list non an array"),
1107 )
1108 db_men = DbMemory()
1109 db_men._find = Mock()
1110 for db_content, push_list, expected, message in test_set:
1111 db_content = deepcopy(db_content)
1112 db_men._find.return_value = ((0, db_content),)
1113 if expected is None:
1114 self.assertRaises(
1115 DbException,
1116 db_men.set_one,
1117 "table",
1118 {},
1119 None,
1120 fail_on_empty=False,
1121 push_list=push_list,
1122 )
1123 else:
1124 db_men.set_one("table", {}, None, push_list=push_list)
1125 self.assertEqual(db_content, expected, message)
1126
1127 def test_unset_one(self):
1128 example = {"a": [1, "1", 1], "d": {}, "n": None}
1129 test_set = (
1130 # (database content, set-content, expected database content (None=fails), message)
1131 (example, {"d.b.c": 1}, example, "unset non existing"),
1132 (example, {"b": 1}, example, "unset non existing"),
1133 (example, {"a.6": 1}, example, "unset non existing arrray"),
1134 (example, {"a": 2}, {"d": {}, "n": None}, "unset array"),
1135 (example, {"d": 1}, {"a": [1, "1", 1], "n": None}, "unset dict"),
1136 (example, {"n": 1}, {"a": [1, "1", 1], "d": {}}, "unset None"),
1137 )
1138 db_men = DbMemory()
1139 db_men._find = Mock()
1140 for db_content, unset_dict, expected, message in test_set:
1141 db_content = deepcopy(db_content)
1142 db_men._find.return_value = ((0, db_content),)
1143 if expected is None:
1144 self.assertRaises(
1145 DbException,
1146 db_men.set_one,
1147 "table",
1148 {},
1149 None,
1150 fail_on_empty=False,
1151 unset=unset_dict,
1152 )
1153 else:
1154 db_men.set_one("table", {}, None, unset=unset_dict)
1155 self.assertEqual(db_content, expected, message)