fix moving openvim version/date/db_version to ovim
[osm/openvim.git] / openflow_thread.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
6 # This file is part of openvim
7 # All Rights Reserved.
8 #
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12 #
13 # http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20 #
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact with: nfvlabs@tid.es
23 ##
24
25 '''
26 This thread interacts with a openflow floodligth controller to create dataplane connections
27 '''
28
29 __author__="Pablo Montes, Alfonso Tierno"
30 __date__ ="17-jul-2015"
31
32
33 #import json
34 import threading
35 import time
36 import Queue
37 import requests
38 import logging
39
40 class FlowBadFormat(Exception):
41 '''raise when a bad format of flow is found'''
42
43 def change_of2db(flow):
44 '''Change 'flow' dictionary from openflow format to database format
45 Basically the change consist of changing 'flow[actions] from a list of
46 double tuple to a string
47 from [(A,B),(C,D),..] to "A=B,C=D" '''
48 action_str_list=[]
49 if type(flow)!=dict or "actions" not in flow:
50 raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
51 try:
52 for action in flow['actions']:
53 action_str_list.append( action[0] + "=" + str(action[1]) )
54 flow['actions'] = ",".join(action_str_list)
55 except:
56 raise FlowBadFormat("Unexpected format at 'actions'")
57
58 def change_db2of(flow):
59 '''Change 'flow' dictionary from database format to openflow format
60 Basically the change consist of changing 'flow[actions]' from a string to
61 a double tuple list
62 from "A=B,C=D,..." to [(A,B),(C,D),..]
63 raise FlowBadFormat '''
64 actions=[]
65 if type(flow)!=dict or "actions" not in flow or type(flow["actions"])!=str:
66 raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
67 action_list = flow['actions'].split(",")
68 for action_item in action_list:
69 action_tuple = action_item.split("=")
70 if len(action_tuple) != 2:
71 raise FlowBadFormat("Expected key=value format at 'actions'")
72 if action_tuple[0].strip().lower()=="vlan":
73 if action_tuple[1].strip().lower() in ("none", "strip"):
74 actions.append( ("vlan",None) )
75 else:
76 try:
77 actions.append( ("vlan", int(action_tuple[1])) )
78 except:
79 raise FlowBadFormat("Expected integer after vlan= at 'actions'")
80 elif action_tuple[0].strip().lower()=="out":
81 actions.append( ("out", str(action_tuple[1])) )
82 else:
83 raise FlowBadFormat("Unexpected '%s' at 'actions'"%action_tuple[0])
84 flow['actions'] = actions
85
86
87 class of_test_connector():
88 '''This is a fake openflow connector for testing.
89 It does nothing and it is used for running openvim without an openflow controller
90 '''
91 def __init__(self, params):
92 name = params.get("name", "test-ofc")
93 self.name = name
94 self.dpid = params.get("dpid")
95 self.rules= {}
96 self.logger = logging.getLogger('vim.OF.TEST')
97 self.logger.setLevel(getattr(logging, params.get("of_debug", "ERROR")))
98 self.pp2ofi = {}
99
100 def get_of_switches(self):
101 return 0, ()
102
103 def obtain_port_correspondence(self):
104 return 0, ()
105
106 def del_flow(self, flow_name):
107 if flow_name in self.rules:
108 self.logger.debug("del_flow OK")
109 del self.rules[flow_name]
110 return 0, None
111 else:
112 self.logger.warning("del_flow not found")
113 return -1, "flow %s not found"
114
115 def new_flow(self, data):
116 self.rules[ data["name"] ] = data
117 self.logger.debug("new_flow OK")
118 return 0, None
119
120 def get_of_rules(self, translate_of_ports=True):
121 return 0, self.rules
122
123 def clear_all_flows(self):
124 self.logger.debug("clear_all_flows OK")
125 self.rules={}
126 return 0, None
127
128
129
130 class openflow_thread(threading.Thread):
131 def __init__(self, of_uuid, OF_connector, db, db_lock, of_test, pmp_with_same_vlan=False, debug='ERROR'):
132 threading.Thread.__init__(self)
133 self.of_uuid = of_uuid
134 self.db = db
135 self.pmp_with_same_vlan = pmp_with_same_vlan
136 self.name = "openflow"
137 self.test = of_test
138 self.db_lock = db_lock
139 self.OF_connector = OF_connector
140 self.logger = logging.getLogger('vim.OF-' + of_uuid)
141 self.logger.setLevel( getattr(logging, debug) )
142 self.logger.name = OF_connector.name + " " + self.OF_connector.dpid
143 self.queueLock = threading.Lock()
144 self.taskQueue = Queue.Queue(2000)
145
146 def insert_task(self, task, *aditional):
147 try:
148 self.queueLock.acquire()
149 task = self.taskQueue.put( (task,) + aditional, timeout=5)
150 self.queueLock.release()
151 return 1, None
152 except Queue.Full:
153 return -1, "timeout inserting a task over openflow thread " + self.name
154
155 def run(self):
156 self.logger.debug("Start openflow thread")
157 while True:
158 self.queueLock.acquire()
159 if not self.taskQueue.empty():
160 task = self.taskQueue.get()
161 else:
162 task = None
163 self.queueLock.release()
164
165 if task is None:
166 time.sleep(1)
167 continue
168
169 if task[0] == 'update-net':
170 r,c = self.update_of_flows(task[1])
171 #update database status
172 self.db_lock.acquire()
173 if r<0:
174 UPDATE={'status':'ERROR', 'last_error': str(c)}
175 self.logger.error("processing task 'update-net' %s: %s", str(task[1]), c)
176 else:
177 UPDATE={'status':'ACTIVE', 'last_error': None}
178 self.logger.debug("processing task 'update-net' %s: OK", str(task[1]))
179 self.db.update_rows('nets', UPDATE, WHERE={'uuid':task[1]})
180 self.db_lock.release()
181
182 elif task[0] == 'clear-all':
183 r,c = self.clear_all_flows()
184 if r<0:
185 self.logger.error("processing task 'clear-all': %s", c)
186 else:
187 self.logger.debug("processing task 'clear-all': OK")
188 elif task[0] == 'exit':
189 self.logger.debug("exit from openflow_thread")
190 self.terminate()
191 return 0
192 else:
193 self.logger.error("unknown task %s", str(task))
194
195 def terminate(self):
196 pass
197 #print self.name, ": exit from openflow_thread"
198
199 def update_of_flows(self, net_id):
200 ports=()
201 self.db_lock.acquire()
202 select_= ('type','admin_state_up', 'vlan', 'provider', 'bind_net','bind_type','uuid')
203 result, nets = self.db.get_table(FROM='nets', SELECT=select_, WHERE={'uuid':net_id} )
204 #get all the networks binding to this
205 if result > 0:
206 if nets[0]['bind_net']:
207 bind_id = nets[0]['bind_net']
208 else:
209 bind_id = net_id
210 #get our net and all bind_nets
211 result, nets = self.db.get_table(FROM='nets', SELECT=select_,
212 WHERE_OR={'bind_net':bind_id, 'uuid':bind_id} )
213
214 self.db_lock.release()
215 if result < 0:
216 return -1, "DB error getting net: " + nets
217 #elif result==0:
218 #net has been deleted
219 ifaces_nb = 0
220 database_flows = []
221 for net in nets:
222 net_id = net["uuid"]
223 if net['admin_state_up'] == 'false':
224 net['ports'] = ()
225 else:
226 self.db_lock.acquire()
227 nb_ports, net_ports = self.db.get_table(
228 FROM='ports',
229 SELECT=('switch_port','vlan','uuid','mac','type','model'),
230 WHERE={'net_id':net_id, 'admin_state_up':'true', 'status':'ACTIVE'} )
231 self.db_lock.release()
232 if nb_ports < 0:
233 #print self.name, ": update_of_flows() ERROR getting ports", ports
234 return -1, "DB error getting ports from net '%s': %s" % (net_id, net_ports)
235
236 #add the binding as an external port
237 if net['provider'] and net['provider'][:9]=="openflow:":
238 external_port={"type":"external","mac":None}
239 external_port['uuid'] = net_id + ".1" #fake uuid
240 if net['provider'][-5:]==":vlan":
241 external_port["vlan"] = net["vlan"]
242 external_port["switch_port"] = net['provider'][9:-5]
243 else:
244 external_port["vlan"] = None
245 external_port["switch_port"] = net['provider'][9:]
246 net_ports = net_ports + (external_port,)
247 nb_ports += 1
248 net['ports'] = net_ports
249 ifaces_nb += nb_ports
250
251 # Get the name of flows that will be affected by this NET
252 self.db_lock.acquire()
253 result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':net_id})
254 self.db_lock.release()
255 if result < 0:
256 #print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
257 return -1, "DB error getting flows from net '%s': %s" %(net_id, database_net_flows)
258 database_flows += database_net_flows
259 # Get the name of flows where net_id==NULL that means net deleted (At DB foreign key: On delete set null)
260 self.db_lock.acquire()
261 result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':None})
262 self.db_lock.release()
263 if result < 0:
264 #print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
265 return -1, "DB error getting flows from net 'null': %s" %(database_net_flows)
266 database_flows += database_net_flows
267
268 #Get the existing flows at openflow controller
269 result, of_flows = self.OF_connector.get_of_rules()
270 if result < 0:
271 #print self.name, ": update_of_flows() ERROR getting flows from controller", of_flows
272 return -1, "OF error getting flows: " + of_flows
273
274 if ifaces_nb < 2:
275 pass
276 elif net['type'] == 'ptp':
277 if ifaces_nb > 2:
278 #print self.name, 'Error, network '+str(net_id)+' has been defined as ptp but it has '+\
279 # str(ifaces_nb)+' interfaces.'
280 return -1, "'ptp' type network cannot connect %d interfaces, only 2" % ifaces_nb
281 elif net['type'] == 'data':
282 if ifaces_nb > 2 and self.pmp_with_same_vlan:
283 # check all ports are VLAN (tagged) or none
284 vlan_tag = None
285 for port in ports:
286 if port["type"]=="external":
287 if port["vlan"] != None:
288 if port["vlan"]!=net["vlan"]:
289 text="External port vlan-tag and net vlan-tag must be the same when flag 'of_controller_nets_with_same_vlan' is True"
290 #print self.name, "Error", text
291 return -1, text
292 if vlan_tag == None:
293 vlan_tag=True
294 elif vlan_tag==False:
295 text="Passthrough and external port vlan-tagged can not be connected when flag 'of_controller_nets_with_same_vlan' is True"
296 #print self.name, "Error", text
297 return -1, text
298 else:
299 if vlan_tag == None:
300 vlan_tag=False
301 elif vlan_tag == True:
302 text="SR-IOV and external port not vlan-tagged can not be connected when flag 'of_controller_nets_with_same_vlan' is True"
303 #print self.name, "Error", text
304 return -1, text
305 elif port["model"]=="PF" or port["model"]=="VFnotShared":
306 if vlan_tag == None:
307 vlan_tag=False
308 elif vlan_tag==True:
309 text="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
310 #print self.name, "Error", text
311 return -1, text
312 elif port["model"] == "VF":
313 if vlan_tag == None:
314 vlan_tag=True
315 elif vlan_tag==False:
316 text="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
317 #print self.name, "Error", text
318 return -1, text
319 else:
320 return -1, 'Only ptp and data networks are supported for openflow'
321
322 # calculate new flows to be inserted
323 result, new_flows = self._compute_net_flows(nets)
324 if result < 0:
325 return result, new_flows
326
327 #modify database flows format and get the used names
328 used_names=[]
329 for flow in database_flows:
330 try:
331 change_db2of(flow)
332 except FlowBadFormat as e:
333 self.logger.error("Exception FlowBadFormat: '%s', flow: '%s'",str(e), str(flow))
334 continue
335 used_names.append(flow['name'])
336 name_index=0
337 #insert at database the new flows, change actions to human text
338 for flow in new_flows:
339 #1 check if an equal flow is already present
340 index = self._check_flow_already_present(flow, database_flows)
341 if index>=0:
342 database_flows[index]["not delete"]=True
343 self.logger.debug("Skipping already present flow %s", str(flow))
344 continue
345 #2 look for a non used name
346 flow_name=flow["net_id"]+"."+str(name_index)
347 while flow_name in used_names or flow_name in of_flows:
348 name_index += 1
349 flow_name=flow["net_id"]+"."+str(name_index)
350 used_names.append(flow_name)
351 flow['name'] = flow_name
352 #3 insert at openflow
353 result, content = self.OF_connector.new_flow(flow)
354 if result < 0:
355 #print self.name, ": Error '%s' at flow insertion" % c, flow
356 return -1, content
357 #4 insert at database
358 try:
359 change_of2db(flow)
360 except FlowBadFormat as e:
361 #print self.name, ": Error Exception FlowBadFormat '%s'" % str(e), flow
362 return -1, str(e)
363 self.db_lock.acquire()
364 result, content = self.db.new_row('of_flows', flow)
365 self.db_lock.release()
366 if result < 0:
367 #print self.name, ": Error '%s' at database insertion" % content, flow
368 return -1, content
369
370 #delete not needed old flows from openflow and from DDBB,
371 #check that the needed flows at DDBB are present in controller or insert them otherwise
372 for flow in database_flows:
373 if "not delete" in flow:
374 if flow["name"] not in of_flows:
375 #not in controller, insert it
376 result, content = self.OF_connector.new_flow(flow)
377 if result < 0:
378 #print self.name, ": Error '%s' at flow insertion" % c, flow
379 return -1, content
380 continue
381 #Delete flow
382 if flow["name"] in of_flows:
383 result, content = self.OF_connector.del_flow(flow['name'])
384 if result<0:
385 self.logger.error("cannot delete flow '%s' from OF: %s", flow['name'], content )
386 continue #skip deletion from database
387 #delete from database
388 self.db_lock.acquire()
389 result, content = self.db.delete_row_by_key('of_flows', 'id', flow['id'])
390 self.db_lock.release()
391 if result<0:
392 self.logger.error("cannot delete flow '%s' from DB: %s", flow['name'], content )
393
394 return 0, 'Success'
395
396 def clear_all_flows(self):
397 try:
398 if not self.test:
399 self.OF_connector.clear_all_flows()
400 #remove from database
401 self.db_lock.acquire()
402 self.db.delete_row_by_key('of_flows', None, None) #this will delete all lines
403 self.db_lock.release()
404 return 0, None
405 except requests.exceptions.RequestException as e:
406 #print self.name, ": clear_all_flows Exception:", str(e)
407 return -1, str(e)
408
409 flow_fields=('priority', 'vlan', 'ingress_port', 'actions', 'dst_mac', 'src_mac', 'net_id')
410 def _check_flow_already_present(self, new_flow, flow_list):
411 '''check if the same flow is already present in the flow list
412 The flow is repeated if all the fields, apart from name, are equal
413 Return the index of matching flow, -1 if not match'''
414 index=0
415 for flow in flow_list:
416 equal=True
417 for f in self.flow_fields:
418 if flow.get(f) != new_flow.get(f):
419 equal=False
420 break
421 if equal:
422 return index
423 index += 1
424 return -1
425
426 def _compute_net_flows(self, nets):
427 new_flows=[]
428 new_broadcast_flows={}
429 nb_ports = 0
430
431 # Check switch_port information is right
432 self.logger.debug("_compute_net_flows nets: %s", str(nets))
433 for net in nets:
434 for port in net['ports']:
435 nb_ports += 1
436 if not self.test and str(port['switch_port']) not in self.OF_connector.pp2ofi:
437 error_text= "switch port name '%s' is not valid for the openflow controller" % str(port['switch_port'])
438 #print self.name, ": ERROR " + error_text
439 return -1, error_text
440
441 for net_src in nets:
442 net_id = net_src["uuid"]
443 for net_dst in nets:
444 vlan_net_in = None
445 vlan_net_out = None
446 if net_src == net_dst:
447 #intra net rules
448 priority = 1000
449 elif net_src['bind_net'] == net_dst['uuid']:
450 if net_src.get('bind_type') and net_src['bind_type'][0:5] == "vlan:":
451 vlan_net_out = int(net_src['bind_type'][5:])
452 priority = 1100
453 elif net_dst['bind_net'] == net_src['uuid']:
454 if net_dst.get('bind_type') and net_dst['bind_type'][0:5] == "vlan:":
455 vlan_net_in = int(net_dst['bind_type'][5:])
456 priority = 1100
457 else:
458 #nets not binding
459 continue
460 for src_port in net_src['ports']:
461 vlan_in = vlan_net_in
462 if vlan_in == None and src_port['vlan'] != None:
463 vlan_in = src_port['vlan']
464 elif vlan_in != None and src_port['vlan'] != None:
465 #TODO this is something that we can not do. It requires a double VLAN check
466 #outer VLAN should be src_port['vlan'] and inner VLAN should be vlan_in
467 continue
468
469 # BROADCAST:
470 broadcast_key = src_port['uuid'] + "." + str(vlan_in)
471 if broadcast_key in new_broadcast_flows:
472 flow_broadcast = new_broadcast_flows[broadcast_key]
473 else:
474 flow_broadcast = {'priority': priority,
475 'net_id': net_id,
476 'dst_mac': 'ff:ff:ff:ff:ff:ff',
477 "ingress_port": str(src_port['switch_port']),
478 'actions': []
479 }
480 new_broadcast_flows[broadcast_key] = flow_broadcast
481 if vlan_in is not None:
482 flow_broadcast['vlan_id'] = str(vlan_in)
483
484 for dst_port in net_dst['ports']:
485 vlan_out = vlan_net_out
486 if vlan_out == None and dst_port['vlan'] != None:
487 vlan_out = dst_port['vlan']
488 elif vlan_out != None and dst_port['vlan'] != None:
489 #TODO this is something that we can not do. It requires a double VLAN set
490 #outer VLAN should be dst_port['vlan'] and inner VLAN should be vlan_out
491 continue
492 #if src_port == dst_port:
493 # continue
494 if src_port['switch_port'] == dst_port['switch_port'] and vlan_in == vlan_out:
495 continue
496 flow = {
497 "priority": priority,
498 'net_id': net_id,
499 "ingress_port": str(src_port['switch_port']),
500 'actions': []
501 }
502 if vlan_in is not None:
503 flow['vlan_id'] = str(vlan_in)
504 # allow that one port have no mac
505 if dst_port['mac'] is None or nb_ports==2: # point to point or nets with 2 elements
506 flow['priority'] = priority-5 # less priority
507 else:
508 flow['dst_mac'] = str(dst_port['mac'])
509
510 if vlan_out == None:
511 if vlan_in != None:
512 flow['actions'].append( ('vlan',None) )
513 else:
514 flow['actions'].append( ('vlan', vlan_out ) )
515 flow['actions'].append( ('out', str(dst_port['switch_port'])) )
516
517 if self._check_flow_already_present(flow, new_flows) >= 0:
518 self.logger.debug("Skipping repeated flow '%s'", str(flow))
519 continue
520
521 new_flows.append(flow)
522
523 # BROADCAST:
524 if nb_ports <= 2: # point to multipoint or nets with more than 2 elements
525 continue
526 out = (vlan_out, str(dst_port['switch_port']))
527 if out not in flow_broadcast['actions']:
528 flow_broadcast['actions'].append( out )
529
530 #BROADCAST
531 for flow_broadcast in new_broadcast_flows.values():
532 if len(flow_broadcast['actions'])==0:
533 continue #nothing to do, skip
534 flow_broadcast['actions'].sort()
535 if 'vlan_id' in flow_broadcast:
536 previous_vlan = 0 # indicates that a packet contains a vlan, and the vlan
537 else:
538 previous_vlan = None
539 final_actions=[]
540 action_number = 0
541 for action in flow_broadcast['actions']:
542 if action[0] != previous_vlan:
543 final_actions.append( ('vlan', action[0]) )
544 previous_vlan = action[0]
545 if self.pmp_with_same_vlan and action_number:
546 return -1, "Can not interconnect different vlan tags in a network when flag 'of_controller_nets_with_same_vlan' is True."
547 action_number += 1
548 final_actions.append( ('out', action[1]) )
549 flow_broadcast['actions'] = final_actions
550
551 if self._check_flow_already_present(flow_broadcast, new_flows) >= 0:
552 self.logger.debug("Skipping repeated flow '%s'", str(flow_broadcast))
553 continue
554
555 new_flows.append(flow_broadcast)
556
557 #UNIFY openflow rules with the same input port and vlan and the same output actions
558 #These flows differ at the dst_mac; and they are unified by not filtering by dst_mac
559 #this can happen if there is only two ports. It is converted to a point to point connection
560 flow_dict={} # use as key vlan_id+ingress_port and as value the list of flows matching these values
561 for flow in new_flows:
562 key = str(flow.get("vlan_id"))+":"+flow["ingress_port"]
563 if key in flow_dict:
564 flow_dict[key].append(flow)
565 else:
566 flow_dict[key]=[ flow ]
567 new_flows2=[]
568 for flow_list in flow_dict.values():
569 convert2ptp=False
570 if len (flow_list)>=2:
571 convert2ptp=True
572 for f in flow_list:
573 if f['actions'] != flow_list[0]['actions']:
574 convert2ptp=False
575 break
576 if convert2ptp: # add only one unified rule without dst_mac
577 self.logger.debug("Convert flow rules to NON mac dst_address " + str(flow_list) )
578 flow_list[0].pop('dst_mac')
579 flow_list[0]["priority"] -= 5
580 new_flows2.append(flow_list[0])
581 else: # add all the rules
582 new_flows2 += flow_list
583 return 0, new_flows2
584