fix SDN chaining bug regarding cookies + add extra unittest SDN chaining multi service
[osm/vim-emu.git] / src / emuvim / dcemulator / net.py
1 """
2 Distributed Cloud Emulator (dcemulator)
3 (c) 2015 by Manuel Peuster <manuel.peuster@upb.de>
4 """
5 import logging
6
7 import site
8 import time
9 from subprocess import Popen
10 import os
11 import re
12 import urllib2
13 from functools import partial
14
15 from mininet.net import Containernet
16 from mininet.node import Controller, DefaultController, OVSSwitch, OVSKernelSwitch, Docker, RemoteController
17 from mininet.cli import CLI
18 from mininet.link import TCLink
19 import networkx as nx
20 from emuvim.dcemulator.monitoring import DCNetworkMonitor
21 from emuvim.dcemulator.node import Datacenter, EmulatorCompute
22 from emuvim.dcemulator.resourcemodel import ResourceModelRegistrar
23
class DCNetwork(Containernet):
    """
    Wraps the original Mininet/Containernet class and provides
    methods to add data centers, switches, etc.

    Besides the Containernet topology, the class keeps a networkx
    MultiDiGraph (``DCNetwork_graph``) mirroring all nodes and links; the
    SDN chaining methods (``setChain``) use it to compute paths.

    This class is used by topology definition scripts.
    """

    def __init__(self, controller=RemoteController, monitor=False,
                 enable_learning=True,  # in case of RemoteController (Ryu), learning switch behavior can be turned off/on
                 dc_emulation_max_cpu=1.0,  # fraction of overall CPU time for emulation
                 dc_emulation_max_mem=512,  # emulation max mem in MB
                 **kwargs):
        """
        Create an extended version of a Containernet network.

        :param controller: controller class; if it is RemoteController, a
                           local Ryu process is started via startRyu()
        :param monitor: if True, a DCNetworkMonitor is attached to this network
        :param enable_learning: start Ryu with its learning switch app
                                (only relevant for RemoteController)
        :param dc_emulation_max_cpu: max. CPU time used by containers in data centers
        :param dc_emulation_max_mem: max. memory (MB) used for the emulation
        :param kwargs: pass-through for Mininet parameters
        :return:
        """
        self.dcs = {}

        # call original Containernet.__init__ and setup default controller
        Containernet.__init__(
            self, switch=OVSKernelSwitch, controller=controller, **kwargs)

        # Ryu management
        self.ryu_process = None
        if controller == RemoteController:
            # start Ryu controller
            self.startRyu(learning_switch=enable_learning)

        # add the specified controller
        self.addController('c0', controller=controller)

        # graph of the complete DC network
        self.DCNetwork_graph = nx.MultiDiGraph()

        # Pool of VLAN tags used to set up the SDN paths. IDs 0 and 4095 are
        # reserved by IEEE 802.1Q, so only 1..4094 are handed out. The list is
        # reversed so that pop() yields the lowest free tag first.
        self.vlans = list(range(1, 4095))[::-1]

        # link to Ryu REST_API
        ryu_ip = '0.0.0.0'
        ryu_port = '8080'
        self.ryu_REST_api = 'http://{0}:{1}'.format(ryu_ip, ryu_port)

        # monitoring agent
        if monitor:
            self.monitor_agent = DCNetworkMonitor(self)
        else:
            self.monitor_agent = None

        # initialize resource model registrar
        self.rm_registrar = ResourceModelRegistrar(
            dc_emulation_max_cpu, dc_emulation_max_mem)

    def addDatacenter(self, label, metadata=None, resource_log_path=None):
        """
        Create and add a logical cloud data center to the network.

        :param label: unique name of the data center
        :param metadata: optional dict handed to the Datacenter object
                         (a fresh empty dict is used if omitted)
        :param resource_log_path: handed to the Datacenter object unchanged
        :return: the created Datacenter
        :raises Exception: if a data center with this label already exists
        """
        # never share one default dict between all data centers
        if metadata is None:
            metadata = {}
        if label in self.dcs:
            raise Exception("Data center label already exists: %s" % label)
        dc = Datacenter(label, metadata=metadata, resource_log_path=resource_log_path)
        dc.net = self  # set reference to network
        self.dcs[label] = dc
        dc.create()  # finally create the data center in our Mininet instance
        logging.info("added data center: %s" % label)
        return dc

    def addLink(self, node1, node2, **params):
        """
        Able to handle Datacenter objects as link
        end points.

        Data center labels and Datacenter objects are replaced by the data
        center's switch. After the Containernet link is created, one edge per
        direction is added to ``DCNetwork_graph`` carrying the port ids, port
        numbers, interface names and any 'bw'/'delay'/'jitter'/'loss' values.

        :param node1: node, Datacenter or data center label
        :param node2: node, Datacenter or data center label
        :param params: link parameters passed on to Containernet.addLink
        :return: the created link
        """
        assert node1 is not None
        assert node2 is not None
        logging.debug("addLink: n1=%s n2=%s" % (str(node1), str(node2)))
        # ensure type of node1
        if isinstance(node1, basestring):
            if node1 in self.dcs:
                node1 = self.dcs[node1].switch
        if isinstance(node1, Datacenter):
            node1 = node1.switch
        # ensure type of node2
        if isinstance(node2, basestring):
            if node2 in self.dcs:
                node2 = self.dcs[node2].switch
        if isinstance(node2, Datacenter):
            node2 = node2.switch
        # try to give containers a default IP
        if isinstance(node1, Docker):
            if "params1" not in params:
                params["params1"] = {}
            if "ip" not in params["params1"]:
                params["params1"]["ip"] = self.getNextIp()
        if isinstance(node2, Docker):
            if "params2" not in params:
                params["params2"] = {}
            if "ip" not in params["params2"]:
                params["params2"]["ip"] = self.getNextIp()
        # ensure that we allow TCLinks between data centers
        # TODO this is not optimal, we use cls=Link for containers and TCLink for data centers
        # see Containernet issue: https://github.com/mpeuster/containernet/issues/3
        if "cls" not in params:
            params["cls"] = TCLink

        link = Containernet.addLink(self, node1, node2, **params)

        # port numbers assigned by Containernet
        node1_port_nr = node1.ports[link.intf1]
        node2_port_nr = node2.ports[link.intf2]

        # try to give container interfaces a default id
        # (id given in descriptor if available, otherwise same as port number)
        node1_port_id = node1_port_nr
        if isinstance(node1, Docker):
            if "id" in params["params1"]:
                node1_port_id = params["params1"]["id"]
        node1_port_name = link.intf1.name

        node2_port_id = node2_port_nr
        if isinstance(node2, Docker):
            if "id" in params["params2"]:
                node2_port_id = params["params2"]["id"]
        node2_port_name = link.intf2.name

        # possible weight metrics allowed by TClink class:
        weight_metrics = ['bw', 'delay', 'jitter', 'loss']
        attr_dict = {}
        for attr in params:
            if attr not in weight_metrics:
                continue
            # if delay: strip ms (need number as weight in graph)
            # str() is required because values such as bw may be passed as
            # plain numbers, which re.search() would reject with a TypeError.
            found = re.search(r'([0-9]*\.?[0-9]+)', str(params[attr]))
            # NOTE(review): the number is stored as a string, as before;
            # confirm whether consumers of the graph expect a numeric weight.
            attr_dict[attr] = found.group(1) if found else None

        # add edge and assigned port number to graph in both directions
        # between node1 and node2
        forward = {'src_port_id': node1_port_id, 'src_port_nr': node1_port_nr,
                   'src_port_name': node1_port_name,
                   'dst_port_id': node2_port_id, 'dst_port_nr': node2_port_nr,
                   'dst_port_name': node2_port_name}
        forward.update(attr_dict)
        self.DCNetwork_graph.add_edge(node1.name, node2.name, attr_dict=forward)

        backward = {'src_port_id': node2_port_id, 'src_port_nr': node2_port_nr,
                    'src_port_name': node2_port_name,
                    'dst_port_id': node1_port_id, 'dst_port_nr': node1_port_nr,
                    'dst_port_name': node1_port_name}
        backward.update(attr_dict)
        self.DCNetwork_graph.add_edge(node2.name, node1.name, attr_dict=backward)

        return link

    def addDocker(self, label, **params):
        """
        Wrapper for addDocker method to use custom container class.

        The container is also registered as a node in ``DCNetwork_graph``.
        """
        self.DCNetwork_graph.add_node(label)
        return Containernet.addDocker(self, label, cls=EmulatorCompute, **params)

    def removeDocker(self, label, **params):
        """
        Wrapper for removeDocker method to update graph.
        """
        self.DCNetwork_graph.remove_node(label)
        return Containernet.removeDocker(self, label, **params)

    def addSwitch(self, name, add_to_graph=True, **params):
        """
        Wrapper for addSwitch method to store switch also in graph.

        :param name: switch name
        :param add_to_graph: if False, the switch is not added to ``DCNetwork_graph``
        :param params: passed on to Containernet.addSwitch; 'protocols'
                       defaults to OpenFlow 1.0/1.2/1.3 unless given
        """
        if add_to_graph:
            self.DCNetwork_graph.add_node(name)
        # setdefault avoids a duplicate-keyword TypeError when a caller
        # supplies its own 'protocols' value
        params.setdefault('protocols', 'OpenFlow10,OpenFlow12,OpenFlow13')
        return Containernet.addSwitch(self, name, **params)

    def getAllContainers(self):
        """
        Returns a list with all containers within all data centers.
        """
        all_containers = []
        for dc in self.dcs.itervalues():
            all_containers += dc.listCompute()
        return all_containers

    def start(self):
        """
        Start all registered data centers, then the Containernet network.
        """
        for dc in self.dcs.itervalues():
            dc.start()
        Containernet.start(self)

    def stop(self):
        """
        Stop the monitor agent (if any), the emulated network and Ryu.
        """
        try:
            # stop the monitor agent
            if self.monitor_agent is not None:
                self.monitor_agent.stop()

            # stop emulator net
            Containernet.stop(self)
        finally:
            # stop Ryu controller even if the steps above failed, so that no
            # controller process is left behind
            self.stopRyu()

    def CLI(self):
        """
        Open the interactive Mininet CLI on this network.
        """
        CLI(self)

    # to remove chain do setChain( src, dst, cmd='del-flows')
    def setChain(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, **kwargs):
        """
        Add or remove the flow entries that chain two VNFs.

        :param vnf_src_name: name of the source VNF
        :param vnf_dst_name: name of the destination VNF
        :param vnf_src_interface: interface id at the source (first one if None)
        :param vnf_dst_interface: interface id at the destination (first one if None)
        :param kwargs: 'cmd' ('add-flow' or 'del-flows') selects the action,
                       'bidirectional' additionally handles the reverse
                       direction; all kwargs are forwarded to _chainAddFlow
        :return: status message; for bidirectional chains both messages
                 joined by a newline; "Command unknown" for any other cmd
        """
        cmd = kwargs.get('cmd')
        # TODO: del-flow to be implemented
        if cmd not in ('add-flow', 'del-flows'):
            return "Command unknown"

        # both commands walk the same path; _chainAddFlow picks the action
        # from kwargs['cmd']
        ret = self._chainAddFlow(vnf_src_name, vnf_dst_name, vnf_src_interface, vnf_dst_interface, **kwargs)
        if kwargs.get('bidirectional'):
            ret = ret + '\n' + self._chainAddFlow(vnf_dst_name, vnf_src_name, vnf_dst_interface,
                                                  vnf_src_interface, **kwargs)
        # always hand the result back (the unidirectional case used to
        # fall through and return None)
        return ret

    def _chainAddFlow(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, **kwargs):
        """
        Apply kwargs['cmd'] on every switch along the shortest path between
        the switches attached to the given source and destination interfaces.

        For 'add-flow' on a path with more than one switch, a VLAN tag is
        taken from ``self.vlans`` and passed to the flow-entry helpers.

        :return: status or error message (string)
        """
        graph = self.DCNetwork_graph
        src_sw = None
        src_sw_inport_nr = None
        dst_sw = None
        dst_sw_outport_nr = None

        # TODO: this needs to be cleaned up
        # check if port is specified (vnf:port)
        if vnf_src_interface is None:
            # take first interface by default
            connected_sw = graph.neighbors(vnf_src_name)[0]
            link_dict = graph[vnf_src_name][connected_sw]
            vnf_src_interface = link_dict[0]['src_port_id']

        for connected_sw in graph.neighbors(vnf_src_name):
            link_dict = graph[vnf_src_name][connected_sw]
            for link in link_dict:
                if link_dict[link]['src_port_id'] == vnf_src_interface:
                    # found the right link and connected switch
                    src_sw = connected_sw
                    src_sw_inport_nr = link_dict[link]['dst_port_nr']
                    break

        if vnf_dst_interface is None:
            # take first interface by default
            connected_sw = graph.neighbors(vnf_dst_name)[0]
            link_dict = graph[connected_sw][vnf_dst_name]
            vnf_dst_interface = link_dict[0]['dst_port_id']

        vnf_dst_name = vnf_dst_name.split(':')[0]
        for connected_sw in graph.neighbors(vnf_dst_name):
            link_dict = graph[connected_sw][vnf_dst_name]
            for link in link_dict:
                if link_dict[link]['dst_port_id'] == vnf_dst_interface:
                    # found the right link and connected switch
                    dst_sw = connected_sw
                    dst_sw_outport_nr = link_dict[link]['src_port_nr']
                    break

        no_path_msg = "No path could be found between {0} and {1}".format(vnf_src_name, vnf_dst_name)

        # An interface id that matches no link leaves the switch unset. Report
        # it with the same message as a failed path search instead of relying
        # on a NameError being swallowed further down.
        if src_sw is None or dst_sw is None:
            logging.info(no_path_msg)
            return no_path_msg

        # get shortest path
        try:
            # returns the first found shortest path
            # if all shortest paths are wanted, use: all_shortest_paths
            path = nx.shortest_path(graph, src_sw, dst_sw, weight=kwargs.get('weight'))
        except Exception:
            # best effort: any failure of the path search is reported as
            # "no path" (KeyboardInterrupt/SystemExit are no longer swallowed)
            logging.info(no_path_msg)
            return no_path_msg

        logging.info("Path between {0} and {1}: {2}".format(vnf_src_name, vnf_dst_name, path))

        switch_inport_nr = src_sw_inport_nr

        # choose free vlan if path contains more than 1 switch
        cmd = kwargs.get('cmd')
        vlan = None
        if cmd == 'add-flow' and len(path) > 1:
            vlan = self.vlans.pop()

        last_index = len(path) - 1
        for hop_index, current_hop in enumerate(path):
            current_node = self.getNodeByName(current_hop)

            if hop_index < last_index:
                next_hop = path[hop_index + 1]
            else:
                # last switch reached
                next_hop = vnf_dst_name

            next_node = self.getNodeByName(next_hop)

            if next_hop == vnf_dst_name:
                switch_outport_nr = dst_sw_outport_nr
                logging.info("end node reached: {0}".format(vnf_dst_name))
            elif not isinstance(next_node, OVSSwitch):
                logging.info("Next node: {0} is not a switch".format(next_hop))
                return "Next node: {0} is not a switch".format(next_hop)
            else:
                # take first link between switches by default
                index_edge_out = 0
                switch_outport_nr = graph[current_hop][next_hop][index_edge_out]['src_port_nr']

            # set of entry via ovs-ofctl
            if isinstance(current_node, OVSSwitch):
                kwargs['vlan'] = vlan
                kwargs['path'] = path
                kwargs['current_hop'] = current_hop

                if self.controller == RemoteController:
                    # set flow entry via ryu rest api
                    self._set_flow_entry_ryu_rest(current_node, switch_inport_nr, switch_outport_nr, **kwargs)
                else:
                    # set flow entry via ovs-ofctl
                    self._set_flow_entry_dpctl(current_node, switch_inport_nr, switch_outport_nr, **kwargs)

            # take first link between switches by default
            if isinstance(next_node, OVSSwitch):
                switch_inport_nr = graph[current_hop][next_hop][0]['dst_port_nr']

        return "path {2} between {0} and {1}".format(vnf_src_name, vnf_dst_name, cmd)

    def _set_flow_entry_ryu_rest(self, node, switch_inport_nr, switch_outport_nr, **kwargs):
        """
        Add or delete one flow entry on ``node`` through the Ryu REST API.

        Reads 'cookie', 'match', 'cmd', 'path', 'current_hop' and 'vlan'
        from kwargs. With a VLAN set, the first switch of the path pushes the
        tag, the last one pops it and all but the first match on it.
        """
        match = 'in_port=%s' % switch_inport_nr

        cookie = kwargs.get('cookie')
        match_input = kwargs.get('match')
        cmd = kwargs.get('cmd')
        path = kwargs.get('path')
        current_hop = kwargs.get('current_hop')
        vlan = kwargs.get('vlan')

        if match_input:
            match = ','.join([match, match_input])

        flow = {}
        flow['dpid'] = int(node.dpid, 16)

        if cookie:
            flow['cookie'] = int(cookie)

        flow['actions'] = []

        # possible Ryu actions, match fields:
        # http://ryu.readthedocs.io/en/latest/app/ofctl_rest.html#add-a-flow-entry
        if cmd == 'add-flow':
            prefix = 'stats/flowentry/add'
            if vlan is not None:
                hop_index = path.index(current_hop)
                if hop_index == 0:  # first node
                    flow['actions'].append({
                        'type': 'PUSH_VLAN',  # Push a new VLAN tag if a input frame is non-VLAN-tagged
                        'ethertype': 33024,  # Ethertype 0x8100(=33024): IEEE 802.1Q VLAN-tagged frame
                    })
                    flow['actions'].append({
                        'type': 'SET_FIELD',
                        'field': 'vlan_vid',
                        # Ryu expects vlan_id | OFPVID_PRESENT (0x1000) here,
                        # see the set-field example in the ofctl_rest docs
                        'value': vlan | 0x1000,
                    })
                elif hop_index == len(path) - 1:  # last node
                    match += ',dl_vlan=%s' % vlan
                    flow['actions'].append({'type': 'POP_VLAN'})
                else:  # middle nodes
                    match += ',dl_vlan=%s' % vlan
            # output action must come last
            flow['actions'].append({'type': 'OUTPUT', 'port': switch_outport_nr})

        elif cmd == 'del-flows':
            prefix = 'stats/flowentry/delete'

            if cookie:
                # need full mask to match complete cookie
                flow['cookie_mask'] = int('0xffffffffffffffff', 16)

            flow['actions'].append({'type': 'OUTPUT', 'port': switch_outport_nr})

        else:
            # no REST endpoint for other commands; this used to end in an
            # UnboundLocalError on 'prefix'
            logging.warning("unknown flow command: {0}".format(cmd))
            return

        flow['match'] = self._parse_match(match)
        self.ryu_REST(prefix, data=flow)

    def _set_flow_entry_dpctl(self, node, switch_inport_nr, switch_outport_nr, **kwargs):
        """
        Add or delete one flow entry on ``node`` via ``node.dpctl``.

        Reads the same kwargs as _set_flow_entry_ryu_rest.
        """
        match = 'in_port=%s' % switch_inport_nr

        cookie = kwargs.get('cookie')
        match_input = kwargs.get('match')
        cmd = kwargs.get('cmd')
        path = kwargs.get('path')
        current_hop = kwargs.get('current_hop')
        vlan = kwargs.get('vlan')

        s = ','
        if cookie:
            cookie = 'cookie=%s' % cookie
            match = s.join([cookie, match])
        if match_input:
            match = s.join([match, match_input])
        if cmd == 'add-flow':
            action = 'action=%s' % switch_outport_nr
            if vlan is not None:
                hop_index = path.index(current_hop)
                if hop_index == 0:  # first node
                    action = ('action=mod_vlan_vid:%s' % vlan) + (',output=%s' % switch_outport_nr)
                    # the option is smuggled in front of the flow string that
                    # is handed to dpctl
                    match = '-O OpenFlow13 ' + match
                elif hop_index == len(path) - 1:  # last node
                    match += ',dl_vlan=%s' % vlan
                    action = 'action=strip_vlan,output=%s' % switch_outport_nr
                else:  # middle nodes
                    match += ',dl_vlan=%s' % vlan
            ofcmd = s.join([match, action])
        elif cmd == 'del-flows':
            ofcmd = match
        else:
            ofcmd = ''

        node.dpctl(cmd, ofcmd)
        logging.info("{3} in switch: {0} in_port: {1} out_port: {2}".format(node.name, switch_inport_nr,
                                                                            switch_outport_nr, cmd))

    # start Ryu Openflow controller as Remote Controller for the DCNetwork
    def startRyu(self, learning_switch=True):
        """
        Start ``ryu-manager`` with the REST API app as a child process.

        Output of the process goes to /tmp/ryu.log. The handle is stored in
        ``self.ryu_process``.

        :param learning_switch: also load Ryu's simple_switch_13 app
        """
        # start Ryu controller with rest-API
        python_install_path = site.getsitepackages()[0]
        ryu_path = python_install_path + '/ryu/app/simple_switch_13.py'
        ryu_path2 = python_install_path + '/ryu/app/ofctl_rest.py'
        # change the default Openflow controller port to 6653 (official IANA-assigned port number), as used by Mininet
        # Ryu still uses 6633 as default
        ryu_option = '--ofp-tcp-listen-port'
        ryu_of_port = '6653'
        ryu_cmd = 'ryu-manager'

        if learning_switch:
            ryu_args = [ryu_cmd, ryu_path, ryu_path2, ryu_option, ryu_of_port]
        else:
            # no learning switch
            ryu_args = [ryu_cmd, ryu_path2, ryu_option, ryu_of_port]

        # The child gets its own copy of the descriptor, so our handle can be
        # closed right after the process has been spawned.
        with open("/tmp/ryu.log", 'w') as ryu_log:
            self.ryu_process = Popen(ryu_args, stdout=ryu_log, stderr=ryu_log)
        time.sleep(1)

    def stopRyu(self):
        """
        Terminate the Ryu process started by startRyu(), if any.

        Safe to call repeatedly; the process handle is cleared afterwards.
        """
        if self.ryu_process is None:
            return
        # only signal a process that is still running
        if self.ryu_process.poll() is None:
            self.ryu_process.terminate()
            self.ryu_process.kill()
        # reap the child so that no zombie process is left behind
        self.ryu_process.wait()
        self.ryu_process = None

    def ryu_REST(self, prefix, dpid=None, data=None):
        """
        Send a request to the Ryu REST API.

        :param prefix: path below the API root, e.g. 'stats/flowentry/add'
        :param dpid: optional datapath id appended to the path
        :param data: optional payload; if given it is sent as str(data) in
                     the request body, otherwise a plain GET is issued
        :return: response body, or None if the request failed
        """
        if data:
            logging.info('log POST: {0}'.format(str(data)))

        url = self.ryu_REST_api + '/' + str(prefix)
        if dpid:
            url = url + '/' + str(dpid)

        try:
            if data:
                # NOTE(review): the body is the Python repr of the dict, not
                # JSON — confirm this matches the deployed Ryu version.
                req = urllib2.Request(url, str(data))
            else:
                req = urllib2.Request(url)

            response = urllib2.urlopen(req)
            try:
                return response.read()
            finally:
                response.close()
        except Exception:
            # best effort: failures are logged and reported as None
            logging.info('error url: {0}'.format(str(url)))
            if data:
                logging.info('error POST: {0}'.format(str(data)))
            return None

    # need to respect that some match fields must be integers
    # http://ryu.readthedocs.io/en/latest/app/ofctl_rest.html#description-of-match-and-actions
    def _parse_match(self, match):
        """
        Convert a 'key=value,key=value' match string into a dict.

        Values that int(value, 0) accepts (decimal, 0x.., ...) become
        integers, everything else stays a string. Items that do not consist
        of exactly one key and one value are skipped.
        """
        parsed = {}
        for item in match.split(','):
            parts = item.split('=')
            if len(parts) != 2:
                continue
            key, raw_value = parts
            try:
                value = int(raw_value, 0)
            except ValueError:
                # not a number (e.g. an IP or MAC address): keep the string
                value = raw_value
            parsed[key] = value
        return parsed
515