Ubuntu 22.04 and Python 3.10 preparation
[osm/common.git] / osm_common / msgkafka.py
index 5487093..02b8241 100644 (file)
@@ -35,7 +35,6 @@ class MsgKafka(MsgBase):
         self.port = None
         self.consumer = None
         self.producer = None
-        self.loop = None
         self.broker = None
         self.group_id = None
 
@@ -45,7 +44,6 @@ class MsgKafka(MsgBase):
                 self.logger = logging.getLogger(config["logger_name"])
             self.host = config["host"]
             self.port = config["port"]
-            self.loop = config.get("loop") or asyncio.get_event_loop()
             self.broker = str(self.host) + ":" + str(self.port)
             self.group_id = config.get("group_id")
 
@@ -55,7 +53,6 @@ class MsgKafka(MsgBase):
     def disconnect(self):
         try:
             pass
-            # self.loop.close()
         except Exception as e:  # TODO refine
             raise MsgException(str(e))
 
@@ -70,9 +67,7 @@ class MsgKafka(MsgBase):
         retry = 2  # Try two times
         while retry:
             try:
-                self.loop.run_until_complete(
-                    self.aiowrite(topic=topic, key=key, msg=msg)
-                )
+                asyncio.run(self.aiowrite(topic=topic, key=key, msg=msg))
                 break
             except Exception as e:
                 retry -= 1
@@ -88,27 +83,22 @@ class MsgKafka(MsgBase):
         :return: topic, key, message; or None
         """
         try:
-            return self.loop.run_until_complete(self.aioread(topic, self.loop))
+            return asyncio.run(self.aioread(topic))
         except MsgException:
             raise
         except Exception as e:
             raise MsgException("Error reading {} topic: {}".format(topic, str(e)))
 
-    async def aiowrite(self, topic, key, msg, loop=None):
+    async def aiowrite(self, topic, key, msg):
         """
         Asyncio write
         :param topic: str kafka topic
         :param key: str kafka key
         :param msg: str or dictionary  kafka message
-        :param loop: asyncio loop. To be DEPRECATED! in near future!!!  loop must be provided inside config at connect
         :return: None
         """
-
-        if not loop:
-            loop = self.loop
         try:
             self.producer = AIOKafkaProducer(
-                loop=loop,
                 key_serializer=str.encode,
                 value_serializer=str.encode,
                 bootstrap_servers=self.broker,
@@ -127,7 +117,6 @@ class MsgKafka(MsgBase):
     async def aioread(
         self,
         topic,
-        loop=None,
         callback=None,
         aiocallback=None,
         group_id=None,
@@ -137,7 +126,6 @@ class MsgKafka(MsgBase):
         """
         Asyncio read from one or several topics.
         :param topic: can be str: single topic; or str list: several topics
-        :param loop: asyncio loop. To be DEPRECATED! in near future!!!  loop must be provided inside config at connect
         :param callback: synchronous callback function that will handle the message in kafka bus
         :param aiocallback: async callback function that will handle the message in kafka bus
         :param group_id: kafka group_id to use. Can be False (set group_id to None), None (use general group_id provided
@@ -148,9 +136,6 @@ class MsgKafka(MsgBase):
         :param kwargs: optional keyword arguments for callback function
         :return: If no callback defined, it returns (topic, key, message)
         """
-
-        if not loop:
-            loop = self.loop
         if group_id is False:
             group_id = None
         elif group_id is None:
@@ -161,7 +146,6 @@ class MsgKafka(MsgBase):
             else:
                 topic_list = (topic,)
             self.consumer = AIOKafkaConsumer(
-                loop=loop,
                 bootstrap_servers=self.broker,
                 group_id=group_id,
                 auto_offset_reset="earliest" if from_beginning else "latest",