Fix for Bug 1433 Exception handling in aioread method 47/10647/7 branch-bug1511-start v9.1.1 v9.1.1rc1
authorAtul Agarwal <atul.agarwal@altran.com>
Tue, 6 Apr 2021 11:47:21 +0000 (11:47 +0000)
committerAtul Agarwal <Atul.Agarwal@altran.com>
Mon, 19 Apr 2021 06:49:58 +0000 (06:49 +0000)
Change-Id: I39e2c888798dd518a9ad71e3cbd41be9241dd579
Signed-off-by: palsus <subhankar.pal@aricent.com>
(cherry picked from commit a949ff9862c725133c5ea77ad21dd688077ee28c)
Signed-off-by: Atul Agarwal <Atul.Agarwal@altran.com>
osm_mon/cmd/mon_collector.py
osm_mon/cmd/mon_dashboarder.py
osm_mon/cmd/mon_evaluator.py
osm_mon/cmd/mon_server.py
osm_mon/cmd/mon_utils.py
osm_mon/dashboarder/dashboarder.py
osm_mon/server/server.py
requirements.txt

index d0330d4..8732269 100644 (file)
@@ -54,7 +54,7 @@ def main():
             collector.collect_forever()
         except Exception as e:
             log.error("Failed to start MON Collector")
-            log.debug("Exception: %s", str(e))
+            log.exception("Exception: %s", str(e))
     else:
         log.error("Failed to start MON Collector")
 
index 97d682e..e7748cd 100644 (file)
@@ -57,7 +57,7 @@ def main():
             dashboarder.dashboard_forever()
         except Exception as e:
             log.error("Failed to start MON Dashboarder")
-            log.debug("Exception: %s", str(e))
+            log.exception("Exception: %s", str(e))
     else:
         log.error("Failed to start MON Dashboarder")
 
index cc36d99..ca2df2e 100644 (file)
@@ -54,7 +54,7 @@ def main():
             evaluator.evaluate_forever()
         except Exception as e:
             log.error("Failed to start MON Evaluator")
-            log.debug("Exception: %s", str(e))
+            log.exception("Exception: %s", str(e))
     else:
         log.error("Failed to start MON Evaluator")
 
index 23d7a7c..e5bca31 100644 (file)
@@ -56,7 +56,7 @@ def main():
             server.run()
         except Exception as e:
             log.error("Failed to start MON Server")
-            log.debug("Exception: %s", str(e))
+            log.exception("Exception: %s", str(e))
     else:
         log.error("Failed to start MON Server")
 
index 5b62d6c..9640543 100644 (file)
@@ -59,8 +59,18 @@ def wait_till_kafka_is_ready(config, process_name="osm-mon", kafka_wait_time=5):
                                                    bootstrap_servers=[config.conf.get("message", {}).get("host",
                                                                       "kafka") + ":" + config.conf["message"]
                                                                       .get("port")])
-                    topics = consumer.topics()
-                    logging.debug("Number of topics found: %s", len(topics))
+                    all_topics = consumer.topics()
+                    logging.debug("Number of topics found: %s", len(all_topics))
+
+                    # Send dummy message in kafka topics. If kafka is not ready exception will be thrown.
+                    producer = kafka.KafkaProducer(bootstrap_servers=[config.conf.get("message", {}).get("host",
+                                                                      "kafka") + ":" + config.conf["message"]
+                                                                      .get("port")])
+                    mon_topics = ["alarm_request", "users", "project"]
+                    for mon_topic in mon_topics:
+                        producer.send(mon_topic, key=b"echo", value=b"dummy message")
+
+                    # Kafka is ready now
                     kafka_ready = True
         except Exception as e:
             logging.info("Error when trying to get kafka status.")
index 35364d6..8b16988 100644 (file)
@@ -48,7 +48,12 @@ class Dashboarder:
 
     async def start(self):
         topics = ["users", "project"]
-        await self.msg_bus.aioread(topics, self._user_msg)
+        try:
+            await self.msg_bus.aioread(topics, self._user_msg)
+        except Exception as e:
+            # Failed to subscribe to kafka topics
+            log.error("Error when subscribing to topics %s", str(topics))
+            log.exception("Exception %s", str(e))
 
     async def _user_msg(self, topic, key, values):
         log.debug("Message from kafka bus received: topic: %s and values: %s and key: %s", topic, values, key)
index 94c7479..117c054 100755 (executable)
@@ -52,7 +52,12 @@ class Server:
         topics = [
             "alarm_request"
         ]
-        await self.msg_bus.aioread(topics, self._process_msg)
+        try:
+            await self.msg_bus.aioread(topics, self._process_msg)
+        except Exception as e:
+            # Failed to subscribe to kafka topic
+            log.exception("Error when subscribing to topics %s", str(topics))
+            raise e
 
     async def _process_msg(self, topic, key, values):
         log.info("Message arrived: %s", values)
index b2e7aa5..c87a5ff 100644 (file)
@@ -32,5 +32,9 @@ pyvcloud==23.0.*
 python-ceilometerclient==2.9.*
 python-novaclient==12.0.*
 python-neutronclient==5.1.*
+pymongo==3.11.*
+pycrypto==2.6.*
+juju==2.8.*
+kubernetes==12.0.*
 git+https://osm.etsi.org/gerrit/osm/common.git#egg=osm-common
 git+https://osm.etsi.org/gerrit/osm/N2VC.git#egg=n2vc