blob: 1d56e5860387347d0e0a47f1cc00cec11ae3625e [file] [log] [blame]
import logging
from pymongo import MongoClient, errors
from osm_common.dbbase import DbException, DbBase
from http import HTTPStatus
from time import time, sleep
from copy import deepcopy
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
# TODO consider use this decorator for database access retries
# @retry_mongocall
# def retry_mongocall(call):
# def _retry_mongocall(*args, **kwargs):
# retry = 1
# while True:
# try:
# return call(*args, **kwargs)
# except pymongo.AutoReconnect as e:
# if retry == 4:
# raise DbException(str(e))
# sleep(retry)
# return _retry_mongocall
def deep_update(to_update, update_with):
"""
Update 'to_update' dict with the content 'update_with' dict recursively
:param to_update: must be a dictionary to be modified
:param update_with: must be a dictionary. It is not changed
:return: to_update
"""
for key in update_with:
if key in to_update:
if isinstance(to_update[key], dict) and isinstance(update_with[key], dict):
deep_update(to_update[key], update_with[key])
continue
to_update[key] = deepcopy(update_with[key])
return to_update
class DbMongo(DbBase):
conn_initial_timout = 120
conn_timout = 10
def __init__(self, logger_name='db'):
self.logger = logging.getLogger(logger_name)
def db_connect(self, config):
try:
if "logger_name" in config:
self.logger = logging.getLogger(config["logger_name"])
self.client = MongoClient(config["host"], config["port"])
self.db = self.client[config["name"]]
if "loglevel" in config:
self.logger.setLevel(getattr(logging, config['loglevel']))
# get data to try a connection
now = time()
while True:
try:
self.db.users.find_one({"username": "admin"})
return
except errors.ConnectionFailure as e:
if time() - now >= self.conn_initial_timout:
raise
self.logger.info("Waiting to database up {}".format(e))
sleep(2)
except errors.PyMongoError as e:
raise DbException(str(e))
def db_disconnect(self):
pass # TODO
@staticmethod
def _format_filter(q_filter):
"""
Translate query string filter into mongo database filter
:param q_filter: Query string content. Follows SOL005 section 4.3.2 guidelines, with the follow extensions and
differences:
It accept ".nq" (not equal) in addition to ".neq".
For arrays you can specify index (concrete index must match), nothing (any index may match) or 'ANYINDEX'
(two or more matches applies for the same array element). Examples:
with database register: {A: [{B: 1, C: 2}, {B: 6, C: 9}]}
query 'A.B=6' matches because array A contains one element with B equal to 6
query 'A.0.B=6' does no match because index 0 of array A contains B with value 1, but not 6
query 'A.B=6&A.C=2' matches because one element of array matches B=6 and other matchesC=2
query 'A.ANYINDEX.B=6&A.ANYINDEX.C=2' does not match because it is needed the same element of the
array matching both
Examples of translations from SOL005 to >> mongo # comment
A=B; A.eq=B >> A: B # must contain key A and equal to B or be a list that contains B
A.cont=B >> A: B
A=B&A=C; A=B,C >> A: {$in: [B, C]} # must contain key A and equal to B or C or be a list that contains
# B or C
A.cont=B&A.cont=C; A.cont=B,C >> A: {$in: [B, C]}
A.ncont=B >> A: {$nin: B} # must not contain key A or if present not equal to B or if a list,
# it must not not contain B
A.ncont=B,C; A.ncont=B&A.ncont=C >> A: {$nin: [B,C]} # must not contain key A or if present not equal
# neither B nor C; or if a list, it must not contain neither B nor C
A.ne=B&A.ne=C; A.ne=B,C >> A: {$nin: [B, C]}
A.gt=B >> A: {$gt: B} # must contain key A and greater than B
A.ne=B; A.neq=B >> A: {$ne: B} # must not contain key A or if present not equal to B, or if
# an array not contain B
A.ANYINDEX.B=C >> A: {$elemMatch: {B=C}
:return: database mongo filter
"""
try:
db_filter = {}
for query_k, query_v in q_filter.items():
dot_index = query_k.rfind(".")
if dot_index > 1 and query_k[dot_index+1:] in ("eq", "ne", "gt", "gte", "lt", "lte", "cont",
"ncont", "neq"):
operator = "$" + query_k[dot_index + 1:]
if operator == "$neq":
operator = "$ne"
k = query_k[:dot_index]
else:
operator = "$eq"
k = query_k
v = query_v
if isinstance(v, list):
if operator in ("$eq", "$cont"):
operator = "$in"
v = query_v
elif operator in ("$ne", "$ncont"):
operator = "$nin"
v = query_v
else:
v = query_v.join(",")
if operator in ("$eq", "$cont"):
# v cannot be a comma separated list, because operator would have been changed to $in
db_v = v
elif operator == "$ncount":
# v cannot be a comma separated list, because operator would have been changed to $nin
db_v = {"$ne": v}
else:
db_v = {operator: v}
# process the ANYINDEX word at k.
kleft, _, kright = k.rpartition(".ANYINDEX.")
while kleft:
k = kleft
db_v = {"$elemMatch": {kright: db_v}}
kleft, _, kright = k.rpartition(".ANYINDEX.")
# insert in db_filter
# maybe db_filter[k] exist. e.g. in the query string for values between 5 and 8: "a.gt=5&a.lt=8"
deep_update(db_filter, {k: db_v})
return db_filter
except Exception as e:
raise DbException("Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e),
http_code=HTTPStatus.BAD_REQUEST)
def get_list(self, table, filter={}):
try:
result = []
collection = self.db[table]
db_filter = self._format_filter(filter)
rows = collection.find(db_filter)
for row in rows:
result.append(row)
return result
except DbException:
raise
except Exception as e: # TODO refine
raise DbException(str(e))
def get_one(self, table, filter={}, fail_on_empty=True, fail_on_more=True):
try:
if filter:
filter = self._format_filter(filter)
collection = self.db[table]
if not (fail_on_empty and fail_on_more):
return collection.find_one(filter)
rows = collection.find(filter)
if rows.count() == 0:
if fail_on_empty:
raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter),
HTTPStatus.NOT_FOUND)
return None
elif rows.count() > 1:
if fail_on_more:
raise DbException("Found more than one {} with filter='{}'".format(table[:-1], filter),
HTTPStatus.CONFLICT)
return rows[0]
except Exception as e: # TODO refine
raise DbException(str(e))
def del_list(self, table, filter={}):
try:
collection = self.db[table]
rows = collection.delete_many(self._format_filter(filter))
return {"deleted": rows.deleted_count}
except DbException:
raise
except Exception as e: # TODO refine
raise DbException(str(e))
def del_one(self, table, filter={}, fail_on_empty=True):
try:
collection = self.db[table]
rows = collection.delete_one(self._format_filter(filter))
if rows.deleted_count == 0:
if fail_on_empty:
raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter),
HTTPStatus.NOT_FOUND)
return None
return {"deleted": rows.deleted_count}
except Exception as e: # TODO refine
raise DbException(str(e))
def create(self, table, indata):
try:
collection = self.db[table]
data = collection.insert_one(indata)
return data.inserted_id
except Exception as e: # TODO refine
raise DbException(str(e))
def set_one(self, table, filter, update_dict, fail_on_empty=True):
try:
collection = self.db[table]
rows = collection.update_one(self._format_filter(filter), {"$set": update_dict})
if rows.matched_count == 0:
if fail_on_empty:
raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter),
HTTPStatus.NOT_FOUND)
return None
return {"modified": rows.modified_count}
except Exception as e: # TODO refine
raise DbException(str(e))
def replace(self, table, id, indata, fail_on_empty=True):
try:
_filter = {"_id": id}
collection = self.db[table]
rows = collection.replace_one(_filter, indata)
if rows.matched_count == 0:
if fail_on_empty:
raise DbException("Not found any {} with filter='{}'".format(table[:-1], _filter),
HTTPStatus.NOT_FOUND)
return None
return {"replaced": rows.modified_count}
except Exception as e: # TODO refine
raise DbException(str(e))