- notify.assert_called_with(json.loads(post_data))
-
- @mock.patch.object(Common, "get_endpoint")
- @mock.patch.object(Common, "get_auth_token")
- @mock.patch.object(Common, "perform_request")
- def test_notify_alarm_unauth(self, perf_req, auth, endpoint):
- """Test notify alarm when not authenticated with keystone."""
- # Response request will not be performed unless there is a valid
- # auth_token and endpoint
- # Invalid auth_token and endpoint
- auth.return_value = None
- endpoint.return_value = None
- self.handler.notify_alarm(json.loads(post_data))
-
- perf_req.assert_not_called()
-
- # Valid endpoint
- auth.return_value = None
- endpoint.return_value = "my_endpoint"
- self.handler.notify_alarm(json.loads(post_data))
-
- perf_req.assert_not_called()
-
- # Valid auth_token
- auth.return_value = "my_auth_token"
- endpoint.return_value = None
- self.handler.notify_alarm(json.loads(post_data))
-
- perf_req.assert_not_called()
-
- @mock.patch.object(Common, "get_endpoint")
- @mock.patch.object(OpenStack_Response, "generate_response")
- @mock.patch.object(Common, "get_auth_token")
- @mock.patch.object(Common, "perform_request")
- def test_notify_alarm_invalid_alarm(self, perf_req, auth, resp, endpoint):
- """Test valid authentication, invalid alarm details."""
- # Mock valid auth_token and endpoint
- auth.return_value = "my_auth_token"
- endpoint.return_value = "my_endpoint"
- perf_req.return_value = Response(invalid_get_resp)
-
- self.handler.notify_alarm(json.loads(post_data))
-
- # Response is not generated
- resp.assert_not_called()
-
- @mock.patch.object(KafkaProducer, "notify_alarm")
- @mock.patch.object(Common, "get_endpoint")
- @mock.patch.object(OpenStack_Response, "generate_response")
- @mock.patch.object(Common, "get_auth_token")
- @mock.patch.object(Common, "perform_request")
- def test_notify_alarm_resp_call(self, perf_req, auth, response, endpoint, notify):
- """Test notify_alarm tries to generate a response for SO."""
- # Mock valid auth token and endpoint, valid response from aodh
- auth.return_value = "my_auth_token"
- endpoint.returm_value = "my_endpoint"
- perf_req.return_value = Response(valid_get_resp)
- self.handler.notify_alarm(json.loads(post_data))
-
- notify.assert_called()
- response.assert_called_with('notify_alarm', a_id="my_alarm_id",
- r_id="my_resource_id", sev="critical",
- date="dd-mm-yyyy 00:00",
- state="current_state",
- vim_type="OpenStack")
-
- @mock.patch.object(Common, "get_endpoint")
- @mock.patch.object(KafkaProducer, "notify_alarm")
- @mock.patch.object(OpenStack_Response, "generate_response")
- @mock.patch.object(Common, "get_auth_token")
- @mock.patch.object(Common, "perform_request")
- @unittest.skip("Schema validation not implemented yet.")
- def test_notify_alarm_invalid_resp(
- self, perf_req, auth, response, notify, endpoint):
- """Test the notify_alarm function, sends response to the producer."""