Making vim_db thread-safe with a Lock.
[osm/openvim.git] / osm_openvim / openflow_thread.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 ##
5 # Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U.
6 # This file is part of openvim
7 # All Rights Reserved.
8 #
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12 #
13 # http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20 #
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact with: nfvlabs@tid.es
23 ##
24
25 '''
26 This thread interacts with a openflow controller to create dataplane connections
27 '''
28
# Module metadata
__author__ = "Pablo Montes, Alfonso Tierno"
__date__ = "17-jul-2015"
31
32
33 #import json
34 import threading
35 import time
36 import Queue
37 import requests
38 import logging
39 import openflow_conn
40
# Status values written to the 'ofcs' table for an openflow controller
OFC_STATUS_ACTIVE = 'ACTIVE'
OFC_STATUS_INACTIVE = 'INACTIVE'
OFC_STATUS_ERROR = 'ERROR'
44
class FlowBadFormat(Exception):
    '''Raised when a flow dictionary does not have the expected format'''
47
def change_of2db(flow):
    '''Change 'flow' dictionary from openflow format to database format.
    Basically the change consists of converting flow['actions'] from a list of
    double tuples to a string:
        from [(A,B),(C,D),..] to "A=B,C=D"
    The dictionary is modified in place; nothing is returned.
    raise FlowBadFormat if 'flow' is not a dictionary with an 'actions' key or
        if any action cannot be rendered as "key=value"
    '''
    if not isinstance(flow, dict) or "actions" not in flow:
        raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
    try:
        action_str_list = [action[0] + "=" + str(action[1]) for action in flow['actions']]
    except Exception:
        # Any malformed action (not a pair, non-string key, ...) is reported as a bad format.
        # 'Exception' instead of a bare 'except' lets KeyboardInterrupt/SystemExit propagate.
        raise FlowBadFormat("Unexpected format at 'actions'")
    # only overwrite 'actions' once every action has been converted
    flow['actions'] = ",".join(action_str_list)
62
def change_db2of(flow):
    '''Change 'flow' dictionary from database format to openflow format.
    Basically the change consists of converting flow['actions'] from a string to
    a list of double tuples:
        from "A=B,C=D,..." to [(A,B),(C,D),..]
    Only the keys 'vlan' and 'out' are accepted (case-insensitive, surrounding blanks ignored):
        vlan=none or vlan=strip  ->  ('vlan', None)
        vlan=<integer>           ->  ('vlan', <integer>)
        out=<text>               ->  ('out', <text>)   (the value is kept as it is)
    The dictionary is modified in place; nothing is returned.
    raise FlowBadFormat on any other content
    '''
    if not isinstance(flow, dict) or "actions" not in flow or not isinstance(flow["actions"], str):
        raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
    actions = []
    for action_item in flow['actions'].split(","):
        action_tuple = action_item.split("=")
        if len(action_tuple) != 2:
            raise FlowBadFormat("Expected key=value format at 'actions'")
        action_key = action_tuple[0].strip().lower()
        if action_key == "vlan":
            if action_tuple[1].strip().lower() in ("none", "strip"):
                actions.append(("vlan", None))
            else:
                try:
                    actions.append(("vlan", int(action_tuple[1])))
                except ValueError:
                    raise FlowBadFormat("Expected integer after vlan= at 'actions'")
        elif action_key == "out":
            actions.append(("out", str(action_tuple[1])))
        else:
            raise FlowBadFormat("Unexpected '%s' at 'actions'" % action_tuple[0])
    # only overwrite 'actions' once the whole string has been parsed
    flow['actions'] = actions
90
91
class openflow_thread(threading.Thread):
    """
    This thread interacts with an openflow controller to create dataplane connections.

    Tasks are queued with insert_task() and processed one by one at run().
    """

    def __init__(self, of_uuid, of_connector, db, of_test, pmp_with_same_vlan=False, logger_name=None,
                 debug=None):
        """
        :param of_uuid: identifier of the openflow controller; "Default" disables the status update at database
        :param of_connector: connector object used to talk with the openflow controller
        :param db: database object (get_table, update_rows, new_row, delete_row_by_key are used)
        :param of_test: if True the controller is not contacted for port validation nor for clearing flows
        :param pmp_with_same_vlan: flag 'of_controller_nets_with_same_vlan'
        :param logger_name: logger name; by default "openvim.ofc." + of_uuid
        :param debug: name of a logging level (e.g. "DEBUG") to be set at the logger
        """
        threading.Thread.__init__(self)
        self.of_uuid = of_uuid
        self.db = db
        self.pmp_with_same_vlan = pmp_with_same_vlan
        self.test = of_test
        self.OF_connector = of_connector
        if logger_name:
            self.logger_name = logger_name
        else:
            self.logger_name = "openvim.ofc." + of_uuid
        self.logger = logging.getLogger(self.logger_name)
        if debug:
            self.logger.setLevel(getattr(logging, debug))
        self.queueLock = threading.Lock()
        self.taskQueue = Queue.Queue(2000)

    @staticmethod
    def _format_error_msg(error_text, max_length=1024):
        """
        Shorten a text that reaches 'max_length' keeping its beginning and its end joined by " ... ".
        :param error_text: text to shorten; can be None or empty
        :param max_length: texts shorter than this value are returned unmodified
        :return: the (possibly shortened) text
        """
        if error_text and len(error_text) >= max_length:
            return error_text[:max_length // 2 - 3] + " ... " + error_text[-max_length // 2 + 3:]
        return error_text

    def insert_task(self, task, *aditional):
        """
        Queue a task to be processed by this thread.
        :param task: task name. run() understands 'update-net', 'clear-all' and 'exit'
        :param aditional: extra task parameters, e.g. the net uuid for 'update-net'
        :return: (1, None) if queued; (-1, error_text) if the queue remains full for 5 seconds
        """
        try:
            # Queue is already thread-safe, so queueLock is not taken here: holding it while
            # put() blocks would prevent run() from draining the queue, and it was never
            # released when Queue.Full was raised, blocking the thread forever.
            self.taskQueue.put((task,) + aditional, timeout=5)
            return 1, None
        except Queue.Full:
            return -1, "timeout inserting a task over openflow thread " + self.of_uuid

    def run(self):
        """
        Thread main loop: get tasks from the queue and process them until an 'exit' task arrives.
        When the queue is empty it sleeps one second before polling again.
        :return: 0 when the 'exit' task is processed
        """
        self.logger.debug("Start openflow thread")
        self.set_openflow_controller_status(OFC_STATUS_ACTIVE)

        while True:
            try:
                # 'with' guarantees that the lock is released even if the queue access fails
                with self.queueLock:
                    if not self.taskQueue.empty():
                        task = self.taskQueue.get()
                    else:
                        task = None

                if task is None:
                    time.sleep(1)
                    continue

                if task[0] == 'update-net':
                    r, c = self.update_of_flows(task[1])
                    # update database status
                    if r < 0:
                        UPDATE = {'status': 'ERROR', 'last_error': self._format_error_msg(str(c), 255)}
                        self.logger.error("processing task 'update-net' %s: %s", str(task[1]), c)
                        self.set_openflow_controller_status(OFC_STATUS_ERROR, "Error updating net {}".format(task[1]))
                    else:
                        UPDATE = {'status': 'ACTIVE', 'last_error': None}
                        self.logger.debug("processing task 'update-net' %s: OK", str(task[1]))
                        self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
                    self.db.update_rows('nets', UPDATE, WHERE={'uuid': task[1]})

                elif task[0] == 'clear-all':
                    r, c = self.clear_all_flows()
                    if r < 0:
                        self.logger.error("processing task 'clear-all': %s", c)
                        self.set_openflow_controller_status(OFC_STATUS_ERROR, "Error deleting all flows")
                    else:
                        self.set_openflow_controller_status(OFC_STATUS_ACTIVE)
                        self.logger.debug("processing task 'clear-all': OK")
                elif task[0] == 'exit':
                    self.logger.debug("exit from openflow_thread")
                    self.terminate()
                    self.set_openflow_controller_status(OFC_STATUS_INACTIVE, "Ofc with thread killed")
                    return 0
                else:
                    self.logger.error("unknown task %s", str(task))
            except openflow_conn.OpenflowconnException as e:
                self.logger.error("OpenflowconnException: " + str(e))
                self.set_openflow_controller_status(OFC_STATUS_ERROR, str(e))
            except Exception as e:
                # keep the thread alive whatever happens while processing a task
                self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)

    def terminate(self):
        """Hook called when the 'exit' task is processed. Nothing to do."""
        pass
        # print self.name, ": exit from openflow_thread"

    def update_of_flows(self, net_id):
        """
        Recompute the flows of a network (and of the networks bound with it) and synchronize
        them with the 'of_flows' database table and with the openflow controller: new flows are
        inserted at both, flows no longer needed are deleted from both.
        :param net_id: uuid of the network to update
        :return: (0, 'Success') if ok; (negative, error_text) on error
        """
        # NOTE(review): 'ports' is never filled in this method, so the per-port loop of the
        # 'of_controller_nets_with_same_vlan' check below does not iterate over anything.
        # Left as it is to not start rejecting networks; confirm the intended behaviour.
        ports = ()
        select_ = ('type', 'admin_state_up', 'vlan', 'provider', 'bind_net', 'bind_type', 'uuid')
        result, nets = self.db.get_table(FROM='nets', SELECT=select_, WHERE={'uuid': net_id})
        # get all the networks binding to this
        if result > 0:
            bind_id = nets[0]['bind_net'] or net_id
            # get our net and all bind_nets
            result, nets = self.db.get_table(FROM='nets', SELECT=select_,
                                             WHERE_OR={'bind_net': bind_id, 'uuid': bind_id})

        if result < 0:
            # format() instead of '+' so that a non string error content does not raise TypeError
            return -1, "DB error getting net: {}".format(nets)
        # elif result==0:
        #     net has been deleted
        ifaces_nb = 0
        database_flows = []
        for net in nets:
            net_id = net["uuid"]
            if net['admin_state_up'] == 'false':
                net['ports'] = ()
            else:
                nb_ports, net_ports = self.db.get_table(
                    FROM='ports',
                    SELECT=('switch_port', 'vlan', 'uuid', 'mac', 'type', 'model'),
                    WHERE={'net_id': net_id, 'admin_state_up': 'true', 'status': 'ACTIVE'})
                if nb_ports < 0:
                    # print self.name, ": update_of_flows() ERROR getting ports", ports
                    return -1, "DB error getting ports from net '%s': %s" % (net_id, net_ports)

                # add the binding as an external port
                if net['provider'] and net['provider'][:9] == "openflow:":
                    external_port = {"type": "external", "mac": None}
                    external_port['uuid'] = net_id + ".1"  # fake uuid
                    if net['provider'][-5:] == ":vlan":
                        external_port["vlan"] = net["vlan"]
                        external_port["switch_port"] = net['provider'][9:-5]
                    else:
                        external_port["vlan"] = None
                        external_port["switch_port"] = net['provider'][9:]
                    net_ports = net_ports + (external_port,)
                    nb_ports += 1
                net['ports'] = net_ports
                ifaces_nb += nb_ports

            # Get the name of flows that will be affected by this NET
            result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id': net_id})
            if result < 0:
                error_msg = "DB error getting flows from net '{}': {}".format(net_id, database_net_flows)
                # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
                return -1, error_msg
            database_flows += database_net_flows
        # Get the name of flows where net_id==NULL that means net deleted (At DB foreign key: On delete set null)
        result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id': None})
        if result < 0:
            error_msg = "DB error getting flows from net 'null': {}".format(database_net_flows)
            # print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
            return -1, error_msg
        database_flows += database_net_flows

        # Get the existing flows at openflow controller
        try:
            of_flows = self.OF_connector.get_of_rules()
            # print self.name, ": update_of_flows() ERROR getting flows from controller", of_flows
        except openflow_conn.OpenflowconnException as e:
            # self.set_openflow_controller_status(OFC_STATUS_ERROR, "OF error {} getting flows".format(str(e)))
            return -1, "OF error {} getting flows".format(str(e))

        # 'net' is the last network of the loop above; it is only read when there are
        # at least 2 interfaces, which implies that 'nets' was not empty
        if ifaces_nb < 2:
            pass
        elif net['type'] == 'ptp':
            if ifaces_nb > 2:
                # print self.name, 'Error, network '+str(net_id)+' has been defined as ptp but it has '+\
                #                 str(ifaces_nb)+' interfaces.'
                return -1, "'ptp' type network cannot connect %d interfaces, only 2" % ifaces_nb
        elif net['type'] == 'data':
            if ifaces_nb > 2 and self.pmp_with_same_vlan:
                # check all ports are VLAN (tagged) or none
                vlan_tag = None
                for port in ports:
                    if port["type"] == "external":
                        if port["vlan"] is not None:
                            if port["vlan"] != net["vlan"]:
                                text = "External port vlan-tag and net vlan-tag must be the same when flag 'of_controller_nets_with_same_vlan' is True"
                                # print self.name, "Error", text
                                return -1, text
                            if vlan_tag is None:
                                vlan_tag = True
                            elif vlan_tag is False:
                                text = "Passthrough and external port vlan-tagged cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
                                # print self.name, "Error", text
                                return -1, text
                        else:
                            if vlan_tag is None:
                                vlan_tag = False
                            elif vlan_tag is True:
                                text = "SR-IOV and external port not vlan-tagged cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
                                # print self.name, "Error", text
                                return -1, text
                    elif port["model"] == "PF" or port["model"] == "VFnotShared":
                        if vlan_tag is None:
                            vlan_tag = False
                        elif vlan_tag is True:
                            text = "Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
                            # print self.name, "Error", text
                            return -1, text
                    elif port["model"] == "VF":
                        if vlan_tag is None:
                            vlan_tag = True
                        elif vlan_tag is False:
                            text = "Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
                            # print self.name, "Error", text
                            return -1, text
        else:
            return -1, 'Only ptp and data networks are supported for openflow'

        # calculate new flows to be inserted
        result, new_flows = self._compute_net_flows(nets)
        if result < 0:
            return result, new_flows

        # modify database flows format and get the used names
        used_names = []
        for flow in database_flows:
            try:
                change_db2of(flow)
            except FlowBadFormat as e:
                self.logger.error("Exception FlowBadFormat: '%s', flow: '%s'", str(e), str(flow))
                continue
            used_names.append(flow['name'])
        name_index = 0
        # insert at database the new flows, change actions to human text
        for flow in new_flows:
            # 1 check if an equal flow is already present
            index = self._check_flow_already_present(flow, database_flows)
            if index >= 0:
                database_flows[index]["not delete"] = True
                self.logger.debug("Skipping already present flow %s", str(flow))
                continue
            # 2 look for a non used name
            flow_name = flow["net_id"] + "." + str(name_index)
            while flow_name in used_names or flow_name in of_flows:
                name_index += 1
                flow_name = flow["net_id"] + "." + str(name_index)
            used_names.append(flow_name)
            flow['name'] = flow_name
            # 3 insert at openflow
            try:
                self.OF_connector.new_flow(flow)
            except openflow_conn.OpenflowconnException as e:
                return -1, "Error creating new flow {}".format(str(e))

            # 4 insert at database
            try:
                change_of2db(flow)
            except FlowBadFormat as e:
                # print self.name, ": Error Exception FlowBadFormat '%s'" % str(e), flow
                return -1, str(e)
            result, content = self.db.new_row('of_flows', flow)
            if result < 0:
                # print self.name, ": Error '%s' at database insertion" % content, flow
                return -1, content

        # delete not needed old flows from openflow and from DDBB,
        # check that the needed flows at DDBB are present in controller or insert them otherwise
        for flow in database_flows:
            if "not delete" in flow:
                if flow["name"] not in of_flows:
                    # not in controller, insert it
                    try:
                        self.OF_connector.new_flow(flow)
                    except openflow_conn.OpenflowconnException as e:
                        return -1, "Error creating new flow {}".format(str(e))
                continue
            # Delete flow
            if flow["name"] in of_flows:
                try:
                    self.OF_connector.del_flow(flow['name'])
                except openflow_conn.OpenflowconnException as e:
                    self.logger.error("cannot delete flow '%s' from OF: %s", flow['name'], str(e))
                    # skip deletion from database
                    continue

            # delete from database
            result, content = self.db.delete_row_by_key('of_flows', 'id', flow['id'])
            if result < 0:
                self.logger.error("cannot delete flow '%s' from DB: %s", flow['name'], content)

        return 0, 'Success'

    def clear_all_flows(self):
        """
        Delete all the flows from the openflow controller (skipped in test mode) and from the database.
        :return: (0, None) if ok; (-1, error_text) if the openflow connector fails
        """
        try:
            if not self.test:
                self.OF_connector.clear_all_flows()

            # remove from database
            self.db.delete_row_by_key('of_flows', None, None)  # this will delete all lines
            return 0, None
        except openflow_conn.OpenflowconnException as e:
            # return the text itself: logger.error() returns None, and logging does not
            # understand '{}' placeholders
            error_text = "Error deleting all flows {}".format(str(e))
            self.logger.error(error_text)
            return -1, error_text

    # Fields compared at _check_flow_already_present(); 'name' is not among them.
    # NOTE(review): flows generated at _compute_net_flows() store the vlan under 'vlan_id',
    # not 'vlan', so the vlan does not seem to take part in the comparison. Not changed here
    # because the type used for that column at database cannot be seen in this file.
    flow_fields = ('priority', 'vlan', 'ingress_port', 'actions', 'dst_mac', 'src_mac', 'net_id')

    def _check_flow_already_present(self, new_flow, flow_list):
        '''check if the same flow is already present in the flow list
        The flow is repeated if all the fields listed at 'flow_fields' are equal;
        a missing field is considered as None
        Return the index of matching flow, -1 if not match'''
        for index, flow in enumerate(flow_list):
            if all(flow.get(f) == new_flow.get(f) for f in self.flow_fields):
                return index
        return -1

    def _compute_net_flows(self, nets):
        """
        Compute the openflow rules needed to interconnect the ports of the supplied networks.
        :param nets: list of network dictionaries with the keys 'uuid', 'bind_net', 'bind_type' and
            'ports'; every port is a dictionary with 'uuid', 'switch_port', 'vlan' and 'mac'
        :return: (0, list_of_flows) if ok; (-1, error_text) on error
        """
        new_flows = []
        new_broadcast_flows = {}
        nb_ports = 0

        # Check switch_port information is right
        self.logger.debug("_compute_net_flows nets: %s", str(nets))
        for net in nets:
            for port in net['ports']:
                nb_ports += 1
                if not self.test and str(port['switch_port']) not in self.OF_connector.pp2ofi:
                    error_text = "switch port name '%s' is not valid for the openflow controller" % str(port['switch_port'])
                    # print self.name, ": ERROR " + error_text
                    return -1, error_text

        for net_src in nets:
            net_id = net_src["uuid"]
            for net_dst in nets:
                vlan_net_in = None
                vlan_net_out = None
                if net_src == net_dst:
                    # intra net rules
                    priority = 1000
                elif net_src['bind_net'] == net_dst['uuid']:
                    if net_src.get('bind_type') and net_src['bind_type'][0:5] == "vlan:":
                        vlan_net_out = int(net_src['bind_type'][5:])
                    priority = 1100
                elif net_dst['bind_net'] == net_src['uuid']:
                    if net_dst.get('bind_type') and net_dst['bind_type'][0:5] == "vlan:":
                        vlan_net_in = int(net_dst['bind_type'][5:])
                    priority = 1100
                else:
                    # nets not binding
                    continue
                for src_port in net_src['ports']:
                    vlan_in = vlan_net_in
                    if vlan_in is None and src_port['vlan'] is not None:
                        vlan_in = src_port['vlan']
                    elif vlan_in is not None and src_port['vlan'] is not None:
                        # TODO this is something that we cannot do. It requires a double VLAN check
                        # outer VLAN should be src_port['vlan'] and inner VLAN should be vlan_in
                        continue

                    # BROADCAST:
                    broadcast_key = src_port['uuid'] + "." + str(vlan_in)
                    if broadcast_key in new_broadcast_flows:
                        flow_broadcast = new_broadcast_flows[broadcast_key]
                    else:
                        flow_broadcast = {'priority': priority,
                                          'net_id': net_id,
                                          'dst_mac': 'ff:ff:ff:ff:ff:ff',
                                          "ingress_port": str(src_port['switch_port']),
                                          'actions': []
                                          }
                        new_broadcast_flows[broadcast_key] = flow_broadcast
                        if vlan_in is not None:
                            flow_broadcast['vlan_id'] = str(vlan_in)

                    for dst_port in net_dst['ports']:
                        vlan_out = vlan_net_out
                        if vlan_out is None and dst_port['vlan'] is not None:
                            vlan_out = dst_port['vlan']
                        elif vlan_out is not None and dst_port['vlan'] is not None:
                            # TODO this is something that we cannot do. It requires a double VLAN set
                            # outer VLAN should be dst_port['vlan'] and inner VLAN should be vlan_out
                            continue
                        # if src_port == dst_port:
                        #     continue
                        if src_port['switch_port'] == dst_port['switch_port'] and vlan_in == vlan_out:
                            continue
                        flow = {
                            "priority": priority,
                            'net_id': net_id,
                            "ingress_port": str(src_port['switch_port']),
                            'actions': []
                        }
                        if vlan_in is not None:
                            flow['vlan_id'] = str(vlan_in)
                        # allow that one port have no mac
                        if dst_port['mac'] is None or nb_ports == 2:  # point to point or nets with 2 elements
                            flow['priority'] = priority - 5  # less priority
                        else:
                            flow['dst_mac'] = str(dst_port['mac'])

                        if vlan_out is None:
                            if vlan_in is not None:
                                flow['actions'].append(('vlan', None))
                        else:
                            flow['actions'].append(('vlan', vlan_out))
                        flow['actions'].append(('out', str(dst_port['switch_port'])))

                        if self._check_flow_already_present(flow, new_flows) >= 0:
                            self.logger.debug("Skipping repeated flow '%s'", str(flow))
                            continue

                        new_flows.append(flow)

                        # BROADCAST:
                        if nb_ports <= 2:  # point to multipoint or nets with more than 2 elements
                            continue
                        out = (vlan_out, str(dst_port['switch_port']))
                        if out not in flow_broadcast['actions']:
                            flow_broadcast['actions'].append(out)

        # BROADCAST
        for flow_broadcast in new_broadcast_flows.values():
            if len(flow_broadcast['actions']) == 0:
                continue  # nothing to do, skip
            # sorting groups the outputs by vlan, so a vlan action is only needed when it changes
            flow_broadcast['actions'].sort()
            if 'vlan_id' in flow_broadcast:
                previous_vlan = 0  # indicates that a packet contains a vlan, and the vlan
            else:
                previous_vlan = None
            final_actions = []
            action_number = 0
            for action in flow_broadcast['actions']:
                if action[0] != previous_vlan:
                    final_actions.append(('vlan', action[0]))
                    previous_vlan = action[0]
                    if self.pmp_with_same_vlan and action_number:
                        return -1, "Cannot interconnect different vlan tags in a network when flag 'of_controller_nets_with_same_vlan' is True."
                    # NOTE(review): nesting of this increment (per vlan change, not per action)
                    # is assumed; confirm against the upstream file
                    action_number += 1
                final_actions.append(('out', action[1]))
            flow_broadcast['actions'] = final_actions

            if self._check_flow_already_present(flow_broadcast, new_flows) >= 0:
                self.logger.debug("Skipping repeated flow '%s'", str(flow_broadcast))
                continue

            new_flows.append(flow_broadcast)

        # UNIFY openflow rules with the same input port and vlan and the same output actions
        # These flows differ at the dst_mac; and they are unified by not filtering by dst_mac
        # this can happen if there is only two ports. It is converted to a point to point connection
        flow_dict = {}  # use as key vlan_id+ingress_port and as value the list of flows matching these values
        for flow in new_flows:
            key = str(flow.get("vlan_id")) + ":" + flow["ingress_port"]
            flow_dict.setdefault(key, []).append(flow)
        new_flows2 = []
        for flow_list in flow_dict.values():
            first_flow = flow_list[0]
            convert2ptp = len(flow_list) >= 2 and all(f['actions'] == first_flow['actions'] for f in flow_list)
            if convert2ptp:  # add only one unified rule without dst_mac
                self.logger.debug("Convert flow rules to NON mac dst_address " + str(flow_list))
                # the first flow may already lack 'dst_mac' (port without mac); do not fail then
                first_flow.pop('dst_mac', None)
                first_flow["priority"] -= 5
                new_flows2.append(first_flow)
            else:  # add all the rules
                new_flows2 += flow_list
        return 0, new_flows2

    def set_openflow_controller_status(self, status, error_text=None):
        """
        Set openflow controller last operation status in DB
        Nothing is written when the controller uuid is "Default"
        :param status: ofc status ('ACTIVE','INACTIVE','ERROR')
        :param error_text: error text; it is shortened when it reaches 255 characters
        :return: True if the database is updated (or no update is needed), False otherwise
        """
        if self.of_uuid == "Default":
            return True

        ofc = {
            'status': status,
            'last_error': self._format_error_msg(error_text, 255),
        }
        result, content = self.db.update_rows('ofcs', ofc, WHERE={'uuid': self.of_uuid}, log=False)
        return result >= 0
586
587
588
589
590
591
592