From c85d9848945f55dcda1ab660a116b45c5cd51152 Mon Sep 17 00:00:00 2001 From: Helena McGough Date: Tue, 29 Aug 2017 09:52:56 +0000 Subject: [PATCH] Updated the Openstack-Aodh-plugin - Can read in "alarms" messages via a consume - Checks for and creates new alarms Change-Id: I13f389677b8ab600d1e6974d0dc47a5cfe74acfc Signed-off-by: Helena McGough --- plugins/OpenStack/Aodh/alarming.py | 149 ++++++++++++++++------ plugins/OpenStack/Aodh/aodh_common.py | 44 ++++--- plugins/OpenStack/Aodh/plugin_instance.py | 14 +- plugins/OpenStack/settings.py | 25 ++-- plugins/OpenStack/singleton.py | 7 +- 5 files changed, 162 insertions(+), 77 deletions(-) diff --git a/plugins/OpenStack/Aodh/alarming.py b/plugins/OpenStack/Aodh/alarming.py index 3ae31ba..b44d624 100644 --- a/plugins/OpenStack/Aodh/alarming.py +++ b/plugins/OpenStack/Aodh/alarming.py @@ -1,7 +1,13 @@ -"""Send alarm info from Aodh to SO via MON""" +"""Send alarm info from Aodh to SO via MON.""" import json -from plugins.Openstack.Aodh.aodh_common import Aodh_Common +import logging as log + +from collections import OrderedDict + +from kafka import KafkaConsumer + +from plugins.OpenStack.Aodh.aodh_common import Aodh_Common class Alarming(object): @@ -11,52 +17,123 @@ class Alarming(object): """Create the aodh_receiver instance.""" self._aodh_common = Aodh_Common() + # Initialize a generic consumer object to consume message from the SO + server = {'server': 'localhost:9092', 'topic': 'alarms'} + self._consumer = KafkaConsumer(server['topic'], + group_id='my-group', + bootstrap_servers=server['server']) + + # TODO(mcgoughh): Initialize a producer to send messages bask to the SO + def alarming(self): - """Receive payload from Aodh.""" + """Consume info from the message bus to manage alarms.""" + # Generate authentication credentials to access keystone; + # auth_token, endpoint auth_token = self._aodh_common._authenticate() endpoint = self._aodh_common.get_endpoint() - alarm_list = self._get_alarm_list(endpoint, auth_token) - # Confirm communication with Aodh by listing alarms - print("Alarm List ", alarm_list.text) - - alarm_id = self._create_alarm(endpoint, auth_token) - print(alarm_id) + # Check the alarming functionlity that needs to be performed + for message in self._consumer: + if message.topic == "alarms": + log.info("Alarm action required: %s" % (message.topic)) + + if message.key == "configure_alarm": + # Configure/Update an alarm + alarm_details = json.loads(message.value) + alarm_id = self.configure_alarm(endpoint, + auth_token, alarm_details) + log.info("New alarm created with alarmID: %s", alarm_id) + + # TODO(mcgoughh): will send an acknowledge message back on + # the bus via the producer + + else: + # TODO(mcoughh): Key alternatives are "notify_alarm" and + # "acknowledge_alarm" will be accomodated later + log.debug("Unknown key, no action will be performed") + else: + log.info("Message topic not relevant to this plugin: %s", + message.topic) -# alarm_info = self._get_alarm_info(endpoint, -# auth_token, "372af0e2-5c36-4e4d-8ce9-ca92d97d07d0") -# print("Alarm info", alarm_info.text) return - def _get_alarm_list(self, endpoint, auth_token): + def alarm_check(self, endpoint, auth_token, alarm_name): """Get a list of alarms that exist in Aodh.""" url = "{}/v2/alarms/".format(endpoint) - alarm_list = self._aodh_common._perform_request(url, auth_token, - req_type="get") - return alarm_list + # TODO(mcgoughh): will query on resource_id once it has been + # implemented need to create the query field when creating + # the alarm + query = OrderedDict([("q.field", 'name'), ("q.op", "eq"), + ("q.value", str(alarm_name))]) - def _get_alarm_info(self, endpoint, auth_token, alarmID): - """Get information about a specific alarm from Aodh.""" - url = "{}/v2/alarms/%s".format(endpoint) % (alarmID) + result = self._aodh_common._perform_request( + url, auth_token, req_type="get", params=query) - alarm_details = self._aodh_common._perform_request(url, auth_token, - req_type="get") - return alarm_details + try: + alarm_id = json.loads(result.text)[0]['alarm_id'] + log.info("An existing alarm was found: %s", alarm_id) + return alarm_id + except Exception: + log.debug("Alarm doesn't exist, needs to be created.") + return None - def _create_alarm(self, endpoint, auth_token): + def configure_alarm(self, endpoint, auth_token, values): """Get a list of alarms that exist in Aodh.""" - url = "{}/v2/alarms/".format(endpoint) - - rule = {'event_type': "threshold",} - payload = json.dumps({'state': 'alarm', - 'name': 'my_alarm', - 'severity': 'moderate', - 'type': 'event', - 'event_rule': rule,}) - - new_alarm = self._aodh_common._perform_request(url, auth_token, - req_type="post", payload=payload) - alarm_id =json.loads(new_alarm.text)['alarm_id'] - return alarm_id + alarm_id = None + + # TODO(mcgoughh): error check the values sent in the messag + alarm_name = values['name'] + + # Check that this alarm doesn't exist already + alarm_id = self.alarm_check(endpoint, auth_token, alarm_name) + + if alarm_id is None: + url = "{}/v2/alarms/".format(endpoint) + severity = values['severity'] + + # Create a new threshold alarm with a resourceID + # specified as a query + rule = {'threshold': values['threshold'], + 'comparison_operator': 'gt', + 'metric': values['metric'], + 'resource_id': values['resource_id'], + 'resource_type': 'generic', + 'aggregation_method': 'last', } + payload = json.dumps({'state': 'alarm', + 'name': alarm_name, + 'severity': self.get_severity(severity), + 'type': 'gnocchi_resources_threshold', + 'gnocchi_resources_threshold_rule': rule, }) + + # Request performed to create alarm + new_alarm = self._aodh_common._perform_request( + url, auth_token, req_type="post", payload=payload) + + return json.loads(new_alarm.text)['alarm_id'] + else: + return alarm_id + + def delete_alarm(self, endpoint, auth_token, alarmID): + """Delete alarm function.""" + url = "{}/v2/alarms/%s".format(endpoint) % (alarmID) + self._aodh_common._perform_request(url, auth_token, req_type="delete") + return None + + def get_severity(self, alarm_severity): + """Get a normalized severity for Aodh.""" + # This logic can be changed, the other alternative was to have + # MINOR and MAJOR = "moderate" instead. + if alarm_severity == "WARNIING": + aodh_severity = "low" + elif alarm_severity == "MINOR": + aodh_severity = "moderate" + elif (alarm_severity == "MAJOR" or alarm_severity == "CRITICAL"): + aodh_severity = "critical" + else: + aodh_severity = None + log.warn("Invalid alarm severity configuration") + + log.info("Severity has been normalized for Aodh to: %s", aodh_severity) + return aodh_severity diff --git a/plugins/OpenStack/Aodh/aodh_common.py b/plugins/OpenStack/Aodh/aodh_common.py index 28e44fc..ac9dc26 100644 --- a/plugins/OpenStack/Aodh/aodh_common.py +++ b/plugins/OpenStack/Aodh/aodh_common.py @@ -1,16 +1,15 @@ """Common methods for the Aodh Sender/Receiver.""" -import threading -import os +import logging as log + import requests -from keystoneauth1.identity import v3 from keystoneauth1.identity.v3 import AuthMethod -from keystoneauth1 import session -from keystoneclient.v3 import client + from keystoneclient.service_catalog import ServiceCatalog +from keystoneclient.v3 import client -from plugins.Openstack.settings import Config +from plugins.OpenStack.settings import Config class Aodh_Common(object): @@ -23,8 +22,7 @@ class Aodh_Common(object): self._ks = None def _authenticate(self): - """Authenticate and/or renew the authentication token""" - + """Authenticate and/or renew the authentication token.""" if self._auth_token is not None: return self._auth_token @@ -37,26 +35,33 @@ class Aodh_Common(object): self._auth_token = self._ks.auth_token except Exception as exc: - # TODO: Log errors + log.warn("Authentication failed with the following exception: %s", + exc) self._auth_token = None return self._auth_token def get_endpoint(self): - endpoint = self._ks.service_catalog.url_for( - service_type='alarming', - endpoint_type='internalURL', - region_name='RegionOne') - return endpoint + """Get the endpoint for Aodh.""" + try: + return self._ks.service_catalog.url_for( + service_type='alarming', + endpoint_type='internalURL', + region_name='RegionOne') + except Exception as exc: + log.warning("Failed to retreive endpoint for Aodh due to: %s", + exc) + return None @classmethod - def _perform_request(cls, url, auth_token, req_type="get", payload=None): + def _perform_request(cls, url, auth_token, + req_type="get", payload=None, params=None): """Perform the POST/PUT/GET/DELETE request.""" - # request headers headers = {'X-Auth-Token': auth_token, 'Content-type': 'application/json'} # perform request and return its result + response = None try: if req_type == "put": response = requests.put( @@ -68,15 +73,14 @@ class Aodh_Common(object): timeout=1) elif req_type == "get": response = requests.get( - url, headers=headers, timeout=1) + url, params=params, headers=headers, timeout=1) elif req_type == "delete": response = requests.delete( url, headers=headers, timeout=1) else: - print("Invalid request type") + log.warn("Invalid request type") except Exception as e: - # Log info later - print("Exception thrown on request", e) + log.warn("Exception thrown on request", e) return response diff --git a/plugins/OpenStack/Aodh/plugin_instance.py b/plugins/OpenStack/Aodh/plugin_instance.py index 8096f3f..22db409 100644 --- a/plugins/OpenStack/Aodh/plugin_instance.py +++ b/plugins/OpenStack/Aodh/plugin_instance.py @@ -1,14 +1,9 @@ """Aodh plugin for the OSM monitoring module.""" -import sys -import logging +import logging as log -path = "/home/stack/MON" -if path not in sys.path: - sys.path.append(path) - -from plugins.Openstack.Aodh.alarming import Alarming -from plugins.Openstack.settings import Config +from plugins.OpenStack.Aodh.alarming import Alarming +from plugins.OpenStack.settings import Config def register_plugin(): @@ -24,15 +19,18 @@ class Plugin(object): def __init__(self, config): """Plugin instance.""" + log.info("Initialze the plugin instance.") self._config = config self._alarm = Alarming() def config(self): """Configure plugin.""" + log.info("Configure the plugin instance.") self._config.read_environ() def alarm(self): """Allow alarm info to be received from Aodh.""" + log.info("Begin alarm functionality.") self._alarm.alarming() register_plugin() diff --git a/plugins/OpenStack/settings.py b/plugins/OpenStack/settings.py index 4dacef9..fc54b07 100644 --- a/plugins/OpenStack/settings.py +++ b/plugins/OpenStack/settings.py @@ -2,27 +2,30 @@ from __future__ import unicode_literals -from plugins.Openstack.singleton import Singleton +import logging as log +import os from collections import namedtuple + +from plugins.Openstack.singleton import Singleton + import six -import os class BadConfigError(Exception): - """Configuration exception""" + """Configuration exception.""" + pass class CfgParam(namedtuple('CfgParam', ['key', 'default', 'data_type'])): - """Configuration parameter definition""" + """Configuration parameter definition.""" def value(self, data): - """Convert a string to the parameter type""" - + """Convert a string to the parameter type.""" try: return self.data_type(data) - except (ValueError, TypeError) as exc: + except (ValueError, TypeError): raise BadConfigError( 'Invalid value "%s" for configuration parameter "%s"' % ( data, self.key)) @@ -30,7 +33,7 @@ class CfgParam(namedtuple('CfgParam', ['key', 'default', 'data_type'])): @Singleton class Config(object): - """Plugin confguration""" + """Plugin confguration.""" _configuration = [ CfgParam('OS_AUTH_URL', None, six.text_type), @@ -44,13 +47,12 @@ class Config(object): _config_keys = _config_dict.keys() def __init__(self): - """Set the default values""" + """Set the default values.""" for cfg in self._configuration: setattr(self, cfg.key, cfg.default) def read_environ(self): """Check the appropriate environment variables and update defaults.""" - for key in self._config_keys: if (key == "OS_IDENTITY_API_VERSION" or key == "OS_PASSWORD"): val = str(os.environ[key]) @@ -59,5 +61,6 @@ class Config(object): val = str(os.environ[key]) + "/v3" setattr(self, key, val) else: - # TODO: Log errors and no config updates required + # TODO(mcgoughh): Log errors and no config updates required + log.warn("Configuration doesn't require updating") return diff --git a/plugins/OpenStack/singleton.py b/plugins/OpenStack/singleton.py index 2edc20b..12cd5a9 100644 --- a/plugins/OpenStack/singleton.py +++ b/plugins/OpenStack/singleton.py @@ -1,14 +1,17 @@ +"""Simple singleton class.""" + from __future__ import unicode_literals class Singleton(object): - """Simple singleton class""" + """Simple singleton class.""" def __init__(self, decorated): + """Initialize singleton instance.""" self._decorated = decorated def instance(self): - """Return singleton instance""" + """Return singleton instance.""" try: return self._instance except AttributeError: -- 2.25.1