import unittest
import mock
-from six.moves.BaseHTTPServer import BaseHTTPRequestHandler
+from osm_mon.core.database import DatabaseManager, Alarm
from osm_mon.core.message_bus.producer import KafkaProducer
-from osm_mon.core.settings import Config
-from osm_mon.plugins.OpenStack.Aodh.alarming import Alarming
-from osm_mon.plugins.OpenStack.common import Common
-from osm_mon.plugins.OpenStack.response import OpenStack_Response
-
-# Mock data from post request
-post_data = json.dumps({"severity": "critical",
- "alarm_name": "my_alarm",
- "current": "current_state",
- "alarm_id": "my_alarm_id",
- "reason": "Threshold has been broken",
- "reason_data": {"count": 1,
- "most_recent": "null",
- "type": "threshold",
- "disposition": "unknown"},
- "previous": "previous_state"})
-
-valid_get_resp = '{"gnocchi_resources_threshold_rule":\
- {"resource_id": "my_resource_id"}}'
-
-invalid_get_resp = '{"gnocchi_resources_threshold_rule":\
- {"resource_id": null}}'
-
-valid_notify_resp = '{"notify_details": {"status": "current_state",\
- "severity": "critical",\
- "resource_uuid": "my_resource_id",\
- "alarm_uuid": "my_alarm_id",\
- "vim_type": "OpenStack",\
- "start_date": "dd-mm-yyyy 00:00"},\
- "schema_version": "1.0",\
- "schema_type": "notify_alarm"}'
-
-invalid_notify_resp = '{"notify_details": {"invalid":"mock_details"}'
+from osm_mon.plugins.OpenStack.Aodh.notifier import NotifierHandler
+
+post_data = {"severity": "critical",
+ "alarm_name": "my_alarm",
+ "current": "current_state",
+ "alarm_id": "my_alarm_id",
+ "reason": "Threshold has been broken",
+ "reason_data": {"count": 1,
+ "most_recent": "null",
+ "type": "threshold",
+ "disposition": "unknown"},
+ "previous": "previous_state"}
class Response(object):
self.text = text
-class NotifierHandler(BaseHTTPRequestHandler):
+class RFile():
+ def read(self, content_length):
+ return post_data
+
+
+class MockNotifierHandler(NotifierHandler):
"""Mock the NotifierHandler class for testing purposes."""
- def __init__(self, request, client_address, server):
- """Initilase mock NotifierHandler."""
- self.request = request
- self.client_address = client_address
- self.server = server
- self.setup()
- try:
- self.handle()
- finally:
- self.finish()
+ def __init__(self):
+ """Initialise mock NotifierHandler."""
+ self.headers = {'Content-Length': '20'}
+ self.rfile = RFile()
def setup(self):
"""Mock setup function."""
"""Mock finish function."""
pass
- def _set_headers(self):
- """Mock getting the request headers."""
- pass
-
- def do_GET(self):
- """Mock functionality for GET request."""
- self._set_headers()
- pass
-
- def do_POST(self):
- """Mock functionality for a POST request."""
- self._set_headers()
- self.notify_alarm(json.loads(post_data))
-
- def notify_alarm(self, values):
- """Mock the notify_alarm functionality to generate a valid response."""
- config = Config.instance()
- config.read_environ()
- self._alarming = Alarming()
- self._common = Common()
- self._response = OpenStack_Response()
- self._producer = KafkaProducer('alarm_response')
- alarm_id = values['alarm_id']
-
- vim_uuid = 'test_id'
-
- auth_token = Common.get_auth_token(vim_uuid)
- endpoint = Common.get_endpoint("alarming", vim_uuid)
-
- # If authenticated generate and send response message
- if auth_token is not None and endpoint is not None:
- url = "{}/v2/alarms/%s".format(endpoint) % alarm_id
-
- # Get the resource_id of the triggered alarm and the date
- result = Common.perform_request(
- url, auth_token, req_type="get")
- alarm_details = json.loads(result.text)
- gnocchi_rule = alarm_details['gnocchi_resources_threshold_rule']
- resource_id = gnocchi_rule['resource_id']
- a_date = "dd-mm-yyyy 00:00"
-
- # Process an alarm notification if resource_id is valid
- if resource_id is not None:
- # Try generate and send response
- try:
- resp_message = self._response.generate_response(
- 'notify_alarm', a_id=alarm_id,
- r_id=resource_id,
- sev=values['severity'], date=a_date,
- state=values['current'], vim_type="OpenStack")
- self._producer.notify_alarm(
- 'notify_alarm', resp_message)
- except Exception:
- pass
-
class TestNotifier(unittest.TestCase):
"""Test the NotifierHandler class for requests from aodh."""
def setUp(self):
"""Setup tests."""
super(TestNotifier, self).setUp()
- self.handler = NotifierHandler(
- "mock_request", "mock_address", "mock_server")
+ self.handler = MockNotifierHandler()
@mock.patch.object(NotifierHandler, "_set_headers")
def test_do_GET(self, set_head):
- """Test do_GET, generates headers for get request."""
+ """Tests do_GET. Validates _set_headers has been called."""
self.handler.do_GET()
set_head.assert_called_once()
@mock.patch.object(NotifierHandler, "notify_alarm")
@mock.patch.object(NotifierHandler, "_set_headers")
def test_do_POST(self, set_head, notify):
- """Test do_POST functionality for a POST request."""
+ """Tests do_POST. Validates notify_alarm has been called."""
self.handler.do_POST()
set_head.assert_called_once()
- notify.assert_called_with(json.loads(post_data))
-
- @mock.patch.object(Common, "get_endpoint")
- @mock.patch.object(Common, "get_auth_token")
- @mock.patch.object(Common, "perform_request")
- def test_notify_alarm_unauth(self, perf_req, auth, endpoint):
- """Test notify alarm when not authenticated with keystone."""
- # Response request will not be performed unless there is a valid
- # auth_token and endpoint
- # Invalid auth_token and endpoint
- auth.return_value = None
- endpoint.return_value = None
- self.handler.notify_alarm(json.loads(post_data))
-
- perf_req.assert_not_called()
-
- # Valid endpoint
- auth.return_value = None
- endpoint.return_value = "my_endpoint"
- self.handler.notify_alarm(json.loads(post_data))
-
- perf_req.assert_not_called()
-
- # Valid auth_token
- auth.return_value = "my_auth_token"
- endpoint.return_value = None
- self.handler.notify_alarm(json.loads(post_data))
-
- perf_req.assert_not_called()
-
- @mock.patch.object(Common, "get_endpoint")
- @mock.patch.object(OpenStack_Response, "generate_response")
- @mock.patch.object(Common, "get_auth_token")
- @mock.patch.object(Common, "perform_request")
- def test_notify_alarm_invalid_alarm(self, perf_req, auth, resp, endpoint):
- """Test valid authentication, invalid alarm details."""
- # Mock valid auth_token and endpoint
- auth.return_value = "my_auth_token"
- endpoint.return_value = "my_endpoint"
- perf_req.return_value = Response(invalid_get_resp)
-
- self.handler.notify_alarm(json.loads(post_data))
-
- # Response is not generated
- resp.assert_not_called()
+ notify.assert_called_with(json.dumps(post_data))
@mock.patch.object(KafkaProducer, "notify_alarm")
- @mock.patch.object(Common, "get_endpoint")
- @mock.patch.object(OpenStack_Response, "generate_response")
- @mock.patch.object(Common, "get_auth_token")
- @mock.patch.object(Common, "perform_request")
- def test_notify_alarm_resp_call(self, perf_req, auth, response, endpoint, notify):
- """Test notify_alarm tries to generate a response for SO."""
- # Mock valid auth token and endpoint, valid response from aodh
- auth.return_value = "my_auth_token"
- endpoint.returm_value = "my_endpoint"
- perf_req.return_value = Response(valid_get_resp)
- self.handler.notify_alarm(json.loads(post_data))
-
- notify.assert_called()
- response.assert_called_with('notify_alarm', a_id="my_alarm_id",
- r_id="my_resource_id", sev="critical",
- date="dd-mm-yyyy 00:00",
- state="current_state",
- vim_type="OpenStack")
-
- @mock.patch.object(Common, "get_endpoint")
- @mock.patch.object(KafkaProducer, "notify_alarm")
- @mock.patch.object(OpenStack_Response, "generate_response")
- @mock.patch.object(Common, "get_auth_token")
- @mock.patch.object(Common, "perform_request")
- @unittest.skip("Schema validation not implemented yet.")
- def test_notify_alarm_invalid_resp(
- self, perf_req, auth, response, notify, endpoint):
- """Test the notify_alarm function, sends response to the producer."""
+ @mock.patch.object(DatabaseManager, "get_alarm")
+ def test_notify_alarm_valid_alarm(
+ self, get_alarm, notify):
+ """
+ Tests notify_alarm when request from OpenStack references an existing alarm in the DB.
+ Validates KafkaProducer.notify_alarm has been called.
+ """
# Generate return values for valid notify_alarm operation
- auth.return_value = "my_auth_token"
- endpoint.return_value = "my_endpoint"
- perf_req.return_value = Response(valid_get_resp)
- response.return_value = invalid_notify_resp
+ mock_alarm = Alarm()
+ get_alarm.return_value = mock_alarm
- self.handler.notify_alarm(json.loads(post_data))
+ self.handler.notify_alarm(post_data)
- notify.assert_not_called()
+ notify.assert_called_with("notify_alarm", mock.ANY)
- @mock.patch.object(Common, "get_endpoint")
@mock.patch.object(KafkaProducer, "notify_alarm")
- @mock.patch.object(OpenStack_Response, "generate_response")
- @mock.patch.object(Common, "get_auth_token")
- @mock.patch.object(Common, "perform_request")
- def test_notify_alarm_valid_resp(
- self, perf_req, auth, response, notify, endpoint):
- """Test the notify_alarm function, sends response to the producer."""
+ @mock.patch.object(DatabaseManager, "get_alarm")
+ def test_notify_alarm_invalid_alarm(
+ self, get_alarm, notify):
+ """
+ Tests notify_alarm when request from OpenStack references a non existing alarm in the DB.
+ Validates Exception is thrown and KafkaProducer.notify_alarm has not been called.
+ """
# Generate return values for valid notify_alarm operation
- auth.return_value = "my_auth_token"
- endpoint.return_value = "my_endpoint"
- perf_req.return_value = Response(valid_get_resp)
- response.return_value = valid_notify_resp
-
- self.handler.notify_alarm(json.loads(post_data))
+ get_alarm.return_value = None
- notify.assert_called_with(
- "notify_alarm", valid_notify_resp)
+ with self.assertRaises(Exception):
+ self.handler.notify_alarm(post_data)
+ notify.assert_not_called()