1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
23 This is a kafka producer app that interacts with the SO and the plugins of the
24 datacenters like OpenStack, VMWare, AWS.
27 from kafka
import KafkaProducer
as kaf
28 from kafka
.errors
import KafkaError
33 from os
import listdir
34 from jsmin
import jsmin
36 __author__
= "Prithiv Mohan"
37 __date__
= "06/Sep/2017"
39 json_path
=os
.path
.abspath(os
.pardir
+"/MON/osm_mon/core/models/")
41 class KafkaProducer(object):
43 def __init__(self
, topic
):
47 if "BROKER_URI" in os
.environ
:
48 broker
= os
.getenv("BROKER_URI")
50 broker
= "localhost:9092"
53 If the broker URI is not set in the env by default,
54 localhost container is taken as the host because an instance of
59 key_serializer
=str.encode
,
60 value_serializer
=lambda v
: json
.dumps(v
).str.encode
,
61 bootstrap_servers
=broker
, api_version
=(0, 10))
63 def publish(self
, key
, value
, topic
=None):
65 future
= self
.producer
.send(topic
=topic
, key
=key
, value
=value
)
68 logging
.exception("Error publishing to {} topic." .format(topic
))
71 record_metadata
= future
.get(timeout
=10)
72 logging
.debug("TOPIC:", record_metadata
.topic
)
73 logging
.debug("PARTITION:", record_metadata
.partition
)
74 logging
.debug("OFFSET:", record_metadata
.offset
)
78 def create_alarm_request(self
, key
, message
, topic
):
82 payload_create_alarm
= jsmin(
83 open(os
.path
.join(json_path
, 'create_alarm.json')).read())
85 value
=json
.dumps(payload_create_alarm
),
86 topic
='alarm_request')
88 def create_alarm_response(self
, key
, message
, topic
):
92 payload_create_alarm_resp
= jsmin(
93 open(os
.path
.join(json_path
, 'create_alarm_resp.json')).read())
97 topic
='alarm_response')
99 def acknowledge_alarm(self
, key
, message
, topic
):
103 payload_acknowledge_alarm
= jsmin(
104 open(os
.path
.join(json_path
, 'acknowledge_alarm.json')).read())
107 value
=json
.dumps(payload_acknowledge_alarm
),
108 topic
='alarm_request')
110 def list_alarm_request(self
, key
, message
, topic
):
114 payload_alarm_list_req
= jsmin(
115 open(os
.path
.join(json_path
, 'list_alarm_req.json')).read())
118 value
=json
.dumps(payload_alarm_list_req
),
119 topic
='alarm_request')
121 def notify_alarm(self
, key
, message
, topic
):
123 payload_notify_alarm
= jsmin(
124 open(os
.path
.join(json_path
, 'notify_alarm.json')).read())
128 topic
='alarm_response')
130 def list_alarm_response(self
, key
, message
, topic
):
132 payload_list_alarm_resp
= jsmin(
133 open(os
.path
.join(json_path
, 'list_alarm_resp.json')).read())
137 topic
='alarm_response')
139 def update_alarm_request(self
, key
, message
, topic
):
143 payload_update_alarm_req
= jsmin(
144 open(os
.path
.join(json_path
, 'update_alarm_req.json')).read())
147 value
=json
.dumps(payload_update_alarm_req
),
148 topic
='alarm_request')
150 def update_alarm_response(self
, key
, message
, topic
):
154 payload_update_alarm_resp
= jsmin(
155 open(os
.path
.join(json_path
, 'update_alarm_resp.json')).read())
159 topic
='alarm_response')
161 def delete_alarm_request(self
, key
, message
, topic
):
165 payload_delete_alarm_req
= jsmin(
166 open(os
.path
.join(json_path
, 'delete_alarm_req.json')).read())
169 value
=json
.dumps(payload_delete_alarm_req
),
170 topic
='alarm_request')
172 def delete_alarm_response(self
, key
, message
, topic
):
176 payload_delete_alarm_resp
= jsmin(
177 open(os
.path
.join(json_path
, 'delete_alarm_resp.json')).read())
181 topic
='alarm_response')
183 def create_metrics_request(self
, key
, message
, topic
):
187 payload_create_metrics_req
= jsmin(
188 open(os
.path
.join(json_path
, 'create_metric_req.json')).read())
191 value
=json
.dumps(payload_create_metrics_req
),
192 topic
='metric_request')
194 def create_metrics_resp(self
, key
, message
, topic
):
198 payload_create_metrics_resp
= jsmin(
199 open(os
.path
.join(json_path
, 'create_metric_resp.json')).read())
203 topic
='metric_response')
205 def read_metric_data_request(self
, key
, message
, topic
):
209 payload_read_metric_data_request
= jsmin(
210 open(os
.path
.join(json_path
, 'read_metric_data_req.json')).read())
213 value
=json
.dumps(payload_read_metric_data_request
),
214 topic
='metric_request')
216 def read_metric_data_response(self
, key
, message
, topic
):
220 payload_metric_data_response
= jsmin(
221 open(os
.path
.join(json_path
, 'read_metric_data_resp.json')).read())
225 topic
='metric_response')
227 def list_metric_request(self
, key
, message
, topic
):
231 payload_metric_list_req
= jsmin(
232 open(os
.path
.join(json_path
, 'list_metric_req.json')).read())
235 value
=json
.dumps(payload_metric_list_req
),
236 topic
='metric_request')
238 def list_metric_response(self
, key
, message
, topic
):
242 payload_metric_list_resp
= jsmin(
243 open(os
.path
.join(json_path
, 'list_metrics_resp.json')).read())
247 topic
='metric_response')
249 def delete_metric_request(self
, key
, message
, topic
):
253 payload_delete_metric_req
= jsmin(
254 open(os
.path
.join(json_path
, 'delete_metric_req.json')).read())
257 value
=json
.dumps(payload_delete_metric_req
),
258 topic
='metric_request')
260 def delete_metric_response(self
, key
, message
, topic
):
264 payload_delete_metric_resp
= jsmin(
265 open(os
.path
.join(json_path
, 'delete_metric_resp.json')).read())
269 topic
='metric_response')
271 def update_metric_request(self
, key
, message
, topic
):
275 payload_update_metric_req
= jsmin(
276 open(os
.path
.join(json_path
, 'update_metric_req.json')).read())
279 value
=json
.dumps(payload_update_metric_req
),
280 topic
='metric_request')
282 def update_metric_response(self
, key
, message
, topic
):
286 payload_update_metric_resp
= jsmin(
287 open(os
.path
.join(json_path
, 'update_metric_resp.json')).read())
291 topic
='metric_response')
293 def access_credentials(self
, key
, message
, topic
):
295 payload_access_credentials
= jsmin(
296 open(os
.path
.join(json_path
, 'access_credentials.json')).read())
299 value
=json
.dumps(payload_access_credentials
),
300 topic
='access_credentials')