- with open(
- os.path.join(os.path.dirname(__file__), '../examples/instantiated.json')) as file:
- payload = json.load(file)
- self.producer.send('ns', json.dumps(payload), key="instantiated")
- self.producer.flush()
+ async def test_send_instantiated_msg():
+ producer = AIOKafkaProducer(
+ loop=self.loop,
+ bootstrap_servers=self.kafka_server,
+ key_serializer=str.encode,
+ value_serializer=str.encode,
+ )
+ await producer.start()
+ consumer = AIOKafkaConsumer(
+ "ns",
+ loop=self.loop,
+ bootstrap_servers=self.kafka_server,
+ consumer_timeout_ms=10000,
+ auto_offset_reset="earliest",
+ value_deserializer=bytes.decode,
+ key_deserializer=bytes.decode,
+ )
+ await consumer.start()
+ try:
+ with open(
+ os.path.join(
+ os.path.dirname(__file__), "../examples/instantiated.json"
+ )
+ ) as file:
+ payload = json.load(file)
+ await producer.send_and_wait(
+ "ns", key="instantiated", value=json.dumps(payload)
+ )
+ finally:
+ await producer.stop()
+ try:
+ async for message in consumer:
+ if message.key == "instantiated":
+ self.assertIsNotNone(message.value)
+ return
+ finally:
+ await consumer.stop()