from kafka import KafkaProducer
from kafka.errors import KafkaError
+from osm_mon.core.auth import AuthManager
+from osm_mon.core.database import DatabaseManager
from osm_mon.core.message_bus.producer import KafkaProducer as prod
from osm_mon.plugins.OpenStack import response
from osm_mon.plugins.OpenStack.Aodh import alarming
from osm_mon.plugins.OpenStack.common import Common
+from keystoneclient.v3 import client
log = logging.getLogger(__name__)
class AlarmIntegrationTest(unittest.TestCase):
def setUp(self):
- # Set up common and alarming class instances
- self.alarms = alarming.Alarming()
- self.openstack_auth = Common()
-
try:
self.producer = KafkaProducer(bootstrap_servers='localhost:9092')
self.req_consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
- group_id='osm_mon',
- consumer_timeout_ms=2000)
+ auto_offset_reset='earliest',
+ consumer_timeout_ms=60000)
self.req_consumer.subscribe(['alarm_request'])
except KafkaError:
self.skipTest('Kafka server not present.')
+ # Set up common and alarming class instances
+ self.alarms = alarming.Alarming()
+ self.openstack_auth = Common()
+ @mock.patch.object(Common, "get_auth_token", mock.Mock())
+ @mock.patch.object(Common, "get_endpoint", mock.Mock())
@mock.patch.object(prod, "update_alarm_response")
@mock.patch.object(alarming.Alarming, "update_alarm")
@mock.patch.object(response.OpenStack_Response, "generate_response")
"""Test Aodh update alarm request message from KafkaProducer."""
# Set-up message, producer and consumer for tests
payload = {"vim_type": "OpenSTACK",
+ "vim_uuid": "test_id",
"alarm_update_request":
{"correlation_id": 123,
"alarm_uuid": "alarm_id",
for message in self.req_consumer:
# Check the vim desired by the message
- vim_type = json.loads(message.value)["vim_type"].lower()
- if vim_type == "openstack":
+ if message.key == "update_alarm_request":
# Mock a valid alarm update
update_alarm.return_value = "alarm_id", True
- self.alarms.alarming(message, self.openstack_auth, None)
+ self.alarms.alarming(message)
# A response message is generated and sent via MON's producer
resp.assert_called_with(
return
self.fail("No message received in consumer")
+ @mock.patch.object(DatabaseManager, "save_alarm", mock.Mock())
+ @mock.patch.object(Common, "get_auth_token", mock.Mock())
+ @mock.patch.object(Common, "get_endpoint", mock.Mock())
@mock.patch.object(prod, "create_alarm_response")
@mock.patch.object(alarming.Alarming, "configure_alarm")
@mock.patch.object(response.OpenStack_Response, "generate_response")
"""Test Aodh create alarm request message from KafkaProducer."""
# Set-up message, producer and consumer for tests
payload = {"vim_type": "OpenSTACK",
+ "vim_uuid": "test_id",
"alarm_create_request":
{"correlation_id": 123,
"alarm_name": "my_alarm",
for message in self.req_consumer:
# Check the vim desired by the message
- vim_type = json.loads(message.value)["vim_type"].lower()
- if vim_type == "openstack":
+ if message.key == "create_alarm_request":
# Mock a valid alarm creation
config_alarm.return_value = "alarm_id", True
- self.alarms.alarming(message, self.openstack_auth, None)
+ self.alarms.alarming(message)
# A response message is generated and sent via MON's produce
resp.assert_called_with(
return
self.fail("No message received in consumer")
+ @mock.patch.object(Common, "get_auth_token", mock.Mock())
+ @mock.patch.object(Common, "get_endpoint", mock.Mock())
@mock.patch.object(prod, "list_alarm_response")
@mock.patch.object(alarming.Alarming, "list_alarms")
@mock.patch.object(response.OpenStack_Response, "generate_response")
"""Test Aodh list alarm request message from KafkaProducer."""
# Set-up message, producer and consumer for tests
payload = {"vim_type": "OpenSTACK",
+ "vim_uuid": "test_id",
"alarm_list_request":
{"correlation_id": 123,
"resource_uuid": "resource_id", }}
for message in self.req_consumer:
# Check the vim desired by the message
- vim_type = json.loads(message.value)["vim_type"].lower()
- if vim_type == "openstack":
+ if message.key == "list_alarm_request":
# Mock an empty list generated by the request
list_alarm.return_value = []
- self.alarms.alarming(message, self.openstack_auth, None)
+ self.alarms.alarming(message)
- # Resoonse message is generated
+ # Response message is generated
resp.assert_called_with(
'list_alarm_response', alarm_list=[],
cor_id=123)
return
self.fail("No message received in consumer")
+ @mock.patch.object(Common, "get_auth_token", mock.Mock())
+ @mock.patch.object(Common, "get_endpoint", mock.Mock())
@mock.patch.object(alarming.Alarming, "delete_alarm")
@mock.patch.object(prod, "delete_alarm_response")
@mock.patch.object(response.OpenStack_Response, "generate_response")
"""Test Aodh delete alarm request message from KafkaProducer."""
# Set-up message, producer and consumer for tests
payload = {"vim_type": "OpenSTACK",
+ "vim_uuid": "test_id",
"alarm_delete_request":
{"correlation_id": 123,
"alarm_uuid": "alarm_id", }}
for message in self.req_consumer:
# Check the vim desired by the message
- vim_type = json.loads(message.value)["vim_type"].lower()
- if vim_type == "openstack":
- self.alarms.alarming(message, self.openstack_auth, None)
+ if message.key == "delete_alarm_request":
+ self.alarms.alarming(message)
# Response message is generated and sent by MON's producer
resp.assert_called_with(
return
self.fail("No message received in consumer")
+ @mock.patch.object(Common, "get_auth_token", mock.Mock())
+ @mock.patch.object(Common, "get_endpoint", mock.Mock())
@mock.patch.object(alarming.Alarming, "update_alarm_state")
def test_ack_alarm_req(self, ack_alarm):
"""Test Aodh acknowledge alarm request message from KafkaProducer."""
# Set-up message, producer and consumer for tests
payload = {"vim_type": "OpenSTACK",
+ "vim_uuid": "test_id",
"ack_details":
{"alarm_uuid": "alarm_id", }}
for message in self.req_consumer:
# Check the vim desired by the message
- vim_type = json.loads(message.value)["vim_type"].lower()
- if vim_type == "openstack":
- self.alarms.alarming(message, self.openstack_auth, None)
- # No response message is sent for and ack request
- # Alarm state is updated from alarm -> ok
- ack_alarm.assert_called_with(None, None, "alarm_id")
+ if message.key == "acknowledge_alarm":
+ self.alarms.alarming(message)
return
- self.fail("No message received in consumer")
+
+ self.fail("No message received in consumer")
\ No newline at end of file