From: Helena McGough Date: Thu, 23 Nov 2017 17:29:54 +0000 (+0000) Subject: Refactored the Aodh notifier class X-Git-Tag: v4.0.0~53 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=94f93f717ead448f13dc272ed765e008a0e64c81;p=osm%2FMON.git Refactored the Aodh notifier class - Included alarm actions to send notification to SO - Utilizes Aodh's webhook option for alarm_actions - Included additional tests for this change - Updated documentation Change-Id: Ia3f2e2fa2ffee0cc66a80d5098d52b7da2be2dbe Signed-off-by: Helena McGough --- diff --git a/doc/MON_install_guide.rst b/doc/MON_install_guide.rst index 57b79d3..22d4764 100644 --- a/doc/MON_install_guide.rst +++ b/doc/MON_install_guide.rst @@ -89,11 +89,17 @@ the plugins support. :: - lxc exec MON - python /root/MON/core/message_bus/common_consumer.py + lxc exec MON - python /root/MON/osm_mon/core/message_bus/common_consumer.py + +* To enable Aodh alarm notifications to be sent to SO: + + :: + + lxc exec MON - python /root/MON/osm_mon/plugins/OpenStack/Aodh/notifier.py CloudWatch ~~~~~~~~~~ -The MON container supports a CloudWatch plugin as well. +The MON container supports a CloudWatch plugin on installation. Verification diff --git a/doc/MON_usage_guide.rst b/doc/MON_usage_guide.rst index 0a28318..89f75fa 100644 --- a/doc/MON_usage_guide.rst +++ b/doc/MON_usage_guide.rst @@ -32,12 +32,6 @@ The topics that the plugins will consume messages based on are: * alarm_request * metric_request -In return the plugins will send messages back to the SO with the following -topics: - -* alarm_response -* metric_response - Each type of request has it's own unique key: * create_alarm_request * create_metric_request @@ -50,11 +44,35 @@ Each type of request has it's own unique key: * acknowledge_alarm_request * read_metric_data_request -Sending Messages ----------------- +In return the plugins will send messages back to the SO with the following +topics: + +* alarm_response +* metric_response + +Each request has a corresponding response key: +* create_alarm_reponse +* create_metric_response +* list_alarm_response +* list_metric_response +* delete_alarm_response +* delete_metric_response +* update_alarm_response +* update_metric_response +* acknowledge_alarm_response +* read_metric_data_response + + .. note:: + + There is an additional response key to send notifications to the SO + when an alarm has been triggered: + * notify_alarm + +Sending Request Messages +------------------------ For each of the request message that can be sent there is a json schema defined in the models directory of the MON repo: -: `` +: `` To send a valid message to the MON module for use by one of the plugins, your message must match the json schema for that request type. diff --git a/doc/OpenStack/openstack_plugins.rst b/doc/OpenStack/openstack_plugins.rst index f67fd17..0e7b07c 100644 --- a/doc/OpenStack/openstack_plugins.rst +++ b/doc/OpenStack/openstack_plugins.rst @@ -38,7 +38,7 @@ For more information on Gnocchi please refer to the source code/documentation: For plugin specific instructions and configuration options please refer to the following guide: -: `` +: `` Aodh ---- @@ -54,4 +54,4 @@ code/documentation: For plugin specific instructions and configuration options please refer to the following guide: -: `` +: `` diff --git a/osm_mon/plugins/OpenStack/Aodh/alarming.py b/osm_mon/plugins/OpenStack/Aodh/alarming.py index 2343372..5093281 100644 --- a/osm_mon/plugins/OpenStack/Aodh/alarming.py +++ b/osm_mon/plugins/OpenStack/Aodh/alarming.py @@ -120,7 +120,7 @@ class Alarming(object): cor_id=alarm_details['correlation_id']) log.info("Response Message: %s", resp_message) self._producer.create_alarm_response( - 'create_alarm_resonse', resp_message, + 'create_alarm_response', resp_message, 'alarm_response') except Exception as exc: log.warn("Response creation failed: %s", exc) @@ -405,12 +405,13 @@ class Alarming(object): 'metric': metric_name, 'resource_id': resource_id, 'resource_type': 'generic', - 'aggregation_method': STATISTICS[statistic]} + 'aggregation_method': STATISTICS[statistic], } payload = json.dumps({'state': alarm_state, 'name': alarm_name, 'severity': SEVERITIES[severity], 'type': 'gnocchi_resources_threshold', - 'gnocchi_resources_threshold_rule': rule, }) + 'gnocchi_resources_threshold_rule': rule, + 'alarm_actions': ['http://localhost:8662'], }) return payload except KeyError as exc: log.warn("Alarm is not configured correctly: %s", exc) diff --git a/osm_mon/plugins/OpenStack/Aodh/notifier.py b/osm_mon/plugins/OpenStack/Aodh/notifier.py index 3487daa..5330197 100644 --- a/osm_mon/plugins/OpenStack/Aodh/notifier.py +++ b/osm_mon/plugins/OpenStack/Aodh/notifier.py @@ -19,89 +19,127 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: helena.mcgough@intel.com or adrian.hoban@intel.com ## -"""Notifier class for alarm notification response.""" - +# __author__ = Helena McGough +# +"""A Webserver to send alarm notifications from Aodh to the SO.""" import json -import logging as log -try: - import aodhclient -except ImportError: - log.warn("Failed to import the aodhclient") +import logging +import sys -from core.message_bus.producer import KafkaProducer +import time -from plugins.OpenStack.Aodh.alarming import Alarming -from plugins.OpenStack.response import OpenStack_Response -from plugins.OpenStack.settings import Config - -__author__ = "Helena McGough" +from BaseHTTPServer import BaseHTTPRequestHandler +from BaseHTTPServer import HTTPServer -ALARM_NAMES = [ - "average_memory_usage_above_threshold", - "disk_read_ops", - "disk_write_ops", - "disk_read_bytes", - "disk_write_bytes", - "net_packets_dropped", - "packets_in_above_threshold", - "packets_out_above_threshold", - "cpu_utilization_above_threshold"] +# Initialise a logger for alarm notifier +logging.basicConfig(filename='aodh_notify.log', + format='%(asctime)s %(message)s', + datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a', + level=logging.INFO) +log = logging.getLogger(__name__) +sys.path.append("/root/MON") -def register_notifier(): - """Run the notifier instance.""" - config = Config.instance() - instance = Notifier(config=config) - instance.config() - instance.notify() +from core.message_bus.producer import KafkaProducer +from plugins.OpenStack.Aodh.alarming import Alarming +from plugins.OpenStack.common import Common +from plugins.OpenStack.response import OpenStack_Response +from plugins.OpenStack.settings import Config -class Notifier(object): - """Alarm Notification class.""" - def __init__(self, config): - """Initialize alarm notifier.""" - log.info("Initialize the notifier for the SO.") - self._config = config - self._response = OpenStack_Response() - self._producer = KafkaProducer("alarm_response") +class NotifierHandler(BaseHTTPRequestHandler): + """Handler class for alarm_actions triggered by OSM alarms.""" + + def _set_headers(self): + """Set the headers for a request.""" + self.send_response(200) + self.send_header('Content-type', 'text/html') + self.end_headers() + + def do_GET(self): + """Get request functionality.""" + self._set_headers() + self.wfile.write("

hi!

") + + def do_POST(self): + """POST request function.""" + # Gets header and data from the post request and records info + self._set_headers() + # Gets the size of data + content_length = int(self.headers['Content-Length']) + post_data = self.rfile.read(content_length) + self.wfile.write("

POST!

") + log.info("This alarm was triggered: %s", json.loads(post_data)) + + # Generate a notify_alarm response for the SO + self.notify_alarm(json.loads(post_data)) + + def notify_alarm(self, values): + """Send a notifcation repsonse message to the SO.""" + # Initialiase configuration and authentication for response message + config = Config.instance() + config.read_environ("aodh") self._alarming = Alarming() - - def config(self): - """Configure the alarm notifier.""" - log.info("Configure the notifier instance.") - self._config.read_environ("aodh") - - def notify(self): - """Send alarm notifications responses to the SO.""" - log.info("Checking for alarm notifications") - auth_token, endpoint = self._alarming.authenticate() - - while(1): - alarm_list = self._alarming.list_alarms(endpoint, auth_token) - for alarm in json.loads(alarm_list): - alarm_id = alarm['alarm_id'] - alarm_name = alarm['name'] - # Send a notification response to the SO on alarm trigger - if alarm_name in ALARM_NAMES: - alarm_state = self._alarming.get_alarm_state( - endpoint, auth_token, alarm_id) - if alarm_state == "alarm": - # Generate and send an alarm notification response - try: - a_date = alarm['state_timestamp'].replace("T", " ") - rule = alarm['gnocchi_resources_threshold_rule'] - resp_message = self._response.generate_response( - 'notify_alarm', a_id=alarm_id, - r_id=rule['resource_id'], - sev=alarm['severity'], date=a_date, - state=alarm_state, vim_type="OpenStack") - self._producer.notify_alarm( - 'notify_alarm', resp_message, 'alarm_response') - except Exception as exc: - log.warn("Failed to send notify response:%s", exc) - -if aodhclient: - register_notifier() + self._common = Common() + self._response = OpenStack_Response() + self._producer = KafkaProducer('alarm_response') + + alarm_id = values['alarm_id'] + auth_token = self._common._authenticate() + endpoint = self._common.get_endpoint("alarming") + + # If authenticated generate and send response message + if (auth_token is not None and endpoint is not None): + url = "{}/v2/alarms/%s".format(endpoint) % alarm_id + + # Get the resource_id of the triggered alarm + result = self._common._perform_request( + url, auth_token, req_type="get") + alarm_details = json.loads(result.text) + gnocchi_rule = alarm_details['gnocchi_resources_threshold_rule'] + resource_id = gnocchi_rule['resource_id'] + + # Process an alarm notification if resource_id is valid + if resource_id is not None: + # Get date and time for response message + a_date = time.strftime("%d-%m-%Y") + " " + time.strftime("%X") + # Try generate and send response + try: + resp_message = self._response.generate_response( + 'notify_alarm', a_id=alarm_id, + r_id=resource_id, + sev=values['severity'], date=a_date, + state=values['current'], vim_type="OpenStack") + self._producer.notify_alarm( + 'notify_alarm', resp_message, 'alarm_response') + log.info("Sent an alarm response to SO: %s", resp_message) + except Exception as exc: + log.warn("Couldn't notify SO of the alarm: %s", exc) + else: + log.warn("No resource_id for alarm; no SO response sent.") + else: + log.warn("Authentication failure; SO notification not sent.") + + +def run(server_class=HTTPServer, handler_class=NotifierHandler, port=8662): + """Run the webserver application to retreive alarm notifications.""" + try: + server_address = ('', port) + httpd = server_class(server_address, handler_class) + print('Starting alarm notifier...') + log.info("Starting alarm notifier server on port: %s", port) + httpd.serve_forever() + except Exception as exc: + log.warn("Failed to start webserver, %s", exc) + +if __name__ == "__main__": + from sys import argv + + # Runs the webserver + if len(argv) == 2: + run(port=int(argv[1])) + else: + run() diff --git a/osm_mon/plugins/OpenStack/common.py b/osm_mon/plugins/OpenStack/common.py index 8769312..70154f2 100644 --- a/osm_mon/plugins/OpenStack/common.py +++ b/osm_mon/plugins/OpenStack/common.py @@ -134,6 +134,6 @@ class Common(object): except Exception: # Log out the result of the request for debugging purpose log.debug( - 'Result: %s, %d', + 'Result: %s, %s', response.status_code, response.text) return response diff --git a/osm_mon/test/OpenStack/test_alarming.py b/osm_mon/test/OpenStack/test_alarming.py index 557a93d..6570570 100644 --- a/osm_mon/test/OpenStack/test_alarming.py +++ b/osm_mon/test/OpenStack/test_alarming.py @@ -52,6 +52,8 @@ class Response(object): class TestAlarming(unittest.TestCase): """Tests for alarming class functions.""" + maxDiff = None + def setUp(self): """Setup for tests.""" super(TestAlarming, self).setUp() @@ -219,7 +221,8 @@ class TestAlarming(unittest.TestCase): "resource_type": "generic"}, "severity": "low", "state": "ok", - "type": "gnocchi_resources_threshold"}) + "type": "gnocchi_resources_threshold", + "alarm_actions": ["http://localhost:8662"]}) def test_check_valid_state_payload(self): """Test the check payload function for a valid payload with state.""" @@ -241,7 +244,8 @@ class TestAlarming(unittest.TestCase): "resource_type": "generic"}, "severity": "low", "state": "alarm", - "type": "gnocchi_resources_threshold"}) + "type": "gnocchi_resources_threshold", + "alarm_actions": ["http://localhost:8662"]}) def test_check_invalid_payload(self): """Test the check payload function for an invalid payload.""" diff --git a/osm_mon/test/OpenStack/test_notifier.py b/osm_mon/test/OpenStack/test_notifier.py new file mode 100644 index 0000000..ccebb69 --- /dev/null +++ b/osm_mon/test/OpenStack/test_notifier.py @@ -0,0 +1,281 @@ +# Copyright 2017 Intel Research and Development Ireland Limited +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Intel Corporation + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: helena.mcgough@intel.com or adrian.hoban@intel.com +## +"""Tests for all common OpenStack methods.""" + +import json + +import unittest + +from BaseHTTPServer import BaseHTTPRequestHandler + +from core.message_bus.producer import KafkaProducer + +import mock + +from plugins.OpenStack.Aodh.alarming import Alarming +from plugins.OpenStack.common import Common +from plugins.OpenStack.response import OpenStack_Response +from plugins.OpenStack.settings import Config + +__author__ = "Helena McGough" + +# Mock data from post request +post_data = json.dumps({"severity": "critical", + "alarm_name": "my_alarm", + "current": "current_state", + "alarm_id": "my_alarm_id", + "reason": "Threshold has been broken", + "reason_data": {"count": 1, + "most_recent": "null", + "type": "threshold", + "disposition": "unknown"}, + "previous": "previous_state"}) + +valid_get_resp = '{"gnocchi_resources_threshold_rule":\ + {"resource_id": "my_resource_id"}}' + +invalid_get_resp = '{"gnocchi_resources_threshold_rule":\ + {"resource_id": "None"}}' + +valid_notify_resp = '{"notify_details": {"status": "current_state",\ + "severity": "critical",\ + "resource_uuid": "my_resource_id",\ + "alarm_uuid": "my_alarm_id",\ + "vim_type": "OpenStack",\ + "start_date": "dd-mm-yyyy 00:00"},\ + "schema_version": "1.0",\ + "schema_type": "notify_alarm"}' + +invalid_notify_resp = '{"notify_details": {"invalid":"mock_details"}' + + +class Response(object): + """Mock a response class for generating responses.""" + + def __init__(self, text): + """Initialise a mock response with a text attribute.""" + self.text = text + + +class NotifierHandler(BaseHTTPRequestHandler): + """Mock the NotifierHandler class for testing purposes.""" + + def __init__(self, request, client_address, server): + """Initilase mock NotifierHandler.""" + self.request = request + self.client_address = client_address + self.server = server + self.setup() + try: + self.handle() + finally: + self.finish() + + def setup(self): + """Mock setup function.""" + pass + + def handle(self): + """Mock handle function.""" + pass + + def finish(self): + """Mock finish function.""" + pass + + def _set_headers(self): + """Mock getting the request headers.""" + pass + + def do_GET(self): + """Mock functionality for GET request.""" + self._set_headers() + pass + + def do_POST(self): + """Mock functionality for a POST request.""" + self._set_headers() + self.notify_alarm(json.loads(post_data)) + + def notify_alarm(self, values): + """Mock the notify_alarm functionality to generate a valid response.""" + config = Config.instance() + config.read_environ("aodh") + self._alarming = Alarming() + self._common = Common() + self._response = OpenStack_Response() + self._producer = KafkaProducer('alarm_response') + alarm_id = values['alarm_id'] + + auth_token = self._common._authenticate() + endpoint = self._common.get_endpoint("alarming") + + # If authenticated generate and send response message + if (auth_token is not None and endpoint is not None): + url = "{}/v2/alarms/%s".format(endpoint) % alarm_id + + # Get the resource_id of the triggered alarm and the date + result = self._common._perform_request( + url, auth_token, req_type="get") + alarm_details = json.loads(result.text) + gnocchi_rule = alarm_details['gnocchi_resources_threshold_rule'] + resource_id = gnocchi_rule['resource_id'] + a_date = "dd-mm-yyyy 00:00" + + # Process an alarm notification if resource_id is valid + if resource_id is not None: + # Try generate and send response + try: + resp_message = self._response.generate_response( + 'notify_alarm', a_id=alarm_id, + r_id=resource_id, + sev=values['severity'], date=a_date, + state=values['current'], vim_type="OpenStack") + self._producer.notify_alarm( + 'notify_alarm', resp_message, 'alarm_response') + except Exception: + pass + + +class TestNotifier(unittest.TestCase): + """Test the NotifierHandler class for requests from aodh.""" + + def setUp(self): + """Setup tests.""" + super(TestNotifier, self).setUp() + self.handler = NotifierHandler( + "mock_request", "mock_address", "mock_server") + + @mock.patch.object(NotifierHandler, "_set_headers") + def test_do_GET(self, set_head): + """Test do_GET, generates headers for get request.""" + self.handler.do_GET() + + set_head.assert_called_once + + @mock.patch.object(NotifierHandler, "notify_alarm") + @mock.patch.object(NotifierHandler, "_set_headers") + def test_do_POST(self, set_head, notify): + """Test do_POST functionality for a POST request.""" + self.handler.do_POST() + + set_head.assert_called_once + notify.assert_called_with(json.loads(post_data)) + + @mock.patch.object(Common, "get_endpoint") + @mock.patch.object(Common, "_authenticate") + @mock.patch.object(Common, "_perform_request") + def test_notify_alarm_unauth(self, perf_req, auth, endpoint): + """Test notify alarm when not authenticated with keystone.""" + # Response request will not be performed unless there is a valid + # auth_token and endpoint + # Invalid auth_token and endpoint + auth.return_value = None + endpoint.return_value = None + self.handler.notify_alarm(json.loads(post_data)) + + perf_req.assert_not_called + + # Valid endpoint + auth.return_value = None + endpoint.return_value = "my_endpoint" + self.handler.notify_alarm(json.loads(post_data)) + + perf_req.assert_not_called + + # Valid auth_token + auth.return_value = "my_auth_token" + endpoint.return_value = None + self.handler.notify_alarm(json.loads(post_data)) + + perf_req.assert_not_called + + @mock.patch.object(Common, "get_endpoint") + @mock.patch.object(OpenStack_Response, "generate_response") + @mock.patch.object(Common, "_authenticate") + @mock.patch.object(Common, "_perform_request") + def test_notify_alarm_invalid_alarm(self, perf_req, auth, resp, endpoint): + """Test valid authentication, invalid alarm details.""" + # Mock valid auth_token and endpoint + auth.return_value = "my_auth_token" + endpoint.return_value = "my_endpoint" + perf_req.return_value = Response(invalid_get_resp) + + self.handler.notify_alarm(json.loads(post_data)) + + # Response is not generated + resp.assert_not_called + + @mock.patch.object(Common, "get_endpoint") + @mock.patch.object(OpenStack_Response, "generate_response") + @mock.patch.object(Common, "_authenticate") + @mock.patch.object(Common, "_perform_request") + def test_notify_alarm_resp_call(self, perf_req, auth, response, endpoint): + """Test notify_alarm tries to generate a response for SO.""" + # Mock valid auth token and endpoint, valid response from aodh + auth.return_value = "my_auth_token" + endpoint.returm_value = "my_endpoint" + perf_req.return_value = Response(valid_get_resp) + self.handler.notify_alarm(json.loads(post_data)) + + response.assert_called_with('notify_alarm', a_id="my_alarm_id", + r_id="my_resource_id", sev="critical", + date="dd-mm-yyyy 00:00", + state="current_state", + vim_type="OpenStack") + + @mock.patch.object(Common, "get_endpoint") + @mock.patch.object(KafkaProducer, "notify_alarm") + @mock.patch.object(OpenStack_Response, "generate_response") + @mock.patch.object(Common, "_authenticate") + @mock.patch.object(Common, "_perform_request") + def test_notify_alarm_invalid_resp( + self, perf_req, auth, response, notify, endpoint): + """Test the notify_alarm function, sends response to the producer.""" + # Generate return values for valid notify_alarm operation + auth.return_value = "my_auth_token" + endpoint.return_value = "my_endpoint" + perf_req.return_value = Response(valid_get_resp) + response.return_value = invalid_notify_resp + + self.handler.notify_alarm(json.loads(post_data)) + + notify.assert_not_called + + @mock.patch.object(Common, "get_endpoint") + @mock.patch.object(KafkaProducer, "notify_alarm") + @mock.patch.object(OpenStack_Response, "generate_response") + @mock.patch.object(Common, "_authenticate") + @mock.patch.object(Common, "_perform_request") + def test_notify_alarm_valid_resp( + self, perf_req, auth, response, notify, endpoint): + """Test the notify_alarm function, sends response to the producer.""" + # Generate return values for valid notify_alarm operation + auth.return_value = "my_auth_token" + endpoint.return_value = "my_endpoint" + perf_req.return_value = Response(valid_get_resp) + response.return_value = valid_notify_resp + + self.handler.notify_alarm(json.loads(post_data)) + + notify.assert_called_with( + "notify_alarm", valid_notify_resp, "alarm_response")