From 48b78e112662d927cad376d739e43fb94f108ff3 Mon Sep 17 00:00:00 2001 From: Benjamin Diaz Date: Thu, 18 Oct 2018 17:55:12 -0300 Subject: [PATCH] Adds aiocallback in aioread of msgkafka aioread can now receive an async function as param, which will be awaited during the consumer msg loop. This allows of taking full advantage of asyncio. Also it now passes de kwargs to the callback, instead of the args. Without this, all params of aioread had to be passed positionally if you wanted to give additional args to the callback. Signed-off-by: Benjamin Diaz Change-Id: I2502c0f2f76643660da1ff24eafa62af2450eacb --- osm_common/__init__.py | 2 +- osm_common/msgbase.py | 5 ++--- osm_common/msgkafka.py | 16 +++++++++------- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/osm_common/__init__.py b/osm_common/__init__.py index d283da5..f288cb6 100644 --- a/osm_common/__init__.py +++ b/osm_common/__init__.py @@ -15,5 +15,5 @@ # See the License for the specific language governing permissions and # limitations under the License. -version = '0.1.8' +version = '0.1.9' date_version = '2018-10-09' diff --git a/osm_common/msgbase.py b/osm_common/msgbase.py index 8978085..6e6dec0 100644 --- a/osm_common/msgbase.py +++ b/osm_common/msgbase.py @@ -15,7 +15,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -# import asyncio from http import HTTPStatus __author__ = "Alfonso Tierno " @@ -56,8 +55,8 @@ class MsgBase(object): def read(self, topic): raise MsgException("Method 'read' not implemented") - async def aiowrite(self, topic, key, msg, loop): + async def aiowrite(self, topic, key, msg, loop=None): raise MsgException("Method 'aiowrite' not implemented") - async def aioread(self, topic, loop): + async def aioread(self, topic, loop=None, callback=None, aiocallback=None, **kwargs): raise MsgException("Method 'aioread' not implemented") diff --git a/osm_common/msgkafka.py b/osm_common/msgkafka.py index aa756c4..767fff6 100644 --- a/osm_common/msgkafka.py +++ b/osm_common/msgkafka.py @@ -20,7 +20,6 @@ from aiokafka import AIOKafkaConsumer from aiokafka import AIOKafkaProducer from aiokafka.errors import KafkaError from osm_common.msgbase import MsgBase, MsgException -# import json __author__ = "Alfonso Tierno , " \ "Guillermo Calvino " @@ -98,14 +97,15 @@ class MsgKafka(MsgBase): finally: await self.producer.stop() - async def aioread(self, topic, loop=None, callback=None, *args): + async def aioread(self, topic, loop=None, callback=None, aiocallback=None, **kwargs): """ - Asyncio read from one or several topics. It blocks + Asyncio read from one or several topics. It blocks. :param topic: can be str: single topic; or str list: several topics :param loop: asyncio loop - :callback: callback function that will handle the message in kafka bus - :*args: optional arguments for callback function - :return: topic, key, message + :param callback: synchronous callback function that will handle the message in kafka bus + :param aiocallback: async callback function that will handle the message in kafka bus + :param kwargs: optional keyword arguments for callback function + :return: If no callback defined, it returns (topic, key, message) """ if not loop: @@ -122,7 +122,9 @@ class MsgKafka(MsgBase): async for message in self.consumer: if callback: - callback(message.topic, yaml.load(message.key), yaml.load(message.value), *args) + callback(message.topic, yaml.load(message.key), yaml.load(message.value), **kwargs) + elif aiocallback: + await aiocallback(message.topic, yaml.load(message.key), yaml.load(message.value), **kwargs) else: return message.topic, yaml.load(message.key), yaml.load(message.value) except KafkaError as e: -- 2.25.1