try:
self.producer = KafkaProducer(bootstrap_servers='localhost:9092')
self.req_consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
- group_id='osm_mon',
+ auto_offset_reset='earliest',
consumer_timeout_ms=2000)
self.req_consumer.subscribe(['metric_request'])
except KafkaError:
self.skipTest('Kafka server not present.')
+ @mock.patch.object(Common, "get_auth_token", mock.Mock())
+ @mock.patch.object(Common, "get_endpoint", mock.Mock())
@mock.patch.object(metrics.Metrics, "configure_metric")
@mock.patch.object(prod, "create_metrics_resp")
@mock.patch.object(response.OpenStack_Response, "generate_response")
"""Test Gnocchi create metric request message from producer."""
# Set-up message, producer and consumer for tests
payload = {"vim_type": "OpenSTACK",
+ "vim_uuid": "1",
"correlation_id": 123,
"metric_create":
{"metric_name": "my_metric",
value=json.dumps(payload))
for message in self.req_consumer:
+ print(message)
# Check the vim desired by the message
vim_type = json.loads(message.value)["vim_type"].lower()
if vim_type == "openstack":
# A valid metric is created
config_metric.return_value = "metric_id", "resource_id", True
- self.metric_req.metric_calls(message, self.openstack_auth, None)
+ self.metric_req.metric_calls(message)
# A response message is generated and sent by MON's producer
resp.assert_called_with(
return
self.fail("No message received in consumer")
+ @mock.patch.object(Common, "get_auth_token", mock.Mock())
+ @mock.patch.object(Common, "get_endpoint", mock.Mock())
@mock.patch.object(metrics.Metrics, "delete_metric")
@mock.patch.object(prod, "delete_metric_response")
@mock.patch.object(response.OpenStack_Response, "generate_response")
"""Test Gnocchi delete metric request message from producer."""
# Set-up message, producer and consumer for tests
payload = {"vim_type": "OpenSTACK",
+ "vim_uuid": "1",
"correlation_id": 123,
"metric_uuid": "metric_id",
"metric_name": "metric_name",
for message in self.req_consumer:
# Check the vim desired by the message
vim_type = json.loads(message.value)["vim_type"].lower()
- if vim_type == "openstack":
+ if message.key == "delete_metric_request":
# Metric has been deleted
del_metric.return_value = True
- self.metric_req.metric_calls(message, self.openstack_auth, None)
+ self.metric_req.metric_calls(message)
# A response message is generated and sent by MON's producer
resp.assert_called_with(
return
self.fail("No message received in consumer")
+ @mock.patch.object(Common, "get_auth_token", mock.Mock())
+ @mock.patch.object(Common, "get_endpoint", mock.Mock())
@mock.patch.object(metrics.Metrics, "read_metric_data")
@mock.patch.object(prod, "read_metric_data_response")
@mock.patch.object(response.OpenStack_Response, "generate_response")
"""Test Gnocchi read metric data request message from producer."""
# Set-up message, producer and consumer for tests
payload = {"vim_type": "OpenSTACK",
+ "vim_uuid": "test_id",
"correlation_id": 123,
"metric_uuid": "metric_id",
"metric_name": "metric_name",
for message in self.req_consumer:
# Check the vim desired by the message
- vim_type = json.loads(message.value)["vim_type"].lower()
- if vim_type == "openstack":
+ if message.key == "read_metric_data_request":
# Mock empty lists generated by the request message
read_data.return_value = [], []
- self.metric_req.metric_calls(message, self.openstack_auth, None)
+ self.metric_req.metric_calls(message)
# A response message is generated and sent by MON's producer
resp.assert_called_with(
return
self.fail("No message received in consumer")
+ @mock.patch.object(Common, "get_auth_token", mock.Mock())
+ @mock.patch.object(Common, "get_endpoint", mock.Mock())
@mock.patch.object(metrics.Metrics, "list_metrics")
@mock.patch.object(prod, "list_metric_response")
@mock.patch.object(response.OpenStack_Response, "generate_response")
"""Test Gnocchi list metrics request message from producer."""
# Set-up message, producer and consumer for tests
payload = {"vim_type": "OpenSTACK",
+ "vim_uuid": "1",
"metrics_list_request":
{"correlation_id": 123, }}
for message in self.req_consumer:
# Check the vim desired by the message
- vim_type = json.loads(message.value)["vim_type"].lower()
- if vim_type == "openstack":
+ if message.key == "list_metric_request":
# Mock an empty list generated by the request
list_metrics.return_value = []
- self.metric_req.metric_calls(message, self.openstack_auth, None)
+ self.metric_req.metric_calls(message)
# A response message is generated and sent by MON's producer
resp.assert_called_with(
return
self.fail("No message received in consumer")
+ @mock.patch.object(Common, "get_auth_token", mock.Mock())
+ @mock.patch.object(Common, "get_endpoint", mock.Mock())
@mock.patch.object(metrics.Metrics, "get_metric_id")
@mock.patch.object(prod, "update_metric_response")
@mock.patch.object(response.OpenStack_Response, "generate_response")
"""Test Gnocchi update metric request message from KafkaProducer."""
# Set-up message, producer and consumer for tests
payload = {"vim_type": "OpenSTACK",
+ "vim_uuid": "test_id",
"correlation_id": 123,
"metric_create":
{"metric_name": "my_metric",
for message in self.req_consumer:
# Check the vim desired by the message
- vim_type = json.loads(message.value)["vim_type"].lower()
- if vim_type == "openstack":
+ if message.key == "update_metric_request":
# Gnocchi doesn't support metric updates
get_id.return_value = "metric_id"
- self.metric_req.metric_calls(message, self.openstack_auth, None)
+ self.metric_req.metric_calls(message)
- # Reponse message is generated and sent via MON's producer
+ # Response message is generated and sent via MON's producer
# No metric update has taken place
resp.assert_called_with(
'update_metric_response', status=False, cor_id=123,