# Copyright 2017 Intel Research and Development Ireland Limited
# *************************************************************
#
# This file is part of OSM Monitoring module
# All Rights Reserved to Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
"""Carry out alarming requests via Aodh API."""
import json
import logging as log

from core.message_bus.producer import KafkaProducer

from kafka import KafkaConsumer

from plugins.OpenStack.common import Common
from plugins.OpenStack.response import OpenStack_Response
34 __author__
= "Helena McGough"
37 "Average_Memory_Usage_Above_Threshold",
38 "Read_Latency_Above_Threshold",
39 "Write_Latency_Above_Threshold",
44 "Net_Packets_Dropped",
45 "Packets_in_Above_Threshold",
46 "Packets_out_Above_Threshold",
47 "CPU_Utilization_Above_Threshold"]
53 "CRITICAL": "critical",
54 "INDETERMINATE": "critical"}
class Alarming(object):
    """Carries out alarming requests and responses via Aodh API."""

    def __init__(self, server='localhost:9092', topic='alarm_request'):
        """Create the OpenStack alarming instance.

        :param server: Kafka bootstrap server(s) the request consumer
            connects to. Defaults to the previously hard-coded value.
        :param topic: Kafka topic the alarm requests are consumed from.
            Defaults to the previously hard-coded value.
        """
        self._common = Common()

        # Initialize a generic consumer object to consume messages from the SO.
        # NOTE(review): one argument line of the original KafkaConsumer call
        # is not visible in the reviewed copy; confirm whether further kwargs
        # (e.g. a consumer group_id) must be passed here.
        self._consumer = KafkaConsumer(topic, bootstrap_servers=server)

        # Use the Response class to generate valid json response messages
        self._response = OpenStack_Response()

        # Initialize a producer to send responses back to the SO
        self._producer = KafkaProducer("alarm_response")
78 """Consume info from the message bus to manage alarms."""
79 # Check the alarming functionlity that needs to be performed
80 for message
in self
._consumer
:
82 values
= json
.loads(message
.value
)
83 vim_type
= values
['vim_type'].lower()
85 if vim_type
== "openstack":
86 log
.info("Alarm action required: %s" % (message
.topic
))
88 # Generate and auth_token and endpoint for request
89 auth_token
, endpoint
= self
.authenticate(values
)
91 if message
.key
== "create_alarm_request":
92 # Configure/Update an alarm
93 alarm_details
= values
['alarm_create_request']
95 alarm_id
, alarm_status
= self
.configure_alarm(
96 endpoint
, auth_token
, alarm_details
)
98 # Generate a valid response message, send via producer
100 resp_message
= self
._response
.generate_response(
101 'create_alarm_response', status
=alarm_status
,
103 cor_id
=alarm_details
['correlation_id'])
104 self
._producer
.create_alarm_response(
105 'create_alarm_resonse', resp_message
,
107 except Exception as exc
:
108 log
.warn("Response creation failed: %s", exc
)
110 elif message
.key
== "list_alarm_request":
111 # Check for a specifed: alarm_name, resource_uuid, severity
112 # and generate the appropriate list
113 list_details
= values
['alarm_list_request']
115 name
= list_details
['alarm_name']
116 alarm_list
= self
.list_alarms(
117 endpoint
, auth_token
, alarm_name
=name
)
118 except Exception as a_name
:
119 log
.debug("No name specified for list:%s", a_name
)
121 resource
= list_details
['resource_uuid']
122 alarm_list
= self
.list_alarms(
123 endpoint
, auth_token
, resource_id
=resource
)
124 except Exception as r_id
:
125 log
.debug("No resource id specified for this list:\
128 severe
= list_details
['severity']
129 alarm_list
= self
.list_alarms(
130 endpoint
, auth_token
, severity
=severe
)
131 except Exception as exc
:
132 log
.warn("No severity specified for list: %s.\
133 will return full list.", exc
)
134 alarm_list
= self
.list_alarms(
135 endpoint
, auth_token
)
138 # Generate and send a list response back
139 resp_message
= self
._response
.generate_response(
140 'list_alarm_response', alarm_list
=alarm_list
,
141 cor_id
=list_details
['correlation_id'])
142 self
._producer
.list_alarm_response(
143 'list_alarm_response', resp_message
,
145 except Exception as exc
:
146 log
.warn("Failed to send a valid response back.")
148 elif message
.key
== "delete_alarm_request":
149 request_details
= values
['alarm_delete_request']
150 alarm_id
= request_details
['alarm_uuid']
152 resp_status
= self
.delete_alarm(
153 endpoint
, auth_token
, alarm_id
)
155 # Generate and send a response message
157 resp_message
= self
._response
.generate_response(
158 'delete_alarm_response', alarm_id
=alarm_id
,
160 cor_id
=request_details
['correlation_id'])
161 self
._producer
.delete_alarm_response(
162 'delete_alarm_response', resp_message
,
164 except Exception as exc
:
165 log
.warn("Failed to create delete reponse:%s", exc
)
167 elif message
.key
== "acknowledge_alarm":
168 # Acknowledge that an alarm has been dealt with by the SO
169 alarm_id
= values
['ack_details']['alarm_uuid']
171 response
= self
.update_alarm_state(
172 endpoint
, auth_token
, alarm_id
)
174 # Log if an alarm was reset
176 log
.info("Acknowledged the alarm and cleared it.")
178 log
.warn("Failed to acknowledge/clear the alarm.")
180 elif message
.key
== "update_alarm_request":
181 # Update alarm configurations
182 alarm_details
= values
['alarm_update_request']
184 alarm_id
, status
= self
.update_alarm(
185 endpoint
, auth_token
, alarm_details
)
187 # Generate a response for an update request
189 resp_message
= self
._response
.generate_response(
190 'update_alarm_response', alarm_id
=alarm_id
,
191 cor_id
=alarm_details
['correlation_id'],
193 self
._producer
.update_alarm_response(
194 'update_alarm_response', resp_message
,
196 except Exception as exc
:
197 log
.warn("Failed to send an update response:%s", exc
)
200 log
.debug("Unknown key, no action will be performed")
202 log
.info("Message topic not relevant to this plugin: %s",
207 def configure_alarm(self
, endpoint
, auth_token
, values
):
208 """Create requested alarm in Aodh."""
209 url
= "{}/v2/alarms/".format(endpoint
)
211 # Check if the desired alarm is supported
212 alarm_name
= values
['alarm_name']
213 if alarm_name
not in ALARM_NAMES
:
214 log
.warn("This alarm is not supported, by a valid metric.")
218 metric_name
= values
['metric_name']
219 resource_id
= values
['resource_uuid']
220 # Check the payload for the desired alarm
221 payload
= self
.check_payload(values
, metric_name
, resource_id
,
223 new_alarm
= self
._common
._perform
_request
(
224 url
, auth_token
, req_type
="post", payload
=payload
)
226 return json
.loads(new_alarm
.text
)['alarm_id'], True
227 except Exception as exc
:
228 log
.warn("Alarm creation could not be performed: %s", exc
)
231 def delete_alarm(self
, endpoint
, auth_token
, alarm_id
):
232 """Delete alarm function."""
233 url
= "{}/v2/alarms/%s".format(endpoint
) % (alarm_id
)
236 result
= self
._common
._perform
_request
(
237 url
, auth_token
, req_type
="delete")
238 if str(result
.status_code
) == "404":
239 # If status code is 404 alarm did not exist
244 except Exception as exc
:
245 log
.warn("Failed to delete alarm: %s because %s.", alarm_id
, exc
)
248 def list_alarms(self
, endpoint
, auth_token
,
249 alarm_name
=None, resource_id
=None, severity
=None):
250 """Generate the requested list of alarms."""
251 url
= "{}/v2/alarms/".format(endpoint
)
254 result
= self
._common
._perform
_request
(
255 url
, auth_token
, req_type
="get")
256 if result
is not None:
257 # Check for a specified list based on:
258 # alarm_name, severity, resource_id
259 if alarm_name
is not None:
260 for alarm
in json
.loads(result
.text
):
261 if alarm_name
in str(alarm
):
262 alarm_list
.append(str(alarm
))
263 elif resource_id
is not None:
264 for alarm
in json
.loads(result
.text
):
265 if resource_id
in str(alarm
):
266 alarm_list
.append(str(alarm
))
267 elif severity
is not None:
268 for alarm
in json
.loads(result
.text
):
269 if severity
in str(alarm
):
270 alarm_list
.append(str(alarm
))
272 alarm_list
= result
.text
277 def update_alarm_state(self
, endpoint
, auth_token
, alarm_id
):
278 """Set the state of an alarm to ok when ack message is received."""
279 url
= "{}/v2/alarms/%s/state".format(endpoint
) % alarm_id
280 payload
= json
.dumps("ok")
283 self
._common
._perform
_request
(
284 url
, auth_token
, req_type
="put", payload
=payload
)
286 except Exception as exc
:
287 log
.warn("Unable to update alarm state: %s", exc
)
290 def update_alarm(self
, endpoint
, auth_token
, values
):
291 """Get alarm name for an alarm configuration update."""
292 # Get already existing alarm details
293 url
= "{}/v2/alarms/%s".format(endpoint
) % values
['alarm_uuid']
295 # Gets current configurations about the alarm
297 result
= self
._common
._perform
_request
(
298 url
, auth_token
, req_type
="get")
299 alarm_name
= json
.loads(result
.text
)['name']
300 rule
= json
.loads(result
.text
)['gnocchi_resources_threshold_rule']
301 alarm_state
= json
.loads(result
.text
)['state']
302 resource_id
= rule
['resource_id']
303 metric_name
= rule
['metric']
304 except Exception as exc
:
305 log
.warn("Failed to retreive existing alarm info: %s.\
306 Can only update OSM created alarms.", exc
)
309 # Generates and check payload configuration for alarm update
310 payload
= self
.check_payload(values
, metric_name
, resource_id
,
311 alarm_name
, alarm_state
=alarm_state
)
313 # Updates the alarm configurations with the valid payload
314 if payload
is not None:
316 update_alarm
= self
._common
._perform
_request
(
317 url
, auth_token
, req_type
="put", payload
=payload
)
319 return json
.loads(update_alarm
.text
)['alarm_id'], True
320 except Exception as exc
:
321 log
.warn("Alarm update could not be performed: %s", exc
)
325 def check_payload(self
, values
, metric_name
, resource_id
,
326 alarm_name
, alarm_state
=None):
327 """Check that the payload is configuration for update/create alarm."""
329 # Check state and severity
330 severity
= values
['severity']
331 if severity
== "INDETERMINATE":
332 alarm_state
= "insufficient data"
333 if alarm_state
is None:
336 # Try to configure the payload for the update/create request
337 # Can only update: threshold, operation, statistic and
338 # the severity of the alarm
339 rule
= {'threshold': values
['threshold_value'],
340 'comparison_operator': values
['operation'].lower(),
341 'metric': metric_name
,
342 'resource_id': resource_id
,
343 'resource_type': 'generic',
344 'aggregation_method': values
['statistic'].lower()}
345 payload
= json
.dumps({'state': alarm_state
,
347 'severity': SEVERITIES
[severity
],
348 'type': 'gnocchi_resources_threshold',
349 'gnocchi_resources_threshold_rule': rule
, })
351 except KeyError as exc
:
352 log
.warn("Alarm is not configured correctly: %s", exc
)
355 def authenticate(self
, values
):
356 """Generate an authentication token and endpoint for alarm request."""
358 # Check for a tenant_id
359 auth_token
= self
._common
._authenticate
(
360 tenant_id
=values
['tenant_uuid'])
361 endpoint
= self
._common
.get_endpoint("alarming")
362 except Exception as exc
:
363 log
.warn("Tenant ID is not specified. Will use a generic\
364 authentication: %s", exc
)
365 auth_token
= self
._common
._authenticate
()
366 endpoint
= self
._common
.get_endpoint("alarming")
368 return auth_token
, endpoint
370 def get_alarm_state(self
, endpoint
, auth_token
, alarm_id
):
371 """Get the state of the alarm."""
372 url
= "{}/v2/alarms/%s/state".format(endpoint
) % alarm_id
375 alarm_state
= self
._common
._perform
_request
(
376 url
, auth_token
, req_type
="get")
377 return json
.loads(alarm_state
.text
)
378 except Exception as exc
:
379 log
.warn("Failed to get the state of the alarm:%s", exc
)