+ self.nslcm = None
+
+ async def start_kafka(self):
+ # timeout_wait_for_kafka = 3*60
+ kafka_working = True
+ while not self.to_terminate:
+ try:
+ # bug 710 635. The library aiokafka does not recieve anything when the topci at kafka has not been
+ # created.
+ # Before subscribe, send dummy messages
+ await self.msg.aiowrite("admin", "echo", "dummy message", loop=self.loop)
+ await self.msg.aiowrite("ns", "echo", "dummy message", loop=self.loop)
+ await self.msg.aiowrite("nsi", "echo", "dummy message", loop=self.loop)
+ if not kafka_working:
+ self.logger.critical("kafka is working again")
+ kafka_working = True
+ if not self.aiomain_task_admin:
+ await asyncio.sleep(10, loop=self.loop)
+ self.logger.debug("Starting admin subscription task")
+ self.aiomain_task_admin = asyncio.ensure_future(self.msg.aioread(("admin",), loop=self.loop,
+ group_id=False,
+ aiocallback=self._msg_callback),
+ loop=self.loop)
+ if not self.aiomain_task:
+ await asyncio.sleep(10, loop=self.loop)
+ self.logger.debug("Starting non-admin subscription task")
+ self.aiomain_task = asyncio.ensure_future(self.msg.aioread(("ns", "nsi"), loop=self.loop,
+ aiocallback=self._msg_callback),
+ loop=self.loop)
+ done, _ = await asyncio.wait([self.aiomain_task, self.aiomain_task_admin],
+ timeout=None, loop=self.loop, return_when=asyncio.FIRST_COMPLETED)
+ try:
+ if self.aiomain_task_admin in done:
+ exc = self.aiomain_task_admin.exception()
+ self.logger.error("admin subscription task exception: {}".format(exc))
+ self.aiomain_task_admin = None
+ if self.aiomain_task in done:
+ exc = self.aiomain_task.exception()
+ self.logger.error("non-admin subscription task exception: {}".format(exc))
+ self.aiomain_task = None
+ except asyncio.CancelledError:
+ pass
+ except Exception as e:
+ if self.to_terminate:
+ return
+ if kafka_working:
+ # logging only first time
+ self.logger.critical("Error accessing kafka '{}'. Retrying ...".format(e))
+ kafka_working = False
+ await asyncio.sleep(10, loop=self.loop)