X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fcmd%2Fmon_utils.py;h=83dd130f550bee8f2a7167a955203aa8c4b6b828;hb=881f2ce1a5876782cbd939764a5bffcad28908f8;hp=6d383d0bce9eab505b96b6d51c2d1d4ccbaaa0db;hpb=a949ff9862c725133c5ea77ad21dd688077ee28c;p=osm%2FMON.git diff --git a/osm_mon/cmd/mon_utils.py b/osm_mon/cmd/mon_utils.py index 6d383d0..83dd130 100644 --- a/osm_mon/cmd/mon_utils.py +++ b/osm_mon/cmd/mon_utils.py @@ -19,20 +19,22 @@ import time import socket import logging import kafka -from osm_mon.core.message_bus_client import MessageBusClient + def wait_till_commondb_is_ready(config, process_name="osm-mon", commondb_wait_time=5): logging.debug("wait_till_commondb_is_ready") - while(True): + while True: commondb_url = config.conf["database"].get("uri") try: commondb = pymongo.MongoClient(commondb_url) commondb.server_info() break except Exception: - logging.info("{} process is waiting for commondb to come up...".format(process_name)) + logging.info( + "{} process is waiting for commondb to come up...".format(process_name) + ) time.sleep(commondb_wait_time) @@ -40,7 +42,7 @@ def wait_till_kafka_is_ready(config, process_name="osm-mon", kafka_wait_time=5): logging.debug("wait_till_kafka_is_ready") - while(True): + while True: kafka_ready = False try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: @@ -55,18 +57,28 @@ def wait_till_kafka_is_ready(config, process_name="osm-mon", kafka_wait_time=5): == 0 ): # Get the list of topics. If kafka is not ready exception will be thrown. - consumer = kafka.KafkaConsumer(group_id=config.conf["message"].get("group_id"), - bootstrap_servers=[config.conf.get("message", {}).get("host", - "kafka") + ":" + config.conf["message"] - .get("port")]) - topics = consumer.topics() - logging.debug("Number of topics found: %s", len(topics)) + consumer = kafka.KafkaConsumer( + group_id=config.conf["message"].get("group_id"), + bootstrap_servers=[ + config.conf.get("message", {}).get("host", "kafka") + + ":" + + config.conf["message"].get("port") + ], + ) + all_topics = consumer.topics() + logging.debug("Number of topics found: %s", len(all_topics)) # Send dummy message in kafka topics. If kafka is not ready exception will be thrown. - msg_bus = MessageBusClient(config) - topics = ["alarm_request", "users", "project"] - for topic in topics: - msg_bus.aiowrite(topic, 'echo', 'dummy message') + producer = kafka.KafkaProducer( + bootstrap_servers=[ + config.conf.get("message", {}).get("host", "kafka") + + ":" + + config.conf["message"].get("port") + ] + ) + mon_topics = ["alarm_request", "users", "project"] + for mon_topic in mon_topics: + producer.send(mon_topic, key=b"echo", value=b"dummy message") # Kafka is ready now kafka_ready = True @@ -77,11 +89,15 @@ def wait_till_kafka_is_ready(config, process_name="osm-mon", kafka_wait_time=5): if kafka_ready: break else: - logging.info("{} process is waiting for kafka to come up...".format(process_name)) + logging.info( + "{} process is waiting for kafka to come up...".format(process_name) + ) time.sleep(kafka_wait_time) -def wait_till_core_services_are_ready(config, process_name="osm-mon", commondb_wait_time=5, kafka_wait_time=5): +def wait_till_core_services_are_ready( + config, process_name="osm-mon", commondb_wait_time=5, kafka_wait_time=5 +): logging.debug("wait_till_core_services_are_ready")