Fixes bugs for integration with the MON module
[osm/MON.git] / policy_module / osm_policy_module / tests / integration / test_scaling_config_kafka_msg.py
index a444265..486afc8 100644 (file)
@@ -3,32 +3,37 @@ import logging
 import os
 import unittest
 
-from kafka import KafkaProducer
+from kafka import KafkaProducer, KafkaConsumer
+from kafka.errors import KafkaError
 
 log = logging.getLogger(__name__)
 
 
-# logging.basicConfig(stream=sys.stdout,
-#                     format='%(asctime)s %(message)s',
-#                     datefmt='%m/%d/%Y %I:%M:%S %p',
-#                     level=logging.DEBUG)
-
 class ScalingConfigTest(unittest.TestCase):
+    def setUp(self):
+        try:
+            kafka_server = '{}:{}'.format(os.getenv("KAFKA_SERVER_HOST", "localhost"),
+                                          os.getenv("KAFKA_SERVER_PORT", "9092"))
+            self.producer = KafkaProducer(bootstrap_servers=kafka_server,
+                                          key_serializer=str.encode,
+                                          value_serializer=str.encode)
+            self.consumer = KafkaConsumer(bootstrap_servers=kafka_server,
+                                          group_id='osm_mon')
+            self.consumer.subscribe(['lcm_pm'])
+        except KafkaError:
+            self.skipTest('Kafka server not present.')
+
     def test_send_scaling_config_msg(self):
         try:
             with open(
                     os.path.join(os.path.dirname(__file__), '../examples/configure_scaling_full_example.json')) as file:
                 payload = json.load(file)
-                kafka_server = '{}:{}'.format(os.getenv("KAFKA_SERVER_HOST", "localhost"),
-                                              os.getenv("KAFKA_SERVER_PORT", "9092"))
-                producer = KafkaProducer(bootstrap_servers=kafka_server,
-                                         key_serializer=str.encode,
-                                         value_serializer=str.encode)
-                future = producer.send('lcm_pm', json.dumps(payload), key="configure_scaling")
+                future = self.producer.send('lcm_pm', json.dumps(payload), key="configure_scaling")
                 result = future.get(timeout=60)
                 log.info('Result: %s', result)
 
-                producer.flush()
+                self.producer.flush()
+                # TODO: Improve assertions
                 self.assertIsNotNone(result)
         except Exception as e:
             self.fail(e)
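
A possible follow-up to the "TODO: Improve assertions" left in the test (a sketch, not part of this change): since setUp() already subscribes self.consumer to the 'lcm_pm' topic, the test could poll that consumer after self.producer.flush() and assert that the published message actually arrives with the expected key. The poll() call and ConsumerRecord fields are the standard kafka-python KafkaConsumer API; the 10-second timeout is an assumed value, and the check only works if the consumer had joined the group before the message was produced.

    # Sketch only: read back the message that was just produced and check its key.
    # key_serializer=str.encode on the producer means the key arrives as bytes.
    records = self.consumer.poll(timeout_ms=10000)
    received = [msg for partition_msgs in records.values() for msg in partition_msgs]
    self.assertTrue(any(msg.key == b'configure_scaling' for msg in received),
                    'configure_scaling message was not consumed from lcm_pm')

A tearDown() that closes self.producer and self.consumer would also keep the test from leaking Kafka connections between runs.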