| # -*- coding: utf-8 -*- |
| |
| ####################################################################################### |
| # This file is part of OSM RO module |
| # |
| # Copyright ETSI Contributors and Others. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. You may obtain |
| # a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
| ####################################################################################### |
| # This work has been performed in the context of the TeraFlow Project - |
| # funded by the European Commission under Grant number 101015857 through the |
| # Horizon 2020 program. |
| # Contributors: |
| # - Lluis Gifre <lluis.gifre@cttc.es> |
| # - Ricard Vilalta <ricard.vilalta@cttc.es> |
| ####################################################################################### |
| |
| """This file contains the helper methods used to compose the Transport API (TAPI) |
| messages sent by the TAPI WIM connector to the WIM.""" |
| |
| |
| import copy |
| |
| from .message_templates import ( |
| CREATE_TEMPLATE, |
| DELETE_TEMPLATE, |
| ENDPOINT_TEMPLATE, |
| REQUESTED_CAPACITY_TEMPLATE, |
| VLAN_CONSTRAINT_TEMPLATE, |
| ) |
| |
| |
def compose_requested_capacity(capacity, unit="GBPS"):
    """Compose the requested-capacity section of a TAPI message.

    A deep copy of ``REQUESTED_CAPACITY_TEMPLATE`` is filled in, so the
    shared template is never modified.

    Args:
        capacity: value stored under ``total-size`` / ``value``.
        unit: unit label stored under ``total-size`` / ``unit``.
            Defaults to ``"GBPS"``.

    Returns:
        The populated copy of the requested-capacity template.
    """
    requested_capacity = copy.deepcopy(REQUESTED_CAPACITY_TEMPLATE)
    total_size = requested_capacity["total-size"]
    total_size["value"] = capacity
    # Honour the caller-supplied unit; it used to be ignored in favour of a
    # hard-coded "GBPS", which made the parameter a no-op.
    total_size["unit"] = unit
    return requested_capacity
| |
| |
def compose_vlan_constraint(vlan_id):
    """Compose the vlan-constraint section of a TAPI message.

    Args:
        vlan_id: value stored under the ``vlan-id`` key.

    Returns:
        A deep copy of ``VLAN_CONSTRAINT_TEMPLATE`` with ``vlan-id`` set; the
        shared template itself is left untouched.
    """
    constraint = copy.deepcopy(VLAN_CONSTRAINT_TEMPLATE)
    constraint["vlan-id"] = vlan_id
    return constraint
| |
| |
def compose_endpoint(sip):
    """Compose a TAPI end-point entry from a service interface point (SIP).

    Args:
        sip: mapping that provides the keys ``uuid``,
            ``layer-protocol-name`` and ``supported-layer-protocol-qualifier``
            (an indexable collection whose first item is used).

    Returns:
        A deep copy of ``ENDPOINT_TEMPLATE`` populated from ``sip``. The SIP
        UUID is used both as the referenced service-interface-point UUID and
        as the end-point ``local-id``.
    """
    uuid = sip["uuid"]
    result = copy.deepcopy(ENDPOINT_TEMPLATE)

    sip_reference = result["service-interface-point"]
    sip_reference["service-interface-point-uuid"] = uuid

    result["layer-protocol-name"] = sip["layer-protocol-name"]
    # TODO: implement smart selection of layer-protocol-qualifier instead of
    # selecting first one available
    result["layer-protocol-qualifier"] = sip["supported-layer-protocol-qualifier"][0]
    result["local-id"] = uuid
    return result
| |
| |
def compose_create_request(
    service_uuid,
    endpoints,
    bidirectional=False,
    requested_capacity=None,
    vlan_constraint=None,
):
    """Compose the message used to create a TAPI connectivity service.

    Args:
        service_uuid: value stored as the connectivity service ``uuid``.
        endpoints: value stored under ``end-point``; it is stored as given,
            without being copied.
        bidirectional: when truthy the direction is ``"BIDIRECTIONAL"``,
            otherwise ``"UNIDIRECTIONAL"``.
        requested_capacity: optional value for ``requested-capacity``; the
            key is only written when this is not ``None``.
        vlan_constraint: optional value for ``vlan-constraint``; the key is
            only written when this is not ``None``.

    Returns:
        A deep copy of ``CREATE_TEMPLATE`` whose first
        ``tapi-connectivity:connectivity-service`` entry has been filled in.
    """
    message = copy.deepcopy(CREATE_TEMPLATE)
    service = message["tapi-connectivity:connectivity-service"][0]

    service["uuid"] = service_uuid
    if bidirectional:
        direction = "BIDIRECTIONAL"
    else:
        direction = "UNIDIRECTIONAL"
    service["connectivity-direction"] = direction
    service["end-point"] = endpoints

    # Optional sections are only written when the caller supplied them.
    optional_sections = (
        ("requested-capacity", requested_capacity),
        ("vlan-constraint", vlan_constraint),
    )
    for key, section in optional_sections:
        if section is not None:
            service[key] = section

    return message
| |
| |
def compose_delete_request(service_uuid):
    """Compose the message used to delete a TAPI connectivity service.

    Args:
        service_uuid: value stored under ``tapi-connectivity:input`` /
            ``uuid``.

    Returns:
        A deep copy of ``DELETE_TEMPLATE`` with the service UUID filled in.
    """
    message = copy.deepcopy(DELETE_TEMPLATE)
    delete_input = message["tapi-connectivity:input"]
    delete_input["uuid"] = service_uuid
    return message