X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fserver%2Fserver.py;h=c5f5cc2a24bc8dbdcbed2e0377cde81256dd432e;hb=0e7722a03b70c3909faf4c154e11a700dfcfee13;hp=1e3e7f835c77ed23b8572e0ab04a09bf3fce0eb2;hpb=a97bdb3eafa4f3d07d61d32635f7f36f5cc36c58;p=osm%2FMON.git diff --git a/osm_mon/server/server.py b/osm_mon/server/server.py index 1e3e7f8..c5f5cc2 100755 --- a/osm_mon/server/server.py +++ b/osm_mon/server/server.py @@ -26,6 +26,7 @@ MON component in charge of CRUD operations for vim_accounts and alarms. It uses import asyncio import json import logging +import time from osm_mon.core.config import Config from osm_mon.core.message_bus_client import MessageBusClient @@ -48,34 +49,27 @@ class Server: def run(self): self.loop.run_until_complete(self.start()) - async def start(self): + async def start(self, wait_time=5): topics = [ - "vim_account", "alarm_request" ] - await self.msg_bus.aioread(topics, self._process_msg) + while True: + try: + await self.msg_bus.aioread(topics, self._process_msg) + break + except Exception as e: + # Failed to subscribe to kafka topic + log.exception("Error when subscribing to topics %s", str(topics)) + log.exception("Exception %s", str(e)) + # Wait for some time for kaka to stabilize and then reattempt to subscribe again + time.sleep(wait_time) + log.info("Retrying to subscribe the kafka topic(s) %s", str(topics)) async def _process_msg(self, topic, key, values): log.info("Message arrived: %s", values) try: - if topic == "vim_account": - if key == "create" or key == "edit": - if 'config' not in values: - values['config'] = {} - self.service.upsert_vim_account(values['_id'], - values['name'], - values['vim_type'], - values['vim_url'], - values['vim_user'], - values['vim_password'], - values['vim_tenant_name'], - values['schema_version'], - values['config']) - - if key == "delete": - self.service.delete_vim_account(values['_id']) - - elif topic == "alarm_request": + + if topic == "alarm_request": if key == "create_alarm_request": alarm_details = values['alarm_create_request'] cor_id = alarm_details['correlation_id'] @@ -88,9 +82,7 @@ class Server: alarm_details['severity'].lower(), alarm_details['statistic'].lower(), alarm_details['metric_name'], - alarm_details['vdu_name'], - alarm_details['vnf_member_index'], - alarm_details['ns_id'] + alarm_details['tags'] ) response = response_builder.generate_response('create_alarm_response', cor_id=cor_id,