fix bugs at host_thread; start net controller thread; sharing same variables db_lock...
[osm/openvim.git] / osm_openvim / openflow_thread.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
6 # This file is part of openvim
7 # All Rights Reserved.
8 #
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12 #
13 # http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20 #
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact with: nfvlabs@tid.es
23 ##
24
25 '''
26 This thread interacts with a openflow controller to create dataplane connections
27 '''
28
29 __author__="Pablo Montes, Alfonso Tierno"
30 __date__ ="17-jul-2015"
31
32
33 #import json
34 import threading
35 import time
36 import Queue
37 import requests
38 import logging
39 import openflow_conn
40
41 OFC_STATUS_ACTIVE = 'ACTIVE'
42 OFC_STATUS_INACTIVE = 'INACTIVE'
43 OFC_STATUS_ERROR = 'ERROR'
44
45 class FlowBadFormat(Exception):
46 '''raise when a bad format of flow is found'''
47
48 def change_of2db(flow):
49 '''Change 'flow' dictionary from openflow format to database format
50 Basically the change consist of changing 'flow[actions] from a list of
51 double tuple to a string
52 from [(A,B),(C,D),..] to "A=B,C=D" '''
53 action_str_list=[]
54 if type(flow)!=dict or "actions" not in flow:
55 raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
56 try:
57 for action in flow['actions']:
58 action_str_list.append( action[0] + "=" + str(action[1]) )
59 flow['actions'] = ",".join(action_str_list)
60 except:
61 raise FlowBadFormat("Unexpected format at 'actions'")
62
63 def change_db2of(flow):
64 '''Change 'flow' dictionary from database format to openflow format
65 Basically the change consist of changing 'flow[actions]' from a string to
66 a double tuple list
67 from "A=B,C=D,..." to [(A,B),(C,D),..]
68 raise FlowBadFormat '''
69 actions=[]
70 if type(flow)!=dict or "actions" not in flow or type(flow["actions"])!=str:
71 raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
72 action_list = flow['actions'].split(",")
73 for action_item in action_list:
74 action_tuple = action_item.split("=")
75 if len(action_tuple) != 2:
76 raise FlowBadFormat("Expected key=value format at 'actions'")
77 if action_tuple[0].strip().lower()=="vlan":
78 if action_tuple[1].strip().lower() in ("none", "strip"):
79 actions.append( ("vlan",None) )
80 else:
81 try:
82 actions.append( ("vlan", int(action_tuple[1])) )
83 except:
84 raise FlowBadFormat("Expected integer after vlan= at 'actions'")
85 elif action_tuple[0].strip().lower()=="out":
86 actions.append( ("out", str(action_tuple[1])) )
87 else:
88 raise FlowBadFormat("Unexpected '%s' at 'actions'"%action_tuple[0])
89 flow['actions'] = actions
90
91
92 class openflow_thread(threading.Thread):
93 """
94 This thread interacts with a openflow controller to create dataplane connections
95 """
96 def __init__(self, of_uuid, of_connector, db, db_lock, of_test, pmp_with_same_vlan=False, logger_name=None,
97 debug=None):
98 threading.Thread.__init__(self)
99 self.of_uuid = of_uuid
100 self.db = db
101 self.pmp_with_same_vlan = pmp_with_same_vlan
102 self.test = of_test
103 self.db_lock = db_lock
104 self.OF_connector = of_connector
105 if logger_name:
106 self.logger_name = logger_name
107 else:
108 self.logger_name = "openvim.ofc." + of_uuid
109 self.logger = logging.getLogger(self.logger_name)
110 if debug:
111 self.logger.setLevel(getattr(logging, debug))
112 self.queueLock = threading.Lock()
113 self.taskQueue = Queue.Queue(2000)
114
115 def insert_task(self, task, *aditional):
116 try:
117 self.queueLock.acquire()
118 task = self.taskQueue.put( (task,) + aditional, timeout=5)
119 self.queueLock.release()
120 return 1, None
121 except Queue.Full:
122 return -1, "timeout inserting a task over openflow thread " + self.of_uuid
123
124 def run(self):
125 self.logger.debug("Start openflow thread")
126 self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
127
128 while True:
129 try:
130 self.queueLock.acquire()
131 if not self.taskQueue.empty():
132 task = self.taskQueue.get()
133 else:
134 task = None
135 self.queueLock.release()
136
137 if task is None:
138 time.sleep(1)
139 continue
140
141 if task[0] == 'update-net':
142 r,c = self.update_of_flows(task[1])
143 # update database status
144 if r<0:
145 UPDATE={'status':'ERROR', 'last_error': str(c)}
146 self.logger.error("processing task 'update-net' %s: %s", str(task[1]), c)
147 self.set_openflow_controller_status(OFC_STATUS_ERROR, "Error updating net {}".format(task[1]))
148 else:
149 UPDATE={'status':'ACTIVE', 'last_error': None}
150 self.logger.debug("processing task 'update-net' %s: OK", str(task[1]))
151 self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
152 self.db_lock.acquire()
153 self.db.update_rows('nets', UPDATE, WHERE={'uuid': task[1]})
154 self.db_lock.release()
155
156 elif task[0] == 'clear-all':
157 r,c = self.clear_all_flows()
158 if r<0:
159 self.logger.error("processing task 'clear-all': %s", c)
160 self.set_openflow_controller_status(OFC_STATUS_ERROR, "Error deleting all flows")
161 else:
162 self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
163 self.logger.debug("processing task 'clear-all': OK")
164 elif task[0] == 'exit':
165 self.logger.debug("exit from openflow_thread")
166 self.terminate()
167 self.set_openflow_controller_status(OFC_STATUS_INACTIVE, "Ofc with thread killed")
168 return 0
169 else:
170 self.logger.error("unknown task %s", str(task))
171 except openflow_conn.OpenflowconnException as e:
172 self.logger.error("OpenflowconnException: " + str(e))
173 self.set_openflow_controller_status(OFC_STATUS_ERROR, str(e))
174 except Exception as e:
175 self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
176
177 def terminate(self):
178 pass
179 # print self.name, ": exit from openflow_thread"
180
181 def update_of_flows(self, net_id):
182 ports=()
183 self.db_lock.acquire()
184 select_= ('type','admin_state_up', 'vlan', 'provider', 'bind_net','bind_type','uuid')
185 result, nets = self.db.get_table(FROM='nets', SELECT=select_, WHERE={'uuid':net_id} )
186 #get all the networks binding to this
187 if result > 0:
188 if nets[0]['bind_net']:
189 bind_id = nets[0]['bind_net']
190 else:
191 bind_id = net_id
192 #get our net and all bind_nets
193 result, nets = self.db.get_table(FROM='nets', SELECT=select_,
194 WHERE_OR={'bind_net':bind_id, 'uuid':bind_id} )
195
196 self.db_lock.release()
197 if result < 0:
198 return -1, "DB error getting net: " + nets
199 #elif result==0:
200 #net has been deleted
201 ifaces_nb = 0
202 database_flows = []
203 for net in nets:
204 net_id = net["uuid"]
205 if net['admin_state_up'] == 'false':
206 net['ports'] = ()
207 else:
208 self.db_lock.acquire()
209 nb_ports, net_ports = self.db.get_table(
210 FROM='ports',
211 SELECT=('switch_port','vlan','uuid','mac','type','model'),
212 WHERE={'net_id':net_id, 'admin_state_up':'true', 'status':'ACTIVE'} )
213 self.db_lock.release()
214 if nb_ports < 0:
215
216 #print self.name, ": update_of_flows() ERROR getting ports", ports
217 return -1, "DB error getting ports from net '%s': %s" % (net_id, net_ports)
218
219 #add the binding as an external port
220 if net['provider'] and net['provider'][:9]=="openflow:":
221 external_port={"type":"external","mac":None}
222 external_port['uuid'] = net_id + ".1" #fake uuid
223 if net['provider'][-5:]==":vlan":
224 external_port["vlan"] = net["vlan"]
225 external_port["switch_port"] = net['provider'][9:-5]
226 else:
227 external_port["vlan"] = None
228 external_port["switch_port"] = net['provider'][9:]
229 net_ports = net_ports + (external_port,)
230 nb_ports += 1
231 net['ports'] = net_ports
232 ifaces_nb += nb_ports
233
234 # Get the name of flows that will be affected by this NET
235 self.db_lock.acquire()
236 result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':net_id})
237 self.db_lock.release()
238 if result < 0:
239 error_msg = "DB error getting flows from net '{}': {}".format(net_id, database_net_flows)
240 # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
241 return -1, error_msg
242 database_flows += database_net_flows
243 # Get the name of flows where net_id==NULL that means net deleted (At DB foreign key: On delete set null)
244 self.db_lock.acquire()
245 result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':None})
246 self.db_lock.release()
247 if result < 0:
248 error_msg = "DB error getting flows from net 'null': {}".format(database_net_flows)
249 # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
250 return -1, error_msg
251 database_flows += database_net_flows
252
253 # Get the existing flows at openflow controller
254 try:
255 of_flows = self.OF_connector.get_of_rules()
256 # print self.name, ": update_of_flows() ERROR getting flows from controller", of_flows
257 except openflow_conn.OpenflowconnException as e:
258 # self.set_openflow_controller_status(OFC_STATUS_ERROR, "OF error {} getting flows".format(str(e)))
259 return -1, "OF error {} getting flows".format(str(e))
260
261 if ifaces_nb < 2:
262 pass
263 elif net['type'] == 'ptp':
264 if ifaces_nb > 2:
265 #print self.name, 'Error, network '+str(net_id)+' has been defined as ptp but it has '+\
266 # str(ifaces_nb)+' interfaces.'
267 return -1, "'ptp' type network cannot connect %d interfaces, only 2" % ifaces_nb
268 elif net['type'] == 'data':
269 if ifaces_nb > 2 and self.pmp_with_same_vlan:
270 # check all ports are VLAN (tagged) or none
271 vlan_tag = None
272 for port in ports:
273 if port["type"]=="external":
274 if port["vlan"] != None:
275 if port["vlan"]!=net["vlan"]:
276 text="External port vlan-tag and net vlan-tag must be the same when flag 'of_controller_nets_with_same_vlan' is True"
277 #print self.name, "Error", text
278 return -1, text
279 if vlan_tag == None:
280 vlan_tag=True
281 elif vlan_tag==False:
282 text="Passthrough and external port vlan-tagged cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
283 #print self.name, "Error", text
284 return -1, text
285 else:
286 if vlan_tag == None:
287 vlan_tag=False
288 elif vlan_tag == True:
289 text="SR-IOV and external port not vlan-tagged cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
290 #print self.name, "Error", text
291 return -1, text
292 elif port["model"]=="PF" or port["model"]=="VFnotShared":
293 if vlan_tag == None:
294 vlan_tag=False
295 elif vlan_tag==True:
296 text="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
297 #print self.name, "Error", text
298 return -1, text
299 elif port["model"] == "VF":
300 if vlan_tag == None:
301 vlan_tag=True
302 elif vlan_tag==False:
303 text="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
304 #print self.name, "Error", text
305 return -1, text
306 else:
307 return -1, 'Only ptp and data networks are supported for openflow'
308
309 # calculate new flows to be inserted
310 result, new_flows = self._compute_net_flows(nets)
311 if result < 0:
312 return result, new_flows
313
314 #modify database flows format and get the used names
315 used_names=[]
316 for flow in database_flows:
317 try:
318 change_db2of(flow)
319 except FlowBadFormat as e:
320 self.logger.error("Exception FlowBadFormat: '%s', flow: '%s'",str(e), str(flow))
321 continue
322 used_names.append(flow['name'])
323 name_index=0
324 # insert at database the new flows, change actions to human text
325 for flow in new_flows:
326 # 1 check if an equal flow is already present
327 index = self._check_flow_already_present(flow, database_flows)
328 if index>=0:
329 database_flows[index]["not delete"]=True
330 self.logger.debug("Skipping already present flow %s", str(flow))
331 continue
332 # 2 look for a non used name
333 flow_name=flow["net_id"]+"."+str(name_index)
334 while flow_name in used_names or flow_name in of_flows:
335 name_index += 1
336 flow_name=flow["net_id"]+"."+str(name_index)
337 used_names.append(flow_name)
338 flow['name'] = flow_name
339 # 3 insert at openflow
340
341 try:
342 self.OF_connector.new_flow(flow)
343 except openflow_conn.OpenflowconnException as e:
344 return -1, "Error creating new flow {}".format(str(e))
345
346 # 4 insert at database
347 try:
348 change_of2db(flow)
349 except FlowBadFormat as e:
350 # print self.name, ": Error Exception FlowBadFormat '%s'" % str(e), flow
351 return -1, str(e)
352 self.db_lock.acquire()
353 result, content = self.db.new_row('of_flows', flow)
354 self.db_lock.release()
355 if result < 0:
356 # print self.name, ": Error '%s' at database insertion" % content, flow
357 return -1, content
358
359 #delete not needed old flows from openflow and from DDBB,
360 #check that the needed flows at DDBB are present in controller or insert them otherwise
361 for flow in database_flows:
362 if "not delete" in flow:
363 if flow["name"] not in of_flows:
364 # not in controller, insert it
365 try:
366 self.OF_connector.new_flow(flow)
367 except openflow_conn.OpenflowconnException as e:
368 return -1, "Error creating new flow {}".format(str(e))
369
370 continue
371 # Delete flow
372 if flow["name"] in of_flows:
373 try:
374 self.OF_connector.del_flow(flow['name'])
375 except openflow_conn.OpenflowconnException as e:
376 self.logger.error("cannot delete flow '%s' from OF: %s", flow['name'], str(e))
377 # skip deletion from database
378 continue
379
380 # delete from database
381 self.db_lock.acquire()
382 result, content = self.db.delete_row_by_key('of_flows', 'id', flow['id'])
383 self.db_lock.release()
384 if result<0:
385 self.logger.error("cannot delete flow '%s' from DB: %s", flow['name'], content )
386
387 return 0, 'Success'
388
389 def clear_all_flows(self):
390 try:
391 if not self.test:
392 self.OF_connector.clear_all_flows()
393
394 # remove from database
395 self.db_lock.acquire()
396 self.db.delete_row_by_key('of_flows', None, None) #this will delete all lines
397 self.db_lock.release()
398 return 0, None
399 except openflow_conn.OpenflowconnException as e:
400 return -1, self.logger.error("Error deleting all flows {}", str(e))
401
402 flow_fields = ('priority', 'vlan', 'ingress_port', 'actions', 'dst_mac', 'src_mac', 'net_id')
403
404 def _check_flow_already_present(self, new_flow, flow_list):
405 '''check if the same flow is already present in the flow list
406 The flow is repeated if all the fields, apart from name, are equal
407 Return the index of matching flow, -1 if not match'''
408 index=0
409 for flow in flow_list:
410 equal=True
411 for f in self.flow_fields:
412 if flow.get(f) != new_flow.get(f):
413 equal=False
414 break
415 if equal:
416 return index
417 index += 1
418 return -1
419
420 def _compute_net_flows(self, nets):
421 new_flows=[]
422 new_broadcast_flows={}
423 nb_ports = 0
424
425 # Check switch_port information is right
426 self.logger.debug("_compute_net_flows nets: %s", str(nets))
427 for net in nets:
428 for port in net['ports']:
429 nb_ports += 1
430 if not self.test and str(port['switch_port']) not in self.OF_connector.pp2ofi:
431 error_text= "switch port name '%s' is not valid for the openflow controller" % str(port['switch_port'])
432 # print self.name, ": ERROR " + error_text
433 return -1, error_text
434
435 for net_src in nets:
436 net_id = net_src["uuid"]
437 for net_dst in nets:
438 vlan_net_in = None
439 vlan_net_out = None
440 if net_src == net_dst:
441 #intra net rules
442 priority = 1000
443 elif net_src['bind_net'] == net_dst['uuid']:
444 if net_src.get('bind_type') and net_src['bind_type'][0:5] == "vlan:":
445 vlan_net_out = int(net_src['bind_type'][5:])
446 priority = 1100
447 elif net_dst['bind_net'] == net_src['uuid']:
448 if net_dst.get('bind_type') and net_dst['bind_type'][0:5] == "vlan:":
449 vlan_net_in = int(net_dst['bind_type'][5:])
450 priority = 1100
451 else:
452 #nets not binding
453 continue
454 for src_port in net_src['ports']:
455 vlan_in = vlan_net_in
456 if vlan_in == None and src_port['vlan'] != None:
457 vlan_in = src_port['vlan']
458 elif vlan_in != None and src_port['vlan'] != None:
459 #TODO this is something that we cannot do. It requires a double VLAN check
460 #outer VLAN should be src_port['vlan'] and inner VLAN should be vlan_in
461 continue
462
463 # BROADCAST:
464 broadcast_key = src_port['uuid'] + "." + str(vlan_in)
465 if broadcast_key in new_broadcast_flows:
466 flow_broadcast = new_broadcast_flows[broadcast_key]
467 else:
468 flow_broadcast = {'priority': priority,
469 'net_id': net_id,
470 'dst_mac': 'ff:ff:ff:ff:ff:ff',
471 "ingress_port": str(src_port['switch_port']),
472 'actions': []
473 }
474 new_broadcast_flows[broadcast_key] = flow_broadcast
475 if vlan_in is not None:
476 flow_broadcast['vlan_id'] = str(vlan_in)
477
478 for dst_port in net_dst['ports']:
479 vlan_out = vlan_net_out
480 if vlan_out == None and dst_port['vlan'] != None:
481 vlan_out = dst_port['vlan']
482 elif vlan_out != None and dst_port['vlan'] != None:
483 #TODO this is something that we cannot do. It requires a double VLAN set
484 #outer VLAN should be dst_port['vlan'] and inner VLAN should be vlan_out
485 continue
486 #if src_port == dst_port:
487 # continue
488 if src_port['switch_port'] == dst_port['switch_port'] and vlan_in == vlan_out:
489 continue
490 flow = {
491 "priority": priority,
492 'net_id': net_id,
493 "ingress_port": str(src_port['switch_port']),
494 'actions': []
495 }
496 if vlan_in is not None:
497 flow['vlan_id'] = str(vlan_in)
498 # allow that one port have no mac
499 if dst_port['mac'] is None or nb_ports==2: # point to point or nets with 2 elements
500 flow['priority'] = priority-5 # less priority
501 else:
502 flow['dst_mac'] = str(dst_port['mac'])
503
504 if vlan_out == None:
505 if vlan_in != None:
506 flow['actions'].append( ('vlan',None) )
507 else:
508 flow['actions'].append( ('vlan', vlan_out ) )
509 flow['actions'].append( ('out', str(dst_port['switch_port'])) )
510
511 if self._check_flow_already_present(flow, new_flows) >= 0:
512 self.logger.debug("Skipping repeated flow '%s'", str(flow))
513 continue
514
515 new_flows.append(flow)
516
517 # BROADCAST:
518 if nb_ports <= 2: # point to multipoint or nets with more than 2 elements
519 continue
520 out = (vlan_out, str(dst_port['switch_port']))
521 if out not in flow_broadcast['actions']:
522 flow_broadcast['actions'].append( out )
523
524 #BROADCAST
525 for flow_broadcast in new_broadcast_flows.values():
526 if len(flow_broadcast['actions'])==0:
527 continue #nothing to do, skip
528 flow_broadcast['actions'].sort()
529 if 'vlan_id' in flow_broadcast:
530 previous_vlan = 0 # indicates that a packet contains a vlan, and the vlan
531 else:
532 previous_vlan = None
533 final_actions=[]
534 action_number = 0
535 for action in flow_broadcast['actions']:
536 if action[0] != previous_vlan:
537 final_actions.append( ('vlan', action[0]) )
538 previous_vlan = action[0]
539 if self.pmp_with_same_vlan and action_number:
540 return -1, "Cannot interconnect different vlan tags in a network when flag 'of_controller_nets_with_same_vlan' is True."
541 action_number += 1
542 final_actions.append( ('out', action[1]) )
543 flow_broadcast['actions'] = final_actions
544
545 if self._check_flow_already_present(flow_broadcast, new_flows) >= 0:
546 self.logger.debug("Skipping repeated flow '%s'", str(flow_broadcast))
547 continue
548
549 new_flows.append(flow_broadcast)
550
551 #UNIFY openflow rules with the same input port and vlan and the same output actions
552 #These flows differ at the dst_mac; and they are unified by not filtering by dst_mac
553 #this can happen if there is only two ports. It is converted to a point to point connection
554 flow_dict={} # use as key vlan_id+ingress_port and as value the list of flows matching these values
555 for flow in new_flows:
556 key = str(flow.get("vlan_id"))+":"+flow["ingress_port"]
557 if key in flow_dict:
558 flow_dict[key].append(flow)
559 else:
560 flow_dict[key]=[ flow ]
561 new_flows2=[]
562 for flow_list in flow_dict.values():
563 convert2ptp=False
564 if len (flow_list)>=2:
565 convert2ptp=True
566 for f in flow_list:
567 if f['actions'] != flow_list[0]['actions']:
568 convert2ptp=False
569 break
570 if convert2ptp: # add only one unified rule without dst_mac
571 self.logger.debug("Convert flow rules to NON mac dst_address " + str(flow_list) )
572 flow_list[0].pop('dst_mac')
573 flow_list[0]["priority"] -= 5
574 new_flows2.append(flow_list[0])
575 else: # add all the rules
576 new_flows2 += flow_list
577 return 0, new_flows2
578
579 def set_openflow_controller_status(self, status, error_text=None):
580 """
581 Set openflow controller last operation status in DB
582 :param status: ofc status ('ACTIVE','INACTIVE','ERROR')
583 :param error_text: error text
584 :return:
585 """
586 if self.of_uuid == "Default":
587 return True
588
589 ofc = {}
590 ofc['status'] = status
591 ofc['last_error'] = error_text
592 self.db_lock.acquire()
593 result, content = self.db.update_rows('ofcs', ofc, WHERE={'uuid': self.of_uuid}, log=False)
594 self.db_lock.release()
595 if result >= 0:
596 return True
597 else:
598 return False
599
600
601
602
603
604
605