41f6eb8d0d4c6ebcaec4dfff00855837809d50e0
[osm/common.git] / osm_common / tests / test_msglocal.py
1 # Copyright 2018 Whitestack, LLC
2 # Copyright 2018 Telefonica S.A.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
14 # under the License.
15 #
16 # For those usages not covered by the Apache License, Version 2.0 please
17 # contact: esousa@whitestack.com or alfonso.tiernosepulveda@telefonica.com
18 ##
19
20 import http
21 import logging
22 import pytest
23 import tempfile
24 import shutil
25 import uuid
26 import os
27 import yaml
28 import time
29 import threading
30
31 from unittest.mock import MagicMock
32 from osm_common.msgbase import MsgException
33 from osm_common.msglocal import MsgLocal
34
35 __author__ = "Eduardo Sousa <eduardosousa@av.it.pt>"
36
37
def valid_path():
    """Return the system temporary directory with a trailing '/' appended."""
    tmp_dir = tempfile.gettempdir()
    return "{}/".format(tmp_dir)
40
41
def invalid_path():
    """Return the path the tests pass to connect() when a failure is expected."""
    unusable_dir = '/#tweeter/'
    return unusable_dir
44
45
@pytest.fixture(scope="function", params=[True, False])
def msg_local(request):
    """Yield an unconnected MsgLocal, once for each value of ``lock``.

    After the test the instance is disconnected. Its directory is removed
    unless no path was set or the path is exactly one of the two helper
    paths (``invalid_path()`` / ``valid_path()``).
    """
    instance = MsgLocal(lock=request.param)
    yield instance

    instance.disconnect()
    # Never delete the shared temp directory itself or the bogus path.
    protected = (invalid_path(), valid_path())
    if instance.path and instance.path not in protected:
        shutil.rmtree(instance.path)
54
55
@pytest.fixture(scope="function", params=[True, False])
def msg_local_config(request):
    """Yield a MsgLocal connected to a fresh uuid-named path under valid_path().

    Parametrized over both values of ``lock``. After the test the instance is
    disconnected and its directory removed (unless the path equals
    ``invalid_path()``).
    """
    work_dir = valid_path() + str(uuid.uuid4())
    instance = MsgLocal(lock=request.param)
    instance.connect({"path": work_dir})
    yield instance

    instance.disconnect()
    if instance.path == invalid_path():
        return
    shutil.rmtree(instance.path)
65
66
@pytest.fixture(scope="function", params=[True, False])
def msg_local_with_data(request):
    """Yield a connected MsgLocal that already has eight messages written.

    For each payload ("msg1", then "msg2") a message is written to topic1 and
    topic2 under key1 and key2. Parametrized over both values of ``lock``.
    After the test the instance is disconnected and its directory removed
    (unless the path equals ``invalid_path()``).
    """
    instance = MsgLocal(lock=request.param)
    instance.connect({"path": valid_path() + str(uuid.uuid4())})

    # Same write order as eight explicit calls: payload -> topic -> key.
    for payload in ("msg1", "msg2"):
        for topic in ("topic1", "topic2"):
            for key in ("key1", "key2"):
                instance.write(topic, key, payload)
    yield instance

    instance.disconnect()
    if instance.path == invalid_path():
        return
    shutil.rmtree(instance.path)
85
86
def empty_exception_message():
    """Return the prefix the tests expect every MsgException message to start with."""
    prefix = "messaging exception "
    return prefix
89
90
def test_constructor():
    """A default MsgLocal uses the 'msg' logger, has no path and empty containers."""
    instance = MsgLocal()
    expected_logger = logging.getLogger('msg')

    assert instance.logger == expected_logger
    assert instance.path is None
    for container in (instance.files_read, instance.files_write, instance.buffer):
        assert len(container) == 0
98
99
def test_constructor_with_logger():
    """A MsgLocal built with ``logger_name`` uses that logger and starts empty."""
    name = 'msg_local'
    instance = MsgLocal(logger_name=name)

    assert instance.logger == logging.getLogger(name)
    assert instance.path is None
    for container in (instance.files_read, instance.files_write, instance.buffer):
        assert len(container) == 0
108
109
@pytest.mark.parametrize("config, logger_name, path", [
    ({"logger_name": "msg_local", "path": valid_path()}, "msg_local", valid_path()),
    ({"logger_name": "msg_local", "path": valid_path()[:-1]}, "msg_local", valid_path()),
    ({"logger_name": "msg_local", "path": valid_path() + "test_it/"}, "msg_local", valid_path() + "test_it/"),
    ({"logger_name": "msg_local", "path": valid_path() + "test_it"}, "msg_local", valid_path() + "test_it/"),
    ({"path": valid_path()}, "msg", valid_path()),
    ({"path": valid_path()[:-1]}, "msg", valid_path()),
    ({"path": valid_path() + "test_it/"}, "msg", valid_path() + "test_it/"),
    ({"path": valid_path() + "test_it"}, "msg", valid_path() + "test_it/"),
])
def test_connect(msg_local, config, logger_name, path):
    """connect() applies the configured logger and a slash-terminated path.

    The cases cover paths given with and without a trailing slash, and
    configurations with and without an explicit ``logger_name``.
    """
    msg_local.connect(config)

    expected_logger = logging.getLogger(logger_name)
    assert msg_local.logger == expected_logger
    assert msg_local.path == path
    for container in (msg_local.files_read, msg_local.files_write, msg_local.buffer):
        assert len(container) == 0
126
127
@pytest.mark.parametrize("config", [
    {"logger_name": "msg_local", "path": invalid_path()},
    {"path": invalid_path()},
])
def test_connect_with_exception(msg_local, config):
    """connect() with the invalid path must raise MsgException with HTTP 500."""
    with pytest.raises(MsgException) as excinfo:
        msg_local.connect(config)

    raised = excinfo.value
    assert str(raised).startswith(empty_exception_message())
    assert raised.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
136
137
def test_disconnect(msg_local_config):
    """disconnect() leaves every previously tracked file handle closed."""
    # Snapshot the handle maps before disconnecting so they can be checked after.
    read_handles = msg_local_config.files_read.copy()
    write_handles = msg_local_config.files_write.copy()

    msg_local_config.disconnect()

    for handle in list(read_handles.values()) + list(write_handles.values()):
        assert handle.closed
146
147
def test_disconnect_with_read(msg_local_config):
    """disconnect() closes the handles tracked after non-blocking reads."""
    for topic in ('topic1', 'topic2'):
        msg_local_config.read(topic, blocks=False)

    # Snapshot the handle maps before disconnecting so they can be checked after.
    read_handles = msg_local_config.files_read.copy()
    write_handles = msg_local_config.files_write.copy()

    msg_local_config.disconnect()

    for handle in list(read_handles.values()) + list(write_handles.values()):
        assert handle.closed
158
159
def test_disconnect_with_write(msg_local_with_data):
    """disconnect() closes the handles tracked after the fixture's writes."""
    # Snapshot the handle maps before disconnecting so they can be checked after.
    read_handles = msg_local_with_data.files_read.copy()
    write_handles = msg_local_with_data.files_write.copy()

    msg_local_with_data.disconnect()

    for handle in list(read_handles.values()) + list(write_handles.values()):
        assert handle.closed
170
171
def test_disconnect_with_read_and_write(msg_local_with_data):
    """disconnect() closes the handles tracked after both writes and reads."""
    for topic in ('topic1', 'topic2'):
        msg_local_with_data.read(topic, blocks=False)

    # Snapshot the handle maps before disconnecting so they can be checked after.
    read_handles = msg_local_with_data.files_read.copy()
    write_handles = msg_local_with_data.files_write.copy()

    msg_local_with_data.disconnect()

    for handle in list(read_handles.values()) + list(write_handles.values()):
        assert handle.closed
183
184
@pytest.mark.parametrize("topic, key, msg", [
    ("test_topic", "test_key", "test_msg"),
    ("test", "test_key", "test_msg"),
    ("test_topic", "test", "test_msg"),
    ("test_topic", "test_key", "test"),
    ("test_topic", "test_list", ["a", "b", "c"]),
    ("test_topic", "test_tuple", ("c", "b", "a")),
    ("test_topic", "test_dict", {"a": 1, "b": 2, "c": 3}),
    ("test_topic", "test_number", 123),
    ("test_topic", "test_float", 1.23),
    ("test_topic", "test_boolean", True),
    ("test_topic", "test_none", None),
])
def test_write(msg_local_config, topic, key, msg):
    """write() creates the topic file and its content loads as ``{key: msg}``.

    A tuple message is expected to come back as a list.
    """
    topic_file = msg_local_config.path + topic
    expected_value = list(msg) if isinstance(msg, tuple) else msg

    msg_local_config.write(topic, key, msg)
    assert os.path.exists(topic_file)

    with open(topic_file, 'r') as stream:
        loaded = yaml.safe_load(stream)
    assert loaded == {key: expected_value}
204
205
@pytest.mark.parametrize("topic, key, msg, times", [
    ("test_topic", "test_key", "test_msg", 2),
    ("test", "test_key", "test_msg", 3),
    ("test_topic", "test", "test_msg", 4),
    ("test_topic", "test_key", "test", 2),
    ("test_topic", "test_list", ["a", "b", "c"], 3),
    ("test_topic", "test_tuple", ("c", "b", "a"), 4),
    ("test_topic", "test_dict", {"a": 1, "b": 2, "c": 3}, 2),
    ("test_topic", "test_number", 123, 3),
    ("test_topic", "test_float", 1.23, 4),
    ("test_topic", "test_boolean", True, 2),
    ("test_topic", "test_none", None, 3),
])
def test_write_with_multiple_calls(msg_local_config, topic, key, msg, times):
    """Repeated write() calls append one line per call to the topic file.

    Each of the first ``times`` lines must load as ``{key: msg}``; a tuple
    message is expected to come back as a list.
    """
    topic_file = msg_local_config.path + topic
    expected_value = list(msg) if isinstance(msg, tuple) else msg

    for _ in range(times):
        msg_local_config.write(topic, key, msg)
    assert os.path.exists(topic_file)

    with open(topic_file, 'r') as stream:
        for _ in range(times):
            line = stream.readline()
            assert yaml.safe_load(line) == {key: expected_value}
229
230
def test_write_exception(msg_local_config):
    """An exception from the ``files_write`` membership test surfaces as MsgException (HTTP 500)."""
    # Any ``topic in files_write`` check now raises a plain Exception.
    broken_files = MagicMock()
    broken_files.__contains__.side_effect = Exception()
    msg_local_config.files_write = broken_files

    with pytest.raises(MsgException) as excinfo:
        msg_local_config.write("test", "test", "test")

    raised = excinfo.value
    assert str(raised).startswith(empty_exception_message())
    assert raised.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
239
240
@pytest.mark.parametrize("topics, datas", [
    (["topic"], [{"key": "value"}]),
    (["topic1"], [{"key": "value"}]),
    (["topic2"], [{"key": "value"}]),
    (["topic", "topic1"], [{"key": "value"}]),
    (["topic", "topic2"], [{"key": "value"}]),
    (["topic1", "topic2"], [{"key": "value"}]),
    (["topic", "topic1", "topic2"], [{"key": "value"}]),
    (["topic"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic1"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic2"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic", "topic1"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic", "topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
])
def test_read(msg_local_with_data, topics, datas):
    """Blocking read() returns, in order, messages appended by another thread.

    A background thread waits two seconds and then appends every entry of
    ``datas`` to each topic file. The main thread issues blocking reads and
    checks the (topic, key, message) triple of each result.
    """
    def write_to_topic(topics, datas):
        # Allow msglocal to block while waiting
        time.sleep(2)
        for topic in topics:
            for data in datas:
                with open(msg_local_with_data.path + topic, "a+") as fp:
                    yaml.safe_dump(data, fp, default_flow_style=True, width=20000)
                    fp.flush()

    # If file is not opened first, the messages written won't be seen
    for topic in topics:
        if topic not in msg_local_with_data.files_read:
            msg_local_with_data.read(topic, blocks=False)

    t = threading.Thread(target=write_to_topic, args=(topics, datas))
    t.start()

    try:
        for topic in topics:
            for data in datas:
                recv_topic, recv_key, recv_msg = msg_local_with_data.read(topic)
                # Each data dict carries exactly one key/value pair.
                key, val = next(iter(data.items()))
                assert recv_topic == topic
                assert recv_key == key
                assert recv_msg == val
    finally:
        # Always wait for the writer, even when an assertion fails; otherwise
        # it could still be appending while the fixture removes the directory.
        t.join()
283
284
@pytest.mark.parametrize("topics, datas", [
    (["topic"], [{"key": "value"}]),
    (["topic1"], [{"key": "value"}]),
    (["topic2"], [{"key": "value"}]),
    (["topic", "topic1"], [{"key": "value"}]),
    (["topic", "topic2"], [{"key": "value"}]),
    (["topic1", "topic2"], [{"key": "value"}]),
    (["topic", "topic1", "topic2"], [{"key": "value"}]),
    (["topic"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic1"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic2"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic", "topic1"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic", "topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
])
def test_read_non_block(msg_local_with_data, topics, datas):
    """Non-blocking read() returns messages that were already appended.

    The writer thread is started and joined before any read, so every message
    is on disk when the reads begin.
    """
    def append_messages(topic_names, payloads):
        for name in topic_names:
            for payload in payloads:
                with open(msg_local_with_data.path + name, "a+") as fp:
                    yaml.safe_dump(payload, fp, default_flow_style=True, width=20000)
                    fp.flush()

    # If file is not opened first, the messages written won't be seen
    for topic in topics:
        if topic not in msg_local_with_data.files_read:
            msg_local_with_data.read(topic, blocks=False)

    writer = threading.Thread(target=append_messages, args=(topics, datas))
    writer.start()
    writer.join()

    for topic in topics:
        for data in datas:
            received = msg_local_with_data.read(topic, blocks=False)
            recv_topic, recv_key, recv_msg = received
            expected_key = list(data.keys())[0]
            expected_val = data[expected_key]
            assert recv_topic == topic
            assert recv_key == expected_key
            assert recv_msg == expected_val
325
326
@pytest.mark.parametrize("topics, datas", [
    (["topic"], [{"key": "value"}]),
    (["topic1"], [{"key": "value"}]),
    (["topic2"], [{"key": "value"}]),
    (["topic", "topic1"], [{"key": "value"}]),
    (["topic", "topic2"], [{"key": "value"}]),
    (["topic1", "topic2"], [{"key": "value"}]),
    (["topic", "topic1", "topic2"], [{"key": "value"}]),
    (["topic"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic1"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic2"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic", "topic1"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic", "topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
])
def test_read_non_block_none(msg_local_with_data, topics, datas):
    """Non-blocking read() returns None while no new message has been written.

    The writer thread sleeps two seconds before appending, so the reads issued
    right after it starts are expected to find nothing.
    """
    def write_to_topic(topics, datas):
        time.sleep(2)
        for topic in topics:
            for data in datas:
                with open(msg_local_with_data.path + topic, "a+") as fp:
                    yaml.safe_dump(data, fp, default_flow_style=True, width=20000)
                    fp.flush()

    # If file is not opened first, the messages written won't be seen
    for topic in topics:
        if topic not in msg_local_with_data.files_read:
            msg_local_with_data.read(topic, blocks=False)

    t = threading.Thread(target=write_to_topic, args=(topics, datas))
    t.start()

    try:
        for topic in topics:
            recv_data = msg_local_with_data.read(topic, blocks=False)
            assert recv_data is None
    finally:
        # Always wait for the writer, even when an assertion fails; otherwise
        # it could still be appending while the fixture removes the directory.
        t.join()
361
362
@pytest.mark.parametrize("blocks", [True, False])
def test_read_exception(msg_local_with_data, blocks):
    """An exception from the ``files_read`` membership test surfaces as MsgException (HTTP 500).

    Checked for both blocking and non-blocking reads.
    """
    # Any ``topic in files_read`` check now raises a plain Exception.
    broken_files = MagicMock()
    broken_files.__contains__.side_effect = Exception()
    msg_local_with_data.files_read = broken_files

    with pytest.raises(MsgException) as excinfo:
        msg_local_with_data.read("topic1", blocks=blocks)

    raised = excinfo.value
    assert str(raised).startswith(empty_exception_message())
    assert raised.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
374
375
@pytest.mark.parametrize("topics, datas", [
    (["topic"], [{"key": "value"}]),
    (["topic1"], [{"key": "value"}]),
    (["topic2"], [{"key": "value"}]),
    (["topic", "topic1"], [{"key": "value"}]),
    (["topic", "topic2"], [{"key": "value"}]),
    (["topic1", "topic2"], [{"key": "value"}]),
    (["topic", "topic1", "topic2"], [{"key": "value"}]),
    (["topic"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic1"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic2"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic", "topic1"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
    (["topic", "topic1", "topic2"], [{"key": "value"}, {"key1": "value1"}]),
])
def test_aioread(msg_local_with_data, event_loop, topics, datas):
    """aioread() returns, in order, messages appended by another thread.

    A background thread waits two seconds and then appends every entry of
    ``datas`` to each topic file. Each aioread() coroutine is run to
    completion on ``event_loop`` and its (topic, key, message) triple checked.
    """
    def write_to_topic(topics, datas):
        time.sleep(2)
        for topic in topics:
            for data in datas:
                with open(msg_local_with_data.path + topic, "a+") as fp:
                    yaml.safe_dump(data, fp, default_flow_style=True, width=20000)
                    fp.flush()

    # If file is not opened first, the messages written won't be seen
    for topic in topics:
        if topic not in msg_local_with_data.files_read:
            msg_local_with_data.read(topic, blocks=False)

    t = threading.Thread(target=write_to_topic, args=(topics, datas))
    t.start()

    try:
        for topic in topics:
            for data in datas:
                recv = event_loop.run_until_complete(msg_local_with_data.aioread(topic, event_loop))
                recv_topic, recv_key, recv_msg = recv
                # Each data dict carries exactly one key/value pair.
                key, val = next(iter(data.items()))
                assert recv_topic == topic
                assert recv_key == key
                assert recv_msg == val
    finally:
        # Always wait for the writer, even when an assertion fails; otherwise
        # it could still be appending while the fixture removes the directory.
        t.join()
416
417
def test_aioread_exception(msg_local_with_data, event_loop):
    """An exception from the ``files_read`` membership test makes aioread() raise MsgException (HTTP 500)."""
    # Any ``topic in files_read`` check now raises a plain Exception.
    broken_files = MagicMock()
    broken_files.__contains__.side_effect = Exception()
    msg_local_with_data.files_read = broken_files

    with pytest.raises(MsgException) as excinfo:
        pending = msg_local_with_data.aioread("topic1", event_loop)
        event_loop.run_until_complete(pending)

    raised = excinfo.value
    assert str(raised).startswith(empty_exception_message())
    assert raised.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
426
427
def test_aioread_general_exception(msg_local_with_data, event_loop):
    """A plain Exception raised by read() makes aioread() raise MsgException (HTTP 500)."""
    # Replace read() on the instance with a mock that always raises.
    msg_local_with_data.read = MagicMock(side_effect=Exception())

    with pytest.raises(MsgException) as excinfo:
        pending = msg_local_with_data.aioread("topic1", event_loop)
        event_loop.run_until_complete(pending)

    raised = excinfo.value
    assert str(raised).startswith(empty_exception_message())
    assert raised.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
436
437
@pytest.mark.parametrize("topic, key, msg", [
    ("test_topic", "test_key", "test_msg"),
    ("test", "test_key", "test_msg"),
    ("test_topic", "test", "test_msg"),
    ("test_topic", "test_key", "test"),
    ("test_topic", "test_list", ["a", "b", "c"]),
    ("test_topic", "test_tuple", ("c", "b", "a")),
    ("test_topic", "test_dict", {"a": 1, "b": 2, "c": 3}),
    ("test_topic", "test_number", 123),
    ("test_topic", "test_float", 1.23),
    ("test_topic", "test_boolean", True),
    ("test_topic", "test_none", None),
])
def test_aiowrite(msg_local_config, event_loop, topic, key, msg):
    """aiowrite() creates the topic file and its content loads as ``{key: msg}``.

    A tuple message is expected to come back as a list.
    """
    topic_file = msg_local_config.path + topic
    expected_value = list(msg) if isinstance(msg, tuple) else msg

    event_loop.run_until_complete(msg_local_config.aiowrite(topic, key, msg))
    assert os.path.exists(topic_file)

    with open(topic_file, 'r') as stream:
        loaded = yaml.safe_load(stream)
    assert loaded == {key: expected_value}
457
458
@pytest.mark.parametrize("topic, key, msg, times", [
    ("test_topic", "test_key", "test_msg", 2),
    ("test", "test_key", "test_msg", 3),
    ("test_topic", "test", "test_msg", 4),
    ("test_topic", "test_key", "test", 2),
    ("test_topic", "test_list", ["a", "b", "c"], 3),
    ("test_topic", "test_tuple", ("c", "b", "a"), 4),
    ("test_topic", "test_dict", {"a": 1, "b": 2, "c": 3}, 2),
    ("test_topic", "test_number", 123, 3),
    ("test_topic", "test_float", 1.23, 4),
    ("test_topic", "test_boolean", True, 2),
    ("test_topic", "test_none", None, 3),
])
def test_aiowrite_with_multiple_calls(msg_local_config, event_loop, topic, key, msg, times):
    """Repeated aiowrite() calls append one line per call to the topic file.

    Each of the first ``times`` lines must load as ``{key: msg}``; a tuple
    message is expected to come back as a list.
    """
    topic_file = msg_local_config.path + topic
    expected_value = list(msg) if isinstance(msg, tuple) else msg

    for _ in range(times):
        event_loop.run_until_complete(msg_local_config.aiowrite(topic, key, msg))
    assert os.path.exists(topic_file)

    with open(topic_file, 'r') as stream:
        for _ in range(times):
            line = stream.readline()
            assert yaml.safe_load(line) == {key: expected_value}
481
482
def test_aiowrite_exception(msg_local_config, event_loop):
    """An exception from the ``files_write`` membership test makes aiowrite() raise MsgException (HTTP 500)."""
    # Any ``topic in files_write`` check now raises a plain Exception.
    broken_files = MagicMock()
    broken_files.__contains__.side_effect = Exception()
    msg_local_config.files_write = broken_files

    with pytest.raises(MsgException) as excinfo:
        pending = msg_local_config.aiowrite("test", "test", "test")
        event_loop.run_until_complete(pending)

    raised = excinfo.value
    assert str(raised).startswith(empty_exception_message())
    assert raised.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR