1 # -*- coding: utf-8 -*-
3 # Copyright 2018 Telefonica S.A.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
19 from base64
import b64decode
20 from copy
import deepcopy
21 from http
import HTTPStatus
23 from time
import sleep
, time
24 from uuid
import uuid4
26 from osm_common
.dbbase
import DbBase
, DbException
27 from pymongo
import errors
, MongoClient
29 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
31 # TODO consider use this decorator for database access retries
33 # def retry_mongocall(call):
34 # def _retry_mongocall(*args, **kwargs):
38 # return call(*args, **kwargs)
39 # except pymongo.AutoReconnect as e:
41 # raise DbException(e)
43 # return _retry_mongocall
46 def deep_update(to_update
, update_with
):
48 Similar to deepcopy but recursively with nested dictionaries. 'to_update' dict is updated with a content copy of
49 'update_with' dict recursively
50 :param to_update: must be a dictionary to be modified
51 :param update_with: must be a dictionary. It is not changed
54 for key
in update_with
:
56 if isinstance(to_update
[key
], dict) and isinstance(update_with
[key
], dict):
57 deep_update(to_update
[key
], update_with
[key
])
59 to_update
[key
] = deepcopy(update_with
[key
])
63 class DbMongo(DbBase
):
64 conn_initial_timout
= 120
67 def __init__(self
, logger_name
="db", lock
=False):
68 super().__init
__(logger_name
=logger_name
, lock
=lock
)
71 self
.database_key
= None
72 self
.secret_obtained
= False
73 # ^ This is used to know if database serial has been got. Database is inited by NBI, who generates the serial
74 # In case it is not ready when connected, it should be got later on before any decrypt operation
76 def get_secret_key(self
):
77 if self
.secret_obtained
:
80 self
.secret_key
= None
82 self
.set_secret_key(self
.database_key
)
83 version_data
= self
.get_one(
84 "admin", {"_id": "version"}, fail_on_empty
=False, fail_on_more
=True
86 if version_data
and version_data
.get("serial"):
87 self
.set_secret_key(b64decode(version_data
["serial"]))
88 self
.secret_obtained
= True
90 def db_connect(self
, config
, target_version
=None):
93 :param config: Configuration of database
94 :param target_version: if provided it checks if database contains required version, raising exception otherwise.
95 :return: None or raises DbException on error
98 if "logger_name" in config
:
99 self
.logger
= logging
.getLogger(config
["logger_name"])
100 master_key
= config
.get("commonkey") or config
.get("masterpassword")
102 self
.database_key
= master_key
103 self
.set_secret_key(master_key
)
104 if config
.get("uri"):
105 self
.client
= MongoClient(
106 config
["uri"], replicaSet
=config
.get("replicaset", None)
108 # when all modules are ready
109 self
.db
= self
.client
[config
["name"]]
110 if "loglevel" in config
:
111 self
.logger
.setLevel(getattr(logging
, config
["loglevel"]))
112 # get data to try a connection
116 version_data
= self
.get_one(
122 # check database status is ok
123 if version_data
and version_data
.get("status") != "ENABLED":
125 "Wrong database status '{}'".format(
126 version_data
.get("status")
128 http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
,
132 None if not version_data
else version_data
.get("version")
134 if target_version
and target_version
!= db_version
:
136 "Invalid database version {}. Expected {}".format(
137 db_version
, target_version
141 if version_data
and version_data
.get("serial"):
142 self
.secret_obtained
= True
143 self
.set_secret_key(b64decode(version_data
["serial"]))
145 "Connected to database {} version {}".format(
146 config
["name"], db_version
150 except errors
.ConnectionFailure
as e
:
151 if time() - now
>= self
.conn_initial_timout
:
153 self
.logger
.info("Waiting to database up {}".format(e
))
155 except errors
.PyMongoError
as e
:
159 def _format_filter(q_filter
):
161 Translate query string q_filter into mongo database filter
162 :param q_filter: Query string content. Follows SOL005 section 4.3.2 guidelines, with the follow extensions and
164 It accept ".nq" (not equal) in addition to ".neq".
165 For arrays you can specify index (concrete index must match), nothing (any index may match) or 'ANYINDEX'
166 (two or more matches applies for the same array element). Examples:
167 with database register: {A: [{B: 1, C: 2}, {B: 6, C: 9}]}
168 query 'A.B=6' matches because array A contains one element with B equal to 6
169 query 'A.0.B=6' does no match because index 0 of array A contains B with value 1, but not 6
170 query 'A.B=6&A.C=2' matches because one element of array matches B=6 and other matchesC=2
171 query 'A.ANYINDEX.B=6&A.ANYINDEX.C=2' does not match because it is needed the same element of the
174 Examples of translations from SOL005 to >> mongo # comment
175 A=B; A.eq=B >> A: B # must contain key A and equal to B or be a list that contains B
177 A=B&A=C; A=B,C >> A: {$in: [B, C]} # must contain key A and equal to B or C or be a list that contains
179 A.cont=B&A.cont=C; A.cont=B,C >> A: {$in: [B, C]}
180 A.ncont=B >> A: {$nin: B} # must not contain key A or if present not equal to B or if a list,
181 # it must not not contain B
182 A.ncont=B,C; A.ncont=B&A.ncont=C >> A: {$nin: [B,C]} # must not contain key A or if present not equal
183 # neither B nor C; or if a list, it must not contain neither B nor C
184 A.ne=B&A.ne=C; A.ne=B,C >> A: {$nin: [B, C]}
185 A.gt=B >> A: {$gt: B} # must contain key A and greater than B
186 A.ne=B; A.neq=B >> A: {$ne: B} # must not contain key A or if present not equal to B, or if
187 # an array not contain B
188 A.ANYINDEX.B=C >> A: {$elemMatch: {B=C}
189 :return: database mongo filter
195 for query_k
, query_v
in q_filter
.items():
196 dot_index
= query_k
.rfind(".")
197 if dot_index
> 1 and query_k
[dot_index
+ 1 :] in (
208 operator
= "$" + query_k
[dot_index
+ 1 :]
209 if operator
== "$neq":
211 k
= query_k
[:dot_index
]
217 if isinstance(v
, list):
218 if operator
in ("$eq", "$cont"):
221 elif operator
in ("$ne", "$ncont"):
225 v
= query_v
.join(",")
227 if operator
in ("$eq", "$cont"):
228 # v cannot be a comma separated list, because operator would have been changed to $in
230 elif operator
== "$ncount":
231 # v cannot be a comma separated list, because operator would have been changed to $nin
236 # process the ANYINDEX word at k.
237 kleft
, _
, kright
= k
.rpartition(".ANYINDEX.")
240 db_v
= {"$elemMatch": {kright
: db_v
}}
241 kleft
, _
, kright
= k
.rpartition(".ANYINDEX.")
243 # insert in db_filter
244 # maybe db_filter[k] exist. e.g. in the query string for values between 5 and 8: "a.gt=5&a.lt=8"
245 deep_update(db_filter
, {k
: db_v
})
248 except Exception as e
:
250 "Invalid query string filter at {}:{}. Error: {}".format(query_k
, v
, e
),
251 http_code
=HTTPStatus
.BAD_REQUEST
,
254 def get_list(self
, table
, q_filter
=None):
256 Obtain a list of entries matching q_filter
257 :param table: collection or table
258 :param q_filter: Filter
259 :return: a list (can be empty) with the found entries. Raises DbException on error
264 collection
= self
.db
[table
]
265 db_filter
= self
._format
_filter
(q_filter
)
266 rows
= collection
.find(db_filter
)
272 except Exception as e
: # TODO refine
275 def count(self
, table
, q_filter
=None):
277 Count the number of entries matching q_filter
278 :param table: collection or table
279 :param q_filter: Filter
280 :return: number of entries found (can be zero)
281 :raise: DbException on error
285 collection
= self
.db
[table
]
286 db_filter
= self
._format
_filter
(q_filter
)
287 count
= collection
.count(db_filter
)
291 except Exception as e
: # TODO refine
294 def get_one(self
, table
, q_filter
=None, fail_on_empty
=True, fail_on_more
=True):
296 Obtain one entry matching q_filter
297 :param table: collection or table
298 :param q_filter: Filter
299 :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case
300 it raises a DbException
301 :param fail_on_more: If more than one matches filter it returns one of then unless this flag is set tu True, so
302 that it raises a DbException
303 :return: The requested element, or None
306 db_filter
= self
._format
_filter
(q_filter
)
308 collection
= self
.db
[table
]
309 if not (fail_on_empty
and fail_on_more
):
310 return collection
.find_one(db_filter
)
311 rows
= collection
.find(db_filter
)
312 if rows
.count() == 0:
315 "Not found any {} with filter='{}'".format(
318 HTTPStatus
.NOT_FOUND
,
321 elif rows
.count() > 1:
324 "Found more than one {} with filter='{}'".format(
330 except Exception as e
: # TODO refine
333 def del_list(self
, table
, q_filter
=None):
335 Deletes all entries that match q_filter
336 :param table: collection or table
337 :param q_filter: Filter
338 :return: Dict with the number of entries deleted
342 collection
= self
.db
[table
]
343 rows
= collection
.delete_many(self
._format
_filter
(q_filter
))
344 return {"deleted": rows
.deleted_count
}
347 except Exception as e
: # TODO refine
350 def del_one(self
, table
, q_filter
=None, fail_on_empty
=True):
352 Deletes one entry that matches q_filter
353 :param table: collection or table
354 :param q_filter: Filter
355 :param fail_on_empty: If nothing matches filter it returns '0' deleted unless this flag is set tu True, in
356 which case it raises a DbException
357 :return: Dict with the number of entries deleted
361 collection
= self
.db
[table
]
362 rows
= collection
.delete_one(self
._format
_filter
(q_filter
))
363 if rows
.deleted_count
== 0:
366 "Not found any {} with filter='{}'".format(
369 HTTPStatus
.NOT_FOUND
,
372 return {"deleted": rows
.deleted_count
}
373 except Exception as e
: # TODO refine
376 def create(self
, table
, indata
):
378 Add a new entry at database
379 :param table: collection or table
380 :param indata: content to be added
381 :return: database id of the inserted element. Raises a DbException on error
385 collection
= self
.db
[table
]
386 data
= collection
.insert_one(indata
)
387 return data
.inserted_id
388 except Exception as e
: # TODO refine
391 def create_list(self
, table
, indata_list
):
393 Add several entries at once
394 :param table: collection or table
395 :param indata_list: content list to be added.
396 :return: the list of inserted '_id's. Exception on error
399 for item
in indata_list
:
400 if item
.get("_id") is None:
401 item
["_id"] = str(uuid4())
403 collection
= self
.db
[table
]
404 data
= collection
.insert_many(indata_list
)
405 return data
.inserted_ids
406 except Exception as e
: # TODO refine
423 Modifies an entry at database
424 :param table: collection or table
425 :param q_filter: Filter
426 :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
427 :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
428 it raises a DbException
429 :param unset: Plain dictionary with the content to be removed if exist. It is a dot separated keys, value is
430 ignored. If not exist, it is ignored
431 :param pull: Plain dictionary with the content to be removed from an array. It is a dot separated keys and value
432 if exist in the array is removed. If not exist, it is ignored
433 :param pull_list: Same as pull but values are arrays where each item is removed from the array
434 :param push: Plain dictionary with the content to be appended to an array. It is a dot separated keys and value
435 is appended to the end of the array
436 :param push_list: Same as push but values are arrays where each item is and appended instead of appending the
438 :param upsert: If this parameter is set to True and no document is found using 'q_filter' it will be created.
439 By default this is false.
440 :return: Dict with the number of entries modified. None if no matching is found.
445 db_oper
["$set"] = update_dict
447 db_oper
["$unset"] = unset
448 if pull
or pull_list
:
449 db_oper
["$pull"] = pull
or {}
451 db_oper
["$pull"].update(
452 {k
: {"$in": v
} for k
, v
in pull_list
.items()}
454 if push
or push_list
:
455 db_oper
["$push"] = push
or {}
457 db_oper
["$push"].update(
458 {k
: {"$each": v
} for k
, v
in push_list
.items()}
462 collection
= self
.db
[table
]
463 rows
= collection
.update_one(
464 self
._format
_filter
(q_filter
), db_oper
, upsert
=upsert
466 if rows
.matched_count
== 0:
469 "Not found any {} with filter='{}'".format(
472 HTTPStatus
.NOT_FOUND
,
475 return {"modified": rows
.modified_count
}
476 except Exception as e
: # TODO refine
491 Modifies al matching entries at database
492 :param table: collection or table
493 :param q_filter: Filter
494 :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
495 :param unset: Plain dictionary with the content to be removed if exist. It is a dot separated keys, value is
496 ignored. If not exist, it is ignored
497 :param pull: Plain dictionary with the content to be removed from an array. It is a dot separated keys and value
498 if exist in the array is removed. If not exist, it is ignored
499 :param push: Plain dictionary with the content to be appended to an array. It is a dot separated keys, the
500 single value is appended to the end of the array
501 :param pull_list: Same as pull but values are arrays where each item is removed from the array
502 :param push_list: Same as push but values are arrays where each item is and appended instead of appending the
504 :return: Dict with the number of entries modified
509 db_oper
["$set"] = update_dict
511 db_oper
["$unset"] = unset
512 if pull
or pull_list
:
513 db_oper
["$pull"] = pull
or {}
515 db_oper
["$pull"].update(
516 {k
: {"$in": v
} for k
, v
in pull_list
.items()}
518 if push
or push_list
:
519 db_oper
["$push"] = push
or {}
521 db_oper
["$push"].update(
522 {k
: {"$each": v
} for k
, v
in push_list
.items()}
525 collection
= self
.db
[table
]
526 rows
= collection
.update_many(self
._format
_filter
(q_filter
), db_oper
)
527 return {"modified": rows
.modified_count
}
528 except Exception as e
: # TODO refine
531 def replace(self
, table
, _id
, indata
, fail_on_empty
=True):
533 Replace the content of an entry
534 :param table: collection or table
535 :param _id: internal database id
536 :param indata: content to replace
537 :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case
538 it raises a DbException
539 :return: Dict with the number of entries replaced
542 db_filter
= {"_id": _id
}
544 collection
= self
.db
[table
]
545 rows
= collection
.replace_one(db_filter
, indata
)
546 if rows
.matched_count
== 0:
549 "Not found any {} with _id='{}'".format(table
[:-1], _id
),
550 HTTPStatus
.NOT_FOUND
,
553 return {"replaced": rows
.modified_count
}
554 except Exception as e
: # TODO refine