Allow compute nodes without hugepages
[osm/openvim.git] / openflow_thread.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
6 # This file is part of openvim
7 # All Rights Reserved.
8 #
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12 #
13 # http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20 #
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact with: nfvlabs@tid.es
23 ##
24
25 '''
26 This thread interacts with a openflow floodligth controller to create dataplane connections
27 '''
28
29 __author__="Pablo Montes, Alfonso Tierno"
30 __date__ ="17-jul-2015"
31
32
33 #import json
34 import threading
35 import time
36 import Queue
37 import requests
38 import logging
39
40 class FlowBadFormat(Exception):
41 '''raise when a bad format of flow is found'''
42
43 def change_of2db(flow):
44 '''Change 'flow' dictionary from openflow format to database format
45 Basically the change consist of changing 'flow[actions] from a list of
46 double tuple to a string
47 from [(A,B),(C,D),..] to "A=B,C=D" '''
48 action_str_list=[]
49 if type(flow)!=dict or "actions" not in flow:
50 raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
51 try:
52 for action in flow['actions']:
53 action_str_list.append( action[0] + "=" + str(action[1]) )
54 flow['actions'] = ",".join(action_str_list)
55 except:
56 raise FlowBadFormat("Unexpected format at 'actions'")
57
58 def change_db2of(flow):
59 '''Change 'flow' dictionary from database format to openflow format
60 Basically the change consist of changing 'flow[actions]' from a string to
61 a double tuple list
62 from "A=B,C=D,..." to [(A,B),(C,D),..]
63 raise FlowBadFormat '''
64 actions=[]
65 if type(flow)!=dict or "actions" not in flow or type(flow["actions"])!=str:
66 raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
67 action_list = flow['actions'].split(",")
68 for action_item in action_list:
69 action_tuple = action_item.split("=")
70 if len(action_tuple) != 2:
71 raise FlowBadFormat("Expected key=value format at 'actions'")
72 if action_tuple[0].strip().lower()=="vlan":
73 if action_tuple[1].strip().lower() in ("none", "strip"):
74 actions.append( ("vlan",None) )
75 else:
76 try:
77 actions.append( ("vlan", int(action_tuple[1])) )
78 except:
79 raise FlowBadFormat("Expected integer after vlan= at 'actions'")
80 elif action_tuple[0].strip().lower()=="out":
81 actions.append( ("out", str(action_tuple[1])) )
82 else:
83 raise FlowBadFormat("Unexpected '%s' at 'actions'"%action_tuple[0])
84 flow['actions'] = actions
85
86
87
88 class of_test_connector():
89 '''This is a fake openflow connector for testing.
90 It does nothing and it is used for running openvim without an openflow controller
91 '''
92 def __init__(self, params):
93 self.name = "ofc_test"
94 self.rules={}
95 self.logger = logging.getLogger('vim.OF.TEST')
96 self.logger.setLevel( getattr(logging, params.get("of_debug", "ERROR") ) )
97 def get_of_switches(self):
98 return 0, ()
99 def obtain_port_correspondence(self):
100 return 0, ()
101 def del_flow(self, flow_name):
102 if flow_name in self.rules:
103 self.logger.debug("del_flow OK")
104 del self.rules[flow_name]
105 return 0, None
106 else:
107 self.logger.warning("del_flow not found")
108 return -1, "flow %s not found"
109 def new_flow(self, data):
110 self.rules[ data["name"] ] = data
111 self.logger.debug("new_flow OK")
112 return 0, None
113 def get_of_rules(self, translate_of_ports=True):
114 return 0, self.rules
115
116 def clear_all_flows(self):
117 self.logger.debug("clear_all_flows OK")
118 self.rules={}
119 return 0, None
120
121
122
123 class openflow_thread(threading.Thread):
124 def __init__(self, OF_connector, db, db_lock, of_test, pmp_with_same_vlan, debug='ERROR'):
125 threading.Thread.__init__(self)
126
127 self.db = db
128 self.pmp_with_same_vlan = pmp_with_same_vlan
129 self.name = "openflow"
130 self.test = of_test
131 self.db_lock = db_lock
132 self.OF_connector = OF_connector
133 self.logger = logging.getLogger('vim.OF')
134 self.logger.setLevel( getattr(logging, debug) )
135
136 self.queueLock = threading.Lock()
137 self.taskQueue = Queue.Queue(2000)
138
139 def insert_task(self, task, *aditional):
140 try:
141 self.queueLock.acquire()
142 task = self.taskQueue.put( (task,) + aditional, timeout=5)
143 self.queueLock.release()
144 return 1, None
145 except Queue.Full:
146 return -1, "timeout inserting a task over openflow thread " + self.name
147
148 def run(self):
149 while True:
150 self.queueLock.acquire()
151 if not self.taskQueue.empty():
152 task = self.taskQueue.get()
153 else:
154 task = None
155 self.queueLock.release()
156
157 if task is None:
158 time.sleep(1)
159 continue
160
161 if task[0] == 'update-net':
162 r,c = self.update_of_flows(task[1])
163 #update database status
164 self.db_lock.acquire()
165 if r<0:
166 UPDATE={'status':'ERROR', 'last_error': str(c)}
167 self.logger.error("processing task 'update-net' %s: %s", str(task[1]), c)
168 else:
169 UPDATE={'status':'ACTIVE', 'last_error': None}
170 self.logger.debug("processing task 'update-net' %s: OK", str(task[1]))
171 self.db.update_rows('nets', UPDATE, WHERE={'uuid':task[1]})
172 self.db_lock.release()
173
174 elif task[0] == 'clear-all':
175 r,c = self.clear_all_flows()
176 if r<0:
177 self.logger.error("processing task 'clear-all': %s", c)
178 else:
179 self.logger.debug("processing task 'clear-all': OK")
180 elif task[0] == 'exit':
181 self.logger.debug("exit from openflow_thread")
182 self.terminate()
183 return 0
184 else:
185 self.logger.error("unknown task %s", str(task))
186
187 def terminate(self):
188 pass
189 #print self.name, ": exit from openflow_thread"
190
191 def update_of_flows(self, net_id):
192 ports=()
193 self.db_lock.acquire()
194 select_= ('type','admin_state_up', 'vlan', 'provider', 'bind_net','bind_type','uuid')
195 result, nets = self.db.get_table(FROM='nets', SELECT=select_, WHERE={'uuid':net_id} )
196 #get all the networks binding to this
197 if result > 0:
198 if nets[0]['bind_net']:
199 bind_id = nets[0]['bind_net']
200 else:
201 bind_id = net_id
202 #get our net and all bind_nets
203 result, nets = self.db.get_table(FROM='nets', SELECT=select_,
204 WHERE_OR={'bind_net':bind_id, 'uuid':bind_id} )
205
206 self.db_lock.release()
207 if result < 0:
208 return -1, "DB error getting net: " + nets
209 #elif result==0:
210 #net has been deleted
211 ifaces_nb = 0
212 database_flows = []
213 for net in nets:
214 net_id = net["uuid"]
215 if net['admin_state_up'] == 'false':
216 net['ports'] = ()
217 else:
218 self.db_lock.acquire()
219 nb_ports, net_ports = self.db.get_table(
220 FROM='ports',
221 SELECT=('switch_port','vlan','uuid','mac','type','model'),
222 WHERE={'net_id':net_id, 'admin_state_up':'true', 'status':'ACTIVE'} )
223 self.db_lock.release()
224 if nb_ports < 0:
225 #print self.name, ": update_of_flows() ERROR getting ports", ports
226 return -1, "DB error getting ports from net '%s': %s" % (net_id, net_ports)
227
228 #add the binding as an external port
229 if net['provider'] and net['provider'][:9]=="openflow:":
230 external_port={"type":"external","mac":None}
231 external_port['uuid'] = net_id + ".1" #fake uuid
232 if net['provider'][-5:]==":vlan":
233 external_port["vlan"] = net["vlan"]
234 external_port["switch_port"] = net['provider'][9:-5]
235 else:
236 external_port["vlan"] = None
237 external_port["switch_port"] = net['provider'][9:]
238 net_ports = net_ports + (external_port,)
239 nb_ports += 1
240 net['ports'] = net_ports
241 ifaces_nb += nb_ports
242
243 # Get the name of flows that will be affected by this NET
244 self.db_lock.acquire()
245 result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':net_id})
246 self.db_lock.release()
247 if result < 0:
248 #print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
249 return -1, "DB error getting flows from net '%s': %s" %(net_id, database_net_flows)
250 database_flows += database_net_flows
251 # Get the name of flows where net_id==NULL that means net deleted (At DB foreign key: On delete set null)
252 self.db_lock.acquire()
253 result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':None})
254 self.db_lock.release()
255 if result < 0:
256 #print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
257 return -1, "DB error getting flows from net 'null': %s" %(database_net_flows)
258 database_flows += database_net_flows
259
260 #Get the existing flows at openflow controller
261 result, of_flows = self.OF_connector.get_of_rules()
262 if result < 0:
263 #print self.name, ": update_of_flows() ERROR getting flows from controller", of_flows
264 return -1, "OF error getting flows: " + of_flows
265
266 if ifaces_nb < 2:
267 pass
268 elif net['type'] == 'ptp':
269 if ifaces_nb > 2:
270 #print self.name, 'Error, network '+str(net_id)+' has been defined as ptp but it has '+\
271 # str(ifaces_nb)+' interfaces.'
272 return -1, "'ptp' type network cannot connect %d interfaces, only 2" % ifaces_nb
273 elif net['type'] == 'data':
274 if ifaces_nb > 2 and self.pmp_with_same_vlan:
275 # check all ports are VLAN (tagged) or none
276 vlan_tag = None
277 for port in ports:
278 if port["type"]=="external":
279 if port["vlan"] != None:
280 if port["vlan"]!=net["vlan"]:
281 text="External port vlan-tag and net vlan-tag must be the same when flag 'of_controller_nets_with_same_vlan' is True"
282 #print self.name, "Error", text
283 return -1, text
284 if vlan_tag == None:
285 vlan_tag=True
286 elif vlan_tag==False:
287 text="Passthrough and external port vlan-tagged can not be connected when flag 'of_controller_nets_with_same_vlan' is True"
288 #print self.name, "Error", text
289 return -1, text
290 else:
291 if vlan_tag == None:
292 vlan_tag=False
293 elif vlan_tag == True:
294 text="SR-IOV and external port not vlan-tagged can not be connected when flag 'of_controller_nets_with_same_vlan' is True"
295 #print self.name, "Error", text
296 return -1, text
297 elif port["model"]=="PF" or port["model"]=="VFnotShared":
298 if vlan_tag == None:
299 vlan_tag=False
300 elif vlan_tag==True:
301 text="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
302 #print self.name, "Error", text
303 return -1, text
304 elif port["model"] == "VF":
305 if vlan_tag == None:
306 vlan_tag=True
307 elif vlan_tag==False:
308 text="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
309 #print self.name, "Error", text
310 return -1, text
311 else:
312 return -1, 'Only ptp and data networks are supported for openflow'
313
314 # calculate new flows to be inserted
315 result, new_flows = self._compute_net_flows(nets)
316 if result < 0:
317 return result, new_flows
318
319 #modify database flows format and get the used names
320 used_names=[]
321 for flow in database_flows:
322 try:
323 change_db2of(flow)
324 except FlowBadFormat as e:
325 self.logger.error("Exception FlowBadFormat: '%s', flow: '%s'",str(e), str(flow))
326 continue
327 used_names.append(flow['name'])
328 name_index=0
329 #insert at database the new flows, change actions to human text
330 for flow in new_flows:
331 #1 check if an equal flow is already present
332 index = self._check_flow_already_present(flow, database_flows)
333 if index>=0:
334 database_flows[index]["not delete"]=True
335 self.logger.debug("Skipping already present flow %s", str(flow))
336 continue
337 #2 look for a non used name
338 flow_name=flow["net_id"]+"."+str(name_index)
339 while flow_name in used_names or flow_name in of_flows:
340 name_index += 1
341 flow_name=flow["net_id"]+"."+str(name_index)
342 used_names.append(flow_name)
343 flow['name'] = flow_name
344 #3 insert at openflow
345 result, content = self.OF_connector.new_flow(flow)
346 if result < 0:
347 #print self.name, ": Error '%s' at flow insertion" % c, flow
348 return -1, content
349 #4 insert at database
350 try:
351 change_of2db(flow)
352 except FlowBadFormat as e:
353 #print self.name, ": Error Exception FlowBadFormat '%s'" % str(e), flow
354 return -1, str(e)
355 self.db_lock.acquire()
356 result, content = self.db.new_row('of_flows', flow)
357 self.db_lock.release()
358 if result < 0:
359 #print self.name, ": Error '%s' at database insertion" % content, flow
360 return -1, content
361
362 #delete not needed old flows from openflow and from DDBB,
363 #check that the needed flows at DDBB are present in controller or insert them otherwise
364 for flow in database_flows:
365 if "not delete" in flow:
366 if flow["name"] not in of_flows:
367 #not in controller, insert it
368 result, content = self.OF_connector.new_flow(flow)
369 if result < 0:
370 #print self.name, ": Error '%s' at flow insertion" % c, flow
371 return -1, content
372 continue
373 #Delete flow
374 if flow["name"] in of_flows:
375 result, content = self.OF_connector.del_flow(flow['name'])
376 if result<0:
377 self.logger.error("cannot delete flow '%s' from OF: %s", flow['name'], content )
378 continue #skip deletion from database
379 #delete from database
380 self.db_lock.acquire()
381 result, content = self.db.delete_row_by_key('of_flows', 'id', flow['id'])
382 self.db_lock.release()
383 if result<0:
384 self.logger.error("cannot delete flow '%s' from DB: %s", flow['name'], content )
385
386 return 0, 'Success'
387
388 def clear_all_flows(self):
389 try:
390 if not self.test:
391 self.OF_connector.clear_all_flows()
392 #remove from database
393 self.db_lock.acquire()
394 self.db.delete_row_by_key('of_flows', None, None) #this will delete all lines
395 self.db_lock.release()
396 return 0, None
397 except requests.exceptions.RequestException as e:
398 #print self.name, ": clear_all_flows Exception:", str(e)
399 return -1, str(e)
400
401 flow_fields=('priority', 'vlan', 'ingress_port', 'actions', 'dst_mac', 'src_mac', 'net_id')
402 def _check_flow_already_present(self, new_flow, flow_list):
403 '''check if the same flow is already present in the flow list
404 The flow is repeated if all the fields, apart from name, are equal
405 Return the index of matching flow, -1 if not match'''
406 index=0
407 for flow in flow_list:
408 equal=True
409 for f in self.flow_fields:
410 if flow.get(f) != new_flow.get(f):
411 equal=False
412 break
413 if equal:
414 return index
415 index += 1
416 return -1
417
418 def _compute_net_flows(self, nets):
419 new_flows=[]
420 new_broadcast_flows={}
421 nb_ports = 0
422
423 # Check switch_port information is right
424 self.logger.debug("_compute_net_flows nets: %s", str(nets))
425 for net in nets:
426 for port in net['ports']:
427 nb_ports += 1
428 if not self.test and str(port['switch_port']) not in self.OF_connector.pp2ofi:
429 error_text= "switch port name '%s' is not valid for the openflow controller" % str(port['switch_port'])
430 #print self.name, ": ERROR " + error_text
431 return -1, error_text
432
433 for net_src in nets:
434 net_id = net_src["uuid"]
435 for net_dst in nets:
436 vlan_net_in = None
437 vlan_net_out = None
438 if net_src == net_dst:
439 #intra net rules
440 priority = 1000
441 elif net_src['bind_net'] == net_dst['uuid']:
442 if net_src.get('bind_type') and net_src['bind_type'][0:5] == "vlan:":
443 vlan_net_out = int(net_src['bind_type'][5:])
444 priority = 1100
445 elif net_dst['bind_net'] == net_src['uuid']:
446 if net_dst.get('bind_type') and net_dst['bind_type'][0:5] == "vlan:":
447 vlan_net_in = int(net_dst['bind_type'][5:])
448 priority = 1100
449 else:
450 #nets not binding
451 continue
452 for src_port in net_src['ports']:
453 vlan_in = vlan_net_in
454 if vlan_in == None and src_port['vlan'] != None:
455 vlan_in = src_port['vlan']
456 elif vlan_in != None and src_port['vlan'] != None:
457 #TODO this is something that we can not do. It requires a double VLAN check
458 #outer VLAN should be src_port['vlan'] and inner VLAN should be vlan_in
459 continue
460
461 # BROADCAST:
462 broadcast_key = src_port['uuid'] + "." + str(vlan_in)
463 if broadcast_key in new_broadcast_flows:
464 flow_broadcast = new_broadcast_flows[broadcast_key]
465 else:
466 flow_broadcast = {'priority': priority,
467 'net_id': net_id,
468 'dst_mac': 'ff:ff:ff:ff:ff:ff',
469 "ingress_port": str(src_port['switch_port']),
470 'actions': []
471 }
472 new_broadcast_flows[broadcast_key] = flow_broadcast
473 if vlan_in is not None:
474 flow_broadcast['vlan_id'] = str(vlan_in)
475
476 for dst_port in net_dst['ports']:
477 vlan_out = vlan_net_out
478 if vlan_out == None and dst_port['vlan'] != None:
479 vlan_out = dst_port['vlan']
480 elif vlan_out != None and dst_port['vlan'] != None:
481 #TODO this is something that we can not do. It requires a double VLAN set
482 #outer VLAN should be dst_port['vlan'] and inner VLAN should be vlan_out
483 continue
484 #if src_port == dst_port:
485 # continue
486 if src_port['switch_port'] == dst_port['switch_port'] and vlan_in == vlan_out:
487 continue
488 flow = {
489 "priority": priority,
490 'net_id': net_id,
491 "ingress_port": str(src_port['switch_port']),
492 'actions': []
493 }
494 if vlan_in is not None:
495 flow['vlan_id'] = str(vlan_in)
496 # allow that one port have no mac
497 if dst_port['mac'] is None or nb_ports==2: # point to point or nets with 2 elements
498 flow['priority'] = priority-5 # less priority
499 else:
500 flow['dst_mac'] = str(dst_port['mac'])
501
502 if vlan_out == None:
503 if vlan_in != None:
504 flow['actions'].append( ('vlan',None) )
505 else:
506 flow['actions'].append( ('vlan', vlan_out ) )
507 flow['actions'].append( ('out', str(dst_port['switch_port'])) )
508
509 if self._check_flow_already_present(flow, new_flows) >= 0:
510 self.logger.debug("Skipping repeated flow '%s'", str(flow))
511 continue
512
513 new_flows.append(flow)
514
515 # BROADCAST:
516 if nb_ports <= 2: # point to multipoint or nets with more than 2 elements
517 continue
518 out = (vlan_out, str(dst_port['switch_port']))
519 if out not in flow_broadcast['actions']:
520 flow_broadcast['actions'].append( out )
521
522 #BROADCAST
523 for flow_broadcast in new_broadcast_flows.values():
524 if len(flow_broadcast['actions'])==0:
525 continue #nothing to do, skip
526 flow_broadcast['actions'].sort()
527 if 'vlan_id' in flow_broadcast:
528 previous_vlan = 0 # indicates that a packet contains a vlan, and the vlan
529 else:
530 previous_vlan = None
531 final_actions=[]
532 action_number = 0
533 for action in flow_broadcast['actions']:
534 if action[0] != previous_vlan:
535 final_actions.append( ('vlan', action[0]) )
536 previous_vlan = action[0]
537 if self.pmp_with_same_vlan and action_number:
538 return -1, "Can not interconnect different vlan tags in a network when flag 'of_controller_nets_with_same_vlan' is True."
539 action_number += 1
540 final_actions.append( ('out', action[1]) )
541 flow_broadcast['actions'] = final_actions
542
543 if self._check_flow_already_present(flow_broadcast, new_flows) >= 0:
544 self.logger.debug("Skipping repeated flow '%s'", str(flow_broadcast))
545 continue
546
547 new_flows.append(flow_broadcast)
548
549 #UNIFY openflow rules with the same input port and vlan and the same output actions
550 #These flows differ at the dst_mac; and they are unified by not filtering by dst_mac
551 #this can happen if there is only two ports. It is converted to a point to point connection
552 flow_dict={} # use as key vlan_id+ingress_port and as value the list of flows matching these values
553 for flow in new_flows:
554 key = str(flow.get("vlan_id"))+":"+flow["ingress_port"]
555 if key in flow_dict:
556 flow_dict[key].append(flow)
557 else:
558 flow_dict[key]=[ flow ]
559 new_flows2=[]
560 for flow_list in flow_dict.values():
561 convert2ptp=False
562 if len (flow_list)>=2:
563 convert2ptp=True
564 for f in flow_list:
565 if f['actions'] != flow_list[0]['actions']:
566 convert2ptp=False
567 break
568 if convert2ptp: # add only one unified rule without dst_mac
569 self.logger.debug("Convert flow rules to NON mac dst_address " + str(flow_list) )
570 flow_list[0].pop('dst_mac')
571 flow_list[0]["priority"] -= 5
572 new_flows2.append(flow_list[0])
573 else: # add all the rules
574 new_flows2 += flow_list
575 return 0, new_flows2
576