assert os.path.exists(file_path)
with open(file_path, 'r') as stream:
- assert yaml.load(stream) == {key: msg if not isinstance(msg, tuple) else list(msg)}
+ assert yaml.safe_load(stream) == {key: msg if not isinstance(msg, tuple) else list(msg)}
@pytest.mark.parametrize("topic, key, msg, times", [
with open(file_path, 'r') as stream:
for _ in range(times):
data = stream.readline()
- assert yaml.load(data) == {key: msg if not isinstance(msg, tuple) else list(msg)}
+ assert yaml.safe_load(data) == {key: msg if not isinstance(msg, tuple) else list(msg)}
def test_write_exception(msg_local_config):
assert os.path.exists(file_path)
with open(file_path, 'r') as stream:
- assert yaml.load(stream) == {key: msg if not isinstance(msg, tuple) else list(msg)}
+ assert yaml.safe_load(stream) == {key: msg if not isinstance(msg, tuple) else list(msg)}
@pytest.mark.parametrize("topic, key, msg, times", [
with open(file_path, 'r') as stream:
for _ in range(times):
data = stream.readline()
- assert yaml.load(data) == {key: msg if not isinstance(msg, tuple) else list(msg)}
+ assert yaml.safe_load(data) == {key: msg if not isinstance(msg, tuple) else list(msg)}
def test_aiowrite_exception(msg_local_config, event_loop):