Key Value pair in northbound_producer 01/2101/3
authorprithiv <prithiv.mohan@intel.com>
Mon, 28 Aug 2017 13:14:42 +0000 (14:14 +0100)
committerprithiv <prithiv.mohan@intel.com>
Mon, 28 Aug 2017 13:18:01 +0000 (15:18 +0200)
Modified the northbound producer to send key value paired messages.
This would facilitate the inclusion of keys as configure_alarm,
acknowledge_alarm, notify_alarm.

Signed-off-by: prithiv <prithiv.mohan@intel.com>
core/message-bus/northbound_producer.py

index a77b944..4c304a0 100644 (file)
@@ -1,6 +1,7 @@
 from kafka import KafkaProducer
 from kafka.errors import KafkaError
 import logging
+import json
 
 
 class KafkaProducer(object):
@@ -10,16 +11,20 @@ class KafkaProducer(object):
             raise Exception("Kafka URI Not Found. Check the config file for Kafka URI")
         else:
             broker = cfg.CONF.kafka.uri
-        producer = KafkaProducer(bootstrap_servers=broker, api_version=(0,10))
+        producer = KafkaProducer(key_serializer=str.encode, value_serializer=lambda v: json.dumps(v).encode('ascii'), bootstrap_servers='localhost:9092', api_version=(0,10))
 
     def publish(self, topic, messages):
 
-        """Takes messages and puts them on the supplied kafka topic. The topic is
-        hardcoded as 'alarms' and the message is harcoded as 'memory_usage' for now.
+        """Takes messages and puts them on the supplied kafka topic
+        Memory Usage is used as a test value for alarm creation for
+        topic 'alarms'. 'Configure_alarm' is the key, resource UUID
+        and type of alarm are the list of values.
 
         """
+
+        payload = {'configure_alarm': ['memory_usage']}
         try:
-            future = producer.send('alarms', b'memory_usage')
+            future = producer.send('alarms', payload)
             producer.flush()
         except Exception:
             log.exception('Error publishing to {} topic.'.format(topic))