X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fcore%2Fmessage_bus_client.py;h=8eee83a9006f7c045db2235b8d831fbedb3d2e14;hb=refs%2Fchanges%2F67%2F13367%2F1;hp=6a7ef6053de320e2ccfac8b81794e73e9618d579;hpb=5ac7c081ca13495185ecf6bdf302c16c25a4b759;p=osm%2FMON.git diff --git a/osm_mon/core/message_bus_client.py b/osm_mon/core/message_bus_client.py index 6a7ef60..8eee83a 100644 --- a/osm_mon/core/message_bus_client.py +++ b/osm_mon/core/message_bus_client.py @@ -21,7 +21,6 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: bdiaz@whitestack.com or glavado@whitestack.com ## -import asyncio from typing import List, Callable from osm_common import msgkafka, msglocal @@ -30,17 +29,16 @@ from osm_mon.core.config import Config class MessageBusClient: - def __init__(self, config: Config, loop=None): - if config.get('message', 'driver') == "local": + def __init__(self, config: Config): + if config.get("message", "driver") == "local": self.msg_bus = msglocal.MsgLocal() - elif config.get('message', 'driver') == "kafka": + elif config.get("message", "driver") == "kafka": self.msg_bus = msgkafka.MsgKafka() else: - raise Exception("Unknown message bug driver {}".format(config.get('section', 'driver'))) - self.msg_bus.connect(config.get('message')) - if not loop: - loop = asyncio.get_event_loop() - self.loop = loop + raise Exception( + "Unknown message bug driver {}".format(config.get("section", "driver")) + ) + self.msg_bus.connect(config.get("message")) async def aioread(self, topics: List[str], callback: Callable = None, **kwargs): """ @@ -50,7 +48,7 @@ class MessageBusClient: :param kwargs: Keyword arguments to be passed to callback function. :return: None """ - await self.msg_bus.aioread(topics, self.loop, aiocallback=callback, **kwargs) + await self.msg_bus.aioread(topics, aiocallback=callback, **kwargs) async def aiowrite(self, topic: str, key: str, msg: dict): """ @@ -60,7 +58,7 @@ class MessageBusClient: :param msg: Dictionary containing message to be written. :return: None """ - await self.msg_bus.aiowrite(topic, key, msg, self.loop) + await self.msg_bus.aiowrite(topic, key, msg) async def aioread_once(self, topic: str): """ @@ -68,5 +66,5 @@ class MessageBusClient: :param topic: topic to retrieve message from. :return: tuple(topic, key, message) """ - result = await self.msg_bus.aioread(topic, self.loop) + result = await self.msg_bus.aioread(topic) return result