# -*- coding: utf-8 -*-

# Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
# This file is part of openvim
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
'''
This thread interacts with an openflow controller to create dataplane connections
'''
__author__ = "Pablo Montes, Alfonso Tierno"
__date__ = "17-jul-2015"

# Status values written for an openflow controller by the openflow thread
OFC_STATUS_ACTIVE = 'ACTIVE'
OFC_STATUS_INACTIVE = 'INACTIVE'
OFC_STATUS_ERROR = 'ERROR'
class FlowBadFormat(Exception):
    '''Raised when a flow dictionary does not have the expected format.'''
def change_of2db(flow):
    '''Change, in place, the 'flow' dictionary from openflow format to database format.

    The only field that changes is flow['actions'], which goes from a list of
    (key, value) tuples to a comma separated string:
        from [(A,B),(C,D),..] to "A=B,C=D"

    :param flow: dictionary describing the flow; it must contain the key 'actions'
    :return: None. 'flow' is modified in place
    :raise FlowBadFormat: if 'flow' is not a dictionary containing 'actions', or if
        'actions' cannot be rendered as text. flow['actions'] is left untouched then
    '''
    if not isinstance(flow, dict) or "actions" not in flow:
        raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
    try:
        action_str_list = [action[0] + "=" + str(action[1]) for action in flow['actions']]
    except Exception:
        # Any failure while walking the actions (not iterable, item too short, key that
        # is not text...) is reported as a format problem. 'Exception' rather than a
        # catch-all clause so KeyboardInterrupt/SystemExit are not swallowed
        raise FlowBadFormat("Unexpected format at 'actions'")
    # Assign only once the whole list has been converted
    flow['actions'] = ",".join(action_str_list)
def change_db2of(flow):
    '''Change, in place, the 'flow' dictionary from database format to openflow format.

    The only field that changes is flow['actions'], which goes from a comma separated
    string to a list of (key, value) tuples:
        from "A=B,C=D,..." to [(A,B),(C,D),..]
    Accepted keys, compared ignoring case and surrounding blanks:
        vlan: the value 'none' or 'strip' becomes None, anything else must be an integer
        out:  the value is kept as text, exactly as written

    :param flow: dictionary describing the flow; flow['actions'] must be a string
    :return: None. 'flow' is modified in place
    :raise FlowBadFormat: on bad input parameters, on an item that is not 'key=value',
        on a non integer vlan or on an unknown key. flow['actions'] is left untouched then
    '''
    if not isinstance(flow, dict) or "actions" not in flow or not isinstance(flow["actions"], str):
        raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
    actions = []
    for action_item in flow['actions'].split(","):
        action_tuple = action_item.split("=")
        if len(action_tuple) != 2:
            raise FlowBadFormat("Expected key=value format at 'actions'")
        key = action_tuple[0].strip().lower()
        value = action_tuple[1]
        if key == "vlan":
            if value.strip().lower() in ("none", "strip"):
                actions.append(("vlan", None))
            else:
                try:
                    actions.append(("vlan", int(value)))
                except ValueError:
                    raise FlowBadFormat("Expected integer after vlan= at 'actions'")
        elif key == "out":
            actions.append(("out", str(value)))
        else:
            raise FlowBadFormat("Unexpected '%s' at 'actions'" % action_tuple[0])
    flow['actions'] = actions
92 class openflow_thread(threading
.Thread
):
94 This thread interacts with a openflow controller to create dataplane connections
def __init__(self, of_uuid, of_connector, db, db_lock, of_test, pmp_with_same_vlan=False, logger_name=None,
             debug='ERROR'):
    """
    Prepare the thread; nothing is contacted here.
    :param of_uuid: identifier of the openflow controller; also used to build the default logger name
    :param of_connector: connector used to talk with the openflow controller
    :param db: database object, always used while holding db_lock
    :param db_lock: lock (acquire/release) protecting the accesses to db
    :param of_test: stored as self.test; when true the switch port names are not validated
        against the connector
    :param pmp_with_same_vlan: stored as is; consulted when computing flows for nets with more than two ports
    :param logger_name: name of the logger to use. By default "openvim.ofc." + of_uuid
    :param debug: name of a logging level ('DEBUG', 'ERROR', ...). Falsy leaves the logger level untouched
        NOTE(review): the default value is not visible in the reviewed text; 'ERROR' is assumed - confirm
    """
    threading.Thread.__init__(self)
    self.of_uuid = of_uuid
    self.db = db
    self.pmp_with_same_vlan = pmp_with_same_vlan
    self.test = of_test
    self.db_lock = db_lock
    self.OF_connector = of_connector
    self.logger_name = logger_name if logger_name else "openvim.ofc." + of_uuid
    self.logger = logging.getLogger(self.logger_name)
    if debug:
        self.logger.setLevel(getattr(logging, debug))
    self.queueLock = threading.Lock()
    # bounded queue of pending tasks, see insert_task
    self.taskQueue = Queue.Queue(2000)
def insert_task(self, task, *aditional):
    """
    Append a task to the queue consumed by this thread.
    :param task: task name; stored as first element of the queued tuple
    :param aditional: extra positional values, queued after the task name
    :return: (-1, error_text) if the task cannot be queued within 5 seconds.
        On success a non negative code and None
        NOTE(review): the success value is not visible in the reviewed text; (1, None) is assumed - confirm
    """
    try:
        # 'with' guarantees the lock is released also when put() times out; otherwise
        # the consumer would never be able to take the lock again
        with self.queueLock:
            self.taskQueue.put((task,) + aditional, timeout=5)
        return 1, None
    except Queue.Full:
        return -1, "timeout inserting a task over openflow thread " + self.of_uuid
def run(self):
    """
    Thread main loop. Marks the controller as ACTIVE and then attends the queued tasks:
        'update-net', net_uuid: recompute the flows of the net and store the result
            ('ACTIVE' or 'ERROR' plus 'last_error') at the 'nets' table
        'clear-all': remove all the flows
        'exit': mark the controller as INACTIVE and finish the thread
    Any other task is logged as unknown. Exceptions raised while attending a task are
    logged and the loop goes on with the next task.
    """
    import time  # local import: the import block of the module is not part of the reviewed text

    self.logger.debug("Start openflow thread")
    self.set_openflow_controller_status(OFC_STATUS_ACTIVE)

    while True:
        try:
            with self.queueLock:
                task = None if self.taskQueue.empty() else self.taskQueue.get()

            if task is None:
                # nothing queued
                # NOTE(review): the waiting time is not visible in the reviewed text; 1 second assumed - confirm
                time.sleep(1)
                continue

            if task[0] == 'update-net':
                r, c = self.update_of_flows(task[1])
                # update database status
                if r < 0:
                    UPDATE = {'status': 'ERROR', 'last_error': str(c)}
                    self.logger.error("processing task 'update-net' %s: %s", str(task[1]), c)
                    self.set_openflow_controller_status(OFC_STATUS_ERROR, "Error updating net {}".format(task[1]))
                else:
                    UPDATE = {'status': 'ACTIVE', 'last_error': None}
                    self.logger.debug("processing task 'update-net' %s: OK", str(task[1]))
                    self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
                self.db_lock.acquire()
                try:
                    self.db.update_rows('nets', UPDATE, WHERE={'uuid': task[1]})
                finally:
                    # released also on a database exception, so later tasks do not block on the lock
                    self.db_lock.release()

            elif task[0] == 'clear-all':
                r, c = self.clear_all_flows()
                if r < 0:
                    self.logger.error("processing task 'clear-all': %s", c)
                    self.set_openflow_controller_status(OFC_STATUS_ERROR, "Error deleting all flows")
                else:
                    self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
                    self.logger.debug("processing task 'clear-all': OK")
            elif task[0] == 'exit':
                self.logger.debug("exit from openflow_thread")
                # NOTE(review): the statement at this point is not visible in the reviewed text;
                # a call to the terminate() hook of this class is assumed - confirm
                self.terminate()
                self.set_openflow_controller_status(OFC_STATUS_INACTIVE, "Ofc with thread killed")
                return 0
            else:
                self.logger.error("unknown task %s", str(task))
        except openflow_conn.OpenflowconnException as e:
            self.logger.error("OpenflowconnException: " + str(e))
            self.set_openflow_controller_status(OFC_STATUS_ERROR, str(e))
        except Exception as e:
            self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
179 # print self.name, ": exit from openflow_thread"
def update_of_flows(self, net_id):
    """
    Recompute the openflow rules of a net, and of the nets bound with it, and apply the
    difference at the openflow controller and at the 'of_flows' database table.
    Steps:
        1 read the net and the nets related by 'bind_net', with their active ports
        2 read the flows stored for these nets and the ones with net_id NULL (deleted nets)
        3 read the rules present at the controller
        4 compute the needed flows (_compute_net_flows)
        5 insert the new flows at controller and database
        6 re-insert the stored flows missing at the controller, delete the ones not needed
    :param net_id: uuid of the net
    :return: (0, 'Success') if ok; (negative, error_text) on the first database, controller
        or validation error. The work already done is not rolled back
    """
    def locked_db(db_method, *args, **kwargs):
        # Run one database call holding db_lock; the lock is released also if the call raises
        self.db_lock.acquire()
        try:
            return db_method(*args, **kwargs)
        finally:
            self.db_lock.release()

    # NOTE(review): no visible statement fills 'ports', so the vlan-tag consistency check
    # further down iterates over nothing - confirm whether it should walk the ports read below
    ports = ()
    select_ = ('type', 'admin_state_up', 'vlan', 'provider', 'bind_net', 'bind_type', 'uuid')
    self.db_lock.acquire()
    try:
        result, nets = self.db.get_table(FROM='nets', SELECT=select_, WHERE={'uuid': net_id})
        # get all the networks binding to this
        if result > 0:
            bind_id = nets[0]['bind_net'] if nets[0]['bind_net'] else net_id
            # get our net and all bind_nets
            result, nets = self.db.get_table(FROM='nets', SELECT=select_,
                                             WHERE_OR={'bind_net': bind_id, 'uuid': bind_id})
    finally:
        self.db_lock.release()
    if result < 0:
        # str(): do not fail building the message if the error content is not text
        return -1, "DB error getting net: " + str(nets)
    # result == 0: net has been deleted; the loop below does nothing and only the flows
    # with net_id NULL are processed

    ifaces_nb = 0
    database_flows = []
    for net in nets:
        net_id = net["uuid"]
        if net['admin_state_up'] == 'false':
            net['ports'] = ()
        else:
            nb_ports, net_ports = locked_db(
                self.db.get_table,
                FROM='ports',
                SELECT=('switch_port', 'vlan', 'uuid', 'mac', 'type', 'model'),
                WHERE={'net_id': net_id, 'admin_state_up': 'true', 'status': 'ACTIVE'})
            if nb_ports < 0:
                return -1, "DB error getting ports from net '%s': %s" % (net_id, net_ports)

            # add the binding as an external port
            # 'provider' looks like "openflow:<switch_port>" or "openflow:<switch_port>:vlan"
            if net['provider'] and net['provider'][:9] == "openflow:":
                external_port = {"type": "external", "mac": None}
                external_port['uuid'] = net_id + ".1"  # fake uuid
                if net['provider'][-5:] == ":vlan":
                    external_port["vlan"] = net["vlan"]
                    external_port["switch_port"] = net['provider'][9:-5]
                else:
                    external_port["vlan"] = None
                    external_port["switch_port"] = net['provider'][9:]
                net_ports = net_ports + (external_port,)
                nb_ports += 1
            net['ports'] = net_ports
            ifaces_nb += nb_ports

        # Get the name of flows that will be affected by this NET
        result, database_net_flows = locked_db(self.db.get_table, FROM='of_flows', WHERE={'net_id': net_id})
        if result < 0:
            error_msg = "DB error getting flows from net '{}': {}".format(net_id, database_net_flows)
            return -1, error_msg
        database_flows += database_net_flows
    # Get the name of flows where net_id==NULL that means net deleted (At DB foreign key: On delete set null)
    result, database_net_flows = locked_db(self.db.get_table, FROM='of_flows', WHERE={'net_id': None})
    if result < 0:
        error_msg = "DB error getting flows from net 'null': {}".format(database_net_flows)
        return -1, error_msg
    database_flows += database_net_flows

    # Get the existing flows at openflow controller
    try:
        of_flows = self.OF_connector.get_of_rules()
    except openflow_conn.OpenflowconnException as e:
        return -1, "OF error {} getting flows".format(str(e))

    # 'net' is the last net of the loop above; with no nets ifaces_nb is 0 and it is not read
    if ifaces_nb < 2:
        pass
    elif net['type'] == 'ptp':
        if ifaces_nb > 2:
            return -1, "'ptp' type network cannot connect %d interfaces, only 2" % ifaces_nb
    elif net['type'] == 'data':
        if ifaces_nb > 2 and self.pmp_with_same_vlan:
            # check all ports are VLAN (tagged) or none
            vlan_tag = None
            for port in ports:
                if port["type"] == "external":
                    if port["vlan"] is not None:
                        if port["vlan"] != net["vlan"]:
                            text = "External port vlan-tag and net vlan-tag must be the same when flag 'of_controller_nets_with_same_vlan' is True"
                            return -1, text
                        if vlan_tag is None:
                            vlan_tag = True
                        elif vlan_tag == False:
                            text = "Passthrough and external port vlan-tagged cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
                            return -1, text
                    else:
                        if vlan_tag is None:
                            vlan_tag = False
                        elif vlan_tag == True:
                            text = "SR-IOV and external port not vlan-tagged cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
                            return -1, text
                elif port["model"] == "PF" or port["model"] == "VFnotShared":
                    if vlan_tag is None:
                        vlan_tag = False
                    elif vlan_tag == True:
                        text = "Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
                        return -1, text
                elif port["model"] == "VF":
                    if vlan_tag is None:
                        vlan_tag = True
                    elif vlan_tag == False:
                        text = "Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
                        return -1, text
    else:
        return -1, 'Only ptp and data networks are supported for openflow'

    # calculate new flows to be inserted
    result, new_flows = self._compute_net_flows(nets)
    if result < 0:
        return result, new_flows

    # modify database flows format and get the used names
    used_names = []
    for flow in database_flows:
        try:
            change_db2of(flow)
        except FlowBadFormat as e:
            self.logger.error("Exception FlowBadFormat: '%s', flow: '%s'", str(e), str(flow))
            continue
        used_names.append(flow['name'])
    name_index = 0
    # insert at database the new flows, change actions to human text
    for flow in new_flows:
        # 1 check if an equal flow is already present
        index = self._check_flow_already_present(flow, database_flows)
        if index >= 0:
            database_flows[index]["not delete"] = True
            self.logger.debug("Skipping already present flow %s", str(flow))
            continue
        # 2 look for a non used name
        flow_name = flow["net_id"] + "." + str(name_index)
        while flow_name in used_names or flow_name in of_flows:
            name_index += 1
            flow_name = flow["net_id"] + "." + str(name_index)
        used_names.append(flow_name)
        flow['name'] = flow_name
        # 3 insert at openflow
        try:
            self.OF_connector.new_flow(flow)
        except openflow_conn.OpenflowconnException as e:
            return -1, "Error creating new flow {}".format(str(e))

        # 4 insert at database
        try:
            change_of2db(flow)
        except FlowBadFormat as e:
            return -1, str(e)
        result, content = locked_db(self.db.new_row, 'of_flows', flow)
        if result < 0:
            return -1, content

    # delete not needed old flows from openflow and from DDBB,
    # check that the needed flows at DDBB are present in controller or insert them otherwise
    for flow in database_flows:
        if "not delete" in flow:
            if flow["name"] not in of_flows:
                # not in controller, insert it
                try:
                    self.OF_connector.new_flow(flow)
                except openflow_conn.OpenflowconnException as e:
                    return -1, "Error creating new flow {}".format(str(e))
            continue
        # Delete flow
        if flow["name"] in of_flows:
            try:
                self.OF_connector.del_flow(flow['name'])
            except openflow_conn.OpenflowconnException as e:
                self.logger.error("cannot delete flow '%s' from OF: %s", flow['name'], str(e))
                # skip deletion from database
                continue

        # delete from database
        result, content = locked_db(self.db.delete_row_by_key, 'of_flows', 'id', flow['id'])
        if result < 0:
            self.logger.error("cannot delete flow '%s' from DB: %s", flow['name'], content)

    return 0, 'Success'
def clear_all_flows(self):
    """
    Remove all the flows from the openflow controller (skipped when self.test is true)
    and all the rows of the 'of_flows' database table.
    :return: (-1, error_text) if the controller reports an error; in that case the
        database is not touched. On success a non negative code and None
        NOTE(review): the success value is not visible in the reviewed text; (0, None) is assumed - confirm
    """
    try:
        if not self.test:
            self.OF_connector.clear_all_flows()

        # remove from database
        self.db_lock.acquire()
        try:
            self.db.delete_row_by_key('of_flows', None, None)  # this will delete all lines
        finally:
            # released also if the database call raises
            self.db_lock.release()
        return 0, None
    except openflow_conn.OpenflowconnException as e:
        # logger.error() returns None, so the text is built first and both logged and
        # returned; this gives the caller a real error description
        error_text = "Error deleting all flows {}".format(str(e))
        self.logger.error(error_text)
        return -1, error_text
# Fields compared by _check_flow_already_present to decide if two flows are the same
# NOTE(review): the flows built by _compute_net_flows store the tag under 'vlan_id', not
# 'vlan', so the tag does not seem to take part in the comparison - confirm the intended key
flow_fields = ('priority', 'vlan', 'ingress_port', 'actions', 'dst_mac', 'src_mac', 'net_id')
def _check_flow_already_present(self, new_flow, flow_list):
    '''Check if the same flow is already present in the flow list.
    The flow is repeated if all the fields listed at self.flow_fields are equal. The name
    is not compared, and a field missing in both flows counts as equal
    :param new_flow: flow dictionary to look for
    :param flow_list: list of flow dictionaries where to look
    :return: the index of the first matching flow, -1 if not match'''
    for index, flow in enumerate(flow_list):
        if all(flow.get(f) == new_flow.get(f) for f in self.flow_fields):
            return index
    return -1
def _compute_net_flows(self, nets):
    """
    Compute the openflow rules needed to connect the ports of the nets.
    For every pair of related nets (the same net, or two nets related by 'bind_net') and
    every pair (source port, destination port) an unicast flow is generated. With more
    than two ports in total a broadcast flow per source port and vlan is generated too.
    At the end, the flows with the same input port and vlan and the same actions are
    unified in a single flow without destination mac.
    :param nets: list of net dictionaries with 'uuid', 'bind_net', 'bind_type' and 'ports'.
        Each port is a dictionary with 'uuid', 'switch_port', 'vlan' and 'mac'
    :return: (0, list of flow dictionaries) if ok; (-1, error_text) if a switch port is
        not known by the controller or different vlan tags must be interconnected while
        self.pmp_with_same_vlan is set
    """
    new_flows = []
    new_broadcast_flows = {}
    nb_ports = 0

    # Check switch_port information is right
    self.logger.debug("_compute_net_flows nets: %s", str(nets))
    for net in nets:
        for port in net['ports']:
            nb_ports += 1
            if not self.test and str(port['switch_port']) not in self.OF_connector.pp2ofi:
                error_text = "switch port name '%s' is not valid for the openflow controller" % str(port['switch_port'])
                return -1, error_text

    for net_src in nets:
        net_id = net_src["uuid"]
        for net_dst in nets:
            vlan_net_in = None
            vlan_net_out = None
            # NOTE(review): the priority values are not visible in the reviewed text;
            # 1000 (same net) and 1100 (bound nets) are assumed - confirm
            if net_src == net_dst:
                # intra net rules
                priority = 1000
            elif net_src['bind_net'] == net_dst['uuid']:
                if net_src.get('bind_type') and net_src['bind_type'][0:5] == "vlan:":
                    vlan_net_out = int(net_src['bind_type'][5:])
                priority = 1100
            elif net_dst['bind_net'] == net_src['uuid']:
                if net_dst.get('bind_type') and net_dst['bind_type'][0:5] == "vlan:":
                    vlan_net_in = int(net_dst['bind_type'][5:])
                priority = 1100
            else:
                # nets not binding
                continue
            for src_port in net_src['ports']:
                vlan_in = vlan_net_in
                if vlan_in is None and src_port['vlan'] is not None:
                    vlan_in = src_port['vlan']
                elif vlan_in is not None and src_port['vlan'] is not None:
                    # TODO this is something that we cannot do. It requires a double VLAN check
                    # outer VLAN should be src_port['vlan'] and inner VLAN should be vlan_in
                    continue

                # BROADCAST: one flow per source port and input vlan, shared among the destination nets
                broadcast_key = src_port['uuid'] + "." + str(vlan_in)
                if broadcast_key in new_broadcast_flows:
                    flow_broadcast = new_broadcast_flows[broadcast_key]
                else:
                    flow_broadcast = {'priority': priority,
                                      'net_id': net_id,
                                      'dst_mac': 'ff:ff:ff:ff:ff:ff',
                                      "ingress_port": str(src_port['switch_port']),
                                      'actions': []
                                      }
                    new_broadcast_flows[broadcast_key] = flow_broadcast
                    if vlan_in is not None:
                        flow_broadcast['vlan_id'] = str(vlan_in)

                for dst_port in net_dst['ports']:
                    vlan_out = vlan_net_out
                    if vlan_out is None and dst_port['vlan'] is not None:
                        vlan_out = dst_port['vlan']
                    elif vlan_out is not None and dst_port['vlan'] is not None:
                        # TODO this is something that we cannot do. It requires a double VLAN set
                        # outer VLAN should be dst_port['vlan'] and inner VLAN should be vlan_out
                        continue
                    # same switch port and same tag: it is the source itself
                    if src_port['switch_port'] == dst_port['switch_port'] and vlan_in == vlan_out:
                        continue
                    flow = {
                        "priority": priority,
                        'net_id': net_id,
                        "ingress_port": str(src_port['switch_port']),
                        'actions': []
                    }
                    if vlan_in is not None:
                        flow['vlan_id'] = str(vlan_in)
                    # allow that one port have no mac
                    if dst_port['mac'] is None or nb_ports == 2:  # point to point or nets with 2 elements
                        flow['priority'] = priority - 5  # less priority
                    else:
                        flow['dst_mac'] = str(dst_port['mac'])

                    if vlan_out is None:
                        if vlan_in is not None:
                            flow['actions'].append(('vlan', None))
                    else:
                        flow['actions'].append(('vlan', vlan_out))
                    flow['actions'].append(('out', str(dst_port['switch_port'])))

                    if self._check_flow_already_present(flow, new_flows) >= 0:
                        self.logger.debug("Skipping repeated flow '%s'", str(flow))
                        continue

                    new_flows.append(flow)

                    # BROADCAST: only needed for point to multipoint, that is, more than 2 ports
                    if nb_ports <= 2:
                        continue
                    out = (vlan_out, str(dst_port['switch_port']))
                    if out not in flow_broadcast['actions']:
                        flow_broadcast['actions'].append(out)

    # BROADCAST
    for flow_broadcast in new_broadcast_flows.values():
        if len(flow_broadcast['actions']) == 0:
            continue  # nothing to do, skip
        # Group the outputs by vlan, untagged (None) first. The explicit key gives the same
        # order python 2 gives to (None or int, text) tuples and it also works where None
        # and int cannot be compared
        flow_broadcast['actions'].sort(key=lambda item: (item[0] is not None,
                                                         0 if item[0] is None else item[0],
                                                         item[1]))
        if 'vlan_id' in flow_broadcast:
            previous_vlan = 0  # the packet arrives tagged; 0 forces a vlan action before the first output
        else:
            previous_vlan = None
        final_actions = []
        action_number = 0
        for action in flow_broadcast['actions']:
            if action[0] != previous_vlan:
                final_actions.append(('vlan', action[0]))
                previous_vlan = action[0]
                if self.pmp_with_same_vlan and action_number:
                    return -1, "Cannot interconnect different vlan tags in a network when flag 'of_controller_nets_with_same_vlan' is True."
                action_number += 1
            final_actions.append(('out', action[1]))
        flow_broadcast['actions'] = final_actions

        if self._check_flow_already_present(flow_broadcast, new_flows) >= 0:
            self.logger.debug("Skipping repeated flow '%s'", str(flow_broadcast))
            continue

        new_flows.append(flow_broadcast)

    # UNIFY openflow rules with the same input port and vlan and the same output actions
    # These flows differ at the dst_mac; and they are unified by not filtering by dst_mac
    # this can happen if there is only two ports. It is converted to a point to point connection
    flow_dict = {}  # use as key vlan_id+ingress_port and as value the list of flows matching these values
    for flow in new_flows:
        key = str(flow.get("vlan_id")) + ":" + flow["ingress_port"]
        if key in flow_dict:
            flow_dict[key].append(flow)
        else:
            flow_dict[key] = [flow]
    new_flows2 = []
    for flow_list in flow_dict.values():
        convert2ptp = False
        if len(flow_list) >= 2:
            convert2ptp = True
            for f in flow_list:
                if f['actions'] != flow_list[0]['actions']:
                    convert2ptp = False
                    break
        if convert2ptp:  # add only one unified rule without dst_mac
            self.logger.debug("Convert flow rules to NON mac dst_address " + str(flow_list))
            # default None: the first flow may have been generated without mac filter
            flow_list[0].pop('dst_mac', None)
            flow_list[0]["priority"] -= 5
            new_flows2.append(flow_list[0])
        else:  # add all the rules
            new_flows2 += flow_list
    return 0, new_flows2
579 def set_openflow_controller_status(self
, status
, error_text
=None):
581 Set openflow controller last operation status in DB
582 :param status: ofc status ('ACTIVE','INACTIVE','ERROR')
583 :param error_text: error text
586 if self
.of_uuid
== "Default":
590 ofc
['status'] = status
591 ofc
['last_error'] = error_text
592 self
.db_lock
.acquire()
593 result
, content
= self
.db
.update_rows('ofcs', ofc
, WHERE
={'uuid': self
.of_uuid
}, log
=False)
594 self
.db_lock
.release()