Add openflow-port-mapping CLI command
[osm/openvim.git] / openflow_thread.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
6 # This file is part of openvim
7 # All Rights Reserved.
8 #
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12 #
13 # http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20 #
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact with: nfvlabs@tid.es
23 ##
24
25 '''
26 This thread interacts with a openflow controller to create dataplane connections
27 '''
28
29 __author__="Pablo Montes, Alfonso Tierno"
30 __date__ ="17-jul-2015"
31
32
33 #import json
34 import threading
35 import time
36 import Queue
37 import requests
38 import logging
39 import openflow_conn
40
41 OFC_STATUS_ACTIVE = 'ACTIVE'
42 OFC_STATUS_INACTIVE = 'INACTIVE'
43 OFC_STATUS_ERROR = 'ERROR'
44
45 class FlowBadFormat(Exception):
46 '''raise when a bad format of flow is found'''
47
48 def change_of2db(flow):
49 '''Change 'flow' dictionary from openflow format to database format
50 Basically the change consist of changing 'flow[actions] from a list of
51 double tuple to a string
52 from [(A,B),(C,D),..] to "A=B,C=D" '''
53 action_str_list=[]
54 if type(flow)!=dict or "actions" not in flow:
55 raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
56 try:
57 for action in flow['actions']:
58 action_str_list.append( action[0] + "=" + str(action[1]) )
59 flow['actions'] = ",".join(action_str_list)
60 except:
61 raise FlowBadFormat("Unexpected format at 'actions'")
62
63 def change_db2of(flow):
64 '''Change 'flow' dictionary from database format to openflow format
65 Basically the change consist of changing 'flow[actions]' from a string to
66 a double tuple list
67 from "A=B,C=D,..." to [(A,B),(C,D),..]
68 raise FlowBadFormat '''
69 actions=[]
70 if type(flow)!=dict or "actions" not in flow or type(flow["actions"])!=str:
71 raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
72 action_list = flow['actions'].split(",")
73 for action_item in action_list:
74 action_tuple = action_item.split("=")
75 if len(action_tuple) != 2:
76 raise FlowBadFormat("Expected key=value format at 'actions'")
77 if action_tuple[0].strip().lower()=="vlan":
78 if action_tuple[1].strip().lower() in ("none", "strip"):
79 actions.append( ("vlan",None) )
80 else:
81 try:
82 actions.append( ("vlan", int(action_tuple[1])) )
83 except:
84 raise FlowBadFormat("Expected integer after vlan= at 'actions'")
85 elif action_tuple[0].strip().lower()=="out":
86 actions.append( ("out", str(action_tuple[1])) )
87 else:
88 raise FlowBadFormat("Unexpected '%s' at 'actions'"%action_tuple[0])
89 flow['actions'] = actions
90
91
92 class openflow_thread(threading.Thread):
93 """
94 This thread interacts with a openflow controller to create dataplane connections
95 """
96 def __init__(self, of_uuid, of_connector, db, db_lock, of_test, pmp_with_same_vlan=False, debug='ERROR'):
97 threading.Thread.__init__(self)
98 self.of_uuid = of_uuid
99 self.db = db
100 self.pmp_with_same_vlan = pmp_with_same_vlan
101 self.name = "openflow"
102 self.test = of_test
103 self.db_lock = db_lock
104 self.OF_connector = of_connector
105 self.logger = logging.getLogger('vim.OF-' + of_uuid)
106 self.logger.setLevel(getattr(logging, debug))
107 self.logger.name = of_connector.name + " " + self.OF_connector.dpid
108 self.queueLock = threading.Lock()
109 self.taskQueue = Queue.Queue(2000)
110
111 def insert_task(self, task, *aditional):
112 try:
113 self.queueLock.acquire()
114 task = self.taskQueue.put( (task,) + aditional, timeout=5)
115 self.queueLock.release()
116 return 1, None
117 except Queue.Full:
118 return -1, "timeout inserting a task over openflow thread " + self.name
119
120 def run(self):
121 self.logger.debug("Start openflow thread")
122 self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
123
124 while True:
125 try:
126 self.queueLock.acquire()
127 if not self.taskQueue.empty():
128 task = self.taskQueue.get()
129 else:
130 task = None
131 self.queueLock.release()
132
133 if task is None:
134 time.sleep(1)
135 continue
136
137 if task[0] == 'update-net':
138 r,c = self.update_of_flows(task[1])
139 # update database status
140 if r<0:
141 UPDATE={'status':'ERROR', 'last_error': str(c)}
142 self.logger.error("processing task 'update-net' %s: %s", str(task[1]), c)
143 self.set_openflow_controller_status(OFC_STATUS_ERROR, "Error updating net {}".format(task[1]))
144 else:
145 UPDATE={'status':'ACTIVE', 'last_error': None}
146 self.logger.debug("processing task 'update-net' %s: OK", str(task[1]))
147 self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
148 self.db_lock.acquire()
149 self.db.update_rows('nets', UPDATE, WHERE={'uuid': task[1]})
150 self.db_lock.release()
151
152 elif task[0] == 'clear-all':
153 r,c = self.clear_all_flows()
154 if r<0:
155 self.logger.error("processing task 'clear-all': %s", c)
156 self.set_openflow_controller_status(OFC_STATUS_ERROR, "Error deleting all flows")
157 else:
158 self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
159 self.logger.debug("processing task 'clear-all': OK")
160 elif task[0] == 'exit':
161 self.logger.debug("exit from openflow_thread")
162 self.terminate()
163 self.set_openflow_controller_status(OFC_STATUS_INACTIVE, "Ofc with thread killed")
164 return 0
165 else:
166 self.logger.error("unknown task %s", str(task))
167 except openflow_conn.OpenflowconnException as e:
168 self.set_openflow_controller_status(OFC_STATUS_ERROR, str(e))
169
170 def terminate(self):
171 pass
172 # print self.name, ": exit from openflow_thread"
173
174 def update_of_flows(self, net_id):
175 ports=()
176 self.db_lock.acquire()
177 select_= ('type','admin_state_up', 'vlan', 'provider', 'bind_net','bind_type','uuid')
178 result, nets = self.db.get_table(FROM='nets', SELECT=select_, WHERE={'uuid':net_id} )
179 #get all the networks binding to this
180 if result > 0:
181 if nets[0]['bind_net']:
182 bind_id = nets[0]['bind_net']
183 else:
184 bind_id = net_id
185 #get our net and all bind_nets
186 result, nets = self.db.get_table(FROM='nets', SELECT=select_,
187 WHERE_OR={'bind_net':bind_id, 'uuid':bind_id} )
188
189 self.db_lock.release()
190 if result < 0:
191 return -1, "DB error getting net: " + nets
192 #elif result==0:
193 #net has been deleted
194 ifaces_nb = 0
195 database_flows = []
196 for net in nets:
197 net_id = net["uuid"]
198 if net['admin_state_up'] == 'false':
199 net['ports'] = ()
200 else:
201 self.db_lock.acquire()
202 nb_ports, net_ports = self.db.get_table(
203 FROM='ports',
204 SELECT=('switch_port','vlan','uuid','mac','type','model'),
205 WHERE={'net_id':net_id, 'admin_state_up':'true', 'status':'ACTIVE'} )
206 self.db_lock.release()
207 if nb_ports < 0:
208
209 #print self.name, ": update_of_flows() ERROR getting ports", ports
210 return -1, "DB error getting ports from net '%s': %s" % (net_id, net_ports)
211
212 #add the binding as an external port
213 if net['provider'] and net['provider'][:9]=="openflow:":
214 external_port={"type":"external","mac":None}
215 external_port['uuid'] = net_id + ".1" #fake uuid
216 if net['provider'][-5:]==":vlan":
217 external_port["vlan"] = net["vlan"]
218 external_port["switch_port"] = net['provider'][9:-5]
219 else:
220 external_port["vlan"] = None
221 external_port["switch_port"] = net['provider'][9:]
222 net_ports = net_ports + (external_port,)
223 nb_ports += 1
224 net['ports'] = net_ports
225 ifaces_nb += nb_ports
226
227 # Get the name of flows that will be affected by this NET
228 self.db_lock.acquire()
229 result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':net_id})
230 self.db_lock.release()
231 if result < 0:
232 error_msg = "DB error getting flows from net '{}': {}".format(net_id, database_net_flows)
233 # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
234 return -1, error_msg
235 database_flows += database_net_flows
236 # Get the name of flows where net_id==NULL that means net deleted (At DB foreign key: On delete set null)
237 self.db_lock.acquire()
238 result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':None})
239 self.db_lock.release()
240 if result < 0:
241 error_msg = "DB error getting flows from net 'null': {}".format(database_net_flows)
242 # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
243 return -1, error_msg
244 database_flows += database_net_flows
245
246 # Get the existing flows at openflow controller
247 try:
248 of_flows = self.OF_connector.get_of_rules()
249 # print self.name, ": update_of_flows() ERROR getting flows from controller", of_flows
250 except openflow_conn.OpenflowconnException as e:
251 # self.set_openflow_controller_status(OFC_STATUS_ERROR, "OF error {} getting flows".format(str(e)))
252 return -1, "OF error {} getting flows".format(str(e))
253
254 if ifaces_nb < 2:
255 pass
256 elif net['type'] == 'ptp':
257 if ifaces_nb > 2:
258 #print self.name, 'Error, network '+str(net_id)+' has been defined as ptp but it has '+\
259 # str(ifaces_nb)+' interfaces.'
260 return -1, "'ptp' type network cannot connect %d interfaces, only 2" % ifaces_nb
261 elif net['type'] == 'data':
262 if ifaces_nb > 2 and self.pmp_with_same_vlan:
263 # check all ports are VLAN (tagged) or none
264 vlan_tag = None
265 for port in ports:
266 if port["type"]=="external":
267 if port["vlan"] != None:
268 if port["vlan"]!=net["vlan"]:
269 text="External port vlan-tag and net vlan-tag must be the same when flag 'of_controller_nets_with_same_vlan' is True"
270 #print self.name, "Error", text
271 return -1, text
272 if vlan_tag == None:
273 vlan_tag=True
274 elif vlan_tag==False:
275 text="Passthrough and external port vlan-tagged cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
276 #print self.name, "Error", text
277 return -1, text
278 else:
279 if vlan_tag == None:
280 vlan_tag=False
281 elif vlan_tag == True:
282 text="SR-IOV and external port not vlan-tagged cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
283 #print self.name, "Error", text
284 return -1, text
285 elif port["model"]=="PF" or port["model"]=="VFnotShared":
286 if vlan_tag == None:
287 vlan_tag=False
288 elif vlan_tag==True:
289 text="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
290 #print self.name, "Error", text
291 return -1, text
292 elif port["model"] == "VF":
293 if vlan_tag == None:
294 vlan_tag=True
295 elif vlan_tag==False:
296 text="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
297 #print self.name, "Error", text
298 return -1, text
299 else:
300 return -1, 'Only ptp and data networks are supported for openflow'
301
302 # calculate new flows to be inserted
303 result, new_flows = self._compute_net_flows(nets)
304 if result < 0:
305 return result, new_flows
306
307 #modify database flows format and get the used names
308 used_names=[]
309 for flow in database_flows:
310 try:
311 change_db2of(flow)
312 except FlowBadFormat as e:
313 self.logger.error("Exception FlowBadFormat: '%s', flow: '%s'",str(e), str(flow))
314 continue
315 used_names.append(flow['name'])
316 name_index=0
317 # insert at database the new flows, change actions to human text
318 for flow in new_flows:
319 # 1 check if an equal flow is already present
320 index = self._check_flow_already_present(flow, database_flows)
321 if index>=0:
322 database_flows[index]["not delete"]=True
323 self.logger.debug("Skipping already present flow %s", str(flow))
324 continue
325 # 2 look for a non used name
326 flow_name=flow["net_id"]+"."+str(name_index)
327 while flow_name in used_names or flow_name in of_flows:
328 name_index += 1
329 flow_name=flow["net_id"]+"."+str(name_index)
330 used_names.append(flow_name)
331 flow['name'] = flow_name
332 # 3 insert at openflow
333
334 try:
335 self.OF_connector.new_flow(flow)
336 except openflow_conn.OpenflowconnException as e:
337 return -1, "Error creating new flow {}".format(str(e))
338
339 # 4 insert at database
340 try:
341 change_of2db(flow)
342 except FlowBadFormat as e:
343 # print self.name, ": Error Exception FlowBadFormat '%s'" % str(e), flow
344 return -1, str(e)
345 self.db_lock.acquire()
346 result, content = self.db.new_row('of_flows', flow)
347 self.db_lock.release()
348 if result < 0:
349 # print self.name, ": Error '%s' at database insertion" % content, flow
350 return -1, content
351
352 #delete not needed old flows from openflow and from DDBB,
353 #check that the needed flows at DDBB are present in controller or insert them otherwise
354 for flow in database_flows:
355 if "not delete" in flow:
356 if flow["name"] not in of_flows:
357 # not in controller, insert it
358 try:
359 self.OF_connector.new_flow(flow)
360 except openflow_conn.OpenflowconnException as e:
361 return -1, "Error creating new flow {}".format(str(e))
362
363 continue
364 # Delete flow
365 if flow["name"] in of_flows:
366 try:
367 self.OF_connector.del_flow(flow['name'])
368 except openflow_conn.OpenflowconnException as e:
369 self.logger.error("cannot delete flow '%s' from OF: %s", flow['name'], str(e))
370 # skip deletion from database
371 continue
372
373 # delete from database
374 self.db_lock.acquire()
375 result, content = self.db.delete_row_by_key('of_flows', 'id', flow['id'])
376 self.db_lock.release()
377 if result<0:
378 self.logger.error("cannot delete flow '%s' from DB: %s", flow['name'], content )
379
380 return 0, 'Success'
381
382 def clear_all_flows(self):
383 try:
384 if not self.test:
385 self.OF_connector.clear_all_flows()
386
387 # remove from database
388 self.db_lock.acquire()
389 self.db.delete_row_by_key('of_flows', None, None) #this will delete all lines
390 self.db_lock.release()
391 return 0, None
392 except openflow_conn.OpenflowconnException as e:
393 return -1, self.logger.error("Error deleting all flows {}", str(e))
394
395 flow_fields = ('priority', 'vlan', 'ingress_port', 'actions', 'dst_mac', 'src_mac', 'net_id')
396
397 def _check_flow_already_present(self, new_flow, flow_list):
398 '''check if the same flow is already present in the flow list
399 The flow is repeated if all the fields, apart from name, are equal
400 Return the index of matching flow, -1 if not match'''
401 index=0
402 for flow in flow_list:
403 equal=True
404 for f in self.flow_fields:
405 if flow.get(f) != new_flow.get(f):
406 equal=False
407 break
408 if equal:
409 return index
410 index += 1
411 return -1
412
413 def _compute_net_flows(self, nets):
414 new_flows=[]
415 new_broadcast_flows={}
416 nb_ports = 0
417
418 # Check switch_port information is right
419 self.logger.debug("_compute_net_flows nets: %s", str(nets))
420 for net in nets:
421 for port in net['ports']:
422 nb_ports += 1
423 if not self.test and str(port['switch_port']) not in self.OF_connector.pp2ofi:
424 error_text= "switch port name '%s' is not valid for the openflow controller" % str(port['switch_port'])
425 # print self.name, ": ERROR " + error_text
426 return -1, error_text
427
428 for net_src in nets:
429 net_id = net_src["uuid"]
430 for net_dst in nets:
431 vlan_net_in = None
432 vlan_net_out = None
433 if net_src == net_dst:
434 #intra net rules
435 priority = 1000
436 elif net_src['bind_net'] == net_dst['uuid']:
437 if net_src.get('bind_type') and net_src['bind_type'][0:5] == "vlan:":
438 vlan_net_out = int(net_src['bind_type'][5:])
439 priority = 1100
440 elif net_dst['bind_net'] == net_src['uuid']:
441 if net_dst.get('bind_type') and net_dst['bind_type'][0:5] == "vlan:":
442 vlan_net_in = int(net_dst['bind_type'][5:])
443 priority = 1100
444 else:
445 #nets not binding
446 continue
447 for src_port in net_src['ports']:
448 vlan_in = vlan_net_in
449 if vlan_in == None and src_port['vlan'] != None:
450 vlan_in = src_port['vlan']
451 elif vlan_in != None and src_port['vlan'] != None:
452 #TODO this is something that we cannot do. It requires a double VLAN check
453 #outer VLAN should be src_port['vlan'] and inner VLAN should be vlan_in
454 continue
455
456 # BROADCAST:
457 broadcast_key = src_port['uuid'] + "." + str(vlan_in)
458 if broadcast_key in new_broadcast_flows:
459 flow_broadcast = new_broadcast_flows[broadcast_key]
460 else:
461 flow_broadcast = {'priority': priority,
462 'net_id': net_id,
463 'dst_mac': 'ff:ff:ff:ff:ff:ff',
464 "ingress_port": str(src_port['switch_port']),
465 'actions': []
466 }
467 new_broadcast_flows[broadcast_key] = flow_broadcast
468 if vlan_in is not None:
469 flow_broadcast['vlan_id'] = str(vlan_in)
470
471 for dst_port in net_dst['ports']:
472 vlan_out = vlan_net_out
473 if vlan_out == None and dst_port['vlan'] != None:
474 vlan_out = dst_port['vlan']
475 elif vlan_out != None and dst_port['vlan'] != None:
476 #TODO this is something that we cannot do. It requires a double VLAN set
477 #outer VLAN should be dst_port['vlan'] and inner VLAN should be vlan_out
478 continue
479 #if src_port == dst_port:
480 # continue
481 if src_port['switch_port'] == dst_port['switch_port'] and vlan_in == vlan_out:
482 continue
483 flow = {
484 "priority": priority,
485 'net_id': net_id,
486 "ingress_port": str(src_port['switch_port']),
487 'actions': []
488 }
489 if vlan_in is not None:
490 flow['vlan_id'] = str(vlan_in)
491 # allow that one port have no mac
492 if dst_port['mac'] is None or nb_ports==2: # point to point or nets with 2 elements
493 flow['priority'] = priority-5 # less priority
494 else:
495 flow['dst_mac'] = str(dst_port['mac'])
496
497 if vlan_out == None:
498 if vlan_in != None:
499 flow['actions'].append( ('vlan',None) )
500 else:
501 flow['actions'].append( ('vlan', vlan_out ) )
502 flow['actions'].append( ('out', str(dst_port['switch_port'])) )
503
504 if self._check_flow_already_present(flow, new_flows) >= 0:
505 self.logger.debug("Skipping repeated flow '%s'", str(flow))
506 continue
507
508 new_flows.append(flow)
509
510 # BROADCAST:
511 if nb_ports <= 2: # point to multipoint or nets with more than 2 elements
512 continue
513 out = (vlan_out, str(dst_port['switch_port']))
514 if out not in flow_broadcast['actions']:
515 flow_broadcast['actions'].append( out )
516
517 #BROADCAST
518 for flow_broadcast in new_broadcast_flows.values():
519 if len(flow_broadcast['actions'])==0:
520 continue #nothing to do, skip
521 flow_broadcast['actions'].sort()
522 if 'vlan_id' in flow_broadcast:
523 previous_vlan = 0 # indicates that a packet contains a vlan, and the vlan
524 else:
525 previous_vlan = None
526 final_actions=[]
527 action_number = 0
528 for action in flow_broadcast['actions']:
529 if action[0] != previous_vlan:
530 final_actions.append( ('vlan', action[0]) )
531 previous_vlan = action[0]
532 if self.pmp_with_same_vlan and action_number:
533 return -1, "Cannot interconnect different vlan tags in a network when flag 'of_controller_nets_with_same_vlan' is True."
534 action_number += 1
535 final_actions.append( ('out', action[1]) )
536 flow_broadcast['actions'] = final_actions
537
538 if self._check_flow_already_present(flow_broadcast, new_flows) >= 0:
539 self.logger.debug("Skipping repeated flow '%s'", str(flow_broadcast))
540 continue
541
542 new_flows.append(flow_broadcast)
543
544 #UNIFY openflow rules with the same input port and vlan and the same output actions
545 #These flows differ at the dst_mac; and they are unified by not filtering by dst_mac
546 #this can happen if there is only two ports. It is converted to a point to point connection
547 flow_dict={} # use as key vlan_id+ingress_port and as value the list of flows matching these values
548 for flow in new_flows:
549 key = str(flow.get("vlan_id"))+":"+flow["ingress_port"]
550 if key in flow_dict:
551 flow_dict[key].append(flow)
552 else:
553 flow_dict[key]=[ flow ]
554 new_flows2=[]
555 for flow_list in flow_dict.values():
556 convert2ptp=False
557 if len (flow_list)>=2:
558 convert2ptp=True
559 for f in flow_list:
560 if f['actions'] != flow_list[0]['actions']:
561 convert2ptp=False
562 break
563 if convert2ptp: # add only one unified rule without dst_mac
564 self.logger.debug("Convert flow rules to NON mac dst_address " + str(flow_list) )
565 flow_list[0].pop('dst_mac')
566 flow_list[0]["priority"] -= 5
567 new_flows2.append(flow_list[0])
568 else: # add all the rules
569 new_flows2 += flow_list
570 return 0, new_flows2
571
572 def set_openflow_controller_status(self, status, error_text=None):
573 """
574 Set openflow controller last operation status in DB
575 :param status: ofc status ('ACTIVE','INACTIVE','ERROR')
576 :param error_text: error text
577 :return:
578 """
579 if self.of_uuid == "Default":
580 return True
581
582 ofc = {}
583 ofc['status'] = status
584 ofc['last_error'] = error_text
585 self.db_lock.acquire()
586 result, content = self.db.update_rows('ofcs', ofc, WHERE={'uuid': self.of_uuid}, log=False)
587 self.db_lock.release()
588 if result >= 0:
589 return True
590 else:
591 return False
592
593
594
595
596
597
598