blob: 81405214aa6f5a90cc567431541a0489ecc21da3 [file] [log] [blame]
tierno87858ca2018-10-08 16:30:15 +02001# -*- coding: utf-8 -*-
2
3# Copyright 2018 Telefonica S.A.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14# implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17
tierno5c012612018-04-19 16:01:59 +020018
19import logging
20from pymongo import MongoClient, errors
tierno3054f782018-04-25 16:59:53 +020021from osm_common.dbbase import DbException, DbBase
tierno5c012612018-04-19 16:01:59 +020022from http import HTTPStatus
23from time import time, sleep
tierno6ec13b02018-05-14 11:24:57 +020024from copy import deepcopy
tierno136f2952018-10-19 13:01:03 +020025from base64 import b64decode
tierno5c012612018-04-19 16:01:59 +020026
27__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
28
# TODO consider using this decorator for database access retries
# @retry_mongocall
# def retry_mongocall(call):
#     def _retry_mongocall(*args, **kwargs):
#         retry = 1
#         while True:
#             try:
#                 return call(*args, **kwargs)
#             except pymongo.AutoReconnect as e:
#                 if retry == 4:
#                     raise DbException(e)
#                 sleep(retry)
#     return _retry_mongocall
42
43
def deep_update(to_update, update_with):
    """
    Merge 'update_with' into 'to_update' in place, descending into nested dictionaries.
    When a key exists at both sides and both values are dictionaries, they are merged recursively. In any other
    case the value at 'to_update' is set to a deep copy of the value at 'update_with'.
    :param to_update: dictionary that receives the changes. It is modified
    :param update_with: dictionary with the new content. It is not changed
    :return: to_update (the same object received, already modified)
    """
    for name, incoming in update_with.items():
        both_are_dicts = (
            name in to_update
            and isinstance(to_update[name], dict)
            and isinstance(incoming, dict)
        )
        if both_are_dicts:
            # merge instead of replace, so that keys present only at 'to_update' survive
            deep_update(to_update[name], incoming)
        else:
            # copy, so that later changes at 'to_update' never leak into 'update_with'
            to_update[name] = deepcopy(incoming)
    return to_update
59
60
class DbMongo(DbBase):
    """
    DbBase implementation backed by a MongoDB server accessed through pymongo.
    Every access to the server is done while holding 'self.lock'.
    """
    # Seconds that db_connect keeps retrying while the database server is not reachable
    conn_initial_timout = 120
    # NOTE(review): not referenced inside this class; presumably a per-operation timeout in seconds - confirm
    conn_timout = 10

    def __init__(self, logger_name='db', lock=False):
        """
        Create a non connected instance. Call db_connect before any other method
        :param logger_name: passed to the DbBase constructor
        :param lock: passed to the DbBase constructor
        """
        super().__init__(logger_name, lock)
        self.client = None
        self.db = None

    def db_connect(self, config, target_version=None):
        """
        Connect to database
        :param config: Configuration of database. Mandatory keys: "host", "port", "name". Optional keys:
            "logger_name", "loglevel", "masterpassword"
        :param target_version: if provided it checks if database contains required version, raising exception otherwise.
        :return: None or raises DbException on error
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            self.master_password = config.get("masterpassword")
            self.client = MongoClient(config["host"], config["port"])
            # TODO add as parameters also username=config.get("user"), password=config.get("password"))
            # when all modules are ready
            self.db = self.client[config["name"]]
            if "loglevel" in config:
                self.logger.setLevel(getattr(logging, config['loglevel']))
            # get data to try a connection
            now = time()
            while True:
                try:
                    # Read directly instead of using get_one(): get_one converts any error into a DbException,
                    # which would hide ConnectionFailure and make the retry below unreachable.
                    # "_id" is unique, so at most one entry can match
                    with self.lock:
                        version_data = self.db["admin"].find_one({"_id": "version"})
                    # check database status is ok
                    if version_data and version_data.get("status") != 'ENABLED':
                        raise DbException("Wrong database status '{}'".format(version_data.get("status")),
                                          http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
                    # check version
                    db_version = None if not version_data else version_data.get("version")
                    if target_version and target_version != db_version:
                        raise DbException("Invalid database version {}. Expected {}".format(db_version, target_version))
                    # get serial
                    if version_data and version_data.get("serial"):
                        self.set_secret_key(b64decode(version_data["serial"]))
                    self.logger.info("Connected to database {} version {}".format(config["name"], db_version))
                    return
                except errors.ConnectionFailure as e:
                    if time() - now >= self.conn_initial_timout:
                        # give up; it is converted into DbException by the outer handler
                        raise
                    self.logger.info("Waiting to database up {}".format(e))
                    sleep(2)
        except errors.PyMongoError as e:
            raise DbException(e)

    @staticmethod
    def _format_filter(q_filter):
        """
        Translate query string q_filter into mongo database filter
        :param q_filter: Query string content. Follows SOL005 section 4.3.2 guidelines, with the following extensions
            and differences:
            It accepts ".ne" (not equal) in addition to ".neq".
            For arrays you can specify index (concrete index must match), nothing (any index may match) or 'ANYINDEX'
            (two or more matches apply to the same array element). Examples:
                with database register: {A: [{B: 1, C: 2}, {B: 6, C: 9}]}
                query 'A.B=6' matches because array A contains one element with B equal to 6
                query 'A.0.B=6' does not match because index 0 of array A contains B with value 1, but not 6
                query 'A.B=6&A.C=2' matches because one element of array matches B=6 and other matches C=2
                query 'A.ANYINDEX.B=6&A.ANYINDEX.C=2' does not match because it is needed the same element of the
                    array matching both

            Examples of translations from SOL005 to  >> mongo  # comment
                A=B; A.eq=B         >> A: B             # must contain key A and equal to B or be a list that contains B
                A.cont=B            >> A: B
                A=B&A=C; A=B,C      >> A: {$in: [B, C]} # must contain key A and equal to B or C or be a list that
                                                        # contains B or C
                A.cont=B&A.cont=C; A.cont=B,C  >> A: {$in: [B, C]}
                A.ncont=B           >> A: {$ne: B}      # must not contain key A or if present not equal to B or if a
                                                        # list, it must not contain B
                A.ncont=B,C; A.ncont=B&A.ncont=C  >> A: {$nin: [B, C]}  # must not contain key A or if present not
                                                        # equal to B nor C; or if a list, it must contain neither B
                                                        # nor C
                A.ne=B&A.ne=C; A.ne=B,C  >> A: {$nin: [B, C]}
                A.gt=B              >> A: {$gt: B}      # must contain key A and greater than B
                A.ne=B; A.neq=B     >> A: {$ne: B}      # must not contain key A or if present not equal to B, or if
                                                        # an array not contain B
                A.ANYINDEX.B=C      >> A: {$elemMatch: {B: C}}
        :return: database mongo filter. Raises DbException (BAD_REQUEST) if q_filter cannot be translated
        """
        # defined before the 'try' so that the error handler can always report them
        query_k = v = None
        try:
            db_filter = {}
            if not q_filter:
                return db_filter
            for query_k, query_v in q_filter.items():
                dot_index = query_k.rfind(".")
                # '> 0': there must be a non-empty field name before the operator, e.g. "a.gt"
                if dot_index > 0 and query_k[dot_index + 1:] in ("eq", "ne", "gt", "gte", "lt", "lte", "cont",
                                                                 "ncont", "neq"):
                    operator = "$" + query_k[dot_index + 1:]
                    if operator == "$neq":
                        operator = "$ne"
                    k = query_k[:dot_index]
                else:
                    operator = "$eq"
                    k = query_k

                v = query_v
                if isinstance(v, list):
                    if operator in ("$eq", "$cont"):
                        operator = "$in"
                    elif operator in ("$ne", "$ncont"):
                        operator = "$nin"
                    else:
                        # comparison operators take a single value: restore the original comma separated text
                        v = ",".join(query_v)

                if operator in ("$eq", "$cont"):
                    # v cannot be a comma separated list, because operator would have been changed to $in
                    db_v = v
                elif operator == "$ncont":
                    # v cannot be a comma separated list, because operator would have been changed to $nin
                    db_v = {"$ne": v}
                else:
                    db_v = {operator: v}

                # process the ANYINDEX word at k. Wrap from right to left, one $elemMatch per ANYINDEX found
                kleft, _, kright = k.rpartition(".ANYINDEX.")
                while kleft:
                    k = kleft
                    db_v = {"$elemMatch": {kright: db_v}}
                    kleft, _, kright = k.rpartition(".ANYINDEX.")

                # insert in db_filter
                # maybe db_filter[k] exist. e.g. in the query string for values between 5 and 8: "a.gt=5&a.lt=8"
                deep_update(db_filter, {k: db_v})

            return db_filter
        except Exception as e:
            raise DbException("Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e),
                              http_code=HTTPStatus.BAD_REQUEST)

    def get_list(self, table, q_filter=None):
        """
        Obtain a list of entries matching q_filter
        :param table: collection or table
        :param q_filter: Filter
        :return: a list (can be empty) with the found entries. Raises DbException on error
        """
        try:
            db_filter = self._format_filter(q_filter)
            with self.lock:
                # consume the cursor while holding the lock, because reading it is what accesses the server
                return list(self.db[table].find(db_filter))
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(e)

    def get_one(self, table, q_filter=None, fail_on_empty=True, fail_on_more=True):
        """
        Obtain one entry matching q_filter
        :param table: collection or table
        :param q_filter: Filter
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
            it raises a DbException (NOT_FOUND)
        :param fail_on_more: If more than one matches filter it returns one of them unless this flag is set to True, so
            that it raises a DbException (CONFLICT)
        :return: The requested element, or None
        """
        try:
            db_filter = self._format_filter(q_filter)
            with self.lock:
                collection = self.db[table]
                if not fail_on_empty and not fail_on_more:
                    # nothing to check, any matching entry (or None) is a valid answer
                    return collection.find_one(db_filter)
                # two entries are enough to distinguish among "none", "one" and "more than one"
                rows = list(collection.find(db_filter).limit(2))
            if not rows:
                if fail_on_empty:
                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
                                      HTTPStatus.NOT_FOUND)
                return None
            if len(rows) > 1 and fail_on_more:
                raise DbException("Found more than one {} with filter='{}'".format(table[:-1], q_filter),
                                  HTTPStatus.CONFLICT)
            return rows[0]
        except DbException:
            # keep the http_code of the exceptions raised above and by _format_filter
            raise
        except Exception as e:  # TODO refine
            raise DbException(e)

    def del_list(self, table, q_filter=None):
        """
        Deletes all entries that match q_filter
        :param table: collection or table
        :param q_filter: Filter
        :return: Dict with the number of entries deleted
        """
        try:
            db_filter = self._format_filter(q_filter)
            with self.lock:
                rows = self.db[table].delete_many(db_filter)
            return {"deleted": rows.deleted_count}
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(e)

    def del_one(self, table, q_filter=None, fail_on_empty=True):
        """
        Deletes one entry that matches q_filter
        :param table: collection or table
        :param q_filter: Filter
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in
            which case it raises a DbException (NOT_FOUND)
        :return: Dict with the number of entries deleted. None if no matching is found.
        """
        try:
            db_filter = self._format_filter(q_filter)
            with self.lock:
                rows = self.db[table].delete_one(db_filter)
            if rows.deleted_count == 0:
                if fail_on_empty:
                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
                                      HTTPStatus.NOT_FOUND)
                return None
            return {"deleted": rows.deleted_count}
        except DbException:
            # keep the http_code of the exceptions raised above and by _format_filter
            raise
        except Exception as e:  # TODO refine
            raise DbException(e)

    def create(self, table, indata):
        """
        Add a new entry at database
        :param table: collection or table
        :param indata: content to be added
        :return: database id of the inserted element. Raises a DbException on error
        """
        try:
            with self.lock:
                data = self.db[table].insert_one(indata)
            return data.inserted_id
        except Exception as e:  # TODO refine
            raise DbException(e)

    def set_one(self, table, q_filter, update_dict, fail_on_empty=True):
        """
        Modifies an entry at database
        :param table: collection or table
        :param q_filter: Filter
        :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
            it raises a DbException (NOT_FOUND)
        :return: Dict with the number of entries modified. None if no matching is found.
        """
        try:
            db_filter = self._format_filter(q_filter)
            with self.lock:
                rows = self.db[table].update_one(db_filter, {"$set": update_dict})
            if rows.matched_count == 0:
                if fail_on_empty:
                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
                                      HTTPStatus.NOT_FOUND)
                return None
            return {"modified": rows.modified_count}
        except DbException:
            # keep the http_code of the exceptions raised above and by _format_filter
            raise
        except Exception as e:  # TODO refine
            raise DbException(e)

    def set_list(self, table, q_filter, update_dict):
        """
        Modifies all matching entries at database
        :param table: collection or table
        :param q_filter: Filter
        :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
        :return: Dict with the number of entries modified
        """
        try:
            db_filter = self._format_filter(q_filter)
            with self.lock:
                rows = self.db[table].update_many(db_filter, {"$set": update_dict})
            return {"modified": rows.modified_count}
        except DbException:
            # keep the http_code of the exception raised by _format_filter
            raise
        except Exception as e:  # TODO refine
            raise DbException(e)

    def replace(self, table, _id, indata, fail_on_empty=True):
        """
        Replace the content of an entry
        :param table: collection or table
        :param _id: internal database id
        :param indata: content to replace
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
            it raises a DbException (NOT_FOUND)
        :return: Dict with the number of entries replaced. None if no matching is found.
        """
        try:
            db_filter = {"_id": _id}
            with self.lock:
                rows = self.db[table].replace_one(db_filter, indata)
            if rows.matched_count == 0:
                if fail_on_empty:
                    raise DbException("Not found any {} with _id='{}'".format(table[:-1], _id), HTTPStatus.NOT_FOUND)
                return None
            return {"replaced": rows.modified_count}
        except DbException:
            # keep the http_code of the exception raised above
            raise
        except Exception as e:  # TODO refine
            raise DbException(e)