bug559 some modifications
[osm/common.git] / osm_common / dbmongo.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Telefonica S.A.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17
18
19 import logging
20 from pymongo import MongoClient, errors
21 from osm_common.dbbase import DbException, DbBase
22 from http import HTTPStatus
23 from time import time, sleep
24 from copy import deepcopy
25 from base64 import b64decode
26
27 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
28
29 # TODO consider using this decorator for database access retries
30 # @retry_mongocall
31 # def retry_mongocall(call):
32 # def _retry_mongocall(*args, **kwargs):
33 # retry = 1
34 # while True:
35 # try:
36 # return call(*args, **kwargs)
37 # except pymongo.AutoReconnect as e:
38 # if retry == 4:
39 # raise DbException(e)
40 # sleep(retry)
41 # return _retry_mongocall
42
43
def deep_update(to_update, update_with):
    """
    Merge 'update_with' into 'to_update' recursively. When both sides hold a dictionary under the same key the
    merge descends into it; in any other case the value at 'to_update' is replaced by a deep copy of the value
    at 'update_with'
    :param to_update: dictionary that is modified in place
    :param update_with: dictionary with the new content. It is not changed
    :return: to_update
    """
    for name, incoming in update_with.items():
        current = to_update.get(name)
        both_dicts = isinstance(current, dict) and isinstance(incoming, dict)
        if name in to_update and both_dicts:
            # nested dictionaries on both sides: merge them instead of overwriting
            deep_update(current, incoming)
        else:
            # copy, so later changes at 'update_with' do not leak into 'to_update'
            to_update[name] = deepcopy(incoming)
    return to_update
59
60
class DbMongo(DbBase):
    """
    Database driver that stores the content in a MongoDB server using pymongo.
    """

    # Seconds that db_connect keeps retrying while the server cannot be reached
    conn_initial_timout = 120
    # NOTE(review): not referenced inside this class; presumably a per-operation timeout - confirm with callers
    conn_timout = 10

    def __init__(self, logger_name='db'):
        """
        Create a non connected instance. Call db_connect before any other method
        :param logger_name: passed to the base class constructor (presumably the name of the logger to use)
        """
        super().__init__(logger_name)
        self.client = None  # pymongo.MongoClient, set at db_connect
        self.db = None      # pymongo database handle, set at db_connect

    def db_connect(self, config, target_version=None):
        """
        Connect to database
        :param config: Configuration of database. Mandatory keys: "host", "port", "name". Optional keys:
            "logger_name", "loglevel", "masterpassword"
        :param target_version: if provided it checks if database contains required version, raising exception otherwise.
        :return: None or raises DbException on error
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            self.master_password = config.get("masterpassword")
            self.client = MongoClient(config["host"], config["port"])
            # TODO add as parameters also username=config.get("user"), password=config.get("password"))
            # when all modules are ready
            self.db = self.client[config["name"]]
            if "loglevel" in config:
                self.logger.setLevel(getattr(logging, config['loglevel']))
            # get data to try a connection
            now = time()
            while True:
                try:
                    # The collection is queried directly and not through get_one(), because get_one() converts
                    # any error into DbException; that would hide ConnectionFailure and the retry below would
                    # never be executed
                    version_data = self.db["admin"].find_one({"_id": "version"})
                    # check database status is ok
                    if version_data and version_data.get("status") != 'ENABLED':
                        raise DbException("Wrong database status '{}'".format(version_data.get("status")),
                                          http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
                    # check version
                    db_version = None if not version_data else version_data.get("version")
                    if target_version and target_version != db_version:
                        raise DbException("Invalid database version {}. Expected {}".format(db_version, target_version))
                    # get serial
                    if version_data and version_data.get("serial"):
                        self.set_secret_key(b64decode(version_data["serial"]))
                    self.logger.info("Connected to database {} version {}".format(config["name"], db_version))
                    return
                except errors.ConnectionFailure as e:
                    # server not reachable yet: retry until conn_initial_timout expires. The re-raised error is
                    # converted into DbException by the outer handler
                    if time() - now >= self.conn_initial_timout:
                        raise
                    self.logger.info("Waiting to database up {}".format(e))
                    sleep(2)
        except errors.PyMongoError as e:
            raise DbException(e)

    @staticmethod
    def _format_filter(q_filter):
        """
        Translate query string q_filter into mongo database filter
        :param q_filter: Query string content. Follows SOL005 section 4.3.2 guidelines, with the follow extensions and
            differences:
            It accepts ".neq" (not equal) in addition to ".ne".
            For arrays you can specify index (concrete index must match), nothing (any index may match) or 'ANYINDEX'
            (two or more matches applies for the same array element). Examples:
                with database register: {A: [{B: 1, C: 2}, {B: 6, C: 9}]}
                query 'A.B=6' matches because array A contains one element with B equal to 6
                query 'A.0.B=6' does no match because index 0 of array A contains B with value 1, but not 6
                query 'A.B=6&A.C=2' matches because one element of array matches B=6 and other matches C=2
                query 'A.ANYINDEX.B=6&A.ANYINDEX.C=2' does not match because it is needed the same element of the
                    array matching both

            Examples of translations from SOL005 to  >> mongo  # comment
                A=B; A.eq=B         >> A: B             # must contain key A and equal to B or be a list that contains B
                A.cont=B            >> A: B
                A=B&A=C; A=B,C      >> A: {$in: [B, C]} # must contain key A and equal to B or C or be a list that
                                                        # contains B or C
                A.cont=B&A.cont=C; A.cont=B,C  >> A: {$in: [B, C]}
                A.ncont=B           >> A: {$ne: B}      # must not contain key A or if present not equal to B or if a
                                                        # list, it must not contain B
                A.ncont=B,C; A.ncont=B&A.ncont=C    >> A: {$nin: [B,C]}  # must not contain key A or if present not
                                                        # equal neither B nor C; or if a list, it must not contain
                                                        # neither B nor C
                A.ne=B&A.ne=C; A.ne=B,C             >> A: {$nin: [B, C]}
                A.gt=B              >> A: {$gt: B}      # must contain key A and greater than B
                A.ne=B; A.neq=B     >> A: {$ne: B}      # must not contain key A or if present not equal to B, or if
                                                        # an array not contain B
                A.ANYINDEX.B=C      >> A: {$elemMatch: {B: C}}
        :return: database mongo filter. Raises DbException (BAD_REQUEST) if q_filter cannot be translated
        """
        # bound in advance because the error message below uses them, and the failure can happen before the
        # first loop iteration (e.g. q_filter is not a dictionary)
        query_k = v = None
        try:
            db_filter = {}
            if not q_filter:
                return db_filter
            for query_k, query_v in q_filter.items():
                # an operator suffix needs at least one character of key before the dot, so 'a.gt' is valid
                dot_index = query_k.rfind(".")
                if dot_index > 0 and query_k[dot_index + 1:] in ("eq", "ne", "gt", "gte", "lt", "lte", "cont",
                                                                 "ncont", "neq"):
                    operator = "$" + query_k[dot_index + 1:]
                    if operator == "$neq":
                        operator = "$ne"
                    k = query_k[:dot_index]
                else:
                    operator = "$eq"
                    k = query_k

                v = query_v
                if isinstance(v, list):
                    if operator in ("$eq", "$cont"):
                        operator = "$in"
                    elif operator in ("$ne", "$ncont"):
                        operator = "$nin"
                    else:
                        # ordering operators take a single value: rebuild the original comma separated string
                        v = ",".join(query_v)

                if operator in ("$eq", "$cont"):
                    # v cannot be a comma separated list, because operator would have been changed to $in
                    db_v = v
                elif operator == "$ncont":
                    # v cannot be a comma separated list, because operator would have been changed to $nin
                    db_v = {"$ne": v}
                else:
                    db_v = {operator: v}

                # process the ANYINDEX word at k. From right to left, each occurrence wraps the condition built
                # so far into an $elemMatch over the array at its left
                kleft, _, kright = k.rpartition(".ANYINDEX.")
                while kleft:
                    k = kleft
                    db_v = {"$elemMatch": {kright: db_v}}
                    kleft, _, kright = k.rpartition(".ANYINDEX.")

                # insert in db_filter
                # maybe db_filter[k] exist. e.g. in the query string for values between 5 and 8: "a.gt=5&a.lt=8"
                deep_update(db_filter, {k: db_v})

            return db_filter
        except Exception as e:
            raise DbException("Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e),
                              http_code=HTTPStatus.BAD_REQUEST) from e

    def get_list(self, table, q_filter=None):
        """
        Obtain a list of entries matching q_filter
        :param table: collection or table
        :param q_filter: Filter
        :return: a list (can be empty) with the found entries. Raises DbException on error
        """
        try:
            collection = self.db[table]
            db_filter = self._format_filter(q_filter)
            return list(collection.find(db_filter))
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(e)

    def get_one(self, table, q_filter=None, fail_on_empty=True, fail_on_more=True):
        """
        Obtain one entry matching q_filter
        :param table: collection or table
        :param q_filter: Filter
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
            it raises a DbException
        :param fail_on_more: If more than one matches filter it returns one of them unless this flag is set to True, so
            that it raises a DbException
        :return: The requested element, or None
        """
        try:
            db_filter = self._format_filter(q_filter)
            collection = self.db[table]
            if fail_on_more:
                # two entries are enough to know that the match is not unique
                rows = list(collection.find(db_filter).limit(2))
                if len(rows) > 1:
                    raise DbException("Found more than one {} with filter='{}'".format(table[:-1], q_filter),
                                      HTTPStatus.CONFLICT)
                row = rows[0] if rows else None
            else:
                row = collection.find_one(db_filter)
            if row is None and fail_on_empty:
                raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
                                  HTTPStatus.NOT_FOUND)
            return row
        except DbException:
            # already contains the proper message and http code
            raise
        except Exception as e:  # TODO refine
            raise DbException(e)

    def del_list(self, table, q_filter=None):
        """
        Deletes all entries that match q_filter
        :param table: collection or table
        :param q_filter: Filter
        :return: Dict with the number of entries deleted
        """
        try:
            collection = self.db[table]
            rows = collection.delete_many(self._format_filter(q_filter))
            return {"deleted": rows.deleted_count}
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(e)

    def del_one(self, table, q_filter=None, fail_on_empty=True):
        """
        Deletes one entry that matches q_filter
        :param table: collection or table
        :param q_filter: Filter
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in
            which case it raises a DbException
        :return: Dict with the number of entries deleted. None if no matching is found.
        """
        try:
            collection = self.db[table]
            rows = collection.delete_one(self._format_filter(q_filter))
            if rows.deleted_count == 0:
                if fail_on_empty:
                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
                                      HTTPStatus.NOT_FOUND)
                return None
            return {"deleted": rows.deleted_count}
        except DbException:
            # already contains the proper message and http code
            raise
        except Exception as e:  # TODO refine
            raise DbException(e)

    def create(self, table, indata):
        """
        Add a new entry at database
        :param table: collection or table
        :param indata: content to be added
        :return: database id of the inserted element. Raises a DbException on error
        """
        try:
            collection = self.db[table]
            data = collection.insert_one(indata)
            return data.inserted_id
        except Exception as e:  # TODO refine
            raise DbException(e)

    def set_one(self, table, q_filter, update_dict, fail_on_empty=True):
        """
        Modifies an entry at database
        :param table: collection or table
        :param q_filter: Filter
        :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
            it raises a DbException
        :return: Dict with the number of entries modified. None if no matching is found.
        """
        try:
            collection = self.db[table]
            rows = collection.update_one(self._format_filter(q_filter), {"$set": update_dict})
            if rows.matched_count == 0:
                if fail_on_empty:
                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
                                      HTTPStatus.NOT_FOUND)
                return None
            return {"modified": rows.modified_count}
        except DbException:
            # already contains the proper message and http code
            raise
        except Exception as e:  # TODO refine
            raise DbException(e)

    def set_list(self, table, q_filter, update_dict):
        """
        Modifies all matching entries at database
        :param table: collection or table
        :param q_filter: Filter
        :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
        :return: Dict with the number of entries modified
        """
        try:
            collection = self.db[table]
            rows = collection.update_many(self._format_filter(q_filter), {"$set": update_dict})
            return {"modified": rows.modified_count}
        except DbException:
            # raised by _format_filter; keep its BAD_REQUEST code
            raise
        except Exception as e:  # TODO refine
            raise DbException(e)

    def replace(self, table, _id, indata, fail_on_empty=True):
        """
        Replace the content of an entry
        :param table: collection or table
        :param _id: internal database id
        :param indata: content to replace
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
            it raises a DbException
        :return: Dict with the number of entries replaced. None if no matching is found.
        """
        try:
            db_filter = {"_id": _id}
            collection = self.db[table]
            rows = collection.replace_one(db_filter, indata)
            if rows.matched_count == 0:
                if fail_on_empty:
                    raise DbException("Not found any {} with _id='{}'".format(table[:-1], _id), HTTPStatus.NOT_FOUND)
                return None
            return {"replaced": rows.modified_count}
        except DbException:
            # already contains the proper message and http code
            raise
        except Exception as e:  # TODO refine
            raise DbException(e)