Adds aiocallback in aioread of msgkafka

aioread can now receive an async function as param,
which will be awaited during the consumer msg loop.
This allows of taking full advantage of asyncio.

Also it now passes de kwargs to the callback, instead
of the args. Without this, all params of aioread had to be
passed positionally if you wanted to give additional args
to the callback.

Signed-off-by: Benjamin Diaz <bdiaz@whitestack.com>
Change-Id: I2502c0f2f76643660da1ff24eafa62af2450eacb
diff --git a/osm_common/msgkafka.py b/osm_common/msgkafka.py
index aa756c4..767fff6 100644
--- a/osm_common/msgkafka.py
+++ b/osm_common/msgkafka.py
@@ -20,7 +20,6 @@
 from aiokafka import AIOKafkaProducer
 from aiokafka.errors import KafkaError
 from osm_common.msgbase import MsgBase, MsgException
-# import json
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>, " \
              "Guillermo Calvino <guillermo.calvinosanchez@altran.com>"
@@ -98,14 +97,15 @@
         finally:
             await self.producer.stop()
 
-    async def aioread(self, topic, loop=None, callback=None, *args):
+    async def aioread(self, topic, loop=None, callback=None, aiocallback=None, **kwargs):
         """
-        Asyncio read from one or several topics. It blocks
+        Asyncio read from one or several topics. It blocks.
         :param topic: can be str: single topic; or str list: several topics
         :param loop: asyncio loop
-        :callback: callback function that will handle the message in kafka bus
-        :*args: optional arguments for callback function
-        :return: topic, key, message
+        :param callback: synchronous callback function that will handle the message in kafka bus
+        :param aiocallback: async callback function that will handle the message in kafka bus
+        :param kwargs: optional keyword arguments for callback function
+        :return: If no callback defined, it returns (topic, key, message)
         """
 
         if not loop:
@@ -122,7 +122,9 @@
 
             async for message in self.consumer:
                 if callback:
-                    callback(message.topic, yaml.load(message.key), yaml.load(message.value), *args)
+                    callback(message.topic, yaml.load(message.key), yaml.load(message.value), **kwargs)
+                elif aiocallback:
+                    await aiocallback(message.topic, yaml.load(message.key), yaml.load(message.value), **kwargs)
                 else:
                     return message.topic, yaml.load(message.key), yaml.load(message.value)
         except KafkaError as e: