merge master
[osm/vim-emu.git] / src / emuvim / dcemulator / net.py
1 """
2 Distributed Cloud Emulator (dcemulator)
3 (c) 2015 by Manuel Peuster <manuel.peuster@upb.de>
4 """
5 import logging
6
7 import site
8 import time
9 from subprocess import Popen
10 import os
11 import re
12 import urllib2
13 from functools import partial
14
15 from mininet.net import Containernet
16 from mininet.node import Controller, DefaultController, OVSSwitch, OVSKernelSwitch, Docker, RemoteController
17 from mininet.cli import CLI
18 from mininet.link import TCLink
19 import networkx as nx
20 from emuvim.dcemulator.monitoring import DCNetworkMonitor
21 from emuvim.dcemulator.node import Datacenter, EmulatorCompute
22 from emuvim.dcemulator.resourcemodel import ResourceModelRegistrar
23
24 class DCNetwork(Containernet):
25 """
26 Wraps the original Mininet/Containernet class and provides
27 methods to add data centers, switches, etc.
28
29 This class is used by topology definition scripts.
30 """
31
32 def __init__(self, controller=RemoteController, monitor=False,
33 enable_learning = True, # in case of RemoteController (Ryu), learning switch behavior can be turned off/on
34 dc_emulation_max_cpu=1.0, # fraction of overall CPU time for emulation
35 dc_emulation_max_mem=512, # emulation max mem in MB
36 **kwargs):
37 """
38 Create an extended version of a Containernet network
39 :param dc_emulation_max_cpu: max. CPU time used by containers in data centers
40 :param kwargs: path through for Mininet parameters
41 :return:
42 """
43 self.dcs = {}
44
45 # call original Docker.__init__ and setup default controller
46 Containernet.__init__(
47 self, switch=OVSKernelSwitch, controller=controller, **kwargs)
48
49
50 # Ryu management
51 self.ryu_process = None
52 if controller == RemoteController:
53 # start Ryu controller
54 self.startRyu(learning_switch=enable_learning)
55
56 # add the specified controller
57 self.addController('c0', controller=controller)
58
59 # graph of the complete DC network
60 self.DCNetwork_graph = nx.MultiDiGraph()
61
62 # initialize pool of vlan tags to setup the SDN paths
63 self.vlans = range(4096)[::-1]
64
65 # link to Ryu REST_API
66 ryu_ip = '0.0.0.0'
67 ryu_port = '8080'
68 self.ryu_REST_api = 'http://{0}:{1}'.format(ryu_ip, ryu_port)
69
70 # monitoring agent
71 if monitor:
72 self.monitor_agent = DCNetworkMonitor(self)
73 else:
74 self.monitor_agent = None
75
76 # initialize resource model registrar
77 self.rm_registrar = ResourceModelRegistrar(
78 dc_emulation_max_cpu, dc_emulation_max_mem)
79
80 def addDatacenter(self, label, metadata={}, resource_log_path=None):
81 """
82 Create and add a logical cloud data center to the network.
83 """
84 if label in self.dcs:
85 raise Exception("Data center label already exists: %s" % label)
86 dc = Datacenter(label, metadata=metadata, resource_log_path=resource_log_path)
87 dc.net = self # set reference to network
88 self.dcs[label] = dc
89 dc.create() # finally create the data center in our Mininet instance
90 logging.info("added data center: %s" % label)
91 return dc
92
93 def addLink(self, node1, node2, **params):
94 """
95 Able to handle Datacenter objects as link
96 end points.
97 """
98 assert node1 is not None
99 assert node2 is not None
100 logging.debug("addLink: n1=%s n2=%s" % (str(node1), str(node2)))
101 # ensure type of node1
102 if isinstance( node1, basestring ):
103 if node1 in self.dcs:
104 node1 = self.dcs[node1].switch
105 if isinstance( node1, Datacenter ):
106 node1 = node1.switch
107 # ensure type of node2
108 if isinstance( node2, basestring ):
109 if node2 in self.dcs:
110 node2 = self.dcs[node2].switch
111 if isinstance( node2, Datacenter ):
112 node2 = node2.switch
113 # try to give containers a default IP
114 if isinstance( node1, Docker ):
115 if "params1" not in params:
116 params["params1"] = {}
117 if "ip" not in params["params1"]:
118 params["params1"]["ip"] = self.getNextIp()
119 if isinstance( node2, Docker ):
120 if "params2" not in params:
121 params["params2"] = {}
122 if "ip" not in params["params2"]:
123 params["params2"]["ip"] = self.getNextIp()
124 # ensure that we allow TCLinks between data centers
125 # TODO this is not optimal, we use cls=Link for containers and TCLink for data centers
126 # see Containernet issue: https://github.com/mpeuster/containernet/issues/3
127 if "cls" not in params:
128 params["cls"] = TCLink
129
130 link = Containernet.addLink(self, node1, node2, **params)
131
132 # try to give container interfaces a default id
133 node1_port_id = node1.ports[link.intf1]
134 if isinstance(node1, Docker):
135 if "id" in params["params1"]:
136 node1_port_id = params["params1"]["id"]
137 node1_port_name = link.intf1.name
138
139 node2_port_id = node2.ports[link.intf2]
140 if isinstance(node2, Docker):
141 if "id" in params["params2"]:
142 node2_port_id = params["params2"]["id"]
143 node2_port_name = link.intf2.name
144
145
146 # add edge and assigned port number to graph in both directions between node1 and node2
147 # port_id: id given in descriptor (if available, otherwise same as port)
148 # port: portnumber assigned by Containernet
149
150 attr_dict = {}
151 # possible weight metrics allowed by TClink class:
152 weight_metrics = ['bw', 'delay', 'jitter', 'loss']
153 edge_attributes = [p for p in params if p in weight_metrics]
154 for attr in edge_attributes:
155 # if delay: strip ms (need number as weight in graph)
156 match = re.search('([0-9]*\.?[0-9]+)', params[attr])
157 if match:
158 attr_number = match.group(1)
159 else:
160 attr_number = None
161 attr_dict[attr] = attr_number
162
163
164 attr_dict2 = {'src_port_id': node1_port_id, 'src_port_nr': node1.ports[link.intf1],
165 'src_port_name': node1_port_name,
166 'dst_port_id': node2_port_id, 'dst_port_nr': node2.ports[link.intf2],
167 'dst_port_name': node2_port_name}
168 attr_dict2.update(attr_dict)
169 self.DCNetwork_graph.add_edge(node1.name, node2.name, attr_dict=attr_dict2)
170
171 attr_dict2 = {'src_port_id': node2_port_id, 'src_port_nr': node2.ports[link.intf2],
172 'src_port_name': node2_port_name,
173 'dst_port_id': node1_port_id, 'dst_port_nr': node1.ports[link.intf1],
174 'dst_port_name': node1_port_name}
175 attr_dict2.update(attr_dict)
176 self.DCNetwork_graph.add_edge(node2.name, node1.name, attr_dict=attr_dict2)
177
178 return link
179
180 def addDocker( self, label, **params ):
181 """
182 Wrapper for addDocker method to use custom container class.
183 """
184 self.DCNetwork_graph.add_node(label)
185 return Containernet.addDocker(self, label, cls=EmulatorCompute, **params)
186
187 def removeDocker( self, label, **params ):
188 """
189 Wrapper for removeDocker method to update graph.
190 """
191 self.DCNetwork_graph.remove_node(label)
192 return Containernet.removeDocker(self, label, **params)
193
194 def addSwitch( self, name, add_to_graph=True, **params ):
195 """
196 Wrapper for addSwitch method to store switch also in graph.
197 """
198 if add_to_graph:
199 self.DCNetwork_graph.add_node(name)
200 return Containernet.addSwitch(self, name, protocols='OpenFlow10,OpenFlow12,OpenFlow13', **params)
201
202 def getAllContainers(self):
203 """
204 Returns a list with all containers within all data centers.
205 """
206 all_containers = []
207 for dc in self.dcs.itervalues():
208 all_containers += dc.listCompute()
209 return all_containers
210
211 def start(self):
212 # start
213 for dc in self.dcs.itervalues():
214 dc.start()
215 Containernet.start(self)
216
217 def stop(self):
218
219 # stop the monitor agent
220 if self.monitor_agent is not None:
221 self.monitor_agent.stop()
222
223 # stop emulator net
224 Containernet.stop(self)
225
226 # stop Ryu controller
227 self.stopRyu()
228
229
230 def CLI(self):
231 CLI(self)
232
233 # to remove chain do setChain( src, dst, cmd='del-flows')
234 def setChain(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, **kwargs):
235 cmd = kwargs.get('cmd')
236 if cmd == 'add-flow':
237 ret = self._chainAddFlow(vnf_src_name, vnf_dst_name, vnf_src_interface, vnf_dst_interface, **kwargs)
238 if kwargs.get('bidirectional'):
239 return ret +'\n' + self._chainAddFlow(vnf_dst_name, vnf_src_name, vnf_dst_interface, vnf_src_interface, **kwargs)
240
241 elif cmd == 'del-flows': # TODO: del-flow to be implemented
242 ret = self._chainAddFlow(vnf_src_name, vnf_dst_name, vnf_src_interface, vnf_dst_interface, **kwargs)
243 if kwargs.get('bidirectional'):
244 return ret + '\n' + self._chainAddFlow(vnf_dst_name, vnf_src_name, vnf_dst_interface, vnf_src_interface, **kwargs)
245
246 else:
247 return "Command unknown"
248
249
250 def _chainAddFlow(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, **kwargs):
251
252 # TODO: this needs to be cleaned up
253 #check if port is specified (vnf:port)
254 if vnf_src_interface is None:
255 # take first interface by default
256 connected_sw = self.DCNetwork_graph.neighbors(vnf_src_name)[0]
257 link_dict = self.DCNetwork_graph[vnf_src_name][connected_sw]
258 vnf_src_interface = link_dict[0]['src_port_id']
259
260 for connected_sw in self.DCNetwork_graph.neighbors(vnf_src_name):
261 link_dict = self.DCNetwork_graph[vnf_src_name][connected_sw]
262 for link in link_dict:
263 if link_dict[link]['src_port_id'] == vnf_src_interface:
264 # found the right link and connected switch
265 src_sw = connected_sw
266
267 src_sw_inport_nr = link_dict[link]['dst_port_nr']
268 break
269
270 if vnf_dst_interface is None:
271 # take first interface by default
272 connected_sw = self.DCNetwork_graph.neighbors(vnf_dst_name)[0]
273 link_dict = self.DCNetwork_graph[connected_sw][vnf_dst_name]
274 vnf_dst_interface = link_dict[0]['dst_port_id']
275
276 vnf_dst_name = vnf_dst_name.split(':')[0]
277 for connected_sw in self.DCNetwork_graph.neighbors(vnf_dst_name):
278 link_dict = self.DCNetwork_graph[connected_sw][vnf_dst_name]
279 for link in link_dict:
280 if link_dict[link]['dst_port_id'] == vnf_dst_interface:
281 # found the right link and connected switch
282 dst_sw = connected_sw
283 dst_sw_outport_nr = link_dict[link]['src_port_nr']
284 break
285
286
287 # get shortest path
288 try:
289 # returns the first found shortest path
290 # if all shortest paths are wanted, use: all_shortest_paths
291 path = nx.shortest_path(self.DCNetwork_graph, src_sw, dst_sw, weight=kwargs.get('weight'))
292 except:
293 logging.info("No path could be found between {0} and {1}".format(vnf_src_name, vnf_dst_name))
294 return "No path could be found between {0} and {1}".format(vnf_src_name, vnf_dst_name)
295
296 logging.info("Path between {0} and {1}: {2}".format(vnf_src_name, vnf_dst_name, path))
297
298 current_hop = src_sw
299 switch_inport_nr = src_sw_inport_nr
300
301 # choose free vlan if path contains more than 1 switch
302 cmd = kwargs.get('cmd')
303 vlan = None
304 if cmd == 'add-flow':
305 if len(path) > 1:
306 vlan = self.vlans.pop()
307
308 for i in range(0,len(path)):
309 current_node = self.getNodeByName(current_hop)
310
311 if path.index(current_hop) < len(path)-1:
312 next_hop = path[path.index(current_hop)+1]
313 else:
314 #last switch reached
315 next_hop = vnf_dst_name
316
317 next_node = self.getNodeByName(next_hop)
318
319 if next_hop == vnf_dst_name:
320 switch_outport_nr = dst_sw_outport_nr
321 logging.info("end node reached: {0}".format(vnf_dst_name))
322 elif not isinstance( next_node, OVSSwitch ):
323 logging.info("Next node: {0} is not a switch".format(next_hop))
324 return "Next node: {0} is not a switch".format(next_hop)
325 else:
326 # take first link between switches by default
327 index_edge_out = 0
328 switch_outport_nr = self.DCNetwork_graph[current_hop][next_hop][index_edge_out]['src_port_nr']
329
330
331 # set of entry via ovs-ofctl
332 if isinstance( current_node, OVSSwitch ):
333 kwargs['vlan'] = vlan
334 kwargs['path'] = path
335 kwargs['current_hop'] = current_hop
336
337 if self.controller == RemoteController:
338 ## set flow entry via ryu rest api
339 self._set_flow_entry_ryu_rest(current_node, switch_inport_nr, switch_outport_nr, **kwargs)
340 else:
341 ## set flow entry via ovs-ofctl
342 self._set_flow_entry_dpctl(current_node, switch_inport_nr, switch_outport_nr, **kwargs)
343
344
345
346 # take first link between switches by default
347 if isinstance( next_node, OVSSwitch ):
348 switch_inport_nr = self.DCNetwork_graph[current_hop][next_hop][0]['dst_port_nr']
349 current_hop = next_hop
350
351 return "path {2} between {0} and {1}".format(vnf_src_name, vnf_dst_name, cmd)
352
353 def _set_flow_entry_ryu_rest(self, node, switch_inport_nr, switch_outport_nr, **kwargs):
354 match = 'in_port=%s' % switch_inport_nr
355
356 cookie = kwargs.get('cookie')
357 match_input = kwargs.get('match')
358 cmd = kwargs.get('cmd')
359 path = kwargs.get('path')
360 current_hop = kwargs.get('current_hop')
361 vlan = kwargs.get('vlan')
362
363 s = ','
364 if match_input:
365 match = s.join([match, match_input])
366
367 flow = {}
368 flow['dpid'] = int(node.dpid, 16)
369 logging.info('node name:{0}'.format(node.name))
370
371 if cookie:
372 flow['cookie'] = int(cookie)
373
374
375 flow['actions'] = []
376
377 # possible Ryu actions, match fields:
378 # http://ryu.readthedocs.io/en/latest/app/ofctl_rest.html#add-a-flow-entry
379 if cmd == 'add-flow':
380 prefix = 'stats/flowentry/add'
381 if vlan != None:
382 if path.index(current_hop) == 0: # first node
383 action = {}
384 action['type'] = 'PUSH_VLAN' # Push a new VLAN tag if a input frame is non-VLAN-tagged
385 action['ethertype'] = 33024 # Ethertype 0x8100(=33024): IEEE 802.1Q VLAN-tagged frame
386 flow['actions'].append(action)
387 action = {}
388 action['type'] = 'SET_FIELD'
389 action['field'] = 'vlan_vid'
390 action['value'] = vlan
391 flow['actions'].append(action)
392 elif path.index(current_hop) == len(path) - 1: # last node
393 match += ',dl_vlan=%s' % vlan
394 action = {}
395 action['type'] = 'POP_VLAN'
396 flow['actions'].append(action)
397 else: # middle nodes
398 match += ',dl_vlan=%s' % vlan
399 # output action must come last
400 action = {}
401 action['type'] = 'OUTPUT'
402 action['port'] = switch_outport_nr
403 flow['actions'].append(action)
404 #flow['match'] = self._parse_match(match)
405 elif cmd == 'del-flows':
406 #del(flow['actions'])
407 prefix = 'stats/flowentry/delete'
408 if cookie:
409 flow['cookie_mask'] = cookie
410 #if cookie is None:
411 # flow['match'] = self._parse_match(match)
412
413 action = {}
414 action['type'] = 'OUTPUT'
415 action['port'] = switch_outport_nr
416 flow['actions'].append(action)
417
418 flow['match'] = self._parse_match(match)
419 self.ryu_REST(prefix, data=flow)
420
421 def _set_flow_entry_dpctl(self, node, switch_inport_nr, switch_outport_nr, **kwargs):
422 match = 'in_port=%s' % switch_inport_nr
423
424 cookie = kwargs.get('cookie')
425 match_input = kwargs.get('match')
426 cmd = kwargs.get('cmd')
427 path = kwargs.get('path')
428 current_hop = kwargs.get('current_hop')
429 vlan = kwargs.get('vlan')
430
431 s = ','
432 if cookie:
433 cookie = 'cookie=%s' % cookie
434 match = s.join([cookie, match])
435 if match_input:
436 match = s.join([match, match_input])
437 if cmd == 'add-flow':
438 action = 'action=%s' % switch_outport_nr
439 if vlan != None:
440 if path.index(current_hop) == 0: # first node
441 action = ('action=mod_vlan_vid:%s' % vlan) + (',output=%s' % switch_outport_nr)
442 match = '-O OpenFlow13 ' + match
443 elif path.index(current_hop) == len(path) - 1: # last node
444 match += ',dl_vlan=%s' % vlan
445 action = 'action=strip_vlan,output=%s' % switch_outport_nr
446 else: # middle nodes
447 match += ',dl_vlan=%s' % vlan
448 ofcmd = s.join([match, action])
449 elif cmd == 'del-flows':
450 ofcmd = match
451 else:
452 ofcmd = ''
453
454 node.dpctl(cmd, ofcmd)
455 logging.info("{3} in switch: {0} in_port: {1} out_port: {2}".format(node.name, switch_inport_nr,
456 switch_outport_nr, cmd))
457
458 # start Ryu Openflow controller as Remote Controller for the DCNetwork
459 def startRyu(self, learning_switch=True):
460 # start Ryu controller with rest-API
461 python_install_path = site.getsitepackages()[0]
462 ryu_path = python_install_path + '/ryu/app/simple_switch_13.py'
463 ryu_path2 = python_install_path + '/ryu/app/ofctl_rest.py'
464 # change the default Openflow controller port to 6653 (official IANA-assigned port number), as used by Mininet
465 # Ryu still uses 6633 as default
466 ryu_option = '--ofp-tcp-listen-port'
467 ryu_of_port = '6653'
468 ryu_cmd = 'ryu-manager'
469 FNULL = open("/tmp/ryu.log", 'w')
470 if learning_switch:
471 self.ryu_process = Popen([ryu_cmd, ryu_path, ryu_path2, ryu_option, ryu_of_port], stdout=FNULL, stderr=FNULL)
472 else:
473 # no learning switch
474 self.ryu_process = Popen([ryu_cmd, ryu_path2, ryu_option, ryu_of_port], stdout=FNULL, stderr=FNULL)
475 time.sleep(1)
476
477 def stopRyu(self):
478 if self.ryu_process is not None:
479 self.ryu_process.terminate()
480 self.ryu_process.kill()
481
482 def ryu_REST(self, prefix, dpid=None, data=None):
483 try:
484 if dpid:
485 url = self.ryu_REST_api + '/' + str(prefix) + '/' + str(dpid)
486 else:
487 url = self.ryu_REST_api + '/' + str(prefix)
488 if data:
489 #logging.info('POST: {0}'.format(str(data)))
490 req = urllib2.Request(url, str(data))
491 else:
492 req = urllib2.Request(url)
493
494 ret = urllib2.urlopen(req).read()
495 return ret
496 except:
497 logging.info('error url: {0}'.format(str(url)))
498 if data: logging.info('error POST: {0}'.format(str(data)))
499
500 # need to respect that some match fields must be integers
501 # http://ryu.readthedocs.io/en/latest/app/ofctl_rest.html#description-of-match-and-actions
502 def _parse_match(self, match):
503 matches = match.split(',')
504 dict = {}
505 for m in matches:
506 match = m.split('=')
507 if len(match) == 2:
508 try:
509 m2 = int(match[1], 0)
510 except:
511 m2 = match[1]
512
513 dict.update({match[0]:m2})
514 return dict
515