--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+##
+# Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
+# This file is part of openvim
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: nfvlabs@tid.es
+##
+
+'''
+This thread interacts with a openflow controller to create dataplane connections
+'''
+
+__author__="Pablo Montes, Alfonso Tierno"
+__date__ ="17-jul-2015"
+
+
+#import json
+import threading
+import time
+import Queue
+import requests
+import logging
+import openflow_conn
+
+OFC_STATUS_ACTIVE = 'ACTIVE'
+OFC_STATUS_INACTIVE = 'INACTIVE'
+OFC_STATUS_ERROR = 'ERROR'
+
+class FlowBadFormat(Exception):
+ '''raise when a bad format of flow is found'''
+
+def change_of2db(flow):
+ '''Change 'flow' dictionary from openflow format to database format
+ Basically the change consist of changing 'flow[actions] from a list of
+ double tuple to a string
+ from [(A,B),(C,D),..] to "A=B,C=D" '''
+ action_str_list=[]
+ if type(flow)!=dict or "actions" not in flow:
+ raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
+ try:
+ for action in flow['actions']:
+ action_str_list.append( action[0] + "=" + str(action[1]) )
+ flow['actions'] = ",".join(action_str_list)
+ except:
+ raise FlowBadFormat("Unexpected format at 'actions'")
+
+def change_db2of(flow):
+ '''Change 'flow' dictionary from database format to openflow format
+ Basically the change consist of changing 'flow[actions]' from a string to
+ a double tuple list
+ from "A=B,C=D,..." to [(A,B),(C,D),..]
+ raise FlowBadFormat '''
+ actions=[]
+ if type(flow)!=dict or "actions" not in flow or type(flow["actions"])!=str:
+ raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
+ action_list = flow['actions'].split(",")
+ for action_item in action_list:
+ action_tuple = action_item.split("=")
+ if len(action_tuple) != 2:
+ raise FlowBadFormat("Expected key=value format at 'actions'")
+ if action_tuple[0].strip().lower()=="vlan":
+ if action_tuple[1].strip().lower() in ("none", "strip"):
+ actions.append( ("vlan",None) )
+ else:
+ try:
+ actions.append( ("vlan", int(action_tuple[1])) )
+ except:
+ raise FlowBadFormat("Expected integer after vlan= at 'actions'")
+ elif action_tuple[0].strip().lower()=="out":
+ actions.append( ("out", str(action_tuple[1])) )
+ else:
+ raise FlowBadFormat("Unexpected '%s' at 'actions'"%action_tuple[0])
+ flow['actions'] = actions
+
+
+class openflow_thread(threading.Thread):
+ """
+ This thread interacts with a openflow controller to create dataplane connections
+ """
+ def __init__(self, of_uuid, of_connector, db, db_lock, of_test, pmp_with_same_vlan=False, debug='ERROR'):
+ threading.Thread.__init__(self)
+ self.of_uuid = of_uuid
+ self.db = db
+ self.pmp_with_same_vlan = pmp_with_same_vlan
+ self.name = "openflow"
+ self.test = of_test
+ self.db_lock = db_lock
+ self.OF_connector = of_connector
+ self.logger = logging.getLogger('vim.OF-' + of_uuid)
+ self.logger.setLevel(getattr(logging, debug))
+ self.logger.name = of_connector.name + " " + self.OF_connector.dpid
+ self.queueLock = threading.Lock()
+ self.taskQueue = Queue.Queue(2000)
+
+ def insert_task(self, task, *aditional):
+ try:
+ self.queueLock.acquire()
+ task = self.taskQueue.put( (task,) + aditional, timeout=5)
+ self.queueLock.release()
+ return 1, None
+ except Queue.Full:
+ return -1, "timeout inserting a task over openflow thread " + self.name
+
+ def run(self):
+ self.logger.debug("Start openflow thread")
+ self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
+
+ while True:
+ try:
+ self.queueLock.acquire()
+ if not self.taskQueue.empty():
+ task = self.taskQueue.get()
+ else:
+ task = None
+ self.queueLock.release()
+
+ if task is None:
+ time.sleep(1)
+ continue
+
+ if task[0] == 'update-net':
+ r,c = self.update_of_flows(task[1])
+ # update database status
+ if r<0:
+ UPDATE={'status':'ERROR', 'last_error': str(c)}
+ self.logger.error("processing task 'update-net' %s: %s", str(task[1]), c)
+ self.set_openflow_controller_status(OFC_STATUS_ERROR, "Error updating net {}".format(task[1]))
+ else:
+ UPDATE={'status':'ACTIVE', 'last_error': None}
+ self.logger.debug("processing task 'update-net' %s: OK", str(task[1]))
+ self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
+ self.db_lock.acquire()
+ self.db.update_rows('nets', UPDATE, WHERE={'uuid': task[1]})
+ self.db_lock.release()
+
+ elif task[0] == 'clear-all':
+ r,c = self.clear_all_flows()
+ if r<0:
+ self.logger.error("processing task 'clear-all': %s", c)
+ self.set_openflow_controller_status(OFC_STATUS_ERROR, "Error deleting all flows")
+ else:
+ self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
+ self.logger.debug("processing task 'clear-all': OK")
+ elif task[0] == 'exit':
+ self.logger.debug("exit from openflow_thread")
+ self.terminate()
+ self.set_openflow_controller_status(OFC_STATUS_INACTIVE, "Ofc with thread killed")
+ return 0
+ else:
+ self.logger.error("unknown task %s", str(task))
+ except openflow_conn.OpenflowconnException as e:
+ self.set_openflow_controller_status(OFC_STATUS_ERROR, str(e))
+
+ def terminate(self):
+ pass
+ # print self.name, ": exit from openflow_thread"
+
+ def update_of_flows(self, net_id):
+ ports=()
+ self.db_lock.acquire()
+ select_= ('type','admin_state_up', 'vlan', 'provider', 'bind_net','bind_type','uuid')
+ result, nets = self.db.get_table(FROM='nets', SELECT=select_, WHERE={'uuid':net_id} )
+ #get all the networks binding to this
+ if result > 0:
+ if nets[0]['bind_net']:
+ bind_id = nets[0]['bind_net']
+ else:
+ bind_id = net_id
+ #get our net and all bind_nets
+ result, nets = self.db.get_table(FROM='nets', SELECT=select_,
+ WHERE_OR={'bind_net':bind_id, 'uuid':bind_id} )
+
+ self.db_lock.release()
+ if result < 0:
+ return -1, "DB error getting net: " + nets
+ #elif result==0:
+ #net has been deleted
+ ifaces_nb = 0
+ database_flows = []
+ for net in nets:
+ net_id = net["uuid"]
+ if net['admin_state_up'] == 'false':
+ net['ports'] = ()
+ else:
+ self.db_lock.acquire()
+ nb_ports, net_ports = self.db.get_table(
+ FROM='ports',
+ SELECT=('switch_port','vlan','uuid','mac','type','model'),
+ WHERE={'net_id':net_id, 'admin_state_up':'true', 'status':'ACTIVE'} )
+ self.db_lock.release()
+ if nb_ports < 0:
+
+ #print self.name, ": update_of_flows() ERROR getting ports", ports
+ return -1, "DB error getting ports from net '%s': %s" % (net_id, net_ports)
+
+ #add the binding as an external port
+ if net['provider'] and net['provider'][:9]=="openflow:":
+ external_port={"type":"external","mac":None}
+ external_port['uuid'] = net_id + ".1" #fake uuid
+ if net['provider'][-5:]==":vlan":
+ external_port["vlan"] = net["vlan"]
+ external_port["switch_port"] = net['provider'][9:-5]
+ else:
+ external_port["vlan"] = None
+ external_port["switch_port"] = net['provider'][9:]
+ net_ports = net_ports + (external_port,)
+ nb_ports += 1
+ net['ports'] = net_ports
+ ifaces_nb += nb_ports
+
+ # Get the name of flows that will be affected by this NET
+ self.db_lock.acquire()
+ result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':net_id})
+ self.db_lock.release()
+ if result < 0:
+ error_msg = "DB error getting flows from net '{}': {}".format(net_id, database_net_flows)
+ # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
+ return -1, error_msg
+ database_flows += database_net_flows
+ # Get the name of flows where net_id==NULL that means net deleted (At DB foreign key: On delete set null)
+ self.db_lock.acquire()
+ result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':None})
+ self.db_lock.release()
+ if result < 0:
+ error_msg = "DB error getting flows from net 'null': {}".format(database_net_flows)
+ # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
+ return -1, error_msg
+ database_flows += database_net_flows
+
+ # Get the existing flows at openflow controller
+ try:
+ of_flows = self.OF_connector.get_of_rules()
+ # print self.name, ": update_of_flows() ERROR getting flows from controller", of_flows
+ except openflow_conn.OpenflowconnException as e:
+ # self.set_openflow_controller_status(OFC_STATUS_ERROR, "OF error {} getting flows".format(str(e)))
+ return -1, "OF error {} getting flows".format(str(e))
+
+ if ifaces_nb < 2:
+ pass
+ elif net['type'] == 'ptp':
+ if ifaces_nb > 2:
+ #print self.name, 'Error, network '+str(net_id)+' has been defined as ptp but it has '+\
+ # str(ifaces_nb)+' interfaces.'
+ return -1, "'ptp' type network cannot connect %d interfaces, only 2" % ifaces_nb
+ elif net['type'] == 'data':
+ if ifaces_nb > 2 and self.pmp_with_same_vlan:
+ # check all ports are VLAN (tagged) or none
+ vlan_tag = None
+ for port in ports:
+ if port["type"]=="external":
+ if port["vlan"] != None:
+ if port["vlan"]!=net["vlan"]:
+ text="External port vlan-tag and net vlan-tag must be the same when flag 'of_controller_nets_with_same_vlan' is True"
+ #print self.name, "Error", text
+ return -1, text
+ if vlan_tag == None:
+ vlan_tag=True
+ elif vlan_tag==False:
+ text="Passthrough and external port vlan-tagged cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
+ #print self.name, "Error", text
+ return -1, text
+ else:
+ if vlan_tag == None:
+ vlan_tag=False
+ elif vlan_tag == True:
+ text="SR-IOV and external port not vlan-tagged cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
+ #print self.name, "Error", text
+ return -1, text
+ elif port["model"]=="PF" or port["model"]=="VFnotShared":
+ if vlan_tag == None:
+ vlan_tag=False
+ elif vlan_tag==True:
+ text="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
+ #print self.name, "Error", text
+ return -1, text
+ elif port["model"] == "VF":
+ if vlan_tag == None:
+ vlan_tag=True
+ elif vlan_tag==False:
+ text="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
+ #print self.name, "Error", text
+ return -1, text
+ else:
+ return -1, 'Only ptp and data networks are supported for openflow'
+
+ # calculate new flows to be inserted
+ result, new_flows = self._compute_net_flows(nets)
+ if result < 0:
+ return result, new_flows
+
+ #modify database flows format and get the used names
+ used_names=[]
+ for flow in database_flows:
+ try:
+ change_db2of(flow)
+ except FlowBadFormat as e:
+ self.logger.error("Exception FlowBadFormat: '%s', flow: '%s'",str(e), str(flow))
+ continue
+ used_names.append(flow['name'])
+ name_index=0
+ # insert at database the new flows, change actions to human text
+ for flow in new_flows:
+ # 1 check if an equal flow is already present
+ index = self._check_flow_already_present(flow, database_flows)
+ if index>=0:
+ database_flows[index]["not delete"]=True
+ self.logger.debug("Skipping already present flow %s", str(flow))
+ continue
+ # 2 look for a non used name
+ flow_name=flow["net_id"]+"."+str(name_index)
+ while flow_name in used_names or flow_name in of_flows:
+ name_index += 1
+ flow_name=flow["net_id"]+"."+str(name_index)
+ used_names.append(flow_name)
+ flow['name'] = flow_name
+ # 3 insert at openflow
+
+ try:
+ self.OF_connector.new_flow(flow)
+ except openflow_conn.OpenflowconnException as e:
+ return -1, "Error creating new flow {}".format(str(e))
+
+ # 4 insert at database
+ try:
+ change_of2db(flow)
+ except FlowBadFormat as e:
+ # print self.name, ": Error Exception FlowBadFormat '%s'" % str(e), flow
+ return -1, str(e)
+ self.db_lock.acquire()
+ result, content = self.db.new_row('of_flows', flow)
+ self.db_lock.release()
+ if result < 0:
+ # print self.name, ": Error '%s' at database insertion" % content, flow
+ return -1, content
+
+ #delete not needed old flows from openflow and from DDBB,
+ #check that the needed flows at DDBB are present in controller or insert them otherwise
+ for flow in database_flows:
+ if "not delete" in flow:
+ if flow["name"] not in of_flows:
+ # not in controller, insert it
+ try:
+ self.OF_connector.new_flow(flow)
+ except openflow_conn.OpenflowconnException as e:
+ return -1, "Error creating new flow {}".format(str(e))
+
+ continue
+ # Delete flow
+ if flow["name"] in of_flows:
+ try:
+ self.OF_connector.del_flow(flow['name'])
+ except openflow_conn.OpenflowconnException as e:
+ self.logger.error("cannot delete flow '%s' from OF: %s", flow['name'], str(e))
+ # skip deletion from database
+ continue
+
+ # delete from database
+ self.db_lock.acquire()
+ result, content = self.db.delete_row_by_key('of_flows', 'id', flow['id'])
+ self.db_lock.release()
+ if result<0:
+ self.logger.error("cannot delete flow '%s' from DB: %s", flow['name'], content )
+
+ return 0, 'Success'
+
+ def clear_all_flows(self):
+ try:
+ if not self.test:
+ self.OF_connector.clear_all_flows()
+
+ # remove from database
+ self.db_lock.acquire()
+ self.db.delete_row_by_key('of_flows', None, None) #this will delete all lines
+ self.db_lock.release()
+ return 0, None
+ except openflow_conn.OpenflowconnException as e:
+ return -1, self.logger.error("Error deleting all flows {}", str(e))
+
+ flow_fields = ('priority', 'vlan', 'ingress_port', 'actions', 'dst_mac', 'src_mac', 'net_id')
+
+ def _check_flow_already_present(self, new_flow, flow_list):
+ '''check if the same flow is already present in the flow list
+ The flow is repeated if all the fields, apart from name, are equal
+ Return the index of matching flow, -1 if not match'''
+ index=0
+ for flow in flow_list:
+ equal=True
+ for f in self.flow_fields:
+ if flow.get(f) != new_flow.get(f):
+ equal=False
+ break
+ if equal:
+ return index
+ index += 1
+ return -1
+
+ def _compute_net_flows(self, nets):
+ new_flows=[]
+ new_broadcast_flows={}
+ nb_ports = 0
+
+ # Check switch_port information is right
+ self.logger.debug("_compute_net_flows nets: %s", str(nets))
+ for net in nets:
+ for port in net['ports']:
+ nb_ports += 1
+ if not self.test and str(port['switch_port']) not in self.OF_connector.pp2ofi:
+ error_text= "switch port name '%s' is not valid for the openflow controller" % str(port['switch_port'])
+ # print self.name, ": ERROR " + error_text
+ return -1, error_text
+
+ for net_src in nets:
+ net_id = net_src["uuid"]
+ for net_dst in nets:
+ vlan_net_in = None
+ vlan_net_out = None
+ if net_src == net_dst:
+ #intra net rules
+ priority = 1000
+ elif net_src['bind_net'] == net_dst['uuid']:
+ if net_src.get('bind_type') and net_src['bind_type'][0:5] == "vlan:":
+ vlan_net_out = int(net_src['bind_type'][5:])
+ priority = 1100
+ elif net_dst['bind_net'] == net_src['uuid']:
+ if net_dst.get('bind_type') and net_dst['bind_type'][0:5] == "vlan:":
+ vlan_net_in = int(net_dst['bind_type'][5:])
+ priority = 1100
+ else:
+ #nets not binding
+ continue
+ for src_port in net_src['ports']:
+ vlan_in = vlan_net_in
+ if vlan_in == None and src_port['vlan'] != None:
+ vlan_in = src_port['vlan']
+ elif vlan_in != None and src_port['vlan'] != None:
+ #TODO this is something that we cannot do. It requires a double VLAN check
+ #outer VLAN should be src_port['vlan'] and inner VLAN should be vlan_in
+ continue
+
+ # BROADCAST:
+ broadcast_key = src_port['uuid'] + "." + str(vlan_in)
+ if broadcast_key in new_broadcast_flows:
+ flow_broadcast = new_broadcast_flows[broadcast_key]
+ else:
+ flow_broadcast = {'priority': priority,
+ 'net_id': net_id,
+ 'dst_mac': 'ff:ff:ff:ff:ff:ff',
+ "ingress_port": str(src_port['switch_port']),
+ 'actions': []
+ }
+ new_broadcast_flows[broadcast_key] = flow_broadcast
+ if vlan_in is not None:
+ flow_broadcast['vlan_id'] = str(vlan_in)
+
+ for dst_port in net_dst['ports']:
+ vlan_out = vlan_net_out
+ if vlan_out == None and dst_port['vlan'] != None:
+ vlan_out = dst_port['vlan']
+ elif vlan_out != None and dst_port['vlan'] != None:
+ #TODO this is something that we cannot do. It requires a double VLAN set
+ #outer VLAN should be dst_port['vlan'] and inner VLAN should be vlan_out
+ continue
+ #if src_port == dst_port:
+ # continue
+ if src_port['switch_port'] == dst_port['switch_port'] and vlan_in == vlan_out:
+ continue
+ flow = {
+ "priority": priority,
+ 'net_id': net_id,
+ "ingress_port": str(src_port['switch_port']),
+ 'actions': []
+ }
+ if vlan_in is not None:
+ flow['vlan_id'] = str(vlan_in)
+ # allow that one port have no mac
+ if dst_port['mac'] is None or nb_ports==2: # point to point or nets with 2 elements
+ flow['priority'] = priority-5 # less priority
+ else:
+ flow['dst_mac'] = str(dst_port['mac'])
+
+ if vlan_out == None:
+ if vlan_in != None:
+ flow['actions'].append( ('vlan',None) )
+ else:
+ flow['actions'].append( ('vlan', vlan_out ) )
+ flow['actions'].append( ('out', str(dst_port['switch_port'])) )
+
+ if self._check_flow_already_present(flow, new_flows) >= 0:
+ self.logger.debug("Skipping repeated flow '%s'", str(flow))
+ continue
+
+ new_flows.append(flow)
+
+ # BROADCAST:
+ if nb_ports <= 2: # point to multipoint or nets with more than 2 elements
+ continue
+ out = (vlan_out, str(dst_port['switch_port']))
+ if out not in flow_broadcast['actions']:
+ flow_broadcast['actions'].append( out )
+
+ #BROADCAST
+ for flow_broadcast in new_broadcast_flows.values():
+ if len(flow_broadcast['actions'])==0:
+ continue #nothing to do, skip
+ flow_broadcast['actions'].sort()
+ if 'vlan_id' in flow_broadcast:
+ previous_vlan = 0 # indicates that a packet contains a vlan, and the vlan
+ else:
+ previous_vlan = None
+ final_actions=[]
+ action_number = 0
+ for action in flow_broadcast['actions']:
+ if action[0] != previous_vlan:
+ final_actions.append( ('vlan', action[0]) )
+ previous_vlan = action[0]
+ if self.pmp_with_same_vlan and action_number:
+ return -1, "Cannot interconnect different vlan tags in a network when flag 'of_controller_nets_with_same_vlan' is True."
+ action_number += 1
+ final_actions.append( ('out', action[1]) )
+ flow_broadcast['actions'] = final_actions
+
+ if self._check_flow_already_present(flow_broadcast, new_flows) >= 0:
+ self.logger.debug("Skipping repeated flow '%s'", str(flow_broadcast))
+ continue
+
+ new_flows.append(flow_broadcast)
+
+ #UNIFY openflow rules with the same input port and vlan and the same output actions
+ #These flows differ at the dst_mac; and they are unified by not filtering by dst_mac
+ #this can happen if there is only two ports. It is converted to a point to point connection
+ flow_dict={} # use as key vlan_id+ingress_port and as value the list of flows matching these values
+ for flow in new_flows:
+ key = str(flow.get("vlan_id"))+":"+flow["ingress_port"]
+ if key in flow_dict:
+ flow_dict[key].append(flow)
+ else:
+ flow_dict[key]=[ flow ]
+ new_flows2=[]
+ for flow_list in flow_dict.values():
+ convert2ptp=False
+ if len (flow_list)>=2:
+ convert2ptp=True
+ for f in flow_list:
+ if f['actions'] != flow_list[0]['actions']:
+ convert2ptp=False
+ break
+ if convert2ptp: # add only one unified rule without dst_mac
+ self.logger.debug("Convert flow rules to NON mac dst_address " + str(flow_list) )
+ flow_list[0].pop('dst_mac')
+ flow_list[0]["priority"] -= 5
+ new_flows2.append(flow_list[0])
+ else: # add all the rules
+ new_flows2 += flow_list
+ return 0, new_flows2
+
+ def set_openflow_controller_status(self, status, error_text=None):
+ """
+ Set openflow controller last operation status in DB
+ :param status: ofc status ('ACTIVE','INACTIVE','ERROR')
+ :param error_text: error text
+ :return:
+ """
+ if self.of_uuid == "Default":
+ return True
+
+ ofc = {}
+ ofc['status'] = status
+ ofc['last_error'] = error_text
+ self.db_lock.acquire()
+ result, content = self.db.update_rows('ofcs', ofc, WHERE={'uuid': self.of_uuid}, log=False)
+ self.db_lock.release()
+ if result >= 0:
+ return True
+ else:
+ return False
+
+
+
+
+
+
+