# For those usages not covered by the Apache License, Version 2.0 please
# contact: esousa@whitestack.com or alfonso.tiernosepulveda@telefonica.com
##
-
+import asyncio
import http
import logging
-import pytest
-import tempfile
-import shutil
-import uuid
import os
-import yaml
-import time
+import shutil
+import tempfile
import threading
-
+import time
from unittest.mock import MagicMock
+import uuid
+
from osm_common.msgbase import MsgException
from osm_common.msglocal import MsgLocal
+import pytest
+import yaml
__author__ = "Eduardo Sousa <eduardosousa@av.it.pt>"
(["topic", "topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
],
)
-def test_aioread(msg_local_with_data, event_loop, topics, datas):
+def test_aioread(msg_local_with_data, topics, datas):
def write_to_topic(topics, datas):
time.sleep(2)
for topic in topics:
t.start()
for topic in topics:
for data in datas:
- recv = event_loop.run_until_complete(
- msg_local_with_data.aioread(topic, event_loop)
- )
+ recv = asyncio.run(msg_local_with_data.aioread(topic))
recv_topic, recv_key, recv_msg = recv
key = list(data.keys())[0]
val = data[key]
t.join()
-def test_aioread_exception(msg_local_with_data, event_loop):
+def test_aioread_exception(msg_local_with_data):
msg_local_with_data.files_read = MagicMock()
msg_local_with_data.files_read.__contains__.side_effect = Exception()
with pytest.raises(MsgException) as excinfo:
- event_loop.run_until_complete(msg_local_with_data.aioread("topic1", event_loop))
+ asyncio.run(msg_local_with_data.aioread("topic1"))
assert str(excinfo.value).startswith(empty_exception_message())
assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
-def test_aioread_general_exception(msg_local_with_data, event_loop):
+def test_aioread_general_exception(msg_local_with_data):
msg_local_with_data.read = MagicMock()
msg_local_with_data.read.side_effect = Exception()
with pytest.raises(MsgException) as excinfo:
- event_loop.run_until_complete(msg_local_with_data.aioread("topic1", event_loop))
+ asyncio.run(msg_local_with_data.aioread("topic1"))
assert str(excinfo.value).startswith(empty_exception_message())
assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
("test_topic", "test_none", None),
],
)
-def test_aiowrite(msg_local_config, event_loop, topic, key, msg):
+def test_aiowrite(msg_local_config, topic, key, msg):
file_path = msg_local_config.path + topic
- event_loop.run_until_complete(msg_local_config.aiowrite(topic, key, msg))
+ asyncio.run(msg_local_config.aiowrite(topic, key, msg))
assert os.path.exists(file_path)
with open(file_path, "r") as stream:
("test_topic", "test_none", None, 3),
],
)
-def test_aiowrite_with_multiple_calls(
- msg_local_config, event_loop, topic, key, msg, times
-):
+def test_aiowrite_with_multiple_calls(msg_local_config, topic, key, msg, times):
file_path = msg_local_config.path + topic
for _ in range(times):
- event_loop.run_until_complete(msg_local_config.aiowrite(topic, key, msg))
+ asyncio.run(msg_local_config.aiowrite(topic, key, msg))
assert os.path.exists(file_path)
with open(file_path, "r") as stream:
}
-def test_aiowrite_exception(msg_local_config, event_loop):
+def test_aiowrite_exception(msg_local_config):
msg_local_config.files_write = MagicMock()
msg_local_config.files_write.__contains__.side_effect = Exception()
with pytest.raises(MsgException) as excinfo:
- event_loop.run_until_complete(msg_local_config.aiowrite("test", "test", "test"))
+ asyncio.run(msg_local_config.aiowrite("test", "test", "test"))
assert str(excinfo.value).startswith(empty_exception_message())
assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR