X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fserver%2Fserver.py;h=ce6255c890e0125407e9470c29c03aae4e1671fa;hb=refs%2Fchanges%2F03%2F10803%2F1;hp=1e3e7f835c77ed23b8572e0ab04a09bf3fce0eb2;hpb=a97bdb3eafa4f3d07d61d32635f7f36f5cc36c58;p=osm%2FMON.git diff --git a/osm_mon/server/server.py b/osm_mon/server/server.py index 1e3e7f8..ce6255c 100755 --- a/osm_mon/server/server.py +++ b/osm_mon/server/server.py @@ -36,7 +36,6 @@ log = logging.getLogger(__name__) class Server: - def __init__(self, config: Config, loop=None): self.conf = config if not loop: @@ -49,83 +48,85 @@ class Server: self.loop.run_until_complete(self.start()) async def start(self): - topics = [ - "vim_account", - "alarm_request" - ] - await self.msg_bus.aioread(topics, self._process_msg) + topics = ["alarm_request"] + try: + await self.msg_bus.aioread(topics, self._process_msg) + except Exception as e: + # Failed to subscribe to kafka topic + log.exception("Error when subscribing to topics %s", str(topics)) + raise e async def _process_msg(self, topic, key, values): log.info("Message arrived: %s", values) try: - if topic == "vim_account": - if key == "create" or key == "edit": - if 'config' not in values: - values['config'] = {} - self.service.upsert_vim_account(values['_id'], - values['name'], - values['vim_type'], - values['vim_url'], - values['vim_user'], - values['vim_password'], - values['vim_tenant_name'], - values['schema_version'], - values['config']) - - if key == "delete": - self.service.delete_vim_account(values['_id']) - - elif topic == "alarm_request": + + if topic == "alarm_request": if key == "create_alarm_request": - alarm_details = values['alarm_create_request'] - cor_id = alarm_details['correlation_id'] + alarm_details = values["alarm_create_request"] + cor_id = alarm_details["correlation_id"] response_builder = ResponseBuilder() try: alarm = self.service.create_alarm( - alarm_details['alarm_name'], - alarm_details['threshold_value'], - alarm_details['operation'].lower(), - alarm_details['severity'].lower(), - alarm_details['statistic'].lower(), - alarm_details['metric_name'], - alarm_details['vdu_name'], - alarm_details['vnf_member_index'], - alarm_details['ns_id'] + alarm_details["alarm_name"], + alarm_details["threshold_value"], + alarm_details["operation"].lower(), + alarm_details["severity"].lower(), + alarm_details["statistic"].lower(), + alarm_details["metric_name"], + alarm_details["tags"], + ) + response = response_builder.generate_response( + "create_alarm_response", + cor_id=cor_id, + status=True, + alarm_id=alarm.uuid, ) - response = response_builder.generate_response('create_alarm_response', - cor_id=cor_id, - status=True, - alarm_id=alarm.uuid) except Exception: log.exception("Error creating alarm: ") - response = response_builder.generate_response('create_alarm_response', - cor_id=cor_id, - status=False, - alarm_id=None) - await self._publish_response('alarm_response_' + str(cor_id), 'create_alarm_response', response) + response = response_builder.generate_response( + "create_alarm_response", + cor_id=cor_id, + status=False, + alarm_id=None, + ) + await self._publish_response( + "alarm_response_" + str(cor_id), + "create_alarm_response", + response, + ) if key == "delete_alarm_request": - alarm_details = values['alarm_delete_request'] - alarm_uuid = alarm_details['alarm_uuid'] + alarm_details = values["alarm_delete_request"] + alarm_uuid = alarm_details["alarm_uuid"] response_builder = ResponseBuilder() - cor_id = alarm_details['correlation_id'] + cor_id = alarm_details["correlation_id"] try: self.service.delete_alarm(alarm_uuid) - response = response_builder.generate_response('delete_alarm_response', - cor_id=cor_id, - status=True, - alarm_id=alarm_uuid) + response = response_builder.generate_response( + "delete_alarm_response", + cor_id=cor_id, + status=True, + alarm_id=alarm_uuid, + ) except Exception: log.exception("Error deleting alarm: ") - response = response_builder.generate_response('delete_alarm_response', - cor_id=cor_id, - status=False, - alarm_id=alarm_uuid) - await self._publish_response('alarm_response_' + str(cor_id), 'delete_alarm_response', response) + response = response_builder.generate_response( + "delete_alarm_response", + cor_id=cor_id, + status=False, + alarm_id=alarm_uuid, + ) + await self._publish_response( + "alarm_response_" + str(cor_id), + "delete_alarm_response", + response, + ) except Exception: log.exception("Exception processing message: ") async def _publish_response(self, topic: str, key: str, msg: dict): - log.info("Sending response %s to topic %s with key %s", json.dumps(msg), topic, key) + log.info( + "Sending response %s to topic %s with key %s", json.dumps(msg), topic, key + ) await self.msg_bus.aiowrite(topic, key, msg)