Merge "Updated Kafka producer. This producer has the following changes:"
[osm/MON.git] / plugins / OpenStack / Aodh / alarming.py
index 3ae31ba..0f4a2da 100644 (file)
@@ -1,7 +1,21 @@
-"""Send alarm info from Aodh to SO via MON"""
+"""Send alarm info from Aodh to SO via MON."""
 
 import json
-from plugins.Openstack.Aodh.aodh_common import Aodh_Common
+import logging as log
+
+from collections import OrderedDict
+
+from kafka import KafkaConsumer
+
+from plugins.OpenStack.common import Common
+
+
+SEVERITIES = {
+    "WARNING": "low",
+    "MINOR": "low",
+    "MAJOR": "moderate",
+    "CRITICAL": "critical",
+    "INDETERMINATE": "critical"}
 
 
 class Alarming(object):
@@ -9,54 +23,283 @@ class Alarming(object):
 
     def __init__(self):
         """Create the aodh_receiver instance."""
-        self._aodh_common = Aodh_Common()
+        self._common = Common()
+        self.auth_token = None
+        self.endpoint = None
+        self.resp_status = None
+
+        # TODO(mcgoughh): Remove hardcoded kafkaconsumer
+        # Initialize a generic consumer object to consume message from the SO
+        server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
+        self._consumer = KafkaConsumer(server['topic'],
+                                       group_id='osm_mon',
+                                       bootstrap_servers=server['server'])
+
+        # TODO(mcgoughh): Initialize a producer to send messages bask to the SO
 
     def alarming(self):
-        """Receive payload from Aodh."""
-        auth_token = self._aodh_common._authenticate()
-        endpoint = self._aodh_common.get_endpoint()
+        """Consume info from the message bus to manage alarms."""
+        # Check the alarming functionlity that needs to be performed
+        for message in self._consumer:
+
+            values = json.loads(message.value)
+            vim_type = values['vim_type'].lower()
+
+            if vim_type == "openstack":
+                log.info("Alarm action required: %s" % (message.topic))
+
+                if message.key == "create_alarm_request":
+                    # Configure/Update an alarm
+                    alarm_details = values['alarm_create_request']
+
+                    # Generate an auth_token and endpoint
+                    auth_token = self._common._authenticate(
+                        tenant_id=alarm_details['tenant_uuid'])
+                    endpoint = self._common.get_endpoint("alarming")
+
+                    alarm_id = self.configure_alarm(
+                        endpoint, auth_token, alarm_details)
+
+                    # TODO(mcgoughh): will send an acknowledge message back on
+                    # the bus via the producer
+                    if alarm_id is not None:
+                        self.resp_status = True
+                        log.debug("A valid alarm was found/created: %s",
+                                  self.resp_status)
+                    else:
+                        self.resp_status = False
+                        log.debug("Failed to create desired alarm: %s",
+                                  self.resp_status)
+
+                elif message.key == "list_alarm_request":
+                    auth_token = self._common._authenticate()
+                    endpoint = self._common.get_endpoint("alarming")
+
+                    # List all of the alarms
+                    alarm_list = self.list_alarms(endpoint, auth_token)
+
+                    # TODO(mcgoughh): send a repsonse back to SO
+                    if alarm_list is not None:
+                        self.resp_status = True
+                        log.info("A list of alarms was generated: %s",
+                                 alarm_list)
+                    else:
+                        self.resp_status = False
+                        log.warn("Failed to generae an alarm list")
+
+                elif message.key == "delete_alarm_request":
+                    # Delete the specified alarm
+                    auth_token = self._common._authenticate()
+                    endpoint = self._common.get_endpoint("alarming")
+
+                    alarm_id = values['alarm_delete_request']['alarm_uuid']
+
+                    response = self.delete_alarm(
+                        endpoint, auth_token, alarm_id)
+
+                    # TODO(mcgoughh): send a response back on the bus
+                    if response is True:
+                        log.info("Requested alarm has been deleted: %s",
+                                 alarm_id)
+                    else:
+                        log.warn("Failed to delete requested alarm.")
+
+                elif message.key == "acknowledge_alarm":
+                    # Acknowledge that an alarm has been dealt with by the SO
+                    # Set its state to ok
+                    auth_token = self._common._authenticate()
+                    endpoint = self._common.get_endpoint("alarming")
+
+                    alarm_id = values['ack_details']['alarm_uuid']
+
+                    response = self.update_alarm_state(
+                        endpoint, auth_token, alarm_id)
+
+                    if response is True:
+                        log.info("Status has been updated for alarm, %s.",
+                                 alarm_id)
+                    else:
+                        log.warn("Failed update the state of requested alarm.")
 
-        alarm_list = self._get_alarm_list(endpoint, auth_token)
-        # Confirm communication with Aodh by listing alarms
-        print("Alarm List ", alarm_list.text)
+                elif message.key == "update_alarm_request":
+                    # Update alarm configurations
+                    auth_token = self._common._authenticate()
+                    endpoint = self._common.get_endpoint("alarming")
 
-        alarm_id = self._create_alarm(endpoint, auth_token)
-        print(alarm_id)
+                    alarm_details = values['alarm_update_request']
+
+                    alarm_id = self.update_alarm(
+                        endpoint, auth_token, alarm_details)
+
+                    # TODO(mcgoughh): send a response message to the SO
+                    if alarm_id is not None:
+                        log.info("Alarm configuration was update correctly.")
+                    else:
+                        log.warn("Unable to update the specified alarm")
+
+                else:
+                    log.debug("Unknown key, no action will be performed")
+            else:
+                log.info("Message topic not relevant to this plugin: %s",
+                         message.topic)
 
-#        alarm_info = self._get_alarm_info(endpoint,
-#            auth_token, "372af0e2-5c36-4e4d-8ce9-ca92d97d07d0")
-#        print("Alarm info", alarm_info.text)
         return
 
-    def _get_alarm_list(self, endpoint, auth_token):
+    def get_alarm_id(self, endpoint, auth_token, alarm_name):
         """Get a list of alarms that exist in Aodh."""
+        alarm_id = None
         url = "{}/v2/alarms/".format(endpoint)
 
-        alarm_list = self._aodh_common._perform_request(url, auth_token,
-                                                        req_type="get")
-        return alarm_list
+        # TODO(mcgoughh): will query on resource_id once it has been
+        # implemented need to create the query field when creating
+        # the alarm
+        query = OrderedDict([("q.field", 'name'), ("q.op", "eq"),
+                            ("q.value", alarm_name)])
 
-    def _get_alarm_info(self, endpoint, auth_token, alarmID):
-        """Get information about a specific alarm from Aodh."""
-        url = "{}/v2/alarms/%s".format(endpoint) % (alarmID)
+        result = self._common._perform_request(
+            url, auth_token, req_type="get", params=query)
 
-        alarm_details = self._aodh_common._perform_request(url, auth_token,
-                                                           req_type="get")
-        return alarm_details
+        try:
+            alarm_id = json.loads(result.text)[0]['alarm_id']
+            log.info("An existing alarm was found: %s", alarm_id)
+            return alarm_id
+        except Exception:
+            log.debug("Alarm doesn't exist, needs to be created.")
+        return alarm_id
 
-    def _create_alarm(self, endpoint, auth_token):
-        """Get a list of alarms that exist in Aodh."""
+    def configure_alarm(self, endpoint, auth_token, values):
+        """Create requested alarm in Aodh."""
         url = "{}/v2/alarms/".format(endpoint)
 
-        rule = {'event_type': "threshold",}
-        payload = json.dumps({'state': 'alarm',
-                              'name': 'my_alarm',
-                              'severity': 'moderate',
-                              'type': 'event',
-                              'event_rule': rule,})
+        alarm_name = values['alarm_name']
 
-        new_alarm = self._aodh_common._perform_request(url, auth_token,
-                                                        req_type="post", payload=payload)
-        alarm_id =json.loads(new_alarm.text)['alarm_id']
-        return alarm_id
+        # Confirm alarm doesn't exist
+        alarm_id = self.get_alarm_id(endpoint, auth_token, alarm_name)
+        if alarm_id is None:
+            # Try to create the alarm
+            try:
+                metric_name = values['metric_name']
+                resource_id = values['resource_uuid']
+                payload = self.check_payload(values, metric_name, resource_id,
+                                             alarm_name)
+                new_alarm = self._common._perform_request(
+                    url, auth_token, req_type="post", payload=payload)
+
+                return json.loads(new_alarm.text)['alarm_id']
+            except Exception as exc:
+                log.warn("Alarm creation could not be performed: %s", exc)
+                return alarm_id
+        else:
+            log.warn("This alarm already exists. Try an update instead.")
+        return None
+
+    def delete_alarm(self, endpoint, auth_token, alarm_id):
+        """Delete alarm function."""
+        url = "{}/v2/alarms/%s".format(endpoint) % (alarm_id)
+
+        result = False
+        try:
+            self._common._perform_request(url, auth_token, req_type="delete")
+            return True
+        except Exception as exc:
+            log.warn("Failed to delete alarm: %s because %s.", alarm_id, exc)
+        return result
+
+    def list_alarms(self, endpoint, auth_token,
+                    alarm_name=None, resource_id=None, severity=None):
+        """Generate the requested list of alarms."""
+        result = None
+        if (alarm_name and resource_id and severity) is None:
+            # List all alarms
+            url = "{}/v2/alarms/".format(endpoint)
+
+            try:
+                result = self._common._perform_request(
+                    url, auth_token, req_type="get")
+                return json.loads(result.text)
+            except Exception as exc:
+                log.warn("Unable to generate alarm list: %s", exc)
+
+            return result
+        else:
+            # TODO(mcgoughh): support more specific lists
+            log.debug("Requested list is unavailable")
+
+        return result
+
+    def update_alarm_state(self, endpoint, auth_token, alarm_id):
+        """Set the state of an alarm to ok when ack message is received."""
+        result = False
+
+        url = "{}/v2/alarms/%s/state".format(endpoint) % alarm_id
+        payload = json.dumps("ok")
+
+        try:
+            result = self._common._perform_request(
+                url, auth_token, req_type="put", payload=payload)
+            return True
+        except Exception as exc:
+            log.warn("Unable to update alarm state: %s", exc)
+        return result
+
+    def update_alarm(self, endpoint, auth_token, values):
+        """Get alarm name for an alarm configuration update."""
+        # Get already existing alarm details
+        url = "{}/v2/alarms/%s".format(endpoint) % values['alarm_uuid']
+
+        try:
+            result = self._common._perform_request(
+                url, auth_token, req_type="get")
+            alarm_name = json.loads(result.text)['name']
+            rule = json.loads(result.text)['gnocchi_resources_threshold_rule']
+            alarm_state = json.loads(result.text)['state']
+            resource_id = rule['resource_id']
+            metric_name = rule['metric']
+        except Exception as exc:
+            log.warn("Failed to retreive existing alarm info: %s.\
+                     Can only update OSM created alarms.", exc)
+            return None
+
+        # Genate and check payload configuration for alarm update
+        payload = self.check_payload(values, metric_name, resource_id,
+                                     alarm_name, alarm_state=alarm_state)
+
+        if payload is not None:
+            try:
+                update_alarm = self._common._perform_request(
+                    url, auth_token, req_type="put", payload=payload)
+
+                return json.loads(update_alarm.text)['alarm_id']
+            except Exception as exc:
+                log.warn("Alarm update could not be performed: %s", exc)
+                return None
+        return None
+
+    def check_payload(self, values, metric_name, resource_id,
+                      alarm_name, alarm_state=None):
+        """Check that the payload is configuration for update/create alarm."""
+        try:
+            # Check state and severity
+            severity = values['severity']
+            if severity == "INDETERMINATE":
+                alarm_state = "insufficient data"
+
+            if alarm_state is None:
+                alarm_state = "ok"
 
+            # Try to configure the payload for the update/create request
+            rule = {'threshold': values['threshold_value'],
+                    'comparison_operator': values['operation'].lower(),
+                    'metric': metric_name,
+                    'resource_id': resource_id,
+                    'resource_type': 'generic',
+                    'aggregation_method': values['statistic'].lower()}
+            payload = json.dumps({'state': alarm_state,
+                                  'name': alarm_name,
+                                  'severity': SEVERITIES[severity],
+                                  'type': 'gnocchi_resources_threshold',
+                                  'gnocchi_resources_threshold_rule': rule, })
+            return payload
+        except KeyError as exc:
+            log.warn("Alarm is not configured correctly: %s", exc)
+        return None