self.buffer[single_topic] += self.files_read[single_topic].readline()
if not self.buffer[single_topic].endswith("\n"):
continue
- msg_dict = yaml.load(self.buffer[single_topic])
+ msg_dict = yaml.safe_load(self.buffer[single_topic])
self.buffer[single_topic] = ""
assert len(msg_dict) == 1
for k, v in msg_dict.items():
except Exception as e: # TODO refine
raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
- async def aioread(self, topic, loop=None, callback=None, aiocallback=None, **kwargs):
+ async def aioread(self, topic, loop=None, callback=None, aiocallback=None, group_id=None, **kwargs):
"""
Asyncio read from one or several topics. It blocks
:param topic: can be str: single topic; or str list: several topics
- :param loop: asyncio loop
- :return: topic, key, message
+ :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect
+ :param callback: synchronous callback function that will handle the message
+ :param aiocallback: async callback function that will handle the message
+ :param group_id: group_id to use for load balancing. Can be False (set group_id to None), None (use general
+ group_id provided at connect inside config), or a group_id string
+ :param kwargs: optional keyword arguments for callback function
+ :return: If no callback defined, it returns (topic, key, message)
"""
_loop = loop or self.loop
try: