1 from kafka
import KafkaProducer
2 from kafka
.errors
import KafkaError
6 class KafkaProducer(object):
9 if not cfg
.CONF
.kafka
.uri
:
10 raise Exception("Kafka URI Not Found. Check the config file for Kafka URI")
12 broker
= cfg
.CONF
.kafka
.uri
13 producer
= KafkaProducer(bootstrap_servers
=broker
, api_version
=(0,10))
15 def publish(self
, topic
, messages
):
17 """Takes messages and puts them on the supplied kafka topic. The topic is
18 hardcoded as 'alarms' and the message is harcoded as 'memory_usage' for now.
22 future
= producer
.send('alarms', b
'memory_usage')
25 log
.exception('Error publishing to {} topic.'.format(topic
))
28 record_metadata
= future
.get(timeout
=10)
29 self
._log
.debug("TOPIC:", record_metadata
.topic
)
30 self
._log
.debug("PARTITION:", record_metadata
.partition
)
31 self
._log
.debug("OFFSET:", record_metadata
.offset
)
34 #producer = KafkaProducer(retries=5)