1 from kafka
import KafkaConsumer
2 from kafka
.errors
import KafkaError
5 class KafkaConsumer(object):
6 """Adds messages to a kafka topic. Topic is hardcoded as 'alarms' and group as
11 def __init__(self
, uri
):
14 uri - kafka connection details
16 if not cfg
.CONF
.kafka
.uri
:
17 raise Exception("Kafka URI Not Found. Check the config file for Kafka URI")
19 broker
= cfg
.CONF
.kafka
.uri
20 consumer
= KafkaConsumer('alarms',
22 bootstrap_servers
=broker
, api_version
=(0,10))
23 #KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))
25 def consume(self
, topic
, messages
):
26 for message
in self
._consumer
:
27 print ("%s:%d:%d: key=%s value=%s" % (message
.topic
, message
.partition
, message
.offset
, message
.key
, message
.value
))