Kafka Producer and Consumer
[osm/MON.git] / core / message-bus / northbound_producer.py
1 from kafka import KafkaProducer
2 from kafka.errors import KafkaError
3 import logging
4
5
class KafkaProducer(object):
    """Publish messages to Kafka topics on the broker configured for MON.

    The instance owns a kafka-python producer (``self.producer``) connected
    to the broker named by ``cfg.CONF.kafka.uri`` and a module logger
    (``self._log``).
    """

    def __init__(self):
        """Create the underlying kafka-python producer.

        Raises:
            Exception: if no Kafka URI is configured.
        """
        # This class reuses the name ``KafkaProducer``, so the module-level
        # ``from kafka import KafkaProducer`` is shadowed as soon as the
        # class statement has run.  Import the client under an alias here so
        # the call below reaches kafka-python instead of recursing into this
        # class.
        from kafka import KafkaProducer as _KafkaClient

        self._log = logging.getLogger(__name__)

        # NOTE(review): ``cfg`` is not imported in the visible part of this
        # file; it has to be available at module scope (it looks like an
        # oslo.config-style object -- confirm and add the import).
        broker = cfg.CONF.kafka.uri
        if not broker:
            raise Exception("Kafka URI Not Found. Check the config file for Kafka URI")

        # Keep the client on the instance so publish() can reach it.
        self.producer = _KafkaClient(bootstrap_servers=broker, api_version=(0, 10))

    def publish(self, topic, messages):
        """Send ``messages`` to ``topic`` and log where each record landed.

        Args:
            topic: name of the Kafka topic to publish to.
            messages: a single payload, or a list/tuple of payloads.  Text
                payloads are encoded as UTF-8; any other payload is handed
                to the client unchanged.

        Raises:
            Exception: whatever the client raises while sending or flushing;
                it is logged and re-raised.  A ``KafkaError`` raised while
                waiting for a delivery report is logged and not re-raised.
        """
        if isinstance(messages, (list, tuple)):
            payloads = list(messages)
        else:
            payloads = [messages]

        text_type = type(u"")  # ``str`` on Python 3, ``unicode`` on Python 2
        try:
            futures = []
            for payload in payloads:
                if isinstance(payload, text_type):
                    payload = payload.encode("utf-8")
                futures.append(self.producer.send(topic, payload))
            self.producer.flush()
        except Exception:
            self._log.exception('Error publishing to %s topic.', topic)
            raise

        for future in futures:
            try:
                record_metadata = future.get(timeout=10)
            except KafkaError:
                # Delivery confirmation is best effort: record the failure
                # instead of silently dropping it, but do not fail the call.
                self._log.exception(
                    'No delivery report for message sent to %s topic.', topic)
                continue
            self._log.debug("TOPIC: %s", record_metadata.topic)
            self._log.debug("PARTITION: %s", record_metadata.partition)
            self._log.debug("OFFSET: %s", record_metadata.offset)
35