From 5aa710b2f20055daed07dcf474ce64062ec55dfa Mon Sep 17 00:00:00 2001 From: Atul Agarwal Date: Fri, 26 Feb 2021 13:42:09 +0000 Subject: [PATCH] Fix for bug 1433 - MON will not start till MongoDB service is up Change-Id: I46361822dfaca22cdba6dbb05835ca24081ae787 Signed-off-by: Atul Agarwal --- osm_mon/cmd/common_functions.py | 55 +++++++++++++++++++++++++++++++++ osm_mon/cmd/mon_collector.py | 14 ++++++--- osm_mon/cmd/mon_dashboarder.py | 18 ++++++----- osm_mon/cmd/mon_evaluator.py | 14 ++++++--- osm_mon/cmd/mon_server.py | 16 ++++++---- 5 files changed, 94 insertions(+), 23 deletions(-) create mode 100644 osm_mon/cmd/common_functions.py diff --git a/osm_mon/cmd/common_functions.py b/osm_mon/cmd/common_functions.py new file mode 100644 index 0000000..5e4bff8 --- /dev/null +++ b/osm_mon/cmd/common_functions.py @@ -0,0 +1,55 @@ +# -*- coding: utf-8 -*- + +# This file is part of OSM Monitoring module + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import pymongo +import time +import socket +import logging + + +def wait_till_core_services_are_ready(config, process_name="osm-mon", commondb_wait_time=5, kafka_wait_time=5): + + logging.debug("wait_till_services_are_ready()") + + if not config: + logging.info("Config information is not available") + return False + + # Check if common-db is ready + while(True): + commondb_url = config.conf["database"].get("uri") + try: + commondb = pymongo.MongoClient(commondb_url) + commondb.server_info() + break + except Exception: + logging.info("{} process is waiting for commondb to come up...".format(process_name)) + time.sleep(commondb_wait_time) + + # Check if kafka is ready + while(True): + port_in_use = False + # Logic to check kafka is ready + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + if s.connect_ex(("kafka", int(config.conf["message"].get("port")))) == 0: + port_in_use = True + if port_in_use: + break + else: + logging.info("{} process is waiting for kafka to come up...".format(process_name)) + time.sleep(kafka_wait_time) + + return True diff --git a/osm_mon/cmd/mon_collector.py b/osm_mon/cmd/mon_collector.py index 94c3883..902d2e7 100644 --- a/osm_mon/cmd/mon_collector.py +++ b/osm_mon/cmd/mon_collector.py @@ -27,6 +27,7 @@ import sys from osm_mon.collector.collector import Collector from osm_mon.core.config import Config +from osm_mon.cmd.common_functions import wait_till_core_services_are_ready def main(): @@ -44,11 +45,14 @@ def main(): root.addHandler(ch) log = logging.getLogger(__name__) - log.info("Starting MON Collector...") - log.debug("Config: %s", cfg.conf) - log.info("Initializing database...") - collector = Collector(cfg) - collector.collect_forever() + if wait_till_core_services_are_ready(cfg, "osm-mon-collector"): + log.info("Starting MON Collector...") + log.debug("Config: %s", cfg.conf) + log.info("Initializing database...") + collector = Collector(cfg) + collector.collect_forever() + else: + log.error("Failed to start MON Collector") if __name__ == '__main__': diff --git a/osm_mon/cmd/mon_dashboarder.py b/osm_mon/cmd/mon_dashboarder.py index b49b570..828c8b5 100644 --- a/osm_mon/cmd/mon_dashboarder.py +++ b/osm_mon/cmd/mon_dashboarder.py @@ -27,6 +27,7 @@ import sys from osm_mon.core.config import Config from osm_mon.dashboarder.dashboarder import Dashboarder +from osm_mon.cmd.common_functions import wait_till_core_services_are_ready import threading @@ -45,13 +46,16 @@ def main(): root.addHandler(ch) log = logging.getLogger(__name__) - log.info("Starting MON Dashboarder...") - log.debug("Config: %s", cfg.conf) - dashboarder = Dashboarder(cfg) - log.info("Starting MON kafka Consumer...") - threading.Thread(target=dashboarder.run, args=()).start() - log.info("Starting MON Dashboarder...") - dashboarder.dashboard_forever() + if wait_till_core_services_are_ready(cfg, "osm-mon-dashboarder"): + log.info("Starting MON Dashboarder...") + log.debug("Config: %s", cfg.conf) + dashboarder = Dashboarder(cfg) + log.info("Starting MON kafka Consumer...") + threading.Thread(target=dashboarder.run, args=()).start() + log.info("Starting MON Dashboarder...") + dashboarder.dashboard_forever() + else: + log.error("Failed to start MON Dashboarder") if __name__ == '__main__': diff --git a/osm_mon/cmd/mon_evaluator.py b/osm_mon/cmd/mon_evaluator.py index ba9a420..72a1012 100644 --- a/osm_mon/cmd/mon_evaluator.py +++ b/osm_mon/cmd/mon_evaluator.py @@ -27,6 +27,7 @@ import sys from osm_mon.core.config import Config from osm_mon.evaluator.evaluator import Evaluator +from osm_mon.cmd.common_functions import wait_till_core_services_are_ready def main(): @@ -44,11 +45,14 @@ def main(): root.addHandler(ch) log = logging.getLogger(__name__) - log.info("Starting MON Evaluator...") - log.debug("Config: %s", cfg.conf) - log.info("Initializing database...") - evaluator = Evaluator(cfg) - evaluator.evaluate_forever() + if wait_till_core_services_are_ready(cfg, "osm-mon-evaluator"): + log.info("Starting MON Evaluator...") + log.debug("Config: %s", cfg.conf) + log.info("Initializing database...") + evaluator = Evaluator(cfg) + evaluator.evaluate_forever() + else: + log.error("Failed to start MON Evaluator") if __name__ == '__main__': diff --git a/osm_mon/cmd/mon_server.py b/osm_mon/cmd/mon_server.py index e05f4b8..c296c0d 100644 --- a/osm_mon/cmd/mon_server.py +++ b/osm_mon/cmd/mon_server.py @@ -28,6 +28,7 @@ import sys from osm_mon.core.config import Config from osm_mon.server.server import Server +from osm_mon.cmd.common_functions import wait_till_core_services_are_ready def main(): @@ -45,12 +46,15 @@ def main(): root.addHandler(ch) log = logging.getLogger(__name__) - log.info("Starting MON Server...") - log.debug("Config: %s", cfg.conf) - log.info("Initializing database...") - loop = asyncio.get_event_loop() - server = Server(cfg, loop) - server.run() + if wait_till_core_services_are_ready(cfg, "osm-mon-server"): + log.info("Starting MON Server...") + log.debug("Config: %s", cfg.conf) + log.info("Initializing database...") + loop = asyncio.get_event_loop() + server = Server(cfg, loop) + server.run() + else: + log.error("Failed to start MON Server") if __name__ == '__main__': -- 2.17.1