self.files_read = {}
self.files_write = {}
self.buffer = {}
+ self.loop = None
def connect(self, config):
try:
self.path += "/"
if not os.path.exists(self.path):
os.mkdir(self.path)
+ self.loop = config.get("loop")
+
except MsgException:
raise
except Exception as e: # TODO refine
:param loop: asyncio loop
:return: topic, key, message
"""
+ _loop = loop or self.loop
try:
while True:
msg = self.read(topic, blocks=False)
await aiocallback(*msg, **kwargs)
else:
return msg
- await asyncio.sleep(2, loop=loop)
+ await asyncio.sleep(2, loop=_loop)
except MsgException:
raise
except Exception as e: # TODO refine