X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fserver%2Fserver.py;h=117c05411c0248519307f3d0eb3714b7632ee061;hb=refs%2Ftags%2Fbranch-bug1511-start;hp=854d9268e521bbdf2556a474a0e422421ceb95a4;hpb=de3d570360a0a31d3054a6ff72b09ae9f559b59c;p=osm%2FMON.git diff --git a/osm_mon/server/server.py b/osm_mon/server/server.py index 854d926..117c054 100755 --- a/osm_mon/server/server.py +++ b/osm_mon/server/server.py @@ -20,94 +20,83 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: bdiaz@whitestack.com or glavado@whitestack.com ## -"""A common KafkaConsumer for all MON plugins.""" - +""" +MON component in charge of CRUD operations for vim_accounts and alarms. It uses the message bus to communicate. +""" +import asyncio import json import logging -from json import JSONDecodeError - -import yaml -from osm_mon.core.auth import AuthManager -from osm_mon.core.common_db import CommonDbClient -from osm_mon.core.database import DatabaseManager -from osm_mon.core.message_bus.consumer import Consumer -from osm_mon.core.message_bus.producer import Producer +from osm_mon.core.config import Config +from osm_mon.core.message_bus_client import MessageBusClient from osm_mon.core.response import ResponseBuilder +from osm_mon.server.service import ServerService log = logging.getLogger(__name__) class Server: - def __init__(self): - self.auth_manager = AuthManager() - self.database_manager = DatabaseManager() - self.database_manager.create_tables() - self.common_db = CommonDbClient() + def __init__(self, config: Config, loop=None): + self.conf = config + if not loop: + loop = asyncio.get_event_loop() + self.loop = loop + self.msg_bus = MessageBusClient(config) + self.service = ServerService(config) def run(self): - common_consumer = Consumer("mon-server") - - topics = ['alarm_request', 'vim_account'] - common_consumer.subscribe(topics) + self.loop.run_until_complete(self.start()) - log.info("Listening for messages...") - for message in common_consumer: - self.consume_message(message) - - def consume_message(self, message): - log.info("Message arrived: %s", message) + async def start(self): + topics = [ + "alarm_request" + ] + try: + await self.msg_bus.aioread(topics, self._process_msg) + except Exception as e: + # Failed to subscribe to kafka topic + log.exception("Error when subscribing to topics %s", str(topics)) + raise e + + async def _process_msg(self, topic, key, values): + log.info("Message arrived: %s", values) try: - try: - values = json.loads(message.value) - except JSONDecodeError: - values = yaml.safe_load(message.value) - - response = None - - if message.topic == "vim_account": - if message.key == "create" or message.key == "edit": - values['vim_password'] = self.common_db.decrypt_vim_password(values['vim_password'], - values['schema_version'], - values['_id']) - self.auth_manager.store_auth_credentials(values) - if message.key == "delete": - self.auth_manager.delete_auth_credentials(values) - - elif message.topic == "alarm_request": - if message.key == "create_alarm_request": + + if topic == "alarm_request": + if key == "create_alarm_request": alarm_details = values['alarm_create_request'] + cor_id = alarm_details['correlation_id'] response_builder = ResponseBuilder() try: - alarm = self.database_manager.save_alarm( + alarm = self.service.create_alarm( alarm_details['alarm_name'], alarm_details['threshold_value'], alarm_details['operation'].lower(), alarm_details['severity'].lower(), alarm_details['statistic'].lower(), alarm_details['metric_name'], - alarm_details['vdu_name'], - alarm_details['vnf_member_index'], - alarm_details['ns_id'] + alarm_details['tags'] ) response = response_builder.generate_response('create_alarm_response', - cor_id=alarm_details['correlation_id'], + cor_id=cor_id, status=True, alarm_id=alarm.uuid) except Exception: log.exception("Error creating alarm: ") response = response_builder.generate_response('create_alarm_response', - cor_id=alarm_details['correlation_id'], + cor_id=cor_id, status=False, alarm_id=None) - if message.key == "delete_alarm_request": + await self._publish_response('alarm_response_' + str(cor_id), 'create_alarm_response', response) + + if key == "delete_alarm_request": alarm_details = values['alarm_delete_request'] alarm_uuid = alarm_details['alarm_uuid'] response_builder = ResponseBuilder() cor_id = alarm_details['correlation_id'] try: - self.database_manager.delete_alarm(alarm_uuid) + self.service.delete_alarm(alarm_uuid) response = response_builder.generate_response('delete_alarm_response', cor_id=cor_id, status=True, @@ -118,20 +107,11 @@ class Server: cor_id=cor_id, status=False, alarm_id=alarm_uuid) - if response: - self._publish_response(message.topic, message.key, response) + await self._publish_response('alarm_response_' + str(cor_id), 'delete_alarm_response', response) except Exception: log.exception("Exception processing message: ") - def _publish_response(self, topic: str, key: str, msg: dict): - topic = topic.replace('request', 'response') - key = key.replace('request', 'response') - producer = Producer() - producer.send(topic=topic, key=key, value=json.dumps(msg)) - producer.flush(timeout=5) - producer.close() - - -if __name__ == '__main__': - Server().run() + async def _publish_response(self, topic: str, key: str, msg: dict): + log.info("Sending response %s to topic %s with key %s", json.dumps(msg), topic, key) + await self.msg_bus.aiowrite(topic, key, msg)