class MsgLocal(MsgBase):
-
- def __init__(self, logger_name='msg', lock=False):
+ def __init__(self, logger_name="msg", lock=False):
super().__init__(logger_name, lock)
self.path = None
# create a different file for each topic
with self.lock:
if topic not in self.files_write:
self.files_write[topic] = open(self.path + topic, "a+")
- yaml.safe_dump({key: msg}, self.files_write[topic], default_flow_style=True, width=20000)
+ yaml.safe_dump(
+ {key: msg},
+ self.files_write[topic],
+ default_flow_style=True,
+ width=20000,
+ )
self.files_write[topic].flush()
except Exception as e: # TODO refine
raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
if isinstance(topic, (list, tuple)):
topic_list = topic
else:
- topic_list = (topic, )
+ topic_list = (topic,)
while True:
for single_topic in topic_list:
with self.lock:
if single_topic not in self.files_read:
- self.files_read[single_topic] = open(self.path + single_topic, "a+")
+ self.files_read[single_topic] = open(
+ self.path + single_topic, "a+"
+ )
self.buffer[single_topic] = ""
- self.buffer[single_topic] += self.files_read[single_topic].readline()
+ self.buffer[single_topic] += self.files_read[
+ single_topic
+ ].readline()
if not self.buffer[single_topic].endswith("\n"):
continue
msg_dict = yaml.safe_load(self.buffer[single_topic])
except Exception as e: # TODO refine
raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
- async def aioread(self, topic, loop=None, callback=None, aiocallback=None, group_id=None, **kwargs):
+ async def aioread(
+ self, topic, loop=None, callback=None, aiocallback=None, group_id=None, **kwargs
+ ):
"""
Asyncio read from one or several topics. It blocks
:param topic: can be str: single topic; or str list: several topics