1 # -*- coding: utf-8 -*-
3 # Copyright 2018 Telefonica S.A.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
19 from base64
import b64decode
20 from copy
import deepcopy
21 from http
import HTTPStatus
23 from time
import sleep
, time
24 from uuid
import uuid4
26 from osm_common
.dbbase
import DbBase
, DbException
27 from pymongo
import errors
, MongoClient
29 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
31 # TODO consider use this decorator for database access retries
33 # def retry_mongocall(call):
34 # def _retry_mongocall(*args, **kwargs):
38 # return call(*args, **kwargs)
39 # except pymongo.AutoReconnect as e:
41 # raise DbException(e)
43 # return _retry_mongocall
46 def deep_update(to_update
, update_with
):
48 Similar to deepcopy but recursively with nested dictionaries. 'to_update' dict is updated with a content copy of
49 'update_with' dict recursively
50 :param to_update: must be a dictionary to be modified
51 :param update_with: must be a dictionary. It is not changed
54 for key
in update_with
:
56 if isinstance(to_update
[key
], dict) and isinstance(update_with
[key
], dict):
57 deep_update(to_update
[key
], update_with
[key
])
59 to_update
[key
] = deepcopy(update_with
[key
])
63 class DbMongo(DbBase
):
64 conn_initial_timout
= 120
67 def __init__(self
, logger_name
="db", lock
=False):
68 super().__init
__(logger_name
, lock
)
71 self
.database_key
= None
72 self
.secret_obtained
= False
73 # ^ This is used to know if database serial has been got. Database is inited by NBI, who generates the serial
74 # In case it is not ready when connected, it should be got later on before any decrypt operation
76 def get_secret_key(self
):
77 if self
.secret_obtained
:
80 self
.secret_key
= None
82 self
.set_secret_key(self
.database_key
)
83 version_data
= self
.get_one(
84 "admin", {"_id": "version"}, fail_on_empty
=False, fail_on_more
=True
86 if version_data
and version_data
.get("serial"):
87 self
.set_secret_key(b64decode(version_data
["serial"]))
88 self
.secret_obtained
= True
90 def db_connect(self
, config
, target_version
=None):
93 :param config: Configuration of database
94 :param target_version: if provided it checks if database contains required version, raising exception otherwise.
95 :return: None or raises DbException on error
98 if "logger_name" in config
:
99 self
.logger
= logging
.getLogger(config
["logger_name"])
100 master_key
= config
.get("commonkey") or config
.get("masterpassword")
102 self
.database_key
= master_key
103 self
.set_secret_key(master_key
)
104 if config
.get("uri"):
105 self
.client
= MongoClient(
106 config
["uri"], replicaSet
=config
.get("replicaset", None)
109 self
.client
= MongoClient(
112 replicaSet
=config
.get("replicaset", None),
114 # TODO add as parameters also username=config.get("user"), password=config.get("password"))
115 # when all modules are ready
116 self
.db
= self
.client
[config
["name"]]
117 if "loglevel" in config
:
118 self
.logger
.setLevel(getattr(logging
, config
["loglevel"]))
119 # get data to try a connection
123 version_data
= self
.get_one(
129 # check database status is ok
130 if version_data
and version_data
.get("status") != "ENABLED":
132 "Wrong database status '{}'".format(
133 version_data
.get("status")
135 http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
,
139 None if not version_data
else version_data
.get("version")
141 if target_version
and target_version
!= db_version
:
143 "Invalid database version {}. Expected {}".format(
144 db_version
, target_version
148 if version_data
and version_data
.get("serial"):
149 self
.secret_obtained
= True
150 self
.set_secret_key(b64decode(version_data
["serial"]))
152 "Connected to database {} version {}".format(
153 config
["name"], db_version
157 except errors
.ConnectionFailure
as e
:
158 if time() - now
>= self
.conn_initial_timout
:
160 self
.logger
.info("Waiting to database up {}".format(e
))
162 except errors
.PyMongoError
as e
:
166 def _format_filter(q_filter
):
168 Translate query string q_filter into mongo database filter
169 :param q_filter: Query string content. Follows SOL005 section 4.3.2 guidelines, with the follow extensions and
171 It accept ".nq" (not equal) in addition to ".neq".
172 For arrays you can specify index (concrete index must match), nothing (any index may match) or 'ANYINDEX'
173 (two or more matches applies for the same array element). Examples:
174 with database register: {A: [{B: 1, C: 2}, {B: 6, C: 9}]}
175 query 'A.B=6' matches because array A contains one element with B equal to 6
176 query 'A.0.B=6' does no match because index 0 of array A contains B with value 1, but not 6
177 query 'A.B=6&A.C=2' matches because one element of array matches B=6 and other matchesC=2
178 query 'A.ANYINDEX.B=6&A.ANYINDEX.C=2' does not match because it is needed the same element of the
181 Examples of translations from SOL005 to >> mongo # comment
182 A=B; A.eq=B >> A: B # must contain key A and equal to B or be a list that contains B
184 A=B&A=C; A=B,C >> A: {$in: [B, C]} # must contain key A and equal to B or C or be a list that contains
186 A.cont=B&A.cont=C; A.cont=B,C >> A: {$in: [B, C]}
187 A.ncont=B >> A: {$nin: B} # must not contain key A or if present not equal to B or if a list,
188 # it must not not contain B
189 A.ncont=B,C; A.ncont=B&A.ncont=C >> A: {$nin: [B,C]} # must not contain key A or if present not equal
190 # neither B nor C; or if a list, it must not contain neither B nor C
191 A.ne=B&A.ne=C; A.ne=B,C >> A: {$nin: [B, C]}
192 A.gt=B >> A: {$gt: B} # must contain key A and greater than B
193 A.ne=B; A.neq=B >> A: {$ne: B} # must not contain key A or if present not equal to B, or if
194 # an array not contain B
195 A.ANYINDEX.B=C >> A: {$elemMatch: {B=C}
196 :return: database mongo filter
202 for query_k
, query_v
in q_filter
.items():
203 dot_index
= query_k
.rfind(".")
204 if dot_index
> 1 and query_k
[dot_index
+ 1 :] in (
215 operator
= "$" + query_k
[dot_index
+ 1 :]
216 if operator
== "$neq":
218 k
= query_k
[:dot_index
]
224 if isinstance(v
, list):
225 if operator
in ("$eq", "$cont"):
228 elif operator
in ("$ne", "$ncont"):
232 v
= query_v
.join(",")
234 if operator
in ("$eq", "$cont"):
235 # v cannot be a comma separated list, because operator would have been changed to $in
237 elif operator
== "$ncount":
238 # v cannot be a comma separated list, because operator would have been changed to $nin
243 # process the ANYINDEX word at k.
244 kleft
, _
, kright
= k
.rpartition(".ANYINDEX.")
247 db_v
= {"$elemMatch": {kright
: db_v
}}
248 kleft
, _
, kright
= k
.rpartition(".ANYINDEX.")
250 # insert in db_filter
251 # maybe db_filter[k] exist. e.g. in the query string for values between 5 and 8: "a.gt=5&a.lt=8"
252 deep_update(db_filter
, {k
: db_v
})
255 except Exception as e
:
257 "Invalid query string filter at {}:{}. Error: {}".format(query_k
, v
, e
),
258 http_code
=HTTPStatus
.BAD_REQUEST
,
261 def get_list(self
, table
, q_filter
=None):
263 Obtain a list of entries matching q_filter
264 :param table: collection or table
265 :param q_filter: Filter
266 :return: a list (can be empty) with the found entries. Raises DbException on error
271 collection
= self
.db
[table
]
272 db_filter
= self
._format
_filter
(q_filter
)
273 rows
= collection
.find(db_filter
)
279 except Exception as e
: # TODO refine
282 def count(self
, table
, q_filter
=None):
284 Count the number of entries matching q_filter
285 :param table: collection or table
286 :param q_filter: Filter
287 :return: number of entries found (can be zero)
288 :raise: DbException on error
292 collection
= self
.db
[table
]
293 db_filter
= self
._format
_filter
(q_filter
)
294 count
= collection
.count(db_filter
)
298 except Exception as e
: # TODO refine
301 def get_one(self
, table
, q_filter
=None, fail_on_empty
=True, fail_on_more
=True):
303 Obtain one entry matching q_filter
304 :param table: collection or table
305 :param q_filter: Filter
306 :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case
307 it raises a DbException
308 :param fail_on_more: If more than one matches filter it returns one of then unless this flag is set tu True, so
309 that it raises a DbException
310 :return: The requested element, or None
313 db_filter
= self
._format
_filter
(q_filter
)
315 collection
= self
.db
[table
]
316 if not (fail_on_empty
and fail_on_more
):
317 return collection
.find_one(db_filter
)
318 rows
= collection
.find(db_filter
)
319 if rows
.count() == 0:
322 "Not found any {} with filter='{}'".format(
325 HTTPStatus
.NOT_FOUND
,
328 elif rows
.count() > 1:
331 "Found more than one {} with filter='{}'".format(
337 except Exception as e
: # TODO refine
340 def del_list(self
, table
, q_filter
=None):
342 Deletes all entries that match q_filter
343 :param table: collection or table
344 :param q_filter: Filter
345 :return: Dict with the number of entries deleted
349 collection
= self
.db
[table
]
350 rows
= collection
.delete_many(self
._format
_filter
(q_filter
))
351 return {"deleted": rows
.deleted_count
}
354 except Exception as e
: # TODO refine
357 def del_one(self
, table
, q_filter
=None, fail_on_empty
=True):
359 Deletes one entry that matches q_filter
360 :param table: collection or table
361 :param q_filter: Filter
362 :param fail_on_empty: If nothing matches filter it returns '0' deleted unless this flag is set tu True, in
363 which case it raises a DbException
364 :return: Dict with the number of entries deleted
368 collection
= self
.db
[table
]
369 rows
= collection
.delete_one(self
._format
_filter
(q_filter
))
370 if rows
.deleted_count
== 0:
373 "Not found any {} with filter='{}'".format(
376 HTTPStatus
.NOT_FOUND
,
379 return {"deleted": rows
.deleted_count
}
380 except Exception as e
: # TODO refine
383 def create(self
, table
, indata
):
385 Add a new entry at database
386 :param table: collection or table
387 :param indata: content to be added
388 :return: database id of the inserted element. Raises a DbException on error
392 collection
= self
.db
[table
]
393 data
= collection
.insert_one(indata
)
394 return data
.inserted_id
395 except Exception as e
: # TODO refine
398 def create_list(self
, table
, indata_list
):
400 Add several entries at once
401 :param table: collection or table
402 :param indata_list: content list to be added.
403 :return: the list of inserted '_id's. Exception on error
406 for item
in indata_list
:
407 if item
.get("_id") is None:
408 item
["_id"] = str(uuid4())
410 collection
= self
.db
[table
]
411 data
= collection
.insert_many(indata_list
)
412 return data
.inserted_ids
413 except Exception as e
: # TODO refine
430 Modifies an entry at database
431 :param table: collection or table
432 :param q_filter: Filter
433 :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
434 :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
435 it raises a DbException
436 :param unset: Plain dictionary with the content to be removed if exist. It is a dot separated keys, value is
437 ignored. If not exist, it is ignored
438 :param pull: Plain dictionary with the content to be removed from an array. It is a dot separated keys and value
439 if exist in the array is removed. If not exist, it is ignored
440 :param pull_list: Same as pull but values are arrays where each item is removed from the array
441 :param push: Plain dictionary with the content to be appended to an array. It is a dot separated keys and value
442 is appended to the end of the array
443 :param push_list: Same as push but values are arrays where each item is and appended instead of appending the
445 :param upsert: If this parameter is set to True and no document is found using 'q_filter' it will be created.
446 By default this is false.
447 :return: Dict with the number of entries modified. None if no matching is found.
452 db_oper
["$set"] = update_dict
454 db_oper
["$unset"] = unset
455 if pull
or pull_list
:
456 db_oper
["$pull"] = pull
or {}
458 db_oper
["$pull"].update(
459 {k
: {"$in": v
} for k
, v
in pull_list
.items()}
461 if push
or push_list
:
462 db_oper
["$push"] = push
or {}
464 db_oper
["$push"].update(
465 {k
: {"$each": v
} for k
, v
in push_list
.items()}
469 collection
= self
.db
[table
]
470 rows
= collection
.update_one(
471 self
._format
_filter
(q_filter
), db_oper
, upsert
=upsert
473 if rows
.matched_count
== 0:
476 "Not found any {} with filter='{}'".format(
479 HTTPStatus
.NOT_FOUND
,
482 return {"modified": rows
.modified_count
}
483 except Exception as e
: # TODO refine
498 Modifies al matching entries at database
499 :param table: collection or table
500 :param q_filter: Filter
501 :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
502 :param unset: Plain dictionary with the content to be removed if exist. It is a dot separated keys, value is
503 ignored. If not exist, it is ignored
504 :param pull: Plain dictionary with the content to be removed from an array. It is a dot separated keys and value
505 if exist in the array is removed. If not exist, it is ignored
506 :param push: Plain dictionary with the content to be appended to an array. It is a dot separated keys, the
507 single value is appended to the end of the array
508 :param pull_list: Same as pull but values are arrays where each item is removed from the array
509 :param push_list: Same as push but values are arrays where each item is and appended instead of appending the
511 :return: Dict with the number of entries modified
516 db_oper
["$set"] = update_dict
518 db_oper
["$unset"] = unset
519 if pull
or pull_list
:
520 db_oper
["$pull"] = pull
or {}
522 db_oper
["$pull"].update(
523 {k
: {"$in": v
} for k
, v
in pull_list
.items()}
525 if push
or push_list
:
526 db_oper
["$push"] = push
or {}
528 db_oper
["$push"].update(
529 {k
: {"$each": v
} for k
, v
in push_list
.items()}
532 collection
= self
.db
[table
]
533 rows
= collection
.update_many(self
._format
_filter
(q_filter
), db_oper
)
534 return {"modified": rows
.modified_count
}
535 except Exception as e
: # TODO refine
538 def replace(self
, table
, _id
, indata
, fail_on_empty
=True):
540 Replace the content of an entry
541 :param table: collection or table
542 :param _id: internal database id
543 :param indata: content to replace
544 :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case
545 it raises a DbException
546 :return: Dict with the number of entries replaced
549 db_filter
= {"_id": _id
}
551 collection
= self
.db
[table
]
552 rows
= collection
.replace_one(db_filter
, indata
)
553 if rows
.matched_count
== 0:
556 "Not found any {} with _id='{}'".format(table
[:-1], _id
),
557 HTTPStatus
.NOT_FOUND
,
560 return {"replaced": rows
.modified_count
}
561 except Exception as e
: # TODO refine