+ producer = AIOKafkaProducer(loop=self.loop,
+ bootstrap_servers=self.kafka_server,
+ key_serializer=str.encode,
+ value_serializer=str.encode)
+ await producer.start()
+ try:
+ # Produce message
+ await producer.send_and_wait("ns", key="scale", value=json.dumps(nslcmop))
+ finally:
+ # Wait for all pending messages to be delivered or expire.
+ await producer.stop()