2 # -*- coding: utf-8 -*-
5 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
6 # This file is part of openvim
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact with: nfvlabs@tid.es
26 This thread interacts with a openflow floodligth controller to create dataplane connections
29 __author__
="Pablo Montes, Alfonso Tierno"
30 __date__
="17-jul-2015"
40 class FlowBadFormat(Exception):
41 '''raise when a bad format of flow is found'''
43 def change_of2db(flow
):
44 '''Change 'flow' dictionary from openflow format to database format
45 Basically the change consist of changing 'flow[actions] from a list of
46 double tuple to a string
47 from [(A,B),(C,D),..] to "A=B,C=D" '''
49 if type(flow
)!=dict or "actions" not in flow
:
50 raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
52 for action
in flow
['actions']:
53 action_str_list
.append( action
[0] + "=" + str(action
[1]) )
54 flow
['actions'] = ",".join(action_str_list
)
56 raise FlowBadFormat("Unexpected format at 'actions'")
58 def change_db2of(flow
):
59 '''Change 'flow' dictionary from database format to openflow format
60 Basically the change consist of changing 'flow[actions]' from a string to
62 from "A=B,C=D,..." to [(A,B),(C,D),..]
63 raise FlowBadFormat '''
65 if type(flow
)!=dict or "actions" not in flow
or type(flow
["actions"])!=str:
66 raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
67 action_list
= flow
['actions'].split(",")
68 for action_item
in action_list
:
69 action_tuple
= action_item
.split("=")
70 if len(action_tuple
) != 2:
71 raise FlowBadFormat("Expected key=value format at 'actions'")
72 if action_tuple
[0].strip().lower()=="vlan":
73 if action_tuple
[1].strip().lower() in ("none", "strip"):
74 actions
.append( ("vlan",None) )
77 actions
.append( ("vlan", int(action_tuple
[1])) )
79 raise FlowBadFormat("Expected integer after vlan= at 'actions'")
80 elif action_tuple
[0].strip().lower()=="out":
81 actions
.append( ("out", str(action_tuple
[1])) )
83 raise FlowBadFormat("Unexpected '%s' at 'actions'"%action_tuple
[0])
84 flow
['actions'] = actions
88 class of_test_connector():
89 '''This is a fake openflow connector for testing.
90 It does nothing and it is used for running openvim without an openflow controller
92 def __init__(self
, params
):
93 self
.name
= "ofc_test"
95 self
.logger
= logging
.getLogger('vim.OF.TEST')
96 self
.logger
.setLevel( getattr(logging
, params
.get("of_debug", "ERROR") ) )
97 def get_of_switches(self
):
99 def obtain_port_correspondence(self
):
101 def del_flow(self
, flow_name
):
102 if flow_name
in self
.rules
:
103 self
.logger
.debug("del_flow OK")
104 del self
.rules
[flow_name
]
107 self
.logger
.warning("del_flow not found")
108 return -1, "flow %s not found"
109 def new_flow(self
, data
):
110 self
.rules
[ data
["name"] ] = data
111 self
.logger
.debug("new_flow OK")
113 def get_of_rules(self
, translate_of_ports
=True):
116 def clear_all_flows(self
):
117 self
.logger
.debug("clear_all_flows OK")
123 class openflow_thread(threading
.Thread
):
124 def __init__(self
, OF_connector
, db
, db_lock
, of_test
, pmp_with_same_vlan
, debug
='ERROR'):
125 threading
.Thread
.__init
__(self
)
128 self
.pmp_with_same_vlan
= pmp_with_same_vlan
129 self
.name
= "openflow"
131 self
.db_lock
= db_lock
132 self
.OF_connector
= OF_connector
133 self
.logger
= logging
.getLogger('vim.OF')
134 self
.logger
.setLevel( getattr(logging
, debug
) )
136 self
.queueLock
= threading
.Lock()
137 self
.taskQueue
= Queue
.Queue(2000)
139 def insert_task(self
, task
, *aditional
):
141 self
.queueLock
.acquire()
142 task
= self
.taskQueue
.put( (task
,) + aditional
, timeout
=5)
143 self
.queueLock
.release()
146 return -1, "timeout inserting a task over openflow thread " + self
.name
150 self
.queueLock
.acquire()
151 if not self
.taskQueue
.empty():
152 task
= self
.taskQueue
.get()
155 self
.queueLock
.release()
161 if task
[0] == 'update-net':
162 r
,c
= self
.update_of_flows(task
[1])
163 #update database status
164 self
.db_lock
.acquire()
166 UPDATE
={'status':'ERROR', 'last_error': str(c
)}
167 self
.logger
.error("processing task 'update-net' %s: %s", str(task
[1]), c
)
169 UPDATE
={'status':'ACTIVE', 'last_error': None}
170 self
.logger
.debug("processing task 'update-net' %s: OK", str(task
[1]))
171 self
.db
.update_rows('nets', UPDATE
, WHERE
={'uuid':task
[1]})
172 self
.db_lock
.release()
174 elif task
[0] == 'clear-all':
175 r
,c
= self
.clear_all_flows()
177 self
.logger
.error("processing task 'clear-all': %s", c
)
179 self
.logger
.debug("processing task 'clear-all': OK")
180 elif task
[0] == 'exit':
181 self
.logger
.debug("exit from openflow_thread")
185 self
.logger
.error("unknown task %s", str(task
))
189 #print self.name, ": exit from openflow_thread"
191 def update_of_flows(self
, net_id
):
193 self
.db_lock
.acquire()
194 select_
= ('type','admin_state_up', 'vlan', 'provider', 'bind_net','bind_type','uuid')
195 result
, nets
= self
.db
.get_table(FROM
='nets', SELECT
=select_
, WHERE
={'uuid':net_id
} )
196 #get all the networks binding to this
198 if nets
[0]['bind_net']:
199 bind_id
= nets
[0]['bind_net']
202 #get our net and all bind_nets
203 result
, nets
= self
.db
.get_table(FROM
='nets', SELECT
=select_
,
204 WHERE_OR
={'bind_net':bind_id
, 'uuid':bind_id
} )
206 self
.db_lock
.release()
208 return -1, "DB error getting net: " + nets
210 #net has been deleted
215 if net
['admin_state_up'] == 'false':
218 self
.db_lock
.acquire()
219 nb_ports
, net_ports
= self
.db
.get_table(
221 SELECT
=('switch_port','vlan','uuid','mac','type','model'),
222 WHERE
={'net_id':net_id
, 'admin_state_up':'true', 'status':'ACTIVE'} )
223 self
.db_lock
.release()
225 #print self.name, ": update_of_flows() ERROR getting ports", ports
226 return -1, "DB error getting ports from net '%s': %s" % (net_id
, net_ports
)
228 #add the binding as an external port
229 if net
['provider'] and net
['provider'][:9]=="openflow:":
230 external_port
={"type":"external","mac":None}
231 external_port
['uuid'] = net_id
+ ".1" #fake uuid
232 if net
['provider'][-5:]==":vlan":
233 external_port
["vlan"] = net
["vlan"]
234 external_port
["switch_port"] = net
['provider'][9:-5]
236 external_port
["vlan"] = None
237 external_port
["switch_port"] = net
['provider'][9:]
238 net_ports
= net_ports
+ (external_port
,)
240 net
['ports'] = net_ports
241 ifaces_nb
+= nb_ports
243 # Get the name of flows that will be affected by this NET
244 self
.db_lock
.acquire()
245 result
, database_net_flows
= self
.db
.get_table(FROM
='of_flows', WHERE
={'net_id':net_id
})
246 self
.db_lock
.release()
248 #print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
249 return -1, "DB error getting flows from net '%s': %s" %(net_id
, database_net_flows
)
250 database_flows
+= database_net_flows
251 # Get the name of flows where net_id==NULL that means net deleted (At DB foreign key: On delete set null)
252 self
.db_lock
.acquire()
253 result
, database_net_flows
= self
.db
.get_table(FROM
='of_flows', WHERE
={'net_id':None})
254 self
.db_lock
.release()
256 #print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
257 return -1, "DB error getting flows from net 'null': %s" %(database_net_flows)
258 database_flows
+= database_net_flows
260 #Get the existing flows at openflow controller
261 result
, of_flows
= self
.OF_connector
.get_of_rules()
263 #print self.name, ": update_of_flows() ERROR getting flows from controller", of_flows
264 return -1, "OF error getting flows: " + of_flows
268 elif net
['type'] == 'ptp':
270 #print self.name, 'Error, network '+str(net_id)+' has been defined as ptp but it has '+\
271 # str(ifaces_nb)+' interfaces.'
272 return -1, "'ptp' type network cannot connect %d interfaces, only 2" % ifaces_nb
273 elif net
['type'] == 'data':
274 if ifaces_nb
> 2 and self
.pmp_with_same_vlan
:
275 # check all ports are VLAN (tagged) or none
278 if port
["type"]=="external":
279 if port
["vlan"] != None:
280 if port
["vlan"]!=net
["vlan"]:
281 text
="External port vlan-tag and net vlan-tag must be the same when flag 'of_controller_nets_with_same_vlan' is True"
282 #print self.name, "Error", text
286 elif vlan_tag
==False:
287 text
="Passthrough and external port vlan-tagged can not be connected when flag 'of_controller_nets_with_same_vlan' is True"
288 #print self.name, "Error", text
293 elif vlan_tag
== True:
294 text
="SR-IOV and external port not vlan-tagged can not be connected when flag 'of_controller_nets_with_same_vlan' is True"
295 #print self.name, "Error", text
297 elif port
["model"]=="PF" or port
["model"]=="VFnotShared":
301 text
="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
302 #print self.name, "Error", text
304 elif port
["model"] == "VF":
307 elif vlan_tag
==False:
308 text
="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
309 #print self.name, "Error", text
312 return -1, 'Only ptp and data networks are supported for openflow'
314 # calculate new flows to be inserted
315 result
, new_flows
= self
._compute
_net
_flows
(nets
)
317 return result
, new_flows
319 #modify database flows format and get the used names
321 for flow
in database_flows
:
324 except FlowBadFormat
as e
:
325 self
.logger
.error("Exception FlowBadFormat: '%s', flow: '%s'",str(e
), str(flow
))
327 used_names
.append(flow
['name'])
329 #insert at database the new flows, change actions to human text
330 for flow
in new_flows
:
331 #1 check if an equal flow is already present
332 index
= self
._check
_flow
_already
_present
(flow
, database_flows
)
334 database_flows
[index
]["not delete"]=True
335 self
.logger
.debug("Skipping already present flow %s", str(flow
))
337 #2 look for a non used name
338 flow_name
=flow
["net_id"]+"."+str(name_index
)
339 while flow_name
in used_names
or flow_name
in of_flows
:
341 flow_name
=flow
["net_id"]+"."+str(name_index
)
342 used_names
.append(flow_name
)
343 flow
['name'] = flow_name
344 #3 insert at openflow
345 result
, content
= self
.OF_connector
.new_flow(flow
)
347 #print self.name, ": Error '%s' at flow insertion" % c, flow
349 #4 insert at database
352 except FlowBadFormat
as e
:
353 #print self.name, ": Error Exception FlowBadFormat '%s'" % str(e), flow
355 self
.db_lock
.acquire()
356 result
, content
= self
.db
.new_row('of_flows', flow
)
357 self
.db_lock
.release()
359 #print self.name, ": Error '%s' at database insertion" % content, flow
362 #delete not needed old flows from openflow and from DDBB,
363 #check that the needed flows at DDBB are present in controller or insert them otherwise
364 for flow
in database_flows
:
365 if "not delete" in flow
:
366 if flow
["name"] not in of_flows
:
367 #not in controller, insert it
368 result
, content
= self
.OF_connector
.new_flow(flow
)
370 #print self.name, ": Error '%s' at flow insertion" % c, flow
374 if flow
["name"] in of_flows
:
375 result
, content
= self
.OF_connector
.del_flow(flow
['name'])
377 self
.logger
.error("cannot delete flow '%s' from OF: %s", flow
['name'], content
)
378 continue #skip deletion from database
379 #delete from database
380 self
.db_lock
.acquire()
381 result
, content
= self
.db
.delete_row_by_key('of_flows', 'id', flow
['id'])
382 self
.db_lock
.release()
384 self
.logger
.error("cannot delete flow '%s' from DB: %s", flow
['name'], content
)
388 def clear_all_flows(self
):
391 self
.OF_connector
.clear_all_flows()
392 #remove from database
393 self
.db_lock
.acquire()
394 self
.db
.delete_row_by_key('of_flows', None, None) #this will delete all lines
395 self
.db_lock
.release()
397 except requests
.exceptions
.RequestException
as e
:
398 #print self.name, ": clear_all_flows Exception:", str(e)
401 flow_fields
=('priority', 'vlan', 'ingress_port', 'actions', 'dst_mac', 'src_mac', 'net_id')
402 def _check_flow_already_present(self
, new_flow
, flow_list
):
403 '''check if the same flow is already present in the flow list
404 The flow is repeated if all the fields, apart from name, are equal
405 Return the index of matching flow, -1 if not match'''
407 for flow
in flow_list
:
409 for f
in self
.flow_fields
:
410 if flow
.get(f
) != new_flow
.get(f
):
418 def _compute_net_flows(self
, nets
):
420 new_broadcast_flows
={}
423 # Check switch_port information is right
424 self
.logger
.debug("_compute_net_flows nets: %s", str(nets
))
426 for port
in net
['ports']:
428 if not self
.test
and str(port
['switch_port']) not in self
.OF_connector
.pp2ofi
:
429 error_text
= "switch port name '%s' is not valid for the openflow controller" % str(port
['switch_port'])
430 #print self.name, ": ERROR " + error_text
431 return -1, error_text
434 net_id
= net_src
["uuid"]
438 if net_src
== net_dst
:
441 elif net_src
['bind_net'] == net_dst
['uuid']:
442 if net_src
.get('bind_type') and net_src
['bind_type'][0:5] == "vlan:":
443 vlan_net_out
= int(net_src
['bind_type'][5:])
445 elif net_dst
['bind_net'] == net_src
['uuid']:
446 if net_dst
.get('bind_type') and net_dst
['bind_type'][0:5] == "vlan:":
447 vlan_net_in
= int(net_dst
['bind_type'][5:])
452 for src_port
in net_src
['ports']:
453 vlan_in
= vlan_net_in
454 if vlan_in
== None and src_port
['vlan'] != None:
455 vlan_in
= src_port
['vlan']
456 elif vlan_in
!= None and src_port
['vlan'] != None:
457 #TODO this is something that we can not do. It requires a double VLAN check
458 #outer VLAN should be src_port['vlan'] and inner VLAN should be vlan_in
462 broadcast_key
= src_port
['uuid'] + "." + str(vlan_in
)
463 if broadcast_key
in new_broadcast_flows
:
464 flow_broadcast
= new_broadcast_flows
[broadcast_key
]
466 flow_broadcast
= {'priority': priority
,
468 'dst_mac': 'ff:ff:ff:ff:ff:ff',
469 "ingress_port": str(src_port
['switch_port']),
472 new_broadcast_flows
[broadcast_key
] = flow_broadcast
473 if vlan_in
is not None:
474 flow_broadcast
['vlan_id'] = str(vlan_in
)
476 for dst_port
in net_dst
['ports']:
477 vlan_out
= vlan_net_out
478 if vlan_out
== None and dst_port
['vlan'] != None:
479 vlan_out
= dst_port
['vlan']
480 elif vlan_out
!= None and dst_port
['vlan'] != None:
481 #TODO this is something that we can not do. It requires a double VLAN set
482 #outer VLAN should be dst_port['vlan'] and inner VLAN should be vlan_out
484 #if src_port == dst_port:
486 if src_port
['switch_port'] == dst_port
['switch_port'] and vlan_in
== vlan_out
:
489 "priority": priority
,
491 "ingress_port": str(src_port
['switch_port']),
494 if vlan_in
is not None:
495 flow
['vlan_id'] = str(vlan_in
)
496 # allow that one port have no mac
497 if dst_port
['mac'] is None or nb_ports
==2: # point to point or nets with 2 elements
498 flow
['priority'] = priority
-5 # less priority
500 flow
['dst_mac'] = str(dst_port
['mac'])
504 flow
['actions'].append( ('vlan',None) )
506 flow
['actions'].append( ('vlan', vlan_out
) )
507 flow
['actions'].append( ('out', str(dst_port
['switch_port'])) )
509 if self
._check
_flow
_already
_present
(flow
, new_flows
) >= 0:
510 self
.logger
.debug("Skipping repeated flow '%s'", str(flow
))
513 new_flows
.append(flow
)
516 if nb_ports
<= 2: # point to multipoint or nets with more than 2 elements
518 out
= (vlan_out
, str(dst_port
['switch_port']))
519 if out
not in flow_broadcast
['actions']:
520 flow_broadcast
['actions'].append( out
)
523 for flow_broadcast
in new_broadcast_flows
.values():
524 if len(flow_broadcast
['actions'])==0:
525 continue #nothing to do, skip
526 flow_broadcast
['actions'].sort()
527 if 'vlan_id' in flow_broadcast
:
528 previous_vlan
= 0 # indicates that a packet contains a vlan, and the vlan
533 for action
in flow_broadcast
['actions']:
534 if action
[0] != previous_vlan
:
535 final_actions
.append( ('vlan', action
[0]) )
536 previous_vlan
= action
[0]
537 if self
.pmp_with_same_vlan
and action_number
:
538 return -1, "Can not interconnect different vlan tags in a network when flag 'of_controller_nets_with_same_vlan' is True."
540 final_actions
.append( ('out', action
[1]) )
541 flow_broadcast
['actions'] = final_actions
543 if self
._check
_flow
_already
_present
(flow_broadcast
, new_flows
) >= 0:
544 self
.logger
.debug("Skipping repeated flow '%s'", str(flow_broadcast
))
547 new_flows
.append(flow_broadcast
)
549 #UNIFY openflow rules with the same input port and vlan and the same output actions
550 #These flows differ at the dst_mac; and they are unified by not filtering by dst_mac
551 #this can happen if there is only two ports. It is converted to a point to point connection
552 flow_dict
={} # use as key vlan_id+ingress_port and as value the list of flows matching these values
553 for flow
in new_flows
:
554 key
= str(flow
.get("vlan_id"))+":"+flow
["ingress_port"]
556 flow_dict
[key
].append(flow
)
558 flow_dict
[key
]=[ flow
]
560 for flow_list
in flow_dict
.values():
562 if len (flow_list
)>=2:
565 if f
['actions'] != flow_list
[0]['actions']:
568 if convert2ptp
: # add only one unified rule without dst_mac
569 self
.logger
.debug("Convert flow rules to NON mac dst_address " + str(flow_list
) )
570 flow_list
[0].pop('dst_mac')
571 flow_list
[0]["priority"] -= 5
572 new_flows2
.append(flow_list
[0])
573 else: # add all the rules
574 new_flows2
+= flow_list