# See the License for the specific language governing permissions and
# limitations under the License.
-# import asyncio
from http import HTTPStatus
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
def read(self, topic):
raise MsgException("Method 'read' not implemented")
- async def aiowrite(self, topic, key, msg, loop):
+ async def aiowrite(self, topic, key, msg, loop=None):
raise MsgException("Method 'aiowrite' not implemented")
- async def aioread(self, topic, loop):
+ async def aioread(self, topic, loop=None, callback=None, aiocallback=None, **kwargs):
raise MsgException("Method 'aioread' not implemented")
from aiokafka import AIOKafkaProducer
from aiokafka.errors import KafkaError
from osm_common.msgbase import MsgBase, MsgException
-# import json
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>, " \
"Guillermo Calvino <guillermo.calvinosanchez@altran.com>"
finally:
await self.producer.stop()
- async def aioread(self, topic, loop=None, callback=None, *args):
+ async def aioread(self, topic, loop=None, callback=None, aiocallback=None, **kwargs):
"""
- Asyncio read from one or several topics. It blocks
+ Asyncio read from one or several topics. It blocks.
:param topic: can be str: single topic; or str list: several topics
:param loop: asyncio loop
- :callback: callback function that will handle the message in kafka bus
- :*args: optional arguments for callback function
- :return: topic, key, message
+ :param callback: synchronous callback function that will handle the message in kafka bus
+ :param aiocallback: async callback function that will handle the message in kafka bus
+ :param kwargs: optional keyword arguments for callback function
+ :return: If no callback defined, it returns (topic, key, message)
"""
if not loop:
async for message in self.consumer:
if callback:
- callback(message.topic, yaml.load(message.key), yaml.load(message.value), *args)
+ callback(message.topic, yaml.load(message.key), yaml.load(message.value), **kwargs)
+ elif aiocallback:
+ await aiocallback(message.topic, yaml.load(message.key), yaml.load(message.value), **kwargs)
else:
return message.topic, yaml.load(message.key), yaml.load(message.value)
except KafkaError as e: