Code Coverage

Cobertura Coverage Report > osm_common >

dbmongo.py

Trend

File Coverage summary

NameClassesLinesConditionals
dbmongo.py
100%
1/1
13%
33/251
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
dbmongo.py
13%
33/251
N/A

Source

osm_common/dbmongo.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Telefonica S.A.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 #    http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17
18
19 1 from base64 import b64decode
20 1 from copy import deepcopy
21 1 from http import HTTPStatus
22 1 import logging
23 1 from time import sleep, time
24 1 from uuid import uuid4
25
26 1 from osm_common.dbbase import DbBase, DbException
27 1 from pymongo import errors, MongoClient
28
29 1 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
30
31 # TODO consider use this decorator for database access retries
32 # @retry_mongocall
33 # def retry_mongocall(call):
34 #     def _retry_mongocall(*args, **kwargs):
35 #         retry = 1
36 #         while True:
37 #             try:
38 #                 return call(*args, **kwargs)
39 #             except pymongo.AutoReconnect as e:
40 #                 if retry == 4:
41 #                     raise DbException(e)
42 #                 sleep(retry)
43 #     return _retry_mongocall
44
45
46 1 def deep_update(to_update, update_with):
47     """
48     Similar to deepcopy but recursively with nested dictionaries. 'to_update' dict is updated with a content copy of
49     'update_with' dict recursively
50     :param to_update: must be a dictionary to be modified
51     :param update_with: must be a dictionary. It is not changed
52     :return: to_update
53     """
54 0     for key in update_with:
55 0         if key in to_update:
56 0             if isinstance(to_update[key], dict) and isinstance(update_with[key], dict):
57 0                 deep_update(to_update[key], update_with[key])
58 0                 continue
59 0         to_update[key] = deepcopy(update_with[key])
60 0     return to_update
61
62
63 1 class DbMongo(DbBase):
64 1     conn_initial_timout = 120
65 1     conn_timout = 10
66
67 1     def __init__(self, logger_name="db", lock=False):
68 1         super().__init__(logger_name=logger_name, lock=lock)
69 1         self.client = None
70 1         self.db = None
71 1         self.database_key = None
72 1         self.secret_obtained = False
73         # ^ This is used to know if database serial has been got. Database is inited by NBI, who generates the serial
74         # In case it is not ready when connected, it should be got later on before any decrypt operation
75
76 1     def get_secret_key(self):
77 0         if self.secret_obtained:
78 0             return
79
80 0         self.secret_key = None
81 0         if self.database_key:
82 0             self.set_secret_key(self.database_key)
83 0         version_data = self.get_one(
84             "admin", {"_id": "version"}, fail_on_empty=False, fail_on_more=True
85         )
86 0         if version_data and version_data.get("serial"):
87 0             self.set_secret_key(b64decode(version_data["serial"]))
88 0         self.secret_obtained = True
89
90 1     def db_connect(self, config, target_version=None):
91         """
92         Connect to database
93         :param config: Configuration of database
94         :param target_version: if provided it checks if database contains required version, raising exception otherwise.
95         :return: None or raises DbException on error
96         """
97 0         try:
98 0             if "logger_name" in config:
99 0                 self.logger = logging.getLogger(config["logger_name"])
100 0             master_key = config.get("commonkey") or config.get("masterpassword")
101 0             if master_key:
102 0                 self.database_key = master_key
103 0                 self.set_secret_key(master_key)
104 0             if config.get("uri"):
105 0                 self.client = MongoClient(
106                     config["uri"], replicaSet=config.get("replicaset", None)
107                 )
108             # when all modules are ready
109 0             self.db = self.client[config["name"]]
110 0             if "loglevel" in config:
111 0                 self.logger.setLevel(getattr(logging, config["loglevel"]))
112             # get data to try a connection
113 0             now = time()
114 0             while True:
115 0                 try:
116 0                     version_data = self.get_one(
117                         "admin",
118                         {"_id": "version"},
119                         fail_on_empty=False,
120                         fail_on_more=True,
121                     )
122                     # check database status is ok
123 0                     if version_data and version_data.get("status") != "ENABLED":
124 0                         raise DbException(
125                             "Wrong database status '{}'".format(
126                                 version_data.get("status")
127                             ),
128                             http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
129                         )
130                     # check version
131 0                     db_version = (
132                         None if not version_data else version_data.get("version")
133                     )
134 0                     if target_version and target_version != db_version:
135 0                         raise DbException(
136                             "Invalid database version {}. Expected {}".format(
137                                 db_version, target_version
138                             )
139                         )
140                     # get serial
141 0                     if version_data and version_data.get("serial"):
142 0                         self.secret_obtained = True
143 0                         self.set_secret_key(b64decode(version_data["serial"]))
144 0                     self.logger.info(
145                         "Connected to database {} version {}".format(
146                             config["name"], db_version
147                         )
148                     )
149 0                     return
150 0                 except errors.ConnectionFailure as e:
151 0                     if time() - now >= self.conn_initial_timout:
152 0                         raise
153 0                     self.logger.info("Waiting to database up {}".format(e))
154 0                     sleep(2)
155 0         except errors.PyMongoError as e:
156 0             raise DbException(e)
157
158 1     @staticmethod
159 1     def _format_filter(q_filter):
160         """
161         Translate query string q_filter into mongo database filter
162         :param q_filter: Query string content. Follows SOL005 section 4.3.2 guidelines, with the follow extensions and
163         differences:
164             It accept ".nq" (not equal) in addition to ".neq".
165             For arrays you can specify index (concrete index must match), nothing (any index may match) or 'ANYINDEX'
166             (two or more matches applies for the same array element). Examples:
167                 with database register: {A: [{B: 1, C: 2}, {B: 6, C: 9}]}
168                 query 'A.B=6' matches because array A contains one element with B equal to 6
169                 query 'A.0.B=6' does no match because index 0 of array A contains B with value 1, but not 6
170                 query 'A.B=6&A.C=2' matches because one element of array matches B=6 and other matchesC=2
171                 query 'A.ANYINDEX.B=6&A.ANYINDEX.C=2' does not match because it is needed the same element of the
172                     array matching both
173
174         Examples of translations from SOL005 to  >> mongo  # comment
175             A=B; A.eq=B         >> A: B             # must contain key A and equal to B or be a list that contains B
176             A.cont=B            >> A: B
177             A=B&A=C; A=B,C      >> A: {$in: [B, C]} # must contain key A and equal to B or C or be a list that contains
178                 # B or C
179             A.cont=B&A.cont=C; A.cont=B,C  >> A: {$in: [B, C]}
180             A.ncont=B           >> A: {$nin: B}     # must not contain key A or if present not equal to B or if a list,
181                 # it must not not contain B
182             A.ncont=B,C; A.ncont=B&A.ncont=C    >> A: {$nin: [B,C]}     # must not contain key A or if present not equal
183                 # neither B nor C; or if a list, it must not contain neither B nor C
184             A.ne=B&A.ne=C; A.ne=B,C             >> A: {$nin: [B, C]}
185             A.gt=B              >> A: {$gt: B}      # must contain key A and greater than B
186             A.ne=B; A.neq=B         >> A: {$ne: B}          # must not contain key A or if present not equal to B, or if
187                 # an array not contain B
188             A.ANYINDEX.B=C          >> A: {$elemMatch: {B=C}
189         :return: database mongo filter
190         """
191 0         try:
192 0             db_filter = {}
193 0             if not q_filter:
194 0                 return db_filter
195 0             for query_k, query_v in q_filter.items():
196 0                 dot_index = query_k.rfind(".")
197 0                 if dot_index > 1 and query_k[dot_index + 1 :] in (
198                     "eq",
199                     "ne",
200                     "gt",
201                     "gte",
202                     "lt",
203                     "lte",
204                     "cont",
205                     "ncont",
206                     "neq",
207                 ):
208 0                     operator = "$" + query_k[dot_index + 1 :]
209 0                     if operator == "$neq":
210 0                         operator = "$ne"
211 0                     k = query_k[:dot_index]
212                 else:
213 0                     operator = "$eq"
214 0                     k = query_k
215
216 0                 v = query_v
217 0                 if isinstance(v, list):
218 0                     if operator in ("$eq", "$cont"):
219 0                         operator = "$in"
220 0                         v = query_v
221 0                     elif operator in ("$ne", "$ncont"):
222 0                         operator = "$nin"
223 0                         v = query_v
224                     else:
225 0                         v = query_v.join(",")
226
227 0                 if operator in ("$eq", "$cont"):
228                     # v cannot be a comma separated list, because operator would have been changed to $in
229 0                     db_v = v
230 0                 elif operator == "$ncount":
231                     # v cannot be a comma separated list, because operator would have been changed to $nin
232 0                     db_v = {"$ne": v}
233                 else:
234 0                     db_v = {operator: v}
235
236                 # process the ANYINDEX word at k.
237 0                 kleft, _, kright = k.rpartition(".ANYINDEX.")
238 0                 while kleft:
239 0                     k = kleft
240 0                     db_v = {"$elemMatch": {kright: db_v}}
241 0                     kleft, _, kright = k.rpartition(".ANYINDEX.")
242
243                 # insert in db_filter
244                 # maybe db_filter[k] exist. e.g. in the query string for values between 5 and 8: "a.gt=5&a.lt=8"
245 0                 deep_update(db_filter, {k: db_v})
246
247 0             return db_filter
248 0         except Exception as e:
249 0             raise DbException(
250                 "Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e),
251                 http_code=HTTPStatus.BAD_REQUEST,
252             )
253
254 1     def get_list(self, table, q_filter=None):
255         """
256         Obtain a list of entries matching q_filter
257         :param table: collection or table
258         :param q_filter: Filter
259         :return: a list (can be empty) with the found entries. Raises DbException on error
260         """
261 0         try:
262 0             result = []
263 0             with self.lock:
264 0                 collection = self.db[table]
265 0                 db_filter = self._format_filter(q_filter)
266 0                 rows = collection.find(db_filter)
267 0             for row in rows:
268 0                 result.append(row)
269 0             return result
270 0         except DbException:
271 0             raise
272 0         except Exception as e:  # TODO refine
273 0             raise DbException(e)
274
275 1     def count(self, table, q_filter=None):
276         """
277         Count the number of entries matching q_filter
278         :param table: collection or table
279         :param q_filter: Filter
280         :return: number of entries found (can be zero)
281         :raise: DbException on error
282         """
283 0         try:
284 0             with self.lock:
285 0                 collection = self.db[table]
286 0                 db_filter = self._format_filter(q_filter)
287 0                 count = collection.count_documents(db_filter)
288 0             return count
289 0         except DbException:
290 0             raise
291 0         except Exception as e:  # TODO refine
292 0             raise DbException(e)
293
294 1     def get_one(self, table, q_filter=None, fail_on_empty=True, fail_on_more=True):
295         """
296         Obtain one entry matching q_filter
297         :param table: collection or table
298         :param q_filter: Filter
299         :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case
300         it raises a DbException
301         :param fail_on_more: If more than one matches filter it returns one of then unless this flag is set tu True, so
302         that it raises a DbException
303         :return: The requested element, or None
304         """
305 0         try:
306 0             db_filter = self._format_filter(q_filter)
307 0             with self.lock:
308 0                 collection = self.db[table]
309 0                 if not (fail_on_empty and fail_on_more):
310 0                     return collection.find_one(db_filter)
311 0                 rows = list(collection.find(db_filter))
312 0             if len(rows) == 0:
313 0                 if fail_on_empty:
314 0                     raise DbException(
315                         "Not found any {} with filter='{}'".format(
316                             table[:-1], q_filter
317                         ),
318                         HTTPStatus.NOT_FOUND,
319                     )
320
321 0                 return None
322 0             elif len(rows) > 1:
323 0                 if fail_on_more:
324 0                     raise DbException(
325                         "Found more than one {} with filter='{}'".format(
326                             table[:-1], q_filter
327                         ),
328                         HTTPStatus.CONFLICT,
329                     )
330 0             return rows[0]
331 0         except Exception as e:  # TODO refine
332 0             raise DbException(e)
333
334 1     def del_list(self, table, q_filter=None):
335         """
336         Deletes all entries that match q_filter
337         :param table: collection or table
338         :param q_filter: Filter
339         :return: Dict with the number of entries deleted
340         """
341 0         try:
342 0             with self.lock:
343 0                 collection = self.db[table]
344 0                 rows = collection.delete_many(self._format_filter(q_filter))
345 0             return {"deleted": rows.deleted_count}
346 0         except DbException:
347 0             raise
348 0         except Exception as e:  # TODO refine
349 0             raise DbException(e)
350
351 1     def del_one(self, table, q_filter=None, fail_on_empty=True):
352         """
353         Deletes one entry that matches q_filter
354         :param table: collection or table
355         :param q_filter: Filter
356         :param fail_on_empty: If nothing matches filter it returns '0' deleted unless this flag is set tu True, in
357         which case it raises a DbException
358         :return: Dict with the number of entries deleted
359         """
360 0         try:
361 0             with self.lock:
362 0                 collection = self.db[table]
363 0                 rows = collection.delete_one(self._format_filter(q_filter))
364 0             if rows.deleted_count == 0:
365 0                 if fail_on_empty:
366 0                     raise DbException(
367                         "Not found any {} with filter='{}'".format(
368                             table[:-1], q_filter
369                         ),
370                         HTTPStatus.NOT_FOUND,
371                     )
372 0                 return None
373 0             return {"deleted": rows.deleted_count}
374 0         except Exception as e:  # TODO refine
375 0             raise DbException(e)
376
377 1     def create(self, table, indata):
378         """
379         Add a new entry at database
380         :param table: collection or table
381         :param indata: content to be added
382         :return: database id of the inserted element. Raises a DbException on error
383         """
384 0         try:
385 0             with self.lock:
386 0                 collection = self.db[table]
387 0                 data = collection.insert_one(indata)
388 0             return data.inserted_id
389 0         except Exception as e:  # TODO refine
390 0             raise DbException(e)
391
392 1     def create_list(self, table, indata_list):
393         """
394         Add several entries at once
395         :param table: collection or table
396         :param indata_list: content list to be added.
397         :return: the list of inserted '_id's. Exception on error
398         """
399 0         try:
400 0             for item in indata_list:
401 0                 if item.get("_id") is None:
402 0                     item["_id"] = str(uuid4())
403 0             with self.lock:
404 0                 collection = self.db[table]
405 0                 data = collection.insert_many(indata_list)
406 0             return data.inserted_ids
407 0         except Exception as e:  # TODO refine
408 0             raise DbException(e)
409
410 1     def set_one(
411         self,
412         table,
413         q_filter,
414         update_dict,
415         fail_on_empty=True,
416         unset=None,
417         pull=None,
418         push=None,
419         push_list=None,
420         pull_list=None,
421         upsert=False,
422     ):
423         """
424         Modifies an entry at database
425         :param table: collection or table
426         :param q_filter: Filter
427         :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
428         :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
429         it raises a DbException
430         :param unset: Plain dictionary with the content to be removed if exist. It is a dot separated keys, value is
431                       ignored. If not exist, it is ignored
432         :param pull: Plain dictionary with the content to be removed from an array. It is a dot separated keys and value
433                      if exist in the array is removed. If not exist, it is ignored
434         :param pull_list: Same as pull but values are arrays where each item is removed from the array
435         :param push: Plain dictionary with the content to be appended to an array. It is a dot separated keys and value
436                      is appended to the end of the array
437         :param push_list: Same as push but values are arrays where each item is and appended instead of appending the
438                           whole array
439         :param upsert: If this parameter is set to True and no document is found using 'q_filter' it will be created.
440                      By default this is false.
441         :return: Dict with the number of entries modified. None if no matching is found.
442         """
443 0         try:
444 0             db_oper = {}
445 0             if update_dict:
446 0                 db_oper["$set"] = update_dict
447 0             if unset:
448 0                 db_oper["$unset"] = unset
449 0             if pull or pull_list:
450 0                 db_oper["$pull"] = pull or {}
451 0                 if pull_list:
452 0                     db_oper["$pull"].update(
453                         {k: {"$in": v} for k, v in pull_list.items()}
454                     )
455 0             if push or push_list:
456 0                 db_oper["$push"] = push or {}
457 0                 if push_list:
458 0                     db_oper["$push"].update(
459                         {k: {"$each": v} for k, v in push_list.items()}
460                     )
461
462 0             with self.lock:
463 0                 collection = self.db[table]
464 0                 rows = collection.update_one(
465                     self._format_filter(q_filter), db_oper, upsert=upsert
466                 )
467 0             if rows.matched_count == 0:
468 0                 if fail_on_empty:
469 0                     raise DbException(
470                         "Not found any {} with filter='{}'".format(
471                             table[:-1], q_filter
472                         ),
473                         HTTPStatus.NOT_FOUND,
474                     )
475 0                 return None
476 0             return {"modified": rows.modified_count}
477 0         except Exception as e:  # TODO refine
478 0             raise DbException(e)
479
480 1     def set_list(
481         self,
482         table,
483         q_filter,
484         update_dict,
485         unset=None,
486         pull=None,
487         push=None,
488         push_list=None,
489         pull_list=None,
490     ):
491         """
492         Modifies al matching entries at database
493         :param table: collection or table
494         :param q_filter: Filter
495         :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
496         :param unset: Plain dictionary with the content to be removed if exist. It is a dot separated keys, value is
497                       ignored. If not exist, it is ignored
498         :param pull: Plain dictionary with the content to be removed from an array. It is a dot separated keys and value
499                      if exist in the array is removed. If not exist, it is ignored
500         :param push: Plain dictionary with the content to be appended to an array. It is a dot separated keys, the
501                      single value is appended to the end of the array
502         :param pull_list: Same as pull but values are arrays where each item is removed from the array
503         :param push_list: Same as push but values are arrays where each item is and appended instead of appending the
504                           whole array
505         :return: Dict with the number of entries modified
506         """
507 0         try:
508 0             db_oper = {}
509 0             if update_dict:
510 0                 db_oper["$set"] = update_dict
511 0             if unset:
512 0                 db_oper["$unset"] = unset
513 0             if pull or pull_list:
514 0                 db_oper["$pull"] = pull or {}
515 0                 if pull_list:
516 0                     db_oper["$pull"].update(
517                         {k: {"$in": v} for k, v in pull_list.items()}
518                     )
519 0             if push or push_list:
520 0                 db_oper["$push"] = push or {}
521 0                 if push_list:
522 0                     db_oper["$push"].update(
523                         {k: {"$each": v} for k, v in push_list.items()}
524                     )
525 0             with self.lock:
526 0                 collection = self.db[table]
527 0                 rows = collection.update_many(self._format_filter(q_filter), db_oper)
528 0             return {"modified": rows.modified_count}
529 0         except Exception as e:  # TODO refine
530 0             raise DbException(e)
531
532 1     def replace(self, table, _id, indata, fail_on_empty=True):
533         """
534         Replace the content of an entry
535         :param table: collection or table
536         :param _id: internal database id
537         :param indata: content to replace
538         :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case
539         it raises a DbException
540         :return: Dict with the number of entries replaced
541         """
542 0         try:
543 0             db_filter = {"_id": _id}
544 0             with self.lock:
545 0                 collection = self.db[table]
546 0                 rows = collection.replace_one(db_filter, indata)
547 0             if rows.matched_count == 0:
548 0                 if fail_on_empty:
549 0                     raise DbException(
550                         "Not found any {} with _id='{}'".format(table[:-1], _id),
551                         HTTPStatus.NOT_FOUND,
552                     )
553 0                 return None
554 0             return {"replaced": rows.modified_count}
555 0         except Exception as e:  # TODO refine
556 0             raise DbException(e)