From: palsus Date: Tue, 30 Mar 2021 07:15:02 +0000 (+0000) Subject: Fix for bug 1433 additional checks for kafka readiness X-Git-Tag: branch-sol006v331-start~4 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=refs%2Fchanges%2F66%2F10566%2F8;p=osm%2FMON.git Fix for bug 1433 additional checks for kafka readiness Change-Id: I5e4c712c35089c6b41c0eaf102e577af7ac5502b Signed-off-by: palsus --- diff --git a/osm_mon/cmd/common_functions.py b/osm_mon/cmd/common_functions.py deleted file mode 100644 index 78f9242..0000000 --- a/osm_mon/cmd/common_functions.py +++ /dev/null @@ -1,66 +0,0 @@ -# -*- coding: utf-8 -*- - -# This file is part of OSM Monitoring module - -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import pymongo -import time -import socket -import logging - - -def wait_till_core_services_are_ready(config, process_name="osm-mon", commondb_wait_time=5, kafka_wait_time=5): - - logging.debug("wait_till_services_are_ready()") - - if not config: - logging.info("Config information is not available") - return False - - # Check if common-db is ready - while(True): - commondb_url = config.conf["database"].get("uri") - try: - commondb = pymongo.MongoClient(commondb_url) - commondb.server_info() - break - except Exception: - logging.info("{} process is waiting for commondb to come up...".format(process_name)) - time.sleep(commondb_wait_time) - - # Check if kafka is ready - while(True): - port_in_use = False - try: - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - if ( - s.connect_ex( - ( - config.conf.get("message", {}).get("host", "kafka"), - int(config.conf["message"].get("port")), - ) - ) - == 0 - ): - port_in_use = True - except Exception as e: - logging.info("Error when trying to get kafka status.") - logging.debug("Exception when trying to get kafka status: %s", str(e)) - if port_in_use: - break - else: - logging.info("{} process is waiting for kafka to come up...".format(process_name)) - time.sleep(kafka_wait_time) - - return True diff --git a/osm_mon/cmd/mon_collector.py b/osm_mon/cmd/mon_collector.py index 902d2e7..d0330d4 100644 --- a/osm_mon/cmd/mon_collector.py +++ b/osm_mon/cmd/mon_collector.py @@ -27,7 +27,7 @@ import sys from osm_mon.collector.collector import Collector from osm_mon.core.config import Config -from osm_mon.cmd.common_functions import wait_till_core_services_are_ready +from osm_mon.cmd.mon_utils import wait_till_core_services_are_ready def main(): @@ -49,8 +49,12 @@ def main(): log.info("Starting MON Collector...") log.debug("Config: %s", cfg.conf) log.info("Initializing database...") - collector = Collector(cfg) - collector.collect_forever() + try: + collector = Collector(cfg) + collector.collect_forever() + except Exception as e: + log.error("Failed to start MON Collector") + log.debug("Exception: %s", str(e)) else: log.error("Failed to start MON Collector") diff --git a/osm_mon/cmd/mon_dashboarder.py b/osm_mon/cmd/mon_dashboarder.py index 828c8b5..97d682e 100644 --- a/osm_mon/cmd/mon_dashboarder.py +++ b/osm_mon/cmd/mon_dashboarder.py @@ -27,7 +27,7 @@ import sys from osm_mon.core.config import Config from osm_mon.dashboarder.dashboarder import Dashboarder -from osm_mon.cmd.common_functions import wait_till_core_services_are_ready +from osm_mon.cmd.mon_utils import wait_till_core_services_are_ready import threading @@ -49,11 +49,15 @@ def main(): if wait_till_core_services_are_ready(cfg, "osm-mon-dashboarder"): log.info("Starting MON Dashboarder...") log.debug("Config: %s", cfg.conf) - dashboarder = Dashboarder(cfg) - log.info("Starting MON kafka Consumer...") - threading.Thread(target=dashboarder.run, args=()).start() - log.info("Starting MON Dashboarder...") - dashboarder.dashboard_forever() + try: + dashboarder = Dashboarder(cfg) + log.info("Starting MON kafka Consumer...") + threading.Thread(target=dashboarder.run, args=()).start() + log.info("Starting MON Dashboarder...") + dashboarder.dashboard_forever() + except Exception as e: + log.error("Failed to start MON Dashboarder") + log.debug("Exception: %s", str(e)) else: log.error("Failed to start MON Dashboarder") diff --git a/osm_mon/cmd/mon_evaluator.py b/osm_mon/cmd/mon_evaluator.py index 72a1012..cc36d99 100644 --- a/osm_mon/cmd/mon_evaluator.py +++ b/osm_mon/cmd/mon_evaluator.py @@ -27,7 +27,7 @@ import sys from osm_mon.core.config import Config from osm_mon.evaluator.evaluator import Evaluator -from osm_mon.cmd.common_functions import wait_till_core_services_are_ready +from osm_mon.cmd.mon_utils import wait_till_core_services_are_ready def main(): @@ -49,8 +49,12 @@ def main(): log.info("Starting MON Evaluator...") log.debug("Config: %s", cfg.conf) log.info("Initializing database...") - evaluator = Evaluator(cfg) - evaluator.evaluate_forever() + try: + evaluator = Evaluator(cfg) + evaluator.evaluate_forever() + except Exception as e: + log.error("Failed to start MON Evaluator") + log.debug("Exception: %s", str(e)) else: log.error("Failed to start MON Evaluator") diff --git a/osm_mon/cmd/mon_server.py b/osm_mon/cmd/mon_server.py index c296c0d..23d7a7c 100644 --- a/osm_mon/cmd/mon_server.py +++ b/osm_mon/cmd/mon_server.py @@ -28,7 +28,7 @@ import sys from osm_mon.core.config import Config from osm_mon.server.server import Server -from osm_mon.cmd.common_functions import wait_till_core_services_are_ready +from osm_mon.cmd.mon_utils import wait_till_core_services_are_ready def main(): @@ -51,8 +51,12 @@ def main(): log.debug("Config: %s", cfg.conf) log.info("Initializing database...") loop = asyncio.get_event_loop() - server = Server(cfg, loop) - server.run() + try: + server = Server(cfg, loop) + server.run() + except Exception as e: + log.error("Failed to start MON Server") + log.debug("Exception: %s", str(e)) else: log.error("Failed to start MON Server") diff --git a/osm_mon/cmd/mon_utils.py b/osm_mon/cmd/mon_utils.py new file mode 100644 index 0000000..a5d3d07 --- /dev/null +++ b/osm_mon/cmd/mon_utils.py @@ -0,0 +1,90 @@ +# -*- coding: utf-8 -*- + +# This file is part of OSM Monitoring module + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import pymongo +import time +import socket +import logging +import kafka + + +def wait_till_commondb_is_ready(config, process_name="osm-mon", commondb_wait_time=5): + + logging.debug("wait_till_commondb_is_ready") + + while(True): + commondb_url = config.conf["database"].get("uri") + try: + commondb = pymongo.MongoClient(commondb_url) + commondb.server_info() + break + except Exception: + logging.info("{} process is waiting for commondb to come up...".format(process_name)) + time.sleep(commondb_wait_time) + + +def wait_till_kafka_is_ready(config, process_name="osm-mon", kafka_wait_time=5): + + logging.debug("wait_till_kafka_is_ready") + + while(True): + kafka_ready = False + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + # Verify is kafka port is up + if ( + s.connect_ex( + ( + config.conf.get("message", {}).get("host", "kafka"), + int(config.conf["message"].get("port")), + ) + ) + == 0 + ): + # Get the list of topics. If kafka is not ready exception will be thrown. + consumer = kafka.KafkaConsumer(group_id=config.conf["message"].get("group_id"), + bootstrap_servers=[config.conf.get("message", {}).get("host", + "kafka") + ":" + config.conf["message"] + .get("port")]) + topics = consumer.topics() + logging.debug("Number of topics found: %s", len(topics)) + kafka_ready = True + except Exception as e: + logging.info("Error when trying to get kafka status.") + logging.debug("Exception when trying to get kafka status: %s", str(e)) + finally: + if kafka_ready: + break + else: + logging.info("{} process is waiting for kafka to come up...".format(process_name)) + time.sleep(kafka_wait_time) + + +def wait_till_core_services_are_ready(config, process_name="osm-mon", commondb_wait_time=5, kafka_wait_time=5): + + logging.debug("wait_till_services_are_ready") + + if not config: + logging.info("Config information is not available") + return False + + # Check if common-db is ready + wait_till_kafka_is_ready(config, process_name, commondb_wait_time) + + # Check if kafka is ready + wait_till_kafka_is_ready(config, process_name, kafka_wait_time) + + return True