X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_common%2Fmsgkafka.py;h=5caa5b1becb86a78142a4ac50d53596aa64e630e;hb=HEAD;hp=5487093b3478c6afc4507de6060436715a68aadc;hpb=3dd0db6efaab109fa1bd43395fbbddecf1eb73d4;p=osm%2Fcommon.git diff --git a/osm_common/msgkafka.py b/osm_common/msgkafka.py index 5487093..02b8241 100644 --- a/osm_common/msgkafka.py +++ b/osm_common/msgkafka.py @@ -35,7 +35,6 @@ class MsgKafka(MsgBase): self.port = None self.consumer = None self.producer = None - self.loop = None self.broker = None self.group_id = None @@ -45,7 +44,6 @@ class MsgKafka(MsgBase): self.logger = logging.getLogger(config["logger_name"]) self.host = config["host"] self.port = config["port"] - self.loop = config.get("loop") or asyncio.get_event_loop() self.broker = str(self.host) + ":" + str(self.port) self.group_id = config.get("group_id") @@ -55,7 +53,6 @@ class MsgKafka(MsgBase): def disconnect(self): try: pass - # self.loop.close() except Exception as e: # TODO refine raise MsgException(str(e)) @@ -70,9 +67,7 @@ class MsgKafka(MsgBase): retry = 2 # Try two times while retry: try: - self.loop.run_until_complete( - self.aiowrite(topic=topic, key=key, msg=msg) - ) + asyncio.run(self.aiowrite(topic=topic, key=key, msg=msg)) break except Exception as e: retry -= 1 @@ -88,27 +83,22 @@ class MsgKafka(MsgBase): :return: topic, key, message; or None """ try: - return self.loop.run_until_complete(self.aioread(topic, self.loop)) + return asyncio.run(self.aioread(topic)) except MsgException: raise except Exception as e: raise MsgException("Error reading {} topic: {}".format(topic, str(e))) - async def aiowrite(self, topic, key, msg, loop=None): + async def aiowrite(self, topic, key, msg): """ Asyncio write :param topic: str kafka topic :param key: str kafka key :param msg: str or dictionary kafka message - :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect :return: None """ - - if not loop: - loop = self.loop try: self.producer = AIOKafkaProducer( - loop=loop, key_serializer=str.encode, value_serializer=str.encode, bootstrap_servers=self.broker, @@ -127,7 +117,6 @@ class MsgKafka(MsgBase): async def aioread( self, topic, - loop=None, callback=None, aiocallback=None, group_id=None, @@ -137,7 +126,6 @@ class MsgKafka(MsgBase): """ Asyncio read from one or several topics. :param topic: can be str: single topic; or str list: several topics - :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect :param callback: synchronous callback function that will handle the message in kafka bus :param aiocallback: async callback function that will handle the message in kafka bus :param group_id: kafka group_id to use. Can be False (set group_id to None), None (use general group_id provided @@ -148,9 +136,6 @@ class MsgKafka(MsgBase): :param kwargs: optional keyword arguments for callback function :return: If no callback defined, it returns (topic, key, message) """ - - if not loop: - loop = self.loop if group_id is False: group_id = None elif group_id is None: @@ -161,7 +146,6 @@ class MsgKafka(MsgBase): else: topic_list = (topic,) self.consumer = AIOKafkaConsumer( - loop=loop, bootstrap_servers=self.broker, group_id=group_id, auto_offset_reset="earliest" if from_beginning else "latest",