From: Benjamin Diaz Date: Thu, 18 Oct 2018 20:55:12 +0000 (-0300) Subject: Adds aiocallback in aioread of msgkafka X-Git-Tag: v5.0.0~10 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=48b78e112662d927cad376d739e43fb94f108ff3;p=osm%2Fcommon.git Adds aiocallback in aioread of msgkafka aioread can now receive an async function as param, which will be awaited during the consumer msg loop. This allows of taking full advantage of asyncio. Also it now passes de kwargs to the callback, instead of the args. Without this, all params of aioread had to be passed positionally if you wanted to give additional args to the callback. Signed-off-by: Benjamin Diaz Change-Id: I2502c0f2f76643660da1ff24eafa62af2450eacb --- diff --git a/osm_common/__init__.py b/osm_common/__init__.py index d283da5..f288cb6 100644 --- a/osm_common/__init__.py +++ b/osm_common/__init__.py @@ -15,5 +15,5 @@ # See the License for the specific language governing permissions and # limitations under the License. -version = '0.1.8' +version = '0.1.9' date_version = '2018-10-09' diff --git a/osm_common/msgbase.py b/osm_common/msgbase.py index 8978085..6e6dec0 100644 --- a/osm_common/msgbase.py +++ b/osm_common/msgbase.py @@ -15,7 +15,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -# import asyncio from http import HTTPStatus __author__ = "Alfonso Tierno " @@ -56,8 +55,8 @@ class MsgBase(object): def read(self, topic): raise MsgException("Method 'read' not implemented") - async def aiowrite(self, topic, key, msg, loop): + async def aiowrite(self, topic, key, msg, loop=None): raise MsgException("Method 'aiowrite' not implemented") - async def aioread(self, topic, loop): + async def aioread(self, topic, loop=None, callback=None, aiocallback=None, **kwargs): raise MsgException("Method 'aioread' not implemented") diff --git a/osm_common/msgkafka.py b/osm_common/msgkafka.py index aa756c4..767fff6 100644 --- a/osm_common/msgkafka.py +++ b/osm_common/msgkafka.py @@ -20,7 +20,6 @@ from aiokafka import AIOKafkaConsumer from aiokafka import AIOKafkaProducer from aiokafka.errors import KafkaError from osm_common.msgbase import MsgBase, MsgException -# import json __author__ = "Alfonso Tierno , " \ "Guillermo Calvino " @@ -98,14 +97,15 @@ class MsgKafka(MsgBase): finally: await self.producer.stop() - async def aioread(self, topic, loop=None, callback=None, *args): + async def aioread(self, topic, loop=None, callback=None, aiocallback=None, **kwargs): """ - Asyncio read from one or several topics. It blocks + Asyncio read from one or several topics. It blocks. :param topic: can be str: single topic; or str list: several topics :param loop: asyncio loop - :callback: callback function that will handle the message in kafka bus - :*args: optional arguments for callback function - :return: topic, key, message + :param callback: synchronous callback function that will handle the message in kafka bus + :param aiocallback: async callback function that will handle the message in kafka bus + :param kwargs: optional keyword arguments for callback function + :return: If no callback defined, it returns (topic, key, message) """ if not loop: @@ -122,7 +122,9 @@ class MsgKafka(MsgBase): async for message in self.consumer: if callback: - callback(message.topic, yaml.load(message.key), yaml.load(message.value), *args) + callback(message.topic, yaml.load(message.key), yaml.load(message.value), **kwargs) + elif aiocallback: + await aiocallback(message.topic, yaml.load(message.key), yaml.load(message.value), **kwargs) else: return message.topic, yaml.load(message.key), yaml.load(message.value) except KafkaError as e: