2 # -*- coding: utf-8 -*-
5 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
6 # This file is part of openvim
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact with: nfvlabs@tid.es
This thread interacts with an OpenFlow Floodlight controller to create dataplane connections
29 __author__
="Pablo Montes, Alfonso Tierno"
30 __date__
="17-jul-2015"
class FlowBadFormat(Exception):
    '''Raised when a flow does not have the expected format'''
def change_of2db(flow):
    '''Change 'flow' dictionary, in place, from openflow format to database format.
    Basically the change consists of changing flow['actions'] from a list of
    double tuples to a string:
        from [(A,B),(C,D),..] to "A=B,C=D"
    Params:
        flow: dictionary describing the flow; it must contain the key 'actions'
    Return: None, 'flow' is modified in place
    Raise: FlowBadFormat if 'flow' is not a dictionary with the key 'actions', or
        if 'actions' is not an iterable of (text, value) pairs
    '''
    if not isinstance(flow, dict) or "actions" not in flow:
        raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
    try:
        # build the complete list first, so that 'flow' is left untouched on a format error
        action_str_list = [action[0] + "=" + str(action[1]) for action in flow['actions']]
    except (TypeError, IndexError, KeyError):
        # not iterable, not indexable, too short, or a key that is not a text
        raise FlowBadFormat("Unexpected format at 'actions'")
    flow['actions'] = ",".join(action_str_list)
def change_db2of(flow):
    '''Change 'flow' dictionary, in place, from database format to openflow format.
    Basically the change consists of changing flow['actions'] from a string to
    a list of double tuples:
        from "A=B,C=D,..." to [(A,B),(C,D),..]
    The recognized actions are (the key is case insensitive):
        vlan=<integer>          -> ('vlan', <integer>)
        vlan=none, vlan=strip   -> ('vlan', None)
        out=<port>              -> ('out', '<port>')
    Params:
        flow: dictionary describing the flow; flow['actions'] must be a string
    Return: None, 'flow' is modified in place
    Raise: FlowBadFormat if 'flow' is not a dictionary with a string at 'actions',
        if an item is not in key=value form (this includes an empty string),
        if the vlan value is not an integer, or if the key is not recognized
    '''
    if not isinstance(flow, dict) or "actions" not in flow or not isinstance(flow["actions"], str):
        raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
    actions = []
    for action_item in flow['actions'].split(","):
        action_tuple = action_item.split("=")
        if len(action_tuple) != 2:
            raise FlowBadFormat("Expected key=value format at 'actions'")
        key = action_tuple[0].strip().lower()
        value = action_tuple[1]
        if key == "vlan":
            if value.strip().lower() in ("none", "strip"):
                actions.append(("vlan", None))
            else:
                try:
                    actions.append(("vlan", int(value)))
                except ValueError:
                    raise FlowBadFormat("Expected integer after vlan= at 'actions'")
        elif key == "out":
            # the port is stored as it comes, without stripping blanks
            actions.append(("out", str(value)))
        else:
            raise FlowBadFormat("Unexpected '%s' at 'actions'" % action_tuple[0])
    # 'flow' is only modified once the whole string has been parsed
    flow['actions'] = actions
class of_test_connector():
    '''This is a fake openflow connector for testing.
    It does not contact any controller: the flows are just kept in the 'rules'
    dictionary. It is used for running openvim without an openflow controller.
    The methods return a tuple (code, content), where a negative code means error.
    '''
    # NOTE(review): the success return values (0, ...) and the 'rules' storage are
    # reconstructed from the way the callers unpack and use the results - confirm

    def __init__(self, params):
        '''params: dictionary with the optional keys 'name' ("test-ofc" by default),
        'dpid' and 'of_debug' (name of a logging level, "ERROR" by default)'''
        self.name = params.get("name", "test-ofc")
        self.dpid = params.get("dpid")
        self.rules = {}  # stored flows, indexed by the flow name
        self.logger = logging.getLogger('vim.OF.TEST')
        self.logger.setLevel(getattr(logging, params.get("of_debug", "ERROR")))

    def get_of_switches(self):
        '''The fake connector has no switches to report'''
        return 0, ()

    def obtain_port_correspondence(self):
        '''The fake connector has no ports to report'''
        return 0, ()

    def del_flow(self, flow_name):
        '''Remove the flow 'flow_name' from the stored rules.
        Return (0, None) if deleted, (-1, text) if the flow is not stored'''
        if flow_name in self.rules:
            self.logger.debug("del_flow OK")
            del self.rules[flow_name]
            return 0, None
        self.logger.warning("del_flow not found")
        # the flow name is now included at the text, before the '%s' was left as is
        return -1, "flow %s not found" % flow_name

    def new_flow(self, data):
        '''Store the flow 'data', a dictionary, under its data["name"]'''
        self.rules[data["name"]] = data
        self.logger.debug("new_flow OK")
        return 0, None

    def get_of_rules(self, translate_of_ports=True):
        '''Return the stored flows as a dictionary indexed by name.
        'translate_of_ports' is not used by the fake connector'''
        return 0, self.rules

    def clear_all_flows(self):
        '''Forget all the stored flows'''
        self.logger.debug("clear_all_flows OK")
        self.rules = {}
        return 0, None
125 class openflow_thread(threading
.Thread
):
126 def __init__(self
, of_uuid
, OF_connector
, db
, db_lock
, of_test
, pmp_with_same_vlan
=False, debug
='ERROR'):
127 threading
.Thread
.__init
__(self
)
128 self
.of_uuid
= of_uuid
130 self
.pmp_with_same_vlan
= pmp_with_same_vlan
131 self
.name
= "openflow"
133 self
.db_lock
= db_lock
134 self
.OF_connector
= OF_connector
135 self
.logger
= logging
.getLogger('vim.OF-' + of_uuid
)
136 self
.logger
.setLevel( getattr(logging
, debug
) )
137 self
.logger
.name
= OF_connector
.name
+ " " + self
.OF_connector
.dpid
138 self
.queueLock
= threading
.Lock()
139 self
.taskQueue
= Queue
.Queue(2000)
141 def insert_task(self
, task
, *aditional
):
143 self
.queueLock
.acquire()
144 task
= self
.taskQueue
.put( (task
,) + aditional
, timeout
=5)
145 self
.queueLock
.release()
148 return -1, "timeout inserting a task over openflow thread " + self
.name
151 self
.logger
.debug("Start openflow thread")
153 self
.queueLock
.acquire()
154 if not self
.taskQueue
.empty():
155 task
= self
.taskQueue
.get()
158 self
.queueLock
.release()
164 if task
[0] == 'update-net':
165 r
,c
= self
.update_of_flows(task
[1])
166 #update database status
167 self
.db_lock
.acquire()
169 UPDATE
={'status':'ERROR', 'last_error': str(c
)}
170 self
.logger
.error("processing task 'update-net' %s: %s", str(task
[1]), c
)
172 UPDATE
={'status':'ACTIVE', 'last_error': None}
173 self
.logger
.debug("processing task 'update-net' %s: OK", str(task
[1]))
174 self
.db
.update_rows('nets', UPDATE
, WHERE
={'uuid':task
[1]})
175 self
.db_lock
.release()
177 elif task
[0] == 'clear-all':
178 r
,c
= self
.clear_all_flows()
180 self
.logger
.error("processing task 'clear-all': %s", c
)
182 self
.logger
.debug("processing task 'clear-all': OK")
183 elif task
[0] == 'exit':
184 self
.logger
.debug("exit from openflow_thread")
188 self
.logger
.error("unknown task %s", str(task
))
192 #print self.name, ": exit from openflow_thread"
194 def update_of_flows(self
, net_id
):
196 self
.db_lock
.acquire()
197 select_
= ('type','admin_state_up', 'vlan', 'provider', 'bind_net','bind_type','uuid')
198 result
, nets
= self
.db
.get_table(FROM
='nets', SELECT
=select_
, WHERE
={'uuid':net_id
} )
199 #get all the networks binding to this
201 if nets
[0]['bind_net']:
202 bind_id
= nets
[0]['bind_net']
205 #get our net and all bind_nets
206 result
, nets
= self
.db
.get_table(FROM
='nets', SELECT
=select_
,
207 WHERE_OR
={'bind_net':bind_id
, 'uuid':bind_id
} )
209 self
.db_lock
.release()
211 return -1, "DB error getting net: " + nets
213 #net has been deleted
218 if net
['admin_state_up'] == 'false':
221 self
.db_lock
.acquire()
222 nb_ports
, net_ports
= self
.db
.get_table(
224 SELECT
=('switch_port','vlan','uuid','mac','type','model'),
225 WHERE
={'net_id':net_id
, 'admin_state_up':'true', 'status':'ACTIVE'} )
226 self
.db_lock
.release()
228 #print self.name, ": update_of_flows() ERROR getting ports", ports
229 return -1, "DB error getting ports from net '%s': %s" % (net_id
, net_ports
)
231 #add the binding as an external port
232 if net
['provider'] and net
['provider'][:9]=="openflow:":
233 external_port
={"type":"external","mac":None}
234 external_port
['uuid'] = net_id
+ ".1" #fake uuid
235 if net
['provider'][-5:]==":vlan":
236 external_port
["vlan"] = net
["vlan"]
237 external_port
["switch_port"] = net
['provider'][9:-5]
239 external_port
["vlan"] = None
240 external_port
["switch_port"] = net
['provider'][9:]
241 net_ports
= net_ports
+ (external_port
,)
243 net
['ports'] = net_ports
244 ifaces_nb
+= nb_ports
246 # Get the name of flows that will be affected by this NET
247 self
.db_lock
.acquire()
248 result
, database_net_flows
= self
.db
.get_table(FROM
='of_flows', WHERE
={'net_id':net_id
})
249 self
.db_lock
.release()
251 #print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
252 return -1, "DB error getting flows from net '%s': %s" %(net_id
, database_net_flows
)
253 database_flows
+= database_net_flows
254 # Get the name of flows where net_id==NULL that means net deleted (At DB foreign key: On delete set null)
255 self
.db_lock
.acquire()
256 result
, database_net_flows
= self
.db
.get_table(FROM
='of_flows', WHERE
={'net_id':None})
257 self
.db_lock
.release()
259 #print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
260 return -1, "DB error getting flows from net 'null': %s" %(database_net_flows)
261 database_flows
+= database_net_flows
263 #Get the existing flows at openflow controller
264 result
, of_flows
= self
.OF_connector
.get_of_rules()
266 #print self.name, ": update_of_flows() ERROR getting flows from controller", of_flows
267 return -1, "OF error getting flows: " + of_flows
271 elif net
['type'] == 'ptp':
273 #print self.name, 'Error, network '+str(net_id)+' has been defined as ptp but it has '+\
274 # str(ifaces_nb)+' interfaces.'
275 return -1, "'ptp' type network cannot connect %d interfaces, only 2" % ifaces_nb
276 elif net
['type'] == 'data':
277 if ifaces_nb
> 2 and self
.pmp_with_same_vlan
:
278 # check all ports are VLAN (tagged) or none
281 if port
["type"]=="external":
282 if port
["vlan"] != None:
283 if port
["vlan"]!=net
["vlan"]:
284 text
="External port vlan-tag and net vlan-tag must be the same when flag 'of_controller_nets_with_same_vlan' is True"
285 #print self.name, "Error", text
289 elif vlan_tag
==False:
290 text
="Passthrough and external port vlan-tagged can not be connected when flag 'of_controller_nets_with_same_vlan' is True"
291 #print self.name, "Error", text
296 elif vlan_tag
== True:
297 text
="SR-IOV and external port not vlan-tagged can not be connected when flag 'of_controller_nets_with_same_vlan' is True"
298 #print self.name, "Error", text
300 elif port
["model"]=="PF" or port
["model"]=="VFnotShared":
304 text
="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
305 #print self.name, "Error", text
307 elif port
["model"] == "VF":
310 elif vlan_tag
==False:
311 text
="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
312 #print self.name, "Error", text
315 return -1, 'Only ptp and data networks are supported for openflow'
317 # calculate new flows to be inserted
318 result
, new_flows
= self
._compute
_net
_flows
(nets
)
320 return result
, new_flows
322 #modify database flows format and get the used names
324 for flow
in database_flows
:
327 except FlowBadFormat
as e
:
328 self
.logger
.error("Exception FlowBadFormat: '%s', flow: '%s'",str(e
), str(flow
))
330 used_names
.append(flow
['name'])
332 #insert at database the new flows, change actions to human text
333 for flow
in new_flows
:
334 #1 check if an equal flow is already present
335 index
= self
._check
_flow
_already
_present
(flow
, database_flows
)
337 database_flows
[index
]["not delete"]=True
338 self
.logger
.debug("Skipping already present flow %s", str(flow
))
340 #2 look for a non used name
341 flow_name
=flow
["net_id"]+"."+str(name_index
)
342 while flow_name
in used_names
or flow_name
in of_flows
:
344 flow_name
=flow
["net_id"]+"."+str(name_index
)
345 used_names
.append(flow_name
)
346 flow
['name'] = flow_name
347 #3 insert at openflow
348 result
, content
= self
.OF_connector
.new_flow(flow
)
350 #print self.name, ": Error '%s' at flow insertion" % c, flow
352 #4 insert at database
355 except FlowBadFormat
as e
:
356 #print self.name, ": Error Exception FlowBadFormat '%s'" % str(e), flow
358 self
.db_lock
.acquire()
359 result
, content
= self
.db
.new_row('of_flows', flow
)
360 self
.db_lock
.release()
362 #print self.name, ": Error '%s' at database insertion" % content, flow
365 #delete not needed old flows from openflow and from DDBB,
366 #check that the needed flows at DDBB are present in controller or insert them otherwise
367 for flow
in database_flows
:
368 if "not delete" in flow
:
369 if flow
["name"] not in of_flows
:
370 #not in controller, insert it
371 result
, content
= self
.OF_connector
.new_flow(flow
)
373 #print self.name, ": Error '%s' at flow insertion" % c, flow
377 if flow
["name"] in of_flows
:
378 result
, content
= self
.OF_connector
.del_flow(flow
['name'])
380 self
.logger
.error("cannot delete flow '%s' from OF: %s", flow
['name'], content
)
381 continue #skip deletion from database
382 #delete from database
383 self
.db_lock
.acquire()
384 result
, content
= self
.db
.delete_row_by_key('of_flows', 'id', flow
['id'])
385 self
.db_lock
.release()
387 self
.logger
.error("cannot delete flow '%s' from DB: %s", flow
['name'], content
)
391 def clear_all_flows(self
):
394 self
.OF_connector
.clear_all_flows()
395 #remove from database
396 self
.db_lock
.acquire()
397 self
.db
.delete_row_by_key('of_flows', None, None) #this will delete all lines
398 self
.db_lock
.release()
400 except requests
.exceptions
.RequestException
as e
:
401 #print self.name, ": clear_all_flows Exception:", str(e)
404 flow_fields
=('priority', 'vlan', 'ingress_port', 'actions', 'dst_mac', 'src_mac', 'net_id')
405 def _check_flow_already_present(self
, new_flow
, flow_list
):
406 '''check if the same flow is already present in the flow list
407 The flow is repeated if all the fields, apart from name, are equal
408 Return the index of matching flow, -1 if not match'''
410 for flow
in flow_list
:
412 for f
in self
.flow_fields
:
413 if flow
.get(f
) != new_flow
.get(f
):
421 def _compute_net_flows(self
, nets
):
423 new_broadcast_flows
={}
426 # Check switch_port information is right
427 self
.logger
.debug("_compute_net_flows nets: %s", str(nets
))
429 for port
in net
['ports']:
431 if not self
.test
and str(port
['switch_port']) not in self
.OF_connector
.pp2ofi
:
432 error_text
= "switch port name '%s' is not valid for the openflow controller" % str(port
['switch_port'])
433 #print self.name, ": ERROR " + error_text
434 return -1, error_text
437 net_id
= net_src
["uuid"]
441 if net_src
== net_dst
:
444 elif net_src
['bind_net'] == net_dst
['uuid']:
445 if net_src
.get('bind_type') and net_src
['bind_type'][0:5] == "vlan:":
446 vlan_net_out
= int(net_src
['bind_type'][5:])
448 elif net_dst
['bind_net'] == net_src
['uuid']:
449 if net_dst
.get('bind_type') and net_dst
['bind_type'][0:5] == "vlan:":
450 vlan_net_in
= int(net_dst
['bind_type'][5:])
455 for src_port
in net_src
['ports']:
456 vlan_in
= vlan_net_in
457 if vlan_in
== None and src_port
['vlan'] != None:
458 vlan_in
= src_port
['vlan']
459 elif vlan_in
!= None and src_port
['vlan'] != None:
460 #TODO this is something that we can not do. It requires a double VLAN check
461 #outer VLAN should be src_port['vlan'] and inner VLAN should be vlan_in
465 broadcast_key
= src_port
['uuid'] + "." + str(vlan_in
)
466 if broadcast_key
in new_broadcast_flows
:
467 flow_broadcast
= new_broadcast_flows
[broadcast_key
]
469 flow_broadcast
= {'priority': priority
,
471 'dst_mac': 'ff:ff:ff:ff:ff:ff',
472 "ingress_port": str(src_port
['switch_port']),
475 new_broadcast_flows
[broadcast_key
] = flow_broadcast
476 if vlan_in
is not None:
477 flow_broadcast
['vlan_id'] = str(vlan_in
)
479 for dst_port
in net_dst
['ports']:
480 vlan_out
= vlan_net_out
481 if vlan_out
== None and dst_port
['vlan'] != None:
482 vlan_out
= dst_port
['vlan']
483 elif vlan_out
!= None and dst_port
['vlan'] != None:
484 #TODO this is something that we can not do. It requires a double VLAN set
485 #outer VLAN should be dst_port['vlan'] and inner VLAN should be vlan_out
487 #if src_port == dst_port:
489 if src_port
['switch_port'] == dst_port
['switch_port'] and vlan_in
== vlan_out
:
492 "priority": priority
,
494 "ingress_port": str(src_port
['switch_port']),
497 if vlan_in
is not None:
498 flow
['vlan_id'] = str(vlan_in
)
499 # allow that one port have no mac
500 if dst_port
['mac'] is None or nb_ports
==2: # point to point or nets with 2 elements
501 flow
['priority'] = priority
-5 # less priority
503 flow
['dst_mac'] = str(dst_port
['mac'])
507 flow
['actions'].append( ('vlan',None) )
509 flow
['actions'].append( ('vlan', vlan_out
) )
510 flow
['actions'].append( ('out', str(dst_port
['switch_port'])) )
512 if self
._check
_flow
_already
_present
(flow
, new_flows
) >= 0:
513 self
.logger
.debug("Skipping repeated flow '%s'", str(flow
))
516 new_flows
.append(flow
)
519 if nb_ports
<= 2: # point to multipoint or nets with more than 2 elements
521 out
= (vlan_out
, str(dst_port
['switch_port']))
522 if out
not in flow_broadcast
['actions']:
523 flow_broadcast
['actions'].append( out
)
526 for flow_broadcast
in new_broadcast_flows
.values():
527 if len(flow_broadcast
['actions'])==0:
528 continue #nothing to do, skip
529 flow_broadcast
['actions'].sort()
530 if 'vlan_id' in flow_broadcast
:
531 previous_vlan
= 0 # indicates that a packet contains a vlan, and the vlan
536 for action
in flow_broadcast
['actions']:
537 if action
[0] != previous_vlan
:
538 final_actions
.append( ('vlan', action
[0]) )
539 previous_vlan
= action
[0]
540 if self
.pmp_with_same_vlan
and action_number
:
541 return -1, "Can not interconnect different vlan tags in a network when flag 'of_controller_nets_with_same_vlan' is True."
543 final_actions
.append( ('out', action
[1]) )
544 flow_broadcast
['actions'] = final_actions
546 if self
._check
_flow
_already
_present
(flow_broadcast
, new_flows
) >= 0:
547 self
.logger
.debug("Skipping repeated flow '%s'", str(flow_broadcast
))
550 new_flows
.append(flow_broadcast
)
552 #UNIFY openflow rules with the same input port and vlan and the same output actions
553 #These flows differ at the dst_mac; and they are unified by not filtering by dst_mac
554 #this can happen if there is only two ports. It is converted to a point to point connection
555 flow_dict
={} # use as key vlan_id+ingress_port and as value the list of flows matching these values
556 for flow
in new_flows
:
557 key
= str(flow
.get("vlan_id"))+":"+flow
["ingress_port"]
559 flow_dict
[key
].append(flow
)
561 flow_dict
[key
]=[ flow
]
563 for flow_list
in flow_dict
.values():
565 if len (flow_list
)>=2:
568 if f
['actions'] != flow_list
[0]['actions']:
571 if convert2ptp
: # add only one unified rule without dst_mac
572 self
.logger
.debug("Convert flow rules to NON mac dst_address " + str(flow_list
) )
573 flow_list
[0].pop('dst_mac')
574 flow_list
[0]["priority"] -= 5
575 new_flows2
.append(flow_list
[0])
576 else: # add all the rules
577 new_flows2
+= flow_list