1 |
|
# -*- coding: utf-8 -*- |
2 |
|
|
3 |
|
# Copyright 2018 Telefonica S.A. |
4 |
|
# |
5 |
|
# Licensed under the Apache License, Version 2.0 (the "License"); |
6 |
|
# you may not use this file except in compliance with the License. |
7 |
|
# You may obtain a copy of the License at |
8 |
|
# |
9 |
|
# http://www.apache.org/licenses/LICENSE-2.0 |
10 |
|
# |
11 |
|
# Unless required by applicable law or agreed to in writing, software |
12 |
|
# distributed under the License is distributed on an "AS IS" BASIS, |
13 |
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
14 |
|
# implied. |
15 |
|
# See the License for the specific language governing permissions and |
16 |
|
# limitations under the License. |
17 |
|
|
18 |
|
|
19 |
1 |
from base64 import b64decode |
20 |
1 |
from copy import deepcopy |
21 |
1 |
from http import HTTPStatus |
22 |
1 |
import logging |
23 |
1 |
from time import sleep, time |
24 |
1 |
from uuid import uuid4 |
25 |
|
|
26 |
1 |
from osm_common.dbbase import DbBase, DbException |
27 |
1 |
from pymongo import errors, MongoClient |
28 |
|
|
29 |
1 |
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>" |
30 |
|
|
31 |
|
# TODO consider use this decorator for database access retries |
32 |
|
# @retry_mongocall |
33 |
|
# def retry_mongocall(call): |
34 |
|
# def _retry_mongocall(*args, **kwargs): |
35 |
|
# retry = 1 |
36 |
|
# while True: |
37 |
|
# try: |
38 |
|
# return call(*args, **kwargs) |
39 |
|
# except pymongo.AutoReconnect as e: |
40 |
|
# if retry == 4: |
41 |
|
# raise DbException(e) |
42 |
|
# sleep(retry) |
43 |
|
# return _retry_mongocall |
44 |
|
|
45 |
|
|
46 |
1 |
def deep_update(to_update, update_with):
    """
    Merge 'update_with' into 'to_update' in place, descending into nested dictionaries.
    When the same key holds a dictionary at both sides the merge recurses, so keys present only at 'to_update'
    are preserved. In any other case the value at 'to_update' is overwritten with a deep copy of the value at
    'update_with'.
    :param to_update: dictionary to be modified
    :param update_with: dictionary with the new content. It is not changed
    :return: to_update (the very same object received)
    """
    for name, incoming in update_with.items():
        mergeable = (
            name in to_update
            and isinstance(to_update[name], dict)
            and isinstance(incoming, dict)
        )
        if mergeable:
            deep_update(to_update[name], incoming)
        else:
            # deep copy, so that later changes at 'update_with' do not leak into 'to_update'
            to_update[name] = deepcopy(incoming)
    return to_update
61 |
|
|
62 |
|
|
63 |
1 |
class DbMongo(DbBase): |
64 |
1 |
conn_initial_timout = 120 |
65 |
1 |
conn_timout = 10 |
66 |
|
|
67 |
1 |
def __init__(self, logger_name="db", lock=False):
    """
    Create the driver in a disconnected state; the connection itself is made by db_connect
    :param logger_name: passed as is to the DbBase constructor
    :param lock: passed as is to the DbBase constructor
    """
    super().__init__(logger_name, lock)
    # mongo client and database handle; both are assigned at db_connect
    self.client = self.db = None
    # key provided by configuration; db_connect stores it here
    self.database_key = None
    # Flag telling whether the database serial has already been read. The database is initialized by NBI, who
    # generates the serial. If it is not ready yet at connection time, it must be read later on, before any
    # decrypt operation
    self.secret_obtained = False
75 |
|
|
76 |
1 |
def get_secret_key(self):
    """
    Compute the secret key, unless it has already been obtained.
    The key is reset and set again from 'database_key' (when present) and then from the 'serial' stored at the
    'version' entry of table 'admin' (when present). After that the key is flagged as obtained, so that following
    calls return immediately.
    :return: None
    """
    if not self.secret_obtained:
        self.secret_key = None
        if self.database_key:
            self.set_secret_key(self.database_key)
        admin_version = self.get_one(
            "admin", {"_id": "version"}, fail_on_empty=False, fail_on_more=True
        )
        serial = admin_version.get("serial") if admin_version else None
        if serial:
            # the serial is stored base64 encoded
            self.set_secret_key(b64decode(serial))
        self.secret_obtained = True
89 |
|
|
90 |
1 |
def db_connect(self, config, target_version=None):
    """
    Connect to database
    :param config: Configuration of database. Keys read here: 'uri' (connection string), 'name' (database name),
        and optionally 'replicaset', 'logger_name', 'loglevel', 'commonkey' or 'masterpassword'
    :param target_version: if provided it checks if database contains required version, raising exception otherwise.
    :return: None or raises DbException on error
    """
    try:
        if "logger_name" in config:
            self.logger = logging.getLogger(config["logger_name"])
        master_key = config.get("commonkey") or config.get("masterpassword")
        if master_key:
            self.database_key = master_key
            self.set_secret_key(master_key)
        if config.get("uri"):
            self.client = MongoClient(
                config["uri"], replicaSet=config.get("replicaset", None)
            )
        if self.client is None:
            # without a client the indexing below would fail with a meaningless TypeError
            raise DbException(
                "Missing 'uri' at database configuration",
                http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            )
        self.db = self.client[config["name"]]
        if "loglevel" in config:
            self.logger.setLevel(getattr(logging, config["loglevel"]))

        # read the version entry, which also proves that the connection works
        start = time()
        while True:
            try:
                version_data = self.get_one(
                    "admin",
                    {"_id": "version"},
                    fail_on_empty=False,
                    fail_on_more=True,
                )
            except (errors.ConnectionFailure, DbException) as e:
                # get_one converts any error into a DbException, so the connection failure must be looked for
                # at the chained exception too; otherwise this retry would never be triggered
                failure = e
                if not isinstance(e, errors.ConnectionFailure):
                    failure = e.__cause__ or e.__context__
                if not isinstance(failure, errors.ConnectionFailure):
                    raise
                if time() - start >= self.conn_initial_timout:
                    raise
                self.logger.info("Waiting to database up {}".format(e))
                sleep(2)
                continue

            # check database status is ok
            if version_data and version_data.get("status") != "ENABLED":
                raise DbException(
                    "Wrong database status '{}'".format(version_data.get("status")),
                    http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
                )
            # check version
            db_version = version_data.get("version") if version_data else None
            if target_version and target_version != db_version:
                raise DbException(
                    "Invalid database version {}. Expected {}".format(
                        db_version, target_version
                    )
                )
            # get serial
            if version_data and version_data.get("serial"):
                self.secret_obtained = True
                self.set_secret_key(b64decode(version_data["serial"]))
            self.logger.info(
                "Connected to database {} version {}".format(
                    config["name"], db_version
                )
            )
            return
    except errors.PyMongoError as e:
        raise DbException(e)
157 |
|
|
158 |
1 |
@staticmethod
def _format_filter(q_filter):
    """
    Translate query string q_filter into mongo database filter
    :param q_filter: Query string content. Follows SOL005 section 4.3.2 guidelines, with the following extensions
        and differences:
            It accepts ".ne" (not equal) in addition to ".neq".
            For arrays you can specify index (concrete index must match), nothing (any index may match) or
            'ANYINDEX' (two or more matches applies for the same array element). Examples:
                with database register: {A: [{B: 1, C: 2}, {B: 6, C: 9}]}
                query 'A.B=6' matches because array A contains one element with B equal to 6
                query 'A.0.B=6' does not match because index 0 of array A contains B with value 1, but not 6
                query 'A.B=6&A.C=2' matches because one element of array matches B=6 and other matches C=2
                query 'A.ANYINDEX.B=6&A.ANYINDEX.C=2' does not match because it is needed the same element of
                    the array matching both

        Examples of translations from SOL005 to  >> mongo  # comment
            A=B; A.eq=B         >> A: B             # must contain key A and equal to B or be a list that contains B
            A.cont=B            >> A: B
            A=B&A=C; A=B,C      >> A: {$in: [B, C]} # must contain key A and equal to B or C or be a list that
                                                    # contains B or C
            A.cont=B&A.cont=C; A.cont=B,C  >> A: {$in: [B, C]}
            A.ncont=B           >> A: {$ne: B}      # must not contain key A or if present not equal to B or if a
                                                    # list, it must not contain B
            A.ncont=B,C; A.ncont=B&A.ncont=C    >> A: {$nin: [B,C]}     # must not contain key A or if present not
                                                    # equal neither B nor C; or if a list, it must not contain
                                                    # neither B nor C
            A.ne=B&A.ne=C; A.ne=B,C             >> A: {$nin: [B, C]}
            A.gt=B              >> A: {$gt: B}      # must contain key A and greater than B
            A.ne=B; A.neq=B     >> A: {$ne: B}      # must not contain key A or if present not equal to B, or if
                                                    # an array not contain B
            A.ANYINDEX.B=C      >> A: {$elemMatch: {B: C}}
        A list value with an operator that does not admit lists (gt, gte, lt, lte) is joined back into a single
        comma separated string.
    :return: database mongo filter
    :raise: DbException with http_code BAD_REQUEST if the filter cannot be translated
    """
    # bound before the loop, so that the error message can always be composed
    query_k = query_v = None
    try:
        db_filter = {}
        if not q_filter:
            return db_filter
        for query_k, query_v in q_filter.items():
            dot_index = query_k.rfind(".")
            suffix = query_k[dot_index + 1 :]
            # 'dot_index > 0' (a non empty key before the dot) so that one-letter keys such as 'a.gt' work
            if dot_index > 0 and suffix in (
                "eq",
                "ne",
                "gt",
                "gte",
                "lt",
                "lte",
                "cont",
                "ncont",
                "neq",
            ):
                operator = "$ne" if suffix == "neq" else "$" + suffix
                k = query_k[:dot_index]
            else:
                operator = "$eq"
                k = query_k

            v = query_v
            if isinstance(v, list):
                if operator in ("$eq", "$cont"):
                    operator = "$in"
                elif operator in ("$ne", "$ncont"):
                    operator = "$nin"
                else:
                    # operator does not admit a list: rebuild the original comma separated value
                    v = ",".join(str(item) for item in v)

            if operator in ("$eq", "$cont"):
                # v cannot be a list, because operator would have been changed to $in
                db_v = v
            elif operator == "$ncont":
                # v cannot be a list, because operator would have been changed to $nin
                db_v = {"$ne": v}
            else:
                db_v = {operator: v}

            # process the ANYINDEX word at k, from right to left, nesting one $elemMatch per occurrence
            kleft, _, kright = k.rpartition(".ANYINDEX.")
            while kleft:
                k = kleft
                db_v = {"$elemMatch": {kright: db_v}}
                kleft, _, kright = k.rpartition(".ANYINDEX.")

            # insert in db_filter
            # maybe db_filter[k] exist. e.g. in the query string for values between 5 and 8: "a.gt=5&a.lt=8"
            deep_update(db_filter, {k: db_v})

        return db_filter
    except Exception as e:
        raise DbException(
            "Invalid query string filter at {}:{}. Error: {}".format(
                query_k, query_v, e
            ),
            http_code=HTTPStatus.BAD_REQUEST,
        )
253 |
|
|
254 |
1 |
def get_list(self, table, q_filter=None):
    """
    Obtain a list of entries matching q_filter
    :param table: collection or table
    :param q_filter: Filter
    :return: a list (can be empty) with the found entries. Raises DbException on error
    """
    try:
        db_filter = self._format_filter(q_filter)
        with self.lock:
            # the cursor is lazy: it must be consumed here, while the lock is still held
            return list(self.db[table].find(db_filter))
    except DbException:
        raise
    except Exception as e:  # TODO refine
        raise DbException(e)
274 |
|
|
275 |
1 |
def count(self, table, q_filter=None):
    """
    Count the number of entries matching q_filter
    :param table: collection or table
    :param q_filter: Filter
    :return: number of entries found (can be zero)
    :raise: DbException on error
    """
    try:
        db_filter = self._format_filter(q_filter)
        with self.lock:
            collection = self.db[table]
            # 'count' is deprecated at pymongo in favour of 'count_documents'. The check is done on the class
            # because a pymongo collection answers to any unknown attribute with a sub-collection
            if hasattr(type(collection), "count_documents"):
                return collection.count_documents(db_filter)
            return collection.count(db_filter)
    except DbException:
        raise
    except Exception as e:  # TODO refine
        raise DbException(e)
293 |
|
|
294 |
1 |
def get_one(self, table, q_filter=None, fail_on_empty=True, fail_on_more=True):
    """
    Obtain one entry matching q_filter
    :param table: collection or table
    :param q_filter: Filter
    :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
        it raises a DbException (NOT_FOUND)
    :param fail_on_more: If more than one matches filter it returns one of them unless this flag is set to True, so
        that it raises a DbException (CONFLICT)
    :return: The requested element, or None
    """
    try:
        db_filter = self._format_filter(q_filter)
        with self.lock:
            collection = self.db[table]
            if not (fail_on_empty or fail_on_more):
                # nothing to check: any match, or None, is a valid answer
                return collection.find_one(db_filter)
            # two entries are enough to tell apart 'none', 'one' and 'more than one'
            rows = list(collection.find(db_filter).limit(2))
        if not rows:
            if fail_on_empty:
                raise DbException(
                    "Not found any {} with filter='{}'".format(table[:-1], q_filter),
                    HTTPStatus.NOT_FOUND,
                )
            return None
        if len(rows) > 1 and fail_on_more:
            raise DbException(
                "Found more than one {} with filter='{}'".format(
                    table[:-1], q_filter
                ),
                HTTPStatus.CONFLICT,
            )
        return rows[0]
    except DbException:
        # already carries the proper message and http code
        raise
    except Exception as e:  # TODO refine
        raise DbException(e)
332 |
|
|
333 |
1 |
def del_list(self, table, q_filter=None):
    """
    Remove every entry matching q_filter
    :param table: collection or table
    :param q_filter: Filter
    :return: Dict with the number of entries deleted, under key 'deleted'. Raises DbException on error
    """
    try:
        with self.lock:
            outcome = self.db[table].delete_many(self._format_filter(q_filter))
        return dict(deleted=outcome.deleted_count)
    except DbException:
        # e.g. an invalid filter: keep its own message and http code
        raise
    except Exception as exc:  # TODO refine
        raise DbException(exc)
349 |
|
|
350 |
1 |
def del_one(self, table, q_filter=None, fail_on_empty=True):
    """
    Deletes one entry that matches q_filter
    :param table: collection or table
    :param q_filter: Filter
    :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in
        which case it raises a DbException (NOT_FOUND)
    :return: Dict with the number of entries deleted, or None if nothing matched
    """
    try:
        with self.lock:
            collection = self.db[table]
            rows = collection.delete_one(self._format_filter(q_filter))
        if rows.deleted_count == 0:
            if fail_on_empty:
                raise DbException(
                    "Not found any {} with filter='{}'".format(table[:-1], q_filter),
                    HTTPStatus.NOT_FOUND,
                )
            return None
        return {"deleted": rows.deleted_count}
    except DbException:
        # do not wrap it again: that would lose its http code
        raise
    except Exception as e:  # TODO refine
        raise DbException(e)
375 |
|
|
376 |
1 |
def create(self, table, indata):
    """
    Insert one new entry into the database
    :param table: collection or table
    :param indata: content to be added
    :return: database id of the inserted element. Raises a DbException on error
    """
    try:
        with self.lock:
            outcome = self.db[table].insert_one(indata)
        return outcome.inserted_id
    except Exception as exc:  # TODO refine
        raise DbException(exc)
390 |
|
|
391 |
1 |
def create_list(self, table, indata_list):
    """
    Add several entries at once
    :param table: collection or table
    :param indata_list: content list to be added. Entries without '_id' (or with None) get a new uuid4 string as
        '_id'; this modifies the provided entries
    :return: the list of inserted '_id's (empty if there is nothing to insert). DbException on error
    """
    try:
        # materialized, so that a one-shot iterable is not exhausted by the loop below
        documents = list(indata_list)
        if not documents:
            # pymongo rejects an empty insert_many; there is simply nothing to do
            return []
        for item in documents:
            if item.get("_id") is None:
                item["_id"] = str(uuid4())
        with self.lock:
            collection = self.db[table]
            data = collection.insert_many(documents)
        return data.inserted_ids
    except Exception as e:  # TODO refine
        raise DbException(e)
408 |
|
|
409 |
1 |
def set_one(
    self,
    table,
    q_filter,
    update_dict,
    fail_on_empty=True,
    unset=None,
    pull=None,
    push=None,
    push_list=None,
    pull_list=None,
    upsert=False,
):
    """
    Modifies an entry at database
    :param table: collection or table
    :param q_filter: Filter
    :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
    :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
        it raises a DbException (NOT_FOUND). It is not raised when the entry has been created because of 'upsert'
    :param unset: Plain dictionary with the content to be removed if exist. It is a dot separated keys, value is
        ignored. If not exist, it is ignored
    :param pull: Plain dictionary with the content to be removed from an array. It is a dot separated keys and value
        if exist in the array is removed. If not exist, it is ignored
    :param pull_list: Same as pull but values are arrays where each item is removed from the array
    :param push: Plain dictionary with the content to be appended to an array. It is a dot separated keys and value
        is appended to the end of the array
    :param push_list: Same as push but values are arrays where each item is appended instead of appending the
        whole array
    :param upsert: If this parameter is set to True and no document is found using 'q_filter' it will be created.
        By default this is false.
    :return: Dict with the number of entries modified. None if no matching is found.
    """
    try:
        db_oper = {}
        if update_dict:
            db_oper["$set"] = update_dict
        if unset:
            db_oper["$unset"] = unset
        if pull or pull_list:
            # work on a copy: merging pull_list must not modify the caller's 'pull'
            db_oper["$pull"] = dict(pull or {})
            if pull_list:
                db_oper["$pull"].update(
                    {k: {"$in": v} for k, v in pull_list.items()}
                )
        if push or push_list:
            # work on a copy: merging push_list must not modify the caller's 'push'
            db_oper["$push"] = dict(push or {})
            if push_list:
                db_oper["$push"].update(
                    {k: {"$each": v} for k, v in push_list.items()}
                )

        with self.lock:
            collection = self.db[table]
            rows = collection.update_one(
                self._format_filter(q_filter), db_oper, upsert=upsert
            )
        if rows.matched_count == 0:
            # with upsert a non matching filter creates the entry, which is not a failure
            created = bool(upsert) and rows.upserted_id is not None
            if fail_on_empty and not created:
                raise DbException(
                    "Not found any {} with filter='{}'".format(table[:-1], q_filter),
                    HTTPStatus.NOT_FOUND,
                )
            return None
        return {"modified": rows.modified_count}
    except DbException:
        # do not wrap it again: that would lose its http code
        raise
    except Exception as e:  # TODO refine
        raise DbException(e)
478 |
|
|
479 |
1 |
def set_list(
    self,
    table,
    q_filter,
    update_dict,
    unset=None,
    pull=None,
    push=None,
    push_list=None,
    pull_list=None,
):
    """
    Modifies all matching entries at database
    :param table: collection or table
    :param q_filter: Filter
    :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
    :param unset: Plain dictionary with the content to be removed if exist. It is a dot separated keys, value is
        ignored. If not exist, it is ignored
    :param pull: Plain dictionary with the content to be removed from an array. It is a dot separated keys and value
        if exist in the array is removed. If not exist, it is ignored
    :param push: Plain dictionary with the content to be appended to an array. It is a dot separated keys, the
        single value is appended to the end of the array
    :param pull_list: Same as pull but values are arrays where each item is removed from the array
    :param push_list: Same as push but values are arrays where each item is appended instead of appending the
        whole array
    :return: Dict with the number of entries modified
    """
    try:
        db_oper = {}
        if update_dict:
            db_oper["$set"] = update_dict
        if unset:
            db_oper["$unset"] = unset
        if pull or pull_list:
            # work on a copy: merging pull_list must not modify the caller's 'pull'
            db_oper["$pull"] = dict(pull or {})
            if pull_list:
                db_oper["$pull"].update(
                    {k: {"$in": v} for k, v in pull_list.items()}
                )
        if push or push_list:
            # work on a copy: merging push_list must not modify the caller's 'push'
            db_oper["$push"] = dict(push or {})
            if push_list:
                db_oper["$push"].update(
                    {k: {"$each": v} for k, v in push_list.items()}
                )
        with self.lock:
            collection = self.db[table]
            rows = collection.update_many(self._format_filter(q_filter), db_oper)
        return {"modified": rows.modified_count}
    except DbException:
        # e.g. an invalid filter: keep its own message and http code
        raise
    except Exception as e:  # TODO refine
        raise DbException(e)
530 |
|
|
531 |
1 |
def replace(self, table, _id, indata, fail_on_empty=True):
    """
    Replace the content of an entry
    :param table: collection or table
    :param _id: internal database id
    :param indata: content to replace
    :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
        it raises a DbException (NOT_FOUND)
    :return: Dict with the number of entries replaced, or None if nothing matched
    """
    try:
        db_filter = {"_id": _id}
        with self.lock:
            collection = self.db[table]
            rows = collection.replace_one(db_filter, indata)
        if rows.matched_count == 0:
            if fail_on_empty:
                raise DbException(
                    "Not found any {} with _id='{}'".format(table[:-1], _id),
                    HTTPStatus.NOT_FOUND,
                )
            return None
        return {"replaced": rows.modified_count}
    except DbException:
        # do not wrap it again: that would lose its http code
        raise
    except Exception as e:  # TODO refine
        raise DbException(e)