X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_common%2Fdbmongo.py;h=f64949d50a612884ef786d04dc72ceb0fa6bd8f8;hb=7da9795a4b73c72e81ac4880a9e9507e441aa90f;hp=16da6e7762bef2ff6d9c0ef763af19ee84ceccfe;hpb=f71fcff0e66c178f6edcaf7c6f35b15eafe9f3ba;p=osm%2Fcommon.git diff --git a/osm_common/dbmongo.py b/osm_common/dbmongo.py index 16da6e7..f64949d 100644 --- a/osm_common/dbmongo.py +++ b/osm_common/dbmongo.py @@ -16,13 +16,15 @@ # limitations under the License. -import logging -from pymongo import MongoClient, errors -from osm_common.dbbase import DbException, DbBase -from http import HTTPStatus -from time import time, sleep -from copy import deepcopy from base64 import b64decode +from copy import deepcopy +from http import HTTPStatus +import logging +from time import sleep, time +from uuid import uuid4 + +from osm_common.dbbase import DbBase, DbException +from pymongo import errors, MongoClient __author__ = "Alfonso Tierno " @@ -62,7 +64,7 @@ class DbMongo(DbBase): conn_initial_timout = 120 conn_timout = 10 - def __init__(self, logger_name='db', lock=False): + def __init__(self, logger_name="db", lock=False): super().__init__(logger_name, lock) self.client = None self.db = None @@ -78,7 +80,9 @@ class DbMongo(DbBase): self.secret_key = None if self.database_key: self.set_secret_key(self.database_key) - version_data = self.get_one("admin", {"_id": "version"}, fail_on_empty=False, fail_on_more=True) + version_data = self.get_one( + "admin", {"_id": "version"}, fail_on_empty=False, fail_on_more=True + ) if version_data and version_data.get("serial"): self.set_secret_key(b64decode(version_data["serial"])) self.secret_obtained = True @@ -98,32 +102,50 @@ class DbMongo(DbBase): self.database_key = master_key self.set_secret_key(master_key) if config.get("uri"): - self.client = MongoClient(config["uri"]) - else: - self.client = MongoClient(config["host"], config["port"]) - # TODO add as parameters also username=config.get("user"), password=config.get("password")) + self.client = MongoClient( + config["uri"], replicaSet=config.get("replicaset", None) + ) # when all modules are ready self.db = self.client[config["name"]] if "loglevel" in config: - self.logger.setLevel(getattr(logging, config['loglevel'])) + self.logger.setLevel(getattr(logging, config["loglevel"])) # get data to try a connection now = time() while True: try: - version_data = self.get_one("admin", {"_id": "version"}, fail_on_empty=False, fail_on_more=True) + version_data = self.get_one( + "admin", + {"_id": "version"}, + fail_on_empty=False, + fail_on_more=True, + ) # check database status is ok - if version_data and version_data.get("status") != 'ENABLED': - raise DbException("Wrong database status '{}'".format(version_data.get("status")), - http_code=HTTPStatus.INTERNAL_SERVER_ERROR) + if version_data and version_data.get("status") != "ENABLED": + raise DbException( + "Wrong database status '{}'".format( + version_data.get("status") + ), + http_code=HTTPStatus.INTERNAL_SERVER_ERROR, + ) # check version - db_version = None if not version_data else version_data.get("version") + db_version = ( + None if not version_data else version_data.get("version") + ) if target_version and target_version != db_version: - raise DbException("Invalid database version {}. Expected {}".format(db_version, target_version)) + raise DbException( + "Invalid database version {}. Expected {}".format( + db_version, target_version + ) + ) # get serial if version_data and version_data.get("serial"): self.secret_obtained = True self.set_secret_key(b64decode(version_data["serial"])) - self.logger.info("Connected to database {} version {}".format(config["name"], db_version)) + self.logger.info( + "Connected to database {} version {}".format( + config["name"], db_version + ) + ) return except errors.ConnectionFailure as e: if time() - now >= self.conn_initial_timout: @@ -172,9 +194,18 @@ class DbMongo(DbBase): return db_filter for query_k, query_v in q_filter.items(): dot_index = query_k.rfind(".") - if dot_index > 1 and query_k[dot_index+1:] in ("eq", "ne", "gt", "gte", "lt", "lte", "cont", - "ncont", "neq"): - operator = "$" + query_k[dot_index + 1:] + if dot_index > 1 and query_k[dot_index + 1 :] in ( + "eq", + "ne", + "gt", + "gte", + "lt", + "lte", + "cont", + "ncont", + "neq", + ): + operator = "$" + query_k[dot_index + 1 :] if operator == "$neq": operator = "$ne" k = query_k[:dot_index] @@ -215,8 +246,10 @@ class DbMongo(DbBase): return db_filter except Exception as e: - raise DbException("Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e), - http_code=HTTPStatus.BAD_REQUEST) + raise DbException( + "Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e), + http_code=HTTPStatus.BAD_REQUEST, + ) def get_list(self, table, q_filter=None): """ @@ -278,13 +311,21 @@ class DbMongo(DbBase): rows = collection.find(db_filter) if rows.count() == 0: if fail_on_empty: - raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter), - HTTPStatus.NOT_FOUND) + raise DbException( + "Not found any {} with filter='{}'".format( + table[:-1], q_filter + ), + HTTPStatus.NOT_FOUND, + ) return None elif rows.count() > 1: if fail_on_more: - raise DbException("Found more than one {} with filter='{}'".format(table[:-1], q_filter), - HTTPStatus.CONFLICT) + raise DbException( + "Found more than one {} with filter='{}'".format( + table[:-1], q_filter + ), + HTTPStatus.CONFLICT, + ) return rows[0] except Exception as e: # TODO refine raise DbException(e) @@ -321,8 +362,12 @@ class DbMongo(DbBase): rows = collection.delete_one(self._format_filter(q_filter)) if rows.deleted_count == 0: if fail_on_empty: - raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter), - HTTPStatus.NOT_FOUND) + raise DbException( + "Not found any {} with filter='{}'".format( + table[:-1], q_filter + ), + HTTPStatus.NOT_FOUND, + ) return None return {"deleted": rows.deleted_count} except Exception as e: # TODO refine @@ -343,20 +388,55 @@ class DbMongo(DbBase): except Exception as e: # TODO refine raise DbException(e) - def set_one(self, table, q_filter, update_dict, fail_on_empty=True, unset=None, pull=None, push=None): + def create_list(self, table, indata_list): + """ + Add several entries at once + :param table: collection or table + :param indata_list: content list to be added. + :return: the list of inserted '_id's. Exception on error + """ + try: + for item in indata_list: + if item.get("_id") is None: + item["_id"] = str(uuid4()) + with self.lock: + collection = self.db[table] + data = collection.insert_many(indata_list) + return data.inserted_ids + except Exception as e: # TODO refine + raise DbException(e) + + def set_one( + self, + table, + q_filter, + update_dict, + fail_on_empty=True, + unset=None, + pull=None, + push=None, + push_list=None, + pull_list=None, + upsert=False, + ): """ Modifies an entry at database :param table: collection or table :param q_filter: Filter :param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value - :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case + :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case it raises a DbException :param unset: Plain dictionary with the content to be removed if exist. It is a dot separated keys, value is ignored. If not exist, it is ignored :param pull: Plain dictionary with the content to be removed from an array. It is a dot separated keys and value if exist in the array is removed. If not exist, it is ignored + :param pull_list: Same as pull but values are arrays where each item is removed from the array :param push: Plain dictionary with the content to be appended to an array. It is a dot separated keys and value is appended to the end of the array + :param push_list: Same as push but values are arrays where each item is and appended instead of appending the + whole array + :param upsert: If this parameter is set to True and no document is found using 'q_filter' it will be created. + By default this is false. :return: Dict with the number of entries modified. None if no matching is found. """ try: @@ -365,24 +445,48 @@ class DbMongo(DbBase): db_oper["$set"] = update_dict if unset: db_oper["$unset"] = unset - if pull: - db_oper["$pull"] = pull - if push: - db_oper["$push"] = push + if pull or pull_list: + db_oper["$pull"] = pull or {} + if pull_list: + db_oper["$pull"].update( + {k: {"$in": v} for k, v in pull_list.items()} + ) + if push or push_list: + db_oper["$push"] = push or {} + if push_list: + db_oper["$push"].update( + {k: {"$each": v} for k, v in push_list.items()} + ) with self.lock: collection = self.db[table] - rows = collection.update_one(self._format_filter(q_filter), db_oper) + rows = collection.update_one( + self._format_filter(q_filter), db_oper, upsert=upsert + ) if rows.matched_count == 0: if fail_on_empty: - raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter), - HTTPStatus.NOT_FOUND) + raise DbException( + "Not found any {} with filter='{}'".format( + table[:-1], q_filter + ), + HTTPStatus.NOT_FOUND, + ) return None return {"modified": rows.modified_count} except Exception as e: # TODO refine raise DbException(e) - def set_list(self, table, q_filter, update_dict, unset=None, pull=None, push=None): + def set_list( + self, + table, + q_filter, + update_dict, + unset=None, + pull=None, + push=None, + push_list=None, + pull_list=None, + ): """ Modifies al matching entries at database :param table: collection or table @@ -392,8 +496,11 @@ class DbMongo(DbBase): ignored. If not exist, it is ignored :param pull: Plain dictionary with the content to be removed from an array. It is a dot separated keys and value if exist in the array is removed. If not exist, it is ignored - :param push: Plain dictionary with the content to be appended to an array. It is a dot separated keys and value - is appended to the end of the array + :param push: Plain dictionary with the content to be appended to an array. It is a dot separated keys, the + single value is appended to the end of the array + :param pull_list: Same as pull but values are arrays where each item is removed from the array + :param push_list: Same as push but values are arrays where each item is and appended instead of appending the + whole array :return: Dict with the number of entries modified """ try: @@ -402,10 +509,18 @@ class DbMongo(DbBase): db_oper["$set"] = update_dict if unset: db_oper["$unset"] = unset - if pull: - db_oper["$pull"] = pull - if push: - db_oper["$push"] = push + if pull or pull_list: + db_oper["$pull"] = pull or {} + if pull_list: + db_oper["$pull"].update( + {k: {"$in": v} for k, v in pull_list.items()} + ) + if push or push_list: + db_oper["$push"] = push or {} + if push_list: + db_oper["$push"].update( + {k: {"$each": v} for k, v in push_list.items()} + ) with self.lock: collection = self.db[table] rows = collection.update_many(self._format_filter(q_filter), db_oper) @@ -430,7 +545,10 @@ class DbMongo(DbBase): rows = collection.replace_one(db_filter, indata) if rows.matched_count == 0: if fail_on_empty: - raise DbException("Not found any {} with _id='{}'".format(table[:-1], _id), HTTPStatus.NOT_FOUND) + raise DbException( + "Not found any {} with _id='{}'".format(table[:-1], _id), + HTTPStatus.NOT_FOUND, + ) return None return {"replaced": rows.modified_count} except Exception as e: # TODO refine