+ assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
+
+
+@pytest.mark.parametrize(
+ "topic, key, msg",
+ [
+ ("test_topic", "test_key", "test_msg"),
+ ("test", "test_key", "test_msg"),
+ ("test_topic", "test", "test_msg"),
+ ("test_topic", "test_key", "test"),
+ ("test_topic", "test_list", ["a", "b", "c"]),
+ ("test_topic", "test_tuple", ("c", "b", "a")),
+ ("test_topic", "test_dict", {"a": 1, "b": 2, "c": 3}),
+ ("test_topic", "test_number", 123),
+ ("test_topic", "test_float", 1.23),
+ ("test_topic", "test_boolean", True),
+ ("test_topic", "test_none", None),
+ ],
+)
+def test_aiowrite(msg_local_config, topic, key, msg):
+ file_path = msg_local_config.path + topic
+ asyncio.run(msg_local_config.aiowrite(topic, key, msg))
+ assert os.path.exists(file_path)
+
+ with open(file_path, "r") as stream:
+ assert yaml.safe_load(stream) == {
+ key: msg if not isinstance(msg, tuple) else list(msg)
+ }
+
+
+@pytest.mark.parametrize(
+ "topic, key, msg, times",
+ [
+ ("test_topic", "test_key", "test_msg", 2),
+ ("test", "test_key", "test_msg", 3),
+ ("test_topic", "test", "test_msg", 4),
+ ("test_topic", "test_key", "test", 2),
+ ("test_topic", "test_list", ["a", "b", "c"], 3),
+ ("test_topic", "test_tuple", ("c", "b", "a"), 4),
+ ("test_topic", "test_dict", {"a": 1, "b": 2, "c": 3}, 2),
+ ("test_topic", "test_number", 123, 3),
+ ("test_topic", "test_float", 1.23, 4),
+ ("test_topic", "test_boolean", True, 2),
+ ("test_topic", "test_none", None, 3),
+ ],
+)
+def test_aiowrite_with_multiple_calls(msg_local_config, topic, key, msg, times):
+ file_path = msg_local_config.path + topic
+ for _ in range(times):
+ asyncio.run(msg_local_config.aiowrite(topic, key, msg))
+ assert os.path.exists(file_path)
+
+ with open(file_path, "r") as stream:
+ for _ in range(times):
+ data = stream.readline()
+ assert yaml.safe_load(data) == {
+ key: msg if not isinstance(msg, tuple) else list(msg)
+ }
+
+
+def test_aiowrite_exception(msg_local_config):
+ msg_local_config.files_write = MagicMock()
+ msg_local_config.files_write.__contains__.side_effect = Exception()
+
+ with pytest.raises(MsgException) as excinfo:
+ asyncio.run(msg_local_config.aiowrite("test", "test", "test"))
+ assert str(excinfo.value).startswith(empty_exception_message())
+ assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR