2 # -*- coding: utf-8 -*-
5 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
6 # This file is part of openvim
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact with: nfvlabs@tid.es
26 This thread interacts with an openflow floodlight controller to create dataplane connections
29 __author__
="Pablo Montes, Alfonso Tierno"
30 __date__
="17-jul-2015"
class FlowBadFormat(Exception):
    '''Raised when a flow dictionary does not have the expected format.

    It is raised by change_of2db() and change_db2of() when the flow is not a
    dictionary, it has no 'actions' key or the content of 'actions' cannot be
    converted.
    '''
def change_of2db(flow):
    '''Change 'flow' dictionary from openflow format to database format.

    Basically the change consists of converting flow['actions'], in place,
    from a list of two-item tuples to a comma separated string:
        from [(A,B),(C,D),..] to "A=B,C=D"

    Params:
        flow: dictionary that must contain the key 'actions'
    Return: None, the 'flow' dictionary is modified in place
    Raise: FlowBadFormat if 'flow' is not a dictionary, it has no 'actions'
        key, or the content of 'actions' cannot be converted
    '''
    if not isinstance(flow, dict) or "actions" not in flow:
        raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
    try:
        action_str_list = [action[0] + "=" + str(action[1]) for action in flow['actions']]
    except Exception:
        # whatever fails here (not iterable, item too short, name that is not
        # a string, ...) means that 'actions' is not a list of (name, value)
        # pairs. 'Exception' instead of a bare 'except' lets KeyboardInterrupt
        # and SystemExit go through.
        raise FlowBadFormat("Unexpected format at 'actions'")
    # assign only after a complete conversion, so a failure leaves 'flow' untouched
    flow['actions'] = ",".join(action_str_list)
def change_db2of(flow):
    '''Change 'flow' dictionary from database format to openflow format.

    Basically the change consists of converting flow['actions'], in place,
    from a string to a list of two-item tuples:
        from "A=B,C=D,..." to [(A,B),(C,D),..]
    Accepted actions are:
        vlan=<integer>      converted to ('vlan', <int>)
        vlan=none|strip     converted to ('vlan', None)
        out=<port>          converted to ('out', '<port>')
    Action names are case insensitive and blanks around names and values are
    ignored. An empty string is converted to an empty list.

    Params:
        flow: dictionary that must contain the key 'actions' with a string
    Return: None, the 'flow' dictionary is modified in place
    Raise: FlowBadFormat if 'flow' is not a dictionary, 'actions' is missing
        or is not a string, or an action is not one of the accepted ones
    '''
    if (not isinstance(flow, dict) or "actions" not in flow
            or not isinstance(flow["actions"], str)):
        raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
    actions = []
    # an empty string is what change_of2db() generates for an empty action
    # list, so it must be accepted here to keep both conversions symmetric
    if flow['actions'].strip():
        for action_item in flow['actions'].split(","):
            action_tuple = action_item.split("=")
            if len(action_tuple) != 2:
                raise FlowBadFormat("Expected key=value format at 'actions'")
            key = action_tuple[0].strip().lower()
            value = action_tuple[1].strip()
            if key == "vlan":
                if value.lower() in ("none", "strip"):
                    actions.append(("vlan", None))
                else:
                    try:
                        actions.append(("vlan", int(value)))
                    except ValueError:
                        raise FlowBadFormat("Expected integer after vlan= at 'actions'")
            elif key == "out":
                # the port is stripped as well, so "out = port" and "out=port"
                # give the same action
                actions.append(("out", value))
            else:
                raise FlowBadFormat("Unexpected '%s' at 'actions'" % action_tuple[0])
    # assign only after a complete conversion, so a failure leaves 'flow' untouched
    flow['actions'] = actions
class of_test_connector():
    '''This is a fake openflow connector for testing.

    It does not contact any controller, the flows are only stored in memory
    at the 'rules' dictionary. It is used for running openvim without an
    openflow controller.
    The methods return a tuple (result, content), where a negative result
    means error and content contains the error text.
    '''
    def __init__(self, params):
        '''Params:
            params: dictionary with the optional keys:
                name: connector name, by default "test-ofc"
                dpid: stored as it is at self.dpid
                of_debug: name of a logging level, by default "ERROR"
        '''
        self.name = params.get("name", "test-ofc")
        self.dpid = params.get("dpid")
        self.rules = {}    # installed flows, indexed by flow name
        # NOTE(review): openflow_thread reads OF_connector.pp2ofi when it is
        # not running in test mode; an empty mapping is provided here so that
        # the attribute always exists -- confirm against the real connectors
        self.pp2ofi = {}
        self.logger = logging.getLogger('vim.OF.TEST')
        self.logger.setLevel(getattr(logging, params.get("of_debug", "ERROR")))

    def get_of_switches(self):
        '''Return (0, ()), there are no switches behind the fake connector.'''
        # NOTE(review): empty result chosen to follow the (result, content)
        # convention of this class -- confirm the shape expected by callers
        return 0, ()

    def obtain_port_correspondence(self):
        '''Return (0, ()), the fake connector has no port mapping.'''
        # NOTE(review): empty result chosen to follow the (result, content)
        # convention of this class -- confirm the shape expected by callers
        return 0, ()

    def del_flow(self, flow_name):
        '''Remove the flow 'flow_name' from the stored rules.
        Return (0, None) if deleted, (-1, error_text) if it does not exist'''
        if flow_name not in self.rules:
            self.logger.warning("del_flow not found")
            return -1, "flow %s not found" % str(flow_name)
        del self.rules[flow_name]
        self.logger.debug("del_flow OK")
        return 0, None

    def new_flow(self, data):
        '''Store the flow 'data', a dictionary, indexed by data["name"].
        A flow already stored with the same name is replaced.
        Return (0, None)'''
        self.rules[data["name"]] = data
        self.logger.debug("new_flow OK")
        return 0, None

    def get_of_rules(self, translate_of_ports=True):
        '''Return (0, rules), where rules is the dictionary of stored flows
        indexed by name. 'translate_of_ports' is not used by this connector'''
        return 0, self.rules

    def clear_all_flows(self):
        '''Forget all the stored flows. Return (0, None)'''
        self.logger.debug("clear_all_flows OK")
        # a new dictionary is created instead of clearing the current one, so
        # a dictionary previously obtained with get_of_rules() is not altered
        self.rules = {}
        return 0, None
130 class openflow_thread(threading
.Thread
):
131 def __init__(self
, of_uuid
, OF_connector
, db
, db_lock
, of_test
, pmp_with_same_vlan
=False, debug
='ERROR'):
132 threading
.Thread
.__init
__(self
)
133 self
.of_uuid
= of_uuid
135 self
.pmp_with_same_vlan
= pmp_with_same_vlan
136 self
.name
= "openflow"
138 self
.db_lock
= db_lock
139 self
.OF_connector
= OF_connector
140 self
.logger
= logging
.getLogger('vim.OF-' + of_uuid
)
141 self
.logger
.setLevel( getattr(logging
, debug
) )
142 self
.logger
.name
= OF_connector
.name
+ " " + self
.OF_connector
.dpid
143 self
.queueLock
= threading
.Lock()
144 self
.taskQueue
= Queue
.Queue(2000)
146 def insert_task(self
, task
, *aditional
):
148 self
.queueLock
.acquire()
149 task
= self
.taskQueue
.put( (task
,) + aditional
, timeout
=5)
150 self
.queueLock
.release()
153 return -1, "timeout inserting a task over openflow thread " + self
.name
156 self
.logger
.debug("Start openflow thread")
158 self
.queueLock
.acquire()
159 if not self
.taskQueue
.empty():
160 task
= self
.taskQueue
.get()
163 self
.queueLock
.release()
169 if task
[0] == 'update-net':
170 r
,c
= self
.update_of_flows(task
[1])
171 #update database status
172 self
.db_lock
.acquire()
174 UPDATE
={'status':'ERROR', 'last_error': str(c
)}
175 self
.logger
.error("processing task 'update-net' %s: %s", str(task
[1]), c
)
177 UPDATE
={'status':'ACTIVE', 'last_error': None}
178 self
.logger
.debug("processing task 'update-net' %s: OK", str(task
[1]))
179 self
.db
.update_rows('nets', UPDATE
, WHERE
={'uuid':task
[1]})
180 self
.db_lock
.release()
182 elif task
[0] == 'clear-all':
183 r
,c
= self
.clear_all_flows()
185 self
.logger
.error("processing task 'clear-all': %s", c
)
187 self
.logger
.debug("processing task 'clear-all': OK")
188 elif task
[0] == 'exit':
189 self
.logger
.debug("exit from openflow_thread")
193 self
.logger
.error("unknown task %s", str(task
))
197 #print self.name, ": exit from openflow_thread"
199 def update_of_flows(self
, net_id
):
201 self
.db_lock
.acquire()
202 select_
= ('type','admin_state_up', 'vlan', 'provider', 'bind_net','bind_type','uuid')
203 result
, nets
= self
.db
.get_table(FROM
='nets', SELECT
=select_
, WHERE
={'uuid':net_id
} )
204 #get all the networks binding to this
206 if nets
[0]['bind_net']:
207 bind_id
= nets
[0]['bind_net']
210 #get our net and all bind_nets
211 result
, nets
= self
.db
.get_table(FROM
='nets', SELECT
=select_
,
212 WHERE_OR
={'bind_net':bind_id
, 'uuid':bind_id
} )
214 self
.db_lock
.release()
216 return -1, "DB error getting net: " + nets
218 #net has been deleted
223 if net
['admin_state_up'] == 'false':
226 self
.db_lock
.acquire()
227 nb_ports
, net_ports
= self
.db
.get_table(
229 SELECT
=('switch_port','vlan','uuid','mac','type','model'),
230 WHERE
={'net_id':net_id
, 'admin_state_up':'true', 'status':'ACTIVE'} )
231 self
.db_lock
.release()
233 #print self.name, ": update_of_flows() ERROR getting ports", ports
234 return -1, "DB error getting ports from net '%s': %s" % (net_id
, net_ports
)
236 #add the binding as an external port
237 if net
['provider'] and net
['provider'][:9]=="openflow:":
238 external_port
={"type":"external","mac":None}
239 external_port
['uuid'] = net_id
+ ".1" #fake uuid
240 if net
['provider'][-5:]==":vlan":
241 external_port
["vlan"] = net
["vlan"]
242 external_port
["switch_port"] = net
['provider'][9:-5]
244 external_port
["vlan"] = None
245 external_port
["switch_port"] = net
['provider'][9:]
246 net_ports
= net_ports
+ (external_port
,)
248 net
['ports'] = net_ports
249 ifaces_nb
+= nb_ports
251 # Get the name of flows that will be affected by this NET
252 self
.db_lock
.acquire()
253 result
, database_net_flows
= self
.db
.get_table(FROM
='of_flows', WHERE
={'net_id':net_id
})
254 self
.db_lock
.release()
256 #print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
257 return -1, "DB error getting flows from net '%s': %s" %(net_id
, database_net_flows
)
258 database_flows
+= database_net_flows
259 # Get the name of flows where net_id==NULL that means net deleted (At DB foreign key: On delete set null)
260 self
.db_lock
.acquire()
261 result
, database_net_flows
= self
.db
.get_table(FROM
='of_flows', WHERE
={'net_id':None})
262 self
.db_lock
.release()
264 #print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
265 return -1, "DB error getting flows from net 'null': %s" %(database_net_flows)
266 database_flows
+= database_net_flows
268 #Get the existing flows at openflow controller
269 result
, of_flows
= self
.OF_connector
.get_of_rules()
271 #print self.name, ": update_of_flows() ERROR getting flows from controller", of_flows
272 return -1, "OF error getting flows: " + of_flows
276 elif net
['type'] == 'ptp':
278 #print self.name, 'Error, network '+str(net_id)+' has been defined as ptp but it has '+\
279 # str(ifaces_nb)+' interfaces.'
280 return -1, "'ptp' type network cannot connect %d interfaces, only 2" % ifaces_nb
281 elif net
['type'] == 'data':
282 if ifaces_nb
> 2 and self
.pmp_with_same_vlan
:
283 # check all ports are VLAN (tagged) or none
286 if port
["type"]=="external":
287 if port
["vlan"] != None:
288 if port
["vlan"]!=net
["vlan"]:
289 text
="External port vlan-tag and net vlan-tag must be the same when flag 'of_controller_nets_with_same_vlan' is True"
290 #print self.name, "Error", text
294 elif vlan_tag
==False:
295 text
="Passthrough and external port vlan-tagged can not be connected when flag 'of_controller_nets_with_same_vlan' is True"
296 #print self.name, "Error", text
301 elif vlan_tag
== True:
302 text
="SR-IOV and external port not vlan-tagged can not be connected when flag 'of_controller_nets_with_same_vlan' is True"
303 #print self.name, "Error", text
305 elif port
["model"]=="PF" or port
["model"]=="VFnotShared":
309 text
="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
310 #print self.name, "Error", text
312 elif port
["model"] == "VF":
315 elif vlan_tag
==False:
316 text
="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
317 #print self.name, "Error", text
320 return -1, 'Only ptp and data networks are supported for openflow'
322 # calculate new flows to be inserted
323 result
, new_flows
= self
._compute
_net
_flows
(nets
)
325 return result
, new_flows
327 #modify database flows format and get the used names
329 for flow
in database_flows
:
332 except FlowBadFormat
as e
:
333 self
.logger
.error("Exception FlowBadFormat: '%s', flow: '%s'",str(e
), str(flow
))
335 used_names
.append(flow
['name'])
337 #insert at database the new flows, change actions to human text
338 for flow
in new_flows
:
339 #1 check if an equal flow is already present
340 index
= self
._check
_flow
_already
_present
(flow
, database_flows
)
342 database_flows
[index
]["not delete"]=True
343 self
.logger
.debug("Skipping already present flow %s", str(flow
))
345 #2 look for a non used name
346 flow_name
=flow
["net_id"]+"."+str(name_index
)
347 while flow_name
in used_names
or flow_name
in of_flows
:
349 flow_name
=flow
["net_id"]+"."+str(name_index
)
350 used_names
.append(flow_name
)
351 flow
['name'] = flow_name
352 #3 insert at openflow
353 result
, content
= self
.OF_connector
.new_flow(flow
)
355 #print self.name, ": Error '%s' at flow insertion" % c, flow
357 #4 insert at database
360 except FlowBadFormat
as e
:
361 #print self.name, ": Error Exception FlowBadFormat '%s'" % str(e), flow
363 self
.db_lock
.acquire()
364 result
, content
= self
.db
.new_row('of_flows', flow
)
365 self
.db_lock
.release()
367 #print self.name, ": Error '%s' at database insertion" % content, flow
370 #delete not needed old flows from openflow and from DDBB,
371 #check that the needed flows at DDBB are present in controller or insert them otherwise
372 for flow
in database_flows
:
373 if "not delete" in flow
:
374 if flow
["name"] not in of_flows
:
375 #not in controller, insert it
376 result
, content
= self
.OF_connector
.new_flow(flow
)
378 #print self.name, ": Error '%s' at flow insertion" % c, flow
382 if flow
["name"] in of_flows
:
383 result
, content
= self
.OF_connector
.del_flow(flow
['name'])
385 self
.logger
.error("cannot delete flow '%s' from OF: %s", flow
['name'], content
)
386 continue #skip deletion from database
387 #delete from database
388 self
.db_lock
.acquire()
389 result
, content
= self
.db
.delete_row_by_key('of_flows', 'id', flow
['id'])
390 self
.db_lock
.release()
392 self
.logger
.error("cannot delete flow '%s' from DB: %s", flow
['name'], content
)
396 def clear_all_flows(self
):
399 self
.OF_connector
.clear_all_flows()
400 #remove from database
401 self
.db_lock
.acquire()
402 self
.db
.delete_row_by_key('of_flows', None, None) #this will delete all lines
403 self
.db_lock
.release()
405 except requests
.exceptions
.RequestException
as e
:
406 #print self.name, ": clear_all_flows Exception:", str(e)
409 flow_fields
=('priority', 'vlan', 'ingress_port', 'actions', 'dst_mac', 'src_mac', 'net_id')
410 def _check_flow_already_present(self
, new_flow
, flow_list
):
411 '''check if the same flow is already present in the flow list
412 The flow is repeated if all the fields, apart from name, are equal
413 Return the index of matching flow, -1 if not match'''
415 for flow
in flow_list
:
417 for f
in self
.flow_fields
:
418 if flow
.get(f
) != new_flow
.get(f
):
426 def _compute_net_flows(self
, nets
):
428 new_broadcast_flows
={}
431 # Check switch_port information is right
432 self
.logger
.debug("_compute_net_flows nets: %s", str(nets
))
434 for port
in net
['ports']:
436 if not self
.test
and str(port
['switch_port']) not in self
.OF_connector
.pp2ofi
:
437 error_text
= "switch port name '%s' is not valid for the openflow controller" % str(port
['switch_port'])
438 #print self.name, ": ERROR " + error_text
439 return -1, error_text
442 net_id
= net_src
["uuid"]
446 if net_src
== net_dst
:
449 elif net_src
['bind_net'] == net_dst
['uuid']:
450 if net_src
.get('bind_type') and net_src
['bind_type'][0:5] == "vlan:":
451 vlan_net_out
= int(net_src
['bind_type'][5:])
453 elif net_dst
['bind_net'] == net_src
['uuid']:
454 if net_dst
.get('bind_type') and net_dst
['bind_type'][0:5] == "vlan:":
455 vlan_net_in
= int(net_dst
['bind_type'][5:])
460 for src_port
in net_src
['ports']:
461 vlan_in
= vlan_net_in
462 if vlan_in
== None and src_port
['vlan'] != None:
463 vlan_in
= src_port
['vlan']
464 elif vlan_in
!= None and src_port
['vlan'] != None:
465 #TODO this is something that we can not do. It requires a double VLAN check
466 #outer VLAN should be src_port['vlan'] and inner VLAN should be vlan_in
470 broadcast_key
= src_port
['uuid'] + "." + str(vlan_in
)
471 if broadcast_key
in new_broadcast_flows
:
472 flow_broadcast
= new_broadcast_flows
[broadcast_key
]
474 flow_broadcast
= {'priority': priority
,
476 'dst_mac': 'ff:ff:ff:ff:ff:ff',
477 "ingress_port": str(src_port
['switch_port']),
480 new_broadcast_flows
[broadcast_key
] = flow_broadcast
481 if vlan_in
is not None:
482 flow_broadcast
['vlan_id'] = str(vlan_in
)
484 for dst_port
in net_dst
['ports']:
485 vlan_out
= vlan_net_out
486 if vlan_out
== None and dst_port
['vlan'] != None:
487 vlan_out
= dst_port
['vlan']
488 elif vlan_out
!= None and dst_port
['vlan'] != None:
489 #TODO this is something that we can not do. It requires a double VLAN set
490 #outer VLAN should be dst_port['vlan'] and inner VLAN should be vlan_out
492 #if src_port == dst_port:
494 if src_port
['switch_port'] == dst_port
['switch_port'] and vlan_in
== vlan_out
:
497 "priority": priority
,
499 "ingress_port": str(src_port
['switch_port']),
502 if vlan_in
is not None:
503 flow
['vlan_id'] = str(vlan_in
)
504 # allow that one port have no mac
505 if dst_port
['mac'] is None or nb_ports
==2: # point to point or nets with 2 elements
506 flow
['priority'] = priority
-5 # less priority
508 flow
['dst_mac'] = str(dst_port
['mac'])
512 flow
['actions'].append( ('vlan',None) )
514 flow
['actions'].append( ('vlan', vlan_out
) )
515 flow
['actions'].append( ('out', str(dst_port
['switch_port'])) )
517 if self
._check
_flow
_already
_present
(flow
, new_flows
) >= 0:
518 self
.logger
.debug("Skipping repeated flow '%s'", str(flow
))
521 new_flows
.append(flow
)
524 if nb_ports
<= 2: # point to multipoint or nets with more than 2 elements
526 out
= (vlan_out
, str(dst_port
['switch_port']))
527 if out
not in flow_broadcast
['actions']:
528 flow_broadcast
['actions'].append( out
)
531 for flow_broadcast
in new_broadcast_flows
.values():
532 if len(flow_broadcast
['actions'])==0:
533 continue #nothing to do, skip
534 flow_broadcast
['actions'].sort()
535 if 'vlan_id' in flow_broadcast
:
536 previous_vlan
= 0 # indicates that a packet contains a vlan, and the vlan
541 for action
in flow_broadcast
['actions']:
542 if action
[0] != previous_vlan
:
543 final_actions
.append( ('vlan', action
[0]) )
544 previous_vlan
= action
[0]
545 if self
.pmp_with_same_vlan
and action_number
:
546 return -1, "Can not interconnect different vlan tags in a network when flag 'of_controller_nets_with_same_vlan' is True."
548 final_actions
.append( ('out', action
[1]) )
549 flow_broadcast
['actions'] = final_actions
551 if self
._check
_flow
_already
_present
(flow_broadcast
, new_flows
) >= 0:
552 self
.logger
.debug("Skipping repeated flow '%s'", str(flow_broadcast
))
555 new_flows
.append(flow_broadcast
)
557 #UNIFY openflow rules with the same input port and vlan and the same output actions
558 #These flows differ at the dst_mac; and they are unified by not filtering by dst_mac
559 #this can happen if there is only two ports. It is converted to a point to point connection
560 flow_dict
={} # use as key vlan_id+ingress_port and as value the list of flows matching these values
561 for flow
in new_flows
:
562 key
= str(flow
.get("vlan_id"))+":"+flow
["ingress_port"]
564 flow_dict
[key
].append(flow
)
566 flow_dict
[key
]=[ flow
]
568 for flow_list
in flow_dict
.values():
570 if len (flow_list
)>=2:
573 if f
['actions'] != flow_list
[0]['actions']:
576 if convert2ptp
: # add only one unified rule without dst_mac
577 self
.logger
.debug("Convert flow rules to NON mac dst_address " + str(flow_list
) )
578 flow_list
[0].pop('dst_mac')
579 flow_list
[0]["priority"] -= 5
580 new_flows2
.append(flow_list
[0])
581 else: # add all the rules
582 new_flows2
+= flow_list