2 # -*- coding: utf-8 -*-
5 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
6 # This file is part of openvim
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact with: nfvlabs@tid.es
26 This thread interacts with a openflow controller to create dataplane connections
29 __author__
="Pablo Montes, Alfonso Tierno"
30 __date__
="17-jul-2015"
41 OFC_STATUS_ACTIVE
= 'ACTIVE'
42 OFC_STATUS_INACTIVE
= 'INACTIVE'
43 OFC_STATUS_ERROR
= 'ERROR'
45 class FlowBadFormat(Exception):
46 '''raise when a bad format of flow is found'''
48 def change_of2db(flow
):
49 '''Change 'flow' dictionary from openflow format to database format
50 Basically the change consist of changing 'flow[actions] from a list of
51 double tuple to a string
52 from [(A,B),(C,D),..] to "A=B,C=D" '''
54 if type(flow
)!=dict or "actions" not in flow
:
55 raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
57 for action
in flow
['actions']:
58 action_str_list
.append( action
[0] + "=" + str(action
[1]) )
59 flow
['actions'] = ",".join(action_str_list
)
61 raise FlowBadFormat("Unexpected format at 'actions'")
63 def change_db2of(flow
):
64 '''Change 'flow' dictionary from database format to openflow format
65 Basically the change consist of changing 'flow[actions]' from a string to
67 from "A=B,C=D,..." to [(A,B),(C,D),..]
68 raise FlowBadFormat '''
70 if type(flow
)!=dict or "actions" not in flow
or type(flow
["actions"])!=str:
71 raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
72 action_list
= flow
['actions'].split(",")
73 for action_item
in action_list
:
74 action_tuple
= action_item
.split("=")
75 if len(action_tuple
) != 2:
76 raise FlowBadFormat("Expected key=value format at 'actions'")
77 if action_tuple
[0].strip().lower()=="vlan":
78 if action_tuple
[1].strip().lower() in ("none", "strip"):
79 actions
.append( ("vlan",None) )
82 actions
.append( ("vlan", int(action_tuple
[1])) )
84 raise FlowBadFormat("Expected integer after vlan= at 'actions'")
85 elif action_tuple
[0].strip().lower()=="out":
86 actions
.append( ("out", str(action_tuple
[1])) )
88 raise FlowBadFormat("Unexpected '%s' at 'actions'"%action_tuple
[0])
89 flow
['actions'] = actions
92 class openflow_thread(threading
.Thread
):
94 This thread interacts with a openflow controller to create dataplane connections
96 def __init__(self
, of_uuid
, of_connector
, db
, db_lock
, of_test
, pmp_with_same_vlan
=False, logger_name
=None,
98 threading
.Thread
.__init
__(self
)
99 self
.of_uuid
= of_uuid
101 self
.pmp_with_same_vlan
= pmp_with_same_vlan
103 self
.db_lock
= db_lock
104 self
.OF_connector
= of_connector
106 self
.logger_name
= logger_name
108 self
.logger_name
= "openvim.ofc." + of_uuid
109 self
.logger
= logging
.getLogger(self
.logger_name
)
111 self
.logger
.setLevel(getattr(logging
, debug
))
112 self
.queueLock
= threading
.Lock()
113 self
.taskQueue
= Queue
.Queue(2000)
116 def _format_error_msg(error_text
, max_length
=1024):
117 if error_text
and len(error_text
) >= max_length
:
118 return error_text
[:max_length
//2-3] + " ... " + error_text
[-max_length
//2+3:]
121 def insert_task(self
, task
, *aditional
):
123 self
.queueLock
.acquire()
124 task
= self
.taskQueue
.put( (task
,) + aditional
, timeout
=5)
125 self
.queueLock
.release()
128 return -1, "timeout inserting a task over openflow thread " + self
.of_uuid
131 self
.logger
.debug("Start openflow thread")
132 self
.set_openflow_controller_status(OFC_STATUS_ACTIVE
)
136 self
.queueLock
.acquire()
137 if not self
.taskQueue
.empty():
138 task
= self
.taskQueue
.get()
141 self
.queueLock
.release()
147 if task
[0] == 'update-net':
148 r
, c
= self
.update_of_flows(task
[1])
149 # update database status
151 UPDATE
={'status':'ERROR', 'last_error': self
._format
_error
_msg
(str(c
), 255)}
152 self
.logger
.error("processing task 'update-net' %s: %s", str(task
[1]), c
)
153 self
.set_openflow_controller_status(OFC_STATUS_ERROR
, "Error updating net {}".format(task
[1]))
155 UPDATE
={'status':'ACTIVE', 'last_error': None}
156 self
.logger
.debug("processing task 'update-net' %s: OK", str(task
[1]))
157 self
.set_openflow_controller_status(OFC_STATUS_ACTIVE
)
158 self
.db_lock
.acquire()
159 self
.db
.update_rows('nets', UPDATE
, WHERE
={'uuid': task
[1]})
160 self
.db_lock
.release()
162 elif task
[0] == 'clear-all':
163 r
,c
= self
.clear_all_flows()
165 self
.logger
.error("processing task 'clear-all': %s", c
)
166 self
.set_openflow_controller_status(OFC_STATUS_ERROR
, "Error deleting all flows")
168 self
.set_openflow_controller_status(OFC_STATUS_ACTIVE
)
169 self
.logger
.debug("processing task 'clear-all': OK")
170 elif task
[0] == 'exit':
171 self
.logger
.debug("exit from openflow_thread")
173 self
.set_openflow_controller_status(OFC_STATUS_INACTIVE
, "Ofc with thread killed")
176 self
.logger
.error("unknown task %s", str(task
))
177 except openflow_conn
.OpenflowconnException
as e
:
178 self
.logger
.error("OpenflowconnException: " + str(e
))
179 self
.set_openflow_controller_status(OFC_STATUS_ERROR
, str(e
))
180 except Exception as e
:
181 self
.logger
.critical("Unexpected exception at run: " + str(e
), exc_info
=True)
185 # print self.name, ": exit from openflow_thread"
187 def update_of_flows(self
, net_id
):
189 self
.db_lock
.acquire()
190 select_
= ('type','admin_state_up', 'vlan', 'provider', 'bind_net','bind_type','uuid')
191 result
, nets
= self
.db
.get_table(FROM
='nets', SELECT
=select_
, WHERE
={'uuid':net_id
} )
192 #get all the networks binding to this
194 if nets
[0]['bind_net']:
195 bind_id
= nets
[0]['bind_net']
198 #get our net and all bind_nets
199 result
, nets
= self
.db
.get_table(FROM
='nets', SELECT
=select_
,
200 WHERE_OR
={'bind_net':bind_id
, 'uuid':bind_id
} )
202 self
.db_lock
.release()
204 return -1, "DB error getting net: " + nets
206 #net has been deleted
211 if net
['admin_state_up'] == 'false':
214 self
.db_lock
.acquire()
215 nb_ports
, net_ports
= self
.db
.get_table(
217 SELECT
=('switch_port','vlan','uuid','mac','type','model'),
218 WHERE
={'net_id':net_id
, 'admin_state_up':'true', 'status':'ACTIVE'} )
219 self
.db_lock
.release()
222 #print self.name, ": update_of_flows() ERROR getting ports", ports
223 return -1, "DB error getting ports from net '%s': %s" % (net_id
, net_ports
)
225 #add the binding as an external port
226 if net
['provider'] and net
['provider'][:9]=="openflow:":
227 external_port
={"type":"external","mac":None}
228 external_port
['uuid'] = net_id
+ ".1" #fake uuid
229 if net
['provider'][-5:]==":vlan":
230 external_port
["vlan"] = net
["vlan"]
231 external_port
["switch_port"] = net
['provider'][9:-5]
233 external_port
["vlan"] = None
234 external_port
["switch_port"] = net
['provider'][9:]
235 net_ports
= net_ports
+ (external_port
,)
237 net
['ports'] = net_ports
238 ifaces_nb
+= nb_ports
240 # Get the name of flows that will be affected by this NET
241 self
.db_lock
.acquire()
242 result
, database_net_flows
= self
.db
.get_table(FROM
='of_flows', WHERE
={'net_id':net_id
})
243 self
.db_lock
.release()
245 error_msg
= "DB error getting flows from net '{}': {}".format(net_id
, database_net_flows
)
246 # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
248 database_flows
+= database_net_flows
249 # Get the name of flows where net_id==NULL that means net deleted (At DB foreign key: On delete set null)
250 self
.db_lock
.acquire()
251 result
, database_net_flows
= self
.db
.get_table(FROM
='of_flows', WHERE
={'net_id':None})
252 self
.db_lock
.release()
254 error_msg
= "DB error getting flows from net 'null': {}".format(database_net_flows
)
255 # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
257 database_flows
+= database_net_flows
259 # Get the existing flows at openflow controller
261 of_flows
= self
.OF_connector
.get_of_rules()
262 # print self.name, ": update_of_flows() ERROR getting flows from controller", of_flows
263 except openflow_conn
.OpenflowconnException
as e
:
264 # self.set_openflow_controller_status(OFC_STATUS_ERROR, "OF error {} getting flows".format(str(e)))
265 return -1, "OF error {} getting flows".format(str(e
))
269 elif net
['type'] == 'ptp':
271 #print self.name, 'Error, network '+str(net_id)+' has been defined as ptp but it has '+\
272 # str(ifaces_nb)+' interfaces.'
273 return -1, "'ptp' type network cannot connect %d interfaces, only 2" % ifaces_nb
274 elif net
['type'] == 'data':
275 if ifaces_nb
> 2 and self
.pmp_with_same_vlan
:
276 # check all ports are VLAN (tagged) or none
279 if port
["type"]=="external":
280 if port
["vlan"] != None:
281 if port
["vlan"]!=net
["vlan"]:
282 text
="External port vlan-tag and net vlan-tag must be the same when flag 'of_controller_nets_with_same_vlan' is True"
283 #print self.name, "Error", text
287 elif vlan_tag
==False:
288 text
="Passthrough and external port vlan-tagged cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
289 #print self.name, "Error", text
294 elif vlan_tag
== True:
295 text
="SR-IOV and external port not vlan-tagged cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
296 #print self.name, "Error", text
298 elif port
["model"]=="PF" or port
["model"]=="VFnotShared":
302 text
="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
303 #print self.name, "Error", text
305 elif port
["model"] == "VF":
308 elif vlan_tag
==False:
309 text
="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
310 #print self.name, "Error", text
313 return -1, 'Only ptp and data networks are supported for openflow'
315 # calculate new flows to be inserted
316 result
, new_flows
= self
._compute
_net
_flows
(nets
)
318 return result
, new_flows
320 #modify database flows format and get the used names
322 for flow
in database_flows
:
325 except FlowBadFormat
as e
:
326 self
.logger
.error("Exception FlowBadFormat: '%s', flow: '%s'",str(e
), str(flow
))
328 used_names
.append(flow
['name'])
330 # insert at database the new flows, change actions to human text
331 for flow
in new_flows
:
332 # 1 check if an equal flow is already present
333 index
= self
._check
_flow
_already
_present
(flow
, database_flows
)
335 database_flows
[index
]["not delete"]=True
336 self
.logger
.debug("Skipping already present flow %s", str(flow
))
338 # 2 look for a non used name
339 flow_name
=flow
["net_id"]+"."+str(name_index
)
340 while flow_name
in used_names
or flow_name
in of_flows
:
342 flow_name
=flow
["net_id"]+"."+str(name_index
)
343 used_names
.append(flow_name
)
344 flow
['name'] = flow_name
345 # 3 insert at openflow
348 self
.OF_connector
.new_flow(flow
)
349 except openflow_conn
.OpenflowconnException
as e
:
350 return -1, "Error creating new flow {}".format(str(e
))
352 # 4 insert at database
355 except FlowBadFormat
as e
:
356 # print self.name, ": Error Exception FlowBadFormat '%s'" % str(e), flow
358 self
.db_lock
.acquire()
359 result
, content
= self
.db
.new_row('of_flows', flow
)
360 self
.db_lock
.release()
362 # print self.name, ": Error '%s' at database insertion" % content, flow
365 #delete not needed old flows from openflow and from DDBB,
366 #check that the needed flows at DDBB are present in controller or insert them otherwise
367 for flow
in database_flows
:
368 if "not delete" in flow
:
369 if flow
["name"] not in of_flows
:
370 # not in controller, insert it
372 self
.OF_connector
.new_flow(flow
)
373 except openflow_conn
.OpenflowconnException
as e
:
374 return -1, "Error creating new flow {}".format(str(e
))
378 if flow
["name"] in of_flows
:
380 self
.OF_connector
.del_flow(flow
['name'])
381 except openflow_conn
.OpenflowconnException
as e
:
382 self
.logger
.error("cannot delete flow '%s' from OF: %s", flow
['name'], str(e
))
383 # skip deletion from database
386 # delete from database
387 self
.db_lock
.acquire()
388 result
, content
= self
.db
.delete_row_by_key('of_flows', 'id', flow
['id'])
389 self
.db_lock
.release()
391 self
.logger
.error("cannot delete flow '%s' from DB: %s", flow
['name'], content
)
395 def clear_all_flows(self
):
398 self
.OF_connector
.clear_all_flows()
400 # remove from database
401 self
.db_lock
.acquire()
402 self
.db
.delete_row_by_key('of_flows', None, None) #this will delete all lines
403 self
.db_lock
.release()
405 except openflow_conn
.OpenflowconnException
as e
:
406 return -1, self
.logger
.error("Error deleting all flows {}", str(e
))
408 flow_fields
= ('priority', 'vlan', 'ingress_port', 'actions', 'dst_mac', 'src_mac', 'net_id')
410 def _check_flow_already_present(self
, new_flow
, flow_list
):
411 '''check if the same flow is already present in the flow list
412 The flow is repeated if all the fields, apart from name, are equal
413 Return the index of matching flow, -1 if not match'''
415 for flow
in flow_list
:
417 for f
in self
.flow_fields
:
418 if flow
.get(f
) != new_flow
.get(f
):
426 def _compute_net_flows(self
, nets
):
428 new_broadcast_flows
={}
431 # Check switch_port information is right
432 self
.logger
.debug("_compute_net_flows nets: %s", str(nets
))
434 for port
in net
['ports']:
436 if not self
.test
and str(port
['switch_port']) not in self
.OF_connector
.pp2ofi
:
437 error_text
= "switch port name '%s' is not valid for the openflow controller" % str(port
['switch_port'])
438 # print self.name, ": ERROR " + error_text
439 return -1, error_text
442 net_id
= net_src
["uuid"]
446 if net_src
== net_dst
:
449 elif net_src
['bind_net'] == net_dst
['uuid']:
450 if net_src
.get('bind_type') and net_src
['bind_type'][0:5] == "vlan:":
451 vlan_net_out
= int(net_src
['bind_type'][5:])
453 elif net_dst
['bind_net'] == net_src
['uuid']:
454 if net_dst
.get('bind_type') and net_dst
['bind_type'][0:5] == "vlan:":
455 vlan_net_in
= int(net_dst
['bind_type'][5:])
460 for src_port
in net_src
['ports']:
461 vlan_in
= vlan_net_in
462 if vlan_in
== None and src_port
['vlan'] != None:
463 vlan_in
= src_port
['vlan']
464 elif vlan_in
!= None and src_port
['vlan'] != None:
465 #TODO this is something that we cannot do. It requires a double VLAN check
466 #outer VLAN should be src_port['vlan'] and inner VLAN should be vlan_in
470 broadcast_key
= src_port
['uuid'] + "." + str(vlan_in
)
471 if broadcast_key
in new_broadcast_flows
:
472 flow_broadcast
= new_broadcast_flows
[broadcast_key
]
474 flow_broadcast
= {'priority': priority
,
476 'dst_mac': 'ff:ff:ff:ff:ff:ff',
477 "ingress_port": str(src_port
['switch_port']),
480 new_broadcast_flows
[broadcast_key
] = flow_broadcast
481 if vlan_in
is not None:
482 flow_broadcast
['vlan_id'] = str(vlan_in
)
484 for dst_port
in net_dst
['ports']:
485 vlan_out
= vlan_net_out
486 if vlan_out
== None and dst_port
['vlan'] != None:
487 vlan_out
= dst_port
['vlan']
488 elif vlan_out
!= None and dst_port
['vlan'] != None:
489 #TODO this is something that we cannot do. It requires a double VLAN set
490 #outer VLAN should be dst_port['vlan'] and inner VLAN should be vlan_out
492 #if src_port == dst_port:
494 if src_port
['switch_port'] == dst_port
['switch_port'] and vlan_in
== vlan_out
:
497 "priority": priority
,
499 "ingress_port": str(src_port
['switch_port']),
502 if vlan_in
is not None:
503 flow
['vlan_id'] = str(vlan_in
)
504 # allow that one port have no mac
505 if dst_port
['mac'] is None or nb_ports
==2: # point to point or nets with 2 elements
506 flow
['priority'] = priority
-5 # less priority
508 flow
['dst_mac'] = str(dst_port
['mac'])
512 flow
['actions'].append( ('vlan',None) )
514 flow
['actions'].append( ('vlan', vlan_out
) )
515 flow
['actions'].append( ('out', str(dst_port
['switch_port'])) )
517 if self
._check
_flow
_already
_present
(flow
, new_flows
) >= 0:
518 self
.logger
.debug("Skipping repeated flow '%s'", str(flow
))
521 new_flows
.append(flow
)
524 if nb_ports
<= 2: # point to multipoint or nets with more than 2 elements
526 out
= (vlan_out
, str(dst_port
['switch_port']))
527 if out
not in flow_broadcast
['actions']:
528 flow_broadcast
['actions'].append( out
)
531 for flow_broadcast
in new_broadcast_flows
.values():
532 if len(flow_broadcast
['actions'])==0:
533 continue #nothing to do, skip
534 flow_broadcast
['actions'].sort()
535 if 'vlan_id' in flow_broadcast
:
536 previous_vlan
= 0 # indicates that a packet contains a vlan, and the vlan
541 for action
in flow_broadcast
['actions']:
542 if action
[0] != previous_vlan
:
543 final_actions
.append( ('vlan', action
[0]) )
544 previous_vlan
= action
[0]
545 if self
.pmp_with_same_vlan
and action_number
:
546 return -1, "Cannot interconnect different vlan tags in a network when flag 'of_controller_nets_with_same_vlan' is True."
548 final_actions
.append( ('out', action
[1]) )
549 flow_broadcast
['actions'] = final_actions
551 if self
._check
_flow
_already
_present
(flow_broadcast
, new_flows
) >= 0:
552 self
.logger
.debug("Skipping repeated flow '%s'", str(flow_broadcast
))
555 new_flows
.append(flow_broadcast
)
557 #UNIFY openflow rules with the same input port and vlan and the same output actions
558 #These flows differ at the dst_mac; and they are unified by not filtering by dst_mac
559 #this can happen if there is only two ports. It is converted to a point to point connection
560 flow_dict
={} # use as key vlan_id+ingress_port and as value the list of flows matching these values
561 for flow
in new_flows
:
562 key
= str(flow
.get("vlan_id"))+":"+flow
["ingress_port"]
564 flow_dict
[key
].append(flow
)
566 flow_dict
[key
]=[ flow
]
568 for flow_list
in flow_dict
.values():
570 if len (flow_list
)>=2:
573 if f
['actions'] != flow_list
[0]['actions']:
576 if convert2ptp
: # add only one unified rule without dst_mac
577 self
.logger
.debug("Convert flow rules to NON mac dst_address " + str(flow_list
) )
578 flow_list
[0].pop('dst_mac')
579 flow_list
[0]["priority"] -= 5
580 new_flows2
.append(flow_list
[0])
581 else: # add all the rules
582 new_flows2
+= flow_list
585 def set_openflow_controller_status(self
, status
, error_text
=None):
587 Set openflow controller last operation status in DB
588 :param status: ofc status ('ACTIVE','INACTIVE','ERROR')
589 :param error_text: error text
592 if self
.of_uuid
== "Default":
596 ofc
['status'] = status
597 ofc
['last_error'] = self
._format
_error
_msg
(error_text
, 255)
598 self
.db_lock
.acquire()
599 result
, content
= self
.db
.update_rows('ofcs', ofc
, WHERE
={'uuid': self
.of_uuid
}, log
=False)
600 self
.db_lock
.release()