X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fcmd%2Fmon_utils.py;h=83dd130f550bee8f2a7167a955203aa8c4b6b828;hb=refs%2Fchanges%2F03%2F10803%2F1;hp=a5d3d07bc6d70527ca378dc84665ebf367433e19;hpb=b170aec221ad296cc9de06a7b6069c878211cc65;p=osm%2FMON.git diff --git a/osm_mon/cmd/mon_utils.py b/osm_mon/cmd/mon_utils.py index a5d3d07..83dd130 100644 --- a/osm_mon/cmd/mon_utils.py +++ b/osm_mon/cmd/mon_utils.py @@ -25,14 +25,16 @@ def wait_till_commondb_is_ready(config, process_name="osm-mon", commondb_wait_ti logging.debug("wait_till_commondb_is_ready") - while(True): + while True: commondb_url = config.conf["database"].get("uri") try: commondb = pymongo.MongoClient(commondb_url) commondb.server_info() break except Exception: - logging.info("{} process is waiting for commondb to come up...".format(process_name)) + logging.info( + "{} process is waiting for commondb to come up...".format(process_name) + ) time.sleep(commondb_wait_time) @@ -40,7 +42,7 @@ def wait_till_kafka_is_ready(config, process_name="osm-mon", kafka_wait_time=5): logging.debug("wait_till_kafka_is_ready") - while(True): + while True: kafka_ready = False try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: @@ -55,12 +57,30 @@ def wait_till_kafka_is_ready(config, process_name="osm-mon", kafka_wait_time=5): == 0 ): # Get the list of topics. If kafka is not ready exception will be thrown. - consumer = kafka.KafkaConsumer(group_id=config.conf["message"].get("group_id"), - bootstrap_servers=[config.conf.get("message", {}).get("host", - "kafka") + ":" + config.conf["message"] - .get("port")]) - topics = consumer.topics() - logging.debug("Number of topics found: %s", len(topics)) + consumer = kafka.KafkaConsumer( + group_id=config.conf["message"].get("group_id"), + bootstrap_servers=[ + config.conf.get("message", {}).get("host", "kafka") + + ":" + + config.conf["message"].get("port") + ], + ) + all_topics = consumer.topics() + logging.debug("Number of topics found: %s", len(all_topics)) + + # Send dummy message in kafka topics. If kafka is not ready exception will be thrown. + producer = kafka.KafkaProducer( + bootstrap_servers=[ + config.conf.get("message", {}).get("host", "kafka") + + ":" + + config.conf["message"].get("port") + ] + ) + mon_topics = ["alarm_request", "users", "project"] + for mon_topic in mon_topics: + producer.send(mon_topic, key=b"echo", value=b"dummy message") + + # Kafka is ready now kafka_ready = True except Exception as e: logging.info("Error when trying to get kafka status.") @@ -69,20 +89,24 @@ def wait_till_kafka_is_ready(config, process_name="osm-mon", kafka_wait_time=5): if kafka_ready: break else: - logging.info("{} process is waiting for kafka to come up...".format(process_name)) + logging.info( + "{} process is waiting for kafka to come up...".format(process_name) + ) time.sleep(kafka_wait_time) -def wait_till_core_services_are_ready(config, process_name="osm-mon", commondb_wait_time=5, kafka_wait_time=5): +def wait_till_core_services_are_ready( + config, process_name="osm-mon", commondb_wait_time=5, kafka_wait_time=5 +): - logging.debug("wait_till_services_are_ready") + logging.debug("wait_till_core_services_are_ready") if not config: logging.info("Config information is not available") return False # Check if common-db is ready - wait_till_kafka_is_ready(config, process_name, commondb_wait_time) + wait_till_commondb_is_ready(config, process_name, commondb_wait_time) # Check if kafka is ready wait_till_kafka_is_ready(config, process_name, kafka_wait_time)