1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
21 """This is a common kafka producer app.
23 It interacts with the SO and the plugins of the datacenters: OpenStack, VMWare
31 from jsmin
import jsmin
33 from kafka
import KafkaProducer
as kaf
35 from kafka
.errors
import KafkaError
37 __author__
= "Prithiv Mohan"
38 __date__
= "06/Sep/2017"
40 current_path
= os
.path
.realpath(__file__
)
41 json_path
= os
.path
.abspath(os
.path
.join(current_path
, '..', '..', 'models'))
43 # TODO(): validate all of the request and response messages against the
47 class KafkaProducer(object):
48 """A common KafkaProducer for requests and responses."""
50 def __init__(self
, topic
):
51 """Initialize the common kafka producer."""
54 if "BROKER_URI" in os
.environ
:
55 broker
= os
.getenv("BROKER_URI")
57 broker
= "localhost:9092"
60 If the broker URI is not set in the env by default,
61 localhost container is taken as the host because an instance of
66 key_serializer
=str.encode
,
67 value_serializer
=str.encode
,
68 bootstrap_servers
=broker
, api_version
=(0, 10))
70 def publish(self
, key
, value
, topic
=None):
71 """Send the required message on the Kafka message bus."""
73 future
= self
.producer
.send(topic
=topic
, key
=key
, value
=value
)
76 logging
.exception("Error publishing to {} topic." .format(topic
))
79 record_metadata
= future
.get(timeout
=10)
80 logging
.debug("TOPIC:", record_metadata
.topic
)
81 logging
.debug("PARTITION:", record_metadata
.partition
)
82 logging
.debug("OFFSET:", record_metadata
.offset
)
86 def create_alarm_request(self
, key
, message
, topic
):
87 """Create alarm request from SO to MON."""
90 payload_create_alarm
= jsmin(
91 open(os
.path
.join(json_path
, 'create_alarm.json')).read())
94 topic
='alarm_request')
96 def create_alarm_response(self
, key
, message
, topic
):
97 """Response to a create alarm request from MON to SO."""
100 payload_create_alarm_resp
= jsmin(
101 open(os
.path
.join(json_path
, 'create_alarm_resp.json')).read())
105 topic
='alarm_response')
107 def acknowledge_alarm(self
, key
, message
, topic
):
108 """Alarm acknowledgement request from SO to MON."""
111 payload_acknowledge_alarm
= jsmin(
112 open(os
.path
.join(json_path
, 'acknowledge_alarm.json')).read())
116 topic
='alarm_request')
118 def list_alarm_request(self
, key
, message
, topic
):
119 """List alarms request from SO to MON."""
122 payload_alarm_list_req
= jsmin(
123 open(os
.path
.join(json_path
, 'list_alarm_req.json')).read())
127 topic
='alarm_request')
129 def notify_alarm(self
, key
, message
, topic
):
130 """Notify of triggered alarm from MON to SO."""
131 payload_notify_alarm
= jsmin(
132 open(os
.path
.join(json_path
, 'notify_alarm.json')).read())
136 topic
='alarm_response')
138 def list_alarm_response(self
, key
, message
, topic
):
139 """Response for list alarms request from MON to SO."""
140 payload_list_alarm_resp
= jsmin(
141 open(os
.path
.join(json_path
, 'list_alarm_resp.json')).read())
145 topic
='alarm_response')
147 def update_alarm_request(self
, key
, message
, topic
):
148 """Update alarm request from SO to MON."""
151 payload_update_alarm_req
= jsmin(
152 open(os
.path
.join(json_path
, 'update_alarm_req.json')).read())
156 topic
='alarm_request')
158 def update_alarm_response(self
, key
, message
, topic
):
159 """Response from update alarm request from MON to SO."""
162 payload_update_alarm_resp
= jsmin(
163 open(os
.path
.join(json_path
, 'update_alarm_resp.json')).read())
167 topic
='alarm_response')
169 def delete_alarm_request(self
, key
, message
, topic
):
170 """Delete alarm request from SO to MON."""
173 payload_delete_alarm_req
= jsmin(
174 open(os
.path
.join(json_path
, 'delete_alarm_req.json')).read())
178 topic
='alarm_request')
180 def delete_alarm_response(self
, key
, message
, topic
):
181 """Response for a delete alarm request from MON to SO."""
184 payload_delete_alarm_resp
= jsmin(
185 open(os
.path
.join(json_path
, 'delete_alarm_resp.json')).read())
189 topic
='alarm_response')
191 def create_metrics_request(self
, key
, message
, topic
):
192 """Create metrics request from SO to MON."""
195 payload_create_metrics_req
= jsmin(
196 open(os
.path
.join(json_path
, 'create_metric_req.json')).read())
200 topic
='metric_request')
202 def create_metrics_resp(self
, key
, message
, topic
):
203 """Response for a create metric request from MON to SO."""
206 payload_create_metrics_resp
= jsmin(
207 open(os
.path
.join(json_path
, 'create_metric_resp.json')).read())
211 topic
='metric_response')
213 def read_metric_data_request(self
, key
, message
, topic
):
214 """Read metric data request from SO to MON."""
217 payload_read_metric_data_request
= jsmin(
218 open(os
.path
.join(json_path
, 'read_metric_data_req.json')).read())
222 topic
='metric_request')
224 def read_metric_data_response(self
, key
, message
, topic
):
225 """Response from MON to SO for read metric data request."""
228 payload_metric_data_response
= jsmin(
229 open(os
.path
.join(json_path
, 'read_metric_data_resp.json')).read())
233 topic
='metric_response')
235 def list_metric_request(self
, key
, message
, topic
):
236 """List metric request from SO to MON."""
239 payload_metric_list_req
= jsmin(
240 open(os
.path
.join(json_path
, 'list_metric_req.json')).read())
244 topic
='metric_request')
246 def list_metric_response(self
, key
, message
, topic
):
247 """Response from SO to MON for list metrics request."""
250 payload_metric_list_resp
= jsmin(
251 open(os
.path
.join(json_path
, 'list_metric_resp.json')).read())
255 topic
='metric_response')
257 def delete_metric_request(self
, key
, message
, topic
):
258 """Delete metric request from SO to MON."""
261 payload_delete_metric_req
= jsmin(
262 open(os
.path
.join(json_path
, 'delete_metric_req.json')).read())
266 topic
='metric_request')
268 def delete_metric_response(self
, key
, message
, topic
):
269 """Response from MON to SO for delete metric request."""
272 payload_delete_metric_resp
= jsmin(
273 open(os
.path
.join(json_path
, 'delete_metric_resp.json')).read())
277 topic
='metric_response')
279 def update_metric_request(self
, key
, message
, topic
):
280 """Metric update request from SO to MON."""
283 payload_update_metric_req
= jsmin(
284 open(os
.path
.join(json_path
, 'update_metric_req.json')).read())
288 topic
='metric_request')
290 def update_metric_response(self
, key
, message
, topic
):
291 """Reponse from MON to SO for metric update."""
294 payload_update_metric_resp
= jsmin(
295 open(os
.path
.join(json_path
, 'update_metric_resp.json')).read())
299 topic
='metric_response')
301 def access_credentials(self
, key
, message
, topic
):
302 """Send access credentials to MON from SO."""
303 payload_access_credentials
= jsmin(
304 open(os
.path
.join(json_path
, 'access_credentials.json')).read())
308 topic
='access_credentials')