0f89c96cad130eab869ac442ec35f5e8f115aecc
1 # -*- coding: utf-8 -*-
3 # Copyright 2018 Telefonica S.A.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
20 from pymongo
import MongoClient
, errors
21 from osm_common
.dbbase
import DbException
, DbBase
22 from http
import HTTPStatus
23 from time
import time
, sleep
24 from copy
import deepcopy
25 from base64
import b64decode
27 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
29 # TODO consider use this decorator for database access retries
31 # def retry_mongocall(call):
32 # def _retry_mongocall(*args, **kwargs):
36 # return call(*args, **kwargs)
37 # except pymongo.AutoReconnect as e:
39 # raise DbException(e)
41 # return _retry_mongocall
44 def deep_update(to_update
, update_with
):
46 Similar to deepcopy but recursively with nested dictionaries. 'to_update' dict is updated with a content copy of
47 'update_with' dict recursively
48 :param to_update: must be a dictionary to be modified
49 :param update_with: must be a dictionary. It is not changed
52 for key
in update_with
:
54 if isinstance(to_update
[key
], dict) and isinstance(update_with
[key
], dict):
55 deep_update(to_update
[key
], update_with
[key
])
57 to_update
[key
] = deepcopy(update_with
[key
])
61 class DbMongo(DbBase
):
62 conn_initial_timout
= 120
65 def __init__(self
, logger_name
='db', lock
=False):
66 super().__init
__(logger_name
, lock
)
70 def db_connect(self
, config
, target_version
=None):
73 :param config: Configuration of database
74 :param target_version: if provided it checks if database contains required version, raising exception otherwise.
75 :return: None or raises DbException on error
78 if "logger_name" in config
:
79 self
.logger
= logging
.getLogger(config
["logger_name"])
80 master_key
= config
.get("commonkey") or config
.get("masterpassword")
82 self
.set_secret_key(master_key
)
83 self
.client
= MongoClient(config
["host"], config
["port"])
84 # TODO add as parameters also username=config.get("user"), password=config.get("password"))
85 # when all modules are ready
86 self
.db
= self
.client
[config
["name"]]
87 if "loglevel" in config
:
88 self
.logger
.setLevel(getattr(logging
, config
['loglevel']))
89 # get data to try a connection
93 version_data
= self
.get_one("admin", {"_id": "version"}, fail_on_empty
=False, fail_on_more
=True)
94 # check database status is ok
95 if version_data
and version_data
.get("status") != 'ENABLED':
96 raise DbException("Wrong database status '{}'".format(version_data
.get("status")),
97 http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
)
99 db_version
= None if not version_data
else version_data
.get("version")
100 if target_version
and target_version
!= db_version
:
101 raise DbException("Invalid database version {}. Expected {}".format(db_version
, target_version
))
103 if version_data
and version_data
.get("serial"):
104 self
.set_secret_key(b64decode(version_data
["serial"]))
105 self
.logger
.info("Connected to database {} version {}".format(config
["name"], db_version
))
107 except errors
.ConnectionFailure
as e
:
108 if time() - now
>= self
.conn_initial_timout
:
110 self
.logger
.info("Waiting to database up {}".format(e
))
112 except errors
.PyMongoError
as e
:
116 def _format_filter(q_filter
):
118 Translate query string q_filter into mongo database filter
119 :param q_filter: Query string content. Follows SOL005 section 4.3.2 guidelines, with the follow extensions and
121 It accept ".nq" (not equal) in addition to ".neq".
122 For arrays you can specify index (concrete index must match), nothing (any index may match) or 'ANYINDEX'
123 (two or more matches applies for the same array element). Examples:
124 with database register: {A: [{B: 1, C: 2}, {B: 6, C: 9}]}
125 query 'A.B=6' matches because array A contains one element with B equal to 6
126 query 'A.0.B=6' does no match because index 0 of array A contains B with value 1, but not 6
127 query 'A.B=6&A.C=2' matches because one element of array matches B=6 and other matchesC=2
128 query 'A.ANYINDEX.B=6&A.ANYINDEX.C=2' does not match because it is needed the same element of the
131 Examples of translations from SOL005 to >> mongo # comment
132 A=B; A.eq=B >> A: B # must contain key A and equal to B or be a list that contains B
134 A=B&A=C; A=B,C >> A: {$in: [B, C]} # must contain key A and equal to B or C or be a list that contains
136 A.cont=B&A.cont=C; A.cont=B,C >> A: {$in: [B, C]}
137 A.ncont=B >> A: {$nin: B} # must not contain key A or if present not equal to B or if a list,
138 # it must not not contain B
139 A.ncont=B,C; A.ncont=B&A.ncont=C >> A: {$nin: [B,C]} # must not contain key A or if present not equal
140 # neither B nor C; or if a list, it must not contain neither B nor C
141 A.ne=B&A.ne=C; A.ne=B,C >> A: {$nin: [B, C]}
142 A.gt=B >> A: {$gt: B} # must contain key A and greater than B
143 A.ne=B; A.neq=B >> A: {$ne: B} # must not contain key A or if present not equal to B, or if
144 # an array not contain B
145 A.ANYINDEX.B=C >> A: {$elemMatch: {B=C}
146 :return: database mongo filter
152 for query_k
, query_v
in q_filter
.items():
153 dot_index
= query_k
.rfind(".")
154 if dot_index
> 1 and query_k
[dot_index
+1:] in ("eq", "ne", "gt", "gte", "lt", "lte", "cont",
156 operator
= "$" + query_k
[dot_index
+ 1:]
157 if operator
== "$neq":
159 k
= query_k
[:dot_index
]
165 if isinstance(v
, list):
166 if operator
in ("$eq", "$cont"):
169 elif operator
in ("$ne", "$ncont"):
173 v
= query_v
.join(",")
175 if operator
in ("$eq", "$cont"):
176 # v cannot be a comma separated list, because operator would have been changed to $in
178 elif operator
== "$ncount":
179 # v cannot be a comma separated list, because operator would have been changed to $nin
184 # process the ANYINDEX word at k.
185 kleft
, _
, kright
= k
.rpartition(".ANYINDEX.")
188 db_v
= {"$elemMatch": {kright
: db_v
}}
189 kleft
, _
, kright
= k
.rpartition(".ANYINDEX.")
191 # insert in db_filter
192 # maybe db_filter[k] exist. e.g. in the query string for values between 5 and 8: "a.gt=5&a.lt=8"
193 deep_update(db_filter
, {k
: db_v
})
196 except Exception as e
:
197 raise DbException("Invalid query string filter at {}:{}. Error: {}".format(query_k
, v
, e
),
198 http_code
=HTTPStatus
.BAD_REQUEST
)
200 def get_list(self
, table
, q_filter
=None):
202 Obtain a list of entries matching q_filter
203 :param table: collection or table
204 :param q_filter: Filter
205 :return: a list (can be empty) with the found entries. Raises DbException on error
210 collection
= self
.db
[table
]
211 db_filter
= self
._format
_filter
(q_filter
)
212 rows
= collection
.find(db_filter
)
218 except Exception as e
: # TODO refine
221 def get_one(self
, table
, q_filter
=None, fail_on_empty
=True, fail_on_more
=True):
223 Obtain one entry matching q_filter
224 :param table: collection or table
225 :param q_filter: Filter
226 :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case
227 it raises a DbException
228 :param fail_on_more: If more than one matches filter it returns one of then unless this flag is set tu True, so
229 that it raises a DbException
230 :return: The requested element, or None
233 db_filter
= self
._format
_filter
(q_filter
)
235 collection
= self
.db
[table
]
236 if not (fail_on_empty
and fail_on_more
):
237 return collection
.find_one(db_filter
)
238 rows
= collection
.find(db_filter
)
239 if rows
.count() == 0:
241 raise DbException("Not found any {} with filter='{}'".format(table
[:-1], q_filter
),
242 HTTPStatus
.NOT_FOUND
)
244 elif rows
.count() > 1:
246 raise DbException("Found more than one {} with filter='{}'".format(table
[:-1], q_filter
),
249 except Exception as e
: # TODO refine
252 def del_list(self
, table
, q_filter
=None):
254 Deletes all entries that match q_filter
255 :param table: collection or table
256 :param q_filter: Filter
257 :return: Dict with the number of entries deleted
261 collection
= self
.db
[table
]
262 rows
= collection
.delete_many(self
._format
_filter
(q_filter
))
263 return {"deleted": rows
.deleted_count
}
266 except Exception as e
: # TODO refine
269 def del_one(self
, table
, q_filter
=None, fail_on_empty
=True):
271 Deletes one entry that matches q_filter
272 :param table: collection or table
273 :param q_filter: Filter
274 :param fail_on_empty: If nothing matches filter it returns '0' deleted unless this flag is set tu True, in
275 which case it raises a DbException
276 :return: Dict with the number of entries deleted
280 collection
= self
.db
[table
]
281 rows
= collection
.delete_one(self
._format
_filter
(q_filter
))
282 if rows
.deleted_count
== 0:
284 raise DbException("Not found any {} with filter='{}'".format(table
[:-1], q_filter
),
285 HTTPStatus
.NOT_FOUND
)
287 return {"deleted": rows
.deleted_count
}
288 except Exception as e
: # TODO refine
291 def create(self
, table
, indata
):
293 Add a new entry at database
294 :param table: collection or table
295 :param indata: content to be added
296 :return: database id of the inserted element. Raises a DbException on error
300 collection
= self
.db
[table
]
301 data
= collection
.insert_one(indata
)
302 return data
.inserted_id
303 except Exception as e
: # TODO refine
306 def set_one(self
, table
, q_filter
, update_dict
, fail_on_empty
=True):
308 Modifies an entry at database
309 :param table: collection or table
310 :param q_filter: Filter
311 :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
312 :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case
313 it raises a DbException
314 :return: Dict with the number of entries modified. None if no matching is found.
318 collection
= self
.db
[table
]
319 rows
= collection
.update_one(self
._format
_filter
(q_filter
), {"$set": update_dict
})
320 if rows
.matched_count
== 0:
322 raise DbException("Not found any {} with filter='{}'".format(table
[:-1], q_filter
),
323 HTTPStatus
.NOT_FOUND
)
325 return {"modified": rows
.modified_count
}
326 except Exception as e
: # TODO refine
329 def set_list(self
, table
, q_filter
, update_dict
):
331 Modifies al matching entries at database
332 :param table: collection or table
333 :param q_filter: Filter
334 :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
335 :return: Dict with the number of entries modified
339 collection
= self
.db
[table
]
340 rows
= collection
.update_many(self
._format
_filter
(q_filter
), {"$set": update_dict
})
341 return {"modified": rows
.modified_count
}
342 except Exception as e
: # TODO refine
345 def replace(self
, table
, _id
, indata
, fail_on_empty
=True):
347 Replace the content of an entry
348 :param table: collection or table
349 :param _id: internal database id
350 :param indata: content to replace
351 :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case
352 it raises a DbException
353 :return: Dict with the number of entries replaced
356 db_filter
= {"_id": _id
}
358 collection
= self
.db
[table
]
359 rows
= collection
.replace_one(db_filter
, indata
)
360 if rows
.matched_count
== 0:
362 raise DbException("Not found any {} with _id='{}'".format(table
[:-1], _id
), HTTPStatus
.NOT_FOUND
)
364 return {"replaced": rows
.modified_count
}
365 except Exception as e
: # TODO refine