Code Coverage

Cobertura Coverage Report > osm_common.tests >

test_msglocal.py

Trend

File Coverage summary

| Name | Classes | Lines | Conditionals |
|---|---|---|---|
| test_msglocal.py | 100% (1/1) | 26% (71/274) | 100% (0/0) |

Coverage Breakdown by Class

| Name | Lines | Conditionals |
|---|---|---|
| test_msglocal.py | 26% (71/274) | N/A |

Source

osm_common/tests/test_msglocal.py
1 # Copyright 2018 Whitestack, LLC
2 # Copyright 2018 Telefonica S.A.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
14 # under the License.
15 #
16 # For those usages not covered by the Apache License, Version 2.0 please
17 # contact: esousa@whitestack.com or alfonso.tiernosepulveda@telefonica.com
18 ##
19 1 import asyncio
20 1 import http
21 1 import logging
22 1 import os
23 1 import shutil
24 1 import tempfile
25 1 import threading
26 1 import time
27 1 from unittest.mock import MagicMock
28 1 import uuid
29
30 1 from osm_common.msgbase import MsgException
31 1 from osm_common.msglocal import MsgLocal
32 1 import pytest
33 1 import yaml
34
35 1 __author__ = "Eduardo Sousa <eduardosousa@av.it.pt>"
36
37
38 1 def valid_path():
39 1     return tempfile.gettempdir() + "/"
40
41
42 1 def invalid_path():
43 1     return "/#tweeter/"
44
45
46 1 @pytest.fixture(scope="function", params=[True, False])
47 1 def msg_local(request):
48 0     msg = MsgLocal(lock=request.param)
49 0     yield msg
50
51 0     msg.disconnect()
52 0     if msg.path and msg.path != invalid_path() and msg.path != valid_path():
53 0         shutil.rmtree(msg.path)
54
55
56 1 @pytest.fixture(scope="function", params=[True, False])
57 1 def msg_local_config(request):
58 0     msg = MsgLocal(lock=request.param)
59 0     msg.connect({"path": valid_path() + str(uuid.uuid4())})
60 0     yield msg
61
62 0     msg.disconnect()
63 0     if msg.path != invalid_path():
64 0         shutil.rmtree(msg.path)
65
66
67 1 @pytest.fixture(scope="function", params=[True, False])
68 1 def msg_local_with_data(request):
69 0     msg = MsgLocal(lock=request.param)
70 0     msg.connect({"path": valid_path() + str(uuid.uuid4())})
71
72 0     msg.write("topic1", "key1", "msg1")
73 0     msg.write("topic1", "key2", "msg1")
74 0     msg.write("topic2", "key1", "msg1")
75 0     msg.write("topic2", "key2", "msg1")
76 0     msg.write("topic1", "key1", "msg2")
77 0     msg.write("topic1", "key2", "msg2")
78 0     msg.write("topic2", "key1", "msg2")
79 0     msg.write("topic2", "key2", "msg2")
80 0     yield msg
81
82 0     msg.disconnect()
83 0     if msg.path != invalid_path():
84 0         shutil.rmtree(msg.path)
85
86
87 1 def empty_exception_message():
88 0     return "messaging exception "
89
90
91 1 def test_constructor():
92 1     msg = MsgLocal()
93 1     assert msg.logger == logging.getLogger("msg")
94 1     assert msg.path is None
95 1     assert len(msg.files_read) == 0
96 1     assert len(msg.files_write) == 0
97 1     assert len(msg.buffer) == 0
98
99
100 1 def test_constructor_with_logger():
101 1     logger_name = "msg_local"
102 1     msg = MsgLocal(logger_name=logger_name)
103 1     assert msg.logger == logging.getLogger(logger_name)
104 1     assert msg.path is None
105 1     assert len(msg.files_read) == 0
106 1     assert len(msg.files_write) == 0
107 1     assert len(msg.buffer) == 0
108
109
110 1 @pytest.mark.parametrize(
111     "config, logger_name, path",
112     [
113         ({"logger_name": "msg_local", "path": valid_path()}, "msg_local", valid_path()),
114         (
115             {"logger_name": "msg_local", "path": valid_path()[:-1]},
116             "msg_local",
117             valid_path(),
118         ),
119         (
120             {"logger_name": "msg_local", "path": valid_path() + "test_it/"},
121             "msg_local",
122             valid_path() + "test_it/",
123         ),
124         (
125             {"logger_name": "msg_local", "path": valid_path() + "test_it"},
126             "msg_local",
127             valid_path() + "test_it/",
128         ),
129         ({"path": valid_path()}, "msg", valid_path()),
130         ({"path": valid_path()[:-1]}, "msg", valid_path()),
131         ({"path": valid_path() + "test_it/"}, "msg", valid_path() + "test_it/"),
132         ({"path": valid_path() + "test_it"}, "msg", valid_path() + "test_it/"),
133     ],
134 )
135 1 def test_connect(msg_local, config, logger_name, path):
136 0     msg_local.connect(config)
137 0     assert msg_local.logger == logging.getLogger(logger_name)
138 0     assert msg_local.path == path
139 0     assert len(msg_local.files_read) == 0
140 0     assert len(msg_local.files_write) == 0
141 0     assert len(msg_local.buffer) == 0
142
143
144 1 @pytest.mark.parametrize(
145     "config",
146     [
147         ({"logger_name": "msg_local", "path": invalid_path()}),
148         ({"path": invalid_path()}),
149     ],
150 )
151 1 def test_connect_with_exception(msg_local, config):
152 0     with pytest.raises(MsgException) as excinfo:
153 0         msg_local.connect(config)
154 0     assert str(excinfo.value).startswith(empty_exception_message())
155 0     assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
156
157
158 1 def test_disconnect(msg_local_config):
159 0     files_read = msg_local_config.files_read.copy()
160 0     files_write = msg_local_config.files_write.copy()
161 0     msg_local_config.disconnect()
162 0     for f in files_read.values():
163 0         assert f.closed
164 0     for f in files_write.values():
165 0         assert f.closed
166
167
168 1 def test_disconnect_with_read(msg_local_config):
169 0     msg_local_config.read("topic1", blocks=False)
170 0     msg_local_config.read("topic2", blocks=False)
171 0     files_read = msg_local_config.files_read.copy()
172 0     files_write = msg_local_config.files_write.copy()
173 0     msg_local_config.disconnect()
174 0     for f in files_read.values():
175 0         assert f.closed
176 0     for f in files_write.values():
177 0         assert f.closed
178
179
180 1 def test_disconnect_with_write(msg_local_with_data):
181 0     files_read = msg_local_with_data.files_read.copy()
182 0     files_write = msg_local_with_data.files_write.copy()
183 0     msg_local_with_data.disconnect()
184
185 0     for f in files_read.values():
186 0         assert f.closed
187
188 0     for f in files_write.values():
189 0         assert f.closed
190
191
192 1 def test_disconnect_with_read_and_write(msg_local_with_data):
193 0     msg_local_with_data.read("topic1", blocks=False)
194 0     msg_local_with_data.read("topic2", blocks=False)
195 0     files_read = msg_local_with_data.files_read.copy()
196 0     files_write = msg_local_with_data.files_write.copy()
197
198 0     msg_local_with_data.disconnect()
199 0     for f in files_read.values():
200 0         assert f.closed
201 0     for f in files_write.values():
202 0         assert f.closed
203
204
205 1 @pytest.mark.parametrize(
206     "topic, key, msg",
207     [
208         ("test_topic", "test_key", "test_msg"),
209         ("test", "test_key", "test_msg"),
210         ("test_topic", "test", "test_msg"),
211         ("test_topic", "test_key", "test"),
212         ("test_topic", "test_list", ["a", "b", "c"]),
213         ("test_topic", "test_tuple", ("c", "b", "a")),
214         ("test_topic", "test_dict", {"a": 1, "b": 2, "c": 3}),
215         ("test_topic", "test_number", 123),
216         ("test_topic", "test_float", 1.23),
217         ("test_topic", "test_boolean", True),
218         ("test_topic", "test_none", None),
219     ],
220 )
221 1 def test_write(msg_local_config, topic, key, msg):
222 0     file_path = msg_local_config.path + topic
223 0     msg_local_config.write(topic, key, msg)
224 0     assert os.path.exists(file_path)
225
226 0     with open(file_path, "r") as stream:
227 0         assert yaml.safe_load(stream) == {
228             key: msg if not isinstance(msg, tuple) else list(msg)
229         }
230
231
232 1 @pytest.mark.parametrize(
233     "topic, key, msg, times",
234     [
235         ("test_topic", "test_key", "test_msg", 2),
236         ("test", "test_key", "test_msg", 3),
237         ("test_topic", "test", "test_msg", 4),
238         ("test_topic", "test_key", "test", 2),
239         ("test_topic", "test_list", ["a", "b", "c"], 3),
240         ("test_topic", "test_tuple", ("c", "b", "a"), 4),
241         ("test_topic", "test_dict", {"a": 1, "b": 2, "c": 3}, 2),
242         ("test_topic", "test_number", 123, 3),
243         ("test_topic", "test_float", 1.23, 4),
244         ("test_topic", "test_boolean", True, 2),
245         ("test_topic", "test_none", None, 3),
246     ],
247 )
248 1 def test_write_with_multiple_calls(msg_local_config, topic, key, msg, times):
249 0     file_path = msg_local_config.path + topic
250
251 0     for _ in range(times):
252 0         msg_local_config.write(topic, key, msg)
253 0     assert os.path.exists(file_path)
254
255 0     with open(file_path, "r") as stream:
256 0         for _ in range(times):
257 0             data = stream.readline()
258 0             assert yaml.safe_load(data) == {
259                 key: msg if not isinstance(msg, tuple) else list(msg)
260             }
261
262
263 1 def test_write_exception(msg_local_config):
264 0     msg_local_config.files_write = MagicMock()
265 0     msg_local_config.files_write.__contains__.side_effect = Exception()
266
267 0     with pytest.raises(MsgException) as excinfo:
268 0         msg_local_config.write("test", "test", "test")
269 0     assert str(excinfo.value).startswith(empty_exception_message())
270 0     assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
271
272
273 1 @pytest.mark.parametrize(
274     "topics, datas",
275     [
276         (["topic"], [{"key": "value"}]),
277         (["topic1"], [{"key": "value"}]),
278         (["topic2"], [{"key": "value"}]),
279         (["topic", "topic1"], [{"key": "value"}]),
280         (["topic", "topic2"], [{"key": "value"}]),
281         (["topic1", "topic2"], [{"key": "value"}]),
282         (["topic", "topic1", "topic2"], [{"key": "value"}]),
283         (["topic"], [{"key": "value"}, {"key1": "value1"}]),
284         (["topic1"], [{"key": "value"}, {"key1": "value1"}]),
285         (["topic2"], [{"key": "value"}, {"key1": "value1"}]),
286         (["topic", "topic1"], [{"key": "value"}, {"key1": "value1"}]),
287         (["topic", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
288         (["topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
289         (["topic", "topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
290     ],
291 )
292 1 def test_read(msg_local_with_data, topics, datas):
293 0     def write_to_topic(topics, datas):
294         # Allow msglocal to block while waiting
295 0         time.sleep(2)
296 0         for topic in topics:
297 0             for data in datas:
298 0                 with open(msg_local_with_data.path + topic, "a+") as fp:
299 0                     yaml.safe_dump(data, fp, default_flow_style=True, width=20000)
300 0                     fp.flush()
301
302     # If file is not opened first, the messages written won't be seen
303 0     for topic in topics:
304 0         if topic not in msg_local_with_data.files_read:
305 0             msg_local_with_data.read(topic, blocks=False)
306
307 0     t = threading.Thread(target=write_to_topic, args=(topics, datas))
308 0     t.start()
309
310 0     for topic in topics:
311 0         for data in datas:
312 0             recv_topic, recv_key, recv_msg = msg_local_with_data.read(topic)
313 0             key = list(data.keys())[0]
314 0             val = data[key]
315 0             assert recv_topic == topic
316 0             assert recv_key == key
317 0             assert recv_msg == val
318 0     t.join()
319
320
321 1 @pytest.mark.parametrize(
322     "topics, datas",
323     [
324         (["topic"], [{"key": "value"}]),
325         (["topic1"], [{"key": "value"}]),
326         (["topic2"], [{"key": "value"}]),
327         (["topic", "topic1"], [{"key": "value"}]),
328         (["topic", "topic2"], [{"key": "value"}]),
329         (["topic1", "topic2"], [{"key": "value"}]),
330         (["topic", "topic1", "topic2"], [{"key": "value"}]),
331         (["topic"], [{"key": "value"}, {"key1": "value1"}]),
332         (["topic1"], [{"key": "value"}, {"key1": "value1"}]),
333         (["topic2"], [{"key": "value"}, {"key1": "value1"}]),
334         (["topic", "topic1"], [{"key": "value"}, {"key1": "value1"}]),
335         (["topic", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
336         (["topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
337         (["topic", "topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
338     ],
339 )
340 1 def test_read_non_block(msg_local_with_data, topics, datas):
341 0     def write_to_topic(topics, datas):
342 0         for topic in topics:
343 0             for data in datas:
344 0                 with open(msg_local_with_data.path + topic, "a+") as fp:
345 0                     yaml.safe_dump(data, fp, default_flow_style=True, width=20000)
346 0                     fp.flush()
347
348     # If file is not opened first, the messages written won't be seen
349 0     for topic in topics:
350 0         if topic not in msg_local_with_data.files_read:
351 0             msg_local_with_data.read(topic, blocks=False)
352
353 0     t = threading.Thread(target=write_to_topic, args=(topics, datas))
354 0     t.start()
355 0     t.join()
356
357 0     for topic in topics:
358 0         for data in datas:
359 0             recv_topic, recv_key, recv_msg = msg_local_with_data.read(
360                 topic, blocks=False
361             )
362 0             key = list(data.keys())[0]
363 0             val = data[key]
364 0             assert recv_topic == topic
365 0             assert recv_key == key
366 0             assert recv_msg == val
367
368
369 1 @pytest.mark.parametrize(
370     "topics, datas",
371     [
372         (["topic"], [{"key": "value"}]),
373         (["topic1"], [{"key": "value"}]),
374         (["topic2"], [{"key": "value"}]),
375         (["topic", "topic1"], [{"key": "value"}]),
376         (["topic", "topic2"], [{"key": "value"}]),
377         (["topic1", "topic2"], [{"key": "value"}]),
378         (["topic", "topic1", "topic2"], [{"key": "value"}]),
379         (["topic"], [{"key": "value"}, {"key1": "value1"}]),
380         (["topic1"], [{"key": "value"}, {"key1": "value1"}]),
381         (["topic2"], [{"key": "value"}, {"key1": "value1"}]),
382         (["topic", "topic1"], [{"key": "value"}, {"key1": "value1"}]),
383         (["topic", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
384         (["topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
385         (["topic", "topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
386     ],
387 )
388 1 def test_read_non_block_none(msg_local_with_data, topics, datas):
389 0     def write_to_topic(topics, datas):
390 0         time.sleep(2)
391 0         for topic in topics:
392 0             for data in datas:
393 0                 with open(msg_local_with_data.path + topic, "a+") as fp:
394 0                     yaml.safe_dump(data, fp, default_flow_style=True, width=20000)
395 0                     fp.flush()
396
397     # If file is not opened first, the messages written won't be seen
398 0     for topic in topics:
399 0         if topic not in msg_local_with_data.files_read:
400 0             msg_local_with_data.read(topic, blocks=False)
401 0     t = threading.Thread(target=write_to_topic, args=(topics, datas))
402 0     t.start()
403
404 0     for topic in topics:
405 0         recv_data = msg_local_with_data.read(topic, blocks=False)
406 0         assert recv_data is None
407 0     t.join()
408
409
410 1 @pytest.mark.parametrize("blocks", [(True), (False)])
411 1 def test_read_exception(msg_local_with_data, blocks):
412 0     msg_local_with_data.files_read = MagicMock()
413 0     msg_local_with_data.files_read.__contains__.side_effect = Exception()
414
415 0     with pytest.raises(MsgException) as excinfo:
416 0         msg_local_with_data.read("topic1", blocks=blocks)
417 0     assert str(excinfo.value).startswith(empty_exception_message())
418 0     assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
419
420
421 1 @pytest.mark.parametrize(
422     "topics, datas",
423     [
424         (["topic"], [{"key": "value"}]),
425         (["topic1"], [{"key": "value"}]),
426         (["topic2"], [{"key": "value"}]),
427         (["topic", "topic1"], [{"key": "value"}]),
428         (["topic", "topic2"], [{"key": "value"}]),
429         (["topic1", "topic2"], [{"key": "value"}]),
430         (["topic", "topic1", "topic2"], [{"key": "value"}]),
431         (["topic"], [{"key": "value"}, {"key1": "value1"}]),
432         (["topic1"], [{"key": "value"}, {"key1": "value1"}]),
433         (["topic2"], [{"key": "value"}, {"key1": "value1"}]),
434         (["topic", "topic1"], [{"key": "value"}, {"key1": "value1"}]),
435         (["topic", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
436         (["topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
437         (["topic", "topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
438     ],
439 )
440 1 def test_aioread(msg_local_with_data, topics, datas):
441 0     def write_to_topic(topics, datas):
442 0         time.sleep(2)
443 0         for topic in topics:
444 0             for data in datas:
445 0                 with open(msg_local_with_data.path + topic, "a+") as fp:
446 0                     yaml.safe_dump(data, fp, default_flow_style=True, width=20000)
447 0                     fp.flush()
448
449     # If file is not opened first, the messages written won't be seen
450 0     for topic in topics:
451 0         if topic not in msg_local_with_data.files_read:
452 0             msg_local_with_data.read(topic, blocks=False)
453
454 0     t = threading.Thread(target=write_to_topic, args=(topics, datas))
455 0     t.start()
456 0     for topic in topics:
457 0         for data in datas:
458 0             recv = asyncio.run(msg_local_with_data.aioread(topic))
459 0             recv_topic, recv_key, recv_msg = recv
460 0             key = list(data.keys())[0]
461 0             val = data[key]
462 0             assert recv_topic == topic
463 0             assert recv_key == key
464 0             assert recv_msg == val
465 0     t.join()
466
467
468 1 def test_aioread_exception(msg_local_with_data):
469 0     msg_local_with_data.files_read = MagicMock()
470 0     msg_local_with_data.files_read.__contains__.side_effect = Exception()
471
472 0     with pytest.raises(MsgException) as excinfo:
473 0         asyncio.run(msg_local_with_data.aioread("topic1"))
474 0     assert str(excinfo.value).startswith(empty_exception_message())
475 0     assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
476
477
478 1 def test_aioread_general_exception(msg_local_with_data):
479 0     msg_local_with_data.read = MagicMock()
480 0     msg_local_with_data.read.side_effect = Exception()
481
482 0     with pytest.raises(MsgException) as excinfo:
483 0         asyncio.run(msg_local_with_data.aioread("topic1"))
484 0     assert str(excinfo.value).startswith(empty_exception_message())
485 0     assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
486
487
488 1 @pytest.mark.parametrize(
489     "topic, key, msg",
490     [
491         ("test_topic", "test_key", "test_msg"),
492         ("test", "test_key", "test_msg"),
493         ("test_topic", "test", "test_msg"),
494         ("test_topic", "test_key", "test"),
495         ("test_topic", "test_list", ["a", "b", "c"]),
496         ("test_topic", "test_tuple", ("c", "b", "a")),
497         ("test_topic", "test_dict", {"a": 1, "b": 2, "c": 3}),
498         ("test_topic", "test_number", 123),
499         ("test_topic", "test_float", 1.23),
500         ("test_topic", "test_boolean", True),
501         ("test_topic", "test_none", None),
502     ],
503 )
504 1 def test_aiowrite(msg_local_config, topic, key, msg):
505 0     file_path = msg_local_config.path + topic
506 0     asyncio.run(msg_local_config.aiowrite(topic, key, msg))
507 0     assert os.path.exists(file_path)
508
509 0     with open(file_path, "r") as stream:
510 0         assert yaml.safe_load(stream) == {
511             key: msg if not isinstance(msg, tuple) else list(msg)
512         }
513
514
515 1 @pytest.mark.parametrize(
516     "topic, key, msg, times",
517     [
518         ("test_topic", "test_key", "test_msg", 2),
519         ("test", "test_key", "test_msg", 3),
520         ("test_topic", "test", "test_msg", 4),
521         ("test_topic", "test_key", "test", 2),
522         ("test_topic", "test_list", ["a", "b", "c"], 3),
523         ("test_topic", "test_tuple", ("c", "b", "a"), 4),
524         ("test_topic", "test_dict", {"a": 1, "b": 2, "c": 3}, 2),
525         ("test_topic", "test_number", 123, 3),
526         ("test_topic", "test_float", 1.23, 4),
527         ("test_topic", "test_boolean", True, 2),
528         ("test_topic", "test_none", None, 3),
529     ],
530 )
531 1 def test_aiowrite_with_multiple_calls(msg_local_config, topic, key, msg, times):
532 0     file_path = msg_local_config.path + topic
533 0     for _ in range(times):
534 0         asyncio.run(msg_local_config.aiowrite(topic, key, msg))
535 0     assert os.path.exists(file_path)
536
537 0     with open(file_path, "r") as stream:
538 0         for _ in range(times):
539 0             data = stream.readline()
540 0             assert yaml.safe_load(data) == {
541                 key: msg if not isinstance(msg, tuple) else list(msg)
542             }
543
544
545 1 def test_aiowrite_exception(msg_local_config):
546 0     msg_local_config.files_write = MagicMock()
547 0     msg_local_config.files_write.__contains__.side_effect = Exception()
548
549 0     with pytest.raises(MsgException) as excinfo:
550 0         asyncio.run(msg_local_config.aiowrite("test", "test", "test"))
551 0     assert str(excinfo.value).startswith(empty_exception_message())
552 0     assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR