af63d5b172ac7ecd3a3469c7db2a122525c6164d
1 # -*- coding: utf-8 -*-
3 # Copyright 2018 Telefonica S.A.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
20 from pymongo
import MongoClient
, errors
21 from osm_common
.dbbase
import DbException
, DbBase
22 from http
import HTTPStatus
23 from time
import time
, sleep
24 from copy
import deepcopy
25 from base64
import b64decode
27 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
# TODO: consider using a decorator like this one for database access retries.
# (Sketch only: the retry loop itself still has to be written.)
#
# def retry_mongocall(call):
#     def _retry_mongocall(*args, **kwargs):
#         try:
#             return call(*args, **kwargs)
#         except pymongo.AutoReconnect as e:
#             raise DbException(e)
#     return _retry_mongocall
def deep_update(to_update, update_with):
    """
    Recursively merge 'update_with' into 'to_update'.

    Similar to deepcopy, but working recursively on nested dictionaries: when both sides hold a dictionary
    under the same key the two are merged key by key; in any other case the value at 'to_update' is replaced
    by a deep copy of the value at 'update_with'.
    :param to_update: must be a dictionary. It is modified in place
    :param update_with: must be a dictionary. It is not changed
    :return: to_update, the same (modified) object received
    """
    for key, new_value in update_with.items():
        current_value = to_update.get(key)
        if isinstance(current_value, dict) and isinstance(new_value, dict):
            # both sides are dictionaries: merge instead of overwrite
            deep_update(current_value, new_value)
        else:
            # new key, or a non-dictionary on either side: store an independent copy so that later changes
            # at 'to_update' never leak into 'update_with'
            to_update[key] = deepcopy(new_value)
    return to_update
class DbMongo(DbBase):
    """
    MongoDB implementation of the DbBase database interface.

    Every public method raises DbException on error; exceptions coming from the driver are wrapped into it.
    """
    # maximum time, in seconds, that db_connect keeps retrying while the database is not reachable
    conn_initial_timout = 120

    def __init__(self, logger_name='db', master_password=None):
        """
        :param logger_name: name of the logger, passed to DbBase
        :param master_password: passed to DbBase unchanged
        """
        super().__init__(logger_name, master_password)
        self.client = None  # MongoClient, created at db_connect
        self.db = None      # database handle, created at db_connect

    def db_connect(self, config, target_version=None):
        """
        Connect to database
        :param config: Configuration of database. Keys used: "host", "port", "name" and optionally "logger_name"
            and "loglevel"
        :param target_version: if provided it checks if database contains required version, raising exception otherwise.
        :return: None or raises DbException on error
        """
        # function-scope import, so that this method does not depend on the module-level import list
        import logging

        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            self.client = MongoClient(config["host"], config["port"])
            self.db = self.client[config["name"]]
            if "loglevel" in config:
                self.logger.setLevel(getattr(logging, config['loglevel']))
            # get data to try a connection
            now = time()
            while True:
                try:
                    # The collection is read directly and not by means of get_one: get_one wraps every driver
                    # error into DbException, hence ConnectionFailure would never reach the retry below
                    version_data = self.db["admin"].find_one({"_id": "version"})
                    # check database status is ok
                    if version_data and version_data.get("status") != 'ENABLED':
                        raise DbException("Wrong database status '{}'".format(version_data.get("status")),
                                          http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
                    # check version
                    db_version = None if not version_data else version_data.get("version")
                    if target_version and target_version != db_version:
                        raise DbException("Invalid database version {}. Expected {}".format(db_version, target_version))
                    # get serial
                    if version_data and version_data.get("serial"):
                        self.set_secret_key(b64decode(version_data["serial"]))
                    self.logger.info("Connected to database {} version {}".format(config["name"], db_version))
                    return
                except errors.ConnectionFailure as e:
                    if time() - now >= self.conn_initial_timout:
                        raise
                    self.logger.info("Waiting to database up {}".format(e))
                    sleep(2)  # NOTE(review): wait between attempts; confirm the desired interval
        except errors.PyMongoError as e:
            raise DbException(str(e)) from e

    @staticmethod
    def _format_filter(q_filter):
        """
        Translate query string q_filter into mongo database filter
        :param q_filter: Query string content, as a dictionary. Follows SOL005 section 4.3.2 guidelines, with the
            following extensions and differences:
            It accepts ".neq" (not equal) in addition to ".ne".
            A value provided as a list means any of its items ("eq", "cont") or none of them ("ne", "ncont").
            For arrays you can specify index (concrete index must match), nothing (any index may match) or 'ANYINDEX'
            (two or more matches applies for the same array element). Examples:
                with database register: {A: [{B: 1, C: 2}, {B: 6, C: 9}]}
                query 'A.B=6' matches because array A contains one element with B equal to 6
                query 'A.0.B=6' does not match because index 0 of array A contains B with value 1, but not 6
                query 'A.B=6&A.C=2' matches because one element of array matches B=6 and other matches C=2
                query 'A.ANYINDEX.B=6&A.ANYINDEX.C=2' does not match because it is needed the same element of the
                    array matching both conditions
            Examples of translations from SOL005 to  >> mongo  # comment
                A=B; A.eq=B         >> A: B             # must contain key A and equal to B or be a list that contains B
                A=B&A=C; A=B,C      >> A: {$in: [B, C]} # must contain key A and equal to B or C or be a list that contains
                                                        # B or C
                A.cont=B&A.cont=C; A.cont=B,C  >> A: {$in: [B, C]}
                A.ncont=B           >> A: {$ne: B}      # must not contain key A or if present not equal to B or if a list,
                                                        # it must not contain B
                A.ncont=B,C; A.ncont=B&A.ncont=C    >> A: {$nin: [B,C]}     # must not contain key A or if present not equal
                                                        # neither B nor C; or if a list, it must not contain neither B nor C
                A.ne=B&A.ne=C; A.ne=B,C             >> A: {$nin: [B, C]}
                A.gt=B              >> A: {$gt: B}      # must contain key A and greater than B
                A.ne=B; A.neq=B     >> A: {$ne: B}      # must not contain key A or if present not equal to B, or if
                                                        # an array not contain B
                A.ANYINDEX.B=C      >> A: {$elemMatch: {B: C}}
        :return: database mongo filter. Raises DbException (BAD_REQUEST) if q_filter cannot be translated
        """
        # bound before the 'try' because the error message below uses them even if the loop never started
        query_k = v = None
        try:
            db_filter = {}
            if not q_filter:
                return db_filter
            for query_k, query_v in q_filter.items():
                dot_index = query_k.rfind(".")
                # '> 0' and not '> 1': a one-letter key such as 'a.gt' must be recognized too
                if dot_index > 0 and query_k[dot_index + 1:] in ("eq", "ne", "gt", "gte", "lt", "lte", "cont",
                                                                 "ncont", "neq"):
                    operator = "$" + query_k[dot_index + 1:]
                    if operator == "$neq":
                        operator = "$ne"
                    k = query_k[:dot_index]
                else:
                    operator = "$eq"
                    k = query_k

                v = query_v
                if isinstance(v, list):
                    if operator in ("$eq", "$cont"):
                        operator = "$in"
                    elif operator in ("$ne", "$ncont"):
                        operator = "$nin"
                    else:
                        # comparison operators take a single value
                        v = ",".join(query_v)

                if operator in ("$eq", "$cont"):
                    # v cannot be a comma separated list, because operator would have been changed to $in
                    db_v = v
                elif operator == "$ncont":
                    # v cannot be a comma separated list, because operator would have been changed to $nin
                    db_v = {"$ne": v}
                else:
                    db_v = {operator: v}

                # process the ANYINDEX word at k, from the innermost to the outermost one
                kleft, _, kright = k.rpartition(".ANYINDEX.")
                while kleft:
                    k = kleft
                    db_v = {"$elemMatch": {kright: db_v}}
                    kleft, _, kright = k.rpartition(".ANYINDEX.")

                # insert in db_filter
                # maybe db_filter[k] exist. e.g. in the query string for values between 5 and 8: "a.gt=5&a.lt=8"
                deep_update(db_filter, {k: db_v})

            return db_filter
        except Exception as e:
            raise DbException("Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e),
                              http_code=HTTPStatus.BAD_REQUEST)

    def get_list(self, table, q_filter=None):
        """
        Obtain a list of entries matching q_filter
        :param table: collection or table
        :param q_filter: Filter
        :return: a list (can be empty) with the found entries. Raises DbException on error
        """
        try:
            collection = self.db[table]
            db_filter = self._format_filter(q_filter)
            return list(collection.find(db_filter))
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(str(e)) from e

    def get_one(self, table, q_filter=None, fail_on_empty=True, fail_on_more=True):
        """
        Obtain one entry matching q_filter
        :param table: collection or table
        :param q_filter: Filter
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
            it raises a DbException
        :param fail_on_more: If more than one matches filter it returns one of them unless this flag is set to True, so
            that it raises a DbException
        :return: The requested element, or None
        """
        try:
            db_filter = self._format_filter(q_filter)
            collection = self.db[table]
            # two entries are enough to tell apart "none", "one" and "more than one"
            rows = list(collection.find(db_filter).limit(2))
            if not rows:
                if fail_on_empty:
                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
                                      HTTPStatus.NOT_FOUND)
                return None
            if len(rows) > 1 and fail_on_more:
                raise DbException("Found more than one {} with filter='{}'".format(table[:-1], q_filter),
                                  HTTPStatus.CONFLICT)
            return rows[0]
        except DbException:
            # already contains the proper http code; do not wrap it again
            raise
        except Exception as e:  # TODO refine
            raise DbException(str(e)) from e

    def del_list(self, table, q_filter=None):
        """
        Deletes all entries that match q_filter
        :param table: collection or table
        :param q_filter: Filter
        :return: Dict with the number of entries deleted
        """
        try:
            collection = self.db[table]
            rows = collection.delete_many(self._format_filter(q_filter))
            return {"deleted": rows.deleted_count}
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(str(e)) from e

    def del_one(self, table, q_filter=None, fail_on_empty=True):
        """
        Deletes one entry that matches q_filter
        :param table: collection or table
        :param q_filter: Filter
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in
            which case it raises a DbException
        :return: Dict with the number of entries deleted. None if no matching is found.
        """
        try:
            collection = self.db[table]
            rows = collection.delete_one(self._format_filter(q_filter))
            if rows.deleted_count == 0:
                if fail_on_empty:
                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
                                      HTTPStatus.NOT_FOUND)
                return None
            return {"deleted": rows.deleted_count}
        except DbException:
            # already contains the proper http code; do not wrap it again
            raise
        except Exception as e:  # TODO refine
            raise DbException(str(e)) from e

    def create(self, table, indata):
        """
        Add a new entry at database
        :param table: collection or table
        :param indata: content to be added
        :return: database id of the inserted element. Raises a DbException on error
        """
        try:
            collection = self.db[table]
            data = collection.insert_one(indata)
            return data.inserted_id
        except Exception as e:  # TODO refine
            raise DbException(str(e)) from e

    def set_one(self, table, q_filter, update_dict, fail_on_empty=True):
        """
        Modifies an entry at database
        :param table: collection or table
        :param q_filter: Filter
        :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
            it raises a DbException
        :return: Dict with the number of entries modified. None if no matching is found.
        """
        try:
            collection = self.db[table]
            rows = collection.update_one(self._format_filter(q_filter), {"$set": update_dict})
            if rows.matched_count == 0:
                if fail_on_empty:
                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
                                      HTTPStatus.NOT_FOUND)
                return None
            return {"modified": rows.modified_count}
        except DbException:
            # already contains the proper http code; do not wrap it again
            raise
        except Exception as e:  # TODO refine
            raise DbException(str(e)) from e

    def set_list(self, table, q_filter, update_dict):
        """
        Modifies all matching entries at database
        :param table: collection or table
        :param q_filter: Filter
        :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
        :return: Dict with the number of entries modified
        """
        try:
            collection = self.db[table]
            rows = collection.update_many(self._format_filter(q_filter), {"$set": update_dict})
            return {"modified": rows.modified_count}
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(str(e)) from e

    def replace(self, table, _id, indata, fail_on_empty=True):
        """
        Replace the content of an entry
        :param table: collection or table
        :param _id: internal database id
        :param indata: content to replace
        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
            it raises a DbException
        :return: Dict with the number of entries replaced. None if no matching is found.
        """
        try:
            db_filter = {"_id": _id}
            collection = self.db[table]
            rows = collection.replace_one(db_filter, indata)
            if rows.matched_count == 0:
                if fail_on_empty:
                    raise DbException("Not found any {} with _id='{}'".format(table[:-1], _id), HTTPStatus.NOT_FOUND)
                return None
            return {"replaced": rows.modified_count}
        except DbException:
            # already contains the proper http code; do not wrap it again
            raise
        except Exception as e:  # TODO refine
            raise DbException(str(e)) from e