1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
"""
This is a Kafka producer app that interacts with the SO and with the plugins
of datacenters such as OpenStack, VMware and AWS.
"""
# Module authorship metadata.
__author__ = "Prithiv Mohan"
__date__ = "06/Sep/2017"
import json
import logging
import os
from os import listdir

from jsmin import jsmin
from kafka import KafkaProducer as kaf
from kafka.errors import KafkaError
# Location of the JSON message templates: a "models/" directory one level
# above the current working directory (the path is relative, not absolute).
json_path = "{0}/models/".format(os.pardir)
class KafkaProducer(object):
    """Publish alarm, metric and credential messages to Kafka.

    Wraps the kafka-python producer imported in this module as ``kaf``.
    Request-style methods publish the minified JSON template found under
    ``json_path``; response-style methods publish the caller's ``message``.
    Every public method keeps its ``(key, message, topic)`` signature even
    where an argument is not used, so existing callers keep working.
    """

    def __init__(self, topic):
        """Create the underlying Kafka producer.

        The broker address is read from the ``ZOOKEEPER_URI`` environment
        variable; ``localhost:2181`` is used when the variable is not set.

        :param topic: topic name supplied by the caller. It is stored on the
            instance; the publishing methods below name their own topics.
        """
        self._topic = topic

        # Fall back to the local host only when the variable is absent, so a
        # configured URI is never overwritten by the default.
        # NOTE(review): ``bootstrap_servers`` expects Kafka broker addresses,
        # while 2181 is conventionally ZooKeeper's port -- confirm the default.
        broker = os.environ.get("ZOOKEEPER_URI", "localhost:2181")

        self.producer = kaf(
            key_serializer=str.encode,
            value_serializer=lambda v: json.dumps(v).encode('ascii'),
            bootstrap_servers=broker,
            api_version=(0, 10))

    def publish(self, key, value, topic=None):
        """Send one record and wait up to 10 seconds for its confirmation.

        :param key: record key, encoded by the producer's key serializer.
        :param value: record value, JSON-encoded by the value serializer.
        :param topic: name of the destination topic.
        :raises Exception: any error raised while handing the record to the
            producer; it is logged and then re-raised.
        """
        try:
            # kafka-python's send() takes the topic first; key and value are
            # passed by keyword so neither can be mistaken for the topic.
            future = self.producer.send(topic, key=key, value=value)
        except Exception:
            logging.exception("Error publishing to %s topic.", topic)
            raise

        try:
            record_metadata = future.get(timeout=10)
        except KafkaError:
            # Confirmation is best-effort: report the failure instead of
            # raising, so a slow broker does not break the caller.
            logging.exception(
                "No delivery confirmation from %s topic.", topic)
            return

        # logging applies %-formatting lazily, so each argument needs a
        # matching placeholder in the message.
        logging.debug("TOPIC: %s", record_metadata.topic)
        logging.debug("PARTITION: %s", record_metadata.partition)
        logging.debug("OFFSET: %s", record_metadata.offset)

    @staticmethod
    def _load_template(filename):
        """Return the minified text of the model file ``filename``."""
        # The context manager closes the file even if reading fails.
        with open(os.path.join(json_path, filename)) as template_file:
            return jsmin(template_file.read())

    def _publish_template(self, key, filename, topic):
        """Publish the minified template ``filename`` to ``topic``."""
        payload = self._load_template(filename)
        # The template text is JSON-encoded here and again by the producer's
        # value serializer; this matches the existing wire format.
        self.publish(key, value=json.dumps(payload), topic=topic)

    def _publish_message(self, key, message, filename, topic):
        """Publish ``message`` to ``topic`` after reading its template."""
        # NOTE(review): the template is read but not part of the published
        # value; the read is kept so a missing model file still fails here.
        # Publishing ``message`` is the assumed behaviour -- confirm both.
        self._load_template(filename)
        self.publish(key, value=message, topic=topic)

    def create_alarm_request(self, key, message, topic):
        """Publish the create-alarm template to ``alarm_request``.

        ``message`` and ``topic`` are accepted but not used.
        """
        self._publish_template(key, 'create_alarm.json', 'alarm_request')

    def create_alarm_response(self, key, message, topic):
        """Publish ``message`` to ``alarm_response``; ``topic`` is not used."""
        self._publish_message(
            key, message, 'create_alarm_resp.json', 'alarm_response')

    def acknowledge_alarm(self, key, message, topic):
        """Publish the acknowledge-alarm template to ``alarm_request``.

        ``message`` and ``topic`` are accepted but not used.
        """
        self._publish_template(key, 'acknowledge_alarm.json', 'alarm_request')

    def list_alarm_request(self, key, message, topic):
        """Publish the list-alarm template to ``alarm_request``.

        ``message`` and ``topic`` are accepted but not used.
        """
        self._publish_template(key, 'list_alarm_req.json', 'alarm_request')

    def notify_alarm(self, key, message, topic):
        """Publish ``message`` to ``alarm_response``; ``topic`` is not used."""
        self._publish_message(
            key, message, 'notify_alarm.json', 'alarm_response')

    def list_alarm_response(self, key, message, topic):
        """Publish ``message`` to ``alarm_response``; ``topic`` is not used."""
        self._publish_message(
            key, message, 'list_alarm_resp.json', 'alarm_response')

    def update_alarm_request(self, key, message, topic):
        """Publish the update-alarm template to ``alarm_request``.

        ``message`` and ``topic`` are accepted but not used.
        """
        self._publish_template(key, 'update_alarm_req.json', 'alarm_request')

    def update_alarm_response(self, key, message, topic):
        """Publish ``message`` to ``alarm_response``; ``topic`` is not used."""
        self._publish_message(
            key, message, 'update_alarm_resp.json', 'alarm_response')

    def delete_alarm_request(self, key, message, topic):
        """Publish the delete-alarm template to ``alarm_request``.

        ``message`` and ``topic`` are accepted but not used.
        """
        self._publish_template(key, 'delete_alarm_req.json', 'alarm_request')

    def delete_alarm_response(self, key, message, topic):
        """Publish ``message`` to ``alarm_response``; ``topic`` is not used."""
        self._publish_message(
            key, message, 'delete_alarm_resp.json', 'alarm_response')

    def create_metrics_request(self, key, message, topic):
        """Publish the create-metric template to ``metric_request``.

        ``message`` and ``topic`` are accepted but not used.
        """
        self._publish_template(key, 'create_metric_req.json', 'metric_request')

    def create_metrics_resp(self, key, message, topic):
        """Publish ``message`` to ``metric_response``; ``topic`` is not used."""
        self._publish_message(
            key, message, 'create_metric_resp.json', 'metric_response')

    def read_metric_data_request(self, key, message, topic):
        """Publish the read-metric-data template to ``metric_request``.

        ``message`` and ``topic`` are accepted but not used.
        """
        self._publish_template(
            key, 'read_metric_data_req.json', 'metric_request')

    def read_metric_data_response(self, key, message, topic):
        """Publish ``message`` to ``metric_response``; ``topic`` is not used."""
        self._publish_message(
            key, message, 'read_metric_data_resp.json', 'metric_response')

    def list_metric_request(self, key, message, topic):
        """Publish the list-metric template to ``metric_request``.

        ``message`` and ``topic`` are accepted but not used.
        """
        self._publish_template(key, 'list_metric_req.json', 'metric_request')

    def list_metric_response(self, key, message, topic):
        """Publish ``message`` to ``metric_response``; ``topic`` is not used."""
        self._publish_message(
            key, message, 'list_metrics_resp.json', 'metric_response')

    def delete_metric_request(self, key, message, topic):
        """Publish the delete-metric template to ``metric_request``.

        ``message`` and ``topic`` are accepted but not used.
        """
        self._publish_template(key, 'delete_metric_req.json', 'metric_request')

    def delete_metric_response(self, key, message, topic):
        """Publish ``message`` to ``metric_response``; ``topic`` is not used."""
        self._publish_message(
            key, message, 'delete_metric_resp.json', 'metric_response')

    def update_metric_request(self, key, message, topic):
        """Publish the update-metric template to ``metric_request``.

        ``message`` and ``topic`` are accepted but not used.
        """
        self._publish_template(key, 'update_metric_req.json', 'metric_request')

    def update_metric_response(self, key, message, topic):
        """Publish ``message`` to ``metric_response``; ``topic`` is not used."""
        self._publish_message(
            key, message, 'update_metric_resp.json', 'metric_response')

    def access_credentials(self, key, message, topic):
        """Publish the access-credentials template to ``access_credentials``.

        ``message`` and ``topic`` are accepted but not used.
        """
        self._publish_template(
            key, 'access_credentials.json', 'access_credentials')