-def _is_kafka_ok():
- async def _test_kafka(loop):
- cfg = Config.instance()
- consumer = AIOKafkaConsumer(
- 'healthcheck',
- loop=loop, bootstrap_servers=cfg.BROKER_URI)
- await consumer.start()
- await consumer.stop()
-
- try:
- loop = asyncio.get_event_loop()
- loop.run_until_complete(_test_kafka(loop))
- return True
- except Exception:
- log.exception("MON can not connect to Kafka")
- return False
-
-
-if __name__ == '__main__':