Merge "Updated the OpenStack plugins"
[osm/MON.git] / core / message_bus / producer.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
6
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10
11 # http://www.apache.org/licenses/LICENSE-2.0
12
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
21 ##
22
23 '''
24 This is a kafka producer app that interacts with the SO and the plugins of the
25 datacenters like OpenStack, VMWare, AWS.
26 '''
27
28 __author__ = "Prithiv Mohan"
29 __date__ = "06/Sep/2017"
30
31
32 from kafka import KafkaProducer
33 from kafka.errors import KafkaError
34 import logging
35 import json
36 import jsmin
37 import os
38 from os import listdir
39 from jsmin import jsmin
40
41 class KafkaProducer(object):
42
43 def __init__(self, topic):
44
45 self._topic= topic
46
47 if "ZOOKEEPER_URI" in os.environ:
48 broker = os.getenv("ZOOKEEPER_URI")
49 else:
50 broker = "localhost:2181"
51
52 '''
53 If the zookeeper broker URI is not set in the env, by default,
54 localhost container is taken as the host because an instance of
55 is already running.
56 '''
57
58 producer = KafkaProducer(key_serializer=str.encode,
59 value_serializer=lambda v: json.dumps(v).encode('ascii'),
60 bootstrap_servers=broker, api_version=(0,10))
61
62
63 def publish(self, key, message, topic=None):
64 try:
65 future = producer.send('key', 'payload',group_id='osm_mon')
66 producer.flush()
67 except Exception:
68 log.exception("Error publishing to {} topic." .format(topic))
69 raise
70 try:
71 record_metadata = future.get(timeout=10)
72 self._log.debug("TOPIC:", record_metadata.topic)
73 self._log.debug("PARTITION:", record_metadata.partition)
74 self._log.debug("OFFSET:", record_metadata.offset)
75 except KafkaError:
76 pass
77
78 json_path = os.path.join(os.pardir+"/models/")
79
80 def create_alarm_request(self, key, message, topic):
81
82 #External to MON
83
84 payload_create_alarm = jsmin(open(os.path.join(json_path,
85 'create_alarm.json')).read())
86 self.publish(key,
87 value=json.dumps(payload_create_alarm),
88 topic='alarm_request')
89
90 def create_alarm_response(self, key, message, topic):
91
92 #Internal to MON
93
94 payload_create_alarm_resp = jsmin(open(os.path.join(json_path,
95 'create_alarm_resp.json')).read())
96
97 self.publish(key,
98 value = message,
99 topic = 'alarm_response')
100
101 def acknowledge_alarm(self, key, message, topic):
102
103 #Internal to MON
104
105 payload_acknowledge_alarm = jsmin(open(os.path.join(json_path,
106 'acknowledge_alarm.json')).read())
107
108 self.publish(key,
109 value = json.dumps(payload_acknowledge_alarm),
110 topic = 'alarm_request')
111
112 def list_alarm_request(self, key, message, topic):
113
114 #External to MON
115
116 payload_alarm_list_req = jsmin(open(os.path.join(json_path,
117 'list_alarm_req.json')).read())
118
119 self.publish(key,
120 value=json.dumps(payload_alarm_list_req),
121 topic='alarm_request')
122
123 def notify_alarm(self, key, message, topic):
124
125 payload_notify_alarm = jsmin(open(os.path.join(json_path,
126 'notify_alarm.json')).read())
127
128 self.publish(key,
129 value=message,
130 topic='alarm_response')
131
132 def list_alarm_response(self, key, message, topic):
133
134 payload_list_alarm_resp = jsmin(open(os.path.join(json_path,
135 'list_alarm_resp.json')).read())
136
137 self.publish(key,
138 value=message,
139 topic='alarm_response')
140
141
142 def update_alarm_request(self, key, message, topic):
143
144 # External to Mon
145
146 payload_update_alarm_req = jsmin(open(os.path.join(json_path,
147 'update_alarm_req.json')).read())
148
149 self.publish(key,
150 value=json.dumps(payload_update_alarm_req),
151 topic='alarm_request')
152
153
154 def update_alarm_response(self, key, message, topic):
155
156 # Internal to Mon
157
158 payload_update_alarm_resp = jsmin(open(os.path.join(json_path,
159 'update_alarm_resp.json')).read())
160
161 self.publish(key,
162 value=message,
163 topic='alarm_response')
164
165
166 def delete_alarm_request(self, key, message, topic):
167
168 # External to Mon
169
170 payload_delete_alarm_req = jsmin(open(os.path.join(json_path,
171 'delete_alarm_req.json')).read())
172
173 self.publish(key,
174 value=json.dumps(payload_delete_alarm_req),
175 topic='alarm_request')
176
177 def delete_alarm_response(self, key, message, topic):
178
179 # Internal to Mon
180
181 payload_delete_alarm_resp = jsmin(open(os.path.join(json_path,
182 'delete_alarm_resp.json')).read())
183
184 self.publish(key,
185 value=message,
186 topic='alarm_response')
187
188
189
190 def create_metrics_request(self, key, message, topic):
191
192 # External to Mon
193
194 payload_create_metrics_req = jsmin(open(os.path.join(json_path,
195 'create_metric_req.json')).read())
196
197 self.publish(key,
198 value=json.dumps(payload_create_metrics_req),
199 topic='metric_request')
200
201
202 def create_metrics_resp(self, key, message, topic):
203
204 # Internal to Mon
205
206 payload_create_metrics_resp = jsmin(open(os.path.join(json_path,
207 'create_metric_resp.json')).read())
208
209 self.publish(key,
210 value=message,
211 topic='metric_response')
212
213
214 def read_metric_data_request(self, key, message, topic):
215
216 # External to Mon
217
218 payload_read_metric_data_request = jsmin(open(os.path.join(json_path,
219 'read_metric_data_req.json')).read())
220
221 self.publish(key,
222 value=json.dumps(payload_read_metric_data_request),
223 topic='metric_request')
224
225
226 def read_metric_data_response(self, key, message, topic):
227
228 # Internal to Mon
229
230 payload_metric_data_response = jsmin(open(os.path.join(json_path,
231 'read_metric_data_resp.json')).read())
232
233 self.publish(key,
234 value=message,
235 topic='metric_response')
236
237
238 def list_metric_request(self, key, message, topic):
239
240 #External to MON
241
242 payload_metric_list_req = jsmin(open(os.path.join(json_path,
243 'list_metric_req.json')).read())
244
245 self.publish(key,
246 value=json.dumps(payload_metric_list_req),
247 topic='metric_request')
248
249 def list_metric_response(self, key, message, topic):
250
251 #Internal to MON
252
253 payload_metric_list_resp = jsmin(open(os.path.join(json_path,
254 'list_metrics_resp.json')).read())
255
256 self.publish(key,
257 value=message,
258 topic='metric_response')
259
260
261 def delete_metric_request(self, key, message, topic):
262
263 # External to Mon
264
265 payload_delete_metric_req = jsmin(open(os.path.join(json_path,
266 'delete_metric_req.json')).read())
267
268 self.publish(key,
269 value=json.dumps(payload_delete_metric_req),
270 topic='metric_request')
271
272
273 def delete_metric_response(self, key, message, topic):
274
275 # Internal to Mon
276
277 payload_delete_metric_resp = jsmin(open(os.path.join(json_path,
278 'delete_metric_resp.json')).read())
279
280 self.publish(key,
281 value=message,
282 topic='metric_response')
283
284
285 def update_metric_request(self, key, message, topic):
286
287 # External to Mon
288
289 payload_update_metric_req = jsmin(open(os.path.join(json_path,
290 'update_metric_req.json')).read())
291
292 self.publish(key,
293 value=json.dumps(payload_update_metric_req),
294 topic='metric_request')
295
296
297 def update_metric_response(self, key, message, topic):
298
299 # Internal to Mon
300
301 payload_update_metric_resp = jsmin(open(os.path.join(json_path,
302 'update_metric_resp.json')).read())
303
304 self.publish(key,
305 value=message,
306 topic='metric_response')
307
308 def access_credentials(self, key, message, topic):
309
310 payload_access_credentials = jsmin(open(os.path.join(json_path,
311 'access_credentials.json')).read())
312
313 self.publish(key,
314 value=json.dumps(payload_access_credentials),
315 topic='access_credentials')