From: prithiv Date: Mon, 28 Aug 2017 13:14:42 +0000 (+0100) Subject: Key Value pair in northbound_producer X-Git-Tag: v4.0.0~99 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=79ac4f64e16eaa7b124015fc27db0f8c0fab5ac9;hp=f358b4fdd006b427e5b653d467c29ae37a47406e;p=osm%2FMON.git Key Value pair in northbound_producer Modified the northbound producer to send key value paired messages. This would facilitate the inclusion of keys as configure_alarm, acknowledge_alarm, notify_alarm. Signed-off-by: prithiv --- diff --git a/core/message-bus/northbound_producer.py b/core/message-bus/northbound_producer.py index a77b944..4c304a0 100644 --- a/core/message-bus/northbound_producer.py +++ b/core/message-bus/northbound_producer.py @@ -1,6 +1,7 @@ from kafka import KafkaProducer from kafka.errors import KafkaError import logging +import json class KafkaProducer(object): @@ -10,16 +11,20 @@ class KafkaProducer(object): raise Exception("Kafka URI Not Found. Check the config file for Kafka URI") else: broker = cfg.CONF.kafka.uri - producer = KafkaProducer(bootstrap_servers=broker, api_version=(0,10)) + producer = KafkaProducer(key_serializer=str.encode, value_serializer=lambda v: json.dumps(v).encode('ascii'), bootstrap_servers='localhost:9092', api_version=(0,10)) def publish(self, topic, messages): - """Takes messages and puts them on the supplied kafka topic. The topic is - hardcoded as 'alarms' and the message is harcoded as 'memory_usage' for now. + """Takes messages and puts them on the supplied kafka topic + Memory Usage is used as a test value for alarm creation for + topic 'alarms'. 'Configure_alarm' is the key, resource UUID + and type of alarm are the list of values. """ + + payload = {'configure_alarm': ['memory_usage']} try: - future = producer.send('alarms', b'memory_usage') + future = producer.send('alarms', payload) producer.flush() except Exception: log.exception('Error publishing to {} topic.'.format(topic))