- consumer = kafka.KafkaConsumer(group_id=config.conf["message"].get("group_id"),
- bootstrap_servers=[config.conf.get("message", {}).get("host",
- "kafka") + ":" + config.conf["message"]
- .get("port")])
- topics = consumer.topics()
- logging.debug("Number of topics found: %s", len(topics))
+ consumer = kafka.KafkaConsumer(
+ group_id=config.conf["message"].get("group_id"),
+ bootstrap_servers=[
+ config.conf.get("message", {}).get("host", "kafka")
+ + ":"
+ + config.conf["message"].get("port")
+ ],
+ )
+ all_topics = consumer.topics()
+ logging.debug("Number of topics found: %s", len(all_topics))
+
+ # Send dummy message in kafka topics. If kafka is not ready exception will be thrown.
+ producer = kafka.KafkaProducer(
+ bootstrap_servers=[
+ config.conf.get("message", {}).get("host", "kafka")
+ + ":"
+ + config.conf["message"].get("port")
+ ]
+ )
+ mon_topics = ["alarm_request", "users", "project"]
+ for mon_topic in mon_topics:
+ producer.send(mon_topic, key=b"echo", value=b"dummy message")
+
+ # Kafka is ready now