X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fcore%2Fmessage_bus%2Fcommon_consumer.py;h=10cd3d54ad36bfcf0c66bcd51099d217252b9c1a;hb=b85fc8cdf840080b10d01c33b4a57a2a39bcc0f1;hp=25aee43a48ddf9295f7ee9b769be79f9cdb93da7;hpb=e4be37f562f1d0c394d1ccdd0238202ec8f6f949;p=osm%2FMON.git diff --git a/osm_mon/core/message_bus/common_consumer.py b/osm_mon/core/message_bus/common_consumer.py index 25aee43..10cd3d5 100755 --- a/osm_mon/core/message_bus/common_consumer.py +++ b/osm_mon/core/message_bus/common_consumer.py @@ -23,8 +23,11 @@ import json import logging import os import sys + import yaml +from osm_mon.core.settings import Config + logging.basicConfig(stream=sys.stdout, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', @@ -48,14 +51,11 @@ from osm_mon.plugins.vRealiseOps import plugin_receiver from osm_mon.core.auth import AuthManager from osm_mon.core.database import DatabaseManager -# Initialize servers -if "BROKER_URI" in os.environ: - server = {'server': os.getenv("BROKER_URI")} -else: - server = {'server': 'localhost:9092'} +cfg = Config.instance() +cfg.read_environ() # Initialize consumers for alarms and metrics -common_consumer = KafkaConsumer(bootstrap_servers=server['server'], +common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI, key_deserializer=bytes.decode, value_deserializer=bytes.decode, group_id="mon-consumer") @@ -78,14 +78,13 @@ aws_access_credentials = AccessCredentials() vrops_rcvr = plugin_receiver.PluginReceiver() -def get_vim_type(msg): +def get_vim_type(vim_uuid): """Get the vim type that is required by the message.""" try: - vim_uuid = json.loads(msg.value)["vim_uuid"].lower() credentials = database_manager.get_credentials(vim_uuid) return credentials.type - except Exception as exc: - log.warn("vim_type is not configured correctly; %s", exc) + except Exception: + log.exception("Error getting vim_type: ") return None @@ -100,75 +99,42 @@ for message in common_consumer: try: try: values = json.loads(message.value) - except: + except ValueError: values = yaml.safe_load(message.value) - # Check the message topic - if message.topic == "metric_request": - # Check the vim desired by the message - vim_type = get_vim_type(message) - - if vim_type == "openstack": - log.info("This message is for the OpenStack plugin.") - openstack_metrics.metric_calls(message) - elif vim_type == "aws": - log.info("This message is for the CloudWatch plugin.") - aws_conn = aws_connection.setEnvironment() - cloudwatch_metrics.metric_calls(message, aws_conn) - - elif vim_type == "vmware": - log.info("This metric_request message is for the vROPs plugin.") - vrops_rcvr.consume(message) - - else: - log.debug("vim_type is misconfigured or unsupported; %s", - vim_type) - - elif message.topic == "alarm_request": - # Check the vim desired by the message - vim_type = get_vim_type(message) - if vim_type == "openstack": - log.info("This message is for the OpenStack plugin.") - openstack_alarms.alarming(message) - elif vim_type == "aws": - log.info("This message is for the CloudWatch plugin.") - aws_conn = aws_connection.setEnvironment() - cloudwatch_alarms.alarm_calls(message, aws_conn) - - elif vim_type == "vmware": - log.info("This alarm_request message is for the vROPs plugin.") - vrops_rcvr.consume(message) - - else: - log.debug("vim_type is misconfigured or unsupported; %s", - vim_type) - - elif message.topic == "vim_account": + if message.topic == "vim_account": if message.key == "create" or message.key == "edit": auth_manager.store_auth_credentials(values) if message.key == "delete": auth_manager.delete_auth_credentials(values) - # TODO: Remove in the near future when all plugins support vim_uuid. Modify tests accordingly. - elif message.topic == "access_credentials": + else: # Check the vim desired by the message - vim_type = get_vim_type(message) + vim_type = get_vim_type(values['vim_uuid']) + if vim_type == "openstack": + log.info("This message is for the OpenStack plugin.") + if message.topic == "metric_request": + openstack_metrics.metric_calls(message) + if message.topic == "alarm_request": + openstack_alarms.alarming(message) - if vim_type == "aws": + elif vim_type == "aws": log.info("This message is for the CloudWatch plugin.") - aws_access_credentials.access_credential_calls(message) + aws_conn = aws_connection.setEnvironment() + if message.topic == "metric_request": + cloudwatch_metrics.metric_calls(message, aws_conn) + if message.topic == "alarm_request": + cloudwatch_alarms.alarm_calls(message, aws_conn) + if message.topic == "access_credentials": + aws_access_credentials.access_credential_calls(message) elif vim_type == "vmware": - log.info("This access_credentials message is for the vROPs plugin.") + log.info("This metric_request message is for the vROPs plugin.") vrops_rcvr.consume(message) else: log.debug("vim_type is misconfigured or unsupported; %s", vim_type) - else: - log.info("This topic is not relevant to any of the MON plugins.") - - - except Exception as exc: - log.exception("Exception: %s") + except Exception: + log.exception("Exception processing message: ")