7fc29dc1f30c2504412549e8076e5bb68f75f2fa
# -*- coding: utf-8 -*-

# Copyright 2018 Telefonica S.A.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from base64 import b64decode
from copy import deepcopy
from http import HTTPStatus
from time import sleep, time
from uuid import uuid4

from pymongo import MongoClient, errors

from osm_common.dbbase import DbBase, DbException
28 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
# TODO consider use this decorator for database access retries
# (sketch only, it is not active code)
# def retry_mongocall(call):
#     def _retry_mongocall(*args, **kwargs):
#         retry = 1
#         while True:
#             try:
#                 return call(*args, **kwargs)
#             except pymongo.AutoReconnect as e:
#                 if retry == 4:
#                     raise DbException(e)
#                 sleep(retry)
#                 retry += 1
#     return _retry_mongocall
def deep_update(to_update, update_with):
    """
    Similar to deepcopy but recursively with nested dictionaries. 'to_update' dict is updated with a content copy of
    'update_with' dict recursively
    :param to_update: must be a dictionary to be modified
    :param update_with: must be a dictionary. It is not changed
    :return: to_update (the same object received, modified in place)
    """
    for key, new_value in update_with.items():
        current = to_update.get(key)
        if isinstance(current, dict) and isinstance(new_value, dict):
            # both sides are dictionaries: merge in depth instead of replacing the whole branch
            deep_update(current, new_value)
        else:
            # new key, or at least one side is not a dictionary: store an independent copy so that later changes
            # at 'update_with' are not reflected at 'to_update'
            to_update[key] = deepcopy(new_value)
    return to_update
class DbMongo(DbBase):
    """
    MongoDB implementation of the DbBase database interface.

    NOTE(review): self.logger, self.lock (used as a context manager around every collection access),
    self.secret_key and self.set_secret_key() are expected to be provided by DbBase, which receives the
    'logger_name' and 'lock' arguments -- confirm against osm_common.dbbase.
    """
    conn_initial_timout = 120  # seconds that db_connect keeps retrying while the database is not reachable
    conn_timout = 10  # NOTE(review): not referenced in this class; present only for external users -- confirm

    def __init__(self, logger_name='db', lock=False):
        """
        :param logger_name: name of the logger, handed over to DbBase
        :param lock: locking option, handed over to DbBase
        """
        super().__init__(logger_name, lock)
        self.client = None
        self.db = None
        self.database_key = None
        self.secret_obtained = False
        # ^ This is used to know if database serial has been got. Database is inited by NBI, who generates the serial
        # In case it is not ready when connected, it should be got later on before any decrypt operation

    def get_secret_key(self):
        """
        Load the secret key if it has not been obtained yet: the key is reset, the configured database key (if any)
        is applied and then the "serial" stored at the admin "version" entry (if any) is applied on top.
        :return: None
        """
        if self.secret_obtained:
            return

        self.secret_key = None
        if self.database_key:
            self.set_secret_key(self.database_key)
        version_data = self.get_one("admin", {"_id": "version"}, fail_on_empty=False, fail_on_more=True)
        if version_data and version_data.get("serial"):
            self.set_secret_key(b64decode(version_data["serial"]))
        self.secret_obtained = True

    def db_connect(self, config, target_version=None):
        """
        Connect to database
        :param config: Configuration of database. Keys read here: "uri" or "host" plus "port"; "name"; and the
            optional "replicaset", "logger_name", "loglevel", "commonkey"/"masterpassword"
        :param target_version: if provided it checks if database contains required version, raising exception otherwise.
        :return: None or raises DbException on error
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            master_key = config.get("commonkey") or config.get("masterpassword")
            if master_key:
                self.database_key = master_key
                self.set_secret_key(master_key)
            if config.get("uri"):
                self.client = MongoClient(config["uri"], replicaSet=config.get("replicaset", None))
            else:
                self.client = MongoClient(config["host"], config["port"], replicaSet=config.get("replicaset", None))
            # TODO add as parameters also username=config.get("user"), password=config.get("password"))
            # when all modules are ready
            self.db = self.client[config["name"]]
            if "loglevel" in config:
                self.logger.setLevel(getattr(logging, config['loglevel']))
            # get data to try a connection
            now = time()
            while True:
                try:
                    version_data = self.get_one("admin", {"_id": "version"}, fail_on_empty=False, fail_on_more=True)
                    # check database status is ok
                    if version_data and version_data.get("status") != 'ENABLED':
                        raise DbException("Wrong database status '{}'".format(version_data.get("status")),
                                          http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
                    # check version
                    db_version = None if not version_data else version_data.get("version")
                    if target_version and target_version != db_version:
                        raise DbException("Invalid database version {}. Expected {}".format(db_version, target_version))
                    # get serial
                    if version_data and version_data.get("serial"):
                        self.secret_obtained = True
                        self.set_secret_key(b64decode(version_data["serial"]))
                    self.logger.info("Connected to database {} version {}".format(config["name"], db_version))
                    return
                except (errors.ConnectionFailure, DbException) as e:
                    # get_one() reports driver errors wrapped into a DbException, so the wrapped error must be
                    # inspected too; otherwise a database that is still starting would never be retried
                    driver_error = e if isinstance(e, errors.ConnectionFailure) else (e.__cause__ or e.__context__)
                    if not isinstance(driver_error, errors.ConnectionFailure):
                        raise  # not a connectivity problem (e.g. wrong status or version): retrying is useless
                    if time() - now >= self.conn_initial_timout:
                        raise
                    self.logger.info("Waiting to database up {}".format(e))
                    sleep(2)
        except errors.PyMongoError as e:
            raise DbException(e) from e

    @staticmethod
    def _format_filter(q_filter):
        """
        Translate query string q_filter into mongo database filter
        :param q_filter: Query string content (dictionary of key -> value or list of values). Follows SOL005 section
            4.3.2 guidelines, with the follow extensions and differences:
            It accept ".ne" (not equal) in addition to ".neq".
            For arrays you can specify index (concrete index must match), nothing (any index may match) or 'ANYINDEX'
            (two or more matches applies for the same array element). Examples:
                with database register: {A: [{B: 1, C: 2}, {B: 6, C: 9}]}
                query 'A.B=6' matches because array A contains one element with B equal to 6
                query 'A.0.B=6' does no match because index 0 of array A contains B with value 1, but not 6
                query 'A.B=6&A.C=2' matches because one element of array matches B=6 and other matches C=2
                query 'A.ANYINDEX.B=6&A.ANYINDEX.C=2' does not match because it is needed the same element of the
                    array matching both

        Examples of translations from SOL005 to  >> mongo  # comment
            A=B; A.eq=B; A.cont=B   >> A: B               # must contain key A and equal to B or be a list that
                                                          # contains B
            A=B&A=C; A=B,C          >> A: {$in: [B, C]}   # must contain key A and equal to B or C or be a list that
                                                          # contains B or C
            A.cont=B&A.cont=C; A.cont=B,C  >> A: {$in: [B, C]}
            A.ncont=B               >> A: {$ne: B}        # must not contain key A or if present not equal to B or
                                                          # if a list, it must not contain B
            A.ncont=B,C; A.ncont=B&A.ncont=C  >> A: {$nin: [B, C]}
            A.ne=B&A.ne=C; A.ne=B,C >> A: {$nin: [B, C]}
            A.gt=B                  >> A: {$gt: B}        # must contain key A and greater than B
            A.ne=B; A.neq=B         >> A: {$ne: B}
            A.ANYINDEX.B=C          >> A: {$elemMatch: {B: C}}
        :return: database mongo filter. Raises DbException (BAD_REQUEST) if q_filter cannot be translated
        """
        query_k = v = None  # bound in advance because the error message below uses them
        try:
            db_filter = {}
            if not q_filter:
                return db_filter
            for query_k, query_v in q_filter.items():
                dot_index = query_k.rfind(".")
                suffix = query_k[dot_index + 1:]
                # 'dot_index > 0' requires a non-empty key before the operator, e.g. "a.gt"
                if dot_index > 0 and suffix in ("eq", "ne", "gt", "gte", "lt", "lte", "cont", "ncont", "neq"):
                    operator = "$" + suffix
                    if operator == "$neq":
                        operator = "$ne"
                    k = query_k[:dot_index]
                else:
                    operator = "$eq"
                    k = query_k

                v = query_v
                if isinstance(v, list):
                    if operator in ("$eq", "$cont"):
                        operator = "$in"
                    elif operator in ("$ne", "$ncont"):
                        operator = "$nin"
                    else:
                        # the rest of operators take one single value: rebuild the comma separated text
                        v = ",".join(str(item) for item in query_v)

                if operator in ("$eq", "$cont"):
                    # v cannot be a comma separated list, because operator would have been changed to $in
                    db_v = v
                elif operator == "$ncont":
                    # v cannot be a comma separated list, because operator would have been changed to $nin
                    db_v = {"$ne": v}
                else:
                    db_v = {operator: v}

                # process the ANYINDEX word at k, from right to left: every occurrence wraps what is at its right
                # into an $elemMatch
                kleft, _, kright = k.rpartition(".ANYINDEX.")
                while kleft:
                    k = kleft
                    db_v = {"$elemMatch": {kright: db_v}}
                    kleft, _, kright = k.rpartition(".ANYINDEX.")

                # insert in db_filter
                # maybe db_filter[k] exist. e.g. in the query string for values between 5 and 8: "a.gt=5&a.lt=8"
                deep_update(db_filter, {k: db_v})

            return db_filter
        except Exception as e:
            raise DbException("Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e),
                              http_code=HTTPStatus.BAD_REQUEST)

    @staticmethod
    def _build_update(update_dict, unset, pull, push, push_list, pull_list):
        """
        Compose the mongo update document shared by set_one and set_list. The dictionaries received are not
        modified: '$pull' and '$push' are built over copies.
        :return: dictionary with the mongo operators ($set, $unset, $pull, $push) that have content
        """
        db_oper = {}
        if update_dict:
            db_oper["$set"] = update_dict
        if unset:
            db_oper["$unset"] = unset
        if pull or pull_list:
            db_oper["$pull"] = dict(pull or {})
            if pull_list:
                db_oper["$pull"].update({k: {"$in": v} for k, v in pull_list.items()})
        if push or push_list:
            db_oper["$push"] = dict(push or {})
            if push_list:
                db_oper["$push"].update({k: {"$each": v} for k, v in push_list.items()})
        return db_oper

    def get_list(self, table, q_filter=None):
        """
        Obtain a list of entries matching q_filter
        :param table: collection or table
        :param q_filter: Filter
        :return: a list (can be empty) with the found entries. Raises DbException on error
        """
        try:
            with self.lock:
                collection = self.db[table]
                db_filter = self._format_filter(q_filter)
                return list(collection.find(db_filter))
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def count(self, table, q_filter=None):
        """
        Count the number of entries matching q_filter
        :param table: collection or table
        :param q_filter: Filter
        :return: number of entries found (can be zero)
        :raise: DbException on error
        """
        try:
            with self.lock:
                collection = self.db[table]
                db_filter = self._format_filter(q_filter)
                # Collection.count() does not exist at recent pymongo versions. The check is done over the class
                # because a pymongo collection answers to any unknown attribute with a sub-collection
                if hasattr(type(collection), "count_documents"):
                    return collection.count_documents(db_filter)
                return collection.count(db_filter)
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def get_one(self, table, q_filter=None, fail_on_empty=True, fail_on_more=True):
        """
        Obtain one entry matching q_filter
        :param table: collection or table
        :param q_filter: Filter
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
            it raises a DbException (NOT_FOUND)
        :param fail_on_more: If more than one matches filter it returns one of them unless this flag is set to True, so
            that it raises a DbException (CONFLICT)
        :return: The requested element, or None
        """
        try:
            db_filter = self._format_filter(q_filter)
            with self.lock:
                collection = self.db[table]
                if not fail_on_empty and not fail_on_more:
                    # nothing to check: the first match (or None) is the answer
                    return collection.find_one(db_filter)
                # two entries are enough to know whether the match is empty, unique or multiple
                rows = list(collection.find(db_filter).limit(2))
            if not rows:
                if fail_on_empty:
                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
                                      HTTPStatus.NOT_FOUND)
                return None
            if len(rows) > 1 and fail_on_more:
                raise DbException("Found more than one {} with filter='{}'".format(table[:-1], q_filter),
                                  HTTPStatus.CONFLICT)
            return rows[0]
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def del_list(self, table, q_filter=None):
        """
        Deletes all entries that match q_filter
        :param table: collection or table
        :param q_filter: Filter
        :return: Dict with the number of entries deleted
        """
        try:
            with self.lock:
                collection = self.db[table]
                rows = collection.delete_many(self._format_filter(q_filter))
            return {"deleted": rows.deleted_count}
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def del_one(self, table, q_filter=None, fail_on_empty=True):
        """
        Deletes one entry that matches q_filter
        :param table: collection or table
        :param q_filter: Filter
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in
            which case it raises a DbException (NOT_FOUND)
        :return: Dict with the number of entries deleted
        """
        try:
            with self.lock:
                collection = self.db[table]
                rows = collection.delete_one(self._format_filter(q_filter))
            if rows.deleted_count == 0:
                if fail_on_empty:
                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
                                      HTTPStatus.NOT_FOUND)
                return None
            return {"deleted": rows.deleted_count}
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def create(self, table, indata):
        """
        Add a new entry at database
        :param table: collection or table
        :param indata: content to be added
        :return: database id of the inserted element. Raises a DbException on error
        """
        try:
            with self.lock:
                collection = self.db[table]
                data = collection.insert_one(indata)
            return data.inserted_id
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def create_list(self, table, indata_list):
        """
        Add several entries at once
        :param table: collection or table
        :param indata_list: content list to be added. Items without "_id" get a new uuid4 string as "_id"; this is
            written into the received items
        :return: the list of inserted '_id's. Exception on error
        """
        try:
            for item in indata_list:
                if item.get("_id") is None:
                    item["_id"] = str(uuid4())
            with self.lock:
                collection = self.db[table]
                data = collection.insert_many(indata_list)
            return data.inserted_ids
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def set_one(self, table, q_filter, update_dict, fail_on_empty=True, unset=None, pull=None, push=None,
                push_list=None, pull_list=None, upsert=False):
        """
        Modifies an entry at database
        :param table: collection or table
        :param q_filter: Filter
        :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
            it raises a DbException
        :param unset: Plain dictionary with the content to be removed if exist. It is a dot separated keys, value is
                      ignored. If not exist, it is ignored
        :param pull: Plain dictionary with the content to be removed from an array. It is a dot separated keys and value
                     if exist in the array is removed. If not exist, it is ignored
        :param pull_list: Same as pull but values are arrays where each item is removed from the array
        :param push: Plain dictionary with the content to be appended to an array. It is a dot separated keys and value
                     is appended to the end of the array
        :param push_list: Same as push but values are arrays where each item is appended instead of appending the
                          whole array
        :param upsert: If this parameter is set to True and no document is found using 'q_filter' it will be created.
                     By default this is false.
        :return: Dict with the number of entries modified. None if no matching is found.
        """
        try:
            db_oper = self._build_update(update_dict, unset, pull, push, push_list, pull_list)
            with self.lock:
                collection = self.db[table]
                rows = collection.update_one(self._format_filter(q_filter), db_oper, upsert=upsert)
            # an upsert that has created the entry matches nothing, but it is not a "not found" situation
            if rows.matched_count == 0 and rows.upserted_id is None:
                if fail_on_empty:
                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
                                      HTTPStatus.NOT_FOUND)
                return None
            return {"modified": rows.modified_count}
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def set_list(self, table, q_filter, update_dict, unset=None, pull=None, push=None, push_list=None, pull_list=None):
        """
        Modifies all matching entries at database
        :param table: collection or table
        :param q_filter: Filter
        :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
        :param unset: Plain dictionary with the content to be removed if exist. It is a dot separated keys, value is
                      ignored. If not exist, it is ignored
        :param pull: Plain dictionary with the content to be removed from an array. It is a dot separated keys and value
                     if exist in the array is removed. If not exist, it is ignored
        :param push: Plain dictionary with the content to be appended to an array. It is a dot separated keys, the
                     single value is appended to the end of the array
        :param pull_list: Same as pull but values are arrays where each item is removed from the array
        :param push_list: Same as push but values are arrays where each item is appended instead of appending the
                          whole array
        :return: Dict with the number of entries modified
        """
        try:
            db_oper = self._build_update(update_dict, unset, pull, push, push_list, pull_list)
            with self.lock:
                collection = self.db[table]
                rows = collection.update_many(self._format_filter(q_filter), db_oper)
            return {"modified": rows.modified_count}
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def replace(self, table, _id, indata, fail_on_empty=True):
        """
        Replace the content of an entry
        :param table: collection or table
        :param _id: internal database id
        :param indata: content to replace
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
            it raises a DbException
        :return: Dict with the number of entries replaced
        """
        try:
            db_filter = {"_id": _id}
            with self.lock:
                collection = self.db[table]
                rows = collection.replace_one(db_filter, indata)
            if rows.matched_count == 0:
                if fail_on_empty:
                    raise DbException("Not found any {} with _id='{}'".format(table[:-1], _id), HTTPStatus.NOT_FOUND)
                return None
            return {"replaced": rows.modified_count}
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e