1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
22 """Carry out alarming requests via Aodh API."""
27 from core
.message_bus
.producer
import KafkaProducer
29 from kafka
import KafkaConsumer
31 from plugins
.OpenStack
.common
import Common
32 from plugins
.OpenStack
.response
import OpenStack_Response
34 __author__
= "Helena McGough"
37 "Average_Memory_Usage_Above_Threshold",
38 "Read_Latency_Above_Threshold",
39 "Write_Latency_Above_Threshold",
44 "Net_Packets_Dropped",
45 "Packets_in_Above_Threshold",
46 "Packets_out_Above_Threshold",
47 "CPU_Utilization_Above_Threshold"]
53 "CRITICAL": "critical",
54 "INDETERMINATE": "critical"}
class Alarming(object):
    """Carries out alarming requests and responses via Aodh API.

    An instance consumes requests from the ``alarm_request`` Kafka topic,
    performs the matching operation against the Aodh ``/v2/alarms`` REST
    resources and publishes the outcome through the ``alarm_response``
    producer.
    """

    # Topic handed to the producer's ``*_response`` calls.
    # NOTE(review): the topic argument of those calls is not visible in the
    # reviewed source; it is assumed to match the topic the producer is
    # created with in ``__init__`` - confirm against KafkaProducer.
    _RESPONSE_TOPIC = "alarm_response"

    def __init__(self):
        """Create the OpenStack alarming instance."""
        self._common = Common()

        # TODO(mcgoughh): Remove hardcoded kafkaconsumer
        # Initialize a generic consumer object to consume messages from the SO
        server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
        # NOTE(review): one keyword-argument line of this call is missing
        # from the reviewed source; ``group_id='osm_mon'`` is assumed -
        # confirm before merging.
        self._consumer = KafkaConsumer(server['topic'],
                                       group_id='osm_mon',
                                       bootstrap_servers=server['server'])

        # Use the Response class to generate valid json response messages
        self._response = OpenStack_Response()

        # Initialize a producer to send responses back to the SO
        self._producer = KafkaProducer("alarm_response")

    # NOTE(review): the ``def`` line of this method is not visible in the
    # reviewed source; the name ``alarming`` is assumed - confirm against
    # the callers before merging.
    def alarming(self):
        """Consume info from the message bus to manage alarms.

        Every consumed message is decoded as JSON. Messages whose
        ``vim_type`` is not ``openstack`` are logged and ignored; for the
        others a token and endpoint are obtained via :meth:`authenticate`
        and the request is dispatched on ``message.key``.
        """
        for message in self._consumer:
            values = json.loads(message.value)
            vim_type = values['vim_type'].lower()

            if vim_type != "openstack":
                log.info("Message topic not relevant to this plugin: %s",
                         message.topic)
                continue

            log.info("Alarm action required: %s", message.topic)

            # Generate an auth_token and endpoint for the request
            auth_token, endpoint = self.authenticate(values)

            if message.key == "create_alarm_request":
                self._handle_create(endpoint, auth_token, values)
            elif message.key == "list_alarm_request":
                self._handle_list(endpoint, auth_token, values)
            elif message.key == "delete_alarm_request":
                self._handle_delete(endpoint, auth_token, values)
            elif message.key == "acknowledge_alarm":
                self._handle_acknowledge(endpoint, auth_token, values)
            elif message.key == "update_alarm_request":
                self._handle_update(endpoint, auth_token, values)
            else:
                log.debug("Unknown key, no action will be performed")

        return

    def _handle_create(self, endpoint, auth_token, values):
        """Configure an alarm and publish the create response."""
        alarm_details = values['alarm_create_request']

        alarm_id, alarm_status = self.configure_alarm(
            endpoint, auth_token, alarm_details)

        # Generate a valid response message, send via producer
        try:
            resp_message = self._response.generate_response(
                'create_alarm_response', status=alarm_status,
                alarm_id=alarm_id,
                cor_id=alarm_details['correlation_id'])
            # The key used to be misspelled 'create_alarm_resonse', unlike
            # the keys of every other response type.
            self._producer.create_alarm_response(
                'create_alarm_response', resp_message,
                self._RESPONSE_TOPIC)
        except Exception as exc:
            log.warning("Response creation failed: %s", exc)

    def _handle_list(self, endpoint, auth_token, values):
        """List alarms for a request and publish the list response."""
        list_details = values['alarm_list_request']

        # The first filter present in the request wins, in this order:
        # alarm_name, resource_uuid, severity. Presence is tested by key so
        # that a failing HTTP call is not mistaken for a missing filter.
        filters = {}
        for key, kwarg in (('alarm_name', 'alarm_name'),
                           ('resource_uuid', 'resource_id'),
                           ('severity', 'severity')):
            if key in list_details:
                filters[kwarg] = list_details[key]
                break
        if not filters:
            log.warning("No severity specified for list: "
                        "will return full list.")

        alarm_list = self.list_alarms(endpoint, auth_token, **filters)

        # Generate and send a list response back
        try:
            resp_message = self._response.generate_response(
                'list_alarm_response', alarm_list=alarm_list,
                cor_id=list_details['correlation_id'])
            self._producer.list_alarm_response(
                'list_alarm_response', resp_message,
                self._RESPONSE_TOPIC)
        except Exception as exc:
            log.warning("Failed to send a valid response back: %s", exc)

    def _handle_delete(self, endpoint, auth_token, values):
        """Delete an alarm and publish the delete response."""
        request_details = values['alarm_delete_request']
        alarm_id = request_details['alarm_uuid']

        resp_status = self.delete_alarm(endpoint, auth_token, alarm_id)

        # Generate and send a response message
        try:
            resp_message = self._response.generate_response(
                'delete_alarm_response', alarm_id=alarm_id,
                status=resp_status,
                cor_id=request_details['correlation_id'])
            self._producer.delete_alarm_response(
                'delete_alarm_response', resp_message,
                self._RESPONSE_TOPIC)
        except Exception as exc:
            log.warning("Failed to create delete reponse:%s", exc)

    def _handle_acknowledge(self, endpoint, auth_token, values):
        """Reset an alarm that the SO reports as dealt with."""
        alarm_id = values['ack_details']['alarm_uuid']

        response = self.update_alarm_state(endpoint, auth_token, alarm_id)

        # Log if an alarm was reset
        if response:
            log.info("Acknowledged the alarm and cleared it.")
        else:
            log.warning("Failed to acknowledge/clear the alarm.")

    def _handle_update(self, endpoint, auth_token, values):
        """Update an alarm configuration and publish the update response."""
        alarm_details = values['alarm_update_request']

        alarm_id, status = self.update_alarm(
            endpoint, auth_token, alarm_details)

        # Generate a response for an update request
        try:
            resp_message = self._response.generate_response(
                'update_alarm_response', alarm_id=alarm_id,
                cor_id=alarm_details['correlation_id'],
                status=status)
            self._producer.update_alarm_response(
                'update_alarm_response', resp_message,
                self._RESPONSE_TOPIC)
        except Exception as exc:
            log.warning("Failed to send an update response:%s", exc)

    def configure_alarm(self, endpoint, auth_token, values):
        """Create requested alarm in Aodh.

        :param endpoint: base URL of the alarming service
        :param auth_token: token passed on to the request helper
        :param values: alarm description; ``alarm_name``, ``metric_name``
            and ``resource_uuid`` are read here, the remaining keys by
            :meth:`check_payload`
        :returns: ``(alarm_id, True)`` on success, ``(None, False)``
            otherwise
        """
        url = "{}/v2/alarms/".format(endpoint)

        # Check if the desired alarm is supported. A missing name is
        # treated like an unsupported one instead of raising KeyError out
        # of the consumer loop.
        alarm_name = values.get('alarm_name')
        if alarm_name not in ALARM_NAMES:
            log.warning("This alarm is not supported, by a valid metric.")
            return None, False

        try:
            metric_name = values['metric_name']
            resource_id = values['resource_uuid']
            # Check the payload for the desired alarm
            payload = self.check_payload(values, metric_name, resource_id,
                                         alarm_name)
            if payload is None:
                # check_payload already logged the reason; do not post an
                # empty body (same guard as in update_alarm).
                return None, False

            new_alarm = self._common._perform_request(
                url, auth_token, req_type="post", payload=payload)

            return json.loads(new_alarm.text)['alarm_id'], True
        except Exception as exc:
            log.warning("Alarm creation could not be performed: %s", exc)
        return None, False

    def delete_alarm(self, endpoint, auth_token, alarm_id):
        """Delete an alarm.

        :returns: ``False`` when the request fails or answers with status
            404 (the alarm did not exist), ``True`` otherwise
        """
        # A single format() call: the previous "{}...%s".format(..) % id
        # form broke on endpoints containing a '%' character.
        url = "{}/v2/alarms/{}".format(endpoint, alarm_id)

        try:
            result = self._common._perform_request(
                url, auth_token, req_type="delete")
            # If status code is 404 the alarm did not exist
            return str(result.status_code) != "404"
        except Exception as exc:
            log.warning("Failed to delete alarm: %s because %s.",
                        alarm_id, exc)
        return False

    def list_alarms(self, endpoint, auth_token,
                    alarm_name=None, resource_id=None, severity=None):
        """Generate the requested list of alarms.

        At most one filter is applied; precedence is ``alarm_name``, then
        ``resource_id``, then ``severity``.

        :returns: ``None`` when the request helper returns ``None``; the
            raw response text when no filter is given; otherwise a list
            with ``str(alarm)`` for every alarm whose string form contains
            the filter value
        """
        url = "{}/v2/alarms/".format(endpoint)

        result = self._common._perform_request(
            url, auth_token, req_type="get")
        if result is None:
            return None

        wanted = None
        for candidate in (alarm_name, resource_id, severity):
            if candidate is not None:
                wanted = candidate
                break
        if wanted is None:
            return result.text

        # NOTE(review): this is a substring match on the string form of the
        # whole alarm (kept from the original), so e.g. a severity of "low"
        # also matches an alarm whose name contains "low" - confirm that
        # this is intended.
        alarm_list = []
        for alarm in json.loads(result.text):
            as_text = str(alarm)
            if wanted in as_text:
                alarm_list.append(as_text)
        return alarm_list

    def update_alarm_state(self, endpoint, auth_token, alarm_id):
        """Set the state of an alarm to ok when ack message is received.

        :returns: ``True`` when the request helper did not raise,
            ``False`` otherwise
        """
        url = "{}/v2/alarms/{}/state".format(endpoint, alarm_id)
        payload = json.dumps("ok")

        try:
            self._common._perform_request(
                url, auth_token, req_type="put", payload=payload)
            return True
        except Exception as exc:
            log.warning("Unable to update alarm state: %s", exc)
        return False

    def update_alarm(self, endpoint, auth_token, values):
        """Update the configuration of an existing alarm.

        The current alarm is fetched first; its name, state, metric and
        resource id are reused and combined with the new settings from
        ``values``.

        :returns: ``(alarm_id, True)`` on success, ``(None, False)``
            otherwise
        """
        # Get already existing alarm details
        url = "{}/v2/alarms/{}".format(endpoint, values['alarm_uuid'])

        # Gets current configurations about the alarm
        try:
            result = self._common._perform_request(
                url, auth_token, req_type="get")
            current = json.loads(result.text)
            alarm_name = current['name']
            rule = current['gnocchi_resources_threshold_rule']
            alarm_state = current['state']
            resource_id = rule['resource_id']
            metric_name = rule['metric']
        except Exception as exc:
            log.warning("Failed to retreive existing alarm info: %s. "
                        "Can only update OSM created alarms.", exc)
            return None, False

        # Generates and check payload configuration for alarm update
        payload = self.check_payload(values, metric_name, resource_id,
                                     alarm_name, alarm_state=alarm_state)
        if payload is None:
            return None, False

        # Updates the alarm configurations with the valid payload
        try:
            update_alarm = self._common._perform_request(
                url, auth_token, req_type="put", payload=payload)

            return json.loads(update_alarm.text)['alarm_id'], True
        except Exception as exc:
            log.warning("Alarm update could not be performed: %s", exc)
        return None, False

    def check_payload(self, values, metric_name, resource_id,
                      alarm_name, alarm_state=None):
        """Build the JSON payload for an alarm create/update request.

        :param values: request details; ``severity``, ``statistic``,
            ``threshold_value`` and ``operation`` are read
        :param alarm_state: state to set; overridden with
            ``"insufficient data"`` when the severity is ``INDETERMINATE``
        :returns: the payload as a JSON string, or ``None`` when a
            required key or mapping entry is missing
        """
        try:
            # Check state and severity
            severity = values['severity']
            if severity == "INDETERMINATE":
                alarm_state = "insufficient data"
            if alarm_state is None:
                # NOTE(review): the default assigned here is not visible in
                # the reviewed source; "ok" is assumed - confirm.
                alarm_state = "ok"

            statistic = values['statistic']
            # Try to configure the payload for the update/create request
            # Can only update: threshold, operation, statistic and
            # the severity of the alarm
            rule = {'threshold': values['threshold_value'],
                    'comparison_operator': values['operation'].lower(),
                    'metric': metric_name,
                    'resource_id': resource_id,
                    'resource_type': 'generic',
                    'aggregation_method': STATISTICS[statistic]}
            # NOTE(review): the 'name' entry is reconstructed (alarm_name
            # is otherwise unused by this method) - confirm.
            return json.dumps({'state': alarm_state,
                               'name': alarm_name,
                               'severity': SEVERITIES[severity],
                               'type': 'gnocchi_resources_threshold',
                               'gnocchi_resources_threshold_rule': rule})
        except KeyError as exc:
            log.warning("Alarm is not configured correctly: %s", exc)
        return None

    def authenticate(self, values):
        """Generate an authentication token and endpoint for alarm request.

        The token is requested for ``values['tenant_uuid']``; when that
        fails for any reason a generic authentication is used instead.

        :returns: ``(auth_token, endpoint)``
        """
        try:
            # Check for a tenant_id
            auth_token = self._common._authenticate(
                tenant_id=values['tenant_uuid'])
            endpoint = self._common.get_endpoint("alarming")
        except Exception as exc:
            log.warning("Tenant ID is not specified. Will use a generic "
                        "authentication: %s", exc)
            auth_token = self._common._authenticate()
            endpoint = self._common.get_endpoint("alarming")

        return auth_token, endpoint

    def get_alarm_state(self, endpoint, auth_token, alarm_id):
        """Get the state of the alarm.

        :returns: the decoded JSON body of the state request, or ``None``
            when the request or the decoding fails
        """
        url = "{}/v2/alarms/{}/state".format(endpoint, alarm_id)

        try:
            alarm_state = self._common._perform_request(
                url, auth_token, req_type="get")
            return json.loads(alarm_state.text)
        except Exception as exc:
            log.warning("Failed to get the state of the alarm:%s", exc)
        return None