Updated the Openstack-Aodh-plugin 02/2102/1
authorHelena McGough <helena.mcgough@intel.com>
Tue, 29 Aug 2017 09:52:56 +0000 (09:52 +0000)
committerHelena McGough <helena.mcgough@intel.com>
Tue, 29 Aug 2017 09:52:56 +0000 (09:52 +0000)
 - Can read in "alarms" messages via a consume
 - Checks for and creates new alarms

Change-Id: I13f389677b8ab600d1e6974d0dc47a5cfe74acfc
Signed-off-by: Helena McGough <helena.mcgough@intel.com>
plugins/OpenStack/Aodh/alarming.py
plugins/OpenStack/Aodh/aodh_common.py
plugins/OpenStack/Aodh/plugin_instance.py
plugins/OpenStack/settings.py
plugins/OpenStack/singleton.py

index 3ae31ba..b44d624 100644 (file)
@@ -1,7 +1,13 @@
-"""Send alarm info from Aodh to SO via MON"""
+"""Send alarm info from Aodh to SO via MON."""
 
 import json
-from plugins.Openstack.Aodh.aodh_common import Aodh_Common
+import logging as log
+
+from collections import OrderedDict
+
+from kafka import KafkaConsumer
+
+from plugins.OpenStack.Aodh.aodh_common import Aodh_Common
 
 
 class Alarming(object):
@@ -11,52 +17,123 @@ class Alarming(object):
         """Create the aodh_receiver instance."""
         self._aodh_common = Aodh_Common()
 
+        # Initialize a generic consumer object to consume message from the SO
+        server = {'server': 'localhost:9092', 'topic': 'alarms'}
+        self._consumer = KafkaConsumer(server['topic'],
+                                       group_id='my-group',
+                                       bootstrap_servers=server['server'])
+
+        # TODO(mcgoughh): Initialize a producer to send messages bask to the SO
+
     def alarming(self):
-        """Receive payload from Aodh."""
+        """Consume info from the message bus to manage alarms."""
+        # Generate authentication credentials to access keystone;
+        # auth_token, endpoint
         auth_token = self._aodh_common._authenticate()
         endpoint = self._aodh_common.get_endpoint()
 
-        alarm_list = self._get_alarm_list(endpoint, auth_token)
-        # Confirm communication with Aodh by listing alarms
-        print("Alarm List ", alarm_list.text)
-
-        alarm_id = self._create_alarm(endpoint, auth_token)
-        print(alarm_id)
+        # Check the alarming functionlity that needs to be performed
+        for message in self._consumer:
+            if message.topic == "alarms":
+                log.info("Alarm action required: %s" % (message.topic))
+
+                if message.key == "configure_alarm":
+                    # Configure/Update an alarm
+                    alarm_details = json.loads(message.value)
+                    alarm_id = self.configure_alarm(endpoint,
+                                                    auth_token, alarm_details)
+                    log.info("New alarm created with alarmID: %s", alarm_id)
+
+                    # TODO(mcgoughh): will send an acknowledge message back on
+                    # the bus via the producer
+
+                else:
+                    # TODO(mcoughh): Key alternatives are "notify_alarm" and
+                    # "acknowledge_alarm" will be accomodated later
+                    log.debug("Unknown key, no action will be performed")
+            else:
+                log.info("Message topic not relevant to this plugin: %s",
+                         message.topic)
 
-#        alarm_info = self._get_alarm_info(endpoint,
-#            auth_token, "372af0e2-5c36-4e4d-8ce9-ca92d97d07d0")
-#        print("Alarm info", alarm_info.text)
         return
 
-    def _get_alarm_list(self, endpoint, auth_token):
+    def alarm_check(self, endpoint, auth_token, alarm_name):
         """Get a list of alarms that exist in Aodh."""
         url = "{}/v2/alarms/".format(endpoint)
 
-        alarm_list = self._aodh_common._perform_request(url, auth_token,
-                                                        req_type="get")
-        return alarm_list
+        # TODO(mcgoughh): will query on resource_id once it has been
+        # implemented need to create the query field when creating
+        # the alarm
+        query = OrderedDict([("q.field", 'name'), ("q.op", "eq"),
+                            ("q.value", str(alarm_name))])
 
-    def _get_alarm_info(self, endpoint, auth_token, alarmID):
-        """Get information about a specific alarm from Aodh."""
-        url = "{}/v2/alarms/%s".format(endpoint) % (alarmID)
+        result = self._aodh_common._perform_request(
+            url, auth_token, req_type="get", params=query)
 
-        alarm_details = self._aodh_common._perform_request(url, auth_token,
-                                                           req_type="get")
-        return alarm_details
+        try:
+            alarm_id = json.loads(result.text)[0]['alarm_id']
+            log.info("An existing alarm was found: %s", alarm_id)
+            return alarm_id
+        except Exception:
+            log.debug("Alarm doesn't exist, needs to be created.")
+            return None
 
-    def _create_alarm(self, endpoint, auth_token):
+    def configure_alarm(self, endpoint, auth_token, values):
         """Get a list of alarms that exist in Aodh."""
-        url = "{}/v2/alarms/".format(endpoint)
-
-        rule = {'event_type': "threshold",}
-        payload = json.dumps({'state': 'alarm',
-                              'name': 'my_alarm',
-                              'severity': 'moderate',
-                              'type': 'event',
-                              'event_rule': rule,})
-
-        new_alarm = self._aodh_common._perform_request(url, auth_token,
-                                                        req_type="post", payload=payload)
-        alarm_id =json.loads(new_alarm.text)['alarm_id']
-        return alarm_id
+        alarm_id = None
+
+        # TODO(mcgoughh): error check the values sent in the messag
+        alarm_name = values['name']
+
+        # Check that this alarm doesn't exist already
+        alarm_id = self.alarm_check(endpoint, auth_token, alarm_name)
+
+        if alarm_id is None:
+            url = "{}/v2/alarms/".format(endpoint)
+            severity = values['severity']
+
+            # Create a new threshold alarm with a resourceID
+            # specified as a query
+            rule = {'threshold': values['threshold'],
+                    'comparison_operator': 'gt',
+                    'metric': values['metric'],
+                    'resource_id': values['resource_id'],
+                    'resource_type': 'generic',
+                    'aggregation_method': 'last', }
+            payload = json.dumps({'state': 'alarm',
+                                  'name': alarm_name,
+                                  'severity': self.get_severity(severity),
+                                  'type': 'gnocchi_resources_threshold',
+                                  'gnocchi_resources_threshold_rule': rule, })
+
+            # Request performed to create alarm
+            new_alarm = self._aodh_common._perform_request(
+                url, auth_token, req_type="post", payload=payload)
+
+            return json.loads(new_alarm.text)['alarm_id']
+        else:
+            return alarm_id
+
+    def delete_alarm(self, endpoint, auth_token, alarmID):
+        """Delete alarm function."""
+        url = "{}/v2/alarms/%s".format(endpoint) % (alarmID)
 
+        self._aodh_common._perform_request(url, auth_token, req_type="delete")
+        return None
+
+    def get_severity(self, alarm_severity):
+        """Get a normalized severity for Aodh."""
+        # This logic can be changed, the other alternative was to have
+        # MINOR and MAJOR = "moderate" instead.
+        if alarm_severity == "WARNIING":
+            aodh_severity = "low"
+        elif alarm_severity == "MINOR":
+            aodh_severity = "moderate"
+        elif (alarm_severity == "MAJOR" or alarm_severity == "CRITICAL"):
+            aodh_severity = "critical"
+        else:
+            aodh_severity = None
+            log.warn("Invalid alarm severity configuration")
+
+        log.info("Severity has been normalized for Aodh to: %s", aodh_severity)
+        return aodh_severity
index 28e44fc..ac9dc26 100644 (file)
@@ -1,16 +1,15 @@
 """Common methods for the Aodh Sender/Receiver."""
 
-import threading
-import os
+import logging as log
+
 import requests
 
-from keystoneauth1.identity import v3
 from keystoneauth1.identity.v3 import AuthMethod
-from keystoneauth1 import session
-from keystoneclient.v3 import client
+
 from keystoneclient.service_catalog import ServiceCatalog
+from keystoneclient.v3 import client
 
-from plugins.Openstack.settings import Config
+from plugins.OpenStack.settings import Config
 
 
 class Aodh_Common(object):
@@ -23,8 +22,7 @@ class Aodh_Common(object):
         self._ks = None
 
     def _authenticate(self):
-        """Authenticate and/or renew the authentication token"""
-
+        """Authenticate and/or renew the authentication token."""
         if self._auth_token is not None:
             return self._auth_token
 
@@ -37,26 +35,33 @@ class Aodh_Common(object):
             self._auth_token = self._ks.auth_token
         except Exception as exc:
 
-            # TODO: Log errors
+            log.warn("Authentication failed with the following exception: %s",
+                     exc)
             self._auth_token = None
 
         return self._auth_token
 
     def get_endpoint(self):
-        endpoint = self._ks.service_catalog.url_for(
-            service_type='alarming',
-            endpoint_type='internalURL',
-            region_name='RegionOne')
-        return endpoint
+        """Get the endpoint for Aodh."""
+        try:
+            return self._ks.service_catalog.url_for(
+                service_type='alarming',
+                endpoint_type='internalURL',
+                region_name='RegionOne')
+        except Exception as exc:
+            log.warning("Failed to retreive endpoint for Aodh due to: %s",
+                        exc)
+        return None
 
     @classmethod
-    def _perform_request(cls, url, auth_token, req_type="get", payload=None):
+    def _perform_request(cls, url, auth_token,
+                         req_type="get", payload=None, params=None):
         """Perform the POST/PUT/GET/DELETE request."""
-
         # request headers
         headers = {'X-Auth-Token': auth_token,
                    'Content-type': 'application/json'}
         # perform request and return its result
+        response = None
         try:
             if req_type == "put":
                 response = requests.put(
@@ -68,15 +73,14 @@ class Aodh_Common(object):
                     timeout=1)
             elif req_type == "get":
                 response = requests.get(
-                    url, headers=headers, timeout=1)
+                    url, params=params, headers=headers, timeout=1)
             elif req_type == "delete":
                 response = requests.delete(
                     url, headers=headers, timeout=1)
             else:
-                print("Invalid request type")
+                log.warn("Invalid request type")
 
         except Exception as e:
-            # Log info later
-            print("Exception thrown on request", e)
+            log.warn("Exception thrown on request", e)
 
         return response
index 8096f3f..22db409 100644 (file)
@@ -1,14 +1,9 @@
 """Aodh plugin for the OSM monitoring module."""
 
-import sys
-import logging
+import logging as log
 
-path = "/home/stack/MON"
-if path not in sys.path:
-    sys.path.append(path)
-
-from plugins.Openstack.Aodh.alarming import Alarming
-from plugins.Openstack.settings import Config
+from plugins.OpenStack.Aodh.alarming import Alarming
+from plugins.OpenStack.settings import Config
 
 
 def register_plugin():
@@ -24,15 +19,18 @@ class Plugin(object):
 
     def __init__(self, config):
         """Plugin instance."""
+        log.info("Initialze the plugin instance.")
         self._config = config
         self._alarm = Alarming()
 
     def config(self):
         """Configure plugin."""
+        log.info("Configure the plugin instance.")
         self._config.read_environ()
 
     def alarm(self):
         """Allow alarm info to be received from Aodh."""
+        log.info("Begin alarm functionality.")
         self._alarm.alarming()
 
 register_plugin()
index 4dacef9..fc54b07 100644 (file)
@@ -2,27 +2,30 @@
 
 from __future__ import unicode_literals
 
-from plugins.Openstack.singleton import Singleton
+import logging as log
+import os
 
 from collections import namedtuple
+
+from plugins.Openstack.singleton import Singleton
+
 import six
-import os
 
 
 class BadConfigError(Exception):
-    """Configuration exception"""
+    """Configuration exception."""
+
     pass
 
 
 class CfgParam(namedtuple('CfgParam', ['key', 'default', 'data_type'])):
-    """Configuration parameter definition"""
+    """Configuration parameter definition."""
 
     def value(self, data):
-        """Convert a string to the parameter type"""
-
+        """Convert a string to the parameter type."""
         try:
             return self.data_type(data)
-        except (ValueError, TypeError) as exc:
+        except (ValueError, TypeError):
             raise BadConfigError(
                 'Invalid value "%s" for configuration parameter "%s"' % (
                     data, self.key))
@@ -30,7 +33,7 @@ class CfgParam(namedtuple('CfgParam', ['key', 'default', 'data_type'])):
 
 @Singleton
 class Config(object):
-    """Plugin confguration"""
+    """Plugin confguration."""
 
     _configuration = [
         CfgParam('OS_AUTH_URL', None, six.text_type),
@@ -44,13 +47,12 @@ class Config(object):
     _config_keys = _config_dict.keys()
 
     def __init__(self):
-        """Set the default values"""
+        """Set the default values."""
         for cfg in self._configuration:
             setattr(self, cfg.key, cfg.default)
 
     def read_environ(self):
         """Check the appropriate environment variables and update defaults."""
-
         for key in self._config_keys:
             if (key == "OS_IDENTITY_API_VERSION" or key == "OS_PASSWORD"):
                 val = str(os.environ[key])
@@ -59,5 +61,6 @@ class Config(object):
                 val = str(os.environ[key]) + "/v3"
                 setattr(self, key, val)
             else:
-                # TODO: Log errors and no config updates required
+                # TODO(mcgoughh): Log errors and no config updates required
+                log.warn("Configuration doesn't require updating")
                 return
index 2edc20b..12cd5a9 100644 (file)
@@ -1,14 +1,17 @@
+"""Simple singleton class."""
+
 from __future__ import unicode_literals
 
 
 class Singleton(object):
-    """Simple singleton class"""
+    """Simple singleton class."""
 
     def __init__(self, decorated):
+        """Initialize singleton instance."""
         self._decorated = decorated
 
     def instance(self):
-        """Return singleton instance"""
+        """Return singleton instance."""
         try:
             return self._instance
         except AttributeError: