- async def aioread(self, topic, loop=None):
- if not loop:
- loop = self.loop
- self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker)
- await self.consumer.start()
- self.consumer.subscribe([topic])
+ async def aioread(self, topic, loop):
+ """
+ Asyncio read from one or several topics. It blocks
+ :param topic: can be str: single topic; or str list: several topics
+ :param loop: asyncio loop
+ :return: topic, key, message
+ """