--- /dev/null
+from dbbase import DbException, dbbase
+from http import HTTPStatus
+from uuid import uuid4
+from copy import deepcopy
+
+
+class dbmemory(dbbase):
+
+ def __init__(self):
+ self.db = {}
+
+ @staticmethod
+ def _format_filter(filter):
+ return filter # TODO
+
+ def _find(self, table, filter):
+ for i, row in enumerate(self.db.get(table, ())):
+ match = True
+ if filter:
+ for k, v in filter.items():
+ if k not in row or v != row[k]:
+ match = False
+ if match:
+ yield i, row
+
+ def get_list(self, table, filter={}):
+ try:
+ l = []
+ for _, row in self._find(table, self._format_filter(filter)):
+ l.append(deepcopy(row))
+ return l
+ except DbException:
+ raise
+ except Exception as e: # TODO refine
+ raise DbException(str(e))
+
+ def get_one(self, table, filter={}, fail_on_empty=True, fail_on_more=True):
+ try:
+ l = None
+ for _, row in self._find(table, self._format_filter(filter)):
+ if not fail_on_more:
+ return deepcopy(row)
+ if l:
+ raise DbException("Found more than one entry with filter='{}'".format(filter),
+ HTTPStatus.CONFLICT.value)
+ l = row
+ if not l and fail_on_empty:
+ raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND.value)
+ return deepcopy(l)
+ except Exception as e: # TODO refine
+ raise DbException(str(e))
+
+ def del_list(self, table, filter={}):
+ try:
+ id_list = []
+ for i, _ in self._find(table, self._format_filter(filter)):
+ id_list.append(i)
+ deleted = len(id_list)
+ for i in id_list:
+ del self.db[table][i]
+ return {"deleted": deleted}
+ except DbException:
+ raise
+ except Exception as e: # TODO refine
+ raise DbException(str(e))
+
+ def del_one(self, table, filter={}, fail_on_empty=True):
+ try:
+ for i, _ in self._find(table, self._format_filter(filter)):
+ break
+ else:
+ if fail_on_empty:
+ raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND.value)
+ return None
+ del self.db[table][i]
+ return {"deleted": 1}
+ except Exception as e: # TODO refine
+ raise DbException(str(e))
+
+ def replace(self, table, filter, indata, fail_on_empty=True):
+ try:
+ for i, _ in self._find(table, self._format_filter(filter)):
+ break
+ else:
+ if fail_on_empty:
+ raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND.value)
+ return None
+ self.db[table][i] = deepcopy(indata)
+ return {"upadted": 1}
+ except Exception as e: # TODO refine
+ raise DbException(str(e))
+
+ def create(self, table, indata):
+ try:
+ id = indata.get("_id")
+ if not id:
+ id = str(uuid4())
+ indata["_id"] = id
+ if table not in self.db:
+ self.db[table] = []
+ self.db[table].append(deepcopy(indata))
+ return id
+ except Exception as e: # TODO refine
+ raise DbException(str(e))
+
+
+if __name__ == '__main__':
+ # some test code
+ db = dbmemory()
+ db.create("test", {"_id": 1, "data": 1})
+ db.create("test", {"_id": 2, "data": 2})
+ db.create("test", {"_id": 3, "data": 3})
+ print("must be 3 items:", db.get_list("test"))
+ print("must return item 2:", db.get_list("test", {"_id": 2}))
+ db.del_one("test", {"_id": 2})
+ print("must be emtpy:", db.get_list("test", {"_id": 2}))