# -*- coding: utf-8 -*-
##
-# Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
+# Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of openvim
# All Rights Reserved.
#
"""
This thread interacts with a openflow controller to create dataplane connections
"""
- def __init__(self, of_uuid, of_connector, db, db_lock, of_test, pmp_with_same_vlan=False, logger_name=None,
+ def __init__(self, of_uuid, of_connector, db, of_test, pmp_with_same_vlan=False, logger_name=None,
debug=None):
threading.Thread.__init__(self)
self.of_uuid = of_uuid
self.db = db
self.pmp_with_same_vlan = pmp_with_same_vlan
self.test = of_test
- self.db_lock = db_lock
self.OF_connector = of_connector
if logger_name:
self.logger_name = logger_name
self.logger.setLevel(getattr(logging, debug))
self.queueLock = threading.Lock()
self.taskQueue = Queue.Queue(2000)
-
+
+ @staticmethod
+ def _format_error_msg(error_text, max_length=1024):
+ if error_text and len(error_text) >= max_length:
+ return error_text[:max_length//2-3] + " ... " + error_text[-max_length//2+3:]
+ return error_text
+
def insert_task(self, task, *aditional):
try:
self.queueLock.acquire()
continue
if task[0] == 'update-net':
- r,c = self.update_of_flows(task[1])
+ r, c = self.update_of_flows(task[1])
# update database status
if r<0:
- UPDATE={'status':'ERROR', 'last_error': str(c)}
+ UPDATE={'status':'ERROR', 'last_error': self._format_error_msg(str(c), 255)}
self.logger.error("processing task 'update-net' %s: %s", str(task[1]), c)
self.set_openflow_controller_status(OFC_STATUS_ERROR, "Error updating net {}".format(task[1]))
else:
UPDATE={'status':'ACTIVE', 'last_error': None}
self.logger.debug("processing task 'update-net' %s: OK", str(task[1]))
self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
- self.db_lock.acquire()
self.db.update_rows('nets', UPDATE, WHERE={'uuid': task[1]})
- self.db_lock.release()
elif task[0] == 'clear-all':
r,c = self.clear_all_flows()
def update_of_flows(self, net_id):
ports=()
- self.db_lock.acquire()
select_= ('type','admin_state_up', 'vlan', 'provider', 'bind_net','bind_type','uuid')
result, nets = self.db.get_table(FROM='nets', SELECT=select_, WHERE={'uuid':net_id} )
#get all the networks binding to this
result, nets = self.db.get_table(FROM='nets', SELECT=select_,
WHERE_OR={'bind_net':bind_id, 'uuid':bind_id} )
- self.db_lock.release()
if result < 0:
return -1, "DB error getting net: " + nets
#elif result==0:
if net['admin_state_up'] == 'false':
net['ports'] = ()
else:
- self.db_lock.acquire()
nb_ports, net_ports = self.db.get_table(
FROM='ports',
SELECT=('switch_port','vlan','uuid','mac','type','model'),
WHERE={'net_id':net_id, 'admin_state_up':'true', 'status':'ACTIVE'} )
- self.db_lock.release()
if nb_ports < 0:
#print self.name, ": update_of_flows() ERROR getting ports", ports
ifaces_nb += nb_ports
# Get the name of flows that will be affected by this NET
- self.db_lock.acquire()
result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':net_id})
- self.db_lock.release()
if result < 0:
error_msg = "DB error getting flows from net '{}': {}".format(net_id, database_net_flows)
# print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
return -1, error_msg
database_flows += database_net_flows
# Get the name of flows where net_id==NULL that means net deleted (At DB foreign key: On delete set null)
- self.db_lock.acquire()
result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':None})
- self.db_lock.release()
if result < 0:
error_msg = "DB error getting flows from net 'null': {}".format(database_net_flows)
# print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
except FlowBadFormat as e:
# print self.name, ": Error Exception FlowBadFormat '%s'" % str(e), flow
return -1, str(e)
- self.db_lock.acquire()
result, content = self.db.new_row('of_flows', flow)
- self.db_lock.release()
if result < 0:
# print self.name, ": Error '%s' at database insertion" % content, flow
return -1, content
continue
# delete from database
- self.db_lock.acquire()
result, content = self.db.delete_row_by_key('of_flows', 'id', flow['id'])
- self.db_lock.release()
if result<0:
self.logger.error("cannot delete flow '%s' from DB: %s", flow['name'], content )
self.OF_connector.clear_all_flows()
# remove from database
- self.db_lock.acquire()
self.db.delete_row_by_key('of_flows', None, None) #this will delete all lines
- self.db_lock.release()
return 0, None
except openflow_conn.OpenflowconnException as e:
return -1, self.logger.error("Error deleting all flows {}", str(e))
ofc = {}
ofc['status'] = status
- ofc['last_error'] = error_text
- self.db_lock.acquire()
+ ofc['last_error'] = self._format_error_msg(error_text, 255)
result, content = self.db.update_rows('ofcs', ofc, WHERE={'uuid': self.of_uuid}, log=False)
- self.db_lock.release()
if result >= 0:
return True
else: