Merge remote-tracking branch 'upstream/master'
[osm/vim-emu.git] / src / emuvim / dcemulator / net.py
1 """
2 Distributed Cloud Emulator (dcemulator)
3 (c) 2015 by Manuel Peuster <manuel.peuster@upb.de>
4 """
5 import logging
6
7 import site
8 import time
9 from subprocess import Popen
10 import os
11 import re
12 import urllib2
13 from functools import partial
14
15 from mininet.net import Containernet
16 from mininet.node import Controller, DefaultController, OVSSwitch, OVSKernelSwitch, Docker, RemoteController
17 from mininet.cli import CLI
18 from mininet.link import TCLink
19 import networkx as nx
20 from emuvim.dcemulator.monitoring import DCNetworkMonitor
21 from emuvim.dcemulator.node import Datacenter, EmulatorCompute
22 from emuvim.dcemulator.resourcemodel import ResourceModelRegistrar
23
24 class DCNetwork(Containernet):
25 """
26 Wraps the original Mininet/Containernet class and provides
27 methods to add data centers, switches, etc.
28
29 This class is used by topology definition scripts.
30 """
31
32 def __init__(self, controller=RemoteController, monitor=False,
33 enable_learning = True, # in case of RemoteController (Ryu), learning switch behavior can be turned off/on
34 dc_emulation_max_cpu=1.0, # fraction of overall CPU time for emulation
35 dc_emulation_max_mem=512, # emulation max mem in MB
36 **kwargs):
37 """
38 Create an extended version of a Containernet network
39 :param dc_emulation_max_cpu: max. CPU time used by containers in data centers
40 :param kwargs: path through for Mininet parameters
41 :return:
42 """
43 self.dcs = {}
44
45 # call original Docker.__init__ and setup default controller
46 Containernet.__init__(
47 self, switch=OVSKernelSwitch, controller=controller, **kwargs)
48
49
50 # Ryu management
51 self.ryu_process = None
52 if controller == RemoteController:
53 # start Ryu controller
54 self.startRyu(learning_switch=enable_learning)
55
56 # add the specified controller
57 self.addController('c0', controller=controller)
58
59 # graph of the complete DC network
60 self.DCNetwork_graph = nx.MultiDiGraph()
61
62 # initialize pool of vlan tags to setup the SDN paths
63 self.vlans = range(4096)[::-1]
64
65 # link to Ryu REST_API
66 ryu_ip = '0.0.0.0'
67 ryu_port = '8080'
68 self.ryu_REST_api = 'http://{0}:{1}'.format(ryu_ip, ryu_port)
69
70 # monitoring agent
71 if monitor:
72 self.monitor_agent = DCNetworkMonitor(self)
73 else:
74 self.monitor_agent = None
75
76 # initialize resource model registrar
77 self.rm_registrar = ResourceModelRegistrar(
78 dc_emulation_max_cpu, dc_emulation_max_mem)
79
80 def addDatacenter(self, label, metadata={}, resource_log_path=None):
81 """
82 Create and add a logical cloud data center to the network.
83 """
84 if label in self.dcs:
85 raise Exception("Data center label already exists: %s" % label)
86 dc = Datacenter(label, metadata=metadata, resource_log_path=resource_log_path)
87 dc.net = self # set reference to network
88 self.dcs[label] = dc
89 dc.create() # finally create the data center in our Mininet instance
90 logging.info("added data center: %s" % label)
91 return dc
92
93 def addLink(self, node1, node2, **params):
94 """
95 Able to handle Datacenter objects as link
96 end points.
97 """
98 assert node1 is not None
99 assert node2 is not None
100 logging.debug("addLink: n1=%s n2=%s" % (str(node1), str(node2)))
101 # ensure type of node1
102 if isinstance( node1, basestring ):
103 if node1 in self.dcs:
104 node1 = self.dcs[node1].switch
105 if isinstance( node1, Datacenter ):
106 node1 = node1.switch
107 # ensure type of node2
108 if isinstance( node2, basestring ):
109 if node2 in self.dcs:
110 node2 = self.dcs[node2].switch
111 if isinstance( node2, Datacenter ):
112 node2 = node2.switch
113 # try to give containers a default IP
114 if isinstance( node1, Docker ):
115 if "params1" not in params:
116 params["params1"] = {}
117 if "ip" not in params["params1"]:
118 params["params1"]["ip"] = self.getNextIp()
119 if isinstance( node2, Docker ):
120 if "params2" not in params:
121 params["params2"] = {}
122 if "ip" not in params["params2"]:
123 params["params2"]["ip"] = self.getNextIp()
124 # ensure that we allow TCLinks between data centers
125 # TODO this is not optimal, we use cls=Link for containers and TCLink for data centers
126 # see Containernet issue: https://github.com/mpeuster/containernet/issues/3
127 if "cls" not in params:
128 params["cls"] = TCLink
129
130 link = Containernet.addLink(self, node1, node2, **params)
131
132 # try to give container interfaces a default id
133 node1_port_id = node1.ports[link.intf1]
134 if isinstance(node1, Docker):
135 if "id" in params["params1"]:
136 node1_port_id = params["params1"]["id"]
137 node1_port_name = link.intf1.name
138
139 node2_port_id = node2.ports[link.intf2]
140 if isinstance(node2, Docker):
141 if "id" in params["params2"]:
142 node2_port_id = params["params2"]["id"]
143 node2_port_name = link.intf2.name
144
145
146 # add edge and assigned port number to graph in both directions between node1 and node2
147 # port_id: id given in descriptor (if available, otherwise same as port)
148 # port: portnumber assigned by Containernet
149
150 attr_dict = {}
151 # possible weight metrics allowed by TClink class:
152 weight_metrics = ['bw', 'delay', 'jitter', 'loss']
153 edge_attributes = [p for p in params if p in weight_metrics]
154 for attr in edge_attributes:
155 # if delay: strip ms (need number as weight in graph)
156 match = re.search('([0-9]*\.?[0-9]+)', params[attr])
157 if match:
158 attr_number = match.group(1)
159 else:
160 attr_number = None
161 attr_dict[attr] = attr_number
162
163
164 attr_dict2 = {'src_port_id': node1_port_id, 'src_port_nr': node1.ports[link.intf1],
165 'src_port_name': node1_port_name,
166 'dst_port_id': node2_port_id, 'dst_port_nr': node2.ports[link.intf2],
167 'dst_port_name': node2_port_name}
168 attr_dict2.update(attr_dict)
169 self.DCNetwork_graph.add_edge(node1.name, node2.name, attr_dict=attr_dict2)
170
171 attr_dict2 = {'src_port_id': node2_port_id, 'src_port_nr': node2.ports[link.intf2],
172 'src_port_name': node2_port_name,
173 'dst_port_id': node1_port_id, 'dst_port_nr': node1.ports[link.intf1],
174 'dst_port_name': node1_port_name}
175 attr_dict2.update(attr_dict)
176 self.DCNetwork_graph.add_edge(node2.name, node1.name, attr_dict=attr_dict2)
177
178 return link
179
180 def addDocker( self, label, **params ):
181 """
182 Wrapper for addDocker method to use custom container class.
183 """
184 self.DCNetwork_graph.add_node(label)
185 return Containernet.addDocker(self, label, cls=EmulatorCompute, **params)
186
187 def removeDocker( self, label, **params ):
188 """
189 Wrapper for removeDocker method to update graph.
190 """
191 self.DCNetwork_graph.remove_node(label)
192 return Containernet.removeDocker(self, label, **params)
193
194 def addSwitch( self, name, add_to_graph=True, **params ):
195 """
196 Wrapper for addSwitch method to store switch also in graph.
197 """
198 if add_to_graph:
199 self.DCNetwork_graph.add_node(name)
200 return Containernet.addSwitch(self, name, protocols='OpenFlow10,OpenFlow12,OpenFlow13', **params)
201
202 def getAllContainers(self):
203 """
204 Returns a list with all containers within all data centers.
205 """
206 all_containers = []
207 for dc in self.dcs.itervalues():
208 all_containers += dc.listCompute()
209 return all_containers
210
211 def start(self):
212 # start
213 for dc in self.dcs.itervalues():
214 dc.start()
215 Containernet.start(self)
216
217 def stop(self):
218
219 # stop the monitor agent
220 if self.monitor_agent is not None:
221 self.monitor_agent.stop()
222
223 # stop emulator net
224 Containernet.stop(self)
225
226 # stop Ryu controller
227 self.stopRyu()
228
229
230 def CLI(self):
231 CLI(self)
232
233 # to remove chain do setChain( src, dst, cmd='del-flows')
234 def setChain(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, **kwargs):
235 cmd = kwargs.get('cmd')
236 if cmd == 'add-flow':
237 ret = self._chainAddFlow(vnf_src_name, vnf_dst_name, vnf_src_interface, vnf_dst_interface, **kwargs)
238 if kwargs.get('bidirectional'):
239 ret = ret +'\n' + self._chainAddFlow(vnf_dst_name, vnf_src_name, vnf_dst_interface, vnf_src_interface, **kwargs)
240
241 elif cmd == 'del-flows': # TODO: del-flow to be implemented
242 ret = self._chainAddFlow(vnf_src_name, vnf_dst_name, vnf_src_interface, vnf_dst_interface, **kwargs)
243 if kwargs.get('bidirectional'):
244 ret = ret + '\n' + self._chainAddFlow(vnf_dst_name, vnf_src_name, vnf_dst_interface, vnf_src_interface, **kwargs)
245
246 else:
247 ret = "Command unknown"
248
249 return ret
250
251
252 def _chainAddFlow(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, **kwargs):
253
254 # TODO: this needs to be cleaned up
255 #check if port is specified (vnf:port)
256 if vnf_src_interface is None:
257 # take first interface by default
258 connected_sw = self.DCNetwork_graph.neighbors(vnf_src_name)[0]
259 link_dict = self.DCNetwork_graph[vnf_src_name][connected_sw]
260 vnf_src_interface = link_dict[0]['src_port_id']
261
262 for connected_sw in self.DCNetwork_graph.neighbors(vnf_src_name):
263 link_dict = self.DCNetwork_graph[vnf_src_name][connected_sw]
264 for link in link_dict:
265 if link_dict[link]['src_port_id'] == vnf_src_interface:
266 # found the right link and connected switch
267 src_sw = connected_sw
268
269 src_sw_inport_nr = link_dict[link]['dst_port_nr']
270 break
271
272 if vnf_dst_interface is None:
273 # take first interface by default
274 connected_sw = self.DCNetwork_graph.neighbors(vnf_dst_name)[0]
275 link_dict = self.DCNetwork_graph[connected_sw][vnf_dst_name]
276 vnf_dst_interface = link_dict[0]['dst_port_id']
277
278 vnf_dst_name = vnf_dst_name.split(':')[0]
279 for connected_sw in self.DCNetwork_graph.neighbors(vnf_dst_name):
280 link_dict = self.DCNetwork_graph[connected_sw][vnf_dst_name]
281 for link in link_dict:
282 if link_dict[link]['dst_port_id'] == vnf_dst_interface:
283 # found the right link and connected switch
284 dst_sw = connected_sw
285 dst_sw_outport_nr = link_dict[link]['src_port_nr']
286 break
287
288
289 # get shortest path
290 try:
291 # returns the first found shortest path
292 # if all shortest paths are wanted, use: all_shortest_paths
293 path = nx.shortest_path(self.DCNetwork_graph, src_sw, dst_sw, weight=kwargs.get('weight'))
294 except:
295 logging.info("No path could be found between {0} and {1}".format(vnf_src_name, vnf_dst_name))
296 return "No path could be found between {0} and {1}".format(vnf_src_name, vnf_dst_name)
297
298 logging.info("Path between {0} and {1}: {2}".format(vnf_src_name, vnf_dst_name, path))
299
300 current_hop = src_sw
301 switch_inport_nr = src_sw_inport_nr
302
303 # choose free vlan if path contains more than 1 switch
304 cmd = kwargs.get('cmd')
305 vlan = None
306 if cmd == 'add-flow':
307 if len(path) > 1:
308 vlan = self.vlans.pop()
309
310 for i in range(0,len(path)):
311 current_node = self.getNodeByName(current_hop)
312
313 if path.index(current_hop) < len(path)-1:
314 next_hop = path[path.index(current_hop)+1]
315 else:
316 #last switch reached
317 next_hop = vnf_dst_name
318
319 next_node = self.getNodeByName(next_hop)
320
321 if next_hop == vnf_dst_name:
322 switch_outport_nr = dst_sw_outport_nr
323 logging.info("end node reached: {0}".format(vnf_dst_name))
324 elif not isinstance( next_node, OVSSwitch ):
325 logging.info("Next node: {0} is not a switch".format(next_hop))
326 return "Next node: {0} is not a switch".format(next_hop)
327 else:
328 # take first link between switches by default
329 index_edge_out = 0
330 switch_outport_nr = self.DCNetwork_graph[current_hop][next_hop][index_edge_out]['src_port_nr']
331
332
333 # set of entry via ovs-ofctl
334 if isinstance( current_node, OVSSwitch ):
335 kwargs['vlan'] = vlan
336 kwargs['path'] = path
337 kwargs['current_hop'] = current_hop
338
339 if self.controller == RemoteController:
340 ## set flow entry via ryu rest api
341 self._set_flow_entry_ryu_rest(current_node, switch_inport_nr, switch_outport_nr, **kwargs)
342 else:
343 ## set flow entry via ovs-ofctl
344 self._set_flow_entry_dpctl(current_node, switch_inport_nr, switch_outport_nr, **kwargs)
345
346
347
348 # take first link between switches by default
349 if isinstance( next_node, OVSSwitch ):
350 switch_inport_nr = self.DCNetwork_graph[current_hop][next_hop][0]['dst_port_nr']
351 current_hop = next_hop
352
353 return "path {2} between {0} and {1}".format(vnf_src_name, vnf_dst_name, cmd)
354
355 def _set_flow_entry_ryu_rest(self, node, switch_inport_nr, switch_outport_nr, **kwargs):
356 match = 'in_port=%s' % switch_inport_nr
357
358 cookie = kwargs.get('cookie')
359 match_input = kwargs.get('match')
360 cmd = kwargs.get('cmd')
361 path = kwargs.get('path')
362 current_hop = kwargs.get('current_hop')
363 vlan = kwargs.get('vlan')
364
365 s = ','
366 if match_input:
367 match = s.join([match, match_input])
368
369 flow = {}
370 flow['dpid'] = int(node.dpid, 16)
371
372 if cookie:
373 flow['cookie'] = int(cookie)
374
375
376 flow['actions'] = []
377
378 # possible Ryu actions, match fields:
379 # http://ryu.readthedocs.io/en/latest/app/ofctl_rest.html#add-a-flow-entry
380 if cmd == 'add-flow':
381 prefix = 'stats/flowentry/add'
382 if vlan != None:
383 if path.index(current_hop) == 0: # first node
384 action = {}
385 action['type'] = 'PUSH_VLAN' # Push a new VLAN tag if a input frame is non-VLAN-tagged
386 action['ethertype'] = 33024 # Ethertype 0x8100(=33024): IEEE 802.1Q VLAN-tagged frame
387 flow['actions'].append(action)
388 action = {}
389 action['type'] = 'SET_FIELD'
390 action['field'] = 'vlan_vid'
391 action['value'] = vlan
392 flow['actions'].append(action)
393 elif path.index(current_hop) == len(path) - 1: # last node
394 match += ',dl_vlan=%s' % vlan
395 action = {}
396 action['type'] = 'POP_VLAN'
397 flow['actions'].append(action)
398 else: # middle nodes
399 match += ',dl_vlan=%s' % vlan
400 # output action must come last
401 action = {}
402 action['type'] = 'OUTPUT'
403 action['port'] = switch_outport_nr
404 flow['actions'].append(action)
405
406 elif cmd == 'del-flows':
407 prefix = 'stats/flowentry/delete'
408
409 if cookie:
410 # TODO: add cookie_mask as argument
411 flow['cookie_mask'] = int('0xffffffffffffffff', 16) # need full mask to match complete cookie
412
413 action = {}
414 action['type'] = 'OUTPUT'
415 action['port'] = switch_outport_nr
416 flow['actions'].append(action)
417
418 flow['match'] = self._parse_match(match)
419 self.ryu_REST(prefix, data=flow)
420
421 def _set_flow_entry_dpctl(self, node, switch_inport_nr, switch_outport_nr, **kwargs):
422 match = 'in_port=%s' % switch_inport_nr
423
424 cookie = kwargs.get('cookie')
425 match_input = kwargs.get('match')
426 cmd = kwargs.get('cmd')
427 path = kwargs.get('path')
428 current_hop = kwargs.get('current_hop')
429 vlan = kwargs.get('vlan')
430
431 s = ','
432 if cookie:
433 cookie = 'cookie=%s' % cookie
434 match = s.join([cookie, match])
435 if match_input:
436 match = s.join([match, match_input])
437 if cmd == 'add-flow':
438 action = 'action=%s' % switch_outport_nr
439 if vlan != None:
440 if path.index(current_hop) == 0: # first node
441 action = ('action=mod_vlan_vid:%s' % vlan) + (',output=%s' % switch_outport_nr)
442 match = '-O OpenFlow13 ' + match
443 elif path.index(current_hop) == len(path) - 1: # last node
444 match += ',dl_vlan=%s' % vlan
445 action = 'action=strip_vlan,output=%s' % switch_outport_nr
446 else: # middle nodes
447 match += ',dl_vlan=%s' % vlan
448 ofcmd = s.join([match, action])
449 elif cmd == 'del-flows':
450 ofcmd = match
451 else:
452 ofcmd = ''
453
454 node.dpctl(cmd, ofcmd)
455 logging.info("{3} in switch: {0} in_port: {1} out_port: {2}".format(node.name, switch_inport_nr,
456 switch_outport_nr, cmd))
457
458 # start Ryu Openflow controller as Remote Controller for the DCNetwork
459 def startRyu(self, learning_switch=True):
460 # start Ryu controller with rest-API
461 python_install_path = site.getsitepackages()[0]
462 ryu_path = python_install_path + '/ryu/app/simple_switch_13.py'
463 ryu_path2 = python_install_path + '/ryu/app/ofctl_rest.py'
464 # change the default Openflow controller port to 6653 (official IANA-assigned port number), as used by Mininet
465 # Ryu still uses 6633 as default
466 ryu_option = '--ofp-tcp-listen-port'
467 ryu_of_port = '6653'
468 ryu_cmd = 'ryu-manager'
469 FNULL = open("/tmp/ryu.log", 'w')
470 if learning_switch:
471 self.ryu_process = Popen([ryu_cmd, ryu_path, ryu_path2, ryu_option, ryu_of_port], stdout=FNULL, stderr=FNULL)
472 else:
473 # no learning switch
474 self.ryu_process = Popen([ryu_cmd, ryu_path2, ryu_option, ryu_of_port], stdout=FNULL, stderr=FNULL)
475 time.sleep(1)
476
477 def stopRyu(self):
478 if self.ryu_process is not None:
479 self.ryu_process.terminate()
480 self.ryu_process.kill()
481
482 def ryu_REST(self, prefix, dpid=None, data=None):
483 try:
484 if dpid:
485 url = self.ryu_REST_api + '/' + str(prefix) + '/' + str(dpid)
486 else:
487 url = self.ryu_REST_api + '/' + str(prefix)
488 if data:
489 #logging.info('POST: {0}'.format(str(data)))
490 req = urllib2.Request(url, str(data))
491 else:
492 req = urllib2.Request(url)
493
494 ret = urllib2.urlopen(req).read()
495 return ret
496 except:
497 logging.info('error url: {0}'.format(str(url)))
498 if data: logging.info('error POST: {0}'.format(str(data)))
499
500 # need to respect that some match fields must be integers
501 # http://ryu.readthedocs.io/en/latest/app/ofctl_rest.html#description-of-match-and-actions
502 def _parse_match(self, match):
503 matches = match.split(',')
504 dict = {}
505 for m in matches:
506 match = m.split('=')
507 if len(match) == 2:
508 try:
509 m2 = int(match[1], 0)
510 except:
511 m2 = match[1]
512
513 dict.update({match[0]:m2})
514 return dict
515