Adding release notes and enabling import order check
[osm/common.git] / osm_common / tests / test_dbmemory.py
1 # Copyright 2018 Whitestack, LLC
2 # Copyright 2018 Telefonica S.A.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
14 # under the License.
15 #
16 # For those usages not covered by the Apache License, Version 2.0 please
17 # contact: esousa@whitestack.com or alfonso.tiernosepulveda@telefonica.com
18 ##
19 from copy import deepcopy
20 import http
21 import logging
22 import unittest
23 from unittest.mock import MagicMock, Mock
24
25 from osm_common.dbbase import DbException
26 from osm_common.dbmemory import DbMemory
27 import pytest
28
29 __author__ = "Eduardo Sousa <eduardosousa@av.it.pt>"
30
31
@pytest.fixture(params=[True, False], scope="function")
def db_memory(request):
    """Provide an empty ``DbMemory``, built once per ``lock`` value (True, False)."""
    return DbMemory(lock=request.param)
36
37
@pytest.fixture(params=[True, False], scope="function")
def db_memory_with_data(request):
    """Provide a ``DbMemory`` whose "test" table holds three entries.

    The entries are ``{"_id": n, "data": n}`` for n = 1, 2, 3. The fixture is
    built once per ``lock`` value (True, False).
    """
    database = DbMemory(lock=request.param)
    for number in (1, 2, 3):
        database.create("test", {"_id": number, "data": number})
    return database
47
48
@pytest.fixture(scope="function")
def db_memory_with_many_data(request):
    """Provide an unlocked ``DbMemory`` with eight nested entries in "test".

    The entries mix nested dicts, lists of dicts, plain lists and numeric-like
    keys so that the filter tests can exercise dotted-path lookups.
    """
    records = [
        {
            "_id": 1,
            "data": {"data2": {"data3": 1}},
            "list": [{"a": 1}],
            "text": "sometext",
        },
        {
            "_id": 2,
            "data": {"data2": {"data3": 2}},
            "list": [{"a": 2}],
            "list2": [1, 2, 3],
        },
        {"_id": 3, "data": {"data2": {"data3": 3}}, "list": [{"a": 3}]},
        {"_id": 4, "data": {"data2": {"data3": 4}}, "list": [{"a": 4}, {"a": 0}]},
        {"_id": 5, "data": {"data2": {"data3": 5}}, "list": [{"a": 5}]},
        {"_id": 6, "data": {"data2": {"data3": 6}}, "list": [{"0": {"a": 1}}]},
        {"_id": 7, "data": {"data2": {"data3": 7}}, "0": {"a": 0}},
        {
            "_id": 8,
            "list": [
                {"a": 3, "b": 0, "c": [{"a": 3, "b": 1}, {"a": 0, "b": "v"}]},
                {"a": 0, "b": 1},
            ],
        },
    ]
    database = DbMemory(lock=False)
    database.create_list("test", records)
    return database
83
84
def empty_exception_message():
    """Return the bare prefix used by the tests for database exception texts."""
    prefix = "database exception "
    return prefix
87
88
def get_one_exception_message(db_filter):
    """Return the expected "not found" message text for *db_filter*."""
    return f"database exception Not found entry with filter='{db_filter}'"
91
92
def get_one_multiple_exception_message(db_filter):
    """Return the expected "more than one entry" message text for *db_filter*."""
    return (
        "database exception Found more than one entry "
        f"with filter='{db_filter}'"
    )
97
98
def del_one_exception_message(db_filter):
    """Return the expected "not found" message text for a delete with *db_filter*."""
    return f"database exception Not found entry with filter='{db_filter}'"
101
102
def replace_exception_message(value):
    """Return the expected "not found" message text for a replace of ``_id`` *value*."""
    return f"database exception Not found entry with _id='{value}'"
105
106
def test_constructor():
    """A default-constructed DbMemory has the "db" logger and an empty store."""
    database = DbMemory()
    default_logger = logging.getLogger("db")
    assert database.logger == default_logger
    assert database.db == {}
111
112
def test_constructor_with_logger():
    """Passing ``logger_name`` selects that logger; the store starts empty."""
    database = DbMemory(logger_name="db_local")
    assert database.logger == logging.getLogger("db_local")
    assert database.db == {}
118
119
def test_db_connect():
    """db_connect() with a ``logger_name`` config switches the logger."""
    database = DbMemory()
    database.db_connect({"logger_name": "db_local"})
    assert database.logger == logging.getLogger("db_local")
    assert database.db == {}
127
128
def test_db_disconnect(db_memory):
    """db_disconnect() can be called on a fresh database without raising."""
    database = db_memory
    database.db_disconnect()
131
132
@pytest.mark.parametrize(
    ("table", "db_filter"),
    [
        ("test", {}),
        ("test", {"_id": 1}),
        ("test", {"data": 1}),
        ("test", {"_id": 1, "data": 1}),
    ],
)
def test_get_list_with_empty_db(db_memory, table, db_filter):
    """get_list() on an empty database yields no entries for any filter."""
    entries = db_memory.get_list(table, db_filter)
    assert len(entries) == 0
145
146
@pytest.mark.parametrize(
    ("table", "db_filter", "expected_data"),
    [
        (
            "test",
            {},
            [{"_id": 1, "data": 1}, {"_id": 2, "data": 2}, {"_id": 3, "data": 3}],
        ),
        ("test", {"_id": 1}, [{"_id": 1, "data": 1}]),
        ("test", {"data": 1}, [{"_id": 1, "data": 1}]),
        ("test", {"_id": 1, "data": 1}, [{"_id": 1, "data": 1}]),
        ("test", {"_id": 2}, [{"_id": 2, "data": 2}]),
        ("test", {"data": 2}, [{"_id": 2, "data": 2}]),
        ("test", {"_id": 2, "data": 2}, [{"_id": 2, "data": 2}]),
        ("test", {"_id": 4}, []),
        ("test", {"data": 4}, []),
        ("test", {"_id": 4, "data": 4}, []),
        ("test_table", {}, []),
        ("test_table", {"_id": 1}, []),
        ("test_table", {"data": 1}, []),
        ("test_table", {"_id": 1, "data": 1}, []),
    ],
)
def test_get_list_with_non_empty_db(
    db_memory_with_data, table, db_filter, expected_data
):
    """get_list() returns exactly the expected entries (order not checked)."""
    found = db_memory_with_data.get_list(table, db_filter)
    assert len(found) == len(expected_data)
    for wanted in expected_data:
        assert wanted in found
177
178
def test_get_list_exception(db_memory_with_data):
    """An unexpected error inside ``_find`` surfaces as DbException (NOT_FOUND)."""
    # Force the internal lookup to blow up with a generic exception.
    db_memory_with_data._find = MagicMock(side_effect=Exception())
    with pytest.raises(DbException) as raised:
        db_memory_with_data.get_list("test", {})
    error = raised.value
    assert str(error) == empty_exception_message()
    assert error.http_code == http.HTTPStatus.NOT_FOUND
187
188
@pytest.mark.parametrize(
    ("table", "db_filter", "expected_data"),
    [
        ("test", {"_id": 1}, {"_id": 1, "data": 1}),
        ("test", {"_id": 2}, {"_id": 2, "data": 2}),
        ("test", {"_id": 3}, {"_id": 3, "data": 3}),
        ("test", {"data": 1}, {"_id": 1, "data": 1}),
        ("test", {"data": 2}, {"_id": 2, "data": 2}),
        ("test", {"data": 3}, {"_id": 3, "data": 3}),
        ("test", {"_id": 1, "data": 1}, {"_id": 1, "data": 1}),
        ("test", {"_id": 2, "data": 2}, {"_id": 2, "data": 2}),
        ("test", {"_id": 3, "data": 3}, {"_id": 3, "data": 3}),
    ],
)
def test_get_one(db_memory_with_data, table, db_filter, expected_data):
    """get_one() returns the single matching entry and leaves the store intact."""
    entry = db_memory_with_data.get_one(table, db_filter)
    assert entry == expected_data

    store = db_memory_with_data.db
    assert len(store) == 1
    assert table in store
    assert len(store[table]) == 3
    assert entry in store[table]
210
211
@pytest.mark.parametrize(
    "db_filter, expected_ids",
    [
        ({}, [1, 2, 3, 4, 5, 6, 7, 8]),
        ({"_id": 1}, [1]),
        ({"data.data2.data3": 2}, [2]),
        ({"data.data2.data3.eq": 2}, [2]),
        ({"data.data2.data3": [2]}, [2]),
        ({"data.data2.data3.cont": [2]}, [2]),
        ({"data.data2.data3.neq": 2}, [1, 3, 4, 5, 6, 7, 8]),
        ({"data.data2.data3.neq": [2]}, [1, 3, 4, 5, 6, 7, 8]),
        ({"data.data2.data3.ncont": [2]}, [1, 3, 4, 5, 6, 7, 8]),
        ({"data.data2.data3": [2, 3]}, [2, 3]),
        ({"data.data2.data3.gt": 4}, [5, 6, 7]),
        ({"data.data2.data3.gte": 4}, [4, 5, 6, 7]),
        ({"data.data2.data3.lt": 4}, [1, 2, 3]),
        ({"data.data2.data3.lte": 4}, [1, 2, 3, 4]),
        ({"data.data2.data3.lte": 4.5}, [1, 2, 3, 4]),
        ({"data.data2.data3.gt": "text"}, []),
        ({"nonexist.nonexist": "4"}, []),
        ({"nonexist.nonexist": None}, [1, 2, 3, 4, 5, 6, 7, 8]),
        ({"nonexist.nonexist.neq": "4"}, [1, 2, 3, 4, 5, 6, 7, 8]),
        ({"nonexist.nonexist.neq": None}, []),
        ({"text.eq": "sometext"}, [1]),
        ({"text.neq": "sometext"}, [2, 3, 4, 5, 6, 7, 8]),
        ({"text.eq": "somet"}, []),
        ({"text.gte": "a"}, [1]),
        ({"text.gte": "somet"}, [1]),
        ({"text.gte": "sometext"}, [1]),
        ({"text.lt": "somet"}, []),
        ({"data.data2.data3": 2, "data.data2.data4": None}, [2]),
        ({"data.data2.data3": 2, "data.data2.data4": 5}, []),
        ({"data.data2.data3": 4}, [4]),
        ({"data.data2.data3": [3, 4, "e"]}, [3, 4]),
        ({"data.data2.data3": None}, [8]),
        ({"data.data2": "4"}, []),
        ({"list.0.a": 1}, [1, 6]),
        ({"list2": 1}, [2]),
        ({"list2": [1, 5]}, [2]),
        ({"list2": [1, 2]}, [2]),
        ({"list2": [5, 7]}, []),
        ({"list.ANYINDEX.a": 1}, [1]),
        ({"list.a": 3, "list.b": 1}, [8]),
        ({"list.ANYINDEX.a": 3, "list.ANYINDEX.b": 1}, []),
        ({"list.ANYINDEX.a": 3, "list.ANYINDEX.c.a": 3}, [8]),
        ({"list.ANYINDEX.a": 3, "list.ANYINDEX.b": 0}, [8]),
        (
            {
                "list.ANYINDEX.a": 3,
                "list.ANYINDEX.c.ANYINDEX.a": 0,
                "list.ANYINDEX.c.ANYINDEX.b": "v",
            },
            [8],
        ),
        (
            {
                "list.ANYINDEX.a": 3,
                "list.ANYINDEX.c.ANYINDEX.a": 0,
                "list.ANYINDEX.c.ANYINDEX.b": 1,
            },
            [],
        ),
        ({"list.c.b": 1}, [8]),
        ({"list.c.b": None}, [1, 2, 3, 4, 5, 6, 7]),
    ],
)
def test_get_list(db_memory_with_many_data, db_filter, expected_ids):
    """Check get_list() and count() against dotted-path / operator filters.

    For each filter the ids of the returned entries must equal
    ``expected_ids`` in order, every returned entry must be present in the
    "test" table, the store must be left untouched (one table, eight
    entries), and count() must agree with the number of expected ids.
    """
    result = db_memory_with_many_data.get_list("test", db_filter)
    assert isinstance(result, list)
    result_ids = [item["_id"] for item in result]
    # The message reports the actual result next to the *expected* ids (the
    # previous version formatted result_ids under the "expected_ids" label).
    assert len(result) == len(
        expected_ids
    ), "for db_filter={} result={} expected_ids={}".format(
        db_filter, result, expected_ids
    )
    assert result_ids == expected_ids
    for item in result:
        assert item in db_memory_with_many_data.db["test"]

    # The query must not have altered the stored data.
    assert len(db_memory_with_many_data.db) == 1
    assert "test" in db_memory_with_many_data.db
    assert len(db_memory_with_many_data.db["test"]) == 8
    counted = db_memory_with_many_data.count("test", db_filter)
    assert counted == len(expected_ids)
298
299
@pytest.mark.parametrize(
    ("table", "db_filter", "expected_data"),
    [("test", {}, {"_id": 1, "data": 1})],
)
def test_get_one_with_multiple_results(
    db_memory_with_data, table, db_filter, expected_data
):
    """With ``fail_on_more=False`` get_one() returns the expected single entry."""
    entry = db_memory_with_data.get_one(table, db_filter, fail_on_more=False)
    assert entry == expected_data

    store = db_memory_with_data.db
    assert len(store) == 1
    assert table in store
    assert len(store[table]) == 3
    assert entry in store[table]
312
313
def test_get_one_with_multiple_results_exception(db_memory_with_data):
    """get_one() with a filter matching several entries raises DbException."""
    db_filter = {}
    with pytest.raises(DbException) as raised:
        db_memory_with_data.get_one("test", db_filter)
    expected_text = empty_exception_message() + get_one_multiple_exception_message(
        db_filter
    )
    assert str(raised.value) == expected_text
    # The status-code check below was already disabled in this test:
    # assert excinfo.value.http_code == http.HTTPStatus.CONFLICT
323
324
@pytest.mark.parametrize(
    ("table", "db_filter"),
    [
        (table_name, dict(criteria))
        for table_name in ("test", "test_table")
        for criteria in ({"_id": 4}, {"data": 4}, {"_id": 4, "data": 4})
    ],
)
def test_get_one_with_non_empty_db_exception(db_memory_with_data, table, db_filter):
    """get_one() raises DbException (NOT_FOUND) when nothing matches."""
    with pytest.raises(DbException) as raised:
        db_memory_with_data.get_one(table, db_filter)
    error = raised.value
    expected_text = empty_exception_message() + get_one_exception_message(db_filter)
    assert str(error) == expected_text
    assert error.http_code == http.HTTPStatus.NOT_FOUND
343
344
@pytest.mark.parametrize(
    ("table", "db_filter"),
    [
        (table_name, dict(criteria))
        for table_name in ("test", "test_table")
        for criteria in ({"_id": 4}, {"data": 4}, {"_id": 4, "data": 4})
    ],
)
def test_get_one_with_non_empty_db_none(db_memory_with_data, table, db_filter):
    """With ``fail_on_empty=False`` get_one() returns None when nothing matches."""
    entry = db_memory_with_data.get_one(table, db_filter, fail_on_empty=False)
    assert entry is None
359
360
@pytest.mark.parametrize(
    ("table", "db_filter"),
    [
        (table_name, dict(criteria))
        for table_name in ("test", "test_table")
        for criteria in ({"_id": 4}, {"data": 4}, {"_id": 4, "data": 4})
    ],
)
def test_get_one_with_empty_db_exception(db_memory, table, db_filter):
    """get_one() on an empty database raises DbException (NOT_FOUND)."""
    with pytest.raises(DbException) as raised:
        db_memory.get_one(table, db_filter)
    error = raised.value
    expected_text = empty_exception_message() + get_one_exception_message(db_filter)
    assert str(error) == expected_text
    assert error.http_code == http.HTTPStatus.NOT_FOUND
379
380
@pytest.mark.parametrize(
    ("table", "db_filter"),
    [
        (table_name, dict(criteria))
        for table_name in ("test", "test_table")
        for criteria in ({"_id": 4}, {"data": 4}, {"_id": 4, "data": 4})
    ],
)
def test_get_one_with_empty_db_none(db_memory, table, db_filter):
    """With ``fail_on_empty=False`` get_one() on an empty database returns None."""
    entry = db_memory.get_one(table, db_filter, fail_on_empty=False)
    assert entry is None
395
396
def test_get_one_generic_exception(db_memory_with_data):
    """An unexpected error inside ``_find`` makes get_one() raise DbException."""
    # Replace the internal lookup with one that always fails.
    db_memory_with_data._find = MagicMock(side_effect=Exception())
    with pytest.raises(DbException) as raised:
        db_memory_with_data.get_one("test", {})
    error = raised.value
    assert str(error) == empty_exception_message()
    assert error.http_code == http.HTTPStatus.NOT_FOUND
405
406
@pytest.mark.parametrize(
    ("table", "db_filter", "expected_data"),
    [
        ("test", {}, []),
        ("test", {"_id": 1}, [{"_id": 2, "data": 2}, {"_id": 3, "data": 3}]),
        ("test", {"_id": 2}, [{"_id": 1, "data": 1}, {"_id": 3, "data": 3}]),
        ("test", {"_id": 1, "data": 1}, [{"_id": 2, "data": 2}, {"_id": 3, "data": 3}]),
        ("test", {"_id": 2, "data": 2}, [{"_id": 1, "data": 1}, {"_id": 3, "data": 3}]),
    ],
)
def test_del_list_with_non_empty_db(
    db_memory_with_data, table, db_filter, expected_data
):
    """del_list() removes the matching entries; ``expected_data`` is what remains."""
    outcome = db_memory_with_data.del_list(table, db_filter)
    remaining_count = len(expected_data)
    # The fixture starts with three entries in the table.
    assert outcome["deleted"] == (3 - remaining_count)

    store = db_memory_with_data.db
    assert len(store) == 1
    assert table in store
    assert len(store[table]) == remaining_count
    for survivor in expected_data:
        assert survivor in store[table]
427
428
@pytest.mark.parametrize(
    ("table", "db_filter"),
    [
        ("test", {}),
        ("test", {"_id": 1}),
        ("test", {"_id": 2}),
        ("test", {"data": 1}),
        ("test", {"data": 2}),
        ("test", {"_id": 1, "data": 1}),
        ("test", {"_id": 2, "data": 2}),
    ],
)
def test_del_list_with_empty_db(db_memory, table, db_filter):
    """del_list() on an empty database reports zero deleted entries."""
    outcome = db_memory.del_list(table, db_filter)
    assert outcome["deleted"] == 0
444
445
def test_del_list_generic_exception(db_memory_with_data):
    """An unexpected error inside ``_find`` makes del_list() raise DbException."""
    # Replace the internal lookup with one that always fails.
    db_memory_with_data._find = MagicMock(side_effect=Exception())
    with pytest.raises(DbException) as raised:
        db_memory_with_data.del_list("test", {})
    error = raised.value
    assert str(error) == empty_exception_message()
    assert error.http_code == http.HTTPStatus.NOT_FOUND
454
455
@pytest.mark.parametrize(
    ("table", "db_filter", "data"),
    [
        ("test", {}, {"_id": 1, "data": 1}),
        ("test", {"_id": 1}, {"_id": 1, "data": 1}),
        ("test", {"data": 1}, {"_id": 1, "data": 1}),
        ("test", {"_id": 1, "data": 1}, {"_id": 1, "data": 1}),
        ("test", {"_id": 2}, {"_id": 2, "data": 2}),
        ("test", {"data": 2}, {"_id": 2, "data": 2}),
        ("test", {"_id": 2, "data": 2}, {"_id": 2, "data": 2}),
    ],
)
def test_del_one(db_memory_with_data, table, db_filter, data):
    """del_one() removes exactly one entry (``data``) and reports it."""
    outcome = db_memory_with_data.del_one(table, db_filter)
    assert outcome == {"deleted": 1}

    store = db_memory_with_data.db
    assert len(store) == 1
    assert table in store
    # Three entries in the fixture, one removed.
    assert len(store[table]) == 2
    assert data not in store[table]
475
476
@pytest.mark.parametrize(
    ("table", "db_filter"),
    [
        (table_name, dict(criteria))
        for table_name in ("test", "test_table")
        for criteria in (
            {},
            {"_id": 1},
            {"_id": 2},
            {"data": 1},
            {"data": 2},
            {"_id": 1, "data": 1},
            {"_id": 2, "data": 2},
        )
    ],
)
def test_del_one_with_empty_db_exception(db_memory, table, db_filter):
    """del_one() on an empty database raises DbException (NOT_FOUND)."""
    with pytest.raises(DbException) as raised:
        db_memory.del_one(table, db_filter)
    error = raised.value
    expected_text = empty_exception_message() + del_one_exception_message(db_filter)
    assert str(error) == expected_text
    assert error.http_code == http.HTTPStatus.NOT_FOUND
503
504
@pytest.mark.parametrize(
    ("table", "db_filter"),
    [
        (table_name, dict(criteria))
        for table_name in ("test", "test_table")
        for criteria in (
            {},
            {"_id": 1},
            {"_id": 2},
            {"data": 1},
            {"data": 2},
            {"_id": 1, "data": 1},
            {"_id": 2, "data": 2},
        )
    ],
)
def test_del_one_with_empty_db_none(db_memory, table, db_filter):
    """With ``fail_on_empty=False`` del_one() on an empty database returns None."""
    outcome = db_memory.del_one(table, db_filter, fail_on_empty=False)
    assert outcome is None
527
528
@pytest.mark.parametrize(
    ("table", "db_filter"),
    [
        ("test", {"_id": 4}),
        ("test", {"_id": 5}),
        ("test", {"data": 4}),
        ("test", {"data": 5}),
        ("test", {"_id": 1, "data": 2}),
        ("test", {"_id": 2, "data": 3}),
        ("test_table", {}),
        ("test_table", {"_id": 1}),
        ("test_table", {"_id": 2}),
        ("test_table", {"data": 1}),
        ("test_table", {"data": 2}),
        ("test_table", {"_id": 1, "data": 1}),
        ("test_table", {"_id": 2, "data": 2}),
    ],
)
def test_del_one_with_non_empty_db_exception(db_memory_with_data, table, db_filter):
    """del_one() raises DbException (NOT_FOUND) when no entry matches."""
    with pytest.raises(DbException) as raised:
        db_memory_with_data.del_one(table, db_filter)
    error = raised.value
    expected_text = empty_exception_message() + del_one_exception_message(db_filter)
    assert str(error) == expected_text
    assert error.http_code == http.HTTPStatus.NOT_FOUND
554
555
@pytest.mark.parametrize(
    ("table", "db_filter"),
    [
        ("test", {"_id": 4}),
        ("test", {"_id": 5}),
        ("test", {"data": 4}),
        ("test", {"data": 5}),
        ("test", {"_id": 1, "data": 2}),
        ("test", {"_id": 2, "data": 3}),
        ("test_table", {}),
        ("test_table", {"_id": 1}),
        ("test_table", {"_id": 2}),
        ("test_table", {"data": 1}),
        ("test_table", {"data": 2}),
        ("test_table", {"_id": 1, "data": 1}),
        ("test_table", {"_id": 2, "data": 2}),
    ],
)
def test_del_one_with_non_empty_db_none(db_memory_with_data, table, db_filter):
    """With ``fail_on_empty=False`` del_one() returns None when nothing matches."""
    outcome = db_memory_with_data.del_one(table, db_filter, fail_on_empty=False)
    assert outcome is None
577
578
@pytest.mark.parametrize("fail_on_empty", [True, False])
def test_del_one_generic_exception(db_memory_with_data, fail_on_empty):
    """An unexpected ``_find`` error raises DbException for either flag value."""
    # Replace the internal lookup with one that always fails.
    db_memory_with_data._find = MagicMock(side_effect=Exception())
    with pytest.raises(DbException) as raised:
        db_memory_with_data.del_one("test", {}, fail_on_empty=fail_on_empty)
    error = raised.value
    assert str(error) == empty_exception_message()
    assert error.http_code == http.HTTPStatus.NOT_FOUND
588
589
@pytest.mark.parametrize(
    ("table", "_id", "indata"),
    [
        ("test", 1, {"_id": 1, "data": 42}),
        ("test", 1, {"_id": 1, "data": 42, "kk": 34}),
        ("test", 1, {"_id": 1}),
        ("test", 2, {"_id": 2, "data": 42}),
        ("test", 2, {"_id": 2, "data": 42, "kk": 34}),
        ("test", 2, {"_id": 2}),
        ("test", 3, {"_id": 3, "data": 42}),
        ("test", 3, {"_id": 3, "data": 42, "kk": 34}),
        ("test", 3, {"_id": 3}),
    ],
)
def test_replace(db_memory_with_data, table, _id, indata):
    """replace() swaps the entry with the given ``_id`` for ``indata``."""
    outcome = db_memory_with_data.replace(table, _id, indata)
    assert outcome == {"updated": 1}

    store = db_memory_with_data.db
    assert len(store) == 1
    assert table in store
    assert len(store[table]) == 3
    assert indata in store[table]
611
612
@pytest.mark.parametrize(
    ("table", "_id", "indata"),
    [
        ("test", 1, {"_id": 1, "data": 42}),
        ("test", 2, {"_id": 2}),
        ("test", 3, {"_id": 3}),
    ],
)
def test_replace_without_data_exception(db_memory, table, _id, indata):
    """replace() on an empty database raises DbException when fail_on_empty=True."""
    with pytest.raises(DbException) as raised:
        db_memory.replace(table, _id, indata, fail_on_empty=True)
    error = raised.value
    assert str(error) == replace_exception_message(_id)
    assert error.http_code == http.HTTPStatus.NOT_FOUND
626
627
@pytest.mark.parametrize(
    ("table", "_id", "indata"),
    [
        ("test", 1, {"_id": 1, "data": 42}),
        ("test", 2, {"_id": 2}),
        ("test", 3, {"_id": 3}),
    ],
)
def test_replace_without_data_none(db_memory, table, _id, indata):
    """replace() on an empty database returns None when fail_on_empty=False."""
    outcome = db_memory.replace(table, _id, indata, fail_on_empty=False)
    assert outcome is None
639
640
@pytest.mark.parametrize(
    ("table", "_id", "indata"),
    [
        ("test", 11, {"_id": 11, "data": 42}),
        ("test", 12, {"_id": 12}),
        ("test", 33, {"_id": 33}),
    ],
)
def test_replace_with_data_exception(db_memory_with_data, table, _id, indata):
    """replace() of an unknown ``_id`` raises DbException when fail_on_empty=True."""
    with pytest.raises(DbException) as raised:
        db_memory_with_data.replace(table, _id, indata, fail_on_empty=True)
    error = raised.value
    assert str(error) == replace_exception_message(_id)
    assert error.http_code == http.HTTPStatus.NOT_FOUND
654
655
@pytest.mark.parametrize(
    ("table", "_id", "indata"),
    [
        ("test", 11, {"_id": 11, "data": 42}),
        ("test", 12, {"_id": 12}),
        ("test", 33, {"_id": 33}),
    ],
)
def test_replace_with_data_none(db_memory_with_data, table, _id, indata):
    """replace() of an unknown ``_id`` returns None when fail_on_empty=False."""
    outcome = db_memory_with_data.replace(table, _id, indata, fail_on_empty=False)
    assert outcome is None
667
668
@pytest.mark.parametrize("fail_on_empty", [True, False])
def test_replace_generic_exception(db_memory_with_data, fail_on_empty):
    """An unexpected ``_find`` error makes replace() raise DbException."""
    replacement = {"_id": 1, "data": 1}
    # Replace the internal lookup with one that always fails.
    db_memory_with_data._find = MagicMock(side_effect=Exception())
    with pytest.raises(DbException) as raised:
        db_memory_with_data.replace(
            "test", {}, replacement, fail_on_empty=fail_on_empty
        )
    error = raised.value
    assert str(error) == empty_exception_message()
    assert error.http_code == http.HTTPStatus.NOT_FOUND
679
680
@pytest.mark.parametrize(
    "table, _id, data",
    [
        ("test", "1", {"data": 1}),
        ("test", "1", {"data": 2}),
        ("test", "2", {"data": 1}),
        ("test", "2", {"data": 2}),
        ("test_table", "1", {"data": 1}),
        ("test_table", "1", {"data": 2}),
        ("test_table", "2", {"data": 1}),
        ("test_table", "2", {"data": 2}),
        ("test", "1", {"data_1": 1, "data_2": 2}),
        ("test", "1", {"data_1": 2, "data_2": 1}),
        ("test", "2", {"data_1": 1, "data_2": 2}),
        ("test", "2", {"data_1": 2, "data_2": 1}),
        ("test_table", "1", {"data_1": 1, "data_2": 2}),
        ("test_table", "1", {"data_1": 2, "data_2": 1}),
        ("test_table", "2", {"data_1": 1, "data_2": 2}),
        ("test_table", "2", {"data_1": 2, "data_2": 1}),
    ],
)
def test_create_with_empty_db_with_id(db_memory, table, _id, data):
    """create() with an explicit ``_id`` returns it and stores the entry.

    The parametrized ``data`` dict is shared by the runs for both lock
    settings of ``db_memory``, so the entry to insert is built as a new dict
    instead of writing ``_id`` into ``data`` itself.
    """
    data_to_insert = dict(deepcopy(data), _id=_id)
    returned_id = db_memory.create(table, data_to_insert)
    assert returned_id == _id
    assert len(db_memory.db) == 1
    assert table in db_memory.db
    assert len(db_memory.db[table]) == 1
    assert data_to_insert in db_memory.db[table]
711
712
@pytest.mark.parametrize(
    "table, _id, data",
    [
        ("test", "4", {"data": 1}),
        ("test", "5", {"data": 2}),
        ("test", "4", {"data": 1}),
        ("test", "5", {"data": 2}),
        ("test_table", "4", {"data": 1}),
        ("test_table", "5", {"data": 2}),
        ("test_table", "4", {"data": 1}),
        ("test_table", "5", {"data": 2}),
        ("test", "4", {"data_1": 1, "data_2": 2}),
        ("test", "5", {"data_1": 2, "data_2": 1}),
        ("test", "4", {"data_1": 1, "data_2": 2}),
        ("test", "5", {"data_1": 2, "data_2": 1}),
        ("test_table", "4", {"data_1": 1, "data_2": 2}),
        ("test_table", "5", {"data_1": 2, "data_2": 1}),
        ("test_table", "4", {"data_1": 1, "data_2": 2}),
        ("test_table", "5", {"data_1": 2, "data_2": 1}),
    ],
)
def test_create_with_non_empty_db_with_id(db_memory_with_data, table, _id, data):
    """create() with an explicit ``_id`` adds the entry next to existing data.

    Inserting into "test" grows that table from three to four entries;
    inserting into any other table creates it with a single entry. The
    parametrized ``data`` dict is shared by the runs for both lock settings,
    so the entry to insert is built as a new dict rather than mutating it.
    """
    data_to_insert = dict(deepcopy(data), _id=_id)
    returned_id = db_memory_with_data.create(table, data_to_insert)
    assert returned_id == _id
    assert len(db_memory_with_data.db) == (1 if table == "test" else 2)
    assert table in db_memory_with_data.db
    assert len(db_memory_with_data.db[table]) == (4 if table == "test" else 1)
    assert data_to_insert in db_memory_with_data.db[table]
743
744
@pytest.mark.parametrize(
    "table, data",
    [
        ("test", {"data": 1}),
        ("test", {"data": 2}),
        ("test", {"data": 1}),
        ("test", {"data": 2}),
        ("test_table", {"data": 1}),
        ("test_table", {"data": 2}),
        ("test_table", {"data": 1}),
        ("test_table", {"data": 2}),
        ("test", {"data_1": 1, "data_2": 2}),
        ("test", {"data_1": 2, "data_2": 1}),
        ("test", {"data_1": 1, "data_2": 2}),
        ("test", {"data_1": 2, "data_2": 1}),
        ("test_table", {"data_1": 1, "data_2": 2}),
        ("test_table", {"data_1": 2, "data_2": 1}),
        ("test_table", {"data_1": 1, "data_2": 2}),
        ("test_table", {"data_1": 2, "data_2": 1}),
    ],
)
def test_create_with_empty_db_without_id(db_memory, table, data):
    """create() without an ``_id`` stores the entry under the id it returns."""
    # Work on a private copy: the parametrized dict is shared by the runs for
    # both lock settings of ``db_memory``. Writing "_id" into it (as the test
    # previously did) meant the second run no longer started without an id.
    data = deepcopy(data)
    returned_id = db_memory.create(table, data)
    assert len(db_memory.db) == 1
    assert table in db_memory.db
    assert len(db_memory.db[table]) == 1
    data_inserted = dict(data, _id=returned_id)
    assert data_inserted in db_memory.db[table]
774
775
@pytest.mark.parametrize(
    "table, data",
    [
        ("test", {"data": 1}),
        ("test", {"data": 2}),
        ("test", {"data": 1}),
        ("test", {"data": 2}),
        ("test_table", {"data": 1}),
        ("test_table", {"data": 2}),
        ("test_table", {"data": 1}),
        ("test_table", {"data": 2}),
        ("test", {"data_1": 1, "data_2": 2}),
        ("test", {"data_1": 2, "data_2": 1}),
        ("test", {"data_1": 1, "data_2": 2}),
        ("test", {"data_1": 2, "data_2": 1}),
        ("test_table", {"data_1": 1, "data_2": 2}),
        ("test_table", {"data_1": 2, "data_2": 1}),
        ("test_table", {"data_1": 1, "data_2": 2}),
        ("test_table", {"data_1": 2, "data_2": 1}),
    ],
)
def test_create_with_non_empty_db_without_id(db_memory_with_data, table, data):
    """create() without an ``_id`` adds the entry under the id it returns.

    Inserting into "test" grows that table from three to four entries;
    inserting into any other table creates it with a single entry.
    """
    # Work on a private copy: the parametrized dict is shared by the runs for
    # both lock settings of the fixture. Writing "_id" into it (as the test
    # previously did) meant the second run no longer started without an id.
    data = deepcopy(data)
    returned_id = db_memory_with_data.create(table, data)
    assert len(db_memory_with_data.db) == (1 if table == "test" else 2)
    assert table in db_memory_with_data.db
    assert len(db_memory_with_data.db[table]) == (4 if table == "test" else 1)
    data_inserted = dict(data, _id=returned_id)
    assert data_inserted in db_memory_with_data.db[table]
805
806
def test_create_with_exception(db_memory):
    """An unexpected error while touching the store makes create() raise DbException."""
    # Swap the store for a mock whose membership test always fails.
    broken_store = MagicMock()
    broken_store.__contains__.side_effect = Exception()
    db_memory.db = broken_store
    with pytest.raises(DbException) as raised:
        db_memory.create("test", {"_id": 1, "data": 1})
    error = raised.value
    assert str(error) == empty_exception_message()
    assert error.http_code == http.HTTPStatus.NOT_FOUND
816
817
@pytest.mark.parametrize(
    "db_content, update_dict, expected, message",
    [
        (
            {"a": {"none": None}},
            {"a.b.num": "v"},
            {"a": {"none": None, "b": {"num": "v"}}},
            "create dict",
        ),
        (
            {"a": {"none": None}},
            {"a.none.num": "v"},
            {"a": {"none": {"num": "v"}}},
            "create dict over none",
        ),
        (
            {"a": {"b": {"num": 4}}},
            {"a.b.num": "v"},
            {"a": {"b": {"num": "v"}}},
            "replace_number",
        ),
        (
            {"a": {"b": {"num": 4}}},
            {"a.b.num.c.d": "v"},
            None,
            "create dict over number should fail",
        ),
        (
            {"a": {"b": {"num": 4}}},
            {"a.b": "v"},
            {"a": {"b": "v"}},
            "replace dict with a string",
        ),
        (
            {"a": {"b": {"num": 4}}},
            {"a.b": None},
            {"a": {"b": None}},
            "replace dict with None",
        ),
        (
            {"a": [{"b": {"num": 4}}]},
            {"a.b.num": "v"},
            None,
            "create dict over list should fail",
        ),
        (
            {"a": [{"b": {"num": 4}}]},
            {"a.0.b.num": "v"},
            {"a": [{"b": {"num": "v"}}]},
            "set list",
        ),
        (
            {"a": [{"b": {"num": 4}}]},
            {"a.3.b.num": "v"},
            {"a": [{"b": {"num": 4}}, None, None, {"b": {"num": "v"}}]},
            "expand list",
        ),
        ({"a": [[4]]}, {"a.0.0": "v"}, {"a": [["v"]]}, "set nested list"),
        ({"a": [[4]]}, {"a.0.2": "v"}, {"a": [[4, None, "v"]]}, "expand nested list"),
        (
            {"a": [[4]]},
            {"a.2.2": "v"},
            {"a": [[4], None, {"2": "v"}]},
            "expand list and add number key",
        ),
        # Also present in TestDbMemory.test_set_one; mirrored here so this
        # parametrized test covers the same cases.
        ({"a": None}, {"b.c": "v"}, {"a": None, "b": {"c": "v"}}, "expand at root"),
    ],
)
def test_set_one(db_memory, db_content, update_dict, expected, message):
    """set_one() applies dotted-key updates in place to the found entry.

    ``_find`` is mocked to return ``db_content`` as the only match. When
    ``expected`` is None the update must raise DbException with NOT_FOUND;
    otherwise ``db_content`` must equal ``expected`` afterwards.
    """
    # set_one() modifies the entry in place and the parametrized dict is
    # shared by the runs for both lock settings of ``db_memory``; copy it so
    # every run starts from the pristine initial content.
    db_content = deepcopy(db_content)
    db_memory._find = Mock(return_value=((0, db_content),))
    if expected is None:
        with pytest.raises(DbException) as excinfo:
            db_memory.set_one("table", {}, update_dict)
        assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND, message
    else:
        db_memory.set_one("table", {}, update_dict)
        assert db_content == expected, message
894
895
896 class TestDbMemory(unittest.TestCase):
897 # TODO to delete. This is cover with pytest test_set_one.
def test_set_one(self):
    """set_one() applies dotted-key updates in place to the found entry.

    Each case is (database content, update, expected content, message); an
    expected content of None means the update must raise DbException.
    """
    test_set = (
        # (database content, set-content, expected database content (None=fails), message)
        (
            {"a": {"none": None}},
            {"a.b.num": "v"},
            {"a": {"none": None, "b": {"num": "v"}}},
            "create dict",
        ),
        (
            {"a": {"none": None}},
            {"a.none.num": "v"},
            {"a": {"none": {"num": "v"}}},
            "create dict over none",
        ),
        (
            {"a": {"b": {"num": 4}}},
            {"a.b.num": "v"},
            {"a": {"b": {"num": "v"}}},
            "replace_number",
        ),
        (
            {"a": {"b": {"num": 4}}},
            {"a.b.num.c.d": "v"},
            None,
            "create dict over number should fail",
        ),
        (
            {"a": {"b": {"num": 4}}},
            {"a.b": "v"},
            {"a": {"b": "v"}},
            "replace dict with a string",
        ),
        (
            {"a": {"b": {"num": 4}}},
            {"a.b": None},
            {"a": {"b": None}},
            "replace dict with None",
        ),
        (
            {"a": [{"b": {"num": 4}}]},
            {"a.b.num": "v"},
            None,
            "create dict over list should fail",
        ),
        (
            {"a": [{"b": {"num": 4}}]},
            {"a.0.b.num": "v"},
            {"a": [{"b": {"num": "v"}}]},
            "set list",
        ),
        (
            {"a": [{"b": {"num": 4}}]},
            {"a.3.b.num": "v"},
            {"a": [{"b": {"num": 4}}, None, None, {"b": {"num": "v"}}]},
            "expand list",
        ),
        ({"a": [[4]]}, {"a.0.0": "v"}, {"a": [["v"]]}, "set nested list"),
        (
            {"a": [[4]]},
            {"a.0.2": "v"},
            {"a": [[4, None, "v"]]},
            "expand nested list",
        ),
        (
            {"a": [[4]]},
            {"a.2.2": "v"},
            {"a": [[4], None, {"2": "v"}]},
            "expand list and add number key",
        ),
        ({"a": None}, {"b.c": "v"}, {"a": None, "b": {"c": "v"}}, "expand at root"),
    )
    db_men = DbMemory()
    # Every lookup returns the current case's content as the single match.
    db_men._find = Mock()
    for db_content, update_dict, expected, message in test_set:
        db_men._find.return_value = ((0, db_content),)
        if expected is None:
            # Context-manager form so the case message is reported when the
            # expected exception is not raised.
            with self.assertRaises(DbException, msg=message):
                db_men.set_one("table", {}, update_dict)
        else:
            db_men.set_one("table", {}, update_dict)
            self.assertEqual(db_content, expected, message)
979
def test_set_one_pull(self):
    """set_one(pull=...) removes matching items from lists in the found entry.

    Each case is (database content, pull dict, expected content, message); an
    expected content of None means the call must raise DbException.
    """
    example = {"a": [1, "1", 1], "d": {}, "n": None}
    test_set = (
        # (database content, set-content, expected database content (None=fails), message)
        (example, {"a": "1"}, {"a": [1, 1], "d": {}, "n": None}, "pull one item"),
        (example, {"a": 1}, {"a": ["1"], "d": {}, "n": None}, "pull two items"),
        (example, {"a": "v"}, example, "pull non existing item"),
        (example, {"a.6": 1}, example, "pull non existing arrray"),
        (example, {"d.b.c": 1}, example, "pull non existing arrray2"),
        (example, {"b": 1}, example, "pull non existing arrray3"),
        (example, {"d": 1}, None, "pull over dict"),
        (example, {"n": 1}, None, "pull over None"),
    )
    db_men = DbMemory()
    db_men._find = Mock()
    for db_content, pull_dict, expected, message in test_set:
        # All cases share ``example``; operate on a copy so they stay independent.
        db_content = deepcopy(db_content)
        db_men._find.return_value = ((0, db_content),)
        if expected is None:
            # Context-manager form so the case message is reported when the
            # expected exception is not raised.
            with self.assertRaises(DbException, msg=message):
                db_men.set_one(
                    "table", {}, None, fail_on_empty=False, pull=pull_dict
                )
        else:
            db_men.set_one("table", {}, None, pull=pull_dict)
            self.assertEqual(db_content, expected, message)
1011
def test_set_one_push(self):
    """Check the ``push`` argument of ``DbMemory.set_one``.

    Every case runs against its own deep copy of ``example``. ``_find`` is
    replaced by a mock that returns that copy as its single entry, so after
    calling ``set_one`` the copy is compared with the expected content.
    Cases whose expected content is ``None`` must raise ``DbException``.
    """
    example = {"a": [1, "1", 1], "d": {}, "n": None}
    test_set = (
        # (database content, push-content, expected database content (None=fails), message)
        (
            example,
            {"d.b.c": 1},
            {"a": [1, "1", 1], "d": {"b": {"c": [1]}}, "n": None},
            "push non existing arrray2",
        ),
        (
            example,
            {"b": 1},
            {"a": [1, "1", 1], "d": {}, "b": [1], "n": None},
            "push non existing arrray3",
        ),
        (
            example,
            {"a.6": 1},
            {"a": [1, "1", 1, None, None, None, [1]], "d": {}, "n": None},
            "push non existing arrray",
        ),
        (
            example,
            {"a": 2},
            {"a": [1, "1", 1, 2], "d": {}, "n": None},
            "push one item",
        ),
        (
            example,
            {"a": {1: 1}},
            {"a": [1, "1", 1, {1: 1}], "d": {}, "n": None},
            "push a dict",
        ),
        (example, {"d": 1}, None, "push over dict"),
        (example, {"n": 1}, None, "push over None"),
    )
    db_men = DbMemory()
    db_men._find = Mock()
    for db_content, push_dict, expected, message in test_set:
        # subTest keeps one failing case from hiding the results of the others.
        with self.subTest(message):
            # Fresh copy per case so no case sees a previous case's changes.
            db_content = deepcopy(db_content)
            db_men._find.return_value = ((0, db_content),)
            if expected is None:
                # msg identifies the case when the exception is NOT raised.
                with self.assertRaises(DbException, msg=message):
                    db_men.set_one(
                        "table", {}, None, fail_on_empty=False, push=push_dict
                    )
            else:
                db_men.set_one("table", {}, None, push=push_dict)
                self.assertEqual(db_content, expected, message)
1067
def test_set_one_push_list(self):
    """Check the ``push_list`` argument of ``DbMemory.set_one``.

    Every case runs against its own deep copy of ``example``. ``_find`` is
    replaced by a mock that returns that copy as its single entry, so after
    calling ``set_one`` the copy is compared with the expected content.
    Cases whose expected content is ``None`` must raise ``DbException``;
    this includes passing a non-list value (``{"a": 1}``) as push_list.
    """
    example = {"a": [1, "1", 1], "d": {}, "n": None}
    test_set = (
        # (database content, push_list-content, expected database content (None=fails), message)
        (
            example,
            {"d.b.c": [1]},
            {"a": [1, "1", 1], "d": {"b": {"c": [1]}}, "n": None},
            "push non existing arrray2",
        ),
        (
            example,
            {"b": [1]},
            {"a": [1, "1", 1], "d": {}, "b": [1], "n": None},
            "push non existing arrray3",
        ),
        (
            example,
            {"a.6": [1]},
            {"a": [1, "1", 1, None, None, None, [1]], "d": {}, "n": None},
            "push non existing arrray",
        ),
        (
            example,
            {"a": [2, 3]},
            {"a": [1, "1", 1, 2, 3], "d": {}, "n": None},
            "push two item",
        ),
        (
            example,
            {"a": [{1: 1}]},
            {"a": [1, "1", 1, {1: 1}], "d": {}, "n": None},
            "push a dict",
        ),
        (example, {"d": [1]}, None, "push over dict"),
        (example, {"n": [1]}, None, "push over None"),
        (example, {"a": 1}, None, "invalid push list non an array"),
    )
    db_men = DbMemory()
    db_men._find = Mock()
    for db_content, push_list, expected, message in test_set:
        # subTest keeps one failing case from hiding the results of the others.
        with self.subTest(message):
            # Fresh copy per case so no case sees a previous case's changes.
            db_content = deepcopy(db_content)
            db_men._find.return_value = ((0, db_content),)
            if expected is None:
                # msg identifies the case when the exception is NOT raised.
                with self.assertRaises(DbException, msg=message):
                    db_men.set_one(
                        "table",
                        {},
                        None,
                        fail_on_empty=False,
                        push_list=push_list,
                    )
            else:
                db_men.set_one("table", {}, None, push_list=push_list)
                self.assertEqual(db_content, expected, message)
1124
def test_unset_one(self):
    """Check the ``unset`` argument of ``DbMemory.set_one``.

    Every case runs against its own deep copy of ``example``. ``_find`` is
    replaced by a mock that returns that copy as its single entry, so after
    calling ``set_one`` the copy is compared with the expected content.
    A case whose expected content is ``None`` would have to raise
    ``DbException``; the current table contains no such case.
    """
    example = {"a": [1, "1", 1], "d": {}, "n": None}
    test_set = (
        # (database content, unset-content, expected database content (None=fails), message)
        (example, {"d.b.c": 1}, example, "unset non existing"),
        (example, {"b": 1}, example, "unset non existing"),
        (example, {"a.6": 1}, example, "unset non existing arrray"),
        (example, {"a": 2}, {"d": {}, "n": None}, "unset array"),
        (example, {"d": 1}, {"a": [1, "1", 1], "n": None}, "unset dict"),
        (example, {"n": 1}, {"a": [1, "1", 1], "d": {}}, "unset None"),
    )
    db_men = DbMemory()
    db_men._find = Mock()
    for db_content, unset_dict, expected, message in test_set:
        # subTest keeps one failing case from hiding the results of the others.
        with self.subTest(message):
            # Fresh copy per case: 'example' itself is reused as an expected value.
            db_content = deepcopy(db_content)
            db_men._find.return_value = ((0, db_content),)
            if expected is None:
                # msg identifies the case when the exception is NOT raised.
                with self.assertRaises(DbException, msg=message):
                    db_men.set_one(
                        "table", {}, None, fail_on_empty=False, unset=unset_dict
                    )
            else:
                db_men.set_one("table", {}, None, unset=unset_dict)
                self.assertEqual(db_content, expected, message)