1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
This is a Kafka producer application that exchanges messages with the SO and
with the plugins for datacenter platforms such as OpenStack, VMware and AWS.
# Module metadata: original author and the date the module was written.
__author__ = "Prithiv Mohan"
__date__ = "06/Sep/2017"
import json
import logging
import os
from os import listdir

from jsmin import jsmin
from kafka import KafkaProducer
from kafka.errors import KafkaError
class KafkaProducer(object):
    """Publish OSM MON alarm, metric and credential messages to Kafka.

    Every ``*_request`` method (and ``acknowledge_alarm`` /
    ``access_credentials``) reads a JSON template from ``json_path``,
    minifies it with ``jsmin`` and publishes it on a fixed topic.  Every
    ``*_response`` / ``*_resp`` method (and ``notify_alarm``) publishes the
    caller-supplied ``message`` on a fixed topic.

    NOTE: this class deliberately keeps the name ``KafkaProducer`` for
    backward compatibility even though it shadows the client class imported
    from the ``kafka`` package; the client is therefore imported under an
    alias inside ``__init__``.
    """

    # Directory holding the JSON message templates.  The path is relative,
    # so it is resolved against the process working directory when a
    # template is opened.
    json_path = os.pardir + "/models/"

    def __init__(self, topic):
        """Create the underlying Kafka client.

        Args:
            topic: Default topic used by ``publish`` when no explicit topic
                is given.

        The broker address is read from the ``ZOOKEEPER_URI`` environment
        variable; when it is not set, ``localhost:2181`` is used.
        NOTE(review): despite its name the value is handed to the client as
        ``bootstrap_servers`` -- confirm it points at a Kafka broker.
        """
        # Imported locally under an alias: at module level the name
        # ``KafkaProducer`` refers to this class, not to the kafka client.
        from kafka import KafkaProducer as _KafkaClient

        self._topic = topic
        self._log = logging.getLogger(__name__)

        broker = os.getenv("ZOOKEEPER_URI", "localhost:2181")

        # Keep the client on the instance so that publish() can reach it.
        self.producer = _KafkaClient(
            key_serializer=str.encode,
            value_serializer=lambda v: json.dumps(v).encode('ascii'),
            bootstrap_servers=broker,
            api_version=(0, 10),
        )

    def publish(self, key, message, topic=None):
        """Send ``message`` under ``key`` and wait for the broker's answer.

        Args:
            key: Message key; must be a ``str`` (it is encoded by the
                client's key serializer).
            message: JSON-serialisable value to publish.
            topic: Destination topic.  Falls back to the topic given to the
                constructor when ``None``.

        Raises:
            Exception: whatever the client raises while sending or flushing
                is logged and re-raised.  A ``KafkaError`` raised while
                waiting for the delivery report is logged but not re-raised.
        """
        target = topic if topic is not None else self._topic

        try:
            future = self.producer.send(target, key=key, value=message)
            self.producer.flush()
        except Exception:
            self._log.exception("Error publishing to {} topic." .format(target))
            raise

        try:
            record_metadata = future.get(timeout=10)
        except KafkaError:
            # Delivery confirmation is best effort: record the failure
            # instead of silently discarding it, but do not propagate it.
            self._log.exception("No delivery report for topic %s", target)
            return

        self._log.debug("TOPIC: %s", record_metadata.topic)
        self._log.debug("PARTITION: %s", record_metadata.partition)
        self._log.debug("OFFSET: %s", record_metadata.offset)

    def _load_template(self, filename):
        """Return the minified text of template ``filename`` in ``json_path``."""
        path = os.path.join(self.json_path, filename)
        with open(path) as template:
            return jsmin(template.read())

    def _send_template(self, key, filename, topic):
        """Publish the JSON-encoded, minified template ``filename``."""
        payload = self._load_template(filename)
        # The template is minified *text*; json.dumps() turns it into a JSON
        # string literal, which keeps the historical wire format unchanged.
        self.publish(key, json.dumps(payload), topic)

    def _send_message(self, key, message, filename, topic):
        """Publish the caller's ``message`` on ``topic``."""
        # NOTE(review): the template is read (so a missing file still fails
        # early) but its content is not forwarded; the caller-supplied
        # message is what gets published -- confirm the read can be dropped.
        self._load_template(filename)
        self.publish(key, message, topic)

    # ------------------------------------------------------------------
    # Alarm messages.  For every method below the ``topic`` argument is
    # accepted for interface compatibility but the destination topic is
    # fixed; request-style methods likewise ignore ``message``.
    # ------------------------------------------------------------------

    def create_alarm_request(self, key, message, topic):
        """Publish the ``create_alarm.json`` template on ``alarm_request``."""
        self._send_template(key, 'create_alarm.json', 'alarm_request')

    def create_alarm_response(self, key, message, topic):
        """Publish ``message`` on ``alarm_response``."""
        self._send_message(key, message, 'create_alarm_resp.json',
                           'alarm_response')

    def acknowledge_alarm(self, key, message, topic):
        """Publish the ``acknowledge_alarm.json`` template on ``alarm_request``."""
        self._send_template(key, 'acknowledge_alarm.json', 'alarm_request')

    def list_alarm_request(self, key, message, topic):
        """Publish the ``list_alarm_req.json`` template on ``alarm_request``."""
        self._send_template(key, 'list_alarm_req.json', 'alarm_request')

    def notify_alarm(self, key, message, topic):
        """Publish ``message`` on ``alarm_response``."""
        self._send_message(key, message, 'notify_alarm.json',
                           'alarm_response')

    def list_alarm_response(self, key, message, topic):
        """Publish ``message`` on ``alarm_response``."""
        self._send_message(key, message, 'list_alarm_resp.json',
                           'alarm_response')

    def update_alarm_request(self, key, message, topic):
        """Publish the ``update_alarm_req.json`` template on ``alarm_request``."""
        self._send_template(key, 'update_alarm_req.json', 'alarm_request')

    def update_alarm_response(self, key, message, topic):
        """Publish ``message`` on ``alarm_response``."""
        self._send_message(key, message, 'update_alarm_resp.json',
                           'alarm_response')

    def delete_alarm_request(self, key, message, topic):
        """Publish the ``delete_alarm_req.json`` template on ``alarm_request``."""
        self._send_template(key, 'delete_alarm_req.json', 'alarm_request')

    def delete_alarm_response(self, key, message, topic):
        """Publish ``message`` on ``alarm_response``."""
        self._send_message(key, message, 'delete_alarm_resp.json',
                           'alarm_response')

    # ------------------------------------------------------------------
    # Metric messages.
    # ------------------------------------------------------------------

    def create_metrics_request(self, key, message, topic):
        """Publish the ``create_metric_req.json`` template on ``metric_request``."""
        self._send_template(key, 'create_metric_req.json', 'metric_request')

    def create_metrics_resp(self, key, message, topic):
        """Publish ``message`` on ``metric_response``."""
        self._send_message(key, message, 'create_metric_resp.json',
                           'metric_response')

    def read_metric_data_request(self, key, message, topic):
        """Publish the ``read_metric_data_req.json`` template on ``metric_request``."""
        self._send_template(key, 'read_metric_data_req.json',
                            'metric_request')

    def read_metric_data_response(self, key, message, topic):
        """Publish ``message`` on ``metric_response``."""
        self._send_message(key, message, 'read_metric_data_resp.json',
                           'metric_response')

    def list_metric_request(self, key, message, topic):
        """Publish the ``list_metric_req.json`` template on ``metric_request``."""
        self._send_template(key, 'list_metric_req.json', 'metric_request')

    def list_metric_response(self, key, message, topic):
        """Publish ``message`` on ``metric_response``."""
        self._send_message(key, message, 'list_metrics_resp.json',
                           'metric_response')

    def delete_metric_request(self, key, message, topic):
        """Publish the ``delete_metric_req.json`` template on ``metric_request``."""
        self._send_template(key, 'delete_metric_req.json', 'metric_request')

    def delete_metric_response(self, key, message, topic):
        """Publish ``message`` on ``metric_response``."""
        self._send_message(key, message, 'delete_metric_resp.json',
                           'metric_response')

    def update_metric_request(self, key, message, topic):
        """Publish the ``update_metric_req.json`` template on ``metric_request``."""
        self._send_template(key, 'update_metric_req.json', 'metric_request')

    def update_metric_response(self, key, message, topic):
        """Publish ``message`` on ``metric_response``."""
        self._send_message(key, message, 'update_metric_resp.json',
                           'metric_response')

    # ------------------------------------------------------------------
    # Credentials.
    # ------------------------------------------------------------------

    def access_credentials(self, key, message, topic):
        """Publish the ``access_credentials.json`` template on ``access_credentials``."""
        self._send_template(key, 'access_credentials.json',
                            'access_credentials')