X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_openvim%2Fopenflow_thread.py;h=a26b48e4723315704aa43e6a106ce70ee775cb63;hb=9cead2a0262deabadc900d580ef312fbc468efc0;hp=2f39fabb68af06a3d4fbaf1966e440ce238148dd;hpb=f135eff232fe844439c3f097734693ab4320460e;p=osm%2Fopenvim.git diff --git a/osm_openvim/openflow_thread.py b/osm_openvim/openflow_thread.py index 2f39fab..a26b48e 100644 --- a/osm_openvim/openflow_thread.py +++ b/osm_openvim/openflow_thread.py @@ -2,7 +2,7 @@ # -*- coding: utf-8 -*- ## -# Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U. +# Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U. # This file is part of openvim # All Rights Reserved. # @@ -93,21 +93,30 @@ class openflow_thread(threading.Thread): """ This thread interacts with a openflow controller to create dataplane connections """ - def __init__(self, of_uuid, of_connector, db, db_lock, of_test, pmp_with_same_vlan=False, debug='ERROR'): + def __init__(self, of_uuid, of_connector, db, of_test, pmp_with_same_vlan=False, logger_name=None, + debug=None): threading.Thread.__init__(self) self.of_uuid = of_uuid self.db = db self.pmp_with_same_vlan = pmp_with_same_vlan - self.name = "openflow" self.test = of_test - self.db_lock = db_lock self.OF_connector = of_connector - self.logger = logging.getLogger('vim.OF-' + of_uuid) - self.logger.setLevel(getattr(logging, debug)) - self.logger.name = of_connector.name + " " + self.OF_connector.dpid + if logger_name: + self.logger_name = logger_name + else: + self.logger_name = "openvim.ofc." + of_uuid + self.logger = logging.getLogger(self.logger_name) + if debug: + self.logger.setLevel(getattr(logging, debug)) self.queueLock = threading.Lock() self.taskQueue = Queue.Queue(2000) - + + @staticmethod + def _format_error_msg(error_text, max_length=1024): + if error_text and len(error_text) >= max_length: + return error_text[:max_length//2-3] + " ... " + error_text[-max_length//2+3:] + return error_text + def insert_task(self, task, *aditional): try: self.queueLock.acquire() @@ -115,7 +124,7 @@ class openflow_thread(threading.Thread): self.queueLock.release() return 1, None except Queue.Full: - return -1, "timeout inserting a task over openflow thread " + self.name + return -1, "timeout inserting a task over openflow thread " + self.of_uuid def run(self): self.logger.debug("Start openflow thread") @@ -135,19 +144,17 @@ class openflow_thread(threading.Thread): continue if task[0] == 'update-net': - r,c = self.update_of_flows(task[1]) + r, c = self.update_of_flows(task[1]) # update database status if r<0: - UPDATE={'status':'ERROR', 'last_error': str(c)} + UPDATE={'status':'ERROR', 'last_error': self._format_error_msg(str(c), 255)} self.logger.error("processing task 'update-net' %s: %s", str(task[1]), c) self.set_openflow_controller_status(OFC_STATUS_ERROR, "Error updating net {}".format(task[1])) else: UPDATE={'status':'ACTIVE', 'last_error': None} self.logger.debug("processing task 'update-net' %s: OK", str(task[1])) self.set_openflow_controller_status(OFC_STATUS_ACTIVE) - self.db_lock.acquire() self.db.update_rows('nets', UPDATE, WHERE={'uuid': task[1]}) - self.db_lock.release() elif task[0] == 'clear-all': r,c = self.clear_all_flows() @@ -176,7 +183,6 @@ class openflow_thread(threading.Thread): def update_of_flows(self, net_id): ports=() - self.db_lock.acquire() select_= ('type','admin_state_up', 'vlan', 'provider', 'bind_net','bind_type','uuid') result, nets = self.db.get_table(FROM='nets', SELECT=select_, WHERE={'uuid':net_id} ) #get all the networks binding to this @@ -189,7 +195,6 @@ class openflow_thread(threading.Thread): result, nets = self.db.get_table(FROM='nets', SELECT=select_, WHERE_OR={'bind_net':bind_id, 'uuid':bind_id} ) - self.db_lock.release() if result < 0: return -1, "DB error getting net: " + nets #elif result==0: @@ -201,12 +206,10 @@ class openflow_thread(threading.Thread): if net['admin_state_up'] == 'false': net['ports'] = () else: - self.db_lock.acquire() nb_ports, net_ports = self.db.get_table( FROM='ports', SELECT=('switch_port','vlan','uuid','mac','type','model'), WHERE={'net_id':net_id, 'admin_state_up':'true', 'status':'ACTIVE'} ) - self.db_lock.release() if nb_ports < 0: #print self.name, ": update_of_flows() ERROR getting ports", ports @@ -228,18 +231,14 @@ class openflow_thread(threading.Thread): ifaces_nb += nb_ports # Get the name of flows that will be affected by this NET - self.db_lock.acquire() result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':net_id}) - self.db_lock.release() if result < 0: error_msg = "DB error getting flows from net '{}': {}".format(net_id, database_net_flows) # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows return -1, error_msg database_flows += database_net_flows # Get the name of flows where net_id==NULL that means net deleted (At DB foreign key: On delete set null) - self.db_lock.acquire() result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':None}) - self.db_lock.release() if result < 0: error_msg = "DB error getting flows from net 'null': {}".format(database_net_flows) # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows @@ -345,9 +344,7 @@ class openflow_thread(threading.Thread): except FlowBadFormat as e: # print self.name, ": Error Exception FlowBadFormat '%s'" % str(e), flow return -1, str(e) - self.db_lock.acquire() result, content = self.db.new_row('of_flows', flow) - self.db_lock.release() if result < 0: # print self.name, ": Error '%s' at database insertion" % content, flow return -1, content @@ -374,9 +371,7 @@ class openflow_thread(threading.Thread): continue # delete from database - self.db_lock.acquire() result, content = self.db.delete_row_by_key('of_flows', 'id', flow['id']) - self.db_lock.release() if result<0: self.logger.error("cannot delete flow '%s' from DB: %s", flow['name'], content ) @@ -388,9 +383,7 @@ class openflow_thread(threading.Thread): self.OF_connector.clear_all_flows() # remove from database - self.db_lock.acquire() self.db.delete_row_by_key('of_flows', None, None) #this will delete all lines - self.db_lock.release() return 0, None except openflow_conn.OpenflowconnException as e: return -1, self.logger.error("Error deleting all flows {}", str(e)) @@ -584,10 +577,8 @@ class openflow_thread(threading.Thread): ofc = {} ofc['status'] = status - ofc['last_error'] = error_text - self.db_lock.acquire() + ofc['last_error'] = self._format_error_msg(error_text, 255) result, content = self.db.update_rows('ofcs', ofc, WHERE={'uuid': self.of_uuid}, log=False) - self.db_lock.release() if result >= 0: return True else: