| #!/usr/bin/env python |
| # -*- coding: utf-8 -*- |
| |
| ## |
| # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U. |
| # This file is part of openvim |
| # All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. You may obtain |
| # a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
| # |
| # For those usages not covered by the Apache License, Version 2.0 please |
| # contact with: nfvlabs@tid.es |
| ## |
| |
| ''' |
| This thread interacts with a openflow controller to create dataplane connections |
| ''' |
| |
| __author__="Pablo Montes, Alfonso Tierno" |
| __date__ ="17-jul-2015" |
| |
| |
| #import json |
| import threading |
| import time |
| import Queue |
| import requests |
| import logging |
| import openflow_conn |
| |
| OFC_STATUS_ACTIVE = 'ACTIVE' |
| OFC_STATUS_INACTIVE = 'INACTIVE' |
| OFC_STATUS_ERROR = 'ERROR' |
| |
| class FlowBadFormat(Exception): |
| '''raise when a bad format of flow is found''' |
| |
| def change_of2db(flow): |
| '''Change 'flow' dictionary from openflow format to database format |
| Basically the change consist of changing 'flow[actions] from a list of |
| double tuple to a string |
| from [(A,B),(C,D),..] to "A=B,C=D" ''' |
| action_str_list=[] |
| if type(flow)!=dict or "actions" not in flow: |
| raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key") |
| try: |
| for action in flow['actions']: |
| action_str_list.append( action[0] + "=" + str(action[1]) ) |
| flow['actions'] = ",".join(action_str_list) |
| except: |
| raise FlowBadFormat("Unexpected format at 'actions'") |
| |
| def change_db2of(flow): |
| '''Change 'flow' dictionary from database format to openflow format |
| Basically the change consist of changing 'flow[actions]' from a string to |
| a double tuple list |
| from "A=B,C=D,..." to [(A,B),(C,D),..] |
| raise FlowBadFormat ''' |
| actions=[] |
| if type(flow)!=dict or "actions" not in flow or type(flow["actions"])!=str: |
| raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key") |
| action_list = flow['actions'].split(",") |
| for action_item in action_list: |
| action_tuple = action_item.split("=") |
| if len(action_tuple) != 2: |
| raise FlowBadFormat("Expected key=value format at 'actions'") |
| if action_tuple[0].strip().lower()=="vlan": |
| if action_tuple[1].strip().lower() in ("none", "strip"): |
| actions.append( ("vlan",None) ) |
| else: |
| try: |
| actions.append( ("vlan", int(action_tuple[1])) ) |
| except: |
| raise FlowBadFormat("Expected integer after vlan= at 'actions'") |
| elif action_tuple[0].strip().lower()=="out": |
| actions.append( ("out", str(action_tuple[1])) ) |
| else: |
| raise FlowBadFormat("Unexpected '%s' at 'actions'"%action_tuple[0]) |
| flow['actions'] = actions |
| |
| |
| class openflow_thread(threading.Thread): |
| """ |
| This thread interacts with a openflow controller to create dataplane connections |
| """ |
| def __init__(self, of_uuid, of_connector, db, db_lock, of_test, pmp_with_same_vlan=False, debug='ERROR'): |
| threading.Thread.__init__(self) |
| self.of_uuid = of_uuid |
| self.db = db |
| self.pmp_with_same_vlan = pmp_with_same_vlan |
| self.name = "openflow" |
| self.test = of_test |
| self.db_lock = db_lock |
| self.OF_connector = of_connector |
| self.logger = logging.getLogger('vim.OF-' + of_uuid) |
| self.logger.setLevel(getattr(logging, debug)) |
| self.logger.name = of_connector.name + " " + self.OF_connector.dpid |
| self.queueLock = threading.Lock() |
| self.taskQueue = Queue.Queue(2000) |
| |
| def insert_task(self, task, *aditional): |
| try: |
| self.queueLock.acquire() |
| task = self.taskQueue.put( (task,) + aditional, timeout=5) |
| self.queueLock.release() |
| return 1, None |
| except Queue.Full: |
| return -1, "timeout inserting a task over openflow thread " + self.name |
| |
| def run(self): |
| self.logger.debug("Start openflow thread") |
| self.set_openflow_controller_status(OFC_STATUS_ACTIVE) |
| |
| while True: |
| try: |
| self.queueLock.acquire() |
| if not self.taskQueue.empty(): |
| task = self.taskQueue.get() |
| else: |
| task = None |
| self.queueLock.release() |
| |
| if task is None: |
| time.sleep(1) |
| continue |
| |
| if task[0] == 'update-net': |
| r,c = self.update_of_flows(task[1]) |
| # update database status |
| if r<0: |
| UPDATE={'status':'ERROR', 'last_error': str(c)} |
| self.logger.error("processing task 'update-net' %s: %s", str(task[1]), c) |
| self.set_openflow_controller_status(OFC_STATUS_ERROR, "Error updating net {}".format(task[1])) |
| else: |
| UPDATE={'status':'ACTIVE', 'last_error': None} |
| self.logger.debug("processing task 'update-net' %s: OK", str(task[1])) |
| self.set_openflow_controller_status(OFC_STATUS_ACTIVE) |
| self.db_lock.acquire() |
| self.db.update_rows('nets', UPDATE, WHERE={'uuid': task[1]}) |
| self.db_lock.release() |
| |
| elif task[0] == 'clear-all': |
| r,c = self.clear_all_flows() |
| if r<0: |
| self.logger.error("processing task 'clear-all': %s", c) |
| self.set_openflow_controller_status(OFC_STATUS_ERROR, "Error deleting all flows") |
| else: |
| self.set_openflow_controller_status(OFC_STATUS_ACTIVE) |
| self.logger.debug("processing task 'clear-all': OK") |
| elif task[0] == 'exit': |
| self.logger.debug("exit from openflow_thread") |
| self.terminate() |
| self.set_openflow_controller_status(OFC_STATUS_INACTIVE, "Ofc with thread killed") |
| return 0 |
| else: |
| self.logger.error("unknown task %s", str(task)) |
| except openflow_conn.OpenflowconnException as e: |
| self.set_openflow_controller_status(OFC_STATUS_ERROR, str(e)) |
| |
| def terminate(self): |
| pass |
| # print self.name, ": exit from openflow_thread" |
| |
| def update_of_flows(self, net_id): |
| ports=() |
| self.db_lock.acquire() |
| select_= ('type','admin_state_up', 'vlan', 'provider', 'bind_net','bind_type','uuid') |
| result, nets = self.db.get_table(FROM='nets', SELECT=select_, WHERE={'uuid':net_id} ) |
| #get all the networks binding to this |
| if result > 0: |
| if nets[0]['bind_net']: |
| bind_id = nets[0]['bind_net'] |
| else: |
| bind_id = net_id |
| #get our net and all bind_nets |
| result, nets = self.db.get_table(FROM='nets', SELECT=select_, |
| WHERE_OR={'bind_net':bind_id, 'uuid':bind_id} ) |
| |
| self.db_lock.release() |
| if result < 0: |
| return -1, "DB error getting net: " + nets |
| #elif result==0: |
| #net has been deleted |
| ifaces_nb = 0 |
| database_flows = [] |
| for net in nets: |
| net_id = net["uuid"] |
| if net['admin_state_up'] == 'false': |
| net['ports'] = () |
| else: |
| self.db_lock.acquire() |
| nb_ports, net_ports = self.db.get_table( |
| FROM='ports', |
| SELECT=('switch_port','vlan','uuid','mac','type','model'), |
| WHERE={'net_id':net_id, 'admin_state_up':'true', 'status':'ACTIVE'} ) |
| self.db_lock.release() |
| if nb_ports < 0: |
| |
| #print self.name, ": update_of_flows() ERROR getting ports", ports |
| return -1, "DB error getting ports from net '%s': %s" % (net_id, net_ports) |
| |
| #add the binding as an external port |
| if net['provider'] and net['provider'][:9]=="openflow:": |
| external_port={"type":"external","mac":None} |
| external_port['uuid'] = net_id + ".1" #fake uuid |
| if net['provider'][-5:]==":vlan": |
| external_port["vlan"] = net["vlan"] |
| external_port["switch_port"] = net['provider'][9:-5] |
| else: |
| external_port["vlan"] = None |
| external_port["switch_port"] = net['provider'][9:] |
| net_ports = net_ports + (external_port,) |
| nb_ports += 1 |
| net['ports'] = net_ports |
| ifaces_nb += nb_ports |
| |
| # Get the name of flows that will be affected by this NET |
| self.db_lock.acquire() |
| result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':net_id}) |
| self.db_lock.release() |
| if result < 0: |
| error_msg = "DB error getting flows from net '{}': {}".format(net_id, database_net_flows) |
| # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows |
| return -1, error_msg |
| database_flows += database_net_flows |
| # Get the name of flows where net_id==NULL that means net deleted (At DB foreign key: On delete set null) |
| self.db_lock.acquire() |
| result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':None}) |
| self.db_lock.release() |
| if result < 0: |
| error_msg = "DB error getting flows from net 'null': {}".format(database_net_flows) |
| # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows |
| return -1, error_msg |
| database_flows += database_net_flows |
| |
| # Get the existing flows at openflow controller |
| try: |
| of_flows = self.OF_connector.get_of_rules() |
| # print self.name, ": update_of_flows() ERROR getting flows from controller", of_flows |
| except openflow_conn.OpenflowconnException as e: |
| # self.set_openflow_controller_status(OFC_STATUS_ERROR, "OF error {} getting flows".format(str(e))) |
| return -1, "OF error {} getting flows".format(str(e)) |
| |
| if ifaces_nb < 2: |
| pass |
| elif net['type'] == 'ptp': |
| if ifaces_nb > 2: |
| #print self.name, 'Error, network '+str(net_id)+' has been defined as ptp but it has '+\ |
| # str(ifaces_nb)+' interfaces.' |
| return -1, "'ptp' type network cannot connect %d interfaces, only 2" % ifaces_nb |
| elif net['type'] == 'data': |
| if ifaces_nb > 2 and self.pmp_with_same_vlan: |
| # check all ports are VLAN (tagged) or none |
| vlan_tag = None |
| for port in ports: |
| if port["type"]=="external": |
| if port["vlan"] != None: |
| if port["vlan"]!=net["vlan"]: |
| text="External port vlan-tag and net vlan-tag must be the same when flag 'of_controller_nets_with_same_vlan' is True" |
| #print self.name, "Error", text |
| return -1, text |
| if vlan_tag == None: |
| vlan_tag=True |
| elif vlan_tag==False: |
| text="Passthrough and external port vlan-tagged cannot be connected when flag 'of_controller_nets_with_same_vlan' is True" |
| #print self.name, "Error", text |
| return -1, text |
| else: |
| if vlan_tag == None: |
| vlan_tag=False |
| elif vlan_tag == True: |
| text="SR-IOV and external port not vlan-tagged cannot be connected when flag 'of_controller_nets_with_same_vlan' is True" |
| #print self.name, "Error", text |
| return -1, text |
| elif port["model"]=="PF" or port["model"]=="VFnotShared": |
| if vlan_tag == None: |
| vlan_tag=False |
| elif vlan_tag==True: |
| text="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True" |
| #print self.name, "Error", text |
| return -1, text |
| elif port["model"] == "VF": |
| if vlan_tag == None: |
| vlan_tag=True |
| elif vlan_tag==False: |
| text="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True" |
| #print self.name, "Error", text |
| return -1, text |
| else: |
| return -1, 'Only ptp and data networks are supported for openflow' |
| |
| # calculate new flows to be inserted |
| result, new_flows = self._compute_net_flows(nets) |
| if result < 0: |
| return result, new_flows |
| |
| #modify database flows format and get the used names |
| used_names=[] |
| for flow in database_flows: |
| try: |
| change_db2of(flow) |
| except FlowBadFormat as e: |
| self.logger.error("Exception FlowBadFormat: '%s', flow: '%s'",str(e), str(flow)) |
| continue |
| used_names.append(flow['name']) |
| name_index=0 |
| # insert at database the new flows, change actions to human text |
| for flow in new_flows: |
| # 1 check if an equal flow is already present |
| index = self._check_flow_already_present(flow, database_flows) |
| if index>=0: |
| database_flows[index]["not delete"]=True |
| self.logger.debug("Skipping already present flow %s", str(flow)) |
| continue |
| # 2 look for a non used name |
| flow_name=flow["net_id"]+"."+str(name_index) |
| while flow_name in used_names or flow_name in of_flows: |
| name_index += 1 |
| flow_name=flow["net_id"]+"."+str(name_index) |
| used_names.append(flow_name) |
| flow['name'] = flow_name |
| # 3 insert at openflow |
| |
| try: |
| self.OF_connector.new_flow(flow) |
| except openflow_conn.OpenflowconnException as e: |
| return -1, "Error creating new flow {}".format(str(e)) |
| |
| # 4 insert at database |
| try: |
| change_of2db(flow) |
| except FlowBadFormat as e: |
| # print self.name, ": Error Exception FlowBadFormat '%s'" % str(e), flow |
| return -1, str(e) |
| self.db_lock.acquire() |
| result, content = self.db.new_row('of_flows', flow) |
| self.db_lock.release() |
| if result < 0: |
| # print self.name, ": Error '%s' at database insertion" % content, flow |
| return -1, content |
| |
| #delete not needed old flows from openflow and from DDBB, |
| #check that the needed flows at DDBB are present in controller or insert them otherwise |
| for flow in database_flows: |
| if "not delete" in flow: |
| if flow["name"] not in of_flows: |
| # not in controller, insert it |
| try: |
| self.OF_connector.new_flow(flow) |
| except openflow_conn.OpenflowconnException as e: |
| return -1, "Error creating new flow {}".format(str(e)) |
| |
| continue |
| # Delete flow |
| if flow["name"] in of_flows: |
| try: |
| self.OF_connector.del_flow(flow['name']) |
| except openflow_conn.OpenflowconnException as e: |
| self.logger.error("cannot delete flow '%s' from OF: %s", flow['name'], str(e)) |
| # skip deletion from database |
| continue |
| |
| # delete from database |
| self.db_lock.acquire() |
| result, content = self.db.delete_row_by_key('of_flows', 'id', flow['id']) |
| self.db_lock.release() |
| if result<0: |
| self.logger.error("cannot delete flow '%s' from DB: %s", flow['name'], content ) |
| |
| return 0, 'Success' |
| |
| def clear_all_flows(self): |
| try: |
| if not self.test: |
| self.OF_connector.clear_all_flows() |
| |
| # remove from database |
| self.db_lock.acquire() |
| self.db.delete_row_by_key('of_flows', None, None) #this will delete all lines |
| self.db_lock.release() |
| return 0, None |
| except openflow_conn.OpenflowconnException as e: |
| return -1, self.logger.error("Error deleting all flows {}", str(e)) |
| |
| flow_fields = ('priority', 'vlan', 'ingress_port', 'actions', 'dst_mac', 'src_mac', 'net_id') |
| |
| def _check_flow_already_present(self, new_flow, flow_list): |
| '''check if the same flow is already present in the flow list |
| The flow is repeated if all the fields, apart from name, are equal |
| Return the index of matching flow, -1 if not match''' |
| index=0 |
| for flow in flow_list: |
| equal=True |
| for f in self.flow_fields: |
| if flow.get(f) != new_flow.get(f): |
| equal=False |
| break |
| if equal: |
| return index |
| index += 1 |
| return -1 |
| |
| def _compute_net_flows(self, nets): |
| new_flows=[] |
| new_broadcast_flows={} |
| nb_ports = 0 |
| |
| # Check switch_port information is right |
| self.logger.debug("_compute_net_flows nets: %s", str(nets)) |
| for net in nets: |
| for port in net['ports']: |
| nb_ports += 1 |
| if not self.test and str(port['switch_port']) not in self.OF_connector.pp2ofi: |
| error_text= "switch port name '%s' is not valid for the openflow controller" % str(port['switch_port']) |
| # print self.name, ": ERROR " + error_text |
| return -1, error_text |
| |
| for net_src in nets: |
| net_id = net_src["uuid"] |
| for net_dst in nets: |
| vlan_net_in = None |
| vlan_net_out = None |
| if net_src == net_dst: |
| #intra net rules |
| priority = 1000 |
| elif net_src['bind_net'] == net_dst['uuid']: |
| if net_src.get('bind_type') and net_src['bind_type'][0:5] == "vlan:": |
| vlan_net_out = int(net_src['bind_type'][5:]) |
| priority = 1100 |
| elif net_dst['bind_net'] == net_src['uuid']: |
| if net_dst.get('bind_type') and net_dst['bind_type'][0:5] == "vlan:": |
| vlan_net_in = int(net_dst['bind_type'][5:]) |
| priority = 1100 |
| else: |
| #nets not binding |
| continue |
| for src_port in net_src['ports']: |
| vlan_in = vlan_net_in |
| if vlan_in == None and src_port['vlan'] != None: |
| vlan_in = src_port['vlan'] |
| elif vlan_in != None and src_port['vlan'] != None: |
| #TODO this is something that we cannot do. It requires a double VLAN check |
| #outer VLAN should be src_port['vlan'] and inner VLAN should be vlan_in |
| continue |
| |
| # BROADCAST: |
| broadcast_key = src_port['uuid'] + "." + str(vlan_in) |
| if broadcast_key in new_broadcast_flows: |
| flow_broadcast = new_broadcast_flows[broadcast_key] |
| else: |
| flow_broadcast = {'priority': priority, |
| 'net_id': net_id, |
| 'dst_mac': 'ff:ff:ff:ff:ff:ff', |
| "ingress_port": str(src_port['switch_port']), |
| 'actions': [] |
| } |
| new_broadcast_flows[broadcast_key] = flow_broadcast |
| if vlan_in is not None: |
| flow_broadcast['vlan_id'] = str(vlan_in) |
| |
| for dst_port in net_dst['ports']: |
| vlan_out = vlan_net_out |
| if vlan_out == None and dst_port['vlan'] != None: |
| vlan_out = dst_port['vlan'] |
| elif vlan_out != None and dst_port['vlan'] != None: |
| #TODO this is something that we cannot do. It requires a double VLAN set |
| #outer VLAN should be dst_port['vlan'] and inner VLAN should be vlan_out |
| continue |
| #if src_port == dst_port: |
| # continue |
| if src_port['switch_port'] == dst_port['switch_port'] and vlan_in == vlan_out: |
| continue |
| flow = { |
| "priority": priority, |
| 'net_id': net_id, |
| "ingress_port": str(src_port['switch_port']), |
| 'actions': [] |
| } |
| if vlan_in is not None: |
| flow['vlan_id'] = str(vlan_in) |
| # allow that one port have no mac |
| if dst_port['mac'] is None or nb_ports==2: # point to point or nets with 2 elements |
| flow['priority'] = priority-5 # less priority |
| else: |
| flow['dst_mac'] = str(dst_port['mac']) |
| |
| if vlan_out == None: |
| if vlan_in != None: |
| flow['actions'].append( ('vlan',None) ) |
| else: |
| flow['actions'].append( ('vlan', vlan_out ) ) |
| flow['actions'].append( ('out', str(dst_port['switch_port'])) ) |
| |
| if self._check_flow_already_present(flow, new_flows) >= 0: |
| self.logger.debug("Skipping repeated flow '%s'", str(flow)) |
| continue |
| |
| new_flows.append(flow) |
| |
| # BROADCAST: |
| if nb_ports <= 2: # point to multipoint or nets with more than 2 elements |
| continue |
| out = (vlan_out, str(dst_port['switch_port'])) |
| if out not in flow_broadcast['actions']: |
| flow_broadcast['actions'].append( out ) |
| |
| #BROADCAST |
| for flow_broadcast in new_broadcast_flows.values(): |
| if len(flow_broadcast['actions'])==0: |
| continue #nothing to do, skip |
| flow_broadcast['actions'].sort() |
| if 'vlan_id' in flow_broadcast: |
| previous_vlan = 0 # indicates that a packet contains a vlan, and the vlan |
| else: |
| previous_vlan = None |
| final_actions=[] |
| action_number = 0 |
| for action in flow_broadcast['actions']: |
| if action[0] != previous_vlan: |
| final_actions.append( ('vlan', action[0]) ) |
| previous_vlan = action[0] |
| if self.pmp_with_same_vlan and action_number: |
| return -1, "Cannot interconnect different vlan tags in a network when flag 'of_controller_nets_with_same_vlan' is True." |
| action_number += 1 |
| final_actions.append( ('out', action[1]) ) |
| flow_broadcast['actions'] = final_actions |
| |
| if self._check_flow_already_present(flow_broadcast, new_flows) >= 0: |
| self.logger.debug("Skipping repeated flow '%s'", str(flow_broadcast)) |
| continue |
| |
| new_flows.append(flow_broadcast) |
| |
| #UNIFY openflow rules with the same input port and vlan and the same output actions |
| #These flows differ at the dst_mac; and they are unified by not filtering by dst_mac |
| #this can happen if there is only two ports. It is converted to a point to point connection |
| flow_dict={} # use as key vlan_id+ingress_port and as value the list of flows matching these values |
| for flow in new_flows: |
| key = str(flow.get("vlan_id"))+":"+flow["ingress_port"] |
| if key in flow_dict: |
| flow_dict[key].append(flow) |
| else: |
| flow_dict[key]=[ flow ] |
| new_flows2=[] |
| for flow_list in flow_dict.values(): |
| convert2ptp=False |
| if len (flow_list)>=2: |
| convert2ptp=True |
| for f in flow_list: |
| if f['actions'] != flow_list[0]['actions']: |
| convert2ptp=False |
| break |
| if convert2ptp: # add only one unified rule without dst_mac |
| self.logger.debug("Convert flow rules to NON mac dst_address " + str(flow_list) ) |
| flow_list[0].pop('dst_mac') |
| flow_list[0]["priority"] -= 5 |
| new_flows2.append(flow_list[0]) |
| else: # add all the rules |
| new_flows2 += flow_list |
| return 0, new_flows2 |
| |
| def set_openflow_controller_status(self, status, error_text=None): |
| """ |
| Set openflow controller last operation status in DB |
| :param status: ofc status ('ACTIVE','INACTIVE','ERROR') |
| :param error_text: error text |
| :return: |
| """ |
| if self.of_uuid == "Default": |
| return True |
| |
| ofc = {} |
| ofc['status'] = status |
| ofc['last_error'] = error_text |
| self.db_lock.acquire() |
| result, content = self.db.update_rows('ofcs', ofc, WHERE={'uuid': self.of_uuid}, log=False) |
| self.db_lock.release() |
| if result >= 0: |
| return True |
| else: |
| return False |
| |
| |
| |
| |
| |
| |
| |