Fix imports to prepare for module distribution.
Fix errors detected by flake8.
Change-Id: Ib99df43fd5f86ccbb3e35111972a2075e7a956dd
Signed-off-by: tierno <alfonso.tiernosepulveda@telefonica.com>
diff --git a/osm_common/msgkafka.py b/osm_common/msgkafka.py
index c819c81..71e4948 100644
--- a/osm_common/msgkafka.py
+++ b/osm_common/msgkafka.py
@@ -4,11 +4,13 @@
from aiokafka import AIOKafkaConsumer
from aiokafka import AIOKafkaProducer
from aiokafka.errors import KafkaError
-from msgbase import MsgBase, MsgException
-#import json
+from osm_common.msgbase import MsgBase, MsgException
+# import json
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>, " \
"Guillermo Calvino <guillermo.calvinosanchez@altran.com>"
+
+
class MsgKafka(MsgBase):
def __init__(self, logger_name='msg'):
self.logger = logging.getLogger(logger_name)
@@ -67,9 +69,9 @@
self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
bootstrap_servers=self.broker)
await self.producer.start()
- await self.producer.send(topic=topic, key=key, value=msg)
+ await self.producer.send(topic=topic, key=key, value=yaml.safe_dump(msg, default_flow_style=True))
except Exception as e:
- raise MsgException("Error publishing to {} topic: {}".format(topic, str(e)))
+ raise MsgException("Error publishing topic '{}', key '{}': {}".format(topic, key, e))
finally:
await self.producer.stop()
@@ -104,4 +106,3 @@
raise MsgException(str(e))
finally:
await self.consumer.stop()
-