from kafka.errors import KafkaError
-from osm_mon.core.message_bus.producer import KafkaProducer as prod
+from osm_mon.core.auth import AuthManager
+from osm_mon.core.database import VimCredentials
+from osm_mon.core.message_bus.producer import KafkaProducer as Producer
from kafka import KafkaConsumer
from kafka import KafkaProducer
log = logging.getLogger(__name__)
+mock_creds = VimCredentials()
+mock_creds.config = '{}'
+
+@mock.patch.object(Producer, "publish_alarm_request", mock.Mock())
+@mock.patch.object(Common, "get_auth_token", mock.Mock())
+@mock.patch.object(Common, "get_endpoint", mock.Mock())
class MetricIntegrationTest(unittest.TestCase):
def setUp(self):
# Set up common and alarming class instances
except KafkaError:
self.skipTest('Kafka server not present.')
- @mock.patch.object(Common, "get_auth_token", mock.Mock())
- @mock.patch.object(Common, "get_endpoint", mock.Mock())
+ @mock.patch.object(Common, "perform_request")
+ @mock.patch.object(AuthManager, 'get_credentials')
@mock.patch.object(metrics.Metrics, "configure_metric")
- @mock.patch.object(prod, "create_metrics_resp")
@mock.patch.object(response.OpenStack_Response, "generate_response")
- def test_create_metric_req(self, resp, create_resp, config_metric):
+ def test_create_metric_req(self, resp, config_metric, get_creds, perf_req):
"""Test Gnocchi create metric request message from producer."""
# Set-up message, producer and consumer for tests
payload = {"metric_create_request": {"correlation_id": 123,
"metric_name": "cpu_utilization",
"resource_uuid": "resource_id"}}
+ get_creds.return_value = mock_creds
+ perf_req.return_value = type('obj', (object,), {'text': json.dumps({"metrics": {"cpu_util": "1"}})})
+ resp.return_value = ''
+
self.producer.send('metric_request', key="create_metric_request",
value=json.dumps(payload))
resp.assert_called_with(
'create_metric_response', status=True, cor_id=123,
metric_id="metric_id", r_id="resource_id")
- create_resp.assert_called_with(
- 'create_metric_response', resp.return_value)
return
self.fail("No message received in consumer")
- @mock.patch.object(Common, "get_auth_token", mock.Mock())
- @mock.patch.object(Common, "get_endpoint", mock.Mock())
+ @mock.patch.object(Common, "perform_request")
+ @mock.patch.object(AuthManager, 'get_credentials')
@mock.patch.object(metrics.Metrics, "delete_metric")
- @mock.patch.object(prod, "delete_metric_response")
@mock.patch.object(response.OpenStack_Response, "generate_response")
- def test_delete_metric_req(self, resp, del_resp, del_metric):
+ def test_delete_metric_req(self, resp, del_metric, get_creds, perf_req):
"""Test Gnocchi delete metric request message from producer."""
# Set-up message, producer and consumer for tests
payload = {"vim_type": "OpenSTACK",
"metric_name": "cpu_utilization",
"resource_uuid": "resource_id"}
+ get_creds.return_value = mock_creds
+ perf_req.return_value = type('obj', (object,), {'text': json.dumps({"metrics": {"cpu_util": "1"}})})
+ resp.return_value = ''
+
self.producer.send('metric_request', key="delete_metric_request",
value=json.dumps(payload))
# A response message is generated and sent by MON's producer
resp.assert_called_with(
- 'delete_metric_response', m_id=None,
+ 'delete_metric_response', m_id='1',
m_name="cpu_utilization", status=True, r_id="resource_id",
cor_id=123)
- del_resp.assert_called_with(
- 'delete_metric_response', resp.return_value)
return
self.fail("No message received in consumer")
- @mock.patch.object(Common, "get_auth_token", mock.Mock())
- @mock.patch.object(Common, "get_endpoint", mock.Mock())
+ @mock.patch.object(Common, "perform_request")
+ @mock.patch.object(AuthManager, 'get_credentials')
@mock.patch.object(metrics.Metrics, "read_metric_data")
- @mock.patch.object(prod, "read_metric_data_response")
@mock.patch.object(response.OpenStack_Response, "generate_response")
- def test_read_metric_data_req(self, resp, read_resp, read_data):
+ def test_read_metric_data_req(self, resp, read_data, get_creds, perf_req):
"""Test Gnocchi read metric data request message from producer."""
# Set-up message, producer and consumer for tests
payload = {"vim_type": "OpenSTACK",
"metric_name": "cpu_utilization",
"resource_uuid": "resource_id"}
+ get_creds.return_value = mock_creds
+ perf_req.return_value = type('obj', (object,), {'text': json.dumps({"metrics": {"cpu_util": "1"}})})
+ resp.return_value = ''
+
self.producer.send('metric_request', key="read_metric_data_request",
value=json.dumps(payload))
# A response message is generated and sent by MON's producer
resp.assert_called_with(
- 'read_metric_data_response', m_id=None,
+ 'read_metric_data_response', m_id='1',
m_name="cpu_utilization", r_id="resource_id", cor_id=123, times=[],
metrics=[])
- read_resp.assert_called_with(
- 'read_metric_data_response', resp.return_value)
return
self.fail("No message received in consumer")
- @mock.patch.object(Common, "get_auth_token", mock.Mock())
- @mock.patch.object(Common, "get_endpoint", mock.Mock())
+ @mock.patch.object(Common, "perform_request")
+ @mock.patch.object(AuthManager, 'get_credentials')
@mock.patch.object(metrics.Metrics, "list_metrics")
- @mock.patch.object(prod, "list_metric_response")
@mock.patch.object(response.OpenStack_Response, "generate_response")
- def test_list_metrics_req(self, resp, list_resp, list_metrics):
+ def test_list_metrics_req(self, resp, list_metrics, get_creds, perf_req):
"""Test Gnocchi list metrics request message from producer."""
# Set-up message, producer and consumer for tests
payload = {"vim_type": "OpenSTACK",
"metrics_list_request":
{"correlation_id": 123, }}
+ get_creds.return_value = mock_creds
+ perf_req.return_value = type('obj', (object,), {'text': json.dumps({"metrics": {"cpu_util": "1"}})})
+ resp.return_value = ''
+
self.producer.send('metric_request', key="list_metric_request",
value=json.dumps(payload))
# A response message is generated and sent by MON's producer
resp.assert_called_with(
'list_metric_response', m_list=[], cor_id=123)
- list_resp.assert_called_with(
- 'list_metric_response', resp.return_value)
return
self.fail("No message received in consumer")
- @mock.patch.object(Common, "get_auth_token", mock.Mock())
- @mock.patch.object(Common, "get_endpoint", mock.Mock())
+ @mock.patch.object(Common, "perform_request")
+ @mock.patch.object(AuthManager, 'get_credentials')
@mock.patch.object(metrics.Metrics, "get_metric_id")
- @mock.patch.object(prod, "update_metric_response")
@mock.patch.object(response.OpenStack_Response, "generate_response")
- def test_update_metrics_req(self, resp, update_resp, get_id):
+ def test_update_metrics_req(self, resp, get_id, get_creds, perf_req):
"""Test Gnocchi update metric request message from KafkaProducer."""
# Set-up message, producer and consumer for tests
payload = {"metric_create_request": {"metric_name": "my_metric",
"correlation_id": 123,
"resource_uuid": "resource_id", }}
+ get_creds.return_value = mock_creds
+ perf_req.return_value = type('obj', (object,), {'text': json.dumps({"metrics": {"cpu_util": "1"}})})
+ resp.return_value = ''
+
self.producer.send('metric_request', key="update_metric_request",
value=json.dumps(payload))
resp.assert_called_with(
'update_metric_response', status=False, cor_id=123,
r_id="resource_id", m_id="metric_id")
- update_resp.assert_called_with(
- 'update_metric_response', resp.return_value)
return
self.fail("No message received in consumer")