Adds aiocallback in aioread of msgkafka 25/6725/3
authorBenjamin Diaz <bdiaz@whitestack.com>
Thu, 18 Oct 2018 20:55:12 +0000 (17:55 -0300)
committerBenjamin Diaz <bdiaz@whitestack.com>
Mon, 22 Oct 2018 16:49:15 +0000 (13:49 -0300)
aioread can now receive an async function as param,
which will be awaited during the consumer msg loop.
This allows of taking full advantage of asyncio.

Also it now passes de kwargs to the callback, instead
of the args. Without this, all params of aioread had to be
passed positionally if you wanted to give additional args
to the callback.

Signed-off-by: Benjamin Diaz <bdiaz@whitestack.com>
Change-Id: I2502c0f2f76643660da1ff24eafa62af2450eacb

osm_common/__init__.py
osm_common/msgbase.py
osm_common/msgkafka.py

index d283da5..f288cb6 100644 (file)
@@ -15,5 +15,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-version = '0.1.8'
+version = '0.1.9'
 date_version = '2018-10-09'
index 8978085..6e6dec0 100644 (file)
@@ -15,7 +15,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# import asyncio
 from http import HTTPStatus
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
@@ -56,8 +55,8 @@ class MsgBase(object):
     def read(self, topic):
         raise MsgException("Method 'read' not implemented")
 
-    async def aiowrite(self, topic, key, msg, loop):
+    async def aiowrite(self, topic, key, msg, loop=None):
         raise MsgException("Method 'aiowrite' not implemented")
 
-    async def aioread(self, topic, loop):
+    async def aioread(self, topic, loop=None, callback=None, aiocallback=None, **kwargs):
         raise MsgException("Method 'aioread' not implemented")
index aa756c4..767fff6 100644 (file)
@@ -20,7 +20,6 @@ from aiokafka import AIOKafkaConsumer
 from aiokafka import AIOKafkaProducer
 from aiokafka.errors import KafkaError
 from osm_common.msgbase import MsgBase, MsgException
-# import json
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>, " \
              "Guillermo Calvino <guillermo.calvinosanchez@altran.com>"
@@ -98,14 +97,15 @@ class MsgKafka(MsgBase):
         finally:
             await self.producer.stop()
 
-    async def aioread(self, topic, loop=None, callback=None, *args):
+    async def aioread(self, topic, loop=None, callback=None, aiocallback=None, **kwargs):
         """
-        Asyncio read from one or several topics. It blocks
+        Asyncio read from one or several topics. It blocks.
         :param topic: can be str: single topic; or str list: several topics
         :param loop: asyncio loop
-        :callback: callback function that will handle the message in kafka bus
-        :*args: optional arguments for callback function
-        :return: topic, key, message
+        :param callback: synchronous callback function that will handle the message in kafka bus
+        :param aiocallback: async callback function that will handle the message in kafka bus
+        :param kwargs: optional keyword arguments for callback function
+        :return: If no callback defined, it returns (topic, key, message)
         """
 
         if not loop:
@@ -122,7 +122,9 @@ class MsgKafka(MsgBase):
 
             async for message in self.consumer:
                 if callback:
-                    callback(message.topic, yaml.load(message.key), yaml.load(message.value), *args)
+                    callback(message.topic, yaml.load(message.key), yaml.load(message.value), **kwargs)
+                elif aiocallback:
+                    await aiocallback(message.topic, yaml.load(message.key), yaml.load(message.value), **kwargs)
                 else:
                     return message.topic, yaml.load(message.key), yaml.load(message.value)
         except KafkaError as e: