AWS vimconn: Allow config:flavor_data as str
[osm/RO.git] / lcm / osm_common / msgkafka.py
index 96456af..de1e764 100644 (file)
@@ -15,8 +15,6 @@ class MsgKafka(MsgBase):
         self.port = None
         self.consumer = None
         self.producer = None
-        # create a different file for each topic
-        #self.files = {}
 
     def connect(self, config):
         try:
@@ -24,7 +22,6 @@ class MsgKafka(MsgBase):
                 self.logger = logging.getLogger(config["logger_name"])
             self.host = config["host"]
             self.port = config["port"]
-            self.topic_lst = []
             self.loop = asyncio.get_event_loop()
             self.broker = str(self.host) + ":" + str(self.port)
 
@@ -59,24 +56,32 @@ class MsgKafka(MsgBase):
         except Exception as e:
             raise MsgException("Error reading {} topic: {}".format(topic, str(e)))
 
-    async def aiowrite(self, topic, key, msg, loop):
+    async def aiowrite(self, topic, key, msg, loop=None):
+
+        if not loop:
+            loop = self.loop
         try:
             self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
                                              bootstrap_servers=self.broker)
             await self.producer.start()
-            await self.producer.send(topic=topic, key=key, value=msg)
+            await self.producer.send(topic=topic, key=key, value=yaml.safe_dump(msg, default_flow_style=True))
         except Exception as e:
-            raise MsgException("Error publishing to {} topic: {}".format(topic, str(e)))
+            raise MsgException("Error publishing topic '{}', key '{}': {}".format(topic, key, e))
         finally:
             await self.producer.stop()
 
-    async def aioread(self, topic, loop):
+    async def aioread(self, topic, loop=None, callback=None, *args):
         """
         Asyncio read from one or several topics. It blocks
         :param topic: can be str: single topic; or str list: several topics
         :param loop: asyncio loop
+        :callback: callback function that will handle the message in kafka bus
+        :*args: optional arguments for callback function
         :return: topic, key, message
         """
+
+        if not loop:
+            loop = self.loop
         try:
             if isinstance(topic, (list, tuple)):
                 topic_list = topic
@@ -86,11 +91,14 @@ class MsgKafka(MsgBase):
             self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker)
             await self.consumer.start()
             self.consumer.subscribe(topic_list)
+
             async for message in self.consumer:
-                return message.topic, yaml.load(message.key), yaml.load(message.value)
+                if callback:
+                    callback(message.topic, yaml.load(message.key), yaml.load(message.value), *args)
+                else:
+                    return message.topic, yaml.load(message.key), yaml.load(message.value)
         except KafkaError as e:
             raise MsgException(str(e))
         finally:
             await self.consumer.stop()
 
-