import json
import logging

from kafka import KafkaProducer
from kafka.errors import KafkaError
class KafkaProducer(object):
    """Publish JSON-encoded messages to Kafka topics.

    Wraps the ``kafka`` client producer. The broker address is read from
    ``cfg.CONF.kafka.uri`` when the wrapper is constructed.

    NOTE(review): ``cfg`` is not defined in the visible part of this module;
    it must be provided by the surrounding file.
    """

    # Class-level logger so ``self._log`` resolves on every instance.
    _log = logging.getLogger(__name__)

    def __init__(self):
        """Create the underlying Kafka client.

        Raises:
            Exception: if no Kafka URI is configured.
        """
        # This class reuses the name ``KafkaProducer``, which shadows the
        # module-level import of the client. Import the client under an
        # alias so the constructor does not call itself.
        from kafka import KafkaProducer as _KafkaClient

        broker = cfg.CONF.kafka.uri
        if not broker:
            raise Exception("Kafka URI Not Found. Check the config file for Kafka URI")

        # The configured URI replaces the previously hard-coded
        # 'localhost:9092'. Assumes the URI is a "host:port" bootstrap
        # address -- TODO confirm against the config schema.
        self.producer = _KafkaClient(
            key_serializer=self._encode_key,
            value_serializer=self._serialize_value,
            bootstrap_servers=broker,
            api_version=(0, 10),
        )

    @staticmethod
    def _encode_key(key):
        """Encode a string key to bytes; pass ``None`` (no key) through."""
        return None if key is None else key.encode()

    @staticmethod
    def _serialize_value(value):
        """Serialize a JSON-compatible value to ASCII-encoded JSON bytes."""
        return json.dumps(value).encode('ascii')

    def publish(self, topic, messages):
        """Put ``messages`` on the supplied Kafka topic.

        Example: for alarm creation on topic ``'alarms'`` a payload such as
        ``{'configure_alarm': ['memory_usage']}`` can be passed, where
        ``'configure_alarm'`` is the key and the list holds the resource
        UUID and type of alarm.

        Args:
            topic: name of the Kafka topic to publish to.
            messages: JSON-serializable payload to send.

        Raises:
            Exception: whatever the client raises if the send cannot be
                queued; the error is logged before being re-raised.
        """
        try:
            future = self.producer.send(topic, value=messages)
        except Exception:
            self._log.exception('Error publishing to %s topic.', topic)
            raise

        try:
            # Block until the broker acknowledges the record or 10 s elapse.
            record_metadata = future.get(timeout=10)
        except KafkaError:
            # Best effort: a missing acknowledgement is logged, not raised.
            self._log.exception(
                'No delivery confirmation from %s topic.', topic)
            return

        self._log.debug("TOPIC: %s", record_metadata.topic)
        self._log.debug("PARTITION: %s", record_metadata.partition)
        self._log.debug("OFFSET: %s", record_metadata.offset)