--- /dev/null
+import http
+import pytest
+
+from osm_common.msgbase import MsgBase, MsgException
+
+
+def exception_message(message):
+ return "messaging exception " + message
+
+@pytest.fixture
+def msg_base():
+ return MsgBase()
+
+def test_constructor():
+ msgbase = MsgBase()
+
+ assert msgbase != None
+ assert isinstance(msgbase, MsgBase)
+
+def test_connect(msg_base):
+ msg_base.connect()
+
+def test_disconnect(msg_base):
+ msg_base.disconnect()
+
+def test_write(msg_base):
+ with pytest.raises(MsgException) as excinfo:
+ msg_base.write("test", "test", "test")
+ assert str(excinfo.value).startswith(exception_message("Method 'write' not implemented"))
+ assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
+
+def test_read(msg_base):
+ with pytest.raises(MsgException) as excinfo:
+ msg_base.read("test")
+ assert str(excinfo.value).startswith(exception_message("Method 'read' not implemented"))
+ assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
+
+def test_aiowrite(msg_base, event_loop):
+ with pytest.raises(MsgException) as excinfo:
+ event_loop.run_until_complete(msg_base.aiowrite("test", "test", "test", event_loop))
+ assert str(excinfo.value).startswith(exception_message("Method 'aiowrite' not implemented"))
+ assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
+
+def test_aioread(msg_base, event_loop):
+ with pytest.raises(MsgException) as excinfo:
+ event_loop.run_until_complete(msg_base.aioread("test", event_loop))
+ assert str(excinfo.value).startswith(exception_message("Method 'aioread' not implemented"))
+ assert excinfo.value.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR