2 # -*- coding: utf-8 -*-
5 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
6 # This file is part of openvim
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact with: nfvlabs@tid.es
26 This thread interacts with a openflow controller to create dataplane connections
29 __author__
="Pablo Montes, Alfonso Tierno"
30 __date__
="17-jul-2015"
41 OFC_STATUS_ACTIVE
= 'ACTIVE'
42 OFC_STATUS_INACTIVE
= 'INACTIVE'
43 OFC_STATUS_ERROR
= 'ERROR'
45 class FlowBadFormat(Exception):
46 '''raise when a bad format of flow is found'''
48 def change_of2db(flow
):
49 '''Change 'flow' dictionary from openflow format to database format
50 Basically the change consist of changing 'flow[actions] from a list of
51 double tuple to a string
52 from [(A,B),(C,D),..] to "A=B,C=D" '''
54 if type(flow
)!=dict or "actions" not in flow
:
55 raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
57 for action
in flow
['actions']:
58 action_str_list
.append( action
[0] + "=" + str(action
[1]) )
59 flow
['actions'] = ",".join(action_str_list
)
61 raise FlowBadFormat("Unexpected format at 'actions'")
63 def change_db2of(flow
):
64 '''Change 'flow' dictionary from database format to openflow format
65 Basically the change consist of changing 'flow[actions]' from a string to
67 from "A=B,C=D,..." to [(A,B),(C,D),..]
68 raise FlowBadFormat '''
70 if type(flow
)!=dict or "actions" not in flow
or type(flow
["actions"])!=str:
71 raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
72 action_list
= flow
['actions'].split(",")
73 for action_item
in action_list
:
74 action_tuple
= action_item
.split("=")
75 if len(action_tuple
) != 2:
76 raise FlowBadFormat("Expected key=value format at 'actions'")
77 if action_tuple
[0].strip().lower()=="vlan":
78 if action_tuple
[1].strip().lower() in ("none", "strip"):
79 actions
.append( ("vlan",None) )
82 actions
.append( ("vlan", int(action_tuple
[1])) )
84 raise FlowBadFormat("Expected integer after vlan= at 'actions'")
85 elif action_tuple
[0].strip().lower()=="out":
86 actions
.append( ("out", str(action_tuple
[1])) )
88 raise FlowBadFormat("Unexpected '%s' at 'actions'"%action_tuple
[0])
89 flow
['actions'] = actions
92 class openflow_thread(threading
.Thread
):
94 This thread interacts with a openflow controller to create dataplane connections
96 def __init__(self
, of_uuid
, of_connector
, db
, db_lock
, of_test
, pmp_with_same_vlan
=False, debug
='ERROR'):
97 threading
.Thread
.__init
__(self
)
98 self
.of_uuid
= of_uuid
100 self
.pmp_with_same_vlan
= pmp_with_same_vlan
101 self
.name
= "openflow"
103 self
.db_lock
= db_lock
104 self
.OF_connector
= of_connector
105 self
.logger
= logging
.getLogger('vim.OF-' + of_uuid
)
106 self
.logger
.setLevel(getattr(logging
, debug
))
107 self
.logger
.name
= of_connector
.name
+ " " + self
.OF_connector
.dpid
108 self
.queueLock
= threading
.Lock()
109 self
.taskQueue
= Queue
.Queue(2000)
111 def insert_task(self
, task
, *aditional
):
113 self
.queueLock
.acquire()
114 task
= self
.taskQueue
.put( (task
,) + aditional
, timeout
=5)
115 self
.queueLock
.release()
118 return -1, "timeout inserting a task over openflow thread " + self
.name
121 self
.logger
.debug("Start openflow thread")
122 self
.set_openflow_controller_status(OFC_STATUS_ACTIVE
)
126 self
.queueLock
.acquire()
127 if not self
.taskQueue
.empty():
128 task
= self
.taskQueue
.get()
131 self
.queueLock
.release()
137 if task
[0] == 'update-net':
138 r
,c
= self
.update_of_flows(task
[1])
139 # update database status
141 UPDATE
={'status':'ERROR', 'last_error': str(c
)}
142 self
.logger
.error("processing task 'update-net' %s: %s", str(task
[1]), c
)
143 self
.set_openflow_controller_status(OFC_STATUS_ERROR
, "Error updating net {}".format(task
[1]))
145 UPDATE
={'status':'ACTIVE', 'last_error': None}
146 self
.logger
.debug("processing task 'update-net' %s: OK", str(task
[1]))
147 self
.set_openflow_controller_status(OFC_STATUS_ACTIVE
)
148 self
.db_lock
.acquire()
149 self
.db
.update_rows('nets', UPDATE
, WHERE
={'uuid': task
[1]})
150 self
.db_lock
.release()
152 elif task
[0] == 'clear-all':
153 r
,c
= self
.clear_all_flows()
155 self
.logger
.error("processing task 'clear-all': %s", c
)
156 self
.set_openflow_controller_status(OFC_STATUS_ERROR
, "Error deleting all flows")
158 self
.set_openflow_controller_status(OFC_STATUS_ACTIVE
)
159 self
.logger
.debug("processing task 'clear-all': OK")
160 elif task
[0] == 'exit':
161 self
.logger
.debug("exit from openflow_thread")
163 self
.set_openflow_controller_status(OFC_STATUS_INACTIVE
, "Ofc with thread killed")
166 self
.logger
.error("unknown task %s", str(task
))
167 except openflow_conn
.OpenflowconnException
as e
:
168 self
.set_openflow_controller_status(OFC_STATUS_ERROR
, str(e
))
172 # print self.name, ": exit from openflow_thread"
174 def update_of_flows(self
, net_id
):
176 self
.db_lock
.acquire()
177 select_
= ('type','admin_state_up', 'vlan', 'provider', 'bind_net','bind_type','uuid')
178 result
, nets
= self
.db
.get_table(FROM
='nets', SELECT
=select_
, WHERE
={'uuid':net_id
} )
179 #get all the networks binding to this
181 if nets
[0]['bind_net']:
182 bind_id
= nets
[0]['bind_net']
185 #get our net and all bind_nets
186 result
, nets
= self
.db
.get_table(FROM
='nets', SELECT
=select_
,
187 WHERE_OR
={'bind_net':bind_id
, 'uuid':bind_id
} )
189 self
.db_lock
.release()
191 return -1, "DB error getting net: " + nets
193 #net has been deleted
198 if net
['admin_state_up'] == 'false':
201 self
.db_lock
.acquire()
202 nb_ports
, net_ports
= self
.db
.get_table(
204 SELECT
=('switch_port','vlan','uuid','mac','type','model'),
205 WHERE
={'net_id':net_id
, 'admin_state_up':'true', 'status':'ACTIVE'} )
206 self
.db_lock
.release()
209 #print self.name, ": update_of_flows() ERROR getting ports", ports
210 return -1, "DB error getting ports from net '%s': %s" % (net_id
, net_ports
)
212 #add the binding as an external port
213 if net
['provider'] and net
['provider'][:9]=="openflow:":
214 external_port
={"type":"external","mac":None}
215 external_port
['uuid'] = net_id
+ ".1" #fake uuid
216 if net
['provider'][-5:]==":vlan":
217 external_port
["vlan"] = net
["vlan"]
218 external_port
["switch_port"] = net
['provider'][9:-5]
220 external_port
["vlan"] = None
221 external_port
["switch_port"] = net
['provider'][9:]
222 net_ports
= net_ports
+ (external_port
,)
224 net
['ports'] = net_ports
225 ifaces_nb
+= nb_ports
227 # Get the name of flows that will be affected by this NET
228 self
.db_lock
.acquire()
229 result
, database_net_flows
= self
.db
.get_table(FROM
='of_flows', WHERE
={'net_id':net_id
})
230 self
.db_lock
.release()
232 error_msg
= "DB error getting flows from net '{}': {}".format(net_id
, database_net_flows
)
233 # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
235 database_flows
+= database_net_flows
236 # Get the name of flows where net_id==NULL that means net deleted (At DB foreign key: On delete set null)
237 self
.db_lock
.acquire()
238 result
, database_net_flows
= self
.db
.get_table(FROM
='of_flows', WHERE
={'net_id':None})
239 self
.db_lock
.release()
241 error_msg
= "DB error getting flows from net 'null': {}".format(database_net_flows
)
242 # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
244 database_flows
+= database_net_flows
246 # Get the existing flows at openflow controller
248 of_flows
= self
.OF_connector
.get_of_rules()
249 # print self.name, ": update_of_flows() ERROR getting flows from controller", of_flows
250 except openflow_conn
.OpenflowconnException
as e
:
251 # self.set_openflow_controller_status(OFC_STATUS_ERROR, "OF error {} getting flows".format(str(e)))
252 return -1, "OF error {} getting flows".format(str(e
))
256 elif net
['type'] == 'ptp':
258 #print self.name, 'Error, network '+str(net_id)+' has been defined as ptp but it has '+\
259 # str(ifaces_nb)+' interfaces.'
260 return -1, "'ptp' type network cannot connect %d interfaces, only 2" % ifaces_nb
261 elif net
['type'] == 'data':
262 if ifaces_nb
> 2 and self
.pmp_with_same_vlan
:
263 # check all ports are VLAN (tagged) or none
266 if port
["type"]=="external":
267 if port
["vlan"] != None:
268 if port
["vlan"]!=net
["vlan"]:
269 text
="External port vlan-tag and net vlan-tag must be the same when flag 'of_controller_nets_with_same_vlan' is True"
270 #print self.name, "Error", text
274 elif vlan_tag
==False:
275 text
="Passthrough and external port vlan-tagged cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
276 #print self.name, "Error", text
281 elif vlan_tag
== True:
282 text
="SR-IOV and external port not vlan-tagged cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
283 #print self.name, "Error", text
285 elif port
["model"]=="PF" or port
["model"]=="VFnotShared":
289 text
="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
290 #print self.name, "Error", text
292 elif port
["model"] == "VF":
295 elif vlan_tag
==False:
296 text
="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
297 #print self.name, "Error", text
300 return -1, 'Only ptp and data networks are supported for openflow'
302 # calculate new flows to be inserted
303 result
, new_flows
= self
._compute
_net
_flows
(nets
)
305 return result
, new_flows
307 #modify database flows format and get the used names
309 for flow
in database_flows
:
312 except FlowBadFormat
as e
:
313 self
.logger
.error("Exception FlowBadFormat: '%s', flow: '%s'",str(e
), str(flow
))
315 used_names
.append(flow
['name'])
317 # insert at database the new flows, change actions to human text
318 for flow
in new_flows
:
319 # 1 check if an equal flow is already present
320 index
= self
._check
_flow
_already
_present
(flow
, database_flows
)
322 database_flows
[index
]["not delete"]=True
323 self
.logger
.debug("Skipping already present flow %s", str(flow
))
325 # 2 look for a non used name
326 flow_name
=flow
["net_id"]+"."+str(name_index
)
327 while flow_name
in used_names
or flow_name
in of_flows
:
329 flow_name
=flow
["net_id"]+"."+str(name_index
)
330 used_names
.append(flow_name
)
331 flow
['name'] = flow_name
332 # 3 insert at openflow
335 self
.OF_connector
.new_flow(flow
)
336 except openflow_conn
.OpenflowconnException
as e
:
337 return -1, "Error creating new flow {}".format(str(e
))
339 # 4 insert at database
342 except FlowBadFormat
as e
:
343 # print self.name, ": Error Exception FlowBadFormat '%s'" % str(e), flow
345 self
.db_lock
.acquire()
346 result
, content
= self
.db
.new_row('of_flows', flow
)
347 self
.db_lock
.release()
349 # print self.name, ": Error '%s' at database insertion" % content, flow
352 #delete not needed old flows from openflow and from DDBB,
353 #check that the needed flows at DDBB are present in controller or insert them otherwise
354 for flow
in database_flows
:
355 if "not delete" in flow
:
356 if flow
["name"] not in of_flows
:
357 # not in controller, insert it
359 self
.OF_connector
.new_flow(flow
)
360 except openflow_conn
.OpenflowconnException
as e
:
361 return -1, "Error creating new flow {}".format(str(e
))
365 if flow
["name"] in of_flows
:
367 self
.OF_connector
.del_flow(flow
['name'])
368 except openflow_conn
.OpenflowconnException
as e
:
369 self
.logger
.error("cannot delete flow '%s' from OF: %s", flow
['name'], str(e
))
370 # skip deletion from database
373 # delete from database
374 self
.db_lock
.acquire()
375 result
, content
= self
.db
.delete_row_by_key('of_flows', 'id', flow
['id'])
376 self
.db_lock
.release()
378 self
.logger
.error("cannot delete flow '%s' from DB: %s", flow
['name'], content
)
382 def clear_all_flows(self
):
385 self
.OF_connector
.clear_all_flows()
387 # remove from database
388 self
.db_lock
.acquire()
389 self
.db
.delete_row_by_key('of_flows', None, None) #this will delete all lines
390 self
.db_lock
.release()
392 except openflow_conn
.OpenflowconnException
as e
:
393 return -1, self
.logger
.error("Error deleting all flows {}", str(e
))
395 flow_fields
= ('priority', 'vlan', 'ingress_port', 'actions', 'dst_mac', 'src_mac', 'net_id')
397 def _check_flow_already_present(self
, new_flow
, flow_list
):
398 '''check if the same flow is already present in the flow list
399 The flow is repeated if all the fields, apart from name, are equal
400 Return the index of matching flow, -1 if not match'''
402 for flow
in flow_list
:
404 for f
in self
.flow_fields
:
405 if flow
.get(f
) != new_flow
.get(f
):
413 def _compute_net_flows(self
, nets
):
415 new_broadcast_flows
={}
418 # Check switch_port information is right
419 self
.logger
.debug("_compute_net_flows nets: %s", str(nets
))
421 for port
in net
['ports']:
423 if not self
.test
and str(port
['switch_port']) not in self
.OF_connector
.pp2ofi
:
424 error_text
= "switch port name '%s' is not valid for the openflow controller" % str(port
['switch_port'])
425 # print self.name, ": ERROR " + error_text
426 return -1, error_text
429 net_id
= net_src
["uuid"]
433 if net_src
== net_dst
:
436 elif net_src
['bind_net'] == net_dst
['uuid']:
437 if net_src
.get('bind_type') and net_src
['bind_type'][0:5] == "vlan:":
438 vlan_net_out
= int(net_src
['bind_type'][5:])
440 elif net_dst
['bind_net'] == net_src
['uuid']:
441 if net_dst
.get('bind_type') and net_dst
['bind_type'][0:5] == "vlan:":
442 vlan_net_in
= int(net_dst
['bind_type'][5:])
447 for src_port
in net_src
['ports']:
448 vlan_in
= vlan_net_in
449 if vlan_in
== None and src_port
['vlan'] != None:
450 vlan_in
= src_port
['vlan']
451 elif vlan_in
!= None and src_port
['vlan'] != None:
452 #TODO this is something that we cannot do. It requires a double VLAN check
453 #outer VLAN should be src_port['vlan'] and inner VLAN should be vlan_in
457 broadcast_key
= src_port
['uuid'] + "." + str(vlan_in
)
458 if broadcast_key
in new_broadcast_flows
:
459 flow_broadcast
= new_broadcast_flows
[broadcast_key
]
461 flow_broadcast
= {'priority': priority
,
463 'dst_mac': 'ff:ff:ff:ff:ff:ff',
464 "ingress_port": str(src_port
['switch_port']),
467 new_broadcast_flows
[broadcast_key
] = flow_broadcast
468 if vlan_in
is not None:
469 flow_broadcast
['vlan_id'] = str(vlan_in
)
471 for dst_port
in net_dst
['ports']:
472 vlan_out
= vlan_net_out
473 if vlan_out
== None and dst_port
['vlan'] != None:
474 vlan_out
= dst_port
['vlan']
475 elif vlan_out
!= None and dst_port
['vlan'] != None:
476 #TODO this is something that we cannot do. It requires a double VLAN set
477 #outer VLAN should be dst_port['vlan'] and inner VLAN should be vlan_out
479 #if src_port == dst_port:
481 if src_port
['switch_port'] == dst_port
['switch_port'] and vlan_in
== vlan_out
:
484 "priority": priority
,
486 "ingress_port": str(src_port
['switch_port']),
489 if vlan_in
is not None:
490 flow
['vlan_id'] = str(vlan_in
)
491 # allow that one port have no mac
492 if dst_port
['mac'] is None or nb_ports
==2: # point to point or nets with 2 elements
493 flow
['priority'] = priority
-5 # less priority
495 flow
['dst_mac'] = str(dst_port
['mac'])
499 flow
['actions'].append( ('vlan',None) )
501 flow
['actions'].append( ('vlan', vlan_out
) )
502 flow
['actions'].append( ('out', str(dst_port
['switch_port'])) )
504 if self
._check
_flow
_already
_present
(flow
, new_flows
) >= 0:
505 self
.logger
.debug("Skipping repeated flow '%s'", str(flow
))
508 new_flows
.append(flow
)
511 if nb_ports
<= 2: # point to multipoint or nets with more than 2 elements
513 out
= (vlan_out
, str(dst_port
['switch_port']))
514 if out
not in flow_broadcast
['actions']:
515 flow_broadcast
['actions'].append( out
)
518 for flow_broadcast
in new_broadcast_flows
.values():
519 if len(flow_broadcast
['actions'])==0:
520 continue #nothing to do, skip
521 flow_broadcast
['actions'].sort()
522 if 'vlan_id' in flow_broadcast
:
523 previous_vlan
= 0 # indicates that a packet contains a vlan, and the vlan
528 for action
in flow_broadcast
['actions']:
529 if action
[0] != previous_vlan
:
530 final_actions
.append( ('vlan', action
[0]) )
531 previous_vlan
= action
[0]
532 if self
.pmp_with_same_vlan
and action_number
:
533 return -1, "Cannot interconnect different vlan tags in a network when flag 'of_controller_nets_with_same_vlan' is True."
535 final_actions
.append( ('out', action
[1]) )
536 flow_broadcast
['actions'] = final_actions
538 if self
._check
_flow
_already
_present
(flow_broadcast
, new_flows
) >= 0:
539 self
.logger
.debug("Skipping repeated flow '%s'", str(flow_broadcast
))
542 new_flows
.append(flow_broadcast
)
544 #UNIFY openflow rules with the same input port and vlan and the same output actions
545 #These flows differ at the dst_mac; and they are unified by not filtering by dst_mac
546 #this can happen if there is only two ports. It is converted to a point to point connection
547 flow_dict
={} # use as key vlan_id+ingress_port and as value the list of flows matching these values
548 for flow
in new_flows
:
549 key
= str(flow
.get("vlan_id"))+":"+flow
["ingress_port"]
551 flow_dict
[key
].append(flow
)
553 flow_dict
[key
]=[ flow
]
555 for flow_list
in flow_dict
.values():
557 if len (flow_list
)>=2:
560 if f
['actions'] != flow_list
[0]['actions']:
563 if convert2ptp
: # add only one unified rule without dst_mac
564 self
.logger
.debug("Convert flow rules to NON mac dst_address " + str(flow_list
) )
565 flow_list
[0].pop('dst_mac')
566 flow_list
[0]["priority"] -= 5
567 new_flows2
.append(flow_list
[0])
568 else: # add all the rules
569 new_flows2
+= flow_list
572 def set_openflow_controller_status(self
, status
, error_text
=None):
574 Set openflow controller last operation status in DB
575 :param status: ofc status ('ACTIVE','INACTIVE','ERROR')
576 :param error_text: error text
579 if self
.of_uuid
== "Default":
583 ofc
['status'] = status
584 ofc
['last_error'] = error_text
585 self
.db_lock
.acquire()
586 result
, content
= self
.db
.update_rows('ofcs', ofc
, WHERE
={'uuid': self
.of_uuid
}, log
=False)
587 self
.db_lock
.release()