import os
import yaml
import asyncio
-from msgbase import MsgBase, MsgException
+from osm_common.msgbase import MsgBase, MsgException
from time import sleep
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
"""
This emulated kafka bus by just using a shared file system. Useful for testing or devops.
-One file is used per topic. Only one producer and one consumer is allowed per topic. Both consumer and producer
+One file is used per topic. Only one producer and one consumer is allowed per topic. Both consumer and producer
access to the same file. e.g. same volume if running with docker.
One text line per message is used in yaml format.
"""
+
class MsgLocal(MsgBase):
def __init__(self, logger_name='msg'):
self.logger = logging.getLogger(logger_name)
self.path = None
# create a different file for each topic
- self.files = {}
+ self.files_read = {}
+ self.files_write = {}
self.buffer = {}
def connect(self, config):
raise MsgException(str(e))
def disconnect(self):
- for f in self.files.values():
+ for f in self.files_read.values():
+ try:
+ f.close()
+ except Exception: # TODO refine
+ pass
+ for f in self.files_write.values():
try:
f.close()
- except Exception as e: # TODO refine
+ except Exception: # TODO refine
pass
def write(self, topic, key, msg):
:return: None or raises and exception
"""
try:
- if topic not in self.files:
- self.files[topic] = open(self.path + topic, "a+")
- yaml.safe_dump({key: msg}, self.files[topic], default_flow_style=True, width=20000)
- self.files[topic].flush()
+ if topic not in self.files_write:
+ self.files_write[topic] = open(self.path + topic, "a+")
+ yaml.safe_dump({key: msg}, self.files_write[topic], default_flow_style=True, width=20000)
+ self.files_write[topic].flush()
except Exception as e: # TODO refine
raise MsgException(str(e))
topic_list = (topic, )
while True:
for single_topic in topic_list:
- if single_topic not in self.files:
- self.files[single_topic] = open(self.path + single_topic, "a+")
+ if single_topic not in self.files_read:
+ self.files_read[single_topic] = open(self.path + single_topic, "a+")
self.buffer[single_topic] = ""
- self.buffer[single_topic] += self.files[single_topic].readline()
+ self.buffer[single_topic] += self.files_read[single_topic].readline()
if not self.buffer[single_topic].endswith("\n"):
continue
msg_dict = yaml.load(self.buffer[single_topic])
except Exception as e: # TODO refine
raise MsgException(str(e))
+ async def aiowrite(self, topic, key, msg, loop=None):
+ """
+ Asyncio write. It blocks
+ :param topic: str
+ :param key: str
+ :param msg: message, can be str or yaml
+ :param loop: asyncio loop
+ :return: nothing if ok or raises an exception
+ """
+ return self.write(topic, key, msg)