Fix for Bug 1433: Exception handling in aioread method

Change-Id: I39e2c888798dd518a9ad71e3cbd41be9241dd579
Signed-off-by: palsus <subhankar.pal@aricent.com>
(cherry picked from commit a949ff9862c725133c5ea77ad21dd688077ee28c)
Signed-off-by: Atul Agarwal <Atul.Agarwal@altran.com>
diff --git a/osm_mon/cmd/mon_collector.py b/osm_mon/cmd/mon_collector.py
index d0330d4..8732269 100644
--- a/osm_mon/cmd/mon_collector.py
+++ b/osm_mon/cmd/mon_collector.py
@@ -54,7 +54,7 @@
             collector.collect_forever()
         except Exception as e:
             log.error("Failed to start MON Collector")
-            log.debug("Exception: %s", str(e))
+            log.exception("Exception: %s", str(e))
     else:
         log.error("Failed to start MON Collector")
 
diff --git a/osm_mon/cmd/mon_dashboarder.py b/osm_mon/cmd/mon_dashboarder.py
index 97d682e..e7748cd 100644
--- a/osm_mon/cmd/mon_dashboarder.py
+++ b/osm_mon/cmd/mon_dashboarder.py
@@ -57,7 +57,7 @@
             dashboarder.dashboard_forever()
         except Exception as e:
             log.error("Failed to start MON Dashboarder")
-            log.debug("Exception: %s", str(e))
+            log.exception("Exception: %s", str(e))
     else:
         log.error("Failed to start MON Dashboarder")
 
diff --git a/osm_mon/cmd/mon_evaluator.py b/osm_mon/cmd/mon_evaluator.py
index cc36d99..ca2df2e 100644
--- a/osm_mon/cmd/mon_evaluator.py
+++ b/osm_mon/cmd/mon_evaluator.py
@@ -54,7 +54,7 @@
             evaluator.evaluate_forever()
         except Exception as e:
             log.error("Failed to start MON Evaluator")
-            log.debug("Exception: %s", str(e))
+            log.exception("Exception: %s", str(e))
     else:
         log.error("Failed to start MON Evaluator")
 
diff --git a/osm_mon/cmd/mon_server.py b/osm_mon/cmd/mon_server.py
index 23d7a7c..e5bca31 100644
--- a/osm_mon/cmd/mon_server.py
+++ b/osm_mon/cmd/mon_server.py
@@ -56,7 +56,7 @@
             server.run()
         except Exception as e:
             log.error("Failed to start MON Server")
-            log.debug("Exception: %s", str(e))
+            log.exception("Exception: %s", str(e))
     else:
         log.error("Failed to start MON Server")
 
diff --git a/osm_mon/cmd/mon_utils.py b/osm_mon/cmd/mon_utils.py
index 5b62d6c..9640543 100644
--- a/osm_mon/cmd/mon_utils.py
+++ b/osm_mon/cmd/mon_utils.py
@@ -59,8 +59,18 @@
                                                    bootstrap_servers=[config.conf.get("message", {}).get("host",
                                                                       "kafka") + ":" + config.conf["message"]
                                                                       .get("port")])
-                    topics = consumer.topics()
-                    logging.debug("Number of topics found: %s", len(topics))
+                    all_topics = consumer.topics()
+                    logging.debug("Number of topics found: %s", len(all_topics))
+
+                    # Send a dummy message to each Kafka topic. If Kafka is not ready, an exception will be thrown.
+                    producer = kafka.KafkaProducer(bootstrap_servers=[config.conf.get("message", {}).get("host",
+                                                                      "kafka") + ":" + config.conf["message"]
+                                                                      .get("port")])
+                    mon_topics = ["alarm_request", "users", "project"]
+                    for mon_topic in mon_topics:
+                        producer.send(mon_topic, key=b"echo", value=b"dummy message")
+
+                    # Kafka is ready now
                     kafka_ready = True
         except Exception as e:
             logging.info("Error when trying to get kafka status.")
diff --git a/osm_mon/dashboarder/dashboarder.py b/osm_mon/dashboarder/dashboarder.py
index 35364d6..8b16988 100644
--- a/osm_mon/dashboarder/dashboarder.py
+++ b/osm_mon/dashboarder/dashboarder.py
@@ -48,7 +48,12 @@
 
     async def start(self):
         topics = ["users", "project"]
-        await self.msg_bus.aioread(topics, self._user_msg)
+        try:
+            await self.msg_bus.aioread(topics, self._user_msg)
+        except Exception as e:
+            # Failed to subscribe to kafka topics
+            log.error("Error when subscribing to topics %s", str(topics))
+            log.exception("Exception %s", str(e))
 
     async def _user_msg(self, topic, key, values):
         log.debug("Message from kafka bus received: topic: %s and values: %s and key: %s", topic, values, key)
diff --git a/osm_mon/server/server.py b/osm_mon/server/server.py
index 94c7479..117c054 100755
--- a/osm_mon/server/server.py
+++ b/osm_mon/server/server.py
@@ -52,7 +52,12 @@
         topics = [
             "alarm_request"
         ]
-        await self.msg_bus.aioread(topics, self._process_msg)
+        try:
+            await self.msg_bus.aioread(topics, self._process_msg)
+        except Exception as e:
+            # Failed to subscribe to kafka topic
+            log.exception("Error when subscribing to topics %s", str(topics))
+            raise e
 
     async def _process_msg(self, topic, key, values):
         log.info("Message arrived: %s", values)