from six.moves.BaseHTTPServer import BaseHTTPRequestHandler
from six.moves.BaseHTTPServer import HTTPServer
from six.moves.BaseHTTPServer import BaseHTTPRequestHandler
from six.moves.BaseHTTPServer import HTTPServer
def notify_alarm(self, values):
"""Mock the notify_alarm functionality to generate a valid response."""
def notify_alarm(self, values):
"""Mock the notify_alarm functionality to generate a valid response."""
sev=values['severity'], date=a_date,
state=values['current'], vim_type="OpenStack")
sev=values['severity'], date=a_date,
state=values['current'], vim_type="OpenStack")
- @mock.patch.object(KafkaProducer, "publish_alarm_response")
- @mock.patch.object(OpenStack_Response, "generate_response")
+ @mock.patch.object(OpenStackResponseBuilder, "generate_response")
@mock.patch.object(Common, "perform_request")
@mock.patch.object(Common, "get_endpoint")
@mock.patch.object(Common, "get_auth_token")
@mock.patch.object(Common, "perform_request")
@mock.patch.object(Common, "get_endpoint")
@mock.patch.object(Common, "get_auth_token")
- def test_post_notify_alarm(self, auth, endpoint, perf_req, resp, notify):
+ def test_post_notify_alarm(self, auth, endpoint, perf_req, resp):
"""Integration test for notify_alarm."""
url = 'http://localhost:{port}/users'.format(port=mock_server_port)
payload = {"severity": "critical",
"""Integration test for notify_alarm."""
url = 'http://localhost:{port}/users'.format(port=mock_server_port)
payload = {"severity": "critical",
self.assertEqual(response.status_code, 200)
# A response message is generated with the following details
resp.assert_called_with(
self.assertEqual(response.status_code, 200)
# A response message is generated with the following details
resp.assert_called_with(
- "notify_alarm", a_id="my_alarm_id", r_id="my_resource_id",
+ "notify_alarm", alarm_id="my_alarm_id", resource_id="my_resource_id",
sev="critical", date='dd-mm-yyyy 00:00', state="current_state",
vim_type="OpenStack")
sev="critical", date='dd-mm-yyyy 00:00', state="current_state",
vim_type="OpenStack")