Bug 2215 fixed
[osm/MON.git] / osm_mon / server / server.py
index 07fdd22..bb8f0e8 100755 (executable)
@@ -26,6 +26,7 @@ MON component in charge of CRUD operations for vim_accounts and alarms. It uses
 import asyncio
 import json
 import logging
+import time
 
 from osm_mon.core.config import Config
 from osm_mon.core.message_bus_client import MessageBusClient
@@ -36,31 +37,32 @@ log = logging.getLogger(__name__)
 
 
 class Server:
-    def __init__(self, config: Config, loop=None):
+    def __init__(self, config: Config):
         self.conf = config
-        if not loop:
-            loop = asyncio.get_event_loop()
-        self.loop = loop
         self.msg_bus = MessageBusClient(config)
         self.service = ServerService(config)
         self.service.populate_prometheus()
 
     def run(self):
-        self.loop.run_until_complete(self.start())
+        asyncio.run(self.start())
 
-    async def start(self):
+    async def start(self, wait_time=5):
         topics = ["alarm_request"]
-        try:
-            await self.msg_bus.aioread(topics, self._process_msg)
-        except Exception as e:
-            # Failed to subscribe to kafka topic
-            log.exception("Error when subscribing to topics %s", str(topics))
-            raise e
+        while True:
+            try:
+                await self.msg_bus.aioread(topics, self._process_msg)
+                break
+            except Exception as e:
+                # Failed to subscribe to kafka topic
+                log.error("Error when subscribing to topic(s) %s", str(topics))
+                log.exception("Exception %s", str(e))
+                # Wait for some time for kaka to stabilize and then reattempt to subscribe again
+                time.sleep(wait_time)
+                log.info("Retrying to subscribe the kafka topic(s) %s", str(topics))
 
     async def _process_msg(self, topic, key, values):
         log.info("Message arrived: %s", values)
         try:
-
             if topic == "alarm_request":
                 if key == "create_alarm_request":
                     alarm_details = values["alarm_create_request"]
@@ -74,6 +76,7 @@ class Server:
                             alarm_details["severity"].lower(),
                             alarm_details["statistic"].lower(),
                             alarm_details["metric_name"],
+                            alarm_details["action"],
                             alarm_details["tags"],
                         )
                         response = response_builder.generate_response(