2 # -*- coding: utf-8 -*-
5 # Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U.
6 # This file is part of openvim
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact with: nfvlabs@tid.es
This thread interacts with an OpenFlow controller to create dataplane connections
# Module metadata: authors and date recorded for this file
__author__ = "Pablo Montes, Alfonso Tierno"
__date__ = "17-jul-2015"
# Status values passed to openflow_thread.set_openflow_controller_status(),
# which stores them in the 'status' column of the 'ofcs' table
OFC_STATUS_ACTIVE = 'ACTIVE'
OFC_STATUS_INACTIVE = 'INACTIVE'
OFC_STATUS_ERROR = 'ERROR'
class FlowBadFormat(Exception):
    '''Error raised when a flow dictionary does not have the expected format'''
    pass
def change_of2db(flow):
    '''Change 'flow' dictionary from openflow format to database format
    Basically the change consist of changing 'flow[actions]' from a list of
    double tuple to a string
        from [(A,B),(C,D),..] to "A=B,C=D"
    The dictionary is modified in place; nothing is returned.
    raise FlowBadFormat if 'flow' is not a dictionary with an 'actions' key, or
    if an element of 'actions' cannot be rendered as "key=value" '''
    # isinstance (instead of an exact type comparison) also accepts dict subclasses
    if not isinstance(flow, dict) or "actions" not in flow:
        raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
    try:
        flow['actions'] = ",".join(action[0] + "=" + str(action[1]) for action in flow['actions'])
    except Exception:
        # any malformed element (not indexable, too short, non-text key...) is reported
        # the same way; 'except Exception' lets KeyboardInterrupt/SystemExit propagate
        raise FlowBadFormat("Unexpected format at 'actions'")
def change_db2of(flow):
    '''Change 'flow' dictionary from database format to openflow format
    Basically the change consist of changing 'flow[actions]' from a string to
    a double tuple list
        from "A=B,C=D,..." to [(A,B),(C,D),..]
    Recognized keys are 'vlan' (integer value, or 'none'/'strip' that is
    converted to None) and 'out' (kept as a string). An empty string is
    converted to an empty list.
    The dictionary is modified in place; nothing is returned.
    raise FlowBadFormat '''
    try:
        string_types = basestring  # Python 2: accept both str and unicode
    except NameError:
        string_types = str  # Python 3
    if not isinstance(flow, dict) or "actions" not in flow or not isinstance(flow["actions"], string_types):
        raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")

    actions = []
    # an empty text is what an empty action list looks like in database format
    if flow['actions'].strip():
        for action_item in flow['actions'].split(","):
            action_tuple = action_item.split("=")
            if len(action_tuple) != 2:
                raise FlowBadFormat("Expected key=value format at 'actions'")
            key = action_tuple[0].strip().lower()
            value = action_tuple[1].strip()
            if key == "vlan":
                if value.lower() in ("none", "strip"):
                    actions.append(("vlan", None))
                else:
                    try:
                        actions.append(("vlan", int(value)))
                    except ValueError:
                        raise FlowBadFormat("Expected integer after vlan= at 'actions'")
            elif key == "out":
                actions.append(("out", str(value)))
            else:
                raise FlowBadFormat("Unexpected '%s' at 'actions'" % action_tuple[0])
    flow['actions'] = actions
class openflow_thread(threading.Thread):
    """
    This thread interacts with an openflow controller to create dataplane connections.

    Tasks are queued with insert_task() and consumed one at a time by run().
    Methods that report a result return a tuple (code, content) where a negative
    code means error and content carries the error text.
    """

    # Fields compared by _check_flow_already_present() to decide that two flows are equal.
    # NOTE(review): flows built by _compute_net_flows() store the vlan under 'vlan_id', not
    # 'vlan', so the vlan does not seem to take part in the comparison -- confirm it is intended.
    flow_fields = ('priority', 'vlan', 'ingress_port', 'actions', 'dst_mac', 'src_mac', 'net_id')

    def __init__(self, of_uuid, of_connector, db, of_test, pmp_with_same_vlan=False, logger_name=None,
                 debug=None):
        """
        :param of_uuid: identifier of the openflow controller; used at the default logger name,
            at error texts and as key of the 'ofcs' table
        :param of_connector: object used to talk with the controller (get_of_rules, new_flow,
            del_flow, clear_all_flows, pp2ofi)
        :param db: database object (get_table, update_rows, new_row, delete_row_by_key)
        :param of_test: when True the switch port names are not validated and the controller
            is not contacted by clear_all_flows()
        :param pmp_with_same_vlan: flag 'of_controller_nets_with_same_vlan'
        :param logger_name: name of the logger, "openvim.ofc.<of_uuid>" by default
        :param debug: name of a logging level (e.g. 'DEBUG'); when empty the level is not changed
        """
        threading.Thread.__init__(self)
        self.of_uuid = of_uuid
        self.db = db
        self.pmp_with_same_vlan = pmp_with_same_vlan
        self.test = of_test
        self.OF_connector = of_connector
        self.logger_name = logger_name if logger_name else "openvim.ofc." + of_uuid
        self.logger = logging.getLogger(self.logger_name)
        if debug:
            self.logger.setLevel(getattr(logging, debug))
        self.queueLock = threading.Lock()
        self.taskQueue = Queue.Queue(2000)

    @staticmethod
    def _format_error_msg(error_text, max_length=1024):
        """
        Shorten 'error_text' so that it is shorter than 'max_length', keeping the beginning
        and the end joined by " ... ". Empty values (e.g. None) and short texts are returned
        unchanged.
        """
        if error_text and len(error_text) >= max_length:
            return error_text[:max_length//2-3] + " ... " + error_text[-max_length//2+3:]
        return error_text

    def insert_task(self, task, *aditional):
        """
        Queue a task to be processed by run()
        :param task: task name: 'update-net', 'clear-all' or 'exit'
        :param aditional: task arguments, e.g. the net uuid for 'update-net'
        :return: (1, None) if queued; (-1, error_text) if the queue is still full after 5 seconds
        """
        try:
            # 'with' guarantees that the lock is released also when put() raises Queue.Full;
            # otherwise the lock would remain held and run() would block forever
            with self.queueLock:
                self.taskQueue.put((task,) + aditional, timeout=5)
            return 1, None
        except Queue.Full:
            return -1, "timeout inserting a task over openflow thread " + self.of_uuid

    def run(self):
        """
        Thread main loop: takes tasks from the queue, one each time, sleeping one second when
        the queue is empty, and records the result at database.
        It finishes, returning 0, when the 'exit' task is received.
        """
        import time  # imported here so that run() does not depend on a module-level import

        self.logger.debug("Start openflow thread")
        self.set_openflow_controller_status(OFC_STATUS_ACTIVE)

        while True:
            try:
                with self.queueLock:
                    task = None if self.taskQueue.empty() else self.taskQueue.get()

                if task is None:
                    time.sleep(1)
                    continue

                if task[0] == 'update-net':
                    r, c = self.update_of_flows(task[1])
                    # update database status
                    if r < 0:
                        UPDATE = {'status': 'ERROR', 'last_error': self._format_error_msg(str(c), 255)}
                        self.logger.error("processing task 'update-net' %s: %s", str(task[1]), c)
                        self.set_openflow_controller_status(OFC_STATUS_ERROR,
                                                            "Error updating net {}".format(task[1]))
                    else:
                        UPDATE = {'status': 'ACTIVE', 'last_error': None}
                        self.logger.debug("processing task 'update-net' %s: OK", str(task[1]))
                        self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
                    self.db.update_rows('nets', UPDATE, WHERE={'uuid': task[1]})

                elif task[0] == 'clear-all':
                    r, c = self.clear_all_flows()
                    if r < 0:
                        self.logger.error("processing task 'clear-all': %s", c)
                        self.set_openflow_controller_status(OFC_STATUS_ERROR, "Error deleting all flows")
                    else:
                        self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
                        self.logger.debug("processing task 'clear-all': OK")
                elif task[0] == 'exit':
                    self.logger.debug("exit from openflow_thread")
                    self.terminate()
                    self.set_openflow_controller_status(OFC_STATUS_INACTIVE, "Ofc with thread killed")
                    return 0
                else:
                    self.logger.error("unknown task %s", str(task))
            except openflow_conn.OpenflowconnException as e:
                self.logger.error("OpenflowconnException: " + str(e))
                self.set_openflow_controller_status(OFC_STATUS_ERROR, str(e))
            except Exception as e:
                # thread boundary: log whatever happens and keep on attending tasks
                self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)

    def terminate(self):
        """Called by run() when the 'exit' task is received. Nothing to release at present."""
        pass
        # print self.name, ": exit from openflow_thread"

    def update_of_flows(self, net_id):
        """
        Recompute the flows of a net, and of the nets bound to it, and synchronize them with
        the openflow controller and with the 'of_flows' database table: new flows are inserted,
        flows already present are kept, and the ones not needed any more are deleted.
        :param net_id: uuid of the net
        :return: (0, 'Success') if ok; (negative, error_text) on error
        """
        ports = ()
        select_ = ('type', 'admin_state_up', 'vlan', 'provider', 'bind_net', 'bind_type', 'uuid')
        result, nets = self.db.get_table(FROM='nets', SELECT=select_, WHERE={'uuid': net_id})
        # get all the networks binding to this
        if result > 0:
            bind_id = nets[0]['bind_net'] or net_id
            # get our net and all bind_nets
            result, nets = self.db.get_table(FROM='nets', SELECT=select_,
                                             WHERE_OR={'bind_net': bind_id, 'uuid': bind_id})

        if result < 0:
            return -1, "DB error getting net: " + str(nets)
        # if result == 0 the net has been deleted; only the old flows must be removed
        ifaces_nb = 0
        database_flows = []
        for net in nets:
            net_id = net["uuid"]
            if net['admin_state_up'] == 'false':
                net['ports'] = ()
            else:
                nb_ports, net_ports = self.db.get_table(
                    FROM='ports',
                    SELECT=('switch_port', 'vlan', 'uuid', 'mac', 'type', 'model'),
                    WHERE={'net_id': net_id, 'admin_state_up': 'true', 'status': 'ACTIVE'})
                if nb_ports < 0:
                    # print self.name, ": update_of_flows() ERROR getting ports", ports
                    return -1, "DB error getting ports from net '%s': %s" % (net_id, net_ports)

                # add the binding as an external port
                if net['provider'] and net['provider'][:9] == "openflow:":
                    external_port = {"type": "external", "mac": None}
                    external_port['uuid'] = net_id + ".1"  # fake uuid
                    if net['provider'][-5:] == ":vlan":
                        external_port["vlan"] = net["vlan"]
                        external_port["switch_port"] = net['provider'][9:-5]
                    else:
                        external_port["vlan"] = None
                        external_port["switch_port"] = net['provider'][9:]
                    net_ports = net_ports + (external_port,)
                    nb_ports += 1
                net['ports'] = net_ports
                ifaces_nb += nb_ports

            # Get the name of flows that will be affected by this NET
            result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id': net_id})
            if result < 0:
                error_msg = "DB error getting flows from net '{}': {}".format(net_id, database_net_flows)
                # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
                return -1, error_msg
            database_flows += database_net_flows
        # Get the name of flows where net_id==NULL that means net deleted (At DB foreign key: On delete set null)
        result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id': None})
        if result < 0:
            # Not fatal: go on without these flows. The content is an error text in this case,
            # so it must not be added to the flow list
            self.logger.error("DB error getting flows from net 'null': %s", database_net_flows)
        else:
            database_flows += database_net_flows

        # Get the existing flows at openflow controller
        try:
            of_flows = self.OF_connector.get_of_rules()
            # print self.name, ": update_of_flows() ERROR getting flows from controller", of_flows
        except openflow_conn.OpenflowconnException as e:
            # self.set_openflow_controller_status(OFC_STATUS_ERROR, "OF error {} getting flows".format(str(e)))
            return -1, "OF error {} getting flows".format(str(e))

        # with less than two interfaces there is nothing to validate ('net' is the last net read)
        if ifaces_nb >= 2:
            if net['type'] == 'ptp':
                if ifaces_nb > 2:
                    # print self.name, 'Error, network '+str(net_id)+' has been defined as ptp but it has '+\
                    #                 str(ifaces_nb)+' interfaces.'
                    return -1, "'ptp' type network cannot connect %d interfaces, only 2" % ifaces_nb
            elif net['type'] == 'data':
                if ifaces_nb > 2 and self.pmp_with_same_vlan:
                    # check all ports are VLAN (tagged) or none
                    # NOTE(review): 'ports' is never filled at this method, so this loop does not
                    # run at present; confirm whether the ports read for each net must be checked
                    vlan_tag = None
                    for port in ports:
                        if port["type"] == "external":
                            if port["vlan"] is not None:
                                if port["vlan"] != net["vlan"]:
                                    text = "External port vlan-tag and net vlan-tag must be the same when flag 'of_controller_nets_with_same_vlan' is True"
                                    # print self.name, "Error", text
                                    return -1, text
                                if vlan_tag is None:
                                    vlan_tag = True
                                elif not vlan_tag:
                                    text = "Passthrough and external port vlan-tagged cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
                                    # print self.name, "Error", text
                                    return -1, text
                            else:
                                if vlan_tag is None:
                                    vlan_tag = False
                                elif vlan_tag:
                                    text = "SR-IOV and external port not vlan-tagged cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
                                    # print self.name, "Error", text
                                    return -1, text
                        elif port["model"] == "PF" or port["model"] == "VFnotShared":
                            if vlan_tag is None:
                                vlan_tag = False
                            elif vlan_tag:
                                text = "Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
                                # print self.name, "Error", text
                                return -1, text
                        elif port["model"] == "VF":
                            if vlan_tag is None:
                                vlan_tag = True
                            elif not vlan_tag:
                                text = "Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
                                # print self.name, "Error", text
                                return -1, text
            else:
                return -1, 'Only ptp and data networks are supported for openflow'

        # calculate new flows to be inserted
        result, new_flows = self._compute_net_flows(nets)
        if result < 0:
            return result, new_flows

        # modify database flows format and get the used names
        used_names = []
        for flow in database_flows:
            try:
                change_db2of(flow)
            except FlowBadFormat as e:
                self.logger.error("Exception FlowBadFormat: '%s', flow: '%s'", str(e), str(flow))
                continue
            used_names.append(flow['name'])
        name_index = 0
        # insert at database the new flows, change actions to human text
        for flow in new_flows:
            # 1 check if an equal flow is already present
            index = self._check_flow_already_present(flow, database_flows)
            if index >= 0:
                database_flows[index]["not delete"] = True
                self.logger.debug("Skipping already present flow %s", str(flow))
                continue
            # 2 look for a non used name
            flow_name = flow["net_id"] + "." + str(name_index)
            while flow_name in used_names or flow_name in of_flows:
                name_index += 1
                flow_name = flow["net_id"] + "." + str(name_index)
            used_names.append(flow_name)
            flow['name'] = flow_name
            # 3 insert at openflow
            try:
                self.OF_connector.new_flow(flow)
            except openflow_conn.OpenflowconnException as e:
                return -1, "Error creating new flow {}".format(str(e))

            # 4 insert at database
            try:
                change_of2db(flow)
            except FlowBadFormat as e:
                # print self.name, ": Error Exception FlowBadFormat '%s'" % str(e), flow
                return -1, str(e)
            result, content = self.db.new_row('of_flows', flow)
            if result < 0:
                # print self.name, ": Error '%s' at database insertion" % content, flow
                return -1, content

        # delete not needed old flows from openflow and from DDBB,
        # check that the needed flows at DDBB are present in controller or insert them otherwise
        for flow in database_flows:
            if "not delete" in flow:
                if flow["name"] not in of_flows:
                    # not in controller, insert it
                    try:
                        self.OF_connector.new_flow(flow)
                    except openflow_conn.OpenflowconnException as e:
                        return -1, "Error creating new flow {}".format(str(e))
                continue
            # Delete flow
            if flow["name"] in of_flows:
                try:
                    self.OF_connector.del_flow(flow['name'])
                except openflow_conn.OpenflowconnException as e:
                    self.logger.error("cannot delete flow '%s' from OF: %s", flow['name'], str(e))
                    # skip deletion from database
                    continue

            # delete from database
            result, content = self.db.delete_row_by_key('of_flows', 'id', flow['id'])
            if result < 0:
                self.logger.error("cannot delete flow '%s' from DB: %s", flow['name'], content)

        return 0, 'Success'

    def clear_all_flows(self):
        """
        Delete all the flows from the openflow controller (skipped in test mode) and from the
        'of_flows' database table
        :return: (0, None) if ok; (-1, error_text) if the controller reports an error
        """
        try:
            if not self.test:
                self.OF_connector.clear_all_flows()

            # remove from database
            self.db.delete_row_by_key('of_flows', None, None)  # this will delete all lines
            return 0, None
        except openflow_conn.OpenflowconnException as e:
            # return the text: logger.error() returns None, so its result is not usable as content
            error_text = "Error deleting all flows {}".format(str(e))
            self.logger.error(error_text)
            return -1, error_text

    def _check_flow_already_present(self, new_flow, flow_list):
        '''check if the same flow is already present in the flow list
        The flow is repeated if all the fields of 'flow_fields' are equal (a missing
        field is compared as None); other keys, like the name, are ignored
        Return the index of matching flow, -1 if not match'''
        for index, flow in enumerate(flow_list):
            if all(flow.get(f) == new_flow.get(f) for f in self.flow_fields):
                return index
        return -1

    def _compute_net_flows(self, nets):
        """
        Compute the openflow rules needed to interconnect the ports of the nets
        :param nets: list of net dictionaries with the keys 'uuid', 'bind_net', 'bind_type' and
            'ports'; each port is a dictionary with 'uuid', 'switch_port', 'vlan' and 'mac'
        :return: (0, list_of_flows) if ok; (-1, error_text) on error. Each flow is a dictionary
            with 'priority', 'net_id', 'ingress_port', 'actions' (list of ('vlan', tag) and
            ('out', port) tuples) and optionally 'vlan_id' and 'dst_mac'
        """
        new_flows = []
        new_broadcast_flows = {}
        nb_ports = 0

        # Check switch_port information is right
        self.logger.debug("_compute_net_flows nets: %s", str(nets))
        for net in nets:
            for port in net['ports']:
                nb_ports += 1
                if not self.test and str(port['switch_port']) not in self.OF_connector.pp2ofi:
                    error_text = "switch port name '%s' is not valid for the openflow controller" % str(port['switch_port'])
                    # print self.name, ": ERROR " + error_text
                    return -1, error_text

        for net_src in nets:
            net_id = net_src["uuid"]
            for net_dst in nets:
                vlan_net_in = None
                vlan_net_out = None
                if net_src == net_dst:
                    # intra net rules
                    priority = 1000
                elif net_src['bind_net'] == net_dst['uuid']:
                    if net_src.get('bind_type') and net_src['bind_type'][0:5] == "vlan:":
                        vlan_net_out = int(net_src['bind_type'][5:])
                    priority = 1100
                elif net_dst['bind_net'] == net_src['uuid']:
                    if net_dst.get('bind_type') and net_dst['bind_type'][0:5] == "vlan:":
                        vlan_net_in = int(net_dst['bind_type'][5:])
                    priority = 1100
                else:
                    # nets not binding
                    continue
                for src_port in net_src['ports']:
                    vlan_in = vlan_net_in
                    if vlan_in is None and src_port['vlan'] is not None:
                        vlan_in = src_port['vlan']
                    elif vlan_in is not None and src_port['vlan'] is not None:
                        # TODO this is something that we cannot do. It requires a double VLAN check
                        # outer VLAN should be src_port['vlan'] and inner VLAN should be vlan_in
                        continue

                    # BROADCAST: one rule per source port and incoming vlan
                    broadcast_key = src_port['uuid'] + "." + str(vlan_in)
                    if broadcast_key in new_broadcast_flows:
                        flow_broadcast = new_broadcast_flows[broadcast_key]
                    else:
                        flow_broadcast = {
                            'priority': priority,
                            'net_id': net_id,
                            'dst_mac': 'ff:ff:ff:ff:ff:ff',
                            "ingress_port": str(src_port['switch_port']),
                            'actions': [],
                        }
                        new_broadcast_flows[broadcast_key] = flow_broadcast
                        if vlan_in is not None:
                            flow_broadcast['vlan_id'] = str(vlan_in)

                    for dst_port in net_dst['ports']:
                        vlan_out = vlan_net_out
                        if vlan_out is None and dst_port['vlan'] is not None:
                            vlan_out = dst_port['vlan']
                        elif vlan_out is not None and dst_port['vlan'] is not None:
                            # TODO this is something that we cannot do. It requires a double VLAN set
                            # outer VLAN should be dst_port['vlan'] and inner VLAN should be vlan_out
                            continue
                        # if src_port == dst_port:
                        #    continue
                        if src_port['switch_port'] == dst_port['switch_port'] and vlan_in == vlan_out:
                            continue
                        flow = {
                            "priority": priority,
                            'net_id': net_id,
                            "ingress_port": str(src_port['switch_port']),
                            'actions': [],
                        }
                        if vlan_in is not None:
                            flow['vlan_id'] = str(vlan_in)
                        # allow that one port have no mac
                        if dst_port['mac'] is None or nb_ports == 2:  # point to point or nets with 2 elements
                            flow['priority'] = priority - 5  # less priority
                        else:
                            flow['dst_mac'] = str(dst_port['mac'])

                        if vlan_out is None:
                            if vlan_in is not None:
                                flow['actions'].append(('vlan', None))
                        else:
                            flow['actions'].append(('vlan', vlan_out))
                        flow['actions'].append(('out', str(dst_port['switch_port'])))

                        if self._check_flow_already_present(flow, new_flows) >= 0:
                            self.logger.debug("Skipping repeated flow '%s'", str(flow))
                            continue

                        new_flows.append(flow)

                        # BROADCAST: only for nets with more than 2 elements
                        if nb_ports <= 2:
                            continue
                        out = (vlan_out, str(dst_port['switch_port']))
                        if out not in flow_broadcast['actions']:
                            flow_broadcast['actions'].append(out)

        # BROADCAST: convert the collected (vlan, port) pairs into the final action list
        for flow_broadcast in new_broadcast_flows.values():
            if len(flow_broadcast['actions']) == 0:
                continue  # nothing to do, skip
            flow_broadcast['actions'].sort()
            if 'vlan_id' in flow_broadcast:
                previous_vlan = 0  # indicates that a packet contains a vlan, and the vlan
            else:
                previous_vlan = None
            final_actions = []
            action_number = 0
            for action in flow_broadcast['actions']:
                if action[0] != previous_vlan:
                    final_actions.append(('vlan', action[0]))
                    previous_vlan = action[0]
                    if self.pmp_with_same_vlan and action_number:
                        return -1, "Cannot interconnect different vlan tags in a network when flag 'of_controller_nets_with_same_vlan' is True."
                    action_number += 1
                final_actions.append(('out', action[1]))
            flow_broadcast['actions'] = final_actions

            if self._check_flow_already_present(flow_broadcast, new_flows) >= 0:
                self.logger.debug("Skipping repeated flow '%s'", str(flow_broadcast))
                continue

            new_flows.append(flow_broadcast)

        # UNIFY openflow rules with the same input port and vlan and the same output actions
        # These flows differ at the dst_mac; and they are unified by not filtering by dst_mac
        # this can happen if there is only two ports. It is converted to a point to point connection
        flow_dict = {}  # use as key vlan_id+ingress_port and as value the list of flows matching these values
        for flow in new_flows:
            key = str(flow.get("vlan_id")) + ":" + flow["ingress_port"]
            flow_dict.setdefault(key, []).append(flow)
        new_flows2 = []
        for flow_list in flow_dict.values():
            convert2ptp = len(flow_list) >= 2 and all(f['actions'] == flow_list[0]['actions'] for f in flow_list)
            if convert2ptp:  # add only one unified rule without dst_mac
                self.logger.debug("Convert flow rules to NON mac dst_address " + str(flow_list))
                # the first rule may already lack 'dst_mac' (destination port without mac)
                flow_list[0].pop('dst_mac', None)
                flow_list[0]["priority"] -= 5
                new_flows2.append(flow_list[0])
            else:  # add all the rules
                new_flows2 += flow_list
        return 0, new_flows2

    def set_openflow_controller_status(self, status, error_text=None):
        """
        Set openflow controller last operation status in DB
        :param status: ofc status ('ACTIVE','INACTIVE','ERROR')
        :param error_text: error text, stored shortened to less than 255 characters
        :return: True if the status has been stored, or if the controller is the "Default" one,
            which is not written to the database; False on database error
        """
        if self.of_uuid == "Default":
            return True

        ofc = {
            'status': status,
            'last_error': self._format_error_msg(error_text, 255),
        }
        result, _ = self.db.update_rows('ofcs', ofc, WHERE={'uuid': self.of_uuid}, log=False)
        return result >= 0