X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fcore%2Fmessage_bus%2Fcommon_consumer.py;h=10cd3d54ad36bfcf0c66bcd51099d217252b9c1a;hb=75512477988ae5e287433c6c859c61de1bc82318;hp=0ba003b8c967835c1bade1ab886a48b947bbe146;hpb=ad1d76c5afc49644f7d864eee85d8e4d09a184cc;p=osm%2FMON.git diff --git a/osm_mon/core/message_bus/common_consumer.py b/osm_mon/core/message_bus/common_consumer.py index 0ba003b..10cd3d5 100755 --- a/osm_mon/core/message_bus/common_consumer.py +++ b/osm_mon/core/message_bus/common_consumer.py @@ -24,6 +24,10 @@ import logging import os import sys +import yaml + +from osm_mon.core.settings import Config + logging.basicConfig(stream=sys.stdout, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', @@ -47,14 +51,11 @@ from osm_mon.plugins.vRealiseOps import plugin_receiver from osm_mon.core.auth import AuthManager from osm_mon.core.database import DatabaseManager -# Initialize servers -if "BROKER_URI" in os.environ: - server = {'server': os.getenv("BROKER_URI")} -else: - server = {'server': 'localhost:9092'} +cfg = Config.instance() +cfg.read_environ() # Initialize consumers for alarms and metrics -common_consumer = KafkaConsumer(bootstrap_servers=server['server'], +common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI, key_deserializer=bytes.decode, value_deserializer=bytes.decode, group_id="mon-consumer") @@ -77,14 +78,13 @@ aws_access_credentials = AccessCredentials() vrops_rcvr = plugin_receiver.PluginReceiver() -def get_vim_type(msg): +def get_vim_type(vim_uuid): """Get the vim type that is required by the message.""" try: - vim_uuid = json.loads(msg.value)["vim_uuid"].lower() credentials = database_manager.get_credentials(vim_uuid) return credentials.type - except Exception as exc: - log.warn("vim_type is not configured correctly; %s", exc) + except Exception: + log.exception("Error getting vim_type: ") return None @@ -97,74 +97,44 @@ log.info("Listening for alarm_request and metric_request messages") for message in common_consumer: log.info("Message arrived: %s", message) try: - values = json.loads(message.value) - # Check the message topic - if message.topic == "metric_request": - # Check the vim desired by the message - vim_type = get_vim_type(message) - - if vim_type == "openstack": - log.info("This message is for the OpenStack plugin.") - openstack_metrics.metric_calls(message) - elif vim_type == "aws": - log.info("This message is for the CloudWatch plugin.") - aws_conn = aws_connection.setEnvironment() - cloudwatch_metrics.metric_calls(message, aws_conn) - - elif vim_type == "vmware": - log.info("This metric_request message is for the vROPs plugin.") - vrops_rcvr.consume(message) - - else: - log.debug("vim_type is misconfigured or unsupported; %s", - vim_type) - - elif message.topic == "alarm_request": - # Check the vim desired by the message - vim_type = get_vim_type(message) - if vim_type == "openstack": - log.info("This message is for the OpenStack plugin.") - openstack_alarms.alarming(message) + try: + values = json.loads(message.value) + except ValueError: + values = yaml.safe_load(message.value) - elif vim_type == "aws": - log.info("This message is for the CloudWatch plugin.") - aws_conn = aws_connection.setEnvironment() - cloudwatch_alarms.alarm_calls(message, aws_conn) - - elif vim_type == "vmware": - log.info("This alarm_request message is for the vROPs plugin.") - vrops_rcvr.consume(message) - - else: - log.debug("vim_type is misconfigured or unsupported; %s", - vim_type) - - elif message.topic == "vim_account": + if message.topic == "vim_account": if message.key == "create" or message.key == "edit": auth_manager.store_auth_credentials(values) if message.key == "delete": auth_manager.delete_auth_credentials(values) - # TODO: Remove in the near future when all plugins support vim_uuid. Modify tests accordingly. - elif message.topic == "access_credentials": + else: # Check the vim desired by the message - vim_type = get_vim_type(message) + vim_type = get_vim_type(values['vim_uuid']) + if vim_type == "openstack": + log.info("This message is for the OpenStack plugin.") + if message.topic == "metric_request": + openstack_metrics.metric_calls(message) + if message.topic == "alarm_request": + openstack_alarms.alarming(message) - if vim_type == "aws": + elif vim_type == "aws": log.info("This message is for the CloudWatch plugin.") - aws_access_credentials.access_credential_calls(message) + aws_conn = aws_connection.setEnvironment() + if message.topic == "metric_request": + cloudwatch_metrics.metric_calls(message, aws_conn) + if message.topic == "alarm_request": + cloudwatch_alarms.alarm_calls(message, aws_conn) + if message.topic == "access_credentials": + aws_access_credentials.access_credential_calls(message) elif vim_type == "vmware": - log.info("This access_credentials message is for the vROPs plugin.") + log.info("This metric_request message is for the vROPs plugin.") vrops_rcvr.consume(message) else: log.debug("vim_type is misconfigured or unsupported; %s", vim_type) - else: - log.info("This topic is not relevant to any of the MON plugins.") - - - except Exception as exc: - log.exception("Exception: %s") + except Exception: + log.exception("Exception processing message: ")