def test_update_alarm_req(self, resp, update_alarm, update_resp, get_creds):
"""Test Aodh update alarm request message from KafkaProducer."""
# Set-up message, producer and consumer for tests
- payload = {"vim_type": "OpenSTACK",
- "vim_uuid": "test_id",
- "alarm_update_request":
+ payload = {"alarm_update_request":
{"correlation_id": 123,
"alarm_uuid": "alarm_id",
"metric_uuid": "metric_id"}}
value=json.dumps(payload))
for message in self.req_consumer:
- # Check the vim desired by the message
if message.key == "update_alarm_request":
# Mock a valid alarm update
update_alarm.return_value = "alarm_id", True
- self.alarms.alarming(message)
+ self.alarms.alarming(message, 'test_id')
# A response message is generated and sent via MON's producer
resp.assert_called_with(
def test_create_alarm_req(self, resp, config_alarm, create_resp, get_creds):
"""Test Aodh create alarm request message from KafkaProducer."""
# Set-up message, producer and consumer for tests
- payload = {"vim_type": "OpenSTACK",
- "vim_uuid": "test_id",
- "alarm_create_request":
+ payload = {"alarm_create_request":
{"correlation_id": 123,
"alarm_name": "my_alarm",
"metric_name": "my_metric",
value=json.dumps(payload))
for message in self.req_consumer:
- # Check the vim desired by the message
if message.key == "create_alarm_request":
# Mock a valid alarm creation
config_alarm.return_value = "alarm_id", True
- self.alarms.alarming(message)
+ self.alarms.alarming(message, 'test_id')
# A response message is generated and sent via MON's produce
resp.assert_called_with(
def test_list_alarm_req(self, resp, list_alarm, list_resp, get_creds):
"""Test Aodh list alarm request message from KafkaProducer."""
# Set-up message, producer and consumer for tests
- payload = {"vim_type": "OpenSTACK",
- "vim_uuid": "test_id",
- "alarm_list_request":
+ payload = {"alarm_list_request":
{"correlation_id": 123,
"resource_uuid": "resource_id", }}
get_creds.return_value = mock_creds
for message in self.req_consumer:
- # Check the vim desired by the message
if message.key == "list_alarm_request":
# Mock an empty list generated by the request
list_alarm.return_value = []
- self.alarms.alarming(message)
+ self.alarms.alarming(message, 'test_id')
# Response message is generated
resp.assert_called_with(
def test_delete_alarm_req(self, resp, del_resp, del_alarm, get_creds):
"""Test Aodh delete alarm request message from KafkaProducer."""
# Set-up message, producer and consumer for tests
- payload = {"vim_type": "OpenSTACK",
- "vim_uuid": "test_id",
- "alarm_delete_request":
+ payload = {"alarm_delete_request":
{"correlation_id": 123,
"alarm_uuid": "alarm_id", }}
get_creds.return_value = mock_creds
for message in self.req_consumer:
- # Check the vim desired by the message
if message.key == "delete_alarm_request":
- self.alarms.alarming(message)
+ self.alarms.alarming(message, 'test_id')
# Response message is generated and sent by MON's producer
resp.assert_called_with(
def test_ack_alarm_req(self, ack_alarm, get_creds):
"""Test Aodh acknowledge alarm request message from KafkaProducer."""
# Set-up message, producer and consumer for tests
- payload = {"vim_type": "OpenSTACK",
- "vim_uuid": "test_id",
- "ack_details":
+ payload = {"ack_details":
{"alarm_uuid": "alarm_id", }}
self.producer.send('alarm_request', key="acknowledge_alarm",
get_creds.return_value = mock_creds
for message in self.req_consumer:
- # Check the vim desired by the message
if message.key == "acknowledge_alarm":
- self.alarms.alarming(message)
+ self.alarms.alarming(message, 'test_id')
return
self.fail("No message received in consumer")