X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_policy_module%2Fcommon%2Fmessage_bus_client.py;h=1356dc50feb58012c82991238ab66aebd8a6db7d;hb=d37c54c64eec65c9a3c490a31eef3a02a76cb474;hp=4073d0fd025d0fb53b6d70651fbec79057f12c65;hpb=c72b9d5f574d51608e4810294004414c7a9c02fe;p=osm%2FPOL.git diff --git a/osm_policy_module/common/message_bus_client.py b/osm_policy_module/common/message_bus_client.py index 4073d0f..1356dc5 100644 --- a/osm_policy_module/common/message_bus_client.py +++ b/osm_policy_module/common/message_bus_client.py @@ -21,7 +21,6 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: bdiaz@whitestack.com or glavado@whitestack.com ## -import asyncio from typing import List, Callable from osm_common import msgkafka, msglocal @@ -30,7 +29,7 @@ from osm_policy_module.core.config import Config class MessageBusClient: - def __init__(self, config: Config, loop=None): + def __init__(self, config: Config): if config.get("message", "driver") == "local": self.msg_bus = msglocal.MsgLocal() elif config.get("message", "driver") == "kafka": @@ -40,9 +39,6 @@ class MessageBusClient: "Unknown message bug driver {}".format(config.get("section", "driver")) ) self.msg_bus.connect(config.get("message")) - if not loop: - loop = asyncio.get_event_loop() - self.loop = loop async def aioread(self, topics: List[str], callback: Callable = None, **kwargs): """ @@ -52,7 +48,7 @@ class MessageBusClient: :param kwargs: Keyword arguments to be passed to callback function. :return: None """ - await self.msg_bus.aioread(topics, self.loop, aiocallback=callback, **kwargs) + await self.msg_bus.aioread(topics, aiocallback=callback, **kwargs) async def aiowrite(self, topic: str, key: str, msg: dict): """ @@ -62,7 +58,7 @@ class MessageBusClient: :param msg: Dictionary containing message to be written. :return: None """ - await self.msg_bus.aiowrite(topic, key, msg, self.loop) + await self.msg_bus.aiowrite(topic, key, msg) async def aioread_once(self, topic: str): """ @@ -70,5 +66,5 @@ class MessageBusClient: :param topic: topic to retrieve message from. :return: tuple(topic, key, message) """ - result = await self.msg_bus.aioread(topic, self.loop) + result = await self.msg_bus.aioread(topic) return result