4 from osm_common
.msgbase
import MsgBase
, MsgException
def exception_message(message):
    """Build the expected exception text for *message*.

    The result is the fixed prefix ``"messaging exception "`` followed by
    *message*; the tests compare it against the start of ``str(exception)``.
    """
    prefix = "messaging exception "
    return prefix + message
def test_constructor():
    """Check that a constructed ``MsgBase`` is a non-``None`` ``MsgBase`` instance."""
    # NOTE(review): the statement that binds ``msgbase`` was not visible in the
    # reviewed text; a no-argument ``MsgBase()`` construction is assumed here --
    # confirm against the original test.
    msgbase = MsgBase()

    # Identity comparison is the correct way to test against the None singleton.
    assert msgbase is not None
    assert isinstance(msgbase, MsgBase)
def test_connect(msg_base):
    """Call ``connect`` on the ``msg_base`` fixture with ``None``.

    There are no assertions: the test passes as long as the call does not
    raise.
    """
    connect_argument = None
    msg_base.connect(connect_argument)
def test_disconnect(msg_base):
    """Call ``disconnect`` on the ``msg_base`` fixture; passes if nothing is raised."""
    # NOTE(review): no body statement for this test was visible in the reviewed
    # text, which left the function syntactically incomplete. The argument-less
    # call below is assumed by analogy with test_connect -- confirm against the
    # original test.
    msg_base.disconnect()
def test_write(msg_base):
    """Check that ``write`` on the ``msg_base`` fixture raises ``MsgException``.

    The raised exception's text must start with the "Method 'write' not
    implemented" message and its ``http_code`` must be
    ``INTERNAL_SERVER_ERROR``.
    """
    expected_prefix = exception_message("Method 'write' not implemented")

    with pytest.raises(MsgException) as excinfo:
        msg_base.write("test", "test", "test")

    # excinfo.value is only populated once the ``with`` block has exited.
    raised = excinfo.value
    assert str(raised).startswith(expected_prefix)
    assert raised.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
def test_read(msg_base):
    """Check that ``read`` on the ``msg_base`` fixture raises ``MsgException``.

    The raised exception's text must start with the "Method 'read' not
    implemented" message and its ``http_code`` must be
    ``INTERNAL_SERVER_ERROR``.
    """
    with pytest.raises(MsgException) as excinfo:
        # NOTE(review): the statement under test was not visible in the reviewed
        # text, leaving this ``with`` block empty. A single-argument
        # ``read("test")`` call is assumed from the expected error message --
        # confirm against the original test.
        msg_base.read("test")

    # excinfo.value is only populated once the ``with`` block has exited.
    assert str(excinfo.value).startswith(
        exception_message("Method 'read' not implemented")
    )
    assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
def test_aiowrite(msg_base, event_loop):
    """Check that ``aiowrite`` on the ``msg_base`` fixture raises ``MsgException``.

    The object returned by ``aiowrite`` is driven with
    ``event_loop.run_until_complete``. The raised exception's text must start
    with the "Method 'aiowrite' not implemented" message and its ``http_code``
    must be ``INTERNAL_SERVER_ERROR``.
    """
    expected_prefix = exception_message("Method 'aiowrite' not implemented")

    with pytest.raises(MsgException) as excinfo:
        # Both the call and the loop run stay inside the ``with`` so the
        # exception is captured whichever of them raises it.
        pending = msg_base.aiowrite("test", "test", "test", event_loop)
        event_loop.run_until_complete(pending)

    raised = excinfo.value
    assert str(raised).startswith(expected_prefix)
    assert raised.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
def test_aioread(msg_base, event_loop):
    """Check that ``aioread`` on the ``msg_base`` fixture raises ``MsgException``.

    The object returned by ``aioread`` is driven with
    ``event_loop.run_until_complete``. The raised exception's text must start
    with the "Method 'aioread' not implemented" message and its ``http_code``
    must be ``INTERNAL_SERVER_ERROR``.
    """
    expected_prefix = exception_message("Method 'aioread' not implemented")

    with pytest.raises(MsgException) as excinfo:
        # Both the call and the loop run stay inside the ``with`` so the
        # exception is captured whichever of them raises it.
        pending = msg_base.aioread("test", event_loop)
        event_loop.run_until_complete(pending)

    raised = excinfo.value
    assert str(raised).startswith(expected_prefix)
    assert raised.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR