- for message in consumer:
- t = threading.Thread(target=self._process_msg, args=(message.topic, message.key, message.value,))
- t.start()
+ async def start(self):
+ consumer = AIOKafkaConsumer(
+ "ns",
+ "alarm_response",
+ loop=self.loop,
+ bootstrap_servers=self.kafka_server,
+ group_id="pol-consumer",
+ key_deserializer=bytes.decode,
+ value_deserializer=bytes.decode,
+ )
+ await consumer.start()
+ try:
+ async for msg in consumer:
+ await self._process_msg(msg.topic, msg.key, msg.value)
+ finally:
+ await consumer.stop()