+ self.nslcm = None
+ self.vnflcm = None
+
+ async def start_kafka(self):
+ # timeout_wait_for_kafka = 3*60
+ kafka_working = True
+ while not self.to_terminate:
+ try:
+ # bug 710 635. The library aiokafka does not recieve anything when the topci at kafka has not been
+ # created.
+ # Before subscribe, send dummy messages
+ await self.msg.aiowrite(
+ "admin",
+ "echo",
+ "dummy message",
+ )
+ await self.msg.aiowrite("ns", "echo", "dummy message")
+ await self.msg.aiowrite("nsi", "echo", "dummy message")
+ await self.msg.aiowrite("vnf", "echo", "dummy message")
+ if not kafka_working:
+ self.logger.critical("kafka is working again")
+ kafka_working = True
+ if not self.aiomain_task_admin:
+ await asyncio.sleep(10)
+ self.logger.debug("Starting admin subscription task")
+ self.aiomain_task_admin = asyncio.ensure_future(
+ self.msg.aioread(
+ ("admin",),
+ group_id=False,
+ aiocallback=self._msg_callback,
+ ),
+ )
+ if not self.aiomain_task:
+ await asyncio.sleep(10)
+ self.logger.debug("Starting non-admin subscription task")
+ self.aiomain_task = asyncio.ensure_future(
+ self.msg.aioread(
+ ("ns", "nsi", "vnf"),
+ aiocallback=self._msg_callback,
+ ),
+ )
+ done, _ = await asyncio.wait(
+ [self.aiomain_task, self.aiomain_task_admin],
+ timeout=None,
+ return_when=asyncio.FIRST_COMPLETED,
+ )
+ try:
+ if self.aiomain_task_admin in done:
+ exc = self.aiomain_task_admin.exception()
+ self.logger.error(
+ "admin subscription task exception: {}".format(exc)
+ )
+ self.aiomain_task_admin = None
+ if self.aiomain_task in done:
+ exc = self.aiomain_task.exception()
+ self.logger.error(
+ "non-admin subscription task exception: {}".format(exc)
+ )
+ self.aiomain_task = None
+ except asyncio.CancelledError:
+ pass
+ except Exception as e:
+ if self.to_terminate:
+ return
+ if kafka_working:
+ # logging only first time
+ self.logger.critical(
+ "Error accessing kafka '{}'. Retrying ...".format(e)
+ )
+ kafka_working = False
+ await asyncio.sleep(10)