fixing imports to get ready for module distribution.
[osm/common.git] / osm_common / msgkafka.py
index c819c81..71e4948 100644 (file)
@@ -4,11 +4,13 @@ import yaml
 from aiokafka import AIOKafkaConsumer
 from aiokafka import AIOKafkaProducer
 from aiokafka.errors import KafkaError
-from msgbase import MsgBase, MsgException
-#import json
+from osm_common.msgbase import MsgBase, MsgException
+# import json
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>, " \
              "Guillermo Calvino <guillermo.calvinosanchez@altran.com>"
+
+
 class MsgKafka(MsgBase):
     def __init__(self, logger_name='msg'):
         self.logger = logging.getLogger(logger_name)
@@ -67,9 +69,9 @@ class MsgKafka(MsgBase):
             self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
                                              bootstrap_servers=self.broker)
             await self.producer.start()
-            await self.producer.send(topic=topic, key=key, value=msg)
+            await self.producer.send(topic=topic, key=key, value=yaml.safe_dump(msg, default_flow_style=True))
         except Exception as e:
-            raise MsgException("Error publishing to {} topic: {}".format(topic, str(e)))
+            raise MsgException("Error publishing topic '{}', key '{}': {}".format(topic, key, e))
         finally:
             await self.producer.stop()
 
@@ -104,4 +106,3 @@ class MsgKafka(MsgBase):
             raise MsgException(str(e))
         finally:
             await self.consumer.stop()
-