From 70459aa6c102da343154b7838877b14bdd0be3ce Mon Sep 17 00:00:00 2001 From: gcalvino Date: Mon, 5 Feb 2018 13:03:05 +0100 Subject: [PATCH] lightweight kafka support with callback Change-Id: I9efcdb6bcdca5a1bc91cd4f1288ef399d88622af Signed-off-by: gcalvino --- lcm/osm_common/msgkafka.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/lcm/osm_common/msgkafka.py b/lcm/osm_common/msgkafka.py index 96456afc..459513d8 100644 --- a/lcm/osm_common/msgkafka.py +++ b/lcm/osm_common/msgkafka.py @@ -15,8 +15,6 @@ class MsgKafka(MsgBase): self.port = None self.consumer = None self.producer = None - # create a different file for each topic - #self.files = {} def connect(self, config): try: @@ -24,7 +22,6 @@ class MsgKafka(MsgBase): self.logger = logging.getLogger(config["logger_name"]) self.host = config["host"] self.port = config["port"] - self.topic_lst = [] self.loop = asyncio.get_event_loop() self.broker = str(self.host) + ":" + str(self.port) @@ -60,6 +57,9 @@ class MsgKafka(MsgBase): raise MsgException("Error reading {} topic: {}".format(topic, str(e))) async def aiowrite(self, topic, key, msg, loop): + + if not loop: + loop = self.loop try: self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode, bootstrap_servers=self.broker) @@ -70,13 +70,18 @@ class MsgKafka(MsgBase): finally: await self.producer.stop() - async def aioread(self, topic, loop): + async def aioread(self, topic, loop=None, callback=None, *args): """ Asyncio read from one or several topics. It blocks :param topic: can be str: single topic; or str list: several topics :param loop: asyncio loop + :callback: callback function that will handle the message in kafka bus + :*args: optional arguments for callback function :return: topic, key, message """ + + if not loop: + loop = self.loop try: if isinstance(topic, (list, tuple)): topic_list = topic @@ -86,11 +91,14 @@ class MsgKafka(MsgBase): self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker) await self.consumer.start() self.consumer.subscribe(topic_list) + async for message in self.consumer: - return message.topic, yaml.load(message.key), yaml.load(message.value) + if callback: + callback(message.topic, yaml.load(message.key), yaml.load(message.value), *args) + else: + return message.topic, yaml.load(message.key), yaml.load(message.value) except KafkaError as e: raise MsgException(str(e)) finally: await self.consumer.stop() - -- 2.17.1