X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fserver%2Fserver.py;h=0011618265aaabe78cb747b28ea6998830ca339b;hb=refs%2Fchanges%2F22%2F7322%2F3;hp=854d9268e521bbdf2556a474a0e422421ceb95a4;hpb=de3d570360a0a31d3054a6ff72b09ae9f559b59c;p=osm%2FMON.git diff --git a/osm_mon/server/server.py b/osm_mon/server/server.py index 854d926..0011618 100755 --- a/osm_mon/server/server.py +++ b/osm_mon/server/server.py @@ -21,18 +21,15 @@ # contact: bdiaz@whitestack.com or glavado@whitestack.com ## """A common KafkaConsumer for all MON plugins.""" - +import asyncio import json import logging -from json import JSONDecodeError - -import yaml from osm_mon.core.auth import AuthManager from osm_mon.core.common_db import CommonDbClient +from osm_mon.core.config import Config from osm_mon.core.database import DatabaseManager -from osm_mon.core.message_bus.consumer import Consumer -from osm_mon.core.message_bus.producer import Producer +from osm_mon.core.message_bus_client import MessageBusClient from osm_mon.core.response import ResponseBuilder log = logging.getLogger(__name__) @@ -40,44 +37,52 @@ log = logging.getLogger(__name__) class Server: - def __init__(self): - self.auth_manager = AuthManager() - self.database_manager = DatabaseManager() + def __init__(self, config: Config, loop=None): + self.conf = config + if not loop: + loop = asyncio.get_event_loop() + self.loop = loop + self.auth_manager = AuthManager(config) + self.database_manager = DatabaseManager(config) self.database_manager.create_tables() - self.common_db = CommonDbClient() + self.common_db = CommonDbClient(config) + self.msg_bus = MessageBusClient(config) def run(self): - common_consumer = Consumer("mon-server") + self.loop.run_until_complete(self.start()) - topics = ['alarm_request', 'vim_account'] - common_consumer.subscribe(topics) + async def start(self): + topics = [ + "vim_account", + "alarm_request" + ] + await self.msg_bus.aioread(topics, self._process_msg) - log.info("Listening for messages...") - for message in common_consumer: - self.consume_message(message) - - def consume_message(self, message): - log.info("Message arrived: %s", message) + async def _process_msg(self, topic, key, values): + log.info("Message arrived: %s", values) try: - try: - values = json.loads(message.value) - except JSONDecodeError: - values = yaml.safe_load(message.value) - - response = None - - if message.topic == "vim_account": - if message.key == "create" or message.key == "edit": + if topic == "vim_account": + if key == "create" or key == "edit": values['vim_password'] = self.common_db.decrypt_vim_password(values['vim_password'], values['schema_version'], values['_id']) + + vim_config_encrypted = ("admin_password", "nsx_password", "vcenter_password") + if 'config' in values: + for key in values['config']: + if key in vim_config_encrypted: + values['config'][key] = self.common_db.decrypt_vim_password(values['config'][key], + values['schema_version'], + values['_id']) self.auth_manager.store_auth_credentials(values) - if message.key == "delete": + + if key == "delete": self.auth_manager.delete_auth_credentials(values) - elif message.topic == "alarm_request": - if message.key == "create_alarm_request": + elif topic == "alarm_request": + if key == "create_alarm_request": alarm_details = values['alarm_create_request'] + cor_id = alarm_details['correlation_id'] response_builder = ResponseBuilder() try: alarm = self.database_manager.save_alarm( @@ -92,16 +97,18 @@ class Server: alarm_details['ns_id'] ) response = response_builder.generate_response('create_alarm_response', - cor_id=alarm_details['correlation_id'], + cor_id=cor_id, status=True, alarm_id=alarm.uuid) except Exception: log.exception("Error creating alarm: ") response = response_builder.generate_response('create_alarm_response', - cor_id=alarm_details['correlation_id'], + cor_id=cor_id, status=False, alarm_id=None) - if message.key == "delete_alarm_request": + await self._publish_response('alarm_response_' + str(cor_id), 'create_alarm_response', response) + + if key == "delete_alarm_request": alarm_details = values['alarm_delete_request'] alarm_uuid = alarm_details['alarm_uuid'] response_builder = ResponseBuilder() @@ -118,20 +125,11 @@ class Server: cor_id=cor_id, status=False, alarm_id=alarm_uuid) - if response: - self._publish_response(message.topic, message.key, response) + await self._publish_response('alarm_response_' + str(cor_id), 'delete_alarm_response', response) except Exception: log.exception("Exception processing message: ") - def _publish_response(self, topic: str, key: str, msg: dict): - topic = topic.replace('request', 'response') - key = key.replace('request', 'response') - producer = Producer() - producer.send(topic=topic, key=key, value=json.dumps(msg)) - producer.flush(timeout=5) - producer.close() - - -if __name__ == '__main__': - Server().run() + async def _publish_response(self, topic: str, key: str, msg: dict): + log.info("Sending response %s to topic %s with key %s", json.dumps(msg), topic, key) + await self.msg_bus.aiowrite(topic, key, msg)