1 # -*- coding: utf-8 -*-
3 # Copyright 2018 Telefonica S.A.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
import logging
from base64 import b64decode
from copy import deepcopy
from http import HTTPStatus
from time import time, sleep
from uuid import uuid4

from pymongo import MongoClient, errors

from osm_common.dbbase import DbException, DbBase
28 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
30 # TODO consider use this decorator for database access retries
32 # def retry_mongocall(call):
33 # def _retry_mongocall(*args, **kwargs):
37 # return call(*args, **kwargs)
38 # except pymongo.AutoReconnect as e:
40 # raise DbException(e)
42 # return _retry_mongocall
def deep_update(to_update, update_with):
    """
    Recursively merge 'update_with' into 'to_update'.

    When a key holds a dictionary on both sides the two dictionaries are merged key by key. In any other case
    (key missing at 'to_update', or one of the values is not a dictionary) the value at 'to_update' is replaced
    by a deep copy of the value at 'update_with', so both dictionaries never share mutable content.
    :param to_update: must be a dictionary to be modified
    :param update_with: must be a dictionary. It is not changed
    :return: to_update
    """
    for key, new_value in update_with.items():
        # .get() yields None for a missing key, which falls into the plain-copy branch below
        current = to_update.get(key)
        if isinstance(current, dict) and isinstance(new_value, dict):
            deep_update(current, new_value)
        else:
            to_update[key] = deepcopy(new_value)
    return to_update
62 class DbMongo(DbBase
):
63 conn_initial_timout
= 120
def __init__(self, logger_name="db", lock=False):
    """
    Create a driver instance that is not connected yet; db_connect() opens the connection.
    :param logger_name: logger name, forwarded to the base class
    :param lock: lock option, forwarded to the base class
    """
    super().__init__(logger_name, lock)
    # Assigned by db_connect(); defined here so the attributes exist before connecting
    self.client = None
    self.db = None
    # Master key read from the configuration at db_connect()
    self.database_key = None
    self.secret_obtained = False
    # ^ This is used to know if database serial has been got. Database is inited by NBI, who generates the serial
    # In case it is not ready when connected, it should be got later on before any decrypt operation
def get_secret_key(self):
    """
    Load the database serial, if it has not been obtained yet, and install it as secret key.

    The secret key is first reset and rebuilt from the configured master key (when there is one); the
    "serial" stored at the "version" entry of the "admin" collection is then applied on top of it.
    While the serial is not found 'secret_obtained' stays False, so a later call tries again.
    :return: None
    """
    if self.secret_obtained:
        return

    self.secret_key = None
    if self.database_key:
        self.set_secret_key(self.database_key)
    version_data = self.get_one(
        "admin", {"_id": "version"}, fail_on_empty=False, fail_on_more=True
    )
    if version_data and version_data.get("serial"):
        self.set_secret_key(b64decode(version_data["serial"]))
        self.secret_obtained = True
def db_connect(self, config, target_version=None):
    """
    Connect to database
    :param config: Configuration of database. Keys read here: "name" (mandatory); "uri", or else "host" and
        "port"; optional "replicaset", "commonkey" or "masterpassword", "logger_name" and "loglevel"
    :param target_version: if provided it checks if database contains required version, raising exception otherwise.
    :return: None or raises DbException on error
    """
    try:
        if "logger_name" in config:
            self.logger = logging.getLogger(config["logger_name"])
        master_key = config.get("commonkey") or config.get("masterpassword")
        if master_key:
            self.database_key = master_key
            self.set_secret_key(master_key)
        if config.get("uri"):
            self.client = MongoClient(
                config["uri"], replicaSet=config.get("replicaset", None)
            )
        else:
            # NOTE(review): positional host/port arguments reconstructed — confirm the config key names
            self.client = MongoClient(
                config["host"],
                config["port"],
                replicaSet=config.get("replicaset", None),
            )
        # TODO add as parameters also username=config.get("user"), password=config.get("password"))
        # when all modules are ready
        self.db = self.client[config["name"]]
        if "loglevel" in config:
            self.logger.setLevel(getattr(logging, config["loglevel"]))
        # get data to try a connection
        now = time()
        while True:
            try:
                version_data = self.get_one(
                    "admin",
                    {"_id": "version"},
                    fail_on_empty=False,
                    fail_on_more=True,
                )
                # check database status is ok
                if version_data and version_data.get("status") != "ENABLED":
                    raise DbException(
                        "Wrong database status '{}'".format(
                            version_data.get("status")
                        ),
                        http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
                    )
                # check version
                db_version = (
                    None if not version_data else version_data.get("version")
                )
                if target_version and target_version != db_version:
                    raise DbException(
                        "Invalid database version {}. Expected {}".format(
                            db_version, target_version
                        )
                    )
                # get serial
                if version_data and version_data.get("serial"):
                    self.secret_obtained = True
                    self.set_secret_key(b64decode(version_data["serial"]))
                self.logger.info(
                    "Connected to database {} version {}".format(
                        config["name"], db_version
                    )
                )
                return
            except errors.ConnectionFailure as e:
                # keep retrying until the initial connection timeout expires
                if time() - now >= self.conn_initial_timout:
                    raise
                self.logger.info("Waiting to database up {}".format(e))
                # NOTE(review): pause between attempts reconstructed as 2 seconds — confirm
                sleep(2)
    except errors.PyMongoError as e:
        raise DbException(e)
def _format_filter(q_filter):
    """
    Translate query string q_filter into mongo database filter

    It takes no 'self': the rest of the class invokes it as self._format_filter(q_filter).
    :param q_filter: Query string content, as a dictionary of "key[.operator]" to value. It follows SOL005 section
        4.3.2 guidelines with some extensions: ".ne" is accepted in addition to ".neq", and array indexes can be
        given as a concrete index (that index must match), nothing (any index may match) or 'ANYINDEX' (two or
        more conditions apply to the same array element). Examples:
        with database register: {A: [{B: 1, C: 2}, {B: 6, C: 9}]}
        query 'A.B=6' matches because array A contains one element with B equal to 6
        query 'A.0.B=6' does not match because index 0 of array A contains B with value 1, but not 6
        query 'A.B=6&A.C=2' matches because one element of array matches B=6 and other matches C=2
        query 'A.ANYINDEX.B=6&A.ANYINDEX.C=2' does not match because no single element satisfies both
        Translations done here, query >> mongo:
        A=B; A.eq=B; A.cont=B        >> A: B
        same with a list of values   >> A: {$in: [B, C]}
        A.ne=B; A.neq=B; A.ncont=B   >> A: {$ne: B}
        same with a list of values   >> A: {$nin: [B, C]}
        A.gt=B (gte, lt, lte alike)  >> A: {$gt: B}
        A.ANYINDEX.B=C               >> A: {$elemMatch: {B: C}}
    :return: database mongo filter. An empty dictionary when q_filter is empty or None
    :raise: DbException with BAD_REQUEST when q_filter cannot be translated
    """
    # Pre-set so the error message can always be composed, even if the failure happens before the first item
    query_k = v = None
    try:
        db_filter = {}
        if not q_filter:
            return db_filter
        for query_k, query_v in q_filter.items():
            dot_index = query_k.rfind(".")
            # NOTE(review): list of accepted suffixes reconstructed from the operators handled below — confirm
            if dot_index > 1 and query_k[dot_index + 1 :] in (
                "eq",
                "ne",
                "gt",
                "gte",
                "lt",
                "lte",
                "cont",
                "ncont",
                "neq",
            ):
                operator = "$" + query_k[dot_index + 1 :]
                if operator == "$neq":
                    operator = "$ne"
                k = query_k[:dot_index]
            else:
                operator = "$eq"
                k = query_k

            v = query_v
            if isinstance(v, list):
                if operator in ("$eq", "$cont"):
                    operator = "$in"
                elif operator in ("$ne", "$ncont"):
                    operator = "$nin"
                else:
                    # remaining operators take a single value: rebuild the comma separated text
                    v = ",".join(query_v)

            if operator in ("$eq", "$cont"):
                # v cannot be a comma separated list, because operator would have been changed to $in
                db_v = v
            elif operator == "$ncont":
                # v cannot be a comma separated list, because operator would have been changed to $nin
                db_v = {"$ne": v}
            else:
                db_v = {operator: v}

            # process the ANYINDEX word at k.
            kleft, _, kright = k.rpartition(".ANYINDEX.")
            while kleft:
                k = kleft
                db_v = {"$elemMatch": {kright: db_v}}
                kleft, _, kright = k.rpartition(".ANYINDEX.")

            # insert in db_filter
            # maybe db_filter[k] exist. e.g. in the query string for values between 5 and 8: "a.gt=5&a.lt=8"
            deep_update(db_filter, {k: db_v})

        return db_filter
    except Exception as e:
        raise DbException(
            "Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e),
            http_code=HTTPStatus.BAD_REQUEST,
        )
def get_list(self, table, q_filter=None):
    """
    Obtain a list of entries matching q_filter
    :param table: collection or table
    :param q_filter: Filter
    :return: a list (can be empty) with the found entries. Raises DbException on error
    """
    try:
        # NOTE(review): self.lock is expected to be set up by the base class constructor — confirm
        with self.lock:
            collection = self.db[table]
            db_filter = self._format_filter(q_filter)
            rows = collection.find(db_filter)
        return list(rows)
    except DbException:
        # keep the http code and message of an invalid-filter error
        raise
    except Exception as e:  # TODO refine
        raise DbException(e)
def count(self, table, q_filter=None):
    """
    Count the number of entries matching q_filter
    :param table: collection or table
    :param q_filter: Filter
    :return: number of entries found (can be zero)
    :raise: DbException on error
    """
    try:
        # NOTE(review): self.lock is expected to be set up by the base class constructor — confirm
        with self.lock:
            collection = self.db[table]
            db_filter = self._format_filter(q_filter)
            # Collection.count() no longer exists in PyMongo 4; count_documents() is its replacement
            return collection.count_documents(db_filter)
    except DbException:
        # keep the http code and message of an invalid-filter error
        raise
    except Exception as e:  # TODO refine
        raise DbException(e)
def get_one(self, table, q_filter=None, fail_on_empty=True, fail_on_more=True):
    """
    Obtain one entry matching q_filter
    :param table: collection or table
    :param q_filter: Filter
    :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
        it raises a DbException
    :param fail_on_more: If more than one matches filter it returns one of them unless this flag is set to True, so
        that it raises a DbException
    :return: The requested element, or None
    """
    try:
        db_filter = self._format_filter(q_filter)
        # NOTE(review): self.lock is expected to be set up by the base class constructor — confirm
        with self.lock:
            collection = self.db[table]
            # Cursor.count() no longer exists in PyMongo 4. Fetching at most two documents is enough to
            # tell "none", "one" and "more than one" apart; a single one suffices if duplicates are allowed
            wanted = 2 if fail_on_more else 1
            found = list(collection.find(db_filter).limit(wanted))
        if not found:
            if fail_on_empty:
                raise DbException(
                    "Not found any {} with filter='{}'".format(
                        table[:-1], q_filter
                    ),
                    HTTPStatus.NOT_FOUND,
                )
            return None
        if len(found) > 1:
            # only reachable when fail_on_more is set, see 'wanted' above
            # NOTE(review): CONFLICT status reconstructed — confirm
            raise DbException(
                "Found more than one {} with filter='{}'".format(
                    table[:-1], q_filter
                ),
                HTTPStatus.CONFLICT,
            )
        return found[0]
    except DbException:
        # keep the http code and message of the errors raised above
        raise
    except Exception as e:  # TODO refine
        raise DbException(e)
def del_list(self, table, q_filter=None):
    """
    Deletes all entries that match q_filter
    :param table: collection or table
    :param q_filter: Filter
    :return: Dict with the number of entries deleted. Raises DbException on error
    """
    try:
        # NOTE(review): self.lock is expected to be set up by the base class constructor — confirm
        with self.lock:
            collection = self.db[table]
            rows = collection.delete_many(self._format_filter(q_filter))
        return {"deleted": rows.deleted_count}
    except DbException:
        # keep the http code and message of an invalid-filter error
        raise
    except Exception as e:  # TODO refine
        raise DbException(e)
def del_one(self, table, q_filter=None, fail_on_empty=True):
    """
    Deletes one entry that matches q_filter
    :param table: collection or table
    :param q_filter: Filter
    :param fail_on_empty: If nothing matches filter nothing is deleted and no error is reported, unless this flag
        is set to True, in which case it raises a DbException
    :return: Dict with the number of entries deleted
    """
    try:
        # NOTE(review): self.lock is expected to be set up by the base class constructor — confirm
        with self.lock:
            collection = self.db[table]
            rows = collection.delete_one(self._format_filter(q_filter))
        if rows.deleted_count == 0:
            if fail_on_empty:
                raise DbException(
                    "Not found any {} with filter='{}'".format(
                        table[:-1], q_filter
                    ),
                    HTTPStatus.NOT_FOUND,
                )
            # NOTE(review): value returned when nothing was deleted reconstructed as None — confirm
            return None
        return {"deleted": rows.deleted_count}
    except DbException:
        # keep the http code and message of the errors raised above
        raise
    except Exception as e:  # TODO refine
        raise DbException(e)
def create(self, table, indata):
    """
    Add a new entry at database
    :param table: collection or table
    :param indata: content to be added
    :return: database id of the inserted element. Raises a DbException on error
    """
    try:
        # NOTE(review): self.lock is expected to be set up by the base class constructor — confirm
        with self.lock:
            collection = self.db[table]
            data = collection.insert_one(indata)
        return data.inserted_id
    except DbException:
        raise
    except Exception as e:  # TODO refine
        raise DbException(e)
def create_list(self, table, indata_list):
    """
    Add several entries at once
    :param table: collection or table
    :param indata_list: content list to be added. Items without "_id" get a new uuid4 string as "_id"; the
        items are modified in place
    :return: the list of inserted '_id's. Raises a DbException on error
    """
    try:
        for item in indata_list:
            if item.get("_id") is None:
                item["_id"] = str(uuid4())
        # NOTE(review): self.lock is expected to be set up by the base class constructor — confirm
        with self.lock:
            collection = self.db[table]
            data = collection.insert_many(indata_list)
        return data.inserted_ids
    except DbException:
        raise
    except Exception as e:  # TODO refine
        raise DbException(e)
429 Modifies an entry at database
430 :param table: collection or table
431 :param q_filter: Filter
432 :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
433 :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
434 it raises a DbException
435 :param unset: Plain dictionary with the content to be removed if exist. It is a dot separated keys, value is
436 ignored. If not exist, it is ignored
437 :param pull: Plain dictionary with the content to be removed from an array. It is a dot separated keys and value
438 if exist in the array is removed. If not exist, it is ignored
439 :param pull_list: Same as pull but values are arrays where each item is removed from the array
440 :param push: Plain dictionary with the content to be appended to an array. It is a dot separated keys and value
441 is appended to the end of the array
442 :param push_list: Same as push but values are arrays where each item is and appended instead of appending the
444 :param upsert: If this parameter is set to True and no document is found using 'q_filter' it will be created.
445 By default this is false.
446 :return: Dict with the number of entries modified. None if no matching is found.
451 db_oper
["$set"] = update_dict
453 db_oper
["$unset"] = unset
454 if pull
or pull_list
:
455 db_oper
["$pull"] = pull
or {}
457 db_oper
["$pull"].update(
458 {k
: {"$in": v
} for k
, v
in pull_list
.items()}
460 if push
or push_list
:
461 db_oper
["$push"] = push
or {}
463 db_oper
["$push"].update(
464 {k
: {"$each": v
} for k
, v
in push_list
.items()}
468 collection
= self
.db
[table
]
469 rows
= collection
.update_one(
470 self
._format
_filter
(q_filter
), db_oper
, upsert
=upsert
472 if rows
.matched_count
== 0:
475 "Not found any {} with filter='{}'".format(
478 HTTPStatus
.NOT_FOUND
,
481 return {"modified": rows
.modified_count
}
482 except Exception as e
: # TODO refine
497 Modifies al matching entries at database
498 :param table: collection or table
499 :param q_filter: Filter
500 :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
501 :param unset: Plain dictionary with the content to be removed if exist. It is a dot separated keys, value is
502 ignored. If not exist, it is ignored
503 :param pull: Plain dictionary with the content to be removed from an array. It is a dot separated keys and value
504 if exist in the array is removed. If not exist, it is ignored
505 :param push: Plain dictionary with the content to be appended to an array. It is a dot separated keys, the
506 single value is appended to the end of the array
507 :param pull_list: Same as pull but values are arrays where each item is removed from the array
508 :param push_list: Same as push but values are arrays where each item is and appended instead of appending the
510 :return: Dict with the number of entries modified
515 db_oper
["$set"] = update_dict
517 db_oper
["$unset"] = unset
518 if pull
or pull_list
:
519 db_oper
["$pull"] = pull
or {}
521 db_oper
["$pull"].update(
522 {k
: {"$in": v
} for k
, v
in pull_list
.items()}
524 if push
or push_list
:
525 db_oper
["$push"] = push
or {}
527 db_oper
["$push"].update(
528 {k
: {"$each": v
} for k
, v
in push_list
.items()}
531 collection
= self
.db
[table
]
532 rows
= collection
.update_many(self
._format
_filter
(q_filter
), db_oper
)
533 return {"modified": rows
.modified_count
}
534 except Exception as e
: # TODO refine
def replace(self, table, _id, indata, fail_on_empty=True):
    """
    Replace the content of an entry
    :param table: collection or table
    :param _id: internal database id
    :param indata: content to replace
    :param fail_on_empty: If nothing matches _id it returns None unless this flag is set to True, in which case
        it raises a DbException
    :return: Dict with the number of entries replaced
    """
    try:
        db_filter = {"_id": _id}
        # NOTE(review): self.lock is expected to be set up by the base class constructor — confirm
        with self.lock:
            collection = self.db[table]
            rows = collection.replace_one(db_filter, indata)
        if rows.matched_count == 0:
            if fail_on_empty:
                raise DbException(
                    "Not found any {} with _id='{}'".format(table[:-1], _id),
                    HTTPStatus.NOT_FOUND,
                )
            return None
        return {"replaced": rows.modified_count}
    except DbException:
        # keep the http code and message of the error raised above
        raise
    except Exception as e:  # TODO refine
        raise DbException(e)