1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
22 """Tests for all common OpenStack methods."""
29 from osm_mon
.core
.database
import DatabaseManager
, Alarm
30 from osm_mon
.core
.message_bus
.producer
import KafkaProducer
31 from osm_mon
.plugins
.OpenStack
.Aodh
.notifier
import NotifierHandler
33 post_data
= {"severity": "critical",
34 "alarm_name": "my_alarm",
35 "current": "current_state",
36 "alarm_id": "my_alarm_id",
37 "reason": "Threshold has been broken",
38 "reason_data": {"count": 1,
39 "most_recent": "null",
41 "disposition": "unknown"},
42 "previous": "previous_state"}
45 class Response(object):
46 """Mock a response class for generating responses."""
48 def __init__(self
, text
):
49 """Initialise a mock response with a text attribute."""
54 def read(self
, content_length
):
55 return json
.dumps(post_data
, sort_keys
=True)
58 class MockNotifierHandler(NotifierHandler
):
59 """Mock the NotifierHandler class for testing purposes."""
62 """Initialise mock NotifierHandler."""
63 self
.headers
= {'Content-Length': '20'}
67 """Mock setup function."""
71 """Mock handle function."""
75 """Mock finish function."""
79 class TestNotifier(unittest
.TestCase
):
80 """Test the NotifierHandler class for requests from aodh."""
84 super(TestNotifier
, self
).setUp()
85 self
.handler
= MockNotifierHandler()
87 @mock.patch
.object(NotifierHandler
, "_set_headers")
88 def test_do_GET(self
, set_head
):
89 """Tests do_GET. Validates _set_headers has been called."""
92 set_head
.assert_called_once()
94 @mock.patch
.object(NotifierHandler
, "notify_alarm")
95 @mock.patch
.object(NotifierHandler
, "_set_headers")
96 def test_do_POST(self
, set_head
, notify
):
97 """Tests do_POST. Validates notify_alarm has been called."""
98 self
.handler
.do_POST()
100 set_head
.assert_called_once()
101 notify
.assert_called_with(post_data
)
103 @mock.patch
.object(KafkaProducer
, "publish_alarm_response")
104 @mock.patch
.object(DatabaseManager
, "get_alarm")
105 def test_notify_alarm_valid_alarm(
106 self
, get_alarm
, notify
):
108 Tests notify_alarm when request from OpenStack references an existing alarm in the DB.
109 Validates KafkaProducer.notify_alarm has been called.
111 # Generate return values for valid notify_alarm operation
113 get_alarm
.return_value
= mock_alarm
115 self
.handler
.notify_alarm(post_data
)
117 notify
.assert_called_with("notify_alarm", mock
.ANY
)
119 @mock.patch
.object(KafkaProducer
, "publish_alarm_response")
120 @mock.patch
.object(DatabaseManager
, "get_alarm")
121 def test_notify_alarm_invalid_alarm(
122 self
, get_alarm
, notify
):
124 Tests notify_alarm when request from OpenStack references a non existing alarm in the DB.
125 Validates Exception is thrown and KafkaProducer.notify_alarm has not been called.
127 # Generate return values for valid notify_alarm operation
128 get_alarm
.return_value
= None
130 with self
.assertRaises(Exception):
131 self
.handler
.notify_alarm(post_data
)
132 notify
.assert_not_called()