From 3c4858e0f3e3ef1bcc3ac51fd95fb6f9c0227389 Mon Sep 17 00:00:00 2001 From: Atul Agarwal Date: Tue, 6 Apr 2021 11:47:21 +0000 Subject: [PATCH] Fix for Bug 1433 Exception handling in aioread method Change-Id: I39e2c888798dd518a9ad71e3cbd41be9241dd579 Signed-off-by: palsus (cherry picked from commit a949ff9862c725133c5ea77ad21dd688077ee28c) Signed-off-by: Atul Agarwal --- osm_mon/cmd/mon_collector.py | 2 +- osm_mon/cmd/mon_dashboarder.py | 2 +- osm_mon/cmd/mon_evaluator.py | 2 +- osm_mon/cmd/mon_server.py | 2 +- osm_mon/cmd/mon_utils.py | 14 ++++++++++++-- osm_mon/dashboarder/dashboarder.py | 7 ++++++- osm_mon/server/server.py | 7 ++++++- requirements.txt | 4 ++++ 8 files changed, 32 insertions(+), 8 deletions(-) diff --git a/osm_mon/cmd/mon_collector.py b/osm_mon/cmd/mon_collector.py index d0330d4..8732269 100644 --- a/osm_mon/cmd/mon_collector.py +++ b/osm_mon/cmd/mon_collector.py @@ -54,7 +54,7 @@ def main(): collector.collect_forever() except Exception as e: log.error("Failed to start MON Collector") - log.debug("Exception: %s", str(e)) + log.exception("Exception: %s", str(e)) else: log.error("Failed to start MON Collector") diff --git a/osm_mon/cmd/mon_dashboarder.py b/osm_mon/cmd/mon_dashboarder.py index 97d682e..e7748cd 100644 --- a/osm_mon/cmd/mon_dashboarder.py +++ b/osm_mon/cmd/mon_dashboarder.py @@ -57,7 +57,7 @@ def main(): dashboarder.dashboard_forever() except Exception as e: log.error("Failed to start MON Dashboarder") - log.debug("Exception: %s", str(e)) + log.exception("Exception: %s", str(e)) else: log.error("Failed to start MON Dashboarder") diff --git a/osm_mon/cmd/mon_evaluator.py b/osm_mon/cmd/mon_evaluator.py index cc36d99..ca2df2e 100644 --- a/osm_mon/cmd/mon_evaluator.py +++ b/osm_mon/cmd/mon_evaluator.py @@ -54,7 +54,7 @@ def main(): evaluator.evaluate_forever() except Exception as e: log.error("Failed to start MON Evaluator") - log.debug("Exception: %s", str(e)) + log.exception("Exception: %s", str(e)) else: log.error("Failed to start MON Evaluator") diff --git a/osm_mon/cmd/mon_server.py b/osm_mon/cmd/mon_server.py index 23d7a7c..e5bca31 100644 --- a/osm_mon/cmd/mon_server.py +++ b/osm_mon/cmd/mon_server.py @@ -56,7 +56,7 @@ def main(): server.run() except Exception as e: log.error("Failed to start MON Server") - log.debug("Exception: %s", str(e)) + log.exception("Exception: %s", str(e)) else: log.error("Failed to start MON Server") diff --git a/osm_mon/cmd/mon_utils.py b/osm_mon/cmd/mon_utils.py index 5b62d6c..9640543 100644 --- a/osm_mon/cmd/mon_utils.py +++ b/osm_mon/cmd/mon_utils.py @@ -59,8 +59,18 @@ def wait_till_kafka_is_ready(config, process_name="osm-mon", kafka_wait_time=5): bootstrap_servers=[config.conf.get("message", {}).get("host", "kafka") + ":" + config.conf["message"] .get("port")]) - topics = consumer.topics() - logging.debug("Number of topics found: %s", len(topics)) + all_topics = consumer.topics() + logging.debug("Number of topics found: %s", len(all_topics)) + + # Send dummy message in kafka topics. If kafka is not ready exception will be thrown. + producer = kafka.KafkaProducer(bootstrap_servers=[config.conf.get("message", {}).get("host", + "kafka") + ":" + config.conf["message"] + .get("port")]) + mon_topics = ["alarm_request", "users", "project"] + for mon_topic in mon_topics: + producer.send(mon_topic, key=b"echo", value=b"dummy message") + + # Kafka is ready now kafka_ready = True except Exception as e: logging.info("Error when trying to get kafka status.") diff --git a/osm_mon/dashboarder/dashboarder.py b/osm_mon/dashboarder/dashboarder.py index 35364d6..8b16988 100644 --- a/osm_mon/dashboarder/dashboarder.py +++ b/osm_mon/dashboarder/dashboarder.py @@ -48,7 +48,12 @@ class Dashboarder: async def start(self): topics = ["users", "project"] - await self.msg_bus.aioread(topics, self._user_msg) + try: + await self.msg_bus.aioread(topics, self._user_msg) + except Exception as e: + # Failed to subscribe to kafka topics + log.error("Error when subscribing to topics %s", str(topics)) + log.exception("Exception %s", str(e)) async def _user_msg(self, topic, key, values): log.debug("Message from kafka bus received: topic: %s and values: %s and key: %s", topic, values, key) diff --git a/osm_mon/server/server.py b/osm_mon/server/server.py index 94c7479..117c054 100755 --- a/osm_mon/server/server.py +++ b/osm_mon/server/server.py @@ -52,7 +52,12 @@ class Server: topics = [ "alarm_request" ] - await self.msg_bus.aioread(topics, self._process_msg) + try: + await self.msg_bus.aioread(topics, self._process_msg) + except Exception as e: + # Failed to subscribe to kafka topic + log.exception("Error when subscribing to topics %s", str(topics)) + raise e async def _process_msg(self, topic, key, values): log.info("Message arrived: %s", values) diff --git a/requirements.txt b/requirements.txt index b2e7aa5..c87a5ff 100644 --- a/requirements.txt +++ b/requirements.txt @@ -32,5 +32,9 @@ pyvcloud==23.0.* python-ceilometerclient==2.9.* python-novaclient==12.0.* python-neutronclient==5.1.* +pymongo==3.11.* +pycrypto==2.6.* +juju==2.8.* +kubernetes==12.0.* git+https://osm.etsi.org/gerrit/osm/common.git#egg=osm-common git+https://osm.etsi.org/gerrit/osm/N2VC.git#egg=n2vc -- 2.25.1