Make common methods threading safe. pytest enhancements
[osm/common.git] / osm_common / tests / test_msglocal.py
1 import http
2 import logging
3 import pytest
4 import tempfile
5 import shutil
6 import uuid
7 import os
8 import yaml
9 import time
10 import threading
11
12 from unittest.mock import MagicMock
13 from osm_common.msgbase import MsgException
14 from osm_common.msglocal import MsgLocal
15
16 __author__ = "Eduardo Sousa <eduardosousa@av.it.pt>"
17
18
19 def valid_path():
20 return tempfile.gettempdir() + '/'
21
22
23 def invalid_path():
24 return '/#tweeter/'
25
26
27 @pytest.fixture(scope="function", params=[True, False])
28 def msg_local(request):
29 msg = MsgLocal(lock=request.param)
30 yield msg
31
32 msg.disconnect()
33 if msg.path and msg.path != invalid_path() and msg.path != valid_path():
34 shutil.rmtree(msg.path)
35
36
37 @pytest.fixture(scope="function", params=[True, False])
38 def msg_local_config(request):
39 msg = MsgLocal(lock=request.param)
40 msg.connect({"path": valid_path() + str(uuid.uuid4())})
41 yield msg
42
43 msg.disconnect()
44 if msg.path != invalid_path():
45 shutil.rmtree(msg.path)
46
47
48 @pytest.fixture(scope="function", params=[True, False])
49 def msg_local_with_data(request):
50 msg = MsgLocal(lock=request.param)
51 msg.connect({"path": valid_path() + str(uuid.uuid4())})
52
53 msg.write("topic1", "key1", "msg1")
54 msg.write("topic1", "key2", "msg1")
55 msg.write("topic2", "key1", "msg1")
56 msg.write("topic2", "key2", "msg1")
57 msg.write("topic1", "key1", "msg2")
58 msg.write("topic1", "key2", "msg2")
59 msg.write("topic2", "key1", "msg2")
60 msg.write("topic2", "key2", "msg2")
61 yield msg
62
63 msg.disconnect()
64 if msg.path != invalid_path():
65 shutil.rmtree(msg.path)
66
67
68 def empty_exception_message():
69 return "messaging exception "
70
71
72 def test_constructor():
73 msg = MsgLocal()
74 assert msg.logger == logging.getLogger('msg')
75 assert msg.path is None
76 assert len(msg.files_read) == 0
77 assert len(msg.files_write) == 0
78 assert len(msg.buffer) == 0
79
80
81 def test_constructor_with_logger():
82 logger_name = 'msg_local'
83 msg = MsgLocal(logger_name=logger_name)
84 assert msg.logger == logging.getLogger(logger_name)
85 assert msg.path is None
86 assert len(msg.files_read) == 0
87 assert len(msg.files_write) == 0
88 assert len(msg.buffer) == 0
89
90
91 @pytest.mark.parametrize("config, logger_name, path", [
92 ({"logger_name": "msg_local", "path": valid_path()}, "msg_local", valid_path()),
93 ({"logger_name": "msg_local", "path": valid_path()[:-1]}, "msg_local", valid_path()),
94 ({"logger_name": "msg_local", "path": valid_path() + "test_it/"}, "msg_local", valid_path() + "test_it/"),
95 ({"logger_name": "msg_local", "path": valid_path() + "test_it"}, "msg_local", valid_path() + "test_it/"),
96 ({"path": valid_path()}, "msg", valid_path()),
97 ({"path": valid_path()[:-1]}, "msg", valid_path()),
98 ({"path": valid_path() + "test_it/"}, "msg", valid_path() + "test_it/"),
99 ({"path": valid_path() + "test_it"}, "msg", valid_path() + "test_it/")])
100 def test_connect(msg_local, config, logger_name, path):
101 msg_local.connect(config)
102 assert msg_local.logger == logging.getLogger(logger_name)
103 assert msg_local.path == path
104 assert len(msg_local.files_read) == 0
105 assert len(msg_local.files_write) == 0
106 assert len(msg_local.buffer) == 0
107
108
109 @pytest.mark.parametrize("config", [
110 ({"logger_name": "msg_local", "path": invalid_path()}),
111 ({"path": invalid_path()})])
112 def test_connect_with_exception(msg_local, config):
113 with pytest.raises(MsgException) as excinfo:
114 msg_local.connect(config)
115 assert str(excinfo.value).startswith(empty_exception_message())
116 assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
117
118
119 def test_disconnect(msg_local_config):
120 files_read = msg_local_config.files_read.copy()
121 files_write = msg_local_config.files_write.copy()
122 msg_local_config.disconnect()
123 for f in files_read.values():
124 assert f.closed
125 for f in files_write.values():
126 assert f.closed
127
128
129 def test_disconnect_with_read(msg_local_config):
130 msg_local_config.read('topic1', blocks=False)
131 msg_local_config.read('topic2', blocks=False)
132 files_read = msg_local_config.files_read.copy()
133 files_write = msg_local_config.files_write.copy()
134 msg_local_config.disconnect()
135 for f in files_read.values():
136 assert f.closed
137 for f in files_write.values():
138 assert f.closed
139
140
141 def test_disconnect_with_write(msg_local_with_data):
142 files_read = msg_local_with_data.files_read.copy()
143 files_write = msg_local_with_data.files_write.copy()
144 msg_local_with_data.disconnect()
145
146 for f in files_read.values():
147 assert f.closed
148
149 for f in files_write.values():
150 assert f.closed
151
152
153 def test_disconnect_with_read_and_write(msg_local_with_data):
154 msg_local_with_data.read('topic1', blocks=False)
155 msg_local_with_data.read('topic2', blocks=False)
156 files_read = msg_local_with_data.files_read.copy()
157 files_write = msg_local_with_data.files_write.copy()
158
159 msg_local_with_data.disconnect()
160 for f in files_read.values():
161 assert f.closed
162 for f in files_write.values():
163 assert f.closed
164
165
166 @pytest.mark.parametrize("topic, key, msg", [
167 ("test_topic", "test_key", "test_msg"),
168 ("test", "test_key", "test_msg"),
169 ("test_topic", "test", "test_msg"),
170 ("test_topic", "test_key", "test"),
171 ("test_topic", "test_list", ["a", "b", "c"]),
172 ("test_topic", "test_tuple", ("c", "b", "a")),
173 ("test_topic", "test_dict", {"a": 1, "b": 2, "c": 3}),
174 ("test_topic", "test_number", 123),
175 ("test_topic", "test_float", 1.23),
176 ("test_topic", "test_boolean", True),
177 ("test_topic", "test_none", None)])
178 def test_write(msg_local_config, topic, key, msg):
179 file_path = msg_local_config.path + topic
180 msg_local_config.write(topic, key, msg)
181 assert os.path.exists(file_path)
182
183 with open(file_path, 'r') as stream:
184 assert yaml.load(stream) == {key: msg if not isinstance(msg, tuple) else list(msg)}
185
186
187 @pytest.mark.parametrize("topic, key, msg, times", [
188 ("test_topic", "test_key", "test_msg", 2),
189 ("test", "test_key", "test_msg", 3),
190 ("test_topic", "test", "test_msg", 4),
191 ("test_topic", "test_key", "test", 2),
192 ("test_topic", "test_list", ["a", "b", "c"], 3),
193 ("test_topic", "test_tuple", ("c", "b", "a"), 4),
194 ("test_topic", "test_dict", {"a": 1, "b": 2, "c": 3}, 2),
195 ("test_topic", "test_number", 123, 3),
196 ("test_topic", "test_float", 1.23, 4),
197 ("test_topic", "test_boolean", True, 2),
198 ("test_topic", "test_none", None, 3)])
199 def test_write_with_multiple_calls(msg_local_config, topic, key, msg, times):
200 file_path = msg_local_config.path + topic
201
202 for _ in range(times):
203 msg_local_config.write(topic, key, msg)
204 assert os.path.exists(file_path)
205
206 with open(file_path, 'r') as stream:
207 for _ in range(times):
208 data = stream.readline()
209 assert yaml.load(data) == {key: msg if not isinstance(msg, tuple) else list(msg)}
210
211
212 def test_write_exception(msg_local_config):
213 msg_local_config.files_write = MagicMock()
214 msg_local_config.files_write.__contains__.side_effect = Exception()
215
216 with pytest.raises(MsgException) as excinfo:
217 msg_local_config.write("test", "test", "test")
218 assert str(excinfo.value).startswith(empty_exception_message())
219 assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
220
221
222 @pytest.mark.parametrize("topics, datas", [
223 (["topic"], [{"key": "value"}]),
224 (["topic1"], [{"key": "value"}]),
225 (["topic2"], [{"key": "value"}]),
226 (["topic", "topic1"], [{"key": "value"}]),
227 (["topic", "topic2"], [{"key": "value"}]),
228 (["topic1", "topic2"], [{"key": "value"}]),
229 (["topic", "topic1", "topic2"], [{"key": "value"}]),
230 (["topic"], [{"key": "value"}, {"key1": "value1"}]),
231 (["topic1"], [{"key": "value"}, {"key1": "value1"}]),
232 (["topic2"], [{"key": "value"}, {"key1": "value1"}]),
233 (["topic", "topic1"], [{"key": "value"}, {"key1": "value1"}]),
234 (["topic", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
235 (["topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
236 (["topic", "topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}])])
237 def test_read(msg_local_with_data, topics, datas):
238 def write_to_topic(topics, datas):
239 # Allow msglocal to block while waiting
240 time.sleep(2)
241 for topic in topics:
242 for data in datas:
243 with open(msg_local_with_data.path + topic, "a+") as fp:
244 yaml.safe_dump(data, fp, default_flow_style=True, width=20000)
245 fp.flush()
246
247 # If file is not opened first, the messages written won't be seen
248 for topic in topics:
249 if topic not in msg_local_with_data.files_read:
250 msg_local_with_data.read(topic, blocks=False)
251
252 t = threading.Thread(target=write_to_topic, args=(topics, datas))
253 t.start()
254
255 for topic in topics:
256 for data in datas:
257 recv_topic, recv_key, recv_msg = msg_local_with_data.read(topic)
258 key = list(data.keys())[0]
259 val = data[key]
260 assert recv_topic == topic
261 assert recv_key == key
262 assert recv_msg == val
263 t.join()
264
265
266 @pytest.mark.parametrize("topics, datas", [
267 (["topic"], [{"key": "value"}]),
268 (["topic1"], [{"key": "value"}]),
269 (["topic2"], [{"key": "value"}]),
270 (["topic", "topic1"], [{"key": "value"}]),
271 (["topic", "topic2"], [{"key": "value"}]),
272 (["topic1", "topic2"], [{"key": "value"}]),
273 (["topic", "topic1", "topic2"], [{"key": "value"}]),
274 (["topic"], [{"key": "value"}, {"key1": "value1"}]),
275 (["topic1"], [{"key": "value"}, {"key1": "value1"}]),
276 (["topic2"], [{"key": "value"}, {"key1": "value1"}]),
277 (["topic", "topic1"], [{"key": "value"}, {"key1": "value1"}]),
278 (["topic", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
279 (["topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
280 (["topic", "topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}])])
281 def test_read_non_block(msg_local_with_data, topics, datas):
282 def write_to_topic(topics, datas):
283 for topic in topics:
284 for data in datas:
285 with open(msg_local_with_data.path + topic, "a+") as fp:
286 yaml.safe_dump(data, fp, default_flow_style=True, width=20000)
287 fp.flush()
288
289 # If file is not opened first, the messages written won't be seen
290 for topic in topics:
291 if topic not in msg_local_with_data.files_read:
292 msg_local_with_data.read(topic, blocks=False)
293
294 t = threading.Thread(target=write_to_topic, args=(topics, datas))
295 t.start()
296 t.join()
297
298 for topic in topics:
299 for data in datas:
300 recv_topic, recv_key, recv_msg = msg_local_with_data.read(topic, blocks=False)
301 key = list(data.keys())[0]
302 val = data[key]
303 assert recv_topic == topic
304 assert recv_key == key
305 assert recv_msg == val
306
307
308 @pytest.mark.parametrize("topics, datas", [
309 (["topic"], [{"key": "value"}]),
310 (["topic1"], [{"key": "value"}]),
311 (["topic2"], [{"key": "value"}]),
312 (["topic", "topic1"], [{"key": "value"}]),
313 (["topic", "topic2"], [{"key": "value"}]),
314 (["topic1", "topic2"], [{"key": "value"}]),
315 (["topic", "topic1", "topic2"], [{"key": "value"}]),
316 (["topic"], [{"key": "value"}, {"key1": "value1"}]),
317 (["topic1"], [{"key": "value"}, {"key1": "value1"}]),
318 (["topic2"], [{"key": "value"}, {"key1": "value1"}]),
319 (["topic", "topic1"], [{"key": "value"}, {"key1": "value1"}]),
320 (["topic", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
321 (["topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
322 (["topic", "topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}])])
323 def test_read_non_block_none(msg_local_with_data, topics, datas):
324 def write_to_topic(topics, datas):
325 time.sleep(2)
326 for topic in topics:
327 for data in datas:
328 with open(msg_local_with_data.path + topic, "a+") as fp:
329 yaml.safe_dump(data, fp, default_flow_style=True, width=20000)
330 fp.flush()
331 # If file is not opened first, the messages written won't be seen
332 for topic in topics:
333 if topic not in msg_local_with_data.files_read:
334 msg_local_with_data.read(topic, blocks=False)
335 t = threading.Thread(target=write_to_topic, args=(topics, datas))
336 t.start()
337
338 for topic in topics:
339 recv_data = msg_local_with_data.read(topic, blocks=False)
340 assert recv_data is None
341 t.join()
342
343
344 @pytest.mark.parametrize("blocks", [
345 (True),
346 (False)])
347 def test_read_exception(msg_local_with_data, blocks):
348 msg_local_with_data.files_read = MagicMock()
349 msg_local_with_data.files_read.__contains__.side_effect = Exception()
350
351 with pytest.raises(MsgException) as excinfo:
352 msg_local_with_data.read("topic1", blocks=blocks)
353 assert str(excinfo.value).startswith(empty_exception_message())
354 assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
355
356
357 @pytest.mark.parametrize("topics, datas", [
358 (["topic"], [{"key": "value"}]),
359 (["topic1"], [{"key": "value"}]),
360 (["topic2"], [{"key": "value"}]),
361 (["topic", "topic1"], [{"key": "value"}]),
362 (["topic", "topic2"], [{"key": "value"}]),
363 (["topic1", "topic2"], [{"key": "value"}]),
364 (["topic", "topic1", "topic2"], [{"key": "value"}]),
365 (["topic"], [{"key": "value"}, {"key1": "value1"}]),
366 (["topic1"], [{"key": "value"}, {"key1": "value1"}]),
367 (["topic2"], [{"key": "value"}, {"key1": "value1"}]),
368 (["topic", "topic1"], [{"key": "value"}, {"key1": "value1"}]),
369 (["topic", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
370 (["topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
371 (["topic", "topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}])])
372 def test_aioread(msg_local_with_data, event_loop, topics, datas):
373 def write_to_topic(topics, datas):
374 time.sleep(2)
375 for topic in topics:
376 for data in datas:
377 with open(msg_local_with_data.path + topic, "a+") as fp:
378 yaml.safe_dump(data, fp, default_flow_style=True, width=20000)
379 fp.flush()
380 # If file is not opened first, the messages written won't be seen
381 for topic in topics:
382 if topic not in msg_local_with_data.files_read:
383 msg_local_with_data.read(topic, blocks=False)
384
385 t = threading.Thread(target=write_to_topic, args=(topics, datas))
386 t.start()
387 for topic in topics:
388 for data in datas:
389 recv = event_loop.run_until_complete(msg_local_with_data.aioread(topic, event_loop))
390 recv_topic, recv_key, recv_msg = recv
391 key = list(data.keys())[0]
392 val = data[key]
393 assert recv_topic == topic
394 assert recv_key == key
395 assert recv_msg == val
396 t.join()
397
398
399 def test_aioread_exception(msg_local_with_data, event_loop):
400 msg_local_with_data.files_read = MagicMock()
401 msg_local_with_data.files_read.__contains__.side_effect = Exception()
402
403 with pytest.raises(MsgException) as excinfo:
404 event_loop.run_until_complete(msg_local_with_data.aioread("topic1", event_loop))
405 assert str(excinfo.value).startswith(empty_exception_message())
406 assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
407
408
409 def test_aioread_general_exception(msg_local_with_data, event_loop):
410 msg_local_with_data.read = MagicMock()
411 msg_local_with_data.read.side_effect = Exception()
412
413 with pytest.raises(MsgException) as excinfo:
414 event_loop.run_until_complete(msg_local_with_data.aioread("topic1", event_loop))
415 assert str(excinfo.value).startswith(empty_exception_message())
416 assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
417
418
419 @pytest.mark.parametrize("topic, key, msg", [
420 ("test_topic", "test_key", "test_msg"),
421 ("test", "test_key", "test_msg"),
422 ("test_topic", "test", "test_msg"),
423 ("test_topic", "test_key", "test"),
424 ("test_topic", "test_list", ["a", "b", "c"]),
425 ("test_topic", "test_tuple", ("c", "b", "a")),
426 ("test_topic", "test_dict", {"a": 1, "b": 2, "c": 3}),
427 ("test_topic", "test_number", 123),
428 ("test_topic", "test_float", 1.23),
429 ("test_topic", "test_boolean", True),
430 ("test_topic", "test_none", None)])
431 def test_aiowrite(msg_local_config, event_loop, topic, key, msg):
432 file_path = msg_local_config.path + topic
433 event_loop.run_until_complete(msg_local_config.aiowrite(topic, key, msg))
434 assert os.path.exists(file_path)
435
436 with open(file_path, 'r') as stream:
437 assert yaml.load(stream) == {key: msg if not isinstance(msg, tuple) else list(msg)}
438
439
440 @pytest.mark.parametrize("topic, key, msg, times", [
441 ("test_topic", "test_key", "test_msg", 2),
442 ("test", "test_key", "test_msg", 3),
443 ("test_topic", "test", "test_msg", 4),
444 ("test_topic", "test_key", "test", 2),
445 ("test_topic", "test_list", ["a", "b", "c"], 3),
446 ("test_topic", "test_tuple", ("c", "b", "a"), 4),
447 ("test_topic", "test_dict", {"a": 1, "b": 2, "c": 3}, 2),
448 ("test_topic", "test_number", 123, 3),
449 ("test_topic", "test_float", 1.23, 4),
450 ("test_topic", "test_boolean", True, 2),
451 ("test_topic", "test_none", None, 3)])
452 def test_aiowrite_with_multiple_calls(msg_local_config, event_loop, topic, key, msg, times):
453 file_path = msg_local_config.path + topic
454 for _ in range(times):
455 event_loop.run_until_complete(msg_local_config.aiowrite(topic, key, msg))
456 assert os.path.exists(file_path)
457
458 with open(file_path, 'r') as stream:
459 for _ in range(times):
460 data = stream.readline()
461 assert yaml.load(data) == {key: msg if not isinstance(msg, tuple) else list(msg)}
462
463
464 def test_aiowrite_exception(msg_local_config, event_loop):
465 msg_local_config.files_write = MagicMock()
466 msg_local_config.files_write.__contains__.side_effect = Exception()
467
468 with pytest.raises(MsgException) as excinfo:
469 event_loop.run_until_complete(msg_local_config.aiowrite("test", "test", "test"))
470 assert str(excinfo.value).startswith(empty_exception_message())
471 assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR