Renaming folder structure
[osm/MON.git] / osm_mon / core / message_bus / producer.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
6
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10
11 # http://www.apache.org/licenses/LICENSE-2.0
12
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
21 ##
22 '''
23 This is a kafka producer app that interacts with the SO and the plugins of the
24 datacenters like OpenStack, VMWare, AWS.
25 '''
26
27 from kafka import KafkaProducer as kaf
28 from kafka.errors import KafkaError
29 import logging
30 import json
31 import jsmin
32 import os
33 from os import listdir
34 from jsmin import jsmin
35
36 __author__ = "Prithiv Mohan"
37 __date__ = "06/Sep/2017"
38
39 json_path=os.path.abspath(os.pardir+"/MON/osm_mon/core/models/")
40
41 class KafkaProducer(object):
42
43 def __init__(self, topic):
44
45 self._topic = topic
46
47 if "BROKER_URI" in os.environ:
48 broker = os.getenv("BROKER_URI")
49 else:
50 broker = "localhost:9092"
51
52 '''
53 If the broker URI is not set in the env, by default,
54 localhost container is taken as the host because an instance of
55 is already running.
56 '''
57
58 self.producer = kaf(
59 key_serializer=str.encode,
60 value_serializer=lambda v: json.dumps(v).encode('ascii'),
61 bootstrap_servers=broker, api_version=(0, 10))
62
63 def publish(self, key, value, topic=None):
64 try:
65 future = self.producer.send(topic=topic, key=key, value=value)
66 self.producer.flush()
67 except Exception:
68 logging.exception("Error publishing to {} topic." .format(topic))
69 raise
70 try:
71 record_metadata = future.get(timeout=10)
72 logging.debug("TOPIC:", record_metadata.topic)
73 logging.debug("PARTITION:", record_metadata.partition)
74 logging.debug("OFFSET:", record_metadata.offset)
75 except KafkaError:
76 pass
77
78 def create_alarm_request(self, key, message, topic):
79
80 # External to MON
81
82 payload_create_alarm = jsmin(
83 open(os.path.join(json_path, 'create_alarm.json')).read())
84 self.publish(key,
85 value=json.dumps(payload_create_alarm),
86 topic='alarm_request')
87
88 def create_alarm_response(self, key, message, topic):
89
90 # Internal to MON
91
92 payload_create_alarm_resp = jsmin(
93 open(os.path.join(json_path, 'create_alarm_resp.json')).read())
94
95 self.publish(key,
96 value=message,
97 topic='alarm_response')
98
99 def acknowledge_alarm(self, key, message, topic):
100
101 # Internal to MON
102
103 payload_acknowledge_alarm = jsmin(
104 open(os.path.join(json_path, 'acknowledge_alarm.json')).read())
105
106 self.publish(key,
107 value=json.dumps(payload_acknowledge_alarm),
108 topic='alarm_request')
109
110 def list_alarm_request(self, key, message, topic):
111
112 # External to MON
113
114 payload_alarm_list_req = jsmin(
115 open(os.path.join(json_path, 'list_alarm_req.json')).read())
116
117 self.publish(key,
118 value=json.dumps(payload_alarm_list_req),
119 topic='alarm_request')
120
121 def notify_alarm(self, key, message, topic):
122
123 payload_notify_alarm = jsmin(
124 open(os.path.join(json_path, 'notify_alarm.json')).read())
125
126 self.publish(key,
127 value=message,
128 topic='alarm_response')
129
130 def list_alarm_response(self, key, message, topic):
131
132 payload_list_alarm_resp = jsmin(
133 open(os.path.join(json_path, 'list_alarm_resp.json')).read())
134
135 self.publish(key,
136 value=message,
137 topic='alarm_response')
138
139 def update_alarm_request(self, key, message, topic):
140
141 # External to Mon
142
143 payload_update_alarm_req = jsmin(
144 open(os.path.join(json_path, 'update_alarm_req.json')).read())
145
146 self.publish(key,
147 value=json.dumps(payload_update_alarm_req),
148 topic='alarm_request')
149
150 def update_alarm_response(self, key, message, topic):
151
152 # Internal to Mon
153
154 payload_update_alarm_resp = jsmin(
155 open(os.path.join(json_path, 'update_alarm_resp.json')).read())
156
157 self.publish(key,
158 value=message,
159 topic='alarm_response')
160
161 def delete_alarm_request(self, key, message, topic):
162
163 # External to Mon
164
165 payload_delete_alarm_req = jsmin(
166 open(os.path.join(json_path, 'delete_alarm_req.json')).read())
167
168 self.publish(key,
169 value=json.dumps(payload_delete_alarm_req),
170 topic='alarm_request')
171
172 def delete_alarm_response(self, key, message, topic):
173
174 # Internal to Mon
175
176 payload_delete_alarm_resp = jsmin(
177 open(os.path.join(json_path, 'delete_alarm_resp.json')).read())
178
179 self.publish(key,
180 value=message,
181 topic='alarm_response')
182
183 def create_metrics_request(self, key, message, topic):
184
185 # External to Mon
186
187 payload_create_metrics_req = jsmin(
188 open(os.path.join(json_path, 'create_metric_req.json')).read())
189
190 self.publish(key,
191 value=json.dumps(payload_create_metrics_req),
192 topic='metric_request')
193
194 def create_metrics_resp(self, key, message, topic):
195
196 # Internal to Mon
197
198 payload_create_metrics_resp = jsmin(
199 open(os.path.join(json_path, 'create_metric_resp.json')).read())
200
201 self.publish(key,
202 value=message,
203 topic='metric_response')
204
205 def read_metric_data_request(self, key, message, topic):
206
207 # External to Mon
208
209 payload_read_metric_data_request = jsmin(
210 open(os.path.join(json_path, 'read_metric_data_req.json')).read())
211
212 self.publish(key,
213 value=json.dumps(payload_read_metric_data_request),
214 topic='metric_request')
215
216 def read_metric_data_response(self, key, message, topic):
217
218 # Internal to Mon
219
220 payload_metric_data_response = jsmin(
221 open(os.path.join(json_path, 'read_metric_data_resp.json')).read())
222
223 self.publish(key,
224 value=message,
225 topic='metric_response')
226
227 def list_metric_request(self, key, message, topic):
228
229 # External to MON
230
231 payload_metric_list_req = jsmin(
232 open(os.path.join(json_path, 'list_metric_req.json')).read())
233
234 self.publish(key,
235 value=json.dumps(payload_metric_list_req),
236 topic='metric_request')
237
238 def list_metric_response(self, key, message, topic):
239
240 # Internal to MON
241
242 payload_metric_list_resp = jsmin(
243 open(os.path.join(json_path, 'list_metrics_resp.json')).read())
244
245 self.publish(key,
246 value=message,
247 topic='metric_response')
248
249 def delete_metric_request(self, key, message, topic):
250
251 # External to Mon
252
253 payload_delete_metric_req = jsmin(
254 open(os.path.join(json_path, 'delete_metric_req.json')).read())
255
256 self.publish(key,
257 value=json.dumps(payload_delete_metric_req),
258 topic='metric_request')
259
260 def delete_metric_response(self, key, message, topic):
261
262 # Internal to Mon
263
264 payload_delete_metric_resp = jsmin(
265 open(os.path.join(json_path, 'delete_metric_resp.json')).read())
266
267 self.publish(key,
268 value=message,
269 topic='metric_response')
270
271 def update_metric_request(self, key, message, topic):
272
273 # External to Mon
274
275 payload_update_metric_req = jsmin(
276 open(os.path.join(json_path, 'update_metric_req.json')).read())
277
278 self.publish(key,
279 value=json.dumps(payload_update_metric_req),
280 topic='metric_request')
281
282 def update_metric_response(self, key, message, topic):
283
284 # Internal to Mon
285
286 payload_update_metric_resp = jsmin(
287 open(os.path.join(json_path, 'update_metric_resp.json')).read())
288
289 self.publish(key,
290 value=message,
291 topic='metric_response')
292
293 def access_credentials(self, key, message, topic):
294
295 payload_access_credentials = jsmin(
296 open(os.path.join(json_path, 'access_credentials.json')).read())
297
298 self.publish(key,
299 value=json.dumps(payload_access_credentials),
300 topic='access_credentials')