- def _publish_response(self, topic: str, key: str, msg: dict):
- topic = topic.replace('request', 'response')
- key = key.replace('request', 'response')
- producer = Producer()
- producer.send(topic=topic, key=key, value=json.dumps(msg))
- producer.flush(timeout=5)
- producer.close()
+ async def _publish_response(self, topic: str, key: str, msg: dict):
+ producer = AIOKafkaProducer(loop=self.loop,
+ bootstrap_servers=self.kafka_server,
+ key_serializer=str.encode,
+ value_serializer=str.encode)
+ await producer.start()
+ log.info("Sending response %s to topic %s with key %s", json.dumps(msg), topic, key)
+ try:
+ await producer.send_and_wait(topic, key=key, value=json.dumps(msg))
+ finally:
+ await producer.stop()