class DbMemory(DbBase):
- def __init__(self, logger_name='db'):
- super().__init__(logger_name)
+ def __init__(self, logger_name='db', lock=False):
+ super().__init__(logger_name, lock)
self.db = {}
def db_connect(self, config):
"""
if "logger_name" in config:
self.logger = logging.getLogger(config["logger_name"])
- self.master_password = config.get("masterpassword")
+ master_key = config.get("commonkey") or config.get("masterpassword")
+ if master_key:
+ self.set_secret_key(master_key)
@staticmethod
def _format_filter(q_filter):
"""
try:
result = []
- for _, row in self._find(table, self._format_filter(q_filter)):
- result.append(deepcopy(row))
+ with self.lock:
+ for _, row in self._find(table, self._format_filter(q_filter)):
+ result.append(deepcopy(row))
return result
except DbException:
raise
"""
try:
result = None
- for _, row in self._find(table, self._format_filter(q_filter)):
- if not fail_on_more:
- return deepcopy(row)
- if result:
- raise DbException("Found more than one entry with filter='{}'".format(q_filter),
- HTTPStatus.CONFLICT.value)
- result = row
+ with self.lock:
+ for _, row in self._find(table, self._format_filter(q_filter)):
+ if not fail_on_more:
+ return deepcopy(row)
+ if result:
+ raise DbException("Found more than one entry with filter='{}'".format(q_filter),
+ HTTPStatus.CONFLICT.value)
+ result = row
if not result and fail_on_empty:
raise DbException("Not found entry with filter='{}'".format(q_filter), HTTPStatus.NOT_FOUND)
return deepcopy(result)
"""
try:
id_list = []
- for i, _ in self._find(table, self._format_filter(q_filter)):
- id_list.append(i)
+ with self.lock:
+ for i, _ in self._find(table, self._format_filter(q_filter)):
+ id_list.append(i)
deleted = len(id_list)
for i in reversed(id_list):
del self.db[table][i]
:return: Dict with the number of entries deleted
"""
try:
- for i, _ in self._find(table, self._format_filter(q_filter)):
- break
- else:
- if fail_on_empty:
- raise DbException("Not found entry with filter='{}'".format(q_filter), HTTPStatus.NOT_FOUND)
- return None
- del self.db[table][i]
+ with self.lock:
+ for i, _ in self._find(table, self._format_filter(q_filter)):
+ break
+ else:
+ if fail_on_empty:
+ raise DbException("Not found entry with filter='{}'".format(q_filter), HTTPStatus.NOT_FOUND)
+ return None
+ del self.db[table][i]
return {"deleted": 1}
except Exception as e: # TODO refine
raise DbException(str(e))
:return: Dict with the number of entries replaced
"""
try:
- for i, _ in self._find(table, self._format_filter({"_id": _id})):
- break
- else:
- if fail_on_empty:
- raise DbException("Not found entry with _id='{}'".format(_id), HTTPStatus.NOT_FOUND)
- return None
- self.db[table][i] = deepcopy(indata)
+ with self.lock:
+ for i, _ in self._find(table, self._format_filter({"_id": _id})):
+ break
+ else:
+ if fail_on_empty:
+ raise DbException("Not found entry with _id='{}'".format(_id), HTTPStatus.NOT_FOUND)
+ return None
+ self.db[table][i] = deepcopy(indata)
return {"updated": 1}
except DbException:
raise
if not id:
id = str(uuid4())
indata["_id"] = id
- if table not in self.db:
- self.db[table] = []
- self.db[table].append(deepcopy(indata))
+ with self.lock:
+ if table not in self.db:
+ self.db[table] = []
+ self.db[table].append(deepcopy(indata))
return id
except Exception as e: # TODO refine
raise DbException(str(e))