X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fcore%2Fmessage_bus%2Fcommon_consumer.py;h=10cd3d54ad36bfcf0c66bcd51099d217252b9c1a;hb=b85fc8cdf840080b10d01c33b4a57a2a39bcc0f1;hp=27a4188205e372233cd9fdf449ab00e0e95488e2;hpb=b3f86c9f43a0eb4d56487118a403c39f979ec042;p=osm%2FMON.git diff --git a/osm_mon/core/message_bus/common_consumer.py b/osm_mon/core/message_bus/common_consumer.py index 27a4188..10cd3d5 100755 --- a/osm_mon/core/message_bus/common_consumer.py +++ b/osm_mon/core/message_bus/common_consumer.py @@ -24,6 +24,10 @@ import logging import os import sys +import yaml + +from osm_mon.core.settings import Config + logging.basicConfig(stream=sys.stdout, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', @@ -35,7 +39,6 @@ sys.path.append(os.path.abspath(os.path.join(os.path.realpath(__file__), '..', ' from kafka import KafkaConsumer from osm_mon.plugins.OpenStack.Aodh import alarming -from osm_mon.plugins.OpenStack.common import Common from osm_mon.plugins.OpenStack.Gnocchi import metrics from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms @@ -48,14 +51,11 @@ from osm_mon.plugins.vRealiseOps import plugin_receiver from osm_mon.core.auth import AuthManager from osm_mon.core.database import DatabaseManager -# Initialize servers -if "BROKER_URI" in os.environ: - server = {'server': os.getenv("BROKER_URI")} -else: - server = {'server': 'localhost:9092'} +cfg = Config.instance() +cfg.read_environ() # Initialize consumers for alarms and metrics -common_consumer = KafkaConsumer(bootstrap_servers=server['server'], +common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI, key_deserializer=bytes.decode, value_deserializer=bytes.decode, group_id="mon-consumer") @@ -65,8 +65,6 @@ database_manager = DatabaseManager() database_manager.create_tables() # Create OpenStack alarming and metric instances -auth_token = None -openstack_auth = Common() openstack_metrics = metrics.Metrics() openstack_alarms = alarming.Alarming() @@ -80,12 +78,13 @@ aws_access_credentials = AccessCredentials() vrops_rcvr = plugin_receiver.PluginReceiver() -def get_vim_type(message): +def get_vim_type(vim_uuid): """Get the vim type that is required by the message.""" try: - return json.loads(message.value)["vim_type"].lower() - except Exception as exc: - log.warn("vim_type is not configured correctly; %s", exc) + credentials = database_manager.get_credentials(vim_uuid) + return credentials.type + except Exception: + log.exception("Error getting vim_type: ") return None @@ -98,78 +97,44 @@ log.info("Listening for alarm_request and metric_request messages") for message in common_consumer: log.info("Message arrived: %s", message) try: - # Check the message topic - if message.topic == "metric_request": - # Check the vim desired by the message - vim_type = get_vim_type(message) - - if vim_type == "openstack": - log.info("This message is for the OpenStack plugin.") - openstack_metrics.metric_calls( - message, openstack_auth, auth_token) - - elif vim_type == "aws": - log.info("This message is for the CloudWatch plugin.") - aws_conn = aws_connection.setEnvironment() - cloudwatch_metrics.metric_calls(message, aws_conn) + try: + values = json.loads(message.value) + except ValueError: + values = yaml.safe_load(message.value) - elif vim_type == "vmware": - log.info("This metric_request message is for the vROPs plugin.") - vrops_rcvr.consume(message) - - else: - log.debug("vim_type is misconfigured or unsupported; %s", - vim_type) - - elif message.topic == "alarm_request": - # Check the vim desired by the message - vim_type = get_vim_type(message) - if vim_type == "openstack": - log.info("This message is for the OpenStack plugin.") - openstack_alarms.alarming(message, openstack_auth, auth_token) - - elif vim_type == "aws": - log.info("This message is for the CloudWatch plugin.") - aws_conn = aws_connection.setEnvironment() - cloudwatch_alarms.alarm_calls(message, aws_conn) - - elif vim_type == "vmware": - log.info("This alarm_request message is for the vROPs plugin.") - vrops_rcvr.consume(message) - - else: - log.debug("vim_type is misconfigured or unsupported; %s", - vim_type) - - elif message.topic == "vim_account": + if message.topic == "vim_account": if message.key == "create" or message.key == "edit": - auth_manager.store_auth_credentials(message) + auth_manager.store_auth_credentials(values) if message.key == "delete": - auth_manager.delete_auth_credentials(message) + auth_manager.delete_auth_credentials(values) - # TODO: Remove in the near future. Modify tests accordingly. - elif message.topic == "access_credentials": + else: # Check the vim desired by the message - vim_type = get_vim_type(message) + vim_type = get_vim_type(values['vim_uuid']) if vim_type == "openstack": log.info("This message is for the OpenStack plugin.") - auth_token = openstack_auth._authenticate(message=message) + if message.topic == "metric_request": + openstack_metrics.metric_calls(message) + if message.topic == "alarm_request": + openstack_alarms.alarming(message) elif vim_type == "aws": log.info("This message is for the CloudWatch plugin.") - aws_access_credentials.access_credential_calls(message) + aws_conn = aws_connection.setEnvironment() + if message.topic == "metric_request": + cloudwatch_metrics.metric_calls(message, aws_conn) + if message.topic == "alarm_request": + cloudwatch_alarms.alarm_calls(message, aws_conn) + if message.topic == "access_credentials": + aws_access_credentials.access_credential_calls(message) elif vim_type == "vmware": - log.info("This access_credentials message is for the vROPs plugin.") + log.info("This metric_request message is for the vROPs plugin.") vrops_rcvr.consume(message) else: log.debug("vim_type is misconfigured or unsupported; %s", vim_type) - else: - log.info("This topic is not relevant to any of the MON plugins.") - - - except Exception as exc: - log.exception("Exception: %s") + except Exception: + log.exception("Exception processing message: ")