X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=openflow_thread.py;h=cd873e7dc72949851e5f4dda740df2dd7da88c30;hb=refs%2Fchanges%2F17%2F1317%2F3;hp=265b16777c9a408870648d1cf3292d6291763755;hpb=9a61c6b761065160d0889e7bd1e0f9fc37de5310;p=osm%2Fopenvim.git diff --git a/openflow_thread.py b/openflow_thread.py index 265b167..cd873e7 100644 --- a/openflow_thread.py +++ b/openflow_thread.py @@ -23,7 +23,7 @@ ## ''' -This thread interacts with a openflow floodligth controller to create dataplane connections +This thread interacts with a openflow controller to create dataplane connections ''' __author__="Pablo Montes, Alfonso Tierno" @@ -36,6 +36,11 @@ import time import Queue import requests import logging +import openflow_conn + +OFC_STATUS_ACTIVE = 'ACTIVE' +OFC_STATUS_INACTIVE = 'INACTIVE' +OFC_STATUS_ERROR = 'ERROR' class FlowBadFormat(Exception): '''raise when a bad format of flow is found''' @@ -84,55 +89,22 @@ def change_db2of(flow): flow['actions'] = actions - -class of_test_connector(): - '''This is a fake openflow connector for testing. - It does nothing and it is used for running openvim without an openflow controller - ''' - def __init__(self, params): - self.name = "ofc_test" - self.rules={} - self.logger = logging.getLogger('vim.OF.TEST') - self.logger.setLevel( getattr(logging, params.get("of_debug", "ERROR") ) ) - def get_of_switches(self): - return 0, () - def obtain_port_correspondence(self): - return 0, () - def del_flow(self, flow_name): - if flow_name in self.rules: - self.logger.debug("del_flow OK") - del self.rules[flow_name] - return 0, None - else: - self.logger.warning("del_flow not found") - return -1, "flow %s not found" - def new_flow(self, data): - self.rules[ data["name"] ] = data - self.logger.debug("new_flow OK") - return 0, None - def get_of_rules(self, translate_of_ports=True): - return 0, self.rules - - def clear_all_flows(self): - self.logger.debug("clear_all_flows OK") - self.rules={} - return 0, None - - - class openflow_thread(threading.Thread): - def __init__(self, OF_connector, db, db_lock, of_test, pmp_with_same_vlan, debug='ERROR'): + """ + This thread interacts with a openflow controller to create dataplane connections + """ + def __init__(self, of_uuid, of_connector, db, db_lock, of_test, pmp_with_same_vlan=False, debug='ERROR'): threading.Thread.__init__(self) - + self.of_uuid = of_uuid self.db = db self.pmp_with_same_vlan = pmp_with_same_vlan self.name = "openflow" self.test = of_test self.db_lock = db_lock - self.OF_connector = OF_connector - self.logger = logging.getLogger('vim.OF') - self.logger.setLevel( getattr(logging, debug) ) - + self.OF_connector = of_connector + self.logger = logging.getLogger('vim.OF-' + of_uuid) + self.logger.setLevel(getattr(logging, debug)) + self.logger.name = of_connector.name + " " + self.OF_connector.dpid self.queueLock = threading.Lock() self.taskQueue = Queue.Queue(2000) @@ -146,47 +118,58 @@ class openflow_thread(threading.Thread): return -1, "timeout inserting a task over openflow thread " + self.name def run(self): + self.logger.debug("Start openflow thread") + self.set_openflow_controller_status(OFC_STATUS_ACTIVE) + while True: - self.queueLock.acquire() - if not self.taskQueue.empty(): - task = self.taskQueue.get() - else: - task = None - self.queueLock.release() + try: + self.queueLock.acquire() + if not self.taskQueue.empty(): + task = self.taskQueue.get() + else: + task = None + self.queueLock.release() - if task is None: - time.sleep(1) - continue + if task is None: + time.sleep(1) + continue - if task[0] == 'update-net': - r,c = self.update_of_flows(task[1]) - #update database status - self.db_lock.acquire() - if r<0: - UPDATE={'status':'ERROR', 'last_error': str(c)} - self.logger.error("processing task 'update-net' %s: %s", str(task[1]), c) + if task[0] == 'update-net': + r,c = self.update_of_flows(task[1]) + # update database status + if r<0: + UPDATE={'status':'ERROR', 'last_error': str(c)} + self.logger.error("processing task 'update-net' %s: %s", str(task[1]), c) + self.set_openflow_controller_status(OFC_STATUS_ERROR, "Error updating net {}".format(task[1])) + else: + UPDATE={'status':'ACTIVE', 'last_error': None} + self.logger.debug("processing task 'update-net' %s: OK", str(task[1])) + self.set_openflow_controller_status(OFC_STATUS_ACTIVE) + self.db_lock.acquire() + self.db.update_rows('nets', UPDATE, WHERE={'uuid': task[1]}) + self.db_lock.release() + + elif task[0] == 'clear-all': + r,c = self.clear_all_flows() + if r<0: + self.logger.error("processing task 'clear-all': %s", c) + self.set_openflow_controller_status(OFC_STATUS_ERROR, "Error deleting all flows") + else: + self.set_openflow_controller_status(OFC_STATUS_ACTIVE) + self.logger.debug("processing task 'clear-all': OK") + elif task[0] == 'exit': + self.logger.debug("exit from openflow_thread") + self.terminate() + self.set_openflow_controller_status(OFC_STATUS_INACTIVE, "Ofc with thread killed") + return 0 else: - UPDATE={'status':'ACTIVE', 'last_error': None} - self.logger.debug("processing task 'update-net' %s: OK", str(task[1])) - self.db.update_rows('nets', UPDATE, WHERE={'uuid':task[1]}) - self.db_lock.release() + self.logger.error("unknown task %s", str(task)) + except openflow_conn.OpenflowconnException as e: + self.set_openflow_controller_status(OFC_STATUS_ERROR, str(e)) - elif task[0] == 'clear-all': - r,c = self.clear_all_flows() - if r<0: - self.logger.error("processing task 'clear-all': %s", c) - else: - self.logger.debug("processing task 'clear-all': OK") - elif task[0] == 'exit': - self.logger.debug("exit from openflow_thread") - self.terminate() - return 0 - else: - self.logger.error("unknown task %s", str(task)) - def terminate(self): pass - #print self.name, ": exit from openflow_thread" + # print self.name, ": exit from openflow_thread" def update_of_flows(self, net_id): ports=() @@ -222,6 +205,7 @@ class openflow_thread(threading.Thread): WHERE={'net_id':net_id, 'admin_state_up':'true', 'status':'ACTIVE'} ) self.db_lock.release() if nb_ports < 0: + #print self.name, ": update_of_flows() ERROR getting ports", ports return -1, "DB error getting ports from net '%s': %s" % (net_id, net_ports) @@ -245,23 +229,27 @@ class openflow_thread(threading.Thread): result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':net_id}) self.db_lock.release() if result < 0: - #print self.name, ": update_of_flows() ERROR getting flows from database", database_flows - return -1, "DB error getting flows from net '%s': %s" %(net_id, database_net_flows) + error_msg = "DB error getting flows from net '{}': {}".format(net_id, database_net_flows) + # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows + return -1, error_msg database_flows += database_net_flows # Get the name of flows where net_id==NULL that means net deleted (At DB foreign key: On delete set null) self.db_lock.acquire() result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':None}) self.db_lock.release() if result < 0: - #print self.name, ": update_of_flows() ERROR getting flows from database", database_flows - return -1, "DB error getting flows from net 'null': %s" %(database_net_flows) + error_msg = "DB error getting flows from net 'null': {}".format(database_net_flows) + # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows + return -1, error_msg database_flows += database_net_flows - #Get the existing flows at openflow controller - result, of_flows = self.OF_connector.get_of_rules() - if result < 0: - #print self.name, ": update_of_flows() ERROR getting flows from controller", of_flows - return -1, "OF error getting flows: " + of_flows + # Get the existing flows at openflow controller + try: + of_flows = self.OF_connector.get_of_rules() + # print self.name, ": update_of_flows() ERROR getting flows from controller", of_flows + except openflow_conn.OpenflowconnException as e: + # self.set_openflow_controller_status(OFC_STATUS_ERROR, "OF error {} getting flows".format(str(e))) + return -1, "OF error {} getting flows".format(str(e)) if ifaces_nb < 2: pass @@ -284,14 +272,14 @@ class openflow_thread(threading.Thread): if vlan_tag == None: vlan_tag=True elif vlan_tag==False: - text="Passthrough and external port vlan-tagged can not be connected when flag 'of_controller_nets_with_same_vlan' is True" + text="Passthrough and external port vlan-tagged cannot be connected when flag 'of_controller_nets_with_same_vlan' is True" #print self.name, "Error", text return -1, text else: if vlan_tag == None: vlan_tag=False elif vlan_tag == True: - text="SR-IOV and external port not vlan-tagged can not be connected when flag 'of_controller_nets_with_same_vlan' is True" + text="SR-IOV and external port not vlan-tagged cannot be connected when flag 'of_controller_nets_with_same_vlan' is True" #print self.name, "Error", text return -1, text elif port["model"]=="PF" or port["model"]=="VFnotShared": @@ -326,37 +314,39 @@ class openflow_thread(threading.Thread): continue used_names.append(flow['name']) name_index=0 - #insert at database the new flows, change actions to human text + # insert at database the new flows, change actions to human text for flow in new_flows: - #1 check if an equal flow is already present + # 1 check if an equal flow is already present index = self._check_flow_already_present(flow, database_flows) if index>=0: database_flows[index]["not delete"]=True self.logger.debug("Skipping already present flow %s", str(flow)) continue - #2 look for a non used name + # 2 look for a non used name flow_name=flow["net_id"]+"."+str(name_index) while flow_name in used_names or flow_name in of_flows: name_index += 1 flow_name=flow["net_id"]+"."+str(name_index) used_names.append(flow_name) flow['name'] = flow_name - #3 insert at openflow - result, content = self.OF_connector.new_flow(flow) - if result < 0: - #print self.name, ": Error '%s' at flow insertion" % c, flow - return -1, content - #4 insert at database + # 3 insert at openflow + + try: + self.OF_connector.new_flow(flow) + except openflow_conn.OpenflowconnException as e: + return -1, "Error creating new flow {}".format(str(e)) + + # 4 insert at database try: change_of2db(flow) except FlowBadFormat as e: - #print self.name, ": Error Exception FlowBadFormat '%s'" % str(e), flow + # print self.name, ": Error Exception FlowBadFormat '%s'" % str(e), flow return -1, str(e) self.db_lock.acquire() result, content = self.db.new_row('of_flows', flow) self.db_lock.release() if result < 0: - #print self.name, ": Error '%s' at database insertion" % content, flow + # print self.name, ": Error '%s' at database insertion" % content, flow return -1, content #delete not needed old flows from openflow and from DDBB, @@ -364,19 +354,23 @@ class openflow_thread(threading.Thread): for flow in database_flows: if "not delete" in flow: if flow["name"] not in of_flows: - #not in controller, insert it - result, content = self.OF_connector.new_flow(flow) - if result < 0: - #print self.name, ": Error '%s' at flow insertion" % c, flow - return -1, content + # not in controller, insert it + try: + self.OF_connector.new_flow(flow) + except openflow_conn.OpenflowconnException as e: + return -1, "Error creating new flow {}".format(str(e)) + continue - #Delete flow + # Delete flow if flow["name"] in of_flows: - result, content = self.OF_connector.del_flow(flow['name']) - if result<0: - self.logger.error("cannot delete flow '%s' from OF: %s", flow['name'], content ) - continue #skip deletion from database - #delete from database + try: + self.OF_connector.del_flow(flow['name']) + except openflow_conn.OpenflowconnException as e: + self.logger.error("cannot delete flow '%s' from OF: %s", flow['name'], str(e)) + # skip deletion from database + continue + + # delete from database self.db_lock.acquire() result, content = self.db.delete_row_by_key('of_flows', 'id', flow['id']) self.db_lock.release() @@ -389,16 +383,17 @@ class openflow_thread(threading.Thread): try: if not self.test: self.OF_connector.clear_all_flows() - #remove from database + + # remove from database self.db_lock.acquire() self.db.delete_row_by_key('of_flows', None, None) #this will delete all lines self.db_lock.release() return 0, None - except requests.exceptions.RequestException as e: - #print self.name, ": clear_all_flows Exception:", str(e) - return -1, str(e) + except openflow_conn.OpenflowconnException as e: + return -1, self.logger.error("Error deleting all flows {}", str(e)) + + flow_fields = ('priority', 'vlan', 'ingress_port', 'actions', 'dst_mac', 'src_mac', 'net_id') - flow_fields=('priority', 'vlan', 'ingress_port', 'actions', 'dst_mac', 'src_mac', 'net_id') def _check_flow_already_present(self, new_flow, flow_list): '''check if the same flow is already present in the flow list The flow is repeated if all the fields, apart from name, are equal @@ -427,7 +422,7 @@ class openflow_thread(threading.Thread): nb_ports += 1 if not self.test and str(port['switch_port']) not in self.OF_connector.pp2ofi: error_text= "switch port name '%s' is not valid for the openflow controller" % str(port['switch_port']) - #print self.name, ": ERROR " + error_text + # print self.name, ": ERROR " + error_text return -1, error_text for net_src in nets: @@ -454,7 +449,7 @@ class openflow_thread(threading.Thread): if vlan_in == None and src_port['vlan'] != None: vlan_in = src_port['vlan'] elif vlan_in != None and src_port['vlan'] != None: - #TODO this is something that we can not do. It requires a double VLAN check + #TODO this is something that we cannot do. It requires a double VLAN check #outer VLAN should be src_port['vlan'] and inner VLAN should be vlan_in continue @@ -478,7 +473,7 @@ class openflow_thread(threading.Thread): if vlan_out == None and dst_port['vlan'] != None: vlan_out = dst_port['vlan'] elif vlan_out != None and dst_port['vlan'] != None: - #TODO this is something that we can not do. It requires a double VLAN set + #TODO this is something that we cannot do. It requires a double VLAN set #outer VLAN should be dst_port['vlan'] and inner VLAN should be vlan_out continue #if src_port == dst_port: @@ -535,7 +530,7 @@ class openflow_thread(threading.Thread): final_actions.append( ('vlan', action[0]) ) previous_vlan = action[0] if self.pmp_with_same_vlan and action_number: - return -1, "Can not interconnect different vlan tags in a network when flag 'of_controller_nets_with_same_vlan' is True." + return -1, "Cannot interconnect different vlan tags in a network when flag 'of_controller_nets_with_same_vlan' is True." action_number += 1 final_actions.append( ('out', action[1]) ) flow_broadcast['actions'] = final_actions @@ -573,4 +568,31 @@ class openflow_thread(threading.Thread): else: # add all the rules new_flows2 += flow_list return 0, new_flows2 - + + def set_openflow_controller_status(self, status, error_text=None): + """ + Set openflow controller last operation status in DB + :param status: ofc status ('ACTIVE','INACTIVE','ERROR') + :param error_text: error text + :return: + """ + if self.of_uuid == "Default": + return True + + ofc = {} + ofc['status'] = status + ofc['last_error'] = error_text + self.db_lock.acquire() + result, content = self.db.update_rows('ofcs', ofc, WHERE={'uuid': self.of_uuid}, log=False) + self.db_lock.release() + if result >= 0: + return True + else: + return False + + + + + + +