2f39fabb68af06a3d4fbaf1966e440ce238148dd
[osm/openvim.git] / osm_openvim / openflow_thread.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
6 # This file is part of openvim
7 # All Rights Reserved.
8 #
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12 #
13 # http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20 #
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact with: nfvlabs@tid.es
23 ##
24
25 '''
26 This thread interacts with a openflow controller to create dataplane connections
27 '''
28
29 __author__="Pablo Montes, Alfonso Tierno"
30 __date__ ="17-jul-2015"
31
32
33 #import json
34 import threading
35 import time
36 import Queue
37 import requests
38 import logging
39 import openflow_conn
40
41 OFC_STATUS_ACTIVE = 'ACTIVE'
42 OFC_STATUS_INACTIVE = 'INACTIVE'
43 OFC_STATUS_ERROR = 'ERROR'
44
45 class FlowBadFormat(Exception):
46 '''raise when a bad format of flow is found'''
47
48 def change_of2db(flow):
49 '''Change 'flow' dictionary from openflow format to database format
50 Basically the change consist of changing 'flow[actions] from a list of
51 double tuple to a string
52 from [(A,B),(C,D),..] to "A=B,C=D" '''
53 action_str_list=[]
54 if type(flow)!=dict or "actions" not in flow:
55 raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
56 try:
57 for action in flow['actions']:
58 action_str_list.append( action[0] + "=" + str(action[1]) )
59 flow['actions'] = ",".join(action_str_list)
60 except:
61 raise FlowBadFormat("Unexpected format at 'actions'")
62
63 def change_db2of(flow):
64 '''Change 'flow' dictionary from database format to openflow format
65 Basically the change consist of changing 'flow[actions]' from a string to
66 a double tuple list
67 from "A=B,C=D,..." to [(A,B),(C,D),..]
68 raise FlowBadFormat '''
69 actions=[]
70 if type(flow)!=dict or "actions" not in flow or type(flow["actions"])!=str:
71 raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
72 action_list = flow['actions'].split(",")
73 for action_item in action_list:
74 action_tuple = action_item.split("=")
75 if len(action_tuple) != 2:
76 raise FlowBadFormat("Expected key=value format at 'actions'")
77 if action_tuple[0].strip().lower()=="vlan":
78 if action_tuple[1].strip().lower() in ("none", "strip"):
79 actions.append( ("vlan",None) )
80 else:
81 try:
82 actions.append( ("vlan", int(action_tuple[1])) )
83 except:
84 raise FlowBadFormat("Expected integer after vlan= at 'actions'")
85 elif action_tuple[0].strip().lower()=="out":
86 actions.append( ("out", str(action_tuple[1])) )
87 else:
88 raise FlowBadFormat("Unexpected '%s' at 'actions'"%action_tuple[0])
89 flow['actions'] = actions
90
91
92 class openflow_thread(threading.Thread):
93 """
94 This thread interacts with a openflow controller to create dataplane connections
95 """
96 def __init__(self, of_uuid, of_connector, db, db_lock, of_test, pmp_with_same_vlan=False, debug='ERROR'):
97 threading.Thread.__init__(self)
98 self.of_uuid = of_uuid
99 self.db = db
100 self.pmp_with_same_vlan = pmp_with_same_vlan
101 self.name = "openflow"
102 self.test = of_test
103 self.db_lock = db_lock
104 self.OF_connector = of_connector
105 self.logger = logging.getLogger('vim.OF-' + of_uuid)
106 self.logger.setLevel(getattr(logging, debug))
107 self.logger.name = of_connector.name + " " + self.OF_connector.dpid
108 self.queueLock = threading.Lock()
109 self.taskQueue = Queue.Queue(2000)
110
111 def insert_task(self, task, *aditional):
112 try:
113 self.queueLock.acquire()
114 task = self.taskQueue.put( (task,) + aditional, timeout=5)
115 self.queueLock.release()
116 return 1, None
117 except Queue.Full:
118 return -1, "timeout inserting a task over openflow thread " + self.name
119
120 def run(self):
121 self.logger.debug("Start openflow thread")
122 self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
123
124 while True:
125 try:
126 self.queueLock.acquire()
127 if not self.taskQueue.empty():
128 task = self.taskQueue.get()
129 else:
130 task = None
131 self.queueLock.release()
132
133 if task is None:
134 time.sleep(1)
135 continue
136
137 if task[0] == 'update-net':
138 r,c = self.update_of_flows(task[1])
139 # update database status
140 if r<0:
141 UPDATE={'status':'ERROR', 'last_error': str(c)}
142 self.logger.error("processing task 'update-net' %s: %s", str(task[1]), c)
143 self.set_openflow_controller_status(OFC_STATUS_ERROR, "Error updating net {}".format(task[1]))
144 else:
145 UPDATE={'status':'ACTIVE', 'last_error': None}
146 self.logger.debug("processing task 'update-net' %s: OK", str(task[1]))
147 self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
148 self.db_lock.acquire()
149 self.db.update_rows('nets', UPDATE, WHERE={'uuid': task[1]})
150 self.db_lock.release()
151
152 elif task[0] == 'clear-all':
153 r,c = self.clear_all_flows()
154 if r<0:
155 self.logger.error("processing task 'clear-all': %s", c)
156 self.set_openflow_controller_status(OFC_STATUS_ERROR, "Error deleting all flows")
157 else:
158 self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
159 self.logger.debug("processing task 'clear-all': OK")
160 elif task[0] == 'exit':
161 self.logger.debug("exit from openflow_thread")
162 self.terminate()
163 self.set_openflow_controller_status(OFC_STATUS_INACTIVE, "Ofc with thread killed")
164 return 0
165 else:
166 self.logger.error("unknown task %s", str(task))
167 except openflow_conn.OpenflowconnException as e:
168 self.logger.error("OpenflowconnException: " + str(e))
169 self.set_openflow_controller_status(OFC_STATUS_ERROR, str(e))
170 except Exception as e:
171 self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
172
173 def terminate(self):
174 pass
175 # print self.name, ": exit from openflow_thread"
176
177 def update_of_flows(self, net_id):
178 ports=()
179 self.db_lock.acquire()
180 select_= ('type','admin_state_up', 'vlan', 'provider', 'bind_net','bind_type','uuid')
181 result, nets = self.db.get_table(FROM='nets', SELECT=select_, WHERE={'uuid':net_id} )
182 #get all the networks binding to this
183 if result > 0:
184 if nets[0]['bind_net']:
185 bind_id = nets[0]['bind_net']
186 else:
187 bind_id = net_id
188 #get our net and all bind_nets
189 result, nets = self.db.get_table(FROM='nets', SELECT=select_,
190 WHERE_OR={'bind_net':bind_id, 'uuid':bind_id} )
191
192 self.db_lock.release()
193 if result < 0:
194 return -1, "DB error getting net: " + nets
195 #elif result==0:
196 #net has been deleted
197 ifaces_nb = 0
198 database_flows = []
199 for net in nets:
200 net_id = net["uuid"]
201 if net['admin_state_up'] == 'false':
202 net['ports'] = ()
203 else:
204 self.db_lock.acquire()
205 nb_ports, net_ports = self.db.get_table(
206 FROM='ports',
207 SELECT=('switch_port','vlan','uuid','mac','type','model'),
208 WHERE={'net_id':net_id, 'admin_state_up':'true', 'status':'ACTIVE'} )
209 self.db_lock.release()
210 if nb_ports < 0:
211
212 #print self.name, ": update_of_flows() ERROR getting ports", ports
213 return -1, "DB error getting ports from net '%s': %s" % (net_id, net_ports)
214
215 #add the binding as an external port
216 if net['provider'] and net['provider'][:9]=="openflow:":
217 external_port={"type":"external","mac":None}
218 external_port['uuid'] = net_id + ".1" #fake uuid
219 if net['provider'][-5:]==":vlan":
220 external_port["vlan"] = net["vlan"]
221 external_port["switch_port"] = net['provider'][9:-5]
222 else:
223 external_port["vlan"] = None
224 external_port["switch_port"] = net['provider'][9:]
225 net_ports = net_ports + (external_port,)
226 nb_ports += 1
227 net['ports'] = net_ports
228 ifaces_nb += nb_ports
229
230 # Get the name of flows that will be affected by this NET
231 self.db_lock.acquire()
232 result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':net_id})
233 self.db_lock.release()
234 if result < 0:
235 error_msg = "DB error getting flows from net '{}': {}".format(net_id, database_net_flows)
236 # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
237 return -1, error_msg
238 database_flows += database_net_flows
239 # Get the name of flows where net_id==NULL that means net deleted (At DB foreign key: On delete set null)
240 self.db_lock.acquire()
241 result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':None})
242 self.db_lock.release()
243 if result < 0:
244 error_msg = "DB error getting flows from net 'null': {}".format(database_net_flows)
245 # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
246 return -1, error_msg
247 database_flows += database_net_flows
248
249 # Get the existing flows at openflow controller
250 try:
251 of_flows = self.OF_connector.get_of_rules()
252 # print self.name, ": update_of_flows() ERROR getting flows from controller", of_flows
253 except openflow_conn.OpenflowconnException as e:
254 # self.set_openflow_controller_status(OFC_STATUS_ERROR, "OF error {} getting flows".format(str(e)))
255 return -1, "OF error {} getting flows".format(str(e))
256
257 if ifaces_nb < 2:
258 pass
259 elif net['type'] == 'ptp':
260 if ifaces_nb > 2:
261 #print self.name, 'Error, network '+str(net_id)+' has been defined as ptp but it has '+\
262 # str(ifaces_nb)+' interfaces.'
263 return -1, "'ptp' type network cannot connect %d interfaces, only 2" % ifaces_nb
264 elif net['type'] == 'data':
265 if ifaces_nb > 2 and self.pmp_with_same_vlan:
266 # check all ports are VLAN (tagged) or none
267 vlan_tag = None
268 for port in ports:
269 if port["type"]=="external":
270 if port["vlan"] != None:
271 if port["vlan"]!=net["vlan"]:
272 text="External port vlan-tag and net vlan-tag must be the same when flag 'of_controller_nets_with_same_vlan' is True"
273 #print self.name, "Error", text
274 return -1, text
275 if vlan_tag == None:
276 vlan_tag=True
277 elif vlan_tag==False:
278 text="Passthrough and external port vlan-tagged cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
279 #print self.name, "Error", text
280 return -1, text
281 else:
282 if vlan_tag == None:
283 vlan_tag=False
284 elif vlan_tag == True:
285 text="SR-IOV and external port not vlan-tagged cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
286 #print self.name, "Error", text
287 return -1, text
288 elif port["model"]=="PF" or port["model"]=="VFnotShared":
289 if vlan_tag == None:
290 vlan_tag=False
291 elif vlan_tag==True:
292 text="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
293 #print self.name, "Error", text
294 return -1, text
295 elif port["model"] == "VF":
296 if vlan_tag == None:
297 vlan_tag=True
298 elif vlan_tag==False:
299 text="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
300 #print self.name, "Error", text
301 return -1, text
302 else:
303 return -1, 'Only ptp and data networks are supported for openflow'
304
305 # calculate new flows to be inserted
306 result, new_flows = self._compute_net_flows(nets)
307 if result < 0:
308 return result, new_flows
309
310 #modify database flows format and get the used names
311 used_names=[]
312 for flow in database_flows:
313 try:
314 change_db2of(flow)
315 except FlowBadFormat as e:
316 self.logger.error("Exception FlowBadFormat: '%s', flow: '%s'",str(e), str(flow))
317 continue
318 used_names.append(flow['name'])
319 name_index=0
320 # insert at database the new flows, change actions to human text
321 for flow in new_flows:
322 # 1 check if an equal flow is already present
323 index = self._check_flow_already_present(flow, database_flows)
324 if index>=0:
325 database_flows[index]["not delete"]=True
326 self.logger.debug("Skipping already present flow %s", str(flow))
327 continue
328 # 2 look for a non used name
329 flow_name=flow["net_id"]+"."+str(name_index)
330 while flow_name in used_names or flow_name in of_flows:
331 name_index += 1
332 flow_name=flow["net_id"]+"."+str(name_index)
333 used_names.append(flow_name)
334 flow['name'] = flow_name
335 # 3 insert at openflow
336
337 try:
338 self.OF_connector.new_flow(flow)
339 except openflow_conn.OpenflowconnException as e:
340 return -1, "Error creating new flow {}".format(str(e))
341
342 # 4 insert at database
343 try:
344 change_of2db(flow)
345 except FlowBadFormat as e:
346 # print self.name, ": Error Exception FlowBadFormat '%s'" % str(e), flow
347 return -1, str(e)
348 self.db_lock.acquire()
349 result, content = self.db.new_row('of_flows', flow)
350 self.db_lock.release()
351 if result < 0:
352 # print self.name, ": Error '%s' at database insertion" % content, flow
353 return -1, content
354
355 #delete not needed old flows from openflow and from DDBB,
356 #check that the needed flows at DDBB are present in controller or insert them otherwise
357 for flow in database_flows:
358 if "not delete" in flow:
359 if flow["name"] not in of_flows:
360 # not in controller, insert it
361 try:
362 self.OF_connector.new_flow(flow)
363 except openflow_conn.OpenflowconnException as e:
364 return -1, "Error creating new flow {}".format(str(e))
365
366 continue
367 # Delete flow
368 if flow["name"] in of_flows:
369 try:
370 self.OF_connector.del_flow(flow['name'])
371 except openflow_conn.OpenflowconnException as e:
372 self.logger.error("cannot delete flow '%s' from OF: %s", flow['name'], str(e))
373 # skip deletion from database
374 continue
375
376 # delete from database
377 self.db_lock.acquire()
378 result, content = self.db.delete_row_by_key('of_flows', 'id', flow['id'])
379 self.db_lock.release()
380 if result<0:
381 self.logger.error("cannot delete flow '%s' from DB: %s", flow['name'], content )
382
383 return 0, 'Success'
384
385 def clear_all_flows(self):
386 try:
387 if not self.test:
388 self.OF_connector.clear_all_flows()
389
390 # remove from database
391 self.db_lock.acquire()
392 self.db.delete_row_by_key('of_flows', None, None) #this will delete all lines
393 self.db_lock.release()
394 return 0, None
395 except openflow_conn.OpenflowconnException as e:
396 return -1, self.logger.error("Error deleting all flows {}", str(e))
397
398 flow_fields = ('priority', 'vlan', 'ingress_port', 'actions', 'dst_mac', 'src_mac', 'net_id')
399
400 def _check_flow_already_present(self, new_flow, flow_list):
401 '''check if the same flow is already present in the flow list
402 The flow is repeated if all the fields, apart from name, are equal
403 Return the index of matching flow, -1 if not match'''
404 index=0
405 for flow in flow_list:
406 equal=True
407 for f in self.flow_fields:
408 if flow.get(f) != new_flow.get(f):
409 equal=False
410 break
411 if equal:
412 return index
413 index += 1
414 return -1
415
416 def _compute_net_flows(self, nets):
417 new_flows=[]
418 new_broadcast_flows={}
419 nb_ports = 0
420
421 # Check switch_port information is right
422 self.logger.debug("_compute_net_flows nets: %s", str(nets))
423 for net in nets:
424 for port in net['ports']:
425 nb_ports += 1
426 if not self.test and str(port['switch_port']) not in self.OF_connector.pp2ofi:
427 error_text= "switch port name '%s' is not valid for the openflow controller" % str(port['switch_port'])
428 # print self.name, ": ERROR " + error_text
429 return -1, error_text
430
431 for net_src in nets:
432 net_id = net_src["uuid"]
433 for net_dst in nets:
434 vlan_net_in = None
435 vlan_net_out = None
436 if net_src == net_dst:
437 #intra net rules
438 priority = 1000
439 elif net_src['bind_net'] == net_dst['uuid']:
440 if net_src.get('bind_type') and net_src['bind_type'][0:5] == "vlan:":
441 vlan_net_out = int(net_src['bind_type'][5:])
442 priority = 1100
443 elif net_dst['bind_net'] == net_src['uuid']:
444 if net_dst.get('bind_type') and net_dst['bind_type'][0:5] == "vlan:":
445 vlan_net_in = int(net_dst['bind_type'][5:])
446 priority = 1100
447 else:
448 #nets not binding
449 continue
450 for src_port in net_src['ports']:
451 vlan_in = vlan_net_in
452 if vlan_in == None and src_port['vlan'] != None:
453 vlan_in = src_port['vlan']
454 elif vlan_in != None and src_port['vlan'] != None:
455 #TODO this is something that we cannot do. It requires a double VLAN check
456 #outer VLAN should be src_port['vlan'] and inner VLAN should be vlan_in
457 continue
458
459 # BROADCAST:
460 broadcast_key = src_port['uuid'] + "." + str(vlan_in)
461 if broadcast_key in new_broadcast_flows:
462 flow_broadcast = new_broadcast_flows[broadcast_key]
463 else:
464 flow_broadcast = {'priority': priority,
465 'net_id': net_id,
466 'dst_mac': 'ff:ff:ff:ff:ff:ff',
467 "ingress_port": str(src_port['switch_port']),
468 'actions': []
469 }
470 new_broadcast_flows[broadcast_key] = flow_broadcast
471 if vlan_in is not None:
472 flow_broadcast['vlan_id'] = str(vlan_in)
473
474 for dst_port in net_dst['ports']:
475 vlan_out = vlan_net_out
476 if vlan_out == None and dst_port['vlan'] != None:
477 vlan_out = dst_port['vlan']
478 elif vlan_out != None and dst_port['vlan'] != None:
479 #TODO this is something that we cannot do. It requires a double VLAN set
480 #outer VLAN should be dst_port['vlan'] and inner VLAN should be vlan_out
481 continue
482 #if src_port == dst_port:
483 # continue
484 if src_port['switch_port'] == dst_port['switch_port'] and vlan_in == vlan_out:
485 continue
486 flow = {
487 "priority": priority,
488 'net_id': net_id,
489 "ingress_port": str(src_port['switch_port']),
490 'actions': []
491 }
492 if vlan_in is not None:
493 flow['vlan_id'] = str(vlan_in)
494 # allow that one port have no mac
495 if dst_port['mac'] is None or nb_ports==2: # point to point or nets with 2 elements
496 flow['priority'] = priority-5 # less priority
497 else:
498 flow['dst_mac'] = str(dst_port['mac'])
499
500 if vlan_out == None:
501 if vlan_in != None:
502 flow['actions'].append( ('vlan',None) )
503 else:
504 flow['actions'].append( ('vlan', vlan_out ) )
505 flow['actions'].append( ('out', str(dst_port['switch_port'])) )
506
507 if self._check_flow_already_present(flow, new_flows) >= 0:
508 self.logger.debug("Skipping repeated flow '%s'", str(flow))
509 continue
510
511 new_flows.append(flow)
512
513 # BROADCAST:
514 if nb_ports <= 2: # point to multipoint or nets with more than 2 elements
515 continue
516 out = (vlan_out, str(dst_port['switch_port']))
517 if out not in flow_broadcast['actions']:
518 flow_broadcast['actions'].append( out )
519
520 #BROADCAST
521 for flow_broadcast in new_broadcast_flows.values():
522 if len(flow_broadcast['actions'])==0:
523 continue #nothing to do, skip
524 flow_broadcast['actions'].sort()
525 if 'vlan_id' in flow_broadcast:
526 previous_vlan = 0 # indicates that a packet contains a vlan, and the vlan
527 else:
528 previous_vlan = None
529 final_actions=[]
530 action_number = 0
531 for action in flow_broadcast['actions']:
532 if action[0] != previous_vlan:
533 final_actions.append( ('vlan', action[0]) )
534 previous_vlan = action[0]
535 if self.pmp_with_same_vlan and action_number:
536 return -1, "Cannot interconnect different vlan tags in a network when flag 'of_controller_nets_with_same_vlan' is True."
537 action_number += 1
538 final_actions.append( ('out', action[1]) )
539 flow_broadcast['actions'] = final_actions
540
541 if self._check_flow_already_present(flow_broadcast, new_flows) >= 0:
542 self.logger.debug("Skipping repeated flow '%s'", str(flow_broadcast))
543 continue
544
545 new_flows.append(flow_broadcast)
546
547 #UNIFY openflow rules with the same input port and vlan and the same output actions
548 #These flows differ at the dst_mac; and they are unified by not filtering by dst_mac
549 #this can happen if there is only two ports. It is converted to a point to point connection
550 flow_dict={} # use as key vlan_id+ingress_port and as value the list of flows matching these values
551 for flow in new_flows:
552 key = str(flow.get("vlan_id"))+":"+flow["ingress_port"]
553 if key in flow_dict:
554 flow_dict[key].append(flow)
555 else:
556 flow_dict[key]=[ flow ]
557 new_flows2=[]
558 for flow_list in flow_dict.values():
559 convert2ptp=False
560 if len (flow_list)>=2:
561 convert2ptp=True
562 for f in flow_list:
563 if f['actions'] != flow_list[0]['actions']:
564 convert2ptp=False
565 break
566 if convert2ptp: # add only one unified rule without dst_mac
567 self.logger.debug("Convert flow rules to NON mac dst_address " + str(flow_list) )
568 flow_list[0].pop('dst_mac')
569 flow_list[0]["priority"] -= 5
570 new_flows2.append(flow_list[0])
571 else: # add all the rules
572 new_flows2 += flow_list
573 return 0, new_flows2
574
575 def set_openflow_controller_status(self, status, error_text=None):
576 """
577 Set openflow controller last operation status in DB
578 :param status: ofc status ('ACTIVE','INACTIVE','ERROR')
579 :param error_text: error text
580 :return:
581 """
582 if self.of_uuid == "Default":
583 return True
584
585 ofc = {}
586 ofc['status'] = status
587 ofc['last_error'] = error_text
588 self.db_lock.acquire()
589 result, content = self.db.update_rows('ofcs', ofc, WHERE={'uuid': self.of_uuid}, log=False)
590 self.db_lock.release()
591 if result >= 0:
592 return True
593 else:
594 return False
595
596
597
598
599
600
601