2 # -*- coding: utf-8 -*-
5 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
6 # This file is part of openvim
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact with: nfvlabs@tid.es
'''
This thread interacts with an openflow controller to create dataplane connections
'''
# Module metadata
__author__ = "Pablo Montes, Alfonso Tierno"
__date__ = "17-jul-2015"

# Status values passed to openflow_thread.set_openflow_controller_status(),
# which stores them in the 'status' column of the 'ofcs' table.
OFC_STATUS_ACTIVE = 'ACTIVE'
OFC_STATUS_INACTIVE = 'INACTIVE'
OFC_STATUS_ERROR = 'ERROR'
class FlowBadFormat(Exception):
    '''Raised when a flow dictionary does not have the expected format.

    Raised by change_of2db() and change_db2of() when the flow is not a
    dictionary, lacks the 'actions' key, or its 'actions' cannot be converted.
    '''
def change_of2db(flow):
    '''Change 'flow' dictionary from openflow format to database format.

    The change is done in place and consists of converting flow['actions']
    from a list of two-element tuples to a string:
        from [(A,B),(C,D),..] to "A=B,C=D"

    :param flow: dictionary (or dict subclass) containing the key 'actions'
    :return: None; 'flow' is modified in place
    :raise FlowBadFormat: if 'flow' is not a dictionary with 'actions', or
        'actions' is not a sequence of (text, value) pairs. On error 'flow'
        is left unmodified.
    '''
    if not isinstance(flow, dict) or "actions" not in flow:
        raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
    try:
        action_str_list = [action[0] + "=" + str(action[1]) for action in flow['actions']]
    except Exception:
        # 'Exception' instead of a bare 'except' so that KeyboardInterrupt and
        # SystemExit are not converted into a format error
        raise FlowBadFormat("Unexpected format at 'actions'")
    flow['actions'] = ",".join(action_str_list)
def change_db2of(flow):
    '''Change 'flow' dictionary from database format to openflow format.

    The change is done in place and consists of converting flow['actions']
    from a string to a list of two-element tuples:
        from "A=B,C=D,..." to [(A,B),(C,D),..]
    Accepted keys (case insensitive, surrounding blanks ignored):
        vlan: value 'none' or 'strip' becomes None, otherwise it must be an integer
        out:  value is kept as text

    :param flow: dictionary (or dict subclass) whose 'actions' is a string
    :return: None; 'flow' is modified in place
    :raise FlowBadFormat: on wrong input type, an item that is not key=value,
        a non integer vlan or an unknown key. On error 'flow' is left unmodified.
    '''
    if not isinstance(flow, dict) or "actions" not in flow or not isinstance(flow["actions"], str):
        raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
    actions = []
    for action_item in flow['actions'].split(","):
        action_tuple = action_item.split("=")
        if len(action_tuple) != 2:
            raise FlowBadFormat("Expected key=value format at 'actions'")
        key = action_tuple[0].strip().lower()
        if key == "vlan":
            if action_tuple[1].strip().lower() in ("none", "strip"):
                actions.append(("vlan", None))
            else:
                try:
                    actions.append(("vlan", int(action_tuple[1])))
                except ValueError:
                    raise FlowBadFormat("Expected integer after vlan= at 'actions'")
        elif key == "out":
            actions.append(("out", str(action_tuple[1])))
        else:
            raise FlowBadFormat("Unexpected '%s' at 'actions'" % action_tuple[0])
    flow['actions'] = actions
class openflow_thread(threading.Thread):
    """
    This thread interacts with an openflow controller to create dataplane connections

    Work is requested with insert_task() and processed one task at a time by
    run(). The tasks handled by run() are:
        ('update-net', net_uuid): recompute and install the flows of a network
        ('clear-all',):           delete all flows from controller and database
        ('exit',):                finish the thread
    """

    # flow fields compared by _check_flow_already_present ('vlan_id' is
    # compared there too, in a type independent way)
    flow_fields = ('priority', 'vlan', 'ingress_port', 'actions', 'dst_mac', 'src_mac', 'net_id')

    def __init__(self, of_uuid, of_connector, db, db_lock, of_test, pmp_with_same_vlan=False, debug='ERROR'):
        """
        :param of_uuid: identifier of the openflow controller; the value "Default"
            disables the status update done by set_openflow_controller_status()
        :param of_connector: controller connector. This class reads its 'name',
            'dpid' and 'pp2ofi' attributes and calls get_of_rules(), new_flow(),
            del_flow() and clear_all_flows()
        :param db: database connector. This class calls get_table(), update_rows(),
            new_row() and delete_row_by_key()
        :param db_lock: lock taken around every 'db' call; it is used in 'with'
            statements, so it must be a context manager such as threading.Lock
        :param of_test: when True, switch port names are not validated against the
            connector and clear_all_flows() does not contact the controller
        :param pmp_with_same_vlan: when True, networks with more than two
            interfaces are rejected if they mix different vlan tags
        :param debug: name of the logging level, e.g. 'ERROR' or 'DEBUG'
        """
        threading.Thread.__init__(self)
        self.of_uuid = of_uuid
        self.db = db
        self.pmp_with_same_vlan = pmp_with_same_vlan
        self.name = "openflow"
        self.test = of_test
        self.db_lock = db_lock
        self.OF_connector = of_connector
        self.logger = logging.getLogger('vim.OF-' + of_uuid)
        self.logger.setLevel(getattr(logging, debug))
        self.logger.name = of_connector.name + " " + self.OF_connector.dpid
        # kept for backward compatibility; Queue.Queue does its own locking
        self.queueLock = threading.Lock()
        self.taskQueue = Queue.Queue(2000)

    def insert_task(self, task, *aditional):
        """
        Queue a task to be processed by the thread
        :param task: task name, e.g. 'update-net', 'clear-all' or 'exit'
        :param aditional: task arguments, e.g. the net uuid for 'update-net'
        :return: (1, None) if queued, (-1, error_text) if the queue remains full
            for 5 seconds
        """
        # queueLock is deliberately not held here: blocking on a full queue while
        # holding it would prevent the consumer from draining the queue, and an
        # exception would leave the lock taken forever.
        try:
            self.taskQueue.put((task,) + aditional, timeout=5)
            return 1, None
        except Queue.Full:
            return -1, "timeout inserting a task over openflow thread " + self.name

    def run(self):
        """
        Thread main loop: take tasks from the queue and process them until an
        'exit' task is received. Exceptions are logged and the loop continues.
        :return: 0 when finished by an 'exit' task
        """
        self.logger.debug("Start openflow thread")
        self.set_openflow_controller_status(OFC_STATUS_ACTIVE)

        while True:
            try:
                try:
                    # wait up to one second for a task, then poll again
                    task = self.taskQueue.get(timeout=1)
                except Queue.Empty:
                    continue

                if task[0] == 'update-net':
                    r, c = self.update_of_flows(task[1])
                    # update database status
                    if r < 0:
                        update = {'status': 'ERROR', 'last_error': str(c)}
                        self.logger.error("processing task 'update-net' %s: %s", str(task[1]), c)
                        self.set_openflow_controller_status(OFC_STATUS_ERROR,
                                                            "Error updating net {}".format(task[1]))
                    else:
                        update = {'status': 'ACTIVE', 'last_error': None}
                        self.logger.debug("processing task 'update-net' %s: OK", str(task[1]))
                        self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
                    with self.db_lock:
                        self.db.update_rows('nets', update, WHERE={'uuid': task[1]})

                elif task[0] == 'clear-all':
                    r, c = self.clear_all_flows()
                    if r < 0:
                        self.logger.error("processing task 'clear-all': %s", c)
                        self.set_openflow_controller_status(OFC_STATUS_ERROR, "Error deleting all flows")
                    else:
                        self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
                        self.logger.debug("processing task 'clear-all': OK")
                elif task[0] == 'exit':
                    self.logger.debug("exit from openflow_thread")
                    self.terminate()
                    self.set_openflow_controller_status(OFC_STATUS_INACTIVE, "Ofc with thread killed")
                    return 0
                else:
                    self.logger.error("unknown task %s", str(task))
            except openflow_conn.OpenflowconnException as e:
                self.logger.error("OpenflowconnException: " + str(e))
                self.set_openflow_controller_status(OFC_STATUS_ERROR, str(e))
            except Exception as e:
                self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)

    def terminate(self):
        """Hook called by run() when an 'exit' task is received. Does nothing."""
        pass

    def _check_same_vlan_ports(self, nets):
        """
        Check that all the ports are vlan tagged or that none is. Used only when
        flag 'of_controller_nets_with_same_vlan' is True.
        External ports count as tagged when they have a vlan, which must be the
        vlan of their net; 'VF' ports count as tagged; 'PF' and 'VFnotShared'
        ports count as not tagged.
        :param nets: list of net dictionaries with 'vlan' and 'ports'
        :return: None if the check passes, otherwise the error text
        """
        # NOTE(review): the ports are taken from net['ports'] of every net; confirm
        # this is the intended set of ports for the check.
        vlan_tag = None  # None: unknown yet, True: tagged ports, False: untagged ports
        for net in nets:
            for port in net['ports']:
                if port["type"] == "external":
                    if port["vlan"] is not None:
                        if port["vlan"] != net["vlan"]:
                            return "External port vlan-tag and net vlan-tag must be the same when flag 'of_controller_nets_with_same_vlan' is True"
                        if vlan_tag is None:
                            vlan_tag = True
                        elif vlan_tag is False:
                            return "Passthrough and external port vlan-tagged cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
                    else:
                        if vlan_tag is None:
                            vlan_tag = False
                        elif vlan_tag is True:
                            return "SR-IOV and external port not vlan-tagged cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
                elif port["model"] == "PF" or port["model"] == "VFnotShared":
                    if vlan_tag is None:
                        vlan_tag = False
                    elif vlan_tag is True:
                        return "Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
                elif port["model"] == "VF":
                    if vlan_tag is None:
                        vlan_tag = True
                    elif vlan_tag is False:
                        return "Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
        return None

    def update_of_flows(self, net_id):
        """
        Recompute the flows of a network, and of the networks bound to it, and
        synchronize them with the openflow controller and the 'of_flows' table
        :param net_id: uuid of the network
        :return: (0, 'Success') or (negative, error_text)
        """
        select_ = ('type', 'admin_state_up', 'vlan', 'provider', 'bind_net', 'bind_type', 'uuid')
        with self.db_lock:
            result, nets = self.db.get_table(FROM='nets', SELECT=select_, WHERE={'uuid': net_id})
            # get all the networks binding to this
            if result > 0:
                bind_id = nets[0]['bind_net'] or net_id
                # get our net and all bind_nets
                result, nets = self.db.get_table(FROM='nets', SELECT=select_,
                                                 WHERE_OR={'bind_net': bind_id, 'uuid': bind_id})
        if result < 0:
            return -1, "DB error getting net: " + str(nets)
        # when result == 0 the net has been deleted: 'nets' is empty and only the
        # orphan flows (net_id NULL) are processed below

        ifaces_nb = 0
        database_flows = []
        net_type = None  # type of the last net processed
        for net in nets:
            net_id = net["uuid"]
            net_type = net['type']
            if net['admin_state_up'] == 'false':
                net['ports'] = ()
            else:
                with self.db_lock:
                    nb_ports, net_ports = self.db.get_table(
                        FROM='ports',
                        SELECT=('switch_port', 'vlan', 'uuid', 'mac', 'type', 'model'),
                        WHERE={'net_id': net_id, 'admin_state_up': 'true', 'status': 'ACTIVE'})
                if nb_ports < 0:
                    return -1, "DB error getting ports from net '%s': %s" % (net_id, net_ports)

                # add the binding as an external port
                if net['provider'] and net['provider'][:9] == "openflow:":
                    external_port = {"type": "external", "mac": None}
                    external_port['uuid'] = net_id + ".1"  # fake uuid
                    if net['provider'][-5:] == ":vlan":
                        external_port["vlan"] = net["vlan"]
                        external_port["switch_port"] = net['provider'][9:-5]
                    else:
                        external_port["vlan"] = None
                        external_port["switch_port"] = net['provider'][9:]
                    # tuple() so that it also works if the database returns a list
                    net_ports = tuple(net_ports) + (external_port,)
                    nb_ports += 1
                net['ports'] = net_ports
                ifaces_nb += nb_ports

            # Get the name of flows that will be affected by this NET
            with self.db_lock:
                result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id': net_id})
            if result < 0:
                return -1, "DB error getting flows from net '{}': {}".format(net_id, database_net_flows)
            database_flows += database_net_flows

        # Get the name of flows where net_id==NULL that means net deleted (At DB foreign key: On delete set null)
        with self.db_lock:
            result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id': None})
        if result < 0:
            return -1, "DB error getting flows from net 'null': {}".format(database_net_flows)
        database_flows += database_net_flows

        # Get the existing flows at openflow controller
        try:
            of_flows = self.OF_connector.get_of_rules()
        except openflow_conn.OpenflowconnException as e:
            return -1, "OF error {} getting flows".format(str(e))

        if ifaces_nb < 2:
            pass  # nothing to connect; old flows are removed below
        elif net_type == 'ptp':
            if ifaces_nb > 2:
                return -1, "'ptp' type network cannot connect %d interfaces, only 2" % ifaces_nb
        elif net_type == 'data':
            if ifaces_nb > 2 and self.pmp_with_same_vlan:
                # check all ports are VLAN (tagged) or none
                text = self._check_same_vlan_ports(nets)
                if text:
                    return -1, text
        else:
            return -1, 'Only ptp and data networks are supported for openflow'

        # calculate new flows to be inserted
        result, new_flows = self._compute_net_flows(nets)
        if result < 0:
            return result, new_flows

        # modify database flows format and get the used names
        used_names = []
        for flow in database_flows:
            try:
                change_db2of(flow)
            except FlowBadFormat as e:
                self.logger.error("Exception FlowBadFormat: '%s', flow: '%s'", str(e), str(flow))
                continue
            used_names.append(flow['name'])

        name_index = 0
        # insert at database the new flows, change actions to human text
        for flow in new_flows:
            # 1 check if an equal flow is already present
            index = self._check_flow_already_present(flow, database_flows)
            if index >= 0:
                database_flows[index]["not delete"] = True
                self.logger.debug("Skipping already present flow %s", str(flow))
                continue
            # 2 look for a non used name
            flow_name = flow["net_id"] + "." + str(name_index)
            while flow_name in used_names or flow_name in of_flows:
                name_index += 1
                flow_name = flow["net_id"] + "." + str(name_index)
            used_names.append(flow_name)
            flow['name'] = flow_name
            # 3 insert at openflow
            try:
                self.OF_connector.new_flow(flow)
            except openflow_conn.OpenflowconnException as e:
                return -1, "Error creating new flow {}".format(str(e))

            # 4 insert at database
            try:
                change_of2db(flow)
            except FlowBadFormat as e:
                return -1, str(e)
            with self.db_lock:
                result, content = self.db.new_row('of_flows', flow)
            if result < 0:
                return -1, content

        # delete not needed old flows from openflow and from DDBB,
        # check that the needed flows at DDBB are present in controller or insert them otherwise
        for flow in database_flows:
            if "not delete" in flow:
                if flow["name"] not in of_flows:
                    # not in controller, insert it
                    try:
                        self.OF_connector.new_flow(flow)
                    except openflow_conn.OpenflowconnException as e:
                        return -1, "Error creating new flow {}".format(str(e))
                continue
            # Delete flow
            if flow["name"] in of_flows:
                try:
                    self.OF_connector.del_flow(flow['name'])
                except openflow_conn.OpenflowconnException as e:
                    self.logger.error("cannot delete flow '%s' from OF: %s", flow['name'], str(e))
                    # skip deletion from database
                    continue

            # delete from database
            with self.db_lock:
                result, content = self.db.delete_row_by_key('of_flows', 'id', flow['id'])
            if result < 0:
                self.logger.error("cannot delete flow '%s' from DB: %s", flow['name'], content)

        return 0, 'Success'

    def clear_all_flows(self):
        """
        Delete all the flows from the openflow controller (skipped in test mode)
        and all the rows of the 'of_flows' table
        :return: (0, None) or (-1, error_text) on a controller error
        """
        try:
            if not self.test:
                self.OF_connector.clear_all_flows()

            # remove from database
            with self.db_lock:
                self.db.delete_row_by_key('of_flows', None, None)  # this will delete all lines
            return 0, None
        except openflow_conn.OpenflowconnException as e:
            error_text = "Error deleting all flows {}".format(str(e))
            self.logger.error(error_text)
            return -1, error_text

    @staticmethod
    def _vlan_as_text(vlan):
        """Return the vlan as text, or None, so that 10 and '10' compare equal"""
        return None if vlan is None else str(vlan)

    def _check_flow_already_present(self, new_flow, flow_list):
        '''check if the same flow is already present in the flow list
        The flow is repeated if all the fields, apart from name, are equal.
        The fields compared are those at 'flow_fields' plus 'vlan_id', which is
        the key under which this class stores the vlan of a flow; 'vlan_id' is
        compared as text because new flows hold it as a string.
        Return the index of matching flow, -1 if not match'''
        new_vlan = self._vlan_as_text(new_flow.get('vlan_id'))
        for index, flow in enumerate(flow_list):
            if self._vlan_as_text(flow.get('vlan_id')) != new_vlan:
                continue
            if all(flow.get(f) == new_flow.get(f) for f in self.flow_fields):
                return index
        return -1

    def _compute_net_flows(self, nets):
        """
        Compute the flows needed to interconnect the ports of the nets
        :param nets: list of net dictionaries with 'uuid', 'bind_net', 'bind_type'
            and 'ports'; each port has 'switch_port', 'vlan', 'uuid' and 'mac'
        :return: (0, list of flows) or (-1, error_text). Each flow is a dictionary
            with 'priority', 'net_id', 'ingress_port', 'actions' as a list of
            tuples, and optionally 'vlan_id' and 'dst_mac'
        """
        new_flows = []
        new_broadcast_flows = {}
        nb_ports = 0

        # Check switch_port information is right
        self.logger.debug("_compute_net_flows nets: %s", str(nets))
        for net in nets:
            for port in net['ports']:
                nb_ports += 1
                if not self.test and str(port['switch_port']) not in self.OF_connector.pp2ofi:
                    error_text = "switch port name '%s' is not valid for the openflow controller" % \
                                 str(port['switch_port'])
                    return -1, error_text

        for net_src in nets:
            net_id = net_src["uuid"]
            for net_dst in nets:
                vlan_net_in = None
                vlan_net_out = None
                if net_src == net_dst:
                    # intra net rules
                    priority = 1000
                elif net_src['bind_net'] == net_dst['uuid']:
                    if net_src.get('bind_type') and net_src['bind_type'][0:5] == "vlan:":
                        vlan_net_out = int(net_src['bind_type'][5:])
                    priority = 1100
                elif net_dst['bind_net'] == net_src['uuid']:
                    if net_dst.get('bind_type') and net_dst['bind_type'][0:5] == "vlan:":
                        vlan_net_in = int(net_dst['bind_type'][5:])
                    priority = 1100
                else:
                    # nets not binding
                    continue
                for src_port in net_src['ports']:
                    vlan_in = vlan_net_in
                    if vlan_in is None and src_port['vlan'] is not None:
                        vlan_in = src_port['vlan']
                    elif vlan_in is not None and src_port['vlan'] is not None:
                        # TODO this is something that we cannot do. It requires a double VLAN check
                        # outer VLAN should be src_port['vlan'] and inner VLAN should be vlan_in
                        continue

                    # BROADCAST: one flow per source port and input vlan
                    broadcast_key = src_port['uuid'] + "." + str(vlan_in)
                    if broadcast_key in new_broadcast_flows:
                        flow_broadcast = new_broadcast_flows[broadcast_key]
                    else:
                        flow_broadcast = {
                            'priority': priority,
                            'net_id': net_id,
                            'dst_mac': 'ff:ff:ff:ff:ff:ff',
                            "ingress_port": str(src_port['switch_port']),
                            'actions': [],
                        }
                        new_broadcast_flows[broadcast_key] = flow_broadcast
                        if vlan_in is not None:
                            flow_broadcast['vlan_id'] = str(vlan_in)

                    for dst_port in net_dst['ports']:
                        vlan_out = vlan_net_out
                        if vlan_out is None and dst_port['vlan'] is not None:
                            vlan_out = dst_port['vlan']
                        elif vlan_out is not None and dst_port['vlan'] is not None:
                            # TODO this is something that we cannot do. It requires a double VLAN set
                            # outer VLAN should be dst_port['vlan'] and inner VLAN should be vlan_out
                            continue
                        if src_port['switch_port'] == dst_port['switch_port'] and vlan_in == vlan_out:
                            continue
                        flow = {
                            "priority": priority,
                            'net_id': net_id,
                            "ingress_port": str(src_port['switch_port']),
                            'actions': [],
                        }
                        if vlan_in is not None:
                            flow['vlan_id'] = str(vlan_in)
                        # allow that one port have no mac
                        if dst_port['mac'] is None or nb_ports == 2:  # point to point or nets with 2 elements
                            flow['priority'] = priority - 5  # less priority
                        else:
                            flow['dst_mac'] = str(dst_port['mac'])

                        if vlan_out is None:
                            if vlan_in is not None:
                                flow['actions'].append(('vlan', None))
                        else:
                            flow['actions'].append(('vlan', vlan_out))
                        flow['actions'].append(('out', str(dst_port['switch_port'])))

                        if self._check_flow_already_present(flow, new_flows) >= 0:
                            self.logger.debug("Skipping repeated flow '%s'", str(flow))
                            continue

                        new_flows.append(flow)

                        # BROADCAST: only for nets with more than 2 elements
                        if nb_ports <= 2:
                            continue
                        out = (vlan_out, str(dst_port['switch_port']))
                        if out not in flow_broadcast['actions']:
                            flow_broadcast['actions'].append(out)

        # BROADCAST: convert the (vlan, port) pairs into final actions
        for flow_broadcast in new_broadcast_flows.values():
            if len(flow_broadcast['actions']) == 0:
                continue  # nothing to do, skip
            # group the outputs by vlan, untagged (None) first. The key avoids
            # comparing None with integers, which Python 3 does not allow
            flow_broadcast['actions'].sort(
                key=lambda action: (action[0] is not None, action[0] if action[0] is not None else 0, action[1]))
            if 'vlan_id' in flow_broadcast:
                previous_vlan = 0  # indicates that a packet contains a vlan, and the vlan
            else:
                previous_vlan = None
            final_actions = []
            action_number = 0  # number of vlan changes inserted so far
            for action in flow_broadcast['actions']:
                if action[0] != previous_vlan:
                    final_actions.append(('vlan', action[0]))
                    previous_vlan = action[0]
                    if self.pmp_with_same_vlan and action_number:
                        return -1, "Cannot interconnect different vlan tags in a network when flag 'of_controller_nets_with_same_vlan' is True."
                    action_number += 1
                final_actions.append(('out', action[1]))
            flow_broadcast['actions'] = final_actions

            if self._check_flow_already_present(flow_broadcast, new_flows) >= 0:
                self.logger.debug("Skipping repeated flow '%s'", str(flow_broadcast))
                continue

            new_flows.append(flow_broadcast)

        # UNIFY openflow rules with the same input port and vlan and the same output actions
        # These flows differ at the dst_mac; and they are unified by not filtering by dst_mac
        # this can happen if there is only two ports. It is converted to a point to point connection
        flow_dict = {}  # use as key vlan_id+ingress_port and as value the list of flows matching these values
        for flow in new_flows:
            key = str(flow.get("vlan_id")) + ":" + flow["ingress_port"]
            flow_dict.setdefault(key, []).append(flow)
        new_flows2 = []
        for flow_list in flow_dict.values():
            first_actions = flow_list[0]['actions']
            convert2ptp = len(flow_list) >= 2 and all(f['actions'] == first_actions for f in flow_list)
            if convert2ptp:  # add only one unified rule without dst_mac
                self.logger.debug("Convert flow rules to NON mac dst_address " + str(flow_list))
                flow_list[0].pop('dst_mac', None)  # the first flow may have no dst_mac
                flow_list[0]["priority"] -= 5
                new_flows2.append(flow_list[0])
            else:  # add all the rules
                new_flows2 += flow_list
        return 0, new_flows2

    def set_openflow_controller_status(self, status, error_text=None):
        """
        Set openflow controller last operation status in DB
        Nothing is stored when the controller uuid is "Default"
        :param status: ofc status ('ACTIVE','INACTIVE','ERROR')
        :param error_text: error text
        :return: True if nothing had to be stored or the database update
            succeeded, False if the database update failed
        """
        if self.of_uuid == "Default":
            return True

        ofc = {'status': status, 'last_error': error_text}
        with self.db_lock:
            result, content = self.db.update_rows('ofcs', ofc, WHERE={'uuid': self.of_uuid}, log=False)
        return result >= 0