import pytest
from kafka import KafkaConsumer, KafkaProducer
-def test_end_to_end(kafka_broker):
- connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
- producer = KafkaProducer(bootstrap_servers=connect_str,
+def test_end_to_end():
+ producer = KafkaProducer(bootstrap_servers='localhost:9092',
retries=5,
max_block_ms=10000,
value_serializer=str.encode)
- consumer = KafkaConsumer(bootstrap_servers=connect_str,
+ consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
group_id=None,
consumer_timeout_ms=10000,
auto_offset_reset='earliest',
--- /dev/null
+# Copyright 2017 Intel Research and Development Ireland Limited
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Intel Corporation
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
+
+# __author__ = "Helena McGough"
+"""Test an end to end Openstack access_credentials requests."""
+
+import json
+
+import logging
+
+from kafka import KafkaConsumer
+from kafka import KafkaProducer
+
+from keystoneclient.v3 import client
+
+import mock
+
+from osm_mon.plugins.OpenStack.common import Common
+
+log = logging.getLogger(__name__)
+
+# Create an instance of the common openstack class, producer and consumer
+openstack_auth = Common()
+
+producer = KafkaProducer(bootstrap_servers='localhost:9092')
+req_consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
+ group_id='osm_mon')
+req_consumer.subscribe("access_credentials")
+
+
+@mock.patch.object(client, "Client")
+def test_access_cred_req(keyclient):
+ """Test access credentials request message from KafkaProducer."""
+ # Set-up message, producer and consumer for tests
+ payload = {"vim_type": "OpenStack",
+ "access_config":
+ {"openstack_site": "my_site",
+ "user": "my_user",
+ "password": "my_password",
+ "vim_tenant_name": "my_tenant"}}
+
+ producer.send('access_credentials', value=json.dumps(payload))
+
+ for message in req_consumer:
+ # Check the vim desired by the message
+ vim_type = json.loads(message.value)["vim_type"].lower()
+ if vim_type == "openstack":
+ openstack_auth._authenticate(message=message)
+
+ # A keystone client is created with the valid access_credentials
+ keyclient.assert_called_with(
+ auth_url="my_site", username="my_user", password="my_password",
+ tenant_name="my_tenant")
+
+ return
--- /dev/null
+# Copyright 2017 Intel Research and Development Ireland Limited
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Intel Corporation
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
+
+# __author__ = "Helena McGough"
+"""Test an end to end Openstack alarm requests."""
+
+import json
+
+import logging
+
+from osm_mon.core.message_bus.producer import KafkaProducer as prod
+
+from kafka import KafkaConsumer
+from kafka import KafkaProducer
+
+import mock
+
+from osm_mon.plugins.OpenStack import response
+
+from osm_mon.plugins.OpenStack.Aodh import alarming
+from osm_mon.plugins.OpenStack.common import Common
+
+log = logging.getLogger(__name__)
+
+# Set up common and alarming class instances
+alarms = alarming.Alarming()
+openstack_auth = Common()
+
+# Initialise the alarm request consumer and a producer for testing
+producer = KafkaProducer(bootstrap_servers='localhost:9092')
+req_consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
+ group_id='osm_mon')
+req_consumer.subscribe("alarm_request")
+
+
+@mock.patch.object(prod, "update_alarm_response")
+@mock.patch.object(alarming.Alarming, "update_alarm")
+@mock.patch.object(response.OpenStack_Response, "generate_response")
+def test_update_alarm_req(resp, update_alarm, update_resp):
+ """Test Aodh update alarm request message from KafkaProducer."""
+ # Set-up message, producer and consumer for tests
+ payload = {"vim_type": "OpenSTACK",
+ "alarm_update_request":
+ {"correlation_id": 123,
+ "alarm_uuid": "alarm_id",
+ "metric_uuid": "metric_id"}}
+
+ producer.send('alarm_request', key="update_alarm_request",
+ value=json.dumps(payload))
+
+ for message in req_consumer:
+ # Check the vim desired by the message
+ vim_type = json.loads(message.value)["vim_type"].lower()
+ if vim_type == "openstack":
+ # Mock a valid alarm update
+ update_alarm.return_value = "alarm_id", True
+ alarms.alarming(message, openstack_auth, None)
+
+ # A response message is generated and sent via MON's producer
+ resp.assert_called_with(
+ 'update_alarm_response', alarm_id="alarm_id", cor_id=123,
+ status=True)
+ update_resp.assert_called_with(
+ 'update_alarm_response', resp.return_value, 'alarm_response')
+
+ return
+
+
+@mock.patch.object(prod, "create_alarm_response")
+@mock.patch.object(alarming.Alarming, "configure_alarm")
+@mock.patch.object(response.OpenStack_Response, "generate_response")
+def test_create_alarm_req(resp, config_alarm, create_resp):
+ """Test Aodh create alarm request message from KafkaProducer."""
+ # Set-up message, producer and consumer for tests
+ payload = {"vim_type": "OpenSTACK",
+ "alarm_create_request":
+ {"correlation_id": 123,
+ "alarm_name": "my_alarm",
+ "metric_name": "my_metric",
+ "resource_uuid": "my_resource",
+ "severity": "WARNING"}}
+
+ producer.send('alarm_request', key="create_alarm_request",
+ value=json.dumps(payload))
+
+ for message in req_consumer:
+ # Check the vim desired by the message
+ vim_type = json.loads(message.value)["vim_type"].lower()
+ if vim_type == "openstack":
+ # Mock a valid alarm creation
+ config_alarm.return_value = "alarm_id", True
+ alarms.alarming(message, openstack_auth, None)
+
+ # A response message is generated and sent via MON's produce
+ resp.assert_called_with(
+ 'create_alarm_response', status=True, alarm_id="alarm_id",
+ cor_id=123)
+ create_resp.assert_called_with(
+ 'create_alarm_response', resp.return_value, 'alarm_response')
+
+ return
+
+
+@mock.patch.object(prod, "list_alarm_response")
+@mock.patch.object(alarming.Alarming, "list_alarms")
+@mock.patch.object(response.OpenStack_Response, "generate_response")
+def test_list_alarm_req(resp, list_alarm, list_resp):
+ """Test Aodh list alarm request message from KafkaProducer."""
+ # Set-up message, producer and consumer for tests
+ payload = {"vim_type": "OpenSTACK",
+ "alarm_list_request":
+ {"correlation_id": 123,
+ "resource_uuid": "resource_id", }}
+
+ producer.send('alarm_request', key="list_alarm_request",
+ value=json.dumps(payload))
+
+ for message in req_consumer:
+ # Check the vim desired by the message
+ vim_type = json.loads(message.value)["vim_type"].lower()
+ if vim_type == "openstack":
+ # Mock an empty list generated by the request
+ list_alarm.return_value = []
+ alarms.alarming(message, openstack_auth, None)
+
+ # Resoonse message is generated
+ resp.assert_called_with(
+ 'list_alarm_response', alarm_list=[],
+ cor_id=123)
+ # Producer attempts to send the response message back to the SO
+ list_resp.assert_called_with(
+ 'list_alarm_response', resp.return_value, 'alarm_response')
+
+ return
+
+
+@mock.patch.object(alarming.Alarming, "delete_alarm")
+@mock.patch.object(prod, "delete_alarm_response")
+@mock.patch.object(response.OpenStack_Response, "generate_response")
+def test_delete_alarm_req(resp, del_resp, del_alarm):
+ """Test Aodh delete alarm request message from KafkaProducer."""
+ # Set-up message, producer and consumer for tests
+ payload = {"vim_type": "OpenSTACK",
+ "alarm_delete_request":
+ {"correlation_id": 123,
+ "alarm_uuid": "alarm_id", }}
+
+ producer.send('alarm_request', key="delete_alarm_request",
+ value=json.dumps(payload))
+
+ for message in req_consumer:
+ # Check the vim desired by the message
+ vim_type = json.loads(message.value)["vim_type"].lower()
+ if vim_type == "openstack":
+ alarms.alarming(message, openstack_auth, None)
+
+ # Response message is generated and sent by MON's producer
+ resp.assert_called_with(
+ 'delete_alarm_response', alarm_id="alarm_id",
+ status=del_alarm.return_value, cor_id=123)
+ del_resp.assert_called_with(
+ 'delete_alarm_response', resp.return_value, 'alarm_response')
+
+ return
+
+
+@mock.patch.object(alarming.Alarming, "update_alarm_state")
+def test_ack_alarm_req(ack_alarm):
+ """Test Aodh acknowledge alarm request message from KafkaProducer."""
+ # Set-up message, producer and consumer for tests
+ payload = {"vim_type": "OpenSTACK",
+ "ack_details":
+ {"alarm_uuid": "alarm_id", }}
+
+ producer.send('alarm_request', key="acknowledge_alarm",
+ value=json.dumps(payload))
+
+ for message in req_consumer:
+ # Check the vim desired by the message
+ vim_type = json.loads(message.value)["vim_type"].lower()
+ if vim_type == "openstack":
+ alarms.alarming(message, openstack_auth, None)
+ # No response message is sent for and ack request
+ # Alarm state is updated from alarm -> ok
+ ack_alarm.assert_called_with(None, None, "alarm_id")
+ return
--- /dev/null
+# Copyright 2017 Intel Research and Development Ireland Limited
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Intel Corporation
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
+
+# __author__ = "Helena McGough"
+"""Test an end to end Openstack metric requests."""
+
+import json
+
+import logging
+
+from osm_mon.core.message_bus.producer import KafkaProducer as prod
+
+from kafka import KafkaConsumer
+from kafka import KafkaProducer
+
+import mock
+
+from osm_mon.plugins.OpenStack import response
+
+from osm_mon.plugins.OpenStack.Gnocchi import metrics
+
+from osm_mon.plugins.OpenStack.common import Common
+
+log = logging.getLogger(__name__)
+
+# Instances for the openstack common and metric classes
+metric_req = metrics.Metrics()
+openstack_auth = Common()
+
+# A metric_request consumer and a producer for testing purposes
+producer = KafkaProducer(bootstrap_servers='localhost:9092')
+req_consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
+ group_id='osm_mon')
+req_consumer.subscribe("metric_request")
+
+
+@mock.patch.object(metrics.Metrics, "configure_metric")
+@mock.patch.object(prod, "create_metrics_resp")
+@mock.patch.object(response.OpenStack_Response, "generate_response")
+def test_create_metric_req(resp, create_resp, config_metric):
+ """Test Gnocchi create metric request message from producer."""
+ # Set-up message, producer and consumer for tests
+ payload = {"vim_type": "OpenSTACK",
+ "correlation_id": 123,
+ "metric_create":
+ {"metric_name": "my_metric",
+ "resource_uuid": "resource_id"}}
+
+ producer.send('metric_request', key="create_metric_request",
+ value=json.dumps(payload))
+
+ for message in req_consumer:
+ # Check the vim desired by the message
+ vim_type = json.loads(message.value)["vim_type"].lower()
+ if vim_type == "openstack":
+ # A valid metric is created
+ config_metric.return_value = "metric_id", "resource_id", True
+ metric_req.metric_calls(message, openstack_auth, None)
+
+ # A response message is generated and sent by MON's producer
+ resp.assert_called_with(
+ 'create_metric_response', status=True, cor_id=123,
+ metric_id="metric_id", r_id="resource_id")
+ create_resp.assert_called_with(
+ 'create_metric_response', resp.return_value, 'metric_response')
+
+ return
+
+
+@mock.patch.object(metrics.Metrics, "delete_metric")
+@mock.patch.object(prod, "delete_metric_response")
+@mock.patch.object(response.OpenStack_Response, "generate_response")
+def test_delete_metric_req(resp, del_resp, del_metric):
+ """Test Gnocchi delete metric request message from producer."""
+ # Set-up message, producer and consumer for tests
+ payload = {"vim_type": "OpenSTACK",
+ "correlation_id": 123,
+ "metric_uuid": "metric_id",
+ "metric_name": "metric_name",
+ "resource_uuid": "resource_id"}
+
+ producer.send('metric_request', key="delete_metric_request",
+ value=json.dumps(payload))
+
+ for message in req_consumer:
+ # Check the vim desired by the message
+ vim_type = json.loads(message.value)["vim_type"].lower()
+ if vim_type == "openstack":
+ # Metric has been deleted
+ del_metric.return_value = True
+ metric_req.metric_calls(message, openstack_auth, None)
+
+ # A response message is generated and sent by MON's producer
+ resp.assert_called_with(
+ 'delete_metric_response', m_id="metric_id",
+ m_name="metric_name", status=True, r_id="resource_id",
+ cor_id=123)
+ del_resp.assert_called_with(
+ 'delete_metric_response', resp.return_value, 'metric_response')
+
+ return
+
+
+@mock.patch.object(metrics.Metrics, "read_metric_data")
+@mock.patch.object(prod, "read_metric_data_response")
+@mock.patch.object(response.OpenStack_Response, "generate_response")
+def test_read_metric_data_req(resp, read_resp, read_data):
+ """Test Gnocchi read metric data request message from producer."""
+ # Set-up message, producer and consumer for tests
+ payload = {"vim_type": "OpenSTACK",
+ "correlation_id": 123,
+ "metric_uuid": "metric_id",
+ "metric_name": "metric_name",
+ "resource_uuid": "resource_id"}
+
+ producer.send('metric_request', key="read_metric_data_request",
+ value=json.dumps(payload))
+
+ for message in req_consumer:
+ # Check the vim desired by the message
+ vim_type = json.loads(message.value)["vim_type"].lower()
+ if vim_type == "openstack":
+ # Mock empty lists generated by the request message
+ read_data.return_value = [], []
+ metric_req.metric_calls(message, openstack_auth, None)
+
+ # A response message is generated and sent by MON's producer
+ resp.assert_called_with(
+ 'read_metric_data_response', m_id="metric_id",
+ m_name="metric_name", r_id="resource_id", cor_id=123, times=[],
+ metrics=[])
+ read_resp.assert_called_with(
+ 'read_metric_data_response', resp.return_value,
+ 'metric_response')
+
+ return
+
+
+@mock.patch.object(metrics.Metrics, "list_metrics")
+@mock.patch.object(prod, "list_metric_response")
+@mock.patch.object(response.OpenStack_Response, "generate_response")
+def test_list_metrics_req(resp, list_resp, list_metrics):
+ """Test Gnocchi list metrics request message from producer."""
+ # Set-up message, producer and consumer for tests
+ payload = {"vim_type": "OpenSTACK",
+ "metrics_list_request":
+ {"correlation_id": 123, }}
+
+ producer.send('metric_request', key="list_metric_request",
+ value=json.dumps(payload))
+
+ for message in req_consumer:
+ # Check the vim desired by the message
+ vim_type = json.loads(message.value)["vim_type"].lower()
+ if vim_type == "openstack":
+ # Mock an empty list generated by the request
+ list_metrics.return_value = []
+ metric_req.metric_calls(message, openstack_auth, None)
+
+ # A response message is generated and sent by MON's producer
+ resp.assert_called_with(
+ 'list_metric_response', m_list=[], cor_id=123)
+ list_resp.assert_called_with(
+ 'list_metric_response', resp.return_value, 'metric_response')
+
+ return
+
+
+@mock.patch.object(metrics.Metrics, "get_metric_id")
+@mock.patch.object(prod, "update_metric_response")
+@mock.patch.object(response.OpenStack_Response, "generate_response")
+def test_update_metrics_req(resp, update_resp, get_id):
+ """Test Gnocchi update metric request message from KafkaProducer."""
+ # Set-up message, producer and consumer for tests
+ payload = {"vim_type": "OpenSTACK",
+ "correlation_id": 123,
+ "metric_create":
+ {"metric_name": "my_metric",
+ "resource_uuid": "resource_id", }}
+
+ producer.send('metric_request', key="update_metric_request",
+ value=json.dumps(payload))
+
+ for message in req_consumer:
+ # Check the vim desired by the message
+ vim_type = json.loads(message.value)["vim_type"].lower()
+ if vim_type == "openstack":
+ # Gnocchi doesn't support metric updates
+ get_id.return_value = "metric_id"
+ metric_req.metric_calls(message, openstack_auth, None)
+
+ # Reponse message is generated and sent via MON's producer
+ # No metric update has taken place
+ resp.assert_called_with(
+ 'update_metric_response', status=False, cor_id=123,
+ r_id="resource_id", m_id="metric_id")
+ update_resp.assert_called_with(
+ 'update_metric_response', resp.return_value, 'metric_response')
+
+ return
--- /dev/null
+# Copyright 2017 Intel Research and Development Ireland Limited
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Intel Corporation
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
+##
+"""Tests for all common OpenStack methods."""
+
+import json
+
+import logging
+
+import socket
+
+from BaseHTTPServer import BaseHTTPRequestHandler
+from BaseHTTPServer import HTTPServer
+
+from threading import Thread
+
+from osm_mon.core.message_bus.producer import KafkaProducer
+
+import mock
+
+from osm_mon.plugins.OpenStack.Aodh.alarming import Alarming
+from osm_mon.plugins.OpenStack.common import Common
+from osm_mon.plugins.OpenStack.response import OpenStack_Response
+from osm_mon.plugins.OpenStack.settings import Config
+
+import requests
+
+log = logging.getLogger(__name__)
+
+# Create an instance of the common openstack class, producer and consumer
+openstack_auth = Common()
+
+# Mock a valid get_response for alarm details
+valid_get_resp = '{"gnocchi_resources_threshold_rule":\
+ {"resource_id": "my_resource_id"}}'
+
+
+class MockResponse(object):
+ """Mock a response class for generating responses."""
+
+ def __init__(self, text):
+ """Initialise a mock response with a text attribute."""
+ self.text = text
+
+
+class MockNotifierHandler(BaseHTTPRequestHandler):
+ """Mock the NotifierHandler class for testing purposes."""
+
+ def _set_headers(self):
+ """Set the headers for a request."""
+ self.send_response(200)
+ self.send_header('Content-type', 'text/html')
+ self.end_headers()
+
+ def do_GET(self):
+ """Mock functionality for GET request."""
+# self.send_response(requests.codes.ok)
+ self._set_headers()
+ pass
+
+ def do_POST(self):
+ """Mock functionality for a POST request."""
+ self._set_headers()
+ content_length = int(self.headers['Content-Length'])
+ post_data = self.rfile.read(content_length)
+ self.notify_alarm(json.loads(post_data))
+
+ def notify_alarm(self, values):
+ """Mock the notify_alarm functionality to generate a valid response."""
+ config = Config.instance()
+ config.read_environ("aodh")
+ self._alarming = Alarming()
+ self._common = Common()
+ self._response = OpenStack_Response()
+ self._producer = KafkaProducer('alarm_response')
+ alarm_id = values['alarm_id']
+
+ auth_token = self._common._authenticate()
+ endpoint = self._common.get_endpoint("alarming")
+
+ # If authenticated generate and send response message
+ if (auth_token is not None and endpoint is not None):
+ url = "{}/v2/alarms/%s".format(endpoint) % alarm_id
+
+ # Get the resource_id of the triggered alarm and the date
+ result = self._common._perform_request(
+ url, auth_token, req_type="get")
+ alarm_details = json.loads(result.text)
+ gnocchi_rule = alarm_details['gnocchi_resources_threshold_rule']
+ resource_id = gnocchi_rule['resource_id']
+ # Mock a date for testing purposes
+ a_date = "dd-mm-yyyy 00:00"
+
+ # Process an alarm notification if resource_id is valid
+ if resource_id is not None:
+ # Try generate and send response
+ try:
+ resp_message = self._response.generate_response(
+ 'notify_alarm', a_id=alarm_id,
+ r_id=resource_id,
+ sev=values['severity'], date=a_date,
+ state=values['current'], vim_type="OpenStack")
+ self._producer.notify_alarm(
+ 'notify_alarm', resp_message, 'alarm_response')
+ except Exception:
+ pass
+
+
+def get_free_port():
+ """Function to get a free port to run the test webserver on."""
+ s = socket.socket(socket.AF_INET, type=socket.SOCK_STREAM)
+ s.bind(('localhost', 0))
+ address, port = s.getsockname()
+ s.close()
+ return port
+
+
+# Create the webserver, port and run it on its own thread
+mock_server_port = get_free_port()
+mock_server = HTTPServer(('localhost', mock_server_port), MockNotifierHandler)
+mock_server_thread = Thread(target=mock_server.serve_forever)
+mock_server_thread.setDaemon(True)
+mock_server_thread.start()
+
+
+def test_do_get():
+ """Integration test for get request on notifier webserver."""
+ url = 'http://localhost:{port}/users'.format(port=mock_server_port)
+
+ # Send a request to the mock API server and store the response.
+ response = requests.get(url)
+
+ # Confirm that the request-response cycle completed successfully.
+ assert response.ok
+
+
+@mock.patch.object(KafkaProducer, "notify_alarm")
+@mock.patch.object(OpenStack_Response, "generate_response")
+@mock.patch.object(Common, "_perform_request")
+@mock.patch.object(Common, "get_endpoint")
+@mock.patch.object(Common, "_authenticate")
+def test_post_notify_alarm(auth, endpoint, perf_req, resp, notify):
+ """Integration test for notify_alarm."""
+ url = 'http://localhost:{port}/users'.format(port=mock_server_port)
+ payload = {"severity": "critical",
+ "alarm_name": "my_alarm",
+ "current": "current_state",
+ "alarm_id": "my_alarm_id",
+ "reason": "Threshold has been broken",
+ "reason_data": {"count": 1,
+ "most_recent": "null",
+ "type": "threshold",
+ "disposition": "unknown"},
+ "previous": "previous_state"}
+
+ # Mock authenticate and request response for testing
+ auth.return_value = "my_auth_token"
+ endpoint.return_value = "my_endpoint"
+ perf_req.return_value = MockResponse(valid_get_resp)
+
+ # Generate a post reqest for testing
+ requests.post(url, json.dumps(payload))
+
+ # A response message is generated with the following details
+ resp.assert_called_with(
+ "notify_alarm", a_id="my_alarm_id", r_id="my_resource_id",
+ sev="critical", date='dd-mm-yyyy 00:00', state="current_state",
+ vim_type="OpenStack")
+
+ # Reponse message is sent back to the SO via MON's producer
+ notify.assert_called_with("notify_alarm", mock.ANY, "alarm_response")
show-source = True
ignore = E123,E125,E241
builtins = _
-exclude=.venv,.git,.tox,dist,doc,*lib/python*,*egg,build,core/*,devops_stages/**.rst
+exclude=.venv,.git,.tox,dist,doc,*lib/python*,*egg,build,devops_stages/*,.rst