6159e6a3a306670258ef24d425562a9700740886
[osm/common.git] / osm_common / dbmongo.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Telefonica S.A.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17
18
19 import logging
20 from pymongo import MongoClient, errors
21 from osm_common.dbbase import DbException, DbBase
22 from http import HTTPStatus
23 from time import time, sleep
24 from copy import deepcopy
25 from base64 import b64decode
26
27 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
28
29 # TODO consider use this decorator for database access retries
30 # @retry_mongocall
31 # def retry_mongocall(call):
32 # def _retry_mongocall(*args, **kwargs):
33 # retry = 1
34 # while True:
35 # try:
36 # return call(*args, **kwargs)
37 # except pymongo.AutoReconnect as e:
38 # if retry == 4:
39 # raise DbException(e)
40 # sleep(retry)
41 # return _retry_mongocall
42
43
def deep_update(to_update, update_with):
    """
    Merge 'update_with' into 'to_update' in place, descending into nested dictionaries.

    When a key is present at both sides and both values are dictionaries, the merge continues one level deeper.
    In any other case the value of 'to_update' is set to a deep copy of the value of 'update_with'.
    :param to_update: dictionary that receives the content. It is modified
    :param update_with: dictionary with the new content. It is not changed
    :return: to_update
    """
    for name, new_value in update_with.items():
        both_are_dicts = (
            name in to_update
            and isinstance(to_update[name], dict)
            and isinstance(new_value, dict)
        )
        if both_are_dicts:
            # merge level by level so that keys present only at 'to_update' are kept
            deep_update(to_update[name], new_value)
        else:
            # copy, so that later changes in 'to_update' never reach 'update_with'
            to_update[name] = deepcopy(new_value)
    return to_update
59
60
class DbMongo(DbBase):
    """
    MongoDB implementation of the DbBase interface, built on top of pymongo.

    Every public operation translates the query-string filter with '_format_filter', runs the pymongo call inside
    'with self.lock' and reports failures as DbException.
    """
    # maximum time, in seconds, that db_connect keeps retrying while the database cannot be reached
    conn_initial_timout = 120
    # NOTE(review): not used inside this class; presumably a timeout for later reconnections - confirm
    conn_timout = 10

    def __init__(self, logger_name='db', lock=False):
        """
        Create the object without connecting. Use db_connect to open the connection.
        :param logger_name: forwarded to the DbBase constructor
        :param lock: forwarded to the DbBase constructor
        """
        super().__init__(logger_name, lock)
        self.client = None  # MongoClient instance, created by db_connect
        self.db = None  # database handle (client[config["name"]]), created by db_connect

    def db_connect(self, config, target_version=None):
        """
        Connect to database
        :param config: Configuration of database. Keys read here: "url" or "host" and "port"; "name"; and the
            optional "logger_name", "loglevel", "commonkey", "masterpassword"
        :param target_version: if provided it checks if database contains required version, raising exception otherwise.
        :return: None or raises DbException on error
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            master_key = config.get("commonkey") or config.get("masterpassword")
            if master_key:
                self.set_secret_key(master_key)
            if config.get("url"):
                self.client = MongoClient(config["url"])
            else:
                self.client = MongoClient(config["host"], config["port"])
            # TODO add as parameters also username=config.get("user"), password=config.get("password"))
            # when all modules are ready
            self.db = self.client[config["name"]]
            if "loglevel" in config:
                self.logger.setLevel(getattr(logging, config['loglevel']))
            # get data to try a connection
            now = time()
            while True:
                try:
                    version_data = self.get_one("admin", {"_id": "version"}, fail_on_empty=False, fail_on_more=True)
                    # check database status is ok
                    if version_data and version_data.get("status") != 'ENABLED':
                        raise DbException("Wrong database status '{}'".format(version_data.get("status")),
                                          http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
                    # check version
                    db_version = None if not version_data else version_data.get("version")
                    if target_version and target_version != db_version:
                        raise DbException("Invalid database version {}. Expected {}".format(db_version, target_version))
                    # get serial
                    if version_data and version_data.get("serial"):
                        self.set_secret_key(b64decode(version_data["serial"]))
                    self.logger.info("Connected to database {} version {}".format(config["name"], db_version))
                    return
                except (errors.ConnectionFailure, DbException) as e:
                    # get_one reports driver errors as DbException, so a connection failure arrives here wrapped.
                    # Look at the chained exception to tell "database still down" (retry) from a real error.
                    if isinstance(e, DbException):
                        origin = e.__cause__ or e.__context__
                        if not isinstance(origin, errors.ConnectionFailure):
                            raise
                    if time() - now >= self.conn_initial_timout:
                        raise
                    self.logger.info("Waiting to database up {}".format(e))
                    sleep(2)
        except errors.PyMongoError as e:
            raise DbException(e)

    @staticmethod
    def _format_filter(q_filter):
        """
        Translate query string q_filter into mongo database filter
        :param q_filter: Query string content, as a dictionary of key: value, where value can be a list when the key
            is repeated or it contains a comma separated list. Follows SOL005 section 4.3.2 guidelines, with the
            following extensions and differences:
            It accepts ".neq" (not equal) in addition to ".ne".
            For arrays you can specify index (concrete index must match), nothing (any index may match) or 'ANYINDEX'
            (two or more matches applies for the same array element). Examples:
                with database register: {A: [{B: 1, C: 2}, {B: 6, C: 9}]}
                query 'A.B=6' matches because array A contains one element with B equal to 6
                query 'A.0.B=6' does not match because index 0 of array A contains B with value 1, but not 6
                query 'A.B=6&A.C=2' matches because one element of array matches B=6 and other matches C=2
                query 'A.ANYINDEX.B=6&A.ANYINDEX.C=2' does not match because it is needed the same element of the
                    array matching both

            Examples of translations from SOL005 to  >> mongo  # comment
                A=B; A.eq=B         >> A: B             # must contain key A and equal to B or be a list that contains B
                A.cont=B            >> A: B
                A=B&A=C; A=B,C      >> A: {$in: [B, C]} # must contain key A and equal to B or C or be a list that
                                                        # contains B or C
                A.cont=B&A.cont=C; A.cont=B,C  >> A: {$in: [B, C]}
                A.ncont=B           >> A: {$ne: B}      # must not contain key A or if present not equal to B or if a
                                                        # list, it must not contain B
                A.ncont=B,C; A.ncont=B&A.ncont=C    >> A: {$nin: [B, C]}  # must not contain key A or if present not
                                                        # equal neither B nor C; or if a list, it must not contain
                                                        # neither B nor C
                A.ne=B&A.ne=C; A.ne=B,C             >> A: {$nin: [B, C]}
                A.gt=B              >> A: {$gt: B}      # must contain key A and greater than B
                A.ne=B; A.neq=B     >> A: {$ne: B}      # must not contain key A or if present not equal to B, or if
                                                        # an array not contain B
                A.ANYINDEX.B=C      >> A: {$elemMatch: {B: C}}
        :return: database mongo filter. Raises DbException (BAD_REQUEST) if q_filter cannot be translated
        """
        # defined before the 'try' so that the error message below can always be composed
        query_k = query_v = None
        try:
            db_filter = {}
            if not q_filter:
                return db_filter
            for query_k, query_v in q_filter.items():
                dot_index = query_k.rfind(".")
                # '> 0' and not '> 1': one-character keys such as "a.gt" carry an operator suffix too
                if dot_index > 0 and query_k[dot_index + 1:] in ("eq", "ne", "gt", "gte", "lt", "lte", "cont",
                                                                 "ncont", "neq"):
                    operator = "$" + query_k[dot_index + 1:]
                    if operator == "$neq":
                        operator = "$ne"
                    k = query_k[:dot_index]
                else:
                    operator = "$eq"
                    k = query_k

                v = query_v
                if isinstance(v, list):
                    if operator in ("$eq", "$cont"):
                        operator = "$in"
                    elif operator in ("$ne", "$ncont"):
                        operator = "$nin"
                    else:
                        # comparison operators take a single value: rebuild the comma separated text
                        v = ",".join(str(item) for item in query_v)

                if operator in ("$eq", "$cont"):
                    # v cannot be a comma separated list, because operator would have been changed to $in
                    db_v = v
                elif operator == "$ncont":
                    # v cannot be a comma separated list, because operator would have been changed to $nin
                    db_v = {"$ne": v}
                else:
                    db_v = {operator: v}

                # process the ANYINDEX word at k, from right to left, nesting one $elemMatch per occurrence
                kleft, _, kright = k.rpartition(".ANYINDEX.")
                while kleft:
                    k = kleft
                    db_v = {"$elemMatch": {kright: db_v}}
                    kleft, _, kright = k.rpartition(".ANYINDEX.")

                # insert in db_filter
                # maybe db_filter[k] exist. e.g. in the query string for values between 5 and 8: "a.gt=5&a.lt=8"
                deep_update(db_filter, {k: db_v})

            return db_filter
        except Exception as e:
            raise DbException("Invalid query string filter at {}:{}. Error: {}".format(query_k, query_v, e),
                              http_code=HTTPStatus.BAD_REQUEST)

    def get_list(self, table, q_filter=None):
        """
        Obtain a list of entries matching q_filter
        :param table: collection or table
        :param q_filter: Filter
        :return: a list (can be empty) with the found entries. Raises DbException on error
        """
        try:
            with self.lock:
                collection = self.db[table]
                db_filter = self._format_filter(q_filter)
                # the cursor is lazy: consume it here so that the database is read under the lock
                return list(collection.find(db_filter))
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(e)

    def get_one(self, table, q_filter=None, fail_on_empty=True, fail_on_more=True):
        """
        Obtain one entry matching q_filter
        :param table: collection or table
        :param q_filter: Filter
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
            it raises a DbException
        :param fail_on_more: If more than one matches filter it returns one of them unless this flag is set to True,
            so that it raises a DbException
        :return: The requested element, or None
        """
        try:
            db_filter = self._format_filter(q_filter)
            with self.lock:
                collection = self.db[table]
                # two entries are enough to tell apart "none", "one" and "more than one"
                rows = list(collection.find(db_filter).limit(2))
            if not rows:
                if fail_on_empty:
                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
                                      HTTPStatus.NOT_FOUND)
                return None
            if len(rows) > 1 and fail_on_more:
                raise DbException("Found more than one {} with filter='{}'".format(table[:-1], q_filter),
                                  HTTPStatus.CONFLICT)
            return rows[0]
        except DbException:
            # keep the http_code of the exceptions raised above and by _format_filter
            raise
        except Exception as e:  # TODO refine
            raise DbException(e)

    def del_list(self, table, q_filter=None):
        """
        Deletes all entries that match q_filter
        :param table: collection or table
        :param q_filter: Filter
        :return: Dict with the number of entries deleted
        """
        try:
            with self.lock:
                collection = self.db[table]
                rows = collection.delete_many(self._format_filter(q_filter))
            return {"deleted": rows.deleted_count}
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(e)

    def del_one(self, table, q_filter=None, fail_on_empty=True):
        """
        Deletes one entry that matches q_filter
        :param table: collection or table
        :param q_filter: Filter
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in
            which case it raises a DbException
        :return: Dict with the number of entries deleted. None if no matching is found.
        """
        try:
            with self.lock:
                collection = self.db[table]
                rows = collection.delete_one(self._format_filter(q_filter))
            if rows.deleted_count == 0:
                if fail_on_empty:
                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
                                      HTTPStatus.NOT_FOUND)
                return None
            return {"deleted": rows.deleted_count}
        except DbException:
            # keep the http_code of the exceptions raised above and by _format_filter
            raise
        except Exception as e:  # TODO refine
            raise DbException(e)

    def create(self, table, indata):
        """
        Add a new entry at database
        :param table: collection or table
        :param indata: content to be added
        :return: database id of the inserted element. Raises a DbException on error
        """
        try:
            with self.lock:
                collection = self.db[table]
                data = collection.insert_one(indata)
            return data.inserted_id
        except Exception as e:  # TODO refine
            raise DbException(e)

    def set_one(self, table, q_filter, update_dict, fail_on_empty=True):
        """
        Modifies an entry at database
        :param table: collection or table
        :param q_filter: Filter
        :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
            it raises a DbException
        :return: Dict with the number of entries modified. None if no matching is found.
        """
        try:
            with self.lock:
                collection = self.db[table]
                rows = collection.update_one(self._format_filter(q_filter), {"$set": update_dict})
            if rows.matched_count == 0:
                if fail_on_empty:
                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
                                      HTTPStatus.NOT_FOUND)
                return None
            return {"modified": rows.modified_count}
        except DbException:
            # keep the http_code of the exceptions raised above and by _format_filter
            raise
        except Exception as e:  # TODO refine
            raise DbException(e)

    def set_list(self, table, q_filter, update_dict):
        """
        Modifies all matching entries at database
        :param table: collection or table
        :param q_filter: Filter
        :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
        :return: Dict with the number of entries modified
        """
        try:
            with self.lock:
                collection = self.db[table]
                rows = collection.update_many(self._format_filter(q_filter), {"$set": update_dict})
            return {"modified": rows.modified_count}
        except DbException:
            # keep the http_code of the exception raised by _format_filter
            raise
        except Exception as e:  # TODO refine
            raise DbException(e)

    def replace(self, table, _id, indata, fail_on_empty=True):
        """
        Replace the content of an entry
        :param table: collection or table
        :param _id: internal database id
        :param indata: content to replace
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
            it raises a DbException
        :return: Dict with the number of entries replaced. None if no matching is found.
        """
        try:
            db_filter = {"_id": _id}
            with self.lock:
                collection = self.db[table]
                rows = collection.replace_one(db_filter, indata)
            if rows.matched_count == 0:
                if fail_on_empty:
                    raise DbException("Not found any {} with _id='{}'".format(table[:-1], _id), HTTPStatus.NOT_FOUND)
                return None
            return {"replaced": rows.modified_count}
        except DbException:
            # keep the http_code of the exception raised above
            raise
        except Exception as e:  # TODO refine
            raise DbException(e)