X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;ds=sidebyside;f=osm_openvim%2Fopenflow_thread.py;fp=osm_openvim%2Fopenflow_thread.py;h=cd873e7dc72949851e5f4dda740df2dd7da88c30;hb=9f6571090b203922cabb0382226be0fa48d6e046;hp=0000000000000000000000000000000000000000;hpb=ee19576ffab403bf0218974ed96c98dc0375b507;p=osm%2Fopenvim.git diff --git a/osm_openvim/openflow_thread.py b/osm_openvim/openflow_thread.py new file mode 100644 index 0000000..cd873e7 --- /dev/null +++ b/osm_openvim/openflow_thread.py @@ -0,0 +1,598 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +## +# Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U. +# This file is part of openvim +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# For those usages not covered by the Apache License, Version 2.0 please +# contact with: nfvlabs@tid.es +## + +''' +This thread interacts with a openflow controller to create dataplane connections +''' + +__author__="Pablo Montes, Alfonso Tierno" +__date__ ="17-jul-2015" + + +#import json +import threading +import time +import Queue +import requests +import logging +import openflow_conn + +OFC_STATUS_ACTIVE = 'ACTIVE' +OFC_STATUS_INACTIVE = 'INACTIVE' +OFC_STATUS_ERROR = 'ERROR' + +class FlowBadFormat(Exception): + '''raise when a bad format of flow is found''' + +def change_of2db(flow): + '''Change 'flow' dictionary from openflow format to database format + Basically the change consist of changing 'flow[actions] from a list of + double tuple to a string + from [(A,B),(C,D),..] to "A=B,C=D" ''' + action_str_list=[] + if type(flow)!=dict or "actions" not in flow: + raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key") + try: + for action in flow['actions']: + action_str_list.append( action[0] + "=" + str(action[1]) ) + flow['actions'] = ",".join(action_str_list) + except: + raise FlowBadFormat("Unexpected format at 'actions'") + +def change_db2of(flow): + '''Change 'flow' dictionary from database format to openflow format + Basically the change consist of changing 'flow[actions]' from a string to + a double tuple list + from "A=B,C=D,..." to [(A,B),(C,D),..] + raise FlowBadFormat ''' + actions=[] + if type(flow)!=dict or "actions" not in flow or type(flow["actions"])!=str: + raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key") + action_list = flow['actions'].split(",") + for action_item in action_list: + action_tuple = action_item.split("=") + if len(action_tuple) != 2: + raise FlowBadFormat("Expected key=value format at 'actions'") + if action_tuple[0].strip().lower()=="vlan": + if action_tuple[1].strip().lower() in ("none", "strip"): + actions.append( ("vlan",None) ) + else: + try: + actions.append( ("vlan", int(action_tuple[1])) ) + except: + raise FlowBadFormat("Expected integer after vlan= at 'actions'") + elif action_tuple[0].strip().lower()=="out": + actions.append( ("out", str(action_tuple[1])) ) + else: + raise FlowBadFormat("Unexpected '%s' at 'actions'"%action_tuple[0]) + flow['actions'] = actions + + +class openflow_thread(threading.Thread): + """ + This thread interacts with a openflow controller to create dataplane connections + """ + def __init__(self, of_uuid, of_connector, db, db_lock, of_test, pmp_with_same_vlan=False, debug='ERROR'): + threading.Thread.__init__(self) + self.of_uuid = of_uuid + self.db = db + self.pmp_with_same_vlan = pmp_with_same_vlan + self.name = "openflow" + self.test = of_test + self.db_lock = db_lock + self.OF_connector = of_connector + self.logger = logging.getLogger('vim.OF-' + of_uuid) + self.logger.setLevel(getattr(logging, debug)) + self.logger.name = of_connector.name + " " + self.OF_connector.dpid + self.queueLock = threading.Lock() + self.taskQueue = Queue.Queue(2000) + + def insert_task(self, task, *aditional): + try: + self.queueLock.acquire() + task = self.taskQueue.put( (task,) + aditional, timeout=5) + self.queueLock.release() + return 1, None + except Queue.Full: + return -1, "timeout inserting a task over openflow thread " + self.name + + def run(self): + self.logger.debug("Start openflow thread") + self.set_openflow_controller_status(OFC_STATUS_ACTIVE) + + while True: + try: + self.queueLock.acquire() + if not self.taskQueue.empty(): + task = self.taskQueue.get() + else: + task = None + self.queueLock.release() + + if task is None: + time.sleep(1) + continue + + if task[0] == 'update-net': + r,c = self.update_of_flows(task[1]) + # update database status + if r<0: + UPDATE={'status':'ERROR', 'last_error': str(c)} + self.logger.error("processing task 'update-net' %s: %s", str(task[1]), c) + self.set_openflow_controller_status(OFC_STATUS_ERROR, "Error updating net {}".format(task[1])) + else: + UPDATE={'status':'ACTIVE', 'last_error': None} + self.logger.debug("processing task 'update-net' %s: OK", str(task[1])) + self.set_openflow_controller_status(OFC_STATUS_ACTIVE) + self.db_lock.acquire() + self.db.update_rows('nets', UPDATE, WHERE={'uuid': task[1]}) + self.db_lock.release() + + elif task[0] == 'clear-all': + r,c = self.clear_all_flows() + if r<0: + self.logger.error("processing task 'clear-all': %s", c) + self.set_openflow_controller_status(OFC_STATUS_ERROR, "Error deleting all flows") + else: + self.set_openflow_controller_status(OFC_STATUS_ACTIVE) + self.logger.debug("processing task 'clear-all': OK") + elif task[0] == 'exit': + self.logger.debug("exit from openflow_thread") + self.terminate() + self.set_openflow_controller_status(OFC_STATUS_INACTIVE, "Ofc with thread killed") + return 0 + else: + self.logger.error("unknown task %s", str(task)) + except openflow_conn.OpenflowconnException as e: + self.set_openflow_controller_status(OFC_STATUS_ERROR, str(e)) + + def terminate(self): + pass + # print self.name, ": exit from openflow_thread" + + def update_of_flows(self, net_id): + ports=() + self.db_lock.acquire() + select_= ('type','admin_state_up', 'vlan', 'provider', 'bind_net','bind_type','uuid') + result, nets = self.db.get_table(FROM='nets', SELECT=select_, WHERE={'uuid':net_id} ) + #get all the networks binding to this + if result > 0: + if nets[0]['bind_net']: + bind_id = nets[0]['bind_net'] + else: + bind_id = net_id + #get our net and all bind_nets + result, nets = self.db.get_table(FROM='nets', SELECT=select_, + WHERE_OR={'bind_net':bind_id, 'uuid':bind_id} ) + + self.db_lock.release() + if result < 0: + return -1, "DB error getting net: " + nets + #elif result==0: + #net has been deleted + ifaces_nb = 0 + database_flows = [] + for net in nets: + net_id = net["uuid"] + if net['admin_state_up'] == 'false': + net['ports'] = () + else: + self.db_lock.acquire() + nb_ports, net_ports = self.db.get_table( + FROM='ports', + SELECT=('switch_port','vlan','uuid','mac','type','model'), + WHERE={'net_id':net_id, 'admin_state_up':'true', 'status':'ACTIVE'} ) + self.db_lock.release() + if nb_ports < 0: + + #print self.name, ": update_of_flows() ERROR getting ports", ports + return -1, "DB error getting ports from net '%s': %s" % (net_id, net_ports) + + #add the binding as an external port + if net['provider'] and net['provider'][:9]=="openflow:": + external_port={"type":"external","mac":None} + external_port['uuid'] = net_id + ".1" #fake uuid + if net['provider'][-5:]==":vlan": + external_port["vlan"] = net["vlan"] + external_port["switch_port"] = net['provider'][9:-5] + else: + external_port["vlan"] = None + external_port["switch_port"] = net['provider'][9:] + net_ports = net_ports + (external_port,) + nb_ports += 1 + net['ports'] = net_ports + ifaces_nb += nb_ports + + # Get the name of flows that will be affected by this NET + self.db_lock.acquire() + result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':net_id}) + self.db_lock.release() + if result < 0: + error_msg = "DB error getting flows from net '{}': {}".format(net_id, database_net_flows) + # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows + return -1, error_msg + database_flows += database_net_flows + # Get the name of flows where net_id==NULL that means net deleted (At DB foreign key: On delete set null) + self.db_lock.acquire() + result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':None}) + self.db_lock.release() + if result < 0: + error_msg = "DB error getting flows from net 'null': {}".format(database_net_flows) + # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows + return -1, error_msg + database_flows += database_net_flows + + # Get the existing flows at openflow controller + try: + of_flows = self.OF_connector.get_of_rules() + # print self.name, ": update_of_flows() ERROR getting flows from controller", of_flows + except openflow_conn.OpenflowconnException as e: + # self.set_openflow_controller_status(OFC_STATUS_ERROR, "OF error {} getting flows".format(str(e))) + return -1, "OF error {} getting flows".format(str(e)) + + if ifaces_nb < 2: + pass + elif net['type'] == 'ptp': + if ifaces_nb > 2: + #print self.name, 'Error, network '+str(net_id)+' has been defined as ptp but it has '+\ + # str(ifaces_nb)+' interfaces.' + return -1, "'ptp' type network cannot connect %d interfaces, only 2" % ifaces_nb + elif net['type'] == 'data': + if ifaces_nb > 2 and self.pmp_with_same_vlan: + # check all ports are VLAN (tagged) or none + vlan_tag = None + for port in ports: + if port["type"]=="external": + if port["vlan"] != None: + if port["vlan"]!=net["vlan"]: + text="External port vlan-tag and net vlan-tag must be the same when flag 'of_controller_nets_with_same_vlan' is True" + #print self.name, "Error", text + return -1, text + if vlan_tag == None: + vlan_tag=True + elif vlan_tag==False: + text="Passthrough and external port vlan-tagged cannot be connected when flag 'of_controller_nets_with_same_vlan' is True" + #print self.name, "Error", text + return -1, text + else: + if vlan_tag == None: + vlan_tag=False + elif vlan_tag == True: + text="SR-IOV and external port not vlan-tagged cannot be connected when flag 'of_controller_nets_with_same_vlan' is True" + #print self.name, "Error", text + return -1, text + elif port["model"]=="PF" or port["model"]=="VFnotShared": + if vlan_tag == None: + vlan_tag=False + elif vlan_tag==True: + text="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True" + #print self.name, "Error", text + return -1, text + elif port["model"] == "VF": + if vlan_tag == None: + vlan_tag=True + elif vlan_tag==False: + text="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True" + #print self.name, "Error", text + return -1, text + else: + return -1, 'Only ptp and data networks are supported for openflow' + + # calculate new flows to be inserted + result, new_flows = self._compute_net_flows(nets) + if result < 0: + return result, new_flows + + #modify database flows format and get the used names + used_names=[] + for flow in database_flows: + try: + change_db2of(flow) + except FlowBadFormat as e: + self.logger.error("Exception FlowBadFormat: '%s', flow: '%s'",str(e), str(flow)) + continue + used_names.append(flow['name']) + name_index=0 + # insert at database the new flows, change actions to human text + for flow in new_flows: + # 1 check if an equal flow is already present + index = self._check_flow_already_present(flow, database_flows) + if index>=0: + database_flows[index]["not delete"]=True + self.logger.debug("Skipping already present flow %s", str(flow)) + continue + # 2 look for a non used name + flow_name=flow["net_id"]+"."+str(name_index) + while flow_name in used_names or flow_name in of_flows: + name_index += 1 + flow_name=flow["net_id"]+"."+str(name_index) + used_names.append(flow_name) + flow['name'] = flow_name + # 3 insert at openflow + + try: + self.OF_connector.new_flow(flow) + except openflow_conn.OpenflowconnException as e: + return -1, "Error creating new flow {}".format(str(e)) + + # 4 insert at database + try: + change_of2db(flow) + except FlowBadFormat as e: + # print self.name, ": Error Exception FlowBadFormat '%s'" % str(e), flow + return -1, str(e) + self.db_lock.acquire() + result, content = self.db.new_row('of_flows', flow) + self.db_lock.release() + if result < 0: + # print self.name, ": Error '%s' at database insertion" % content, flow + return -1, content + + #delete not needed old flows from openflow and from DDBB, + #check that the needed flows at DDBB are present in controller or insert them otherwise + for flow in database_flows: + if "not delete" in flow: + if flow["name"] not in of_flows: + # not in controller, insert it + try: + self.OF_connector.new_flow(flow) + except openflow_conn.OpenflowconnException as e: + return -1, "Error creating new flow {}".format(str(e)) + + continue + # Delete flow + if flow["name"] in of_flows: + try: + self.OF_connector.del_flow(flow['name']) + except openflow_conn.OpenflowconnException as e: + self.logger.error("cannot delete flow '%s' from OF: %s", flow['name'], str(e)) + # skip deletion from database + continue + + # delete from database + self.db_lock.acquire() + result, content = self.db.delete_row_by_key('of_flows', 'id', flow['id']) + self.db_lock.release() + if result<0: + self.logger.error("cannot delete flow '%s' from DB: %s", flow['name'], content ) + + return 0, 'Success' + + def clear_all_flows(self): + try: + if not self.test: + self.OF_connector.clear_all_flows() + + # remove from database + self.db_lock.acquire() + self.db.delete_row_by_key('of_flows', None, None) #this will delete all lines + self.db_lock.release() + return 0, None + except openflow_conn.OpenflowconnException as e: + return -1, self.logger.error("Error deleting all flows {}", str(e)) + + flow_fields = ('priority', 'vlan', 'ingress_port', 'actions', 'dst_mac', 'src_mac', 'net_id') + + def _check_flow_already_present(self, new_flow, flow_list): + '''check if the same flow is already present in the flow list + The flow is repeated if all the fields, apart from name, are equal + Return the index of matching flow, -1 if not match''' + index=0 + for flow in flow_list: + equal=True + for f in self.flow_fields: + if flow.get(f) != new_flow.get(f): + equal=False + break + if equal: + return index + index += 1 + return -1 + + def _compute_net_flows(self, nets): + new_flows=[] + new_broadcast_flows={} + nb_ports = 0 + + # Check switch_port information is right + self.logger.debug("_compute_net_flows nets: %s", str(nets)) + for net in nets: + for port in net['ports']: + nb_ports += 1 + if not self.test and str(port['switch_port']) not in self.OF_connector.pp2ofi: + error_text= "switch port name '%s' is not valid for the openflow controller" % str(port['switch_port']) + # print self.name, ": ERROR " + error_text + return -1, error_text + + for net_src in nets: + net_id = net_src["uuid"] + for net_dst in nets: + vlan_net_in = None + vlan_net_out = None + if net_src == net_dst: + #intra net rules + priority = 1000 + elif net_src['bind_net'] == net_dst['uuid']: + if net_src.get('bind_type') and net_src['bind_type'][0:5] == "vlan:": + vlan_net_out = int(net_src['bind_type'][5:]) + priority = 1100 + elif net_dst['bind_net'] == net_src['uuid']: + if net_dst.get('bind_type') and net_dst['bind_type'][0:5] == "vlan:": + vlan_net_in = int(net_dst['bind_type'][5:]) + priority = 1100 + else: + #nets not binding + continue + for src_port in net_src['ports']: + vlan_in = vlan_net_in + if vlan_in == None and src_port['vlan'] != None: + vlan_in = src_port['vlan'] + elif vlan_in != None and src_port['vlan'] != None: + #TODO this is something that we cannot do. It requires a double VLAN check + #outer VLAN should be src_port['vlan'] and inner VLAN should be vlan_in + continue + + # BROADCAST: + broadcast_key = src_port['uuid'] + "." + str(vlan_in) + if broadcast_key in new_broadcast_flows: + flow_broadcast = new_broadcast_flows[broadcast_key] + else: + flow_broadcast = {'priority': priority, + 'net_id': net_id, + 'dst_mac': 'ff:ff:ff:ff:ff:ff', + "ingress_port": str(src_port['switch_port']), + 'actions': [] + } + new_broadcast_flows[broadcast_key] = flow_broadcast + if vlan_in is not None: + flow_broadcast['vlan_id'] = str(vlan_in) + + for dst_port in net_dst['ports']: + vlan_out = vlan_net_out + if vlan_out == None and dst_port['vlan'] != None: + vlan_out = dst_port['vlan'] + elif vlan_out != None and dst_port['vlan'] != None: + #TODO this is something that we cannot do. It requires a double VLAN set + #outer VLAN should be dst_port['vlan'] and inner VLAN should be vlan_out + continue + #if src_port == dst_port: + # continue + if src_port['switch_port'] == dst_port['switch_port'] and vlan_in == vlan_out: + continue + flow = { + "priority": priority, + 'net_id': net_id, + "ingress_port": str(src_port['switch_port']), + 'actions': [] + } + if vlan_in is not None: + flow['vlan_id'] = str(vlan_in) + # allow that one port have no mac + if dst_port['mac'] is None or nb_ports==2: # point to point or nets with 2 elements + flow['priority'] = priority-5 # less priority + else: + flow['dst_mac'] = str(dst_port['mac']) + + if vlan_out == None: + if vlan_in != None: + flow['actions'].append( ('vlan',None) ) + else: + flow['actions'].append( ('vlan', vlan_out ) ) + flow['actions'].append( ('out', str(dst_port['switch_port'])) ) + + if self._check_flow_already_present(flow, new_flows) >= 0: + self.logger.debug("Skipping repeated flow '%s'", str(flow)) + continue + + new_flows.append(flow) + + # BROADCAST: + if nb_ports <= 2: # point to multipoint or nets with more than 2 elements + continue + out = (vlan_out, str(dst_port['switch_port'])) + if out not in flow_broadcast['actions']: + flow_broadcast['actions'].append( out ) + + #BROADCAST + for flow_broadcast in new_broadcast_flows.values(): + if len(flow_broadcast['actions'])==0: + continue #nothing to do, skip + flow_broadcast['actions'].sort() + if 'vlan_id' in flow_broadcast: + previous_vlan = 0 # indicates that a packet contains a vlan, and the vlan + else: + previous_vlan = None + final_actions=[] + action_number = 0 + for action in flow_broadcast['actions']: + if action[0] != previous_vlan: + final_actions.append( ('vlan', action[0]) ) + previous_vlan = action[0] + if self.pmp_with_same_vlan and action_number: + return -1, "Cannot interconnect different vlan tags in a network when flag 'of_controller_nets_with_same_vlan' is True." + action_number += 1 + final_actions.append( ('out', action[1]) ) + flow_broadcast['actions'] = final_actions + + if self._check_flow_already_present(flow_broadcast, new_flows) >= 0: + self.logger.debug("Skipping repeated flow '%s'", str(flow_broadcast)) + continue + + new_flows.append(flow_broadcast) + + #UNIFY openflow rules with the same input port and vlan and the same output actions + #These flows differ at the dst_mac; and they are unified by not filtering by dst_mac + #this can happen if there is only two ports. It is converted to a point to point connection + flow_dict={} # use as key vlan_id+ingress_port and as value the list of flows matching these values + for flow in new_flows: + key = str(flow.get("vlan_id"))+":"+flow["ingress_port"] + if key in flow_dict: + flow_dict[key].append(flow) + else: + flow_dict[key]=[ flow ] + new_flows2=[] + for flow_list in flow_dict.values(): + convert2ptp=False + if len (flow_list)>=2: + convert2ptp=True + for f in flow_list: + if f['actions'] != flow_list[0]['actions']: + convert2ptp=False + break + if convert2ptp: # add only one unified rule without dst_mac + self.logger.debug("Convert flow rules to NON mac dst_address " + str(flow_list) ) + flow_list[0].pop('dst_mac') + flow_list[0]["priority"] -= 5 + new_flows2.append(flow_list[0]) + else: # add all the rules + new_flows2 += flow_list + return 0, new_flows2 + + def set_openflow_controller_status(self, status, error_text=None): + """ + Set openflow controller last operation status in DB + :param status: ofc status ('ACTIVE','INACTIVE','ERROR') + :param error_text: error text + :return: + """ + if self.of_uuid == "Default": + return True + + ofc = {} + ofc['status'] = status + ofc['last_error'] = error_text + self.db_lock.acquire() + result, content = self.db.update_rows('ofcs', ofc, WHERE={'uuid': self.of_uuid}, log=False) + self.db_lock.release() + if result >= 0: + return True + else: + return False + + + + + + +