:param msg: message content, can be string or dictionary
:return: None or raises MsgException on failing
"""
- try:
- self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, msg=msg))
-
- except Exception as e:
- raise MsgException("Error writing {} topic: {}".format(topic, str(e)))
+ retry = 2 # Try two times
+ while retry:
+ try:
+ self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, msg=msg))
+ break
+ except Exception as e:
+ retry -= 1
+ if retry == 0:
+ raise MsgException("Error writing {} topic: {}".format(topic, str(e)))
def read(self, topic):
"""
async for message in self.consumer:
if callback:
- callback(message.topic, yaml.load(message.key), yaml.load(message.value), **kwargs)
+ callback(message.topic, yaml.safe_load(message.key), yaml.safe_load(message.value), **kwargs)
elif aiocallback:
- await aiocallback(message.topic, yaml.load(message.key), yaml.load(message.value), **kwargs)
+ await aiocallback(message.topic, yaml.safe_load(message.key), yaml.safe_load(message.value),
+ **kwargs)
else:
- return message.topic, yaml.load(message.key), yaml.load(message.value)
+ return message.topic, yaml.safe_load(message.key), yaml.safe_load(message.value)
except KafkaError as e:
raise MsgException(str(e))
finally: