# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
-import asyncio
from typing import List, Callable
from osm_common import msgkafka, msglocal
class MessageBusClient:
- def __init__(self, config: Config, loop=None):
- if config.get('message', 'driver') == "local":
+ def __init__(self, config: Config):
+ if config.get("message", "driver") == "local":
self.msg_bus = msglocal.MsgLocal()
- elif config.get('message', 'driver') == "kafka":
+ elif config.get("message", "driver") == "kafka":
self.msg_bus = msgkafka.MsgKafka()
else:
- raise Exception("Unknown message bug driver {}".format(config.get('section', 'driver')))
- self.msg_bus.connect(config.get('message'))
- if not loop:
- loop = asyncio.get_event_loop()
- self.loop = loop
+ raise Exception(
+ "Unknown message bug driver {}".format(config.get("section", "driver"))
+ )
+ self.msg_bus.connect(config.get("message"))
async def aioread(self, topics: List[str], callback: Callable = None, **kwargs):
"""
:param kwargs: Keyword arguments to be passed to callback function.
:return: None
"""
- await self.msg_bus.aioread(topics, self.loop, aiocallback=callback, **kwargs)
+ await self.msg_bus.aioread(topics, aiocallback=callback, **kwargs)
async def aiowrite(self, topic: str, key: str, msg: dict):
"""
:param msg: Dictionary containing message to be written.
:return: None
"""
- await self.msg_bus.aiowrite(topic, key, msg, self.loop)
+ await self.msg_bus.aiowrite(topic, key, msg)
async def aioread_once(self, topic: str):
"""
:param topic: topic to retrieve message from.
:return: tuple(topic, key, message)
"""
- result = await self.msg_bus.aioread(topic, self.loop)
+ result = await self.msg_bus.aioread(topic)
return result