1 """Send alarm info from Aodh to SO via MON."""
6 from collections
import OrderedDict
8 from kafka
import KafkaConsumer
10 from plugins
.OpenStack
.common
import Common
17 "CRITICAL": "critical",
18 "INDETERMINATE": "critical"}
21 class Alarming(object):
22 """Receives alarm info from Aodh."""
25 """Create the aodh_receiver instance."""
26 self
._common
= Common()
27 self
.auth_token
= None
29 self
.resp_status
= None
31 # TODO(mcgoughh): Remove hardcoded kafkaconsumer
32 # Initialize a generic consumer object to consume message from the SO
33 server
= {'server': 'localhost:9092', 'topic': 'alarm_request'}
34 self
._consumer
= KafkaConsumer(server
['topic'],
36 bootstrap_servers
=server
['server'])
38 # TODO(mcgoughh): Initialize a producer to send messages bask to the SO
41 """Consume info from the message bus to manage alarms."""
42 # Check the alarming functionlity that needs to be performed
43 for message
in self
._consumer
:
45 values
= json
.loads(message
.value
)
46 vim_type
= values
['vim_type'].lower()
48 if vim_type
== "openstack":
49 log
.info("Alarm action required: %s" % (message
.topic
))
51 if message
.key
== "create_alarm_request":
52 # Configure/Update an alarm
53 alarm_details
= values
['alarm_create_request']
55 # Generate an auth_token and endpoint
56 auth_token
= self
._common
._authenticate
(
57 tenant_id
=alarm_details
['tenant_uuid'])
58 endpoint
= self
._common
.get_endpoint("alarming")
60 alarm_id
= self
.configure_alarm(
61 endpoint
, auth_token
, alarm_details
)
63 # TODO(mcgoughh): will send an acknowledge message back on
64 # the bus via the producer
65 if alarm_id
is not None:
66 self
.resp_status
= True
67 log
.debug("A valid alarm was found/created: %s",
70 self
.resp_status
= False
71 log
.debug("Failed to create desired alarm: %s",
74 elif message
.key
== "list_alarm_request":
75 auth_token
= self
._common
._authenticate
()
76 endpoint
= self
._common
.get_endpoint("alarming")
78 # List all of the alarms
79 alarm_list
= self
.list_alarms(endpoint
, auth_token
)
81 # TODO(mcgoughh): send a repsonse back to SO
82 if alarm_list
is not None:
83 self
.resp_status
= True
84 log
.info("A list of alarms was generated: %s",
87 self
.resp_status
= False
88 log
.warn("Failed to generae an alarm list")
90 elif message
.key
== "delete_alarm_request":
91 # Delete the specified alarm
92 auth_token
= self
._common
._authenticate
()
93 endpoint
= self
._common
.get_endpoint("alarming")
95 alarm_id
= values
['alarm_delete_request']['alarm_uuid']
97 response
= self
.delete_alarm(
98 endpoint
, auth_token
, alarm_id
)
100 # TODO(mcgoughh): send a response back on the bus
102 log
.info("Requested alarm has been deleted: %s",
105 log
.warn("Failed to delete requested alarm.")
107 elif message
.key
== "acknowledge_alarm":
108 # Acknowledge that an alarm has been dealt with by the SO
109 # Set its state to ok
110 auth_token
= self
._common
._authenticate
()
111 endpoint
= self
._common
.get_endpoint("alarming")
113 alarm_id
= values
['ack_details']['alarm_uuid']
115 response
= self
.update_alarm_state(
116 endpoint
, auth_token
, alarm_id
)
119 log
.info("Status has been updated for alarm, %s.",
122 log
.warn("Failed update the state of requested alarm.")
124 elif message
.key
== "update_alarm_request":
125 # Update alarm configurations
126 auth_token
= self
._common
._authenticate
()
127 endpoint
= self
._common
.get_endpoint("alarming")
129 alarm_details
= values
['alarm_update_request']
131 alarm_id
= self
.update_alarm(
132 endpoint
, auth_token
, alarm_details
)
134 # TODO(mcgoughh): send a response message to the SO
135 if alarm_id
is not None:
136 log
.info("Alarm configuration was update correctly.")
138 log
.warn("Unable to update the specified alarm")
141 log
.debug("Unknown key, no action will be performed")
143 log
.info("Message topic not relevant to this plugin: %s",
148 def get_alarm_id(self
, endpoint
, auth_token
, alarm_name
):
149 """Get a list of alarms that exist in Aodh."""
151 url
= "{}/v2/alarms/".format(endpoint
)
153 # TODO(mcgoughh): will query on resource_id once it has been
154 # implemented need to create the query field when creating
156 query
= OrderedDict([("q.field", 'name'), ("q.op", "eq"),
157 ("q.value", alarm_name
)])
159 result
= self
._common
._perform
_request
(
160 url
, auth_token
, req_type
="get", params
=query
)
163 alarm_id
= json
.loads(result
.text
)[0]['alarm_id']
164 log
.info("An existing alarm was found: %s", alarm_id
)
167 log
.debug("Alarm doesn't exist, needs to be created.")
170 def configure_alarm(self
, endpoint
, auth_token
, values
):
171 """Create requested alarm in Aodh."""
172 url
= "{}/v2/alarms/".format(endpoint
)
174 alarm_name
= values
['alarm_name']
176 # Confirm alarm doesn't exist
177 alarm_id
= self
.get_alarm_id(endpoint
, auth_token
, alarm_name
)
179 # Try to create the alarm
181 metric_name
= values
['metric_name']
182 resource_id
= values
['resource_uuid']
183 payload
= self
.check_payload(values
, metric_name
, resource_id
,
185 new_alarm
= self
._common
._perform
_request
(
186 url
, auth_token
, req_type
="post", payload
=payload
)
188 return json
.loads(new_alarm
.text
)['alarm_id']
189 except Exception as exc
:
190 log
.warn("Alarm creation could not be performed: %s", exc
)
193 log
.warn("This alarm already exists. Try an update instead.")
196 def delete_alarm(self
, endpoint
, auth_token
, alarm_id
):
197 """Delete alarm function."""
198 url
= "{}/v2/alarms/%s".format(endpoint
) % (alarm_id
)
202 self
._common
._perform
_request
(url
, auth_token
, req_type
="delete")
204 except Exception as exc
:
205 log
.warn("Failed to delete alarm: %s because %s.", alarm_id
, exc
)
208 def list_alarms(self
, endpoint
, auth_token
,
209 alarm_name
=None, resource_id
=None, severity
=None):
210 """Generate the requested list of alarms."""
212 if (alarm_name
and resource_id
and severity
) is None:
214 url
= "{}/v2/alarms/".format(endpoint
)
217 result
= self
._common
._perform
_request
(
218 url
, auth_token
, req_type
="get")
219 return json
.loads(result
.text
)
220 except Exception as exc
:
221 log
.warn("Unable to generate alarm list: %s", exc
)
225 # TODO(mcgoughh): support more specific lists
226 log
.debug("Requested list is unavailable")
230 def update_alarm_state(self
, endpoint
, auth_token
, alarm_id
):
231 """Set the state of an alarm to ok when ack message is received."""
234 url
= "{}/v2/alarms/%s/state".format(endpoint
) % alarm_id
235 payload
= json
.dumps("ok")
238 result
= self
._common
._perform
_request
(
239 url
, auth_token
, req_type
="put", payload
=payload
)
241 except Exception as exc
:
242 log
.warn("Unable to update alarm state: %s", exc
)
245 def update_alarm(self
, endpoint
, auth_token
, values
):
246 """Get alarm name for an alarm configuration update."""
247 # Get already existing alarm details
248 url
= "{}/v2/alarms/%s".format(endpoint
) % values
['alarm_uuid']
251 result
= self
._common
._perform
_request
(
252 url
, auth_token
, req_type
="get")
253 alarm_name
= json
.loads(result
.text
)['name']
254 rule
= json
.loads(result
.text
)['gnocchi_resources_threshold_rule']
255 alarm_state
= json
.loads(result
.text
)['state']
256 resource_id
= rule
['resource_id']
257 metric_name
= rule
['metric']
258 except Exception as exc
:
259 log
.warn("Failed to retreive existing alarm info: %s.\
260 Can only update OSM created alarms.", exc
)
263 # Genate and check payload configuration for alarm update
264 payload
= self
.check_payload(values
, metric_name
, resource_id
,
265 alarm_name
, alarm_state
=alarm_state
)
267 if payload
is not None:
269 update_alarm
= self
._common
._perform
_request
(
270 url
, auth_token
, req_type
="put", payload
=payload
)
272 return json
.loads(update_alarm
.text
)['alarm_id']
273 except Exception as exc
:
274 log
.warn("Alarm update could not be performed: %s", exc
)
278 def check_payload(self
, values
, metric_name
, resource_id
,
279 alarm_name
, alarm_state
=None):
280 """Check that the payload is configuration for update/create alarm."""
282 # Check state and severity
283 severity
= values
['severity']
284 if severity
== "INDETERMINATE":
285 alarm_state
= "insufficient data"
287 if alarm_state
is None:
290 # Try to configure the payload for the update/create request
291 rule
= {'threshold': values
['threshold_value'],
292 'comparison_operator': values
['operation'].lower(),
293 'metric': metric_name
,
294 'resource_id': resource_id
,
295 'resource_type': 'generic',
296 'aggregation_method': values
['statistic'].lower()}
297 payload
= json
.dumps({'state': alarm_state
,
299 'severity': SEVERITIES
[severity
],
300 'type': 'gnocchi_resources_threshold',
301 'gnocchi_resources_threshold_rule': rule
, })
303 except KeyError as exc
:
304 log
.warn("Alarm is not configured correctly: %s", exc
)