Fix for Bug 1433 Exception handling in aioread method 81/10581/23
authorpalsus <subhankar.pal@aricent.com>
Tue, 6 Apr 2021 11:47:21 +0000 (11:47 +0000)
committerpalsus <subhankar.pal@aricent.com>
Thu, 15 Apr 2021 17:04:37 +0000 (17:04 +0000)
Change-Id: I39e2c888798dd518a9ad71e3cbd41be9241dd579
Signed-off-by: palsus <subhankar.pal@aricent.com>
osm_mon/cmd/mon_collector.py
osm_mon/cmd/mon_dashboarder.py
osm_mon/cmd/mon_evaluator.py
osm_mon/cmd/mon_server.py
osm_mon/cmd/mon_utils.py
osm_mon/dashboarder/dashboarder.py
osm_mon/server/server.py

index d0330d4..8732269 100644 (file)
@@ -54,7 +54,7 @@ def main():
             collector.collect_forever()
         except Exception as e:
             log.error("Failed to start MON Collector")
-            log.debug("Exception: %s", str(e))
+            log.exception("Exception: %s", str(e))
     else:
         log.error("Failed to start MON Collector")
 
index 97d682e..e7748cd 100644 (file)
@@ -57,7 +57,7 @@ def main():
             dashboarder.dashboard_forever()
         except Exception as e:
             log.error("Failed to start MON Dashboarder")
-            log.debug("Exception: %s", str(e))
+            log.exception("Exception: %s", str(e))
     else:
         log.error("Failed to start MON Dashboarder")
 
index cc36d99..ca2df2e 100644 (file)
@@ -54,7 +54,7 @@ def main():
             evaluator.evaluate_forever()
         except Exception as e:
             log.error("Failed to start MON Evaluator")
-            log.debug("Exception: %s", str(e))
+            log.exception("Exception: %s", str(e))
     else:
         log.error("Failed to start MON Evaluator")
 
index 23d7a7c..e5bca31 100644 (file)
@@ -56,7 +56,7 @@ def main():
             server.run()
         except Exception as e:
             log.error("Failed to start MON Server")
-            log.debug("Exception: %s", str(e))
+            log.exception("Exception: %s", str(e))
     else:
         log.error("Failed to start MON Server")
 
index 5b62d6c..6d383d0 100644 (file)
@@ -19,7 +19,7 @@ import time
 import socket
 import logging
 import kafka
-
+from osm_mon.core.message_bus_client import MessageBusClient
 
 def wait_till_commondb_is_ready(config, process_name="osm-mon", commondb_wait_time=5):
 
@@ -61,6 +61,14 @@ def wait_till_kafka_is_ready(config, process_name="osm-mon", kafka_wait_time=5):
                                                                       .get("port")])
                     topics = consumer.topics()
                     logging.debug("Number of topics found: %s", len(topics))
+
+                    # Send dummy message in kafka topics. If kafka is not ready exception will be thrown.
+                    msg_bus = MessageBusClient(config)
+                    topics = ["alarm_request", "users", "project"]
+                    for topic in topics:
+                        msg_bus.aiowrite(topic, 'echo', 'dummy message')
+
+                    # Kafka is ready now
                     kafka_ready = True
         except Exception as e:
             logging.info("Error when trying to get kafka status.")
index 35364d6..8b16988 100644 (file)
@@ -48,7 +48,12 @@ class Dashboarder:
 
     async def start(self):
         topics = ["users", "project"]
-        await self.msg_bus.aioread(topics, self._user_msg)
+        try:
+            await self.msg_bus.aioread(topics, self._user_msg)
+        except Exception as e:
+            # Failed to subscribe to kafka topics
+            log.error("Error when subscribing to topics %s", str(topics))
+            log.exception("Exception %s", str(e))
 
     async def _user_msg(self, topic, key, values):
         log.debug("Message from kafka bus received: topic: %s and values: %s and key: %s", topic, values, key)
index 94c7479..117c054 100755 (executable)
@@ -52,7 +52,12 @@ class Server:
         topics = [
             "alarm_request"
         ]
-        await self.msg_bus.aioread(topics, self._process_msg)
+        try:
+            await self.msg_bus.aioread(topics, self._process_msg)
+        except Exception as e:
+            # Failed to subscribe to kafka topic
+            log.exception("Error when subscribing to topics %s", str(topics))
+            raise e
 
     async def _process_msg(self, topic, key, values):
         log.info("Message arrived: %s", values)