Adds granularity support in OpenStack vim config
[osm/MON.git] / osm_mon / core / message_bus / common_consumer.py
index 0ba003b..10cd3d5 100755 (executable)
@@ -24,6 +24,10 @@ import logging
 import os
 import sys
 
+import yaml
+
+from osm_mon.core.settings import Config
+
 logging.basicConfig(stream=sys.stdout,
                     format='%(asctime)s %(message)s',
                     datefmt='%m/%d/%Y %I:%M:%S %p',
@@ -47,14 +51,11 @@ from osm_mon.plugins.vRealiseOps import plugin_receiver
 from osm_mon.core.auth import AuthManager
 from osm_mon.core.database import DatabaseManager
 
-# Initialize servers
-if "BROKER_URI" in os.environ:
-    server = {'server': os.getenv("BROKER_URI")}
-else:
-    server = {'server': 'localhost:9092'}
+cfg = Config.instance()
+cfg.read_environ()
 
 # Initialize consumers for alarms and metrics
-common_consumer = KafkaConsumer(bootstrap_servers=server['server'],
+common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
                                 key_deserializer=bytes.decode,
                                 value_deserializer=bytes.decode,
                                 group_id="mon-consumer")
@@ -77,14 +78,13 @@ aws_access_credentials = AccessCredentials()
 vrops_rcvr = plugin_receiver.PluginReceiver()
 
 
-def get_vim_type(msg):
+def get_vim_type(vim_uuid):
     """Get the vim type that is required by the message."""
     try:
-        vim_uuid = json.loads(msg.value)["vim_uuid"].lower()
         credentials = database_manager.get_credentials(vim_uuid)
         return credentials.type
-    except Exception as exc:
-        log.warn("vim_type is not configured correctly; %s", exc)
+    except Exception:
+        log.exception("Error getting vim_type: ")
     return None
 
 
@@ -97,74 +97,44 @@ log.info("Listening for alarm_request and metric_request messages")
 for message in common_consumer:
     log.info("Message arrived: %s", message)
     try:
-        values = json.loads(message.value)
-        # Check the message topic
-        if message.topic == "metric_request":
-            # Check the vim desired by the message
-            vim_type = get_vim_type(message)
-
-            if vim_type == "openstack":
-                log.info("This message is for the OpenStack plugin.")
-                openstack_metrics.metric_calls(message)
-            elif vim_type == "aws":
-                log.info("This message is for the CloudWatch plugin.")
-                aws_conn = aws_connection.setEnvironment()
-                cloudwatch_metrics.metric_calls(message, aws_conn)
-
-            elif vim_type == "vmware":
-                log.info("This metric_request message is for the vROPs plugin.")
-                vrops_rcvr.consume(message)
-
-            else:
-                log.debug("vim_type is misconfigured or unsupported; %s",
-                          vim_type)
-
-        elif message.topic == "alarm_request":
-            # Check the vim desired by the message
-            vim_type = get_vim_type(message)
-            if vim_type == "openstack":
-                log.info("This message is for the OpenStack plugin.")
-                openstack_alarms.alarming(message)
+        try:
+            values = json.loads(message.value)
+        except ValueError:
+            values = yaml.safe_load(message.value)
 
-            elif vim_type == "aws":
-                log.info("This message is for the CloudWatch plugin.")
-                aws_conn = aws_connection.setEnvironment()
-                cloudwatch_alarms.alarm_calls(message, aws_conn)
-
-            elif vim_type == "vmware":
-                log.info("This alarm_request message is for the vROPs plugin.")
-                vrops_rcvr.consume(message)
-
-            else:
-                log.debug("vim_type is misconfigured or unsupported; %s",
-                          vim_type)
-
-        elif message.topic == "vim_account":
+        if message.topic == "vim_account":
             if message.key == "create" or message.key == "edit":
                 auth_manager.store_auth_credentials(values)
             if message.key == "delete":
                 auth_manager.delete_auth_credentials(values)
 
-        # TODO: Remove in the near future when all plugins support vim_uuid. Modify tests accordingly.
-        elif message.topic == "access_credentials":
+        else:
             # Check the vim desired by the message
-            vim_type = get_vim_type(message)
+            vim_type = get_vim_type(values['vim_uuid'])
+            if vim_type == "openstack":
+                log.info("This message is for the OpenStack plugin.")
+                if message.topic == "metric_request":
+                    openstack_metrics.metric_calls(message)
+                if message.topic == "alarm_request":
+                    openstack_alarms.alarming(message)
 
-            if vim_type == "aws":
+            elif vim_type == "aws":
                 log.info("This message is for the CloudWatch plugin.")
-                aws_access_credentials.access_credential_calls(message)
+                aws_conn = aws_connection.setEnvironment()
+                if message.topic == "metric_request":
+                    cloudwatch_metrics.metric_calls(message, aws_conn)
+                if message.topic == "alarm_request":
+                    cloudwatch_alarms.alarm_calls(message, aws_conn)
+                if message.topic == "access_credentials":
+                    aws_access_credentials.access_credential_calls(message)
 
             elif vim_type == "vmware":
-                log.info("This access_credentials message is for the vROPs plugin.")
+                log.info("This metric_request message is for the vROPs plugin.")
                 vrops_rcvr.consume(message)
 
             else:
                 log.debug("vim_type is misconfigured or unsupported; %s",
                           vim_type)
 
-        else:
-            log.info("This topic is not relevant to any of the MON plugins.")
-
-
-    except Exception as exc:
-        log.exception("Exception: %s")
+    except Exception:
+        log.exception("Exception processing message: ")