# -*- coding: utf-8 -*-

# Copyright 2018 Telefonica S.A.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from base64 import b64decode
from copy import deepcopy
from http import HTTPStatus
from time import sleep, time

from pymongo import MongoClient, errors

from osm_common.dbbase import DbBase, DbException

__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"

# TODO: consider using this decorator for database access retries
#
# def retry_mongocall(call):
#     def _retry_mongocall(*args, **kwargs):
#         retry = 1
#         while True:
#             try:
#                 return call(*args, **kwargs)
#             except pymongo.AutoReconnect as e:
#                 if retry == 4:
#                     raise DbException(e)
#                 sleep(retry)
#                 retry += 1
#     return _retry_mongocall

def deep_update(to_update, update_with):
    """
    Recursively merge 'update_with' into 'to_update'.

    Similar to deepcopy, but nested dictionaries are merged key by key instead of being replaced as a whole.
    Whenever both sides hold a dictionary under the same key the merge recurses into it; in any other case
    (new key, or a non-dictionary on either side) the entry of 'to_update' is overwritten with a deep copy of
    the value found in 'update_with', so both structures never share mutable content.

    :param to_update: must be a dictionary. It is modified in place
    :param update_with: must be a dictionary. It is not changed
    :return: to_update, the same object received, to allow chaining
    """
    for key, new_value in update_with.items():
        # .get() covers keys that are not present yet at 'to_update': they are simply copied below
        current_value = to_update.get(key)
        if isinstance(current_value, dict) and isinstance(new_value, dict):
            deep_update(current_value, new_value)
        else:
            to_update[key] = deepcopy(new_value)
    return to_update
class DbMongo(DbBase):
    """
    MongoDB implementation of the DbBase interface, built on pymongo's MongoClient.

    Every collection access is done inside 'self.lock'.
    NOTE(review): 'self.lock' is expected to be created by DbBase.__init__ from the 'lock' constructor
    argument; confirm against osm_common.dbbase.
    """
    # Seconds that db_connect keeps retrying while the server refuses the connection
    conn_initial_timout = 120
    # NOTE(review): not referenced in this module; presumably a per-operation timeout in seconds - confirm
    conn_timout = 10

    def __init__(self, logger_name='db', lock=False):
        """
        Create a non-connected instance. Call db_connect() before any other operation
        :param logger_name: logger name, forwarded to DbBase
        :param lock: thread protection option, forwarded to DbBase
        """
        super().__init__(logger_name, lock)
        self.client = None
        self.db = None
        self.database_key = None
        self.secret_obtained = False
        # ^ This is used to know if database serial has been got. Database is inited by NBI, who generates the serial
        # In case it is not ready when connected, it should be got later on before any decrypt operation

    def get_secret_key(self):
        """
        Load the secret key when it has not been obtained yet. The key is reset, then the configured database key
        (if any) is applied and, on top of it, the base64 'serial' stored at the 'version' entry of the 'admin'
        collection (if present).
        :return: None
        """
        if self.secret_obtained:
            return

        self.secret_key = None
        if self.database_key:
            self.set_secret_key(self.database_key)
        version_data = self.get_one("admin", {"_id": "version"}, fail_on_empty=False, fail_on_more=True)
        if version_data and version_data.get("serial"):
            self.set_secret_key(b64decode(version_data["serial"]))
        # marked as obtained even without serial, so that the database is queried at most once for it
        self.secret_obtained = True

    def db_connect(self, config, target_version=None):
        """
        Connect to database
        :param config: Configuration of database. Keys read here: 'uri', or 'host' and 'port' when 'uri' is not
            provided; 'name' (database name); and optionally 'logger_name', 'loglevel', 'commonkey' or
            'masterpassword'
        :param target_version: if provided it checks if database contains required version, raising exception otherwise.
        :return: None or raises DbException on error
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            master_key = config.get("commonkey") or config.get("masterpassword")
            if master_key:
                self.database_key = master_key
                self.set_secret_key(master_key)
            if config.get("uri"):
                self.client = MongoClient(config["uri"])
            else:
                self.client = MongoClient(config["host"], config["port"])
            # TODO add as parameters also username=config.get("user"), password=config.get("password"))
            # when all modules are ready
            self.db = self.client[config["name"]]
            if "loglevel" in config:
                self.logger.setLevel(getattr(logging, config['loglevel']))
            # get data to try a connection
            now = time()
            while True:
                try:
                    version_data = self.get_one("admin", {"_id": "version"}, fail_on_empty=False, fail_on_more=True)
                    # check database status is ok
                    if version_data and version_data.get("status") != 'ENABLED':
                        raise DbException("Wrong database status '{}'".format(version_data.get("status")),
                                          http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
                    # check version
                    db_version = None if not version_data else version_data.get("version")
                    if target_version and target_version != db_version:
                        raise DbException("Invalid database version {}. Expected {}".format(db_version, target_version))
                    # get serial
                    if version_data and version_data.get("serial"):
                        self.secret_obtained = True
                        self.set_secret_key(b64decode(version_data["serial"]))
                    self.logger.info("Connected to database {} version {}".format(config["name"], db_version))
                    return
                except errors.ConnectionFailure as e:
                    # the server may still be starting: retry until conn_initial_timout seconds have elapsed
                    if time() - now >= self.conn_initial_timout:
                        raise
                    self.logger.info("Waiting to database up {}".format(e))
                    sleep(2)
        except errors.PyMongoError as e:
            raise DbException(e) from e

    @staticmethod
    def _format_filter(q_filter):
        """
        Translate query string q_filter into mongo database filter
        :param q_filter: Query string content. Follows SOL005 section 4.3.2 guidelines, with the follow extensions and
        differences:
            It accepts ".ne" (not equal) in addition to ".neq".
            For arrays you can specify index (concrete index must match), nothing (any index may match) or 'ANYINDEX'
            (two or more matches applies for the same array element). Examples:
                with database register: {A: [{B: 1, C: 2}, {B: 6, C: 9}]}
                query 'A.B=6' matches because array A contains one element with B equal to 6
                query 'A.0.B=6' does not match because index 0 of array A contains B with value 1, but not 6
                query 'A.B=6&A.C=2' matches because one element of array matches B=6 and other matches C=2
                query 'A.ANYINDEX.B=6&A.ANYINDEX.C=2' does not match because it is needed the same element of the
                    array matching both

        Examples of translations from SOL005 to  >> mongo  # comment
            A=B; A.eq=B         >> A: B             # must contain key A and equal to B or be a list that contains B
            A.cont=B            >> A: B
            A=B&A=C; A=B,C      >> A: {$in: [B, C]} # must contain key A and equal to B or C or be a list that contains
                                                    # B or C
            A.cont=B&A.cont=C; A.cont=B,C  >> A: {$in: [B, C]}
            A.ncont=B           >> A: {$ne: B}      # must not contain key A or if present not equal to B or if a list,
                                                    # it must not contain B
            A.ncont=B,C; A.ncont=B&A.ncont=C    >> A: {$nin: [B,C]}     # must not contain key A or if present not equal
                                                    # neither B nor C; or if a list, it must not contain neither B nor C
            A.ne=B&A.ne=C; A.ne=B,C             >> A: {$nin: [B, C]}
            A.gt=B              >> A: {$gt: B}      # must contain key A and greater than B
            A.ne=B; A.neq=B         >> A: {$ne: B}          # must not contain key A or if present not equal to B, or if
                                                            # an array not contain B
            A.ANYINDEX.B=C          >> A: {$elemMatch: {B=C}}
        :return: database mongo filter. An empty dictionary when q_filter is empty or None
        :raise: DbException (BAD_REQUEST) if q_filter cannot be translated
        """
        # bound before the 'try' so that the error message below can always be composed
        query_k = v = None
        try:
            db_filter = {}
            if not q_filter:
                return db_filter
            for query_k, query_v in q_filter.items():
                v = query_v
                dot_index = query_k.rfind(".")
                # '> 0': the key part before the operator must not be empty, but it can be one single character
                if dot_index > 0 and query_k[dot_index + 1:] in ("eq", "ne", "gt", "gte", "lt", "lte", "cont",
                                                                 "ncont", "neq"):
                    operator = "$" + query_k[dot_index + 1:]
                    if operator == "$neq":
                        operator = "$ne"
                    k = query_k[:dot_index]
                else:
                    operator = "$eq"
                    k = query_k

                if isinstance(v, list):
                    if operator in ("$eq", "$cont"):
                        operator = "$in"
                    elif operator in ("$ne", "$ncont"):
                        operator = "$nin"
                    else:
                        # comparison operators take one single value
                        v = ",".join(query_v)

                if operator in ("$eq", "$cont"):
                    # v cannot be a comma separated list, because operator would have been changed to $in
                    db_v = v
                elif operator in ("$ne", "$ncont"):
                    # v cannot be a comma separated list, because operator would have been changed to $nin
                    db_v = {"$ne": v}
                else:
                    db_v = {operator: v}

                # process the ANYINDEX word at k, from the innermost (rightmost) occurrence to the outermost
                kleft, _, kright = k.rpartition(".ANYINDEX.")
                while kleft:
                    k = kleft
                    db_v = {"$elemMatch": {kright: db_v}}
                    kleft, _, kright = k.rpartition(".ANYINDEX.")

                # insert in db_filter
                # maybe db_filter[k] exist. e.g. in the query string for values between 5 and 8: "a.gt=5&a.lt=8"
                deep_update(db_filter, {k: db_v})

            return db_filter
        except Exception as e:
            raise DbException("Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e),
                              http_code=HTTPStatus.BAD_REQUEST) from e

    def get_list(self, table, q_filter=None):
        """
        Obtain a list of entries matching q_filter
        :param table: collection or table
        :param q_filter: Filter
        :return: a list (can be empty) with the found entries. Raises DbException on error
        """
        try:
            with self.lock:
                collection = self.db[table]
                db_filter = self._format_filter(q_filter)
                rows = collection.find(db_filter)
            return list(rows)
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def count(self, table, q_filter=None):
        """
        Count the number of entries matching q_filter
        :param table: collection or table
        :param q_filter: Filter
        :return: number of entries found (can be zero)
        :raise: DbException on error
        """
        try:
            with self.lock:
                collection = self.db[table]
                db_filter = self._format_filter(q_filter)
                # Collection.count() does not exist at recent pymongo releases; prefer count_documents() if present
                counter = getattr(collection, "count_documents", None) or collection.count
                return counter(db_filter)
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def get_one(self, table, q_filter=None, fail_on_empty=True, fail_on_more=True):
        """
        Obtain one entry matching q_filter
        :param table: collection or table
        :param q_filter: Filter
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
            it raises a DbException
        :param fail_on_more: If more than one matches filter it returns one of them unless this flag is set to True, so
            that it raises a DbException
        :return: The requested element, or None
        """
        try:
            db_filter = self._format_filter(q_filter)
            with self.lock:
                collection = self.db[table]
                if not fail_on_empty and not fail_on_more:
                    # nothing to verify: any match (or None) is a valid answer
                    return collection.find_one(db_filter)
                # two entries are enough to tell apart "none", "exactly one" and "more than one"
                rows = list(collection.find(db_filter).limit(2))
            if not rows:
                if fail_on_empty:
                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
                                      HTTPStatus.NOT_FOUND)
                return None
            if len(rows) > 1 and fail_on_more:
                raise DbException("Found more than one {} with filter='{}'".format(table[:-1], q_filter),
                                  HTTPStatus.CONFLICT)
            return rows[0]
        except DbException:
            # keep the http code of the exceptions raised above
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def del_list(self, table, q_filter=None):
        """
        Deletes all entries that match q_filter
        :param table: collection or table
        :param q_filter: Filter
        :return: Dict with the number of entries deleted
        """
        try:
            with self.lock:
                collection = self.db[table]
                rows = collection.delete_many(self._format_filter(q_filter))
            return {"deleted": rows.deleted_count}
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def del_one(self, table, q_filter=None, fail_on_empty=True):
        """
        Deletes one entry that matches q_filter
        :param table: collection or table
        :param q_filter: Filter
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in
            which case it raises a DbException
        :return: Dict with the number of entries deleted. None if no matching is found.
        """
        try:
            with self.lock:
                collection = self.db[table]
                rows = collection.delete_one(self._format_filter(q_filter))
            if rows.deleted_count == 0:
                if fail_on_empty:
                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
                                      HTTPStatus.NOT_FOUND)
                return None
            return {"deleted": rows.deleted_count}
        except DbException:
            # keep the http code of the exception raised above
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def create(self, table, indata):
        """
        Add a new entry at database
        :param table: collection or table
        :param indata: content to be added
        :return: database id of the inserted element. Raises a DbException on error
        """
        try:
            with self.lock:
                collection = self.db[table]
                data = collection.insert_one(indata)
            return data.inserted_id
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def set_one(self, table, q_filter, update_dict, fail_on_empty=True, unset=None, pull=None, push=None):
        """
        Modifies an entry at database
        :param table: collection or table
        :param q_filter: Filter
        :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
            it raises a DbException
        :param unset: Plain dictionary with the content to be removed if exist. It is a dot separated keys, value is
            ignored. If not exist, it is ignored
        :param pull: Plain dictionary with the content to be removed from an array. It is a dot separated keys and value
            if exist in the array is removed. If not exist, it is ignored
        :param push: Plain dictionary with the content to be appended to an array. It is a dot separated keys and value
            is appended to the end of the array
        :return: Dict with the number of entries modified. None if no matching is found.
        """
        try:
            # only the operations that carry some content are sent to the database
            db_oper = {}
            if update_dict:
                db_oper["$set"] = update_dict
            if unset:
                db_oper["$unset"] = unset
            if pull:
                db_oper["$pull"] = pull
            if push:
                db_oper["$push"] = push

            with self.lock:
                collection = self.db[table]
                rows = collection.update_one(self._format_filter(q_filter), db_oper)
            if rows.matched_count == 0:
                if fail_on_empty:
                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
                                      HTTPStatus.NOT_FOUND)
                return None
            return {"modified": rows.modified_count}
        except DbException:
            # keep the http code of the exception raised above
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def set_list(self, table, q_filter, update_dict):
        """
        Modifies all matching entries at database
        :param table: collection or table
        :param q_filter: Filter
        :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
        :return: Dict with the number of entries modified
        """
        try:
            with self.lock:
                collection = self.db[table]
                rows = collection.update_many(self._format_filter(q_filter), {"$set": update_dict})
            return {"modified": rows.modified_count}
        except DbException:
            # keep the http code (BAD_REQUEST) of a wrong q_filter
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e

    def replace(self, table, _id, indata, fail_on_empty=True):
        """
        Replace the content of an entry
        :param table: collection or table
        :param _id: internal database id
        :param indata: content to replace
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
            it raises a DbException
        :return: Dict with the number of entries replaced. None if no matching is found.
        """
        try:
            db_filter = {"_id": _id}
            with self.lock:
                collection = self.db[table]
                rows = collection.replace_one(db_filter, indata)
            if rows.matched_count == 0:
                if fail_on_empty:
                    raise DbException("Not found any {} with _id='{}'".format(table[:-1], _id), HTTPStatus.NOT_FOUND)
                return None
            return {"replaced": rows.modified_count}
        except DbException:
            # keep the http code of the exception raised above
            raise
        except Exception as e:  # TODO refine
            raise DbException(e) from e