initial commit
Change-Id: Ia40148fdc2cabbbacb0b67aaed8442ed0ecf0bc2
Signed-off-by: tierno <alfonso.tiernosepulveda@telefonica.com>
diff --git a/osm_common/__init__.py b/osm_common/__init__.py
new file mode 100644
index 0000000..df7b893
--- /dev/null
+++ b/osm_common/__init__.py
@@ -0,0 +1,2 @@
+# Version string of the osm_common package
+version = '0.1.3'
+# Date string that accompanies the version above
+date_version = '2018-04-19'
\ No newline at end of file
diff --git a/osm_common/dbbase.py b/osm_common/dbbase.py
new file mode 100644
index 0000000..aa9c24e
--- /dev/null
+++ b/osm_common/dbbase.py
@@ -0,0 +1,37 @@
+from http import HTTPStatus
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+
+class DbException(Exception):
+    """
+    Exception raised by the database classes
+    :param message: descriptive text (str). It is appended to the "database exception " prefix
+    :param http_code: status stored at the "http_code" attribute of the exception
+    """
+
+    def __init__(self, message, http_code=HTTPStatus.NOT_FOUND):
+        prefixed_text = "database exception " + message
+        super().__init__(prefixed_text)
+        self.http_code = http_code
+
+
+class DbBase(object):
+    """
+    Base class of the database back-ends. Every method here is a placeholder that does nothing and returns None;
+    the concrete classes override them.
+    The "filter" arguments default to None instead of {} so that a single dict object is not shared by all the calls.
+    """
+
+    def __init__(self):
+        pass
+
+    def db_connect(self, config):
+        """
+        Connect to the database
+        :param config: configuration used by the concrete class
+        :return: None
+        """
+        pass
+
+    def db_disconnect(self):
+        """
+        Disconnect from the database
+        :return: None
+        """
+        pass
+
+    def get_list(self, table, filter=None):
+        """
+        Obtain the entries of a table that match a filter
+        :param table: table name
+        :param filter: dict with the conditions to match; None (default) means no condition
+        :return: None at this base class
+        """
+        pass
+
+    def get_one(self, table, filter=None, fail_on_empty=True, fail_on_more=True):
+        """
+        Obtain one entry of a table that matches a filter
+        :param table: table name
+        :param filter: dict with the conditions to match; None (default) means no condition
+        :param fail_on_empty: flag for the case that nothing matches
+        :param fail_on_more: flag for the case that several entries match
+        :return: None at this base class
+        """
+        pass
+
+    def create(self, table, indata):
+        """
+        Add a new entry to a table
+        :param table: table name
+        :param indata: content to store
+        :return: None at this base class
+        """
+        pass
+
+    def del_list(self, table, filter=None):
+        """
+        Delete the entries of a table that match a filter
+        :param table: table name
+        :param filter: dict with the conditions to match; None (default) means no condition
+        :return: None at this base class
+        """
+        pass
+
+    def del_one(self, table, filter=None, fail_on_empty=True):
+        """
+        Delete one entry of a table that matches a filter
+        :param table: table name
+        :param filter: dict with the conditions to match; None (default) means no condition
+        :param fail_on_empty: flag for the case that nothing matches
+        :return: None at this base class
+        """
+        pass
diff --git a/osm_common/dbmemory.py b/osm_common/dbmemory.py
new file mode 100644
index 0000000..cdb0482
--- /dev/null
+++ b/osm_common/dbmemory.py
@@ -0,0 +1,124 @@
+import logging
+from dbbase import DbException, DbBase
+from http import HTTPStatus
+from uuid import uuid4
+from copy import deepcopy
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+
+class DbMemory(DbBase):
+    """
+    DbBase implementation that keeps everything in memory: self.db maps each table name to a list of rows (dicts).
+    Rows are deep-copied when stored and when returned, so callers never share objects with the database.
+    """
+
+    def __init__(self, logger_name='db'):
+        self.logger = logging.getLogger(logger_name)
+        # table name -> list of stored rows
+        self.db = {}
+
+    def db_connect(self, config):
+        """
+        There is nothing to connect to; it only replaces the logger if requested
+        :param config: dict. If it contains "logger_name", that logger is used from now on
+        :return: None
+        """
+        if "logger_name" in config:
+            self.logger = logging.getLogger(config["logger_name"])
+
+    @staticmethod
+    def _format_filter(filter):
+        return filter    # TODO
+
+    def _find(self, table, filter):
+        """
+        Generator over the rows of a table that match a filter
+        :param table: table name. A table that does not exist yields nothing
+        :param filter: dict; a row matches if it contains every key with an equal value. Empty or None matches all
+        :return: yields tuples (index, row). row is the stored object, not a copy
+        """
+        for index, row in enumerate(self.db.get(table, ())):
+            if filter and any(k not in row or v != row[k] for k, v in filter.items()):
+                continue
+            yield index, row
+
+    def get_list(self, table, filter=None):
+        """
+        Obtain the rows of a table that match a filter
+        :param table: table name
+        :param filter: dict with key: value to match; None or empty returns all the rows
+        :return: list with a copy of every matching row; can be empty
+        """
+        if filter is None:
+            filter = {}
+        try:
+            return [deepcopy(row) for _, row in self._find(table, self._format_filter(filter))]
+        except DbException:
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def get_one(self, table, filter=None, fail_on_empty=True, fail_on_more=True):
+        """
+        Obtain one row of a table that matches a filter
+        :param table: table name
+        :param filter: dict with key: value to match; None or empty matches any row
+        :param fail_on_empty: if nothing matches, raise DbException (NOT_FOUND) when True, return None when False
+        :param fail_on_more: if several rows match, raise DbException (CONFLICT) when True, return the first one
+            when False
+        :return: a copy of the row, or None
+        """
+        if filter is None:
+            filter = {}
+        try:
+            found = None
+            for _, row in self._find(table, self._format_filter(filter)):
+                if not fail_on_more:
+                    return deepcopy(row)
+                # compare against None: a stored row can be an empty dict, which is falsy
+                if found is not None:
+                    raise DbException("Found more than one entry with filter='{}'".format(filter),
+                                      HTTPStatus.CONFLICT)
+                found = row
+            if found is None and fail_on_empty:
+                raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND)
+            return deepcopy(found)
+        except DbException:
+            # already carries the right http_code and message; wrapping it again would lose them
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def del_list(self, table, filter=None):
+        """
+        Delete the rows of a table that match a filter
+        :param table: table name
+        :param filter: dict with key: value to match; None or empty deletes all the rows
+        :return: dict {"deleted": number of rows deleted}
+        """
+        if filter is None:
+            filter = {}
+        try:
+            id_list = [index for index, _ in self._find(table, self._format_filter(filter))]
+            # delete from the end: removing a row shifts the index of every row behind it
+            for index in reversed(id_list):
+                del self.db[table][index]
+            return {"deleted": len(id_list)}
+        except DbException:
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def del_one(self, table, filter=None, fail_on_empty=True):
+        """
+        Delete the first row of a table that matches a filter
+        :param table: table name
+        :param filter: dict with key: value to match; None or empty matches any row
+        :param fail_on_empty: if nothing matches, raise DbException (NOT_FOUND) when True, return None when False
+        :return: dict {"deleted": 1}, or None
+        """
+        if filter is None:
+            filter = {}
+        try:
+            index = next((i for i, _ in self._find(table, self._format_filter(filter))), None)
+            if index is None:
+                if fail_on_empty:
+                    raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND)
+                return None
+            del self.db[table][index]
+            return {"deleted": 1}
+        except DbException:
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def replace(self, table, filter, indata, fail_on_empty=True):
+        """
+        Replace the whole content of the first row of a table that matches a filter
+        :param table: table name
+        :param filter: dict with key: value to match
+        :param indata: new content; a copy is stored
+        :param fail_on_empty: if nothing matches, raise DbException (NOT_FOUND) when True, return None when False
+        :return: dict {"updated": 1}, or None
+        """
+        try:
+            index = next((i for i, _ in self._find(table, self._format_filter(filter))), None)
+            if index is None:
+                if fail_on_empty:
+                    raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND)
+                return None
+            self.db[table][index] = deepcopy(indata)
+            return {"updated": 1}
+        except DbException:
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def create(self, table, indata):
+        """
+        Add a new row to a table; the table is created if it does not exist
+        :param table: table name
+        :param indata: dict to store; a copy is stored. If it has no "_id" (or it is empty), a uuid4 string is
+            generated and also written into indata["_id"]
+        :return: the "_id" of the row
+        """
+        try:
+            _id = indata.get("_id")
+            if not _id:
+                _id = str(uuid4())
+                indata["_id"] = _id
+            self.db.setdefault(table, []).append(deepcopy(indata))
+            return _id
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+
+if __name__ == '__main__':
+    # some test code: manual check of create, get_list and del_one over an in-memory database
+    db = DbMemory()
+    db.create("test", {"_id": 1, "data": 1})
+    db.create("test", {"_id": 2, "data": 2})
+    db.create("test", {"_id": 3, "data": 3})
+    print("must be 3 items:", db.get_list("test"))
+    print("must return item 2:", db.get_list("test", {"_id": 2}))
+    db.del_one("test", {"_id": 2})
+    print("must be emtpy:", db.get_list("test", {"_id": 2}))
diff --git a/osm_common/dbmongo.py b/osm_common/dbmongo.py
new file mode 100644
index 0000000..582773a
--- /dev/null
+++ b/osm_common/dbmongo.py
@@ -0,0 +1,191 @@
+
+import logging
+from pymongo import MongoClient, errors
+from dbbase import DbException, DbBase
+from http import HTTPStatus
+from time import time, sleep
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+# TODO consider use this decorator for database access retries
+# @retry_mongocall
+# def retry_mongocall(call):
+# def _retry_mongocall(*args, **kwargs):
+# retry = 1
+# while True:
+# try:
+# return call(*args, **kwargs)
+# except pymongo.AutoReconnect as e:
+# if retry == 4:
+# raise DbException(str(e))
+# sleep(retry)
+# return _retry_mongocall
+
+
+class DbMongo(DbBase):
+    """
+    DbBase implementation over a MongoDB server, accessed with pymongo. Tables are mongo collections.
+    """
+    # seconds that db_connect keeps retrying the first query before giving up
+    conn_initial_timout = 120
+    # not referenced inside this class
+    conn_timout = 10
+
+    def __init__(self, logger_name='db'):
+        self.logger = logging.getLogger(logger_name)
+
+    def db_connect(self, config):
+        """
+        Connect to the database and wait until it answers a query
+        :param config: dict with the mandatory keys "host", "port" and "name" (database name), and the optional
+            keys "logger_name" and "loglevel" (name of a logging level, e.g. "DEBUG")
+        :return: None
+        :raises DbException: if the database does not answer in conn_initial_timout seconds, or any other
+            pymongo error
+        """
+        try:
+            if "logger_name" in config:
+                self.logger = logging.getLogger(config["logger_name"])
+            self.client = MongoClient(config["host"], config["port"])
+            self.db = self.client[config["name"]]
+            if "loglevel" in config:
+                self.logger.setLevel(getattr(logging, config['loglevel']))
+            # get data to try a connection
+            now = time()
+            while True:
+                try:
+                    self.db.users.find_one({"username": "admin"})
+                    return
+                except errors.ConnectionFailure as e:
+                    if time() - now >= self.conn_initial_timout:
+                        # re-raised here, converted into DbException by the outer handler
+                        raise
+                    self.logger.info("Waiting to database up {}".format(e))
+                    sleep(2)
+        except errors.PyMongoError as e:
+            raise DbException(str(e))
+
+    def db_disconnect(self):
+        pass  # TODO
+
+    @staticmethod
+    def _format_filter(filter):
+        """
+        Translate a query-string like filter into a pymongo filter
+        :param filter: dict or None. Each key is "<field>" or "<field>.<op>", where <op> is one of: eq, ne, neq, gt,
+            gte, lt, lte, cont, ncont. A list value is matched with "$in" (no op, eq, cont) or "$nin" (ne, neq,
+            ncont); with any other op the list of str is joined with ","
+        :return: dict to be used as filter in the pymongo calls
+        :raises DbException: (BAD_REQUEST) if the filter cannot be translated
+        """
+        # bound before the loop because the except clause reports them
+        query_k = v = None
+        try:
+            db_filter = {}
+            for query_k, query_v in (filter or {}).items():
+                dot_index = query_k.rfind(".")
+                # "> 0" and not "> 1": a one-character field such as "a.gt" must be split too
+                if dot_index > 0 and query_k[dot_index+1:] in ("eq", "ne", "gt", "gte", "lt", "lte", "cont",
+                                                               "ncont", "neq"):
+                    operator = "$" + query_k[dot_index+1:]
+                    if operator == "$neq":
+                        operator = "$ne"
+                    k = query_k[:dot_index]
+                else:
+                    operator = "$eq"
+                    k = query_k
+
+                v = query_v
+                if isinstance(v, list):
+                    if operator in ("$eq", "$cont"):
+                        operator = "$in"
+                    elif operator in ("$ne", "$ncont"):
+                        operator = "$nin"
+                    else:
+                        v = ",".join(query_v)
+
+                if operator in ("$eq", "$cont"):
+                    # v cannot be a comma separated list, because operator would have been changed to $in
+                    db_filter[k] = v
+                elif operator == "$ncont":
+                    # v cannot be a comma separated list, because operator would have been changed to $nin
+                    db_filter[k] = {"$ne": v}
+                else:
+                    # maybe db_filter[k] exist. e.g. in the query string for values between 5 and 8: "a.gt=5&a.lt=8"
+                    if k not in db_filter:
+                        db_filter[k] = {}
+                    db_filter[k][operator] = v
+
+            return db_filter
+        except Exception as e:
+            raise DbException("Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e),
+                              http_code=HTTPStatus.BAD_REQUEST)
+
+    def get_list(self, table, filter=None):
+        """
+        Obtain the entries of a collection that match a filter
+        :param table: collection name
+        :param filter: see _format_filter; None or empty returns all the entries
+        :return: list of documents; can be empty
+        """
+        try:
+            collection = self.db[table]
+            return list(collection.find(self._format_filter(filter)))
+        except DbException:
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def get_one(self, table, filter=None, fail_on_empty=True, fail_on_more=True):
+        """
+        Obtain one entry of a collection that matches a filter
+        :param table: collection name
+        :param filter: see _format_filter; None or empty matches any entry
+        :param fail_on_empty: if nothing matches, raise DbException (NOT_FOUND) when True, return None when False
+        :param fail_on_more: if several entries match, raise DbException (CONFLICT) when True, return the first
+            one when False
+        :return: the document, or None
+        """
+        try:
+            filter = self._format_filter(filter)
+            collection = self.db[table]
+            # two documents are enough to tell apart "none", "one" and "more than one"
+            rows = list(collection.find(filter).limit(2))
+            if not rows:
+                if fail_on_empty:
+                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter),
+                                      HTTPStatus.NOT_FOUND)
+                return None
+            if len(rows) > 1 and fail_on_more:
+                raise DbException("Found more than one {} with filter='{}'".format(table[:-1], filter),
+                                  HTTPStatus.CONFLICT)
+            return rows[0]
+        except DbException:
+            # already carries the right http_code and message; wrapping it again would lose them
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def del_list(self, table, filter=None):
+        """
+        Delete the entries of a collection that match a filter
+        :param table: collection name
+        :param filter: see _format_filter; None or empty deletes all the entries
+        :return: dict {"deleted": number of entries deleted}
+        """
+        try:
+            collection = self.db[table]
+            rows = collection.delete_many(self._format_filter(filter))
+            return {"deleted": rows.deleted_count}
+        except DbException:
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def del_one(self, table, filter=None, fail_on_empty=True):
+        """
+        Delete one entry of a collection that matches a filter
+        :param table: collection name
+        :param filter: see _format_filter; None or empty matches any entry
+        :param fail_on_empty: if nothing matches, raise DbException (NOT_FOUND) when True, return None when False
+        :return: dict {"deleted": number of entries deleted}, or None
+        """
+        try:
+            collection = self.db[table]
+            rows = collection.delete_one(self._format_filter(filter))
+            if rows.deleted_count == 0:
+                if fail_on_empty:
+                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter),
+                                      HTTPStatus.NOT_FOUND)
+                return None
+            return {"deleted": rows.deleted_count}
+        except DbException:
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def create(self, table, indata):
+        """
+        Insert a new entry in a collection
+        :param table: collection name
+        :param indata: document to insert
+        :return: the inserted_id reported by pymongo
+        """
+        try:
+            collection = self.db[table]
+            data = collection.insert_one(indata)
+            return data.inserted_id
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def set_one(self, table, filter, update_dict, fail_on_empty=True):
+        """
+        Modify ("$set") some fields of one entry of a collection
+        :param table: collection name
+        :param filter: see _format_filter
+        :param update_dict: dict with the fields to set and their new values
+        :param fail_on_empty: if nothing matches, raise DbException (NOT_FOUND) when True, return None when False
+        :return: dict {"modified": number of entries modified}, or None
+        """
+        try:
+            collection = self.db[table]
+            rows = collection.update_one(self._format_filter(filter), {"$set": update_dict})
+            # an update result exposes matched_count/modified_count, not updated_count/deleted_count
+            if rows.matched_count == 0:
+                if fail_on_empty:
+                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter),
+                                      HTTPStatus.NOT_FOUND)
+                return None
+            return {"modified": rows.modified_count}
+        except DbException:
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
+
+    def replace(self, table, id, indata, fail_on_empty=True):
+        """
+        Replace the whole content of the entry with a given "_id"
+        :param table: collection name
+        :param id: value of the "_id" of the entry to replace
+        :param indata: new document
+        :param fail_on_empty: if nothing matches, raise DbException (NOT_FOUND) when True, return None when False
+        :return: dict {"replace": number of entries modified}, or None
+        """
+        try:
+            _filter = {"_id": id}
+            collection = self.db[table]
+            rows = collection.replace_one(_filter, indata)
+            if rows.matched_count == 0:
+                if fail_on_empty:
+                    raise DbException("Not found any {} with filter='{}'".format(table[:-1], _filter),
+                                      HTTPStatus.NOT_FOUND)
+                return None
+            return {"replace": rows.modified_count}
+        except DbException:
+            raise
+        except Exception as e:  # TODO refine
+            raise DbException(str(e))
diff --git a/osm_common/fsbase.py b/osm_common/fsbase.py
new file mode 100644
index 0000000..7b6cd0c
--- /dev/null
+++ b/osm_common/fsbase.py
@@ -0,0 +1,43 @@
+
+from http import HTTPStatus
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+
+class FsException(Exception):
+    """
+    Exception raised by the storage classes
+    :param message: descriptive text (str). It is appended to the "storage exception " prefix
+    :param http_code: status stored at the "http_code" attribute of the exception
+    """
+
+    def __init__(self, message, http_code=HTTPStatus.INTERNAL_SERVER_ERROR):
+        prefixed_text = "storage exception " + message
+        super().__init__(prefixed_text)
+        self.http_code = http_code
+
+
+class FsBase(object):
+    """
+    Base class of the storage back-ends. Every method here is a placeholder; except get_params, which returns an
+    empty dict, they do nothing and return None. The concrete classes override them.
+    """
+
+    def __init__(self):
+        pass
+
+    def get_params(self):
+        """
+        :return: dict with the parameters of the storage; empty at this base class
+        """
+        return {}
+
+    def fs_connect(self, config):
+        """
+        Connect to the storage
+        :param config: configuration used by the concrete class
+        :return: None
+        """
+        pass
+
+    def fs_disconnect(self):
+        """
+        Disconnect from the storage
+        :return: None
+        """
+        pass
+
+    def mkdir(self, folder):
+        """
+        Create a folder
+        :param folder: folder to create
+        :return: None
+        """
+        pass
+
+    def file_exists(self, storage, mode=None):
+        """
+        Indicate if "storage" exists
+        :param storage: location to check
+        :param mode: optional kind of object that is expected. Accepted here so that the signature matches the one
+            of FsLocal.file_exists
+        :return: None at this base class
+        """
+        pass
+
+    def file_size(self, storage):
+        """
+        :param storage: location of the file
+        :return: None at this base class
+        """
+        pass
+
+    def file_extract(self, tar_object, path):
+        """
+        Extract a tar
+        :param tar_object: tar to extract
+        :param path: location where to extract it
+        :return: None
+        """
+        pass
+
+    def file_open(self, storage, mode):
+        """
+        Open a file
+        :param storage: location of the file
+        :param mode: file mode
+        :return: None at this base class
+        """
+        pass
+
+    def file_delete(self, storage, ignore_non_exist=False):
+        """
+        Delete the content of "storage"
+        :param storage: location to delete
+        :param ignore_non_exist: flag for the case that storage does not exist
+        :return: None
+        """
+        pass
+
diff --git a/osm_common/fslocal.py b/osm_common/fslocal.py
new file mode 100644
index 0000000..b7dd839
--- /dev/null
+++ b/osm_common/fslocal.py
@@ -0,0 +1,142 @@
+import os
+import logging
+import tarfile
+from http import HTTPStatus
+from shutil import rmtree
+from fsbase import FsBase, FsException
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+
+class FsLocal(FsBase):
+    """
+    FsBase implementation over a local folder. Every "storage" argument is relative to self.path and can be given
+    as a str or as a list of str, which is joined with "/".
+    """
+
+    def __init__(self, logger_name='fs'):
+        self.logger = logging.getLogger(logger_name)
+        # root folder, ended with "/". It is set by fs_connect
+        self.path = None
+
+    @staticmethod
+    def _storage_to_path(storage):
+        """Return storage as a relative path str; a list of str is joined with "/" """
+        if isinstance(storage, str):
+            return storage
+        return "/".join(storage)
+
+    def get_params(self):
+        """
+        :return: dict with "fs": "local" and "path": the root folder
+        """
+        return {"fs": "local", "path": self.path}
+
+    def fs_connect(self, config):
+        """
+        Set the root folder, which must already exist
+        :param config: dict with the mandatory key "path" and the optional key "logger_name"
+        :return: None
+        :raises FsException: if the path does not exist or config is wrong
+        """
+        try:
+            if "logger_name" in config:
+                self.logger = logging.getLogger(config["logger_name"])
+            self.path = config["path"]
+            if not self.path.endswith("/"):
+                self.path += "/"
+            if not os.path.exists(self.path):
+                raise FsException("Invalid configuration param at '[storage]': path '{}' does not exist".format(
+                    config["path"]))
+        except FsException:
+            raise
+        except Exception as e:  # TODO refine
+            raise FsException(str(e))
+
+    def fs_disconnect(self):
+        pass  # TODO
+
+    def mkdir(self, folder):
+        """
+        Creates a folder or parent object location
+        :param folder:
+        :return: None or raises and exception
+        """
+        try:
+            os.mkdir(self.path + folder)
+        except Exception as e:
+            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+
+    def file_exists(self, storage, mode=None):
+        """
+        Indicates if "storage" file exist
+        :param storage: can be a str or a str list
+        :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
+        :return: True, False
+        """
+        full_path = self.path + self._storage_to_path(storage)
+        if not os.path.exists(full_path):
+            return False
+        if mode is None:
+            # no kind requested: existing is enough
+            return True
+        if mode == "file":
+            return os.path.isfile(full_path)
+        if mode == "dir":
+            return os.path.isdir(full_path)
+        return False
+
+    def file_size(self, storage):
+        """
+        return file size
+        :param storage: can be a str or a str list
+        :return: file size
+        """
+        return os.path.getsize(self.path + self._storage_to_path(storage))
+
+    def file_extract(self, tar_object, path):
+        """
+        extract a tar file
+        :param tar_object: object of type tar
+        :param path: can be a str or a str list, or a tar object where to extract the tar_object
+        :return: None
+        """
+        # NOTE(review): extractall trusts the member names of the archive; a crafted tar can write outside this
+        # folder. Validate the members before calling this if tar_object comes from an untrusted source
+        tar_object.extractall(path=self.path + self._storage_to_path(path))
+
+    def file_open(self, storage, mode):
+        """
+        Open a file
+        :param storage: can be a str or list of str
+        :param mode: file mode
+        :return: file object
+        :raises FsException: NOT_FOUND if the file does not exist; BAD_REQUEST if it cannot be opened
+        """
+        f = self._storage_to_path(storage)
+        try:
+            return open(self.path + f, mode)
+        except FileNotFoundError:
+            raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
+        except IOError:
+            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)
+
+    def dir_ls(self, storage):
+        """
+        return folder content
+        :param storage: can be a str or list of str
+        :return: folder content
+        :raises FsException: NOT_FOUND if storage does not exist or is not a folder; BAD_REQUEST if it cannot be
+            read
+        """
+        f = self._storage_to_path(storage)
+        try:
+            return os.listdir(self.path + f)
+        except (NotADirectoryError, FileNotFoundError):
+            # both are subclasses of IOError, so they must be caught before it
+            raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
+        except IOError:
+            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)
+
+    def file_delete(self, storage, ignore_non_exist=False):
+        """
+        Delete storage content recursivelly
+        :param storage: can be a str or list of str
+        :param ignore_non_exist: not raise exception if storage does not exist
+        :return: None
+        """
+        f = self.path + self._storage_to_path(storage)
+        if os.path.exists(f):
+            if os.path.isdir(f) and not os.path.islink(f):
+                rmtree(f)
+            else:
+                # rmtree only accepts folders; regular files and links are removed directly
+                os.remove(f)
+        elif not ignore_non_exist:
+            raise FsException("File {} does not exist".format(storage), http_code=HTTPStatus.NOT_FOUND)
diff --git a/osm_common/msgbase.py b/osm_common/msgbase.py
new file mode 100644
index 0000000..25e8c80
--- /dev/null
+++ b/osm_common/msgbase.py
@@ -0,0 +1,47 @@
+
+import asyncio
+from http import HTTPStatus
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+
+class MsgException(Exception):
+    """
+    Common exception of the msgXXXX classes
+    """
+
+    def __init__(self, message, http_code=HTTPStatus.INTERNAL_SERVER_ERROR):
+        """
+        Build the exception
+        :param message: descriptive text (str). It is appended to the "messaging exception " prefix
+        :param http_code: <http.HTTPStatus> type, with ".value" (http error code) and ".name" (http error name).
+            It is stored at the "http_code" attribute
+        """
+        prefixed_text = "messaging exception " + message
+        super().__init__(prefixed_text)
+        self.http_code = http_code
+
+
+class MsgBase(object):
+    """
+    Base class for all msgXXXX classes. Every method here is a placeholder that does nothing and returns None;
+    the concrete classes override them.
+    """
+
+    def __init__(self):
+        pass
+
+    def connect(self, config):
+        """
+        Connect to the message bus
+        :param config: configuration used by the concrete class
+        :return: None
+        """
+        pass
+
+    def disconnect(self):
+        """
+        Disconnect from the message bus
+        :return: None
+        """
+        pass
+
+    def write(self, topic, key, msg):
+        """
+        Publish a message
+        :param topic: topic where to publish
+        :param key: key of the message
+        :param msg: content of the message
+        :return: None
+        """
+        pass
+
+    def read(self, topic):
+        """
+        Read a message
+        :param topic: topic, or topics, to read from
+        :return: None at this base class
+        """
+        pass
+
+    async def aiowrite(self, topic, key, msg, loop=None):
+        """
+        Asyncio version of write
+        :param topic: topic where to publish
+        :param key: key of the message
+        :param msg: content of the message
+        :param loop: asyncio loop. Optional, as at MsgKafka.aiowrite
+        :return: None
+        """
+        pass
+
+    async def aioread(self, topic, loop=None):
+        """
+        Asyncio version of read
+        :param topic: topic, or topics, to read from
+        :param loop: asyncio loop. Optional, as at MsgKafka.aioread
+        :return: None at this base class
+        """
+        pass
diff --git a/osm_common/msgkafka.py b/osm_common/msgkafka.py
new file mode 100644
index 0000000..c819c81
--- /dev/null
+++ b/osm_common/msgkafka.py
@@ -0,0 +1,107 @@
+import logging
+import asyncio
+import yaml
+from aiokafka import AIOKafkaConsumer
+from aiokafka import AIOKafkaProducer
+from aiokafka.errors import KafkaError
+from msgbase import MsgBase, MsgException
+#import json
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>, " \
+ "Guillermo Calvino <guillermo.calvinosanchez@altran.com>"
+class MsgKafka(MsgBase):
+    """
+    MsgBase implementation over a kafka broker, accessed with aiokafka. Keys and messages travel as yaml text.
+    A new producer/consumer is created, started and stopped at every aiowrite/aioread call.
+    """
+
+    def __init__(self, logger_name='msg'):
+        self.logger = logging.getLogger(logger_name)
+        self.host = None
+        self.port = None
+        self.consumer = None
+        self.producer = None
+        self.loop = None
+        # "host:port", built by connect
+        self.broker = None
+
+    def connect(self, config):
+        """
+        Store the broker address and get the asyncio loop. No connection is opened here
+        :param config: dict with the mandatory keys "host" and "port" and the optional key "logger_name"
+        :return: None
+        """
+        try:
+            if "logger_name" in config:
+                self.logger = logging.getLogger(config["logger_name"])
+            self.host = config["host"]
+            self.port = config["port"]
+            self.loop = asyncio.get_event_loop()
+            self.broker = str(self.host) + ":" + str(self.port)
+
+        except Exception as e:  # TODO refine
+            raise MsgException(str(e))
+
+    def disconnect(self):
+        """
+        Close the asyncio loop obtained at connect
+        :return: None
+        """
+        try:
+            self.loop.close()
+        except Exception as e:  # TODO refine
+            raise MsgException(str(e))
+
+    def write(self, topic, key, msg):
+        """
+        Publish a message; it runs aiowrite in self.loop until it completes
+        :param topic: topic where to publish
+        :param key: key text of the message
+        :param msg: content; it is dumped with yaml.safe_dump before sending
+        :return: None
+        """
+        try:
+            self.loop.run_until_complete(self.aiowrite(topic=topic, key=key,
+                                                       msg=yaml.safe_dump(msg, default_flow_style=True),
+                                                       loop=self.loop))
+
+        except Exception as e:
+            raise MsgException("Error writing {} topic: {}".format(topic, str(e)))
+
+    def read(self, topic):
+        """
+        Read from one or several topics. It runs aioread in self.loop, so it blocks until a message is received
+        :param topic: can be str: single topic; or str list: several topics
+        :return: topic, key, message
+        """
+        try:
+            return self.loop.run_until_complete(self.aioread(topic, self.loop))
+        except MsgException:
+            raise
+        except Exception as e:
+            raise MsgException("Error reading {} topic: {}".format(topic, str(e)))
+
+    async def aiowrite(self, topic, key, msg, loop=None):
+        """
+        Asyncio write
+        :param topic: topic where to publish
+        :param key: key text; it is encoded with str.encode
+        :param msg: message text; it is encoded with str.encode
+        :param loop: asyncio loop; self.loop is used if not provided
+        :return: None
+        """
+        if not loop:
+            loop = self.loop
+        # local reference: stop() must only be called on a producer created by this call
+        producer = None
+        try:
+            producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
+                                        bootstrap_servers=self.broker)
+            self.producer = producer
+            await producer.start()
+            await producer.send(topic=topic, key=key, value=msg)
+        except Exception as e:
+            raise MsgException("Error publishing to {} topic: {}".format(topic, str(e)))
+        finally:
+            if producer is not None:
+                await producer.stop()
+
+    async def aioread(self, topic, loop=None, callback=None, *args):
+        """
+        Asyncio read from one or several topics. It blocks
+        :param topic: can be str: single topic; or str list: several topics
+        :param loop: asyncio loop; self.loop is used if not provided
+        :param callback: function called as callback(topic, key, message, *args) for every message received. If
+            provided, this method keeps consuming instead of returning the first message
+        :param args: optional arguments for callback function
+        :return: topic, key, message
+        """
+        if not loop:
+            loop = self.loop
+        # local reference: stop() must only be called on a consumer created by this call
+        consumer = None
+        try:
+            if isinstance(topic, (list, tuple)):
+                topic_list = topic
+            else:
+                topic_list = (topic,)
+
+            consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker)
+            self.consumer = consumer
+            await consumer.start()
+            consumer.subscribe(topic_list)
+
+            async for message in consumer:
+                # safe_load: the content comes from the bus, so arbitrary python objects must not be built. It is
+                # the counterpart of the yaml.safe_dump used by write
+                key = yaml.safe_load(message.key)
+                value = yaml.safe_load(message.value)
+                if callback:
+                    callback(message.topic, key, value, *args)
+                else:
+                    return message.topic, key, value
+        except KafkaError as e:
+            raise MsgException(str(e))
+        finally:
+            if consumer is not None:
+                await consumer.stop()
+
diff --git a/osm_common/msglocal.py b/osm_common/msglocal.py
new file mode 100644
index 0000000..c774f85
--- /dev/null
+++ b/osm_common/msglocal.py
@@ -0,0 +1,111 @@
+import logging
+import os
+import yaml
+import asyncio
+from msgbase import MsgBase, MsgException
+from time import sleep
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+"""
+This emulates a kafka bus by just using a shared file system. Useful for testing or devops.
+One file is used per topic. Only one producer and one consumer are allowed per topic. Both consumer and producer
+access the same file, e.g. the same volume if running with docker.
+One text line per message is used, in yaml format.
+"""
+
+class MsgLocal(MsgBase):
+    """
+    MsgBase implementation over files of a local folder: one file per topic, one yaml text line per message.
+    """
+
+    def __init__(self, logger_name='msg'):
+        self.logger = logging.getLogger(logger_name)
+        self.path = None
+        # create a different file for each topic
+        self.files = {}
+        # topic -> text read so far of a line that is not complete yet
+        self.buffer = {}
+
+    def connect(self, config):
+        """
+        Set the folder of the topic files; it is created if it does not exist
+        :param config: dict with the mandatory key "path" and the optional key "logger_name"
+        :return: None
+        """
+        try:
+            if "logger_name" in config:
+                self.logger = logging.getLogger(config["logger_name"])
+            self.path = config["path"]
+            if not self.path.endswith("/"):
+                self.path += "/"
+            if not os.path.exists(self.path):
+                os.mkdir(self.path)
+        except MsgException:
+            raise
+        except Exception as e:  # TODO refine
+            raise MsgException(str(e))
+
+    def disconnect(self):
+        """
+        Close all the topic files. It is best effort: a failure closing a file is logged and ignored
+        :return: None
+        """
+        for topic, f in self.files.items():
+            try:
+                f.close()
+            except Exception as e:  # TODO refine
+                self.logger.warning("Error closing file of topic %s: %s", topic, e)
+        # forget the closed files, so that a later read/write opens them again
+        self.files = {}
+        self.buffer = {}
+
+    def write(self, topic, key, msg):
+        """
+        Insert a message into topic
+        :param topic: topic
+        :param key: key text to be inserted
+        :param msg: value object to be inserted, can be str, object ...
+        :return: None or raises and exception
+        """
+        try:
+            if topic not in self.files:
+                self.files[topic] = open(self.path + topic, "a+")
+            yaml.safe_dump({key: msg}, self.files[topic], default_flow_style=True, width=20000)
+            self.files[topic].flush()
+        except Exception as e:  # TODO refine
+            raise MsgException(str(e))
+
+    def read(self, topic, blocks=True):
+        """
+        Read from one or several topics. It blocks by default; with blocks=False it returns None if nothing is
+        available
+        :param topic: can be str: single topic; or str list: several topics
+        :param blocks: indicates if it should wait and block until a message is present or returns None
+        :return: topic, key, message; or None if blocks==False and there is no message
+        """
+        try:
+            if isinstance(topic, (list, tuple)):
+                topic_list = topic
+            else:
+                topic_list = (topic, )
+            while True:
+                for single_topic in topic_list:
+                    if single_topic not in self.files:
+                        self.files[single_topic] = open(self.path + single_topic, "a+")
+                    # not only for new files: write() may have opened this file without creating the buffer
+                    self.buffer.setdefault(single_topic, "")
+                    self.buffer[single_topic] += self.files[single_topic].readline()
+                    if not self.buffer[single_topic].endswith("\n"):
+                        # no line, or not complete yet: keep it buffered and try the next topic
+                        continue
+                    # safe_load is the counterpart of the yaml.safe_dump used by write
+                    msg_dict = yaml.safe_load(self.buffer[single_topic])
+                    self.buffer[single_topic] = ""
+                    if not isinstance(msg_dict, dict) or len(msg_dict) != 1:
+                        raise MsgException("Invalid message at topic '{}': a single 'key: value' is expected".format(
+                            single_topic))
+                    key, value = next(iter(msg_dict.items()))
+                    return single_topic, key, value
+                if not blocks:
+                    return None
+                sleep(2)
+        except MsgException:
+            raise
+        except Exception as e:  # TODO refine
+            raise MsgException(str(e))
+
+    async def aiowrite(self, topic, key, msg, loop=None):
+        """
+        Asyncio write. It just calls write, so that the message is not silently dropped by the no-op of the base
+        class
+        :param topic: topic
+        :param key: key text to be inserted
+        :param msg: value object to be inserted
+        :param loop: asyncio loop; not used
+        :return: None or raises and exception
+        """
+        return self.write(topic, key, msg)
+
+    async def aioread(self, topic, loop=None):
+        """
+        Asyncio read from one or several topics. It blocks
+        :param topic: can be str: single topic; or str list: several topics
+        :param loop: asyncio loop; not used, asyncio.sleep runs in the loop of the caller
+        :return: topic, key, message
+        """
+        try:
+            while True:
+                msg = self.read(topic, blocks=False)
+                if msg:
+                    return msg
+                # asyncio.sleep does not accept the "loop" argument since python 3.10
+                await asyncio.sleep(2)
+        except MsgException:
+            raise
+        except Exception as e:  # TODO refine
+            raise MsgException(str(e))
+