Refactor common_db client code
[osm/MON.git] / osm_mon / core / message_bus / common_consumer.py
index 69abee5..fd97f83 100755 (executable)
 import json
 import logging
 import sys
+import threading
 
 import six
 import yaml
-
 from kafka import KafkaConsumer
 
+from osm_mon.common.common_db_client import CommonDbClient
+from osm_mon.core.auth import AuthManager
+from osm_mon.core.database import DatabaseManager
 from osm_mon.core.settings import Config
-from osm_mon.plugins.OpenStack.Aodh import alarming
-from osm_mon.plugins.OpenStack.Gnocchi import metrics
-
+from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
+from osm_mon.plugins.CloudWatch.connection import Connection
 from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms
 from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics
-from osm_mon.plugins.CloudWatch.connection import Connection
-from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
-
+from osm_mon.plugins.OpenStack.Aodh import alarming
+from osm_mon.plugins.OpenStack.Gnocchi import metrics
 from osm_mon.plugins.vRealiseOps import plugin_receiver
 
-from osm_mon.core.auth import AuthManager
-from osm_mon.core.database import DatabaseManager
-
-from osm_common import dbmongo
+cfg = Config.instance()
 
 logging.basicConfig(stream=sys.stdout,
-                    format='%(asctime)s %(message)s',
+                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                     datefmt='%m/%d/%Y %I:%M:%S %p',
-                    level=logging.INFO)
+                    level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
 log = logging.getLogger(__name__)
 
+kafka_logger = logging.getLogger('kafka')
+kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
+kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+kafka_handler = logging.StreamHandler(sys.stdout)
+kafka_handler.setFormatter(kafka_formatter)
+kafka_logger.addHandler(kafka_handler)
 
-def get_vim_type(db_manager, vim_uuid):
-    """Get the vim type that is required by the message."""
-    credentials = db_manager.get_credentials(vim_uuid)
-    return credentials.type
-
-
-def get_vdur(common_db, nsr_id, member_index, vdu_name):
-    vnfr = get_vnfr(common_db, nsr_id, member_index)
-    for vdur in vnfr['vdur']:
-        if vdur['vdu-id-ref'] == vdu_name:
-            return vdur
-    raise ValueError('vdur not found for nsr-id %s, member_index %s and vdu_name %s', nsr_id, member_index, vdu_name)
 
+class CommonConsumer:
 
-def get_vnfr(common_db, nsr_id, member_index):
-    vnfr = common_db.get_one(table="vnfrs", filter={"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)})
-    return vnfr
+    def __init__(self):
+        cfg = Config.instance()
 
+        self.auth_manager = AuthManager()
+        self.database_manager = DatabaseManager()
+        self.database_manager.create_tables()
 
-def main():
-    cfg = Config.instance()
-    cfg.read_environ()
+        # Create OpenStack alarming and metric instances
+        self.openstack_metrics = metrics.Metrics()
+        self.openstack_alarms = alarming.Alarming()
 
-    auth_manager = AuthManager()
-    database_manager = DatabaseManager()
-    database_manager.create_tables()
+        # Create CloudWatch alarm and metric instances
+        self.cloudwatch_alarms = plugin_alarms()
+        self.cloudwatch_metrics = plugin_metrics()
+        self.aws_connection = Connection()
+        self.aws_access_credentials = AccessCredentials()
 
-    # Create OpenStack alarming and metric instances
-    openstack_metrics = metrics.Metrics()
-    openstack_alarms = alarming.Alarming()
+        # Create vROps plugin_receiver class instance
+        self.vrops_rcvr = plugin_receiver.PluginReceiver()
 
-    # Create CloudWatch alarm and metric instances
-    cloudwatch_alarms = plugin_alarms()
-    cloudwatch_metrics = plugin_metrics()
-    aws_connection = Connection()
-    aws_access_credentials = AccessCredentials()
+        log.info("Connecting to MongoDB...")
+        self.common_db = CommonDbClient()
+        log.info("Connection successful.")
 
-    # Create vROps plugin_receiver class instance
-    vrops_rcvr = plugin_receiver.PluginReceiver()
+    def get_vim_type(self, vim_uuid):
+        """Get the vim type that is required by the message."""
+        credentials = self.database_manager.get_credentials(vim_uuid)
+        return credentials.type
 
-    common_db = dbmongo.DbMongo()
-    common_db_uri = cfg.MONGO_URI.split(':')
-    common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'})
+    def run(self):
+        common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
+                                        key_deserializer=bytes.decode,
+                                        value_deserializer=bytes.decode,
+                                        group_id="mon-consumer",
+                                        session_timeout_ms=60000,
+                                        heartbeat_interval_ms=20000)
 
-    # Initialize consumers for alarms and metrics
-    common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
-                                    key_deserializer=bytes.decode,
-                                    value_deserializer=bytes.decode,
-                                    group_id="mon-consumer")
+        topics = ['metric_request', 'alarm_request', 'vim_account']
+        common_consumer.subscribe(topics)
 
-    # Define subscribe the consumer for the plugins
-    topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account']
-    # TODO: Remove access_credentials
-    common_consumer.subscribe(topics)
+        log.info("Listening for messages...")
+        for message in common_consumer:
+            t = threading.Thread(target=self.consume_message, args=(message,))
+            t.start()
 
-    log.info("Listening for alarm_request and metric_request messages")
-    for message in common_consumer:
+    def consume_message(self, message):
         log.info("Message arrived: %s", message)
         try:
             try:
@@ -117,61 +113,63 @@ def main():
 
             if message.topic == "vim_account":
                 if message.key == "create" or message.key == "edit":
-                    auth_manager.store_auth_credentials(values)
+                    self.auth_manager.store_auth_credentials(values)
                 if message.key == "delete":
-                    auth_manager.delete_auth_credentials(values)
+                    self.auth_manager.delete_auth_credentials(values)
 
             else:
                 # Get ns_id from message
                 # TODO: Standardize all message models to avoid the need of figuring out where are certain fields
                 contains_list = False
                 list_index = None
-                ns_id = None
                 for k, v in six.iteritems(values):
                     if isinstance(v, dict):
                         if 'ns_id' in v:
-                            ns_id = v['ns_id']
                             contains_list = True
                             list_index = k
+                            break
                 if not contains_list and 'ns_id' in values:
                     ns_id = values['ns_id']
+                else:
+                    ns_id = values[list_index]['ns_id']
 
                 vnf_index = values[list_index]['vnf_member_index'] if contains_list else values['vnf_member_index']
 
                 # Check the vim desired by the message
-                vnfr = get_vnfr(common_db, ns_id, vnf_index)
+                vnfr = self.common_db.get_vnfr(ns_id, vnf_index)
                 vim_uuid = vnfr['vim-account-id']
-                vim_type = get_vim_type(database_manager, vim_uuid)
 
                 if (contains_list and 'vdu_name' in values[list_index]) or 'vdu_name' in values:
                     vdu_name = values[list_index]['vdu_name'] if contains_list else values['vdu_name']
-                    vdur = get_vdur(common_db, ns_id, vnf_index, vdu_name)
+                    vdur = self.common_db.get_vdur(ns_id, vnf_index, vdu_name)
                     if contains_list:
                         values[list_index]['resource_uuid'] = vdur['vim-id']
                     else:
                         values['resource_uuid'] = vdur['vim-id']
                     message = message._replace(value=json.dumps(values))
 
+                vim_type = self.get_vim_type(vim_uuid)
+
                 if vim_type == "openstack":
                     log.info("This message is for the OpenStack plugin.")
                     if message.topic == "metric_request":
-                        openstack_metrics.metric_calls(message, vim_uuid)
+                        self.openstack_metrics.metric_calls(message, vim_uuid)
                     if message.topic == "alarm_request":
-                        openstack_alarms.alarming(message, vim_uuid)
+                        self.openstack_alarms.alarming(message, vim_uuid)
 
                 elif vim_type == "aws":
                     log.info("This message is for the CloudWatch plugin.")
-                    aws_conn = aws_connection.setEnvironment()
+                    aws_conn = self.aws_connection.setEnvironment()
                     if message.topic == "metric_request":
-                        cloudwatch_metrics.metric_calls(message, aws_conn)
+                        self.cloudwatch_metrics.metric_calls(message, aws_conn)
                     if message.topic == "alarm_request":
-                        cloudwatch_alarms.alarm_calls(message, aws_conn)
+                        self.cloudwatch_alarms.alarm_calls(message, aws_conn)
                     if message.topic == "access_credentials":
-                        aws_access_credentials.access_credential_calls(message)
+                        self.aws_access_credentials.access_credential_calls(message)
 
                 elif vim_type == "vmware":
                     log.info("This metric_request message is for the vROPs plugin.")
-                    vrops_rcvr.consume(message)
+                    self.vrops_rcvr.consume(message, vim_uuid)
 
                 else:
                     log.debug("vim_type is misconfigured or unsupported; %s",
@@ -182,4 +180,4 @@ def main():
 
 
 if __name__ == '__main__':
-    main()
+    CommonConsumer().run()