async for message in self.consumer:
if callback:
- callback(message.topic, yaml.load(message.key), yaml.load(message.value), **kwargs)
+ callback(message.topic, yaml.safe_load(message.key), yaml.safe_load(message.value), **kwargs)
elif aiocallback:
- await aiocallback(message.topic, yaml.load(message.key), yaml.load(message.value), **kwargs)
+ await aiocallback(message.topic, yaml.safe_load(message.key), yaml.safe_load(message.value),
+ **kwargs)
else:
- return message.topic, yaml.load(message.key), yaml.load(message.value)
+ return message.topic, yaml.safe_load(message.key), yaml.safe_load(message.value)
except KafkaError as e:
raise MsgException(str(e))
finally: