Fix retrieval of exceptions raised by kafka aioread tasks
Change-Id: I11d243b4b16a0757415aa72ded1e6b2134f7f3f8
Signed-off-by: tierno <alfonso.tiernosepulveda@telefonica.com>
diff --git a/osm_nbi/subscriptions.py b/osm_nbi/subscriptions.py
index 393918c..711455e 100644
--- a/osm_nbi/subscriptions.py
+++ b/osm_nbi/subscriptions.py
@@ -80,21 +80,32 @@
if not kafka_working:
self.logger.critical("kafka is working again")
kafka_working = True
- if not self.aiomain_task_admin or self.aiomain_task_admin._state == "FINISHED":
+ if not self.aiomain_task_admin:
await asyncio.sleep(10, loop=self.loop)
self.logger.debug("Starting admin subscription task")
self.aiomain_task_admin = asyncio.ensure_future(self.msg.aioread(("admin",), loop=self.loop,
group_id=False,
aiocallback=self._msg_callback),
loop=self.loop)
- if not self.aiomain_task or self.aiomain_task._state == "FINISHED":
+ if not self.aiomain_task:
await asyncio.sleep(10, loop=self.loop)
self.logger.debug("Starting non-admin subscription task")
self.aiomain_task = asyncio.ensure_future(self.msg.aioread(("ns", "nsi"), loop=self.loop,
aiocallback=self._msg_callback),
loop=self.loop)
- await asyncio.wait([self.aiomain_task, self.aiomain_task_admin],
- timeout=None, loop=self.loop, return_when=asyncio.FIRST_COMPLETED)
+ done, _ = await asyncio.wait([self.aiomain_task, self.aiomain_task_admin],
+ timeout=None, loop=self.loop, return_when=asyncio.FIRST_COMPLETED)
+ try:
+ if self.aiomain_task_admin in done:
+ exc = self.aiomain_task_admin.exception()
+ self.logger.error("admin subscription task exception: {}".format(exc))
+ self.aiomain_task_admin = None
+ if self.aiomain_task in done:
+ exc = self.aiomain_task.exception()
+ self.logger.error("non-admin subscription task exception: {}".format(exc))
+ self.aiomain_task = None
+ except asyncio.CancelledError:
+ pass
except Exception as e:
if self.to_terminate:
return
@@ -226,3 +237,5 @@
self.to_terminate = True
if self.aiomain_task:
self.loop.call_soon_threadsafe(self.aiomain_task.cancel)
+ if self.aiomain_task_admin:
+ self.loop.call_soon_threadsafe(self.aiomain_task_admin.cancel)