new_external_port and DB table adds
[osm/openvim.git] / openflow_thread.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
6 # This file is part of openvim
7 # All Rights Reserved.
8 #
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12 #
13 # http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20 #
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact with: nfvlabs@tid.es
23 ##
24
25 '''
26 This thread interacts with a openflow floodligth controller to create dataplane connections
27 '''
28
29 __author__="Pablo Montes, Alfonso Tierno"
30 __date__ ="17-jul-2015"
31
32
33 #import json
34 import threading
35 import time
36 import Queue
37 import requests
38 import logging
39
40 class FlowBadFormat(Exception):
41 '''raise when a bad format of flow is found'''
42
43 def change_of2db(flow):
44 '''Change 'flow' dictionary from openflow format to database format
45 Basically the change consist of changing 'flow[actions] from a list of
46 double tuple to a string
47 from [(A,B),(C,D),..] to "A=B,C=D" '''
48 action_str_list=[]
49 if type(flow)!=dict or "actions" not in flow:
50 raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
51 try:
52 for action in flow['actions']:
53 action_str_list.append( action[0] + "=" + str(action[1]) )
54 flow['actions'] = ",".join(action_str_list)
55 except:
56 raise FlowBadFormat("Unexpected format at 'actions'")
57
58 def change_db2of(flow):
59 '''Change 'flow' dictionary from database format to openflow format
60 Basically the change consist of changing 'flow[actions]' from a string to
61 a double tuple list
62 from "A=B,C=D,..." to [(A,B),(C,D),..]
63 raise FlowBadFormat '''
64 actions=[]
65 if type(flow)!=dict or "actions" not in flow or type(flow["actions"])!=str:
66 raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
67 action_list = flow['actions'].split(",")
68 for action_item in action_list:
69 action_tuple = action_item.split("=")
70 if len(action_tuple) != 2:
71 raise FlowBadFormat("Expected key=value format at 'actions'")
72 if action_tuple[0].strip().lower()=="vlan":
73 if action_tuple[1].strip().lower() in ("none", "strip"):
74 actions.append( ("vlan",None) )
75 else:
76 try:
77 actions.append( ("vlan", int(action_tuple[1])) )
78 except:
79 raise FlowBadFormat("Expected integer after vlan= at 'actions'")
80 elif action_tuple[0].strip().lower()=="out":
81 actions.append( ("out", str(action_tuple[1])) )
82 else:
83 raise FlowBadFormat("Unexpected '%s' at 'actions'"%action_tuple[0])
84 flow['actions'] = actions
85
86
87 class of_test_connector():
88 '''This is a fake openflow connector for testing.
89 It does nothing and it is used for running openvim without an openflow controller
90 '''
91 def __init__(self, params):
92 name = params.get("name", "test-ofc")
93 self.name = name
94 self.dpid = params.get("dpid")
95 self.rules= {}
96 self.logger = logging.getLogger('vim.OF.TEST')
97 self.logger.setLevel(getattr(logging, params.get("of_debug", "ERROR")))
98
99 def get_of_switches(self):
100 return 0, ()
101 def obtain_port_correspondence(self):
102 return 0, ()
103 def del_flow(self, flow_name):
104 if flow_name in self.rules:
105 self.logger.debug("del_flow OK")
106 del self.rules[flow_name]
107 return 0, None
108 else:
109 self.logger.warning("del_flow not found")
110 return -1, "flow %s not found"
111 def new_flow(self, data):
112 self.rules[ data["name"] ] = data
113 self.logger.debug("new_flow OK")
114 return 0, None
115 def get_of_rules(self, translate_of_ports=True):
116 return 0, self.rules
117
118 def clear_all_flows(self):
119 self.logger.debug("clear_all_flows OK")
120 self.rules={}
121 return 0, None
122
123
124
125 class openflow_thread(threading.Thread):
126 def __init__(self, of_uuid, OF_connector, db, db_lock, of_test, pmp_with_same_vlan=False, debug='ERROR'):
127 threading.Thread.__init__(self)
128 self.of_uuid = of_uuid
129 self.db = db
130 self.pmp_with_same_vlan = pmp_with_same_vlan
131 self.name = "openflow"
132 self.test = of_test
133 self.db_lock = db_lock
134 self.OF_connector = OF_connector
135 self.logger = logging.getLogger('vim.OF-' + of_uuid)
136 self.logger.setLevel( getattr(logging, debug) )
137 self.logger.name = OF_connector.name + " " + self.OF_connector.dpid
138 self.queueLock = threading.Lock()
139 self.taskQueue = Queue.Queue(2000)
140
141 def insert_task(self, task, *aditional):
142 try:
143 self.queueLock.acquire()
144 task = self.taskQueue.put( (task,) + aditional, timeout=5)
145 self.queueLock.release()
146 return 1, None
147 except Queue.Full:
148 return -1, "timeout inserting a task over openflow thread " + self.name
149
150 def run(self):
151 self.logger.debug("Start openflow thread")
152 while True:
153 self.queueLock.acquire()
154 if not self.taskQueue.empty():
155 task = self.taskQueue.get()
156 else:
157 task = None
158 self.queueLock.release()
159
160 if task is None:
161 time.sleep(1)
162 continue
163
164 if task[0] == 'update-net':
165 r,c = self.update_of_flows(task[1])
166 #update database status
167 self.db_lock.acquire()
168 if r<0:
169 UPDATE={'status':'ERROR', 'last_error': str(c)}
170 self.logger.error("processing task 'update-net' %s: %s", str(task[1]), c)
171 else:
172 UPDATE={'status':'ACTIVE', 'last_error': None}
173 self.logger.debug("processing task 'update-net' %s: OK", str(task[1]))
174 self.db.update_rows('nets', UPDATE, WHERE={'uuid':task[1]})
175 self.db_lock.release()
176
177 elif task[0] == 'clear-all':
178 r,c = self.clear_all_flows()
179 if r<0:
180 self.logger.error("processing task 'clear-all': %s", c)
181 else:
182 self.logger.debug("processing task 'clear-all': OK")
183 elif task[0] == 'exit':
184 self.logger.debug("exit from openflow_thread")
185 self.terminate()
186 return 0
187 else:
188 self.logger.error("unknown task %s", str(task))
189
190 def terminate(self):
191 pass
192 #print self.name, ": exit from openflow_thread"
193
194 def update_of_flows(self, net_id):
195 ports=()
196 self.db_lock.acquire()
197 select_= ('type','admin_state_up', 'vlan', 'provider', 'bind_net','bind_type','uuid')
198 result, nets = self.db.get_table(FROM='nets', SELECT=select_, WHERE={'uuid':net_id} )
199 #get all the networks binding to this
200 if result > 0:
201 if nets[0]['bind_net']:
202 bind_id = nets[0]['bind_net']
203 else:
204 bind_id = net_id
205 #get our net and all bind_nets
206 result, nets = self.db.get_table(FROM='nets', SELECT=select_,
207 WHERE_OR={'bind_net':bind_id, 'uuid':bind_id} )
208
209 self.db_lock.release()
210 if result < 0:
211 return -1, "DB error getting net: " + nets
212 #elif result==0:
213 #net has been deleted
214 ifaces_nb = 0
215 database_flows = []
216 for net in nets:
217 net_id = net["uuid"]
218 if net['admin_state_up'] == 'false':
219 net['ports'] = ()
220 else:
221 self.db_lock.acquire()
222 nb_ports, net_ports = self.db.get_table(
223 FROM='ports',
224 SELECT=('switch_port','vlan','uuid','mac','type','model'),
225 WHERE={'net_id':net_id, 'admin_state_up':'true', 'status':'ACTIVE'} )
226 self.db_lock.release()
227 if nb_ports < 0:
228 #print self.name, ": update_of_flows() ERROR getting ports", ports
229 return -1, "DB error getting ports from net '%s': %s" % (net_id, net_ports)
230
231 #add the binding as an external port
232 if net['provider'] and net['provider'][:9]=="openflow:":
233 external_port={"type":"external","mac":None}
234 external_port['uuid'] = net_id + ".1" #fake uuid
235 if net['provider'][-5:]==":vlan":
236 external_port["vlan"] = net["vlan"]
237 external_port["switch_port"] = net['provider'][9:-5]
238 else:
239 external_port["vlan"] = None
240 external_port["switch_port"] = net['provider'][9:]
241 net_ports = net_ports + (external_port,)
242 nb_ports += 1
243 net['ports'] = net_ports
244 ifaces_nb += nb_ports
245
246 # Get the name of flows that will be affected by this NET
247 self.db_lock.acquire()
248 result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':net_id})
249 self.db_lock.release()
250 if result < 0:
251 #print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
252 return -1, "DB error getting flows from net '%s': %s" %(net_id, database_net_flows)
253 database_flows += database_net_flows
254 # Get the name of flows where net_id==NULL that means net deleted (At DB foreign key: On delete set null)
255 self.db_lock.acquire()
256 result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':None})
257 self.db_lock.release()
258 if result < 0:
259 #print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
260 return -1, "DB error getting flows from net 'null': %s" %(database_net_flows)
261 database_flows += database_net_flows
262
263 #Get the existing flows at openflow controller
264 result, of_flows = self.OF_connector.get_of_rules()
265 if result < 0:
266 #print self.name, ": update_of_flows() ERROR getting flows from controller", of_flows
267 return -1, "OF error getting flows: " + of_flows
268
269 if ifaces_nb < 2:
270 pass
271 elif net['type'] == 'ptp':
272 if ifaces_nb > 2:
273 #print self.name, 'Error, network '+str(net_id)+' has been defined as ptp but it has '+\
274 # str(ifaces_nb)+' interfaces.'
275 return -1, "'ptp' type network cannot connect %d interfaces, only 2" % ifaces_nb
276 elif net['type'] == 'data':
277 if ifaces_nb > 2 and self.pmp_with_same_vlan:
278 # check all ports are VLAN (tagged) or none
279 vlan_tag = None
280 for port in ports:
281 if port["type"]=="external":
282 if port["vlan"] != None:
283 if port["vlan"]!=net["vlan"]:
284 text="External port vlan-tag and net vlan-tag must be the same when flag 'of_controller_nets_with_same_vlan' is True"
285 #print self.name, "Error", text
286 return -1, text
287 if vlan_tag == None:
288 vlan_tag=True
289 elif vlan_tag==False:
290 text="Passthrough and external port vlan-tagged can not be connected when flag 'of_controller_nets_with_same_vlan' is True"
291 #print self.name, "Error", text
292 return -1, text
293 else:
294 if vlan_tag == None:
295 vlan_tag=False
296 elif vlan_tag == True:
297 text="SR-IOV and external port not vlan-tagged can not be connected when flag 'of_controller_nets_with_same_vlan' is True"
298 #print self.name, "Error", text
299 return -1, text
300 elif port["model"]=="PF" or port["model"]=="VFnotShared":
301 if vlan_tag == None:
302 vlan_tag=False
303 elif vlan_tag==True:
304 text="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
305 #print self.name, "Error", text
306 return -1, text
307 elif port["model"] == "VF":
308 if vlan_tag == None:
309 vlan_tag=True
310 elif vlan_tag==False:
311 text="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
312 #print self.name, "Error", text
313 return -1, text
314 else:
315 return -1, 'Only ptp and data networks are supported for openflow'
316
317 # calculate new flows to be inserted
318 result, new_flows = self._compute_net_flows(nets)
319 if result < 0:
320 return result, new_flows
321
322 #modify database flows format and get the used names
323 used_names=[]
324 for flow in database_flows:
325 try:
326 change_db2of(flow)
327 except FlowBadFormat as e:
328 self.logger.error("Exception FlowBadFormat: '%s', flow: '%s'",str(e), str(flow))
329 continue
330 used_names.append(flow['name'])
331 name_index=0
332 #insert at database the new flows, change actions to human text
333 for flow in new_flows:
334 #1 check if an equal flow is already present
335 index = self._check_flow_already_present(flow, database_flows)
336 if index>=0:
337 database_flows[index]["not delete"]=True
338 self.logger.debug("Skipping already present flow %s", str(flow))
339 continue
340 #2 look for a non used name
341 flow_name=flow["net_id"]+"."+str(name_index)
342 while flow_name in used_names or flow_name in of_flows:
343 name_index += 1
344 flow_name=flow["net_id"]+"."+str(name_index)
345 used_names.append(flow_name)
346 flow['name'] = flow_name
347 #3 insert at openflow
348 result, content = self.OF_connector.new_flow(flow)
349 if result < 0:
350 #print self.name, ": Error '%s' at flow insertion" % c, flow
351 return -1, content
352 #4 insert at database
353 try:
354 change_of2db(flow)
355 except FlowBadFormat as e:
356 #print self.name, ": Error Exception FlowBadFormat '%s'" % str(e), flow
357 return -1, str(e)
358 self.db_lock.acquire()
359 result, content = self.db.new_row('of_flows', flow)
360 self.db_lock.release()
361 if result < 0:
362 #print self.name, ": Error '%s' at database insertion" % content, flow
363 return -1, content
364
365 #delete not needed old flows from openflow and from DDBB,
366 #check that the needed flows at DDBB are present in controller or insert them otherwise
367 for flow in database_flows:
368 if "not delete" in flow:
369 if flow["name"] not in of_flows:
370 #not in controller, insert it
371 result, content = self.OF_connector.new_flow(flow)
372 if result < 0:
373 #print self.name, ": Error '%s' at flow insertion" % c, flow
374 return -1, content
375 continue
376 #Delete flow
377 if flow["name"] in of_flows:
378 result, content = self.OF_connector.del_flow(flow['name'])
379 if result<0:
380 self.logger.error("cannot delete flow '%s' from OF: %s", flow['name'], content )
381 continue #skip deletion from database
382 #delete from database
383 self.db_lock.acquire()
384 result, content = self.db.delete_row_by_key('of_flows', 'id', flow['id'])
385 self.db_lock.release()
386 if result<0:
387 self.logger.error("cannot delete flow '%s' from DB: %s", flow['name'], content )
388
389 return 0, 'Success'
390
391 def clear_all_flows(self):
392 try:
393 if not self.test:
394 self.OF_connector.clear_all_flows()
395 #remove from database
396 self.db_lock.acquire()
397 self.db.delete_row_by_key('of_flows', None, None) #this will delete all lines
398 self.db_lock.release()
399 return 0, None
400 except requests.exceptions.RequestException as e:
401 #print self.name, ": clear_all_flows Exception:", str(e)
402 return -1, str(e)
403
404 flow_fields=('priority', 'vlan', 'ingress_port', 'actions', 'dst_mac', 'src_mac', 'net_id')
405 def _check_flow_already_present(self, new_flow, flow_list):
406 '''check if the same flow is already present in the flow list
407 The flow is repeated if all the fields, apart from name, are equal
408 Return the index of matching flow, -1 if not match'''
409 index=0
410 for flow in flow_list:
411 equal=True
412 for f in self.flow_fields:
413 if flow.get(f) != new_flow.get(f):
414 equal=False
415 break
416 if equal:
417 return index
418 index += 1
419 return -1
420
421 def _compute_net_flows(self, nets):
422 new_flows=[]
423 new_broadcast_flows={}
424 nb_ports = 0
425
426 # Check switch_port information is right
427 self.logger.debug("_compute_net_flows nets: %s", str(nets))
428 for net in nets:
429 for port in net['ports']:
430 nb_ports += 1
431 if not self.test and str(port['switch_port']) not in self.OF_connector.pp2ofi:
432 error_text= "switch port name '%s' is not valid for the openflow controller" % str(port['switch_port'])
433 #print self.name, ": ERROR " + error_text
434 return -1, error_text
435
436 for net_src in nets:
437 net_id = net_src["uuid"]
438 for net_dst in nets:
439 vlan_net_in = None
440 vlan_net_out = None
441 if net_src == net_dst:
442 #intra net rules
443 priority = 1000
444 elif net_src['bind_net'] == net_dst['uuid']:
445 if net_src.get('bind_type') and net_src['bind_type'][0:5] == "vlan:":
446 vlan_net_out = int(net_src['bind_type'][5:])
447 priority = 1100
448 elif net_dst['bind_net'] == net_src['uuid']:
449 if net_dst.get('bind_type') and net_dst['bind_type'][0:5] == "vlan:":
450 vlan_net_in = int(net_dst['bind_type'][5:])
451 priority = 1100
452 else:
453 #nets not binding
454 continue
455 for src_port in net_src['ports']:
456 vlan_in = vlan_net_in
457 if vlan_in == None and src_port['vlan'] != None:
458 vlan_in = src_port['vlan']
459 elif vlan_in != None and src_port['vlan'] != None:
460 #TODO this is something that we can not do. It requires a double VLAN check
461 #outer VLAN should be src_port['vlan'] and inner VLAN should be vlan_in
462 continue
463
464 # BROADCAST:
465 broadcast_key = src_port['uuid'] + "." + str(vlan_in)
466 if broadcast_key in new_broadcast_flows:
467 flow_broadcast = new_broadcast_flows[broadcast_key]
468 else:
469 flow_broadcast = {'priority': priority,
470 'net_id': net_id,
471 'dst_mac': 'ff:ff:ff:ff:ff:ff',
472 "ingress_port": str(src_port['switch_port']),
473 'actions': []
474 }
475 new_broadcast_flows[broadcast_key] = flow_broadcast
476 if vlan_in is not None:
477 flow_broadcast['vlan_id'] = str(vlan_in)
478
479 for dst_port in net_dst['ports']:
480 vlan_out = vlan_net_out
481 if vlan_out == None and dst_port['vlan'] != None:
482 vlan_out = dst_port['vlan']
483 elif vlan_out != None and dst_port['vlan'] != None:
484 #TODO this is something that we can not do. It requires a double VLAN set
485 #outer VLAN should be dst_port['vlan'] and inner VLAN should be vlan_out
486 continue
487 #if src_port == dst_port:
488 # continue
489 if src_port['switch_port'] == dst_port['switch_port'] and vlan_in == vlan_out:
490 continue
491 flow = {
492 "priority": priority,
493 'net_id': net_id,
494 "ingress_port": str(src_port['switch_port']),
495 'actions': []
496 }
497 if vlan_in is not None:
498 flow['vlan_id'] = str(vlan_in)
499 # allow that one port have no mac
500 if dst_port['mac'] is None or nb_ports==2: # point to point or nets with 2 elements
501 flow['priority'] = priority-5 # less priority
502 else:
503 flow['dst_mac'] = str(dst_port['mac'])
504
505 if vlan_out == None:
506 if vlan_in != None:
507 flow['actions'].append( ('vlan',None) )
508 else:
509 flow['actions'].append( ('vlan', vlan_out ) )
510 flow['actions'].append( ('out', str(dst_port['switch_port'])) )
511
512 if self._check_flow_already_present(flow, new_flows) >= 0:
513 self.logger.debug("Skipping repeated flow '%s'", str(flow))
514 continue
515
516 new_flows.append(flow)
517
518 # BROADCAST:
519 if nb_ports <= 2: # point to multipoint or nets with more than 2 elements
520 continue
521 out = (vlan_out, str(dst_port['switch_port']))
522 if out not in flow_broadcast['actions']:
523 flow_broadcast['actions'].append( out )
524
525 #BROADCAST
526 for flow_broadcast in new_broadcast_flows.values():
527 if len(flow_broadcast['actions'])==0:
528 continue #nothing to do, skip
529 flow_broadcast['actions'].sort()
530 if 'vlan_id' in flow_broadcast:
531 previous_vlan = 0 # indicates that a packet contains a vlan, and the vlan
532 else:
533 previous_vlan = None
534 final_actions=[]
535 action_number = 0
536 for action in flow_broadcast['actions']:
537 if action[0] != previous_vlan:
538 final_actions.append( ('vlan', action[0]) )
539 previous_vlan = action[0]
540 if self.pmp_with_same_vlan and action_number:
541 return -1, "Can not interconnect different vlan tags in a network when flag 'of_controller_nets_with_same_vlan' is True."
542 action_number += 1
543 final_actions.append( ('out', action[1]) )
544 flow_broadcast['actions'] = final_actions
545
546 if self._check_flow_already_present(flow_broadcast, new_flows) >= 0:
547 self.logger.debug("Skipping repeated flow '%s'", str(flow_broadcast))
548 continue
549
550 new_flows.append(flow_broadcast)
551
552 #UNIFY openflow rules with the same input port and vlan and the same output actions
553 #These flows differ at the dst_mac; and they are unified by not filtering by dst_mac
554 #this can happen if there is only two ports. It is converted to a point to point connection
555 flow_dict={} # use as key vlan_id+ingress_port and as value the list of flows matching these values
556 for flow in new_flows:
557 key = str(flow.get("vlan_id"))+":"+flow["ingress_port"]
558 if key in flow_dict:
559 flow_dict[key].append(flow)
560 else:
561 flow_dict[key]=[ flow ]
562 new_flows2=[]
563 for flow_list in flow_dict.values():
564 convert2ptp=False
565 if len (flow_list)>=2:
566 convert2ptp=True
567 for f in flow_list:
568 if f['actions'] != flow_list[0]['actions']:
569 convert2ptp=False
570 break
571 if convert2ptp: # add only one unified rule without dst_mac
572 self.logger.debug("Convert flow rules to NON mac dst_address " + str(flow_list) )
573 flow_list[0].pop('dst_mac')
574 flow_list[0]["priority"] -= 5
575 new_flows2.append(flow_list[0])
576 else: # add all the rules
577 new_flows2 += flow_list
578 return 0, new_flows2
579