conn_initial_timout = 120
conn_timout = 10
- def __init__(self, logger_name='db', lock=False):
+ def __init__(self, logger_name="db", lock=False):
super().__init__(logger_name, lock)
self.client = None
self.db = None
self.secret_key = None
if self.database_key:
self.set_secret_key(self.database_key)
- version_data = self.get_one("admin", {"_id": "version"}, fail_on_empty=False, fail_on_more=True)
+ version_data = self.get_one(
+ "admin", {"_id": "version"}, fail_on_empty=False, fail_on_more=True
+ )
if version_data and version_data.get("serial"):
self.set_secret_key(b64decode(version_data["serial"]))
self.secret_obtained = True
self.database_key = master_key
self.set_secret_key(master_key)
if config.get("uri"):
- self.client = MongoClient(config["uri"])
+ self.client = MongoClient(
+ config["uri"], replicaSet=config.get("replicaset", None)
+ )
else:
- self.client = MongoClient(config["host"], config["port"])
+ self.client = MongoClient(
+ config["host"],
+ config["port"],
+ replicaSet=config.get("replicaset", None),
+ )
# TODO add as parameters also username=config.get("user"), password=config.get("password"))
# when all modules are ready
self.db = self.client[config["name"]]
if "loglevel" in config:
- self.logger.setLevel(getattr(logging, config['loglevel']))
+ self.logger.setLevel(getattr(logging, config["loglevel"]))
# get data to try a connection
now = time()
while True:
try:
- version_data = self.get_one("admin", {"_id": "version"}, fail_on_empty=False, fail_on_more=True)
+ version_data = self.get_one(
+ "admin",
+ {"_id": "version"},
+ fail_on_empty=False,
+ fail_on_more=True,
+ )
# check database status is ok
- if version_data and version_data.get("status") != 'ENABLED':
- raise DbException("Wrong database status '{}'".format(version_data.get("status")),
- http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+ if version_data and version_data.get("status") != "ENABLED":
+ raise DbException(
+ "Wrong database status '{}'".format(
+ version_data.get("status")
+ ),
+ http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
+ )
# check version
- db_version = None if not version_data else version_data.get("version")
+ db_version = (
+ None if not version_data else version_data.get("version")
+ )
if target_version and target_version != db_version:
- raise DbException("Invalid database version {}. Expected {}".format(db_version, target_version))
+ raise DbException(
+ "Invalid database version {}. Expected {}".format(
+ db_version, target_version
+ )
+ )
# get serial
if version_data and version_data.get("serial"):
self.secret_obtained = True
self.set_secret_key(b64decode(version_data["serial"]))
- self.logger.info("Connected to database {} version {}".format(config["name"], db_version))
+ self.logger.info(
+ "Connected to database {} version {}".format(
+ config["name"], db_version
+ )
+ )
return
except errors.ConnectionFailure as e:
if time() - now >= self.conn_initial_timout:
return db_filter
for query_k, query_v in q_filter.items():
dot_index = query_k.rfind(".")
- if dot_index > 1 and query_k[dot_index+1:] in ("eq", "ne", "gt", "gte", "lt", "lte", "cont",
- "ncont", "neq"):
- operator = "$" + query_k[dot_index + 1:]
+ if dot_index > 1 and query_k[dot_index + 1 :] in (
+ "eq",
+ "ne",
+ "gt",
+ "gte",
+ "lt",
+ "lte",
+ "cont",
+ "ncont",
+ "neq",
+ ):
+ operator = "$" + query_k[dot_index + 1 :]
if operator == "$neq":
operator = "$ne"
k = query_k[:dot_index]
return db_filter
except Exception as e:
- raise DbException("Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e),
- http_code=HTTPStatus.BAD_REQUEST)
+ raise DbException(
+ "Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e),
+ http_code=HTTPStatus.BAD_REQUEST,
+ )
def get_list(self, table, q_filter=None):
"""
rows = collection.find(db_filter)
if rows.count() == 0:
if fail_on_empty:
- raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
- HTTPStatus.NOT_FOUND)
+ raise DbException(
+ "Not found any {} with filter='{}'".format(
+ table[:-1], q_filter
+ ),
+ HTTPStatus.NOT_FOUND,
+ )
return None
elif rows.count() > 1:
if fail_on_more:
- raise DbException("Found more than one {} with filter='{}'".format(table[:-1], q_filter),
- HTTPStatus.CONFLICT)
+ raise DbException(
+ "Found more than one {} with filter='{}'".format(
+ table[:-1], q_filter
+ ),
+ HTTPStatus.CONFLICT,
+ )
return rows[0]
except Exception as e: # TODO refine
raise DbException(e)
rows = collection.delete_one(self._format_filter(q_filter))
if rows.deleted_count == 0:
if fail_on_empty:
- raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
- HTTPStatus.NOT_FOUND)
+ raise DbException(
+ "Not found any {} with filter='{}'".format(
+ table[:-1], q_filter
+ ),
+ HTTPStatus.NOT_FOUND,
+ )
return None
return {"deleted": rows.deleted_count}
except Exception as e: # TODO refine
except Exception as e: # TODO refine
raise DbException(e)
- def set_one(self, table, q_filter, update_dict, fail_on_empty=True, unset=None, pull=None, push=None,
- push_list=None, pull_list=None):
+ def set_one(
+ self,
+ table,
+ q_filter,
+ update_dict,
+ fail_on_empty=True,
+ unset=None,
+ pull=None,
+ push=None,
+ push_list=None,
+ pull_list=None,
+ upsert=False,
+ ):
"""
Modifies an entry at database
:param table: collection or table
:param q_filter: Filter
:param update_dict: Plain dictionary with the content to be updated. It is a dot separated keys and a value
- :param fail_on_empty: If nothing matches filter it returns None unless this flag is set tu True, in which case
+ :param fail_on_empty: If nothing matches filter it returns None unless this flag is set to True, in which case
it raises a DbException
:param unset: Plain dictionary with the content to be removed if exist. It is a dot separated keys, value is
ignored. If not exist, it is ignored
is appended to the end of the array
:param push_list: Same as push but values are arrays where each item is and appended instead of appending the
whole array
+ :param upsert: If this parameter is set to True and no document is found using 'q_filter' it will be created.
+ By default this is false.
:return: Dict with the number of entries modified. None if no matching is found.
"""
try:
if pull or pull_list:
db_oper["$pull"] = pull or {}
if pull_list:
- db_oper["$pull"].update({k: {"$in": v} for k, v in pull_list.items()})
+ db_oper["$pull"].update(
+ {k: {"$in": v} for k, v in pull_list.items()}
+ )
if push or push_list:
db_oper["$push"] = push or {}
if push_list:
- db_oper["$push"].update({k: {"$each": v} for k, v in push_list.items()})
+ db_oper["$push"].update(
+ {k: {"$each": v} for k, v in push_list.items()}
+ )
with self.lock:
collection = self.db[table]
- rows = collection.update_one(self._format_filter(q_filter), db_oper)
+ rows = collection.update_one(
+ self._format_filter(q_filter), db_oper, upsert=upsert
+ )
if rows.matched_count == 0:
if fail_on_empty:
- raise DbException("Not found any {} with filter='{}'".format(table[:-1], q_filter),
- HTTPStatus.NOT_FOUND)
+ raise DbException(
+ "Not found any {} with filter='{}'".format(
+ table[:-1], q_filter
+ ),
+ HTTPStatus.NOT_FOUND,
+ )
return None
return {"modified": rows.modified_count}
except Exception as e: # TODO refine
raise DbException(e)
- def set_list(self, table, q_filter, update_dict, unset=None, pull=None, push=None, push_list=None, pull_list=None):
+ def set_list(
+ self,
+ table,
+ q_filter,
+ update_dict,
+ unset=None,
+ pull=None,
+ push=None,
+ push_list=None,
+ pull_list=None,
+ ):
"""
Modifies al matching entries at database
:param table: collection or table
if pull or pull_list:
db_oper["$pull"] = pull or {}
if pull_list:
- db_oper["$pull"].update({k: {"$in": v} for k, v in pull_list.items()})
+ db_oper["$pull"].update(
+ {k: {"$in": v} for k, v in pull_list.items()}
+ )
if push or push_list:
db_oper["$push"] = push or {}
if push_list:
- db_oper["$push"].update({k: {"$each": v} for k, v in push_list.items()})
+ db_oper["$push"].update(
+ {k: {"$each": v} for k, v in push_list.items()}
+ )
with self.lock:
collection = self.db[table]
rows = collection.update_many(self._format_filter(q_filter), db_oper)
rows = collection.replace_one(db_filter, indata)
if rows.matched_count == 0:
if fail_on_empty:
- raise DbException("Not found any {} with _id='{}'".format(table[:-1], _id), HTTPStatus.NOT_FOUND)
+ raise DbException(
+ "Not found any {} with _id='{}'".format(table[:-1], _id),
+ HTTPStatus.NOT_FOUND,
+ )
return None
return {"replaced": rows.modified_count}
except Exception as e: # TODO refine