3 from pymongo
import MongoClient
, errors
4 from osm_common
.dbbase
import DbException
, DbBase
5 from http
import HTTPStatus
6 from time
import time
, sleep
7 from copy
import deepcopy
9 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
11 # TODO: consider using this decorator for database access retries
13 # def retry_mongocall(call):
14 # def _retry_mongocall(*args, **kwargs):
18 # return call(*args, **kwargs)
19 # except pymongo.AutoReconnect as e:
21 # raise DbException(str(e))
23 # return _retry_mongocall
def deep_update(to_update, update_with):
    """
    Update 'to_update' dict with the content of 'update_with' dict recursively.

    When a key holds a dictionary on both sides the two dictionaries are merged key by key; in any other case the
    value at 'to_update' is replaced by a deep copy of the value at 'update_with', so that both structures never
    share mutable objects.
    :param to_update: must be a dictionary to be modified
    :param update_with: must be a dictionary. It is not changed
    :return: to_update (the same object received, already modified)
    """
    for key, new_value in update_with.items():
        current = to_update.get(key)
        if isinstance(current, dict) and isinstance(new_value, dict):
            # both sides are dictionaries: merge in place instead of overwriting the existing content
            deep_update(current, new_value)
        else:
            # new key, or a non-dictionary on at least one side: plain replacement
            to_update[key] = deepcopy(new_value)
    return to_update
class DbMongo(DbBase):
    """
    MongoDB implementation of the DbBase interface.

    Every 'table' argument is used as the name of a collection of the connected database. Filters are received in
    query string form and translated by '_format_filter'. Failures are reported to the caller as DbException.
    """

    # Maximum time, in seconds, that 'db_connect' keeps retrying the first query before giving up
    conn_initial_timout = 120

    def __init__(self, logger_name='db'):
        """
        :param logger_name: name of the logger used by this instance
        """
        self.logger = logging.getLogger(logger_name)

    def db_connect(self, config):
        """
        Connect to the database and wait until it answers a first query.
        :param config: dictionary with the keys "host", "port" and "name" (database name). Optionally "logger_name",
            to replace the logger, and "loglevel", the name of a 'logging' level attribute
        :return: None
        :raises DbException: on any pymongo error, including the database being still unreachable after
            'conn_initial_timout' seconds
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            self.client = MongoClient(config["host"], config["port"])
            self.db = self.client[config["name"]]
            if "loglevel" in config:
                self.logger.setLevel(getattr(logging, config['loglevel']))
            # get data to try a connection
            now = time()
            while True:
                try:
                    self.db.users.find_one({"username": "admin"})
                    return
                except errors.ConnectionFailure as e:
                    if time() - now >= self.conn_initial_timout:
                        raise  # give up; reported as DbException by the outer handler
                    self.logger.info("Waiting to database up {}".format(e))
                    # NOTE(review): the pause between attempts is not visible in the reviewed text; 2 seconds assumed
                    sleep(2)
        except errors.PyMongoError as e:
            raise DbException(str(e)) from e

    def db_disconnect(self):
        """
        Disconnect from the database. Not implemented: nothing is released here.
        """
        pass  # TODO

    @staticmethod
    def _format_filter(q_filter):
        """
        Translate query string filter into mongo database filter
        :param q_filter: Query string content, as a dictionary. Follows SOL005 section 4.3.2 guidelines, with the
            following extensions and differences: It accepts ".ne" (not equal) in addition to ".neq".
            A key without operator suffix means equality. A list as value means any of ("eq", "cont") or none of
            ("ne", "neq", "ncont") its items. For matching inside the items of an array use the special word
            ".ANYINDEX." in the key; it is translated into "$elemMatch".
        :return: database mongo filter. An empty dictionary if q_filter is empty or None
        :raises DbException: with http_code BAD_REQUEST if the filter cannot be translated
        """
        if not q_filter:
            return {}
        suffixes = ("eq", "ne", "gt", "gte", "lt", "lte", "cont", "ncont", "neq")
        # bound before the 'try' so that the error message below can always be composed
        query_k = v = None
        try:
            db_filter = {}
            for query_k, query_v in q_filter.items():
                v = query_v
                dot_index = query_k.rfind(".")
                suffix = query_k[dot_index + 1:]
                # 'dot_index > 0': at least one character of field name before the dot, so that "a.gt" is parsed
                if dot_index > 0 and suffix in suffixes:
                    operator = "$" + suffix
                    if operator == "$neq":
                        operator = "$ne"
                    k = query_k[:dot_index]
                else:
                    operator = "$eq"
                    k = query_k

                if isinstance(v, list):
                    if operator in ("$eq", "$cont"):
                        operator = "$in"
                    elif operator in ("$ne", "$ncont"):
                        operator = "$nin"
                    else:
                        # ordering operators take a single value: collapse the list into a comma separated text
                        v = ",".join(str(item) for item in v)

                if operator in ("$eq", "$cont"):
                    # v cannot be a comma separated list, because operator would have been changed to $in
                    db_v = v
                elif operator == "$ncont":
                    # v cannot be a comma separated list, because operator would have been changed to $nin
                    db_v = {"$ne": v}
                else:
                    db_v = {operator: v}

                # process the ANYINDEX word at k, from right to left, nesting one $elemMatch per occurrence
                kleft, _, kright = k.rpartition(".ANYINDEX.")
                while kleft:
                    k = kleft
                    db_v = {"$elemMatch": {kright: db_v}}
                    kleft, _, kright = k.rpartition(".ANYINDEX.")

                # insert in db_filter
                # maybe db_filter[k] exist. e.g. in the query string for values between 5 and 8: "a.gt=5&a.lt=8"
                deep_update(db_filter, {k: db_v})

            return db_filter
        except Exception as e:
            raise DbException("Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e),
                              http_code=HTTPStatus.BAD_REQUEST) from e

    @staticmethod
    def _not_found(table, shown_filter):
        """
        Compose, without raising it, the NOT_FOUND DbException shared by the single-entry operations.
        :param table: collection name; its last character is dropped in the message (e.g. "users" -> "user")
        :param shown_filter: filter to show in the message
        :return: the DbException to be raised
        """
        return DbException("Not found any {} with filter='{}'".format(table[:-1], shown_filter),
                           HTTPStatus.NOT_FOUND)

    def get_list(self, table, filter=None):
        """
        Obtain all the entries that match the filter.
        :param table: collection name
        :param filter: query string filter, see '_format_filter'. Empty or None for all the entries
        :return: list with the entries found; it can be empty
        :raises DbException: on invalid filter or database error
        """
        try:
            collection = self.db[table]
            db_filter = self._format_filter(filter)
            return list(collection.find(db_filter))
        except DbException:
            raise  # keep the original http_code
        except Exception as e:  # TODO refine
            raise DbException(str(e)) from e

    def get_one(self, table, filter=None, fail_on_empty=True, fail_on_more=True):
        """
        Obtain one entry matching the filter.
        :param table: collection name
        :param filter: query string filter, see '_format_filter'. Empty or None matches any entry
        :param fail_on_empty: if nothing matches, raise DbException (NOT_FOUND) when True; return None when False
        :param fail_on_more: if several entries match, raise DbException when True; return one of them when False
        :return: the entry found, or None
        :raises DbException: on the conditions above, on invalid filter or on database error
        """
        try:
            db_filter = self._format_filter(filter)
            collection = self.db[table]
            # two entries are enough to tell apart "none", "one" and "several" with a single query
            found = list(collection.find(db_filter).limit(2))
            if not found:
                if fail_on_empty:
                    raise self._not_found(table, db_filter)
                return None
            if len(found) > 1 and fail_on_more:
                # NOTE(review): the status code of this error is not visible in the reviewed text; CONFLICT assumed
                raise DbException("Found more than one {} with filter='{}'".format(table[:-1], db_filter),
                                  HTTPStatus.CONFLICT)
            return found[0]
        except DbException:
            raise  # keep the original http_code
        except Exception as e:  # TODO refine
            raise DbException(str(e)) from e

    def del_list(self, table, filter=None):
        """
        Delete all the entries that match the filter.
        :param table: collection name
        :param filter: query string filter, see '_format_filter'. Empty or None matches all the entries
        :return: dictionary {"deleted": number of entries deleted}
        :raises DbException: on invalid filter or database error
        """
        try:
            collection = self.db[table]
            rows = collection.delete_many(self._format_filter(filter))
            return {"deleted": rows.deleted_count}
        except DbException:
            raise  # keep the original http_code
        except Exception as e:  # TODO refine
            raise DbException(str(e)) from e

    def del_one(self, table, filter=None, fail_on_empty=True):
        """
        Delete one entry that matches the filter.
        :param table: collection name
        :param filter: query string filter, see '_format_filter'
        :param fail_on_empty: if nothing is deleted, raise DbException (NOT_FOUND) when True; return None when False
        :return: dictionary {"deleted": number of entries deleted}, or None
        :raises DbException: on the condition above, on invalid filter or on database error
        """
        try:
            collection = self.db[table]
            rows = collection.delete_one(self._format_filter(filter))
            if rows.deleted_count == 0:
                if fail_on_empty:
                    raise self._not_found(table, filter)
                return None
            return {"deleted": rows.deleted_count}
        except DbException:
            raise  # keep the original http_code
        except Exception as e:  # TODO refine
            raise DbException(str(e)) from e

    def create(self, table, indata):
        """
        Insert a new entry.
        :param table: collection name
        :param indata: content to be inserted
        :return: the 'inserted_id' reported by the database
        :raises DbException: on database error
        """
        try:
            collection = self.db[table]
            data = collection.insert_one(indata)
            return data.inserted_id
        except Exception as e:  # TODO refine
            raise DbException(str(e)) from e

    def set_one(self, table, filter, update_dict, fail_on_empty=True):
        """
        Modify ("$set") fields of one entry that matches the filter.
        :param table: collection name
        :param filter: query string filter, see '_format_filter'
        :param update_dict: dictionary with the fields to set and their new values
        :param fail_on_empty: if nothing matches, raise DbException (NOT_FOUND) when True; return None when False
        :return: dictionary {"modified": number of entries modified}, or None
        :raises DbException: on the condition above, on invalid filter or on database error
        """
        try:
            collection = self.db[table]
            rows = collection.update_one(self._format_filter(filter), {"$set": update_dict})
            if rows.matched_count == 0:
                if fail_on_empty:
                    raise self._not_found(table, filter)
                return None
            return {"modified": rows.modified_count}
        except DbException:
            raise  # keep the original http_code
        except Exception as e:  # TODO refine
            raise DbException(str(e)) from e

    def replace(self, table, id, indata, fail_on_empty=True):
        """
        Replace the whole content of the entry with the given "_id".
        :param table: collection name
        :param id: value of the "_id" field of the entry to replace
        :param indata: new content of the entry
        :param fail_on_empty: if nothing matches, raise DbException (NOT_FOUND) when True; return None when False
        :return: dictionary {"replaced": number of entries modified}, or None
        :raises DbException: on the condition above or on database error
        """
        try:
            _filter = {"_id": id}
            collection = self.db[table]
            rows = collection.replace_one(_filter, indata)
            if rows.matched_count == 0:
                if fail_on_empty:
                    raise self._not_found(table, _filter)
                return None
            return {"replaced": rows.modified_count}
        except DbException:
            raise  # keep the original http_code
        except Exception as e:  # TODO refine
            raise DbException(str(e)) from e