Adds support for OSMMON_DATABASE_COMMONKEY to decrypt vim passwords
[osm/MON.git] / osm_mon / core / message_bus / common_consumer.py
index 85e679f..3a95c76 100755 (executable)
@@ -22,6 +22,7 @@
 import json
 import logging
 import sys
+import time
 from json import JSONDecodeError
 
 import six
@@ -51,10 +52,6 @@ log = logging.getLogger(__name__)
 
 kafka_logger = logging.getLogger('kafka')
 kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
-kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-kafka_handler = logging.StreamHandler(sys.stdout)
-kafka_handler.setFormatter(kafka_formatter)
-kafka_logger.addHandler(kafka_handler)
 
 
 class CommonConsumer:
@@ -106,6 +103,7 @@ class CommonConsumer:
                     log.exception("Exception: ")
                     return
                 retries = retries + 1
+                time.sleep(2)
 
         log.info("Listening for messages...")
         for message in common_consumer:
@@ -123,6 +121,9 @@ class CommonConsumer:
 
             if message.topic == "vim_account":
                 if message.key == "create" or message.key == "edit":
+                    values['vim_password'] = self.common_db.decrypt_vim_password(values['vim_password'],
+                                                                                 values['schema_version'],
+                                                                                 values['_id'])
                     self.auth_manager.store_auth_credentials(values)
                 if message.key == "delete":
                     self.auth_manager.delete_auth_credentials(values)
@@ -196,7 +197,7 @@ class CommonConsumer:
         key = key.replace('request', 'response')
         producer = Producer()
         producer.send(topic=topic, key=key, value=json.dumps(msg))
-        producer.flush()
+        producer.flush(timeout=5)
         producer.close()