Kafka Producer and Consumer

Initial Producer and Consumer Application. Config details and Consumer messages are yet to be modified.

Change-Id: I28091573c4b67bd559dc90611f1e5af7420a1df6

Signed-off-by: prithiv <prithiv.mohan@intel.com>
diff --git a/core/message-bus/northbound_consumer.py b/core/message-bus/northbound_consumer.py
new file mode 100644
index 0000000..be1c9ea
--- /dev/null
+++ b/core/message-bus/northbound_consumer.py
@@ -0,0 +1,27 @@
+from kafka import KafkaConsumer
+from kafka.errors import KafkaError
+import logging
+
+class KafkaConsumer(object):
+    """Reads messages from a kafka topic. Topic is hardcoded as 'alarms' and group as
+    'my_group' for now.
+    """
+
+    def __init__(self, uri):
+        """Create the kafka consumer for the hardcoded topic and group.
+
+        uri - kafka connection details; falls back to cfg.CONF.kafka.uri when empty
+        """
+        # The class name shadows kafka's KafkaConsumer, so import it under an alias.
+        from kafka import KafkaConsumer as _KafkaClient
+        # NOTE(review): `cfg` is not imported in this module - confirm its source.
+        broker = uri or cfg.CONF.kafka.uri
+        if not broker:
+            raise Exception("Kafka URI Not Found. Check the config file for Kafka URI")
+        self._consumer = _KafkaClient('alarms', group_id='my_group',
+                                      bootstrap_servers=broker, api_version=(0, 10))
+
+    def consume(self, topic, messages):
+        """Print every received record; `topic` and `messages` are unused for now."""
+        for message in self._consumer:
+            print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
diff --git a/core/message-bus/northbound_producer.py b/core/message-bus/northbound_producer.py
new file mode 100644
index 0000000..a77b944
--- /dev/null
+++ b/core/message-bus/northbound_producer.py
@@ -0,0 +1,35 @@
+from kafka import KafkaProducer
+from kafka.errors import KafkaError
+import logging
+
+
+class KafkaProducer(object):
+    """Publishes messages to a kafka topic through the kafka library's producer."""
+
+    def __init__(self, uri=None):
+        """uri - kafka connection details; falls back to cfg.CONF.kafka.uri when empty"""
+        # The class name shadows kafka's KafkaProducer, so import it under an alias.
+        from kafka import KafkaProducer as _KafkaClient
+        # NOTE(review): `cfg` is not imported in this module - confirm its source.
+        broker = uri or cfg.CONF.kafka.uri
+        if not broker:
+            raise Exception("Kafka URI Not Found. Check the config file for Kafka URI")
+        self._log = logging.getLogger(__name__)
+        self._producer = _KafkaClient(bootstrap_servers=broker, api_version=(0, 10))
+
+    def publish(self, topic, messages):
+        """Takes messages and puts them on the supplied kafka topic. The topic is
+        hardcoded as 'alarms' and the message is hardcoded as 'memory_usage' for now.
+        Send/flush errors are re-raised; a failed delivery report is only logged.
+        """
+        try:
+            future = self._producer.send('alarms', b'memory_usage')
+            self._producer.flush()
+        except Exception:
+            self._log.exception('Error publishing to {} topic.'.format(topic))
+            raise
+        try:
+            meta = future.get(timeout=10)
+            self._log.debug("TOPIC: %s PARTITION: %s OFFSET: %s", meta.topic, meta.partition, meta.offset)
+        except KafkaError:
+            self._log.exception('Delivery report unavailable.')
diff --git a/core/models/.gitkeep b/core/models/.gitkeep
new file mode 100644
index 0000000..2272ebb
--- /dev/null
+++ b/core/models/.gitkeep
@@ -0,0 +1 @@
+#gitkeep file to keep the initial empty directory structure.