1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
This is a Kafka producer app that interacts with the SO and with the plugins
of datacenters such as OpenStack, VMware and AWS.
27 from kafka
import KafkaProducer
as kaf
28 from kafka
.errors
import KafkaError
33 from os
import listdir
34 from jsmin
import jsmin
__author__ = "Prithiv Mohan"
__date__ = "06/Sep/2017"

# Directory holding the JSON message models.  The path is relative
# ("../models/"), so it is resolved against the current working directory of
# the process.  The trailing empty component keeps the trailing separator the
# value has always had.
json_path = os.path.join(os.pardir, "models", "")
class KafkaProducer(object):
    """Publish MON alarm, metric and credential messages to Kafka.

    Request-style methods (and ``acknowledge_alarm`` / ``access_credentials``)
    publish a JSON template read from ``json_path``.  Response-style methods
    (and ``notify_alarm``) publish the ``message`` supplied by the caller.

    The destination topic is fixed per method.  The ``topic`` argument of
    those methods is accepted only so that existing callers keep working.
    """

    def __init__(self, topic):
        """Create the underlying kafka-python producer.

        Args:
            topic: Topic name supplied by the caller.  It is stored on the
                instance; no method of this class reads it.
        """
        self._topic = topic

        # If the broker URI is not set in the environment, the localhost
        # container is taken as the host, on the assumption that a broker
        # instance is already running there.
        broker = os.getenv("BROKER_URI", "localhost:9092")

        self.producer = kaf(
            key_serializer=str.encode,
            value_serializer=lambda v: json.dumps(v).encode('ascii'),
            bootstrap_servers=broker,
            api_version=(0, 10))

    def publish(self, key, value, topic=None):
        """Send one message and wait up to 10 seconds for its metadata.

        Args:
            key: Message key; encoded with ``str.encode`` by the producer.
            value: Payload; JSON-encoded by the producer's value serializer.
            topic: Destination topic name.

        Raises:
            Exception: Any error raised while sending or flushing is logged
                and re-raised unchanged.
        """
        try:
            future = self.producer.send(topic=topic, key=key, value=value)
            self.producer.flush()
        except Exception:
            logging.exception("Error publishing to {} topic.".format(topic))
            raise

        try:
            record_metadata = future.get(timeout=10)
        except KafkaError:
            # Confirmation is best effort: a missing acknowledgement must not
            # fail the caller, but it should not disappear silently either.
            logging.warning("No delivery confirmation for topic %s.",
                            topic, exc_info=True)
            return

        # logging uses %-style lazy formatting; without a placeholder the
        # extra argument makes the record fail to format.
        logging.debug("TOPIC: %s", record_metadata.topic)
        logging.debug("PARTITION: %s", record_metadata.partition)
        logging.debug("OFFSET: %s", record_metadata.offset)

    def _publish_template(self, key, template_name, topic):
        """Publish the minified JSON template ``template_name`` to ``topic``."""
        # The context manager closes the file even if minification fails.
        with open(os.path.join(json_path, template_name)) as template_file:
            payload = jsmin(template_file.read())

        # The template text is JSON-encoded here and once more by the
        # producer's value serializer, so consumers receive a doubly encoded
        # string.  Kept as is to preserve the wire format.
        self.publish(key, value=json.dumps(payload), topic=topic)

    def _publish_message(self, key, message, topic):
        """Publish the caller-supplied ``message`` to ``topic``."""
        # NOTE(review): the matching ``*_resp.json`` model is deliberately not
        # read here because its content is never part of the published
        # value -- confirm that ``message`` is the intended payload.
        self.publish(key, value=message, topic=topic)

    def create_alarm_request(self, key, message, topic):
        """Publish the ``create_alarm.json`` template to ``alarm_request``.

        ``message`` and ``topic`` are ignored.
        """
        self._publish_template(key, 'create_alarm.json', 'alarm_request')

    def create_alarm_response(self, key, message, topic):
        """Publish ``message`` to ``alarm_response``; ``topic`` is ignored.

        ``message`` is presumably shaped like the ``create_alarm_resp.json``
        model (TODO confirm).
        """
        self._publish_message(key, message, 'alarm_response')

    def acknowledge_alarm(self, key, message, topic):
        """Publish the ``acknowledge_alarm.json`` template to ``alarm_request``.

        ``message`` and ``topic`` are ignored.
        """
        self._publish_template(key, 'acknowledge_alarm.json', 'alarm_request')

    def list_alarm_request(self, key, message, topic):
        """Publish the ``list_alarm_req.json`` template to ``alarm_request``.

        ``message`` and ``topic`` are ignored.
        """
        self._publish_template(key, 'list_alarm_req.json', 'alarm_request')

    def notify_alarm(self, key, message, topic):
        """Publish ``message`` to ``alarm_response``; ``topic`` is ignored.

        ``message`` is presumably shaped like the ``notify_alarm.json`` model
        (TODO confirm).
        """
        self._publish_message(key, message, 'alarm_response')

    def list_alarm_response(self, key, message, topic):
        """Publish ``message`` to ``alarm_response``; ``topic`` is ignored.

        ``message`` is presumably shaped like the ``list_alarm_resp.json``
        model (TODO confirm).
        """
        self._publish_message(key, message, 'alarm_response')

    def update_alarm_request(self, key, message, topic):
        """Publish the ``update_alarm_req.json`` template to ``alarm_request``.

        ``message`` and ``topic`` are ignored.
        """
        self._publish_template(key, 'update_alarm_req.json', 'alarm_request')

    def update_alarm_response(self, key, message, topic):
        """Publish ``message`` to ``alarm_response``; ``topic`` is ignored.

        ``message`` is presumably shaped like the ``update_alarm_resp.json``
        model (TODO confirm).
        """
        self._publish_message(key, message, 'alarm_response')

    def delete_alarm_request(self, key, message, topic):
        """Publish the ``delete_alarm_req.json`` template to ``alarm_request``.

        ``message`` and ``topic`` are ignored.
        """
        self._publish_template(key, 'delete_alarm_req.json', 'alarm_request')

    def delete_alarm_response(self, key, message, topic):
        """Publish ``message`` to ``alarm_response``; ``topic`` is ignored.

        ``message`` is presumably shaped like the ``delete_alarm_resp.json``
        model (TODO confirm).
        """
        self._publish_message(key, message, 'alarm_response')

    def create_metrics_request(self, key, message, topic):
        """Publish the ``create_metric_req.json`` template to ``metric_request``.

        ``message`` and ``topic`` are ignored.
        """
        self._publish_template(key, 'create_metric_req.json', 'metric_request')

    def create_metrics_resp(self, key, message, topic):
        """Publish ``message`` to ``metric_response``; ``topic`` is ignored.

        ``message`` is presumably shaped like the ``create_metric_resp.json``
        model (TODO confirm).
        """
        self._publish_message(key, message, 'metric_response')

    def read_metric_data_request(self, key, message, topic):
        """Publish the ``read_metric_data_req.json`` template to ``metric_request``.

        ``message`` and ``topic`` are ignored.
        """
        self._publish_template(
            key, 'read_metric_data_req.json', 'metric_request')

    def read_metric_data_response(self, key, message, topic):
        """Publish ``message`` to ``metric_response``; ``topic`` is ignored.

        ``message`` is presumably shaped like the
        ``read_metric_data_resp.json`` model (TODO confirm).
        """
        self._publish_message(key, message, 'metric_response')

    def list_metric_request(self, key, message, topic):
        """Publish the ``list_metric_req.json`` template to ``metric_request``.

        ``message`` and ``topic`` are ignored.
        """
        self._publish_template(key, 'list_metric_req.json', 'metric_request')

    def list_metric_response(self, key, message, topic):
        """Publish ``message`` to ``metric_response``; ``topic`` is ignored.

        ``message`` is presumably shaped like the ``list_metrics_resp.json``
        model (TODO confirm).
        """
        self._publish_message(key, message, 'metric_response')

    def delete_metric_request(self, key, message, topic):
        """Publish the ``delete_metric_req.json`` template to ``metric_request``.

        ``message`` and ``topic`` are ignored.
        """
        self._publish_template(key, 'delete_metric_req.json', 'metric_request')

    def delete_metric_response(self, key, message, topic):
        """Publish ``message`` to ``metric_response``; ``topic`` is ignored.

        ``message`` is presumably shaped like the ``delete_metric_resp.json``
        model (TODO confirm).
        """
        self._publish_message(key, message, 'metric_response')

    def update_metric_request(self, key, message, topic):
        """Publish the ``update_metric_req.json`` template to ``metric_request``.

        ``message`` and ``topic`` are ignored.
        """
        self._publish_template(key, 'update_metric_req.json', 'metric_request')

    def update_metric_response(self, key, message, topic):
        """Publish ``message`` to ``metric_response``; ``topic`` is ignored.

        ``message`` is presumably shaped like the ``update_metric_resp.json``
        model (TODO confirm).
        """
        self._publish_message(key, message, 'metric_response')

    def access_credentials(self, key, message, topic):
        """Publish the ``access_credentials.json`` template to ``access_credentials``.

        ``message`` and ``topic`` are ignored.
        """
        self._publish_template(
            key, 'access_credentials.json', 'access_credentials')