From b170aec221ad296cc9de06a7b6069c878211cc65 Mon Sep 17 00:00:00 2001 From: palsus Date: Tue, 30 Mar 2021 07:15:02 +0000 Subject: [PATCH] Fix for bug 1433 additional checks for kafka readiness Change-Id: I5e4c712c35089c6b41c0eaf102e577af7ac5502b Signed-off-by: palsus --- osm_mon/cmd/mon_collector.py | 10 ++-- osm_mon/cmd/mon_dashboarder.py | 16 +++--- osm_mon/cmd/mon_evaluator.py | 10 ++-- osm_mon/cmd/mon_server.py | 10 ++-- .../cmd/{common_functions.py => mon_utils.py} | 54 +++++++++++++------ 5 files changed, 70 insertions(+), 30 deletions(-) rename osm_mon/cmd/{common_functions.py => mon_utils.py} (57%) diff --git a/osm_mon/cmd/mon_collector.py b/osm_mon/cmd/mon_collector.py index 902d2e7..d0330d4 100644 --- a/osm_mon/cmd/mon_collector.py +++ b/osm_mon/cmd/mon_collector.py @@ -27,7 +27,7 @@ import sys from osm_mon.collector.collector import Collector from osm_mon.core.config import Config -from osm_mon.cmd.common_functions import wait_till_core_services_are_ready +from osm_mon.cmd.mon_utils import wait_till_core_services_are_ready def main(): @@ -49,8 +49,12 @@ def main(): log.info("Starting MON Collector...") log.debug("Config: %s", cfg.conf) log.info("Initializing database...") - collector = Collector(cfg) - collector.collect_forever() + try: + collector = Collector(cfg) + collector.collect_forever() + except Exception as e: + log.error("Failed to start MON Collector") + log.debug("Exception: %s", str(e)) else: log.error("Failed to start MON Collector") diff --git a/osm_mon/cmd/mon_dashboarder.py b/osm_mon/cmd/mon_dashboarder.py index 828c8b5..97d682e 100644 --- a/osm_mon/cmd/mon_dashboarder.py +++ b/osm_mon/cmd/mon_dashboarder.py @@ -27,7 +27,7 @@ import sys from osm_mon.core.config import Config from osm_mon.dashboarder.dashboarder import Dashboarder -from osm_mon.cmd.common_functions import wait_till_core_services_are_ready +from osm_mon.cmd.mon_utils import wait_till_core_services_are_ready import threading @@ -49,11 +49,15 @@ def main(): if wait_till_core_services_are_ready(cfg, "osm-mon-dashboarder"): log.info("Starting MON Dashboarder...") log.debug("Config: %s", cfg.conf) - dashboarder = Dashboarder(cfg) - log.info("Starting MON kafka Consumer...") - threading.Thread(target=dashboarder.run, args=()).start() - log.info("Starting MON Dashboarder...") - dashboarder.dashboard_forever() + try: + dashboarder = Dashboarder(cfg) + log.info("Starting MON kafka Consumer...") + threading.Thread(target=dashboarder.run, args=()).start() + log.info("Starting MON Dashboarder...") + dashboarder.dashboard_forever() + except Exception as e: + log.error("Failed to start MON Dashboarder") + log.debug("Exception: %s", str(e)) else: log.error("Failed to start MON Dashboarder") diff --git a/osm_mon/cmd/mon_evaluator.py b/osm_mon/cmd/mon_evaluator.py index 72a1012..cc36d99 100644 --- a/osm_mon/cmd/mon_evaluator.py +++ b/osm_mon/cmd/mon_evaluator.py @@ -27,7 +27,7 @@ import sys from osm_mon.core.config import Config from osm_mon.evaluator.evaluator import Evaluator -from osm_mon.cmd.common_functions import wait_till_core_services_are_ready +from osm_mon.cmd.mon_utils import wait_till_core_services_are_ready def main(): @@ -49,8 +49,12 @@ def main(): log.info("Starting MON Evaluator...") log.debug("Config: %s", cfg.conf) log.info("Initializing database...") - evaluator = Evaluator(cfg) - evaluator.evaluate_forever() + try: + evaluator = Evaluator(cfg) + evaluator.evaluate_forever() + except Exception as e: + log.error("Failed to start MON Evaluator") + log.debug("Exception: %s", str(e)) else: log.error("Failed to start MON Evaluator") diff --git a/osm_mon/cmd/mon_server.py b/osm_mon/cmd/mon_server.py index c296c0d..23d7a7c 100644 --- a/osm_mon/cmd/mon_server.py +++ b/osm_mon/cmd/mon_server.py @@ -28,7 +28,7 @@ import sys from osm_mon.core.config import Config from osm_mon.server.server import Server -from osm_mon.cmd.common_functions import wait_till_core_services_are_ready +from osm_mon.cmd.mon_utils import wait_till_core_services_are_ready def main(): @@ -51,8 +51,12 @@ def main(): log.debug("Config: %s", cfg.conf) log.info("Initializing database...") loop = asyncio.get_event_loop() - server = Server(cfg, loop) - server.run() + try: + server = Server(cfg, loop) + server.run() + except Exception as e: + log.error("Failed to start MON Server") + log.debug("Exception: %s", str(e)) else: log.error("Failed to start MON Server") diff --git a/osm_mon/cmd/common_functions.py b/osm_mon/cmd/mon_utils.py similarity index 57% rename from osm_mon/cmd/common_functions.py rename to osm_mon/cmd/mon_utils.py index 78f9242..a5d3d07 100644 --- a/osm_mon/cmd/common_functions.py +++ b/osm_mon/cmd/mon_utils.py @@ -18,17 +18,13 @@ import pymongo import time import socket import logging +import kafka -def wait_till_core_services_are_ready(config, process_name="osm-mon", commondb_wait_time=5, kafka_wait_time=5): - - logging.debug("wait_till_services_are_ready()") +def wait_till_commondb_is_ready(config, process_name="osm-mon", commondb_wait_time=5): - if not config: - logging.info("Config information is not available") - return False + logging.debug("wait_till_commondb_is_ready") - # Check if common-db is ready while(True): commondb_url = config.conf["database"].get("uri") try: @@ -39,11 +35,16 @@ def wait_till_core_services_are_ready(config, process_name="osm-mon", commondb_w logging.info("{} process is waiting for commondb to come up...".format(process_name)) time.sleep(commondb_wait_time) - # Check if kafka is ready + +def wait_till_kafka_is_ready(config, process_name="osm-mon", kafka_wait_time=5): + + logging.debug("wait_till_kafka_is_ready") + while(True): - port_in_use = False + kafka_ready = False try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + # Verify is kafka port is up if ( s.connect_ex( ( @@ -53,14 +54,37 @@ def wait_till_core_services_are_ready(config, process_name="osm-mon", commondb_w ) == 0 ): - port_in_use = True + # Get the list of topics. If kafka is not ready exception will be thrown. + consumer = kafka.KafkaConsumer(group_id=config.conf["message"].get("group_id"), + bootstrap_servers=[config.conf.get("message", {}).get("host", + "kafka") + ":" + config.conf["message"] + .get("port")]) + topics = consumer.topics() + logging.debug("Number of topics found: %s", len(topics)) + kafka_ready = True except Exception as e: logging.info("Error when trying to get kafka status.") logging.debug("Exception when trying to get kafka status: %s", str(e)) - if port_in_use: - break - else: - logging.info("{} process is waiting for kafka to come up...".format(process_name)) - time.sleep(kafka_wait_time) + finally: + if kafka_ready: + break + else: + logging.info("{} process is waiting for kafka to come up...".format(process_name)) + time.sleep(kafka_wait_time) + + +def wait_till_core_services_are_ready(config, process_name="osm-mon", commondb_wait_time=5, kafka_wait_time=5): + + logging.debug("wait_till_services_are_ready") + + if not config: + logging.info("Config information is not available") + return False + + # Check if common-db is ready + wait_till_kafka_is_ready(config, process_name, commondb_wait_time) + + # Check if kafka is ready + wait_till_kafka_is_ready(config, process_name, kafka_wait_time) return True -- 2.25.1