Key Value pair in northbound_producer
[osm/MON.git] / core / message-bus / northbound_producer.py
1 from kafka import KafkaProducer
2 from kafka.errors import KafkaError
3 import logging
4 import json
5
6
7 class KafkaProducer(object):
8
9 def __init__(self):
10 if not cfg.CONF.kafka.uri:
11 raise Exception("Kafka URI Not Found. Check the config file for Kafka URI")
12 else:
13 broker = cfg.CONF.kafka.uri
14 producer = KafkaProducer(key_serializer=str.encode, value_serializer=lambda v: json.dumps(v).encode('ascii'), bootstrap_servers='localhost:9092', api_version=(0,10))
15
16 def publish(self, topic, messages):
17
18 """Takes messages and puts them on the supplied kafka topic
19 Memory Usage is used as a test value for alarm creation for
20 topic 'alarms'. 'Configure_alarm' is the key, resource UUID
21 and type of alarm are the list of values.
22
23 """
24
25 payload = {'configure_alarm': ['memory_usage']}
26 try:
27 future = producer.send('alarms', payload)
28 producer.flush()
29 except Exception:
30 log.exception('Error publishing to {} topic.'.format(topic))
31 raise
32 try:
33 record_metadata = future.get(timeout=10)
34 self._log.debug("TOPIC:", record_metadata.topic)
35 self._log.debug("PARTITION:", record_metadata.partition)
36 self._log.debug("OFFSET:", record_metadata.offset)
37 except KafkaError:
38 pass
39 #producer = KafkaProducer(retries=5)
40