msgkafka, msglocal: allow the asyncio loop to be provided via config
Change-Id: I974ce861c6ff88e61594c9983faff5459ac5708a
Signed-off-by: tierno <alfonso.tiernosepulveda@telefonica.com>
diff --git a/osm_common/msglocal.py b/osm_common/msglocal.py
index 1e8e089..eba4b97 100644
--- a/osm_common/msglocal.py
+++ b/osm_common/msglocal.py
@@ -42,6 +42,7 @@
self.files_read = {}
self.files_write = {}
self.buffer = {}
+ self.loop = None
def connect(self, config):
try:
@@ -52,6 +53,8 @@
self.path += "/"
if not os.path.exists(self.path):
os.mkdir(self.path)
+ self.loop = config.get("loop")
+
except MsgException:
raise
except Exception as e: # TODO refine
@@ -127,6 +130,7 @@
:param loop: asyncio loop
:return: topic, key, message
"""
+ _loop = loop or self.loop
try:
while True:
msg = self.read(topic, blocks=False)
@@ -137,7 +141,7 @@
await aiocallback(*msg, **kwargs)
else:
return msg
- await asyncio.sleep(2, loop=loop)
+ await asyncio.sleep(2, loop=_loop)
except MsgException:
raise
except Exception as e: # TODO refine