Producer minor alterations
Signed-off-by: prithiv <prithiv.mohan@intel.com>
diff --git a/core/message_bus/producer.py b/core/message_bus/producer.py
index 0239748..4956292 100644
--- a/core/message_bus/producer.py
+++ b/core/message_bus/producer.py
@@ -29,7 +29,7 @@
__date__ = "06/Sep/2017"
-from kafka import KafkaProducer
+from kafka import KafkaProducer as kaf
from kafka.errors import KafkaError
import logging
import json
@@ -38,16 +38,19 @@
from os import listdir
from jsmin import jsmin
+json_path = os.path.join(os.pardir+"/models/")
+
+
class KafkaProducer(object):
def __init__(self, topic):
self._topic= topic
- if "ZOOKEEPER_URI" in os.environ:
- broker = os.getenv("ZOOKEEPER_URI")
- else:
- broker = "localhost:2181"
+ if "ZOOKEEPER_URI" in os.environ:
+ broker = os.getenv("ZOOKEEPER_URI")
+ else:
+ broker = "localhost:2181"
'''
If the zookeeper broker URI is not set in the env, by default,
@@ -55,28 +58,26 @@
is already running.
'''
- producer = KafkaProducer(key_serializer=str.encode,
+ self.producer = kaf(key_serializer=str.encode,
value_serializer=lambda v: json.dumps(v).encode('ascii'),
bootstrap_servers=broker, api_version=(0,10))
- def publish(self, key, message, topic=None):
+ def publish(self, key, value, topic=None):
try:
- future = producer.send('key', 'payload',group_id='osm_mon')
- producer.flush()
+ future = self.producer.send(key, value, topic)
+ self.producer.flush()
except Exception:
- log.exception("Error publishing to {} topic." .format(topic))
+ logging.exception("Error publishing to {} topic." .format(topic))
raise
try:
record_metadata = future.get(timeout=10)
- self._log.debug("TOPIC:", record_metadata.topic)
- self._log.debug("PARTITION:", record_metadata.partition)
- self._log.debug("OFFSET:", record_metadata.offset)
+ logging.debug("TOPIC:", record_metadata.topic)
+ logging.debug("PARTITION:", record_metadata.partition)
+ logging.debug("OFFSET:", record_metadata.offset)
except KafkaError:
pass
- json_path = os.path.join(os.pardir+"/models/")
-
def create_alarm_request(self, key, message, topic):
#External to MON