from kafka import KafkaProducer
from kafka.errors import KafkaError
import logging
+import json
class KafkaProducer(object):
raise Exception("Kafka URI Not Found. Check the config file for Kafka URI")
broker = cfg.CONF.kafka.uri
- producer = KafkaProducer(bootstrap_servers=broker, api_version=(0,10))
+ producer = KafkaProducer(key_serializer=str.encode, value_serializer=lambda v: json.dumps(v).encode('ascii'), bootstrap_servers='localhost:9092', api_version=(0,10))
def publish(self, topic, messages):
- """Takes messages and puts them on the supplied kafka topic. The topic is
- hardcoded as 'alarms' and the message is harcoded as 'memory_usage' for now.
+ """Takes messages and puts them on the supplied kafka topic
+ Memory Usage is used as a test value for alarm creation for
+ topic 'alarms'. 'Configure_alarm' is the key, resource UUID
+ and type of alarm are the list of values.
+ payload = {'configure_alarm': ['memory_usage']}
- future = producer.send('alarms', b'memory_usage')
+ future = producer.send('alarms', payload)
except Exception:
log.exception('Error publishing to {} topic.'.format(topic))