93cd19bd4961b5430b59a236cf1f49856a9742be
[osm/common.git] / osm_common / tests / test_msglocal.py
1 import http
2 import logging
3 import pytest
4 import tempfile
5 import shutil
6 import uuid
7 import os
8 import yaml
9 import time
10 import threading
11
12 from unittest.mock import MagicMock
13 from osm_common.msgbase import MsgException
14 from osm_common.msglocal import MsgLocal
15
16 __author__ = "Eduardo Sousa <eduardosousa@av.it.pt>"
17
18 def valid_path():
19 return tempfile.gettempdir() + '/'
20
21 def invalid_path():
22 return '/#tweeter/'
23
24 @pytest.fixture
25 def msg_local():
26 msg = MsgLocal()
27
28 yield msg
29
30 if msg.path and msg.path != invalid_path() and msg.path != valid_path():
31 msg.disconnect()
32 shutil.rmtree(msg.path)
33
34 @pytest.fixture
35 def msg_local_config():
36 msg = MsgLocal()
37 msg.connect({"path": valid_path() + str(uuid.uuid4())})
38
39 yield msg
40
41 msg.disconnect()
42 if msg.path != invalid_path():
43 shutil.rmtree(msg.path)
44
45 @pytest.fixture
46 def msg_local_with_data():
47 msg = MsgLocal()
48 msg.connect({"path": valid_path() + str(uuid.uuid4())})
49
50 msg.write("topic1", "key1", "msg1")
51 msg.write("topic1", "key2", "msg1")
52 msg.write("topic2", "key1", "msg1")
53 msg.write("topic2", "key2", "msg1")
54 msg.write("topic1", "key1", "msg2")
55 msg.write("topic1", "key2", "msg2")
56 msg.write("topic2", "key1", "msg2")
57 msg.write("topic2", "key2", "msg2")
58
59 yield msg
60
61 msg.disconnect()
62 if msg.path != invalid_path():
63 shutil.rmtree(msg.path)
64
65 def empty_exception_message():
66 return "messaging exception "
67
68 def test_constructor():
69 msg = MsgLocal()
70
71 assert msg.logger == logging.getLogger('msg')
72 assert msg.path == None
73 assert len(msg.files) == 0
74
75 def test_constructor_with_logger():
76 logger_name = 'msg_local'
77
78 msg = MsgLocal(logger_name=logger_name)
79
80 assert msg.logger == logging.getLogger(logger_name)
81 assert msg.path == None
82 assert len(msg.files) == 0
83
84 @pytest.mark.parametrize("config, logger_name, path", [
85 ({"logger_name": "msg_local", "path": valid_path()}, "msg_local", valid_path()),
86 ({"logger_name": "msg_local", "path": valid_path()[:-1]}, "msg_local", valid_path()),
87 ({"logger_name": "msg_local", "path": valid_path() + "test_it/"}, "msg_local", valid_path() + "test_it/"),
88 ({"logger_name": "msg_local", "path": valid_path() + "test_it"}, "msg_local", valid_path() + "test_it/"),
89 ({"path": valid_path()}, "msg", valid_path()),
90 ({"path": valid_path()[:-1]}, "msg", valid_path()),
91 ({"path": valid_path() + "test_it/"}, "msg", valid_path() + "test_it/"),
92 ({"path": valid_path() + "test_it"}, "msg", valid_path() + "test_it/")])
93 def test_connect(msg_local, config, logger_name, path):
94 msg_local.connect(config)
95
96 assert msg_local.logger == logging.getLogger(logger_name)
97 assert msg_local.path == path
98 assert len(msg_local.files) == 0
99
100 @pytest.mark.parametrize("config", [
101 ({"logger_name": "msg_local", "path": invalid_path()}),
102 ({"path": invalid_path()})])
103 def test_connect_with_exception(msg_local, config):
104 with pytest.raises(MsgException) as excinfo:
105 msg_local.connect(config)
106 assert str(excinfo.value).startswith(empty_exception_message())
107 assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
108
109 def test_disconnect():
110 pass
111
112 @pytest.mark.parametrize("topic, key, msg", [
113 ("test_topic", "test_key", "test_msg"),
114 ("test", "test_key", "test_msg"),
115 ("test_topic", "test", "test_msg"),
116 ("test_topic", "test_key", "test"),
117 ("test_topic", "test_list", ["a", "b", "c"]),
118 ("test_topic", "test_tuple", ("c", "b", "a")),
119 ("test_topic", "test_dict", {"a": 1, "b": 2, "c": 3}),
120 ("test_topic", "test_number", 123),
121 ("test_topic", "test_float", 1.23),
122 ("test_topic", "test_boolean", True),
123 ("test_topic", "test_none", None)])
124 def test_write(msg_local_config, topic, key, msg):
125 file_path = msg_local_config.path + topic
126
127 msg_local_config.write(topic, key, msg)
128
129 assert os.path.exists(file_path)
130
131 with open(file_path, 'r') as stream:
132 assert yaml.load(stream) == {key: msg if not isinstance(msg, tuple) else list(msg)}
133
134 @pytest.mark.parametrize("topic, key, msg, times", [
135 ("test_topic", "test_key", "test_msg", 2),
136 ("test", "test_key", "test_msg", 3),
137 ("test_topic", "test", "test_msg", 4),
138 ("test_topic", "test_key", "test", 2),
139 ("test_topic", "test_list", ["a", "b", "c"], 3),
140 ("test_topic", "test_tuple", ("c", "b", "a"), 4),
141 ("test_topic", "test_dict", {"a": 1, "b": 2, "c": 3}, 2),
142 ("test_topic", "test_number", 123, 3),
143 ("test_topic", "test_float", 1.23, 4),
144 ("test_topic", "test_boolean", True, 2),
145 ("test_topic", "test_none", None, 3)])
146 def test_write_with_multiple_calls(msg_local_config, topic, key, msg, times):
147 file_path = msg_local_config.path + topic
148
149 for _ in range(times):
150 msg_local_config.write(topic, key, msg)
151
152 assert os.path.exists(file_path)
153
154 with open(file_path, 'r') as stream:
155 for _ in range(times):
156 data = stream.readline()
157 assert yaml.load(data) == {key: msg if not isinstance(msg, tuple) else list(msg)}
158
159 def test_write_exception(msg_local_config):
160 msg_local_config.files = MagicMock()
161 msg_local_config.files.__contains__.side_effect = Exception()
162
163 with pytest.raises(MsgException) as excinfo:
164 msg_local_config.write("test", "test", "test")
165 assert str(excinfo.value).startswith(empty_exception_message())
166 assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
167
168 @pytest.mark.parametrize("topics, datas", [
169 (["topic"], [{"key": "value"}]),
170 (["topic1"], [{"key": "value"}]),
171 (["topic2"], [{"key": "value"}]),
172 (["topic", "topic1"], [{"key": "value"}]),
173 (["topic", "topic2"], [{"key": "value"}]),
174 (["topic1", "topic2"], [{"key": "value"}]),
175 (["topic", "topic1", "topic2"], [{"key": "value"}]),
176 (["topic"], [{"key": "value"}, {"key1": "value1"}]),
177 (["topic1"], [{"key": "value"}, {"key1": "value1"}]),
178 (["topic2"], [{"key": "value"}, {"key1": "value1"}]),
179 (["topic", "topic1"], [{"key": "value"}, {"key1": "value1"}]),
180 (["topic", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
181 (["topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
182 (["topic", "topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}])])
183 def test_read(msg_local_with_data, topics, datas):
184 def write_to_topic(topics, datas):
185 time.sleep(2)
186 for topic in topics:
187 for data in datas:
188 with open(msg_local_with_data.path + topic, "a+") as fp:
189 yaml.safe_dump(data, fp, default_flow_style=True, width=20000)
190 fp.flush()
191
192 # If file is not opened first, the messages written won't be seen
193 for topic in topics:
194 if topic not in msg_local_with_data.files:
195 msg_local_with_data.read(topic, blocks=False)
196
197 t = threading.Thread(target=write_to_topic, args=(topics, datas))
198 t.start()
199
200 for topic in topics:
201 for data in datas:
202 recv_topic, recv_key, recv_msg = msg_local_with_data.read(topic)
203
204 key = list(data.keys())[0]
205 val = data[key]
206
207 assert recv_topic == topic
208 assert recv_key == key
209 assert recv_msg == val
210
211 t.join()
212
213 @pytest.mark.parametrize("topics, datas", [
214 (["topic"], [{"key": "value"}]),
215 (["topic1"], [{"key": "value"}]),
216 (["topic2"], [{"key": "value"}]),
217 (["topic", "topic1"], [{"key": "value"}]),
218 (["topic", "topic2"], [{"key": "value"}]),
219 (["topic1", "topic2"], [{"key": "value"}]),
220 (["topic", "topic1", "topic2"], [{"key": "value"}]),
221 (["topic"], [{"key": "value"}, {"key1": "value1"}]),
222 (["topic1"], [{"key": "value"}, {"key1": "value1"}]),
223 (["topic2"], [{"key": "value"}, {"key1": "value1"}]),
224 (["topic", "topic1"], [{"key": "value"}, {"key1": "value1"}]),
225 (["topic", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
226 (["topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
227 (["topic", "topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}])])
228 def test_read_non_block(msg_local_with_data, topics, datas):
229 def write_to_topic(topics, datas):
230 for topic in topics:
231 for data in datas:
232 with open(msg_local_with_data.path + topic, "a+") as fp:
233 yaml.safe_dump(data, fp, default_flow_style=True, width=20000)
234 fp.flush()
235
236 # If file is not opened first, the messages written won't be seen
237 for topic in topics:
238 if topic not in msg_local_with_data.files:
239 msg_local_with_data.read(topic, blocks=False)
240
241 t = threading.Thread(target=write_to_topic, args=(topics, datas))
242 t.start()
243
244 time.sleep(2)
245
246 for topic in topics:
247 for data in datas:
248 recv_topic, recv_key, recv_msg = msg_local_with_data.read(topic, blocks=False)
249
250 key = list(data.keys())[0]
251 val = data[key]
252
253 assert recv_topic == topic
254 assert recv_key == key
255 assert recv_msg == val
256
257 t.join()
258
259 @pytest.mark.parametrize("topics, datas", [
260 (["topic"], [{"key": "value"}]),
261 (["topic1"], [{"key": "value"}]),
262 (["topic2"], [{"key": "value"}]),
263 (["topic", "topic1"], [{"key": "value"}]),
264 (["topic", "topic2"], [{"key": "value"}]),
265 (["topic1", "topic2"], [{"key": "value"}]),
266 (["topic", "topic1", "topic2"], [{"key": "value"}]),
267 (["topic"], [{"key": "value"}, {"key1": "value1"}]),
268 (["topic1"], [{"key": "value"}, {"key1": "value1"}]),
269 (["topic2"], [{"key": "value"}, {"key1": "value1"}]),
270 (["topic", "topic1"], [{"key": "value"}, {"key1": "value1"}]),
271 (["topic", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
272 (["topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
273 (["topic", "topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}])])
274 def test_read_non_block_none(msg_local_with_data, topics, datas):
275 def write_to_topic(topics, datas):
276 time.sleep(2)
277 for topic in topics:
278 for data in datas:
279 with open(msg_local_with_data.path + topic, "a+") as fp:
280 yaml.safe_dump(data, fp, default_flow_style=True, width=20000)
281 fp.flush()
282
283 # If file is not opened first, the messages written won't be seen
284 for topic in topics:
285 if topic not in msg_local_with_data.files:
286 msg_local_with_data.read(topic, blocks=False)
287
288 t = threading.Thread(target=write_to_topic, args=(topics, datas))
289 t.start()
290
291 for topic in topics:
292 recv_data = msg_local_with_data.read(topic, blocks=False)
293
294 assert recv_data == None
295
296 t.join()
297
298 @pytest.mark.parametrize("blocks", [
299 (True),
300 (False)])
301 def test_read_exception(msg_local_with_data, blocks):
302 msg_local_with_data.files = MagicMock()
303 msg_local_with_data.files.__contains__.side_effect = Exception()
304
305 with pytest.raises(MsgException) as excinfo:
306 msg_local_with_data.read("topic1", blocks=blocks)
307 assert str(excinfo.value).startswith(empty_exception_message())
308 assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
309
310 @pytest.mark.parametrize("topics, datas", [
311 (["topic"], [{"key": "value"}]),
312 (["topic1"], [{"key": "value"}]),
313 (["topic2"], [{"key": "value"}]),
314 (["topic", "topic1"], [{"key": "value"}]),
315 (["topic", "topic2"], [{"key": "value"}]),
316 (["topic1", "topic2"], [{"key": "value"}]),
317 (["topic", "topic1", "topic2"], [{"key": "value"}]),
318 (["topic"], [{"key": "value"}, {"key1": "value1"}]),
319 (["topic1"], [{"key": "value"}, {"key1": "value1"}]),
320 (["topic2"], [{"key": "value"}, {"key1": "value1"}]),
321 (["topic", "topic1"], [{"key": "value"}, {"key1": "value1"}]),
322 (["topic", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
323 (["topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
324 (["topic", "topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}])])
325 def test_aioread(msg_local_with_data, event_loop, topics, datas):
326 def write_to_topic(topics, datas):
327 time.sleep(2)
328 for topic in topics:
329 for data in datas:
330 with open(msg_local_with_data.path + topic, "a+") as fp:
331 yaml.safe_dump(data, fp, default_flow_style=True, width=20000)
332 fp.flush()
333
334 # If file is not opened first, the messages written won't be seen
335 for topic in topics:
336 if topic not in msg_local_with_data.files:
337 msg_local_with_data.read(topic, blocks=False)
338
339 t = threading.Thread(target=write_to_topic, args=(topics, datas))
340 t.start()
341
342 for topic in topics:
343 for data in datas:
344 recv = event_loop.run_until_complete(msg_local_with_data.aioread(topic, event_loop))
345 recv_topic, recv_key, recv_msg = recv
346
347 key = list(data.keys())[0]
348 val = data[key]
349
350 assert recv_topic == topic
351 assert recv_key == key
352 assert recv_msg == val
353
354 t.join()
355
356 def test_aioread_exception(msg_local_with_data, event_loop):
357 msg_local_with_data.files = MagicMock()
358 msg_local_with_data.files.__contains__.side_effect = Exception()
359
360 with pytest.raises(MsgException) as excinfo:
361 event_loop.run_until_complete(msg_local_with_data.aioread("topic1", event_loop))
362 assert str(excinfo.value).startswith(empty_exception_message())
363 assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
364
365 def test_aioread_general_exception(msg_local_with_data, event_loop):
366 msg_local_with_data.read = MagicMock()
367 msg_local_with_data.read.side_effect = Exception()
368
369 with pytest.raises(MsgException) as excinfo:
370 event_loop.run_until_complete(msg_local_with_data.aioread("topic1", event_loop))
371 assert str(excinfo.value).startswith(empty_exception_message())
372 assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR