async def start(self):
topics = ["users", "project"]
- await self.msg_bus.aioread(topics, self._user_msg)
+ try:
+ await self.msg_bus.aioread(topics, self._user_msg)
+ except Exception as e:
+ # Failed to subscribe to kafka topics
+ log.error("Error when subscribing to topics %s", str(topics))
+ log.exception("Exception %s", str(e))
async def _user_msg(self, topic, key, values):
log.debug("Message from kafka bus received: topic: %s and values: %s and key: %s", topic, values, key)