X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fserver%2Fserver.py;h=bb8f0e8c37a3fa61406e5f950a39ae73b1df6494;hb=HEAD;hp=a1e848bcdf346376fa8f6cbd49ce2dc3a720044c;hpb=274a6e9fa4268db2a87525a27bc574c46eaa80dc;p=osm%2FMON.git diff --git a/osm_mon/server/server.py b/osm_mon/server/server.py index a1e848b..bb8f0e8 100755 --- a/osm_mon/server/server.py +++ b/osm_mon/server/server.py @@ -20,137 +20,117 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: bdiaz@whitestack.com or glavado@whitestack.com ## -"""A common KafkaConsumer for all MON plugins.""" +""" +MON component in charge of CRUD operations for vim_accounts and alarms. It uses the message bus to communicate. +""" import asyncio import json import logging -from json import JSONDecodeError +import time -import yaml -from aiokafka import AIOKafkaConsumer, AIOKafkaProducer - -from osm_mon.core.auth import AuthManager -from osm_mon.core.common_db import CommonDbClient -from osm_mon.core.database import DatabaseManager +from osm_mon.core.config import Config +from osm_mon.core.message_bus_client import MessageBusClient from osm_mon.core.response import ResponseBuilder -from osm_mon.core.settings import Config +from osm_mon.server.service import ServerService log = logging.getLogger(__name__) class Server: - - def __init__(self, loop=None): - cfg = Config.instance() - if not loop: - loop = asyncio.get_event_loop() - self.loop = loop - self.auth_manager = AuthManager() - self.database_manager = DatabaseManager() - self.database_manager.create_tables() - self.common_db = CommonDbClient() - self.kafka_server = cfg.BROKER_URI + def __init__(self, config: Config): + self.conf = config + self.msg_bus = MessageBusClient(config) + self.service = ServerService(config) + self.service.populate_prometheus() def run(self): - self.loop.run_until_complete(self.start()) - - async def start(self): - consumer = AIOKafkaConsumer( - "vim_account", - "alarm_request", - loop=self.loop, - bootstrap_servers=self.kafka_server, - group_id="mon-server", - key_deserializer=bytes.decode, - value_deserializer=bytes.decode, - ) - await consumer.start() - try: - async for message in consumer: - log.info("Message arrived: %s", message) - await self.consume_message(message) - finally: - await consumer.stop() + asyncio.run(self.start()) - async def consume_message(self, message): - try: + async def start(self, wait_time=5): + topics = ["alarm_request"] + while True: try: - values = json.loads(message.value) - except JSONDecodeError: - values = yaml.safe_load(message.value) - - if message.topic == "vim_account": - if message.key == "create" or message.key == "edit": - values['vim_password'] = self.common_db.decrypt_vim_password(values['vim_password'], - values['schema_version'], - values['_id']) - self.auth_manager.store_auth_credentials(values) - if message.key == "delete": - self.auth_manager.delete_auth_credentials(values) - - elif message.topic == "alarm_request": - if message.key == "create_alarm_request": - alarm_details = values['alarm_create_request'] - cor_id = alarm_details['correlation_id'] + await self.msg_bus.aioread(topics, self._process_msg) + break + except Exception as e: + # Failed to subscribe to kafka topic + log.error("Error when subscribing to topic(s) %s", str(topics)) + log.exception("Exception %s", str(e)) + # Wait for some time for kaka to stabilize and then reattempt to subscribe again + time.sleep(wait_time) + log.info("Retrying to subscribe the kafka topic(s) %s", str(topics)) + + async def _process_msg(self, topic, key, values): + log.info("Message arrived: %s", values) + try: + if topic == "alarm_request": + if key == "create_alarm_request": + alarm_details = values["alarm_create_request"] + cor_id = alarm_details["correlation_id"] response_builder = ResponseBuilder() try: - alarm = self.database_manager.save_alarm( - alarm_details['alarm_name'], - alarm_details['threshold_value'], - alarm_details['operation'].lower(), - alarm_details['severity'].lower(), - alarm_details['statistic'].lower(), - alarm_details['metric_name'], - alarm_details['vdu_name'], - alarm_details['vnf_member_index'], - alarm_details['ns_id'] + alarm = self.service.create_alarm( + alarm_details["alarm_name"], + alarm_details["threshold_value"], + alarm_details["operation"].lower(), + alarm_details["severity"].lower(), + alarm_details["statistic"].lower(), + alarm_details["metric_name"], + alarm_details["action"], + alarm_details["tags"], + ) + response = response_builder.generate_response( + "create_alarm_response", + cor_id=cor_id, + status=True, + alarm_id=alarm.uuid, ) - response = response_builder.generate_response('create_alarm_response', - cor_id=cor_id, - status=True, - alarm_id=alarm.uuid) except Exception: log.exception("Error creating alarm: ") - response = response_builder.generate_response('create_alarm_response', - cor_id=cor_id, - status=False, - alarm_id=None) - await self._publish_response('alarm_response_' + str(cor_id), 'create_alarm_response', response) - - if message.key == "delete_alarm_request": - alarm_details = values['alarm_delete_request'] - alarm_uuid = alarm_details['alarm_uuid'] + response = response_builder.generate_response( + "create_alarm_response", + cor_id=cor_id, + status=False, + alarm_id=None, + ) + await self._publish_response( + "alarm_response_" + str(cor_id), + "create_alarm_response", + response, + ) + + if key == "delete_alarm_request": + alarm_details = values["alarm_delete_request"] + alarm_uuid = alarm_details["alarm_uuid"] response_builder = ResponseBuilder() - cor_id = alarm_details['correlation_id'] + cor_id = alarm_details["correlation_id"] try: - self.database_manager.delete_alarm(alarm_uuid) - response = response_builder.generate_response('delete_alarm_response', - cor_id=cor_id, - status=True, - alarm_id=alarm_uuid) + self.service.delete_alarm(alarm_uuid) + response = response_builder.generate_response( + "delete_alarm_response", + cor_id=cor_id, + status=True, + alarm_id=alarm_uuid, + ) except Exception: log.exception("Error deleting alarm: ") - response = response_builder.generate_response('delete_alarm_response', - cor_id=cor_id, - status=False, - alarm_id=alarm_uuid) - await self._publish_response('alarm_response_' + str(cor_id), 'delete_alarm_response', response) + response = response_builder.generate_response( + "delete_alarm_response", + cor_id=cor_id, + status=False, + alarm_id=alarm_uuid, + ) + await self._publish_response( + "alarm_response_" + str(cor_id), + "delete_alarm_response", + response, + ) except Exception: log.exception("Exception processing message: ") async def _publish_response(self, topic: str, key: str, msg: dict): - producer = AIOKafkaProducer(loop=self.loop, - bootstrap_servers=self.kafka_server, - key_serializer=str.encode, - value_serializer=str.encode) - await producer.start() - log.info("Sending response %s to topic %s with key %s", json.dumps(msg), topic, key) - try: - await producer.send_and_wait(topic, key=key, value=json.dumps(msg)) - finally: - await producer.stop() - - -if __name__ == '__main__': - Server().run() + log.info( + "Sending response %s to topic %s with key %s", json.dumps(msg), topic, key + ) + await self.msg_bus.aiowrite(topic, key, msg)