bootstrap_servers=[config.conf.get("message", {}).get("host",
"kafka") + ":" + config.conf["message"]
.get("port")])
- topics = consumer.topics()
- logging.debug("Number of topics found: %s", len(topics))
+ all_topics = consumer.topics()
+ logging.debug("Number of topics found: %s", len(all_topics))
+
+ # Send dummy message in kafka topics. If kafka is not ready exception will be thrown.
+ producer = kafka.KafkaProducer(bootstrap_servers=[config.conf.get("message", {}).get("host",
+ "kafka") + ":" + config.conf["message"]
+ .get("port")])
+ mon_topics = ["alarm_request", "users", "project"]
+ for mon_topic in mon_topics:
+ producer.send(mon_topic, key=b"echo", value=b"dummy message")
+
+ # Kafka is ready now
kafka_ready = True
except Exception as e:
logging.info("Error when trying to get kafka status.")
def wait_till_core_services_are_ready(config, process_name="osm-mon", commondb_wait_time=5, kafka_wait_time=5):
- logging.debug("wait_till_services_are_ready")
+ logging.debug("wait_till_core_services_are_ready")
if not config:
logging.info("Config information is not available")
return False
# Check if common-db is ready
- wait_till_kafka_is_ready(config, process_name, commondb_wait_time)
+ wait_till_commondb_is_ready(config, process_name, commondb_wait_time)
# Check if kafka is ready
wait_till_kafka_is_ready(config, process_name, kafka_wait_time)