Modify openvim code structure, improve py package
[osm/openvim.git] / osm_openvim / openflow_thread.py
diff --git a/osm_openvim/openflow_thread.py b/osm_openvim/openflow_thread.py
new file mode 100644 (file)
index 0000000..cd873e7
--- /dev/null
@@ -0,0 +1,598 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+##
+# Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
+# This file is part of openvim
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: nfvlabs@tid.es
+##
+
+'''
+This thread interacts with a openflow controller to create dataplane connections
+'''
+
+__author__="Pablo Montes, Alfonso Tierno"
+__date__ ="17-jul-2015"
+
+
+#import json
+import threading
+import time
+import Queue
+import requests
+import logging
+import openflow_conn
+
+OFC_STATUS_ACTIVE = 'ACTIVE'
+OFC_STATUS_INACTIVE = 'INACTIVE'
+OFC_STATUS_ERROR = 'ERROR'
+
+class FlowBadFormat(Exception):
+    '''raise when a bad format of flow is found''' 
+
+def change_of2db(flow):
+    '''Change 'flow' dictionary from openflow format to database format
+    Basically the change consist of changing 'flow[actions] from a list of
+    double tuple to a string
+    from [(A,B),(C,D),..] to "A=B,C=D" '''
+    action_str_list=[]
+    if type(flow)!=dict or "actions" not in flow:
+        raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
+    try:
+        for action in flow['actions']:
+            action_str_list.append( action[0] + "=" + str(action[1]) )
+        flow['actions'] = ",".join(action_str_list)
+    except:
+        raise FlowBadFormat("Unexpected format at 'actions'")
+
+def change_db2of(flow):
+    '''Change 'flow' dictionary from database format to openflow format
+    Basically the change consist of changing 'flow[actions]' from a string to 
+    a double tuple list
+    from "A=B,C=D,..." to [(A,B),(C,D),..] 
+    raise FlowBadFormat '''
+    actions=[]
+    if type(flow)!=dict or "actions" not in flow or type(flow["actions"])!=str:
+        raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
+    action_list = flow['actions'].split(",")
+    for action_item in action_list:
+        action_tuple = action_item.split("=")
+        if len(action_tuple) != 2:
+            raise FlowBadFormat("Expected key=value format at 'actions'")
+        if action_tuple[0].strip().lower()=="vlan":
+            if action_tuple[1].strip().lower() in ("none", "strip"):
+                actions.append( ("vlan",None) )
+            else:
+                try:
+                    actions.append( ("vlan", int(action_tuple[1])) )
+                except:
+                    raise FlowBadFormat("Expected integer after vlan= at 'actions'")
+        elif action_tuple[0].strip().lower()=="out":
+            actions.append( ("out", str(action_tuple[1])) )
+        else:
+            raise FlowBadFormat("Unexpected '%s' at 'actions'"%action_tuple[0])
+    flow['actions'] = actions
+
+
+class openflow_thread(threading.Thread):
+    """
+    This thread interacts with a openflow controller to create dataplane connections
+    """
+    def __init__(self, of_uuid, of_connector, db, db_lock, of_test, pmp_with_same_vlan=False, debug='ERROR'):
+        threading.Thread.__init__(self)
+        self.of_uuid = of_uuid
+        self.db = db
+        self.pmp_with_same_vlan = pmp_with_same_vlan
+        self.name = "openflow"
+        self.test = of_test
+        self.db_lock = db_lock
+        self.OF_connector = of_connector
+        self.logger = logging.getLogger('vim.OF-' + of_uuid)
+        self.logger.setLevel(getattr(logging, debug))
+        self.logger.name = of_connector.name + " " + self.OF_connector.dpid
+        self.queueLock = threading.Lock()
+        self.taskQueue = Queue.Queue(2000)
+        
+    def insert_task(self, task, *aditional):
+        try:
+            self.queueLock.acquire()
+            task = self.taskQueue.put( (task,) + aditional, timeout=5) 
+            self.queueLock.release()
+            return 1, None
+        except Queue.Full:
+            return -1, "timeout inserting a task over openflow thread " + self.name
+
+    def run(self):
+        self.logger.debug("Start openflow thread")
+        self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
+
+        while True:
+            try:
+                self.queueLock.acquire()
+                if not self.taskQueue.empty():
+                    task = self.taskQueue.get()
+                else:
+                    task = None
+                self.queueLock.release()
+
+                if task is None:
+                    time.sleep(1)
+                    continue
+
+                if task[0] == 'update-net':
+                    r,c = self.update_of_flows(task[1])
+                    # update database status
+                    if r<0:
+                        UPDATE={'status':'ERROR', 'last_error': str(c)}
+                        self.logger.error("processing task 'update-net' %s: %s", str(task[1]), c)
+                        self.set_openflow_controller_status(OFC_STATUS_ERROR, "Error updating net {}".format(task[1]))
+                    else:
+                        UPDATE={'status':'ACTIVE', 'last_error': None}
+                        self.logger.debug("processing task 'update-net' %s: OK", str(task[1]))
+                        self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
+                    self.db_lock.acquire()
+                    self.db.update_rows('nets', UPDATE, WHERE={'uuid': task[1]})
+                    self.db_lock.release()
+
+                elif task[0] == 'clear-all':
+                    r,c = self.clear_all_flows()
+                    if r<0:
+                        self.logger.error("processing task 'clear-all': %s", c)
+                        self.set_openflow_controller_status(OFC_STATUS_ERROR, "Error deleting all flows")
+                    else:
+                        self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
+                        self.logger.debug("processing task 'clear-all': OK")
+                elif task[0] == 'exit':
+                    self.logger.debug("exit from openflow_thread")
+                    self.terminate()
+                    self.set_openflow_controller_status(OFC_STATUS_INACTIVE, "Ofc with thread killed")
+                    return 0
+                else:
+                    self.logger.error("unknown task %s", str(task))
+            except openflow_conn.OpenflowconnException as e:
+                self.set_openflow_controller_status(OFC_STATUS_ERROR, str(e))
+
+    def terminate(self):
+        pass
+        # print self.name, ": exit from openflow_thread"
+
+    def update_of_flows(self, net_id):
+        ports=()
+        self.db_lock.acquire()
+        select_= ('type','admin_state_up', 'vlan', 'provider', 'bind_net','bind_type','uuid')
+        result, nets = self.db.get_table(FROM='nets', SELECT=select_, WHERE={'uuid':net_id} )
+        #get all the networks binding to this
+        if result > 0:
+            if nets[0]['bind_net']:
+                bind_id = nets[0]['bind_net']
+            else:
+                bind_id = net_id
+            #get our net and all bind_nets
+            result, nets = self.db.get_table(FROM='nets', SELECT=select_,
+                                                WHERE_OR={'bind_net':bind_id, 'uuid':bind_id} )
+            
+        self.db_lock.release()
+        if result < 0:
+            return -1, "DB error getting net: " + nets
+        #elif result==0:
+            #net has been deleted
+        ifaces_nb = 0
+        database_flows = []
+        for net in nets:
+            net_id = net["uuid"]
+            if net['admin_state_up'] == 'false':
+                net['ports'] = ()
+            else:
+                self.db_lock.acquire()
+                nb_ports, net_ports = self.db.get_table(
+                        FROM='ports',
+                        SELECT=('switch_port','vlan','uuid','mac','type','model'),
+                        WHERE={'net_id':net_id, 'admin_state_up':'true', 'status':'ACTIVE'} )
+                self.db_lock.release()
+                if nb_ports < 0:
+
+                    #print self.name, ": update_of_flows() ERROR getting ports", ports
+                    return -1, "DB error getting ports from net '%s': %s" % (net_id, net_ports)
+                
+                #add the binding as an external port
+                if net['provider'] and net['provider'][:9]=="openflow:":
+                    external_port={"type":"external","mac":None}
+                    external_port['uuid'] = net_id + ".1" #fake uuid
+                    if net['provider'][-5:]==":vlan":
+                        external_port["vlan"] = net["vlan"]
+                        external_port["switch_port"] = net['provider'][9:-5]
+                    else:
+                        external_port["vlan"] = None
+                        external_port["switch_port"] = net['provider'][9:]
+                    net_ports = net_ports + (external_port,)
+                    nb_ports += 1
+                net['ports'] = net_ports
+                ifaces_nb += nb_ports
+        
+            # Get the name of flows that will be affected by this NET 
+            self.db_lock.acquire()
+            result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':net_id})
+            self.db_lock.release()
+            if result < 0:
+                error_msg = "DB error getting flows from net '{}': {}".format(net_id, database_net_flows)
+                # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
+                return -1, error_msg
+            database_flows += database_net_flows
+        # Get the name of flows where net_id==NULL that means net deleted (At DB foreign key: On delete set null)
+        self.db_lock.acquire()
+        result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':None})
+        self.db_lock.release()
+        if result < 0:
+            error_msg = "DB error getting flows from net 'null': {}".format(database_net_flows)
+            # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
+            return -1, error_msg
+        database_flows += database_net_flows
+
+        # Get the existing flows at openflow controller
+        try:
+            of_flows = self.OF_connector.get_of_rules()
+            # print self.name, ": update_of_flows() ERROR getting flows from controller", of_flows
+        except openflow_conn.OpenflowconnException as e:
+            # self.set_openflow_controller_status(OFC_STATUS_ERROR, "OF error {} getting flows".format(str(e)))
+            return -1, "OF error {} getting flows".format(str(e))
+
+        if ifaces_nb < 2:
+            pass
+        elif net['type'] == 'ptp':
+            if ifaces_nb > 2:
+                #print self.name, 'Error, network '+str(net_id)+' has been defined as ptp but it has '+\
+                #                 str(ifaces_nb)+' interfaces.'
+                return -1, "'ptp' type network cannot connect %d interfaces, only 2" % ifaces_nb
+        elif net['type'] == 'data':
+            if ifaces_nb > 2 and self.pmp_with_same_vlan:
+                # check all ports are VLAN (tagged) or none
+                vlan_tag = None
+                for port in ports:
+                    if port["type"]=="external":
+                        if port["vlan"] != None:
+                            if port["vlan"]!=net["vlan"]:
+                                text="External port vlan-tag and net vlan-tag must be the same when flag 'of_controller_nets_with_same_vlan' is True"
+                                #print self.name, "Error", text
+                                return -1, text
+                            if vlan_tag == None:
+                                vlan_tag=True
+                            elif vlan_tag==False:
+                                text="Passthrough and external port vlan-tagged cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
+                                #print self.name, "Error", text
+                                return -1, text
+                        else:
+                            if vlan_tag == None:
+                                vlan_tag=False
+                            elif vlan_tag == True:
+                                text="SR-IOV and external port not vlan-tagged cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
+                                #print self.name, "Error", text
+                                return -1, text
+                    elif port["model"]=="PF" or port["model"]=="VFnotShared":
+                        if vlan_tag == None:
+                            vlan_tag=False
+                        elif vlan_tag==True:
+                            text="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
+                            #print self.name, "Error", text
+                            return -1, text
+                    elif port["model"] == "VF":
+                        if vlan_tag == None:
+                            vlan_tag=True
+                        elif vlan_tag==False:
+                            text="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
+                            #print self.name, "Error", text
+                            return -1, text
+        else:
+            return -1, 'Only ptp and data networks are supported for openflow'
+            
+        # calculate new flows to be inserted
+        result, new_flows = self._compute_net_flows(nets)
+        if result < 0:
+            return result, new_flows
+
+        #modify database flows format and get the used names
+        used_names=[]
+        for flow in database_flows:
+            try:
+                change_db2of(flow)
+            except FlowBadFormat as e:
+                self.logger.error("Exception FlowBadFormat: '%s', flow: '%s'",str(e), str(flow))
+                continue
+            used_names.append(flow['name'])
+        name_index=0
+        # insert at database the new flows, change actions to human text
+        for flow in new_flows:
+            # 1 check if an equal flow is already present
+            index = self._check_flow_already_present(flow, database_flows)
+            if index>=0:
+                database_flows[index]["not delete"]=True
+                self.logger.debug("Skipping already present flow %s", str(flow))
+                continue
+            # 2 look for a non used name
+            flow_name=flow["net_id"]+"."+str(name_index)
+            while flow_name in used_names or flow_name in of_flows:         
+                name_index += 1   
+                flow_name=flow["net_id"]+"."+str(name_index)
+            used_names.append(flow_name)
+            flow['name'] = flow_name
+            # 3 insert at openflow
+
+            try:
+                self.OF_connector.new_flow(flow)
+            except openflow_conn.OpenflowconnException as e:
+                return -1, "Error creating new flow {}".format(str(e))
+
+            # 4 insert at database
+            try:
+                change_of2db(flow)
+            except FlowBadFormat as e:
+                # print self.name, ": Error Exception FlowBadFormat '%s'" % str(e), flow
+                return -1, str(e)
+            self.db_lock.acquire()
+            result, content = self.db.new_row('of_flows', flow)
+            self.db_lock.release()
+            if result < 0:
+                # print self.name, ": Error '%s' at database insertion" % content, flow
+                return -1, content
+
+        #delete not needed old flows from openflow and from DDBB, 
+        #check that the needed flows at DDBB are present in controller or insert them otherwise
+        for flow in database_flows:
+            if "not delete" in flow:
+                if flow["name"] not in of_flows:
+                    # not in controller, insert it
+                    try:
+                        self.OF_connector.new_flow(flow)
+                    except openflow_conn.OpenflowconnException as e:
+                        return -1, "Error creating new flow {}".format(str(e))
+
+                continue
+            # Delete flow
+            if flow["name"] in of_flows:
+                try:
+                    self.OF_connector.del_flow(flow['name'])
+                except openflow_conn.OpenflowconnException as e:
+                    self.logger.error("cannot delete flow '%s' from OF: %s", flow['name'], str(e))
+                    # skip deletion from database
+                    continue
+
+            # delete from database
+            self.db_lock.acquire()
+            result, content = self.db.delete_row_by_key('of_flows', 'id', flow['id'])
+            self.db_lock.release()
+            if result<0:
+                self.logger.error("cannot delete flow '%s' from DB: %s", flow['name'], content )
+        
+        return 0, 'Success'
+
+    def clear_all_flows(self):
+        try:
+            if not self.test:
+                self.OF_connector.clear_all_flows()
+
+            # remove from database
+            self.db_lock.acquire()
+            self.db.delete_row_by_key('of_flows', None, None) #this will delete all lines
+            self.db_lock.release()
+            return 0, None
+        except openflow_conn.OpenflowconnException as e:
+            return -1, self.logger.error("Error deleting all flows {}", str(e))
+
+    flow_fields = ('priority', 'vlan', 'ingress_port', 'actions', 'dst_mac', 'src_mac', 'net_id')
+
+    def _check_flow_already_present(self, new_flow, flow_list):
+        '''check if the same flow is already present in the flow list
+        The flow is repeated if all the fields, apart from name, are equal
+        Return the index of matching flow, -1 if not match'''
+        index=0
+        for flow in flow_list:
+            equal=True
+            for f in self.flow_fields:
+                if flow.get(f) != new_flow.get(f):
+                    equal=False
+                    break
+            if equal:
+                return index
+            index += 1
+        return -1
+        
+    def _compute_net_flows(self, nets):
+        new_flows=[]
+        new_broadcast_flows={}
+        nb_ports = 0
+
+        # Check switch_port information is right
+        self.logger.debug("_compute_net_flows nets: %s", str(nets))
+        for net in nets:
+            for port in net['ports']:
+                nb_ports += 1
+                if not self.test and str(port['switch_port']) not in self.OF_connector.pp2ofi:
+                    error_text= "switch port name '%s' is not valid for the openflow controller" % str(port['switch_port'])
+                    # print self.name, ": ERROR " + error_text
+                    return -1, error_text
+
+        for net_src in nets:
+            net_id = net_src["uuid"]
+            for net_dst in nets:
+                vlan_net_in  = None
+                vlan_net_out = None
+                if net_src == net_dst:
+                    #intra net rules    
+                    priority = 1000
+                elif net_src['bind_net'] == net_dst['uuid']:
+                    if net_src.get('bind_type') and net_src['bind_type'][0:5] == "vlan:":
+                        vlan_net_out = int(net_src['bind_type'][5:])
+                    priority = 1100
+                elif net_dst['bind_net'] == net_src['uuid']:
+                    if net_dst.get('bind_type') and net_dst['bind_type'][0:5] == "vlan:":
+                        vlan_net_in = int(net_dst['bind_type'][5:])
+                    priority = 1100
+                else:
+                    #nets not binding
+                    continue
+                for src_port in net_src['ports']:
+                    vlan_in  = vlan_net_in
+                    if vlan_in == None  and src_port['vlan'] != None:
+                        vlan_in  = src_port['vlan']
+                    elif vlan_in != None  and src_port['vlan'] != None:
+                        #TODO this is something that we cannot do. It requires a double VLAN check
+                        #outer VLAN should be src_port['vlan'] and inner VLAN should be vlan_in
+                        continue
+
+                    # BROADCAST:
+                    broadcast_key = src_port['uuid'] + "." + str(vlan_in)
+                    if broadcast_key in new_broadcast_flows:
+                        flow_broadcast = new_broadcast_flows[broadcast_key]
+                    else:
+                        flow_broadcast = {'priority': priority,
+                            'net_id':  net_id,
+                            'dst_mac': 'ff:ff:ff:ff:ff:ff',
+                            "ingress_port": str(src_port['switch_port']),
+                            'actions': [] 
+                        }
+                        new_broadcast_flows[broadcast_key] = flow_broadcast
+                        if vlan_in is not None:
+                            flow_broadcast['vlan_id'] = str(vlan_in)
+
+                    for dst_port in net_dst['ports']:
+                        vlan_out = vlan_net_out 
+                        if vlan_out == None and dst_port['vlan'] != None:
+                            vlan_out = dst_port['vlan']
+                        elif vlan_out != None and dst_port['vlan'] != None:
+                            #TODO this is something that we cannot do. It requires a double VLAN set
+                            #outer VLAN should be dst_port['vlan'] and inner VLAN should be vlan_out
+                            continue
+                        #if src_port == dst_port:
+                        #    continue
+                        if src_port['switch_port'] == dst_port['switch_port'] and vlan_in == vlan_out:
+                            continue
+                        flow = {
+                            "priority": priority,
+                            'net_id':  net_id,
+                            "ingress_port": str(src_port['switch_port']),
+                            'actions': []
+                        }
+                        if vlan_in is not None:
+                            flow['vlan_id'] = str(vlan_in)
+                        # allow that one port have no mac
+                        if dst_port['mac'] is None or nb_ports==2:  # point to point or nets with 2 elements
+                            flow['priority'] = priority-5  # less priority
+                        else:
+                            flow['dst_mac'] = str(dst_port['mac'])
+            
+                        if vlan_out == None:
+                            if vlan_in != None:
+                                flow['actions'].append( ('vlan',None) )
+                        else:
+                            flow['actions'].append( ('vlan', vlan_out ) )
+                        flow['actions'].append( ('out', str(dst_port['switch_port'])) )
+            
+                        if self._check_flow_already_present(flow, new_flows) >= 0:
+                            self.logger.debug("Skipping repeated flow '%s'", str(flow))
+                            continue
+                        
+                        new_flows.append(flow)
+                    
+                        # BROADCAST:
+                        if nb_ports <= 2:  # point to multipoint or nets with more than 2 elements
+                            continue
+                        out = (vlan_out, str(dst_port['switch_port']))
+                        if out not in flow_broadcast['actions']:
+                            flow_broadcast['actions'].append( out )
+
+        #BROADCAST
+        for flow_broadcast in new_broadcast_flows.values():      
+            if len(flow_broadcast['actions'])==0:
+                continue #nothing to do, skip
+            flow_broadcast['actions'].sort()
+            if 'vlan_id' in flow_broadcast:
+                previous_vlan = 0  # indicates that a packet contains a vlan, and the vlan
+            else:
+                previous_vlan = None
+            final_actions=[]
+            action_number = 0
+            for action in flow_broadcast['actions']:
+                if action[0] != previous_vlan:
+                    final_actions.append( ('vlan', action[0]) )
+                    previous_vlan = action[0]
+                    if self.pmp_with_same_vlan and action_number:
+                        return -1, "Cannot interconnect different vlan tags in a network when flag 'of_controller_nets_with_same_vlan' is True."
+                    action_number += 1
+                final_actions.append( ('out', action[1]) )
+            flow_broadcast['actions'] = final_actions
+
+            if self._check_flow_already_present(flow_broadcast, new_flows) >= 0:
+                self.logger.debug("Skipping repeated flow '%s'", str(flow_broadcast))
+                continue
+            
+            new_flows.append(flow_broadcast)        
+        
+        #UNIFY openflow rules with the same input port and vlan and the same output actions
+        #These flows differ at the dst_mac; and they are unified by not filtering by dst_mac
+        #this can happen if there is only two ports. It is converted to a point to point connection
+        flow_dict={} # use as key vlan_id+ingress_port and as value the list of flows matching these values
+        for flow in new_flows:
+            key = str(flow.get("vlan_id"))+":"+flow["ingress_port"]
+            if key in flow_dict:
+                flow_dict[key].append(flow)
+            else:
+                flow_dict[key]=[ flow ]
+        new_flows2=[]
+        for flow_list in flow_dict.values():
+            convert2ptp=False
+            if len (flow_list)>=2:
+                convert2ptp=True
+                for f in flow_list:
+                    if f['actions'] != flow_list[0]['actions']:
+                        convert2ptp=False
+                        break
+            if convert2ptp: # add only one unified rule without dst_mac
+                self.logger.debug("Convert flow rules to NON mac dst_address " + str(flow_list) )
+                flow_list[0].pop('dst_mac')
+                flow_list[0]["priority"] -= 5
+                new_flows2.append(flow_list[0])
+            else:  # add all the rules
+                new_flows2 += flow_list
+        return 0, new_flows2
+
+    def set_openflow_controller_status(self, status, error_text=None):
+        """
+        Set openflow controller last operation status in DB
+        :param status: ofc status ('ACTIVE','INACTIVE','ERROR')
+        :param error_text: error text
+        :return:
+        """
+        if self.of_uuid == "Default":
+            return True
+
+        ofc = {}
+        ofc['status'] = status
+        ofc['last_error'] = error_text
+        self.db_lock.acquire()
+        result, content = self.db.update_rows('ofcs', ofc, WHERE={'uuid': self.of_uuid}, log=False)
+        self.db_lock.release()
+        if result >= 0:
+            return True
+        else:
+            return False
+
+
+
+
+
+
+