Support for mongodb replicaset connection (HA).
[osm/common.git] / osm_common / dbmongo.py
index 2e94a5a..56f387b 100644 (file)
@@ -23,6 +23,7 @@ from http import HTTPStatus
 from time import time, sleep
 from copy import deepcopy
 from base64 import b64decode
+from uuid import uuid4
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
 
@@ -62,10 +63,26 @@ class DbMongo(DbBase):
     conn_initial_timout = 120
     conn_timout = 10
 
-    def __init__(self, logger_name='db'):
-        super().__init__(logger_name)
+    def __init__(self, logger_name='db', lock=False):
+        super().__init__(logger_name, lock)
         self.client = None
         self.db = None
+        self.database_key = None
+        self.secret_obtained = False
+        # ^ This is used to know if database serial has been got. Database is inited by NBI, who generates the serial
+        # In case it is not ready when connected, it should be got later on before any decrypt operation
+
+    def get_secret_key(self):
+        if self.secret_obtained:
+            return
+
+        self.secret_key = None
+        if self.database_key:
+            self.set_secret_key(self.database_key)
+        version_data = self.get_one("admin", {"_id": "version"}, fail_on_empty=False, fail_on_more=True)
+        if version_data and version_data.get("serial"):
+            self.set_secret_key(b64decode(version_data["serial"]))
+        self.secret_obtained = True
 
     def db_connect(self, config, target_version=None):
         """
@@ -77,8 +94,14 @@ class DbMongo(DbBase):
         try:
             if "logger_name" in config:
                 self.logger = logging.getLogger(config["logger_name"])
-            self.master_password = config.get("masterpassword")
-            self.client = MongoClient(config["host"], config["port"])
+            master_key = config.get("commonkey") or config.get("masterpassword")
+            if master_key:
+                self.database_key = master_key
+                self.set_secret_key(master_key)
+            if config.get("uri"):
+                self.client = MongoClient(config["uri"], replicaSet=config.get("replicaset", None))
+            else:
+                self.client = MongoClient(config["host"], config["port"], replicaSet=config.get("replicaset", None))
             # TODO add as parameters also username=config.get("user"), password=config.get("password"))
             # when all modules are ready
             self.db = self.client[config["name"]]
@@ -99,6 +122,7 @@ class DbMongo(DbBase):
                         raise DbException("Invalid database version {}. Expected {}".format(db_version, target_version))
                     # get serial
                     if version_data and version_data.get("serial"):
+                        self.secret_obtained = True
                         self.set_secret_key(b64decode(version_data["serial"]))
                     self.logger.info("Connected to database {} version {}".format(config["name"], db_version))
                     return
@@ -204,9 +228,10 @@ class DbMongo(DbBase):
         """
         try:
             result = []
-            collection = self.db[table]
-            db_filter = self._format_filter(q_filter)
-            rows = collection.find(db_filter)
+            with self.lock:
+                collection = self.db[table]
+                db_filter = self._format_filter(q_filter)
+                rows = collection.find(db_filter)
             for row in rows:
                 result.append(row)
             return result
@@ -215,6 +240,25 @@ class DbMongo(DbBase):
         except Exception as e:  # TODO refine
             raise DbException(e)
 
+    def count(self, table, q_filter=None):
+        """
+        Count the number of entries matching q_filter
+        :param table: collection or table
+        :param q_filter: Filter
+        :return: number of entries found (can be zero)
+        :raise: DbException on error
+        """
+        try:
+            with self.lock:
+                collection = self.db[table]
+                db_filter = self._format_filter(q_filter)
+                count = collection.count(db_filter)
+            return count
+        except DbException:
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(e)
+
     def get_one(self, table, q_filter=None, fail_on_empty=True, fail_on_more=True):
         """
         Obtain one entry matching q_filter
@@ -228,10 +272,11 @@ class DbMongo(DbBase):
         """
         try:
             db_filter = self._format_filter(q_filter)
-            collection = self.db[table]
-            if not (fail_on_empty and fail_on_more):
-                return collection.find_one(db_filter)
-            rows = collection.find(db_filter)
+            with self.lock:
+                collection = self.db[table]
+                if not (fail_on_empty and fail_on_more):
+                    return collection.find_one(db_filter)
+                rows = collection.find(db_filter)
             if rows.count() == 0:
                 if fail_on_empty:
                     raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
@@ -253,8 +298,9 @@ class DbMongo(DbBase):
         :return: Dict with the number of entries deleted
         """
         try:
-            collection = self.db[table]
-            rows = collection.delete_many(self._format_filter(q_filter))
+            with self.lock:
+                collection = self.db[table]
+                rows = collection.delete_many(self._format_filter(q_filter))
             return {"deleted": rows.deleted_count}
         except DbException:
             raise
@@ -271,8 +317,9 @@ class DbMongo(DbBase):
         :return: Dict with the number of entries deleted
         """
         try:
-            collection = self.db[table]
-            rows = collection.delete_one(self._format_filter(q_filter))
+            with self.lock:
+                collection = self.db[table]
+                rows = collection.delete_one(self._format_filter(q_filter))
             if rows.deleted_count == 0:
                 if fail_on_empty:
                     raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
@@ -290,13 +337,33 @@ class DbMongo(DbBase):
         :return: database id of the inserted element. Raises a DbException on error
         """
         try:
-            collection = self.db[table]
-            data = collection.insert_one(indata)
+            with self.lock:
+                collection = self.db[table]
+                data = collection.insert_one(indata)
             return data.inserted_id
         except Exception as e:  # TODO refine
             raise DbException(e)
 
-    def set_one(self, table, q_filter, update_dict, fail_on_empty=True):
+    def create_list(self, table, indata_list):
+        """
+        Add several entries at once
+        :param table: collection or table
+        :param indata_list: content list to be added.
+        :return: the list of inserted '_id's. Exception on error
+        """
+        try:
+            for item in indata_list:
+                if item.get("_id") is None:
+                    item["_id"] = str(uuid4())
+            with self.lock:
+                collection = self.db[table]
+                data = collection.insert_many(indata_list)
+            return data.inserted_ids
+        except Exception as e:  # TODO refine
+            raise DbException(e)
+
+    def set_one(self, table, q_filter, update_dict, fail_on_empty=True, unset=None, pull=None, push=None,
+                push_list=None, pull_list=None):
         """
         Modifies an entry at database
         :param table: collection or table
@@ -304,11 +371,35 @@ class DbMongo(DbBase):
         :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
         :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case
         it raises a DbException
+        :param unset: Plain dictionary with the content to be removed if exist. It is a dot separated keys, value is
+                      ignored. If not exist, it is ignored
+        :param pull: Plain dictionary with the content to be removed from an array. It is a dot separated keys and value
+                     if exist in the array is removed. If not exist, it is ignored
+        :param pull_list: Same as pull but values are arrays where each item is removed from the array
+        :param push: Plain dictionary with the content to be appended to an array. It is a dot separated keys and value
+                     is appended to the end of the array
+        :param push_list: Same as push but values are arrays where each item is and appended instead of appending the
+                          whole array
         :return: Dict with the number of entries modified. None if no matching is found.
         """
         try:
-            collection = self.db[table]
-            rows = collection.update_one(self._format_filter(q_filter), {"$set": update_dict})
+            db_oper = {}
+            if update_dict:
+                db_oper["$set"] = update_dict
+            if unset:
+                db_oper["$unset"] = unset
+            if pull or pull_list:
+                db_oper["$pull"] = pull or {}
+                if pull_list:
+                    db_oper["$pull"].update({k: {"$in": v} for k, v in pull_list.items()})
+            if push or push_list:
+                db_oper["$push"] = push or {}
+                if push_list:
+                    db_oper["$push"].update({k: {"$each": v} for k, v in push_list.items()})
+
+            with self.lock:
+                collection = self.db[table]
+                rows = collection.update_one(self._format_filter(q_filter), db_oper)
             if rows.matched_count == 0:
                 if fail_on_empty:
                     raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
@@ -318,17 +409,40 @@ class DbMongo(DbBase):
         except Exception as e:  # TODO refine
             raise DbException(e)
 
-    def set_list(self, table, q_filter, update_dict):
+    def set_list(self, table, q_filter, update_dict, unset=None, pull=None, push=None, push_list=None, pull_list=None):
         """
         Modifies al matching entries at database
         :param table: collection or table
         :param q_filter: Filter
         :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
+        :param unset: Plain dictionary with the content to be removed if exist. It is a dot separated keys, value is
+                      ignored. If not exist, it is ignored
+        :param pull: Plain dictionary with the content to be removed from an array. It is a dot separated keys and value
+                     if exist in the array is removed. If not exist, it is ignored
+        :param push: Plain dictionary with the content to be appended to an array. It is a dot separated keys, the
+                     single value is appended to the end of the array
+        :param pull_list: Same as pull but values are arrays where each item is removed from the array
+        :param push_list: Same as push but values are arrays where each item is and appended instead of appending the
+                          whole array
         :return: Dict with the number of entries modified
         """
         try:
-            collection = self.db[table]
-            rows = collection.update_many(self._format_filter(q_filter), {"$set": update_dict})
+            db_oper = {}
+            if update_dict:
+                db_oper["$set"] = update_dict
+            if unset:
+                db_oper["$unset"] = unset
+            if pull or pull_list:
+                db_oper["$pull"] = pull or {}
+                if pull_list:
+                    db_oper["$pull"].update({k: {"$in": v} for k, v in pull_list.items()})
+            if push or push_list:
+                db_oper["$push"] = push or {}
+                if push_list:
+                    db_oper["$push"].update({k: {"$each": v} for k, v in push_list.items()})
+            with self.lock:
+                collection = self.db[table]
+                rows = collection.update_many(self._format_filter(q_filter), db_oper)
             return {"modified": rows.modified_count}
         except Exception as e:  # TODO refine
             raise DbException(e)
@@ -345,8 +459,9 @@ class DbMongo(DbBase):
         """
         try:
             db_filter = {"_id": _id}
-            collection = self.db[table]
-            rows = collection.replace_one(db_filter, indata)
+            with self.lock:
+                collection = self.db[table]
+                rows = collection.replace_one(db_filter, indata)
             if rows.matched_count == 0:
                 if fail_on_empty:
                     raise DbException("Not found any {} with _id='{}'".format(table[:-1], _id), HTTPStatus.NOT_FOUND)