1 # -*- coding: utf-8 -*-
3 # Copyright 2018 Telefonica S.A.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
20 from pymongo
import MongoClient
, errors
21 from osm_common
.dbbase
import DbException
, DbBase
22 from http
import HTTPStatus
23 from time
import time
, sleep
24 from copy
import deepcopy
25 from base64
import b64decode
27 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
29 # TODO consider use this decorator for database access retries
31 # def retry_mongocall(call):
32 # def _retry_mongocall(*args, **kwargs):
36 # return call(*args, **kwargs)
37 # except pymongo.AutoReconnect as e:
39 # raise DbException(e)
41 # return _retry_mongocall
44 def deep_update(to_update
, update_with
):
46 Similar to deepcopy but recursively with nested dictionaries. 'to_update' dict is updated with a content copy of
47 'update_with' dict recursively
48 :param to_update: must be a dictionary to be modified
49 :param update_with: must be a dictionary. It is not changed
52 for key
in update_with
:
54 if isinstance(to_update
[key
], dict) and isinstance(update_with
[key
], dict):
55 deep_update(to_update
[key
], update_with
[key
])
57 to_update
[key
] = deepcopy(update_with
[key
])
61 class DbMongo(DbBase
):
62 conn_initial_timout
= 120
65 def __init__(self
, logger_name
='db', lock
=False):
66 super().__init
__(logger_name
, lock
)
70 def db_connect(self
, config
, target_version
=None):
73 :param config: Configuration of database
74 :param target_version: if provided it checks if database contains required version, raising exception otherwise.
75 :return: None or raises DbException on error
78 if "logger_name" in config
:
79 self
.logger
= logging
.getLogger(config
["logger_name"])
80 master_key
= config
.get("commonkey") or config
.get("masterpassword")
82 self
.set_secret_key(master_key
)
84 self
.client
= MongoClient(config
["uri"])
86 self
.client
= MongoClient(config
["host"], config
["port"])
87 # TODO add as parameters also username=config.get("user"), password=config.get("password"))
88 # when all modules are ready
89 self
.db
= self
.client
[config
["name"]]
90 if "loglevel" in config
:
91 self
.logger
.setLevel(getattr(logging
, config
['loglevel']))
92 # get data to try a connection
96 version_data
= self
.get_one("admin", {"_id": "version"}, fail_on_empty
=False, fail_on_more
=True)
97 # check database status is ok
98 if version_data
and version_data
.get("status") != 'ENABLED':
99 raise DbException("Wrong database status '{}'".format(version_data
.get("status")),
100 http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
)
102 db_version
= None if not version_data
else version_data
.get("version")
103 if target_version
and target_version
!= db_version
:
104 raise DbException("Invalid database version {}. Expected {}".format(db_version
, target_version
))
106 if version_data
and version_data
.get("serial"):
107 self
.set_secret_key(b64decode(version_data
["serial"]))
108 self
.logger
.info("Connected to database {} version {}".format(config
["name"], db_version
))
110 except errors
.ConnectionFailure
as e
:
111 if time() - now
>= self
.conn_initial_timout
:
113 self
.logger
.info("Waiting to database up {}".format(e
))
115 except errors
.PyMongoError
as e
:
119 def _format_filter(q_filter
):
121 Translate query string q_filter into mongo database filter
122 :param q_filter: Query string content. Follows SOL005 section 4.3.2 guidelines, with the follow extensions and
124 It accept ".nq" (not equal) in addition to ".neq".
125 For arrays you can specify index (concrete index must match), nothing (any index may match) or 'ANYINDEX'
126 (two or more matches applies for the same array element). Examples:
127 with database register: {A: [{B: 1, C: 2}, {B: 6, C: 9}]}
128 query 'A.B=6' matches because array A contains one element with B equal to 6
129 query 'A.0.B=6' does no match because index 0 of array A contains B with value 1, but not 6
130 query 'A.B=6&A.C=2' matches because one element of array matches B=6 and other matchesC=2
131 query 'A.ANYINDEX.B=6&A.ANYINDEX.C=2' does not match because it is needed the same element of the
134 Examples of translations from SOL005 to >> mongo # comment
135 A=B; A.eq=B >> A: B # must contain key A and equal to B or be a list that contains B
137 A=B&A=C; A=B,C >> A: {$in: [B, C]} # must contain key A and equal to B or C or be a list that contains
139 A.cont=B&A.cont=C; A.cont=B,C >> A: {$in: [B, C]}
140 A.ncont=B >> A: {$nin: B} # must not contain key A or if present not equal to B or if a list,
141 # it must not not contain B
142 A.ncont=B,C; A.ncont=B&A.ncont=C >> A: {$nin: [B,C]} # must not contain key A or if present not equal
143 # neither B nor C; or if a list, it must not contain neither B nor C
144 A.ne=B&A.ne=C; A.ne=B,C >> A: {$nin: [B, C]}
145 A.gt=B >> A: {$gt: B} # must contain key A and greater than B
146 A.ne=B; A.neq=B >> A: {$ne: B} # must not contain key A or if present not equal to B, or if
147 # an array not contain B
148 A.ANYINDEX.B=C >> A: {$elemMatch: {B=C}
149 :return: database mongo filter
155 for query_k
, query_v
in q_filter
.items():
156 dot_index
= query_k
.rfind(".")
157 if dot_index
> 1 and query_k
[dot_index
+1:] in ("eq", "ne", "gt", "gte", "lt", "lte", "cont",
159 operator
= "$" + query_k
[dot_index
+ 1:]
160 if operator
== "$neq":
162 k
= query_k
[:dot_index
]
168 if isinstance(v
, list):
169 if operator
in ("$eq", "$cont"):
172 elif operator
in ("$ne", "$ncont"):
176 v
= query_v
.join(",")
178 if operator
in ("$eq", "$cont"):
179 # v cannot be a comma separated list, because operator would have been changed to $in
181 elif operator
== "$ncount":
182 # v cannot be a comma separated list, because operator would have been changed to $nin
187 # process the ANYINDEX word at k.
188 kleft
, _
, kright
= k
.rpartition(".ANYINDEX.")
191 db_v
= {"$elemMatch": {kright
: db_v
}}
192 kleft
, _
, kright
= k
.rpartition(".ANYINDEX.")
194 # insert in db_filter
195 # maybe db_filter[k] exist. e.g. in the query string for values between 5 and 8: "a.gt=5&a.lt=8"
196 deep_update(db_filter
, {k
: db_v
})
199 except Exception as e
:
200 raise DbException("Invalid query string filter at {}:{}. Error: {}".format(query_k
, v
, e
),
201 http_code
=HTTPStatus
.BAD_REQUEST
)
203 def get_list(self
, table
, q_filter
=None):
205 Obtain a list of entries matching q_filter
206 :param table: collection or table
207 :param q_filter: Filter
208 :return: a list (can be empty) with the found entries. Raises DbException on error
213 collection
= self
.db
[table
]
214 db_filter
= self
._format
_filter
(q_filter
)
215 rows
= collection
.find(db_filter
)
221 except Exception as e
: # TODO refine
224 def count(self
, table
, q_filter
=None):
226 Count the number of entries matching q_filter
227 :param table: collection or table
228 :param q_filter: Filter
229 :return: number of entries found (can be zero)
230 :raise: DbException on error
234 collection
= self
.db
[table
]
235 db_filter
= self
._format
_filter
(q_filter
)
236 count
= collection
.count(db_filter
)
240 except Exception as e
: # TODO refine
243 def get_one(self
, table
, q_filter
=None, fail_on_empty
=True, fail_on_more
=True):
245 Obtain one entry matching q_filter
246 :param table: collection or table
247 :param q_filter: Filter
248 :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case
249 it raises a DbException
250 :param fail_on_more: If more than one matches filter it returns one of then unless this flag is set tu True, so
251 that it raises a DbException
252 :return: The requested element, or None
255 db_filter
= self
._format
_filter
(q_filter
)
257 collection
= self
.db
[table
]
258 if not (fail_on_empty
and fail_on_more
):
259 return collection
.find_one(db_filter
)
260 rows
= collection
.find(db_filter
)
261 if rows
.count() == 0:
263 raise DbException("Not found any {} with filter='{}'".format(table
[:-1], q_filter
),
264 HTTPStatus
.NOT_FOUND
)
266 elif rows
.count() > 1:
268 raise DbException("Found more than one {} with filter='{}'".format(table
[:-1], q_filter
),
271 except Exception as e
: # TODO refine
274 def del_list(self
, table
, q_filter
=None):
276 Deletes all entries that match q_filter
277 :param table: collection or table
278 :param q_filter: Filter
279 :return: Dict with the number of entries deleted
283 collection
= self
.db
[table
]
284 rows
= collection
.delete_many(self
._format
_filter
(q_filter
))
285 return {"deleted": rows
.deleted_count
}
288 except Exception as e
: # TODO refine
291 def del_one(self
, table
, q_filter
=None, fail_on_empty
=True):
293 Deletes one entry that matches q_filter
294 :param table: collection or table
295 :param q_filter: Filter
296 :param fail_on_empty: If nothing matches filter it returns '0' deleted unless this flag is set tu True, in
297 which case it raises a DbException
298 :return: Dict with the number of entries deleted
302 collection
= self
.db
[table
]
303 rows
= collection
.delete_one(self
._format
_filter
(q_filter
))
304 if rows
.deleted_count
== 0:
306 raise DbException("Not found any {} with filter='{}'".format(table
[:-1], q_filter
),
307 HTTPStatus
.NOT_FOUND
)
309 return {"deleted": rows
.deleted_count
}
310 except Exception as e
: # TODO refine
313 def create(self
, table
, indata
):
315 Add a new entry at database
316 :param table: collection or table
317 :param indata: content to be added
318 :return: database id of the inserted element. Raises a DbException on error
322 collection
= self
.db
[table
]
323 data
= collection
.insert_one(indata
)
324 return data
.inserted_id
325 except Exception as e
: # TODO refine
328 def set_one(self
, table
, q_filter
, update_dict
, fail_on_empty
=True, unset
=None, pull
=None, push
=None):
330 Modifies an entry at database
331 :param table: collection or table
332 :param q_filter: Filter
333 :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
334 :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case
335 it raises a DbException
336 :param unset: Plain dictionary with the content to be removed if exist. It is a dot separated keys, value is
337 ignored. If not exist, it is ignored
338 :param pull: Plain dictionary with the content to be removed from an array. It is a dot separated keys and value
339 if exist in the array is removed. If not exist, it is ignored
340 :param push: Plain dictionary with the content to be appended to an array. It is a dot separated keys and value
341 is appended to the end of the array
342 :return: Dict with the number of entries modified. None if no matching is found.
347 db_oper
["$set"] = update_dict
349 db_oper
["$unset"] = unset
351 db_oper
["$pull"] = pull
353 db_oper
["$push"] = push
356 collection
= self
.db
[table
]
357 rows
= collection
.update_one(self
._format
_filter
(q_filter
), db_oper
)
358 if rows
.matched_count
== 0:
360 raise DbException("Not found any {} with filter='{}'".format(table
[:-1], q_filter
),
361 HTTPStatus
.NOT_FOUND
)
363 return {"modified": rows
.modified_count
}
364 except Exception as e
: # TODO refine
367 def set_list(self
, table
, q_filter
, update_dict
):
369 Modifies al matching entries at database
370 :param table: collection or table
371 :param q_filter: Filter
372 :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
373 :return: Dict with the number of entries modified
377 collection
= self
.db
[table
]
378 rows
= collection
.update_many(self
._format
_filter
(q_filter
), {"$set": update_dict
})
379 return {"modified": rows
.modified_count
}
380 except Exception as e
: # TODO refine
383 def replace(self
, table
, _id
, indata
, fail_on_empty
=True):
385 Replace the content of an entry
386 :param table: collection or table
387 :param _id: internal database id
388 :param indata: content to replace
389 :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case
390 it raises a DbException
391 :return: Dict with the number of entries replaced
394 db_filter
= {"_id": _id
}
396 collection
= self
.db
[table
]
397 rows
= collection
.replace_one(db_filter
, indata
)
398 if rows
.matched_count
== 0:
400 raise DbException("Not found any {} with _id='{}'".format(table
[:-1], _id
), HTTPStatus
.NOT_FOUND
)
402 return {"replaced": rows
.modified_count
}
403 except Exception as e
: # TODO refine