Fixes Bug 1273 - aiokafka>0.7.0 needs extra dependency for python < 3.7.0
[osm/common.git] / osm_common / dbmemory.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Telefonica S.A.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17
18 import logging
19 from osm_common.dbbase import DbException, DbBase
20 from osm_common.dbmongo import deep_update
21 from http import HTTPStatus
22 from uuid import uuid4
23 from copy import deepcopy
24
25 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
26
27
28 class DbMemory(DbBase):
29
30 def __init__(self, logger_name='db', lock=False):
31 super().__init__(logger_name, lock)
32 self.db = {}
33
34 def db_connect(self, config):
35 """
36 Connect to database
37 :param config: Configuration of database
38 :return: None or raises DbException on error
39 """
40 if "logger_name" in config:
41 self.logger = logging.getLogger(config["logger_name"])
42 master_key = config.get("commonkey") or config.get("masterpassword")
43 if master_key:
44 self.set_secret_key(master_key)
45
46 @staticmethod
47 def _format_filter(q_filter):
48 db_filter = {}
49 # split keys with ANYINDEX in this way:
50 # {"A.B.ANYINDEX.C.D.ANYINDEX.E": v } -> {"A.B.ANYINDEX": {"C.D.ANYINDEX": {"E": v}}}
51 if q_filter:
52 for k, v in q_filter.items():
53 db_v = v
54 kleft, _, kright = k.rpartition(".ANYINDEX.")
55 while kleft:
56 k = kleft + ".ANYINDEX"
57 db_v = {kright: db_v}
58 kleft, _, kright = k.rpartition(".ANYINDEX.")
59 deep_update(db_filter, {k: db_v})
60
61 return db_filter
62
63 def _find(self, table, q_filter):
64
65 def recursive_find(key_list, key_next_index, content, oper, target):
66 if key_next_index == len(key_list) or content is None:
67 try:
68 if oper in ("eq", "cont"):
69 if isinstance(target, list):
70 if isinstance(content, list):
71 return any(content_item in target for content_item in content)
72 return content in target
73 elif isinstance(content, list):
74 return target in content
75 else:
76 return content == target
77 elif oper in ("neq", "ne", "ncont"):
78 if isinstance(target, list):
79 if isinstance(content, list):
80 return all(content_item not in target for content_item in content)
81 return content not in target
82 elif isinstance(content, list):
83 return target not in content
84 else:
85 return content != target
86 if oper == "gt":
87 return content > target
88 elif oper == "gte":
89 return content >= target
90 elif oper == "lt":
91 return content < target
92 elif oper == "lte":
93 return content <= target
94 else:
95 raise DbException("Unknown filter operator '{}' in key '{}'".
96 format(oper, ".".join(key_list)), http_code=HTTPStatus.BAD_REQUEST)
97 except TypeError:
98 return False
99
100 elif isinstance(content, dict):
101 return recursive_find(key_list, key_next_index + 1, content.get(key_list[key_next_index]), oper,
102 target)
103 elif isinstance(content, list):
104 look_for_match = True # when there is a match return immediately
105 if (target is None) != (oper in ("neq", "ne", "ncont")): # one True and other False (Xor)
106 look_for_match = False # when there is not a match return immediately
107
108 for content_item in content:
109 if key_list[key_next_index] == "ANYINDEX" and isinstance(v, dict):
110 matches = True
111 for k2, v2 in target.items():
112 k_new_list = k2.split(".")
113 new_operator = "eq"
114 if k_new_list[-1] in ("eq", "ne", "gt", "gte", "lt", "lte", "cont", "ncont", "neq"):
115 new_operator = k_new_list.pop()
116 if not recursive_find(k_new_list, 0, content_item, new_operator, v2):
117 matches = False
118 break
119
120 else:
121 matches = recursive_find(key_list, key_next_index, content_item, oper, target)
122 if matches == look_for_match:
123 return matches
124 if key_list[key_next_index].isdecimal() and int(key_list[key_next_index]) < len(content):
125 matches = recursive_find(key_list, key_next_index + 1, content[int(key_list[key_next_index])],
126 oper, target)
127 if matches == look_for_match:
128 return matches
129 return not look_for_match
130 else: # content is not dict, nor list neither None, so not found
131 if oper in ("neq", "ne", "ncont"):
132 return target is not None
133 else:
134 return target is None
135
136 for i, row in enumerate(self.db.get(table, ())):
137 q_filter = q_filter or {}
138 for k, v in q_filter.items():
139 k_list = k.split(".")
140 operator = "eq"
141 if k_list[-1] in ("eq", "ne", "gt", "gte", "lt", "lte", "cont", "ncont", "neq"):
142 operator = k_list.pop()
143 matches = recursive_find(k_list, 0, row, operator, v)
144 if not matches:
145 break
146 else:
147 # match
148 yield i, row
149
150 def get_list(self, table, q_filter=None):
151 """
152 Obtain a list of entries matching q_filter
153 :param table: collection or table
154 :param q_filter: Filter
155 :return: a list (can be empty) with the found entries. Raises DbException on error
156 """
157 try:
158 result = []
159 with self.lock:
160 for _, row in self._find(table, self._format_filter(q_filter)):
161 result.append(deepcopy(row))
162 return result
163 except DbException:
164 raise
165 except Exception as e: # TODO refine
166 raise DbException(str(e))
167
168 def count(self, table, q_filter=None):
169 """
170 Count the number of entries matching q_filter
171 :param table: collection or table
172 :param q_filter: Filter
173 :return: number of entries found (can be zero)
174 :raise: DbException on error
175 """
176 try:
177 with self.lock:
178 return sum(1 for x in self._find(table, self._format_filter(q_filter)))
179 except DbException:
180 raise
181 except Exception as e: # TODO refine
182 raise DbException(str(e))
183
184 def get_one(self, table, q_filter=None, fail_on_empty=True, fail_on_more=True):
185 """
186 Obtain one entry matching q_filter
187 :param table: collection or table
188 :param q_filter: Filter
189 :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case
190 it raises a DbException
191 :param fail_on_more: If more than one matches filter it returns one of then unless this flag is set tu True, so
192 that it raises a DbException
193 :return: The requested element, or None
194 """
195 try:
196 result = None
197 with self.lock:
198 for _, row in self._find(table, self._format_filter(q_filter)):
199 if not fail_on_more:
200 return deepcopy(row)
201 if result:
202 raise DbException("Found more than one entry with filter='{}'".format(q_filter),
203 HTTPStatus.CONFLICT.value)
204 result = row
205 if not result and fail_on_empty:
206 raise DbException("Not found entry with filter='{}'".format(q_filter), HTTPStatus.NOT_FOUND)
207 return deepcopy(result)
208 except Exception as e: # TODO refine
209 raise DbException(str(e))
210
211 def del_list(self, table, q_filter=None):
212 """
213 Deletes all entries that match q_filter
214 :param table: collection or table
215 :param q_filter: Filter
216 :return: Dict with the number of entries deleted
217 """
218 try:
219 id_list = []
220 with self.lock:
221 for i, _ in self._find(table, self._format_filter(q_filter)):
222 id_list.append(i)
223 deleted = len(id_list)
224 for i in reversed(id_list):
225 del self.db[table][i]
226 return {"deleted": deleted}
227 except DbException:
228 raise
229 except Exception as e: # TODO refine
230 raise DbException(str(e))
231
232 def del_one(self, table, q_filter=None, fail_on_empty=True):
233 """
234 Deletes one entry that matches q_filter
235 :param table: collection or table
236 :param q_filter: Filter
237 :param fail_on_empty: If nothing matches filter it returns '0' deleted unless this flag is set tu True, in
238 which case it raises a DbException
239 :return: Dict with the number of entries deleted
240 """
241 try:
242 with self.lock:
243 for i, _ in self._find(table, self._format_filter(q_filter)):
244 break
245 else:
246 if fail_on_empty:
247 raise DbException("Not found entry with filter='{}'".format(q_filter), HTTPStatus.NOT_FOUND)
248 return None
249 del self.db[table][i]
250 return {"deleted": 1}
251 except Exception as e: # TODO refine
252 raise DbException(str(e))
253
254 def _update(self, db_item, update_dict, unset=None, pull=None, push=None, push_list=None, pull_list=None):
255 """
256 Modifies an entry at database
257 :param db_item: entry of the table to update
258 :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
259 :param unset: Plain dictionary with the content to be removed if exist. It is a dot separated keys, value is
260 ignored. If not exist, it is ignored
261 :param pull: Plain dictionary with the content to be removed from an array. It is a dot separated keys and value
262 if exist in the array is removed. If not exist, it is ignored
263 :param pull_list: Same as pull but values are arrays where each item is removed from the array
264 :param push: Plain dictionary with the content to be appended to an array. It is a dot separated keys and value
265 is appended to the end of the array
266 :param push_list: Same as push but values are arrays where each item is and appended instead of appending the
267 whole array
268 :return: True if database has been changed, False if not; Exception on error
269 """
270 def _iterate_keys(k, db_nested, populate=True):
271 k_list = k.split(".")
272 k_item_prev = k_list[0]
273 populated = False
274 if k_item_prev not in db_nested and populate:
275 populated = True
276 db_nested[k_item_prev] = None
277 for k_item in k_list[1:]:
278 if isinstance(db_nested[k_item_prev], dict):
279 if k_item not in db_nested[k_item_prev]:
280 if not populate:
281 raise DbException("Cannot set '{}', not existing '{}'".format(k, k_item))
282 populated = True
283 db_nested[k_item_prev][k_item] = None
284 elif isinstance(db_nested[k_item_prev], list) and k_item.isdigit():
285 # extend list with Nones if index greater than list
286 k_item = int(k_item)
287 if k_item >= len(db_nested[k_item_prev]):
288 if not populate:
289 raise DbException("Cannot set '{}', index too large '{}'".format(k, k_item))
290 populated = True
291 db_nested[k_item_prev] += [None] * (k_item - len(db_nested[k_item_prev]) + 1)
292 elif db_nested[k_item_prev] is None:
293 if not populate:
294 raise DbException("Cannot set '{}', not existing '{}'".format(k, k_item))
295 populated = True
296 db_nested[k_item_prev] = {k_item: None}
297 else: # number, string, boolean, ... or list but with not integer key
298 raise DbException("Cannot set '{}' on existing '{}={}'".format(k, k_item_prev,
299 db_nested[k_item_prev]))
300 db_nested = db_nested[k_item_prev]
301 k_item_prev = k_item
302 return db_nested, k_item_prev, populated
303
304 updated = False
305 try:
306 if update_dict:
307 for dot_k, v in update_dict.items():
308 dict_to_update, key_to_update, _ = _iterate_keys(dot_k, db_item)
309 dict_to_update[key_to_update] = v
310 updated = True
311 if unset:
312 for dot_k in unset:
313 try:
314 dict_to_update, key_to_update, _ = _iterate_keys(dot_k, db_item, populate=False)
315 del dict_to_update[key_to_update]
316 updated = True
317 except Exception:
318 pass
319 if pull:
320 for dot_k, v in pull.items():
321 try:
322 dict_to_update, key_to_update, _ = _iterate_keys(dot_k, db_item, populate=False)
323 except Exception:
324 continue
325 if key_to_update not in dict_to_update:
326 continue
327 if not isinstance(dict_to_update[key_to_update], list):
328 raise DbException("Cannot pull '{}'. Target is not a list".format(dot_k))
329 while v in dict_to_update[key_to_update]:
330 dict_to_update[key_to_update].remove(v)
331 updated = True
332 if pull_list:
333 for dot_k, v in pull_list.items():
334 if not isinstance(v, list):
335 raise DbException("Invalid content at pull_list, '{}' must be an array".format(dot_k),
336 http_code=HTTPStatus.BAD_REQUEST)
337 try:
338 dict_to_update, key_to_update, _ = _iterate_keys(dot_k, db_item, populate=False)
339 except Exception:
340 continue
341 if key_to_update not in dict_to_update:
342 continue
343 if not isinstance(dict_to_update[key_to_update], list):
344 raise DbException("Cannot pull_list '{}'. Target is not a list".format(dot_k))
345 for single_v in v:
346 while single_v in dict_to_update[key_to_update]:
347 dict_to_update[key_to_update].remove(single_v)
348 updated = True
349 if push:
350 for dot_k, v in push.items():
351 dict_to_update, key_to_update, populated = _iterate_keys(dot_k, db_item)
352 if isinstance(dict_to_update, dict) and key_to_update not in dict_to_update:
353 dict_to_update[key_to_update] = [v]
354 updated = True
355 elif populated and dict_to_update[key_to_update] is None:
356 dict_to_update[key_to_update] = [v]
357 updated = True
358 elif not isinstance(dict_to_update[key_to_update], list):
359 raise DbException("Cannot push '{}'. Target is not a list".format(dot_k))
360 else:
361 dict_to_update[key_to_update].append(v)
362 updated = True
363 if push_list:
364 for dot_k, v in push_list.items():
365 if not isinstance(v, list):
366 raise DbException("Invalid content at push_list, '{}' must be an array".format(dot_k),
367 http_code=HTTPStatus.BAD_REQUEST)
368 dict_to_update, key_to_update, populated = _iterate_keys(dot_k, db_item)
369 if isinstance(dict_to_update, dict) and key_to_update not in dict_to_update:
370 dict_to_update[key_to_update] = v.copy()
371 updated = True
372 elif populated and dict_to_update[key_to_update] is None:
373 dict_to_update[key_to_update] = v.copy()
374 updated = True
375 elif not isinstance(dict_to_update[key_to_update], list):
376 raise DbException("Cannot push '{}'. Target is not a list".format(dot_k),
377 http_code=HTTPStatus.CONFLICT)
378 else:
379 dict_to_update[key_to_update] += v
380 updated = True
381
382 return updated
383 except DbException:
384 raise
385 except Exception as e: # TODO refine
386 raise DbException(str(e))
387
388 def set_one(self, table, q_filter, update_dict, fail_on_empty=True, unset=None, pull=None, push=None,
389 push_list=None, pull_list=None):
390 """
391 Modifies an entry at database
392 :param table: collection or table
393 :param q_filter: Filter
394 :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
395 :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case
396 it raises a DbException
397 :param unset: Plain dictionary with the content to be removed if exist. It is a dot separated keys, value is
398 ignored. If not exist, it is ignored
399 :param pull: Plain dictionary with the content to be removed from an array. It is a dot separated keys and value
400 if exist in the array is removed. If not exist, it is ignored
401 :param pull_list: Same as pull but values are arrays where each item is removed from the array
402 :param push: Plain dictionary with the content to be appended to an array. It is a dot separated keys and value
403 is appended to the end of the array
404 :param push_list: Same as push but values are arrays where each item is and appended instead of appending the
405 whole array
406 :return: Dict with the number of entries modified. None if no matching is found.
407 """
408 with self.lock:
409 for i, db_item in self._find(table, self._format_filter(q_filter)):
410 updated = self._update(db_item, update_dict, unset=unset, pull=pull, push=push, push_list=push_list,
411 pull_list=pull_list)
412 return {"updated": 1 if updated else 0}
413 else:
414 if fail_on_empty:
415 raise DbException("Not found entry with _id='{}'".format(q_filter), HTTPStatus.NOT_FOUND)
416 return None
417
418 def set_list(self, table, q_filter, update_dict, unset=None, pull=None, push=None, push_list=None, pull_list=None):
419 """Modifies al matching entries at database. Same as push. Do not fail if nothing matches"""
420 with self.lock:
421 updated = 0
422 found = 0
423 for _, db_item in self._find(table, self._format_filter(q_filter)):
424 found += 1
425 if self._update(db_item, update_dict, unset=unset, pull=pull, push=push, push_list=push_list,
426 pull_list=pull_list):
427 updated += 1
428 # if not found and fail_on_empty:
429 # raise DbException("Not found entry with '{}'".format(q_filter), HTTPStatus.NOT_FOUND)
430 return {"updated": updated} if found else None
431
432 def replace(self, table, _id, indata, fail_on_empty=True):
433 """
434 Replace the content of an entry
435 :param table: collection or table
436 :param _id: internal database id
437 :param indata: content to replace
438 :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case
439 it raises a DbException
440 :return: Dict with the number of entries replaced
441 """
442 try:
443 with self.lock:
444 for i, _ in self._find(table, self._format_filter({"_id": _id})):
445 break
446 else:
447 if fail_on_empty:
448 raise DbException("Not found entry with _id='{}'".format(_id), HTTPStatus.NOT_FOUND)
449 return None
450 self.db[table][i] = deepcopy(indata)
451 return {"updated": 1}
452 except DbException:
453 raise
454 except Exception as e: # TODO refine
455 raise DbException(str(e))
456
457 def create(self, table, indata):
458 """
459 Add a new entry at database
460 :param table: collection or table
461 :param indata: content to be added
462 :return: database '_id' of the inserted element. Raises a DbException on error
463 """
464 try:
465 id = indata.get("_id")
466 if not id:
467 id = str(uuid4())
468 indata["_id"] = id
469 with self.lock:
470 if table not in self.db:
471 self.db[table] = []
472 self.db[table].append(deepcopy(indata))
473 return id
474 except Exception as e: # TODO refine
475 raise DbException(str(e))
476
477 def create_list(self, table, indata_list):
478 """
479 Add a new entry at database
480 :param table: collection or table
481 :param indata_list: list content to be added
482 :return: list of inserted 'id's. Raises a DbException on error
483 """
484 try:
485 _ids = []
486 with self.lock:
487 for indata in indata_list:
488 _id = indata.get("_id")
489 if not _id:
490 _id = str(uuid4())
491 indata["_id"] = _id
492 with self.lock:
493 if table not in self.db:
494 self.db[table] = []
495 self.db[table].append(deepcopy(indata))
496 _ids.append(_id)
497 return _ids
498 except Exception as e: # TODO refine
499 raise DbException(str(e))
500
501
502 if __name__ == '__main__':
503 # some test code
504 db = DbMemory()
505 db.create("test", {"_id": 1, "data": 1})
506 db.create("test", {"_id": 2, "data": 2})
507 db.create("test", {"_id": 3, "data": 3})
508 print("must be 3 items:", db.get_list("test"))
509 print("must return item 2:", db.get_list("test", {"_id": 2}))
510 db.del_one("test", {"_id": 2})
511 print("must be emtpy:", db.get_list("test", {"_id": 2}))