Added a Common KafkaConsumer for all of the plugins
[osm/MON.git] / core / message_bus / producer.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
6
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10
11 # http://www.apache.org/licenses/LICENSE-2.0
12
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
21 ##
22 '''
23 This is a kafka producer app that interacts with the SO and the plugins of the
24 datacenters like OpenStack, VMWare, AWS.
25 '''
26
27 from kafka import KafkaProducer as kaf
28 from kafka.errors import KafkaError
29 import logging
30 import json
31 import jsmin
32 import os
33 from os import listdir
34 from jsmin import jsmin
35
__author__ = "Prithiv Mohan"
__date__ = "06/Sep/2017"

# Directory holding the JSON message templates read by KafkaProducer.
# NOTE(review): this is a relative path (parent directory + "/models/"), so it
# is resolved against the process working directory when a template is opened.
json_path = os.path.join("{0}/models/".format(os.pardir))
40
41
class KafkaProducer(object):
    """Publish MON alarm, metric and credential messages through Kafka.

    The wrapped producer encodes keys with ``str.encode`` and values as
    ASCII-encoded JSON.

    The ``*_request``, ``acknowledge_alarm`` and ``access_credentials``
    methods publish a JSON template read from ``json_path``; the
    ``*_response``/``*_resp`` and ``notify_alarm`` methods publish the
    ``message`` they are given. In every one of those methods the ``topic``
    parameter is accepted for interface compatibility only: the destination
    topic is fixed per method.
    """

    def __init__(self, topic):
        """Create the underlying Kafka producer.

        Args:
            topic: Default topic used by ``publish`` when it is called
                without an explicit topic.
        """
        self._topic = topic

        # If the broker URI is not set in the environment, localhost is
        # used as the host, on the assumption that a broker instance is
        # already running there.
        broker = os.environ.get("BROKER_URI", "localhost:9092")

        self.producer = kaf(
            key_serializer=str.encode,
            value_serializer=lambda v: json.dumps(v).encode('ascii'),
            bootstrap_servers=broker, api_version=(0, 10))

    def publish(self, key, value, topic=None):
        """Send one record, flush the producer and log the send result.

        Args:
            key: Record key, passed to the producer's key serializer.
            value: Record value, passed to the producer's value serializer.
            topic: Destination topic. When ``None``, the topic given to the
                constructor is used.

        Raises:
            Exception: Any error raised by ``send`` or ``flush`` is logged
                and re-raised. A ``KafkaError`` while waiting for the send
                result is logged but not raised.
        """
        if topic is None:
            topic = self._topic

        try:
            future = self.producer.send(topic=topic, key=key, value=value)
            self.producer.flush()
        except Exception:
            logging.exception("Error publishing to %s topic.", topic)
            raise

        try:
            record_metadata = future.get(timeout=10)
        except KafkaError:
            # Best effort: the record was already handed to the producer, so
            # report the failure instead of hiding it, but do not raise.
            logging.exception(
                "Could not confirm delivery to %s topic.", topic)
            return

        logging.debug("TOPIC: %s", record_metadata.topic)
        logging.debug("PARTITION: %s", record_metadata.partition)
        logging.debug("OFFSET: %s", record_metadata.offset)

    @staticmethod
    def _load_template(filename):
        """Return the minified text of the template ``filename``."""
        with open(os.path.join(json_path, filename)) as template:
            return jsmin(template.read())

    def _publish_template(self, key, filename, topic):
        """Publish the JSON template ``filename`` to ``topic`` under ``key``."""
        # NOTE(review): the template text is wrapped with json.dumps here and
        # encoded again by the value serializer. Kept as-is so the wire
        # format seen by consumers does not change - confirm it is intended.
        self.publish(key,
                     value=json.dumps(self._load_template(filename)),
                     topic=topic)

    def create_alarm_request(self, key, message, topic):
        """Publish the ``create_alarm.json`` template to ``alarm_request``.

        External to MON. ``message`` and ``topic`` are not used.
        """
        self._publish_template(key, 'create_alarm.json', 'alarm_request')

    def create_alarm_response(self, key, message, topic):
        """Publish ``message`` to ``alarm_response``.

        Internal to MON. ``topic`` is not used.
        """
        self.publish(key, value=message, topic='alarm_response')

    def acknowledge_alarm(self, key, message, topic):
        """Publish the ``acknowledge_alarm.json`` template to ``alarm_request``.

        Internal to MON. ``message`` and ``topic`` are not used.
        """
        self._publish_template(key, 'acknowledge_alarm.json', 'alarm_request')

    def list_alarm_request(self, key, message, topic):
        """Publish the ``list_alarm_req.json`` template to ``alarm_request``.

        External to MON. ``message`` and ``topic`` are not used.
        """
        self._publish_template(key, 'list_alarm_req.json', 'alarm_request')

    def notify_alarm(self, key, message, topic):
        """Publish ``message`` to ``alarm_response``. ``topic`` is not used."""
        self.publish(key, value=message, topic='alarm_response')

    def list_alarm_response(self, key, message, topic):
        """Publish ``message`` to ``alarm_response``. ``topic`` is not used."""
        self.publish(key, value=message, topic='alarm_response')

    def update_alarm_request(self, key, message, topic):
        """Publish the ``update_alarm_req.json`` template to ``alarm_request``.

        External to MON. ``message`` and ``topic`` are not used.
        """
        self._publish_template(key, 'update_alarm_req.json', 'alarm_request')

    def update_alarm_response(self, key, message, topic):
        """Publish ``message`` to ``alarm_response``.

        Internal to MON. ``topic`` is not used.
        """
        self.publish(key, value=message, topic='alarm_response')

    def delete_alarm_request(self, key, message, topic):
        """Publish the ``delete_alarm_req.json`` template to ``alarm_request``.

        External to MON. ``message`` and ``topic`` are not used.
        """
        self._publish_template(key, 'delete_alarm_req.json', 'alarm_request')

    def delete_alarm_response(self, key, message, topic):
        """Publish ``message`` to ``alarm_response``.

        Internal to MON. ``topic`` is not used.
        """
        self.publish(key, value=message, topic='alarm_response')

    def create_metrics_request(self, key, message, topic):
        """Publish the ``create_metric_req.json`` template to ``metric_request``.

        External to MON. ``message`` and ``topic`` are not used.
        """
        self._publish_template(key, 'create_metric_req.json', 'metric_request')

    def create_metrics_resp(self, key, message, topic):
        """Publish ``message`` to ``metric_response``.

        Internal to MON. ``topic`` is not used.
        """
        self.publish(key, value=message, topic='metric_response')

    def read_metric_data_request(self, key, message, topic):
        """Publish the ``read_metric_data_req.json`` template to ``metric_request``.

        External to MON. ``message`` and ``topic`` are not used.
        """
        self._publish_template(
            key, 'read_metric_data_req.json', 'metric_request')

    def read_metric_data_response(self, key, message, topic):
        """Publish ``message`` to ``metric_response``.

        Internal to MON. ``topic`` is not used.
        """
        self.publish(key, value=message, topic='metric_response')

    def list_metric_request(self, key, message, topic):
        """Publish the ``list_metric_req.json`` template to ``metric_request``.

        External to MON. ``message`` and ``topic`` are not used.
        """
        self._publish_template(key, 'list_metric_req.json', 'metric_request')

    def list_metric_response(self, key, message, topic):
        """Publish ``message`` to ``metric_response``.

        Internal to MON. ``topic`` is not used.
        """
        self.publish(key, value=message, topic='metric_response')

    def delete_metric_request(self, key, message, topic):
        """Publish the ``delete_metric_req.json`` template to ``metric_request``.

        External to MON. ``message`` and ``topic`` are not used.
        """
        self._publish_template(key, 'delete_metric_req.json', 'metric_request')

    def delete_metric_response(self, key, message, topic):
        """Publish ``message`` to ``metric_response``.

        Internal to MON. ``topic`` is not used.
        """
        self.publish(key, value=message, topic='metric_response')

    def update_metric_request(self, key, message, topic):
        """Publish the ``update_metric_req.json`` template to ``metric_request``.

        External to MON. ``message`` and ``topic`` are not used.
        """
        self._publish_template(key, 'update_metric_req.json', 'metric_request')

    def update_metric_response(self, key, message, topic):
        """Publish ``message`` to ``metric_response``.

        Internal to MON. ``topic`` is not used.
        """
        self.publish(key, value=message, topic='metric_response')

    def access_credentials(self, key, message, topic):
        """Publish the ``access_credentials.json`` template to ``access_credentials``.

        ``message`` and ``topic`` are not used.
        """
        self._publish_template(
            key, 'access_credentials.json', 'access_credentials')