Minor bugs fix
[osm/MON.git] / osm_mon / core / message_bus / producer.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9
10 # http://www.apache.org/licenses/LICENSE-2.0
11
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
20 ##
21 """This is a common kafka producer app.
22
23 It interacts with the SO and the plugins of the datacenters: OpenStack, VMWare
24 and AWS.
25 """
26
27 import logging
28
29 import os
30
31 from jsmin import jsmin
32
33 from kafka import KafkaProducer as kaf
34
35 from kafka.errors import KafkaError
36
__author__ = "Prithiv Mohan"
__date__ = "06/Sep/2017"

# Absolute, symlink-resolved path of this module file.
current_path = os.path.realpath(__file__)

# Directory holding the JSON message models. The first parent reference
# cancels the file name itself and the second steps out of the directory
# containing this file, so this is the "models" directory that sits next
# to this module's own directory.
json_path = os.path.abspath(
    os.path.join(current_path, os.pardir, os.pardir, 'models'))
42
43 # TODO(): validate all of the request and response messages against the
44 # json_schemas
45
46
class KafkaProducer(object):
    """A common KafkaProducer for requests and responses.

    Thin wrapper around the kafka-python producer. ``publish`` sends a
    single key/value message; every other method is a named shortcut that
    publishes to one fixed topic.

    Notes:
        * The ``topic`` argument of the shortcut methods is accepted for
          backward compatibility only; each shortcut always publishes to
          the fixed topic named in its docstring.
        * Messages are not validated against the JSON schemas. The schema
          files are deliberately not read here: their content was never
          used, and reading them on every call leaked a file handle.
    """

    def __init__(self, topic):
        """Initialize the common kafka producer.

        Args:
            topic: Default topic, used by ``publish`` when it is called
                without an explicit topic.
        """
        self._topic = topic

        # If BROKER_URI is not set in the environment, the local broker is
        # used as the host; one is assumed to be already running there.
        broker = os.getenv("BROKER_URI", "localhost:9092")

        # Keys and values are plain strings, encoded to bytes on send.
        self.producer = kaf(
            key_serializer=str.encode,
            value_serializer=str.encode,
            bootstrap_servers=broker, api_version=(0, 10))

    def publish(self, key, value, topic=None):
        """Send the required message on the Kafka message bus.

        Args:
            key: Message key (string).
            value: Message payload (string).
            topic: Destination topic. When omitted, the topic given to the
                constructor is used.

        Raises:
            Exception: Whatever the underlying producer raises while
                sending or flushing; it is logged and re-raised.
        """
        if topic is None:
            topic = self._topic

        try:
            future = self.producer.send(topic=topic, key=key, value=value)
            self.producer.flush()
        except Exception:
            logging.exception("Error publishing to %s topic.", topic)
            raise

        # Delivery confirmation is best effort: a failure to obtain the
        # record metadata is reported but never propagated to the caller.
        try:
            record_metadata = future.get(timeout=10)
        except KafkaError:
            logging.warning(
                "Could not confirm delivery of message to %s topic.",
                topic, exc_info=True)
        else:
            logging.debug("TOPIC: %s", record_metadata.topic)
            logging.debug("PARTITION: %s", record_metadata.partition)
            logging.debug("OFFSET: %s", record_metadata.offset)

    # ------------------------------------------------------------------
    # Alarm messages
    # ------------------------------------------------------------------

    def create_alarm_request(self, key, message, topic):
        """Create alarm request from SO to MON (topic 'alarm_request')."""
        # External to MON
        self.publish(key,
                     value=message,
                     topic='alarm_request')

    def create_alarm_response(self, key, message, topic):
        """Response to a create alarm request from MON to SO.

        Published on topic 'alarm_response'.
        """
        # Internal to MON
        self.publish(key,
                     value=message,
                     topic='alarm_response')

    def acknowledge_alarm(self, key, message, topic):
        """Alarm acknowledgement request from SO to MON.

        Published on topic 'alarm_request'.
        """
        # NOTE(review): originally tagged "Internal to MON" although the
        # request is described as coming from the SO - confirm.
        self.publish(key,
                     value=message,
                     topic='alarm_request')

    def list_alarm_request(self, key, message, topic):
        """List alarms request from SO to MON (topic 'alarm_request')."""
        # External to MON
        self.publish(key,
                     value=message,
                     topic='alarm_request')

    def notify_alarm(self, key, message, topic):
        """Notify of triggered alarm from MON to SO.

        Published on topic 'alarm_response'.
        """
        self.publish(key,
                     value=message,
                     topic='alarm_response')

    def list_alarm_response(self, key, message, topic):
        """Response for list alarms request from MON to SO.

        Published on topic 'alarm_response'.
        """
        self.publish(key,
                     value=message,
                     topic='alarm_response')

    def update_alarm_request(self, key, message, topic):
        """Update alarm request from SO to MON (topic 'alarm_request')."""
        # External to MON
        self.publish(key,
                     value=message,
                     topic='alarm_request')

    def update_alarm_response(self, key, message, topic):
        """Response from update alarm request from MON to SO.

        Published on topic 'alarm_response'.
        """
        # Internal to MON
        self.publish(key,
                     value=message,
                     topic='alarm_response')

    def delete_alarm_request(self, key, message, topic):
        """Delete alarm request from SO to MON (topic 'alarm_request')."""
        # External to MON
        self.publish(key,
                     value=message,
                     topic='alarm_request')

    def delete_alarm_response(self, key, message, topic):
        """Response for a delete alarm request from MON to SO.

        Published on topic 'alarm_response'.
        """
        # Internal to MON
        self.publish(key,
                     value=message,
                     topic='alarm_response')

    # ------------------------------------------------------------------
    # Metric messages
    # ------------------------------------------------------------------

    def create_metrics_request(self, key, message, topic):
        """Create metrics request from SO to MON (topic 'metric_request')."""
        # External to MON
        self.publish(key,
                     value=message,
                     topic='metric_request')

    def create_metrics_resp(self, key, message, topic):
        """Response for a create metric request from MON to SO.

        Published on topic 'metric_response'.
        """
        # Internal to MON
        self.publish(key,
                     value=message,
                     topic='metric_response')

    def read_metric_data_request(self, key, message, topic):
        """Read metric data request from SO to MON.

        Published on topic 'metric_request'.
        """
        # External to MON
        self.publish(key,
                     value=message,
                     topic='metric_request')

    def read_metric_data_response(self, key, message, topic):
        """Response from MON to SO for read metric data request.

        Published on topic 'metric_response'.
        """
        # Internal to MON
        self.publish(key,
                     value=message,
                     topic='metric_response')

    def list_metric_request(self, key, message, topic):
        """List metric request from SO to MON (topic 'metric_request')."""
        # External to MON
        self.publish(key,
                     value=message,
                     topic='metric_request')

    def list_metric_response(self, key, message, topic):
        """Response from MON to SO for list metrics request.

        Published on topic 'metric_response'.
        """
        # Internal to MON
        self.publish(key,
                     value=message,
                     topic='metric_response')

    def delete_metric_request(self, key, message, topic):
        """Delete metric request from SO to MON (topic 'metric_request')."""
        # External to MON
        self.publish(key,
                     value=message,
                     topic='metric_request')

    def delete_metric_response(self, key, message, topic):
        """Response from MON to SO for delete metric request.

        Published on topic 'metric_response'.
        """
        # Internal to MON
        self.publish(key,
                     value=message,
                     topic='metric_response')

    def update_metric_request(self, key, message, topic):
        """Metric update request from SO to MON (topic 'metric_request')."""
        # External to MON
        self.publish(key,
                     value=message,
                     topic='metric_request')

    def update_metric_response(self, key, message, topic):
        """Response from MON to SO for metric update.

        Published on topic 'metric_response'.
        """
        # Internal to MON
        self.publish(key,
                     value=message,
                     topic='metric_response')

    # ------------------------------------------------------------------
    # Credentials
    # ------------------------------------------------------------------

    def access_credentials(self, key, message, topic):
        """Send access credentials to MON from SO.

        Published on topic 'access_credentials'.
        """
        self.publish(key,
                     value=message,
                     topic='access_credentials')