aef6fd631bedd8deb1713f774f4d164ea354581b
[osm/common.git] / osm_common / tests / test_msglocal.py
1 import http
2 import logging
3 import pytest
4 import tempfile
5 import shutil
6 import uuid
7 import os
8 import yaml
9 import time
10 import threading
11
12 from unittest.mock import MagicMock
13 from osm_common.msgbase import MsgException
14 from osm_common.msglocal import MsgLocal
15
16 __author__ = "Eduardo Sousa <eduardosousa@av.it.pt>"
17
def valid_path():
    """Return the system temporary directory with a trailing slash appended."""
    temp_root = tempfile.gettempdir()
    return "{}/".format(temp_root)
20
def invalid_path():
    """Return the fixed path string the tests use as an invalid location."""
    unusable_location = '/#tweeter/'
    return unusable_location
23
@pytest.fixture
def msg_local():
    """Yield an unconnected MsgLocal and clean up any directory the test made.

    Teardown only disconnects and removes the directory when the test left
    ``path`` set to something other than the shared temp directory or the
    invalid path.
    """
    instance = MsgLocal()

    yield instance

    protected_paths = (invalid_path(), valid_path())
    if instance.path and instance.path not in protected_paths:
        instance.disconnect()
        shutil.rmtree(instance.path)
33
@pytest.fixture
def msg_local_config():
    """Yield a MsgLocal connected to a fresh uuid-named directory under the temp dir.

    Teardown disconnects the instance and removes its directory unless the
    path equals the invalid path.
    """
    scratch_dir = valid_path() + str(uuid.uuid4())
    instance = MsgLocal()
    instance.connect({"path": scratch_dir})

    yield instance

    instance.disconnect()
    if instance.path == invalid_path():
        return
    shutil.rmtree(instance.path)
44
@pytest.fixture
def msg_local_with_data():
    """Yield a connected MsgLocal with eight messages already written.

    For each of the payloads "msg1" then "msg2", one message is written per
    (topic, key) pair over topics "topic1"/"topic2" and keys "key1"/"key2".
    Teardown disconnects and removes the directory unless the path equals
    the invalid path.
    """
    instance = MsgLocal()
    instance.connect({"path": valid_path() + str(uuid.uuid4())})

    # Nesting order (payload, topic, key) reproduces the original write order.
    for payload in ("msg1", "msg2"):
        for topic in ("topic1", "topic2"):
            for key in ("key1", "key2"):
                instance.write(topic, key, payload)

    yield instance

    instance.disconnect()
    if instance.path == invalid_path():
        return
    shutil.rmtree(instance.path)
64
def empty_exception_message():
    """Return the prefix the tests expect every MsgException message to start with."""
    expected_prefix = "messaging exception "
    return expected_prefix
67
def test_constructor():
    """A default-constructed MsgLocal uses the 'msg' logger and holds no state."""
    msg = MsgLocal()

    assert msg.logger == logging.getLogger('msg')
    # Identity check: `== None` could be satisfied by a custom __eq__.
    assert msg.path is None
    assert len(msg.files_read) == 0
    assert len(msg.files_write) == 0
    assert len(msg.buffer) == 0
76
def test_constructor_with_logger():
    """Passing ``logger_name`` selects that logger; all other state stays empty."""
    logger_name = 'msg_local'

    msg = MsgLocal(logger_name=logger_name)

    assert msg.logger == logging.getLogger(logger_name)
    # Identity check: `== None` could be satisfied by a custom __eq__.
    assert msg.path is None
    assert len(msg.files_read) == 0
    assert len(msg.files_write) == 0
    assert len(msg.buffer) == 0
87
@pytest.mark.parametrize(
    "config, logger_name, path",
    [
        ({"logger_name": "msg_local", "path": valid_path()},
         "msg_local", valid_path()),
        ({"logger_name": "msg_local", "path": valid_path()[:-1]},
         "msg_local", valid_path()),
        ({"logger_name": "msg_local", "path": valid_path() + "test_it/"},
         "msg_local", valid_path() + "test_it/"),
        ({"logger_name": "msg_local", "path": valid_path() + "test_it"},
         "msg_local", valid_path() + "test_it/"),
        ({"path": valid_path()}, "msg", valid_path()),
        ({"path": valid_path()[:-1]}, "msg", valid_path()),
        ({"path": valid_path() + "test_it/"}, "msg", valid_path() + "test_it/"),
        ({"path": valid_path() + "test_it"}, "msg", valid_path() + "test_it/"),
    ],
)
def test_connect(msg_local, config, logger_name, path):
    """connect() applies the configured logger and stores the path with a trailing slash.

    Each case supplies a config, the logger name expected afterwards, and
    the expected stored path; paths given without a trailing slash are
    expected to come back with one.
    """
    msg_local.connect(config)

    expected_logger = logging.getLogger(logger_name)
    assert msg_local.logger == expected_logger
    assert msg_local.path == path

    # Connecting must not open any file or buffer anything.
    for container in (msg_local.files_read, msg_local.files_write, msg_local.buffer):
        assert len(container) == 0
105
@pytest.mark.parametrize(
    "config",
    [
        {"logger_name": "msg_local", "path": invalid_path()},
        {"path": invalid_path()},
    ],
)
def test_connect_with_exception(msg_local, config):
    """connect() with the invalid path raises MsgException carrying HTTP 500."""
    with pytest.raises(MsgException) as excinfo:
        msg_local.connect(config)

    # Inspect the exception after the context manager has captured it.
    error = excinfo.value
    assert str(error).startswith(empty_exception_message())
    assert error.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
114
def test_disconnect(msg_local_config):
    """After disconnect(), every tracked read and write handle reports closed."""
    msg_local_config.disconnect()

    for handles in (msg_local_config.files_read, msg_local_config.files_write):
        for handle in handles.values():
            assert handle.closed
123
def test_disconnect_with_read(msg_local_config):
    """disconnect() closes handles after non-blocking reads on two topics."""
    for topic in ('topic1', 'topic2'):
        msg_local_config.read(topic, blocks=False)

    msg_local_config.disconnect()

    for handles in (msg_local_config.files_read, msg_local_config.files_write):
        for handle in handles.values():
            assert handle.closed
135
def test_disconnect_with_write(msg_local_with_data):
    """disconnect() closes all handles on an instance that has written messages."""
    msg_local_with_data.disconnect()

    for handles in (msg_local_with_data.files_read, msg_local_with_data.files_write):
        for handle in handles.values():
            assert handle.closed
144
def test_disconnect_with_read_and_write(msg_local_with_data):
    """disconnect() closes all handles after both writes and non-blocking reads."""
    for topic in ('topic1', 'topic2'):
        msg_local_with_data.read(topic, blocks=False)

    msg_local_with_data.disconnect()

    for handles in (msg_local_with_data.files_read, msg_local_with_data.files_write):
        for handle in handles.values():
            assert handle.closed
156
@pytest.mark.parametrize("topic, key, msg", [
    ("test_topic", "test_key", "test_msg"),
    ("test", "test_key", "test_msg"),
    ("test_topic", "test", "test_msg"),
    ("test_topic", "test_key", "test"),
    ("test_topic", "test_list", ["a", "b", "c"]),
    ("test_topic", "test_tuple", ("c", "b", "a")),
    ("test_topic", "test_dict", {"a": 1, "b": 2, "c": 3}),
    ("test_topic", "test_number", 123),
    ("test_topic", "test_float", 1.23),
    ("test_topic", "test_boolean", True),
    ("test_topic", "test_none", None)])
def test_write(msg_local_config, topic, key, msg):
    """write() creates a file named after the topic holding ``{key: msg}`` as YAML.

    Tuples are expected to be read back as lists, so the expected value is
    converted before comparing.
    """
    file_path = msg_local_config.path + topic

    msg_local_config.write(topic, key, msg)

    assert os.path.exists(file_path)

    expected = {key: list(msg) if isinstance(msg, tuple) else msg}
    with open(file_path, 'r') as stream:
        # yaml.load() without an explicit Loader is deprecated since
        # PyYAML 5.1 and raises TypeError on PyYAML 6; safe_load handles
        # the plain scalars, lists and mappings checked here.
        assert yaml.safe_load(stream) == expected
178
@pytest.mark.parametrize("topic, key, msg, times", [
    ("test_topic", "test_key", "test_msg", 2),
    ("test", "test_key", "test_msg", 3),
    ("test_topic", "test", "test_msg", 4),
    ("test_topic", "test_key", "test", 2),
    ("test_topic", "test_list", ["a", "b", "c"], 3),
    ("test_topic", "test_tuple", ("c", "b", "a"), 4),
    ("test_topic", "test_dict", {"a": 1, "b": 2, "c": 3}, 2),
    ("test_topic", "test_number", 123, 3),
    ("test_topic", "test_float", 1.23, 4),
    ("test_topic", "test_boolean", True, 2),
    ("test_topic", "test_none", None, 3)])
def test_write_with_multiple_calls(msg_local_config, topic, key, msg, times):
    """Repeated write() calls append one YAML document per line to the topic file.

    The file is read back line by line; each of the first ``times`` lines
    must parse to ``{key: msg}`` (tuples compared as lists).
    """
    file_path = msg_local_config.path + topic

    for _ in range(times):
        msg_local_config.write(topic, key, msg)

    assert os.path.exists(file_path)

    expected = {key: list(msg) if isinstance(msg, tuple) else msg}
    with open(file_path, 'r') as stream:
        for _ in range(times):
            data = stream.readline()
            # yaml.load() without an explicit Loader is deprecated since
            # PyYAML 5.1 and raises TypeError on PyYAML 6.
            assert yaml.safe_load(data) == expected
203
def test_write_exception(msg_local_config):
    """A failure inside write() surfaces as MsgException carrying HTTP 500.

    The failure is forced by replacing ``files_write`` with a mock whose
    membership test raises.
    """
    failing_registry = MagicMock()
    failing_registry.__contains__.side_effect = Exception()
    msg_local_config.files_write = failing_registry

    with pytest.raises(MsgException) as excinfo:
        msg_local_config.write("test", "test", "test")

    error = excinfo.value
    assert str(error).startswith(empty_exception_message())
    assert error.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
212
@pytest.mark.parametrize("topics, datas", [
    (["topic"], [{"key": "value"}]),
    (["topic1"], [{"key": "value"}]),
    (["topic2"], [{"key": "value"}]),
    (["topic", "topic1"], [{"key": "value"}]),
    (["topic", "topic2"], [{"key": "value"}]),
    (["topic1", "topic2"], [{"key": "value"}]),
    (["topic", "topic1", "topic2"], [{"key": "value"}]),
    (["topic"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic1"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic2"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic", "topic1"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic", "topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}])])
def test_read(msg_local_with_data, topics, datas):
    """Blocking read() returns messages appended later by another thread.

    A background thread waits two seconds and then appends every entry of
    ``datas`` to every topic file; the main thread meanwhile calls the
    default (blocking) read() and checks each (topic, key, msg) triple in
    write order.
    """
    def write_to_topic(topics, datas):
        # Allow msglocal to block while waiting
        time.sleep(2)
        for topic in topics:
            for data in datas:
                with open(msg_local_with_data.path + topic, "a+") as fp:
                    yaml.safe_dump(data, fp, default_flow_style=True, width=20000)
                    fp.flush()

    # If file is not opened first, the messages written won't be seen
    for topic in topics:
        if topic not in msg_local_with_data.files_read:
            msg_local_with_data.read(topic, blocks=False)

    t = threading.Thread(target=write_to_topic, args=(topics, datas))
    t.start()

    try:
        for topic in topics:
            for data in datas:
                recv_topic, recv_key, recv_msg = msg_local_with_data.read(topic)

                # Each data dict holds exactly one key/value pair.
                key, val = next(iter(data.items()))

                assert recv_topic == topic
                assert recv_key == key
                assert recv_msg == val
    finally:
        # Join even when an assertion fails so the writer thread cannot
        # outlive the test and race with the fixture's directory cleanup.
        t.join()
258
@pytest.mark.parametrize(
    "topics, datas",
    [
        (["topic"], [{"key": "value"}]),
        (["topic1"], [{"key": "value"}]),
        (["topic2"], [{"key": "value"}]),
        (["topic", "topic1"], [{"key": "value"}]),
        (["topic", "topic2"], [{"key": "value"}]),
        (["topic1", "topic2"], [{"key": "value"}]),
        (["topic", "topic1", "topic2"], [{"key": "value"}]),
        (["topic"], [{"key": "value"}, {"key1": "value1"}]),
        (["topic1"], [{"key": "value"}, {"key1": "value1"}]),
        (["topic2"], [{"key": "value"}, {"key1": "value1"}]),
        (["topic", "topic1"], [{"key": "value"}, {"key1": "value1"}]),
        (["topic", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
        (["topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
        (["topic", "topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
    ],
)
def test_read_non_block(msg_local_with_data, topics, datas):
    """Non-blocking read() returns messages that are already in the topic file.

    The writer thread is started and joined before any read, so every
    message is on disk by the time read(blocks=False) is called.
    """
    def append_messages(topic_names, payloads):
        for name in topic_names:
            for payload in payloads:
                with open(msg_local_with_data.path + name, "a+") as topic_file:
                    yaml.safe_dump(payload, topic_file, default_flow_style=True, width=20000)
                    topic_file.flush()

    # If file is not opened first, the messages written won't be seen
    for name in topics:
        if name not in msg_local_with_data.files_read:
            msg_local_with_data.read(name, blocks=False)

    writer = threading.Thread(target=append_messages, args=(topics, datas))
    writer.start()
    writer.join()

    for name in topics:
        for payload in datas:
            received = msg_local_with_data.read(name, blocks=False)
            recv_topic, recv_key, recv_msg = received

            # Each payload dict holds exactly one key/value pair.
            expected_key = next(iter(payload))
            expected_msg = payload[expected_key]

            assert recv_topic == name
            assert recv_key == expected_key
            assert recv_msg == expected_msg
301
@pytest.mark.parametrize("topics, datas", [
    (["topic"], [{"key": "value"}]),
    (["topic1"], [{"key": "value"}]),
    (["topic2"], [{"key": "value"}]),
    (["topic", "topic1"], [{"key": "value"}]),
    (["topic", "topic2"], [{"key": "value"}]),
    (["topic1", "topic2"], [{"key": "value"}]),
    (["topic", "topic1", "topic2"], [{"key": "value"}]),
    (["topic"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic1"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic2"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic", "topic1"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic", "topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}])])
def test_read_non_block_none(msg_local_with_data, topics, datas):
    """Non-blocking read() returns None while no new message has been written.

    The writer thread sleeps two seconds before appending anything, so the
    immediate read(blocks=False) calls are expected to find nothing.
    """
    def write_to_topic(topics, datas):
        # Delay the writes so the reads below run before any data arrives.
        time.sleep(2)
        for topic in topics:
            for data in datas:
                with open(msg_local_with_data.path + topic, "a+") as fp:
                    yaml.safe_dump(data, fp, default_flow_style=True, width=20000)
                    fp.flush()

    # If file is not opened first, the messages written won't be seen
    for topic in topics:
        if topic not in msg_local_with_data.files_read:
            msg_local_with_data.read(topic, blocks=False)

    t = threading.Thread(target=write_to_topic, args=(topics, datas))
    t.start()

    try:
        for topic in topics:
            recv_data = msg_local_with_data.read(topic, blocks=False)

            # Identity check: `== None` could be satisfied by a custom __eq__.
            assert recv_data is None
    finally:
        # Join even when an assertion fails so the writer thread cannot
        # outlive the test and race with the fixture's directory cleanup.
        t.join()
340
@pytest.mark.parametrize("blocks", [True, False])
def test_read_exception(msg_local_with_data, blocks):
    """A failure inside read() surfaces as MsgException carrying HTTP 500.

    The failure is forced by replacing ``files_read`` with a mock whose
    membership test raises; both blocking and non-blocking modes are covered.
    """
    failing_registry = MagicMock()
    failing_registry.__contains__.side_effect = Exception()
    msg_local_with_data.files_read = failing_registry

    with pytest.raises(MsgException) as excinfo:
        msg_local_with_data.read("topic1", blocks=blocks)

    error = excinfo.value
    assert str(error).startswith(empty_exception_message())
    assert error.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
352
@pytest.mark.parametrize("topics, datas", [
    (["topic"], [{"key": "value"}]),
    (["topic1"], [{"key": "value"}]),
    (["topic2"], [{"key": "value"}]),
    (["topic", "topic1"], [{"key": "value"}]),
    (["topic", "topic2"], [{"key": "value"}]),
    (["topic1", "topic2"], [{"key": "value"}]),
    (["topic", "topic1", "topic2"], [{"key": "value"}]),
    (["topic"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic1"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic2"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic", "topic1"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic", "topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}])])
def test_aioread(msg_local_with_data, event_loop, topics, datas):
    """aioread() returns messages appended later by another thread.

    A background thread waits two seconds and then appends every entry of
    ``datas`` to every topic file; the main thread drives aioread() on the
    supplied event loop and checks each (topic, key, msg) triple in write
    order.
    """
    def write_to_topic(topics, datas):
        # Delay the writes so aioread() is already waiting when data arrives.
        time.sleep(2)
        for topic in topics:
            for data in datas:
                with open(msg_local_with_data.path + topic, "a+") as fp:
                    yaml.safe_dump(data, fp, default_flow_style=True, width=20000)
                    fp.flush()

    # If file is not opened first, the messages written won't be seen
    for topic in topics:
        if topic not in msg_local_with_data.files_read:
            msg_local_with_data.read(topic, blocks=False)

    t = threading.Thread(target=write_to_topic, args=(topics, datas))
    t.start()

    try:
        for topic in topics:
            for data in datas:
                recv = event_loop.run_until_complete(msg_local_with_data.aioread(topic, event_loop))
                recv_topic, recv_key, recv_msg = recv

                # Each data dict holds exactly one key/value pair.
                key, val = next(iter(data.items()))

                assert recv_topic == topic
                assert recv_key == key
                assert recv_msg == val
    finally:
        # Join even when an assertion fails so the writer thread cannot
        # outlive the test and race with the fixture's directory cleanup.
        t.join()
398
def test_aioread_exception(msg_local_with_data, event_loop):
    """A failure while aioread() reads surfaces as MsgException carrying HTTP 500.

    The failure is forced by replacing ``files_read`` with a mock whose
    membership test raises.
    """
    failing_registry = MagicMock()
    failing_registry.__contains__.side_effect = Exception()
    msg_local_with_data.files_read = failing_registry

    with pytest.raises(MsgException) as excinfo:
        pending = msg_local_with_data.aioread("topic1", event_loop)
        event_loop.run_until_complete(pending)

    error = excinfo.value
    assert str(error).startswith(empty_exception_message())
    assert error.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
407
def test_aioread_general_exception(msg_local_with_data, event_loop):
    """A generic exception raised by read() is reported by aioread() as MsgException.

    ``read`` itself is replaced with a mock that raises a plain Exception.
    """
    msg_local_with_data.read = MagicMock(side_effect=Exception())

    with pytest.raises(MsgException) as excinfo:
        pending = msg_local_with_data.aioread("topic1", event_loop)
        event_loop.run_until_complete(pending)

    error = excinfo.value
    assert str(error).startswith(empty_exception_message())
    assert error.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
416
@pytest.mark.parametrize("topic, key, msg", [
    ("test_topic", "test_key", "test_msg"),
    ("test", "test_key", "test_msg"),
    ("test_topic", "test", "test_msg"),
    ("test_topic", "test_key", "test"),
    ("test_topic", "test_list", ["a", "b", "c"]),
    ("test_topic", "test_tuple", ("c", "b", "a")),
    ("test_topic", "test_dict", {"a": 1, "b": 2, "c": 3}),
    ("test_topic", "test_number", 123),
    ("test_topic", "test_float", 1.23),
    ("test_topic", "test_boolean", True),
    ("test_topic", "test_none", None)])
def test_aiowrite(msg_local_config, event_loop, topic, key, msg):
    """aiowrite() creates a file named after the topic holding ``{key: msg}`` as YAML.

    Tuples are expected to be read back as lists, so the expected value is
    converted before comparing.
    """
    file_path = msg_local_config.path + topic

    event_loop.run_until_complete(msg_local_config.aiowrite(topic, key, msg))

    assert os.path.exists(file_path)

    expected = {key: list(msg) if isinstance(msg, tuple) else msg}
    with open(file_path, 'r') as stream:
        # yaml.load() without an explicit Loader is deprecated since
        # PyYAML 5.1 and raises TypeError on PyYAML 6; safe_load handles
        # the plain scalars, lists and mappings checked here.
        assert yaml.safe_load(stream) == expected
438
@pytest.mark.parametrize("topic, key, msg, times", [
    ("test_topic", "test_key", "test_msg", 2),
    ("test", "test_key", "test_msg", 3),
    ("test_topic", "test", "test_msg", 4),
    ("test_topic", "test_key", "test", 2),
    ("test_topic", "test_list", ["a", "b", "c"], 3),
    ("test_topic", "test_tuple", ("c", "b", "a"), 4),
    ("test_topic", "test_dict", {"a": 1, "b": 2, "c": 3}, 2),
    ("test_topic", "test_number", 123, 3),
    ("test_topic", "test_float", 1.23, 4),
    ("test_topic", "test_boolean", True, 2),
    ("test_topic", "test_none", None, 3)])
def test_aiowrite_with_multiple_calls(msg_local_config, event_loop, topic, key, msg, times):
    """Repeated aiowrite() calls append one YAML document per line to the topic file.

    The file is read back line by line; each of the first ``times`` lines
    must parse to ``{key: msg}`` (tuples compared as lists).
    """
    file_path = msg_local_config.path + topic

    for _ in range(times):
        event_loop.run_until_complete(msg_local_config.aiowrite(topic, key, msg))

    assert os.path.exists(file_path)

    expected = {key: list(msg) if isinstance(msg, tuple) else msg}
    with open(file_path, 'r') as stream:
        for _ in range(times):
            data = stream.readline()
            # yaml.load() without an explicit Loader is deprecated since
            # PyYAML 5.1 and raises TypeError on PyYAML 6.
            assert yaml.safe_load(data) == expected
463
def test_aiowrite_exception(msg_local_config, event_loop):
    """A failure inside aiowrite() surfaces as MsgException carrying HTTP 500.

    The failure is forced by replacing ``files_write`` with a mock whose
    membership test raises.
    """
    failing_registry = MagicMock()
    failing_registry.__contains__.side_effect = Exception()
    msg_local_config.files_write = failing_registry

    with pytest.raises(MsgException) as excinfo:
        pending = msg_local_config.aiowrite("test", "test", "test")
        event_loop.run_until_complete(pending)

    error = excinfo.value
    assert str(error).startswith(empty_exception_message())
    assert error.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR