Disable the check of the release notes
[osm/RO.git] / RO-SDN-tapi / osm_rosdn_tapi / tests / test_wim_tapi.py
1 # -*- coding: utf-8 -*-
2
3 #######################################################################################
4 # This file is part of OSM RO module
5 #
6 # Copyright ETSI Contributors and Others.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #######################################################################################
20 # This work has been performed in the context of the TeraFlow Project -
21 # funded by the European Commission under Grant number 101015857 through the
22 # Horizon 2020 program.
23 # Contributors:
24 # - Lluis Gifre <lluis.gifre@cttc.es>
25 # - Ricard Vilalta <ricard.vilalta@cttc.es>
26 #######################################################################################
27
28 """This file contains the unit tests for the Transport API (TAPI) WIM connector."""
29
30 import http.server
31 import threading
32 import unittest
33
34 from osm_rosdn_tapi.exceptions import (
35 WimTapiConnectionPointsBadFormat,
36 WimTapiMissingConnPointField,
37 WimTapiUnsupportedServiceType,
38 )
39 from osm_rosdn_tapi.tests.constants import (
40 WIM_ACCOUNT,
41 WIM_HOST_PORT,
42 WIM_PORT_MAPPING,
43 WIM_URL,
44 )
45 from osm_rosdn_tapi.tests.mock_osm_ro import MockOsmRo
46 from osm_rosdn_tapi.tests.mock_tapi_handler import MockTapiRequestHandler
47 from osm_rosdn_tapi.wimconn_tapi import WimconnectorTAPI
48
49
# Service type requested by the tests that are expected to succeed.
SERVICE_TYPE = "ELINE"

# Connection points for the bidirectional test.
# SIPs taken from mock_tapi_handler.py
SERVICE_CONNECTION_POINTS_BIDIRECTIONAL = [
    {"service_endpoint_id": sip_id} for sip_id in ("R1-eth0", "R2-eth0")
]

# Connection points for the unidirectional test.
# SIPs taken from mock_tapi_handler.py
SERVICE_CONNECTION_POINTS_UNIDIRECTIONAL = [
    {"service_endpoint_id": sip_id} for sip_id in ("R3-opt1", "R4-opt1")
]
61
62
class UnitTests(unittest.TestCase):
    """Unit tests for Transport API WIM connector.

    Each test runs against a fresh ``ThreadingHTTPServer`` bound to
    ``WIM_HOST_PORT`` that answers with ``MockTapiRequestHandler`` and is
    served from a background daemon thread. The server is stopped and its
    socket released through ``addCleanup``, so teardown also happens when a
    test fails part-way through.
    """

    def setUp(self) -> None:
        """Start the mock TAPI server and build the connector under test."""
        self.wim_server = http.server.ThreadingHTTPServer(
            WIM_HOST_PORT, MockTapiRequestHandler
        )
        # Cleanups run in LIFO order, so the effective teardown sequence is:
        # shutdown() -> join() -> server_close().
        self.addCleanup(self.wim_server.server_close)

        wim_server_thread = threading.Thread(
            target=self.wim_server.serve_forever, daemon=True
        )
        wim_server_thread.start()
        # shutdown() is only registered once serve_forever() has a thread to
        # run in; calling it without a serving thread would block forever.
        self.addCleanup(wim_server_thread.join)
        self.addCleanup(self.wim_server.shutdown)

        # Created after the server thread is started, as in the original tests.
        self.mock_osm_ro_tapi = MockOsmRo(
            WimconnectorTAPI, WIM_URL, WIM_ACCOUNT, WIM_PORT_MAPPING
        )

    def _check_service_lifecycle(self, connection_points) -> None:
        """Create a TAPI service, check it reports ACTIVE, then delete it.

        Args:
            connection_points: connection points handed to
                ``create_connectivity_service`` together with ``SERVICE_TYPE``.
        """
        # Create TAPI service
        service_uuid = self.mock_osm_ro_tapi.create_connectivity_service(
            SERVICE_TYPE, connection_points
        )
        self.assertIsInstance(service_uuid, str)

        # Check status of TAPI service
        status = self.mock_osm_ro_tapi.get_connectivity_service_status(service_uuid)
        self.assertIsInstance(status, dict)
        self.assertIn("sdn_status", status)
        self.assertEqual(status["sdn_status"], "ACTIVE")

        # Delete TAPI service
        self.mock_osm_ro_tapi.delete_connectivity_service(service_uuid)

    def test_wrong_cases(self):
        """Invalid requests raise the dedicated exceptions with exact messages."""
        # Unsupported service type
        with self.assertRaises(WimTapiUnsupportedServiceType) as test_context:
            self.mock_osm_ro_tapi.create_connectivity_service(
                "ELAN", SERVICE_CONNECTION_POINTS_BIDIRECTIONAL
            )
        self.assertEqual(
            str(test_context.exception.args[0]),
            "Unsupported ServiceType(ELAN). Supported ServiceTypes({'ELINE'})",
        )

        # Wrong number of connection_points
        with self.assertRaises(WimTapiConnectionPointsBadFormat) as test_context:
            self.mock_osm_ro_tapi.create_connectivity_service(SERVICE_TYPE, [])
        self.assertEqual(
            str(test_context.exception.args[0]),
            "ConnectionPoints([]) must be a list or tuple of length 2",
        )

        # Wrong type of connection_points
        with self.assertRaises(WimTapiConnectionPointsBadFormat) as test_context:
            self.mock_osm_ro_tapi.create_connectivity_service(
                SERVICE_TYPE, {"a": "b", "c": "d"}
            )
        self.assertEqual(
            str(test_context.exception.args[0]),
            "ConnectionPoints({'a': 'b', 'c': 'd'}) must be a list or tuple of length 2",
        )

        # Connection point without the 'service_endpoint_id' field
        with self.assertRaises(WimTapiMissingConnPointField) as test_context:
            self.mock_osm_ro_tapi.create_connectivity_service(
                SERVICE_TYPE,
                [
                    {"wrong_service_endpoint_id": "value"},
                    {"service_endpoint_id": "value"},
                ],
            )
        self.assertEqual(
            str(test_context.exception.args[0]),
            "WIM TAPI Connector: ConnectionPoint({'wrong_service_endpoint_id': 'value'}) has no field 'service_endpoint_id'",
        )

    def test_correct_bidirectional(self):
        """A bidirectional service can be created, queried and deleted."""
        self._check_service_lifecycle(SERVICE_CONNECTION_POINTS_BIDIRECTIONAL)

    def test_correct_unidirectional(self):
        """A unidirectional service can be created, queried and deleted."""
        self._check_service_lifecycle(SERVICE_CONNECTION_POINTS_UNIDIRECTIONAL)