From: Atul Agarwal Date: Tue, 15 Jun 2021 05:54:05 +0000 (+0000) Subject: Resolved Bug 1569 - Unable to subscribe to Kafka topics X-Git-Tag: v9.1.2 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=91eee624eb1ea8ad649f056678d8dac7fcb544c6;p=osm%2FMON.git Resolved Bug 1569 - Unable to subscribe to Kafka topics Change-Id: I9b40c450ee38a5f961ed3d0f5e80f92f2064ff4d Signed-off-by: Atul Agarwal --- diff --git a/osm_mon/dashboarder/dashboarder.py b/osm_mon/dashboarder/dashboarder.py index 3b454b8..5e23af4 100644 --- a/osm_mon/dashboarder/dashboarder.py +++ b/osm_mon/dashboarder/dashboarder.py @@ -47,14 +47,19 @@ class Dashboarder: def run(self): self.loop.run_until_complete(self.start()) - async def start(self): + async def start(self, wait_time=5): topics = ["users", "project"] - try: - await self.msg_bus.aioread(topics, self._user_msg) - except Exception as e: - # Failed to subscribe to kafka topics - log.error("Error when subscribing to topics %s", str(topics)) - log.exception("Exception %s", str(e)) + while True: + try: + await self.msg_bus.aioread(topics, self._user_msg) + break + except Exception as e: + # Failed to subscribe to kafka topics + log.error("Error when subscribing to topics %s", str(topics)) + log.exception("Exception %s", str(e)) + # Wait for some time for kaka to stabilize and then reattempt to subscribe again + time.sleep(wait_time) + log.info("Retrying to subscribe the kafka topic(s) %s", str(topics)) async def _user_msg(self, topic, key, values): log.debug("Message from kafka bus received: topic: %s and values: %s and key: %s", topic, values, key) diff --git a/osm_mon/server/server.py b/osm_mon/server/server.py index 117c054..c5f5cc2 100755 --- a/osm_mon/server/server.py +++ b/osm_mon/server/server.py @@ -26,6 +26,7 @@ MON component in charge of CRUD operations for vim_accounts and alarms. It uses import asyncio import json import logging +import time from osm_mon.core.config import Config from osm_mon.core.message_bus_client import MessageBusClient @@ -48,16 +49,21 @@ class Server: def run(self): self.loop.run_until_complete(self.start()) - async def start(self): + async def start(self, wait_time=5): topics = [ "alarm_request" ] - try: - await self.msg_bus.aioread(topics, self._process_msg) - except Exception as e: - # Failed to subscribe to kafka topic - log.exception("Error when subscribing to topics %s", str(topics)) - raise e + while True: + try: + await self.msg_bus.aioread(topics, self._process_msg) + break + except Exception as e: + # Failed to subscribe to kafka topic + log.exception("Error when subscribing to topics %s", str(topics)) + log.exception("Exception %s", str(e)) + # Wait for some time for kaka to stabilize and then reattempt to subscribe again + time.sleep(wait_time) + log.info("Retrying to subscribe the kafka topic(s) %s", str(topics)) async def _process_msg(self, topic, key, values): log.info("Message arrived: %s", values)