import json
import logging
import sys
+import time
from json import JSONDecodeError
import six
kafka_logger = logging.getLogger('kafka')
kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
-kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-kafka_handler = logging.StreamHandler(sys.stdout)
-kafka_handler.setFormatter(kafka_formatter)
-kafka_logger.addHandler(kafka_handler)
class CommonConsumer:
topics = ['metric_request', 'alarm_request', 'vim_account']
common_consumer.subscribe(topics)
+ retries = 1
+ max_retries = 5
+ while True:
+ try:
+ common_consumer.poll()
+ common_consumer.seek_to_end()
+ break
+ except Exception:
+ log.error("Error getting Kafka partitions. Maybe Kafka is not ready yet.")
+ log.error("Retry number %d of %d", retries, max_retries)
+ if retries >= max_retries:
+ log.error("Achieved max number of retries. Logging exception and exiting...")
+ log.exception("Exception: ")
+ return
+ retries = retries + 1
+ time.sleep(2)
log.info("Listening for messages...")
for message in common_consumer:
key = key.replace('request', 'response')
producer = Producer()
producer.send(topic=topic, key=key, value=json.dumps(msg))
- producer.flush()
+ producer.flush(timeout=5)
producer.close()