From 79ac4f64e16eaa7b124015fc27db0f8c0fab5ac9 Mon Sep 17 00:00:00 2001 From: prithiv Date: Mon, 28 Aug 2017 14:14:42 +0100 Subject: [PATCH] Key Value pair in northbound_producer Modified the northbound producer to send key value paired messages. This would facilitate the inclusion of keys as configure_alarm, acknowledge_alarm, notify_alarm. Signed-off-by: prithiv --- core/message-bus/northbound_producer.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/core/message-bus/northbound_producer.py b/core/message-bus/northbound_producer.py index a77b944..4c304a0 100644 --- a/core/message-bus/northbound_producer.py +++ b/core/message-bus/northbound_producer.py @@ -1,6 +1,7 @@ from kafka import KafkaProducer from kafka.errors import KafkaError import logging +import json class KafkaProducer(object): @@ -10,16 +11,20 @@ class KafkaProducer(object): raise Exception("Kafka URI Not Found. Check the config file for Kafka URI") else: broker = cfg.CONF.kafka.uri - producer = KafkaProducer(bootstrap_servers=broker, api_version=(0,10)) + producer = KafkaProducer(key_serializer=str.encode, value_serializer=lambda v: json.dumps(v).encode('ascii'), bootstrap_servers='localhost:9092', api_version=(0,10)) def publish(self, topic, messages): - """Takes messages and puts them on the supplied kafka topic. The topic is - hardcoded as 'alarms' and the message is harcoded as 'memory_usage' for now. + """Takes messages and puts them on the supplied kafka topic + Memory Usage is used as a test value for alarm creation for + topic 'alarms'. 'Configure_alarm' is the key, resource UUID + and type of alarm are the list of values. """ + + payload = {'configure_alarm': ['memory_usage']} try: - future = producer.send('alarms', b'memory_usage') + future = producer.send('alarms', payload) producer.flush() except Exception: log.exception('Error publishing to {} topic.'.format(topic)) -- 2.25.1