- "alarm_request",
- loop=self.loop,
- bootstrap_servers=self.kafka_server,
- group_id="mon-server",
- key_deserializer=bytes.decode,
- value_deserializer=bytes.decode,
- )
- await consumer.start()
- try:
- async for message in consumer:
- log.info("Message arrived: %s", message)
- await self.consume_message(message)
- finally:
- await consumer.stop()