Fix for bug 1433 additional checks for kafka readiness 78/10578/1
authorpalsus <subhankar.pal@aricent.com>
Tue, 30 Mar 2021 07:15:02 +0000 (07:15 +0000)
committerpalsus <subhankar.pal@aricent.com>
Thu, 1 Apr 2021 05:47:46 +0000 (06:47 +0100)
Change-Id: I5e4c712c35089c6b41c0eaf102e577af7ac5502b
Signed-off-by: palsus <subhankar.pal@aricent.com>
(cherry picked from commit b170aec221ad296cc9de06a7b6069c878211cc65)

osm_mon/cmd/common_functions.py [deleted file]
osm_mon/cmd/mon_collector.py
osm_mon/cmd/mon_dashboarder.py
osm_mon/cmd/mon_evaluator.py
osm_mon/cmd/mon_server.py
osm_mon/cmd/mon_utils.py [new file with mode: 0644]

diff --git a/osm_mon/cmd/common_functions.py b/osm_mon/cmd/common_functions.py
deleted file mode 100644 (file)
index 78f9242..0000000
+++ /dev/null
@@ -1,66 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# This file is part of OSM Monitoring module
-
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-
-#         http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import pymongo
-import time
-import socket
-import logging
-
-
-def wait_till_core_services_are_ready(config, process_name="osm-mon", commondb_wait_time=5, kafka_wait_time=5):
-
-    logging.debug("wait_till_services_are_ready()")
-
-    if not config:
-        logging.info("Config information is not available")
-        return False
-
-    # Check if common-db is ready
-    while(True):
-        commondb_url = config.conf["database"].get("uri")
-        try:
-            commondb = pymongo.MongoClient(commondb_url)
-            commondb.server_info()
-            break
-        except Exception:
-            logging.info("{} process is waiting for commondb to come up...".format(process_name))
-            time.sleep(commondb_wait_time)
-
-    # Check if kafka is ready
-    while(True):
-        port_in_use = False
-        try:
-            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
-                if (
-                    s.connect_ex(
-                        (
-                            config.conf.get("message", {}).get("host", "kafka"),
-                            int(config.conf["message"].get("port")),
-                        )
-                    )
-                    == 0
-                ):
-                    port_in_use = True
-        except Exception as e:
-            logging.info("Error when trying to get kafka status.")
-            logging.debug("Exception when trying to get kafka status: %s", str(e))
-        if port_in_use:
-            break
-        else:
-            logging.info("{} process is waiting for kafka to come up...".format(process_name))
-            time.sleep(kafka_wait_time)
-
-    return True
index 902d2e7..d0330d4 100644 (file)
@@ -27,7 +27,7 @@ import sys
 
 from osm_mon.collector.collector import Collector
 from osm_mon.core.config import Config
-from osm_mon.cmd.common_functions import wait_till_core_services_are_ready
+from osm_mon.cmd.mon_utils import wait_till_core_services_are_ready
 
 
 def main():
@@ -49,8 +49,12 @@ def main():
         log.info("Starting MON Collector...")
         log.debug("Config: %s", cfg.conf)
         log.info("Initializing database...")
-        collector = Collector(cfg)
-        collector.collect_forever()
+        try:
+            collector = Collector(cfg)
+            collector.collect_forever()
+        except Exception as e:
+            log.error("Failed to start MON Collector")
+            log.debug("Exception: %s", str(e))
     else:
         log.error("Failed to start MON Collector")
 
index 828c8b5..97d682e 100644 (file)
@@ -27,7 +27,7 @@ import sys
 
 from osm_mon.core.config import Config
 from osm_mon.dashboarder.dashboarder import Dashboarder
-from osm_mon.cmd.common_functions import wait_till_core_services_are_ready
+from osm_mon.cmd.mon_utils import wait_till_core_services_are_ready
 import threading
 
 
@@ -49,11 +49,15 @@ def main():
     if wait_till_core_services_are_ready(cfg, "osm-mon-dashboarder"):
         log.info("Starting MON Dashboarder...")
         log.debug("Config: %s", cfg.conf)
-        dashboarder = Dashboarder(cfg)
-        log.info("Starting MON kafka Consumer...")
-        threading.Thread(target=dashboarder.run, args=()).start()
-        log.info("Starting MON Dashboarder...")
-        dashboarder.dashboard_forever()
+        try:
+            dashboarder = Dashboarder(cfg)
+            log.info("Starting MON kafka Consumer...")
+            threading.Thread(target=dashboarder.run, args=()).start()
+            log.info("Starting MON Dashboarder...")
+            dashboarder.dashboard_forever()
+        except Exception as e:
+            log.error("Failed to start MON Dashboarder")
+            log.debug("Exception: %s", str(e))
     else:
         log.error("Failed to start MON Dashboarder")
 
index 72a1012..cc36d99 100644 (file)
@@ -27,7 +27,7 @@ import sys
 
 from osm_mon.core.config import Config
 from osm_mon.evaluator.evaluator import Evaluator
-from osm_mon.cmd.common_functions import wait_till_core_services_are_ready
+from osm_mon.cmd.mon_utils import wait_till_core_services_are_ready
 
 
 def main():
@@ -49,8 +49,12 @@ def main():
         log.info("Starting MON Evaluator...")
         log.debug("Config: %s", cfg.conf)
         log.info("Initializing database...")
-        evaluator = Evaluator(cfg)
-        evaluator.evaluate_forever()
+        try:
+            evaluator = Evaluator(cfg)
+            evaluator.evaluate_forever()
+        except Exception as e:
+            log.error("Failed to start MON Evaluator")
+            log.debug("Exception: %s", str(e))
     else:
         log.error("Failed to start MON Evaluator")
 
index c296c0d..23d7a7c 100644 (file)
@@ -28,7 +28,7 @@ import sys
 
 from osm_mon.core.config import Config
 from osm_mon.server.server import Server
-from osm_mon.cmd.common_functions import wait_till_core_services_are_ready
+from osm_mon.cmd.mon_utils import wait_till_core_services_are_ready
 
 
 def main():
@@ -51,8 +51,12 @@ def main():
         log.debug("Config: %s", cfg.conf)
         log.info("Initializing database...")
         loop = asyncio.get_event_loop()
-        server = Server(cfg, loop)
-        server.run()
+        try:
+            server = Server(cfg, loop)
+            server.run()
+        except Exception as e:
+            log.error("Failed to start MON Server")
+            log.debug("Exception: %s", str(e))
     else:
         log.error("Failed to start MON Server")
 
diff --git a/osm_mon/cmd/mon_utils.py b/osm_mon/cmd/mon_utils.py
new file mode 100644 (file)
index 0000000..a5d3d07
--- /dev/null
@@ -0,0 +1,90 @@
+# -*- coding: utf-8 -*-
+
+# This file is part of OSM Monitoring module
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import pymongo
+import time
+import socket
+import logging
+import kafka
+
+
+def wait_till_commondb_is_ready(config, process_name="osm-mon", commondb_wait_time=5):
+
+    logging.debug("wait_till_commondb_is_ready")
+
+    while(True):
+        commondb_url = config.conf["database"].get("uri")
+        try:
+            commondb = pymongo.MongoClient(commondb_url)
+            commondb.server_info()
+            break
+        except Exception:
+            logging.info("{} process is waiting for commondb to come up...".format(process_name))
+            time.sleep(commondb_wait_time)
+
+
+def wait_till_kafka_is_ready(config, process_name="osm-mon", kafka_wait_time=5):
+
+    logging.debug("wait_till_kafka_is_ready")
+
+    while(True):
+        kafka_ready = False
+        try:
+            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+                # Verify is kafka port is up
+                if (
+                    s.connect_ex(
+                        (
+                            config.conf.get("message", {}).get("host", "kafka"),
+                            int(config.conf["message"].get("port")),
+                        )
+                    )
+                    == 0
+                ):
+                    # Get the list of topics. If kafka is not ready exception will be thrown.
+                    consumer = kafka.KafkaConsumer(group_id=config.conf["message"].get("group_id"),
+                                                   bootstrap_servers=[config.conf.get("message", {}).get("host",
+                                                                      "kafka") + ":" + config.conf["message"]
+                                                                      .get("port")])
+                    topics = consumer.topics()
+                    logging.debug("Number of topics found: %s", len(topics))
+                    kafka_ready = True
+        except Exception as e:
+            logging.info("Error when trying to get kafka status.")
+            logging.debug("Exception when trying to get kafka status: %s", str(e))
+        finally:
+            if kafka_ready:
+                break
+            else:
+                logging.info("{} process is waiting for kafka to come up...".format(process_name))
+                time.sleep(kafka_wait_time)
+
+
+def wait_till_core_services_are_ready(config, process_name="osm-mon", commondb_wait_time=5, kafka_wait_time=5):
+
+    logging.debug("wait_till_services_are_ready")
+
+    if not config:
+        logging.info("Config information is not available")
+        return False
+
+    # Check if common-db is ready
+    wait_till_kafka_is_ready(config, process_name, commondb_wait_time)
+
+    # Check if kafka is ready
+    wait_till_kafka_is_ready(config, process_name, kafka_wait_time)
+
+    return True