Added a Common KafkaConsumer for all of the plugins
[osm/MON.git] / test / core / test_producer.py
index fa2881b..5dc3caf 100644 (file)
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
 ##
+"""This is a KafkaProducer with a request function to test the plugins."""
 
-'''
-This is a kafka producer with a common request function to test the plugins.
-'''
+import json
 
+import logging as log
 
+import os
 
-from kafka import KafkaProducer as kaf
-from kafka.errors import KafkaError
-import logging as log
-import json
 import jsmin
-import os
-import sys
-from os import listdir
-from jsmin import jsmin
-import os.path as path
 
+from kafka import KafkaProducer as kaf
+
+from kafka.errors import KafkaError
 
 
 class KafkaProducer(object):
+    """A KafkaProducer for testing purposes."""
 
     def __init__(self, topic):
-
-        self._topic= topic
+        """Initialize a KafkaProducer and it's topic."""
+        self._topic = topic
 
         if "ZOOKEEPER_URI" in os.environ:
             broker = os.getenv("ZOOKEEPER_URI")
@@ -56,34 +52,31 @@ class KafkaProducer(object):
             is already running.
             '''
 
-        self.producer = kaf(key_serializer=str.encode,
-                       value_serializer=lambda v: json.dumps(v).encode('ascii'),
-                       bootstrap_servers=broker, api_version=(0,10))
-
-
+        self.producer = kaf(
+            key_serializer=str.encode,
+            value_serializer=lambda v: json.dumps(v).encode('ascii'),
+            bootstrap_servers=broker, api_version=(0, 10))
 
     def publish(self, key, value, topic):
+        """Send messages to the message bus with a defing key and topic."""
         try:
-            future = self.producer.send(key=key, value=value,topic=topic)
+            future = self.producer.send(topic=topic, key=key, value=value)
             self.producer.flush()
         except Exception:
             log.exception("Error publishing to {} topic." .format(topic))
             raise
         try:
             record_metadata = future.get(timeout=10)
-            #self._log.debug("TOPIC:", record_metadata.topic)
-            #self._log.debug("PARTITION:", record_metadata.partition)
-            #self._log.debug("OFFSET:", record_metadata.offset)
+            log.debug("TOPIC:", record_metadata.topic)
+            log.debug("PARTITION:", record_metadata.partition)
+            log.debug("OFFSET:", record_metadata.offset)
         except KafkaError:
             pass
 
-    json_path = path.abspath(path.join(os.getcwd(),"../.."))
-
-    def request(self, path, key, message, topic): 
-       #External to MON
+    def request(self, path, key, message, topic):
+        """Test json files are loaded and sent on the message bus."""
+        # External to MON
         payload_create_alarm = jsmin(open(os.path.join(path)).read())
         self.publish(key=key,
-                    value = json.loads(payload_create_alarm),
-                    topic=topic)
-
-    
+                     value=json.loads(payload_create_alarm),
+                     topic=topic)