Fix multi-vdu workflow w/o charm
[osm/RO.git] / lcm / osm_common / msgkafka.py
index 7b294fc..459513d 100644 (file)
@@ -1,49 +1,67 @@
+import logging
+import asyncio
+import yaml
 from aiokafka import AIOKafkaConsumer
 from aiokafka import AIOKafkaProducer
 from aiokafka.errors import KafkaError
 from msgbase import MsgBase, MsgException
 from aiokafka import AIOKafkaConsumer
 from aiokafka import AIOKafkaProducer
 from aiokafka.errors import KafkaError
 from msgbase import MsgBase, MsgException
-import asyncio
-import yaml
 #import json
 
 #import json
 
-class msgKafka(MsgBase):
-    def __init__(self):
+
+class MsgKafka(MsgBase):
+    def __init__(self, logger_name='msg'):
+        self.logger = logging.getLogger(logger_name)
         self.host = None
         self.port = None
         self.consumer = None
         self.producer = None
         self.host = None
         self.port = None
         self.consumer = None
         self.producer = None
-        # create a different file for each topic
-        #self.files = {}
 
     def connect(self, config):
         try:
 
     def connect(self, config):
         try:
+            if "logger_name" in config:
+                self.logger = logging.getLogger(config["logger_name"])
             self.host = config["host"]
             self.port = config["port"]
             self.host = config["host"]
             self.port = config["port"]
-            self.topic_lst = []
             self.loop = asyncio.get_event_loop()
             self.broker = str(self.host) + ":" + str(self.port)
 
         except Exception as e:  # TODO refine
             raise MsgException(str(e))
 
             self.loop = asyncio.get_event_loop()
             self.broker = str(self.host) + ":" + str(self.port)
 
         except Exception as e:  # TODO refine
             raise MsgException(str(e))
 
-    def write(self, topic, msg, key):
+    def disconnect(self):
+        try:
+            self.loop.close()
+        except Exception as e:  # TODO refine
+            raise MsgException(str(e))
 
 
+    def write(self, topic, key, msg):
         try:
         try:
-            self.loop.run_until_complete(self.aiowrite(key, msg=yaml.safe_dump(msg, default_flow_style=True), topic=topic))
+            self.loop.run_until_complete(self.aiowrite(topic=topic, key=key,
+                                                       msg=yaml.safe_dump(msg, default_flow_style=True),
+                                                       loop=self.loop))
 
         except Exception as e:
             raise MsgException("Error writing {} topic: {}".format(topic, str(e)))
 
     def read(self, topic):
 
         except Exception as e:
             raise MsgException("Error writing {} topic: {}".format(topic, str(e)))
 
     def read(self, topic):
-        self.topic_lst.append(topic)
+        """
+        Read from one or several topics. it is non blocking returning None if nothing is available
+        :param topic: can be str: single topic; or str list: several topics
+        :return: topic, key, message; or None
+        """
         try:
         try:
-            return self.loop.run_until_complete(self.aioread(self.topic_lst))
+            return self.loop.run_until_complete(self.aioread(topic, self.loop))
+        except MsgException:
+            raise
         except Exception as e:
             raise MsgException("Error reading {} topic: {}".format(topic, str(e)))
 
         except Exception as e:
             raise MsgException("Error reading {} topic: {}".format(topic, str(e)))
 
-    async def aiowrite(self, key, msg, topic):
+    async def aiowrite(self, topic, key, msg, loop):
+
+        if not loop:
+            loop = self.loop
         try:
         try:
-            self.producer = AIOKafkaProducer(loop=self.loop, key_serializer=str.encode, value_serializer=str.encode,
+            self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
                                              bootstrap_servers=self.broker)
             await self.producer.start()
             await self.producer.send(topic=topic, key=key, value=msg)
                                              bootstrap_servers=self.broker)
             await self.producer.start()
             await self.producer.send(topic=topic, key=key, value=msg)
@@ -52,16 +70,35 @@ class msgKafka(MsgBase):
         finally:
             await self.producer.stop()
 
         finally:
             await self.producer.stop()
 
-    async def aioread(self, topic):
-        self.consumer = AIOKafkaConsumer(loop=self.loop, bootstrap_servers=self.broker)
-        await self.consumer.start()
-        self.consumer.subscribe(topic)
+    async def aioread(self, topic, loop=None, callback=None, *args):
+        """
+        Asyncio read from one or several topics. It blocks
+        :param topic: can be str: single topic; or str list: several topics
+        :param loop: asyncio loop
+        :callback: callback function that will handle the message in kafka bus
+        :*args: optional arguments for callback function
+        :return: topic, key, message
+        """
+
+        if not loop:
+            loop = self.loop
         try:
         try:
+            if isinstance(topic, (list, tuple)):
+                topic_list = topic
+            else:
+                topic_list = (topic,)
+
+            self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker)
+            await self.consumer.start()
+            self.consumer.subscribe(topic_list)
+
             async for message in self.consumer:
             async for message in self.consumer:
-                return yaml.load(message.key), yaml.load(message.value)
+                if callback:
+                    callback(message.topic, yaml.load(message.key), yaml.load(message.value), *args)
+                else:
+                    return message.topic, yaml.load(message.key), yaml.load(message.value)
         except KafkaError as e:
             raise MsgException(str(e))
         finally:
             await self.consumer.stop()
 
         except KafkaError as e:
             raise MsgException(str(e))
         finally:
             await self.consumer.stop()
 
-