- with open(
- os.path.join(os.path.dirname(__file__), '../examples/instantiated.json')) as file:
- payload = json.load(file)
- self.producer.send('ns', json.dumps(payload), key="instantiated")
- self.producer.flush()
-
- for message in self.consumer:
- if message.key == 'instantiated':
- self.assertIsNotNone(message.value)
- return
- self.fail("No message received in consumer")
+ async def test_send_instantiated_msg():
+ producer = AIOKafkaProducer(loop=self.loop,
+ bootstrap_servers=self.kafka_server,
+ key_serializer=str.encode,
+ value_serializer=str.encode)
+ await producer.start()
+ consumer = AIOKafkaConsumer(
+ "ns",
+ loop=self.loop,
+ bootstrap_servers=self.kafka_server,
+ consumer_timeout_ms=10000,
+ auto_offset_reset='earliest',
+ value_deserializer=bytes.decode,
+ key_deserializer=bytes.decode)
+ await consumer.start()
+ try:
+ with open(
+ os.path.join(os.path.dirname(__file__), '../examples/instantiated.json')) as file:
+ payload = json.load(file)
+ await producer.send_and_wait("ns", key="instantiated", value=json.dumps(payload))
+ finally:
+ await producer.stop()
+ try:
+ async for message in consumer:
+ if message.key == 'instantiated':
+ self.assertIsNotNone(message.value)
+ return
+ finally:
+ await consumer.stop()
+
+ try:
+ self.loop.run_until_complete(test_send_instantiated_msg())
+ except KafkaError:
+ self.skipTest('Kafka server not present.')