##
'''
-This thread interacts with a openflow floodligth controller to create dataplane connections
+This thread interacts with a openflow controller to create dataplane connections
'''
__author__="Pablo Montes, Alfonso Tierno"
import Queue
import requests
import logging
+import openflow_conn
+
+OFC_STATUS_ACTIVE = 'ACTIVE'
+OFC_STATUS_INACTIVE = 'INACTIVE'
+OFC_STATUS_ERROR = 'ERROR'
class FlowBadFormat(Exception):
'''raise when a bad format of flow is found'''
flow['actions'] = actions
-class of_test_connector():
- '''This is a fake openflow connector for testing.
- It does nothing and it is used for running openvim without an openflow controller
- '''
- def __init__(self, params):
- name = params.get("name", "test-ofc")
- self.name = name
- self.dpid = params.get("dpid")
- self.rules= {}
- self.logger = logging.getLogger('vim.OF.TEST')
- self.logger.setLevel(getattr(logging, params.get("of_debug", "ERROR")))
- self.pp2ofi = {}
-
- def get_of_switches(self):
- return 0, ()
-
- def obtain_port_correspondence(self):
- return 0, ()
-
- def del_flow(self, flow_name):
- if flow_name in self.rules:
- self.logger.debug("del_flow OK")
- del self.rules[flow_name]
- return 0, None
- else:
- self.logger.warning("del_flow not found")
- return -1, "flow %s not found"
-
- def new_flow(self, data):
- self.rules[ data["name"] ] = data
- self.logger.debug("new_flow OK")
- return 0, None
-
- def get_of_rules(self, translate_of_ports=True):
- return 0, self.rules
-
- def clear_all_flows(self):
- self.logger.debug("clear_all_flows OK")
- self.rules={}
- return 0, None
-
-
-
class openflow_thread(threading.Thread):
- def __init__(self, of_uuid, OF_connector, db, db_lock, of_test, pmp_with_same_vlan=False, debug='ERROR'):
+ """
+ This thread interacts with a openflow controller to create dataplane connections
+ """
+ def __init__(self, of_uuid, of_connector, db, db_lock, of_test, pmp_with_same_vlan=False, debug='ERROR'):
threading.Thread.__init__(self)
self.of_uuid = of_uuid
self.db = db
self.name = "openflow"
self.test = of_test
self.db_lock = db_lock
- self.OF_connector = OF_connector
+ self.OF_connector = of_connector
self.logger = logging.getLogger('vim.OF-' + of_uuid)
- self.logger.setLevel( getattr(logging, debug) )
- self.logger.name = OF_connector.name + " " + self.OF_connector.dpid
+ self.logger.setLevel(getattr(logging, debug))
+ self.logger.name = of_connector.name + " " + self.OF_connector.dpid
self.queueLock = threading.Lock()
self.taskQueue = Queue.Queue(2000)
def run(self):
self.logger.debug("Start openflow thread")
+ self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
+
while True:
- self.queueLock.acquire()
- if not self.taskQueue.empty():
- task = self.taskQueue.get()
- else:
- task = None
- self.queueLock.release()
+ try:
+ self.queueLock.acquire()
+ if not self.taskQueue.empty():
+ task = self.taskQueue.get()
+ else:
+ task = None
+ self.queueLock.release()
- if task is None:
- time.sleep(1)
- continue
+ if task is None:
+ time.sleep(1)
+ continue
- if task[0] == 'update-net':
- r,c = self.update_of_flows(task[1])
- #update database status
- self.db_lock.acquire()
- if r<0:
- UPDATE={'status':'ERROR', 'last_error': str(c)}
- self.logger.error("processing task 'update-net' %s: %s", str(task[1]), c)
+ if task[0] == 'update-net':
+ r,c = self.update_of_flows(task[1])
+ # update database status
+ if r<0:
+ UPDATE={'status':'ERROR', 'last_error': str(c)}
+ self.logger.error("processing task 'update-net' %s: %s", str(task[1]), c)
+ self.set_openflow_controller_status(OFC_STATUS_ERROR, "Error updating net {}".format(task[1]))
+ else:
+ UPDATE={'status':'ACTIVE', 'last_error': None}
+ self.logger.debug("processing task 'update-net' %s: OK", str(task[1]))
+ self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
+ self.db_lock.acquire()
+ self.db.update_rows('nets', UPDATE, WHERE={'uuid': task[1]})
+ self.db_lock.release()
+
+ elif task[0] == 'clear-all':
+ r,c = self.clear_all_flows()
+ if r<0:
+ self.logger.error("processing task 'clear-all': %s", c)
+ self.set_openflow_controller_status(OFC_STATUS_ERROR, "Error deleting all flows")
+ else:
+ self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
+ self.logger.debug("processing task 'clear-all': OK")
+ elif task[0] == 'exit':
+ self.logger.debug("exit from openflow_thread")
+ self.terminate()
+ self.set_openflow_controller_status(OFC_STATUS_INACTIVE, "Ofc with thread killed")
+ return 0
else:
- UPDATE={'status':'ACTIVE', 'last_error': None}
- self.logger.debug("processing task 'update-net' %s: OK", str(task[1]))
- self.db.update_rows('nets', UPDATE, WHERE={'uuid':task[1]})
- self.db_lock.release()
+ self.logger.error("unknown task %s", str(task))
+ except openflow_conn.OpenflowconnException as e:
+ self.set_openflow_controller_status(OFC_STATUS_ERROR, str(e))
- elif task[0] == 'clear-all':
- r,c = self.clear_all_flows()
- if r<0:
- self.logger.error("processing task 'clear-all': %s", c)
- else:
- self.logger.debug("processing task 'clear-all': OK")
- elif task[0] == 'exit':
- self.logger.debug("exit from openflow_thread")
- self.terminate()
- return 0
- else:
- self.logger.error("unknown task %s", str(task))
-
def terminate(self):
pass
- #print self.name, ": exit from openflow_thread"
+ # print self.name, ": exit from openflow_thread"
def update_of_flows(self, net_id):
ports=()
WHERE={'net_id':net_id, 'admin_state_up':'true', 'status':'ACTIVE'} )
self.db_lock.release()
if nb_ports < 0:
+
#print self.name, ": update_of_flows() ERROR getting ports", ports
return -1, "DB error getting ports from net '%s': %s" % (net_id, net_ports)
result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':net_id})
self.db_lock.release()
if result < 0:
- #print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
- return -1, "DB error getting flows from net '%s': %s" %(net_id, database_net_flows)
+ error_msg = "DB error getting flows from net '{}': {}".format(net_id, database_net_flows)
+ # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
+ return -1, error_msg
database_flows += database_net_flows
# Get the name of flows where net_id==NULL that means net deleted (At DB foreign key: On delete set null)
self.db_lock.acquire()
result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':None})
self.db_lock.release()
if result < 0:
- #print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
- return -1, "DB error getting flows from net 'null': %s" %(database_net_flows)
+ error_msg = "DB error getting flows from net 'null': {}".format(database_net_flows)
+ # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
+ return -1, error_msg
database_flows += database_net_flows
- #Get the existing flows at openflow controller
- result, of_flows = self.OF_connector.get_of_rules()
- if result < 0:
- #print self.name, ": update_of_flows() ERROR getting flows from controller", of_flows
- return -1, "OF error getting flows: " + of_flows
+ # Get the existing flows at openflow controller
+ try:
+ of_flows = self.OF_connector.get_of_rules()
+ # print self.name, ": update_of_flows() ERROR getting flows from controller", of_flows
+ except openflow_conn.OpenflowconnException as e:
+ # self.set_openflow_controller_status(OFC_STATUS_ERROR, "OF error {} getting flows".format(str(e)))
+ return -1, "OF error {} getting flows".format(str(e))
if ifaces_nb < 2:
pass
continue
used_names.append(flow['name'])
name_index=0
- #insert at database the new flows, change actions to human text
+ # insert at database the new flows, change actions to human text
for flow in new_flows:
- #1 check if an equal flow is already present
+ # 1 check if an equal flow is already present
index = self._check_flow_already_present(flow, database_flows)
if index>=0:
database_flows[index]["not delete"]=True
self.logger.debug("Skipping already present flow %s", str(flow))
continue
- #2 look for a non used name
+ # 2 look for a non used name
flow_name=flow["net_id"]+"."+str(name_index)
while flow_name in used_names or flow_name in of_flows:
name_index += 1
flow_name=flow["net_id"]+"."+str(name_index)
used_names.append(flow_name)
flow['name'] = flow_name
- #3 insert at openflow
- result, content = self.OF_connector.new_flow(flow)
- if result < 0:
- #print self.name, ": Error '%s' at flow insertion" % c, flow
- return -1, content
- #4 insert at database
+ # 3 insert at openflow
+
+ try:
+ self.OF_connector.new_flow(flow)
+ except openflow_conn.OpenflowconnException as e:
+ return -1, "Error creating new flow {}".format(str(e))
+
+ # 4 insert at database
try:
change_of2db(flow)
except FlowBadFormat as e:
- #print self.name, ": Error Exception FlowBadFormat '%s'" % str(e), flow
+ # print self.name, ": Error Exception FlowBadFormat '%s'" % str(e), flow
return -1, str(e)
self.db_lock.acquire()
result, content = self.db.new_row('of_flows', flow)
self.db_lock.release()
if result < 0:
- #print self.name, ": Error '%s' at database insertion" % content, flow
+ # print self.name, ": Error '%s' at database insertion" % content, flow
return -1, content
#delete not needed old flows from openflow and from DDBB,
for flow in database_flows:
if "not delete" in flow:
if flow["name"] not in of_flows:
- #not in controller, insert it
- result, content = self.OF_connector.new_flow(flow)
- if result < 0:
- #print self.name, ": Error '%s' at flow insertion" % c, flow
- return -1, content
+ # not in controller, insert it
+ try:
+ self.OF_connector.new_flow(flow)
+ except openflow_conn.OpenflowconnException as e:
+ return -1, "Error creating new flow {}".format(str(e))
+
continue
- #Delete flow
+ # Delete flow
if flow["name"] in of_flows:
- result, content = self.OF_connector.del_flow(flow['name'])
- if result<0:
- self.logger.error("cannot delete flow '%s' from OF: %s", flow['name'], content )
- continue #skip deletion from database
- #delete from database
+ try:
+ self.OF_connector.del_flow(flow['name'])
+ except openflow_conn.OpenflowconnException as e:
+ self.logger.error("cannot delete flow '%s' from OF: %s", flow['name'], str(e))
+ # skip deletion from database
+ continue
+
+ # delete from database
self.db_lock.acquire()
result, content = self.db.delete_row_by_key('of_flows', 'id', flow['id'])
self.db_lock.release()
try:
if not self.test:
self.OF_connector.clear_all_flows()
- #remove from database
+
+ # remove from database
self.db_lock.acquire()
self.db.delete_row_by_key('of_flows', None, None) #this will delete all lines
self.db_lock.release()
return 0, None
- except requests.exceptions.RequestException as e:
- #print self.name, ": clear_all_flows Exception:", str(e)
- return -1, str(e)
+ except openflow_conn.OpenflowconnException as e:
+ return -1, self.logger.error("Error deleting all flows {}", str(e))
+
+ flow_fields = ('priority', 'vlan', 'ingress_port', 'actions', 'dst_mac', 'src_mac', 'net_id')
- flow_fields=('priority', 'vlan', 'ingress_port', 'actions', 'dst_mac', 'src_mac', 'net_id')
def _check_flow_already_present(self, new_flow, flow_list):
'''check if the same flow is already present in the flow list
The flow is repeated if all the fields, apart from name, are equal
nb_ports += 1
if not self.test and str(port['switch_port']) not in self.OF_connector.pp2ofi:
error_text= "switch port name '%s' is not valid for the openflow controller" % str(port['switch_port'])
- #print self.name, ": ERROR " + error_text
+ # print self.name, ": ERROR " + error_text
return -1, error_text
for net_src in nets:
else: # add all the rules
new_flows2 += flow_list
return 0, new_flows2
-
+
+ def set_openflow_controller_status(self, status, error_text=None):
+ """
+ Set openflow controller last operation status in DB
+ :param status: ofc status ('ACTIVE','INACTIVE','ERROR')
+ :param error_text: error text
+ :return:
+ """
+ if self.of_uuid == "Default":
+ return True
+
+ ofc = {}
+ ofc['status'] = status
+ ofc['last_error'] = error_text
+ self.db_lock.acquire()
+ result, content = self.db.update_rows('ofcs', ofc, WHERE={'uuid': self.of_uuid}, log=False)
+ self.db_lock.release()
+ if result >= 0:
+ return True
+ else:
+ return False
+
+
+
+
+
+
+