database get filtering enhancement for arrays
[osm/common.git] / osm_common / msgkafka.py
index c819c81..2d82f97 100644 (file)
@@ -4,11 +4,13 @@ import yaml
 from aiokafka import AIOKafkaConsumer
 from aiokafka import AIOKafkaProducer
 from aiokafka.errors import KafkaError
-from msgbase import MsgBase, MsgException
-#import json
+from osm_common.msgbase import MsgBase, MsgException
+# import json
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>, " \
              "Guillermo Calvino <guillermo.calvinosanchez@altran.com>"
+
+
 class MsgKafka(MsgBase):
     def __init__(self, logger_name='msg'):
         self.logger = logging.getLogger(logger_name)
@@ -33,22 +35,28 @@ class MsgKafka(MsgBase):
 
     def disconnect(self):
         try:
-            self.loop.close()
+            pass
+            # self.loop.close()
         except Exception as e:  # TODO refine
             raise MsgException(str(e))
 
     def write(self, topic, key, msg):
+        """
+        Write a message at kafka bus
+        :param topic: message topic, must be string
+        :param key: message key, must be string
+        :param msg: message content, can be string or dictionary
+        :return: None or raises MsgException on failing
+        """
         try:
-            self.loop.run_until_complete(self.aiowrite(topic=topic, key=key,
-                                                       msg=yaml.safe_dump(msg, default_flow_style=True),
-                                                       loop=self.loop))
+            self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, msg=msg))
 
         except Exception as e:
             raise MsgException("Error writing {} topic: {}".format(topic, str(e)))
 
     def read(self, topic):
         """
-        Read from one or several topics. it is non blocking returning None if nothing is available
+        Read from one or several topics.
         :param topic: can be str: single topic; or str list: several topics
         :return: topic, key, message; or None
         """
@@ -67,9 +75,9 @@ class MsgKafka(MsgBase):
             self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
                                              bootstrap_servers=self.broker)
             await self.producer.start()
-            await self.producer.send(topic=topic, key=key, value=msg)
+            await self.producer.send(topic=topic, key=key, value=yaml.safe_dump(msg, default_flow_style=True))
         except Exception as e:
-            raise MsgException("Error publishing to {} topic: {}".format(topic, str(e)))
+            raise MsgException("Error publishing topic '{}', key '{}': {}".format(topic, key, e))
         finally:
             await self.producer.stop()
 
@@ -104,4 +112,3 @@ class MsgKafka(MsgBase):
             raise MsgException(str(e))
         finally:
             await self.consumer.stop()
-