1 # Copyright 2018 Whitestack, LLC
2 # Copyright 2018 Telefonica S.A.
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
16 # For those usages not covered by the Apache License, Version 2.0 please
17 # contact: esousa@whitestack.com or alfonso.tiernosepulveda@telefonica.com
22 from osm_common
.msgbase
import MsgBase
, MsgException
def exception_message(message):
    """Return *message* prefixed with the text ``"messaging exception "``.

    The tests in this module use the result as the leading text they expect
    ``str()`` of a raised ``MsgException`` to start with.
    """
    prefix = "messaging exception "
    # Plain concatenation is intentional: a non-str *message* raises TypeError.
    return prefix + message
def test_constructor():
    """Check that ``MsgBase`` can be instantiated without arguments."""
    # Bind the instance locally: this test takes no fixture, so the
    # assertions below need their own object to inspect.
    msgbase = MsgBase()
    assert msgbase is not None
    assert isinstance(msgbase, MsgBase)
def test_connect(msg_base):
    """Call ``connect`` with ``None``; the test passes if nothing is raised."""
    config = None
    msg_base.connect(config)
def test_disconnect(msg_base):
    """Call ``disconnect``; the test passes if nothing is raised."""
    # NOTE(review): assumes the object under test exposes a zero-argument
    # ``disconnect()`` -- confirm against MsgBase.
    msg_base.disconnect()
def test_write(msg_base):
    """Check that ``write`` raises ``MsgException`` with the expected text and code.

    The exception's string form must start with the prefixed
    "Method 'write' not implemented" message, and its ``http_code`` must be
    ``http.HTTPStatus.INTERNAL_SERVER_ERROR``.
    """
    with pytest.raises(MsgException) as excinfo:
        msg_base.write("test", "test", "test")

    raised = excinfo.value
    expected_prefix = exception_message("Method 'write' not implemented")
    assert str(raised).startswith(expected_prefix)
    assert raised.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
def test_read(msg_base):
    """Check that ``read`` raises ``MsgException`` with the expected text and code.

    The exception's string form must start with the prefixed
    "Method 'read' not implemented" message, and its ``http_code`` must be
    ``http.HTTPStatus.INTERNAL_SERVER_ERROR``.
    """
    with pytest.raises(MsgException) as excinfo:
        # The guarded block must actually invoke ``read``; otherwise nothing
        # can raise and ``pytest.raises`` has nothing to capture.
        # NOTE(review): a single "test" argument mirrors the aioread test --
        # confirm against the ``read`` signature.
        msg_base.read("test")

    raised = excinfo.value
    expected_prefix = exception_message("Method 'read' not implemented")
    assert str(raised).startswith(expected_prefix)
    assert raised.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
def test_aiowrite(msg_base):
    """Check that ``aiowrite`` raises ``MsgException`` when run via ``asyncio.run``.

    The exception's string form must start with the prefixed
    "Method 'aiowrite' not implemented" message, and its ``http_code`` must be
    ``http.HTTPStatus.INTERNAL_SERVER_ERROR``.
    """
    with pytest.raises(MsgException) as excinfo:
        pending = msg_base.aiowrite("test", "test", "test")
        asyncio.run(pending)

    raised = excinfo.value
    expected_prefix = exception_message("Method 'aiowrite' not implemented")
    assert str(raised).startswith(expected_prefix)
    assert raised.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
def test_aioread(msg_base):
    """Check that ``aioread`` raises ``MsgException`` when run via ``asyncio.run``.

    The exception's string form must start with the prefixed
    "Method 'aioread' not implemented" message, and its ``http_code`` must be
    ``http.HTTPStatus.INTERNAL_SERVER_ERROR``.
    """
    with pytest.raises(MsgException) as excinfo:
        pending = msg_base.aioread("test")
        asyncio.run(pending)

    raised = excinfo.value
    expected_prefix = exception_message("Method 'aioread' not implemented")
    assert str(raised).startswith(expected_prefix)
    assert raised.http_code == http.HTTPStatus.INTERNAL_SERVER_ERROR