Added a Common KafkaConsumer for all of the plugins
[osm/MON.git] / test / OpenStack / test_alarm_req.py
1 # Copyright 2017 iIntel Research and Development Ireland Limited
2 # **************************************************************
3
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
6
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10
11 # http://www.apache.org/licenses/LICENSE-2.0
12
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
21 ##
22 """Tests for all alarm request message keys."""
23
24 import json
25
26 import logging
27
28 import unittest
29
30 import mock
31
32 from plugins.OpenStack.Aodh import alarming as alarm_req
33 from plugins.OpenStack.common import Common
34
35 __author__ = "Helena McGough"
36
37 log = logging.getLogger(__name__)
38
39
40 class Message(object):
41 """A class to mock a message object value for alarm requests."""
42
43 def __init__(self):
44 """Initialize a mocked message instance."""
45 self.topic = "alarm_request"
46 self.key = None
47 self.value = json.dumps({"mock_value": "mock_details"})
48
49
50 class TestAlarmKeys(unittest.TestCase):
51 """Integration test for alarm request keys."""
52
53 def setUp(self):
54 """Setup the tests for alarm request keys."""
55 super(TestAlarmKeys, self).setUp()
56 self.alarming = alarm_req.Alarming()
57 self.alarming.common = Common()
58
59 @mock.patch.object(Common, "_authenticate")
60 def test_alarming_env_authentication(self, auth):
61 """Test getting an auth_token and endpoint for alarm requests."""
62 # if auth_token is None environment variables are used to authenticare
63 message = Message()
64
65 self.alarming.alarming(message, self.alarming.common, None)
66
67 auth.assert_called_with()
68
69 @mock.patch.object(Common, "_authenticate")
70 def test_acccess_cred_auth(self, auth):
71 """Test receiving auth_token from access creds."""
72 message = Message()
73
74 self.alarming.alarming(message, self.alarming.common, "my_auth_token")
75
76 auth.assert_not_called
77 self.assertEqual(self.alarming.auth_token, "my_auth_token")
78
79 @mock.patch.object(alarm_req.Alarming, "delete_alarm")
80 def test_delete_alarm_key(self, del_alarm):
81 """Test the functionality for a create alarm request."""
82 # Mock a message value and key
83 message = Message()
84 message.key = "delete_alarm_request"
85 message.value = json.dumps({"alarm_delete_request":
86 {"alarm_uuid": "my_alarm_id"}})
87
88 # Call the alarming functionality and check delete request
89 self.alarming.alarming(message, self.alarming.common, "my_auth_token")
90
91 del_alarm.assert_called_with(mock.ANY, mock.ANY, "my_alarm_id")
92
93 @mock.patch.object(alarm_req.Alarming, "list_alarms")
94 def test_list_alarm_key(self, list_alarm):
95 """Test the functionality for a list alarm request."""
96 # Mock a message with list alarm key and value
97 message = Message()
98 message.key = "list_alarm_request"
99 message.value = json.dumps({"alarm_list_request": "my_alarm_details"})
100
101 # Call the alarming functionality and check list functionality
102 self.alarming.alarming(message, self.alarming.common, "my_auth_token")
103 list_alarm.assert_called_with(mock.ANY, mock.ANY, "my_alarm_details")
104
105 @mock.patch.object(alarm_req.Alarming, "update_alarm_state")
106 def test_ack_alarm_key(self, ack_alarm):
107 """Test the functionality for an acknowledge alarm request."""
108 # Mock a message with acknowledge alarm key and value
109 message = Message()
110 message.key = "acknowledge_alarm"
111 message.value = json.dumps({"ack_details":
112 {"alarm_uuid": "my_alarm_id"}})
113
114 # Call alarming functionality and check acknowledge functionality
115 self.alarming.alarming(message, self.alarming.common, "my_auth_token")
116 ack_alarm.assert_called_with(mock.ANY, mock.ANY, "my_alarm_id")
117
118 @mock.patch.object(alarm_req.Alarming, "configure_alarm")
119 def test_config_alarm_key(self, config_alarm):
120 """Test the functionality for a create alarm request."""
121 # Mock a message with config alarm key and value
122 message = Message()
123 message.key = "create_alarm_request"
124 message.value = json.dumps({"alarm_create_request": "alarm_details"})
125
126 # Call alarming functionality and check config alarm call
127 config_alarm.return_value = "my_alarm_id", True
128 self.alarming.alarming(message, self.alarming.common, "my_auth_token")
129 config_alarm.assert_called_with(mock.ANY, mock.ANY, "alarm_details")