from kafka import KafkaConsumer
-from plugins.OpenStack.Aodh.aodh_common import Aodh_Common
+from plugins.OpenStack.common import Common
+
+
+SEVERITIES = {
+ "WARNING": "low",
+ "MINOR": "low",
+ "MAJOR": "moderate",
+ "CRITICAL": "critical",
+ "INDETERMINATE": "critical"}
class Alarming(object):
def __init__(self):
"""Create the aodh_receiver instance."""
- self._aodh_common = Aodh_Common()
+ self._common = Common()
+ self.auth_token = None
+ self.endpoint = None
+ self.resp_status = None
+ # TODO(mcgoughh): Remove hardcoded kafkaconsumer
# Initialize a generic consumer object to consume message from the SO
- server = {'server': 'localhost:9092', 'topic': 'alarms'}
+ server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
self._consumer = KafkaConsumer(server['topic'],
- group_id='my-group',
+ group_id='osm_mon',
bootstrap_servers=server['server'])
# TODO(mcgoughh): Initialize a producer to send messages bask to the SO
def alarming(self):
"""Consume info from the message bus to manage alarms."""
- # Generate authentication credentials to access keystone;
- # auth_token, endpoint
- auth_token = self._aodh_common._authenticate()
- endpoint = self._aodh_common.get_endpoint()
-
# Check the alarming functionlity that needs to be performed
for message in self._consumer:
- if message.topic == "alarms":
+
+ values = json.loads(message.value)
+ vim_type = values['vim_type'].lower()
+
+ if vim_type == "openstack":
log.info("Alarm action required: %s" % (message.topic))
- if message.key == "configure_alarm":
+ if message.key == "create_alarm_request":
# Configure/Update an alarm
- alarm_details = json.loads(message.value)
- alarm_id = self.configure_alarm(endpoint,
- auth_token, alarm_details)
- log.info("New alarm created with alarmID: %s", alarm_id)
+ alarm_details = values['alarm_create_request']
+
+ # Generate an auth_token and endpoint
+ auth_token = self._common._authenticate(
+ tenant_id=alarm_details['tenant_uuid'])
+ endpoint = self._common.get_endpoint("alarming")
+
+ alarm_id = self.configure_alarm(
+ endpoint, auth_token, alarm_details)
# TODO(mcgoughh): will send an acknowledge message back on
# the bus via the producer
+ if alarm_id is not None:
+ self.resp_status = True
+ log.debug("A valid alarm was found/created: %s",
+ self.resp_status)
+ else:
+ self.resp_status = False
+ log.debug("Failed to create desired alarm: %s",
+ self.resp_status)
+
+ elif message.key == "list_alarm_request":
+ auth_token = self._common._authenticate()
+ endpoint = self._common.get_endpoint("alarming")
+
+ # List all of the alarms
+ alarm_list = self.list_alarms(endpoint, auth_token)
+
+ # TODO(mcgoughh): send a repsonse back to SO
+ if alarm_list is not None:
+ self.resp_status = True
+ log.info("A list of alarms was generated: %s",
+ alarm_list)
+ else:
+ self.resp_status = False
+ log.warn("Failed to generae an alarm list")
+
+ elif message.key == "delete_alarm_request":
+ # Delete the specified alarm
+ auth_token = self._common._authenticate()
+ endpoint = self._common.get_endpoint("alarming")
+
+ alarm_id = values['alarm_delete_request']['alarm_uuid']
+
+ response = self.delete_alarm(
+ endpoint, auth_token, alarm_id)
+
+ # TODO(mcgoughh): send a response back on the bus
+ if response is True:
+ log.info("Requested alarm has been deleted: %s",
+ alarm_id)
+ else:
+ log.warn("Failed to delete requested alarm.")
+
+ elif message.key == "acknowledge_alarm":
+ # Acknowledge that an alarm has been dealt with by the SO
+ # Set its state to ok
+ auth_token = self._common._authenticate()
+ endpoint = self._common.get_endpoint("alarming")
+
+ alarm_id = values['ack_details']['alarm_uuid']
+
+ response = self.update_alarm_state(
+ endpoint, auth_token, alarm_id)
+
+ if response is True:
+ log.info("Status has been updated for alarm, %s.",
+ alarm_id)
+ else:
+ log.warn("Failed update the state of requested alarm.")
+
+ elif message.key == "update_alarm_request":
+ # Update alarm configurations
+ auth_token = self._common._authenticate()
+ endpoint = self._common.get_endpoint("alarming")
+
+ alarm_details = values['alarm_update_request']
+
+ alarm_id = self.update_alarm(
+ endpoint, auth_token, alarm_details)
+
+ # TODO(mcgoughh): send a response message to the SO
+ if alarm_id is not None:
+ log.info("Alarm configuration was update correctly.")
+ else:
+ log.warn("Unable to update the specified alarm")
else:
- # TODO(mcoughh): Key alternatives are "notify_alarm" and
- # "acknowledge_alarm" will be accomodated later
log.debug("Unknown key, no action will be performed")
else:
log.info("Message topic not relevant to this plugin: %s",
return
- def alarm_check(self, endpoint, auth_token, alarm_name):
+ def get_alarm_id(self, endpoint, auth_token, alarm_name):
"""Get a list of alarms that exist in Aodh."""
+ alarm_id = None
url = "{}/v2/alarms/".format(endpoint)
# TODO(mcgoughh): will query on resource_id once it has been
# implemented need to create the query field when creating
# the alarm
query = OrderedDict([("q.field", 'name'), ("q.op", "eq"),
- ("q.value", str(alarm_name))])
+ ("q.value", alarm_name)])
- result = self._aodh_common._perform_request(
+ result = self._common._perform_request(
url, auth_token, req_type="get", params=query)
try:
return alarm_id
except Exception:
log.debug("Alarm doesn't exist, needs to be created.")
- return None
+ return alarm_id
def configure_alarm(self, endpoint, auth_token, values):
- """Get a list of alarms that exist in Aodh."""
- alarm_id = None
-
- # TODO(mcgoughh): error check the values sent in the messag
- alarm_name = values['name']
+ """Create requested alarm in Aodh."""
+ url = "{}/v2/alarms/".format(endpoint)
- # Check that this alarm doesn't exist already
- alarm_id = self.alarm_check(endpoint, auth_token, alarm_name)
+ alarm_name = values['alarm_name']
+ # Confirm alarm doesn't exist
+ alarm_id = self.get_alarm_id(endpoint, auth_token, alarm_name)
if alarm_id is None:
- url = "{}/v2/alarms/".format(endpoint)
- severity = values['severity']
+ # Try to create the alarm
+ try:
+ metric_name = values['metric_name']
+ resource_id = values['resource_uuid']
+ payload = self.check_payload(values, metric_name, resource_id,
+ alarm_name)
+ new_alarm = self._common._perform_request(
+ url, auth_token, req_type="post", payload=payload)
- # Create a new threshold alarm with a resourceID
- # specified as a query
- rule = {'threshold': values['threshold'],
- 'comparison_operator': 'gt',
- 'metric': values['metric'],
- 'resource_id': values['resource_id'],
- 'resource_type': 'generic',
- 'aggregation_method': 'last', }
- payload = json.dumps({'state': 'alarm',
- 'name': alarm_name,
- 'severity': self.get_severity(severity),
- 'type': 'gnocchi_resources_threshold',
- 'gnocchi_resources_threshold_rule': rule, })
+ return json.loads(new_alarm.text)['alarm_id']
+ except Exception as exc:
+ log.warn("Alarm creation could not be performed: %s", exc)
+ return alarm_id
+ else:
+ log.warn("This alarm already exists. Try an update instead.")
+ return None
+
+ def delete_alarm(self, endpoint, auth_token, alarm_id):
+ """Delete alarm function."""
+ url = "{}/v2/alarms/%s".format(endpoint) % (alarm_id)
+
+ result = False
+ try:
+ self._common._perform_request(url, auth_token, req_type="delete")
+ return True
+ except Exception as exc:
+ log.warn("Failed to delete alarm: %s because %s.", alarm_id, exc)
+ return result
- # Request performed to create alarm
- new_alarm = self._aodh_common._perform_request(
- url, auth_token, req_type="post", payload=payload)
+ def list_alarms(self, endpoint, auth_token,
+ alarm_name=None, resource_id=None, severity=None):
+ """Generate the requested list of alarms."""
+ result = None
+ if (alarm_name and resource_id and severity) is None:
+ # List all alarms
+ url = "{}/v2/alarms/".format(endpoint)
+
+ try:
+ result = self._common._perform_request(
+ url, auth_token, req_type="get")
+ return json.loads(result.text)
+ except Exception as exc:
+ log.warn("Unable to generate alarm list: %s", exc)
- return json.loads(new_alarm.text)['alarm_id']
+ return result
else:
- return alarm_id
+ # TODO(mcgoughh): support more specific lists
+ log.debug("Requested list is unavailable")
- def delete_alarm(self, endpoint, auth_token, alarmID):
- """Delete alarm function."""
- url = "{}/v2/alarms/%s".format(endpoint) % (alarmID)
+ return result
+
+ def update_alarm_state(self, endpoint, auth_token, alarm_id):
+ """Set the state of an alarm to ok when ack message is received."""
+ result = False
+
+ url = "{}/v2/alarms/%s/state".format(endpoint) % alarm_id
+ payload = json.dumps("ok")
+
+ try:
+ result = self._common._perform_request(
+ url, auth_token, req_type="put", payload=payload)
+ return True
+ except Exception as exc:
+ log.warn("Unable to update alarm state: %s", exc)
+ return result
- self._aodh_common._perform_request(url, auth_token, req_type="delete")
+ def update_alarm(self, endpoint, auth_token, values):
+ """Get alarm name for an alarm configuration update."""
+ # Get already existing alarm details
+ url = "{}/v2/alarms/%s".format(endpoint) % values['alarm_uuid']
+
+ try:
+ result = self._common._perform_request(
+ url, auth_token, req_type="get")
+ alarm_name = json.loads(result.text)['name']
+ rule = json.loads(result.text)['gnocchi_resources_threshold_rule']
+ alarm_state = json.loads(result.text)['state']
+ resource_id = rule['resource_id']
+ metric_name = rule['metric']
+ except Exception as exc:
+ log.warn("Failed to retreive existing alarm info: %s.\
+ Can only update OSM created alarms.", exc)
+ return None
+
+ # Genate and check payload configuration for alarm update
+ payload = self.check_payload(values, metric_name, resource_id,
+ alarm_name, alarm_state=alarm_state)
+
+ if payload is not None:
+ try:
+ update_alarm = self._common._perform_request(
+ url, auth_token, req_type="put", payload=payload)
+
+ return json.loads(update_alarm.text)['alarm_id']
+ except Exception as exc:
+ log.warn("Alarm update could not be performed: %s", exc)
+ return None
return None
- def get_severity(self, alarm_severity):
- """Get a normalized severity for Aodh."""
- # This logic can be changed, the other alternative was to have
- # MINOR and MAJOR = "moderate" instead.
- if alarm_severity == "WARNIING":
- aodh_severity = "low"
- elif alarm_severity == "MINOR":
- aodh_severity = "moderate"
- elif (alarm_severity == "MAJOR" or alarm_severity == "CRITICAL"):
- aodh_severity = "critical"
- else:
- aodh_severity = None
- log.warn("Invalid alarm severity configuration")
+ def check_payload(self, values, metric_name, resource_id,
+ alarm_name, alarm_state=None):
+ """Check that the payload is configuration for update/create alarm."""
+ try:
+ # Check state and severity
+ severity = values['severity']
+ if severity == "INDETERMINATE":
+ alarm_state = "insufficient data"
+
+ if alarm_state is None:
+ alarm_state = "ok"
- log.info("Severity has been normalized for Aodh to: %s", aodh_severity)
- return aodh_severity
+ # Try to configure the payload for the update/create request
+ rule = {'threshold': values['threshold_value'],
+ 'comparison_operator': values['operation'].lower(),
+ 'metric': metric_name,
+ 'resource_id': resource_id,
+ 'resource_type': 'generic',
+ 'aggregation_method': values['statistic'].lower()}
+ payload = json.dumps({'state': alarm_state,
+ 'name': alarm_name,
+ 'severity': SEVERITIES[severity],
+ 'type': 'gnocchi_resources_threshold',
+ 'gnocchi_resources_threshold_rule': rule, })
+ return payload
+ except KeyError as exc:
+ log.warn("Alarm is not configured correctly: %s", exc)
+ return None
+++ /dev/null
-"""Common methods for the Aodh Sender/Receiver."""
-
-import logging as log
-
-import requests
-
-from keystoneauth1.identity.v3 import AuthMethod
-
-from keystoneclient.service_catalog import ServiceCatalog
-from keystoneclient.v3 import client
-
-from plugins.OpenStack.settings import Config
-
-
-class Aodh_Common(object):
- """Common calls for Aodh Sender/Receiver."""
-
- def __init__(self):
- """Create the common instance."""
- self._auth_token = None
- self._endpoint = None
- self._ks = None
-
- def _authenticate(self):
- """Authenticate and/or renew the authentication token."""
- if self._auth_token is not None:
- return self._auth_token
-
- try:
- cfg = Config.instance()
- self._ks = client.Client(auth_url=cfg.OS_AUTH_URL,
- username=cfg.OS_USERNAME,
- password=cfg.OS_PASSWORD,
- tenant_name=cfg.OS_TENANT_NAME)
- self._auth_token = self._ks.auth_token
- except Exception as exc:
-
- log.warn("Authentication failed with the following exception: %s",
- exc)
- self._auth_token = None
-
- return self._auth_token
-
- def get_endpoint(self):
- """Get the endpoint for Aodh."""
- try:
- return self._ks.service_catalog.url_for(
- service_type='alarming',
- endpoint_type='internalURL',
- region_name='RegionOne')
- except Exception as exc:
- log.warning("Failed to retreive endpoint for Aodh due to: %s",
- exc)
- return None
-
- @classmethod
- def _perform_request(cls, url, auth_token,
- req_type="get", payload=None, params=None):
- """Perform the POST/PUT/GET/DELETE request."""
- # request headers
- headers = {'X-Auth-Token': auth_token,
- 'Content-type': 'application/json'}
- # perform request and return its result
- response = None
- try:
- if req_type == "put":
- response = requests.put(
- url, data=payload, headers=headers,
- timeout=1)
- elif req_type == "post":
- response = requests.post(
- url, data=payload, headers=headers,
- timeout=1)
- elif req_type == "get":
- response = requests.get(
- url, params=params, headers=headers, timeout=1)
- elif req_type == "delete":
- response = requests.delete(
- url, headers=headers, timeout=1)
- else:
- log.warn("Invalid request type")
-
- except Exception as e:
- log.warn("Exception thrown on request", e)
-
- return response