Updated the OSM Aodh plugin to align with the design doc
[osm/MON.git] / core / message-bus / northbound_consumer.py
1 from kafka import KafkaConsumer
2 from kafka.errors import KafkaError
3 import logging
4
5 class KafkaConsumer(object):
6 """Adds messages to a kafka topic. Topic is hardcoded as 'alarms' and group as
7 'my_group' for now.
8
9 """
10
11 def __init__(self, uri):
12 """Init
13
14 uri - kafka connection details
15 """
16 if not cfg.CONF.kafka.uri:
17 raise Exception("Kafka URI Not Found. Check the config file for Kafka URI")
18 else:
19 broker = cfg.CONF.kafka.uri
20 consumer = KafkaConsumer('alarms',
21 group_id='my_group',
22 bootstrap_servers=broker, api_version=(0,10))
23 #KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))
24
25 def consume(self, topic, messages):
26 for message in self._consumer:
27 print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))