Initial vROPs plugin work for MON module. Configure alarm & enable notification for...
[osm/MON.git] / core / message-bus / northbound_producer.py
index a77b944..4c304a0 100644 (file)
@@ -1,6 +1,7 @@
 from kafka import KafkaProducer
 from kafka.errors import KafkaError
 import logging
+import json
 
 
 class KafkaProducer(object):
@@ -10,16 +11,20 @@ class KafkaProducer(object):
             raise Exception("Kafka URI Not Found. Check the config file for Kafka URI")
         else:
             broker = cfg.CONF.kafka.uri
-        producer = KafkaProducer(bootstrap_servers=broker, api_version=(0,10))
+        producer = KafkaProducer(key_serializer=str.encode, value_serializer=lambda v: json.dumps(v).encode('ascii'), bootstrap_servers='localhost:9092', api_version=(0,10))
 
     def publish(self, topic, messages):
 
-        """Takes messages and puts them on the supplied kafka topic. The topic is
-        hardcoded as 'alarms' and the message is harcoded as 'memory_usage' for now.
+        """Takes messages and puts them on the supplied kafka topic
+        Memory Usage is used as a test value for alarm creation for
+        topic 'alarms'. 'Configure_alarm' is the key, resource UUID
+        and type of alarm are the list of values.
 
         """
+
+        payload = {'configure_alarm': ['memory_usage']}
         try:
-            future = producer.send('alarms', b'memory_usage')
+            future = producer.send('alarms', payload)
             producer.flush()
         except Exception:
             log.exception('Error publishing to {} topic.'.format(topic))