##
"""Tests for all common OpenStack methods."""
-
from __future__ import unicode_literals
import json
import logging
import mock
import requests
+from kafka import KafkaProducer
from six.moves.BaseHTTPServer import BaseHTTPRequestHandler
from six.moves.BaseHTTPServer import HTTPServer
-from osm_mon.core.message_bus.producer import KafkaProducer
from osm_mon.core.settings import Config
-from osm_mon.plugins.OpenStack.Aodh.alarming import Alarming
+from osm_mon.plugins.OpenStack.Aodh.alarm_handler import OpenstackAlarmHandler
from osm_mon.plugins.OpenStack.common import Common
-from osm_mon.plugins.OpenStack.response import OpenStack_Response
+from osm_mon.plugins.OpenStack.response import OpenStackResponseBuilder
log = logging.getLogger(__name__)
def notify_alarm(self, values):
"""Mock the notify_alarm functionality to generate a valid response."""
- config = Config.instance()
- config.read_environ()
- self._alarming = Alarming()
+ cfg = Config.instance()
+ self._alarming = OpenstackAlarmHandler()
self._common = Common()
- self._response = OpenStack_Response()
- self._producer = KafkaProducer('alarm_response')
+ self._response = OpenStackResponseBuilder()
alarm_id = values['alarm_id']
auth_token = Common.get_auth_token('test_id')
resource_id=resource_id,
sev=values['severity'], date=a_date,
state=values['current'], vim_type="OpenStack")
- self._producer.publish_alarm_response(
- 'notify_alarm', resp_message)
except Exception:
- pass
+ log.exception("Error generating response")
def get_free_port():
class AlarmNotificationTest(unittest.TestCase):
- @mock.patch.object(KafkaProducer, "publish_alarm_response")
- @mock.patch.object(OpenStack_Response, "generate_response")
+ @mock.patch.object(OpenStackResponseBuilder, "generate_response")
@mock.patch.object(Common, "perform_request")
@mock.patch.object(Common, "get_endpoint")
@mock.patch.object(Common, "get_auth_token")
- def test_post_notify_alarm(self, auth, endpoint, perf_req, resp, notify):
+ def test_post_notify_alarm(self, auth, endpoint, perf_req, resp):
"""Integration test for notify_alarm."""
url = 'http://localhost:{port}/users'.format(port=mock_server_port)
payload = {"severity": "critical",
"notify_alarm", alarm_id="my_alarm_id", resource_id="my_resource_id",
sev="critical", date='dd-mm-yyyy 00:00', state="current_state",
vim_type="OpenStack")
-
- # Response message is sent back to the SO via MON's producer
- notify.assert_called_with("notify_alarm", mock.ANY)