Increases Kafka session_timeout_ms and heartbeat_interval_ms
[osm/MON.git] / osm_mon / core / message_bus / common_consumer.py
index 60ad313..3af2d4a 100755 (executable)
@@ -40,12 +40,21 @@ from osm_mon.plugins.OpenStack.Aodh import alarming
 from osm_mon.plugins.OpenStack.Gnocchi import metrics
 from osm_mon.plugins.vRealiseOps import plugin_receiver
 
+cfg = Config.instance()
+
 logging.basicConfig(stream=sys.stdout,
-                    format='%(asctime)s %(message)s',
+                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                     datefmt='%m/%d/%Y %I:%M:%S %p',
-                    level=logging.INFO)
+                    level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
 log = logging.getLogger(__name__)
 
+kafka_logger = logging.getLogger('kafka')
+kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
+kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+kafka_handler = logging.StreamHandler(sys.stdout)
+kafka_handler.setFormatter(kafka_formatter)
+kafka_logger.addHandler(kafka_handler)
+
 
 class CommonConsumer:
 
@@ -75,17 +84,6 @@ class CommonConsumer:
         self.common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'})
         log.info("Connection successful.")
 
-        # Initialize consumers for alarms and metrics
-        self.common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
-                                             key_deserializer=bytes.decode,
-                                             value_deserializer=bytes.decode,
-                                             group_id="mon-consumer")
-
-        # Define subscribe the consumer for the plugins
-        topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account']
-        # TODO: Remove access_credentials
-        self.common_consumer.subscribe(topics)
-
     def get_vim_type(self, vim_uuid):
         """Get the vim type that is required by the message."""
         credentials = self.database_manager.get_credentials(vim_uuid)
@@ -94,19 +92,29 @@ class CommonConsumer:
     def get_vdur(self, nsr_id, member_index, vdu_name):
         vnfr = self.get_vnfr(nsr_id, member_index)
         for vdur in vnfr['vdur']:
-            if vdur['vdu-id-ref'] == vdu_name:
+            if vdur['name'] == vdu_name:
                 return vdur
         raise ValueError('vdur not found for nsr-id %s, member_index %s and vdu_name %s', nsr_id, member_index,
                          vdu_name)
 
     def get_vnfr(self, nsr_id, member_index):
-        vnfr = self.common_db.get_one(table="vnfrs",
-                                      filter={"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)})
+        vnfr = self.common_db.get_one("vnfrs",
+                                      {"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)})
         return vnfr
 
     def run(self):
+        common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
+                                        key_deserializer=bytes.decode,
+                                        value_deserializer=bytes.decode,
+                                        group_id="mon-consumer",
+                                        session_timeout_ms=60000,
+                                        heartbeat_interval_ms=20000)
+
+        topics = ['metric_request', 'alarm_request', 'vim_account']
+        common_consumer.subscribe(topics)
+
         log.info("Listening for messages...")
-        for message in self.common_consumer:
+        for message in common_consumer:
             t = threading.Thread(target=self.consume_message, args=(message,))
             t.start()
 
@@ -176,7 +184,7 @@ class CommonConsumer:
 
                 elif vim_type == "vmware":
                     log.info("This metric_request message is for the vROPs plugin.")
-                    self.vrops_rcvr.consume(message)
+                    self.vrops_rcvr.consume(message,vim_uuid)
 
                 else:
                     log.debug("vim_type is misconfigured or unsupported; %s",