MON Packaging Changes
[osm/MON.git] / osm-mon / test / core / test_producer.py
diff --git a/osm-mon/test/core/test_producer.py b/osm-mon/test/core/test_producer.py
new file mode 100644 (file)
index 0000000..5dc3caf
--- /dev/null
@@ -0,0 +1,82 @@
+# Copyright 2017 Intel Research and Development Ireland Limited
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Intel Corporation
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
+##
+"""This is a KafkaProducer with a request function to test the plugins."""
+
+import json
+
+import logging as log
+
+import os
+
+import jsmin
+
+from kafka import KafkaProducer as kaf
+
+from kafka.errors import KafkaError
+
+
+class KafkaProducer(object):
+    """A KafkaProducer for testing purposes."""
+
+    def __init__(self, topic):
+        """Initialize a KafkaProducer and it's topic."""
+        self._topic = topic
+
+        if "ZOOKEEPER_URI" in os.environ:
+            broker = os.getenv("ZOOKEEPER_URI")
+        else:
+            broker = "localhost:9092"
+
+            '''
+            If the zookeeper broker URI is not set in the env, by default,
+            localhost container is taken as the host because an instance of
+            is already running.
+            '''
+
+        self.producer = kaf(
+            key_serializer=str.encode,
+            value_serializer=lambda v: json.dumps(v).encode('ascii'),
+            bootstrap_servers=broker, api_version=(0, 10))
+
+    def publish(self, key, value, topic):
+        """Send messages to the message bus with a defing key and topic."""
+        try:
+            future = self.producer.send(topic=topic, key=key, value=value)
+            self.producer.flush()
+        except Exception:
+            log.exception("Error publishing to {} topic." .format(topic))
+            raise
+        try:
+            record_metadata = future.get(timeout=10)
+            log.debug("TOPIC:", record_metadata.topic)
+            log.debug("PARTITION:", record_metadata.partition)
+            log.debug("OFFSET:", record_metadata.offset)
+        except KafkaError:
+            pass
+
+    def request(self, path, key, message, topic):
+        """Test json files are loaded and sent on the message bus."""
+        # External to MON
+        payload_create_alarm = jsmin(open(os.path.join(path)).read())
+        self.publish(key=key,
+                     value=json.loads(payload_create_alarm),
+                     topic=topic)