1 # -*- coding: utf-8 -*-
3 #######################################################################################
4 # This file is part of OSM RO module
6 # Copyright ETSI Contributors and Others.
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
12 # http://www.apache.org/licenses/LICENSE-2.0
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
19 #######################################################################################
20 # This work has been performed in the context of the TeraFlow Project -
21 # funded by the European Commission under Grant number 101015857 through the
22 # Horizon 2020 program.
24 # - Lluis Gifre <lluis.gifre@cttc.es>
25 # - Ricard Vilalta <ricard.vilalta@cttc.es>
26 #######################################################################################
"""This file contains a Mock OSM RO component that can be used for rapid unit testing.

This code is based on code taken with permission from ETSI TeraFlowSDN project at:
  https://labs.etsi.org/rep/tfs/controller
"""
35 from typing
import Dict
, List
37 from osm_ro_plugin
.sdnconn
import SdnConnectorBase
39 from .exceptions
import MockOsmRoServiceNotFound
45 klass
: SdnConnectorBase
,
48 wim_port_mapping
: Dict
,
50 wim
= {"wim_url": url
}
52 "mapping_not_needed": False,
53 "service_endpoint_mapping": wim_port_mapping
,
56 # Instantiate WIM connector
57 self
.wim_connector
= klass(wim
, wim_account
, config
=config
)
59 # Internal DB emulating OSM RO storage provided to WIM Connectors
def create_connectivity_service(
    self, service_type: str, connection_points: List[Dict]
) -> str:
    """Create a connectivity service through the WIM connector and record it.

    The connector's credentials are checked first. The connection info the
    connector hands back is stored in ``self.conn_info`` under the new service
    UUID, so that later status/edit/delete calls can pass it back to the
    connector.

    Args:
        service_type: Service type, forwarded unchanged to the connector.
        connection_points: Connection point descriptors, forwarded unchanged
            to the connector.

    Returns:
        The service UUID returned by the connector.
    """
    connector = self.wim_connector
    connector.check_credentials()
    service_uuid, conn_info = connector.create_connectivity_service(
        service_type, connection_points
    )
    # Remember the connector's per-service state; the other methods look it
    # up by UUID before talking to the connector again.
    self.conn_info[service_uuid] = conn_info
    return service_uuid
def get_connectivity_service_status(self, service_uuid: str) -> Dict:
    """Return the connector-reported status of a previously created service.

    Args:
        service_uuid: UUID under which the service's connection info was
            stored in ``self.conn_info``.

    Returns:
        Whatever the connector's ``get_connectivity_service_status`` returns.

    Raises:
        MockOsmRoServiceNotFound: if no connection info is stored for
            ``service_uuid``.
    """
    conn_info = self.conn_info.get(service_uuid)
    if conn_info is None:
        # Unknown service: fail before touching the connector at all.
        raise MockOsmRoServiceNotFound(service_uuid)
    self.wim_connector.check_credentials()
    return self.wim_connector.get_connectivity_service_status(
        service_uuid, conn_info=conn_info
    )
def edit_connectivity_service(
    self, service_uuid: str, connection_points: List[Dict]
) -> None:
    """Ask the connector to change the connection points of an existing service.

    Args:
        service_uuid: UUID under which the service's connection info was
            stored in ``self.conn_info``.
        connection_points: New connection point descriptors, forwarded
            unchanged to the connector.

    Raises:
        MockOsmRoServiceNotFound: if no connection info is stored for
            ``service_uuid``.
    """
    conn_info = self.conn_info.get(service_uuid)
    if conn_info is None:
        # Unknown service: fail before touching the connector at all.
        raise MockOsmRoServiceNotFound(service_uuid)
    self.wim_connector.check_credentials()
    # The connector's return value is intentionally not propagated.
    self.wim_connector.edit_connectivity_service(
        service_uuid, conn_info=conn_info, connection_points=connection_points
    )
92 def delete_connectivity_service(self
, service_uuid
: str) -> None:
93 conn_info
= self
.conn_info
.get(service_uuid
)
95 raise MockOsmRoServiceNotFound(service_uuid
)
96 self
.wim_connector
.check_credentials()
97 self
.wim_connector
.delete_connectivity_service(
98 service_uuid
, conn_info
=conn_info