From: palsus Date: Tue, 6 Apr 2021 11:47:21 +0000 (+0000) Subject: Fix for Bug 1433 Exception handling in aioread method X-Git-Tag: branch-sol006v331-start~1 X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FMON.git;a=commitdiff_plain;h=a949ff9862c725133c5ea77ad21dd688077ee28c Fix for Bug 1433 Exception handling in aioread method Change-Id: I39e2c888798dd518a9ad71e3cbd41be9241dd579 Signed-off-by: palsus --- diff --git a/osm_mon/cmd/mon_collector.py b/osm_mon/cmd/mon_collector.py index d0330d4..8732269 100644 --- a/osm_mon/cmd/mon_collector.py +++ b/osm_mon/cmd/mon_collector.py @@ -54,7 +54,7 @@ def main(): collector.collect_forever() except Exception as e: log.error("Failed to start MON Collector") - log.debug("Exception: %s", str(e)) + log.exception("Exception: %s", str(e)) else: log.error("Failed to start MON Collector") diff --git a/osm_mon/cmd/mon_dashboarder.py b/osm_mon/cmd/mon_dashboarder.py index 97d682e..e7748cd 100644 --- a/osm_mon/cmd/mon_dashboarder.py +++ b/osm_mon/cmd/mon_dashboarder.py @@ -57,7 +57,7 @@ def main(): dashboarder.dashboard_forever() except Exception as e: log.error("Failed to start MON Dashboarder") - log.debug("Exception: %s", str(e)) + log.exception("Exception: %s", str(e)) else: log.error("Failed to start MON Dashboarder") diff --git a/osm_mon/cmd/mon_evaluator.py b/osm_mon/cmd/mon_evaluator.py index cc36d99..ca2df2e 100644 --- a/osm_mon/cmd/mon_evaluator.py +++ b/osm_mon/cmd/mon_evaluator.py @@ -54,7 +54,7 @@ def main(): evaluator.evaluate_forever() except Exception as e: log.error("Failed to start MON Evaluator") - log.debug("Exception: %s", str(e)) + log.exception("Exception: %s", str(e)) else: log.error("Failed to start MON Evaluator") diff --git a/osm_mon/cmd/mon_server.py b/osm_mon/cmd/mon_server.py index 23d7a7c..e5bca31 100644 --- a/osm_mon/cmd/mon_server.py +++ b/osm_mon/cmd/mon_server.py @@ -56,7 +56,7 @@ def main(): server.run() except Exception as e: log.error("Failed to start MON Server") - log.debug("Exception: %s", str(e)) + log.exception("Exception: %s", str(e)) else: log.error("Failed to start MON Server") diff --git a/osm_mon/cmd/mon_utils.py b/osm_mon/cmd/mon_utils.py index 5b62d6c..6d383d0 100644 --- a/osm_mon/cmd/mon_utils.py +++ b/osm_mon/cmd/mon_utils.py @@ -19,7 +19,7 @@ import time import socket import logging import kafka - +from osm_mon.core.message_bus_client import MessageBusClient def wait_till_commondb_is_ready(config, process_name="osm-mon", commondb_wait_time=5): @@ -61,6 +61,14 @@ def wait_till_kafka_is_ready(config, process_name="osm-mon", kafka_wait_time=5): .get("port")]) topics = consumer.topics() logging.debug("Number of topics found: %s", len(topics)) + + # Send dummy message in kafka topics. If kafka is not ready exception will be thrown. + msg_bus = MessageBusClient(config) + topics = ["alarm_request", "users", "project"] + for topic in topics: + msg_bus.aiowrite(topic, 'echo', 'dummy message') + + # Kafka is ready now kafka_ready = True except Exception as e: logging.info("Error when trying to get kafka status.") diff --git a/osm_mon/dashboarder/dashboarder.py b/osm_mon/dashboarder/dashboarder.py index 35364d6..8b16988 100644 --- a/osm_mon/dashboarder/dashboarder.py +++ b/osm_mon/dashboarder/dashboarder.py @@ -48,7 +48,12 @@ class Dashboarder: async def start(self): topics = ["users", "project"] - await self.msg_bus.aioread(topics, self._user_msg) + try: + await self.msg_bus.aioread(topics, self._user_msg) + except Exception as e: + # Failed to subscribe to kafka topics + log.error("Error when subscribing to topics %s", str(topics)) + log.exception("Exception %s", str(e)) async def _user_msg(self, topic, key, values): log.debug("Message from kafka bus received: topic: %s and values: %s and key: %s", topic, values, key) diff --git a/osm_mon/server/server.py b/osm_mon/server/server.py index 94c7479..117c054 100755 --- a/osm_mon/server/server.py +++ b/osm_mon/server/server.py @@ -52,7 +52,12 @@ class Server: topics = [ "alarm_request" ] - await self.msg_bus.aioread(topics, self._process_msg) + try: + await self.msg_bus.aioread(topics, self._process_msg) + except Exception as e: + # Failed to subscribe to kafka topic + log.exception("Error when subscribing to topics %s", str(topics)) + raise e async def _process_msg(self, topic, key, values): log.info("Message arrived: %s", values)