Openflow controller abstract connector
[osm/openvim.git] / openflow_thread.py
index b797f2d..6b25dab 100644 (file)
@@ -23,7 +23,7 @@
 ##
 
 '''
-This thread interacts with a openflow floodligth controller to create dataplane connections
+This thread interacts with a openflow controller to create dataplane connections
 '''
 
 __author__="Pablo Montes, Alfonso Tierno"
@@ -36,6 +36,11 @@ import time
 import Queue
 import requests
 import logging
+import openflow_conn
+
+OFC_STATUS_ACTIVE = 'ACTIVE'
+OFC_STATUS_INACTIVE = 'INACTIVE'
+OFC_STATUS_ERROR = 'ERROR'
 
 class FlowBadFormat(Exception):
     '''raise when a bad format of flow is found''' 
@@ -84,51 +89,11 @@ def change_db2of(flow):
     flow['actions'] = actions
 
 
-class of_test_connector():
-    '''This is a fake openflow connector for testing.
-        It does nothing and it is used for running openvim without an openflow controller 
-    '''
-    def __init__(self, params):
-        name = params.get("name", "test-ofc")
-        self.name = name
-        self.dpid = params.get("dpid")
-        self.rules= {}
-        self.logger = logging.getLogger('vim.OF.TEST')
-        self.logger.setLevel(getattr(logging, params.get("of_debug", "ERROR")))
-        self.pp2ofi = {}
-
-    def get_of_switches(self):
-        return 0, ()
-
-    def obtain_port_correspondence(self):
-        return 0, ()
-
-    def del_flow(self, flow_name):
-        if flow_name in self.rules:
-            self.logger.debug("del_flow OK")
-            del self.rules[flow_name]
-            return 0, None
-        else:
-            self.logger.warning("del_flow not found")
-            return -1, "flow %s not found"
-
-    def new_flow(self, data):
-        self.rules[ data["name"] ] = data
-        self.logger.debug("new_flow OK")
-        return 0, None
-
-    def get_of_rules(self, translate_of_ports=True):
-        return 0, self.rules
-
-    def clear_all_flows(self):
-        self.logger.debug("clear_all_flows OK")
-        self.rules={}
-        return 0, None
-
-
-
 class openflow_thread(threading.Thread):
-    def __init__(self, of_uuid, OF_connector, db, db_lock, of_test, pmp_with_same_vlan=False, debug='ERROR'):
+    """
+    This thread interacts with a openflow controller to create dataplane connections
+    """
+    def __init__(self, of_uuid, of_connector, db, db_lock, of_test, pmp_with_same_vlan=False, debug='ERROR'):
         threading.Thread.__init__(self)
         self.of_uuid = of_uuid
         self.db = db
@@ -136,10 +101,10 @@ class openflow_thread(threading.Thread):
         self.name = "openflow"
         self.test = of_test
         self.db_lock = db_lock
-        self.OF_connector = OF_connector
+        self.OF_connector = of_connector
         self.logger = logging.getLogger('vim.OF-' + of_uuid)
-        self.logger.setLevel( getattr(logging, debug) )
-        self.logger.name = OF_connector.name + " " + self.OF_connector.dpid
+        self.logger.setLevel(getattr(logging, debug))
+        self.logger.name = of_connector.name + " " + self.OF_connector.dpid
         self.queueLock = threading.Lock()
         self.taskQueue = Queue.Queue(2000)
         
@@ -154,47 +119,57 @@ class openflow_thread(threading.Thread):
 
     def run(self):
         self.logger.debug("Start openflow thread")
+        self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
+
         while True:
-            self.queueLock.acquire()
-            if not self.taskQueue.empty():
-                task = self.taskQueue.get()
-            else:
-                task = None
-            self.queueLock.release()
+            try:
+                self.queueLock.acquire()
+                if not self.taskQueue.empty():
+                    task = self.taskQueue.get()
+                else:
+                    task = None
+                self.queueLock.release()
 
-            if task is None:
-                time.sleep(1)
-                continue        
+                if task is None:
+                    time.sleep(1)
+                    continue
 
-            if task[0] == 'update-net':
-                r,c = self.update_of_flows(task[1])
-                #update database status
-                self.db_lock.acquire()
-                if r<0:
-                    UPDATE={'status':'ERROR', 'last_error': str(c)}
-                    self.logger.error("processing task 'update-net' %s: %s", str(task[1]), c)
+                if task[0] == 'update-net':
+                    r,c = self.update_of_flows(task[1])
+                    # update database status
+                    if r<0:
+                        UPDATE={'status':'ERROR', 'last_error': str(c)}
+                        self.logger.error("processing task 'update-net' %s: %s", str(task[1]), c)
+                        self.set_openflow_controller_status(OFC_STATUS_ERROR, "Error updating net {}".format(task[1]))
+                    else:
+                        UPDATE={'status':'ACTIVE', 'last_error': None}
+                        self.logger.debug("processing task 'update-net' %s: OK", str(task[1]))
+                        self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
+                    self.db_lock.acquire()
+                    self.db.update_rows('nets', UPDATE, WHERE={'uuid': task[1]})
+                    self.db_lock.release()
+
+                elif task[0] == 'clear-all':
+                    r,c = self.clear_all_flows()
+                    if r<0:
+                        self.logger.error("processing task 'clear-all': %s", c)
+                        self.set_openflow_controller_status(OFC_STATUS_ERROR, "Error deleting all flows")
+                    else:
+                        self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
+                        self.logger.debug("processing task 'clear-all': OK")
+                elif task[0] == 'exit':
+                    self.logger.debug("exit from openflow_thread")
+                    self.terminate()
+                    self.set_openflow_controller_status(OFC_STATUS_INACTIVE, "Ofc with thread killed")
+                    return 0
                 else:
-                    UPDATE={'status':'ACTIVE', 'last_error': None}
-                    self.logger.debug("processing task 'update-net' %s: OK", str(task[1]))
-                self.db.update_rows('nets', UPDATE, WHERE={'uuid':task[1]})
-                self.db_lock.release()
+                    self.logger.error("unknown task %s", str(task))
+            except openflow_conn.OpenflowconnException as e:
+                self.set_openflow_controller_status(OFC_STATUS_ERROR, str(e))
 
-            elif task[0] == 'clear-all':
-                r,c = self.clear_all_flows()
-                if r<0:
-                    self.logger.error("processing task 'clear-all': %s", c)
-                else:
-                    self.logger.debug("processing task 'clear-all': OK")
-            elif task[0] == 'exit':
-                self.logger.debug("exit from openflow_thread")
-                self.terminate()
-                return 0
-            else:
-                self.logger.error("unknown task %s", str(task))
-                
     def terminate(self):
         pass
-        #print self.name, ": exit from openflow_thread"
+        # print self.name, ": exit from openflow_thread"
 
     def update_of_flows(self, net_id):
         ports=()
@@ -230,6 +205,7 @@ class openflow_thread(threading.Thread):
                         WHERE={'net_id':net_id, 'admin_state_up':'true', 'status':'ACTIVE'} )
                 self.db_lock.release()
                 if nb_ports < 0:
+
                     #print self.name, ": update_of_flows() ERROR getting ports", ports
                     return -1, "DB error getting ports from net '%s': %s" % (net_id, net_ports)
                 
@@ -253,23 +229,27 @@ class openflow_thread(threading.Thread):
             result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':net_id})
             self.db_lock.release()
             if result < 0:
-                #print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
-                return -1, "DB error getting flows from net '%s': %s" %(net_id, database_net_flows)
+                error_msg = "DB error getting flows from net '{}': {}".format(net_id, database_net_flows)
+                # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
+                return -1, error_msg
             database_flows += database_net_flows
         # Get the name of flows where net_id==NULL that means net deleted (At DB foreign key: On delete set null)
         self.db_lock.acquire()
         result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':None})
         self.db_lock.release()
         if result < 0:
-            #print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
-            return -1, "DB error getting flows from net 'null': %s" %(database_net_flows)
+            error_msg = "DB error getting flows from net 'null': {}".format(database_net_flows)
+            # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
+            return -1, error_msg
         database_flows += database_net_flows
 
-        #Get the existing flows at openflow controller
-        result, of_flows = self.OF_connector.get_of_rules() 
-        if result < 0:
-            #print self.name, ": update_of_flows() ERROR getting flows from controller", of_flows
-            return -1, "OF error getting flows: " + of_flows
+        # Get the existing flows at openflow controller
+        try:
+            of_flows = self.OF_connector.get_of_rules()
+            # print self.name, ": update_of_flows() ERROR getting flows from controller", of_flows
+        except openflow_conn.OpenflowconnException as e:
+            # self.set_openflow_controller_status(OFC_STATUS_ERROR, "OF error {} getting flows".format(str(e)))
+            return -1, "OF error {} getting flows".format(str(e))
 
         if ifaces_nb < 2:
             pass
@@ -334,37 +314,39 @@ class openflow_thread(threading.Thread):
                 continue
             used_names.append(flow['name'])
         name_index=0
-        #insert at database the new flows, change actions to human text
+        # insert at database the new flows, change actions to human text
         for flow in new_flows:
-            #1 check if an equal flow is already present
+            # 1 check if an equal flow is already present
             index = self._check_flow_already_present(flow, database_flows)
             if index>=0:
                 database_flows[index]["not delete"]=True
                 self.logger.debug("Skipping already present flow %s", str(flow))
                 continue
-            #2 look for a non used name
+            # 2 look for a non used name
             flow_name=flow["net_id"]+"."+str(name_index)
             while flow_name in used_names or flow_name in of_flows:         
                 name_index += 1   
                 flow_name=flow["net_id"]+"."+str(name_index)
             used_names.append(flow_name)
             flow['name'] = flow_name
-            #3 insert at openflow
-            result, content = self.OF_connector.new_flow(flow)
-            if result < 0:
-                #print self.name, ": Error '%s' at flow insertion" % c, flow
-                return -1, content
-            #4 insert at database
+            # 3 insert at openflow
+
+            try:
+                self.OF_connector.new_flow(flow)
+            except openflow_conn.OpenflowconnException as e:
+                return -1, "Error creating new flow {}".format(str(e))
+
+            # 4 insert at database
             try:
                 change_of2db(flow)
             except FlowBadFormat as e:
-                #print self.name, ": Error Exception FlowBadFormat '%s'" % str(e), flow
+                # print self.name, ": Error Exception FlowBadFormat '%s'" % str(e), flow
                 return -1, str(e)
             self.db_lock.acquire()
             result, content = self.db.new_row('of_flows', flow)
             self.db_lock.release()
             if result < 0:
-                #print self.name, ": Error '%s' at database insertion" % content, flow
+                # print self.name, ": Error '%s' at database insertion" % content, flow
                 return -1, content
 
         #delete not needed old flows from openflow and from DDBB, 
@@ -372,19 +354,23 @@ class openflow_thread(threading.Thread):
         for flow in database_flows:
             if "not delete" in flow:
                 if flow["name"] not in of_flows:
-                    #not in controller, insert it
-                    result, content = self.OF_connector.new_flow(flow)
-                    if result < 0:
-                        #print self.name, ": Error '%s' at flow insertion" % c, flow
-                        return -1, content
+                    # not in controller, insert it
+                    try:
+                        self.OF_connector.new_flow(flow)
+                    except openflow_conn.OpenflowconnException as e:
+                        return -1, "Error creating new flow {}".format(str(e))
+
                 continue
-            #Delete flow
+            # Delete flow
             if flow["name"] in of_flows:
-                result, content = self.OF_connector.del_flow(flow['name'])
-                if result<0:
-                    self.logger.error("cannot delete flow '%s' from OF: %s", flow['name'], content )
-                    continue #skip deletion from database
-            #delete from database
+                try:
+                    self.OF_connector.del_flow(flow['name'])
+                except openflow_conn.OpenflowconnException as e:
+                    self.logger.error("cannot delete flow '%s' from OF: %s", flow['name'], str(e))
+                    # skip deletion from database
+                    continue
+
+            # delete from database
             self.db_lock.acquire()
             result, content = self.db.delete_row_by_key('of_flows', 'id', flow['id'])
             self.db_lock.release()
@@ -397,16 +383,17 @@ class openflow_thread(threading.Thread):
         try:
             if not self.test:
                 self.OF_connector.clear_all_flows()
-            #remove from database
+
+            # remove from database
             self.db_lock.acquire()
             self.db.delete_row_by_key('of_flows', None, None) #this will delete all lines
             self.db_lock.release()
             return 0, None
-        except requests.exceptions.RequestException as e:
-            #print self.name, ": clear_all_flows Exception:", str(e)
-            return -1, str(e)
+        except openflow_conn.OpenflowconnException as e:
+            return -1, self.logger.error("Error deleting all flows {}", str(e))
+
+    flow_fields = ('priority', 'vlan', 'ingress_port', 'actions', 'dst_mac', 'src_mac', 'net_id')
 
-    flow_fields=('priority', 'vlan', 'ingress_port', 'actions', 'dst_mac', 'src_mac', 'net_id')
     def _check_flow_already_present(self, new_flow, flow_list):
         '''check if the same flow is already present in the flow list
         The flow is repeated if all the fields, apart from name, are equal
@@ -435,7 +422,7 @@ class openflow_thread(threading.Thread):
                 nb_ports += 1
                 if not self.test and str(port['switch_port']) not in self.OF_connector.pp2ofi:
                     error_text= "switch port name '%s' is not valid for the openflow controller" % str(port['switch_port'])
-                    #print self.name, ": ERROR " + error_text
+                    # print self.name, ": ERROR " + error_text
                     return -1, error_text
 
         for net_src in nets:
@@ -581,4 +568,31 @@ class openflow_thread(threading.Thread):
             else:  # add all the rules
                 new_flows2 += flow_list
         return 0, new_flows2
-        
+
+    def set_openflow_controller_status(self, status, error_text=None):
+        """
+        Set openflow controller last operation status in DB
+        :param status: ofc status ('ACTIVE','INACTIVE','ERROR')
+        :param error_text: error text
+        :return:
+        """
+        if self.of_uuid == "Default":
+            return True
+
+        ofc = {}
+        ofc['status'] = status
+        ofc['last_error'] = error_text
+        self.db_lock.acquire()
+        result, content = self.db.update_rows('ofcs', ofc, WHERE={'uuid': self.of_uuid}, log=False)
+        self.db_lock.release()
+        if result >= 0:
+            return True
+        else:
+            return False
+
+
+
+
+
+
+