except Exception as e:
raise MsgException("Error reading {} topic: {}".format(topic, str(e)))
- async def aiowrite(self, topic, key, msg, loop):
+ async def aiowrite(self, topic, key, msg, loop=None):
if not loop:
loop = self.loop
self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
bootstrap_servers=self.broker)
await self.producer.start()
- await self.producer.send(topic=topic, key=key, value=msg)
+ await self.producer.send(topic=topic, key=key, value=yaml.safe_dump(msg, default_flow_style=True))
except Exception as e:
- raise MsgException("Error publishing to {} topic: {}".format(topic, str(e)))
+ raise MsgException("Error publishing topic '{}', key '{}': {}".format(topic, key, e))
finally:
await self.producer.stop()