X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;ds=inline;f=osm-mon%2Ftest%2Fcore%2Ftest_producer.py;fp=osm-mon%2Ftest%2Fcore%2Ftest_producer.py;h=5dc3caf87daf67c2b4c1edf4f775b75adbc75fc6;hb=71ce7eca516321aff84332df56702e718968735b;hp=0000000000000000000000000000000000000000;hpb=7d5e61f406eab8d151125001a09f742dbadc3d70;p=osm%2FMON.git diff --git a/osm-mon/test/core/test_producer.py b/osm-mon/test/core/test_producer.py new file mode 100644 index 0000000..5dc3caf --- /dev/null +++ b/osm-mon/test/core/test_producer.py @@ -0,0 +1,82 @@ +# Copyright 2017 Intel Research and Development Ireland Limited +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Intel Corporation + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: prithiv.mohan@intel.com or adrian.hoban@intel.com +## +"""This is a KafkaProducer with a request function to test the plugins.""" + +import json + +import logging as log + +import os + +import jsmin + +from kafka import KafkaProducer as kaf + +from kafka.errors import KafkaError + + +class KafkaProducer(object): + """A KafkaProducer for testing purposes.""" + + def __init__(self, topic): + """Initialize a KafkaProducer and it's topic.""" + self._topic = topic + + if "ZOOKEEPER_URI" in os.environ: + broker = os.getenv("ZOOKEEPER_URI") + else: + broker = "localhost:9092" + + ''' + If the zookeeper broker URI is not set in the env, by default, + localhost container is taken as the host because an instance of + is already running. + ''' + + self.producer = kaf( + key_serializer=str.encode, + value_serializer=lambda v: json.dumps(v).encode('ascii'), + bootstrap_servers=broker, api_version=(0, 10)) + + def publish(self, key, value, topic): + """Send messages to the message bus with a defing key and topic.""" + try: + future = self.producer.send(topic=topic, key=key, value=value) + self.producer.flush() + except Exception: + log.exception("Error publishing to {} topic." .format(topic)) + raise + try: + record_metadata = future.get(timeout=10) + log.debug("TOPIC:", record_metadata.topic) + log.debug("PARTITION:", record_metadata.partition) + log.debug("OFFSET:", record_metadata.offset) + except KafkaError: + pass + + def request(self, path, key, message, topic): + """Test json files are loaded and sent on the message bus.""" + # External to MON + payload_create_alarm = jsmin(open(os.path.join(path)).read()) + self.publish(key=key, + value=json.loads(payload_create_alarm), + topic=topic)