Enable parallel execution and output of tox env
[osm/common.git] / osm_common / dbmongo.py
index 1d56e58..e5e12c6 100644 (file)
@@ -1,10 +1,30 @@
+# -*- coding: utf-8 -*-
 
-import logging
-from pymongo import MongoClient, errors
-from osm_common.dbbase import DbException, DbBase
-from http import HTTPStatus
-from time import time, sleep
+# Copyright 2018 Telefonica S.A.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from base64 import b64decode
 from copy import deepcopy
+from http import HTTPStatus
+import logging
+from time import sleep, time
+from uuid import uuid4
+
+from osm_common.dbbase import DbBase, DbException
+from pymongo import errors, MongoClient
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
 
@@ -18,14 +38,15 @@ __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
 #                 return call(*args, **kwargs)
 #             except pymongo.AutoReconnect as e:
 #                 if retry == 4:
-#                     raise DbException(str(e))
+#                     raise DbException(e)
 #                 sleep(retry)
 #     return _retry_mongocall
 
 
 def deep_update(to_update, update_with):
     """
-    Update 'to_update' dict with the content 'update_with' dict recursively
+    Similar to deepcopy but recursively with nested dictionaries. 'to_update' dict is updated with a content copy of
+    'update_with' dict recursively
     :param to_update: must be a dictionary to be modified
     :param update_with: must be a dictionary. It is not changed
     :return: to_update
@@ -43,22 +64,88 @@ class DbMongo(DbBase):
     conn_initial_timout = 120
     conn_timout = 10
 
-    def __init__(self, logger_name='db'):
-        self.logger = logging.getLogger(logger_name)
+    def __init__(self, logger_name="db", lock=False):
+        super().__init__(logger_name=logger_name, lock=lock)
+        self.client = None
+        self.db = None
+        self.database_key = None
+        self.secret_obtained = False
+        # ^ This is used to know if database serial has been got. Database is inited by NBI, who generates the serial
+        # In case it is not ready when connected, it should be got later on before any decrypt operation
+
+    def get_secret_key(self):
+        if self.secret_obtained:
+            return
+
+        self.secret_key = None
+        if self.database_key:
+            self.set_secret_key(self.database_key)
+        version_data = self.get_one(
+            "admin", {"_id": "version"}, fail_on_empty=False, fail_on_more=True
+        )
+        if version_data and version_data.get("serial"):
+            self.set_secret_key(b64decode(version_data["serial"]))
+        self.secret_obtained = True
 
-    def db_connect(self, config):
+    def db_connect(self, config, target_version=None):
+        """
+        Connect to database
+        :param config: Configuration of database
+        :param target_version: if provided it checks if database contains required version, raising exception otherwise.
+        :return: None or raises DbException on error
+        """
         try:
             if "logger_name" in config:
                 self.logger = logging.getLogger(config["logger_name"])
-            self.client = MongoClient(config["host"], config["port"])
+            master_key = config.get("commonkey") or config.get("masterpassword")
+            if master_key:
+                self.database_key = master_key
+                self.set_secret_key(master_key)
+            if config.get("uri"):
+                self.client = MongoClient(
+                    config["uri"], replicaSet=config.get("replicaset", None)
+                )
+            # when all modules are ready
             self.db = self.client[config["name"]]
             if "loglevel" in config:
-                self.logger.setLevel(getattr(logging, config['loglevel']))
+                self.logger.setLevel(getattr(logging, config["loglevel"]))
             # get data to try a connection
             now = time()
             while True:
                 try:
-                    self.db.users.find_one({"username": "admin"})
+                    version_data = self.get_one(
+                        "admin",
+                        {"_id": "version"},
+                        fail_on_empty=False,
+                        fail_on_more=True,
+                    )
+                    # check database status is ok
+                    if version_data and version_data.get("status") != "ENABLED":
+                        raise DbException(
+                            "Wrong database status '{}'".format(
+                                version_data.get("status")
+                            ),
+                            http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
+                        )
+                    # check version
+                    db_version = (
+                        None if not version_data else version_data.get("version")
+                    )
+                    if target_version and target_version != db_version:
+                        raise DbException(
+                            "Invalid database version {}. Expected {}".format(
+                                db_version, target_version
+                            )
+                        )
+                    # get serial
+                    if version_data and version_data.get("serial"):
+                        self.secret_obtained = True
+                        self.set_secret_key(b64decode(version_data["serial"]))
+                    self.logger.info(
+                        "Connected to database {} version {}".format(
+                            config["name"], db_version
+                        )
+                    )
                     return
                 except errors.ConnectionFailure as e:
                     if time() - now >= self.conn_initial_timout:
@@ -66,15 +153,12 @@ class DbMongo(DbBase):
                     self.logger.info("Waiting to database up {}".format(e))
                     sleep(2)
         except errors.PyMongoError as e:
-            raise DbException(str(e))
-
-    def db_disconnect(self):
-        pass  # TODO
+            raise DbException(e)
 
     @staticmethod
     def _format_filter(q_filter):
         """
-        Translate query string filter into mongo database filter
+        Translate query string q_filter into mongo database filter
         :param q_filter: Query string content. Follows SOL005 section 4.3.2 guidelines, with the follow extensions and
         differences:
             It accept ".nq" (not equal) in addition to ".neq".
@@ -106,11 +190,22 @@ class DbMongo(DbBase):
         """
         try:
             db_filter = {}
+            if not q_filter:
+                return db_filter
             for query_k, query_v in q_filter.items():
                 dot_index = query_k.rfind(".")
-                if dot_index > 1 and query_k[dot_index+1:] in ("eq", "ne", "gt", "gte", "lt", "lte", "cont",
-                                                               "ncont", "neq"):
-                    operator = "$" + query_k[dot_index + 1:]
+                if dot_index > 1 and query_k[dot_index + 1 :] in (
+                    "eq",
+                    "ne",
+                    "gt",
+                    "gte",
+                    "lt",
+                    "lte",
+                    "cont",
+                    "ncont",
+                    "neq",
+                ):
+                    operator = "$" + query_k[dot_index + 1 :]
                     if operator == "$neq":
                         operator = "$ne"
                     k = query_k[:dot_index]
@@ -151,98 +246,311 @@ class DbMongo(DbBase):
 
             return db_filter
         except Exception as e:
-            raise DbException("Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e),
-                              http_code=HTTPStatus.BAD_REQUEST)
+            raise DbException(
+                "Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e),
+                http_code=HTTPStatus.BAD_REQUEST,
+            )
 
-    def get_list(self, table, filter={}):
+    def get_list(self, table, q_filter=None):
+        """
+        Obtain a list of entries matching q_filter
+        :param table: collection or table
+        :param q_filter: Filter
+        :return: a list (can be empty) with the found entries. Raises DbException on error
+        """
         try:
             result = []
-            collection = self.db[table]
-            db_filter = self._format_filter(filter)
-            rows = collection.find(db_filter)
+            with self.lock:
+                collection = self.db[table]
+                db_filter = self._format_filter(q_filter)
+                rows = collection.find(db_filter)
             for row in rows:
                 result.append(row)
             return result
         except DbException:
             raise
         except Exception as e:  # TODO refine
-            raise DbException(str(e))
+            raise DbException(e)
 
-    def get_one(self, table, filter={}, fail_on_empty=True, fail_on_more=True):
+    def count(self, table, q_filter=None):
+        """
+        Count the number of entries matching q_filter
+        :param table: collection or table
+        :param q_filter: Filter
+        :return: number of entries found (can be zero)
+        :raise: DbException on error
+        """
+        try:
+            with self.lock:
+                collection = self.db[table]
+                db_filter = self._format_filter(q_filter)
+                count = collection.count_documents(db_filter)
+            return count
+        except DbException:
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(e)
+
+    def get_one(self, table, q_filter=None, fail_on_empty=True, fail_on_more=True):
+        """
+        Obtain one entry matching q_filter
+        :param table: collection or table
+        :param q_filter: Filter
+        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case
+        it raises a DbException
+        :param fail_on_more: If more than one matches filter it returns one of then unless this flag is set tu True, so
+        that it raises a DbException
+        :return: The requested element, or None
+        """
         try:
-            if filter:
-                filter = self._format_filter(filter)
-            collection = self.db[table]
-            if not (fail_on_empty and fail_on_more):
-                return collection.find_one(filter)
-            rows = collection.find(filter)
-            if rows.count() == 0:
+            db_filter = self._format_filter(q_filter)
+            with self.lock:
+                collection = self.db[table]
+                if not (fail_on_empty and fail_on_more):
+                    return collection.find_one(db_filter)
+                rows = list(collection.find(db_filter))
+            if len(rows) == 0:
                 if fail_on_empty:
-                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter),
-                                      HTTPStatus.NOT_FOUND)
+                    raise DbException(
+                        "Not found any {} with filter='{}'".format(
+                            table[:-1], q_filter
+                        ),
+                        HTTPStatus.NOT_FOUND,
+                    )
+
                 return None
-            elif rows.count() > 1:
+            elif len(rows) > 1:
                 if fail_on_more:
-                    raise DbException("Found more than one {} with filter='{}'".format(table[:-1], filter),
-                                      HTTPStatus.CONFLICT)
+                    raise DbException(
+                        "Found more than one {} with filter='{}'".format(
+                            table[:-1], q_filter
+                        ),
+                        HTTPStatus.CONFLICT,
+                    )
             return rows[0]
         except Exception as e:  # TODO refine
-            raise DbException(str(e))
+            raise DbException(e)
 
-    def del_list(self, table, filter={}):
+    def del_list(self, table, q_filter=None):
+        """
+        Deletes all entries that match q_filter
+        :param table: collection or table
+        :param q_filter: Filter
+        :return: Dict with the number of entries deleted
+        """
         try:
-            collection = self.db[table]
-            rows = collection.delete_many(self._format_filter(filter))
+            with self.lock:
+                collection = self.db[table]
+                rows = collection.delete_many(self._format_filter(q_filter))
             return {"deleted": rows.deleted_count}
         except DbException:
             raise
         except Exception as e:  # TODO refine
-            raise DbException(str(e))
+            raise DbException(e)
 
-    def del_one(self, table, filter={}, fail_on_empty=True):
+    def del_one(self, table, q_filter=None, fail_on_empty=True):
+        """
+        Deletes one entry that matches q_filter
+        :param table: collection or table
+        :param q_filter: Filter
+        :param fail_on_empty: If nothing matches filter it returns '0' deleted unless this flag is set tu True, in
+        which case it raises a DbException
+        :return: Dict with the number of entries deleted
+        """
         try:
-            collection = self.db[table]
-            rows = collection.delete_one(self._format_filter(filter))
+            with self.lock:
+                collection = self.db[table]
+                rows = collection.delete_one(self._format_filter(q_filter))
             if rows.deleted_count == 0:
                 if fail_on_empty:
-                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter),
-                                      HTTPStatus.NOT_FOUND)
+                    raise DbException(
+                        "Not found any {} with filter='{}'".format(
+                            table[:-1], q_filter
+                        ),
+                        HTTPStatus.NOT_FOUND,
+                    )
                 return None
             return {"deleted": rows.deleted_count}
         except Exception as e:  # TODO refine
-            raise DbException(str(e))
+            raise DbException(e)
 
     def create(self, table, indata):
+        """
+        Add a new entry at database
+        :param table: collection or table
+        :param indata: content to be added
+        :return: database id of the inserted element. Raises a DbException on error
+        """
         try:
-            collection = self.db[table]
-            data = collection.insert_one(indata)
+            with self.lock:
+                collection = self.db[table]
+                data = collection.insert_one(indata)
             return data.inserted_id
         except Exception as e:  # TODO refine
-            raise DbException(str(e))
+            raise DbException(e)
 
-    def set_one(self, table, filter, update_dict, fail_on_empty=True):
+    def create_list(self, table, indata_list):
+        """
+        Add several entries at once
+        :param table: collection or table
+        :param indata_list: content list to be added.
+        :return: the list of inserted '_id's. Exception on error
+        """
         try:
-            collection = self.db[table]
-            rows = collection.update_one(self._format_filter(filter), {"$set": update_dict})
+            for item in indata_list:
+                if item.get("_id") is None:
+                    item["_id"] = str(uuid4())
+            with self.lock:
+                collection = self.db[table]
+                data = collection.insert_many(indata_list)
+            return data.inserted_ids
+        except Exception as e:  # TODO refine
+            raise DbException(e)
+
+    def set_one(
+        self,
+        table,
+        q_filter,
+        update_dict,
+        fail_on_empty=True,
+        unset=None,
+        pull=None,
+        push=None,
+        push_list=None,
+        pull_list=None,
+        upsert=False,
+    ):
+        """
+        Modifies an entry at database
+        :param table: collection or table
+        :param q_filter: Filter
+        :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
+        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
+        it raises a DbException
+        :param unset: Plain dictionary with the content to be removed if exist. It is a dot separated keys, value is
+                      ignored. If not exist, it is ignored
+        :param pull: Plain dictionary with the content to be removed from an array. It is a dot separated keys and value
+                     if exist in the array is removed. If not exist, it is ignored
+        :param pull_list: Same as pull but values are arrays where each item is removed from the array
+        :param push: Plain dictionary with the content to be appended to an array. It is a dot separated keys and value
+                     is appended to the end of the array
+        :param push_list: Same as push but values are arrays where each item is and appended instead of appending the
+                          whole array
+        :param upsert: If this parameter is set to True and no document is found using 'q_filter' it will be created.
+                     By default this is false.
+        :return: Dict with the number of entries modified. None if no matching is found.
+        """
+        try:
+            db_oper = {}
+            if update_dict:
+                db_oper["$set"] = update_dict
+            if unset:
+                db_oper["$unset"] = unset
+            if pull or pull_list:
+                db_oper["$pull"] = pull or {}
+                if pull_list:
+                    db_oper["$pull"].update(
+                        {k: {"$in": v} for k, v in pull_list.items()}
+                    )
+            if push or push_list:
+                db_oper["$push"] = push or {}
+                if push_list:
+                    db_oper["$push"].update(
+                        {k: {"$each": v} for k, v in push_list.items()}
+                    )
+
+            with self.lock:
+                collection = self.db[table]
+                rows = collection.update_one(
+                    self._format_filter(q_filter), db_oper, upsert=upsert
+                )
             if rows.matched_count == 0:
                 if fail_on_empty:
-                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter),
-                                      HTTPStatus.NOT_FOUND)
+                    raise DbException(
+                        "Not found any {} with filter='{}'".format(
+                            table[:-1], q_filter
+                        ),
+                        HTTPStatus.NOT_FOUND,
+                    )
                 return None
             return {"modified": rows.modified_count}
         except Exception as e:  # TODO refine
-            raise DbException(str(e))
+            raise DbException(e)
 
-    def replace(self, table, id, indata, fail_on_empty=True):
+    def set_list(
+        self,
+        table,
+        q_filter,
+        update_dict,
+        unset=None,
+        pull=None,
+        push=None,
+        push_list=None,
+        pull_list=None,
+    ):
+        """
+        Modifies al matching entries at database
+        :param table: collection or table
+        :param q_filter: Filter
+        :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
+        :param unset: Plain dictionary with the content to be removed if exist. It is a dot separated keys, value is
+                      ignored. If not exist, it is ignored
+        :param pull: Plain dictionary with the content to be removed from an array. It is a dot separated keys and value
+                     if exist in the array is removed. If not exist, it is ignored
+        :param push: Plain dictionary with the content to be appended to an array. It is a dot separated keys, the
+                     single value is appended to the end of the array
+        :param pull_list: Same as pull but values are arrays where each item is removed from the array
+        :param push_list: Same as push but values are arrays where each item is and appended instead of appending the
+                          whole array
+        :return: Dict with the number of entries modified
+        """
+        try:
+            db_oper = {}
+            if update_dict:
+                db_oper["$set"] = update_dict
+            if unset:
+                db_oper["$unset"] = unset
+            if pull or pull_list:
+                db_oper["$pull"] = pull or {}
+                if pull_list:
+                    db_oper["$pull"].update(
+                        {k: {"$in": v} for k, v in pull_list.items()}
+                    )
+            if push or push_list:
+                db_oper["$push"] = push or {}
+                if push_list:
+                    db_oper["$push"].update(
+                        {k: {"$each": v} for k, v in push_list.items()}
+                    )
+            with self.lock:
+                collection = self.db[table]
+                rows = collection.update_many(self._format_filter(q_filter), db_oper)
+            return {"modified": rows.modified_count}
+        except Exception as e:  # TODO refine
+            raise DbException(e)
+
+    def replace(self, table, _id, indata, fail_on_empty=True):
+        """
+        Replace the content of an entry
+        :param table: collection or table
+        :param _id: internal database id
+        :param indata: content to replace
+        :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case
+        it raises a DbException
+        :return: Dict with the number of entries replaced
+        """
         try:
-            _filter = {"_id": id}
-            collection = self.db[table]
-            rows = collection.replace_one(_filter, indata)
+            db_filter = {"_id": _id}
+            with self.lock:
+                collection = self.db[table]
+                rows = collection.replace_one(db_filter, indata)
             if rows.matched_count == 0:
                 if fail_on_empty:
-                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], _filter),
-                                      HTTPStatus.NOT_FOUND)
+                    raise DbException(
+                        "Not found any {} with _id='{}'".format(table[:-1], _id),
+                        HTTPStatus.NOT_FOUND,
+                    )
                 return None
             return {"replaced": rows.modified_count}
         except Exception as e:  # TODO refine
-            raise DbException(str(e))
+            raise DbException(e)