X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fserver%2Fserver.py;h=bb8f0e8c37a3fa61406e5f950a39ae73b1df6494;hb=HEAD;hp=ce6255c890e0125407e9470c29c03aae4e1671fa;hpb=8e4179facf22c8096992f0a83caeec9f2f4996c7;p=osm%2FMON.git diff --git a/osm_mon/server/server.py b/osm_mon/server/server.py index ce6255c..bb8f0e8 100755 --- a/osm_mon/server/server.py +++ b/osm_mon/server/server.py @@ -26,6 +26,7 @@ MON component in charge of CRUD operations for vim_accounts and alarms. It uses import asyncio import json import logging +import time from osm_mon.core.config import Config from osm_mon.core.message_bus_client import MessageBusClient @@ -36,30 +37,32 @@ log = logging.getLogger(__name__) class Server: - def __init__(self, config: Config, loop=None): + def __init__(self, config: Config): self.conf = config - if not loop: - loop = asyncio.get_event_loop() - self.loop = loop self.msg_bus = MessageBusClient(config) self.service = ServerService(config) + self.service.populate_prometheus() def run(self): - self.loop.run_until_complete(self.start()) + asyncio.run(self.start()) - async def start(self): + async def start(self, wait_time=5): topics = ["alarm_request"] - try: - await self.msg_bus.aioread(topics, self._process_msg) - except Exception as e: - # Failed to subscribe to kafka topic - log.exception("Error when subscribing to topics %s", str(topics)) - raise e + while True: + try: + await self.msg_bus.aioread(topics, self._process_msg) + break + except Exception as e: + # Failed to subscribe to kafka topic + log.error("Error when subscribing to topic(s) %s", str(topics)) + log.exception("Exception %s", str(e)) + # Wait for some time for kaka to stabilize and then reattempt to subscribe again + time.sleep(wait_time) + log.info("Retrying to subscribe the kafka topic(s) %s", str(topics)) async def _process_msg(self, topic, key, values): log.info("Message arrived: %s", values) try: - if topic == "alarm_request": if key == "create_alarm_request": alarm_details = values["alarm_create_request"] @@ -73,6 +76,7 @@ class Server: alarm_details["severity"].lower(), alarm_details["statistic"].lower(), alarm_details["metric_name"], + alarm_details["action"], alarm_details["tags"], ) response = response_builder.generate_response(