if not kafka_working:
self.logger.critical("kafka is working again")
kafka_working = True
- if not self.aiomain_task_admin or self.aiomain_task_admin._state == "FINISHED":
+ if not self.aiomain_task_admin:
await asyncio.sleep(10, loop=self.loop)
self.logger.debug("Starting admin subscription task")
self.aiomain_task_admin = asyncio.ensure_future(self.msg.aioread(("admin",), loop=self.loop,
group_id=False,
aiocallback=self._msg_callback),
loop=self.loop)
- if not self.aiomain_task or self.aiomain_task._state == "FINISHED":
+ if not self.aiomain_task:
await asyncio.sleep(10, loop=self.loop)
self.logger.debug("Starting non-admin subscription task")
self.aiomain_task = asyncio.ensure_future(self.msg.aioread(("ns", "nsi"), loop=self.loop,
aiocallback=self._msg_callback),
loop=self.loop)
- await asyncio.wait([self.aiomain_task, self.aiomain_task_admin],
- timeout=None, loop=self.loop, return_when=asyncio.FIRST_COMPLETED)
+ done, _ = await asyncio.wait([self.aiomain_task, self.aiomain_task_admin],
+ timeout=None, loop=self.loop, return_when=asyncio.FIRST_COMPLETED)
+ try:
+ if self.aiomain_task_admin in done:
+ exc = self.aiomain_task_admin.exception()
+ self.logger.error("admin subscription task exception: {}".format(exc))
+ self.aiomain_task_admin = None
+ if self.aiomain_task in done:
+ exc = self.aiomain_task.exception()
+ self.logger.error("non-admin subscription task exception: {}".format(exc))
+ self.aiomain_task = None
+ except asyncio.CancelledError:
+ pass
except Exception as e:
if self.to_terminate:
return
self.to_terminate = True
if self.aiomain_task:
self.loop.call_soon_threadsafe(self.aiomain_task.cancel)
+ if self.aiomain_task_admin:
+ self.loop.call_soon_threadsafe(self.aiomain_task_admin.cancel)