From: prithiv Date: Fri, 15 Sep 2017 15:57:08 +0000 (+0100) Subject: Producer minor alterations X-Git-Tag: v4.0.0~80 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=e834a774df8b09fa1705f55b152973cfef9ecd26;p=osm%2FMON.git Producer minor alterations Signed-off-by: prithiv --- diff --git a/core/message_bus/producer.py b/core/message_bus/producer.py index 0239748..4956292 100644 --- a/core/message_bus/producer.py +++ b/core/message_bus/producer.py @@ -29,7 +29,7 @@ __author__ = "Prithiv Mohan" __date__ = "06/Sep/2017" -from kafka import KafkaProducer +from kafka import KafkaProducer as kaf from kafka.errors import KafkaError import logging import json @@ -38,16 +38,19 @@ import os from os import listdir from jsmin import jsmin +json_path = os.path.join(os.pardir+"/models/") + + class KafkaProducer(object): def __init__(self, topic): self._topic= topic - if "ZOOKEEPER_URI" in os.environ: - broker = os.getenv("ZOOKEEPER_URI") - else: - broker = "localhost:2181" + if "ZOOKEEPER_URI" in os.environ: + broker = os.getenv("ZOOKEEPER_URI") + else: + broker = "localhost:2181" ''' If the zookeeper broker URI is not set in the env, by default, @@ -55,28 +58,26 @@ class KafkaProducer(object): is already running. ''' - producer = KafkaProducer(key_serializer=str.encode, + self.producer = kaf(key_serializer=str.encode, value_serializer=lambda v: json.dumps(v).encode('ascii'), bootstrap_servers=broker, api_version=(0,10)) - def publish(self, key, message, topic=None): + def publish(self, key, value, topic=None): try: - future = producer.send('key', 'payload',group_id='osm_mon') - producer.flush() + future = self.producer.send(key, value, topic) + self.producer.flush() except Exception: - log.exception("Error publishing to {} topic." .format(topic)) + logging.exception("Error publishing to {} topic." .format(topic)) raise try: record_metadata = future.get(timeout=10) - self._log.debug("TOPIC:", record_metadata.topic) - self._log.debug("PARTITION:", record_metadata.partition) - self._log.debug("OFFSET:", record_metadata.offset) + logging.debug("TOPIC:", record_metadata.topic) + logging.debug("PARTITION:", record_metadata.partition) + logging.debug("OFFSET:", record_metadata.offset) except KafkaError: pass - json_path = os.path.join(os.pardir+"/models/") - def create_alarm_request(self, key, message, topic): #External to MON