X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fserver%2Fserver.py;h=0011618265aaabe78cb747b28ea6998830ca339b;hb=5ac7c081ca13495185ecf6bdf302c16c25a4b759;hp=0f17d99ccfe43a48dc8a849eb89f559984819213;hpb=b525e6c8619d494d4e254def394cf5b62de4df4a;p=osm%2FMON.git diff --git a/osm_mon/server/server.py b/osm_mon/server/server.py index 0f17d99..0011618 100755 --- a/osm_mon/server/server.py +++ b/osm_mon/server/server.py @@ -24,63 +24,45 @@ import asyncio import json import logging -from json import JSONDecodeError - -import yaml -from aiokafka import AIOKafkaConsumer, AIOKafkaProducer from osm_mon.core.auth import AuthManager from osm_mon.core.common_db import CommonDbClient +from osm_mon.core.config import Config from osm_mon.core.database import DatabaseManager +from osm_mon.core.message_bus_client import MessageBusClient from osm_mon.core.response import ResponseBuilder -from osm_mon.core.settings import Config log = logging.getLogger(__name__) class Server: - def __init__(self, loop=None): - cfg = Config.instance() + def __init__(self, config: Config, loop=None): + self.conf = config if not loop: loop = asyncio.get_event_loop() self.loop = loop - self.auth_manager = AuthManager() - self.database_manager = DatabaseManager() + self.auth_manager = AuthManager(config) + self.database_manager = DatabaseManager(config) self.database_manager.create_tables() - self.common_db = CommonDbClient() - self.kafka_server = cfg.BROKER_URI + self.common_db = CommonDbClient(config) + self.msg_bus = MessageBusClient(config) def run(self): self.loop.run_until_complete(self.start()) async def start(self): - consumer = AIOKafkaConsumer( + topics = [ "vim_account", - "alarm_request", - loop=self.loop, - bootstrap_servers=self.kafka_server, - group_id="mon-server", - key_deserializer=bytes.decode, - value_deserializer=bytes.decode, - ) - await consumer.start() - try: - async for message in consumer: - log.info("Message arrived: %s", message) - await self.consume_message(message) - finally: - await consumer.stop() + "alarm_request" + ] + await self.msg_bus.aioread(topics, self._process_msg) - async def consume_message(self, message): + async def _process_msg(self, topic, key, values): + log.info("Message arrived: %s", values) try: - try: - values = json.loads(message.value) - except JSONDecodeError: - values = yaml.safe_load(message.value) - - if message.topic == "vim_account": - if message.key == "create" or message.key == "edit": + if topic == "vim_account": + if key == "create" or key == "edit": values['vim_password'] = self.common_db.decrypt_vim_password(values['vim_password'], values['schema_version'], values['_id']) @@ -94,11 +76,11 @@ class Server: values['_id']) self.auth_manager.store_auth_credentials(values) - if message.key == "delete": + if key == "delete": self.auth_manager.delete_auth_credentials(values) - elif message.topic == "alarm_request": - if message.key == "create_alarm_request": + elif topic == "alarm_request": + if key == "create_alarm_request": alarm_details = values['alarm_create_request'] cor_id = alarm_details['correlation_id'] response_builder = ResponseBuilder() @@ -126,7 +108,7 @@ class Server: alarm_id=None) await self._publish_response('alarm_response_' + str(cor_id), 'create_alarm_response', response) - if message.key == "delete_alarm_request": + if key == "delete_alarm_request": alarm_details = values['alarm_delete_request'] alarm_uuid = alarm_details['alarm_uuid'] response_builder = ResponseBuilder() @@ -149,17 +131,5 @@ class Server: log.exception("Exception processing message: ") async def _publish_response(self, topic: str, key: str, msg: dict): - producer = AIOKafkaProducer(loop=self.loop, - bootstrap_servers=self.kafka_server, - key_serializer=str.encode, - value_serializer=str.encode) - await producer.start() log.info("Sending response %s to topic %s with key %s", json.dumps(msg), topic, key) - try: - await producer.send_and_wait(topic, key=key, value=json.dumps(msg)) - finally: - await producer.stop() - - -if __name__ == '__main__': - Server().run() + await self.msg_bus.aiowrite(topic, key, msg)