initial commit

Change-Id: Ia40148fdc2cabbbacb0b67aaed8442ed0ecf0bc2
Signed-off-by: tierno <alfonso.tiernosepulveda@telefonica.com>
diff --git a/osm_common/__init__.py b/osm_common/__init__.py
new file mode 100644
index 0000000..df7b893
--- /dev/null
+++ b/osm_common/__init__.py
@@ -0,0 +1,2 @@
+version = '0.1.3'  # version string of the osm_common package
+date_version = '2018-04-19'  # date string that accompanies the version above
\ No newline at end of file
diff --git a/osm_common/dbbase.py b/osm_common/dbbase.py
new file mode 100644
index 0000000..aa9c24e
--- /dev/null
+++ b/osm_common/dbbase.py
@@ -0,0 +1,37 @@
+from http import HTTPStatus
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+
+class DbException(Exception):
+    """Database error: the text is prefixed with "database exception " and an HTTP status is attached."""
+    def __init__(self, message, http_code=HTTPStatus.NOT_FOUND):
+        super().__init__("database exception " + message)
+        self.http_code = http_code  # status made available to whoever catches the error
+
+
+class DbBase(object):
+    """Interface shared by the database back-ends; every method here is a placeholder returning None."""
+    def __init__(self):
+        """Nothing to set up in the base class."""
+
+    def db_connect(self, config):
+        """Open the database described by the *config* dict (placeholder)."""
+
+    def db_disconnect(self):
+        """Close the database connection (placeholder)."""
+
+    def get_list(self, table, filter={}):
+        """Return the entries of *table* that match *filter* (placeholder)."""
+
+    def get_one(self, table, filter={}, fail_on_empty=True, fail_on_more=True):
+        """Return a single entry of *table* that matches *filter* (placeholder)."""
+
+    def create(self, table, indata):
+        """Store *indata* as a new entry of *table* (placeholder)."""
+
+    def del_list(self, table, filter={}):
+        """Delete the entries of *table* that match *filter* (placeholder)."""
+
+    def del_one(self, table, filter={}, fail_on_empty=True):
+        """Delete one entry of *table* that matches *filter* (placeholder)."""
diff --git a/osm_common/dbmemory.py b/osm_common/dbmemory.py
new file mode 100644
index 0000000..cdb0482
--- /dev/null
+++ b/osm_common/dbmemory.py
@@ -0,0 +1,124 @@
+import logging
+from dbbase import DbException, DbBase
+from http import HTTPStatus
+from uuid import uuid4
+from copy import deepcopy
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+
+class DbMemory(DbBase):
+    """DbBase back-end kept in memory: self.db maps every table name to a list of row dicts."""
+    def __init__(self, logger_name='db'):
+        self.logger = logging.getLogger(logger_name)
+        self.db = {}
+
+    def db_connect(self, config):
+        """Take the optional "logger_name" from *config*; there is no server to connect to."""
+        if "logger_name" in config:
+            self.logger = logging.getLogger(config["logger_name"])
+
+    @staticmethod
+    def _format_filter(filter):
+        return filter    # TODO
+
+    def _find(self, table, filter):
+        """Yield (index, row) for each row of *table* that holds every key/value pair of *filter*."""
+        for i, row in enumerate(self.db.get(table, ())):
+            if not filter or all(k in row and row[k] == v for k, v in filter.items()):
+                yield i, row
+
+    def get_list(self, table, filter={}):
+        """Return deep copies of the rows of *table* matching *filter*; an empty filter matches all rows."""
+        try:
+            return [deepcopy(row) for _, row in self._find(table, self._format_filter(filter))]
+        except DbException:
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def get_one(self, table, filter={}, fail_on_empty=True, fail_on_more=True):
+        """Return a deep copy of the matching row (None if there is none and fail_on_empty is false)."""
+        try:
+            found = None
+            for _, row in self._find(table, self._format_filter(filter)):
+                if not fail_on_more:
+                    return deepcopy(row)
+                if found is not None:
+                    raise DbException("Found more than one entry with filter='{}'".format(filter),
+                                      HTTPStatus.CONFLICT)
+                found = row
+            if found is None and fail_on_empty:
+                raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND)
+            return deepcopy(found)
+        except DbException:  # re-raise untouched so the message and http_code set above are kept
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def del_list(self, table, filter={}):
+        """Delete all rows of *table* matching *filter* and return {"deleted": <number of rows>}."""
+        try:
+            id_list = [i for i, _ in self._find(table, self._format_filter(filter))]
+            # delete from the highest index down: removing a row shifts the ones after it
+            for i in reversed(id_list):
+                del self.db[table][i]
+            return {"deleted": len(id_list)}
+        except DbException:
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def del_one(self, table, filter={}, fail_on_empty=True):
+        """Delete the first row matching *filter*; return {"deleted": 1} (None if absent and allowed)."""
+        try:
+            for i, _ in self._find(table, self._format_filter(filter)):
+                break
+            else:
+                if fail_on_empty:
+                    raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND)
+                return None
+            del self.db[table][i]
+            return {"deleted": 1}
+        except DbException:  # keep the NOT_FOUND error as raised
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def replace(self, table, filter, indata, fail_on_empty=True):
+        """Replace the first row matching *filter* by a deep copy of *indata*; return {"updated": 1} or None."""
+        try:
+            for i, _ in self._find(table, self._format_filter(filter)):
+                break
+            else:
+                if fail_on_empty:
+                    raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND)
+                return None
+            self.db[table][i] = deepcopy(indata)
+            return {"updated": 1}
+        except DbException:  # keep the NOT_FOUND error as raised
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def create(self, table, indata):
+        """Append a deep copy of *indata* to *table* and return its "_id" (a new uuid4 string if it had none)."""
+        try:
+            if not indata.get("_id"):
+                indata["_id"] = str(uuid4())  # the caller's dict receives the generated id as well
+            self.db.setdefault(table, []).append(deepcopy(indata))
+            return indata["_id"]
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+
+if __name__ == '__main__':
+    # some test code
+    db = DbMemory()  # smoke test of the in-memory back-end
+    db.create("test", {"_id": 1, "data": 1})
+    db.create("test", {"_id": 2, "data": 2})
+    db.create("test", {"_id": 3, "data": 3})
+    print("must be 3 items:", db.get_list("test"))
+    print("must return item 2:", db.get_list("test", {"_id": 2}))
+    db.del_one("test", {"_id": 2})
+    print("must be empty:", db.get_list("test", {"_id": 2}))
diff --git a/osm_common/dbmongo.py b/osm_common/dbmongo.py
new file mode 100644
index 0000000..582773a
--- /dev/null
+++ b/osm_common/dbmongo.py
@@ -0,0 +1,191 @@
+
+import logging
+from pymongo import MongoClient, errors
+from dbbase import DbException, DbBase
+from http import HTTPStatus
+from time import time, sleep
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+# TODO consider use this decorator for database access retries
+# @retry_mongocall
+# def retry_mongocall(call):
+#     def _retry_mongocall(*args, **kwargs):
+#         retry = 1
+#         while True:
+#             try:
+#                 return call(*args, **kwargs)
+#             except pymongo.AutoReconnect as e:
+#                 if retry == 4:
+#                     raise DbException(str(e))
+#                 sleep(retry)
+#     return _retry_mongocall
+
+
+class DbMongo(DbBase):
+    """DbBase back-end on top of pymongo; every *table* argument is used as a collection name."""
+    conn_initial_timout = 120  # seconds that db_connect() keeps retrying its first query
+    conn_timout = 10
+
+    def __init__(self, logger_name='db'):
+        self.logger = logging.getLogger(logger_name)
+
+    def db_connect(self, config):
+        """Create the client from config "host", "port" and "name", then wait until a test query succeeds."""
+        try:
+            if "logger_name" in config:
+                self.logger = logging.getLogger(config["logger_name"])
+            self.client = MongoClient(config["host"], config["port"])
+            self.db = self.client[config["name"]]
+            if "loglevel" in config:
+                self.logger.setLevel(getattr(logging, config['loglevel']))
+            # get data to try a connection
+            now = time()
+            while True:
+                try:
+                    self.db.users.find_one({"username": "admin"})
+                    return
+                except errors.ConnectionFailure as e:
+                    if time() - now >= self.conn_initial_timout:
+                        raise
+                    self.logger.info("Waiting to database up {}".format(e))
+                    sleep(2)
+        except errors.PyMongoError as e:
+            raise DbException(str(e))
+
+    def db_disconnect(self):
+        pass  # TODO
+
+    @staticmethod
+    def _format_filter(filter):
+        """Translate a query-string style filter ("key[.operator]": value) into a pymongo filter dict.
+        Operators: eq (default), ne/neq, gt, gte, lt, lte, cont, ncont. Raises DbException (BAD_REQUEST)."""
+        query_k = v = None  # defined up front so the error message below can always be built
+        try:
+            db_filter = {}
+            for query_k, query_v in filter.items():
+                dot_index = query_k.rfind(".")  # "> 0" below: in "a.gt" the dot is at index 1
+                suffix = query_k[dot_index+1:]
+                if dot_index > 0 and suffix in ("eq", "ne", "neq", "gt", "gte", "lt", "lte", "cont", "ncont"):
+                    operator = "$ne" if suffix == "neq" else "$" + suffix
+                    k = query_k[:dot_index]
+                else:
+                    operator = "$eq"
+                    k = query_k
+
+                v = query_v
+                if isinstance(v, list):
+                    if operator in ("$eq", "$cont"):
+                        operator = "$in"
+                    elif operator in ("$ne", "$ncont"):
+                        operator = "$nin"
+                    else:
+                        v = ",".join(query_v)
+
+                if operator in ("$eq", "$cont"):
+                    # v cannot be a list, because operator would have been changed to $in
+                    db_filter[k] = v
+                elif operator == "$ncont":
+                    # v cannot be a list, because operator would have been changed to $nin
+                    db_filter[k] = {"$ne": v}
+                else:
+                    # maybe db_filter[k] exist. e.g. in the query string for values between 5 and 8: "a.gt=5&a.lt=8"
+                    db_filter.setdefault(k, {})[operator] = v
+
+            return db_filter
+        except Exception as e:
+            raise DbException("Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e),
+                              http_code=HTTPStatus.BAD_REQUEST)
+
+    def get_list(self, table, filter={}):
+        """Return the list of documents of *table* that match *filter* (see _format_filter for its syntax)."""
+        try:
+            return list(self.db[table].find(self._format_filter(filter)))
+        except DbException:
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def get_one(self, table, filter={}, fail_on_empty=True, fail_on_more=True):
+        """Return one document of *table* matching *filter*, or None if there is none and not fail_on_empty.
+        Raises DbException: NOT_FOUND if nothing matches (fail_on_empty), CONFLICT if several do (fail_on_more)."""
+        try:
+            if filter:
+                filter = self._format_filter(filter)
+            rows = list(self.db[table].find(filter).limit(2))  # 2 documents suffice to detect duplicates
+            if not rows:
+                if fail_on_empty:
+                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter),
+                                      HTTPStatus.NOT_FOUND)
+                return None
+            if len(rows) > 1 and fail_on_more:
+                raise DbException("Found more than one {} with filter='{}'".format(table[:-1], filter),
+                                  HTTPStatus.CONFLICT)
+            return rows[0]
+        except DbException:  # keep the http_code chosen above
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def del_list(self, table, filter={}):
+        """Delete every document of *table* matching *filter*; return {"deleted": <count>}."""
+        try:
+            rows = self.db[table].delete_many(self._format_filter(filter))
+            return {"deleted": rows.deleted_count}
+        except DbException:
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def del_one(self, table, filter={}, fail_on_empty=True):
+        """Delete one document matching *filter*; return {"deleted": 1} (None if absent and allowed)."""
+        try:
+            rows = self.db[table].delete_one(self._format_filter(filter))
+            if rows.deleted_count == 0:
+                if fail_on_empty:
+                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter),
+                                      HTTPStatus.NOT_FOUND)
+                return None
+            return {"deleted": rows.deleted_count}
+        except DbException:
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def create(self, table, indata):
+        """Insert *indata* as a new document of *table* and return its inserted_id."""
+        try:
+            return self.db[table].insert_one(indata).inserted_id
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def set_one(self, table, filter, update_dict, fail_on_empty=True):
+        """Apply {"$set": update_dict} to one document matching *filter*; return {"modified": <count>} or None."""
+        try:
+            rows = self.db[table].update_one(self._format_filter(filter), {"$set": update_dict})
+            if rows.matched_count == 0:  # UpdateResult offers matched_count/modified_count
+                if fail_on_empty:
+                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter),
+                                      HTTPStatus.NOT_FOUND)
+                return None
+            return {"modified": rows.modified_count}
+        except DbException:
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def replace(self, table, id, indata, fail_on_empty=True):
+        """Replace the document whose "_id" is *id* by *indata*; return {"replace": <modified count>} or None."""
+        try:
+            _filter = {"_id": id}
+            rows = self.db[table].replace_one(_filter, indata)
+            if rows.matched_count == 0:
+                if fail_on_empty:
+                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], _filter),
+                                      HTTPStatus.NOT_FOUND)
+                return None
+            return {"replace": rows.modified_count}
+        except DbException:
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
diff --git a/osm_common/fsbase.py b/osm_common/fsbase.py
new file mode 100644
index 0000000..7b6cd0c
--- /dev/null
+++ b/osm_common/fsbase.py
@@ -0,0 +1,43 @@
+
+from http import HTTPStatus
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+
+class FsException(Exception):
+    def __init__(self, message, http_code=HTTPStatus.INTERNAL_SERVER_ERROR):
+        super().__init__("storage exception " + message)  # every message gets the same prefix
+        self.http_code = http_code  # HTTP status kept for whoever handles the error
+
+
+class FsBase(object):  # interface of the storage back-ends; apart from get_params all methods return None
+    def __init__(self):
+        """Nothing to set up in the base class."""
+
+    def get_params(self):
+        return dict()  # no parameters to report at this level
+
+    def fs_connect(self, config):
+        """Prepare the storage described by the *config* dict (placeholder)."""
+
+    def fs_disconnect(self):
+        """Release the storage (placeholder)."""
+
+    def mkdir(self, folder):
+        """Create *folder* in the storage (placeholder)."""
+
+    def file_exists(self, storage):
+        """Tell whether *storage* exists (placeholder)."""
+
+    def file_size(self, storage):
+        """Return the size of *storage* (placeholder)."""
+
+    def file_extract(self, tar_object, path):
+        """Extract *tar_object* into *path* (placeholder)."""
+
+    def file_open(self, storage, mode):
+        """Open *storage* with the given *mode* (placeholder)."""
+
+    def file_delete(self, storage, ignore_non_exist=False):
+        """Delete *storage* (placeholder)."""
+
diff --git a/osm_common/fslocal.py b/osm_common/fslocal.py
new file mode 100644
index 0000000..b7dd839
--- /dev/null
+++ b/osm_common/fslocal.py
@@ -0,0 +1,142 @@
+import os
+import logging
+import tarfile
+from http import HTTPStatus
+from shutil import rmtree
+from fsbase import FsBase, FsException
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+
+class FsLocal(FsBase):
+    """
+    FsBase back-end on a local folder: every storage name is resolved below self.path
+    """
+
+    def __init__(self, logger_name='fs'):
+        self.logger = logging.getLogger(logger_name)
+        self.path = None  # stays None until fs_connect() is called
+
+    @staticmethod
+    def _join(storage):
+        """Return *storage* unchanged if it is a str, otherwise its items joined with "/"."""
+        return storage if isinstance(storage, str) else "/".join(storage)
+
+    def get_params(self):
+        """Return the storage type ("local") and the base path in a dict."""
+        return {"fs": "local", "path": self.path}
+
+    def fs_connect(self, config):
+        """
+        Take the base folder from config["path"]; a trailing "/" is appended if missing
+        :param config: dict with "path" and, optionally, "logger_name"
+        :return: None or raises FsException, e.g. when the folder does not exist
+        """
+        try:
+            if "logger_name" in config:
+                self.logger = logging.getLogger(config["logger_name"])
+            self.path = config["path"]
+            if not self.path.endswith("/"):
+                self.path += "/"
+            if not os.path.exists(self.path):
+                raise FsException("Invalid configuration param at '[storage]': path '{}' does not exist".format(
+                    config["path"]))
+        except FsException:
+            raise
+        except Exception as e:  # TODO refine
+            raise FsException(str(e))
+
+    def fs_disconnect(self):
+        pass  # TODO
+
+    def mkdir(self, folder):
+        """
+        Creates a folder or parent object location
+        :param folder: folder name, relative to the base path
+        :return: None or raises an exception
+        """
+        try:
+            os.mkdir(self.path + folder)
+        except Exception as e:
+            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+
+    def file_exists(self, storage, mode=None):
+        """
+        Indicates if "storage" file exist
+        :param storage: can be a str or a str list
+        :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
+        :return: True, False
+        """
+        f = self.path + self._join(storage)
+        if mode == "file":
+            return os.path.isfile(f)
+        if mode == "dir":
+            return os.path.isdir(f)
+        # without mode any kind of entry counts; an unknown mode never matches
+        return mode is None and os.path.exists(f)
+
+    def file_size(self, storage):
+        """
+        return file size
+        :param storage: can be a str or a str list
+        :return: file size
+        """
+        return os.path.getsize(self.path + self._join(storage))
+
+    def file_extract(self, tar_object, path):
+        """
+        extract a tar file
+        :param tar_object: object of type tar
+        :param path: can be a str or a str list; folder (below the base path) where to extract the tar_object
+        :return: None
+        """
+        # NOTE(review): extractall() trusts the member names of the archive, so a tar coming from an
+        # untrusted source can write outside the target folder; validate its members before calling this
+        tar_object.extractall(path=self.path + self._join(path))
+
+    def file_open(self, storage, mode):
+        """
+        Open a file
+        :param storage: can be a str or list of str
+        :param mode: file mode
+        :return: file object
+        :raises FsException: NOT_FOUND if the file is missing, BAD_REQUEST on any other I/O error
+        """
+        f = self._join(storage)
+        try:
+            return open(self.path + f, mode)
+        except FileNotFoundError:
+            raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
+        except IOError:
+            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)
+
+    def dir_ls(self, storage):
+        """
+        return folder content
+        :param storage: can be a str or list of str
+        :return: folder content
+        :raises FsException: NOT_FOUND if it is missing or not a folder, BAD_REQUEST on any other I/O error
+        """
+        f = self._join(storage)
+        try:
+            return os.listdir(self.path + f)
+        except (FileNotFoundError, NotADirectoryError):
+            raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
+        except IOError:
+            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)
+
+    def file_delete(self, storage, ignore_non_exist=False):
+        """
+        Delete storage content recursively
+        :param storage: can be a str or list of str
+        :param ignore_non_exist: not raise exception if storage does not exist
+        :return: None
+        :raises FsException: NOT_FOUND if storage does not exist and ignore_non_exist is false
+        """
+        f = self.path + self._join(storage)
+        if os.path.isdir(f):
+            rmtree(f)
+        elif os.path.exists(f):
+            os.remove(f)  # rmtree() only accepts directories
+        elif not ignore_non_exist:
+            raise FsException("File {} does not exist".format(storage), http_code=HTTPStatus.NOT_FOUND)
diff --git a/osm_common/msgbase.py b/osm_common/msgbase.py
new file mode 100644
index 0000000..25e8c80
--- /dev/null
+++ b/osm_common/msgbase.py
@@ -0,0 +1,47 @@
+
+import asyncio
+from http import HTTPStatus
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+
+class MsgException(Exception):
+    """
+    Common exception type of the msgXXXX modules
+    """
+
+    def __init__(self, message, http_code=HTTPStatus.INTERNAL_SERVER_ERROR):
+        """
+        Build the exception; its text is "messaging exception " followed by message
+        :param message: descriptive text
+        :param http_code: <http.HTTPStatus> member; it provides ".value" (numeric code) and ".name" (its name)
+        """
+        super().__init__("messaging exception " + message)
+        self.http_code = http_code
+
+
+class MsgBase(object):
+    """
+    Interface shared by the msgXXXX classes; every method here is a placeholder returning None
+    """
+
+    def __init__(self):
+        """Nothing to set up in the base class."""
+
+    def connect(self, config):
+        """Prepare the message bus described by the *config* dict (placeholder)."""
+
+    def disconnect(self):
+        """Release the message bus (placeholder)."""
+
+    def write(self, topic, key, msg):
+        """Publish *msg* with *key* on *topic* (placeholder)."""
+
+    def read(self, topic):
+        """Read a message from *topic* (placeholder)."""
+
+    async def aiowrite(self, topic, key, msg, loop):
+        """Asyncio variant of write() (placeholder)."""
+
+    async def aioread(self, topic, loop):
+        """Asyncio variant of read() (placeholder)."""
diff --git a/osm_common/msgkafka.py b/osm_common/msgkafka.py
new file mode 100644
index 0000000..c819c81
--- /dev/null
+++ b/osm_common/msgkafka.py
@@ -0,0 +1,107 @@
+import logging
+import asyncio
+import yaml
+from aiokafka import AIOKafkaConsumer
+from aiokafka import AIOKafkaProducer
+from aiokafka.errors import KafkaError
+from msgbase import MsgBase, MsgException
+#import json
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>, " \
+             "Guillermo Calvino <guillermo.calvinosanchez@altran.com>"
+class MsgKafka(MsgBase):
+    def __init__(self, logger_name='msg'):
+        self.logger = logging.getLogger(logger_name)
+        self.host = None
+        self.port = None
+        self.consumer = None
+        self.producer = None
+        self.loop = None
+        self.broker = None  # "host:port" string, set by connect()
+
+    def connect(self, config):
+        """Store host and port from *config*, build the "host:port" broker address and get the event loop."""
+        try:
+            if "logger_name" in config:
+                self.logger = logging.getLogger(config["logger_name"])
+            self.host = config["host"]
+            self.port = config["port"]
+            self.loop = asyncio.get_event_loop()
+            self.broker = str(self.host) + ":" + str(self.port)
+        except Exception as e:  # TODO refine
+            raise MsgException(str(e))
+
+    def disconnect(self):
+        """Close the event loop obtained in connect()."""
+        try:
+            self.loop.close()
+        except Exception as e:  # TODO refine
+            raise MsgException(str(e))
+
+    def write(self, topic, key, msg):
+        """Blocking wrapper of aiowrite(): *msg* is serialised with yaml.safe_dump before being sent."""
+        try:
+            self.loop.run_until_complete(self.aiowrite(topic=topic, key=key,
+                                                       msg=yaml.safe_dump(msg, default_flow_style=True),
+                                                       loop=self.loop))
+        except Exception as e:
+            raise MsgException("Error writing {} topic: {}".format(topic, str(e)))
+
+    def read(self, topic):
+        """
+        Read from one or several topics. It blocks until aioread() returns a message
+        :param topic: can be str: single topic; or str list: several topics
+        :return: topic, key, message
+        """
+        try:
+            return self.loop.run_until_complete(self.aioread(topic, self.loop))
+        except MsgException:
+            raise
+        except Exception as e:
+            raise MsgException("Error reading {} topic: {}".format(topic, str(e)))
+
+    async def aiowrite(self, topic, key, msg, loop=None):
+        """Send *msg* (str) with *key* (str) to *topic* through a producer created for this call only."""
+        if not loop:
+            loop = self.loop
+        producer = None
+        try:
+            producer = self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode,
+                                                        value_serializer=str.encode, bootstrap_servers=self.broker)
+            await producer.start()
+            await producer.send(topic=topic, key=key, value=msg)
+        except Exception as e:
+            raise MsgException("Error publishing to {} topic: {}".format(topic, str(e)))
+        finally:
+            if producer is not None:  # the constructor itself may have failed
+                await producer.stop()
+
+    async def aioread(self, topic, loop=None, callback=None, *args):
+        """
+        Asyncio read from one or several topics. It blocks
+        :param topic: can be str: single topic; or str list: several topics
+        :param loop: asyncio loop
+        :param callback: if given, it is called as callback(topic, key, message, *args) for every message read
+        :param args: optional arguments for callback function
+        :return: topic, key, message of the first message read when no callback is given
+        """
+        if not loop:
+            loop = self.loop
+        consumer = None
+        try:
+            topic_list = topic if isinstance(topic, (list, tuple)) else (topic,)
+            consumer = self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker)
+            await consumer.start()
+            consumer.subscribe(topic_list)
+            async for message in consumer:
+                # safe_load: the content comes from the bus; bare yaml.load may build arbitrary objects
+                content = message.topic, yaml.safe_load(message.key), yaml.safe_load(message.value)
+                if not callback:
+                    return content
+                callback(*content, *args)
+        except KafkaError as e:
+            raise MsgException(str(e))
+        finally:
+            if consumer is not None:
+                await consumer.stop()
+
diff --git a/osm_common/msglocal.py b/osm_common/msglocal.py
new file mode 100644
index 0000000..c774f85
--- /dev/null
+++ b/osm_common/msglocal.py
@@ -0,0 +1,111 @@
+import logging
+import os
+import yaml
+import asyncio
+from msgbase import MsgBase, MsgException
+from time import sleep
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+"""
+This emulates a kafka bus by just using a shared file system. Useful for testing or devops.
+One file is used per topic. Only one producer and one consumer are allowed per topic. Both consumer and producer
+access the same file, e.g. the same volume if running with docker.
+One text line per message is used in yaml format.
+"""
+
+class MsgLocal(MsgBase):
+    """MsgBase back-end that keeps one file per topic under self.path, one yaml line per message."""
+    def __init__(self, logger_name='msg'):
+        self.logger = logging.getLogger(logger_name)
+        self.path = None
+        # create a different file for each topic
+        self.files = {}
+        self.buffer = {}
+
+    def connect(self, config):
+        """Take the folder from config["path"] (created if missing) and the optional "logger_name"."""
+        try:
+            if "logger_name" in config:
+                self.logger = logging.getLogger(config["logger_name"])
+            self.path = config["path"]
+            if not self.path.endswith("/"):
+                self.path += "/"
+            if not os.path.exists(self.path):
+                os.mkdir(self.path)
+        except MsgException:
+            raise
+        except Exception as e:  # TODO refine
+            raise MsgException(str(e))
+
+    def disconnect(self):
+        """Close every topic file opened so far; errors while closing are ignored (best effort)."""
+        for f in self.files.values():
+            try:
+                f.close()
+            except Exception:  # TODO refine
+                pass
+
+    def write(self, topic, key, msg):
+        """
+        Insert a message into topic
+        :param topic: topic
+        :param key: key text to be inserted
+        :param msg: value object to be inserted, can be str, object ...
+        :return: None or raises an exception
+        """
+        try:
+            if topic not in self.files:
+                self.files[topic] = open(self.path + topic, "a+")
+            yaml.safe_dump({key: msg}, self.files[topic], default_flow_style=True, width=20000)
+            self.files[topic].flush()
+        except Exception as e:  # TODO refine
+            raise MsgException(str(e))
+
+    def read(self, topic, blocks=True):
+        """
+        Read from one or several topics. It waits for a message unless blocks is False
+        :param topic: can be str: single topic; or str list: several topics
+        :param blocks: indicates if it should wait and block until a message is present or returns None
+        :return: topic, key, message; or None if blocks==False and no message is available
+        """
+        try:
+            topic_list = topic if isinstance(topic, (list, tuple)) else (topic, )
+            while True:
+                for single_topic in topic_list:
+                    if single_topic not in self.files:
+                        self.files[single_topic] = open(self.path + single_topic, "a+")
+                    self.buffer.setdefault(single_topic, "")  # write() opens files without creating a buffer
+                    self.buffer[single_topic] += self.files[single_topic].readline()
+                    if not self.buffer[single_topic].endswith("\n"):
+                        continue
+                    # safe_load pairs with the safe_dump used by write() and builds only plain objects
+                    msg_dict = yaml.safe_load(self.buffer[single_topic])
+                    self.buffer[single_topic] = ""
+                    assert len(msg_dict) == 1
+                    for k, v in msg_dict.items():
+                        return single_topic, k, v
+                if not blocks:
+                    return None
+                sleep(2)
+        except Exception as e:  # TODO refine
+            raise MsgException(str(e))
+
+    async def aioread(self, topic, loop):
+        """
+        Asyncio read from one or several topics. It blocks
+        :param topic: can be str: single topic; or str list: several topics
+        :param loop: asyncio loop; accepted for interface compatibility but not used here
+        :return: topic, key, message
+        """
+        try:
+            while True:
+                msg = self.read(topic, blocks=False)
+                if msg:
+                    return msg
+                await asyncio.sleep(2)  # sleep() lost its "loop" argument in Python 3.10
+        except MsgException:
+            raise
+        except Exception as e:  # TODO refine
+            raise MsgException(str(e))
+