Implements aiokafka and modifies code to support asyncio
[osm/POL.git] / osm_policy_module / tests / integration / test_kafka_messages.py
index 581d53f..29d88e2 100644 (file)
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
+import asyncio
 import json
 import logging
 import os
 import sys
 import unittest
 
-from kafka import KafkaProducer, KafkaConsumer
+from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
 from kafka.errors import KafkaError
 
 from osm_policy_module.core.config import Config
@@ -40,38 +41,51 @@ log.addHandler(stream_handler)
 
 class KafkaMessagesTest(unittest.TestCase):
     def setUp(self):
-        try:
-            cfg = Config.instance()
-            kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
-                                          cfg.OSMPOL_MESSAGE_PORT)
-            self.producer = KafkaProducer(bootstrap_servers=kafka_server,
-                                          key_serializer=str.encode,
-                                          value_serializer=str.encode)
-            self.consumer = KafkaConsumer(bootstrap_servers=kafka_server,
-                                          key_deserializer=bytes.decode,
-                                          value_deserializer=bytes.decode,
-                                          auto_offset_reset='earliest',
-                                          consumer_timeout_ms=5000)
-            self.consumer.subscribe(['ns'])
-        except KafkaError:
-            self.skipTest('Kafka server not present.')
+        super()
+        cfg = Config.instance()
+        self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
+                                           cfg.OSMPOL_MESSAGE_PORT)
+        self.loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(None)
 
     def tearDown(self):
-        self.producer.close()
-        self.consumer.close()
+        super()
 
     def test_send_instantiated_msg(self):
-        with open(
-                os.path.join(os.path.dirname(__file__), '../examples/instantiated.json')) as file:
-            payload = json.load(file)
-            self.producer.send('ns', json.dumps(payload), key="instantiated")
-            self.producer.flush()
-
-        for message in self.consumer:
-            if message.key == 'instantiated':
-                self.assertIsNotNone(message.value)
-                return
-        self.fail("No message received in consumer")
+        async def test_send_instantiated_msg():
+            producer = AIOKafkaProducer(loop=self.loop,
+                                        bootstrap_servers=self.kafka_server,
+                                        key_serializer=str.encode,
+                                        value_serializer=str.encode)
+            await producer.start()
+            consumer = AIOKafkaConsumer(
+                "ns",
+                loop=self.loop,
+                bootstrap_servers=self.kafka_server,
+                consumer_timeout_ms=10000,
+                auto_offset_reset='earliest',
+                value_deserializer=bytes.decode,
+                key_deserializer=bytes.decode)
+            await consumer.start()
+            try:
+                with open(
+                        os.path.join(os.path.dirname(__file__), '../examples/instantiated.json')) as file:
+                    payload = json.load(file)
+                    await producer.send_and_wait("ns", key="instantiated", value=json.dumps(payload))
+            finally:
+                await producer.stop()
+            try:
+                async for message in consumer:
+                    if message.key == 'instantiated':
+                        self.assertIsNotNone(message.value)
+                        return
+            finally:
+                await consumer.stop()
+
+        try:
+            self.loop.run_until_complete(test_send_instantiated_msg())
+        except KafkaError:
+            self.skipTest('Kafka server not present.')
 
 
 if __name__ == '__main__':