Updates to the common producer
[osm/MON.git] / osm_mon / core / message_bus / producer.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9
10 # http://www.apache.org/licenses/LICENSE-2.0
11
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
20 ##
21 """This is a common kafka producer app.
22
23 It interacts with the SO and the plugins of the datacenters: OpenStack, VMWare
24 and AWS.
25 """
26
27 import logging
28
29 import os
30
31 import jsmin
32
33 from kafka import KafkaProducer as kaf
34
35 from kafka.errors import KafkaError
36
37 __author__ = "Prithiv Mohan"
38 __date__ = "06/Sep/2017"
39
40 json_path = os.path.abspath(os.pardir + "/MON/osm_mon/core/models/")
41
42 # TODO(): validate all of the request and response messages against the
43 # json_schemas
44
45
46 class KafkaProducer(object):
47 """A common KafkaProducer for requests and responses."""
48
49 def __init__(self, topic):
50 """Initialize the common kafka producer."""
51 self._topic = topic
52
53 if "BROKER_URI" in os.environ:
54 broker = os.getenv("BROKER_URI")
55 else:
56 broker = "localhost:9092"
57
58 '''
59 If the broker URI is not set in the env by default,
60 localhost container is taken as the host because an instance of
61 is already running.
62 '''
63
64 self.producer = kaf(
65 key_serializer=str.encode,
66 value_serializer=str.encode,
67 bootstrap_servers=broker, api_version=(0, 10))
68
69 def publish(self, key, value, topic=None):
70 """Send the required message on the Kafka message bus."""
71 try:
72 future = self.producer.send(topic=topic, key=key, value=value)
73 self.producer.flush()
74 except Exception:
75 logging.exception("Error publishing to {} topic." .format(topic))
76 raise
77 try:
78 record_metadata = future.get(timeout=10)
79 logging.debug("TOPIC:", record_metadata.topic)
80 logging.debug("PARTITION:", record_metadata.partition)
81 logging.debug("OFFSET:", record_metadata.offset)
82 except KafkaError:
83 pass
84
85 def create_alarm_request(self, key, message, topic):
86 """Create alarm request from SO to MON."""
87 # External to MON
88
89 payload_create_alarm = jsmin(
90 open(os.path.join(json_path, 'create_alarm.json')).read())
91 self.publish(key,
92 value=message,
93 topic='alarm_request')
94
95 def create_alarm_response(self, key, message, topic):
96 """Response to a create alarm request from MON to SO."""
97 # Internal to MON
98
99 payload_create_alarm_resp = jsmin(
100 open(os.path.join(json_path, 'create_alarm_resp.json')).read())
101
102 self.publish(key,
103 value=message,
104 topic='alarm_response')
105
106 def acknowledge_alarm(self, key, message, topic):
107 """Alarm acknowledgement request from SO to MON."""
108 # Internal to MON
109
110 payload_acknowledge_alarm = jsmin(
111 open(os.path.join(json_path, 'acknowledge_alarm.json')).read())
112
113 self.publish(key,
114 value=message,
115 topic='alarm_request')
116
117 def list_alarm_request(self, key, message, topic):
118 """List alarms request from SO to MON."""
119 # External to MON
120
121 payload_alarm_list_req = jsmin(
122 open(os.path.join(json_path, 'list_alarm_req.json')).read())
123
124 self.publish(key,
125 value=message,
126 topic='alarm_request')
127
128 def notify_alarm(self, key, message, topic):
129 """Notify of triggered alarm from MON to SO."""
130 payload_notify_alarm = jsmin(
131 open(os.path.join(json_path, 'notify_alarm.json')).read())
132
133 self.publish(key,
134 value=message,
135 topic='alarm_response')
136
137 def list_alarm_response(self, key, message, topic):
138 """Response for list alarms request from MON to SO."""
139 payload_list_alarm_resp = jsmin(
140 open(os.path.join(json_path, 'list_alarm_resp.json')).read())
141
142 self.publish(key,
143 value=message,
144 topic='alarm_response')
145
146 def update_alarm_request(self, key, message, topic):
147 """Update alarm request from SO to MON."""
148 # External to Mon
149
150 payload_update_alarm_req = jsmin(
151 open(os.path.join(json_path, 'update_alarm_req.json')).read())
152
153 self.publish(key,
154 value=message,
155 topic='alarm_request')
156
157 def update_alarm_response(self, key, message, topic):
158 """Response from update alarm request from MON to SO."""
159 # Internal to Mon
160
161 payload_update_alarm_resp = jsmin(
162 open(os.path.join(json_path, 'update_alarm_resp.json')).read())
163
164 self.publish(key,
165 value=message,
166 topic='alarm_response')
167
168 def delete_alarm_request(self, key, message, topic):
169 """Delete alarm request from SO to MON."""
170 # External to Mon
171
172 payload_delete_alarm_req = jsmin(
173 open(os.path.join(json_path, 'delete_alarm_req.json')).read())
174
175 self.publish(key,
176 value=message,
177 topic='alarm_request')
178
179 def delete_alarm_response(self, key, message, topic):
180 """Response for a delete alarm request from MON to SO."""
181 # Internal to Mon
182
183 payload_delete_alarm_resp = jsmin(
184 open(os.path.join(json_path, 'delete_alarm_resp.json')).read())
185
186 self.publish(key,
187 value=message,
188 topic='alarm_response')
189
190 def create_metrics_request(self, key, message, topic):
191 """Create metrics request from SO to MON."""
192 # External to Mon
193
194 payload_create_metrics_req = jsmin(
195 open(os.path.join(json_path, 'create_metric_req.json')).read())
196
197 self.publish(key,
198 value=message,
199 topic='metric_request')
200
201 def create_metrics_resp(self, key, message, topic):
202 """Response for a create metric request from MON to SO."""
203 # Internal to Mon
204
205 payload_create_metrics_resp = jsmin(
206 open(os.path.join(json_path, 'create_metric_resp.json')).read())
207
208 self.publish(key,
209 value=message,
210 topic='metric_response')
211
212 def read_metric_data_request(self, key, message, topic):
213 """Read metric data request from SO to MON."""
214 # External to Mon
215
216 payload_read_metric_data_request = jsmin(
217 open(os.path.join(json_path, 'read_metric_data_req.json')).read())
218
219 self.publish(key,
220 value=message,
221 topic='metric_request')
222
223 def read_metric_data_response(self, key, message, topic):
224 """Response from MON to SO for read metric data request."""
225 # Internal to Mon
226
227 payload_metric_data_response = jsmin(
228 open(os.path.join(json_path, 'read_metric_data_resp.json')).read())
229
230 self.publish(key,
231 value=message,
232 topic='metric_response')
233
234 def list_metric_request(self, key, message, topic):
235 """List metric request from SO to MON."""
236 # External to MON
237
238 payload_metric_list_req = jsmin(
239 open(os.path.join(json_path, 'list_metric_req.json')).read())
240
241 self.publish(key,
242 value=message,
243 topic='metric_request')
244
245 def list_metric_response(self, key, message, topic):
246 """Response from SO to MON for list metrics request."""
247 # Internal to MON
248
249 payload_metric_list_resp = jsmin(
250 open(os.path.join(json_path, 'list_metrics_resp.json')).read())
251
252 self.publish(key,
253 value=message,
254 topic='metric_response')
255
256 def delete_metric_request(self, key, message, topic):
257 """Delete metric request from SO to MON."""
258 # External to Mon
259
260 payload_delete_metric_req = jsmin(
261 open(os.path.join(json_path, 'delete_metric_req.json')).read())
262
263 self.publish(key,
264 value=message,
265 topic='metric_request')
266
267 def delete_metric_response(self, key, message, topic):
268 """Response from MON to SO for delete metric request."""
269 # Internal to Mon
270
271 payload_delete_metric_resp = jsmin(
272 open(os.path.join(json_path, 'delete_metric_resp.json')).read())
273
274 self.publish(key,
275 value=message,
276 topic='metric_response')
277
278 def update_metric_request(self, key, message, topic):
279 """Metric update request from SO to MON."""
280 # External to Mon
281
282 payload_update_metric_req = jsmin(
283 open(os.path.join(json_path, 'update_metric_req.json')).read())
284
285 self.publish(key,
286 value=message,
287 topic='metric_request')
288
289 def update_metric_response(self, key, message, topic):
290 """Reponse from MON to SO for metric update."""
291 # Internal to Mon
292
293 payload_update_metric_resp = jsmin(
294 open(os.path.join(json_path, 'update_metric_resp.json')).read())
295
296 self.publish(key,
297 value=message,
298 topic='metric_response')
299
300 def access_credentials(self, key, message, topic):
301 """Send access credentials to MON from SO."""
302 payload_access_credentials = jsmin(
303 open(os.path.join(json_path, 'access_credentials.json')).read())
304
305 self.publish(key,
306 value=message,
307 topic='access_credentials')