1 # Copyright© 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
24 This is a kafka producer app that interacts with the SO and the plugins of the
25 datacenters like OpenStack, VMWare, AWS.
26 #TODO: Interfacing with the APIs of the monitoring tool plugins (Prithiv Mohan).
29 __author__
= "Prithiv Mohan"
30 __date__
= "06/Sep/2017"
33 from kafka
import KafkaProducer
34 from kafka
.errors
import KafkaError
38 from os
import listdir
42 class KafkaProducer(object):
44 def __init__(self
, topic
, message
):
47 self
._message
= message
49 if "ZOOKEEPER_URI" in os
.environ
:
50 broker
= os
.getenv("ZOOKEEPER_URI")
52 broker
= "localhost:2181"
55 If the zookeeper broker URI is not set in the env, by default,
56 localhost container is taken as the host because an instance of
60 producer
= KafkaProducer(key_serializer
=str.encode
,
61 value_serializer
=lambda v
: json
.dumps(v
).encode('ascii'),
62 bootstrap_servers
=broker
, api_version
=(0,10))
65 def publish(self
, key
, message
, topic
=None):
67 future
= producer
.send('alarms', key
, payload
)
70 log
.exception("Error publishing to {} topic." .format(topic
))
73 record_metadata
= future
.get(timeout
=10)
74 self
._log
.debug("TOPIC:", record_metadata
.topic
)
75 self
._log
.debug("PARTITION:", record_metadata
.partition
)
76 self
._log
.debug("OFFSET:", record_metadata
.offset
)
80 json_path
= os
.path
.join(os
.pardir
+"/models/")
82 def create_alarm_request(self
, key
, message
, topic
):
86 payload_create_alarm
= json
.loads(open(os
.path
.join(json_path
,
87 'create_alarm.json')).read())
89 value
=json
.dumps(payload_create_alarm
),
90 topic
='alarm_request')
92 def create_alarm_response(self
, key
, message
, topic
):
96 payload_create_alarm_resp
= json
.loads(open(os
.path
.join(json_path
,
97 'create_alarm_resp.json')).read())
100 value
= json
.dumps(payload_create_alarm_resp
),
101 topic
= 'alarm_response')
104 def list_alarm_request(self
, key
, message
, topic
):
108 payload_alarm_list_req
= json
.loads(open(os
.path
.join(json_path
,
109 'list_alarm_req.json')).read())
112 value
=json
.dumps(payload_alarm_list_req
),
113 topic
='alarm_request')
115 def notify_alarm(self
, key
, message
, topic
):
117 payload_notify_alarm
= json
.loads(open(os
.path
.join(json_path
,
118 'notify_alarm.json')).read())
121 value
=json
.dumps(payload_notify_alarm
),
122 topic
='alarm_response')
124 def list_alarm_response(self
, key
, message
, topic
):
126 payload_list_alarm_resp
= json
.loads(open(os
.path
.join(json_path
,
127 'list_alarm_resp.json')).read())
130 value
=json
.dumps(payload_list_alarm_resp
),
131 topic
='alarm_response')
134 def update_alarm_request(self
, key
, message
, topic
):
138 payload_update_alarm_req
= json
.loads(open(os
.path
.join(json_path
,
139 'update_alarm_req.json')).read())
142 value
=json
.dumps(payload_update_alarm_req
),
143 topic
='alarm_request')
146 def update_alarm_response(self
, key
, message
, topic
):
150 payload_update_alarm_resp
= json
.loads(open(os
.path
.join(json_path
,
151 'update_alarm_resp.json')).read())
154 value
=json
.dumps(payload_update_alarm_resp
),
155 topic
='alarm_response')
158 def delete_alarm_request(self
, key
, message
, topic
):
162 payload_delete_alarm_req
= json
.loads(open(os
.path
.join(json_path
,
163 'delete_alarm_req.json')).read())
166 value
=json
.dumps(payload_delete_alarm_req
),
167 topic
='alarm_request')
169 def delete_alarm_response(self
, key
, message
, topic
):
173 payload_delete_alarm_resp
= json
.loads(open(os
.path
.join(json_path
,
174 'delete_alarm_resp.json')).read())
177 value
=json
.dumps(payload_delete_alarm_resp
),
178 topic
='alarm_response')
182 def create_metrics_request(self
, key
, message
, topic
):
186 payload_create_metrics_req
= json
.loads(open(os
.path
.join(json_path
,
187 'create_metric_req.json')).read())
190 value
=json
.dumps(payload_create_metrics_req
),
191 topic
='metric_request')
194 def create_metrics_resp(self
, key
, message
, topic
):
198 payload_create_metrics_resp
= json
.loads(open(os
.path
.join(json_path
,
199 'create_metric_resp.json')).read())
202 value
=json
.dumps(payload_create_metrics_resp
),
203 topic
='metric_response')
206 def read_metric_data_request(self
, key
, message
, topic
):
210 payload_read_metric_data_request
= json
.loads(open(os
.path
.join(json_path
,
211 'read_metric_data_req.json')).read())
214 value
=json
.dumps(payload_read_metric_data_request
),
215 topic
='metric_request')
218 def read_metric_data_response(self
, key
, message
, topic
):
222 payload_metric_data_response
= json
.loads(open(os
.path
.join(json_path
,
223 'read_metric_data_resp.json')).read())
226 value
=json
.dumps(payload_metric_data_response
),
227 topic
='metric_response')
230 def list_metric_request(self
, key
, message
, topic
):
234 payload_metric_list_req
= json
.loads(open(os
.path
.join(json_path
,
235 'list_metric_req.json')).read())
238 value
=json
.dumps(payload_metric_list_req
),
239 topic
='metric_request')
241 def list_metric_response(self
, key
, message
, topic
):
245 payload_metric_list_resp
= json
.loads(open(os
.path
.join(json_path
,
246 'list_metrics_resp.json')).read())
249 value
=json
.dumps(payload_metric_list_resp
),
250 topic
='metric_response')
253 def delete_metric_request(self
, key
, message
, topic
):
257 payload_delete_metric_req
= json
.loads(open(os
.path
.join(json_path
,
258 'delete_metric_req.json')).read())
261 value
=json
.dumps(payload_delete_metric_req
),
262 topic
='metric_request')
265 def delete_metric_response(self
, key
, message
, topic
):
269 payload_delete_metric_resp
= json
.loads(open(os
.path
.join(json_path
,
270 'delete_metric_resp.json')).read())
273 value
=json
.dumps(payload_delete_metric_resp
),
274 topic
='metric_response')
277 def update_metric_request(self
, key
, message
, topic
):
281 payload_update_metric_req
= json
.loads(open(os
.path
.join(json_path
,
282 'update_metric_req.json')).read())
285 value
=json
.dumps(payload_update_metric_req
),
286 topic
='metric_request')
289 def update_metric_response(self
, key
, message
, topic
):
293 payload_update_metric_resp
= json
.loads(open(os
.path
.join(json_path
,
294 'update_metric_resp.json')).read())
297 value
=json
.dumps(payload_update_metric_resp
),
298 topic
='metric_response)
300 def access_credentials(self, key, message, topic):
302 payload_access_credentials = json.loads(open(os.path.join(json_path,
303 'access_credentials
.json
')).read())
306 value=json.dumps(payload_access_credentials),
307 topic='access_credentials
')