Added LICENSE file to root folder
[osm/openvim.git] / osm_openvim / openflow_thread.py
index 24fc77f..a26b48e 100644 (file)
@@ -2,7 +2,7 @@
 # -*- coding: utf-8 -*-
 
 ##
-# Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
+# Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U.
 # This file is part of openvim
 # All Rights Reserved.
 #
@@ -93,14 +93,13 @@ class openflow_thread(threading.Thread):
     """
     This thread interacts with a openflow controller to create dataplane connections
     """
-    def __init__(self, of_uuid, of_connector, db, db_lock, of_test, pmp_with_same_vlan=False, logger_name=None,
+    def __init__(self, of_uuid, of_connector, db, of_test, pmp_with_same_vlan=False, logger_name=None,
                  debug=None):
         threading.Thread.__init__(self)
         self.of_uuid = of_uuid
         self.db = db
         self.pmp_with_same_vlan = pmp_with_same_vlan
         self.test = of_test
-        self.db_lock = db_lock
         self.OF_connector = of_connector
         if logger_name:
             self.logger_name = logger_name
@@ -111,7 +110,13 @@ class openflow_thread(threading.Thread):
             self.logger.setLevel(getattr(logging, debug))
         self.queueLock = threading.Lock()
         self.taskQueue = Queue.Queue(2000)
-        
+
+    @staticmethod
+    def _format_error_msg(error_text, max_length=1024):
+        if error_text and len(error_text) >= max_length:
+            return error_text[:max_length//2-3] + " ... " + error_text[-max_length//2+3:]
+        return error_text
+
     def insert_task(self, task, *aditional):
         try:
             self.queueLock.acquire()
@@ -139,19 +144,17 @@ class openflow_thread(threading.Thread):
                     continue
 
                 if task[0] == 'update-net':
-                    r,c = self.update_of_flows(task[1])
+                    r, c = self.update_of_flows(task[1])
                     # update database status
                     if r<0:
-                        UPDATE={'status':'ERROR', 'last_error': str(c)}
+                        UPDATE={'status':'ERROR', 'last_error': self._format_error_msg(str(c), 255)}
                         self.logger.error("processing task 'update-net' %s: %s", str(task[1]), c)
                         self.set_openflow_controller_status(OFC_STATUS_ERROR, "Error updating net {}".format(task[1]))
                     else:
                         UPDATE={'status':'ACTIVE', 'last_error': None}
                         self.logger.debug("processing task 'update-net' %s: OK", str(task[1]))
                         self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
-                    self.db_lock.acquire()
                     self.db.update_rows('nets', UPDATE, WHERE={'uuid': task[1]})
-                    self.db_lock.release()
 
                 elif task[0] == 'clear-all':
                     r,c = self.clear_all_flows()
@@ -180,7 +183,6 @@ class openflow_thread(threading.Thread):
 
     def update_of_flows(self, net_id):
         ports=()
-        self.db_lock.acquire()
         select_= ('type','admin_state_up', 'vlan', 'provider', 'bind_net','bind_type','uuid')
         result, nets = self.db.get_table(FROM='nets', SELECT=select_, WHERE={'uuid':net_id} )
         #get all the networks binding to this
@@ -193,7 +195,6 @@ class openflow_thread(threading.Thread):
             result, nets = self.db.get_table(FROM='nets', SELECT=select_,
                                                 WHERE_OR={'bind_net':bind_id, 'uuid':bind_id} )
             
-        self.db_lock.release()
         if result < 0:
             return -1, "DB error getting net: " + nets
         #elif result==0:
@@ -205,12 +206,10 @@ class openflow_thread(threading.Thread):
             if net['admin_state_up'] == 'false':
                 net['ports'] = ()
             else:
-                self.db_lock.acquire()
                 nb_ports, net_ports = self.db.get_table(
                         FROM='ports',
                         SELECT=('switch_port','vlan','uuid','mac','type','model'),
                         WHERE={'net_id':net_id, 'admin_state_up':'true', 'status':'ACTIVE'} )
-                self.db_lock.release()
                 if nb_ports < 0:
 
                     #print self.name, ": update_of_flows() ERROR getting ports", ports
@@ -232,18 +231,14 @@ class openflow_thread(threading.Thread):
                 ifaces_nb += nb_ports
         
             # Get the name of flows that will be affected by this NET 
-            self.db_lock.acquire()
             result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':net_id})
-            self.db_lock.release()
             if result < 0:
                 error_msg = "DB error getting flows from net '{}': {}".format(net_id, database_net_flows)
                 # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
                 return -1, error_msg
             database_flows += database_net_flows
         # Get the name of flows where net_id==NULL that means net deleted (At DB foreign key: On delete set null)
-        self.db_lock.acquire()
         result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':None})
-        self.db_lock.release()
         if result < 0:
             error_msg = "DB error getting flows from net 'null': {}".format(database_net_flows)
             # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
@@ -349,9 +344,7 @@ class openflow_thread(threading.Thread):
             except FlowBadFormat as e:
                 # print self.name, ": Error Exception FlowBadFormat '%s'" % str(e), flow
                 return -1, str(e)
-            self.db_lock.acquire()
             result, content = self.db.new_row('of_flows', flow)
-            self.db_lock.release()
             if result < 0:
                 # print self.name, ": Error '%s' at database insertion" % content, flow
                 return -1, content
@@ -378,9 +371,7 @@ class openflow_thread(threading.Thread):
                     continue
 
             # delete from database
-            self.db_lock.acquire()
             result, content = self.db.delete_row_by_key('of_flows', 'id', flow['id'])
-            self.db_lock.release()
             if result<0:
                 self.logger.error("cannot delete flow '%s' from DB: %s", flow['name'], content )
         
@@ -392,9 +383,7 @@ class openflow_thread(threading.Thread):
                 self.OF_connector.clear_all_flows()
 
             # remove from database
-            self.db_lock.acquire()
             self.db.delete_row_by_key('of_flows', None, None) #this will delete all lines
-            self.db_lock.release()
             return 0, None
         except openflow_conn.OpenflowconnException as e:
             return -1, self.logger.error("Error deleting all flows {}", str(e))
@@ -588,10 +577,8 @@ class openflow_thread(threading.Thread):
 
         ofc = {}
         ofc['status'] = status
-        ofc['last_error'] = error_text
-        self.db_lock.acquire()
+        ofc['last_error'] = self._format_error_msg(error_text, 255)
         result, content = self.db.update_rows('ofcs', ofc, WHERE={'uuid': self.of_uuid}, log=False)
-        self.db_lock.release()
         if result >= 0:
             return True
         else: