Update to Python 3.10 and Ubuntu 22.04
[osm/MON.git] / osm_mon / core / message_bus_client.py
index 6a7ef60..8eee83a 100644 (file)
@@ -21,7 +21,6 @@
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
-import asyncio
 from typing import List, Callable
 
 from osm_common import msgkafka, msglocal
@@ -30,17 +29,16 @@ from osm_mon.core.config import Config
 
 
 class MessageBusClient:
-    def __init__(self, config: Config, loop=None):
-        if config.get('message', 'driver') == "local":
+    def __init__(self, config: Config):
+        if config.get("message", "driver") == "local":
             self.msg_bus = msglocal.MsgLocal()
-        elif config.get('message', 'driver') == "kafka":
+        elif config.get("message", "driver") == "kafka":
             self.msg_bus = msgkafka.MsgKafka()
         else:
-            raise Exception("Unknown message bug driver {}".format(config.get('section', 'driver')))
-        self.msg_bus.connect(config.get('message'))
-        if not loop:
-            loop = asyncio.get_event_loop()
-        self.loop = loop
+            raise Exception(
+                "Unknown message bug driver {}".format(config.get("section", "driver"))
+            )
+        self.msg_bus.connect(config.get("message"))
 
     async def aioread(self, topics: List[str], callback: Callable = None, **kwargs):
         """
@@ -50,7 +48,7 @@ class MessageBusClient:
         :param kwargs: Keyword arguments to be passed to callback function.
         :return: None
         """
-        await self.msg_bus.aioread(topics, self.loop, aiocallback=callback, **kwargs)
+        await self.msg_bus.aioread(topics, aiocallback=callback, **kwargs)
 
     async def aiowrite(self, topic: str, key: str, msg: dict):
         """
@@ -60,7 +58,7 @@ class MessageBusClient:
         :param msg: Dictionary containing message to be written.
         :return: None
         """
-        await self.msg_bus.aiowrite(topic, key, msg, self.loop)
+        await self.msg_bus.aiowrite(topic, key, msg)
 
     async def aioread_once(self, topic: str):
         """
@@ -68,5 +66,5 @@ class MessageBusClient:
         :param topic: topic to retrieve message from.
         :return: tuple(topic, key, message)
         """
-        result = await self.msg_bus.aioread(topic, self.loop)
+        result = await self.msg_bus.aioread(topic)
         return result