from osm_mon.core.auth import AuthManager
from osm_mon.core.database import DatabaseManager, VimCredentials
-from osm_mon.core.message_bus.producer import KafkaProducer as Producer
from osm_mon.plugins.OpenStack import response
-from osm_mon.plugins.OpenStack.Aodh import alarming
+from osm_mon.plugins.OpenStack.Aodh import alarm_handler
from osm_mon.plugins.OpenStack.common import Common
log = logging.getLogger(__name__)
mock_creds.config = '{}'
-@mock.patch.object(Producer, "publish_alarm_request", mock.Mock())
@mock.patch.object(DatabaseManager, "save_alarm", mock.Mock())
@mock.patch.object(Common, "get_auth_token", mock.Mock())
@mock.patch.object(Common, "get_endpoint", mock.Mock())
except KafkaError:
self.skipTest('Kafka server not present.')
# Set up common and alarming class instances
- self.alarms = alarming.Alarming()
+ self.alarms = alarm_handler.OpenstackAlarmHandler()
self.openstack_auth = Common()
def tearDown(self):
@mock.patch.object(Common, "perform_request")
@mock.patch.object(AuthManager, 'get_credentials')
- @mock.patch.object(alarming.Alarming, "update_alarm")
- @mock.patch.object(response.OpenStack_Response, "generate_response")
+ @mock.patch.object(alarm_handler.OpenstackAlarmHandler, "update_alarm")
+ @mock.patch.object(response.OpenStackResponseBuilder, "generate_response")
def test_update_alarm_req(self, resp, update_alarm, get_creds, perf_req):
"""Test Aodh update alarm request message from KafkaProducer."""
# Set-up message, producer and consumer for tests
if message.key == "update_alarm_request":
# Mock a valid alarm update
update_alarm.return_value = "alarm_id"
- self.alarms.alarming(message, 'test_id')
+ self.alarms.handle_message(message, 'test_id')
# A response message is generated and sent via MON's producer
resp.assert_called_with(
@mock.patch.object(Common, "perform_request")
@mock.patch.object(AuthManager, 'get_credentials')
- @mock.patch.object(alarming.Alarming, "configure_alarm")
- @mock.patch.object(response.OpenStack_Response, "generate_response")
+ @mock.patch.object(alarm_handler.OpenstackAlarmHandler, "configure_alarm")
+ @mock.patch.object(response.OpenStackResponseBuilder, "generate_response")
def test_create_alarm_req(self, resp, config_alarm, get_creds, perf_req):
"""Test Aodh create alarm request message from KafkaProducer."""
# Set-up message, producer and consumer for tests
if message.key == "create_alarm_request":
# Mock a valid alarm creation
config_alarm.return_value = "alarm_id"
- self.alarms.alarming(message, 'test_id')
+ self.alarms.handle_message(message, 'test_id')
# A response message is generated and sent via MON's produce
resp.assert_called_with(
@mock.patch.object(Common, "perform_request")
@mock.patch.object(AuthManager, 'get_credentials')
- @mock.patch.object(alarming.Alarming, "list_alarms")
- @mock.patch.object(response.OpenStack_Response, "generate_response")
+ @mock.patch.object(alarm_handler.OpenstackAlarmHandler, "list_alarms")
+ @mock.patch.object(response.OpenStackResponseBuilder, "generate_response")
def test_list_alarm_req(self, resp, list_alarm, get_creds, perf_req):
"""Test Aodh list alarm request message from KafkaProducer."""
# Set-up message, producer and consumer for tests
if message.key == "list_alarm_request":
# Mock an empty list generated by the request
list_alarm.return_value = []
- self.alarms.alarming(message, 'test_id')
+ self.alarms.handle_message(message, 'test_id')
# Response message is generated
resp.assert_called_with(
@mock.patch.object(Common, "perform_request")
@mock.patch.object(AuthManager, 'get_credentials')
- @mock.patch.object(alarming.Alarming, "delete_alarm")
- @mock.patch.object(response.OpenStack_Response, "generate_response")
+ @mock.patch.object(alarm_handler.OpenstackAlarmHandler, "delete_alarm")
+ @mock.patch.object(response.OpenStackResponseBuilder, "generate_response")
def test_delete_alarm_req(self, resp, del_alarm, get_creds, perf_req):
"""Test Aodh delete alarm request message from KafkaProducer."""
# Set-up message, producer and consumer for tests
for message in self.req_consumer:
if message.key == "delete_alarm_request":
- self.alarms.alarming(message, 'test_id')
+ self.alarms.handle_message(message, 'test_id')
# Response message is generated and sent by MON's producer
resp.assert_called_with(
self.fail("No message received in consumer")
@mock.patch.object(AuthManager, 'get_credentials')
- @mock.patch.object(alarming.Alarming, "update_alarm_state")
+ @mock.patch.object(alarm_handler.OpenstackAlarmHandler, "update_alarm_state")
def test_ack_alarm_req(self, ack_alarm, get_creds):
"""Test Aodh acknowledge alarm request message from KafkaProducer."""
# Set-up message, producer and consumer for tests
for message in self.req_consumer:
if message.key == "acknowledge_alarm":
- self.alarms.alarming(message, 'test_id')
+ self.alarms.handle_message(message, 'test_id')
ack_alarm.assert_called_with(mock.ANY, mock.ANY, 'alarm_id', True)
return