Updated the OSM Aodh plugin to align with the design doc 33/2133/1
author Helena McGough <helena.mcgough@intel.com>
Thu, 7 Sep 2017 13:14:30 +0000 (13:14 +0000)
committer Helena McGough <helena.mcgough@intel.com>
Thu, 7 Sep 2017 16:44:09 +0000 (16:44 +0000)
 - Included update/delete/acknowledge/create alarm functionality
 - Included basic listing alarms functionality

Change-Id: I8e11eea52a1bcb839d36e20a8aecdab4aa833728
Signed-off-by: Helena McGough <helena.mcgough@intel.com>
plugins/OpenStack/Aodh/alarming.py
plugins/OpenStack/Aodh/aodh_common.py [deleted file]
plugins/OpenStack/Aodh/plugin_instance.py
plugins/OpenStack/common.py
plugins/OpenStack/settings.py

diff --git a/plugins/OpenStack/Aodh/alarming.py b/plugins/OpenStack/Aodh/alarming.py
index b44d624..0f4a2da 100644 (file)
@@ -7,7 +7,15 @@ from collections import OrderedDict
 
 from kafka import KafkaConsumer
 
-from plugins.OpenStack.Aodh.aodh_common import Aodh_Common
+from plugins.OpenStack.common import Common
+
+
+SEVERITIES = {
+    "WARNING": "low",
+    "MINOR": "low",
+    "MAJOR": "moderate",
+    "CRITICAL": "critical",
+    "INDETERMINATE": "critical"}
 
 
 class Alarming(object):
@@ -15,41 +23,121 @@ class Alarming(object):
 
     def __init__(self):
         """Create the aodh_receiver instance."""
-        self._aodh_common = Aodh_Common()
+        self._common = Common()
+        self.auth_token = None
+        self.endpoint = None
+        self.resp_status = None
 
+        # TODO(mcgoughh): Remove hardcoded kafkaconsumer
         # Initialize a generic consumer object to consume message from the SO
-        server = {'server': 'localhost:9092', 'topic': 'alarms'}
+        server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
         self._consumer = KafkaConsumer(server['topic'],
-                                       group_id='my-group',
+                                       group_id='osm_mon',
                                        bootstrap_servers=server['server'])
 
         # TODO(mcgoughh): Initialize a producer to send messages bask to the SO
 
     def alarming(self):
         """Consume info from the message bus to manage alarms."""
-        # Generate authentication credentials to access keystone;
-        # auth_token, endpoint
-        auth_token = self._aodh_common._authenticate()
-        endpoint = self._aodh_common.get_endpoint()
-
         # Check the alarming functionlity that needs to be performed
         for message in self._consumer:
-            if message.topic == "alarms":
+
+            values = json.loads(message.value)
+            vim_type = values['vim_type'].lower()
+
+            if vim_type == "openstack":
                 log.info("Alarm action required: %s" % (message.topic))
 
-                if message.key == "configure_alarm":
+                if message.key == "create_alarm_request":
                     # Configure/Update an alarm
-                    alarm_details = json.loads(message.value)
-                    alarm_id = self.configure_alarm(endpoint,
-                                                    auth_token, alarm_details)
-                    log.info("New alarm created with alarmID: %s", alarm_id)
+                    alarm_details = values['alarm_create_request']
+
+                    # Generate an auth_token and endpoint
+                    auth_token = self._common._authenticate(
+                        tenant_id=alarm_details['tenant_uuid'])
+                    endpoint = self._common.get_endpoint("alarming")
+
+                    alarm_id = self.configure_alarm(
+                        endpoint, auth_token, alarm_details)
 
                     # TODO(mcgoughh): will send an acknowledge message back on
                     # the bus via the producer
+                    if alarm_id is not None:
+                        self.resp_status = True
+                        log.debug("A valid alarm was found/created: %s",
+                                  self.resp_status)
+                    else:
+                        self.resp_status = False
+                        log.debug("Failed to create desired alarm: %s",
+                                  self.resp_status)
+
+                elif message.key == "list_alarm_request":
+                    auth_token = self._common._authenticate()
+                    endpoint = self._common.get_endpoint("alarming")
+
+                    # List all of the alarms
+                    alarm_list = self.list_alarms(endpoint, auth_token)
+
+                    # TODO(mcgoughh): send a repsonse back to SO
+                    if alarm_list is not None:
+                        self.resp_status = True
+                        log.info("A list of alarms was generated: %s",
+                                 alarm_list)
+                    else:
+                        self.resp_status = False
+                        log.warn("Failed to generae an alarm list")
+
+                elif message.key == "delete_alarm_request":
+                    # Delete the specified alarm
+                    auth_token = self._common._authenticate()
+                    endpoint = self._common.get_endpoint("alarming")
+
+                    alarm_id = values['alarm_delete_request']['alarm_uuid']
+
+                    response = self.delete_alarm(
+                        endpoint, auth_token, alarm_id)
+
+                    # TODO(mcgoughh): send a response back on the bus
+                    if response is True:
+                        log.info("Requested alarm has been deleted: %s",
+                                 alarm_id)
+                    else:
+                        log.warn("Failed to delete requested alarm.")
+
+                elif message.key == "acknowledge_alarm":
+                    # Acknowledge that an alarm has been dealt with by the SO
+                    # Set its state to ok
+                    auth_token = self._common._authenticate()
+                    endpoint = self._common.get_endpoint("alarming")
+
+                    alarm_id = values['ack_details']['alarm_uuid']
+
+                    response = self.update_alarm_state(
+                        endpoint, auth_token, alarm_id)
+
+                    if response is True:
+                        log.info("Status has been updated for alarm, %s.",
+                                 alarm_id)
+                    else:
+                        log.warn("Failed update the state of requested alarm.")
+
+                elif message.key == "update_alarm_request":
+                    # Update alarm configurations
+                    auth_token = self._common._authenticate()
+                    endpoint = self._common.get_endpoint("alarming")
+
+                    alarm_details = values['alarm_update_request']
+
+                    alarm_id = self.update_alarm(
+                        endpoint, auth_token, alarm_details)
+
+                    # TODO(mcgoughh): send a response message to the SO
+                    if alarm_id is not None:
+                        log.info("Alarm configuration was update correctly.")
+                    else:
+                        log.warn("Unable to update the specified alarm")
 
                 else:
-                    # TODO(mcoughh): Key alternatives are "notify_alarm" and
-                    # "acknowledge_alarm" will be accomodated later
                     log.debug("Unknown key, no action will be performed")
             else:
                 log.info("Message topic not relevant to this plugin: %s",
@@ -57,17 +145,18 @@ class Alarming(object):
 
         return
 
-    def alarm_check(self, endpoint, auth_token, alarm_name):
+    def get_alarm_id(self, endpoint, auth_token, alarm_name):
         """Get a list of alarms that exist in Aodh."""
+        alarm_id = None
         url = "{}/v2/alarms/".format(endpoint)
 
         # TODO(mcgoughh): will query on resource_id once it has been
         # implemented need to create the query field when creating
         # the alarm
         query = OrderedDict([("q.field", 'name'), ("q.op", "eq"),
-                            ("q.value", str(alarm_name))])
+                            ("q.value", alarm_name)])
 
-        result = self._aodh_common._perform_request(
+        result = self._common._perform_request(
             url, auth_token, req_type="get", params=query)
 
         try:
@@ -76,64 +165,141 @@ class Alarming(object):
             return alarm_id
         except Exception:
             log.debug("Alarm doesn't exist, needs to be created.")
-            return None
+        return alarm_id
 
     def configure_alarm(self, endpoint, auth_token, values):
-        """Get a list of alarms that exist in Aodh."""
-        alarm_id = None
-
-        # TODO(mcgoughh): error check the values sent in the messag
-        alarm_name = values['name']
+        """Create requested alarm in Aodh."""
+        url = "{}/v2/alarms/".format(endpoint)
 
-        # Check that this alarm doesn't exist already
-        alarm_id = self.alarm_check(endpoint, auth_token, alarm_name)
+        alarm_name = values['alarm_name']
 
+        # Confirm alarm doesn't exist
+        alarm_id = self.get_alarm_id(endpoint, auth_token, alarm_name)
         if alarm_id is None:
-            url = "{}/v2/alarms/".format(endpoint)
-            severity = values['severity']
+            # Try to create the alarm
+            try:
+                metric_name = values['metric_name']
+                resource_id = values['resource_uuid']
+                payload = self.check_payload(values, metric_name, resource_id,
+                                             alarm_name)
+                new_alarm = self._common._perform_request(
+                    url, auth_token, req_type="post", payload=payload)
 
-            # Create a new threshold alarm with a resourceID
-            # specified as a query
-            rule = {'threshold': values['threshold'],
-                    'comparison_operator': 'gt',
-                    'metric': values['metric'],
-                    'resource_id': values['resource_id'],
-                    'resource_type': 'generic',
-                    'aggregation_method': 'last', }
-            payload = json.dumps({'state': 'alarm',
-                                  'name': alarm_name,
-                                  'severity': self.get_severity(severity),
-                                  'type': 'gnocchi_resources_threshold',
-                                  'gnocchi_resources_threshold_rule': rule, })
+                return json.loads(new_alarm.text)['alarm_id']
+            except Exception as exc:
+                log.warn("Alarm creation could not be performed: %s", exc)
+                return alarm_id
+        else:
+            log.warn("This alarm already exists. Try an update instead.")
+        return None
+
+    def delete_alarm(self, endpoint, auth_token, alarm_id):
+        """Delete alarm function."""
+        url = "{}/v2/alarms/%s".format(endpoint) % (alarm_id)
+
+        result = False
+        try:
+            self._common._perform_request(url, auth_token, req_type="delete")
+            return True
+        except Exception as exc:
+            log.warn("Failed to delete alarm: %s because %s.", alarm_id, exc)
+        return result
 
-            # Request performed to create alarm
-            new_alarm = self._aodh_common._perform_request(
-                url, auth_token, req_type="post", payload=payload)
+    def list_alarms(self, endpoint, auth_token,
+                    alarm_name=None, resource_id=None, severity=None):
+        """Generate the requested list of alarms."""
+        result = None
+        if (alarm_name and resource_id and severity) is None:
+            # List all alarms
+            url = "{}/v2/alarms/".format(endpoint)
+
+            try:
+                result = self._common._perform_request(
+                    url, auth_token, req_type="get")
+                return json.loads(result.text)
+            except Exception as exc:
+                log.warn("Unable to generate alarm list: %s", exc)
 
-            return json.loads(new_alarm.text)['alarm_id']
+            return result
         else:
-            return alarm_id
+            # TODO(mcgoughh): support more specific lists
+            log.debug("Requested list is unavailable")
 
-    def delete_alarm(self, endpoint, auth_token, alarmID):
-        """Delete alarm function."""
-        url = "{}/v2/alarms/%s".format(endpoint) % (alarmID)
+        return result
+
+    def update_alarm_state(self, endpoint, auth_token, alarm_id):
+        """Set the state of an alarm to ok when ack message is received."""
+        result = False
+
+        url = "{}/v2/alarms/%s/state".format(endpoint) % alarm_id
+        payload = json.dumps("ok")
+
+        try:
+            result = self._common._perform_request(
+                url, auth_token, req_type="put", payload=payload)
+            return True
+        except Exception as exc:
+            log.warn("Unable to update alarm state: %s", exc)
+        return result
 
-        self._aodh_common._perform_request(url, auth_token, req_type="delete")
+    def update_alarm(self, endpoint, auth_token, values):
+        """Get alarm name for an alarm configuration update."""
+        # Get already existing alarm details
+        url = "{}/v2/alarms/%s".format(endpoint) % values['alarm_uuid']
+
+        try:
+            result = self._common._perform_request(
+                url, auth_token, req_type="get")
+            alarm_name = json.loads(result.text)['name']
+            rule = json.loads(result.text)['gnocchi_resources_threshold_rule']
+            alarm_state = json.loads(result.text)['state']
+            resource_id = rule['resource_id']
+            metric_name = rule['metric']
+        except Exception as exc:
+            log.warn("Failed to retreive existing alarm info: %s.\
+                     Can only update OSM created alarms.", exc)
+            return None
+
+        # Genate and check payload configuration for alarm update
+        payload = self.check_payload(values, metric_name, resource_id,
+                                     alarm_name, alarm_state=alarm_state)
+
+        if payload is not None:
+            try:
+                update_alarm = self._common._perform_request(
+                    url, auth_token, req_type="put", payload=payload)
+
+                return json.loads(update_alarm.text)['alarm_id']
+            except Exception as exc:
+                log.warn("Alarm update could not be performed: %s", exc)
+                return None
         return None
 
-    def get_severity(self, alarm_severity):
-        """Get a normalized severity for Aodh."""
-        # This logic can be changed, the other alternative was to have
-        # MINOR and MAJOR = "moderate" instead.
-        if alarm_severity == "WARNIING":
-            aodh_severity = "low"
-        elif alarm_severity == "MINOR":
-            aodh_severity = "moderate"
-        elif (alarm_severity == "MAJOR" or alarm_severity == "CRITICAL"):
-            aodh_severity = "critical"
-        else:
-            aodh_severity = None
-            log.warn("Invalid alarm severity configuration")
+    def check_payload(self, values, metric_name, resource_id,
+                      alarm_name, alarm_state=None):
+        """Check that the payload is configuration for update/create alarm."""
+        try:
+            # Check state and severity
+            severity = values['severity']
+            if severity == "INDETERMINATE":
+                alarm_state = "insufficient data"
+
+            if alarm_state is None:
+                alarm_state = "ok"
 
-        log.info("Severity has been normalized for Aodh to: %s", aodh_severity)
-        return aodh_severity
+            # Try to configure the payload for the update/create request
+            rule = {'threshold': values['threshold_value'],
+                    'comparison_operator': values['operation'].lower(),
+                    'metric': metric_name,
+                    'resource_id': resource_id,
+                    'resource_type': 'generic',
+                    'aggregation_method': values['statistic'].lower()}
+            payload = json.dumps({'state': alarm_state,
+                                  'name': alarm_name,
+                                  'severity': SEVERITIES[severity],
+                                  'type': 'gnocchi_resources_threshold',
+                                  'gnocchi_resources_threshold_rule': rule, })
+            return payload
+        except KeyError as exc:
+            log.warn("Alarm is not configured correctly: %s", exc)
+        return None
diff --git a/plugins/OpenStack/Aodh/aodh_common.py b/plugins/OpenStack/Aodh/aodh_common.py
deleted file mode 100644 (file)
index ac9dc26..0000000
+++ /dev/null
@@ -1,86 +0,0 @@
-"""Common methods for the Aodh Sender/Receiver."""
-
-import logging as log
-
-import requests
-
-from keystoneauth1.identity.v3 import AuthMethod
-
-from keystoneclient.service_catalog import ServiceCatalog
-from keystoneclient.v3 import client
-
-from plugins.OpenStack.settings import Config
-
-
-class Aodh_Common(object):
-    """Common calls for Aodh Sender/Receiver."""
-
-    def __init__(self):
-        """Create the common instance."""
-        self._auth_token = None
-        self._endpoint = None
-        self._ks = None
-
-    def _authenticate(self):
-        """Authenticate and/or renew the authentication token."""
-        if self._auth_token is not None:
-            return self._auth_token
-
-        try:
-            cfg = Config.instance()
-            self._ks = client.Client(auth_url=cfg.OS_AUTH_URL,
-                                     username=cfg.OS_USERNAME,
-                                     password=cfg.OS_PASSWORD,
-                                     tenant_name=cfg.OS_TENANT_NAME)
-            self._auth_token = self._ks.auth_token
-        except Exception as exc:
-
-            log.warn("Authentication failed with the following exception: %s",
-                     exc)
-            self._auth_token = None
-
-        return self._auth_token
-
-    def get_endpoint(self):
-        """Get the endpoint for Aodh."""
-        try:
-            return self._ks.service_catalog.url_for(
-                service_type='alarming',
-                endpoint_type='internalURL',
-                region_name='RegionOne')
-        except Exception as exc:
-            log.warning("Failed to retreive endpoint for Aodh due to: %s",
-                        exc)
-        return None
-
-    @classmethod
-    def _perform_request(cls, url, auth_token,
-                         req_type="get", payload=None, params=None):
-        """Perform the POST/PUT/GET/DELETE request."""
-        # request headers
-        headers = {'X-Auth-Token': auth_token,
-                   'Content-type': 'application/json'}
-        # perform request and return its result
-        response = None
-        try:
-            if req_type == "put":
-                response = requests.put(
-                    url, data=payload, headers=headers,
-                    timeout=1)
-            elif req_type == "post":
-                response = requests.post(
-                    url, data=payload, headers=headers,
-                    timeout=1)
-            elif req_type == "get":
-                response = requests.get(
-                    url, params=params, headers=headers, timeout=1)
-            elif req_type == "delete":
-                response = requests.delete(
-                    url, headers=headers, timeout=1)
-            else:
-                log.warn("Invalid request type")
-
-        except Exception as e:
-            log.warn("Exception thrown on request", e)
-
-        return response
diff --git a/plugins/OpenStack/Aodh/plugin_instance.py b/plugins/OpenStack/Aodh/plugin_instance.py
index 22db409..364a12e 100644 (file)
@@ -1,6 +1,11 @@
 """Aodh plugin for the OSM monitoring module."""
 
 import logging as log
+#import sys
+
+#path = "/home/stack/MON"
+#if path not in sys.path:
+#    sys.path.append(path)
 
 from plugins.OpenStack.Aodh.alarming import Alarming
 from plugins.OpenStack.settings import Config
@@ -26,7 +31,7 @@ class Plugin(object):
     def config(self):
         """Configure plugin."""
         log.info("Configure the plugin instance.")
-        self._config.read_environ()
+        self._config.read_environ("aodh")
 
     def alarm(self):
         """Allow alarm info to be received from Aodh."""
diff --git a/plugins/OpenStack/common.py b/plugins/OpenStack/common.py
index d706456..68ce4e6 100644 (file)
@@ -21,7 +21,7 @@ class Common(object):
         self._endpoint = None
         self._ks = None
 
-    def _authenticate(self, tenant_id):
+    def _authenticate(self, tenant_id=None):
         """Authenticate and/or renew the authentication token."""
         if self._auth_token is not None:
             return self._auth_token
diff --git a/plugins/OpenStack/settings.py b/plugins/OpenStack/settings.py
index fc54b07..45620d9 100644 (file)
@@ -7,7 +7,7 @@ import os
 
 from collections import namedtuple
 
-from plugins.Openstack.singleton import Singleton
+from plugins.OpenStack.singleton import Singleton
 
 import six
 
@@ -38,7 +38,7 @@ class Config(object):
     _configuration = [
         CfgParam('OS_AUTH_URL', None, six.text_type),
         CfgParam('OS_IDENTITY_API_VERSION', "3", six.text_type),
-        CfgParam('OS_USERNAME', "aodh", six.text_type),
+        CfgParam('OS_USERNAME', None, six.text_type),
         CfgParam('OS_PASSWORD', "password", six.text_type),
         CfgParam('OS_TENANT_NAME', "service", six.text_type),
     ]
@@ -51,9 +51,11 @@ class Config(object):
         for cfg in self._configuration:
             setattr(self, cfg.key, cfg.default)
 
-    def read_environ(self):
+    def read_environ(self, service):
         """Check the appropriate environment variables and update defaults."""
         for key in self._config_keys:
+            # Default username for a service is it's name
+            setattr(self, 'OS_USERNAME', service)
             if (key == "OS_IDENTITY_API_VERSION" or key == "OS_PASSWORD"):
                 val = str(os.environ[key])
                 setattr(self, key, val)