1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
24 This is a kafka producer with a common request function to test the plugins.
29 from kafka
import KafkaProducer
as kaf
30 from kafka
.errors
import KafkaError
36 from os
import listdir
37 from jsmin
import jsmin
38 import os
.path
as path
42 class KafkaProducer(object):
def __init__(self, topic):
    """Create the kafka-python producer used to publish test messages.

    The broker address is taken from the ``ZOOKEEPER_URI`` environment
    variable and falls back to ``localhost:9092`` when it is not set.

    Args:
        topic: Topic associated with this producer. It is only stored on
            the instance; ``publish`` receives its target topic explicitly.
    """
    self._topic = topic

    # Despite its name, ZOOKEEPER_URI is handed to the client as the Kafka
    # bootstrap server. When the variable is missing from the environment
    # the local broker is used as the default host.
    broker = os.environ.get("ZOOKEEPER_URI", "localhost:9092")

    self.producer = kaf(
        # Keys are plain strings; values are JSON documents sent as ASCII.
        key_serializer=str.encode,
        value_serializer=lambda v: json.dumps(v).encode('ascii'),
        bootstrap_servers=broker,
        api_version=(0, 10),
    )
def publish(self, key, value, topic):
    """Send one keyed message to ``topic`` and wait for the send result.

    Args:
        key: Message key handed to the producer's ``send``.
        value: Message payload handed to the producer's ``send``.
        topic: Name of the destination topic.

    Returns:
        The value produced by the send future (the record metadata), or
        ``None`` when waiting on the future fails with ``KafkaError``.

    Raises:
        Exception: Whatever ``send`` raised, re-raised after being logged.
    """
    try:
        future = self.producer.send(topic=topic, key=key, value=value)
    except Exception:
        # Without a future there is nothing to wait on, so log the
        # traceback and let the caller see the original error.
        log.exception("Error publishing to {} topic.".format(topic))
        raise

    try:
        # Wait at most 10 seconds for the send to complete.
        return future.get(timeout=10)
    except KafkaError:
        # Best effort: a failed confirmation is reported, not propagated.
        log.exception(
            "No confirmation for message sent to {} topic.".format(topic))
        return None
# Absolute path of the directory two levels above the current working
# directory; abspath resolves the relative path against os.getcwd() itself.
json_path = path.abspath("../..")
82 def request(self
, path
, key
, message
, topic
):
84 payload_create_alarm
= jsmin(open(os
.path
.join(path
)).read())
86 value
= json
.loads(payload_create_alarm
),