1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
22 """This is a KafkaProducer with a request function to test the plugins."""
32 from kafka
import KafkaProducer
as kaf
34 from kafka
.errors
import KafkaError
37 class KafkaProducer(object):
38 """A KafkaProducer for testing purposes."""
def __init__(self, topic):
    """Initialize a KafkaProducer and its topic.

    Args:
        topic: Topic this producer is created for. It is only stored on
            the instance; ``publish`` takes its own ``topic`` argument.
    """
    # Keep the argument so it is not silently discarded.
    # NOTE(review): confirm nothing relies on a different attribute name.
    self._topic = topic

    # The variable is called ZOOKEEPER_URI, but its value is handed to the
    # client as ``bootstrap_servers``. When it is not set in the
    # environment, the local default "localhost:9092" is used instead.
    broker = os.getenv("ZOOKEEPER_URI", "localhost:9092")

    self.producer = kaf(
        # Keys are given as str and encoded to bytes.
        key_serializer=str.encode,
        # Values are dumped to JSON text and encoded as ASCII bytes.
        value_serializer=lambda v: json.dumps(v).encode('ascii'),
        bootstrap_servers=broker,
        api_version=(0, 10),
    )
def publish(self, key, value, topic):
    """Send a message to the message bus with a defined key and topic.

    Args:
        key: Message key, forwarded to the producer's ``send``.
        value: Message payload, forwarded to the producer's ``send``.
        topic: Topic the message is published to.

    Raises:
        Exception: Anything raised by ``send`` is logged and re-raised.
    """
    try:
        future = self.producer.send(topic=topic, key=key, value=value)
    except Exception:
        log.exception("Error publishing to %s topic.", topic)
        # Without a future there is nothing to wait on below.
        raise

    try:
        # Wait (at most 10 seconds) for the result of the send.
        record_metadata = future.get(timeout=10)
    except KafkaError:
        # Best effort: a failed confirmation is reported, not raised.
        log.warning("Delivery to %s topic was not confirmed.", topic,
                    exc_info=True)
        return

    # The logging calls need placeholders; passing the value as a bare
    # extra argument makes the record fail to format and lose the value.
    log.debug("TOPIC: %s", record_metadata.topic)
    log.debug("PARTITION: %s", record_metadata.partition)
    log.debug("OFFSET: %s", record_metadata.offset)
76 def request(self
, path
, key
, message
, topic
):
77 """Test json files are loaded and sent on the message bus."""
79 payload_create_alarm
= jsmin(open(os
.path
.join(path
)).read())
81 value
=json
.loads(payload_create_alarm
),