X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fcmd%2Fmon_utils.py;h=88e7500dcdd755454ef4d53b86f71a1bdb78c282;hb=32f92ce2d3df9e66686cebe499a51c0964c23acb;hp=6d383d0bce9eab505b96b6d51c2d1d4ccbaaa0db;hpb=a949ff9862c725133c5ea77ad21dd688077ee28c;p=osm%2FMON.git diff --git a/osm_mon/cmd/mon_utils.py b/osm_mon/cmd/mon_utils.py index 6d383d0..88e7500 100644 --- a/osm_mon/cmd/mon_utils.py +++ b/osm_mon/cmd/mon_utils.py @@ -19,28 +19,28 @@ import time import socket import logging import kafka -from osm_mon.core.message_bus_client import MessageBusClient -def wait_till_commondb_is_ready(config, process_name="osm-mon", commondb_wait_time=5): +def wait_till_commondb_is_ready(config, process_name="osm-mon", commondb_wait_time=5): logging.debug("wait_till_commondb_is_ready") - while(True): + while True: commondb_url = config.conf["database"].get("uri") try: commondb = pymongo.MongoClient(commondb_url) commondb.server_info() break except Exception: - logging.info("{} process is waiting for commondb to come up...".format(process_name)) + logging.info( + "{} process is waiting for commondb to come up...".format(process_name) + ) time.sleep(commondb_wait_time) def wait_till_kafka_is_ready(config, process_name="osm-mon", kafka_wait_time=5): - logging.debug("wait_till_kafka_is_ready") - while(True): + while True: kafka_ready = False try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: @@ -55,18 +55,28 @@ def wait_till_kafka_is_ready(config, process_name="osm-mon", kafka_wait_time=5): == 0 ): # Get the list of topics. If kafka is not ready exception will be thrown. - consumer = kafka.KafkaConsumer(group_id=config.conf["message"].get("group_id"), - bootstrap_servers=[config.conf.get("message", {}).get("host", - "kafka") + ":" + config.conf["message"] - .get("port")]) - topics = consumer.topics() - logging.debug("Number of topics found: %s", len(topics)) + consumer = kafka.KafkaConsumer( + group_id=config.conf["message"].get("group_id"), + bootstrap_servers=[ + config.conf.get("message", {}).get("host", "kafka") + + ":" + + config.conf["message"].get("port") + ], + ) + all_topics = consumer.topics() + logging.debug("Number of topics found: %s", len(all_topics)) # Send dummy message in kafka topics. If kafka is not ready exception will be thrown. - msg_bus = MessageBusClient(config) - topics = ["alarm_request", "users", "project"] - for topic in topics: - msg_bus.aiowrite(topic, 'echo', 'dummy message') + producer = kafka.KafkaProducer( + bootstrap_servers=[ + config.conf.get("message", {}).get("host", "kafka") + + ":" + + config.conf["message"].get("port") + ] + ) + mon_topics = ["alarm_request", "users", "project"] + for mon_topic in mon_topics: + producer.send(mon_topic, key=b"echo", value=b"dummy message") # Kafka is ready now kafka_ready = True @@ -77,12 +87,15 @@ def wait_till_kafka_is_ready(config, process_name="osm-mon", kafka_wait_time=5): if kafka_ready: break else: - logging.info("{} process is waiting for kafka to come up...".format(process_name)) + logging.info( + "{} process is waiting for kafka to come up...".format(process_name) + ) time.sleep(kafka_wait_time) -def wait_till_core_services_are_ready(config, process_name="osm-mon", commondb_wait_time=5, kafka_wait_time=5): - +def wait_till_core_services_are_ready( + config, process_name="osm-mon", commondb_wait_time=5, kafka_wait_time=5 +): logging.debug("wait_till_core_services_are_ready") if not config: