5dc3caf87daf67c2b4c1edf4f775b75adbc75fc6
[osm/MON.git] / osm_mon / test / core / test_producer.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
6
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10
11 # http://www.apache.org/licenses/LICENSE-2.0
12
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
21 ##
22 """This is a KafkaProducer with a request function to test the plugins."""
23
24 import json
25
26 import logging as log
27
28 import os
29
30 import jsmin
31
32 from kafka import KafkaProducer as kaf
33
34 from kafka.errors import KafkaError
35
36
class KafkaProducer(object):
    """A KafkaProducer for testing purposes.

    Wraps a kafka-python producer that serializes keys with ``str.encode``
    and values as ASCII-encoded JSON.
    """

    def __init__(self, topic):
        """Initialize a KafkaProducer and its topic.

        :param topic: topic name stored on the instance as ``_topic``.
        """
        self._topic = topic

        # If the broker URI is not set in the environment (ZOOKEEPER_URI),
        # localhost is used by default, on the assumption that a broker
        # instance is already running there.
        broker = os.getenv("ZOOKEEPER_URI", "localhost:9092")

        self.producer = kaf(
            key_serializer=str.encode,
            value_serializer=lambda v: json.dumps(v).encode('ascii'),
            bootstrap_servers=broker, api_version=(0, 10))

    def publish(self, key, value, topic):
        """Send a message to the message bus with a defined key and topic.

        :param key: message key (a ``str``; encoded by the key serializer).
        :param value: JSON-serializable message payload.
        :param topic: topic the message is sent to.
        :raises Exception: re-raises whatever ``send``/``flush`` raised,
            after logging it.
        """
        try:
            future = self.producer.send(topic=topic, key=key, value=value)
            self.producer.flush()
        except Exception:
            log.exception("Error publishing to %s topic.", topic)
            raise

        try:
            record_metadata = future.get(timeout=10)
        except KafkaError:
            # Delivery confirmation is best-effort: keep going, but leave a
            # trace instead of silently discarding the failure.
            log.exception(
                "Could not confirm delivery to %s topic.", topic)
            return

        # The logging API interpolates lazily via %-style placeholders;
        # without them the extra argument breaks record formatting.
        log.debug("TOPIC: %s", record_metadata.topic)
        log.debug("PARTITION: %s", record_metadata.partition)
        log.debug("OFFSET: %s", record_metadata.offset)

    def request(self, path, key, message, topic):
        """Load a test JSON file and send it on the message bus.

        :param path: path of the (possibly commented) JSON payload file.
        :param key: message key passed to :meth:`publish`.
        :param message: unused; kept for interface compatibility.
        :param topic: topic the payload is published to.
        """
        # External to MON
        # The context manager guarantees the file handle is closed.
        with open(path) as payload_file:
            raw_payload = payload_file.read()

        # ``import jsmin`` binds the module; the minifier is ``jsmin.jsmin``.
        payload_create_alarm = jsmin.jsmin(raw_payload)
        self.publish(key=key,
                     value=json.loads(payload_create_alarm),
                     topic=topic)