Removes threading from common_consumer
[osm/MON.git] / osm_mon / core / message_bus / common_consumer.py
index 25aee43..0b61e1e 100755 (executable)
 
 import json
 import logging
-import os
 import sys
-import yaml
-
-logging.basicConfig(stream=sys.stdout,
-                    format='%(asctime)s %(message)s',
-                    datefmt='%m/%d/%Y %I:%M:%S %p',
-                    level=logging.INFO)
-log = logging.getLogger(__name__)
-
-sys.path.append(os.path.abspath(os.path.join(os.path.realpath(__file__), '..', '..', '..', '..')))
-
-from kafka import KafkaConsumer
+from json import JSONDecodeError
 
-from osm_mon.plugins.OpenStack.Aodh import alarming
-from osm_mon.plugins.OpenStack.Gnocchi import metrics
+import six
+import yaml
 
+from osm_mon.common.common_db_client import CommonDbClient
+from osm_mon.core.auth import AuthManager
+from osm_mon.core.database import DatabaseManager
+from osm_mon.core.message_bus.consumer import Consumer
+from osm_mon.core.message_bus.producer import Producer
+from osm_mon.core.settings import Config
+from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
+from osm_mon.plugins.CloudWatch.connection import Connection
 from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms
 from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics
-from osm_mon.plugins.CloudWatch.connection import Connection
-from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
-
+from osm_mon.plugins.OpenStack.Aodh import alarm_handler
+from osm_mon.plugins.OpenStack.Gnocchi import metric_handler
 from osm_mon.plugins.vRealiseOps import plugin_receiver
 
-from osm_mon.core.auth import AuthManager
-from osm_mon.core.database import DatabaseManager
+cfg = Config.instance()
 
-# Initialize servers
-if "BROKER_URI" in os.environ:
-    server = {'server': os.getenv("BROKER_URI")}
-else:
-    server = {'server': 'localhost:9092'}
-
-# Initialize consumers for alarms and metrics
-common_consumer = KafkaConsumer(bootstrap_servers=server['server'],
-                                key_deserializer=bytes.decode,
-                                value_deserializer=bytes.decode,
-                                group_id="mon-consumer")
-
-auth_manager = AuthManager()
-database_manager = DatabaseManager()
-database_manager.create_tables()
-
-# Create OpenStack alarming and metric instances
-openstack_metrics = metrics.Metrics()
-openstack_alarms = alarming.Alarming()
-
-# Create CloudWatch alarm and metric instances
-cloudwatch_alarms = plugin_alarms()
-cloudwatch_metrics = plugin_metrics()
-aws_connection = Connection()
-aws_access_credentials = AccessCredentials()
-
-# Create vROps plugin_receiver class instance
-vrops_rcvr = plugin_receiver.PluginReceiver()
-
-
-def get_vim_type(msg):
-    """Get the vim type that is required by the message."""
-    try:
-        vim_uuid = json.loads(msg.value)["vim_uuid"].lower()
-        credentials = database_manager.get_credentials(vim_uuid)
-        return credentials.type
-    except Exception as exc:
-        log.warn("vim_type is not configured correctly; %s", exc)
-    return None
+logging.basicConfig(stream=sys.stdout,
+                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+                    datefmt='%m/%d/%Y %I:%M:%S %p',
+                    level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
+log = logging.getLogger(__name__)
 
+kafka_logger = logging.getLogger('kafka')
+kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
+kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+kafka_handler = logging.StreamHandler(sys.stdout)
+kafka_handler.setFormatter(kafka_formatter)
+kafka_logger.addHandler(kafka_handler)
 
-# Define subscribe the consumer for the plugins
-topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account']
-# TODO: Remove access_credentials
-common_consumer.subscribe(topics)
 
-log.info("Listening for alarm_request and metric_request messages")
-for message in common_consumer:
-    log.info("Message arrived: %s", message)
-    try:
-        try:
-            values = json.loads(message.value)
-        except:
-            values = yaml.safe_load(message.value)
-        # Check the message topic
-        if message.topic == "metric_request":
-            # Check the vim desired by the message
-            vim_type = get_vim_type(message)
-
-            if vim_type == "openstack":
-                log.info("This message is for the OpenStack plugin.")
-                openstack_metrics.metric_calls(message)
-            elif vim_type == "aws":
-                log.info("This message is for the CloudWatch plugin.")
-                aws_conn = aws_connection.setEnvironment()
-                cloudwatch_metrics.metric_calls(message, aws_conn)
-
-            elif vim_type == "vmware":
-                log.info("This metric_request message is for the vROPs plugin.")
-                vrops_rcvr.consume(message)
+class CommonConsumer:
 
-            else:
-                log.debug("vim_type is misconfigured or unsupported; %s",
-                          vim_type)
+    def __init__(self):
+        self.auth_manager = AuthManager()
+        self.database_manager = DatabaseManager()
+        self.database_manager.create_tables()
 
-        elif message.topic == "alarm_request":
-            # Check the vim desired by the message
-            vim_type = get_vim_type(message)
-            if vim_type == "openstack":
-                log.info("This message is for the OpenStack plugin.")
-                openstack_alarms.alarming(message)
+        # Create OpenStack alarming and metric instances
+        self.openstack_metrics = metric_handler.OpenstackMetricHandler()
+        self.openstack_alarms = alarm_handler.OpenstackAlarmHandler()
 
-            elif vim_type == "aws":
-                log.info("This message is for the CloudWatch plugin.")
-                aws_conn = aws_connection.setEnvironment()
-                cloudwatch_alarms.alarm_calls(message, aws_conn)
+        # Create CloudWatch alarm and metric instances
+        self.cloudwatch_alarms = plugin_alarms()
+        self.cloudwatch_metrics = plugin_metrics()
+        self.aws_connection = Connection()
+        self.aws_access_credentials = AccessCredentials()
 
-            elif vim_type == "vmware":
-                log.info("This alarm_request message is for the vROPs plugin.")
-                vrops_rcvr.consume(message)
+        # Create vROps plugin_receiver class instance
+        self.vrops_rcvr = plugin_receiver.PluginReceiver()
 
-            else:
-                log.debug("vim_type is misconfigured or unsupported; %s",
-                          vim_type)
+        log.info("Connecting to MongoDB...")
+        self.common_db = CommonDbClient()
+        log.info("Connection successful.")
 
-        elif message.topic == "vim_account":
-            if message.key == "create" or message.key == "edit":
-                auth_manager.store_auth_credentials(values)
-            if message.key == "delete":
-                auth_manager.delete_auth_credentials(values)
+    def get_vim_type(self, vim_uuid):
+        """Get the vim type that is required by the message."""
+        credentials = self.database_manager.get_credentials(vim_uuid)
+        return credentials.type
 
-        # TODO: Remove in the near future when all plugins support vim_uuid. Modify tests accordingly.
-        elif message.topic == "access_credentials":
-            # Check the vim desired by the message
-            vim_type = get_vim_type(message)
+    def run(self):
+        common_consumer = Consumer("mon-consumer")
 
-            if vim_type == "aws":
-                log.info("This message is for the CloudWatch plugin.")
-                aws_access_credentials.access_credential_calls(message)
+        topics = ['metric_request', 'alarm_request', 'vim_account']
+        common_consumer.subscribe(topics)
 
-            elif vim_type == "vmware":
-                log.info("This access_credentials message is for the vROPs plugin.")
-                vrops_rcvr.consume(message)
+        log.info("Listening for messages...")
+        for message in common_consumer:
+            self.consume_message(message)
 
-            else:
-                log.debug("vim_type is misconfigured or unsupported; %s",
-                          vim_type)
+    def consume_message(self, message):
+        log.info("Message arrived: %s", message)
+        try:
+            try:
+                values = json.loads(message.value)
+            except JSONDecodeError:
+                values = yaml.safe_load(message.value)
 
-        else:
-            log.info("This topic is not relevant to any of the MON plugins.")
+            response = None
 
+            if message.topic == "vim_account":
+                if message.key == "create" or message.key == "edit":
+                    self.auth_manager.store_auth_credentials(values)
+                if message.key == "delete":
+                    self.auth_manager.delete_auth_credentials(values)
 
-    except Exception as exc:
-        log.exception("Exception: %s")
+            else:
+                # Get ns_id from message
+                # TODO: Standardize all message models to avoid the need of figuring out where are certain fields
+                contains_list = False
+                list_index = None
+                for k, v in six.iteritems(values):
+                    if isinstance(v, dict):
+                        if 'ns_id' in v:
+                            contains_list = True
+                            list_index = k
+                            break
+                if not contains_list and 'ns_id' in values:
+                    ns_id = values['ns_id']
+                else:
+                    ns_id = values[list_index]['ns_id']
+
+                vnf_index = values[list_index]['vnf_member_index'] if contains_list else values['vnf_member_index']
+
+                # Check the vim desired by the message
+                vnfr = self.common_db.get_vnfr(ns_id, vnf_index)
+                vim_uuid = vnfr['vim-account-id']
+
+                if (contains_list and 'vdu_name' in values[list_index]) or 'vdu_name' in values:
+                    vdu_name = values[list_index]['vdu_name'] if contains_list else values['vdu_name']
+                    vdur = self.common_db.get_vdur(ns_id, vnf_index, vdu_name)
+                    if contains_list:
+                        values[list_index]['resource_uuid'] = vdur['vim-id']
+                    else:
+                        values['resource_uuid'] = vdur['vim-id']
+                    message = message._replace(value=json.dumps(values))
+
+                vim_type = self.get_vim_type(vim_uuid)
+
+                if vim_type == "openstack":
+                    log.info("This message is for the OpenStack plugin.")
+                    if message.topic == "metric_request":
+                        response = self.openstack_metrics.handle_request(message.key, values, vim_uuid)
+                    if message.topic == "alarm_request":
+                        response = self.openstack_alarms.handle_message(message.key, values, vim_uuid)
+
+                elif vim_type == "aws":
+                    log.info("This message is for the CloudWatch plugin.")
+                    aws_conn = self.aws_connection.setEnvironment()
+                    if message.topic == "metric_request":
+                        response = self.cloudwatch_metrics.metric_calls(message.key, values, aws_conn)
+                    if message.topic == "alarm_request":
+                        response = self.cloudwatch_alarms.alarm_calls(message.key, values, aws_conn)
+
+                elif vim_type == "vmware":
+                    log.info("This metric_request message is for the vROPs plugin.")
+                    if message.topic == "metric_request":
+                        response = self.vrops_rcvr.handle_metric_requests(message.key, values, vim_uuid)
+                    if message.topic == "alarm_request":
+                        response = self.vrops_rcvr.handle_alarm_requests(message.key, values, vim_uuid)
+
+                else:
+                    log.debug("vim_type is misconfigured or unsupported; %s",
+                              vim_type)
+            if response:
+                self._publish_response(message.topic, message.key, response)
+
+        except Exception:
+            log.exception("Exception processing message: ")
+
+    def _publish_response(self, topic: str, key: str, msg: dict):
+        topic = topic.replace('request', 'response')
+        key = key.replace('request', 'response')
+        producer = Producer()
+        producer.send(topic=topic, key=key, value=json.dumps(msg))
+        producer.flush()
+        producer.close()
+
+
+if __name__ == '__main__':
+    CommonConsumer().run()