import json
import logging
import sys
+import time
from json import JSONDecodeError
import six
kafka_logger = logging.getLogger('kafka')
kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
-kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-kafka_handler = logging.StreamHandler(sys.stdout)
-kafka_handler.setFormatter(kafka_formatter)
-kafka_logger.addHandler(kafka_handler)
class CommonConsumer:
log.exception("Exception: ")
return
retries = retries + 1
+ time.sleep(2)
log.info("Listening for messages...")
for message in common_consumer:
if message.topic == "vim_account":
if message.key == "create" or message.key == "edit":
+ values['vim_password'] = self.common_db.decrypt_vim_password(values['vim_password'],
+ values['schema_version'],
+ values['_id'])
self.auth_manager.store_auth_credentials(values)
if message.key == "delete":
self.auth_manager.delete_auth_credentials(values)
key = key.replace('request', 'response')
producer = Producer()
producer.send(topic=topic, key=key, value=json.dumps(msg))
- producer.flush()
+ producer.flush(timeout=5)
producer.close()