From: Benjamin Diaz Date: Mon, 18 Jun 2018 17:21:46 +0000 (-0300) Subject: [MON] Implements multithreading for message consumption X-Git-Tag: BUILD_v4.0.1_1~1^2 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=326907a0151bed5641a9e9e241bc3b05bf0b71b9;p=osm%2FMON.git [MON] Implements multithreading for message consumption [MON] Fixes errors in OpenStack plugin integration tests Signed-off-by: Benjamin Diaz Change-Id: Ifbf85d5217244aa22655addd1a64c276746cda77 --- diff --git a/osm_mon/core/message_bus/common_consumer.py b/osm_mon/core/message_bus/common_consumer.py index dc5816e..60ad313 100755 --- a/osm_mon/core/message_bus/common_consumer.py +++ b/osm_mon/core/message_bus/common_consumer.py @@ -22,28 +22,24 @@ import json import logging import sys +import threading import six import yaml - from kafka import KafkaConsumer +from osm_common import dbmongo +from osm_mon.core.auth import AuthManager +from osm_mon.core.database import DatabaseManager from osm_mon.core.settings import Config -from osm_mon.plugins.OpenStack.Aodh import alarming -from osm_mon.plugins.OpenStack.Gnocchi import metrics - +from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials +from osm_mon.plugins.CloudWatch.connection import Connection from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics -from osm_mon.plugins.CloudWatch.connection import Connection -from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials - +from osm_mon.plugins.OpenStack.Aodh import alarming +from osm_mon.plugins.OpenStack.Gnocchi import metrics from osm_mon.plugins.vRealiseOps import plugin_receiver -from osm_mon.core.auth import AuthManager -from osm_mon.core.database import DatabaseManager - -from osm_common import dbmongo - logging.basicConfig(stream=sys.stdout, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', @@ -51,63 +47,70 @@ logging.basicConfig(stream=sys.stdout, log = logging.getLogger(__name__) -def get_vim_type(db_manager, vim_uuid): - """Get the vim type that is required by the message.""" - credentials = db_manager.get_credentials(vim_uuid) - return credentials.type - - -def get_vdur(common_db, nsr_id, member_index, vdu_name): - vnfr = get_vnfr(common_db, nsr_id, member_index) - for vdur in vnfr['vdur']: - if vdur['vdu-id-ref'] == vdu_name: - return vdur - raise ValueError('vdur not found for nsr-id %s, member_index %s and vdu_name %s', nsr_id, member_index, vdu_name) - - -def get_vnfr(common_db, nsr_id, member_index): - vnfr = common_db.get_one(table="vnfrs", filter={"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)}) - return vnfr - - -def main(): - cfg = Config.instance() - cfg.read_environ() - - auth_manager = AuthManager() - database_manager = DatabaseManager() - database_manager.create_tables() - - # Create OpenStack alarming and metric instances - openstack_metrics = metrics.Metrics() - openstack_alarms = alarming.Alarming() - - # Create CloudWatch alarm and metric instances - cloudwatch_alarms = plugin_alarms() - cloudwatch_metrics = plugin_metrics() - aws_connection = Connection() - aws_access_credentials = AccessCredentials() - - # Create vROps plugin_receiver class instance - vrops_rcvr = plugin_receiver.PluginReceiver() - - common_db = dbmongo.DbMongo() - common_db_uri = cfg.MONGO_URI.split(':') - common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'}) - - # Initialize consumers for alarms and metrics - common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI, - key_deserializer=bytes.decode, - value_deserializer=bytes.decode, - group_id="mon-consumer") - - # Define subscribe the consumer for the plugins - topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account'] - # TODO: Remove access_credentials - common_consumer.subscribe(topics) - - log.info("Listening for alarm_request and metric_request messages") - for message in common_consumer: +class CommonConsumer: + + def __init__(self): + cfg = Config.instance() + + self.auth_manager = AuthManager() + self.database_manager = DatabaseManager() + self.database_manager.create_tables() + + # Create OpenStack alarming and metric instances + self.openstack_metrics = metrics.Metrics() + self.openstack_alarms = alarming.Alarming() + + # Create CloudWatch alarm and metric instances + self.cloudwatch_alarms = plugin_alarms() + self.cloudwatch_metrics = plugin_metrics() + self.aws_connection = Connection() + self.aws_access_credentials = AccessCredentials() + + # Create vROps plugin_receiver class instance + self.vrops_rcvr = plugin_receiver.PluginReceiver() + + log.info("Connecting to MongoDB...") + self.common_db = dbmongo.DbMongo() + common_db_uri = cfg.MONGO_URI.split(':') + self.common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'}) + log.info("Connection successful.") + + # Initialize consumers for alarms and metrics + self.common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI, + key_deserializer=bytes.decode, + value_deserializer=bytes.decode, + group_id="mon-consumer") + + # Define subscribe the consumer for the plugins + topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account'] + # TODO: Remove access_credentials + self.common_consumer.subscribe(topics) + + def get_vim_type(self, vim_uuid): + """Get the vim type that is required by the message.""" + credentials = self.database_manager.get_credentials(vim_uuid) + return credentials.type + + def get_vdur(self, nsr_id, member_index, vdu_name): + vnfr = self.get_vnfr(nsr_id, member_index) + for vdur in vnfr['vdur']: + if vdur['vdu-id-ref'] == vdu_name: + return vdur + raise ValueError('vdur not found for nsr-id %s, member_index %s and vdu_name %s', nsr_id, member_index, + vdu_name) + + def get_vnfr(self, nsr_id, member_index): + vnfr = self.common_db.get_one(table="vnfrs", + filter={"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)}) + return vnfr + + def run(self): + log.info("Listening for messages...") + for message in self.common_consumer: + t = threading.Thread(target=self.consume_message, args=(message,)) + t.start() + + def consume_message(self, message): log.info("Message arrived: %s", message) try: try: @@ -117,9 +120,9 @@ def main(): if message.topic == "vim_account": if message.key == "create" or message.key == "edit": - auth_manager.store_auth_credentials(values) + self.auth_manager.store_auth_credentials(values) if message.key == "delete": - auth_manager.delete_auth_credentials(values) + self.auth_manager.delete_auth_credentials(values) else: # Get ns_id from message @@ -140,39 +143,40 @@ def main(): vnf_index = values[list_index]['vnf_member_index'] if contains_list else values['vnf_member_index'] # Check the vim desired by the message - vnfr = get_vnfr(common_db, ns_id, vnf_index) + vnfr = self.get_vnfr(ns_id, vnf_index) vim_uuid = vnfr['vim-account-id'] - vim_type = get_vim_type(database_manager, vim_uuid) if (contains_list and 'vdu_name' in values[list_index]) or 'vdu_name' in values: vdu_name = values[list_index]['vdu_name'] if contains_list else values['vdu_name'] - vdur = get_vdur(common_db, ns_id, vnf_index, vdu_name) + vdur = self.get_vdur(ns_id, vnf_index, vdu_name) if contains_list: values[list_index]['resource_uuid'] = vdur['vim-id'] else: values['resource_uuid'] = vdur['vim-id'] message = message._replace(value=json.dumps(values)) + vim_type = self.get_vim_type(vim_uuid) + if vim_type == "openstack": log.info("This message is for the OpenStack plugin.") if message.topic == "metric_request": - openstack_metrics.metric_calls(message, vim_uuid) + self.openstack_metrics.metric_calls(message, vim_uuid) if message.topic == "alarm_request": - openstack_alarms.alarming(message, vim_uuid) + self.openstack_alarms.alarming(message, vim_uuid) elif vim_type == "aws": log.info("This message is for the CloudWatch plugin.") - aws_conn = aws_connection.setEnvironment() + aws_conn = self.aws_connection.setEnvironment() if message.topic == "metric_request": - cloudwatch_metrics.metric_calls(message, aws_conn) + self.cloudwatch_metrics.metric_calls(message, aws_conn) if message.topic == "alarm_request": - cloudwatch_alarms.alarm_calls(message, aws_conn) + self.cloudwatch_alarms.alarm_calls(message, aws_conn) if message.topic == "access_credentials": - aws_access_credentials.access_credential_calls(message) + self.aws_access_credentials.access_credential_calls(message) elif vim_type == "vmware": log.info("This metric_request message is for the vROPs plugin.") - vrops_rcvr.consume(message) + self.vrops_rcvr.consume(message) else: log.debug("vim_type is misconfigured or unsupported; %s", @@ -183,4 +187,4 @@ def main(): if __name__ == '__main__': - main() + CommonConsumer().run() diff --git a/osm_mon/core/message_bus/producer.py b/osm_mon/core/message_bus/producer.py index bf0839c..f04ecf8 100755 --- a/osm_mon/core/message_bus/producer.py +++ b/osm_mon/core/message_bus/producer.py @@ -67,97 +67,34 @@ class KafkaProducer(object): """Send the required message on the Kafka message bus.""" try: future = self.producer.send(topic=topic, key=key, value=value) - self.producer.flush() + record_metadata = future.get(timeout=10) except Exception: logging.exception("Error publishing to {} topic." .format(topic)) raise try: - record_metadata = future.get(timeout=10) logging.debug("TOPIC:", record_metadata.topic) logging.debug("PARTITION:", record_metadata.partition) logging.debug("OFFSET:", record_metadata.offset) except KafkaError: pass - def create_alarm_request(self, key, message): - """Create alarm request from SO to MON.""" + def publish_alarm_request(self, key, message): + """Publish an alarm request.""" # External to MON self.publish(key, value=message, topic='alarm_request') - def create_alarm_response(self, key, message): - """Response to a create alarm request from MON to SO.""" - # Internal to MON - - self.publish(key, - value=message, - topic='alarm_response') - - def acknowledge_alarm(self, key, message): - """Alarm acknowledgement request from SO to MON.""" + def publish_alarm_response(self, key, message): + """Publish an alarm response.""" # Internal to MON - self.publish(key, - value=message, - topic='alarm_request') - - def list_alarm_request(self, key, message): - """List alarms request from SO to MON.""" - # External to MON - - self.publish(key, - value=message, - topic='alarm_request') - - def notify_alarm(self, key, message): - """Notify of triggered alarm from MON to SO.""" - - self.publish(key, - value=message, - topic='alarm_response') - - def list_alarm_response(self, key, message): - """Response for list alarms request from MON to SO.""" - - self.publish(key, - value=message, - topic='alarm_response') - - def update_alarm_request(self, key, message): - """Update alarm request from SO to MON.""" - # External to Mon - - self.publish(key, - value=message, - topic='alarm_request') - - def update_alarm_response(self, key, message): - """Response from update alarm request from MON to SO.""" - # Internal to Mon - - self.publish(key, - value=message, - topic='alarm_response') - - def delete_alarm_request(self, key, message): - """Delete alarm request from SO to MON.""" - # External to Mon - - self.publish(key, - value=message, - topic='alarm_request') - - def delete_alarm_response(self, key, message): - """Response for a delete alarm request from MON to SO.""" - # Internal to Mon - self.publish(key, value=message, topic='alarm_response') - def create_metrics_request(self, key, message): + def publish_metrics_request(self, key, message): """Create metrics request from SO to MON.""" # External to Mon @@ -165,7 +102,7 @@ class KafkaProducer(object): value=message, topic='metric_request') - def create_metrics_resp(self, key, message): + def publish_metrics_response(self, key, message): """Response for a create metric request from MON to SO.""" # Internal to Mon diff --git a/osm_mon/plugins/CloudWatch/plugin_alarm.py b/osm_mon/plugins/CloudWatch/plugin_alarm.py index dea2b06..40e7fe5 100644 --- a/osm_mon/plugins/CloudWatch/plugin_alarm.py +++ b/osm_mon/plugins/CloudWatch/plugin_alarm.py @@ -87,12 +87,12 @@ class plugin_alarms(): log.debug("Alarm Already exists") payload = json.dumps(config_resp) file = open('../../core/models/create_alarm_resp.json','wb').write((payload)) - self.producer.create_alarm_response(key='create_alarm_response',message=payload) + self.producer.publish_alarm_response(key='create_alarm_response',message=payload) else: payload = json.dumps(config_resp) file = open('../../core/models/create_alarm_resp.json','wb').write((payload)) - self.producer.create_alarm_response(key='create_alarm_response',message=payload) + self.producer.publish_alarm_response(key='create_alarm_response',message=payload) log.info("New alarm created with alarm info: %s", config_resp) else: diff --git a/osm_mon/plugins/CloudWatch/plugin_metric.py b/osm_mon/plugins/CloudWatch/plugin_metric.py index dc687db..36b89e3 100644 --- a/osm_mon/plugins/CloudWatch/plugin_metric.py +++ b/osm_mon/plugins/CloudWatch/plugin_metric.py @@ -91,7 +91,7 @@ class plugin_metrics(): metric_response['metric_create_response'] = metric_resp payload = json.dumps(metric_response) file = open('../../core/models/create_metric_resp.json','wb').write((payload)) - self.producer.create_metrics_resp(key='create_metric_response',message=payload,topic = 'metric_response') + self.producer.publish_metrics_response(key='create_metric_response', message=payload, topic ='metric_response') log.info("Metric configured: %s", metric_resp) return metric_response diff --git a/osm_mon/plugins/OpenStack/Aodh/alarming.py b/osm_mon/plugins/OpenStack/Aodh/alarming.py index 44b8fdd..f380b4e 100644 --- a/osm_mon/plugins/OpenStack/Aodh/alarming.py +++ b/osm_mon/plugins/OpenStack/Aodh/alarming.py @@ -177,7 +177,7 @@ class Alarming(object): log.exception("Error updating alarm") raise e finally: - self._generate_and_send_response('create_alarm_response', + self._generate_and_send_response('update_alarm_response', alarm_details['correlation_id'], status=status, alarm_id=alarm_id) @@ -387,13 +387,13 @@ class Alarming(object): log.exception("Desired Gnocchi metric not found:", e) raise e - def _generate_and_send_response(self, topic, correlation_id, **kwargs): + def _generate_and_send_response(self, key, correlation_id, **kwargs): try: resp_message = self._response.generate_response( - topic, cor_id=correlation_id, **kwargs) + key, cor_id=correlation_id, **kwargs) log.info("Response Message: %s", resp_message) - self._producer.create_alarm_response( - topic, resp_message) + self._producer.publish_alarm_response( + key, resp_message) except Exception as e: log.exception("Response creation failed:") raise e diff --git a/osm_mon/plugins/OpenStack/Aodh/notifier.py b/osm_mon/plugins/OpenStack/Aodh/notifier.py index 377404a..92674cb 100644 --- a/osm_mon/plugins/OpenStack/Aodh/notifier.py +++ b/osm_mon/plugins/OpenStack/Aodh/notifier.py @@ -107,7 +107,7 @@ class NotifierHandler(BaseHTTPRequestHandler): sev=values['severity'], date=a_date, state=values['current']) - producer.notify_alarm( + producer.publish_alarm_response( 'notify_alarm', resp_message) log.info("Sent alarm notification: %s", resp_message) diff --git a/osm_mon/plugins/OpenStack/Gnocchi/metrics.py b/osm_mon/plugins/OpenStack/Gnocchi/metrics.py index 7af9427..825671e 100644 --- a/osm_mon/plugins/OpenStack/Gnocchi/metrics.py +++ b/osm_mon/plugins/OpenStack/Gnocchi/metrics.py @@ -103,7 +103,7 @@ class Metrics(object): metric_id=metric_id, r_id=resource_id) log.info("Response messages: %s", resp_message) - self._producer.create_metrics_resp( + self._producer.publish_metrics_response( 'create_metric_response', resp_message) except Exception as exc: log.warning("Failed to create response: %s", exc) diff --git a/osm_mon/test/OpenStack/integration/test_alarm_integration.py b/osm_mon/test/OpenStack/integration/test_alarm_integration.py index 32532e1..bdd2033 100644 --- a/osm_mon/test/OpenStack/integration/test_alarm_integration.py +++ b/osm_mon/test/OpenStack/integration/test_alarm_integration.py @@ -33,7 +33,7 @@ from kafka.errors import KafkaError from osm_mon.core.auth import AuthManager from osm_mon.core.database import DatabaseManager, VimCredentials -from osm_mon.core.message_bus.producer import KafkaProducer as prod +from osm_mon.core.message_bus.producer import KafkaProducer as Producer from osm_mon.plugins.OpenStack import response from osm_mon.plugins.OpenStack.Aodh import alarming from osm_mon.plugins.OpenStack.common import Common @@ -44,6 +44,10 @@ mock_creds = VimCredentials() mock_creds.config = '{}' +@mock.patch.object(Producer, "publish_alarm_request", mock.Mock()) +@mock.patch.object(DatabaseManager, "save_alarm", mock.Mock()) +@mock.patch.object(Common, "get_auth_token", mock.Mock()) +@mock.patch.object(Common, "get_endpoint", mock.Mock()) class AlarmIntegrationTest(unittest.TestCase): def setUp(self): try: @@ -67,21 +71,20 @@ class AlarmIntegrationTest(unittest.TestCase): self.producer.close() self.req_consumer.close() - @mock.patch.object(Common, "get_auth_token", mock.Mock()) - @mock.patch.object(Common, "get_endpoint", mock.Mock()) + @mock.patch.object(Common, "perform_request") @mock.patch.object(AuthManager, 'get_credentials') - @mock.patch.object(prod, "update_alarm_response") @mock.patch.object(alarming.Alarming, "update_alarm") @mock.patch.object(response.OpenStack_Response, "generate_response") - def test_update_alarm_req(self, resp, update_alarm, update_resp, get_creds): + def test_update_alarm_req(self, resp, update_alarm, get_creds, perf_req): """Test Aodh update alarm request message from KafkaProducer.""" # Set-up message, producer and consumer for tests - payload = {"alarm_update_request": - {"correlation_id": 123, - "alarm_uuid": "alarm_id", - "metric_uuid": "metric_id"}} + payload = {"alarm_update_request": {"correlation_id": 123, + "alarm_uuid": "alarm_id", + "metric_uuid": "metric_id"}} get_creds.return_value = mock_creds + perf_req.return_value = type('obj', (object,), {'text': json.dumps({"metrics": {"cpu_util": "1"}})}) + resp.return_value = '' self.producer.send('alarm_request', key="update_alarm_request", value=json.dumps(payload)) @@ -89,79 +92,71 @@ class AlarmIntegrationTest(unittest.TestCase): for message in self.req_consumer: if message.key == "update_alarm_request": # Mock a valid alarm update - update_alarm.return_value = "alarm_id", True + update_alarm.return_value = "alarm_id" self.alarms.alarming(message, 'test_id') # A response message is generated and sent via MON's producer resp.assert_called_with( 'update_alarm_response', alarm_id="alarm_id", cor_id=123, status=True) - update_resp.assert_called_with( - 'update_alarm_response', resp.return_value) return self.fail("No message received in consumer") - @mock.patch.object(DatabaseManager, "save_alarm", mock.Mock()) - @mock.patch.object(Common, "get_auth_token", mock.Mock()) - @mock.patch.object(Common, "get_endpoint", mock.Mock()) + @mock.patch.object(Common, "perform_request") @mock.patch.object(AuthManager, 'get_credentials') - @mock.patch.object(prod, "create_alarm_response") @mock.patch.object(alarming.Alarming, "configure_alarm") @mock.patch.object(response.OpenStack_Response, "generate_response") - def test_create_alarm_req(self, resp, config_alarm, create_resp, get_creds): + def test_create_alarm_req(self, resp, config_alarm, get_creds, perf_req): """Test Aodh create alarm request message from KafkaProducer.""" # Set-up message, producer and consumer for tests - payload = {"alarm_create_request": - {"correlation_id": 123, - "alarm_name": "my_alarm", - "metric_name": "my_metric", - "resource_uuid": "my_resource", - "severity": "WARNING", - "threshold_value": 60, - "operation": "GT", - "vdu_name": "vdu", - "vnf_member_index": "1", - "ns_id": "1"}} + payload = {"alarm_create_request": {"correlation_id": 123, + "alarm_name": "my_alarm", + "metric_name": "cpu_utilization", + "resource_uuid": "my_resource", + "severity": "WARNING", + "threshold_value": 60, + "operation": "GT", + "vdu_name": "vdu", + "vnf_member_index": "1", + "ns_id": "1"}} get_creds.return_value = mock_creds - + perf_req.return_value = type('obj', (object,), {'text': json.dumps({"metrics": {"cpu_util": "1"}})}) + resp.return_value = '' self.producer.send('alarm_request', key="create_alarm_request", value=json.dumps(payload)) for message in self.req_consumer: if message.key == "create_alarm_request": # Mock a valid alarm creation - config_alarm.return_value = "alarm_id", True + config_alarm.return_value = "alarm_id" self.alarms.alarming(message, 'test_id') # A response message is generated and sent via MON's produce resp.assert_called_with( 'create_alarm_response', status=True, alarm_id="alarm_id", cor_id=123) - create_resp.assert_called_with( - 'create_alarm_response', resp.return_value) return self.fail("No message received in consumer") - @mock.patch.object(Common, "get_auth_token", mock.Mock()) - @mock.patch.object(Common, "get_endpoint", mock.Mock()) + @mock.patch.object(Common, "perform_request") @mock.patch.object(AuthManager, 'get_credentials') - @mock.patch.object(prod, "list_alarm_response") @mock.patch.object(alarming.Alarming, "list_alarms") @mock.patch.object(response.OpenStack_Response, "generate_response") - def test_list_alarm_req(self, resp, list_alarm, list_resp, get_creds): + def test_list_alarm_req(self, resp, list_alarm, get_creds, perf_req): """Test Aodh list alarm request message from KafkaProducer.""" # Set-up message, producer and consumer for tests - payload = {"alarm_list_request": - {"correlation_id": 123, - "resource_uuid": "resource_id", }} + payload = {"alarm_list_request": {"correlation_id": 123, + "resource_uuid": "resource_id", }} self.producer.send('alarm_request', key="list_alarm_request", value=json.dumps(payload)) get_creds.return_value = mock_creds + perf_req.return_value = type('obj', (object,), {'text': json.dumps([])}) + resp.return_value = '' for message in self.req_consumer: if message.key == "list_alarm_request": @@ -173,30 +168,26 @@ class AlarmIntegrationTest(unittest.TestCase): resp.assert_called_with( 'list_alarm_response', alarm_list=[], cor_id=123) - # Producer attempts to send the response message back to the SO - list_resp.assert_called_with( - 'list_alarm_response', resp.return_value) return self.fail("No message received in consumer") - @mock.patch.object(Common, "get_auth_token", mock.Mock()) - @mock.patch.object(Common, "get_endpoint", mock.Mock()) + @mock.patch.object(Common, "perform_request") @mock.patch.object(AuthManager, 'get_credentials') @mock.patch.object(alarming.Alarming, "delete_alarm") - @mock.patch.object(prod, "delete_alarm_response") @mock.patch.object(response.OpenStack_Response, "generate_response") - def test_delete_alarm_req(self, resp, del_resp, del_alarm, get_creds): + def test_delete_alarm_req(self, resp, del_alarm, get_creds, perf_req): """Test Aodh delete alarm request message from KafkaProducer.""" # Set-up message, producer and consumer for tests - payload = {"alarm_delete_request": - {"correlation_id": 123, - "alarm_uuid": "alarm_id", }} + payload = {"alarm_delete_request": {"correlation_id": 123, + "alarm_uuid": "alarm_id", }} self.producer.send('alarm_request', key="delete_alarm_request", value=json.dumps(payload)) get_creds.return_value = mock_creds + perf_req.return_value = type('obj', (object,), {'text': json.dumps([])}) + resp.return_value = '' for message in self.req_consumer: if message.key == "delete_alarm_request": @@ -205,22 +196,17 @@ class AlarmIntegrationTest(unittest.TestCase): # Response message is generated and sent by MON's producer resp.assert_called_with( 'delete_alarm_response', alarm_id="alarm_id", - status=del_alarm.return_value, cor_id=123) - del_resp.assert_called_with( - 'delete_alarm_response', resp.return_value) + status=True, cor_id=123) return self.fail("No message received in consumer") - @mock.patch.object(Common, "get_auth_token", mock.Mock()) - @mock.patch.object(Common, "get_endpoint", mock.Mock()) @mock.patch.object(AuthManager, 'get_credentials') @mock.patch.object(alarming.Alarming, "update_alarm_state") def test_ack_alarm_req(self, ack_alarm, get_creds): """Test Aodh acknowledge alarm request message from KafkaProducer.""" # Set-up message, producer and consumer for tests - payload = {"ack_details": - {"alarm_uuid": "alarm_id", }} + payload = {"ack_details": {"alarm_uuid": "alarm_id", }} self.producer.send('alarm_request', key="acknowledge_alarm", value=json.dumps(payload)) diff --git a/osm_mon/test/OpenStack/integration/test_metric_integration.py b/osm_mon/test/OpenStack/integration/test_metric_integration.py index 45a34d3..eb672da 100644 --- a/osm_mon/test/OpenStack/integration/test_metric_integration.py +++ b/osm_mon/test/OpenStack/integration/test_metric_integration.py @@ -29,7 +29,9 @@ import unittest from kafka.errors import KafkaError -from osm_mon.core.message_bus.producer import KafkaProducer as prod +from osm_mon.core.auth import AuthManager +from osm_mon.core.database import VimCredentials +from osm_mon.core.message_bus.producer import KafkaProducer as Producer from kafka import KafkaConsumer from kafka import KafkaProducer @@ -44,7 +46,13 @@ from osm_mon.plugins.OpenStack.common import Common log = logging.getLogger(__name__) +mock_creds = VimCredentials() +mock_creds.config = '{}' + +@mock.patch.object(Producer, "publish_alarm_request", mock.Mock()) +@mock.patch.object(Common, "get_auth_token", mock.Mock()) +@mock.patch.object(Common, "get_endpoint", mock.Mock()) class MetricIntegrationTest(unittest.TestCase): def setUp(self): # Set up common and alarming class instances @@ -65,18 +73,21 @@ class MetricIntegrationTest(unittest.TestCase): except KafkaError: self.skipTest('Kafka server not present.') - @mock.patch.object(Common, "get_auth_token", mock.Mock()) - @mock.patch.object(Common, "get_endpoint", mock.Mock()) + @mock.patch.object(Common, "perform_request") + @mock.patch.object(AuthManager, 'get_credentials') @mock.patch.object(metrics.Metrics, "configure_metric") - @mock.patch.object(prod, "create_metrics_resp") @mock.patch.object(response.OpenStack_Response, "generate_response") - def test_create_metric_req(self, resp, create_resp, config_metric): + def test_create_metric_req(self, resp, config_metric, get_creds, perf_req): """Test Gnocchi create metric request message from producer.""" # Set-up message, producer and consumer for tests payload = {"metric_create_request": {"correlation_id": 123, "metric_name": "cpu_utilization", "resource_uuid": "resource_id"}} + get_creds.return_value = mock_creds + perf_req.return_value = type('obj', (object,), {'text': json.dumps({"metrics": {"cpu_util": "1"}})}) + resp.return_value = '' + self.producer.send('metric_request', key="create_metric_request", value=json.dumps(payload)) @@ -90,18 +101,15 @@ class MetricIntegrationTest(unittest.TestCase): resp.assert_called_with( 'create_metric_response', status=True, cor_id=123, metric_id="metric_id", r_id="resource_id") - create_resp.assert_called_with( - 'create_metric_response', resp.return_value) return self.fail("No message received in consumer") - @mock.patch.object(Common, "get_auth_token", mock.Mock()) - @mock.patch.object(Common, "get_endpoint", mock.Mock()) + @mock.patch.object(Common, "perform_request") + @mock.patch.object(AuthManager, 'get_credentials') @mock.patch.object(metrics.Metrics, "delete_metric") - @mock.patch.object(prod, "delete_metric_response") @mock.patch.object(response.OpenStack_Response, "generate_response") - def test_delete_metric_req(self, resp, del_resp, del_metric): + def test_delete_metric_req(self, resp, del_metric, get_creds, perf_req): """Test Gnocchi delete metric request message from producer.""" # Set-up message, producer and consumer for tests payload = {"vim_type": "OpenSTACK", @@ -110,6 +118,10 @@ class MetricIntegrationTest(unittest.TestCase): "metric_name": "cpu_utilization", "resource_uuid": "resource_id"} + get_creds.return_value = mock_creds + perf_req.return_value = type('obj', (object,), {'text': json.dumps({"metrics": {"cpu_util": "1"}})}) + resp.return_value = '' + self.producer.send('metric_request', key="delete_metric_request", value=json.dumps(payload)) @@ -121,21 +133,18 @@ class MetricIntegrationTest(unittest.TestCase): # A response message is generated and sent by MON's producer resp.assert_called_with( - 'delete_metric_response', m_id=None, + 'delete_metric_response', m_id='1', m_name="cpu_utilization", status=True, r_id="resource_id", cor_id=123) - del_resp.assert_called_with( - 'delete_metric_response', resp.return_value) return self.fail("No message received in consumer") - @mock.patch.object(Common, "get_auth_token", mock.Mock()) - @mock.patch.object(Common, "get_endpoint", mock.Mock()) + @mock.patch.object(Common, "perform_request") + @mock.patch.object(AuthManager, 'get_credentials') @mock.patch.object(metrics.Metrics, "read_metric_data") - @mock.patch.object(prod, "read_metric_data_response") @mock.patch.object(response.OpenStack_Response, "generate_response") - def test_read_metric_data_req(self, resp, read_resp, read_data): + def test_read_metric_data_req(self, resp, read_data, get_creds, perf_req): """Test Gnocchi read metric data request message from producer.""" # Set-up message, producer and consumer for tests payload = {"vim_type": "OpenSTACK", @@ -144,6 +153,10 @@ class MetricIntegrationTest(unittest.TestCase): "metric_name": "cpu_utilization", "resource_uuid": "resource_id"} + get_creds.return_value = mock_creds + perf_req.return_value = type('obj', (object,), {'text': json.dumps({"metrics": {"cpu_util": "1"}})}) + resp.return_value = '' + self.producer.send('metric_request', key="read_metric_data_request", value=json.dumps(payload)) @@ -156,21 +169,18 @@ class MetricIntegrationTest(unittest.TestCase): # A response message is generated and sent by MON's producer resp.assert_called_with( - 'read_metric_data_response', m_id=None, + 'read_metric_data_response', m_id='1', m_name="cpu_utilization", r_id="resource_id", cor_id=123, times=[], metrics=[]) - read_resp.assert_called_with( - 'read_metric_data_response', resp.return_value) return self.fail("No message received in consumer") - @mock.patch.object(Common, "get_auth_token", mock.Mock()) - @mock.patch.object(Common, "get_endpoint", mock.Mock()) + @mock.patch.object(Common, "perform_request") + @mock.patch.object(AuthManager, 'get_credentials') @mock.patch.object(metrics.Metrics, "list_metrics") - @mock.patch.object(prod, "list_metric_response") @mock.patch.object(response.OpenStack_Response, "generate_response") - def test_list_metrics_req(self, resp, list_resp, list_metrics): + def test_list_metrics_req(self, resp, list_metrics, get_creds, perf_req): """Test Gnocchi list metrics request message from producer.""" # Set-up message, producer and consumer for tests payload = {"vim_type": "OpenSTACK", @@ -178,6 +188,10 @@ class MetricIntegrationTest(unittest.TestCase): "metrics_list_request": {"correlation_id": 123, }} + get_creds.return_value = mock_creds + perf_req.return_value = type('obj', (object,), {'text': json.dumps({"metrics": {"cpu_util": "1"}})}) + resp.return_value = '' + self.producer.send('metric_request', key="list_metric_request", value=json.dumps(payload)) @@ -191,24 +205,25 @@ class MetricIntegrationTest(unittest.TestCase): # A response message is generated and sent by MON's producer resp.assert_called_with( 'list_metric_response', m_list=[], cor_id=123) - list_resp.assert_called_with( - 'list_metric_response', resp.return_value) return self.fail("No message received in consumer") - @mock.patch.object(Common, "get_auth_token", mock.Mock()) - @mock.patch.object(Common, "get_endpoint", mock.Mock()) + @mock.patch.object(Common, "perform_request") + @mock.patch.object(AuthManager, 'get_credentials') @mock.patch.object(metrics.Metrics, "get_metric_id") - @mock.patch.object(prod, "update_metric_response") @mock.patch.object(response.OpenStack_Response, "generate_response") - def test_update_metrics_req(self, resp, update_resp, get_id): + def test_update_metrics_req(self, resp, get_id, get_creds, perf_req): """Test Gnocchi update metric request message from KafkaProducer.""" # Set-up message, producer and consumer for tests payload = {"metric_create_request": {"metric_name": "my_metric", "correlation_id": 123, "resource_uuid": "resource_id", }} + get_creds.return_value = mock_creds + perf_req.return_value = type('obj', (object,), {'text': json.dumps({"metrics": {"cpu_util": "1"}})}) + resp.return_value = '' + self.producer.send('metric_request', key="update_metric_request", value=json.dumps(payload)) @@ -224,8 +239,6 @@ class MetricIntegrationTest(unittest.TestCase): resp.assert_called_with( 'update_metric_response', status=False, cor_id=123, r_id="resource_id", m_id="metric_id") - update_resp.assert_called_with( - 'update_metric_response', resp.return_value) return self.fail("No message received in consumer") diff --git a/osm_mon/test/OpenStack/integration/test_notify_alarm.py b/osm_mon/test/OpenStack/integration/test_notify_alarm.py index 1b2c64c..0841446 100644 --- a/osm_mon/test/OpenStack/integration/test_notify_alarm.py +++ b/osm_mon/test/OpenStack/integration/test_notify_alarm.py @@ -119,8 +119,8 @@ class MockNotifierHandler(BaseHTTPRequestHandler): r_id=resource_id, sev=values['severity'], date=a_date, state=values['current'], vim_type="OpenStack") - self._producer.notify_alarm( - 'notify_alarm', resp_message, 'alarm_response') + self._producer.publish_alarm_response( + 'notify_alarm', resp_message) except Exception: pass @@ -154,7 +154,7 @@ def test_do_get(): class AlarmNotificationTest(unittest.TestCase): - @mock.patch.object(KafkaProducer, "notify_alarm") + @mock.patch.object(KafkaProducer, "publish_alarm_response") @mock.patch.object(OpenStack_Response, "generate_response") @mock.patch.object(Common, "perform_request") @mock.patch.object(Common, "get_endpoint") @@ -188,4 +188,4 @@ class AlarmNotificationTest(unittest.TestCase): vim_type="OpenStack") # Response message is sent back to the SO via MON's producer - notify.assert_called_with("notify_alarm", mock.ANY, "alarm_response") + notify.assert_called_with("notify_alarm", mock.ANY) diff --git a/osm_mon/test/OpenStack/unit/test_notifier.py b/osm_mon/test/OpenStack/unit/test_notifier.py index 951bf10..f0fbf55 100644 --- a/osm_mon/test/OpenStack/unit/test_notifier.py +++ b/osm_mon/test/OpenStack/unit/test_notifier.py @@ -100,7 +100,7 @@ class TestNotifier(unittest.TestCase): set_head.assert_called_once() notify.assert_called_with(json.dumps(post_data)) - @mock.patch.object(KafkaProducer, "notify_alarm") + @mock.patch.object(KafkaProducer, "publish_alarm_response") @mock.patch.object(DatabaseManager, "get_alarm") def test_notify_alarm_valid_alarm( self, get_alarm, notify): @@ -116,7 +116,7 @@ class TestNotifier(unittest.TestCase): notify.assert_called_with("notify_alarm", mock.ANY) - @mock.patch.object(KafkaProducer, "notify_alarm") + @mock.patch.object(KafkaProducer, "publish_alarm_response") @mock.patch.object(DatabaseManager, "get_alarm") def test_notify_alarm_invalid_alarm( self, get_alarm, notify): diff --git a/osm_mon/test/core/test_common_consumer.py b/osm_mon/test/core/test_common_consumer.py index 56ac492..ddbdf8b 100644 --- a/osm_mon/test/core/test_common_consumer.py +++ b/osm_mon/test/core/test_common_consumer.py @@ -1,12 +1,25 @@ import unittest import mock +from kafka import KafkaProducer +from kafka.errors import KafkaError from osm_mon.core.database import VimCredentials from osm_mon.core.message_bus.common_consumer import * +@mock.patch.object(dbmongo.DbMongo, "db_connect", mock.Mock()) class CommonConsumerTest(unittest.TestCase): + + def setUp(self): + try: + KafkaProducer(bootstrap_servers='localhost:9092', + key_serializer=str.encode, + value_serializer=str.encode + ) + except KafkaError: + self.skipTest('Kafka server not present.') + @mock.patch.object(DatabaseManager, "get_credentials") def test_get_vim_type(self, get_creds): mock_creds = VimCredentials() @@ -19,8 +32,8 @@ class CommonConsumerTest(unittest.TestCase): get_creds.return_value = mock_creds - db_manager = DatabaseManager() - vim_type = get_vim_type(db_manager, 'test_id') + common_consumer = CommonConsumer() + vim_type = common_consumer.get_vim_type('test_id') self.assertEqual(vim_type, 'openstack') @@ -46,9 +59,8 @@ class CommonConsumerTest(unittest.TestCase): 'created-time': 1526044312.0999322, 'vnfd-id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01', 'id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01'} - - common_db = dbmongo.DbMongo() - vdur = get_vdur(common_db, '5ec3f571-d540-4cb0-9992-971d1b08312e', '1', 'ubuntuvnf_vnfd-VM') + common_consumer = CommonConsumer() + vdur = common_consumer.get_vdur('5ec3f571-d540-4cb0-9992-971d1b08312e', '1', 'ubuntuvnf_vnfd-VM') expected_vdur = { 'internal-connection-point': [], 'vdu-id-ref': 'ubuntuvnf_vnfd-VM',