3 from pymongo
import MongoClient
, errors
4 from dbbase
import DbException
, DbBase
5 from http
import HTTPStatus
6 from time
import time
, sleep
8 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
# TODO: consider using this decorator for database access retries
12 # def retry_mongocall(call):
13 # def _retry_mongocall(*args, **kwargs):
17 # return call(*args, **kwargs)
18 # except pymongo.AutoReconnect as e:
20 # raise DbException(str(e))
22 # return _retry_mongocall
24 class DbMongo(DbBase
):
25 conn_initial_timout
= 120
def __init__(self, logger_name='db'):
    """Create the driver object and attach its logger.

    No connection is opened here; only ``self.logger`` is set.

    :param logger_name: name passed to ``logging.getLogger``; defaults to 'db'.
    """
    driver_logger = logging.getLogger(logger_name)
    self.logger = driver_logger
def db_connect(self, config):
    """Create the MongoDB client and wait until the server answers a query.

    Keys read from *config*: "host", "port" and "name" (the database name),
    plus the optional "logger_name" (replaces ``self.logger``) and "loglevel"
    (name of a ``logging`` level applied to the logger).

    After building the client, a probe query on the ``users`` collection is
    retried while it fails with ``ConnectionFailure``, for up to
    ``self.conn_initial_timout`` seconds.

    :param config: dict-like connection settings (see above).
    :raises DbException: wrapping any pymongo error, including a connection
        failure that persists past the initial timeout.
    """
    try:
        if "logger_name" in config:
            self.logger = logging.getLogger(config["logger_name"])
        self.client = MongoClient(config["host"], config["port"])
        self.db = self.client[config["name"]]
        if "loglevel" in config:
            self.logger.setLevel(getattr(logging, config['loglevel']))
        # get data to try a connection
        now = time()
        while True:
            try:
                self.db.users.find_one({"username": "admin"})
            except errors.ConnectionFailure as e:
                waited = time() - now
                if waited >= self.conn_initial_timout:
                    # out of patience: let the outer handler wrap the failure
                    raise
                self.logger.info("Waiting to database up {}".format(e))
                # NOTE(review): the pause between attempts is assumed to be
                # 2 seconds -- confirm against the intended retry interval.
                sleep(2)
            else:
                return
    except errors.PyMongoError as e:
        raise DbException(str(e))
53 def db_disconnect(self
):
@staticmethod
def _format_filter(filter):
    """Translate a query-string style filter into a MongoDB filter document.

    A key may end in ".<op>" where <op> is one of eq, ne, neq, gt, gte, lt,
    lte, cont, ncont; any other key means equality on that key. Rules:

    * eq / cont with a scalar value -> ``{key: value}``
    * ncont with a scalar value     -> ``{key: {"$ne": value}}``
    * eq / cont with a list value   -> ``{key: {"$in": value}}``
    * ne / neq / ncont with a list  -> ``{key: {"$nin": value}}``
    * anything else                 -> ``{key: {"$<op>": value}}``; several
      operators on the same key are merged into one sub-document.

    :param filter: dict of query-string keys to values.
    :return: dict usable as a pymongo filter.
    :raises DbException: with HTTPStatus.BAD_REQUEST if the filter cannot be
        translated.
    """
    suffixes = ("eq", "ne", "gt", "gte", "lt", "lte", "cont", "ncont", "neq")
    # Bound up-front so the error message below can always be formatted, even
    # when the failure happens before the first loop iteration.
    query_k = v = None
    try:
        db_filter = {}
        for query_k, query_v in filter.items():
            v = query_v
            dot_index = query_k.rfind(".")
            # "> 0" (not "> 1") so that one-letter field names such as "a.gt"
            # are recognised, while still requiring a non-empty field name.
            if dot_index > 0 and query_k[dot_index+1:] in suffixes:
                operator = "$" + query_k[dot_index+1:]
                if operator == "$neq":
                    operator = "$ne"
                k = query_k[:dot_index]
            else:
                operator = "$eq"
                k = query_k

            if isinstance(v, list):
                if operator in ("$eq", "$cont"):
                    operator = "$in"
                elif operator in ("$ne", "$ncont"):
                    operator = "$nin"
                # NOTE(review): list values combined with the remaining
                # operators (gt, lt, ...) are passed through unchanged here;
                # confirm the intended handling.

            if operator in ("$eq", "$cont"):
                # v cannot be a list here, because operator would have been changed to $in
                db_filter[k] = v
            elif operator == "$ncont":
                # v cannot be a list here, because operator would have been changed to $nin
                db_filter[k] = {"$ne": v}
            else:
                # db_filter[k] may already exist, e.g. in the query string for
                # values between 5 and 8: "a.gt=5&a.lt=8"
                if k not in db_filter:
                    db_filter[k] = {}
                db_filter[k][operator] = v

        return db_filter
    except Exception as e:
        raise DbException("Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e),
                          http_code=HTTPStatus.BAD_REQUEST)
def get_list(self, table, filter={}):
    """Return all documents of *table* that match *filter*, as a list.

    :param table: collection name, used as ``self.db[table]``.
    :param filter: query-string style filter, translated by
        ``self._format_filter`` before querying.
    :return: list with every matching document (empty if none match).
    :raises DbException: unchanged when the filter is rejected; wrapping the
        original error text for any other failure.
    """
    try:
        collection = self.db[table]
        rows = collection.find(self._format_filter(filter))
        return list(rows)
    except DbException:
        # keep the http_code set by whoever raised it (e.g. BAD_REQUEST)
        raise
    except Exception as e:  # TODO refine
        raise DbException(str(e))
def get_one(self, table, filter={}, fail_on_empty=True, fail_on_more=True):
    """Return a single document of *table* matching *filter*.

    When either flag is false the lookup is a plain ``find_one``: the first
    match, or None, is returned and no check is made. Only when both flags
    are true are the "no match" and "several matches" cases reported.

    :param table: collection name, used as ``self.db[table]``.
    :param filter: query-string style filter, translated by
        ``self._format_filter``.
    :param fail_on_empty: see above.
    :param fail_on_more: see above.
    :return: the matching document, or None on the ``find_one`` path.
    :raises DbException: NOT_FOUND when nothing matches, CONFLICT when more
        than one document matches (both flags true); wrapping the original
        error text for any other failure.
    """
    from itertools import islice

    try:
        filter = self._format_filter(filter)
        collection = self.db[table]
        if not (fail_on_empty and fail_on_more):
            return collection.find_one(filter)
        # Two documents are enough to tell "none", "one" and "several" apart,
        # and this avoids Cursor.count(), which recent PyMongo no longer has.
        found = list(islice(collection.find(filter), 2))
        if not found:
            raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND)
        if len(found) > 1:
            # NOTE(review): CONFLICT is assumed as the status for this case -- confirm.
            raise DbException("Found more than one entry with filter='{}'".format(filter),
                              HTTPStatus.CONFLICT)
        return found[0]
    except DbException:
        # keep the specific http_code instead of re-wrapping it below
        raise
    except Exception as e:  # TODO refine
        raise DbException(str(e))
def del_list(self, table, filter={}):
    """Delete every document of *table* that matches *filter*.

    :param table: collection name, used as ``self.db[table]``.
    :param filter: query-string style filter, translated by
        ``self._format_filter``.
    :return: ``{"deleted": <number of documents removed>}``.
    :raises DbException: unchanged when the filter is rejected; wrapping the
        original error text for any other failure.
    """
    try:
        collection = self.db[table]
        outcome = collection.delete_many(self._format_filter(filter))
        return {"deleted": outcome.deleted_count}
    except DbException:
        # keep the http_code set by whoever raised it (e.g. BAD_REQUEST)
        raise
    except Exception as e:  # TODO refine
        raise DbException(str(e))
def del_one(self, table, filter={}, fail_on_empty=True):
    """Delete one document of *table* that matches *filter*.

    :param table: collection name, used as ``self.db[table]``.
    :param filter: query-string style filter, translated by
        ``self._format_filter``.
    :param fail_on_empty: when true, deleting nothing is an error; when false
        None is returned instead.
    :return: ``{"deleted": <count>}``, or None if nothing was deleted and
        *fail_on_empty* is false.
    :raises DbException: NOT_FOUND when nothing was deleted and
        *fail_on_empty* is true; wrapping the original error text for any
        other failure.
    """
    try:
        collection = self.db[table]
        outcome = collection.delete_one(self._format_filter(filter))
        if outcome.deleted_count == 0:
            if fail_on_empty:
                raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND)
            return None
        return {"deleted": outcome.deleted_count}
    except DbException:
        # without this the NOT_FOUND above would be re-wrapped and lose its status
        raise
    except Exception as e:  # TODO refine
        raise DbException(str(e))
def create(self, table, indata):
    """Insert *indata* as a new document of *table*.

    :param table: collection name, used as ``self.db[table]``.
    :param indata: document handed to ``insert_one``.
    :return: the ``inserted_id`` reported by the insert.
    :raises DbException: wrapping the text of any error raised meanwhile.
    """
    try:
        return self.db[table].insert_one(indata).inserted_id
    except Exception as e:  # TODO refine
        raise DbException(str(e))
def set_one(self, table, filter, update_dict, fail_on_empty=True):
    """Apply ``{"$set": update_dict}`` to one document of *table* matching *filter*.

    :param table: collection name, used as ``self.db[table]``.
    :param filter: query-string style filter, translated by
        ``self._format_filter``.
    :param update_dict: fields to set on the matched document.
    :param fail_on_empty: when true, matching nothing is an error; when false
        None is returned instead.
    :return: ``{"modified": <modified_count>}``, or None if nothing matched
        and *fail_on_empty* is false.
    :raises DbException: NOT_FOUND when nothing matched and *fail_on_empty*
        is true; wrapping the original error text for any other failure.
    """
    try:
        collection = self.db[table]
        outcome = collection.update_one(self._format_filter(filter), {"$set": update_dict})
        # An update result exposes matched_count / modified_count; a matched
        # document whose values were already equal counts as matched, not modified.
        if outcome.matched_count == 0:
            if fail_on_empty:
                raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND)
            return None
        return {"modified": outcome.modified_count}
    except DbException:
        # without this the NOT_FOUND above would be re-wrapped and lose its status
        raise
    except Exception as e:  # TODO refine
        raise DbException(str(e))
def replace(self, table, id, indata, fail_on_empty=True):
    """Replace the document of *table* whose ``_id`` equals *id* with *indata*.

    :param table: collection name, used as ``self.db[table]``.
    :param id: value matched against the ``_id`` field.
    :param indata: replacement document handed to ``replace_one``.
    :param fail_on_empty: when true, matching nothing is an error; when false
        None is returned instead.
    :return: ``{"replace": <modified_count>}``, or None if nothing matched
        and *fail_on_empty* is false.
    :raises DbException: NOT_FOUND when nothing matched and *fail_on_empty*
        is true; wrapping the original error text for any other failure.
    """
    try:
        collection = self.db[table]
        id_filter = {"_id": id}
        outcome = collection.replace_one(id_filter, indata)
        # Test matched_count: replacing a document with identical content
        # matches it but leaves modified_count at 0.
        if outcome.matched_count == 0:
            if fail_on_empty:
                # report the filter actually used, not the builtin ``filter``
                raise DbException("Not found entry with filter='{}'".format(id_filter), HTTPStatus.NOT_FOUND)
            return None
        return {"replace": outcome.modified_count}
    except DbException:
        # without this the NOT_FOUND above would be re-wrapped and lose its status
        raise
    except Exception as e:  # TODO refine
        raise DbException(str(e))