Refactors codebase
[osm/MON.git] / osm_mon / test / core / test_producer.py
diff --git a/osm_mon/test/core/test_producer.py b/osm_mon/test/core/test_producer.py
deleted file mode 100644 (file)
index 5dc3caf..0000000
+++ /dev/null
@@ -1,82 +0,0 @@
-# Copyright 2017 Intel Research and Development Ireland Limited
-# *************************************************************
-
-# This file is part of OSM Monitoring module
-# All Rights Reserved to Intel Corporation
-
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-
-#         http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
-##
-"""This is a KafkaProducer with a request function to test the plugins."""
-
-import json
-
-import logging as log
-
-import os
-
-import jsmin
-
-from kafka import KafkaProducer as kaf
-
-from kafka.errors import KafkaError
-
-
-class KafkaProducer(object):
-    """A KafkaProducer for testing purposes."""
-
-    def __init__(self, topic):
-        """Initialize a KafkaProducer and it's topic."""
-        self._topic = topic
-
-        if "ZOOKEEPER_URI" in os.environ:
-            broker = os.getenv("ZOOKEEPER_URI")
-        else:
-            broker = "localhost:9092"
-
-            '''
-            If the zookeeper broker URI is not set in the env, by default,
-            localhost container is taken as the host because an instance of
-            is already running.
-            '''
-
-        self.producer = kaf(
-            key_serializer=str.encode,
-            value_serializer=lambda v: json.dumps(v).encode('ascii'),
-            bootstrap_servers=broker, api_version=(0, 10))
-
-    def publish(self, key, value, topic):
-        """Send messages to the message bus with a defing key and topic."""
-        try:
-            future = self.producer.send(topic=topic, key=key, value=value)
-            self.producer.flush()
-        except Exception:
-            log.exception("Error publishing to {} topic." .format(topic))
-            raise
-        try:
-            record_metadata = future.get(timeout=10)
-            log.debug("TOPIC:", record_metadata.topic)
-            log.debug("PARTITION:", record_metadata.partition)
-            log.debug("OFFSET:", record_metadata.offset)
-        except KafkaError:
-            pass
-
-    def request(self, path, key, message, topic):
-        """Test json files are loaded and sent on the message bus."""
-        # External to MON
-        payload_create_alarm = jsmin(open(os.path.join(path)).read())
-        self.publish(key=key,
-                     value=json.loads(payload_create_alarm),
-                     topic=topic)