Initial producer and consumer application. Config details and the consumer message handling are still to be finalized.
Change-Id: I28091573c4b67bd559dc90611f1e5af7420a1df6
Signed-off-by: prithiv <prithiv.mohan@intel.com>
--- /dev/null
+from kafka import KafkaConsumer
+from kafka.errors import KafkaError
+import logging
+
class KafkaConsumer(object):
    """Reads messages from a kafka topic.

    The topic is hardcoded as 'alarms' and the consumer group as
    'my_group' for now.

    NOTE(review): this class shadows ``kafka.KafkaConsumer`` imported at
    the top of the file, so the client class is imported locally under an
    alias inside ``__init__`` — using the bare name there would resolve to
    this wrapper class and recurse with the wrong arguments.
    """

    def __init__(self, uri):
        """Init the consumer.

        uri -- kafka broker connection details (e.g. 'host:port').
               Must be non-empty, otherwise an Exception is raised.
               (The original read cfg.CONF.kafka.uri, but ``cfg`` was
               never imported and the ``uri`` argument was ignored.)
        """
        if not uri:
            raise Exception("Kafka URI Not Found. Check the config file for Kafka URI")
        # Alias the real client class: the module-level name
        # ``KafkaConsumer`` refers to this wrapper, not the kafka client.
        from kafka import KafkaConsumer as _ClientConsumer
        # Keep the consumer on the instance so consume() can iterate it
        # (the original bound it to a discarded local).
        self._consumer = _ClientConsumer(
            'alarms',
            group_id='my_group',
            bootstrap_servers=uri,
            api_version=(0, 10))
        # _ClientConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))

    def consume(self, topic, messages):
        """Iterate records from the subscribed topic and print each one.

        topic, messages -- currently unused; kept for interface
        compatibility (the subscription is fixed to 'alarms' in __init__).
        """
        for message in self._consumer:
            print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
--- /dev/null
+from kafka import KafkaProducer
+from kafka.errors import KafkaError
+import logging
+
+
class KafkaProducer(object):
    """Publishes messages to a kafka topic.

    NOTE(review): this class shadows ``kafka.KafkaProducer`` imported at
    the top of the file, so the client class is imported locally under an
    alias inside ``__init__`` — using the bare name there would resolve to
    this wrapper class and recurse.
    """

    def __init__(self, uri=None):
        """Init the producer.

        uri -- kafka broker connection details (e.g. 'host:port').
               Must be non-empty, otherwise an Exception is raised.
               (The original read cfg.CONF.kafka.uri, but ``cfg`` was
               never imported, so every construction failed with
               NameError; ``uri`` defaults to None to keep the original
               zero-argument call signature working.)
        """
        if not uri:
            raise Exception("Kafka URI Not Found. Check the config file for Kafka URI")
        from kafka import KafkaProducer as _ClientProducer
        self._log = logging.getLogger(__name__)
        # Keep the producer on the instance so publish() can reach it
        # (the original bound it to a discarded local).
        self._producer = _ClientProducer(bootstrap_servers=uri,
                                         api_version=(0, 10))
        # _ClientProducer(retries=5)

    def publish(self, topic, messages):
        """Put *messages* on the supplied kafka *topic*.

        The original hardcoded the topic as 'alarms' and the payload as
        b'memory_usage'; both now come from the arguments the caller
        already supplies.

        Raises the underlying exception if the send/flush fails; delivery
        metadata errors are logged best-effort and swallowed.
        """
        try:
            future = self._producer.send(topic, messages)
            self._producer.flush()
        except Exception:
            self._log.exception('Error publishing to {} topic.'.format(topic))
            raise
        try:
            record_metadata = future.get(timeout=10)
            # Lazy %-args with explicit placeholders: the original passed
            # the value as a stray positional arg with no %s, which makes
            # the logging module emit a formatting error.
            self._log.debug("TOPIC: %s", record_metadata.topic)
            self._log.debug("PARTITION: %s", record_metadata.partition)
            self._log.debug("OFFSET: %s", record_metadata.offset)
        except KafkaError:
            # Metadata retrieval is best-effort; the send itself already
            # succeeded from the caller's point of view.
            pass
+
--- /dev/null
+#gitkeep file to keep the initial empty directory structure.
--- /dev/null
+#gitkeep file to keep the initial empty directory structure.
--- /dev/null
+#gitkeep file to keep the initial empty directory structure.
--- /dev/null
+#gitkeep file to keep the initial empty directory structure.
--- /dev/null
+#gitkeep file to keep the initial empty directory structure.
--- /dev/null
+#gitkeep file to keep the initial empty directory structure.