import os
import yaml
import asyncio
-from msgbase import MsgBase, MsgException
+from osm_common.msgbase import MsgBase, MsgException
from time import sleep
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
"""
This emulated kafka bus by just using a shared file system. Useful for testing or devops.
-One file is used per topic. Only one producer and one consumer is allowed per topic. Both consumer and producer
+One file is used per topic. Only one producer and one consumer is allowed per topic. Both consumer and producer
access to the same file. e.g. same volume if running with docker.
One text line per message is used in yaml format.
"""
+
class MsgLocal(MsgBase):
def __init__(self, logger_name='msg'):
self.logger = logging.getLogger(logger_name)
self.path = None
# create a different file for each topic
- self.files = {}
+ self.files_read = {}
+ self.files_write = {}
self.buffer = {}
def connect(self, config):
raise MsgException(str(e))
def disconnect(self):
- for f in self.files.values():
+ for f in self.files_read.values():
try:
f.close()
- except Exception as e: # TODO refine
+ except Exception: # TODO refine
+ pass
+ for f in self.files_write.values():
+ try:
+ f.close()
+ except Exception: # TODO refine
pass
def write(self, topic, key, msg):
:return: None or raises and exception
"""
try:
- if topic not in self.files:
- self.files[topic] = open(self.path + topic, "a+")
- yaml.safe_dump({key: msg}, self.files[topic], default_flow_style=True, width=20000)
- self.files[topic].flush()
+ if topic not in self.files_write:
+ self.files_write[topic] = open(self.path + topic, "a+")
+ yaml.safe_dump({key: msg}, self.files_write[topic], default_flow_style=True, width=20000)
+ self.files_write[topic].flush()
except Exception as e: # TODO refine
raise MsgException(str(e))
topic_list = (topic, )
while True:
for single_topic in topic_list:
- if single_topic not in self.files:
- self.files[single_topic] = open(self.path + single_topic, "a+")
+ if single_topic not in self.files_read:
+ self.files_read[single_topic] = open(self.path + single_topic, "a+")
self.buffer[single_topic] = ""
- self.buffer[single_topic] += self.files[single_topic].readline()
+ self.buffer[single_topic] += self.files_read[single_topic].readline()
if not self.buffer[single_topic].endswith("\n"):
continue
msg_dict = yaml.load(self.buffer[single_topic])
raise
except Exception as e: # TODO refine
raise MsgException(str(e))
-