0f89c96cad130eab869ac442ec35f5e8f115aecc
[osm/common.git] / osm_common / dbmongo.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Telefonica S.A.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17
18
19 import logging
20 from pymongo import MongoClient, errors
21 from osm_common.dbbase import DbException, DbBase
22 from http import HTTPStatus
23 from time import time, sleep
24 from copy import deepcopy
25 from base64 import b64decode
26
27 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
28
29 # TODO consider use this decorator for database access retries
30 # @retry_mongocall
31 # def retry_mongocall(call):
32 # def _retry_mongocall(*args, **kwargs):
33 # retry = 1
34 # while True:
35 # try:
36 # return call(*args, **kwargs)
37 # except pymongo.AutoReconnect as e:
38 # if retry == 4:
39 # raise DbException(e)
40 # sleep(retry)
41 # return _retry_mongocall
42
43
def deep_update(to_update, update_with):
    """
    Recursive counterpart of dict.update: merge 'update_with' into 'to_update' in place.
    When both sides hold a dictionary under the same key the merge descends into it; in any other case the value
    of 'to_update' is overwritten with a deep copy of the value of 'update_with', so both dictionaries never
    share mutable content.
    :param to_update: dictionary to be modified
    :param update_with: dictionary with the new content. It is not changed
    :return: to_update (the same object received, already modified)
    """
    for name, incoming in update_with.items():
        current = to_update.get(name)
        if isinstance(current, dict) and isinstance(incoming, dict):
            # dictionaries at both sides: merge them instead of replacing the whole branch
            deep_update(current, incoming)
        else:
            to_update[name] = deepcopy(incoming)
    return to_update
59
60
class DbMongo(DbBase):
    """
    MongoDB backend built on pymongo.
    Besides what is defined here, the methods use self.logger, self.lock and self.set_secret_key, which are
    presumably provided by DbBase (not visible in this file).
    Every public method reports failures as DbException; a DbException raised on purpose (not found, conflict,
    invalid filter) always reaches the caller untouched so that its http_code is preserved.
    """
    # Seconds that db_connect keeps retrying while the database is not reachable
    conn_initial_timout = 120
    # Not referenced by any method of this class
    conn_timout = 10

    def __init__(self, logger_name='db', lock=False):
        """
        :param logger_name: forwarded as is to DbBase
        :param lock: forwarded as is to DbBase
        """
        super().__init__(logger_name, lock)
        self.client = None
        self.db = None

    def db_connect(self, config, target_version=None):
        """
        Connect to database
        :param config: Configuration of database. Keys read here: "host", "port" and "name" (mandatory);
            "logger_name", "loglevel", "commonkey" and "masterpassword" (optional)
        :param target_version: if provided it checks if database contains required version, raising exception otherwise.
        :return: None or raises DbException on error
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            master_key = config.get("commonkey") or config.get("masterpassword")
            if master_key:
                self.set_secret_key(master_key)
            self.client = MongoClient(config["host"], config["port"])
            # TODO add as parameters also username=config.get("user"), password=config.get("password"))
            # when all modules are ready
            self.db = self.client[config["name"]]
            if "loglevel" in config:
                self.logger.setLevel(getattr(logging, config['loglevel']))
            # get data to try a connection
            now = time()
            while True:
                try:
                    version_data = self.get_one("admin", {"_id": "version"}, fail_on_empty=False, fail_on_more=True)
                    # check database status is ok
                    if version_data and version_data.get("status") != 'ENABLED':
                        raise DbException("Wrong database status '{}'".format(version_data.get("status")),
                                          http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
                    # check version
                    db_version = None if not version_data else version_data.get("version")
                    if target_version and target_version != db_version:
                        raise DbException("Invalid database version {}. Expected {}".format(db_version, target_version))
                    # get serial
                    if version_data and version_data.get("serial"):
                        self.set_secret_key(b64decode(version_data["serial"]))
                    self.logger.info("Connected to database {} version {}".format(config["name"], db_version))
                    return
                except (errors.ConnectionFailure, DbException) as e:
                    # get_one converts driver errors into DbException, so the connection failure normally arrives
                    # wrapped and has to be recovered from the exception chain
                    failure = e if isinstance(e, errors.ConnectionFailure) else e.__cause__
                    if not isinstance(failure, errors.ConnectionFailure):
                        raise  # a real DbException (wrong status, wrong version, ...): do not retry
                    if time() - now >= self.conn_initial_timout:
                        raise
                    self.logger.info("Waiting to database up {}".format(failure))
                    sleep(2)
        except errors.PyMongoError as e:
            raise DbException(e) from e

    @staticmethod
    def _format_filter(q_filter):
        """
        Translate query string q_filter into mongo database filter
        :param q_filter: Query string content, as a dictionary. A value can be a list when the same key appears
            several times or it contains a comma separated list. Follows SOL005 section 4.3.2 guidelines, with the
            follow extensions and differences:
            It accepts ".ne" (not equal) in addition to ".neq".
            For arrays you can specify index (concrete index must match), nothing (any index may match) or 'ANYINDEX'
            (two or more matches applies for the same array element). Examples:
                with database register: {A: [{B: 1, C: 2}, {B: 6, C: 9}]}
                query 'A.B=6' matches because array A contains one element with B equal to 6
                query 'A.0.B=6' does no match because index 0 of array A contains B with value 1, but not 6
                query 'A.B=6&A.C=2' matches because one element of array matches B=6 and other matches C=2
                query 'A.ANYINDEX.B=6&A.ANYINDEX.C=2' does not match because it is needed the same element of the
                    array matching both

        Examples of translations from SOL005 to  >> mongo  # comment
            A=B; A.eq=B         >> A: B             # must contain key A and equal to B or be a list that contains B
            A.cont=B            >> A: B
            A=B&A=C; A=B,C      >> A: {$in: [B, C]} # must contain key A and equal to B or C or be a list that contains
                                                    # B or C
            A.cont=B&A.cont=C; A.cont=B,C  >> A: {$in: [B, C]}
            A.ncont=B           >> A: {$ne: B}      # must not contain key A or if present not equal to B or if a list,
                                                    # it must not contain B
            A.ncont=B,C; A.ncont=B&A.ncont=C        >> A: {$nin: [B,C]}     # must not contain key A or if present not
                                                    # equal neither B nor C; or if a list, it must not contain neither
                                                    # B nor C
            A.ne=B&A.ne=C; A.ne=B,C >> A: {$nin: [B, C]}
            A.gt=B              >> A: {$gt: B}      # must contain key A and greater than B
            A.ne=B; A.neq=B     >> A: {$ne: B}      # must not contain key A or if present not equal to B, or if
                                                    # an array not contain B
            A.ANYINDEX.B=C      >> A: {$elemMatch: {B: C}}
        :return: database mongo filter. Raises DbException (BAD_REQUEST) if q_filter cannot be translated
        """
        # Bound in advance so that the error message can always be composed, even if the failure happens before
        # the first key is processed (e.g. q_filter is not a dictionary)
        query_k = v = None
        try:
            db_filter = {}
            if not q_filter:
                return db_filter
            for query_k, query_v in q_filter.items():
                dot_index = query_k.rfind(".")
                # dot_index > 0 only discards a leading dot; keys of a single character, as "a.gt", are valid
                if dot_index > 0 and query_k[dot_index+1:] in ("eq", "ne", "gt", "gte", "lt", "lte", "cont",
                                                               "ncont", "neq"):
                    operator = "$" + query_k[dot_index + 1:]
                    if operator == "$neq":
                        operator = "$ne"
                    k = query_k[:dot_index]
                else:
                    operator = "$eq"
                    k = query_k

                v = query_v
                if isinstance(v, list):
                    if operator in ("$eq", "$cont"):
                        operator = "$in"
                    elif operator in ("$ne", "$ncont"):
                        operator = "$nin"
                    else:
                        # comparison operators take a single value: rebuild the comma separated string
                        v = ",".join(query_v)

                if operator in ("$eq", "$cont"):
                    # v cannot be a comma separated list, because operator would have been changed to $in
                    db_v = v
                elif operator == "$ncont":
                    # v cannot be a comma separated list, because operator would have been changed to $nin
                    db_v = {"$ne": v}
                else:
                    db_v = {operator: v}

                # process the ANYINDEX word at k, from right to left, nesting one $elemMatch per occurrence
                kleft, _, kright = k.rpartition(".ANYINDEX.")
                while kleft:
                    k = kleft
                    db_v = {"$elemMatch": {kright: db_v}}
                    kleft, _, kright = k.rpartition(".ANYINDEX.")

                # insert in db_filter
                # maybe db_filter[k] exist. e.g. in the query string for values between 5 and 8: "a.gt=5&a.lt=8"
                deep_update(db_filter, {k: db_v})

            return db_filter
        except Exception as e:
            raise DbException("Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e),
                              http_code=HTTPStatus.BAD_REQUEST)

    def get_list(self, table, q_filter=None):
        """
        Obtain a list of entries matching q_filter
        :param table: collection or table
        :param q_filter: Filter
        :return: a list (can be empty) with the found entries. Raises DbException on error
        """
        try:
            with self.lock:
                collection = self.db[table]
                db_filter = self._format_filter(q_filter)
                # the cursor is consumed while the lock is held
                return list(collection.find(db_filter))
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def get_one(self, table, q_filter=None, fail_on_empty=True, fail_on_more=True):
        """
        Obtain one entry matching q_filter
        :param table: collection or table
        :param q_filter: Filter
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case
            it raises a DbException (NOT_FOUND)
        :param fail_on_more: If more than one matches filter it returns one of then unless this flag is set tu True, so
            that it raises a DbException (CONFLICT)
        :return: The requested element, or None
        """
        try:
            db_filter = self._format_filter(q_filter)
            with self.lock:
                collection = self.db[table]
                # Two documents are enough to tell apart "nothing", "exactly one" and "more than one", so the
                # deprecated Cursor.count() and its extra round trips are not needed
                rows = list(collection.find(db_filter).limit(2))
            if not rows:
                if fail_on_empty:
                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
                                      HTTPStatus.NOT_FOUND)
                return None
            if len(rows) > 1 and fail_on_more:
                raise DbException("Found more than one {} with filter='{}'".format(table[:-1], q_filter),
                                  HTTPStatus.CONFLICT)
            return rows[0]
        except DbException:
            # already contains the proper http_code; wrapping it again would lose it
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def del_list(self, table, q_filter=None):
        """
        Deletes all entries that match q_filter
        :param table: collection or table
        :param q_filter: Filter
        :return: Dict with the number of entries deleted
        """
        try:
            with self.lock:
                collection = self.db[table]
                rows = collection.delete_many(self._format_filter(q_filter))
            return {"deleted": rows.deleted_count}
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def del_one(self, table, q_filter=None, fail_on_empty=True):
        """
        Deletes one entry that matches q_filter
        :param table: collection or table
        :param q_filter: Filter
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in
            which case it raises a DbException (NOT_FOUND)
        :return: Dict with the number of entries deleted. None if no matching is found.
        """
        try:
            with self.lock:
                collection = self.db[table]
                rows = collection.delete_one(self._format_filter(q_filter))
            if rows.deleted_count == 0:
                if fail_on_empty:
                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
                                      HTTPStatus.NOT_FOUND)
                return None
            return {"deleted": rows.deleted_count}
        except DbException:
            # already contains the proper http_code; wrapping it again would lose it
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def create(self, table, indata):
        """
        Add a new entry at database
        :param table: collection or table
        :param indata: content to be added
        :return: database id of the inserted element. Raises a DbException on error
        """
        try:
            with self.lock:
                collection = self.db[table]
                data = collection.insert_one(indata)
            return data.inserted_id
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def set_one(self, table, q_filter, update_dict, fail_on_empty=True):
        """
        Modifies an entry at database
        :param table: collection or table
        :param q_filter: Filter
        :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case
            it raises a DbException (NOT_FOUND)
        :return: Dict with the number of entries modified. None if no matching is found.
        """
        try:
            with self.lock:
                collection = self.db[table]
                rows = collection.update_one(self._format_filter(q_filter), {"$set": update_dict})
            if rows.matched_count == 0:
                if fail_on_empty:
                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
                                      HTTPStatus.NOT_FOUND)
                return None
            return {"modified": rows.modified_count}
        except DbException:
            # already contains the proper http_code; wrapping it again would lose it
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def set_list(self, table, q_filter, update_dict):
        """
        Modifies all matching entries at database
        :param table: collection or table
        :param q_filter: Filter
        :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
        :return: Dict with the number of entries modified
        """
        try:
            with self.lock:
                collection = self.db[table]
                rows = collection.update_many(self._format_filter(q_filter), {"$set": update_dict})
            return {"modified": rows.modified_count}
        except DbException:
            # an invalid filter is reported with BAD_REQUEST; wrapping it again would lose that code
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def replace(self, table, _id, indata, fail_on_empty=True):
        """
        Replace the content of an entry
        :param table: collection or table
        :param _id: internal database id
        :param indata: content to replace
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case
            it raises a DbException (NOT_FOUND)
        :return: Dict with the number of entries replaced. None if no matching is found.
        """
        try:
            db_filter = {"_id": _id}
            with self.lock:
                collection = self.db[table]
                rows = collection.replace_one(db_filter, indata)
            if rows.matched_count == 0:
                if fail_on_empty:
                    raise DbException("Not found any {} with _id='{}'".format(table[:-1], _id), HTTPStatus.NOT_FOUND)
                return None
            return {"replaced": rows.modified_count}
        except DbException:
            # already contains the proper http_code; wrapping it again would lose it
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e