import json
import logging
import sys
+import threading
import six
import yaml
-
from kafka import KafkaConsumer
+from osm_common import dbmongo
+from osm_mon.core.auth import AuthManager
+from osm_mon.core.database import DatabaseManager
from osm_mon.core.settings import Config
-from osm_mon.plugins.OpenStack.Aodh import alarming
-from osm_mon.plugins.OpenStack.Gnocchi import metrics
-
+from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
+from osm_mon.plugins.CloudWatch.connection import Connection
from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms
from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics
-from osm_mon.plugins.CloudWatch.connection import Connection
-from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
-
+from osm_mon.plugins.OpenStack.Aodh import alarming
+from osm_mon.plugins.OpenStack.Gnocchi import metrics
from osm_mon.plugins.vRealiseOps import plugin_receiver
-from osm_mon.core.auth import AuthManager
-from osm_mon.core.database import DatabaseManager
-
-from osm_common import dbmongo
-
logging.basicConfig(stream=sys.stdout,
format='%(asctime)s %(message)s',
                    datefmt='%m/%d/%Y %I:%M:%S %p',
                    level=logging.INFO)
log = logging.getLogger(__name__)
-def get_vim_type(db_manager, vim_uuid):
- """Get the vim type that is required by the message."""
- credentials = db_manager.get_credentials(vim_uuid)
- return credentials.type
-
-
-def get_vdur(common_db, nsr_id, member_index, vdu_name):
- vnfr = get_vnfr(common_db, nsr_id, member_index)
- for vdur in vnfr['vdur']:
- if vdur['vdu-id-ref'] == vdu_name:
- return vdur
- raise ValueError('vdur not found for nsr-id %s, member_index %s and vdu_name %s', nsr_id, member_index, vdu_name)
-
-
-def get_vnfr(common_db, nsr_id, member_index):
- vnfr = common_db.get_one(table="vnfrs", filter={"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)})
- return vnfr
-
-
-def main():
- cfg = Config.instance()
- cfg.read_environ()
-
- auth_manager = AuthManager()
- database_manager = DatabaseManager()
- database_manager.create_tables()
-
- # Create OpenStack alarming and metric instances
- openstack_metrics = metrics.Metrics()
- openstack_alarms = alarming.Alarming()
-
- # Create CloudWatch alarm and metric instances
- cloudwatch_alarms = plugin_alarms()
- cloudwatch_metrics = plugin_metrics()
- aws_connection = Connection()
- aws_access_credentials = AccessCredentials()
-
- # Create vROps plugin_receiver class instance
- vrops_rcvr = plugin_receiver.PluginReceiver()
-
- common_db = dbmongo.DbMongo()
- common_db_uri = cfg.MONGO_URI.split(':')
- common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'})
-
- # Initialize consumers for alarms and metrics
- common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
- key_deserializer=bytes.decode,
- value_deserializer=bytes.decode,
- group_id="mon-consumer")
-
- # Define subscribe the consumer for the plugins
- topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account']
- # TODO: Remove access_credentials
- common_consumer.subscribe(topics)
-
- log.info("Listening for alarm_request and metric_request messages")
- for message in common_consumer:
+class CommonConsumer:
+
+ def __init__(self):
+ cfg = Config.instance()
+
+ self.auth_manager = AuthManager()
+ self.database_manager = DatabaseManager()
+ self.database_manager.create_tables()
+
+ # Create OpenStack alarming and metric instances
+ self.openstack_metrics = metrics.Metrics()
+ self.openstack_alarms = alarming.Alarming()
+
+ # Create CloudWatch alarm and metric instances
+ self.cloudwatch_alarms = plugin_alarms()
+ self.cloudwatch_metrics = plugin_metrics()
+ self.aws_connection = Connection()
+ self.aws_access_credentials = AccessCredentials()
+
+ # Create vROps plugin_receiver class instance
+ self.vrops_rcvr = plugin_receiver.PluginReceiver()
+
+ log.info("Connecting to MongoDB...")
+ self.common_db = dbmongo.DbMongo()
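+        # MONGO_URI is expected in host:port form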
+ common_db_uri = cfg.MONGO_URI.split(':')
+ self.common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'})
+ log.info("Connection successful.")
+
+ # Initialize consumers for alarms and metrics
+ self.common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
+ key_deserializer=bytes.decode,
+ value_deserializer=bytes.decode,
+ group_id="mon-consumer")
+
+        # Subscribe the consumer to the plugin topics
+ topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account']
+ # TODO: Remove access_credentials
+ self.common_consumer.subscribe(topics)
+
+ def get_vim_type(self, vim_uuid):
+ """Get the vim type that is required by the message."""
+ credentials = self.database_manager.get_credentials(vim_uuid)
+ return credentials.type
+
+ def get_vdur(self, nsr_id, member_index, vdu_name):
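+        """Return the VDU record (vdur) whose vdu-id-ref matches vdu_name for the given NS and VNF member index."""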
+ vnfr = self.get_vnfr(nsr_id, member_index)
+ for vdur in vnfr['vdur']:
+ if vdur['vdu-id-ref'] == vdu_name:
+ return vdur
+ raise ValueError('vdur not found for nsr-id %s, member_index %s and vdu_name %s', nsr_id, member_index,
+ vdu_name)
+
+ def get_vnfr(self, nsr_id, member_index):
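+        """Fetch the VNF record for the given NS id and VNF member index from the common database."""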
+ vnfr = self.common_db.get_one(table="vnfrs",
+ filter={"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)})
+ return vnfr
+
+ def run(self):
+ log.info("Listening for messages...")
+ for message in self.common_consumer:
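+            # Process each message in its own thread so a slow plugin call does not block the consumer loop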
+ t = threading.Thread(target=self.consume_message, args=(message,))
+ t.start()
+
+ def consume_message(self, message):
log.info("Message arrived: %s", message)
try:
try:
if message.topic == "vim_account":
if message.key == "create" or message.key == "edit":
- auth_manager.store_auth_credentials(values)
+ self.auth_manager.store_auth_credentials(values)
if message.key == "delete":
- auth_manager.delete_auth_credentials(values)
+ self.auth_manager.delete_auth_credentials(values)
else:
# Get ns_id from message
vnf_index = values[list_index]['vnf_member_index'] if contains_list else values['vnf_member_index']
# Check the vim desired by the message
- vnfr = get_vnfr(common_db, ns_id, vnf_index)
+ vnfr = self.get_vnfr(ns_id, vnf_index)
vim_uuid = vnfr['vim-account-id']
- vim_type = get_vim_type(database_manager, vim_uuid)
if (contains_list and 'vdu_name' in values[list_index]) or 'vdu_name' in values:
vdu_name = values[list_index]['vdu_name'] if contains_list else values['vdu_name']
- vdur = get_vdur(common_db, ns_id, vnf_index, vdu_name)
+ vdur = self.get_vdur(ns_id, vnf_index, vdu_name)
if contains_list:
values[list_index]['resource_uuid'] = vdur['vim-id']
else:
values['resource_uuid'] = vdur['vim-id']
message = message._replace(value=json.dumps(values))
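+                    # Route the message to the plugin that matches the VIM type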
+ vim_type = self.get_vim_type(vim_uuid)
+
if vim_type == "openstack":
log.info("This message is for the OpenStack plugin.")
if message.topic == "metric_request":
- openstack_metrics.metric_calls(message, vim_uuid)
+ self.openstack_metrics.metric_calls(message, vim_uuid)
if message.topic == "alarm_request":
- openstack_alarms.alarming(message, vim_uuid)
+ self.openstack_alarms.alarming(message, vim_uuid)
elif vim_type == "aws":
log.info("This message is for the CloudWatch plugin.")
- aws_conn = aws_connection.setEnvironment()
+ aws_conn = self.aws_connection.setEnvironment()
if message.topic == "metric_request":
- cloudwatch_metrics.metric_calls(message, aws_conn)
+ self.cloudwatch_metrics.metric_calls(message, aws_conn)
if message.topic == "alarm_request":
- cloudwatch_alarms.alarm_calls(message, aws_conn)
+ self.cloudwatch_alarms.alarm_calls(message, aws_conn)
if message.topic == "access_credentials":
- aws_access_credentials.access_credential_calls(message)
+ self.aws_access_credentials.access_credential_calls(message)
elif vim_type == "vmware":
log.info("This metric_request message is for the vROPs plugin.")
- vrops_rcvr.consume(message)
+ self.vrops_rcvr.consume(message)
else:
log.debug("vim_type is misconfigured or unsupported; %s",
if __name__ == '__main__':
- main()
+ CommonConsumer().run()
"""Send the required message on the Kafka message bus."""
try:
future = self.producer.send(topic=topic, key=key, value=value)
- self.producer.flush()
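+            # Wait for the broker to acknowledge the send so delivery errors surface in this try block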
+ record_metadata = future.get(timeout=10)
except Exception:
logging.exception("Error publishing to {} topic." .format(topic))
raise
try:
- record_metadata = future.get(timeout=10)
logging.debug("TOPIC:", record_metadata.topic)
logging.debug("PARTITION:", record_metadata.partition)
logging.debug("OFFSET:", record_metadata.offset)
except KafkaError:
pass
- def create_alarm_request(self, key, message):
- """Create alarm request from SO to MON."""
+ def publish_alarm_request(self, key, message):
+ """Publish an alarm request."""
# External to MON
self.publish(key,
value=message,
topic='alarm_request')
- def create_alarm_response(self, key, message):
- """Response to a create alarm request from MON to SO."""
- # Internal to MON
-
- self.publish(key,
- value=message,
- topic='alarm_response')
-
- def acknowledge_alarm(self, key, message):
- """Alarm acknowledgement request from SO to MON."""
+ def publish_alarm_response(self, key, message):
+ """Publish an alarm response."""
# Internal to MON
-        self.publish(key,
-                    value=message,
-                    topic='alarm_request')
-
-    def list_alarm_request(self, key, message):
-        """List alarms request from SO to MON."""
-        # External to MON
-
-        self.publish(key,
-                    value=message,
-                    topic='alarm_request')
-
-    def notify_alarm(self, key, message):
-        """Notify of triggered alarm from MON to SO."""
-
-        self.publish(key,
-                    value=message,
-                    topic='alarm_response')
-
-    def list_alarm_response(self, key, message):
-        """Response for list alarms request from MON to SO."""
-
-        self.publish(key,
-                    value=message,
-                    topic='alarm_response')
-
-    def update_alarm_request(self, key, message):
-        """Update alarm request from SO to MON."""
-        # External to Mon
-
-        self.publish(key,
-                    value=message,
-                    topic='alarm_request')
-
-    def update_alarm_response(self, key, message):
-        """Response from update alarm request from MON to SO."""
-        # Internal to Mon
-
-        self.publish(key,
-                    value=message,
-                    topic='alarm_response')
-
-    def delete_alarm_request(self, key, message):
-        """Delete alarm request from SO to MON."""
-        # External to Mon
-
-        self.publish(key,
-                    value=message,
-                    topic='alarm_request')
-
-    def delete_alarm_response(self, key, message):
-        """Response for a delete alarm request from MON to SO."""
-        # Internal to Mon
-
self.publish(key,
value=message,
topic='alarm_response')
- def create_metrics_request(self, key, message):
+ def publish_metrics_request(self, key, message):
"""Create metrics request from SO to MON."""
# External to Mon
        self.publish(key,
                    value=message,
                    topic='metric_request')
- def create_metrics_resp(self, key, message):
+ def publish_metrics_response(self, key, message):
"""Response for a create metric request from MON to SO."""
# Internal to Mon
log.debug("Alarm Already exists")
payload = json.dumps(config_resp)
file = open('../../core/models/create_alarm_resp.json','wb').write((payload))
- self.producer.create_alarm_response(key='create_alarm_response',message=payload)
+            self.producer.publish_alarm_response(key='create_alarm_response', message=payload)
else:
payload = json.dumps(config_resp)
file = open('../../core/models/create_alarm_resp.json','wb').write((payload))
- self.producer.create_alarm_response(key='create_alarm_response',message=payload)
+ self.producer.publish_alarm_response(key='create_alarm_response',message=payload)
log.info("New alarm created with alarm info: %s", config_resp)
else:
metric_response['metric_create_response'] = metric_resp
payload = json.dumps(metric_response)
file = open('../../core/models/create_metric_resp.json','wb').write((payload))
- self.producer.create_metrics_resp(key='create_metric_response',message=payload,topic = 'metric_response')
+            self.producer.publish_metrics_response(key='create_metric_response', message=payload)
log.info("Metric configured: %s", metric_resp)
return metric_response
log.exception("Error updating alarm")
raise e
finally:
- self._generate_and_send_response('create_alarm_response',
+ self._generate_and_send_response('update_alarm_response',
alarm_details['correlation_id'],
status=status,
alarm_id=alarm_id)
log.exception("Desired Gnocchi metric not found:", e)
raise e
- def _generate_and_send_response(self, topic, correlation_id, **kwargs):
+ def _generate_and_send_response(self, key, correlation_id, **kwargs):
try:
resp_message = self._response.generate_response(
- topic, cor_id=correlation_id, **kwargs)
+ key, cor_id=correlation_id, **kwargs)
log.info("Response Message: %s", resp_message)
- self._producer.create_alarm_response(
- topic, resp_message)
+ self._producer.publish_alarm_response(
+ key, resp_message)
except Exception as e:
log.exception("Response creation failed:")
raise e
sev=values['severity'],
date=a_date,
state=values['current'])
- producer.notify_alarm(
+ producer.publish_alarm_response(
'notify_alarm', resp_message)
log.info("Sent alarm notification: %s", resp_message)
metric_id=metric_id,
r_id=resource_id)
log.info("Response messages: %s", resp_message)
- self._producer.create_metrics_resp(
+ self._producer.publish_metrics_response(
'create_metric_response', resp_message)
except Exception as exc:
log.warning("Failed to create response: %s", exc)
from osm_mon.core.auth import AuthManager
from osm_mon.core.database import DatabaseManager, VimCredentials
-from osm_mon.core.message_bus.producer import KafkaProducer as prod
+from osm_mon.core.message_bus.producer import KafkaProducer as Producer
from osm_mon.plugins.OpenStack import response
from osm_mon.plugins.OpenStack.Aodh import alarming
from osm_mon.plugins.OpenStack.common import Common
mock_creds.config = '{}'
+@mock.patch.object(Producer, "publish_alarm_request", mock.Mock())
+@mock.patch.object(DatabaseManager, "save_alarm", mock.Mock())
+@mock.patch.object(Common, "get_auth_token", mock.Mock())
+@mock.patch.object(Common, "get_endpoint", mock.Mock())
class AlarmIntegrationTest(unittest.TestCase):
def setUp(self):
try:
self.producer.close()
self.req_consumer.close()
- @mock.patch.object(Common, "get_auth_token", mock.Mock())
- @mock.patch.object(Common, "get_endpoint", mock.Mock())
+ @mock.patch.object(Common, "perform_request")
@mock.patch.object(AuthManager, 'get_credentials')
- @mock.patch.object(prod, "update_alarm_response")
@mock.patch.object(alarming.Alarming, "update_alarm")
@mock.patch.object(response.OpenStack_Response, "generate_response")
- def test_update_alarm_req(self, resp, update_alarm, update_resp, get_creds):
+ def test_update_alarm_req(self, resp, update_alarm, get_creds, perf_req):
"""Test Aodh update alarm request message from KafkaProducer."""
# Set-up message, producer and consumer for tests
- payload = {"alarm_update_request":
- {"correlation_id": 123,
- "alarm_uuid": "alarm_id",
- "metric_uuid": "metric_id"}}
+ payload = {"alarm_update_request": {"correlation_id": 123,
+ "alarm_uuid": "alarm_id",
+ "metric_uuid": "metric_id"}}
get_creds.return_value = mock_creds
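+        # Stub for Common.perform_request's return value; only a .text attribute holding the JSON body is provided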
+ perf_req.return_value = type('obj', (object,), {'text': json.dumps({"metrics": {"cpu_util": "1"}})})
+ resp.return_value = ''
self.producer.send('alarm_request', key="update_alarm_request",
value=json.dumps(payload))
for message in self.req_consumer:
if message.key == "update_alarm_request":
# Mock a valid alarm update
- update_alarm.return_value = "alarm_id", True
+ update_alarm.return_value = "alarm_id"
self.alarms.alarming(message, 'test_id')
# A response message is generated and sent via MON's producer
resp.assert_called_with(
'update_alarm_response', alarm_id="alarm_id", cor_id=123,
status=True)
- update_resp.assert_called_with(
- 'update_alarm_response', resp.return_value)
return
self.fail("No message received in consumer")
- @mock.patch.object(DatabaseManager, "save_alarm", mock.Mock())
- @mock.patch.object(Common, "get_auth_token", mock.Mock())
- @mock.patch.object(Common, "get_endpoint", mock.Mock())
+ @mock.patch.object(Common, "perform_request")
@mock.patch.object(AuthManager, 'get_credentials')
- @mock.patch.object(prod, "create_alarm_response")
@mock.patch.object(alarming.Alarming, "configure_alarm")
@mock.patch.object(response.OpenStack_Response, "generate_response")
- def test_create_alarm_req(self, resp, config_alarm, create_resp, get_creds):
+ def test_create_alarm_req(self, resp, config_alarm, get_creds, perf_req):
"""Test Aodh create alarm request message from KafkaProducer."""
# Set-up message, producer and consumer for tests
- payload = {"alarm_create_request":
- {"correlation_id": 123,
- "alarm_name": "my_alarm",
- "metric_name": "my_metric",
- "resource_uuid": "my_resource",
- "severity": "WARNING",
- "threshold_value": 60,
- "operation": "GT",
- "vdu_name": "vdu",
- "vnf_member_index": "1",
- "ns_id": "1"}}
+ payload = {"alarm_create_request": {"correlation_id": 123,
+ "alarm_name": "my_alarm",
+ "metric_name": "cpu_utilization",
+ "resource_uuid": "my_resource",
+ "severity": "WARNING",
+ "threshold_value": 60,
+ "operation": "GT",
+ "vdu_name": "vdu",
+ "vnf_member_index": "1",
+ "ns_id": "1"}}
get_creds.return_value = mock_creds
-
+ perf_req.return_value = type('obj', (object,), {'text': json.dumps({"metrics": {"cpu_util": "1"}})})
+ resp.return_value = ''
self.producer.send('alarm_request', key="create_alarm_request",
value=json.dumps(payload))
for message in self.req_consumer:
if message.key == "create_alarm_request":
# Mock a valid alarm creation
- config_alarm.return_value = "alarm_id", True
+ config_alarm.return_value = "alarm_id"
self.alarms.alarming(message, 'test_id')
            # A response message is generated and sent via MON's producer
resp.assert_called_with(
'create_alarm_response', status=True, alarm_id="alarm_id",
cor_id=123)
- create_resp.assert_called_with(
- 'create_alarm_response', resp.return_value)
return
self.fail("No message received in consumer")
- @mock.patch.object(Common, "get_auth_token", mock.Mock())
- @mock.patch.object(Common, "get_endpoint", mock.Mock())
+ @mock.patch.object(Common, "perform_request")
@mock.patch.object(AuthManager, 'get_credentials')
- @mock.patch.object(prod, "list_alarm_response")
@mock.patch.object(alarming.Alarming, "list_alarms")
@mock.patch.object(response.OpenStack_Response, "generate_response")
- def test_list_alarm_req(self, resp, list_alarm, list_resp, get_creds):
+ def test_list_alarm_req(self, resp, list_alarm, get_creds, perf_req):
"""Test Aodh list alarm request message from KafkaProducer."""
# Set-up message, producer and consumer for tests
- payload = {"alarm_list_request":
- {"correlation_id": 123,
- "resource_uuid": "resource_id", }}
+ payload = {"alarm_list_request": {"correlation_id": 123,
+ "resource_uuid": "resource_id", }}
self.producer.send('alarm_request', key="list_alarm_request",
value=json.dumps(payload))
get_creds.return_value = mock_creds
+ perf_req.return_value = type('obj', (object,), {'text': json.dumps([])})
+ resp.return_value = ''
for message in self.req_consumer:
if message.key == "list_alarm_request":
resp.assert_called_with(
'list_alarm_response', alarm_list=[],
cor_id=123)
- # Producer attempts to send the response message back to the SO
- list_resp.assert_called_with(
- 'list_alarm_response', resp.return_value)
return
self.fail("No message received in consumer")
- @mock.patch.object(Common, "get_auth_token", mock.Mock())
- @mock.patch.object(Common, "get_endpoint", mock.Mock())
+ @mock.patch.object(Common, "perform_request")
@mock.patch.object(AuthManager, 'get_credentials')
@mock.patch.object(alarming.Alarming, "delete_alarm")
- @mock.patch.object(prod, "delete_alarm_response")
@mock.patch.object(response.OpenStack_Response, "generate_response")
- def test_delete_alarm_req(self, resp, del_resp, del_alarm, get_creds):
+ def test_delete_alarm_req(self, resp, del_alarm, get_creds, perf_req):
"""Test Aodh delete alarm request message from KafkaProducer."""
# Set-up message, producer and consumer for tests
- payload = {"alarm_delete_request":
- {"correlation_id": 123,
- "alarm_uuid": "alarm_id", }}
+ payload = {"alarm_delete_request": {"correlation_id": 123,
+ "alarm_uuid": "alarm_id", }}
self.producer.send('alarm_request', key="delete_alarm_request",
value=json.dumps(payload))
get_creds.return_value = mock_creds
+ perf_req.return_value = type('obj', (object,), {'text': json.dumps([])})
+ resp.return_value = ''
for message in self.req_consumer:
if message.key == "delete_alarm_request":
# Response message is generated and sent by MON's producer
resp.assert_called_with(
'delete_alarm_response', alarm_id="alarm_id",
- status=del_alarm.return_value, cor_id=123)
- del_resp.assert_called_with(
- 'delete_alarm_response', resp.return_value)
+ status=True, cor_id=123)
return
self.fail("No message received in consumer")
- @mock.patch.object(Common, "get_auth_token", mock.Mock())
- @mock.patch.object(Common, "get_endpoint", mock.Mock())
@mock.patch.object(AuthManager, 'get_credentials')
@mock.patch.object(alarming.Alarming, "update_alarm_state")
def test_ack_alarm_req(self, ack_alarm, get_creds):
"""Test Aodh acknowledge alarm request message from KafkaProducer."""
# Set-up message, producer and consumer for tests
- payload = {"ack_details":
- {"alarm_uuid": "alarm_id", }}
+ payload = {"ack_details": {"alarm_uuid": "alarm_id", }}
self.producer.send('alarm_request', key="acknowledge_alarm",
value=json.dumps(payload))
from kafka.errors import KafkaError
-from osm_mon.core.message_bus.producer import KafkaProducer as prod
+from osm_mon.core.auth import AuthManager
+from osm_mon.core.database import VimCredentials
+from osm_mon.core.message_bus.producer import KafkaProducer as Producer
from kafka import KafkaConsumer
from kafka import KafkaProducer
log = logging.getLogger(__name__)
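+# VimCredentials stub shared by the tests below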
+mock_creds = VimCredentials()
+mock_creds.config = '{}'
+
+@mock.patch.object(Producer, "publish_alarm_request", mock.Mock())
+@mock.patch.object(Common, "get_auth_token", mock.Mock())
+@mock.patch.object(Common, "get_endpoint", mock.Mock())
class MetricIntegrationTest(unittest.TestCase):
def setUp(self):
# Set up common and alarming class instances
except KafkaError:
self.skipTest('Kafka server not present.')
- @mock.patch.object(Common, "get_auth_token", mock.Mock())
- @mock.patch.object(Common, "get_endpoint", mock.Mock())
+ @mock.patch.object(Common, "perform_request")
+ @mock.patch.object(AuthManager, 'get_credentials')
@mock.patch.object(metrics.Metrics, "configure_metric")
- @mock.patch.object(prod, "create_metrics_resp")
@mock.patch.object(response.OpenStack_Response, "generate_response")
- def test_create_metric_req(self, resp, create_resp, config_metric):
+ def test_create_metric_req(self, resp, config_metric, get_creds, perf_req):
"""Test Gnocchi create metric request message from producer."""
# Set-up message, producer and consumer for tests
payload = {"metric_create_request": {"correlation_id": 123,
"metric_name": "cpu_utilization",
"resource_uuid": "resource_id"}}
+ get_creds.return_value = mock_creds
+ perf_req.return_value = type('obj', (object,), {'text': json.dumps({"metrics": {"cpu_util": "1"}})})
+ resp.return_value = ''
+
self.producer.send('metric_request', key="create_metric_request",
value=json.dumps(payload))
resp.assert_called_with(
'create_metric_response', status=True, cor_id=123,
metric_id="metric_id", r_id="resource_id")
- create_resp.assert_called_with(
- 'create_metric_response', resp.return_value)
return
self.fail("No message received in consumer")
- @mock.patch.object(Common, "get_auth_token", mock.Mock())
- @mock.patch.object(Common, "get_endpoint", mock.Mock())
+ @mock.patch.object(Common, "perform_request")
+ @mock.patch.object(AuthManager, 'get_credentials')
@mock.patch.object(metrics.Metrics, "delete_metric")
- @mock.patch.object(prod, "delete_metric_response")
@mock.patch.object(response.OpenStack_Response, "generate_response")
- def test_delete_metric_req(self, resp, del_resp, del_metric):
+ def test_delete_metric_req(self, resp, del_metric, get_creds, perf_req):
"""Test Gnocchi delete metric request message from producer."""
# Set-up message, producer and consumer for tests
payload = {"vim_type": "OpenSTACK",
"metric_name": "cpu_utilization",
"resource_uuid": "resource_id"}
+ get_creds.return_value = mock_creds
+ perf_req.return_value = type('obj', (object,), {'text': json.dumps({"metrics": {"cpu_util": "1"}})})
+ resp.return_value = ''
+
self.producer.send('metric_request', key="delete_metric_request",
value=json.dumps(payload))
# A response message is generated and sent by MON's producer
resp.assert_called_with(
- 'delete_metric_response', m_id=None,
+ 'delete_metric_response', m_id='1',
m_name="cpu_utilization", status=True, r_id="resource_id",
cor_id=123)
- del_resp.assert_called_with(
- 'delete_metric_response', resp.return_value)
return
self.fail("No message received in consumer")
- @mock.patch.object(Common, "get_auth_token", mock.Mock())
- @mock.patch.object(Common, "get_endpoint", mock.Mock())
+ @mock.patch.object(Common, "perform_request")
+ @mock.patch.object(AuthManager, 'get_credentials')
@mock.patch.object(metrics.Metrics, "read_metric_data")
- @mock.patch.object(prod, "read_metric_data_response")
@mock.patch.object(response.OpenStack_Response, "generate_response")
- def test_read_metric_data_req(self, resp, read_resp, read_data):
+ def test_read_metric_data_req(self, resp, read_data, get_creds, perf_req):
"""Test Gnocchi read metric data request message from producer."""
# Set-up message, producer and consumer for tests
payload = {"vim_type": "OpenSTACK",
"metric_name": "cpu_utilization",
"resource_uuid": "resource_id"}
+ get_creds.return_value = mock_creds
+ perf_req.return_value = type('obj', (object,), {'text': json.dumps({"metrics": {"cpu_util": "1"}})})
+ resp.return_value = ''
+
self.producer.send('metric_request', key="read_metric_data_request",
value=json.dumps(payload))
# A response message is generated and sent by MON's producer
resp.assert_called_with(
- 'read_metric_data_response', m_id=None,
+ 'read_metric_data_response', m_id='1',
m_name="cpu_utilization", r_id="resource_id", cor_id=123, times=[],
metrics=[])
- read_resp.assert_called_with(
- 'read_metric_data_response', resp.return_value)
return
self.fail("No message received in consumer")
- @mock.patch.object(Common, "get_auth_token", mock.Mock())
- @mock.patch.object(Common, "get_endpoint", mock.Mock())
+ @mock.patch.object(Common, "perform_request")
+ @mock.patch.object(AuthManager, 'get_credentials')
@mock.patch.object(metrics.Metrics, "list_metrics")
- @mock.patch.object(prod, "list_metric_response")
@mock.patch.object(response.OpenStack_Response, "generate_response")
- def test_list_metrics_req(self, resp, list_resp, list_metrics):
+ def test_list_metrics_req(self, resp, list_metrics, get_creds, perf_req):
"""Test Gnocchi list metrics request message from producer."""
# Set-up message, producer and consumer for tests
payload = {"vim_type": "OpenSTACK",
"metrics_list_request":
{"correlation_id": 123, }}
+ get_creds.return_value = mock_creds
+ perf_req.return_value = type('obj', (object,), {'text': json.dumps({"metrics": {"cpu_util": "1"}})})
+ resp.return_value = ''
+
self.producer.send('metric_request', key="list_metric_request",
value=json.dumps(payload))
# A response message is generated and sent by MON's producer
resp.assert_called_with(
'list_metric_response', m_list=[], cor_id=123)
- list_resp.assert_called_with(
- 'list_metric_response', resp.return_value)
return
self.fail("No message received in consumer")
- @mock.patch.object(Common, "get_auth_token", mock.Mock())
- @mock.patch.object(Common, "get_endpoint", mock.Mock())
+ @mock.patch.object(Common, "perform_request")
+ @mock.patch.object(AuthManager, 'get_credentials')
@mock.patch.object(metrics.Metrics, "get_metric_id")
- @mock.patch.object(prod, "update_metric_response")
@mock.patch.object(response.OpenStack_Response, "generate_response")
- def test_update_metrics_req(self, resp, update_resp, get_id):
+ def test_update_metrics_req(self, resp, get_id, get_creds, perf_req):
"""Test Gnocchi update metric request message from KafkaProducer."""
# Set-up message, producer and consumer for tests
payload = {"metric_create_request": {"metric_name": "my_metric",
"correlation_id": 123,
"resource_uuid": "resource_id", }}
+ get_creds.return_value = mock_creds
+ perf_req.return_value = type('obj', (object,), {'text': json.dumps({"metrics": {"cpu_util": "1"}})})
+ resp.return_value = ''
+
self.producer.send('metric_request', key="update_metric_request",
value=json.dumps(payload))
resp.assert_called_with(
'update_metric_response', status=False, cor_id=123,
r_id="resource_id", m_id="metric_id")
- update_resp.assert_called_with(
- 'update_metric_response', resp.return_value)
return
self.fail("No message received in consumer")
r_id=resource_id,
sev=values['severity'], date=a_date,
state=values['current'], vim_type="OpenStack")
- self._producer.notify_alarm(
- 'notify_alarm', resp_message, 'alarm_response')
+ self._producer.publish_alarm_response(
+ 'notify_alarm', resp_message)
except Exception:
pass
class AlarmNotificationTest(unittest.TestCase):
- @mock.patch.object(KafkaProducer, "notify_alarm")
+ @mock.patch.object(KafkaProducer, "publish_alarm_response")
@mock.patch.object(OpenStack_Response, "generate_response")
@mock.patch.object(Common, "perform_request")
@mock.patch.object(Common, "get_endpoint")
vim_type="OpenStack")
# Response message is sent back to the SO via MON's producer
- notify.assert_called_with("notify_alarm", mock.ANY, "alarm_response")
+ notify.assert_called_with("notify_alarm", mock.ANY)
set_head.assert_called_once()
notify.assert_called_with(json.dumps(post_data))
- @mock.patch.object(KafkaProducer, "notify_alarm")
+ @mock.patch.object(KafkaProducer, "publish_alarm_response")
@mock.patch.object(DatabaseManager, "get_alarm")
def test_notify_alarm_valid_alarm(
self, get_alarm, notify):
notify.assert_called_with("notify_alarm", mock.ANY)
- @mock.patch.object(KafkaProducer, "notify_alarm")
+ @mock.patch.object(KafkaProducer, "publish_alarm_response")
@mock.patch.object(DatabaseManager, "get_alarm")
def test_notify_alarm_invalid_alarm(
self, get_alarm, notify):
import unittest
import mock
+from kafka import KafkaProducer
+from kafka.errors import KafkaError
from osm_mon.core.database import VimCredentials
from osm_mon.core.message_bus.common_consumer import *
+@mock.patch.object(dbmongo.DbMongo, "db_connect", mock.Mock())
class CommonConsumerTest(unittest.TestCase):
+
+ def setUp(self):
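+        # Skip the integration tests when no local Kafka broker is reachable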
+ try:
+ KafkaProducer(bootstrap_servers='localhost:9092',
+ key_serializer=str.encode,
+ value_serializer=str.encode
+ )
+ except KafkaError:
+ self.skipTest('Kafka server not present.')
+
@mock.patch.object(DatabaseManager, "get_credentials")
def test_get_vim_type(self, get_creds):
mock_creds = VimCredentials()
get_creds.return_value = mock_creds
-        db_manager = DatabaseManager()
-        vim_type = get_vim_type(db_manager, 'test_id')
+        common_consumer = CommonConsumer()
+        vim_type = common_consumer.get_vim_type('test_id')
self.assertEqual(vim_type, 'openstack')
'created-time': 1526044312.0999322,
'vnfd-id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01',
'id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01'}
-
-        common_db = dbmongo.DbMongo()
-        vdur = get_vdur(common_db, '5ec3f571-d540-4cb0-9992-971d1b08312e', '1', 'ubuntuvnf_vnfd-VM')
+        common_consumer = CommonConsumer()
+        vdur = common_consumer.get_vdur('5ec3f571-d540-4cb0-9992-971d1b08312e', '1', 'ubuntuvnf_vnfd-VM')
expected_vdur = {
'internal-connection-point': [],
'vdu-id-ref': 'ubuntuvnf_vnfd-VM',