Refactored the Aodh notifier class 06/5706/8
authorHelena McGough <helena.mcgough@intel.com>
Thu, 23 Nov 2017 17:29:54 +0000 (17:29 +0000)
committerh.mcgough <helena.mcgough@intel.com>
Mon, 27 Nov 2017 14:15:30 +0000 (16:15 +0200)
 - Included alarm actions to send notification to SO
 - Utilizes Aodh's webhook option for alarm_actions
 - Included additional tests for this change
 - Updated documentation

Change-Id: Ia3f2e2fa2ffee0cc66a80d5098d52b7da2be2dbe
Signed-off-by: Helena McGough <helena.mcgough@intel.com>
doc/MON_install_guide.rst
doc/MON_usage_guide.rst
doc/OpenStack/openstack_plugins.rst
osm_mon/plugins/OpenStack/Aodh/alarming.py
osm_mon/plugins/OpenStack/Aodh/notifier.py
osm_mon/plugins/OpenStack/common.py
osm_mon/test/OpenStack/test_alarming.py
osm_mon/test/OpenStack/test_notifier.py [new file with mode: 0644]

index 57b79d3..22d4764 100644 (file)
@@ -89,11 +89,17 @@ the plugins support.
 
       ::
 
-          lxc exec MON - python /root/MON/core/message_bus/common_consumer.py
+          lxc exec MON - python /root/MON/osm_mon/core/message_bus/common_consumer.py
+
+* To enable Aodh alarm notifications to be sent to SO:
+
+      ::
+
+          lxc exec MON - python /root/MON/osm_mon/plugins/OpenStack/Aodh/notifier.py
 
 CloudWatch
 ~~~~~~~~~~
-The MON container supports a CloudWatch plugin as well.
+The MON container supports a CloudWatch plugin on installation.
 
 
 Verification
index 0a28318..89f75fa 100644 (file)
@@ -32,12 +32,6 @@ The topics that the plugins will consume messages based on are:
 * alarm_request
 * metric_request
 
-In return the plugins will send messages back to the SO with the following
-topics:
-
-* alarm_response
-* metric_response
-
 Each type of request has it's own unique key:
 * create_alarm_request
 * create_metric_request
@@ -50,11 +44,35 @@ Each type of request has it's own unique key:
 * acknowledge_alarm_request
 * read_metric_data_request
 
-Sending Messages
-----------------
+In return the plugins will send messages back to the SO with the following
+topics:
+
+* alarm_response
+* metric_response
+
+Each request has a corresponding response key:
+* create_alarm_reponse
+* create_metric_response
+* list_alarm_response
+* list_metric_response
+* delete_alarm_response
+* delete_metric_response
+* update_alarm_response
+* update_metric_response
+* acknowledge_alarm_response
+* read_metric_data_response
+
+  .. note::
+
+      There is an additional response key to send notifications to the SO
+      when an alarm has been triggered:
+      * notify_alarm
+
+Sending Request Messages
+------------------------
 For each of the request message that can be sent there is a json schema defined
 in the models directory of the MON repo:
-: `</MON/core/models/>`
+: `</MON/osm_mon/core/models/>`
 
 To send a valid message to the MON module for use by one of the plugins, your
 message must match the json schema for that request type.
index f67fd17..0e7b07c 100644 (file)
@@ -38,7 +38,7 @@ For more information on Gnocchi please refer to the source code/documentation:
 
 For plugin specific instructions and configuration options please refer to the
 following guide:
-: `<doc/plugins/OpenStack/gnocchi_plugin_guide.rst>`
+: `<doc/OpenStack/gnocchi_plugin_guide.rst>`
 
 Aodh
 ----
@@ -54,4 +54,4 @@ code/documentation:
 
 For plugin specific instructions and configuration options please refer to the
 following guide:
-: `<doc/plugins/OpenStack/aodh_plugin_guide.rst>`
+: `<doc/OpenStack/aodh_plugin_guide.rst>`
index 2343372..5093281 100644 (file)
@@ -120,7 +120,7 @@ class Alarming(object):
                     cor_id=alarm_details['correlation_id'])
                 log.info("Response Message: %s", resp_message)
                 self._producer.create_alarm_response(
-                    'create_alarm_resonse', resp_message,
+                    'create_alarm_response', resp_message,
                     'alarm_response')
             except Exception as exc:
                 log.warn("Response creation failed: %s", exc)
@@ -405,12 +405,13 @@ class Alarming(object):
                     'metric': metric_name,
                     'resource_id': resource_id,
                     'resource_type': 'generic',
-                    'aggregation_method': STATISTICS[statistic]}
+                    'aggregation_method': STATISTICS[statistic]}
             payload = json.dumps({'state': alarm_state,
                                   'name': alarm_name,
                                   'severity': SEVERITIES[severity],
                                   'type': 'gnocchi_resources_threshold',
-                                  'gnocchi_resources_threshold_rule': rule, })
+                                  'gnocchi_resources_threshold_rule': rule,
+                                  'alarm_actions': ['http://localhost:8662'], })
             return payload
         except KeyError as exc:
             log.warn("Alarm is not configured correctly: %s", exc)
index 3487daa..5330197 100644 (file)
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
 ##
-"""Notifier class for alarm notification response."""
-
+# __author__ = Helena McGough
+#
+"""A Webserver to send alarm notifications from Aodh to the SO."""
 import json
-import logging as log
 
-try:
-    import aodhclient
-except ImportError:
-    log.warn("Failed to import the aodhclient")
+import logging
 
+import sys
 
-from core.message_bus.producer import KafkaProducer
+import time
 
-from plugins.OpenStack.Aodh.alarming import Alarming
-from plugins.OpenStack.response import OpenStack_Response
-from plugins.OpenStack.settings import Config
-
-__author__ = "Helena McGough"
+from BaseHTTPServer import BaseHTTPRequestHandler
+from BaseHTTPServer import HTTPServer
 
-ALARM_NAMES = [
-    "average_memory_usage_above_threshold",
-    "disk_read_ops",
-    "disk_write_ops",
-    "disk_read_bytes",
-    "disk_write_bytes",
-    "net_packets_dropped",
-    "packets_in_above_threshold",
-    "packets_out_above_threshold",
-    "cpu_utilization_above_threshold"]
+# Initialise a logger for alarm notifier
+logging.basicConfig(filename='aodh_notify.log',
+                    format='%(asctime)s %(message)s',
+                    datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a',
+                    level=logging.INFO)
+log = logging.getLogger(__name__)
 
+sys.path.append("/root/MON")
 
-def register_notifier():
-    """Run the notifier instance."""
-    config = Config.instance()
-    instance = Notifier(config=config)
-    instance.config()
-    instance.notify()
+from core.message_bus.producer import KafkaProducer
 
+from plugins.OpenStack.Aodh.alarming import Alarming
+from plugins.OpenStack.common import Common
+from plugins.OpenStack.response import OpenStack_Response
+from plugins.OpenStack.settings import Config
 
-class Notifier(object):
-    """Alarm Notification class."""
 
-    def __init__(self, config):
-        """Initialize alarm notifier."""
-        log.info("Initialize the notifier for the SO.")
-        self._config = config
-        self._response = OpenStack_Response()
-        self._producer = KafkaProducer("alarm_response")
+class NotifierHandler(BaseHTTPRequestHandler):
+    """Handler class for alarm_actions triggered by OSM alarms."""
+
+    def _set_headers(self):
+        """Set the headers for a request."""
+        self.send_response(200)
+        self.send_header('Content-type', 'text/html')
+        self.end_headers()
+
+    def do_GET(self):
+        """Get request functionality."""
+        self._set_headers()
+        self.wfile.write("<html><body><h1>hi!</h1></body></html>")
+
+    def do_POST(self):
+        """POST request function."""
+        # Gets header and data from the post request and records info
+        self._set_headers()
+        # Gets the size of data
+        content_length = int(self.headers['Content-Length'])
+        post_data = self.rfile.read(content_length)
+        self.wfile.write("<html><body><h1>POST!</h1></body></tml>")
+        log.info("This alarm was triggered: %s", json.loads(post_data))
+
+        # Generate a notify_alarm response for the SO
+        self.notify_alarm(json.loads(post_data))
+
+    def notify_alarm(self, values):
+        """Send a notifcation repsonse message to the SO."""
+        # Initialiase configuration and authentication for response message
+        config = Config.instance()
+        config.read_environ("aodh")
         self._alarming = Alarming()
-
-    def config(self):
-        """Configure the alarm notifier."""
-        log.info("Configure the notifier instance.")
-        self._config.read_environ("aodh")
-
-    def notify(self):
-        """Send alarm notifications responses to the SO."""
-        log.info("Checking for alarm notifications")
-        auth_token, endpoint = self._alarming.authenticate()
-
-        while(1):
-            alarm_list = self._alarming.list_alarms(endpoint, auth_token)
-            for alarm in json.loads(alarm_list):
-                alarm_id = alarm['alarm_id']
-                alarm_name = alarm['name']
-                # Send a notification response to the SO on alarm trigger
-                if alarm_name in ALARM_NAMES:
-                    alarm_state = self._alarming.get_alarm_state(
-                        endpoint, auth_token, alarm_id)
-                    if alarm_state == "alarm":
-                        # Generate and send an alarm notification response
-                        try:
-                            a_date = alarm['state_timestamp'].replace("T", " ")
-                            rule = alarm['gnocchi_resources_threshold_rule']
-                            resp_message = self._response.generate_response(
-                                'notify_alarm', a_id=alarm_id,
-                                r_id=rule['resource_id'],
-                                sev=alarm['severity'], date=a_date,
-                                state=alarm_state, vim_type="OpenStack")
-                            self._producer.notify_alarm(
-                                'notify_alarm', resp_message, 'alarm_response')
-                        except Exception as exc:
-                            log.warn("Failed to send notify response:%s", exc)
-
-if aodhclient:
-    register_notifier()
+        self._common = Common()
+        self._response = OpenStack_Response()
+        self._producer = KafkaProducer('alarm_response')
+
+        alarm_id = values['alarm_id']
+        auth_token = self._common._authenticate()
+        endpoint = self._common.get_endpoint("alarming")
+
+        # If authenticated generate and send response message
+        if (auth_token is not None and endpoint is not None):
+            url = "{}/v2/alarms/%s".format(endpoint) % alarm_id
+
+            # Get the resource_id of the triggered alarm
+            result = self._common._perform_request(
+                url, auth_token, req_type="get")
+            alarm_details = json.loads(result.text)
+            gnocchi_rule = alarm_details['gnocchi_resources_threshold_rule']
+            resource_id = gnocchi_rule['resource_id']
+
+            # Process an alarm notification if resource_id is valid
+            if resource_id is not None:
+                # Get date and time for response message
+                a_date = time.strftime("%d-%m-%Y") + " " + time.strftime("%X")
+                # Try generate and send response
+                try:
+                    resp_message = self._response.generate_response(
+                        'notify_alarm', a_id=alarm_id,
+                        r_id=resource_id,
+                        sev=values['severity'], date=a_date,
+                        state=values['current'], vim_type="OpenStack")
+                    self._producer.notify_alarm(
+                        'notify_alarm', resp_message, 'alarm_response')
+                    log.info("Sent an alarm response to SO: %s", resp_message)
+                except Exception as exc:
+                    log.warn("Couldn't notify SO of the alarm: %s", exc)
+            else:
+                log.warn("No resource_id for alarm; no SO response sent.")
+        else:
+            log.warn("Authentication failure; SO notification not sent.")
+
+
+def run(server_class=HTTPServer, handler_class=NotifierHandler, port=8662):
+    """Run the webserver application to retreive alarm notifications."""
+    try:
+        server_address = ('', port)
+        httpd = server_class(server_address, handler_class)
+        print('Starting alarm notifier...')
+        log.info("Starting alarm notifier server on port: %s", port)
+        httpd.serve_forever()
+    except Exception as exc:
+        log.warn("Failed to start webserver, %s", exc)
+
+if __name__ == "__main__":
+    from sys import argv
+
+    # Runs the webserver
+    if len(argv) == 2:
+        run(port=int(argv[1]))
+    else:
+        run()
index 8769312..70154f2 100644 (file)
@@ -134,6 +134,6 @@ class Common(object):
         except Exception:
             # Log out the result of the request for debugging purpose
             log.debug(
-                'Result: %s, %d',
+                'Result: %s, %s',
                 response.status_code, response.text)
         return response
index 557a93d..6570570 100644 (file)
@@ -52,6 +52,8 @@ class Response(object):
 class TestAlarming(unittest.TestCase):
     """Tests for alarming class functions."""
 
+    maxDiff = None
+
     def setUp(self):
         """Setup for tests."""
         super(TestAlarming, self).setUp()
@@ -219,7 +221,8 @@ class TestAlarming(unittest.TestCase):
                                    "resource_type": "generic"},
                                   "severity": "low",
                                   "state": "ok",
-                                  "type": "gnocchi_resources_threshold"})
+                                  "type": "gnocchi_resources_threshold",
+                                  "alarm_actions": ["http://localhost:8662"]})
 
     def test_check_valid_state_payload(self):
         """Test the check payload function for a valid payload with state."""
@@ -241,7 +244,8 @@ class TestAlarming(unittest.TestCase):
                                    "resource_type": "generic"},
                                   "severity": "low",
                                   "state": "alarm",
-                                  "type": "gnocchi_resources_threshold"})
+                                  "type": "gnocchi_resources_threshold",
+                                  "alarm_actions": ["http://localhost:8662"]})
 
     def test_check_invalid_payload(self):
         """Test the check payload function for an invalid payload."""
diff --git a/osm_mon/test/OpenStack/test_notifier.py b/osm_mon/test/OpenStack/test_notifier.py
new file mode 100644 (file)
index 0000000..ccebb69
--- /dev/null
@@ -0,0 +1,281 @@
+# Copyright 2017 Intel Research and Development Ireland Limited
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Intel Corporation
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
+##
+"""Tests for all common OpenStack methods."""
+
+import json
+
+import unittest
+
+from BaseHTTPServer import BaseHTTPRequestHandler
+
+from core.message_bus.producer import KafkaProducer
+
+import mock
+
+from plugins.OpenStack.Aodh.alarming import Alarming
+from plugins.OpenStack.common import Common
+from plugins.OpenStack.response import OpenStack_Response
+from plugins.OpenStack.settings import Config
+
+__author__ = "Helena McGough"
+
+# Mock data from post request
+post_data = json.dumps({"severity": "critical",
+                        "alarm_name": "my_alarm",
+                        "current": "current_state",
+                        "alarm_id": "my_alarm_id",
+                        "reason": "Threshold has been broken",
+                        "reason_data": {"count": 1,
+                                        "most_recent": "null",
+                                        "type": "threshold",
+                                        "disposition": "unknown"},
+                        "previous": "previous_state"})
+
+valid_get_resp = '{"gnocchi_resources_threshold_rule":\
+                   {"resource_id": "my_resource_id"}}'
+
+invalid_get_resp = '{"gnocchi_resources_threshold_rule":\
+                     {"resource_id": "None"}}'
+
+valid_notify_resp = '{"notify_details": {"status": "current_state",\
+                                         "severity": "critical",\
+                                         "resource_uuid": "my_resource_id",\
+                                         "alarm_uuid": "my_alarm_id",\
+                                         "vim_type": "OpenStack",\
+                                         "start_date": "dd-mm-yyyy 00:00"},\
+                      "schema_version": "1.0",\
+                      "schema_type": "notify_alarm"}'
+
+invalid_notify_resp = '{"notify_details": {"invalid":"mock_details"}'
+
+
+class Response(object):
+    """Mock a response class for generating responses."""
+
+    def __init__(self, text):
+        """Initialise a mock response with a text attribute."""
+        self.text = text
+
+
+class NotifierHandler(BaseHTTPRequestHandler):
+    """Mock the NotifierHandler class for testing purposes."""
+
+    def __init__(self, request, client_address, server):
+        """Initilase mock NotifierHandler."""
+        self.request = request
+        self.client_address = client_address
+        self.server = server
+        self.setup()
+        try:
+            self.handle()
+        finally:
+            self.finish()
+
+    def setup(self):
+        """Mock setup function."""
+        pass
+
+    def handle(self):
+        """Mock handle function."""
+        pass
+
+    def finish(self):
+        """Mock finish function."""
+        pass
+
+    def _set_headers(self):
+        """Mock getting the request headers."""
+        pass
+
+    def do_GET(self):
+        """Mock functionality for GET request."""
+        self._set_headers()
+        pass
+
+    def do_POST(self):
+        """Mock functionality for a POST request."""
+        self._set_headers()
+        self.notify_alarm(json.loads(post_data))
+
+    def notify_alarm(self, values):
+        """Mock the notify_alarm functionality to generate a valid response."""
+        config = Config.instance()
+        config.read_environ("aodh")
+        self._alarming = Alarming()
+        self._common = Common()
+        self._response = OpenStack_Response()
+        self._producer = KafkaProducer('alarm_response')
+        alarm_id = values['alarm_id']
+
+        auth_token = self._common._authenticate()
+        endpoint = self._common.get_endpoint("alarming")
+
+        # If authenticated generate and send response message
+        if (auth_token is not None and endpoint is not None):
+            url = "{}/v2/alarms/%s".format(endpoint) % alarm_id
+
+            # Get the resource_id of the triggered alarm and the date
+            result = self._common._perform_request(
+                url, auth_token, req_type="get")
+            alarm_details = json.loads(result.text)
+            gnocchi_rule = alarm_details['gnocchi_resources_threshold_rule']
+            resource_id = gnocchi_rule['resource_id']
+            a_date = "dd-mm-yyyy 00:00"
+
+            # Process an alarm notification if resource_id is valid
+            if resource_id is not None:
+                # Try generate and send response
+                try:
+                    resp_message = self._response.generate_response(
+                        'notify_alarm', a_id=alarm_id,
+                        r_id=resource_id,
+                        sev=values['severity'], date=a_date,
+                        state=values['current'], vim_type="OpenStack")
+                    self._producer.notify_alarm(
+                        'notify_alarm', resp_message, 'alarm_response')
+                except Exception:
+                    pass
+
+
+class TestNotifier(unittest.TestCase):
+    """Test the NotifierHandler class for requests from aodh."""
+
+    def setUp(self):
+        """Setup tests."""
+        super(TestNotifier, self).setUp()
+        self.handler = NotifierHandler(
+            "mock_request", "mock_address", "mock_server")
+
+    @mock.patch.object(NotifierHandler, "_set_headers")
+    def test_do_GET(self, set_head):
+        """Test do_GET, generates headers for get request."""
+        self.handler.do_GET()
+
+        set_head.assert_called_once
+
+    @mock.patch.object(NotifierHandler, "notify_alarm")
+    @mock.patch.object(NotifierHandler, "_set_headers")
+    def test_do_POST(self, set_head, notify):
+        """Test do_POST functionality for a POST request."""
+        self.handler.do_POST()
+
+        set_head.assert_called_once
+        notify.assert_called_with(json.loads(post_data))
+
+    @mock.patch.object(Common, "get_endpoint")
+    @mock.patch.object(Common, "_authenticate")
+    @mock.patch.object(Common, "_perform_request")
+    def test_notify_alarm_unauth(self, perf_req, auth, endpoint):
+        """Test notify alarm when not authenticated with keystone."""
+        # Response request will not be performed unless there is a valid
+        # auth_token and endpoint
+        # Invalid auth_token and endpoint
+        auth.return_value = None
+        endpoint.return_value = None
+        self.handler.notify_alarm(json.loads(post_data))
+
+        perf_req.assert_not_called
+
+        # Valid endpoint
+        auth.return_value = None
+        endpoint.return_value = "my_endpoint"
+        self.handler.notify_alarm(json.loads(post_data))
+
+        perf_req.assert_not_called
+
+        # Valid auth_token
+        auth.return_value = "my_auth_token"
+        endpoint.return_value = None
+        self.handler.notify_alarm(json.loads(post_data))
+
+        perf_req.assert_not_called
+
+    @mock.patch.object(Common, "get_endpoint")
+    @mock.patch.object(OpenStack_Response, "generate_response")
+    @mock.patch.object(Common, "_authenticate")
+    @mock.patch.object(Common, "_perform_request")
+    def test_notify_alarm_invalid_alarm(self, perf_req, auth, resp, endpoint):
+        """Test valid authentication, invalid alarm details."""
+        # Mock valid auth_token and endpoint
+        auth.return_value = "my_auth_token"
+        endpoint.return_value = "my_endpoint"
+        perf_req.return_value = Response(invalid_get_resp)
+
+        self.handler.notify_alarm(json.loads(post_data))
+
+        # Response is not generated
+        resp.assert_not_called
+
+    @mock.patch.object(Common, "get_endpoint")
+    @mock.patch.object(OpenStack_Response, "generate_response")
+    @mock.patch.object(Common, "_authenticate")
+    @mock.patch.object(Common, "_perform_request")
+    def test_notify_alarm_resp_call(self, perf_req, auth, response, endpoint):
+        """Test notify_alarm tries to generate a response for SO."""
+        # Mock valid auth token and endpoint, valid response from aodh
+        auth.return_value = "my_auth_token"
+        endpoint.returm_value = "my_endpoint"
+        perf_req.return_value = Response(valid_get_resp)
+        self.handler.notify_alarm(json.loads(post_data))
+
+        response.assert_called_with('notify_alarm', a_id="my_alarm_id",
+                                    r_id="my_resource_id", sev="critical",
+                                    date="dd-mm-yyyy 00:00",
+                                    state="current_state",
+                                    vim_type="OpenStack")
+
+    @mock.patch.object(Common, "get_endpoint")
+    @mock.patch.object(KafkaProducer, "notify_alarm")
+    @mock.patch.object(OpenStack_Response, "generate_response")
+    @mock.patch.object(Common, "_authenticate")
+    @mock.patch.object(Common, "_perform_request")
+    def test_notify_alarm_invalid_resp(
+            self, perf_req, auth, response, notify, endpoint):
+        """Test the notify_alarm function, sends response to the producer."""
+        # Generate return values for valid notify_alarm operation
+        auth.return_value = "my_auth_token"
+        endpoint.return_value = "my_endpoint"
+        perf_req.return_value = Response(valid_get_resp)
+        response.return_value = invalid_notify_resp
+
+        self.handler.notify_alarm(json.loads(post_data))
+
+        notify.assert_not_called
+
+    @mock.patch.object(Common, "get_endpoint")
+    @mock.patch.object(KafkaProducer, "notify_alarm")
+    @mock.patch.object(OpenStack_Response, "generate_response")
+    @mock.patch.object(Common, "_authenticate")
+    @mock.patch.object(Common, "_perform_request")
+    def test_notify_alarm_valid_resp(
+            self, perf_req, auth, response, notify, endpoint):
+        """Test the notify_alarm function, sends response to the producer."""
+        # Generate return values for valid notify_alarm operation
+        auth.return_value = "my_auth_token"
+        endpoint.return_value = "my_endpoint"
+        perf_req.return_value = Response(valid_get_resp)
+        response.return_value = valid_notify_resp
+
+        self.handler.notify_alarm(json.loads(post_data))
+
+        notify.assert_called_with(
+            "notify_alarm", valid_notify_resp, "alarm_response")