kafka_logger = logging.getLogger('kafka')
kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
-kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-kafka_handler = logging.StreamHandler(sys.stdout)
-kafka_handler.setFormatter(kafka_formatter)
-kafka_logger.addHandler(kafka_handler)
class CommonConsumer:
key = key.replace('request', 'response')
producer = Producer()
producer.send(topic=topic, key=key, value=json.dumps(msg))
- producer.flush()
+ producer.flush(timeout=5)
producer.close()