0f4a2dac78816b190e08980dc54a2fca49d94b8c
[osm/MON.git] / plugins / OpenStack / Aodh / alarming.py
1 """Send alarm info from Aodh to SO via MON."""
2
3 import json
4 import logging as log
5
6 from collections import OrderedDict
7
8 from kafka import KafkaConsumer
9
10 from plugins.OpenStack.common import Common
11
12
# Translates the 'severity' given in an alarm request into the value that is
# placed in the 'severity' field of the Aodh alarm payload.
SEVERITIES = dict(
    WARNING="low",
    MINOR="low",
    MAJOR="moderate",
    CRITICAL="critical",
    INDETERMINATE="critical",
)
19
20
class Alarming(object):
    """Carry out alarm requests read from the message bus against Aodh.

    Requests are consumed from Kafka and translated into calls to the Aodh
    v2 alarms REST API (create, list, delete, acknowledge and update).
    """

    def __init__(self):
        """Create the Aodh alarming instance and its Kafka consumer."""
        self._common = Common()
        self.auth_token = None
        self.endpoint = None
        # Outcome of the last create/list request (True/False), None until
        # one of those requests has been handled.
        self.resp_status = None

        # TODO(mcgoughh): Remove hardcoded kafkaconsumer
        # Initialize a generic consumer object to consume message from the SO
        server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
        self._consumer = KafkaConsumer(server['topic'],
                                       group_id='osm_mon',
                                       bootstrap_servers=server['server'])

        # TODO(mcgoughh): Initialize a producer to send messages back to the SO

    def alarming(self):
        """Consume info from the message bus to manage alarms.

        Iterates over the consumer and handles every message whose
        ``vim_type`` is "openstack". A message that cannot be parsed, or
        that lacks a field required by its request type, is logged and
        skipped so that one bad message does not stop the consumer loop.
        """
        for message in self._consumer:
            try:
                values = json.loads(message.value)
                vim_type = values['vim_type'].lower()
            except (ValueError, TypeError, KeyError, AttributeError) as exc:
                log.warning("Ignoring malformed message on topic %s: %s",
                            message.topic, exc)
                continue

            if vim_type != "openstack":
                log.info("Message topic not relevant to this plugin: %s",
                         message.topic)
                continue

            log.info("Alarm action required: %s", message.topic)
            try:
                self._handle_request(message.key, values)
            except KeyError as exc:
                log.warning(
                    "Alarm request %s is missing a required field: %s",
                    message.key, exc)

        return

    def _handle_request(self, key, values):
        """Run the Aodh operation selected by the message key.

        :param key: message key naming the requested action
        :param values: decoded message body
        :raises KeyError: if the body lacks a field the action requires
        """
        # NOTE(review): kafka-python hands keys over as raw bytes unless a
        # key_deserializer is configured; on Python 3 these str comparisons
        # would then never match - confirm how the consumer is meant to run.
        if key == "create_alarm_request":
            # Configure/Update an alarm
            alarm_details = values['alarm_create_request']

            # Generate an auth_token and endpoint
            auth_token = self._common._authenticate(
                tenant_id=alarm_details['tenant_uuid'])
            endpoint = self._common.get_endpoint("alarming")

            alarm_id = self.configure_alarm(
                endpoint, auth_token, alarm_details)

            # TODO(mcgoughh): will send an acknowledge message back on
            # the bus via the producer
            self.resp_status = alarm_id is not None
            if self.resp_status:
                log.debug("A valid alarm was found/created: %s",
                          self.resp_status)
            else:
                log.debug("Failed to create desired alarm: %s",
                          self.resp_status)

        elif key == "list_alarm_request":
            auth_token = self._common._authenticate()
            endpoint = self._common.get_endpoint("alarming")

            # List all of the alarms
            alarm_list = self.list_alarms(endpoint, auth_token)

            # TODO(mcgoughh): send a response back to SO
            self.resp_status = alarm_list is not None
            if self.resp_status:
                log.info("A list of alarms was generated: %s", alarm_list)
            else:
                log.warning("Failed to generae an alarm list")

        elif key == "delete_alarm_request":
            # Delete the specified alarm; read the id first so a malformed
            # request is rejected before authenticating.
            alarm_id = values['alarm_delete_request']['alarm_uuid']

            auth_token = self._common._authenticate()
            endpoint = self._common.get_endpoint("alarming")

            # TODO(mcgoughh): send a response back on the bus
            if self.delete_alarm(endpoint, auth_token, alarm_id) is True:
                log.info("Requested alarm has been deleted: %s", alarm_id)
            else:
                log.warning("Failed to delete requested alarm.")

        elif key == "acknowledge_alarm":
            # Acknowledge that an alarm has been dealt with by the SO
            # Set its state to ok
            alarm_id = values['ack_details']['alarm_uuid']

            auth_token = self._common._authenticate()
            endpoint = self._common.get_endpoint("alarming")

            if self.update_alarm_state(
                    endpoint, auth_token, alarm_id) is True:
                log.info("Status has been updated for alarm, %s.", alarm_id)
            else:
                log.warning("Failed update the state of requested alarm.")

        elif key == "update_alarm_request":
            # Update alarm configurations
            alarm_details = values['alarm_update_request']

            auth_token = self._common._authenticate()
            endpoint = self._common.get_endpoint("alarming")

            alarm_id = self.update_alarm(
                endpoint, auth_token, alarm_details)

            # TODO(mcgoughh): send a response message to the SO
            if alarm_id is not None:
                log.info("Alarm configuration was update correctly.")
            else:
                log.warning("Unable to update the specified alarm")

        else:
            log.debug("Unknown key, no action will be performed")

    def get_alarm_id(self, endpoint, auth_token, alarm_name):
        """Look up the id of the alarm called ``alarm_name``.

        :returns: the ``alarm_id`` of the first alarm returned by the name
            query, or None when the response holds no such entry.
        """
        url = "{}/v2/alarms/".format(endpoint)

        # TODO(mcgoughh): will query on resource_id once it has been
        # implemented need to create the query field when creating
        # the alarm
        query = OrderedDict([("q.field", 'name'), ("q.op", "eq"),
                             ("q.value", alarm_name)])

        result = self._common._perform_request(
            url, auth_token, req_type="get", params=query)

        try:
            alarm_id = json.loads(result.text)[0]['alarm_id']
        except Exception:
            # An empty or unparsable response is treated as "no such alarm".
            log.debug("Alarm doesn't exist, needs to be created.")
            return None

        log.info("An existing alarm was found: %s", alarm_id)
        return alarm_id

    def configure_alarm(self, endpoint, auth_token, values):
        """Create the requested alarm unless one with that name exists.

        :param values: alarm request; must contain ``alarm_name``
        :returns: the id of the newly created alarm, or None if an alarm
            with that name already exists, the request is not configured
            correctly, or the creation request fails.
        :raises KeyError: if ``values`` has no ``alarm_name``
        """
        url = "{}/v2/alarms/".format(endpoint)

        alarm_name = values['alarm_name']

        # Confirm alarm doesn't exist
        if self.get_alarm_id(endpoint, auth_token, alarm_name) is not None:
            log.warning("This alarm already exists. Try an update instead.")
            return None

        # Try to create the alarm
        try:
            metric_name = values['metric_name']
            resource_id = values['resource_uuid']
            payload = self.check_payload(values, metric_name, resource_id,
                                         alarm_name)
            if payload is None:
                # check_payload has already logged what is wrong; do not
                # send a request without a body.
                return None

            new_alarm = self._common._perform_request(
                url, auth_token, req_type="post", payload=payload)

            return json.loads(new_alarm.text)['alarm_id']
        except Exception as exc:
            log.warning("Alarm creation could not be performed: %s", exc)
        return None

    def delete_alarm(self, endpoint, auth_token, alarm_id):
        """Send a delete request for ``alarm_id``.

        :returns: True if the request was performed without raising,
            False otherwise.
        """
        # NOTE(review): the response itself is not inspected, so True only
        # means that the request did not raise.
        url = "{}/v2/alarms/{}".format(endpoint, alarm_id)

        try:
            self._common._perform_request(url, auth_token, req_type="delete")
        except Exception as exc:
            log.warning("Failed to delete alarm: %s because %s.",
                        alarm_id, exc)
            return False
        return True

    def list_alarms(self, endpoint, auth_token,
                    alarm_name=None, resource_id=None, severity=None):
        """Generate the requested list of alarms.

        Only the unfiltered list is supported. If any of ``alarm_name``,
        ``resource_id`` or ``severity`` is given, None is returned rather
        than an unfiltered list.

        :returns: the decoded list of all alarms, or None on failure or
            when a filter was requested.
        """
        filters = (alarm_name, resource_id, severity)
        if any(value is not None for value in filters):
            # TODO(mcgoughh): support more specific lists
            log.debug("Requested list is unavailable")
            return None

        # List all alarms
        url = "{}/v2/alarms/".format(endpoint)
        try:
            result = self._common._perform_request(
                url, auth_token, req_type="get")
            return json.loads(result.text)
        except Exception as exc:
            log.warning("Unable to generate alarm list: %s", exc)
        return None

    def update_alarm_state(self, endpoint, auth_token, alarm_id):
        """Set the state of an alarm to ok when ack message is received.

        :returns: True if the request was performed without raising,
            False otherwise.
        """
        url = "{}/v2/alarms/{}/state".format(endpoint, alarm_id)
        payload = json.dumps("ok")

        try:
            self._common._perform_request(
                url, auth_token, req_type="put", payload=payload)
        except Exception as exc:
            log.warning("Unable to update alarm state: %s", exc)
            return False
        return True

    def update_alarm(self, endpoint, auth_token, values):
        """Update the configuration of an existing alarm.

        The current alarm is fetched first so that its name, state, metric
        and resource id can be carried over into the new configuration.

        :param values: update request; must contain ``alarm_uuid``
        :returns: the ``alarm_id`` from the update response, or None if
            the alarm could not be read, the request is not configured
            correctly, or the update fails.
        :raises KeyError: if ``values`` has no ``alarm_uuid``
        """
        url = "{}/v2/alarms/{}".format(endpoint, values['alarm_uuid'])

        # Get already existing alarm details
        try:
            result = self._common._perform_request(
                url, auth_token, req_type="get")
            alarm = json.loads(result.text)
            alarm_name = alarm['name']
            rule = alarm['gnocchi_resources_threshold_rule']
            alarm_state = alarm['state']
            resource_id = rule['resource_id']
            metric_name = rule['metric']
        except Exception as exc:
            log.warning("Failed to retreive existing alarm info: %s. "
                        "Can only update OSM created alarms.", exc)
            return None

        # Generate and check payload configuration for alarm update
        payload = self.check_payload(values, metric_name, resource_id,
                                     alarm_name, alarm_state=alarm_state)
        if payload is None:
            return None

        try:
            updated = self._common._perform_request(
                url, auth_token, req_type="put", payload=payload)
            return json.loads(updated.text)['alarm_id']
        except Exception as exc:
            log.warning("Alarm update could not be performed: %s", exc)
        return None

    def check_payload(self, values, metric_name, resource_id,
                      alarm_name, alarm_state=None):
        """Build the JSON payload for an alarm create/update request.

        :param values: request; must contain ``severity``,
            ``threshold_value``, ``operation`` and ``statistic``
        :param alarm_state: state to set; defaults to "ok" and is
            overridden with "insufficient data" for INDETERMINATE severity
        :returns: the payload as a JSON string, or None if a required key
            is missing or the severity is not a key of SEVERITIES.
        """
        try:
            # Check state and severity
            severity = values['severity']
            if severity == "INDETERMINATE":
                alarm_state = "insufficient data"
            elif alarm_state is None:
                alarm_state = "ok"

            # Try to configure the payload for the update/create request
            rule = {'threshold': values['threshold_value'],
                    'comparison_operator': values['operation'].lower(),
                    'metric': metric_name,
                    'resource_id': resource_id,
                    'resource_type': 'generic',
                    'aggregation_method': values['statistic'].lower()}
            return json.dumps({'state': alarm_state,
                               'name': alarm_name,
                               'severity': SEVERITIES[severity],
                               'type': 'gnocchi_resources_threshold',
                               'gnocchi_resources_threshold_rule': rule, })
        except KeyError as exc:
            log.warning("Alarm is not configured correctly: %s", exc)
        return None