Feature 10937: Transport API (TAPI) WIM connector for RO
[osm/RO.git] / RO-SDN-tapi / osm_rosdn_tapi / tests / mock_osm_ro.py
1 # -*- coding: utf-8 -*-
2
3 #######################################################################################
4 # This file is part of OSM RO module
5 #
6 # Copyright ETSI Contributors and Others.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #######################################################################################
20 # This work has been performed in the context of the TeraFlow Project -
21 # funded by the European Commission under Grant number 101015857 through the
22 # Horizon 2020 program.
23 # Contributors:
24 # - Lluis Gifre <lluis.gifre@cttc.es>
25 # - Ricard Vilalta <ricard.vilalta@cttc.es>
26 #######################################################################################
27
28 """This file contains a Mock OSM RO component that can be used for rapid unit testing.
29
30 This code is based on code taken with permission from ETSI TeraFlowSDN project at:
31 https://labs.etsi.org/rep/tfs/controller
32 """
33
34
35 from typing import Dict, List
36
37 from osm_ro_plugin.sdnconn import SdnConnectorBase
38
39 from .exceptions import MockOsmRoServiceNotFound
40
41
class MockOsmRo:
    """Minimal stand-in for OSM RO used to drive a WIM connector in unit tests.

    The mock instantiates the connector class it is given and keeps, in memory,
    the ``conn_info`` value returned by the connector for every service it
    creates. That stored value is handed back to the connector on subsequent
    status/edit/delete calls.
    """

    def __init__(
        self,
        # Quoted so the builtin generic form is never evaluated at runtime and
        # therefore also works on interpreters without ``type[...]`` support.
        klass: "type[SdnConnectorBase]",
        url: str,
        wim_account: Dict,
        wim_port_mapping: Dict,
    ) -> None:
        """Build the connector under test and an empty ``conn_info`` store.

        Args:
            klass: connector class (not an instance); it is called as
                ``klass(wim, wim_account, config=config)``.
            url: value placed under the ``"wim_url"`` key of the ``wim`` dict.
            wim_account: passed through to the connector unchanged.
            wim_port_mapping: placed under ``"service_endpoint_mapping"`` in
                the connector configuration.
        """
        wim = {"wim_url": url}
        config = {
            "mapping_not_needed": False,
            "service_endpoint_mapping": wim_port_mapping,
        }

        # Instantiate WIM connector
        self.wim_connector = klass(wim, wim_account, config=config)

        # Internal DB emulating OSM RO storage provided to WIM Connectors
        self.conn_info: Dict = {}

    def _get_conn_info(self, service_uuid: str):
        """Return the ``conn_info`` stored for ``service_uuid``.

        Presence of the key, not the truthiness of the value, decides whether
        the service is known: a connector may legitimately return ``None`` as
        ``conn_info`` and such a service must still be found afterwards.

        Raises:
            MockOsmRoServiceNotFound: no service with that UUID was created
                through this mock.
        """
        try:
            return self.conn_info[service_uuid]
        except KeyError:
            # ``from None`` keeps the traceback free of the internal KeyError.
            raise MockOsmRoServiceNotFound(service_uuid) from None

    def create_connectivity_service(
        self, service_type: str, connection_points: List[Dict]
    ) -> str:
        """Create a service through the connector and remember its ``conn_info``.

        Returns:
            The service UUID reported by the connector.
        """
        self.wim_connector.check_credentials()
        service_uuid, conn_info = self.wim_connector.create_connectivity_service(
            service_type, connection_points
        )
        self.conn_info[service_uuid] = conn_info
        return service_uuid

    def get_connectivity_service_status(self, service_uuid: str) -> Dict:
        """Return the connector's status for a previously created service.

        Raises:
            MockOsmRoServiceNotFound: ``service_uuid`` is unknown to this mock.
        """
        # Look the service up first so unknown UUIDs never reach the connector.
        conn_info = self._get_conn_info(service_uuid)
        self.wim_connector.check_credentials()
        return self.wim_connector.get_connectivity_service_status(
            service_uuid, conn_info=conn_info
        )

    def edit_connectivity_service(
        self, service_uuid: str, connection_points: List[Dict]
    ) -> None:
        """Ask the connector to change the connection points of a service.

        The connector's return value is discarded and the stored ``conn_info``
        is left as it was.

        Raises:
            MockOsmRoServiceNotFound: ``service_uuid`` is unknown to this mock.
        """
        conn_info = self._get_conn_info(service_uuid)
        self.wim_connector.check_credentials()
        self.wim_connector.edit_connectivity_service(
            service_uuid, conn_info=conn_info, connection_points=connection_points
        )

    def delete_connectivity_service(self, service_uuid: str) -> None:
        """Ask the connector to delete a previously created service.

        The stored ``conn_info`` entry is kept after the call.

        Raises:
            MockOsmRoServiceNotFound: ``service_uuid`` is unknown to this mock.
        """
        conn_info = self._get_conn_info(service_uuid)
        self.wim_connector.check_credentials()
        self.wim_connector.delete_connectivity_service(
            service_uuid, conn_info=conn_info
        )