def setUp(self):
super()
cfg = Config()
- self.kafka_server = '{}:{}'.format(cfg.get('message', 'host'),
- cfg.get('message', 'port'))
+ self.kafka_server = "{}:{}".format(
+ cfg.get("message", "host"), cfg.get("message", "port")
+ )
self.loop = asyncio.new_event_loop()
def tearDown(self):
def test_send_instantiated_msg(self):
async def test_send_instantiated_msg():
- producer = AIOKafkaProducer(loop=self.loop,
- bootstrap_servers=self.kafka_server,
- key_serializer=str.encode,
- value_serializer=str.encode)
+ producer = AIOKafkaProducer(
+ loop=self.loop,
+ bootstrap_servers=self.kafka_server,
+ key_serializer=str.encode,
+ value_serializer=str.encode,
+ )
await producer.start()
consumer = AIOKafkaConsumer(
"ns",
loop=self.loop,
bootstrap_servers=self.kafka_server,
consumer_timeout_ms=10000,
- auto_offset_reset='earliest',
+ auto_offset_reset="earliest",
value_deserializer=bytes.decode,
- key_deserializer=bytes.decode)
+ key_deserializer=bytes.decode,
+ )
await consumer.start()
try:
with open(
- os.path.join(os.path.dirname(__file__), '../examples/instantiated.json')) as file:
+ os.path.join(
+ os.path.dirname(__file__), "../examples/instantiated.json"
+ )
+ ) as file:
payload = json.load(file)
- await producer.send_and_wait("ns", key="instantiated", value=json.dumps(payload))
+ await producer.send_and_wait(
+ "ns", key="instantiated", value=json.dumps(payload)
+ )
finally:
await producer.stop()
try:
async for message in consumer:
- if message.key == 'instantiated':
+ if message.key == "instantiated":
self.assertIsNotNone(message.value)
return
finally:
try:
self.loop.run_until_complete(test_send_instantiated_msg())
except KafkaError:
- self.skipTest('Kafka server not present.')
+ self.skipTest("Kafka server not present.")
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main()