- async def aioread(self, topic):
- self.consumer = AIOKafkaConsumer(loop=self.loop, bootstrap_servers=self.broker)
- await self.consumer.start()
- self.consumer.subscribe(topic)
+ async def aioread(self, topic, loop=None, callback=None, *args):
+ """
+ Asyncio read from one or several topics. It blocks
+ :param topic: can be str: single topic; or str list: several topics
+ :param loop: asyncio loop
+ :callback: callback function that will handle the message in kafka bus
+ :*args: optional arguments for callback function
+ :return: topic, key, message
+ """
+
+ if not loop:
+ loop = self.loop