95488859aee32008c42761bb5c78edb302eeac06
[osm/common.git] / osm_common / tests / test_msglocal.py
1 # Copyright 2018 Whitestack, LLC
2 # Copyright 2018 Telefonica S.A.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
14 # under the License.
15 #
16 # For those usages not covered by the Apache License, Version 2.0 please
17 # contact: esousa@whitestack.com or alfonso.tiernosepulveda@telefonica.com
18 ##
19
20 import http
21 import logging
22 import pytest
23 import tempfile
24 import shutil
25 import uuid
26 import os
27 import yaml
28 import time
29 import threading
30
31 from unittest.mock import MagicMock
32 from osm_common.msgbase import MsgException
33 from osm_common.msglocal import MsgLocal
34
35 __author__ = "Eduardo Sousa <eduardosousa@av.it.pt>"
36
37
38 def valid_path():
39 return tempfile.gettempdir() + "/"
40
41
42 def invalid_path():
43 return "/#tweeter/"
44
45
46 @pytest.fixture(scope="function", params=[True, False])
47 def msg_local(request):
48 msg = MsgLocal(lock=request.param)
49 yield msg
50
51 msg.disconnect()
52 if msg.path and msg.path != invalid_path() and msg.path != valid_path():
53 shutil.rmtree(msg.path)
54
55
56 @pytest.fixture(scope="function", params=[True, False])
57 def msg_local_config(request):
58 msg = MsgLocal(lock=request.param)
59 msg.connect({"path": valid_path() + str(uuid.uuid4())})
60 yield msg
61
62 msg.disconnect()
63 if msg.path != invalid_path():
64 shutil.rmtree(msg.path)
65
66
67 @pytest.fixture(scope="function", params=[True, False])
68 def msg_local_with_data(request):
69 msg = MsgLocal(lock=request.param)
70 msg.connect({"path": valid_path() + str(uuid.uuid4())})
71
72 msg.write("topic1", "key1", "msg1")
73 msg.write("topic1", "key2", "msg1")
74 msg.write("topic2", "key1", "msg1")
75 msg.write("topic2", "key2", "msg1")
76 msg.write("topic1", "key1", "msg2")
77 msg.write("topic1", "key2", "msg2")
78 msg.write("topic2", "key1", "msg2")
79 msg.write("topic2", "key2", "msg2")
80 yield msg
81
82 msg.disconnect()
83 if msg.path != invalid_path():
84 shutil.rmtree(msg.path)
85
86
87 def empty_exception_message():
88 return "messaging exception "
89
90
91 def test_constructor():
92 msg = MsgLocal()
93 assert msg.logger == logging.getLogger("msg")
94 assert msg.path is None
95 assert len(msg.files_read) == 0
96 assert len(msg.files_write) == 0
97 assert len(msg.buffer) == 0
98
99
100 def test_constructor_with_logger():
101 logger_name = "msg_local"
102 msg = MsgLocal(logger_name=logger_name)
103 assert msg.logger == logging.getLogger(logger_name)
104 assert msg.path is None
105 assert len(msg.files_read) == 0
106 assert len(msg.files_write) == 0
107 assert len(msg.buffer) == 0
108
109
110 @pytest.mark.parametrize(
111 "config, logger_name, path",
112 [
113 ({"logger_name": "msg_local", "path": valid_path()}, "msg_local", valid_path()),
114 (
115 {"logger_name": "msg_local", "path": valid_path()[:-1]},
116 "msg_local",
117 valid_path(),
118 ),
119 (
120 {"logger_name": "msg_local", "path": valid_path() + "test_it/"},
121 "msg_local",
122 valid_path() + "test_it/",
123 ),
124 (
125 {"logger_name": "msg_local", "path": valid_path() + "test_it"},
126 "msg_local",
127 valid_path() + "test_it/",
128 ),
129 ({"path": valid_path()}, "msg", valid_path()),
130 ({"path": valid_path()[:-1]}, "msg", valid_path()),
131 ({"path": valid_path() + "test_it/"}, "msg", valid_path() + "test_it/"),
132 ({"path": valid_path() + "test_it"}, "msg", valid_path() + "test_it/"),
133 ],
134 )
135 def test_connect(msg_local, config, logger_name, path):
136 msg_local.connect(config)
137 assert msg_local.logger == logging.getLogger(logger_name)
138 assert msg_local.path == path
139 assert len(msg_local.files_read) == 0
140 assert len(msg_local.files_write) == 0
141 assert len(msg_local.buffer) == 0
142
143
144 @pytest.mark.parametrize(
145 "config",
146 [
147 ({"logger_name": "msg_local", "path": invalid_path()}),
148 ({"path": invalid_path()}),
149 ],
150 )
151 def test_connect_with_exception(msg_local, config):
152 with pytest.raises(MsgException) as excinfo:
153 msg_local.connect(config)
154 assert str(excinfo.value).startswith(empty_exception_message())
155 assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
156
157
158 def test_disconnect(msg_local_config):
159 files_read = msg_local_config.files_read.copy()
160 files_write = msg_local_config.files_write.copy()
161 msg_local_config.disconnect()
162 for f in files_read.values():
163 assert f.closed
164 for f in files_write.values():
165 assert f.closed
166
167
168 def test_disconnect_with_read(msg_local_config):
169 msg_local_config.read("topic1", blocks=False)
170 msg_local_config.read("topic2", blocks=False)
171 files_read = msg_local_config.files_read.copy()
172 files_write = msg_local_config.files_write.copy()
173 msg_local_config.disconnect()
174 for f in files_read.values():
175 assert f.closed
176 for f in files_write.values():
177 assert f.closed
178
179
180 def test_disconnect_with_write(msg_local_with_data):
181 files_read = msg_local_with_data.files_read.copy()
182 files_write = msg_local_with_data.files_write.copy()
183 msg_local_with_data.disconnect()
184
185 for f in files_read.values():
186 assert f.closed
187
188 for f in files_write.values():
189 assert f.closed
190
191
192 def test_disconnect_with_read_and_write(msg_local_with_data):
193 msg_local_with_data.read("topic1", blocks=False)
194 msg_local_with_data.read("topic2", blocks=False)
195 files_read = msg_local_with_data.files_read.copy()
196 files_write = msg_local_with_data.files_write.copy()
197
198 msg_local_with_data.disconnect()
199 for f in files_read.values():
200 assert f.closed
201 for f in files_write.values():
202 assert f.closed
203
204
205 @pytest.mark.parametrize(
206 "topic, key, msg",
207 [
208 ("test_topic", "test_key", "test_msg"),
209 ("test", "test_key", "test_msg"),
210 ("test_topic", "test", "test_msg"),
211 ("test_topic", "test_key", "test"),
212 ("test_topic", "test_list", ["a", "b", "c"]),
213 ("test_topic", "test_tuple", ("c", "b", "a")),
214 ("test_topic", "test_dict", {"a": 1, "b": 2, "c": 3}),
215 ("test_topic", "test_number", 123),
216 ("test_topic", "test_float", 1.23),
217 ("test_topic", "test_boolean", True),
218 ("test_topic", "test_none", None),
219 ],
220 )
221 def test_write(msg_local_config, topic, key, msg):
222 file_path = msg_local_config.path + topic
223 msg_local_config.write(topic, key, msg)
224 assert os.path.exists(file_path)
225
226 with open(file_path, "r") as stream:
227 assert yaml.safe_load(stream) == {
228 key: msg if not isinstance(msg, tuple) else list(msg)
229 }
230
231
232 @pytest.mark.parametrize(
233 "topic, key, msg, times",
234 [
235 ("test_topic", "test_key", "test_msg", 2),
236 ("test", "test_key", "test_msg", 3),
237 ("test_topic", "test", "test_msg", 4),
238 ("test_topic", "test_key", "test", 2),
239 ("test_topic", "test_list", ["a", "b", "c"], 3),
240 ("test_topic", "test_tuple", ("c", "b", "a"), 4),
241 ("test_topic", "test_dict", {"a": 1, "b": 2, "c": 3}, 2),
242 ("test_topic", "test_number", 123, 3),
243 ("test_topic", "test_float", 1.23, 4),
244 ("test_topic", "test_boolean", True, 2),
245 ("test_topic", "test_none", None, 3),
246 ],
247 )
248 def test_write_with_multiple_calls(msg_local_config, topic, key, msg, times):
249 file_path = msg_local_config.path + topic
250
251 for _ in range(times):
252 msg_local_config.write(topic, key, msg)
253 assert os.path.exists(file_path)
254
255 with open(file_path, "r") as stream:
256 for _ in range(times):
257 data = stream.readline()
258 assert yaml.safe_load(data) == {
259 key: msg if not isinstance(msg, tuple) else list(msg)
260 }
261
262
263 def test_write_exception(msg_local_config):
264 msg_local_config.files_write = MagicMock()
265 msg_local_config.files_write.__contains__.side_effect = Exception()
266
267 with pytest.raises(MsgException) as excinfo:
268 msg_local_config.write("test", "test", "test")
269 assert str(excinfo.value).startswith(empty_exception_message())
270 assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
271
272
273 @pytest.mark.parametrize(
274 "topics, datas",
275 [
276 (["topic"], [{"key": "value"}]),
277 (["topic1"], [{"key": "value"}]),
278 (["topic2"], [{"key": "value"}]),
279 (["topic", "topic1"], [{"key": "value"}]),
280 (["topic", "topic2"], [{"key": "value"}]),
281 (["topic1", "topic2"], [{"key": "value"}]),
282 (["topic", "topic1", "topic2"], [{"key": "value"}]),
283 (["topic"], [{"key": "value"}, {"key1": "value1"}]),
284 (["topic1"], [{"key": "value"}, {"key1": "value1"}]),
285 (["topic2"], [{"key": "value"}, {"key1": "value1"}]),
286 (["topic", "topic1"], [{"key": "value"}, {"key1": "value1"}]),
287 (["topic", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
288 (["topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
289 (["topic", "topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
290 ],
291 )
292 def test_read(msg_local_with_data, topics, datas):
293 def write_to_topic(topics, datas):
294 # Allow msglocal to block while waiting
295 time.sleep(2)
296 for topic in topics:
297 for data in datas:
298 with open(msg_local_with_data.path + topic, "a+") as fp:
299 yaml.safe_dump(data, fp, default_flow_style=True, width=20000)
300 fp.flush()
301
302 # If file is not opened first, the messages written won't be seen
303 for topic in topics:
304 if topic not in msg_local_with_data.files_read:
305 msg_local_with_data.read(topic, blocks=False)
306
307 t = threading.Thread(target=write_to_topic, args=(topics, datas))
308 t.start()
309
310 for topic in topics:
311 for data in datas:
312 recv_topic, recv_key, recv_msg = msg_local_with_data.read(topic)
313 key = list(data.keys())[0]
314 val = data[key]
315 assert recv_topic == topic
316 assert recv_key == key
317 assert recv_msg == val
318 t.join()
319
320
321 @pytest.mark.parametrize(
322 "topics, datas",
323 [
324 (["topic"], [{"key": "value"}]),
325 (["topic1"], [{"key": "value"}]),
326 (["topic2"], [{"key": "value"}]),
327 (["topic", "topic1"], [{"key": "value"}]),
328 (["topic", "topic2"], [{"key": "value"}]),
329 (["topic1", "topic2"], [{"key": "value"}]),
330 (["topic", "topic1", "topic2"], [{"key": "value"}]),
331 (["topic"], [{"key": "value"}, {"key1": "value1"}]),
332 (["topic1"], [{"key": "value"}, {"key1": "value1"}]),
333 (["topic2"], [{"key": "value"}, {"key1": "value1"}]),
334 (["topic", "topic1"], [{"key": "value"}, {"key1": "value1"}]),
335 (["topic", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
336 (["topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
337 (["topic", "topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
338 ],
339 )
340 def test_read_non_block(msg_local_with_data, topics, datas):
341 def write_to_topic(topics, datas):
342 for topic in topics:
343 for data in datas:
344 with open(msg_local_with_data.path + topic, "a+") as fp:
345 yaml.safe_dump(data, fp, default_flow_style=True, width=20000)
346 fp.flush()
347
348 # If file is not opened first, the messages written won't be seen
349 for topic in topics:
350 if topic not in msg_local_with_data.files_read:
351 msg_local_with_data.read(topic, blocks=False)
352
353 t = threading.Thread(target=write_to_topic, args=(topics, datas))
354 t.start()
355 t.join()
356
357 for topic in topics:
358 for data in datas:
359 recv_topic, recv_key, recv_msg = msg_local_with_data.read(
360 topic, blocks=False
361 )
362 key = list(data.keys())[0]
363 val = data[key]
364 assert recv_topic == topic
365 assert recv_key == key
366 assert recv_msg == val
367
368
369 @pytest.mark.parametrize(
370 "topics, datas",
371 [
372 (["topic"], [{"key": "value"}]),
373 (["topic1"], [{"key": "value"}]),
374 (["topic2"], [{"key": "value"}]),
375 (["topic", "topic1"], [{"key": "value"}]),
376 (["topic", "topic2"], [{"key": "value"}]),
377 (["topic1", "topic2"], [{"key": "value"}]),
378 (["topic", "topic1", "topic2"], [{"key": "value"}]),
379 (["topic"], [{"key": "value"}, {"key1": "value1"}]),
380 (["topic1"], [{"key": "value"}, {"key1": "value1"}]),
381 (["topic2"], [{"key": "value"}, {"key1": "value1"}]),
382 (["topic", "topic1"], [{"key": "value"}, {"key1": "value1"}]),
383 (["topic", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
384 (["topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
385 (["topic", "topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
386 ],
387 )
388 def test_read_non_block_none(msg_local_with_data, topics, datas):
389 def write_to_topic(topics, datas):
390 time.sleep(2)
391 for topic in topics:
392 for data in datas:
393 with open(msg_local_with_data.path + topic, "a+") as fp:
394 yaml.safe_dump(data, fp, default_flow_style=True, width=20000)
395 fp.flush()
396
397 # If file is not opened first, the messages written won't be seen
398 for topic in topics:
399 if topic not in msg_local_with_data.files_read:
400 msg_local_with_data.read(topic, blocks=False)
401 t = threading.Thread(target=write_to_topic, args=(topics, datas))
402 t.start()
403
404 for topic in topics:
405 recv_data = msg_local_with_data.read(topic, blocks=False)
406 assert recv_data is None
407 t.join()
408
409
410 @pytest.mark.parametrize("blocks", [(True), (False)])
411 def test_read_exception(msg_local_with_data, blocks):
412 msg_local_with_data.files_read = MagicMock()
413 msg_local_with_data.files_read.__contains__.side_effect = Exception()
414
415 with pytest.raises(MsgException) as excinfo:
416 msg_local_with_data.read("topic1", blocks=blocks)
417 assert str(excinfo.value).startswith(empty_exception_message())
418 assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
419
420
421 @pytest.mark.parametrize(
422 "topics, datas",
423 [
424 (["topic"], [{"key": "value"}]),
425 (["topic1"], [{"key": "value"}]),
426 (["topic2"], [{"key": "value"}]),
427 (["topic", "topic1"], [{"key": "value"}]),
428 (["topic", "topic2"], [{"key": "value"}]),
429 (["topic1", "topic2"], [{"key": "value"}]),
430 (["topic", "topic1", "topic2"], [{"key": "value"}]),
431 (["topic"], [{"key": "value"}, {"key1": "value1"}]),
432 (["topic1"], [{"key": "value"}, {"key1": "value1"}]),
433 (["topic2"], [{"key": "value"}, {"key1": "value1"}]),
434 (["topic", "topic1"], [{"key": "value"}, {"key1": "value1"}]),
435 (["topic", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
436 (["topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
437 (["topic", "topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
438 ],
439 )
440 def test_aioread(msg_local_with_data, event_loop, topics, datas):
441 def write_to_topic(topics, datas):
442 time.sleep(2)
443 for topic in topics:
444 for data in datas:
445 with open(msg_local_with_data.path + topic, "a+") as fp:
446 yaml.safe_dump(data, fp, default_flow_style=True, width=20000)
447 fp.flush()
448
449 # If file is not opened first, the messages written won't be seen
450 for topic in topics:
451 if topic not in msg_local_with_data.files_read:
452 msg_local_with_data.read(topic, blocks=False)
453
454 t = threading.Thread(target=write_to_topic, args=(topics, datas))
455 t.start()
456 for topic in topics:
457 for data in datas:
458 recv = event_loop.run_until_complete(
459 msg_local_with_data.aioread(topic, event_loop)
460 )
461 recv_topic, recv_key, recv_msg = recv
462 key = list(data.keys())[0]
463 val = data[key]
464 assert recv_topic == topic
465 assert recv_key == key
466 assert recv_msg == val
467 t.join()
468
469
470 def test_aioread_exception(msg_local_with_data, event_loop):
471 msg_local_with_data.files_read = MagicMock()
472 msg_local_with_data.files_read.__contains__.side_effect = Exception()
473
474 with pytest.raises(MsgException) as excinfo:
475 event_loop.run_until_complete(msg_local_with_data.aioread("topic1", event_loop))
476 assert str(excinfo.value).startswith(empty_exception_message())
477 assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
478
479
480 def test_aioread_general_exception(msg_local_with_data, event_loop):
481 msg_local_with_data.read = MagicMock()
482 msg_local_with_data.read.side_effect = Exception()
483
484 with pytest.raises(MsgException) as excinfo:
485 event_loop.run_until_complete(msg_local_with_data.aioread("topic1", event_loop))
486 assert str(excinfo.value).startswith(empty_exception_message())
487 assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
488
489
490 @pytest.mark.parametrize(
491 "topic, key, msg",
492 [
493 ("test_topic", "test_key", "test_msg"),
494 ("test", "test_key", "test_msg"),
495 ("test_topic", "test", "test_msg"),
496 ("test_topic", "test_key", "test"),
497 ("test_topic", "test_list", ["a", "b", "c"]),
498 ("test_topic", "test_tuple", ("c", "b", "a")),
499 ("test_topic", "test_dict", {"a": 1, "b": 2, "c": 3}),
500 ("test_topic", "test_number", 123),
501 ("test_topic", "test_float", 1.23),
502 ("test_topic", "test_boolean", True),
503 ("test_topic", "test_none", None),
504 ],
505 )
506 def test_aiowrite(msg_local_config, event_loop, topic, key, msg):
507 file_path = msg_local_config.path + topic
508 event_loop.run_until_complete(msg_local_config.aiowrite(topic, key, msg))
509 assert os.path.exists(file_path)
510
511 with open(file_path, "r") as stream:
512 assert yaml.safe_load(stream) == {
513 key: msg if not isinstance(msg, tuple) else list(msg)
514 }
515
516
517 @pytest.mark.parametrize(
518 "topic, key, msg, times",
519 [
520 ("test_topic", "test_key", "test_msg", 2),
521 ("test", "test_key", "test_msg", 3),
522 ("test_topic", "test", "test_msg", 4),
523 ("test_topic", "test_key", "test", 2),
524 ("test_topic", "test_list", ["a", "b", "c"], 3),
525 ("test_topic", "test_tuple", ("c", "b", "a"), 4),
526 ("test_topic", "test_dict", {"a": 1, "b": 2, "c": 3}, 2),
527 ("test_topic", "test_number", 123, 3),
528 ("test_topic", "test_float", 1.23, 4),
529 ("test_topic", "test_boolean", True, 2),
530 ("test_topic", "test_none", None, 3),
531 ],
532 )
533 def test_aiowrite_with_multiple_calls(
534 msg_local_config, event_loop, topic, key, msg, times
535 ):
536 file_path = msg_local_config.path + topic
537 for _ in range(times):
538 event_loop.run_until_complete(msg_local_config.aiowrite(topic, key, msg))
539 assert os.path.exists(file_path)
540
541 with open(file_path, "r") as stream:
542 for _ in range(times):
543 data = stream.readline()
544 assert yaml.safe_load(data) == {
545 key: msg if not isinstance(msg, tuple) else list(msg)
546 }
547
548
549 def test_aiowrite_exception(msg_local_config, event_loop):
550 msg_local_config.files_write = MagicMock()
551 msg_local_config.files_write.__contains__.side_effect = Exception()
552
553 with pytest.raises(MsgException) as excinfo:
554 event_loop.run_until_complete(msg_local_config.aiowrite("test", "test", "test"))
555 assert str(excinfo.value).startswith(empty_exception_message())
556 assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR