49562921231e0c64e428bba2223b0f47e2868607
[osm/MON.git] / core / message_bus / producer.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
6
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10
11 # http://www.apache.org/licenses/LICENSE-2.0
12
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
21 ##
22
23 '''
24 This is a Kafka producer app that interacts with the SO and with the plugins
25 of datacenters such as OpenStack, VMware and AWS.
26 '''
27
28 __author__ = "Prithiv Mohan"
29 __date__ = "06/Sep/2017"
30
31
32 from kafka import KafkaProducer as kaf
33 from kafka.errors import KafkaError
34 import logging
35 import json
36 import jsmin
37 import os
38 from os import listdir
39 from jsmin import jsmin
40
# Directory holding the JSON message templates. The path is relative to the
# process's current working directory, not to this file.
json_path = os.pardir + "/models/"
42
43
class KafkaProducer(object):
    '''
    Wrapper around the kafka-python producer used to publish alarm, metric
    and credential messages on the MON message bus.

    Request-style methods publish a JSON template loaded from ``json_path``;
    response-style methods publish the ``message`` supplied by the caller.
    '''

    def __init__(self, topic):
        '''
        Create the underlying kafka-python producer.

        :param topic: default topic, used by ``publish`` when it is called
            without an explicit topic.
        '''
        self._topic = topic

        # If ZOOKEEPER_URI is not set in the environment, fall back to
        # localhost (presumably a local instance is already running there).
        # NOTE(review): the value is handed to ``bootstrap_servers``, which
        # kafka-python documents as Kafka broker addresses - confirm that a
        # ZooKeeper URI / port 2181 is really what is intended here.
        broker = os.getenv("ZOOKEEPER_URI", "localhost:2181")

        # Keys are sent as encoded strings; values are JSON-encoded and then
        # converted to ASCII bytes.
        self.producer = kaf(key_serializer=str.encode,
                            value_serializer=lambda v: json.dumps(v).encode('ascii'),
                            bootstrap_servers=broker, api_version=(0, 10))

    @staticmethod
    def _load_payload(filename):
        '''Return the minified text of the JSON template ``filename``.'''
        # The context manager guarantees the file handle is closed.
        with open(os.path.join(json_path, filename)) as template:
            return jsmin(template.read())

    def _publish_template(self, key, filename, topic):
        '''Publish the JSON template ``filename`` to ``topic`` under ``key``.'''
        # NOTE(review): the minified template is a string, so json.dumps()
        # wraps it in another layer of JSON quoting before the producer's own
        # value_serializer runs. Kept as-is to preserve the wire format.
        self.publish(key,
                     value=json.dumps(self._load_payload(filename)),
                     topic=topic)

    def publish(self, key, value, topic=None):
        '''
        Send one record to Kafka and wait for it to be acknowledged.

        :param key: record key (a string; encoded by the key serializer).
        :param value: JSON-serializable record value.
        :param topic: destination topic; defaults to the topic given to the
            constructor when omitted.
        :raises Exception: any error raised while sending or flushing is
            logged and re-raised. A ``KafkaError`` raised while waiting for
            the acknowledgement is logged but not propagated.
        '''
        if topic is None:
            topic = self._topic
        try:
            # kafka-python's signature is send(topic, value=None, key=None),
            # so key and value are passed by keyword to avoid mixing them up.
            future = self.producer.send(topic, key=key, value=value)
            self.producer.flush()
        except Exception:
            logging.exception("Error publishing to {} topic." .format(topic))
            raise
        try:
            record_metadata = future.get(timeout=10)
        except KafkaError:
            # Best effort: report the failed acknowledgement, do not raise.
            logging.exception("No delivery confirmation for %s topic.", topic)
        else:
            logging.debug("TOPIC: %s", record_metadata.topic)
            logging.debug("PARTITION: %s", record_metadata.partition)
            logging.debug("OFFSET: %s", record_metadata.offset)

    def create_alarm_request(self, key, message, topic):
        '''
        External to MON. Publish the create_alarm.json template to the
        'alarm_request' topic. ``message`` and ``topic`` are not used.
        '''
        self._publish_template(key, 'create_alarm.json', 'alarm_request')

    def create_alarm_response(self, key, message, topic):
        '''
        Internal to MON. Publish ``message`` to the 'alarm_response' topic.
        ``topic`` is not used.
        '''
        self.publish(key, value=message, topic='alarm_response')

    def acknowledge_alarm(self, key, message, topic):
        '''
        Internal to MON. Publish the acknowledge_alarm.json template to the
        'alarm_request' topic. ``message`` and ``topic`` are not used.
        '''
        self._publish_template(key, 'acknowledge_alarm.json', 'alarm_request')

    def list_alarm_request(self, key, message, topic):
        '''
        External to MON. Publish the list_alarm_req.json template to the
        'alarm_request' topic. ``message`` and ``topic`` are not used.
        '''
        self._publish_template(key, 'list_alarm_req.json', 'alarm_request')

    def notify_alarm(self, key, message, topic):
        '''
        Publish ``message`` to the 'alarm_response' topic. ``topic`` is not
        used.
        '''
        self.publish(key, value=message, topic='alarm_response')

    def list_alarm_response(self, key, message, topic):
        '''
        Publish ``message`` to the 'alarm_response' topic. ``topic`` is not
        used.
        '''
        self.publish(key, value=message, topic='alarm_response')

    def update_alarm_request(self, key, message, topic):
        '''
        External to MON. Publish the update_alarm_req.json template to the
        'alarm_request' topic. ``message`` and ``topic`` are not used.
        '''
        self._publish_template(key, 'update_alarm_req.json', 'alarm_request')

    def update_alarm_response(self, key, message, topic):
        '''
        Internal to MON. Publish ``message`` to the 'alarm_response' topic.
        ``topic`` is not used.
        '''
        self.publish(key, value=message, topic='alarm_response')

    def delete_alarm_request(self, key, message, topic):
        '''
        External to MON. Publish the delete_alarm_req.json template to the
        'alarm_request' topic. ``message`` and ``topic`` are not used.
        '''
        self._publish_template(key, 'delete_alarm_req.json', 'alarm_request')

    def delete_alarm_response(self, key, message, topic):
        '''
        Internal to MON. Publish ``message`` to the 'alarm_response' topic.
        ``topic`` is not used.
        '''
        self.publish(key, value=message, topic='alarm_response')

    def create_metrics_request(self, key, message, topic):
        '''
        External to MON. Publish the create_metric_req.json template to the
        'metric_request' topic. ``message`` and ``topic`` are not used.
        '''
        self._publish_template(key, 'create_metric_req.json', 'metric_request')

    def create_metrics_resp(self, key, message, topic):
        '''
        Internal to MON. Publish ``message`` to the 'metric_response' topic.
        ``topic`` is not used.
        '''
        self.publish(key, value=message, topic='metric_response')

    def read_metric_data_request(self, key, message, topic):
        '''
        External to MON. Publish the read_metric_data_req.json template to
        the 'metric_request' topic. ``message`` and ``topic`` are not used.
        '''
        self._publish_template(key, 'read_metric_data_req.json',
                               'metric_request')

    def read_metric_data_response(self, key, message, topic):
        '''
        Internal to MON. Publish ``message`` to the 'metric_response' topic.
        ``topic`` is not used.
        '''
        self.publish(key, value=message, topic='metric_response')

    def list_metric_request(self, key, message, topic):
        '''
        External to MON. Publish the list_metric_req.json template to the
        'metric_request' topic. ``message`` and ``topic`` are not used.
        '''
        self._publish_template(key, 'list_metric_req.json', 'metric_request')

    def list_metric_response(self, key, message, topic):
        '''
        Internal to MON. Publish ``message`` to the 'metric_response' topic.
        ``topic`` is not used.
        '''
        self.publish(key, value=message, topic='metric_response')

    def delete_metric_request(self, key, message, topic):
        '''
        External to MON. Publish the delete_metric_req.json template to the
        'metric_request' topic. ``message`` and ``topic`` are not used.
        '''
        self._publish_template(key, 'delete_metric_req.json', 'metric_request')

    def delete_metric_response(self, key, message, topic):
        '''
        Internal to MON. Publish ``message`` to the 'metric_response' topic.
        ``topic`` is not used.
        '''
        self.publish(key, value=message, topic='metric_response')

    def update_metric_request(self, key, message, topic):
        '''
        External to MON. Publish the update_metric_req.json template to the
        'metric_request' topic. ``message`` and ``topic`` are not used.
        '''
        self._publish_template(key, 'update_metric_req.json', 'metric_request')

    def update_metric_response(self, key, message, topic):
        '''
        Internal to MON. Publish ``message`` to the 'metric_response' topic.
        ``topic`` is not used.
        '''
        self.publish(key, value=message, topic='metric_response')

    def access_credentials(self, key, message, topic):
        '''
        Publish the access_credentials.json template to the
        'access_credentials' topic. ``message`` and ``topic`` are not used.
        '''
        self._publish_template(key, 'access_credentials.json',
                               'access_credentials')