+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Telefonica S.A.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
import logging
from pymongo import MongoClient, errors
from http import HTTPStatus
from time import time, sleep
from copy import deepcopy
+from base64 import b64decode
+from uuid import uuid4
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
# return call(*args, **kwargs)
# except pymongo.AutoReconnect as e:
# if retry == 4:
-# raise DbException(str(e))
+# raise DbException(e)
# sleep(retry)
# return _retry_mongocall
def deep_update(to_update, update_with):
"""
- Update 'to_update' dict with the content 'update_with' dict recursively
+ Similar to deepcopy but recursively with nested dictionaries. 'to_update' dict is updated with a content copy of
+ 'update_with' dict recursively
:param to_update: must be a dictionary to be modified
:param update_with: must be a dictionary. It is not changed
:return: to_update
conn_initial_timout = 120
conn_timout = 10
- def __init__(self, logger_name='db'):
- self.logger = logging.getLogger(logger_name)
+ def __init__(self, logger_name="db", lock=False):
+ super().__init__(logger_name, lock)
+ self.client = None
+ self.db = None
+ self.database_key = None
+ self.secret_obtained = False
+ # ^ This is used to know if database serial has been got. Database is inited by NBI, who generates the serial
+ # In case it is not ready when connected, it should be got later on before any decrypt operation
+
+ def get_secret_key(self):
+ if self.secret_obtained:
+ return
- def db_connect(self, config):
+ self.secret_key = None
+ if self.database_key:
+ self.set_secret_key(self.database_key)
+ version_data = self.get_one(
+ "admin", {"_id": "version"}, fail_on_empty=False, fail_on_more=True
+ )
+ if version_data and version_data.get("serial"):
+ self.set_secret_key(b64decode(version_data["serial"]))
+ self.secret_obtained = True
+
+ def db_connect(self, config, target_version=None):
+ """
+ Connect to database
+ :param config: Configuration of database
+ :param target_version: if provided it checks if database contains required version, raising exception otherwise.
+ :return: None or raises DbException on error
+ """
try:
if "logger_name" in config:
self.logger = logging.getLogger(config["logger_name"])
- self.client = MongoClient(config["host"], config["port"])
+ master_key = config.get("commonkey") or config.get("masterpassword")
+ if master_key:
+ self.database_key = master_key
+ self.set_secret_key(master_key)
+ if config.get("uri"):
+ self.client = MongoClient(
+ config["uri"], replicaSet=config.get("replicaset", None)
+ )
+ else:
+ self.client = MongoClient(
+ config["host"],
+ config["port"],
+ replicaSet=config.get("replicaset", None),
+ )
+ # TODO add as parameters also username=config.get("user"), password=config.get("password"))
+ # when all modules are ready
self.db = self.client[config["name"]]
if "loglevel" in config:
- self.logger.setLevel(getattr(logging, config['loglevel']))
+ self.logger.setLevel(getattr(logging, config["loglevel"]))
# get data to try a connection
now = time()
while True:
try:
- self.db.users.find_one({"username": "admin"})
+ version_data = self.get_one(
+ "admin",
+ {"_id": "version"},
+ fail_on_empty=False,
+ fail_on_more=True,
+ )
+ # check database status is ok
+ if version_data and version_data.get("status") != "ENABLED":
+ raise DbException(
+ "Wrong database status '{}'".format(
+ version_data.get("status")
+ ),
+ http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
+ )
+ # check version
+ db_version = (
+ None if not version_data else version_data.get("version")
+ )
+ if target_version and target_version != db_version:
+ raise DbException(
+ "Invalid database version {}. Expected {}".format(
+ db_version, target_version
+ )
+ )
+ # get serial
+ if version_data and version_data.get("serial"):
+ self.secret_obtained = True
+ self.set_secret_key(b64decode(version_data["serial"]))
+ self.logger.info(
+ "Connected to database {} version {}".format(
+ config["name"], db_version
+ )
+ )
return
except errors.ConnectionFailure as e:
if time() - now >= self.conn_initial_timout:
self.logger.info("Waiting to database up {}".format(e))
sleep(2)
except errors.PyMongoError as e:
- raise DbException(str(e))
-
- def db_disconnect(self):
- pass # TODO
+ raise DbException(e)
@staticmethod
def _format_filter(q_filter):
"""
- Translate query string filter into mongo database filter
+ Translate query string q_filter into mongo database filter
:param q_filter: Query string content. Follows SOL005 section 4.3.2 guidelines, with the follow extensions and
differences:
It accept ".nq" (not equal) in addition to ".neq".
"""
try:
db_filter = {}
+ if not q_filter:
+ return db_filter
for query_k, query_v in q_filter.items():
dot_index = query_k.rfind(".")
- if dot_index > 1 and query_k[dot_index+1:] in ("eq", "ne", "gt", "gte", "lt", "lte", "cont",
- "ncont", "neq"):
- operator = "$" + query_k[dot_index + 1:]
+ if dot_index > 1 and query_k[dot_index + 1 :] in (
+ "eq",
+ "ne",
+ "gt",
+ "gte",
+ "lt",
+ "lte",
+ "cont",
+ "ncont",
+ "neq",
+ ):
+ operator = "$" + query_k[dot_index + 1 :]
if operator == "$neq":
operator = "$ne"
k = query_k[:dot_index]
return db_filter
except Exception as e:
- raise DbException("Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e),
- http_code=HTTPStatus.BAD_REQUEST)
+ raise DbException(
+ "Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e),
+ http_code=HTTPStatus.BAD_REQUEST,
+ )
- def get_list(self, table, filter={}):
+ def get_list(self, table, q_filter=None):
+ """
+ Obtain a list of entries matching q_filter
+ :param table: collection or table
+ :param q_filter: Filter
+ :return: a list (can be empty) with the found entries. Raises DbException on error
+ """
try:
result = []
- collection = self.db[table]
- db_filter = self._format_filter(filter)
- rows = collection.find(db_filter)
+ with self.lock:
+ collection = self.db[table]
+ db_filter = self._format_filter(q_filter)
+ rows = collection.find(db_filter)
for row in rows:
result.append(row)
return result
except DbException:
raise
except Exception as e: # TODO refine
- raise DbException(str(e))
+ raise DbException(e)
+
+ def count(self, table, q_filter=None):
+ """
+ Count the number of entries matching q_filter
+ :param table: collection or table
+ :param q_filter: Filter
+ :return: number of entries found (can be zero)
+ :raise: DbException on error
+ """
+ try:
+ with self.lock:
+ collection = self.db[table]
+ db_filter = self._format_filter(q_filter)
+ count = collection.count(db_filter)
+ return count
+ except DbException:
+ raise
+ except Exception as e: # TODO refine
+ raise DbException(e)
- def get_one(self, table, filter={}, fail_on_empty=True, fail_on_more=True):
+ def get_one(self, table, q_filter=None, fail_on_empty=True, fail_on_more=True):
+ """
+ Obtain one entry matching q_filter
+ :param table: collection or table
+ :param q_filter: Filter
+ :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case
+ it raises a DbException
+ :param fail_on_more: If more than one matches filter it returns one of then unless this flag is set tu True, so
+ that it raises a DbException
+ :return: The requested element, or None
+ """
try:
- if filter:
- filter = self._format_filter(filter)
- collection = self.db[table]
- if not (fail_on_empty and fail_on_more):
- return collection.find_one(filter)
- rows = collection.find(filter)
+ db_filter = self._format_filter(q_filter)
+ with self.lock:
+ collection = self.db[table]
+ if not (fail_on_empty and fail_on_more):
+ return collection.find_one(db_filter)
+ rows = collection.find(db_filter)
if rows.count() == 0:
if fail_on_empty:
- raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter),
- HTTPStatus.NOT_FOUND)
+ raise DbException(
+ "Not found any {} with filter='{}'".format(
+ table[:-1], q_filter
+ ),
+ HTTPStatus.NOT_FOUND,
+ )
return None
elif rows.count() > 1:
if fail_on_more:
- raise DbException("Found more than one {} with filter='{}'".format(table[:-1], filter),
- HTTPStatus.CONFLICT)
+ raise DbException(
+ "Found more than one {} with filter='{}'".format(
+ table[:-1], q_filter
+ ),
+ HTTPStatus.CONFLICT,
+ )
return rows[0]
except Exception as e: # TODO refine
- raise DbException(str(e))
+ raise DbException(e)
- def del_list(self, table, filter={}):
+ def del_list(self, table, q_filter=None):
+ """
+ Deletes all entries that match q_filter
+ :param table: collection or table
+ :param q_filter: Filter
+ :return: Dict with the number of entries deleted
+ """
try:
- collection = self.db[table]
- rows = collection.delete_many(self._format_filter(filter))
+ with self.lock:
+ collection = self.db[table]
+ rows = collection.delete_many(self._format_filter(q_filter))
return {"deleted": rows.deleted_count}
except DbException:
raise
except Exception as e: # TODO refine
- raise DbException(str(e))
+ raise DbException(e)
- def del_one(self, table, filter={}, fail_on_empty=True):
+ def del_one(self, table, q_filter=None, fail_on_empty=True):
+ """
+ Deletes one entry that matches q_filter
+ :param table: collection or table
+ :param q_filter: Filter
+ :param fail_on_empty: If nothing matches filter it returns '0' deleted unless this flag is set tu True, in
+ which case it raises a DbException
+ :return: Dict with the number of entries deleted
+ """
try:
- collection = self.db[table]
- rows = collection.delete_one(self._format_filter(filter))
+ with self.lock:
+ collection = self.db[table]
+ rows = collection.delete_one(self._format_filter(q_filter))
if rows.deleted_count == 0:
if fail_on_empty:
- raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter),
- HTTPStatus.NOT_FOUND)
+ raise DbException(
+ "Not found any {} with filter='{}'".format(
+ table[:-1], q_filter
+ ),
+ HTTPStatus.NOT_FOUND,
+ )
return None
return {"deleted": rows.deleted_count}
except Exception as e: # TODO refine
- raise DbException(str(e))
+ raise DbException(e)
def create(self, table, indata):
+ """
+ Add a new entry at database
+ :param table: collection or table
+ :param indata: content to be added
+ :return: database id of the inserted element. Raises a DbException on error
+ """
try:
- collection = self.db[table]
- data = collection.insert_one(indata)
+ with self.lock:
+ collection = self.db[table]
+ data = collection.insert_one(indata)
return data.inserted_id
except Exception as e: # TODO refine
- raise DbException(str(e))
+ raise DbException(e)
+
+ def create_list(self, table, indata_list):
+ """
+ Add several entries at once
+ :param table: collection or table
+ :param indata_list: content list to be added.
+ :return: the list of inserted '_id's. Exception on error
+ """
+ try:
+ for item in indata_list:
+ if item.get("_id") is None:
+ item["_id"] = str(uuid4())
+ with self.lock:
+ collection = self.db[table]
+ data = collection.insert_many(indata_list)
+ return data.inserted_ids
+ except Exception as e: # TODO refine
+ raise DbException(e)
- def set_one(self, table, filter, update_dict, fail_on_empty=True):
+ def set_one(
+ self,
+ table,
+ q_filter,
+ update_dict,
+ fail_on_empty=True,
+ unset=None,
+ pull=None,
+ push=None,
+ push_list=None,
+ pull_list=None,
+ upsert=False,
+ ):
+ """
+ Modifies an entry at database
+ :param table: collection or table
+ :param q_filter: Filter
+ :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
+ :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
+ it raises a DbException
+ :param unset: Plain dictionary with the content to be removed if exist. It is a dot separated keys, value is
+ ignored. If not exist, it is ignored
+ :param pull: Plain dictionary with the content to be removed from an array. It is a dot separated keys and value
+ if exist in the array is removed. If not exist, it is ignored
+ :param pull_list: Same as pull but values are arrays where each item is removed from the array
+ :param push: Plain dictionary with the content to be appended to an array. It is a dot separated keys and value
+ is appended to the end of the array
+ :param push_list: Same as push but values are arrays where each item is and appended instead of appending the
+ whole array
+ :param upsert: If this parameter is set to True and no document is found using 'q_filter' it will be created.
+ By default this is false.
+ :return: Dict with the number of entries modified. None if no matching is found.
+ """
try:
- collection = self.db[table]
- rows = collection.update_one(self._format_filter(filter), {"$set": update_dict})
+ db_oper = {}
+ if update_dict:
+ db_oper["$set"] = update_dict
+ if unset:
+ db_oper["$unset"] = unset
+ if pull or pull_list:
+ db_oper["$pull"] = pull or {}
+ if pull_list:
+ db_oper["$pull"].update(
+ {k: {"$in": v} for k, v in pull_list.items()}
+ )
+ if push or push_list:
+ db_oper["$push"] = push or {}
+ if push_list:
+ db_oper["$push"].update(
+ {k: {"$each": v} for k, v in push_list.items()}
+ )
+
+ with self.lock:
+ collection = self.db[table]
+ rows = collection.update_one(
+ self._format_filter(q_filter), db_oper, upsert=upsert
+ )
if rows.matched_count == 0:
if fail_on_empty:
- raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter),
- HTTPStatus.NOT_FOUND)
+ raise DbException(
+ "Not found any {} with filter='{}'".format(
+ table[:-1], q_filter
+ ),
+ HTTPStatus.NOT_FOUND,
+ )
return None
return {"modified": rows.modified_count}
except Exception as e: # TODO refine
- raise DbException(str(e))
+ raise DbException(e)
+
+ def set_list(
+ self,
+ table,
+ q_filter,
+ update_dict,
+ unset=None,
+ pull=None,
+ push=None,
+ push_list=None,
+ pull_list=None,
+ ):
+ """
+ Modifies al matching entries at database
+ :param table: collection or table
+ :param q_filter: Filter
+ :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
+ :param unset: Plain dictionary with the content to be removed if exist. It is a dot separated keys, value is
+ ignored. If not exist, it is ignored
+ :param pull: Plain dictionary with the content to be removed from an array. It is a dot separated keys and value
+ if exist in the array is removed. If not exist, it is ignored
+ :param push: Plain dictionary with the content to be appended to an array. It is a dot separated keys, the
+ single value is appended to the end of the array
+ :param pull_list: Same as pull but values are arrays where each item is removed from the array
+ :param push_list: Same as push but values are arrays where each item is and appended instead of appending the
+ whole array
+ :return: Dict with the number of entries modified
+ """
+ try:
+ db_oper = {}
+ if update_dict:
+ db_oper["$set"] = update_dict
+ if unset:
+ db_oper["$unset"] = unset
+ if pull or pull_list:
+ db_oper["$pull"] = pull or {}
+ if pull_list:
+ db_oper["$pull"].update(
+ {k: {"$in": v} for k, v in pull_list.items()}
+ )
+ if push or push_list:
+ db_oper["$push"] = push or {}
+ if push_list:
+ db_oper["$push"].update(
+ {k: {"$each": v} for k, v in push_list.items()}
+ )
+ with self.lock:
+ collection = self.db[table]
+ rows = collection.update_many(self._format_filter(q_filter), db_oper)
+ return {"modified": rows.modified_count}
+ except Exception as e: # TODO refine
+ raise DbException(e)
- def replace(self, table, id, indata, fail_on_empty=True):
+ def replace(self, table, _id, indata, fail_on_empty=True):
+ """
+ Replace the content of an entry
+ :param table: collection or table
+ :param _id: internal database id
+ :param indata: content to replace
+ :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case
+ it raises a DbException
+ :return: Dict with the number of entries replaced
+ """
try:
- _filter = {"_id": id}
- collection = self.db[table]
- rows = collection.replace_one(_filter, indata)
+ db_filter = {"_id": _id}
+ with self.lock:
+ collection = self.db[table]
+ rows = collection.replace_one(db_filter, indata)
if rows.matched_count == 0:
if fail_on_empty:
- raise DbException("Not found any {} with filter='{}'".format(table[:-1], _filter),
- HTTPStatus.NOT_FOUND)
+ raise DbException(
+ "Not found any {} with _id='{}'".format(table[:-1], _id),
+ HTTPStatus.NOT_FOUND,
+ )
return None
return {"replaced": rows.modified_count}
except Exception as e: # TODO refine
- raise DbException(str(e))
+ raise DbException(e)