import logging
import socket
+import unittest
from BaseHTTPServer import BaseHTTPRequestHandler
from BaseHTTPServer import HTTPServer
def do_GET(self):
"""Mock functionality for GET request."""
-# self.send_response(requests.codes.ok)
+ # self.send_response(requests.codes.ok)
self._set_headers()
pass
assert response.ok
-@mock.patch.object(KafkaProducer, "notify_alarm")
-@mock.patch.object(OpenStack_Response, "generate_response")
-@mock.patch.object(Common, "_perform_request")
-@mock.patch.object(Common, "get_endpoint")
-@mock.patch.object(Common, "_authenticate")
-def test_post_notify_alarm(auth, endpoint, perf_req, resp, notify):
- """Integration test for notify_alarm."""
- url = 'http://localhost:{port}/users'.format(port=mock_server_port)
- payload = {"severity": "critical",
- "alarm_name": "my_alarm",
- "current": "current_state",
- "alarm_id": "my_alarm_id",
- "reason": "Threshold has been broken",
- "reason_data": {"count": 1,
- "most_recent": "null",
- "type": "threshold",
- "disposition": "unknown"},
- "previous": "previous_state"}
-
- # Mock authenticate and request response for testing
- auth.return_value = "my_auth_token"
- endpoint.return_value = "my_endpoint"
- perf_req.return_value = MockResponse(valid_get_resp)
-
- # Generate a post reqest for testing
- requests.post(url, json.dumps(payload))
-
- # A response message is generated with the following details
- resp.assert_called_with(
- "notify_alarm", a_id="my_alarm_id", r_id="my_resource_id",
- sev="critical", date='dd-mm-yyyy 00:00', state="current_state",
- vim_type="OpenStack")
-
- # Reponse message is sent back to the SO via MON's producer
- notify.assert_called_with("notify_alarm", mock.ANY, "alarm_response")
+class AlarmNotificationTest(unittest.TestCase):
+ @mock.patch.object(KafkaProducer, "notify_alarm")
+ @mock.patch.object(OpenStack_Response, "generate_response")
+ @mock.patch.object(Common, "_perform_request")
+ @mock.patch.object(Common, "get_endpoint")
+ @mock.patch.object(Common, "_authenticate")
+ def test_post_notify_alarm(self, auth, endpoint, perf_req, resp, notify):
+ """Integration test for notify_alarm."""
+ url = 'http://localhost:{port}/users'.format(port=mock_server_port)
+ payload = {"severity": "critical",
+ "alarm_name": "my_alarm",
+ "current": "current_state",
+ "alarm_id": "my_alarm_id",
+ "reason": "Threshold has been broken",
+ "reason_data": {"count": 1,
+ "most_recent": "null",
+ "type": "threshold",
+ "disposition": "unknown"},
+ "previous": "previous_state"}
+
+ # Mock authenticate and request response for testing
+ auth.return_value = "my_auth_token"
+ endpoint.return_value = "my_endpoint"
+ perf_req.return_value = MockResponse(valid_get_resp)
+
+ # Generate a post reqest for testing
+ requests.post(url, json.dumps(payload))
+
+ # A response message is generated with the following details
+ resp.assert_called_with(
+ "notify_alarm", a_id="my_alarm_id", r_id="my_resource_id",
+ sev="critical", date='dd-mm-yyyy 00:00', state="current_state",
+ vim_type="OpenStack")
+
+ # Reponse message is sent back to the SO via MON's producer
+ notify.assert_called_with("notify_alarm", mock.ANY, "alarm_response")