41409d2dd27f323ff62a0d4f2c7a8e6faf8c61f4
[osm/common.git] / osm_common / dbmongo.py
1
2 import logging
3 from pymongo import MongoClient, errors
4 from osm_common.dbbase import DbException, DbBase
5 from http import HTTPStatus
6 from time import time, sleep
7 from copy import deepcopy
8
9 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
10
# TODO consider using this decorator for database access retries
# @retry_mongocall
# def retry_mongocall(call):
#     def _retry_mongocall(*args, **kwargs):
#         retry = 1
#         while True:
#             try:
#                 return call(*args, **kwargs)
#             except pymongo.AutoReconnect as e:
#                 if retry == 4:
#                     raise DbException(str(e))
#                 sleep(retry)
#     return _retry_mongocall
24
25
def deep_update(to_update, update_with):
    """
    Merge the content of 'update_with' into 'to_update', descending into nested dictionaries
    :param to_update: dictionary that is modified in place
    :param update_with: dictionary with the new content. It is left untouched; its values are deep-copied
    :return: the same 'to_update' object, once merged
    """
    for name in update_with:
        incoming = update_with[name]
        both_are_dicts = (
            name in to_update
            and isinstance(to_update[name], dict)
            and isinstance(incoming, dict)
        )
        if both_are_dicts:
            # dictionaries present at both sides are merged key by key instead of being replaced
            deep_update(to_update[name], incoming)
        else:
            # any other case (new key, or a non-dictionary at either side) overwrites with an independent copy
            to_update[name] = deepcopy(incoming)
    return to_update
40
41
class DbMongo(DbBase):
    """
    DbBase implementation that stores the data in a mongo database, accessed through pymongo.
    Every public method reports its failures by raising DbException.
    """
    # maximum time, in seconds, that db_connect keeps retrying the initial query before giving up
    conn_initial_timout = 120
    # not referenced by any method of this class
    # NOTE(review): presumably a per-operation timeout in seconds -- confirm before relying on it
    conn_timout = 10

    def __init__(self, logger_name='db'):
        """
        :param logger_name: name of the logger to use. It can be overridden later by db_connect
        """
        self.logger = logging.getLogger(logger_name)

    def db_connect(self, config):
        """
        Create the mongo client and wait until the database answers a query
        :param config: dictionary with the mandatory keys "host", "port" and "name" (database name), and the
            optional keys "logger_name" and "loglevel" (name of a 'logging' level attribute)
        :return: None. Raises DbException on any pymongo error, including the database being still unreachable
            after 'conn_initial_timout' seconds
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            self.client = MongoClient(config["host"], config["port"])
            self.db = self.client[config["name"]]
            if "loglevel" in config:
                self.logger.setLevel(getattr(logging, config['loglevel']))
            # get data to try a connection
            now = time()
            while True:
                try:
                    self.db.users.find_one({"username": "admin"})
                    return
                except errors.ConnectionFailure as e:
                    if time() - now >= self.conn_initial_timout:
                        # re-raised here, converted into DbException by the outer handler
                        raise
                    self.logger.info("Waiting to database up {}".format(e))
                    sleep(2)
        except errors.PyMongoError as e:
            raise DbException(str(e))

    def db_disconnect(self):
        """
        Placeholder, it does nothing yet. The client created by db_connect is not closed
        """
        pass  # TODO

    @staticmethod
    def _format_filter(q_filter):
        """
        Translate query string filter into mongo database filter
        :param q_filter: Query string content, as a dictionary. Follows SOL005 section 4.3.2 guidelines, with the
            following extensions and differences: It accepts ".ne" (not equal) in addition to ".neq".
            For arrays it gets an item with exact matching. For partial matching, use the special word ".ANYINDEX.".
            None or an empty dictionary produce an empty filter.
        :return: database mongo filter. Raises DbException (BAD_REQUEST) if the filter cannot be translated
        """
        if not q_filter:
            return {}
        operator_suffixes = ("eq", "ne", "gt", "gte", "lt", "lte", "cont", "ncont", "neq")
        # bound beforehand, so that the error message can be composed wherever the failure happens
        query_k = v = None
        try:
            db_filter = {}
            for query_k, query_v in q_filter.items():
                dot_index = query_k.rfind(".")
                # "> 0": any non-empty field name can carry an operator suffix, also one-character names ("a.gt")
                if dot_index > 0 and query_k[dot_index+1:] in operator_suffixes:
                    operator = "$" + query_k[dot_index+1:]
                    if operator == "$neq":
                        operator = "$ne"
                    k = query_k[:dot_index]
                else:
                    operator = "$eq"
                    k = query_k

                v = query_v
                if isinstance(v, list):
                    if operator in ("$eq", "$cont"):
                        operator = "$in"
                    elif operator in ("$ne", "$ncont"):
                        operator = "$nin"
                    else:
                        # the remaining operators take a single value: use the comma separated text
                        v = ",".join(str(item) for item in query_v)

                if operator in ("$eq", "$cont"):
                    # v cannot be a comma separated list, because operator would have been changed to $in
                    db_v = v
                elif operator == "$ncont":
                    # v cannot be a comma separated list, because operator would have been changed to $nin
                    db_v = {"$ne": v}
                else:
                    db_v = {operator: v}

                # process the ANYINDEX word at k, from right to left, nesting one $elemMatch per occurrence
                kleft, _, kright = k.rpartition(".ANYINDEX.")
                while kleft:
                    k = kleft
                    db_v = {"$elemMatch": {kright: db_v}}
                    kleft, _, kright = k.rpartition(".ANYINDEX.")

                # insert in db_filter
                # maybe db_filter[k] exist. e.g. in the query string for values between 5 and 8: "a.gt=5&a.lt=8"
                deep_update(db_filter, {k: db_v})

            return db_filter
        except Exception as e:
            raise DbException("Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e),
                              http_code=HTTPStatus.BAD_REQUEST)

    def get_list(self, table, filter={}):
        """
        Obtain all the entries of a table that match a filter
        :param table: collection name
        :param filter: query string filter, as a dictionary (see _format_filter). It is not modified
        :return: list with the matching entries; empty list if there is none. Raises DbException on failure
        """
        try:
            collection = self.db[table]
            db_filter = self._format_filter(filter)
            return list(collection.find(db_filter))
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(str(e))

    def get_one(self, table, filter={}, fail_on_empty=True, fail_on_more=True):
        """
        Obtain one entry of a table that matches a filter
        :param table: collection name
        :param filter: query string filter, as a dictionary (see _format_filter). It is not modified
        :param fail_on_empty: if True, raise DbException (NOT_FOUND) when nothing matches; if False, return None
        :param fail_on_more: if True, raise DbException (CONFLICT) when more than one entry matches; if False,
            return the first one
        :return: the entry found, or None. Raises DbException on failure
        """
        try:
            db_filter = self._format_filter(filter)
            collection = self.db[table]
            # two entries are enough to tell apart "none", "exactly one" and "more than one" with a single query
            rows = list(collection.find(db_filter).limit(2))
            if not rows:
                if fail_on_empty:
                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], db_filter),
                                      HTTPStatus.NOT_FOUND)
                return None
            if len(rows) > 1 and fail_on_more:
                raise DbException("Found more than one {} with filter='{}'".format(table[:-1], db_filter),
                                  HTTPStatus.CONFLICT)
            return rows[0]
        except DbException:
            # keep the http code of the exceptions raised above
            raise
        except Exception as e:  # TODO refine
            raise DbException(str(e))

    def del_list(self, table, filter={}):
        """
        Delete all the entries of a table that match a filter
        :param table: collection name
        :param filter: query string filter, as a dictionary (see _format_filter). It is not modified
        :return: dictionary {"deleted": number of deleted entries}. Raises DbException on failure
        """
        try:
            collection = self.db[table]
            rows = collection.delete_many(self._format_filter(filter))
            return {"deleted": rows.deleted_count}
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(str(e))

    def del_one(self, table, filter={}, fail_on_empty=True):
        """
        Delete one entry of a table that matches a filter
        :param table: collection name
        :param filter: query string filter, as a dictionary (see _format_filter). It is not modified
        :param fail_on_empty: if True, raise DbException (NOT_FOUND) when nothing is deleted; if False, return None
        :return: dictionary {"deleted": number of deleted entries}, or None. Raises DbException on failure
        """
        try:
            collection = self.db[table]
            rows = collection.delete_one(self._format_filter(filter))
            if rows.deleted_count == 0:
                if fail_on_empty:
                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter),
                                      HTTPStatus.NOT_FOUND)
                return None
            return {"deleted": rows.deleted_count}
        except DbException:
            # keep the http code of the exception raised above
            raise
        except Exception as e:  # TODO refine
            raise DbException(str(e))

    def create(self, table, indata):
        """
        Insert a new entry in a table
        :param table: collection name
        :param indata: content to insert
        :return: the 'inserted_id' reported by the database. Raises DbException on failure
        """
        try:
            collection = self.db[table]
            data = collection.insert_one(indata)
            return data.inserted_id
        except Exception as e:  # TODO refine
            raise DbException(str(e))

    def set_one(self, table, filter, update_dict, fail_on_empty=True):
        """
        Modify ("$set") some fields of one entry of a table that matches a filter
        :param table: collection name
        :param filter: query string filter, as a dictionary (see _format_filter). It is not modified
        :param update_dict: dictionary with the fields to set and their new values
        :param fail_on_empty: if True, raise DbException (NOT_FOUND) when nothing matches; if False, return None
        :return: dictionary {"modified": number of modified entries}, or None. Raises DbException on failure
        """
        try:
            collection = self.db[table]
            rows = collection.update_one(self._format_filter(filter), {"$set": update_dict})
            if rows.matched_count == 0:
                if fail_on_empty:
                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter),
                                      HTTPStatus.NOT_FOUND)
                return None
            return {"modified": rows.modified_count}
        except DbException:
            # keep the http code of the exception raised above
            raise
        except Exception as e:  # TODO refine
            raise DbException(str(e))

    def replace(self, table, id, indata, fail_on_empty=True):
        """
        Replace the whole content of the entry of a table with the given "_id"
        :param table: collection name
        :param id: value of the "_id" field of the entry to replace
        :param indata: new content of the entry
        :param fail_on_empty: if True, raise DbException (NOT_FOUND) when nothing matches; if False, return None
        :return: dictionary {"replaced": number of modified entries}, or None. Raises DbException on failure
        """
        try:
            _filter = {"_id": id}
            collection = self.db[table]
            rows = collection.replace_one(_filter, indata)
            if rows.matched_count == 0:
                if fail_on_empty:
                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], _filter),
                                      HTTPStatus.NOT_FOUND)
                return None
            return {"replaced": rows.modified_count}
        except DbException:
            # keep the http code of the exception raised above
            raise
        except Exception as e:  # TODO refine
            raise DbException(str(e))