X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_common%2Fmsgkafka.py;h=02b8241e2f9d4ed1049a0937f5fd60585b8228dc;hb=37e0914881759a514424ad5c0cc9278b9ced27a2;hp=5caa5b1becb86a78142a4ac50d53596aa64e630e;hpb=2644b76248a1b96f7a47013b414e31b4e3feecf8;p=osm%2Fcommon.git diff --git a/osm_common/msgkafka.py b/osm_common/msgkafka.py index 5caa5b1..02b8241 100644 --- a/osm_common/msgkafka.py +++ b/osm_common/msgkafka.py @@ -13,13 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging import asyncio -import yaml +import logging + from aiokafka import AIOKafkaConsumer from aiokafka import AIOKafkaProducer from aiokafka.errors import KafkaError from osm_common.msgbase import MsgBase, MsgException +import yaml __author__ = ( "Alfonso Tierno , " @@ -34,7 +35,6 @@ class MsgKafka(MsgBase): self.port = None self.consumer = None self.producer = None - self.loop = None self.broker = None self.group_id = None @@ -44,7 +44,6 @@ class MsgKafka(MsgBase): self.logger = logging.getLogger(config["logger_name"]) self.host = config["host"] self.port = config["port"] - self.loop = config.get("loop") or asyncio.get_event_loop() self.broker = str(self.host) + ":" + str(self.port) self.group_id = config.get("group_id") @@ -54,7 +53,6 @@ class MsgKafka(MsgBase): def disconnect(self): try: pass - # self.loop.close() except Exception as e: # TODO refine raise MsgException(str(e)) @@ -69,9 +67,7 @@ class MsgKafka(MsgBase): retry = 2 # Try two times while retry: try: - self.loop.run_until_complete( - self.aiowrite(topic=topic, key=key, msg=msg) - ) + asyncio.run(self.aiowrite(topic=topic, key=key, msg=msg)) break except Exception as e: retry -= 1 @@ -87,27 +83,22 @@ class MsgKafka(MsgBase): :return: topic, key, message; or None """ try: - return self.loop.run_until_complete(self.aioread(topic, self.loop)) + return asyncio.run(self.aioread(topic)) except MsgException: raise except Exception as e: raise MsgException("Error reading {} topic: {}".format(topic, str(e))) - async def aiowrite(self, topic, key, msg, loop=None): + async def aiowrite(self, topic, key, msg): """ Asyncio write :param topic: str kafka topic :param key: str kafka key :param msg: str or dictionary kafka message - :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect :return: None """ - - if not loop: - loop = self.loop try: self.producer = AIOKafkaProducer( - loop=loop, key_serializer=str.encode, value_serializer=str.encode, bootstrap_servers=self.broker, @@ -126,7 +117,6 @@ class MsgKafka(MsgBase): async def aioread( self, topic, - loop=None, callback=None, aiocallback=None, group_id=None, @@ -136,7 +126,6 @@ class MsgKafka(MsgBase): """ Asyncio read from one or several topics. :param topic: can be str: single topic; or str list: several topics - :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect :param callback: synchronous callback function that will handle the message in kafka bus :param aiocallback: async callback function that will handle the message in kafka bus :param group_id: kafka group_id to use. Can be False (set group_id to None), None (use general group_id provided @@ -147,9 +136,6 @@ class MsgKafka(MsgBase): :param kwargs: optional keyword arguments for callback function :return: If no callback defined, it returns (topic, key, message) """ - - if not loop: - loop = self.loop if group_id is False: group_id = None elif group_id is None: @@ -160,7 +146,6 @@ class MsgKafka(MsgBase): else: topic_list = (topic,) self.consumer = AIOKafkaConsumer( - loop=loop, bootstrap_servers=self.broker, group_id=group_id, auto_offset_reset="earliest" if from_beginning else "latest",