X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;ds=sidebyside;f=osm_mon%2Fcmd%2Fmon_utils.py;h=88e7500dcdd755454ef4d53b86f71a1bdb78c282;hb=32f92ce2d3df9e66686cebe499a51c0964c23acb;hp=9640543cf85cd658e65e5ef95a0c1ed3f6c8021e;hpb=865c35c7cdc508749bf51ebd2594996d14afd8c1;p=osm%2FMON.git diff --git a/osm_mon/cmd/mon_utils.py b/osm_mon/cmd/mon_utils.py index 9640543..88e7500 100644 --- a/osm_mon/cmd/mon_utils.py +++ b/osm_mon/cmd/mon_utils.py @@ -22,25 +22,25 @@ import kafka def wait_till_commondb_is_ready(config, process_name="osm-mon", commondb_wait_time=5): - logging.debug("wait_till_commondb_is_ready") - while(True): + while True: commondb_url = config.conf["database"].get("uri") try: commondb = pymongo.MongoClient(commondb_url) commondb.server_info() break except Exception: - logging.info("{} process is waiting for commondb to come up...".format(process_name)) + logging.info( + "{} process is waiting for commondb to come up...".format(process_name) + ) time.sleep(commondb_wait_time) def wait_till_kafka_is_ready(config, process_name="osm-mon", kafka_wait_time=5): - logging.debug("wait_till_kafka_is_ready") - while(True): + while True: kafka_ready = False try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: @@ -55,17 +55,25 @@ def wait_till_kafka_is_ready(config, process_name="osm-mon", kafka_wait_time=5): == 0 ): # Get the list of topics. If kafka is not ready exception will be thrown. - consumer = kafka.KafkaConsumer(group_id=config.conf["message"].get("group_id"), - bootstrap_servers=[config.conf.get("message", {}).get("host", - "kafka") + ":" + config.conf["message"] - .get("port")]) + consumer = kafka.KafkaConsumer( + group_id=config.conf["message"].get("group_id"), + bootstrap_servers=[ + config.conf.get("message", {}).get("host", "kafka") + + ":" + + config.conf["message"].get("port") + ], + ) all_topics = consumer.topics() logging.debug("Number of topics found: %s", len(all_topics)) # Send dummy message in kafka topics. If kafka is not ready exception will be thrown. - producer = kafka.KafkaProducer(bootstrap_servers=[config.conf.get("message", {}).get("host", - "kafka") + ":" + config.conf["message"] - .get("port")]) + producer = kafka.KafkaProducer( + bootstrap_servers=[ + config.conf.get("message", {}).get("host", "kafka") + + ":" + + config.conf["message"].get("port") + ] + ) mon_topics = ["alarm_request", "users", "project"] for mon_topic in mon_topics: producer.send(mon_topic, key=b"echo", value=b"dummy message") @@ -79,12 +87,15 @@ def wait_till_kafka_is_ready(config, process_name="osm-mon", kafka_wait_time=5): if kafka_ready: break else: - logging.info("{} process is waiting for kafka to come up...".format(process_name)) + logging.info( + "{} process is waiting for kafka to come up...".format(process_name) + ) time.sleep(kafka_wait_time) -def wait_till_core_services_are_ready(config, process_name="osm-mon", commondb_wait_time=5, kafka_wait_time=5): - +def wait_till_core_services_are_ready( + config, process_name="osm-mon", commondb_wait_time=5, kafka_wait_time=5 +): logging.debug("wait_till_core_services_are_ready") if not config: