Methods for managing VIMs, SDNs
[osm/NBI.git] / osm_nbi / msgkafka.py
index 90c9c7f..96456af 100644 (file)
@@ -31,24 +31,36 @@ class MsgKafka(MsgBase):
         except Exception as e:  # TODO refine
             raise MsgException(str(e))
 
         except Exception as e:  # TODO refine
             raise MsgException(str(e))
 
+    def disconnect(self):
+        try:
+            self.loop.close()
+        except Exception as e:  # TODO refine
+            raise MsgException(str(e))
+
     def write(self, topic, key, msg):
         try:
     def write(self, topic, key, msg):
         try:
-            self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, msg=yaml.safe_dump(msg, default_flow_style=True)))
+            self.loop.run_until_complete(self.aiowrite(topic=topic, key=key,
+                                                       msg=yaml.safe_dump(msg, default_flow_style=True),
+                                                       loop=self.loop))
 
         except Exception as e:
             raise MsgException("Error writing {} topic: {}".format(topic, str(e)))
 
     def read(self, topic):
 
         except Exception as e:
             raise MsgException("Error writing {} topic: {}".format(topic, str(e)))
 
     def read(self, topic):
-        #self.topic_lst.append(topic)
+        """
+        Read from one or several topics. it is non blocking returning None if nothing is available
+        :param topic: can be str: single topic; or str list: several topics
+        :return: topic, key, message; or None
+        """
         try:
         try:
-            return self.loop.run_until_complete(self.aioread(topic))
+            return self.loop.run_until_complete(self.aioread(topic, self.loop))
+        except MsgException:
+            raise
         except Exception as e:
             raise MsgException("Error reading {} topic: {}".format(topic, str(e)))
 
         except Exception as e:
             raise MsgException("Error reading {} topic: {}".format(topic, str(e)))
 
-    async def aiowrite(self, topic, key, msg, loop=None):
+    async def aiowrite(self, topic, key, msg, loop):
         try:
         try:
-            if not loop:
-                loop = self.loop
             self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
                                              bootstrap_servers=self.broker)
             await self.producer.start()
             self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
                                              bootstrap_servers=self.broker)
             await self.producer.start()
@@ -58,15 +70,24 @@ class MsgKafka(MsgBase):
         finally:
             await self.producer.stop()
 
         finally:
             await self.producer.stop()
 
-    async def aioread(self, topic, loop=None):
-        if not loop:
-            loop = self.loop
-        self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker)
-        await self.consumer.start()
-        self.consumer.subscribe([topic])
+    async def aioread(self, topic, loop):
+        """
+        Asyncio read from one or several topics. It blocks
+        :param topic: can be str: single topic; or str list: several topics
+        :param loop: asyncio loop
+        :return: topic, key, message
+        """
         try:
         try:
+            if isinstance(topic, (list, tuple)):
+                topic_list = topic
+            else:
+                topic_list = (topic,)
+
+            self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker)
+            await self.consumer.start()
+            self.consumer.subscribe(topic_list)
             async for message in self.consumer:
             async for message in self.consumer:
-                return yaml.load(message.key), yaml.load(message.value)
+                return message.topic, yaml.load(message.key), yaml.load(message.value)
         except KafkaError as e:
             raise MsgException(str(e))
         finally:
         except KafkaError as e:
             raise MsgException(str(e))
         finally: