e41fb6bf71178ed8c03b13d1ebe747148d31088a
[osm/openvim.git] / osm_openvim / openflow_thread.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
6 # This file is part of openvim
7 # All Rights Reserved.
8 #
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12 #
13 # http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20 #
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact with: nfvlabs@tid.es
23 ##
24
25 '''
26 This thread interacts with an OpenFlow controller to create dataplane connections
27 '''
28
29 __author__="Pablo Montes, Alfonso Tierno"
30 __date__ ="17-jul-2015"
31
32
33 #import json
34 import threading
35 import time
36 import Queue
37 import requests
38 import logging
39 import openflow_conn
40
# Status values passed to openflow_thread.set_openflow_controller_status(), which
# stores them in the 'status' field of the 'ofcs' table.
OFC_STATUS_ACTIVE = 'ACTIVE'
OFC_STATUS_INACTIVE = 'INACTIVE'
OFC_STATUS_ERROR = 'ERROR'
44
class FlowBadFormat(Exception):
    """Raised by change_of2db() and change_db2of() when a flow is malformed."""
47
def change_of2db(flow):
    """Convert 'flow' in place from openflow format to database format.

    The only field changed is flow['actions'], which goes from a list of
    pairs to a comma separated string:
        from [(A,B),(C,D),..] to "A=B,C=D"

    :param flow: dictionary (or dict subclass) containing the key 'actions'
    :return: None, 'flow' is modified in place
    :raise FlowBadFormat: if 'flow' is not a dictionary with 'actions', or the
        content of 'actions' cannot be converted. In that case 'flow' is left
        untouched.
    """
    if not isinstance(flow, dict) or "actions" not in flow:
        raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
    try:
        action_str_list = [action[0] + "=" + str(action[1]) for action in flow['actions']]
    except Exception:
        # Any failure while reading the pairs (not iterable, too short, key not
        # a string, ...) is a format problem. 'Exception' instead of a bare
        # 'except' lets KeyboardInterrupt/SystemExit propagate.
        raise FlowBadFormat("Unexpected format at 'actions'")
    flow['actions'] = ",".join(action_str_list)
62
def change_db2of(flow):
    """Convert 'flow' in place from database format to openflow format.

    The only field changed is flow['actions'], which goes from a comma
    separated string to a list of pairs:
        from "A=B,C=D,..." to [(A,B),(C,D),..]

    Accepted keys (case insensitive, surrounding blanks ignored):
        vlan=<integer>     -> ('vlan', <int>)
        vlan=none|strip    -> ('vlan', None)
        out=<port>         -> ('out', '<port>'), the port text is kept as is

    :param flow: dictionary (or dict subclass) whose 'actions' is a string
    :return: None, 'flow' is modified in place
    :raise FlowBadFormat: on any format problem; 'flow' is left untouched
    """
    if (not isinstance(flow, dict) or "actions" not in flow
            or not isinstance(flow["actions"], str)):
        raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
    actions = []
    for action_item in flow['actions'].split(","):
        action_tuple = action_item.split("=")
        if len(action_tuple) != 2:
            raise FlowBadFormat("Expected key=value format at 'actions'")
        key = action_tuple[0].strip().lower()
        value = action_tuple[1]
        if key == "vlan":
            if value.strip().lower() in ("none", "strip"):
                actions.append(("vlan", None))
            else:
                try:
                    actions.append(("vlan", int(value)))
                except ValueError:
                    # int() of a string can only fail with ValueError
                    raise FlowBadFormat("Expected integer after vlan= at 'actions'")
        elif key == "out":
            actions.append(("out", str(value)))
        else:
            raise FlowBadFormat("Unexpected '%s' at 'actions'" % action_tuple[0])
    # assigned only at the end so that a failure leaves 'flow' unchanged
    flow['actions'] = actions
90
91
92 class openflow_thread(threading.Thread):
93 """
94 This thread interacts with an OpenFlow controller to create dataplane connections
95 """
96 def __init__(self, of_uuid, of_connector, db, db_lock, of_test, pmp_with_same_vlan=False, logger_name=None,
97 debug=None):
98 threading.Thread.__init__(self)
99 self.of_uuid = of_uuid
100 self.db = db
101 self.pmp_with_same_vlan = pmp_with_same_vlan
102 self.test = of_test
103 self.db_lock = db_lock
104 self.OF_connector = of_connector
105 if logger_name:
106 self.logger_name = logger_name
107 else:
108 self.logger_name = "openvim.ofc." + of_uuid
109 self.logger = logging.getLogger(self.logger_name)
110 if debug:
111 self.logger.setLevel(getattr(logging, debug))
112 self.queueLock = threading.Lock()
113 self.taskQueue = Queue.Queue(2000)
114
115 @staticmethod
116 def _format_error_msg(error_text, max_length=1024):
117 if error_text and len(error_text) >= max_length:
118 return error_text[:max_length//2-3] + " ... " + error_text[-max_length//2+3:]
119 return error_text
120
121 def insert_task(self, task, *aditional):
122 try:
123 self.queueLock.acquire()
124 task = self.taskQueue.put( (task,) + aditional, timeout=5)
125 self.queueLock.release()
126 return 1, None
127 except Queue.Full:
128 return -1, "timeout inserting a task over openflow thread " + self.of_uuid
129
def run(self):
    """Thread main loop: take tasks from the queue and process them.

    Tasks are the tuples stored by insert_task():
        ('update-net', net_uuid)  recompute the flows of the net with
                                  update_of_flows() and store the result in the
                                  'status'/'last_error' fields of the 'nets' row
        ('clear-all',)            remove every flow with clear_all_flows()
        ('exit',)                 set the controller INACTIVE and finish
    The controller status is updated after every task. The queue is polled
    once per second while it is empty.

    :return: 0 when the 'exit' task is processed
    """
    self.logger.debug("Start openflow thread")
    self.set_openflow_controller_status(OFC_STATUS_ACTIVE)

    while True:
        try:
            # locks are taken with 'with' so that an exception cannot leave them held
            with self.queueLock:
                task = None if self.taskQueue.empty() else self.taskQueue.get()

            if task is None:
                time.sleep(1)
                continue

            if task[0] == 'update-net':
                r, c = self.update_of_flows(task[1])
                # update database status
                if r < 0:
                    UPDATE = {'status': 'ERROR', 'last_error': self._format_error_msg(str(c), 255)}
                    self.logger.error("processing task 'update-net' %s: %s", str(task[1]), c)
                    self.set_openflow_controller_status(OFC_STATUS_ERROR,
                                                        "Error updating net {}".format(task[1]))
                else:
                    UPDATE = {'status': 'ACTIVE', 'last_error': None}
                    self.logger.debug("processing task 'update-net' %s: OK", str(task[1]))
                    self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
                with self.db_lock:
                    self.db.update_rows('nets', UPDATE, WHERE={'uuid': task[1]})

            elif task[0] == 'clear-all':
                r, c = self.clear_all_flows()
                if r < 0:
                    self.logger.error("processing task 'clear-all': %s", c)
                    self.set_openflow_controller_status(OFC_STATUS_ERROR, "Error deleting all flows")
                else:
                    self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
                    self.logger.debug("processing task 'clear-all': OK")
            elif task[0] == 'exit':
                self.logger.debug("exit from openflow_thread")
                self.terminate()
                self.set_openflow_controller_status(OFC_STATUS_INACTIVE, "Ofc with thread killed")
                return 0
            else:
                self.logger.error("unknown task %s", str(task))
        except openflow_conn.OpenflowconnException as e:
            self.logger.error("OpenflowconnException: " + str(e))
            self.set_openflow_controller_status(OFC_STATUS_ERROR, str(e))
        except Exception as e:
            # keep the thread alive whatever happens while processing one task
            self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
182
def terminate(self):
    """Called by run() when the 'exit' task is processed; nothing to release for now."""
    return None
186
187 def update_of_flows(self, net_id):
188 ports=()
189 self.db_lock.acquire()
190 select_= ('type','admin_state_up', 'vlan', 'provider', 'bind_net','bind_type','uuid')
191 result, nets = self.db.get_table(FROM='nets', SELECT=select_, WHERE={'uuid':net_id} )
192 #get all the networks binding to this
193 if result > 0:
194 if nets[0]['bind_net']:
195 bind_id = nets[0]['bind_net']
196 else:
197 bind_id = net_id
198 #get our net and all bind_nets
199 result, nets = self.db.get_table(FROM='nets', SELECT=select_,
200 WHERE_OR={'bind_net':bind_id, 'uuid':bind_id} )
201
202 self.db_lock.release()
203 if result < 0:
204 return -1, "DB error getting net: " + nets
205 #elif result==0:
206 #net has been deleted
207 ifaces_nb = 0
208 database_flows = []
209 for net in nets:
210 net_id = net["uuid"]
211 if net['admin_state_up'] == 'false':
212 net['ports'] = ()
213 else:
214 self.db_lock.acquire()
215 nb_ports, net_ports = self.db.get_table(
216 FROM='ports',
217 SELECT=('switch_port','vlan','uuid','mac','type','model'),
218 WHERE={'net_id':net_id, 'admin_state_up':'true', 'status':'ACTIVE'} )
219 self.db_lock.release()
220 if nb_ports < 0:
221
222 #print self.name, ": update_of_flows() ERROR getting ports", ports
223 return -1, "DB error getting ports from net '%s': %s" % (net_id, net_ports)
224
225 #add the binding as an external port
226 if net['provider'] and net['provider'][:9]=="openflow:":
227 external_port={"type":"external","mac":None}
228 external_port['uuid'] = net_id + ".1" #fake uuid
229 if net['provider'][-5:]==":vlan":
230 external_port["vlan"] = net["vlan"]
231 external_port["switch_port"] = net['provider'][9:-5]
232 else:
233 external_port["vlan"] = None
234 external_port["switch_port"] = net['provider'][9:]
235 net_ports = net_ports + (external_port,)
236 nb_ports += 1
237 net['ports'] = net_ports
238 ifaces_nb += nb_ports
239
240 # Get the name of flows that will be affected by this NET
241 self.db_lock.acquire()
242 result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':net_id})
243 self.db_lock.release()
244 if result < 0:
245 error_msg = "DB error getting flows from net '{}': {}".format(net_id, database_net_flows)
246 # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
247 return -1, error_msg
248 database_flows += database_net_flows
249 # Get the name of flows where net_id==NULL that means net deleted (At DB foreign key: On delete set null)
250 self.db_lock.acquire()
251 result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':None})
252 self.db_lock.release()
253 if result < 0:
254 error_msg = "DB error getting flows from net 'null': {}".format(database_net_flows)
255 # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
256 return -1, error_msg
257 database_flows += database_net_flows
258
259 # Get the existing flows at openflow controller
260 try:
261 of_flows = self.OF_connector.get_of_rules()
262 # print self.name, ": update_of_flows() ERROR getting flows from controller", of_flows
263 except openflow_conn.OpenflowconnException as e:
264 # self.set_openflow_controller_status(OFC_STATUS_ERROR, "OF error {} getting flows".format(str(e)))
265 return -1, "OF error {} getting flows".format(str(e))
266
267 if ifaces_nb < 2:
268 pass
269 elif net['type'] == 'ptp':
270 if ifaces_nb > 2:
271 #print self.name, 'Error, network '+str(net_id)+' has been defined as ptp but it has '+\
272 # str(ifaces_nb)+' interfaces.'
273 return -1, "'ptp' type network cannot connect %d interfaces, only 2" % ifaces_nb
274 elif net['type'] == 'data':
275 if ifaces_nb > 2 and self.pmp_with_same_vlan:
276 # check all ports are VLAN (tagged) or none
277 vlan_tag = None
278 for port in ports:
279 if port["type"]=="external":
280 if port["vlan"] != None:
281 if port["vlan"]!=net["vlan"]:
282 text="External port vlan-tag and net vlan-tag must be the same when flag 'of_controller_nets_with_same_vlan' is True"
283 #print self.name, "Error", text
284 return -1, text
285 if vlan_tag == None:
286 vlan_tag=True
287 elif vlan_tag==False:
288 text="Passthrough and external port vlan-tagged cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
289 #print self.name, "Error", text
290 return -1, text
291 else:
292 if vlan_tag == None:
293 vlan_tag=False
294 elif vlan_tag == True:
295 text="SR-IOV and external port not vlan-tagged cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
296 #print self.name, "Error", text
297 return -1, text
298 elif port["model"]=="PF" or port["model"]=="VFnotShared":
299 if vlan_tag == None:
300 vlan_tag=False
301 elif vlan_tag==True:
302 text="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
303 #print self.name, "Error", text
304 return -1, text
305 elif port["model"] == "VF":
306 if vlan_tag == None:
307 vlan_tag=True
308 elif vlan_tag==False:
309 text="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
310 #print self.name, "Error", text
311 return -1, text
312 else:
313 return -1, 'Only ptp and data networks are supported for openflow'
314
315 # calculate new flows to be inserted
316 result, new_flows = self._compute_net_flows(nets)
317 if result < 0:
318 return result, new_flows
319
320 #modify database flows format and get the used names
321 used_names=[]
322 for flow in database_flows:
323 try:
324 change_db2of(flow)
325 except FlowBadFormat as e:
326 self.logger.error("Exception FlowBadFormat: '%s', flow: '%s'",str(e), str(flow))
327 continue
328 used_names.append(flow['name'])
329 name_index=0
330 # insert at database the new flows, change actions to human text
331 for flow in new_flows:
332 # 1 check if an equal flow is already present
333 index = self._check_flow_already_present(flow, database_flows)
334 if index>=0:
335 database_flows[index]["not delete"]=True
336 self.logger.debug("Skipping already present flow %s", str(flow))
337 continue
338 # 2 look for a non used name
339 flow_name=flow["net_id"]+"."+str(name_index)
340 while flow_name in used_names or flow_name in of_flows:
341 name_index += 1
342 flow_name=flow["net_id"]+"."+str(name_index)
343 used_names.append(flow_name)
344 flow['name'] = flow_name
345 # 3 insert at openflow
346
347 try:
348 self.OF_connector.new_flow(flow)
349 except openflow_conn.OpenflowconnException as e:
350 return -1, "Error creating new flow {}".format(str(e))
351
352 # 4 insert at database
353 try:
354 change_of2db(flow)
355 except FlowBadFormat as e:
356 # print self.name, ": Error Exception FlowBadFormat '%s'" % str(e), flow
357 return -1, str(e)
358 self.db_lock.acquire()
359 result, content = self.db.new_row('of_flows', flow)
360 self.db_lock.release()
361 if result < 0:
362 # print self.name, ": Error '%s' at database insertion" % content, flow
363 return -1, content
364
365 #delete not needed old flows from openflow and from DDBB,
366 #check that the needed flows at DDBB are present in controller or insert them otherwise
367 for flow in database_flows:
368 if "not delete" in flow:
369 if flow["name"] not in of_flows:
370 # not in controller, insert it
371 try:
372 self.OF_connector.new_flow(flow)
373 except openflow_conn.OpenflowconnException as e:
374 return -1, "Error creating new flow {}".format(str(e))
375
376 continue
377 # Delete flow
378 if flow["name"] in of_flows:
379 try:
380 self.OF_connector.del_flow(flow['name'])
381 except openflow_conn.OpenflowconnException as e:
382 self.logger.error("cannot delete flow '%s' from OF: %s", flow['name'], str(e))
383 # skip deletion from database
384 continue
385
386 # delete from database
387 self.db_lock.acquire()
388 result, content = self.db.delete_row_by_key('of_flows', 'id', flow['id'])
389 self.db_lock.release()
390 if result<0:
391 self.logger.error("cannot delete flow '%s' from DB: %s", flow['name'], content )
392
393 return 0, 'Success'
394
395 def clear_all_flows(self):
396 try:
397 if not self.test:
398 self.OF_connector.clear_all_flows()
399
400 # remove from database
401 self.db_lock.acquire()
402 self.db.delete_row_by_key('of_flows', None, None) #this will delete all lines
403 self.db_lock.release()
404 return 0, None
405 except openflow_conn.OpenflowconnException as e:
406 return -1, self.logger.error("Error deleting all flows {}", str(e))
407
# Fields compared by _check_flow_already_present() to decide if two flows are equal
# ('name' is not part of the comparison).
# NOTE(review): the flows built by _compute_net_flows() store the vlan match under
# the key 'vlan_id', not 'vlan', so as written the vlan match does not take part in
# the comparison -- confirm against the 'of_flows' table whether this is intended.
flow_fields = ('priority', 'vlan', 'ingress_port', 'actions', 'dst_mac', 'src_mac', 'net_id')
409
410 def _check_flow_already_present(self, new_flow, flow_list):
411 '''check if the same flow is already present in the flow list
412 The flow is repeated if all the fields, apart from name, are equal
413 Return the index of matching flow, -1 if not match'''
414 index=0
415 for flow in flow_list:
416 equal=True
417 for f in self.flow_fields:
418 if flow.get(f) != new_flow.get(f):
419 equal=False
420 break
421 if equal:
422 return index
423 index += 1
424 return -1
425
426 def _compute_net_flows(self, nets):
427 new_flows=[]
428 new_broadcast_flows={}
429 nb_ports = 0
430
431 # Check switch_port information is right
432 self.logger.debug("_compute_net_flows nets: %s", str(nets))
433 for net in nets:
434 for port in net['ports']:
435 nb_ports += 1
436 if not self.test and str(port['switch_port']) not in self.OF_connector.pp2ofi:
437 error_text= "switch port name '%s' is not valid for the openflow controller" % str(port['switch_port'])
438 # print self.name, ": ERROR " + error_text
439 return -1, error_text
440
441 for net_src in nets:
442 net_id = net_src["uuid"]
443 for net_dst in nets:
444 vlan_net_in = None
445 vlan_net_out = None
446 if net_src == net_dst:
447 #intra net rules
448 priority = 1000
449 elif net_src['bind_net'] == net_dst['uuid']:
450 if net_src.get('bind_type') and net_src['bind_type'][0:5] == "vlan:":
451 vlan_net_out = int(net_src['bind_type'][5:])
452 priority = 1100
453 elif net_dst['bind_net'] == net_src['uuid']:
454 if net_dst.get('bind_type') and net_dst['bind_type'][0:5] == "vlan:":
455 vlan_net_in = int(net_dst['bind_type'][5:])
456 priority = 1100
457 else:
458 #nets not binding
459 continue
460 for src_port in net_src['ports']:
461 vlan_in = vlan_net_in
462 if vlan_in == None and src_port['vlan'] != None:
463 vlan_in = src_port['vlan']
464 elif vlan_in != None and src_port['vlan'] != None:
465 #TODO this is something that we cannot do. It requires a double VLAN check
466 #outer VLAN should be src_port['vlan'] and inner VLAN should be vlan_in
467 continue
468
469 # BROADCAST:
470 broadcast_key = src_port['uuid'] + "." + str(vlan_in)
471 if broadcast_key in new_broadcast_flows:
472 flow_broadcast = new_broadcast_flows[broadcast_key]
473 else:
474 flow_broadcast = {'priority': priority,
475 'net_id': net_id,
476 'dst_mac': 'ff:ff:ff:ff:ff:ff',
477 "ingress_port": str(src_port['switch_port']),
478 'actions': []
479 }
480 new_broadcast_flows[broadcast_key] = flow_broadcast
481 if vlan_in is not None:
482 flow_broadcast['vlan_id'] = str(vlan_in)
483
484 for dst_port in net_dst['ports']:
485 vlan_out = vlan_net_out
486 if vlan_out == None and dst_port['vlan'] != None:
487 vlan_out = dst_port['vlan']
488 elif vlan_out != None and dst_port['vlan'] != None:
489 #TODO this is something that we cannot do. It requires a double VLAN set
490 #outer VLAN should be dst_port['vlan'] and inner VLAN should be vlan_out
491 continue
492 #if src_port == dst_port:
493 # continue
494 if src_port['switch_port'] == dst_port['switch_port'] and vlan_in == vlan_out:
495 continue
496 flow = {
497 "priority": priority,
498 'net_id': net_id,
499 "ingress_port": str(src_port['switch_port']),
500 'actions': []
501 }
502 if vlan_in is not None:
503 flow['vlan_id'] = str(vlan_in)
504 # allow that one port have no mac
505 if dst_port['mac'] is None or nb_ports==2: # point to point or nets with 2 elements
506 flow['priority'] = priority-5 # less priority
507 else:
508 flow['dst_mac'] = str(dst_port['mac'])
509
510 if vlan_out == None:
511 if vlan_in != None:
512 flow['actions'].append( ('vlan',None) )
513 else:
514 flow['actions'].append( ('vlan', vlan_out ) )
515 flow['actions'].append( ('out', str(dst_port['switch_port'])) )
516
517 if self._check_flow_already_present(flow, new_flows) >= 0:
518 self.logger.debug("Skipping repeated flow '%s'", str(flow))
519 continue
520
521 new_flows.append(flow)
522
523 # BROADCAST:
524 if nb_ports <= 2: # point to multipoint or nets with more than 2 elements
525 continue
526 out = (vlan_out, str(dst_port['switch_port']))
527 if out not in flow_broadcast['actions']:
528 flow_broadcast['actions'].append( out )
529
530 #BROADCAST
531 for flow_broadcast in new_broadcast_flows.values():
532 if len(flow_broadcast['actions'])==0:
533 continue #nothing to do, skip
534 flow_broadcast['actions'].sort()
535 if 'vlan_id' in flow_broadcast:
536 previous_vlan = 0 # indicates that a packet contains a vlan, and the vlan
537 else:
538 previous_vlan = None
539 final_actions=[]
540 action_number = 0
541 for action in flow_broadcast['actions']:
542 if action[0] != previous_vlan:
543 final_actions.append( ('vlan', action[0]) )
544 previous_vlan = action[0]
545 if self.pmp_with_same_vlan and action_number:
546 return -1, "Cannot interconnect different vlan tags in a network when flag 'of_controller_nets_with_same_vlan' is True."
547 action_number += 1
548 final_actions.append( ('out', action[1]) )
549 flow_broadcast['actions'] = final_actions
550
551 if self._check_flow_already_present(flow_broadcast, new_flows) >= 0:
552 self.logger.debug("Skipping repeated flow '%s'", str(flow_broadcast))
553 continue
554
555 new_flows.append(flow_broadcast)
556
557 #UNIFY openflow rules with the same input port and vlan and the same output actions
558 #These flows differ at the dst_mac; and they are unified by not filtering by dst_mac
559 #this can happen if there is only two ports. It is converted to a point to point connection
560 flow_dict={} # use as key vlan_id+ingress_port and as value the list of flows matching these values
561 for flow in new_flows:
562 key = str(flow.get("vlan_id"))+":"+flow["ingress_port"]
563 if key in flow_dict:
564 flow_dict[key].append(flow)
565 else:
566 flow_dict[key]=[ flow ]
567 new_flows2=[]
568 for flow_list in flow_dict.values():
569 convert2ptp=False
570 if len (flow_list)>=2:
571 convert2ptp=True
572 for f in flow_list:
573 if f['actions'] != flow_list[0]['actions']:
574 convert2ptp=False
575 break
576 if convert2ptp: # add only one unified rule without dst_mac
577 self.logger.debug("Convert flow rules to NON mac dst_address " + str(flow_list) )
578 flow_list[0].pop('dst_mac')
579 flow_list[0]["priority"] -= 5
580 new_flows2.append(flow_list[0])
581 else: # add all the rules
582 new_flows2 += flow_list
583 return 0, new_flows2
584
585 def set_openflow_controller_status(self, status, error_text=None):
586 """
587 Set openflow controller last operation status in DB
588 :param status: ofc status ('ACTIVE','INACTIVE','ERROR')
589 :param error_text: error text
590 :return:
591 """
592 if self.of_uuid == "Default":
593 return True
594
595 ofc = {}
596 ofc['status'] = status
597 ofc['last_error'] = self._format_error_msg(error_text, 255)
598 self.db_lock.acquire()
599 result, content = self.db.update_rows('ofcs', ofc, WHERE={'uuid': self.of_uuid}, log=False)
600 self.db_lock.release()
601 if result >= 0:
602 return True
603 else:
604 return False
605
606
607
608
609
610
611