X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;ds=sidebyside;f=core%2Fmessage_bus%2Fproducer.py;h=49562921231e0c64e428bba2223b0f47e2868607;hb=ba691c177e68226a3ec74ba6c5ccf0d7e544a6bb;hp=02397487f2cb6ae4cd0e9e05a10ee5ab0a5eeb1a;hpb=b7e9ec0fe0dd55677c3a61e3221b6f9af1b303d2;p=osm%2FMON.git diff --git a/core/message_bus/producer.py b/core/message_bus/producer.py index 0239748..4956292 100644 --- a/core/message_bus/producer.py +++ b/core/message_bus/producer.py @@ -29,7 +29,7 @@ __author__ = "Prithiv Mohan" __date__ = "06/Sep/2017" -from kafka import KafkaProducer +from kafka import KafkaProducer as kaf from kafka.errors import KafkaError import logging import json @@ -38,16 +38,19 @@ import os from os import listdir from jsmin import jsmin +json_path = os.path.join(os.pardir+"/models/") + + class KafkaProducer(object): def __init__(self, topic): self._topic= topic - if "ZOOKEEPER_URI" in os.environ: - broker = os.getenv("ZOOKEEPER_URI") - else: - broker = "localhost:2181" + if "ZOOKEEPER_URI" in os.environ: + broker = os.getenv("ZOOKEEPER_URI") + else: + broker = "localhost:2181" ''' If the zookeeper broker URI is not set in the env, by default, @@ -55,28 +58,26 @@ class KafkaProducer(object): is already running. ''' - producer = KafkaProducer(key_serializer=str.encode, + self.producer = kaf(key_serializer=str.encode, value_serializer=lambda v: json.dumps(v).encode('ascii'), bootstrap_servers=broker, api_version=(0,10)) - def publish(self, key, message, topic=None): + def publish(self, key, value, topic=None): try: - future = producer.send('key', 'payload',group_id='osm_mon') - producer.flush() + future = self.producer.send(key, value, topic) + self.producer.flush() except Exception: - log.exception("Error publishing to {} topic." .format(topic)) + logging.exception("Error publishing to {} topic." .format(topic)) raise try: record_metadata = future.get(timeout=10) - self._log.debug("TOPIC:", record_metadata.topic) - self._log.debug("PARTITION:", record_metadata.partition) - self._log.debug("OFFSET:", record_metadata.offset) + logging.debug("TOPIC:", record_metadata.topic) + logging.debug("PARTITION:", record_metadata.partition) + logging.debug("OFFSET:", record_metadata.offset) except KafkaError: pass - json_path = os.path.join(os.pardir+"/models/") - def create_alarm_request(self, key, message, topic): #External to MON