X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fcore%2Fmessage_bus%2Fcommon_consumer.py;h=10cd3d54ad36bfcf0c66bcd51099d217252b9c1a;hb=refs%2Fchanges%2F50%2F6050%2F3;hp=8f595b02a00f615f255a64e597957dbb2ec91444;hpb=e69fe6fd4fca96b7e3f771cfbe7664bc0e8d727d;p=osm%2FMON.git diff --git a/osm_mon/core/message_bus/common_consumer.py b/osm_mon/core/message_bus/common_consumer.py index 8f595b0..10cd3d5 100755 --- a/osm_mon/core/message_bus/common_consumer.py +++ b/osm_mon/core/message_bus/common_consumer.py @@ -26,13 +26,14 @@ import sys import yaml +from osm_mon.core.settings import Config + logging.basicConfig(stream=sys.stdout, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', level=logging.INFO) log = logging.getLogger(__name__) - sys.path.append(os.path.abspath(os.path.join(os.path.realpath(__file__), '..', '..', '..', '..'))) from kafka import KafkaConsumer @@ -50,14 +51,11 @@ from osm_mon.plugins.vRealiseOps import plugin_receiver from osm_mon.core.auth import AuthManager from osm_mon.core.database import DatabaseManager -# Initialize servers -if "BROKER_URI" in os.environ: - server = {'server': os.getenv("BROKER_URI")} -else: - server = {'server': 'localhost:9092'} +cfg = Config.instance() +cfg.read_environ() # Initialize consumers for alarms and metrics -common_consumer = KafkaConsumer(bootstrap_servers=server['server'], +common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI, key_deserializer=bytes.decode, value_deserializer=bytes.decode, group_id="mon-consumer") @@ -85,7 +83,7 @@ def get_vim_type(vim_uuid): try: credentials = database_manager.get_credentials(vim_uuid) return credentials.type - except Exception as exc: + except Exception: log.exception("Error getting vim_type: ") return None @@ -103,73 +101,40 @@ for message in common_consumer: values = json.loads(message.value) except ValueError: values = yaml.safe_load(message.value) - # Check the message topic - if message.topic == "metric_request": - # Check the vim desired by the message - vim_type = get_vim_type(values['vim_uuid']) - - if vim_type == "openstack": - log.info("This message is for the OpenStack plugin.") - openstack_metrics.metric_calls(message) - elif vim_type == "aws": - log.info("This message is for the CloudWatch plugin.") - aws_conn = aws_connection.setEnvironment() - cloudwatch_metrics.metric_calls(message, aws_conn) - - elif vim_type == "vmware": - log.info("This metric_request message is for the vROPs plugin.") - vrops_rcvr.consume(message) - else: - log.debug("vim_type is misconfigured or unsupported; %s", - vim_type) - - elif message.topic == "alarm_request": - # Check the vim desired by the message - vim_type = get_vim_type(values['vim_uuid']) - if vim_type == "openstack": - log.info("This message is for the OpenStack plugin.") - openstack_alarms.alarming(message) - - elif vim_type == "aws": - log.info("This message is for the CloudWatch plugin.") - aws_conn = aws_connection.setEnvironment() - cloudwatch_alarms.alarm_calls(message, aws_conn) - - elif vim_type == "vmware": - log.info("This alarm_request message is for the vROPs plugin.") - vrops_rcvr.consume(message) - - else: - log.debug("vim_type is misconfigured or unsupported; %s", - vim_type) - - elif message.topic == "vim_account": + if message.topic == "vim_account": if message.key == "create" or message.key == "edit": auth_manager.store_auth_credentials(values) if message.key == "delete": auth_manager.delete_auth_credentials(values) - # TODO: Remove in the near future when all plugins support vim_uuid. Modify tests accordingly. - elif message.topic == "access_credentials": + else: # Check the vim desired by the message vim_type = get_vim_type(values['vim_uuid']) + if vim_type == "openstack": + log.info("This message is for the OpenStack plugin.") + if message.topic == "metric_request": + openstack_metrics.metric_calls(message) + if message.topic == "alarm_request": + openstack_alarms.alarming(message) - if vim_type == "aws": + elif vim_type == "aws": log.info("This message is for the CloudWatch plugin.") - aws_access_credentials.access_credential_calls(message) + aws_conn = aws_connection.setEnvironment() + if message.topic == "metric_request": + cloudwatch_metrics.metric_calls(message, aws_conn) + if message.topic == "alarm_request": + cloudwatch_alarms.alarm_calls(message, aws_conn) + if message.topic == "access_credentials": + aws_access_credentials.access_credential_calls(message) elif vim_type == "vmware": - log.info("This access_credentials message is for the vROPs plugin.") + log.info("This metric_request message is for the vROPs plugin.") vrops_rcvr.consume(message) else: log.debug("vim_type is misconfigured or unsupported; %s", vim_type) - else: - log.info("This topic is not relevant to any of the MON plugins.") - - - except Exception as exc: - log.exception("Exception: %s") + except Exception: + log.exception("Exception processing message: ")