Merge "Updated Kafka producer. This producer has the following changes:"
[osm/MON.git] / core / message_bus / producer.py
1 # Copyright© 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
6
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10
11 # http://www.apache.org/licenses/LICENSE-2.0
12
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
21 ##
22
23 '''
24 This is a Kafka producer app that interacts with the SO and with the plugins
25 of datacenters such as OpenStack, VMware and AWS.
26 #TODO: Interfacing with the APIs of the monitoring tool plugins (Prithiv Mohan).
27 '''
28
29 __author__ = "Prithiv Mohan"
30 __date__ = "06/Sep/2017"
31
32
33 from kafka import KafkaProducer
34 from kafka.errors import KafkaError
35 import logging
36 import json
37 import os
38 from os import listdir
39
40
41
class KafkaProducer(object):
    """Publish alarm, metric and credential messages to Kafka topics.

    Each ``*_request`` / ``*_response`` method loads a JSON payload template
    from ``json_path`` and publishes it to a fixed topic through
    :meth:`publish`.
    """

    # Class-level logger so it is available to every method, including on
    # instances whose ``__init__`` has not run.
    _log = logging.getLogger(__name__)

    # Directory holding the JSON payload templates.
    # NOTE(review): this path is relative to the current working directory,
    # not to this file -- confirm the process is always started from a
    # directory where "../models/" resolves correctly.
    json_path = os.pardir + "/models/"

    def __init__(self, topic, message):
        """Create the underlying Kafka client.

        Args:
            topic: Default topic, used by :meth:`publish` when it is called
                without an explicit topic.
            message: Stored on the instance; not otherwise used by this class.
        """
        # The real client is imported under an alias here because this class
        # has the same name as ``kafka.KafkaProducer`` and shadows it at
        # module level; calling ``KafkaProducer(...)`` would recurse into
        # this class instead of building a Kafka client.
        from kafka import KafkaProducer as _KafkaClient

        self._topic = topic
        self._message = message

        # If the URI is not set in the environment, localhost is used as the
        # host on the assumption that an instance is already running there.
        # NOTE(review): the variable name and the default port (2181) refer
        # to ZooKeeper, while ``bootstrap_servers`` expects Kafka broker
        # addresses -- confirm the intended endpoint.
        broker = os.getenv("ZOOKEEPER_URI", "localhost:2181")

        self._producer = _KafkaClient(
            key_serializer=str.encode,
            value_serializer=lambda v: json.dumps(v).encode('ascii'),
            bootstrap_servers=broker, api_version=(0, 10))

    def publish(self, key, message, topic=None):
        """Send ``message`` under ``key`` to ``topic`` and flush the client.

        Args:
            key: Message key handed to the Kafka client.
            message: Message value handed to the Kafka client.
            topic: Destination topic; when ``None`` the topic given to the
                constructor is used.

        Raises:
            Exception: Any error raised while sending or flushing is logged
                and re-raised.
        """
        if topic is None:
            topic = self._topic

        try:
            # Keyword arguments are used because ``send`` takes the value
            # before the key positionally.
            future = self._producer.send(topic, key=key, value=message)
            self._producer.flush()
        except Exception:
            self._log.exception("Error publishing to %s topic.", topic)
            raise

        try:
            record_metadata = future.get(timeout=10)
        except KafkaError:
            # Best effort: a failed delivery confirmation is logged rather
            # than raised, but it is no longer silently discarded.
            self._log.exception(
                "No delivery confirmation for %s topic.", topic)
        else:
            self._log.debug("TOPIC: %s", record_metadata.topic)
            self._log.debug("PARTITION: %s", record_metadata.partition)
            self._log.debug("OFFSET: %s", record_metadata.offset)

    def _load_payload(self, filename):
        """Return the parsed JSON template ``filename`` from ``json_path``."""
        with open(os.path.join(self.json_path, filename)) as template:
            return json.load(template)

    def _publish_template(self, key, filename, topic):
        """Load the template ``filename`` and publish it to ``topic``."""
        payload = self._load_payload(filename)
        # NOTE(review): the payload is serialised with json.dumps here and
        # again by the client's value_serializer, so the wire value is a
        # JSON-encoded string. Kept as in the original -- confirm consumers
        # expect this.
        self.publish(key, message=json.dumps(payload), topic=topic)

    # In every method below, ``message`` and ``topic`` are accepted for
    # interface compatibility but are not used: the payload comes from a
    # JSON template and the destination topic is fixed.

    def create_alarm_request(self, key, message, topic):
        """Publish 'create_alarm.json' to 'alarm_request' (external to MON)."""
        self._publish_template(key, 'create_alarm.json', 'alarm_request')

    def create_alarm_response(self, key, message, topic):
        """Publish 'create_alarm_resp.json' to 'alarm_response' (internal to MON)."""
        self._publish_template(key, 'create_alarm_resp.json', 'alarm_response')

    def list_alarm_request(self, key, message, topic):
        """Publish 'list_alarm_req.json' to 'alarm_request' (external to MON)."""
        self._publish_template(key, 'list_alarm_req.json', 'alarm_request')

    def notify_alarm(self, key, message, topic):
        """Publish 'notify_alarm.json' to 'alarm_response'."""
        self._publish_template(key, 'notify_alarm.json', 'alarm_response')

    def list_alarm_response(self, key, message, topic):
        """Publish 'list_alarm_resp.json' to 'alarm_response'."""
        self._publish_template(key, 'list_alarm_resp.json', 'alarm_response')

    def update_alarm_request(self, key, message, topic):
        """Publish 'update_alarm_req.json' to 'alarm_request' (external to MON)."""
        self._publish_template(key, 'update_alarm_req.json', 'alarm_request')

    def update_alarm_response(self, key, message, topic):
        """Publish 'update_alarm_resp.json' to 'alarm_response' (internal to MON)."""
        self._publish_template(key, 'update_alarm_resp.json', 'alarm_response')

    def delete_alarm_request(self, key, message, topic):
        """Publish 'delete_alarm_req.json' to 'alarm_request' (external to MON)."""
        self._publish_template(key, 'delete_alarm_req.json', 'alarm_request')

    def delete_alarm_response(self, key, message, topic):
        """Publish 'delete_alarm_resp.json' to 'alarm_response' (internal to MON)."""
        self._publish_template(key, 'delete_alarm_resp.json', 'alarm_response')

    def create_metrics_request(self, key, message, topic):
        """Publish 'create_metric_req.json' to 'metric_request' (external to MON)."""
        self._publish_template(key, 'create_metric_req.json', 'metric_request')

    def create_metrics_resp(self, key, message, topic):
        """Publish 'create_metric_resp.json' to 'metric_response' (internal to MON)."""
        self._publish_template(
            key, 'create_metric_resp.json', 'metric_response')

    def read_metric_data_request(self, key, message, topic):
        """Publish 'read_metric_data_req.json' to 'metric_request' (external to MON)."""
        self._publish_template(
            key, 'read_metric_data_req.json', 'metric_request')

    def read_metric_data_response(self, key, message, topic):
        """Publish 'read_metric_data_resp.json' to 'metric_response' (internal to MON)."""
        self._publish_template(
            key, 'read_metric_data_resp.json', 'metric_response')

    def list_metric_request(self, key, message, topic):
        """Publish 'list_metric_req.json' to 'metric_request' (external to MON)."""
        self._publish_template(key, 'list_metric_req.json', 'metric_request')

    def list_metric_response(self, key, message, topic):
        """Publish 'list_metrics_resp.json' to 'metric_response' (internal to MON)."""
        self._publish_template(
            key, 'list_metrics_resp.json', 'metric_response')

    def delete_metric_request(self, key, message, topic):
        """Publish 'delete_metric_req.json' to 'metric_request' (external to MON)."""
        self._publish_template(key, 'delete_metric_req.json', 'metric_request')

    def delete_metric_response(self, key, message, topic):
        """Publish 'delete_metric_resp.json' to 'metric_response' (internal to MON)."""
        self._publish_template(
            key, 'delete_metric_resp.json', 'metric_response')

    def update_metric_request(self, key, message, topic):
        """Publish 'update_metric_req.json' to 'metric_request' (external to MON)."""
        self._publish_template(key, 'update_metric_req.json', 'metric_request')

    def update_metric_response(self, key, message, topic):
        """Publish 'update_metric_resp.json' to 'metric_response' (internal to MON)."""
        self._publish_template(
            key, 'update_metric_resp.json', 'metric_response')

    def access_credentials(self, key, message, topic):
        """Publish 'access_credentials.json' to 'access_credentials'."""
        self._publish_template(
            key, 'access_credentials.json', 'access_credentials')