Producer minor alterations 82/2182/4
authorprithiv <prithiv.mohan@intel.com>
Fri, 15 Sep 2017 15:57:08 +0000 (16:57 +0100)
committerprithiv <prithiv.mohan@intel.com>
Tue, 19 Sep 2017 11:02:10 +0000 (13:02 +0200)
Signed-off-by: prithiv <prithiv.mohan@intel.com>
core/message_bus/producer.py

index 0239748..4956292 100644 (file)
@@ -29,7 +29,7 @@ __author__ = "Prithiv Mohan"
 __date__   = "06/Sep/2017"
 
 
-from kafka import KafkaProducer
+from kafka import KafkaProducer as kaf
 from kafka.errors import KafkaError
 import logging
 import json
@@ -38,16 +38,19 @@ import os
 from os import listdir
 from jsmin import jsmin
 
+json_path = os.path.join(os.pardir+"/models/")
+
+
 class KafkaProducer(object):
 
     def __init__(self, topic):
 
         self._topic= topic
 
-    if "ZOOKEEPER_URI" in os.environ:
-        broker = os.getenv("ZOOKEEPER_URI")
-    else:
-        broker = "localhost:2181"
+        if "ZOOKEEPER_URI" in os.environ:
+            broker = os.getenv("ZOOKEEPER_URI")
+        else:
+            broker = "localhost:2181"
 
         '''
         If the zookeeper broker URI is not set in the env, by default,
@@ -55,28 +58,26 @@ class KafkaProducer(object):
         is already running.
         '''
 
-        producer = KafkaProducer(key_serializer=str.encode,
+        self.producer = kaf(key_serializer=str.encode,
                    value_serializer=lambda v: json.dumps(v).encode('ascii'),
                    bootstrap_servers=broker, api_version=(0,10))
 
 
-    def publish(self, key, message, topic=None):
+    def publish(self, key, value, topic=None):
         try:
-            future = producer.send('key', 'payload',group_id='osm_mon')
-            producer.flush()
+            future = self.producer.send(key, value, topic)
+            self.producer.flush()
         except Exception:
-            log.exception("Error publishing to {} topic." .format(topic))
+            logging.exception("Error publishing to {} topic." .format(topic))
             raise
         try:
             record_metadata = future.get(timeout=10)
-            self._log.debug("TOPIC:", record_metadata.topic)
-            self._log.debug("PARTITION:", record_metadata.partition)
-            self._log.debug("OFFSET:", record_metadata.offset)
+            logging.debug("TOPIC:", record_metadata.topic)
+            logging.debug("PARTITION:", record_metadata.partition)
+            logging.debug("OFFSET:", record_metadata.offset)
         except KafkaError:
             pass
 
-    json_path = os.path.join(os.pardir+"/models/")
-
     def create_alarm_request(self, key, message, topic):
 
        #External to MON