X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_policy_module%2Ftests%2Fintegration%2Ftest_kafka_messages.py;h=29d88e2db36989d3c5e9b7603714c6da6f75f888;hb=16256cbbf6cdfde8debc3254bf55ce0b8fa51b08;hp=581d53f9a07ae8cfbb7ee530a9ea23b2005c7eb5;hpb=bd499e01b67f2eb3cd9263f5d5d819fb118c5756;p=osm%2FPOL.git diff --git a/osm_policy_module/tests/integration/test_kafka_messages.py b/osm_policy_module/tests/integration/test_kafka_messages.py index 581d53f..29d88e2 100644 --- a/osm_policy_module/tests/integration/test_kafka_messages.py +++ b/osm_policy_module/tests/integration/test_kafka_messages.py @@ -21,13 +21,14 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: bdiaz@whitestack.com or glavado@whitestack.com ## +import asyncio import json import logging import os import sys import unittest -from kafka import KafkaProducer, KafkaConsumer +from aiokafka import AIOKafkaProducer, AIOKafkaConsumer from kafka.errors import KafkaError from osm_policy_module.core.config import Config @@ -40,38 +41,51 @@ log.addHandler(stream_handler) class KafkaMessagesTest(unittest.TestCase): def setUp(self): - try: - cfg = Config.instance() - kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST, - cfg.OSMPOL_MESSAGE_PORT) - self.producer = KafkaProducer(bootstrap_servers=kafka_server, - key_serializer=str.encode, - value_serializer=str.encode) - self.consumer = KafkaConsumer(bootstrap_servers=kafka_server, - key_deserializer=bytes.decode, - value_deserializer=bytes.decode, - auto_offset_reset='earliest', - consumer_timeout_ms=5000) - self.consumer.subscribe(['ns']) - except KafkaError: - self.skipTest('Kafka server not present.') + super() + cfg = Config.instance() + self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST, + cfg.OSMPOL_MESSAGE_PORT) + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(None) def tearDown(self): - self.producer.close() - self.consumer.close() + super() def test_send_instantiated_msg(self): - with open( - os.path.join(os.path.dirname(__file__), '../examples/instantiated.json')) as file: - payload = json.load(file) - self.producer.send('ns', json.dumps(payload), key="instantiated") - self.producer.flush() - - for message in self.consumer: - if message.key == 'instantiated': - self.assertIsNotNone(message.value) - return - self.fail("No message received in consumer") + async def test_send_instantiated_msg(): + producer = AIOKafkaProducer(loop=self.loop, + bootstrap_servers=self.kafka_server, + key_serializer=str.encode, + value_serializer=str.encode) + await producer.start() + consumer = AIOKafkaConsumer( + "ns", + loop=self.loop, + bootstrap_servers=self.kafka_server, + consumer_timeout_ms=10000, + auto_offset_reset='earliest', + value_deserializer=bytes.decode, + key_deserializer=bytes.decode) + await consumer.start() + try: + with open( + os.path.join(os.path.dirname(__file__), '../examples/instantiated.json')) as file: + payload = json.load(file) + await producer.send_and_wait("ns", key="instantiated", value=json.dumps(payload)) + finally: + await producer.stop() + try: + async for message in consumer: + if message.key == 'instantiated': + self.assertIsNotNone(message.value) + return + finally: + await consumer.stop() + + try: + self.loop.run_until_complete(test_send_instantiated_msg()) + except KafkaError: + self.skipTest('Kafka server not present.') if __name__ == '__main__':