Resolved Bug 1569 - Unable to subscribe to Kafka topics 85/10985/3
authorAtul Agarwal <atul.agarwal@altran.com>
Mon, 14 Jun 2021 10:46:09 +0000 (10:46 +0000)
committerAtul Agarwal <atul.agarwal@altran.com>
Mon, 14 Jun 2021 14:52:20 +0000 (14:52 +0000)
Change-Id: I6386869b855d559273b9e82bafa1c69aca43cbe9
Signed-off-by: Atul Agarwal <atul.agarwal@altran.com>
osm_mon/dashboarder/dashboarder.py
osm_mon/server/server.py

index bed157a..b25a8d9 100644 (file)
@@ -47,14 +47,20 @@ class Dashboarder:
     def run(self):
         self.loop.run_until_complete(self.start())
 
     def run(self):
         self.loop.run_until_complete(self.start())
 
-    async def start(self):
+    async def start(self, wait_time=5):
         topics = ["users", "project"]
         topics = ["users", "project"]
-        try:
-            await self.msg_bus.aioread(topics, self._user_msg)
-        except Exception as e:
-            # Failed to subscribe to kafka topics
-            log.error("Error when subscribing to topics %s", str(topics))
-            log.exception("Exception %s", str(e))
+        while True:
+            try:
+                await self.msg_bus.aioread(topics, self._user_msg)
+                log.info("Sucessfully subscribed to kafka topic(s) %s", str(topics))
+                break
+            except Exception as e:
+                # Failed to subscribe to kafka topics
+                log.error("Error when subscribing to topic(s) %s", str(topics))
+                log.exception("Exception %s", str(e))
+                # Wait for some time for kaka to stabilize and then reattempt to subscribe again
+                time.sleep(wait_time)
+                log.info("Retrying to subscribe the kafka topic(s) %s", str(topics))
 
     async def _user_msg(self, topic, key, values):
         log.debug(
 
     async def _user_msg(self, topic, key, values):
         log.debug(
index 62721ff..962a6f9 100755 (executable)
@@ -26,6 +26,7 @@ MON component in charge of CRUD operations for vim_accounts and alarms. It uses
 import asyncio
 import json
 import logging
 import asyncio
 import json
 import logging
+import time
 
 from osm_mon.core.config import Config
 from osm_mon.core.message_bus_client import MessageBusClient
 
 from osm_mon.core.config import Config
 from osm_mon.core.message_bus_client import MessageBusClient
@@ -48,14 +49,20 @@ class Server:
     def run(self):
         self.loop.run_until_complete(self.start())
 
     def run(self):
         self.loop.run_until_complete(self.start())
 
-    async def start(self):
+    async def start(self, wait_time=5):
         topics = ["alarm_request"]
         topics = ["alarm_request"]
-        try:
-            await self.msg_bus.aioread(topics, self._process_msg)
-        except Exception as e:
-            # Failed to subscribe to kafka topic
-            log.exception("Error when subscribing to topics %s", str(topics))
-            raise e
+        while True:
+            try:
+                await self.msg_bus.aioread(topics, self._process_msg)
+                log.info("Sucessfully subscribed to kafka topic(s) %s", str(topics))
+                break
+            except Exception as e:
+                # Failed to subscribe to kafka topic
+                log.error("Error when subscribing to topic(s) %s", str(topics))
+                log.exception("Exception %s", str(e))
+                # Wait for some time for kaka to stabilize and then reattempt to subscribe again
+                time.sleep(wait_time)
+                log.info("Retrying to subscribe the kafka topic(s) %s", str(topics))
 
     async def _process_msg(self, topic, key, values):
         log.info("Message arrived: %s", values)
 
     async def _process_msg(self, topic, key, values):
         log.info("Message arrived: %s", values)