1 # -*- coding: utf-8 -*-
3 # Copyright 2018 Telefonica S.A.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
20 from pymongo
import MongoClient
, errors
21 from osm_common
.dbbase
import DbException
, DbBase
22 from http
import HTTPStatus
23 from time
import time
, sleep
24 from copy
import deepcopy
25 from base64
import b64decode
27 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
29 # TODO consider use this decorator for database access retries
31 # def retry_mongocall(call):
32 # def _retry_mongocall(*args, **kwargs):
36 # return call(*args, **kwargs)
37 # except pymongo.AutoReconnect as e:
39 # raise DbException(e)
41 # return _retry_mongocall
44 def deep_update(to_update
, update_with
):
46 Similar to deepcopy but recursively with nested dictionaries. 'to_update' dict is updated with a content copy of
47 'update_with' dict recursively
48 :param to_update: must be a dictionary to be modified
49 :param update_with: must be a dictionary. It is not changed
52 for key
in update_with
:
54 if isinstance(to_update
[key
], dict) and isinstance(update_with
[key
], dict):
55 deep_update(to_update
[key
], update_with
[key
])
57 to_update
[key
] = deepcopy(update_with
[key
])
61 class DbMongo(DbBase
):
62 conn_initial_timout
= 120
65 def __init__(self
, logger_name
='db', lock
=False):
66 super().__init
__(logger_name
, lock
)
70 def db_connect(self
, config
, target_version
=None):
73 :param config: Configuration of database
74 :param target_version: if provided it checks if database contains required version, raising exception otherwise.
75 :return: None or raises DbException on error
78 if "logger_name" in config
:
79 self
.logger
= logging
.getLogger(config
["logger_name"])
80 self
.master_password
= config
.get("masterpassword")
81 self
.client
= MongoClient(config
["host"], config
["port"])
82 # TODO add as parameters also username=config.get("user"), password=config.get("password"))
83 # when all modules are ready
84 self
.db
= self
.client
[config
["name"]]
85 if "loglevel" in config
:
86 self
.logger
.setLevel(getattr(logging
, config
['loglevel']))
87 # get data to try a connection
91 version_data
= self
.get_one("admin", {"_id": "version"}, fail_on_empty
=False, fail_on_more
=True)
92 # check database status is ok
93 if version_data
and version_data
.get("status") != 'ENABLED':
94 raise DbException("Wrong database status '{}'".format(version_data
.get("status")),
95 http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
)
97 db_version
= None if not version_data
else version_data
.get("version")
98 if target_version
and target_version
!= db_version
:
99 raise DbException("Invalid database version {}. Expected {}".format(db_version
, target_version
))
101 if version_data
and version_data
.get("serial"):
102 self
.set_secret_key(b64decode(version_data
["serial"]))
103 self
.logger
.info("Connected to database {} version {}".format(config
["name"], db_version
))
105 except errors
.ConnectionFailure
as e
:
106 if time() - now
>= self
.conn_initial_timout
:
108 self
.logger
.info("Waiting to database up {}".format(e
))
110 except errors
.PyMongoError
as e
:
114 def _format_filter(q_filter
):
116 Translate query string q_filter into mongo database filter
117 :param q_filter: Query string content. Follows SOL005 section 4.3.2 guidelines, with the follow extensions and
119 It accept ".nq" (not equal) in addition to ".neq".
120 For arrays you can specify index (concrete index must match), nothing (any index may match) or 'ANYINDEX'
121 (two or more matches applies for the same array element). Examples:
122 with database register: {A: [{B: 1, C: 2}, {B: 6, C: 9}]}
123 query 'A.B=6' matches because array A contains one element with B equal to 6
124 query 'A.0.B=6' does no match because index 0 of array A contains B with value 1, but not 6
125 query 'A.B=6&A.C=2' matches because one element of array matches B=6 and other matchesC=2
126 query 'A.ANYINDEX.B=6&A.ANYINDEX.C=2' does not match because it is needed the same element of the
129 Examples of translations from SOL005 to >> mongo # comment
130 A=B; A.eq=B >> A: B # must contain key A and equal to B or be a list that contains B
132 A=B&A=C; A=B,C >> A: {$in: [B, C]} # must contain key A and equal to B or C or be a list that contains
134 A.cont=B&A.cont=C; A.cont=B,C >> A: {$in: [B, C]}
135 A.ncont=B >> A: {$nin: B} # must not contain key A or if present not equal to B or if a list,
136 # it must not not contain B
137 A.ncont=B,C; A.ncont=B&A.ncont=C >> A: {$nin: [B,C]} # must not contain key A or if present not equal
138 # neither B nor C; or if a list, it must not contain neither B nor C
139 A.ne=B&A.ne=C; A.ne=B,C >> A: {$nin: [B, C]}
140 A.gt=B >> A: {$gt: B} # must contain key A and greater than B
141 A.ne=B; A.neq=B >> A: {$ne: B} # must not contain key A or if present not equal to B, or if
142 # an array not contain B
143 A.ANYINDEX.B=C >> A: {$elemMatch: {B=C}
144 :return: database mongo filter
150 for query_k
, query_v
in q_filter
.items():
151 dot_index
= query_k
.rfind(".")
152 if dot_index
> 1 and query_k
[dot_index
+1:] in ("eq", "ne", "gt", "gte", "lt", "lte", "cont",
154 operator
= "$" + query_k
[dot_index
+ 1:]
155 if operator
== "$neq":
157 k
= query_k
[:dot_index
]
163 if isinstance(v
, list):
164 if operator
in ("$eq", "$cont"):
167 elif operator
in ("$ne", "$ncont"):
171 v
= query_v
.join(",")
173 if operator
in ("$eq", "$cont"):
174 # v cannot be a comma separated list, because operator would have been changed to $in
176 elif operator
== "$ncount":
177 # v cannot be a comma separated list, because operator would have been changed to $nin
182 # process the ANYINDEX word at k.
183 kleft
, _
, kright
= k
.rpartition(".ANYINDEX.")
186 db_v
= {"$elemMatch": {kright
: db_v
}}
187 kleft
, _
, kright
= k
.rpartition(".ANYINDEX.")
189 # insert in db_filter
190 # maybe db_filter[k] exist. e.g. in the query string for values between 5 and 8: "a.gt=5&a.lt=8"
191 deep_update(db_filter
, {k
: db_v
})
194 except Exception as e
:
195 raise DbException("Invalid query string filter at {}:{}. Error: {}".format(query_k
, v
, e
),
196 http_code
=HTTPStatus
.BAD_REQUEST
)
198 def get_list(self
, table
, q_filter
=None):
200 Obtain a list of entries matching q_filter
201 :param table: collection or table
202 :param q_filter: Filter
203 :return: a list (can be empty) with the found entries. Raises DbException on error
208 collection
= self
.db
[table
]
209 db_filter
= self
._format
_filter
(q_filter
)
210 rows
= collection
.find(db_filter
)
216 except Exception as e
: # TODO refine
219 def get_one(self
, table
, q_filter
=None, fail_on_empty
=True, fail_on_more
=True):
221 Obtain one entry matching q_filter
222 :param table: collection or table
223 :param q_filter: Filter
224 :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case
225 it raises a DbException
226 :param fail_on_more: If more than one matches filter it returns one of then unless this flag is set tu True, so
227 that it raises a DbException
228 :return: The requested element, or None
231 db_filter
= self
._format
_filter
(q_filter
)
233 collection
= self
.db
[table
]
234 if not (fail_on_empty
and fail_on_more
):
235 return collection
.find_one(db_filter
)
236 rows
= collection
.find(db_filter
)
237 if rows
.count() == 0:
239 raise DbException("Not found any {} with filter='{}'".format(table
[:-1], q_filter
),
240 HTTPStatus
.NOT_FOUND
)
242 elif rows
.count() > 1:
244 raise DbException("Found more than one {} with filter='{}'".format(table
[:-1], q_filter
),
247 except Exception as e
: # TODO refine
250 def del_list(self
, table
, q_filter
=None):
252 Deletes all entries that match q_filter
253 :param table: collection or table
254 :param q_filter: Filter
255 :return: Dict with the number of entries deleted
259 collection
= self
.db
[table
]
260 rows
= collection
.delete_many(self
._format
_filter
(q_filter
))
261 return {"deleted": rows
.deleted_count
}
264 except Exception as e
: # TODO refine
267 def del_one(self
, table
, q_filter
=None, fail_on_empty
=True):
269 Deletes one entry that matches q_filter
270 :param table: collection or table
271 :param q_filter: Filter
272 :param fail_on_empty: If nothing matches filter it returns '0' deleted unless this flag is set tu True, in
273 which case it raises a DbException
274 :return: Dict with the number of entries deleted
278 collection
= self
.db
[table
]
279 rows
= collection
.delete_one(self
._format
_filter
(q_filter
))
280 if rows
.deleted_count
== 0:
282 raise DbException("Not found any {} with filter='{}'".format(table
[:-1], q_filter
),
283 HTTPStatus
.NOT_FOUND
)
285 return {"deleted": rows
.deleted_count
}
286 except Exception as e
: # TODO refine
289 def create(self
, table
, indata
):
291 Add a new entry at database
292 :param table: collection or table
293 :param indata: content to be added
294 :return: database id of the inserted element. Raises a DbException on error
298 collection
= self
.db
[table
]
299 data
= collection
.insert_one(indata
)
300 return data
.inserted_id
301 except Exception as e
: # TODO refine
304 def set_one(self
, table
, q_filter
, update_dict
, fail_on_empty
=True):
306 Modifies an entry at database
307 :param table: collection or table
308 :param q_filter: Filter
309 :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
310 :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case
311 it raises a DbException
312 :return: Dict with the number of entries modified. None if no matching is found.
316 collection
= self
.db
[table
]
317 rows
= collection
.update_one(self
._format
_filter
(q_filter
), {"$set": update_dict
})
318 if rows
.matched_count
== 0:
320 raise DbException("Not found any {} with filter='{}'".format(table
[:-1], q_filter
),
321 HTTPStatus
.NOT_FOUND
)
323 return {"modified": rows
.modified_count
}
324 except Exception as e
: # TODO refine
327 def set_list(self
, table
, q_filter
, update_dict
):
329 Modifies al matching entries at database
330 :param table: collection or table
331 :param q_filter: Filter
332 :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
333 :return: Dict with the number of entries modified
337 collection
= self
.db
[table
]
338 rows
= collection
.update_many(self
._format
_filter
(q_filter
), {"$set": update_dict
})
339 return {"modified": rows
.modified_count
}
340 except Exception as e
: # TODO refine
343 def replace(self
, table
, _id
, indata
, fail_on_empty
=True):
345 Replace the content of an entry
346 :param table: collection or table
347 :param _id: internal database id
348 :param indata: content to replace
349 :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case
350 it raises a DbException
351 :return: Dict with the number of entries replaced
354 db_filter
= {"_id": _id
}
356 collection
= self
.db
[table
]
357 rows
= collection
.replace_one(db_filter
, indata
)
358 if rows
.matched_count
== 0:
360 raise DbException("Not found any {} with _id='{}'".format(table
[:-1], _id
), HTTPStatus
.NOT_FOUND
)
362 return {"replaced": rows
.modified_count
}
363 except Exception as e
: # TODO refine