lightweight kafka support with callback 39/5839/5
authorgcalvino <guillermo.calvinosanchez@altran.com>
Mon, 5 Feb 2018 12:03:05 +0000 (13:03 +0100)
committergcalvino <guillermo.calvinosanchez@altran.com>
Fri, 13 Apr 2018 09:40:47 +0000 (11:40 +0200)
Change-Id: I9efcdb6bcdca5a1bc91cd4f1288ef399d88622af
Signed-off-by: gcalvino <guillermo.calvinosanchez@altran.com>
lcm/osm_common/msgkafka.py

index 96456af..459513d 100644 (file)
@@ -15,8 +15,6 @@ class MsgKafka(MsgBase):
         self.port = None
         self.consumer = None
         self.producer = None
-        # create a different file for each topic
-        #self.files = {}
 
     def connect(self, config):
         try:
@@ -24,7 +22,6 @@ class MsgKafka(MsgBase):
                 self.logger = logging.getLogger(config["logger_name"])
             self.host = config["host"]
             self.port = config["port"]
-            self.topic_lst = []
             self.loop = asyncio.get_event_loop()
             self.broker = str(self.host) + ":" + str(self.port)
 
@@ -60,6 +57,9 @@ class MsgKafka(MsgBase):
             raise MsgException("Error reading {} topic: {}".format(topic, str(e)))
 
     async def aiowrite(self, topic, key, msg, loop):
+
+        if not loop:
+            loop = self.loop
         try:
             self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
                                              bootstrap_servers=self.broker)
@@ -70,13 +70,18 @@ class MsgKafka(MsgBase):
         finally:
             await self.producer.stop()
 
-    async def aioread(self, topic, loop):
+    async def aioread(self, topic, loop=None, callback=None, *args):
         """
         Asyncio read from one or several topics. It blocks
         :param topic: can be str: single topic; or str list: several topics
         :param loop: asyncio loop
+        :callback: callback function that will handle the message in kafka bus
+        :*args: optional arguments for callback function
         :return: topic, key, message
         """
+
+        if not loop:
+            loop = self.loop
         try:
             if isinstance(topic, (list, tuple)):
                 topic_list = topic
@@ -86,11 +91,14 @@ class MsgKafka(MsgBase):
             self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker)
             await self.consumer.start()
             self.consumer.subscribe(topic_list)
+
             async for message in self.consumer:
-                return message.topic, yaml.load(message.key), yaml.load(message.value)
+                if callback:
+                    callback(message.topic, yaml.load(message.key), yaml.load(message.value), *args)
+                else:
+                    return message.topic, yaml.load(message.key), yaml.load(message.value)
         except KafkaError as e:
             raise MsgException(str(e))
         finally:
             await self.consumer.stop()
 
-