# For those usages not covered by the Apache License, Version 2.0 please
# contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
##
+"""This is a KafkaProducer with a request function to test the plugins."""
-'''
-This is a kafka producer with a common request function to test the plugins.
-'''
+import json
+import logging as log
+import os
-from kafka import KafkaProducer as kaf
-from kafka.errors import KafkaError
-import logging as log
-import json
import jsmin
-import os
-import sys
-from os import listdir
-from jsmin import jsmin
-import os.path as path
+from kafka import KafkaProducer as kaf
+
+from kafka.errors import KafkaError
class KafkaProducer(object):
+ """A KafkaProducer for testing purposes."""
def __init__(self, topic):
-
- self._topic= topic
+ """Initialize a KafkaProducer and it's topic."""
+ self._topic = topic
if "ZOOKEEPER_URI" in os.environ:
broker = os.getenv("ZOOKEEPER_URI")
is already running.
'''
- self.producer = kaf(key_serializer=str.encode,
- value_serializer=lambda v: json.dumps(v).encode('ascii'),
- bootstrap_servers=broker, api_version=(0,10))
-
-
+ self.producer = kaf(
+ key_serializer=str.encode,
+ value_serializer=lambda v: json.dumps(v).encode('ascii'),
+ bootstrap_servers=broker, api_version=(0, 10))
def publish(self, key, value, topic):
+ """Send messages to the message bus with a defing key and topic."""
try:
- future = self.producer.send(key=key, value=value,topic=topic)
+ future = self.producer.send(topic=topic, key=key, value=value)
self.producer.flush()
except Exception:
log.exception("Error publishing to {} topic." .format(topic))
raise
try:
record_metadata = future.get(timeout=10)
- #self._log.debug("TOPIC:", record_metadata.topic)
- #self._log.debug("PARTITION:", record_metadata.partition)
- #self._log.debug("OFFSET:", record_metadata.offset)
+ log.debug("TOPIC:", record_metadata.topic)
+ log.debug("PARTITION:", record_metadata.partition)
+ log.debug("OFFSET:", record_metadata.offset)
except KafkaError:
pass
- json_path = path.abspath(path.join(os.getcwd(),"../.."))
-
- def request(self, path, key, message, topic):
- #External to MON
+ def request(self, path, key, message, topic):
+ """Test json files are loaded and sent on the message bus."""
+ # External to MON
payload_create_alarm = jsmin(open(os.path.join(path)).read())
self.publish(key=key,
- value = json.loads(payload_create_alarm),
- topic=topic)
-
-
+ value=json.loads(payload_create_alarm),
+ topic=topic)