1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
21 """This is a common kafka producer app.
23 It interacts with the SO and the plugins of the datacenters: OpenStack, VMWare
33 from kafka
import KafkaProducer
as kaf
35 from kafka
.errors
import KafkaError
37 __author__
= "Prithiv Mohan"
38 __date__
= "06/Sep/2017"
40 json_path
= os
.path
.abspath(os
.pardir
+ "/MON/osm_mon/core/models/")
42 # TODO(): validate all of the request and response messages against the
46 class KafkaProducer(object):
47 """A common KafkaProducer for requests and responses."""
49 def __init__(self
, topic
):
50 """Initialize the common kafka producer."""
53 if "BROKER_URI" in os
.environ
:
54 broker
= os
.getenv("BROKER_URI")
56 broker
= "localhost:9092"
59 If the broker URI is not set in the env by default,
60 localhost container is taken as the host because an instance of
65 key_serializer
=str.encode
,
66 value_serializer
=str.encode
,
67 bootstrap_servers
=broker
, api_version
=(0, 10))
69 def publish(self
, key
, value
, topic
=None):
70 """Send the required message on the Kafka message bus."""
72 future
= self
.producer
.send(topic
=topic
, key
=key
, value
=value
)
75 logging
.exception("Error publishing to {} topic." .format(topic
))
78 record_metadata
= future
.get(timeout
=10)
79 logging
.debug("TOPIC:", record_metadata
.topic
)
80 logging
.debug("PARTITION:", record_metadata
.partition
)
81 logging
.debug("OFFSET:", record_metadata
.offset
)
85 def create_alarm_request(self
, key
, message
, topic
):
86 """Create alarm request from SO to MON."""
89 payload_create_alarm
= jsmin(
90 open(os
.path
.join(json_path
, 'create_alarm.json')).read())
93 topic
='alarm_request')
95 def create_alarm_response(self
, key
, message
, topic
):
96 """Response to a create alarm request from MON to SO."""
99 payload_create_alarm_resp
= jsmin(
100 open(os
.path
.join(json_path
, 'create_alarm_resp.json')).read())
104 topic
='alarm_response')
106 def acknowledge_alarm(self
, key
, message
, topic
):
107 """Alarm acknowledgement request from SO to MON."""
110 payload_acknowledge_alarm
= jsmin(
111 open(os
.path
.join(json_path
, 'acknowledge_alarm.json')).read())
115 topic
='alarm_request')
117 def list_alarm_request(self
, key
, message
, topic
):
118 """List alarms request from SO to MON."""
121 payload_alarm_list_req
= jsmin(
122 open(os
.path
.join(json_path
, 'list_alarm_req.json')).read())
126 topic
='alarm_request')
128 def notify_alarm(self
, key
, message
, topic
):
129 """Notify of triggered alarm from MON to SO."""
130 payload_notify_alarm
= jsmin(
131 open(os
.path
.join(json_path
, 'notify_alarm.json')).read())
135 topic
='alarm_response')
137 def list_alarm_response(self
, key
, message
, topic
):
138 """Response for list alarms request from MON to SO."""
139 payload_list_alarm_resp
= jsmin(
140 open(os
.path
.join(json_path
, 'list_alarm_resp.json')).read())
144 topic
='alarm_response')
146 def update_alarm_request(self
, key
, message
, topic
):
147 """Update alarm request from SO to MON."""
150 payload_update_alarm_req
= jsmin(
151 open(os
.path
.join(json_path
, 'update_alarm_req.json')).read())
155 topic
='alarm_request')
157 def update_alarm_response(self
, key
, message
, topic
):
158 """Response from update alarm request from MON to SO."""
161 payload_update_alarm_resp
= jsmin(
162 open(os
.path
.join(json_path
, 'update_alarm_resp.json')).read())
166 topic
='alarm_response')
168 def delete_alarm_request(self
, key
, message
, topic
):
169 """Delete alarm request from SO to MON."""
172 payload_delete_alarm_req
= jsmin(
173 open(os
.path
.join(json_path
, 'delete_alarm_req.json')).read())
177 topic
='alarm_request')
179 def delete_alarm_response(self
, key
, message
, topic
):
180 """Response for a delete alarm request from MON to SO."""
183 payload_delete_alarm_resp
= jsmin(
184 open(os
.path
.join(json_path
, 'delete_alarm_resp.json')).read())
188 topic
='alarm_response')
190 def create_metrics_request(self
, key
, message
, topic
):
191 """Create metrics request from SO to MON."""
194 payload_create_metrics_req
= jsmin(
195 open(os
.path
.join(json_path
, 'create_metric_req.json')).read())
199 topic
='metric_request')
201 def create_metrics_resp(self
, key
, message
, topic
):
202 """Response for a create metric request from MON to SO."""
205 payload_create_metrics_resp
= jsmin(
206 open(os
.path
.join(json_path
, 'create_metric_resp.json')).read())
210 topic
='metric_response')
212 def read_metric_data_request(self
, key
, message
, topic
):
213 """Read metric data request from SO to MON."""
216 payload_read_metric_data_request
= jsmin(
217 open(os
.path
.join(json_path
, 'read_metric_data_req.json')).read())
221 topic
='metric_request')
223 def read_metric_data_response(self
, key
, message
, topic
):
224 """Response from MON to SO for read metric data request."""
227 payload_metric_data_response
= jsmin(
228 open(os
.path
.join(json_path
, 'read_metric_data_resp.json')).read())
232 topic
='metric_response')
234 def list_metric_request(self
, key
, message
, topic
):
235 """List metric request from SO to MON."""
238 payload_metric_list_req
= jsmin(
239 open(os
.path
.join(json_path
, 'list_metric_req.json')).read())
243 topic
='metric_request')
245 def list_metric_response(self
, key
, message
, topic
):
246 """Response from SO to MON for list metrics request."""
249 payload_metric_list_resp
= jsmin(
250 open(os
.path
.join(json_path
, 'list_metrics_resp.json')).read())
254 topic
='metric_response')
256 def delete_metric_request(self
, key
, message
, topic
):
257 """Delete metric request from SO to MON."""
260 payload_delete_metric_req
= jsmin(
261 open(os
.path
.join(json_path
, 'delete_metric_req.json')).read())
265 topic
='metric_request')
267 def delete_metric_response(self
, key
, message
, topic
):
268 """Response from MON to SO for delete metric request."""
271 payload_delete_metric_resp
= jsmin(
272 open(os
.path
.join(json_path
, 'delete_metric_resp.json')).read())
276 topic
='metric_response')
278 def update_metric_request(self
, key
, message
, topic
):
279 """Metric update request from SO to MON."""
282 payload_update_metric_req
= jsmin(
283 open(os
.path
.join(json_path
, 'update_metric_req.json')).read())
287 topic
='metric_request')
289 def update_metric_response(self
, key
, message
, topic
):
290 """Reponse from MON to SO for metric update."""
293 payload_update_metric_resp
= jsmin(
294 open(os
.path
.join(json_path
, 'update_metric_resp.json')).read())
298 topic
='metric_response')
300 def access_credentials(self
, key
, message
, topic
):
301 """Send access credentials to MON from SO."""
302 payload_access_credentials
= jsmin(
303 open(os
.path
.join(json_path
, 'access_credentials.json')).read())
307 topic
='access_credentials')