1 # -*- coding: utf-8 -*-
3 #######################################################################################
4 # This file is part of OSM RO module
6 # Copyright ETSI Contributors and Others.
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
12 # http://www.apache.org/licenses/LICENSE-2.0
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
19 #######################################################################################
20 # This work has been performed in the context of the TeraFlow Project -
21 # funded by the European Commission under Grant number 101015857 through the
22 # Horizon 2020 program.
24 # - Lluis Gifre <lluis.gifre@cttc.es>
25 # - Ricard Vilalta <ricard.vilalta@cttc.es>
26 #######################################################################################
28 """This file contains the unit tests for the Transport API (TAPI) WIM connector."""
34 from osm_rosdn_tapi
.exceptions
import (
35 WimTapiConnectionPointsBadFormat
,
36 WimTapiMissingConnPointField
,
37 WimTapiUnsupportedServiceType
,
39 from osm_rosdn_tapi
.tests
.constants
import (
45 from osm_rosdn_tapi
.tests
.mock_osm_ro
import MockOsmRo
46 from osm_rosdn_tapi
.tests
.mock_tapi_handler
import MockTapiRequestHandler
47 from osm_rosdn_tapi
.wimconn_tapi
import WimconnectorTAPI
50 SERVICE_TYPE
= "ELINE"
51 SERVICE_CONNECTION_POINTS_BIDIRECTIONAL
= [
52 # SIPs taken from mock_tapi_handler.py
53 {"service_endpoint_id": "R1-eth0"},
54 {"service_endpoint_id": "R2-eth0"},
56 SERVICE_CONNECTION_POINTS_UNIDIRECTIONAL
= [
57 # SIPs taken from mock_tapi_handler.py
58 {"service_endpoint_id": "R3-opt1"},
59 {"service_endpoint_id": "R4-opt1"},
63 class UnitTests(unittest
.TestCase
):
64 """Unit tests for Transport API WIM connector"""
66 def setUp(self
) -> None:
67 self
.wim_server
= http
.server
.ThreadingHTTPServer(
68 WIM_HOST_PORT
, MockTapiRequestHandler
71 def test_wrong_cases(self
):
73 wim_server_thread
= threading
.Thread(target
=self
.wim_server
.serve_forever
)
74 wim_server_thread
.daemon
= True
75 wim_server_thread
.start()
77 mock_osm_ro_tapi
= MockOsmRo(
78 WimconnectorTAPI
, WIM_URL
, WIM_ACCOUNT
, WIM_PORT_MAPPING
81 # Unsupported service type
82 with self
.assertRaises(WimTapiUnsupportedServiceType
) as test_context
:
83 mock_osm_ro_tapi
.create_connectivity_service(
84 "ELAN", SERVICE_CONNECTION_POINTS_BIDIRECTIONAL
87 str(test_context
.exception
.args
[0]),
88 "Unsupported ServiceType(ELAN). Supported ServiceTypes({'ELINE'})",
91 # Wrong number of connection_points
92 with self
.assertRaises(WimTapiConnectionPointsBadFormat
) as test_context
:
93 mock_osm_ro_tapi
.create_connectivity_service(SERVICE_TYPE
, [])
95 str(test_context
.exception
.args
[0]),
96 "ConnectionPoints([]) must be a list or tuple of length 2",
99 # Wrong type of connection_points
100 with self
.assertRaises(WimTapiConnectionPointsBadFormat
) as test_context
:
101 mock_osm_ro_tapi
.create_connectivity_service(
102 SERVICE_TYPE
, {"a": "b", "c": "d"}
105 str(test_context
.exception
.args
[0]),
106 "ConnectionPoints({'a': 'b', 'c': 'd'}) must be a list or tuple of length 2",
109 with self
.assertRaises(WimTapiMissingConnPointField
) as test_context
:
110 mock_osm_ro_tapi
.create_connectivity_service(
113 {"wrong_service_endpoint_id": "value"},
114 {"service_endpoint_id": "value"},
118 str(test_context
.exception
.args
[0]),
119 "WIM TAPI Connector: ConnectionPoint({'wrong_service_endpoint_id': 'value'}) has no field 'service_endpoint_id'",
122 self
.wim_server
.shutdown()
123 wim_server_thread
.join()
125 def test_correct_bidirectional(self
):
126 with self
.wim_server
:
127 wim_server_thread
= threading
.Thread(target
=self
.wim_server
.serve_forever
)
128 wim_server_thread
.daemon
= True
129 wim_server_thread
.start()
131 mock_osm_ro_tapi
= MockOsmRo(
132 WimconnectorTAPI
, WIM_URL
, WIM_ACCOUNT
, WIM_PORT_MAPPING
135 # Create bidirectional TAPI service
136 service_uuid
= mock_osm_ro_tapi
.create_connectivity_service(
137 SERVICE_TYPE
, SERVICE_CONNECTION_POINTS_BIDIRECTIONAL
139 self
.assertIsInstance(service_uuid
, str)
141 # Check status of bidirectional TAPI service
142 status
= mock_osm_ro_tapi
.get_connectivity_service_status(service_uuid
)
143 self
.assertIsInstance(status
, dict)
144 self
.assertIn("sdn_status", status
)
145 self
.assertEqual(status
["sdn_status"], "ACTIVE")
147 # Delete bidirectional TAPI service
148 mock_osm_ro_tapi
.delete_connectivity_service(service_uuid
)
150 self
.wim_server
.shutdown()
151 wim_server_thread
.join()
153 def test_correct_unidirectional(self
):
154 with self
.wim_server
:
155 wim_server_thread
= threading
.Thread(target
=self
.wim_server
.serve_forever
)
156 wim_server_thread
.daemon
= True
157 wim_server_thread
.start()
159 mock_osm_ro_tapi
= MockOsmRo(
160 WimconnectorTAPI
, WIM_URL
, WIM_ACCOUNT
, WIM_PORT_MAPPING
163 # Create unidirectional TAPI service
164 service_uuid
= mock_osm_ro_tapi
.create_connectivity_service(
165 SERVICE_TYPE
, SERVICE_CONNECTION_POINTS_UNIDIRECTIONAL
167 self
.assertIsInstance(service_uuid
, str)
169 # Check status of unidirectional TAPI service
170 status
= mock_osm_ro_tapi
.get_connectivity_service_status(service_uuid
)
171 self
.assertIsInstance(status
, dict)
172 self
.assertIn("sdn_status", status
)
173 self
.assertEqual(status
["sdn_status"], "ACTIVE")
175 # Delete unidirectional TAPI service
176 mock_osm_ro_tapi
.delete_connectivity_service(service_uuid
)
178 self
.wim_server
.shutdown()
179 wim_server_thread
.join()