b0686aa713fa12c4cded0da81dfb0ce81a6b2b40
[osm/common.git] / osm_common / dbmongo.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Telefonica S.A.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17
18
19 import logging
20 from pymongo import MongoClient, errors
21 from osm_common.dbbase import DbException, DbBase
22 from http import HTTPStatus
23 from time import time, sleep
24 from copy import deepcopy
25 from base64 import b64decode
26 from uuid import uuid4
27
28 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
29
30 # TODO consider use this decorator for database access retries
31 # @retry_mongocall
32 # def retry_mongocall(call):
33 # def _retry_mongocall(*args, **kwargs):
34 # retry = 1
35 # while True:
36 # try:
37 # return call(*args, **kwargs)
38 # except pymongo.AutoReconnect as e:
39 # if retry == 4:
40 # raise DbException(e)
41 # sleep(retry)
42 # return _retry_mongocall
43
44
def deep_update(to_update, update_with):
    """
    Recursively merge 'update_with' into 'to_update', like dict.update() but descending into nested dictionaries.
    When both sides hold a dictionary under the same key, that sub-dictionary is merged in place; in any other case
    a deep copy of the value from 'update_with' is stored, so no object of 'update_with' ends up shared.
    :param to_update: must be a dictionary. It is modified in place
    :param update_with: must be a dictionary. It is not changed
    :return: to_update (the same object received)
    """
    for key, new_value in update_with.items():
        current_value = to_update.get(key)
        if isinstance(current_value, dict) and isinstance(new_value, dict):
            # both sides are dictionaries: merge level by level instead of replacing the whole sub-dictionary
            deep_update(current_value, new_value)
        else:
            to_update[key] = deepcopy(new_value)
    return to_update
60
61
class DbMongo(DbBase):
    """
    MongoDB implementation of DbBase.

    Each data-access method runs its pymongo calls inside 'self.lock' and reports failures as DbException.
    """
    conn_initial_timout = 120  # seconds that db_connect keeps retrying while the database is not reachable
    conn_timout = 10

    def __init__(self, logger_name='db', lock=False):
        super().__init__(logger_name, lock)
        self.client = None
        self.db = None
        self.database_key = None  # master key taken from config 'commonkey' or 'masterpassword' at db_connect
        self.secret_obtained = False
        # ^ This is used to know if database serial has been got. Database is inited by NBI, who generates the serial
        # In case it is not ready when connected, it should be got later on before any decrypt operation

    def get_secret_key(self):
        """
        Build the secret key from the configured master key plus the database serial, unless already obtained.
        The serial is read from the 'serial' field (base64) of the document {_id: "version"} of the 'admin'
        collection.
        :return: None
        """
        if self.secret_obtained:
            return

        # start from scratch: the key is rebuilt from the master key and, if present, the database serial
        self.secret_key = None
        if self.database_key:
            self.set_secret_key(self.database_key)
        version_data = self.get_one("admin", {"_id": "version"}, fail_on_empty=False, fail_on_more=True)
        if version_data and version_data.get("serial"):
            self.set_secret_key(b64decode(version_data["serial"]))
        # NOTE(review): the flag is set even when no serial was found, so a serial written later is not picked up
        # by this method; confirm that is intended
        self.secret_obtained = True

    def _wait_for_version_data(self):
        """
        Read the document {_id: "version"} of the 'admin' collection, retrying while the database is unreachable.
        A retry is done every 2 seconds until 'conn_initial_timout' seconds have passed since the first attempt.
        :return: the version document, or None if it does not exist
        :raise: DbException on a non-connection error, or when the timeout expires
        """
        start = time()
        while True:
            try:
                return self.get_one("admin", {"_id": "version"}, fail_on_empty=False, fail_on_more=True)
            except DbException as e:
                # get_one wraps every driver error into a DbException chained to the original one; only connection
                # failures (database not up yet) are worth retrying
                connection_failed = isinstance(e.__cause__, errors.ConnectionFailure)
                if not connection_failed or time() - start >= self.conn_initial_timout:
                    raise
                self.logger.info("Waiting to database up {}".format(e.__cause__))
                sleep(2)

    def db_connect(self, config, target_version=None):
        """
        Connect to database
        :param config: Configuration of database. Keys read here: 'uri' (or 'host' and 'port'), 'name', and the
            optional 'logger_name', 'loglevel', 'commonkey'/'masterpassword'
        :param target_version: if provided it checks if database contains required version, raising exception otherwise.
        :return: None or raises DbException on error
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            master_key = config.get("commonkey") or config.get("masterpassword")
            if master_key:
                self.database_key = master_key
                self.set_secret_key(master_key)
            if config.get("uri"):
                self.client = MongoClient(config["uri"])
            else:
                self.client = MongoClient(config["host"], config["port"])
            # TODO add as parameters also username=config.get("user"), password=config.get("password"))
            # when all modules are ready
            self.db = self.client[config["name"]]
            if "loglevel" in config:
                self.logger.setLevel(getattr(logging, config['loglevel']))
            # get data to try a connection, waiting for the database to be up
            version_data = self._wait_for_version_data()
            # check database status is ok
            if version_data and version_data.get("status") != 'ENABLED':
                raise DbException("Wrong database status '{}'".format(version_data.get("status")),
                                  http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
            # check version
            db_version = None if not version_data else version_data.get("version")
            if target_version and target_version != db_version:
                raise DbException("Invalid database version {}. Expected {}".format(db_version, target_version))
            # get serial
            if version_data and version_data.get("serial"):
                self.secret_obtained = True
                self.set_secret_key(b64decode(version_data["serial"]))
            self.logger.info("Connected to database {} version {}".format(config["name"], db_version))
        except errors.PyMongoError as e:
            raise DbException(e) from e

    @staticmethod
    def _format_filter(q_filter):
        """
        Translate query string q_filter into mongo database filter
        :param q_filter: Query string content. Follows SOL005 section 4.3.2 guidelines, with the following extensions
        and differences:
            It accepts ".nq" (not equal) in addition to ".neq".
            For arrays you can specify index (concrete index must match), nothing (any index may match) or 'ANYINDEX'
            (two or more matches apply to the same array element). Examples:
                with database register: {A: [{B: 1, C: 2}, {B: 6, C: 9}]}
                query 'A.B=6' matches because array A contains one element with B equal to 6
                query 'A.0.B=6' does not match because index 0 of array A contains B with value 1, but not 6
                query 'A.B=6&A.C=2' matches because one element of array matches B=6 and another matches C=2
                query 'A.ANYINDEX.B=6&A.ANYINDEX.C=2' does not match because the same element of the array must
                    match both

        Examples of translations from SOL005 to >> mongo  # comment
            A=B; A.eq=B         >> A: B             # must contain key A and equal to B or be a list that contains B
            A.cont=B            >> A: B
            A=B&A=C; A=B,C      >> A: {$in: [B, C]} # must contain key A and equal to B or C or be a list that contains
                # B or C
            A.cont=B&A.cont=C; A.cont=B,C  >> A: {$in: [B, C]}
            A.ncont=B           >> A: {$ne: B}      # must not contain key A or if present not equal to B or if a list,
                # it must not contain B
            A.ncont=B,C; A.ncont=B&A.ncont=C    >> A: {$nin: [B,C]}  # must not contain key A or if present equal to
                # neither B nor C; or if a list, it must contain neither B nor C
            A.ne=B&A.ne=C; A.ne=B,C             >> A: {$nin: [B, C]}
            A.gt=B              >> A: {$gt: B}      # must contain key A and greater than B
            A.ne=B; A.neq=B         >> A: {$ne: B}          # must not contain key A or if present not equal to B, or if
                # an array not contain B
            A.ANYINDEX.B=C          >> A: {$elemMatch: {B: C}}
        :return: database mongo filter
        :raise: DbException with BAD_REQUEST if the filter cannot be translated
        """
        # bound before the loop so that the error message below can always be built
        query_k = v = None
        try:
            db_filter = {}
            if not q_filter:
                return db_filter
            for query_k, query_v in q_filter.items():
                dot_index = query_k.rfind(".")
                # an operator suffix needs at least one character of key before the dot, e.g. "A.gt"
                if dot_index > 0 and query_k[dot_index + 1:] in ("eq", "ne", "gt", "gte", "lt", "lte", "cont",
                                                                 "ncont", "neq"):
                    operator = "$" + query_k[dot_index + 1:]
                    if operator == "$neq":
                        operator = "$ne"
                    k = query_k[:dot_index]
                else:
                    operator = "$eq"
                    k = query_k

                v = query_v
                if isinstance(v, list):
                    if operator in ("$eq", "$cont"):
                        operator = "$in"
                    elif operator in ("$ne", "$ncont"):
                        operator = "$nin"
                    else:
                        # the remaining operators take a single value: rebuild the comma separated string
                        v = ",".join(query_v)

                if operator in ("$eq", "$cont"):
                    # v cannot be a list, because operator would have been changed to $in
                    db_v = v
                elif operator == "$ncont":
                    # v cannot be a list, because operator would have been changed to $nin
                    db_v = {"$ne": v}
                else:
                    db_v = {operator: v}

                # process the ANYINDEX word at k, innermost (rightmost) first
                kleft, _, kright = k.rpartition(".ANYINDEX.")
                while kleft:
                    k = kleft
                    db_v = {"$elemMatch": {kright: db_v}}
                    kleft, _, kright = k.rpartition(".ANYINDEX.")

                # insert in db_filter
                # maybe db_filter[k] exist. e.g. in the query string for values between 5 and 8: "a.gt=5&a.lt=8"
                deep_update(db_filter, {k: db_v})

            return db_filter
        except Exception as e:
            raise DbException("Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e),
                              http_code=HTTPStatus.BAD_REQUEST) from e

    @staticmethod
    def _build_update_operations(update_dict, unset, pull, push, pull_list, push_list):
        """
        Build the mongo update document shared by set_one and set_list.
        The caller's dictionaries are not modified: 'pull' and 'push' are copied before 'pull_list'/'push_list'
        entries are merged into them.
        :return: dict with the needed '$set', '$unset', '$pull' and '$push' entries (may be empty)
        """
        db_oper = {}
        if update_dict:
            db_oper["$set"] = update_dict
        if unset:
            db_oper["$unset"] = unset
        if pull or pull_list:
            db_oper["$pull"] = dict(pull or {})
            if pull_list:
                db_oper["$pull"].update({k: {"$in": v} for k, v in pull_list.items()})
        if push or push_list:
            db_oper["$push"] = dict(push or {})
            if push_list:
                db_oper["$push"].update({k: {"$each": v} for k, v in push_list.items()})
        return db_oper

    def get_list(self, table, q_filter=None):
        """
        Obtain a list of entries matching q_filter
        :param table: collection or table
        :param q_filter: Filter
        :return: a list (can be empty) with the found entries. Raises DbException on error
        """
        try:
            with self.lock:
                collection = self.db[table]
                db_filter = self._format_filter(q_filter)
                return list(collection.find(db_filter))
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def count(self, table, q_filter=None):
        """
        Count the number of entries matching q_filter
        :param table: collection or table
        :param q_filter: Filter
        :return: number of entries found (can be zero)
        :raise: DbException on error
        """
        try:
            with self.lock:
                collection = self.db[table]
                db_filter = self._format_filter(q_filter)
                # NOTE(review): Collection.count() is deprecated since pymongo 3.7 and removed in 4.0
                # (count_documents() replaces it); confirm the pinned pymongo version before switching
                count = collection.count(db_filter)
            return count
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def get_one(self, table, q_filter=None, fail_on_empty=True, fail_on_more=True):
        """
        Obtain one entry matching q_filter
        :param table: collection or table
        :param q_filter: Filter
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
            it raises a DbException (NOT_FOUND)
        :param fail_on_more: If more than one matches filter it returns one of them unless this flag is set to True, so
            that it raises a DbException (CONFLICT)
        :return: The requested element, or None
        """
        try:
            db_filter = self._format_filter(q_filter)
            with self.lock:
                collection = self.db[table]
                if not fail_on_empty and not fail_on_more:
                    return collection.find_one(db_filter)
                # two documents are enough to distinguish "none", "exactly one" and "more than one"
                rows = list(collection.find(db_filter).limit(2))
            if not rows:
                if fail_on_empty:
                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
                                      HTTPStatus.NOT_FOUND)
                return None
            if len(rows) > 1 and fail_on_more:
                raise DbException("Found more than one {} with filter='{}'".format(table[:-1], q_filter),
                                  HTTPStatus.CONFLICT)
            return rows[0]
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def del_list(self, table, q_filter=None):
        """
        Deletes all entries that match q_filter
        :param table: collection or table
        :param q_filter: Filter
        :return: Dict with the number of entries deleted
        """
        try:
            with self.lock:
                collection = self.db[table]
                rows = collection.delete_many(self._format_filter(q_filter))
            return {"deleted": rows.deleted_count}
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def del_one(self, table, q_filter=None, fail_on_empty=True):
        """
        Deletes one entry that matches q_filter
        :param table: collection or table
        :param q_filter: Filter
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in
            which case it raises a DbException (NOT_FOUND)
        :return: Dict with the number of entries deleted, or None
        """
        try:
            with self.lock:
                collection = self.db[table]
                rows = collection.delete_one(self._format_filter(q_filter))
            if rows.deleted_count == 0:
                if fail_on_empty:
                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
                                      HTTPStatus.NOT_FOUND)
                return None
            return {"deleted": rows.deleted_count}
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def create(self, table, indata):
        """
        Add a new entry at database
        :param table: collection or table
        :param indata: content to be added
        :return: database id of the inserted element. Raises a DbException on error
        """
        try:
            with self.lock:
                collection = self.db[table]
                data = collection.insert_one(indata)
            return data.inserted_id
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def create_list(self, table, indata_list):
        """
        Add several entries at once
        :param table: collection or table
        :param indata_list: content list to be added. Items without '_id' get a uuid4 string '_id' (set in place)
        :return: the list of inserted '_id's. Exception on error
        """
        try:
            for item in indata_list:
                if item.get("_id") is None:
                    item["_id"] = str(uuid4())
            with self.lock:
                collection = self.db[table]
                data = collection.insert_many(indata_list)
            return data.inserted_ids
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def set_one(self, table, q_filter, update_dict, fail_on_empty=True, unset=None, pull=None, push=None,
                push_list=None, pull_list=None):
        """
        Modifies an entry at database
        :param table: collection or table
        :param q_filter: Filter
        :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
            it raises a DbException (NOT_FOUND)
        :param unset: Plain dictionary with the content to be removed if exist. It is a dot separated keys, value is
            ignored. If not exist, it is ignored
        :param pull: Plain dictionary with the content to be removed from an array. It is a dot separated keys and value
            if exist in the array is removed. If not exist, it is ignored
        :param pull_list: Same as pull but values are arrays where each item is removed from the array
        :param push: Plain dictionary with the content to be appended to an array. It is a dot separated keys and value
            is appended to the end of the array
        :param push_list: Same as push but values are arrays where each item is appended instead of appending the
            whole array
        :return: Dict with the number of entries modified. None if no matching is found.
        """
        try:
            db_oper = self._build_update_operations(update_dict, unset, pull, push, pull_list, push_list)
            with self.lock:
                collection = self.db[table]
                rows = collection.update_one(self._format_filter(q_filter), db_oper)
            if rows.matched_count == 0:
                if fail_on_empty:
                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
                                      HTTPStatus.NOT_FOUND)
                return None
            return {"modified": rows.modified_count}
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def set_list(self, table, q_filter, update_dict, unset=None, pull=None, push=None, push_list=None, pull_list=None):
        """
        Modifies all matching entries at database
        :param table: collection or table
        :param q_filter: Filter
        :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
        :param unset: Plain dictionary with the content to be removed if exist. It is a dot separated keys, value is
            ignored. If not exist, it is ignored
        :param pull: Plain dictionary with the content to be removed from an array. It is a dot separated keys and value
            if exist in the array is removed. If not exist, it is ignored
        :param push: Plain dictionary with the content to be appended to an array. It is a dot separated keys, the
            single value is appended to the end of the array
        :param pull_list: Same as pull but values are arrays where each item is removed from the array
        :param push_list: Same as push but values are arrays where each item is appended instead of appending the
            whole array
        :return: Dict with the number of entries modified
        """
        try:
            db_oper = self._build_update_operations(update_dict, unset, pull, push, pull_list, push_list)
            with self.lock:
                collection = self.db[table]
                rows = collection.update_many(self._format_filter(q_filter), db_oper)
            return {"modified": rows.modified_count}
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def replace(self, table, _id, indata, fail_on_empty=True):
        """
        Replace the content of an entry
        :param table: collection or table
        :param _id: internal database id
        :param indata: content to replace
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
            it raises a DbException (NOT_FOUND)
        :return: Dict with the number of entries replaced, or None
        """
        try:
            db_filter = {"_id": _id}
            with self.lock:
                collection = self.db[table]
                rows = collection.replace_one(db_filter, indata)
            if rows.matched_count == 0:
                if fail_on_empty:
                    raise DbException("Not found any {} with _id='{}'".format(table[:-1], _id), HTTPStatus.NOT_FOUND)
                return None
            return {"replaced": rows.modified_count}
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e
470 except Exception as e: # TODO refine
471 raise DbException(e)