Initial commit for NBI
[osm/NBI.git] / osm_nbi / dbmongo.py
1 #import pymongo
2 import logging
3 from pymongo import MongoClient, errors
4 from dbbase import DbException, DbBase
5 from http import HTTPStatus
6 from time import time, sleep
7
8 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
9
10 # TODO: consider using this decorator for database access retries
11 # @retry_mongocall
12 # def retry_mongocall(call):
13 # def _retry_mongocall(*args, **kwargs):
14 # retry = 1
15 # while True:
16 # try:
17 # return call(*args, **kwargs)
18 # except pymongo.AutoReconnect as e:
19 # if retry == 4:
20 # raise DbException(str(e))
21 # sleep(retry)
22 # return _retry_mongocall
23
class DbMongo(DbBase):
    """MongoDB (pymongo) implementation of the ``DbBase`` database interface.

    ``self.client`` and ``self.db`` are only set by :meth:`db_connect`; every other
    data-access method expects that call to have succeeded first.
    """

    # Seconds during which db_connect() keeps retrying the first connection.
    # (The "timout" spelling is kept because these attribute names are public.)
    conn_initial_timout = 120
    # Not referenced by any method of this class.
    conn_timout = 10

    def __init__(self, logger_name='db'):
        """Create the driver object; no connection is opened here.

        :param logger_name: name of the ``logging`` logger used by this instance
        """
        self.logger = logging.getLogger(logger_name)

    def db_connect(self, config):
        """Create the MongoDB client and wait until the server answers a query.

        :param config: dict with the keys "host", "port" and "name" (database name), and the
            optional keys "logger_name" and "loglevel" (name of a ``logging`` level)
        :raises DbException: on any pymongo error, including the server still being
            unreachable once ``conn_initial_timout`` seconds have elapsed
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            self.client = MongoClient(config["host"], config["port"])
            self.db = self.client[config["name"]]
            if "loglevel" in config:
                self.logger.setLevel(getattr(logging, config['loglevel']))
            # run a real query to check that the server is reachable
            now = time()
            while True:
                try:
                    self.db.users.find_one({"username": "admin"})
                    return
                except errors.ConnectionFailure as e:
                    if time() - now >= self.conn_initial_timout:
                        # re-raised here, converted to DbException by the outer handler
                        raise
                    self.logger.info("Waiting to database up {}".format(e))
                    sleep(2)
        except errors.PyMongoError as e:
            raise DbException(str(e)) from e

    def db_disconnect(self):
        """Do nothing for now; closing the client is still to be implemented."""
        pass  # TODO

    @staticmethod
    def _format_filter(filter):
        """Translate a query-string style filter into a MongoDB filter document.

        Each key is either a plain field name (equality) or ``<field>.<op>`` where ``<op>`` is
        one of eq, ne, neq, gt, gte, lt, lte, cont, ncont. A list value turns eq/cont into
        ``$in`` and ne/neq/ncont into ``$nin``; for any other operator the list items are
        joined into one comma separated string.

        :param filter: dict of query-string keys and values; None or empty matches everything
        :return: dict to be passed to pymongo as the filter
        :raises DbException: with http_code BAD_REQUEST if the filter cannot be translated
        """
        # bound before the try so that the error message can always be built
        query_k = v = None
        try:
            db_filter = {}
            if not filter:
                return db_filter
            for query_k, query_v in filter.items():
                dot_index = query_k.rfind(".")
                # "> 0" so that one-letter field names such as "a.gt" are recognised too
                if dot_index > 0 and query_k[dot_index+1:] in ("eq", "ne", "gt", "gte", "lt", "lte", "cont",
                                                               "ncont", "neq"):
                    operator = "$" + query_k[dot_index+1:]
                    if operator == "$neq":
                        # "neq" is only an alias; mongo has no such operator
                        operator = "$ne"
                    k = query_k[:dot_index]
                else:
                    operator = "$eq"
                    k = query_k

                v = query_v
                if isinstance(v, list):
                    if operator in ("$eq", "$cont"):
                        operator = "$in"
                    elif operator in ("$ne", "$ncont"):
                        operator = "$nin"
                    else:
                        v = ",".join(str(item) for item in query_v)

                if operator in ("$eq", "$cont"):
                    # v cannot be a list here, because the operator would have been changed to $in
                    db_filter[k] = v
                elif operator == "$ncont":
                    # v cannot be a list here, because the operator would have been changed to $nin
                    db_filter[k] = {"$ne": v}
                else:
                    # maybe db_filter[k] exists, e.g. query string for values between 5 and 8: "a.gt=5&a.lt=8"
                    db_filter.setdefault(k, {})[operator] = v

            return db_filter
        except Exception as e:
            raise DbException("Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e),
                              http_code=HTTPStatus.BAD_REQUEST) from e

    def get_list(self, table, filter=None):
        """Return every entry of a collection that matches the filter.

        :param table: collection name
        :param filter: query-string style filter (see ``_format_filter``); None or empty for all
        :return: list of the matching documents
        :raises DbException: on an invalid filter or any database error
        """
        try:
            collection = self.db[table]
            return list(collection.find(self._format_filter(filter)))
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(str(e)) from e

    def get_one(self, table, filter=None, fail_on_empty=True, fail_on_more=True):
        """Return a single entry of a collection.

        :param table: collection name
        :param filter: query-string style filter (see ``_format_filter``); None or empty for any
        :param fail_on_empty: raise (NOT_FOUND) instead of returning None when nothing matches
        :param fail_on_more: raise (CONFLICT) instead of returning the first entry when more
            than one entry matches
        :return: the document found, or None if nothing matches and fail_on_empty is False
        :raises DbException: as described above, on an invalid filter or any database error
        """
        try:
            db_filter = self._format_filter(filter)
            collection = self.db[table]
            # two documents are enough to tell "none", "exactly one" and "more than one" apart
            rows = list(collection.find(db_filter, limit=2))
            if not rows:
                if fail_on_empty:
                    raise DbException("Not found entry with filter='{}'".format(db_filter), HTTPStatus.NOT_FOUND)
                return None
            if len(rows) > 1 and fail_on_more:
                raise DbException("Found more than one entry with filter='{}'".format(db_filter),
                                  HTTPStatus.CONFLICT)
            return rows[0]
        except DbException:
            # keep the http code of the exceptions raised above
            raise
        except Exception as e:  # TODO refine
            raise DbException(str(e)) from e

    def del_list(self, table, filter=None):
        """Delete every entry of a collection that matches the filter.

        :param table: collection name
        :param filter: query-string style filter (see ``_format_filter``); None or empty for all
        :return: dict {"deleted": <number of deleted entries>}
        :raises DbException: on an invalid filter or any database error
        """
        try:
            collection = self.db[table]
            rows = collection.delete_many(self._format_filter(filter))
            return {"deleted": rows.deleted_count}
        except DbException:
            raise
        except Exception as e:  # TODO refine
            raise DbException(str(e)) from e

    def del_one(self, table, filter=None, fail_on_empty=True):
        """Delete one entry of a collection.

        :param table: collection name
        :param filter: query-string style filter (see ``_format_filter``); None or empty for any
        :param fail_on_empty: raise (NOT_FOUND) instead of returning None when nothing matches
        :return: dict {"deleted": <number of deleted entries>}, or None if nothing was deleted
            and fail_on_empty is False
        :raises DbException: as described above, on an invalid filter or any database error
        """
        if filter is None:
            filter = {}
        try:
            collection = self.db[table]
            rows = collection.delete_one(self._format_filter(filter))
            if rows.deleted_count == 0:
                if fail_on_empty:
                    raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND)
                return None
            return {"deleted": rows.deleted_count}
        except DbException:
            # keep the http code of the exceptions raised above
            raise
        except Exception as e:  # TODO refine
            raise DbException(str(e)) from e

    def create(self, table, indata):
        """Insert a new entry into a collection.

        :param table: collection name
        :param indata: document to insert
        :return: the ``inserted_id`` reported by pymongo for the new document
        :raises DbException: on any database error
        """
        try:
            collection = self.db[table]
            data = collection.insert_one(indata)
            return data.inserted_id
        except Exception as e:  # TODO refine
            raise DbException(str(e)) from e

    def set_one(self, table, filter, update_dict, fail_on_empty=True):
        """Set (``$set``) some fields of one entry of a collection.

        :param table: collection name
        :param filter: query-string style filter (see ``_format_filter``)
        :param update_dict: dict of the fields to set with their new values
        :param fail_on_empty: raise (NOT_FOUND) instead of returning None when nothing matches
        :return: dict {"modified": <number of modified entries>}, or None if nothing matches
            and fail_on_empty is False
        :raises DbException: as described above, on an invalid filter or any database error
        """
        try:
            collection = self.db[table]
            rows = collection.update_one(self._format_filter(filter), {"$set": update_dict})
            # matched_count, because an entry that already holds the new values is not "modified"
            if rows.matched_count == 0:
                if fail_on_empty:
                    raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND)
                return None
            return {"modified": rows.modified_count}
        except DbException:
            # keep the http code of the exceptions raised above
            raise
        except Exception as e:  # TODO refine
            raise DbException(str(e)) from e

    def replace(self, table, id, indata, fail_on_empty=True):
        """Replace the whole content of the entry with the given ``_id``.

        :param table: collection name
        :param id: value of the "_id" field of the entry to replace
        :param indata: new document
        :param fail_on_empty: raise (NOT_FOUND) instead of returning None when nothing matches
        :return: dict {"replace": <number of modified entries>}, or None if nothing matches
            and fail_on_empty is False
        :raises DbException: as described above, or on any database error
        """
        try:
            db_filter = {"_id": id}
            collection = self.db[table]
            rows = collection.replace_one(db_filter, indata)
            # matched_count, because replacing an entry by an identical one is not a "modification"
            if rows.matched_count == 0:
                if fail_on_empty:
                    raise DbException("Not found entry with filter='{}'".format(db_filter), HTTPStatus.NOT_FOUND)
                return None
            return {"replace": rows.modified_count}
        except DbException:
            # keep the http code of the exceptions raised above
            raise
        except Exception as e:  # TODO refine
            raise DbException(str(e)) from e