Updated the Openstack-Aodh-plugin
- Can read in "alarms" messages via a consume
- Checks for and creates new alarms
Change-Id: I13f389677b8ab600d1e6974d0dc47a5cfe74acfc
Signed-off-by: Helena McGough <helena.mcgough@intel.com>
diff --git a/plugins/OpenStack/Aodh/alarming.py b/plugins/OpenStack/Aodh/alarming.py
index 3ae31ba..b44d624 100644
--- a/plugins/OpenStack/Aodh/alarming.py
+++ b/plugins/OpenStack/Aodh/alarming.py
@@ -1,7 +1,13 @@
-"""Send alarm info from Aodh to SO via MON"""
+"""Send alarm info from Aodh to SO via MON."""
import json
-from plugins.Openstack.Aodh.aodh_common import Aodh_Common
+import logging as log
+
+from collections import OrderedDict
+
+from kafka import KafkaConsumer
+
+from plugins.OpenStack.Aodh.aodh_common import Aodh_Common
class Alarming(object):
@@ -11,52 +17,123 @@
"""Create the aodh_receiver instance."""
self._aodh_common = Aodh_Common()
+ # Initialize a generic consumer object to consume message from the SO
+ server = {'server': 'localhost:9092', 'topic': 'alarms'}
+ self._consumer = KafkaConsumer(server['topic'],
+ group_id='my-group',
+ bootstrap_servers=server['server'])
+
+ # TODO(mcgoughh): Initialize a producer to send messages bask to the SO
+
def alarming(self):
- """Receive payload from Aodh."""
+ """Consume info from the message bus to manage alarms."""
+ # Generate authentication credentials to access keystone;
+ # auth_token, endpoint
auth_token = self._aodh_common._authenticate()
endpoint = self._aodh_common.get_endpoint()
- alarm_list = self._get_alarm_list(endpoint, auth_token)
- # Confirm communication with Aodh by listing alarms
- print("Alarm List ", alarm_list.text)
+ # Check the alarming functionlity that needs to be performed
+ for message in self._consumer:
+ if message.topic == "alarms":
+ log.info("Alarm action required: %s" % (message.topic))
- alarm_id = self._create_alarm(endpoint, auth_token)
- print(alarm_id)
+ if message.key == "configure_alarm":
+ # Configure/Update an alarm
+ alarm_details = json.loads(message.value)
+ alarm_id = self.configure_alarm(endpoint,
+ auth_token, alarm_details)
+ log.info("New alarm created with alarmID: %s", alarm_id)
-# alarm_info = self._get_alarm_info(endpoint,
-# auth_token, "372af0e2-5c36-4e4d-8ce9-ca92d97d07d0")
-# print("Alarm info", alarm_info.text)
+ # TODO(mcgoughh): will send an acknowledge message back on
+ # the bus via the producer
+
+ else:
+ # TODO(mcoughh): Key alternatives are "notify_alarm" and
+ # "acknowledge_alarm" will be accomodated later
+ log.debug("Unknown key, no action will be performed")
+ else:
+ log.info("Message topic not relevant to this plugin: %s",
+ message.topic)
+
return
- def _get_alarm_list(self, endpoint, auth_token):
+ def alarm_check(self, endpoint, auth_token, alarm_name):
"""Get a list of alarms that exist in Aodh."""
url = "{}/v2/alarms/".format(endpoint)
- alarm_list = self._aodh_common._perform_request(url, auth_token,
- req_type="get")
- return alarm_list
+ # TODO(mcgoughh): will query on resource_id once it has been
+ # implemented need to create the query field when creating
+ # the alarm
+ query = OrderedDict([("q.field", 'name'), ("q.op", "eq"),
+ ("q.value", str(alarm_name))])
- def _get_alarm_info(self, endpoint, auth_token, alarmID):
- """Get information about a specific alarm from Aodh."""
+ result = self._aodh_common._perform_request(
+ url, auth_token, req_type="get", params=query)
+
+ try:
+ alarm_id = json.loads(result.text)[0]['alarm_id']
+ log.info("An existing alarm was found: %s", alarm_id)
+ return alarm_id
+ except Exception:
+ log.debug("Alarm doesn't exist, needs to be created.")
+ return None
+
+ def configure_alarm(self, endpoint, auth_token, values):
+ """Get a list of alarms that exist in Aodh."""
+ alarm_id = None
+
+ # TODO(mcgoughh): error check the values sent in the messag
+ alarm_name = values['name']
+
+ # Check that this alarm doesn't exist already
+ alarm_id = self.alarm_check(endpoint, auth_token, alarm_name)
+
+ if alarm_id is None:
+ url = "{}/v2/alarms/".format(endpoint)
+ severity = values['severity']
+
+ # Create a new threshold alarm with a resourceID
+ # specified as a query
+ rule = {'threshold': values['threshold'],
+ 'comparison_operator': 'gt',
+ 'metric': values['metric'],
+ 'resource_id': values['resource_id'],
+ 'resource_type': 'generic',
+ 'aggregation_method': 'last', }
+ payload = json.dumps({'state': 'alarm',
+ 'name': alarm_name,
+ 'severity': self.get_severity(severity),
+ 'type': 'gnocchi_resources_threshold',
+ 'gnocchi_resources_threshold_rule': rule, })
+
+ # Request performed to create alarm
+ new_alarm = self._aodh_common._perform_request(
+ url, auth_token, req_type="post", payload=payload)
+
+ return json.loads(new_alarm.text)['alarm_id']
+ else:
+ return alarm_id
+
+ def delete_alarm(self, endpoint, auth_token, alarmID):
+ """Delete alarm function."""
url = "{}/v2/alarms/%s".format(endpoint) % (alarmID)
- alarm_details = self._aodh_common._perform_request(url, auth_token,
- req_type="get")
- return alarm_details
+ self._aodh_common._perform_request(url, auth_token, req_type="delete")
+ return None
- def _create_alarm(self, endpoint, auth_token):
- """Get a list of alarms that exist in Aodh."""
- url = "{}/v2/alarms/".format(endpoint)
+ def get_severity(self, alarm_severity):
+ """Get a normalized severity for Aodh."""
+ # This logic can be changed, the other alternative was to have
+ # MINOR and MAJOR = "moderate" instead.
+ if alarm_severity == "WARNIING":
+ aodh_severity = "low"
+ elif alarm_severity == "MINOR":
+ aodh_severity = "moderate"
+ elif (alarm_severity == "MAJOR" or alarm_severity == "CRITICAL"):
+ aodh_severity = "critical"
+ else:
+ aodh_severity = None
+ log.warn("Invalid alarm severity configuration")
- rule = {'event_type': "threshold",}
- payload = json.dumps({'state': 'alarm',
- 'name': 'my_alarm',
- 'severity': 'moderate',
- 'type': 'event',
- 'event_rule': rule,})
-
- new_alarm = self._aodh_common._perform_request(url, auth_token,
- req_type="post", payload=payload)
- alarm_id =json.loads(new_alarm.text)['alarm_id']
- return alarm_id
-
+ log.info("Severity has been normalized for Aodh to: %s", aodh_severity)
+ return aodh_severity
diff --git a/plugins/OpenStack/Aodh/aodh_common.py b/plugins/OpenStack/Aodh/aodh_common.py
index 28e44fc..ac9dc26 100644
--- a/plugins/OpenStack/Aodh/aodh_common.py
+++ b/plugins/OpenStack/Aodh/aodh_common.py
@@ -1,16 +1,15 @@
"""Common methods for the Aodh Sender/Receiver."""
-import threading
-import os
+import logging as log
+
import requests
-from keystoneauth1.identity import v3
from keystoneauth1.identity.v3 import AuthMethod
-from keystoneauth1 import session
-from keystoneclient.v3 import client
-from keystoneclient.service_catalog import ServiceCatalog
-from plugins.Openstack.settings import Config
+from keystoneclient.service_catalog import ServiceCatalog
+from keystoneclient.v3 import client
+
+from plugins.OpenStack.settings import Config
class Aodh_Common(object):
@@ -23,8 +22,7 @@
self._ks = None
def _authenticate(self):
- """Authenticate and/or renew the authentication token"""
-
+ """Authenticate and/or renew the authentication token."""
if self._auth_token is not None:
return self._auth_token
@@ -37,26 +35,33 @@
self._auth_token = self._ks.auth_token
except Exception as exc:
- # TODO: Log errors
+ log.warn("Authentication failed with the following exception: %s",
+ exc)
self._auth_token = None
return self._auth_token
def get_endpoint(self):
- endpoint = self._ks.service_catalog.url_for(
- service_type='alarming',
- endpoint_type='internalURL',
- region_name='RegionOne')
- return endpoint
+ """Get the endpoint for Aodh."""
+ try:
+ return self._ks.service_catalog.url_for(
+ service_type='alarming',
+ endpoint_type='internalURL',
+ region_name='RegionOne')
+ except Exception as exc:
+ log.warning("Failed to retreive endpoint for Aodh due to: %s",
+ exc)
+ return None
@classmethod
- def _perform_request(cls, url, auth_token, req_type="get", payload=None):
+ def _perform_request(cls, url, auth_token,
+ req_type="get", payload=None, params=None):
"""Perform the POST/PUT/GET/DELETE request."""
-
# request headers
headers = {'X-Auth-Token': auth_token,
'Content-type': 'application/json'}
# perform request and return its result
+ response = None
try:
if req_type == "put":
response = requests.put(
@@ -68,15 +73,14 @@
timeout=1)
elif req_type == "get":
response = requests.get(
- url, headers=headers, timeout=1)
+ url, params=params, headers=headers, timeout=1)
elif req_type == "delete":
response = requests.delete(
url, headers=headers, timeout=1)
else:
- print("Invalid request type")
+ log.warn("Invalid request type")
except Exception as e:
- # Log info later
- print("Exception thrown on request", e)
+ log.warn("Exception thrown on request", e)
return response
diff --git a/plugins/OpenStack/Aodh/plugin_instance.py b/plugins/OpenStack/Aodh/plugin_instance.py
index 8096f3f..22db409 100644
--- a/plugins/OpenStack/Aodh/plugin_instance.py
+++ b/plugins/OpenStack/Aodh/plugin_instance.py
@@ -1,14 +1,9 @@
"""Aodh plugin for the OSM monitoring module."""
-import sys
-import logging
+import logging as log
-path = "/home/stack/MON"
-if path not in sys.path:
- sys.path.append(path)
-
-from plugins.Openstack.Aodh.alarming import Alarming
-from plugins.Openstack.settings import Config
+from plugins.OpenStack.Aodh.alarming import Alarming
+from plugins.OpenStack.settings import Config
def register_plugin():
@@ -24,15 +19,18 @@
def __init__(self, config):
"""Plugin instance."""
+ log.info("Initialze the plugin instance.")
self._config = config
self._alarm = Alarming()
def config(self):
"""Configure plugin."""
+ log.info("Configure the plugin instance.")
self._config.read_environ()
def alarm(self):
"""Allow alarm info to be received from Aodh."""
+ log.info("Begin alarm functionality.")
self._alarm.alarming()
register_plugin()
diff --git a/plugins/OpenStack/settings.py b/plugins/OpenStack/settings.py
index 4dacef9..fc54b07 100644
--- a/plugins/OpenStack/settings.py
+++ b/plugins/OpenStack/settings.py
@@ -2,27 +2,30 @@
from __future__ import unicode_literals
-from plugins.Openstack.singleton import Singleton
+import logging as log
+import os
from collections import namedtuple
+
+from plugins.Openstack.singleton import Singleton
+
import six
-import os
class BadConfigError(Exception):
- """Configuration exception"""
+ """Configuration exception."""
+
pass
class CfgParam(namedtuple('CfgParam', ['key', 'default', 'data_type'])):
- """Configuration parameter definition"""
+ """Configuration parameter definition."""
def value(self, data):
- """Convert a string to the parameter type"""
-
+ """Convert a string to the parameter type."""
try:
return self.data_type(data)
- except (ValueError, TypeError) as exc:
+ except (ValueError, TypeError):
raise BadConfigError(
'Invalid value "%s" for configuration parameter "%s"' % (
data, self.key))
@@ -30,7 +33,7 @@
@Singleton
class Config(object):
- """Plugin confguration"""
+ """Plugin confguration."""
_configuration = [
CfgParam('OS_AUTH_URL', None, six.text_type),
@@ -44,13 +47,12 @@
_config_keys = _config_dict.keys()
def __init__(self):
- """Set the default values"""
+ """Set the default values."""
for cfg in self._configuration:
setattr(self, cfg.key, cfg.default)
def read_environ(self):
"""Check the appropriate environment variables and update defaults."""
-
for key in self._config_keys:
if (key == "OS_IDENTITY_API_VERSION" or key == "OS_PASSWORD"):
val = str(os.environ[key])
@@ -59,5 +61,6 @@
val = str(os.environ[key]) + "/v3"
setattr(self, key, val)
else:
- # TODO: Log errors and no config updates required
+ # TODO(mcgoughh): Log errors and no config updates required
+ log.warn("Configuration doesn't require updating")
return
diff --git a/plugins/OpenStack/singleton.py b/plugins/OpenStack/singleton.py
index 2edc20b..12cd5a9 100644
--- a/plugins/OpenStack/singleton.py
+++ b/plugins/OpenStack/singleton.py
@@ -1,14 +1,17 @@
+"""Simple singleton class."""
+
from __future__ import unicode_literals
class Singleton(object):
- """Simple singleton class"""
+ """Simple singleton class."""
def __init__(self, decorated):
+ """Initialize singleton instance."""
self._decorated = decorated
def instance(self):
- """Return singleton instance"""
+ """Return singleton instance."""
try:
return self._instance
except AttributeError: