1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
22 """Carry out alarming requests via Aodh API."""
28 from osm_mon
.core
.message_bus
.producer
import KafkaProducer
30 from osm_mon
.plugins
.OpenStack
.response
import OpenStack_Response
31 from osm_mon
.plugins
.OpenStack
.settings
import Config
32 from osm_mon
.plugins
.OpenStack
.Gnocchi
.metrics
import Metrics
# Module-level logger shared by everything in this file.
log = logging.getLogger(__name__)

# Supported alarm name -> the metric name a request must pair it with.
# Alarm creation rejects any request whose alarm/metric pair is not listed.
ALARM_NAMES = {
    "average_memory_usage_above_threshold": "average_memory_utilization",
    "disk_read_ops": "disk_read_ops",
    "disk_write_ops": "disk_write_ops",
    "disk_read_bytes": "disk_read_bytes",
    "disk_write_bytes": "disk_write_bytes",
    "net_packets_dropped": "packets_dropped",
    "packets_in_above_threshold": "packets_received",
    "packets_out_above_threshold": "packets_sent",
    "cpu_utilization_above_threshold": "cpu_utilization",
}

# Metric name used in requests -> metric name looked up in Gnocchi and
# written into the alarm rule. Several request names share one Gnocchi
# metric (e.g. both disk_*_ops entries), so the mapping is not invertible
# one-to-one.
METRIC_MAPPINGS = {
    "average_memory_utilization": "memory.percent",
    "disk_read_ops": "disk.disk_ops",
    "disk_write_ops": "disk.disk_ops",
    "disk_read_bytes": "disk.read.bytes",
    "disk_write_bytes": "disk.write.bytes",
    "packets_dropped": "interface.if_dropped",
    "packets_received": "interface.if_packets",
    "packets_sent": "interface.if_packets",
    "cpu_utilization": "cpu_util",
}
63 "critical": "critical",
64 "indeterminate": "critical"}
class Alarming(object):
    """Carries out alarming requests and responses via Aodh API.

    Requests arrive as message-bus records (see ``alarming``). Each one is
    turned into Aodh REST calls made through the ``common`` helper handed to
    ``alarming``, and the outcome is published with the Kafka producer
    created in ``__init__``.
    """

    # NOTE(review): the final argument of the producer ``*_response`` calls
    # was not visible in the reviewed text; the producer's own topic name is
    # used here. Confirm against the KafkaProducer method signatures.
    _RESPONSE_TOPIC = 'alarm_response'

    def __init__(self):
        """Create the OpenStack alarming instance."""
        # Initialize configuration and notifications
        config = Config.instance()
        config.read_environ("aodh")

        # Initialise authentication for API requests. The endpoint and the
        # request helper are only known once alarming() has been called.
        self.auth_token = None
        self.endpoint = None
        self.common = None

        # Use the Response class to generate valid json response messages
        self._response = OpenStack_Response()

        # Initialize a producer to send responses back to SO
        self._producer = KafkaProducer("alarm_response")

    def alarming(self, message, common, auth_token):
        """Consume info from the message bus to manage alarms.

        :param message: bus record; ``message.key`` selects the action and
            ``message.value`` holds the JSON-encoded request.
        :param common: helper providing ``_authenticate``, ``get_endpoint``
            and ``_perform_request``; stored on the instance because every
            other method talks to OpenStack through it.
        :param auth_token: token supplied with the request, or ``None`` to
            obtain one from ``common._authenticate()``.
        :returns: ``None``; results are published through the producer.
        """
        values = json.loads(message.value)
        self.common = common

        log.info("OpenStack alarm action required.")

        # Generate an auth_token and endpoint for the request
        if auth_token is None:
            log.info("Using environment variables to set auth_token for Aodh.")
            self.auth_token = self.common._authenticate()
        elif self.auth_token != auth_token:
            log.info("Auth_token for alarming set by access_credentials.")
            self.auth_token = auth_token
        else:
            log.info("Auth_token has not been updated.")

        if self.endpoint is None:
            log.info("Generating a new endpoint for Aodh.")
            self.endpoint = self.common.get_endpoint("alarming")

        handlers = {
            "create_alarm_request": self._handle_create,
            "list_alarm_request": self._handle_list,
            "delete_alarm_request": self._handle_delete,
            "acknowledge_alarm": self._handle_acknowledge,
            "update_alarm_request": self._handle_update,
        }
        handler = handlers.get(message.key)
        if handler is None:
            log.debug("Unknown key, no action will be performed")
            return
        handler(values)

    def _handle_create(self, values):
        """Create an alarm and publish a create_alarm_response."""
        alarm_details = values['alarm_create_request']

        alarm_id, alarm_status = self.configure_alarm(
            self.endpoint, self.auth_token, alarm_details)

        # Generate a valid response message, send via producer
        try:
            if alarm_status is True:
                log.info("Alarm successfully created")

            resp_message = self._response.generate_response(
                'create_alarm_response', status=alarm_status,
                alarm_id=alarm_id,
                cor_id=alarm_details['correlation_id'])
            log.info("Response Message: %s", resp_message)
            self._producer.create_alarm_response(
                'create_alarm_response', resp_message,
                self._RESPONSE_TOPIC)
        except Exception as exc:
            log.warning("Response creation failed: %s", exc)

    def _handle_list(self, values):
        """List alarms and publish a list_alarm_response."""
        # Check for a specified: alarm_name, resource_uuid, severity
        # and generate the appropriate list
        list_details = values['alarm_list_request']

        alarm_list = self.list_alarms(
            self.endpoint, self.auth_token, list_details)

        # Generate and send a list response back
        try:
            resp_message = self._response.generate_response(
                'list_alarm_response', alarm_list=alarm_list,
                cor_id=list_details['correlation_id'])
            log.info("Response Message: %s", resp_message)
            self._producer.list_alarm_response(
                'list_alarm_response', resp_message,
                self._RESPONSE_TOPIC)
        except Exception:
            log.warning("Failed to send a valid response back.")

    def _handle_delete(self, values):
        """Delete an alarm and publish a delete_alarm_response."""
        request_details = values['alarm_delete_request']
        alarm_id = request_details['alarm_uuid']

        resp_status = self.delete_alarm(
            self.endpoint, self.auth_token, alarm_id)

        # Generate and send a response message
        try:
            resp_message = self._response.generate_response(
                'delete_alarm_response', alarm_id=alarm_id,
                status=resp_status,
                cor_id=request_details['correlation_id'])
            log.info("Response message: %s", resp_message)
            self._producer.delete_alarm_response(
                'delete_alarm_response', resp_message,
                self._RESPONSE_TOPIC)
        except Exception as exc:
            log.warning("Failed to create delete reponse:%s", exc)

    def _handle_acknowledge(self, values):
        """Reset an alarm that the SO reports as dealt with."""
        alarm_id = values['ack_details']['alarm_uuid']

        response = self.update_alarm_state(
            self.endpoint, self.auth_token, alarm_id)

        # Log if an alarm was reset
        if response is True:
            log.info("Acknowledged the alarm and cleared it.")
        else:
            log.warning("Failed to acknowledge/clear the alarm.")

    def _handle_update(self, values):
        """Update an alarm's configuration and publish the outcome."""
        alarm_details = values['alarm_update_request']

        alarm_id, status = self.update_alarm(
            self.endpoint, self.auth_token, alarm_details)

        # Generate a response for an update request
        try:
            resp_message = self._response.generate_response(
                'update_alarm_response', alarm_id=alarm_id,
                cor_id=alarm_details['correlation_id'],
                status=status)
            log.info("Response message: %s", resp_message)
            self._producer.update_alarm_response(
                'update_alarm_response', resp_message,
                self._RESPONSE_TOPIC)
        except Exception as exc:
            log.warning("Failed to send an update response:%s", exc)

    def configure_alarm(self, endpoint, auth_token, values):
        """Create requested alarm in Aodh.

        :param endpoint: base URL of the alarming service.
        :param auth_token: token sent with every request.
        :param values: create request; must carry ``alarm_name``,
            ``metric_name`` and ``resource_uuid`` plus the fields read by
            ``check_payload``.
        :returns: ``(alarm_id, True)`` on success, ``(None, False)`` when the
            request is invalid, the metric is missing or the call fails.
        """
        url = "{}/v2/alarms/".format(endpoint)

        # Check if the desired alarm is supported
        try:
            alarm_name = values['alarm_name'].lower()
            metric_name = values['metric_name'].lower()
            resource_id = values['resource_uuid']
        except KeyError as exc:
            # A malformed request is reported as a failed creation instead
            # of escaping to the bus consumer.
            log.warning("Alarm is not configured correctly: %s", exc)
            return None, False

        if alarm_name not in ALARM_NAMES:
            log.warning("This alarm is not supported, by a valid metric.")
            return None, False
        if ALARM_NAMES[alarm_name] != metric_name:
            log.warning("This is not the correct metric for this alarm.")
            return None, False

        # Check for the required metric
        metric_id = self.check_for_metric(auth_token, metric_name, resource_id)
        if metric_id is None:
            log.warning("The required Gnocchi metric does not exist.")
            return None, False

        # Create the alarm if metric is available
        payload = self.check_payload(values, metric_name, resource_id,
                                     alarm_name)
        if payload is None:
            # check_payload has already logged why; never post an empty body.
            return None, False

        try:
            new_alarm = self.common._perform_request(
                url, auth_token, req_type="post", payload=payload)
            return json.loads(new_alarm.text)['alarm_id'], True
        except Exception as exc:
            log.warning("Failed to create the alarm: %s", exc)
        return None, False

    def delete_alarm(self, endpoint, auth_token, alarm_id):
        """Delete alarm function.

        :returns: ``True`` when the delete request completed with a status
            other than 404, ``False`` when the alarm did not exist or the
            request raised.
        """
        # A single format() call keeps a '%' in the endpoint from being
        # treated as a conversion specifier.
        url = "{}/v2/alarms/{}".format(endpoint, alarm_id)

        try:
            result = self.common._perform_request(
                url, auth_token, req_type="delete")
            if str(result.status_code) == "404":
                # If status code is 404 alarm did not exist
                log.info("Alarm doesn't exist: %s", result.status_code)
                return False
            return True
        except Exception as exc:
            log.warning("Failed to delete alarm: %s because %s.",
                        alarm_id, exc)
        return False

    def list_alarms(self, endpoint, auth_token, list_details):
        """Generate the requested list of alarms.

        ``resource_uuid`` is mandatory; ``alarm_name`` and ``severity`` are
        optional filters and are ignored when absent or unsupported.

        :returns: list of ``str(alarm)`` for every matching alarm (empty when
            nothing matches), or ``None`` when ``resource_uuid`` is missing
            or the request/parsing fails.
        """
        url = "{}/v2/alarms/".format(endpoint)

        # TODO(mcgoughh): for now resource_id is a mandatory field
        # Check for a resource id
        try:
            resource = list_details['resource_uuid']
        except KeyError as exc:
            log.warning("Resource id not specified for list request: %s", exc)
            return None

        # Checking what fields are specified for a list request
        try:
            name = list_details['alarm_name'].lower()
            if name not in ALARM_NAMES:
                log.warning("This alarm is not supported, won't be used!")
                name = None
        except KeyError:
            log.info("Alarm name isn't specified.")
            name = None

        try:
            sev = SEVERITIES[list_details['severity'].lower()]
        except KeyError:
            log.info("Severity is unspecified/incorrectly configured")
            sev = None

        # Perform the request to get the desired list
        try:
            result = self.common._perform_request(
                url, auth_token, req_type="get")
            if result is None:
                log.info("There are no alarms!")
                return []

            # Get list based on resource id. Alarms without a Gnocchi
            # resource rule cannot belong to the resource and are skipped
            # instead of failing the whole listing.
            on_resource = []
            for alarm in json.loads(result.text):
                rule = alarm.get('gnocchi_resources_threshold_rule')
                if not rule or 'resource_id' not in rule:
                    continue
                if rule['resource_id'] == resource:
                    on_resource.append(alarm)

            if not on_resource:
                log.info("No alarms for this resource")
                return []

            # Generate specified list if requested
            if name is not None and sev is not None:
                log.info("Return a list of %s alarms with %s severity.",
                         name, sev)
            elif name is not None:
                log.info("Returning a %s list of alarms.", name)
            elif sev is not None:
                log.info("Returning %s severity alarm list.", sev)
            else:
                log.info("Returning an entire list of alarms.")

            return [str(alarm) for alarm in on_resource
                    if (name is None or alarm.get('name') == name) and
                    (sev is None or alarm.get('severity') == sev)]
        except Exception as exc:
            log.info("Failed to generate required list: %s", exc)
        return None

    def update_alarm_state(self, endpoint, auth_token, alarm_id):
        """Set the state of an alarm to ok when ack message is received.

        :returns: ``True`` when the request was issued without raising,
            ``False`` otherwise.
        """
        url = "{}/v2/alarms/{}/state".format(endpoint, alarm_id)
        payload = json.dumps("ok")

        try:
            self.common._perform_request(
                url, auth_token, req_type="put", payload=payload)
            return True
        except Exception as exc:
            log.warning("Unable to update alarm state: %s", exc)
        return False

    def update_alarm(self, endpoint, auth_token, values):
        """Update the configuration of an existing alarm.

        The alarm's current name, state, resource and metric are read back
        from Aodh and combined with the new settings in ``values``.

        :returns: ``(alarm_id, True)`` on success, ``(None, False)`` when the
            alarm cannot be read, the payload is invalid or the update fails.
        """
        # Gets current configurations about the alarm
        try:
            url = "{}/v2/alarms/{}".format(endpoint, values['alarm_uuid'])
            result = self.common._perform_request(
                url, auth_token, req_type="get")
            current = json.loads(result.text)
            alarm_name = current['name']
            rule = current['gnocchi_resources_threshold_rule']
            alarm_state = current['state']
            resource_id = rule['resource_id']
            # The stored rule holds the Gnocchi metric name, while
            # check_payload is keyed by the request-side metric name.
            metric_name = self._metric_name_for(rule['metric'])
        except Exception as exc:
            log.warning("Failed to retreive existing alarm info: %s. "
                        "Can only update OSM alarms.", exc)
            return None, False

        # Generates and check payload configuration for alarm update
        payload = self.check_payload(values, metric_name, resource_id,
                                     alarm_name, alarm_state=alarm_state)
        if payload is None:
            return None, False

        # Updates the alarm configurations with the valid payload
        try:
            updated = self.common._perform_request(
                url, auth_token, req_type="put", payload=payload)
            return json.loads(updated.text)['alarm_id'], True
        except Exception as exc:
            log.warning("Alarm update could not be performed: %s", exc)
        return None, False

    @staticmethod
    def _metric_name_for(gnocchi_metric):
        """Return a METRIC_MAPPINGS key whose value is ``gnocchi_metric``.

        Several keys may share one Gnocchi metric; they all map back to the
        same value, so the first one in sorted order is returned.

        :raises KeyError: when no key maps to ``gnocchi_metric``.
        """
        for candidate in sorted(METRIC_MAPPINGS):
            if METRIC_MAPPINGS[candidate] == gnocchi_metric:
                return candidate
        raise KeyError(gnocchi_metric)

    def check_payload(self, values, metric_name, resource_id,
                      alarm_name, alarm_state=None):
        """Build the JSON payload for an alarm create/update request.

        :param values: request fields; ``severity``, ``statistic``,
            ``granularity``, ``resource_type``, ``threshold_value`` and
            ``operation`` are required.
        :param metric_name: key of ``METRIC_MAPPINGS``.
        :param alarm_state: state to keep; forced to ``"insufficient data"``
            for ``indeterminate`` severity.
        :returns: JSON string, or ``None`` when a required field or mapping
            entry is missing.
        """
        try:
            cfg = Config.instance()
            # Check state and severity
            severity = values['severity'].lower()
            if severity == "indeterminate":
                alarm_state = "insufficient data"
            if alarm_state is None:
                # NOTE(review): the default state was not visible in the
                # reviewed text; "ok" matches what update_alarm_state sets.
                alarm_state = "ok"

            statistic = values['statistic'].lower()
            granularity = values['granularity']
            resource_type = values['resource_type'].lower()

            # Try to configure the payload for the update/create request
            # Can only update: threshold, operation, statistic and
            # the severity of the alarm
            rule = {
                'threshold': values['threshold_value'],
                'comparison_operator': values['operation'].lower(),
                'metric': METRIC_MAPPINGS[metric_name],
                'resource_id': resource_id,
                'resource_type': resource_type,
                'aggregation_method': STATISTICS[statistic],
                'granularity': granularity,
            }
            return json.dumps({
                'state': alarm_state,
                'name': alarm_name,
                'severity': SEVERITIES[severity],
                'type': 'gnocchi_resources_threshold',
                'gnocchi_resources_threshold_rule': rule,
                'alarm_actions': [cfg.OS_NOTIFIER_URI],
            })
        except KeyError as exc:
            log.warning("Alarm is not configured correctly: %s", exc)
        return None

    def get_alarm_state(self, endpoint, auth_token, alarm_id):
        """Get the state of the alarm.

        :returns: the decoded JSON body of the state request, or ``None``
            when the request or decoding fails.
        """
        url = "{}/v2/alarms/{}/state".format(endpoint, alarm_id)

        try:
            alarm_state = self.common._perform_request(
                url, auth_token, req_type="get")
            return json.loads(alarm_state.text)
        except Exception as exc:
            log.warning("Failed to get the state of the alarm:%s", exc)
        return None

    def check_for_metric(self, auth_token, m_name, r_id):
        """Check for the alarm metric.

        Walks the name-sorted metric listing page by page, using the id of
        the last metric on a page as the marker for the next one.

        :param m_name: key of ``METRIC_MAPPINGS``.
        :param r_id: resource the metric must belong to.
        :returns: id of the first matching metric, or ``None`` when there is
            none or the lookup fails.
        """
        try:
            endpoint = self.common.get_endpoint("metric")
            first_page = "{}/v1/metric?sort=name:asc".format(endpoint)
            wanted = METRIC_MAPPINGS[m_name]

            url, marker = first_page, None
            while True:
                result = self.common._perform_request(
                    url, auth_token, req_type="get")
                page = json.loads(result.text)
                if not page:
                    break

                for metric in page:
                    if (metric['name'] == wanted and
                            metric['resource_id'] == r_id):
                        log.info("The required metric exists, "
                                 "an alarm will be created.")
                        return metric['id']

                last_id = page[-1]['id']
                if last_id == marker:
                    # The listing did not advance; stop instead of looping.
                    break
                marker = last_id
                url = "{}&marker={}".format(first_page, marker)
        except Exception as exc:
            log.info("Desired Gnocchi metric not found:%s", exc)
        return None