Kafka Producer and Consumer 77/2077/1
authorprithiv <prithiv.mohan@intel.com>
Wed, 23 Aug 2017 09:55:21 +0000 (10:55 +0100)
committerprithiv <prithiv.mohan@intel.com>
Wed, 23 Aug 2017 09:55:21 +0000 (10:55 +0100)
Initial Producer and Consumer Application. Config details and Consumer messages are yet to be modified.

Change-Id: I28091573c4b67bd559dc90611f1e5af7420a1df6

Signed-off-by: prithiv <prithiv.mohan@intel.com>
core/message-bus/northbound_consumer.py [new file with mode: 0644]
core/message-bus/northbound_producer.py [new file with mode: 0644]
core/models/.gitkeep [new file with mode: 0644]
devops-stages/.gitkeep [new file with mode: 0644]
plugins/CloudWatch/.gitkeep [new file with mode: 0644]
plugins/OpenStack/Ceilometer/.gitkeep [new file with mode: 0644]
plugins/vRealiseOps/.gitkeep [new file with mode: 0644]
test/.gitkeep [new file with mode: 0644]

diff --git a/core/message-bus/northbound_consumer.py b/core/message-bus/northbound_consumer.py
new file mode 100644 (file)
index 0000000..be1c9ea
--- /dev/null
@@ -0,0 +1,27 @@
+from kafka import KafkaConsumer
+from kafka.errors import KafkaError
+import logging
+
+class KafkaConsumer(object):
+    """Adds messages to a kafka topic. Topic is hardcoded as 'alarms' and group as
+    'my_group' for now.
+
+    """
+
+    def __init__(self, uri):
+        """Init
+
+             uri - kafka connection details
+        """
+        if not cfg.CONF.kafka.uri:
+            raise Exception("Kafka URI Not Found. Check the config file for Kafka URI")
+        else:
+            broker = cfg.CONF.kafka.uri
+        consumer = KafkaConsumer('alarms',
+            group_id='my_group',
+            bootstrap_servers=broker, api_version=(0,10))
+        #KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))
+
+    def consume(self, topic, messages):
+        for message in self._consumer:
+            print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
diff --git a/core/message-bus/northbound_producer.py b/core/message-bus/northbound_producer.py
new file mode 100644 (file)
index 0000000..a77b944
--- /dev/null
@@ -0,0 +1,35 @@
+from kafka import KafkaProducer
+from kafka.errors import KafkaError
+import logging
+
+
+class KafkaProducer(object):
+
+    def __init__(self):
+        if not cfg.CONF.kafka.uri:
+            raise Exception("Kafka URI Not Found. Check the config file for Kafka URI")
+        else:
+            broker = cfg.CONF.kafka.uri
+        producer = KafkaProducer(bootstrap_servers=broker, api_version=(0,10))
+
+    def publish(self, topic, messages):
+
+        """Takes messages and puts them on the supplied kafka topic. The topic is
+        hardcoded as 'alarms' and the message is harcoded as 'memory_usage' for now.
+
+        """
+        try:
+            future = producer.send('alarms', b'memory_usage')
+            producer.flush()
+        except Exception:
+            log.exception('Error publishing to {} topic.'.format(topic))
+            raise
+        try:
+            record_metadata = future.get(timeout=10)
+            self._log.debug("TOPIC:", record_metadata.topic)
+            self._log.debug("PARTITION:", record_metadata.partition)
+            self._log.debug("OFFSET:", record_metadata.offset)
+        except KafkaError:
+            pass
+    #producer = KafkaProducer(retries=5)
+
diff --git a/core/models/.gitkeep b/core/models/.gitkeep
new file mode 100644 (file)
index 0000000..2272ebb
--- /dev/null
@@ -0,0 +1 @@
+#gitkeep file to keep the initial empty directory structure.
diff --git a/devops-stages/.gitkeep b/devops-stages/.gitkeep
new file mode 100644 (file)
index 0000000..2272ebb
--- /dev/null
@@ -0,0 +1 @@
+#gitkeep file to keep the initial empty directory structure.
diff --git a/plugins/CloudWatch/.gitkeep b/plugins/CloudWatch/.gitkeep
new file mode 100644 (file)
index 0000000..2272ebb
--- /dev/null
@@ -0,0 +1 @@
+#gitkeep file to keep the initial empty directory structure.
diff --git a/plugins/OpenStack/Ceilometer/.gitkeep b/plugins/OpenStack/Ceilometer/.gitkeep
new file mode 100644 (file)
index 0000000..2272ebb
--- /dev/null
@@ -0,0 +1 @@
+#gitkeep file to keep the initial empty directory structure.
diff --git a/plugins/vRealiseOps/.gitkeep b/plugins/vRealiseOps/.gitkeep
new file mode 100644 (file)
index 0000000..2272ebb
--- /dev/null
@@ -0,0 +1 @@
+#gitkeep file to keep the initial empty directory structure.
diff --git a/test/.gitkeep b/test/.gitkeep
new file mode 100644 (file)
index 0000000..2272ebb
--- /dev/null
@@ -0,0 +1 @@
+#gitkeep file to keep the initial empty directory structure.