Code Coverage

Cobertura Coverage Report > osm_common >

dbmongo.py

Trend

Classes100%
 
Lines11%
   
Conditionals100%
 

File Coverage summary

NameClassesLinesConditionals
dbmongo.py
100%
1/1
11%
27/251
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
dbmongo.py
11%
27/251
N/A

Source

osm_common/dbmongo.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Telefonica S.A.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 #    http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17
18
19 1 import logging
20 1 from pymongo import MongoClient, errors
21 1 from osm_common.dbbase import DbException, DbBase
22 1 from http import HTTPStatus
23 1 from time import time, sleep
24 1 from copy import deepcopy
25 1 from base64 import b64decode
26 1 from uuid import uuid4
27
28 1 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
29
30 # TODO consider use this decorator for database access retries
31 # @retry_mongocall
32 # def retry_mongocall(call):
33 #     def _retry_mongocall(*args, **kwargs):
34 #         retry = 1
35 #         while True:
36 #             try:
37 #                 return call(*args, **kwargs)
38 #             except pymongo.AutoReconnect as e:
39 #                 if retry == 4:
40 #                     raise DbException(e)
41 #                 sleep(retry)
42 #     return _retry_mongocall
43
44
45 1 def deep_update(to_update, update_with):
46     """
47     Similar to deepcopy but recursively with nested dictionaries. 'to_update' dict is updated with a content copy of
48     'update_with' dict recursively
49     :param to_update: must be a dictionary to be modified
50     :param update_with: must be a dictionary. It is not changed
51     :return: to_update
52     """
53 0     for key in update_with:
54 0         if key in to_update:
55 0             if isinstance(to_update[key], dict) and isinstance(update_with[key], dict):
56 0                 deep_update(to_update[key], update_with[key])
57 0                 continue
58 0         to_update[key] = deepcopy(update_with[key])
59 0     return to_update
60
61
62 1 class DbMongo(DbBase):
63 1     conn_initial_timout = 120
64 1     conn_timout = 10
65
66 1     def __init__(self, logger_name='db', lock=False):
67 0         super().__init__(logger_name, lock)
68 0         self.client = None
69 0         self.db = None
70 0         self.database_key = None
71 0         self.secret_obtained = False
72         # ^ This is used to know if database serial has been got. Database is inited by NBI, who generates the serial
73         # In case it is not ready when connected, it should be got later on before any decrypt operation
74
75 1     def get_secret_key(self):
76 0         if self.secret_obtained:
77 0             return
78
79 0         self.secret_key = None
80 0         if self.database_key:
81 0             self.set_secret_key(self.database_key)
82 0         version_data = self.get_one("admin", {"_id": "version"}, fail_on_empty=False, fail_on_more=True)
83 0         if version_data and version_data.get("serial"):
84 0             self.set_secret_key(b64decode(version_data["serial"]))
85 0         self.secret_obtained = True
86
87 1     def db_connect(self, config, target_version=None):
88         """
89         Connect to database
90         :param config: Configuration of database
91         :param target_version: if provided it checks if database contains required version, raising exception otherwise.
92         :return: None or raises DbException on error
93         """
94 0         try:
95 0             if "logger_name" in config:
96 0                 self.logger = logging.getLogger(config["logger_name"])
97 0             master_key = config.get("commonkey") or config.get("masterpassword")
98 0             if master_key:
99 0                 self.database_key = master_key
100 0                 self.set_secret_key(master_key)
101 0             if config.get("uri"):
102 0                 self.client = MongoClient(config["uri"], replicaSet=config.get("replicaset", None))
103             else:
104 0                 self.client = MongoClient(config["host"], config["port"], replicaSet=config.get("replicaset", None))
105             # TODO add as parameters also username=config.get("user"), password=config.get("password"))
106             # when all modules are ready
107 0             self.db = self.client[config["name"]]
108 0             if "loglevel" in config:
109 0                 self.logger.setLevel(getattr(logging, config['loglevel']))
110             # get data to try a connection
111 0             now = time()
112 0             while True:
113 0                 try:
114 0                     version_data = self.get_one("admin", {"_id": "version"}, fail_on_empty=False, fail_on_more=True)
115                     # check database status is ok
116 0                     if version_data and version_data.get("status") != 'ENABLED':
117 0                         raise DbException("Wrong database status '{}'".format(version_data.get("status")),
118                                           http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
119                     # check version
120 0                     db_version = None if not version_data else version_data.get("version")
121 0                     if target_version and target_version != db_version:
122 0                         raise DbException("Invalid database version {}. Expected {}".format(db_version, target_version))
123                     # get serial
124 0                     if version_data and version_data.get("serial"):
125 0                         self.secret_obtained = True
126 0                         self.set_secret_key(b64decode(version_data["serial"]))
127 0                     self.logger.info("Connected to database {} version {}".format(config["name"], db_version))
128 0                     return
129 0                 except errors.ConnectionFailure as e:
130 0                     if time() - now >= self.conn_initial_timout:
131 0                         raise
132 0                     self.logger.info("Waiting to database up {}".format(e))
133 0                     sleep(2)
134 0         except errors.PyMongoError as e:
135 0             raise DbException(e)
136
137 1     @staticmethod
138     def _format_filter(q_filter):
139         """
140         Translate query string q_filter into mongo database filter
141         :param q_filter: Query string content. Follows SOL005 section 4.3.2 guidelines, with the follow extensions and
142         differences:
143             It accept ".nq" (not equal) in addition to ".neq".
144             For arrays you can specify index (concrete index must match), nothing (any index may match) or 'ANYINDEX'
145             (two or more matches applies for the same array element). Examples:
146                 with database register: {A: [{B: 1, C: 2}, {B: 6, C: 9}]}
147                 query 'A.B=6' matches because array A contains one element with B equal to 6
148                 query 'A.0.B=6' does no match because index 0 of array A contains B with value 1, but not 6
149                 query 'A.B=6&A.C=2' matches because one element of array matches B=6 and other matchesC=2
150                 query 'A.ANYINDEX.B=6&A.ANYINDEX.C=2' does not match because it is needed the same element of the
151                     array matching both
152
153         Examples of translations from SOL005 to  >> mongo  # comment
154             A=B; A.eq=B         >> A: B             # must contain key A and equal to B or be a list that contains B
155             A.cont=B            >> A: B
156             A=B&A=C; A=B,C      >> A: {$in: [B, C]} # must contain key A and equal to B or C or be a list that contains
157                 # B or C
158             A.cont=B&A.cont=C; A.cont=B,C  >> A: {$in: [B, C]}
159             A.ncont=B           >> A: {$nin: B}     # must not contain key A or if present not equal to B or if a list,
160                 # it must not not contain B
161             A.ncont=B,C; A.ncont=B&A.ncont=C    >> A: {$nin: [B,C]}     # must not contain key A or if present not equal
162                 # neither B nor C; or if a list, it must not contain neither B nor C
163             A.ne=B&A.ne=C; A.ne=B,C             >> A: {$nin: [B, C]}
164             A.gt=B              >> A: {$gt: B}      # must contain key A and greater than B
165             A.ne=B; A.neq=B         >> A: {$ne: B}          # must not contain key A or if present not equal to B, or if
166                 # an array not contain B
167             A.ANYINDEX.B=C          >> A: {$elemMatch: {B=C}
168         :return: database mongo filter
169         """
170 0         try:
171 0             db_filter = {}
172 0             if not q_filter:
173 0                 return db_filter
174 0             for query_k, query_v in q_filter.items():
175 0                 dot_index = query_k.rfind(".")
176 0                 if dot_index > 1 and query_k[dot_index+1:] in ("eq", "ne", "gt", "gte", "lt", "lte", "cont",
177                                                                "ncont", "neq"):
178 0                     operator = "$" + query_k[dot_index + 1:]
179 0                     if operator == "$neq":
180 0                         operator = "$ne"
181 0                     k = query_k[:dot_index]
182                 else:
183 0                     operator = "$eq"
184 0                     k = query_k
185
186 0                 v = query_v
187 0                 if isinstance(v, list):
188 0                     if operator in ("$eq", "$cont"):
189 0                         operator = "$in"
190 0                         v = query_v
191 0                     elif operator in ("$ne", "$ncont"):
192 0                         operator = "$nin"
193 0                         v = query_v
194                     else:
195 0                         v = query_v.join(",")
196
197 0                 if operator in ("$eq", "$cont"):
198                     # v cannot be a comma separated list, because operator would have been changed to $in
199 0                     db_v = v
200 0                 elif operator == "$ncount":
201                     # v cannot be a comma separated list, because operator would have been changed to $nin
202 0                     db_v = {"$ne": v}
203                 else:
204 0                     db_v = {operator: v}
205
206                 # process the ANYINDEX word at k.
207 0                 kleft, _, kright = k.rpartition(".ANYINDEX.")
208 0                 while kleft:
209 0                     k = kleft
210 0                     db_v = {"$elemMatch": {kright: db_v}}
211 0                     kleft, _, kright = k.rpartition(".ANYINDEX.")
212
213                 # insert in db_filter
214                 # maybe db_filter[k] exist. e.g. in the query string for values between 5 and 8: "a.gt=5&a.lt=8"
215 0                 deep_update(db_filter, {k: db_v})
216
217 0             return db_filter
218 0         except Exception as e:
219 0             raise DbException("Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e),
220                               http_code=HTTPStatus.BAD_REQUEST)
221
222 1     def get_list(self, table, q_filter=None):
223         """
224         Obtain a list of entries matching q_filter
225         :param table: collection or table
226         :param q_filter: Filter
227         :return: a list (can be empty) with the found entries. Raises DbException on error
228         """
229 0         try:
230 0             result = []
231 0             with self.lock:
232 0                 collection = self.db[table]
233 0                 db_filter = self._format_filter(q_filter)
234 0                 rows = collection.find(db_filter)
235 0             for row in rows:
236 0                 result.append(row)
237 0             return result
238 0         except DbException:
239 0             raise
240 0         except Exception as e:  # TODO refine
241 0             raise DbException(e)
242
243 1     def count(self, table, q_filter=None):
244         """
245         Count the number of entries matching q_filter
246         :param table: collection or table
247         :param q_filter: Filter
248         :return: number of entries found (can be zero)
249         :raise: DbException on error
250         """
251 0         try:
252 0             with self.lock:
253 0                 collection = self.db[table]
254 0                 db_filter = self._format_filter(q_filter)
255 0                 count = collection.count(db_filter)
256 0             return count
257 0         except DbException:
258 0             raise
259 0         except Exception as e:  # TODO refine
260 0             raise DbException(e)
261
262 1     def get_one(self, table, q_filter=None, fail_on_empty=True, fail_on_more=True):
263         """
264         Obtain one entry matching q_filter
265         :param table: collection or table
266         :param q_filter: Filter
267         :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case
268         it raises a DbException
269         :param fail_on_more: If more than one matches filter it returns one of then unless this flag is set tu True, so
270         that it raises a DbException
271         :return: The requested element, or None
272         """
273 0         try:
274 0             db_filter = self._format_filter(q_filter)
275 0             with self.lock:
276 0                 collection = self.db[table]
277 0                 if not (fail_on_empty and fail_on_more):
278 0                     return collection.find_one(db_filter)
279 0                 rows = collection.find(db_filter)
280 0             if rows.count() == 0:
281 0                 if fail_on_empty:
282 0                     raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
283                                       HTTPStatus.NOT_FOUND)
284 0                 return None
285 0             elif rows.count() > 1:
286 0                 if fail_on_more:
287 0                     raise DbException("Found more than one {} with filter='{}'".format(table[:-1], q_filter),
288                                       HTTPStatus.CONFLICT)
289 0             return rows[0]
290 0         except Exception as e:  # TODO refine
291 0             raise DbException(e)
292
293 1     def del_list(self, table, q_filter=None):
294         """
295         Deletes all entries that match q_filter
296         :param table: collection or table
297         :param q_filter: Filter
298         :return: Dict with the number of entries deleted
299         """
300 0         try:
301 0             with self.lock:
302 0                 collection = self.db[table]
303 0                 rows = collection.delete_many(self._format_filter(q_filter))
304 0             return {"deleted": rows.deleted_count}
305 0         except DbException:
306 0             raise
307 0         except Exception as e:  # TODO refine
308 0             raise DbException(e)
309
310 1     def del_one(self, table, q_filter=None, fail_on_empty=True):
311         """
312         Deletes one entry that matches q_filter
313         :param table: collection or table
314         :param q_filter: Filter
315         :param fail_on_empty: If nothing matches filter it returns '0' deleted unless this flag is set tu True, in
316         which case it raises a DbException
317         :return: Dict with the number of entries deleted
318         """
319 0         try:
320 0             with self.lock:
321 0                 collection = self.db[table]
322 0                 rows = collection.delete_one(self._format_filter(q_filter))
323 0             if rows.deleted_count == 0:
324 0                 if fail_on_empty:
325 0                     raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
326                                       HTTPStatus.NOT_FOUND)
327 0                 return None
328 0             return {"deleted": rows.deleted_count}
329 0         except Exception as e:  # TODO refine
330 0             raise DbException(e)
331
332 1     def create(self, table, indata):
333         """
334         Add a new entry at database
335         :param table: collection or table
336         :param indata: content to be added
337         :return: database id of the inserted element. Raises a DbException on error
338         """
339 0         try:
340 0             with self.lock:
341 0                 collection = self.db[table]
342 0                 data = collection.insert_one(indata)
343 0             return data.inserted_id
344 0         except Exception as e:  # TODO refine
345 0             raise DbException(e)
346
347 1     def create_list(self, table, indata_list):
348         """
349         Add several entries at once
350         :param table: collection or table
351         :param indata_list: content list to be added.
352         :return: the list of inserted '_id's. Exception on error
353         """
354 0         try:
355 0             for item in indata_list:
356 0                 if item.get("_id") is None:
357 0                     item["_id"] = str(uuid4())
358 0             with self.lock:
359 0                 collection = self.db[table]
360 0                 data = collection.insert_many(indata_list)
361 0             return data.inserted_ids
362 0         except Exception as e:  # TODO refine
363 0             raise DbException(e)
364
365 1     def set_one(self, table, q_filter, update_dict, fail_on_empty=True, unset=None, pull=None, push=None,
366                 push_list=None, pull_list=None):
367         """
368         Modifies an entry at database
369         :param table: collection or table
370         :param q_filter: Filter
371         :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
372         :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case
373         it raises a DbException
374         :param unset: Plain dictionary with the content to be removed if exist. It is a dot separated keys, value is
375                       ignored. If not exist, it is ignored
376         :param pull: Plain dictionary with the content to be removed from an array. It is a dot separated keys and value
377                      if exist in the array is removed. If not exist, it is ignored
378         :param pull_list: Same as pull but values are arrays where each item is removed from the array
379         :param push: Plain dictionary with the content to be appended to an array. It is a dot separated keys and value
380                      is appended to the end of the array
381         :param push_list: Same as push but values are arrays where each item is and appended instead of appending the
382                           whole array
383         :return: Dict with the number of entries modified. None if no matching is found.
384         """
385 0         try:
386 0             db_oper = {}
387 0             if update_dict:
388 0                 db_oper["$set"] = update_dict
389 0             if unset:
390 0                 db_oper["$unset"] = unset
391 0             if pull or pull_list:
392 0                 db_oper["$pull"] = pull or {}
393 0                 if pull_list:
394 0                     db_oper["$pull"].update({k: {"$in": v} for k, v in pull_list.items()})
395 0             if push or push_list:
396 0                 db_oper["$push"] = push or {}
397 0                 if push_list:
398 0                     db_oper["$push"].update({k: {"$each": v} for k, v in push_list.items()})
399
400 0             with self.lock:
401 0                 collection = self.db[table]
402 0                 rows = collection.update_one(self._format_filter(q_filter), db_oper)
403 0             if rows.matched_count == 0:
404 0                 if fail_on_empty:
405 0                     raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
406                                       HTTPStatus.NOT_FOUND)
407 0                 return None
408 0             return {"modified": rows.modified_count}
409 0         except Exception as e:  # TODO refine
410 0             raise DbException(e)
411
412 1     def set_list(self, table, q_filter, update_dict, unset=None, pull=None, push=None, push_list=None, pull_list=None):
413         """
414         Modifies al matching entries at database
415         :param table: collection or table
416         :param q_filter: Filter
417         :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
418         :param unset: Plain dictionary with the content to be removed if exist. It is a dot separated keys, value is
419                       ignored. If not exist, it is ignored
420         :param pull: Plain dictionary with the content to be removed from an array. It is a dot separated keys and value
421                      if exist in the array is removed. If not exist, it is ignored
422         :param push: Plain dictionary with the content to be appended to an array. It is a dot separated keys, the
423                      single value is appended to the end of the array
424         :param pull_list: Same as pull but values are arrays where each item is removed from the array
425         :param push_list: Same as push but values are arrays where each item is and appended instead of appending the
426                           whole array
427         :return: Dict with the number of entries modified
428         """
429 0         try:
430 0             db_oper = {}
431 0             if update_dict:
432 0                 db_oper["$set"] = update_dict
433 0             if unset:
434 0                 db_oper["$unset"] = unset
435 0             if pull or pull_list:
436 0                 db_oper["$pull"] = pull or {}
437 0                 if pull_list:
438 0                     db_oper["$pull"].update({k: {"$in": v} for k, v in pull_list.items()})
439 0             if push or push_list:
440 0                 db_oper["$push"] = push or {}
441 0                 if push_list:
442 0                     db_oper["$push"].update({k: {"$each": v} for k, v in push_list.items()})
443 0             with self.lock:
444 0                 collection = self.db[table]
445 0                 rows = collection.update_many(self._format_filter(q_filter), db_oper)
446 0             return {"modified": rows.modified_count}
447 0         except Exception as e:  # TODO refine
448 0             raise DbException(e)
449
450 1     def replace(self, table, _id, indata, fail_on_empty=True):
451         """
452         Replace the content of an entry
453         :param table: collection or table
454         :param _id: internal database id
455         :param indata: content to replace
456         :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case
457         it raises a DbException
458         :return: Dict with the number of entries replaced
459         """
460 0         try:
461 0             db_filter = {"_id": _id}
462 0             with self.lock:
463 0                 collection = self.db[table]
464 0                 rows = collection.replace_one(db_filter, indata)
465 0             if rows.matched_count == 0:
466 0                 if fail_on_empty:
467 0                     raise DbException("Not found any {} with _id='{}'".format(table[:-1], _id), HTTPStatus.NOT_FOUND)
468 0                 return None
469 0             return {"replaced": rows.modified_count}
470 0         except Exception as e:  # TODO refine
471 0             raise DbException(e)