lightweight kafka support with callback
[osm/RO.git] / lcm / osm_common / msgkafka.py
index f350745..459513d 100644 (file)
@@ -1,49 +1,66 @@
+import logging
+import asyncio
+import yaml
 from aiokafka import AIOKafkaConsumer
 from aiokafka import AIOKafkaProducer
 from aiokafka.errors import KafkaError
 from msgbase import MsgBase, MsgException
 from aiokafka import AIOKafkaConsumer
 from aiokafka import AIOKafkaProducer
 from aiokafka.errors import KafkaError
 from msgbase import MsgBase, MsgException
-import asyncio
-import yaml
 #import json
 
 #import json
 
+
 class MsgKafka(MsgBase):
 class MsgKafka(MsgBase):
-    def __init__(self):
+    def __init__(self, logger_name='msg'):
+        self.logger = logging.getLogger(logger_name)
         self.host = None
         self.port = None
         self.consumer = None
         self.producer = None
         self.host = None
         self.port = None
         self.consumer = None
         self.producer = None
-        # create a different file for each topic
-        #self.files = {}
 
     def connect(self, config):
         try:
 
     def connect(self, config):
         try:
+            if "logger_name" in config:
+                self.logger = logging.getLogger(config["logger_name"])
             self.host = config["host"]
             self.port = config["port"]
             self.host = config["host"]
             self.port = config["port"]
-            self.topic_lst = []
             self.loop = asyncio.get_event_loop()
             self.broker = str(self.host) + ":" + str(self.port)
 
         except Exception as e:  # TODO refine
             raise MsgException(str(e))
 
             self.loop = asyncio.get_event_loop()
             self.broker = str(self.host) + ":" + str(self.port)
 
         except Exception as e:  # TODO refine
             raise MsgException(str(e))
 
+    def disconnect(self):
+        try:
+            self.loop.close()
+        except Exception as e:  # TODO refine
+            raise MsgException(str(e))
+
     def write(self, topic, key, msg):
         try:
     def write(self, topic, key, msg):
         try:
-            self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, msg=yaml.safe_dump(msg, default_flow_style=True)))
+            self.loop.run_until_complete(self.aiowrite(topic=topic, key=key,
+                                                       msg=yaml.safe_dump(msg, default_flow_style=True),
+                                                       loop=self.loop))
 
         except Exception as e:
             raise MsgException("Error writing {} topic: {}".format(topic, str(e)))
 
     def read(self, topic):
 
         except Exception as e:
             raise MsgException("Error writing {} topic: {}".format(topic, str(e)))
 
     def read(self, topic):
-        #self.topic_lst.append(topic)
+        """
+        Read from one or several topics. it is non blocking returning None if nothing is available
+        :param topic: can be str: single topic; or str list: several topics
+        :return: topic, key, message; or None
+        """
         try:
         try:
-            return self.loop.run_until_complete(self.aioread(topic))
+            return self.loop.run_until_complete(self.aioread(topic, self.loop))
+        except MsgException:
+            raise
         except Exception as e:
             raise MsgException("Error reading {} topic: {}".format(topic, str(e)))
 
         except Exception as e:
             raise MsgException("Error reading {} topic: {}".format(topic, str(e)))
 
-    async def aiowrite(self, topic, key, msg, loop=None):
+    async def aiowrite(self, topic, key, msg, loop):
+
+        if not loop:
+            loop = self.loop
         try:
         try:
-            if not loop:
-                loop = self.loop
             self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
                                              bootstrap_servers=self.broker)
             await self.producer.start()
             self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
                                              bootstrap_servers=self.broker)
             await self.producer.start()
@@ -53,18 +70,35 @@ class MsgKafka(MsgBase):
         finally:
             await self.producer.stop()
 
         finally:
             await self.producer.stop()
 
-    async def aioread(self, topic, loop=None):
+    async def aioread(self, topic, loop=None, callback=None, *args):
+        """
+        Asyncio read from one or several topics. It blocks
+        :param topic: can be str: single topic; or str list: several topics
+        :param loop: asyncio loop
+        :callback: callback function that will handle the message in kafka bus
+        :*args: optional arguments for callback function
+        :return: topic, key, message
+        """
+
         if not loop:
             loop = self.loop
         if not loop:
             loop = self.loop
-        self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker)
-        await self.consumer.start()
-        self.consumer.subscribe([topic])
         try:
         try:
+            if isinstance(topic, (list, tuple)):
+                topic_list = topic
+            else:
+                topic_list = (topic,)
+
+            self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker)
+            await self.consumer.start()
+            self.consumer.subscribe(topic_list)
+
             async for message in self.consumer:
             async for message in self.consumer:
-                return yaml.load(message.key), yaml.load(message.value)
+                if callback:
+                    callback(message.topic, yaml.load(message.key), yaml.load(message.value), *args)
+                else:
+                    return message.topic, yaml.load(message.key), yaml.load(message.value)
         except KafkaError as e:
             raise MsgException(str(e))
         finally:
             await self.consumer.stop()
 
         except KafkaError as e:
             raise MsgException(str(e))
         finally:
             await self.consumer.stop()
 
-